写点什么

并发编程 -FutureTask 解析 | 京东物流技术团队

  • 2023-07-27
    北京
  • 本文字数:6081 字

    阅读完需:约 20 分钟

并发编程-FutureTask解析 | 京东物流技术团队

1、FutureTask 对象介绍

Future 对象大家都不陌生,是 JDK1.5 提供的接口,是用来以阻塞的方式获取线程异步执行完的结果。


在 Java 中想要通过线程执行一个任务,离不开 Runnable 与 Callable 这两个接口。


Runnable 与 Callable 的区别在于,Runnable 接口只有一个 run 方法,该方法用来执行逻辑,但是并没有返回值;而 Callable 的 call 方法,同样用来执行业务逻辑,但是是有一个返回值的。


Callable 执行任务过程中可以通过 FutureTask 获得任务的执行状态,并且可以在执行完成后通过 Future.get()方式获取执行结果。


Future 是一个接口,而 FutureTask 就是 Future 的实现类。并且 FutureTask 实现了 RunnableFuture(Runnable + Future),说明我们可以创建一个 FutureTask 并直接把它放到线程池执行,然后获取 FutureTask 的执行结果。

2、FutureTask 源码解析

2.1 主要方法和属性

那么 FutureTask 是如何通过阻塞的方式来获取到异步线程执行的结果的呢?我们看下 FutureTask 中的属性。


// FutureTask的状态及其常量private volatile int state;    private static final int NEW          = 0;    private static final int COMPLETING   = 1;    private static final int NORMAL       = 2;    private static final int EXCEPTIONAL  = 3;    private static final int CANCELLED    = 4;    private static final int INTERRUPTING = 5;    private static final int INTERRUPTED  = 6;        // callable对象,执行完后置空    private Callable<V> callable;    // 要返回的结果或要引发的异常来自 get() 方法    private Object outcome; // non-volatile, protected by state reads/writes    // 执行Callable的线程    private volatile Thread runner;    // 等待线程的一个链表结构    private volatile WaitNode waiters;
复制代码


FutureTask 中几个比较重要的方法。


// 取消任务的执行boolean cancel(boolean mayInterruptIfRunning);// 返回任务是否已经被取消boolean isCancelled();// 返回任务是否已经完成,任务状态不为NEW即为完成boolean isDone();// 通过get方法获取任务的执行结果V get() throws InterruptedException, ExecutionException;// 通过get方法获取任务的执行结果,带有超时,如果超过给定时间则抛出异常V get(long timeout, TimeUnit unit)        throws InterruptedException, ExecutionException, TimeoutException;
复制代码

2.2 FutureTask 执行

当我们在线程池中执行一个 Callable 方法时,其实是将 Callable 任务封装成一个 RunnableFuture 对象去执行,同时将这个 RunnableFuture 对象返回,这样我们就拿到了 FutureTask 的引用,可以随时获取到任务执行的状态,并且可以在任务执行完成后通过该对象获取执行结果。


以下为 ThreadPoolExecutor 线程池提交一个 callable 方法的源码。


public <T> Future<T> submit(Callable<T> task) {        if (task == null) throw new NullPointerException();        RunnableFuture<T> ftask = newTaskFor(task);        execute(ftask);        return ftask;    }    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {        return new FutureTask<T>(callable);    }
复制代码

2.3 run 方法介绍

RunnableFuture 其实也是一个可以执行的 runnable,我们看下他的 run 方法。其主要流程就是执行 call 方法,正常执行完毕后将 result 结果赋值到 outcome 属性上。


public void run() {        if (state != NEW ||            !UNSAFE.compareAndSwapObject(this, runnerOffset,                                         null, Thread.currentThread()))            return;        try {            // 将callable赋值到本地变量            Callable<V> c = callable;            // 判断callable不为空并且FutureTask的状态必须为新创建            if (c != null && state == NEW) {                V result;                boolean ran;                try {                    // 执行call方法(用户自己实现的call逻辑),并获取到result结果                    result = c.call();                    ran = true;                } catch (Throwable ex) {                    result = null;                    ran = false;                    // 如果执行过程出现异常,则将异常对象赋值到outcome上                    setException(ex);                }                // 如果正常执行完毕,则将result赋值到outcome属性上                if (ran)                    set(result);            }        } finally {            // runner must be non-null until state is settled to            // prevent concurrent calls to run()            runner = null;            // state must be re-read after nulling runner to prevent            // leaked interrupts            int s = state;            if (s >= INTERRUPTING)                handlePossibleCancellationInterrupt(s);        }    }
复制代码


以下逻辑为正常执行完成后赋值的逻辑。


// 如果任务没有被取消,将future执行完的返回值赋值给result结果// FutureTask任务的执行状态是通过CAS的方式进行赋值的,并且由此可知,COMPLETING其实是一个瞬时状态// 当将线程执行结果赋值给outcome后,状态会修改为对应的NORMAL,即正常结束protected void set(V v) {        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {            outcome = v;            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state            finishCompletion();        }    }
复制代码


以下为执行异常时赋值逻辑,直接将 Throwable 对象赋值到 outcome 属性上。


protected void setException(Throwable t) {        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {            outcome = t;            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state            finishCompletion();        }    }
复制代码


无论是正常执行还是异常执行,最终都会调用一个 finishCompletion 方法,用来做工作的收尾工作。

2.4 get 方法介绍

Future 的 get 方法有两个重载的方法,一个是 get()获取结果,一个是 get(long, TimeUnit)带有超时时间的获取结果,我们看下 FutureTask 中的这两个方法是如何实现的。


// 不带有超时时间,一直阻塞直到获取结果public V get() throws InterruptedException, ExecutionException {        int s = state;        if (s <= COMPLETING)            // 等待结果完成,带有超时的get方法也是调用的awaitDone方法            s = awaitDone(false, 0L);        // 返回结果        return report(s);    }
// 带有超时时间的获取结果,如果超过时间还没有获取到结果则抛出异常public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; // 如果任务未中断,调用awaitDone方法等待任务结果 if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); // 返回结果 return report(s); }
复制代码


我们主要看下 awaitDone 方法的执行逻辑。此方法会通过 for 循环的方式一直阻塞等待任务执行完成。如果带有超时时间,则超过截止时间后会直接返回。


// timed:是否需要超时获取// nanos:超时时间单位纳秒private int awaitDone(boolean timed, long nanos)        throws InterruptedException {        final long deadline = timed ? System.nanoTime() + nanos : 0L;        WaitNode q = null;        boolean queued = false;        // 此方法会一直for循环判断任务状态是否已经完成,是Future.get阻塞的原因        for (;;) {            if (Thread.interrupted()) {                removeWaiter(q);                throw new InterruptedException();            }
int s = state; // 任务状态大于COMPLETING,则表明任务结束,直接返回 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet // Thread.yield() 方法,使当前线程由执行状态,变成为就绪状态,让出cpu时间,在下一个线程执行时候,此线程有可能被执行,也有可能没有被执行。 // COMPLETING状态为瞬时状态,任务执行完成,要么是正常结束,要么异常结束,后续会被置为NORMAL或者EXCEPTIONAL Thread.yield(); else if (q == null) // 每调用一次get方法,都会创建一个WaitNode等待节点 q = new WaitNode(); else if (!queued) // 将该等待节点添加到链表结构waiters中,q.next = waiters 即在waiters的头部插入 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 如果方法带有超时判断,则判断当前时间是否已经超过了截止时间,如果超过了及截止日期,则退出循环直接返回当前状态,此时任务状态一定是NEW else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }
复制代码


我们在看下 report 方法,在调用 get 方法时是如何返回结果的。


这里首先获取 outcome 的值,并判断任务是否已经执行完成,如果执行完成,则将 outcome 对象强转成泛型指定的类型;如果任务被取消了,则抛出一个 CancellationException 异常;如果都不是,则说明任务在执行过程中发生了异常,此时任务状态位 EXCEPTIONAL,此时的 outcome 即为 Throwable 对象,所以将 outcome 强转为 Throwable 并抛出异常。


由此可以知道,我们将一个 FutureTask 任务 submit 到线程池中执行的时候,如果发生了异常,是会在调用 get 方法的时候抛出的。


private V report(int s) throws ExecutionException {        Object x = outcome;        if (s == NORMAL)            return (V)x;        if (s >= CANCELLED)            throw new CancellationException();        throw new ExecutionException((Throwable)x);    }
复制代码

2.5 cancel 方法介绍

cancel 方法用于取消正在运行的任务,如果任务取消成功,则返回 TRUE,如果取消失败则返回 FALSE。


// mayInterruptIfRunning:允许中断正在运行的任务public boolean cancel(boolean mayInterruptIfRunning) {        // mayInterruptIfRunning如果为true则将状态置为INTERRUPTING,如果未false则将状态置为CANCELLED        if (!(state == NEW &&              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))            return false;        // 如果状态修改成功后,判断是否允许中断线程,如果允许,则调用Thread的interrupt方法中断        try {    // in case call to interrupt throws exception            if (mayInterruptIfRunning) {                try {                    Thread t = runner;                    if (t != null)                        t.interrupt();                } finally { // final state                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);                }            }        } finally {            // 取消后的收尾工作            finishCompletion();        }        return true;    }
复制代码

2.6 isDone/isCancelled 方法介绍

isDone 方法用于判断 FutureTask 是否已经完成;isCancelled 方法用来判断 FutureTask 是否已经取消,这两个方法都是通过状态位来判断的。


public boolean isCancelled() {        return state >= CANCELLED;    }
public boolean isDone() { return state != NEW; }
复制代码

2.7 finishCompletion 方法介绍

我们看下 finishCompletion 方法都做了哪些工作。


// 删除所有等待线程并发出信号,最后执行done方法private void finishCompletion() {        // assert state > COMPLETING;        for (WaitNode q; (q = waiters) != null;) {            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {                for (;;) {                    Thread t = q.thread;                    if (t != null) {                        q.thread = null;                        LockSupport.unpark(t);                    }                    WaitNode next = q.next;                    if (next == null)                        break;                    q.next = null; // unlink to help gc                    q = next;                }                break;            }        }
done();
callable = null; // to reduce footprint }
复制代码


我们看到 done 方法是一个受保护的空方法,此处没有任何逻辑,由其子类去根据自己的业务去实现相应的逻辑。例如:java.util.concurrent.ExecutorCompletionService.QueueingFuture。


protected void done() { }
复制代码

3、总结

通过源码解读可以了解到 Future 的原理:


第一步:主线程将任务封装成一个 Callable 对象,通过 submit 方法提交到线程池去执行。


第二步:线程池执行任务的 run 方法,主线程则可以继续执行其他逻辑。


第三步:线程池中方法执行完成后将结果赋值到 outcome 属性上,并修改任务状态。


第四步:主线程在需要拿到异步任务结果的时候,主动调用 fugure.get()方法来获取结果。


第五步:如果异步线程在执行过程中发生异常,则会在调用 future.get()方法的时候抛出来。


以上就是对于 FutureTask 的分析,我们可以了解 FutureTask 任务执行的方式以及 Future.get 已阻塞的方式获取线程执行的结果原理,并且从代码中可以了解 FutureTask 的任务执行状态以及状态的变化过程。


作者:京东物流 丁冬

来源:京东云开发者社区 自猿其说 Tech

发布于: 刚刚阅读数: 4
用户头像

拥抱技术,与开发者携手创造未来! 2018-11-20 加入

我们将持续为人工智能、大数据、云计算、物联网等相关领域的开发者,提供技术干货、行业技术内容、技术落地实践等文章内容。京东云开发者社区官方网站【https://developer.jdcloud.com/】,欢迎大家来玩

评论

发布
暂无评论
并发编程-FutureTask解析 | 京东物流技术团队_并发编程_京东科技开发者_InfoQ写作社区