【RabbitMQ 实战指南】一 延迟队列
2024-08-24 13:14:16
1、什么是延迟队列
延迟队列中存储延迟消息,延迟消息是指当消息被发送到队列中不会立即消费,而是等待一段时间后再消费该消息。
延迟队列很多应用场景,一个典型的应用场景是订单未支付超时取消,用户下单之后30分钟内未支付成功,则把订单取消。
2、使用要求
RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过过期时间TTL和死信队列来模拟延迟队列。
过期时间TTL 可以参考文章: 【RabbitMQ 实战指南】一 过期时间TTL
死信队列可以参考文章:【RabbitMQ 实战指南】一 死信队列
3、延迟队列测试
采用订单未支付超时取消的应用场景来做测试,其具体步骤如下:
1、创建两个交换器 exchange.order 和 exchange.delay, 分别绑定两个队列 queue.order 和 queue.delay
2、把 queue.delay 队列里面的消息配置过期时间,一般订单是30分钟,这里设置成10秒,然后通过 x-dead-letter-exchange 指定死信交换器为 exchange.delay
3、发送消息到 queue.order 中,消息过期之后流入 exchange.delay,然后路由到 queue.delay 队列中,然后检查订单状态,如果未支付,则进行取消操作
3.1、生产者代码
<?php
require __DIR__ . '/../../../../vendor/autoload.php'; use PhpAmqpLib\Wire\AMQPTable;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Connection\AMQPStreamConnection; // todo 更改配置
$connection = new AMQPStreamConnection('192.168.33.1', 5672, 'zhangcs', 'zhangcs', '/'); $channel = $connection->channel(); $channel->exchange_declare('exchange.order', AMQPExchangeType::DIRECT, false, true);
$channel->exchange_declare('exchange.delay', AMQPExchangeType::DIRECT, false, true);
$args = new AMQPTable();
// 消息过期方式:设置 queue.order 队列中的消息10s之后过期
$args->set('x-message-ttl', 10000);
$args->set('x-dead-letter-exchange', 'exchange.delay');
$args->set('x-dead-letter-routing-key', 'routingkey.delay');
$channel->queue_declare('queue.order', false, true, false, false, false, $args);
$channel->queue_declare('queue.delay', false, true, false, false); $channel->queue_bind('queue.order', 'exchange.order', 'routingkey.cancel.order');
$channel->queue_bind('queue.delay', 'exchange.delay', 'routingkey.delay');
$message = new AMQPMessage('F20190413180108970');
$channel->basic_publish($message, 'exchange.order', 'routingkey.cancel.order'); $channel->close();
$connection->close();
运行生产者代码之后,queue.order 队列会有一条消息,如下图:
10秒之后,消息会过期,然后被进入 exchange.delay, 进而路由到 queue.delay 队列中:
3.2、消费者代码
<?php
require __DIR__ . '/../../../../vendor/autoload.php'; use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Connection\AMQPStreamConnection; // todo 更改配置
$connection = new AMQPStreamConnection('192.168.33.1', 5672, 'zhangcs', 'zhangcs', '/');
$channel = $connection->channel(); $channel->exchange_declare('exchange.delay', AMQPExchangeType::DIRECT, false, true);
$channel->queue_declare('queue.delay', false, true, false, false); $channel->queue_bind('queue.delay', 'exchange.delay', 'routingkey.delay'); function process_message($message)
{
echo "开始处理订单,订单号:" . $message->body . PHP_EOL;
echo "获取订单的状态,如果未支付,则进行取消订单操作" . PHP_EOL;
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
} $channel->basic_consume('queue.delay', 'cancelOrder', false, false, false, false, 'process_message'); function shutdown($channel, $connection)
{
$channel->close();
$connection->close();
}
register_shutdown_function('shutdown', $channel, $connection); while ($channel ->is_consuming()) {
$channel->wait();
}
运行消费者代码之后,会获取到订单号,之后可以检查该订单的状态,如果未支付则进行取消操作,如下图:
最新文章
- EasyUI 解决 datagrid 中 NumberBox 限制小数位数后不能输入小数点问题
- codeforces C. Vanya and Scales
- 移动端中pagehide、pageshow的应用
- 时间就像Hourglass一样,积累(沉淀)越多,收获越大
- httpClient 4.x post get方法
- scala函数定义的四种方式
- 瞬态电压抑制二极管(TVS)选用原则
- Ajax禁止缓存的几个解决方案
- 如何自学Python?
- 9.2.2、Libgdx的输入处理之事件处理
- restful规范快速记忆
- CentOS 6.5 升级内核
- vagrant 安装笔记
- alt.js 使用教程
- [原创]HTML 用div模拟select下拉框
- bash 括号(小括号,双小括号,中括号,双中括号,大括号)
- wpgcms---设置应用模板
- 【转】Windows Server 2008 R2怎样设置自动登陆
- 工具类-vim在shell中卡死的情况
- PVS 7.6 部署教程