nodejs中的kafkajs,消费顺序,不重复消费
2024-09-08 17:56:47
参考:https://kafka.js.org/docs
要点:让同一类消息(相同 key)发送到同一个 partition;一个 topic 的一个 partition 同一时刻只由消费组内的一个 consumer 消费,consumer 内部单线程处理,即可保证消费顺序。
1.封装kafkaUtil类
const {
Kafka,
logLevel
} = require('kafkajs')
//const cache = require('../conn/redis.js');
// Single shared Kafka client for this process.
// BUGFIX: two broker hostnames were misspelled ("lcoalhost"), silently
// dropping brokers 8092 and 8095 from the bootstrap list.
const kafka = new Kafka({
clientId: 'my-app',
brokers: [
"localhost:8092",
"localhost:8093",
"localhost:8094",
"localhost:8095",
"localhost:8096",
],
retry: {
retries: 8 // retries with backoff on transient broker errors
},
logLevel: logLevel.ERROR
})
/**
* 如果groupId已存在重复的,建立不同的kafka实例会报错
*/
/**
* kafka生产者发送消息
* messages: [{
value: 'Hello KafkaJS user!',
}, {
value: 'Hello KafkaJS user2!',
}],
*/ exports.producer = async (topic, groupId, msg) => {
try {
const producer = kafka.producer({
groupId: groupId
})
await producer.connect()
await producer.send({
topic: topic,
messages: msg,
acks: 1
})
} catch (error) {
throw error;
} } exports.consumer = async (topic, groupId, callback) => {
try {
const consumer = kafka.consumer({
groupId: groupId
})
await consumer.connect()
await consumer.subscribe({
topic: topic
})
await consumer.run({
autoCommit: true,
eachMessage: async ({
topic,
partition,
message
}) => {
//防止重复消费数据
await consumer.commitOffsets([{
topic: topic,
partition: partition,
offset: Number(message.offset) + 1
}])
let msg = message.value.toString()
console.log(72, '消费者接收到的数据为:', msg);
callback(msg);
}
})
} catch (err) {
throw err;
}
}
2.producer.js
const kafka = require('./kafkaUtil');
// Demo producer: sends one numbered message per second, all with the same
// key so they land on the same partition and keep their order.
(async function () {
  const topic = 'MY——TOPIC1';
  const groupId = 'MY——TOPIC1';
  // Simple promise-based delay; replaces the original setTimeout-inside-
  // new-Promise-plus-.then construction (which also mixed await with .then
  // and wrapped resolve in a pointless async callback).
  const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
  try {
    for (let i = 0; i < 10000; i++) {
      await sleep(1000); // throttle: one message per second
      console.log('发送的数据为:', i);
      await kafka.producer(topic, groupId, [{
        // A constant key routes every message to the same partition, which is
        // what preserves consumption order. Remove the key if ordering is not
        // required. See: https://www.zhihu.com/question/266390197
        key: "a",
        value: `${i}`
      }]);
    }
  } catch (error) {
    console.log(14, error);
    throw error;
  }
})();
3.consumer.js
const kafka = require('./kafkaUtil');
// Demo consumer: appends every received message to test01.txt with a
// running sequence number.
(async function () {
  const fs = require('fs');
  let count = 1; // sequence number of received messages
  const topic = 'MY——TOPIC1';
  const groupId = 'MY——TOPIC1';
  try {
    await kafka.consumer(topic, groupId, async (msg) => {
      const str = `第${count}接收到的数据为:${msg}`;
      count++;
      // appendFileSync is the idiomatic equivalent of
      // writeFileSync(..., { flag: 'a' }).
      fs.appendFileSync(`${process.cwd()}/test01.txt`, str);
      console.log(str);
    });
  } catch (error) {
    console.log(14, error);
    throw error;
  }
})();
经实际测试,未发现重复消费或乱序问题。如发现问题,欢迎指正,谢谢。
最新文章
- jenkins+git+maven搭建自动化部署项目环境
- Spring学习4-面向切面(AOP)之aspectj注解方式
- RabbitMQ学习总结 第五篇:路由Routing
- 无法完成安装:&#39;unsupported configuration: hda-duplex not supported in this QEMU binary&#39;
- android开发无障碍app
- passport.js
- 【OpenCV】基于kmeans的细胞检测方法
- .NET 4.0 兼容 .NET 2.0 的方法
- CDZSC_2015寒假新人(1)——基础 d
- Ubuntu 12.04 中自定义DNS服务器设置
- Codevs2018 反病毒软件
- [leetcode-572-Subtree of Another Tree]
- python简单路由系统
- CentOS7 yum方式安装MySQL5.7
- Python之路(第三十三篇) 网络编程:socketserver深度解析
- 百度前端代码规范:CSS
- linux装tomcat遇到的坑
- prometheus远程连接m3db实现存储
- quartz储存方式之JDBC JobStoreTX
- list的迭代器能解决并发问题,collection 的迭代器不能解决并发问题,for可以解决并发问题
热门文章
- docker 搭建minio
- Java基础学习:12、代码块
- 微信开发者工具代码管理设置使用SSH Key(使用密码)方法
- 看到项目中的DateTimeFormat和JsonFormat就头大
- VMware vSphere Client(4.1/5.0/5.1/5.5/6.0) 客户端下载地址
- Java8函数式编程(A)
- ICPC2020 沈阳
- win7安装AutoCAD2019
- docker镜像原理(一)
- db2存储过程很慢如何查看