写点什么

知道这些线程池底层源码的知识,你也能和面试官扯半小时

作者:Java高工P7
  • 2021 年 11 月 12 日
  • 本文字数:5832 字

    阅读完需:约 19 分钟

pool-1-thread-2 的 i 值为: 47


pool-1-thread-2 的 i 值为: 49


pool-1-thread-1 的 i 值为: 0


pool-1-thread-1 的 i 值为: 2


pool-1-thread-1 的 i 值为: 4


pool-1-thread-1 的 i 值为: 6


pool-1-thread-1 的 i 值为: 8


pool-1-thread-1 的 i 值为: 10


pool-1-thread-1 的 i 值为: 12


pool-1-thread-1 的 i 值为: 14


pool-1-thread-1 的 i 值为: 16


pool-1-thread-1 的 i 值为: 18


pool-1-thread-1 的 i 值为: 20


pool-1-thread-1 的 i 值为: 22


pool-1-thread-1 的 i 值为: 24


pool-1-thread-1 的 i 值为: 26


pool-1-thread-1 的 i 值为: 28


pool-1-thread-1 的 i 值为: 30


pool-1-thread-1 的 i 值为: 32


pool-1-thread-1 的 i 值为: 34


pool-1-thread-1 的 i 值为: 36


pool-1-thread-1 的 i 值为: 38


pool-1-thread-1 的 i 值为: 40


pool-1-thread-1 的 i 值为: 42


pool-1-thread-1 的 i 值为: 44


pool-1-thread-1 的 i 值为: 46


pool-1-thread-1 的 i 值为: 48


pool-1-thread-1 的 i 值为: 50


?

五、剖析 ThreadPoolExecutor 底层源码

?


public ThreadPoolExecutor(int corePoolSize,


int maximumPoolSize,


long keepAliveTime,


TimeUnit unit,


BlockingQueue<Runnable> workQueue,


ThreadFactory threadFactory,


RejectedExecutionHandler handler) {


if (corePoolSize < 0 ||


maximumPoolSize <= 0 ||


maximumPoolSize < corePoolSize ||


keepAliveTime < 0)


throw new IllegalArgumentException();


if (workQueue == null || threadFactory == null || handler == null)


throw new NullPointerException();


this.corePoolSize = corePoolSize;


this.maximumPoolSize = maximumPoolSize;


this.workQueue = workQueue;


this.keepAliveTime = unit.toNanos(keepAliveTime);


this.threadFactory = threadFactory;


this.handler = handler;


}


?

1. 创建一个线程池时的参数

  • corePoolSize 表示核心池的大小。


当提交一个任务到线程池的时候,线程池将会创建一个线程来执行任务。这个值的大小设置非常关键,设置过小将频繁地创建或销毁线程,设置过大则会造成资源浪费。


线程池新建线程的时候,如果当前线程总数小于 corePoolSize,则新建的是核心线程;如果超过 corePoolSize,则新建的是非核心线程。


  • maximumPoolSize 表示线程池的最大数量。


如果队列 (workQueue) 满了,并且已创建的线程数小于 maximumPoolSize,则线程池会进行创建新的线程执行任务。 如果


等待执行的线程数 大于 maximumPoolSize,缓存在队列中; 如果 maximumPoolSize 等于 corePoolSize,即是固定大小线程池。


  • keepAliveTime 表示线程活动的保持时间


当需要执行的任务很多,线程池的线程数大于核心池的大小时,keepAliveTime 才起作用;



当一个非核心线程,如果不干活(闲置状态)的时长超过这个参数所设定的时长,就会被销毁掉


  • TimeUnit 表示线程活动保持时间的单位


它的单位有:TimeUnit.DAYS,TimeUnit.HOURS,TimeUnit.MINUTES,TimeUnit.MILLISECONDS,TimeUnit.MICRODECONDS


  • workQueue 表示线程池中存放被提交但尚未被执行的任务的队列


维护着等待执行的 Runnable 对象。当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务。


  • threadFactory 用于设置创建线程的工厂


它用来给每个创建出来的线程设置一个名字,就可以知道线程任务是由哪个线程工厂产生的。


  • handler 表示拒绝处理策略


线程数量大于最大线程数,当超过 workQueue 的任务缓存区上限的时候,就可以调用该策略,这是一种简单的限流保护。


从 JDK 源码里面可以看到:



?


其中里面其设置的 corePoolSize(核心池的大小) 和 maximumPoolSize(最大线程数) 都是 nThreads,其设定的阻塞队列是无界的,则饱和策略将失效,所有请求将一直排队等待被执行,可能会产生内存溢出的风险。因此阿里巴约编码规范不推荐用 Executors 来创建 ThreadPoolExecutor。


使用 Executors 创建线程池时要明确创建的阻塞队列是否有界。因此最好自己创建 ThreadPoolExecutor。


??



ThreadPoolExecutor pool1 = (ThreadPoolExecutor) pool;


// 设置核心池的大小


pool1.setCorePoolSize(15);


// setkeepAliveTime()方法 设置线程没有任务时最多保持多长时间后会停止


pool1.setKeepAliveTime(60000, TimeUnit.HOURS);


Executors 的工厂方法主要就返回了 ThreadPoolExecutor 对象


继承结构:



ThreadPoolExecutor 继承了 AbstractExecutorService


public class ThreadPoolExecutor extends AbstractExecutorService {


private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));


// 表示线程池数量的位数,很明显是 29,Integer.SIZE=32


private static final int COUNT_BITS = Integer.SIZE - 3;


// 表示线程池最大数量,2^29 - 1


private static final int CAPACITY = (1 << COUNT_BITS) - 1;


// runState is stored in the high-order bits


priv


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


ate static final int RUNNING = -1 << COUNT_BITS;


private static final int SHUTDOWN = 0 << COUNT_BITS;


private static final int STOP = 1 << COUNT_BITS;


private static final int TIDYING = 2 << COUNT_BITS;


private static final int TERMINATED = 3 << COUNT_BITS;


// Packing and unpacking ctl


private static int runStateOf(int c) { return c & ~CAPACITY; }


private static int workerCountOf(int c) { return c & CAPACITY; }


private static int ctlOf(int rs, int wc) { return rs | wc; }


/*


  • Bit field accessors that don't require unpacking ctl.

  • These depend on the bit layout and on workerCount being never negative.


*/


private static boolean runStateLessThan(int c, int s) {


return c < s;


}


private static boolean runStateAtLeast(int c, int s) {


return c >= s;


}


private static boolean isRunning(int c) {


return c < SHUTDOWN;


}


/**


  • Attempts to CAS-increment the workerCount field of ctl.


*/


private boolean compareAndIncrementWorkerCount(int expect) {


return ctl.compareAndSet(expect, expect + 1);


}


/**


  • Attempts to CAS-decrement the workerCount field of ctl.


*/


private boolean compareAndDecrementWorkerCount(int expect) {


return ctl.compareAndSet(expect, expect - 1);


}


/**


  • Decrements the workerCount field of ctl. This is called only on

  • abrupt termination of a thread (see processWorkerExit). Other

  • decrements are performed within getTask.


*/


private void decrementWorkerCount() {


do {} while (! compareAndDecrementWorkerCount(ctl.get()));


}


/**


  • The queue used for holding tasks and handing off to worker

  • threads. We do not require that workQueue.poll() returning

  • null necessarily means that workQueue.isEmpty(), so rely

  • solely on isEmpty to see if the queue is empty (which we must

  • do for example when deciding whether to transition from

  • SHUTDOWN to TIDYING). This accommodates special-purpose

  • queues such as DelayQueues for which poll() is allowed to

  • return null even if it may later return non-null when delays

  • expire.


*/


// 用于存放线程任务的阻塞队列


private final BlockingQueue<Runnable> workQueue;


/**


  • Lock held on access to workers set and related bookkeeping.

  • While we could use a concurrent set of some sort, it turns out

  • to be generally preferable to use a lock. Among the reasons is

  • that this serializes interruptIdleWorkers, which avoids

  • unnecessary interrupt storms, especially during shutdown.

  • Otherwise exiting threads would concurrently interrupt those

  • that have not yet interrupted. It also simplifies some of the

  • associated statistics bookkeeping of largestPoolSize etc. We

  • also hold mainLock on shutdown and shutdownNow, for the sake of

  • ensuring workers set is stable while separately checking

  • permission to interrupt and actually interrupting.


*/


// 重入锁


private final ReentrantLock mainLock = new ReentrantLock();


/**


  • Set containing all worker threads in pool. Accessed only when

  • holding mainLock.


*/


// 线程池当中的线程集合,只有当拥有 mainLock 锁的时候,才可以进行访问


private final HashSet<Worker> workers = new HashSet<Worker>();


/**


  • Wait condition to support awaitTermination


*/


// 等待条件支持终止


private final Condition termination = mainLock.newCondition();


/**


  • Tracks largest attained pool size. Accessed only under

  • mainLock.


*/


private int largestPoolSize;


/**


  • Counter for completed tasks. Updated only on termination of

  • worker threads. Accessed only under mainLock.


*/


private long completedTaskCount;


/*


  • All user control parameters are declared as volatiles so that

  • ongoing actions are based on freshest values, but without need

  • for locking, since no internal invariants depend on them

  • changing synchronously with respect to other actions.


*/


/**


  • Factory for new threads. All threads are created using this

  • factory (via method addWorker). All callers must be prepared

  • for addWorker to fail, which may reflect a system or user's

  • policy limiting the number of threads. Even though it is not

  • treated as an error, failure to create threads may result in

  • new tasks being rejected or existing ones remaining stuck in

  • the queue.

  • We go further and preserve pool invariants even in the face of

  • errors such as OutOfMemoryError, that might be thrown while

  • trying to create threads. Such errors are rather common due to

  • the need to allocate a native stack in Thread.start, and users

  • will want to perform clean pool shutdown to clean up. There

  • will likely be enough memory available for the cleanup code to

  • complete without encountering yet another OutOfMemoryError.


*/


// 创建新线程的线程工厂


private volatile ThreadFactory threadFactory;


/**


  • Handler called when saturated or shutdown in execute.


*/


// 饱和策略


private volatile RejectedExecutionHandler handler;


总结


ctl 是主要的控制状态



  • workerCount:表示有效的线程数目



  • runState:表示线程池里线程的运行状态


2. 重要方法

① execute() 方法

源码剖析


// 调用 execute 方法将线程提交到线程池中


public void execute(Runnable command) {


// 如果执行的任务为空,则会抛出空指针异常


if (command == null)


throw new NullPointerException();


/*


  • Proceed in 3 steps:

  • If fewer than corePoolSize threads are running, try to

  • start a new thread with the given command as its first

  • task. The call to addWorker atomically checks runState and

  • workerCount, and so prevents false alarms that would add

  • threads when it shouldn't, by returning false.

  • If a task can be successfully queued, then we still need

  • to double-check whether we should have added a thread

  • (because existing ones died since last checking) or that

  • the pool shut down since entry into this method. So we

  • recheck state and if necessary roll back the enqueuing if

  • stopped, or start a new thread if there are none.

  • If we cannot queue task, then we try to add a new

  • thread. If it fails, we know we are shut down or saturated

  • and so reject the task.


*/


// 获取线程池的控制状态


int c = ctl.get();


// 如果 workerCount 值小于 corePoolSize


if (workerCountOf(c) < corePoolSize) {


// 添加任务到 worker 集合当中,成功的话返回


if (addWorker(command, true))


return;


// 如果失败,再次获取线程池的控制状态


c = ctl.get();


}


// 如果 corePoolSize 已经满了,则需要加入到阻塞队列


// 判断线程池的状态以及是否可以往阻塞队列中继续添加 runnable


if (isRunning(c) && workQueue.offer(command)) {


// 获取线程池的状态


int recheck = ctl.get();


// 再次检查状态,线程池不处于 RUNNING 状态,将任务从 workQueue 队列中移除


if (! isRunning(recheck) && remove(command))


reject(command);


else if (workerCountOf(recheck) == 0)


addWorker(null, false);


}


// 如果此时队列已满,则会采取相应的拒绝策略


else if (!addWorker(command, false))


reject(command);


}


总结


  • 往 corePoolSize 中加入任务进行执行


  • 当 corePoolSize 满时往阻塞队列中加入任务


  • 阻塞队列满时并且 maximumPoolSize 已满,则采取相应的拒绝策略

② addWorker()方法

源码剖析


private boolean addWorker(Runnable firstTask, boolean core) {


// 外部循环标志


retry:


for (;;) {


// 获取线程池的状态


int c = ctl.get();


int rs = runStateOf(c);


// Check if queue empty only if necessary.


if (rs >= SHUTDOWN &&


! (rs == SHUTDOWN &&


firstTask == null &&


! workQueue.isEmpty()))


return false;


for (;;) {


// 获取 workerCount


int wc = workerCountOf(c);


if (wc >= CAPACITY ||


wc >= (core ? corePoolSize : maximumPoolSize))


return false;


// 跳出循环


if (compareAndIncrementWorkerCount(c))


break retry;


c = ctl.get(); // Re-read ctl


if (runStateOf(c) != rs)


continue retry;


// else CAS failed due to workerCount change; retry inner loop


}


}


boolean workerStarted = false;


boolean workerAdded = false;


Worker w = null;


try {


// 实例化 worker 对象


w = new Worker(firstTask);


// 获取 worker 的线程


final Thread t = w.thread;


// 如果获取的线程不为空


if (t != null) {


// 初始线程池的锁


final ReentrantLock mainLock = this.mainLock;


// 得到锁


mainLock.lock();


try {


// Recheck while holding lock.


// Back out on ThreadFactory failure or if


// shut down before lock acquired.


// 获取锁后再次检查,获取线程池 runState


int rs = runStateOf(ctl.get());


// 判断


if (rs < SHUTDOWN ||


(rs == SHUTDOWN && firstTask == null)) {


// 检查线程是否启动


if (t.isAlive()) // precheck that t is startable


// 如果未启动存活,则抛出异常


throw new IllegalThreadStateException();

用户头像

Java高工P7

关注

还未添加个人签名 2021.11.08 加入

还未添加个人简介

评论

发布
暂无评论
知道这些线程池底层源码的知识,你也能和面试官扯半小时