应用程序使用KafkaConsumer向Kafka订阅主题,并从订阅的主题上接收消息。Kafka消费者从属于消费者群组,一个群组里的消费者订阅的是同一个主题,每个消费者接收主题的一部分分区的消息。

一个分区不能被一个消费者群组里的多个消费者消费,因此如果消费者超过主题的分区数量,那么就有一部分消费者被闲置。

分区的所有权从一个消费者转移到另一个消费者,这样的行为叫做在均衡,不过在均衡期间消费者无法读取消息,造成整个群组一小段时间不可用。

消费者通过被指派为群组协调器的broker发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。

在读取消息之前,首先创建一个KafkaConsumer对象,有三个必选属性:bootstrap.servers,key.deserializer,value.deserializer,第四个属性group.id不是必须的,它指定了消费者属于哪个消费者群组。

订阅主题consumer.subscribe()方法,可以指定特定主题,或使用正则表达式。消息轮询是消费者API的核心,通过一个简单的轮询向服务器请求数据。一旦消费者订阅了主题,轮询就会处理所有细节,包括群组协调,分区再均衡,发送心跳和获取数据。

返回的每条数据都包含记录所属主题信息,记录所做分区信息,记录在分区的偏移量,以及记录键值对。

在退出之前使用consume.close()关闭消费者,网络连接和socket也会随之关闭。

我们无法让一个线程运行多个消费者,也无法让多个线程安全共享一个消费者。按照规则,一个消费者使用一个线程。

消费者的配置

1.fetch.min.bytes

该属性指定了消费者从服务器获取记录的最小字节数。

2.fetch.max.wait.ms

指定broker的等待时间,默认是500ms,这个条件和上一个条件哪一个先满足,都会触发broker向消费者发送数据。

3.max.partition.fetch.bytes

该属性指定了服务器从每个分区返回给消费者的最大字节数,默认是1MB.这个数值必须比max.message.size大。

4.session.timeout.ms

该属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是3秒,如果消费者没有在这个指定时间内发送心跳给群组协调器,就会被认为已经死亡。协调器就会触发在平衡,把它的分区分配给群组的其他消费者。这个属性与heartbeat.interval.ms紧密相关,这个属性指定了消费者可以多久不发送心跳。一般同时修改这两个属性,heartbeat.interval.ms一般是session.timeout.ms的三分之一。

5.auto.offset.reset

该属性指定了消费者在读取一个没有偏移量的分区,或偏移量无效情况下该作何处理,默认值是latest,意思是偏移量无效情况下,消费者从最新记录开始读取数据,另一个值是earliest,意思是从起始位置读取数据。

6.enable.auto.commit

该属性指定了消费者是否自动提交偏移量,默认是true,为了尽量避免重复数据和数据丢失,可以把它设为false,由自己控制何时提交偏移量。

7.partition.assignment.strategy

分区会被分配给群组的消费者,partitionAssignor根据给定消费者和主题,决定哪些分区应该被分配给哪个消费者,有两个默认分配策略:

Range:若干连续分区分配

RoundRobin:逐个分配给消费者

默认是org.apache.kafka.clients.consumer.RangeAssignor,这个类实现了Range策略,也可以改为org.apache.kafka.clients.consumer.RoundRobinAssignor

8.client.id

可以是任意字符串,broker用它标记从客户端发送过来的消息,通常被用在日志,度量指标和配额里。

9.max.poll.records

该属性用于控制单次调用call方法能够返回的记录数量

10.receive.buffer.bytes和send.buffer.bytes

socket读写数据时tcp缓冲区的大小,如果=-1,就使用操作系统默认值。

最新文章

  1. [WCF编程]13.并发:服务并发模式
  2. 数独检测器:帝国理工C++作业
  3. 使用CSS3的appearance属性改变元素的外观
  4. 浅谈new operator、operator new和placement new 分类: C/C++ 2015-05-05 00:19 41人阅读 评论(0) 收藏
  5. 配置Java环境-20160613
  6. 自己搭建Wifi Pineapple Mark V
  7. C语言每日一题之No.7
  8. Html5 常见的新增API详解
  9. JDK - Tomcat - JSP - Servlet 配置运行全攻略(转)
  10. DEDECMS中,arclist标签
  11. 服务器迁移之debian重新配置Web服务的细节
  12. 进程环境之C程序的存储空间布局
  13. Unity SendMessage方法
  14. 4月6日--js生成随机数列
  15. spring boot + easypoi两行代码excel导入导出
  16. python lxml库生成xml文件-节点命名空间问题
  17. jmeter中实现java请求实战日志
  18. ERP产品购进批量提交文件(三十六)
  19. PyQt5--ToolBar
  20. Unity 代码优化

热门文章

  1. C++闭包,一样很简单
  2. vue sourcemap详解
  3. Springmvc案例1----基于spring2.5的採用xml配置
  4. vcmi(魔法门英雄无敌3 - 开源复刻版) 源码编译
  5. 机器学习: Tensor Flow +CNN 做笑脸识别
  6. 距离北京奥运还有359天,发布WPF版本的北京2008标志(下)
  7. Fiddler教程(Web调试工具)
  8. Matlab随笔之判别分析
  9. WPF实现弹幕
  10. 阿里云访问控制(RAM)授权子账户管理磁盘快照