写点什么

纯干货 | Dolphinscheduler Master 模块源码剖析

作者:白鲸开源
  • 2025-04-02
    天津
  • 本文字数:2969 字

    阅读完需:约 10 分钟

纯干货 | Dolphinscheduler Master模块源码剖析

此前我们曾用万字长文解释了Apache DolphinScheduler的Worker模块源码,今天,我们再来一起看看 Master 模块源码的原理。

Master Slot 计算


核心代码逻辑:org.apache.dolphinscheduler.server.master.registry.MasterSlotManager.SlotChangeListener#notify


public void notify(Map<String, MasterHeartBeat> masterNodeInfo) {    List<Server> serverList = masterNodeInfo.values().stream()            // TODO 这里其实就是过滤掉buzy的master节点            .filter(heartBeat -> !heartBeat.getServerStatus().equals(ServerStatus.BUSY))            .map(this::convertHeartBeatToServer).collect(Collectors.toList());    // TODO 同步master节点    syncMasterNodes(serverList);}
复制代码


计算 totalSlot 和 currentSlot


private void syncMasterNodes(List<Server> masterNodes) {    slotLock.lock();    try {        this.masterPriorityQueue.clear();        // TODO 这里会把所有的master节点都放入到masterPriorityQueue中,比如说 [192.168.220.1:12345,192.168.220.2:12345]        this.masterPriorityQueue.putAll(masterNodes);        // TODO 就是获取到本地ip的在队列中的位置        int tempCurrentSlot = masterPriorityQueue.getIndex(masterConfig.getMasterAddress());        // TODO 所有节点数量        int tempTotalSlot = masterNodes.size();        // TODO 正常情况下不会小于0        if (tempCurrentSlot < 0) {            totalSlot = 0;            currentSlot = 0;            log.warn("Current master is not in active master list");        } else if (tempCurrentSlot != currentSlot || tempTotalSlot != totalSlot) {            // TODO 这里其实就是记录的是比如说一共有两个slot,我的slot是0或者1            totalSlot = tempTotalSlot;            currentSlot = tempCurrentSlot;            log.info("Update master nodes, total master size: {}, current slot: {}", totalSlot, currentSlot);        }    } finally {        slotLock.unlock();    }}
复制代码


this.masterPriorityQueue.putAll(masterNodes); 会计算索引


public void putAll(Collection<Server> serverList) {    for (Server server : serverList) {        this.queue.put(server);    }    // TODO 这里更新了hostIndexMap,存放的是 <host:port> -> 索引    refreshMasterList();}

private void refreshMasterList() { hostIndexMap.clear(); Iterator<Server> iterator = queue.iterator(); int index = 0; while (iterator.hasNext()) { Server server = iterator.next(); String addr = NetUtils.getAddr(server.getHost(), server.getPort()); hostIndexMap.put(addr, index); index += 1; }
}
复制代码

Master 消费 Command 生成流程实例


command 最终的获取逻辑:


比如说两个Master节点 : masterCount=2 thisMasterSlot=0  master1masterCount=2 thisMasterSlot=1  master2
command中的数据如下 :1 master22 master13 master24 master1
select * from t_ds_command where id % #{masterCount} = #{thisMasterSlot} order by process_instance_priority, id asc limit #{limit}
复制代码


有没有感到疑惑,就是如果一个 master 更新到的最新的,一个没有更新到,怎么办?


比如说,master1节点是这样的1  master22  master13  master24  master1
比如说,master2节点是这样的,是不是发现master2节点都是他的,都可以拉取消费?那就导致重复消费,比如说1这个command1 master12 master13 master14 master1
复制代码


org.apache.dolphinscheduler.service.process.ProcessServiceImpl#handleCommand


@Transactionalpublic @Nullable ProcessInstance handleCommand(String host,                                                   Command command) throws CronParseException, CodeGenerateException {    // TODO 创建流程实例    ProcessInstance processInstance = constructProcessInstance(command, host);    // cannot construct process instance, return null    if (processInstance == null) {        log.error("scan command, command parameter is error: {}", command);        commandService.moveToErrorCommand(command, "process instance is null");        return null;    }    processInstance.setCommandType(command.getCommandType());    processInstance.addHistoryCmd(command.getCommandType());    processInstance.setTestFlag(command.getTestFlag());    // if the processDefinition is serial    ProcessDefinition processDefinition = this.findProcessDefinition(processInstance.getProcessDefinitionCode(),            processInstance.getProcessDefinitionVersion());    // TODO 是否是串行执行    if (processDefinition.getExecutionType().typeIsSerial()) {        saveSerialProcess(processInstance, processDefinition);        if (processInstance.getState() != WorkflowExecutionStatus.RUNNING_EXECUTION) {            setSubProcessParam(processInstance);            triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId());            deleteCommandWithCheck(command.getId());            // todo: this is a bad design to return null here, whether trigger the task            return null;        }    } else {        // TODO 并行执行        processInstanceDao.upsertProcessInstance(processInstance);    }
// TODO 这里其实还会向triggerRelation表中插入一条数据,是流程实例和triggerCode的关系 triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId()); // TODO 设置子流程参数 setSubProcessParam(processInstance); // TODO 删除command deleteCommandWithCheck(command.getId()); return processInstance;}
复制代码


注意:这个方法是加 @Transactional 的,所以说创建流程实例和删除 Command 是在一个事物里面的,如果不同的 Master 消费到同一个 Command。肯定会有一个删除 Command 失败,这时会抛出一个异常,这样就会让数据库进行回滚。

工作流启动流程


DAG 切分 & 任务提交


Master 事件状态流转


图连接 : Master事件状态流转


TaskEventService 组件中的 TaskEventDispatchThread(线程)和 TaskEventHandlerThread(线程)解析



其实就是 Master 自己状态(DISPATCH)和 Worker 汇报上来的状态(RUNNING、UPDATE_PID、RESULT)都会放入到 eventQueue,TaskEventDispatchThread(线程)会阻塞的方式进行获取,然后放入到对应的 TaskExecuteRunnable 中(注意 : 不执行的),只有通过 TaskEventHandlerThread(线程)才会使用 TaskExecuteThreadPool 线程进行 TaskExecuteRunnable 的提交。


转载自 Journey 原文链接:https://segmentfault.com/a/1190000044992842

用户头像

白鲸开源

关注

一家开源原生的DataOps商业公司。 2022-03-18 加入

致力于打造下一代开源原生的DataOps 平台,助力企业在大数据和云时代,智能化地完成多数据源、多云及信创环境的数据集成、调度开发和治理,以提高企业解决数据问题的效率,提升企业分析洞察能力和决策能力。

评论

发布
暂无评论
纯干货 | Dolphinscheduler Master模块源码剖析_开源_白鲸开源_InfoQ写作社区