
Message Queue Kafka: Source Code Walkthrough (Part 1) - Asynchronous Task Management

By 正向成长 · Published 1 hour ago

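Kafka manages asynchronous tasks (that is, DelayedOperation instances) through DelayedOperationPurgatory. Inside this class, tasks that cannot be completed immediately are inserted into an internal hierarchical timing wheel, where they wait to be handled when they time out.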


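Timing wheel

Consider a high-concurrency scenario: tens of thousands of requests per second, each with a 5-second timeout. Starting one thread per timeout task would mean tens of thousands of threads, which is unacceptable. Alternatively, a single thread could scan all tasks at a short fixed interval (say, every 100 ms) and run the ones that have timed out. Over a 5-second timeout, however, that is about 50 scans, so the same task is scanned 40-odd more times than necessary, which wastes CPU.


The timing wheel exists to reduce the number of scans and save CPU. For the 5-second timeout tasks above, it is enough to scan that batch of tasks once, at the 4.9-second mark.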

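How the timing wheel works

A timing wheel can be compared to a real clock. It involves two concepts, time slots and the wheel itself: a time slot is like a tick mark on the clock face, and one revolution of the wheel is like the second hand (or minute hand) completing a full cycle.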

Simple timing wheel
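A single-level wheel can be sketched in a few lines. This is a minimal illustration under stated assumptions, not Kafka code: the class SimpleTimingWheel is hypothetical, each slot covers exactly one tick, tasks are identified by name, and delays must be shorter than one full revolution.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical single-level wheel: wheelSize slots, each covering one tick.
// A task due delayTicks from now goes into slot (cursor + delayTicks) % wheelSize,
// so each tick only looks at one slot instead of scanning every task.
final class SimpleTimingWheel(wheelSize: Int) {
  private val slots = Array.fill(wheelSize)(ListBuffer.empty[String])
  private var cursor = 0 // slot the "second hand" currently points at

  def add(task: String, delayTicks: Int): Unit = {
    // A single wheel can only hold delays shorter than one full revolution
    require(delayTicks > 0 && delayTicks < wheelSize, s"delay must be in 1 until $wheelSize")
    slots((cursor + delayTicks) % wheelSize) += task
  }

  // Advance one tick and return the tasks that expire on it
  def tick(): List[String] = {
    cursor = (cursor + 1) % wheelSize
    val expired = slots(cursor).toList
    slots(cursor).clear()
    expired
  }
}

val wheel = new SimpleTimingWheel(8)
wheel.add("a", 2)
wheel.add("b", 3)
val fired = (1 to 3).map(_ => wheel.tick()).toList
println(fired) // List(List(), List(a), List(b))
```

The require call shows the limitation of a single wheel: a delay of one full revolution or more has nowhere to go, which is the problem the hierarchical wheel addresses.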

Hierarchical timing wheel
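In Kafka's TimingWheel each higher level uses the whole span (tickMs * wheelSize) of the level below as its own tick, so level n has a tick of tickMs * wheelSize^n. The sketch below uses a hypothetical helper, levelFor, and assumes every level's currentTime is 0 so that expiration equals delay; it picks a level with the same "expiration < currentTime + interval" test that TimingWheel.add applies.

```scala
// Hypothetical helper: which level of a hierarchical wheel holds a task?
// Level 0 has tick = tickMs; each higher level's tick is the full span of the level below.
// Assumes all levels start at currentTime = 0, so expiration == delay.
def levelFor(delayMs: Long, tickMs: Long, wheelSize: Int): Int = {
  require(delayMs >= tickMs, "delays shorter than one tick are already expired")
  var level = 0
  var tick = tickMs
  // Same test as TimingWheel.add: the task fits if expiration < currentTime + interval
  while (delayMs >= tick * wheelSize) {
    tick *= wheelSize // the overflow wheel's tick is this wheel's interval
    level += 1
  }
  level
}

for (d <- Seq(5L, 25L, 450L)) println(s"delay $d ms -> level ${levelFor(d, 1L, 20)}")
```

With tickMs = 1 and wheelSize = 20, three levels of 20 buckets each already cover delays of up to 8,000 ms (1 × 20³), so the hierarchy stays small even for long timeouts.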


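Kafka's hierarchical timing wheel implementation

Kafka's hierarchical timing wheel is implemented by the TimingWheel class in the kafka.utils.timer package. Timer tasks are kept in buckets of type TimerTaskList, and each bucket implements java.util.concurrent.Delayed. The timing wheel has no way of advancing time on its own; it relies on Delayed for that, by placing its buckets in a DelayQueue.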


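The class's internal private fields:

```scala
  // Time span covered by this wheel; it is also the tick size of the next higher-level (overflow) wheel
  private[this] val interval = tickMs * wheelSize

  // Timer task lists: one bucket per time slot, wheelSize buckets in total
  private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }

  // Start time rounded down to a multiple of tickMs
  private[this] var currentTime = startMs - (startMs % tickMs)
```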

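Adding a higher-level wheel

The addOverflowWheel method creates the next higher-level wheel the first time it is needed:

```scala
  private[this] def addOverflowWheel(): Unit = {
    synchronized {
      // Re-check under the lock so that only one overflow wheel is ever created
      if (overflowWheel == null) {
        overflowWheel = new TimingWheel(
          tickMs = interval,         // one tick of the parent wheel spans this whole wheel
          wheelSize = wheelSize,
          startMs = currentTime,
          taskCounter = taskCounter,
          queue                      // all levels share the same DelayQueue
        )
      }
    }
  }
```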


Adding a timer task

The add method adds a timer task:

```scala
  /**
   * Adds a timer task.
   * @param timerTaskEntry the timer task; Kafka abstracts a scheduled task as a TimerTaskEntry
   * @return true if the entry was placed in a bucket, false if it was cancelled or has already expired
   */
  def add(timerTaskEntry: TimerTaskEntry): Boolean = {
    val expiration = timerTaskEntry.expirationMs

    if (timerTaskEntry.cancelled) {
      // Cancelled
      false
    } else if (expiration < currentTime + tickMs) {
      // Already expired
      false
    } else if (expiration < currentTime + interval) {
      // Put in its own bucket
      val virtualId = expiration / tickMs
      // Compute the time slot and fetch its bucket (a doubly linked list of TimerTaskEntry)
      val bucket = buckets((virtualId % wheelSize.toLong).toInt)
      bucket.add(timerTaskEntry)

      // Set the bucket expiration time; setExpiration returns true only when the expiration
      // changed (the slot is being reused for a new round), so the bucket is queued once per round
      if (bucket.setExpiration(virtualId * tickMs)) {
        queue.offer(bucket)
      }
      true
    } else {
      // Out of the interval. Put it into the parent timer
      if (overflowWheel == null) addOverflowWheel()
      // If the higher-level wheel cannot hold it either, this call recurses and creates further levels
      overflowWheel.add(timerTaskEntry)
    }
  }
```
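The arithmetic in the "Put in its own bucket" branch can be checked by hand. The helper bucketFor below is hypothetical and only reproduces that branch: virtualId is the absolute tick number of the expiration, the slot is virtualId modulo wheelSize, and the bucket's expiration is rounded down to a tick boundary, so every task in a bucket shares one expiration and the DelayQueue only needs to hold buckets, not individual tasks.

```scala
// Hypothetical helper reproducing the "Put in its own bucket" branch of TimingWheel.add
final case class Placement(slot: Int, bucketExpirationMs: Long)

def bucketFor(expirationMs: Long, tickMs: Long, wheelSize: Int): Placement = {
  val virtualId = expirationMs / tickMs           // absolute tick index of the expiration
  val slot = (virtualId % wheelSize.toLong).toInt // slots are reused every wheelSize ticks
  Placement(slot, virtualId * tickMs)             // bucket expiration, rounded down to a tick
}

println(bucketFor(1234L, 20L, 20)) // Placement(1,1220)
println(bucketFor(1239L, 20L, 20)) // Placement(1,1220): same bucket, same expiration
```

Two tasks expiring at 1234 ms and 1239 ms share one bucket, so setExpiration returns false for the second one and the bucket is not queued twice.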

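Advancing the timing wheel

advanceClock moves the wheel forward. All it does is update currentTime (and, recursively, that of the overflow wheel); it does not run any tasks itself:

```scala
  def advanceClock(timeMs: Long): Unit = {
    // timeMs is past the time range of the current bucket
    if (timeMs >= currentTime + tickMs) {
      // Update currentTime, rounding timeMs down to a multiple of tickMs
      currentTime = timeMs - (timeMs % tickMs)

      // Try to advance the clock of the overflow wheel if present
      if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
    }
  }
```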
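To see how advanceClock cascades between levels, here is a stripped-down sketch. MiniWheel is a hypothetical class that keeps only the clock fields of TimingWheel (no buckets, no locking): each level rounds the new time down to its own tick, so a higher level only moves once a full tick of its own has passed.

```scala
// Hypothetical, bucket-free model of TimingWheel's clock handling
final class MiniWheel(val tickMs: Long, val wheelSize: Int, startMs: Long) {
  val interval: Long = tickMs * wheelSize
  var currentTime: Long = startMs - (startMs % tickMs) // rounded down to a multiple of tickMs
  private var overflow: Option[MiniWheel] = None

  // Create the next level lazily, as addOverflowWheel does (without the locking)
  def overflowWheel(): MiniWheel = overflow.getOrElse {
    val w = new MiniWheel(interval, wheelSize, currentTime)
    overflow = Some(w)
    w
  }

  def advanceClock(timeMs: Long): Unit = {
    if (timeMs >= currentTime + tickMs) {
      currentTime = timeMs - (timeMs % tickMs)
      overflow.foreach(_.advanceClock(currentTime)) // cascade to the level above, if any
    }
  }
}

val level0 = new MiniWheel(tickMs = 1L, wheelSize = 20, startMs = 0L)
val level1 = level0.overflowWheel()
level0.advanceClock(25L)
println((level0.currentTime, level1.currentTime)) // (25,20)
level0.advanceClock(39L)
println((level0.currentTime, level1.currentTime)) // (39,20)
```

Level 1 stays at 20 until the lower clock reaches 40, one full level-1 tick later.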

The timer task bucket

The timer task bucket is implemented by TimerTaskList, which extends Delayed:

```scala
/**
 * @reference:
 *   Delayed  https://examples.javacodegeeks.com/core-java/util/concurrent/delayqueue/java-util-concurrent-delayqueue-example/
 *   Because it extends Delayed, it must implement two methods:
 *     1. getDelay: a method that returns how much time is left before the delay completes.
 *        getDelay is important because Java dequeues an element from the queue
 *        only if getDelay() returns a value less than or equal to zero.
 *     2. compareTo: the Delayed interface extends the Comparable interface, so Delayed implementations must
 *        override compareTo() to specify how they should be ordered with respect to other Delayed objects.
 */
@threadsafe
private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed
```

Because it extends Delayed, it has to implement these two methods:

```scala
  /**
   * Required for use with DelayQueue.
   * @return how much time is left until this bucket expires (never negative)
   */
  def getDelay(unit: TimeUnit): Long = {
    unit.convert(max(getExpiration - Time.SYSTEM.hiResClockMs, 0), TimeUnit.MILLISECONDS)
  }

  /**
   * Required for use with DelayQueue: specifies how buckets are ordered with respect to
   * other Delayed objects (earlier expiration first).
   */
  def compareTo(d: Delayed): Int = {
    val other = d.asInstanceOf[TimerTaskList]
    java.lang.Long.compare(getExpiration, other.getExpiration)
  }
```
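The Delayed contract can be tried out in isolation. This is a minimal sketch with a hypothetical Bucket class that carries only an expiration timestamp; unlike TimerTaskList it holds no tasks and uses System.currentTimeMillis instead of Kafka's Time.SYSTEM.hiResClockMs. DelayQueue orders elements with compareTo and only hands out the head once its getDelay reaches zero.

```scala
import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}

// Hypothetical stand-in for TimerTaskList: only the expiration is modelled
final class Bucket(val name: String, val expirationMs: Long) extends Delayed {
  // Time left until expiration, never negative (same shape as TimerTaskList.getDelay)
  override def getDelay(unit: TimeUnit): Long =
    unit.convert(math.max(expirationMs - System.currentTimeMillis(), 0L), TimeUnit.MILLISECONDS)

  // Earlier expiration sorts first, so the queue head is the next bucket to expire
  override def compareTo(other: Delayed): Int =
    java.lang.Long.compare(expirationMs, other.asInstanceOf[Bucket].expirationMs)
}

val now = System.currentTimeMillis()
val queue = new DelayQueue[Bucket]()
queue.offer(new Bucket("later", now + 60000L))
queue.offer(new Bucket("expired", now - 10L))

val first = queue.poll()  // head has expired: returned immediately
val second = queue.poll() // new head expires in about 60 s: null
println(s"${first.name} ${second == null}") // expired true
```

Because the head is always the earliest-expiring bucket, a blocking poll only ever has to wait on that single element.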


The flush method empties a bucket, handing each entry to a callback:

```scala
  // Remove all task entries and apply the supplied function to each of them
  def flush(f: TimerTaskEntry => Unit): Unit = {
    synchronized {
      var head = root.next
      // Walk the list: remove each entry, apply f to it, and repeat until the list is empty
      while (head ne root) {
        remove(head)
        f(head)
        head = root.next
      }
      expiration.set(-1L)
    }
  }
```
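flush relies on TimerTaskList being a circular doubly linked list with a sentinel root node, so "head ne root" means the list is not yet empty. The simplified sketch below uses hypothetical Entry and TaskList classes; Kafka's version also tracks each entry's owning list, a task counter and the expiration. It shows why unlinking the head and re-reading root.next drains the list.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical simplified entry: a node in a circular doubly linked list
final class Entry(val name: String) {
  var prev: Entry = this
  var next: Entry = this
}

final class TaskList {
  private val root = new Entry("<root>") // sentinel: an empty list is root <-> root

  def add(e: Entry): Unit = synchronized {
    val tail = root.prev // append just before the sentinel
    e.next = root
    e.prev = tail
    tail.next = e
    root.prev = e
  }

  def remove(e: Entry): Unit = synchronized {
    e.prev.next = e.next
    e.next.prev = e.prev
    e.next = e
    e.prev = e
  }

  // Same loop shape as TimerTaskList.flush: unlink the head, hand it to f, repeat
  def flush(f: Entry => Unit): Unit = synchronized {
    var head = root.next
    while (head ne root) {
      remove(head)
      f(head)
      head = root.next
    }
  }

  def isEmpty: Boolean = synchronized { root.next eq root }
}

val list = new TaskList
Seq("a", "b", "c").foreach(n => list.add(new Entry(n)))
val seen = ListBuffer.empty[String]
list.flush(e => seen += e.name)
println(seen.toList) // List(a, b, c)
```

Re-reading root.next after each removal, instead of following head.next, keeps the loop correct even though remove resets the removed node's links.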

Kafka asynchronous task management

The timer: SystemTimer

Kafka defines the Timer trait in the kafka.utils.timer package to unify the timer interface:

```scala
trait Timer {
  /**
    * Add a new task to this executor. It will be executed after the task's delay
    * (beginning from the time of submission)
    * @param timerTask the task to add
    */
  def add(timerTask: TimerTask): Unit

  /**
    * Advance the internal clock, executing any tasks whose expiration has been
    * reached within the duration of the passed timeout.
    * @param timeoutMs
    * @return whether or not any tasks were executed
    */
  def advanceClock(timeoutMs: Long): Boolean

  /**
    * Get the number of tasks pending execution
    * @return the number of tasks
    */
  def size: Int

  /**
    * Shutdown the timer service, leaving pending tasks unexecuted
    */
  def shutdown(): Unit
}
```


SystemTimer extends Timer and builds a hierarchical TimingWheel internally:

```scala
  // timeout timer
  private[this] val taskExecutor = Executors.newFixedThreadPool(1,
    (runnable: Runnable) => KafkaThread.nonDaemon("executor-" + executorName, runnable))

  // Buckets (TimerTaskList) are queued here by expiration; shared by every level of the wheel
  private[this] val delayQueue = new DelayQueue[TimerTaskList]()
  private[this] val taskCounter = new AtomicInteger(0)
  // Build the hierarchical timing wheel
  private[this] val timingWheel = new TimingWheel(
    tickMs = tickMs,
    wheelSize = wheelSize,
    startMs = startMs,
    taskCounter = taskCounter,
    delayQueue
  )
```

Adding a task

```scala
  def add(timerTask: TimerTask): Unit = {
    readLock.lock()
    try {
      addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs))
    } finally {
      readLock.unlock()
    }
  }

  private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
    if (!timingWheel.add(timerTaskEntry)) {
      // Already expired or cancelled
      if (!timerTaskEntry.cancelled) {
        // Expired but not cancelled: submit it to the thread pool to run now
        // (submit returns a Future, which is ignored here)
        taskExecutor.submit(timerTaskEntry.timerTask)
      }
    }
  }
```

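Advancing the clock

```scala
  def advanceClock(timeoutMs: Long): Boolean = {
    // Wait up to timeoutMs; if a bucket has expired, take it from the delay queue
    var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
    if (bucket != null) { // an expired bucket exists
      writeLock.lock()
      try {
        while (bucket != null) {
          // Advance the wheel; this also advances any higher-level wheels
          timingWheel.advanceClock(bucket.getExpiration)
          // Re-add each entry: expired ones are submitted to the thread pool, cancelled ones
          // are dropped, and the rest move into a finer-grained bucket
          bucket.flush(addTimerTaskEntry)
          // poll() without a timeout: keep draining buckets that have already expired
          bucket = delayQueue.poll()
        }
      } finally {
        writeLock.unlock()
      }
      true
    } else {
      false
    }
  }
```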
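Putting add, addTimerTaskEntry and advanceClock together, the following is a deliberately reduced sketch, not Kafka code. MiniTask, MiniBucket and MiniTimer are hypothetical; there is a single wheel level, no overflow wheel, no locks and no executor (expired tasks are appended to a buffer instead of being submitted). It keeps two ideas from the code above: only buckets go into the DelayQueue, and a flushed entry is re-added, which for a single level means it is now expired and runs.

```scala
import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}
import scala.collection.mutable.ListBuffer

// Hypothetical types for this sketch only (not Kafka classes)
final case class MiniTask(name: String, expirationMs: Long)

final class MiniBucket extends Delayed {
  val tasks: ListBuffer[MiniTask] = ListBuffer.empty[MiniTask]
  var expirationMs: Long = -1L
  override def getDelay(unit: TimeUnit): Long =
    unit.convert(math.max(expirationMs - System.currentTimeMillis(), 0L), TimeUnit.MILLISECONDS)
  override def compareTo(o: Delayed): Int =
    java.lang.Long.compare(expirationMs, o.asInstanceOf[MiniBucket].expirationMs)
}

// Single-level, single-threaded reduction of SystemTimer + TimingWheel
final class MiniTimer(tickMs: Long, wheelSize: Int, startMs: Long) {
  private val interval = tickMs * wheelSize
  private val buckets = Array.fill(wheelSize)(new MiniBucket)
  private val delayQueue = new DelayQueue[MiniBucket]()
  private var currentTime = startMs - (startMs % tickMs)
  val executed: ListBuffer[String] = ListBuffer.empty[String] // stands in for taskExecutor.submit

  // Mirrors TimingWheel.add without the overflow branch; false means "already expired"
  private def wheelAdd(t: MiniTask): Boolean =
    if (t.expirationMs < currentTime + tickMs) false
    else {
      require(t.expirationMs < currentTime + interval, "this sketch has no overflow wheel")
      val virtualId = t.expirationMs / tickMs
      val bucket = buckets((virtualId % wheelSize).toInt)
      bucket.tasks += t
      // Only (re)queue the bucket when its expiration changes, like setExpiration
      if (bucket.expirationMs != virtualId * tickMs) {
        bucket.expirationMs = virtualId * tickMs
        delayQueue.offer(bucket)
      }
      true
    }

  // Mirrors addTimerTaskEntry: entries the wheel rejects as expired run right away
  private def addEntry(t: MiniTask): Unit =
    if (!wheelAdd(t)) executed += t.name

  def add(name: String, delayMs: Long): Unit =
    addEntry(MiniTask(name, System.currentTimeMillis() + delayMs))

  // Mirrors SystemTimer.advanceClock: wait up to timeoutMs for the first expired bucket,
  // then drain every bucket that has already expired
  def advanceClock(timeoutMs: Long): Boolean = {
    var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
    val found = bucket != null
    while (bucket != null) {
      val exp = bucket.expirationMs
      if (exp >= currentTime + tickMs) currentTime = exp - (exp % tickMs)
      val drained = bucket.tasks.toList
      bucket.tasks.clear()
      bucket.expirationMs = -1L // like flush: the slot can be reused in a later round
      drained.foreach(addEntry) // re-add: with one level, every entry here has expired
      bucket = delayQueue.poll()
    }
    found
  }
}

val timer = new MiniTimer(tickMs = 10L, wheelSize = 64, startMs = System.currentTimeMillis())
timer.add("now", 0L)
timer.add("fast", 30L)
timer.add("slow", 120L)
var rounds = 0
while (timer.executed.size < 3 && rounds < 50) { timer.advanceClock(200L); rounds += 1 }
println(timer.executed.toList) // List(now, fast, slow)
```

Because buckets are rounded down to a tick, a task may fire up to one tick early; this mirrors the tick-level precision of the real wheel.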

Detecting expired tasks

ExpiredOperationReaper detects expired tasks. It repeatedly calls the purgatory's advanceClock, which calls timeoutTimer.advanceClock to move the timing wheel forward and execute the tasks that have expired.


```scala
  def advanceClock(timeoutMs: Long): Unit = {
    timeoutTimer.advanceClock(timeoutMs)

    // Trigger a purge if the number of completed but still being watched operations is larger than
    // the purge threshold. That number is computed by the difference between the estimated total number of
    // operations and the number of pending delayed operations.
    if (estimatedTotalOperations.get - numDelayed > purgeInterval) {
      // now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to
      // clean up watchers. Note that, if more operations are completed during the clean up, we may end up with
      // a little overestimated total number of operations.
      estimatedTotalOperations.getAndSet(numDelayed)
      debug("Begin purging watch lists")
      val purged = watcherLists.foldLeft(0) {
        case (sum, watcherList) => sum + watcherList.allWatchers.map(_.purgeCompleted()).sum
      }
      debug("Purged %d elements from watch lists.".format(purged))
    }
  }

  /**
   * A background reaper to expire delayed operations that have timed out.
   * Detects and handles expired asynchronous operations: the thread body repeatedly calls
   * advanceClock on the internal timer timeoutTimer (a SystemTimer) to move it forward.
   * Expired operations are removed from the timer and their callbacks are executed.
   */
  private class ExpiredOperationReaper extends ShutdownableThread(
    "ExpirationReaper-%d-%s".format(brokerId, purgatoryName),
    false) {

    /**
     * Advances the timing wheel; called from the run method of the base class ShutdownableThread.
     */
    override def doWork(): Unit = {
      advanceClock(200L)
    }
  }
```
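The reaper is essentially a loop that keeps calling advanceClock with a bounded wait (200 ms above), so it notices a shutdown request promptly. The sketch below uses a plain Thread and a volatile flag in place of Kafka's ShutdownableThread; the Reaper class and the shouldPurge helper are hypothetical, and the work function only counts iterations instead of driving a real timer. shouldPurge restates the purge condition from advanceClock.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for ShutdownableThread: run doWork() in a loop until shut down
final class Reaper(threadName: String, doWork: () => Unit) extends Thread(threadName) {
  @volatile private var running = true
  override def run(): Unit = while (running) doWork()
  def shutdown(): Unit = { running = false; join() }
}

// Purge trigger from DelayedOperationPurgatory.advanceClock:
// completed-but-still-watched operations = estimated total - still pending
def shouldPurge(estimatedTotal: Int, numDelayed: Int, purgeInterval: Int): Boolean =
  estimatedTotal - numDelayed > purgeInterval

val iterations = new AtomicInteger(0)
// In Kafka each iteration calls advanceClock(200L); here it sleeps briefly and counts instead
val reaper = new Reaper("ExpirationReaper-0-demo", () => { Thread.sleep(5L); iterations.incrementAndGet(); () })
reaper.start()
Thread.sleep(100L)
reaper.shutdown()
println(s"iterations: ${iterations.get}, alive: ${reaper.isAlive}")
println(shouldPurge(1500, 400, 1000)) // true: 1100 completed operations are still watched
```

The bounded wait matters: if the loop blocked indefinitely inside doWork, clearing the flag would not stop the thread until the next bucket expired.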

Asynchronous task management

This is implemented by DelayedOperationPurgatory. Its companion object's apply method builds a SystemTimer and passes it to the class:

```scala
object DelayedOperationPurgatory {

  private val Shards = 512 // Shard the watcher list to reduce lock contention

  def apply[T <: DelayedOperation](purgatoryName: String,
                                   brokerId: Int = 0,
                                   purgeInterval: Int = 1000,
                                   reaperEnabled: Boolean = true,
                                   timerEnabled: Boolean = true): DelayedOperationPurgatory[T] = {
    // SystemTimer is the timer; internally it is built on the TimingWheel
    val timer = new SystemTimer(purgatoryName)
    new DelayedOperationPurgatory[T](purgatoryName, timer, brokerId, purgeInterval, reaperEnabled, timerEnabled)
  }
}

/**
 * A helper purgatory class for bookkeeping delayed operations with a timeout,
 * and expiring timed out operations.
 */
final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
                                                             timeoutTimer: Timer,
                                                             brokerId: Int = 0,
                                                             purgeInterval: Int = 1000,
                                                             reaperEnabled: Boolean = true,
                                                             timerEnabled: Boolean = true)
        extends Logging with KafkaMetricsGroup {
  ...
  /* background thread expiring operations that have timed out */
  private val expirationReaper = new ExpiredOperationReaper()

  if (reaperEnabled)
    expirationReaper.start()
}
```
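The Shards constant means operations are spread over 512 watcher lists, each guarded by its own lock, so threads working on different keys rarely contend. A minimal sketch of that idea follows; the WatcherShards class is hypothetical, and using Math.floorMod to keep the shard index non-negative is a choice of this sketch, not necessarily Kafka's exact expression.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical sketch: keys are hashed onto a fixed number of shards, each with its own lock
final class WatcherShards[K, V](shards: Int = 512) {
  private final class Shard {
    val lock = new ReentrantLock()
    val watchers = mutable.Map.empty[K, mutable.ListBuffer[V]]
  }

  private val shardList = Vector.fill(shards)(new Shard)

  // floorMod keeps the index in [0, shards) even for negative hash codes
  def shardIndex(key: K): Int = Math.floorMod(key.hashCode, shards)

  private def withShard[R](key: K)(body: Shard => R): R = {
    val shard = shardList(shardIndex(key))
    shard.lock.lock() // only this shard is locked; other shards stay available
    try body(shard) finally shard.lock.unlock()
  }

  def watch(key: K, op: V): Unit =
    withShard(key) { s => s.watchers.getOrElseUpdate(key, mutable.ListBuffer.empty[V]) += op; () }

  def watching(key: K): Int =
    withShard(key)(_.watchers.get(key).map(_.size).getOrElse(0))
}

val purgatoryWatchers = new WatcherShards[String, String]()
purgatoryWatchers.watch("topic-0", "fetch-1")
purgatoryWatchers.watch("topic-0", "fetch-2")
println(purgatoryWatchers.watching("topic-0")) // 2
```

A fixed shard count trades a little memory for bounded contention: the number of locks does not grow with the number of keys.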



