Implementing Task Orchestration with a DAG
- 2023-11-19, Guangdong
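Scenario
In task-development scenarios such as big data processing and CI/CD, a job usually contains several types of tasks, and those tasks must run in a certain dependency order. Early on, when workflows were simple, hard-coding them was enough. As the number of business lines and the complexity of workflows grow, however, the following problems appear:
High development cost. Onboarding a new business, adjusting a workflow, or adding a new task type all involve a lot of repetitive work;
Poor maintainability. When workflows are implemented in code, every change is expensive to make and maintain, and frequent changes easily cause production incidents;
Poor observability. Because the dependency order lives in code, the actual dependencies and the execution status cannot be observed effectively.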
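Task orchestration addresses these problems. It standardizes tasks and models the relationships between them, so that tasks execute in the order the model defines. A DAG (directed acyclic graph) is the most common and effective model for implementing task orchestration.
workflow-dag is a DAG implementation written in Java. A DAG is a special kind of graph consisting of a set of nodes and directed edges. Each edge has a direction that represents flow or a relationship between two nodes, and the graph contains no cycles: following the edges from any node never leads back to that node.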
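Modeling dependencies as a DAG guarantees that the tasks can always be put in a topological order, one in which every task comes after all the tasks it depends on. Once the dependencies contain a cycle, no such order exists. The following is a minimal sketch of computing that order with Kahn's algorithm. The `TopoOrder` class, the string task ids, and the `{from, to}` edge pairs are assumptions made for illustration and are not part of workflow-dag:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: string ids and {from, to} edge pairs are illustrative, not the workflow-dag API.
class TopoOrder {
    /** Returns the tasks in an order where every task follows all of its dependencies. */
    static List<String> order(List<String> tasks, List<String[]> edges) {
        Map<String, List<String>> children = new HashMap<>();
        Map<String, Integer> inDegree = new HashMap<>();
        for (String t : tasks) {
            children.put(t, new ArrayList<String>());
            inDegree.put(t, 0);
        }
        // Assumes every edge endpoint is listed in tasks
        for (String[] e : edges) {
            children.get(e[0]).add(e[1]);
            inDegree.merge(e[1], 1, Integer::sum);
        }
        // Tasks with no unfinished dependencies are ready to run
        Deque<String> ready = new ArrayDeque<>();
        for (String t : tasks) {
            if (inDegree.get(t) == 0) {
                ready.add(t);
            }
        }
        List<String> result = new ArrayList<>();
        while (!ready.isEmpty()) {
            String t = ready.poll();
            result.add(t);
            // Completing t removes one pending dependency from each child
            for (String c : children.get(t)) {
                if (inDegree.merge(c, -1, Integer::sum) == 0) {
                    ready.add(c);
                }
            }
        }
        // Tasks that never became ready are waiting on each other: the graph has a cycle
        if (result.size() != tasks.size()) {
            throw new IllegalArgumentException("dependencies contain a cycle");
        }
        return result;
    }
}
```

Consider tasks listed as extract, clean, load, report, with edges extract→clean, clean→load, and clean→report. For this input, `order` returns extract, clean, load, report. A loop such as a→b, b→a leaves tasks that never become ready, and in that case the sketch throws.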
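Features
As a task-orchestration engine, the DAG supports:
Custom task definitions;
Flexible dependency orders between tasks;
Describing a DAG through configuration;
Parallel execution of tasks;
Metrics and statistics, among other features.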
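The parallel-execution feature rests on a simple observation. Once a group of tasks has all of its dependencies finished, those tasks do not depend on each other and can run concurrently. The following is a hypothetical helper, not part of workflow-dag, that groups tasks into such stages. It assumes string ids and `{from, to}` edge pairs:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: groups tasks into stages whose members could run in parallel.
class ParallelStages {
    static List<List<String>> stages(List<String> tasks, List<String[]> edges) {
        Map<String, List<String>> children = new HashMap<>();
        Map<String, Integer> pending = new HashMap<>(); // unfinished parents per task
        for (String t : tasks) {
            children.put(t, new ArrayList<String>());
            pending.put(t, 0);
        }
        for (String[] e : edges) {
            children.get(e[0]).add(e[1]);
            pending.merge(e[1], 1, Integer::sum);
        }
        List<String> current = new ArrayList<>();
        for (String t : tasks) {
            if (pending.get(t) == 0) {
                current.add(t);
            }
        }
        List<List<String>> stages = new ArrayList<>();
        int scheduled = 0;
        while (!current.isEmpty()) {
            // Every task in `current` depends only on tasks in earlier stages
            stages.add(current);
            scheduled += current.size();
            List<String> next = new ArrayList<>();
            for (String t : current) {
                for (String c : children.get(t)) {
                    if (pending.merge(c, -1, Integer::sum) == 0) {
                        next.add(c);
                    }
                }
            }
            current = next;
        }
        if (scheduled != tasks.size()) {
            throw new IllegalArgumentException("dependencies contain a cycle");
        }
        return stages;
    }
}
```

The code-based usage example later in the article builds a graph with nodes A to F. For that graph, this gives [[A], [B, D], [C], [E], [F]]. Running stage by stage is easy to reason about, but it makes C wait for D even though C depends only on B. Starting each task as soon as its own parents finish avoids that idle time.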
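Domain Design
DAG (workflow): the directed acyclic graph itself. It describes the graph's nodes, edges, and dependencies, and is responsible for validating the graph (for example, checking that it has no cycles);
Node: a workflow node. It describes a single node of the graph; in an executable DAG, each node represents a task;
Task: an executable task. It describes one category of executable work and implements how that work is actually performed;
Engine: the DAG loader and executor. It parses the configuration and runs the workflow.
Implementation
See GitHub for the full source: https://github.com/AaronSheng/workflow-dag
DAG: directed acyclic graph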
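import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public interface DAG<T> {
    // Adds a directed edge; "to" depends on "from"
    void addEdge(Node<T> from, Node<T> to);

    Set<Node<T>> getAllNodes();

    // Returns true if the graph contains no cycle
    boolean validate();

    String getId();
}

public class DefaultDAG<T> implements DAG<T> {
    private static final Logger LOGGER = LogManager.getLogger(DefaultDAG.class);
    private final Set<Node<T>> nodes = new HashSet<>();
    private final String id;

    public DefaultDAG() {
        this.id = UUID.randomUUID().toString();
    }

    @Override
    public void addEdge(Node<T> from, Node<T> to) {
        if (from.equals(to)) {
            // A self-loop is a cycle; reject it instead of silently dropping the edge
            throw new IllegalArgumentException("Self-loop on node " + from.getId());
        }
        from.addChildren(to);
        to.addParent(from);
        // Nodes enter the graph only through edges
        nodes.add(from);
        nodes.add(to);
    }

    @Override
    public Set<Node<T>> getAllNodes() {
        return nodes;
    }

    @Override
    public boolean validate() {
        // Kahn's algorithm: repeatedly remove nodes whose parents have all been removed.
        // Every node can be removed if and only if the graph has no cycle. Unlike a DFS
        // from a single root, this covers graphs with several roots and handles each
        // node and edge only once, even when paths share descendants.
        Map<Node<T>, Integer> pendingParents = new HashMap<>();
        Deque<Node<T>> ready = new ArrayDeque<>();
        for (Node<T> node : nodes) {
            pendingParents.put(node, node.getParents().size());
            if (node.getParents().isEmpty()) {
                ready.add(node);
            }
        }
        int removed = 0;
        while (!ready.isEmpty()) {
            Node<T> node = ready.poll();
            removed++;
            for (Node<T> child : node.getChildren()) {
                if (pendingParents.merge(child, -1, Integer::sum) == 0) {
                    ready.add(child);
                }
            }
        }
        if (removed != nodes.size()) {
            LOGGER.warn("DAG: {} contains a cycle", id);
            return false;
        }
        return true;
    }

    @Override
    public String getId() {
        return id;
    }
}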
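Cycle detection can also be written as a depth-first search that colors nodes WHITE (not visited), GRAY (on the current path), or BLACK (fully explored). Reaching a GRAY node means the search has found a back edge, that is, a cycle. BLACK nodes are never re-explored, so each node and edge is handled once even when many paths share descendants. Starting a search from every unvisited node also catches cycles that no root can reach. The following sketch works over a plain adjacency map keyed by node id. The `CycleCheck` class is hypothetical and not part of workflow-dag:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: three-color DFS cycle check over node id -> child ids.
class CycleCheck {
    private enum Color { WHITE, GRAY, BLACK }

    /** Returns true if the directed graph has no cycle. */
    static boolean isAcyclic(Map<String, List<String>> children) {
        Map<String, Color> color = new HashMap<>();
        for (String n : children.keySet()) {
            color.put(n, Color.WHITE);
        }
        // Start from every unvisited node, not just from nodes without parents
        for (String n : children.keySet()) {
            if (color.get(n) == Color.WHITE && hasCycleFrom(n, children, color)) {
                return false;
            }
        }
        return true;
    }

    private static boolean hasCycleFrom(String n, Map<String, List<String>> children,
                                        Map<String, Color> color) {
        color.put(n, Color.GRAY); // n is on the current DFS path
        for (String c : children.getOrDefault(n, Collections.<String>emptyList())) {
            Color cc = color.getOrDefault(c, Color.WHITE);
            if (cc == Color.GRAY) {
                return true; // back edge: c is an ancestor of n
            }
            if (cc == Color.WHITE && hasCycleFrom(c, children, color)) {
                return true;
            }
            // BLACK: already fully explored, nothing new to find there
        }
        color.put(n, Color.BLACK); // all descendants of n explored
        return false;
    }
}
```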
Node: workflow node
import java.util.HashSet;
import java.util.Set;

public interface Node<T> {
    // Adjacency: the nodes this node points to, and the nodes pointing to it
    Set<Node<T>> getChildren();

    Set<Node<T>> getParents();

    void addChildren(Node<T> child);

    void addParent(Node<T> parent);

    String getId();

    void setId(String id);

    // Payload carried by the node; in an executable DAG this is the Task to run
    void setData(T data);

    T getData();
}

// equals/hashCode are inherited from Object, so node identity (not the id)
// decides membership in the HashSets used for parents, children and DAG nodes
public class DefaultNode<T> implements Node<T> {
    private final Set<Node<T>> parents = new HashSet<>();
    private final Set<Node<T>> children = new HashSet<>();
    private String id;
    private T data;

    public DefaultNode(String id, T data) {
        this.id = id;
        this.data = data;
    }

    @Override
    public Set<Node<T>> getChildren() {
        return children;
    }

    @Override
    public Set<Node<T>> getParents() {
        return parents;
    }

    @Override
    public void addChildren(Node<T> child) {
        children.add(child);
    }

    @Override
    public void addParent(Node<T> parent) {
        parents.add(parent);
    }

    @Override
    public String getId() {
        return id;
    }

    @Override
    public void setId(String id) {
        this.id = id;
    }

    @Override
    public void setData(T data) {
        this.data = data;
    }

    @Override
    public T getData() {
        return data;
    }
}
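Task: executable task
public interface Task {
    // Name used to register the task with the Engine and to reference it from configuration (task_name)
    String getTaskName();

    // Executes the task; the engine merges the returned output into the shared workflow parameters
    TaskOutput run(TaskInput input);
}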
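`PrintTask`, which the usage examples rely on, is not shown in the article. The following sketch shows what a task implementation might look like. `TaskInput` and `TaskOutput` are defined in the repository, so the nested classes below are hypothetical stand-ins that mirror only the accessors `Engine` calls. `GreetTask` and its `name`/`greeting` keys are likewise invented for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins that mirror only the accessors Engine calls; the real
// TaskInput/TaskOutput classes live in the workflow-dag repository.
class TaskSketch {
    static class TaskInput {
        private String taskId;
        private Map<String, String> parameters = new HashMap<>();

        TaskInput setTaskId(String taskId) { this.taskId = taskId; return this; }
        TaskInput setParameters(Map<String, String> parameters) { this.parameters = parameters; return this; }
        String getTaskId() { return taskId; }
        Map<String, String> getParameters() { return parameters; }
    }

    static class TaskOutput {
        private String taskId;
        private boolean succeed;
        private final Map<String, String> output = new HashMap<>();

        void setTaskId(String taskId) { this.taskId = taskId; }
        void setSucceed(boolean succeed) { this.succeed = succeed; }
        String getTaskId() { return taskId; }
        boolean isSucceed() { return succeed; }
        Map<String, String> getOutput() { return output; }
    }

    interface Task {
        String getTaskName();
        TaskOutput run(TaskInput input);
    }

    /** Hypothetical task: reads "name" and publishes "greeting" for downstream nodes. */
    static class GreetTask implements Task {
        @Override
        public String getTaskName() {
            return "GreetTask"; // key used by Engine.register and the task_name config field
        }

        @Override
        public TaskOutput run(TaskInput input) {
            TaskOutput out = new TaskOutput();
            out.setTaskId(input.getTaskId()); // Engine maps outputs back to nodes by this id
            String name = input.getParameters().get("name");
            if (name == null) {
                out.setSucceed(false); // a failed output stops the engine from scheduling further nodes
                return out;
            }
            out.getOutput().put("greeting", "Hello, " + name); // merged into the shared parameters
            out.setSucceed(true);
            return out;
        }
    }
}
```

The engine maps each output back to its node by task id. It also merges the output map into the shared parameters that downstream nodes see, so a task should fill in both the task id and the output map.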
Engine
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

// DAGVO, NodeVO, EdgeVO, JsonUtil, TypeReference, Context, Result, InnerResult,
// TaskInput and TaskOutput come from the workflow-dag repository (imports omitted)
public class Engine {
    private static final Logger LOGGER = LogManager.getLogger(Engine.class);
    // With a single thread, nodes run one at a time; pass a larger value for parallelism
    private static final int DEFAULT_EXEC_CONCURRENT = 1;
    private final ExecutorService executor;
    private final Map<String, Task> taskMapping = new HashMap<>();

    public Engine() {
        this(DEFAULT_EXEC_CONCURRENT);
    }

    public Engine(int concurrent) {
        this.executor = Executors.newFixedThreadPool(concurrent);
    }

    public Engine(ExecutorService executor) {
        this.executor = executor;
    }

    // Tasks are looked up by name when a DAG is loaded from configuration
    public <T extends Task> void register(T task) {
        taskMapping.put(task.getTaskName(), task);
    }

    public DAG<Task> load(String dag) {
        DAGVO dagVO = JsonUtil.parse(dag, new TypeReference<DAGVO>() {});
        if (dagVO.getEdges() == null || dagVO.getEdges().isEmpty()) {
            LOGGER.warn("Config Attr:edges Is Empty");
            throw new RuntimeException("Empty Edges");
        }
        if (dagVO.getNodes() == null || dagVO.getNodes().isEmpty()) {
            LOGGER.warn("Config Attr:nodes Is Empty");
            throw new RuntimeException("Empty Nodes");
        }
        // parse nodes: bind each node to its registered task
        Map<String, Node<Task>> nodeMapping = new HashMap<>();
        for (NodeVO nodeVO : dagVO.getNodes()) {
            Task task = taskMapping.get(nodeVO.getTaskName());
            if (task == null) {
                throw new RuntimeException(String.format("Task %s Not Found", nodeVO.getTaskName()));
            }
            Node<Task> node = new DefaultNode<>(nodeVO.getId(), task);
            nodeMapping.put(node.getId(), node);
        }
        // build edges
        DAG<Task> graph = new DefaultDAG<>();
        for (EdgeVO edgeVO : dagVO.getEdges()) {
            Node<Task> from = nodeMapping.get(edgeVO.getFrom());
            Node<Task> to = nodeMapping.get(edgeVO.getTo());
            if (from == null || to == null) {
                throw new RuntimeException(String.format("Node %s or %s Not Found", edgeVO.getFrom(), edgeVO.getTo()));
            }
            graph.addEdge(from, to);
        }
        // validate dag
        if (!graph.validate()) {
            throw new RuntimeException("Dag Is Invalid");
        }
        return graph;
    }

    public Result execute(DAG<Task> graph, Map<String, String> parameters) {
        if (!graph.validate()) {
            return new Result().setSucceed(false)
                    .setMessage("Graph Contains A Cycle");
        }
        Context context = new Context(graph, parameters);
        InnerResult result = doExecute(graph.getAllNodes(), context);
        return new Result()
                .setSucceed(result.isSucceed())
                .setException(result.getException())
                .setMessage(result.getMessage())
                .setOutput(context.getParameters());
    }

    private InnerResult doExecute(Set<Node<Task>> nodes, Context context) {
        Set<Node<Task>> processed = context.getProcessed();
        Map<String, String> parameters = context.getParameters();
        // pick up nodes that have not run yet and whose parents have all finished
        List<Node<Task>> canExecuteNodeList = new ArrayList<>();
        Map<String, Node<Task>> nodeMap = new HashMap<>();
        for (Node<Task> node : nodes) {
            nodeMap.put(node.getId(), node);
            if (!processed.contains(node) && processed.containsAll(node.getParents())) {
                canExecuteNodeList.add(node);
            }
        }
        // submit ready nodes; each gets a snapshot of the parameters so worker threads
        // never read the map while this thread is merging outputs into it
        List<Future<TaskOutput>> futureList = new ArrayList<>();
        for (Node<Task> node : canExecuteNodeList) {
            Map<String, String> snapshot = new HashMap<>(parameters);
            futureList.add(executor.submit(() -> runTask(node, snapshot)));
        }
        // process node outputs in submission order
        for (Future<TaskOutput> future : futureList) {
            TaskOutput output;
            try {
                output = future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return new InnerResult().setSucceed(false).setMessage("Interrupted");
            } catch (ExecutionException e) {
                throw new RuntimeException(e.getCause());
            }
            Node<Task> node = nodeMap.get(output.getTaskId());
            processed.add(node);
            // merge outputs so downstream nodes can read them
            if (output.getOutput() != null) {
                parameters.putAll(output.getOutput());
            }
            if (!output.isSucceed()) {
                return new InnerResult().setSucceed(false)
                        .setMessage(output.getMessage())
                        .setException(output.getException());
            }
            // process children whose parents are now all finished
            InnerResult result = doExecute(node.getChildren(), context);
            if (!result.isSucceed()) {
                return result;
            }
        }
        return new InnerResult().setSucceed(true);
    }

    private TaskOutput runTask(Node<Task> node, Map<String, String> parameters) {
        Task task = node.getData();
        TaskOutput output;
        try {
            output = task.run(new TaskInput().setTaskId(node.getId()).setParameters(parameters));
            if (output == null) {
                output = new TaskOutput();
                output.setSucceed(false);
                output.setException(new IllegalStateException("Task " + task.getTaskName() + " returned null"));
            }
        } catch (Exception e) {
            output = new TaskOutput();
            output.setSucceed(false);
            output.setException(e);
        }
        // stamp the node id so the engine can always map the output back to its node
        output.setTaskId(node.getId());
        return output;
    }
}
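The engine first collects the ready nodes. It then waits on their futures one at a time, in list order, before scheduling any children. As a result, a child of a fast task can sit idle behind a slow sibling. A different strategy, sketched below with the JDK's `CompletableFuture`, chains each node on the futures of its own parents, so the node starts as soon as those parents complete. `FutureScheduler`, its parent-map input, and the `Consumer<String>` action are assumptions for illustration. The sketch also leaves out parameter passing and the `Task`/`Node` types:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Sketch of an alternative scheduler; names and signatures are illustrative only.
class FutureScheduler {
    /**
     * @param parents node id -> ids it depends on; every node must appear as a key
     * @param action  work to run for each node id (parameter passing is left out)
     * @return one future per node, completed after that node's action has run
     */
    static Map<String, CompletableFuture<Void>> schedule(Map<String, List<String>> parents,
                                                         Consumer<String> action,
                                                         Executor executor) {
        Map<String, CompletableFuture<Void>> futures = new HashMap<>();
        for (String id : parents.keySet()) {
            build(id, parents, action, executor, futures, new HashSet<String>());
        }
        return futures;
    }

    private static CompletableFuture<Void> build(String id,
                                                 Map<String, List<String>> parents,
                                                 Consumer<String> action,
                                                 Executor executor,
                                                 Map<String, CompletableFuture<Void>> futures,
                                                 Set<String> onPath) {
        CompletableFuture<Void> existing = futures.get(id);
        if (existing != null) {
            return existing; // each node gets exactly one future
        }
        if (!onPath.add(id)) {
            throw new IllegalArgumentException("cycle through node " + id);
        }
        List<String> deps = parents.get(id);
        CompletableFuture<?>[] depFutures = new CompletableFuture<?>[deps.size()];
        for (int i = 0; i < deps.size(); i++) {
            depFutures[i] = build(deps.get(i), parents, action, executor, futures, onPath);
        }
        // Runs once every parent has completed; if any parent failed, the action is
        // skipped and this future completes exceptionally, which propagates downstream.
        CompletableFuture<Void> future = CompletableFuture.allOf(depFutures)
                .thenRunAsync(() -> action.accept(id), executor);
        futures.put(id, future);
        onPath.remove(id);
        return future;
    }
}
```

Failure handling needs no extra code. `allOf` completes exceptionally when any parent fails, so `thenRunAsync` skips the node's action and the node's own future fails in turn. As a result, everything downstream of a failed node is skipped.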
Usage
A DAG can be built and run either in code or from a JSON configuration.
Method 1: build and run in code
public class EngineTest {
    private static final Logger LOGGER = LogManager.getLogger(EngineTest.class);

    @Test
    public void testExecute() {
        DAG<Task> graph = new DefaultDAG<>();
        // one PrintTask instance is shared by all six nodes
        PrintTask printTask = new PrintTask();
        Node<Task> nodeA = new DefaultNode<>("A", printTask);
        Node<Task> nodeB = new DefaultNode<>("B", printTask);
        Node<Task> nodeC = new DefaultNode<>("C", printTask);
        Node<Task> nodeD = new DefaultNode<>("D", printTask);
        Node<Task> nodeE = new DefaultNode<>("E", printTask);
        Node<Task> nodeF = new DefaultNode<>("F", printTask);
        // A -> B -> C -> E, A -> D -> E, E -> F
        graph.addEdge(nodeA, nodeB);
        graph.addEdge(nodeB, nodeC);
        graph.addEdge(nodeC, nodeE);
        graph.addEdge(nodeA, nodeD);
        graph.addEdge(nodeD, nodeE);
        graph.addEdge(nodeE, nodeF);
        Map<String, String> parameters = new HashMap<>();
        parameters.put("FlowID", graph.getId());
        Engine engine = new Engine();
        engine.register(printTask);
        Result result = engine.execute(graph, parameters);
        LOGGER.info("Exec Flow:{} Succeed:{} Output:{}", graph.getId(), result.isSucceed(), result.getOutput());
    }
}
Method 2: load from configuration and run
public class LoadTest {
    @Test
    public void testLoad() {
        // nodes 1..4 all use PrintTask; edges: 1 -> 2, 1 -> 3, 2 -> 4, 3 -> 4
        String dag = "{\n" +
                " \"nodes\":[\n" +
                " {\n" +
                " \"id\":\"1\",\n" +
                " \"task_name\":\"PrintTask\"\n" +
                " },\n" +
                " {\n" +
                " \"id\":\"2\",\n" +
                " \"task_name\":\"PrintTask\"\n" +
                " },\n" +
                " {\n" +
                " \"id\":\"3\",\n" +
                " \"task_name\":\"PrintTask\"\n" +
                " },\n" +
                " {\n" +
                " \"id\":\"4\",\n" +
                " \"task_name\":\"PrintTask\"\n" +
                " }\n" +
                " ],\n" +
                " \"edges\":[\n" +
                " {\n" +
                " \"from\":\"1\",\n" +
                " \"to\":\"2\"\n" +
                " },\n" +
                " {\n" +
                " \"from\":\"1\",\n" +
                " \"to\":\"3\"\n" +
                " },\n" +
                " {\n" +
                " \"from\":\"2\",\n" +
                " \"to\":\"4\"\n" +
                " },\n" +
                " {\n" +
                " \"from\":\"3\",\n" +
                " \"to\":\"4\"\n" +
                " }\n" +
                " ],\n" +
                " \"parameters\":{\n" +
                "\n" +
                " }\n" +
                "}";
        System.out.println(dag);
        Engine engine = new Engine();
        PrintTask printTask = new PrintTask();
        // tasks must be registered before load() so that task_name can be resolved
        engine.register(printTask);
        DAG<Task> graph = engine.load(dag);
        Map<String, String> parameters = new HashMap<>();
        parameters.put("FlowID", graph.getId());
        Result result = engine.execute(graph, parameters);
        System.out.printf("Exec Flow:%s Succeed:%s Output:%s%n", graph.getId(), result.isSucceed(), result.getOutput());
    }
}
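`DefaultDAG` learns about nodes only through `addEdge`. A node listed under `nodes` but not used by any edge is therefore never added to the graph and never runs. In addition, `load` stops at the first dangling edge it meets. The following is a hypothetical pre-load check, not part of workflow-dag, that reports all such problems at once. It assumes the node ids and `{from, to}` edge pairs have already been read from the JSON:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical pre-load check for the "nodes"/"edges" config; not part of workflow-dag.
class ConfigCheck {
    static List<String> problems(List<String> nodeIds, List<String[]> edges) {
        List<String> problems = new ArrayList<>();
        Set<String> ids = new LinkedHashSet<>(); // keeps config order for stable messages
        for (String id : nodeIds) {
            if (!ids.add(id)) {
                problems.add("duplicate node id: " + id);
            }
        }
        Set<String> used = new HashSet<>();
        for (String[] edge : edges) {
            for (String end : edge) { // edge = {from, to}
                if (!ids.contains(end)) {
                    problems.add("edge references unknown node: " + end);
                }
                used.add(end);
            }
        }
        // DefaultDAG only adds nodes through addEdge, so these nodes would never run
        for (String id : ids) {
            if (!used.contains(id)) {
                problems.add("node is not part of any edge: " + id);
            }
        }
        return problems;
    }
}
```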
Copyright notice: This is an original article by InfoQ author Bingo.
Original link: http://xie.infoq.cn/article/1398d580927a86c6879564309. Please contact the author before republishing.