写点什么

Dolphinscheduler DAG 核心源码剖析

作者:白鲸开源
  • 2024-12-03
    天津
  • 本文字数:3578 字

    阅读完需:约 12 分钟

Dolphinscheduler DAG核心源码剖析

背景描述


注意 : 在 Dolphinscheduler 中,离线任务是有完整的声明周期的,比如说停止、暂停、暂停恢复、重跑等等,都是以 DAG(有向无环图的形式进行任务组织)T+1 离线任务的。

Dolphinscheduler DAG 实现

org.apache.dolphinscheduler.common.graph.DAG


DAG 三个重要的数据结构 :


// 顶点信息private final Map<Node, NodeInfo> nodesMap;
// 边关联信息,作用是记录顶点和边的关系,可以找到叶子节点,也可以获取下游节点private final Map<Node, Map<Node, EdgeInfo>> edgesMap;
// 反向边关联信息,作用是可以快速找到入度为0的节点(起始节点),也可以获取上游节点private final Map<Node, Map<Node, EdgeInfo>> reverseEdgesMap;
复制代码


如下示例 :


DAG<String, String, String> graph = new DAG<>();graph.addNode("A", "A");graph.addNode("B", "B");graph.addNode("C", "C");
// 添加一个B -> C的边,当前A还飘着呢graph.addEdge("B", "C");
// 如果添加A -> B,其实就是会从B开始一直到子节点,看有没有可连接的线到A,如果有,说明这个A -> B的边添加不得,因为会形成环,否则就可以添加graph.addEdge("A", "B");
复制代码


源码分析 :org.apache.dolphinscheduler.common.graph.DAG#addEdge


public boolean addEdge(Node fromNode, Node toNode, EdgeInfo edge, boolean createNode) {    lock.writeLock().lock();
try { // TODO 是否可以添加该边 if (!isLegalAddEdge(fromNode, toNode, createNode)) { log.error("serious error: add edge({} -> {}) is invalid, cause cycle!", fromNode, toNode); return false; }
// TODO 添加节点 addNodeIfAbsent(fromNode, null); addNodeIfAbsent(toNode, null);
// TODO 添加边 addEdge(fromNode, toNode, edge, edgesMap); addEdge(toNode, fromNode, edge, reverseEdgesMap);
return true; } finally { lock.writeLock().unlock(); }}

private boolean isLegalAddEdge(Node fromNode, Node toNode, boolean createNode) { // TODO 如果fromNode和toNode两个是同一个顶点,这个边是不能添加的 if (fromNode.equals(toNode)) { log.error("edge fromNode({}) can't equals toNode({})", fromNode, toNode); return false; }
// TODO 这里其实就是想说,不是创建节点,也就是说要求fromNode和toNode是需要存在的顶点 if (!createNode) { if (!containsNode(fromNode) || !containsNode(toNode)) { log.error("edge fromNode({}) or toNode({}) is not in vertices map", fromNode, toNode); return false; } }
// Whether an edge can be successfully added(fromNode -> toNode),need to determine whether the // DAG has cycle! // TODO 这里获取节点的数量 int verticesCount = getNodesCount();
Queue<Node> queue = new LinkedList<>();
// TODO 将toNode放入到queue中 queue.add(toNode);
// if DAG doesn't find fromNode, it's not has cycle! // TODO 当queue不为空,这里肯定就不为空了 while (!queue.isEmpty() && (--verticesCount > 0)) { // TODO 获取队列里面的元素 Node key = queue.poll();
for (Node subsequentNode : getSubsequentNodes(key)) { // TODO 其实这里判断的是比如说A -> B 有连接的DAG图,传入的是节点B,看B节点的边是不是有A,如果有A说明已经有B -> A的关联了, // TODO 就不能添加了。如果比如说B的下游节点,比如说 A -> B -> C,这样的话,B的下游节点就是C,C是需要放入queue中的 // TODO 核心思想其实就是要找到它要添加的目标节点的连线,是否有目标节点到源节点的连线存在(这样来判断是否存在环) if (subsequentNode.equals(fromNode)) { return false; }
queue.add(subsequentNode); } }
return true;}
复制代码

Dolphinscheduler DagHelper 解说

DAG 类是一个基础通用的 DAG 工具类,而 DagHelper 是任务定义、任务定义直接的关系组装成 DAG 的一个业务工具类。


org.apache.dolphinscheduler.server.master.graph.WorkflowGraphFactory#createWorkflowGraph


public IWorkflowGraph createWorkflowGraph(ProcessInstance workflowInstance) throws Exception {
// TODO 这里其实就是获取的流程实例对应的任务数和之间的关系 List<ProcessTaskRelation> processTaskRelations = processService.findRelationByCode( workflowInstance.getProcessDefinitionCode(), workflowInstance.getProcessDefinitionVersion());
// TODO 获取对应的任务定义log List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogDao.queryTaskDefineLogList(processTaskRelations);
// TODO 获取TaskNode List<TaskNode> taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs);
// generate process to get DAG info // TODO 这里其实解析的是是否自己手动指定的启动节点列表,默认不会 List<Long> recoveryTaskNodeCodeList = getRecoveryTaskNodeCodeList(workflowInstance.getCommandParam());
// TODO 如果 默认startNodeNameList为空 List<Long> startNodeNameList = parseStartNodeName(workflowInstance.getCommandParam());
// TODO 构建ProcessDag对象实例 ProcessDag processDag = DagHelper.generateFlowDag( taskNodeList, startNodeNameList, recoveryTaskNodeCodeList, workflowInstance.getTaskDependType());
if (processDag == null) { log.error("ProcessDag is null"); throw new IllegalArgumentException("Create WorkflowGraph failed, ProcessDag is null"); }
// TODO 生成DAG DAG<Long, TaskNode, TaskNodeRelation> dagGraph = DagHelper.buildDagGraph(processDag); log.debug("Build dag success, dag: {}", dagGraph);
// TODO 使用WorkflowGraph来封装任务节点列表和dagGraph return new WorkflowGraph(taskNodeList, dagGraph);}
复制代码


org.apache.dolphinscheduler.service.utils.DagHelper#generateFlowDag


public static ProcessDag generateFlowDag(                                             List<TaskNode> totalTaskNodeList,                                             List<Long> startNodeNameList,                                             List<Long> recoveryNodeCodeList,                                             TaskDependType depNodeType) throws Exception {
// TODO 其实就是拿到所有的节点 List<TaskNode> destTaskNodeList = generateFlowNodeListByStartNode( totalTaskNodeList, startNodeNameList, recoveryNodeCodeList, depNodeType);
if (destTaskNodeList.isEmpty()) { return null; }
// TODO 获取任务节点之前的关系 List<TaskNodeRelation> taskNodeRelations = generateRelationListByFlowNodes(destTaskNodeList);
// TODO 其实就是实例化一个ProcessDag ProcessDag processDag = new ProcessDag(); // TODO 设置DAG的边 processDag.setEdges(taskNodeRelations); // TODO 设置DAG的顶点 processDag.setNodes(destTaskNodeList); return processDag;}
复制代码


设置了 destTaskNodeList 和 taskNodeRelations


org.apache.dolphinscheduler.service.utils.DagHelper#buildDagGraph


public static DAG<Long, TaskNode, TaskNodeRelation> buildDagGraph(ProcessDag processDag) {
DAG<Long, TaskNode, TaskNodeRelation> dag = new DAG<>();
// TODO 添加顶点 if (CollectionUtils.isNotEmpty(processDag.getNodes())) { for (TaskNode node : processDag.getNodes()) { dag.addNode(node.getCode(), node); } }
// TODO 添加边 if (CollectionUtils.isNotEmpty(processDag.getEdges())) { for (TaskNodeRelation edge : processDag.getEdges()) { dag.addEdge(edge.getStartNode(), edge.getEndNode()); } } return dag;}
复制代码


用户头像

白鲸开源

关注

一家开源原生的DataOps商业公司。 2022-03-18 加入

致力于打造下一代开源原生的DataOps 平台,助力企业在大数据和云时代,智能化地完成多数据源、多云及信创环境的数据集成、调度开发和治理,以提高企业解决数据问题的效率,提升企业分析洞察能力和决策能力。

评论

发布
暂无评论
Dolphinscheduler DAG核心源码剖析_GitHub_白鲸开源_InfoQ写作社区