1、安装kafka-python

执行命令

pip install kafka-python

kafka-python        1.4.6

2、编写python kafka 生产者消费者代码

# test.py

import sys
import time
import json from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError KAFAKA_HOST = "127.0.0.1"
KAFAKA_PORT = 9092
KAFAKA_TOPIC = "test123" class Kafka_producer():
'''''
生产模块:根据不同的key,区分消息
''' def __init__(self, kafkahost,kafkaport, kafkatopic, key):
self.kafkaHost = kafkahost
self.kafkaPort = kafkaport
self.kafkatopic = kafkatopic
self.key = key
print("producer:h,p,t,k",kafkahost,kafkaport,kafkatopic,key)
bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
kafka_host=self.kafkaHost,
kafka_port=self.kafkaPort
)
print("boot svr:",bootstrap_servers)
self.producer = KafkaProducer(bootstrap_servers = bootstrap_servers
) def sendjsondata(self, params):
try:
parmas_message = json.dumps(params,ensure_ascii=False)
producer = self.producer
print(parmas_message)
v = parmas_message.encode('utf-8')
k = key.encode('utf-8')
print("send msg:(k,v)",k,v)
producer.send(self.kafkatopic, key=k, value= v)
producer.flush()
except KafkaError as e:
print (e) class Kafka_consumer():
'''''
消费模块: 通过不同groupid消费topic里面的消息
''' def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
self.kafkaHost = kafkahost
self.kafkaPort = kafkaport
self.kafkatopic = kafkatopic
self.groupid = groupid
self.key = key
self.consumer = KafkaConsumer(self.kafkatopic, group_id = self.groupid,
bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
kafka_host=self.kafkaHost,
kafka_port=self.kafkaPort )
) def consume_data(self):
try:
for message in self.consumer:
yield message
print("")
print(message)
except KeyboardInterrupt as e:
print (e) def main(xtype, group, key):
'''''
测试consumer和producer
'''
if xtype == "p":
# 生产模块
producer = Kafka_producer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, key)
print ("===========> producer:", producer)
for _id in range(100):
# params = '{"msg" : "%s"}' % str(_id)
params=[{"msg0" :_id},{"msg1" :_id}]
producer.sendjsondata(params)
time.sleep(1) if xtype == 'c':
# 消费模块
consumer = Kafka_consumer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, group)
print ("===========> consumer:", consumer)
message = consumer.consume_data()
print('')
print(message)
for msg in message:
print ('msg---------------->k,v', msg.key,msg.value)
print ('offset---------------->', msg.offset) if __name__ == '__main__':
xtype = sys.argv[1]
group = sys.argv[2]
key = sys.argv[3]
main(xtype, group, key)

3、启动kafka服务

打开终端 输入命令:

kafka-server-start /usr/local/etc/kafka/server.properties

4、新开一终端创建生产者

切换到程序路径执行如下指令

python test.py p g k

5、新开一终端创建消费者

切换到程序路径执行如下指令

python test.py c g k

至此已经完成kafka 的消息收发了。

最新文章

  1. Visual Studio 生成事件命令
  2. sql语句修改列
  3. html5 弹框 可用于安卓手机弹出输入框
  4. javascript的执行和预解析
  5. vue学习笔记之v-if
  6. FreeMarker 一二事 - 静态模板结合spring展示
  7. poj 1986
  8. OC-KVO简介
  9. mysql数据库优化日志(更)-howyue
  10. Delphi XE7中新并行库
  11. Java笔记:Java集合概述和Set集合
  12. 标准模式 怪异模式 盒模型 doctype
  13. 直播-rtmp学习
  14. 海量数据挖掘MMDS week4: 推荐系统Recommendation System
  15. jdk12+tomcat9 配置
  16. 【Go】深入剖析slice和array
  17. iFrame跨域解决办法
  18. 单例&多线程
  19. [Command] lrzsz - 文件传输工具包
  20. Tomcat配置JNDI数据源的三种方式-转-http://blog.51cto.com/xficc/1564691

热门文章

  1. mysql使用命令
  2. 提升ML.NET模型的准确性
  3. 2. java 运算符
  4. word最近文档清除
  5. PageHelper分页(十)
  6. C#获取CPU和内存使用率
  7. mq代替db
  8. Mondb
  9. Eclipse左侧的工程目录消失解决办法
  10. Codeforces Round #596 (Div. 2, based on Technocup 2020 Elimination Round 2) A. Forgetting Things 水题