一 什么是死信队列

当一条消息在队列中出现以下三种情况的时候,该消息就会变成一条死信。

  • 消息被拒绝(basic.reject / basic.nack),并且requeue = false
  • 消息TTL过期
  • 队列达到最大长度

当消息在一个队列中变成一个死信之后,如果配置了死信队列,它将被重新publish到死信交换机,死信交换机将死信投递到一个队列上,这个队列就是死信队列。

二 实现死信队列

2.1 原理图

2.2 创建消费者

创建一个消费者,绑定消费队列及死信交换机,交换机默认为direct模型,死信交换机也是,arguments绑定死信交换机和key。(注解支持的具体参数文末会附上)

public class DirectConsumer {
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = "javatrip",arguments =
{@Argument(name="x-dead-letter-exchange",value = "deadExchange"),
@Argument(name="x-dead-letter-routing-key",value = "deadKey")
}),
exchange = @Exchange(value="javatripDirect"),
key = {"info","error","warning"}
)
})
public void receive1(String message, @Headers Map<String,Object> headers, Channel channel)throws Exception{
System.out.println("消费者1"+message);
}

2.3 创建生产者

public void publishMessage(String message){

    rabbitTemplate.setMandatory(true);
rabbitTemplate.convertAndSend("javatripDirect","info",message);
}

三 造成死信的三种情况

3.1 拒绝消息,并且禁止重新入队

  1. 设置yml为手动签收模式
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
  1. 设置拒绝消息并禁止重新入队
Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicNack(deliverTag,false,false);
  1. 绑定死信队列
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "javatripDead"),
exchange = @Exchange(value = "deadExchange"),
key = "deadKey"
)
})
public void receive2(String message){
System.out.println("我是一条死信:"+message);
}

3.2 消息TTL过期

绑定业务队列的时候,增加消息的过期时长,当消息过期后,消息将被转发到死信队列中。

@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = "javatrip",arguments =
{@Argument(name="x-dead-letter-exchange",value = "deadExchange"),
@Argument(name="x-dead-letter-routing-key",value = "deadKey"),
@Argument(name = "x-message-ttl",value = "3000")
}),
exchange = @Exchange(value="javatripDirect"),
key = {"info","error","warning"}
)
})
public void receive1(String message, @Headers Map<String,Object> headers, Channel channel)throws Exception{
System.out.println("消费者1"+message);
}

3.3 队列达到最大长度

设置消息队列长度,当队列中的消息达到最大长度后,继续发送消息,消息将被转发到死信队列中。

@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = "javatrip",arguments =
{@Argument(name="x-dead-letter-exchange",value = "deadExchange"),
@Argument(name="x-dead-letter-routing-key",value = "deadKey"),
@Argument(name = "x-max-length",value = "3")
}),
exchange = @Exchange(value="javatripDirect"),
key = {"info","error","warning"}
)
})
public void receive1(String message, @Headers Map<String,Object> headers, Channel channel)throws Exception{
System.out.println("消费者1"+message);
}

四 Spring Boot整合RabbitMQ用到的几个注解

  1. @QueueBinding作用就是将队列和交换机进行绑定,主要有以下三个参数:
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface QueueBinding { /**
* @return the queue.
*/
Queue value(); /**
* @return the exchange.
*/
Exchange exchange(); /**
* @return the routing key or pattern for the binding.
* Multiple elements will result in multiple bindings.
*/
String[] key() default {};
}
  1. @Queue是声明队列及队列的一些属性,主要参数如下:
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface Queue { /**
* @return the queue name or "" for a generated queue name (default).
*/
@AliasFor("name")
String value() default ""; /**
* @return the queue name or "" for a generated queue name (default).
* @since 2.0
*/
@AliasFor("value")
String name() default ""; /**
* 是否持久化
*/
String durable() default ""; /**
* 是否独享、排外的.
*/
String exclusive() default ""; /**
* 是否自动删除;
*/
String autoDelete() default ""; /**
* 队列的其他属性参数
* (1)x-message-ttl:消息的过期时间,单位:毫秒;
*(2)x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒;
*(3)x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息;
*(4)x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;
*(5)x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head;
*(6)x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;
*(7)x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值
*(8)x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)
*(9)x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级;
*(10)x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;
*(11)x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。
*/
Argument[] arguments() default {};
}
  1. @Exchange是声明交换及交换机的一些属性,
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface Exchange { String TRUE = "true"; String FALSE = "false"; /**
* @return the exchange name.
*/
@AliasFor("name")
String value() default ""; /**
* @return the exchange name.
* @since 2.0
*/
@AliasFor("value")
String name() default ""; /**
* 交换机类型,默认DIRECT
*/
String type() default ExchangeTypes.DIRECT; /**
* 是否持久化
*/
String durable() default TRUE; /**
* 是否自动删除
*/
String autoDelete() default FALSE; /**
* @return the arguments to apply when declaring this exchange.
* @since 1.6
*/
Argument[] arguments() default {};
}

最新文章

  1. MRDS学习四——自动型机器车
  2. Centos 6.5安装oracle 11g
  3. NPOI 导入导出excel 支持 03 07
  4. 升级Flash Builder 4.6中的Flash Player版本
  5. partition by
  6. Sql Server CTE递归
  7. cf C. Divisible by Seven
  8. Linux--本地yum库
  9. SAP开发快捷键
  10. forwardport--源码笔记--注释
  11. PYTHON定义函数制作简单登录程序(详细)
  12. Android OpenSL ES 开发:Android OpenSL 介绍和开发流程说明
  13. 跟踪调试JDK源码时遇到的问题及解决方法
  14. python的数据驱动
  15. 为什么要编译Linux内核?
  16. EasyUI DataGrid Checkbox 多选 获取选中行中的内容
  17. [转]微信小程序之购物车功能
  18. 【JavaScript】JavaScript(V8)实现输入输出
  19. python开发_tkinter_图形随鼠标移动
  20. Educational Codeforces Round 47

热门文章

  1. 爬虫06 /scrapy框架
  2. Kite: 一个分布式微服务框架(翻译)
  3. 更优雅的在 Xunit 中使用依赖注入
  4. 五分钟带你深入了解Redis
  5. 利用华为eNSP模拟器实现vlan之间的通信
  6. Java之枚举类
  7. Java实现简单的增删改查操作
  8. Java流程控制(Scanner)
  9. webserver 返回json 如何去掉 &lt;string xmlns=&quot;http://tempuri.org/&quot;&gt;
  10. ParallelsDesktop下Kali安装