写点什么

「Java 并发编程」从源码分析几道必问线程池的面试题?

发布于: 2020 年 11 月 10 日

这篇文章我们就来分析下上篇文章的几个小问题

  • 线程池是否区分核心线程和非核心线程?

  • 如何保证核心线程不被销毁?

  • 线程池的线程是如何做到复用的?我们先看最后一个问题一般一个线程执行完任务之后就结束了,Thread.start()只能调用一次,一旦这个调用结束,则该线程就到了 stop 状态,不能再次调用 start。如果你对一个已经启动的线程对象再调用一次 start 方法的话,会产生:IllegalThreadStateException 异常,但是 Thread 的 run 方法是可以重复调用的。所以这里也会有一个面试经常问到的问题:「Thread 类中 run()和 start()方法的有什么区别?」下面我们就从 jdk 的源码来一起看看如何实现线程复用的:线程池执行任务的 ThreadPoolExecutor#execute 方法为入口

 public void execute(Runnable command) {     if (command == null)         throw new NullPointerException();        int c = ctl.get();     // 线程池当前线程数小于 corePoolSize 时进入if条件调用 addWorker 创建核心线程来执行任务     if (workerCountOf(c) < corePoolSize) {         if (addWorker(command, true))             return;         c = ctl.get();     }     // 线程池当前线程数大于或等于 corePoolSize ,就将任务添加到 workQueue 中     if (isRunning(c) && workQueue.offer(command)) {      // 获取到当前线程的状态,赋值给 recheck ,是为了重新检查状态         int recheck = ctl.get();         // 如果 isRunning 返回 false ,那就 remove 掉这个任务,然后执行拒绝策略,也就是回滚重新排队         if (! isRunning(recheck) && remove(command))             reject(command);         // 线程池处于 running 状态,但是没有线程,那就创建线程执行任务         else if (workerCountOf(recheck) == 0)             addWorker(null, false);     }     // 如果任务放入 workQueue 失败,则尝试通过创建非核心线程来执行任务     // 创建非核心线程失败,则说明线程池已经关闭或者已经饱和,会执行拒绝策略     else if (!addWorker(command, false))         reject(command); }
复制代码

「excute」方法主要业务逻辑

  • 如果当前的线程池运行线程小于「coreSize」,则创建新线程来执行任务。

  • 如果当前运行的线程等于「coreSize」或多余「coreSize」(动态修改了 coreSize 才会出现这种情况),把任务放到阻塞队列中。

  • 如果队列已满无法将新加入的任务放进去的话,则需要创建新的线程来执行任务。

  • 如果新创建线程已经达到了最大线程数,任务将会被拒绝。

addWorker 方法

上述方法的核心主要就是 addWorker 方法,

private boolean addWorker(Runnable firstTask, boolean core) {       // 前面还有一部分就省略了。。。。
        boolean workerStarted = false;        boolean workerAdded = false;        Worker w = null;        try {            w = new Worker(firstTask);            final Thread t = w.thread;            if (t != null) {                final ReentrantLock mainLock = this.mainLock;                mainLock.lock();                try {                    // Recheck while holding lock.                    // Back out on ThreadFactory failure or if                    // shut down before lock acquired.                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||                        (rs == SHUTDOWN && firstTask == null)) {                        if (t.isAlive()) // precheck that t is startable                            throw new IllegalThreadStateException();                        workers.add(w);                        int s = workers.size();                        if (s > largestPoolSize)                            largestPoolSize = s;                        workerAdded = true;                    }                } finally {                    mainLock.unlock();                }                if (workerAdded) {                    t.start();                    workerStarted = true;                }            }        } finally {            if (! workerStarted)                addWorkerFailed(w);        }        return workerStarted;    }
复制代码

这个方法我们先看看这个「work」类吧

private final class Worker        extends AbstractQueuedSynchronizer        implements Runnable    {             Worker(Runnable firstTask) {            setState(-1); // inhibit interrupts until runWorker            this.firstTask = firstTask;            this.thread = getThreadFactory().newThread(this);        }                public void run() {            runWorker(this);        }
复制代码

「work」类实现了「Runnable」接口,然后 run 方法里面调用了「runWorker」方法

final void runWorker(Worker w) {        Thread wt = Thread.currentThread();        // 新增创建        Runnable task = w.firstTask;        w.firstTask = null;        w.unlock(); // allow interrupts        boolean completedAbruptly = true;        try {             // 判断 task 是否为空,如果不为空直接执行         // 如果 task 为空,调用 getTask() 方法,从 workQueue 中取出新的 task 执行            while (task != null || (task = getTask()) != null) {                w.lock();                if ((runStateAtLeast(ctl.get(), STOP) ||                     (Thread.interrupted() &&                      runStateAtLeast(ctl.get(), STOP))) &&                    !wt.isInterrupted())                    wt.interrupt();                try {                    beforeExecute(wt, task);                    Throwable thrown = null;                    try {                        task.run();                    } catch (RuntimeException x) {                        thrown = x; throw x;                    } catch (Error x) {                        thrown = x; throw x;                    } catch (Throwable x) {                        thrown = x; throw new Error(x);                    } finally {                        afterExecute(task, thrown);                    }                } finally {                    task = null;                    w.completedTasks++;                    w.unlock();                }            }            completedAbruptly = false;        } finally {            processWorkerExit(w, completedAbruptly);        }    }
复制代码

这个 runwork 方法中会优先取 worker 绑定的任务,如果创建这个 worker 的时候没有给 worker 绑定任务,worker 就会从队列里面获取任务来执行,执行完之后 worker 并不会销毁,而是通过 while 循环不停的执行 getTask 方法从阻塞队列中获取任务调用 task.run()来执行任务,这样的话就达到了线程复用的目的。while (task != null || (task = getTask()) != null) 这个循环条件只要 getTask 返回获取的值不为空这个循环就不会终止, 这样线程也就会一直在运行。「那么任务执行完怎么保证核心线程不销毁?非核心线程销毁?」答案就在这个 getTask()方法里面

private Runnable getTask() {  // 超时标记,默认为false,如果调用workQueue.poll()方法超时了,会标记为true  // 这个标记非常之重要,下面会说到  boolean timedOut = false;  for (;;) {    // 获取ctl变量值    int c = ctl.get();    int rs = runStateOf(c);
    // 如果当前状态大于等于SHUTDOWN,并且workQueue中的任务为空或者状态大于等于STOP    // 则操作AQS减少工作线程数量,并且返回null,线程被回收    // 也说明假设状态为SHUTDOWN的情况下,如果workQueue不为空,那么线程池还是可以继续执行剩下的任务    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {      // 操作AQS将线程池中的线程数量减一      decrementWorkerCount();      return null;    }
    // 获取线程池中的有效线程数量    int wc = workerCountOf(c);
    // 如果主动开启allowCoreThreadTimeOut,或者获取当前工作线程大于corePoolSize,那么该线程是可以被超时回收的    // allowCoreThreadTimeOut默认为false,即默认不允许核心线程超时回收    // 这里也说明了在核心线程以外的线程都为“临时”线程,随时会被线程池回收    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;        // 这里说明了两点销毁线程的条件:    // 1.原则上线程池数量不可能大于maximumPoolSize,但可能会出现并发时操作了setMaximumPoolSize方法,如果此时将最大线程数量调少了,很可能会出现当前工作线程大于最大线程的情况,这时就需要线程超时回收,以维持线程池最大线程小于maximumPoolSize,    // 2.timed && timedOut 如果为true,表示当前操作需要进行超时控制,这里的timedOut为true,说明该线程已经从workQueue.poll()方法超时了    // 以上两点满足其一,都可以触发线程超时回收    if ((wc > maximumPoolSize || (timed && timedOut))        && (wc > 1 || workQueue.isEmpty())) {      // 尝试用AQS将线程池线程数量减一      if (compareAndDecrementWorkerCount(c))        // 减一成功后返回null,线程被回收        return null;      // 否则循环重试      continue;    }
    try {      // 如果timed为true,阻塞超时获取任务,否则阻塞获取任务      Runnable r = timed ?        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :      workQueue.take();      if (r != null)        return r;      // 如果poll超时获取任务超时了, 将timeOut设置为true      // 继续循环执行,如果碰巧开发者开启了allowCoreThreadTimeOut,那么该线程就满足超时回收了      timedOut = true;    } catch (InterruptedException retry) {      timedOut = false;    }  }}
复制代码

所以保证线程不被销毁的关键代码就是这一句代码

   Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
复制代码

只要 timed 为 false 这个 workQueue.take()就会一直阻塞,也就保证了线程不会被销毁。timed 的值又是通过 allowCoreThreadTimeOut 和正在运行的线程数量是否大于 coreSize 控制的。

  • 只要 getTask 方法返回 null 我们的线程就会被回收(runWorker 方法会调用 processWorkerExit)

  • 这个方法的源码也就解释了为什么我们在创建线程池的时候设置了 allowCoreThreadTimeOut =true 的话,核心线程也会进行销毁。

  • 通过这个方法我也们可以回答上面那个问题线程池是不区分核心线程和非核心线程的。

结束

  • 由于自己才疏学浅,难免会有纰漏,假如你发现了错误的地方,还望留言给我指出来,我会对其加以修正。

  • 如果你觉得文章还不错,你的转发、分享、赞赏、点赞、留言就是对我最大的鼓励。

  • 感谢您的阅读,十分欢迎并感谢您的关注。

  • 巨人的肩膀摘苹果

看完三件事❤️

========

如果你觉得这篇内容对你还蛮有帮助,我想邀请你帮我三个小忙:

点赞,转发,有你们的 『点赞和评论』,才是我创造的动力。

关注公众号 『 Java 斗帝 』,不定期分享原创知识。

同时可以期待后续文章 ing🚀



用户头像

还未添加个人签名 2020.09.07 加入

还未添加个人简介

评论

发布
暂无评论
「Java并发编程」从源码分析几道必问线程池的面试题?