死磕 Spark 事件总线——聊聊 Spark 中事件监听是如何实现的
Spark 中大量采用事件监听方式,实现 driver 端的组件之间的通信。本文就来解释一下 Spark 中事件监听是如何实现的
观察者模式和监听器
在设计模式中有一个观察者模式,该模式建立一种对象与对象之间的依赖关系,一个对象状态发生改变时立即通知其他对象,其他对象就据此作出相应的反应。其中发生改变的对象称之为观察目标(也有叫主题的),被通知的对象称之为观察者,可以有多个观察者注册到一个观察目标中,这些观察者之间没有联系,其数量可以根据需要增减。
事件驱动的异步化编程
Spark-Core 内部的事件框架实现了基于事件的异步化编程模式。它的最大好处是可以提升应用程序对物理资源的充分利用,能最大限度的压榨物理资源,提升应用程序的处理效率。缺点比较明显,降低了应用程序的可读性。
Spark 的基于事件的异步化编程框架由事件框架和异步执行线程池组成,应用程序产生的 Event 发送给 ListenerBus,ListenerBus 再把消息广播给所有的 Listener,每个 Listener 收到 Event 判断是否自己感兴趣的 Event,若是,会在 Listener 独享的线程池中执行 Event 所对应的逻辑程序块。
下图展示 Event、ListenerBus、Listener、Executor 的关系,从事件生成、事件传播、事件解释三个方面的视角来看:
我们从线程的视角来看,看异步化处理。异步化处理体现在事件传播、事件解释两个阶段,其中事件解释的异步化实现了我们的基于事件的异步化编程。
Spark 的实现
Spark-Core、Spark-Streaming 采用了分类的思路(分而治之)进行管理,每一大类事件都有独自的 Event、ListenerBus
Event
Spark-Core 的核心事件 trait 是 SparkListenerEvent,Spark-Straming 的核心事件 trait 是 StreamingListenerEvent
下图是各种事件实体类:
我们在定义事件需要注意哪些方面呢?我们以 SparkListenerTaskStart 为例,分析一个事件拥有哪些特征:
见名知义,SparkListenerTaskStart,一看名字我们就能猜到是 SparkListener 的一个任务启动事件。
触发条件,一个事件的触发条件必须清晰,能够清晰的描述一个行为,且行为宿主最好是唯一的。SparkListenerTaskStart 事件生成的宿主是 DAGScheduler,在 DAGScheduler 产生 BeginEvent 事件后生成 SparkListenerTaskStart。
事件传播,事件传播可选择 Point-Point 或者 BroadCast,这个可根据业务上的需要权衡、选择。Spark-Core、Spark-Streaming 的事件框架采用 BroadCast 模式。
事件解释,一个事件可以有一个或者多个解释。Spark-Core、Spark-Streaming 由于采用 BroadCast 模式,所以支持 Listener 对事件解释,原则一个 Listener 对一个事件只有一种解释。AppStatusListener、EventLoggingListener、ExecutorAllocationManager 等分别对 SparkListenerTaskStart 做了解释。 我们在设计事件框架上可根据实际需要借鉴以上四点,设计一个最恰当的事件框架。
Listner
Spark-Core 的核心监听 triat 是 SparkListener,Spark-Streaming 的核心监听 triat StreamingListener,两者都代表了一类监听的抽象
下图是一些监听实体类:
ListenerBus
监听器总线对象,Spark 程序在运行的过程中,Driver 端的很多功能都依赖于事件的传递和处理,而事件总线在这中间发挥着至关重要的纽带作用。事件总线通过异步线程,提高了 Driver 执行的效率。
Listener 注册到 ListenerBus 对象中,然后通过 ListenerBus 对象来实现事件监听(类似于计算机与周边设备之间的关系)
其 start 方法直接启动一个 dispatchThread,其核心逻辑就是不停地在一个事件队列 eventQueue 里取出事件,如果事件合法且 LiverListenerBus 没有被关停,就将事件通知给所有注册的 listener 中
其 dispatch 方法就是向事件队列里添加相应的事件。
ListenerBus 用于管理所有的 Listener,Spark-Core 和 Spark-Streaming 公用相同的 trait ListenerBus, 最终都是使用 AsyncEventQueue 类对 Listener 进行管理。
LiveListenerBus
管理所有注册的 Listener,为一类 Listener 创建一个唯一的 AsyncEventQueue,广播 Event 到所有的 Listener。默认可提供四类 AsyncEventQueue 分别为‘shared’、‘appStatus’、‘executorManagement’、‘eventLog’。
目前 Spark-Core 并没有放开类别设置,意谓着最多只能有上述四类,从设计的严谨上来讲分类并不是越多越好,每多一个类别,就会多一个 AsyncEventQueue 实例,每个实例中会包含一个事件传播的线程,对系统的资源占用还是比较多的。
异步事件处理线程 listenerThread
核心属性
核心方法
start
LiveListenerBus 在 SparkContext 的 setupAndStartListenerBus 中被初始化,并调用 start 方法启动 LiveListenerBus。
stop
停止 LiveListenerBus,它将等待队列事件被处理,但在停止后丢掉所有新的事件。需要注意 stop 可能会导致长时间的阻塞,执行 stop 方法的线程会被挂起,直到所有的 AsyncEventQueue(默认四个)中的 dispatch 线程都退出后执行 stop 主法的线程才会被唤醒。
post
采用广播的方式事件传播,这个过程很快,主线程只需要把事件传播给 AsyncEventQueue 即可,最后由 AsyncEventQueue 再广播给相应的 Listener
完整流程
图中的 DAGScheduler、SparkContext、BlockManagerMasterEndpoint、DriverEndpoint 及 LocalSchedulerBackend 都是 LiveListenerBus 的事件来源,它们都是通过调用 LiveListenerBus 的 post 方法将消息提交给事件队列,每 post 一个事件,信号量就加一。
listenerThread 不停的获取信号量,然后从事件队列中取出事件,取到事件,则调用 postForAll 把事件分发给已注册的监听器,否则,就是取到空事件,它明白这是事件总线搞的鬼,它调用了 stop 但是每 post 事件,从而停止事件总线线程。
作者:luckywind_509
链接:https://my.oschina.net/whucxf/blog/5127818
评论