SpringCloud Stream整合RabbitMQ3.5.0
2024-09-08 07:33:25
前言
本文章为单体项目,将消费者和生产者写在同一个项目中,介意者不用向下看了。
本文介绍三种应用方式:
1:普通整合RabbitMQ
2:消息分区
3:按条件消费(多个消费者只消费同一队列中满足自己条件的消息)
1:核心依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
<version>${spring.cloud.stream}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>${spring.cloud.stream}</version>
</dependency>
全部依赖:
项目结构图:
2:普通整合RabbitMQ
2.1:application.properties
spring.rabbitmq.host=192.168.1.218
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.cloud.stream.bindings.dev-exchange.destination=dev-exchange
spring.cloud.stream.bindings.dev-exchange.group=dev-queue
spring.cloud.stream.bindings.dev-exchange.content-type=application/json
spring.cloud.stream.bindings.dev-exchange.consumer.concurrency=1
spring.cloud.stream.bindings.dev-exchange.consumer.max-attempts=1
2.2:定义生产者和消费者接口
import com.boot.rabbitmq.constance.MQConstants;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface RabbitStream {
/**
* 消息流入(消费)
**/
@Input(MQConstants.DEV_EXCHANGE)
SubscribableChannel devConsumer();
/**
* 消息流出(生产)
**/
@Output(MQConstants.DEV_EXCHANGE)
MessageChannel devProducer();
}
2.3:生产者/消费者的具体实现
生产者代码:
@Component
@EnableBinding(RabbitStream.class)
public class DevProducer {
private static final Logger logger = LoggerFactory.getLogger(DevProducer.class);
private final RabbitStream rabbitStream;
public DevProducer(RabbitStream rabbitStream) {
this.rabbitStream = rabbitStream;
}
public void sendMsg(MQModel model) {
logger.info("producer:{}", JSON.toJSONString(model));
rabbitStream.devProducer()
.send(MessageBuilder.withPayload(model).build());
}
}
消费者代码:
@Component
@EnableBinding(RabbitStream.class)
public class DevListener {
private static final Logger logger = LoggerFactory.getLogger(DevListener.class);
@StreamListener(MQConstants.DEV_EXCHANGE)
public void receiveMsgAutoCommit(@Payload String payload) {
logger.info("consumer:{}", payload);
}
}
controller代码:
@PostMapping(value = "/dev")
public void dev(@RequestBody MQModel model) {
devProducer.sendMsg(model);
}
2.4:测试
发送请求:
控制台日志:
3:消息分区
3.1:概念
所谓消息分区就是将一个大队列拆分成多个小队列,然后分解成 producer-A -> queue-A -> Consumer-A 的一种场景。
3.2:如何在项目中使用
1:不需要改很多东西,只需要添加少部分配置即可
## RabbitMQ 消息分区配置
spring.cloud.stream.bindings.partition-exchange.destination=partition-exchange
spring.cloud.stream.bindings.partition-exchange.group=partition-queue
spring.cloud.stream.bindings.partition-exchange.content-type=application/json
spring.cloud.stream.bindings.partition-exchange.consumer.concurrency=1
spring.cloud.stream.bindings.partition-exchange.consumer.max-attempts=1
## 消息分区
spring.cloud.stream.bindings.partition-exchange.consumer.partitioned=true
## 分区数量
spring.cloud.stream.bindings.partition-exchange.producer.partition-count=2
## 机器下标,最大值=partition-count-1
spring.cloud.stream.instance-index=0
## 分区策略表达式
spring.cloud.stream.bindings.partition-exchange.producer.partition-key-expression=payload.mid
然后消息的路由的时候会从payload拿到mid进行条件运算:
mid/2=1则放在应用队列下标为1的队列,mid/2=0则放在队列下标为0的队列。
消息的入队前会计算出该消息应该进入哪个队列,源码截图:
可以看到开启分区之后,payload 的类型不是String,而是具备键值对的实体对象。
4:条件消费
4.1:概念
对同一个队列中的消息按条件进行划分再派发给不同的消费者。
4.2:匹配条件讲解
消息实体:
除了可以用payload中的数据进行匹配条件外,headers中的数据也可以作为条件。
4.3:测试
效果
本文GitHub地址
个人理解,不精之处望指出。
最新文章
- AgilePoint实例属性修改
- #研发解决方案#discache-分布式缓存查询与管理系统
- select/poll/epoll on serial port
- JS重要知识点
- 修改ssh的访问端口号
- ubuntun pptpd
- unity 4.x 从入门到精通(持续更新)
- MemoryMappedFile 内存映射文件 msdn
- uva 2218 Triathlon
- Office 2013 正式版 下载地址 带正版验证
- 成都传智职工high翻竞赛场
- debian服务器上不了网,缺少默认网关
- 自动化测试(二) 单元测试junit的Test注解突然不能使用原因以及解决方案
- Android 源代码结构
- Elasticsearch倒排索引结构
- Zookeeper本地模式安装
- p3168 [CQOI2015]任务查询系统(差分+主席树)
- java 线程池线程忙碌且阻塞队列也满了时给一个拒接的详细报告
- 【安装Ecshop2.7.2网站(LAMP环境)】--实践
- 【hive】时间段为五分钟的统计
热门文章
- 【Java】构造方法
- 通过HBase Observer同步数据到ElasticSearch
- Yarn参数优化(Fair Scheduler版本)
- HDU - 6761 Minimum Index (字符串,Lyndon分解)
- Codeforces Global Round 9 B. Neighbor Grid
- Codeforces Global Round 7 B. Maximums(逻辑)
- 2019牛客暑期多校训练营(第三场)B题、H题
- How many integers can you find HDU - 1796 容斥原理
- fzu2200 cleaning
- hdu5247 找连续数