前言

本文主要讲述一下spring for kafka的consumer在spring.kafka.consumer.enable-auto-commit是false情况下,AckMode的选项,及手动提交分析总结。

AckMode

RECORD
每处理一条commit一次
BATCH(默认)
每次poll的时候批量提交一次,频率取决于每次poll的调用频率
TIME
每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)
COUNT
累积达到ackCount次的ack去commit
COUNT_TIME
ackTime或ackCount哪个条件先满足,就commit
MANUAL
listener负责ack,但是背后也是批量上去
MANUAL_IMMEDIATE
listner负责ack,每调用一次,就立即commit

Manual Commit

  • 消费端手动提交offset代码如下:
  /**
* 这是手动提交的消费方式
* @param record
* @param ack
* @throws Exception
*/
@KafkaListener(topics = TopicConstants.COMMON_PAY,groupId = "写自己的消费组 id")
public void listenXXXPay(ConsumerRecord<String, String> record , Acknowledgment ack) throws Exception {
String msg = JSONObject.parseObject(record.value(), String.class);
System.out.println(msg);
if (new Random().nextInt(100)<50){
logger.info(String.format("kafka 综合收费消费消息成功---------------- listen1 topic = %s, offset = %d, value = %s ", record.topic(), record.offset(), record.value()));
ack.acknowledge();
}
}

前提要配置AckMode:

instance.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
  • 接下来问题来了, 如果代码中没有进行ack.acknowledge(),会怎么办呢??

消费者在消费消息的过程中,配置参数设置为不自动提交offset,在消费完数据之后如果不手动提交offset,那么在程序中和kafak中的数据会如何被处理呢?

1. 如果在消费kafka的数据过程中,一直没有提交offset,那么在此程序运行的过程中它不会重复消费。但是如果重启之后,就会重复消费之前没有提交offset的数据。

2. 如果在消费的过程中有几条或者一批数据数据没有提交offset,后面其他的消息消费后正常提交offset,那么服务端会更新为消费后最新的offset,不会重新消费,就算重启程序也不会重新消费。

3. 消费者如果没有提交offset,程序不会阻塞或者重复消费,除非在消费到这个你不想提交offset的消息时你尝试重新初始化一个客户端消费者,即可再次消费这个未提交offset的数据。因为客户端也记录了当前消费者的offset信息,所以程序会在每次消费了数据之后,自己记录offset,而手动提交到服务端的offset与这个并没有关系,所以程序会继续往下消费。在你重新初始化客户端消费者之后,会从服务端得到最新的offset信息记录到本地。所以说如果当前的消费的消息没有提交offset,此时在你重新初始化消费者之后,可得到这条未提交消息的offset,从此位置开始消费。

最新文章

  1. python处理地理数据-geopandas和pyshp
  2. beat your own python env
  3. 【BZOJ】2741: 【FOTILE模拟赛】L
  4. linux kernel 如何处理大小端
  5. 趋势型指标——MACD
  6. eclipse新建一个Android项目,就会报错android.support.v7.app.ActionBarActivity
  7. PHP学习笔记:keditor的使用
  8. POJ 3345-Bribing FIPA(树状背包)
  9. 利用pscp命令实现linux与windows文件互传
  10. 前端的3D(css3版本)
  11. Activity的启动
  12. echarts相关的可视化数据
  13. linux 系统下apache 找不到apxs 文件
  14. (33)关于django中路由自带的admin + 建表关系的讲解
  15. PHP 错误 系列:编码格式错误解决
  16. Mysql初级第三天(wangyun)
  17. Ugly Number leetcode java
  18. [转][C#]BarCodeToHTML
  19. [Unity动画]06.子状态机
  20. ios开发之-- tableview/collectionview获取当前点击的cell

热门文章

  1. FFmpeg常用命令学习笔记(三)分解/复用命令
  2. c语言1-2019秋作业02
  3. django nginx uwsgi 502 Gateway
  4. 莫比乌斯函数介绍&amp;&amp;基础
  5. python--ctypes模块:调用C函数
  6. 诊断和修复Web测试记录器(Web Test Recorder)问题
  7. Linux下 Nginx 启动 重启 关闭
  8. [svn]指定用户名
  9. 基于CentOS 7下最小化安装的操作系统搭建Zabbix3.0环境
  10. ARTS打卡计划第十周