(Part 1) 3.1.9, the "Stable" Mainstay in Production: A Comprehensive Source-Code Analysis of Master Service Startup
- 2025-09-25, Tianjin
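Author | Li Jie, China Mobile Cloud; Apache DolphinScheduler contributor
In modern data-driven enterprises, the workflow scheduling system is the "central nervous system" of the data pipeline. From ETL jobs to machine-learning training, from report generation to real-time monitoring, nearly every critical workload depends on a scheduling engine that is stable, efficient, and easy to scale.
In the author's view, Apache DolphinScheduler 3.1.9 is a stable and widely used release, so this article focuses on that version. It walks through what happens when the Master service starts, digging into the core source code to examine its architecture, module boundaries, and key implementation mechanisms, so that developers can understand how the Master works and have a foundation for secondary development or performance tuning.
This series has three parts: the Master Server startup flow, the Worker Server startup flow, and the related flow diagrams. This article is the first part.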
1. Master Server Startup: Core Overview
Code entry point: org.apache.dolphinscheduler.server.master.MasterServer#run
public void run() throws SchedulerException {
    // 1. init rpc server
    this.masterRPCServer.start();

    // 2. install task plugin
    this.taskPluginManager.loadPlugin();

    // 3. self tolerant
    this.masterRegistryClient.start();
    this.masterRegistryClient.setRegistryStoppable(this);

    // 4. master scheduling
    this.masterSchedulerBootstrap.init();
    this.masterSchedulerBootstrap.start();

    // 5. event execute service
    this.eventExecuteService.start();
    // 6. failover mechanism
    this.failoverExecuteThread.start();

    // 7. Quartz scheduling
    this.schedulerApi.start();

    ...
}
1.1 RPC startup:
Description: registers the processors for the relevant commands, such as task running, task execution result, and task kill.
Code entry point: org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer#start
public void start() {
    ...
    // processor for "task is running" requests
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
    // processor for task execution results
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT, taskExecuteResponseProcessor);
    // processor for task kill responses
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT, taskRecallProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST,
            workflowExecutingDataRequestProcessor);
    // processor for stream task start requests
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_START, taskExecuteStartProcessor);

    // logger server
    // log-related processors: viewing or fetching logs, etc.
    this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);

    this.nettyRemotingServer.start();
    logger.info("Started Master RPC Server...");
}
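The registration pattern in MasterRPCServer#start boils down to a lookup table from command type to processor. The following is a minimal, self-contained sketch of that idea only; the class ProcessorRegistrySketch, its three-value CommandType enum, and the String-based Processor interface are hypothetical stand-ins and are not the DolphinScheduler or Netty types.

```java
import java.util.EnumMap;
import java.util.Map;

class ProcessorRegistrySketch {
    // Hypothetical subset of command types; names echo the article, but this is not the real enum.
    enum CommandType { TASK_EXECUTE_RUNNING, TASK_EXECUTE_RESULT, TASK_KILL_RESPONSE }

    // Hypothetical processor contract: take a payload, return a short description of the outcome.
    interface Processor {
        String process(String payload);
    }

    private final Map<CommandType, Processor> processors = new EnumMap<>(CommandType.class);

    // Bind one processor to one command type; a later registration replaces an earlier one.
    void registerProcessor(CommandType type, Processor processor) {
        processors.put(type, processor);
    }

    // Route an incoming command to its processor, or report that nothing is registered.
    String dispatch(CommandType type, String payload) {
        Processor processor = processors.get(type);
        if (processor == null) {
            return "unsupported command: " + type;
        }
        return processor.process(payload);
    }

    public static void main(String[] args) {
        ProcessorRegistrySketch server = new ProcessorRegistrySketch();
        server.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, payload -> "running:" + payload);
        server.registerProcessor(CommandType.TASK_EXECUTE_RESULT, payload -> "result:" + payload);
        System.out.println(server.dispatch(CommandType.TASK_EXECUTE_RUNNING, "task-1"));
        System.out.println(server.dispatch(CommandType.TASK_KILL_RESPONSE, "task-1"));
    }
}
```

Registering several command types against the same processor instance, as the listing does with loggerRequestProcessor, fits this model naturally: the map simply holds the same value under several keys.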
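1.2 Task plugin initialization:
Description: template operations for tasks, such as creating a task, parsing task parameters, and fetching task resource information. The api, master, and worker services all need to register these plugins; on the master, they are used to set data source and UDF information, among other things.
1.3 Self Tolerant (Master registration):
Description: registers the master's own information with the registry (this article uses ZooKeeper as the example) and watches for registration changes of itself, other masters, and all worker nodes, so that it can perform the corresponding failover handling.
Code entry point: org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient#start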
public void start() {
    try {
        this.masterHeartBeatTask = new MasterHeartBeatTask(masterConfig, registryClient);
        // 1. register this master's information with the registry
        registry();
        // 2. watch the connection state between this master and the registry
        registryClient.addConnectionStateListener(
                new MasterConnectionStateListener(masterConfig, registryClient, masterConnectStrategy));
        // 3. watch the liveness of other masters and all workers in the registry and fail over accordingly,
        // e.g. fail over the tasks owned by a dead master and kill the tasks on the worker nodes
        registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
    } catch (Exception e) {
        throw new RegistryException("Master registry client start up error", e);
    }
}
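The subscribe-and-react idea in step 3 can be illustrated without ZooKeeper. The sketch below is an in-memory stand-in, assuming a registry that notifies listeners when a node path disappears; RegistryWatchSketch, the /nodes/... paths, and the "master-failover" / "worker-failover" labels are all hypothetical and only mimic the shape of the real listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;

class RegistryWatchSketch {
    private final Set<String> nodes = new TreeSet<>();
    private final List<Consumer<String>> removalListeners = new ArrayList<>();

    // A server announces itself by creating a node under its path.
    void register(String path) {
        nodes.add(path);
    }

    // Listeners are told the path of every node that goes away.
    void subscribeRemoval(Consumer<String> listener) {
        removalListeners.add(listener);
    }

    // Simulates a node disappearing (for example, a server losing its session).
    void remove(String path) {
        if (nodes.remove(path)) {
            removalListeners.forEach(listener -> listener.accept(path));
        }
    }

    public static void main(String[] args) {
        RegistryWatchSketch registry = new RegistryWatchSketch();
        List<String> actions = new ArrayList<>();
        registry.register("/nodes/master/10.0.0.1:5678");
        registry.register("/nodes/worker/10.0.0.2:1234");
        // Decide which kind of failover to run from the node type encoded in the path.
        registry.subscribeRemoval(path -> {
            if (path.startsWith("/nodes/master/")) {
                actions.add("master-failover:" + path);
            } else if (path.startsWith("/nodes/worker/")) {
                actions.add("worker-failover:" + path);
            }
        });
        registry.remove("/nodes/worker/10.0.0.2:1234");
        System.out.println(actions);
    }
}
```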
1.4 Master scheduling
Description: a scanning thread that periodically scans the command table in the database and performs different business operations depending on the command type. It is the core logic behind workflow startup, instance failover, and similar processing.
Code entry point: org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap#run
public void run() {
    while (!ServerLifeCycleManager.isStopped()) {
        try {
            if (!ServerLifeCycleManager.isRunning()) {
                // the current server is not at running status, cannot consume command.
                logger.warn("The current server {} is not at running status, cannot consumes commands.",
                        this.masterAddress);
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
            }
            // todo: if the workflow event queue is much, we need to handle the back pressure
            boolean isOverload =
                    OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
            // if CPU and memory load is too high, do not process commands for now
            if (isOverload) {
                logger.warn("The current server {} is overload, cannot consumes commands.", this.masterAddress);
                MasterServerMetrics.incMasterOverload();
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                continue;
            }
            // fetch commands from the database, e.g. start a workflow or fail over a workflow instance
            List<Command> commands = findCommands();
            if (CollectionUtils.isEmpty(commands)) {
                // indicate that no command ,sleep for 1s
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                continue;
            }

            // convert the commands into workflow instances; a command is deleted once converted successfully
            List<ProcessInstance> processInstances = command2ProcessInstance(commands);
            if (CollectionUtils.isEmpty(processInstances)) {
                // indicate that the command transform to processInstance error, sleep for 1s
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                continue;
            }
            MasterServerMetrics.incMasterConsumeCommand(commands.size());

            processInstances.forEach(processInstance -> {
                try {
                    LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
                    if (processInstanceExecCacheManager.contains(processInstance.getId())) {
                        logger.error(
                                "The workflow instance is already been cached, this case shouldn't be happened");
                    }
                    // create the workflow execute runnable, which splits the DAG into tasks, submits and
                    // monitors tasks, and handles the logic for the various event types
                    WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance,
                            processService,
                            processInstanceDao,
                            nettyExecutorManager,
                            processAlertManager,
                            masterConfig,
                            stateWheelExecuteThread,
                            curingGlobalParamsService);
                    // cache each workflow execute runnable here; it is fetched from the cache later to run
                    processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
                    // put a start-workflow event into the workflow event queue; workflowEventLooper keeps
                    // taking events from that queue and handling them
                    workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
                            processInstance.getId()));
                } finally {
                    LoggerUtils.removeWorkflowInstanceIdMDC();
                }
            });
        } catch (InterruptedException interruptedException) {
            logger.warn("Master schedule bootstrap interrupted, close the loop", interruptedException);
            Thread.currentThread().interrupt();
            break;
        } catch (Exception e) {
            logger.error("Master schedule workflow error", e);
            // sleep for 1s here to avoid the database down cause the exception boom
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        }
    }
}
The workflow events produced by the steps above are then consumed continuously by WorkflowEventLooper:
public void run() {
    WorkflowEvent workflowEvent = null;
    while (!ServerLifeCycleManager.isStopped()) {
        ...
        workflowEvent = workflowEventQueue.poolEvent();
        LoggerUtils.setWorkflowInstanceIdMDC(workflowEvent.getWorkflowInstanceId());
        logger.info("Workflow event looper receive a workflow event: {}, will handle this", workflowEvent);
        WorkflowEventHandler workflowEventHandler =
                workflowEventHandlerMap.get(workflowEvent.getWorkflowEventType());
        // the matching event handler processes the workflow event; its main job is to run the
        // WorkflowExecuteRunnable that was cached in the step above
        workflowEventHandler.handleWorkflowEvent(workflowEvent);
        ...
    }
}
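The hand-off between the scanning thread and WorkflowEventLooper is a producer/consumer pattern: the producer caches a runnable under the workflow instance id and enqueues an event carrying only that id, and the consumer looks up a handler by event type, which in turn looks up the cached runnable. A minimal sketch of that shape follows; every name in it (WorkflowEventLoopSketch, Event, submit, drain) is hypothetical, and drain() processes whatever is queued and returns, whereas the real looper runs in a loop until the server stops.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

class WorkflowEventLoopSketch {
    enum EventType { START_WORKFLOW }

    static final class Event {
        final EventType type;
        final int workflowInstanceId;

        Event(EventType type, int workflowInstanceId) {
            this.type = type;
            this.workflowInstanceId = workflowInstanceId;
        }
    }

    interface EventHandler {
        void handle(Event event);
    }

    private final Map<Integer, Runnable> runnableCache = new ConcurrentHashMap<>();
    private final BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();
    private final Map<EventType, EventHandler> handlers = new EnumMap<>(EventType.class);

    WorkflowEventLoopSketch() {
        // The start handler fetches the runnable cached under the instance id and runs it.
        handlers.put(EventType.START_WORKFLOW, event -> {
            Runnable cached = runnableCache.get(event.workflowInstanceId);
            if (cached != null) {
                cached.run();
            }
        });
    }

    // Producer side: cache first, then publish the event that refers to the cache entry.
    void submit(int workflowInstanceId, Runnable workflowRunnable) {
        runnableCache.put(workflowInstanceId, workflowRunnable);
        eventQueue.add(new Event(EventType.START_WORKFLOW, workflowInstanceId));
    }

    // Consumer side: handle every queued event in FIFO order and return how many were handled.
    int drain() {
        int handled = 0;
        Event event;
        while ((event = eventQueue.poll()) != null) {
            handlers.get(event.type).handle(event);
            handled++;
        }
        return handled;
    }

    public static void main(String[] args) {
        WorkflowEventLoopSketch looper = new WorkflowEventLoopSketch();
        List<String> started = new ArrayList<>();
        looper.submit(1, () -> started.add("workflow-1"));
        looper.submit(2, () -> started.add("workflow-2"));
        System.out.println(looper.drain() + " events handled, started " + started);
    }
}
```

Caching before enqueueing matters in this sketch: if the order were reversed, a fast consumer could pick up the event before the runnable is available.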
When WorkflowExecuteRunnable starts, its main job is to initialize the DAG and to submit and dispatch tasks:
public WorkflowSubmitStatue call() {
    ...
    LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
    if (workflowRunnableStatus == WorkflowRunnableStatus.CREATED) {
        // build the workflow DAG
        buildFlowDag();
        workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_DAG;
        logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
    }
    if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_DAG) {
        // initialize the related queues by clearing all of them
        initTaskQueue();
        workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_QUEUE;
        logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
    }
    if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_QUEUE) {
        // start from the entry nodes and submit all node tasks
        submitPostNode(null);
        workflowRunnableStatus = WorkflowRunnableStatus.STARTED;
        logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
    }
    return WorkflowSubmitStatue.SUCCESS;
    ...
}
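One property of the chained status checks in call() is that each stage advances the status only after it succeeds, so a repeated invocation would resume at the first unfinished stage instead of starting over. Whether and when 3.1.9 actually invokes call() a second time is not shown in the excerpt, so the sketch below only demonstrates the property itself. StagedStartSketch and its failQueueInitOnce flag (a fault injected for the demo) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class StagedStartSketch {
    enum Status { CREATED, INITIALIZE_DAG, INITIALIZE_QUEUE, STARTED }

    Status status = Status.CREATED;
    final List<String> executed = new ArrayList<>();
    // Hypothetical fault injection: make the queue-initialization stage fail exactly once.
    boolean failQueueInitOnce = true;

    // Returns true when all stages have completed, false when a stage failed.
    boolean call() {
        try {
            if (status == Status.CREATED) {
                executed.add("buildFlowDag");
                status = Status.INITIALIZE_DAG;
            }
            if (status == Status.INITIALIZE_DAG) {
                if (failQueueInitOnce) {
                    failQueueInitOnce = false;
                    throw new IllegalStateException("simulated failure in initTaskQueue");
                }
                executed.add("initTaskQueue");
                status = Status.INITIALIZE_QUEUE;
            }
            if (status == Status.INITIALIZE_QUEUE) {
                executed.add("submitPostNode");
                status = Status.STARTED;
            }
            return true;
        } catch (RuntimeException e) {
            // The status still points at the last completed stage, so a later call resumes there.
            return false;
        }
    }

    public static void main(String[] args) {
        StagedStartSketch workflow = new StagedStartSketch();
        System.out.println("first call ok: " + workflow.call() + ", status " + workflow.status);
        System.out.println("second call ok: " + workflow.call() + ", status " + workflow.status);
        System.out.println("stages executed: " + workflow.executed);
    }
}
```

In this run the DAG-building stage executes once even though call() is invoked twice, because the second invocation skips straight to the queue initialization.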
In the submitPostNode(null) call made from call(), parentNodeCode is null, which means all nodes are started from the root:
private void submitPostNode(String parentNodeCode) throws StateEventHandleException {
    ...
    // starting from the node parentNodeCode, get the downstream tasks that are to be executed
    List<TaskInstance> taskInstances=...
    for (TaskInstance task : taskInstances) {
        ...
        // put the task into the "pre-submit" queue readyToSubmitTaskQueue
        addTaskToStandByList(task);
    }
    // process the "pre-submit" queue readyToSubmitTaskQueue and submit the tasks
    submitStandByTask();
    ...
}

public void submitStandByTask() throws StateEventHandleException {
    int length = readyToSubmitTaskQueue.size();
    for (int i = 0; i < length; i++) {
        TaskInstance task = readyToSubmitTaskQueue.peek();
        ...
        // check the task's dependency result; if it is SUCCESS, submit the task
        DependResult dependResult = getDependResultForTask(task);
        if (DependResult.SUCCESS == dependResult) {
            logger.info("The dependResult of task {} is success, so ready to submit to execute",
                    task.getName());
            // submit the task
            Optional<TaskInstance> taskInstanceOptional = submitTaskExec(task);
            // submission failed
            if (!taskInstanceOptional.isPresent()) {
                ...
            } else {
                // submitted successfully; remove the task from the "pre-submit" queue
                removeTaskFromStandbyList(task);
            }
        }
        ...
    }
}

private Optional<TaskInstance> submitTaskExec(TaskInstance taskInstance) {
    ...
    // initialize according to the master-side task type (not shell, spark and the like; here it means
    // types such as Common, Condition, SubTask, SwitchTask); for ease of understanding, this article
    // follows the common task path
    ITaskProcessor taskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
    taskProcessor.init(taskInstance, processInstance);
    ...
    // fill in the taskInstance parameters and persist it to the database
    boolean submit = taskProcessor.action(TaskAction.SUBMIT);
    ...
    // for a common task, put it into taskPriorityQueue, the queue of tasks waiting to be dispatched;
    // a dedicated consumer, TaskPriorityQueueConsumer, consumes that queue
    boolean dispatchSuccess = taskProcessor.action(TaskAction.DISPATCH);
    ...
    // for a common task, this does nothing
    taskProcessor.action(TaskAction.RUN);

    // add a timeout check; an alert is raised if the task times out
    stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, taskInstance);
    // add a state check; when the task succeeds or reaches another state, it is handled accordingly
    stateWheelExecuteThread.addTask4StateCheck(processInstance, taskInstance);
    ...
    return Optional.of(taskInstance);
    ...
}
TaskPriorityQueueConsumer is a thread dedicated to consuming the taskPriorityQueue mentioned above. It starts listening on taskPriorityQueue when the program starts:
public void run() {
    int fetchTaskNum = masterConfig.getDispatchTaskNumber();
    while (!ServerLifeCycleManager.isStopped()) {
        try {
            // consume the tasks that need to be dispatched:
            // pick an available worker node for each task, then assign the task to that worker
            List<TaskPriority> failedDispatchTasks = this.batchDispatch(fetchTaskNum);
            ...
        } catch (Exception e) {
            TaskMetrics.incTaskDispatchError();
            logger.error("dispatcher task error", e);
        }
    }
}

public List<TaskPriority> batchDispatch(int fetchTaskNum) throws TaskPriorityQueueException, InterruptedException {
    ...
    // consume tasks concurrently with multiple threads
    CountDownLatch latch = new CountDownLatch(fetchTaskNum);
    for (int i = 0; i < fetchTaskNum; i++) {
        TaskPriority taskPriority = taskPriorityQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS);
        ...
        consumerThreadPoolExecutor.submit(() -> {
            try {
                // dispatch the task
                boolean dispatchResult = this.dispatchTask(taskPriority);
                ...
            } finally {
                // make sure the latch countDown
                latch.countDown();
            }
        });
    }

    latch.await();
    ...
}

protected boolean dispatchTask(TaskPriority taskPriority) {
    ...
    try {
        WorkflowExecuteRunnable workflowExecuteRunnable =
                processInstanceExecCacheManager.getByProcessInstanceId(taskPriority.getProcessInstanceId());
        ...
        Optional<TaskInstance> taskInstanceOptional =
                workflowExecuteRunnable.getTaskInstance(taskPriority.getTaskId());
        ...
        TaskInstance taskInstance = taskInstanceOptional.get();
        TaskExecutionContext context = taskPriority.getTaskExecutionContext();
        ExecutionContext executionContext =
                new ExecutionContext(toCommand(context), ExecutorType.WORKER, context.getWorkerGroup(),
                        taskInstance);
        ...
        // pick an available worker node, then assign the task to that worker
        result = dispatcher.dispatch(executionContext);
        ...
    } catch (RuntimeException | ExecuteException e) {
        logger.error("Master dispatch task to worker error, taskPriority: {}", taskPriority, e);
    }
    return result;
}
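The batch pattern in batchDispatch (one latch slot per fetch attempt, a countDown in finally, and an await before the batch returns) can be tried in isolation. The sketch below is a simplified stand-in that uses plain strings for tasks and a Predicate for the dispatch step; BatchDispatchSketch, runDemo, the 10 ms poll timeout, and the choice to release a slot immediately when the poll returns nothing are assumptions made for the example, since the excerpt elides that part.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

class BatchDispatchSketch {
    // Fetches up to fetchTaskNum tasks, dispatches them in parallel, and returns the ones that failed.
    static List<String> batchDispatch(BlockingQueue<String> queue, int fetchTaskNum,
                                      ExecutorService pool, Predicate<String> dispatcher)
            throws InterruptedException {
        List<String> failed = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch latch = new CountDownLatch(fetchTaskNum);
        for (int i = 0; i < fetchTaskNum; i++) {
            String task = queue.poll(10, TimeUnit.MILLISECONDS);
            if (task == null) {
                // Nothing fetched for this slot: release it so await() is not blocked forever.
                latch.countDown();
                continue;
            }
            pool.submit(() -> {
                try {
                    if (!dispatcher.test(task)) {
                        failed.add(task);
                    }
                } finally {
                    // Count down even if dispatch throws, otherwise the batch would hang.
                    latch.countDown();
                }
            });
        }
        latch.await();
        return failed;
    }

    // Runs one batch of five slots over three queued tasks; "task-b" is made to fail on purpose.
    static List<String> runDemo() {
        BlockingQueue<String> queue =
                new LinkedBlockingQueue<>(Arrays.asList("task-a", "task-b", "task-c"));
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            return batchDispatch(queue, 5, pool, task -> !task.equals("task-b"));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("failed to dispatch: " + runDemo());
    }
}
```

Returning the failed tasks to the caller, as the listing's failedDispatchTasks variable suggests, leaves the retry policy to the outer loop rather than to the worker threads.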
The dispatch operation itself:
public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
    ...
    // host select
    // use the configured selector to pick a worker node that meets the requirements
    Host host = hostManager.select(context);
    ...
    context.setHost(host);
    ...
    // send the task to the selected worker node over RPC; if sending fails, try the other available
    // worker nodes
    return executorManager.execute(context);
    ...
}
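Selecting a host and then falling back to the remaining workers when the send fails can be sketched as follows. Round-robin is used here purely for illustration, since the article only says that the selector is configurable; HostSelectSketch, its String host names, and the Predicate that simulates the RPC send are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

class HostSelectSketch {
    private final AtomicInteger cursor = new AtomicInteger();

    // Round-robin selection over the candidate hosts (an illustrative policy, not the configured one).
    Optional<String> select(List<String> hosts) {
        if (hosts.isEmpty()) {
            return Optional.empty();
        }
        int index = Math.floorMod(cursor.getAndIncrement(), hosts.size());
        return Optional.of(hosts.get(index));
    }

    // Try the selected host first, then every other host, and return the one that accepted the task.
    Optional<String> execute(List<String> hosts, Predicate<String> send) {
        Optional<String> selected = select(hosts);
        if (!selected.isPresent()) {
            return Optional.empty();
        }
        List<String> candidates = new ArrayList<>();
        candidates.add(selected.get());
        for (String host : hosts) {
            if (!host.equals(selected.get())) {
                candidates.add(host);
            }
        }
        for (String host : candidates) {
            if (send.test(host)) {
                return Optional.of(host);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        HostSelectSketch dispatcher = new HostSelectSketch();
        List<String> workers = Arrays.asList("worker-1", "worker-2", "worker-3");
        // Simulate worker-1 being unreachable: the task should land on another worker.
        Optional<String> target = dispatcher.execute(workers, host -> !host.equals("worker-1"));
        System.out.println("task sent to: " + target.orElse("nobody"));
    }
}
```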
1.5 Event execute service:
Description: mainly responsible for polling the event queues of workflow instances. A workflow keeps producing events while it runs, such as workflow submit failure or task state changes, and the method below handles those events.
Code entry point: org.apache.dolphinscheduler.server.master.runner.EventExecuteService#run
public void run() {
    while (!ServerLifeCycleManager.isStopped()) {
        try {
            // handle the events of workflow execute runnables; this eventually triggers
            // WorkflowExecuteRunnable#handleEvents
            workflowEventHandler();
            // handle the events of stream task execute runnables
            streamTaskEventHandler();
            TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
        } ...
    }
}
The event-handling logic for workflows and stream tasks is essentially the same, so only the workflow path is described below:
public void handleEvents() {
    ...
    StateEvent stateEvent = null;
    while (!this.stateEvents.isEmpty()) {
        try {
            stateEvent = this.stateEvents.peek();
            ...
            StateEventHandler stateEventHandler =
                    StateEventHandlerManager.getStateEventHandler(stateEvent.getType())
                            .orElseThrow(() -> new StateEventHandleError(
                                    "Cannot find handler for the given state event"));
            logger.info("Begin to handle state event, {}", stateEvent);
            // each event handler applies its own handling logic
            if (stateEventHandler.handleStateEvent(this, stateEvent)) {
                this.stateEvents.remove(stateEvent);
            }
        } ...
    }
}
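The peek-then-remove order in handleEvents means an event leaves the queue only after its handler reports success, so an event that could not be handled stays at the head. The sketch below shows that behavior with strings as events. StateEventDrainSketch is hypothetical, and stopping at the first unhandled event is a simplification chosen to keep the example finite; the excerpt elides how the real method reacts in that case.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;

class StateEventDrainSketch {
    // Handles events from the head of the queue and returns those that were handled.
    static List<String> handleEvents(Queue<String> stateEvents, Predicate<String> handler) {
        List<String> handled = new ArrayList<>();
        while (!stateEvents.isEmpty()) {
            // peek() leaves the event in the queue until the handler confirms success
            String event = stateEvents.peek();
            if (handler.test(event)) {
                stateEvents.remove(event);
                handled.add(event);
            } else {
                // Simplification for this sketch: stop and leave the event for a later round.
                break;
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        Queue<String> events = new ArrayDeque<>(
                Arrays.asList("TASK_STATE_CHANGE", "PROCESS_SUBMIT_FAILED", "TASK_TIMEOUT"));
        // Pretend the second event cannot be handled yet.
        List<String> handled = handleEvents(events, event -> !event.equals("PROCESS_SUBMIT_FAILED"));
        System.out.println("handled: " + handled + ", still queued: " + events);
    }
}
```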
Take the workflow-submit-failure event as an example:
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable,
                                StateEvent stateEvent) throws StateEventHandleException {
    WorkflowStateEvent workflowStateEvent = (WorkflowStateEvent) stateEvent;
    ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
    measureProcessState(workflowStateEvent);
    log.info(
            "Handle workflow instance submit fail state event, the current workflow instance state {} will be changed to {}",
            processInstance.getState(), workflowStateEvent.getStatus());
    // change the instance state to FAILURE and persist it
    workflowExecuteRunnable.updateProcessInstanceState(workflowStateEvent);
    workflowExecuteRunnable.endProcess();
    return true;
}
1.6 Failover mechanism:
Description: mainly responsible for the Master failover and Worker failover logic.
Code entry point: org.apache.dolphinscheduler.server.master.service.MasterFailoverService#checkMasterFailover
public void checkMasterFailover() {
    // get the master nodes that need failover
    List<String> needFailoverMasterHosts = processService.queryNeedFailoverProcessInstanceHost()
            .stream()
            // failover myself || dead server
            // this master itself, or a master that has already died
            .filter(host -> localAddress.equals(host) || !registryClient.checkNodeExists(host, NodeType.MASTER))
            .distinct()
            .collect(Collectors.toList());
    if (CollectionUtils.isEmpty(needFailoverMasterHosts)) {
        return;
    }
    ...
    for (String needFailoverMasterHost : needFailoverMasterHosts) {
        failoverMaster(needFailoverMasterHost);
    }
}

private void doFailoverMaster(@NonNull String masterHost) {
    ...
    // get the master's startup time from the registry
    Optional<Date> masterStartupTimeOptional =
            getServerStartupTime(registryClient.getServerList(NodeType.MASTER), masterHost);
    // fetch the workflow instances on this master that need failover (filtered mainly by
    // failover-eligible states such as SUBMITTED_SUCCESS and RUNNING_EXECUTION)
    List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(
            masterHost);
    ...
    for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
        ...
        // decide whether this instance needs failover, for example:
        // 1. the dead master has not been restarted yet, so failover is needed
        // 2. the workflow instance started earlier than the master, which means the master
        //    was restarted, so failover is needed
        // ...
        if (!checkProcessInstanceNeedFailover(masterStartupTimeOptional, processInstance)) {
            LOGGER.info("WorkflowInstance doesn't need to failover");
            continue;
        }
        List<TaskInstance> taskInstanceList =...
        for (TaskInstance taskInstance : taskInstanceList) {
            ...
            if (!checkTaskInstanceNeedFailover(taskInstance)) {
                LOGGER.info("The taskInstance doesn't need to failover");
                continue;
            }
            // tasks on the worker side need to be killed, and the task instance state is marked
            // as NEED_FAULT_TOLERANCE
            failoverTaskInstance(processInstance, taskInstance);
            ...
        }
        ProcessInstanceMetrics.incProcessInstanceByState("failover");
        // updateProcessInstance host is null to mark this processInstance has been failover
        // and insert a failover command
        processInstance.setHost(Constants.NULL);
        // persist a failover command to the database for the master scheduler to pick up on its next scan
        processService.processNeedFailoverProcessInstances(processInstance);
        ...
    }
}
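The two rules listed in the comment before checkProcessInstanceNeedFailover can be written as a small predicate. The sketch below implements only those two rules; the real method is elided in the excerpt and may check more conditions, so FailoverCheckSketch, its needFailover method, and the use of java.time.Instant instead of Date are assumptions for illustration.

```java
import java.time.Instant;
import java.util.Optional;

class FailoverCheckSketch {
    // masterStartupTime is empty when the master is not present in the registry (dead, not restarted).
    static boolean needFailover(Optional<Instant> masterStartupTime, Instant instanceStartTime) {
        if (!masterStartupTime.isPresent()) {
            // Rule 1: the dead master has not come back, so its instances must be taken over.
            return true;
        }
        // Rule 2: an instance that started before the current master process did was created by a
        // previous incarnation of that master, so it was orphaned by the restart.
        return instanceStartTime.isBefore(masterStartupTime.get());
    }

    public static void main(String[] args) {
        Instant masterRestart = Instant.parse("2025-09-25T10:00:00Z");
        Instant oldInstance = Instant.parse("2025-09-25T09:30:00Z");
        Instant newInstance = Instant.parse("2025-09-25T10:05:00Z");
        System.out.println("master gone: " + needFailover(Optional.empty(), newInstance));
        System.out.println("started before restart: " + needFailover(Optional.of(masterRestart), oldInstance));
        System.out.println("started after restart: " + needFailover(Optional.of(masterRestart), newInstance));
    }
}
```

The third case is the one that protects healthy work: an instance created after the master came back belongs to the live process and is left alone.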
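Conclusion
The above is the author's initial understanding of the features and architecture of Apache DolphinScheduler 3.1.9, compiled from personal study and practice. Follow-up articles will cover the Worker startup flow and the interaction between Master and Worker. Given the author's limited expertise, the text may contain misunderstandings or omissions, and corrections from readers are very welcome. If you see things differently, feel free to get in touch and discuss so that we can improve together.
Copyright notice: This is an original article by InfoQ author 【白鲸开源】.
Original link: 【http://xie.infoq.cn/article/9b52a95d98cd694b59e2218ed】.
This article is licensed under 【CC-BY 4.0】. When republishing, please retain the original source and this copyright notice.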