承接上文基于redis,redisson的延迟队列实践,今天介绍下基于rabbitmq延迟插件rabbitmq_delayed_message_exchange实现延迟任务。

一、延迟任务的使用场景

1、下单成功,30分钟未支付。支付超时,自动取消订单

2、订单签收,签收后7天未进行评价。订单超时未评价,系统默认好评

3、下单成功,商家5分钟未接单,订单取消

4、配送超时,推送短信提醒

5、三天会员试用期,三天到期后准时准点通知用户,试用产品到期了

......

对于延时比较长的场景、实时性不高的场景,我们可以采用任务调度的方式定时轮询处理。如:xxl-job。

今天我们讲解延迟队列的实现方式,而延迟队列有很多种实现方式,普遍会采用如下等方式,如:

  • 1.如基于RabbitMQ的队列ttl+死信路由策略:通过设置一个队列的超时未消费时间,配合死信路由策略,到达时间未消费后,回会将此消息路由到指定队列
  • 2.基于RabbitMQ延迟队列插件(rabbitmq-delayed-message-exchange):发送消息时通过在请求头添加延时参数(headers.put("x-delay", 5000))即可达到延迟队列的效果。(顺便说一句阿里云的收费版rabbitMQ当前可支持一天以内的延迟消息),局限性:目前该插件的当前设计并不真正适合包含大量延迟消息(例如数十万或数百万)的场景,详情参见 #/issues/72 另外该插件的一个可变性来源是依赖于 Erlang 计时器,在系统中使用了一定数量的长时间计时器之后,它们开始争用调度程序资源。
  1. 3.使用redis的zset有序性,轮询zset中的每个元素,到点后将内容迁移至待消费的队列,(redisson已有实现)
  • 4.使用redis的key的过期通知策略,设置一个key的过期时间为延迟时间,过期后通知客户端(此方式依赖redis过期检查机制key多后延迟会比较严重;Redis的pubsub不会被持久化,服务器宕机就会被丢弃)。

二、组件安装

安装rabbitMQ需要依赖erlang语言环境,所以需要我们下载erlang的环境安装程序。网上有很多安装教程,这里不再贴图累述,需要注意的是:该延迟插件支持的版本匹配。

插件Git官方地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

当你成功安装好插件后运行起rabbitmq管理后台,在新建exchange里就可以看到type类型中多出了这个选项

三、RabbitMQ延迟队列插件的延迟队列实现

1、基本原理

  通过 x-delayed-message 声明的交换机,它的消息在发布之后不会立即进入队列,先将消息保存至 Mnesia(一个分布式数据库管理系统,适合于电信和其它需要持续运行和具备软实时特性的 Erlang 应用。目前资料介绍的不是很多)

  这个插件将会尝试确认消息是否过期,首先要确保消息的延迟范围是 Delay > 0, Delay =< ?ERL_MAX_T(在 Erlang 中可以被设置的范围为 (2^32)-1 毫秒),如果消息过期通过 x-delayed-type 类型标记的交换机投递至目标队列,整个消息的投递过程也就完成了。

2、核心组件开发走起

引入maven依赖

 <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml简单配置

  rabbitmq:
host: localhost
port: 5672
virtual-host: /

RabbitMqConfig配置文件

package com.example.code.bot_monomer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import java.util.HashMap;
import java.util.Map; /**
* @author: shf description: date: 2022/1/5 15:00
*/
@Configuration
public class RabbitMQConfig { /**
* 普通
*/
public static final String EXCHANGE_NAME = "test_exchange";
public static final String QUEUE_NAME = "test001_queue";
public static final String NEW_QUEUE_NAME = "test002_queue";
/**
* 延迟
*/
public static final String DELAY_EXCHANGE_NAME = "delay_exchange";
public static final String DELAY_QUEUE_NAME = "delay001_queue";
public static final String DELAY_QUEUE_ROUT_KEY = "key001_delay";
//由于阿里rabbitmq增加队列要额外收费,现改为各业务延迟任务共同使用一个queue:delay001_queue
//public static final String NEW_DELAY_QUEUE_NAME = "delay002_queue"; @Bean
public CustomExchange delayMessageExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
//自定义交换机
return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args);
} @Bean
public Queue delayMessageQueue() {
return new Queue(DELAY_QUEUE_NAME, true, false, false);
} @Bean
public Binding bindingDelayExchangeAndQueue(Queue delayMessageQueue, Exchange delayMessageExchange) {
return new Binding(DELAY_QUEUE_NAME, Binding.DestinationType.QUEUE, DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUT_KEY, null);
//return BindingBuilder.bind(delayMessageQueue).to(delayMessageExchange).with("key001_delay").noargs();
} /**
* 交换机
*/
@Bean
public Exchange orderExchange() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
//return new TopicExchange(EXCHANGE_NAME, true, false);
} /**
* 队列
*/
@Bean
public Queue orderQueue() {
//return QueueBuilder.durable(QUEUE_NAME).build();
return new Queue(QUEUE_NAME, true, false, false, null);
} /**
* 队列
*/
@Bean
public Queue orderQueue1() {
//return QueueBuilder.durable(NEW_QUEUE_NAME).build();
return new Queue(NEW_QUEUE_NAME, true, false, false, null);
} /**
* 交换机和队列绑定关系
*/
@Bean
public Binding orderBinding(Queue orderQueue, Exchange orderExchange) {
//return BindingBuilder.bind(queue).to(exchange).with("#.delay").noargs();
return new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, "test001_common", null);
} /**
* 交换机和队列绑定关系
*/
@Bean
public Binding orderBinding1(Queue orderQueue1, Exchange orderExchange) {
//return BindingBuilder.bind(queue).to(exchange).with("#.delay").noargs();
return new Binding(NEW_QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, "test001_common", null);
} }

MqDelayQueueEnum枚举类

package com.example.code.bot_monomer.enums;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor; /**
* @author: shf description: 延迟队列业务枚举类
* date: 2021/8/27 14:03
*/
@Getter
@NoArgsConstructor
@AllArgsConstructor
public enum MqDelayQueueEnum {
/**
* 业务0001
*/
YW0001("yw0001", "测试0001", "yw0001"),
/**
* 业务0002
*/
YW0002("yw0002", "测试0002", "yw0002"); /**
* 延迟队列业务区分唯一Key
*/
private String code; /**
* 中文描述
*/
private String name; /**
* 延迟队列具体业务实现的 Bean 可通过 Spring 的上下文获取
*/
private String beanId; public static String getBeanIdByCode(String code) {
for (MqDelayQueueEnum queueEnum : MqDelayQueueEnum.values()) {
if (queueEnum.code.equals(code)) {
return queueEnum.beanId;
}
}
return null;
}
}

模板接口处理类:MqDelayQueueHandle

package com.example.code.bot_monomer.service.mqDelayQueue;

/**
* @author: shf description: RabbitMQ延迟队列方案处理接口
* date: 2022/1/10 10:46
*/
public interface MqDelayQueueHandle<T> { void execute(T t);
}

具体业务实现处理类

@Slf4j
@Component("yw0001")
public class MqTaskHandle01 implements MqDelayQueueHandle<String> { @Override
public void execute(String s) {
log.info("MqTaskHandle01.param=[{}]",s);
//TODO
}
}

注意:@Component("yw0001") 要和业务枚举类MqDelayQueueEnum中对应的beanId保持一致。

统一消息体封装类

/**
* @author: shf description: date: 2022/1/10 10:51
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class MqDelayMsg<T> { /**
* 业务区分唯一key
*/
@NonNull
String businessCode; /**
* 消息内容
*/
@NonNull
T content;
}

统一消费分发处理Consumer

package com.example.code.bot_monomer.service.mqConsumer;

import com.alibaba.fastjson.JSONObject;
import com.example.code.bot_monomer.config.common.MqDelayMsg;
import com.example.code.bot_monomer.enums.MqDelayQueueEnum;
import com.example.code.bot_monomer.service.mqDelayQueue.MqDelayQueueHandle; import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component; import lombok.extern.slf4j.Slf4j; /**
* @author: shf description: date: 2022/1/5 15:12
*/
@Slf4j
@Component
//@RabbitListener(queues = "test001_queue")
@RabbitListener(queues = "delay001_queue")
public class TestConsumer { @Autowired
ApplicationContext context; /**
* RabbitHandler 会自动匹配 消息类型(消息自动确认)
*
* @param msgStr
* @param message
*/
@RabbitHandler
public void taskHandle(String msgStr, Message message) {
try {
MqDelayMsg msg = JSONObject.parseObject(msgStr, MqDelayMsg.class);
log.info("TestConsumer.taskHandle:businessCode=[{}],deliveryTag=[{}]", msg.getBusinessCode(), message.getMessageProperties().getDeliveryTag());
String beanId = MqDelayQueueEnum.getBeanIdByCode(msg.getBusinessCode());
if (StringUtils.isNotBlank(beanId)) {
MqDelayQueueHandle<Object> handle = (MqDelayQueueHandle<Object>) context.getBean(beanId);
handle.execute(msg.getContent());
} else {
log.warn("TestConsumer.taskHandle:MQ延迟任务不存在的beanId,businessCode=[{}]", msg.getBusinessCode());
}
} catch (Exception e) {
log.error("TestConsumer.taskHandle:MQ延迟任务Handle异常:", e);
}
}
}

最后简单封装个工具类

package com.example.code.bot_monomer.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.example.code.bot_monomer.config.RabbitMQConfig;
import com.example.code.bot_monomer.config.common.MqDelayMsg; import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component; import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Objects; import lombok.extern.slf4j.Slf4j; /**
* @author: shf description: MQ分布式延迟队列工具类 date: 2022/1/10 15:20
*/
@Slf4j
@Component
public class MqDelayQueueUtil { @Autowired
private RabbitTemplate template; @Value("${mqdelaytask.limit.days:2}")
private Integer mqDelayLimitDays; /**
* 添加延迟任务
*
* @param bindId 业务绑定ID,用于关联具体消息
* @param businessCode 业务区分唯一标识
* @param content 消息内容
* @param delayTime 设置的延迟时间 单位毫秒
* @return 成功true;失败false
*/
public boolean addDelayQueueTask(@NonNull String bindId, @NonNull String businessCode, @NonNull Object content, @NonNull Long delayTime) {
log.info("MqDelayQueueUtil.addDelayQueueTask:bindId={},businessCode={},delayTime={},content={}", bindId, businessCode, delayTime, JSON.toJSONString(content));
if (StringUtils.isAnyBlank(bindId, businessCode) || Objects.isNull(content) || Objects.isNull(delayTime)) {
return false;
}
try {
//TODO 延时时间大于2天的先加入数据库表记录,后由定时任务每天拉取2次将低于2天的延迟记录放入MQ中等待到期执行
if (ChronoUnit.DAYS.between(LocalDateTime.now(), LocalDateTime.now().plus(delayTime, ChronoUnit.MILLIS)) >= mqDelayLimitDays) {
//TODO
} else {
this.template.convertAndSend(
RabbitMQConfig.DELAY_EXCHANGE_NAME,
RabbitMQConfig.DELAY_QUEUE_ROUT_KEY,
JSONObject.toJSONString(MqDelayMsg.<Object>builder().businessCode(businessCode).content(content).build()),
message -> {
//注意这里时间可使用long类型,毫秒单位,设置header
message.getMessageProperties().setHeader("x-delay", delayTime);
return message;
}
);
}
} catch (Exception e) {
log.error("MqDelayQueueUtil.addDelayQueueTask:bindId={}businessCode={}异常:", bindId, businessCode, e);
return false;
}
return true;
} /**
* 撤销延迟消息
* @param bindId 业务绑定ID,用于关联具体消息
* @param businessCode 业务区分唯一标识
* @return 成功true;失败false
*/
public boolean cancelDelayQueueTask(@NonNull String bindId, @NonNull String businessCode) {
if (StringUtils.isAnyBlank(bindId,businessCode)) {
return false;
}
try {
//TODO 查询DB,如果消息还存在即可删除
} catch (Exception e) {
log.error("MqDelayQueueUtil.cancelDelayQueueTask:bindId={}businessCode={}异常:", bindId, businessCode, e);
return false;
}
return true;
} /**
* 修改延迟消息
* @param bindId 业务绑定ID,用于关联具体消息
* @param businessCode 业务区分唯一标识
* @param content 消息内容
* @param delayTime 设置的延迟时间 单位毫秒
* @return 成功true;失败false
*/
public boolean updateDelayQueueTask(@NonNull String bindId, @NonNull String businessCode, @NonNull Object content, @NonNull Long delayTime) {
if (StringUtils.isAnyBlank(bindId, businessCode) || Objects.isNull(content) || Objects.isNull(delayTime)) {
return false;
}
try {
//TODO 查询DB,消息不存在返回false,存在判断延迟时长入库或入mq
//TODO 延时时间大于2天的先加入数据库表记录,后由定时任务每天拉取2次将低于2天的延迟记录放入MQ中等待到期执行
if (ChronoUnit.DAYS.between(LocalDateTime.now(), LocalDateTime.now().plus(delayTime, ChronoUnit.MILLIS)) >= mqDelayLimitDays) {
//TODO
} else {
this.template.convertAndSend(
RabbitMQConfig.DELAY_EXCHANGE_NAME,
RabbitMQConfig.DELAY_QUEUE_ROUT_KEY,
JSONObject.toJSONString(MqDelayMsg.<Object>builder().businessCode(businessCode).content(content).build()),
message -> {
//注意这里时间可使用long类型,毫秒单位,设置header
message.getMessageProperties().setHeader("x-delay", delayTime);
return message;
}
);
}
} catch (Exception e) {
log.error("MqDelayQueueUtil.updateDelayQueueTask:bindId={}businessCode={}异常:", bindId, businessCode, e);
return false;
}
return true;
} }

附上测试类:

/**
* description: 延迟队列测试
*
* @author: shf date: 2021/8/27 14:18
*/
@RestController
@RequestMapping("/mq")
@Slf4j
public class MqQueueController { @Autowired
private MqDelayQueueUtil mqDelayUtil; @PostMapping("/addQueue")
public String addQueue() {
mqDelayUtil.addDelayQueueTask("00001",MqDelayQueueEnum.YW0001.getCode(),"delay0001测试",3000L);
return "SUCCESS";
} }

贴下DB记录表的字段设置

配合xxl-job定时任务即可。

  由于投递后的消息无法修改,设置延迟消息需谨慎!并需要与业务方配合,如:延迟时间在2天以内(该时间天数可调整,你也可以设置阈值单位为小时,看业务需求)的消息不支持修改与撤销。2天之外的延迟消息支持撤销与修改,需要注意的是,需要绑定关联具体操作业务唯一标识ID以对应关联操作撤销或修改。(PS:延迟时间设置在2天以外的会先保存到DB记录表由定时任务每天拉取到时2天内的投放到延迟对列)。

  再稳妥点,为了防止进入DB记录的消息有操作时间误差导致的不一致问题,可在消费统一Consumer消费分发前,查询DB记录表,该消息是否已被撤销删除(增加个删除标记字段记录),并且当前时间大于等于DB表中记录的到期执行时间才能分发出去执行,否则弃用。


此外,利用rabbitmq的死信队列机制也可以实现延迟任务,有时间再附上实现案例。

最新文章

  1. android 手机去哪儿7.2版本客户端 账号存储信息分析
  2. android studio每次启动都要在fetching Android sdk compoment information停好久的解决方案
  3. openfire使用自定义用户表
  4. shared_ptr和多线程
  5. 【Jsch】使用SSH协议连接到远程Shell执行脚本
  6. 设计模式之桥接模式(Bridge)
  7. log4j 分类输出
  8. 学习Linux第六天
  9. [iOS 多线程 &amp; 网络 - 1.2] - 多线程GCD
  10. CAShapeLayer--备用
  11. 一句话改变TWinControl控件的left坐标的前世今生(入口函数是SetBounds,然后调用SetWindowPos起作用,并发消息更新Delphi的left属性值)
  12. OCR怎么能离开扫描仪呢?
  13. 使用supervisor 进行进程管理时调整最大文件打开数
  14. 是否可能两个ETH私钥对应同一个地址
  15. 学习笔记01(mybatis逆向工程)
  16. JavaScript面试技巧(二):JS-Web-API
  17. Linux移植之auto.conf、autoconf.h、Mach-types.h的生成过程简析
  18. C++for的几种方式
  19. suse11 sp4(虚拟机) 安装程序时报错 找不到iso
  20. Flutter Stack布局中定位的方式

热门文章

  1. Cilium 1.11 发布,带来内核级服务网格、拓扑感知路由....
  2. LR常见报错
  3. 一站式云原生在线研发平台 StarOS 种子用户邀请计划正式开启!
  4. Google earth engine 绘制图像间散点图
  5. CF570A Elections 题解
  6. CF1461A String Generation 题解
  7. 二、Uniapp+vue+腾讯IM+腾讯音视频开发仿微信的IM聊天APP,支持各类消息收发,音视频通话,附vue实现源码(已开源)-腾讯云后台配置TXIM
  8. SpringBoot项目使用Caffeine本地缓存
  9. 【LeetCode】1438. 绝对差不超过限制的最长连续子数组 Longest Continuous Subarray With Absolute Diff Less Than or Equal t
  10. 【LeetCode】198. House Robber 打家劫舍 解题报告(Java & Python)