使用 pykafka 进行消费
2024-08-27 09:16:08
kafka连接脚本
环境:python3,用到的模块有 pykafka,kazoo
# coding=utf-8 import pykafka class KafkaReaderThread(object):
def __init__(self, hosts, broker_version, topic, consumer_group):
self.hosts = hosts
self.broker_version = broker_version
self.topic = topic
self.consumer_group = consumer_group
self.client = self.new_client() def new_client(self):
print "start connect..."
try:
new_client = pykafka.KafkaClient(
hosts=self.hosts,
# zookeeper_hosts=self.proc_setting['setting']['zk_server'],
broker_version=self.broker_version
)
print "connected"
return new_client
except Exception as e:
print("error: {}".format(e))
return def fetchmany(self):
client = self.client
if client:
consumer = None
try:
topic = client.topics[self.topic] consumer = topic.get_balanced_consumer(
consumer_group=self.consumer_group,
managed=True,
auto_start=False,
# auto_commit_enable=True,
# auto_commit_interval_ms=1
# reset_offset_on_start=False,
# auto_offset_reset=pykafka.common.OffsetType.LATEST,
)
# print topic.partitions
consumer.start()
# _offset = consumer.held_offsets()
for message in consumer:
# test modle print(message.value, consumer.held_offsets) consumer.commit_offsets()
# consumer.stop()
# continue except Exception as e:
print("error: {}".format(e))
try:
consumer.stop()
except Exception as e:
pass if __name__ == '__main__':
hosts = "172.16.1.249:9092"
broker_version = '0.9.0'
topic = "BAYONET_VEHICLEALARM"
consumer_group = "consumer_group_police_seemmo" kafka = KafkaReaderThread(hosts, broker_version, topic, consumer_group)
if kafka.client:
kafka.fetchmany()
最新文章
- POJ1961[KMP 失配函数]
- DOSBOX 自动挂载技巧
- mysql的数据转换
- gruntJs篇之connect+watch自动刷新
- CCActionManager
- 工作之余,花2个月时间系统学习前端和PHP
- 将类型(int,string,…)转换为 T 类型
- codeforces 712A A. Memory and Crow(水题)
- ytu 1041: 迭代法求平方根(水题)
- 【py网页】urllib模块,urlopen
- HDU5052 Yaoge’s maximum profit(LCT)
- android启动页优化实践
- Technical diagrams for SharePoint 2013
- Github上图文演示仓库创建
- Swift—Cocoa Touch设计模式-备
- 快速配置SSH证书登录
- android activity传递实体类对象
- Android 网络框架 Retrofit2
- bzoj4361 isn(树状数组优化dp+容斥)
- *浅解嵌入式中的BootLoader
热门文章
- java泛型--问号?和T或E或K或V的区别
- [原][资料整理][osg]osgDB文件读取插件,工作机制,支持格式,自定义插件
- shell编程系列13--文本处理三剑客之sed利用sed追加文件内容
- VAE论文学习
- 泡泡一分钟:Perception-aware Receding Horizon Navigation for MAVs
- Qt编写气体安全管理系统8-曲线监控
- matlab基本函数find
- 基于Java API for WebSocket (JSR-356)的标准websocket客户端
- idea导入eclipse包乱码问题全局解决方案:
- [Xamarin] - ";GenerateJavaStubs"; 异常之解决