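Kafka Message Queue: Source Code Walkthrough (1) - Asynchronous Task Management
Kafka manages asynchronous tasks (that is, DelayedOperation instances) through DelayedOperationPurgatory. A task that cannot complete immediately is inserted into the purgatory's internal multi-level timing wheel, where it waits to be handled when it times out.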
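To make the idea of a task that may complete early or else time out more concrete, here is a minimal sketch. The names SketchDelayedOperation, AckOperation, acks and completions are hypothetical and simplified for illustration; this is not Kafka's actual DelayedOperation class. It only assumes that completion must happen exactly once, whichever of the two paths wins.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical, simplified stand-in for a delayed operation; not Kafka's class.
abstract class SketchDelayedOperation(val delayMs: Long) {
  private val completed = new AtomicBoolean(false)

  // Returns true only for the single caller that wins the race to complete.
  def forceComplete(): Boolean =
    if (completed.compareAndSet(false, true)) { onComplete(); true } else false

  def isCompleted: Boolean = completed.get

  // Called whenever the completion condition may have changed.
  def tryComplete(): Boolean

  // Runs exactly once, either on early completion or on timeout.
  def onComplete(): Unit
}

object DelayedOperationSketch {
  // Hypothetical operation that completes once `acks` reaches `required`.
  class AckOperation(required: Int, delayMs: Long) extends SketchDelayedOperation(delayMs) {
    @volatile var acks = 0
    var completions = 0
    def tryComplete(): Boolean = if (acks >= required) forceComplete() else false
    def onComplete(): Unit = completions += 1
  }
}
```

In this sketch a timer would call forceComplete() when the delay elapses; if tryComplete() already succeeded, that later call is a no-op.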
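Timing Wheel
Consider a high-concurrency scenario: tens of thousands of requests per second, each with a 5-second timeout. Starting one thread per timeout task would require tens of thousands of threads, which is unacceptable. Alternatively, a single thread could scan all tasks at a small fixed interval (say 100 ms) and execute any that have timed out. But then, within the 5-second timeout, each task is scanned once every 100 ms, about 50 times in total. All but the last of those scans find nothing to do, which wastes CPU.
A timing wheel exists to reduce the number of scans and save CPU. For the 5-second timeout above, it is enough to scan that batch of tasks once, at around the 4.9-second mark.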
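How a Timing Wheel Works
A timing wheel is analogous to a real clock. It involves two concepts, the time slot and the wheel: a time slot corresponds to a tick mark on the clock face, and the wheel corresponds to one full revolution of the second hand (or minute hand).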
Simple Timing Wheel
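A single-level wheel can be sketched as a fixed array of slots plus a current time that moves one tick at a time. The class below is a hypothetical illustration (SimpleWheel, advanceTo and the tuple-based task representation are assumptions, not Kafka code). It borrows only the slot arithmetic used later in this article: round the time down to a tick, and pick the slot as (expiration / tickMs) % wheelSize.

```scala
import scala.collection.mutable

// Hypothetical single-level wheel for illustration; names are not Kafka's.
final class SimpleWheel(tickMs: Long, wheelSize: Int, startMs: Long) {
  private val interval = tickMs * wheelSize
  private val buckets = Array.fill(wheelSize)(mutable.ListBuffer.empty[(Long, String)])
  private var currentTime = startMs - (startMs % tickMs)

  // Returns false when the task is already due or lies beyond this wheel's span.
  def add(expirationMs: Long, name: String): Boolean =
    if (expirationMs < currentTime + tickMs || expirationMs >= currentTime + interval) false
    else {
      val slot = ((expirationMs / tickMs) % wheelSize).toInt
      buckets(slot) += ((expirationMs, name))
      true
    }

  // Advances tick by tick up to timeMs and returns the names of the tasks that came due.
  // Tasks fire with tick granularity: a task is released when its slot's tick begins.
  def advanceTo(timeMs: Long): List[String] = {
    val due = mutable.ListBuffer.empty[String]
    while (currentTime + tickMs <= timeMs) {
      currentTime += tickMs
      val slot = ((currentTime / tickMs) % wheelSize).toInt
      due ++= buckets(slot).map(_._2)
      buckets(slot).clear()
    }
    due.toList
  }
}
```

With tickMs = 100 and wheelSize = 10, this wheel covers one second; a task expiring at 250 ms lands in slot 2 and is only touched when the wheel reaches that slot, instead of being scanned on every tick.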
Multi-Level Timing Wheel
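In a multi-level arrangement, each higher wheel uses the whole span of the wheel below it as its own tick, so a few levels cover a very long time range with few slots. The sketch below only works out which level a given expiration would land on. WheelLevels and levelFor are hypothetical names, and for simplicity it assumes every wheel's current time is 0; the values tickMs = 1 and wheelSize = 20 in the note are example inputs.

```scala
import scala.annotation.tailrec

object WheelLevels {
  // Returns (level, tickMs of that level); level 0 is the lowest wheel.
  // Simplifying assumption: the current time of every wheel is 0.
  @tailrec
  def levelFor(expirationMs: Long, tickMs: Long, wheelSize: Int, level: Int = 0): (Int, Long) = {
    // Span of this wheel, which becomes the tick of the next wheel up.
    val interval = tickMs * wheelSize
    if (expirationMs < interval) (level, tickMs)
    else levelFor(expirationMs, interval, wheelSize, level + 1)
  }
}
```

For example, with tickMs = 1 and wheelSize = 20 the levels span 20 ms, 400 ms and 8000 ms, so a task expiring at 5000 ms starts on the third wheel (level 2, tick 400 ms). As time advances, such a task is re-inserted and ends up on lower wheels until it finally expires.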
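Kafka's Multi-Level Timing Wheel Implementation
Kafka's multi-level timing wheel is implemented by the TimingWheel class in the kafka.utils.timer package. Internally it stores chains of timed tasks in TimerTaskList buckets, and TimerTaskList in turn relies on java.util.concurrent.Delayed. The timing wheel cannot advance time by itself; it relies on Delayed to drive the advancement of time.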
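The role of Delayed is easier to see in isolation. The sketch below uses the real java.util.concurrent.DelayQueue and Delayed types, while SketchBucket and drainInOrder are hypothetical stand-ins for illustration only. It shows that a DelayQueue hands out elements in expiration order and blocks until the head has expired, which is what lets a single thread sleep until the next bucket is due instead of scanning.

```scala
import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}

// Hypothetical stand-in for a bucket: it only carries an absolute expiration time.
final class SketchBucket(val expirationMs: Long) extends Delayed {
  // Remaining delay; DelayQueue releases an element once this is <= 0.
  override def getDelay(unit: TimeUnit): Long =
    unit.convert(expirationMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS)

  // Orders buckets so that the earliest expiration sits at the head of the queue.
  override def compareTo(other: Delayed): Int = other match {
    case b: SketchBucket => java.lang.Long.compare(expirationMs, b.expirationMs)
    case _ =>
      java.lang.Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS))
  }
}

object DelayQueueSketch {
  // Offers buckets in the given order and returns their delays in release order.
  def drainInOrder(delaysMs: Seq[Long]): List[Long] = {
    val now = System.currentTimeMillis()
    val queue = new DelayQueue[SketchBucket]()
    delaysMs.foreach(d => queue.offer(new SketchBucket(now + d)))
    // take() blocks until the head bucket has expired, so no busy scanning is needed.
    List.fill(delaysMs.size)(queue.take().expirationMs - now)
  }
}
```

Calling DelayQueueSketch.drainInOrder(Seq(60L, 20L, 40L)) returns the delays sorted by expiration, regardless of insertion order.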
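Private fields set up in the constructor:
// Time span of this wheel, which is also the tick of the next higher-level wheel
private[this] val interval = tickMs * wheelSize
// Chains of timed tasks: wheelSize buckets
private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }
// Round startMs down to a multiple of tickMs
private[this] var currentTime = startMs - (startMs % tickMs)
Adding a Higher-Level Wheel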
TimingWheel provides the addOverflowWheel method to create the next higher-level wheel:
private[this] def addOverflowWheel(): Unit = {
  synchronized {
    if (overflowWheel == null) {
      overflowWheel = new TimingWheel(
        tickMs = interval,
        wheelSize = wheelSize,
        startMs = currentTime,
        taskCounter = taskCounter,
        queue
      )
    }
  }
}
Adding a Timed Task
The add method adds a timed task:
/**
 * Adds a timed task.
 * @param timerTaskEntry the timed task; Kafka abstracts timed tasks as the TimerTaskEntry class
 * @return true if the task was placed in a wheel, false if it is cancelled or already expired
 */
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
  val expiration = timerTaskEntry.expirationMs

  if (timerTaskEntry.cancelled) {
    // Cancelled
    false
  } else if (expiration < currentTime + tickMs) {
    // Already expired
    false
  } else if (expiration < currentTime + interval) {
    // Put in its own bucket
    val virtualId = expiration / tickMs
    // Compute the time slot and fetch its bucket (a doubly linked list of TimerTaskEntry)
    val bucket = buckets((virtualId % wheelSize.toLong).toInt)
    bucket.add(timerTaskEntry)

    // Set the bucket expiration time
    if (bucket.setExpiration(virtualId * tickMs)) {
      queue.offer(bucket)
    }
    true
  } else {
    // Out of the interval. Put it into the parent timer
    if (overflowWheel == null) addOverflowWheel()
    // If the next wheel cannot hold the task either, even higher wheels are created recursively
    overflowWheel.add(timerTaskEntry)
  }
}
Advancing the Timing Wheel
The advanceClock method advances the timing wheel. It only updates currentTime.
def advanceClock(timeMs: Long): Unit = {
  // timeMs has moved past the time range of the current bucket
  if (timeMs >= currentTime + tickMs) {
    // Update currentTime, rounding timeMs down to a multiple of tickMs
    currentTime = timeMs - (timeMs % tickMs)

    // Try to advance the clock of the overflow wheel if present
    if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
  }
}
Timed Task Buckets
The timed task bucket is implemented by TimerTaskList, which implements Delayed:
/**
 * Reference on Delayed:
 * https://examples.javacodegeeks.com/core-java/util/concurrent/delayqueue/java-util-concurrent-delayqueue-example/
 * Because it implements Delayed, the class must provide two methods:
 * 1. getDelay: returns how much time is left before the delay completes. This matters because
 *    DelayQueue only dequeues an element once getDelay() returns a value less than or equal to zero.
 * 2. compareTo: the Delayed interface extends Comparable, so implementations must override
 *    compareTo() to specify how they are ordered with respect to other Delayed objects.
 */
@threadsafe
private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed
Because it implements Delayed, it must implement these two methods:
/**
 * Required by DelayQueue.
 * @return the time remaining until this bucket expires
 */
def getDelay(unit: TimeUnit): Long = {
  unit.convert(max(getExpiration - Time.SYSTEM.hiResClockMs, 0), TimeUnit.MILLISECONDS)
}

/**
 * Required by DelayQueue; specifies how this bucket is ordered with respect to other Delayed objects.
 */
def compareTo(d: Delayed): Int = {
  val other = d.asInstanceOf[TimerTaskList]
  java.lang.Long.compare(getExpiration, other.getExpiration)
}

// Remove all task entries and apply the supplied function to each of them
def flush(f: TimerTaskEntry => Unit): Unit = {
  synchronized {
    var head = root.next
    // Walk the whole list, removing each entry and then applying f to it
    while (head ne root) {
      remove(head)
      f(head)
      head = root.next
    }
    expiration.set(-1L)
  }
}
Kafka Asynchronous Task Management
The Timer: SystemTimer
Kafka defines the Timer trait in the kafka.utils.timer package to unify the timer-related interface:
trait Timer {
  /**
   * Add a new task to this executor. It will be executed after the task's delay
   * (beginning from the time of submission)
   * @param timerTask the task to add
   */
  def add(timerTask: TimerTask): Unit

  /**
   * Advance the internal clock, executing any tasks whose expiration has been
   * reached within the duration of the passed timeout.
   * @param timeoutMs
   * @return whether or not any tasks were executed
   */
  def advanceClock(timeoutMs: Long): Boolean

  /**
   * Get the number of tasks pending execution
   * @return the number of tasks
   */
  def size: Int

  /* Shutdown the timer service, leaving pending tasks unexecuted */
  def shutdown(): Unit
}
SystemTimer extends Timer and builds a multi-level TimingWheel internally:
// timeout timer
private[this] val taskExecutor = Executors.newFixedThreadPool(1,
  (runnable: Runnable) => KafkaThread.nonDaemon("executor-" + executorName, runnable))
// Delay queue of buckets, passed to the timing wheel
private[this] val delayQueue = new DelayQueue[TimerTaskList]()
private[this] val taskCounter = new AtomicInteger(0)
// Build the multi-level timing wheel
private[this] val timingWheel = new TimingWheel(
  tickMs = tickMs,
  wheelSize = wheelSize,
  startMs = startMs,
  taskCounter = taskCounter,
  delayQueue
)
Adding a Task
def add(timerTask: TimerTask): Unit = {
  readLock.lock()
  try {
    addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs))
  } finally {
    readLock.unlock()
  }
}

private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
  if (!timingWheel.add(timerTaskEntry)) {
    // Already expired or cancelled
    if (!timerTaskEntry.cancelled)
      // Submit the expired task to the thread pool for execution
      taskExecutor.submit(timerTaskEntry.timerTask)
  }
}
Advancing the Clock
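def advanceClock(timeoutMs: Long): Boolean = {
  // Wait up to timeoutMs; if a bucket has expired, take it from the delay queue
  var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
  if (bucket != null) {
    // An expired bucket exists
    writeLock.lock()
    try {
      while (bucket != null) {
        // Advance the timing wheel; if a higher-level wheel exists, it is advanced as well
        timingWheel.advanceClock(bucket.getExpiration)
        // Re-add every entry of the bucket; entries that are expired and not cancelled
        // are submitted to the thread pool
        bucket.flush(addTimerTaskEntry)
        // poll() without a timeout does not block, so this drains as many expired buckets as possible
        bucket = delayQueue.poll()
      }
    } finally {
      writeLock.unlock()
    }
    true
  } else {
    false
  }
}
Detecting Expired Tasks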
ExpiredOperationReaper detects expired tasks. Its work loop repeatedly calls the purgatory's advanceClock, which in turn calls timeoutTimer.advanceClock to move the timing wheel forward and execute expired tasks.
def advanceClock(timeoutMs: Long): Unit = {
  timeoutTimer.advanceClock(timeoutMs)

  // Trigger a purge if the number of completed but still being watched operations is larger than
  // the purge threshold. That number is computed by the difference btw the estimated total number of
  // operations and the number of pending delayed operations.
  if (estimatedTotalOperations.get - numDelayed > purgeInterval) {
    // now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to
    // clean up watchers. Note that, if more operations are completed during the clean up, we may end up with
    // a little overestimated total number of operations.
    estimatedTotalOperations.getAndSet(numDelayed)
    debug("Begin purging watch lists")
    val purged = watcherLists.foldLeft(0) {
      case (sum, watcherList) => sum + watcherList.allWatchers.map(_.purgeCompleted()).sum
    }
    debug("Purged %d elements from watch lists.".format(purged))
  }
}

/**
 * A background reaper to expire delayed operations that have timed out.
 * It detects expired asynchronous tasks and handles them: the thread's work loop repeatedly
 * calls advanceClock, which moves the internal timer object timeoutTimer (a SystemTimer) forward.
 * Any expired task is removed from the timer and its callback is executed.
 */
private class ExpiredOperationReaper extends ShutdownableThread(
  "ExpirationReaper-%d-%s".format(brokerId, purgatoryName),
  false) {

  /**
   * Advances the timing wheel; called from the run method of the base class ShutdownableThread.
   */
  override def doWork(): Unit = {
    advanceClock(200L)
  }
}
Asynchronous Task Management
Asynchronous task management is implemented by DelayedOperationPurgatory:
object DelayedOperationPurgatory {

  private val Shards = 512 // Shard the watcher list to reduce lock contention

  def apply[T <: DelayedOperation](purgatoryName: String,
                                   brokerId: Int = 0,
                                   purgeInterval: Int = 1000,
                                   reaperEnabled: Boolean = true,
                                   timerEnabled: Boolean = true): DelayedOperationPurgatory[T] = {
    // SystemTimer is the timer; internally it is implemented with the TimingWheel
    val timer = new SystemTimer(purgatoryName)
    new DelayedOperationPurgatory[T](purgatoryName, timer, brokerId, purgeInterval, reaperEnabled, timerEnabled)
  }

}

/**
 * A helper purgatory class for bookkeeping delayed operations with a timeout,
 * and expiring timed out operations.
 */
final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
                                                             timeoutTimer: Timer,
                                                             brokerId: Int = 0,
                                                             purgeInterval: Int = 1000,
                                                             reaperEnabled: Boolean = true,
                                                             timerEnabled: Boolean = true)
  extends Logging with KafkaMetricsGroup {
  ...
  /* background thread expiring operations that have timed out */
  private val expirationReaper = new ExpiredOperationReaper()

  if (reaperEnabled)
    expirationReaper.start()
}
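Copyright notice: This article is an original work by InfoQ author 正向成长.
Original link: http://xie.infoq.cn/article/f9e04ccc51dcb2505a58f3e8e
This article is licensed under CC-BY 4.0. When republishing, please retain the original source and this copyright notice.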