概述

  接着我们上一篇继续分析消息发送,上节讲到消息发送前有可能遇到 broker 失效的情况,RocketMQ 主要是采用两种策略 :

  • 重试发送
  • broker 故障延迟机制
      后者指的是当发送给某一broker失败后,会将该broker暂时排除在消息队列的选择范围内,到达某个时间点后再继续重试发送,发送的时候消耗的时长越多,那么延迟的时长就越多(就像缓存算法一样,使用得越少,越容易给淘汰)。下面介绍broker 故障延时机制

LatencyFaultToleranceImpl

接着昨天选取某一个消息队列的代码
    /**
* Broker 故障延迟机制
* 获取,判断是否失效,失效则移存记录
*
*/
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) {
try {
//同一个线程,index 增加 1
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
//取余,使同个线程发送到不同的 messagequeue
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
//判断该 broker 是否失效
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
// 获取的 mq 失效
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
} return tpInfo.selectOneMessageQueue();
} return tpInfo.selectOneMessageQueue(lastBrokerName);
}
broker 故障延迟机制相关类如下

其中LatencyFaultToleranceImpl 定义内部类 FaultItem ,从名字也可以看出这是放置失效broker用的,内部存放 currentLatency表示延时时间,startTimestamp 表示失效时间终点时间戳。
    // LatencyFaultToleranceImpl#updateFaultItem 

    @Override
/**
*
* @param name broker名字
* @param currentLatency 当前延迟时长
* @param notAvailableDuration 延迟时长
*/
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
FaultItem old = this.faultItemTable.get(name);
if (null == old) {
final FaultItem faultItem = new FaultItem(name);
faultItem.setCurrentLatency(currentLatency);
//可以看到 下次可用的时间 = 现在的时间 + 延迟的时间
faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); old = this.faultItemTable.putIfAbsent(name, faultItem);
if (old != null) {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
} else {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
} /**
* 尝试从规避的broker 中选择一个可用的较好的 broker ,如果没找到,将返回 null
* 哪一个较好呢? 肯定是规避时间较短的
*/
@Override
public String pickOneAtLeast() {
final Enumeration<FaultItem> elements = this.faultItemTable.elements();
List<FaultItem> tmpList = new LinkedList<FaultItem>();
while (elements.hasMoreElements()) {
final FaultItem faultItem = elements.nextElement();
tmpList.add(faultItem);
} if (!tmpList.isEmpty()) {
//洗牌后又排序,这里看不懂为啥要洗牌后排序,为什么不直接排序?
Collections.shuffle(tmpList);
//排序的根据看 FaultItem 的 compareTo 方法,根据时间排序
Collections.sort(tmpList); final int half = tmpList.size() / 2;
if (half <= 0) {
return tmpList.get(0).getName();
} else {
// whichItemWorst 记录本次选取的 broker ,加若选去后还是发送失败,那么同个线程下次再次调用 pickOneAtLeast 的时候
//就会调用选用另外一个broker
final int i = this.whichItemWorst.getAndIncrement() % half;
return tmpList.get(i).getName();
}
} return null;
}
    // MQFaultStrategy#updateFaultItem
// 其中 currentLatency 是发送时长(发送花了多少时间)
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
} //计算出需要延迟的时长
private long computeNotAvailableDuration(final long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
} return 0;
} private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

消息发送核心方法

  假如是一个异步发送方法

    public static void main(
String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException { DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0); for (int i = 0; i < 10000000; i++) {
try {
final int index = i;
Message msg = new Message("Jodie_topic_1023",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
} @Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
DefaultMQProducerimpl#sendKernelImpl是消息发送的核心方法,我们看一下
    /**
*
* @param msg 发送的消息
* @param mq 选定的消息队列
* @param communicationMode 交互模式
* @param sendCallback 异步消息 callback 回调
* @param topicPublishInfo topic消息
* @param timeout 超时时长
* @return
* @throws MQClientException
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//No.1 获取地址
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
} SendMessageContext context = null;
if (brokerAddr != null) {
//
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); byte[] prevBody = msg.getBody();
try {
//for MessageBatch,ID has been set in the generating process
// 为消息分配全局唯一 ID ,如果消息体默认超过 4K ,会对消息采用 zip 压缩,设置标识位
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
} int sysFlag = 0;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
} final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
} // 如果有钩子函数,执行钩子函数
if (hasCheckForbiddenHook()) {
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(brokerAddr);
checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode());
this.executeCheckForbiddenHook(checkForbiddenContext);
} if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
} if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
this.executeSendMessageHookBefore(context);
} //==================== 构造request请求体 ================== SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
} String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
} SendResult sendResult = null; //==================== 构造request请求体 ================== //(重要)根据不同的交互模式,发送请求,核心发送逻辑
switch (communicationMode) {
case ASYNC:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
case SYNC:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout,
communicationMode,
context,
this);
break;
default:
assert false;
break;
} if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
} return sendResult;
....
....
(异常处理) }

底层发送

  MQ 客户端发送消息的入口是 MQClientAPIImpl#sendMessage,MQClientAPIImpl 持有 RemotingClient 字段,它是个接口,实现类是 NettyRemotingClient ,即是它就是真正执行发送的对象,请求命令是 RequestCode.SEND_MESSAGE,我们可以找到该命令的处理类: org . apache.rocketmq. broker.processor.SendMessageProcessor。人口方法在 SendMessageProcessor#sendMessage。

    //SendMessageProcessor#sendMessage 

    private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader(); response.setOpaque(request.getOpaque()); response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); log.debug("receive SendMessage request command, {}", request); final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
if (this.brokerController.getMessageStore().now() < startTimstamp) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
return response;
} response.setCode(-1);
// No.1 先检查
super.msgCheck(ctx, requestHeader, response);
if (response.getCode() != -1) {
return response;
} final byte[] body = request.getBody(); int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); if (queueIdInt < 0) {
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
} MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
// 如果消息重试次数超过允许的最大重试次数,消息将进入DLD延迟队列
if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
return response;
} msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
String traFlag = msgInner.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
return response;
}
} //(重要)进行消息存储,后续分析
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt); }
    //

    protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
// 检查是否可写
if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
&& this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending message is forbidden");
return response;
}
// 检查该Topic是否可以进行消息发送
if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
log.warn(errorMsg);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorMsg);
return response;
} //在 NameServer端存储主题的配置信息
TopicConfig topicConfig =
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig) {
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
} else {
topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
}
} log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
requestHeader.getTopic(),
requestHeader.getDefaultTopic(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.getDefaultTopicQueueNums(), topicSysFlag); if (null == topicConfig) {
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
topicConfig =
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
topicSysFlag);
}
} if (null == topicConfig) {
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
}
// 检查队列,如果队列不合法,返回错误码
int queueIdInt = requestHeader.getQueueId();
int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
if (queueIdInt >= idValid) {
String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
queueIdInt,
topicConfig.toString(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel())); log.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo); return response;
}
return response;
}

总结

  发送的逻辑我们可以归纳一下

  1. 创建 DefaultProducer,启动其 start 方法
  2. send 方法调用会,利用 DefaultProducer 内部的 DefaultProducerImpl 执行发送操作,DefaultProducerImpl 使用 MQClient 执行发送,这中间有个broker 故障延迟发送机制,最后到了MQClientAPIImpl 的发送方法,MQClientAPIImpl 使用 RemotingClient 发送command 的,消息检查,最后到达DefaultMessageStore#putMessage 进行消息存储 。
      再有一个,我们知道了brokerController 和 NamerServerController 这两个类存放这各种重要的信息,使得消息在发送模块等其他模块(只要该模块持有controller)就可以方法得到各种消息。
      同时类在设计的时候通过设计接口,功能扩展封装在抽象类中,而实际使用不要持有最终实现,而是持有接口类,方便以后扩展。

参考资料

  • 《RocketMQ技术内幕》

最新文章

  1. 韩国&quot;被申遗&quot; (转自果壳)
  2. Life is short
  3. MOOCULUS微积分-2: 数列与级数学习笔记 5. Another comparison test
  4. 深入浅出 - Android系统移植与平台开发(二) - 准备Android开发环境
  5. oracle sql 优化
  6. OWIN概述
  7. java.sql.SQLException: 关闭的连接 解决办法
  8. kakfa源码编译打包
  9. Junit4入门
  10. C和C++中结构体(struct)、联合体(union)、枚举(enum)的区别
  11. Innodb buffer pool/redo log_buffer 相关
  12. 动态修改log4net组件的日志文件名
  13. AngularJS中的$http.post与jQuery.post的区别
  14. 开源协议瞎扯淡,什么是 MIT 协议?[转]
  15. php序列化漏洞理解
  16. 第59章 IdentityServer交互服务 - Identity Server 4 中文文档(v1.0.0)
  17. IOT
  18. centos7的Kubernetes部署记录
  19. 1816647 - Error &quot;Data file of SAP Note is incomplete&quot; uploading a note in SNOTE
  20. [51nod1514] 美妙的序列

热门文章

  1. apache配置跨域请求代理
  2. linq和初始化器
  3. U盘拷贝目标文件过大无法复制时的解决方法
  4. JS jQuery 点击页面弹出文字
  5. SpringBoot整合WEB开发--(一)处理JSON返回数据
  6. python面试的100题(4)
  7. 链接测试工具:Xenu
  8. .net core 2.2 使用imagemagick 将pdf转化为png
  9. mybatis报错:A query was run and no Result Maps were found for the Mapped Statement、、Property [login_ip] not found on type [com.thinkgem.jeesite.common.permission.entity.PremissUser]问题解决
  10. opencv:图像轮廓发现