写点什么

重走 JAVA 之路(五):面试又被问线程池原理?教你如何反击 (1)

  • 2021 年 11 月 12 日
  • 本文字数:3590 字

    阅读完需:约 12 分钟

  • task. The call to addWorker atomically checks runState and

  • workerCount, and so prevents false alarms that would add

  • threads when it shouldn't, by returning false.

  • If a task can be successfully queued, then we still need

  • to double-check whether we should have added a thread

  • (because existing ones died since last checking) or that

  • the pool shut down since entry into this method. So we

  • recheck state and if necessary roll back the enqueuing if

  • stopped, or start a new thread if there are none.

  • If we cannot queue task, then we try to add a new

  • thread. If it fails, we know we are shut down or saturated

  • and so reject the task.*/int c = ctl.get();//如果当前线程数量小于核心线程数量,执行 addWorker 创建新线程执行 command 任务 if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//如果当前是运行状态,将任务放入阻塞队列,double-check 线程池状态 if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();//如果再次 check,发现线程池状态不是运行状态了,移除刚才添加进来的任务,并且拒绝改任务 if (! isRunning(recheck) && remove(command))reject(command);//处于运行状态,但是没有线程,创建线程 else if (workerCountOf(recheck) == 0)addWorker(null, false);}//往线程池中创建新的线程失败,则 reject 任务 else if (!addWorker(command, false))reject(command);}


这里大概总结下 execute 方法的执行流程,其实大家看源码方法注释是一样很好的学习方法


  • 首先判断当前线程数量是不是比核心线程数量少,如果是,直接创建核心线程执行任务,否则走第二步

  • 如果当前线程数量等于核心线程数量了,那么就任务排期,将任务放进任务队列,放入成功后,再次 check 线程池状态,这里说明一下,在多线程的环境下,ctl.get()这个方法并不是一个原子操作,如果加入队列后,线程池状态改变了,不是 RUNNING 状态,那么这个任务将永远不会被执行,所以需要再次 check,如果不是 RUNNING 状态,移除任务并拒绝任务,如果是 RUNNING 状态并且当前没有线程,则直接创建线程

  • 走到这一步前提就是第二步中的添加队列失败了,也就是任务队列满了,那么这个时候就考虑到创建非核心线程去执行任务,如果添加非核心线程也失败,那就直接拒绝


这里注意一点,当核心线程满的时候,并不会去直接创建非核心线程去执行任务,而是先放进任务队列,可以理解为需求任务首先是需要让内部核心员工去完成的,任务队列的优先级是高于非核心员工的,addWorker(),这里的传进去的 boolean 值,就代表着创建核心线程或者非核心线程


reject()


final void reject(Runnable command) {handler.rejectedExecution(command, this);}


拒绝任务很简单,reject 方法会调用 handler 的 rejectedExecution(command,this)方法,handler 是 RejectedExecutionHandler 接口,默认实现是 AbortPolicy,下面是 AbortPolicy 的实现:


public static class AbortPolicy implements RejectedExecutionHandler {public AbortPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}}


可以看到默认策略是直接抛出异常的,这只是默认使用的策略,可以通过实现接口实现自己的逻辑。


addWorker()


private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 这里 return false 的情况有以下几种//1.当前状态是 stop 及以上 2.当前是 SHUTDOWN 状态,但是 firstTask 不为空//3.当前是 SHUTDOWN 状态,但是队列中为空//从第一节我们知道,SHUTDOWN 状态是不执行进来的任务的,但是会继续执行队列中的任务 if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}


这里就主要流程分析下


  • 2 层循环,外部循环查询线程池状态,如果当前是 stop 及之上的状态,直接 return,如果是 SHUTDOWN 状态,并且 firstTask 不为空或者队列中是空的,直接 return

  • 内部循环查询线程数量,通过传递进来的 boolean 值,分别和核心线程以及最大线程数量进行对比,如果成立,worker 数量+1,并且跳出循环。

  • 跳出循环就是实际执行任务了,Worker 就将工作线程和任务封装到了自己内部,我们可以将 Worker 看成就是一个工作线程,至于 Worker 是如何执行任务和从阻塞队列中取任务


Worker()


p


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


rivate final class Workerextends AbstractQueuedSynchronizerimplements Runnable {/** Thread this worker is running in. Null if factory fails. /final Thread thread;/* Initial task to run. Possibly null. /Runnable firstTask;/* Per-thread task counter */volatile long completedTasks;Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}public void run() {runWorker(this);}......}


可以看到,Worker 内部维护,一个线程变量以及任务变量,启动一个 Worker 对象中包含的线程 thread, 就相当于要执行 runWorker()方法, 并将该 Worker 对象作为该方法的参数.


runWorker()


final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {//task 不为空,执行当前任务,任务执行完后将 task 置位空,getTask 方法接着不断从队列中取任务 while (task != null || (task = getTask()) != null) {w.lock();//再次 check 线程池状态,如果是 stop 状态,直接 interrupt()中断任务 if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {//执行任务 task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}


通过 while 循环不断的调用 getTask 方法,获取任务 task 并进行执行,如果任务都执行完,跳出循环,线程结束并减少当前线程数量。


getTask()


private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);


// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}


int wc = workerCountOf(c);


// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;


if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}


try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;

评论

发布
暂无评论
重走JAVA之路(五):面试又被问线程池原理?教你如何反击(1)