一、前言
延迟阻塞队列DelayedWorkQueue中放的元素是ScheduledFutureTask,提交的任务被包装成ScheduledFutureTask放进工作队列,Woker工作线程消费工作队列中的任务,即调用ScheduledFutureTask.run(),ScheduledFutureTask又调用任务的run(),这点和ThreadPoolExecutor差不多,而ScheduledThreadPoolExecutor是如何实现按时间调度的呢?
ScheduledThreadPoolExecutor提交任务的核心函数有 3 个:
schedule(...) 按一定延迟时长执行任务,只执行一次。
scheduleAtFixedRate(...)按固定频率,周期性执行任务。
scheduleWithFixedDelay(...) 按固定延迟时间,受任务执行时长影响,周期性执行任务。
二、提交任务
首先从 3 个核心函数出发,其入口源码相似,提交的任务都会先创建一个ScheduledFutureTask对象,然后调用decorateTask包装返回RunnableScheduledFuture对象,最后刚才被包装成RunnableScheduledFuture对象作为参数调用统一的延迟执行函数delayedExecute()。
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); //将任务包装成RunnableScheduledFuture对象 //decorateTask直接返回sft,这个函数的意图是让开发者DIY继承实现的 RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); //延迟执行 加延迟阻塞队列+启动一个空的Worker线程 delayedExecute(t); return t;}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, //传入第一次延时时间 now+initialDelay triggerTime(initialDelay, unit), unit.toNanos(period)); //将任务包装成RunnableScheduledFuture对象, //decorateTask直接返回sft,这个函数的意图是让开发者DIY继承实现的 RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; //延迟执行 加延迟阻塞队列+启动一个空的Worker线程 delayedExecute(t); return t;}public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), //重点! 传入的delay取反了,用delay正负来区分执行间隔是否固定 unit.toNanos(-delay)); //将任务包装成RunnableScheduledFuture对象 //decorateTask直接返回sft,这个函数的意图是让开发者DIY继承实现的 RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; //延迟执行 加延迟阻塞队列+启动一个空的Worker线程 delayedExecute(t); return t;}
复制代码
1、decorateTask 留给开发者去实现
看decorateTask源码,其有两个参数,任务原始对象runnable和把原始任务包装成RunnableScheduledFuture对象。decorateTask函数直接返回RunnableScheduledFuture对象,没有做什么事情,那其意图是什么呢?
decorateTask是想让开发者继承ScheduledThreadPoolExecutor实现定制化定时线程池时,可以实现这个函数,对原始任务对象和包装后任务对象做特殊 DIY 处理。
protected <V> RunnableScheduledFuture<V> decorateTask( Runnable runnable, RunnableScheduledFuture<V> task) { return task;}
复制代码
2、delayedExecute 延迟调度
delayedExecute()是延迟执行和周期性执行的主函数,其基本流程如下:
判断线程池的状态,runstate为shutdown将拒绝任务提交。
任务处于正常运行状态,则将任务直接加入阻塞工作队列。
再次判断线程池的状态,runstate为shutdown,再判断是否是周期性任务(isPeriodic),不同的性质不同的处理策略。
一起正常预启动一个空Worker线程,循环从阻塞队列中消费任务。
private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { //1、直接加入延时阻塞队列 super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else //2、预启动一个空的worker ensurePrestart(); }}void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) //创建一个空worker,并且启动 addWorker(null, true); else if (wc == 0) addWorker(null, false);}
复制代码
三、ScheduledFutureTask 时间调度执行的核心
可以看出提交的任务最重被包装成ScheduledFutureTask,然后加到工作队列由Worker工作线程去消费了。
延迟执行和周期性执行的核心代码也就在于ScheduledFutureTask。
1、基本架构
ScheduledFutureTask继承了FutureTask并实现了接口RunnableScheduledFuture。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-dxeJoszR-1604798379406)(C:\study\myStudy\myNotes\java\concurrent\线程池\ScheduledFutureTask.png)]
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
/** Sequence number to break ties FIFO */ private final long sequenceNumber;
/** The time the task is enabled to execute in nanoTime units */ ////任务被调用的执行时间 private long time; /** * Period in nanoseconds for repeating tasks. */ //周期性执行的时间间隔 private final long period;
/** The actual task to be re-enqueued by reExecutePeriodic */ RunnableScheduledFuture<V> outerTask = this;
/** * Index into delay queue, to support faster cancellation. * 索引到延迟队列为了支持快速取消 */ int heapIndex;
/** * Creates a one-shot action with given nanoTime-based trigger time. */ ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); }}
复制代码
ScheduledFutureTask实现了接口Delayed,所以需要重写两个方法getDelay、compareTo。
//获取当前延迟时间(距离下次任务执行还有多久)public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS);}/** * 比较this 和 other谁先执行 * @param other * @return <=0 this先执行 */public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } //比较Delay long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;}
复制代码
2、延迟执行和周期执行区别
延迟执行和周期执行区别在于period:
public boolean isPeriodic() { return period != 0;}private void setNextRunTime() { long p = period; //AtFixedRate 当传入period > 0 时 ,每次执行的时间的间隔是固定的 if (p > 0) time += p; else //WithFixedDelay 当传入period < 0 时,每次执行的时间受任务执行时长影响,是任务执行结束后的当前时间+ (-p) time = triggerTime(-p);}long triggerTime(long delay) { return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));}
复制代码
3、heapIndex 支持快速取消定时任务
ScheduledFutureTask还有一个变量heapIndex,是记录任务在阻塞队列的索引的,其方便支持快速取消任务和删除任务。但是其并不会作为删除任务的位置判断,只是当用于判断惹怒是否在阻塞队列中:heapIndex >= 0 在阻塞队列中,取消任务时需要同时从阻塞队列删除任务;heapIndex < 0不在阻塞队列中。
阻塞队列DelayedWorkQueue的每次堆化siftUp()、siftDown(),以及remove()都维护着heapIndex,想必这也是ScheduledThreadPoolExecutor自行定制延迟阻塞队列的原因之一。
public boolean cancel(boolean mayInterruptIfRunning) { boolean cancelled = super.cancel(mayInterruptIfRunning); if (cancelled && removeOnCancel && heapIndex >= 0) //从延迟阻塞队列中删除任务 remove(this); return cancelled;}
复制代码
4、核心逻辑 run()
ScheduledFutureTask间接实现了接口Runnable,其核心逻辑就在run():
public void run() { boolean periodic = isPeriodic(); //当runState为SHUTDOWN时,非周期性任务继续,周期性任务会中断取消
if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) //非周期性任务,只执行一次 ScheduledFutureTask.super.run(); //runAndReset返回false周期性任务将不再执行 else if (ScheduledFutureTask.super.runAndReset()) { //runAndReset() 周期性任务执行并reset //设置下一次执行时间 setNextRunTime(); //把自己再放回延时阻塞队列 reExecutePeriodic(outerTask); }}
复制代码
(1)canRunInCurrentRunState 不同任务性质不同策略
代码一开始判断线程池的运行状态canRunInCurrentRunState,当线程池处于 SHUTDOWN 状态时,是否是周期性任务有不同的策略:
boolean canRunInCurrentRunState(boolean periodic) { return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown);}/** * False if should cancel/suppress periodic tasks on shutdown. */private volatile boolean continueExistingPeriodicTasksAfterShutdown;
/** * False if should cancel non-periodic tasks on shutdown. */private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
复制代码
(2)单次执行调度父类 FutureTask.run
延迟执行任务,即只执行一次,调用了父类的FutureTask.run()。提交的任务如果是Runnable型,会被包装成Callable型作为FutureTask的成员变量。FutureTask.run()中直接调度执行任务的代码call(),同时返回结果。
需要注意的是,任务代码c.call()若抛出异常会被FutureTask捕获处理,这样对外查找问题不利,所以最好在任务 run()或者 call()的核心代码用 try-catch 包起来。
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { //调用callable的call,并设置返回值 //如果传进来的任务是Runnable,会被转换成callable result = c.call(); //若运行异常,ran=false,异常会被捕获处理 //所以传进来的任务的run或者call代码块最好try-catch下 ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }}
复制代码
(3)周期性执行调度父类 FutureTask.runAndReset
从源码看出,周期性执行任务没有返回值,FutureTask.runAndReset最终返回布尔值,并且也会捕获任务代码异常,最终返回 true 代表代码没有出现异常,下次可以正常执行,false 代表任务代码中有异常,下次不能正常执行。
所以特别强调任务代码必须要try-catch,否则一旦出现异常,周期性执行将不会再设置下次执行时间和把自己放回延迟阻塞队列。
protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result //如果c.call抛异常,将会被处理,但是没有打印堆栈,使用者不易排查 // 不会再往下执行ran=false //所以传进来的任务run里需要自己try-catch ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW;}
复制代码
四、onShutdown
还记得ThreadPoolExecutor.shutdown()代码里一个空的钩子函数onShutdown()吗?
ScheduledThreadPoolExecutor的shutdown()和shutdownNow()都是调用了父类ThreadPoolExecutor的函数,但是实现了onShutdown()。
回顾当调用shutdownNow()时,会清空工作队列并中断所有工作线程,这样正在执行的任务也会中断,可额外做的事情几乎没有,所以官方也没有给开发者留钩子函数。
而调用shutdown(),不会清空工作线程也不会中断正在执行任务的工作线程,对于ScheduledThreadPoolExecutor有周期性任务,会往复重置执行,如果不额外做些处理就使得即使调用了shutdown()也不会销毁线程池。
onShutdown()对于任务性质不同有不同处理策略:
/** * Cancels and clears the queue of all tasks that should not be run * due to shutdown policy. Invoked within super.shutdown. */@Override void onShutdown() { BlockingQueue<Runnable> q = super.getQueue(); //Shutdown后是否保持延时, 默认true boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); //Shutdown后是否保持周期, 默认false boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); if (!keepDelayed && !keepPeriodic) { for (Object e : q.toArray()) if (e instanceof RunnableScheduledFuture<?>) ((RunnableScheduledFuture<?>) e).cancel(false); q.clear(); //取消并清空队列 } else { // Traverse snapshot to avoid iterator exceptions for (Object e : q.toArray()) { if (e instanceof RunnableScheduledFuture) { RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e; if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || t.isCancelled()) { // also remove if already cancelled if (q.remove(t)) t.cancel(false); } } } } //尝试中断线程池 tryTerminate();}
复制代码
五、总结
通过阅读源码,知道了一些需要注意的细节,一不小心就会踩坑:
任务代码一定要try-catch,否则异常被ScheduledFutureTask的父类FutureTask捕获处理,难以排查问题,同时周期性执行任务会因为任务代码抛异常而不再设置下次执行时间和把自己放回延迟队列的操作,即不会再周期性执行。
ScheduledFutureTask通过一个变量就区分了延迟和周期性执行,period=0延迟执行,即只执行一次;period>0固定频率周期执行;``period<0`固定延迟时间周期执行,两次任务开始执行时间间隔受任务执行耗时影响。
如果周期性任务的执行时长大于period,且看重执行等间隔,使用scheduleWithFixedDelay()。
若周期性任务的执行时长远小于period,则可以使用scheduleAtFixedRate()。
默认情况下,线程池处于关闭状态(shutdown),周期性任务会被取消和阻止执行,非周期性任务会顺利执行完成不会被阻止。
PS: 如若文章中有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。我是徐同学,愿与你共同进步!
评论