本期我们主要还是讲解一下Gateway,上一期我们讲解了一下Gateway中进行路由转发的关键角色,过滤器和断言是如何被加载的,上期链接:

  https://www.cnblogs.com/guoxiaoyu/p/14735706.html

  好了我们废话不多说,开始今天的Gateway请求转发流程讲解,为了在讲解源码的时候,以防止大家可能会迷糊,博主专门画了一下源码流程图,链接地址:

  https://www.processon.com/view/link/60c88f64e401fd4a04b7db24

  上一期我们已经知道了相关类的加载,今天直接从源码开始,大家可能不太了解webflux和reactor这种响应式编程,毕竟不是主流,我们一直用的都是spring MVC,没事,我们主要讲解流程,不做过多的讲解。

  大家先看下面的代码,我们今天主要的代码入口就是这里:

 1 public Mono<Void> handle(ServerWebExchange exchange) {
2 if (logger.isDebugEnabled()) {
3 ServerHttpRequest request = exchange.getRequest();
4 logger.debug("Processing " + request.getMethodValue() + " request for [" + request.getURI() + "]");
5 }
6 if (this.handlerMappings == null) {
7 return Mono.error(HANDLER_NOT_FOUND_EXCEPTION);
8 }
9 return Flux.fromIterable(this.handlerMappings)
10 .concatMap(mapping -> mapping.getHandler(exchange))
11 .next()
12 .switchIfEmpty(Mono.error(HANDLER_NOT_FOUND_EXCEPTION))
13 .flatMap(handler -> invokeHandler(exchange, handler))
14 .flatMap(result -> handleResult(exchange, result));
15 }

  第一步,我们先来看一看几个主要的类及其方法,Flux 表示的是包含 0 到 N 个元素的异步序列,Mono 表示的是包含 0 或者 1 个元素的异步序列,记住Flux 是多个元素集合,Mono 是单个元素集合就很好理解以后的源码了,以下方法注释是博主为了大家好理解而写的,具体实际的意义还是需要大家自行Google学习了。

  Mono.empty();创建一个空Mono对象;

  Mono.just(**);创建一个**元素的对象;

  Mono.then(**);在最后执行,相当于spring的aop后置通知一样

  开始我们的第一步解析:mapping.getHandler(exchange);本方法主要做的是获取路由,我们继续看一看底层源码:

 1 public Mono<Object> getHandler(ServerWebExchange exchange) {
2 return getHandlerInternal(exchange).map(handler -> {
3 if (CorsUtils.isCorsRequest(exchange.getRequest())) {
4 CorsConfiguration configA = this.globalCorsConfigSource.getCorsConfiguration(exchange);
5 CorsConfiguration configB = getCorsConfiguration(handler, exchange);
6 CorsConfiguration config = (configA != null ? configA.combine(configB) : configB);
7 if (!getCorsProcessor().process(config, exchange) ||
8 CorsUtils.isPreFlightRequest(exchange.getRequest())) {
9 return REQUEST_HANDLED_HANDLER;
10 }
11 }
12 return handler;
13 });
14 }

getHandler

 1 protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
2 // don't handle requests on the management port if set
3 if (managmentPort != null && exchange.getRequest().getURI().getPort() == managmentPort.intValue()) {
4 return Mono.empty();
5 }
6 exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());
7
8 return lookupRoute(exchange)
9 // .log("route-predicate-handler-mapping", Level.FINER) //name this
10 .flatMap((Function<Route, Mono<?>>) r -> {
11 exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
12 if (logger.isDebugEnabled()) {
13 logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
14 }
15
16 exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
17 return Mono.just(webHandler);
18 }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
19 exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
20 if (logger.isTraceEnabled()) {
21 logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
22 }
23 })));
24 }

getHandlerInternal

 1 //这里返回的是单个对象
2 protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
3 return this.routeLocator
4 //我们一会主要看一下这个方法
5 .getRoutes()
6 //individually filter routes so that filterWhen error delaying is not a problem
7 .concatMap(route -> Mono
8 .just(route)
9 .filterWhen(r -> {
10 // add the current route we are testing
11 exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
12 //只返回一个符合断言的路由配置,所以整个流程先匹配断言
13 return r.getPredicate().apply(exchange);
14 })
15 //instead of immediately stopping main flux due to error, log and swallow it
16 .doOnError(e -> logger.error("Error applying predicate for route: "+route.getId(), e))
17 .onErrorResume(e -> Mono.empty())
18 )
19 // .defaultIfEmpty() put a static Route not found
20 // or .switchIfEmpty()
21 // .switchIfEmpty(Mono.<Route>empty().log("noroute"))
22 .next()
23 //TODO: error handling
24 .map(route -> {
25 if (logger.isDebugEnabled()) {
26 logger.debug("Route matched: " + route.getId());
27 }
28 validateRoute(route, exchange);
29 return route;
30 });
31
32
33 }

  我们现在看看Route对象是怎么在getRoutes()创建的。

 1 public Flux<Route> getRoutes() {
2
3 return this.routeDefinitionLocator.getRouteDefinitions() //这一步是从配置文件中读取我们配置的路由定义
4 .map(this::convertToRoute)//这一步会加载我们配置给路由的断言与过滤器形成路由对象
5 //TODO: error handling
6 .map(route -> {
7 if (logger.isDebugEnabled()) {
8 logger.debug("RouteDefinition matched: " + route.getId());
9 }
10 return route;
11 });
12
13 }
 1 //关键的代码在这里
2 private Route convertToRoute(RouteDefinition routeDefinition) {
3 //这两步才会跟上一章节讲解的如何加载断言与过滤器有关联,大家可以自行查看底层源码是如何查出来的对象的
4 AsyncPredicate<ServerWebExchange> predicate = combinePredicates(routeDefinition);
5 List<GatewayFilter> gatewayFilters = getFilters(routeDefinition);
6 //终于生成了路由对象
7 return Route.async(routeDefinition)
8 .asyncPredicate(predicate)
9 .replaceFilters(gatewayFilters)
10 .build();
11 }

  这里大家要记住getHandlerInternal方法,生成了Mono.just(webHandler),仔细看webHandler是FilteringWebHandler对象,以后用到这个WebHandler,好了路由生成也选择完毕了,我们应该知道改请求是否符合我们配置的过滤器了,因为过滤器还没用上,断言只负责了选择哪一个路由生效。

 1     //我们看下一个主流程的方法
2 private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
3 if (this.handlerAdapters != null) {
4 for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
5 if (handlerAdapter.supports(handler)) {
6 //这里走的是SimpleHandlerAdapter,可以自己debug发现,也可以去找自动配置类找,这里就不讲解了
7 return handlerAdapter.handle(exchange, handler);
8 }
9 }
10 }
11 return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
12 }
1 public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
2 WebHandler webHandler = (WebHandler) handler;
3 //让大家记住的那个FilteringWebHandler类,终于在这里起作用了。我们这回可以看看过滤器是如何起作用的
4 Mono<Void> mono = webHandler.handle(exchange);
5 return mono.then(Mono.empty());//过滤器处理完后,开始处理mono.then方法
6 }
 1     public Mono<Void> handle(ServerWebExchange exchange) {
2 Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
3 List<GatewayFilter> gatewayFilters = route.getFilters();//我们路由自己配置的过滤器
4 //加载全局过滤器
5 List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
6 combined.addAll(gatewayFilters);
7 //TODO: needed or cached?
8 AnnotationAwareOrderComparator.sort(combined);
9 //排序
10 if (logger.isDebugEnabled()) {
11 logger.debug("Sorted gatewayFilterFactories: "+ combined);
12 }
13 //形成过滤器链,开始调用filter进行过滤。这里剩下的我们就不讲解,跟spring配置的过滤器链调用流程是一样的
14 return new DefaultGatewayFilterChain(combined).filter(exchange);
15 }

  至此,我们的请求流程基本完事了,我们再来看看几个主要的全局过滤器配置。LoadBalancerClientFilter:负责获取服务器ip的过滤器,NettyRoutingFilter:负责转发我们请求的过滤器。

  这里主要讲解Gateway流程,关于Ribbon的代码我们就不做主要讲解了

 1     public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
2 URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
3 String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
4 //所以要加上lb前缀,才会走该过滤器
5 if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
6 return chain.filter(exchange);
7 }
8 //preserve the original url
9 addOriginalRequestUrl(exchange, url);
10
11 log.trace("LoadBalancerClientFilter url before: " + url);
12 //选择实例
13 final ServiceInstance instance = choose(exchange);
14
15 ......
16 return chain.filter(exchange);
17 }

  看主要代码即可,非必要的看来也晕。

 1 public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
2
3 .......
4 //通过httpClient发送请求获取响应
5 Mono<HttpClientResponse> responseMono = this.httpClient.request(method, url, req -> {
6 final HttpClientRequest proxyRequest = req.options(NettyPipeline.SendOptions::flushOnEach)
7 .headers(httpHeaders)
8 .chunkedTransfer(chunkedTransfer)
9 .failOnServerError(false)
10 .failOnClientError(false);
11
12 if (preserveHost) {
13 String host = request.getHeaders().getFirst(HttpHeaders.HOST);
14 proxyRequest.header(HttpHeaders.HOST, host);
15 }
16
17 if (properties.getResponseTimeout() != null) {
18 proxyRequest.context(ctx -> ctx.addHandlerFirst(
19 new ReadTimeoutHandler(properties.getResponseTimeout().toMillis(), TimeUnit.MILLISECONDS)));
20 }
21
22 return proxyRequest.sendHeaders() //I shouldn't need this
23 .send(request.getBody().map(dataBuffer ->
24 ((NettyDataBuffer) dataBuffer).getNativeBuffer()));
25 });
26
27 return responseMono.doOnNext(res -> {
28 ...
29 }
30
31 }

  我们今天主要看的是Gateway的主要请求转发的流程,像webflux这种我们没有精力学习的,可以暂时略过,毕竟也不是主流。我们今天最后总结一下。首先在Gateway这两章的点,项目启动时加载断言与过滤器->接收请求时添加配置文件中的路由配置并生成路由对象->找到符合断言的路由->除了个人配置的过滤器联合全局过滤器生成过滤器链,并逐步过滤知道所有调用完成。

  其中我们主要分析了两个主要的全局过滤器:LoadBalancerClientFilter:负责获取服务器ip的过滤器,NettyRoutingFilter:负责转发我们请求的过滤器。


最新文章

  1. Java Web开发中MVC设计模式简介
  2. HTTP请求 GET POST 网络编程实现
  3. Viewdraghelper解析
  4. ORACLE SQL前端补0的三种方式。
  5. IOS 作业项目(2) 画图(保存,撤销,笔粗细设定功能)
  6. vs目录(继承的值)配置
  7. Redis应用场景 及其数据对象 string hash list set sortedset
  8. WebBrowser!
  9. centos7下,解决Apache错误日志文件过大问题
  10. mybatis 增加热加载xml
  11. [物理学与PDEs]第1章第3节 真空中的 Maxwell 方程组, Lorentz 力 3.2 Lorentz 力
  12. leetcode 刷题(2)--- 两数相加
  13. N!中末尾有多少个0
  14. 线程 ID
  15. python对字典及列表递归排序
  16. PostgreSQL数据加载工具之pg_bulkload
  17. select标签和多行文本标签
  18. [转]HTTP Error 502.5 - Process Failure asp.net core error in IIS
  19. bzoj5039:[Jsoi2014]序列维护
  20. idea自动生成testNG.xml

热门文章

  1. ZwQuerySystemInformation枚举内核模块
  2. Jmeter(一) - 从入门到精通 - 环境搭建(详解教程)
  3. [网络编程之Socket套接字介绍,套接字工作流程,基于TCP协议的套接字程序]
  4. wrk 及扩展支持 tcp 字节流协议压测
  5. Django/Flask的一些实现方法
  6. BUUCTF(八)[极客大挑战 2019]LoveSQL
  7. Linux_部署日志服务器
  8. Android屏幕适配全攻略(最权威的官方适配指导)屏幕尺寸 屏幕分辨率 屏幕像素密度 dpdipdpisppx mdpihdpixdpixxdpi
  9. Linux进阶之环境变量文件/etc/profile、/etc/bashrc、/etc/environment
  10. MyBatis 开启 Log4j 日志调试信息开关