深入了解 java 线程篇之 ThreadPoolExecutor

一. 概述
在深入了解 JAVA 线程篇初略的介绍了线程池中几大关键属性成员,以及大体的原理。但是还未对其进行全面的解读,接下来会其属性以及方法进行全方面解读。
二. 解析
1. 线程池状态
在线程池对象中,有个 ctl 是记录这线程池状态以及正在运行的线程数量。ctl 结构图如下:
高三位记录这线程池状态;
100 - RUNNING,运行状态,即 ctl 是负数;
000 - SHUTDOWN,关闭状态,不再接受新任务,但能处理已添加的任务。
001 - STOP,停止状态,不再接受新任务,不处理已添加的任务,并且会中断正在处理的任务。
010 - TIDYING,僵尸状态,所有的任务已终止,线程数为 0;
011 - TERMINATED,线程池彻底终止。
状态的转换关系图如下:
我们创建线程池时,其状态默认值为 ctl=0xE0000000。线程池只提供了两个方法,对线程池进行关闭的
关闭线程池
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); //设置状态 advanceRunState(SHUTDOWN); //中断工作线程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } //尝试结束线程池 tryTerminate();}立马关闭线程池
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); //设置状态 advanceRunState(STOP); //中断工作线程 interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } //尝试结束线程池 tryTerminate(); return tasks;}2. 提交任务
这里面核心的代码主要是 execute 方法;
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { //如果小于核心线程数,则创建Worker工作线程(核心线程)去执行该任务 if (addWorker(command, true)) return; c = ctl.get(); } //如果创建不了Worker工作线程或者说是失败,则进入下面的逻辑 //判断线程池是否在运行状态,如果是,则将任务添加到队列中去 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) //如果线程池现在非运行状态,并从队列中剔除该任务,并拒绝这次任务的提交; reject(command); else if (workerCountOf(recheck) == 0) //如果工作线程数为0,则创建Worker工作线程; addWorker(null, false); } else if (!addWorker(command, false))//创建非核心线程 //创建Worker非核心线程失败,则拒绝这次任务的提交; reject(command);}流程图如下:
接下来,我们具体了解创建工作线程的方法,该方法提供了两个参数,一个是任务对象,一个是是否是核心线程标志位;
//参数: firstTask-任务, core-是否是核心线程标志private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 主要检测线程池是否在运行状态 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); //如果大于指定的线程数,则拒绝创建Worker工作线程 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //尝试新增工作线程数量->ctl+1 if (compareAndIncrementWorkerCount(c)) //创建成功,则打破这个死循环 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //创建工作线程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // 还没启动,该工作线程就是启动状态,则抛出异常。 throw new IllegalThreadStateException(); //添加到工作线程集合中去 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //启动工作线程 t.start(); workerStarted = true; } } } finally { if (! workerStarted) //如果添加失败,则从工作线程集合中剔除该工作线程对象,并尝试结束线程池 addWorkerFailed(w); } return workerStarted;}具体的流程图如下:
3. 任务执行
工作线程执行任务,是调用的 ThreadPoolExecutor 对象中的 runWorker 方法。从任务队列里拉取任务。代码如下
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { //如果是关闭状态且工作线程为0的话,或者是停止状态,则不执行任务 decrementWorkerCount(); return null; } int wc = workerCountOf(c); //检测是否超时标志位,如果为true,当从队列中获取任务时超过指定的时间,则结束该工作线程。 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } }}流程如下:
从队列里拉取任务后,则进行任务的执行。代码如下:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { //从任务队列中拉取任务 w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) //如果线程池处于停止状态,中断当前线程 wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { //任务执行 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; //完成任务数进行+1 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); }}流程如下:
当异常或者从任务队列中过去任务为空时,则处理工作线程退出。
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { //设置最小工作线程数 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); }}具体流程如下:
4. 尝试结束线程池
在上面的几个流程图中,有尝试结束线程池动作,代码如下:
final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } }}流程图如下:
5. 设置核心线程数
代码如下:
public void setCorePoolSize(int corePoolSize) { if (corePoolSize < 0) throw new IllegalArgumentException(); int delta = corePoolSize - this.corePoolSize; this.corePoolSize = corePoolSize; if (workerCountOf(ctl.get()) > corePoolSize) //尝试进行一次中断工作线程 interruptIdleWorkers(); else if (delta > 0) { int k = Math.min(delta, workQueue.size()); while (k-- > 0 && addWorker(null, true)) { if (workQueue.isEmpty()) break; } }}6. 设置最大线程数
代码如下:
public void setMaximumPoolSize(int maximumPoolSize) { if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) throw new IllegalArgumentException(); this.maximumPoolSize = maximumPoolSize; if (workerCountOf(ctl.get()) > maximumPoolSize) //尝试进行一次中断工作线程 interruptIdleWorkers();}三. 动态调整线程池参数算法
根据上面的两个方法,设置核心线程数量以及最大线程数数量,进行调整线程池参数。
对操作系统层面的 CPU 使用率以及 JVM 堆内存的使用率进行实时监控,针对该数据进行一定的算法,有效的更改线程数量以及队列容量大小;
这一块没有找到相关的介绍,后续完善该章节的内容............
四. 总结
经过上述的解读,大体对线程池中的线程池状态,任务提交,任务执行等逻辑有了大体的认知,里面照着代码梳理了流程图,还未对整体进行解读,为什么有这么多条件限制等未进行解读,后续完善该篇........
版权声明: 本文为 InfoQ 作者【邱学喆】的原创文章。
原文链接:【http://xie.infoq.cn/article/759bb1edda40c51ab0e2e1843】。文章转载请联系作者。
邱学喆
计算机原理的深度解读,源码分析。 2018.08.26 加入
在IT领域keep Learning。要知其然,也要知其所以然。原理的爱好,源码的阅读。输出我对原理以及源码解读的理解。个人的仓库:https://gitee.com/Michael_Chan











评论