1、安装erlang

[root@localhost ~]#yum -y install erlang

2、安装rabbitMQ

[root@localhost ~]#yum -y install rabbitmq-server

3、添加用户

[root@localhost ~]# rabbitmqctl add_user rabbit_user .com   // 添加admin的用户密码为123.com

4、将角色添加到管理员组

[root@localhost ~]# rabbitmqctl set_user_tags rabbit_user administrator

5、设置用户权限

[root@localhost ~]# rabbitmqctl set_permissions -p "/" rabbit_user  ".*" ".*" ".*"   //set_permissions [-p <vhostpath>] <user> <conf> <write> <read>

6、启用web插件

[root@localhost ~]# rabbitmq-plugins enable rabbitmq_management

7、启动rabbitMQ服务

[root@localhost ~]# systemctl start rabbitmq-server

8、访问http://192.168.10.10:15672/,如果一切顺利的话你会看到如下界面

9、输入用户名密码进入rabbitMQ后台,你会看到像下面这个样子。

到此rabbitMQ已经可以正常运行了,下面我们使用Python来操作队列。


1、安装pika模块

[root@localhost ~]# pip3 install pika

2、创建生产者模型

[root@localhost rabbitMQ]# vim producer.py
#!/usr/bin/env python
import pika
# 创建凭证,使用rabbitmq用户密码登录
credentials = pika.PlainCredentials("rabbit_user","123.com")
# 新建连接,这里localhost可以更换为服务器ip
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.10.10',credentials=credentials))
# 创建通道
channel = connection.channel()
# 在通道内声明一个队列,用于接收消息,队列名字叫“test_queue”
channel.queue_declare(queue='test_queue')
# 注意在rabbitmq中,消息想要发送给队列,必须经过交换(exchange),我们暂且将(exchange=''),
# 它允许我们精确的指定发送给哪个队列(routing_key=''),参数body值发送的数据
while True:
content = input("生产者输入数据>>>")
if content.upper() == "Q":
break
channel.basic_publish(exchange='',
routing_key='test_queue',
body=content)
print("已经发送了消息")
# 程序退出前,确保刷新网络缓冲以及消息发送给rabbitmq,需要关闭本次连接
connection.close()

3、运行生产者模型

[root@localhost rabbitMQ]# python3 producer.py

4、创建消费者模型

[root@localhost rabbitMQ]# vim consumer.py
#!/usr/bin/env python
import pika
# 建立与rabbitmq的连接
credentials = pika.PlainCredentials("rabbit_user","123.com")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.10.10',credentials=credentials))
# 创建通道
channel = connection.channel()
# 在通道中创建队列
channel.queue_declare(queue="test_queue") def callbak(ch,method,properties,body):
print("消费者接收了消息:%r"%body.decode("utf8"))
# 有消息来时,立即执行callbak。
channel.basic_consume("test_queue",callbak,auto_ack=True)
# 等待接收消息
channel.start_consuming()

auto_ack=True:表示不确认机制也就是说每次消费者接收到数据后,不管是否处理完毕,rabbitmq-server都会把这个消息标记完成,从队列中删除,这样会有个缺陷,就是当我们从队列中获取到消息后,碰巧程序崩溃,或者什么其它原因导致程序终,此时消息已经从消息队列中删除,造成数据的不安全。

5、运行消费者模型

[root@localhost rabbitMQ]# python3 consumer.py 

6、验证消息队列的轮询机制

7、下面演示auto_ack=True的不安全机制,我们在消费者模型中主动抛出异常模拟程序非正常终止,然后查看消息队列。

7.1修改消费者模型(主动抛出异常)如下:

[root@localhost rabbitMQ]# vim consumer.py
#!/usr/bin/env python
import pika
# 建立与rabbitmq的连接
credentials = pika.PlainCredentials("rabbit_user","123.com")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.10.10',credentials=credentials))
# 创建通道
channel = connection.channel()
# 在通道中创建队列
channel.queue_declare(queue="test_queue") def callbak(ch,method,properties,body):
# 主动抛出异常
raise TypeError
print("消费者接收了消息:%r"%body.decode("utf8"))
# 有消息来时,立即执行callbak。
channel.basic_consume("test_queue",callbak,auto_ack=True)
# 等待接收消息
channel.start_consuming()

7.2运行生产者模型

[root@localhost rabbitMQ]# python3 producer.py 

7.3运行消费之模型,查看队列变化

8、使用相对可靠的消息机制确认来保证数据安全

修改消费者模型的配置文件如下;

[root@localhost rabbitMQ]# vim consumer.py
#!/usr/bin/env python
import pika
# 建立与rabbitmq的连接
credentials = pika.PlainCredentials("rabbit_user","123.com")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.10.10',credentials=credentials))
# 创建通道
channel = connection.channel()
# 在通道中创建队列
channel.queue_declare(queue="test_queue") def callbak(ch,method,properties,body):
# 主动抛出异常
raise TypeError
print("消费者接收了消息:%r"%body.decode("utf8"))
# 告诉消息队列,我已经确认收到消息了
ch.basic_ack(delivery_tag=method.delivery_tag)
# 有消息来时,立即执行callbak。
channel.basic_consume("test_queue",callbak,auto_ack=False)
# 等待接收消息
channel.start_consuming()

这样在消费者没有确认的情况下,消息队列中的消息是不会被删除的。如下;

至于是选择auto_ack确认机制还是使用auto_ack不确认机制,还需要你根据实际情况来定。

9、消息队列的持久化

我们上面的配置如果rabbitMQ重启服务,或者系统重启都会导致队列里的消息被清除掉。下面我们修改生产者的配置文件使消息队列持久化,即使重启服务,消息队列里的消息也不会被清除。

[root@localhost rabbitMQ]# vim producer.py 
#!/usr/bin/env python
import pika
# 创建凭证,使用rabbitmq用户密码登录
credentials = pika.PlainCredentials("rabbit_user","123.com")
# 新建连接,这里localhost可以更换为服务器ip
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.10.10',credentials=credentials))
# 创建通道
channel = connection.channel()
# 在通道内声明一个队列,用于接收消息,队列名字叫“test_queue”,durable=True表示持久化队列
channel.queue_declare(queue='delivery_queue',durable=True)
# 注意在rabbitmq中,消息想要发送给队列,必须经过交换(exchange),我们暂且将(exchange=''),
# 它允许我们精确的指定发送给哪个队列(routing_key=''),参数body值发送的数据
while True:
content = input("生产者输入数据>>>")
if content.upper() == "Q":
break
channel.basic_publish(exchange='',
routing_key='delivery_queue',
body=content,
properties=pika.BasicProperties(delivery_mode=2)) # 2代表的是持久化队列
print("已经发送了消息")
# 程序退出前,确保刷新网络缓冲以及消息发送给rabbitmq,需要关闭本次连接
connection.close()

常用命令

// 新建用户
rabbitmqctl add_user {用户名} {密码}

// 设置权限
rabbitmqctl set_user_tags {用户名} {权限}

// 查看用户列表
rabbitmqctl list_users

// 为用户授权
添加 Virtual Hosts :
rabbitmqctl add_vhost <vhost>

// 删除用户
rabbitmqctl delete_user Username

// 修改用户的密码
rabbitmqctl change_password Username Newpassword // 删除 Virtual Hosts :
rabbitmqctl delete_vhost <vhost> // 添加 Users :
rabbitmqctl add_user <username> <password>
rabbitmqctl set_user_tags <username> <tag> ...
rabbitmqctl set_permissions [-p <vhost>] <user> <conf> <write> <read> // 删除 Users :
delete_user <username>

// 使用户user1具有vhost1这个virtual host中所有资源的配置、写、读权限以便管理其中的资源
rabbitmqctl set_permissions -p vhost1 user1 '.*' '.*' '.*'

// 查看权限
rabbitmqctl list_user_permissions user1

rabbitmqctl list_permissions -p vhost1

// 清除权限
rabbitmqctl clear_permissions [-p VHostPath] User

rabbitmqctl reset // 重启应用
rabbitmqctl stop_app // 关闭应用
rabbitmqctl start_app // 启动应用
rabbitmqctl list_queues // 查看队列
​rabbitmqctl list_exchanges // 查看exchangelist
rabbitmqctl list_queues // 查看所有queue
rabbitmqctl list_users // 查看所有用户
rabbitmqctl list_bindings // 查看所有绑定exchange和queued 消息
rabbitmqctl list_queues name messages_ready messages_unacknowledged // 查看消息确认
rabbitmqctl status // 查看rabbitMQ的状态信息。

最新文章

  1. 2016-6-15-de novo文献阅读
  2. java实现 swing模仿金山打字 案例源码
  3. Codeforces Round #250 (Div. 1) A. The Child and Toy 水题
  4. GNU C中x++是原子操作吗?
  5. 基于邻接矩阵的广度优先搜索遍历(BFS)
  6. CSS 的选择符
  7. c++试题
  8. CopyU!新插件 CopyPC2U正式发布!
  9. 庖丁解牛Linux内核学习笔记(1)--计算机是如何工作的
  10. 枚举特性FlagsAttribute的用法
  11. js控制style样式
  12. iOS ipa包瘦身,iOS8及以下text段超60MB
  13. Wordpress 之删除 RSS 功能 的&quot;文章RSS&quot;、&quot;评论RSS&quot;、&quot;WordPress.org&quot;
  14. 深入字节码理解invokeSuper无限循环的原因
  15. 【转】给网站添加X-UA-Compatible标签
  16. LeetCode 79 Word Search(单词查找)
  17. VS Code 基本介绍 和 快捷键
  18. hdu 1978 How many ways 记忆化搜索 经典例题
  19. Linux--安全加固01
  20. JVM的理解

热门文章

  1. Centos7安装宝塔控制面板
  2. Delphi-基础(例程、例程返回值)
  3. 17、DNS服务器
  4. 01day-微信小程序 表单组件 动态绑定变量 导航组件 地图组件 view text button 上下滚动组件
  5. 洛谷 P5564: [Celeste-B]Say Goodbye
  6. Springboot jackSon -序列化-详解
  7. 初识xls文件的读写
  8. [Taro] Taro 环境安装 (一)
  9. 洛谷 P1950 长方形_NOI导刊2009提高(2)
  10. Spring Cloud组件使用/配置小记