01-Future
A Future represents the result of an asynchronous computation.
Future 中定义了 5 个方法
get() / get(long, TimeUnit),获取任务的计算结果,在计算任务未完成时阻塞当前线程直至计算完成或超时。
cancel(boolean),尝试取消任务的执行。该方法在以下机种情况下会失败(即返回 false):a) 任务已完成;b) 任务已被取消;c) 其他不能取消的情形;如果任务还没开始执行,取消后的任务不会再执行;如果任务已经开始,boolean 参数来控制是否中断任务的执行,true 表示会中断,false 表示不中断。
isCancelled(),cancel(boolean) 返回值为 true 的 Future 上调用 isCancelled() 方法返回值为 true。
isDone(),任务完成时,返回值为 true。以下几种都被认为时任务完成的情况:a) 正常结束,b) 异常结束,c) 被取消。
A cancellable asynchronous computation. This class provides a base implementation of Future.
02-FutureTask
FutureTask 是 Future 接口的一个实现。而且,FutureTask 实现了 Runnable 接口,因此可以提交到线程中执行。
02.1-FutureTask 的核心属性
/** FutureTask 的运行状态 */
private volatile int state;
/** 底层的可执行任务 */
private Callable<V> callable;
/** 任务与运行后的结果,可能是计算结果,也可能是异常 */
private Object outcome;
/** 执行任务的线程 */
private volatile Thread runner;
/** 等待的线程队列 */
private volatile WaitNode waiters;
复制代码
刚创建时,state 的值为 NEW(0)。COMPLETING(1) 和 INTERRUPTING(5) 是一种中间状态,持续时间较短暂。
除此之外,其他几个都是最终状态。Future 中定义的 isDone 方法和 isCancelled 方法都是基于 state 变量判断的,前者返回值为state !=NEW
,后者返回值为state >= CANCELLED
。
FutureTask 中的 Callable 是要执行的任务,在构造器 FutureTask(Callable) 中传入,或者在构造器 FutureTask(Runable, V) 中根据 Runable 对象创建Executors.*callable*(runnable, result);
02.2-FutureTask 的核心方法
FutureTask 实现了 Runnable 接口,实现了其中定义的public void run();
方法。
public void run() {
if (state != NEW || /** 防止重复运行 */
/** FutureTask 已交给其他线程去执行 */
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
/** 执行任务,获得返回值 */
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
/** 设置 outcome 为异常,
* state 通过 CAS 方式先被设置为 COMPLETING,之后被设置为 EXCEPTIONAL
* 后面遍历阻塞队列,唤醒阻塞线程
*/
setException(ex);
}
if (ran)
/** 设置 outcome 为任务执行结果
* state 通过 CAS 方式先被设置为 COMPLETING,之后被设置为 NORMAL
* 后面遍历阻塞队列,唤醒阻塞线程
*/
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
复制代码
get 方法是 Future 接口中定义的,用来获得任务的计算结果。如果任务尚未完成,则阻塞调用 get 方法的线程(底层通过 awaitDone 方法实现)。
public V get() throws InterruptedException, ExecutionException {
int s = state;
/** state 为 NEW / COMPLETING 时,
* 新创建的对象 state 为 NEW,setExeception 或 set 方法会将 state 设置为 COMPLETING
* 上述两种状态都需要阻塞线程
*/
if (s <= COMPLETING)
/** 阻塞线程 */
s = awaitDone(false, 0L);
/** 根据 state,返回任务结果或抛对应的异常 */
return report(s);
}
复制代码
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
long startTime = 0L; // Special value 0L means not yet parked
WaitNode q = null;
boolean queued = false; /** 表示当前线程(调用了 get 方法)尚未被放入到等待队列中 */
for (;;) {
int s = state;
if (s > COMPLETING) { /** 尝试将当前线程加入到等待队列时,任务完成了,则放弃将当前线程加入到等待队列 */
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) /** 需要稍等一会儿,等待任务结果或异常设置到 outcome */
// We may have already promised (via isDone) that we are done
// so never return empty-handed or throw InterruptedException
Thread.yield();
else if (Thread.interrupted()) { /** 下面的所有情况都是 state == NEW 的情形 */
/** 当前线程如果被中断,则放弃添加当前线程到队列,并抛出异常 */
removeWaiter(q);
throw new InterruptedException();
}
else if (q == null) { /** 创建节点,将当前线程包装 */
if (timed && nanos <= 0L)
return s;
q = new WaitNode();
}
else if (!queued) /** 当前节点尚未入队,加入队列 */
queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
else if (timed) { /** 带超时版本的 */
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// nanoTime may be slow; recheck before parking
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}
else /** 当加入队列之后,阻塞当前线程 */
LockSupport.park(this);
}
}
复制代码
cancel 是 Future 接口中定义的方法,用于取消任务的执行:
public boolean cancel(boolean mayInterruptIfRunning) {
/** 尝试将任务的 state 设置为中断或取消,如果成功则继续,否则返回 false */
if (!(state == NEW && STATE.compareAndSet
(this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
/** 如果允许中断运行中的线程,则中断 */
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
STATE.setRelease(this, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
复制代码
历史文章推荐
评论