写点什么

dubbo 源码 v2.7 分析:核心机制(一)

发布于: 2021 年 03 月 01 日
dubbo 源码 v2.7 分析:核心机制(一)

系列文章

dubbo 源码 v2.7 分析:结构、container 入口及线程模型

dubbo 源码 v2.7 分析:SPI 机制


一 回顾

上一篇我们介绍了 SPI 机制。本篇会先介绍 dubbo 中的核心机制,包括设计模式、bean 加载、扩展点机制、动态代理和远程调用流程。

二 设计模式

2.1 装饰器 &责任链模式

2.1.1 装饰器模式

装饰器模式(Decorator Pattern)允许向一个现有的对象添加新的功能,同时又不改变其结构。这种类型的设计模式属于结构型模式,它是作为现有的类的一个包装。

这种模式创建了一个装饰类,用来包装原有的类,并在保持类方法签名完整性的前提下,提供了额外的功能。

2.1.2 责任链模式

责任链模式(Chain of Responsibility Pattern)为请求创建了一个接收者对象的链。这种模式给予请求的类型,对请求的发送者和接收者进行解耦。这种类型的设计模式属于行为型模式。

在这种模式中,通常每个接收者都包含对另一个接收者的引用。如果一个对象不能处理该请求,那么它会把相同的请求传给下一个接收者,依此类推。

2.1.3 Dubbo 中的使用

Dubbo 的源码中,在启动和调用阶段都使用了装饰器。例如生产者(Provider)的调用链,具体的调用过程是在 org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper 类的方法 buildInvokerChain()内完成的。相关代码:

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {        Invoker<T> last = invoker;        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) { for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() {
@Override public Class<T> getInterface() { return invoker.getInterface(); }
@Override public URL getUrl() { return invoker.getUrl(); }
@Override public boolean isAvailable() { return invoker.isAvailable(); }
@Override public Result invoke(Invocation invocation) throws RpcException { Result asyncResult; try { asyncResult = filter.invoke(next, invocation); } catch (Exception e) { // onError callback if (filter instanceof ListenableFilter) { Filter.Listener listener = ((ListenableFilter) filter).listener(); if (listener != null) { listener.onError(e, invoker, invocation); } } throw e; } return asyncResult; }
@Override public void destroy() { invoker.destroy(); }
@Override public String toString() { return invoker.toString(); } }; } }
return new CallbackRegistrationInvoker<>(last, filters); }
复制代码

在这个方法的第 3 行(源码中的第 55 行),通过 SPI 机制获取 Filter 列表,从 rpc.filter 目录下可以看到 Filter 的实现类集合。每个 Filter 的实现类,都带有 @Activate 注解,将注解中含有 group=provider 的 Filter 实现,按照 order 值的顺序排序,构成了一个调用链条。调用顺序为:

以 EchoFilter 为例,org.apache.dubbo.rpc.filter.EchoFilter 的源码:

@Activate(group = CommonConstants.PROVIDER, order = -110000)public class EchoFilter implements Filter {
@Override public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException { if (inv.getMethodName().equals($ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) { return AsyncRpcResult.newDefaultAsyncResult(inv.getArguments()[0], inv); } return invoker.invoke(inv); }}
复制代码

这里是装饰器和责任链模式的混合使用。例如,EchoFilter 的作用是判断

是否是回声测试请求,是的话直接返回内容,这是一种责任链的体现。而像 ClassLoaderFilter 则只是在主功能上添加了功能,更改当前线程的 ClassLoader,这是典型的装饰器模式。

2.2 观察者模式

2.2.1 定义

当对象间存在一对多关系时,则使用观察者模式(Observer Pattern)。比如,当一个对象被修改时,则会自动通知依赖它的对象。观察者模式属于行为型模式。

2.2.2 Dubbo 中的使用

Provider 启动时,需要与注册中心进行交互,先注册自己的服务,再订阅自己的服务。而在订阅时,使用了观察者模式,开启一个 Listener。注册中心会做定时扫描,默认每 5 秒检查一次是否有服务更新,如果有,就会向该服务的提供者发送一个 notify 消息。Provider 接收到 notify 消息后,就会执行 NotifyListener 的 notify 方法,执行监听器方法。

2.2.3 源码


2.3 代理模式——动态代理

2.3.1 定义

在代理模式(Proxy Pattern)中,一个类代表另一个类的功能。这种类型的设计模式属于结构型模式。

在代理模式中,我们创建具有现有对象的对象,以便向外界提供功能接口。

2.3.2 Dubbo 中的使用

Dubbo 扩展了 JDK 标准 SPI 类的 Adaptive,也就是自适应扩展点,这是一个典型的动态代理实现。Dubbo

需要灵活地控制实现类,即在调用阶段动态地根据参数决定调用哪个实现类,所以采用先生成代理类的方法,能够做到灵活的调用。生成代理类的代码是 ExtensionLoader 的 createAdaptiveExtensionClassCode 方法。代理类的主要逻辑是,获取 URL 参数中指定参数的值作为获取实现类的 key。

2.2.3 源码

Adaptive 注解:

AdaptiveCompiler 是使用 Adaptive 注解的示例:

@Adaptivepublic class AdaptiveCompiler implements Compiler {
private static volatile String DEFAULT_COMPILER;
public static void setDefaultCompiler(String compiler) { DEFAULT_COMPILER = compiler; }
@Override public Class<?> compile(String code, ClassLoader classLoader) { Compiler compiler; ExtensionLoader<Compiler> loader = ExtensionLoader.getExtensionLoader(Compiler.class); String name = DEFAULT_COMPILER; // copy reference if (name != null && name.length() > 0) { compiler = loader.getExtension(name); } else { compiler = loader.getDefaultExtension(); } return compiler.compile(code, classLoader); }
}
复制代码


Adaptive 注解是一个自适应 扩展点的标识。它可以修饰在类上,也可以修饰在方法上面。

修饰类和方法上的区别:放在类上,说明当前类是一个确定的自适应扩展点的类。如果是放在方法级别,那么需要生成一个动态字节码来进行转发。

例如 Protocol 接口来说,定义了 export 和 refer 两个抽象方法,这两个方法分别带有 @Adaptive 的标识,标识是 一个自适应方法。 我们知道 Protocol 是一个通信协议的接口,具体有多种实现,那么这个时候选择哪一种呢? 取决于我们在使用 dubbo 的时候所 配置的协议名称。而这里的方法层面的 Adaptive 就决定了当前这个方法会采用何种协议来发布服务。

org.apache.dubbo.rpc.Protocol 源码:

/** * Protocol. (API/SPI, Singleton, ThreadSafe) */@SPI("dubbo")public interface Protocol {
/** * Get default port when user doesn't config the port. * * @return default port */ int getDefaultPort();
/** * Export service for remote invocation: <br> * 1. Protocol should record request source address after receive a request: * RpcContext.getContext().setRemoteAddress();<br> * 2. export() must be idempotent, that is, there's no difference between invoking once and invoking twice when * export the same URL<br> * 3. Invoker instance is passed in by the framework, protocol needs not to care <br> * * @param <T> Service type * @param invoker Service invoker * @return exporter reference for exported service, useful for unexport the service later * @throws RpcException thrown when error occurs during export the service, for example: port is occupied */ @Adaptive <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
/** * Refer a remote service: <br> * 1. When user calls `invoke()` method of `Invoker` object which's returned from `refer()` call, the protocol * needs to correspondingly execute `invoke()` method of `Invoker` object <br> * 2. It's protocol's responsibility to implement `Invoker` which's returned from `refer()`. Generally speaking, * protocol sends remote request in the `Invoker` implementation. <br> * 3. When there's check=false set in URL, the implementation must not throw exception but try to recover when * connection fails. * * @param <T> Service type * @param type Service class * @param url URL address for the remote service * @return invoker service's local proxy * @throws RpcException when there's any error while connecting to the service provider */ @Adaptive <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
/** * Destroy protocol: <br> * 1. Cancel all services this protocol exports and refers <br> * 2. Release all occupied resources, for example: connection, port, etc. <br> * 3. Protocol can continue to export and refer new service even after it's destroyed. */ void destroy();
}
复制代码


发布于: 2021 年 03 月 01 日阅读数: 21
用户头像

磨炼中成长,痛苦中前行 2017.10.22 加入

微信公众号【程序员架构进阶】。多年项目实践,架构设计经验。曲折中向前,分享经验和教训

评论

发布
暂无评论
dubbo 源码 v2.7 分析:核心机制(一)