Soul 网关源码阅读(三)请求处理概览
简介
基于上篇:Soul 源码阅读(二)代码初步运行的配置,这次 debug 下请求处理的大致路径,验证网关模型的路径
详细流程记录
查看运行日志,寻找切入点
上篇中我们配置了 Divide 插件,让 http://localhost:9195 ,转发到了后台服务器 http://localhost:8082 上,首先不打断点运行,查看运行日志,找一个切入点:
o.d.soul.plugin.base.AbstractSoulPlugin : divide selector success match , selector name :neety8082
o.d.soul.plugin.base.AbstractSoulPlugin : divide selector success match , selector name :neety8082
o.d.s.plugin.httpclient.WebClientPlugin : The request urlPath is http://localhost:8082, retryTimes is 1
复制代码
上面的日志中一个比较明显的 divide 相关的日志,是 AbstractSoulPlugin 打印出来的,win 下双击 shift,搜索 AbstractSoulPlugin 进入,发现是一个接口,IDEA 左边向下的箭头查看它的实现,发现有一个熟系的 DividePlugin 实现类,点击进入,在一个明显的 doExecute 函数上打上断点,发起请求:http://localhost:9195
通过函数调用栈,发送调用的是 SoulWebHandler ,从下面的函数中可以大致看出这是一个循环遍历,遍历 plugins 进行操作
public Mono<Void> execute(final ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < plugins.size()) {
SoulPlugin plugin = plugins.get(this.index++);
Boolean skip = plugin.skip(exchange);
if (skip) {
return this.execute(exchange);
}
return plugin.execute(exchange, this);
}
return Mono.empty();
});
}
复制代码
跟踪调用栈
再次往前看调用栈,发送 SoulWebHandler 调用了 SoulWebHandler 的 execute
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
MetricsTrackerFacade.getInstance().counterInc(MetricsLabelEnum.REQUEST_TOTAL.getName());
Optional<HistogramMetricsTrackerDelegate> startTimer = MetricsTrackerFacade.getInstance().histogramStartTimer(MetricsLabelEnum.REQUEST_LATENCY.getName());
// new DefaultSoulPluginChain(plugins).execute(exchange) 明显的调用关系
return new DefaultSoulPluginChain(plugins).execute(exchange).subscribeOn(scheduler)
.doOnSuccess(t -> startTimer.ifPresent(time -> MetricsTrackerFacade.getInstance().histogramObserveDuration(time)));
}
复制代码
再往前发现看不懂了,没用明显的函数传递关系,我们在上面的函数打上端口,重启程序,再次发送请求
断点进来后,查看调用栈,发送调用上面函数的是 DefaultWebFilterChain
public Mono<Void> filter(ServerWebExchange exchange) {
return Mono.defer(() -> {
// 当下面都为null的时候进行调用
return this.currentFilter != null && this.chain != null ? this.invokeFilter(this.currentFilter, this.chain, exchange) : this.handler.handle(exchange);
});
}
复制代码
再往前查看,调用栈又看不懂了,再次在上面的函数打上断点,重启,发请求,下面就直接写类和相关函数,有特别的地方就叫点说明
来到 FilteringWebHandler
public Mono<Void> handle(ServerWebExchange exchange) {
return this.chain.filter(exchange);
}
复制代码
继续来到 WebHandlerDecorator
public Mono<Void> handle(ServerWebExchange exchange) {
return this.delegate.handle(exchange);
}
复制代码
来到 ExceptionHandlingWebHandler
public Mono<Void> handle(ServerWebExchange exchange) {
Mono completion;
try {
// 在这进行调用
completion = super.handle(exchange);
} catch (Throwable var5) {
completion = Mono.error(var5);
}
WebExceptionHandler handler;
for(Iterator var3 = this.exceptionHandlers.iterator(); var3.hasNext(); completion = completion.onErrorResume((ex) -> {
return handler.handle(exchange, ex);
})) {
handler = (WebExceptionHandler)var3.next();
}
return completion;
}
复制代码
继续来到 HttpWebHandlerAdapter ,这个类有点关键,看到在前面一直传递的变量:exchange,exchange 在这个类中生成,传递给后面的函数进行调用,而且是使用 response 和 request 生成的
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
if (this.forwardedHeaderTransformer != null) {
request = this.forwardedHeaderTransformer.apply(request);
}
// 重点变量 exchange 的生成
ServerWebExchange exchange = this.createExchange(request, response);
LogFormatUtils.traceDebug(logger, (traceOn) -> {
return exchange.getLogPrefix() + this.formatRequest(exchange.getRequest()) + (traceOn ? ", headers=" + this.formatHeaders(exchange.getRequest().getHeaders()) : "");
});
// this.getDelegate().handle(exchange)
// 通过debug可以看到 getDelete 得到的是 ExceptionHandlingWebHandler,那调用就是在这
Mono var10000 = this.getDelegate().handle(exchange).doOnSuccess((aVoid) -> {
this.logResponse(exchange);
}).onErrorResume((ex) -> {
return this.handleUnresolvedError(exchange, ex);
});
response.getClass();
return var10000.then(Mono.defer(response::setComplete));
}
复制代码
继续走到 ReactiveWebServerApplicationContext
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
return this.handler.handle(request, response);
}
复制代码
继续走到 ReactorHttpHandlerAdapter
public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc());
try {
// exchange 需要的 request 和 response 的生成
ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory);
ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory);
if (request.getMethod() == HttpMethod.HEAD) {
response = new HttpHeadResponseDecorator((ServerHttpResponse)response);
}
return this.httpHandler.handle(request, (ServerHttpResponse)response).doOnError((ex) -> {
logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage());
}).doOnSuccess((aVoid) -> {
logger.trace(request.getLogPrefix() + "Handling completed");
});
} catch (URISyntaxException var6) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to get request URI: " + var6.getMessage());
}
reactorResponse.status(HttpResponseStatus.BAD_REQUEST);
return Mono.empty();
}
}
复制代码
继续走到 HttpServerHandle
public void onStateChange(Connection connection, State newState) {
if (newState == HttpServerState.REQUEST_RECEIVED) {
try {
if (log.isDebugEnabled()) {
log.debug(ReactorNetty.format(connection.channel(), "Handler is being applied: {}"), new Object[]{this.handler});
}
HttpServerOperations ops = (HttpServerOperations)connection;
Mono.fromDirect((Publisher)this.handler.apply(ops, ops)).subscribe(ops.disposeSubscriber());
} catch (Throwable var4) {
log.error(ReactorNetty.format(connection.channel(), ""), var4);
connection.channel().close();
}
}
}
复制代码
继续走到 TcpServerBind
public void onStateChange(Connection connection, State newState) {
if (newState == State.DISCONNECTING && connection.channel().isActive() && !connection.isPersistent()) {
connection.dispose();
}
this.childObs.onStateChange(connection, newState);
}
复制代码
走到了很关键的一个: HttpServerOperations ,下面这个函数的 ctx 和 msg 也太熟悉不过了,明显的 netty 的 handler 处理,请求入口
protected void onInboundNext(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
try {
// 调用
this.listener().onStateChange(this, HttpServerState.REQUEST_RECEIVED);
} catch (Exception var4) {
this.onInboundError(var4);
ReferenceCountUtil.release(msg);
return;
}
if (msg instanceof FullHttpRequest) {
super.onInboundNext(ctx, msg);
if (this.isHttp2()) {
this.onInboundComplete();
}
}
} else {
if (msg instanceof HttpContent) {
if (msg != LastHttpContent.EMPTY_LAST_CONTENT) {
super.onInboundNext(ctx, msg);
}
if (msg instanceof LastHttpContent) {
this.onInboundComplete();
}
} else {
super.onInboundNext(ctx, msg);
}
}
}
复制代码
这个时候走到头了,我们跳出来看一看,梳理一下目前所得,发现我们搞清楚了一个请求接受,然后到 divide 的处理过程,梳理下大致如下:
HttpServerOperations : 明显的 netty 的请求接收的地方,请求入口
TcpServerBind
HttpServerHandle
ReactorHttpHandlerAdapter :生成 response 和 request
ReactiveWebServerApplicationContext
HttpWebHandlerAdapter :exchange 的生成
ExceptionHandlingWebHandler
WebHandlerDecorator
FilteringWebHandler
DefaultWebFilterChain
SoulWebHandler :plugins 调用链
DividePlugin :plugin 具体处理
逐步 debug 相关细节
这个时候参考网关模型,发现路由匹配之类的没有看到,没办法,细节部分没有清楚的就是: SoulWebHandler ,它的 plugin 调用的部分没有细看,于是我们进行如下的函数的 debug,进入各个函数的调用(进入 subscribe 之类的时候就跳出来,点击进入下一个断点,IDEA debug 左上角的箭头)
public Mono<Void> execute(final ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < plugins.size()) {
SoulPlugin plugin = plugins.get(this.index++);
Boolean skip = plugin.skip(exchange);
if (skip) {
return this.execute(exchange);
}
return plugin.execute(exchange, this);
}
return Mono.empty();
});
}
复制代码
我们逐步调试上面的那个函数,查看变量: plugins,内容大致如下,后面 false 和 true 是变量 skip。发现是 true 就不执行,看函数也能大致猜的到,各个插件的 skip 情况如下:
GlobalPlugin : false
SignPlugin : false
WafPlugin: false
RateLimiterPlugin : false
HystrixPlugin : false
Resilience4JPlugin : false
DividePlugin : false
WebClientPulugin : false
WebsocketPlugin : true
BodyParamPlugin : false
AlibabaDubblePlugin : true
MonitorPlugin : false
WebClientResponsePlugin : false
DubboResponsePlugin : true
调试的时候跟着进去,进去以后一步一步的走即可
我们调试进入前几个 plugin 都是没有执行到下面代码中的 if,在 divide plugin 执行了,我们跟着进入看看,看到了疑似路由匹配的 rules,还有 match,猜测这是路由匹配相关
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
String pluginName = named();
final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
if (pluginData != null && pluginData.getEnabled()) {
final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
if (CollectionUtils.isEmpty(selectors)) {
return handleSelectorIsNull(pluginName, exchange, chain);
}
final SelectorData selectorData = matchSelector(exchange, selectors);
if (Objects.isNull(selectorData)) {
return handleSelectorIsNull(pluginName, exchange, chain);
}
selectorLog(selectorData, pluginName);
final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
if (CollectionUtils.isEmpty(rules)) {
return handleRuleIsNull(pluginName, exchange, chain);
}
RuleData rule;
if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
//get last
rule = rules.get(rules.size() - 1);
} else {
// divide plugin 执行到这步,在rules,发现我们配置的规则,猜测这里是路由匹配
rule = matchRule(exchange, rules);
}
if (Objects.isNull(rule)) {
return handleRuleIsNull(pluginName, exchange, chain);
}
ruleLog(rule, pluginName);
return doExecute(exchange, chain, selectorData, rule);
}
return chain.execute(exchange);
}
复制代码
继续 debug,进入: WebClientPlugin ,看到了疑似发送请求给后台服务器,相关代码如下:
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
assert soulContext != null;
String urlPath = exchange.getAttribute(Constants.HTTP_URL);
if (StringUtils.isEmpty(urlPath)) {
Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0);
log.info("The request urlPath is {}, retryTimes is {}", urlPath, retryTimes);
HttpMethod method = HttpMethod.valueOf(exchange.getRequest().getMethodValue());
WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(urlPath);
return handleRequestBody(requestBodySpec, exchange, timeout, retryTimes, chain);
}
private Mono<Void> handleRequestBody(final WebClient.RequestBodySpec requestBodySpec,
final ServerWebExchange exchange,
final long timeout,
final int retryTimes,
final SoulPluginChain chain) {
// 下面这段代码太想前端的 ajax 调用了,猜测这是 请求发送
return requestBodySpec.headers(httpHeaders -> {
httpHeaders.addAll(exchange.getRequest().getHeaders());
httpHeaders.remove(HttpHeaders.HOST);
})
.contentType(buildMediaType(exchange))
.body(BodyInserters.fromDataBuffers(exchange.getRequest().getBody()))
.exchange()
.doOnError(e -> log.error(e.getMessage()))
.timeout(Duration.ofMillis(timeout))
.retryWhen(Retry.onlyIf(x -> x.exception() instanceof ConnectTimeoutException)
.retryMax(retryTimes)
.backoff(Backoff.exponential(Duration.ofMillis(200), Duration.ofSeconds(20), 2, true)))
.flatMap(e -> doNext(e, exchange, chain));
}
private Mono<Void> doNext(final ClientResponse res, final ServerWebExchange exchange, final SoulPluginChain chain) {
if (res.statusCode().is2xxSuccessful()) {
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
} else {
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.ERROR.getName());
}
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR, res);
return chain.execute(exchange);
}
复制代码
继续进入,来到: WebClientResponsePlugin ,发现疑似 response 返回给客户端的代码
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
return chain.execute(exchange).then(Mono.defer(() -> {
ServerHttpResponse response = exchange.getResponse();
ClientResponse clientResponse = exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);
if (Objects.isNull(clientResponse)
|| response.getStatusCode() == HttpStatus.BAD_GATEWAY
|| response.getStatusCode() == HttpStatus.INTERNAL_SERVER_ERROR) {
Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_RESULT_ERROR.getCode(), SoulResultEnum.SERVICE_RESULT_ERROR.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
if (response.getStatusCode() == HttpStatus.GATEWAY_TIMEOUT) {
Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_TIMEOUT.getCode(), SoulResultEnum.SERVICE_TIMEOUT.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
response.setStatusCode(clientResponse.statusCode());
response.getCookies().putAll(clientResponse.cookies());
response.getHeaders().putAll(clientResponse.headers().asHttpHeaders());
// 疑似响应返回
return response.writeWith(clientResponse.body(BodyExtractors.toDataBuffers()));
}));
}
复制代码
经过一通调试,感觉 DividePlugin 、WebClientPlugin 、 WebClientResponsePlugin 非常的可疑,我们取消所有的断点,然后对他们三个打上断点进行调试
首先对 DividePlugin 进行测试,我们发送一个没有配置的请求 http://localhost:9195/get ,发送后一路 debug 下来,发现返回结果如下:
{
"code": -107,
"message": "Can not find selector, please check your configuration!",
"data": null
}
复制代码
符合我们的预期,我们看下相关的代码,后面还测试了配置不正确的,发现都会进入下面的调用函数:
public static Mono<Void> result(final ServerWebExchange exchange, final Object result) {
exchange.getResponse().getHeaders().setContentType(MediaType.APPLICATION_JSON);
// 这个exchange.getResponse().writeWith 很前面看到基本一样,可以猜测soul里面估计都是这样返回响应的
return exchange.getResponse().writeWith(Mono.just(exchange.getResponse()
.bufferFactory().wrap(Objects.requireNonNull(JsonUtils.toJson(result)).getBytes())));
}
复制代码
继续测试正确的请求,WebClientPlugin 经过上面的分析和根据函数大致代码判断,handleRequestBody 发送请求,doNext 可以收到请求结果,我们在 doNext 上打上断点,发现果然在后台服务端收到了请求(自己写的 netty 服务,并打印日志),验证我们这里是请求发送的猜想
WebClientResponsePlugin 我们在 execute 函数打上断点,调试的时候注意到,进入了两次,一次是 chain.execute(exchange),一次是后面的 then,这个有点像 lamda 表达式(也可以类比为 vue 的请求),而且 then 是等所有 plugin 都运行以后才执行,执行完以后进客户端就得到了结果
具体的发送逻辑还没有看懂,但不影响这次的处理流程解析。再 debug 的时候还有一个不断循环调用的地方,我们回过头来去看一下它,看看有什么分析疏漏没
在类: DefaultWebFilterChain 中有这么一点循环调用,代码如下:
public Mono<Void> filter(ServerWebExchange exchange) {
return Mono.defer(() -> {
return this.currentFilter != null && this.chain != null ? this.invokeFilter(this.currentFilter, this.chain, exchange) : this.handler.handle(exchange);
});
}
复制代码
我们查看其相关类有下面几个,进入相应的类看了看,大致如下:
MetricsWebFilter : 没看懂
HealthFilter : 感觉类似监控检查,可以直接方法,因为是本地的;"/actuator/health", "/health_check",尝试了下确实不走后面的处理逻辑,直接返回了
FileSizeFilter :文件上传大小限制?MediaType.MULTIPART_FORM_DATA.isCompatibleWith(mediaType)
WebSocketParamFilter :不太清楚其功能
HiddenHttpMethodFilter :看到好像没有啥逻辑代码
得到了健康检查之类的直接在 HealthFilter 中返回,文件上传大小限制功能也在 filter 中,其他的不太清楚,但不影响大局
总结
经过总结梳理,得到下面的初步处理流程概览:
HttpServerOperations : 明显的 netty 的请求接收的地方,请求入口
ReactorHttpHandlerAdapter :生成 response 和 request
HttpWebHandlerAdapter :exchange 的生成
FilteringWebHandler : filter 操作
SoulWebHandler :plugins 调用链
请求由 netty 收到后,来到 Filter,这里进行一些处理:健康检查,文件大小检查等待,然后来到核心的 plugins,这里有三个部分的 plugin 需要注意(自己给它分的,初步猜测):
前置处理:这里的 plugin 都会进行匹配,感觉就是针对配置后的 url 进行认证、黑名单、限流、降级、熔断等操作
请求发送:这里对 HTTP、websocket、rpc、文件上传(这个是猜测)这四种请求进行处理,发送到后端服务器
响应返回:这里就两种响应,HTTP 和 rpc,拿到后返回给客户端
可以看到 plugins 非常的核心,关键功能都是在这里实现的,其中 divide plugin 好像扮演了路由匹配的角色,在 Soul 中就没有明显单独的路由匹配
请求和响应的处理也是在 plugins 进行处理的
评论