kafka连接脚本

环境:python3,用到的模块有 pykafka,kazoo

# coding=utf-8

import pykafka

class KafkaReaderThread(object):
def __init__(self, hosts, broker_version, topic, consumer_group):
self.hosts = hosts
self.broker_version = broker_version
self.topic = topic
self.consumer_group = consumer_group
self.client = self.new_client() def new_client(self):
print "start connect..."
try:
new_client = pykafka.KafkaClient(
hosts=self.hosts,
# zookeeper_hosts=self.proc_setting['setting']['zk_server'],
broker_version=self.broker_version
)
print "connected"
return new_client
except Exception as e:
print("error: {}".format(e))
return def fetchmany(self):
client = self.client
if client:
consumer = None
try:
topic = client.topics[self.topic] consumer = topic.get_balanced_consumer(
consumer_group=self.consumer_group,
managed=True,
auto_start=False,
# auto_commit_enable=True,
# auto_commit_interval_ms=1
# reset_offset_on_start=False,
# auto_offset_reset=pykafka.common.OffsetType.LATEST,
)
# print topic.partitions
consumer.start()
# _offset = consumer.held_offsets()
for message in consumer:
# test modle print(message.value, consumer.held_offsets) consumer.commit_offsets()
# consumer.stop()
# continue except Exception as e:
print("error: {}".format(e))
try:
consumer.stop()
except Exception as e:
pass if __name__ == '__main__':
hosts = "172.16.1.249:9092"
broker_version = '0.9.0'
topic = "BAYONET_VEHICLEALARM"
consumer_group = "consumer_group_police_seemmo" kafka = KafkaReaderThread(hosts, broker_version, topic, consumer_group)
if kafka.client:
kafka.fetchmany()

最新文章

  1. POJ1961[KMP 失配函数]
  2. DOSBOX 自动挂载技巧
  3. mysql的数据转换
  4. gruntJs篇之connect+watch自动刷新
  5. CCActionManager
  6. 工作之余,花2个月时间系统学习前端和PHP
  7. 将类型(int,string,…)转换为 T 类型
  8. codeforces 712A A. Memory and Crow(水题)
  9. ytu 1041: 迭代法求平方根(水题)
  10. 【py网页】urllib模块,urlopen
  11. HDU5052 Yaoge’s maximum profit(LCT)
  12. android启动页优化实践
  13. Technical diagrams for SharePoint 2013
  14. Github上图文演示仓库创建
  15. Swift—Cocoa Touch设计模式-备
  16. 快速配置SSH证书登录
  17. android activity传递实体类对象
  18. Android 网络框架 Retrofit2
  19. bzoj4361 isn(树状数组优化dp+容斥)
  20. *浅解嵌入式中的BootLoader

热门文章

  1. java泛型--问号?和T或E或K或V的区别
  2. [原][资料整理][osg]osgDB文件读取插件,工作机制,支持格式,自定义插件
  3. shell编程系列13--文本处理三剑客之sed利用sed追加文件内容
  4. VAE论文学习
  5. 泡泡一分钟:Perception-aware Receding Horizon Navigation for MAVs
  6. Qt编写气体安全管理系统8-曲线监控
  7. matlab基本函数find
  8. 基于Java API for WebSocket (JSR-356)的标准websocket客户端
  9. idea导入eclipse包乱码问题全局解决方案:
  10. [Xamarin] - "GenerateJavaStubs" 异常之解决