高可用

多副本机制: 主副本和从副本,从副本只负责同步主副本数据,只有主副本进行读写。

高并发

网络结构设计

多路复用

多selector -> 多线程-> 多队列

高性能

  • 把数据先写入os cache
  • 然后顺序写入磁盘

  • 根据稀疏索引快速定位到要消费消息
  • 零拷贝机制,减少上下文切换和cpu拷贝

如何提高吞吐量

  • 设置缓存区数据量
  • 开启压缩
  • 设置合适批大小batch.size, 太小网络请求频繁,太大导致发送消息慢

重试机制带来问题

  • 消息会重复: 幂等支持
  • 消息乱序: max.in.flight.requests.per.connection=1 producer 同一时间只能发送一条消息,默认重试间隔: retry.backoff.ms=100

偏移量管理

每个consumer内存里数据结构保存对每个topic的每个分区的消费offset,定期会提交offset,老kafak写入zookeeper(废弃)。

提交offset发送给kafka内部topic:__consumer_offsets,提交过去的时候, key是group.id+topic+分区号,value就是当前offset的值,每隔一段时间,kafka内部会对这个topic进行compact(合并),也就是每个group.id+topic+分区号就保留最新数据

消费异常感知

  • heartbeat.interval.ms:consumer心跳时间间隔,必须得与coordinator保持心跳才能知道consumer是否故障了, 然后如果故障之后,就会通过心跳下发rebalance的指令给其他的consumer通知他们进行rebalance的操作
  • session.timeout.mskafka多长时间感知不到一个consumer就认为他故障了,默认是10
  • max.poll.interval.ms:如果在两次poll操作之间,超过了这个时间,那么就会认为这个consume处理能力太弱了,会被踢出消费组,分区分配给别人去消费,一般来说结合业务处理的性能来设置就可以了。

消费者是如何实现rebalance的?

根据coordinator实现

  • 什么是coordinator 每个consumer group都会选择一个broker作为自己的coordinator,他是负责监控这个消费组里的各个消费者的心跳,以及判断是否宕机,然后开启rebalance的
  • 如何选择coordinator机器 首先对groupId进行hash(数字),接着对__consumer_offsets的分区数量取模,默认是50,_consumer_offsets的分区数可以通过offsets.topic.num.partitions来设置,找到分区以后,这个分区所在的broker机器就是coordinator机器。比如说:groupId,“myconsumer_group” -> hash值(数字)-> 对50取模 -> 8 __consumer_offsets 这个主题的8号分区在哪台broker上面,那一台就是coordinator 就知道这个consumer group下的所有的消费者提交offset的时候是往哪个分区去提交offset,

  • (1)每个consumer都发送JoinGroup请求到Coordinator,然后Coordinator从一个consumer group中选择一个consumer作为leader(第一个),Coordinator把consumer group情况发送给这个leader,leader定制消费方案,通过SyncGroup发给Coordinator,接着Coordinator就把消费方案下发给各个consumer,他们会从指定的分区的 leader broker开始进行socket连接以及消费消息。

谈谈Kafka客户端如何巧妙解决JVM GC问题?

1. Kafka 客户端缓冲机制

kafak Produer 流程

1)进行 Producer 初始化,加载配置参数,开启网络线程。

2)执行拦截器逻辑,预处理消息, 封装 Producer Record。

3)调用 Serializer.serialize() 方法进行消息的 key/value 序列化。

4)调用 partition() 选择合适的分区策略,给消息体 Producer Record 分配要发送的 Topic 分区号。

5)从 Kafka Broker 集群获取集群元数据 metadata。

6)将消息缓存到 RecordAccumulator 收集器中, 最后判断是否要发送。这个加入消息收集器,首先得从 Deque<RecordBatch> 里找到自己的目标分区,如果没有就新建一个 Batch 消息 Deque 加进入。

7)当达到发送阈值,唤醒 Sender 线程,实例化 NetWorkClient 将 batch record 转换成 request client 的发送消息体, 并将待发送的数据按 【Broker Id <=> List】的数据进行归类。

8)与服务端不同的 Broker 建立网络连接,将对应 Broker 待发送的消息 List 发送出去。

9)批次发送的条件为: 缓冲区数据大小达到 batch.size 或者 linger.ms 达到上限,哪个先达到就算哪个。

Kafka 实现的缓冲机制 ,减少垃圾回收,降低STW

在 Kafka 客户端内部,针对这个问题实现了一个非常优秀的机制,就是「缓冲池机制」。即每个 Batch 底层都对应一块内存空间,这个内存空间就是专门用来存放写进去的消息。

当一个 Batch 数据被发送到了 kafka 服务端,这个 Batch 的内存空间不再使用了。此时这个 Batch 底层的内存空间先不交给 JVM 去垃圾回收,而是把这块内存空间给放入一个缓冲池里。

这个缓冲池里存放了很多块内存空间,下次如果有一个新的 Batch 数据了,那么直接从缓冲池获取一块内存空间是不是就可以了?然后如果一个 Batch 数据发送出去了之后,再把内存空间还回来是不是就可以了?以此类推,循环往复。
 

kafka开启精确发送一次

通过引入「PID及Sequence Number」支持幂等性,保证精确一次「exactly once」语义。

其中启用幂等传递的方法配置:enable.idempotence = true。启用事务支持的方法配置:设置属性 transcational.id = “指定值”。

谈谈你对Kafka控制器及选举机制是如何理解

所谓的控制器「Controller」就是通过 ZooKeeper 来管理和协调整个 Kafka 集群的组件。集群中任意一台 Broker 都可以充当控制器的角色,但是在正常运行过程中,只能有一个 Broker 成为控制器。

控制器的职责主要包括:

1)集群元信息管理及更新同步 (Topic路由信息等)。

2)主题管理(创建、删除、增加分区等)。

3)分区重新分配。

4)副本故障转移、 Leader 选举、ISR 变更。

5)集群成员管理(通过 watch 机制自动检测新增 Broker、Broker 主动关闭、Broker 宕机等)。

在2.x中 zookeeper作用: 帮助kafka选择controller ,通知controller节点关闭或者加入

Kafka 3.X 版本中,内部实现一个类似于 Raft 的共识算法来选举 Controller

HW 和LEO 理解

HW 作用:

1)用来标识分区下的哪些消息是可以被消费者消费的。

2)协助 Kafka 完成副本数据同步。

LEO 作用:

1)如果 Follower 和 Leader 的 LEO 数据同步了, 那么 HW 就可以更新了。

2)HW 之前的消息数据对消费者是可见的,属于 commited 状态, HW 之后的消息数据对消费者是不可见的。

谈谈 Kafka 消息分配策略都有哪些?

  • RangeAssignor 是 Kafka 默认的分区分配算法,它是按照 Topic 的维度进行分配的,首先对 每个Topic 的 Partition 按照分区ID进行排序,然后对订阅该 Topic 的 Consumer Group 的 Consumer 按名称字典进行排序,之后尽量均衡的按照范围区段将分区分配给 Consumer。此时也可能会造成先分配分区的 Consumer 任务过重(分区数无法被消费者数量整除)

  • RoundRobinAssignor:

  • 该分区分配策略是将 Consumer Group 订阅的所有 Topic 的 Partition 及所有 Consumer 按照字典进行排序后尽量均衡的挨个进行分配。如果 Consumer Group 内,每个 Consumer 订阅都订阅了相同的Topic,那么分配结果是均衡的。如果订阅 Topic 是不同的,那么分配结果是不保证「 尽量均衡」的,因为某些 Consumer 可能不参与一些 Topic 的分配

  • StickyAssignor

    该分区分配算法是最复杂的一种,可以通过 partition.assignment.strategy 参数去设置,从 0.11 版本开始引入,目的就是在执行新分配时,尽量在上一次分配结果上少做调整,其主要实现了以下2个目标:

    1、Topic Partition 的分配要尽量均衡。

    2、当 Rebalance 发生时,尽量与上一次分配结果保持一致。

Rebalance 触发后如何通知其他 Consumer 进程?

1
2
rebalance 的通知机制就是靠 Consumer 端的心跳线程,它会定期发送心跳请求到 Broker 端的 Coordinator 协调者组件,当协调者决定开启 Rebalance 后,它会将「REBALANCE_IN_PROGRESS」封装进心跳请求的响应中发送给 Consumer ,当 Consumer 发现心跳响应中包含了「REBALANCE_IN_PROGRESS」,就知道是 Rebalance 开始了。

 

谈谈Kafka线上大量消息积压你是如何处理的?

事前:

  • 避免大消息发送
  • 分区数和消费组数尽量相等
  • 优化消费端逻辑,避免重平衡

最新文章

  1. javascript 值类型与引用类型
  2. POJ 2828 Buy Tickets(线段树 树状数组/单点更新)
  3. Java程序员开发参考资源
  4. android 完美退出所有Activity的demo
  5. 到目前为止,Linux下最完整的Samba服务器配置攻略 (转)
  6. 【风马一族_php】NO0_搭建web服务器
  7. MES系统的有用存储过程
  8. 将Uploads文件夹移到其它地方
  9. php5.4安装ecshopphp5.4问题及解决
  10. web前端之 CSS
  11. Ajax基础与登入
  12. Python百题计划
  13. D. Flood Fill 区间DP 或lcs匹配
  14. netty的对象传输
  15. html5應用緩存
  16. IntellJ IDEA2017 springboot2.0.2中读取配置
  17. 图像质量评估(IQA)
  18. cmd命令行安装,删除Windows证书(certgmr的简单使用)
  19. python标准库 - socket
  20. RPM包及其管理 rpm命令

热门文章

  1. dispatch_source_t定时器
  2. Java 进阶P-8.5+P-8.6
  3. immutable.js学习笔记(六)----- OrderedSet
  4. 12月20日内容总结——ajax补充知识点、多对多外键的三种创建方式、django内置序列化组件、批量操作数据、分页器推导思路与自定义分页器的使用、form组件
  5. 四、流程控制和break、continue、range函数的讲解
  6. JSP第七次作业
  7. GaussDB(DWS)现网案例:collation报错
  8. elementui表格自定义格式实现原理???
  9. 修改Element - plus的样式
  10. Vue3中的响应式api