一、基本思路

异步发送的基本思路就是:send的时候,KafkaProducer把消息放到本地的消息队列RecordAccumulator,然后一个后台线程Sender不断循环,把消息发给Kafka集群。

要实现这个,还得有一个前提条件:就是KafkaProducer/Sender都需要获取集群的配置信息Metadata。所谓Metadata,也就是在上一篇所讲的,Topic/Partion与broker的映射关系:每一个Topic的每一个Partion,得知道其对应的broker列表是什么,其中leader是谁,follower是谁。

二、2个数据流

所以在上图中,有2个数据流:

Metadata流(A1,A2,A3):Sender从集群获取信息,然后更新Metadata; KafkaProducer先读取Metadata,然后把消息放入队列。

消息流(B1, B2, B3)

从上图可以看出,Metadata是多个producer线程读,一个sender线程更新,因此它必须是线程安全的

三、Metadata的线程安全性

从下面代码也可以看出,它的所有public方法都是synchronized:

 public final class Metadata {
。。。
public synchronized Cluster fetch() {
return this.cluster;
}
public synchronized long timeToNextUpdate(long nowMs) {
。。。
}
public synchronized int requestUpdate() {
。。。
}
。。。

四、Metadata的数据结构

 public final class Metadata {
...
private final long refreshBackoffMs; //更新失败的情况下,下1次更新的补偿时间(这个变量在代码中意义不是太大)
private final long metadataExpireMs; //关键值:每隔多久,更新一次。缺省是600*1000,也就是10分种
private int version; //每更新成功1次,version递增1。这个变量主要用于在while循环,wait的时候,作为循环判断条件
private long lastRefreshMs; //上一次更新时间(也包含更新失败的情况)
private long lastSuccessfulRefreshMs; //上一次成功更新的时间(如果每次都成功的话,则2者相等。否则,lastSuccessulRefreshMs < lastRefreshMs)
private Cluster cluster; //集群配置信息
private boolean needUpdate; //是否强制刷新

...
}

最新文章

  1. GroupBy(..)的四种声明方式的理解及调用
  2. hibernate学习笔记之一 hibernate简介
  3. [转]sysctl -P 报错解决办法
  4. K-means聚类的Python实现
  5. ARM与FPGA通过spi通信设计1.spi基础知识
  6. 轻松搭建Xposed Hook
  7. LOJ#6282. 数列分块入门 6
  8. [LeetCode] 100. Same Tree_Easy tag: DFS
  9. java 静态代理和动态代理
  10. VS2017自带VS2015编译器等在命令行下无法使用问题
  11. elasticsearch 6.0java api的使用
  12. storm的优化以及雪崩问题
  13. 检索COM类工厂的组件失败:80040111
  14. 基于Oracle的EntityFramework的WEBAPI2的实现(二)——使用DbFirst
  15. BZOJ2337:[HNOI2011]XOR和路径(高斯消元)
  16. &lt;input&gt;type类型
  17. iOS-Pods里三方文件导入找不到头文件
  18. 转!java操作redis
  19. 对 tensorflow 中 tf.nn.embedding_lookup 函数的解释
  20. AES加密 AESCrypt 类

热门文章

  1. 一次 .NET Core 中玩锁的经历:ManualResetEventSlim, Semaphore 与 SemaphoreSlim
  2. Vmware 占用宿主机硬盘空间只增不减
  3. IOS开发之KVC KVO KVB
  4. C#热敏打印图片 串口打印图片
  5. WPF ScrollViewer(滚动条) 自定义样式表制作 (改良+美化)
  6. Expression Blend学习5控件
  7. xadmin下修改左道航的显示不是中文字修改方法
  8. 照片美妆---基于Haar特征的Adaboost级联人脸检测分类器
  9. ztree的树形结构不能正常显示原因
  10. 在DELPHI中*.wav 文件怎么加到资源文件中