我们知道Kafka 的消息通过topic进行分类。topic可以被分为若干个partition来存储消息。消息以追加的方式写入partition,然后以先入先出的顺序读取。

下面是topic和partition的关系图:

我们一般会在server.conf中通过num.partitions参数指定创建topic时包含多少个partition。默认是num.partitions=1。

既然一个topic有多个partition,那么消息是怎么样分配到partition的呢?

生产者生产一个消息send到topic分区器,分区器会根据消息里面的分区参数key值把消息分到对应的partition。这里就像我们快递代发网点一样,快递代发网点可以代理很多种快递公司,如果要寄快递者P(生产者)指定用什么快递公司,代发网点人员C(分区器)就会把该物品M(消息)归类到指定的快递公司区域存放。如果P不要求具体的快递公司寄件,那么就由C随意分配快递公司(哈哈,那就要看这个家伙的心情了,心情好点给你一个顺丰比较快到达,心情不好时就GG吧)。

下面是Kafka对消息分配分区 DefaultPartitioner.java 类的核心代码:

     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = counter.getAndIncrement();
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}

第4、7行:如果没有指定key值并且可用分区个数大于0时,在就可用分区中做轮询决定改消息分配到哪个partition。

第4、10行:如果没有指定key值并且没有可用分区时,在所有分区中轮询决定改消息分配到哪个partition。

第14行:如果指定key值,对key做hash分配到指定的partition。

所以当同一个key的消息会被分配到同一个partition中。消息在同一个partition处理的顺序是FIFO,这就保证了消息的顺序性。

最新文章

  1. Dynamics AX 2012 R2 业务系列-采购业务流程
  2. Codeforces 731C:Socks(并查集)
  3. 结对开发训练(郭林林&amp;胡潇丹)
  4. Android反编译
  5. phpcms v9 下拉菜单 二级 三级子栏目调用方法
  6. ECshop中TemplateBeginEditable 和后台编辑讲解
  7. 解析CSS加密技术之“障眼法”
  8. python 使用__slots__
  9. CentOS6.5 服务器+apache5.3绑定多个域名+SELinux设置
  10. MySQL中的空间扩展
  11. POJ1201-Intervals(差动限制)
  12. PAT (Advanced Level) 1060. Are They Equal (25)
  13. tp框架基础(详细步骤分解,易懂)下
  14. iOS 将视频流(h264)和音频流封装成PS流
  15. [转载]innodb 的预读
  16. redis -hash(哈希.对象)
  17. 操作系统PV编程题目总结一
  18. CDC在sql server 2017中无法使用的问题
  19. CentOS7 下源码安装 python3
  20. (网页)JS去掉字符串前后空格或去掉所有空格的用法(转)

热门文章

  1. 算法与数据结构基础 - 折半查找(Binary Search)
  2. JavaWeb——Servlet开发3
  3. Spark 系列(九)—— Spark SQL 之 Structured API
  4. 页面性能监控之performance
  5. spark源码阅读---Utils.getCallSite
  6. 白话--长短期记忆(LSTM)的几个步骤,附代码!
  7. 谷歌hack
  8. BeautifulSoup 库简单学习使用
  9. C# ModBus 读取数据
  10. mysql datetime timestamp区别