RabbitMq消息队列

参考:https://blog.csdn.net/hellozpc/article/details/81436980

什么是消息队列

MQ :message Queue ,实际上是一个队列,先进先出,队列中存放的是message

主要用途:不同进程process/线程Thread之间的通信

产生消息队列的原因

1.不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程

2.不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化

3.某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队

RabbitMq的特点

1.实现应用程序和应用程序之间的通信

2.使用Erlang编写,Erlang是一种并发的编程语言

什么叫AMQP

AMQP: 是消息队列的一个协议

如何使用RabbitMQ

1.安装Erlang

2.安装RabbitMQ

搭建RabbitMQ环境 ------》 登录RabbitMQ -------- > 设置用户,密码,vhost ----- >获取端口号 -----》此时就拥 有了一个RabbirMQ

两个应用之间的通信先经过RabbitMQ

RabbitMQ的5种队列

一个生产者 一个队列 一个消费者:

实现:

1.建立客户端

导入客户端依赖包:amqp-client.jar

建立工厂,配置工厂信息(RabbitMQ的用户账号,密码,端口号,vhost)与MQ连接

 /**
ConnectionUtils类
*/
//ConnectionFactory来自com.rabbitmq.client.ConnectionFactory
ConnectionFactory factory = new ConnectionFactory()
//配置工厂信息
factory.setHost("localhost") //设置服务地址
factory.setPort("5672")
factory.setVirtualhost("testhost") //在RabbitMQ上创建的虚拟的主机名称
factory.setUserName("admin") //在RabbitMq上设置的账号
factory.setPassword("admin") //根据工厂建立连接
Connection conn = factory.nwe Connection() // com.rabbitmq.client.Connection
return conn;

生产者发送消息到队列

private final static String QUEUE_NAME = "q_test_01"
//获取连接
Connection conn = ConnectionUtil.getConnection()
//通过连接建立通道
Channel channel = conn.createChannel();
//通过通道创建一个队列
channel.QueueDeclare(QUEUE_NAME,false,false,false,null);
//通过通道向对列传递信息
String message = "hhh";
channel.basicPublish("",QUEUE_NAME,false,message.getByte())

生产者做的事情:

1.与RabbitMQ建立连接(conn = getConnection()),

2.之后建立通道(channel = conn.createChannel()),

3.在通道上建立队列(channel.queueDclare("name")),

4.通过通道往队列发送信息(channel.basicPublish())

消费者消费队列的信息

private final static String QUEUE_NAME = "q_test_01"
//获取连接
Connection conn = ConnectionUtil.getConnection()
//通过连接建立通道
Channel channel = conn.createChannel();
//通过通道声明一个队列
channel.QueueDeclare(QUEUE_NAME,false,false,false,null);
//定义队列的消费者
QueueingConsummer consummer = new QueueingConsumer(channel);
//监听队列
channel.basicCosume(QUEUE_NAME,true,consummer);
//获取队列的信息
while(true){
QueueingConsummer.Delively delivery = comsummer.nextDelivery();
String message = new String(delivery.getBody())
}

生产者做的事情:

1.与RabbitMQ建立连接conn = getConnection()

2.建立通道channel = conn.createChannel()

3.声明队列channel.queueDeclare("name")

4.建立通道里的消费者QueueingConsumer consumer = new Consummer(channel)

5.监听队列信息 basicConsume("队列名称",true,consumer)

6.获取数据 comsummer.nextDelivery()

work模式,一个生产者,一个队列,多个消费者

消费者1接受收信息

private final static String QUEUE_NAME = "q_test_01"
//获取连接
Connection conn = ConnectionUtil.getConnection()
//通过连接建立通道
Channel channel = conn.createChannel();
//通过通道声明一个队列
channel.QueueDeclare(QUEUE_NAME,false,false,false,null);
//同一时刻服务器只会发一条信息给消费者
channel.basicQos)(1)
//定义消费者
QueueingConsummer consummer = new QueueingConsummer(channel);
//监听队列
channel.basicComsume("QUEUE_NAME",true,consumer)
//获取队列的信息
while(true){
QueueingConsummer.Delively delivery = comsummer.nextDelivery();
String message = new String(delivery.getBody())
//休眠
Thread.sleep(10)
// 返回确认状态,注释掉表示使用自动确认模式
//channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

消费者2接收信息

private final static String QUEUE_NAME = "q_test_01"
//获取连接
Connection conn = ConnectionUtil.getConnection()
//通过连接建立通道
Channel channel = conn.createChannel();
//通过通道声明一个队列
channel.QueueDeclare(QUEUE_NAME,false,false,false,null);
//同一时刻服务器只会发一条信息给消费者
channel.basicQos)(1)
//定义消费者
QueueingConsummer consummer = new QueueingConsummer(channel);
//监听队列,false表示手动返回完成状态,true表示自动
channel.basicComsume("QUEUE_NAME",true,consumer)
//获取队列的信息
while(true){
QueueingConsummer.Delively delivery = comsummer.nextDelivery();
String message = new String(delivery.getBody())
//休眠1秒
Thread.sleep(1000)
// 返回确认状态,注释掉表示使用自动确认模式
//channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

测试结果:

生产者生产100条信息,消费1个消费2一起去消费信息,消费1消费一条信息睡眠0.01秒,消费2消费一条信息睡眠1秒,最后两个消费者消费的信息个数相等,且不重复(轮询分发:消息按顺序的发送给消费者)

按常理来说应该睡眠时间断的消费者得到的信息条数更多

如何解决这一问题呢?

通过Qos,和Acknowledge(告知已收到)来解决

basicQos 方法设置了当前信道最大预获取(prefetch)消息数量为1

basicQos(1):

消息从队列异步推送给消费者,消费者的 ack 也是异步发送给队列 ,队列只有在收到消费者发回的上一条消息 ack 确认后,才会向该消费者发送下一条消息

basicQos(0):

没有限制,队列会将所有消息尽快发给消费者

公平分发:

使用basicQos( prefetchCount = 1)方法 ,限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送

使用公平分发,必须关闭自动应答,改为手动应答

//使用手动确认模式
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // 监听队列,false表示手动返回完成状态,true表示自动
channel.basicConsume(QUEUE_NAME, false, consumer);

订阅模式(exchange)

fanout exchange (广播模式)

特点:多个队列,一个队列对应一个用户,生产者将信息发送到exchange中,通过exchange发送给绑定的队列,消费者从对应的队列中消费信息,这样,所有的消费者就可以消费相同的信息

生产者生成

package com.zpc.rabbitmq.subscribe;

import com.zpc.rabbitmq.util.ConnectionUtil;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; public class Send { private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel(); // 声明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 消息内容
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'"); channel.close();
connection.close();
}
}

消费者消费

public class Recv {

    private final static String QUEUE_NAME = "test_queue_work1";

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel(); // 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1); // 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer); // 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [Recv] Received '" + message + "'");
Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}

声明exchange: channel.exchangeDeclare("交换机的名字","交换机的类型")

                        channel.exchangeDeclare("交换机名","fanout")

                        fanout:广播式交换机,所有队列都能接受信息

向交换机发送数据:channel.basicPublish("交换机名","", null, message.getByte())

队列与交换机绑定:channel.queueBind("队列名","交换机名")

Direct exchange (exchange的key与queue的key一一匹配模式)

生产者生产:

生产者创建交换机时声明交换机的类型

channel.exchangeDeclare("交换机名称","direct") //声明交换机的类型为direct

发送数据时指定接受的队列,key值与队列的key值一一对应

channel.basicPublish("交换机名称","路由到queue的key值",null,message.getBytes())

消费者消费:

channel.queueBind("队列名称",”交换机名称“,"队列与交换机的key值一一对应")

topic模糊匹配模式

"#":匹配所有数据

"*":匹配单个数据

生产者发送数据时声明key值

两个交换机:

channel.exchangeDeclare("交换机1","topic")

channel.basicPublish("交换机1","user.news",null,message.getBytes());

channel.basicPublish("j交换机1","user.weather",null,message.getByutes())

消费者绑定交换机:

消费者一:

channel.queueBind("队列名称","交换机名称","user.*")

最新文章

  1. Eclipse 的单步调试
  2. spark发行版笔记10
  3. (2)艺术创新思维的PS成果
  4. SSM三大框架(转发)
  5. DataList分页访问FooterTemplate模板里的控件
  6. 机器学习(一):梯度下降、神经网络、BP神经网络
  7. Perl的主要应用领域
  8. ACM2054_A=B
  9. inline与lnk2001、lnk2019,鸡肋?
  10. ORA-22835 缓冲区对于 CLOB 到 CHAR 转换或 BLOB 到 RAW 转换而言太小
  11. [ExtJS6学习笔记]Ext JS6主题系列 (Classic工具包)
  12. Linux命令行抓包及包解析工具tshark(wireshark)使用实例解析
  13. 【转载】Jmeter 性能测试入门
  14. lintcode 程序题
  15. Quick_sort
  16. C#.NET接收JSON数组
  17. mysql修改用户密码的方法及命令
  18. git 提交规范
  19. html5式程序员表白
  20. Android的环境搭建

热门文章

  1. SpringBoot中如何优雅的读取yml配置文件?
  2. spring boot 整合JPA多数据源
  3. Leetcode Tags(6)Math
  4. Java多线程编程(一)Java多线程技能
  5. pyarango整理
  6. networkx整理
  7. Mybatis JdbcType与Oracle、MySql 数据类型对应关系
  8. 深入理解.NET Core的基元(二) - 共享框架
  9. 一个基于Net Core3.0的WPF框架Hello World实例
  10. 使用ASP.NET Core 3.x 构建 RESTful API - 1.准备工作