1、死信队列

DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换器。当消息在一个队列中变成死信(dead message)之后,它能被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列。

DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMQ就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。然后可以监听这个死信队列中的消息进行相应的处理。

2、消息变成死信的情况

  • 消息被拒绝(Basic.Reject/Basic.Nack),并且设置 requeue 参数为false, requeue 表示是否重新入队
  • 消息过期
  • 队列达到最大长度(声明队列的时候设置 x-max-length 参数,表示队列最大长度)

3、死信队列设置

可以通过为队列设置 x-dead-letter-exchange 参数设置 DLX,也可以通过设置 x-dead-letter-routing-key 参数为这个DLX指定路由键,如果没有特殊指定,则使用原队列的路由键。

4、死信队列测试

4.1 测试过程

整个过程如下图:

  • 第一步:创建两个交换器 exchange.normal 和 exchange.dlx, 分别绑定两个队列 queue.normal 和 queue.dlx
  • 第二步:把 queue.normal 队列里面的消息配置过期时间,然后通过 x-dead-letter-exchange 指定死信交换器为 exchange.dlx
  • 第三步:发送消息到 queue.normal 中,消息过期之后流入 exchange.dlx,然后路由到 queue.dlx 队列中,然后进行消费

4.2 生产者代码

<?php
require __DIR__ . '/../../../../vendor/autoload.php'; use PhpAmqpLib\Wire\AMQPTable;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Connection\AMQPStreamConnection; /**
* 死信队列测试
* 1、创建两个交换器 exchange.normal 和 exchange.dlx, 分别绑定两个队列 queue.normal 和 queue.dlx
* 2、把 queue.normal 队列里面的消息配置过期时间,然后通过 x-dead-letter-exchange 指定死信交换器为 exchange.dlx
* 3、发送消息到 queue.normal 中,消息过期之后流入 exchange.dlx,然后路由到 queue.dlx 队列中,进行消费
*/ // todo 更改配置
$connection = new AMQPStreamConnection('192.168.33.1', 5672, 'zhangcs', 'zhangcs', '/'); $channel = $connection->channel(); $channel->exchange_declare('exchange.dlx', AMQPExchangeType::DIRECT, false, true);
$channel->exchange_declare('exchange.normal', AMQPExchangeType::FANOUT, false, true);
$args = new AMQPTable();
// 消息过期方式:设置 queue.normal 队列中的消息10s之后过期
$args->set('x-message-ttl', 10000);
// 设置队列最大长度方式: x-max-length
//$args->set('x-max-length', 1);
$args->set('x-dead-letter-exchange', 'exchange.dlx');
$args->set('x-dead-letter-routing-key', 'routingkey');
$channel->queue_declare('queue.normal', false, true, false, false, false, $args);
$channel->queue_declare('queue.dlx', false, true, false, false); $channel->queue_bind('queue.normal', 'exchange.normal');
$channel->queue_bind('queue.dlx', 'exchange.dlx', 'routingkey');
$message = new AMQPMessage('Hello DLX Message');
$channel->basic_publish($message, 'exchange.normal', 'rk'); $channel->close();
$connection->close();

运行生产者代码之后,queue.normal 队列会有一条消息,如下图:

10秒之后,消息会过期,然后被进入 exchange.dlx, 进而路由到 queue.dlx 队列中:

4.3、消费者代码

<?php
require __DIR__ . '/../../../../vendor/autoload.php'; use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Connection\AMQPStreamConnection; // todo 更改配置
$connection = new AMQPStreamConnection('192.168.33.1', 5672, 'zhangcs', 'zhangcs', '/');
$channel = $connection->channel(); $channel->exchange_declare('exchange.dlx', AMQPExchangeType::DIRECT, false, true);
$channel->queue_declare('queue.dlx', false, true, false, false); $channel->queue_bind('queue.dlx', 'exchange.dlx', 'routingkey'); function process_message($message)
{
echo "\n--------\n";
echo $message->body;
echo "\n--------\n";
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
} $channel->basic_consume('queue.dlx', 'consumer_tag', false, false, false, false, 'process_message'); function shutdown($channel, $connection)
{
$channel->close();
$connection->close();
}
register_shutdown_function('shutdown', $channel, $connection); while ($channel ->is_consuming()) {
$channel->wait();
}

运行消费者代码之后,消费会从 queue.dlx 中消费掉:

最新文章

  1. NOIP2016呵呵记
  2. 在实体注解OneToMany时,要加上mappedby,避免产生中间表。
  3. 常用的 css 命名规则
  4. HTML1网页三部份内容
  5. 【LeetCode】Flatten Binary Tree to Linked List
  6. CodeForces 525C Ilya and Sticks 贪心
  7. hdu4417 Super Mario 树阵离线/划分树
  8. 关于设置CFileDialog的默认路径
  9. java-两个jre目录和三个lib目录-转
  10. https post
  11. Vue框架下的node.js安装教程
  12. [再寄小读者之数学篇](2014-06-20 Beta 函数)
  13. php学习----运算符
  14. python中get pass用法
  15. 解决nginx access日志中400 bad request 错误(转)
  16. Linux下使用date命令查看和修改时间
  17. 使用jquery的$.ajax向服务端传递中文,避免乱码的解决办法!
  18. [转]CocoaPods安装和使用教程
  19. kill -9 a postgres process
  20. 项目提交到github的忽略文件

热门文章

  1. 深入理解JVM内存分配策略
  2. 基于Asp.Net Core MVC和AdminLTE的响应式管理后台之侧边栏处理
  3. python正则表达式字符记录
  4. js控制滚动条在内容更新超出时自动滚到底部
  5. Autofac的AOP面向切面编程研究
  6. [Abp vNext 源码分析] - 9. 接口参数的验证
  7. 阿里云虚拟主机安装wordpress,提示连接数据库失败的解决方法
  8. 从零开始入门 K8s | 应用存储和持久化数据卷:核心知识
  9. Spring5源码解析3-refresh方法初探
  10. win10下使用Linux命令