写点什么

Soul 网关源码阅读(四)Dubbo 请求概览

用户头像
关注
发布于: 2021 年 01 月 15 日

Soul 网关源码阅读(四)Dubbo 请求概览




简介


    本次启动一个 dubbo 服务示例,初步探索 Soul 网关源码的 Dubbo 请求处理流程


示例运行


环境配置


    在 Soul 源码 clone 下来以后,有一个 soul-example 目录,这个就是示例工程,里面有很多的示例可以运行


    可能初始文件夹不被 IDEA 识别为 Java 工程,我们需要点击改工程目录下的 pom.xml 文件,然后在右键,在菜单中选择:add as maven project


    我们选择运行:soul-examples --> soul-examples-apache-dubbo-service


    此次示例需要 mysql 和 zookeeper,我们使用 docker 启动一下,记得修改 soul-admin 的数据库配置,其他的都不需要懂,猜测 zk 有默认配置


docker run -dit --name zk -p 2181:2181 zookeepedocker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:latest
复制代码


测试运行


    然后运行 Soul-admin、Soul-bootstrap,可以参考:Soul 源码阅读(二)代码初步运行


    运行 soul-examples-apache-dubbo-service


    登录 soul-admin 管理界面:http://localhost:9095/ ,账号和密码:admin 123456


    进入界面:系统管理 --> 插件管理


    在插件:dubbo,点击编辑,将状态修改为开启(是个开关图标)


    开启成功后,可以在 Soul-Bootstrap 中看到相关的日志,大致如下:


o.d.s.p.a.d.c.ApplicationConfigCache     : init aliaba dubbo reference success there meteData is :MetaDatao.d.s.p.a.d.c.ApplicationConfigCache     : init aliaba dubbo reference success there meteData is :MetaDatao.d.s.p.a.d.c.ApplicationConfigCache     : init aliaba dubbo reference success there meteData is :MetaData
复制代码


    进入 Soul-admin 管理界面:插件管理 --> dubbo ,我们可以看到很多的配置,这些都是 soul-examples-apache-dubbo-service 的配置


    随便找个查询的接口:http://localhost:9195/dubbo/findAll


    使用浏览器访问得到的结果如下:


{    "code": 200,    "message": "Access to success!",    "data": {        "name": "hello world Soul Apache, findAll",        "id": "-1206394682"    }}
复制代码


    很好,已经成功跑通示例,接下来,进入源码进行 debug


源码分析


    基于上篇,我们已经知道了一个 HTTP 请求的初步处理流程:soul源码阅读3-请求处理概览.md


    基于上篇的熟悉度,我们就开始看看 Dubbo 的处理流程和 Http 的有什么区别,顺带看一看各个 Plugin 都干了啥


    Plugins 的链式处理核心类是:SoulWebHandler,我们就从这里开始打上断点,逐步进入每个 Plugin 进行查看


        public Mono<Void> execute(final ServerWebExchange exchange) {            return Mono.defer(() -> {                if (this.index < plugins.size()) {                    SoulPlugin plugin = plugins.get(this.index++);                    Boolean skip = plugin.skip(exchange);                    if (skip) {                        return this.execute(exchange);                    }                    return plugin.execute(exchange, this);                }                return Mono.empty();            });        }
复制代码


GlobalPlugin


    代码看的不是很懂,猜测有下面的功能:


  • 获取请求头的 upgrade,此时为 null,进入逻辑

  • 另外的分支的 MultiValueMap 有点像文件上传之类需要的,后面有时间验证一下

  • 功能是修改了 SoulContext


    这个不是此次分析的目标,到这就跳过


SignPlugin


    这个应该是一个认证相关的插件,但在获取插件信息,没有开启,不进入逻辑,直接跳过,没有进入具体执行


    这里发现有的能需要进入函数:execute,有的直接进入:doExecute。稍微有点好奇,就看了下原因,大致是直接继承 SoulPlugin 就直接进入 doExecute,而 AbstractSoulPlugin 需要进入 execute,这里不是重点,就不继续研究这个了,有个大概了解即可


WafPlugin、RateLimiterPlugin、HystrixPlugin、Resilience4JPlugin


    上面几个都是获取插件信息,没有开启,不进入逻辑,直接跳过


DividePlugin


    这里是 true,有点和自己想的不一样,查看管理界面看到 divide 是开启的,稍微挖一下这个


    定位到关键函数:Boolean skip = plugin.skip(exchange),判断是否可以跳过,我们看下具体的代码:


    public Boolean skip(final ServerWebExchange exchange) {        final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);        return !Objects.equals(Objects.requireNonNull(soulContext).getRpcType(), RpcTypeEnum.HTTP.getName());    }
复制代码


    好像就是判断类型来返回是否跳过,其他的 plugins 的判断逻辑,至于这些都是都是怎么来的,今天就不分析了,保留体力,留待下次进行分析


WebClientPulugin、WebsocketPlugin


    skip 为 true,直接跳过,跳过的判断的和 DividePlugin 基本相同,通过类型判断


BodyParamPlugin


    对 soulContext 进行了操作,和 GlobalPlugin 有一些联动,还看到了 MediaType 等关键字,感觉很像文件上传之类的,但细节不太清楚,这个也不是本次分析的目的,不要陷入细节,这次放过它,下次有时间再仔细研究


AlibabaDubblePlugin


    这个插件是本次的核心,大致干了下面三个事情


  • 进入了并且匹配上了规则,发现这个 plugin 是 dubbo 总插件,同样能处理 Apache dubbo,也就是他们是相同的或者复用的

  • 进入到 doExecute 函数:获取 body,soulContext,查看 soulContext 发现已经有方法、路径等信息(猜测是前面加的),获得关键的 metaData

  • 获取请求的结果,并且放到 exchange 中:Object result = alibabaDubboProxyService.genericInvoker(body, metaData)


    代码大致如下:


    protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {        String body = exchange.getAttribute(Constants.DUBBO_PARAMS);        SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);        assert soulContext != null;        // dubbo 请求的相关数据        MetaData metaData = exchange.getAttribute(Constants.META_DATA);        if (!checkMetaData(metaData)) {            assert metaData != null;            log.error(" path is :{}, meta data have error.... {}", soulContext.getPath(), metaData.toString());            exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);            Object error = SoulResultWrap.error(SoulResultEnum.META_DATA_ERROR.getCode(), SoulResultEnum.META_DATA_ERROR.getMsg(), null);            return WebFluxResultUtils.result(exchange, error);        }        if (StringUtils.isNoneBlank(metaData.getParameterTypes()) && StringUtils.isBlank(body)) {            exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);            Object error = SoulResultWrap.error(SoulResultEnum.DUBBO_HAVE_BODY_PARAM.getCode(), SoulResultEnum.DUBBO_HAVE_BODY_PARAM.getMsg(), null);            return WebFluxResultUtils.result(exchange, error);        }        // 获取请求结果        Object result = alibabaDubboProxyService.genericInvoker(body, metaData);        if (Objects.nonNull(result)) {            // 将结果放到exchange中            exchange.getAttributes().put(Constants.DUBBO_RPC_RESULT, result);        } else {            exchange.getAttributes().put(Constants.DUBBO_RPC_RESULT, Constants.DUBBO_RPC_RESULT_EMPTY);        }        exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());        return chain.execute(exchange);    }
复制代码


    继续跟踪查看获取 result 的那个函数,通过下面的代码可以明显看出确实是 rpc 调用,并得到结果


    public Object genericInvoker(final String body, final MetaData metaData) throws SoulException {        // 通过rpc和dubbo的相关知识,这里的reference相当于consumer,metaData.getPath()==/dubbo/findAll        // ApplicationConfigCache.getInstance().get 的逻辑感觉比较复杂,先不看了,大体感觉是初始化的时候使用 path 作为了 key        ReferenceConfig<GenericService> reference = ApplicationConfigCache.getInstance().get(metaData.getPath());        if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterface())) {            ApplicationConfigCache.getInstance().invalidate(metaData.getPath());            reference = ApplicationConfigCache.getInstance().initRef(metaData);        }        // 这段很像RPC Demo中的获取字节码生成的对象        GenericService genericService = reference.get();        try {            Pair<String[], Object[]> pair;            if (ParamCheckUtils.dubboBodyIsEmpty(body)) {                pair = new ImmutablePair<>(new String[]{}, new Object[]{});            } else {                pair = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes());            }            // 传入方法名、参数名、参数?Dubbo还没用的熟练,暂时这样猜一猜,不影响此次分析的大局            return genericService.$invoke(metaData.getMethodName(), pair.getLeft(), pair.getRight());        } catch (GenericException e) {            log.error("dubbo invoker have exception", e);            throw new SoulException(e.getExceptionMessage());        }    }
复制代码


    对 reference.get(),有点好奇,稍微看下 ReferenceConfig


    public synchronized T get() {        // 判断是否能用        if (this.destroyed) {            throw new IllegalStateException("Already destroyed!");        } else {            // 有延迟加载的作用            if (this.ref == null) {                this.init();            }
return this.ref; } }
复制代码


    看到它这用法还是挺巧妙的,有一个延迟加载的效果,好像可以按照这个思路去改一改自己的 RPC Demo 的客户端代理生成,哈哈


MonitorPlugin


    获取插件信息,没有开启,不进入逻辑,直接跳过


WebClientResponsePlugin


    skip = true,直接跳过


DubboResponsePlugin


    看名字就知道是个核心类,我们在 then 后执行的代码段打个断点(这种需要单独打个端口,后面才能跳进去)


    public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {        return chain.execute(exchange).then(Mono.defer(() -> {            // 从exchange中拿到结果            final Object result = exchange.getAttribute(Constants.DUBBO_RPC_RESULT);            try {                if (Objects.isNull(result)) {                    Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_RESULT_ERROR.getCode(), SoulResultEnum.SERVICE_RESULT_ERROR.getMsg(), null);                    return WebFluxResultUtils.result(exchange, error);                }                Object success = SoulResultWrap.success(SoulResultEnum.SUCCESS.getCode(), SoulResultEnum.SUCCESS.getMsg(), JsonUtils.removeClass(result));                // 进入后使用之前熟悉的:exchange.getResponse().writeWith,返回响应给客户端                return WebFluxResultUtils.result(exchange, success);            } catch (SoulException e) {                return Mono.empty();            }        }));    }
复制代码


    执行完 plugins 链后进入到这个断点:大致是获取结果 result,加工得到 success


    然后 WebFluxResultUtils.result(exchange, success),进入后是:exchange.getResponse().writeWith,返回响应给客户端


    到这里,一个 Dubbo 请求的处理流程大致展示在我们面前了,有了前面几篇的分析基础,还是挺流畅的


总结


    此次分析验证了在上篇中的一些猜想,也得到了一些新的东西,请求处理的大致流程图如下:



  • HttpServerOperations : 明显的 netty 的请求接收的地方,请求入口

  • ReactorHttpHandlerAdapter :生成 response 和 request

  • HttpWebHandlerAdapter :exchange 的生成

  • FilteringWebHandler : filter 操作

  • SoulWebHandler :plugins 调用


    可以参考下上篇分析: Soul 源码阅读(三)HTTP请求处理概览


    这篇中我们详细分析各个 Dubbo 经过的 Plugin 的相关处理,有些复杂的就没有看了,但也得到我们此次分析想要的结果,脑海中对一个 RPC 请求如果进行处理,设计那些组件有了一定的了解,基于这些了解也能进行更近一步的分析


    我们收获和 HTTP 请求调用、RPC 请求调用、Websocket 请求调用是互斥的,通过判断请求的类型来进行选择其中一个,而 HTTP 相应、RPC 响应都会转为 HTTP 响应,然后返回给客户端,exchange 里面估计是绑定了一个 socket,然后直接调用即可(Netty 网关 Demo 又可以进行借鉴了)


    此次分析有了下面新的疑问:


  • Websocket 的响应的返回时怎样的?是长连接吗?因为没有看到 Websocket 响应相关的处理类

  • exchange 中的请求类型是怎么绑定,如何生成的?

  • 限流等插件也是需要进行匹配吗?具体执行逻辑是怎样的?


    上面这些问题就留待以后分析了,慢慢来才能可持续发展


Soul 网关源码分析文章列表



发布于: 2021 年 01 月 15 日阅读数: 16
用户头像

关注

还未添加个人签名 2018.09.09 加入

代码是门手艺活,也是门艺术活

评论

发布
暂无评论
Soul 网关源码阅读(四)Dubbo请求概览