我们知道,当微服务越来越来多的时候,仅仅是feign的http调用方式已经满足不了我们的使用场景了。这个时候系统就需要接入消息中间件了。相比较于传统的Spring项目、SpringBoot项目使用消息中间件的很多配置不同,SpringCloud Stream抽象了中间件产品的不同,在SpringCloud中你仅仅需要修改几行配置文件就可以灵活的切换中间件产品而不需要修改任何代码。

现在我们以SpringCloud Stream整合RabbitMQ为例来学习一下

创建生产者

1. 引入依赖

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2. 定义配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
spring:
cloud:
stream:
binders:
test:
type: rabbit
environment:
spring:
rabbitmq:
addresses: 10.0.20.132
port: 5672
username: root
password: root
virtual-host: /unicode-pay
bindings:
testOutPut:
destination: testRabbit
content-type: application/json
default-binder: test

现在来解释一下这些配置的含义

  1. binders: 这是一组binder的集合,这里配置了一个名为test的binder,这个binder中是包含了一个rabbit的连接信息
  2. bindings:这是一组binding的集合,这里配置了一个名为testOutPut的binding,这个binding中配置了指向名test的binder下的一个交换机testRabbit。
  3. 扩展: 如果我们项目中不仅集成了rabbit还集成了kafka那么就可以新增一个类型为kafka的binder、如果项目中会使用多个交换机那么就使用多个binding,

3.创建通道

1
2
3
4
5
6
7
8
public interface  MqMessageSource {

    String TEST_OUT_PUT = "testOutPut";

    @Output(TEST_OUT_PUT)
MessageChannel testOutPut(); }

这个通道的名字就是上方binding的名字

4. 发送消息

1
2
3
4
5
6
7
8
9
10
11
12
@EnableBinding(MqMessageSource.class)
public class MqMessageProducer {
@Autowired
@Output(MqMessageSource.TEST_OUT_PUT)
private MessageChannel channel; public void sendMsg(String msg) {
channel.send(MessageBuilder.withPayload(msg).build());
System.err.println("消息发送成功:"+msg);
}
}

这里就是使用上方的通道来发送到指定的交换机了。需要注意的是withPayload方法你可以传入任何类型的对象,但是需要实现序列化接口

5. 创建测试接口

EnableBinding注解绑定的类默认是被Spring管理的,我们可以在controller中注入它

1
2
3
4
5
6
7
8
@Autowired
private MqMessageProducer mqMessageProducer; @GetMapping(value = "/testMq")
public String testMq(@RequestParam("msg")String msg){
mqMessageProducer.sendMsg(msg);
return "发送成功";
}

生产者的代码到此已经完成了。

创建消费者

1. 引入依赖

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2. 定义配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
spring:
cloud:
stream:
binders:
test:
type: rabbit
environment:
spring:
rabbitmq:
addresses: 10.0.20.132
port: 5672
username: root
password: root
virtual-host: /unicode-pay
bindings:
testInPut:
destination: testRabbit
content-type: application/json
default-binder: test

这里与生产者唯一不同的地方就是testIntPut了,相信你已经明白了,它是binding的名字,也是通道与交换机绑定的关键

3.创建通道

1
2
3
4
5
6
7
8
public interface  MqMessageSource {

    String TEST_IN_PUT = "testInPut";

    @Input(TEST_IN_PUT)
SubscribableChannel testInPut(); }

4. 接受消息

1
2
3
4
5
6
7
8
@EnableBinding(MqMessageSource.class)
public class MqMessageConsumer {
@StreamListener(MqMessageSource.TEST_IN_PUT)
public void messageInPut(Message<String> message) {
System.err.println(" 消息接收成功:" + message.getPayload());
} }

这个时候启动Eureka、消息生产者和消费者,然后调用生产者的接口应该就可以接受到来自mq的消息了。

GitHub地址:https://github.com/shiyujun/spring-cloud-demo。代码所在模块:cloud-demo-consumer,cloud-demo-provider-2

如果对您有所帮助,请记得帮忙点一个star哦

本文出自http://zhixiang.org.cn,转载请保留

最新文章

  1. ubuntu安装mysql--PC端
  2. Display HTML in WPF and CefSharp
  3. Swift:函数和闭包
  4. 15个实用的Linux find命令示例
  5. SQL中采用Newtonsoft.Json处理json字符串
  6. Delphi_MemoryModule — load DLL from memory. Also includes hooking utilities.
  7. 图片切割工具---产生多个div切割图片 采用for和一的二维阵列设置背景位置
  8. ScrollView 嵌套ListView 幻灯冲突,和显示不全
  9. J2ee技术难点
  10. 程序员的视角:java 线程
  11. linux操作系统的前世今生
  12. LOJ#2665 树的计数
  13. CCF CSP 201412-1 门禁系统
  14. mybatis_09关联查询_一对一
  15. Elasticsearch&#160;Elasticsearch入门指导
  16. c# WPF RichTextBox 文字颜色
  17. cxf+spring+restful简单接口搭建
  18. java 虚拟机栈
  19. 安卓界面之Toolbar+tablayout+viewpager仿WhatsApp界面样式
  20. Pollard_Rho大数分解模板题 pku-2191

热门文章

  1. thinkphp5实现多级控制器
  2. CentOS7 查看操作系统版本信息
  3. java进阶系列之装饰器模式
  4. 响应式web设计(一)
  5. kubenetes安装记录和要点
  6. Django ORM 常用字段和参数
  7. 大白书中无向图的点双联通分量(BCC)模板的分析与理解
  8. EmWin 字体相关函数
  9. union: redis config
  10. [Ionic] Error: No provider for Http! Error: No provider for Http!