写点什么

观察者模式在 spring 中的应用

  • 2022-11-28
    北京
  • 本文字数:8696 字

    阅读完需:约 29 分钟

观察者模式在spring中的应用

作者:王子源

1 观察者模式简介

1.1 定义

指多个对象间存在一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新。这种模式有时又称作发布-订阅模式、模型-视图模式,它是对象行为型模式。

1.2 角色介绍

在观察者模式中,有以下几个角色。

主题也叫被观察者(Subject):

  1. 定义被观察者必须实现的职责,

  2. 它能动态的增加取消观察者,它一般是抽象类或者是实现类,仅仅完成作为被观察者必须实现的职责:

  3. 管理观察者并通知观察者。

观察者(Observer):观察者接受到消息后,即进行更新操作,对接收到的信息进行处理。

具体的被观察者(ConcreteSubject):定义被观察者自己的业务逻辑,同时定义对哪些事件进行通知。

具体的观察者(ConcreteObserver):具体的观察者,每个观察者接收到消息后的处理反应是不同的,每个观察者都有自己的处理逻辑。

1.3 观察者模式的适用场景

  1. 对象间存在一对多关系,一个对象的状态发生改变会影响其他对象。

  2. 当一个抽象模型有两个方面,其中一个方面依赖于另一方面时,可将这二者封装在独立的对象中以使它们可以各自独立地改变和复用。

  3. 实现类似广播机制的功能,不需要知道具体收听者,只需分发广播,系统中感兴趣的对象会自动接收该广播。

  4. 多层级嵌套使用,形成一种链式触发机制,使得事件具备跨域(跨越两种观察者类型)通知。

2 观察者模式在 Spring 中的应用

2.1 spring 的事件监听机制

Spring 事件机制是观察者模式的实现。ApplicationContext 中事件处理是由 ApplicationEvent 类和 ApplicationListener 接口来提供的。如果一个 Bean 实现了 ApplicationListener 接口,并且已经发布到容器中去,每次 ApplicationContext 发布一个 ApplicationEvent 事件,这个 Bean 就会接到通知。ApplicationEvent 事件的发布需要显示触发,要么 Spring 触发,要么我们编码触发。spring 内置事件由 spring 触发。我们先来看一下,如何自定义 spring 事件,并使其被监听和发布。

2.1.1 事件

事件,ApplicationEvent,该抽象类继承了 EventObject,EventObject 是 JDK 中的类,并建议所有的事件都应该继承自 EventObject。



public abstract class ApplicationEvent extends EventObject {    private static final long serialVersionUID = 7099057708183571937L;    private final long timestamp = System.currentTimeMillis();    public ApplicationEvent(Object source) {        super(source);    }    public final long getTimestamp() {        return this.timestamp;    }}
复制代码
2.1.2 监听器

ApplicationListener,是一个接口,该接口继承了 EventListener 接口。EventListener 接口是 JDK 中的,建议所有的事件监听器都应该继承 EventListener。



@FunctionalInterfacepublic interface ApplicationListener<E extends ApplicationEvent> extends EventListener {    void onApplicationEvent(E var1);}
复制代码
2.1.3 事件发布器

ApplicationEventPublisher,ApplicationContext 继承了该接口,在 ApplicationContext 的抽象实现类 AbstractApplicationContext 中做了实现下面我们来看一下

org.springframework.context.support.AbstractApplicationContext#publishEvent(java.lang.Object, org.springframework.core.ResolvableType)

protected void publishEvent(Object event, @Nullable ResolvableType eventType) {   Assert.notNull(event, "Event must not be null");   ApplicationEvent applicationEvent;   if (event instanceof ApplicationEvent) {      applicationEvent = (ApplicationEvent) event;   }   else {      applicationEvent = new PayloadApplicationEvent<>(this, event);      if (eventType == null) {         eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();      }   }   if (this.earlyApplicationEvents != null) {      this.earlyApplicationEvents.add(applicationEvent);   }   else {//获取当前注入的发布器,执行发布方法      getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);   }   if (this.parent != null) {      if (this.parent instanceof AbstractApplicationContext) {         ((AbstractApplicationContext) this.parent).publishEvent(event, eventType);      }      else {         this.parent.publishEvent(event);      }   }}
复制代码

我们可以看到,AbstractApplicationContext 中 publishEvent 方法最终执行发布事件的是 ApplicationEventMulticaster#multicastEvent 方法,下面我们再来一起看一下 multicastEvent 方法

public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {   ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));   Executor executor = getTaskExecutor();//拿到所有的监听器,如果异步执行器不为空,异步执行   for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {      if (executor != null) {         executor.execute(() -> invokeListener(listener, event));      }      else {//执行监听方法         invokeListener(listener, event);      }   }}    protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {        ErrorHandler errorHandler = getErrorHandler();        if (errorHandler != null) {            try {                doInvokeListener(listener, event);            }            catch (Throwable err) {                errorHandler.handleError(err);            }        }        else {            doInvokeListener(listener, event);        }    }    private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {        try {            listener.onApplicationEvent(event);        }        catch (ClassCastException ex) {            String msg = ex.getMessage();            if (msg == null || matchesClassCastMessage(msg, event.getClass())) {                Log logger = LogFactory.getLog(getClass());                if (logger.isTraceEnabled()) {                    logger.trace("Non-matching event type for listener: " + listener, ex);                }            }            else {                throw ex;            }        }    }
复制代码

上面介绍了非 spring 内置的事件发布和监听执行流程。总结一下



  1. 定义一个事件,该事件继承 ApplicationEvent

  2. 定义一个监听器,实现 ApplicationListener 接口

  3. 定义一个事件发布器,实现 ApplicationEventPublisherAware 接口

  4. 调用 ApplicationEventPublisher#publishEvent 方法.

2.2 spring 事件实现原理

上面我们讲到了 spring 事件的发布,那么 spring 事件发布之后,spring 是如何根据事件找到事件对应的监听器呢?我们一起来探究一下。

spring 的容器初始化过程想必大家都已十分了解,这里就不过多赘述,我们直接看 refresh 方法在 refresh 方法中,有这样两个方法,initApplicationEventMulticaster()和 registerListeners()

我们先来看 initApplicationEventMulticaster()方法

2.2.1 initApplicationEventMulticaster()

org.springframework.context.support.AbstractApplicationContext#initApplicationEventMulticaster

public static final String APPLICATION_EVENT_MULTICASTER_BEAN_NAME = "applicationEventMulticaster";protected void initApplicationEventMulticaster() {//获得beanFactory   ConfigurableListableBeanFactory beanFactory = getBeanFactory();//BeanFactory中是否有ApplicationEventMulticaster   if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {      this.applicationEventMulticaster =            beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);      if (logger.isTraceEnabled()) {         logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");      }   }   else {//如果BeanFactory中不存在,就创建一个SimpleApplicationEventMulticaster      this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);      beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);      if (logger.isTraceEnabled()) {         logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +               "[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");      }   }}
复制代码

上述代码我们可以看出,spring 先从 BeanFactory 中获取 applicationEventMulticaster 如果为空,则直接创建 SimpleApplicationEventMulticaster



2.2.2 registerListeners()

org.springframework.context.support.AbstractApplicationContext#registerListeners

registerListeners 是将各种实现了 ApplicationListener 的监听器注册到 ApplicationEventMulticaster 事件广播器中

protected void registerListeners() {   for (ApplicationListener<?> listener : getApplicationListeners()) {      getApplicationEventMulticaster().addApplicationListener(listener);   }   String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);   for (String listenerBeanName : listenerBeanNames) {//把监听器注册到事件发布器上      getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);   }   Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;   this.earlyApplicationEvents = null;   if (!CollectionUtils.isEmpty(earlyEventsToProcess)) {//如果内置监听事件集合不为空      for (ApplicationEvent earlyEvent : earlyEventsToProcess) {//执行spring内置的监听方法         getApplicationEventMulticaster().multicastEvent(earlyEvent);      }   }}
复制代码

这里解释一下 earlyApplicationListeners

earlyApplicationListeners 的本质还是 ApplicationListener。Spring 单例 Ban 的实例化是在 Refresh 阶段实例化的,那么用户自定义的一些 ApplicationListener 组件自然也是在这个阶段才初始化,但是 Spring 容器启动过程中,在 Refresh 完成之前还有很多事件:如 Spring 上下文环境准备等事件,这些事件又是 Spring 容器启动必须要监听的。所以 Spring 定义了一个 earlyApplicationListeners 集合,这个集合中的 Listener 在 factories 文件中定义好,在容器 Refresh 之前预先实例化好,然后就可以监听 Spring 容器启动过程中的所有事件。

当 registerListeners 方法执行完成,我们的监听器已经添加到多播器 SimpleApplicationEventMulticaster 中了,并且 earlyEvent 早期事件也已经执行完毕。但是我们发现,如果自定义了一个监听器去监听 spring 内置的事件,此时并没有被执行,那我们注册的监听器是如何被执行的呢?答案在 finishRefresh 方法中。

2.2.3 finishRefresh

org.springframework.context.support.AbstractApplicationContext#finishRefresh

protected void finishRefresh() {   clearResourceCaches();   initLifecycleProcessor();   getLifecycleProcessor().onRefresh();   //容器中的类全部初始化完毕后,触发刷新事件   publishEvent(new ContextRefreshedEvent(this));   LiveBeansView.registerApplicationContext(this);}
复制代码

如果我们想要实现在 spring 容器中所有 bean 创建完成后做一些扩展功能,我们就可以实现 ApplicationListener 这样我们就可以实现其功能了。至此,Spring 中同步的事件监听发布模式我们就讲解完了,当然 Spring 还支持异步的消息监听执行机制。

2.2.4 spring 中异步的监听执行机制

我们回过头来看一下 ApplicationEventMulticaster#pushEvent 方法

protected void publishEvent(Object event, @Nullable ResolvableType eventType) {   Assert.notNull(event, "Event must not be null");   ApplicationEvent applicationEvent;   if (event instanceof ApplicationEvent) {      applicationEvent = (ApplicationEvent) event;   }   else {      applicationEvent = new PayloadApplicationEvent<>(this, event);      if (eventType == null) {         eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();      }   }   if (this.earlyApplicationEvents != null) {      this.earlyApplicationEvents.add(applicationEvent);   }   else {    //获取当前注入的发布器,执行发布方法      getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);   }   if (this.parent != null) {      if (this.parent instanceof AbstractApplicationContext) {         ((AbstractApplicationContext) this.parent).publishEvent(event, eventType);      }      else {         this.parent.publishEvent(event);      }   }
复制代码

最终执行发布事件的是 ApplicationEventMulticaster#multicastEvent 方法,下面我们再来一起看一下 multicastEvent 方法

public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {   ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));   Executor executor = getTaskExecutor();//拿到所有的监听器,如果异步执行器不为空,异步执行   for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {      if (executor != null) {         executor.execute(() -> invokeListener(listener, event));      }      else {//执行监听方法         invokeListener(listener, event);      }   }}
复制代码

可以看到,异步事件通知主要依靠 SimpleApplicationEventMulticaster 类中的 Executor 去实现的,如果这个变量不配置的话默认事件通知是同步的, 否则就是异步通知了,要实现同时支持同步通知和异步通知就得从这里下手;我们上文已经分析过了在 initApplicationEventMulticaster 方法中有这样一段代码

if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {   this.applicationEventMulticaster =         beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);   if (logger.isTraceEnabled()) {      logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");   }}
复制代码

如果 BeanFactory 中已经有了 SimpleApplicationEventMulticaster 则不会重新创建,那么我们可以再 spring 中注册一个 SimpleApplicationEventMulticaster 并且向其中注入对应的 Executor 这样我们就可以得到一个异步执行监听的 SimpleApplicationEventMulticaster 了,我们的通知就会通过 Executor 异步执行。这样可以大大提高事件发布的效率。

在 springboot 项目中我们可以增加一个配置类来实现

@Configuration@EnableAsyncpublic class Config {    @Bean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME)    public SimpleApplicationEventMulticaster simpleApplicationEventMulticaster(){        SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();        simpleApplicationEventMulticaster.setTaskExecutor(taskExecutor());        return simpleApplicationEventMulticaster;    }    @Bean    public TaskExecutor taskExecutor() {        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();        executor.setCorePoolSize(5);        executor.setMaxPoolSize(20);        executor.setQueueCapacity(100);        executor.setKeepAliveSeconds(300);        executor.setThreadNamePrefix("thread-");        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());        executor.setWaitForTasksToCompleteOnShutdown(true);        return executor;    }}
复制代码

spring 项目中我们也可以增加如下 xml 配置

<!--定义事件异步处理--><bean id="commonTaskExecutor"          class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">        <!-- 线程池维持处于Keep-alive状态的线程数量。如果设置了allowCoreThreadTimeOut为true,该值可能为0。            并发线程数,想达到真正的并发效果,最好对应CPU的线程数及核心数 -->        <property name="corePoolSize" value="5" />        <!-- 最大线程池容量 -->        <property name="maxPoolSize" value="20" />        <!-- 超过最大线程池容量后,允许的线程队列数 -->        <property name="queueCapacity" value="100" />        <!-- 线程池维护线程所允许的空闲时间 .单位毫秒,默认为60s,超过这个时间后会将大于corePoolSize的线程关闭,保持corePoolSize的个数 -->        <property name="keepAliveSeconds" value="300" />        <!-- 允许核心线程超时: false(默认值)不允许超时,哪怕空闲;true则使用keepAliveSeconds来控制等待超时时间,最终corePoolSize的个数可能为0 -->        <property name="allowCoreThreadTimeOut" value="true" />        <!-- 线程池对拒绝任务(无线程可用)的处理策略 -->        <property name="rejectedExecutionHandler">            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />            <!-- java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中 -->            <!-- java.util.concurrent.ThreadPoolExecutor$AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 -->        </property>    </bean>    <!--名字必须是applicationEventMulticaster,因为AbstractApplicationContext默认找个-->    <bean id="applicationEventMulticaster" class="org.springframework.context.event.SimpleApplicationEventMulticaster">        <!--注入任务执行器 这样就实现了异步调用-->        <property name="taskExecutor" ref="commonTaskExecutor"></property> </bean><!--定义事件异步处理--><bean id="commonTaskExecutor"          class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">        <!-- 线程池维持处于Keep-alive状态的线程数量。如果设置了allowCoreThreadTimeOut为true,该值可能为0。            并发线程数,想达到真正的并发效果,最好对应CPU的线程数及核心数 -->        <property name="corePoolSize" value="5" />        <!-- 最大线程池容量 -->        <property name="maxPoolSize" value="20" />        <!-- 超过最大线程池容量后,允许的线程队列数 -->        <property name="queueCapacity" value="100" />        <!-- 线程池维护线程所允许的空闲时间 .单位毫秒,默认为60s,超过这个时间后会将大于corePoolSize的线程关闭,保持corePoolSize的个数 -->        <property name="keepAliveSeconds" value="300" />        <!-- 允许核心线程超时: false(默认值)不允许超时,哪怕空闲;true则使用keepAliveSeconds来控制等待超时时间,最终corePoolSize的个数可能为0 -->        <property name="allowCoreThreadTimeOut" value="true" />        <!-- 线程池对拒绝任务(无线程可用)的处理策略 -->        <property name="rejectedExecutionHandler">            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />            <!-- java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中 -->            <!-- java.util.concurrent.ThreadPoolExecutor$AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 -->        </property>    </bean>    <!--名字必须是applicationEventMulticaster,因为AbstractApplicationContext默认找个-->    <bean id="applicationEventMulticaster" class="org.springframework.context.event.SimpleApplicationEventMulticaster">        <!--注入任务执行器 这样就实现了异步调用-->        <property name="taskExecutor" ref="commonTaskExecutor"></property> </bean>
复制代码

3 小结

本文主要讲解了观察者模式在 spring 中的应用及事件监听机制,JDK 也有实现提供事件监听机制 Spring 的事件机制也是基于 JDK 来扩展的。Spring 的事件机制默认是同步阻塞的,想要提升对应的效率要考虑异步事件。


发布于: 刚刚阅读数: 3
用户头像

拥抱技术,与开发者携手创造未来! 2018-11-20 加入

我们将持续为人工智能、大数据、云计算、物联网等相关领域的开发者,提供技术干货、行业技术内容、技术落地实践等文章内容。京东云开发者社区官方网站【https://developer.jdcloud.com/】,欢迎大家来玩

评论

发布
暂无评论
观察者模式在spring中的应用_Java_京东科技开发者_InfoQ写作社区