写点什么

【并发编程系列 12】从 Java 线程池的常用 4 种写法深入分析线程池 (Thread Pool) 的实现原理

  • 2022 年 4 月 13 日
  • 本文字数:4535 字

    阅读完需:约 15 分钟

前言

面试技巧另外开篇再说,先上面试干货吧。面试的题目并不一定有严格的顺序关系,有的是从前一个问题延伸而来,(探究的是一个知识的深度),有的是考察面试者的知识广度、有的纯粹是我想到哪里写到哪里的啦。。不要太在意哈,最近工作有点忙。


});


}


}


}


singleThreadExecutor.shutdown();


运行结果只有 1 个线程:



SingleThreadExecutor 调用了如下方法构造线程池:



[](()CachedThreadPool




一个在需要处理任务时才会创建线程的线程池,如果一个线程处理完任务了还没有被回收,那么线程可以被重复使用。


当我们调用 execute 方法时,如果之前创建的线程有空闲可用的,则会复用之前创建好的线程,否则就会创建新的线程加入到线程池中。


创建好的线程如果在 60s 内没被使用,那么线程就会被终止并移出缓存。因此,这种线程池可以保持长时间空闲状态而不会消耗任何资源。

[](()示例

package com.zwx.concurrent.threadPool;


import java.util.concurrent.ExecutorService;


import java.util.concurrent.Executors;


public class TestThreadPool {


public static void main(String[] args) {


ExecutorService cachedThreadPool = Executors.newCachedThreadPool();


for (int i=0;i<9;i++){


cachedThreadPool.execute(()-> {


System.out.println("线程名:" + Thread.currentThread().getName());


});


}


cachedThreadPool.shutdown();


}


输出结果可以看到,创建了 9 个不同的线程:



接下来我们对上面的示例改造一下,在执行 execute 之前休眠一段时间:


package com.zwx.concurrent.threadPool;


import java.util.concurrent.ExecutorService;


import java.util.concurrent.Executors;


public class TestThreadPool {


public static void main(String[] args) {


ExecutorService cachedThreadPool = Executors.newCachedThreadPool();


for (int i=0;i<9;i++){


try {


Thread.sleep(i * 10L);


} catch (InterruptedException e) {


e.printStackTrace();


}


cachedThreadPool.execute(()-> {


System.out.println("线程名:" + Thread.currentThread().getName());


});


}


cachedThreadPool.shutdown();


}


这时候输出的结果就只有 1 个线程了,因为有部分线程可以被复用:



注意:这两个示例的结果都不是固定的,第一种有可能也不会创建 9 个线程,第二种也有可能不止创建 1 个线程,具体要看线程的执行情况。


CachedThreadPool 调用了如下方法构造线程池



[](()ScheduledThreadPool




创建一个线程池,它可以在调度命令给定的延迟后运行或定期执行。这个相比较于其他的线程池,其自定义了一个子类 ScheduledExecutorService 继承了 ExecutorService。

[](()示例

package com.zwx.concurrent.threadPool;


import java.util.concurrent.ExecutorService;


import java.util.concurrent.ScheduledExecutorService;


import java.util.concurrent.Executors;


public class TestThreadPool {


public static void main(String[] args) {


ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);


for (int i=0;i<9;i++){


scheduledThreadPool.execute(()->{


System.out.println("线程名:" + Thread.currentThread().getName());


});


}


scheduledThreadPool.shutdown();


}


}


输出结果(执行结果具有随机性,最多只有 3 个线程执行):



ScheduledThreadPool 最终调用了如下方法构造线程池



[](()线程池原理


==================================================================


根据上面的截图可以看到,列举的 4 中常用的线程池在构造时,最终调用的方法都是 ThreadPoolExecutor 类的构造方法,所以要分析原理,我们就去看看 ThreadPoolExecutor 吧!


[](()构造线程池 7 大参数




下面就是 ThreadPoolExecutor 类中最完整的一个构造方法:



这个就是是构造线程池的核心方法,总共有 7 个参数:


  • corePoolSize:核心线程数量。一直保留在池中的线程,核心线程即使空闲状态也不会被回收,除非设置了 allowCoreThreadTimeOut 属性

  • maximumPoolSize:最大线程数量。线程池中允许的最大线程数,大于等于核心线程数

  • keepAliveTime:活跃时间。当最大线程数比核心线程数更大时,超出核心的线程数的其他线程如果空间时间超过 keepAliveTime 会被回收

  • TimeUnit:活跃时间的单位

  • BlockingQueue:阻塞队列。用于存储尚等待被执行的任务。

  • ThreadFactory:创建线程的工厂类

  • RejectedExecutionHandler:拒绝策略。当达到了线程边界和队列容量时提交的任务被阻塞时执行的策略。


[](()线程池执行流程




execute(Runnable) 方法的主流程非常清晰:



根据上面源码,可以得出线程池执行流程图如下:



[](()源码分析




首先看看 ThreadPoolExecutor 类中的 ctl,是一个 32 位的 int 类型,其中将高 3 位用来表示线程数量,低 29 位用来表示,其中的计算方式都是采用二进制来计算。



其中各种状态的转换关系如下图:



其中状态的大小关系为:


RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED

[](()addWork 方法

private boolean addWorker(Runnable firstTask, boolean core) {


//第一段逻辑:线程数+1


retry:


for (;;) {


int c = ctl.get();//获取线程池容量


int rs = runStateOf(c);//获取状态


// Check if queue empty only if necessary.


if (rs >= SHUTDOWN &&//即:SHUTDOWN,STOP,TIDYING,TERMINATED


! (rs == SHUTDOWN &&


firstTask == null &&


! workQueue.isEmpty()))//即:rs==RUNNING,firstTask!=null,queue==null


return false;//如果已经关闭,不接受任务;如果正在运行,且 queue 为 null,也返回 false


for (;;) {


int wc = workerCountOf(c);//获取当前的工作线程数


//如果工作线程数大于等于容量或者大于等于核心线程数(最大线程数),那么就不能再添加 worker


if (wc >= CAPACITY ||


wc >= (core ? corePoolSize : maximumPoolSize))


return false;


if (compareAndIncrementWorkerCount(c))//cas 增加线程数,失败则再次自旋尝试


break retry;


c = ctl.get(); // Re-read ctl //再次获取工作线程数


if (runStateOf(c) != rs)//不相等说明线程池的状态发生了变化,继续自旋尝试


continue retry;


}


}


//第二段逻辑:将线程构造成 Worker 对象,并添加到线程池


boolean workerStarted = false;//工作线程是否启动成功


boolean workerAdded = false;//工作线程是否添加成功


Worker w = null;


try {


w = new Worker(firstTask);//构建一个 worker


final Thread t = w.thread;//去除 worker 中的线程


if (t != null) {


final ReentrantLock mainLock = this.mainLock;//获取重入锁


mainLock.lock();//上锁


try {


// Recheck while holding lock.


// Back out on ThreadFactory failure or if


// shut down before lock acquired.


int rs = runStateOf(ctl.get());//获得锁之后,再次检查状态


//只有当前线程池是正在运行状态,[或是 SHUTDOWN 且 firstTask 为空],才能添加到 workers 集合中


if (rs < SHUTDOWN ||


(rs == SHUTDOWN && firstTask == null)) {


if (t.isAlive()) // precheck that t is startable


throw new IllegalThreadStateException();


workers.add(w);//将新创建的 Worker 添加到 workers 集合中


int s = workers.size();


if (s > largestPoolSize)


largestPoolSize = s;//更新线程池中线程的数量


workerAdded = true;//添加线程(worker)成功


}


} finally {


mainLock.unlock();


}


if (workerAdded) {


t.start();//这里就会去执行 Worker 中的 run()方法


workerStarted = true;//启动成功


}


}


} finally {


if (! workerStarted)


addWorkerFailed(w);//如果启动线程失败,需要回滚


}


return workerStarted;


}


这个方法主要就是做两件事:


  • 一、将线程数+1

  • 二、将线程构造成 Worker 对象,加入到线程池中,并调用 start()方法启动线程

[](()Worker 对象


上面这个方法继承了 AbstractQueuedSynchronizer,前面我们讲述[AQS 同步队列](()的时候知道,AQS 就是一个同步器,那么既然有线程的同步器,这里为什么不直接使用,反而要继承之后重写呢?


这是因为 AQS 同步器内是支持锁重入的,但是线程池这里的设计思想是并不希望支持重入,所以才会重写一个 AQS 来避免重入。


Worker 中 state 初始化状态设置为-1,原因是在初始化 Worker 对象的时候,在线程真正执行 runWorker()方法之前,不能被中断。而一旦线程构造完毕并开始执行任务的时候,是允许被中断的,所以在线程进入 runWorker()之后的第一件事就是将 state 设置为 0(无锁状态),也就是允许被中断。


我们再看看 Worker 的构造器:



addWork 方法执行到这句:w = new Worker(firstTask);//构建一个 worker 的时候,就会调用构造器创建一个 Worker 对象,state=-1,并且将当前任务作为 firstTask,后面再运行的时候会优先执行 firstTask。


上面 addWorker 方法在 worker 构造成功之后,就会调用 worker.start 方法,这时候就会去执行 Worker 中的 run()方法,这也是一种委派的方式,如果对 start()和 run()具体是如何执行这一块不理解,可以[点击这里](()进行详细了解(currentThread()和 this 的区别部分有解释)。


run()方法中调用了 runWorker(this)方法,这个方法就是真正执行任务的方法:

[](()runWorker(this)方法

final void runWorker(Worker w) {


Thread wt = Thread.currentThread();


Runnable task = w.firstTask;


w.firstTask = null;


/**


  • 表示当前 worker 线程允许中断,因为 new Worker 默认的 state=-1,此处是调用

  • Worker 类的 tryRelease()方法,state 置为 0,

  • 而 interruptIfStarted()中只有 state>=0 才允许调用中断


*/


w.unlock(); // allow interrupts


boolean completedAbruptly = true;


try {


while (task != null || (task = getTask()) != null) {


/**


  • 加锁,这里加锁不仅仅是为了防止并发,更是为了当调用 shutDown()方法的时候线程不被中断,

  • 因为 shutDown()的时候在中断线程之前会调用 tryLock 方法尝试获取锁,获取锁成功才会中断


*/


w.lock();


// If pool is stopping, ensure thread is interrupted;


// if not, ensure thread is not interrupted. This


// requires a recheck in second case to deal with


// shutdownNow race while clearing interrupt


/**


  • 如果是以下两种情况,需要中断线程

  • 1.如果 state>=STOP,且线程中断标记为 false

  • 2.如果 state<STOP,获取中断标记并复位,如果线程被中断,那么,再次判断 state 是否 STOP

  • 如果是的话,且线程中断标记为 false


*/


if ((runStateAtLeast(ctl.get(), STOP) ||//状态>=STOP


(Thread.interrupted() &&


runStateAtLeast(ctl.get(), STOP))) &&


!wt.isInterrupted())


wt.interrupt();//中断线程


try {


beforeExecute(wt, task);//空方法,我们可以重写它,在执行任务前做点事情,常用于线程池运行的监控和统计


Throwable thrown = null;


try {


task.run();//正式调用 run()执行任务


} catch (RuntimeException x) {


thrown = x; throw x;


} catch (Error x) {


thrown = x; throw x;


《一线大厂 Java 面试真题解析+Java 核心总结学习笔记+最新全套讲解视频+实战项目源码》开源


Java 优秀开源项目:

  • ali1024.coding.net/public/P7/Java/git

最后

每年转战互联网行业的人很多,说白了也是冲着高薪去的,不管你是即将步入这个行业还是想转行,学习是必不可少的。作为一个 Java 开发,学习成了日常生活的一部分,不学习你就会被这个行业淘汰,这也是这个行业残酷的现实。


如果你对 Java 感兴趣,想要转行改变自己,那就要趁着机遇行动起来。或许,这份限量版的 Java 零基础宝典能够对你有所帮助。



用户头像

还未添加个人签名 2022.04.13 加入

还未添加个人简介

评论

发布
暂无评论
【并发编程系列12】从Java线程池的常用4种写法深入分析线程池(Thread Pool)的实现原理_Java_爱好编程进阶_InfoQ写作平台