线程池中 execute 和 submit 的区别?
- 2025-06-13 福建
本文字数:8306 字
阅读完需:约 27 分钟
简要回答
execute 只能提交 Runnable 类型的任务,无返回值。submit 既可以提交 Runnable 类型的任务,也可以提交 Callable 类型的任务,会有一个类型为 Future 的返回值,但当任务类型为 Runnable 时,返回值为 null。
execute 在执行任务时,如果遇到异常会直接抛出,而 submit 不会直接抛出,只有在使用 Future 的 get 方法获取返回值时,才会抛出异常。
execute 所属顶层接口是 Executor,submit 所属顶层接口是 ExecutorService,实现类 ThreadPoolExecutor 重写了 execute 方法,抽象类 AbstractExecutorService 重写了 submit 方法。
扩展知识
通过执行 execute 方法 该方法无返回值,为 ThreadPoolExecutor 自带方法,传入 Runnable 类型对象

通过执行 submit 方法 该方法返回值为 Future 对象,为抽象类 AbstractExecutorService 的方法,被 ThreadPoolExecutor 继承,其内部实现也是调用了接口类 Executor 的 execute 方法,通过上面的类图可以看到,该方法的实现依然是 ThreadPoolExecutor 的 execute 方法


execute()执行流程图

execute()源码
// 使用原子操作类AtomicInteger的ctl变量,前3位记录线程池的状态,后29位记录线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Integer的范围为[-2^31,2^31 -1], Integer.SIZE-3 =32-3= 29,用来辅助左移位运算
private static final int COUNT_BITS = Integer.SIZE - 3;
// 高三位用来存储线程池运行状态,其余位数表示线程池的容量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池状态以常量值被存储在高三位中
private static final int RUNNING = -1 << COUNT_BITS; // 线程池接受新任务并会处理阻塞队列中的任务
private static final int SHUTDOWN = 0 << COUNT_BITS; // 线程池不接受新任务,但会处理阻塞队列中的任务
private static final int STOP = 1 << COUNT_BITS; // 线程池不接受新的任务且不会处理阻塞队列中的任务,并且会中断正在执行的任务
private static final int TIDYING = 2 << COUNT_BITS; // 所有任务都执行完成,且工作线程数为0,将调用terminated方法
private static final int TERMINATED = 3 << COUNT_BITS; // 最终状态,为执行terminated()方法后的状态
// ctl变量的封箱拆箱相关的方法
private static int runStateOf(int c) { return c & ~CAPACITY; } // 获取线程池运行状态
private static int workerCountOf(int c) { return c & CAPACITY; } // 获取线程池运行线程数
private static int ctlOf(int rs, int wc) { return rs | wc; } // 获取ctl对象
public void execute(Runnable command) {
if (command == null) // 任务为空,抛出NPE
throw new NullPointerException();
int c = ctl.get(); // 获取当前工作线程数和线程池运行状态(共32位,前3位为运行状态,后29位为运行线程数)
if (workerCountOf(c) < corePoolSize) { // 如果当前工作线程数小于核心线程数
if (addWorker(command, true)) // 在addWorker中创建核心线程并执行任务
return;
c = ctl.get();
}
// 核心线程数已满(工作线程数>核心线程数)才会走下面的逻辑
if (isRunning(c) && workQueue.offer(command)) { // 如果当前线程池状态为RUNNING,并且任务成功添加到阻塞队列
int recheck = ctl.get(); // 双重检查,因为从上次检查到进入此方法,线程池可能已成为SHUTDOWN状态
if (! isRunning(recheck) && remove(command)) // 如果当前线程池状态不是RUNNING则从队列删除任务
reject(command); // 执行拒绝策略
else if (workerCountOf(recheck) == 0) // 当线程池中的workerCount为0时,此时workQueue中还有待执行的任务,则新增一个addWorker,消费workqueue中的任务
addWorker(null, false);
}
// 阻塞队列已满才会走下面的逻辑
else if (!addWorker(command, false)) // 尝试增加工作线程执行command
// 如果当前线程池为SHUTDOWN状态或者线程池已饱和
reject(command); // 执行拒绝策略
}
execute 体现的就是线程池的工作原理,addWorker
则是更复杂的逻辑来保证 worker 的原子性地插入
private boolean addWorker(Runnable firstTask, boolean core) {
retry: // 循环退出标志位
for (;;) { // 无限循环
int c = ctl.get();
int rs = runStateOf(c); // 线程池状态
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()) // 换成更直观的条件语句
// (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
)
// 返回false的条件就可以分解为:
//(1)线程池状态为STOP,TIDYING,TERMINATED
//(2)线程池状态为SHUTDOWN,且要执行的任务不为空
//(3)线程池状态为SHUTDOWN,且任务队列为空
return false;
// cas自旋增加线程个数
for (;;) {
int wc = workerCountOf(c); // 当前工作线程数
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) // 工作线程数>=线程池容量 || 工作线程数>=(核心线程数||最大线程数)
return false;
if (compareAndIncrementWorkerCount(c)) // 执行cas操作,添加线程个数
break retry; // 添加成功,退出外层循环
// 通过cas添加失败
c = ctl.get();
// 线程池状态是否变化,变化则跳到外层循环重试重新获取线程池状态,否者内层循环重新cas
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 简单总结上面的CAS过程:
//(1)内层循环作用是使用cas增加线程个数,如果线程个数超限则返回false,否者进行cas
//(2)cas成功则退出双循环,否者cas失败了,要看当前线程池的状态是否变化了
//(3)如果变了,则重新进入外层循环重新获取线程池状态,否者重新进入内层循环继续进行cas
// 走到这里说明cas成功,线程数+1,但并未被执行
boolean workerStarted = false; // 工作线程调用start()方法标志
boolean workerAdded = false; // 工作线程被添加标志
Worker w = null;
try {
w = new Worker(firstTask); // 创建工作线程实例
final Thread t = w.thread; // 获取工作线程持有的线程实例
if (t != null) {
final ReentrantLock mainLock = this.mainLock; // 使用全局可重入锁
mainLock.lock(); // 加锁,控制并发
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get()); // 获取当前线程池状态
// 线程池状态为RUNNING或者(线程池状态为SHUTDOWN并且没有新任务时)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 检查线程是否处于活跃状态
throw new IllegalThreadStateException();
workers.add(w); // 线程加入到存放工作线程的HashSet容器,workers全局唯一并被mainLock持有
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock(); // finally块中释放锁
}
if (workerAdded) { // 线程添加成功
t.start(); // 调用线程的start()方法
workerStarted = true;
}
}
} finally {
if (! workerStarted) // 如果线程启动失败,则执行addWorkerFailed方法
addWorkerFailed(w);
}
return workerStarted;
}
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w); // 线程启动失败时,需将前面添加的线程删除
decrementWorkerCount(); // ctl变量中的工作线程数-1
tryTerminate(); // 尝试将线程池转变成TERMINATE状态
} finally {
mainLock.unlock();
}
}
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 以下情况不会进入TERMINATED状态:
//(1)当前线程池为RUNNING状态
//(2)在TIDYING及以上状态
//(3)SHUTDOWN状态并且工作队列不为空
//(4)当前活跃线程数不等于0
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // 工作线程数!=0
interruptIdleWorkers(ONLY_ONE); // 中断一个正在等待任务的线程
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 通过CAS自旋判断直到当前线程池运行状态为TIDYING并且活跃线程数为0
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated(); // 调用线程terminated()
} finally {
ctl.set(ctlOf(TERMINATED, 0)); // 设置线程池状态为TERMINATED,工作线程数为0
termination.signalAll(); // 通过调用Condition接口的signalAll()唤醒所有等待的线程
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
Worker 源码
Worker 是 ThreadPoolExecutor 类的内部类,此处只讲最重要的构造函数和 run 方法
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// 该worker正在运行的线程
final Thread thread;
// 将要运行的初始任务
Runnable firstTask;
// 每个线程的任务计数器
volatile long completedTasks;
// 构造方法
Worker(Runnable firstTask) {
setState(-1); // 调用runWorker()前禁止中断
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 通过ThreadFactory创建一个线程
}
// 实现了Runnable接口的run方法
public void run() {
runWorker(this);
}
... // 此处省略了其他方法
}
Worker 实现了 Runable 接口,在调用 start()方法候,实际执行的是 run 方法 Worker 实现了 Runable 接口,在调用 start()方法候,实际执行的是 run 方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; // 获取工作线程中用来执行任务的线程实例
w.firstTask = null;
w.unlock(); // status设置为0,允许中断
boolean completedAbruptly = true; // 线程意外终止标志
try {
// 如果当前任务不为空,则直接执行;否则调用getTask()从任务队列中取出一个任务执行
while (task != null || (task = getTask()) != null) {
w.lock(); // 加锁,保证下方临界区代码的线程安全
// 如果状态值大于等于STOP且当前线程还没有被中断,则主动中断线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt(); // 中断当前线程
try {
beforeExecute(wt, task); // 任务执行前的回调,空实现,可以在子类中自定义
Throwable thrown = null;
try {
task.run(); // 执行线程的run方法
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 任务执行后的回调,空实现,可以在子类中自定义
}
} finally {
task = null; // 将循环变量task设置为null,表示已处理完成
w.completedTasks++; // 当前已完成的任务数+1
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
从任务队列中取出一个任务
private Runnable getTask() {
boolean timedOut = false; // 通过timeOut变量表示线程是否空闲时间超时了
// 无限循环
for (;;) {
int c = ctl.get(); // 线程池信息
int rs = runStateOf(c); // 线程池当前状态
// 如果线程池状态>=SHUTDOWN并且工作队列为空 或 线程池状态>=STOP,则返回null,让当前worker被销毁
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount(); // 工作线程数-1
return null;
}
int wc = workerCountOf(c); // 获取当前线程池的工作线程数
// 当前线程是否允许超时销毁的标志
// 允许超时销毁:当线程池允许核心线程超时 或 工作线程数>核心线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果(当前线程数大于最大线程数 或 (允许超时销毁 且 当前发生了空闲时间超时))
// 且(当前线程数大于1 或 阻塞队列为空)
// 则减少worker计数并返回null
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 根据线程是否允许超时判断用poll还是take(会阻塞)方法从任务队列头部取出一个任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();//线程池重用逻辑:没有任务了就阻塞在这里,等待新的任务
if (r != null)
return r; // 返回从队列中取出的任务
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
总结一下哪些情况 getTask()会返回 null:
线程池状态为 SHUTDOWN 且任务队列为空
线程池状态为 STOP、TIDYING、TERMINATED
线程池线程数大于最大线程数
线程可以被超时回收的情况下等待新任务超时
工作线程退出
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果completedAbruptly为true则表示任务执行过程中抛出了未处理的异常
// 所以还没有正确地减少worker计数,这里需要减少一次worker计数
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 把将被销毁的线程已完成的任务数累加到线程池的完成任务总数上
completedTaskCount += w.completedTasks;
workers.remove(w); // 从工作线程集合中移除该工作线程
} finally {
mainLock.unlock();
}
// 尝试结束线程池
tryTerminate();
int c = ctl.get();
// 如果是RUNNING 或 SHUTDOWN状态
if (runStateLessThan(c, STOP)) {
// worker是正常执行完
if (!completedAbruptly) {
// 如果允许核心线程超时则最小线程数是0,否则最小线程数等于核心线程数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果阻塞队列非空,则至少要有一个线程继续执行剩下的任务
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 如果当前线程数已经满足最小线程数要求,则不需要再创建替代线程
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 重新创建一个worker来代替被销毁的线程
addWorker(null, false);
}
}
submit 源码
提交任务到线程池有两种方法,一种是 execute,另一种是 submit。区别是 execute 没有返回值,submit 是有返回值的,如果有异常抛出,submit 同样可以获取异常结果。
// AbstractExecutorService.submit
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
submit 中调用了newTaskFor
方法来返回一个 ftask 对象,然后 execute 这个 ftask 对象,newTaskFor
代码如下:
// AbstractExecutorService.newTaskFor
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
newTaskFor
又调用FutureTask的有参构造器来创建一个futureTask
实例,代码如下:
// FutureTask有参构造器
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
这个有参构造器中又调用了Executors
的静态方法 callable 创建一个 callable 实例来赋值给futureTask
的 callable 属性,代码如下:
// Executors.callable
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
最后还是使用了RunnableAdapter
来包装这个 task,代码如下:
// Executors.RunnableAdapter类
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
梳理一下整个流程,run 和 call 的关系的伪代码如下
// submit
run(){
// RunnableAdapter.call
call(){
// task.run
run(){
// 实际的任务
}
}
}
为什么要这么麻烦封装一层又一层呢?
可能是为了适配。submit 的返回值是futureTask
,但是传给 submit 的是个 runnable,然后 submit 会把这个 runnable 继续传给futureTask
,futureTask
的结果值是 null,但是又由于futureTask
的 run 方法已经被重写成执行 call 方法了,所以只能在 call 方法里面跑真正的 run 方法了
文章转载自:SevenCoder

还未添加个人签名 2025-04-01 加入
还未添加个人简介
评论