写点什么

Java 线程池进阶

作者:木小风
  • 2022 年 2 月 27 日
  • 本文字数:2579 字

    阅读完需:约 8 分钟

Java线程池进阶

线程池是日常开发中常用的技术,使用也非常简单,不过想使用好线程池也不是件容易的事,开发者需要不断探索底层的实现原理,才能在不同的场景中选择合适的策略,最大程度发挥线程池的作用以及避免踩坑。

一、线程池工作流程

以下是 Java 线程池的工作流程,涉及创建线程的参数及拒绝策略,如果读者对这部分内容不太了解,可参考其他的文档,本文不在赘述。



二、线程池进阶

1、线程池的创建

需要手动通过 ThreadPoolExecutor 创建,使用者要非常明确业务场景并定制线程池,避免误用可能导致的问题。

以下是阿里巴巴 Java 开发手册中的描述:



ThreadFactory:推荐使用 guava 中的 ThreadFactoryBuilder 创建:

new ThreadFactoryBuilder().setNameFormat("name-%d").build();
复制代码

2、阻塞队列在线程池中的使用

很多同学一看到阻塞队列就自然的认为出入队列都是阻塞的,使用的阻塞队列也就没必要关心拒绝策略了,其实不然,阻塞队列在任务提交和任务获取阶段使用了不同的策略。

任务提交阶段:调用的阻塞队列的 offer 方法,这个方法是非阻塞的,如果插入队列失败会直接返回 false,并触发拒绝策略;

获取任务阶段:使用的是 take 方法,此方法是阻塞的;

3、保证提交阶段任务不丢失

有三种方法:使用 CallerRunsPolicy 拒绝策略、自定义拒绝策略、使用 MQ 系统保证任务不丢失。

(1)CallerRunsPolicy 拒绝策略

ThreadPoolExecutor.CallerRunsPolicy:由提交任务的线程处理

这种是最简单的策略,但需要注意的是如果任务耗时较长,会阻塞提交任务的线程,可能会成为系统瓶颈。

(2)自定义拒绝策略

既然 Java 线程默认使用的是 offer 提交任务,那我们可以自定义拒绝策略在任务提交失败时改为 put 阻塞提交。

缺点也是会阻塞提交线程,不过相比 CallerRunsPolicy 策略更能发挥多线程的优势。

 RejectedExecutionHandler executionHandler = (r, executor) -> {   try {​     executor.getQueue().put(r);   } catch (InterruptedException e) {​     Thread.currentThread().interrupt();​     throw new RejectedExecutionException("Producer thread interrupted", e);   } };
复制代码

(3)配合 MQ 保证任务不丢失

使用默认的 ThreadPoolExecutor.AbortPolicy 策略,如果抛出 RejectedExecutionException 异常则返回给 MQ 消费失败,MQ 会保证自动重试。

4、保证队列、未执行完成的任务不丢失

当服务停止的时候,线程池中队列和活跃线程中未执行完成的任务可能会造成数据丢失,首先说下结论:无论采取任何策略,在 Java 层都不能 100%保证不丢,比如机器突然断电的情况。我们还是可以采取一定的措施尽量避免任务丢失。

(1)线程池关闭

线程池关闭有两个方法:

shutdownNow 方法:线程池拒绝接收新提交的任务,同时立马关闭线程池,线程池里的任务不再执行,并抛出 InterruptedException 异常。

shutdown 方法:线程池拒绝接收新提交的任务,同时等待线程池里的任务执行完毕后关闭线程池。

(2)注册关闭钩子

使用以下方法注册 JVM 进程关闭钩子,在钩子方法中执行线程池关闭、未处理完成的任务持久化保存等。

Runtime.getRuntime().addShutdownHook()
复制代码

需要注意的是:钩子方法在使用 kill -9 杀死进程时不会执行,一般的杀进程的方式是先执行 kill,等待一段时间,如果进程还没杀死,再执行 kill -9。

要保证队列中的任务不丢失,需要消费队列中的数据,发送到外部 MQ 中;

保证未执行完成的任务不丢失,需要在抛出 InterruptedException 异常后,将任务参数保证到 MQ 中;

需要注意的是:1)尽量不要把未完成的任务保存到本地磁盘,尤其是在经常扩缩容的弹性集群里;2)捕获 InterruptedException 异常后,不要做重试等耗时操作;3)需要监控任务都发送到 MQ 中的时间,以便调整 kill -9 强制执行前的等待时间。

(3)使用 MQ 保证任务必须执行完成

通过上面介绍的两种方式,可以处理大部分正常停止服务丢数据的任务。不过对于极端情况下,比如断电、断网等,需要严格保证任务不丢失的场景还是不能满足业务需要,这种情况下就需要依赖 MQ。

方案是使用线程池的 submit 方法提交任务,通过 future 获取到任务执行完成再返回给 MQ 消费完成。在 MQ 中如何保证数据不丢失是另外一个复杂的话题了,这里不再深入探讨。

需要注意的是,如果采用这种方案,需要保证处理任务的幂等性,在操作步骤比较多的时候,复杂性也会很高。

5、ThreadLocal 变量

ThreadLocal 中变量的作用域是当前线程,使用线程池后会因跨线程导致数据不能传递,如果业务中使用了 ThreadLocal,需要额外处理这种场景。

(1)InheritableThreadLocal

InheritableThreadLocal 是在父子线程中自动传递参数,在线程池场景中不适用。

(2)手动处理

在提交任务前把 ThreadLocal 中的值取出来,在线程池执行时再 set 到线程池中线程的 ThreadLocal 中,并且在 finally 中清理数据。

缺点是每个线程池都要处理一遍,如果对上下文不熟悉,有漏传的风险。

(3)TransmittableThreadLocal

阿里开源地址:TransmittableThreadLocal

原理是通过 javaagent 自动处理 ThreadLocal 跨线程池传参,对业务开发者无感知,也是推荐的方案。

6、异常处理

(1)异常感知

execute 方法:抛异常会被提交任务线程感知;

submit 方法:抛异常不会被提交任务线程感知,在 Future.get()执行时会被感知;

(2)统一处理方案 1:异步任务里统一 catch

在线程池的执行逻辑最外层,包装 try、catch,处理所有异常。

缺点是: 1)所有的不同任务都要 trycatch,增加了代码量。2)不存在 checkedexception 的地方也需要都 trycatch 起来,代码丑陋。

(3)统一处理方案 2:覆写统一异常处理方法

此方案有两种常用实现:1)自定义线程池,继承 ThreadPoolExecutor 并覆写其 afterExecute 方法;2)创建线程池时自定义 ThreadFactory,在实现里手动创建线程池,并调用 Thread.setUncaughtExceptionHandler 注册统一异常处理器。

(4)统一处理方案 3:Future

任务提交都使用 submit,并在 Future.get()时捕获所有异常。

三、总结

本文从创建线程池、队列注意事项、如何保证任务不丢失、ThreadLocal、异常等方面总结了笔者的一些思考,各位读者可以对照下自己的使用场景,看本文提到的问题是否都考虑到了呢,或者你还有什么线程池方面的使用经验,欢迎交流分享。

本文链接:Java线程池进阶

作者简介:木小丰,美团 Java 技术专家,专注分享软件研发实践、架构思考。欢迎关注公共号:Java 研发



更多精彩文章:

从MVC到DDD的架构演进

平台化建设思路浅谈

构建可回滚的应用及上线checklist实践

Maven依赖冲突问题排查经验

发布于: 刚刚阅读数: 2
用户头像

木小风

关注

美团技术专家。公共号:Java研发 2012.03.22 加入

还未添加个人简介

评论

发布
暂无评论
Java线程池进阶