大家好,我是冰河~~
在【高并发专题】的专栏中,我们深度分析了 ThreadPoolExecutor 类的源代码,而 ScheduledThreadPoolExecutor 类是 ThreadPoolExecutor 类的子类。今天我们就来一起手撕 ScheduledThreadPoolExecutor 类的源代码。
构造方法
我们先来看下 ScheduledThreadPoolExecutor 的构造方法,源代码如下所示。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
复制代码
从代码结构上来看,ScheduledThreadPoolExecutor 类是 ThreadPoolExecutor 类的子类,ScheduledThreadPoolExecutor 类的构造方法实际上调用的是 ThreadPoolExecutor 类的构造方法。
schedule 方法
接下来,我们看一下 ScheduledThreadPoolExecutor 类的 schedule 方法,源代码如下所示。
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
//如果传递的Runnable对象和TimeUnit时间单位为空
//抛出空指针异常
if (command == null || unit == null)
throw new NullPointerException();
//封装任务对象,在decorateTask方法中直接返回ScheduledFutureTask对象
RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
//执行延时任务
delayedExecute(t);
//返回任务
return t;
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
//如果传递的Callable对象和TimeUnit时间单位为空
//抛出空指针异常
if (callable == null || unit == null)
throw new NullPointerException();
//封装任务对象,在decorateTask方法中直接返回ScheduledFutureTask对象
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable, triggerTime(delay, unit)));
//执行延时任务
delayedExecute(t);
//返回任务
return t;
}
复制代码
从源代码可以看出,ScheduledThreadPoolExecutor 类提供了两个重载的 schedule 方法,两个 schedule 方法的第一个参数不同。可以传递 Runnable 接口对象,也可以传递 Callable 接口对象。在方法内部,会将 Runnable 接口对象和 Callable 接口对象封装成 RunnableScheduledFuture 对象,本质上就是封装成 ScheduledFutureTask 对象。并通过 delayedExecute 方法来执行延时任务。
在源代码中,我们看到两个 schedule 都调用了 decorateTask 方法,接下来,我们就看看 decorateTask 方法。
decorateTask 方法
decorateTask 方法源代码如下所示。
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
return task;
}
复制代码
通过源码可以看出 decorateTask 方法的实现比较简单,接收一个 Runnable 接口对象或者 Callable 接口对象和封装的 RunnableScheduledFuture 任务,两个方法都是将 RunnableScheduledFuture 任务直接返回。在 ScheduledThreadPoolExecutor 类的子类中可以重写这两个方法。
接下来,我们继续看下 scheduleAtFixedRate 方法。
scheduleAtFixedRate 方法
scheduleAtFixedRate 方法源代码如下所示。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
//传入的Runnable对象和TimeUnit为空,则抛出空指针异常
if (command == null || unit == null)
throw new NullPointerException();
//如果执行周期period传入的数值小于或者等于0
//抛出非法参数异常
if (period <= 0)
throw new IllegalArgumentException();
//将Runnable对象封装成ScheduledFutureTask任务,
//并设置执行周期
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period));
//调用decorateTask方法,本质上还是直接返回ScheduledFutureTask对象
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
//设置执行的任务
sft.outerTask = t;
//执行延时任务
delayedExecute(t);
//返回执行的任务
return t;
}
复制代码
通过源码可以看出,scheduleAtFixedRate 方法将传递的 Runnable 对象封装成 ScheduledFutureTask 任务对象,并设置了执行周期,下一次的执行时间相对于上一次的执行时间来说,加上了 period 时长,时长的具体单位由 TimeUnit 决定。采用固定的频率来执行定时任务。
ScheduledThreadPoolExecutor 类中另一个定时调度任务的方法是 scheduleWithFixedDelay 方法,接下来,我们就一起看看 scheduleWithFixedDelay 方法。
scheduleWithFixedDelay 方法
scheduleWithFixedDelay 方法的源代码如下所示。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
//传入的Runnable对象和TimeUnit为空,则抛出空指针异常
if (command == null || unit == null)
throw new NullPointerException();
//任务延时时长小于或者等于0,则抛出非法参数异常
if (delay <= 0)
throw new IllegalArgumentException();
//将Runnable对象封装成ScheduledFutureTask任务
//并设置固定的执行周期来执行任务
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command, null,triggerTime(initialDelay, unit), unit.toNanos(-delay));
//调用decorateTask方法,本质上直接返回ScheduledFutureTask任务
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
//设置执行的任务
sft.outerTask = t;
//执行延时任务
delayedExecute(t);
//返回任务
return t;
}
复制代码
从 scheduleWithFixedDelay 方法的源代码,我们可以看出在将 Runnable 对象封装成 ScheduledFutureTask 时,设置了执行周期,但是此时设置的执行周期与 scheduleAtFixedRate 方法设置的执行周期不同。此时设置的执行周期规则为:下一次任务执行的时间是上一次任务完成的时间加上 delay 时长,时长单位由 TimeUnit 决定。也就是说,具体的执行时间不是固定的,但是执行的周期是固定的,整体采用的是相对固定的延迟来执行定时任务。
如果大家细心的话,会发现在 scheduleWithFixedDelay 方法中设置执行周期时,传递的 delay 值为负数,如下所示。
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay));
复制代码
这里的负数表示的是相对固定的延迟。
在 ScheduledFutureTask 类中,存在一个 setNextRunTime 方法,这个方法会在 run 方法执行完任务后调用,这个方法更能体现 scheduleAtFixedRate 方法和 scheduleWithFixedDelay 方法的不同,setNextRunTime 方法的源码如下所示。
private void setNextRunTime() {
//距离下次执行任务的时长
long p = period;
//固定频率执行,
//上次执行任务的时间
//加上任务的执行周期
if (p > 0)
time += p;
//相对固定的延迟
//使用的是系统当前时间
//加上任务的执行周期
else
time = triggerTime(-p);
}
复制代码
在 setNextRunTime 方法中通过对下次执行任务的时长进行判断来确定是固定频率执行还是相对固定的延迟。
triggerTime 方法
在 ScheduledThreadPoolExecutor 类中提供了两个 triggerTime 方法,用于获取下一次执行任务的具体时间。triggerTime 方法的源码如下所示。
private long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
复制代码
这两个 triggerTime 方法的代码比较简单,就是获取下一次执行任务的具体时间。有一点需要注意的是:delay < (Long.MAX_VALUE >> 1 判断 delay 的值是否小于 Long.MAX_VALUE 的一半,如果小于 Long.MAX_VALUE 值的一半,则直接返回 delay,否则需要处理溢出的情况。
我们看到在 triggerTime 方法中处理防止溢出的逻辑使用了 overflowFree 方法,接下来,我们就看看 overflowFree 方法的实现。
overflowFree 方法
overflowFree 方法的源代码如下所示。
private long overflowFree(long delay) {
//获取队列中的节点
Delayed head = (Delayed) super.getQueue().peek();
//获取的节点不为空,则进行后续处理
if (head != null) {
//从队列节点中获取延迟时间
long headDelay = head.getDelay(NANOSECONDS);
//如果从队列中获取的延迟时间小于0,并且传递的delay
//值减去从队列节点中获取延迟时间小于0
if (headDelay < 0 && (delay - headDelay < 0))
//将delay的值设置为Long.MAX_VALUE + headDelay
delay = Long.MAX_VALUE + headDelay;
}
//返回延迟时间
return delay;
}
复制代码
通过对 overflowFree 方法的源码分析,可以看出 overflowFree 方法本质上就是为了限制队列中的所有节点的延迟时间在 Long.MAX_VALUE 值之内,防止在 ScheduledFutureTask 类中的 compareTo 方法中溢出。
ScheduledFutureTask 类中的 compareTo 方法的源码如下所示。
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
复制代码
compareTo 方法的主要作用就是对各延迟任务进行排序,距离下次执行时间靠前的任务就排在前面。
delayedExecute 方法
delayedExecute 方法是 ScheduledThreadPoolExecutor 类中延迟执行任务的方法,源代码如下所示。
private void delayedExecute(RunnableScheduledFuture<?> task) {
//如果当前线程池已经关闭
//则执行线程池的拒绝策略
if (isShutdown())
reject(task);
//线程池没有关闭
else {
//将任务添加到阻塞队列中
super.getQueue().add(task);
//如果当前线程池是SHUTDOWN状态
//并且当前线程池状态下不能执行任务
//并且成功从阻塞队列中移除任务
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
//取消任务的执行,但不会中断执行中的任务
task.cancel(false);
else
//调用ThreadPoolExecutor类中的ensurePrestart()方法
ensurePrestart();
}
}
复制代码
可以看到在 delayedExecute 方法内部调用了 canRunInCurrentRunState 方法,canRunInCurrentRunState 方法的源码实现如下所示。
boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown);
}
复制代码
可以看到 canRunInCurrentRunState 方法的逻辑比较简单,就是判断线程池当前状态下能够执行任务。
另外,在 delayedExecute 方法内部还调用了 ThreadPoolExecutor 类中的 ensurePrestart()方法,接下来,我们看下 ThreadPoolExecutor 类中的 ensurePrestart()方法的实现,如下所示。
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
复制代码
在 ThreadPoolExecutor 类中的 ensurePrestart()方法中,首先获取当前线程池中线程的数量,如果线程数量小于 corePoolSize 则调用 addWorker 方法传递 null 和 true,如果线程数量为 0,则调用 addWorker 方法传递 null 和 false。
关于 addWork()方法的源码解析,大家可以参考【高并发专题】中的《高并发之——通过ThreadPoolExecutor类的源码深度解析线程池执行任务的核心流程》一文,这里,不再赘述。
reExecutePeriodic 方法
reExecutePeriodic 方法的源代码如下所示。
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
//线程池当前状态下能够执行任务
if (canRunInCurrentRunState(true)) {
//将任务放入队列
super.getQueue().add(task);
//线程池当前状态下不能执行任务,并且成功移除任务
if (!canRunInCurrentRunState(true) && remove(task))
//取消任务
task.cancel(false);
else
//调用ThreadPoolExecutor类的ensurePrestart()方法
ensurePrestart();
}
}
复制代码
总体来说 reExecutePeriodic 方法的逻辑比较简单,但是,这里需要注意和 delayedExecute 方法的不同点:调用 reExecutePeriodic 方法的时候已经执行过一次任务,所以,并不会触发线程池的拒绝策略;传入 reExecutePeriodic 方法的任务一定是周期性的任务。
onShutdown 方法
onShutdown 方法是 ThreadPoolExecutor 类中的钩子函数,它是在 ThreadPoolExecutor 类中的 shutdown 方法中调用的,而在 ThreadPoolExecutor 类中的 onShutdown 方法是一个空方法,如下所示。
ThreadPoolExecutor 类中的 onShutdown 方法交由子类实现,所以 ScheduledThreadPoolExecutor 类覆写了 onShutdown 方法,实现了具体的逻辑,ScheduledThreadPoolExecutor 类中的 onShutdown 方法的源码实现如下所示。
@Override
void onShutdown() {
//获取队列
BlockingQueue<Runnable> q = super.getQueue();
//在线程池已经调用shutdown方法后,是否继续执行现有延迟任务
boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
//在线程池已经调用shutdown方法后,是否继续执行现有定时任务
boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
//在线程池已经调用shutdown方法后,不继续执行现有延迟任务和定时任务
if (!keepDelayed && !keepPeriodic) {
//遍历队列中的所有任务
for (Object e : q.toArray())
//取消任务的执行
if (e instanceof RunnableScheduledFuture<?>)
((RunnableScheduledFuture<?>) e).cancel(false);
//清空队列
q.clear();
}
//在线程池已经调用shutdown方法后,继续执行现有延迟任务和定时任务
else {
//遍历队列中的所有任务
for (Object e : q.toArray()) {
//当前任务是RunnableScheduledFuture类型
if (e instanceof RunnableScheduledFuture) {
//将任务强转为RunnableScheduledFuture类型
RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
//在线程池调用shutdown方法后不继续的延迟任务或周期任务
//则从队列中删除并取消任务
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
t.isCancelled()) {
if (q.remove(t))
t.cancel(false);
}
}
}
}
//最终调用tryTerminate()方法
tryTerminate();
}
复制代码
ScheduledThreadPoolExecutor 类中的 onShutdown 方法的主要逻辑就是先判断线程池调用 shutdown 方法后,是否继续执行现有的延迟任务和定时任务,如果不再执行,则取消任务并清空队列;如果继续执行,将队列中的任务强转为 RunnableScheduledFuture 对象之后,从队列中删除并取消任务。大家需要好好理解这两种处理方式。最后调用 ThreadPoolExecutor 类的 tryTerminate 方法。有关 ThreadPoolExecutor 类的 tryTerminate 方法的源码解析,大家可以参考【高并发专题】中的《高并发之——通过源码深度分析线程池中Worker线程的执行流程》一文,这里不再赘述。
至此,ScheduledThreadPoolExecutor 类中的核心方法的源代码,我们就分析完了。
好了,今天就到这儿吧,我是冰河,我们下期见~~
评论