对于 Message 的 routing_key 字符串格式是有限制的:以点号"."分割的字符表,如php.laravel,并且长度不能超过 255 个字节。

对于 routing_key 而言,有两个特殊字符:

  • *:代表任意单词
  • #:代表0个或多个单词

Topic Exchange 与其他 Exchange 的转化:

  • routing_key 是 #,会接收所有 Message,此时等同于 Fanout Exchange;
  • routing_key 不包含 # 或 *,则等同于 Direct Exchange

整合代码

emit_log_topic.php

<?php

/**
* 发送消息
*/ $exchangeName = 'topic_logs';
$topic = empty($argv[1]) ? 'anonymous.info' : $argv[1]; // 主题
$message = empty($argv[2]) ? 'Hello World!' : $argv[2]; // 建立TCP连接
$connection = new AMQPConnection([
'host' => 'localhost',
'port' => '5672',
'vhost' => '/',
'login' => 'guest',
'password' => 'guest'
]);
$connection->connect() or die("Cannot connect to broker!\n"); $channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_TOPIC);
$exchange->declareExchange(); $exchange->publish($message, $topic);
echo "Message is sent: " . $message . "\n";
$connection->disconnect();

receive_logs_topic.php

<?php

/**
* 接收消息
*/ $exchangeName = 'topic_logs';
$topic = $argv[1]; // 建立TCP连接
$connection = new AMQPConnection([
'host' => 'localhost',
'port' => '5672',
'vhost' => '/',
'login' => 'guest',
'password' => 'guest'
]);
$connection->connect() or die("Cannot connect to broker!\n"); $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_TOPIC);
$exchange->declareExchange(); $queue = new AMQPQueue($channel);
$queue->setFlags(AMQP_EXCLUSIVE);
$queue->declareQueue();
$queue->bind($exchangeName, $topic); echo "Waiting for logs...\n";
while (TRUE) {
$queue->consume('processLogs');
} $connection->disconnect(); function processLogs($envelope, $queue) {
$logs = $envelope->getBody();
var_dump("Received: " . $logs);
$queue->ack($envelope->getDeliveryTag()); // 手动发送ACK应答
}

先运行脚本:

php receive_logs_topic.php *.laravel

然后再运行另外一个脚本:

php emit_log_topic.php php.laravel

效果展示:

最新文章

  1. alias拦截器的使用
  2. Centos下防止暴力破解密码 - Denyhost
  3. linux中安装eclipse,安装好之后不能直接建servlet,不能直接在jsp页面中run on server.权限在作怪,我猜的,
  4. Mysql数据库优化
  5. 用C语言画一个“爱心”
  6. SQL Server 一些关键字详解(一)
  7. JS全局屏蔽回车事件
  8. RichTextBox控件-主要用于输入输出编辑文本信息
  9. 基于SuperSocket实现的WebSocket(前端)
  10. VCL线程的同步方法 Synchronize(用消息来同步)
  11. HTML转义字符大全(转)
  12. 隔一段时间应用就会挂掉(进程在,但停止响应,也无log输出),必须重启tomcat
  13. 关于java反射获取泛型
  14. HK2框架的简单自实现kunJ
  15. 一、Python入门
  16. node,Yeoman,Bower,Grunt的简介及安装
  17. 002_cookie的session_id解释
  18. glog日志库使用笔记
  19. 2017-12-21 FriceEngine试用与API中文化
  20. C# TextBox猜想输入和历史记录输入

热门文章

  1. 原生js获取display属性注意事项
  2. 结构型模式(二) 桥接模式(Bridge)
  3. 查看mysql日志文件
  4. [NgRx 8] Basic of NgRx8
  5. php自定义函数之内部函数
  6. orm-配置不启动项目自动查询orm
  7. div双击全屏,再双击恢复到原来的状态vue,js来做
  8. Java利用FastJson一行代码转List&lt;Map&gt;为List&lt;Bean&gt;
  9. 咏南中间件随LINUX开机自动启动
  10. TThread.Queue和TThread.Synchronize的区别