写点什么

达人专栏 | 还不会用 Apache Dolphinscheduler?大佬用时一个月写出的最全入门教程【三】

  • 2022 年 5 月 25 日
  • 本文字数:5171 字

    阅读完需:约 17 分钟

达人专栏 | 还不会用 Apache Dolphinscheduler?大佬用时一个月写出的最全入门教程【三】

作者 | 欧阳涛 招联金融大数据开发工程师


02 Master 启动流程


2.10 WorkFlowExecutorThread 里执行 Submit StandByTask 方法


SubmitStandByTask 干了 5 件事情:

  1. 从 ReadyToSubmitTaskQueue 中取出 TaskInstance。

  2. (这个 TaskInstance 是可以重试并且设定为强制成功了的)把 task 放到 completeTaskMap 以及 taskInstanceMap,并从队列中移除。

  3. 如果这个 task 是首次执行的话,就会先从 task 和 ProcessInstance 中获取参数(varPool)。

    【这一步的方法是 GetPreVarPool】

  4. 获取这个 task 依赖结果【这一步的方法是 GetDependResultForTask】

  5. 根据第 4 步获取的依赖结果,如果依赖结果为失败或者不执行,就从队列中移除,并且放到 FailedTaskMap 里的。如果依赖结果为成功则将执行 SubmitTaskExec 方法,同时会放到 CompleteTaskMap。至于 SubmitTaskExec 做了哪些事情将在 2.11 中说明。


2.11 WorkFlowExecutorThread 里执行 SubmitTaskExec 方法


SumbitTaskExec 干了 9 件事情:


  1. PackageTaskInstance 封装了 TaskInstance,就是将 TaskInstance 和 ProcessInstance 进行了绑定,并且获取到了 MainJar,ResourceList 这些信息。

  2. 根据 TaskType 获取 CommonTaskProcessor,这里采用 SPI 机制获取。如果想具体了解 SPI 机制的,可以百度搜索 AutoService 注解以及 ServiceLoad 进行详细了解。

  3. CommonTaskProcessor 初始化,也就是将 TaskInstance、ProcessInstance、ProcessService、MasterConfig 传递给 CommonTaskProcessor。

  4. 通知流程所在的主机,通过 netty 发送 Host 和 HostUpdateCommand。

  5. 将 CommonTaskProcessor 的 Action 为 submit(提交)状态。(这步极为重要)

  6. 放入到 ValidMap,TaskInstanceMap,ActiveTaskProcessorMaps 里。

  7. 将 CommonTaskProcessor 的 Action 设置成 Run 状态的。

  8. 将 task 以及 ProcessInstance 放入到 StateWheelExecuteThread 进行 checkout。

  9. 如果这个 task 执行完成就添加到 StateEvents 队列中。


下一节讲述 commonTaskProcessor 的 submit 状态。


2.12 CommonTaskProcessor 里执行 Submit Task 方法


回顾一下上节的第 5 步,CommonTaskProcessor 的 Action 设置为 Submit 之后,


去 ComonTaskProcessor 的父类 BaseTaskProcessor 找 Action 方法,在 Action 方法中有个 Switch 结构,很明显会进入 Submit 方法,之后就进入本节所说的 SubmitTask 方法的了。


SubmitTask 在这里干了三件事情:


  1. ProcessService。SubmitTaskWithRetry 可以重复 5 次(MasterConfig.GetTask CommitInterval)提交 task 任务,最后在 ProcessServiceImpl 执行 submitTask。

  2. 将此 task 的信息插入到 TaskGroupQueue 数据表中。

  3. DispatchTask 下发任务,将 Task 任务下发到实现了 TaskPriorityQueue 接口的 TaskPriorityQueueImpl 中去。


在 ProcessServiceImpl 如何执行 submitTask 将在 2.13 中说明,同时 DispatchTask 下发做了那些事情,将在 2.15 中说明。


2.13 ProcessServiceImpl 里执行 SubmitTask 方法


ProcessServiceImpl 是属于 Service 模块的,SubmitTask 主要干了 2 件事情:


  1. SubmitTaskInstanceToDB 将任务实例保存到数据库中,当然这里面有数据结构(TaskInstance)的变化,纯属业务的改变的。

  2. 如果此非结束状态,CreateSubWorkerProcess 创建子流程,如果没有子流程,直接跳过 2.14 的内容。进入 2.15。创建子流程做了哪些事情将在 2.14 中说明。


2.14 ProcessServiceImpl 里执行 CreateSubWork Process 方法


创建子流程需要干 6 件事情:


  1. FindWorkProcessMapByParent 查找父流程与 task 绑定的 ProcessInstanceMap,是流程实例与 Task 关系的表。

  2. SetProcessInstanceMap。设置刚刚查找的 ProcessInstanceMap,如果能找到以前跑的 ProcessInstanceMap,更新这个 ProcessInstanceMap,如果没有找到就创建新的 ProcessInstanceMap,并插入到数据库中。

  3. CreateSubProcessCommand,根据参数,父流程等创建子流程命令的(SubProcessCommand)。

  4. UpdateSubProcessDefinitionByParent 根据父流程更新子流程的定义。

  5. InitSubInstanceState 初始化子实例状态。

  6. CreateCommand 将创建的子流程命令插入数据库中。


这里 ProcessInstanceMap 并不是 jdk 包下的 map,而是表 t_ds_relation_process_instance 的数据的。里面存储了父流程实例以及任务的关系的。3 到 6 这些步骤都是 crud 的业务,里面具体的细节就赘述了。


2.15 CommonTaskProcessor 里执行 Dispatch Task 方法


DispatchTask 方法干了三件事情:

  1. 获取 TaskPriorityQueueImpl 的 bean。

  2. 将 TaskInstance,ProcessInstance 封装成 TaskPriority。

  3. 将封装后的 TaskPriority 放到这 bean 下的 queue 中去,这个队列是 jdk 的 PriorityBlockingQueue,是一个具有优先级别的无界阻塞队列。


此时将 DispatchTask 放进 task,那如何消费队列中的 task 的呢?2.16 将说明这个议题。


2.16 TaskPriorityQueueConsumer 执行 run 和 dispatchTask 方法


TaskPriorityQueueConsumer 是一个继承 Thread 的类。在 MasterServer 启动之后,根据 Spring 的特性,TaskPriorityQueueConsumer 会创建一个对象由 Spring 管理。TaskPriority 会执行 init 的方法。线程启动并且设置线程名字 Task UpdateConsumerThread。


Run 方法中以 3(MasterConfig.getMasterDispatch Task ) 次拉取为循环,每次 1 秒从队列中(BatchDispatch)拉取 TaskPriority,如果失败就有重新丢回到这队列中去。


随后对拉取的数据进行 DispatchTask 方法。


DispatchTask 方法中做了三件事情:


  1. 从 TaskPriority 中取出 context,根据 Command,ExecutorType 和 Workergroup 封装成 Execution Context。

  2. 将 ExecutionContext 交给 Executor Dispatcher 进行 Dispatcher,这将在 2.17 中说明。

  3. 如果发送成功,返回 result 为 true。将 TaskEvent 添加到 TaskEventService (addEvents)中,由 TaskEventService 进行管理的。TaskEventService 的说明将在 2.19 中介绍。


2.17 ExecutorDispatcher 里执行 Dispatch 方法


ExecutorDispatcher 这个类就干了三件事情:


  1. ExecutorDispatcher 此类实现了 InitializingBean。也就是创建过程中执行了 AfterPropertiesSet 方法,ExecutorManagers 注册了 Worker 和 Client 的 ExecutorType。

  2. Dispatch 方法中获取到了 Worker 的 ExecutorType,然后进行 HostManagar.select。在 Select 方法中会根据 MasterConfig 中的 Host-selector 策略选择机器,默认是 Lower-weight。如果读者有自定义的需求,则可以实现 HostManager 接口的。(Lower-weight 如何选择的,就不详细介绍了。因为难度并不大,也就是纯属业务的变化的,有兴趣就可以自行阅读的。)

  3. 选择完了 Host 之后,调用 ExecutorManager 进行 execute。这里的 EeforeExecute 和 AfterExecute 是没有内容的,如果读者有需求,同样可以在此添加内容的。在 2.18 中会说明 execute 的内容。


2.18 NettyExecutorManager 执行 execute 和 doExecute 方法


ExecutorManager 目前就一个实现类,就是 NettyExecutorManger。


在 init 方法中 NettyRemotingClient 注册了 TaskExecuteResponse、TaskExecuteAck 和

TaskKillResponse 的 Processor。这些 Processor 是用来让 Master 和 Worker 进行交互的。


在 Executor 方法中最核心的方法就是 DoExecute。

在 DoExecute 中 NettyRemotingClient 根据有效的 Host 发送 Command。如果发送失败了,剔除失败节点,将 task 重新添加到队列中。


至此,Master 就以 Command 形式发送 task 信息给 Worker,说明一下,此时的 Command 是 Remote 包下的 Command,与前面的 Command 没有任何关系的,不要混淆了。Master 和 Worker 的交互过程会在第四章节中讲述。


2.19 TaskEventService 执行 addEvents 方法


先说说 TaskEventService 创建过程。这是由 Spring 管理的,然后执行 Start 方法之后,有两个线程创建出来,一个是 TaskEventThread,另外一个是 TaskEventHandlerThread。在 TaskEventThread 会从 EventQueue 中取出 TaskEvent 事件进行提交(submitTaskEvent)。而 TaskEventHandlerThread 会执行 EventHandler 方法。EventHandler 中会从 TaskExecuteThreadMap 中取出数据来执行 executeEvent 方法。


那么 TaskExecuteThreadMap 如何插入数据的呢?答案就是本节所说的 addEvents 方法。


addEvents 方法中会调用 TaskExecuteThreadPool 中的 SubmitTaskEvent 方法。而在 SubmitTaskEvent 方法中最核心的功能就是往 TaskExecuteThreadMap 放入数据,也就是以 ProcessInstanceId 为 key,TaskExecuteThread 为 value 的 map,并且会调用 TaskExecuteThrad 的 addEvent 方法,将 event 放入到 events 队列中。


至于 TaskExecuteThread 做了哪些事情将在 2.20 中说明。


2.20 TaskExecuteThread 执行 Persist 方法


接上文 2.19 的在 TaskExecuteThreadPool 中 ExecuteEvent 方法。


执行 TaskExecuteThread 中的 run 方法。在 run 方法中从 events 队列中取出 TaskEvent,并执行 Persist 持久化操作的,将 task 信息保存到数据库中的。


在 Persist 方法中,重点是 Switch 结构下的内容。根据 DISPATCH,RUNNING,RESULT,执行不同的方法,封装不同的 TaskInstance 内容保存到数据库中,并发送请求给 Worker。


另外构建 StateEvent 对象,交给 WorkerflowExecuteThreadPool 进行处理持久化后的 StateEvent 对象。stateEvent 应该如何处理呢?请参考 2.22 的内容。


2.21 MasterSchedulerService 总结


MasterServer 的 MasterSchedulerService 已经基本讲完。回到最开始的 MasterServer 这部分,发现 MasterSchedulerService 后面的两个 bean 没有讲,也就是 EventExecuteService 以及 FailoverExecute Thread.这两个都是线程的,将在 2.22 和 2.23 中说明这最后两个 bean。


2.22 EventExecuteService 线程的 run 方法


在 MasterServer 调用 Start 方法后,EventExeuctor Service 的 run 方法执行过程如下:


  1. 每 100 毫秒执行 EventHandler 方法。

  2. 每次执行 EventHandler 方法时,从 2.5 章节的第 3 步 ProcessInstance ExecCacheManager 中取出 WorkFlowExecutorThread,通过 WorkflowExecuteThreadPool 执行 ExecuteEvent 方法。

  3. 在 ExecuteEvent 方法中, 可以发现最核心的方法就是 HandlerEvents 方法。

  4. 在 HandlerEvents 中可以发现,从 2.11 章节的第 9 步的 StateEvents 队列取出 StateEvent,然后在通过 StateEventHandler 方法进行判断的。

  5. 在 WorkflowExecutorThread 的 stateEventHandler 方法中,根据 StateEventType 的不同,以有 6 种不同类型的方法去调用,分别为 PROCESS_STATE _CHANGE、TASK_STATE_CHANGE、PROCESS_TIMEOUT、TASK_TIMEOUT、TASK_RETRY、PROCESS_BLOCKED。通过不同的 type 调用不同的方法,如 PROCESS_STATE_CHANGE 调用 ProcessStateChangeHandler 方法,这里就不详细讲述各个方法的内容了,其本质上也都是内存数据结构的变化。


P.S.:


  1. 如果 StateEventHandler 方法中某一类型成功执行,则从 StateEvents 队列中移除它了。

  2. 返回到 WorkflowExecuteThreadPool 类的 ExecuteEvent 方法中,执行完第 3 步之后,会有个回调函数,失败就执行 OnFailure 方法。成功就执行 OnSuccess 方法,NotifyProcessChanged 通知流程改变中,要么 NotifyMyself,要么通知其他流程 NotifyProcessChanged 的。


2.23 FailoverExecutorThread 线程的 Run 方法


此节为机器故障切换执行的线程,主要干了 5 件事情。具体执行流程如下:


  1. Run 方法中 FailoverService.checkMaster Failover 检查是否需要切换的 host。

  2. 如果有 host 的话,就进入 FailoveMaster WithLock 方法。在此方法中,从 zk 中通过分布式锁来进行切换机器,也就是进入 FailoverMaster 方法。

  3. 在 FailoverMaster 中,从 ProcessSerivce 里(QueryNeedFailover ProcessInstance)查询所需要切换的流程实例(NeedFailover ProcessInstanceList)。

  4. 接下来,就是通过 zk 获取有效的 WorkerServers.failoverTaskInstance 来切换 task。在切换 task 时有三个步骤,分别是:当是 Yarnjobs 时,则直接杀掉 ; 改变 task 的状态,也就是从 Running 到 Needfailover ; WorkflowExecutor ThreadPool 提交 StateEvent。

  5. 在 ProcessService 中处理该切换的流程,增加切换流程实例的 Command,插入数据库中。


下两章将继续讲述 Worker 和 Master 与 Worker 的交互。


参与贡献


随着国内开源的迅猛崛起,Apache DolphinScheduler 社区迎来蓬勃发展,为了做更好用、易用的调度,真诚欢迎热爱开源的伙伴加入到开源社区中来,为中国开源崛起献上一份自己的力量,让本土开源走向全球。


参与 DolphinScheduler 社区有非常多的参与贡献的方式,包括:


贡献第一个 PR(文档、代码) 我们也希望是简单的,第一个 PR 用于熟悉提交的流程和社区协作以及感受社区的友好度。


社区汇总了以下适合新手的问题列表:https://github.com/apache/dolphinscheduler/issues/5689


非新手问题列表:https://github.com/apache/dolphinscheduler/issues?q=is%3Aopen+is%3Aissue+label%3A%22volunteer+wanted%22


如何参与贡献链接:https://dolphinscheduler.apache.org/zh-cn/docs/development/contribute.html


来吧,DolphinScheduler 开源社区需要您的参与,为中国开源崛起添砖加瓦吧,哪怕只是小小的一块瓦,汇聚起来的力量也是巨大的。


参与开源可以近距离与各路高手切磋,迅速提升自己的技能,如果您想参与贡献,我们有个贡献者种子孵化群,可以添加社区小助手微信(Leonard-ds) ,手把手教会您( 贡献者不分水平高低,有问必答,关键是有一颗愿意贡献的心 )。


添加小助手微信时请说明想参与贡献。


来吧,开源社区非常期待您的参与。

发布于: 刚刚阅读数: 3
用户头像

分布式易扩展的可视化工作流任务调度平台 2022.03.18 加入

还未添加个人简介

评论

发布
暂无评论
达人专栏 | 还不会用 Apache Dolphinscheduler?大佬用时一个月写出的最全入门教程【三】_Apache_Apache DolphinScheduler_InfoQ写作社区