写点什么

线程池异常处理的 5 中方式

作者:FunTester
  • 2024-12-18
    河北
  • 本文字数:5577 字

    阅读完需:约 18 分钟

在我进行 Java 编程实践当中,特别是高性能编程时,线程池是无法逾越的高山。在最近攀登高山的路途上,我又双叒叕掌握了一些优雅地使用线程池的技巧。


通常我会将异步任务丢给线程池去处理,不怎么会额外处理异步任务执行中报错。一般来说,任务执行报错,会终止当前的线程,这样线程池会创建新的线程执行下一个任务,当然是在需要创建线程和可以创建新线程的前提下。


在我最近一次实践当中,发现一个定长 20 的线程池,已经创建过上万个线程,这让我大呼不可能。仔细一想,最终也在日志当中确认了大量的异步任务报错。所以不得不让我开始研究如何处理线程池中异步任务的异常了。


以下是我的研究报告,诚邀各位共赏。


就我的水平而言,总计发现 5 种常见的异常处理方式。

try-catch

在提交异步任务之前,通常我们会对异步任务检查异常进行处理,但是对于诸如 java.lang.RuntimeException 的非检查异常不会做更多操作。


当我们提交异步任务的时候,可以增加一个 try-catch 处理的话,就可以完全 hold 住异步任务的可能抛出的异常。


在我的框架设计当中,提交异步任务有且仅一个入口,代码如下:


/   * 异步执行某个代码块   * Java调用需要return,Groovy也不需要,语法兼容   *   * @param f   */  public static void fun(Closure f) {      fun(f, null, true);  }
/ * 使用自定义同步器{@link FunPhaser}进行多线程同步 * * @param f 代码块 * @param phaser 同步器 */ public static void fun(Closure f, FunPhaser phaser) { fun(f, phaser, true); } / * 使用自定义同步器{@link FunPhaser}进行多线程同步 * * @param f * @param phaser * @param log */ public static void fun(Closure f, FunPhaser phaser, boolean log) { if (phaser != null) phaser.register(); ThreadPoolUtil.executeSync(() -> { try { ThreadPoolUtil.executePriority(); //处理高优任务,可忽略 f.call(); } finally { if (phaser != null) { phaser.done(); if (log) logger.info("async task {}", phaser.queryTaskNum()); } } }); }
复制代码


所以改造起来比较简单,只需要在最后的方法中,增加 catch 代码块即可。


/   * 使用自定义同步器{@link FunPhaser}进行多线程同步   * @param f   * @param phaser   * @param log   */  public static void fun(Closure f, FunPhaser phaser, boolean log) {      if (phaser != null) phaser.register();      ThreadPoolUtil.executeSync(() -> {          try {              ThreadPoolUtil.executePriority();              f.call();          } catch (Exception e) {              logger.error("fun error", e);          } finally {              if (phaser != null) {                  phaser.done();                  if (log) logger.info("async task {}", phaser.queryTaskNum());              }          }      });  }
复制代码

Callable

在 Java 中,Callable 是一种可以抛出受检异常(Checked Exception)的任务接口。这与 Runnable 的不同之处在于,Callable 能够返回结果,并允许在任务执行过程中抛出异常。异常处理通常在获取任务结果时完成,以下是一些常见的处理方式。


Callable 异常处理的特点:


  1. 异常机制:

  2. Callable 方法签名为 V call() throws Exception,允许直接抛出 Exception 或其子类。

  3. 提交到线程池的 Callable 任务,如果抛出异常,会被封装到 ExecutionException 中。

  4. 获取异常:

  5. 通过 Future.get() 获取结果时,若任务抛出异常,则会引发 ExecutionException

  6. 开发者需要在调用 get() 时捕获和处理 ExecutionException


演示代码如下:


import java.util.concurrent.*;
public class CallableExceptionExample { public static void main(String[] args) { ExecutorService executor = Executors.newSingleThreadExecutor();
Callable<String> task = () -> { if (true) { // 模拟任务中出错 throw new Exception("Simulated Exception"); } return "Task Completed"; }; Future<String> future = executor.submit(task); try { // 获取任务结果,可能抛出 ExecutionException String result = future.get(); System.out.println(result); } catch (ExecutionException e) { System.out.println("Caught an ExecutionException: " + e.getCause().getMessage()); } catch (InterruptedException e) { System.out.println("Task was interrupted"); Thread.currentThread().interrupt(); // 恢复中断状态 } }}
复制代码


控制台输出:


Caught an ExecutionException: Simulated Exception
复制代码


Callable 抛出异常时,线程池会捕获这个异常,并将其封装在 ExecutionException 中。如果任务在执行过程中被中断,会抛出 InterruptedException。建议在捕获时恢复线程的中断状态,以避免吞掉中断信号。

afterExecute()

在 Java 中,afterExecute()ThreadPoolExecutor 提供的一个钩子方法,允许开发者在每个任务执行完成后执行一些额外的逻辑。它可以用来捕获线程池任务中抛出的运行时异常和其他异常,从而进行集中处理或记录。


afterExecute() 的作用:


  1. 触发时机:

  2. 每当线程池中某个任务完成后,无论是正常完成还是抛出异常,都会调用 afterExecute()

  3. 默认实现为空,用户可以重写它以添加自定义行为。

  4. 异常捕获:

  5. 如果任务在执行过程中抛出异常(例如 RuntimeExceptionError),它会作为参数传递给 afterExecute()Throwable 参数。

  6. 需要手动从 Future 中获取异常,或者在异常处理逻辑中记录。


演示案例如下:


import java.util.concurrent.*;
public class AfterExecuteExample { public static void main(String[] args) { ThreadPoolExecutor executor = new CustomThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
// 提交一个会抛出异常的任务 executor.submit(() -> { System.out.println("Task started"); throw new RuntimeException("Task failed with exception"); });
executor.shutdown(); }
static class CustomThreadPoolExecutor extends ThreadPoolExecutor { public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); }
@Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); // 调用父类实现以确保正常行为
// 处理任务异常 if (t != null) { System.err.println("Task threw an exception: " + t.getMessage()); } else if (r instanceof Future<?>) { try { ((Future<?>) r).get(); // 获取任务执行结果或捕获异常 } catch (CancellationException e) { System.err.println("Task was cancelled"); } catch (ExecutionException e) { System.err.println("Task threw an exception: " + e.getCause().getMessage()); } } } }}
复制代码


最终控制台输出:


Task startedTask threw an exception: Task failed with exception
复制代码


如果需要在任务开始和结束时都执行逻辑,可以同时重写 beforeExecute()afterExecute()。重写此方法时,建议注意线程中断信号的恢复,并确保异常记录逻辑不会引发额外的错误。

自定义 ThreadFactory

在 Java 中,如果需要自定义线程的异常处理行为,可以通过 自定义 ThreadFactory 创建线程并设置异常处理策略。线程的异常处理主要依赖于 Thread.UncaughtExceptionHandler 接口,该接口用于处理线程运行时未捕获的异常。


步骤概览:


  1. 创建自定义 ThreadFactory

  2. 实现 ThreadFactory 接口,定制线程的创建逻辑。

  3. 在创建线程时,设置自定义的 UncaughtExceptionHandler

  4. 实现异常处理逻辑:

  5. 使用 java.lang.Thread#setUncaughtExceptionHandler,定义异常处理行为(如日志记录、发送警报等)。

  6. 将自定义 ThreadFactory 应用于线程池:

  7. 在创建线程池时,通过 ExecutorsThreadPoolExecutor 使用自定义的 ThreadFactory


import java.util.concurrent.*;
public class CustomThreadFactoryExample {
public static void main(String[] args) { // 使用自定义线程工厂创建线程池 ExecutorService executor = Executors.newFixedThreadPool(2, new CustomThreadFactory());
// 提交任务 executor.submit(() -> { System.out.println("Task 1 started"); throw new RuntimeException("Task 1 encountered an error"); });
executor.submit(() -> System.out.println("Task 2 completed"));
executor.shutdown(); }
// 自定义 ThreadFactory 实现 static class CustomThreadFactory implements ThreadFactory { private int threadId = 0;
@Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("CustomThread-" + threadId++); thread.setUncaughtExceptionHandler(new CustomExceptionHandler()); return thread; } }
// 自定义异常处理器 static class CustomExceptionHandler implements Thread.UncaughtExceptionHandler { @Override public void uncaughtException(Thread t, Throwable e) { System.err.println("Thread " + t.getName() + " threw an exception: " + e.getMessage()); // 这里可以添加更多处理逻辑,例如日志记录或警报通知 } }}
复制代码


控制台输出:


Task 1 startedThread CustomThread-0 threw an exception: Task 1 encountered an errorTask 2 completed
复制代码


使用 Executors.newFixedThreadPool() 并传入自定义的 ThreadFactory,让线程池中的每个线程具备统一的异常处理行为。也可以通过线程标识为每个线程设置了自定义的 UncaughtExceptionHandler

全局异常处理

在 Java 中,Thread.setDefaultUncaughtExceptionHandler 是一个全局异常处理机制,用于处理所有未被捕获的线程异常。与每个线程单独设置的 Thread.setUncaughtExceptionHandler 不同,setDefaultUncaughtExceptionHandler 提供了一个全局级别的异常处理器,适用于所有线程(除非线程单独设置了自己的处理器)。


Thread.setDefaultUncaughtExceptionHandler 方法作用:


  • 用于设置一个全局的默认未捕获异常处理器。

  • 如果某个线程未显式设置自己的 UncaughtExceptionHandler,则会使用这个默认处理器。

  • 通常用于记录日志、发送报警等全局异常处理逻辑。


演示代码如下:


public class DefaultExceptionHandlerExample {
public static void main(String[] args) { // 设置全局默认异常处理器 Thread.setDefaultUncaughtExceptionHandler((thread, throwable) -> { System.err.println("Unhandled exception in thread: " + thread.getName()); System.err.println("Exception: " + throwable.getMessage()); throwable.printStackTrace(); });
// 创建一个线程,抛出未捕获的异常 Thread thread1 = new Thread(() -> { throw new RuntimeException("Thread 1 failed!"); }); thread1.start();
// 创建另一个线程,也抛出未捕获的异常 Thread thread2 = new Thread(() -> { throw new RuntimeException("Thread 2 encountered an error!"); }); thread2.start(); }}
复制代码


控制台如下:


Unhandled exception in thread: Thread-0Exception: Thread 1 failed!java.lang.RuntimeException: Thread 1 failed!    at ...
Unhandled exception in thread: Thread-1Exception: Thread 2 encountered an error!java.lang.RuntimeException: Thread 2 encountered an error! at ...
复制代码


全局异常处理只针对主线程和未显式设置 UncaughtExceptionHandler 的其他线程生效。


异常处理的优先级:


  • 如果线程显式设置了自己的 UncaughtExceptionHandler(通过 thread.setUncaughtExceptionHandler),那么会优先调用该处理器。

  • 如果线程未设置单独的处理器,则调用全局默认处理器。

  • 如果没有设置全局默认处理器,未捕获的异常将打印到标准错误输出流。


如果主线程抛出异常,Thread.setDefaultUncaughtExceptionHandler 无法捕获它。需要在 main 方法中显式处理。如果使用线程池(如 ExecutorService),未捕获的异常通常会被封装为 ExecutionException,不会触发默认处理器。需要使用 Future.get() 或重写 afterExecute() 来处理线程池的任务异常。

发布于: 刚刚阅读数: 3
用户头像

FunTester

关注

公众号:FunTester,800篇原创,欢迎关注 2020-10-20 加入

Fun·BUG挖掘机·性能征服者·头顶锅盖·Tester

评论

发布
暂无评论
线程池异常处理的 5 中方式_FunTester_InfoQ写作社区