一 . Future 是什么
1.3 Future Task 简述
作用 : future 可以用于异步获取多线程任务结果 , Callable 用于产生结果,Future 用于获取结果流程 : 流程类似于叫好等餐 , 等餐是花费时间的过程,但是不妨碍我们叫号
当 Future 进行 submit 开始 , 业务处理已经在多线程中开始 , 而 Get 即从多线程中获取数据
当 Get 获取时业务还未处理完 , 当前线程会阻塞 , 直到业务处理完成 . 所以需要注意 future 的任务安排
使用 future 会有以下效果:
1 启动多线程任务
2 处理其他事情
3 收集多线程任务结果
Future 对应的方法 :
Future 接口的作用就是先生成一个 Future 对象 ,将具体的运行放入 future 对象中 ,最终通过 future 对象的 get 方法来获取最终的结果
1.2 Future Task
FutureTask 表示一个可以取消的异步运算 ,提供了 Future 完整的流程
我们关注一下以下源码细节点 :
// Node 1 : 实现了 RunnableFuture
// ------------------------------------
// Node 2 : 提供了7种状态
private static final int NEW = 0; // 新建
private static final int COMPLETING = 1; // 完成
private static final int NORMAL = 2; // 正常
private static final int EXCEPTIONAL = 3; // 异常
private static final int CANCELLED = 4; // 取消
private static final int INTERRUPTING = 5; // 中断(中)
private static final int INTERRUPTED = 6; // 打断
// 过程的流转 :
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
// ------------------------------------
// Node 3 : 内部属性
// 底层Callable 对象
private Callable<V> callable;
// 输出对象
private Object outcome;
// 运行线程
private volatile Thread runner;
// 等待线程的Treiber堆栈
private volatile WaitNode waiters;
// ------------------------------------
// Node 4 : 内部方法
// 方法一 : 获取参数
V report(int s) // 为已完成的任务返回结果或抛出异常
1. Object x = outcome;
2. return (V)x;
// 注意 : 状态为 CANCELLED 时会抛出异常 CancellationException
// 方法二 : 取消
public boolean cancel(boolean mayInterruptIfRunning) // 取消
1. UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
// 这里神奇的做了一个CAS操作, 判断当前的状态
2. Thread t = runner; + t.interrupt();
// 打断线程
3. UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
// CAS 方式修改状态
// 方法三 : get 类型
public V get() throws
public V get(long timeout, TimeUnit unit)
1. if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
// 核心代码就是关注是否完成
// 方法四 : run
Step 1 : CAS 操作状态
Step 2 : 准备一个 Callable<V> c = callable;
Step 3 : state == NEW 后 , result = c.call();
// 此时阻塞等待
Step 4 : 设置结果 : set(result);
Step 5 : 当然是修改状态啦
Over !!!
// 方法五 : runAndReset
// 在不设置结果的情况下执行计算,然后将这个future重置为初始状态 (其实主要是结尾修改了状态)
核心 : return ran && s == NEW;
// 方法五 : finishCompletion
// 删除并通知所有等待的线程,调用done(),并使callable为空
// 2个for 循环保证执行
for (WaitNode q; (q = waiters) != null;) {
// CAS 保证操作的准确性
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
// 提供许可
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
复制代码
复制代码
二 . Future 使用
Future 使用其实比较简单 , 发起等待即可 , 但是注意 Future 是会阻塞主线程的
public class FutureService extends AbstractService implements ApplicationRunner, Callable<String> {
private Logger logger = LoggerFactory.getLogger(this.getClass());
private static Long startTime;
private static Long endTime;
private String salt;
private Integer sleepNum;
public FutureService() {
}
public FutureService(String salt, Integer sleepNum) {
this.salt = salt;
this.sleepNum = sleepNum;
}
@Override
public String call() throws Exception {
logger.info("------> 业务逻辑开始执行 :{} <-------", salt);
StringBuffer sb = new StringBuffer();
for (int i = 0; i < sleepNum; i++) {
sb.append(salt);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
endTime = System.currentTimeMillis();
logger.info("------> {} - 业务执行完成 :{} <-------", salt, sb.toString());
getTime(startTime, endTime);
return sb.toString();
}
@Override
public void run(ApplicationArguments args) throws Exception {
logger.info("------> 创建一个初始连接池 <-------");
ExecutorService executor = Executors.newFixedThreadPool(3);
logger.info("------> 开始业务一 future - a : <-------");
FutureTask<String> future = new FutureTask<String>(new FutureService("a", 10));
startTime = System.currentTimeMillis();
executor.submit(future);
logger.info("------> 业务一请求完毕!主线程执行 <-------");
logger.info("------> 开始业务二 future - b : <-------");
FutureTask<String> future2 = new FutureTask<String>(new FutureService("b", 5));
startTime = System.currentTimeMillis();
executor.submit(future2);
logger.info("------> 业务三请求完毕!主线程执行 <-------");
logger.info("------> 开始业务三 future - c : <-------");
FutureTask<String> future3 = new FutureTask<String>(new FutureService("c", 3));
startTime = System.currentTimeMillis();
executor.submit(future3);
logger.info("------> 业务三请求完毕!主线程执行 <-------");
logger.info("------> future2 数据处理完成:{} <-------", future2.get());
logger.info("------> 2-1 测试主线程是否阻塞 <-------");
logger.info("------> future1 数据处理完成:{} <-------", future.get());
logger.info("------> 1-3 测试主线程是否阻塞 <-------");
logger.info("------> future3 数据处理完成:{} <-------", future3.get());
}
}
19.839 [ main] this is run <-------
19.839 [ main] 开始业务一 future - a : <-------
19.839 [ main] 业务一请求完毕!主线程执行 <-------
19.839 [ main] 开始业务二 future - b : <-------
19.840 [ main] 业务三请求完毕!主线程执行 <-------
19.840 [ main] 开始业务三 future - c : <-------
19.840 [ main] 业务三请求完毕!主线程执行 <-------
19.840 [pool-4-thread-2] 业务逻辑开始执行 :b <-------
19.840 [pool-4-thread-1] 业务逻辑开始执行 :a <-------
19.840 [pool-4-thread-3] 业务逻辑开始执行 :c <-------
22.843 [pool-4-thread-3] c - 业务执行完成 :ccc <-------
22.843 [pool-4-thread-3] time is :3.0 <-------
24.844 [pool-4-thread-2] b - 业务执行完成 :bbbbb <-------
24.844 [pool-4-thread-2] time is :5.0 <-------
24.844 [ main] future2 数据处理完成:bbbbb <-------
24.844 [ main] 2-1 测试主线程是否阻塞 <-------
29.847 [pool-4-thread-1] a - 业务执行完成 :aaaaaaaaaa <-------
29.847 [pool-4-thread-1] time is :10.0 <-------
29.847 [ main] future1 数据处理完成:aaaaaaaaaa <-------
29.847 [ main] 1-3 测试主线程是否阻塞 <-------
29.847 [ main] future3 数据处理完成:ccc <-------
// 流程 :
// Main 线程中 , 哭有看到 abc 时顺序执行的 , 从 submit 开始 , 开始多线程执行 (所以顺序不再固定, 变成了 bac)
// 从多线程里面看 , c > b > a 执行完成
// 当 b 业务完成后 , 因为main 一直阻塞到 futurb.get 的阶段 , 所以B future 获取值 , main 线程遇到 a future 继续阻塞
// 当 a future get 完成后 , c 才能get
// 总结 :
> future submit 多线程执行
> future get 会阻塞主线程等待 , 当 get 时 , 多线程才会把数据提供出来
复制代码
复制代码
三 . Future 问答
按照线程池最常见的用法 , 我们通过 executor.submit 时会返回一个 Future 对象 ,然后通过 Future 对象获取
First : 我们以这个方法去推理 ExecutorService executor = Executors.newFixedThreadPool(1);Future future = executor.submit(A Callable Object);
问题一 : Future 底层基于什么 ?
return new FutureTask<T>(runnable, value);
RunnableFuture<T> ftask = newTaskFor(task, result);
总结 : 所以 , 底层的实现类主要可以看成 FutureTask , 而 task 实际上可以算 Runnable 的实现类
问题二 : Future 怎么运行 ?
Future 运行主要基于 run()
通过调用 callable.call() 完成
如果 call 执行成功,则通过 set 方法保存结果 ,将 result 保存到 outcome;
如果 call 执行有异常,则通过 setException 保存异常;
问题三 : future 回调基于什么 -- get(long,TimeUtil) ?
通过调用 report(s) 完成调用
问题四 : future 阻塞的方式 ?
当判断未完成时 , 会调用 awaitDone 等待 , 具体的逻辑以后分析
if (s <= COMPLETING){
s = awaitDone(false, 0L);
}
复制代码
复制代码
awaitDone 中主要做了以下几件事 :
如果主线程被中断,则抛出中断异常;
判断 FutureTask 当前的 state,如果大于 COMPLETING,说明任务已经执行完成,则直接返回;
如果当前 state 等于 COMPLETING,说明任务已经执行完,这时主线程只需通过 yield 方法让出 cpu 资源,等待 state 变成 NORMAL;
通过 WaitNode 类封装当前线程,并通过 UNSAFE 添加到 waiters 链表;
最终通过 LockSupport 的 park 或 parkNanos 挂起线程;
问题五 : future 怎么取消 ?
cancel(boolean mayInterruptIfRunning) 中 t.interrupt()
并且 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); 修改状态
问题六 : future 怎么判断取消和结果 isCancelled -- isDone ?
return state >= CANCELLED;
四 . Future 与 Callable
这个需要从 ExecutorService.submit() 来看 , ExecutorService 有 2 个主要的 submit 方法 , 不论是 Callable , 还是 Runnable , 通过返回值就不难发现 ,其最终都变成了一个 Future
<T> Future<T> submit(Callable<T> task);
- RunnableFuture<T> ftask = newTaskFor(task);
- return new ForkJoinTask.AdaptedCallable<T>(callable);
- execute(ftask);
<T> Future<T> submit(Runnable task, T result);
- RunnableFuture<T> ftask = newTaskFor(task, result);
- execute(ftask);
// 例如这里可以直接将 Callable 作为参数传进去 :
Future<String> future = executor.submit(createCallable());
public Callable createCallable() {
Callable<Module> call = new Callable<Module>() {
public Module call() throws Exception {
// .....
}
};
return call;
}
复制代码
复制代码
五 . 衍生用法 ScheduledFutureTask
// ScheduledFutureTask 简介
• time:任务执行时间;
• period:任务周期执行间隔;
• sequenceNumber:自增的任务序号。
// 执行顺序 : 在等待队列里调度不再按照FIFO,而是按照执行时间,谁即将执行,谁就排在前面。
M- getDelay(TimeUnit unit)
M- int compareTo(Delayed other)
// ScheduledFutureTask 主要在 ScheduledThreadPoolExecutor中
// Node 1 : ScheduledThreadPoolExecutor内部类 ScheduledFutureTask
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V>
复制代码
复制代码
核心代码参考线程池这篇文档
作者:AntBlack
链接:https://juejin.cn/post/6941010435512467493
评论