一、

消息的广播需要exchange:exchange是一个转发器,其实把消息发给RabbitMQ里的exchange

fanout: 所有bind到此exchange的queue都可以接收消息,广播

direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息

topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息

headers:通过headers来决定把消息发给哪些queue,用的比较少

原理:消息发布端分别发送INFO,WARNING,ERROR类型的消息,C1 C2 C3订阅了不同类型的消息

消息发送端:

'''
发布者publisher
'''
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct') # 1、改成type='direct'
# 2、默认发送的消息级别为info,可以带参数,warning error等
severity = sys.argv[1] if len(sys.argv) > 1 else "info" message = ' '.join(sys.argv[2:]) or " Hello World!"
channel.basic_publish(exchange='direct_logs',
routing_key=severity, # 3、把上面的消息发到这个queue中
body=message)
print("send :", message)
connection.close()

消息订阅者:

'''
订阅者subscriber
'''
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct') # 4、改exchange的类型
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue # 5、启动订阅端的时候,severities存放订阅端订阅了哪些级别
# 然后用routing_key把这些级别绑定到queue上,这些queue就放这些级别的消息
severities = sys.argv[1]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print("Wait for logs...")
# 6、使用method.routing_key可以得到消息的级别
def callback(ch, method, properties, body):
print("received:", method.routing_key, body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()

运行过程:

'''
启动订阅者1: python subscriber.py info
启动订阅者2:python subscriber.py info error
启动发布者1:python publisher.py info hello
启动发布者2:python publisher.py error servicesdown
订阅者1收到消息:info b'hello'
订阅者2收到消息:info b'hello' error b'servicesdown'
'''

最新文章

  1. Junit4参数化测试实现程序与用例数据分离
  2. Sharepoint 2010 创建栏 计算栏
  3. svn Error: post-commit hook failed (exit code 127) with output
  4. _EPROCESS结构简单了解!
  5. [开发笔记]-WindowsService服务程序开发
  6. Android 订阅-发布者模式-详解
  7. 修改Android 程序的icon快捷方式图标和名称
  8. JAVA小笔记
  9. 【录音】Android录音--AudioRecord、MediaRecorder
  10. 在mangento后台调用wysiwyg编辑器
  11. java问题诊断
  12. .net 平台下, Socket通讯协议中间件设计思路(附源码)
  13. elasticsearch 常用命令
  14. PowerDesigner大小写转换
  15. echarts饼图去除鼠标移入高亮
  16. Analysis Services 中的服务器属性配置
  17. CF617E XOR and Favorite Number
  18. 性能调优9:根据WaitType诊断性能
  19. LeetCode:149_Max Points on a line | 寻找一条直线上最多点的数量 | Hard
  20. 弹窗中修改select默认值遇到的问题

热门文章

  1. zabbix备份数据库
  2. Shader 入门笔记(二) CPU和GPU之间的通信,渲染流水线
  3. 三种方式给apt设置代理
  4. .NET应用加载容器Glue4Net
  5. 详解intellij idea搭建SSM框架(spring+maven+mybatis+mysql+junit)(上)
  6. 什么是LogDashboard?
  7. RabbitMQ消息队列(九)-通过Headers模式分发消息(.Net Core版)
  8. springboot情操陶冶-初识springboot
  9. Perl进程:僵尸进程和孤儿进程
  10. 服务器配置https