前景回顾

【mq】从零开始实现 mq-01-生产者、消费者启动

【mq】从零开始实现 mq-02-如何实现生产者调用消费者?

【mq】从零开始实现 mq-03-引入 broker 中间人

【mq】从零开始实现 mq-04-启动检测与实现优化

【mq】从零开始实现 mq-05-实现优雅停机

【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat

【mq】从零开始实现 mq-07-负载均衡 load balance

【mq】从零开始实现 mq-08-配置优化 fluent

【mq】从零开始实现 mq-09-消费者拉取消息 pull message

【mq】从零开始实现 mq-10-消费者拉取消息回执 pull message ack

状态回执

大家好,我是老马。

上一节我们只实现了拉取消息的实现,但是缺少了消费状态回执。

这一节我们一起来学习下如何实现状态回执。

代码实现

回执状态的设计

我们规定如下几种回执状态:

package com.github.houbb.mq.common.constant;

/**
* @author binbin.hou
* @since 0.0.3
*/
public final class MessageStatusConst { private MessageStatusConst(){} /**
* 待消费
* ps: 生产者推送到 broker 的初始化状态
*/
public static final String WAIT_CONSUMER = "W"; /**
* 推送给消费端处理中
* ps: broker 准备推送时,首先将状态更新为 P,等待推送结果
* @since 0.1.0
*/
public static final String TO_CONSUMER_PROCESS = "TCP"; /**
* 推送给消费端成功
* @since 0.1.0
*/
public static final String TO_CONSUMER_SUCCESS = "TCS"; /**
* 推送给消费端失败
* @since 0.1.0
*/
public static final String TO_CONSUMER_FAILED = "TCF"; /**
* 消费完成
*/
public static final String CONSUMER_SUCCESS = "CS"; /**
* 消费失败
*/
public static final String CONSUMER_FAILED = "CF"; /**
* 稍后消费
* @since 0.1.0
*/
public static final String CONSUMER_LATER = "CL"; }

消费者状态回执

我们在消费之后,添加状态回执:

for(MqMessage mqMessage : mqMessageList) {
IMqConsumerListenerContext context = new MqConsumerListenerContext();
final String messageId = mqMessage.getTraceId();
ConsumerStatus consumerStatus = mqListenerService.consumer(mqMessage, context);
log.info("消息:{} 消费结果 {}", messageId, consumerStatus); // 状态同步更新
MqCommonResp ackResp = consumerBrokerService.consumerStatusAck(messageId, consumerStatus);
log.info("消息:{} 状态回执结果 {}", messageId, JSON.toJSON(ackResp));
}

回执实现,根据 messageId 更新对应的消息消费状态。

public MqCommonResp consumerStatusAck(String messageId, ConsumerStatus consumerStatus) {
final MqConsumerUpdateStatusReq req = new MqConsumerUpdateStatusReq();
req.setMessageId(messageId);
req.setMessageStatus(consumerStatus.getCode());
final String traceId = IdHelper.uuid32();
req.setTraceId(traceId);
req.setMethodType(MethodType.C_CONSUMER_STATUS); // 重试
return Retryer.<MqCommonResp>newInstance()
.maxAttempt(consumerStatusMaxAttempt)
.callable(new Callable<MqCommonResp>() {
@Override
public MqCommonResp call() throws Exception {
Channel channel = getChannel(null);
MqCommonResp resp = callServer(channel, req, MqCommonResp.class);
if(!MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {
throw new MqException(ConsumerRespCode.CONSUMER_STATUS_ACK_FAILED);
}
return resp;
}
}).retryCall();
}

Broker 回执处理

消息分发

// 消费者消费状态 ACK
if(MethodType.C_CONSUMER_STATUS.equals(methodType)) {
MqConsumerUpdateStatusReq req = JSON.parseObject(json, MqConsumerUpdateStatusReq.class);
final String messageId = req.getMessageId();
final String messageStatus = req.getMessageStatus();
return mqBrokerPersist.updateStatus(messageId, messageStatus);
}

简单实现

这里是基于本地 map 更新状态的,性能比较差。

后续会以 mysql 实现。

public MqCommonResp updateStatus(String messageId, String status) {
// 这里性能比较差,所以不可以用于生产。仅作为测试验证
for(List<MqMessagePersistPut> list : map.values()) {
for(MqMessagePersistPut put : list) {
MqMessage mqMessage = put.getMqMessage();
if(mqMessage.getTraceId().equals(messageId)) {
put.setMessageStatus(status);
break;
}
}
} MqCommonResp commonResp = new MqCommonResp();
commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
return commonResp;
}

小结

对于消息状态的细化,更加便于我们后续的管理,和问题的定位。

希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。

我是老马,期待与你的下次重逢。

开源地址

The message queue in java.(java 简易版本 mq 实现) https://github.com/houbb/mq

拓展阅读

rpc-从零开始实现 rpc https://github.com/houbb/rpc

最新文章

  1. scikit-learn 线性回归算法库小结
  2. oracle 字符串
  3. spring 使用redis集群配置
  4. 【processing】小代码
  5. IOS网络第二天 - 02-异步HTTP请求block回调 解析
  6. poj3122
  7. 【Linux探索之旅】第一部分第四课:磁盘分区,并完成Ubuntu安装
  8. windows phone 8.1开发SQlite数据库操作详解
  9. ubuntu11.04启动 及虚拟文件系统
  10. spring ioc(反转控制)
  11. 15 ActionProvider代码例子
  12. BIM特点及格式文件说明
  13. Cobalt Strike 服务器搭建及使用
  14. img标签的onerror事件
  15. SpringBoot获取配置文件的自定义参数
  16. datagrid点删除,弹出一个确认和取消的消息框
  17. java 反射机制--根据属性名获取属性值
  18. Python 图形界面(GUI)设计
  19. java学习笔记—HTTP协议(10)
  20. 一个奇怪的JS函数

热门文章

  1. 面试题目:手写一个LRU算法实现
  2. 决策树算法2:(增益比率C4.5)
  3. ECMAScript中有两种属性:数据属性和访问器属性。
  4. 祖先元素transform非none时在Iphone6上引起后代fixed/absolute元素的怪异表现及解决方案
  5. java中字符串池,String池,共享池到底是怎么回事?
  6. uniapp清理缓存
  7. for 循环详解
  8. Python使用函数实现杨辉三角
  9. 解决stram++的host代理443端口被占用的问题(电脑有虚拟机进!!)
  10. Spring Boot-自动配置之底层原理