匠心零度 转载请注明原创出处,谢谢!

RocketMQ网络部署图

  • NameServer:在系统中是做命名服务,更新和发现 broker服务。
  • Broker-Master:broker 消息主机服务器。
  • Broker-Slave: broker 消息从机服务器。
  • Producer: 消息生产者。
  • Consumer: 消息消费者。

说明: rocketmq系列都将会以rocketmq-4.1.0-incubating进行介绍。

在阅读源码时做了一定的注释,公众号【匠心零度】回复:rocketmq,可获得基于rocketmq4.1.0加详细中文代码注释 。欢迎大家 star、fork !

厮大说过消息中间件的本质消息中间件大道至简:一发一存一消费 ,今天主要来讨论下,就是RocketMQ网络部署图中用颜色标记的部分。

往期rocketmq系列文章

消息发送概述

上面的图大概就是producer发送message到broker的核心逻辑了。

问题思考:

把broker相关信息缓存到客户端减少了与namesrv的交互,但是也降低了broker变化的实时性了,如何忽然有一台broker不可用了会怎么样呢?(后续看看rocketmq的处理),为什么producer发送会那么快呢?本质是由于netty的writeAndFlush?producer如何做到异步发送?同步发送?oneway发送的呢?如果发送失败会怎么处理呢?

消息发送一般流程分析

由于发送还涉及到定时发送,顺序发送,批量发送等情况,本篇考虑到篇幅问题就是一般的发送逻辑讲解,后面继续分享其他情况。

阅读本篇前应该重点阅读下:RocketMQ(二):RPC通讯

如何在本地调试之前文章也分享过了,在此就不提了,发送的逻辑相对于存储以及消费来说是最简单的(直接根据一条线不断的跟下去基本就差不多了),而存储最复杂,其次消费(这些过程可能一条线不好找,后续分享)。

同步发送写法

备注: 可以参考RocketMQ快速入门即可。

producer.start

    /**
* Start this producer instance.
* </p>
*
* <strong>
* Much internal initializing procedures are carried out to make this instance prepared, thus, it's a must to invoke
* this method before sending or querying messages.
* </strong>
* </p>
*
* @throws MQClientException if there is any unexpected error.
*/
@Override
public void start() throws MQClientException {
this.defaultMQProducerImpl.start();
}

主要做了下列事情(核心事情):

  • 一些配置检查。
  • 构建与namesrv通信的netty客户端。
  • 默认每30s与namesrv交换获取broker相关信息。
  • 默认每30s去掉失效的broker信息以及发送心跳到所有broker上面。

构建Message对象

producer是以Message对象进行发送的,看看Message构造:

    public Message() {
} public Message(String topic, byte[] body) {
this(topic, "", "", 0, body, true);
} public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
this.topic = topic;
this.flag = flag;
this.body = body; if (tags != null && tags.length() > 0)
this.setTags(tags); if (keys != null && keys.length() > 0)
this.setKeys(keys); this.setWaitStoreMsgOK(waitStoreMsgOK);
} public Message(String topic, String tags, byte[] body) {
this(topic, tags, "", 0, body, true);
} public Message(String topic, String tags, String keys, byte[] body) {
this(topic, tags, keys, 0, body, true);
}

备注: 主要就是topic、tags、以及body真实内容等。

send发送

SendResult sendResult = producer.send(msg);

进行发送处理。下面我们重点看看send如何处理。

发送send核心分析

发送的几种方式:同步 异步 oneway(应该选择哪种,需要自己根据情况进行判断)

以同步发送为例子,默认超时时间为3s,

SendResult sendResult = producer.send(msg);

这个就是发送的触发方法,我们一直跟进去就行了,第一初步感受:通过跟踪进去第一感觉就是涉及到了JUC相关使用,大量运用享元模式(本质一个map进行缓存)以及netty使用。

核心逻辑:

代码就不大量复制了,需要的github里面获取基于rocketmq4.1.0加详细中文代码注释 。欢迎大家 star、fork !

  • 判断服务是否可用? 不可用直接结束流程。

  • 消息的验证:

  • 获取topic路由信息

    缓存中有就获取,没有就namesrv交互一次(也可能2次)由于topic信息在broker服务端不一定存在,如果不存在就用默认的(TBW102)。

封装请求头信息:

// Namesrv 根据Topic获取Broker Name、队列数(包含读队列与写队列)
public static final int GET_ROUTEINTO_BY_TOPIC = 105;

namesrv服务端接受到这个请求的处理情况。

最后得到的路由信息类似下面的:

  • 发送模式是sync 会有3次其他1次

    //发送模式是sync 会有3次其他1次
    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
  • 选择一个queue

    如何选择发送那个broker的那个queueid上面?(客户端自己负载),由于broker相关信息缓存在客户端里面,问题来了(由于30s会同步一次信息,那么在30s之内broker出现问题会怎么样呢? )rocketmq是这样处理的:sendLatencyFaultEnable开关是否打开

    1.打开--> 有多长时间内不可用情况

    2.不打开(默认)-->直接随机一个(如果带了lastBrokerName不为空 尽量换不是这个broker的,如果都没有又是随机一个)

  • 调用sendKernelImpl发送消息 发送消息核心

    根据broker的name获取到ip地址,如果通道没有建立并且保存。

    设置设置UNIQ_id,里面保护客户端ip地址信息。

    发送的时候 会有钩子函数提供执行(禁止消息钩子 ,发送消息钩子(executeSendMessageHookBefore、executeSendMessageHookAfter)。

    构建SendMessageRequestHeader,包括生成消息时间戳,所以各各机器时间最好一致,(这样后期也可以查下broker接受消息花了多少时间)。

  • 根据发送消息模式,选择发送方式

    下面这次主要看同步发送情况。

    如果1情况执行nettywriteAndFlush发送成功者跳出来,到达3情况进行等等最多等待3s。这里什么时候唤醒呢? 其实是在broker情况响应客户端的时候进行唤醒的:

    备注: 这里使用CountDownLatch异步转同步的。

    如果是2情况表示发送失败,直接唤醒3情况不进行阻塞了(最后抛异常表示发送失败)

  • 更新broker可用时间

  • retryAnotherBrokerWhenNotStoreOK情况判断

    如果设置为retryAnotherBrokerWhenNotStoreOK为true之后,在发送失败的时候,会选择换一个broker。

  • 如下异常continue,进行发送消息重试

客户端发送流程大概到这里就分析完成了。


如果读完觉得有收获的话,欢迎点赞、关注、加公众号【匠心零度】,查阅更多精彩历史!!!

加入知识星球,一起探讨!

最新文章

  1. ORACLE 连接SQLSERVER 数据库备忘
  2. 利用PHPMailer发送邮件时报错
  3. 51nod 1099 任务执行顺序 (贪心算法)
  4. Wince 6.0 窗口最大化显示
  5. ubuntu 彻底删除卸载
  6. 示例可重用的web component方式组织angular应用模块
  7. 开源调度框架Quartz最佳实践
  8. (转载)Linux定时任务cron配置
  9. Linux下给mysql创建用户分配权限
  10. AVR开发 Arduino方法(一) 端口子系统
  11. Spring Cloud微服务Ribbon负载均衡/Zuul网关使用
  12. 对于EMC DAE、DPE、SPE、SPS的解释
  13. ReactiveX 学习笔记(25)使用 RxJS + Vue.js 调用 REST API
  14. Python:decorator [转]
  15. C#进阶のMEF注入
  16. freemarker 简单操作
  17. cookie与session的区别与关系
  18. .net代码修改webconfig
  19. 第8章 用SQL语句操作数据
  20. 【C#】C#操作Excel文件(转)

热门文章

  1. Android 使用Retrofit获取JSON数据
  2. jquery中prop()方法和attr()方法
  3. ThinkPad X260 UEFI安装 win7 64位 方法
  4. Python环境搭建—安利Python小白的Python安装详细教程
  5. PHP获取随机字符串的两种方法
  6. 学习《人工智能一种现代的方法(第3版)》中文PDF+英文PDF
  7. Python协程一点理解
  8. 局域网内机器不能对ping问题
  9. js02---字符串
  10. 63.C++异常