一、前言
FutureTask
继承自Runnable
,所以也可以实现异步执行的效果,但是和常规的异步执行方式不同,常规异步只要求异步的过程是正确的就可以了,而FutureTask
不仅可以知道异步执行的状态,还可以知道异步结果。那它是如何实现的呢?
FutureTask
在 JUC 中是一个比较重要的类:
它是ScheduledThreadPoolExecutor
内部类ScheduledFutureTask
的父类,ScheduledThreadPoolExecutor
实现延迟和周期性调度功能时调用的就是FutureTask
的函数。
再者线程池的抽象父类AbstractExecutorService
中有一个函数submit()
,也是可以提交任务异步执行,其内部通过将任务类包装成FutureTask
提交给工作线程异步执行,更是被“面试圣经”总结为是第三种实现线程的方式。这种说法是肤浅的,感知肤浅正是探索底层的开始。
//AbstractExecutorService#submit
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
复制代码
二、原理简述
FutureTask
作为Runnable
的子类,它就像是一个装饰器在Runnable
异步执行的功能上,又增加了可以获取异步执行状态以及结果的功能:
三、基本结构
FutureTask
实现了接口RunnableFuture
,RunnableFuture
继承了接口Runnable
和Future
。
1、基本定义
FutureTask
有 7 个状态:NEW
(新建)、COMPLETING
(正在完成)、NORMAL
(正常完成)、EXCEPTIONAL
(异常完成)、CANCELLED
(被取消)、INTERRUPTING
(正在中断)、INTERRUPTED
(被中断)。如下是状态流转:
NEW -> COMPLETING
:此时 get 会被阻塞,并将当前线程放入阻塞栈中。
NEW -> COMPLETING -> NORMAL
:此时outcome
是callable
的运行结果。
NEW -> COMPLETING -> EXCEPTIONAL
:此时outcome
是callable
的运行异常。
NEW -> CANCELLED
:调用了cancel(false)
取消任务,删除并唤醒所有等待的线程。
NEW -> INTERRUPTING -> INTERRUPTED
:调用了cancel(true)
中断任务,删除并唤醒所有等待的线程。
FutureTask
的两大特点阻塞和取消:
public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** The underlying callable; nulled out after running */
//底层调用的任务
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
//记录当前计算线程,用于取消时,中断计算。
private volatile Thread runner;
/** Treiber stack of waiting threads */
//等待栈
private volatile WaitNode waiters;
}
复制代码
2、构造函数
构造函数有两种,一种参数类型是Callable
,可直接赋值给成员变量callable
;另一种参数类型是Runnable
,并可传入一个result
,不过没什么用。
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
//设置进来的runnable 会被适配成callable(RunnableAdapter)
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
复制代码
Runnable
会被包装成一个实现了Callable
的RunnableAdapter
赋值给callable
。适配的过程也可以看出result
是怎么传进去就会怎么返回。
//Executors
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
//将Runnable适配成Callable
return new RunnableAdapter<T>(task, result);
}
//适配器、典型的适配器模式
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
//传入的result并没有做任何处理
return result;
}
}
复制代码
所以官方给的函数AbstractExecutorService#submit(java.lang.Runnable, T)
,也可以传入Runnable
类型的任务和result
,不过只能判断任务是否完成或者取消任务,外部get
的result
还是传进去的result
,没有太大的意义。
3、核心函数
(1)get 阻塞获取结果
get()
有两种,一种是会等待计算结束返回,一种是加了超时时长timeout
,get 线程等待timeout
时长,如果此时还没有完成就抛出异常TimeoutException
。
/**
* 会一直阻塞到计算结束,可以被打断
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
//运行未完成,放入等待栈中
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
//阻塞时间到,若还没有完成就抛异常
throw new TimeoutException();
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
//如果是正常结束,返回结果
return (V)x;
//取消或者中断的就抛出取消异常
if (s >= CANCELLED)
throw new CancellationException();
//其他状态的抛异常EXCEPTIONAL
throw new ExecutionException((Throwable)x);
}
复制代码
awaitDone 阻塞等待
阻塞的核心代码就是awaitDone
,get 时若正在计算,将会被放入等待栈阻塞,直到超时时间到,或者被唤醒,或者被中断,然后返回当前的状态。
当前 get 线程被打断,删除等待节点,并抛出InterruptedException
。
若此时s > COMPLETING
有可能完成、取消、中断,将等待节点(WaitNode
)的 thread 设置为 null,并返回当前状态。
若s == COMPLETING
说明正在完成,暂停当前 get 线程,让出对cpu
的占用,执行其他 get 线程。
若当前 get 线程还没有入等待栈,实例化一个WaitNode
,并cas
入栈。
最后阻塞,阻塞分为时间阻塞和永久阻塞,阻塞时间到,删除等待节点并返回当前状态。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
//循环阻塞,循环终止- 阻塞时间到,或者被唤醒,然后返回当前的状态
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
//完成,取消,中断
if (q != null)
q.thread = null;
//返回结果状态
return s;
}
else if (s == COMPLETING) // cannot time out yet
//正在完成,暂停当前线程,执行其他get线程
Thread.yield();
else if (q == null)
//NEW 状态,新建一个WaitNode
q = new WaitNode();
else if (!queued)
//cas入栈
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
//需要时间阻塞
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
//不需要等待,删除等待节点
removeWaiter(q);
return state;
}
//阻塞一段时间
LockSupport.parkNanos(this, nanos);
}
else
//timed false 会一直阻塞到 计算完成, 需要唤醒
LockSupport.park(this);
}
}
/**
* 栈,node.thread=null的节点会被删除
* @param node
*/
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) {
// restart on removeWaiter race
//循环删除node.thread = null的节点
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}
复制代码
(2)cancel 取消任务中断工作线程
调用 cancel()可能是被中断(mayInterruptIfRunning=true
)或者是主动取消(mayInterruptIfRunning=false
)。
从代码可以看出只有state
为NEW
才能被取消。
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
//cass设置 state为INTERRUPTING or CANCELLED
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
//状态没有修改成功返回false,取消失败
return false;
try { // in case call to interrupt throws exception
//如果是被打断的 就调用t.interrupt();中断线程
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
//中断当前线程
t.interrupt();
} finally { // final state
//设置state为中断
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//最后 删除并唤醒所有等待的线程
finishCompletion();
}
return true;
}
//删除并唤醒所有等待的线程
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
//循环cas WaitNode 为null,删除所有等待的线程
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
//循环唤醒唤醒
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
//最后了,break
break;
q.next = null; // unlink to help gc 设置为null 有利于gc
q = next;
}
break;
}
}
//钩子函数
done();
callable = null; // to reduce footprint
}
复制代码
(3)run 执行任务代码并唤醒所有等待线程
FutureTask
是会被传给ThreadPoolExecutor.Worker
,由线程池启动工作线程,然后调用FutureTask
的run()
函数,run()
又调用callable.call()
。run()
会将结果设置给outcome
。
public void run() {
//新任务-->执行UNSAFE.compareAndSwapObject(this, runnerOffset,
// null, Thread.currentThread()))
//新任务会将当前线程设置给runner 这里的作用是乐观锁,保证下面的执行流程是只有一个线程
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//调用callable的call,并设置返回值
//如果传进来的任务是Runnable,会被转换成callable
result = c.call();
//若运行异常,ran=false,异常会被捕获处理
//所以传进来的任务的run或者call代码块最好try-catch下
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
//设置异常给outcome
setException(ex);
}
if (ran)
//运行正常完成,设置结果给outcome
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
//最后 runner=null 相当于是释放锁
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
//如果状态是被打断,让出cpu
handlePossibleCancellationInterrupt(s);
}
}
复制代码
执行细节如下:
新任务CAS
设置当前线程给runner
,这里的作用是乐观锁保证下面的执行流程只有一个线程,并且外部可以通过runner
随时中断执行。
直接调用callable.call()
执行任务代码。
中途出现异常,将异常设置给outcome
,状态流转为异常完成,并唤醒所有等待的线程。
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//设置异常
outcome = t;
//设置异常状态
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
//唤醒所有线程
finishCompletion();
}
}
复制代码
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//设置为完成状态
outcome = v;
//设置正常状态
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
//完成操作-删除并唤醒所有等待的线程
finishCompletion();
}
}
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
//循环cas WaitNode 为null,删除所有等待的线程
//这里cas删除 其实是为了if里面的操作线程安全无锁
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
//循环唤醒唤醒
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
//最后了,break
break;
q.next = null; // unlink to help gc 设置为null 有利于gc
q = next;
}
break;
}
}
//钩子函数
done();
callable = null; // to reduce footprint
}
复制代码
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
if (s == INTERRUPTING)
while (state == INTERRUPTING)
//暂停当前线程,让出cpu时间片
Thread.yield(); // wait out pending interrupt
}
复制代码
(4)runAndReset 重复执行
runAndReset()
和run()
类似,但是没有把result
设置给outcome
,函数返回为boolean
,用于是否重复执行。
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
//如果c.call抛异常,将会被处理,但是没有打印堆栈,使用者不易排查
//不会再往下执行ran=false
//所以传进来的任务run里需要自己try-catch
ran = true;
} catch (Throwable ex) {
//设置异常给outcome
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
//如果状态是被打断,让出cpu
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
复制代码
(5)无锁化
FutureTask
的几个成员变量并没有使用悲观锁Lock
或者synchronized
,而是用了cas
乐观锁,而且也用了volatile
修饰,使得state
的流转,runner
的设置,waiters
的入栈出栈线程安全。
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
try {
//这不就是乐观锁吗 无锁化-牛逼
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}
复制代码
四、实际应用
FutureTask
可以配合线程池使用,也可以单独启动线程。
(1)配合线程池使用。
LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 5,
30, TimeUnit.SECONDS, workQueue);
Future<Integer> future = poolExecutor.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int i = 100;
int j = 100;
int sum = i + j;
Thread.sleep(2000);
return sum;
}
});
long s = System.currentTimeMillis();
//获得计算结果
Integer result = future.get();
long e = System.currentTimeMillis();
System.out.println("result=" + result + ",ms=" + (e-s));
//result=200,ms=2016
复制代码
(2)单独启动线程
List<FutureTask> taskList = new ArrayList<FutureTask>();
for (int i = 0; i < 3; i++) {
int j = i;
FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return j + 10;
}
});
Thread thread = new Thread(futureTask);
thread.start();
taskList.add(futureTask);
}
//批量获取结果
for (FutureTask futureTask : taskList) {
System.out.println(futureTask.get());
}
复制代码
需要注意:
无论是线程池 submit 提交任务还是批量启动多个线程使用FutureTask
,切不可在一个 for 循环里一边异步执行一边获取结果,这样会使得整个过程因为阻塞获取结果变成单线程。
应该批量提交任务,批量获取结果。
五、总结
FutureTask
的特点异步执行、阻塞获取结果、可取消。
FutureTask
运用了装饰器和适配器模式。装饰器的体现是在Runnable
异步执行的基础上增加了异步阻塞获取结果和取消的功能;适配器的体现是当传入的是任务是Runnable
类型时会被适配成一个实现了Callable
的RunnableAdapter
。
callable
是FutureTask
的成员变量,无法单独实现线程,配合FutureTask
使用最佳。
任务正常完成outcome
的值是callable.run
的运行结果;任务异常完成outcome
是异常。
成员变量runner
的作用,可在外围中断取消正在执行的任务,也有保证任务执行线程安全。
成员变量waiters
,get
获取结果当未完成前一个个入栈阻塞,完成时全部出栈唤醒。
因为run
和runAndReset
对异常不作为,建议任务代码自行try-catch
。
PS: 如若文章中有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。我是徐同学,愿与你共同进步!
评论