线程池系列 - (4)工作流程
====
正如我们前面分析的一样。当小于核心线程数的时候会执行 addWorker
添加一个任务,那么它又是如何运行的呢?
先看一下他的源码,其实还是不少的。笔者将其分成 2 块进行分析。
for 循环
try cache
上半部分 for
先看到 for 循环这一块。
① 中 我们已经分析了。当线程池运行状态的时候不会触发。
② 首先看到。截图中是有两个
for(;;)
循环。第二个循环(红色框②)先判断是否超过最大线程数目。
CAPACITY
的值如下??
或者是否超过核心线程还是最大线程。这里我们选择的是核心线程数。如下??我们传入的是 true
然后执行了
compareAndIncrementWorkerCount
尝试更新一下原来记录的线程数量。true 说明 CAS 记录成功了,并且跳出大循环,也就是最外层的循环。
false 说明没有记录成功。继续执行 for 循环,直到返回 true 为止
上半部分其实就是更新了线程池中线程数量。并没有真的添加任务
下半部分 try cache
继续看下半部分。try cache 部分。
笔者这里分成五步。
第①步。任务被包装在了
Worker
中。第②步。上锁
第③步。添加任务
第④步。解锁
第⑤步。执行任务
在第①步。任务被包装在了Worker
中。Worker
是任务与线程的包装,如下图,通过线程创建工厂创建一个新的线程,将任务和线程组合在一起。
在看一下Worker
继承与哪些。注意有一个 Runnable
看一下第③步,添加任务
try {// 检查线程池状态 int rs = runStateOf(ctl.get());// 如果实在运行状态 或者 (线程池处于 SHUTDOWN 并且 任务是 null)if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// 检查线程是否存活 if (t.isAlive()) // precheck that t is startable// 抛出异常 throw new IllegalThreadStateException();//将任务添加到 workers (HashSet) 中 workers.add(w);}}
来看一下他的两个判断条件
线程池处于运行状态
线程池处于
SHUTDOWN
状态 并且firstTask == null
第一个比较好理解。关键是第二个条件。笔者这边查阅了源码。发现不少地方都有 addWorker(null,true/flase)
的地方。应该是对这些方法进行了判断。
在看一下检查线程是否存活
.
首先isAlive()
为true
说明线程处于执行状态。换句话说。当先线程处于RUNNABLE
TIMED_WAITING
BLOCKED
WAITING
中的一种。不管哪一种。都说明线程现在是在工作的。
线程池中。刚刚才创建的任务就已经执行了。都还没有start
呢。所以就有了一个precheck that t is startable
(预先检查 t 是否可启动)
执行任务
这里就比较好理解。将所有的 Worker
都添加到workers
也就是HashSet
这都不是重点。重点在 ?? t.start()
。
我们都知道线程的 start()
肯定有一个地方执行了run
。而线程池中将这他隐藏在了 Worker
之中。在看一下Worker
。实现了Runnable
。并且实现了 run
方法。当线程 start()
之后。就会执行到这里的run
,然后执行runWorker(this);
到这里我们知道正在执行任务的地方就在 runWorker
. 这里的核心代码就一行task.run();
将任务执行起来。
final void runWorker(Worker w) {// 拿到当前线程 Thread wt = Thread.currentThread();// 拿到当前任务 Runnable task = w.firstTask;// 将 Worker.firstTask 置空 并且释放锁 w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// 如果 task 或者 getTask 不为空,则一直循环 while (task != null || (task = getTask()) != null) {// 加锁 w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interrupt// return ctl.get() >= stop// 如果线程池状态>=STOP 或者 (线程中断且线程池状态>=STOP)且当前线程没有中断// 其实就是保证两点:// 1. 线程池没有停止// 2. 保证线程没有中断 if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())// 中断当前线程 wt.interrupt();try {// 空方法 beforeExecute(wt, task);Throwable thrown = null;try {// ?? 执行 run 方法(Runable 对象)task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {// 执行完后, 将 task 置空, 完成任务++, 释放锁 task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {// 退出工作 processWorkerExit(w, completedAbruptly);}
看到while (task != null || (task = getTask()) != null)
这个代码。有两个条件
task 不是空
getTask()能够获取到任务
重点还是看一下getTask()
方法。其实就是从队列中获取任务。
private Runnable getTask() {// 标记是否超时 boolean timedOut = false; // Did the last poll() time out?// 死循环 for (;;) {// 获取线程池状态 int c = ctl.get();int rs = runStateOf(c);
// Check if queue empty only if necessary.// 判读线程池状态是否大于等于 SHUTDOWN// 再线程池状态是否大于等于 STOP 或 判断任务队列是否为空 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {// 递减 workerCountdecrementWorkerCount();// 返回 null, 表示没有更多的任务 return null;}
// 获取 workerCountint wc = workerCountOf(c);
// 判断是否使用超时获取任务// 1. allowCo
reThreadTimeOut : 可以通过 Thread#allowCoreThreadTimeOut 设置// 2. 如果 workerCount 大于 coolPoolSize 也进行超时获取任务 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果 workerCount 大于 maximumPoolSize 或者获取任务超时// 并且 workerCount > 1 或者 任务队列为空//// workerCount 大于 maximumPoolSize 基本不可能,所以主要判断是否超时 if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {// 判断递减 workerCount 是否成功// 成功返回 null 表示没有更多任务, 失败继续死循环 if (compareAndDecrementWorkerCount(c))return null;continue;}
try {// 判断是否使用超时从任务队列获取任务// poll 方法在规定时间没有获取任务返回 null// take 方法则是一直阻塞直到获取任务 Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();// 如果获取的任务不为 null, 返回任务 if (r != null)return r;// 标记超时 timedOut = true;} catch (InterruptedException retry) {// 如果出现异常,重置超时标志 timedOut = false;}
评论