1.安装需要插件

npm install express

npm install  socket.io

npm install  kafka-node

2.kafkatest.js文件

var express = require('express');
var app = express(); var server = require('http').createServer(app);
var io = require('socket.io')(server);
var kafka = require('kafka-node');
var users = [];
app.use('/', express.static(__dirname + '/')); app.get('/send', function (req, res) {
var msg=req.query.msg;
var Producer = kafka.Producer,
client = new kafka.Client('localhost:2181'),
producer = new Producer(client);
payloads = [
{ topic: 'test', messages: msg, partition: 0 },
];
producer.on('ready', function(){
producer.send(payloads, function(err, data){
console.log(data);
// socket.emit('server_counter',data);
});
});
producer.on('error', function(err){}) res.send('输入消息='+msg);
})
server.listen(8080); setTimeout(function(){
var Consumer = kafka.Consumer;
var Offset = kafka.Offset;
var topic = 'test'; var client = new kafka.Client('localhost:2181');//'localhost:2181'
var topics = [{ topic: topic, partition: 0 }];
var options = { autoCommit: false };//, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024 * 1024 var consumer = new Consumer(client, topics, options);
var offset = new Offset(client);
consumer.on('message', function (message) {
console.log(message);
io.sockets.emit('server_counter',message);
}); consumer.on('error', function (err) {
console.log('error', err);
});
consumer.on('offsetOutOfRange', function (topic) {
topic.maxNum = 2;
offset.fetch([topic], function (err, offsets) {
if (err) {
return console.error(err);
}
var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
consumer.setOffset(topic.topic, topic.partition, min);
});
}); },1000);

3.counter.html

<!DOCTYPE html>
<html> <head>
<title>socket</title>
</head> <body> <div style="margin: 0 auto;" id='msg'> </div>
</body>
<script type="text/javascript" src="./node_modules/socket.io-client/dist/socket.io.js"></script>
<script type="text/javascript">
var socket=io.connect('localhost:8080'),//与服务器进行连接
send=document.getElementById('sendBtn'),
leave=document.getElementById('leaveBtn'); //接收来自服务端的信息事件
socket.on('server_counter',function(msg){
var div =document.createElement('div');
div.innerHTML=JSON.stringify(msg);
document.getElementById('msg').appendChild(div);
})
</script> </html>

3.效果展示

启动 kafkatest.js

打开http://localhost:8080/counter.html

打开http://localhost:8080/send?msg=谢大神你好

最新文章

  1. React-Native android在windows下的踩坑记
  2. JMS开发(一):基础理论认知
  3. MySQL索引视图
  4. C#解析JSON数据
  5. ssh伪登陆执行远程主机脚本命令 C程序基于ssh passwordless执行远程主机命令及基于配置文件的验证伪登陆执行命令
  6. Mysql ODBC 5.1 Driver免安装脚本
  7. linux 下一个 jira-6.3.6 组态 皴 翻译 迁移数据库
  8. ImageView.ScaleType设置图解
  9. 详细了解 int? 类型
  10. monkeyrunner简介
  11. GitHub下载安装以及开源项目
  12. Caffe上用SSD训练和测试自己的数据
  13. [Android] SeekBar---可拖动进度条
  14. Java继承访问权限
  15. Python读取ini配置文件封装方法
  16. node中__dirname、__filename、process.cwd()表示的路径
  17. 【PaPaPa】系统架构搭建浅析 - 人人可以搭架构
  18. [转帖] .net 2.1 是 LTS长期支持版本.
  19. 样式 bootstrap purecss Amaze UI 推荐
  20. acdream 1735 输油管道 贪心

热门文章

  1. forEach方法
  2. webstorm/vs取消eslint
  3. 2019_7_30python
  4. 读书笔记二、ndarray的数据类型
  5. JUC 一 ReentrantLock 可重入锁
  6. NX二次开发-UFUN拾取平面对话框UF_UI_specify_plane
  7. 黑科技之杜教bm
  8. 其它课程中的python---3、numpy总结(非常全)
  9. Hadoop2.X主要模块默认端口及作用
  10. Ngui之UI框架的层级处理