作者:vivo 互联网服务器团队- Wang Fei、LinYupan
Dubbo 泛化调用特性可以在不依赖服务接口 API 包的场景中发起远程调用, 这种特性特别适合框架集成和网关类应用开发。
本文结合在实际开发过程中所遇到的需要远程调用多个三方系统的问题,阐述了如何利用 Dubbo 泛化调用来简化开发降低系统耦合性的项目实践,最后对 Dubbo 泛化调用的原理进行了深度解析。
一、背景
统一配置平台是一个提供终端设备各个模块进行文件配置和文件下发能力的平台,模块开发在后台服务器进行文件配置,然后终端设备可以按照特定规则获取对应的配置文件,文件下发可以按照多种设备维度进行下发,具体项目框架可以参加下图:
现有的下发策略,都是由模块开发在统一配置后台服务器进行下发维度配置,文件是否下发到对应终端设备,由用户在本平台所选择的维度所确定。
但是其他业务方也存在在公司内部的 A/B 实验平台配置下发规则,来借助统一配置平台每天轮询服务器请求新文件的能力,但是在统一配置平台配置的文件是否能够下发由 A/B 实验平台来确定,A/B 实验平台内会配置对应的规则以及配置统一配置平台对应的文件 id,然后统一配置平台就需要针对请求调用 A/B 实验平台接口来判断文件是否可以下发。
随着公司内部实验平台的增加,越来越多这种由三方平台来决定文件是否下发的对接需求,如何更好更快的应对这种类似的对接需求,是我们需要去深入思考的问题。
二、方案选型
原有统一配置的下发逻辑是先找到所有可以下发的文件,然后判断单个配置文件是否满足设备维度,如果满足则可以下发。现在在对接 A/B 实验平台以后,文件是否能下发还需要由外部系统来确定,当时设计时考虑过两种方案:
同样先找到所有可以下发的文件,然后针对单个文件按照①设备维度判断是否匹配,然后②调用 A/B 实验平台的接口获取这台设备可以下发的文件 Id, 再调用③灰度实验平台获取这台设备可以下发的文件 id, 最后将前三步获取到的配置文件 id 进行汇总得到可以下发的文件,如下图所示。
方案一打破了原来文件是否能够下发的判断逻辑,现在除了原有的判断逻辑,还需要额外步骤调用其他系统来追加另外可以下发的文件。并且后续不可避免对接其他三方系统,方案一需要不断增加调用三方接口的逻辑来追加可以下发的文件 id。此外常规的 dubbo 调用在 provider 端需要引入其他实验系统的二方库以及模型类型,增加了统一配置系统和其他系统的强耦合性。
利用 Dubbo 泛化调用高级特性抽象一个下发维度(远程调用),专门用于其他想由三方实验系统来决定是否下发文件的场景,如下图所示:
方案二统一抽象一个远程调用下发维度,可以保持原有的判断逻辑,也就是先把系统中所有可以下发的文件先查找出来,然后根据设备维度进行匹配,如果某一个文件配置的是远程调用维度,那么查找这个远程调用维度所包含的函数名称、参数类型数组和参数值对象数组,然后调用三方接口,从而判断这个文件是否可以下发到设备,最终获取到可以下发的文件 id 列表。
此外,利用 Dubbo 泛化调用高级特性,调用方并不关心提供者的接口的详细定义,只需要关注调用哪个方法,传什么参数以及接收到什么返回结果即可,这样避免需要依赖服务提供者的二方库以及模型类元,这样可以大大降低 consumer 端和 provider 端的耦合性。
综合上面的分析,我们最终确定了方案二采取利用 Dubbo 泛化调用来抽象一个统一维度的方式,下面来看一下具体的实现。
三、具体实现
1.GenericService 是 Dubbo 提供的泛化接口,用来进行泛化调用。只提供了一个 $invoke 方法,三个入口参数分别为函数名称、参数类型数组和参数值对象数组。
package com.alibaba.dubbo.rpc.service;
/**
* Generic service interface
*
* @export
*/
public interface GenericService {
/**
* Generic invocation
*
* @param method Method name, e.g. findPerson. If there are overridden methods, parameter info is
* required, e.g. findPerson(java.lang.String)
* @param parameterTypes Parameter types
* @param args Arguments
* @return invocation return value
* @throws Throwable potential exception thrown from the invocation
*/
Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException;
复制代码
创建服务引用配置对象 ReferenceConfig。
private ReferenceConfig<GenericService> buildReferenceConfig(RemoteDubboRestrictionConfig config) {
ReferenceConfig<GenericService> referenceConfig = new ReferenceConfig<>();
referenceConfig.setApplication(applicationConfig);
referenceConfig.setRegistry(registryConfig);
referenceConfig.setInterface(config.getInterfaceName());
referenceConfig.setVersion(config.getVersion());
referenceConfig.setGeneric(Boolean.TRUE.toString());
referenceConfig.setCheck(false);
referenceConfig.setTimeout(DUBBO_INVOKE_TIMEOUT);
referenceConfig.setRetries(DUBBO_INVOKE_RETRIES);
return referenceConfig;
}
复制代码
3.设置请求参数及服务调用, 这里利用在后台所配置的完整方法名、参数类型数组和参数值数组就可以进行服务调用。
public List<Integer> invoke(RemoteDubboRestrictionConfig config, ConfigFileListQuery listQuery) {
//由于ReferenceConfig很重量,里面封装了所有与注册中心及服务提供方连接,所以这里做了缓存
GenericService genericService = prepareGenericService(config);
//构建参数
Map<String, Object> params = buildParams(listQuery);
String method = config.getMethod();
String[] parameterTypeArray = new String[]{Map.class.getName()};
Object[] parameterValueArray = new Object[]{params};
long begin = System.currentTimeMillis();
Assert.notNull(genericService, "cannot find genericService");
//具体调用
Object result = genericService.$invoke(method, parameterTypeArray, parameterValueArray);
if (logger.isDebugEnabled()) {
long duration = System.currentTimeMillis() - begin;
logger.debug("Dubbo调用结果:{}, 耗时: {}", result, duration);
}
return result == null ? Collections.emptyList() : (List<Integer>) result;
}
复制代码
那么为什么 Dubbo 泛化调用所涉及的调用方并不关心提供者的接口的详细定义,只需要关注调用哪个方法,传什么参数以及接收到什么返回结果即可呢?
在讲解泛化调用的实现原理之前,先简单讲述一下直接调用的原理。
四、 Dubbo 直接调用相关原理
Dubbo 的直接调用相关原理涉及到两个方面:Dubbo 服务暴露原理和 Dubbo 服务消费原理
4.1 Dubbo 服务暴露原理
4.1.1 服务远程暴露的整体流程
在整体上看,Dubbo 框架做服务暴露分为两大部分,第一步将持有的服务实例通过代理转 换成 Invoker,第二步会把 Invoker 通过具体的协议(比如 Dubbo)转换成 Exporter,框架做了 这层抽象也大大方便了功能扩展。
这里的 Invoker 可以简单理解成一个真实的服务对象实例,是 Dubbo 框架实体域,所有模型都会向它靠拢,可向它发起 invoke 调用。它可能是一个本地的实 现,也可能是一个远程的实现,还可能是一个集群实现。
源代码如下:
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(Constants.PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
//向注册中心注册服务信息
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
复制代码
首先将实现类 ref 封装为 Invoker,之后将 invoker 转换为 exporter,最后将 exporter 放入缓存 Listexporters 中。
4.1.2 服务暴露的细节
4.1.2.1 将实现类 ref 封装为 Invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
复制代码
① dubbo 远程暴露的入口在 ServiceBean 的 export()方法,由于 servicebean'继承了 serviceconfig 类,于是真正执行暴露的逻辑是 serviceconfig 的 doExport()方法。
② Dubbo 支持相同服务暴露多个协议,比如同时暴露 Dubbo 和 REST 协议,也支持多个注册中心,比如 zookeeper 和 nacos,框架内部会依次 对使用的协议都做一次服务暴露,每个协议注册元数据都会写入多个注册中心,具体是执行 doExportUrlsFor1Protocol。
③ 然后通过动态代理的方式创建 Invoker 对象,在服务端生成 AbstractProxylnvoker 实例,所有真实的方法调用都会委托给代理,然后代理转发给服务实现者 ref 调用;动态代理一般有:JavassistProxyFactory 和 JdkProxyFactory 两种方式,这里所选用的 JavassistProxyFactory 。
4.1.2.2 将 invoker 转换为 exporter
Exporter exporter= protocol.export(wrapperInvoker);
Exporter<?> exporter = protocol.export(wrapperInvoker);
复制代码
在将服务实例 ref 转换成 Invoker 之后,开始执行服务暴露过程。
这里会经过一系列的过滤器链路,最终会通过 RegistryProtocol#export 进行更细粒度的控制,比如先进行服务暴露再注册服务元数据。注册中心在做服务暴露时依次 做了以下几件事情:
委托具体协议(Dubbo)进行服务暴露,创建 NettyServer 监听端口和保存服务实例。
创建注册中心对象,与注册中心创建 TCP 连接。
注册服务元数据到注册中心。
订阅 configurators 节点,监听服务动态属性变更事件。
服务销毁收尾工作,比如关闭端口、反注册服务信息等。
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
URL registryUrl = getRegistryUrl(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
//to judge to delay publish whether or not
boolean register = registeredProviderUrl.getParameter("register", true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
if (register) {
//TODO 注册服务元数据
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}
复制代码
这里我们重点讲解委托具体协议进行服务暴露的过程 doLocalExport(final InvokeroriginInvoker)。
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
复制代码
(Exporter) protocol.export(invokerDelegete)方法又会经过一系列的拦截器进行处理,最终调用 DubboProtocol 的 export 方法。
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url);
optimizeSerialization(url);
return exporter;
}
复制代码
这里很重要的一点就是将 exporter 放到了缓存,这里的 key 是 serviceGroup/serviceName:serviceVersion:port 这样形式,这里最后获取到的是 com.alibaba.dubbo.demo.DemoService:20880,之后创建 DubboExporter。这里的内存缓存 exporterMap 是很重要的一个属性,在后续消费者调用服务提供者时会被被再次使用到。
至此,服务器提供者的远程暴露流程就基本介绍完毕。
4.2 Dubbo 服务消费的实现原理
4.2.1 服务消费的整体流程
在整体上看,Dubbo 框架做服务消费也分为两大部分,第一步通过持有远程服务实例生成 Invoker,这个 Invoker 在客户端是核心的远程代理对象。第二步会把 Invoker 通过动态代理转换 成实现用户接口的动态代理引用。这里的 Invoker 承载了网络连接、服务调用和重试等功能,在 客户端,它可能是一个远程的实现,也可能是一个集群实现。
源代码如下:
public Object getObject() throws Exception {
return get();
}
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("Already destroyed!");
}
if (ref == null) {
init();
}
return ref;
}
private void init() {
...
ref = createProxy(map);
}
private T createProxy(Map<String, String> map) {
...
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
}
...
// 创建服务代理
return (T) proxyFactory.getProxy(invoker);
}
复制代码
4.2.2 服务消费的细节
4.2.2.1 使用 Protocol 将 interfaceClass 转化为 Invoker
invoker = refprotocol.refer(interfaceClass, url);
① 服务引用的入口点在 ReferenceBean#getObject,由于 Referencebean'继承了 serviceconfig 类,接着会调用 Reference 的 get 方法。
② 然后根据引用的接口类型将持有远程服务实例生成 Invoker。
③ 通过一系列的过滤器链,最后调用 RegistryProtocol 的 doRefer 方法。
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
registry.register(registeredConsumerUrl);
directory.setRegisteredConsumerUrl(registeredConsumerUrl);
}
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
复制代码
这段逻辑主要完成了注册中心实例的创建,元数据注册到注册中心及订阅的功能。
具体远程 Invoker 是在哪里创建的呢?客户端调用拦截器又是在哪里构造的呢?
当在 directory.subscrib()中 第一次发起订阅时会进行一次数据拉取操作,同时触发 RegistryDirectory#notify 方法,这里 的通知数据是某一个类别的全量数据,比如 providers 和 routers 类别数据。当通知 providers 数 据时,在 RegistryDirectory#toInvokers 方法内完成 Invoker 转换。
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<String>();
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
// If protocol is configured at the reference side, only the matching protocol is selected
......
URL url = mergeUrl(providerUrl);
String key = url.toFullString(); // The parameter urls are sorted
if (keys.contains(key)) { // Repeated url
continue;
}
keys.add(key);
// Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // Not in the cache, refer again
try {
boolean enabled = true;
.........
if (enabled) {
invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(key, invoker);
}
} else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
复制代码
核心代码
invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
复制代码
这里会经过一系列的过滤器链,然后最终调用 DubboProtocol 的 refer 方法,来创建具体的 invoker。
@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
复制代码
这里返回的 invoker 会用来更新 RegistryDirectory 的 methodInvokerMap 属性,最终在实际调用消费端方法时,会根据 method 找到对应的 invoker 列表。
private void refreshInvoker(List<URL> invokerUrls) {
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // Forbid to access
this.methodInvokerMap = null; // Set the method invoker map to null
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
if (invokerUrls.isEmpty()) {
return;
}
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
// state change
// If the calculation is wrong, it is not processed.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
复制代码
4.2.2.2 使用 ProxyFactory 创建代理
(T) proxyFactory.getProxy(invoker)
复制代码
上述的 proxyFactory 是 ProxyFactory$Adaptive 实例,其 getProxy 内部最终得到是一个被 StubProxyFactoryWrapper 包装后的 JavassistProxyFactory。直接来看 JavassistProxyFactory.getProxy 方法。
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
复制代码
【invoker】:MockClusterInvoker 实例
【interfaces】:[interface com.alibaba.dubbo.demo.DemoService, interface com.alibaba.dubbo.rpc.service.EchoService]
我们最终返回的代理对象其实是一个 proxy0 对象,当我们调用其 sayHello 方法时,其调用内部的 handler.invoke 方法。
package com.alibaba.dubbo.common.bytecode;
import com.alibaba.dubbo.demo.DemoService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
public class proxy0 implements DemoService {
public static Method[] methods;
private InvocationHandler handler;
public String sayHello(String paramString) {
Object[] arrayOfObject = new Object[1];
arrayOfObject[0] = paramString;
Object localObject = this.handler.invoke(this, methods[0], arrayOfObject);
return (String) localObject;
}
public proxy0() {
}
public proxy0(InvocationHandler paramInvocationHandler) {
this.handler = paramInvocationHandler;
}
}
复制代码
Dubbo 泛化调用一般是服务器提供者都采用直接暴露的形式,消费者端采用服务泛化调用的形式,所以这里重点讨论 Dubbo 泛化调用与直接调用在消费者端的服务引用和发起消费的区别与联系。
五、Dubbo 泛化调用与直接调用的区别与联系
5.1 通过持有远程服务实例生成 Invoker
private T createProxy(Map<String, String> map) {
...
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
}
...
// 创建服务代理
return (T) proxyFactory.getProxy(invoker);
}
复制代码
这里的 interfaceClass 的来源不一样,createProxy(Mapmap) 是在 ReferenceConfig 的 init()方法中调用的,具体的 interfaceClass 根据是否是返回调用会有所区别,具体看如下代码:
private void init() {
...
if (ProtocolUtils.isGeneric(getGeneric())) {
interfaceClass = GenericService.class;
} else {
try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
checkInterfaceAndMethods(interfaceClass, methods);
}
...
ref = createProxy(map);
}
复制代码
直接调用:interfaceClass→com.alibaba.dubbo.demo.DemoService
泛化调用:interfaceClass→com.alibaba.dubbo.rpc.service.GenericService
最终获取的 invoker 也不一样
直接调用:
interface com.alibaba.dubbo.demo.DemoService -> dubbo://xx.xx.xx.xx:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&bean.name=com.alibaba.dubbo.demo.DemoService&check=false&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=24932&qos.port=33333®ister.ip=xx.xx.xx.xx&remote.timestamp=1640744945905&side=consumer×tamp=1640745033688
泛化调用:
interface com.alibaba.dubbo.rpc.service.GenericService -> dubbo://xx.xx.xx.xx:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=test&bean.name=com.alibaba.dubbo.demo.DemoService&check=false&dubbo=2.0.2&generic=true&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=27952&qos.port=33333®ister.ip=xx.xx.xx.xx&remote.timestamp=1640748337173&side=consumer×tamp=1640748368427
5.2 服务发起消费流程
在 4.2.2 服务消费者发起请求细节第①步是将请求参数(方法名,方法参数类型,方法参数值,服务名,附加参数)封装成一个 Invocation。
直接调用的 RpcInvoaction 如下:
RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world], attachments={}]
泛化调用的 RpcInvoaction 如下:
RpcInvocation [methodName=$invoke, parameterTypes=[class java.lang.String, class [Ljava.lang.String;, class [Ljava.lang.Object;], arguments=[sayHello, [Ljava.lang.String;@22c1bff0, [Ljava.lang.Object;@30ae230f], attachments={path=com.alibaba.dubbo.demo.DemoService, input=296, dubbo=2.0.2, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0, generic=true}]
我们可以发现这里生成的 RpcInvocation 对象是有区别的,但是服务提供者暴露的服务是不会发生变化的,所以这里必然有一个转换过程,这里的参数转换关键就在于服务提供者端的 GenericImplFilter 类。
@Activate(group = Constants.PROVIDER, order = -20000)
public class GenericFilter implements Filter {
protected final Logger logger = LoggerFactory.getLogger(getClass());
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
logger.info("----------------GenericFilter-------------------------");
if (inv.getMethodName().equals(Constants.$INVOKE)
&& inv.getArguments() != null
&& inv.getArguments().length == 3
&& !invoker.getInterface().equals(GenericService.class)) {
String name = ((String) inv.getArguments()[0]).trim();
String[] types = (String[]) inv.getArguments()[1];
Object[] args = (Object[]) inv.getArguments()[2];
try {
Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);
Class<?>[] params = method.getParameterTypes();
if (args == null) {
args = new Object[params.length];
}
String generic = inv.getAttachment(Constants.GENERIC_KEY);
if (StringUtils.isBlank(generic)) {
generic = RpcContext.getContext().getAttachment(Constants.GENERIC_KEY);
}
if (StringUtils.isEmpty(generic)
|| ProtocolUtils.isDefaultGenericSerialization(generic)) {
args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
} else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
...
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
...
}
Result result = invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));
if (result.hasException()
&& !(result.getException() instanceof GenericException)) {
return new RpcResult(new GenericException(result.getException()));
}
RpcResult rpcResult;
if (ProtocolUtils.isJavaGenericSerialization(generic)) {
...
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
...
} else {
rpcResult = new RpcResult(PojoUtils.generalize(result.getValue()));
}
rpcResult.setAttachments(result.getAttachments());
return rpcResult;
} catch (NoSuchMethodException e) {
throw new RpcException(e.getMessage(), e);
} catch (ClassNotFoundException e) {
throw new RpcException(e.getMessage(), e);
}
}
return invoker.invoke(inv);
}
}
复制代码
核心流程:
① 是否是泛化调用判断
if (inv.getMethodName().equals(Constants.$INVOKE)
&& inv.getArguments() != null
&& inv.getArguments().length == 3
&& !invoker.getInterface().equals(GenericService.class)) {
复制代码
② 参数的提取
String name = ((String) inv.getArguments()[0]).trim();
String[] types = (String[]) inv.getArguments()[1];
Object[] args = (Object[]) inv.getArguments()[2];
复制代码
③ 参数的序列化,再构造新的 RpcInvocation 对象
Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);
Class<?>[] params = method.getParameterTypes();
if (StringUtils.isEmpty(generic)
|| ProtocolUtils.isDefaultGenericSerialization(generic)) {
args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
}
...
Result result = invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));
复制代码
序列化前 RpcInvocation 对象:
RpcInvocation [methodName=$invoke, parameterTypes=[class java.lang.String, class [Ljava.lang.String;, class [Ljava.lang.Object;], arguments=[sayHello, [Ljava.lang.String;@22c1bff0, [Ljava.lang.Object;@30ae230f], attachments={path=com.alibaba.dubbo.demo.DemoService, input=296, dubbo=2.0.2, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0, generic=true}]
序列化后 RpcInvocation 对象:
RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world], attachments={path=com.alibaba.dubbo.demo.DemoService, input=296, dubbo=2.0.2, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0, generic=true}]
后面的调用逻辑就和直接调用的是一致的了,比如从本地缓存从本地缓存中 Map<string, list<invoker>> methodInvokerMap 中获取 key 为 sayHello(指定方法名)的 List<invoker>,接着进行后续的调用。
那么什么时候触发 GenericFilter 的 invoke 方法呢,这里其实就和过滤器的调用链建立有关系了,从 GenericFilter 类上的注解,我们可以看到 @Activate(group = Constants.PROVIDER, order = -20000),说明是在服务提供者端生效的。
另外,服务提供者端是如何知道调用是直接调用还是泛化调用的,这里就涉及到与服务提供者端 GenericFilter 对应的消费者端的 GenericImplFilter 类,代码如下:
/**
* GenericImplInvokerFilter
*/
@Activate(group = Constants.CONSUMER, value = Constants.GENERIC_KEY, order = 20000)
public class GenericImplFilter implements Filter {
private static final Logger logger = LoggerFactory.getLogger(GenericImplFilter.class);
private static final Class<?>[] GENERIC_PARAMETER_TYPES = new Class<?>[]{String.class, String[].class, Object[].class};
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
String generic = invoker.getUrl().getParameter(Constants.GENERIC_KEY);
if (ProtocolUtils.isGeneric(generic)
&& !Constants.$INVOKE.equals(invocation.getMethodName())
&& invocation instanceof RpcInvocation) {
...
}
if (invocation.getMethodName().equals(Constants.$INVOKE)
&& invocation.getArguments() != null
&& invocation.getArguments().length == 3
&& ProtocolUtils.isGeneric(generic)) {
Object[] args = (Object[]) invocation.getArguments()[2];
if (ProtocolUtils.isJavaGenericSerialization(generic)) {
for (Object arg : args) {
if (!(byte[].class == arg.getClass())) {
error(generic, byte[].class.getName(), arg.getClass().getName());
}
}
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
for (Object arg : args) {
if (!(arg instanceof JavaBeanDescriptor)) {
error(generic, JavaBeanDescriptor.class.getName(), arg.getClass().getName());
}
}
}
((RpcInvocation) invocation).setAttachment(
Constants.GENERIC_KEY, invoker.getUrl().getParameter(Constants.GENERIC_KEY));
}
return invoker.invoke(invocation);
}
复制代码
5.3 泛化调用的整体流程图
六、总结
高内聚低耦合是我们架构设计的一个重要目标,而 Dubbo 的泛化调用特性仅仅只需知道服务的完整接口路径、请求参数类型和请求参数值就可以直接进行调用获取请求结果,能够避免依赖特定三方 jar 包,从而降低了系统的耦合性。在日常学习和开发的过程中,我们除了需要关注一门技术的常规使用方法以外,还需要关注一些高级特性,从而做出更合适的架构设计。
参考资料:
《深入理解 Apache Dubbo 与实战》 诣极,林琳
Dubbo源码分析-泛化调用
Dubbo 泛化调用使用及原理解析
评论