
Implementing Task Orchestration Based on a DAG

Author: Bingo
  • 2023-11-19
    Guangdong

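Scenario

In task-development scenarios such as big data processing and CI/CD, a job usually consists of several types of tasks, and those tasks must run in a particular dependency order. Early on, when the workflow is simple, hard-coding the order is enough. As the number of businesses and the complexity of the workflows grow, however, this approach tends to run into the following problems:

  1. High development cost. Onboarding a new business, adjusting a workflow, or adding a new task each requires a large amount of repetitive work;

  2. Poor maintainability. Because the workflow lives in code, every change is expensive to make and maintain, and frequent changes easily cause production incidents;

  3. Poor observability. Because the dependency order is expressed only in code, the actual dependency order and the execution status cannot be observed effectively.

Task orchestration exists to solve these problems. It standardizes tasks and models the relationships between them so that tasks run in the defined order. A DAG (directed acyclic graph) is the most common and effective model for implementing task orchestration.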


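workflow-dag is a DAG implemented in Java. A DAG is a special kind of graph made up of a set of nodes and directed edges. Each edge has a direction, which represents the flow or relationship between two nodes, and no path leads from a node back to itself.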
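
To make "tasks run in the defined order" concrete, the following is a minimal sketch of how a valid execution order can be derived from a DAG with Kahn's algorithm. It is an illustration only and is not how workflow-dag itself schedules tasks; the class name TopoSortSketch and the representation of the graph as a map from node id to child ids are assumptions made for this example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of workflow-dag.
class TopoSortSketch {

    /** Returns node ids in an order where every node comes after all of its parents. */
    static List<String> topologicalOrder(Map<String, List<String>> children) {
        // Count incoming edges for every node that appears as a key or as a child.
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : children.entrySet()) {
            inDegree.putIfAbsent(e.getKey(), 0);
            for (String child : e.getValue()) {
                inDegree.merge(child, 1, Integer::sum);
            }
        }
        // Nodes without parents are ready immediately.
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : inDegree.entrySet()) {
            if (e.getValue() == 0) {
                ready.add(e.getKey());
            }
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String node = ready.poll();
            order.add(node);
            // Finishing a node releases one dependency of each child.
            for (String child : children.getOrDefault(node, List.of())) {
                if (inDegree.merge(child, -1, Integer::sum) == 0) {
                    ready.add(child);
                }
            }
        }
        // Nodes on a cycle never reach in-degree 0, so they are missing from the order.
        if (order.size() != inDegree.size()) {
            throw new IllegalStateException("Graph contains a cycle");
        }
        return order;
    }

    public static void main(String[] args) {
        Map<String, List<String>> children = new LinkedHashMap<>();
        children.put("A", List.of("B", "D"));
        children.put("B", List.of("C"));
        children.put("C", List.of("E"));
        children.put("D", List.of("E"));
        children.put("E", List.of("F"));
        System.out.println(topologicalOrder(children)); // prints [A, B, D, C, E, F]
    }
}
```

The same pass doubles as a cycle check: if the graph contains a cycle, the method throws instead of returning a partial order.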


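Features

As a DAG for task orchestration, it supports the following features:

  1. Custom tasks;

  2. Flexible dependency ordering between tasks;

  3. Configuration-based DAG descriptions;

  4. Parallel execution of tasks;

  5. Metrics and statistics.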
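
As a rough illustration of what parallel execution means in a DAG, the sketch below groups nodes into "waves": every node in a wave has all of its parents in earlier waves, so the nodes of one wave could run at the same time. This is not the scheduling strategy of workflow-dag; the class ParallelWavesSketch and the map from node id to parent ids are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration; not part of workflow-dag.
class ParallelWavesSketch {

    /**
     * Groups node ids into waves. `parents` maps every node id to the ids it
     * depends on; all nodes in one wave can run at the same time.
     */
    static List<List<String>> waves(Map<String, List<String>> parents) {
        Set<String> done = new HashSet<>();
        List<List<String>> result = new ArrayList<>();
        while (done.size() < parents.size()) {
            List<String> wave = new ArrayList<>();
            for (Map.Entry<String, List<String>> e : parents.entrySet()) {
                // Ready = not yet scheduled and every parent already scheduled.
                if (!done.contains(e.getKey()) && done.containsAll(e.getValue())) {
                    wave.add(e.getKey());
                }
            }
            if (wave.isEmpty()) {
                throw new IllegalStateException("Cycle or undeclared parent in graph");
            }
            done.addAll(wave);
            result.add(wave);
        }
        return result;
    }

    public static void main(String[] args) {
        // A four-node diamond: 1 -> {2, 3} -> 4.
        Map<String, List<String>> parents = new LinkedHashMap<>();
        parents.put("1", List.of());
        parents.put("2", List.of("1"));
        parents.put("3", List.of("1"));
        parents.put("4", List.of("2", "3"));
        System.out.println(waves(parents)); // prints [[1], [2, 3], [4]]
    }
}
```

In the diamond above, nodes 2 and 3 land in the same wave, which is exactly the pair of tasks that can run in parallel.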


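Domain Design

  • DAG (workflow): the directed acyclic graph. It describes the nodes, edges, and dependencies of the graph and is responsible for validating it;

  • Node (node): a workflow node. It describes a single node in the graph; in an executable DAG, a node represents a task;

  • Task (task): an executable task. It describes one kind of executable task and implements the actual execution logic;

  • Engine (engine): the DAG loader and executor. It parses the configuration and runs the workflow.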


Code Implementation

For details, see GitHub: https://github.com/AaronSheng/workflow-dag


DAG: directed acyclic graph

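import java.util.Set;

public interface DAG<T> {
    // Adds a directed edge from -> to and registers both nodes in the graph.
    void addEdge(Node<T> from, Node<T> to);

    Set<Node<T>> getAllNodes();

    // Returns true if the graph is a valid DAG.
    boolean validate();

    String getId();
}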


import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class DefaultDAG<T> implements DAG<T> {
    private static final Logger LOGGER = LogManager.getLogger(DefaultDAG.class);

    private final Set<Node<T>> nodes = new HashSet<>();
    private final String id;

    public DefaultDAG() {
        this.id = UUID.randomUUID().toString();
    }

    @Override
    public void addEdge(Node<T> from, Node<T> to) {
        // Ignore self-loops.
        if (from.equals(to)) {
            return;
        }
        from.addChildren(to);
        to.addParent(from);
        nodes.add(from);
        nodes.add(to);
    }

    @Override
    public Set<Node<T>> getAllNodes() {
        return nodes;
    }

    @Override
    public boolean validate() {
        if (nodes.isEmpty()) {
            return false;
        }
        // Start a DFS from every node, so that a cycle in any part of the
        // graph is found, including parts not reachable from a single root.
        Set<String> onPath = new HashSet<>();  // nodes on the current DFS path
        Set<String> done = new HashSet<>();    // nodes already proven cycle-free
        for (Node<T> node : nodes) {
            if (!dfs(node, onPath, done)) {
                return false;
            }
        }
        return true;
    }

    private boolean dfs(Node<T> node, Set<String> onPath, Set<String> done) {
        if (done.contains(node.getId())) {
            return true;
        }
        // Reaching a node that is already on the current path means a cycle.
        if (!onPath.add(node.getId())) {
            LOGGER.warn("Node: {} is circled", node.getId());
            return false;
        }
        for (Node<T> child : node.getChildren()) {
            if (!dfs(child, onPath, done)) {
                return false;
            }
        }
        onPath.remove(node.getId());
        done.add(node.getId());
        return true;
    }

    @Override
    public String getId() {
        return id;
    }
}


Node: workflow node

import java.util.Set;

public interface Node<T> {
    // Relationships to child and parent nodes.
    Set<Node<T>> getChildren();

    Set<Node<T>> getParents();

    void addChildren(Node<T> child);

    void addParent(Node<T> parent);

    String getId();

    void setId(String id);

    // The payload carried by the node; in an executable DAG this is the task.
    void setData(T data);

    T getData();
}


import java.util.HashSet;
import java.util.Set;

public class DefaultNode<T> implements Node<T> {
    private final Set<Node<T>> parents = new HashSet<>();
    private final Set<Node<T>> children = new HashSet<>();

    private String id;
    private T data;

    public DefaultNode(String id, T data) {
        this.id = id;
        this.data = data;
    }

    @Override
    public Set<Node<T>> getChildren() {
        return children;
    }

    @Override
    public Set<Node<T>> getParents() {
        return parents;
    }

    @Override
    public void addChildren(Node<T> child) {
        children.add(child);
    }

    @Override
    public void addParent(Node<T> parent) {
        parents.add(parent);
    }

    @Override
    public String getId() {
        return id;
    }

    @Override
    public void setId(String id) {
        this.id = id;
    }

    @Override
    public void setData(T data) {
        this.data = data;
    }

    @Override
    public T getData() {
        return data;
    }
}


Task: executable task

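public interface Task {
    // Name under which the task is registered with the engine.
    String getTaskName();

    // Runs the task with the given input and returns its output.
    TaskOutput run(TaskInput input);
}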
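
The usage examples later in this article rely on a PrintTask class that is not listed here. The following is a minimal sketch of what such a task might look like. The Task, TaskInput, and TaskOutput types inside PrintTaskSketch are simplified stand-ins written for this example, reduced to the members the article's code touches; the real classes in the repository may well differ.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for the project's types; the real classes may differ.
class PrintTaskSketch {

    interface Task {
        String getTaskName();

        TaskOutput run(TaskInput input);
    }

    static class TaskInput {
        String taskId;
        Map<String, String> parameters = new HashMap<>();
    }

    static class TaskOutput {
        String taskId;
        boolean succeed;
        Map<String, String> output = new HashMap<>();
    }

    /** Hypothetical task: prints its id and parameters, then reports success. */
    static class PrintTask implements Task {
        @Override
        public String getTaskName() {
            // Must match the "task_name" value used in the DAG configuration.
            return "PrintTask";
        }

        @Override
        public TaskOutput run(TaskInput input) {
            System.out.println("Running " + input.taskId + " with " + input.parameters);
            TaskOutput out = new TaskOutput();
            out.taskId = input.taskId; // the engine looks the node up by this id
            out.succeed = true;
            // Values placed here are merged into the shared parameters by the engine.
            out.output.put(input.taskId + ".done", "true");
            return out;
        }
    }

    public static void main(String[] args) {
        TaskInput input = new TaskInput();
        input.taskId = "A";
        input.parameters.put("FlowID", "demo");
        TaskOutput out = new PrintTask().run(input);
        System.out.println(out.succeed + " " + out.output);
    }
}
```

The key design point is that a task carries its own name, so the engine can map a name in the configuration to a registered task instance.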


Engine: the engine

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

// JsonUtil, TypeReference, DAGVO, NodeVO, EdgeVO, Context, Result, InnerResult,
// TaskInput and TaskOutput come from the repository or its dependencies.
public class Engine {
    private static final Logger LOGGER = LogManager.getLogger(Engine.class);

    private static final int DEFAULT_EXEC_CONCURRENT = 1;

    private final ExecutorService executor;
    private final Map<String, Task> taskMapping = new HashMap<>();

    public Engine() {
        this.executor = Executors.newFixedThreadPool(DEFAULT_EXEC_CONCURRENT);
    }

    public Engine(int concurrent) {
        this.executor = Executors.newFixedThreadPool(concurrent);
    }

    public Engine(ExecutorService executor) {
        this.executor = executor;
    }

    // Registers a task under its name so that configurations can refer to it.
    public <T extends Task> void register(T task) {
        taskMapping.put(task.getTaskName(), task);
    }

    // Builds a DAG from its JSON description.
    public DAG<Task> load(String dag) {
        DAGVO dagVO = JsonUtil.parse(dag, new TypeReference<DAGVO>(){});
        if (dagVO.getEdges() == null || dagVO.getEdges().isEmpty()) {
            LOGGER.warn("Config Attr:edges Is Empty");
            throw new RuntimeException("Empty Edges");
        }
        if (dagVO.getNodes() == null || dagVO.getNodes().isEmpty()) {
            LOGGER.warn("Config Attr:nodes Is Empty");
            throw new RuntimeException("Empty Nodes");
        }

        // Parse nodes: every node must refer to a registered task.
        Map<String, Node<Task>> nodeMapping = new HashMap<>();
        for (NodeVO nodeVO : dagVO.getNodes()) {
            Task task = taskMapping.get(nodeVO.getTaskName());
            if (task == null) {
                throw new RuntimeException(String.format("Task %s Not Found", nodeVO.getTaskName()));
            }

            Node<Task> node = new DefaultNode<>(nodeVO.getId(), task);
            nodeMapping.put(node.getId(), node);
        }

        // Build edges: both ends of an edge must be declared nodes.
        DAG<Task> graph = new DefaultDAG<>();
        for (EdgeVO edgeVO : dagVO.getEdges()) {
            Node<Task> from = nodeMapping.get(edgeVO.getFrom());
            Node<Task> to = nodeMapping.get(edgeVO.getTo());
            if (from == null || to == null) {
                throw new RuntimeException(String.format("Node %s or %s Not Found", edgeVO.getFrom(), edgeVO.getTo()));
            }
            graph.addEdge(from, to);
        }

        // Validate the DAG.
        if (!graph.validate()) {
            throw new RuntimeException("Dag Is Invalid");
        }

        return graph;
    }

    public Result execute(DAG<Task> graph, Map<String, String> parameters) {
        if (!graph.validate()) {
            return new Result().setSucceed(false)
                    .setMessage("Graph Is Circled");
        }

        Context context = new Context(graph, parameters);
        InnerResult result = doExecute(graph.getAllNodes(), context);

        return new Result()
                .setSucceed(result.isSucceed())
                .setException(result.getException())
                .setMessage(result.getMessage())
                .setOutput(context.getParameters());
    }

    private InnerResult doExecute(Set<Node<Task>> nodes, Context context) {
        Set<Node<Task>> processed = context.getProcessed();
        Map<String, String> parameters = context.getParameters();

        // Pick the nodes that have not run yet and whose parents have all finished.
        List<Node<Task>> canExecuteNodeList = new ArrayList<>();
        Map<String, Node<Task>> nodeMap = new HashMap<>();
        for (Node<Task> node : nodes) {
            nodeMap.put(node.getId(), node);
            if (!processed.contains(node) && processed.containsAll(node.getParents())) {
                canExecuteNodeList.add(node);
            }
        }

        // Submit the ready nodes to the thread pool.
        List<Future<TaskOutput>> futureList = new ArrayList<>();
        for (Node<Task> node : canExecuteNodeList) {
            Future<TaskOutput> future = executor.submit(() -> {
                return doExecute(node, context);
            });
            futureList.add(future);
        }

        // Wait for each submitted node in submission order.
        for (Future<TaskOutput> future : futureList) {
            TaskOutput output;
            try {
                output = future.get();
            } catch (Throwable t) {
                throw new RuntimeException(t);
            }

            Node<Task> node = nodeMap.get(output.getTaskId());
            processed.add(node);

            // Merge the task output into the shared parameters.
            if (output.getOutput() != null) {
                parameters.putAll(output.getOutput());
            }
            if (!output.isSucceed()) {
                return new InnerResult().setSucceed(false).setMessage(output.getMessage());
            }

            // Continue with the children of the finished node.
            InnerResult result = doExecute(node.getChildren(), context);
            if (!result.isSucceed()) {
                return result;
            }
        }

        return new InnerResult().setSucceed(true);
    }

    private TaskOutput doExecute(Node<Task> node, Context context) {
        Task task = node.getData();
        try {
            TaskOutput output = task.run(new TaskInput().setTaskId(node.getId()).setParameters(context.getParameters()));
            // Always stamp the node id: the caller uses it to look the node up.
            output.setTaskId(node.getId());
            return output;
        } catch (Exception e) {
            // A thrown exception (or a null output) is reported as a failed task.
            TaskOutput failed = new TaskOutput();
            failed.setTaskId(node.getId());
            failed.setSucceed(false);
            failed.setException(e);
            return failed;
        }
    }
}
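
The engine above waits on futures in submission order and then recurses into the children of each finished node. For comparison, here is a minimal sketch of a different scheduling approach built on the JDK's CompletableFuture, in which each node starts as soon as all of its parents have completed. It is not the project's implementation; FutureSchedulerSketch, runAll, and the map from node id to parent ids are hypothetical names introduced for this example, and the graph is assumed to have been validated as acyclic beforehand.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical alternative scheduler for illustration; not the project's Engine.
class FutureSchedulerSketch {

    /**
     * Runs `action` once per node, starting each node as soon as all of its
     * parents have finished. `parents` maps node ids to parent ids and must
     * describe an acyclic graph.
     */
    static CompletableFuture<Void> runAll(Map<String, List<String>> parents,
                                          Consumer<String> action,
                                          ExecutorService executor) {
        Map<String, CompletableFuture<Void>> futures = new HashMap<>();
        for (String node : parents.keySet()) {
            futureOf(node, parents, action, executor, futures);
        }
        // Completes when every node has finished, or fails if any node failed.
        return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]));
    }

    private static CompletableFuture<Void> futureOf(String node,
                                                    Map<String, List<String>> parents,
                                                    Consumer<String> action,
                                                    ExecutorService executor,
                                                    Map<String, CompletableFuture<Void>> futures) {
        CompletableFuture<Void> existing = futures.get(node);
        if (existing != null) {
            return existing; // each node is scheduled exactly once
        }
        // Build (or reuse) the futures of all parents first.
        List<CompletableFuture<Void>> deps = new ArrayList<>();
        for (String parent : parents.getOrDefault(node, List.of())) {
            deps.add(futureOf(parent, parents, action, executor, futures));
        }
        // The node runs on the executor only after all parents have completed.
        CompletableFuture<Void> future = CompletableFuture
                .allOf(deps.toArray(new CompletableFuture[0]))
                .thenRunAsync(() -> action.accept(node), executor);
        futures.put(node, future);
        return future;
    }

    public static void main(String[] args) {
        Map<String, List<String>> parents = new LinkedHashMap<>();
        parents.put("1", List.of());
        parents.put("2", List.of("1"));
        parents.put("3", List.of("1"));
        parents.put("4", List.of("2", "3"));
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            runAll(parents, id -> System.out.println("run " + id), pool).join();
        } finally {
            pool.shutdown();
        }
    }
}
```

With this structure no thread blocks while waiting for a sibling, and a failing node causes its descendants to be skipped because their futures complete exceptionally. Whether that trade-off suits a given workload is a design decision; the recursive approach in the article is easier to follow step by step.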


Usage

A DAG can be initialized and run in two ways: in code or from configuration.


Method 1: build and run in code

public class EngineTest {
    private static final Logger LOGGER = LogManager.getLogger(EngineTest.class);

    @Test
    public void testExecute() {
        DAG<Task> graph = new DefaultDAG<>();

        // Every node reuses the same PrintTask instance.
        PrintTask printTask = new PrintTask();
        Node<Task> nodeA = new DefaultNode<>("A", printTask);
        Node<Task> nodeB = new DefaultNode<>("B", printTask);
        Node<Task> nodeC = new DefaultNode<>("C", printTask);
        Node<Task> nodeD = new DefaultNode<>("D", printTask);
        Node<Task> nodeE = new DefaultNode<>("E", printTask);
        Node<Task> nodeF = new DefaultNode<>("F", printTask);

        // A -> B -> C -> E -> F and A -> D -> E
        graph.addEdge(nodeA, nodeB);
        graph.addEdge(nodeB, nodeC);
        graph.addEdge(nodeC, nodeE);
        graph.addEdge(nodeA, nodeD);
        graph.addEdge(nodeD, nodeE);
        graph.addEdge(nodeE, nodeF);

        Map<String, String> parameters = new HashMap<>();
        parameters.put("FlowID", graph.getId());

        Engine engine = new Engine();
        engine.register(printTask);

        Result result = engine.execute(graph, parameters);
        LOGGER.info("Exec Flow:{} Succeed:{} Output:{}", graph.getId(), result.isSucceed(), result.getOutput());
    }
}


Method 2: load from configuration and run

public class LoadTest {

    @Test
    public void testLoad() {
        // Four nodes forming a diamond: 1 -> {2, 3} -> 4.
        String dag = "{\n" +
                "  \"nodes\":[\n" +
                "    {\n" +
                "      \"id\":\"1\",\n" +
                "      \"task_name\":\"PrintTask\"\n" +
                "    },\n" +
                "    {\n" +
                "      \"id\":\"2\",\n" +
                "      \"task_name\":\"PrintTask\"\n" +
                "    },\n" +
                "    {\n" +
                "      \"id\":\"3\",\n" +
                "      \"task_name\":\"PrintTask\"\n" +
                "    },\n" +
                "    {\n" +
                "      \"id\":\"4\",\n" +
                "      \"task_name\":\"PrintTask\"\n" +
                "    }\n" +
                "  ],\n" +
                "  \"edges\":[\n" +
                "    {\n" +
                "      \"from\":\"1\",\n" +
                "      \"to\":\"2\"\n" +
                "    },\n" +
                "    {\n" +
                "      \"from\":\"1\",\n" +
                "      \"to\":\"3\"\n" +
                "    },\n" +
                "    {\n" +
                "      \"from\":\"2\",\n" +
                "      \"to\":\"4\"\n" +
                "    },\n" +
                "    {\n" +
                "      \"from\":\"3\",\n" +
                "      \"to\":\"4\"\n" +
                "    }\n" +
                "  ],\n" +
                "  \"parameters\":{\n" +
                "\n" +
                "  }\n" +
                "}";

        System.out.println(dag);

        // The task must be registered before loading, because load() resolves task names.
        Engine engine = new Engine();
        PrintTask printTask = new PrintTask();
        engine.register(printTask);

        DAG<Task> graph = engine.load(dag);

        Map<String, String> parameters = new HashMap<>();
        parameters.put("FlowID", graph.getId());

        Result result = engine.execute(graph, parameters);
        System.out.printf("Exec Flow:%s Succeed:%s Output:%s%n", graph.getId(), result.isSucceed(), result.getOutput());
    }
}

