Soul 网关源码阅读(三)请求处理概览
简介
    基于上篇:Soul 源码阅读(二)代码初步运行的配置,这次 debug 下请求处理的大致路径,验证网关模型的路径
详细流程记录
查看运行日志,寻找切入点
    上篇中我们配置了 Divide 插件,让 http://localhost:9195 ,转发到了后台服务器 http://localhost:8082 上,首先不打断点运行,查看运行日志,找一个切入点:
 o.d.soul.plugin.base.AbstractSoulPlugin  : divide selector success match , selector name :neety8082o.d.soul.plugin.base.AbstractSoulPlugin  : divide selector success match , selector name :neety8082o.d.s.plugin.httpclient.WebClientPlugin  : The request urlPath is http://localhost:8082, retryTimes is 1
   复制代码
 
    上面的日志中一个比较明显的 divide 相关的日志,是 AbstractSoulPlugin 打印出来的,win 下双击 shift,搜索 AbstractSoulPlugin 进入,发现是一个接口,IDEA 左边向下的箭头查看它的实现,发现有一个熟系的 DividePlugin 实现类,点击进入,在一个明显的 doExecute 函数上打上断点,发起请求:http://localhost:9195
    通过函数调用栈,发送调用的是 SoulWebHandler ,从下面的函数中可以大致看出这是一个循环遍历,遍历 plugins 进行操作
     public Mono<Void> execute(final ServerWebExchange exchange) {            return Mono.defer(() -> {                if (this.index < plugins.size()) {                    SoulPlugin plugin = plugins.get(this.index++);                    Boolean skip = plugin.skip(exchange);                    if (skip) {                        return this.execute(exchange);                    }                    return plugin.execute(exchange, this);                }                return Mono.empty();            });        }
   复制代码
 
跟踪调用栈
    再次往前看调用栈,发送 SoulWebHandler 调用了 SoulWebHandler 的 execute
     public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {        MetricsTrackerFacade.getInstance().counterInc(MetricsLabelEnum.REQUEST_TOTAL.getName());        Optional<HistogramMetricsTrackerDelegate> startTimer = MetricsTrackerFacade.getInstance().histogramStartTimer(MetricsLabelEnum.REQUEST_LATENCY.getName());        // new DefaultSoulPluginChain(plugins).execute(exchange) 明显的调用关系        return new DefaultSoulPluginChain(plugins).execute(exchange).subscribeOn(scheduler)                .doOnSuccess(t -> startTimer.ifPresent(time -> MetricsTrackerFacade.getInstance().histogramObserveDuration(time)));    }
   复制代码
 
    再往前发现看不懂了,没用明显的函数传递关系,我们在上面的函数打上端口,重启程序,再次发送请求
    断点进来后,查看调用栈,发送调用上面函数的是 DefaultWebFilterChain
     public Mono<Void> filter(ServerWebExchange exchange) {        return Mono.defer(() -> {            // 当下面都为null的时候进行调用            return this.currentFilter != null && this.chain != null ? this.invokeFilter(this.currentFilter, this.chain, exchange) : this.handler.handle(exchange);        });    }
   复制代码
 
    再往前查看,调用栈又看不懂了,再次在上面的函数打上断点,重启,发请求,下面就直接写类和相关函数,有特别的地方就叫点说明
    来到 FilteringWebHandler
     public Mono<Void> handle(ServerWebExchange exchange) {        return this.chain.filter(exchange);    }
   复制代码
 
    继续来到 WebHandlerDecorator
     public Mono<Void> handle(ServerWebExchange exchange) {        return this.delegate.handle(exchange);    }
   复制代码
 
    来到 ExceptionHandlingWebHandler
 public Mono<Void> handle(ServerWebExchange exchange) {        Mono completion;        try {            // 在这进行调用            completion = super.handle(exchange);        } catch (Throwable var5) {            completion = Mono.error(var5);        }
        WebExceptionHandler handler;        for(Iterator var3 = this.exceptionHandlers.iterator(); var3.hasNext(); completion = completion.onErrorResume((ex) -> {            return handler.handle(exchange, ex);        })) {            handler = (WebExceptionHandler)var3.next();        }
        return completion;    }
   复制代码
 
    继续来到 HttpWebHandlerAdapter ,这个类有点关键,看到在前面一直传递的变量:exchange,exchange 在这个类中生成,传递给后面的函数进行调用,而且是使用 response 和 request 生成的
     public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {        if (this.forwardedHeaderTransformer != null) {            request = this.forwardedHeaderTransformer.apply(request);        }
        // 重点变量 exchange 的生成        ServerWebExchange exchange = this.createExchange(request, response);        LogFormatUtils.traceDebug(logger, (traceOn) -> {            return exchange.getLogPrefix() + this.formatRequest(exchange.getRequest()) + (traceOn ? ", headers=" + this.formatHeaders(exchange.getRequest().getHeaders()) : "");        });        // this.getDelegate().handle(exchange)        // 通过debug可以看到 getDelete 得到的是 ExceptionHandlingWebHandler,那调用就是在这        Mono var10000 = this.getDelegate().handle(exchange).doOnSuccess((aVoid) -> {            this.logResponse(exchange);        }).onErrorResume((ex) -> {            return this.handleUnresolvedError(exchange, ex);        });        response.getClass();        return var10000.then(Mono.defer(response::setComplete));    }
   复制代码
 
    继续走到 ReactiveWebServerApplicationContext
     public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {            return this.handler.handle(request, response);        }
   复制代码
 
    继续走到 ReactorHttpHandlerAdapter
     public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {        NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc());
        try {            // exchange 需要的 request 和 response 的生成            ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory);            ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory);            if (request.getMethod() == HttpMethod.HEAD) {                response = new HttpHeadResponseDecorator((ServerHttpResponse)response);            }
            return this.httpHandler.handle(request, (ServerHttpResponse)response).doOnError((ex) -> {                logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage());            }).doOnSuccess((aVoid) -> {                logger.trace(request.getLogPrefix() + "Handling completed");            });        } catch (URISyntaxException var6) {            if (logger.isDebugEnabled()) {                logger.debug("Failed to get request URI: " + var6.getMessage());            }
            reactorResponse.status(HttpResponseStatus.BAD_REQUEST);            return Mono.empty();        }    }
   复制代码
 
    继续走到 HttpServerHandle
     public void onStateChange(Connection connection, State newState) {        if (newState == HttpServerState.REQUEST_RECEIVED) {            try {                if (log.isDebugEnabled()) {                    log.debug(ReactorNetty.format(connection.channel(), "Handler is being applied: {}"), new Object[]{this.handler});                }
                HttpServerOperations ops = (HttpServerOperations)connection;                Mono.fromDirect((Publisher)this.handler.apply(ops, ops)).subscribe(ops.disposeSubscriber());            } catch (Throwable var4) {                log.error(ReactorNetty.format(connection.channel(), ""), var4);                connection.channel().close();            }        }
    }
   复制代码
 
    继续走到 TcpServerBind
         public void onStateChange(Connection connection, State newState) {            if (newState == State.DISCONNECTING && connection.channel().isActive() && !connection.isPersistent()) {                connection.dispose();            }
            this.childObs.onStateChange(connection, newState);        }
   复制代码
 
    走到了很关键的一个: HttpServerOperations ,下面这个函数的 ctx 和 msg 也太熟悉不过了,明显的 netty 的 handler 处理,请求入口
     protected void onInboundNext(ChannelHandlerContext ctx, Object msg) {        if (msg instanceof HttpRequest) {            try {                // 调用                this.listener().onStateChange(this, HttpServerState.REQUEST_RECEIVED);            } catch (Exception var4) {                this.onInboundError(var4);                ReferenceCountUtil.release(msg);                return;            }
            if (msg instanceof FullHttpRequest) {                super.onInboundNext(ctx, msg);                if (this.isHttp2()) {                    this.onInboundComplete();                }            }
        } else {            if (msg instanceof HttpContent) {                if (msg != LastHttpContent.EMPTY_LAST_CONTENT) {                    super.onInboundNext(ctx, msg);                }
                if (msg instanceof LastHttpContent) {                    this.onInboundComplete();                }            } else {                super.onInboundNext(ctx, msg);            }
        }    }
   复制代码
 
    这个时候走到头了,我们跳出来看一看,梳理一下目前所得,发现我们搞清楚了一个请求接受,然后到 divide 的处理过程,梳理下大致如下:
- HttpServerOperations : 明显的 netty 的请求接收的地方,请求入口 
- TcpServerBind 
- HttpServerHandle 
- ReactorHttpHandlerAdapter :生成 response 和 request 
- ReactiveWebServerApplicationContext 
- HttpWebHandlerAdapter :exchange 的生成 
- ExceptionHandlingWebHandler 
- WebHandlerDecorator 
- FilteringWebHandler 
- DefaultWebFilterChain 
- SoulWebHandler :plugins 调用链 
- DividePlugin :plugin 具体处理 
逐步 debug 相关细节
    这个时候参考网关模型,发现路由匹配之类的没有看到,没办法,细节部分没有清楚的就是: SoulWebHandler ,它的 plugin 调用的部分没有细看,于是我们进行如下的函数的 debug,进入各个函数的调用(进入 subscribe 之类的时候就跳出来,点击进入下一个断点,IDEA debug 左上角的箭头)
         public Mono<Void> execute(final ServerWebExchange exchange) {            return Mono.defer(() -> {                if (this.index < plugins.size()) {                    SoulPlugin plugin = plugins.get(this.index++);                    Boolean skip = plugin.skip(exchange);                    if (skip) {                        return this.execute(exchange);                    }                    return plugin.execute(exchange, this);                }                return Mono.empty();            });        }
   复制代码
 
    我们逐步调试上面的那个函数,查看变量: plugins,内容大致如下,后面 false 和 true 是变量 skip。发现是 true 就不执行,看函数也能大致猜的到,各个插件的 skip 情况如下:
- GlobalPlugin : false 
- SignPlugin : false 
- WafPlugin: false 
- RateLimiterPlugin : false 
- HystrixPlugin : false 
- Resilience4JPlugin : false 
- DividePlugin : false 
- WebClientPulugin : false 
- WebsocketPlugin : true 
- BodyParamPlugin : false 
- AlibabaDubblePlugin : true 
- MonitorPlugin : false 
- WebClientResponsePlugin : false 
- DubboResponsePlugin : true 
    调试的时候跟着进去,进去以后一步一步的走即可
    我们调试进入前几个 plugin 都是没有执行到下面代码中的 if,在 divide plugin 执行了,我们跟着进入看看,看到了疑似路由匹配的 rules,还有 match,猜测这是路由匹配相关
     public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {        String pluginName = named();        final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);        if (pluginData != null && pluginData.getEnabled()) {            final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);            if (CollectionUtils.isEmpty(selectors)) {                return handleSelectorIsNull(pluginName, exchange, chain);            }            final SelectorData selectorData = matchSelector(exchange, selectors);            if (Objects.isNull(selectorData)) {                return handleSelectorIsNull(pluginName, exchange, chain);            }            selectorLog(selectorData, pluginName);            final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());            if (CollectionUtils.isEmpty(rules)) {                return handleRuleIsNull(pluginName, exchange, chain);            }            RuleData rule;            if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {                //get last                rule = rules.get(rules.size() - 1);            } else {                // divide plugin 执行到这步,在rules,发现我们配置的规则,猜测这里是路由匹配                rule = matchRule(exchange, rules);            }            if (Objects.isNull(rule)) {                return handleRuleIsNull(pluginName, exchange, chain);            }            ruleLog(rule, pluginName);            return doExecute(exchange, chain, selectorData, rule);        }        return chain.execute(exchange);    }
   复制代码
 
    继续 debug,进入: WebClientPlugin ,看到了疑似发送请求给后台服务器,相关代码如下:
     public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {        final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);        assert soulContext != null;        String urlPath = exchange.getAttribute(Constants.HTTP_URL);        if (StringUtils.isEmpty(urlPath)) {            Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);            return WebFluxResultUtils.result(exchange, error);        }        long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);        int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0);        log.info("The request urlPath is {}, retryTimes is {}", urlPath, retryTimes);        HttpMethod method = HttpMethod.valueOf(exchange.getRequest().getMethodValue());        WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(urlPath);        return handleRequestBody(requestBodySpec, exchange, timeout, retryTimes, chain);    }
    private Mono<Void> handleRequestBody(final WebClient.RequestBodySpec requestBodySpec,                                         final ServerWebExchange exchange,                                         final long timeout,                                         final int retryTimes,                                         final SoulPluginChain chain) {        // 下面这段代码太想前端的 ajax 调用了,猜测这是 请求发送        return requestBodySpec.headers(httpHeaders -> {            httpHeaders.addAll(exchange.getRequest().getHeaders());            httpHeaders.remove(HttpHeaders.HOST);        })                .contentType(buildMediaType(exchange))                .body(BodyInserters.fromDataBuffers(exchange.getRequest().getBody()))                .exchange()                .doOnError(e -> log.error(e.getMessage()))                .timeout(Duration.ofMillis(timeout))                .retryWhen(Retry.onlyIf(x -> x.exception() instanceof ConnectTimeoutException)                    .retryMax(retryTimes)                    .backoff(Backoff.exponential(Duration.ofMillis(200), Duration.ofSeconds(20), 2, true)))                .flatMap(e -> doNext(e, exchange, chain));
    }
    private Mono<Void> doNext(final ClientResponse res, final ServerWebExchange exchange, final SoulPluginChain chain) {        if (res.statusCode().is2xxSuccessful()) {            exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());        } else {            exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.ERROR.getName());        }        exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR, res);        return chain.execute(exchange);    }
   复制代码
 
    继续进入,来到: WebClientResponsePlugin ,发现疑似 response 返回给客户端的代码
     public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {        return chain.execute(exchange).then(Mono.defer(() -> {            ServerHttpResponse response = exchange.getResponse();            ClientResponse clientResponse = exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);            if (Objects.isNull(clientResponse)                    || response.getStatusCode() == HttpStatus.BAD_GATEWAY                    || response.getStatusCode() == HttpStatus.INTERNAL_SERVER_ERROR) {                Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_RESULT_ERROR.getCode(), SoulResultEnum.SERVICE_RESULT_ERROR.getMsg(), null);                return WebFluxResultUtils.result(exchange, error);            }            if (response.getStatusCode() == HttpStatus.GATEWAY_TIMEOUT) {                Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_TIMEOUT.getCode(), SoulResultEnum.SERVICE_TIMEOUT.getMsg(), null);                return WebFluxResultUtils.result(exchange, error);            }            response.setStatusCode(clientResponse.statusCode());            response.getCookies().putAll(clientResponse.cookies());            response.getHeaders().putAll(clientResponse.headers().asHttpHeaders());            // 疑似响应返回            return response.writeWith(clientResponse.body(BodyExtractors.toDataBuffers()));        }));    }
   复制代码
 
    经过一通调试,感觉 DividePlugin 、WebClientPlugin 、 WebClientResponsePlugin 非常的可疑,我们取消所有的断点,然后对他们三个打上断点进行调试
    首先对 DividePlugin 进行测试,我们发送一个没有配置的请求 http://localhost:9195/get ,发送后一路 debug 下来,发现返回结果如下:
 {    "code": -107,    "message": "Can not find selector, please check your configuration!",    "data": null}
   复制代码
 
    符合我们的预期,我们看下相关的代码,后面还测试了配置不正确的,发现都会进入下面的调用函数:
     public static Mono<Void> result(final ServerWebExchange exchange, final Object result) {        exchange.getResponse().getHeaders().setContentType(MediaType.APPLICATION_JSON);        // 这个exchange.getResponse().writeWith 很前面看到基本一样,可以猜测soul里面估计都是这样返回响应的        return exchange.getResponse().writeWith(Mono.just(exchange.getResponse()                .bufferFactory().wrap(Objects.requireNonNull(JsonUtils.toJson(result)).getBytes())));    }
   复制代码
 
    继续测试正确的请求,WebClientPlugin 经过上面的分析和根据函数大致代码判断,handleRequestBody 发送请求,doNext 可以收到请求结果,我们在 doNext 上打上断点,发现果然在后台服务端收到了请求(自己写的 netty 服务,并打印日志),验证我们这里是请求发送的猜想
    WebClientResponsePlugin 我们在 execute 函数打上断点,调试的时候注意到,进入了两次,一次是 chain.execute(exchange),一次是后面的 then,这个有点像 lamda 表达式(也可以类比为 vue 的请求),而且 then 是等所有 plugin 都运行以后才执行,执行完以后进客户端就得到了结果
    具体的发送逻辑还没有看懂,但不影响这次的处理流程解析。再 debug 的时候还有一个不断循环调用的地方,我们回过头来去看一下它,看看有什么分析疏漏没
    在类: DefaultWebFilterChain 中有这么一点循环调用,代码如下:
     public Mono<Void> filter(ServerWebExchange exchange) {        return Mono.defer(() -> {            return this.currentFilter != null && this.chain != null ? this.invokeFilter(this.currentFilter, this.chain, exchange) : this.handler.handle(exchange);        });    }
   复制代码
 
    我们查看其相关类有下面几个,进入相应的类看了看,大致如下:
- MetricsWebFilter : 没看懂 
- HealthFilter : 感觉类似监控检查,可以直接方法,因为是本地的;"/actuator/health", "/health_check",尝试了下确实不走后面的处理逻辑,直接返回了 
- FileSizeFilter :文件上传大小限制?MediaType.MULTIPART_FORM_DATA.isCompatibleWith(mediaType) 
- WebSocketParamFilter :不太清楚其功能 
- HiddenHttpMethodFilter :看到好像没有啥逻辑代码 
    得到了健康检查之类的直接在 HealthFilter 中返回,文件上传大小限制功能也在 filter 中,其他的不太清楚,但不影响大局
总结
    经过总结梳理,得到下面的初步处理流程概览:
- HttpServerOperations : 明显的 netty 的请求接收的地方,请求入口 
- ReactorHttpHandlerAdapter :生成 response 和 request 
- HttpWebHandlerAdapter :exchange 的生成 
- FilteringWebHandler : filter 操作 
- SoulWebHandler :plugins 调用链 
    请求由 netty 收到后,来到 Filter,这里进行一些处理:健康检查,文件大小检查等待,然后来到核心的 plugins,这里有三个部分的 plugin 需要注意(自己给它分的,初步猜测):
- 前置处理:这里的 plugin 都会进行匹配,感觉就是针对配置后的 url 进行认证、黑名单、限流、降级、熔断等操作 
- 请求发送:这里对 HTTP、websocket、rpc、文件上传(这个是猜测)这四种请求进行处理,发送到后端服务器 
- 响应返回:这里就两种响应,HTTP 和 rpc,拿到后返回给客户端 
    可以看到 plugins 非常的核心,关键功能都是在这里实现的,其中 divide plugin 好像扮演了路由匹配的角色,在 Soul 中就没有明显单独的路由匹配
    请求和响应的处理也是在 plugins 进行处理的
评论