Decoding DolphinScheduler: How Does a Flink Task Actually "Run"?
- 2025-12-18, Tianjin
Author | leo 的小跟班
I have recently been integrating DolphinScheduler into our company's systems and wanted to understand how Flink tasks get scheduled, so I did some local debugging. This article walks through what I found.
Starting DolphinScheduler
For starting DolphinScheduler, I simply ran it locally: run the StandaloneServer class in the dolphinscheduler-standalone-server module.
Because Flink tasks need to run, the following configuration was done.
Resource-related configuration: the HDFS address is configured here, so HDFS has to be started locally first.
./sbin/start-dfs.sh
# resource storage type: HDFS, S3, NONE
resource.storage.type=HDFS
# resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended
resource.storage.upload.base.path=/dolphinscheduler
# whether to startup kerberos
hadoop.security.authentication.startup.state=false
# resource view suffixs
#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js
# if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path
resource.hdfs.root.user=lizu
# if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
resource.hdfs.fs.defaultFS=hdfs://0.0.0.0:9000
# use sudo or not, if set true, executing user is tenant user and deploy user needs sudo permissions; if set false, executing user is the deploy user and doesn't need sudo permissions
sudo.enable=true
Environment variables: the local machine's environment settings are configured directly. When DolphinScheduler runs, it checks whether an environment file exists under the resources directory; if it does, it runs a source command on that file.
# JAVA_HOME, will use it to start DolphinScheduler server
export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_281.jdk/Contents/Home
export PATH=$PATH:$JAVA_HOME/bin
export M2_HOME=/Users/lizu/app/apache-maven-3.6.3
export PATH=$PATH:$M2_HOME/bin
export SCALA_HOME=/Users/lizu/app/scala-2.11.12
#export SCALA_HOME=/Users/lizu/app/scala-2.12.13
export PATH=$PATH:$SCALA_HOME/bin
#export SPARK_HOME=/Users/lizu/app/spark-2.4.3-bin-hadoop2.7
export SPARK_HOME=/Users/lizu/app/spark-3.0.2-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin
export FLINK_HOME=/Users/lizu/app/flink-1.13.6
export PATH=$PATH:$FLINK_HOME/bin
export PATH=/Library/Frameworks/Python.framework/Versions/3.6/bin:$PATH
export HADOOP_HOME=/Users/lizu/app/hadoop-2.7.6
export PATH=$PATH:$HADOOP_HOME/bin
#export HIVE_HOME=/Users/lizu/app/apache-hive-1.2.1-bin
export HIVE_HOME=/Users/lizu/app/hive-2.3.4
export PATH=$PATH:$HIVE_HOME/bin
#export NODE_HOME=/Users/lizu/app/node-v14.16.0
export NODE_HOME=/Users/lizu/app/node-v16.17.0
export PATH=$PATH:$NODE_HOME/bin
export PATH=$PATH:/Users/lizu/app/go/bin
export PATH=$PATH:/Users/lizu/app/gradle-6.3/bin
alias python="/usr/local/bin/python3"
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/pyspark:$SPARK_HOME/python/lib/py4j-0.10.9-src.zip:$PYTHONPATH
Then start the frontend, and the UI can be accessed at: http://127.0.0.1:5173/
Flink Task Configuration
Starting a local cluster
Since Flink tasks need to run, a Flink environment has to be prepared in advance. Here the local mode was chosen and the cluster was started directly on the local machine.
./start-cluster.sh
Once it has started, the Flink web UI becomes available.
Now the Flink task can be configured in DolphinScheduler. Configuring a Flink task mainly involves the following steps:
Upload the resource file: the SocketWindowWordCount.jar from the official examples can be used directly.
Configure the workflow: mainly set the main class and the resource uploaded above; since SocketWindowWordCount needs arguments, its input parameters are set in the main program arguments (here --hostname localhost --port 9999, as can be seen in the run log later).
Start and run: finally, start the workflow for a test run; the socket port needs to be opened first.
nc -l 9999
The Flink task can then be seen running on the web UI.
Source Code Behind Flink Task Execution
With the steps above, a Flink task is now running in DolphinScheduler. Finally, let's look at the source code to see some details of how the task is executed.
In DolphinScheduler, tasks are dispatched by the Master to a Worker for execution, and the task's running status is reported back to the Master in real time. The two communicate over Netty.
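As a rough illustration of that request/response pattern, here is a minimal Netty sketch. The class name, the port 1234, and the plain-string codec are all illustrative; DolphinScheduler's actual WorkerRpcServer wraps Netty with its own command encoding rather than plain strings. The point is simply that the worker side registers a handler which receives a dispatch message and answers with an acknowledgement.

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class WorkerRpcServerSketch {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup workers = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(boss, workers)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline()
                                    .addLast(new StringDecoder())
                                    .addLast(new StringEncoder())
                                    // plays the role of a dispatch processor on the worker side
                                    .addLast(new SimpleChannelInboundHandler<String>() {
                                        @Override
                                        protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                                            // a "dispatch" message arriving from the master side
                                            System.out.println("Received task dispatch: " + msg);
                                            ctx.writeAndFlush("TASK_DISPATCH_ACK\n");
                                        }
                                    });
                        }
                    });
            ChannelFuture future = bootstrap.bind(1234).sync();   // illustrative port
            future.channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            workers.shutdownGracefully();
        }
    }
}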
How the worker node receives tasks
After the worker starts, it launches the RPC server and client, as well as a workerManagerThread that pulls tasks to execute from a queue.
@PostConstruct
public void run() {
    this.workerRpcServer.start();
    this.workerRpcClient.start();
    this.taskPluginManager.loadPlugin();

    this.workerRegistryClient.setRegistryStoppable(this);
    this.workerRegistryClient.start();

    this.workerManagerThread.start();

    this.messageRetryRunner.start();

    /*
     * registry hooks, which are called before the process exits
     */
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        if (!ServerLifeCycleManager.isStopped()) {
            close("WorkerServer shutdown hook");
        }
    }));
}
A number of processors are registered in WorkerRpcServer; for now we will only look at TaskDispatchProcessor.
this.nettyRemotingServer.registerProcessor(CommandType.TASK_DISPATCH_REQUEST, taskDispatchProcessor);
TaskDispatchProcessor receives the task dispatch message sent by the Master and puts it into the worker task queue waitSubmitQueue.
final String workflowMasterAddress = taskDispatchCommand.getMessageSenderAddress();
logger.info("Receive task dispatch request, command: {}", taskDispatchCommand);

// ... part of the code omitted

WorkerDelayTaskExecuteRunnable workerTaskExecuteRunnable = WorkerTaskExecuteRunnableFactoryBuilder
        .createWorkerDelayTaskExecuteRunnableFactory(
                taskExecutionContext,
                workerConfig,
                workflowMasterAddress,
                workerMessageSender,
                alertClientService,
                taskPluginManager,
                storageOperate)
        .createWorkerTaskExecuteRunnable();

// submit task to manager
boolean offer = workerManager.offer(workerTaskExecuteRunnable);
if (!offer) {
    logger.warn("submit task to wait queue error, queue is full, current queue size is {}, will send a task reject message to master",
            workerManager.getWaitSubmitQueueSize());
    workerMessageSender.sendMessageWithRetry(taskExecutionContext, workflowMasterAddress, CommandType.TASK_REJECT);
} else {
    logger.info("Submit task to wait queue success, current queue size is {}", workerManager.getWaitSubmitQueueSize());
}
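The offer/reject behaviour boils down to a bounded queue: if the queue accepts the task, the worker proceeds; if it is full, the worker tells the Master to reject (re-dispatch) the task. Below is a minimal sketch of that pattern using a plain LinkedBlockingQueue with a tiny capacity, purely for illustration; it is not DolphinScheduler's actual queue implementation.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class WaitQueueSketch {
    // deliberately tiny capacity so the "queue is full" branch is easy to hit
    private final BlockingQueue<Runnable> waitSubmitQueue = new LinkedBlockingQueue<>(2);

    public boolean offer(Runnable task) {
        return waitSubmitQueue.offer(task);
    }

    public static void main(String[] args) {
        WaitQueueSketch manager = new WaitQueueSketch();
        for (int i = 0; i < 3; i++) {
            boolean accepted = manager.offer(() -> { });
            // the third offer fails; this is the point where the worker would send TASK_REJECT to the master
            System.out.println("task " + i + " accepted=" + accepted);
        }
    }
}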
Executing the task in WorkerTaskExecuteRunnable
The WorkerManagerThread keeps taking tasks to execute from the worker task queue waitSubmitQueue and submits them to a thread pool.
@Override
public void run() {
    Thread.currentThread().setName("Worker-Execute-Manager-Thread");
    while (!ServerLifeCycleManager.isStopped()) {
        try {
            if (!ServerLifeCycleManager.isRunning()) {
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
            }
            if (this.getThreadPoolQueueSize() <= workerExecThreads) {
                final WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable = waitSubmitQueue.take();
                workerExecService.submit(workerDelayTaskExecuteRunnable);
            } else {
                WorkerServerMetrics.incWorkerOverloadCount();
                logger.info("Exec queue is full, waiting submit queue {}, waiting exec queue size {}",
                        this.getWaitSubmitQueueSize(), this.getThreadPoolQueueSize());
                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
            }
        } catch (Exception e) {
            logger.error("An unexpected interrupt is happened, "
                    + "the exception will be ignored and this thread will continue to run", e);
        }
    }
}
// worker thread pool
workerExecService = new WorkerExecService(
        ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads()),
        taskExecuteThreadMap);
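ThreadUtils.newDaemonFixedThreadExecutor is essentially a fixed-size pool of named daemon threads. A plain-JDK approximation is shown below; the class and method names here are hypothetical, for illustration only.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DaemonFixedPoolSketch {
    public static ExecutorService newDaemonFixedThreadExecutor(String namePrefix, int threads) {
        AtomicInteger index = new AtomicInteger();
        return Executors.newFixedThreadPool(threads, runnable -> {
            Thread t = new Thread(runnable, namePrefix + "-" + index.incrementAndGet());
            t.setDaemon(true);   // daemon threads do not keep the JVM alive on shutdown
            return t;
        });
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = newDaemonFixedThreadExecutor("Worker-Execute-Thread", 4);
        pool.submit(() -> System.out.println("task running on " + Thread.currentThread().getName()));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}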
The final execution logic lives in WorkerTaskExecuteRunnable.
@Override
public void run() {
    try {
        // set the thread name to make sure the log be written to the task log file
        Thread.currentThread().setName(taskExecutionContext.getTaskLogName());

        LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId());
        logger.info("Begin to pulling task");

        initializeTask();

        if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
            taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
            taskExecutionContext.setEndTime(new Date());
            TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
            workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
            logger.info(
                    "The current execute mode is dry run, will stop the subsequent process and set the taskInstance status to success");
            return;
        }

        beforeExecute();

        TaskCallBack taskCallBack = TaskCallbackImpl.builder().workerMessageSender(workerMessageSender)
                .masterAddress(masterAddress).build();
        executeTask(taskCallBack);

        afterExecute();

    } catch (Throwable ex) {
        logger.error("Task execute failed, due to meet an exception", ex);
        afterThrowing(ex);
    } finally {
        LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
    }
}
WorkerTaskExecuteRunnable mainly goes through the following steps (a condensed sketch of this lifecycle follows the list):
initializeTask: set up the task environment ("Set task envFile"), the start time, and so on.
beforeExecute: pre-execution checks; the task status is switched to running.
executeTask: execute the task itself.
afterExecute: post-execution work, notifying the master to update the status.
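As a condensed view of those four phases, here is a template-method sketch; it is illustrative only and not the real WorkerTaskExecuteRunnable.

public abstract class TaskLifecycleSketch implements Runnable {

    @Override
    public final void run() {
        try {
            initializeTask();   // prepare the env file, record the start time
            beforeExecute();    // pre-checks, switch the task instance to RUNNING
            executeTask();      // delegate to the concrete task plugin
            afterExecute();     // collect the result and notify the master
        } catch (Exception ex) {
            afterThrowing(ex);  // report the failure back to the master
        }
    }

    protected abstract void initializeTask();
    protected abstract void beforeExecute();
    protected abstract void executeTask() throws Exception;
    protected abstract void afterExecute();
    protected abstract void afterThrowing(Exception ex);
}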
FlinkTask execution logic
Finally, let's look at how a Flink task is actually executed: executeTask calls the handle method of the concrete implementation class.
@Override
public void executeTask(TaskCallBack taskCallBack) throws TaskException {
    if (task == null) {
        throw new TaskException("The task plugin instance is not initialized");
    }
    task.handle(taskCallBack);
}
The concrete implementation class for Flink tasks is FlinkTask, which extends AbstractYarnTask.
// todo split handle to submit and track
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
    try {
        // SHELL task exit code
        TaskResponse response = shellCommandExecutor.run(buildCommand());
        setExitStatusCode(response.getExitStatusCode());
        // set appIds
        setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
        setProcessId(response.getProcessId());
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        logger.info("The current yarn task has been interrupted", ex);
        setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
        throw new TaskException("The current yarn task has been interrupted", ex);
    } catch (Exception e) {
        logger.error("yarn process failure", e);
        exitStatusCode = -1;
        throw new TaskException("Execute task failed", e);
    }
}
In essence it delegates to shellCommandExecutor; ultimately a shell command is submitted to run the Flink task.
public TaskResponse run(String execCommand) throws IOException, InterruptedException {
    TaskResponse result = new TaskResponse();
    int taskInstanceId = taskRequest.getTaskInstanceId();
    if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
        result.setExitStatusCode(EXIT_CODE_KILL);
        return result;
    }
    if (StringUtils.isEmpty(execCommand)) {
        TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
        return result;
    }

    String commandFilePath = buildCommandFilePath();

    // create command file if not exists
    createCommandFileIfNotExists(execCommand, commandFilePath);

    // build process
    buildProcess(commandFilePath);

    // parse process output
    parseProcessOutput(process);

    int processId = getProcessId(process);

    result.setProcessId(processId);

    // cache processId
    taskRequest.setProcessId(processId);
    boolean updateTaskExecutionContextStatus = TaskExecutionContextCacheManager.updateTaskExecutionContext(taskRequest);
    if (Boolean.FALSE.equals(updateTaskExecutionContextStatus)) {
        ProcessUtils.kill(taskRequest);
        result.setExitStatusCode(EXIT_CODE_KILL);
        return result;
    }
    // print process id
    logger.info("process start, process id is: {}", processId);

    // if timeout occurs, exit directly
    long remainTime = getRemainTime();

    // waiting for the run to finish
    boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);

    // if SHELL task exit
    if (status) {

        // SHELL task state
        result.setExitStatusCode(process.exitValue());

    } else {
        logger.error("process has failure, the task timeout configuration value is:{}, ready to kill ...",
                taskRequest.getTaskTimeout());
        ProcessUtils.kill(taskRequest);
        result.setExitStatusCode(EXIT_CODE_FAILURE);
    }
    int exitCode = process.exitValue();
    String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited.";
    logger.info(exitLogMessage + " execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
            taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);
    return result;
}
The execution log shows what the code above actually did:
[INFO] 2023-06-22 23:49:12.520 +0800 - flink task command : flink run -p 1 -sae -c org.apache.flink.streaming.examples.socket.SocketWindowWordCount flink-example/SocketWindowWordCount.jar --hostname localhost --port 9999
[INFO] 2023-06-22 23:49:12.520 +0800 - Begin to create command file:/tmp/dolphinscheduler/exec/process/lizu/9986178674720/9986206731168_3/5/5/5_5.command
[INFO] 2023-06-22 23:49:12.521 +0800 - Success create command file, command: #!/bin/bash
BASEDIR=$(cd `dirname $0`; pwd)
cd $BASEDIR
source /Users/lizu/idea/scheduler/dolphinscheduler/dolphinscheduler-standalone-server/target/classes/dolphinscheduler_env.sh
flink run -p 1 -sae -c org.apache.flink.streaming.examples.socket.SocketWindowWordCount flink-example/SocketWindowWordCount.jar --hostname localhost --port 9999
[INFO] 2023-06-22 23:49:12.526 +0800 - task run command: sudo -u lizu -E bash /tmp/dolphinscheduler/exec/process/lizu/9986178674720/9986206731168_3/5/5/5_5.command
[INFO] 2023-06-22 23:49:12.547 +0800 - process start, process id is: 23572
[INFO] 2023-06-22 23:49:15.560 +0800 - -> Job has been submitted with JobID 192c0f1b984f2bc10cd2ec6d39525fbb
In other words, the Flink run command and the environment variable setup are written into a script file, and that script file is then executed, e.g. (sudo -u lizu -E bash /tmp/dolphinscheduler/exec/process/lizu/9986178674720/9986206731168_3/5/5/5_5.command).
For a Flink task this launches a CliFrontend process, and the code blocks on process.waitFor until the process returns a success or failure status.
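The following is a condensed, self-contained sketch of that mechanism: write the command into a temporary .command file, launch it with ProcessBuilder, forward its output, and wait for it with a timeout. The command text and the 60-second timeout are placeholders for illustration, not DolphinScheduler's actual values.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.TimeUnit;

public class CommandFileRunnerSketch {
    public static void main(String[] args) throws IOException, InterruptedException {
        // stand-in for the generated command file content ("flink run ..." in the real case)
        String execCommand = "#!/bin/bash\necho simulating: flink run ...\n";
        Path commandFile = Files.createTempFile("task", ".command");
        Files.writeString(commandFile, execCommand);

        Process process = new ProcessBuilder("bash", commandFile.toString())
                .redirectErrorStream(true)
                .start();

        // equivalent of parseProcessOutput: forward the child's stdout into the task log
        process.getInputStream().transferTo(System.out);

        // equivalent of process.waitFor(remainTime, TimeUnit.SECONDS): kill on timeout
        boolean finished = process.waitFor(60, TimeUnit.SECONDS);
        if (!finished) {
            process.destroyForcibly();
        }
        System.out.println("exit code: " + (finished ? process.exitValue() : "timeout"));
    }
}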
Finally, the afterExecute method notifies the master of how the task ran.
protected void sendTaskResult() {
    taskExecutionContext.setCurrentExecutionStatus(task.getExitStatus());
    taskExecutionContext.setEndTime(new Date());
    taskExecutionContext.setProcessId(task.getProcessId());
    taskExecutionContext.setAppIds(task.getAppIds());
    taskExecutionContext.setVarPool(JSONUtils.toJsonString(task.getParameters().getVarPool()));
    workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);

    logger.info("Send task execute result to master, the current task status: {}",
            taskExecutionContext.getCurrentExecutionStatus());
}
Related run log
[INFO] 2023-06-22 23:49:15.560 +0800 - -> Job has been submitted with JobID 192c0f1b984f2bc10cd2ec6d39525fbb
[INFO] 2023-06-23 00:11:35.405 +0800 - process has exited. execute path:/tmp/dolphinscheduler/exec/process/lizu/9986178674720/9986206731168_3/5/5, processId:23572 ,exitStatusCode:1 ,processWaitForStatus:true ,processExitValue:1
[INFO] 2023-06-23 00:11:35.411 +0800 - Send task execute result to master, the current task status: TaskExecutionStatus{code=6, desc='failure'}
Summary
That is a brief introduction to how Flink tasks run in DolphinScheduler. If anything here is wrong, I'd appreciate you pointing it out so we can improve together.