写点什么

Java 定时任务大盘点:发工资也能“指日可待”

  • 2024-11-12
    北京
  • 本文字数:9358 字

    阅读完需:约 31 分钟

作者:京东保险 孙昊宇

引子:“指日可待”

让我们先从一个成语开始,“指日可待”。没错,我说的就是定时任务😏。



“指日可待”: 为任务指定好日程,就可以安心等待任务执行。


在实际场景中,我们往往需要在特定时间做某件事情,或以某个时间间隔重复某件事情,如定期备份数据、定时取消超时订单等。所有和时间有关的事情,都需要借助定时任务来完成。


定时任务可分为两种:本地定时任务分布式定时任务


本地定时任务,即单机定时任务,适合做那些需要每台机器都执行的任务,如刷新每台机器的本地缓存;分布式定时任务则以一个分布式集群为单位执行任务,适用于支持在分布式场景下任务的高可用。


今天让我们看看 Java 中的本地定时任务,本文将介绍如何使用 Timer、ScheduledExecutorService 和 @Scheduled 三种方式实现本地定时任务。


读完本文,你会发现:原来每月最后一个工作日发工资,也可以用定时任务实现!



一、Timer

Timer,即 java.util.Timer,是来自 Java 1.3 的古老定时器。


要使用 Timer,要先创建一个 TimerTask,作为 Timer 要执行的任务:


// 创建Timer对象Timer timer = new Timer();// 创建TimerTask:task1TimerTask task1 = new TimerTask() {    @Override    public void run() {        System.out.println("task1开始执行:" + new Date().getSeconds());    }};
复制代码


有了 Timer 和 TimerTask,就可以安排任务执行。让我们简单了解下 Timer 的用法:

(1)单次执行

使用 Timer.schedule 方法,只需传入 TimerTask 和延迟时间,即可让任务在指定的延迟时间后执行一次。也可以传入 Date,让任务在指定的时刻执行:


// 5s后执行timer.schedule(task1, 5000);// 指定时刻。如果传入当前时刻,立即执行timer.schedule(task1, new Date());
复制代码


如果直接运行以上代码,会出现“Task already scheduled or cancelled”异常。这是因为一个 TimerTask 只能被 schedule 方法调度一次。如果需要执行两个任务,我们需要创建两个 TimerTask。我们让两个任务在执行时分别打印当前时刻的秒数,全部代码如下:


// 创建Timer对象Timer timer = new Timer();// 创建TimerTask:task1TimerTask task1 = new TimerTask() {    @Override    public void run() {        System.out.println("task1开始执行:" + new Date().getSeconds());    }};// 创建TimerTask:task2TimerTask task2 = new TimerTask() {    @Override    public void run() {        System.out.println("task2开始执行:" + new Date().getSeconds());    }};// 5s后执行timer.schedule(task1, 5000);// 指定时刻。如果传入当前时刻,立即执行timer.schedule(task2, new Date());
复制代码


运行结果如下,可以看到,task2 传入当前时刻立即执行,而 task1 延迟了 5 秒执行。


task2 开始执行:32 task1 开始执行:37

(2)周期性执行

周期性任务可以以固定的周期反复地执行下去。要让 Timer 周期性执行,同样使用重载的 schedule 方法,传入第三个参数 period——执行周期,就可以让 task 以固定频率执行。我们给 task1 传入 period = 3000(ms),让它三秒执行一次:


// 5s后执行,每3s一次timer.schedule(task1, 5000, 3000);
复制代码


运行结果如下:


task1 开始执行:33 task1 开始执行:36 task1 开始执行:39 task1 开始执行:42 ... ...

(3)原理浅析

Timer 是如何实现的?查看 Timer 的源码,发现 Timer 有两个成员变量,它们是 Timer 的核心实现:


TaskQueue:任务队列,其中定义了一个长度为 128 的 TimerTask 数组,根据 TimerTask.nextExecutionTime(下次执行倒计时)维护成了一个最小堆,堆顶就是最近要执行的任务。


TimerThread:任务触发线程,是一个无限循环的线程,它不断从 TaskQueue 堆顶取出最近要执行的任务,判断剩余执行时间,等待指定时间后去执行任务。执行时,根据任务配置(单次执行 or 周期执行),决定是否向任务队列中放入下一次任务。


用堆来实现任务优先级队列是非常高效的办法,因为任务触发线程只关心下一个要执行的任务,即堆顶元素,剩下的任务的剩余时间一定更长,不必有序,只需取走堆顶元素后重新堆化即可,每次操作的时间复杂度是 O(log n)。

(4)存在问题

Timer 的实现方式,导致其存在如下问题:


(1)Timer 只有一个执行任务的线程,即 TimerThread。执行任务时其他任务会阻塞,如果一个任务执行很久,会导致后续任务无法按时执行;


(2)Timer 内部只捕获了 InterruptedException,未捕获运行时异常。如果任务执行过程中抛出运行时异常,线程将直接被杀死,其他任务也将无法执行。


Timer 是 Java 早期的任务调度框架,其缺陷较多,请读者简单了解,非常不建议使用哦。



二、ScheduledExecutorService

ScheduledExecutorService,可以称为“计划线程池”或“调度线程池”,来自于 Java 1.5 的 JUC 包。作为 Java 升级版的任务调度框架,它解决了 Timer 的遗留问题,为多线程场景下的定时任务调度提供了稳定可靠的支持。有了它,再也不需要使用 Timer 了。


作为 Executor 框架的一部分,ScheduledExecutorService 继承了 ExecutorService 接口,其实现类为 ScheduledThreadPoolExecutor。构造方法有 4 个,如下:


ScheduledThreadPoolExecutor(int corePoolSize);ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory); ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler);ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler); 
复制代码


其中 3 参数构造方法实现如下:


public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);}
复制代码


根据构造方法,可以看到 ScheduledThreadPoolExecutor 的所有参数如下,其中 3 个参数可指定,4 个参数固定:


① 核心线程数:必传参数,控制执行任务的线程数量;


② 最大线程数:固定为 Integer.MAX_VALUE。在 ScheduledThreadPoolExecutor 中没有作用,实际起作用的是 corePoolSize;


③、④ 空闲线程存活时间:固定为 0,单位为纳秒。


⑤ 任务队列:固定 DelayedWorkQueue 延迟阻塞队列,同样是一个最小堆实现的优先级队列。


⑥ 线程工厂:可以手动设置,建议手动传参,方便设置线程名称。


⑦ 拒绝策略:可以手动设置,如果不指定,默认为 AbortPolicy,拒绝任务并抛出异常。


让我们看看如何使用它。

(1)单次执行

要想让任务只执行一次,使用 schedule 方法即可,有 3 个参数,依次是:要执行的任务方法(实现 Runnable 或 Callable)、延迟的时间、时间单位。注意,ScheduledExecutorService 不支持在指定时刻执行,只能在指定的延迟后执行。示例如下:


// 创建线程池,核心线程数=5System.out.println("创建线程池:" + new Date().getSeconds());ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(5, new ThreadFactoryBuilder().setNameFormat("scheduler-thread-%d").build());// 5s后执行任务scheduler.schedule(() -> System.out.println("任务开始执行:" + new Date().getSeconds()), 5, TimeUnit.SECONDS);
复制代码

(2)周期性执行

两种执行方式

周期性执行任务又可细分为:固定频率执行、固定延迟执行。


当以固定频率执行时,以上次任务执行的开始时间到本次任务的开始时间来计算任务周期,不考虑任务的执行时间;


当以固定延迟执行时,以上次任务执行的结束时间到本次任务的开始时间来计算任务周期,即任务的执行时间会影响下次任务的开始时间;


当任务执行时间可以忽略时,两种执行方式效果一样。如果考虑任务的执行时间,如任务周期为 5s,任务执行需要 1s,那么固定频率执行的效果是:0s(开始)、5s(开始)、10s(开始)... ,而固定延迟执行的效果是:0s(开始)、1s(结束)、6s(开始)、7s(结束)、12s(开始)... 。

固定频率执行

要让任务以固定频率执行,使用 scheduleAtFixedRate 方法。它有 4 个参数,依次是:要执行的任务方法(实现 Runnable 或 Callable)、首次执行延迟的时间、执行周期、时间单位。


我们设置延迟时间为 0,周期为 5s,在任务执行中,让任务 sleep 1s,模拟任务耗时,配置如下:


// 创建线程池,核心线程数=5System.out.println("创建线程池:" + new Date().getSeconds());ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(5, new ThreadFactoryBuilder().setNameFormat("scheduler-thread-%d").build());
scheduler.scheduleAtFixedRate(() -> { System.out.println("任务开始执行:" + new Date().getSeconds()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务执行完成:" + new Date().getSeconds());}, 0, 5, TimeUnit.SECONDS);
复制代码


效果如下:


创建线程池:4 任务开始执行:4 任务执行完成:5 任务开始执行:9 任务执行完成:10 ... ...


****需要注意的是,如果任务的执行耗时 > 任务周期,即下一个任务要开始时,上一个任务还没结束,则 scheduleAtFixedRate 并不会严格按照预期时间执行,而是会等待上一个任务执行结束后再执行。即:任何情况下,一个周期任务都不会同时存在两个执行中的任务实例。

固定延迟执行

要让任务以固定延迟执行,使用 scheduleWithFixedDelay 方法。其他参数都不变,仅修改方法名,让我们对比下效果:


// 创建线程池,核心线程数=5System.out.println("创建线程池:" + new Date().getSeconds());ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(5, new ThreadFactoryBuilder().setNameFormat("scheduler-thread-%d").build());
scheduler.scheduleWithFixedDelay(() -> { System.out.println("任务开始执行:" + new Date().getSeconds()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务执行完成:" + new Date().getSeconds());}, 0, 5, TimeUnit.SECONDS);
复制代码


执行结果如下,可以看到上次任务结束时间和下次任务开始时间的间隔固定,符合我们的预期:


创建线程池:6 任务开始执行:6 任务执行完成:7 任务开始执行:12 任务执行完成:13 ... ...

(3)原理浅析

首先需要清楚,调度线程池和普通线程池最大的区别是:对普通线程池而言,只要线程池中有空闲工作线程,就只管从任务队列中取出任务执行,因此普通线程池的任务队列都是 FIFO 的。而调度线程池必须判断每个任务的剩余等待时间,没有“到点”的任务,是不可以执行的。如何合理地安排每个定时任务的执行时间?这就需要特殊的任务队列(优先级队列)了。


上文提到,ScheduledThreadPoolExecutor 使用的任务队列固定为:DelayedWorkQueue 延迟阻塞队列。这是一个专为 ScheduledThreadPoolExecutor 定制的、满足多线程定时任务设计的任务队列。具有如下特性:


① 【优先性】优先级队列,实现原理和 Timer 相同,同样是按照任务剩余时间构造的最小堆,每次从堆顶取得最近要执行的任务;


② 【无限大】初始大小为 16,每次队列满后自动扩容,可无限扩容到 Integer.MAX_VALUE,因此在添加任务时(offer 方法)不会阻塞;


③ 【并发性】队列操作有锁机制保证线程安全;同时,为了更好管理线程资源,队列采用了 Leader-Follower 的线程模型。

Leader/Follower 模型

为了实现该模型,DelayedWorkQueue 中定义了如下成员变量:


// 队列内的重入锁,保证线程安全private final ReentrantLock lock = new ReentrantLock();// leader线程private Thread leader = null;// lock创建的等待队列private final Condition available = lock.newCondition();
复制代码


首先,队列拥有一个重入锁 lock,所有队列操作都需要先获取这把锁; 一个成员变量 leader,指向下一个要处理队列头部任务的线程,其他空闲的工作线程被称作 follower; 最后是 lock 创建的等待队列 available,所有 follower 都在这里等待,等着成为新的 leader。那么什么时候才会出现岗位空缺(available)呢?请看下文。


刚才提到,leader 就是处理当前队列头任务的线程。leader 首先会判断这个任务的剩余时间,然后等待这个时间。时间一到就取走任务,要去执行,就在 leader 要“卸任”的时候,它需要通知一下排队的继任者(follower)们,于是发出 available.signal()信号,岗位有空缺啦!从而使一个 follower 线程获取锁成为 leader,执行后边任务。最后,所有执行完任务的线程都会重新成为 follower 等着领新一轮的任务,如此循环。这部分实现,请感兴趣的读者阅读 DelayedWorkQueue 的 take()/poll()方法。


如果在 leader 等待时,来了新任务怎么办?先别急,重新堆化,如果新任务没排到队首,说明剩余时间肯定大于队首任务,则不需要着急执行;如果新任务排到了队首,说明这个任务时间最紧急,执行时间已经早于了当前 leader 的苏醒时间了,来不及啦!那么直接把当前的 leader 踢掉,发送 available.signal()信号,召唤新 leader 执行新任务。这部分实现,请感兴趣的读者阅读 DelayedWorkQueue 的 offer()方法。


这种 Leader/Follower 模式最早被应用于多线程网络服务中,通过确保请求接收者和执行者是同一个线程来减少接收者另外创建执行者线程的开销,减少线程间数据交换。在 DelayedWorkQueue 中,这种模式巧妙地确保了在线程池中最多只有一个线程(leader)在等待执行最近的任务,而其他空闲线程可以无限等待直到被唤醒,从而避免多个工作线程同时等待一个任务带来的额外开销。

(4)使用注意

译自 ScheduledThreadPoolExecutor 的官方注释:


虽然这个类继承自 ThreadPoolExecutor,但继承的一些调优方法对它没有用处。特别是,因为它使用 corePoolSize 线程和无界队列充当固定大小的池,所以对 maximumPoolSize 的调整没有任何有用的影响。此外,将 corePoolSize 设置为零或使用 allowCoreThreadTimeout 几乎从来都不是一个好主意,因为这可能会使池中没有线程来处理任务,一旦它们有资格运行。


使用 ScheduledThreadPoolExecutor 的时候,我们需要格外注意的是:线程池大小始终固定为 corePoolSize 不会变,而 maximumPoolSize 没有任何作用,它可不会自己添加工作线程!如果你需要执行多个定时任务,请尽量把 corePoolSize 设置大一些,避免工作线程不够导致任务没能按时执行。



三、@Scheduled

@Scheduled 注解是 Spring 框架提供的通过注解方式实现定时调度的定时任务框架,来自 org.springframework.scheduling 包。该注解主要有三种配置定时的方式,分别支持我们以固定频率、固定延迟或 cron 表达式配置定时任务:


// 固定频率执行,5s一次@Scheduled(fixedRate = 5000)
// 固定延迟执行,5s一次 @Scheduled(fixedDelay= 5000)
// cron表达式,1分钟执行一次@Scheduled(cron = "0 0/1 * * * ?")
复制代码

(1)环境配置

首先让我们看看使用 @Scheduled 注解需要的配置。要让该注解生效,有两种配置方法。


① 注解启动类:在 Spring 启动类添加 @EnableScheduling 注解,才能使 @Scheduled 生效,默认未开启。


② 配置文件:在配置文件中引入 task 的命名空间,并添加注解驱动“annotation-driven”:


<beans xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"> <task:annotation-driven/>
复制代码

(2)cron 表达式

固定频率执行、固定延迟执行,这两个执行方式前文已经介绍过了,有读者可能要问了,第三种方式 cron 表达式是什么?又该如何使用呢?


cron 始于 linux 下的定时执行工具,是 linux 系统的内置服务。在系统中,可以使用 crontab 命令来设定 cron 服务,cron 会根据命令和执行时间来按时调度工作任务。cron 表达式的功能强大,可以满足各种定制化的定时任务配置需求,远比 fixedRate、fixedDelay 等方式灵活多样,适合各种复杂的定时任务配置。


cron 表达式一共包含 7 个域,每个域代表不同的含义,从左到右依次是:“秒 分 时 日 月 周 年”。这些域以空格隔开组成的字符串就是 cron 表达式。其中,“年”在大多数场景下用不上(极少有以年为周期的任务),因此不是必要的,前 6 个域即可组成一个 cron 表达式。cron 表达式的规则是:


① 【数字】在每个域中,可以填入数字,代表在指定的时刻执行。具体到每个域的数字范围是:


分、秒:0-59; 时:0-23; 日:1-31(视每月情况); 月:1-12(JAN-DEC); 周:1-7(SUN-SAT); 年:1970-2099。


特别提醒:在【周】域中,1=周日、2=周一、... 7=周六。 这块有点反常识,担心记错的话可以填星期的英文缩写(SUN-SAT)。


② 【】如果想让这个域无论等于什么值都执行,请填入【】(通配符)。


这时细心的读者会发现有个 bug:如果配置了【日】为【*】,即每天都执行,且【周】配为【6】,即每周五执行,就会出现互斥:又想每天都执行,又想每周五执行,但并非每天都是周五啊! 为了解决这个问题,cron 表达式设置了【?】符号。


③ 【?】如果这个域的值不关心,请填入【?】(不指定)。这个符号只能填在【日】或【周】域,用来解决两个日期和星期的互斥问题。


用规则①②③搭配就可以创建很多基本的 cron 表达式:


1 * * * * ? 每分钟的第 1 秒执行 0 0 0 * * ? 每天 0 点执行 0 15 10 ? * * 每天上午 10:15 执行


光有这些还不够,cron 表达式还有更加丰富的符号以满足更多样的需求,请接着往下看:


④ 【-】指定取值范围。例:


0 0 9-17 * * ? 每天 9 点到 17 点,每整点执行一次


⑤ 【,】指定多个值。例:


0 0 0 1,15 * ? 每月 1 日、15 日,0 点执行


⑥ 【/】指定起始量/增量,以固定频率执行。例:


0/2 * * * * ? (从每分钟的 0 秒开始)每 2 秒执行一次 0 0 18/1 * * ? 从每天的 18 点开始,每整点执行一次


⑦ 【L】即“LAST”之意,只能填在【日】或【周】域, 代表最后一天。


在【周】域,还可以写“数字 + L”,代表该取值的最后一个,即最后一个周几,例:


0 0 0 L * ? 每月最后一天 0 点执行 0 0 0 ? * L 每周最后一天 0 点执行 0 0 0 ? * WEDL 每月最后一个周三 0 点执行


⑧【W】表示自动匹配工作日,只能填在【日】域,且必须在数字后。如“5W”意为“本月距离 5 号最近的工作日”。


也可以将【L】【W】连用,意为“本月最后一个工作日”。


0 0 0 1W * ? 在本月距离 1 号最近的工作日 0 点执行 0 0 0 LW * ? 在本月最后一个工作日 0 点执行 看到这里,我想你已经明白怎么用定时任务发工资了。这个表达式,真像是为月末发工资准备的呢!🤑


⑨【#】指定第几个周几,只能填在【周】域。 【#】左边填周几,右边填第几个。例:


0 0 0 ? 5 SUN#2 每年母亲节(5 月的第二个星期日)的 0 点执行


强大的 cron 表达式几乎可以满足所有定时任务的需求,如果你迫不及待想尝试一下 cron 表达式并验证效果,推荐这个网站:https://cron.qqe2.com,可以在线生成和验证 cron 表达式,并看到预期执行结果。

(3)单线程 or 多线程?

看完了 cron 表达式,那么问题来了:@Scheduled 注解实现的定时任务是单线程还是多线程呢?如果是多线程,有几个线程?如何控制线程池?


很遗憾,答案是:在默认配置下, @Scheduled 注解是单线程的。 这是因为 @Scheduled 注解默认创建的线程池大小为 1,这显然很可能导致阻塞问题。要想改成多线程,需要手动配置任务线程池,让我们一起看看。


方法①:使用.properties 文件或.yml 文件,直接配置线程池大小


  spring.task.scheduling.pool.size = 10      
复制代码


spring:  task:    scheduling:      pool:        size: 10
复制代码


方法②:手动配置线程池 org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler,并设置线程池大小等参数。ThreadPoolTaskScheduler 即为 @Scheduled 注解所创建的线程池:


@Configuration@EnableSchedulingpublic class TaskSchedulerConfig {    @Bean    public TaskScheduler taskScheduler() {        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();        scheduler.setPoolSize(10);        scheduler.initialize();        return scheduler;    }}
复制代码


经过以上配置,我们可以让 @Scheduled 注解的定时任务以多线程方式执行,即使 A 任务阻塞,B 任务也不会受影响。这和上文提到的 ScheduledExecutorService 的效果是完全一样的。事实上,ThreadPoolTaskScheduler 就是基于 ScheduledThreadPoolExecutor 实现的,其部分源码:


public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport      implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, TaskScheduler {
// ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(boolean) only available on JDK 7+ private static final boolean setRemoveOnCancelPolicyAvailable = ClassUtils.hasMethod(ScheduledThreadPoolExecutor.class, "setRemoveOnCancelPolicy", boolean.class);

private volatile int poolSize = 1;
private volatile boolean removeOnCancelPolicy = false;
private volatile ErrorHandler errorHandler;
private volatile ScheduledExecutorService scheduledExecutor;

/** * Set the ScheduledExecutorService's pool size. * Default is 1. * <p><b>This setting can be modified at runtime, for example through JMX.</b> */ public void setPoolSize(int poolSize) { Assert.isTrue(poolSize > 0, "'poolSize' must be 1 or higher"); this.poolSize = poolSize; if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor) { ((ScheduledThreadPoolExecutor) this.scheduledExecutor).setCorePoolSize(poolSize); } } // ... ... }
复制代码


****不难发现,Spring 的 ThreadPoolTaskScheduler 就是在 ScheduledThreadPoolExecutor 基础上封装了一层,且默认的线程池大小为 1。在原线程池的基础上增加了注解驱动、cron 表达式解析等功能,更加方便了我们的使用。

(4)关于异步执行

以上提到的所有任务调度都有一个共同点:我们可以通过线程池让不同任务的执行互不干扰,但对于同一任务,当上一次执行未完成时,即使到了下一次执行时间,下一次执行还是会等待,即不会出现一个任务同时存在两个执行中的任务实例。如果想让一个任务的每次执行都互不影响呢?


@Async 注解可以帮助我们,它可以支持异步地执行方法,每次执行都会另起线程。我们将它和 @Scheduled 注解一起加在方法上:


@Async@Scheduled(cron = "0 0/1 * * * ?")public void task() {    // do something}
复制代码


这样,就可以让该任务的每次执行都互不影响。我们还可以在注解中指定异步方法执行的线程池,如 @Async("asyncExecutor"),且 asyncExecutor 应为 ThreadPoolTaskScheduler 类型。


特别提醒:原则上讲,不应该,也没有必要让一个周期性任务异步执行。一旦允许异步,如果该任务卡死,后续本类任务不再阻塞,还会继续起新线程,并不断卡死,很快把任务线程池打满,最后阻塞所有的定时任务,造成严重后果。如果一定要使用,请为异步定时任务手动指定一个单独的任务线程池,并配置好最大等待时长(setAwaitTerminationSeconds),避免无限阻塞。

发布于: 刚刚阅读数: 3
用户头像

拥抱技术,与开发者携手创造未来! 2018-11-20 加入

我们将持续为人工智能、大数据、云计算、物联网等相关领域的开发者,提供技术干货、行业技术内容、技术落地实践等文章内容。京东云开发者社区官方网站【https://developer.jdcloud.com/】,欢迎大家来玩

评论

发布
暂无评论
Java定时任务大盘点:发工资也能“指日可待”_京东科技开发者_InfoQ写作社区