安装python rabbitMQ module

pip instal pika

发布者:

#!/usr/bin/env  python
#coding:utf8 import pika connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel=connection.channel() #######################发布者##########################
channel.queue_declare(queue='hello')#队列名称
channel.basic_publish(exchange='',#交换器
routing_key='hello',#发布的消息绑定到指定的队列
body='hello,world!'#消息内容
) print "[x] Sent 'Hello,world!'"
connection.close()

订阅者

#!/usr/bin/env  python
#coding:utf8 import pika
import time connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))#连接rabbitmq-server的IP
channel=connection.channel()#打开通道 #######################消费者##########################
channel.queue_declare(queue='hello')#创建队列
#如果这行代码注释,程序运行的时候会找指定的hello队列,如果hello不存在则会报错.
#如果server端口先启动并且创建了hello队列,这里运行的时候可以不创建队列.因为server已创建 def callback(ch,method,properties,body):
print(" [x] Received %r" % body)
import time
time.sleep(10)
print 'ok'
ch.basic_ack(delivery_tag = method.delivery_tag)#处理完消息之后会发送ack信号 channel.basic_consume(callback,#往队列取回消息之后执行回调函数
queue='hello',#队列名称
no_ack=False,
)
#no_ack=False 保证消息不丢失.如果消费者收到消息后正在处理,此时网络中断或机器断电.RabbitMQ会重新将该任务添加到队列中
channel.start_consuming()#关闭连接

2.消息持久化

如果生产者往队列中发布了一条消息,此时rabbitmq服务断开了。服务恢复后消费者会收不到消息。解决办法请看下面代码:

#!/usr/bin/env  python
#coding:utf8 import pika connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel=connection.channel() #######################发布者##########################
channel.queue_declare(queue='hello1',durable=True)#创建队列 #保证消息在rabbitmq里面不丢失需要满足2点:
#1.队列持久化 durable=True
#2.发送的消息持久化
# properties=pika.BasicProperties(
# delivery_mode=2, # make message persistent
# ) channel.basic_publish(exchange='',#交换器
routing_key='hello1',#发布的消息绑定到指定的队列
body='hello,world!',#消息内容
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
)) print "[x] Sent 'Hello,world!'"
connection.close()
#!/usr/bin/env  python
#coding:utf8 import pika
import time connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))#连接rabbitmq-server的IP
channel=connection.channel()#打开通道 #######################消费者##########################
channel.queue_declare(queue='hello1',durable=True)#创建队列
#如果这行代码注释,程序运行的时候会找指定的hello队列,如果hello不存在则会报错.
#如果server端口先启动并且创建了hello队列,这里运行的时候可以不创建队列.因为server已创建
#队列持久化 durable=True def callback(ch,method,properties,body):
print(" [x] Received %r" % body)
import time
# time.sleep(10)
# print 'ok'
ch.basic_ack(delivery_tag = method.delivery_tag)#处理完消息之后会发送ack信号 channel.basic_consume(callback,#往队列取回消息之后执行回调函数
queue='hello1',#队列名称
no_ack=False,
)
#no_ack=False 保证消息不丢失.如果消费者受到消息后正在处理,此时网络中断或机器断电.消息还保存在队列不会丢失.
channel.start_consuming()#关闭连接

3、消息获取顺序

默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。

channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列

消费者代码:

#!/usr/bin/env  python
#coding:utf8 import pika
import time connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))#连接rabbitmq-server的IP
channel=connection.channel()#打开通道 #######################消费者##########################
channel.queue_declare(queue='hello1',durable=True)#创建队列
#如果这行代码注释,程序运行的时候会找指定的hello队列,如果hello不存在则会报错.
#如果server端口先启动并且创建了hello队列,这里运行的时候可以不创建队列.因为server已创建
#队列持久化 durable=True def callback(ch,method,properties,body):
print(" [x] Received %r" % body)
import time
# time.sleep(10)
# print 'ok'
ch.basic_ack(delivery_tag = method.delivery_tag)#处理完消息之后会发送ack信号 channel.basic_qos(prefetch_count=1) channel.basic_consume(callback,#往队列取回消息之后执行回调函数
queue='hello1',#队列名称
no_ack=False,
)
#no_ack=False 保证消息不丢失.如果消费者受到消息后正在处理,此时网络中断或机器断电.消息还保存在队列不会丢失.
channel.start_consuming()#关闭连接

4.发布者订阅者,一对多发消息:

#!/usr/bin/env  python
#coding:utf8 import pika connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel=connection.channel() #######################发布者##########################
channel.exchange_declare(exchange='logs', #交换器名字
type='fanout')#类型 channel.basic_publish(exchange='logs',#交换器
routing_key='',
body='hello,world!',#消息内容
)
#发布者把消息放到交换器,交换器把消息发布到每个队列中.~ 订阅者从交换器绑定的队列中取数据 print "[x] Sent 'Hello,world!'"
connection.close()
#!/usr/bin/env  python
#coding:utf8 import pika
import time connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))#连接rabbitmq-server的IP
channel=connection.channel()#打开通道 #######################订阅者########################## channel.exchange_declare(exchange='logs',#交换器名字
type='fanout')#类型 result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
#生成随机队列 channel.queue_bind(exchange='logs',
queue=queue_name)
#把生成的随机队列绑定到交换器 def callback(ch,method,properties,body):
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag = method.delivery_tag)#处理完消息之后会发送ack信号 channel.basic_consume(callback,#往队列取回消息之后执行回调函数
queue=queue_name,#从队列中取数据
no_ack=False,
)
#no_ack=False 保证消息不丢失.如果消费者受到消息后正在处理,此时网络中断或机器断电.消息还保存在队列不会丢失.
channel.start_consuming()#关闭连接

关键字发送

exchange type = direct

之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

#!/usr/bin/env  python
#coding:utf8 import pika connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel=connection.channel() #######################发布者##########################
channel.exchange_declare(exchange='dicect_logs',
type='direct') #对绑定db关键字的队列发送数据
channel.basic_publish(exchange='dicect_logs',
routing_key='db',
body='db hello,world!',
) #对绑定web关键字的队列发送数据
channel.basic_publish(exchange='dicect_logs',
routing_key='web',
body='web hello,world!',
) print "[x] Sent 'Hello,world!'"
connection.close()

订阅者1

#!/usr/bin/env  python
#coding:utf8 import pika
import time connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))#连接rabbitmq-server的IP
channel=connection.channel() #######################订阅者1##########################
channel.exchange_declare(exchange='dicect_logs',
type='direct',
) result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue channel.queue_bind(exchange='dicect_logs',
queue=queue_name,
routing_key='db',#绑定db关键字
) def callback(ch,method,properties,body):
print(" [x] Received %r:%r" %(method.routing_key,body))
ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback,
queue=queue_name,
no_ack=False,
)
channel.start_consuming()

订阅者2

#!/usr/bin/env  python
#coding:utf8 import pika
import time connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))#连接rabbitmq-server的IP
channel=connection.channel()#打开通道 #######################订阅者2########################## channel.exchange_declare(exchange='dicect_logs',
type='direct',
) result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue channel.queue_bind(exchange='dicect_logs',
queue=queue_name,
routing_key='web',#绑定web关键字
) def callback(ch,method,properties,body):
print(" [x] Received %r:%r" %(method.routing_key,body))
ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback,
queue=queue_name,
no_ack=False,
)
channel.start_consuming()

执行结果:

发布者:

[x] Sent 'Hello,world!'

订阅者1:

[x] Received 'db':'db hello,world!'

订阅者2:

[x] Received 'web':'web hello,world!'

#每个订阅者只能收到指定关键字的消息,订阅者1收到db相关的消息,订阅者2收到web相关的消息

6、模糊匹配

exchange type = topic

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

  • # 表示可以匹配 0 个 或 多个 单词
  • *  表示只能匹配 一个 单词
mysql.error.log       mysql.#            可以匹配
mysql.error.log       mysql.*            只能匹配到 mysql.error
mysql.error.log      mysql.error.*     可以匹配到 mysql.error.log 

发布者代码:

#!/usr/bin/env python
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='topic_logs',
type='topic') message = 'hello,world'
channel.basic_publish(exchange='topic_logs',
routing_key='mysql.error.log',
body=message) print(" [x] Sent %r" % (message))
connection.close()

订阅者代码:

#!/usr/bin/env python
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='topic_logs',
type='topic') result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key='mysql.#') print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] %r" % body) channel.basic_consume(callback,
queue=queue_name,
no_ack=True) channel.start_consuming()

最新文章

  1. Redis Cluster
  2. python 学习笔记 8(闭包)
  3. SNMP 和 NetBios协议理解
  4. Android Studio 单刷《第一行代码》系列 05 —— Fragment 基础
  5. highcharts 柱状图
  6. powerdesigener 12.5注册机
  7. 私有云Rabbitmq 集群部署
  8. PhpStorm11.0 配置在浏览器中打开文件
  9. Exception和RuntimeException的区别
  10. nyoj_600:花儿朵朵(树状数组+坐标离散化)
  11. 新博客在SEO方面需要注意哪几点?
  12. 面试之路(28)-反转链表(reverse ListNode)
  13. android 请求接口报错 org.apache.http.conn.HttpHostConnectException: Connection to http://192.168.1.90:9090 refused
  14. 深入理解 Python 中的上下文管理器
  15. 【算法】C语言趣味程序设计编程百例精解
  16. Arch Linux中安装Anaconda
  17. mysql5.7 root用户默认密码
  18. 整数中1出现的次数(从1到n整数中1出现的次数)(python)
  19. 【Codeforces 1132D】Stressful Training
  20. three.js 相机camera位置属性设置详解

热门文章

  1. 关于rem的自定义HTML比例设定
  2. RealProxy实现AOP编程(1)
  3. ionic slidebox 嵌套问题
  4. SQL server2008-对象资源管理器
  5. 各种报错各种坑 webpack让我在学习的过程中一度想要放弃
  6. Myeclipse非正常关闭出现问题
  7. 小结MapReduce 程序的流程及设计思路
  8. Install Debian note
  9. Dubbo框架
  10. NewQuant的设计(一)——整体的领域设计