写点什么

(二)3.1.9 生产“稳”担当:Apache DolphinScheduler Worker 服务源码全方位解析

作者:白鲸开源
  • 2025-09-25
    天津
  • 本文字数:7505 字

    阅读完需:约 25 分钟

作者 | 李杰 移动云,Apache DolphinScheduler 贡献者



在现代数据驱动的企业中,工作流调度系统是数据管道(Data Pipeline)的“中枢神经”。从 ETL 任务到机器学习训练,从报表生成到实时监控,几乎所有关键业务都依赖于一个稳定、高效、易扩展的调度引擎。


笔者认为 Apache DolphinScheduler 3.1.9 是稳定且广泛使用的版本,故本系列文章将深入其源码核心,剖析其架构设计、模块划分与关键实现机制,帮助开发者理解 Master 和 Worker “如何工作”,并为进一步二次开发或性能优化打下基础。


我们之前解读了 Apache DolphinScheduler 3.1.9版本源码的 Master server 启动流程,感兴趣的可以去查看。本文是 Apache DolphinScheduler 3.1.9 版本源码解读的第二篇:Worker Server 启动流程源码解读以及相关流程设计。结尾处附有相关流程图,供大家参考。

2. Worker Server 启动核心概览

  • 代码入口:org.apache.dolphinscheduler.server.worker.WorkerServer#run


public void run() {        // 1. rpc启动        this.workerRpcServer.start();        // 忽略,因为workerRpcServer初始化时包含workerRpcClient初始化的功能        this.workerRpcClient.start();        // 2. 任务插件初始化        this.taskPluginManager.loadPlugin();
this.workerRegistryClient.setRegistryStoppable(this); // 3. worker 注册 this.workerRegistryClient.start();
// 4. worker管理线程,不断从任务队列中waitSubmitQueue领取任务,提交到线程池处理 this.workerManagerThread.start();
// 5. 消息重试线程。负责轮询通过RPC发送服务,如当task在运行中,若未收到master的ack信息,会周期给master发送“运行中”信号 this.messageRetryRunner.start(); ... }
复制代码

2.1 rpc 启动:

  • 描述:注册相关命令的 process 处理器,如接收任务请求、停止任务请求等。

  • 代码入口:org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer#start


    public void start() {        LOGGER.info("Worker rpc server starting");        NettyServerConfig serverConfig = new NettyServerConfig();        serverConfig.setListenPort(workerConfig.getListenPort());        this.nettyRemotingServer = new NettyRemotingServer(serverConfig);        // 接收派发任务请求。然后将任务放置到任务队列waitSubmitQueue中,等待workerManagerThread去处理        this.nettyRemotingServer.registerProcessor(CommandType.TASK_DISPATCH_REQUEST, taskDispatchProcessor);        // 停止任务请求        this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);        // 接收任务运行中的ack请求        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK,                taskExecuteRunningAckProcessor);        // 接收任务结果的ack请求        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, taskExecuteResultAckProcessor);        this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT_ACK, taskRejectAckProcessor);        this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);        this.nettyRemotingServer.registerProcessor(CommandType.TASK_SAVEPOINT_REQUEST, taskSavePointProcessor);        // logger server        this.nettyRemotingServer.registerProcessor(CommandType.GET_APP_ID_REQUEST, loggerRequestProcessor);        this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);        this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);        this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);        this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);        this.nettyRemotingServer.start();        LOGGER.info("Worker rpc server started");    }
复制代码


此处以 TASK_DISPATCH_REQUEST 为例进行描述。当有任务从 master 派发请求时,worker 会接受TASK_DISPATCH_REQUEST的RPC请求,然后触发 process 处理器taskDispatchProcessor(org.apache.dolphinscheduler.server.worker.processor.TaskDispatchProcessor#process)的处理:


    public void process(Channel channel, Command command) {            ...            TaskExecutionContext taskExecutionContext = taskDispatchCommand.getTaskExecutionContext();            ...            // set cache, it will be used when kill task            TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
// 设置执行任务的worker地址 taskExecutionContext.setHost(workerConfig.getWorkerAddress()); // 设置任务执行日志的目录 taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
// 构建任务执行线程。整个任务执行需要依赖该线程 WorkerDelayTaskExecuteRunnable workerTaskExecuteRunnable = WorkerTaskExecuteRunnableFactoryBuilder .createWorkerDelayTaskExecuteRunnableFactory( taskExecutionContext, workerConfig, workflowMasterAddress, workerMessageSender, alertClientService, taskPluginManager, storageOperate) .createWorkerTaskExecuteRunnable(); // submit task to manager // 提交到一个task队列,然后有消费者消费该队列 boolean offer = workerManager.offer(workerTaskExecuteRunnable); ... }
复制代码


最终会提交给 waitSubmitQueue 队列,后续有消费者不断进行消费。


    public boolean offer(WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable) {        if (workerConfig.getTaskExecuteThreadsFullPolicy() == TaskExecuteThreadsFullPolicy.CONTINUE) {            return waitSubmitQueue.offer(workerDelayTaskExecuteRunnable);        }
if (waitSubmitQueue.size() > workerExecThreads) { logger.warn("Wait submit queue is full, will retry submit task later"); WorkerServerMetrics.incWorkerSubmitQueueIsFullCount(); // if waitSubmitQueue is full, it will wait 1s, then try add ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); if (waitSubmitQueue.size() > workerExecThreads) { return false; } } return waitSubmitQueue.offer(workerDelayTaskExecuteRunnable); }
复制代码

2.2 任务插件初始化:

  • 描述:task 的相关模板操作,如创建 task、解析 task 参数、获取 task 资源信息等。对于该插件,api、master、worker 都需要进行注册,在 worker 的作用是获取文件资源、创建任务信息等。

2.3 worker 注册:

  • 描述:将 worker 信息注册至注册中心(本文以 zookeeper 为例),同时监听注册变化情况。

  • 代码入口:org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient#start


    public void start() {        try {            // 1、将worker信息注册至注册中心(本文以zookeeper为例)            registry();            // 2、监听自身与注册中心的连接情况;            registryClient.addConnectionStateListener(                    new WorkerConnectionStateListener(workerConfig, registryClient, workerConnectStrategy));        } catch (Exception ex) {            throw new RegistryException("Worker registry client start up error", ex);        }    }
复制代码

2.4 worker 管理线程:

  • 描述:不断从任务队列中 waitSubmitQueue 领取任务,提交到线程池处理。

  • 代码入口:org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread#run


    public void run() {        Thread.currentThread().setName("Worker-Execute-Manager-Thread");        while (!ServerLifeCycleManager.isStopped()) {            try {                if (!ServerLifeCycleManager.isRunning()) {                    Thread.sleep(Constants.SLEEP_TIME_MILLIS);                }                // 1、如果任务线程池线程个数够用,则处理任务                if (this.getThreadPoolQueueSize() <= workerExecThreads) {                    // 消费task队列并且执行任务执行线程                    final WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable = waitSubmitQueue.take();                    workerExecService.submit(workerDelayTaskExecuteRunnable);                } else {                    // 2、若线程池资源紧张,则进行循环等待                    WorkerServerMetrics.incWorkerOverloadCount();                    logger.info("Exec queue is full, waiting submit queue {}, waiting exec queue size {}",                            this.getWaitSubmitQueueSize(), this.getThreadPoolQueueSize());                    ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);                }            } catch (Exception e) {                logger.error("An unexpected interrupt is happened, "                        + "the exception will be ignored and this thread will continue to run", e);            }        }    }
复制代码


workerDelayTaskExecuteRunnable核心内容:


public void run() {            ...            // 初始化任务,如任务启动时间等            initializeTask();            ...            // 任务执行前的操作            beforeExecute();
// 任务回调,如更改任务的appId。可以先忽略 TaskCallBack taskCallBack = TaskCallbackImpl.builder().workerMessageSender(workerMessageSender) .masterAddress(masterAddress).build(); // 执行任务,等待结果 executeTask(taskCallBack); // 任务执行后的操作 afterExecute(); ... }
复制代码


初始化:


   protected void initializeTask() {        logger.info("Begin to initialize task");
// 设置任务启动时间 Date taskStartTime = new Date(); taskExecutionContext.setStartTime(taskStartTime); logger.info("Set task startTime: {}", taskStartTime);
// 获取环境变量,默认从dolphinscheduler_env.sh获取 String systemEnvPath = CommonUtils.getSystemEnvPath(); taskExecutionContext.setEnvFile(systemEnvPath); logger.info("Set task envFile: {}", systemEnvPath);
String taskAppId = String.format("%s_%s", taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); taskExecutionContext.setTaskAppId(taskAppId); logger.info("Set task appId: {}", taskAppId);
logger.info("End initialize task"); }
复制代码


执行前:


   protected void beforeExecute() {        // 设置任务状态为运行中        taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.RUNNING_EXECUTION);        // 向master发送“运行中”信号,且将任务的关键信息一起发送,如任务执行节点、任务日志目录等        workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RUNNING);        logger.info("Set task status to {}", TaskExecutionStatus.RUNNING_EXECUTION);
// 查看租户是否存在 TaskExecutionCheckerUtils.checkTenantExist(workerConfig, taskExecutionContext); logger.info("TenantCode:{} check success", taskExecutionContext.getTenantCode());
// 创建任务执行目录(是一个本地的临时目录) TaskExecutionCheckerUtils.createProcessLocalPathIfAbsent(taskExecutionContext); logger.info("ProcessExecDir:{} check success", taskExecutionContext.getExecutePath());
// 从存储介质下载文件资源(如从hdfs下载aa.jar) TaskExecutionCheckerUtils.downloadResourcesIfNeeded(storageOperate, taskExecutionContext, logger); logger.info("Resources:{} check success", taskExecutionContext.getResources());
TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType()); if (null == taskChannel) { throw new TaskPluginException(String.format("%s task plugin not found, please check config file.", taskExecutionContext.getTaskType())); } // 利用task插件创建任务信息,此处的task是是具体的任务类型,如shell、spark等 task = taskChannel.createTask(taskExecutionContext); if (task == null) { throw new TaskPluginException(String.format("%s task is null, please check the task plugin is correct", taskExecutionContext.getTaskType())); } logger.info("Task plugin: {} create success", taskExecutionContext.getTaskType());
// 任务参数准备。如将shell任务执行脚本解析出来、将spark任务的jar包、main类解析出来 task.init(); logger.info("Success initialized task plugin instance success");
task.getParameters().setVarPool(taskExecutionContext.getVarPool()); logger.info("Success set taskVarPool: {}", taskExecutionContext.getVarPool());
}
复制代码


任务的具体执行。如 shell 任务的具体执行过程:org.apache.dolphinscheduler.plugin.task.shell.ShellTask#handle


    public void handle(TaskCallBack taskCallBack) throws TaskException {        try {            // construct process            // 利用shell任务内容在执行目录生成一个脚本文件            String command = buildCommand();            // 执行脚本等待结果            TaskResponse commandExecuteResult = shellCommandExecutor.run(command);            // 设置执行情况            setExitStatusCode(commandExecuteResult.getExitStatusCode());            // 设置进程id            setProcessId(commandExecuteResult.getProcessId());            shellParameters.dealOutParam(shellCommandExecutor.getVarPool());        } catch (InterruptedException e) {            Thread.currentThread().interrupt();            logger.error("The current Shell task has been interrupted", e);            setExitStatusCode(EXIT_CODE_FAILURE);            throw new TaskException("The current Shell task has been interrupted", e);        } catch (Exception e) {            logger.error("shell task error", e);            setExitStatusCode(EXIT_CODE_FAILURE);            throw new TaskException("Execute shell task error", e);        }    }
复制代码


执行后:


    protected void afterExecute() throws TaskException {        if (task == null) {            throw new TaskException("The current task instance is null");        }        // 发送告警相关信息        sendAlertIfNeeded();
// 往master发送任务结果 sendTaskResult();
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); logger.info("Remove the current task execute context from worker cache"); // 清理任务执行目录 clearTaskExecPathIfNeeded(); }
复制代码

2.5 消息重试线程:

  • 描述:对于 worker 向 master 发送的 RPC 请求。如“任务运行中”、“任务结束”等命令,若未收到 master 的 ack 回复时,此重试线程会间隔 5min 进行命令重新发送操作。直至收到 ack 请求或者收到停止任务命令。

3. 相关流程图

官网描述了很多流程图,如 master、worker 容错机制流程图、分布式锁实现流程图等,详见:https://dolphinscheduler.apache.org/zh-cn/docs/3.1.9/contribute/architecture-designhttps://dolphinscheduler.apache.org/zh-cn/docs/3.1.9/architecture/design


本文补充任务派发与任务停止流程图,且只描述正常的实例启动、停止流程,不包含实例容错恢复场景,不包含相关锁以及并发场景。


  • 任务派发流程:



  • 任务停止流程:


结语

以上是笔者对 Apache DolphinScheduler 3.1.9 版本特性与架构的初步理解,基于个人学习与实践整理而成。由于水平有限,文中难免存在理解偏差或疏漏之处,恳请各位读者不吝指正。如有不同见解,欢迎交流讨论,共同进步。


如果你对 Apache DolphinScheduler 的源码有兴趣,可以深入研究其任务调度策略的细节部分,或者根据自身业务场景进行二次开发,充分发挥 DolphinScheduler 的调度能力。

发布于: 刚刚阅读数: 4
用户头像

白鲸开源

关注

一家开源原生的DataOps商业公司。 2022-03-18 加入

致力于打造下一代开源原生的DataOps 平台,助力企业在大数据和云时代,智能化地完成多数据源、多云及信创环境的数据集成、调度开发和治理,以提高企业解决数据问题的效率,提升企业分析洞察能力和决策能力。

评论

发布
暂无评论
(二)3.1.9 生产“稳”担当:Apache DolphinScheduler Worker 服务源码全方位解析_大数据_白鲸开源_InfoQ写作社区