写点什么

JUC 之 FutureTask 源码与工作原理分析

用户头像
AI乔治
关注
发布于: 2020 年 11 月 04 日
JUC之 FutureTask 源码与工作原理分析

JDK1.5 引入了Future模式,Future代表了一个异步任务的执行结果。Future模式可以理解成:主线程将待执行的任务提交给子线程执行后,可以先获取任务结果的持有者Future。然后主线程可以去执行其他的任务。等待到要关注之前任务的执行结果时,再从Future中获取。下面是用户注册的场景。





同步实现时,注册接口响应时间150ms= 用户信息保存50ms + 发送短信50ms + 发送邮件50ms

异步实现时,注册接口响应时间100ms= 用户信息保存50ms + max(发送短信50ms, 发送邮件50ms)

Future 接口核心方法

/*尝试取消当前执行的任务*/
boolean cancel(boolean mayInterruptIfRunning);
/*判断当前执行的任务是否被取消*/
boolean isCancelled();
/*判断当前执行的任务是否完成,正常结束、发生异常或任务被取消都认为任务完成*/
boolean isDone();
/*获取当前执行任务对应的结果,将阻塞等待任务执行完*/
V get() throws InterruptedException, ExecutionException;
/*等待一段时间,获取当前执行任务对应的结果,将阻塞等待任务执行完,如果对应时间还未获取结果,
* 抛出TimeoutException
*/
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;


后面所有分析全部基于JDK8

在JDK中Future接口的实现类为FutureTask,其UML图如下。





FutureTask同时实现了Runnable与Future的功能。实现Runnable主要是为了让Executor可以执行,实现Future为了能够持有执行结果的引用

FutureTask的成员变量如下:

/*当前任务的执行状态*/
private volatile int state;
/*当前要执行的任务,执行完后将被设置为null*/
private Callable<V> callable;
/*当前任务的执行结果,当发生异常或被取消时为对应异常,非volatile,由state状态保证线程安全*/
private Object outcome;
/*执行当callable任务的线程引用,为当前线程通过CAS方式原子设置*/
private volatile Thread runner;
/*Treiber椎,用于保存由于调用Future.get方法而阻塞的线程*/
private volatile WaitNode waiters;


先来看一下用于维护当前任务执行状态的state成员变量。

FutureTask内部定义了,任务执行的7种状态。

//任务的初始初始化状态
private static final int NEW = 0;
//执行结果设置中时,的中间状态
private static final int COMPLETING = 1;
//任务正常执行完成的最终状态
private static final int NORMAL = 2;
//任务执行出错的最终状态
private static final int EXCEPTIONAL = 3;
//调用Future.cancell取消任务的最终状态
private static final int CANCELLED = 4;
//执行任务的线程被中断的中间状态
private static final int INTERRUPTING = 5;
//执行任务的线程被中断的最终状态
private static final int INTERRUPTED = 6;






上图展示了,Future任务状态的扭转图。new状态时通过调用set()方法、setException()方法可以使状态最终分别转变为NORMAL与EXCEPTIONAL; new状态时通过调用cancel(flase)方法、cancel(true)方法可以可以使状态最终分别转变为CANCELLED与INTERRUPTED。

再看一下用于保存由于调用Future.get方法而阻塞的线程的Treiber椎引用成变量waiters。

static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}


每当一个线程调用Future.get去获取任务的执行结果时,如果当前任务还没有执行结束、还没有被取消或者执行中未抛出异常。将产生一个新的WaitNode类型的节点,该节点持有调用Future.get方法的线程的引用,放入到Treiber椎中。FutureTask成变量waiters更新为最新的WaitNode节点。如下图。





介绍了,FutureTask中最重要的两个成量变state与waiters。现在开始分析FutureTask的源码与工作原理。先从FutureTask的get方法入手。

/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
//任务执行状态为NEW或COMPLETING时,等待任务执行完成
if (s <= COMPLETING)
//等待任务执行完成,具体的等待通过Treiber椎实现
s = awaitDone(false, 0L);
/*不是NEW或COMPLETING时,可以任务执行的获取结果,
*当调用cancel方法将抛出CancellationException,
*当任务执行中出现异常时将抛出ExecutionException
*/
return report(s);
}


FutureTask awaitDone方法:

/**
* Awaits completion or aborts on interrupt or timeout.
* * @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}

int s = state;
/*任务已执行完,包括NORMAL(任务正常执行完), EXCEPTIONAL(任务执行出现异常),
*CANCELLED(任务被取消执行不中断), INTERRUPTING(任务被取消执行中断),
*INTERRUPTED(任务被取消执行中断)
*/
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
//任务还在执行中
else if (s == COMPLETING) // cannot time out yet
//向操作系统发出让出当前线程CUP的调度执行信号
Thread.yield();
else if (q == null)
//状态为NEW时,将当前调用get方法的线程构造成WaitNode做准备下一步放入Treiber stack
q = new WaitNode();
else if (!queued)
//以原子的方式将调用了get方法的线程构造好的WaitNode放入Treiber stack
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
//响应超时的调用get方法
nanos = deadline - System.nanoTime();
//已超时返回
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
//将当前线程挂起,直到调用LockSupport.unpark当前线程或是已超时
LockSupport.parkNanos(this, nanos);
}
else
//将当前线程挂起,直到调用LockSupport.unpark当前线程
LockSupport.park(this);
}
}


从上面的分析中可以看出Future中任务没有执行完(包括正常执行、执行中抛出现异常、执行任务被取消)时,调用get方法的线程被LockSupport.park方法挂起,操作系统将不会对这个线程进行调度,当前线程被阻塞。而这个阻塞的结束是依靠调用LockSupport.unpark方法。LockSupport.unpark方法只有在任务正常执行完、执行中抛出现异常、执行任务被取消才会被调用。先不看具体何时在什么地方LockSupport.unpark方法被调用。先看看report方法的源码。

/**
* Returns result or throws exception for completed task.
* * @param s completed state value
*/
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
/*最终任务执行的结果,如果任务正常执行完成为任务执行结果
*如果任务执行被取消为null, 如果任务执行中出现异常为对应的Throwable对象
*/
Object x = outcome;
//任务正常执行完成
if (s == NORMAL)
return (V)x;
/*任务执行被取消,调用cancel(false)产生的状态为CANCELLED, *调用cancel(true)产生的状态为INTERRUPTED */
if (s >= CANCELLED)
//抛出任务被取消的异常
throw new CancellationException();
//任务执行中出现异常抛出,执行中的异常
throw new ExecutionException((Throwable)x);
}


上面已将FutureTask的get方法有关的源码分析了一遍。但任务对应的结果outcome是何时在什么被设置呢?下面分析一下这部分,即任务的执行结果outcome如何被设置的,何时被设置的。FutureTask中有一个set方法源码如下。

/**
* Sets the result of this future to the given value unless
* this future has already been set or has been cancelled.
*
* <p>This method is invoked internally by the {@link #run} method
* upon successful completion of the computation.
*
* @param v the value
*/
protected void set(V v) {
//以原子的方式设置任务的执行中间状态
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//设置任务执行结果
outcome = v;
//设置任务为正常执行完
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
//从Treiber stack删除所有阻塞等待的线程,同时将这些挂起的线程唤醒
finishCompletion();
}
}


finishCompletion源码如下。

private void finishCompletion() {
// assert state > COMPLETING;
//遍历Treiber stack 删除所有阻塞等待的线程,同时将这些挂起的线程唤醒
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
//唤醒挂起的线程
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
//任务执行完后的扩展点,默认实现为空。可以利用这个扩展点实现回调
done();
callable = null; // to reduce footprint
}


跟踪调用链会发现run方法里面调用了set方法,由于FutureTask实现了Runnable接口,实现了run方法。run源码如下。

public void run() {
/*任务执行的状态为非NEW时,
*UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))
*将不执行直接退出。 *任务执行的状态为NEW时,
*执行UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))
*将当前线程设置为callable的runner。
*多个线程并发调用run时,只有一个线程会执行,其他线程会直接退出,因为只有一个线程的
*UNSAFE.compareAndSwapObject将为true。
*/
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
//待执行的任务不空且任务还未执行才执行任务
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//执行任务具体的方法
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
//设置异常
setException(ex);
}
if (ran)
//成功执行设置任务对应结果
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}


前面曾经提到FutureTask实现Runnable主要是为了让Executor可以执行,实现Future为了能够持有执行结果的引用

ExecutorService扩展了Executor提供了三个重载的submit方法,从而让线程池支持Future模式。

//提交Callable类型的任务
<T> Future<T> submit(Callable<T> task)
//提交Runnable类型的任务,并以result作为任务执行完后的结果
<T> Future<T> submit(Runnable task, T result);
//提交Runnable类型的任务,以null作为任务执行完后的结果
Future<?> submit(Runnable task);


如果提交的任务类型是Runnable在AbstractExecutorService类中,Runnable类型的任务将适配为Callable类型的任务。源码如下,不做具体分析。后面分析线程池源码时会分析。

public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
//将提交runnable类型的任务适配为Callable类型的任务
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}


对于任务的调用记住以下两点便可:

  1. 如果submit给线程池的任务类型为Callable,则线程池内部执行该任务的线程的run方法,会调用FutureTask的run方法,而FutureTask的run方法又会调用提交任务的Callable的call方法。

  2. 如果submit给线程池的任务类型为Runnable,则线程池内部执行该任务的线程的run方法,会调用FutureTask的run方法,而FutureTask的run方法又会调用提交任务适配后的Callable的call方法,而Callable的call方法内部调用Runnable的run方法。

最后看一下FutureTask的cancel方法

//mayInterruptIfRunning 运行中任务的是否可以被中断
public boolean cancel(boolean mayInterruptIfRunning) {
//状态为NEW的任务,才有可能cancel成功
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
/* 一旦任务的状态不是NEW下面代码将不执行*/
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
//中断当前执行任务的线程
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//从Treiber stack删除所有阻塞等待的线程,同时将这些挂起的线程唤醒
finishCompletion();
}
return true;
}


上面对Future做了这整体分析。回顾几个重要的知识点:

  1. 当提交任务没有执行完时,Future的内部通过Treiber椎与LockSport.park将调用Future.get方法的线程构成一个WaitNode加入到Treiber椎中挂起,当任务执行完成后(包括正常执行、执行中抛出现异常、执行任务被取消),将遍历Treiber椎中的结点调用LockSport.unpark唤醒所有挂起的线程让其返回任务的执行结果。

  2. FutureTask类提供了done方法做为扩展点,提交任务成功执行后(包括正常执行、执行中抛出现异常、执行任务被取消)done方法将被调用。可以利用这个扩展点实现回调,不用调用isDone与get组合去轮询任务的执行结果,而在任务执行回后会主动调用之前的回调方法。(Google的Guava库利用这个扩展点实现了ListenableFuture,同时Spring-core中也借鉴了Guava的ListenableFuture实现了ListenableFuture。)

  3. 提交的任务,正常执行完、执行中抛出现异常、执行任务被取消,都视为任务执行完成。

  4. 调用线程池的submit方法的任务最终将封装成FutureTask,提交给线程池去执行。对于提交类型为Runnable的任务其会被先适配为Callable类型的任务。



看完三件事❤️

如果你觉得这篇内容对你还蛮有帮助,我想邀请你帮我三个小忙:



  1. 点赞,转发,有你们的 『点赞和评论』,才是我创造的动力。

  2. 关注公众号 『 java烂猪皮 』,不定期分享原创知识。

  3. 同时可以期待后续文章ing🚀



作者:叶易



出处:https://club.perfma.com/article/1563956 



用户头像

AI乔治

关注

分享后端技术干货。公众号【 Java烂猪皮】 2019.06.30 加入

一名默默无闻的扫地僧!

评论

发布
暂无评论
JUC之 FutureTask 源码与工作原理分析