01-Future
A Future represents the result of an asynchronous computation.
Future 中定义了 5 个方法
get() / get(long, TimeUnit),获取任务的计算结果,在计算任务未完成时阻塞当前线程直至计算完成或超时。
cancel(boolean),尝试取消任务的执行。该方法在以下机种情况下会失败(即返回 false):a) 任务已完成;b) 任务已被取消;c) 其他不能取消的情形;如果任务还没开始执行,取消后的任务不会再执行;如果任务已经开始,boolean 参数来控制是否中断任务的执行,true 表示会中断,false 表示不中断。
isCancelled(),cancel(boolean) 返回值为 true 的 Future 上调用 isCancelled() 方法返回值为 true。
isDone(),任务完成时,返回值为 true。以下几种都被认为时任务完成的情况:a) 正常结束,b) 异常结束,c) 被取消。
A cancellable asynchronous computation. This class provides a base implementation of Future.
02-FutureTask
FutureTask 是 Future 接口的一个实现。而且,FutureTask 实现了 Runnable 接口,因此可以提交到线程中执行。
02.1-FutureTask 的核心属性
/** FutureTask 的运行状态 */private volatile int state;/** 底层的可执行任务 */private Callable<V> callable;/** 任务与运行后的结果,可能是计算结果,也可能是异常 */private Object outcome;/** 执行任务的线程 */private volatile Thread runner;/** 等待的线程队列 */private volatile WaitNode waiters;
复制代码
刚创建时,state 的值为 NEW(0)。COMPLETING(1) 和 INTERRUPTING(5) 是一种中间状态,持续时间较短暂。
除此之外,其他几个都是最终状态。Future 中定义的 isDone 方法和 isCancelled 方法都是基于 state 变量判断的,前者返回值为state !=NEW,后者返回值为state >= CANCELLED。
FutureTask 中的 Callable 是要执行的任务,在构造器 FutureTask(Callable) 中传入,或者在构造器 FutureTask(Runable, V) 中根据 Runable 对象创建Executors.*callable*(runnable, result);
02.2-FutureTask 的核心方法
FutureTask 实现了 Runnable 接口,实现了其中定义的public void run();方法。
public void run() { if (state != NEW || /** 防止重复运行 */ /** FutureTask 已交给其他线程去执行 */ !RUNNER.compareAndSet(this, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { /** 执行任务,获得返回值 */ result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; /** 设置 outcome 为异常, * state 通过 CAS 方式先被设置为 COMPLETING,之后被设置为 EXCEPTIONAL * 后面遍历阻塞队列,唤醒阻塞线程 */ setException(ex); } if (ran) /** 设置 outcome 为任务执行结果 * state 通过 CAS 方式先被设置为 COMPLETING,之后被设置为 NORMAL * 后面遍历阻塞队列,唤醒阻塞线程 */ set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }}
复制代码
get 方法是 Future 接口中定义的,用来获得任务的计算结果。如果任务尚未完成,则阻塞调用 get 方法的线程(底层通过 awaitDone 方法实现)。
public V get() throws InterruptedException, ExecutionException { int s = state; /** state 为 NEW / COMPLETING 时, * 新创建的对象 state 为 NEW,setExeception 或 set 方法会将 state 设置为 COMPLETING * 上述两种状态都需要阻塞线程 */ if (s <= COMPLETING) /** 阻塞线程 */ s = awaitDone(false, 0L); /** 根据 state,返回任务结果或抛对应的异常 */ return report(s);}
复制代码
private int awaitDone(boolean timed, long nanos) throws InterruptedException { long startTime = 0L; // Special value 0L means not yet parked WaitNode q = null; boolean queued = false; /** 表示当前线程(调用了 get 方法)尚未被放入到等待队列中 */ for (;;) { int s = state; if (s > COMPLETING) { /** 尝试将当前线程加入到等待队列时,任务完成了,则放弃将当前线程加入到等待队列 */ if (q != null) q.thread = null; return s; } else if (s == COMPLETING) /** 需要稍等一会儿,等待任务结果或异常设置到 outcome */ // We may have already promised (via isDone) that we are done // so never return empty-handed or throw InterruptedException Thread.yield(); else if (Thread.interrupted()) { /** 下面的所有情况都是 state == NEW 的情形 */ /** 当前线程如果被中断,则放弃添加当前线程到队列,并抛出异常 */ removeWaiter(q); throw new InterruptedException(); } else if (q == null) { /** 创建节点,将当前线程包装 */ if (timed && nanos <= 0L) return s; q = new WaitNode(); } else if (!queued) /** 当前节点尚未入队,加入队列 */ queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q); else if (timed) { /** 带超时版本的 */ final long parkNanos; if (startTime == 0L) { // first time startTime = System.nanoTime(); if (startTime == 0L) startTime = 1L; parkNanos = nanos; } else { long elapsed = System.nanoTime() - startTime; if (elapsed >= nanos) { removeWaiter(q); return state; } parkNanos = nanos - elapsed; } // nanoTime may be slow; recheck before parking if (state < COMPLETING) LockSupport.parkNanos(this, parkNanos); } else /** 当加入队列之后,阻塞当前线程 */ LockSupport.park(this); }}
复制代码
cancel 是 Future 接口中定义的方法,用于取消任务的执行:
public boolean cancel(boolean mayInterruptIfRunning) { /** 尝试将任务的 state 设置为中断或取消,如果成功则继续,否则返回 false */ if (!(state == NEW && STATE.compareAndSet (this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { /** 如果允许中断运行中的线程,则中断 */ try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state STATE.setRelease(this, INTERRUPTED); } } } finally { finishCompletion(); } return true;}
复制代码
历史文章推荐
评论