写点什么

ScheduledThreadPoolExecutor 踩过最痛的坑

作者:小小怪下士
  • 2022-10-21
    湖南
  • 本文字数:3901 字

    阅读完需:约 1 分钟

概述

最近项目上反馈某个重要的定时任务突然不执行了,很头疼,开发环境和测试环境都没有出现过这个问题。定时任务采用的是ScheduledThreadPoolExecutor,后来一看代码发现踩了一个大坑....

还原"大坑"

这个坑就是如果ScheduledThreadPoolExecutor中执行的任务出错抛出异常后,不仅不会打印异常堆栈信息,同时还会取消后面的调度, 直接看例子。


@Testpublic void testException() throws InterruptedException {    // 创建1个线程的调度任务线程池    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();    // 创建一个任务    Runnable runnable = new Runnable() {
volatile int num = 0;
@Override public void run() { num ++; // 模拟执行报错 if(num > 5) { throw new RuntimeException("执行错误"); } log.info("exec num: [{}].....", num); } };
// 每隔1秒钟执行一次任务 scheduledExecutorService.scheduleAtFixedRate(runnable, 0, 1, TimeUnit.SECONDS); Thread.sleep(10000);}
复制代码


运行结果:



  • 只执行了 5 次后,就不打印,不执行了,因为报错了

  • 任务报错,也没有打印一次堆栈,更导致调度任务取消,后果十分严重。

解决方案

解决方法也非常简单,只要通过 try catch 捕获异常即可。



运行结果:



看到不仅打印了异常堆栈,而且也会进行周期性的调度。

更推荐的做法

更好的建议可以在自己的项目中封装一个包装类,要求所有的调度都提交通过我们统一的包装类, 如下代码:


@Slf4jpublic class RunnableWrapper implements Runnable {    // 实际要执行的线程任务    private Runnable task;    // 线程任务被创建出来的时间    private long createTime;    // 线程任务被线程池运行的开始时间    private long startTime;    // 线程任务被线程池运行的结束时间    private long endTime;    // 线程信息    private String taskInfo;
private boolean showWaitLog;
/** * 执行间隔时间多久,打印日志 */ private long durMs = 1000L;
// 当这个任务被创建出来的时候,就会设置他的创建时间 // 但是接下来有可能这个任务提交到线程池后,会进入线程池的队列排队 public RunnableWrapper(Runnable task, String taskInfo) { this.task = task; this.taskInfo = taskInfo; this.createTime = System.currentTimeMillis(); }
public void setShowWaitLog(boolean showWaitLog) { this.showWaitLog = showWaitLog; }
public void setDurMs(long durMs) { this.durMs = durMs; }
// 当任务在线程池排队的时候,这个run方法是不会被运行的 // 但是当任务结束了排队,得到线程池运行机会的时候,这个方法会被调用 // 此时就可以设置线程任务的开始运行时间 @Override public void run() { this.startTime = System.currentTimeMillis();
// 此处可以通过调用监控系统的API,实现监控指标上报 // 用线程任务的startTime-createTime,其实就是任务排队时间 // 这边打印日志输出,也可以输出到监控系统中 if(showWaitLog) { log.info("任务信息: [{}], 任务排队时间: [{}]ms", taskInfo, startTime - createTime); }
// 接着可以调用包装的实际任务的run方法 try { task.run(); } catch (Exception e) { log.error("run task error", e); throw e; }
// 任务运行完毕以后,会设置任务运行结束的时间 this.endTime = System.currentTimeMillis();
// 此处可以通过调用监控系统的API,实现监控指标上报 // 用线程任务的endTime - startTime,其实就是任务运行时间 // 这边打印任务执行时间,也可以输出到监控系统中 if(endTime - startTime > durMs) { log.info("任务信息: [{}], 任务执行时间: [{}]ms", taskInfo, endTime - startTime); }
}}
复制代码


使用:



我们还可以在包装类里面封装各种监控行为,如本例打印日志执行时间等。

原理探究

那大家有没有想过为什么任务出错会导致异常无法打印,甚至调度都取消了呢?让我们从源码出发,一探究竟。


  1. 下面是调度任务的入口方法。


// ScheduledThreadPoolExecutor#scheduleAtFixedRatepublic ScheduledFuture<?> scheduleAtFixedRate(Runnable command,                                              long initialDelay,                                              long period,                                              TimeUnit unit) {    if (command == null || unit == null)        throw new NullPointerException();    if (period <= 0)        throw new IllegalArgumentException();    // 将执行任务和参数包装成ScheduledFutureTask对象    ScheduledFutureTask<Void> sft =        new ScheduledFutureTask<Void>(command,                                      null,                                      triggerTime(initialDelay, unit),                                      unit.toNanos(period));    RunnableScheduledFuture<Void> t = decorateTask(command, sft);    sft.outerTask = t;    // 延迟执行    delayedExecute(t);    return t;}
复制代码


这个方法主要做了两个事情:


  • 将执行任务和参数包装成 ScheduledFutureTask 对象

  • 调用delayedExecute方法延迟执行任务


  1. 延迟或周期性任务的主要执行方法, 主要是将任务丢到队列中,后续由工作线程获取执行。


// ScheduledThreadPoolExecutor#delayedExecuteprivate void delayedExecute(RunnableScheduledFuture<?> task) {        if (isShutdown())            reject(task);        else {            // 将任务丢到阻塞队列中            super.getQueue().add(task);            if (isShutdown() &&                !canRunInCurrentRunState(task.isPeriodic()) &&                remove(task))                task.cancel(false);            else                // 开启工作线程,去执行任务,或者从队列中获取任务执行                ensurePrestart();        }    }
复制代码


  1. 现在任务已经在队列中了,我们看下任务执行的内容是什么,还记得前面的包装对象ScheduledFutureTask类,它的实现类是ScheduledFutureTask,继承了 Runnable 类。


// ScheduledFutureTask#run方法public void run() {    // 是不是周期性任务    boolean periodic = isPeriodic();    if (!canRunInCurrentRunState(periodic))        cancel(false);    // 不是周期性任务的话, 直接调用一次下面的run        else if (!periodic)        ScheduledFutureTask.super.run();    // 如果是周期性任务,则调用runAndReset方法,如果返回true,继续执行    else if (ScheduledFutureTask.super.runAndReset()) {        // 设置下次调度时间        setNextRunTime();        // 重新执行调度任务        reExecutePeriodic(outerTask);    }}
复制代码


  • 这里的关键就是看ScheduledFutureTask.super.runAndReset()方法是否返回 true,如果是 true 的话继续调度。


  1. runAndReset 方法也很简单,关键就是看报异常如何处理。


// FutureTask#runAndResetprotected boolean runAndReset() {    if (state != NEW ||        !UNSAFE.compareAndSwapObject(this, runnerOffset,                                     null, Thread.currentThread()))        return false;    // 是否继续下次调度,默认false    boolean ran = false;    int s = state;    try {        Callable<V> c = callable;        if (c != null && s == NEW) {            try {                // 执行任务                c.call();                 // 执行成功的话,设置为true                ran = true;
// 异常处理,关键点 } catch (Throwable ex) { // 不会修改ran的值,最终是false,同时也不打印异常堆栈 setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } // 返回结果 return ran && s == NEW;}
复制代码


  • 关键点 ran 变量,最终返回是不是下次继续调度执行

  • 如果抛出异常的话,可以看到不会修改 ran 为 true。

总结

Java 的 ScheduledThreadPoolExecutor 定时任务线程池所调度的任务中如果抛出了异常,并且异常没有捕获直接抛到框架中,会导致 ScheduledThreadPoolExecutor 定时任务不调度了。这个结论希望大家一定要记住,不然非常坑,关键是有时候测试环境、开发环境还无法复现,有一定的随机性,真的到了生产就完蛋了。


关于这些知识点,我们不仅要知其然,还要知其所以然,这样才会记忆深刻,不然很容易遗忘。

用户头像

还未添加个人签名 2022-09-04 加入

热衷于分享java技术,一起交流学习,探讨技术。 需要Java相关资料的可以+v:xiaoyanya_1

评论

发布
暂无评论
ScheduledThreadPoolExecutor踩过最痛的坑_Java_小小怪下士_InfoQ写作社区