参考:https://kafka.js.org/docs
确保同一个消息发送到同一个partition,一个topic,一个partition,一个consumer,内部单线程消费
1.封装kafkaUtil类

const {
Kafka,
logLevel
} = require('kafkajs')
//const cache = require('../conn/redis.js');
const kafka = new Kafka({
clientId: 'my-app',
brokers: [
"lcoalhost:8092",
"localhost:8093",
"localhost:8094",
"lcoalhost:8095",
"localhost:8096",
],
retry: {
retries: 8
},
logLevel: logLevel.ERROR
})
/**
* 如果groupId已存在重复的,建立不同的kafka实例会报错
*/
/**
* kafka生产者发送消息
* messages: [{
value: 'Hello KafkaJS user!',
}, {
value: 'Hello KafkaJS user2!',
}],
*/ exports.producer = async (topic, groupId, msg) => {
try {
const producer = kafka.producer({
groupId: groupId
})
await producer.connect()
await producer.send({
topic: topic,
messages: msg,
acks: 1
})
} catch (error) {
throw error;
} } exports.consumer = async (topic, groupId, callback) => {
try {
const consumer = kafka.consumer({
groupId: groupId
})
await consumer.connect()
await consumer.subscribe({
topic: topic
})
await consumer.run({
autoCommit: true,
eachMessage: async ({
topic,
partition,
message
}) => {
//防止重复消费数据
await consumer.commitOffsets([{
topic: topic,
partition: partition,
offset: Number(message.offset) + 1
}])
let msg = message.value.toString()
console.log(72, '消费者接收到的数据为:', msg);
callback(msg);
}
})
} catch (err) {
throw err;
}
}

2.producer.js

const kafka = require('./kafkaUtil');
(async function () {
const topic = 'MY——TOPIC1'
const groupId = 'MY——TOPIC1'
try {
for (let i = 0; i < 10000; i++) {
await new Promise((resolve, reject) => {
setTimeout(async () => {
resolve(1)
}, 1000)
}).then(async () => {
console.log('发送的数据为:', i)
await kafka.producer(topic, groupId, [{
key: "a",//key值为了保证消费者按照生产者生产的数据顺序,消费数据,key值必须一致;如果不需要消费者按照生产的顺序消费,key去掉即可,参考: https://www.zhihu.com/question/266390197
value: `${i}`
}])
})
}
} catch (error) {
console.log(14, error)
throw error;
} })()

3.consumer.js

const kafka = require('./kafkaUtil');
(async function () {
const fs = require('fs');
let count = 1;
const topic = 'MY——TOPIC1'
const groupId = 'MY——TOPIC1'
try {
await kafka.consumer(topic, groupId, async (msg) => {
let str = `第${count}接收到的数据为:${msg}`;
count++;
fs.writeFileSync(`${process.cwd()}/test01.txt`, str, {
flag: 'a',
})
console.log(str)
})
} catch (error) {
console.log(14, error)
throw error;
}
})()

经实际测试,没有发现消费问题。如有发现问题,请多多指教,谢谢。。。  

 

最新文章

  1. jenkins+git+maven搭建自动化部署项目环境
  2. Spring学习4-面向切面(AOP)之aspectj注解方式
  3. RabbitMQ学习总结 第五篇:路由Routing
  4. 无法完成安装:&#39;unsupported configuration: hda-duplex not supported in this QEMU binary&#39;
  5. android开发无障碍app
  6. passport.js
  7. 【OpenCV】基于kmeans的细胞检测方法
  8. .NET 4.0 兼容 .NET 2.0 的方法
  9. CDZSC_2015寒假新人(1)——基础 d
  10. Ubuntu 12.04 中自定义DNS服务器设置
  11. Codevs2018 反病毒软件
  12. [leetcode-572-Subtree of Another Tree]
  13. python简单路由系统
  14. CentOS7 yum方式安装MySQL5.7
  15. Python之路(第三十三篇) 网络编程:socketserver深度解析
  16. 百度前端代码规范:CSS
  17. linux装tomcat遇到的坑
  18. prometheus远程连接m3db实现存储
  19. quartz储存方式之JDBC JobStoreTX
  20. list的迭代器能解决并发问题,collection 的迭代器不能解决并发问题,for可以解决并发问题

热门文章

  1. docker 搭建minio
  2. Java基础学习:12、代码块
  3. 微信开发者工具代码管理设置使用SSH Key(使用密码)方法
  4. 看到项目中的DateTimeFormat和JsonFormat就头大
  5. VMware vSphere Client(4.1/5.0/5.1/5.5/6.0) 客户端下载地址
  6. Java8函数式编程(A)
  7. ICPC2020 沈阳
  8. win7安装AutoCAD2019
  9. docker镜像原理(一)
  10. db2存储过程很慢如何查看