一切从假如开始, 假如有一个用户的实体列表, 需求是统计出来有多少个男性, 多少个女性, 年龄 25 以上有多少, 25 及一下有多少人.
List<User> userList = List.of(
User.builder().id(1L).name("a").age(18).gender(Gender.M).build(),
User.builder().id(2L).name("b").age(20).gender(Gender.F).build(),
User.builder().id(3L).name("c").age(22).gender(Gender.M).build(),
User.builder().id(4L).name("d").age(28).gender(Gender.F).build(),
User.builder().id(5L).name("e").age(31).gender(Gender.F).build());
复制代码
static Map<String, Long> normal(List<User> userList) {
long mCount = userList.stream().filter(i -> Gender.M.equals(i.getGender())).count();
long fCount = userList.stream().filter(i -> Gender.F.equals(i.getGender())).count();
long oldCount = userList.stream().filter(i -> i.getAge() > 25).count();
long youngCount = userList.stream().filter(i -> i.getAge() <= 25).count();
return Map.of(
"mCount", mCount,
"fCount", fCount,
"oldCount", oldCount,
"youngCount", youngCount);
}
复制代码
在常规方法中, 如果我们需要处理的数据量很小而且耗时很短, 如此处理完全没有问题, 但是如果获取四个指标都非常的耗时, 假如每个指标的获取都需要大概一秒钟的时间, 因为每个指标的获取都是阻塞的, 那么用上面的方法来处理总共耗时大概需要 4 秒钟左右, 此时需要引入异步.而且还要有返回值.因此可以想到 jdk1.5 就引入的 Future.
static Map<String, Long> futureDemo(List<User> userList) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(4);
Future<Long> mCount = executorService.submit(() -> userList.stream().filter(i -> Gender.M.equals(i.getGender())).count());
Future<Long> fCount = executorService.submit(() -> userList.stream().filter(i -> Gender.F.equals(i.getGender())).count());
Future<Long> oldCount = executorService.submit(() -> userList.stream().filter(i -> i.getAge() > 25).count());
Future<Long> youngCount = executorService.submit(() -> userList.stream().filter(i -> i.getAge() <= 25).count());
return Map.of(
"mCount", mCount.get(),
"fCount", fCount.get(),
"oldCount", oldCount.get(),
"youngCount", youngCount.get());
}
复制代码
StopWatch stopWatch = new StopWatch();
stopWatch.start("普通处理方式");
log.info("普通方式: {}", normal(userList).toString());
stopWatch.stop();
stopWatch.start("Future+多线程处理方式");
log.info("Future+多线程方式: {}", futureDemo(userList).toString());
stopWatch.stop();
log.info(stopWatch.prettyPrint());
复制代码
22:37:57.712 [main] INFO com.example.multithread.completablefuture.CompletableFutureDemo - 普通方式: {youngCount=3, oldCount=2, fCount=3, mCount=2}
22:37:57.718 [main] INFO com.example.multithread.completablefuture.CompletableFutureDemo - Future+多线程方式: {youngCount=3, oldCount=2, fCount=3, mCount=2}
22:37:57.719 [main] INFO com.example.multithread.completablefuture.CompletableFutureDemo - StopWatch '': running time = 8670142 ns
---------------------------------------------
ns % Task name
---------------------------------------------
005453510 063% 普通处理方式
003216632 037% Future+多线程处理方式
复制代码
总结: 可以看出使用异步后的效率得到了明显的提升, 主要是因为后者的处理中, 各个指标之间不存在依赖性, 程序中由于数据量比较小, 后者的统计中还包括了创建线程池的耗时, 所以效果没有特别明显, 但是如果在更加耗时的任务中, 效果会更加明显.
构建异步 API
在上面的常规处理方法中, 每个指标的获取都是阻塞的, 导致即使不相干的任务也必须要等待上一个不相干的任务结束并拿到返回值, 在实际情况中, 我们经常会同时调用多个阻塞的 API, 导致资源浪费. 将同步 API 修改为异步 API 是个不错的想法, 前提是我们能够修改接口.
static Long getMcount(List<User> userList) {
return userList.stream().filter(i -> Gender.M.equals(i.getGender())).count();
}
复制代码
static Future<Long> getMcountAsync(List<User> userList) {
ExecutorService executorService = Executors.newFixedThreadPool(4);
return executorService.submit(() -> userList.stream().filter(i -> Gender.M.equals(i.getGender())).count());
}
复制代码
static Future<Long> getMcountAsyncWithCF(List<User> userList) {
CompletableFuture<Long> completableFuture = new CompletableFuture<>();
new Thread(() -> {
long count = userList.stream().filter(i -> Gender.M.equals(i.getGender())).count();
completableFuture.complete(count);
}).start();
return completableFuture;
}
复制代码
在以上构建的异步应用中, 如果在计算时出现了异常, 由于异步的原因导致调用者无法在外部直接了解到到底发生了什么错误, 这就需要借助于 CompletableFuture 的 completeExceptionally 方法, 将内部异常问题抛出.
static Future<Long> getMcountAsyncWithCFWithException(List<User> userList) {
CompletableFuture<Long> completableFuture = new CompletableFuture<>();
new Thread(() -> {
try {
long count = userList.stream().filter(i -> Gender.M.equals(i.getGender())).count();
completableFuture.complete(count);
} catch (Exception e) {
completableFuture.completeExceptionally(e);
}
}).start();
return completableFuture;
}
复制代码
通过以上的例子可以看出, 构建比较完整的异步 api 还是比较臃肿的, 我们可以通过 CompletableFuture 提供的工厂方法来进一步优化异步 api.
static Future<Long> getMcountAsyncWithSupplyAsync(List<User> userList) {
return CompletableFuture.supplyAsync(() -> userList.stream().filter(i -> Gender.M.equals(i.getGender())).count());
}
复制代码
可以看出借助 CompletableFuture 提供的工厂方法, 可以非常方便简洁的构建出异步 api 来, 其中 supplyAsync 方法接受一个生产者(Supplier) 作为参数, 默认情况下, 任务处理会交由 ForkJoinPool 池中的某个线程来处理. 但是也可以通过 SupplyAsync 的重载版本, 传入一个线程池来执行任务. supplyAsync 方法还包含了和以上手工处理异常类似的异常处理机制, 以上这些都是在我们可以修改 api 的情况下实现的, 如果我们无法修改我们需要调用的 api, 那就不适用了.
异步调用同步方法
假设有五个运动员, 现在需要测试他们的成绩. 看看谁跑的最快.
模拟运行的过程代码:
static String runTest(Athlete athlete) {
delay();//延时 1秒, 模拟任务耗时.
return "我是: " + athlete.getName() + "我的测试成绩是: " + random.nextInt(10) + "s";
}
复制代码
如果只有一个跑道的话, 那么我们只能让他们一个一个顺序的跑.
static String runTest(Athlete athlete) {
delay();
return "我是: " + athlete.getName() + "我的测试成绩是: " + random.nextInt(10) + "s";
}
复制代码
这样以同步的方式来调用接口, 耗时大概是 5s 多一点. 下面让我们尝试用并行流来测试.
static List<String> runAsyncWithParallel(List<Athlete> athletes) {
return athletes.parallelStream().map(CompletableFutureDemo2::runTest).collect(Collectors.toList());
}
复制代码
通过测试耗时大概 1s 左右. 似乎所有的任务都是以异步的方式在运行. 在试试用 CompletableFuture 的方式结果如何.
static List<String> runAsyncWithCF(List<Athlete> athletes) {
List<CompletableFuture<String>> completableFutures = athletes.stream().map(i -> CompletableFuture.supplyAsync(() -> runTest(i))).collect(Collectors.toList());
return completableFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
}
复制代码
通过测试发现结果类似与使用并行流的方式, 但是当我们把任务的数据增加至一定的数量, 会发现使用 CompletableFuture 的耗时会翻倍, 但是并行流任然能保持高效, 是什么原因呢? 难道并行流比 completableFuture 高效? 当我们尝试再次把任务数量增加更多时发现并行流的耗时开始翻倍, 这个时候再拿出我们的 CompletableFuture 尝试, 发现耗时接近并行流的处理方式, 其实出现耗时翻倍是由于线程复用导致的, jdk 默认会根据我们的机器情况来初始化线程池中的线程数量, 计算方式: Runtime.getRuntime().availableProcessors(). CompletableFuture 和 ParallelStream 相比他的优势在于可以手动配置线程池. 以至于可以配置更加适合我们任务的线程池, 主要是线程数量.
static List<String> runAsyncWithExecutor(List<Athlete> athletes, Executor executor) {
List<CompletableFuture<String>> completableFutures = athletes.stream().map(i -> CompletableFuture.supplyAsync(() -> runTest(i), executor)).collect(Collectors.toList());
return completableFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
}
复制代码
通过自定义线程池, 我们发现任务耗时终于回归到了 1s 的耗时上. 这正是我们想要的结果, 通过 CompletableFuture, 我们把同步的接口调用成功转换为了异步的方式来调用. 以上所有的操作中,我们使用的都是单个任务的处理. CompletableFuture 还有一个重要能力, 组合多个异步任务. 并让其以异步的方式运行.
对多个异步任务进行流水线操作
现在假设我们的每个运动员都要根据平时的训练情况的不同对测试成绩进行再次计算. 于是在我们得到他们的成绩的时候还要再次查看他们平时训练情况. 于是我们的计算过程应该如下:
static String runTestTwo(String athlete) {
delay();
return athlete;
}
复制代码
static List<String> runSync(List<Athlete> athletes) {
return athletes.stream().map(CompletableFutureDemo3::runTest).map(CompletableFutureDemo3::runTestTwo).collect(Collectors.toList());
}
复制代码
当所有的任务都以同步的方式运行时, 我们任务耗时是非常可怕的. 在 5 个任务需要处理时, 以上代码耗时 10s 多一点.
static List<String> runAsyncWithExecutor(List<Athlete> athletes, Executor executor) {
List<CompletableFuture<String>> completableFutures = athletes.stream().map(i -> CompletableFuture.supplyAsync(() -> runTest(i), executor))
.map(future->future.thenCompose(i->CompletableFuture.supplyAsync(()->CompletableFutureDemo3.runTestTwo(i), executor)))
.collect(Collectors.toList());
return completableFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
}
复制代码
这次我们任务耗时为 2s 多一点, 一个运动员要进行的两次测试动作为同步操作. 但是不同运动员之间是异步操作, 所以只有我们的跑道足够多, 那么无论我们有多少运动员, 最终的耗时都应该是 2s 多一点, 处理流程类似:
这里梳理一下如何借助 CompletableFuture 实现的不同运动员之间执行异步任务.
第一阶段测试
对每个运动员进行第一次测试, 利用了 CompletableFuture 的 supplyAsync 工厂方法对其进行了异步测试, 并且自定义了 Executor.
第二阶段测试
在第一阶段的结果之上, 再次对测试结果进行计算, 这里不同运动员之间也是异步执行的, 所以在这一步也利用了 supplyAsync 方法.且自定义了 Executor.
在这一步还用到了 CompletableFuture 的 thenCompose 方法. 这个方法就是专门了实现不同任务之间实现流水线操作而设计的.
thenCompose 方法和其他方法类似, 提供了以 Async 结尾的方法 thenComposeAsync 版本, 不同之处在于是否和前一个阶段使用同一个线程来处理, 我们示例中就是使用同一个线程来处理两个阶段的任务, 如果使用 thenComposeAsync 那么同一个运动员的两阶段任务会使用不同的线程来执行, 由于涉及到线程切换的开销, 所以这里使用了 thenCompose 而不是 thenComposeAsync.
合并两个 CompletableFuture 对象.
之前我们使用了 thenCompose 方法, 将上一步计算的结果传递给下一步计算作为输入参数, 但是如果现在第二步的计算不需要第一步的计算结果, 那么就可以把第一步和第二步完全异步化执行, 这里需要借助: thenCombine.
static List<String> runAsyncWithCombine(List<Athlete> athletes, Executor executor) {
List<CompletableFuture<String>> completableFutures = athletes.stream().map(i ->
CompletableFuture.supplyAsync(() -> runTest(i), executor)
.thenCombine(CompletableFuture.supplyAsync(() -> CompletableFutureDemo3.runTestTwo(i), executor), (a, b) -> a + b))
.collect(Collectors.toList());
return completableFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
}
复制代码
以上代码虽然有两个步骤, 但是这两个步骤是完全异步化的, 所以在线程资源充足的情况下, 所有的任务都可以异步执行, 最后的合并操作需要这两个异步操作完成. 所以最终耗时为 1s 多一点. thenCombine 方法也有 Async 版本, 作用类似于上面的 thenCoposeAsync. 通过一个图描述上述代码运行过程:
completion 事件
现在假如提前测试结束的运动员, 我们可以提前公布其测试成绩, 而不需要等待所有的人都测试完成, 可以借助工厂方法: thenAccept. 他同样也有 Async 版本.功能都是类似的. 要实现测试完一个运动员成绩并立马公布. 还需要借助工厂方法: allof().join()实现.
static List<String> runAsyncWithCombine(List<Athlete> athletes, Executor executor) {
CompletableFuture[] completableFutures = athletes.stream().map(i ->
CompletableFuture.supplyAsync(() -> runTest(i), executor)
.thenCombine(CompletableFuture.supplyAsync(() -> CompletableFutureDemo4.runTestTwo(i), executor), (a, b) -> a + b))
.map(i -> i.thenAccept(log::info)).collect(Collectors.toList()).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(completableFutures).join();
//CompletableFuture.anyOf(completableFutures).join();
return new ArrayList<>();
}
复制代码
打印输出:
21:29:18.803 [pool-1-thread-8] INFO com.example.multithread.completablefuture.CompletableFutureDemo4 - 我是: tom我的测试成绩是: 6s我给你60分
21:29:19.214 [pool-1-thread-6] INFO com.example.multithread.completablefuture.CompletableFutureDemo4 - 我是: tony我的测试成绩是: 4s我给你57分
21:29:19.403 [pool-1-thread-4] INFO com.example.multithread.completablefuture.CompletableFutureDemo4 - 我是: lucy我的测试成绩是: 1s我给你45分
21:29:19.731 [pool-1-thread-10] INFO com.example.multithread.completablefuture.CompletableFutureDemo4 - 我是: perter我的测试成绩是: 9s我给你60分
21:29:19.875 [pool-1-thread-2] INFO com.example.multithread.completablefuture.CompletableFutureDemo4 - 我是: jacky我的测试成绩是: 7s我给你94分
21:29:19.881 [main] INFO com.example.multithread.completablefuture.CompletableFutureDemo4 - 2.403287309s
21:29:19.882 [main] INFO com.example.multithread.completablefuture.CompletableFutureDemo4 - StopWatch '': running time = 2403287309 ns
---------------------------------------------
ns % Task name
---------------------------------------------
2403287309 100% 用自定义线程池的方式使用 CompletableFuture 开始测试
复制代码
可以看见在不同的运动员测试结果公布时间差别特别大, 但是最终还是全部测试完成主线程才继续执行. 如果我把 allOf 修改为: anyOf 代替 allOf();
打印输入:
21:33:11.633 [pool-1-thread-2] INFO com.example.multithread.completablefuture.CompletableFutureDemo4 - 我是: jacky我的测试成绩是: 5s我给你4分
21:33:11.642 [main] INFO com.example.multithread.completablefuture.CompletableFutureDemo4 - 1.011479551s
21:33:11.642 [main] INFO com.example.multithread.completablefuture.CompletableFutureDemo4 - StopWatch '': running time = 1011479551 ns
---------------------------------------------
ns % Task name
---------------------------------------------
1011479551 100% 用自定义线程池的方式使用 CompletableFuture 开始测试
21:33:11.678 [pool-1-thread-9] INFO com.example.multithread.completablefuture.CompletableFutureDemo4 - 我是: perter我的测试成绩是: 9s我给你6分
21:33:12.075 [pool-1-thread-4] INFO com.example.multithread.completablefuture.CompletableFutureDemo4 - 我是: lucy我的测试成绩是: 2s我给你16分
21:33:12.676 [pool-1-thread-8] INFO com.example.multithread.completablefuture.CompletableFutureDemo4 - 我是: tom我的测试成绩是: 3s我给你31分
21:33:13.135 [pool-1-thread-6] INFO com.example.multithread.completablefuture.CompletableFutureDemo4 - 我是: tony我的测试成绩是: 5s我给你33分
复制代码
可以看出在获得了第一个运动员的成绩之后, 主线程立马开始了运行, 其他的比较慢的运动员并未阻塞主线程的运行. 但是最终还是全部都测试完成了, 如果我们把线程池线程都修改为守护线程.
打印输出:
21:46:29.114 [Thread-3] INFO com.example.multithread.completablefuture.CompletableFutureDemo4 - 我是: lucy我的测试成绩是: 0s我给你74分
21:46:29.122 [main] INFO com.example.multithread.completablefuture.CompletableFutureDemo4 - 1.302108399s
21:46:29.123 [main] INFO com.example.multithread.completablefuture.CompletableFutureDemo4 - StopWatch '': running time = 1302108399 ns
---------------------------------------------
ns % Task name
---------------------------------------------
1302108399 100% 用自定义线程池的方式使用 CompletableFuture 开始测试
复制代码
这次裁判只公布了最优秀的运动员的成绩, 后面的运动员都直接放弃了. 可见这种情况适用于处理同一个任务时, 如果有多个方式处理, 我们只接受处理最快的那一个方式, 其他的方式都直接放弃.
借助 java8 实战的总结
执行比较耗时的操作时,尤其是那些依赖一个或多个远程服务的操作,使用异步任务可以改善程序的性能,加快程序的响应速度。
你应该尽可能地为客户提供异步 API。使用 CompletableFuture 类提供的特性,你能够轻松地实现这一目标。
CompletableFuture 类还提供了异常管理的机制,让你有机会抛出/管理异步任务执行中发生的异常。
将同步 API 的调用封装到一个 CompletableFuture 中,你能够以异步的方式使用其结果。
如果异步任务之间相互独立,或者它们之间某一些的结果是另一些的输入,你可以将这些异步任务构造或者合并成一个。
你可以为 CompletableFuture 注册一个回调函数,在 Future 执行完毕或者它们计算的结果可用时,针对性地执行一些程序。
你可以决定在什么时候结束程序的运行,是等待由 CompletableFuture 对象构成的列表中所有的对象都执行完毕,还是只要其中任何一个首先完成就中止程序的运行。
重要: 文章内容完全参考: java8 实战一书总结学习.
重要: 文章内容完全参考: java8 实战一书总结学习.
重要: 文章内容完全参考: java8 实战一书总结学习.
评论