重走 JAVA 之路(五):面试又被问线程池原理?教你如何反击
private static int workerCountOf(int c) { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }
AtomicInteger 是一个原子操作类,保证线程安全,采用低 29 位表示线程的最大数量,高 3 位表示 5 种线程池状态,维护两个参数,workCount 和 runState。workCount 表示有效的线程数量,runState 表示线程池的运行状态。
RUNNING:运行状态,可以接受新任务并处理
SHUTDOWN:关闭状态,不会接受新的任务了,但是会处理队列中还存在的任务
STOP:停止状态,不会接受新的任务,也不处理队列任务,直接中断
TIDYING:表示所有任务已经终止了
TERMINATED:表示
terminated()
方法已经执行完成
引用一张图片帮助大家理解 5 个状态
3.执行流程
execute()
public void execute(Runnable command) {if (command == null)throw new NullPointerException();/*
Proceed in 3 steps:
If fewer than corePoolSize threads are running, try to
start a new thread with the given command as its first
task. The call to addWorker atomically checks runState and
workerCount, and so prevents false ala
rms that would add
threads when it shouldn't, by returning false.
If a task can be successfully queued, then we still need
to double-check whether we should have added a thread
(because existing ones died since last checking) or that
the pool shut down since entry into this method. So we
recheck state and if necessary roll back the enqueuing if
stopped, or start a new thread if there are none.
If we cannot queue task, then we try to add a new
thread. If it fails, we know we are shut down or saturated
and so reject the task.*/int c = ctl.get();//如果当前线程数量小于核心线程数量,执行 addWorker 创建新线程执行 command 任务 if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//如果当前是运行状态,将任务放入阻塞队列,double-check 线程池状态 if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();//如果再次 check,发现线程池状态不是运行状态了,移除刚才添加进来的任务,并且拒绝改任务 if (! isRunning(recheck) && remove(command))reject(command);//处于运行状态,但是没有线程,创建线程 else if (workerCountOf(recheck) == 0)addWorker(null, false);}//往线程池中创建新的线程失败,则 reject 任务 else if (!addWorker(command, false))reject(command);}
这里大概总结下 execute 方法的执行流程,其实大家看源码方法注释是一样很好的学习方法
首先判断当前线程数量是不是比核心线程数量少,如果是,直接创建核心线程执行任务,否则走第二步
如果当前线程数量等于核心线程数量了,那么就任务排期,将任务放进任务队列,放入成功后,再次 check 线程池状态,这里说明一下,在多线程的环境下,ctl.get()这个方法并不是一个原子操作,如果加入队列后,线程池状态改变了,不是 RUNNING 状态,那么这个任务将永远不会被执行,所以需要再次 check,如果不是 RUNNING 状态,移除任务并拒绝任务,如果是 RUNNING 状态并且当前没有线程,则直接创建线程
走到这一步前提就是第二步中的添加队列失败了,也就是任务队列满了,那么这个时候就考虑到创建非核心线程去执行任务,如果添加非核心线程也失败,那就直接拒绝
这里注意一点,当核心线程满的时候,并不会去直接创建非核心线程去执行任务,而是先放进任务队列,可以理解为需求任务首先是需要让内部核心员工去完成的,任务队列的优先级是高于非核心员工的,addWorker(),这里的传进去的 boolean 值,就代表着创建核心线程或者非核心线程
reject()
final void reject(Runnable command) {handler.rejectedExecution(command, this);}
拒绝任务很简单,reject 方法会调用 handler 的 rejectedExecution(command,this)方法,handler 是 RejectedExecutionHandler 接口,默认实现是 AbortPolicy,下面是 AbortPolicy 的实现:
public static class AbortPolicy implements RejectedExecutionHandler {public AbortPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}}
可以看到默认策略是直接抛出异常的,这只是默认使用的策略,可以通过实现接口实现自己的逻辑。
addWorker()
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 这里 return false 的情况有以下几种//1.当前状态是 stop 及以上 2.当前是 SHUTDOWN 状态,但是 firstTask 不为空//3.当前是 SHUTDOWN 状态,但是队列中为空//从第一节我们知道,SHUTDOWN 状态是不执行进来的任务的,但是会继续执行队列中的任务 if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
这里就主要流程分析下
2 层循环,外部循环查询线程池状态,如果当前是 stop 及之上的状态,直接 return,如果是 SHUTDOWN 状态,并且 firstTask 不为空或者队列中是空的,直接 return
内部循环查询线程数量,通过传递进来的 boolean 值,分别和核心线程以及最大线程数量进行对比,如果成立,worker 数量+1,并且跳出循环。
跳出循环就是实际执行任务了,Worker 就将工作线程和任务封装到了自己内部,我们可以将 Worker 看成就是一个工作线程,至于 Worker 是如何执行任务和从阻塞队列中取任务
Worker()
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable {/** Thread this worker is running in. Null if factory fails. /final Thread thread;/* Initial task to run. Possibly null. /Runnable firstTask;/* Per-thread task counter */volatile long completedTasks;Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}public void run() {runWorker(this);}......}
可以看到,Worker 内部维护,一个线程变量以及任务变量,启动一个 Worker 对象中包含的线程 thread, 就相当于要执行 runWorker()方法, 并将该 Worker 对象作为该方法的参数.
runWorker()
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {//task 不为空,执行当前任务,任务执行完后将 task 置位空,getTask 方法接着不断从队列中取任务 while (task != null || (task = getTask()) != null) {w.lock();//再次 check 线程池状态,如果是 stop 状态,直接 interrupt()中断任务 if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {//执行任务 task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}
通过 while 循环不断的调用 getTask 方法,获取任务 task 并进行执行,如果任务都执行完,跳出循环,线程结束并减少当前线程数量。
getTask()
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();
评论