1.介绍

官网:https://www.springcloud.cc/spring-cloud-dalston.html#_spring_cloud_stream

1.1定义

是一个构建消息驱动微服务的框架,主要是屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。目前只支持RabbitMQ和Kafka。

1.2设计思想

通过定义绑定器(Binder)作为中间层,实现了应用程序与消息中间件细节之间的隔离。inout对应消费者,output对应生产者。通信方式采用了发布-订阅的模式。

2.项目开发

源代码:https://github.com/zhongyushi-git/cloud-stream.git

2.1环境搭建

1)创建一个SpringBoot的项目,导入相关的依赖。详见源码

2)添加一个服务注册中心eureka。详见源码

3)需要依赖RabbitMQ,因此需要先安装它。

2.2消息驱动之生产者8801

1)创建模块stream-rabbitmq-provider8801,导入依赖

 <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency> <dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency> <dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2)创建启动类

3)yml配置

server:
port: 8801 spring:
application:
name: cloud-stream-provider
cloud:
stream:
# 在此处配置要绑定的rabbitMQ的服务信息
binders:
# 表示定义的名称,用于binding的整合
defaultRabbit:
# 消息中间件类型
type: rabbit
# 设置rabbitMQ的相关环境配置
environment:
spring:
rabbitmq:
host: 192.168.51.5
port: 5672
username: guest
password: guest
# 服务的整合处理
bindings:
# 这个名字是一个通道的名称
output:
# 表示要使用的exchange名称定义
destination: studyExchange
# 设置消息类型,本次为json,文本则设为text/plain
content-type: application/json
# 设置要绑定的消息服务的具体设置
binder: defaultRabbit eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的间隔时间,默认30
lease-expiration-duration-in-seconds: 5 # 超过5秒间隔,默认90
instance-id: send-8001.com # 主机名
prefer-ip-address: true # 显示ip

4)创建发送消息接口

package com.zys.cloud.service;

public interface IMessageProvider {
/**
* 消息发送
* @return
*/
String send();
}

5)创建发送接口实现类

package com.zys.cloud.service.impl;
import com.zys.cloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
import java.util.UUID; //这不是传统的service,这是和rabbitmq打交道的,不需要加注解@Service
//信道channel和exchange绑定在一起
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider { /**
* 消息发送管道
*/
@Resource
private MessageChannel output; @Override
public String send() {
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("serial = " + serial);
return null;
}
}

6)创建controller

package com.zys.cloud.controller;
import com.zys.cloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource; @RestController
public class SendMessageController { @Resource
private IMessageProvider messageProvider; @GetMapping("/sendMessage")
public String sendMessage(){
return messageProvider.send();
}
}

7)测试

先启动7001,然后是RabbitMQ,再启动8801,访问http://localhost:8801/sendMessage可以在控制台看到输出的内容。

2.3消息驱动之消费者8803

1)创建模块stream-rabbitmq-,导入依赖

 <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency> <dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency> <dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2)创建启动类

3)yml配置

server:
port: 8803 spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitMQ的服务信息
defaultRabbit: # 表示定义的名称,用于binding的整合
type: rabbit # 消息中间件类型
environment: # 设置rabbitMQ的相关环境配置
spring:
rabbitmq:
host: 192.168.51.5
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设为text/plain
binder: defaultRabbit # 设置要绑定的消息服务的具体设置 eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的间隔时间,默认30
lease-expiration-duration-in-seconds: 5 # 超过5秒间隔,默认90
instance-id: receive-8802.com #主机名
prefer-ip-address: true # 显示ip

4)创建controller

package com.zys.cloud.controller;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RestController; @Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort; @StreamListener(Sink.INPUT)
public void input(Message<String> message){
System.out.println("消费者1号,------->接收到的消息: "+message.getPayload()+"\t port: "+serverPort);
}
}

5)测试

先启动7001,然后是RabbitMQ,再启动8801,最后启动8803,访问http://localhost:8801/sendMessage可以在控制台看到8801,8803输出的内容

按照8803再创建8804模块,配置同上。

2.4服务重复消费

1)在8804和8804启动后,发现他们都在消费消息,但是一般消息在被消费后就不能再被消费了。

解决办法很简单,只需要配置相同分组即可。

最新文章

  1. 【转】GPU 与CPU的作用协调,工作流程、GPU整合到CPU得好处
  2. session超时设置
  3. ajax 无刷新分页
  4. LingPipe-TextClassification(文本分类)
  5. 【转】周末班LR笔记总结—新手入门必备
  6. Number Sequence HDU 1711 KMP 模板
  7. webpack2使用ch7-loader解析css 自动添加浏览器前缀
  8. TCP的核心系列 — ACK的处理(二)
  9. 天津联通新兴ICT业务工程师面试经历
  10. 关于ASL(平均查找长度)的简单总结
  11. [Swift]LeetCode563. 二叉树的坡度 | Binary Tree Tilt
  12. Java中的String,StringBuilder,StringBuffer三者的区别(转载)
  13. APPLE-SA-2019-3-25-4 Safari 12.1
  14. JavaScript进阶系列1:performace和console.time性能测试
  15. Java ME之Android开发从入门到精通
  16. 理解和使用ThreadLocal类
  17. 爬虫(猫眼电影+校花网+github+今日头条+拉钩)
  18. Python知识梳理
  19. putty登陆sourceforge.net(设置登录)
  20. Centos6.8 Mysql5.6 安装配置教程(转)

热门文章

  1. php开发扩展环境的搭建(Windows)
  2. CCF CSP 202012-2 期末预测之最佳阈值
  3. PIE模型
  4. HDU6370 Werewolf 【基环内向树】
  5. cf 1305 E. Kuroni and the Score Distribution
  6. Codeforces Round #652 (Div. 2) C. RationalLee 贪心
  7. 哈希算法解决:HDU1686 &amp;&amp; POJ2774 &amp;&amp; POJ3261
  8. CodeForces - 721C 拓扑排序+dp
  9. hdu5402 Travelling Salesman Problem
  10. 已处理证书链,但是在不受信任提供程序信任的根证书中终止 - Windows 7安装.Net Framework 4.6.2时出现此问题