前言

点击进入Spring官网文档

本文章为单体项目,将消费者和生产者写在同一个项目中,介意者不用向下看了。

本文介绍三种应用方式:

1:普通整合RabbitMQ

2:消息分区

3:按条件消费(多个消费者只消费同一队列中满足自己条件的消息)

1:核心依赖

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
<version>${spring.cloud.stream}</version>
</dependency> <dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>${spring.cloud.stream}</version>
</dependency>

全部依赖:

项目结构图:

2:普通整合RabbitMQ

2.1:application.properties

spring.rabbitmq.host=192.168.1.218
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.cloud.stream.bindings.dev-exchange.destination=dev-exchange
spring.cloud.stream.bindings.dev-exchange.group=dev-queue
spring.cloud.stream.bindings.dev-exchange.content-type=application/json
spring.cloud.stream.bindings.dev-exchange.consumer.concurrency=1
spring.cloud.stream.bindings.dev-exchange.consumer.max-attempts=1

2.2:定义生产者和消费者接口

import com.boot.rabbitmq.constance.MQConstants;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel; public interface RabbitStream { /**
* 消息流入(消费)
**/
@Input(MQConstants.DEV_EXCHANGE)
SubscribableChannel devConsumer(); /**
* 消息流出(生产)
**/
@Output(MQConstants.DEV_EXCHANGE)
MessageChannel devProducer();
}

2.3:生产者/消费者的具体实现

生产者代码:

@Component
@EnableBinding(RabbitStream.class)
public class DevProducer { private static final Logger logger = LoggerFactory.getLogger(DevProducer.class); private final RabbitStream rabbitStream; public DevProducer(RabbitStream rabbitStream) {
this.rabbitStream = rabbitStream;
} public void sendMsg(MQModel model) {
logger.info("producer:{}", JSON.toJSONString(model));
rabbitStream.devProducer()
.send(MessageBuilder.withPayload(model).build());
}
}

消费者代码:

@Component
@EnableBinding(RabbitStream.class)
public class DevListener { private static final Logger logger = LoggerFactory.getLogger(DevListener.class); @StreamListener(MQConstants.DEV_EXCHANGE)
public void receiveMsgAutoCommit(@Payload String payload) {
logger.info("consumer:{}", payload);
} }

controller代码:

    @PostMapping(value = "/dev")
public void dev(@RequestBody MQModel model) {
devProducer.sendMsg(model);
}

2.4:测试

发送请求:

控制台日志:

3:消息分区

3.1:概念

所谓消息分区就是将一个大队列拆分成多个小队列,然后分解成 producer-A -> queue-A -> Consumer-A 的一种场景。

3.2:如何在项目中使用

1:不需要改很多东西,只需要添加少部分配置即可

## RabbitMQ 消息分区配置
spring.cloud.stream.bindings.partition-exchange.destination=partition-exchange
spring.cloud.stream.bindings.partition-exchange.group=partition-queue
spring.cloud.stream.bindings.partition-exchange.content-type=application/json
spring.cloud.stream.bindings.partition-exchange.consumer.concurrency=1
spring.cloud.stream.bindings.partition-exchange.consumer.max-attempts=1
## 消息分区
spring.cloud.stream.bindings.partition-exchange.consumer.partitioned=true
## 分区数量
spring.cloud.stream.bindings.partition-exchange.producer.partition-count=2
## 机器下标,最大值=partition-count-1
spring.cloud.stream.instance-index=0
## 分区策略表达式
spring.cloud.stream.bindings.partition-exchange.producer.partition-key-expression=payload.mid

然后消息的路由的时候会从payload拿到mid进行条件运算:

mid/2=1则放在应用队列下标为1的队列,mid/2=0则放在队列下标为0的队列。

消息的入队前会计算出该消息应该进入哪个队列,源码截图:



可以看到开启分区之后,payload 的类型不是String,而是具备键值对的实体对象。

4:条件消费

4.1:概念



对同一个队列中的消息按条件进行划分再派发给不同的消费者。

4.2:匹配条件讲解

消息实体:



除了可以用payload中的数据进行匹配条件外,headers中的数据也可以作为条件。

4.3:测试



效果

本文GitHub地址

个人理解,不精之处望指出。

最新文章

  1. AgilePoint实例属性修改
  2. #研发解决方案#discache-分布式缓存查询与管理系统
  3. select/poll/epoll on serial port
  4. JS重要知识点
  5. 修改ssh的访问端口号
  6. ubuntun pptpd
  7. unity 4.x 从入门到精通(持续更新)
  8. MemoryMappedFile 内存映射文件 msdn
  9. uva 2218 Triathlon
  10. Office 2013 正式版 下载地址 带正版验证
  11. 成都传智职工high翻竞赛场
  12. debian服务器上不了网,缺少默认网关
  13. 自动化测试(二) 单元测试junit的Test注解突然不能使用原因以及解决方案
  14. Android 源代码结构
  15. Elasticsearch倒排索引结构
  16. Zookeeper本地模式安装
  17. p3168 [CQOI2015]任务查询系统(差分+主席树)
  18. java 线程池线程忙碌且阻塞队列也满了时给一个拒接的详细报告
  19. 【安装Ecshop2.7.2网站(LAMP环境)】--实践
  20. 【hive】时间段为五分钟的统计

热门文章

  1. 【Java】构造方法
  2. 通过HBase Observer同步数据到ElasticSearch
  3. Yarn参数优化(Fair Scheduler版本)
  4. HDU - 6761 Minimum Index (字符串,Lyndon分解)
  5. Codeforces Global Round 9 B. Neighbor Grid
  6. Codeforces Global Round 7 B. Maximums(逻辑)
  7. 2019牛客暑期多校训练营(第三场)B题、H题
  8. How many integers can you find HDU - 1796 容斥原理
  9. fzu2200 cleaning
  10. hdu5247 找连续数