RabbitMQ介绍



说明:

Consumer (消费者):使用队列 Queue 从 Exchange 中获取消息的应用。

Exchange (交换机):负责接收生产者的消息并把它转到到合适的队列。

Queue (队列):一个存储Exchange 发来的消息的缓冲,并将消息主动发送给Consumer,或者 Consumer 主动来获取消息。

Binding (绑定):队列 和 交换机 之间的关系。Exchange 根据消息的属性和 Binding 的属性来转发消息。绑定的一个重要属性是 binding_key。

Connection (连接)和 Channel (通道):生产者和消费者需要和 RabbitMQ 建立 TCP 连接。一些应用需要多个connection,为了节省TCP 连接,可以使用 Channel,它可以被认为是一种轻型的共享 TCP 连接的连接。连接需要用户认证,并且支持 TLS (SSL)。连接需要显式关闭。

Message (消息): RabbitMQ 转发的二进制对象,包括Headers(头)、Properties (属性)和 Data (数据),其中数据部分不是必要的。Producer(生产者): 消息的生产者,负责产生消息并把消息发到交换机


RabbitMQ安装

第一种安装方式:

官网下载地址:https://www.rabbitmq.com/download.html

第二种安装方式:

使用APT库

 deb http://www.rabbitmq.com/debian/ testing main
wget http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
sudo apt-key add rabbitmq-signing-key-public.asc
apt-get update
sudo apt-get install rabbitmq-server

Python操作RabbitMQ

实现简单消息队列:



图解:生产者(producer)把消息发送到一个名为“hello”的队列中。消费者(consumer)从这个队列中获取消息。

生产者(producer.py)全部代码

#!/usr/bin/env python
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()

消费者(consumer.py)全部代码

#!/usr/bin/env python
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.queue_declare(queue='hello') print ' [*] Waiting for messages. To exit press CTRL+C' def callback(ch, method, properties, body):
print " [x] Received %r" % (body,) channel.basic_consume(callback,
queue='hello',
no_ack=True) channel.start_consuming()

首先在终端中运行我们的 producer.py 程序:

$ python producer.py
[x] Sent 'Hello World!'

然后,运行 consumer.py 程序

$ python consumer.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Hello World!'

工作队列(任务队列)

图解:当我们把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。

工作队列(任务队列-Task Queues)是为了避免等待一些占用大量资源、时间的操作,它会发送一些耗时的任务给多个工作者(Worker)。

这个概念在网络应用中是非常有用的,它可以在短暂的HTTP请求中处理一些复杂的任务。

注意:此处使用默认交换机

生产者(producer.py)全部代码

#coding: utf-8
'''
循环调度:
使用工作队列的一个好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了,扩展很简单。
若有多个消费者,即打开多个终端,运行消费者程序。默认来说,RabbitMQ会按顺序得把消息发送给每个消费者(consumer)。平均每个消费者都会收到同等数量得消息。这种发送消息得方式叫做——轮询(round-robin)。
''' import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) #durable=True 表示为了不让队列消失,需要把队列声明为持久化 message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, #我们需要把我们的消息也要设为持久化——将delivery_mode的属性设为2。
)) print(" [x] Sent %r" % (message,))
connection.close()

消费者(consumer.py)全部代码

#coding: utf-8
import pika
import time
#使用time.sleep()函数来模拟占用大量资源、时间的操作
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel() #如果你没有特意告诉RabbitMQ,那么在它退出或者崩溃的时候,将会丢失所有队列和消息。为了确保信息不会丢失,有两个事情是需要注意的:我们必须把“队列”和“消息”设为持久化。
channel.queue_declare(queue='task_queue', durable=True) #durable=True 表示为了不让队列消失,需要把队列声明为持久化
#这个queue_declare必须在生产者(producer)和消费者(consumer)对应的代码中修改。
#这时候,我们就可以确保在RabbitMq重启之后queue_declare队列不会丢失。
print(" [*] Waiting for message. To exit press CTRL+C") def callback(ch, method, properties, body):
print(" [x] Received %r" % (body,))
#time.sleep(body.count('.'))
print(" [x] Done") #一个很容易犯的错误就是忘了basic_ack,后果很严重。
#消息在你的程序退出之后就会重新发送,如果它不能够释放没响应的消息,RabbitMQ就会占用越来越多的内存。
ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) #我们可以使用basic.qos方法,并设置prefetch_count=1。
#这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)。
channel.basic_consume(callback, queue='task_queue', no_ack=False) # no_ack=False 默认值,表示消息响应是开启的
channel.start_consuming()

发布/订阅

图解:为了描述这种模式,我们将会构建一个简单的日志系统。它包括两个程序——第一个程序负责发送日志消息,第二个程序负责获取消息并输出内容。在这个日志系统中,所有正在运行的接收方程序都会接受消息。用其中一个消费者或者接收者把日志写入硬盘中,另外一个消费者或者接受者把日志输出到屏幕上。最终,日志消息被广播给所有的消费者或者接受者。

注意:此处使用扇形交换机

生产者(producer.py)全部代码

#!/usr/bin/env python
import pika
import sys connection =pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message
)
print (" [x] Sent %r" % (message,))
connection.close()

消费者(consumer.py)全部代码

#!/usr/bin/env python
import pika connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout')  #扇型交换机 result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print (' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print " [x] %r" % (body,) channel.basic_consume(callback,
queue=queue_name,
no_ack=True
) channel.start_consuming()

运行如下命令:

首先,运行消费者(consumer.py)命令

python3 consumer.py > consumer.log  #把日志保存到文件中

python3 consumer.py    #在屏幕中查看日志

然后,运行生产者(producer.py)命令

python3 producer.py

发布/订阅(改进版)

下图能够很好的描述这个场景:



图解:在这个场景中,可以看到直连交换机 X和两个队列进行了绑定。第一个队列使用orange作为绑定键,第二个队列有两个绑定,一个使用black作为绑定键,另外一个使用green。这样以来,当路由键为orange的消息发布到交换机,就会被路由到队列Q1。路由键为black或者green的消息就会路由到Q2。其他的所有消息都将会被丢弃。

发布/订阅(改进版)

注意:此处使用直接交换机

生产者(producer.py)或发送者全部代码

#!/usr/bin/env python
import pika
import sys
'''
发送者日志:把消息发送到一个直连交换机,把日志级别作为路由键。这样接收日志的脚本就可以根据严重级别来选择它想要处理的日志。
'''
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,   #此处routing_key为路由键
body=message
) print (" [x] Sent %r:%r" % (severity, message))
connection.close()

消费者(consumer.py)或接收者全部代码

#!/usr/bin/env python
import pika
import sys '''
为每个严重级别分别创建一个新的绑定
''' connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue severities = sys.argv[1:]
if not severities:
print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \
(sys.argv[0],)
sys.exit(1) for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity  #此处routing_key为绑定键
) print (' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print (" [x] %r:%r" % (method.routing_key, body,)) channel.basic_consume(callback,
queue=queue_name,
no_ack=True) channel.start_consuming()

运行如下命令:

首先,运行消费者(consumer.py)或者发送者命令

python3 consumer.py warning error > consumer.log #此命令只是保存warning和error级别的日志到磁盘

 python3 consumer.py info warning error

然后,运行生产者(producer.py)或者发送者命令

python3 producer.py error "Run. Run. Or it will explode."

发布/订阅(再改进版)

从以上例子可以看出,直连交换机替代了扇型交换机,从只能盲目的广播消息改进为有可能选择性的接收日志。

尽管直连交换机能够改善我们的系统,但是它也有它的限制 —— 没办法基于多个标准执行路由操作。通过使用主题交换机,可以监听来源于“cron”的严重程度为“critical errors”的日志,也可以监听来源于“kern”的所有日志。

图解:一个携带有 quick.orange.rabbit 的消息将会被分别投递给这两个队列。携带着 lazy.orange.elephant 的消息同样也会给两个队列都投递过去。另一方面携带有 quick.orange.fox 的消息会投递给第一个队列,携带有 lazy.brown.fox 的消息会投递给第二个队列。携带有 lazy.pink.rabbit 的消息只会被投递给第二个队列一次,即使它同时匹配第二个队列的两个绑定。携带着 quick.brown.fox 的消息不会投递给任何一个队列。

如果我们违反约定,发送了一个携带有一个单词或者四个单词(”orange” or “quick.orange.male.rabbit”)的消息时,发送的消息不会投递给任何一个队列,而且会丢失掉。

但是另一方面,即使 “lazy.orange.male.rabbit” 有四个单词,他还是会匹配最后一个绑定,并且被投递到第二个队列中。

注意:此处使用主题交换机

生产者(producer.py)或发送者全部代码

#!/usr/bin/env python
import pika
import sys connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message
)
print (" [x] Sent %r:%r" % (routing_key, message))
connection.close()

消费者(consumer.py)或接收者全部代码

#!/usr/bin/env python
import pika
import sys connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue binding_keys = sys.argv[1:]
if not binding_keys:
print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
sys.exit(1) for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key) print (' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print (" [x] %r:%r" % (method.routing_key, body,)) channel.basic_consume(callback,
queue=queue_name,
no_ack=True
) channel.start_consuming()

运行如下命令:

首先,运行消费者(consumer.py)或者发送者命令

python3 consumer.py "#"   #接收所有日志

python3 consumer.py "kern.*"    #接收来自kern设备的日志

python3 consumer.py "*.critical"  #只接收严重程度为critical的日志

python3 consumer.py "kern.*" "*.critical"  #建立多个绑定

然后,运行生产者(producer.py)或者发送者命令

python3 producer.py "kern.critical" "A critical kernel error"

参考文档:

http://www.zixuebook.cn/rabbitmq/rabbitmq-python-intro.html

http://rabbitmq.mr-ping.com/tutorials_with_python/[2]Work_Queues.html

最新文章

  1. seajs的CMD模式的优势以及使用
  2. TextView所有属性
  3. mysql实例 保存查询结果到变量
  4. UVA 1364 - Knights of the Round Table (获得双连接组件 + 二部图推理染色)
  5. 取消选中单选框radio的三种方式
  6. 【转】rem自适应布局
  7. HDU4609 3-idiots(生成函数)
  8. SpringCloud之初识Hystrix熔断器 ----- 程序的保护机制
  9. git----------SourceTree如何连接ssh的仓库地址,这里记录的是客户端需要做的事
  10. windows环境下面批量新建文件夹
  11. C#-MVC开发微信应用(7)--在管理系统中同步微信用户分组信息
  12. [转]谈谈 Bias-Variance Tradeoff
  13. JavaScript prototype背后的工作原理
  14. AutoFac简介
  15. MySQL · 数据恢复 · undrop-for-innodb
  16. C++内存管理变革(6):通用型垃圾回收器 - ScopeAlloc
  17. js截取字符串substr和substring的区别
  18. Gradle设置代理
  19. Python 操作 Excel 、txt等文件
  20. 计算机原理--cpu篇

热门文章

  1. 1.安装CDH5.12.x
  2. css贝塞尔曲线模仿饿了么购物车小球动画
  3. 《剑指Offer》题六十一~题六十八
  4. 第二十一次ScrumMeeting会议
  5. [经典贪心算法]Prim算法
  6. java — JVM调优
  7. LintCode-70.二叉树的层次遍历 II
  8. PagedDataSource数据绑定控件和AspNetPager分页控件结合使用列表分页
  9. PokeCats开发者日志(十一)
  10. intellij idea 之 CheckStyle 代码格式校验