前言
文本已收录至我的 GitHub 仓库,欢迎 Star:https://github.com/bin392328206/six-finger
种一棵树最好的时间是十年前,其次是现在
我知道很多人不玩 qq 了,但是怀旧一下,欢迎加入六脉神剑 Java 菜鸟学习群,群聊号码:549684836 鼓励大家在技术的路上写博客
前言
Java 中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来 3 个好处。
降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统- 一分配、调优和监控。但是,要做到合理利用线程池,必须对其实现原理了如指掌。
线程池的实现原理
当向线程池提交一个任务之后,线程池是如何处理这个任务的呢?本文来看一下线程池的主要处理流程,处理流程图下图所示。
从图中可以看出,当提交一个新任务到线程池时,线程池的处理流程如下。
线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。
线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。
线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。
ThreadPoolExecutor 执行 execute() 方法的示意图如下:
ThreadPoolExecutor 执行 execute 方法分下面 4 种情况:
如果当前运行的线程少于 corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。上图 1
如果运行的线程等于或多于 corePoolSize,则将任务加入 BlockingQueue。上图 2
如果无法将任务加入 BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。上图 3
如果创建新线程将使当前运行的线程超出 maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution()方法。上图 4
ThreadPoolExecutor 采取上述步骤的总体设计思路,是为了在执行 execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在 ThreadPoolExecutor 完成预热之后(当前运行的线程数大于等于 corePoolSize),几乎所有的 execute()方法调用都是执行 上图 2 ,而 上图 2 不需要获取全局锁。
源码分析
上面的流程分析让我们很直观地了解了线程池的工作原理,让我们再通过源代码来看看是如何实现的,线程池执行任务的方法如下:
public void execute(Runnable command) { if (command == null) throw new NullPointerException();
int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // 如果线程数小于基本线程数,则创建线程并执行当前任务 if (addWorker(command, true)) return; c = ctl.get(); }
// 如线程数大于等于基本线程数或线程创建失败,则将当前任务放到工作队列中。 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) // 如果线程池不处于运行中或任务无法放入队列, //并且当前线程数量小于最大允许的线程数量,则创建一个线程执行任务.
// 抛出RejectedExecutionException异常 reject(command);}
复制代码
工作线程:线程池创建线程时,会将线程封装成工作线程 Worker,Worker 在执行完任务后,还会循环获取工作队列里的任务来执行。我们可以从 Worker 类的 run()方法里看到这点
public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); }}
复制代码
说明线程池中的线程都是运行状态
ThreadPoolExecutor 中线程执行任务的示意图如下:
线程池中的线程执行任务分两种情况:
线程池的源码(ThreadPoolExecutor)
我就讲讲 Java 的 ThreadPoolExecutor 吧,Spring 的 ThreadPoolTaskExecutor 底层也是 Java 的 ThreadPoolExecutor
继承结构
Executor 接口只有一个方法 execute,传入线程任务参数
ExecutorService 接口继承 Executor 接口,并增加了 submit、shutdown、invokeAll 等等一系列方法。
AbstractExecutorService 抽象类实现 ExecutorService 接口,并且提供了一些方法的默认实现,例如 submit 方法、invokeAny 方法、invokeAll 方法。像 execute 方法、线程池的关闭方法(shutdown、shutdownNow 等等)就没有提供默认的实现。
ThreadPoolExecutor 一个最下面的底层实现,我们具体就来看看它
基本属性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } // 获取线程池运行状态 private static int workerCountOf(int c) { return c & CAPACITY; } // 获取线程数量 private static int ctlOf(int rs, int wc) { return rs | wc; }
复制代码
ctl 是线程池的控制状态,是 AtomicInteger 类型的,里面包含两部分,workcount---线程的数量,runState---线程池的运行状态。这里限制了最大线程数是 2^29-1,大约 500 百万个线程,这也是个问题,所以 ctl 也可以变成 AtomicLong 类型的。
线程池的五种状态:
RUNNING - 接受新任务并且继续处理阻塞队列中的任务
SHUTDOWN - 不接受新任务但是会继续处理阻塞队列中的任务
STOP - 不接受新任务,不在执行阻塞队列中的任务,中断正在执行的任务
TIDYING - 所有任务都已经完成,线程数都被回收,线程会转到 TIDYING 状态会继续执行钩子方法
TERMINATED - 钩子方法执行完毕
线程之间的转换:
RUNNING -> SHUTDOWN
显式调用 shutdown()方法, 或者隐式调用了 finalize()方法
(RUNNING or SHUTDOWN) -> STOP
显式调用 shutdownNow()方法
SHUTDOWN -> TIDYING
当线程池和任务队列都为空的时候
STOP -> TIDYING
当线程池为空的时候
TIDYING -> TERMINATED
当 terminated() hook 方法执行完成时候
构造函数
4 个构造函数,最后都是调用一个方法
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {}
复制代码
参数介绍
corePoolSize:核心线程数,即最小的 keep alive 线程数,如果 allowCoreThreadTimeOut 设置为 true,则该参数无效,即为 0;
maximumPoolSize:最大线程数,即定义了线程池的最大线程数(实际最大值不能超过 CAPACITY);
keepAliveTime:空闲时间,即线程的最大空闲时间,默认情况是当线程池中的线程数超过 corePoolSize 时,线程的最大空闲时间,及线程数小于 corePoolSize 时不生效,除非 allowCoreThreadTimeOut 设置为 true;
workQueue:任务队列,为阻塞队列,保存需要执行的任务。
threadFactory: 创建线程的工厂类。
handler: 当 queue 满了和线程数达到最大限制,对于继续到达的任务采取的策略。默认采取 AbortPolicy , 也就是拒绝策略。
拒绝策略
AbortPolicy
/** * 默认的拒绝策略,当继续有任务到来时直接抛出异常 */ public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
复制代码
DiscardPolicy:rejectedexecution 是个空方法,意味着直接抛弃该任务,不处理。
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }
复制代码
DiscardOldestPolicy:抛弃 queue 中的第一个任务,再次执行该任务。
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
复制代码
CallerRunsPolicy: 直接由执行该方法的线程继续执行该任务,除非调用了 shutdown 方法,这个任务才会被丢弃,否则继续执行该任务会发生阻塞。
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
复制代码
workQueue
用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列:
rrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按 FIFO 排序元素,吞吐量通常要高于 ArrayBlockingQueue。静态工厂- 方法 Executors.newFixedThreadPool()使用了这个队列。
SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态- ,吞吐量通常要高于 LinkedBlockingQueue,静态工厂方法 Executors.newCachedThreadPool 使用了这个队列。
PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
execute 方法
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * 3步操作 * * 1. 如果当前运行的线程数<核心线程数,创建一个新的线程执行任务,调用addWorker方法原子性地检查 * 运行状态和线程数,通过返回false防止不需要的时候添加线程 * 2. 如果一个任务能够成功的入队,仍然需要双重检查,因为我们添加了一个线程(有可能这个线程在上次检查后就已经死亡了) * 或者进入此方法的时候调用了shutdown,所以需要重新检查线程池的状态,如果必要的话,当停止的时候要回滚入队操作, * 或者当线程池为空的话创建一个新的线程 * 3. 如果不能入队,尝试着开启一个新的线程,如果开启失败,说明线程池已经是shutdown状态或饱和了,所以拒绝执行该任务 */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
复制代码
检查当前线程池中的线程数是否<核心线程数,如果小于核心线程数,就调用 addWorker 方法创建一个新的线程执行任务,addworker 中的第二个参数传入 true,表示当前创建的是核心线程。如果当前线程数>=核心线程数或者创建线程失败的话,直接进入第二种情况。
通过调用 isRunning 方法判断线程池是否还在运行,如果线程池状态不是 running,那就直接退出 execute 方法,没有执行的必要了;如果线程池的状态是 running,尝试着把任务加入到 queue 中,再次检查线程池的状态, 如果当前不是 running,可能在入队后调用了 shutdown 方法,所以要在 queue 中移除该任务,默认采用拒绝策略直接抛出异常。如果当前线程数为 0,可能把 allowCoreThreadTimeOut 设为了 true,正好核心线程全部被回收,所以必须要创建一个空的线程,让它自己去 queue 中去取任务执行。
如果当前线程数>核心线程数,并且入队失败,调用 addWorker 方法创建一个新的线程去执行任务,第二个参数是 false,表示当前创建的线程不是核心线程。这种情况表示核心线程已满并且 queue 已满,如果当前线程数小于最大线程数,创建线程执行任务。如果当前线程数>=最大线程数,默认直接采取拒绝策略。
addWorker 方法
看一下 addWorker 是怎么具体执行的。代码有点长,慢慢看。
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
复制代码
首先判断线程池的 runstate,如果 runstate 为 shutdown,那么并不能满足第二个条件,runstate != shutdown,所以这是针对 runstate 不是 running,shutdown 的情况,当 runstate>shutdown 时,队列为空,此时仍然有任务的话,直接返回 false,线程池已关闭,并不能在继续执行任务了。
第二个自旋操作就的目的就是对线程数量自增,由于涉及到高并发,所以采用了 cas 来控制,判断线程的 workcount>=CAPACITY,那么直接返回 false,或者通过判断是否核心线程,如果是 true,判断 workcount>=核心线程数,如果是 false,判断 workcount>=最大线程数,直接返回 false。如果不满足上个条件,直接使用 cas 把线程数自增,退出自旋操作。
worker 对象。
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
复制代码
在当前线程不为空的情况下,加一把可重入锁 reentrantlock,在加锁后,再次检查线程池的状态 runstate,防止在获取到锁之前线程池已经关闭了,线程池的状态为 running 或者状态为 shutdown 并且任务为空的情况下,才能继续往下执行任务,这是充分必要条件。如果当前线程已经开启了,直接抛出异常,这是绝不允许的。
private final HashSet<Worker> workers = new HashSet<Worker>();
addWorkerFailed 方法
如果添加 worker 失败或者开启线程失败就要调用 addWorkerFailed 方法移除失败的 worker。
private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }
复制代码
首先还是获取全局锁 mainlock,接着对 workers 集合中移除 worker,workers 的数量自减。
runWorker 方法
这个方法是运行 task 的方法
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
复制代码
首先还是获取当前线程,获取当前 worker 对象中的任务 task,把当前线程的状态由-1 设为 0,表示可以获取锁执行任务,接下来就是一个 while 循环,当 task 不为空或者从 gettask 方法取出的任务不为空的时候,加锁,底层还是使用了 AQS,保证了只有一个线程执行完毕其他线程才能执行。在执行任务之前,必须进行判断,线程池的状态如果>=STOP,必须中断当前线程,如果是 running 或者 shutdown,当前线程不能被中断,防止线程池调用了 shutdownnow 方法必须中断所有的线程。
在处理任务之前,会执行 beforeExecute 方法, 在处理任务之后,执行 afterExecute 方法,这两个都是钩子方法,继承了 ThreadPoolExecutor 可以重写此方法,嵌入自定义的逻辑。一旦在任务运行的过程中,出现异常会直接抛出,所以在实际的业务中,应该使用 try..catch,把这些日常加入到日志中。
任务执行完,就把 task 设为空,累加当前线程完成的任务数,unlock,继续从 queue 中取任务执行。
getTask 方法。
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out?
for (;;) { int c = ctl.get(); int rs = runStateOf(c);
// Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; }
int wc = workerCountOf(c);
// Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; }
try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
复制代码
这个有点复杂,其实目的就是获得一个要运行的任务,不过人家的判断还是比较多的
shutdown 方法
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
复制代码
调用了 shutdown,意味着不能在继续往 queue 中添加任务,也不能在接受新的任务。利用 cas 把当前线程池的状态设为 shutdown,中断所有的空闲线程,onShutdown 是一个钩子方法,是专门给 ScheduledThreadPoolExecutor 来实现的,再次调用 tryTerminate 方法来尝试中止线程池,直到 queue 中的任务全部处理完毕才能正常关闭。
提一下,默认的 存任务的队列是 BlockingQueue
结尾
因为很多东西,全是从书上拷贝的,很枯燥,但同时看书,又是最详细的学习方法之一了,大家跟着书看博客,或许会好点吧.最后一节不写了,就是工厂方法里面的几种线程池,我们一般也是自定义线程池来操作的。
因为博主也是一个开发萌新 我也是一边学一边写 我有个目标就是一周 二到三篇 希望能坚持个一年吧 希望各位大佬多提意见,让我多学习,一起进步。
日常求赞
好了各位,以上就是这篇文章的全部内容了,能看到这里的人呀,都是真粉。
创作不易,各位的支持和认可,就是我创作的最大动力,我们下篇文章见
六脉神剑 | 文 【原创】如果本篇博客有任何错误,请批评指教,不胜感激 !
评论