1、什么是延迟队列

延迟队列中存储延迟消息,延迟消息是指当消息被发送到队列中不会立即消费,而是等待一段时间后再消费该消息。

延迟队列很多应用场景,一个典型的应用场景是订单未支付超时取消,用户下单之后30分钟内未支付成功,则把订单取消。

2、使用要求

RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过过期时间TTL和死信队列来模拟延迟队列。

过期时间TTL 可以参考文章: 【RabbitMQ 实战指南】一 过期时间TTL

死信队列可以参考文章:【RabbitMQ 实战指南】一 死信队列

3、延迟队列测试

采用订单未支付超时取消的应用场景来做测试,其具体步骤如下:

  • 1、创建两个交换器 exchange.order 和 exchange.delay, 分别绑定两个队列 queue.order 和 queue.delay

  • 2、把 queue.delay 队列里面的消息配置过期时间,一般订单是30分钟,这里设置成10秒,然后通过 x-dead-letter-exchange 指定死信交换器为 exchange.delay

  • 3、发送消息到 queue.order 中,消息过期之后流入 exchange.delay,然后路由到 queue.delay 队列中,然后检查订单状态,如果未支付,则进行取消操作

3.1、生产者代码

<?php
require __DIR__ . '/../../../../vendor/autoload.php'; use PhpAmqpLib\Wire\AMQPTable;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Connection\AMQPStreamConnection; // todo 更改配置
$connection = new AMQPStreamConnection('192.168.33.1', 5672, 'zhangcs', 'zhangcs', '/'); $channel = $connection->channel(); $channel->exchange_declare('exchange.order', AMQPExchangeType::DIRECT, false, true);
$channel->exchange_declare('exchange.delay', AMQPExchangeType::DIRECT, false, true);
$args = new AMQPTable();
// 消息过期方式:设置 queue.order 队列中的消息10s之后过期
$args->set('x-message-ttl', 10000);
$args->set('x-dead-letter-exchange', 'exchange.delay');
$args->set('x-dead-letter-routing-key', 'routingkey.delay');
$channel->queue_declare('queue.order', false, true, false, false, false, $args);
$channel->queue_declare('queue.delay', false, true, false, false); $channel->queue_bind('queue.order', 'exchange.order', 'routingkey.cancel.order');
$channel->queue_bind('queue.delay', 'exchange.delay', 'routingkey.delay');
$message = new AMQPMessage('F20190413180108970');
$channel->basic_publish($message, 'exchange.order', 'routingkey.cancel.order');

$channel->close();
$connection->close();

运行生产者代码之后,queue.order 队列会有一条消息,如下图:

10秒之后,消息会过期,然后被进入 exchange.delay, 进而路由到 queue.delay 队列中:

3.2、消费者代码

<?php
require __DIR__ . '/../../../../vendor/autoload.php'; use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Connection\AMQPStreamConnection; // todo 更改配置
$connection = new AMQPStreamConnection('192.168.33.1', 5672, 'zhangcs', 'zhangcs', '/');
$channel = $connection->channel(); $channel->exchange_declare('exchange.delay', AMQPExchangeType::DIRECT, false, true);
$channel->queue_declare('queue.delay', false, true, false, false); $channel->queue_bind('queue.delay', 'exchange.delay', 'routingkey.delay'); function process_message($message)
{
echo "开始处理订单,订单号:" . $message->body . PHP_EOL;
echo "获取订单的状态,如果未支付,则进行取消订单操作" . PHP_EOL;
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
} $channel->basic_consume('queue.delay', 'cancelOrder', false, false, false, false, 'process_message'); function shutdown($channel, $connection)
{
$channel->close();
$connection->close();
}
register_shutdown_function('shutdown', $channel, $connection); while ($channel ->is_consuming()) {
$channel->wait();
}

运行消费者代码之后,会获取到订单号,之后可以检查该订单的状态,如果未支付则进行取消操作,如下图:

最新文章

  1. EasyUI 解决 datagrid 中 NumberBox 限制小数位数后不能输入小数点问题
  2. codeforces C. Vanya and Scales
  3. 移动端中pagehide、pageshow的应用
  4. 时间就像Hourglass一样,积累(沉淀)越多,收获越大
  5. httpClient 4.x post get方法
  6. scala函数定义的四种方式
  7. 瞬态电压抑制二极管(TVS)选用原则
  8. Ajax禁止缓存的几个解决方案
  9. 如何自学Python?
  10. 9.2.2、Libgdx的输入处理之事件处理
  11. restful规范快速记忆
  12. CentOS 6.5 升级内核
  13. vagrant 安装笔记
  14. alt.js 使用教程
  15. [原创]HTML 用div模拟select下拉框
  16. bash 括号(小括号,双小括号,中括号,双中括号,大括号)
  17. wpgcms---设置应用模板
  18. 【转】Windows Server 2008 R2怎样设置自动登陆
  19. 工具类-vim在shell中卡死的情况
  20. PVS 7.6 部署教程

热门文章

  1. java、python、MYSQL环境安装
  2. 读《深入理解Elasticsearch》点滴-查询模版(结合官网手册,版本5.1)
  3. 在网页中打印一个99乘法表--JavaScript描述
  4. Docker系列(五):.Net Core实现k8s健康探测机制
  5. python 对excel进行截图
  6. HBase学习与实践
  7. ssrf漏洞学习(PHP)
  8. 有了这套模板,女朋友再也不用担心我刷不动 LeetCode 了
  9. zoj 3886 Nico Number
  10. mysql锁表处理语句