Consumer

consumer pull message

订阅

在Consumer启动之前先将自己放到一个本地的集合中,再以后获取消费者的时候会用到,同时会将自己订阅的信息告诉broker

接收消息

consumer启动的时候会启动两个service:

RebalanceService:主要实现consumer的负载均衡,但是并不会直接发送获取消息的请求,而是构造request之后放到PullMessageService中,等待PullMessageService的线程取出执行

PullMessageService:主要负责从broker获取message,包含一个需要获取消息的请求队列(是阻塞的),并不断依次从队列中取出请求向broker send Request

执行时序图太大,截屏截不全,所以放在git上,在这里:RocketMQ.asta

从Broker pullMessage

在PullMessageService中通过netty发送pull消息的请求之后,Broker的remoteServer会收到request,然后在PullMessageProcessor中的processRequest处理,先会解析requestHeader,request中带了读取MessageStore的参数:

  • consumerGroup
  • topic
  • queueId
  • queueOffset
  • MaxMsgNums
  • subscriptionData(ConsumerManager中获取)

processRequest处理流程

  • 判断Broker当前是否允许接收消息
  • 找到subscriptionGroupConfig,subscriptionGroupTable,如果不存在当前的group则新增一个
  • 是否subscriptionGroupConfig.consumeEnable
  • 获取从TopicConfigManager.topicConfigTable获取topicConfig
  • topicConfig是否有读权限
  • 校验queueId是否在范围内
  • ConsumerManager.getConsumerGroupInfo获取ConsumerGroupInfo
  • consumerGroupInfo.findSubscriptionData查找subscriptionData
  • MessageStore.getMessage读取消息,从文件中读取消息,属于pullMessage的核心方法
  • 判断brokerRole,master和slave工作模式的哪一种
  • 判断MessageResult.status
  • 如果responseCode是SUSCESS,判断是使用heap还是非heap方式传输数据
  • 使用netty序列化response返回netty客户端

负载均衡

// AllocateMessageQueueAveragely
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
} List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", //
consumerGroup, //
currentCID,//
cidAll);
return result;
} int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
} // RebalanceImpl
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
boolean changed = false; // 去掉topic对应的无用MessageQueue(不包含在processQueueTable或者pullExpired)
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue(); if (mq.getTopic().equals(topic)) {
if (!mqSet.contains(mq)) {
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) {
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
} List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
// 如果MessageQueue不在processQueueTable
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
} this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
// 如果message还有数据需要读
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
} // 将pullRequest放在pullRequestQueue中等待去取数据
this.dispatchPullRequest(pullRequestList); return changed;
}

issue

多个consumer读取同一个consumerQueue的时候怎么记录每个读取的进度

每个consumer只要记住自己读取哪一个队列,以及offset就可以了

pull消息过程中的关键类

DefaultMQPushConsumerImpl

供DefaultMQPushConsumer调用,作为consumer的默认实现:

AllocateMessageQueueAveragely

这个类主要是consumer消费的负载均衡算法

public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
} List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", //
consumerGroup, //
currentCID,//
cidAll);
return result;
} // 基本原则,每个队列只能被一个consumer消费
// 当messageQueue个数小于等于consume的时候,排在前面(在list中的顺序)的consumer消费一个queue,index大于messageQueue之后的consumer消费不到queue,也就是为0
// 当messageQueue个数大于consumer的时候,分两种情况
// 当有余数(mod > 0)并且index < mod的时候,当前comsumer可以消费的队列个数是 mqAll.size() / cidAll.size() + 1
// 可以整除或者index 大于余数的时候,队列数为:mqAll.size() / cidAll.size()
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}

最新文章

  1. [转]C语言SOCKET编程指南
  2. 用Eclipse搭建ssh框架
  3. Coursera系列-R Programming第二周
  4. python3+任务计划实现的人人影视网站自动签到
  5. 移动平台对 meta 标签的定义
  6. jQuery Select操作大集合
  7. AE与AO的区别
  8. C#学习笔记(十六):Attribute
  9. 九度OJ 1402 特殊的数 -- 位操作
  10. hdu 4278 Faulty Odometer
  11. WPF自定义控件与样式(15)-终结篇
  12. CodeForces 448
  13. PLSQL Developer下报错信息显示乱码问题
  14. 删除正在登录的SQL账号
  15. 缩进(Python很将就格式)
  16. 基于RTKLIB构建高并发通信测试工具
  17. 使用Java实现二叉树的添加,删除,获取以及遍历
  18. RDLC报表系列--------初级报表
  19. js中的类
  20. fffmgg

热门文章

  1. promise 链式
  2. https多网站1个IP多个SSL证书的Apache设置办法
  3. C#反编译笔记
  4. mysql官方的测试数据库employees超30万的数据,安装方法介绍
  5. 阿里云Centos+Django+Nginx+uWSGI
  6. HDU5813 Elegant Construction
  7. Vue route部分简单高级用法
  8. 设置MessageBox自动关闭
  9. Android-Could not download kotlin-reflect.jar
  10. 【百度杯】ctf夺旗大战,16万元奖励池等你拿