以 Direct 类型的 交换机和 Queue 的 get 方法为例.

producer.php

// 连接设置
$conConfig = [
'host' => '127.0.0.1',
'port' => 5672,
'login' => 'root',
'password' => 'root',
'vhost' => '/'
];
try
{
// RabbitMQ 连接实例
$con = new AMQPConnection($conConfig);
// 发起连接
$con->connect();
// 新建通道
$channel = new AMQPChannel($con);
// 在指定通道上新建交换机
$exchange = new AMQPExchange($channel);
// 交换机名称
$exchange->setName('test.exchange.ack');
// 交换机类型
$exchange->setType('direct');
// 声明交换机
$exchange->declareExchange(); for($i = 1; $i <= 3; $i++)
{
$msg = '消息' . $i;
// 发送消息,同时为消息指定routing key,成功返回true,失败false
$state = $exchange->publish($msg, 'test.rt.ack');
if($state)
{
echo 'Success' . PHP_EOL;
}else
{
echo 'Fail' . PHP_EOL;
}
} // 关闭连接
//$con->disconnect();
}catch(\Exception $e)
{
echo $e->getMessage();
}

自动 ACK

consumer.php

$conConfig = [
'host' => '127.0.0.1',
'port' => 5672,
'login' => 'root',
'password' => 'root',
'vhost' => '/'
]; try
{
$con = new AMQPConnection($conConfig);
$con->connect(); $channel = new AMQPChannel($con);
$exchange =new AMQPExchange($channel);
$exchange->setName('test.exchange.ack');
$exchange->setType('direct');
$exchange->declareExchange(); $queue = new AMQPQueue($channel);
$queue->setName('test.ack.queue');
// 声明队列同时返回队列中的消息数量
$messageCount = $queue->declareQueue();
echo '消息数量: ' . $messageCount . PHP_EOL;
$queue->bind('test.exchange.ack', 'test.rt.ack'); // 获取消息后进行自动应答时, get方法的参数设置为AMQP_AUTOACK即可
while($msgEnvelope = $queue->get(AMQP_AUTOACK))
{
$msg = $msgEnvelope->getBody();
echo $msg . PHP_EOL;
}
$con->disconnect();
}catch(Exception $e)
{
echo $e->getMessage();
}

将 Queue 的 get 方法参数设置为 AMQP_AUTOACK 即可在获取到消息后自动发送消息已收到响应.

手动 ack

如果不需要自动 ack, 而是根据实际的业务处理结果进行处理. Queue 的 get 方法参数修改为 AMQP_NOPARM 即可.

修改后推送三条消息:

连续两次从队列中获取消息:

如果不进行 ack, 队列中的消息将一直存在, 可以反复获取.

继续修改 while 循环为:

while($msgEnvelope = $queue->get(AMQP_NOPARAM))
{
$msg = $msgEnvelope->getBody();
if(preg_match("/.*?消息2.*?/", $msg))
{
// 对消息2执行确定响应
$queue->ack($msgEnvelope->getDeliveryTag());
}
echo date('Y-m-d H:i:s') . ' ' . $msg . PHP_EOL;
}

连续执行两次 comsumer.php:

第一次获取到 3 条消息, 但第一次执行中对消息 2 执行了确认响应, 剩余消息不进行确认响应. 第二次执行中只获取到剩余消息.

NACK (否定响应)

如果既不想对消息执行确定响应, 也不需要消息继续出现在队列中, 可以使用 Queue 的 nack 方法. 继续修改 while 循环:

while($msgEnvelope = $queue->get(AMQP_NOPARAM))
{
$msg = $msgEnvelope->getBody();
if(preg_match("/.*?消息2.*?/", $msg))
{
// 对消息2执行确定响应
$queue->ack($msgEnvelope->getDeliveryTag());
}else
{
$queue->nack($msgEnvelope->getDeliveryTag());
}
echo date('Y-m-d H:i:s') . ' ' . $msg . PHP_EOL;
}

推送 3 条消息后, 连续执行两次 consumper.php:

第一次执行, 获取到 3 条数据; 第二次执行未获取到任何数据. nack 方法除了可以从队列中过滤掉不需要的方法, 也可以将暂时不需要的方法重新放回队列, 将该方法的调用修改为:

$queue->nack($msgEnvelope->getDeliveryTag(), AMQP_REQUEUE);

注意: nack 方法将消息放回队列后, 队列会将消息再次推送给消费者. 如果此时队列只有一个消费者, 将会造成死循环.

最新文章

  1. 【原创】js实现一个可随意拖拽排序的菜单导航栏
  2. WPF 程序Form自的控件自适应方式之一
  3. HTTP 战役 与 历史
  4. PHP+微信分享自定义小图标
  5. c#中的protected和internal
  6. 利用Rsync在windows和linux之间同步数据
  7. 【Hololens】微软Hololens虚拟现实视频集
  8. Core 核心标签库-&gt;运算式操作
  9. 再谈 SharePoint 大局观
  10. Spark源码剖析(七):Job触发流程原理与源码剖析
  11. 安装vue错误详情解决办法
  12. 章节四、4-For循环
  13. Jmeter 自动化测试报告扩展
  14. Linux TCP并发请求溺出 调优
  15. GDOI2018 Day1 题目总结
  16. javascript基础拾遗(十三)
  17. lynis-*nix安全审计
  18. ASP.NET MVC5 学习系列之初探MVC
  19. Memcachedclient-XMemcached使用
  20. XP系统下 VS2010 选中行崩溃

热门文章

  1. Ubuntu 14.04.1 配置 Android 源码开发环境(jdk版本切换)(转载)
  2. ubuntu 12.04上安装QQ2013(转载)
  3. [软件安装]JDK
  4. Swift4 函数, 元组, 运算符
  5. bzoj 3784: 树上的路径【点分治+st表+堆】
  6. React实战之将数据库返回的时间转换为几分钟前、几小时前、几天前的形式。
  7. 利用OneDNS同步chrome数据
  8. 【微信公众号开发】根据openId群发消息
  9. ObjectARX2012错误1 fatal error C1083: 无法打开包括文件:“arxHeaders.h”: No such file or directory; fatal error C1083: 无法打开包括文件:“map”: No such file or directory
  10. Python variable 作用域和初始化