写点什么

JUC 之 ThreadPoolExecutor 实现原理分析

用户头像
AI乔治
关注
发布于: 2020 年 11 月 04 日
JUC 之ThreadPoolExecutor实现原理分析

ThreadPoolExecutor 工作流程

JDK1.5 中引入了线程池,合理地利用线程池能有效的提高程序的运行效率,但不当的使用线程池也会带来致命的危害。作为使用最多的 ThreadPoolExecutor,很有必要深入理解的其源码与实现原理。

先看一下 ThreadPoolExecutor 是如何工作的,暂时不看源码,这样会先有一个比较直观的印象有利于后面深入分析源码。

既然是线程池那么提交任务后一定要创建线程用于执行任务,ThreadPoolExecutor 创建线程执行提交任务的流程如下。



简单介绍一下,一个任务提交给线程池后,线程池创建线程来执行提交任务的流程。

  1. 当提交任务时线程池中的来用执行任务的线程数小于 corePoolSize(核心线程数),则线程池利用 ThreadFacory(线程工厂)创建线程用于执行提交的任务。否则执行第二 2 步。

  2. 当提交任务时线程池中的来用执行任务的线程数大于 corePoolSize(核心线程数),但 workQueue 没有满,则线程池会将提交的任务先保存在 workQueue(工作队列),等待线程池中的线程执行完其它已提交任务后会循环从 workQueue 中取出任务执行。否则执行第 3 步。

  3. 当提交任务时线程池中的来用执行任务大于 corePoolSize(核心线程数),且 workQueu 已满,但没有超过 maximunPoolSize(最大线程数),则线程池利用 ThreadFacory(线程工厂)创建线程用于执行提交的任务。否则执行 4。

  4. 当提交任务时线程池中的来用执行任务大于 maximunPoolSize,执行线程池中配置的拒绝策略(RejectedExecutionHanlder)。

下图给出了 ThreadPoolExecutor 更加直观的整体运行图。图中标注 1、2、3、4 的分别对应上面分析中的第 1、第 2、第 3、第 4 步。



结合上图补充几点:

  1. 线程池中创建的用于执行提交任务的线程的引用被 Worker 对象持有。Worker 会去执行提交的任务,如果提交的任务已执行完 Worker 会循环地从 workQueue(即图中的 BlockingQueue)中 poll 或 take 任务执行。

  2. 主线程调用 ThreadPoolExecutor 的 prestartCoreThread()或 prestartAllCoreThreads()方法可以在任务还没有提交到线程池前,先创建用于执行提交任务的 Worker,这些 Worker 将等待任务提交。

  3. 线程池饱和时默认地拒绝策略为 AbortPolicy 策略,抛出 RejectedExecutionException 异常,上图中 CallerRunsPolicy 表达的不是默认地拒绝策略,而是 CallerRunsPolicy 策略是会将提交的任务(Task)交给主线程执行。即主线程调用 Task.run()方法。

ThreadPoolExecutor 源码分析



ThreadPoolExecutor 的 UML 类图如上图,其中 Executor 提供最基础的任务执行的抽象 void execute(Runnable command)方法,而 ExecutorService 在其基础上扩展的管理线程池的一些方法 shutdown()、shutdownNow()、isShutdown() 与 isTerminated()等,同时增加了用三个重载的 submit 方法,用于获取任务的执行结果。submit 可以提交 Callable 类型的任务,也可提交 Runnable 类型的任务。AbstractExecutorService 类提供了 newTaskFor 将提交的 Callable 与 Runnable 类型任务转为 FutureTask,同时提供了 sumbit 与 invoke 的默认实现,具体的任务执行逻辑交由子类 ThreadPoolExecutor 的 execute 方法。不管是调用 submit 还是 execute 的提交的任务,最终都交由 ThreadPoolExecutor 的 execute 方法执行。

execute 方法是分析 ThreadPoolExecutor 源码的入口。

分析 execute 方法前先看一下 ThreadPoolExecutor 里面的核心变量与类。

//线程池状态与线程池中有效线程数控制变量,AtomicInteger变量的高3位用于//保存线程池状态,低29位用于保存线程池中有效线程数。 //程线程对应状态如下:// 1、RUNNING: 运行中,接收新的任务或处理队列中的任务 值为-536870912// 2、SHUTDOWN: 关闭,不再接收新的任务,但会处理队列中的任务 值为0// 3、STOP: 停止,不再接收新的任务,也不处理队列中的任务,并中断正在处理的任务 值为536870912// 4、TIDYING:  所有任务已结束,队列大小为0,转变为TIDYING状态的线程将会执行terminated() hook 方法 值为1073741824// 5、TERMINATED: 结束,terminated() 已被执行完 值为1610612736private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
复制代码

与 ctl 变量有关的操作方法

  //获取线程池的运行状态runState  //CAPACITY 二进制值为: 00011111111111111111111111111111   //~CAPACITY 按位取反为:11100000000000000000000000000000   //ctl&~CAPACITY 低29全为0,得到高3位即线程池的runState    private static int runStateOf(int c)     { return c & ~CAPACITY; }    //获取线程池中有效的线程数    private static int workerCountOf(int c)  { return c & CAPACITY; }  //根据runState与workerCount计算出ctl的值    private static int ctlOf(int rs, int wc) { return rs | wc; }    //判断线程池是否处于运行中    private static boolean isRunning(int c) {return c < SHUTDOWN;}
复制代码

其它核心成员变量

//工作队列,提交任务超过corePoolSize时,任务被保证在workQueue中private final BlockingQueue<Runnable> workQueue;//处理wokers的锁private final ReentrantLock mainLock = new ReentrantLock(); //工作作线程集合private final Hash<Worker> workers = new HashSet<Worker>(); //用于支持awaitTermination方法的条件private final Condition termination = mainLock.newCondition(); //曾经创建过的最大工作线程数private int largestPoolSize;//线程池中已完成的总任务数private long completedTaskCount;//线程池创建执行提交任务对应线程时采用的线程工厂private volatile ThreadFactory threadFactory;//线程池饱和时,拒绝策略private volatile RejectedExecutionHandler handler;//allowCoreThreadTimeOut为true时,无任务时情况下核心线程允许存活时间;//线程池中超过核心线程数,那部分工作线程,无任务时情况下核心线程允许存活时间。private volatile long keepAliveTime;//核心工作线程是以超时的方式还是阻塞的方式尝试从workQueue队列里面获取任务,//当以超时的方式获取时,如果在指定时间内还没有获取到任务工作线程run方法将执//行完毕,对应工作线程被GC回收private volatile boolean allowCoreThreadTimeOut;//线程池中核心工作线程数private volatile int corePoolSize;//线程池中最大工作线程数private volatile int maximumPoolSize;// 线程池饱和时,默认拒绝策略 直接抛出异常private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
复制代码

工作线程 Worker 类,Worker 类利用 AQS 框架实现了一个简单的非重入的互斥锁, 实现互斥锁主要目的是为了中断的时候判断线程是在空闲还是运行,可以看后面 shutdown 和 shutdownNow 方法的分析。涉及 AQS 部分暂时不深入分析,后面再写专关于 AQS 的文章。

private final class Worker        extends AbstractQueuedSynchronizer        implements Runnable {       //用于执行提交任务的线程        final Thread thread;        //第一个要执行的任务,可能为null        Runnable firstTask;        //每个工作线程执行的任务数量        volatile long completedTasks;        Worker(Runnable firstTask) {       //阻止中断,直到运行runWorker方法            setState(-1);             this.firstTask = firstTask;       //利用线程工厂创建工作线程,同时让当前Worker.run方法去执行提交的任务            this.thread = getThreadFactory().newThread(this);        }     //工作线程执行任务的入口,具体执行任务代理给runWorker方法        public void run() {            runWorker(this);        }       // The value 0 represents the unlocked state.       // The value 1 represents the locked state.        protected boolean isHeldExclusively() {            return getState() != 0;        }protected boolean tryAcquire(int unused) {            if (compareAndSetState(0, 1)) {                //设置为排它锁                setExclusiveOwnerThread(Thread.currentThread());                //成功获取锁                               return true;            }            //到同步队列中自旋                return false;        }        protected boolean tryRelease(int unused) {            setExclusiveOwnerThread(null);            setState(0);            return true;        }        public void lock()        { acquire(1); }        public boolean tryLock()  { return tryAcquire(1); }        public void unlock()      { release(1); }        public boolean isLocked() { return isHeldExclusively(); }        void interruptIfStarted() {            Thread t;            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {                try {                    t.interrupt();                } catch (SecurityException ignore) {                }               }        }        //省略部分源码}
复制代码

ThreadPoolExecutor 之 execute 方法

execute 方法执行流程可以概括为

如果工作线程数量小于核心线程数量,则创建新的线程处理提交的任务。

如果工作线程数量大于等于核心线程数量,且没有超过最大线程数,则将新提交的任务,加入工作队列中等待执行。

如果工作线程数量大于等于核心线程数量,且工作队列已满,工作线程数量又于小最大线程数量,则创建新的线程处理提交的任务。

如果工作线程数量大于最大线程数量或者线程池是不在运行,执行拒绝策略。

上面流程图其实已说明过了 execute 的大体执行过程。其中,addWorker 方法内部会检测线程池的运行状态,判断任务是否应该被成功提交。

public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        int c = ctl.get();     // 工作线程数量小于核心线程数,调用addWorker方法创建工作线程。        // 提交任务command作为Worder的第一个任务执行。        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                return;            c = ctl.get();        }        //工作线程数量大于核心线程数且,线程池在运行则将任务加到队列        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            //重新检查,如果线程池不在RUNNING,删除上一步加入队列的任务            if (! isRunning(recheck) && remove(command))                reject(command);       // 线程池处于RUNNING状态 || 线程池处于非RUNNING状态但是任务移除失败            else if (workerCountOf(recheck) == 0)       // 线程池处于SHUTDOWN状态下,没有活动线程了,但是队列里还有任务没执行这种特殊情况。            // 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务                      addWorker(null, false);        }        //1、非RUNNING状态拒绝新的任务        //2、队列满了启动新的线程失败,即工作线程数量大于最大线程数量(workCount > maximumPoolSize)        else if (!addWorker(command, false))            reject(command);    }
复制代码

ThreadPoolExecutor 之 addWorker 方法

addWorker 创建了 ThreadPoolExecutor 中用于执行提交任务的线程,这个过程同时把任务与执行任务的线程封装到 Worker 对象中。同时 addWorker 还启动了用于执行任务的线程,而具体任务的执行,则代理给了 ThreadPoolExecutor 的 runWorkers 方法。

private boolean addWorker(Runnable firstTask, boolean core) {        retry:        for (;;) {            int c = ctl.get();            int rs = runStateOf(c);        //线程池状态为非RUNNING &&        //(线程池状态为非SHUTDOWN || firstTask不为null || 队列为空) 三者中的一者        //组合一下分别是        //1、线程池状态为非RUNNING && 线程池状态为非SHUTDOWN,        //即,线程池状态为 (STOP || TIDYING || TERMINATED)         //此时线程池不在接受新的任务,通过addWorker新提交的任务会失败                //2、线程池状态为非RUNNING && firstTask不为null              //即,线程池状态为 (SHUTDOWN || STOP || TIDYING || TERMINATED) && firstTask不为null             //此时线程池不在接受新的任务,但有处理队列里的任务,通过addWorker新提交的任务会失败             //3、线程池状态为非RUNNING && 队列为空             //即,线程池状态为(SHUTDOWN || STOP || TIDYING || TERMINATED)&& 队列为空              //此时线程池不在接受新的任务,因为队列中没有任务要处理,        //所以没必要调用addWorker(null, false),创建新的线程去处理工作队列的任务            if (rs >= SHUTDOWN &&                ! (rs == SHUTDOWN &&                   firstTask == null &&                   ! workQueue.isEmpty()))                       return false;       // 线程池状态为RUNNING 或者 (线程池状态为SHUTDOWN状态,且队列中还有任务需要执行)            for (;;) {                int wc = workerCountOf(c);          //工作线程数过大最大值,或者超过核心线程数或超过最大线程数,都返回false          if (wc >= CAPACITY ||                    wc >= (core ? corePoolSize : maximumPoolSize))                    return false;          //原子方式设置线程池中线程数成功,则跳出重试的循环                          if (compareAndIncrementWorkerCount(c))                    break retry;                c = ctl.get();  // Re-read ctl          // 如果线程池的状态发生变化则重试                if (runStateOf(c) != rs)                    continue retry;                // else CAS failed due to workerCount change; retry inner loop            }        }        boolean workerStarted = false;        boolean workerAdded = false;        Worker w = null;        try {       //创建worker对象,worker对象内部            //利用线程工厂创建一个线程去执行提交的任务       //这个线程的target Runnable为 worker本身,       //最终调用worker.run执行提交的任务            w = new Worker(firstTask);            final Thread t = w.thread;            if (t != null) {                final ReentrantLock mainLock = this.mainLock;                mainLock.lock();                try {                    int rs = runStateOf(ctl.get());            // 重新检测线程池的状态,在获取得锁前一步,线程池可能已被终止            // 线程池状态为RUNNING 或者 (线程池状态为SHUTDOWN状态,且队列中还有任务需要执行)                    if (rs < SHUTDOWN ||                        (rs == SHUTDOWN && firstTask == null)) {                        if (t.isAlive()) // precheck that t is startable                            throw new IllegalThreadStateException();                        workers.add(w);                        int s = workers.size();                        if (s > largestPoolSize)                            largestPoolSize = s;                        workerAdded = true;                    }                } finally {                    mainLock.unlock();                }                //worker添加成功                if (workerAdded) {            //启动worker内部线程,worker内部线程的            //target Runnable为worker本身,                    //将运行worker的run方法,run内部调用ThreadPoolExecutor.runWorkers方法                    t.start();                    workerStarted = true;                }                 }        } finally {        //获取得锁前一步,线程池已被终止导致            //workerAdded失败或线程没start。            if (! workerStarted)          //会调用tryTerminate方法                addWorkerFailed(w);        }        return workerStarted;    }
复制代码

ThreadPoolExecutor 之 runWorkers 方法

runWorkers 方法首先会执行 woker 对象中的 firstTask,当 firstTask 执行完后,会通过 getTask 方法循环地从 workerQueue(工作队列)中获取任务去执行。当 workerQueue 中没有任务,getTask 方法会阻塞挂起。runWorkers 中在任务执行前调用了 beforeExecute 扩展点,在任务执行后调用了 afterExecute 扩展点。最后则调用 processWorkerExit 方法作一下清理工作。

final void runWorker(Worker w) {        Thread wt = Thread.currentThread();        Runnable task = w.firstTask;        w.firstTask = null;       //Worker的构造函数中通过setState(-1)抑制了线程中断,       //这里通过unlock允许中断        w.unlock();         boolean completedAbruptly = true;        try {       //首先执行worker中的firestTask,       //然后循环地从workQueue中拉取任务执行            while (task != null || (task = getTask()) != null) {                w.lock();                //如果线程池处于停止中,          //即线程池处于STOP、TIDYING、TERMINATED状态,          // 要确保线程被中断。如果没有确保不被中断。                if ((runStateAtLeast(ctl.get(), STOP) ||                     (Thread.interrupted() &&                      runStateAtLeast(ctl.get(), STOP))) &&                    !wt.isInterrupted())                    wt.interrupt();                try {            //任务执行前扩展点                    beforeExecute(wt, task);                    Throwable thrown = null;                    try {              //执行任务                        task.run();                    } catch (RuntimeException x) {                        thrown = x; throw x;                    } catch (Error x) {                        thrown = x; throw x;                    } catch (Throwable x) {                        thrown = x; throw new Error(x);                    } finally {              //任务执行后扩展点                        afterExecute(task, thrown);                    }                } finally {                    task = null;                    w.completedTasks++;                    w.unlock();                }            }            completedAbruptly = false;        } finally {       //将worker中workerSet中清除,统计完成任务数       //同时调用tryTerminate方法尝试终止线程池            processWorkerExit(w, completedAbruptly);        }    }
复制代码

ThreadPoolExecutor 之 getTask 方法

getTask 方法主要用于从 workQueue 中取出任务交给 runWorker 方法去执行提交的任务,同时完了线程池中核心线程是否要 allowCoreThreadTimeOut 与线程池中线程数量超过 maximunPoolSize 时 timeOut 处理。核心工作线程是以超时的方式还是阻塞的方式尝试从 workQueue 队列里面获取任务,当以超时的方式获取时,如果在指定时间内还没有获取到任务工作线程 run 方法将执行完毕,对应工作线程被 GC 回收。

分析 execute 方法前先看一下 ThreadPoolExecutor 里面的核心变量与类。

private Runnable getTask() {     //上一次从workQueue.poll方法是否超时        boolean timedOut = false;         for (;;) {            int c = ctl.get();            int rs = runStateOf(c);            // 1、线程池为SHUTDOWN以上状态时且工作队列为空时,            // 此时没有任务,直接返回null            // 2、线程池为STOP以上状态时,       // 此时不用处理工作队列中的任务直接返回            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {                decrementWorkerCount();                return null;            }            int wc = workerCountOf(c);            //工作线程是否要在指的timeout时间内被清理            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;       //1、wc > maximumPoolSize && (wc > 1 || workQueue.isEmpty())           //这个种情况按理不会出现??           //2、(timed && timedOut) && (wc > 1 || workQueue.isEmpty())        //影响超时处理             if ((wc > maximumPoolSize || (timed && timedOut))                && (wc > 1 || workQueue.isEmpty())) {                if (compareAndDecrementWorkerCount(c))                    return null;                continue;         }            try {                //影响timeOut的方式从workQueue中获取任务,          //或者以阻塞的方式从workQueue中获取任务                Runnable r = timed ?                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                    workQueue.take();                if (r != null)                    return r;                timedOut = true;            } catch (InterruptedException retry) {                timedOut = false;            }        }    }
复制代码

ThreadPoolExecutor 之 processWorkerExit 方法

processWorkerExit 方法统计了线程池中总执行的任务数,同时尝试终止线程池。另外还加上当线程池的 runState 为 RUNNING 或 SHUTDOWN 时,由于核心线程数允许超时导致线程池中没有线程处理工作队列中任务的逻辑。即通过 addWorker(null,false)创建一个新的线程来处理工作队列中的任务。

private void processWorkerExit(Worker w, boolean completedAbruptly) {     //runWokder执行异常时,让ctl中有效线程数量减一,      //runWokder正常执行时,getTask方法中workerCount会被减一        if (completedAbruptly)       //ctl中有效线程数量减一            decrementWorkerCount();        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            //统计已完成的任务数            completedTaskCount += w.completedTasks;       //从wordkerSet中去worker            workers.remove(w);        } finally {            mainLock.unlock();        }     //尝试终止线程池        tryTerminate();        int c = ctl.get();     //线程池的runState为RUNNING或SHUTDOWN时        if (runStateLessThan(c, STOP)) {            //runWorker正常执行            if (!completedAbruptly) {          //线程池最小空闲数,允许core thread超时就是0,          //否则就是corePoolSize                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;                //任务队列不为空,则至少要一个线程处理任务队列中的任务                if (min == 0 && ! workQueue.isEmpty())                    min = 1;                //线程池中有线程处理任务中的任务直接返回                if (workerCountOf(c) >= min)                    return; // replacement not needed            }       //线程池中没有线程处理任务队列中的任务,       //创建一个线程处理任务队列中的任务            addWorker(null, false);        }    }
复制代码

ThreadPoolExecutor 之 tryTerminate 方法

tryTerminate 方法会尝试终止线程池,如果线程池还不能终止则直接返回。如果确定可以终止的话,会调用 terminated 扩展点方法,执行线程池终止前想要做的工作。

final void tryTerminate() {        for (;;) {            int c = ctl.get();            // 1、线程池还处于RUNNING状态直接返回            // 2、线程池状态大于TIDYING,线程池已经停止了或在停止            // 3、线程池为SHUTDOWN状态但是任务队列非空直接返回            if (isRunning(c) ||                runStateAtLeast(c, TIDYING) ||                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))                return;       //线程池中还有工作线程,中断工作线程,退出            if (workerCountOf(c) != 0) {                 interruptIdleWorkers(ONLY_ONE);                return;            }            final ReentrantLock mainLock = this.mainLock;            mainLock.lock();            try {          //cas方式设置ctl状态,成功执行terminated方法                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {                    try {                        //线程池终止前执行的扩展点方法                        terminated();                    } finally {                        ctl.set(ctlOf(TERMINATED, 0));                        //通知awaitTermination方法,继续执行                        termination.signalAll();                    }                    return;                }            } finally {                mainLock.unlock();            }        }    }
复制代码

ThreadPoolExecutor 之 shutdown 方法

shutdown 方法会先将线程池的状态设置为 SHUTDOWN,然后向线程池中的所有线程发出中断信号。最后会调用 tryTermiate 方法尝试终止线程。处理 SHUTDOWN 状态的线程池,不接受新的任务,但会执行工作队列中的任务。

public void shutdown() {        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {       //确保调用者用权限关闭线程池            checkShutdownAccess();       //自旋的方法设置线程池的状态为SHUTDOWN            advanceRunState(SHUTDOWN);       // 注意这里是中断所有空闲的线程:runWorker中等待的线程被中断 → 进入processWorkerExit →            // tryTerminate方法中会保证队列中剩余的任务得到执行。            interruptIdleWorkers();       // hook for ScheduledThreadPoolExecutor            onShutdown();         } finally {            mainLock.unlock();        }        //尝试终止线程池        tryTerminate();    }
复制代码

ThreadPoolExecutor 之 shutdownNow 方法

shutdownNow 方法会先将线程池的状态设置为 SHUTDOWN,然后向线程池中的所有线程发出中断信号。最后会调用 tryTermiate 方法尝试终止线程。处于 STOP 状态的线程池,不接受新的任务,同时由于调用了 drainQueue 使得 workQueue 中任务全被删除,workQueue 中的任务不被执行。

public List<Runnable> shutdownNow() {        List<Runnable> tasks;        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {       //确保调用者用权限关闭线程池            checkShutdownAccess();       //自旋的方法设置线程池的状态为SHUTDOWN            advanceRunState(STOP);       //中断所有线程池中所有线程            interruptWorkers();            //获取未执行的任务       tasks = drainQueue();        } finally {            mainLock.unlock();        }        //尝试终止线程池        tryTerminate();        return tasks;    }
复制代码

shutDown 方法与 shutDownNow 方法,最主要的区别在于 shutDown 调用的是

interruptIdleWorkers()方法,而 shutDownNow 调用的是 interruptWorkers()方法。

ThreadPoolExecutor 之 interruptIdleWorkers

interruptIdleWorkers 方法只会中断空闲的线程。这点是通过 w.tryLock 实现的,由于 runWorker 方法中在 worker 在执行任务前会先调用 worker 的 lock 方法。

所以 tryLock 方法成功时,当前的 worker 一定处于空闲状态。

private void interruptIdleWorkers(boolean onlyOne) {    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();    try {        for (Worker w : workers) {            Thread t = w.thread;            //判断worker持有的线程是否已被中断       //且能通过tryLock能获取到锁。       //由于runWorker方法中执行任务时会先lock,       //如果能tryLock说线程不在执行任务,       //保证了中断的肯定是空闲的线程。            if (!t.isInterrupted() && w.tryLock()) {                try {                    t.interrupt();                } catch (SecurityException ignore) {                } finally {                    w.unlock();                }            }            if (onlyOne)                break;        }    }    finally {        mainLock.unlock();    }}ThreadPoolExecutor之interruptWorkers方法  private void interruptWorkers() {        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            for (Worker w : workers)          //调用worker的interruptIfStarted方法中断                w.interruptIfStarted();        } finally {            mainLock.unlock();        }    }Worker之interruptIfStarted方法void interruptIfStarted() {     Thread t;     //state为0表示worker unLock,1表示worker lock,     //不管worker是在runWorker还是idle,全部进行中断     if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {       try {             t.interrupt();           } catch (SecurityException ignore) {           }     } }
复制代码

ThreadPoolExecutor 之 prestartCoreThread 方法

prestartCoreThread 方法首先判断当前线程池中的线程数是否小于核心线程数,如果小于则调用 addWorker 创建一个工作线程。该工作线程等待处理后面将要提交的任务

public boolean prestartCoreThread() {    return workerCountOf(ctl.get()) < corePoolSize &&           addWorker(null, true);}ThreadPoolExecutor之prestartAllCoreThreads方法prestartAllCoreThreads与prestartCoreThread方法类似
public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true)) ++n; return n;}
复制代码

这也是之前 ThreadPoolExecutor 更加直观的整体运行图中 prestartAllCoreThreads 与 prestartCoreThread 指向线程池中核心线程执行有者 Worker 的那部分。

内容有点点多,如果之前没有看过 ThreadPoolExecutor 的源码的话一下子看下来可能比较累,大家可以先收藏后面再根据这个分析的路线一个个看具体方法的分析。本想在这篇文章最后分享一下线程池的一些面试考点与使用时的注意点,但内容实在太多,只能放到下一篇中。


看完三件事❤️

如果你觉得这篇内容对你还蛮有帮助,我想邀请你帮我三个小忙:



  1. 点赞,转发,有你们的 『点赞和评论』,才是我创造的动力。

  2. 关注公众号 『 java 烂猪皮 』,不定期分享原创知识。

  3. 同时可以期待后续文章 ing🚀


作者:叶易

出处:https://club.perfma.com/article/1719588

用户头像

AI乔治

关注

分享后端技术干货。公众号【 Java烂猪皮】 2019.06.30 加入

一名默默无闻的扫地僧!

评论

发布
暂无评论
JUC 之ThreadPoolExecutor实现原理分析