1. 背景
为啥突然想到写这个?起因就是看到了 Nacos 的#3757 ISSUE
,理解错误, 以为是服务启动,没有注册上服务,实际 namespace 不同,导致服务无法注册。 但这丝毫不影响再去研究了一波代码,顺便也看到了 Nacos 是如何利用 Spring 的事件来进行服务注册的。分享一波,欢迎大家学习指正!
2. 娓娓道来 - Spring 事件
2.1 怎么发布事件
我们大家应该都知道,在 Spring 中,是通过实现org.springframework.context.ApplicationEventPublisher
来发布一个事件。ApplicationEcentPublisher
是一个接口,我们来看一下接口中逻辑。
public interface ApplicationEventPublisher {
/**
* Notify all <strong>matching</strong> listeners registered with this
* application of an application event. Events may be framework events
* (such as RequestHandledEvent) or application-specific events.
* @param event the event to publish
* @see org.springframework.web.context.support.RequestHandledEvent
*/
default void publishEvent(ApplicationEvent event) {
publishEvent((Object) event);
}
/**
* Notify all <strong>matching</strong> listeners registered with this
* application of an event.
* <p>If the specified {@code event} is not an {@link ApplicationEvent},
* it is wrapped in a {@link PayloadApplicationEvent}.
* @param event the event to publish
* @since 4.2
* @see PayloadApplicationEvent
*/
void publishEvent(Object event);
}
复制代码
接口很简单,里面有两个方法,都是发布事件,而接口默认实现,是调用publishEvent(Object event)
的实现,唯一的区别就是 default 方法的参数是具体的事件类。
既然这是个接口,那一定有实现类,那么我们肯定是要进入实现类,快捷键,查看一下具体实现类。
我们通过实现类,可以看到,应该是从AbstractApplicationContext
进去(其他 Context 都是实现类,凭感觉和代码提示,从第一个类进如代码)。
进去org.springframework.context.support.AbstractApplicationContext
,我们直奔主题,查看是如何实现 PublishEvent。具体代码如下:
/**
* Publish the given event to all listeners.
* @param event the event to publish (may be an {@link ApplicationEvent}
* or a payload object to be turned into a {@link PayloadApplicationEvent})
* @param eventType the resolved event type, if known
* @since 4.2
*/
protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
Assert.notNull(event, "Event must not be null");
// Decorate event as an ApplicationEvent if necessary
ApplicationEvent applicationEvent;
if (event instanceof ApplicationEvent) {
// 实现的具体的ApplicationEvent, 直接进行转化即可
applicationEvent = (ApplicationEvent) event;
}
else {
// 将事件装饰成PayloadApplicationEvent,
applicationEvent = new PayloadApplicationEvent<>(this, event);
if (eventType == null) {
eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();
}
}
// Multicast right now if possible - or lazily once the multicaster is initialized
if (this.earlyApplicationEvents != null) {
// 广播还未初始化完成,放入earlyApplicationEvents.
this.earlyApplicationEvents.add(applicationEvent);
}
else {
// 核心关注点: 使用广播广播事件,
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
}
// Publish event via parent context as well...
// 父容器的监听器进行广播
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext) {
((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
}
else {
this.parent.publishEvent(event);
}
}
}
复制代码
在这段代码中,我们得到了两点有用的信息,分别是:
发布事件的整体流程。(封装事件,广播事件)
发布的时间通过getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
广播事件
通过代码可以知道,我们接下来的关注点是getApplicationEventMulticaster()
,如何通过multicastEvent()
方法将事件广播出去呢?
2.2 什么是 ApplicationEventMulticaster
进入到核心关注点代码,getApplicationEventMulticaster()
是什么?
ApplicationEventMulticaster getApplicationEventMulticaster() throws IllegalStateException {
// ...略... 没什么重要信息
return this.applicationEventMulticaster;
}
复制代码
代码很简单,没什么逻辑,但是让我们明白了getApplicationEventMulticaster()
获取到的对象是ApplicationEventMulticaster
,那我们接下来搞懂这个东西(ApplicationEventMulticaster
)是如何初始化?理清楚后可以弄清楚逻辑了。
轻轻松松翻到ApplicationEventMulticaster
初始化的代码(给自己鼓个掌👏),代码如下:
/**
* Initialize the ApplicationEventMulticaster.
* Uses SimpleApplicationEventMulticaster if none defined in the context.
* @see org.springframework.context.event.SimpleApplicationEventMulticaster
*/
protected void initApplicationEventMulticaster() {
ConfigurableListableBeanFactory beanFactory = getBeanFactory();
// 判断容器中是否已经创建ApplicationEventMulticaster
if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
this.applicationEventMulticaster =
beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
}
else {
// 初始化默认ApplicationEventMulticaster
this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
}
}
复制代码
代码很简单,就是判断容器中是否已经包含了ApplicationEventMulticaster
(可以理解为是否自己实现了ApplicationEventMulticaster
),如果没实现,则采用 Spring 的默认实现SimpleApplicationEventMulticaster
。
到目前为止,我们已经知道的信息是:
SimpleApplicationEventMulticaster
是ApplicationEventMulticaster
的默认实现
搞到了这个有用的信息,那么我们去搞懂SimpleApplicationEventMulticaster
是什么就可以知道 Spring 的广播流程了
2.2.1 SimpleApplicationEventMulticaster 是什么?如何广播事件?
带着思考与疑惑,我们进入SimpleApplicationEventMulticaster
,就看看multicastEvent()
方法, 因为上面代码中有写到,发送事件是这样操作的:getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
方法如下:
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
// 解析事件类型。(不知道没关系,先通过名字猜,然后再去验证自己的猜想)
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
// 遍历所有的监听者,
for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
Executor executor = getTaskExecutor();
if (executor != null) {
// 有线程池,则用线程池
executor.execute(() -> invokeListener(listener, event));
}
else {
// 没有线程池则直接调用
invokeListener(listener, event);
}
}
}
复制代码
代码看到这, 我们已经有了大概的了解,但大家应该还有两个疑问,分别是:
getApplicationListeners(event, type)
是什么?(这个很重要,包含著为什么特定的 Listener 能接收到特定的事件)
invokeListener(listener, event);
里面的逻辑是什么?
在这两个问题中,第一个问题我们暂时先放下,因为这暂时不涉及事件调用流程,后边说。我们就先说一下第二个以为。invokeListener(listener, event);
逻辑是什么。
我们带着疑惑,接着进入代码:
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
try {
// 监听器处理事件
listener.onApplicationEvent(event);
}
catch (ClassCastException ex) {
...略...
}
}
复制代码
到这,知道了。逻辑蛮简单的,就是调用 Listener 处理事件的逻辑。我们脑海中的有个大概流程图了,如下图所示:
通过AbstractApplicationContext实现类
发布事件,在通过SimpleApplicationEventMulticaster
将事件广播到所有 Listener,就完成了整个过程的调用。但是,我们还有其中二块没有搞清楚,分别是:
allListener 是怎么来的呢?
为什么能根据事件类型来决定调用哪些 Listener?
2.3 Listener 的秘密
代码疑问,回到代码。从multicastEvent()
方法中的getApplicationListeners(event, type)
进入,发现进入到了AbstractApplicationEventMulticaster
代码中,发现了 Listener 的增减和删除逻辑(为什么先看这两个?类的代码不多,可以先随便瞅瞅,不要太局限)。增加代码逻辑如下:
// 通过实现类添加listener
public void addApplicationListener(ApplicationListener<?> listener) {
synchronized (this.retrievalMutex) {
// Explicitly remove target for a proxy, if registered already,
// in order to avoid double invocations of the same listener.
Object singletonTarget = AopProxyUtils.getSingletonTarget(listener);
if (singletonTarget instanceof ApplicationListener) {
this.defaultRetriever.applicationListeners.remove(singletonTarget);
}
this.defaultRetriever.applicationListeners.add(listener);
this.retrieverCache.clear();
}
}
// 通过beanName添加listener
public void addApplicationListenerBean(String listenerBeanName) {
synchronized (this.retrievalMutex) {
this.defaultRetriever.applicationListenerBeans.add(listenerBeanName);
this.retrieverCache.clear();
}
}
复制代码
从参数可以看出,两个方法的差一点,就不细说。但是我们好奇啊,代码中的defaultRetriever
是什么,有属性?为什么 listener 保存在这个里面?还是带着疑惑,进入代码,核心代码如下:
public Collection<ApplicationListener<?>> getApplicationListeners() {
List<ApplicationListener<?>> allListeners = new ArrayList<>(
this.applicationListeners.size() + this.applicationListenerBeans.size());
allListeners.addAll(this.applicationListeners);
if (!this.applicationListenerBeans.isEmpty()) {
BeanFactory beanFactory = getBeanFactory();
for (String listenerBeanName : this.applicationListenerBeans) {
try {
ApplicationListener<?> listener = beanFactory.getBean(listenerBeanName, ApplicationListener.class);
// 是否需要过滤重复的listener
if (this.preFiltered || !allListeners.contains(listener)) {
allListeners.add(listener);
}
}
catch (NoSuchBeanDefinitionException ex) {
// Singleton listener instance (without backing bean definition) disappeared -
// probably in the middle of the destruction phase
}
}
}
if (!this.preFiltered || !this.applicationListenerBeans.isEmpty()) {
AnnotationAwareOrderComparator.sort(allListeners);
}
return allListeners;
}
复制代码
代码逻辑蛮简单的,就是讲 beanName 方式的 listener 和 listener 实现类直接结合放在一个集合中,并且进行排序返回。
接着,我们在来看getApplicationListeners(event, type)
的具体实现:
/**
* Return a Collection of ApplicationListeners matching the given
* event type. Non-matching listeners get excluded early.
* @param event the event to be propagated. Allows for excluding
* non-matching listeners early, based on cached matching information.
* @param eventType the event type
* @return a Collection of ApplicationListeners
* @see org.springframework.context.ApplicationListener
*/
protected Collection<ApplicationListener<?>> getApplicationListeners(
ApplicationEvent event, ResolvableType eventType) {
Object source = event.getSource();
Class<?> sourceType = (source != null ? source.getClass() : null);
// 通过事件类型和事件源类型构建缓存key
ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);
// Quick check for existing entry on ConcurrentHashMap...
// 是否有缓存,有缓存就快速返回
ListenerRetriever retriever = this.retrieverCache.get(cacheKey);
if (retriever != null) {
return retriever.getApplicationListeners();
}
if (this.beanClassLoader == null ||
(ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) &&
(sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) {
... 删掉了复杂逻辑,直接看下面else代码
}
else {
// No ListenerRetriever caching -> no synchronization necessary
// 如何区分事件的执行逻辑
return retrieveApplicationListeners(eventType, sourceType, null);
}
}
复制代码
通过上面代码,我们发现,spring 是在本地对事件有缓存,通过事件类型和事件源类型构建 ListenerCacheKey。 但是还没有到核心点, 如何区分事件的呢?接着我们进入retrieveApplicationListeners(eventType, sourceType, null);
中去寻找事情的真正原因。
/**
* Actually retrieve the application listeners for the given event and source type.
* @param eventType the event type
* @param sourceType the event source type
* @param retriever the ListenerRetriever, if supposed to populate one (for caching purposes)
* @return the pre-filtered list of application listeners for the given event and source type
*/
private Collection<ApplicationListener<?>> retrieveApplicationListeners(
ResolvableType eventType, @Nullable Class<?> sourceType, @Nullable ListenerRetriever retriever) {
List<ApplicationListener<?>> allListeners = new ArrayList<>();
Set<ApplicationListener<?>> listeners;
Set<String> listenerBeans;
synchronized (this.retrievalMutex) {
listeners = new LinkedHashSet<>(this.defaultRetriever.applicationListeners);
listenerBeans = new LinkedHashSet<>(this.defaultRetriever.applicationListenerBeans);
}
// 已经加装的Listener
for (ApplicationListener<?> listener : listeners) {
if (supportsEvent(listener, eventType, sourceType)) {
if (retriever != null) {
retriever.applicationListeners.add(listener);
}
allListeners.add(listener);
}
}
// beanName的listener
if (!listenerBeans.isEmpty()) {
BeanFactory beanFactory = getBeanFactory();
for (String listenerBeanName : listenerBeans) {
try {
Class<?> listenerType = beanFactory.getType(listenerBeanName);
if (listenerType == null || supportsEvent(listenerType, eventType)) {
ApplicationListener<?> listener =
beanFactory.getBean(listenerBeanName, ApplicationListener.class);
if (!allListeners.contains(listener) && supportsEvent(listener, eventType, sourceType)) {
if (retriever != null) {
if (beanFactory.isSingleton(listenerBeanName)) {
retriever.applicationListeners.add(listener);
}
else {
retriever.applicationListenerBeans.add(listenerBeanName);
}
}
allListeners.add(listener);
}
}
}
catch (NoSuchBeanDefinitionException ex) {
// Singleton listener instance (without backing bean definition) disappeared -
// probably in the middle of the destruction phase
}
}
}
AnnotationAwareOrderComparator.sort(allListeners);
if (retriever != null && retriever.applicationListenerBeans.isEmpty()) {
retriever.applicationListeners.clear();
retriever.applicationListeners.addAll(allListeners);
}
return allListeners;
}
复制代码
代码很简单,就是把 beanName 的 Listener 和 listener 实现类拿出去,找出支持 eventType 和 sourceType 的。 自此,我们也弄懂了,是如何添加 listener 的,大功告成!!!
别捉急,我们得把上面看的代码在整理一下,把流程图完善一下,如下图所示:
3 大功告成
至此,分析完了整个流程,弄懂了 Spring 整个事件广播流程,是不是特别简单?!!!!, 没搞懂的时候觉得难,仔细看一下发现如此简单哇! 给自己加个油,可以的!
4. Spring 中的初始化流程与 Nacos 中的使用
这部分知识还有点多,暂时先不写,先放在下篇文章中,下篇文章将具体说一下我的分析 ISSUE 的流程,以及我的发现-----Nacos 中如何使用事件来将服务注册到 Nacos Discover 中的。 还有一个就是 Spring 初始化流程(这个初始化流程也包含了 Nacos 注册的一个小问题:当服务不是 web 服务的时候,Nacos 将不会注册这个服务,不知道算不算问题? 下次在接着说)。
5. 本文为个人分析,如果有问题欢迎指出, 谢谢啦!
评论