FutureTask 源码解析
发布于: 2021 年 03 月 10 日
Future 设计模式
属性
/**
 * Current state of this task. The code visible in this excerpt performs the
 * following transitions:
 *   NEW -> COMPLETING -> NORMAL          (set)
 *   NEW -> CANCELLED                     (cancel(false))
 *   NEW -> INTERRUPTING -> INTERRUPTED   (cancel(true))
 * NOTE(review): EXCEPTIONAL is presumably reached through setException(),
 * which is not part of this excerpt - confirm against the full source.
 */
private volatile int state;

// State constants. Their numeric order matters: callers compare with
// "<= COMPLETING", "> COMPLETING" and ">= INTERRUPTING".
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

/** The underlying callable (the task to execute); nulled out after running */
private Callable<V> callable;

/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes

/** The thread running the callable; CASed during run() */
private volatile Thread runner;

/** Treiber stack of waiting threads (threads waiting to obtain the result) */
private volatile WaitNode waiters;
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }复制代码
构造函数
/**
 * Creates a task that wraps the given callable and starts in state NEW.
 *
 * @param callable the task to execute
 * @throws NullPointerException if {@code callable} is null
 */
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
//runnable转换为callablepublic FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable}
复制代码
Executors 类通过适配器 RunnableAdapter 将 Runnable 转换为 Callable;result 是任务运行完成后返回的结果。
/**
 * Wraps a Runnable in a Callable that runs the task and then yields the
 * supplied result.
 *
 * @param task the task to run
 * @param result the value the returned Callable produces
 * @return a Callable adapter around {@code task}
 * @throws NullPointerException if {@code task} is null
 */
public static <T> Callable<T> callable(Runnable task, T result) {
    if (task != null) {
        return new RunnableAdapter<>(task, result);
    }
    throw new NullPointerException();
}
private static final class RunnableAdapter<T> implements Callable<T> { private final Runnable task; private final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } public String toString() { return super.toString() + "[Wrapped task = " + task + "]"; }}复制代码
取消任务
中断
public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && STATE.compareAndSet (this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state STATE.setRelease(this, INTERRUPTED); } } } finally { finishCompletion(); } return true;}复制代码
获取结果
根据任务状态判断:如果任务尚未完成,就把当前线程加入等待队列并将其 park(挂起),直到任务进入终态、等待超时或线程被中断。
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } private int awaitDone(boolean timed, long nanos) throws InterruptedException { // The code below is very delicate, to achieve these goals: // - call nanoTime exactly once for each call to park // - if nanos <= 0L, return promptly without allocation or nanoTime // - if nanos == Long.MIN_VALUE, don't underflow // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic // and we suffer a spurious wakeup, we will do no worse than // to park-spin for a while long startTime = 0L; // Special value 0L means not yet parked WaitNode q = null; boolean queued = false; for (;;) { int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // We may have already promised (via isDone) that we are done // so never return empty-handed or throw InterruptedException Thread.yield(); else if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } else if (q == null) { if (timed && nanos <= 0L) return s; q = new WaitNode(); } else if (!queued) queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q); else if (timed) { final long parkNanos; if (startTime == 0L) { // first time startTime = System.nanoTime(); if (startTime == 0L) startTime = 1L; parkNanos = nanos; } else { long elapsed = System.nanoTime() - startTime; if (elapsed >= nanos) { removeWaiter(q); return state; } parkNanos = nanos - elapsed; } // nanoTime may be slow; recheck before parking if (state < COMPLETING) LockSupport.parkNanos(this, parkNanos); } else LockSupport.park(this); } }复制代码
运行任务
public void run() { if (state != NEW || !RUNNER.compareAndSet(this, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }}
//完成后,改变任务状态,设置返回结果,通知其它线程protected void set(V v) { if (STATE.compareAndSet(this, NEW, COMPLETING)) { outcome = v; STATE.setRelease(this, NORMAL); // final state finishCompletion(); }}private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (WAITERS.weakCompareAndSet(this, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } }
done();
callable = null; // to reduce footprint}复制代码
实时内容请关注微信公众号,公众号与博客同时更新:程序员星星
划线
评论
复制
发布于: 2021 年 03 月 10 日阅读数: 13
版权声明: 本文为 InfoQ 作者【徐海兴】的原创文章。
原文链接:【http://xie.infoq.cn/article/69be1baed0edcf4b9566bcc6a】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
徐海兴
关注
还未添加个人签名 2018.08.28 加入
还未添加个人简介











评论