from kafka import KafkaConsumer,TopicPartition
import json
scrapy_kafka_hosts = ["ip:端口", "ip:端口"]
topic = 'wangliang_test'
consumer = KafkaConsumer(bootstrap_servers=scrapy_kafka_hosts,
group_id='', # 消费组
value_deserializer=lambda m: json.loads(m.decode('ascii')), # 消费json 格式的消息
auto_offset_reset='latest', # latest 最新的偏移量,默认最新的 # earliest 最早的偏远量,在还一个组时候才可以使用最早的
enable_auto_commit=True, # 是否开启自动提交 默认开启
auto_commit_interval_ms = 6000 # 提交偏移量的时间间隔,默认5000ms = 5 秒
)
consumer.subscribe([topic]) # 消息的主题,可以指定多个
print(consumer.partitions_for_topic(topic))
# print(consumer.topics()) #获取主题列表
# print(consumer.subscription()) #获取当前消费者订阅的主题
# print(consumer.assignment()) #获取当前消费者topic、分区信息
# print(consumer.beginning_offsets(consumer.assignment())) #获取当前消费者可消费的偏移量 consumer.assign([
TopicPartition(topic=topic, partition=0),
TopicPartition(topic=topic, partition=1),
TopicPartition(topic=topic, partition=3)
]) # 该命令与 subscribe 操作只能存在一个 表示指定主题和分区 获取 consumer 消息 # print(consumer.assignment()) #获取当前消费者topic、分区信息
consumer.seek(TopicPartition(topic=topic, partition=0), 1) # 指定起始offset为12 1表示offset 表示offset 开始连接
# consumer.seek(TopicPartition(topic=topic, partition=1), 0) # 可以注册多个分区,此分区从第一条消息开始接收
consumer.seek(TopicPartition(topic=topic, partition=3), 0) # 没有注册的分区上的消息不会被消费 for msg in consumer:
print(msg)
recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
print(recv)
print(msg.value["name"],msg.value["age"])

可以指定分区和消费者消费组和管道时间控制,消费数据。

from kafka import KafkaProducer, KafkaConsumer, TopicPartition
import time class ConsumerForKFK(object):
_MESSAGE_NAME = 'wangliang_test' def __init__(self, kafkahost, client_id):
self.kafkaHost = kafkahost
self.group_id = client_id @property
def consumer_client(self, group_id=None):
return KafkaConsumer(
bootstrap_servers=self.kafkaHost,
# client_id = self.client_id,
group_id=self.group_id,
auto_offset_reset="latest"
# 若不指定 consumer_timeout_ms,默认一直循环等待接收,若指定,则超时返回,不再等待
# consumer_timeout_ms : 毫秒数
# consumer_timeout_ms=5000
) def consumer_seek(self, partition=None, partition_all=None, offset_time=None):
partition_number = range(partition_all) if partition_all else partition
consumer = self.consumer_client
Partition_list = []
for i in partition_number:
Partition_list.append(TopicPartition(topic=self._MESSAGE_NAME, partition=i))
consumer.assign(Partition_list)
print(consumer.assignment()) # 获取当前消费者topic、分区信息
if offset_time:
for Partition in Partition_list:
consumer.seek(Partition, 0)
# 发送到指定的消息主题(异步,不阻塞)
for msg in consumer: # 迭代器,等待下一条消息 print(int(time.time()) - int(msg.timestamp / 1000))
if int(time.time()) - int(msg.timestamp / 1000) <= offset_time: # 打印消息
print(msg)
else:
for msg in consumer: # 迭代器,等待下一条消息
print(msg) scrapy_kafka_hosts = ["ip:端口", "ip:端口"]
topic = 'wangliang_test'
cl = ConsumerForKFK(scrapy_kafka_hosts, "")
cl.consumer_seek(
partition=[1, 2],
# partition_all=3,
offset_time=3000
)

消费者学习   https://www.jianshu.com/p/c89997867d48

Python往kafka生产消费数据  https://www.cnblogs.com/longsongpong/p/11010195.html

python操作kafka实践 https://www.cnblogs.com/small-office/p/9399907.html

最新文章

  1. Delphi: 有关Form处理 :需要调用的时候进行调用。
  2. 作业六—图书管理系统(SPEC)系统性能评估测试
  3. Report_报表中Ref Cursor数据源的概念和用法(案例)
  4. c++ 设计模式6 (Decorator 装饰模式)
  5. WebService简单介绍
  6. UI基础视图----UILabel总结
  7. 使用DataReader读取数据
  8. app兼容性测试的几种方案
  9. 获得正在编辑行的数据 esayui datagrid
  10. 从项目经理的角度看.net的MVC中Razor语法真的很垃圾.
  11. Linux内核中的算法和数据结构
  12. LuoGu P2002 消息扩散
  13. 树上差分——点差分裸题 P3128 [USACO15DEC]最大流Max Flow
  14. CM记录-配置Hive on Spark
  15. 【Other】希腊诸神大全-中英文名称
  16. talib 中文文档(三):talib 方法大全
  17. jQuery操作table tr td
  18. windows下openresty中使用lua做接口转发、二次封装等
  19. 配置进程外Session
  20. 认识多渲染目标(Multiple Render Targets)技术【转】

热门文章

  1. github pages + hexo 搭建 blog 遇到的问题
  2. redis安装-备份-恢复
  3. Reactor系列(二)Flux Mono创建
  4. 飞腾1500A 上面银河麒麟操作系统 进行远程以及添加用户的方法 linux xrdp
  5. 使用pycharm开发web——django2.1.5(一)入坑尝试第一步,基本搭建
  6. fiddler笔记:快捷工具栏
  7. RabbitMQ安装&amp;简单使用
  8. LASSO回归与L1正则化 西瓜书
  9. hdu 4826 三维dp
  10. 初学java4 编译器优化