延迟任务应用场景

场景一:物联网系统经常会遇到向终端下发命令,如果命令一段时间没有应答,就需要设置成超时。

场景二:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。

场景三:过1分钟给新注册会员的用户,发送注册邮件等。

php 使用rabbitmq-delayed-message-exchange插件实现延迟功能

1.安装

下载后解压,并将其拷贝至(使用Linux Debian/RPM部署)rabbitmq服务器目录:/usr/local/rabbitmq/plugins中( windows安装目录\rabbitmq_server-version\plugins ).

2.启用插件

使用命令rabbitmq-plugins enable rabbitmq_delayed_message_exchang启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchang

输出如下:

The following plugins have been enabled:
rabbitmq_delayed_message_exchange

通过rabbitmq-plugins list查看已安装列表,如下:

...
[ ] rabbitmq_delayed_message_exchange 20171215-3.6.x
...

3.机制解释

安装插件后会生成新的Exchange类型x-delayed-message,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia(一个分布式数据系统)表中,检测消息延迟时间,如达到可投递时间时并将其通过x-delayed-type类型标记的交换机类型投递至目标队列。

4.php实现过程

消费者 delay_consumer2.php:

<?php

//header('Content-Type:text/html;charset=utf8;');

$params = array(
'exchangeName' => 'delayed_exchange_test',
'queueName' => 'delayed_queue_test',
'routeKey' => 'delayed_route_test',
);
$connectConfig = array(
'host' => 'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest',
'vhost' => '/'
); //var_dump(extension_loaded('amqp')); //exit(); try {
$conn = new AMQPConnection($connectConfig);
$conn->connect();
if (!$conn->isConnected()) {
//die('Conexiune esuata');
//TODO 记录日志
echo 'rabbit-mq 连接错误:', json_encode($connectConfig);
exit();
}
$channel = new AMQPChannel($conn);
if (!$channel->isConnected()) {
// die('Connection through channel failed');
//TODO 记录日志
echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
exit();
}
$exchange = new AMQPExchange($channel);
//$exchange->setFlags(AMQP_DURABLE);//声明一个已存在的交换器的,如果不存在将抛出异常,这个一般用在consume端
$exchange->setName($params['exchangeName']);
$exchange->setType('x-delayed-message'); //x-delayed-message类型
/*RabbitMQ常用的Exchange Type有三种:fanout、direct、topic。       fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。       direct:把消息投递到那些binding key与routing key完全匹配的队列中。       topic:将消息路由到binding key与routing key模式匹配的队列中。*/
$exchange->setArgument('x-delayed-type','direct');
$exchange->declareExchange(); //$channel->startTransaction(); $queue = new AMQPQueue($channel);
$queue->setName($params['queueName']);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue(); //绑定
$queue->bind($params['exchangeName'], $params['routeKey']);
} catch(Exception $e) {
echo $e->getMessage();
exit();
} function callback(AMQPEnvelope $message) {
global $queue;
if ($message) {
$body = $message->getBody();
echo '接收时间:'.date("Y-m-d H:i:s", time()). PHP_EOL;
echo '接收内容:'.$body . PHP_EOL;
//为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息
$queue->ack($message->getDeliveryTag());
} else {
echo 'no message' . PHP_EOL;
}
} //$queue->consume('callback'); 第一种消费方式,但是会阻塞,程序一直会卡在此处 //第二种消费方式,非阻塞
/*$start = time();
while(true)
{
$message = $queue->get();
if(!empty($message))
{
echo $message->getBody();
$queue->ack($message->getDeliveryTag()); //应答,代表该消息已经消费
$end = time();
echo '<br>' . ($end - $start);
exit();
}
else
{
//echo 'message not found' . PHP_EOL;
}
}*/ //注意:这里需要注意的是这个方法:$queue->consume,queue对象有两个方法可用于取消息:consume和get。前者是阻塞的,无消息时会被挂起,适合循环中使用;后者则是非阻塞的,取消息时有则取,无则返回false。
//就是说用了consume之后,会同步阻塞,该程序常驻内存,不能用nginx,apache调用。
$action = '2'; if($action == '1'){
$queue->consume('callback'); //第一种消费方式,但是会阻塞,程序一直会卡在此处
}else{
//第二种消费方式,非阻塞
$start = time();
while(true)
{
$message = $queue->get();
if(!empty($message))
{
echo '接收时间:'.date("Y-m-d H:i:s", time()). PHP_EOL;
echo '接收内容:'.$message->getBody().PHP_EOL;
$queue->ack($message->getDeliveryTag()); //应答,代表该消息已经消费
$end = time();
echo '运行时间:'.($end - $start).'秒'.PHP_EOL;
//exit();
}
else
{
//echo 'message not found' . PHP_EOL;
}
}
}

生产者delay_publisher2.php:

<?php

//header('Content-Type:text/html;charset=utf-8;');

$params = array(
'exchangeName' => 'delayed_exchange_test',
'queueName' => 'delayed_queue_test',
'routeKey' => 'delayed_route_test',
); $connectConfig = array(
'host' => 'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest',
'vhost' => '/'
); //var_dump(extension_loaded('amqp')); 判断是否加载amqp扩展
//exit();
try {
$conn = new AMQPConnection($connectConfig);
$conn->connect();
if (!$conn->isConnected()) {
//die('Conexiune esuata');
//TODO 记录日志
echo 'rabbit-mq 连接错误:', json_encode($connectConfig);
exit();
}
$channel = new AMQPChannel($conn);
if (!$channel->isConnected()) {
// die('Connection through channel failed');
//TODO 记录日志
echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
exit();
}
$exchange = new AMQPExchange($channel);
$exchange->setName($params['exchangeName']);
$exchange->setType('x-delayed-message'); //x-delayed-message类型
/*RabbitMQ常用的Exchange Type有三种:fanout、direct、topic。       fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。       direct:把消息投递到那些binding key与routing key完全匹配的队列中。       topic:将消息路由到binding key与routing key模式匹配的队列中。*/
$exchange->setArgument('x-delayed-type','direct');
$exchange->declareExchange(); //$channel->startTransaction();
//RabbitMQ不容许声明2个相同名称、配置不同的Queue,否则报错
$queue = new AMQPQueue($channel);
$queue->setName($params['queueName']);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue(); //绑定队列和交换机
$queue->bind($params['exchangeName'], $params['routeKey']);
//$channel->commitTransaction();
} catch(Exception $e) { } for($i=5;$i>0;$i--){
//生成消息
echo '发送时间:'.date("Y-m-d H:i:s", time()).PHP_EOL;
echo 'i='.$i.',延迟'.$i.'秒'.PHP_EOL;
$message = json_encode(['order_id'=>time(),'i'=>$i]);
$exchange->publish($message, $params['routeKey'], AMQP_NOPARAM, ['headers'=>['x-delay'=> 1000*$i]]);
sleep(2);
}
$conn->disconnect();

对于代码来讲,首先对于消费者核心代码

$exchange->setType('x-delayed-message'); //x-delayed-message类型
$exchange->setArgument('x-delayed-type','direct');

生产者核心代码

$exchange = new AMQPExchange($channel);
$exchange->setName($params['exchangeName']);
$exchange->setType('x-delayed-message'); //x-delayed-message类型
$exchange->setArgument('x-delayed-type','direct');
$exchange->declareExchange();

使用方法:先运行delay_consumer1.php,再运行delay_publisher1.php

运行效果:

最新文章

  1. Tsinsen A1493 城市规划(DP + CDQ分治 + NTT)
  2. MongoDB-3.2.6 副本集 和主从
  3. MSIL 教程(二):数组、分支、循环、使用不安全代码和如何调用Win32 API(转)
  4. 洛谷P2751 [USACO4.2]工序安排Job Processing
  5. MVC+MEF+UnitOfWork+EF架构,网站速度慢的原因总结!(附加ANTS Memory Profiler简单用法)
  6. 冒泡排序 JAVA版
  7. 现代3D图形编程学习-你好,三角形(译)
  8. Day 18: 记filebeat内存泄漏问题分析及调优
  9. saltstack主机管理项目:计主机管理项目命令分发器(三)
  10. SonarLint 代码质量管理
  11. [ZOJ 4024] Peak
  12. MySQL之You can't specify target table for update in FROM clause解决办法
  13. 使用dstat命令的插件查看mysql的io状态
  14. sql server数据库查看锁表和解锁
  15. 使用DebugView小工具调试已部署的.net程序 (转)
  16. fnmatch源码阅读
  17. 前端学习之HTML(1)
  18. python3 openpyxl基本操作
  19. javascript 将内容复制到剪贴板
  20. 使用open_read_write等底层函数来赋值一个文件

热门文章

  1. python - django (路由)
  2. 模拟赛20181031 雅礼 Wearry 养花 折射 画作
  3. SpringBoot学习(四)开发web应用
  4. MySQL常用五大引擎的区别
  5. Kubernetes 学习11 kubernetes ingress及ingress controller
  6. C语言指针的使用例子(1)指针地址的输出
  7. leetcode解题报告(33): Find All Numbers Disappeared in an Array
  8. Maven+SSM整合ehcache
  9. UOJ#318. 【NOI2017】蔬菜 贪心
  10. shell history 命令