写点什么

【云原生】Nacos 中的事件发布与订阅 -- 观察者模式

  • 2022 年 9 月 12 日
    江西
  • 本文字数:7197 字

    阅读完需:约 24 分钟

作者石臻臻,CSDN 博客之星 Top5Kafka Contributornacos Contributor华为云 MVP,腾讯云 TVP,滴滴 Kafka 技术专家 KnowStreaming


KnowStreaming 是滴滴开源的Kafka运维管控平台, 有兴趣一起参与参与开发的同学,但是怕自己能力不够的同学,可以联系我,当你导师带你参与开源!

1EventDispatcher

EventDispatcher 在 Nacos 中是一个事件发布与订阅的类,也就是我们经常使用的 Java 设计模式——观察者模式

一般发布与订阅主要有三个角色

  • 事件:表示某些类型的事件动作,例如 Nacos 中的 本地数据发生变更事件 LocalDataChangeEvent

  • 事件源 : 事件源可以看成是一个动作,某个事件发生的动作,例如 Nacos 中本地数据发生了变更,就会通知给所有监听该事件的监听器

  • 事件监听器: 事件监听器监听到事件源之后,会执行自己的一些业务处理,监听器必须要有回调方法供事件源回调

一个监听器可以监听多个事件,一个事件也可以被多个监听器监听

那我们看看这个类中的角色

2 事件

Event

    /**事件定义接口,所有事件继承这个空接口**/    public interface Event {    }
复制代码

LocalDataChangeEvent

/** * 本地数据发生变更的事件。 * @author Nacos */public class LocalDataChangeEvent implements Event {}
复制代码

事件监听器

AbstractEventListener

 /**抽象事件监听器; 每个监听器需要实现onEvent()处理事件,和interest()将要监听的事件列表**/    static public abstract class AbstractEventListener {        public AbstractEventListener() {            /*自动注册到*/            EventDispatcher.addEventListener(this);        }        /**感兴趣的事件列表**/        abstract public List<Class<? extends Event>> interest();  /**处理事件**/        abstract public void onEvent(Event event);    }
复制代码

LongPollingService

/** * 长轮询服务。负责处理 * * @author Nacos */@Servicepublic class LongPollingService extends AbstractEventListener { @Override    public List<Class<? extends Event>> interest() {        List<Class<? extends Event>> eventTypes = new ArrayList<Class<? extends Event>>();        eventTypes.add(LocalDataChangeEvent.class);        return eventTypes;    }    @Override    public void onEvent(Event event) {        if (isFixedPolling()) {            // ignore        } else {            if (event instanceof LocalDataChangeEvent) {                LocalDataChangeEvent evt = (LocalDataChangeEvent)event;                scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));            }        }    }}
复制代码

事件分发类

EventDispatcher

public class EventDispatcher {    /**事件与事件监听器的数据中心; 一个事件可以对应着多个监听器**/    static final CopyOnWriteArrayList<Entry> LISTENER_HUB = new CopyOnWriteArrayList<Entry>(); /**     * 新增监听器     */    static public void addEventListener(AbstractEventListener listener) {        for (Class<? extends Event> type : listener.interest()) {            getEntry(type).listeners.addIfAbsent(listener);        }    }    /**     * 事件源调用的接口动作,告知某个事件发生了     */    static public void fireEvent(Event event) {        if (null == event) {            throw new IllegalArgumentException();        }        for (AbstractEventListener listener : getEntry(event.getClass()).listeners) {            try {                listener.onEvent(event);            } catch (Exception e) {                log.error(e.toString(), e);            }        }    }
复制代码

事件源

例如当删除配置文件的时候,就需要触发本地数据变更事件,则需要调用

EventDispatcher.fireEvent(new LocalDataChangeEvent(groupKey));
复制代码

调用了 fireEvent 之后所有监听这个 Event 的监听器都将执行listener.onEvent(event);


事件发布与订阅的使用方法有很多,但是基本模式都是一样的---观察者模式;我们介绍一下其他的用法

3Google Guava 中的 EventBus

EventBus 是 Guava 的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现。对于事件监听和发布订阅模式,EventBus 是一个非常优雅和简单解决方案,我们不用创建复杂的类和接口层次结构。

EventBucket

我们自定义一个类 EventBucket,来初始化及注册一些监听器(订阅者)

@Componentpublic class EventBucket {    private static Logger logger = LoggerFactory.getLogger(EventBucket.class);    /**事件总线**/    private static EventBus asyncEventBus = new AsyncEventBus("asyncEventBus", Executors.newFixedThreadPool(5));    private static AtomicBoolean isInit = new AtomicBoolean(false);    private final List<AsyncListener> asyncListenerList;    /**将所有类型为AsyncListener的监听器注入**/    @Autowired    public EventBucket(List<AsyncListener> asyncListenerList) {        this.asyncListenerList = asyncListenerList;    }    @PostConstruct    public synchronized void init() {        //要将所有的事件监听者都在 EventBus中去注册        if (isInit.compareAndSet(false, true)) {            asyncListenerList.forEach(a -> asyncEventBus.register(a));        }    }    /**发送事件**/    public static void post(BaseEvent event) {        try {            asyncEventBus.post(event);        } catch (Throwable e) {            logger.error("EventBucket发送事件出错: " + e.getMessage(), e);        }    }}

复制代码

BaseEvent

定义 BaseEvent 这个类有个 post 方法,用来发送事件的;所有的**事件必须继承此类

public class BaseEvent {    public void post(){        EventBucket.post(this);    }}
复制代码

AsyncListener

定义一个监听器空接口,所有继承此接口的监听器类都将被注册到 EventBus 中;

public interface AsyncListener {}
复制代码

上面定义好了基本的类,那我们下面测试怎么使用发布以及订阅首先订阅一个事件 TestEvent

public class TestEvent extends BaseEvent {    private Integer id;    public Integer getId() {        return id;    }    public void setId(Integer id) {        this.id = id;    }}
复制代码

然后定义一个监听器 TestAsyncListener

@Componentpublic class TestAsyncListener implements AsyncListener {    @Subscribe    public void testEvent(TestEvent testEvent){        System.out.print("我是TestAsyncListener接收到了TestEvent通知"+testEvent.toString());    }    @Subscribe    public void test2Event(Test2Event test2Event){        System.out.print("我是TestAsyncListener接收到了test2Event通知"+test2Event.toString());    }}
复制代码

用注解@Subscribe 就可以直接订阅事件了;那么接下来开始发送一个事件;我们再 SpringBoot 启动完之后调用一下发送事件通知

@SpringBootApplicationpublic class ClientsApplication {    public static void main(String[] args) {        SpringApplication.run(ClientsApplication.class, args);        TestEvent event = new TestEvent();        event.setId(1);        //发布通知        event.post();    }}

复制代码

启动完成之后,立马打印了


4Spring 事件驱动机制

这篇博客写的比较详细可以前往阅读Spring事件驱动机制

5Nacos 中使用的监听扩展接口

  • [ ] SpringApplicationRunListener

  • [ ] ApplicationListener

SpringApplicationRunListener

SpringApplicationRunListener 接口的作用主要就是在 Spring Boot 启动初始化的过程中可以通过 SpringApplicationRunListener 接口回调来让用户在启动的各个流程中可以加入自己的逻辑。它也是 观察者模式,Spring 为我们提供了这个监听器的扩展接口;它监听的就是 SpringBoot 启动初始化中下面的各个事件

SpringBoot 启动过程的关键事件(按照触发顺序)包括:1.开始启动 2.Environment 构建完成 3.ApplicationContext 构建完成 4.ApplicationContext 完成加载 5.ApplicationContext 完成刷新并启动 6.启动完成 7.启动失败

package org.springframework.boot;public interface SpringApplicationRunListener {    // 在run()方法开始执行时,该方法就立即被调用,可用于在初始化最早期时做一些工作    void starting();    // 当environment构建完成,ApplicationContext创建之前,该方法被调用    void environmentPrepared(ConfigurableEnvironment environment);    // 当ApplicationContext构建完成时,该方法被调用    void contextPrepared(ConfigurableApplicationContext context);    // 在ApplicationContext完成加载,但没有被刷新前,该方法被调用    void contextLoaded(ConfigurableApplicationContext context);    // 在ApplicationContext刷新并启动后,CommandLineRunners和ApplicationRunner未被调用前,该方法被调用    void started(ConfigurableApplicationContext context);    // 在run()方法执行完成前该方法被调用    void running(ConfigurableApplicationContext context);    // 当应用运行出错时该方法被调用    void failed(ConfigurableApplicationContext context, Throwable exception);}
复制代码

StartingSpringApplicationRunListener

-在这个监听类中,主要是做了一些系统属性的设置;如:

nacos.mode=stand alone / clusternacos.function.mode=All/config/namingnacos.local.ip=InetUtils.getSelfIp()
复制代码


@Override    public void environmentPrepared(ConfigurableEnvironment environment) {        if (STANDALONE_MODE) {            System.setProperty(MODE_PROPERTY_KEY_STAND_MODE, "stand alone");        } else {            System.setProperty(MODE_PROPERTY_KEY_STAND_MODE, "cluster");        }        if (FUNCTION_MODE == null) {           System.setProperty(MODE_PROPERTY_KEY_FUNCTION_MODE, "All");        } else if(SystemUtils.FUNCTION_MODE_CONFIG.equals(FUNCTION_MODE)){            System.setProperty(MODE_PROPERTY_KEY_FUNCTION_MODE, SystemUtils.FUNCTION_MODE_CONFIG);        } else if(SystemUtils.FUNCTION_MODE_NAMING.equals(FUNCTION_MODE)) {            System.setProperty(MODE_PROPERTY_KEY_FUNCTION_MODE, SystemUtils.FUNCTION_MODE_NAMING);        }        System.setProperty(LOCAL_IP_PROPERTY_KEY, LOCAL_IP);    }
复制代码
  • 还有顺便再启动结束之前,每秒中打印一次日志 Nacos is starting...

ApplicationListener

ApplicationListener 就是 spring 的监听器,能够用来监听事件,典型的观察者模式

@FunctionalInterfacepublic interface ApplicationListener<E extends ApplicationEvent> extends EventListener { /**  * Handle an application event.  * @param event the event to respond to  */ void onApplicationEvent(E event);}
复制代码

ApplicationEvent事件的抽象类; 具体的事件必须继承这个类;

Nacos 中StandaloneProfileApplicationListener

public class StandaloneProfileApplicationListener implements ApplicationListener<ApplicationEnvironmentPreparedEvent>,    PriorityOrdered {    private static final Logger logger = LoggerFactory.getLogger(StandaloneProfileApplicationListener.class);    @Override    public void onApplicationEvent(ApplicationEnvironmentPreparedEvent event) {        ConfigurableEnvironment environment = event.getEnvironment();        if (environment.getProperty(STANDALONE_MODE_PROPERTY_NAME, boolean.class, false)) {            environment.addActiveProfile(STANDALONE_SPRING_PROFILE);        }        if (logger.isInfoEnabled()) {            logger.info("Spring Environment's active profiles : {} in standalone mode : {}",                Arrays.asList(environment.getActiveProfiles()),                STANDALONE_MODE            );        }    }    @Override    public int getOrder() {        return HIGHEST_PRECEDENCE;    }}
复制代码

这里的监听器的泛型传的ApplicationEnvironmentPreparedEvent这个事件是 SpringBoot 内置的事件;

SpringApplication 启动并且 Environment 首次可用于检查和修改时发布的事件,也就是说通过 ApplicationEnvironmentPreparedEvent 可以拿到 Environment 的属性;

在这里这个监听器的作用就是拿到ConfigurableEnvironment,然后如果是单机模式standalone就设置一下ActiveProfile

6EnvironmentPostProcessor 加载外部配置文件

SpringBoot 支持动态的读取文件,留下的扩展接口org.springframework.boot.env.EnvironmentPostProcessor。这个接口是 spring 包下的,使用这个进行配置文件的集中管理,而不需要每个项目都去配置配置文件。

NacosDefaultPropertySourceEnvironmentPostProcessor 加载 Nacos 配置文件

加载这个类是加载 core 模块下面的META-INF/nacos-default.properties 配置文件的;

public class NacosDefaultPropertySourceEnvironmentPostProcessor implements EnvironmentPostProcessor, Ordered {@Override    public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {        ResourceLoader resourceLoader = getResourceLoader(application);        processPropertySource(environment, resourceLoader);    }}
复制代码

7Spring Factories SPI 扩展机制

Spring Boot 中有一种非常解耦的扩展机制:Spring Factories。这种扩展机制实际上是仿照 Java 中的 SPI 扩展机制来实现的

简单的总结下 java SPI 机制的思想。我们系统里抽象的各个模块,往往有很多不同的实现方案,比如日志模块的方案,xml 解析模块、jdbc 模块的方案等。面向的对象的设计里,我们一般推荐模块之间基于接口编程,模块之间不对实现类进行硬编码。一旦代码里涉及具体的实现类,就违反了可拔插的原则,如果需要替换一种实现,就需要修改代码。为了实现在模块装配的时候能不在程序里动态指明,这就需要一种服务发现机制;

在 Dubbo 中也定义了 SPI 机制;

在 Spring 中也有一种类似与 Java SPI 的加载机制。它在 META-INF/spring.factories 文件中配置接口的实现类名称,然后在程序中读取这些配置文件并实例化。这种自定义的 SPI 机制是 Spring Boot Starter 实现的基础。

我们看上面说的 Nacos 中的几个类,并没有打上 @Component 等等 Spring 中的注解,没有这些注解那么他们是怎么被加载到 Spring 容器中被管理的呢?

我们打开文件 core/resources/META-INF/spring.factories

# ApplicationListenerorg.springframework.context.ApplicationListener=\com.alibaba.nacos.core.listener.StandaloneProfileApplicationListener
# EnvironmentPostProcessororg.springframework.boot.env.EnvironmentPostProcessor=\com.alibaba.nacos.core.env.NacosDefaultPropertySourceEnvironmentPostProcessor
# SpringApplicationRunListenerorg.springframework.boot.SpringApplicationRunListener=\com.alibaba.nacos.core.listener.LoggingSpringApplicationRunListener,\com.alibaba.nacos.core.listener.StartingSpringApplicationRunListener
复制代码

上面提及到的几个类的全类名都在这个文件中;

  • Spring Factories 实现原理是什么 spring-core 包里定义了 SpringFactoriesLoader 类,这个类实现了检索 META-INF/spring.factories 文件,并获取指定接口的配置的功能

具体的实现原理,后面可以再写一篇文章;

发布于: 刚刚阅读数: 3
用户头像

关注公众号: 石臻臻的杂货铺 获取最新文章 2019.09.06 加入

进高质量滴滴技术交流群,只交流技术不闲聊 加 szzdzhp001 进群 20w字《Kafka运维与实战宝典》PDF下载请关注公众号:石臻臻的杂货铺

评论

发布
暂无评论
【云原生】Nacos中的事件发布与订阅--观察者模式_云原生_石臻臻的杂货铺_InfoQ写作社区