文章很长,建议收藏起来,慢慢读! 疯狂创客圈为小伙伴奉上以下珍贵的学习资源:


推荐2:史上最全 Java 面试题 21 个专题

史上最全 Java 面试题 21 个专题 阿里、京东、美团、头条.... 随意挑、横着走!!!
1: JVM面试题(史上最强、持续更新、吐血推荐) https://www.cnblogs.com/crazymakercircle/p/14365820.html
2:Java基础面试题(史上最全、持续更新、吐血推荐) https://www.cnblogs.com/crazymakercircle/p/14366081.html
4:设计模式面试题 (史上最全、持续更新、吐血推荐) https://www.cnblogs.com/crazymakercircle/p/14367101.html
5:架构设计面试题 (史上最全、持续更新、吐血推荐) https://www.cnblogs.com/crazymakercircle/p/14367907.html
还有 21篇必刷、必刷 的面试题 更多 ....., 请参见【疯狂创客圈 高并发 总目录

推荐3: 疯狂创客圈 高质量 博文

springCloud 高质量 博文
nacos 实战(史上最全) sentinel (史上最全+入门教程)
springcloud + webflux 高并发实战 Webflux(史上最全)
SpringCloud gateway (史上最全) TCP/IP图解 (史上最全)
10分钟看懂, Java NIO 底层原理 Feign原理 (图解)
更多精彩博文 ..... 请参见【疯狂创客圈 高并发 总目录

前言

webmvc和webflux作为spring framework的两个重要模块,代表了两个IO模型,阻塞式和非阻塞式的。

webmvc是基于servlet的阻塞式模型(一般称为oio),一个请求到达服务器后会单独分配一个线程去处理请求,如果请求包含IO操作,线程在IO操作结束之前一直处于阻塞等待状态,这样线程在等待IO操作结束的时间就浪费了。

webflux是基于reactor的非阻塞模型(一般称为nio),同样,请求到达服务器后也会分配一个线程去处理请求,如果请求包含IO操作,线程在IO操作结束之前不再是处于阻塞等待状态,而是去处理其他事情,等到IO操作结束之后,再通知(得益于系统的机制)线程继续处理请求。

这样线程就有效地利用了IO操作所消耗的时间。

WebFlux 增删改查完整实战 demo

Dao层 (又称 repository 层)

entity(又称 PO对象)

新建User 对象 ,代码如下:



package com.crazymaker.springcloud.reactive.user.info.entity;

import com.crazymaker.springcloud.reactive.user.info.dto.User;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table; @Entity
@Table(name = "t_user")
public final class UserEntity extends User
{ @Id
@Column(name = "id")
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Override
public long getUserId()
{
return super.getUserId();
} @Column(name = "name")
public String getName()
{
return super.getName();
}
}

Dao 实现类

@Repository 用于标注数据访问组件,即 DAO 组件。实现代码中使用名为 repository 的 Map 对象作为内存数据存储,并对对象具体实现了具体业务逻辑。JpaUserRepositoryImpl 负责将 PO 持久层(数据操作)相关的封装组织,完成新增、查询、删除等操作。



package com.crazymaker.springcloud.reactive.user.info.dao.impl;

import com.crazymaker.springcloud.reactive.user.info.dto.User;
import org.springframework.stereotype.Repository; import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.Query;
import javax.transaction.Transactional;
import java.util.List; @Repository
@Transactional
public class JpaUserRepositoryImpl
{ @PersistenceContext
private EntityManager entityManager; public Long insert(final User user)
{
entityManager.persist(user);
return user.getUserId();
} public void delete(final Long userId)
{
Query query = entityManager.createQuery("DELETE FROM UserEntity o WHERE o.userId = ?1");
query.setParameter(1, userId);
query.executeUpdate();
} @SuppressWarnings("unchecked")
public List<User> selectAll()
{
return (List<User>) entityManager.createQuery("SELECT o FROM UserEntity o").getResultList();
} @SuppressWarnings("unchecked")
public User selectOne(final Long userId)
{
Query query = entityManager.createQuery("SELECT o FROM UserEntity o WHERE o.userId = ?1");
query.setParameter(1, userId);
return (User) query.getSingleResult();
}
}

Service服务层


package com.crazymaker.springcloud.reactive.user.info.service.impl; import com.crazymaker.springcloud.common.util.BeanUtil;
import com.crazymaker.springcloud.reactive.user.info.dao.impl.JpaUserRepositoryImpl;
import com.crazymaker.springcloud.reactive.user.info.dto.User;
import com.crazymaker.springcloud.reactive.user.info.entity.UserEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource;
import java.util.List; @Slf4j
@Service
@Transactional
public class JpaEntityServiceImpl
{ @Resource
private JpaUserRepositoryImpl userRepository; @Transactional
//增加用户
public User addUser(User dto)
{
User userEntity = new UserEntity();
userEntity.setUserId(dto.getUserId());
userEntity.setName(dto.getName());
userRepository.insert(userEntity);
BeanUtil.copyProperties(userEntity,dto);
return dto;
} @Transactional
//删除用户
public User delUser(User dto)
{
userRepository.delete(dto.getUserId());
return dto;
} //查询全部用户
public List<User> selectAllUser()
{
log.info("方法 selectAllUser 被调用了"); return userRepository.selectAll();
} //查询一个用户
public User selectOne(final Long userId)
{ log.info("方法 selectOne 被调用了"); return userRepository.selectOne(userId);
} }

Controller控制层

Spring Boot WebFlux也可以使用注解模式来进行API接口开发。

package com.crazymaker.springcloud.reactive.user.info.controller;

import com.crazymaker.springcloud.common.result.RestOut;
import com.crazymaker.springcloud.reactive.user.info.dto.User;
import com.crazymaker.springcloud.reactive.user.info.service.impl.JpaEntityServiceImpl;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import javax.annotation.Resource; /**
* Mono 和 Flux 适用于两个场景,即:
* Mono:实现发布者,并返回 0 或 1 个元素,即单对象。
* Flux:实现发布者,并返回 N 个元素,即 List 列表对象。
* 有人会问,这为啥不直接返回对象,比如返回 City/Long/List。
* 原因是,直接使用 Flux 和 Mono 是非阻塞写法,相当于回调方式。
* 利用函数式可以减少了回调,因此会看不到相关接口。这恰恰是 WebFlux 的好处:集合了非阻塞 + 异步
*/
@Slf4j
@Api(value = "用户信息、基础学习DEMO", tags = {"用户信息DEMO"})
@RestController
@RequestMapping("/api/user")
public class UserReactiveController
{ @ApiOperation(value = "回显测试", notes = "提示接口使用者注意事项", httpMethod = "GET")
@RequestMapping(value = "/hello")
@ApiImplicitParams({
@ApiImplicitParam(paramType = "query", dataType="string",dataTypeClass = String.class, name = "name",value = "名称", required = true)})
public Mono<RestOut<String>> hello(@RequestParam(name = "name") String name)
{
log.info("方法 hello 被调用了"); return Mono.just(RestOut.succeed("hello " + name));
} @Resource
JpaEntityServiceImpl jpaEntityService; @PostMapping("/add/v1")
@ApiOperation(value = "插入用户" )
@ApiImplicitParams({
// @ApiImplicitParam(paramType = "body", dataType="java.lang.Long", name = "userId", required = false),
// @ApiImplicitParam(paramType = "body", dataType="用户", name = "dto", required = true)
@ApiImplicitParam(paramType = "body",dataTypeClass = User.class, dataType="User", name = "dto", required = true),
})
// @ApiImplicitParam(paramType = "body", dataType="com.crazymaker.springcloud.reactive.user.info.dto.User", required = true)
public Mono<User> userAdd(@RequestBody User dto)
{
//命令式写法
// jpaEntityService.delUser(dto); //响应式写法
return Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto)));
} @PostMapping("/del/v1")
@ApiOperation(value = "响应式的删除")
@ApiImplicitParams({
@ApiImplicitParam(paramType = "body", dataType="User",dataTypeClass = User.class,name = "dto", required = true),
})
public Mono<User> userDel(@RequestBody User dto)
{
//命令式写法 // jpaEntityService.delUser(dto); //响应式写法 return Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.delUser(dto)));
} @PostMapping("/list/v1")
@ApiOperation(value = "查询用户")
public Flux<User> listAllUser()
{
log.info("方法 listAllUser 被调用了"); //命令式写法 改为响应式 以下语句,需要在流中执行
// List<User> list = jpaEntityService.selectAllUser();
//响应式写法
Flux<User> userFlux = Flux.fromIterable(jpaEntityService.selectAllUser());
return userFlux;
} @PostMapping("/detail/v1")
@ApiOperation(value = "响应式的查看")
@ApiImplicitParams({
@ApiImplicitParam(paramType = "body", dataTypeClass = User.class,dataType="User", name = "dto", required = true),
})
public Mono<User> getUser(@RequestBody User dto)
{
log.info("方法 getUser 被调用了"); //构造流
Mono<User> userMono = Mono.justOrEmpty(jpaEntityService.selectOne(dto.getUserId()));
return userMono;
} @PostMapping("/detail/v2")
@ApiOperation(value = "命令式的查看")
@ApiImplicitParams({
@ApiImplicitParam(paramType = "body", dataType="User",dataTypeClass = User.class, name = "dto", required = true),
}) public RestOut<User> getUserV2(@RequestBody User dto)
{
log.info("方法 getUserV2 被调用了"); User user = jpaEntityService.selectOne(dto.getUserId());
return RestOut.success(user);
} }

从返回值可以看出,Mono 和 Flux 适用于两个场景,即:

  • Mono:实现发布者,并返回 0 或 1 个元素,即单对象
  • Flux:实现发布者,并返回 N 个元素,即 List 列表对象

有人会问,这为啥不直接返回对象,比如返回 City/Long/List。原因是,直接使用 Flux 和 Mono 是非阻塞写法,相当于回调方式。利用函数式可以减少了回调,因此会看不到相关接口。这恰恰是 WebFlux 的好处:集合了非阻塞 + 异步。

Mono

Mono 是什么? 官方描述如下:A Reactive Streams Publisher with basic rx operators that completes successfully by emitting an element, or with an error.

Mono 是响应流 Publisher 具有基础 rx 操作符。可以成功发布元素或者错误。如图所示:

file

Mono 常用的方法有:

  • Mono.create():使用 MonoSink 来创建 Mono
  • Mono.justOrEmpty():从一个 Optional 对象或 null 对象中创建 Mono。
  • Mono.error():创建一个只包含错误消息的 Mono
  • Mono.never():创建一个不包含任何消息通知的 Mono
  • Mono.delay():在指定的延迟时间之后,创建一个 Mono,产生数字 0 作为唯一值

Flux

Flux 是什么? 官方描述如下:A Reactive Streams Publisher with rx operators that emits 0 to N elements, and then completes (successfully or with an error).

Flux 是响应流 Publisher 具有基础 rx 操作符。可以成功发布 0 到 N 个元素或者错误。Flux 其实是 Mono 的一个补充。如图所示:

file

所以要注意:如果知道 Publisher 是 0 或 1 个,则用 Mono。

Flux 最值得一提的是 fromIterable 方法。 fromIterable(Iterable<? extends T> it) 可以发布 Iterable 类型的元素。当然,Flux 也包含了基础的操作:map、merge、concat、flatMap、take,这里就不展开介绍了。

使用配置模式进行WebFlux 接口开发

1 可以编写一个处理器类 Handler代替 Controller , Service 、dao层保持不变。

2 配置请求的路由

处理器类 Handler

处理器类 Handler需要从请求解析参数,并且封装响应,代码如下:

package com.crazymaker.springcloud.reactive.user.info.config.handler;

import com.crazymaker.springcloud.common.exception.BusinessException;
import com.crazymaker.springcloud.reactive.user.info.dto.User;
import com.crazymaker.springcloud.reactive.user.info.service.impl.JpaEntityServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import javax.annotation.Resource; import static org.springframework.http.MediaType.APPLICATION_JSON_UTF8;
import static org.springframework.web.reactive.function.server.ServerResponse.ok; @Slf4j
@Component
public class UserReactiveHandler
{ @Resource
private JpaEntityServiceImpl jpaEntityService; /**
* 得到所有用户
*
* @param request
* @return
*/
public Mono<ServerResponse> getAllUser(ServerRequest request)
{
log.info("方法 getAllUser 被调用了");
return ok().contentType(APPLICATION_JSON_UTF8)
.body(Flux.fromIterable(jpaEntityService.selectAllUser()), User.class);
} /**
* 创建用户
*
* @param request
* @return
*/
public Mono<ServerResponse> createUser(ServerRequest request)
{
// 2.0.0 是可以工作, 但是2.0.1 下面这个模式是会报异常
Mono<User> user = request.bodyToMono(User.class);
/**Mono 使用响应式的,时候都是一个流,是一个发布者,任何时候都不能调用发布者的订阅方法
也就是不能消费它, 最终的消费还是交给我们的Springboot来对它进行消费,任何时候不能调用它的
user.subscribe();
不能调用block
把异常放在统一的地方来处理
*/ return user.flatMap(dto ->
{
// 校验代码需要放在这里
if (StringUtils.isBlank(dto.getName()))
{
throw new BusinessException("用户名不能为空");
} return ok().contentType(APPLICATION_JSON_UTF8)
.body(Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto))), User.class);
});
} /**
* 根据id删除用户
*
* @param request
* @return
*/
public Mono<ServerResponse> deleteUserById(ServerRequest request)
{
String id = request.pathVariable("id");
// 校验代码需要放在这里
if (StringUtils.isBlank(id))
{
throw new BusinessException("id不能为空");
}
User dto = new User();
dto.setUserId(Long.parseLong(id));
return ok().contentType(APPLICATION_JSON_UTF8)
.body(Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.delUser(dto))), User.class);
} }

路由配置

package com.crazymaker.springcloud.reactive.user.info.config;

import com.crazymaker.springcloud.reactive.user.info.config.handler.UserReactiveHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.server.WebFilter; import static org.springframework.web.reactive.function.server.RequestPredicates.DELETE;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept; @Configuration
public class RoutersConfig
{ @Bean
RouterFunction<ServerResponse> routes(UserReactiveHandler handler)
{ // 下面的相当于类里面的 @RequestMapping
// 得到所有用户
return RouterFunctions.route(GET("/user"), handler::getAllUser)
// 创建用户
.andRoute(POST("/user").and(accept(MediaType.APPLICATION_JSON_UTF8)), handler::createUser)
// 删除用户
.andRoute(DELETE("/user/{id}"), handler::deleteUserById);
} @Value("${server.servlet.context-path}")
private String contextPath; //处理上下文路径,没有上下文路径,此函数可以忽略
@Bean
public WebFilter contextPathWebFilter()
{
return (exchange, chain) ->
{
ServerHttpRequest request = exchange.getRequest(); String requestPath = request.getURI().getPath();
if (requestPath.startsWith(contextPath))
{
return chain.filter(
exchange.mutate()
.request(request.mutate().contextPath(contextPath).build())
.build());
}
return chain.filter(exchange);
};
}
}

集成Swagger

本文主要展示一下如何使用支持WebFlux的Swagger

maven依赖

        <dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>${swagger.version}</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-spring-webflux</artifactId>
<version>${swagger.version}</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>${swagger.version}</version>
</dependency>
  • swagger.version目前是3.0.0,Spring 5引入了WebFlux,而当前版本的SpringFox Swagger2(2.9.2)还不支持WebFlux,得使用3.0.0才支持

swagger 配置

package com.crazymaker.springcloud.reactive.user.info.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.web.util.UriComponentsBuilder;
import springfox.documentation.PathProvider;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.paths.DefaultPathProvider;
import springfox.documentation.spring.web.paths.Paths;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2WebFlux; @Configuration
@EnableSwagger2WebFlux
public class SwaggerConfig
{ @Bean
public Docket createRestApi()
{
// return new Docket(DocumentationType.OAS_30)
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
.pathMapping(servletContextPath) //注意webflux没有context-path配置,如果不加这句话的话,接口测试时路径没有前缀 .select()
.apis(RequestHandlerSelectors.basePackage("com.crazymaker.springcloud.reactive.user.info.controller"))
.paths(PathSelectors.any())
.build(); }
@Value("${server.servlet.context-path}")
private String servletContextPath; //构建 api文档的详细信息函数
private ApiInfo apiInfo()
{
return new ApiInfoBuilder()
//页面标题
.title("疯狂创客圈 springcloud + Nginx 高并发核心编程")
//描述
.description("Zuul+Swagger2 构建 RESTful APIs")
//条款地址
.termsOfServiceUrl("https://www.cnblogs.com/crazymakercircle/")
.contact(new Contact("疯狂创客圈", "https://www.cnblogs.com/crazymakercircle/", ""))
.version("1.0")
.build();
} /**
* 重写 PathProvider ,解决 context-path 重复问题
* @return
*/
@Order(Ordered.HIGHEST_PRECEDENCE)
@Bean
public PathProvider pathProvider() {
return new DefaultPathProvider() {
@Override
public String getOperationPath(String operationPath) {
operationPath = operationPath.replaceFirst(servletContextPath, "/");
UriComponentsBuilder uriComponentsBuilder = UriComponentsBuilder.fromPath("/");
return Paths.removeAdjacentForwardSlashes(uriComponentsBuilder.path(operationPath).build().toString());
} @Override
public String getResourceListingPath(String groupName, String apiDeclaration) {
apiDeclaration = super.getResourceListingPath(groupName, apiDeclaration);
return apiDeclaration;
}
};
}
}

测试

配置模式的 WebFlux Rest接口测试

配置模式的 WebFlux Rest接口只能使用PostMan测试,例子如下:

注意,不能带上下文路径:

http://192.168.68.1:7705/uaa-react-provider/user

注解模式的WebFlux Rest接口测试

swagger 增加界面

CRUD其他的界面,略过

配置大全

静态资源配置

@Configuration
@EnableWebFlux //使用注解@EnableWebFlux
public class WebFluxConfig implements WebFluxConfigurer { //继承WebFluxConfigurer
//配置静态资源
@Override
public void addResourceHandlers(ResourceHandlerRegistry registry) {
registry.addResourceHandler("/static/**")
.addResourceLocations("classpath:/static/");
registry.addResourceHandler("/file/**")
.addResourceLocations("file:" + System.getProperty("user.dir") + File.separator + "file" + File.separator);
registry.addResourceHandler("/swagger-ui.html**")
.addResourceLocations("classpath:/META-INF/resources/");
registry.addResourceHandler("/webjars/**")
.addResourceLocations("classpath:/META-INF/resources/webjars/");
}
//配置拦截器
//配置编解码
...
}

WebFluxSecurity配置

@Configuration
@EnableWebSecurity
public class WebMvcSecurityConfig extends WebSecurityConfigurerAdapter implements
AuthenticationEntryPoint, //未验证回调
AuthenticationSuccessHandler, //验证成功回调
AuthenticationFailureHandler, //验证失败回调
LogoutSuccessHandler { //登出成功回调 @Override
public void commence(HttpServletRequest request, HttpServletResponse response, AuthenticationException authException) throws IOException, ServletException {
sendJson(response, new Response<>(HttpStatus.UNAUTHORIZED.value(), "Unauthorized"));
} @Override
public void onAuthenticationFailure(HttpServletRequest request, HttpServletResponse response, AuthenticationException exception) throws IOException, ServletException {
sendJson(response, new Response<>(1, "Incorrect"));
} @Override
public void onAuthenticationSuccess(HttpServletRequest request, HttpServletResponse response, Authentication authentication) throws IOException, ServletException {
sendJson(response, new Response<>(0, authentication.getClass().getSimpleName()));
} @Override
public void onLogoutSuccess(HttpServletRequest request, HttpServletResponse response, Authentication authentication) throws IOException, ServletException {
sendJson(response, new Response<>(0, "Success"));
} @Override
protected void configure(HttpSecurity http) throws Exception {
http
.csrf()
.disable()
.authorizeRequests()
.antMatchers("/swagger*/**", "/webjars/**", "/v2/api-docs")
.permitAll()
.and()
.authorizeRequests()
.antMatchers("/static/**", "/file/**")
.permitAll()
.and()
.authorizeRequests()
.anyRequest()
.authenticated()
.and()
.logout()
.logoutUrl("/user/logout") //虚拟路径,不是控制器定义的路径
.logoutSuccessHandler(this)
.permitAll()
.and()
.exceptionHandling()
.authenticationEntryPoint(this)
.and()
.formLogin()
.usernameParameter("username")
.passwordParameter("password")
.loginProcessingUrl("/user/login") //虚拟路径,不是控制器定义的路径
.successForwardUrl("/user/login") //是控制器定义的路径
.failureHandler(this)
.and()
.httpBasic()
.authenticationEntryPoint(this);
} @Override
protected void configure(AuthenticationManagerBuilder auth) throws Exception {
auth.userDetailsService(userDetailService);
}

webflux-验证依赖于用户数据服务,需定义实现ReactiveUserDetailsService的Bean

@Configuration
@EnableWebFluxSecurity //使用注解@EnableWebFluxSecurity
public class WebFluxSecurityConfig implements
WebFilter, //拦截器
ServerLogoutSuccessHandler, //登出成功回调
ServerAuthenticationEntryPoint, //验证入口
ServerAuthenticationFailureHandler, //验证成功回调
ServerAuthenticationSuccessHandler { //验证失败回调
//实现接口的方法
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
//配置webflux的context-path
ServerHttpRequest request = exchange.getRequest();
if (request.getURI().getPath().startsWith(contextPath)) {
exchange = exchange.mutate().request(request.mutate().contextPath(contextPath).build()).build();
}
//把查询参数转移到FormData中,不然验证过滤器(ServerFormLoginAuthenticationConverter)接受不到参数
if (exchange.getRequest().getMethod() == HttpMethod.POST && exchange.getRequest().getQueryParams().size() > 0) {
ServerWebExchange finalExchange = exchange;
ServerWebExchange realExchange = new Decorator(exchange) {
@Override
public Mono<MultiValueMap<String, String>> getFormData() {
return super.getFormData().map(new Function<MultiValueMap<String, String>, MultiValueMap<String, String>>() {
@Override
public MultiValueMap<String, String> apply(MultiValueMap<String, String> stringStringMultiValueMap) {
if (stringStringMultiValueMap.size() == 0) {
return finalExchange.getRequest().getQueryParams();
} else {
return stringStringMultiValueMap;
}
}
});
}
};
return chain.filter(realExchange);
}
return chain.filter(exchange);
} @Override
public Mono<Void> onLogoutSuccess(WebFilterExchange webFilterExchange, Authentication authentication) {
return sendJson(webFilterExchange.getExchange(), new Response<>("登出成功"));
} @Override
public Mono<Void> commence(ServerWebExchange exchange, AuthenticationException e) {
return sendJson(exchange, new Response<>(HttpStatus.UNAUTHORIZED.value(), "未验证"));
} @Override
public Mono<Void> onAuthenticationFailure(WebFilterExchange webFilterExchange, AuthenticationException exception) {
return sendJson(webFilterExchange.getExchange(), new Response<>(1, "验证失败"));
} @Override
public Mono<Void> onAuthenticationSuccess(WebFilterExchange webFilterExchange, Authentication authentication) {
return webFilterExchange.getChain().filter(
webFilterExchange.getExchange().mutate()
.request(t -> t.method(HttpMethod.POST).path("/user/login")) //转发到自定义控制器
.build()
);
} @Bean
public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {
http.addFilterAfter(this, SecurityWebFiltersOrder.FIRST)
.csrf().disable()
.authorizeExchange()
.pathMatchers("/swagger*/**", "/webjars/**", "/v2/api-docs") //swagger
.permitAll()
.and()
.authorizeExchange()
.pathMatchers("/static/**", "/file/**") //静态资源
.permitAll()
.and()
.authorizeExchange()
.anyExchange()
.authenticated()
.and()
.logout() //登出
.logoutUrl("/user/logout")
.logoutSuccessHandler(this)
.and()
.exceptionHandling() //未验证回调
.authenticationEntryPoint(this)
.and()
.formLogin()
.loginPage("/user/login")
.authenticationFailureHandler(this) //验证失败回调
.authenticationSuccessHandler(this) //验证成功回调
.and()
.httpBasic()
.authenticationEntryPoint(this); //basic验证,一般用于移动端
return http.build();
}
}

WebSession配置

@Configuration
@EnableRedisWebSession(maxInactiveIntervalInSeconds = 60) //使用注解@EnableRedisWebSession ,maxInactiveIntervalInSeconds设置数据过期时间,spring.session.timeout不管用
public class RedisWebSessionConfig { //考虑到分布式系统,一般使用redis存储session @Bean
public LettuceConnectionFactory lettuceConnectionFactory() {
return new LettuceConnectionFactory();
} }
//单点登录使用ReactiveRedisSessionRepository.getSessionRedisOperations().scan方法查询相同用户名的session,删除其他session即可
public Mono<Map<String, String>> findByPrincipalName(String name) {
return reactiveSessionRepository.getSessionRedisOperations().scan(ScanOptions.scanOptions().match(ReactiveRedisSessionRepository.DEFAULT_NAMESPACE + ":sessions:*").build())
.flatMap(new Function<String, Publisher<Tuple2<String, Map.Entry<Object, Object>>>>() {
@Override
public Publisher<Tuple2<String, Map.Entry<Object, Object>>> apply(String s) {
return reactiveSessionRepository.getSessionRedisOperations().opsForHash().entries(s)
.map(new Function<Map.Entry<Object, Object>, Tuple2<String, Map.Entry<Object, Object>>>() {
@Override
public Tuple2<String, Map.Entry<Object, Object>> apply(Map.Entry<Object, Object> objectObjectEntry) {
return Tuples.of(s, objectObjectEntry);
}
});
}
})
.filter(new Predicate<Tuple2<String, Map.Entry<Object, Object>>>() {
@Override
public boolean test(Tuple2<String, Map.Entry<Object, Object>> rule) {
Map.Entry<Object, Object> t = rule.getT2();
String key = "sessionAttr:" + HttpSessionSecurityContextRepository.SPRING_SECURITY_CONTEXT_KEY;
if (key.equals(t.getKey())) {
User sci = (User) ((SecurityContextImpl) t.getValue()).getAuthentication().getPrincipal();
return sci.getUsername().equals(name);
}
return false;
}
})
.collectMap(new Function<Tuple2<String, Map.Entry<Object, Object>>, String>() {
@Override
public String apply(Tuple2<String, Map.Entry<Object, Object>> rule) {
return name;
}
}, new Function<Tuple2<String, Map.Entry<Object, Object>>, String>() {
@Override
public String apply(Tuple2<String, Map.Entry<Object, Object>> rule) {
return rule.getT1().replace(ReactiveRedisSessionRepository.DEFAULT_NAMESPACE + ":sessions:", "");
}
});
}

对标的 SpringWebMVC配置

@Configuration
@EnableRedisHttpSession //使用注解@EnableRedisHttpSession
public class RedisHttpSessionConfig { //考虑到分布式系统,一般使用redis存储session @Bean
public LettuceConnectionFactory redisConnectionFactory() {
return new LettuceConnectionFactory();
} }
//单点登录使用FindByIndexNameSessionRepository根据用户名查询session,删除其他session即可
Map<String, Session> map = findByIndexNameSessionRepository.findByPrincipalName(name);

文件上传配置

//参数上传
//定义参数bean
@Setter
@Getter
@ToString
@ApiModel
public class QueryBean{
@ApiModelProperty(value = "普通参数", required = false, example = "")
private String query;
@ApiModelProperty(value = "文件参数", required = false, example = "")
private FilePart image; //强调,webflux中使用FilePart作为接收文件的类型
}
//定义接口
@ApiOperation("一个接口")
@PostMapping("/path")
//这里需要使用@ApiImplicitParam显示配置【文件参数】才能使swagger界面显示上传文件按钮
@ApiImplicitParams({
@ApiImplicitParam(
paramType = "form", //表单参数
dataType = "__file", //最新版本使用__file表示文件,以前用的是file
name = "image", //和QueryBean里面的【文件参数image】同名
value = "文件") //注释
})
public Mono<Response> bannerAddOrUpdate(QueryBean q) { }

WebFlux 执行流程

userAdd方法代码如下:

        public Mono<User> userAdd(@RequestBody User dto)
{
//命令式写法
// jpaEntityService.delUser(dto); //响应式写法
return Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto)));
}

由于返回的数据只有一个所以使用的是Mono作为返回数据,使用Mono类静态create方法创建Mono对象,代码如下:

public abstract class Mono<T> implements Publisher<T> {
static final BiPredicate EQUALS_BIPREDICATE = Object::equals; public Mono() {
} public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) {
return onAssembly(new MonoCreate(callback));
}
...
}

​ 可以到create方法接收一个参数,参数是Consumer对象,通过callback可以看出,这里使用的是callback回调,下面看看Consumer接口的定义:


@FunctionalInterface
public interface Consumer<T> { /**
* Performs this operation on the given argument.
*
* @param t the input argument
*/
void accept(T t); /**
* Returns a composed {@code Consumer} that performs, in sequence, this
* operation followed by the {@code after} operation. If performing either
* operation throws an exception, it is relayed to the caller of the
* composed operation. If performing this operation throws an exception,
* the {@code after} operation will not be performed.
*
* @param after the operation to perform after this operation
* @return a composed {@code Consumer} that performs in sequence this
* operation followed by the {@code after} operation
* @throws NullPointerException if {@code after} is null
*/
default Consumer<T> andThen(Consumer<? super T> after) {
Objects.requireNonNull(after);
return (T t) -> { accept(t); after.accept(t); };
}
}

通过上面的代码可以看出,有两个方法,一个是默认的方法andThen,还有一个accept方法,

Mono.create()方法的参数需要一个实现类,实现Consumer接口;Mono.create方法的参数指向的实例对象, 就是要实现这个accept方法。

例子中,下面的lambda表达式,就是accept方法的实现,实参的类型为 Consumer<MonoSink> , accept的实现为 如下:

cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto))

来来来,重复看一下,create方法的实现:

   public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) {
return onAssembly(new MonoCreate(callback));
}

​ 在方法内部调用了onAssembly方法,参数是MonoCreate对象,然后我们看看MonoCreate类,代码如下:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
// package reactor.core.publisher; import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Scannable.Attr;
import reactor.core.publisher.FluxCreate.SinkDisposable;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context; final class MonoCreate<T> extends Mono<T> {
final Consumer<MonoSink<T>> callback; MonoCreate(Consumer<MonoSink<T>> callback) {
this.callback = callback;
} public void subscribe(CoreSubscriber<? super T> actual) {
MonoCreate.DefaultMonoSink<T> emitter = new MonoCreate.DefaultMonoSink(actual);
actual.onSubscribe(emitter); try {
this.callback.accept(emitter);
} catch (Throwable var4) {
emitter.error(Operators.onOperatorError(var4, actual.currentContext()));
} } static final class DefaultMonoSink<T> extends AtomicBoolean implements MonoSink<T>, InnerProducer<T> {
final CoreSubscriber<? super T> actual;
volatile Disposable disposable;
static final AtomicReferenceFieldUpdater<MonoCreate.DefaultMonoSink, Disposable> DISPOSABLE = AtomicReferenceFieldUpdater.newUpdater(MonoCreate.DefaultMonoSink.class, Disposable.class, "disposable");
volatile int state;
static final AtomicIntegerFieldUpdater<MonoCreate.DefaultMonoSink> STATE = AtomicIntegerFieldUpdater.newUpdater(MonoCreate.DefaultMonoSink.class, "state");
volatile LongConsumer requestConsumer;
static final AtomicReferenceFieldUpdater<MonoCreate.DefaultMonoSink, LongConsumer> REQUEST_CONSUMER = AtomicReferenceFieldUpdater.newUpdater(MonoCreate.DefaultMonoSink.class, LongConsumer.class, "requestConsumer");
T value;
static final int NO_REQUEST_HAS_VALUE = 1;
static final int HAS_REQUEST_NO_VALUE = 2;
static final int HAS_REQUEST_HAS_VALUE = 3; DefaultMonoSink(CoreSubscriber<? super T> actual) {
this.actual = actual;
} public Context currentContext() {
return this.actual.currentContext();
} @Nullable
public Object scanUnsafe(Attr key) {
if (key != Attr.TERMINATED) {
return key == Attr.CANCELLED ? OperatorDisposables.isDisposed(this.disposable) : super.scanUnsafe(key);
} else {
return this.state == 3 || this.state == 1;
}
} public void success() {
if (STATE.getAndSet(this, 3) != 3) {
try {
this.actual.onComplete();
} finally {
this.disposeResource(false);
}
} } public void success(@Nullable T value) {
if (value == null) {
this.success();
} else {
int s;
do {
s = this.state;
if (s == 3 || s == 1) {
Operators.onNextDropped(value, this.actual.currentContext());
return;
} if (s == 2) {
if (STATE.compareAndSet(this, s, 3)) {
try {
this.actual.onNext(value);
this.actual.onComplete();
} finally {
this.disposeResource(false);
}
} return;
} this.value = value;
} while(!STATE.compareAndSet(this, s, 1)); }
} public void error(Throwable e) {
if (STATE.getAndSet(this, 3) != 3) {
try {
this.actual.onError(e);
} finally {
this.disposeResource(false);
}
} else {
Operators.onOperatorError(e, this.actual.currentContext());
} } public MonoSink<T> onRequest(LongConsumer consumer) {
Objects.requireNonNull(consumer, "onRequest");
if (!REQUEST_CONSUMER.compareAndSet(this, (Object)null, consumer)) {
throw new IllegalStateException("A consumer has already been assigned to consume requests");
} else {
return this;
}
} public CoreSubscriber<? super T> actual() {
return this.actual;
} public MonoSink<T> onCancel(Disposable d) {
Objects.requireNonNull(d, "onCancel");
SinkDisposable sd = new SinkDisposable((Disposable)null, d);
if (!DISPOSABLE.compareAndSet(this, (Object)null, sd)) {
Disposable c = this.disposable;
if (c instanceof SinkDisposable) {
SinkDisposable current = (SinkDisposable)c;
if (current.onCancel == null) {
current.onCancel = d;
} else {
d.dispose();
}
}
} return this;
} public MonoSink<T> onDispose(Disposable d) {
Objects.requireNonNull(d, "onDispose");
SinkDisposable sd = new SinkDisposable(d, (Disposable)null);
if (!DISPOSABLE.compareAndSet(this, (Object)null, sd)) {
Disposable c = this.disposable;
if (c instanceof SinkDisposable) {
SinkDisposable current = (SinkDisposable)c;
if (current.disposable == null) {
current.disposable = d;
} else {
d.dispose();
}
}
} return this;
} public void request(long n) {
if (Operators.validate(n)) {
LongConsumer consumer = this.requestConsumer;
if (consumer != null) {
consumer.accept(n);
} int s;
do {
s = this.state;
if (s == 2 || s == 3) {
return;
} if (s == 1) {
if (STATE.compareAndSet(this, s, 3)) {
try {
this.actual.onNext(this.value);
this.actual.onComplete();
} finally {
this.disposeResource(false);
}
} return;
}
} while(!STATE.compareAndSet(this, s, 2)); }
} public void cancel() {
if (STATE.getAndSet(this, 3) != 3) {
this.value = null;
this.disposeResource(true);
} } void disposeResource(boolean isCancel) {
Disposable d = this.disposable;
if (d != OperatorDisposables.DISPOSED) {
d = (Disposable)DISPOSABLE.getAndSet(this, OperatorDisposables.DISPOSED);
if (d != null && d != OperatorDisposables.DISPOSED) {
if (isCancel && d instanceof SinkDisposable) {
((SinkDisposable)d).cancel();
} d.dispose();
}
} }
}
}

上面的代码比较多,我们主要关注下面两个函数:

MonoCreate(Consumer<MonoSink<T>> callback) {
this.callback = callback;
} public void subscribe(CoreSubscriber<? super T> actual) {
MonoCreate.DefaultMonoSink<T> emitter = new MonoCreate.DefaultMonoSink(actual);
actual.onSubscribe(emitter); try {
this.callback.accept(emitter);
} catch (Throwable var4) {
emitter.error(Operators.onOperatorError(var4, actual.currentContext()));
} }

通过上面的代码可以看出,一个是构造器,参数是Consumer,里面进行操作保存了Consumer对象,然后在subscribe方法里面有一句代码是this.callback.accept(emitter),就是在这里进行了接口的回调,回调Consumer的accept方法,这个方法是在调用Mono.create()方法的时候实现了。然后在细看subscribe方法,这里面有一个actual.onSubscribe方法,通过方法名可以知道,这里是订阅了消息。webflux是基于reactor模型,基于事件消息和异步,这里也体现了一个异步。

Mono和Flux的其他用法可以参照上面的源码流程自己看看,就不细说了。

最新文章

  1. 深入理解javascript选择器API系列第一篇——4种元素选择器
  2. 介绍DSA数字签名,非对称加密的另一种实现
  3. Action名称的搜索顺序
  4. 【Swift学习】Swift编程之旅(四)基本运算符
  5. CA 证书
  6. 如何自己编写一个easyui插件续
  7. emacs工程管理,cedet ede插件自动构建Make,Automake
  8. AndroidRichText 让Textview轻松的支持富文本(图像ImageSpan、点击效果等等类似QQ微信聊天)
  9. MemCached Cache Java Client封装优化历程
  10. xml转array
  11. vim 插件
  12. zookeeper[2] zookeeper原理(转)
  13. Django学习(五) 定义视图以及页面模板
  14. 设置cell背景色半透明
  15. 关于QuartusII对ram块的综合
  16. DLNA_百度百科
  17. Mac Git 学习笔记
  18. Android Studio的使用(二)--Debug调试
  19. vue.js中ajax请求
  20. hdu1166树状数组

热门文章

  1. C#类中方法的执行顺序
  2. MakeCode图形编程应用在micro:bit上的多工性能实测
  3. [刷题] 167 Two Sum II
  4. Debian 9.4 多网卡链路聚合bond配置
  5. XRDP freerdp
  6. 利用stat指令查看文件创建时间
  7. k8s 管理存储资源(10)
  8. Linux useradd 命令介绍
  9. python3 摆放家具练习
  10. 程序&quot;三高&quot;解决方案