写点什么

Java 多线程 : 迟来的 Future

发布于: 2021 年 03 月 19 日

一 . Future 是什么

1.3 Future Task 简述

作用 : future 可以用于异步获取多线程任务结果 , Callable 用于产生结果,Future 用于获取结果流程 : 流程类似于叫好等餐 , 等餐是花费时间的过程,但是不妨碍我们叫号


  • 当 Future 进行 submit 开始 , 业务处理已经在多线程中开始 , 而 Get 即从多线程中获取数据

  • 当 Get 获取时业务还未处理完 , 当前线程会阻塞 , 直到业务处理完成 . 所以需要注意 future 的任务安排

使用 future 会有以下效果:

  • 1 启动多线程任务

  • 2 处理其他事情

  • 3 收集多线程任务结果

Future 对应的方法 :

  • cancel(boolean) : 取消操作

  • get() : 获取结果

  • get(long,TimeUtil) : 指定时间获取

  • isCancelled() : 该任务是否在完成之前被取消

  • isDone() :判断是否有结果

Future 接口的作用就是先生成一个 Future 对象 ,将具体的运行放入 future 对象中 ,最终通过 future 对象的 get 方法来获取最终的结果

1.2 Future Task

FutureTask 表示一个可以取消的异步运算 ,提供了 Future 完整的流程

  • 它有启动和取消运算、查询运算是否完成和取回运算结果等方法。

  • 只有当运算完成的时候结果才能取回,如果运算尚未完成 get 方法将会阻塞



我们关注一下以下源码细节点 :


// Node 1 : 实现了 RunnableFuture
// ------------------------------------
// Node 2 : 提供了7种状态private static final int NEW = 0; // 新建private static final int COMPLETING = 1; // 完成private static final int NORMAL = 2; // 正常private static final int EXCEPTIONAL = 3; // 异常private static final int CANCELLED = 4; // 取消private static final int INTERRUPTING = 5; // 中断(中)private static final int INTERRUPTED = 6; // 打断
// 过程的流转 : * NEW -> COMPLETING -> NORMAL* NEW -> COMPLETING -> EXCEPTIONAL* NEW -> CANCELLED* NEW -> INTERRUPTING -> INTERRUPTED
// ------------------------------------
// Node 3 : 内部属性// 底层Callable 对象private Callable<V> callable;// 输出对象private Object outcome;// 运行线程private volatile Thread runner;// 等待线程的Treiber堆栈private volatile WaitNode waiters;
// ------------------------------------
// Node 4 : 内部方法 // 方法一 : 获取参数V report(int s) // 为已完成的任务返回结果或抛出异常 1. Object x = outcome; 2. return (V)x; // 注意 : 状态为 CANCELLED 时会抛出异常 CancellationException
// 方法二 : 取消public boolean cancel(boolean mayInterruptIfRunning) // 取消 1. UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) // 这里神奇的做了一个CAS操作, 判断当前的状态 2. Thread t = runner; + t.interrupt(); // 打断线程 3. UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // CAS 方式修改状态
// 方法三 : get 类型public V get() throwspublic V get(long timeout, TimeUnit unit) 1. if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) // 核心代码就是关注是否完成

// 方法四 : runStep 1 : CAS 操作状态Step 2 : 准备一个 Callable<V> c = callable;Step 3 : state == NEW 后 , result = c.call(); // 此时阻塞等待Step 4 : 设置结果 : set(result); Step 5 : 当然是修改状态啦Over !!!
// 方法五 : runAndReset// 在不设置结果的情况下执行计算,然后将这个future重置为初始状态 (其实主要是结尾修改了状态)
核心 : return ran && s == NEW;

// 方法五 : finishCompletion// 删除并通知所有等待的线程,调用done(),并使callable为空
// 2个for 循环保证执行 for (WaitNode q; (q = waiters) != null;) { // CAS 保证操作的准确性 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; // 提供许可 LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } }


复制代码
复制代码

二 . Future 使用

Future 使用其实比较简单 , 发起等待即可 , 但是注意 Future 是会阻塞主线程的

public class FutureService extends AbstractService implements ApplicationRunner, Callable<String> {
private Logger logger = LoggerFactory.getLogger(this.getClass());
private static Long startTime; private static Long endTime;
private String salt; private Integer sleepNum;
public FutureService() { }
public FutureService(String salt, Integer sleepNum) { this.salt = salt; this.sleepNum = sleepNum; }
@Override public String call() throws Exception {
logger.info("------> 业务逻辑开始执行 :{} <-------", salt); StringBuffer sb = new StringBuffer(); for (int i = 0; i < sleepNum; i++) { sb.append(salt); try { Thread.sleep(1000); } catch (InterruptedException e) {
} } endTime = System.currentTimeMillis(); logger.info("------> {} - 业务执行完成 :{} <-------", salt, sb.toString()); getTime(startTime, endTime); return sb.toString(); }

@Override public void run(ApplicationArguments args) throws Exception { logger.info("------> 创建一个初始连接池 <-------"); ExecutorService executor = Executors.newFixedThreadPool(3);
logger.info("------> 开始业务一 future - a : <-------"); FutureTask<String> future = new FutureTask<String>(new FutureService("a", 10)); startTime = System.currentTimeMillis(); executor.submit(future); logger.info("------> 业务一请求完毕!主线程执行 <-------");
logger.info("------> 开始业务二 future - b : <-------"); FutureTask<String> future2 = new FutureTask<String>(new FutureService("b", 5)); startTime = System.currentTimeMillis(); executor.submit(future2); logger.info("------> 业务三请求完毕!主线程执行 <-------");
logger.info("------> 开始业务三 future - c : <-------"); FutureTask<String> future3 = new FutureTask<String>(new FutureService("c", 3)); startTime = System.currentTimeMillis(); executor.submit(future3); logger.info("------> 业务三请求完毕!主线程执行 <-------"); logger.info("------> future2 数据处理完成:{} <-------", future2.get()); logger.info("------> 2-1 测试主线程是否阻塞 <-------"); logger.info("------> future1 数据处理完成:{} <-------", future.get()); logger.info("------> 1-3 测试主线程是否阻塞 <-------"); logger.info("------> future3 数据处理完成:{} <-------", future3.get()); }}
19.839 [ main] this is run <-------19.839 [ main] 开始业务一 future - a : <-------19.839 [ main] 业务一请求完毕!主线程执行 <-------19.839 [ main] 开始业务二 future - b : <-------19.840 [ main] 业务三请求完毕!主线程执行 <-------19.840 [ main] 开始业务三 future - c : <-------19.840 [ main] 业务三请求完毕!主线程执行 <-------19.840 [pool-4-thread-2] 业务逻辑开始执行 :b <-------19.840 [pool-4-thread-1] 业务逻辑开始执行 :a <-------19.840 [pool-4-thread-3] 业务逻辑开始执行 :c <-------22.843 [pool-4-thread-3] c - 业务执行完成 :ccc <-------22.843 [pool-4-thread-3] time is :3.0 <-------24.844 [pool-4-thread-2] b - 业务执行完成 :bbbbb <-------24.844 [pool-4-thread-2] time is :5.0 <-------24.844 [ main] future2 数据处理完成:bbbbb <-------24.844 [ main] 2-1 测试主线程是否阻塞 <-------29.847 [pool-4-thread-1] a - 业务执行完成 :aaaaaaaaaa <-------29.847 [pool-4-thread-1] time is :10.0 <-------29.847 [ main] future1 数据处理完成:aaaaaaaaaa <-------29.847 [ main] 1-3 测试主线程是否阻塞 <-------29.847 [ main] future3 数据处理完成:ccc <------- // 流程 : // Main 线程中 , 哭有看到 abc 时顺序执行的 , 从 submit 开始 , 开始多线程执行 (所以顺序不再固定, 变成了 bac)// 从多线程里面看 , c > b > a 执行完成// 当 b 业务完成后 , 因为main 一直阻塞到 futurb.get 的阶段 , 所以B future 获取值 , main 线程遇到 a future 继续阻塞// 当 a future get 完成后 , c 才能get // 总结 : > future submit 多线程执行> future get 会阻塞主线程等待 , 当 get 时 , 多线程才会把数据提供出来 复制代码
复制代码

三 . Future 问答

按照线程池最常见的用法 , 我们通过 executor.submit 时会返回一个 Future 对象 ,然后通过 Future 对象获取

First : 我们以这个方法去推理 ExecutorService executor = Executors.newFixedThreadPool(1);Future future = executor.submit(A Callable Object);

问题一 : Future 底层基于什么 ?

  • Step 1 : 当通过 submit 调用的时候 , 底层会调用 :

return new FutureTask<T>(runnable, value);

  • Step 2 : 在外层会被提升为父类 RunnableFuture , 在返回的时候又会被提成 Future

RunnableFuture<T> ftask = newTaskFor(task, result);

总结 : 所以 , 底层的实现类主要可以看成 FutureTask , 而 task 实际上可以算 Runnable 的实现类

问题二 : Future 怎么运行 ?

Future 运行主要基于 run()

  1. 通过调用 callable.call() 完成

  2. 如果 call 执行成功,则通过 set 方法保存结果 ,将 result 保存到 outcome;

  3. 如果 call 执行有异常,则通过 setException 保存异常;

问题三 : future 回调基于什么 -- get(long,TimeUtil) ?

通过调用 report(s) 完成调用

问题四 : future 阻塞的方式 ?

当判断未完成时 , 会调用 awaitDone 等待 , 具体的逻辑以后分析

if (s <= COMPLETING){      s = awaitDone(false, 0L);}复制代码
复制代码

awaitDone 中主要做了以下几件事 :

  1. 如果主线程被中断,则抛出中断异常;

  2. 判断 FutureTask 当前的 state,如果大于 COMPLETING,说明任务已经执行完成,则直接返回;

  3. 如果当前 state 等于 COMPLETING,说明任务已经执行完,这时主线程只需通过 yield 方法让出 cpu 资源,等待 state 变成 NORMAL;

  4. 通过 WaitNode 类封装当前线程,并通过 UNSAFE 添加到 waiters 链表;

  5. 最终通过 LockSupport 的 park 或 parkNanos 挂起线程;

问题五 : future 怎么取消 ?

  • cancel(boolean mayInterruptIfRunning) 中 t.interrupt()

  • 并且 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); 修改状态

问题六 : future 怎么判断取消和结果 isCancelled -- isDone ?

return state >= CANCELLED;

四 . Future 与 Callable

这个需要从 ExecutorService.submit() 来看 , ExecutorService 有 2 个主要的 submit 方法 , 不论是 Callable , 还是 Runnable , 通过返回值就不难发现 ,其最终都变成了一个 Future

<T> Future<T> submit(Callable<T> task);	- RunnableFuture<T> ftask = newTaskFor(task);		-  return new ForkJoinTask.AdaptedCallable<T>(callable);	- execute(ftask);        <T> Future<T> submit(Runnable task, T result);	- RunnableFuture<T> ftask = newTaskFor(task, result);	- execute(ftask);        // 例如这里可以直接将 Callable 作为参数传进去 : Future<String> future = executor.submit(createCallable());public Callable createCallable() {        Callable<Module> call = new Callable<Module>() {            public Module call() throws Exception {                // .....            }        };        return call;    } 
复制代码
复制代码

五 . 衍生用法 ScheduledFutureTask

// ScheduledFutureTask 简介• time:任务执行时间;• period:任务周期执行间隔;• sequenceNumber:自增的任务序号。    // 执行顺序 : 在等待队列里调度不再按照FIFO,而是按照执行时间,谁即将执行,谁就排在前面。M- getDelay(TimeUnit unit)M- int compareTo(Delayed other)  

// ScheduledFutureTask 主要在 ScheduledThreadPoolExecutor中

// Node 1 : ScheduledThreadPoolExecutor内部类 ScheduledFutureTaskprivate class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> 复制代码
复制代码

核心代码参考线程池这篇文档



作者:AntBlack

链接:https://juejin.cn/post/6941010435512467493


用户头像

还未添加个人签名 2021.03.15 加入

还未添加个人简介

评论

发布
暂无评论
Java 多线程 : 迟来的 Future