引自B站楠哥:https://space.bilibili.com/434617924

一、创建父工程

创建父工程hello-spring-cloud-alibaba

Spring Cloud Alibaba 的环境在父工程中创建,微服务的各个组件作为子工程,继承父工程的环境。

Spring Boot ---》Spring Cloud ---》Spring Cloud Alibaba

引入依赖时注意提前查询一下spring cloud的版本依赖关系。

pom文件为,父工程打包时应打成pom,否则子工程构建时无法打包成功。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.yqd</groupId>
<artifactId>hello-spring-cloud-alibaba</artifactId>
<version>1.0.0-SNAPSHOT</version>
<name>hello-spring-cloud-alibaba</name>
<description>Demo project for hello-spring-cloud-alibaba</description>
<packaging>pom</packaging> <properties>
<java.version>1.8</java.version>
</properties> <dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency> <dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies> <dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Hoxton.SR3</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.2.1.RELEASE</version>
</dependency>
</dependencies>
</dependencyManagement> <build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build> </project>

父工程中只需要引入所需要的依赖就可以了,不用其他的资源,可以删除src目录和resources目录。

二、服务的提供者

在父工程中添加spring boot模块module,工程名字为provider。

pom文件为

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.yqd</groupId>
<artifactId>hello-spring-cloud-alibaba</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<groupId>com.yqd</groupId>
<artifactId>provider</artifactId>
<version>1.0.0-SNAPSHOT</version>
<name>provider</name>
<description>Demo project for Spring Boot</description> <properties>
<java.version>1.8</java.version>
</properties> <dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency> <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>2.2.1.RELEASE</version>
</dependency>
</dependencies> <build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build> </project>

在子工程中要引入spring-cloud-starter-alibaba-nacos-discovery依赖,这样才可以注册到nacos中。

配置文件application.yml为:

spring:
cloud:
nacos:
discovery:
server-addr: localhost:8848
application:
name: provider server:
port: 8081

做个接口供外部调用:

package com.yqd.controller;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController; @RestController
public class ProviderController { @Value("${server.port}") //SPEL
private String port; @GetMapping("/index")
public String index(){
return this.port;
}
}

下载解压nacos1.2.1,解压之后打开bin目录下的startup.cmd,然后通过url访问nacos,http://localhost:8848/nacos/index.html,登录的用户名和密码都默认是nocos。可以通过nacos查看注册的实例。

三、服务的消费者

在父工程中添加spring boot模块module,工程名字为consumer。

pom文件为

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.yqd</groupId>
<artifactId>hello-spring-cloud-alibaba</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<groupId>com.yqd</groupId>
<artifactId>consumer</artifactId>
<version>1.0.0-SNAPSHOT</version>
<name>consumer</name>
<description>Demo project for Spring Boot</description> <properties>
<java.version>1.8</java.version>
</properties> <dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency> <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>2.2.1.RELEASE</version>
</dependency>
</dependencies> <build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build> </project>

在子工程中要引入spring-cloud-starter-alibaba-nacos-discovery依赖,这样才可以注册到nacos中。但是消费者只需要到提供者中调用接口就可以了,因此可以不用在配置文件中配置nacos。

配置文件application.yml为:

server:
port: 8180

远程过程调用使用RestTemplate

package com.yqd.config;

import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.web.client.RestTemplate; @Configuration
public class ConsumerConfiguration {
@Bean
@LoadBalanced //使用Ribbon进行远程调用
@Primary
public RestTemplate restTemplate(){
return new RestTemplate();
} //必须使用自己配置的这个Bean,并且使用名称注入,否则报错java.lang.IllegalStateException: No instances available for ...
@Bean
public RestTemplate restTemplateCustom(){
return new RestTemplate();
}
}

创建接口来从provider的接口中获取数据

package com.yqd.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate; import java.util.List;
import java.util.concurrent.ThreadLocalRandom; @RestController
@Slf4j
public class ConsumerController {
//自动装载,直接用
@Autowired
private DiscoveryClient discoveryClient;
//没有自动装载,不能直接用,需要用配置类装载
@Autowired(required = false) // 必须使用自己配置的这个Bean,并且使用名称注入,否则报错
@Qualifier("restTemplateCustom") // 表示根据名称来找bean
private RestTemplate restTemplateCustom; /**
* 返回服务实例
* @return
*/
@GetMapping("/instances")
public List<ServiceInstance> instances() {
List<ServiceInstance> provider = this.discoveryClient.getInstances("provider");
return provider;
} @GetMapping("/index")
public String index(){
//调用provider服务的index方法
//1、找到provider实例
List<ServiceInstance> list = this.discoveryClient.getInstances("provider");
int index = ThreadLocalRandom.current().nextInt(list.size());//返回一个随机数
ServiceInstance serviceInstance = list.get(index); //获取其中一个实例
String url = serviceInstance.getUri()+"/index";//URL=URI+请求路径
//2、调用
log.info("调用的端口是,{}", serviceInstance.getPort());
return "调用了端口为:"+serviceInstance.getPort()+"的服务,返回结果是:"+this.restTemplateCutom.getForObject(url, String.class);
}
}

四、使用Ribbon做负载均衡

配置类

@Configuration
public class ConsumerConfig { @Bean
@LoadBalanced
public RestTemplate restTemplate(){
return new RestTemplate();
} }
@RestController
public class ConsumerController { @Autowired
private RestTemplate restTemplate; @GetMapping("/index")
public String index(){
return "consumer远程调用provier:"+this.restTemplate.getForObject("http://provider/index", String.class);
} }

application.yml中配置,随机调用机器

server:
port: 8180
provider:
ribbon:
NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule

Nacos 权重配置,在nacos中配置机器的权重,权重越高被访问的概率越大

package com.yqd.config;

import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.cloud.nacos.ribbon.NacosServer;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.netflix.client.config.IClientConfig;
import com.netflix.loadbalancer.AbstractLoadBalancerRule;
import com.netflix.loadbalancer.BaseLoadBalancer;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.Server;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; @Slf4j
public class NacosWeightedRule extends AbstractLoadBalancerRule { @Autowired
private NacosDiscoveryProperties nacosDiscoveryProperties; @Override
public void initWithNiwsConfig(IClientConfig iClientConfig) {
//读取配置文件
} @Override
public Server choose(Object o) {
ILoadBalancer loadBalancer = this.getLoadBalancer();
BaseLoadBalancer baseLoadBalancer = (BaseLoadBalancer) loadBalancer;
//获取要请求的微服务名称
String name = baseLoadBalancer.getName();
//获取服务发现的相关API
NamingService namingService = nacosDiscoveryProperties.namingServiceInstance();
try {
Instance instance = namingService.selectOneHealthyInstance(name);
log.info("选择的实例是port={},instance={}",instance.getPort(),instance);
return new NacosServer(instance);
} catch (NacosException e) {
e.printStackTrace();
return null;
}
}
}
server:
port: 8180
provider:
ribbon:
NFLoadBalancerRuleClassName: com.yqd.config.NacosWeightedRule

五、Sentinel 服务限流降级

雪崩效应--->降级、限流、熔断

解决方案:

1、设置线程超时

2、设置限流

3、熔断器 Sentinel、Hystrix

1、在服务提供者provider项目的pom.xml中加入依赖

<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
<version>2.2.1.RELEASE</version>
</dependency> <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

2、在provider的application中增加配置

management:
endpoints:
web:
exposure:
include: '*'
spring:
cloud:
sentinel:
transport:
dashboard: localhost:8080

3、下载 Sentinel 控制台,解压,启动。访问地址:http://localhost:8080。用户名和密码都是sentinel。

4、启动nacos,再启动provider项目,访问provider中的接口,等待一会刷新会出现服务相关的信息。必须有请求时才可以监控到。

5.1 流控规则

直接限流:直接对某个接口限流

关联限流:对某个接口的访问会影响对另一个接口的访问

链路限流:可以通过链路入侵到其他层,比如controller层、service层

链路过滤相关代码:

1、在服务提供者provider项目的pom.xml中加入依赖

<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>1.7.1</version>
</dependency> <dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-web-servlet</artifactId>
<version>1.7.1</version>
</dependency>

2、在provider的application中增加配置

spring:
cloud:
sentinel:
filter:
enabled: false

3、增加配置类

package com.southwind.configuration;

import com.alibaba.csp.sentinel.adapter.servlet.CommonFilter;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; @Configuration
public class FilterConfiguration { @Bean
public FilterRegistrationBean registrationBean(){
FilterRegistrationBean registrationBean = new FilterRegistrationBean();
registrationBean.setFilter(new CommonFilter());
registrationBean.addUrlPatterns("/*");
registrationBean.addInitParameter(CommonFilter.WEB_CONTEXT_UNIFY,"false");
registrationBean.setName("sentinelFilter");
return registrationBean;
}
}

4、Service

@Service
public class HelloService { @SentinelResource("test")
public void test(){
System.out.println("test");
}
}

5、Controller

@Autowired
private HelloService helloService; @GetMapping("/test1")
public String test1(){
this.helloService.test();
return "test1";
} @GetMapping("/test2")
public String test2(){
this.helloService.test();
return "test2";
}

6、访问接口http://localhost:8081/test1和http://localhost:8081/test2。然后再sentinel控制台簇点链路会看到两个接口test1和test2。对test1接口增加一个流控,设置单机阈值=1,流控模式魏链路,入口资源为/test1,新增。然后再去分别刷新访问两个接口,会发现test1接口刷新太快会限流,test2接口不会限流。

5.2流控效果

快速失败

直接抛出异常

Warm UP

给系统一个预热的时间,预热时间段内单机阈值较低,预热时间过后单机阈值增加,预热时间内当前的单机阈值是设置的阈值的三分之一,预热时间过后单机阈值恢复设置的值。

排队等待

当请求调用失败之后,不会立即抛出异常,等待下一次调用,时间范围是超时时间,在时间范围内如果能请求成功则不抛出异常,如果请求则抛出异常。

5.3 降级规则

RT

单个请求的响应时间超过阈值,则进入准降级状态,接下来 1 S 内连续 5 个请求响应时间均超过阈值,就进行降级,持续时间为时间窗口的值。

异常比例

每秒异常数量占通过量的比例大于阈值,就进行降级处理,持续时间为时间窗口的值。

异常数

1 分钟内的异常数超过阈值就进行降级处理,时间窗口的值要大于 60S,否则刚结束熔断又进入下一次熔断了。

5.4 热点规则

热点规则是流控规则的更细粒度操作,可以具体到对某个热点参数的限流,设置限流之后,如果带着限流参数的请求量超过阈值,则进行限流,时间为统计窗口时长。

必须要添加 @SentinelResource,即对资源进行流控。

@GetMapping("/hot")
@SentinelResource("hot")
public String hot(
@RequestParam(value = "num1",required = false) Integer num1,
@RequestParam(value = "num2",required = false) Integer num2){
return num1+"-"+num2;
}

5.5 授权规则

给指定的资源设置流控应用(追加参数),可以对流控应用进行访问权限的设置,具体就是添加白名单和黑名单。

如何给请求指定流控应用,通过实现 RequestOriginParser 接口来完成,代码如下所示。

package com.southwind.configuration;

import com.alibaba.csp.sentinel.adapter.servlet.callback.RequestOriginParser;
import org.springframework.util.StringUtils; import javax.servlet.http.HttpServletRequest; public class RequestOriginParserDefinition implements RequestOriginParser {
@Override
public String parseOrigin(HttpServletRequest httpServletRequest) {
String name = httpServletRequest.getParameter("name");
if(StringUtils.isEmpty(name)){
throw new RuntimeException("name is null");
}
return name;
}
}

要让 RequestOriginParserDefinition 生效,需要在配置类中进行配置。

package com.southwind.configuration;

import com.alibaba.csp.sentinel.adapter.servlet.callback.WebCallbackManager;
import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; @Configuration
public class SentinelConfiguration { @PostConstruct
public void init(){
WebCallbackManager.setRequestOriginParser(new RequestOriginParserDefinition());
}
}

5.6 自定义规则异常返回

创建异常处理类

package com.southwind.handler;

import com.alibaba.csp.sentinel.adapter.servlet.callback.UrlBlockHandler;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeException;
import com.alibaba.csp.sentinel.slots.block.flow.FlowException; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException; public class ExceptionHandler implements UrlBlockHandler {
@Override
public void blocked(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, BlockException e) throws IOException {
httpServletResponse.setContentType("text/html;charset=utf-8");
String msg = null;
if(e instanceof FlowException){
msg = "限流";
}else if(e instanceof DegradeException){
msg = "降级";
}
httpServletResponse.getWriter().write(msg);
}
}

进行配置。

@Configuration
public class SentinelConfiguration { @PostConstruct
public void init(){
WebCallbackManager.setUrlBlockHandler(new ExceptionHandler());
}
}

六、整合 RocketMQ

6.1 安装 RocketMQ

1、传入 Linux 服务器

2、解压缩,把rocketmq的文件夹放到/usr/local/目录下

unzip rocketmq-all-4.7.1-bin-release.zip

3、切换到rockmq的目录中,启动 NameServer

nohup ./bin/mqnamesrv &

4、检查是否启动成功

netstat -an | grep 9876

5、启动 Broker

启动之前需要编辑配置文件,修改 JVM 内存设置,默认给的内存 4 GB,超过我们的 JVM 了。

cd bin
vim runserver.sh

改成如下参数:

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
vim runbroker.sh

改成如下参数:

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

启动 Broker

nohup ./mqbroker -n localhost:9876 &

可以查看日志

tail -f ~/logs/rocketmqlogs/broker.log

启动成功

6、测试 RocketMQ

消息发送

cd bin
export NAMESRV_ADDR=localhost:9876
./tools.sh org.apache.rocketmq.example.quickstart.Producer

消息接收

cd bin
export NAMESRV_ADDR=localhost:9876
./tools.sh org.apache.rocketmq.example.quickstart.Consumer

7、关闭 RocketMQ

cd bin
./mqshutdown broker
./mqshutdown namesrv

6.2 安装 RocketMQ 控制台

1、把rocketmq的客户端文件解压缩,进入目录rocketmq-externals-rocketmq-console-1.0.0\rocketmq-console\src\main\resources,修改配置application.yml,打包

mvn clean package -Dmaven.test.skip=true

2、进入 target 启动 jar

java -jar rocketmq-console-ng-1.0.0.jar

打开浏览器访问 localhost:9877,如果报错

这是因为我们的 RocketMQ 安装在 Linux 中,控制台在 windows,Linux 需要开放端口才能访问,开放 10909 和 9876 端口

firewall-cmd --zone=public --add-port=9876/tcp --permanent
firewall-cmd --zone=public --add-port=10909/tcp --permanent
firewall-cmd --zone=public --add-port=10911/tcp --permanent
systemctl restart firewalld.service
firewall-cmd --reload

重新启动控制台项目

6.3 Java 实现消息发送

1、pom.xml 中引入依赖

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>

2、生产消息

package com.southwind;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message; public class Test {
public static void main(String[] args) throws Exception {
//创建消息生产者
DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
//设置NameServer
producer.setNamesrvAddr("192.168.248.129:9876");
//启动生产者
producer.start();
//构建消息对象
Message message = new Message("myTopic","myTag",("Test MQ").getBytes());
//发送消息
SendResult result = producer.send(message, 1000);
System.out.println(result);
//关闭生产者
producer.shutdown();
}
}

3、直接运行,如果报错 sendDefaultImpl call timeout,可以开放 10911 端口

firewall-cmd --zone=public --add-port=10911/tcp --permanent
systemctl restart firewalld.service
firewall-cmd --reload

打开 RocketMQ 控制台,可查看消息。

6.4 Java 实现消息消费

package com.southwind.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt; import java.util.List; @Slf4j
public class ConsumerTest {
public static void main(String[] args) throws MQClientException {
//创建消息消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");
//设置NameServer
consumer.setNamesrvAddr("192.168.248.129:9876");
//指定订阅的主题和标签
consumer.subscribe("myTopic","*");
//回调函数
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
log.info("Message=>{}",list);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者
consumer.start();
}
}

6.5 Spring Boot 整合 RocketMQ

provider中添加内容

1、pom.xml

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>

2、application.yml

rocketmq:
name-server: 192.168.248.129:9876
producer:
group: myprovider

3、Order

package com.southwind.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor; import java.util.Date; @Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
private Integer id;
private String buyerName;
private String buyerTel;
private String address;
private Date createDate;
}

4、Controller

@Autowired
private RocketMQTemplate rocketMQTemplate; @GetMapping("/create")
public Order create(){
Order order = new Order(
1,
"张三",
"123123",
"软件园",
new Date()
);
this.rocketMQTemplate.convertAndSend("myTopic",order);
return order;
}

consumer中添加内容

1、pom.xml

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>

2、application.yml

rocketmq:
name-server: 192.168.248.129:9876

3、Service

@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "myConsumer",topic = "myTopic")
public class SmsService implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
log.info("新订单{},发短信",order);
}
}

先启动provider,再启动consumer。然后调用provider的接口产生消息:http://localhost:8081/create,消息会存入rocketmq服务器中,consumer一直监听着服务器,当服务器新存入消息的时候就从服务器中读取消息。可能会遇到无法读取消息的情况,可以重启一下consumer。

七、服务网关

Spring Cloud Gateway 是基于 Netty,跟 Servlet 不兼容,所以你的工程中不能出现 Servlet 的组件 。

1、pom.xml

注意,一定不能出现 spring web 的依赖,因为 Gateway 与 Servlet 不兼容。因此需要把spring-boot-starter-web依赖从父工程中添加到需要它的子工程中,这样gateway中就不会依赖到它了。

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>

2、application.yml

server:
port: 8010
spring:
application:
name: gateway
cloud:
gateway:
discovery:
locator:
enabled: true
routes:
- id: provider_route
uri: http://localhost:8081
predicates:
- Path=/provider/**
filters:
- StripPrefix=1

上面这种做法其实没有用到 nacos ,现在我们让 gateway 直接去 nacos 中发现服务,配置更加简单了。

1、pom.xml 引入 nacos

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency> <dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

2、application.yml

server:
port: 8010
spring:
application:
name: gateway
cloud:
gateway:
discovery:
locator:
enabled: true

7.1 Gateway 限流

7.1.1 基于路由限流

1、pom.xml

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency> <dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-spring-cloud-gateway-adapter</artifactId>
</dependency>

2、配置类

package com.southwind.configuration;

import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayFlowRule;
import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayRuleManager;
import com.alibaba.csp.sentinel.adapter.gateway.sc.SentinelGatewayFilter;
import com.alibaba.csp.sentinel.adapter.gateway.sc.callback.BlockRequestHandler;
import com.alibaba.csp.sentinel.adapter.gateway.sc.callback.GatewayCallbackManager;
import com.alibaba.csp.sentinel.adapter.gateway.sc.exception.SentinelGatewayBlockExceptionHandler;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.reactive.result.view.ViewResolver;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono; import javax.annotation.PostConstruct;
import java.util.*; @Configuration
public class GatewayConfiguration {
private final List<ViewResolver> viewResolvers;
private final ServerCodecConfigurer serverCodecConfigurer; public GatewayConfiguration(ObjectProvider<List<ViewResolver>> viewResolversProvider,
ServerCodecConfigurer serverCodecConfigurer) {
this.viewResolvers = viewResolversProvider.getIfAvailable(Collections::emptyList);
this.serverCodecConfigurer = serverCodecConfigurer;
} //配置限流的异常处理
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public SentinelGatewayBlockExceptionHandler sentinelGatewayBlockExceptionHandler() {
return new SentinelGatewayBlockExceptionHandler(viewResolvers, serverCodecConfigurer);
} //配置初始化的限流参数
@PostConstruct
public void initGatewayRules(){
Set<GatewayFlowRule> rules = new HashSet<>();
rules.add(
new GatewayFlowRule("provider_route")
.setCount(1)
.setIntervalSec(1)
);
GatewayRuleManager.loadRules(rules);
} //初始化限流过滤器
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public GlobalFilter sentinelGatewayFilter() {
return new SentinelGatewayFilter();
} //自定义限流异常页面
@PostConstruct
public void initBlockHandlers(){
BlockRequestHandler blockRequestHandler = new BlockRequestHandler() {
@Override
public Mono<ServerResponse> handleRequest(ServerWebExchange serverWebExchange, Throwable throwable) {
Map map = new HashMap();
map.put("code",0);
map.put("msg","被限流了");
return ServerResponse.status(HttpStatus.OK)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromObject(map));
}
};
GatewayCallbackManager.setBlockHandler(blockRequestHandler);
}
}

3、application.yml

server:
port: 8010
spring:
application:
name: gateway
cloud:
gateway:
discovery:
locator:
enabled: true
routes:
- id: provider_route
uri: http://localhost:8081
predicates:
- Path=/provider/**
filters:
- StripPrefix=1

7.1.2 基于 API 分组限流

1、修改配置类,添加基于 API 分组限流的方法,修改初始化的限流参数

package com.southwind.configuration;

import com.alibaba.csp.sentinel.adapter.gateway.common.SentinelGatewayConstants;
import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiDefinition;
import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiPathPredicateItem;
import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiPredicateItem;
import com.alibaba.csp.sentinel.adapter.gateway.common.api.GatewayApiDefinitionManager;
import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayFlowRule;
import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayRuleManager;
import com.alibaba.csp.sentinel.adapter.gateway.sc.SentinelGatewayFilter;
import com.alibaba.csp.sentinel.adapter.gateway.sc.callback.BlockRequestHandler;
import com.alibaba.csp.sentinel.adapter.gateway.sc.callback.GatewayCallbackManager;
import com.alibaba.csp.sentinel.adapter.gateway.sc.exception.SentinelGatewayBlockExceptionHandler;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.reactive.result.view.ViewResolver;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono; import javax.annotation.PostConstruct;
import java.util.*; @Configuration
public class GatewayConfiguration { private final List<ViewResolver> viewResolvers;
private final ServerCodecConfigurer serverCodecConfigurer; public GatewayConfiguration(ObjectProvider<List<ViewResolver>> viewResolversProvider,
ServerCodecConfigurer serverCodecConfigurer) {
this.viewResolvers = viewResolversProvider.getIfAvailable(Collections::emptyList);
this.serverCodecConfigurer = serverCodecConfigurer;
} //配置限流的异常处理
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public SentinelGatewayBlockExceptionHandler sentinelGatewayBlockExceptionHandler() {
return new SentinelGatewayBlockExceptionHandler(viewResolvers, serverCodecConfigurer);
} //配置初始化的限流参数
@PostConstruct
public void initGatewayRules(){
Set<GatewayFlowRule> rules = new HashSet<>();
rules.add(new GatewayFlowRule("provider_api1").setCount(1).setIntervalSec(1));
rules.add(new GatewayFlowRule("provider_api2").setCount(1).setIntervalSec(1));
GatewayRuleManager.loadRules(rules);
} //初始化限流过滤器
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public GlobalFilter sentinelGatewayFilter() {
return new SentinelGatewayFilter();
} //自定义限流异常页面
@PostConstruct
public void initBlockHandlers(){
BlockRequestHandler blockRequestHandler = new BlockRequestHandler() {
@Override
public Mono<ServerResponse> handleRequest(ServerWebExchange serverWebExchange, Throwable throwable) {
Map map = new HashMap();
map.put("code",0);
map.put("msg","被限流了");
return ServerResponse.status(HttpStatus.OK)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromObject(map));
}
};
GatewayCallbackManager.setBlockHandler(blockRequestHandler);
} //自定义API分组
@PostConstruct
private void initCustomizedApis(){
Set<ApiDefinition> definitions = new HashSet<>();
ApiDefinition api1 = new ApiDefinition("provider_api1")
.setPredicateItems(new HashSet<ApiPredicateItem>(){{
add(new ApiPathPredicateItem().setPattern("/provider/api1/**")
.setMatchStrategy(SentinelGatewayConstants.URL_MATCH_STRATEGY_PREFIX));
}});
ApiDefinition api2 = new ApiDefinition("provider_api2")
.setPredicateItems(new HashSet<ApiPredicateItem>(){{
add(new ApiPathPredicateItem().setPattern("/provider/api2/demo1"));
}});
definitions.add(api1);
definitions.add(api2);
GatewayApiDefinitionManager.loadApiDefinitions(definitions);
}
}

2、在provider的Controller中添加方法

@GetMapping("/api1/demo1")
public String demo1(){
return "demo";
} @GetMapping("/api1/demo2")
public String demo2(){
return "demo";
} @GetMapping("/api2/demo1")
public String demo3(){
return "demo";
} @GetMapping("/api2/demo2")
public String demo4(){
return "demo";
}

7.1.3 基于 Nacos 服务发现组件进行限流

server:
port: 8010
spring:
application:
name: gateway
cloud:
gateway:
discovery:
locator:
enabled: true

API 分组代码修改,改为 discovery 中的服务名。

ApiDefinition api2 = new ApiDefinition("provider_api2")
.setPredicateItems(new HashSet<ApiPredicateItem>(){{
add(new ApiPathPredicateItem().setPattern("/p1/api2/demo1"));
}});

八、分布式事务

8.1 模拟分布式事务异常

1、创建两个工程 order、pay,pom.xml

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency> <dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

2、建两个数据库 order、pay,两个微服务分别访问。

order数据库里面建一个表orders,字段有id和username。

pay数据库里面建一个表pay,字段有id和username。

3、分别写两个服务的 application.yml

server:
port: 8010
spring:
application:
name: order
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: 123456
url: jdbc:mysql://localhost:3306/order?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC
server:
port: 8020
spring:
application:
name: pay
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: 123456
url: jdbc:mysql://localhost:3306/pay?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC

4、分别写两个 Service

package com.southwind.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service; @Service
public class OrderService {
@Autowired
private JdbcTemplate jdbcTemplate; public void save(){
this.jdbcTemplate.update("insert into orders(username) values ('张三')");
}
}
package com.southwind.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service; @Service
public class PayService {
@Autowired
private JdbcTemplate jdbcTemplate; public void save(){
this.jdbcTemplate.update("insert into pay(username) values ('张三')");
}
}

5、控制器 Order 通过 RestTemplate 调用 Pay 的服务

package com.southwind.controller;

import com.southwind.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate; @RestController
public class OrderController { @Autowired
private OrderService orderService;
@Autowired
private RestTemplate restTemplate; @GetMapping("/save")
public String save(){
//订单
this.orderService.save();
int i = 10/0;
//支付
this.restTemplate.getForObject("http://localhost:8020/save",String.class);
return "success";
}
}
package com.southwind.controller;

import com.southwind.service.PayService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController; @RestController
public class PayController {
@Autowired
private PayService payService; @GetMapping("/save")
public String save(){
this.payService.save();
return "success";
}
}

6、启动类

package com.southwind;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate; @SpringBootApplication
public class OrderApplication { public static void main(String[] args) {
SpringApplication.run(OrderApplication.class, args);
} @Bean
public RestTemplate restTemplate(){
return new RestTemplate();
}
}
package com.southwind;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication
public class PayApplication { public static void main(String[] args) {
SpringApplication.run(PayApplication.class, args);
} }

分布式异常模拟结束,Order 存储完成之后,出现异常,会导致 Pay 无法存储,但是 Order 数据库不会进行回滚。

8.2 Seata 解决

1、下载

2、解压,进入到conf文件夹下修改两个文件

regisry.conf

registry {
type = "nacos"
nacos {
serverAddr = "localhost"
namespace = "public"
cluster = "default"
}
} config {
type = "nacos"
nacos {
serverAddr = "localhost"
namespace = "public"
cluster = "default"
}
}

nacos-config.txt

3、启动 Nacos,通过git bish的命令行窗口来运行 nacos-config.sh,将 Seata 配置导入 Nacos

进入 conf,右键 Git Bash Here

cd conf
sh nacos-config.sh 127.0.0.1

执行成功,刷新 Nacos,配置加入

nacos-config.txt 配置已生效

4、启动 Seata Server, JDK 8 以上环境无法启动

cd bin
seata-server.bat -p 8090 -m file

启动成功,Nacos 注册成功。

Seata 服务环境搭建完毕,接下来去应用中添加。

1、初始化数据库,在两个数据库中添加事务日志记录表,SQL Seata 已经提供。复制conf文件夹下db_undo_log.sql中的建表语句分别在order数据库和pay数据库中新建undo_log表。

2、直接在两个数据库运行脚本。

CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

3、两个工程的 pom.xml 添加 Seata 组件和 Nacos Config 组件。

<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<version>2.1.1.RELEASE</version>
</dependency> <dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>

4、给 JDBCTemplate 添加代理数据源,因为本地数据源无法通知两个不同的数据库使用代理数据源才可以

package com.southwind;

import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.client.RestTemplate; import javax.sql.DataSource; @SpringBootApplication
public class OrderApplication { public static void main(String[] args) {
SpringApplication.run(OrderApplication.class, args);
} @Bean
public RestTemplate restTemplate(){
return new RestTemplate();
} @Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource){
return new JdbcTemplate(new DataSourceProxy(dataSource));
}
}
package com.southwind;

import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.jdbc.core.JdbcTemplate; import javax.sql.DataSource; @SpringBootApplication
public class PayApplication { public static void main(String[] args) {
SpringApplication.run(PayApplication.class, args);
} @Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource){
return new JdbcTemplate(new DataSourceProxy(dataSource));
} }

5、将 registry.conf 复制到两个工程的 resources 下。

6、给两个工程添加 bootstrap.yml 读取 Nacos 配置。

spring:
application:
name: order
cloud:
nacos:
config:
server-addr: localhost:8848
namespace: public
group: SEATA_GROUP
alibaba:
seata:
tx-service-group: ${spring.application.name}
spring:
application:
name: pay
cloud:
nacos:
config:
server-addr: localhost:8848
namespace: public
group: SEATA_GROUP
alibaba:
seata:
tx-service-group: ${spring.application.name}

tx-service-group 需要和 Nacos 配置中的名称一致。

7、在 Order 调用 Pay 处添加注解 @GlobalTransactional

package com.southwind.controller;

import com.southwind.service.OrderService;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate; @RestController
public class OrderController { @Autowired
private OrderService orderService;
@Autowired
private RestTemplate restTemplate; @GetMapping("/save")
@GlobalTransactional
public String save(){
//订单
this.orderService.save();
int i = 10/0;
//支付
this.restTemplate.getForObject("http://localhost:8020/save",String.class);
return "success";
}
}

最新文章

  1. $.extend()的实现源码 --(源码学习1)
  2. AppBox升级进行时 - 拥抱Entity Framework的Code First开发模式
  3. php正则,删除字符串中的中英文标点符号
  4. c# HttpWebRequest 与 HttpWebResponse
  5. Fragment碎片的创建和动态更新
  6. mac上安装opencv3
  7. 深入浅出设计模式——工厂方法模式(Factory Method)
  8. jquery 动态生成html后click事件不触发原因
  9. kindEditor使用注意事项
  10. HTML中解决双击会选中文本的问题
  11. 【笨嘴拙舌WINDOWS】GDI绘制区域
  12. URLEncode转json
  13. asp.net pagebase获取缓存的方法
  14. JAVA中,JSON MAP LIST的相互转换
  15. nginx请求体读取
  16. Ubuntu packages multi-architectures
  17. Ognl值栈对象及struts标签
  18. php二进制流文件
  19. Linux查看和修改文件时间
  20. Facebook的一些基本操作(网页版)

热门文章

  1. 简丽Framework-开篇
  2. Python利用openpyxl带格式统计数据(1)- 处理excel数据
  3. Kafka Eagle 管理平台
  4. Echarts数据可视化,easyshu图表集成。
  5. Queue的使用说明
  6. IDEA关联mysql失败Server returns invalid timezone. Go to &#39;Advanced&#39; tab and set &#39;serverTimezon&#39;
  7. java 多态 向上造型
  8. MySQL不会丢失数据的秘密,就藏在它的 7种日志里
  9. Solon rpc 之 SocketD 协议 - 消息鉴权模式
  10. P4292 [WC2010]重建计划 点分治+单调队列