Message Queue Kafka: Source Code Walkthrough (1) - Asynchronous Task Management
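Kafka manages asynchronous tasks (that is, DelayedOperation instances) through DelayedOperationPurgatory. Tasks that cannot be completed immediately are inserted into the purgatory's internal hierarchical timing wheel, where they wait to be handled on timeout.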
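Timing wheel
Suppose a high-concurrency scenario: tens of thousands of requests per second, each with a 5-second timeout. Starting one thread per timeout task would mean tens of thousands of threads, which is unacceptable. Alternatively, a single thread could scan all tasks at a small fixed interval (say, every 100 ms) and execute the ones that have timed out. Within a 5-second timeout, however, that means about 50 scans (5000 ms / 100 ms), so the same task is scanned more than 40 extra times before it is actually due, which wastes a lot of CPU.
The timing wheel exists to cut down the number of scans and save CPU. For the 5-second timeout tasks above, it is enough to scan this batch of tasks once, at about the 4.9-second mark.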
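How the timing wheel works
A timing wheel can be compared to a real clock. It involves two concepts: time slots and the wheel. A time slot is like a tick mark on the clock face, and one revolution of the wheel is like the second hand (or minute hand) completing a full cycle.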
Simple timing wheel
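A minimal single-level wheel makes the clock analogy concrete. The Scala sketch below is hypothetical (not Kafka code): the class SimpleWheel, string task names, and the choice to reject delays that do not fit in one revolution are all assumptions. The point is that each tick inspects only one slot instead of every pending task; with a hypothetical tickMs = 100 and wheelSize = 60, one revolution covers 6 seconds.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical single-level timing wheel (not Kafka code): wheelSize slots, each tickMs wide.
// A delay that does not fit in one revolution is rejected here; handling such delays
// is what the hierarchical wheel is for.
final class SimpleWheel(tickMs: Long, wheelSize: Int) {
  private val slots = Array.fill(wheelSize)(ArrayBuffer.empty[String])
  private var cursor = 0 // slot the "second hand" currently points at

  // Place a task delayMs after the current tick; reject delays outside (0, one revolution).
  def add(name: String, delayMs: Long): Boolean = {
    val ticks = delayMs / tickMs
    if (ticks <= 0 || ticks >= wheelSize) false
    else {
      slots(((cursor + ticks) % wheelSize).toInt) += name
      true
    }
  }

  // Move the hand one slot and return (and clear) only the tasks stored in that slot.
  def tick(): List[String] = {
    cursor = (cursor + 1) % wheelSize
    val due = slots(cursor).toList
    slots(cursor).clear()
    due
  }
}
```

A task 500 ms out sits in slot 5 and is returned by the fifth tick(); a 6000 ms delay is rejected because it needs a full revolution or more.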
Hierarchical timing wheel
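A hierarchical wheel handles longer delays by stacking wheels: each level's tick equals the full span of the level below it. For example, with tickMs = 1 ms and wheelSize = 20, level 0 spans 20 ms, level 1 (tick 20 ms) spans 400 ms, and level 2 (tick 400 ms) spans 8000 ms. The hypothetical helper levelFor below computes which level a delay falls into; it assumes the clock is at 0 and ignores the currentTime rounding that Kafka's TimingWheel.add performs.

```scala
// Hypothetical helper (not Kafka code): level 0 has tick tickMs, and level k+1 has
// tick = (tick of level k) * wheelSize. Returns the first level whose span
// (tick * wheelSize) is larger than the delay. Assumes currentTime = 0.
def levelFor(delayMs: Long, tickMs: Long, wheelSize: Int): Int = {
  var level = 0
  var tick = tickMs
  while (delayMs >= tick * wheelSize) {
    tick *= wheelSize // move up one level: its tick is the span of the level below
    level += 1
  }
  level
}
```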
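Kafka's hierarchical timing wheel implementation
Kafka's hierarchical timing wheel is implemented by the TimingWheel class in the kafka.utils.timer package. It stores timer tasks in TimerTaskList buckets, which implement java.util.concurrent.Delayed. The timing wheel itself cannot advance time; it relies on Delayed (the buckets are placed in a DelayQueue) to move time forward.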
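Private fields initialized by the constructor:
// Total time span of this wheel; it also becomes the tickMs of the next higher (overflow) wheel
private[this] val interval = tickMs * wheelSize
// Timer task lists: one bucket per slot, wheelSize buckets in total
private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) {
  _ => new TimerTaskList(taskCounter)
}
// startMs rounded down to a multiple of tickMs
private[this] var currentTime = startMs - (startMs % tickMs)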
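A quick check of the constructor arithmetic, with hypothetical values tickMs = 20, wheelSize = 20 and startMs = 1005: interval is 400 and currentTime rounds down to 1000. The case class WheelFields is only an illustration (not Kafka code). Building a second instance from the first one's interval and currentTime mimics what addOverflowWheel passes to the next level, whose currentTime rounds down again, to 800.

```scala
// Illustration only: the same formulas as the TimingWheel fields, with hypothetical inputs.
final case class WheelFields(tickMs: Long, wheelSize: Int, startMs: Long) {
  val interval: Long = tickMs * wheelSize              // span of this wheel = tickMs of the overflow wheel
  val currentTime: Long = startMs - (startMs % tickMs) // startMs rounded down to a tick boundary
}
```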
Adding a higher-level wheel
TimingWheel also provides addOverflowWheel, which creates the next higher-level wheel:
private[this] def addOverflowWheel(): Unit = {
  synchronized {
    if (overflowWheel == null) {
      overflowWheel = new TimingWheel(
        tickMs = interval,
        wheelSize = wheelSize,
        startMs = currentTime,
        taskCounter = taskCounter,
        queue
      )
    }
  }
}
Adding a timer task
The add method adds a timer task:
/**
 * Adds a timer task to this wheel, or to a higher-level wheel if it is too far in the future.
 * @param timerTaskEntry the timer task; in Kafka a scheduled task is wrapped in a TimerTaskEntry
 * @return true if the entry was added; false if it was cancelled or has already expired
 */
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
  val expiration = timerTaskEntry.expirationMs
  if (timerTaskEntry.cancelled) {
    // Cancelled
    false
  } else if (expiration < currentTime + tickMs) {
    // Already expired
    false
  } else if (expiration < currentTime + interval) {
    // Put in its own bucket
    val virtualId = expiration / tickMs
    // Compute the slot and fetch its bucket (a doubly linked list of TimerTaskEntry)
    val bucket = buckets((virtualId % wheelSize.toLong).toInt)
    bucket.add(timerTaskEntry)
    // Set the bucket expiration time
    if (bucket.setExpiration(virtualId * tickMs)) {
      // The expiration changed, so the bucket is being (re)used for a new round: enqueue it
      queue.offer(bucket)
    }
    true
  } else {
    // Out of the interval. Put it into the parent timer
    if (overflowWheel == null) addOverflowWheel()
    // If the overflow wheel cannot hold it either, this recursive call creates yet another level
    overflowWheel.add(timerTaskEntry)
  }
}
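The placement logic in add can be traced with a simplified, single-threaded mirror. MiniWheel is a hypothetical sketch: it drops cancellation, TimerTaskEntry and the DelayQueue, treats entries as plain expiration timestamps, and returns where each entry landed so the result can be inspected. It keeps Kafka's three cases: already expired, fits in this wheel, or delegated to a lazily created overflow wheel.

```scala
import scala.collection.mutable.ListBuffer

// Simplified, single-threaded mirror of TimingWheel.add (a sketch, not Kafka code):
// no cancellation, no DelayQueue; entries are plain expiration timestamps.
final class MiniWheel(tickMs: Long, wheelSize: Int, startMs: Long) {
  private val interval = tickMs * wheelSize
  private val buckets = Array.fill(wheelSize)(ListBuffer.empty[Long])
  private val currentTime = startMs - (startMs % tickMs)
  private var overflowWheel: MiniWheel = null

  // Returns Some((tickMs of the chosen wheel, slot index, bucket expiration)),
  // or None if the entry has already expired and should run immediately.
  def add(expirationMs: Long): Option[(Long, Int, Long)] =
    if (expirationMs < currentTime + tickMs) None
    else if (expirationMs < currentTime + interval) {
      val virtualId = expirationMs / tickMs
      val slot = (virtualId % wheelSize).toInt
      buckets(slot) += expirationMs
      Some((tickMs, slot, virtualId * tickMs)) // bucket expiration is tick-aligned
    } else {
      // Lazily create the next level, which ticks once per revolution of this one
      if (overflowWheel == null) overflowWheel = new MiniWheel(interval, wheelSize, currentTime)
      overflowWheel.add(expirationMs)
    }
}
```

With tickMs = 1, wheelSize = 20 and a start time of 0, an expiration of 450 ms skips levels 0 and 1 and lands in slot 1 of the 400 ms level, whose bucket expires at 400. That coarse placement is expected: when the bucket expires, its entries are re-added and move down to finer levels.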
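Advancing the clock
The advanceClock method advances the wheel. It only updates currentTime (and, recursively, that of the overflow wheels); it does not execute any tasks.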
def advanceClock(timeMs: Long): Unit = {
  // timeMs has moved past the time range of the current bucket
  if (timeMs >= currentTime + tickMs) {
    // Update currentTime: round timeMs down to a multiple of tickMs
    currentTime = timeMs - (timeMs % tickMs)
    // Try to advance the clock of the overflow wheel if present
    if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
  }
}
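The rounding in advanceClock can be examined one level at a time. advancedTime is a hypothetical pure function (not Kafka code) that returns the new currentTime for a level with the given tickMs; the overflow level is then called with that rounded value.

```scala
// One level of TimingWheel.advanceClock as a pure function (sketch, not Kafka code):
// currentTime only moves once timeMs reaches the next tick, and it lands on a tick boundary.
def advancedTime(currentTime: Long, tickMs: Long, timeMs: Long): Long =
  if (timeMs >= currentTime + tickMs) timeMs - (timeMs % tickMs)
  else currentTime
```

For example, a level with tickMs = 20 at time 0 advanced to 47 moves to 40; its overflow level (tickMs = 400) then receives 40 and stays at 0, because 40 has not reached its next tick.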
The timer task bucket
The timer task bucket is implemented by TimerTaskList, which extends Delayed:
/*
 * Reference (Delayed):
 * https://examples.javacodegeeks.com/core-java/util/concurrent/delayqueue/java-util-concurrent-delayqueue-example/
 * Extending Delayed requires implementing two methods:
 * 1. getDelay: returns how much time is left before the delay completes.
 *    It matters because DelayQueue only lets an element be dequeued once
 *    getDelay() returns a value less than or equal to zero.
 * 2. compareTo: Delayed extends Comparable, so implementations must override
 *    compareTo() to define how they are ordered relative to other Delayed objects.
 */
@threadsafe
private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed
Because it extends Delayed, it must implement two methods:
/**
 * Required by DelayQueue.
 * @return the time remaining until this bucket expires (never negative)
 */
def getDelay(unit: TimeUnit): Long = {
  unit.convert(max(getExpiration - Time.SYSTEM.hiResClockMs, 0), TimeUnit.MILLISECONDS)
}
/**
 * Required by DelayQueue: specifies how buckets are ordered relative to other
 * Delayed objects (earlier expiration first).
 */
def compareTo(d: Delayed): Int = {
  val other = d.asInstanceOf[TimerTaskList]
  java.lang.Long.compare(getExpiration, other.getExpiration)
}
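The contract between getDelay, compareTo and DelayQueue can be demonstrated with java.util.concurrent directly. SketchBucket and MonoClock are hypothetical stand-ins (not Kafka's TimerTaskList or Time); MonoClock derives milliseconds from System.nanoTime, which is assumed to be similar in spirit to Time.SYSTEM.hiResClockMs. DelayQueue keeps buckets ordered by compareTo and only hands out the head once its getDelay is zero or less.

```scala
import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}

// Monotonic millisecond clock (assumption: comparable to Kafka's hiResClockMs).
object MonoClock {
  def nowMs: Long = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
}

// Hypothetical bucket (not Kafka's TimerTaskList): only a label and an absolute expiration.
final class SketchBucket(val label: String, val expirationMs: Long) extends Delayed {
  // Remaining time, clamped at zero, converted to whatever unit DelayQueue asks for.
  override def getDelay(unit: TimeUnit): Long =
    unit.convert(math.max(expirationMs - MonoClock.nowMs, 0L), TimeUnit.MILLISECONDS)

  // Earlier expiration sorts first, so it becomes the head of the queue.
  override def compareTo(other: Delayed): Int =
    java.lang.Long.compare(expirationMs, other.asInstanceOf[SketchBucket].expirationMs)
}
```

poll() returns null while the earliest remaining bucket has not expired, whereas poll with a timeout blocks until it does (or the timeout elapses).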
// Remove all task entries and apply the supplied function to each of them
def flush(f: TimerTaskEntry => Unit): Unit = {
  synchronized {
    var head = root.next
    // Walk the list: unlink each entry, then apply f to it, until the list is empty
    while (head ne root) {
      remove(head)
      f(head)
      head = root.next
    }
    // Reset the expiration so the bucket can be reused for a later round
    expiration.set(-1L)
  }
}
Kafka asynchronous task management
The timer: SystemTimer
Kafka defines the Timer trait in the kafka.utils.timer package to unify the timer interface:
trait Timer {
  /**
   * Add a new task to this executor. It will be executed after the task's delay
   * (beginning from the time of submission)
   * @param timerTask the task to add
   */
  def add(timerTask: TimerTask): Unit
  /**
   * Advance the internal clock, executing any tasks whose expiration has been
   * reached within the duration of the passed timeout.
   * @param timeoutMs
   * @return whether or not any tasks were executed
   */
  def advanceClock(timeoutMs: Long): Boolean
  /**
   * Get the number of tasks pending execution
   * @return the number of tasks
   */
  def size: Int
  /* Shutdown the timer service, leaving pending tasks unexecuted */
  def shutdown(): Unit
}
SystemTimer implements Timer and internally builds the hierarchical TimingWheel:
// timeout timer: a single-threaded executor that runs expired tasks
private[this] val taskExecutor = Executors.newFixedThreadPool(1,
  (runnable: Runnable) => KafkaThread.nonDaemon("executor-" + executorName, runnable))
// Delay queue shared by all wheel levels; it holds buckets (TimerTaskList), not individual tasks
private[this] val delayQueue = new DelayQueue[TimerTaskList]()
private[this] val taskCounter = new AtomicInteger(0)
// The lowest-level wheel; higher levels are created on demand by addOverflowWheel
private[this] val timingWheel = new TimingWheel(
  tickMs = tickMs,
  wheelSize = wheelSize,
  startMs = startMs,
  taskCounter = taskCounter,
  delayQueue
)
Adding a task
def add(timerTask: TimerTask): Unit = {
  readLock.lock()
  try {
    addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs))
  } finally {
    readLock.unlock()
  }
}
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
  if (!timingWheel.add(timerTaskEntry)) {
    // Already expired or cancelled
    if (!timerTaskEntry.cancelled)
      // Run it now: hand the task to the executor (the returned Future is not used)
      taskExecutor.submit(timerTaskEntry.timerTask)
  }
}
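The decision in addTimerTaskEntry (schedule the entry, run it now, or drop it) can be isolated in a small sketch. SketchEntry, wheelAdd and addEntry are hypothetical names: wheelAdd stands in for a single-level TimingWheel.add, and locking is omitted. In SystemTimer itself, add holds the read lock so many threads can add concurrently, while advanceClock takes the write lock, so the wheel is never advanced while an add is in progress.

```scala
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical entry type (stand-in for TimerTaskEntry).
final class SketchEntry(val expirationMs: Long, val task: Runnable) {
  val cancelled = new AtomicBoolean(false)
}

// Stand-in for TimingWheel.add on a single level: accepts only non-cancelled entries
// that expire at least one tick after currentTime (overflow levels omitted).
def wheelAdd(e: SketchEntry, currentTime: Long, tickMs: Long): Boolean =
  !e.cancelled.get && e.expirationMs >= currentTime + tickMs

// Same decision as SystemTimer.addTimerTaskEntry: if the wheel rejects the entry,
// run it now unless it was cancelled.
def addEntry(e: SketchEntry, currentTime: Long, tickMs: Long, executor: ExecutorService): String =
  if (wheelAdd(e, currentTime, tickMs)) "scheduled"
  else if (!e.cancelled.get) { executor.submit(e.task); "submitted" }
  else "dropped"
```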
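Advancing the clock in SystemTimer
def advanceClock(timeoutMs: Long): Boolean = {
  // Wait up to timeoutMs; if a bucket has expired, take it from the delay queue
  var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
  if (bucket != null) { // at least one bucket has expired
    writeLock.lock()
    try {
      while (bucket != null) {
        // Advance the wheel (and any higher-level wheels) to the bucket's expiration
        timingWheel.advanceClock(bucket.getExpiration)
        // Re-add every entry: entries still in the future drop into a finer-grained bucket,
        // expired ones are submitted to the executor, and cancelled ones are discarded
        bucket.flush(addTimerTaskEntry)
        // poll() without a timeout returns immediately, so this drains every already-expired bucket
        bucket = delayQueue.poll()
      }
    } finally {
      writeLock.unlock()
    }
    true
  } else {
    false
  }
}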
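The polling pattern in SystemTimer.advanceClock (one blocking poll, then non-blocking polls until the queue has no expired head) can be reproduced with a hypothetical NamedBucket type and drainExpired helper. Instead of advancing a wheel and flushing, this sketch just collects the task names of every expired bucket, so it illustrates the queue handling only.

```scala
import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}
import scala.collection.mutable.ListBuffer

object Mono { def nowMs: Long = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) }

// Hypothetical bucket holding task names (stand-in for TimerTaskList).
final class NamedBucket(val expirationMs: Long, val tasks: List[String]) extends Delayed {
  override def getDelay(unit: TimeUnit): Long =
    unit.convert(math.max(expirationMs - Mono.nowMs, 0L), TimeUnit.MILLISECONDS)
  override def compareTo(other: Delayed): Int =
    java.lang.Long.compare(expirationMs, other.asInstanceOf[NamedBucket].expirationMs)
}

// Block up to timeoutMs for the first expired bucket, then use the non-blocking poll()
// to drain any others that have already expired. Kafka would advance the wheel and
// flush each bucket here; this sketch only collects the task names.
def drainExpired(queue: DelayQueue[NamedBucket], timeoutMs: Long): List[String] = {
  val fired = ListBuffer.empty[String]
  var bucket = queue.poll(timeoutMs, TimeUnit.MILLISECONDS)
  while (bucket != null) {
    fired ++= bucket.tasks
    bucket = queue.poll()
  }
  fired.toList
}
```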
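Expired operation detection
Expired operations are detected by ExpiredOperationReaper, a background thread that repeatedly calls the purgatory's advanceClock method. That method calls timeoutTimer.advanceClock to move the timing wheel forward and execute expired tasks: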
def advanceClock(timeoutMs: Long): Unit = {
  timeoutTimer.advanceClock(timeoutMs)
  // Trigger a purge if the number of completed but still being watched operations is larger than
  // the purge threshold. That number is computed by the difference btw the estimated total number of
  // operations and the number of pending delayed operations.
  if (estimatedTotalOperations.get - numDelayed > purgeInterval) {
    // now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to
    // clean up watchers. Note that, if more operations are completed during the clean up, we may end up with
    // a little overestimated total number of operations.
    estimatedTotalOperations.getAndSet(numDelayed)
    debug("Begin purging watch lists")
    val purged = watcherLists.foldLeft(0) {
      case (sum, watcherList) => sum + watcherList.allWatchers.map(_.purgeCompleted()).sum
    }
    debug("Purged %d elements from watch lists.".format(purged))
  }
}
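The purge trigger is an estimate: estimatedTotalOperations minus numDelayed approximates how many completed operations are still sitting in watcher lists. The sketch below mirrors that condition with hypothetical SketchOp and SketchWatchers types and a maybePurge helper; it is not Kafka's WatcherList. With an estimate of 5, one pending operation and purgeInterval = 2, the difference 4 exceeds 2, so a purge runs and the estimate is reset to 1.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ListBuffer

// Hypothetical operation: just a completion flag.
final class SketchOp { @volatile var completed = false }

// Hypothetical watcher list: operations stay here after completing until purged.
final class SketchWatchers {
  val ops = ListBuffer.empty[SketchOp]
  def purgeCompleted(): Int = synchronized {
    val done = ops.filter(_.completed)
    ops --= done // remove completed operations
    done.size
  }
}

// Mirrors the purge trigger in DelayedOperationPurgatory.advanceClock:
// (estimated total - still pending) approximates completed-but-still-watched operations.
def maybePurge(estimatedTotal: AtomicInteger, numDelayed: Int, purgeInterval: Int,
               watchers: Seq[SketchWatchers]): Int =
  if (estimatedTotal.get - numDelayed > purgeInterval) {
    estimatedTotal.getAndSet(numDelayed) // reset the estimate before cleaning up
    watchers.map(_.purgeCompleted()).sum
  } else 0
```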
/**
 * A background reaper to expire delayed operations that have timed out.
 * Its thread function repeatedly calls advanceClock, which advances the internal timer
 * timeoutTimer (a SystemTimer). Expired tasks are removed from the timer and their callbacks run.
 */
private class ExpiredOperationReaper extends ShutdownableThread(
  "ExpirationReaper-%d-%s".format(brokerId, purgatoryName),
  false) {
  /**
   * Advances the timing wheel; called repeatedly from the run method of the base class ShutdownableThread.
   */
  override def doWork(): Unit = {
    advanceClock(200L)
  }
}
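The reaper relies on a simple loop: ShutdownableThread keeps calling doWork, and doWork blocks for at most 200 ms inside advanceClock, so the thread neither spins nor ignores shutdown for long. LoopingThread and SketchReaper below are hypothetical stand-ins (not Kafka's ShutdownableThread), and a LinkedBlockingQueue of operation names replaces the timer.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for ShutdownableThread: run() keeps calling doWork() until shutdown.
abstract class LoopingThread(name: String) extends Thread(name) {
  private val running = new AtomicBoolean(true)
  def doWork(): Unit
  override def run(): Unit = while (running.get) doWork()
  def shutdownAndJoin(): Unit = { running.set(false); join() }
}

// Hypothetical reaper: each doWork() blocks for at most 200 ms (like advanceClock(200L)),
// so shutdown is noticed within roughly one timeout.
final class SketchReaper(expired: LinkedBlockingQueue[String])
    extends LoopingThread("ExpirationReaper-sketch") {
  val handled = new ConcurrentLinkedQueue[String]()
  override def doWork(): Unit = {
    val op = expired.poll(200L, TimeUnit.MILLISECONDS)
    if (op != null) handled.add(op)
  }
}
```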
Asynchronous task management
This is implemented by DelayedOperationPurgatory:
object DelayedOperationPurgatory {
  private val Shards = 512 // Shard the watcher list to reduce lock contention
  def apply[T <: DelayedOperation](purgatoryName: String,
      brokerId: Int = 0,
      purgeInterval: Int = 1000,
      reaperEnabled: Boolean = true,
      timerEnabled: Boolean = true): DelayedOperationPurgatory[T] = {
    // SystemTimer is the timer; internally it is implemented with the TimingWheel
    val timer = new SystemTimer(purgatoryName)
    new DelayedOperationPurgatory[T](purgatoryName, timer, brokerId,
      purgeInterval, reaperEnabled, timerEnabled)
  }
}
/**
 * A helper purgatory class for bookkeeping delayed operations with a timeout,
 * and expiring timed out operations.
 */
final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
    timeoutTimer: Timer,
    brokerId: Int = 0,
    purgeInterval: Int = 1000,
    reaperEnabled: Boolean = true,
    timerEnabled: Boolean = true)
  extends Logging with KafkaMetricsGroup {
  ...
  /* background thread expiring operations that have timed out */
  private val expirationReaper = new ExpiredOperationReaper()
  if (reaperEnabled)
    expirationReaper.start()
}
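The only detail given here about Shards is the comment "Shard the watcher list to reduce lock contention". A minimal sketch of the idea, assuming shard selection by the key's hashCode modulo the shard count and one lock per shard; ShardedWatchers, watch and watching are hypothetical names, not Kafka's API.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical sharded watcher registry (not Kafka's WatcherList): each shard has its own
// lock, so threads watching keys in different shards do not contend on the same lock.
final class ShardedWatchers(numShards: Int) {
  private final class Shard {
    val lock = new ReentrantLock()
    val byKey = mutable.Map.empty[Any, mutable.ListBuffer[String]]
  }
  private val shards = Vector.fill(numShards)(new Shard)

  // Map a key to a shard; abs() guards against negative hashCode values.
  def shardIndex(key: Any): Int = math.abs(key.hashCode % numShards)

  def watch(key: Any, op: String): Unit = {
    val s = shards(shardIndex(key))
    s.lock.lock()
    try s.byKey.getOrElseUpdate(key, mutable.ListBuffer.empty[String]) += op
    finally s.lock.unlock()
  }

  def watching(key: Any): List[String] = {
    val s = shards(shardIndex(key))
    s.lock.lock()
    try s.byKey.get(key).map(_.toList).getOrElse(Nil)
    finally s.lock.unlock()
  }
}
```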
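Copyright notice: This is an original article by InfoQ author 正向成长.
Original link: http://xie.infoq.cn/article/f9e04ccc51dcb2505a58f3e8e
This article is licensed under CC-BY 4.0. When republishing, please keep the original source and this copyright notice.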