写点什么

使用线程池你应该知道的知识点

  • 2024-07-29
    福建
  • 本文字数:6714 字

    阅读完需:约 22 分钟

多线程编程是每一个开发必知必会的技能,在实际项目中,为了避免频繁创建和销毁线程,我们通常使用池化的思想,用线程池进行多线程开发。线程池在开发中使用频率非常高,也包含不少知识点,是一个高频面试题,本篇总结线程池的使用经验和需要注意的问题,更好的应对日常开发和面试。如有更多知识点,欢迎补充~


异常处理


正如我们在异常处理机制所讲,如果你没有对提交给线程池的任务进行异常捕获,那么异常信息将会丢失,不利于问题排查。通常异常处理要么是手动处理掉,要么是往上抛由全局异常处理器统一处理,切勿吃掉异常。在实际开发中,我们可以使用装饰器模式对 TheradPoolExecutor 进行封装,重写它的 execute 和 submit 方法,进行 try-catch 处理,打印日志,防止开发同学直接使用 ThreadPoolExecutor 提交任务而漏了异常处理。


traceid


完整的日志链路对日志分析,问题排查是至关重要的,否则拿到一堆日志没有关联性,根本无从下手。一个完整的请求可能经过很多个方法调用,服务调用,mq 消息发送等,要串联起来需要一个全局 id,称为 traceid。例如我们使用 spring cloud sleuth 链路跟踪,它就会在上下文(MDC)塞一个 traceid,并不断传递下去。很遗憾,如果你在请求过程使用线程池(直接 new ThreadPoolExecutor),那么 traceid 将会丢失,例如你会看到如下日志:



很明显,线程池里打印的日志跟外面的关联不起来了,这会影响我们分析排查问题。解决方案,可以使用 spring 提供的 ThreadPoolTaskExecutor,它内部也包装了 ThreadPoolExecutor,提供更多功能。将其注册到 spring 容器中,使用 spring cloud sleuth 时,它会判断如果实现了 ExecutorService 接口的 bean,就会进行动态代理为 TraceableExecutorService,它会将当前上下文的 traceid 传递给线程池的线程,那么就可以关联起来了。如:



当然你也可以像前面说的,封装自己的 ThreadPoolExecutor 注册到 spring 容器,也一样会被代理。关于 traceid 我们还在 xxl 这边有写到,可以参考给xxl新增traceId和spanId


ThreadLocal


ThreadLocal 是线程内一块数据结构(Thread 类内有一个 ThreadLocal.ThreadLocalMap),线程间互不干扰,没有并发问题。上面我们提到使用 MDC 可以在各个位置打印 traceid,实际就是利用了 ThreadLocal,如使用 logback:



ThreadLocal 在线程内传递数据是没有问题的,但涉及到子线程怎么办呢?这个时候就无法传递过去了,不过 Thread 类内还有一个 ThreadLocal.ThreadLocalMap inheritableThreadLocals,当创建子线程的时候会把父线程的 ThreadLocal“继承”过来。



但使用线程池场景又不太一样了,因为线程池里的线程是只创建一次,后续复用的,而前面说的“继承”是创建时一次性传递过来,后续就不会更改,很明显不符合线程池的场景,使用线程池时希望线程每次都从父线程获取最新的 ThreadLocal。解决方案,可以使用阿里的transmittable-thread-local,它的原理也不复杂,就是在每次提交任务给线程池的时候,拷贝 ThreadLocal。 关于 ThreadLocal 我们之前有过介绍,有兴趣可以看下面试再也不怕问ThreadLocal了ThreadLocal扩展


核心参数


这是入门级八股文了吧,基本烂到面试官都不问了,但有一些点我仍想发掘一下亮点。

    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              ThreadFactory threadFactory,                              RejectedExecutionHandler handler)
复制代码


如上是参数最全的构造方法,参数解释:

  • corePoolSize

核心线程数,默认是不初始化创建核心线程,也不回收。可以通过 prestartCoreThread/prestartAllCoreThreads 方法对线程池进行预热,也可以通过 allowCoreThreadTimeOut 方法对核心线程进行回收。


  • maximumPoolSize

最大线程数,当核心线程开到最大,且任务队列也满了,还有任务提交,就会继续开线程到直到最大线程数。


  • keepAliveTime

当线程数大于核心线程数,线程最大空闲时间,超过就会被销毁回收。


  • unit

keepAliveTime 的时间单位。


  • workQueue

队列,核心线程满了,任务会暂存到队列,等待执行。


  • threadFactory

线程工厂,默认是 Executors.defaultThreadFactory(),线程名称由:pool-数字-thread 组成。


  • RejectedExecutionHandler

拒绝策略,当队列满了,最大线程数也满了,还提交任务就会触发拒绝策略,jdk 提供了 4 种拒绝策略,默认是 AbortPolicy,也可以自定义拒绝策略。



需要提到的点是:

  • 根据业务定义不同的线程池

有的人喜欢定义一个所谓“通用”的线程池来处理各种业务,这种做法不好,每种业务的核心参数需求不一样,且会相互竞争资源,正确的做法应该是每种业务定义一个线程池。


  • 有意义的线程名称

从上面可以看到默认的线程名称不是特别友好,我们可以根据业务取一个有意义的名称。这里我用到 guava 里的 ThreadFactoryBuilder,例如:

new ThreadFactoryBuilder()    .setNameFormat(“taskDispatchPool” + "-%d")    .build();
复制代码


  • 合适的队列长度

过长的队列长度可能导致应用 OOM,实际要根据具体情况指定,不宜过大,禁止使用无界队列。

jdk 的 Executors 辅助类创建的线程池队列长度很多都是无界的,稍有不慎就会导致内存溢出,这也是为什么阿里 java 开发规范明确禁止使用 Executors 创建线程池的原因。


  • 与 tomcat/hystirx 线程池的区别

jdk 的线程池是在核心线程满了,任务先进入队列,队列满了再继续创建线程到最大线程数。

而 tomcat/hystrix 的线程池策略是,核心线程满了,就继续创建线程到最大线程数,再有任务就进入队列。


  • 设置合适的核心参数

一般任务可以分为 cpu 密集型或 io 密集型,对于 cpu 密集型比较容易设置,一般设置为 cpu 核数即可,不宜过大,因为 cpu 密集型线程过大会有大量的线程切换,反而降低性能。

io 密集型就不好估算了,而且大部分情况下都是 io 密集型,没有万能公式,只能根据经验,测试,生产运行观察,调整,才能达到一个比较合适的值,这就要求我们的线程池要支持动态调整参数,下面还会说到。


如果跟面试官提这些点,说明你不是单纯背八股文,是有真的在思考总结~


动态线程池


上面我们说到线程池的核心参数(主要是线程数和队列长度)不太好估算,设置过小可能任务处理不过来导致阻塞,设置过大可能影响整体服务或影响下游服务,所以生产环境的线程池要支持动态调整。幸运的是 ThreadPoolExecutor 提供了方法可以直接对核心参数进行修改,例如 setCorePoolSize,setMaximumPoolSize,我们可以在运行过程进行设置,线程池内部就会根据参数调整线程了。笔者所在的项目一份代码会部署在各个国家(环境),每个环境的业务,数据量不一样,机器配置也不一样,所以需要根据环境设置不同的参数。在实际运行过程中,有时候为了提升处理效率设置比较大的线程数,而忽略对下游服务的影响,导致下游服务被压垮。也有时候开始估算太小,后面业务增长,导致处理不过来的情况。所以需要动态调整,我们把线程池参数配置接入到 apollo,随时可以调整生效,同时我们也把线程池的运行情况上报到 grafana,可以进行监控,告警。关于动态线程池,之前也写过,可以参考:加强版ThreadPoolExecutor升级Java线程池实现原理及其在美团业务中的实践hippo4j如何设置线程池参数?美团给出了一个让面试官虎躯一震的回答


任务统计


有时候我们需要对提交给线程池的任务进行统计,例如本次执行成功多少,失败多少,过滤多少。线程池就没有提供这种实现了,因为它是一直运行的状态,区分不了业务上的东西,只能简单获取总体完成成功次数(getCompletedTaskCount),或触发拒绝策略次数(getRejectCount)。业务上的统计我们就可以结合 CountDownLatch 来进行计数,主线程提交完要进行 await 等待线程池所有任务完成,每个线程完成一次任务就 countDown 一次,任务计数可以使用 LongAdder 统计,相比 AtomicLong 更加高效。这也回应了我们上面说要根据业务定义不同的线程池的原因,不同类型的统计不会相互影响。后来我们有个同学提醒 jdk 还有个 Phaser 类,比 CountDownLatch 好用,也可以用它来完成,具体参见:并发工具类Phaser


默认线程池


你是否在你的团队见过如下代码:

	@Test	public void test4() throws InterruptedException {		List<String> list = Lists.newArrayList();		list.parallelStream().forEach(item->{			//run async		});
CompletableFuture.runAsync(()->{ //run async }); }
复制代码


这些写法确实都是异步的,但底层都是用了系统默认的线程池 ForkJoinPool.commonPool(),默认线程数是 Runtime.getRuntime().availableProcessors() - 1。如果是 cpu 密集型任务,这么使用也没啥问题。如果是 io 密集型,就会相互影响。如下,业务 2 的任务会卡住,知道业务 1 执行完成。

		//业务1		for (int i = 0; i < ForkJoinPool.commonPool().getParallelism(); i++) {			CompletableFuture.runAsync(() -> {				try {					Thread.sleep(5000);				} catch (InterruptedException e) {				}			});		}
//业务2 for (int i = 0; i < ForkJoinPool.commonPool().getParallelism(); i++) { CompletableFuture.runAsync(() -> { System.out.println("running..."); }); }
复制代码


父子线程公用一个线程池


这也是一个实际案例,在我们团队有同学这么使用线程池,逻辑很简单,想要查一批数据的时间和还款概率,实际情况还查了更多信息,做了简化,这些查询需要调用外部接口,为了提升接口性能,把这些查询都丢到线程池里去并发执行,通过 Future.get 获取结果。同时主线程希望这两个操作也可以并发执行,所以通过 CompletableFuture 也提交到这个线程池里,最终通过 CompletableFuture.join 等待所有任务完成。实际代码比较复杂,简化后如下:

        //查一个时间	CompletableFuture<Void> timeFuture = CompletableFuture.runAsync(() -> {		initTime(pageResult);	}, executor);
//预测还款概率 CompletableFuture<Void> repayDesireFuture = CompletableFuture.runAsync(() -> { initRepayDesire(pageResult); }, executor); //主线程等待 CompletableFuture.allOf(timeFuture, repayDesireFuture).join();
private void initTime(List<JobListVo> data) { List<List<JobListVo>> lists = Lists.partition(data, CommonConst.PAGE_SIZE_50); List<Future<Result<List<TimeInfo>>>> futures = new ArrayList<>(); lists.forEach(item -> futures.add(executor.submit(() -> client.getTimeList(ids)))); futures.forEach( item -> { try { Result<List<TimeInfo>> result = item.get(); } catch (Exception e) { log.error("fetch error", e); } } ); }
private void initRepayDesire(List<JobListVo> data) { List<List<JobListVo>> lists = Lists.partition(data, CommonConst.PAGE_SIZE_50); List<Future<Result<List<RepayDesire>>>> futures = new ArrayList<>(); lists.forEach(item -> futures.add(executor.submit(() -> client.queryRepayDesire(ids)))); futures.forEach( item -> { try { Result<List<RepayDesire>> result = item.get(); } catch (Exception e) { log.error("fetch error", e); } } ); }
复制代码


看到这段代码,可能有的人会说主线程那两个操作并发的意义不大,串行执行即可。但实际分析还是有意义的,假如只有一个请求,查询时间的任务提交到线程池后,线程池资源还有剩余,这个时候并发执行还款概率的任务,是可以加速整个查询速度的。笔者在 review 这段代码的时候,感觉总是怪怪的,但逻辑上又好像说得通,直到我看到这边文章线程池遇到父子任务,有大坑,要注意!,这不就跟我们的场景几乎一样吗,确实可能会有问题。


根据文章所述,父子任务提交到同一个线程池可能导致父子任务相互等待,最终卡死。极端一点,假设线程池核心线程数是 1,队列长度是 1,现在主线程提交了一个任务,使用了这个核心线程,开始执行,并等待子线程执行完成。它的执行逻辑是在子线程内再提交一个任务给线程池,由于只有一个核心线程,所以这个任务进入队列等待。等到什么时候呢,等到主线程那个任务完成线程才能释放,主线程又什么时候完成呢,等待子线程执行完成,死循环了...


总结:父子任务,不要公用一个线程池。


shutdown/shutdownNow


这两个方法都是关闭线程池的,区别是 shutdown 是不再接收新任务,但提交的任务还会执行,而 shutdownNow 除了不再接收新任务,已提交的任务也不会执行,正在执行中的任务会终止。一般在线程池不再使用或应用下线前,就会调用线程关闭方法。如果关闭后再提交任务,就会触发拒绝策略。最好确保在关闭后不再有任务提交给线程池,否则可能会有问题,笔者之前就遇到线程池关闭后还有请求(服务下线前)进来,导致报 TimeoutException 错误,具体分析在这篇/线程池shutdown引发TimeoutException,有兴趣的可以看下。


内存泄漏


将线程池变量定义为局部变量时,可能会发生内存泄漏。如下:

	@GetMapping(value = "/test")	public Result<Void> info() {		ExecutorService executorService = Executors.newFixedThreadPool(1);		executorService.submit(() -> System.out.println("active thread"));		return Result.success();	}
复制代码


在我们印象中,executorService 作为一个局部变量,在方法返回时,生命周期就结束了,这个时候应该是可以被 gc 回收的,怎么会发生内存泄漏呢?使用线程池时有点不同,因为这里有个隐含的条件是,虽然方法返回结束了,但线程仍存活这,而存活的线程是可以作为 gc root 的。我们知道线程池里的线程会被包装为 Worker 对象,这是定义在 ThreadPoolExecutor 里的非静态内部类。如下代码,Worker 也实现了 Runnable 接口,把它作为参数传递给 Thread 对象。




活跃的线程作为 gc root 的,也就是不会被垃圾回收,而这个线程又引用着这个 Worker 对象,所以它也不会被回收。那个线程池对象又有什么关系呢?上面说到 Worker 是作为 ThreadPoolExecutor 里的非静态内部类,非静态内部类有一个规则就是持有外部类的引用,例如我们可以在 InnerClass 里调用外部类的方法。或者通过编译后查看 class 文件也可以看到 InnerClass 持有外部类的 this 引用。

public class OuterClass {
public OuterClass(Runnable runnable) { } public void outterFunction() {
}
class InnerClass { public InnerClass() { outterFunction(); } }}
复制代码


所以,由于 Worker 持有 ThreadPoolExecutor 的引用,所以它也不会被回收,用一张图表示就是:



解决方案,1.不要使用局部线程池变量,定义为全局变量。2.调用 shutdown/shutdownNow 关闭线程池,关闭后线程就会被回收,线程池也可以被回收。


附 chatgpt:jdk8 中,哪些对象可以作为 gc root?



虚拟线程


在 jdk21 以前,我们在 java 里使用的线程都称为平台线程,与内核线程是一对一的关系,开篇就说到,平台线程的使用成本比较高,所以才使用线程线程池来缓存复用它。



jdk21 虚拟线程成为正式功能,可以投入生产使用。java 里的虚拟线程类似于 goland 中的协程,虚拟线程与内核线程不再是一对一的关系,而是多对一,在 jvm 层面进行调度,可以大大内核线程的数量。如图,可以看到多个虚拟线程在底层还是公用一个内核线程,它们之间的执行调度由 jvm 自动完成,虚拟线程的创建成本非常低,可以创建的虚拟线程数量可以远大于平台线程。当我们的程序遇到 io 的时候,以往的方式是将当前线程挂起,cpu 进行线程切换,执行另外的任务。使用虚拟线程则不需要,任务的调度执行是在 jvm 层面完成的,cpu 还是一直在执行同一个线程。



代码示例:

Thread.ofVirtual().start(()->{});
复制代码


springboot3.2 tomcat 开启虚拟线程:

spring.threads.virtual.enabled = true
复制代码


那么使用虚拟线程,还需要线程池吗?答案是不需要的,我们之所以池化是因为对象的创建、销毁成本较高,每次使用都创建,用完就丢弃,太浪费了,但虚拟线程的使用成本很低,所以它不需要池化了。虚拟线程的设计虽然不是为了取代传统线程和编程方式,可以看到 jdk 是通过扩展支持虚拟线程的,我们依然可以像以前一样编程开发,但在高并发场景虚拟线程是更好的选择,这也是大势所趋,说不定以后哪一天线程池也会被无情的标记上 @Deprecated。


文章转载自:jtea

原文链接:https://www.cnblogs.com/jtea/p/18329735

体验地址:http://www.jnpfsoft.com/?from=infoq

用户头像

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
使用线程池你应该知道的知识点_Java_不在线第一只蜗牛_InfoQ写作社区