1. 死信队列概念

    死信队列(Dead Letter Exchange),死信交换器。当业务队列中的消息被拒绝或者过期或者超过队列的最大长度时,消息会被丢弃,但若是配置了死信队列,那么消息可以被重新发布到另一个交换器,这个交换器就是DLX,与DLX绑定的队列称为死信队列。

若业务队列想绑定死信队列,那么在声明业务队列时,需要指定DLX(死信Exchange)和DLK(死信RoutingKey)。

控制台.png

2.消息成为"死信"的前提

消息被否定确认,使用 channel.basicNack或channel.basicReject ,并且此时requeue属性被设置为false。

消息在队列的存活时间超过设置的TTL时间。

消息队列的消息数量已经超过最大队列长度。

“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

  1. 如何为业务队列配置死信队列

    在声明业务队列时,指定死信配置。

@Configuration

public class RabbitMqConfig {

/**
* 死信队列 交换机标识符
*/
public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
/**
* 死信队列交换机绑定键标识符
*/
public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key"; /********************************************************
* 创建死信队列
*******************************************************/ public final static String deadQueueName = "dead_queue";
public final static String deadRoutingKey = "dead_routing_key";
public final static String deadExchangeName = "dead_exchange"; /**
* 创建死信队列
*/
@Bean
public Queue deadQueue() {
return new Queue(deadQueueName, true);
} /**
* 创建死信交换机
*/
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(deadExchangeName);
} /**
* 死信队列与死信交换机绑定
*/
@Bean
public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) {
return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey);
} /*************************************************
* 创建业务队列
************************************************/
public final static String kinsonQueueName = "kinson_queue";
public final static String kinsonRoutingKey = "kinson_routing_key";
public final static String kinsonExchangeName = "kinson_exchange"; /**
* 创建业务交换机
*/
@Bean
public DirectExchange kinsonExchange() {
return new DirectExchange(kinsonExchangeName);
} /**
* 创建业务队列时——声明了死信队列
*/
@Bean
public Queue kinsonQueue() {
// 将普通队列绑定到死信队列交换机上
Map<String, Object> args = new HashMap<>(2);
args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName);
args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);
return new Queue(kinsonQueueName, true, false, false, args);
} /**
* 绑定关系
*/
@Bean
public Binding kinsonRoutingKey(Queue kinsonQueue, DirectExchange kinsonExchange) {
return BindingBuilder.bind(kinsonQueue).to(kinsonExchange).with(kinsonRoutingKey);
}

}

死信队列并不是特殊的队列,只是绑定到死信交换机上的队列。死信交换机只是接受死信的普通交换机,它的类型也是[Direct、Fanout、Topic]。一般来说,会给每一个业务队列都声明一个独有的路由key,并对应配置一个死信队列进行监听(但是一个项目可共用一个死信交换机)。

  1. 死信消息的Header

    @Component

    @Slf4j

    public class CustomerRev {

    //业务队列

    @RabbitListener(queues = {"kinson_queue"})

    public void receiver(Message msg, Channel channel) {

    try {

    //打印数据

    String message = new String(msg.getBody(), StandardCharsets.UTF_8);

    log.info("【业务队列msg.getMessageProperties().getHeaders()】:{}",JSON.toJSONString(msg.getMessageProperties().getHeaders()));

    //(手动确认)丢弃消息,且不重新回队列,消息会进入死信队列

    channel.basicReject(msg.getMessageProperties().getDeliveryTag(), false);

    } catch (Exception e) {

    log.error("错误信息:{}", e.getMessage());

    }

    }

    //死信队列

    @RabbitListener(queues = {"dead_queue"})

    public void receiver2(Message msg, Channel channel) {

    try {

    //打印数据

    log.info("【死信队列msg.getMessageProperties().getHeaders():{}】", JSON.toJSONString(msg.getMessageProperties().getHeaders()));

    log.info("【死信队列msg.getMessageProperties().getXDeathHeader():{}】", JSON.toJSONString(msg.getMessageProperties().getXDeathHeader()));

    String message = new String(msg.getBody(), StandardCharsets.UTF_8);

    log.info("死信队列收取消息:"+message);

    channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);

    } catch (Exception e) {

    log.error("错误信息:{}", e.getMessage());

    }

    }

    }

    执行消息:

【业务队列msg.getMessageProperties().getHeaders()】:{}

【死信队列msg.getMessageProperties().getHeaders():{"x-first-death-exchange":"","x-death":[{"reason":"rejected","count":1,"exchange":"","time":1604549067000,"routing-keys":["kinson_queue"],"queue":"kinson_queue"}],"x-first-death-reason":"rejected","x-first-death-queue":"kinson_queue"}】

【死信队列msg.getMessageProperties().getXDeathHeader():[{"reason":"rejected","count":1,"exchange":"","time":1604549067000,"routing-keys":["kinson_queue"],"queue":"kinson_queue"}]】

死信队列收取消息:Date:1604549064079

死信队列消费的消息的Header:

字段名 含义

x-first-death-exchange 第一次被抛入的死信交换机的名称

x-first-death-reason 第一次成为死信的原因,rejected:消息在重新进入队列时被队列拒绝,由于default-requeue-rejected参数被设置为false。expired :消息过期。maxlen: 队列内消息数量超过队列最大容量

x-first-death-queue 第一次成为死信前所在队列名称

x-death 历次被投入死信交换机的信息列表,同一个消息每次进入一个死信交换机,这个数组的信息就会被更新

5. 死信队列使用场景

在较为重要的业务场景中,确保未被消费的消息不被丢弃,一般发送消息异常可能原因主要有消息本身存在错误导致业务处理异常,参数校验异常,网络波动导致查询或调用接口异常。为了不使消息堆积,从而丢弃消息。

当发生异常时,不能每次通过日志来获取原消息,处理完问题后重新投递消息。通过配置死信队列,可以让未正确消费的消息暂存到死信队列中,后续排查清楚问题,编写相应的代码来处理死信消息(不在手动的恢复数据)。

推荐阅读

一文了解死信队列

最新文章

  1. 轻量级表达式树解析框架Faller
  2. spring mvc 跳转后页面cs样式表丢失
  3. 如何在Flash Builder里新建ActionScript工程
  4. uva 11991 Easy Problem from Rujia Liu? vector+map
  5. JDK源码阅读(二) AbstractList
  6. strstr和memcmp函数的实现
  7. [转] HTC:html组件
  8. 终于懂了:Delphi消息的Result域出现的原因——要代替回调函数的返回值!(MakeObjectInstance不会帮助处理(接收)消息回调函数的返回值)
  9. 老李推荐:第8章1节《MonkeyRunner源码剖析》MonkeyRunner启动运行过程-运行环境初始化
  10. 用jlink在mini2440上烧写uboot
  11. 在JS中调用CS里的方法(PageMethods)
  12. Spring Boot入门一:在Eclipse中使用Spring boot
  13. 给Linux系统管理员准备的Nmap命令的29个实用范例
  14. Delphi中使用ADO连接Excel
  15. scipy构建稀疏矩阵
  16. git入门教程,主要命令详解。
  17. 1077 Kuchiguse (20 分)
  18. 【BZOJ】2006: [NOI2010]超级钢琴
  19. linux查看tftp服务是否启动
  20. linux关于ftp查看不到文件列表的问题

热门文章

  1. 玩转控件:GDI+动态绘制流程图
  2. fastadmin 增加批量操作字段 提示无权限
  3. Microsoft.VisualBasic.dll内置的判断变量类型的一系列实用方法
  4. centos8安装fastdfs6.06集群方式二之:tracker的安装/配置/运行
  5. Ubuntu搜狗输入法安装
  6. 开源 Open Source
  7. mysql You can&#39;t specify target table &#39;sys_right_menu&#39; for update in FROM clause (不能从Objor子句中指定目标表“SysRyType菜单)
  8. java 第二课 标识符
  9. Stream(四)
  10. Redis的一些问题