一文读懂 Guava EventBus(订阅\发布事件)
作者:京东科技 刘子洋
背景
最近项目出现同一消息发送多次的现象,对下游业务方造成困扰,经过排查发现使用 EventBus 方式不正确。也借此机会学习了下 EventBus 并进行分享。以下为分享内容,本文主要分为五个部分,篇幅较长,望大家耐心阅读。
1、简述:简单介绍 EventBus 及其组成部分。
2、原理解析:主要对 listener 注册流程及 Event 发布流程进行解析。
3、使用指导:EventBus 简单的使用指导。
4、注意事项:在使用 EventBus 中需要注意的一些隐藏逻辑。
5、分享时提问的问题
6、项目中遇到的问题:上述问题进行详细描述并复现场景。
1、简述
1.1、概念
下文摘自 EventBus 源码注释,从注释中可以直观了解到他的功能、特性、注意事项。
【源码注释】
Dispatches events to listeners, and provides ways for listeners to register themselves.
The EventBus allows publish-subscribe-style communication between components without requiring the components to explicitly register with one another (and thus be aware of each other). It is designed exclusively to replace traditional Java in-process event distribution using explicit registration. It is not a general-purpose publish-subscribe system, nor is it intended for interprocess communication.
Receiving Events
To receive events, an object should:
Expose a public method, known as the event subscriber, which accepts a single argument of the type of event desired;
Mark it with a Subscribe annotation;
Pass itself to an EventBus instance's register(Object) method.
Posting Events
To post an event, simply provide the event object to the post(Object) method. The EventBus instance will determine the type of event and route it to all registered listeners.
Events are routed based on their type — an event will be delivered to any subscriber for any type to which the event is assignable. This includes implemented interfaces, all superclasses, and all interfaces implemented by superclasses.
When post is called, all registered subscribers for an event are run in sequence, so subscribers should be reasonably quick. If an event may trigger an extended process (such as a database load), spawn a thread or queue it for later. (For a convenient way to do this, use an AsyncEventBus.)
Subscriber Methods
Event subscriber methods must accept only one argument: the event.
Subscribers should not, in general, throw. If they do, the EventBus will catch and log the exception. This is rarely the right solution for error handling and should not be relied upon; it is intended solely to help find problems during development.
The EventBus guarantees that it will not call a subscriber method from multiple threads simultaneously, unless the method explicitly allows it by bearing the AllowConcurrentEvents annotation. If this annotation is not present, subscriber methods need not worry about being reentrant, unless also called from outside the EventBus.
Dead Events
If an event is posted, but no registered subscribers can accept it, it is considered "dead." To give the system a second chance to handle dead events, they are wrapped in an instance of DeadEvent and reposted.
If a subscriber for a supertype of all events (such as Object) is registered, no event will ever be considered dead, and no DeadEvents will be generated. Accordingly, while DeadEvent extends Object, a subscriber registered to receive any Object will never receive a DeadEvent.
This class is safe for concurrent use.
See the Guava User Guide article on EventBus.
Since:
10.0
Author:
Cliff Biffle
1.2、系统流程
1.3、组成部分
1.3.1、调度器
EventBus、AsyncEventBus 都是一个调度的角色,区别是一个同步一个异步。
EventBus
AsyncEventBus
1.3.2、事件承载器
Event
DeadEvent
1.3.3、事件注册中心
SubscriberRegistry
1.3.4、事件分发器
Dispatcher
Dispatcher 有三个子类,用以满足不同的分发情况
1.PerThreadQueuedDispatcher
2.LegacyAsyncDispatcher
3.ImmediateDispatcher
1.3.4、订阅者
Subscriber
SynchronizedSubscriber
2、原理解析
2.1、主体流程
listener 通过 EventBus 进行注册。
SubscriberRegister 会根据 listener、listener 中含有【@Subscribe】注解的方法及各方法参数创建 Subscriber 对象,并将其维护在 Subscribers(ConcurrentMap 类型,key 为 event 类对象,value 为 subscriber 集合)中。
publisher 发布事件 Event。
发布 Event 后,EventBus 会从 SubscriberRegister 中查找出所有订阅此事件的 Subscriber,然后让 Dispatcher 分发 Event 到每一个 Subscriber。
流程如下:
2.2、listener 注册原理
2.2.1、listener 注册流程
缓存所有含有 @Subscribe 注解方法到 subscriberMethodsCache(LoadingCache<Class<?>, ImmutableList>, key 为 listener,value 为 method 集合)。
listener 注册。
2.2.2、原理分析
获取含有 @Subscribe 注释的方法进行缓存
找到所有被【@Subscribe】修饰的方法,并进行缓存
注意!!!这两个方法被 static 修饰,类加载的时候就进行寻找
订阅者唯一标识是【方法名+入参】
注册订阅者
1.注册方法
创建 Subscriber 时,如果 method 含有【@AllowConcurrentEvents】注释,则创建 SynchronizedSubscriber,否则创建 Subscriber
2、获取所有订阅者
3、从缓存中获取所有订阅方法
2.3、Event 发布原理
2.3.1、发布主体流程
publisher 发布事件 Event。
EventBus 根据 Event 类对象从 SubscriberRegistry 中获取所有订阅者。
将 Event 和 eventSubscribers 交由 Dispatcher 去分发。
Dispatcher 将 Event 分发给每个 Subscribers。
Subscriber 利用反射执行订阅者方法。
图中画出了三个 Dispatcher 的分发原理。
2.3.2、原理分析
创建缓存
缓存 EventMsg 所有超类
注意!!!此处是静态方法,因此在代码加载的时候就会缓存 Event 所有超类。
发布 Event 事件
此方法是发布事件时调用的方法。
获取所有订阅者
1、从缓存中获取所有订阅者
2、获取 Event 超类
事件分发
1、分发入口
2、分发器分发
2.1、ImmediateDispatcher
来了一个事件则通知对这个事件感兴趣的订阅者。
2.2、PerThreadQueuedDispatcher(EventBus 默认选项)
在同一个线程 post 的 Event 执行顺序是有序的。用 ThreadLocal<Queue> queue 来实现每个线程的 Event 有序性,在把事件添加到 queue 后会有一个 ThreadLocal dispatching 来判断当前线程是否正在分发,如果正在分发,则这次添加的 event 不会马上进行分发而是等到 dispatching 的值为 false(分发完成)才进行。
源码如下:
2.3、LegacyAsyncDispatcher(AsyncEventBus 默认选项)
会有一个全局的队列 ConcurrentLinkedQueue queue 保存 EventWithSubscriber(事件和 subscriber),如果被不同的线程 poll,不能保证在 queue 队列中的 event 是有序发布的。源码如下:
执行订阅者方法
方法入口是 dispatchEvent,源码如下:
由于 Subscriber 有两种,因此执行方法也有两种:
1.Subscriber(非线程安全)
2.SynchronizedSubscriber(线程安全)
注意!!!执行方法会加同步锁
3、使用指导
3.1、主要流程
3.2、流程详解
1、创建 EventBus、AsyncEventBus Bean
在项目中统一配置全局单例 Bean(如特殊需求,可配置多例)
2、定义 EventMsg
设置消息载体。
3、注册 Listener
注册 Listener,处理事件
注意! 在使用 PostConstruct 注释进行注册时,需要注意子类会执行父类含有 PostConstruct 注释的方法。
3、事件发布
封装统一发布事件的 Bean,然后通过 Bean 注入到需要发布的 Bean 里面进行事件发布。
此处对 EventBus 进行了统一封装收口操作,主要考虑的是如果做一些操作,直接改这一处就可以。如果不需要封装,可以在使用的地方直接注入 EventBus 即可。
4、注意事项
4.1、循环分发事件
如果业务流程较长,切记梳理好业务流程,不要让事件循环分发。
目前 EventBus 没有对循环事件进行处理。
4.2、使用 @PostConstrucrt 注册 listener
子类在执行实例化时,会执行父类 @PostConstrucrt 注释。 如果 listenerSon 继承 listenerFather,当两者都使用 @PostConstrucrt 注册订阅方法时,子类也会调用父类的注册方法进行注册订阅方法。由于 EventBus 机制,子类注册订阅方法时,也会注册父类的监听方法
Subscriber 唯一标志是(listener+method),因此在对同一方法注册时,由于不是同一个 listener,所以对于 EventBus 是两个订阅方法。
因此,如果存在 listenerSon、listenerFather 两个 listener,且 listenerSon 继承 listenerFather。当都使用 @PostConstrucrt 注册时,会导致 listenerFather 里面的订阅方法注册两次。
4.3、含有继承关系的 listener
当注册 listener 含有继承关系时,listener 处理 Event 消息时,listener 的父类也会处理该消息。
4.3.1、继承关系的订阅者
4.3.2、原理
子类 listener 注册,父类 listener 也会注册
4.4、含有继承关系的 Event
如果作为参数的 Event 有继承关系,使用 EventBus 发布 Event 时,Event 父类的监听者也会对 Event 进行处理。
4.4.1、执行结果
4.4.2、原理
在分发消息的时候,会获取所有订阅者数据(Event 订阅者和 Event 超类的订阅者),然后进行分发数据。
获取订阅者数据如下图:
缓存 Event 及其超类的类对象,key 为 Event 类对象。
5、分享提问问题
问题 1:PerThreadQueuedDispatcherd 里面的队列,是否是有界队列?
有界队列,最大值为 int 的最大值 (2147483647),源码如下图:
问题 2:dispatcher 分发给订阅者是否有序?
EventBus:同步事件总线:
同一个事件的多个订阅者,在接收到事件的顺序上面有不同。谁先注册到 EventBus 的,谁先执行(由于 base 使用的是 PostConstruct 进行注册,因此跟不同 Bean 之间的初始化顺序有关系)。如果是在同一个类中的两个订阅者一起被注册到 EventBus 的情况,收到事件的顺序跟方法名有关。
AsyncEventBus:异步事件总线:同一个事件的多个订阅者,它们的注册顺序跟接收到事件的顺序上没有任何联系,都会同时收到事件,并且都是在新的线程中,异步并发的执行自己的任务。
问题 3:EventBus 与 SpringEvent 的对比?
使用方式比较
使用场景比较
参考链接https://www.cnblogs.com/shoren/p/eventBus_springEvent.html
问题 4:EventBus 的使用场景,结合现有使用场景考虑是否合适?
EventBus 暂时不适用,主要有一下几个点:
EventBus 不支持事务,项目在更新、创建商品时,最好等事务提交成功后,再发送 MQ 消息(主要问题点)
EventBus 不支持设置同一消息的订阅者消费顺序。
EventBus 不支持消息过滤。SpringEvent 支持消息过滤
6.项目中遇到的问题
6.1、问题描述
商品上架时会触发渠道分发功能,会有两步操作
1、创建一条分发记录,并对外发送一条未分发状态的商品变更消息(通过 eventBus 事件发送消息)。
2、将分发记录改为审核中(需要审核)或审核通过(不需要审核),并对外发送一条已分发状态的商品变更消息(通过 eventBus 事件发送消息)。
所以分发会触发两条分发状态不同的商品变更消息,一条是未分发,另一条是已分发。实际发送了两条分发状态相同的商品变更消息,状态都是已分发。
6.2、原因
我们先来回顾下 EventBus 监听者处理事件时有三种策略,这是根本原因:
ImmediateDispatcher:来一个事件马上进行处理。
PerThreadQueuedDispatcher(eventBus 默认选项,项目中使用此策略):在同一个线程 post 的 Event,执行的顺序是有序的。用 ThreadLocal<Queue> queue 来实现每个线程 post 的 Event 是有序的,在把事件添加到 queue 后会有一个 ThreadLocal dispatching 来判断当前线程是否正在分发,如果正在分发,则这次添加的 event 不会马上进行分发而是等到 dispatching 的值为 false 才进行。
LegacyAsyncDispatcher(AsyncEventBus 默认选项):会有一个全局的队列 ConcurrentLinkedQueue queue 保存 EventWithSubscriber(事件和 subscriber),如果被不同的线程 poll 不能保证在 queue 队列中的 event 是有序发布的。
详情可见上文中的【2.3.4、事件分发】
再看下项目中的逻辑:
6.3、场景复现
在 handler 中对静态变量进行两次+1 操作,每操作一步发送一条事件,此处假设静态变量为分发状态。
6.4、解决办法
目前 Dispatcher 包用 default 修饰,使用者无法指定 Dispatcher 策略。并且 ImmediateDispatcher 使用 private 修饰。
因此目前暂无解决非同步问题,只能在业务逻辑上进行规避。
其实可以修改源码并发布一个包自己使用,但是公司安全规定不允许这样做,只能通过业务逻辑上进行规避,下图是 github 上对此问题的讨论。
7、总结
如果项目中需要使用异步解耦处理一些事项,使用 EventBus 还是比较方便的。
版权声明: 本文为 InfoQ 作者【京东科技开发者】的原创文章。
原文链接:【http://xie.infoq.cn/article/25bc3a4d31800a350edd219ad】。文章转载请联系作者。
评论