写点什么

你说这是冷知识?Netty 时间轮调度算法原理分析,再不了解你就 out 啦 (1)

用户头像
极客good
关注
发布于: 刚刚

// 计算当前添加任务的执行时间 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;


// Guard against overflow.if (delay > 0 && deadline < 0) {deadline = Long.MAX_VALUE;}// 将任务加入队列 HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);timeouts.add(timeout);return timeout;}


任务会先保存在队列中,当时间轮的时钟拨动时才会判断是否将队列中的任务加载进时间轮。


public void start() {switch (WORKER_STATE_UPDATER.get(this)) {case WORKER_STATE_INIT:// 这里存在并发,通过 CAS 操作保证最终只有一个线程能开启时间轮的工作线程 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {workerThread.start();}break;case WORKER_STATE_STARTED:break;case WORKER_STATE_SHUTDOWN:throw new IllegalStateException("cannot be started once stopped");default:throw new Error("Invalid WorkerState");}


while (startTime == 0) {try {// startTimeInitialized 是一个 CountDownLatch,目的是为了保证工作线程的 startTime 属性初始化 startTimeInitialized.await();} catch (InterruptedException ignore) {// Ignore - it will be ready very soon.}}}


这里通过 CAS 加锁的方式保证线程安全,避免多次开启。


工作线程开启后,start() 方法会被阻塞,等工作线程的 startTime 属性初始化完成后才被唤醒。为什么只有等 startTime 初始化后才能继续执行呢?因为上面的 newTimeout 方法在线程开启后,需要计算当前添加进来任务的执行时间,而这个执行时间是根据 startTime 计算的。

2.4 时间轮调度

@Overridepublic void run() {// 初始化 startTime.startTime = System.nanoTime();if (startTime == 0) {startTime = 1;}


// 用来唤醒被阻塞的 HashedWheelTimer#start() 方法,保证 startTime 初始化 startTimeInitialized.countDown();


do {// 时钟拨动 final long deadline = waitForNextTick();if (deadline > 0) {int idx = (int) (tick & mask);// 处理过期的任务 processCancelledTasks();HashedWheelBucket bucket =wheel[idx];// 将任务加载进时间轮 transferTimeoutsToBuckets();// 执行当前时间轮槽内的任务 bucket.expireTimeouts(deadline);tick++;}} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);


// 时间轮关闭,将还未执行的任务以列表的形式保存到 unprocessedTimeouts 集合中,在 stop 方法中返回出去// 还未执行的任务可能会在两个地方,一:时间轮数组内,二:队列中 for (HashedWheelBucket bucket: wheel) {bucket.clearTimeouts(unprocessedTimeouts);}for (;;) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {break;}if (!timeout.isCancelled()) {unprocessedTimeouts.add(timeout);}}// 处理过期的任务 processCancelledTasks();}


时间轮


【一线大厂Java面试题解析+核心总结学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


每拨动一次 tick 就会 +1,根据这个值与(时间轮数组长度 - 1)进行 & 运算,可以定位时间轮数组内的槽。因为 tick 值一直在增加,所以时间轮数组看起来就像一个不断循环的圆。


  • 先初始化 startTime 值,因为后面任务执行的时间是根据 startTime 计算的

  • 时钟拨动,如果时间未到,则 sleep 一会儿

  • 处理过期的任务

  • 将任务加载进时间轮

  • 执行当前时钟对应时间轮内的任务

  • 时间轮关闭,将所有未执行的任务封装到 unprocessedTimeouts 集合中,在 stop 方法中返回出去

  • 处理过期的任务


上面简单罗列了下 run 方法的大概执行步骤,下面是具体方法的分析。

2.5 时钟拨动

如果时间轮设置的 tickDuration 为 100ms 拨动一次,当时钟拨动一次后,应该计算下一次时钟拨动的时间,如果还没到就 sleep 一会儿,等到拨动时间再醒来。


private long waitForNextTick() {// 计算时钟下次拨动的相对时间 long deadline = tickDuration * (tick + 1);


for (;;) {// 获取当前时间的相对时间 final long currentTime = System.nanoTime() - startTime;// 计算距离时钟下次拨动的时间// 这里之所以加 999999 后再除 10000000, 是为了保证足够的 sleep 时间// 例如:当 deadline - currentTime = 2000002 的时候,如果不加 999999,则只睡了 2ms// 而 2ms 其实是未到达 deadline 时间点的,所以为了使上述情况能 sleep 足够的时间,加上 999999 后,会多睡 1mslong sleepTimeMs = (deadline - currentTime + 999999) / 1000000;


// <=0 说明可以拨动时钟了 if (sleepTimeMs <= 0) {if (currentTime == Long.MIN_VALUE) {return -Long.MAX_VALUE;} else {return currentTime;}}


// 这里是为了兼容 Windows 平台,因为 Windows 平台的调度最小单位为 10ms,如果不是 10ms 的倍数,可能会引起 sleep 时间不准确// See https://github.com/Netty/Netty/issues/356if (PlatformDependent.isWindows()) {sleepTimeMs = sleepTimeMs / 10 * 10;}


try {// sleep 到下次时钟拨动 Thread.sleep(sleepTimeMs);} catch (InterruptedException ignored) {if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {return Long.MIN_VALUE;}}}}


如果时间不到就 sleep 等待一会儿,为了使任务时钟准确,可以从上面的代码中看出 Netty 做了一些优化,比如 sleepTimeMs 的计算,Windows 平台的处理等。

2.6 将任务从队列加载进时间轮

private void transferTimeoutsToBuckets() {


// 一次最多只处理队列中的 100000 个任务 for (int i = 0; i < 100000; i++) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {// all processedbreak;}// 过滤已经取消的任务 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {continue;}// 计算当前任务到执行还需要经过几次时钟拨动// 假设时间轮数组大小是 10,calculated 为 12,需要时间轮转动一圈加两次时钟拨动后后才能执行这个任务,因此还需要计算一下圈数 long calculated = timeout.deadline / tickDuration;// 计算当前任务到执行还需要经过几圈时钟拨动 timeout.remainingRounds = (calculated - tick) / wheel.length;// 有的任务可能在队列里很长时间,时间过期了也没有被调度,将这种情况的任务放在当前轮次内执行 final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.// 计算任务在时间轮数组中的槽 int stopIndex = (int) (ticks & mask);HashedWheelBucket bucket = wheel[stopIndex];// 将任务放到时间轮的数组中,多个任务可能定位时间轮的同一个槽,这些任务通过以链表的形式链接 bucket.addTimeout(timeout);}}


void addTimeout(HashedWheelTimeout timeout) {assert timeout.bucket == null;// 任务构成双向链表 timeout.bucket = this;if (head == null) {head = tail = timeout;} else {tail.next = timeout;timeout.prev = tail;tail = timeout;}}


在上面也提到过,任务刚加进来不会立即到时间轮中去,而是暂时保存到一个队列中,当时间轮时钟拨动时,会将任务从队列中加载进时间轮内。


时间轮每次最大处理 100000 个任务,因为任务的执行时间是用户自定义的,所以需要计算任务到执行需要经过多少次时钟拨动,并计算时间轮拨动的圈数。接着将任务加载进时间轮对应的槽内,可能有多个任务经过 hash 计算后定位到同一个槽,这些任务会以双向链表的结构保存,有点类似 HashMap 处理碰撞的情况。

2.7 执行任务

public void expireTimeouts(long deadline) {HashedWheelTimeout timeout = head;


while (timeout != null) {HashedWheelTimeout next = timeout.next;// 任务执行的圈数 > 0,表示任务还需要经过 remainingRounds 圈时钟循环才能执行 if (timeout.remainingRounds <= 0) {// 从链表中移除当前任务,并返回链表中下一个任务 next = remove(timeout);if (timeout.deadline <= deadline) {// 执行任务 timeout.expire();} else {// The timeout was placed into a wrong slot. This should never happen.throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));}} else if (timeout.isCancelled()) {// 过滤取消的任务 next = remove(timeout);} else {// 圈数 -1timeout.remainingRounds --;}timeout = next;}}


public void expire() {// 任务状态校验 if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {return;}


try {task.run(this);} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);}}}


时间轮槽内的任务以链表形式存储,这些任务执行的时间可能会不一样,有的在当前时钟执行,有的在下一圈或者下两圈对应的时钟执行。当任务在当前时钟执行时,需要将这个任务从链表中删除,重新维护链表关系。

2.8 终止时间轮

@Overridepublic Set<Timeout> stop() {// 终止时间轮的线程不能是时间轮的工作线程 if (Thread.currentThread() == workerThread) {throw new IllegalStateException(HashedWheelTimer.class.getSimpleName() +".stop() cannot be called from " +TimerTask.class.getSimpleName());}// 将时间轮的状态修改为 WORKER_STATE_SHUTDOWN,这里有两种情况// 一:时间轮是 WORKER_STATE_INIT 状态,表明时间轮从创建到终止一直没有任务进来// 二:时间轮是 WORKER_STATE_STARTED 状态,多个线程尝试终止时间轮,只有一个操作成功 if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {// 代码走到这里,时间轮只能是两种状态中的一个,WORKER_STATE_INIT 和 WORKER_STATE_SHUTDOWN// 为 WORKER_STATE_INIT 表示时间轮没有任务,因此不用返回未处理的任务,但是需要将时间轮实例 -1// 为 WORKER_STATE_SHUTDOWN 表示是 CAS 操作失败,什么都不用做,因为 CAS 成功的线程会处理 if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {// 时间轮实例对象 -1INSTANCE_COUNTER.decrementAndGet();if (leak != null) {boolean closed = leak.close(this);assert closed;}}// CAS 操作失败,或者时间轮没有处理过任务,返回空的任务列表 return Collections.emptySet();}


try {boolean interrupted = false;while (workerThread.isAlive()) {// 中断时间轮工作线程 workerThread.interrupt();try {// 终止时间轮的线程等待时间轮工作线程 100ms,这个过程主要是为了时间轮工作线程处理未执行的任务 workerThread.join(100);} catch (InterruptedException ignored) {interrupted = true;}}


if (interrupted) {Thread.currentThread().interrupt();}} finally {INSTANCE_COUNTER.decrementAndGet();if (leak != null) {boolean closed = leak.close(this);assert closed;}}// 返回未处理的任务 return worker.unprocessedTimeouts();}


当终止时间轮时,时间轮状态有两种情况:


  • WORKER_STATE_INIT:时间轮初始化,前面我们说过,当初始化时间轮对象时并不会立即开启时间轮工作线程,而是第一次添加任务时才开启,为 WORKER_STATE_INIT 表示时间轮没有处理过任务

  • WORKER_STATE_STARTED:时间轮在工作,这里也有两种情况,存在并发与不存在并发,如果多个线程都尝试终止时间轮,肯定只能有一个成功


时间轮停止运行后会将未执行的任务返回出去,至于怎么处理这些任务,由业务方自己定义,这个流程和线程池的 shutdownNow 方法是类似的。


如果时间轮在运行,怎么才能获取到未执行的任务呢,答案就在上面的 run() 方法中,如果时间轮处于非运行状态,会把时间轮数组与队列中未执行且未取消的任务保存到 unprocessedTimeouts 集合中。而终止时间轮成功的线程只需要等待一会儿即可,这个等待是通过 workerThread.join(100); 实现的。


取消时间轮内的任务相对比较简单,这里就不概述了,想要了解的自行查看即可。


上面就是时间轮运行的基本原理了。

用户头像

极客good

关注

还未添加个人签名 2021.03.18 加入

还未添加个人简介

评论

发布
暂无评论
你说这是冷知识?Netty时间轮调度算法原理分析,再不了解你就out啦(1)