一、前言
FutureTask 继承自Runnable,所以也可以实现异步执行的效果,但是和常规的异步执行方式不同,常规异步只要求异步的过程是正确的就可以了,而FutureTask不仅可以知道异步执行的状态,还可以知道异步结果。那它是如何实现的呢?
FutureTask在 JUC 中是一个比较重要的类:
它是ScheduledThreadPoolExecutor内部类ScheduledFutureTask的父类,ScheduledThreadPoolExecutor实现延迟和周期性调度功能时调用的就是FutureTask的函数。
再者线程池的抽象父类AbstractExecutorService中有一个函数submit(),也是可以提交任务异步执行,其内部通过将任务类包装成FutureTask提交给工作线程异步执行,更是被“面试圣经”总结为是第三种实现线程的方式。这种说法是肤浅的,感知肤浅正是探索底层的开始。
//AbstractExecutorService#submitpublic <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask;}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable);}
复制代码
二、原理简述
FutureTask作为Runnable的子类,它就像是一个装饰器在Runnable异步执行的功能上,又增加了可以获取异步执行状态以及结果的功能:
三、基本结构
FutureTask实现了接口RunnableFuture,RunnableFuture继承了接口Runnable和Future。
1、基本定义
FutureTask有 7 个状态:NEW(新建)、COMPLETING(正在完成)、NORMAL(正常完成)、EXCEPTIONAL(异常完成)、CANCELLED(被取消)、INTERRUPTING(正在中断)、INTERRUPTED(被中断)。如下是状态流转:
NEW -> COMPLETING :此时 get 会被阻塞,并将当前线程放入阻塞栈中。
NEW -> COMPLETING -> NORMAL :此时outcome是callable的运行结果。
NEW -> COMPLETING -> EXCEPTIONAL :此时outcome是callable的运行异常。
NEW -> CANCELLED :调用了cancel(false)取消任务,删除并唤醒所有等待的线程。
NEW -> INTERRUPTING -> INTERRUPTED :调用了cancel(true)中断任务,删除并唤醒所有等待的线程。
FutureTask的两大特点阻塞和取消:
public class FutureTask<V> implements RunnableFuture<V> { private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
/** The underlying callable; nulled out after running */ //底层调用的任务 private Callable<V> callable; /** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes //记录当前计算线程,用于取消时,中断计算。 private volatile Thread runner; /** Treiber stack of waiting threads */ //等待栈 private volatile WaitNode waiters;}
复制代码
2、构造函数
构造函数有两种,一种参数类型是Callable,可直接赋值给成员变量callable;另一种参数类型是Runnable,并可传入一个result,不过没什么用。
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable}
public FutureTask(Runnable runnable, V result) { //设置进来的runnable 会被适配成callable(RunnableAdapter) this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable}
复制代码
Runnable会被包装成一个实现了Callable的RunnableAdapter赋值给callable。适配的过程也可以看出result是怎么传进去就会怎么返回。
//Executorspublic static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); //将Runnable适配成Callable return new RunnableAdapter<T>(task, result);}//适配器、典型的适配器模式static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); //传入的result并没有做任何处理 return result; }}
复制代码
所以官方给的函数AbstractExecutorService#submit(java.lang.Runnable, T),也可以传入Runnable类型的任务和result,不过只能判断任务是否完成或者取消任务,外部get的result还是传进去的result,没有太大的意义。
3、核心函数
(1)get 阻塞获取结果
get()有两种,一种是会等待计算结束返回,一种是加了超时时长timeout,get 线程等待timeout时长,如果此时还没有完成就抛出异常TimeoutException。
/** * 会一直阻塞到计算结束,可以被打断 * @throws CancellationException {@inheritDoc} */public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) //运行未完成,放入等待栈中 s = awaitDone(false, 0L); return report(s);}
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) //阻塞时间到,若还没有完成就抛异常 throw new TimeoutException(); return report(s);}
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) //如果是正常结束,返回结果 return (V)x; //取消或者中断的就抛出取消异常 if (s >= CANCELLED) throw new CancellationException(); //其他状态的抛异常EXCEPTIONAL throw new ExecutionException((Throwable)x);}
复制代码
awaitDone 阻塞等待
阻塞的核心代码就是awaitDone,get 时若正在计算,将会被放入等待栈阻塞,直到超时时间到,或者被唤醒,或者被中断,然后返回当前的状态。
当前 get 线程被打断,删除等待节点,并抛出InterruptedException。
若此时s > COMPLETING有可能完成、取消、中断,将等待节点(WaitNode)的 thread 设置为 null,并返回当前状态。
若s == COMPLETING说明正在完成,暂停当前 get 线程,让出对cpu的占用,执行其他 get 线程。
若当前 get 线程还没有入等待栈,实例化一个WaitNode,并cas入栈。
最后阻塞,阻塞分为时间阻塞和永久阻塞,阻塞时间到,删除等待节点并返回当前状态。
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; //循环阻塞,循环终止- 阻塞时间到,或者被唤醒,然后返回当前的状态 for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); }
int s = state; if (s > COMPLETING) { //完成,取消,中断 if (q != null) q.thread = null; //返回结果状态 return s; } else if (s == COMPLETING) // cannot time out yet //正在完成,暂停当前线程,执行其他get线程 Thread.yield(); else if (q == null) //NEW 状态,新建一个WaitNode q = new WaitNode(); else if (!queued) //cas入栈 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { //需要时间阻塞 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { //不需要等待,删除等待节点 removeWaiter(q); return state; } //阻塞一段时间 LockSupport.parkNanos(this, nanos); } else //timed false 会一直阻塞到 计算完成, 需要唤醒 LockSupport.park(this); }}
/** * 栈,node.thread=null的节点会被删除 * @param node */private void removeWaiter(WaitNode node) { if (node != null) { node.thread = null; retry: for (;;) { // restart on removeWaiter race //循环删除node.thread = null的节点 for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; if (q.thread != null) pred = q; else if (pred != null) { pred.next = s; if (pred.thread == null) // check for race continue retry; } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } }}
复制代码
(2)cancel 取消任务中断工作线程
调用 cancel()可能是被中断(mayInterruptIfRunning=true)或者是主动取消(mayInterruptIfRunning=false)。
从代码可以看出只有state为NEW才能被取消。
public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && //cass设置 state为INTERRUPTING or CANCELLED UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) //状态没有修改成功返回false,取消失败 return false; try { // in case call to interrupt throws exception //如果是被打断的 就调用t.interrupt();中断线程 if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) //中断当前线程 t.interrupt(); } finally { // final state //设置state为中断 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { //最后 删除并唤醒所有等待的线程 finishCompletion(); } return true;}//删除并唤醒所有等待的线程private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { //循环cas WaitNode 为null,删除所有等待的线程 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; //循环唤醒唤醒 LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) //最后了,break break; q.next = null; // unlink to help gc 设置为null 有利于gc q = next; } break; } } //钩子函数 done(); callable = null; // to reduce footprint}
复制代码
(3)run 执行任务代码并唤醒所有等待线程
FutureTask是会被传给ThreadPoolExecutor.Worker,由线程池启动工作线程,然后调用FutureTask的run()函数,run()又调用callable.call()。run()会将结果设置给outcome。
public void run() { //新任务-->执行UNSAFE.compareAndSwapObject(this, runnerOffset, // null, Thread.currentThread())) //新任务会将当前线程设置给runner 这里的作用是乐观锁,保证下面的执行流程是只有一个线程 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { //调用callable的call,并设置返回值 //如果传进来的任务是Runnable,会被转换成callable result = c.call(); //若运行异常,ran=false,异常会被捕获处理 //所以传进来的任务的run或者call代码块最好try-catch下 ran = true; } catch (Throwable ex) { result = null; ran = false; //设置异常给outcome setException(ex); } if (ran) //运行正常完成,设置结果给outcome set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() //最后 runner=null 相当于是释放锁 runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) //如果状态是被打断,让出cpu handlePossibleCancellationInterrupt(s); }}
复制代码
执行细节如下:
新任务CAS设置当前线程给runner,这里的作用是乐观锁保证下面的执行流程只有一个线程,并且外部可以通过runner 随时中断执行。
直接调用callable.call()执行任务代码。
中途出现异常,将异常设置给outcome,状态流转为异常完成,并唤醒所有等待的线程。
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { //设置异常 outcome = t; //设置异常状态 UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state //唤醒所有线程 finishCompletion(); }}
复制代码
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { //设置为完成状态 outcome = v; //设置正常状态 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state //完成操作-删除并唤醒所有等待的线程 finishCompletion(); }}
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { //循环cas WaitNode 为null,删除所有等待的线程 //这里cas删除 其实是为了if里面的操作线程安全无锁 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; //循环唤醒唤醒 LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) //最后了,break break; q.next = null; // unlink to help gc 设置为null 有利于gc q = next; } break; } } //钩子函数 done();
callable = null; // to reduce footprint}
复制代码
private void handlePossibleCancellationInterrupt(int s) { // It is possible for our interrupter to stall before getting a // chance to interrupt us. Let's spin-wait patiently. if (s == INTERRUPTING) while (state == INTERRUPTING) //暂停当前线程,让出cpu时间片 Thread.yield(); // wait out pending interrupt}
复制代码
(4)runAndReset 重复执行
runAndReset()和run()类似,但是没有把result设置给outcome,函数返回为boolean,用于是否重复执行。
protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result //如果c.call抛异常,将会被处理,但是没有打印堆栈,使用者不易排查 //不会再往下执行ran=false //所以传进来的任务run里需要自己try-catch ran = true; } catch (Throwable ex) { //设置异常给outcome setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) //如果状态是被打断,让出cpu handlePossibleCancellationInterrupt(s); } return ran && s == NEW;}
复制代码
(5)无锁化
FutureTask的几个成员变量并没有使用悲观锁Lock或者synchronized,而是用了cas乐观锁,而且也用了volatile修饰,使得state的流转,runner的设置,waiters的入栈出栈线程安全。
private static final sun.misc.Unsafe UNSAFE;private static final long stateOffset;private static final long runnerOffset;private static final long waitersOffset;static { try { //这不就是乐观锁吗 无锁化-牛逼 UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = FutureTask.class; stateOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("state")); runnerOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("runner")); waitersOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("waiters")); } catch (Exception e) { throw new Error(e); }}
复制代码
四、实际应用
FutureTask可以配合线程池使用,也可以单独启动线程。
(1)配合线程池使用。
LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 5, 30, TimeUnit.SECONDS, workQueue);Future<Integer> future = poolExecutor.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { int i = 100; int j = 100; int sum = i + j; Thread.sleep(2000); return sum; }});long s = System.currentTimeMillis();//获得计算结果Integer result = future.get();long e = System.currentTimeMillis();System.out.println("result=" + result + ",ms=" + (e-s));//result=200,ms=2016
复制代码
(2)单独启动线程
List<FutureTask> taskList = new ArrayList<FutureTask>();for (int i = 0; i < 3; i++) { int j = i; FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() { @Override public Integer call() throws Exception { return j + 10; } }); Thread thread = new Thread(futureTask); thread.start(); taskList.add(futureTask);}//批量获取结果for (FutureTask futureTask : taskList) { System.out.println(futureTask.get());}
复制代码
需要注意:
无论是线程池 submit 提交任务还是批量启动多个线程使用FutureTask,切不可在一个 for 循环里一边异步执行一边获取结果,这样会使得整个过程因为阻塞获取结果变成单线程。
应该批量提交任务,批量获取结果。
五、总结
FutureTask的特点异步执行、阻塞获取结果、可取消。
FutureTask运用了装饰器和适配器模式。装饰器的体现是在Runnable异步执行的基础上增加了异步阻塞获取结果和取消的功能;适配器的体现是当传入的是任务是Runnable类型时会被适配成一个实现了Callable的RunnableAdapter。
callable是FutureTask的成员变量,无法单独实现线程,配合FutureTask使用最佳。
任务正常完成outcome的值是callable.run的运行结果;任务异常完成outcome是异常。
成员变量runner的作用,可在外围中断取消正在执行的任务,也有保证任务执行线程安全。
成员变量waiters,get获取结果当未完成前一个个入栈阻塞,完成时全部出栈唤醒。
因为run和runAndReset对异常不作为,建议任务代码自行try-catch。
PS: 如若文章中有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。我是徐同学,愿与你共同进步!
评论