写点什么

死磕 Spark 事件总线——聊聊 Spark 中事件监听是如何实现的

  • 2023-03-11
    湖南
  • 本文字数:4090 字

    阅读完需:约 13 分钟

Spark 中大量采用事件监听方式,实现 driver 端的组件之间的通信。本文就来解释一下 Spark 中事件监听是如何实现的

观察者模式和监听器

在设计模式中有一个观察者模式,该模式建立一种对象与对象之间的依赖关系,一个对象状态发生改变时立即通知其他对象,其他对象就据此作出相应的反应。其中发生改变的对象称之为观察目标(也有叫主题的),被通知的对象称之为观察者,可以有多个观察者注册到一个观察目标中,这些观察者之间没有联系,其数量可以根据需要增减。

事件驱动的异步化编程

Spark-Core 内部的事件框架实现了基于事件的异步化编程模式。它的最大好处是可以提升应用程序对物理资源的充分利用,能最大限度的压榨物理资源,提升应用程序的处理效率。缺点比较明显,降低了应用程序的可读性。


Spark 的基于事件的异步化编程框架由事件框架和异步执行线程池组成,应用程序产生的 Event 发送给 ListenerBus,ListenerBus 再把消息广播给所有的 Listener,每个 Listener 收到 Event 判断是否自己感兴趣的 Event,若是,会在 Listener 独享的线程池中执行 Event 所对应的逻辑程序块。


下图展示 Event、ListenerBus、Listener、Executor 的关系,从事件生成、事件传播、事件解释三个方面的视角来看:

我们从线程的视角来看,看异步化处理。异步化处理体现在事件传播、事件解释两个阶段,其中事件解释的异步化实现了我们的基于事件的异步化编程。

Spark 的实现

Spark-Core、Spark-Streaming 采用了分类的思路(分而治之)进行管理,每一大类事件都有独自的 Event、ListenerBus

Event

Spark-Core 的核心事件 trait 是 SparkListenerEvent,Spark-Straming 的核心事件 trait 是 StreamingListenerEvent


下图是各种事件实体类:



我们在定义事件需要注意哪些方面呢?我们以 SparkListenerTaskStart 为例,分析一个事件拥有哪些特征:

  1. 见名知义,SparkListenerTaskStart,一看名字我们就能猜到是 SparkListener 的一个任务启动事件。

  2. 触发条件,一个事件的触发条件必须清晰,能够清晰的描述一个行为,且行为宿主最好是唯一的。SparkListenerTaskStart 事件生成的宿主是 DAGScheduler,在 DAGScheduler 产生 BeginEvent 事件后生成 SparkListenerTaskStart。

  3. 事件传播,事件传播可选择 Point-Point 或者 BroadCast,这个可根据业务上的需要权衡、选择。Spark-Core、Spark-Streaming 的事件框架采用 BroadCast 模式。

  4. 事件解释,一个事件可以有一个或者多个解释。Spark-Core、Spark-Streaming 由于采用 BroadCast 模式,所以支持 Listener 对事件解释,原则一个 Listener 对一个事件只有一种解释。AppStatusListener、EventLoggingListener、ExecutorAllocationManager 等分别对 SparkListenerTaskStart 做了解释。 我们在设计事件框架上可根据实际需要借鉴以上四点,设计一个最恰当的事件框架。

Listner

Spark-Core 的核心监听 triat 是 SparkListener,Spark-Streaming 的核心监听 triat StreamingListener,两者都代表了一类监听的抽象


下图是一些监听实体类:


ListenerBus

监听器总线对象,Spark 程序在运行的过程中,Driver 端的很多功能都依赖于事件的传递和处理,而事件总线在这中间发挥着至关重要的纽带作用。事件总线通过异步线程,提高了 Driver 执行的效率。


Listener 注册到 ListenerBus 对象中,然后通过 ListenerBus 对象来实现事件监听(类似于计算机与周边设备之间的关系)


其 start 方法直接启动一个 dispatchThread,其核心逻辑就是不停地在一个事件队列 eventQueue 里取出事件,如果事件合法且 LiverListenerBus 没有被关停,就将事件通知给所有注册的 listener 中

其 dispatch 方法就是向事件队列里添加相应的事件。


ListenerBus 用于管理所有的 Listener,Spark-Core 和 Spark-Streaming 公用相同的 trait ListenerBus, 最终都是使用 AsyncEventQueue 类对 Listener 进行管理。

LiveListenerBus

管理所有注册的 Listener,为一类 Listener 创建一个唯一的 AsyncEventQueue,广播 Event 到所有的 Listener。默认可提供四类 AsyncEventQueue 分别为‘shared’、‘appStatus’、‘executorManagement’、‘eventLog’。


目前 Spark-Core 并没有放开类别设置,意谓着最多只能有上述四类,从设计的严谨上来讲分类并不是越多越好,每多一个类别,就会多一个 AsyncEventQueue 实例,每个实例中会包含一个事件传播的线程,对系统的资源占用还是比较多的。

异步事件处理线程 listenerThread

  private val listenerThread = new Thread(name) {    setDaemon(true) //线程本身设为守护线程     override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {      LiveListenerBus.withinListenerThread.withValue(true) {        while (true) {          eventLock.acquire()//不断获取信号量,信号量减一,能获取到说明还有事件未处理          self.synchronized {            processingEvent = true          }          try {            val event = eventQueue.poll  //获取事件, remove() 和 poll() 方法都是从队列中删除第一个元素(head)。            if (event == null) {              // 此时说明没有事件,但还是拿到信号量了,这说明stop方法被调用了              // 跳出while循环,关闭守护进程线程              if (!stopped.get) {                throw new IllegalStateException("Polling `null` from eventQueue means" +                  " the listener bus has been stopped. So `stopped` must be true")              }              return            }            // 调用ListenerBus的postxToAll(event: E)方法            postxToAll(event)          } finally {            self.synchronized {              processingEvent = false            }          }        }      }    }  }
复制代码

核心属性

private val started = new AtomicBoolean(false)private val stopped = new AtomicBoolean(false)//存放事件private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent]// 表示队列中产生和使用的事件数量的计数器,这个信号量是为了避免消费者线程空跑private val eventLock = new Semaphore(0)
复制代码

核心方法

start

LiveListenerBus 在 SparkContext 的 setupAndStartListenerBus 中被初始化,并调用 start 方法启动 LiveListenerBus。

  def start(): Unit = {    if (started.compareAndSet(false, true)) {       listenerThread.start() //启动消费者线程    } else {      throw new IllegalStateException(s"$name already started!")    }
复制代码

stop

停止 LiveListenerBus,它将等待队列事件被处理,但在停止后丢掉所有新的事件。需要注意 stop 可能会导致长时间的阻塞,执行 stop 方法的线程会被挂起,直到所有的 AsyncEventQueue(默认四个)中的 dispatch 线程都退出后执行 stop 主法的线程才会被唤醒。

  def stop(): Unit = {    if (!started.get()) {      throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")    }    if (stopped.compareAndSet(false, true)) {      // Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know      // `stop` is called.      // 释放一个信号量,但此时是没有事件的,从而listenerThread会拿到一个空事件,从而知道该停止了      eventLock.release()      //然后等待消费者线程自动关闭      listenerThread.join()    } else {      // Keep quiet    }  }
复制代码

post

采用广播的方式事件传播,这个过程很快,主线程只需要把事件传播给 AsyncEventQueue 即可,最后由 AsyncEventQueue 再广播给相应的 Listener

def post(event: SparkListenerEvent): Unit = {    if (stopped.get) {      // Drop further events to make `listenerThread` exit ASAP      logError(s"$name has already stopped! Dropping event $event")      return    }    // 在事件队列队尾添加事件    // add()和offer()区别:两者都是往队列尾部插入元素,不同的时候,当超出队列界限的时候,add()方法是抛出异常让你处理,而offer()方法是直接返回false    val eventAdded = eventQueue.offer(event)    if (eventAdded) {      //如果成功加入队列,则在信号量中加一      eventLock.release()    } else {      // 如果事件队列超过其容量,则将删除新的事件,这些子类将被通知到删除事件。      onDropEvent(event)      droppedEventsCounter.incrementAndGet()    }
val droppedEvents = droppedEventsCounter.get if (droppedEvents > 0) { // Don't log too frequently 日志不要太频繁 // 如果上一次,队列满了EVENT_QUEUE_CAPACITY=1000设置的值,就丢掉,然后记录一个时间,如果一直持续丢掉,那么每过60秒记录一次日志,不然日志会爆满的 if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) { if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) { val prevLastReportTimestamp = lastReportTimestamp lastReportTimestamp = System.currentTimeMillis() // 记录一个warn日志,表示这个事件,被丢弃了 logWarning(s"Dropped $droppedEvents SparkListenerEvents since " + new java.util.Date(prevLastReportTimestamp)) } } } }
复制代码

完整流程


  1. 图中的 DAGScheduler、SparkContext、BlockManagerMasterEndpoint、DriverEndpoint 及 LocalSchedulerBackend 都是 LiveListenerBus 的事件来源,它们都是通过调用 LiveListenerBus 的 post 方法将消息提交给事件队列,每 post 一个事件,信号量就加一。

  2. listenerThread 不停的获取信号量,然后从事件队列中取出事件,取到事件,则调用 postForAll 把事件分发给已注册的监听器,否则,就是取到空事件,它明白这是事件总线搞的鬼,它调用了 stop 但是每 post 事件,从而停止事件总线线程。


作者:luckywind_509

链接:https://my.oschina.net/whucxf/blog/5127818

用户头像

还未添加个人签名 2021-07-28 加入

公众号:该用户快成仙了

评论

发布
暂无评论
死磕Spark事件总线——聊聊Spark中事件监听是如何实现的_Java_做梦都在改BUG_InfoQ写作社区