前言
大家好,今天开始给大家分享 — Dubbo 专题之 Dubbo 事件通知。在前面的章节中我们介绍了 Dubbo 本地调用,了解了什么是本地调用以及日常的使用场景和实现原理,同时我们知道本地调用是一个伪协议,它不开启端口,不发起远程调用,只在 JVM 内直接关联,但会执行 Dubbo 的 Filter 链。那有的小伙伴可能想知道当我们发起对某个服务调用时在调用过程中能否有相应的事件通知呢?那么在这个章节中我们会通过介绍 Dubbo 事件通知来了解下 Dubbo 是怎样来实现的事件通知。首先我们得了解什么是事件?以及有什么使用场景和实现原理。那就让我们快速开始吧!
1. 事件通知简介
有过编程经验的小伙伴都知道,我们的 Spring 框架中就存在大量的事件和监听器。那有的小伙伴可能会问为什么我们要使用事件和监听器呢?首先我们知道事件和监听其实在我们的设计模式中可以理解为观察者模式,什么意思呢?例如:在我们的桌面编程中我们的 UI 界面中有一个按钮当用户点击按钮时触发一个事件,这个事件触发就会通知关心此事件的监听者,当监听者接收到事件后就会触发后面的处理逻辑,这就是我们所说的事件监听。这种基于事件通知的优点就是可以动态的增加和移除事件的监听者,同时可以实时或异步通知监听者。
2. 使用方式
在 Dubbo 中提供三种事件分别是:调用之前、调用之后、异常发生,对应的回调方法是 oninvoke
、onreturn
、onthrow
。我们可以通过这三个事件来配置一个服务调用的前后或异常发生时的行为。配置示例如下:
<dubbo:reference id="demoService" interface="com.muke.dubbocourse.common.api.BookFacade">
<dubbo:method name="get" async="true" onreturn = "demoCallback.onreturn" onthrow="demoCallback.onthrow" />
</dubbo:reference>
复制代码
async=true
表示结果是否马上返回,onreturn
表示是否需要回调。
两者叠加存在以下几种组合情况:
异步回调模式:async=true onreturn="xxx"
同步回调模式:async=false onreturn="xxx"
异步无回调 :async=true
同步无回调 :async=false
3. 使用场景
根据前面的讨论我们知道当我们调用某个服务时会可能触发三种事件,我们可以通过这三种事件对应的行为来处理相应的事情。
场景一:在调用服务方法前我们可以记录开始时间,在调用结束后统计整个调用耗费,以及在发生异常时我们可以告警或打印错误日志等。
场景二:我们可能有这样的需求比如,在调用某个服务前后设置和销毁上下文,那么可以在调用oninvoke
、onreturn
做响应的上下文操作。
场景三:在调用服务前后触发一些前置和后置处理工作。例如:记录请求日志、响应日志。
4. 示例演示
下面我们通过获取图书列表调用服务触发通知为例,下面是项目结构:
其中消费者和服务提供者代码我们就不再讨论,这里我们主要讨论消费端的配置文件dubbo-consumer-xml.xml
代码如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
<dubbo:application name="demo-consumer" logger="log4j"/>
<!--使用zookeeper注册中心-->
<dubbo:registry address="zookeeper://127.0.0.1:2181" />
<!--事件通知接口-->
<bean id ="notify" class = "com.muke.dubbocourse.eventnotify.consumer.NotifyImpl" />
<!--url直接指定服务提供者所做机器以及端口-->
<dubbo:reference id="bookFacade" interface="com.muke.dubbocourse.common.api.BookFacade">
<!--指定调用queryByName方法触发相关的事件-->
<dubbo:method name="queryByName" async="true" onreturn="notify.onreturn" onthrow="notify.onthrow"/>
</dubbo:reference>
</beans>
复制代码
在上面代码中我们配置了一个事件通知接口NotifyImpl
,并且配置在调用queryByName
方法时触发onreturn
和onthrow
回调。下面我们来看看回调方法的定义:
public interface Notify {
void onreturn(List<Book> books, RequestParameter requestParameter);
void onthrow(Throwable ex, RequestParameter requestParameter);
}
public class NotifyImpl implements Notify{
@Override
public void onreturn(List<Book> books, RequestParameter requestParameter) {
System.out.println("调方法请求参数:" +requestParameter+",返回结果:"+books);
}
@Override
public void onthrow(Throwable ex, RequestParameter requestParameter) {
System.out.println("调用方法抛出异常:"+ex+",方法参数:"+requestParameter);
}
}
复制代码
在通知接口中我们抽象了onreturn
和onthrow
方法且和上面在 XML 配置进行对应,这样就简单的完成了代码配置。
5. 原理分析
下面我们通过源码简单分析下实现原理。
**事件回调方法配置加载过程:**我们都知道消费者对远程调用的服务都会包装为ReferenceConfig
,当我们调用org.apache.dubbo.config.ReferenceConfig#get
方法时会触发org.apache.dubbo.config.ReferenceConfig#init
方法,该方法除了处理配置参数、参数校验、本地存根检查等外还会解析<dubbo:method/>
标签中的oninvoke
、onreturn
、 onthrow
属性。核心代码如下:
public synchronized void init() {
//...
if (CollectionUtils.isNotEmpty(getMethods())) {
attributes = new HashMap<>();
for (MethodConfig methodConfig : getMethods()) {
//...
//回调方法`oninvoke`、`onreturn`、 `onthrow`封装为AsyncMethodInfo
ConsumerModel.AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
if (asyncMethodInfo != null) {
// consumerModel.getMethodModel(methodConfig.getName()).addAttribute(ASYNC_KEY, asyncMethodInfo);
attributes.put(methodConfig.getName(), asyncMethodInfo);
}
}
}
//注册消费者信息
repository.registerConsumer(
serviceMetadata.getServiceKey(),
attributes,
serviceDescriptor,
this,
null,
serviceMetadata);
//创建远程代理对象
ref = createProxy(map);
//设置代理对象
serviceMetadata.setTarget(ref);
serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
repository.lookupReferredService(serviceMetadata.getServiceKey()).setProxyObject(ref);
//...
}
复制代码
其中核心方法org.apache.dubbo.config.AbstractConfig#convertMethodConfig2AsyncInfo
代码如下:
/***
*
* 事件通知回掉接口方法解析:当服务器端某个方法被调用之前、调用之后、出现异常时,会触发 oninvoke、onreturn、onthrow 三个事件,可以配置当事件发生时,通知哪个类的哪个方法
* <dubbo:method ></dubbo:method> 相关方法参数解析
*
* @author liyong
* @date 16:52 2020-03-02
* @param methodConfig
* @exception
* @return org.apache.dubbo.rpc.model.ConsumerModel.AsyncMethodInfo
**/
public static ConsumerModel.AsyncMethodInfo convertMethodConfig2AsyncInfo(MethodConfig methodConfig) {
if (methodConfig == null || (methodConfig.getOninvoke() == null && methodConfig.getOnreturn() == null && methodConfig.getOnthrow() == null)) {
return null;
}
//check config conflict
if (Boolean.FALSE.equals(methodConfig.isReturn()) && (methodConfig.getOnreturn() != null || methodConfig.getOnthrow() != null)) {
throw new IllegalStateException("method config error : return attribute must be set true when onreturn or onthrow has been set.");
}
ConsumerModel.AsyncMethodInfo asyncMethodInfo = new ConsumerModel.AsyncMethodInfo();
//获取配置中的回调方法
asyncMethodInfo.setOninvokeInstance(methodConfig.getOninvoke());
asyncMethodInfo.setOnreturnInstance(methodConfig.getOnreturn());
asyncMethodInfo.setOnthrowInstance(methodConfig.getOnthrow());
try {
//设置invokeMethod方法
String oninvokeMethod = methodConfig.getOninvokeMethod();
if (StringUtils.isNotEmpty(oninvokeMethod)) {
asyncMethodInfo.setOninvokeMethod(getMethodByName(methodConfig.getOninvoke().getClass(), oninvokeMethod));
}
//设置returnMethod方法
String onreturnMethod = methodConfig.getOnreturnMethod();
if (StringUtils.isNotEmpty(onreturnMethod)) {
asyncMethodInfo.setOnreturnMethod(getMethodByName(methodConfig.getOnreturn().getClass(), onreturnMethod));
}
//设置throwMethod方法
String onthrowMethod = methodConfig.getOnthrowMethod();
if (StringUtils.isNotEmpty(onthrowMethod)) {
asyncMethodInfo.setOnthrowMethod(getMethodByName(methodConfig.getOnthrow().getClass(), onthrowMethod));
}
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
return asyncMethodInfo;
}
复制代码
上面的代码主要是根据 XML 配置的回调方法封装为ConsumerModel.AsyncMethodInfo
实体。
**事件回调方法调用过程如下:**首先我们普及下 Dubbo 中的一个常识就是服务消费者调用远程服务过程会被封装为Invoker
调用对象其中也包括 Filter 对象,如下代码是构建过程的核心代码:
/***
*
* 过滤链转换为Invoker调用链
*
* @author liyong
* @date 17:40 2020-03-04
* @param invoker
* @param key
* @param group
* @exception
* @return org.apache.dubbo.rpc.Invoker<T>
**/
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
//通过Dubbo中SPI机制加载Filter过滤链
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);//过滤链
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
//构建Invoker对象
last = new Invoker<T>() {
//...
//真实调用方法
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
//执行过滤器invoke方法
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
//...
} finally {
}
//调用完成触发响应事件
return asyncResult.whenCompleteWithContext((r, t) -> {
//处理回调或监听事件
if (filter instanceof ListenableFilter) {// Deprecated!
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
if (t == null) {
//触发消息回调
listener.onMessage(r, invoker, invocation);
} else {
//发生错误执行错误回调方法
listener.onError(t, invoker, invocation);
}
}
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
if (t == null) {
//触发消息回调
listener.onMessage(r, invoker, invocation);
} else {
//发生错误执行错误回调方法
listener.onError(t, invoker, invocation);
}
} else {// Deprecated!
filter.onResponse(r, invoker, invocation);
}
});
}
//...
};
}
}
return last;
}
复制代码
在上面的代码中把 Filter 的调用转换为 Invoker 调用,其中当 Filter 执行完成后会触发响应事件调用并执行org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter#onMessage
调用。核心代码如下:
@Override
public void onMessage(Result result, Invoker<?> invoker, Invocation invocation) {
if (result.hasException()) {
//存在异常触发
fireThrowCallback(invoker, invocation, result.getException());
} else {
//正常执行触发
fireReturnCallback(invoker, invocation, result.getValue());
}
}
复制代码
当调用过程存在异常触发org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter#fireThrowCallback
代码如下:
/**
*
* 触发异常回调
*
* @author liyong
* @date 3:40 PM 2020/11/15
* @param invoker
* @param invocation
* @param exception
* @exception
* @return void
**/
private void fireThrowCallback(final Invoker<?> invoker, final Invocation invocation, final Throwable exception) {
final ConsumerModel.AsyncMethodInfo asyncMethodInfo = getAsyncMethodInfo(invoker, invocation);
if (asyncMethodInfo == null) {
return;
}
//获取配置的throwMethod方法
final Method onthrowMethod = asyncMethodInfo.getOnthrowMethod();
//获取throwMethod方法实例
final Object onthrowInst = asyncMethodInfo.getOnthrowInstance();
//...
//获取throwMethod方法参数类型
Class<?>[] rParaTypes = onthrowMethod.getParameterTypes();
//throwMethod方法第一个参数配置为Throwable以及子类
if (rParaTypes[0].isAssignableFrom(exception.getClass())) {
try {
//获取回调方法所有参数 例如:onthrow(Throwable ex, RequestParameter requestParameter)中ex和requestParameter
Object[] args = invocation.getArguments();
Object[] params;
if (rParaTypes.length > 1) {
//throwMethod方法存在2个参数且第二个参数是数组类型
if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
params = new Object[2];
params[0] = exception;
//第二个参数填充数组
params[1] = args;
} else {
params = new Object[args.length + 1];
params[0] = exception;
//使用数组拷贝
System.arraycopy(args, 0, params, 1, args.length);
}
} else {
//不存在参数只填充异常
params = new Object[]{exception};
}
//执行配置throwMethod方法调用
onthrowMethod.invoke(onthrowInst, params);
} catch (Throwable e) {
logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), e);
}
} else {
logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), exception);
}
}
复制代码
上面方法触发异常回调,并填充方法签名类型的参数。下面核心代码触发正常的回调逻辑:
/**
*
* 触发正常回调
*
* @author liyong
* @date 3:43 PM 2020/11/15
* @param invoker
* @param invocation
* @param result
* @exception
* @return void
**/
private void fireReturnCallback(final Invoker<?> invoker, final Invocation invocation, final Object result) {
final ConsumerModel.AsyncMethodInfo asyncMethodInfo = getAsyncMethodInfo(invoker, invocation);
if (asyncMethodInfo == null) {
return;
}
//获取配置的throwMethod方法
final Method onReturnMethod = asyncMethodInfo.getOnreturnMethod();
//获取throwMethod方法实例
final Object onReturnInst = asyncMethodInfo.getOnreturnInstance();
//...
//获取回调方法所有参数
Object[] args = invocation.getArguments();
Object[] params;
Class<?>[] rParaTypes = onReturnMethod.getParameterTypes();
if (rParaTypes.length > 1) {
//调用方法参数个数为2 且第一个为数组类型
if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
params = new Object[2];
//数组第0个为原方法返回值
params[0] = result;
//第1个为原方法参数
params[1] = args;
} else {
params = new Object[args.length + 1];
params[0] = result;
//使用数组拷贝
System.arraycopy(args, 0, params, 1, args.length);
}
} else {
//只有一个参数类型
params = new Object[]{result};
}
try {
//触发onreturn回调方法
onReturnMethod.invoke(onReturnInst, params);
} catch (InvocationTargetException e) {
fireThrowCallback(invoker, invocation, e.getTargetException());
} catch (Throwable e) {
fireThrowCallback(invoker, invocation, e);
}
}
复制代码
上面的两个核心方法fireThrowCallback
和fireReturnCallback
分别是在发生异常触发和业务正常执行触发的回调逻辑。同样oninvoke
方法也是一样会触发fireInvokeCallback
方法调用。
6. 小结
在本小节中我们主要学习了 Dubbo 中的事件通知他们分别时:调用之前、调用之后、异常发生,以及常见的使用场景和使用方式。同时我们也通过示例演示和源码分析对 Dubbo 的事件通知原理进行解析。其中有个难点就是要理解到 Dubbo 中的服务调用过程都是通过 Invoker 对象进行包装,后面有机会我会对 Dubbo 源码进行全面解析。
本节课程的重点如下:
理解 Dubbo 中事件通知
了解事件通知使用方式
了解事件通知使用场景
了解 Dubbo 中事件通知原理
作者
个人从事金融行业,就职过易极付、思建科技、某网约车平台等重庆一流技术团队,目前就职于某银行负责统一支付系统建设。自身对金融行业有强烈的爱好。同时也实践大数据、数据存储、自动化集成和部署、分布式微服务、响应式编程、人工智能等领域。同时也热衷于技术分享创立公众号和博客站点对知识体系进行分享。关注公众号:青年 IT 男 获取最新技术文章推送!
博客地址: http://youngitman.tech
微信公众号:
评论