PHP 下基于 php-amqp 扩展的 RabbitMQ 简单用例 (五) -- 自动 ACK、手动 ACK、NACK
2024-08-22 19:54:31
以 Direct 类型的 交换机和 Queue 的 get 方法为例.
producer.php
// 连接设置
$conConfig = [
'host' => '127.0.0.1',
'port' => 5672,
'login' => 'root',
'password' => 'root',
'vhost' => '/'
];
try
{
// RabbitMQ 连接实例
$con = new AMQPConnection($conConfig);
// 发起连接
$con->connect();
// 新建通道
$channel = new AMQPChannel($con);
// 在指定通道上新建交换机
$exchange = new AMQPExchange($channel);
// 交换机名称
$exchange->setName('test.exchange.ack');
// 交换机类型
$exchange->setType('direct');
// 声明交换机
$exchange->declareExchange(); for($i = 1; $i <= 3; $i++)
{
$msg = '消息' . $i;
// 发送消息,同时为消息指定routing key,成功返回true,失败false
$state = $exchange->publish($msg, 'test.rt.ack');
if($state)
{
echo 'Success' . PHP_EOL;
}else
{
echo 'Fail' . PHP_EOL;
}
} // 关闭连接
//$con->disconnect();
}catch(\Exception $e)
{
echo $e->getMessage();
}
自动 ACK
consumer.php
$conConfig = [
'host' => '127.0.0.1',
'port' => 5672,
'login' => 'root',
'password' => 'root',
'vhost' => '/'
]; try
{
$con = new AMQPConnection($conConfig);
$con->connect(); $channel = new AMQPChannel($con);
$exchange =new AMQPExchange($channel);
$exchange->setName('test.exchange.ack');
$exchange->setType('direct');
$exchange->declareExchange(); $queue = new AMQPQueue($channel);
$queue->setName('test.ack.queue');
// 声明队列同时返回队列中的消息数量
$messageCount = $queue->declareQueue();
echo '消息数量: ' . $messageCount . PHP_EOL;
$queue->bind('test.exchange.ack', 'test.rt.ack'); // 获取消息后进行自动应答时, get方法的参数设置为AMQP_AUTOACK即可
while($msgEnvelope = $queue->get(AMQP_AUTOACK))
{
$msg = $msgEnvelope->getBody();
echo $msg . PHP_EOL;
}
$con->disconnect();
}catch(Exception $e)
{
echo $e->getMessage();
}
将 Queue 的 get 方法参数设置为 AMQP_AUTOACK 即可在获取到消息后自动发送消息已收到响应.
手动 ack
如果不需要自动 ack, 而是根据实际的业务处理结果进行处理. Queue 的 get 方法参数修改为 AMQP_NOPARM 即可.
修改后推送三条消息:
连续两次从队列中获取消息:
如果不进行 ack, 队列中的消息将一直存在, 可以反复获取.
继续修改 while 循环为:
while($msgEnvelope = $queue->get(AMQP_NOPARAM))
{
$msg = $msgEnvelope->getBody();
if(preg_match("/.*?消息2.*?/", $msg))
{
// 对消息2执行确定响应
$queue->ack($msgEnvelope->getDeliveryTag());
}
echo date('Y-m-d H:i:s') . ' ' . $msg . PHP_EOL;
}
连续执行两次 comsumer.php:
第一次获取到 3 条消息, 但第一次执行中对消息 2 执行了确认响应, 剩余消息不进行确认响应. 第二次执行中只获取到剩余消息.
NACK (否定响应)
如果既不想对消息执行确定响应, 也不需要消息继续出现在队列中, 可以使用 Queue 的 nack 方法. 继续修改 while 循环:
while($msgEnvelope = $queue->get(AMQP_NOPARAM))
{
$msg = $msgEnvelope->getBody();
if(preg_match("/.*?消息2.*?/", $msg))
{
// 对消息2执行确定响应
$queue->ack($msgEnvelope->getDeliveryTag());
}else
{
$queue->nack($msgEnvelope->getDeliveryTag());
}
echo date('Y-m-d H:i:s') . ' ' . $msg . PHP_EOL;
}
推送 3 条消息后, 连续执行两次 consumper.php:
第一次执行, 获取到 3 条数据; 第二次执行未获取到任何数据. nack 方法除了可以从队列中过滤掉不需要的方法, 也可以将暂时不需要的方法重新放回队列, 将该方法的调用修改为:
$queue->nack($msgEnvelope->getDeliveryTag(), AMQP_REQUEUE);
注意: nack 方法将消息放回队列后, 队列会将消息再次推送给消费者. 如果此时队列只有一个消费者, 将会造成死循环.
最新文章
- 【原创】js实现一个可随意拖拽排序的菜单导航栏
- WPF 程序Form自的控件自适应方式之一
- HTTP 战役 与 历史
- PHP+微信分享自定义小图标
- c#中的protected和internal
- 利用Rsync在windows和linux之间同步数据
- 【Hololens】微软Hololens虚拟现实视频集
- Core 核心标签库->;运算式操作
- 再谈 SharePoint 大局观
- Spark源码剖析(七):Job触发流程原理与源码剖析
- 安装vue错误详情解决办法
- 章节四、4-For循环
- Jmeter 自动化测试报告扩展
- Linux TCP并发请求溺出 调优
- GDOI2018 Day1 题目总结
- javascript基础拾遗(十三)
- lynis-*nix安全审计
- ASP.NET MVC5 学习系列之初探MVC
- Memcachedclient-XMemcached使用
- XP系统下 VS2010 选中行崩溃
热门文章
- Ubuntu 14.04.1 配置 Android 源码开发环境(jdk版本切换)(转载)
- ubuntu 12.04上安装QQ2013(转载)
- [软件安装]JDK
- Swift4 函数, 元组, 运算符
- bzoj 3784: 树上的路径【点分治+st表+堆】
- React实战之将数据库返回的时间转换为几分钟前、几小时前、几天前的形式。
- 利用OneDNS同步chrome数据
- 【微信公众号开发】根据openId群发消息
- ObjectARX2012错误1 fatal error C1083: 无法打开包括文件:“arxHeaders.h”: No such file or directory; fatal error C1083: 无法打开包括文件:“map”: No such file or directory
- Python variable 作用域和初始化