写点什么

Java Core 「16」J.U.C Executor 框架之 ScheduledThreadPoolExecutor

作者:Samson
  • 2022 年 6 月 23 日
  • 本文字数:3855 字

    阅读完需:约 13 分钟

摘要

前面《Java Core 「15」J.U.C Executor 框架》中学习了 ThreadPoolExecutor,其基于 Executor 框架,实现了线程池功能。那 ScheduledThreadPoolExecutor 又是解决什么问题的呢?从前面的文章中,我们了解到,提交到线程池的任务都是一次性的,即执行完就结束了。ScheduledThreadPoolExecutor 解决的是周期性或延迟任务的线程池。

相比于 ThreadPoolExecutor,ScheduledThreadPoolExecutor 有什么特点呢?

  • 可接受周期性任务,例如每隔 3s 执行一次,延迟任务,例如 5s 后执行;也可以接受普通的任务(由父类 ThreadPoolExecutor 执行)。

  • 实现了特殊的阻塞队列 DelayedWorkQueue,无界延迟队列的一种,用来存储周期性或延时任务。

  • 线程池 shutdown 之后,周期性或延时任务是否可继续执行可通过 run-after-shutdown 参数配置。

接下来,我们将详细地学习这些特点。

01-周期性任务 & 延时任务

ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor,并实现了 ScheduledExecutorService 接口。而正是后者定义了周期性任务执行(scheduleAtFixedRate / scheduleWithFixedDelay 方法)和延时任务执行(schedule 方法)。

public ScheduledFuture<?> schedule(Runnable command,                                   long delay,                                   TimeUnit unit) {    if (command == null || unit == null)        throw new NullPointerException();    /** decorateTask 的默认实现就是返回第二个入参,是预留的扩展点     * 默认实现等价于: t = new ScheduleFutureTask<Void>(...);     */    RunnableScheduledFuture<Void> t = decorateTask(command,        new ScheduledFutureTask<Void>(command, null,                                      triggerTime(delay, unit),                                      sequencer.getAndIncrement()));    delayedExecute(t);    return t;}
复制代码

schedule 方法共有两个版本,区别在于第一个参数是 Runnable 还是 Callable。ScheduledFutureTask 继承自 FutureTask,当入参为 Runnable 时,会被Executors.*callable*(runnable, null);封装,赋值给继承自 FutureTask 的callable变量。

schedule 方法大致上可以分为两步:

  1. 执行 decorateTask,将任务封装为 RunnableScheduledFuture。

  2. 执行 delayedExecute,其源码如下:

    private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); /** 若线程池已 shutdown,则拒绝任务 */ else { /** 线程池在运行,任务添加到阻塞队列中,取到的是 DelayedWorkQueue */ super.getQueue().add(task); /** 双重检查,若线程池当前不能执行任务,则取消任务 * canRunInCurrentRunState 包含了线程池 shutdown 后, * 对 run-after-shutdown 参数的判断逻辑 */ if (!canRunInCurrentRunState(task) && remove(task)) task.cancel(false); else /** ThreadPoolExecutor 中的方法,工作线程小于 corePoolSize 时,添加 Worker */ ensurePrestart(); } }

scheduleAtFixedRate 方法以固定的频率周期性执行任务,其源码如下:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,                                              long initialDelay,                                              long period,                                              TimeUnit unit) {    if (command == null || unit == null)        throw new NullPointerException();    /** 以上与 schedule 方法逻辑一致 */    if (period <= 0L)        throw new IllegalArgumentException();    ScheduledFutureTask<Void> sft =        new ScheduledFutureTask<Void>(command,                                      null,                                      triggerTime(initialDelay, unit), /** 第一次执行的时间 */                                      unit.toNanos(period),  /** 注意区别这里,之后每隔 period 就执行,不等待第一次执行完毕就开始计时 */                                      sequencer.getAndIncrement());    RunnableScheduledFuture<Void> t = decorateTask(command, sft);    sft.outerTask = t;    delayedExecute(t);    return t;}
复制代码

scheduleWithFixedDelay 以固定的延时周期性执行任务,其源码如下:

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,                                                 long initialDelay,                                                 long delay,                                                 TimeUnit unit) {    if (command == null || unit == null)        throw new NullPointerException();    /** 以上与 schedule 方法逻辑一致 */    if (delay <= 0L)        throw new IllegalArgumentException();    ScheduledFutureTask<Void> sft =        new ScheduledFutureTask<Void>(command,                                      null,                                      triggerTime(initialDelay, unit), /** 第一次执行的时间 */                                      -unit.toNanos(delay),  /** 注意区别这里,在上次执行完毕后,延迟 delay 后执行 */                                      sequencer.getAndIncrement());    RunnableScheduledFuture<Void> t = decorateTask(command, sft);    sft.outerTask = t;    delayedExecute(t);    return t;}
复制代码

scheduleAtFixedRate / scheduleWithFixedDelay 方法与前面的 schedule 方法步骤大致一样,同样分为两步:

  1. 将 Runnable 任务封装为 ScheduledFutureTask 对象;有两点需要注意:a) 没有 Callable 对象的接口,不过这也比较容易理解,周期性执行的任务会重复多次,返回值不太重要。b) 两个函数在 period 入参上取值有区别,前者为正值,后者为负值。

  2. 执行 delayedExecute 方法,与延时任务一致。

02-内部类

ScheduledThreadPoolExecutor 中包含两个内部类,ScheduledFutureTask 和 DelayedWorkQueue,前者是周期性任务对象,后者是存储周期性任务的阻塞队列。

ScheduledFutureTask 继承了 FutureTask,是一个异步运算任务;并且实现了 Runnable、Future、Delayed 接口,说明它是一个可以延迟执行的异步运算任务。

DelayedWorkQueue 继承了 AbstractQueue,并且为了契合父类 ThreadPoolExecutor,它还实现了 BlockingQueue 接口。在其内部,只允许存储 RunnableScheduledFuture 类型的对象, 而 ScheduledFutureTask 也实现了 RunnableScheduledFuture 接口。

ScheduledFutureTask 中与时间相关的两个关键属性:

//任务可以执行的时间,纳秒级,由 triggerTime 计算出private long time;//重复任务的执行周期时间,纳秒级,正数表示固定频率执行,负数表示固定延时执行,0表示不重复的任务private final long period;
复制代码

既然 ScheduledFutureTask 实现了 FutureTask 接口,其关键方法之一就是 run 方法,在线程执行时会调用:

public void run() {    /** 线程池能否执行任务,包含了对 run-after-shutdown 参数的判断 */    if (!canRunInCurrentRunState(this))        cancel(false);    else if (!isPeriodic())  /** 说明是普通的一次性任务,由父类 ThreadPoolExecutor 执行 */        super.run();    /** 与 super.run 基本一样,只不过有返回值,     * 当返回 true 说明调用 callable 计算正常,     * 且中间无任何异常、中断 */    else if (super.runAndReset()) {        /** 计算下一次执行的时间 */        setNextRunTime();        /** outerTask 就是当前对象本身,         * 如果线程池当前可以执行当前任务,则将当前任务重新加入到阻塞队列中,否则就取消任务         * double-check 若不能执行,则移除,并取消任务,         * 否则,若能执行,检查 corePoolSize,并据此判断是否需要创建线程         */        reExecutePeriodic(outerTask);    }}
复制代码

03-run-after-shutdown 参数

ScheduledThreadPoolExecutor 与 ThreadPoolExecutor 的不同之处是可以配置周期性、延迟任务在线程池 shutdown 之后,仍然运行。配置这种行为的参数被称为是 run-after-shutdown 参数。

/** 关闭后继续执行已经存在的周期任务 */private volatile boolean continueExistingPeriodicTasksAfterShutdown;/** 关闭后继续执行已经存在的延时任务 */ private volatile boolean executeExistingDelayedTasksAfterShutdown = true;/** 取消任务后移除 */ private volatile boolean removeOnCancel = false;/** 为相同延时的任务提供的顺序编号,保证任务之间的FIFO顺序 */private static final AtomicLong sequencer = new AtomicLong();
复制代码



历史文章推荐

Java Core 「15」J.U.C Executor 框架

Java Core 「14」J.U.C 线程池 -Future & FutureTask

Java Core 「13」ReentrantReadWriteLock 再探析

Java Core 「12」ReentrantLock 再探析

Java Core 「11」AQS-AbstractQueuedSynchronizer

Java Core 「10」J.U.C 同步工具类 -2

Java Core 「9」J.U.C 同步工具类 -1

Java Core 「8」字节码增强技术

Java Core 「7」各种不同类型的锁

Java Core「6」反射与 SPI 机制

Java Core「5」自定义注解编程

Java Core「4」java.util.concurrent 包简介

发布于: 刚刚阅读数: 3
用户头像

Samson

关注

还未添加个人签名 2019.07.22 加入

还未添加个人简介

评论

发布
暂无评论
Java Core 「16」J.U.C Executor 框架之 ScheduledThreadPoolExecutor_学习笔记_Samson_InfoQ写作社区