Spring Cloud Stream是一个建立在Spring Boot和Spring Integration之上的框架,有助于创建事件驱动或消息驱动的微服务。
通过它可以更方便的访问消息服务,如消费Rabbitmq的消息示例如下:
添加Spring Cloud Stream与RabbitMQ消息中间件的依赖。
<dependency>

<groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>

配置通道关联的destination,对应rabbitmq的exchange名称。
spring:
cloud:
stream:
bindings:
input:
destination: mqTestDefault
        output:
destination: mqTestDefault
contentType: text/plain

destination:指定了消息获取的目的地 exchange,这里的exchange就是 mqTestDefault。这里配置应用输入、输出的destination相同,实际应用是input或output中的一方。

@SpringBootApplication@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {public static void main(String[] args) {
SpringApplication.run(MyLoggerServiceApplication.class, args);
}

@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public LogMessage enrichLogMessage(LogMessage log) {
return new LogMessage(String.format("[1]: %s", log.getMessage()));
}
}

启动后,默认是会创建一个临时队列,临时队列绑定的exchange为 “mqTestDefault”,routing key为 “#”。
实际使用中,我们需要一个持久化的队列,并且指定一个分组,用于保证应用服务的缩放。

只需要在消费者端的 binding 添加配置项 spring.cloud.stream.bindings.[channelName].group = XXX 。对应的队列就是持久化,并且名称为:mqTestOrder.XXX。

如果我们需要进一步根据 routing key 来进行区分消息投递的目的地,或者消息接受,需要进一步配,Spring Cloud Stream 也提供了相关配置:

spring:
cloud:
stream:
bindings:
inputProductAdd:
destination: mqTestProduct
group: addProductHandler # 拥有 group 默认会持久化队列
outputProductAdd:
destination: mqTestProduct
rabbit:
bindings:
inputProductAdd:
consumer:
bindingRoutingKey: addProduct.* # 用来绑定消费者的 routing key
outputProductAdd:
producer:
routing-key-expression: '''addProduct.*''' # 需要用这个来指定 RoutingKey

常用配置

给消费者设置消费组和主题

  1. 设置消费组: spring.cloud.stream.bindings.<通道名>.group=<消费组名>
  2. 设置主题: spring.cloud.stream.bindings.<通道名>.destination=<主题名>

给生产者指定通道的主题:spring.cloud.stream.bindings.<通道名>.destination=<主题名>

消费者开启分区,指定实例数量与实例索引

  1. 开启消费分区: spring.cloud.stream.bindings.<通道名>.consumer.partitioned=true
  2. 消费实例数量: spring.cloud.stream.instanceCount=1 (具体指定)
  3. 实例索引: spring.cloud.stream.instanceIndex=1 #设置当前实例的索引值

生产者指定分区键

  1. 分区键: spring.cloud.stream.bindings.<通道名>.producer.partitionKeyExpress=<分区键>
  2. 分区数量: spring.cloud.stream.bindings.<通道名>.producer.partitionCount=<分区数量>
一般最简单的应用只要配置spring.cloud.stream.bindings.开头的项即可,如果涉及到

最新文章

  1. SpringMVC中使用Cron表达式的定时器
  2. iptables日志探秘
  3. Postgres Plus Advanced Server installation
  4. 冲刺阶段 day 8
  5. CSS纯样式实现箭头、对话框等形状
  6. 解决浏览器使用&lt;pre&gt;&lt;/pre&gt;时不换行
  7. Codeforces Round #297 (Div. 2)
  8. AIX 第7章 指令记录
  9. git记住用户名密码
  10. Keil的c语言编译器
  11. Java GC 概念摘要
  12. 最短路径之Dijkstras算法(图片格式)
  13. 主成分分析算法(PCA)
  14. Redis安装部署教程
  15. Zabbix监控文件是否存在/文件大小
  16. Hadoop.之.入门部署
  17. Java sun.misc.unsafe类
  18. 下拉刷新 上拉更多 支持ListView GridView WebView【转载】
  19. Chrome 制作绿色便携版
  20. orzdba_monitor.sh脚本使用

热门文章

  1. - 反编译 AndroidKiller 逆向 实践案例 MD
  2. js获取简单表单对象(1)
  3. Identity和IdentityServer的区别及联系
  4. dapper.common新增概念object to sql
  5. JVM故障分析系列之四:jstack生成的Thread Dump日志线程状态
  6. ASP.NET SignalR 系列(七)之服务端触发推送
  7. window当mac用,VirtualBox虚拟机安装os系统
  8. Java自学-类和对象 单例模式
  9. python基础--数据结构之字典
  10. Python学习日记(三十六) Mysql数据库篇 四