写点什么

史上最全的 Java 并发系列之 Java 中的线程池

作者:自然
  • 2022 年 8 月 13 日
    广东
  • 本文字数:10127 字

    阅读完需:约 33 分钟

前言

文本已收录至我的 GitHub 仓库,欢迎 Star:https://github.com/bin392328206/six-finger

种一棵树最好的时间是十年前,其次是现在

我知道很多人不玩 qq 了,但是怀旧一下,欢迎加入六脉神剑 Java 菜鸟学习群,群聊号码:549684836 鼓励大家在技术的路上写博客

前言

Java 中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来 3 个好处。


  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。

  • 提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统- 一分配、调优和监控。但是,要做到合理利用线程池,必须对其实现原理了如指掌。

线程池的实现原理

当向线程池提交一个任务之后,线程池是如何处理这个任务的呢?本文来看一下线程池的主要处理流程,处理流程图下图所示。



从图中可以看出,当提交一个新任务到线程池时,线程池的处理流程如下。


  1. 线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。

  2. 线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。

  3. 线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。


ThreadPoolExecutor 执行 execute() 方法的示意图如下:



ThreadPoolExecutor 执行 execute 方法分下面 4 种情况:


  • 如果当前运行的线程少于 corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。上图 1

  • 如果运行的线程等于或多于 corePoolSize,则将任务加入 BlockingQueue。上图 2

  • 如果无法将任务加入 BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。上图 3

  • 如果创建新线程将使当前运行的线程超出 maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution()方法。上图 4


ThreadPoolExecutor 采取上述步骤的总体设计思路,是为了在执行 execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在 ThreadPoolExecutor 完成预热之后(当前运行的线程数大于等于 corePoolSize),几乎所有的 execute()方法调用都是执行 上图 2 ,而 上图 2 不需要获取全局锁。

源码分析

上面的流程分析让我们很直观地了解了线程池的工作原理,让我们再通过源代码来看看是如何实现的,线程池执行任务的方法如下:


public void execute(Runnable command) {    if (command == null)        throw new NullPointerException();
int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // 如果线程数小于基本线程数,则创建线程并执行当前任务 if (addWorker(command, true)) return; c = ctl.get(); }
// 如线程数大于等于基本线程数或线程创建失败,则将当前任务放到工作队列中。 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) // 如果线程池不处于运行中或任务无法放入队列, //并且当前线程数量小于最大允许的线程数量,则创建一个线程执行任务.
// 抛出RejectedExecutionException异常 reject(command);}
复制代码


工作线程:线程池创建线程时,会将线程封装成工作线程 Worker,Worker 在执行完任务后,还会循环获取工作队列里的任务来执行。我们可以从 Worker 类的 run()方法里看到这点


public void run() {    try {        Runnable task = firstTask;        firstTask = null;        while (task != null || (task = getTask()) != null) {            runTask(task);            task = null;        }    } finally {        workerDone(this);    }}
复制代码


说明线程池中的线程都是运行状态


ThreadPoolExecutor 中线程执行任务的示意图如下:



线程池中的线程执行任务分两种情况:


  • 在 execute()方法中创建一个线程时,会让这个线程执行当前任务。

  • 这个线程执行完上图中 1 的任务后,会反复从 BlockingQueue 获取任务来执行。

线程池的源码(ThreadPoolExecutor)

我就讲讲 Java 的 ThreadPoolExecutor 吧,Spring 的 ThreadPoolTaskExecutor 底层也是 Java 的 ThreadPoolExecutor

继承结构


  • Executor 接口只有一个方法 execute,传入线程任务参数

  • ExecutorService 接口继承 Executor 接口,并增加了 submit、shutdown、invokeAll 等等一系列方法。

  • AbstractExecutorService 抽象类实现 ExecutorService 接口,并且提供了一些方法的默认实现,例如 submit 方法、invokeAny 方法、invokeAll 方法。像 execute 方法、线程池的关闭方法(shutdown、shutdownNow 等等)就没有提供默认的实现。

  • ThreadPoolExecutor 一个最下面的底层实现,我们具体就来看看它

基本属性

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));    private static final int COUNT_BITS = Integer.SIZE - 3;    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;     // runState is stored in the high-order bits    private static final int RUNNING    = -1 << COUNT_BITS;    private static final int SHUTDOWN   =  0 << COUNT_BITS;    private static final int STOP       =  1 << COUNT_BITS;    private static final int TIDYING    =  2 << COUNT_BITS;    private static final int TERMINATED =  3 << COUNT_BITS;     // Packing and unpacking ctl    private static int runStateOf(int c)     { return c & ~CAPACITY; }   // 获取线程池运行状态    private static int workerCountOf(int c)  { return c & CAPACITY; }    // 获取线程数量    private static int ctlOf(int rs, int wc) { return rs | wc; }    
复制代码


ctl 是线程池的控制状态,是 AtomicInteger 类型的,里面包含两部分,workcount---线程的数量,runState---线程池的运行状态。这里限制了最大线程数是 2^29-1,大约 500 百万个线程,这也是个问题,所以 ctl 也可以变成 AtomicLong 类型的。


线程池的五种状态:


  • RUNNING - 接受新任务并且继续处理阻塞队列中的任务

  • SHUTDOWN - 不接受新任务但是会继续处理阻塞队列中的任务

  • STOP -  不接受新任务,不在执行阻塞队列中的任务,中断正在执行的任务

  • TIDYING - 所有任务都已经完成,线程数都被回收,线程会转到 TIDYING 状态会继续执行钩子方法

  • TERMINATED - 钩子方法执行完毕


线程之间的转换:


  • RUNNING -> SHUTDOWN

  • 显式调用 shutdown()方法, 或者隐式调用了 finalize()方法

  • (RUNNING or SHUTDOWN) -> STOP

  • 显式调用 shutdownNow()方法

  • SHUTDOWN -> TIDYING

  • 当线程池和任务队列都为空的时候

  • STOP -> TIDYING

  • 当线程池为空的时候

  • TIDYING -> TERMINATED

  • 当 terminated() hook 方法执行完成时候

构造函数


4 个构造函数,最后都是调用一个方法


public ThreadPoolExecutor(int corePoolSize,                          int maximumPoolSize,                          long keepAliveTime,                          TimeUnit unit,                          BlockingQueue<Runnable> workQueue,                          ThreadFactory threadFactory,                          RejectedExecutionHandler handler) {}
复制代码


参数介绍



  • corePoolSize:核心线程数,即最小的 keep alive 线程数,如果 allowCoreThreadTimeOut 设置为 true,则该参数无效,即为 0;

  • maximumPoolSize:最大线程数,即定义了线程池的最大线程数(实际最大值不能超过 CAPACITY);

  • keepAliveTime:空闲时间,即线程的最大空闲时间,默认情况是当线程池中的线程数超过 corePoolSize 时,线程的最大空闲时间,及线程数小于 corePoolSize 时不生效,除非 allowCoreThreadTimeOut 设置为 true;

  • workQueue:任务队列,为阻塞队列,保存需要执行的任务。

  • threadFactory: 创建线程的工厂类。

  • handler: 当 queue 满了和线程数达到最大限制,对于继续到达的任务采取的策略。默认采取 AbortPolicy , 也就是拒绝策略。

拒绝策略

  1. AbortPolicy



/** * 默认的拒绝策略,当继续有任务到来时直接抛出异常 */ public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
复制代码


  1. DiscardPolicy:rejectedexecution 是个空方法,意味着直接抛弃该任务,不处理。


 public static class DiscardPolicy implements RejectedExecutionHandler {               public DiscardPolicy() { }         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {        }
复制代码


  1. DiscardOldestPolicy:抛弃 queue 中的第一个任务,再次执行该任务。


public static class DiscardOldestPolicy implements RejectedExecutionHandler {                public DiscardOldestPolicy() { }         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {            if (!e.isShutdown()) {                e.getQueue().poll();                e.execute(r);            }        }    }
复制代码


  1. CallerRunsPolicy: 直接由执行该方法的线程继续执行该任务,除非调用了 shutdown 方法,这个任务才会被丢弃,否则继续执行该任务会发生阻塞。


 public static class CallerRunsPolicy implements RejectedExecutionHandler {               public CallerRunsPolicy() { }         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {            if (!e.isShutdown()) {                r.run();            }        }    }
复制代码

workQueue

用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列:


  • rrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。

  • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按 FIFO 排序元素,吞吐量通常要高于 ArrayBlockingQueue。静态工厂- 方法 Executors.newFixedThreadPool()使用了这个队列。

  • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态- ,吞吐量通常要高于 LinkedBlockingQueue,静态工厂方法 Executors.newCachedThreadPool 使用了这个队列。

  • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

execute 方法

 public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        /*         *  3步操作         *         * 1. 如果当前运行的线程数<核心线程数,创建一个新的线程执行任务,调用addWorker方法原子性地检查         *    运行状态和线程数,通过返回false防止不需要的时候添加线程         * 2. 如果一个任务能够成功的入队,仍然需要双重检查,因为我们添加了一个线程(有可能这个线程在上次检查后就已经死亡了)         *    或者进入此方法的时候调用了shutdown,所以需要重新检查线程池的状态,如果必要的话,当停止的时候要回滚入队操作,         *    或者当线程池为空的话创建一个新的线程         * 3. 如果不能入队,尝试着开启一个新的线程,如果开启失败,说明线程池已经是shutdown状态或饱和了,所以拒绝执行该任务         */        int c = ctl.get();        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                return;            c = ctl.get();        }        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))                reject(command);            else if (workerCountOf(recheck) == 0)                addWorker(null, false);        }        else if (!addWorker(command, false))            reject(command);    }
复制代码


  1. 检查当前线程池中的线程数是否<核心线程数,如果小于核心线程数,就调用 addWorker 方法创建一个新的线程执行任务,addworker 中的第二个参数传入 true,表示当前创建的是核心线程。如果当前线程数>=核心线程数或者创建线程失败的话,直接进入第二种情况。

  2. 通过调用 isRunning 方法判断线程池是否还在运行,如果线程池状态不是 running,那就直接退出 execute 方法,没有执行的必要了;如果线程池的状态是 running,尝试着把任务加入到 queue 中,再次检查线程池的状态, 如果当前不是 running,可能在入队后调用了 shutdown 方法,所以要在 queue 中移除该任务,默认采用拒绝策略直接抛出异常。如果当前线程数为 0,可能把 allowCoreThreadTimeOut 设为了 true,正好核心线程全部被回收,所以必须要创建一个空的线程,让它自己去 queue 中去取任务执行。

  3. 如果当前线程数>核心线程数,并且入队失败,调用 addWorker 方法创建一个新的线程去执行任务,第二个参数是 false,表示当前创建的线程不是核心线程。这种情况表示核心线程已满并且 queue 已满,如果当前线程数小于最大线程数,创建线程执行任务。如果当前线程数>=最大线程数,默认直接采取拒绝策略。

addWorker 方法

看一下 addWorker 是怎么具体执行的。代码有点长,慢慢看。


 private boolean addWorker(Runnable firstTask, boolean core) {        retry:        for (;;) {            int c = ctl.get();            int rs = runStateOf(c);             // Check if queue empty only if necessary.            if (rs >= SHUTDOWN &&                ! (rs == SHUTDOWN &&                   firstTask == null &&                   ! workQueue.isEmpty()))                return false;             for (;;) {                int wc = workerCountOf(c);                if (wc >= CAPACITY ||                    wc >= (core ? corePoolSize : maximumPoolSize))                    return false;                if (compareAndIncrementWorkerCount(c))                    break retry;                c = ctl.get();  // Re-read ctl                if (runStateOf(c) != rs)                    continue retry;                // else CAS failed due to workerCount change; retry inner loop            }        }         boolean workerStarted = false;        boolean workerAdded = false;        Worker w = null;        try {            w = new Worker(firstTask);            final Thread t = w.thread;            if (t != null) {                final ReentrantLock mainLock = this.mainLock;                mainLock.lock();                try {                    // Recheck while holding lock.                    // Back out on ThreadFactory failure or if                    // shut down before lock acquired.                    int rs = runStateOf(ctl.get());                     if (rs < SHUTDOWN ||                        (rs == SHUTDOWN && firstTask == null)) {                        if (t.isAlive()) // precheck that t is startable                            throw new IllegalThreadStateException();                        workers.add(w);                        int s = workers.size();                        if (s > largestPoolSize)                            largestPoolSize = s;                        workerAdded = true;                    }                } finally {                    mainLock.unlock();                }                if (workerAdded) {                    t.start();                    workerStarted = true;                }            }        } finally {            if (! workerStarted)                addWorkerFailed(w);        }        return workerStarted;    }
复制代码


首先判断线程池的 runstate,如果 runstate 为 shutdown,那么并不能满足第二个条件,runstate != shutdown,所以这是针对 runstate 不是 running,shutdown 的情况,当 runstate>shutdown 时,队列为空,此时仍然有任务的话,直接返回 false,线程池已关闭,并不能在继续执行任务了。


第二个自旋操作就的目的就是对线程数量自增,由于涉及到高并发,所以采用了 cas 来控制,判断线程的 workcount>=CAPACITY,那么直接返回 false,或者通过判断是否核心线程,如果是 true,判断 workcount>=核心线程数,如果是 false,判断 workcount>=最大线程数,直接返回 false。如果不满足上个条件,直接使用 cas 把线程数自增,退出自旋操作。


worker 对象。


  Worker(Runnable firstTask) {            setState(-1); // inhibit interrupts until runWorker            this.firstTask = firstTask;            this.thread = getThreadFactory().newThread(this);        }
复制代码


在当前线程不为空的情况下,加一把可重入锁 reentrantlock,在加锁后,再次检查线程池的状态 runstate,防止在获取到锁之前线程池已经关闭了,线程池的状态为 running 或者状态为 shutdown 并且任务为空的情况下,才能继续往下执行任务,这是充分必要条件。如果当前线程已经开启了,直接抛出异常,这是绝不允许的。


private final HashSet<Worker> workers = new HashSet<Worker>();

addWorkerFailed 方法

如果添加 worker 失败或者开启线程失败就要调用 addWorkerFailed 方法移除失败的 worker。


   private void addWorkerFailed(Worker w) {        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            if (w != null)                workers.remove(w);            decrementWorkerCount();            tryTerminate();        } finally {            mainLock.unlock();        }    }
复制代码


首先还是获取全局锁 mainlock,接着对 workers 集合中移除 worker,workers 的数量自减。

runWorker 方法

这个方法是运行 task 的方法


 final void runWorker(Worker w) {        Thread wt = Thread.currentThread();        Runnable task = w.firstTask;        w.firstTask = null;        w.unlock(); // allow interrupts        boolean completedAbruptly = true;        try {            while (task != null || (task = getTask()) != null) {                w.lock();                                if ((runStateAtLeast(ctl.get(), STOP) ||                     (Thread.interrupted() &&                      runStateAtLeast(ctl.get(), STOP))) &&                    !wt.isInterrupted())                    wt.interrupt();                try {                    beforeExecute(wt, task);                    Throwable thrown = null;                    try {                        task.run();                    } catch (RuntimeException x) {                        thrown = x; throw x;                    } catch (Error x) {                        thrown = x; throw x;                    } catch (Throwable x) {                        thrown = x; throw new Error(x);                    } finally {                        afterExecute(task, thrown);                    }                } finally {                    task = null;                    w.completedTasks++;                    w.unlock();                }            }            completedAbruptly = false;        } finally {            processWorkerExit(w, completedAbruptly);        }    }
复制代码


首先还是获取当前线程,获取当前 worker 对象中的任务 task,把当前线程的状态由-1 设为 0,表示可以获取锁执行任务,接下来就是一个 while 循环,当 task 不为空或者从 gettask 方法取出的任务不为空的时候,加锁,底层还是使用了 AQS,保证了只有一个线程执行完毕其他线程才能执行。在执行任务之前,必须进行判断,线程池的状态如果>=STOP,必须中断当前线程,如果是 running 或者 shutdown,当前线程不能被中断,防止线程池调用了 shutdownnow 方法必须中断所有的线程。


在处理任务之前,会执行 beforeExecute 方法, 在处理任务之后,执行 afterExecute 方法,这两个都是钩子方法,继承了 ThreadPoolExecutor 可以重写此方法,嵌入自定义的逻辑。一旦在任务运行的过程中,出现异常会直接抛出,所以在实际的业务中,应该使用 try..catch,把这些日常加入到日志中。


任务执行完,就把 task 设为空,累加当前线程完成的任务数,unlock,继续从 queue 中取任务执行。

getTask 方法。

  private Runnable getTask() {        boolean timedOut = false; // Did the last poll() time out?
for (;;) { int c = ctl.get(); int rs = runStateOf(c);
// Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; }
int wc = workerCountOf(c);
// Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; }
try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
复制代码


这个有点复杂,其实目的就是获得一个要运行的任务,不过人家的判断还是比较多的

shutdown 方法

public void shutdown() {        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            checkShutdownAccess();            advanceRunState(SHUTDOWN);            interruptIdleWorkers();            onShutdown(); // hook for ScheduledThreadPoolExecutor        } finally {            mainLock.unlock();        }        tryTerminate();    }
复制代码


调用了 shutdown,意味着不能在继续往 queue 中添加任务,也不能在接受新的任务。利用 cas 把当前线程池的状态设为 shutdown,中断所有的空闲线程,onShutdown 是一个钩子方法,是专门给 ScheduledThreadPoolExecutor 来实现的,再次调用 tryTerminate 方法来尝试中止线程池,直到 queue 中的任务全部处理完毕才能正常关闭。



提一下,默认的 存任务的队列是 BlockingQueue

结尾

因为很多东西,全是从书上拷贝的,很枯燥,但同时看书,又是最详细的学习方法之一了,大家跟着书看博客,或许会好点吧.最后一节不写了,就是工厂方法里面的几种线程池,我们一般也是自定义线程池来操作的。


因为博主也是一个开发萌新 我也是一边学一边写 我有个目标就是一周 二到三篇 希望能坚持个一年吧 希望各位大佬多提意见,让我多学习,一起进步。

日常求赞

好了各位,以上就是这篇文章的全部内容了,能看到这里的人呀,都是真粉


创作不易,各位的支持和认可,就是我创作的最大动力,我们下篇文章见


六脉神剑 | 文 【原创】如果本篇博客有任何错误,请批评指教,不胜感激 !

发布于: 刚刚阅读数: 3
用户头像

自然

关注

还未添加个人签名 2020.03.01 加入

小六六,目前负责营收超百亿的支付中台

评论

发布
暂无评论
史上最全的Java并发系列之Java中的线程池_线程池_自然_InfoQ写作社区