写点什么

dubbo 源码 v2.7 分析:核心机制(二)

发布于: 2021 年 03 月 02 日
dubbo 源码 v2.7 分析:核心机制(二)

系列文章

dubbo 源码 v2.7 分析:结构、container 入口及线程模型

dubbo 源码 v2.7 分析:SPI 机制

dubbo 源码 v2.7 分析:核心机制(一)


关注公众号:程序员架构进阶,每天实时获取更新,上百份面试资料和其他福利等你拿~

一 概述

上一篇重点讲了 dubbo 中的几种设计模式,和对应的源码。本篇会继续介绍 Bean 加载、Extension、代理几种机制在 dubbo 中的应用。

二 Bean 加载

dubbo 中,bean 的加载是基于 Spring 的可扩展 Schema 机制。

2.1 Spring 的可扩展 Schema

在大多数场景,如果我们需要为系统提供可配置化支持,简单的做法是直接基于 Spring 的标准 Bean 来配置;但如果配置较为复杂或者需要更多丰富控制,这种简单粗暴的方法会显得非常笨拙。一般的做法会用原生态的方式去解析定义好的 xml 文件,然后转化为配置对象,这种方式当然可以解决所有问题,但实现起来比较繁琐,特别是是在配置非常复杂的时候,解析工作是一个不得不考虑的负担。Spring 提供了可扩展 Schema,配置步骤如下:

2.1.1 设计配置属性和 JavaBean


2.1.2 编写 XSD 文件

Spring 用 xsd 文件校验 xml 文件格式。校验方法:Spring 默认在启动时是要加载 XSD 文件来验证 xml 文件的,所以如果有的时候断网了,或者一些开源软件切换域名,那么就很容易碰到应用启动不了。为了防止这种情况,Spring 提供了一种机制,默认从本地加载 XSD 文件。

2.1.3 编写 NamespaceHandler 和 BeanDefinitionParser

完成解析工作,会用到 NamespaceHandler 和 BeanDefinitionParser。NamespaceHandler 会根据 schema 和节点名找到某个 BeanDefinitionParser,然后由 BeanDefinitionParser 完成具体的解析工作。因此需要分别完成 NamespaceHandler 和 BeanDefinitionParser 的实现类,Spring 提供了默认实现类 NamespaceHandlerSupport 和 AbstractSingleBeanDefinitionParser,简单的方式就是去继承这两个类。

2.1.4 编写 spring.handlers 和 spring.schemas

放在 META-INF 文件夹,串联所有部件,让应用感知到。这两个文件需要开发者编写并放入 META-INF 文件夹中,这两个文件的地址必须是 META-INF/spring.handlers 和 META-INF/spring.schemas,spring 会默认去载入这两个文件。

2.1.5 在 Bean 文件中应用

与配置一个普通的 Spring Bean 类似,只是需要注意使用的是自定义的 Schema。配置好后,再通过 Spring 容器的 getBean(beanName)方法来使用自定义的 bean。

2.2 dubbo 中的实现方式

在 dubbo 的 META-INF 下,可以看到包含了很多文件,其中 spring.handlers 和 spring.schemas 就是上面提到的两个串联配置文件:

spring.handles:

http\://dubbo.apache.org/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandlerhttp\://code.alibabatech.com/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
复制代码

spring.schemas:

http\://dubbo.apache.org/schema/dubbo/dubbo.xsd=META-INF/dubbo.xsdhttp\://code.alibabatech.com/schema/dubbo/dubbo.xsd=META-INF/compat/dubbo.xsd
复制代码


三 Extension 机制

即扩展点机制。

3.1 扩展点配置

3.1.1 根据关键字读取配置文件,获得具体的实现类

dubbo-demo/dubbo-demo-xml 是 dubbo 源码的 xml 示例,其中dubbo-provider.xml的配置:

<?xml version="1.0" encoding="UTF-8"?><beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"       xmlns="http://www.springframework.org/schema/beans"       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd       http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
<dubbo:application name="demo-provider" metadata-type="remote"> <dubbo:parameter key="mapping-type" value="metadata"/> </dubbo:application>
<dubbo:config-center address="zookeeper://127.0.0.1:2181"/> <dubbo:metadata-report address="zookeeper://127.0.0.1:2181"/> <dubbo:registry id="registry1" address="zookeeper://127.0.0.1:2181"/>
<dubbo:protocol name="dubbo" port="-1"/>
<bean id="demoService" class="org.apache.dubbo.demo.provider.DemoServiceImpl"/> <bean id="greetingService" class="org.apache.dubbo.demo.provider.GreetingServiceImpl"/>
<dubbo:service interface="org.apache.dubbo.demo.DemoService" timeout="3000" ref="demoService" registry="registry1"/> <dubbo:service version="1.0.0" group="greeting" timeout="5000" interface="org.apache.dubbo.demo.GreetingService" ref="greetingService"/>
</beans>
复制代码

其中,<dubbo:service interface="org.apache.dubbo.demo.DemoService" timeout="3000" ref="demoService" registry="registry1"/> 这段配置就会根据 registry 去获取指定的 Service。

3.1.2 注解 @SPI 和 @Adaptive

@SPI 注解:

在 Protocol 接口中,定义默认协议为 dubbo,就是通过 @SPI("dubbo")来实现的:

@SPI("dubbo")public interface Protocol {
int getDefaultPort();
@Adaptive <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
@Adaptive <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
void destroy();
}
复制代码

@Adaptive 注解:

这个注解打在接口方法上,通过 ExtensionLoader.getAdaptiveExtension()方法获取设配类,会先通过前面的过程生成 Java 的源代码,再通过编译器编译成 class 加载。但 Compiler 的实现策略选择,也是通过 ExtensionLoader.getAdaptiveExtension(),如果也通过编译器编译成 class 会导致死循环?

再分析一下 ExtensionLoader.getAdaptiveExtension(),对于实现类上标记了注解 @Adaptive 的 dubbo spi 扩展机制,它获取设配类不是生成设配类的 Java 源代码,而是在读取扩展文件的时候,遇到实现类打了 @Adaptive 就把这个类作为设配类缓存在 ExtensionLoader 中,调用时直接返回。

3.1.3 filter 和 listener

在生成具体的实现类对象时,不是直接读取类文件,而是在读取类文件的基础上,通过 filter 和 listener 去封装类对象。

四 代理

4.1 代理生成方式

大家熟知的应该有 JDK、cglib 这两种方式,在 Spring 框架中做了这两种动态代理的支持。不过除此之外,Java 中海油 Javassist 库动态代理、Javassist 库动态字节码代理。代理之间的区别可参见Java动态代理机制详解(JDK 和CGLIB,Javassist,ASM)

4.2 dubbo 中的代理

在 dubbo 的 rpc->proxy 包下,我们可以看到 javassist、jdk、wrapper 这三个包。


其中几个重点类,JdkProxyFactory,JavassistProxyFactory 这两个代理工厂类是继承 AbstractProxyFactory 抽象类,而 AbstractProxyFactory 这个抽象类又实现了 ProxyFactory 接口,StubProxyFactoryWrapper 则是对 ProxyFactory 的直接实现。类之间关系如下:

4.3 Invoke 调用

先上一张图:


ReferenceConfig 类比较重要的是 init()方法,在这里做了大量工作。我们关注的是 ref = createProxy(map);再继续向下跟进,这个方法会创建一个 service proxy 并返回:

return (T) PROXY_FACTORY.getProxy(invoker);
复制代码

这里的 PROXY_FACTORY 也是通过扩展点机制获取的:

private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
复制代码

关于 ProxyFactory 的配置,会涉及到 META-INF/dubbo.internal/org.apache.dubbo.rpc.ProxyFactory 这个文件,内容为:

stub=org.apache.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapperjdk=org.apache.dubbo.rpc.proxy.jdk.JdkProxyFactoryjavassist=org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory
复制代码

这里指定了我们前面提到的三种 ProxyFactory 实现类。PROXY_FACTORY 默认走的是 JdkProxyFactory,所以 PROXY_FACTORY.getProxy(invoker);实际使用的方法就是下面的这段(AbstractProxyFactory):

public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));    }
复制代码

在 InvokerInvocationHandler 类,阐述了代理的具体工作方式。贴出源码:

/** * InvokerHandler */public class InvokerInvocationHandler implements InvocationHandler {    private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);    private final Invoker<?> invoker;
public InvokerInvocationHandler(Invoker<?> handler) { this.invoker = handler; }
@Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); }
return invoker.invoke(new RpcInvocation(method, args)).recreate(); }}
复制代码

这里最重要的是最后一句,调用了 Invoker 接口(实现类)的 invoke 方法。根据关系 Invoker=>AbstractInvoker=>DubboInvoker,上面的 invoker.invoke()使用的是 AbstractInvoker 的 invoke:

@Override    public Result invoke(Invocation inv) throws RpcException {        // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed        if (destroyed.get()) {            logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "                    + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");        }        RpcInvocation invocation = (RpcInvocation) inv;        invocation.setInvoker(this);        if (CollectionUtils.isNotEmptyMap(attachment)) {            invocation.addAttachmentsIfAbsent(attachment);        }        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();        if (CollectionUtils.isNotEmptyMap(contextAttachments)) {            /**             * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,             * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered             * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is             * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).             */            invocation.addAttachments(contextAttachments);        }
invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation)); RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
try { return doInvoke(invocation); } catch (InvocationTargetException e) { // biz exception Throwable te = e.getTargetException(); if (te == null) { return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation); } else { if (te instanceof RpcException) { ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION); } return AsyncRpcResult.newDefaultAsyncResult(null, te, invocation); } } catch (RpcException e) { if (e.isBiz()) { return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation); } else { throw e; } } catch (Throwable e) { return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation); } }
复制代码

DubboInvoker 的 doInvoke:

 @Override    protected Result doInvoke(final Invocation invocation) throws Throwable {        RpcInvocation inv = (RpcInvocation) invocation;        final String methodName = RpcUtils.getMethodName(invocation);        inv.setAttachment(PATH_KEY, getUrl().getPath());        inv.setAttachment(VERSION_KEY, version);
ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return AsyncRpcResult.newDefaultAsyncResult(invocation); } else { AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv); CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout); responseFuture.whenComplete((obj, t) -> { if (t != null) { asyncRpcResult.completeExceptionally(t); } else { asyncRpcResult.complete((AppResponse) obj); } }); RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult)); return asyncRpcResult; } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }
复制代码

在这里,invoke 函数最终会转为网络调用。

RpcInvocation 的构造函数中,包含了客户端传递给 invoker 的信息:

public RpcInvocation(Invocation invocation, Invoker<?> invoker) {        this(invocation.getMethodName(), invocation.getParameterTypes(),                invocation.getArguments(), new HashMap<String, String>(invocation.getAttachments()),                invocation.getInvoker());        if (invoker != null) {            URL url = invoker.getUrl();            setAttachment(PATH_KEY, url.getPath());            if (url.hasParameter(INTERFACE_KEY)) {                setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY));            }            if (url.hasParameter(GROUP_KEY)) {                setAttachment(GROUP_KEY, url.getParameter(GROUP_KEY));            }            if (url.hasParameter(VERSION_KEY)) {                setAttachment(VERSION_KEY, url.getParameter(VERSION_KEY, "0.0.0"));            }            if (url.hasParameter(TIMEOUT_KEY)) {                setAttachment(TIMEOUT_KEY, url.getParameter(TIMEOUT_KEY));            }            if (url.hasParameter(TOKEN_KEY)) {                setAttachment(TOKEN_KEY, url.getParameter(TOKEN_KEY));            }            if (url.hasParameter(APPLICATION_KEY)) {                setAttachment(APPLICATION_KEY, url.getParameter(APPLICATION_KEY));            }        }    }
复制代码

至此,我们理清了 invoker.invode 的调用过程。


发布于: 2021 年 03 月 02 日阅读数: 19
用户头像

磨炼中成长,痛苦中前行 2017.10.22 加入

微信公众号【程序员架构进阶】。多年项目实践,架构设计经验。曲折中向前,分享经验和教训

评论

发布
暂无评论
dubbo 源码 v2.7 分析:核心机制(二)