EventBus 说明
EventBus 是 Guava 封装的事件处理机制,属于设计模式中的观察者模式(生产|消费者编程模型)。使用简单、优雅只关注业务本身,仅限 JVM 内部使用,缺点:不适用分布式,看实际业务可选用 MQ。
事件总线(EventBus)处理个服务、或业务间公共线路,而 EventBus 则是存储事件、处理时间中间服务,通过 post()往 EventBus 发送事件,由 EventBus 去调度对应订阅方(subscriber)消费处理,解耦观察者模式中订阅方与事件源之间强依赖关系。
如何使用
引入 Maven 依赖
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>20.0</version>
</dependency>
复制代码
场景举例(伪代码)
public class EventBusCenter {
private static EventBus eventBus = new EventBus();
private EventBusCenter() {
}
public static EventBus getInstance(){
return eventBus;
}
/**注册监听*/
public static void register(Object listener){
eventBus.register(listener);
}
/**取消注册*/
public static void unregister(Object listener){
eventBus.unregister(listener);
}
/**发布事件*/
public static void post(Object event){
eventBus.post(event);
}
}
复制代码
定义业务对象
public class OrderInfo {
private long orderId;
private String orderName;
}
public class OrderFailInfo{
private long orderId;
private String orderName;
private String msg;
}
复制代码
支付成功监听处理事件
public class OrderSuccessListener {
@Subscribe
public void orderSubscribe(OrderInfo event) {
// 订单业务监听
System.out.println("订单支付成功处理事件: " + event);
}
@Subscribe
public void ortherSubscribe(OrderInfo event) {
// 其它业务监听
System.out.println("订单支付成功其它处理事件: " + event);
}
}
复制代码
支付失败监听处理事件
public class OrderFailListener {
@Subscribe
public void orderSubscribe(OrderFailInfo event) {
// 订单业务监听
System.err.println("订单支付失败处理事件: " + event);
}
@Subscribe
public void ortherSubscribe(String event) {
// 其它业务监听
System.err.println("订单支付失败其它处理事件: " + event);
}
}
复制代码
测试
public class EventBusTest {
public static void main(String[] args) {
OrderSuccessListener successListener = new OrderSuccessListener();
OrderFailListener failListener = new OrderFailListener();
EventBusCenter.register(successListener);
EventBusCenter.register(failListener);
OrderInfo order = OrderInfo.builder().orderId(111L).orderName("订单名称xxx").build();
log.info("=====支付成功=======");
EventBusCenter.post(order);
log.info("=====支付失败=======");
OrderFailInfo orderFail = OrderFailInfo.builder().orderId(111L).orderName("订单名称xxx").msg("支付失败").build();
EventBusCenter.post(orderFail);
log.info("=====记录支付失败原因=======");
EventBusCenter.post("用户取消订单");
}
}
输出结果:
信息: =====支付成功=======
订单支付成功其它处理事件: OrderInfo(orderId=111, orderName=订单名称xxx)
订单支付成功处理事件: OrderInfo(orderId=111, orderName=订单名称xxx)
信息: =====支付失败=======
订单支付失败处理事件: OrderFailInfo(orderId=111, orderName=订单名称xxx, msg=支付失败)
信息: =====记录支付失败原因=======
订单支付失败其它处理事件: 用户取消订单
复制代码
使用说明:
源码分析
初始化 EvenBus
EventBus(String identifier, Executor executor, Dispatcher dispatcher, SubscriberExceptionHandler exceptionHandler) {
this.subscribers = new SubscriberRegistry(this);//所有观察者列表维护对象
this.identifier = (String) Preconditions.checkNotNull(identifier);// EventBus的处理标识符(方法名称)
//executor是事件分发过程中使用到的线程池,可以自己实现
this.executor = (Executor)Preconditions.checkNotNull(executor);
//Dispatcher类型的子类,对监听者分发策略,主要有3种方式
this.dispatcher = (Dispatcher)Preconditions.checkNotNull(dispatcher);
//异常处理策咯
this.exceptionHandler = (SubscriberExceptionHandler)Preconditions.checkNotNull(exceptionHandler);
}
复制代码
注册监听–>register()
作用:初始化监听者类型与监听方法的集合 subscribers,在发布事件场景由传入参数类型,匹配、并执行对应监听者方法
①获取指定监听者对应的全部观察者集合(一对多)
②获取对应**@Subscribe**观察者事件类型(即:方法参数类型)集合
③添加对应类型所有观察者到集合事件集合中(即:SubscriberRegistry 对象维护的 subscribers 集合)
subscribers 说明:
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers = Maps.newConcurrentMap();
复制代码
防止并发问题维护一个 ConcurrentMap 集合,Subscriber 集合对应 velue 使用的是 Java 中的 CopyOnWriteArraySet 集合,
主要作用:1、避免监听重复事件 2、适用于读多写少的场景:只要 register()才写入,基本都是查询
findAllSubscribers()说明
①获取注册监听对象所有包含**@Subscribe**的方法(如下图所示⤵️)
②通过反射获取对应方法、及监听参数类型
③放入当前初始化的 EventBus 对象里面,维护监听对象与监听方法的对应关系
首先从 subscriberMethodsCache 缓存中获取监听对象映射关系,如果缓存中不存在通过反射遍历所有包含**@Subscribe**的方法
发布事件–>post()
①getSubscribers()方法获取该事件对应的全部观察者
②Dispatcher-分发事件策略
ImmediateDispatcher:直接在当前线程中遍历所有的观察者并进行事件分发
LegacyAsyncDispatcher:使用全局队列,先将观察者依次放入队列,再顺序从队列中取出观察者对象进行事件分发
PerThreadQueuedDispatcher(默认):使用线程相关队列,会先获取当前线程的观察者队列,并将传入的观察者列表传入到该队列中;判断当前线程是否正在进行分发操作,如果没有在进行分发操作,就通过遍历上述队列进行事件分发
最后无论使用哪个分发器,都会执行 dispatchEvent()方法,通过反射(target、method、executor)多线程触发监听方法
评论