写点什么

线程池系列 - (4)工作流程

用户头像
Android架构
关注
发布于: 10 小时前

====


正如我们前面分析的一样。当小于核心线程数的时候会执行 addWorker添加一个任务,那么它又是如何运行的呢?


先看一下他的源码,其实还是不少的。笔者将其分成 2 块进行分析。


  • for 循环

  • try cache


上半部分 for

先看到 for 循环这一块。



  • ① 中 我们已经分析了。当线程池运行状态的时候不会触发。

  • ② 首先看到。截图中是有两个 for(;;) 循环。第二个循环(红色框②)

  • 先判断是否超过最大线程数目。CAPACITY 的值如下??



  • 或者是否超过核心线程还是最大线程。这里我们选择的是核心线程数。如下??我们传入的是 true



  • 然后执行了 compareAndIncrementWorkerCount 尝试更新一下原来记录的线程数量。

  • true 说明 CAS 记录成功了,并且跳出大循环,也就是最外层的循环。

  • false 说明没有记录成功。继续执行 for 循环,直到返回 true 为止


上半部分其实就是更新了线程池中线程数量。并没有真的添加任务

下半部分 try cache

继续看下半部分。try cache 部分。



笔者这里分成五步。


  • 第①步。任务被包装在了Worker 中。

  • 第②步。上锁

  • 第③步。添加任务

  • 第④步。解锁

  • 第⑤步。执行任务


在第①步。任务被包装在了Worker中。Worker是任务与线程的包装,如下图,通过线程创建工厂创建一个新的线程,将任务和线程组合在一起。



在看一下Worker继承与哪些。注意有一个 Runnable



看一下第③步,添加任务


try {// 检查线程池状态 int rs = runStateOf(ctl.get());// 如果实在运行状态 或者 (线程池处于 SHUTDOWN 并且 任务是 null)if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// 检查线程是否存活 if (t.isAlive()) // precheck that t is startable// 抛出异常 throw new IllegalThreadStateException();//将任务添加到 workers (HashSet) 中 workers.add(w);}}


来看一下他的两个判断条件


  • 线程池处于运行状态

  • 线程池处于SHUTDOWN 状态 并且 firstTask == null


第一个比较好理解。关键是第二个条件。笔者这边查阅了源码。发现不少地方都有 addWorker(null,true/flase) 的地方。应该是对这些方法进行了判断。



在看一下检查线程是否存活.


首先isAlive()true 说明线程处于执行状态。换句话说。当先线程处于RUNNABLE TIMED_WAITING BLOCKED WAITING 中的一种。不管哪一种。都说明线程现在是在工作的。


线程池中。刚刚才创建的任务就已经执行了。都还没有start 呢。所以就有了一个precheck that t is startable (预先检查 t 是否可启动)

执行任务

这里就比较好理解。将所有的 Worker 都添加到workers 也就是HashSet



这都不是重点。重点在 ?? t.start()



我们都知道线程的 start()肯定有一个地方执行了run。而线程池中将这他隐藏在了 Worker 之中。在看一下Worker。实现了Runnable。并且实现了 run 方法。当线程 start() 之后。就会执行到这里的run,然后执行runWorker(this);



到这里我们知道正在执行任务的地方就在 runWorker. 这里的核心代码就一行task.run();将任务执行起来。


final void runWorker(Worker w) {// 拿到当前线程 Thread wt = Thread.currentThread();// 拿到当前任务 Runnable task = w.firstTask;// 将 Worker.firstTask 置空 并且释放锁 w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// 如果 task 或者 getTask 不为空,则一直循环 while (task != null || (task = getTask()) != null) {// 加锁 w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interrupt// return ctl.get() >= stop// 如果线程池状态>=STOP 或者 (线程中断且线程池状态>=STOP)且当前线程没有中断// 其实就是保证两点:// 1. 线程池没有停止// 2. 保证线程没有中断 if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())// 中断当前线程 wt.interrupt();try {// 空方法 beforeExecute(wt, task);Throwable thrown = null;try {// ?? 执行 run 方法(Runable 对象)task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {// 执行完后, 将 task 置空, 完成任务++, 释放锁 task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {// 退出工作 processWorkerExit(w, completedAbruptly);}


看到while (task != null || (task = getTask()) != null) 这个代码。有两个条件


  • task 不是空

  • getTask()能够获取到任务


重点还是看一下getTask()方法。其实就是从队列中获取任务。


private Runnable getTask() {// 标记是否超时 boolean timedOut = false; // Did the last poll() time out?// 死循环 for (;;) {// 获取线程池状态 int c = ctl.get();int rs = runStateOf(c);


// Check if queue empty only if necessary.// 判读线程池状态是否大于等于 SHUTDOWN// 再线程池状态是否大于等于 STOP 或 判断任务队列是否为空 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {// 递减 workerCountdecrementWorkerCount();// 返回 null, 表示没有更多的任务 return null;}


// 获取 workerCountint wc = workerCountOf(c);


// 判断是否使用超时获取任务// 1. allowCo


《Android学习笔记总结+最新移动架构视频+大厂安卓面试真题+项目实战源码讲义》
浏览器打开:qq.cn.hn/FTe 免费领取
复制代码


reThreadTimeOut : 可以通过 Thread#allowCoreThreadTimeOut 设置// 2. 如果 workerCount 大于 coolPoolSize 也进行超时获取任务 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;


// 如果 workerCount 大于 maximumPoolSize 或者获取任务超时// 并且 workerCount > 1 或者 任务队列为空//// workerCount 大于 maximumPoolSize 基本不可能,所以主要判断是否超时 if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {// 判断递减 workerCount 是否成功// 成功返回 null 表示没有更多任务, 失败继续死循环 if (compareAndDecrementWorkerCount(c))return null;continue;}


try {// 判断是否使用超时从任务队列获取任务// poll 方法在规定时间没有获取任务返回 null// take 方法则是一直阻塞直到获取任务 Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();// 如果获取的任务不为 null, 返回任务 if (r != null)return r;// 标记超时 timedOut = true;} catch (InterruptedException retry) {// 如果出现异常,重置超时标志 timedOut = false;}

用户头像

Android架构

关注

还未添加个人签名 2021.10.31 加入

还未添加个人简介

评论

发布
暂无评论
线程池系列 - (4)工作流程