写点什么

Dubbo 服务消费者调用过程,35 岁老年程序员的绝地翻身之路

作者:Java高工P7
  • 2021 年 11 月 09 日
  • 本文字数:2874 字

    阅读完需:约 9 分钟

4、获取异步,或同步处理结果;


  • 异步 不需要返回值:直接调用 ExchangeClient.send()方法;

  • 同步 需要返回值:使用 ExchangeClient.request()方法,返回一个 ResponseFuture,一直阻塞到服务端返回响应结果;


案例介绍


====


先看一个简单的客户端引用服务的例子,HelloServicedubbo配置如下:


<dubbo:application name="consumer-of-helloService" />


<dubbo:registry protocol="zookeeper" address="127.0.0.1:2181" />


<dubbo:protocol name="dubbo" port="20880" />


<dubbo:reference id="helloService" interface="com.demo.dubbo.service.HelloService" />


  • 使用Zookeeper作为注册中心

  • 引用远程的HelloService接口服务


根据之前的介绍,在 Spring 启动的时候,根据<dubbo:reference>配置会创建一个 ReferenceBean,该 bean 又实现了 Spring 的 FactoryBean 接口,所以我们如下方式使用时:


@Autowired


private HelloService helloService;


使用的不是 ReferenceBean 对象,而是 ReferenceBean 的 getObject()方法返回的对象,该对象通过代理实现了 HelloService 接口,所以要看服务引用的整个过程就需要从 ReferenceBean.getObject()方法开始入手。


服务引用过程




将 ReferenceConfig.init()中的内容拆成具体的步骤,如下:


第一步:收集配置参数




methods=hello,


timestamp=1443695417847,


dubbo=2.5.3


application=consumer-of-helloService


side=consumer


pid=7748


interface=com.demo.dubbo.service.HelloService


第二步:从注册中心获取服务地址,返回 Invoker 对象




如果是单个注册中心,代码如下:


Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();


invoker = refprotocol.refer(interfaceClass, url);


上述 url 的内容如下:


registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?


application=consumer-of-helloService&


dubbo=2.5.6&


pid=8292&


registry=zookeeper&


timestamp=1443707173909&


refer=


application=consumer-of-helloService&


dubbo=2.5.6&


interface=com.demo.dubbo.service.HelloService&


methods=hello&


pid=8292&


side=consumer&


timestamp=1443707173884&


第三步:使用 ProxyFactory 创建出 Invoker 的代理对象




ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();


proxyFactory.getProxy(invoker);


下面就详细说明下上述提到的几个概念:Protocol、Invoker、ProxyFactory


概念介绍




Invoker




Invoker 是一个可执行对象,有三种类型的 Invoker:


  1. 本地执行的 Invoker(服务端使用)

  2. 远程通信执行的 Invoker(客户端使用)

  3. 多个类型 2 的 Invoker 聚合成的集群版 Invoker(客户端使用)


Invoker 的实现情况如下:


这里写图片描述


先来看服务引用的第 2 个步骤,返回 Invoker 对象


对于客户端来说,Invoker 应该是 2、3 这两种类型。先来说第 2 种类型,即远程通信的 Invoker,看 DubboInvoker 的源码,调用过程 AbstractInvoker.invoke()->doInvoke():


protected Result doInvoke(final Invocation invocation) throws Throwable {


RpcInvocation inv = (RpcInvocation) invocation;


final String methodName = RpcUtils.getMethodName(invocation);


inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());


inv.setAttachment(Constants.VERSION_KEY, version);


ExchangeClient currentClient;


if (clients.length == 1) {


currentClient = clients[0];


} else {


currentClient = clients[index.getAndIncrement() % clients.length];


}


try {


boolean isAsync = RpcUtils.isAsync(getUrl(), invo


《Android学习笔记总结+最新移动架构视频+大厂安卓面试真题+项目实战源码讲义》
浏览器打开:qq.cn.hn/FTe 免费领取
复制代码


cation);


boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);


int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);


if (isOneway) {


boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);


currentClient.send(inv, isSent);


RpcContext.getContext().setFuture(null);


return new RpcResult();


} else if (isAsync) {


ResponseFuture future = currentClient.request(inv, timeout) ;


RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));


return new RpcResult();


} else {


RpcContext.getContext().setFuture(null);


return (Result) currentClient.request(inv, timeout).get();


}


} catch (TimeoutException e) {


throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);


} catch (RemotingException e) {


throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);


}


}


大致内容就是:


将通过远程通信将 Invocation 信息传递给服务器端,服务器端接收到该 Invocation 信息后,找到对应的本地 Invoker,然后通过反射执行相应的方法,将方法的返回值再通过远程通信将结果传递给客户端。


这里分 3 种情况:


  • 执行方法不需要返回值:直接调用ExchangeClient.send()方法。

  • 执行方法的结果需要异步返回:使用ExchangeClient.request()方法返回一个ResponseFuture对象,通过RpcContext中的ThreadLocal使ResponseFuture和当前线程绑定,未等服务端响应结果就直接返回,然后服务端通过ProtocolFilterWrapper.buildInvokerChain()方法会调用Filter.invoke()方法,即FutureFilter.invoker()->asyncCallback(),会获取RpcContextResponseFuture对象,异步返回结果。

  • 执行方法的结果需要同步返回:使用ExchangeClient.request()方法,返回一个ResponseFuture,一直阻塞到服务端返回响应结果


Protocol




服务引用的第二步就是:


Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();


invoker = refprotocol.refer(interfaceClass, url);


使用协议 Protocol 根据上述的 url 和服务接口来引用服务,创建出一个 Invoker 对象


默认实现的 DubboProtocol 也会经过 ProtocolFilterWrapper、ProtocolListenerWrapper、RegistryProtocol 的包装


首先看下 RegistryProtocol.refer()方法,它干了哪些事呢?


  • 将客户端的信息注册到注册中心上

  • 创建一个 RegistryDirectory,从注册中心中订阅自己引用的服务,将订阅到的 url 在 RegistryDirectory 内部转换成 Invoker。

  • RegistryDirectory 是 Directory 的实现,Directory 代表多个 Invoker,可以把它看成 List 类型的 Invoker,但与 List 不同的是,它的值可能是动态变化的,比如注册中心推送变更 RegistryDirectory 内部含有两者重要属性:

  • 注册中心服务 Registry

  • Protocol 它会利用注册中心服务 Registry 来获取最新的服务器端注册的 url 地址,然后再利用协议 Protocol 将这些 url 地址转换成一个具有远程通信功能的 Invoker 对象,如 DubboInvoker

用户头像

Java高工P7

关注

还未添加个人签名 2021.11.08 加入

还未添加个人简介

评论

发布
暂无评论
Dubbo服务消费者调用过程,35岁老年程序员的绝地翻身之路