PHP实现RabbitMQ的Publish/Subscribe
2024-09-06 02:50:28
<?php
/**
* Created by PhpStorm.
* User: 豆腐居士
* Date: 2018/5/30
* Time: 上午11:01
*/ class AqiTask extends BaseTask
{
const EX_NAME = 'aqi_fanout';
function clientAction()
{
$conn = new AMQPConnection([
'host' => '0.0.0.0',
'port' => 5672,
'login' => 'wangwang',
'password' => '123456'
]);
if (!$conn->connect()) {
exit('fail');
}
$channel = new AMQPChannel($conn);
$ex = new AMQPExchange($channel);
$ex->setName(self::EX_NAME);
$ex->setType(AMQP_EX_TYPE_FANOUT); //direct类型
$ex->setFlags(AMQP_DURABLE); //持久化
$ex->declareExchange();
for ($i = 0; $i < 10; $i++) {
$msg = 'hello fanout:' . $i;
echo $msg . PHP_EOL;
$ex->publish($msg);
}
$conn->disconnect();
} function server1Action()
{
$conn = new AMQPConnection([
'host' => '0.0.0.0',
'port' => 5672,
'login' => 'wangwang',
'password' => '123456'
]);
if (!$conn->connect()) {
exit('fail');
}
$channel = new AMQPChannel($conn);
$ex = new AMQPExchange($channel);
$ex->setName(self::EX_NAME);
$ex->setType(AMQP_EX_TYPE_FANOUT); //fanout类型
$ex->setFlags(AMQP_DURABLE); //持久化
$ex->declareExchange(); $queue = new AMQPQueue($channel);
$queue->setName('q1');
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$queue->bind($ex->getName());try {
$queue->consume(function ($envelope, $q) {
echo 'server1:' . $envelope->getBody(), PHP_EOL;
$q->ack($envelope->getDeliveryTag()); //这里是手动应答 如果使用默认的自动应答,进程退出后消息会丢失
});
} catch (\Exception $e) {
echo $e->getMessage();
}
} function server2Action()
{
$conn = new AMQPConnection([
'host' => '0.0.0.0',
'port' => 5672,
'login' => 'wangwang',
'password' => '123456'
]);
if (!$conn->connect()) {
exit('fail');
} $channel = new AMQPChannel($conn);
$ex = new AMQPExchange($channel);
$ex->setName(self::EX_NAME);
$ex->setType(AMQP_EX_TYPE_FANOUT);
$ex->setFlags(AMQP_DURABLE);
$ex->declareExchange(); $queue = new AMQPQueue($channel);
$queue->setName('q2');
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$queue->bind($ex->getName());$queue->consume(function ($envelope, $q) {
echo 'server2:' . $envelope->getBody(), PHP_EOL;
$q->ack($envelope->getDeliveryTag());
});
}
}
最新文章
- Python
- 简单阐述下OC中UIImage三种创建方式~~~
- python pip 升级每个包
- Jenkins构建Git manager服务器的源码
- ACM竞赛常用STL(二)之STL--algorithm
- sed和awk最佳入门教程
- 深入研究B树索引(一)
- Hibernate 、多表关联映射 - 一对一关系映射(one- to-one)
- Linux文件属性上
- intel服务器cpu命名规则
- 【Bootstrap简单用法】
- IntelliJ IDEA运行慢解决方法
- Maven 构建浏览器解析userAgent类
- js编译原理(你不知道的javascript)
- Vc数据库编程基础1
- SQL结构化查询语句
- Python 进制转换 二进制 八进制 十进制 十六进制
- Pandas plot出图
- create-react-app创建的项目npm run build之后静态文件找不到
- poj3237树链剖分边权+区间取负
热门文章
- Django admin模块使用search时报错:django.core.exceptions.FieldError: Related Field got invalid lookup: contains
- laravel5.2总结--软删除
- User Account Control
- RNNs在股票价格预测的应用
- 无法启动此程序,因为计算机中丢失OgreMain_d.dll。尝试重新安装该程序以解决此问题。
- STL学习笔记3--deque
- BZOJ 3876 支线剧情(有上下界的无源汇最小费用可行流)
- BZOJ 3150 [Ctsc2013]猴子 ——期望DP 高斯消元
- 洛谷P3245 [HNOI2016]大数 【莫队】
- imx6 PCIE使能加载ath9k无线网卡