from kafka import KafkaConsumer
from kafka import TopicPartition
from kafka.structs import OffsetAndMetadata
...
topic = 'your_topic'
partition = 0
tp = TopicPartition(topic,partition)
kafkaConsumer = KafkaConsumer(config here...)
kafkaConsumer.assign([tp])
offset = 15394125
kafkaConsumer.commit({
tp: OffsetAndMetadata(offset, None)
}) meta = consumer.partitions_for_topic(topic)
options = {}
options[partition] = OffsetAndMetadata(message.offset, meta)
consumer.commit(options)


# a better way, remove assign partition manually, and extract partition info from kafka message
topic_partition = TopicPartition(command_params["topic"], message.partition) 
consumer.seek(topic_partition, offset_value) 
consumer.commit()

from: http://stackoverflow.com/questions/36579815/kafka-python-how-do-i-commit-a-partition

如果consumer.commit()不可以,可以使用seek(),使用seek()时,如果有多个partition,需
要为每个partition都手动进行consumer assign:

topic_partition = TopicPartition("TOPIC_TEST", 1)
# 格式为topic, partition, 1表示partition 1.
consumer.assign([topic_partition])

consumer.seek(topic_partition, 1660000)

使用最下面的方法,不再需要手动指定partition,直接从message获取partition,更加灵活。

 

最新文章

  1. IOS开发基础知识--碎片48
  2. Mini projects #5 ---- Memory
  3. John the Ripper
  4. Android之天气APP
  5. objective-C nil,Nil,NULL 和NSNull的小结
  6. 03-树1. List Leaves (25)
  7. Django将request对象传入模板配置
  8. dqname.go
  9. H3C交换机配置命令(收集)
  10. vs code解决golang开发环境问题 dial tcp 216.239.37.1:443: connectex: A connection attempt failed
  11. liunx文件操作 文件压缩
  12. 单元测试工具Junit浅谈
  13. python 内建函数专题
  14. lamp环境搭建之配置apache与fpm方式的php
  15. java12小时制的时间转换为24小时制
  16. java开发编译器:中间语言格式
  17. 基于Bind实现的DNS正反向解析及主从DNS的配置
  18. DotNetty学习笔记
  19. Bootstrap 文件上传插件 FileInput的使用问题
  20. 【leetcode 简单】第四十题 求众数

热门文章

  1. load-display-image之c#版
  2. Oracle 实例恢复
  3. 服务检测sh脚本
  4. linq to sql 项目移植后,数据库实体类需要重新创建?
  5. ubuntu 14.04 安装Eclipse与配置环境变量
  6. PHP CI框架如何去掉 sql 里的反引号
  7. vue 操作数组,原数组怎么不让它改变
  8. BASIC-5_蓝桥杯_查找整数
  9. springMVC集成CXF快速发布webService
  10. Java堆外内存之五:堆外内存管理类ByteBuffer