写点什么

ThreadPoolExecutor 任务提交原码分析

作者:new life
  • 2021 年 11 月 17 日
  • 本文字数:3360 字

    阅读完需:约 11 分钟

ThreadPoolExecutor 任务提交原码分析

线程池任务提交过程,总体上是:

1、如果线程池中线程数量小于核心线程数的情况下,直接创建新线程执行提交的任务;

2、如果线程池中线程数量大于等于核心线程数量的情况下:

(1)线程池处在 RUNNING 状态下,直接将任务添加到任务队列里;

(2)这里会双重校验线程池的状态,防止在添加任务到队列的时候,线程池状态已经变更;

3、如果线程池线程数量达到核心线程数之后:

(1)队列满了,直接创建新线程去执行任务;

(2)线程池状态变更,重新修正线程数量;

具体实现细节,要比上面说详细,考虑的也更周全,详细注释如下:

1.execute(Runnable command)

public void execute(Runnable command) {  if (command == null)    throw new NullPointerException();
int c = ctl.get(); /** 1.校验当前线程数量 * 如果当前线程总数小于核心线程数,创建新线程,并将当前任务 command 当成新线程的第一个任务执行 */ if (workerCountOf(c) < corePoolSize) { /** * 创建新线程,并指定这是核心线程(其实核心线程也是普通线程,只不过告知是核心线程后, * 创建线程的判断条件会变,addWorker()里具体说明) */ if (addWorker(command, true)) return; c = ctl.get(); } /** * 2.双重检查(走到这一步,说明线程总数已经大于核心线程数了) * (1) isRunning() 校验了两次,防止线程池状态变更; * (2) 第一次校验:如果线程池是 RUNNING 状态,将任务放到队列里,并且用的是Offer()非阻塞的放任务 * (3) 第二次校验:如果线程池状态非 RUNNING 状态,说明线程池已经被 SHUTDOWN 或者 STOP 了, * 需要从队列中移除刚刚添加进的任务 command ,并触发拒绝策略 reject(); * (4) 如果线程池是 RUNNING 状态,但是线程池里线程总数为0(线程异常退出),这时候往线程池里添加新线程, * 这样可以让任务队列里,未被执行的任务正常执行,保证线程池可用; */ if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); /** * 3.走到这一步有以下几种情况: * (1) 线程池 RUNNING: 队列满了,放不下了,创建新线程去执行任务,但是新线程创建失败,执行 reject() 拒绝策略; * (2) 线程池非 RUNNING: 说明线程池被停止了,为了让线程在 SHUTDOWN 状态下,任务队列直线完,会 addWork() 创建新线程, * 反之,如果线程池是 STOP 即使调了这个 addWork()方法,也不会创建成功; */ }else if (!addWorker(command, false)) reject(command);}
复制代码


2.submit()

submit 内部封装的也是 execute() 方法:

public <T> Future<T> submit(Callable<T> task) {  if (task == null) throw new NullPointerException();  RunnableFuture<T> ftask = newTaskFor(task);  execute(ftask);  return ftask;}

public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask;}
复制代码


3.addWork(Runnable firstTask, boolean core)

/*** firstTask - 新线程创建后,第一个执行的任务* core - 表明,调用这个方法创建的线程是否是核心线程*/private boolean addWorker(Runnable firstTask, boolean core) {        retry:  			// 自旋        for (;;) {            int c = ctl.get();            int rs = runStateOf(c);
/** * 1、如果线程池是 SHUTDOWN 状态,并且线程池的任务队列里没有任务了,没有必要创建线程了,退出; * 2、如果线程池是 STOP 及之后的状态,直接return,不必再创建线程了; */ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false;
for (;;) { int wc = workerCountOf(c); /** * 1、如果当前线程总数大于等于允许创建的线程总数 ( 1<<29 -1 ),直接退出 * 2、如果这次指定创建 '核心' 线程,但是当前线程总数已经大于设定的核心线程数,直接退出; */ if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 经过上面的校验了,说明可以创建线程了,先将线程池线程数量加1 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } }
boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 创建新线程,并指定新线程第一个执行的任务 w = new Worker(firstTask); // 获取新创建的线程 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { /** * 1、获取当前线程池状态并校验,防止获取到锁之前,线程池状态已经变更; * 2、校验线程池状态: * (1)如果线程池是 RUNNING(小于 SHUTDOWN 状态的唯一状态)状态,将work线程添加到线程集合 workers set中; * (2)如果线程池是 SHUTDOWN 并且第一次任务为null,说明当前的线程是补充线程,不是提交正常任务的线程,为了保证 * 线程中任务的正常执行,可以继续往下走; */ int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 防止重复启动 if (t.isAlive()) throw new IllegalThreadStateException(); // 线程添加到 works 集合中 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; // 表明线程已经到集合中添加成功了,剩下的就剩 start() 此线程了 workerAdded = true; } } finally { mainLock.unlock(); } /** works 集合添加成功才启动线程,并表明启动成功, * 但是为什么不把这段代码放到上面,把对 workerAdded 的赋值和这里的判断合写到一块, * 我想的是可能不想持有锁的时间太长吧 */ if (workerAdded) { t.start(); workerStarted = true; } } } finally { // 如果线程启动失败,还要从线程集合中works移除已经添加的work,线程数量减1 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
复制代码


private void addWorkerFailed(Worker w) {  final ReentrantLock mainLock = this.mainLock;  mainLock.lock();  try {    if (w != null)      workers.remove(w);      decrementWorkerCount();      tryTerminate();  } finally {    mainLock.unlock();  }}
复制代码


用户头像

new life

关注

还未添加个人签名 2019.03.04 加入

还未添加个人简介

评论

发布
暂无评论
ThreadPoolExecutor 任务提交原码分析