写点什么

【函数式编程实战】(六) Stream 高并发实战

  • 2022 年 7 月 25 日
  • 本文字数:2453 字

    阅读完需:约 8 分钟

前言:

📫 作者简介:小明 java 问道之路,专注于研究计算机底层,就职于金融公司后端高级工程师,擅长交易领域的高安全/可用/并发/性能的设计和架构📫

🏆 Java 领域优质创作者、阿里云专家博主、华为云享专家🏆

🔥 如果此文还不错的话,还请👍关注、点赞、收藏三连支持👍一下博主哦

本文导读

​​我们上讲看 Stream 接口提供大量 API 可以方便的处理元素,这讲 Stream 高并发(并发、并行、多线程)、ForkJoin 线程池框架的实战

一、并行流(parallelStream、parallel、sequential)

并行流就是把一个内容拆分成多个数据块来执行,用不同的线程分别处理每个数据块的流

parallelStream、.parallel()都可以将流转换成并行流,.parallel()的粒度更小。要注意的是顺序流( .stream) 调用 parallel 方法不意味本身有什么实际变化,它内部设置了一个 boolean,表示调用 parallel 之后的操作都是并行的

对并行流调用后可以使用 .sequential() 变成顺序流。

        List<String> collect10 = orderInfos.stream().parallel()                .map(OrderInfo::getOrderId).collect(toList());
BigDecimal totalSubOrderAmt1 = orderInfos.stream() .map(OrderInfo::getSubOrderInfoList) .flatMap(subOrderInfos -> subOrderInfos.stream()) .filter(subOrderInfo -> null != subOrderInfo.getSubOrderAmt()) .parallel() .map(SubOrderInfo::getSubOrderAmt) .sequential() .reduce(BigDecimal.ZERO, BigDecimal::add);
List<String> collect11 = orderInfos.parallelStream() .map(OrderInfo::getOrderId).collect(toList());
复制代码

并行流 不是一定并行,多线程中保证原子操作会有对象的可变状态,当多个线程共享对象时,共享的可变状态会不断被线程锁住,会影响并行流和并行计算

所以我们是否可以使用并行流需要避免共享变量,而且当较小的数据量的时候并行处理的开销不一定会小于计算开销,同时集合的数据结构也会对并行流有影响,ArrayList 拆分的开销,要小于 LinkedList(前者底层是数组可以直接在内存拆分,后者底层是链表,内存不连续需要遍历拆分,ArrayList、HashSet、TreeSet 可拆分性好,LinkedList 可拆分性极差)

二、Fork/Jion 框架

​​并行流的背后的原理是 java7 里面的分支/合并框架,分支/合并框架的目的就是以递归的方式,将数据块(任务)并行的拆分成更小的数据块,然后将每个子任务合并成整体,ExecutorService 接口的实现,ExecutorService 把子任务分配给线程池 ForkJionPool 中的工作线程。

要把任务提交到这个 ForkJionPool 线程池,必须创建 RecursiveTask 的实现类,要定义 RecursiveTask 只需要实现他的抽象方法 compute

创建一个 ForkJoinTask,并把对象传给我们自定义的 ForkJoinCalculator,创建一个 ForkJoinPool 并把任务传递给他的调用方法 invoke,返回值就是 ForkJoinCalculator 定义的结果

扩展 RecursiveTask,穿件 ForkJoin 框架,创建起始位置和终止位置,实现 compute 方法,实现 fork、join 金额累加

    public static void main(String[] args) {        List<OrderInfo> orderInfos = Arrays.asList(new OrderInfo("123", BigDecimal.ONE),                new OrderInfo("456", BigDecimal.TEN), new OrderInfo("789", BigDecimal.TEN));
// 创建一个 ForkJoinTask,并把对象传给我们自定义的ForkJoinCalculator ForkJoinCalculator orderInfoForkJoinTask = new ForkJoinCalculator(orderInfos, 0, orderInfos.size());
// 创建一个 ForkJoinPool 并把任务传递给他的调用方法,返回值就是ForkJoinCalculator定义的结果 System.out.println(new ForkJoinPool().invoke(orderInfoForkJoinTask)); }

/** * 扩展RecursiveTask,穿件ForkJoin框架,创建起始位置和终止位置 */ static class ForkJoinCalculator extends RecursiveTask<BigDecimal> { List<OrderInfo> orderInfos; int start; int end; BigDecimal amt;
public ForkJoinCalculator(List<OrderInfo> orderInfos, int start, int end) { this.orderInfos = orderInfos; this.amt = orderInfos.get(start).getOrderAmt(); this.start = start; this.end = end; }
@Override protected BigDecimal compute() { int length = end - start; // 如果大小等于阈值,返回结果 if (length < 2) return amt; // 订单金额
ForkJoinCalculator task = new ForkJoinCalculator(orderInfos, start, start + length / 2); task.fork(); ForkJoinCalculator task1 = new ForkJoinCalculator(orderInfos, start + length / 2, end); BigDecimal compute = task1.compute(); BigDecimal join = task.join(); System.out.println("compute:" + compute + " join:" + join); return compute.add(join); // 累加 } }
复制代码

Fork/Jion 框架还有几个框架需要注意的,对于一个任务调用 join 方法,会阻塞调用方,直到出任务结果;不应该在 RecursiveTask 里面实现 ForkJoinPool 应该使用 compute 或 fork 方法;compute 实现需要中断方法并且这里面实现比较困难需要多加练习。

工作窃取,在实际工作中任务差不多被平均分配到 ForkJoinPool 的所有线程,每个线程都为分配给线程的任务保存一个 双向 LinkedQueue(双向链式队列),每个人物完成就会从队列的头取一个在进行执行,但是某些线程执行可能过快,此时这个线程会从其他线程的 队列尾,取走一个任务执行,这也是 Fork/Jion 框架能有高性能的原因

总结

本文讲解 Stream 高并发(并发、并行、多线程)、ForkJoin 线程池框架的实战

发布于: 刚刚阅读数: 3
用户头像

物有本末,事有终始。知所先后,则近道矣 2020.03.20 加入

🏆CSDNJava领域优质创作者/阿里云专家博主/华为云享专家 📫就职某大型金融互联网公司后端高级工程师 👍专注于研究计算机底层/Java/架构/设计模式/算法

评论

发布
暂无评论
【函数式编程实战】(六) Stream高并发实战_Java_小明Java问道之路_InfoQ写作社区