1. 背景
为啥突然想到写这个?起因就是看到了 Nacos 的#3757 ISSUE,理解错误, 以为是服务启动,没有注册上服务,实际 namespace 不同,导致服务无法注册。 但这丝毫不影响再去研究了一波代码,顺便也看到了 Nacos 是如何利用 Spring 的事件来进行服务注册的。分享一波,欢迎大家学习指正!
2. 娓娓道来 - Spring 事件
2.1 怎么发布事件
我们大家应该都知道,在 Spring 中,是通过实现org.springframework.context.ApplicationEventPublisher来发布一个事件。ApplicationEcentPublisher是一个接口,我们来看一下接口中逻辑。
public interface ApplicationEventPublisher {
/** * Notify all <strong>matching</strong> listeners registered with this * application of an application event. Events may be framework events * (such as RequestHandledEvent) or application-specific events. * @param event the event to publish * @see org.springframework.web.context.support.RequestHandledEvent */ default void publishEvent(ApplicationEvent event) { publishEvent((Object) event); }
/** * Notify all <strong>matching</strong> listeners registered with this * application of an event. * <p>If the specified {@code event} is not an {@link ApplicationEvent}, * it is wrapped in a {@link PayloadApplicationEvent}. * @param event the event to publish * @since 4.2 * @see PayloadApplicationEvent */ void publishEvent(Object event);}
复制代码
接口很简单,里面有两个方法,都是发布事件,而接口默认实现,是调用publishEvent(Object event)的实现,唯一的区别就是 default 方法的参数是具体的事件类。
既然这是个接口,那一定有实现类,那么我们肯定是要进入实现类,快捷键,查看一下具体实现类。
我们通过实现类,可以看到,应该是从AbstractApplicationContext进去(其他 Context 都是实现类,凭感觉和代码提示,从第一个类进如代码)。
进去org.springframework.context.support.AbstractApplicationContext,我们直奔主题,查看是如何实现 PublishEvent。具体代码如下:
/** * Publish the given event to all listeners. * @param event the event to publish (may be an {@link ApplicationEvent} * or a payload object to be turned into a {@link PayloadApplicationEvent}) * @param eventType the resolved event type, if known * @since 4.2 */ protected void publishEvent(Object event, @Nullable ResolvableType eventType) { Assert.notNull(event, "Event must not be null");
// Decorate event as an ApplicationEvent if necessary ApplicationEvent applicationEvent; if (event instanceof ApplicationEvent) { // 实现的具体的ApplicationEvent, 直接进行转化即可 applicationEvent = (ApplicationEvent) event; } else { // 将事件装饰成PayloadApplicationEvent, applicationEvent = new PayloadApplicationEvent<>(this, event); if (eventType == null) { eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType(); } }
// Multicast right now if possible - or lazily once the multicaster is initialized if (this.earlyApplicationEvents != null) { // 广播还未初始化完成,放入earlyApplicationEvents. this.earlyApplicationEvents.add(applicationEvent); } else { // 核心关注点: 使用广播广播事件, getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType); }
// Publish event via parent context as well... // 父容器的监听器进行广播 if (this.parent != null) { if (this.parent instanceof AbstractApplicationContext) { ((AbstractApplicationContext) this.parent).publishEvent(event, eventType); } else { this.parent.publishEvent(event); } } }
复制代码
在这段代码中,我们得到了两点有用的信息,分别是:
发布事件的整体流程。(封装事件,广播事件)
发布的时间通过getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);广播事件
通过代码可以知道,我们接下来的关注点是getApplicationEventMulticaster(),如何通过multicastEvent()方法将事件广播出去呢?
2.2 什么是 ApplicationEventMulticaster
进入到核心关注点代码,getApplicationEventMulticaster()是什么?
ApplicationEventMulticaster getApplicationEventMulticaster() throws IllegalStateException { // ...略... 没什么重要信息 return this.applicationEventMulticaster;}
复制代码
代码很简单,没什么逻辑,但是让我们明白了getApplicationEventMulticaster()获取到的对象是ApplicationEventMulticaster,那我们接下来搞懂这个东西(ApplicationEventMulticaster)是如何初始化?理清楚后可以弄清楚逻辑了。
轻轻松松翻到ApplicationEventMulticaster初始化的代码(给自己鼓个掌👏),代码如下:
/** * Initialize the ApplicationEventMulticaster. * Uses SimpleApplicationEventMulticaster if none defined in the context. * @see org.springframework.context.event.SimpleApplicationEventMulticaster */protected void initApplicationEventMulticaster() { ConfigurableListableBeanFactory beanFactory = getBeanFactory(); // 判断容器中是否已经创建ApplicationEventMulticaster if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) { this.applicationEventMulticaster = beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class); } else { // 初始化默认ApplicationEventMulticaster this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory); beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster); }}
复制代码
代码很简单,就是判断容器中是否已经包含了ApplicationEventMulticaster(可以理解为是否自己实现了ApplicationEventMulticaster),如果没实现,则采用 Spring 的默认实现SimpleApplicationEventMulticaster。
到目前为止,我们已经知道的信息是:
SimpleApplicationEventMulticaster是ApplicationEventMulticaster的默认实现
搞到了这个有用的信息,那么我们去搞懂SimpleApplicationEventMulticaster是什么就可以知道 Spring 的广播流程了
2.2.1 SimpleApplicationEventMulticaster 是什么?如何广播事件?
带着思考与疑惑,我们进入SimpleApplicationEventMulticaster,就看看multicastEvent()方法, 因为上面代码中有写到,发送事件是这样操作的:getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
方法如下:
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) { // 解析事件类型。(不知道没关系,先通过名字猜,然后再去验证自己的猜想) ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event)); // 遍历所有的监听者, for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) { Executor executor = getTaskExecutor(); if (executor != null) { // 有线程池,则用线程池 executor.execute(() -> invokeListener(listener, event)); } else { // 没有线程池则直接调用 invokeListener(listener, event); } }}
复制代码
代码看到这, 我们已经有了大概的了解,但大家应该还有两个疑问,分别是:
getApplicationListeners(event, type)是什么?(这个很重要,包含著为什么特定的 Listener 能接收到特定的事件)
invokeListener(listener, event);里面的逻辑是什么?
在这两个问题中,第一个问题我们暂时先放下,因为这暂时不涉及事件调用流程,后边说。我们就先说一下第二个以为。invokeListener(listener, event);逻辑是什么。
我们带着疑惑,接着进入代码:
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) { try { // 监听器处理事件 listener.onApplicationEvent(event); } catch (ClassCastException ex) { ...略... }}
复制代码
到这,知道了。逻辑蛮简单的,就是调用 Listener 处理事件的逻辑。我们脑海中的有个大概流程图了,如下图所示:
通过AbstractApplicationContext实现类发布事件,在通过SimpleApplicationEventMulticaster将事件广播到所有 Listener,就完成了整个过程的调用。但是,我们还有其中二块没有搞清楚,分别是:
allListener 是怎么来的呢?
为什么能根据事件类型来决定调用哪些 Listener?
2.3 Listener 的秘密
代码疑问,回到代码。从multicastEvent()方法中的getApplicationListeners(event, type)进入,发现进入到了AbstractApplicationEventMulticaster代码中,发现了 Listener 的增减和删除逻辑(为什么先看这两个?类的代码不多,可以先随便瞅瞅,不要太局限)。增加代码逻辑如下:
// 通过实现类添加listenerpublic void addApplicationListener(ApplicationListener<?> listener) { synchronized (this.retrievalMutex) { // Explicitly remove target for a proxy, if registered already, // in order to avoid double invocations of the same listener. Object singletonTarget = AopProxyUtils.getSingletonTarget(listener); if (singletonTarget instanceof ApplicationListener) { this.defaultRetriever.applicationListeners.remove(singletonTarget); } this.defaultRetriever.applicationListeners.add(listener); this.retrieverCache.clear(); }}// 通过beanName添加listenerpublic void addApplicationListenerBean(String listenerBeanName) { synchronized (this.retrievalMutex) { this.defaultRetriever.applicationListenerBeans.add(listenerBeanName); this.retrieverCache.clear(); }}
复制代码
从参数可以看出,两个方法的差一点,就不细说。但是我们好奇啊,代码中的defaultRetriever是什么,有属性?为什么 listener 保存在这个里面?还是带着疑惑,进入代码,核心代码如下:
public Collection<ApplicationListener<?>> getApplicationListeners() { List<ApplicationListener<?>> allListeners = new ArrayList<>( this.applicationListeners.size() + this.applicationListenerBeans.size()); allListeners.addAll(this.applicationListeners); if (!this.applicationListenerBeans.isEmpty()) { BeanFactory beanFactory = getBeanFactory(); for (String listenerBeanName : this.applicationListenerBeans) { try { ApplicationListener<?> listener = beanFactory.getBean(listenerBeanName, ApplicationListener.class); // 是否需要过滤重复的listener if (this.preFiltered || !allListeners.contains(listener)) { allListeners.add(listener); } } catch (NoSuchBeanDefinitionException ex) { // Singleton listener instance (without backing bean definition) disappeared - // probably in the middle of the destruction phase } } } if (!this.preFiltered || !this.applicationListenerBeans.isEmpty()) { AnnotationAwareOrderComparator.sort(allListeners); } return allListeners;}
复制代码
代码逻辑蛮简单的,就是讲 beanName 方式的 listener 和 listener 实现类直接结合放在一个集合中,并且进行排序返回。
接着,我们在来看getApplicationListeners(event, type)的具体实现:
/** * Return a Collection of ApplicationListeners matching the given * event type. Non-matching listeners get excluded early. * @param event the event to be propagated. Allows for excluding * non-matching listeners early, based on cached matching information. * @param eventType the event type * @return a Collection of ApplicationListeners * @see org.springframework.context.ApplicationListener */protected Collection<ApplicationListener<?>> getApplicationListeners( ApplicationEvent event, ResolvableType eventType) {
Object source = event.getSource(); Class<?> sourceType = (source != null ? source.getClass() : null); // 通过事件类型和事件源类型构建缓存key ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);
// Quick check for existing entry on ConcurrentHashMap... // 是否有缓存,有缓存就快速返回 ListenerRetriever retriever = this.retrieverCache.get(cacheKey); if (retriever != null) { return retriever.getApplicationListeners(); }
if (this.beanClassLoader == null || (ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) && (sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) { ... 删掉了复杂逻辑,直接看下面else代码 } else { // No ListenerRetriever caching -> no synchronization necessary // 如何区分事件的执行逻辑 return retrieveApplicationListeners(eventType, sourceType, null); }}
复制代码
通过上面代码,我们发现,spring 是在本地对事件有缓存,通过事件类型和事件源类型构建 ListenerCacheKey。 但是还没有到核心点, 如何区分事件的呢?接着我们进入retrieveApplicationListeners(eventType, sourceType, null);中去寻找事情的真正原因。
/** * Actually retrieve the application listeners for the given event and source type. * @param eventType the event type * @param sourceType the event source type * @param retriever the ListenerRetriever, if supposed to populate one (for caching purposes) * @return the pre-filtered list of application listeners for the given event and source type */private Collection<ApplicationListener<?>> retrieveApplicationListeners( ResolvableType eventType, @Nullable Class<?> sourceType, @Nullable ListenerRetriever retriever) {
List<ApplicationListener<?>> allListeners = new ArrayList<>(); Set<ApplicationListener<?>> listeners; Set<String> listenerBeans; synchronized (this.retrievalMutex) { listeners = new LinkedHashSet<>(this.defaultRetriever.applicationListeners); listenerBeans = new LinkedHashSet<>(this.defaultRetriever.applicationListenerBeans); } // 已经加装的Listener for (ApplicationListener<?> listener : listeners) { if (supportsEvent(listener, eventType, sourceType)) { if (retriever != null) { retriever.applicationListeners.add(listener); } allListeners.add(listener); } } // beanName的listener if (!listenerBeans.isEmpty()) { BeanFactory beanFactory = getBeanFactory(); for (String listenerBeanName : listenerBeans) { try { Class<?> listenerType = beanFactory.getType(listenerBeanName); if (listenerType == null || supportsEvent(listenerType, eventType)) { ApplicationListener<?> listener = beanFactory.getBean(listenerBeanName, ApplicationListener.class); if (!allListeners.contains(listener) && supportsEvent(listener, eventType, sourceType)) { if (retriever != null) { if (beanFactory.isSingleton(listenerBeanName)) { retriever.applicationListeners.add(listener); } else { retriever.applicationListenerBeans.add(listenerBeanName); } } allListeners.add(listener); } } } catch (NoSuchBeanDefinitionException ex) { // Singleton listener instance (without backing bean definition) disappeared - // probably in the middle of the destruction phase } } } AnnotationAwareOrderComparator.sort(allListeners); if (retriever != null && retriever.applicationListenerBeans.isEmpty()) { retriever.applicationListeners.clear(); retriever.applicationListeners.addAll(allListeners); } return allListeners;}
复制代码
代码很简单,就是把 beanName 的 Listener 和 listener 实现类拿出去,找出支持 eventType 和 sourceType 的。 自此,我们也弄懂了,是如何添加 listener 的,大功告成!!!
别捉急,我们得把上面看的代码在整理一下,把流程图完善一下,如下图所示:
3 大功告成
至此,分析完了整个流程,弄懂了 Spring 整个事件广播流程,是不是特别简单?!!!!, 没搞懂的时候觉得难,仔细看一下发现如此简单哇! 给自己加个油,可以的!
4. Spring 中的初始化流程与 Nacos 中的使用
这部分知识还有点多,暂时先不写,先放在下篇文章中,下篇文章将具体说一下我的分析 ISSUE 的流程,以及我的发现-----Nacos 中如何使用事件来将服务注册到 Nacos Discover 中的。 还有一个就是 Spring 初始化流程(这个初始化流程也包含了 Nacos 注册的一个小问题:当服务不是 web 服务的时候,Nacos 将不会注册这个服务,不知道算不算问题? 下次在接着说)。
5. 本文为个人分析,如果有问题欢迎指出, 谢谢啦!
评论