RocketMQ客户端的消息发送通常分为以下3层

  • 业务层:通常指直接调用RocketMQ Client发送API的业务代码。
  • 消息处理层:指RocketMQ Client获取业务发送的消息对象后,一系列的参数检查、消息发送准备、参数包装等操作。
  • 通信层:指RocketMQ基于Netty封装的一个RPC通信服务,RocketMQ的各个组件之间的通信全部使用该通信层。

消息发送流程首先是RocketMQ客户端接收业务层消息,然后通过DefaultMQProducerImpl发送一个RPC请求给Broker,再由Broker处理请求并保存消息。

下面以DefaultMQProducer.send(Messagemsg)接口为例讲解发送流程,

  • 第一步:调用defaultMQProducerImpl.send()方法发送消息。
  • 第二步:通过设置的发送超时时间,调用defaultMQProducerImpl.send()方法发送消息。设置的超时时间可以通过sendMsgTimeout进行变更,其默认值为3s。
  • 第三步:执行 defaultMQProducerImpl.sendDefaultImpl()方法。
private SendResult sendDefaultImpl(
Message msg,
//通信模式,同步、异步还是单向
final CommunicationMode communicationMode,
//对于异步模式,需要设置发送完成后的回调
final SendCallback sendCallback,
final long timeout
){}

该方法是发送消息的核心方法,执行过程分为5步:

第一步: 两个检查:生产者状态、消息及消息内容。没有运行的生产者不能发送消息。消息检查主要检查消息是否为空,消息的Topic的名字是否为空或者是否符合规范;消息体大小是否符合要求,最大值为4MB,可以通过maxMessageSize进行设置。

第二步: 执行tryToFindTopicPublishInfo()方法:获取Topic路由信息,如果不存在则发出异常提醒用户。如果本地缓存没有路由信息,就通过Namesrv获取路由信息,更新到本地,再返回。具体实现代码如下:

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
} if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}

第三步: 计算消息发送的重试次数,同步重试和异步重试的执行方式是不同的。

第四步: 执行队列选择方法selectOneMessageQueue()。根据队列对象中保存的上次发送消息的Broker的名字和Topic路由,选择(轮询)一个Queue将消息发送到 Broker 。 我们可以通过sendLatencyFaultEnable 来设置是否总是发送到延迟级别较低的 Broker,默认值为False。

第五步: 执行sendKernelImpl()方法。该方法是发送消息的核心方法,主要用于准备通信层的入参(比如Broker地址、请求体等),将请求传递给通信层,内部实现是基于Netty的,在封装为通信层request对象RemotingCommand前,会设置RequestCode表示当前请求是发送单个消息还是批量消息。

最新文章

  1. 承接Unity3D外包公司 — 技术分享
  2. LINQ日常使用记录
  3. 怎么修改git提交过的内容
  4. RABBITMQ/JAVA 客户端测试(再补:利用文件流)
  5. 在Hibernate中配置Hilo进行数据绑定测试时出错:org.hibernate.MappingException: Could not instantiate id generator
  6. Objective-C 语法之 static 关键字
  7. 支付宝支付集成,上传RSA公钥一直显示格式错误
  8. 自己在OC考试中的试题
  9. [NOIP2015]推销员
  10. 【转】MYSQL入门学习之六:MYSQL的运算符
  11. python zip函数介绍
  12. [MySql] 设置了UTF8,中文存数据库中仍然出现问号
  13. css匹配原理与优化
  14. 《University Calculus》-chape8-无穷序列和无穷级数-欧拉恒等式
  15. 字符编码知识:Unicode、UTF-8、ASCII、GB2312等编码之间是如何转换的?
  16. ExtJS4 的dom
  17. 认识:ThinkPHP的编译缓存文件~runtime.php
  18. Linux基础 - 系统优化及常用命令
  19. Hyperledger超级账本在Centos7下搭建运行环境
  20. [链接]最短路径的几种算法[迪杰斯特拉算法][Floyd算法]

热门文章

  1. JUC-学习笔记
  2. java 常用的jar包下载地址
  3. Ubuntu20.04创建快捷方式(CLion)
  4. 网络编程 - OSI七层协议详解
  5. pandas中groupby的使用
  6. 【HarmonyOS】ArkTS Native开发——使用 system函数创建文件
  7. 【FAQ】在华为鸿蒙车机上集成华为帐号的常见问题总结
  8. vivo 云原生容器探索和落地实践
  9. [Linux Kernel 源码分析] 通过vconfig配置vlan的系统调用/驱动流程分析
  10. 【CTF隐写工具】binwalk工具使用方法