Java并发编程系列——Fork-Join

2020 年 04 月 30 日 阅读数: 631
Java并发编程系列——Fork-Join

上次回顾提到要写下线程池,这篇呢还是先放一下,再继续一下线程使用中的一些相关概念,所以本篇来通过一个例子介绍Java中的Fork-Join,当然例子中实现用到了ForkJoinPool,也是线程池的一种。

首先,我们提出一个要解决的问题,而且经过分析后,认为该例子用多线程实现是比较适合的。

问题:假设在当前的系统中有这样一个业务,有一个树形的工程结构,深度为三层,第一层为根节点,类型为Project,第二层为SubProject,最底层为Item。规则上Project只有一个,Project下可以包含若干的SubProject,而SubProject下有若干的Item。该结构有个重要的功能,是通过一层层的计算来确定工程的费用,每一层有自己的费用,而上层的费用则是下层费用的汇总加上该层自身的费用。

有了这个问题我们就来分析下题目如何解决。从规则上看,这个问题本身是诸多独立计算的汇总,也就是说可以把当前的一个大问题分解为若干的相同规则的小问题,而且彼此独立,而且在单线程下整个计算相当耗时(如果发现要解决的问题在多线程下完全没有优势,比如时间上的优势,程序结构上的优势等,则实现多线程的意义不大),这样分析下来,是符合使用Fork-Join的方式来实现的。

既然使用Fork-Join来实现,我们就先介绍下实现Fork-Join的一般形式:

class MyTask extens RecursiveTask/RecursiveAction/ForkJoinTask{
compute() {
if(condition) {
doCompute;
return result;
} else {
tasks = doSplitTask();
invokeAll(tasks)
return tasks.foreach.join;
}
}
}
ForkJoinPool pool;
MyTask task = new MyTask();
pool.invoke(task);
result = task.join();

解释一下这段伪码,首先是定义任务类,任务类继承自如上伪码中的任一个,RecursiveTask与RecursiveAction都继承自ForkJoinTask,通常使用这两个就可以,而这两者的主要区别是RecursiveTask可以带返回值,而RecursiveAction无返回值。实现compute方法,在compute中决定是否要继续拆分,并实现业务逻辑,最后使用ForkJoinPool的invoke来调用初始任务。

接下来我们就分步来实现,先定义我们的树结构

public static class TreeNode {
public TreeNode(NodeType type, long amount, TreeNode parent) {
this.type = type;
this.amount = amount;
if (parent != null) {
this.parent = parent;
this.parent.children.add(this);
}
}
public long calcCost() {
return this.amount;
}
private NodeType type;
private TreeNode parent;
private long amount;
private List<TreeNode> children = new ArrayList<>();
}

其中节点类型,我们定义一个枚举来表示

public enum NodeType {
PROJECT, SUB_PROJECT, ITEM
}

接下来定义任务类

public static class ComputeTask extends RecursiveTask<Long> {
private TreeNode node;
public ComputeTask(TreeNode node) {
this.node = node;
}
@Override
protected Long compute() {
if (NodeType.SUB_PROJECT.equals(node.type)) {
for (TreeNode item : this.node.children) {
this.node.amount += item.calcCost();
}
} else {
List<ComputeTask> tasks = new ArrayList<>();
for (TreeNode treeNode : this.node.children) {
ComputeTask task = new ComputeTask(treeNode);
tasks.add(task);
}
invokeAll(tasks);
for (ComputeTask task : tasks) {
this.node.amount += task.join();
}
}
return this.node.calcCost();
}
}

假设我们任务拆分规则为仅拆分到SubProject这一层,避免任务拆分过于细碎。

定义一个初始化方法,用来构造一个要计算的工程树。

private static TreeNode constructProject() {
TreeNode project = new TreeNode(NodeType.PROJECT, 0, null);
for (int i = 0; i < 20; i++) {
TreeNode subProject = new TreeNode(NodeType.SUB_PROJECT, 0, project);
for (int j = 0; j < 5000; j++) {
new TreeNode(NodeType.ITEM, j, subProject);
}
}
return project;
}

最后是如何调用并输出结果。

public static void main(String[] args) {
TreeNode project = constructProject();
ForkJoinPool pool = new ForkJoinPool();
ComputeTask task = new ComputeTask(project);
pool.invoke(task);
System.out.println(task.join());
}

这样一个工程树的计算就完成了。

这里仅举个例子来说明Fork-Join的用法,在实际使用过程中需要注意,invoke方式因为内部有执行join,所以可以说是一种同步的方式,而如果使用execute方法,则相当于异步的方式,在使用中需要注意应用场景。比如该示例中,最后如果不使用task.join()而直接获取project.amout,同样可以得到结果,但改成execute执行任务,而不使用join()获取结果,在同样位置是得不到正确的结果的。

本文仅较简略地介绍Fork-Join的一般使用方式,像Fork-Join内部实现了工作密取,对于这一点,先了解有这样一个机制。

下一篇应该会讲一下线程的一些常用工具类。

欢迎关注公众号“像什么”

本系列其他文章:

Java并发编程系列——分布式锁

Java并发编程系列——线程池

Java并发编程系列——锁顺序

Java并发编程系列——锁

Java并发编程系列——常用并发工具类

Java并发编程系列——线程的等待与唤醒

Java并发编程系列插曲——对象的内存结构

Java并发编程系列——线程

用户头像

孙苏勇

关注

不读书,思想就会停止。 2018.04.05 加入

公众号“像什么",记录想记录的。

评论

发布
暂无评论
Java并发编程系列——Fork-Join