Dubbo—SPI 及自适应扩展原理,java 项目系统架构图
registry=com.alibaba.dubbo.registry.integration.RegistryProtocolfilter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapperlistener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrappermock=com.alibaba.dubbo.rpc.support.MockProtocolinjvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocoldubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocolrmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocolhessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocolcom.alibaba.dubbo.rpc.protocol.http.HttpProtocolcom.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocolthrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocolmemcached=memcom.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocolredis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol
看到这么多扩展类(每一个配置文件中都有很多),我们首先应该思考一个问题:Dubbo 一启动,就加载所有的扩展类么?作为一个优秀的 RPC 框架,肯定不会耗时耗力做这样的无用功,所以肯定会通过一种方式拿到指定的扩展才对。我们可以看到大多是以键值对方式(表示为 extName-value)配置的扩展,那么不难猜测,这里的 extName 就是用来实现上面所说的功能的。 那到底是不是呢?以上纯属猜测,下面就到源码中去验证。
SPI 源码
Dubbo 中实现 SPI 的核心类是 ExtensionLoader,该类并未提供公共的构造方法来初始化,而是通过 getExtensionLoader 方法获取一个 loader 对象:
// loader 缓存 private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<Class<?>, ExtensionLoader<?>>();public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {if (type == null)throw new IllegalArgumentException("Extension type == null");if(!type.isInterface()) {throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");}if(!withExtensionAnnotation(type)) {throw new IllegalArgumentException("Extension type(" + type +") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");}
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);if (loader == null) {EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);}return loader;}
private final ExtensionFactory objectFactory;private ExtensionLoader(Class<?> type) {this.type = type;objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());}
这里的 class 参数就是扩展点的接口类型,每一个 loader 都需要绑定一个扩展点类型。然后首先从缓存中获取 loader,未获取到就初始化一个 loader 并放入缓存。而在私有构造器初始化的时候我们需要注意 objectFactory 这个变量,先大概有个映像,后面会用到。 拿到 loader 之后,就可以调用 getExtension 方法去获取指定的扩展点了,该方法传入了一个 name 参数,不难猜测这个就是配置文件中的键,可以 debugger 验证一下:
private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<String, Holder<Object>>();public T getExtension(String name) {if (name == null || name.length() == 0)throw new IllegalArgumentException("Extension name == null");if ("true".equals(name)) {return getDefaultExtension();}// 从缓存中获取 Holder 对象,该对象的值就是扩展对象 Holder<Object> holder = cachedInstances.get(name);if (holder == null) {cachedInstances.putIfAbsent(name, new Holder<Object>());holder = cachedInstances.get(name);}// 缓存中没有该扩展,说明还没有加载扩展类,就去配置文件中加载并创建对应的扩展对象// 这里通过双重校验锁的方式保证线程安全,Dubbo 中大量运用了该技巧 Object instance = holder.get();if (instance == null) {synchronized (holder) {instance = holder.get();if (instance == null) {instance = createExtension(name);holder.set(instance);}}}return (T) instance;}
同样的也是先从缓存拿,缓存没有就创建并添加到缓存,因此主要看 createExtension 方法:
// 扩展类实例缓存对象 private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<Class<?>, Object>();private T createExtension(String name) {// 从配置文件中加载扩展类并获取指定的扩展类,没有就抛出异常 Class<?> clazz = getExtensionClasses().get(name);if (clazz == null) {throw findException(name);}try {// 从缓存中拿扩展类实例,没有就通过反射创建并缓存 T instance = (T) EXTENSION_INSTANCES.get(clazz);if (instance == null) {EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());instance = (T) EXTENSION_INSTANCES.get(clazz);}// 依赖注入,这里及后面都和当前流程无关,可以先略过,有个印象就好 injectExtension(instance);// 获取包装类并实例化,最后注入依赖对象 Set<Class<?>> wrapperClasses = cachedWrapperClasses;if (wrapperClasses != null && wrapperClasses.size() > 0) {for (Class<?> wrapperClass : wrapperClasses) {instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));}}return instance;} catch (Throwable t) {throw new IllegalStateException("Extension instance(name: " + name + ", class: " +type + ") could not be instantiated: " + t.getMessage(), t);}}
关键点代码就在 getExtensionClasses 方法中,怎么从配置文件中加载扩展类的。而该方法主要是调用了 loadExtensionClasses 方法:
private Map<String, Class<?>> loadExtensionClasses() {// 判断接口上是否标注有 @SPI 注解,该注解的值就是默认使用的扩展类,// 赋值给 cachedDefaultName 变量缓存起来 final SPI defaultAnnotation = type.getAnnotation(SPI.class);if(defaultAnnotation != null) {String value = defaultAnnotation.value();if(value != null && (value = value.trim()).length() > 0) {String[] names = NAME_SEPARATOR.split(value);if(names.length > 1) {throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
": " + Arrays.toString(names));}if(names.length == 1) cachedDefaultName = names[0];}}
// 真正读取配置文件的方法 Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);loadFile(extensionClasses, DUBBO_DIRECTORY);loadFile(extensionClasses, SERVICES_DIRECTORY);return extensionClasses;}
该方法主要是缓存当前扩展接口指定的默认扩展实现类(@SPI 注解指定),并调用 loadFile 读取配置文件,从这里我们可以看到 Dubbo 默认是读取以下 3 个文件夹中的配置文件:
private static final String SERVICES_DIRECTORY = "META-INF/services/";private static final String DUBBO_DIRECTORY = "META-INF/dubbo/";private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";
然后是 loadFile,该方法很长,不全部放上来了,这里提取关键的代码:
String fileName = dir + type.getName();
首先通过文件全路径找到对应的文件,并用 BufferedReader 一行行读取文件内容:
String name = null;int i = line.indexOf('=');if (i > 0) {// 配置文件中的键 name = line.substring(0, i).trim();// 扩展点全类名 line = line.substring(i + 1).trim();}if (line.length() > 0) {// 加载 class,如果有些类的依赖 jar 包未导入,这里就会抛出异常(比如 WebserviceProtocol)Class<?> clazz = Class.forName(line, true, classLoader);// 验证当前类型是否是扩展类的父类型 if (! type.isAssignableFrom(clazz)) {throw new IllegalStateException("Error when load extension class(interface: " +type + ", class line: " + clazz.getName() + "), class "
clazz.getName() + "is not subtype of interface.");}// 扩展类是否标注了 @Adaptive 注解,表示为一个自定义的自适应扩展类// 如果是将其缓存到 cachedAdaptiveClassif (clazz.isAnnotationPresent(Adaptive.class)) {if(cachedAdaptiveClass == null) {cachedAdaptiveClass = clazz;} else if (! cachedAdaptiveClass.equals(clazz)) {// 超过一个自定义的自适应扩展类就抛出异常 throw new IllegalStateException("More than 1 adaptive class found: "
cachedAdaptiveClass.getClass().getName()
", " + clazz.getClass().getName());}} else {try {// 进入到该分支表示为 Wrapper 装饰扩展类,该类都有一个特征:包含// 一个有参的构造器,如果没有,就抛出异常进入到另一个分支,// Wrapper 类的作用我们后面再分析 clazz.getConstructor(type);// 缓存 Wrapper 到 cachedWrapperClasses 中 Set<Class<?>> wrappers = cachedWrapperClasses;if (wrappers == null) {cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();wrappers = cachedWrapperClasses;}wrappers.add(clazz);} catch (NoSuchMethodException e) {// 进入此分支表示为一个普通的扩展类 clazz.getConstructor();if (name == null || name.length() == 0) {// 由于历史原因,Dubbo 最开始配置文件中并不是以 K-V 来配置的// 扩展点,而是会通过 @Extension 注解指定,所以这里会通过// 该注解去获取到 namename = findAnnotationName(clazz);// 由于 @Extension 废弃使用,但配置文件中仍存在非 K-V 的配置,// 所以这里是直接通过类名获取简单的 nameif (name == null || name.length() == 0) {if (clazz.getSimpleName().length() > type.getSimpleName().length()&& clazz.getSimpleName().endsWith(type.getSimpleName())) {name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase();} else {throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url);}}}String[] names = NAME_SEPARATOR.split(name);if (names != null && names.length > 0) {// 判断当前扩展点是否标注有 @Activate 注解,该注解表示// 该扩展点自动激活 Activate activate = clazz.getAnnotation(Activate.class);if (activate != null) {cachedActivates.put(names[0], activate);}for (String n : names) {if (! cachedNames.containsKey(clazz)) {cachedNames.put(clazz, n);}Class<?> c = extensionClasses.get(n);if (c == null) {extensionClasses.put(n, clazz);} else if (c != clazz) {throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());}}}}}}
至此,我们就看到了 Dubbo SPI 的实现全过程,我们也了解了 Dubbo 强大的扩展性是如何实现的,但是这么多扩展,Dubbo 在运行中是如何决定调用哪一个扩展点的方法呢?这就是 Dubbo 另一强大的机制:自适应扩展。(PS:这里需要留意 cachedAdaptiveClass 和 cachedWrapperClasses 两个变量的赋值,后面会用到。)
二、自适应扩展机制
什么是自适应扩展?上文刚刚也说了,Dubbo 中存在很多的扩展类,这些扩展类不可能一开始就全部初始化,那样非常的耗费资源,所以我们应该在使用到该类的时候再进行初始化,也就是懒加载。但是这是比较矛盾的,拓展未被加载,那么拓展方法就无法被调用(静态方法除外)。拓展方法未被调用,拓展就无法被加载(官网原话)。所以也就有了自适应扩展机制,那么这个原理是怎样的呢? 首先需要了解 @Adaptive 注解,该注解可以标注在类和方法上:
标注在类上,表明该类为自定义的适配类
标注在方法上,表明需要动态的为该方法创建适配类
当有地方调用扩展类的方法时,首先会调用适配类的方法,然后适配类再根据扩展名称调用 getExtension 方法拿到对应的扩展类对象,最后调用该对象的方法即可。流程就这么简单,下面看看代码怎么实现的。 首先我们回到 ExtensionLoader 的构造方法中:
objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
其中调用了 getAdaptiveExtension 方法,从方法名不难看出就是去获取一个适配类对象:
private final Holder<Object> cachedAdaptiveInstance = new Holder<Object>();public T getAdaptiveExtension() {Object instance = cachedAdaptiveInstance.get();if (instance == null) {if(createAdaptiveInstanceError == null) {synchronized (cachedAdaptiveInstance) {instance = cachedAdaptiveInstance.get();if (instance == null) {try {instance = createAdaptiveExtension();cachedAdaptiveInstance.set(instance);} catch (Throwable t) {createAdaptiveInstanceError = t;throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);}}}}else {throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);}}
return (T) instance;}
该方法很简单,就是从缓存中获取适配类对象,未获取到就调用 createAdaptiveExtension 方法加载适配类并通过反射创建对象:
private T createAdaptiveExtension() {try {// 这里又注入了些东西,先略过 return injectExtension((T) getAdaptiveExtensionClass().newInstance());} catch (Exception e) {throw new IllegalStateException("Can not create adaptive extenstion " + type + ", cause: " + e.getMessage(), e);}}
调用 getAdaptiveExtensionClass 加载适配类:
private Class<?> getAdaptiveExtensionClass() {// 这里刚刚分析过了,从配置文件中加载配置类 getExtensionClasses();if (cachedAdaptiveClass != null) {return cachedAdaptiveClass;}return cachedAdaptiveClass = createAdaptiveExtensionClass();}
cachedAdaptiveClass 这个变量应该还没忘,在 loadFile 里赋值的,即我们自定义的适配扩展类,若没有则调用 createAdaptiveExtensionClass 动态创建:
private Class<?> createAdaptiveExtensionClass() {// 生成适配类的 Java 代码,主要实现标注了 @Adaptive 的方法逻辑 String code = createAdaptiveExtensionClassCode();ClassLoader classLoader = findClassLoader();// 调用 compiler 编译 com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();return compiler.compile(code, classLoader);}
该方法就是生成适配类的字节码,你一定好奇适配类的代码是怎样的,只需要打断点就可以看到了,这里我们以 Protocol 类的适配类为例:
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {public void destroy() {throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");}
public int getDefaultPort() {throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");}
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) {if (arg1 == null) throw new IllegalArgumentException("url == null");com.alibaba.dubbo.common.URL url = arg1;String extName = (url.getProtocol() == null ? "dubbo" : url.getP
rotocol());if (extName == null)throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);return extension.refer(arg0, arg1);}
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) {if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");if (arg0.getUrl() == null)throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());if (extName == null)throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);return extension.export(arg0);}}
后面会讲到 Protocol 扩展类都是通过 export 方法暴露服务,refer 方法引用服务,而这两个方法在接口中都标注了 @Adaptive 注解,所以 Dubbo 为其生成了动态的适配类(这个和 Java 的动态代理的原理有点像。同时我们看到这两个方法中都通过 getExtension 方法去获取指定的扩展类的实例(这个扩展类名称来自于 Invoker(后面会讲)中的 url,因为 dubbo 是基于 url 驱动的,所有的配置都在 url 中)。 这就是 Dubbo 强大的自适应扩展机制的实现原理,我们可以将其运用到我们的项目中去,这就是看源码的好处。不过还有个问题,刚刚在 createAdaptiveExtensionClass 方法中你一定疑惑 compiler 是什么,它也是调用的 getAdaptiveExtension 获取适配类,这不就进入了死循环么? 当然不会,首先我们可以去配置文件看看 Compiler 的扩展类都有哪些:
adaptive=com.alibaba.dubbo.common.compiler.support.AdaptiveCompilerjdk=com.alibaba.dubbo.common.compiler.support.JdkCompilerjavassist=com.alibaba.dubbo.common.compiler.support.JavassistCompiler
有一个 AdaptiveCompiler 类,从名字上我们就能猜到它是一个自定义的适配类了,然后在其类上可以看到 @Adaptive 注解验证我们的猜想,那么上文也说了在 loadFile 方法中会将该类赋值给 cachedAdaptiveClass 变量缓存,然后在 createAdaptiveExtension -> getAdaptiveExtensionClass 方法中获取并实例化对象,所以并不会死循环,那么在该类中做了什么呢?
public class AdaptiveCompiler implements Compiler {
// 这个是在哪赋值的?private static volatile String DEFAULT_COMPILER;
public static void setDefaultCompiler(String compiler) {DEFAULT_COMPILER = compiler;}
public Class<?> compile(String code, ClassLoader classLoader) {Compiler compiler;ExtensionLoader<Compiler> loader = ExtensionLoader.getExtensionLoader(Compiler.class);String name = DEFAULT_COMPILER; // copy referenceif (name != null && name.length() > 0) {// 根据 DEFAULT_COMPILER 名称获取 compiler = loader.getExtension(name);} else {// 获取 @SPI 注解值指定的默认扩展 compiler = loader.getDefaultExtension();}return compiler.compile(code, classLoader);}
}
评论