写点什么

Java Core 「14」J.U.C 线程池 -Future & FutureTask

作者:Samson
  • 2022 年 6 月 21 日
  • 本文字数:3426 字

    阅读完需:约 11 分钟

01-Future

A Future represents the result of an asynchronous computation.

Future 中定义了 5 个方法

  • get() / get(long, TimeUnit),获取任务的计算结果,在计算任务未完成时阻塞当前线程直至计算完成或超时。

  • cancel(boolean),尝试取消任务的执行。该方法在以下机种情况下会失败(即返回 false):a) 任务已完成;b) 任务已被取消;c) 其他不能取消的情形;如果任务还没开始执行,取消后的任务不会再执行;如果任务已经开始,boolean 参数来控制是否中断任务的执行,true 表示会中断,false 表示不中断。

  • isCancelled(),cancel(boolean) 返回值为 true 的 Future 上调用 isCancelled() 方法返回值为 true。

  • isDone(),任务完成时,返回值为 true。以下几种都被认为时任务完成的情况:a) 正常结束,b) 异常结束,c) 被取消。

A cancellable asynchronous computation. This class provides a base implementation of Future.

02-FutureTask

FutureTask 是 Future 接口的一个实现。而且,FutureTask 实现了 Runnable 接口,因此可以提交到线程中执行。

02.1-FutureTask 的核心属性

/** FutureTask 的运行状态 */private volatile int state;/** 底层的可执行任务 */private Callable<V> callable;/** 任务与运行后的结果,可能是计算结果,也可能是异常 */private Object outcome;/** 执行任务的线程 */private volatile Thread runner;/** 等待的线程队列 */private volatile WaitNode waiters;
复制代码

刚创建时,state 的值为 NEW(0)。COMPLETING(1) 和 INTERRUPTING(5) 是一种中间状态,持续时间较短暂。

  • COMPLETING,任务已经执行完毕或正在执行时,发成异常,但是任务结果或异常尚未保存到 outcome 中,state 会从 NEW → COMPLETING。

  • INTERRUPTING,任务尚未开始执行或正在执行时,调用 cancel(true) 方法取消任务,在尚未中断线程时,state 会从 NEW → INTERRUPTING。

除此之外,其他几个都是最终状态。Future 中定义的 isDone 方法和 isCancelled 方法都是基于 state 变量判断的,前者返回值为state !=NEW,后者返回值为state >= CANCELLED



FutureTask 中的 Callable 是要执行的任务,在构造器 FutureTask(Callable) 中传入,或者在构造器 FutureTask(Runable, V) 中根据 Runable 对象创建Executors.*callable*(runnable, result);

02.2-FutureTask 的核心方法

FutureTask 实现了 Runnable 接口,实现了其中定义的public void run();方法。

public void run() {    if (state != NEW ||   /** 防止重复运行 */        /** FutureTask 已交给其他线程去执行 */        !RUNNER.compareAndSet(this, null, Thread.currentThread()))         return;    try {        Callable<V> c = callable;        if (c != null && state == NEW) {            V result;            boolean ran;            try {							  /** 执行任务,获得返回值 */                result = c.call();                ran = true;            } catch (Throwable ex) {                result = null;                ran = false;								/** 设置 outcome 为异常,                 * state 通过 CAS 方式先被设置为 COMPLETING,之后被设置为 EXCEPTIONAL                 * 后面遍历阻塞队列,唤醒阻塞线程                 */                setException(ex);              }            if (ran)                /** 设置 outcome 为任务执行结果                  * state 通过 CAS 方式先被设置为 COMPLETING,之后被设置为 NORMAL		             * 后面遍历阻塞队列,唤醒阻塞线程                 */                set(result);        }    } finally {        // runner must be non-null until state is settled to        // prevent concurrent calls to run()        runner = null;        // state must be re-read after nulling runner to prevent        // leaked interrupts        int s = state;        if (s >= INTERRUPTING)            handlePossibleCancellationInterrupt(s);    }}
复制代码

get 方法是 Future 接口中定义的,用来获得任务的计算结果。如果任务尚未完成,则阻塞调用 get 方法的线程(底层通过 awaitDone 方法实现)。

public V get() throws InterruptedException, ExecutionException {    int s = state;    /** state 为 NEW / COMPLETING 时,     * 新创建的对象 state 为 NEW,setExeception 或 set 方法会将 state 设置为 COMPLETING     * 上述两种状态都需要阻塞线程     */    if (s <= COMPLETING)	      /** 阻塞线程 */        s = awaitDone(false, 0L);	     /** 根据 state,返回任务结果或抛对应的异常 */    return report(s);}
复制代码


private int awaitDone(boolean timed, long nanos)    throws InterruptedException {    long startTime = 0L;    // Special value 0L means not yet parked    WaitNode q = null;    boolean queued = false;  /** 表示当前线程(调用了 get 方法)尚未被放入到等待队列中 */    for (;;) {        int s = state;        if (s > COMPLETING) { /** 尝试将当前线程加入到等待队列时,任务完成了,则放弃将当前线程加入到等待队列 */            if (q != null)                q.thread = null;            return s;        }        else if (s == COMPLETING) /** 需要稍等一会儿,等待任务结果或异常设置到 outcome */            // We may have already promised (via isDone) that we are done            // so never return empty-handed or throw InterruptedException            Thread.yield();        else if (Thread.interrupted()) { /** 下面的所有情况都是 state == NEW 的情形 */ 	          /** 当前线程如果被中断,则放弃添加当前线程到队列,并抛出异常 */            removeWaiter(q);            throw new InterruptedException();        }        else if (q == null) { /** 创建节点,将当前线程包装 */            if (timed && nanos <= 0L)                return s;            q = new WaitNode();        }        else if (!queued) /** 当前节点尚未入队,加入队列 */            queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);        else if (timed) { /** 带超时版本的 */            final long parkNanos;            if (startTime == 0L) { // first time                startTime = System.nanoTime();                if (startTime == 0L)                    startTime = 1L;                parkNanos = nanos;            } else {                long elapsed = System.nanoTime() - startTime;                if (elapsed >= nanos) {                    removeWaiter(q);                    return state;                }                parkNanos = nanos - elapsed;            }            // nanoTime may be slow; recheck before parking            if (state < COMPLETING)                LockSupport.parkNanos(this, parkNanos);        }        else /** 当加入队列之后,阻塞当前线程 */            LockSupport.park(this);    }}
复制代码

cancel 是 Future 接口中定义的方法,用于取消任务的执行:

public boolean cancel(boolean mayInterruptIfRunning) {    /** 尝试将任务的 state 设置为中断或取消,如果成功则继续,否则返回 false */    if (!(state == NEW && STATE.compareAndSet          (this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))        return false;    try {    // in case call to interrupt throws exception        if (mayInterruptIfRunning) {	          /** 如果允许中断运行中的线程,则中断 */            try {                Thread t = runner;                if (t != null)                    t.interrupt();            } finally { // final state                STATE.setRelease(this, INTERRUPTED);            }        }    } finally {        finishCompletion();    }    return true;}
复制代码



历史文章推荐

发布于: 刚刚阅读数: 3
用户头像

Samson

关注

还未添加个人签名 2019.07.22 加入

还未添加个人简介

评论

发布
暂无评论
Java Core 「14」J.U.C 线程池-Future & FutureTask_学习笔记_Samson_InfoQ写作社区