RibbitMQ

MQ优势

MQ的三大主要作用: 应用解耦、异步提速、流量削锋

应用解耦

系统的耦合性越高,容错性就越低,可维护性就越低:

解耦:

如果其中一个系统服务宕机,那么系统的其他服务将也无法保证一致性,只能执行失败,可以使用 MQ 进行解耦,提升容错性和可维护性。

异步提速

提升用户体验和系统吞吐量(单位时间内处理请求的数目):

用户下单请求访问订单系统需要多久才有反馈可以看出访问DB和各种相关系统需要:20 + 300 + 300 + 300 = 920ms

这样的效率算不得快,如果QPS高的话订单一多很多用户在后续排队等不到反馈,体验也不好。

用户下单请求后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。因为订单需要只需要和MQ进行交互,后续由MQ与库存等系统交互,而MQ是异步无需等待其他系统反馈吞吐量自然提升上去了。

削峰填谷

使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。可以提高系统稳定性。

RibbtMQ

  • Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
  • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
  • Connection:publisher/consumer 和 broker 之间的 TCP 连接
  • Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
  • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue:消息最终被送到这里等待 consumer 取走
  • Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

中间件:

也许很难给中间件一个严格的定义,但中间件应具有如下的一些特点:

(1)满足大量应用的需要

(2)运行于多种硬件和OS平台

(3)支持分布计算,提供跨网络、硬件和OS平台的透明性的应用或服务的交互

(4)支持标准的协议

(5)支持标准的接口

举例:
1,RMI(Remote Method Invocations, 远程调用)
2,Load Balancing(负载均衡,将访问负荷分散到各个服务器中)
3,Transparent Fail-over(透明的故障切换)
4,Clustering(集群,用多个小的服务器代替大型机)
5,Back-end-Integration(后端集成,用现有的、新开发的系统如何去集成遗留的系统)
6,Transaction事务(全局/局部)全局事务(分布式事务)局部事务(在同一数据库联接内的事务)
7,Dynamic Redeployment(动态重新部署,在不停止原系统的情况下,部署新的系统)
8,System Management(系统管理)
9,Threading(多线程处理)
10,Message-oriented Middleware面向消息的中间件(异步的调用编程)
11,Component Life Cycle(组件的生命周期管理)
12,Resource pooling(资源池)
13,Security(安全)
14,Caching(缓存)

主流的:

  • 消息中间件 ActiveMQ
  • 消息中间件 RabbitMQ
  • 消息中间件 Kafaka
  • 消息中间件 RocketMQ
  • 消息中间件应用场景说明
  • 负载均衡中间件(Nginx/Lvs)
  • 缓存中间件(Memcache/Redis)
  • 数据库中间件(ShardingJdbc/Mycat)

消息中间件负责数据的传递,存储,和分发消费三个部分,数据的存储和分发的过程中肯定要遵循某种约定成俗的规范,你是采用底层的TCP/IP,UDP协议还是其他的自己取构建等,而这些约定成俗的规范就称之为:协议

网络协议的三要素

1.语法。语法是用户数据与控制信息的结构与格式,以及数据出现的顺序。

2.语义。语义是解释控制信息每个部分的意义。它规定了需要发出何种控制信息,以及完成的动作与做出什么样的响应。

3.时序。时序是对事件发生顺序的详细说明。

而消息中间件采用的并不是http协议,而常见的消息中间件协议有:OpenWire、AMQP、MQTT、Kafka,OpenMessage协议。

面试题:为什么消息中间件不直接使用http协议呢?

  1. 因为http请求报文头和响应报文头是比较复杂的,包含了cookie,数据的加密解密,状态码,响应码等附加的功能,但是对于一个消息而言,我们并不需要这么复杂,也没有这个必要性.一定要追求的是高性能。尽量简洁,快速。
  2. 大部分情况下http大部分都是短链接,在实际的交互过程中,一个请求到响应很有可能会中断,中断以后就不会就行持久化,就会造成请求的丢失。
AMQP协议()

AMQP:(全称:Advanced Message Queuing Protocol) 是高级消息队列协议。由摩根大通集团联合其他公司共同设计。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。

特性:

1:分布式事务支持。

2:消息的持久化支持。

3:高性能和高可靠的消息处理优势。

MQTT协议

MQTT协议:(Message Queueing Telemetry Transport)消息队列是IBM开放的一个即时通讯协议,物联网系统架构中的重要组成部分。

特点:

1:轻量

2:结构简单

3:传输快,不支持事务

4:没有持久化设计。

应用场景:

1:适用于计算能力有限

2:低带宽

3:网络不稳定的场景。

Kafka协议

Kafka协议是基于TCP/IP的二进制协议。消息内部是通过长度来分割,由一些基本数据类型组成。

特点是:

1:结构简单

2:解析速度快

3:无事务支持

4:有持久化设计

OpenMessage协议

是近几年由阿里、雅虎和滴滴出行、Stremalio等公司共同参与创立的分布式消息中间件、流处理等领域的应用开发标准。

特点:

1:结构简单

2:解析速度快

3:支持事务和持久化设计。

持久化

分发策略

MQ消息队列有如下几个角色

1:生产者

2:存储消息

3:消费者

那么生产者生成消息以后,MQ进行存储,消费者是如何获取消息的呢?一般获取数据的方式无外乎推(push)或者拉(pull)两种方式,典型的git就有推拉机制,我们发送的http请求就是一种典型的拉取数据库数据返回的过程。而消息队列MQ是一种推送的过程,而这些推机制会适用到很多的业务场景也有很多对应推机制策略。

消息队列高可用和高可靠

所谓高可用:是指产品在规定的条件和规定的时刻或时间内处于可执行规定功能状态的能力。

当业务量增加时,请求也过大,一台消息中间件服务器的会触及硬件(CPU,内存,磁盘)的极限,一台消息服务器你已经无法满足业务的需求,所以消息中间件必须支持集群部署。来达到高可用的目的。

集群模式1 - Master-slave主从共享数据的部署方式
集群模式2 - Master- slave主从同步部署方式
集群模式3 - 多主集群同步部署模式
集群模式4 - 多主集群转发部署模式
集群模式5 Master-slave与Breoker-cluster组合的方案

反正终归三句话:

1:要么消息共享,

2:要么消息同步

3:要么元数据共享

所谓高可用是指:是指系统可以无故障低持续运行,比如一个系统突然崩溃,报错,异常等等并不影响线上业务的正常运行,出错的几率极低,就称之为:高可靠。

在高并发的业务场景中,如果不能保证系统的高可靠,那造成的隐患和损失是非常严重的。

如何保证中间件消息的可靠性呢?可以从两个方面考虑:

1:消息的传输:通过协议来保证系统间数据解析的正确性。

2:消息的存储可靠:通过持久化来保证消息的可靠性。

输入:http://localhost:15672/#/

阿里云Docker安装RabbitMq:

    (1)yum 包更新到最新
> yum update
(2)安装需要的软件包, yum-util 提供yum-config-manager功能,另外两个是devicemapper驱动依赖的
> yum install -y yum-utils device-mapper-persistent-data lvm2
(3)设置yum源为阿里云
> yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
(4)安装docker
> yum install docker-ce -y
(5)安装后查看docker版本
> docker -v
(6) 安装加速镜像
sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{
"registry-mirrors": ["https://0wrdwnn6.mirror.aliyuncs.com"]
}
EOF
sudo systemctl daemon-reload
sudo systemctl restart docker

docker启动命令

    # 启动docker:
systemctl start docker
# 停止docker:
systemctl stop docker
# 重启docker:
systemctl restart docker
# 查看docker状态:
systemctl status docker
# 开机启动:
systemctl enable docker
systemctl unenable docker
# 查看docker概要信息
docker info
# 查看docker帮助文档
docker --help

安装

    docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management

启动Docker中的RabbitMQ:

https://zhuanlan.zhihu.com/p/133257628

查看容器状态

查看当前正在运行的容器信息,包括容器ID(container port),镜像名,容器名,端口信息,状态等

docker ps

可用参数: - -a : 显示所有容器(默认显示最近曾经运行过的) - -n [num] : 显示最近创建过的num个容器 - -q : 只显示容器ID

从下图测试结果也可以看出 名为my-rabbitMQ 容器的Port信息中有服务器本地45672端口到该容器15672端口的映射

查看容器的端口映射

查看指定容器的端口映射情况

docker port [containerID | containerName]

容器的启动/停止/重启

    # 启动指定容器
docker start [containerID | containerName]
# 停止指定容器
docker stop [containerID | containerName]
# 重启指定容器
docker restart [containerID | containerName]

删除容器

    # 删除指定容器
docker rm [containerID | containerName]

可用参数:

  • -f : 强制删除

RabbitMQ入门案例 - Simple 简单模式

 <dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>

生产者把数据推入消息队列MQ

package com.gton.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; /**
* @description: 生产者
* @author: GuoTong
* @createTime: 2021-10-29 17:49
* @since JDK 1.8 OR 11
**/
public class Producer { public static void main(String[] args) {
//1.RabbitMq使用的是amqp协议,创建连接工程 // 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
//定义消息队列的名字
String msgQueryName = "queue1";
channel.queueDeclare(msgQueryName, false, false, false, null);
// 6: 准备发送消息的内容
String message = "你好,学相伴!!!";
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
// @params2: 队列名称/routing
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicPublish("", "queue1", null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}

消费者从消息队列里面把消息消费

package com.gton.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; /**
* @description:消费者,去消息队列消费
* @author: GuoTong
* @createTime: 2021-10-29 18:01
* @since JDK 1.8 OR 11
**/
public class Consumer {
public static void main(String[] args) {
//1.RabbitMq使用的是amqp协议,创建连接工程 // 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
//定义消息队列的名字
String msgQueryName = "queue1";
//消费basicConsume:String var1, boolean var2, DeliverCallback var3, CancelCallback var4
channel.basicConsume(msgQueryName,
true,
(s, delivery) -> System.out.println("消费者从消息队列获取的信息:" +
new String(delivery.getBody(), "UTF-8")),
s -> System.out.println("消费者消费消息失败了"));
System.out.println("接收成功");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("连接消息队列操作出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}

核心组件:

核心概念:
Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server
Connection:连接,应用程序与Broker的网络连接 TCP/IP/ 三次握手和四次挥手
Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。
Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
Virtual Host 虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange
Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(==不具备消息存储的能力==)
Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.
Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费者

RabbitMQ的工作模式

RabbitMQ 提供了 6 种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。

发布订阅模式具体实现

  • 类型:fanout
  • 特点:Fanout—发布与订阅模式,是一种广播机制,它是没有路由key的模式。

Direct模式模式具体实现

  • 类型:direct
  • 特点:Direct模式是fanout模式上的一种叠加,增加了路由RoutingKey的模式

RabbitMQ的模式之Topic模式

  • 类型:topic
  • 特点:Topic模式是direct模式上的一种叠加,增加了模糊路由RoutingKey的模式。

代码细讲:Direct模式

package com.gton.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; /**
* @description: 完整版
* @author: GuoTong
* @createTime: 2021-11-08 16:40
* @since JDK 1.8 OR 11
**/
@SuppressWarnings("all")
public class NewProducer {
public static void main(String[] args) {
//1.RabbitMq使用的是amqp协议,创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
//定义消息队列的名称
String msgQueryName1 = "queue1";
String msgQueryName2 = "queue2";
String msgQueryName3 = "queue3";
//交换机的名字
String exchangeName = "direct_message_exchange";
//类型
String exchangeTyppe = "direct";
//交换机声明:交换机名字,交换机类型,是否持久化
channel.exchangeDeclare(exchangeName,exchangeTyppe,true);
/* 声明队列
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
channel.queueDeclare(msgQueryName1,true,false,false,null);
channel.queueDeclare(msgQueryName2,true,false,false,null);
channel.queueDeclare(msgQueryName3,true,false,false,null);
//交换机绑定队列和交换机的关系
/**
* Description:
* 参数一:队列名
* 参数二:交换机名
* 参数三:routingKey
*/
channel.queueBind(msgQueryName1,exchangeName,"routingKey1");
channel.queueBind(msgQueryName2,exchangeName,"routingKey2");
channel.queueBind(msgQueryName3,exchangeName,"routingKey1");
// 6: 准备发送消息的内容
String message = "你好,RabbitMq!!!";
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange 写空字符串不代表没交换机:交换机一定会有,不写就就有一个默认的
// @params2: 队列名称/routing :往这里面去发送信息
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicPublish(exchangeName, "routingKey1", null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
package com.gton.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; /**
* @description: 完整版
* @author: GuoTong
* @createTime: 2021-11-08 16:41
* @since JDK 1.8 OR 11
**/
public class NewConsumer {
public static void main(String[] args) { // 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
//定义消息队列的名字
String msgQueryName = "queue1";
//消费basicConsume:String var1, boolean var2, DeliverCallback var3, CancelCallback var4
channel.basicConsume(msgQueryName,
true,
(s, delivery) -> System.out.println("消费者从queue1消息队列获取的信息:" +
new String(delivery.getBody(), "UTF-8")),
s -> System.out.println("消费者消费消息失败了"));
msgQueryName = "queue2";
channel.basicConsume(msgQueryName,
true,
(s, delivery) -> System.out.println("消费者从queue2消息队列获取的信息:" +
new String(delivery.getBody(), "UTF-8")),
s -> System.out.println("消费者消费消息失败了")); msgQueryName = "queue3";
channel.basicConsume(msgQueryName,
true,
(s, delivery) -> System.out.println("消费者从queue3消息队列获取的信息:" +
new String(delivery.getBody(), "UTF-8")),
s -> System.out.println("消费者消费消息失败了"));
//前面的都是异步:所以这个接收成功可能先执行
System.out.println("接收成功");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("连接消息队列操作出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}

Work模式轮询模式(Round-Robin)

当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?

主要有两种模式:

1、轮询模式的分发:一个消费者一条,按均分配;

2、公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;

Work模式 - 公平分发(Fair Dispatch)

手动应答:

特点:由于消息接收者处理消息的能力不同,存在处理快慢的问题,我们就需要能者多劳,处理快的多处理,处理慢的少处理;

当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?

主要有两种模式:

1、轮询模式的分发:一个消费者一条,按均分配;

2、公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;

SpringBoot集成RabbitMq

Fanout (广播)模式

生产者:

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency> <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
@Service
public class OrderService { @Autowired
private RabbitTemplate rabbitTemplate; //用户下单为例||把信息发送到MQ的消息队列中
public void makeOrder(String userID, String productID, int number) { //模拟用户下单生成的订单ID:UUID
String orderID = UUID.randomUUID().toString().replaceAll("-", "");
//通过消息队列完成消息分发(交换机,路由key|队列名,消息内容)
String exchangeName = "fanout_order_exchange";
String routingKey = "";
String msgText = "生成的订单ID:" + orderID;
rabbitTemplate.convertAndSend(exchangeName, routingKey, msgText);
}
}
/**
* @description: 声明fanout模式交换机
* @author: GuoTong
* @createTime: 2021-11-08 18:28
* @since JDK 1.8 OR 11
**/
@Configuration
public class RabbitConfig { //声明交换机类型
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout_order_exchange", true, false);
} //声明队列
@Bean
public Queue smsQueue() {
//名字和是否持久队列
return new Queue("sms.fanout.queue", true);
} @Bean
public Queue emailQueue() {
//名字和是否持久队列
return new Queue("email.fanout.queue", true);
} @Bean
public Queue phoneQueue() {
//名字和是否持久队列
return new Queue("phone.fanout.queue", true);
} //绑定关系交换机与队列
@Bean
public Binding smsBinding() {
return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
} @Bean
public Binding emailBinding() {
return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
} @Bean
public Binding phoneBinding() {
return BindingBuilder.bind(phoneQueue()).to(fanoutExchange());
}
}

消费者:

使用一个监听器,监听一个就消费一个队列:消费多个可以重写多个

@RabbitListener(queues = {
"email.fanout.queue"
})
@Service
public class FanoutConsumerEmail { @RabbitHandler
public void smsMessageReadByMqQueue(String msg) {
System.out.println("消费者消费了MQ的队列email.fanout.queue的消息-》" + msg);
}
}

设置过期时间:

设置消息的过期时间,过期了会进入死信队列

 //设置消息的过期时间
MessagePostProcessor messagePostProcessor = message -> {
message.getMessageProperties().setExpiration("5000");
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
};
rabbitTemplate.convertAndSend(exchangeName,routingKey2,msgText,messagePostProcessor);

设置队列的过期时间

  //声明队列
@Bean
public Queue smsDirectQueue() {
//设置队列的过期时间||时间到了就自动消费移出
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 5);//单位秒
//名字和是否持久队列
return new Queue("sms.direct.queue", true, false, false, args);
}

死信队列:消息过期进入死信队列

   @Bean
public Queue smsDirectQueue() {
//设置队列的过期时间||时间到了就自动消费移出
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 5);//单位秒
//设置过期数据迁移到死信队列
args.put("x-dead-letter-exchange","dead_direct_exchange");
args.put("x-dead-letter-routing-key","dead_routing_key");
//名字和是否持久队列
return new Queue("sms.direct.queue", true, false, false, args);
}

分布式事务的解决方案有哪些?

  • 基于Lcn解决分布式事务
  • 基于阿里巴巴seata解决分布式事务
  • 基于RabbitMq解决分布式事务
  • 基于RocketMq解决分布式事务

最新文章

  1. Fis3的前端模块化之路[基础篇]
  2. Spring的数据库开发
  3. docker学习(2) mac中docker-machine使用vmware fusion以及配置国内镜像加速
  4. .NET WEB项目的调试发布相关
  5. word中那些重要但是被人忽略的快捷键和长word文档的跳转
  6. Python多进程(1)——subprocess与Popen()
  7. oracle系列--第四篇 Oracle的卸载
  8. 【转】itunes connect 如何修改主要语言
  9. ZOJ 3903 Ant(公式推导)
  10. jquery1.9学习笔记 之选择器(基本元素四)
  11. Android拍照与相册选取图片
  12. Devpexpress 打印预览问题
  13. thinkPHP 空模块和空操作、前置操作和后置操作 详细介绍(十四)
  14. Django:之CMDB资源系统
  15. DHCP配置部分
  16. thinkjs—控制器方法名不能大写
  17. 【原创】1、简单理解微信小程序
  18. Microsoft Graph 桌面应用程序
  19. c语言变量及输入输出
  20. synchronized学习

热门文章

  1. filebeat客户端传输cisco日志到elasticsearch
  2. 安装Win 8.1 跳过输入密钥步骤
  3. Windows优先使用IPv4
  4. Java中的StringBuilder和StringBuffer适用场景
  5. Xcode模拟器simulators安装
  6. Docker MySql 查看版本的三种方法
  7. 文心ERNIE-ViLG,你的免费插图画师
  8. 玩转Google开源C++单元测试框架Google Test系列(gtest)之八 - 打造自己的单元测试框架
  9. 运行eeui项目不出现 WiFI真机同步 IP地址
  10. NodeJS 服务 Docker 镜像极致优化指北