写点什么

FutureTask 源码解读,阻塞获取异步计算结果(阻塞、取消、装饰器、适配器、Callable)

用户头像
徐同学呀
关注
发布于: 2021 年 04 月 17 日

一、前言

FutureTask 继承自Runnable,所以也可以实现异步执行的效果,但是和常规的异步执行方式不同,常规异步只要求异步的过程是正确的就可以了,而FutureTask不仅可以知道异步执行的状态,还可以知道异步结果。那它是如何实现的呢?


FutureTask在 JUC 中是一个比较重要的类:


  • 它是ScheduledThreadPoolExecutor内部类ScheduledFutureTask的父类,ScheduledThreadPoolExecutor实现延迟和周期性调度功能时调用的就是FutureTask的函数。

  • 再者线程池的抽象父类AbstractExecutorService中有一个函数submit(),也是可以提交任务异步执行,其内部通过将任务类包装成FutureTask提交给工作线程异步执行,更是被“面试圣经”总结为是第三种实现线程的方式。这种说法是肤浅的,感知肤浅正是探索底层的开始。


//AbstractExecutorService#submitpublic <T> Future<T> submit(Callable<T> task) {    if (task == null) throw new NullPointerException();    RunnableFuture<T> ftask = newTaskFor(task);    execute(ftask);    return ftask;}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable);}
复制代码

二、原理简述

FutureTask作为Runnable的子类,它就像是一个装饰器在Runnable异步执行的功能上,又增加了可以获取异步执行状态以及结果的功能:


  • 其内部维护了一个Callable类型的成员变量,任务代码会包装成callableFutureTask直接调用Callable.call()执行任务代码并返回结果。

  • 还维护了一个链表实现的栈,外部获取结果的线程在任务没有执行完前都会被压入栈并阻塞(awaitDone),任务完成唤醒所有阻塞线程(finishCompletion)。

三、基本结构

FutureTask实现了接口RunnableFutureRunnableFuture继承了接口RunnableFuture


1、基本定义

FutureTask有 7 个状态:NEW(新建)、COMPLETING(正在完成)、NORMAL(正常完成)、EXCEPTIONAL(异常完成)、CANCELLED(被取消)、INTERRUPTING(正在中断)、INTERRUPTED(被中断)。如下是状态流转:


  • NEW -> COMPLETING :此时 get 会被阻塞,并将当前线程放入阻塞栈中。

  • NEW -> COMPLETING -> NORMAL :此时outcomecallable的运行结果。

  • NEW -> COMPLETING -> EXCEPTIONAL :此时outcomecallable的运行异常。

  • NEW -> CANCELLED :调用了cancel(false)取消任务,删除并唤醒所有等待的线程。

  • NEW -> INTERRUPTING -> INTERRUPTED :调用了cancel(true)中断任务,删除并唤醒所有等待的线程。


FutureTask的两大特点阻塞和取消:


  • 阻塞是由waiters属性实现,它是一个由链表实现的栈,先进后出。

  • 取消是由runner属性实现,runnerCAS的方式记录当前运行线程,运行完成会再次设置为 null,运行过程中取消,将调用runner.interrupt()中断运行线程,在线程池中取消是中断工作线程。


public class FutureTask<V> implements RunnableFuture<V> {        private volatile int state;    private static final int NEW          = 0;    private static final int COMPLETING   = 1;    private static final int NORMAL       = 2;    private static final int EXCEPTIONAL  = 3;    private static final int CANCELLED    = 4;    private static final int INTERRUPTING = 5;    private static final int INTERRUPTED  = 6;
/** The underlying callable; nulled out after running */ //底层调用的任务 private Callable<V> callable; /** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes //记录当前计算线程,用于取消时,中断计算。 private volatile Thread runner; /** Treiber stack of waiting threads */ //等待栈 private volatile WaitNode waiters;}
复制代码

2、构造函数

构造函数有两种,一种参数类型是Callable,可直接赋值给成员变量callable;另一种参数类型是Runnable,并可传入一个result,不过没什么用。


public FutureTask(Callable<V> callable) {    if (callable == null)        throw new NullPointerException();    this.callable = callable;    this.state = NEW;       // ensure visibility of callable}
public FutureTask(Runnable runnable, V result) { //设置进来的runnable 会被适配成callable(RunnableAdapter) this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable}
复制代码


Runnable会被包装成一个实现了CallableRunnableAdapter赋值给callable。适配的过程也可以看出result是怎么传进去就会怎么返回。


//Executorspublic static <T> Callable<T> callable(Runnable task, T result) {    if (task == null)        throw new NullPointerException();    //将Runnable适配成Callable    return new RunnableAdapter<T>(task, result);}//适配器、典型的适配器模式static final class RunnableAdapter<T> implements Callable<T> {        final Runnable task;        final T result;        RunnableAdapter(Runnable task, T result) {            this.task = task;            this.result = result;        }        public T call() {            task.run();            //传入的result并没有做任何处理            return result;        }}
复制代码


所以官方给的函数AbstractExecutorService#submit(java.lang.Runnable, T),也可以传入Runnable类型的任务和result,不过只能判断任务是否完成或者取消任务,外部getresult还是传进去的result,没有太大的意义。

3、核心函数

(1)get 阻塞获取结果

get()有两种,一种是会等待计算结束返回,一种是加了超时时长timeout,get 线程等待timeout时长,如果此时还没有完成就抛出异常TimeoutException


/** * 会一直阻塞到计算结束,可以被打断 * @throws CancellationException {@inheritDoc} */public V get() throws InterruptedException, ExecutionException {    int s = state;    if (s <= COMPLETING)        //运行未完成,放入等待栈中        s = awaitDone(false, 0L);    return report(s);}
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) //阻塞时间到,若还没有完成就抛异常 throw new TimeoutException(); return report(s);}
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) //如果是正常结束,返回结果 return (V)x; //取消或者中断的就抛出取消异常 if (s >= CANCELLED) throw new CancellationException(); //其他状态的抛异常EXCEPTIONAL throw new ExecutionException((Throwable)x);}
复制代码
awaitDone 阻塞等待

阻塞的核心代码就是awaitDone,get 时若正在计算,将会被放入等待栈阻塞,直到超时时间到,或者被唤醒,或者被中断,然后返回当前的状态。


  1. 当前 get 线程被打断,删除等待节点,并抛出InterruptedException

  2. 若此时s > COMPLETING有可能完成、取消、中断,将等待节点(WaitNode)的 thread 设置为 null,并返回当前状态。

  3. s == COMPLETING说明正在完成,暂停当前 get 线程,让出对cpu的占用,执行其他 get 线程。

  4. 若当前 get 线程还没有入等待栈,实例化一个WaitNode,并cas入栈。

  5. 最后阻塞,阻塞分为时间阻塞和永久阻塞,阻塞时间到,删除等待节点并返回当前状态。


private int awaitDone(boolean timed, long nanos)    throws InterruptedException {    final long deadline = timed ? System.nanoTime() + nanos : 0L;    WaitNode q = null;    boolean queued = false;    //循环阻塞,循环终止- 阻塞时间到,或者被唤醒,然后返回当前的状态    for (;;) {        if (Thread.interrupted()) {            removeWaiter(q);            throw new InterruptedException();        }
int s = state; if (s > COMPLETING) { //完成,取消,中断 if (q != null) q.thread = null; //返回结果状态 return s; } else if (s == COMPLETING) // cannot time out yet //正在完成,暂停当前线程,执行其他get线程 Thread.yield(); else if (q == null) //NEW 状态,新建一个WaitNode q = new WaitNode(); else if (!queued) //cas入栈 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { //需要时间阻塞 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { //不需要等待,删除等待节点 removeWaiter(q); return state; } //阻塞一段时间 LockSupport.parkNanos(this, nanos); } else //timed false 会一直阻塞到 计算完成, 需要唤醒 LockSupport.park(this); }}
/** * 栈,node.thread=null的节点会被删除 * @param node */private void removeWaiter(WaitNode node) { if (node != null) { node.thread = null; retry: for (;;) { // restart on removeWaiter race //循环删除node.thread = null的节点 for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; if (q.thread != null) pred = q; else if (pred != null) { pred.next = s; if (pred.thread == null) // check for race continue retry; } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } }}
复制代码

(2)cancel 取消任务中断工作线程

调用 cancel()可能是被中断(mayInterruptIfRunning=true)或者是主动取消(mayInterruptIfRunning=false)。


  • 中断取消,设置stateINTERRUPTING,并调用runner.interrupt()中断当前运行线程,设置stateINTERRUPTED,最后清空等待栈中所有阻塞节点并唤醒所有等待的线程。

  • 主动取消,设置stateCANCELLED,最后清空等待栈中所有阻塞节点并唤醒所有等待的线程。


从代码可以看出只有stateNEW才能被取消。


public boolean cancel(boolean mayInterruptIfRunning) {    if (!(state == NEW &&            //cass设置 state为INTERRUPTING or CANCELLED          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))        //状态没有修改成功返回false,取消失败        return false;    try {    // in case call to interrupt throws exception        //如果是被打断的  就调用t.interrupt();中断线程        if (mayInterruptIfRunning) {            try {                Thread t = runner;                if (t != null)                    //中断当前线程                    t.interrupt();            } finally { // final state                //设置state为中断                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);            }        }    } finally {        //最后  删除并唤醒所有等待的线程        finishCompletion();    }    return true;}//删除并唤醒所有等待的线程private void finishCompletion() {    // assert state > COMPLETING;    for (WaitNode q; (q = waiters) != null;) {        //循环cas WaitNode 为null,删除所有等待的线程        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {            for (;;) {                Thread t = q.thread;                if (t != null) {                    q.thread = null;                    //循环唤醒唤醒                    LockSupport.unpark(t);                }                WaitNode next = q.next;                if (next == null)                    //最后了,break                    break;                q.next = null; // unlink to help gc 设置为null  有利于gc                q = next;            }            break;        }    }    //钩子函数    done();    callable = null;        // to reduce footprint}
复制代码

(3)run 执行任务代码并唤醒所有等待线程

FutureTask是会被传给ThreadPoolExecutor.Worker,由线程池启动工作线程,然后调用FutureTaskrun()函数,run()又调用callable.call()run()会将结果设置给outcome


public void run() {    //新任务-->执行UNSAFE.compareAndSwapObject(this, runnerOffset,    //                                         null, Thread.currentThread()))    //新任务会将当前线程设置给runner 这里的作用是乐观锁,保证下面的执行流程是只有一个线程    if (state != NEW ||        !UNSAFE.compareAndSwapObject(this, runnerOffset,                                     null, Thread.currentThread()))        return;    try {        Callable<V> c = callable;        if (c != null && state == NEW) {            V result;            boolean ran;            try {                //调用callable的call,并设置返回值                //如果传进来的任务是Runnable,会被转换成callable                result = c.call();                //若运行异常,ran=false,异常会被捕获处理                //所以传进来的任务的run或者call代码块最好try-catch下                ran = true;            } catch (Throwable ex) {                result = null;                ran = false;                //设置异常给outcome                setException(ex);            }            if (ran)                //运行正常完成,设置结果给outcome                set(result);        }    } finally {        // runner must be non-null until state is settled to        // prevent concurrent calls to run()        //最后 runner=null 相当于是释放锁        runner = null;        // state must be re-read after nulling runner to prevent        // leaked interrupts        int s = state;        if (s >= INTERRUPTING)            //如果状态是被打断,让出cpu            handlePossibleCancellationInterrupt(s);    }}
复制代码


执行细节如下:


  • 新任务CAS设置当前线程给runner,这里的作用是乐观锁保证下面的执行流程只有一个线程,并且外部可以通过runner 随时中断执行。

  • 直接调用callable.call()执行任务代码。

  • 中途出现异常,将异常设置给outcome,状态流转为异常完成,并唤醒所有等待的线程。


protected void setException(Throwable t) {    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {        //设置异常        outcome = t;        //设置异常状态        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state        //唤醒所有线程        finishCompletion();    }}
复制代码


  • 正常运行完毕将运行结果result设置给outcome,状态流转为正常完成,并唤醒所有等待的线程。


protected void set(V v) {    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {        //设置为完成状态        outcome = v;        //设置正常状态        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state        //完成操作-删除并唤醒所有等待的线程        finishCompletion();    }}
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { //循环cas WaitNode 为null,删除所有等待的线程 //这里cas删除 其实是为了if里面的操作线程安全无锁 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; //循环唤醒唤醒 LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) //最后了,break break; q.next = null; // unlink to help gc 设置为null 有利于gc q = next; } break; } } //钩子函数 done();
callable = null; // to reduce footprint}
复制代码


  • 最终runner设置为null,相当于释放锁;如若此时状态为被打断状态INTERRUPTING,需要让出cpu


private void handlePossibleCancellationInterrupt(int s) {    // It is possible for our interrupter to stall before getting a    // chance to interrupt us.  Let's spin-wait patiently.    if (s == INTERRUPTING)        while (state == INTERRUPTING)            //暂停当前线程,让出cpu时间片            Thread.yield(); // wait out pending interrupt}
复制代码

(4)runAndReset 重复执行

runAndReset()run()类似,但是没有把result设置给outcome,函数返回为boolean,用于是否重复执行。


protected boolean runAndReset() {    if (state != NEW ||        !UNSAFE.compareAndSwapObject(this, runnerOffset,                                     null, Thread.currentThread()))        return false;    boolean ran = false;    int s = state;    try {        Callable<V> c = callable;        if (c != null && s == NEW) {            try {                c.call(); // don't set result                //如果c.call抛异常,将会被处理,但是没有打印堆栈,使用者不易排查                //不会再往下执行ran=false                //所以传进来的任务run里需要自己try-catch                ran = true;            } catch (Throwable ex) {                //设置异常给outcome                setException(ex);            }        }    } finally {        // runner must be non-null until state is settled to        // prevent concurrent calls to run()        runner = null;        // state must be re-read after nulling runner to prevent        // leaked interrupts        s = state;        if (s >= INTERRUPTING)            //如果状态是被打断,让出cpu            handlePossibleCancellationInterrupt(s);    }    return ran && s == NEW;}
复制代码

(5)无锁化

FutureTask的几个成员变量并没有使用悲观锁Lock或者synchronized,而是用了cas乐观锁,而且也用了volatile修饰,使得state的流转,runner的设置,waiters的入栈出栈线程安全。


private static final sun.misc.Unsafe UNSAFE;private static final long stateOffset;private static final long runnerOffset;private static final long waitersOffset;static {    try {        //这不就是乐观锁吗 无锁化-牛逼        UNSAFE = sun.misc.Unsafe.getUnsafe();        Class<?> k = FutureTask.class;        stateOffset = UNSAFE.objectFieldOffset            (k.getDeclaredField("state"));        runnerOffset = UNSAFE.objectFieldOffset            (k.getDeclaredField("runner"));        waitersOffset = UNSAFE.objectFieldOffset            (k.getDeclaredField("waiters"));    } catch (Exception e) {        throw new Error(e);    }}
复制代码

四、实际应用

FutureTask可以配合线程池使用,也可以单独启动线程。


(1)配合线程池使用。


LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 5,        30, TimeUnit.SECONDS, workQueue);Future<Integer> future = poolExecutor.submit(new Callable<Integer>() {    @Override    public Integer call() throws Exception {        int i = 100;        int j = 100;        int sum = i + j;        Thread.sleep(2000);        return sum;    }});long s = System.currentTimeMillis();//获得计算结果Integer result = future.get();long e = System.currentTimeMillis();System.out.println("result=" + result + ",ms=" + (e-s));//result=200,ms=2016
复制代码


(2)单独启动线程


List<FutureTask> taskList = new ArrayList<FutureTask>();for (int i = 0; i < 3; i++) {    int j = i;    FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {        @Override        public Integer call() throws Exception {            return j + 10;        }    });    Thread thread = new Thread(futureTask);    thread.start();    taskList.add(futureTask);}//批量获取结果for (FutureTask futureTask : taskList) {    System.out.println(futureTask.get());}
复制代码


需要注意:


无论是线程池 submit 提交任务还是批量启动多个线程使用FutureTask,切不可在一个 for 循环里一边异步执行一边获取结果,这样会使得整个过程因为阻塞获取结果变成单线程。


应该批量提交任务,批量获取结果

五、总结

  1. FutureTask的特点异步执行、阻塞获取结果、可取消。

  2. FutureTask运用了装饰器和适配器模式。装饰器的体现是在Runnable异步执行的基础上增加了异步阻塞获取结果和取消的功能;适配器的体现是当传入的是任务是Runnable类型时会被适配成一个实现了CallableRunnableAdapter

  3. callableFutureTask的成员变量,无法单独实现线程,配合FutureTask使用最佳。

  4. 任务正常完成outcome的值是callable.run的运行结果;任务异常完成outcome是异常。

  5. 成员变量runner的作用,可在外围中断取消正在执行的任务,也有保证任务执行线程安全。

  6. 成员变量waitersget获取结果当未完成前一个个入栈阻塞,完成时全部出栈唤醒。

  7. 因为runrunAndReset对异常不作为,建议任务代码自行try-catch


PS: 如若文章中有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。我是徐同学,愿与你共同进步!


发布于: 2021 年 04 月 17 日阅读数: 11
用户头像

徐同学呀

关注

公众号:徐同学呀 2018.09.24 加入

专注于源码分析及Java底层架构开发领域。持续改进,坦诚合作!我是徐同学,愿与你共同进步!

评论

发布
暂无评论
FutureTask源码解读,阻塞获取异步计算结果(阻塞、取消、装饰器、适配器、Callable)