前言

当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列。

名词

  • exchange: 交换机
  • routingkey: 路由key
  • queue:队列
  • 控制台端口:15672

   exchange和queue是需要绑定在一起的,然后消息发送到exchange再由exchange通过routingkey发送到对应的队列中。

使用场景

  • 1.技能订单3分钟自动取消,改变状态
  • 2.直播开始前15分钟提醒
  • 3.直播状态自动结束

参考链接

  https://juejin.im/entry/5a17909a518825329314397d
  https://www.jianshu.com/p/ea953f633466

流程

  生产者发送消息 —> order_pre_exchange交换机 —> order_per_ttl_delay_queue队列

  —> 时间到期 —> order_delay_exchange交换机 —> order_delay_process_queue队列 —> 消费者

第一步:在pom文件中添加

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

第二步:在application.properties文件中添加

spring.rabbitmq.host=172.xx.xx.xxx
spring.rabbitmq.port=5672
spring.rabbitmq.username=rabbit
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000 spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

第三步:配置 OrderQueueConfig

  1 package com.tuohang.platform.config;
2
3 import org.springframework.amqp.core.Binding;
4 import org.springframework.amqp.core.BindingBuilder;
5 import org.springframework.amqp.core.DirectExchange;
6 import org.springframework.amqp.core.Queue;
7 import org.springframework.amqp.core.QueueBuilder;
8 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
9 import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
10 import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
11 import org.springframework.context.annotation.Bean;
12 import org.springframework.context.annotation.Configuration;
13
14
15 /**
16 * rabbitMQ的队列设置(生产者发送的消息,永远是先进入exchange,再通过路由,转发到队列)
17 *
18 *
19 * @author Administrator
20 * @version 1.0
21 * @Date 2018年9月18日
22 */
23 @Configuration
24 public class OrderQueueConfig {
25
26 /**
27 * 订单缓冲交换机名称
28 */
29 public final static String ORDER_PRE_EXCHANGE_NAME = "order_pre_exchange";
30
31 /**
32 * 发送到该队列的message会在一段时间后过期进入到order_delay_process_queue 【队列里所有的message都有统一的失效时间】
33 */
34 public final static String ORDER_PRE_TTL_DELAY_QUEUE_NAME = "order_pre_ttl_delay_queue";
35
36 /**
37 * 订单的交换机DLX 名字
38 */
39 final static String ORDER_DELAY_EXCHANGE_NAME = "order_delay_exchange";
40
41 /**
42 * 订单message时间过期后进入的队列,也就是订单实际的消费队列
43 */
44 public final static String ORDER_DELAY_PROCESS_QUEUE_NAME = "order_delay_process_queue";
45
46 /**
47 * 订单在缓冲队列过期时间(毫秒)30分钟
48 */
49 public final static int ORDER_QUEUE_EXPIRATION = 1800000;
50
51 /**
52 * 订单缓冲交换机
53 *
54 * @return
55 */
56 @Bean
57 public DirectExchange preOrderExange() {
58 return new DirectExchange(ORDER_PRE_EXCHANGE_NAME);
59 }
60
61 /**
62 * 创建order_per_ttl_delay_queue队列,订单消息经过缓冲交换机,会进入该队列
63 *
64 * @return
65 */
66 @Bean
67 public Queue delayQueuePerOrderTTLQueue() {
68 return QueueBuilder.durable(ORDER_PRE_TTL_DELAY_QUEUE_NAME)
69 .withArgument("x-dead-letter-exchange", ORDER_DELAY_EXCHANGE_NAME) // DLX
70 .withArgument("x-dead-letter-routing-key", ORDER_DELAY_PROCESS_QUEUE_NAME) // dead letter携带的routing key
71 .withArgument("x-message-ttl", ORDER_QUEUE_EXPIRATION) // 设置订单队列的过期时间
72 .build();
73 }
74
75 /**
76 * 将order_pre_exchange绑定到order_pre_ttl_delay_queue队列
77 *
78 * @param delayQueuePerOrderTTLQueue
79 * @param preOrderExange
80 * @return
81 */
82 @Bean
83 public Binding queueOrderTTLBinding(Queue delayQueuePerOrderTTLQueue, DirectExchange preOrderExange) {
84 return BindingBuilder.bind(delayQueuePerOrderTTLQueue).to(preOrderExange).with(ORDER_PRE_TTL_DELAY_QUEUE_NAME);
85 }
86
87 /**
88 * 创建订单的DLX exchange
89 *
90 * @return
91 */
92 @Bean
93 public DirectExchange delayOrderExchange() {
94 return new DirectExchange(ORDER_DELAY_EXCHANGE_NAME);
95 }
96
97 /**
98 * 创建order_delay_process_queue队列,也就是订单实际消费队列
99 *
100 * @return
101 */
102 @Bean
103 public Queue delayProcessOrderQueue() {
104 return QueueBuilder.durable(ORDER_DELAY_PROCESS_QUEUE_NAME).build();
105 }
106
107 /**
108 * 将DLX绑定到实际消费队列
109 *
110 * @param delayProcessOrderQueue
111 * @param delayExchange
112 * @return
113 */
114 @Bean
115 public Binding dlxOrderBinding(Queue delayProcessOrderQueue, DirectExchange delayOrderExchange) {
116 return BindingBuilder.bind(delayProcessOrderQueue).to(delayOrderExchange).with(ORDER_DELAY_PROCESS_QUEUE_NAME);
117 }
118
119 /**
120 * 监听订单实际消费者队列order_delay_process_queue
121 *
122 * @param connectionFactory
123 * @param processReceiver
124 * @return
125 */
126 @Bean
127 public SimpleMessageListenerContainer orderProcessContainer(ConnectionFactory connectionFactory,
128 OrderProcessReceiver processReceiver) {
129 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
130 container.setConnectionFactory(connectionFactory);
131 container.setQueueNames(ORDER_DELAY_PROCESS_QUEUE_NAME); // 监听order_delay_process_queue
132 container.setMessageListener(new MessageListenerAdapter(processReceiver));
133 return container;
134 }
135 }

消费者 OrderProcessReceiver :

 1 package com.tuohang.platform.config;
2
3 import java.util.Objects;
4
5 import org.apache.tools.ant.types.resources.selectors.Date;
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8 import org.springframework.amqp.core.Message;
9 import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
10 import org.springframework.stereotype.Component;
11 import com.rabbitmq.client.Channel;
12
13 /**
14 * 订单延迟处理消费者
15 *
16 *
17 * @author Administrator
18 * @version 1.0
19 * @Date 2018年9月18日
20 */
21 @Component
22 public class OrderProcessReceiver implements ChannelAwareMessageListener {
23
24 private static Logger logger = LoggerFactory.getLogger(OrderProcessReceiver.class);
25
26 String msg = "The failed message will auto retry after a certain delay";
27
28 @Override
29 public void onMessage(Message message, Channel channel) throws Exception {
30 try {
31 processMessage(message);
32 } catch (Exception e) {
33 // 如果发生了异常,则将该消息重定向到缓冲队列,会在一定延迟之后自动重做
34 channel.basicPublish(OrderQueueConfig.ORDER_PRE_EXCHANGE_NAME, OrderQueueConfig.ORDER_PRE_TTL_DELAY_QUEUE_NAME, null,
35 msg.getBytes());
36 }
37 }
38
39 /**
40 * 处理订单消息,如果订单未支付,取消订单(如果当消息内容为FAIL_MESSAGE的话,则需要抛出异常)
41 *
42 * @param message
43 * @throws Exception
44 */
45 public void processMessage(Message message) throws Exception {
46 String realMessage = new String(message.getBody());
47 logger.info("Received <" + realMessage + ">");
48 // 取消订单
49 if(!Objects.equals(realMessage, msg)) {
50 // SpringKit.getBean(ITestService.class).resetSexById(Long.valueOf(realMessage));
51 System.out.println("测试111111-----------"+new Date());
52 System.out.println(message);
53 }
54 }
55 }

或者

 1 /**
2 * 测试 rabbit 消费者
3 *
4 *
5 * @author Administrator
6 * @version 1.0
7 * @Date 2018年9月25日
8 */
9 @Component
10 @RabbitListener(queues = TestQueueConfig.TEST_DELAY_PROCESS_QUEUE_NAME)
11 public class TestProcessReceiver {
12
13 private static Logger logger = LoggerFactory.getLogger(TestProcessReceiver.class);
14
15 String msg = "The failed message will auto retry after a certain delay";
16
17 @RabbitHandler
18 public void onMessage(Message message, Channel channel) throws Exception {
19 try {
20 processMessage(message);
21 //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发
22 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
23 } catch (Exception e) {
24 // 如果发生了异常,则将该消息重定向到缓冲队列,会在一定延迟之后自动重做
25 channel.basicPublish(TestQueueConfig.TEST_PRE_EXCHANGE_NAME, TestQueueConfig.TEST_PRE_TTL_DELAY_QUEUE_NAME, null,
26 msg.getBytes());
27 }
28 }
29
30 /**
31 * 处理订单消息,如果订单未支付,取消订单(如果当消息内容为FAIL_MESSAGE的话,则需要抛出异常)
32 *
33 * @param message
34 * @throws Exception
35 */
36 public void processMessage(Message message) throws Exception {
37 String realMessage = new String(message.getBody());
38 logger.info("Received < " + realMessage + " >");
39 // 取消订单
40 if(!Objects.equals(realMessage, msg)) {
41 System.out.println("测试111111-----------"+new Date());
42 }else {
43 System.out.println("rabbit else...");
44 }
45 }
46 }

生产者

 1 /**
2 * 测试rabbitmq
3 *
4 * @return
5 */
6 @RequestMapping(value = "/testrab")
7 public String testraa() {
8 GenericResult gr = null;
9 try {
10 String name = "test_pre_ttl_delay_queue";
11 long expiration = 10000;//10s 过期时间
12 rabbitTemplate.convertAndSend(name,String.valueOf(123456));
13 // 在单个消息上设置过期时间
14 //rabbitTemplate.convertAndSend(name,(Object)String.valueOf(123456), new ExpirationMessagePostProcessor(expiration));
15
16
17 } catch (ServiceException e) {
18 e.printStackTrace();
19 gr = new GenericResult(StateCode.ERROR, languageMap.get("network_error"), e.getMessage());
20 }
21
22 return getWrite(gr);
23 }

最新文章

  1. 项目 CTR预估
  2. lvs/dr配置
  3. UI: 标题栏
  4. linux创建静态库
  5. linux Bash
  6. ArcGis 创建IWorkspace
  7. 使用BusyBox制作根文件系统【转】
  8. adb uninstall/pull/push 命令的使用总结
  9. C#局域网聊天工具_UDP广播
  10. executeQuery,executeUpdate 和 execute 区别
  11. cygwin 扩展
  12. html精确定位
  13. Swing-GridBagLayout用法-入门
  14. poj1988 Cube Stacking 带权并查集
  15. LeetCode算法题-Sum of Square Numbers(Java实现)
  16. 老毛桃pe装机工具一键还原系统
  17. docker 配置文件:/etc/docker/daemon.json
  18. ZT android -- 蓝牙 bluetooth (四)OPP文件传输
  19. 1-19-1 RHEL6启动原理和故障排除
  20. 使用spring的多线程机制

热门文章

  1. Codeforces Round #658 (Div. 2) D. Unmerge(dp)
  2. GPLT L2-006 树的遍历(二叉树)
  3. 【noi 2.6_6049】买书(DP)
  4. hdu1541 Stars
  5. Codeforces Round #575 (Div. 3) B. Odd Sum Segments 、C Robot Breakout
  6. Windows Terminal 更换主题
  7. 【Azure Redis 缓存】使用Python代码获取Azure Redis的监控指标值 (含Powershell脚本方式)
  8. 基于OpenCV全景拼接(Python)SIFT/SURF
  9. 深入理解JavaScript中的类继承
  10. 016.NET5_MVC_视图组件扩展定制