Kafka Delivery Semantics

在Kafka Consumer中,有3种delivery semantics,分别为:至多一次(at most once)、至少一次(at least once)、以及准确一次(exactly once),下面我们分别介绍这3种Delivery 语义。

1. At Most Once

在message batch在被consumer接收后,立即commit offsets。此时若是在消息处理逻辑中出现异常,则未被处理的消息会丢失(不会再次被读取)。

此场景一个例子如下图:

此例流程如下:

  1. Consumer读一个batch的消息
  2. 在接收到消息后,Consumer commits offsets
  3. Consumer 处理数据,例如发送邮件,但是此时一个batch中的最后两条消息由于consumer异常宕机而未被正常处理
  4. Consumer 重启并重新开始读数据。但是此时由于已经committed offset,所以consumer会在最新的offset处读一个batch的消息,之前上一个batch中由于异常而未被处理的消息会丢失

所以at most once 会有丢失数据的风险,但若是应用可以承受丢失数据的风险,则可以使用此方式。

2. At Least Once

在消息被consumer接收并处理后,offsets才被 commit。若是在消息处理时发生异常,则消息会被重新消费。也就是说,会导致消息被重复处理。

At Least Once 是默认使用的语义,在这种情况下,需要保证应用是idempotent 类型(处理重复的消息不会对应用产生影响)。

此场景一个例子如下:

此示例流程如下:

  1. Consumer 读一个batch的消息
  2. 在接收到消息并正常处理
  3. 在consumer 正常处理消息完毕后,commits offset
  4. 继续读并处理下一个batch 的消息。若是在此过程中发生异常(例如consumer 重启),则consumer会从最近的 offset 开始读一个batch的消息并处理。所以此时会导致有重复消息被处理(此例中为4263、4264、4265)

3. Exactly once

此语义较难实现,在kafka中仅能在Kafka => Kafka的工作流中,通过使用Kafka Stream API 实现。对于Kafka => Sink 的工作流,请使用 idempotent consumer。

对于大部分应用程序,我们应使用at least once processing,并确保consumer端的transformation/processing 是idempotent类型。

4. 构建 idempotent consumer

一个idempotent consumer可以在处理重复消息时,不影响整个应用的逻辑。在ElasticSearch 中,通过一个_id 字段唯一识别一条消息。所以在这个场景下,为了实现idempotent consumer,我们需要对同样_id字段的消息做同样的处理。

之前给出的Elastic Search Consumer的例子中,每条消息的 _id 都是默认随机生成的,也就是说:若是处理之前重复的消息,生成的id也是一条新的随机_id,此行为不符合一个idempotent consumer。对此,我们可以自定义一个­_id 模式,修改代码如下:

// poll for new data
while(true){
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMinutes(100)); for(ConsumerRecord record : records) { // construct a kafka generic ID
String kafka_generic_id = record.topic() + "_" + record.partition() + "_" + record.offset(); // where we insert data into ElasticSearch
IndexRequest indexRequest = new IndexRequest(
"kafkademo"
).id(kafka_generic_id).source(record.value(), XContentType.JSON); IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
String id = indexResponse.getId(); logger.info(id); try {
Thread.sleep(1000); // introduce a small delay
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

打印出id结果为:

可以看到新的 id 由 kafka topic + partition + offset 这3 部分组成,可以唯一定位一个 record。所以即使重复处理一条record,它发往 ElasticSearch 的 id 也是一样的(即处理逻辑一样)。在这个场景下,即为一个imdepotent consumer。

最新文章

  1. 使用GridVIew显示Gantt(甘特图),动态增减列
  2. Protocol 编码的三种常用方式
  3. shiro 从入门到放弃
  4. WCF初探-19:WCF消息协定
  5. Software caused connection abort: socket write error
  6. LinkedHashMap介绍
  7. JQuery 表格 隔行换色 和鼠标滑过的样式
  8. Toad for Oracle 使用文档
  9. vojis1523 NOI2002 贪吃的九头龙
  10. (转)Linux整合apache和tomcat构建Web服务器
  11. PHP-微信公众平台开发-接收用户输入消息类型并响应
  12. 使用StyleCop.Analyzers进行代码审查
  13. mybatis入门介绍一
  14. FormData 上传多种格式的文件
  15. 背包问题(01背包,完全背包,多重背包(朴素算法&amp;&amp;二进制优化))
  16. vue-cli关闭eslint及配置eslint
  17. [Android] TextView上同时显示图标和文字
  18. MySQL百万级、千万级数据多表关联SQL语句调优
  19. {MySQL存储引擎介绍}一 存储引擎解释 二 MySQL存储引擎分类 三 不同存储引擎的使用
  20. python中变量的数据类型总结

热门文章

  1. Linux系统目录结构和常用目录主要存放内容的说明
  2. 【巨杉数据库SequoiaDB】巨杉数据库无人值守智能自动化测试实践
  3. BZOJ2809&amp;&amp;LG1552 APIO2012派遣(线段树合并)
  4. 根据指定id取出数组中指定对象
  5. linux 配置compoer
  6. 爬虫学习笔记2requests库和beautifulsoup4库学习笔记
  7. 请求 - axios
  8. 《NVM-Express-1_4-2019.06.10-Ratified》学习笔记(5.21.1.10-加-6.4)Atomic_Operations
  9. LED And Incandescent, Who Is Suitable For Holiday Lighting?
  10. MongoDB的安装问题