Soul 网关源码阅读(三)请求处理概览
简介
基于上篇:Soul 源码阅读(二)代码初步运行的配置,这次 debug 下请求处理的大致路径,验证网关模型的路径
详细流程记录
查看运行日志,寻找切入点
上篇中我们配置了 Divide 插件,让 http://localhost:9195 ,转发到了后台服务器 http://localhost:8082 上,首先不打断点运行,查看运行日志,找一个切入点:
o.d.soul.plugin.base.AbstractSoulPlugin : divide selector success match , selector name :neety8082o.d.soul.plugin.base.AbstractSoulPlugin : divide selector success match , selector name :neety8082o.d.s.plugin.httpclient.WebClientPlugin : The request urlPath is http://localhost:8082, retryTimes is 1
复制代码
上面的日志中一个比较明显的 divide 相关的日志,是 AbstractSoulPlugin 打印出来的,win 下双击 shift,搜索 AbstractSoulPlugin 进入,发现是一个接口,IDEA 左边向下的箭头查看它的实现,发现有一个熟系的 DividePlugin 实现类,点击进入,在一个明显的 doExecute 函数上打上断点,发起请求:http://localhost:9195
通过函数调用栈,发送调用的是 SoulWebHandler ,从下面的函数中可以大致看出这是一个循环遍历,遍历 plugins 进行操作
public Mono<Void> execute(final ServerWebExchange exchange) { return Mono.defer(() -> { if (this.index < plugins.size()) { SoulPlugin plugin = plugins.get(this.index++); Boolean skip = plugin.skip(exchange); if (skip) { return this.execute(exchange); } return plugin.execute(exchange, this); } return Mono.empty(); }); }
复制代码
跟踪调用栈
再次往前看调用栈,发送 SoulWebHandler 调用了 SoulWebHandler 的 execute
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) { MetricsTrackerFacade.getInstance().counterInc(MetricsLabelEnum.REQUEST_TOTAL.getName()); Optional<HistogramMetricsTrackerDelegate> startTimer = MetricsTrackerFacade.getInstance().histogramStartTimer(MetricsLabelEnum.REQUEST_LATENCY.getName()); // new DefaultSoulPluginChain(plugins).execute(exchange) 明显的调用关系 return new DefaultSoulPluginChain(plugins).execute(exchange).subscribeOn(scheduler) .doOnSuccess(t -> startTimer.ifPresent(time -> MetricsTrackerFacade.getInstance().histogramObserveDuration(time))); }
复制代码
再往前发现看不懂了,没用明显的函数传递关系,我们在上面的函数打上端口,重启程序,再次发送请求
断点进来后,查看调用栈,发送调用上面函数的是 DefaultWebFilterChain
public Mono<Void> filter(ServerWebExchange exchange) { return Mono.defer(() -> { // 当下面都为null的时候进行调用 return this.currentFilter != null && this.chain != null ? this.invokeFilter(this.currentFilter, this.chain, exchange) : this.handler.handle(exchange); }); }
复制代码
再往前查看,调用栈又看不懂了,再次在上面的函数打上断点,重启,发请求,下面就直接写类和相关函数,有特别的地方就叫点说明
来到 FilteringWebHandler
public Mono<Void> handle(ServerWebExchange exchange) { return this.chain.filter(exchange); }
复制代码
继续来到 WebHandlerDecorator
public Mono<Void> handle(ServerWebExchange exchange) { return this.delegate.handle(exchange); }
复制代码
来到 ExceptionHandlingWebHandler
public Mono<Void> handle(ServerWebExchange exchange) { Mono completion; try { // 在这进行调用 completion = super.handle(exchange); } catch (Throwable var5) { completion = Mono.error(var5); }
WebExceptionHandler handler; for(Iterator var3 = this.exceptionHandlers.iterator(); var3.hasNext(); completion = completion.onErrorResume((ex) -> { return handler.handle(exchange, ex); })) { handler = (WebExceptionHandler)var3.next(); }
return completion; }
复制代码
继续来到 HttpWebHandlerAdapter ,这个类有点关键,看到在前面一直传递的变量:exchange,exchange 在这个类中生成,传递给后面的函数进行调用,而且是使用 response 和 request 生成的
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) { if (this.forwardedHeaderTransformer != null) { request = this.forwardedHeaderTransformer.apply(request); }
// 重点变量 exchange 的生成 ServerWebExchange exchange = this.createExchange(request, response); LogFormatUtils.traceDebug(logger, (traceOn) -> { return exchange.getLogPrefix() + this.formatRequest(exchange.getRequest()) + (traceOn ? ", headers=" + this.formatHeaders(exchange.getRequest().getHeaders()) : ""); }); // this.getDelegate().handle(exchange) // 通过debug可以看到 getDelete 得到的是 ExceptionHandlingWebHandler,那调用就是在这 Mono var10000 = this.getDelegate().handle(exchange).doOnSuccess((aVoid) -> { this.logResponse(exchange); }).onErrorResume((ex) -> { return this.handleUnresolvedError(exchange, ex); }); response.getClass(); return var10000.then(Mono.defer(response::setComplete)); }
复制代码
继续走到 ReactiveWebServerApplicationContext
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) { return this.handler.handle(request, response); }
复制代码
继续走到 ReactorHttpHandlerAdapter
public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) { NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc());
try { // exchange 需要的 request 和 response 的生成 ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory); ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory); if (request.getMethod() == HttpMethod.HEAD) { response = new HttpHeadResponseDecorator((ServerHttpResponse)response); }
return this.httpHandler.handle(request, (ServerHttpResponse)response).doOnError((ex) -> { logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage()); }).doOnSuccess((aVoid) -> { logger.trace(request.getLogPrefix() + "Handling completed"); }); } catch (URISyntaxException var6) { if (logger.isDebugEnabled()) { logger.debug("Failed to get request URI: " + var6.getMessage()); }
reactorResponse.status(HttpResponseStatus.BAD_REQUEST); return Mono.empty(); } }
复制代码
继续走到 HttpServerHandle
public void onStateChange(Connection connection, State newState) { if (newState == HttpServerState.REQUEST_RECEIVED) { try { if (log.isDebugEnabled()) { log.debug(ReactorNetty.format(connection.channel(), "Handler is being applied: {}"), new Object[]{this.handler}); }
HttpServerOperations ops = (HttpServerOperations)connection; Mono.fromDirect((Publisher)this.handler.apply(ops, ops)).subscribe(ops.disposeSubscriber()); } catch (Throwable var4) { log.error(ReactorNetty.format(connection.channel(), ""), var4); connection.channel().close(); } }
}
复制代码
继续走到 TcpServerBind
public void onStateChange(Connection connection, State newState) { if (newState == State.DISCONNECTING && connection.channel().isActive() && !connection.isPersistent()) { connection.dispose(); }
this.childObs.onStateChange(connection, newState); }
复制代码
走到了很关键的一个: HttpServerOperations ,下面这个函数的 ctx 和 msg 也太熟悉不过了,明显的 netty 的 handler 处理,请求入口
protected void onInboundNext(ChannelHandlerContext ctx, Object msg) { if (msg instanceof HttpRequest) { try { // 调用 this.listener().onStateChange(this, HttpServerState.REQUEST_RECEIVED); } catch (Exception var4) { this.onInboundError(var4); ReferenceCountUtil.release(msg); return; }
if (msg instanceof FullHttpRequest) { super.onInboundNext(ctx, msg); if (this.isHttp2()) { this.onInboundComplete(); } }
} else { if (msg instanceof HttpContent) { if (msg != LastHttpContent.EMPTY_LAST_CONTENT) { super.onInboundNext(ctx, msg); }
if (msg instanceof LastHttpContent) { this.onInboundComplete(); } } else { super.onInboundNext(ctx, msg); }
} }
复制代码
这个时候走到头了,我们跳出来看一看,梳理一下目前所得,发现我们搞清楚了一个请求接受,然后到 divide 的处理过程,梳理下大致如下:
HttpServerOperations : 明显的 netty 的请求接收的地方,请求入口
TcpServerBind
HttpServerHandle
ReactorHttpHandlerAdapter :生成 response 和 request
ReactiveWebServerApplicationContext
HttpWebHandlerAdapter :exchange 的生成
ExceptionHandlingWebHandler
WebHandlerDecorator
FilteringWebHandler
DefaultWebFilterChain
SoulWebHandler :plugins 调用链
DividePlugin :plugin 具体处理
逐步 debug 相关细节
这个时候参考网关模型,发现路由匹配之类的没有看到,没办法,细节部分没有清楚的就是: SoulWebHandler ,它的 plugin 调用的部分没有细看,于是我们进行如下的函数的 debug,进入各个函数的调用(进入 subscribe 之类的时候就跳出来,点击进入下一个断点,IDEA debug 左上角的箭头)
public Mono<Void> execute(final ServerWebExchange exchange) { return Mono.defer(() -> { if (this.index < plugins.size()) { SoulPlugin plugin = plugins.get(this.index++); Boolean skip = plugin.skip(exchange); if (skip) { return this.execute(exchange); } return plugin.execute(exchange, this); } return Mono.empty(); }); }
复制代码
我们逐步调试上面的那个函数,查看变量: plugins,内容大致如下,后面 false 和 true 是变量 skip。发现是 true 就不执行,看函数也能大致猜的到,各个插件的 skip 情况如下:
GlobalPlugin : false
SignPlugin : false
WafPlugin: false
RateLimiterPlugin : false
HystrixPlugin : false
Resilience4JPlugin : false
DividePlugin : false
WebClientPulugin : false
WebsocketPlugin : true
BodyParamPlugin : false
AlibabaDubblePlugin : true
MonitorPlugin : false
WebClientResponsePlugin : false
DubboResponsePlugin : true
调试的时候跟着进去,进去以后一步一步的走即可
我们调试进入前几个 plugin 都是没有执行到下面代码中的 if,在 divide plugin 执行了,我们跟着进入看看,看到了疑似路由匹配的 rules,还有 match,猜测这是路由匹配相关
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) { String pluginName = named(); final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName); if (pluginData != null && pluginData.getEnabled()) { final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName); if (CollectionUtils.isEmpty(selectors)) { return handleSelectorIsNull(pluginName, exchange, chain); } final SelectorData selectorData = matchSelector(exchange, selectors); if (Objects.isNull(selectorData)) { return handleSelectorIsNull(pluginName, exchange, chain); } selectorLog(selectorData, pluginName); final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId()); if (CollectionUtils.isEmpty(rules)) { return handleRuleIsNull(pluginName, exchange, chain); } RuleData rule; if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) { //get last rule = rules.get(rules.size() - 1); } else { // divide plugin 执行到这步,在rules,发现我们配置的规则,猜测这里是路由匹配 rule = matchRule(exchange, rules); } if (Objects.isNull(rule)) { return handleRuleIsNull(pluginName, exchange, chain); } ruleLog(rule, pluginName); return doExecute(exchange, chain, selectorData, rule); } return chain.execute(exchange); }
复制代码
继续 debug,进入: WebClientPlugin ,看到了疑似发送请求给后台服务器,相关代码如下:
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) { final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT); assert soulContext != null; String urlPath = exchange.getAttribute(Constants.HTTP_URL); if (StringUtils.isEmpty(urlPath)) { Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null); return WebFluxResultUtils.result(exchange, error); } long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L); int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0); log.info("The request urlPath is {}, retryTimes is {}", urlPath, retryTimes); HttpMethod method = HttpMethod.valueOf(exchange.getRequest().getMethodValue()); WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(urlPath); return handleRequestBody(requestBodySpec, exchange, timeout, retryTimes, chain); }
private Mono<Void> handleRequestBody(final WebClient.RequestBodySpec requestBodySpec, final ServerWebExchange exchange, final long timeout, final int retryTimes, final SoulPluginChain chain) { // 下面这段代码太想前端的 ajax 调用了,猜测这是 请求发送 return requestBodySpec.headers(httpHeaders -> { httpHeaders.addAll(exchange.getRequest().getHeaders()); httpHeaders.remove(HttpHeaders.HOST); }) .contentType(buildMediaType(exchange)) .body(BodyInserters.fromDataBuffers(exchange.getRequest().getBody())) .exchange() .doOnError(e -> log.error(e.getMessage())) .timeout(Duration.ofMillis(timeout)) .retryWhen(Retry.onlyIf(x -> x.exception() instanceof ConnectTimeoutException) .retryMax(retryTimes) .backoff(Backoff.exponential(Duration.ofMillis(200), Duration.ofSeconds(20), 2, true))) .flatMap(e -> doNext(e, exchange, chain));
}
private Mono<Void> doNext(final ClientResponse res, final ServerWebExchange exchange, final SoulPluginChain chain) { if (res.statusCode().is2xxSuccessful()) { exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName()); } else { exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.ERROR.getName()); } exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR, res); return chain.execute(exchange); }
复制代码
继续进入,来到: WebClientResponsePlugin ,发现疑似 response 返回给客户端的代码
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) { return chain.execute(exchange).then(Mono.defer(() -> { ServerHttpResponse response = exchange.getResponse(); ClientResponse clientResponse = exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR); if (Objects.isNull(clientResponse) || response.getStatusCode() == HttpStatus.BAD_GATEWAY || response.getStatusCode() == HttpStatus.INTERNAL_SERVER_ERROR) { Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_RESULT_ERROR.getCode(), SoulResultEnum.SERVICE_RESULT_ERROR.getMsg(), null); return WebFluxResultUtils.result(exchange, error); } if (response.getStatusCode() == HttpStatus.GATEWAY_TIMEOUT) { Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_TIMEOUT.getCode(), SoulResultEnum.SERVICE_TIMEOUT.getMsg(), null); return WebFluxResultUtils.result(exchange, error); } response.setStatusCode(clientResponse.statusCode()); response.getCookies().putAll(clientResponse.cookies()); response.getHeaders().putAll(clientResponse.headers().asHttpHeaders()); // 疑似响应返回 return response.writeWith(clientResponse.body(BodyExtractors.toDataBuffers())); })); }
复制代码
经过一通调试,感觉 DividePlugin 、WebClientPlugin 、 WebClientResponsePlugin 非常的可疑,我们取消所有的断点,然后对他们三个打上断点进行调试
首先对 DividePlugin 进行测试,我们发送一个没有配置的请求 http://localhost:9195/get ,发送后一路 debug 下来,发现返回结果如下:
{ "code": -107, "message": "Can not find selector, please check your configuration!", "data": null}
复制代码
符合我们的预期,我们看下相关的代码,后面还测试了配置不正确的,发现都会进入下面的调用函数:
public static Mono<Void> result(final ServerWebExchange exchange, final Object result) { exchange.getResponse().getHeaders().setContentType(MediaType.APPLICATION_JSON); // 这个exchange.getResponse().writeWith 很前面看到基本一样,可以猜测soul里面估计都是这样返回响应的 return exchange.getResponse().writeWith(Mono.just(exchange.getResponse() .bufferFactory().wrap(Objects.requireNonNull(JsonUtils.toJson(result)).getBytes()))); }
复制代码
继续测试正确的请求,WebClientPlugin 经过上面的分析和根据函数大致代码判断,handleRequestBody 发送请求,doNext 可以收到请求结果,我们在 doNext 上打上断点,发现果然在后台服务端收到了请求(自己写的 netty 服务,并打印日志),验证我们这里是请求发送的猜想
WebClientResponsePlugin 我们在 execute 函数打上断点,调试的时候注意到,进入了两次,一次是 chain.execute(exchange),一次是后面的 then,这个有点像 lamda 表达式(也可以类比为 vue 的请求),而且 then 是等所有 plugin 都运行以后才执行,执行完以后进客户端就得到了结果
具体的发送逻辑还没有看懂,但不影响这次的处理流程解析。再 debug 的时候还有一个不断循环调用的地方,我们回过头来去看一下它,看看有什么分析疏漏没
在类: DefaultWebFilterChain 中有这么一点循环调用,代码如下:
public Mono<Void> filter(ServerWebExchange exchange) { return Mono.defer(() -> { return this.currentFilter != null && this.chain != null ? this.invokeFilter(this.currentFilter, this.chain, exchange) : this.handler.handle(exchange); }); }
复制代码
我们查看其相关类有下面几个,进入相应的类看了看,大致如下:
MetricsWebFilter : 没看懂
HealthFilter : 感觉类似监控检查,可以直接方法,因为是本地的;"/actuator/health", "/health_check",尝试了下确实不走后面的处理逻辑,直接返回了
FileSizeFilter :文件上传大小限制?MediaType.MULTIPART_FORM_DATA.isCompatibleWith(mediaType)
WebSocketParamFilter :不太清楚其功能
HiddenHttpMethodFilter :看到好像没有啥逻辑代码
得到了健康检查之类的直接在 HealthFilter 中返回,文件上传大小限制功能也在 filter 中,其他的不太清楚,但不影响大局
总结
经过总结梳理,得到下面的初步处理流程概览:
HttpServerOperations : 明显的 netty 的请求接收的地方,请求入口
ReactorHttpHandlerAdapter :生成 response 和 request
HttpWebHandlerAdapter :exchange 的生成
FilteringWebHandler : filter 操作
SoulWebHandler :plugins 调用链
请求由 netty 收到后,来到 Filter,这里进行一些处理:健康检查,文件大小检查等待,然后来到核心的 plugins,这里有三个部分的 plugin 需要注意(自己给它分的,初步猜测):
前置处理:这里的 plugin 都会进行匹配,感觉就是针对配置后的 url 进行认证、黑名单、限流、降级、熔断等操作
请求发送:这里对 HTTP、websocket、rpc、文件上传(这个是猜测)这四种请求进行处理,发送到后端服务器
响应返回:这里就两种响应,HTTP 和 rpc,拿到后返回给客户端
可以看到 plugins 非常的核心,关键功能都是在这里实现的,其中 divide plugin 好像扮演了路由匹配的角色,在 Soul 中就没有明显单独的路由匹配
请求和响应的处理也是在 plugins 进行处理的
评论