注意异常情况导致整个消费无限重试 阻塞消费

mq支持局部消息顺序消费,可以确保同一个消息消费队列中的消息被顺序消费。看下针对顺序消息在整个消费过程中做的调整:

队列负载:

DefaultMQPushConsumerImpl#consumeOrderly决定是否是顺序消息,

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic:

在新分配到队列时,新添加消息拉取任务之前会先检查是否是顺序消息。如果是顺序消息检查上锁是否成功。

回顾之前我们在队列负载均衡最后的总结:

负载均衡20s进行一次,思考2次负载之间,新加入的comsumer分得之前consumer的队列,在之前的consumer进行负载之前是否会出现一个队列被多个consumer消费的情况?

消息拉取:

DefaultMQPushConsumerImpl#pullMessage:

如果是顺序消息但队列未被锁定,则延迟3s后再将pullRequest对象放入到拉取任务中,如果该处理队列是第一次拉取任务,则首先计算拉取偏移量,然后向消息服务端拉取消息。

消息消费:

public class ConsumeMessageOrderlyService implements ConsumeMessageService {
private static final InternalLogger log = ClientLogger.getLog();
private final static long MAX_TIME_CONSUME_CONTINUOUSLY =
Long.parseLong(System.getProperty("rocketmq.client.maxTimeConsumeContinuously", "60000"));//每次消费任务最大持续时间,默认60秒
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;//消息消费者实现类
private final DefaultMQPushConsumer defaultMQPushConsumer;//消息消费者
private final MessageListenerOrderly messageListener;//顺序消息消费监听器
private final BlockingQueue<Runnable> consumeRequestQueue;//消息消费任务队列
private final ThreadPoolExecutor consumeExecutor;//消息消费线程池
private final String consumerGroup;//消息组名
private final MessageQueueLock messageQueueLock = new MessageQueueLock();//消息消费端消息队列锁容器
private final ScheduledExecutorService scheduledExecutorService;//调度任务线程池
private volatile boolean stopped = false;

public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
MessageListenerOrderly messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener; this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>(); this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_")); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
}

注意消息任务队列为LinkedBlockingQueue。

org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#start:

public void start() {
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
ConsumeMessageOrderlyService.this.lockMQPeriodically();
}
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
}

集群模式下,启动定时任务每20s执行一次锁定分配给自己的消息消费队列。具体实现过程:

将消息队列按照Broker组织成Map<String/*BrokerName*/,set<MessageQueue>>,方便下一步向Broker发送锁定消息队列请求。

向Broker(master主节点)发送锁定消息队列,该方法返回成功被当前消费者锁定的消息消费队列。

将成功锁定的消息消费队列相对应的处理队列设置为锁定状态,同时更新加锁时间。

遍历当前处理队列中的消息消费队列,如果当前消费者不持有该消费队列的锁,将处理队列锁状态设置为false,暂停该消息消费队列的消息拉取于消息消费。

3.提交消息消费任务

org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#submitConsumeRequest:

顺序消息的ConsumeRequest消费任务不会直接消费本次拉取的消息,而是在消息消费时,从处理队列中拉取消息:

org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.ConsumeRequest#run:

根据消息队列获取一个对象,然后消息消费时先申请独占objLock。顺序消息消费的并发度为消息队列。也就是一个消息消费队列同一时刻只会被一个消费线程池中一个线程消费。

 if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                //消息消费逻辑
  }
else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}

ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}

如果是广播模式的话,直接进入消费,无须锁定处理队列,因为相互之间无竞争;

如果是集群模式,消息消费的前提条件是processQueue被锁定且锁未超时。思考:会不会出现当消息队列重新负载时,原先由自己处理的消息队列被另外一个消费者分配,此时如果还未来得及讲ProcessQueue解除锁定,就被另外一个消费者添加进去,此时会存在多个消息消费者同时消费一个消息队列?答案是不会的,因为当一个新的消费队列分配给消费者是,在添加其拉取任务之前必须先向Broker发送对该消息队列枷锁请求,只有加锁成功后,才能添加拉取消息,否则等到下一次负载后,只有消息队列被原先占有的消费者释放后,才能开始新的拉取任务。集群模式下,如果未锁定处理队列,则延迟该队列的消息消费。

顺序消息消费处理逻辑,每一个ConsumeRequest消费任务不是以消费消息条数来计算的,而是根据消费时间,默认当消费时长大于MAX_TIME_CONSUME_CONTINUOUSLY,默认60s后,本次消费任务结束,有消费组内其他线程继续消费。

顺序消息消费时,从ProcessQueue中取出的消息,会临时存储在ProceeQueue的consumingMsgOrderlyTreeMap属性中。

执行消息消费钩子函数.

消费成功,提交,就是将该批消息从ProcessQueue中移除,维护msgCount(消息处理队列中消息条数)并获取消息消费的偏移量offset,并返回待保存的消息消费进度,

org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#processConsumeResult:

checkReconsumeTimes发送ack失败,则本地重试,否则则认为消息消费成功 直接commit。

存储消息消费进度。

MessageQueue的锁和ProcessQueue的锁 是为了防止重新负载时出现多个消费者消费统一队列,线程池使用的对象锁,保证了consumeRequest被线程池顺序消费,但是ProcessQueue的锁什么时候unlock?(队列被丢弃移除的时候?)

总结:

1.消息的发送和存储(业务自己实现):利用消息队列局部有序 hash到不同的队列

2.负载均衡:负载均衡算法保证队列只被分到一个consumer,对broker端的queue加锁,保证新分的consumer不会再旧的consumer停止消费前(drop+unlock)开始消费

3.消息拉取:每20秒锁分给自己的processQueue,持有锁的线程才可以pullMessage,防止重平衡时,新分在旧停止拉消息前拉消息,当锁定brokerqueue时,已分队列不在锁定队列中会解锁

4.消息消费:pullRqeust中的消息put到processQueue也是加锁有序的过程,submitMessage时,只提交给线程池一个task,在单个线程里做分组消费。且不成功会无限重试

最新文章

  1. 第一章 Java的I/O演进之路
  2. MySQL各版本的区别
  3. [POJ1631]Bridging signals (DP,二分优化)
  4. Java Servlet-入门
  5. u-boot Makefile Source Test
  6. 布局重用 include merge ViewStub
  7. chart.js 示例
  8. Oracle百问百答(二)
  9. man.go 阅读笔记
  10. 安装sql server2017出现错误:Visual Studio 运行时&quot;Microsoft visual c++2017 X64 Minimum Runtime - 14.10.25008&quot;需要修复
  11. 在oracle表中增加字段,并调整字段的顺序
  12. python测试开发django-56.模板渲染markdown语法+代码高亮
  13. Sigma Function LightOJ - 1336 (约数和为奇数)
  14. CentOS7 yum安装Java+Apache(httpd)+Tomcat并开启自启动
  15. python 连接 Oracle 乱码问题(cx_Oracle)
  16. xcode如何运行下载的demo工程
  17. [emacs] org-mode的一些小技巧
  18. FMX StringGrid向上滑动自动加载记录(一)
  19. vi文字处理器
  20. java工具类 --千分位方法

热门文章

  1. 【源码】spring循环引用
  2. VC中句柄、指针、ID之间的转换
  3. .netcore中的依赖注入
  4. NOI2020D1T1美食家
  5. WSL Ubuntu 18.04 LTS + VS Code
  6. leetcode103:permutations-ii
  7. Linux C Socket 编程
  8. git commit后如何取消commit
  9. 安装tomcat for ubuntu linux差点没晕死我!
  10. Failed connect to mirrors.cloud.aliyuncs.com:80