写点什么

FutureTask 源码解析

用户头像
徐海兴
关注
发布于: 2021 年 03 月 10 日

Future 设计模式

属性


    /**
     * Current run state of this task; starts out as NEW.
     *
     * Transitions visible in this class:
     *   NEW -> COMPLETING -> NORMAL        (set)
     *   NEW -> CANCELLED                   (cancel(false))
     *   NEW -> INTERRUPTING -> INTERRUPTED (cancel(true))
     * NOTE(review): EXCEPTIONAL is presumably reached through setException,
     * which is not shown in this excerpt -- confirm against the full source.
     */
    private volatile int state;
    // State constants. Their numeric order matters: the code compares them
    // with <=, > and >= (e.g. "s <= COMPLETING", "s >= INTERRUPTING").
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable (the task to execute); nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of threads waiting for the result */
    private volatile WaitNode waiters;
/**
 * Node of the waiter stack. Each node records the thread that created it
 * and links to the next node in the stack.
 */
static final class WaitNode {
    /** The waiting thread; captured at construction time. */
    volatile Thread thread;
    /** The next node in the stack, or null if this is the last one. */
    volatile WaitNode next;

    /** Creates a node for the calling thread. */
    WaitNode() {
        this.thread = Thread.currentThread();
    }
}
复制代码


构造函数


/**
 * Creates a task that wraps the given callable, starting in state NEW.
 *
 * @param callable the task to execute
 * @throws NullPointerException if {@code callable} is null
 */
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
//runnable转换为callablepublic FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable}


复制代码


Executors 类的 callable 方法通过适配器 RunnableAdapter 将 Runnable 转换为 Callable,result 为任务运行完成时返回的结果


/**
 * Wraps a runnable in a callable that runs it and then returns the given
 * result.
 *
 * @param task the runnable to adapt
 * @param result the value the returned callable yields
 * @return a callable backed by a RunnableAdapter
 * @throws NullPointerException if {@code task} is null
 */
public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null) {
        throw new NullPointerException();
    }
    Callable<T> adapter = new RunnableAdapter<>(task, result);
    return adapter;
}
/**
 * Adapter that exposes a Runnable as a Callable: call() runs the task and
 * returns the result value fixed at construction time.
 */
private static final class RunnableAdapter<T> implements Callable<T> {
    private final Runnable task;
    private final T result;

    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }

    /** Runs the wrapped task, then returns the preset result. */
    @Override
    public T call() {
        task.run();
        return result;
    }

    /** Appends a description of the wrapped task to the default string form. */
    @Override
    public String toString() {
        String base = super.toString();
        return base + "[Wrapped task = " + task + "]";
    }
}
复制代码


取消任务


中断


public boolean cancel(boolean mayInterruptIfRunning) {    if (!(state == NEW && STATE.compareAndSet          (this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))        return false;    try {    // in case call to interrupt throws exception        if (mayInterruptIfRunning) {            try {                Thread t = runner;                if (t != null)                    t.interrupt();            } finally { // final state                STATE.setRelease(this, INTERRUPTED);            }        }    } finally {        finishCompletion();    }    return true;}
复制代码


获取结果


根据状态判断:任务尚未完成时,将当前线程加入等待队列并 park(挂起);任务完成或被取消后返回,等待期间线程被中断则抛出 InterruptedException


    public V get() throws InterruptedException, ExecutionException {        int s = state;        if (s <= COMPLETING)            s = awaitDone(false, 0L);        return report(s);    }    private int awaitDone(boolean timed, long nanos)        throws InterruptedException {        // The code below is very delicate, to achieve these goals:        // - call nanoTime exactly once for each call to park        // - if nanos <= 0L, return promptly without allocation or nanoTime        // - if nanos == Long.MIN_VALUE, don't underflow        // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic        //   and we suffer a spurious wakeup, we will do no worse than        //   to park-spin for a while        long startTime = 0L;    // Special value 0L means not yet parked        WaitNode q = null;        boolean queued = false;        for (;;) {            int s = state;            if (s > COMPLETING) {                if (q != null)                    q.thread = null;                return s;            }            else if (s == COMPLETING)                // We may have already promised (via isDone) that we are done                // so never return empty-handed or throw InterruptedException                Thread.yield();            else if (Thread.interrupted()) {                removeWaiter(q);                throw new InterruptedException();            }            else if (q == null) {                if (timed && nanos <= 0L)                    return s;                q = new WaitNode();            }            else if (!queued)                queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);            else if (timed) {                final long parkNanos;                if (startTime == 0L) { // first time                    startTime = System.nanoTime();                    if (startTime == 0L)                        startTime = 1L;                    parkNanos = nanos;                } else {                    long elapsed = System.nanoTime() 
- startTime;                    if (elapsed >= nanos) {                        removeWaiter(q);                        return state;                    }                    parkNanos = nanos - elapsed;                }                // nanoTime may be slow; recheck before parking                if (state < COMPLETING)                    LockSupport.parkNanos(this, parkNanos);            }            else                LockSupport.park(this);        }    }
复制代码


运行任务


public void run() {    if (state != NEW ||        !RUNNER.compareAndSet(this, null, Thread.currentThread()))        return;    try {        Callable<V> c = callable;        if (c != null && state == NEW) {            V result;            boolean ran;            try {                result = c.call();                ran = true;            } catch (Throwable ex) {                result = null;                ran = false;                setException(ex);            }            if (ran)                set(result);        }    } finally {        // runner must be non-null until state is settled to        // prevent concurrent calls to run()        runner = null;        // state must be re-read after nulling runner to prevent        // leaked interrupts        int s = state;        if (s >= INTERRUPTING)            handlePossibleCancellationInterrupt(s);    }}
//完成后,改变任务状态,设置返回结果,通知其它线程protected void set(V v) { if (STATE.compareAndSet(this, NEW, COMPLETING)) { outcome = v; STATE.setRelease(this, NORMAL); // final state finishCompletion(); }}private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (WAITERS.weakCompareAndSet(this, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } }
done();
callable = null; // to reduce footprint}
复制代码


实时内容请关注微信公众号,公众号与博客同时更新:程序员星星


发布于: 2021 年 03 月 10 日阅读数: 13
用户头像

徐海兴

关注

还未添加个人签名 2018.08.28 加入

还未添加个人简介

评论

发布
暂无评论
FutureTask源码解析