消息可靠性

确保消息至少被消费了一次(不丢失)

消息丢失的几种情况:

  1. 消息在网络传输时丢失,如生产者到交换机,交换机到队列的过程中
  2. MQ宕机:消息到达Queue了,但在消费者消费之前,MQ就宕机了
  3. 消费者宕机:消费者接收了消息但是还没处理就宕机了

如何解决? RabbitMQ分别针对生产者, MQ和消费者这三个角色提供了一些解决方法

生产者消息确认

根据下图,消息的传输丢失可能有3种情况:

  • 消息可能在从生产者到exchange的时候丢失,即没到达exchange。
  • 消息从exchange通过routingKey将消息路由到queue 时丢失,即没到达queue
  • 消息成功到达queue后,没有被消费者消费或者消费者获取消息还没经过处理就宕机了

Publisher Confirms

Since AMQP gives few guarantees regarding message persistence/handling, the traditional way to do this is with transactions, which can be unacceptably slow. To remedy this problem, we introduce an extension to AMQP in the form of Lightweight Publisher Confirms. (渣翻:传统的方式是采用事务控制,但这种方法十分地慢,RabbitMQ提供了一种更加轻量级的方式:生产者确认。

  • RabbitMq发送到消费者的消息确认
  • RabbitMQ到生产者的消息确认 (这种机制是Rabbitmq对Amqp协议的扩展)

    以上两种特性都收到了TCP协议的启发

它们对于从发布者到 RabbitMQ 节点以及从 RabbitMQ 节点到消费者的可靠交付都是必不可少的。换句话说,它们对于数据安全至关重要,应用程序与 RabbitMQ 节点一样负责。

示例

Spring-amqp支持 publish confirm and returns 的 RabbitTemplate实现。 我们将使用springboot+rabbitmq来模拟上面三种情况。

RabbitTemplate 是spring-amqp定义的一个模板,它实现了AmqpTemplate接口(此接口定义了涵盖发送和接收消息的一般行为。)

  1. 新建工程,包含两个模块:consumer和publisher

配置文件:

# publisher的配置文件: (consumer的配置文件差不多)
logging:
pattern:
dateformat: HH:mm:ss:SSS
level:
cn.itcast: debug
spring:
rabbitmq:
host: 192.168.57.100 # rabbitMQ的ip地址
port: 5672 # 端口
username: xxx # 用户名称
password: 1234
virtual-host: /
publisher-confirm-type: correlated # 指定生产者确认的模式,correlated表示异步
publisher-returns: true # 开启生产者返回
template: # rabbitTemplage的设置,也可以通过代码配置
mandatory: true # 如果设置为true,发送失败的消息会被ReturnCallBack方法回调

Publish配置类,通过RabbitTemplate配置ReturnCallback函数用来处理消息没到队列的情况。

根据spring-amqp文档,一个RabbitTemplate只能支持一个ReturnCallback 函数。


@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware{
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取rabbitTemplate对象
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) ->
log.error("消息发送队列失败:{},响应码:{},失败原因:{},交换机:{},routingKey:{}",
message, replyCode, replyText, exchange, routingKey));
//重发消息...
}
}

这样我们就完成了一种失败处理:消息无法路由到队列。

测试类:

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate; @Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
String routingKey = "simple.test";
String message = "hello spring-amqp!";
//准备CorrelationData,它包含了一个全局的ID用来标识消息,并且设置了两个函数用于处理其余两种错误
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
correlationData.getFuture().addCallback(confirm -> {
if(confirm.isAck()){
log.debug("消息成功投递到交换机!消息ID:{}",correlationData.getId());
}else{
log.error("消息投递到交换机失败,消息ID:{}",correlationData.getId());
}
}, throwable -> {
//失败,记录日志
log.error("消息发送失败 ",throwable) ;
});
rabbitTemplate.convertAndSend("amq.topic",routingKey,message,correlationData);
}

在测试中:

  • 如果我们将routingKey改成MQ中不存在值,则会触发 ReturnCallback回调函数, 因为消息到达交换机后无法路由到任何一个队列
  • 同样的,如果我们将交换机amq.topic设置成MQ中不存在的值,则会得到 nack publish confirm, 因为消息无法投递到交换机。

消费者消息确认

消息几经波折终于到达了消费者这里,结果消费者还没来得及处理就宕机了,或在处理过程中发生了异常,导致消息没有被正确处理。而RabbitMQ认为消费者已经消费了,直接把消息丢掉。那前面的工作都白忙了。

于是,RabbitMQ提供了消费者确认机制。消费者处理消息后向MQ发送ack回执,MQ收到ack才会丢弃消息

Spring-amqp则允许配置三种确认模式:

  • manual:手动ack,业务处理完成后,由程序员调用api发送ack
  • auto:自动ack,由spring监测listen代码是否出现异常,没有异常则返回ack,抛出异常则返回nack
  • none: 关闭ack,MQ假定消费者拿到消息后会成功处理,因此消息投递后会马上删除

示例

consumer配置文件yaml中指定spring-amqp的确认模式,这里以auto来演示,并开启失败重传。

logging:
pattern:
dateformat: HH:mm:ss:SSS
level:
cn.itcast: debug
spring:
rabbitmq:
host: 192.168.57.100 # rabbitMQ的ip地址
port: 5672 # 端口
username: wingdd
password: 1234
virtual-host: /
listener:
simple:
prefetch: 1
acknowledge-mode: auto # 开启自动确认模式

消费者发生异常后,消息会重新入队requeue到队列,再发送给消费者,这会导致mq压力过大。因此,我们需要利用spring的retry机制,消费者异常时进行本地重试,而不是无限制的入队。


增加重传的配置:

       retry:
enabled: true # 开启重传
initial-interval: 1000ms # 初始失败等待时长
multiplier: 3 # 下一次等待时长的倍数,下一次等待时长=multiplier * last-interval
max-attempts: 3 # 最大重传次数
stateless: true # 无状态,默认为true。 如果业务中有事务,则要改成false
max-interval: 10000ms # 最大等待时长,约束multiplier和interval
@Slf4j
@Component
public class SpringRabbitListener { @RabbitListener(queues="simple.queue")
public void ListenSimpleQueue(String msg) {
System.out.println("消费者接收到来自simple.queue的消息:【 " + msg + " 】");
int a= 1/0 ; //将抛出异常
log.info("消费者处理成功") ;
}
}

死信交换机

死信交换机就是指接收死信的交换机。

什么样的消息会称为死信?

  • 被消费者使用basic.reject或者basic.nack返回,并且requeue参数是false的消息。
  • TTL过期未被消费的消息
  • 队列满了导致被丢弃了的消息

TTL,Time-To-Live,如果一个队列的消息TTL结束仍未消费,则会变成死信,ttl超时分为两种情况:1. 消息所在的队列设置了存活时间 2.消息本身设置了存活时间。( 如果两者都有设置,选短的那个)

使用死信交换机和TTL,可以实现消费者延迟接收消息的效果,这种消息模式乘坐延迟队列(Delay Queue)模式

延迟队列的使用场景包括:

  • 延迟发送短信
  • 用户下单后15分钟未支付则自动取消
  • 预约工作会议,20分钟后自动通知参会人员。

例子

如图,我们将实现这样的场景,publish将消息发送到ttl交换机再到ttl Queue后,消息超时将被投递到死信交换机和队列,此时consumer再消费消息,这就达到了延迟消息的效果



①、在consumer中添加 监听dl.queue

   @RabbitListener(bindings= @QueueBinding(
value=@Queue(name="dl.queue",durable = "true"),
exchange=@Exchange(name="dl.direct"),
key="dl"
))
public void listenDlQueue(String msg){
log.info("消费者接收到了dl.queue的消息 【"+msg+"】");
}

②、comsuer增加配置类用于创建ttl.queue和ttl交换机,并绑定。

package cn.itcast.mq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; @Configuration
public class TTLConfiguration {
@Bean
public DirectExchange ttlExchange(){
return new DirectExchange("ttl.direct") ;
}
@Bean
public Queue ttlQueue(){
return QueueBuilder.durable("ttl.queue").
ttl(6000).
deadLetterExchange("dl.direct").
deadLetterRoutingKey("dl").
build();
}
// 将ttl交换机和ttlQueue绑定起来
@Bean
public Binding dlbinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
}

③、在publisher模块中向ttl.queue发送消息

    @Test
public void TTLTest() {
Message message = MessageBuilder.withBody("hello TTL Queue Or Dead Queue!".getBytes(StandardCharsets.UTF_8))
.build();
rabbitTemplate.convertAndSend("ttl.direct", "ttl",message) ;
log.info("消息发送成功了!");
}

结果:消费者最终接收到了dl.queue的消息

高可用问题

从单点MQ到集群MQ

消息堆积问题

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限,此时

最早接收到的消息就会变成死信,被丢弃。 这就是消息堆积问题

解决消息堆积的思路:

  1. 增加更多的消费者,提高消费速度
  2. 在消费者内开启线程池加快消息处理速度
  3. 扩大队列容积,提高堆积上限

惰性队列

RabbitMQ从3.6.0版本开始就增加了Lazy Queues的概念,惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储

参考

代码:根据Bilibili黑马教程编写

https://www.cnblogs.com/binghe001/p/14443360.html

https://docs.spring.io/spring-amqp/docs/1.6.3.RELEASE/reference/html/_reference.html

最新文章

  1. Oracle常用函数
  2. 出现( linker command failed with exit code 1)错误总结 (转)
  3. Java基础类型总结
  4. [CareerCup] 18.4 Count Number of Two 统计数字2的个数
  5. ubuntu下安装Node.js(源码安装)
  6. 自动发牌(C#版)
  7. VIM实用基本操作技巧
  8. lvs持久性工作原理和配置
  9. html input[type=file] css样式美化【转藏】
  10. svo:一个半直接单目视觉里程计(长期更新)
  11. SQL导入txt以及SQL中的时间格式操作
  12. shell脚本小案例
  13. Spring 链接数据库
  14. Mysql数据约束 整理
  15. Win10系列:C#应用控件进阶1
  16. jquery select使用
  17. C# windows服务:创建Windows服务(Windows Services)的一般步骤
  18. LightOj 1118 - Incredible Molecules(两圆的交集面积)
  19. java String分配内存空间备忘
  20. importlib模块与__import__详解

热门文章

  1. 破界!Omi生态omi-mp发布,用小程序开发生成Web
  2. Element UI table参数中的selectable的使用
  3. h5 在全屏iphonex中的适配
  4. 我的python学习记04
  5. 安装PLSQLDeveloper
  6. SSM实现个人博客-day01
  7. Adobe Xd 练习
  8. AWS - Basic 1
  9. Codeforces Round #671 (Div. 2) B. Stairs 难度1200
  10. Codeforces Round #703 (Div. 2)__ B. Eastern Exhibition__ 纯纯的思维