写点什么

基于 DolphinScheduler 抽取通用 EventBus 组件:支持延迟与事件驱动

作者:白鲸开源
  • 2025-05-23
    天津
  • 本文字数:2258 字

    阅读完需:约 7 分钟

基于DolphinScheduler抽取通用EventBus组件:支持延迟与事件驱动

一、思路来源

虽然 guava 中的 eventbus 已经很方便了,但是还是想要实现一个更为方便,同时支持延迟事件、同时带 eventbus 的组件。在 Apache DolphinScheduler 项目中,有一个 eventbus 的组件,这个组件写得挺好的,想着用在业务系统上,因此自己抽取了一下,拿到业务系统中来用。话不多说,我们把它抽取出来吧,同时进行 demo 的运行。还是要感谢 Apache DolphinScheduler 的开源,让这个很简单,但是很高效的组件能够让我们便捷地使用。

二、具体实现过程

首先是定义事件接口:


public interface IEvent {}
复制代码


针对事件接口,我们抽象出共性方法接口:延迟时间和过期时间。


public abstractclass AbstractDelayEvent implements IEvent, Delayed {    privatefinallong delayTime;    privatefinallong expireTime;

public long getDelayTime() { return delayTime; }
public long getExpireTime() { return expireTime; }
public AbstractDelayEvent(long delayTime) { this.delayTime = delayTime; this.expireTime = System.currentTimeMillis() + delayTime; }
@Override public long getDelay(TimeUnit unit) { long diff = expireTime - System.currentTimeMillis(); return unit.convert(diff, TimeUnit.MILLISECONDS); }
@Override public int compareTo(Delayed o) { if (this.expireTime < ((AbstractDelayEvent) o).expireTime) { return -1; } if (this.expireTime > ((AbstractDelayEvent) o).expireTime) { return1; } return0; }}
复制代码


主要的信息:


定义 eventbus 中,我们需要使用的方法:


public interface IEventBus<T extends IEvent> {
void publish(T event);
Optional<T> poll() throws InterruptedException;
Optional<T> peek();
Optional<T> remove();
boolean isEmpty();
int size();}
复制代码


可以看到主要是:发布事件、消费、移除、删除、判断当前的事件是否为空,以及事件大小等方法。其中最重要的方法为发布事件和消费处理事件方法。


针对当前的事件 bus 接口进行抽象,抽取出共性方法,方便复用:


public abstractclass AbstractDelayEventBus<T extends AbstractDelayEvent> implements IEventBus<T> {
protectedfinal DelayQueue<T> delayEventQueue = new DelayQueue<>();
@Override public void publish(T event) { delayEventQueue.put(event); }
@Override public Optional<T> poll() throws InterruptedException { // 使用带超时的 poll 方法,等待事件到期 return Optional.ofNullable(delayEventQueue.poll(1000, TimeUnit.MILLISECONDS)); }
@Override public Optional<T> peek() { return Optional.ofNullable(delayEventQueue.peek()); }
@Override public Optional<T> remove() { return Optional.ofNullable(delayEventQueue.poll()); }
@Override public boolean isEmpty() { return delayEventQueue.isEmpty(); }
@Override public int size() { return delayEventQueue.size(); }}
复制代码

三、测试运行效果

接下来,我们使用它,来进行处理:


定义自己的延迟事件:


如果是在业务中,可以定义自己的业务数据信息事件对象


public class MyDelayEvent extends AbstractDelayEvent {    private final String message;
public MyDelayEvent(long delayTime, String message) { super(delayTime); this.message = message; }
public String getMessage() { return message; }}
复制代码


定义事件延迟事件 bus


当然也可以进行自己的可定制化特性。


public class MyDelayEventBus extends AbstractDelayEventBus<MyDelayEvent> {    // 不需要额外的修改}
复制代码


进行测试:


思路:创建事件总线、发布事件,然后针对发布的事件信息,进行消费,然后等待延迟时间的到来,从而实现消费,从而进行业务的处理。


import java.util.Optional;
publicclass EventBusExample { public static void main(String[] args) throws InterruptedException { // 创建事件总线 IEventBus<MyDelayEvent> eventBus = new MyDelayEventBus();
// 发布单个事件 eventBus.publish(new MyDelayEvent(100, "Single Event")); System.out.println("After publish, event bus size: " + eventBus.size());
// 持续尝试消费事件 while (true) { Optional<MyDelayEvent> event = eventBus.poll(); if (event.isPresent()) { System.out.println("Received event: " + event.get().getMessage()); } else { System.out.println("No event received within the timeout."); break; } }
// 检查总线大小 System.out.println("Event bus size: " + eventBus.size()); }}
复制代码


运行结果:



可以看到实现自己的业务逻辑还是很方便的,可以自己实现吧,这里给出的代码是可以运行的。


  • 源码地址:https://gitee.com/null_713_2407/pratice

  • 参考:github:https://github.com/apache/dolphinscheduler


作者 | 刘亚洲

用户头像

白鲸开源

关注

一家开源原生的DataOps商业公司。 2022-03-18 加入

致力于打造下一代开源原生的DataOps 平台,助力企业在大数据和云时代,智能化地完成多数据源、多云及信创环境的数据集成、调度开发和治理,以提高企业解决数据问题的效率,提升企业分析洞察能力和决策能力。

评论

发布
暂无评论
基于DolphinScheduler抽取通用EventBus组件:支持延迟与事件驱动_开源_白鲸开源_InfoQ写作社区