写点什么

源码解读之 FutureTask 如何实现最大等待时间

作者:EquatorCoco
  • 2023-07-07
    福建
  • 本文字数:3438 字

    阅读完需:约 11 分钟

预备知识:Java 线程挂起的常用方式有以下几种


  1. Thread.sleep(long millis):这个方法可以让线程挂起一段时间,并释放 CPU 时间片,等待一段时间后自动恢复执行。这种方式可以用来实现简单的定时器功能,但如果不恰当使用会影响系统性能。


  1. Object.wait() 和 Object.notify() 或 Object.notifyAll():这是一种通过等待某个条件的发生来挂起线程的方式。wait() 方法会让线程等待,直到其他线程调用了 notify() 或 notifyAll() 方法来通知它。这种方式需要使用 synchronized 或者 ReentrantLock 等同步机制来保证线程之间的协作和通信。


  1. LockSupport.park() 和 LockSupport.unpark(Thread thread):这两个方法可以让线程挂起和恢复。park() 方法会使当前线程挂起,直到其他线程调用了 unpark(Thread thread) 方法来唤醒它。这种方式比较灵活,可以根据需要控制线程的挂起和恢复。


先上结论:


1.futureTask.get 时通过 LockSupport.park()挂起线程


2.在 Thread.run() 方法中 调用 setException(ex)或 set(result),然后调用 LockSupport.unpark(t)唤醒线程。


获取地址:http://www.jnpfsoft.com/?from=infoq


一:示例-引入主题


public class FutureTaskDemo {    public static void main(String[] args) {        FutureTask<String> futureTask = new FutureTask<>(new Callable() {            @Override            public Object call() throws Exception {                System.out.println("异步线程执行");                Thread.sleep(3000);//模拟线程执行任务需要3秒                return "ok";            }        });        Thread t1 = new Thread(futureTask, "线程一");        t1.start();
try { //关键代码 String s = futureTask.get(2, TimeUnit.SECONDS); //最大等待线程2秒 } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } }}
复制代码


二:进入 futureTask.get(2, TimeUnit.SECONDS);


  public V get(long timeout, TimeUnit unit)        throws InterruptedException, ExecutionException, TimeoutException {        if (unit == null)            throw new NullPointerException();        int s = state;        if (s <= COMPLETING &&            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) //重点awaitDone,即完成了最大等待,依然没有结果就抛出异常逻辑            throw new TimeoutException();        return report(s);    }
复制代码


awaitDone 返回线程任务执行状态,即小于等于 COMPLETING(任务正在运行,等待完成)抛出异常 TimeoutException


三:进入(awaitDone(true, unit.toNanos(timeout)))原理分析


private int awaitDone(boolean timed, long nanos)        throws InterruptedException {        final long deadline = timed ? System.nanoTime() + nanos : 0L;        WaitNode q = null;        boolean queued = false;        for (;;) {            if (Thread.interrupted()) {                removeWaiter(q);                throw new InterruptedException();            }
int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }
复制代码


3.1 总体解读 awaitDone


利用自旋(for (;😉)的方式 ,检查 state(任务状态)与 waitNode(维护等待的线程),


第一步:首先检查 if (Thread.interrupted()) 线程是否被打断(LockSupport.parkNanos 挂起的线程被打断不抛出异常),


第二步:判断任务状态与 waitNode 是否入队+确定最大等待时间


​ 若已完成(if (s > COMPLETING))返回任务状态


​ 若已完成(if (s == COMPLETING))-->表示正在完成,但尚未完成。则让出 CPU,进入就绪状态,等待其他线程的执行


​ 若 if (q == null)==>创建等待等待节点


​ 若 if (!queued)==>表示上一步创建的节点没有和当前线程绑定,故绑定


​ 最后 else if (timed)与 else,判断最大等待时间


static final class WaitNode {        volatile Thread thread;        volatile WaitNode next;        WaitNode() { thread = Thread.currentThread(); }    }
复制代码


private static final int NEW          = 0;private static final int COMPLETING   = 1;private static final int NORMAL       = 2;private static final int EXCEPTIONAL  = 3;private static final int CANCELLED    = 4;private static final int INTERRUPTING = 5;private static final int INTERRUPTED  = 6;state可能转换的过程     1.NEW -> COMPLETING -> NORMAL (成功完成)    2.NEW -> COMPLETING -> EXCEPTIONAL (异常)    3.NEW -> CANCELLED (任务被取消)    4.NEW -> INTERRUPTING -> INTERRUPTED(任务被打断)
复制代码


3.2 关键代码


LockSupport.park(this, nanos) ==内部实现==> UNSAFE.park(false, nanos)();
复制代码

即让当前线程堵塞直至指定的时间(nanos),该方法同 Thread.sleep()一样不会释放持有的对象锁,但不同的是 Thread.sleep 会被打断(interrupted)并抛出异常,而 LockSupport.park 被打断不会抛出异常,故在自旋时(for (;😉)需判断 if (Thread.interrupted())线程是否被打断(手动抛出异常)。


四:线程运行时 state 的变化轨迹


4.1:新建时利用构造器设置 state=NEW


 public FutureTask(Runnable runnable, V result) {     this.callable = Executors.callable(runnable, result);     this.state = NEW;   // 赋值状态 }
复制代码


4.2: 线程运行时 state 可能变化轨迹


public void run() {        ..........防止多次运行stat()方法..............        try {            Callable<V> c = callable;            if (c != null && state == NEW) {                V result;                boolean ran;                try {                    result = c.call();                    ran = true;                } catch (Throwable ex) {                    result = null;                    ran = false;                    setException(ex); //异常轨迹---> 见下分析                }                if (ran)                    set(result); // 正常轨迹--->见下分析            }        } finally {            runner = null;    		//----最后结束---防止线程被打断            int s = state;            if (s >= INTERRUPTING)                handlePossibleCancellationInterrupt(s);        }    }
复制代码


异常轨迹 setException(ex)


protected void setException(Throwable t) {    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {        outcome = t;        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state        finishCompletion();        //轨迹变化 2.NEW -> COMPLETING -> EXCEPTIONAL (异常)    }    //否则1: 3.NEW -> CANCELLED (任务被取消)    //否则2: 4.NEW -> INTERRUPTING -> INTERRUPTED(任务被打断)}
复制代码


正常轨迹 set(result);


 1.NEW -> COMPLETING -> NORMAL (成功完成)
复制代码


文章转载自:爱我-中华

原文链接:https://www.cnblogs.com/jinliang374003909/p/17264944.html

用户头像

EquatorCoco

关注

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
源码解读之FutureTask如何实现最大等待时间_源码_EquatorCoco_InfoQ写作社区