写点什么

DolphinScheduler 自身容错导致的服务器持续崩溃重大问题的排查与解决

作者:白鲸开源
  • 2025-01-09
    天津
  • 本文字数:13061 字

    阅读完需:约 43 分钟

DolphinScheduler自身容错导致的服务器持续崩溃重大问题的排查与解决

01 问题复现

在 DolphinScheduler 中有如下一个 Shell 任务:


current_timestamp() {      date +"%Y-%m-%d %H:%M:%S"  }
TIMESTAMP=$(current_timestamp)echo $TIMESTAMPsleep 60
复制代码


在 DolphinScheduler 将工作流执行策略设置为并行:



定时周期调度设置为 10 秒一次:


将定时调度上线后,会调度执行任务,此时一切正常:


此时将 Master 节点给 kill 掉,模拟宕机:


$ jps1979710 AlertServer1979626 WorkerServer1979546 MasterServer1979794 ApiApplicationServer1980483 Jps$ kill -9 1979546
复制代码


去到 DolphinScheduler 中查看,发现 Master 已经不存在了:


此时观察 DolphinScheduler 工作流执行,发现其不会继续调度任务执行了,并且所有的任务则会一直执行下去,直到报错。


当过了一段时间后(模拟发现了宕机问题),此时重启 DolphinScheduler:


sh bin/stop-all.shsh bin/start-all.sh
复制代码


重启完成后,就会将之前没有执行成功的任务,包括没有执行的调度任务,全部都执行一次:


这就有一个致命的问题:如果都是高性能任务的话,就会导致 CPU、内存被打满,从而让服务器整个宕机!!!


02 多场景测试

  • Master 宕机后,重启整个 DS:会产生上述问题。

  • Master 宕机后,重启相应的 Master:会产生上述问题。——有缺陷,官方没有单独的 Master 后台启动,只有前台启动的脚本,但可以重复执行 start-all.sh。

  • Worker 宕机后,重启整个 DS:不会产生上述问题。——因为 Master 会持续的调度任务,而 Worker 宕机后的结果就是调度任务直接失败。

  • Worker 宕机后,重启相应的 Worker:不会产生上述问题。——有缺陷,官方没有单独的 Worker 后台启动,只有前台启动的脚本,但可以重复执行 start-all.sh。

  • DS 整个宕机后,重启整个 DS:会产生上述问题。

  • DS 使用 stop-all.sh 停止后,重启整个 DS:会产生上述问题。


其核心就是在于 Master,只要配置了周期任务,无论 Master 是宕机还是调用脚本关闭的,其都会产生上述问题。

03 原理分析

DolphinScheduler 核心角色:


  • MasterServer 主要负责 DAG 任务切分、任务提交监控,并同时监听其它 MasterServer 和 WorkerServer 的健康状态。MasterServer 服务启动时向 Zookeeper 注册临时节点,通过监听 Zookeeper 临时节点变化来进行容错处理。

  • WorkerServer 主要负责任务的执行和提供日志服务。WorkerServer 服务启动时向 Zookeeper 注册临时节点,并维持心跳。

  • ApiServer 主要负责处理前端 UI 层的请求。


大致的任务运行流程如下:


  • 在 API-Server 中创建任务,并将元数据持久化到 DB 中。

  • 通过手动点击或定时执行生成一个触发工作流执行的 Command 写入 DB。

  • Master 消费 DB 中的 Command,开始执行工作流,并将工作流中的任务分发给 Worker 执行。

  • 当整个工作流执行结束之后,Master 结束工作流的执行。


参考官网,上述的 DolphinScheduler 核心任务执行流程可以细化为如下:


鉴于任务调度的复杂性,一个大的流程可以划分为小的流程,在主线流程之外还附加了支线流程,下面对执行调度流程拆分进行分析一下,这样更容易理解:


在本次问题中,主要关注的就是 Command 分发流程。其 Command 分发流程是一个异步分布式生产消费模式。


i. 首先是生产者 api-server,会将用户的运行工作流 http 请求封装成 command 数据,insert 到 t_ds_command 表中,如下是一个启动工作流实例的 command 样例(老版本):


{    "commandType": "START_PROCESS",    "processDefinitionCode": 14285512555584,    "executorId": 1,    "commandParam": "{}",    "taskDependType": "TASK_POST",    "failureStrategy": "CONTINUE",    "warningType": "NONE",    "startTime": 1723444881372,    "processInstancePriority": "MEDIUM",    "updateTime": 1723444881372,    "workerGroup": "default",    "tenantCode": "default",    "environmentCode": -1,    "dryRun": 0,    "processInstanceId": 0,    "processDefinitionVersion": 1,    "testFlag": 0}
复制代码


ii.其次是消费者,master server 中的 MasterSchedulerBootstrap loop 程序, MasterSchedulerBootstrap 使用 ZK 分配到自己的 slot,从 t_ds_command 表中 select 属于 slot 的 command 列表处理,其查询语句是:


<select id="queryCommandPageBySlot" resultType="org.apache.dolphinscheduler.dao.entity.Command">        select *        from t_ds_command        where id % #{masterCount} = #{thisMasterSlot}        order by process_instance_priority, id asc            limit #{limit}</select>
复制代码


iii.MasterSchedulerBootstrap loop 轮训查到待处理的 command 任务,将 command 任务和 master host 生成 ProcessInstance,将 ProcessInstance 对象插入到 t_ds_process_instance 表中, 同时生成包含运行所需要的上下文信息的可执行任务 workflowExecuteRunnable。 将 workflowExecuteRunnablecache 到本地 cache processInstanceExecCacheManager,同时生产将 ProcessInstance 的 WorkflowEventType.START_WORKFLOW 生产到 workflowEventQueue 队列中。


上面的步骤是用户在 Web 页面点击启动任务后的流程,而本次的问题是 Master 周期调度的问题。经过查阅资料,周期调度任务则是 MasterServer 将其封装为命令数据并插入 t_ds_process_instance 表中,后续步骤如上,大致流程如下:


  • 命令分发:以用户提交的工作流请求为触发,MasterServer 将其封装为命令数据并插入数据库中。

  • 任务分配:MasterServer 循环查询待处理的命令,依照负载情况将任务分配到对应的 ProcessInstance 中。

  • 任务执行:根据 DAG 的依赖关系,WorkerServer 会优先执行无依赖的任务,然后根据优先级逐步执行其他任务。

  • 状态反馈:任务执行过程中,WorkerServer 会定期回调 MasterServer,通知任务的进展和执行状态。


所以,上述的问题就在这,当Master从停止到启动时,t_ds_command中会产生大量的任务数据。


在 DolphinScheduler3.2.1 中,其 t_ds_command 数据样例为:


id  |command_type|process_definition_code|process_definition_version|process_instance_id|command_param                        |task_depend_type|failure_strategy|warning_type|warning_group_id|schedule_time      |start_time         |executor_id|update_time        |process_instance_priority|worker_group|tenant_code|environment_code|dry_run|test_flag|----+------------+-----------------------+--------------------------+-------------------+-------------------------------------+----------------+----------------+------------+----------------+-------------------+-------------------+-----------+-------------------+-------------------------+------------+-----------+----------------+-------+---------+1988|           6|         15921642898976|                         4|                  0|{"schedule_timezone":"Asia/Shanghai"}|               2|               1|           0|               0|2024-12-11 00:36:40|2024-12-11 00:39:01|          2|2024-12-11 00:39:01|                        2|default     |default    |              -1|      0|        0|1989|           6|         15921642898976|                         4|                  0|{"schedule_timezone":"Asia/Shanghai"}|               2|               1|           0|               0|2024-12-11 00:36:50|2024-12-11 00:39:01|          2|2024-12-11 00:39:01|                        2|default     |default    |              -1|      0|        0|1990|           6|         15921642898976|                         4|                  0|{"schedule_timezone":"Asia/Shanghai"}|               2|               1|           0|               0|2024-12-11 00:37:00|2024-12-11 00:39:01|          2|2024-12-11 00:39:01|                        2|default     |default    |              -1|      0|        0|
复制代码


而 command_type 的枚举由源码中的 CommandType 定义,其内容如下:


/** * command types * 0 start a new process * 1 start a new process from current nodes * 2 recover tolerance fault process * 3 recover suspended process * 4 start process from failure task nodes * 5 complement data * 6 start a new process from scheduler * 7 repeat running a process * 8 pause a process * 9 stop a process * 10 recover waiting thread * 11 recover serial wait * 12 start a task node in a process instance */START_PROCESS(0, "start a new process"),START_CURRENT_TASK_PROCESS(1, "start a new process from current nodes"),RECOVER_TOLERANCE_FAULT_PROCESS(2, "recover tolerance fault process"),RECOVER_SUSPENDED_PROCESS(3, "recover suspended process"),START_FAILURE_TASK_PROCESS(4, "start process from failure task nodes"),COMPLEMENT_DATA(5, "complement data"),SCHEDULER(6, "start a new process from scheduler"),REPEAT_RUNNING(7, "repeat running a process"),PAUSE(8, "pause a process"),STOP(9, "stop a process"),RECOVER_WAITING_THREAD(10, "recover waiting thread"),RECOVER_SERIAL_WAIT(11, "recover serial wait"),EXECUTE_TASK(12, "start a task node in a process instance"),DYNAMIC_GENERATION(13, "dynamic generation"),;
复制代码


那为什么会这样呢?本质上是 Master 自身的容错机制造成的,其容错机制可以细分为如下几个模块:


  • 1)Master 自身的容错:如果是多 Master 同时运行,其中一个作为 Active Master 负责处理任务调度请求,其他节点作为 Standby Master。当 Active Master 出现故障时,Standby Master 将自动接管其工作,确保系统的正常运行。这是通过 ZooKeeper 实现的,ZooKeeper 负责选举 Active Master 节点,并监控节点的状态。

  • 2)状态同步:多 Master 节点之间会进行状态同步,以确保在 Active Master 宕机时,Standby Master 能够接管任务调度。

  • 3)故障恢复:当 Master 节点宕机后,其他 Master 节点会通过 ZooKeeper 的 Watcher 机制监听到这一事件,并触发故障恢复。

  • 4)正在运行任务的容错:当前 Master 节点宕机后,新 Master 会通过已下线的 Master 的地址和正在运行的工作流状态数组获取需要容错的 ProcessInstance 列表,之后将其放入 t_ds_command 表中(后续流程就是 Master 获取到并调度+Worker 执行了)。

  • 5)分布式锁:在容错过程中,Master 节点会使用 ZK 分布式锁+采用指定 command 表分配 ID 的形式来确保只有一个 Master 节点执行容错操作,避免多个 Master 节点同时接管同一个任务。

  • 6)定时容错线程:除了 ZooKeeper 的事件触发容错外,DolphinScheduler 还实现了一个定时线程 FailoverExecuteThread,用于 Master 重启后恢复自身之前的工作流实例。

  • 7)任务重试:DolphinScheduler 还支持任务失败后的重试机制,这与服务宕机容错相辅相成,确保任务的最终执行成功。


所以,此时根据原理+复现可以初步推测出,是在 Master 启动时的某一个线程进行的定时容错,接下来就进入源码来真正验证一下。

04 源码解析

在 org.apache.dolphinscheduler.server.master.MasterServer 下,启动 Master 时会有 run 入口:


/** * run master server */@PostConstructpublic void run() throws SchedulerException {    // init rpc server    this.masterRPCServer.start();    // install task plugin    this.taskPluginManager.loadPlugin();    this.masterSlotManager.start();    // self tolerant    this.masterRegistryClient.start();    this.masterRegistryClient.setRegistryStoppable(this);    this.masterSchedulerBootstrap.start();    this.eventExecuteService.start();    this.failoverExecuteThread.start();    this.schedulerApi.start();    this.taskGroupCoordinator.start();    MasterServerMetrics.registerMasterCpuUsageGauge(() -> {        SystemMetrics systemMetrics = metricsProvider.getSystemMetrics();        return systemMetrics.getTotalCpuUsedPercentage();    });    MasterServerMetrics.registerMasterMemoryAvailableGauge(() -> {        SystemMetrics systemMetrics = metricsProvider.getSystemMetrics();        return (systemMetrics.getSystemMemoryMax() - systemMetrics.getSystemMemoryUsed()) / 1024.0 / 1024 / 1024;    });    MasterServerMetrics.registerMasterMemoryUsageGauge(() -> {        SystemMetrics systemMetrics = metricsProvider.getSystemMetrics();        return systemMetrics.getJvmMemoryUsedPercentage();    });    Runtime.getRuntime().addShutdownHook(new Thread(() -> {        if (!ServerLifeCycleManager.isStopped()) {            close("MasterServer shutdownHook");        }    }));}
复制代码


通过上面的代码,可以看到 Master 启动执行了:


  • masterRPCServer.start():初始化并启动 RPC 服务器,用于节点间通信。

  • taskPluginManager.loadPlugin():加载任务插件,这些插件可以扩展 DolphinScheduler 的任务类型。

  • masterSlotManager.start():启动 Master 的 Slot 管理器,它负责管理 Master 的资源槽位,用于任务调度。

  • masterRegistryClient.start():启动 Master 的注册客户端,它负责将 Master 节点注册到分布式协调服务(如 ZooKeeper)中。

  • masterRegistryClient.setRegistryStoppable(this):设置注册客户端的可停止对象,以便在 Master 停止时能够进行清理工作。

  • masterSchedulerBootstrap.start():启动 Master 的调度引导服务,它负责初始化调度相关的服务。

  • eventExecuteService.start():启动事件执行服务,它负责处理工作流中的事件,如任务状态变化。

  • failoverExecuteThread.start():启动故障恢复执行线程,它负责在 Master 宕机后恢复任务执行。

  • schedulerApi.start():启动调度 API 服务,提供调度相关的接口供外部调用。

  • taskGroupCoordinator.start():启动任务组协调器,它负责协调任务组内的任务执行。


经过源码探查,发现最关键的 failoverExecuteThread 不是重新执行未调度的周期任务,而是容错未执行完的任务。并且其他源码中也没有关于恢复周期任务调度的内容。


那现在需要换一个思路,就是从下往上走:


  1. 首先发现重启恢复后,Web 页面上的“运行类型”是“调度执行”,而数据库的“command_type”是“6”,那就意味着必须有一个服务会有往数据库里面去插入 command_type 为 6 的方法。并且其会去获取 t_ds_schedules 表中的任务定时调度实例。

  2. 根据源码,排查到 dolphinscheduler-dao 项目下会存放所有的数据库操作 DAO,遂可以找到 ScheduleMapper 类,此类是和 t_ds_schedules 相关的 DAO 类;之后根据 t_ds_command 反查,找到了 CommandServiceImpl 类中的 createCommand 方法;再根据两者反查+command_type 为 6,找到了 ProcessScheduleTask 类中的 executeInternal 方法。

  3. ProcessScheduleTask 类中的 executeInternal 方法,同时满足:获取了调度任务、插入 command 数据、类型为 6 这三个条件。

  4. 查看 ProcessScheduleTask 的 executeInternal 源码,前半部分是从 Quartz 上下文中获取到预定义的调度时间和调度实际运行时间,下半部分是校验这个调度 Cron 是否存在和上线。

  5. 在 executeInternal 中,最关键的其实就是 scheduledFireTime 和 fireTime。


找到这里的话,我们再结合 DolphinScheduler+Quartz 总结一下调度的原理:


  • Web 页面设置调度,其会通过 SchedulerController 中的 createSchedule()来创建调度,并往 t_ds_schedules 中插入一条数据;

  • Web 页面设置调度上线,其会通过 QuartzScheduler 中的 insertOrUpdateScheduleTask()向 Quartz 中创建 Trigger 触发器,并往 QRTZ_CRON_TRIGGERS 中插入一条数据;

  • 之后定期调用 ProcessScheduleTask 中的 executeInternal()来往 t_ds_command 中插入数据;

  • 之后就是 Master-Worker 的执行流程了;


了解了大致的调度流程后,结合源码中的 scheduledFireTime 和 fireTime,就可以推断出调度时间不是由 DolphinScheduler 设置的,而是由 Quartz 设置的。


那就继续查阅 Quartz 相关的资料,发现在 Quartz 中有一个 misfire 机制:周期性任务 A 需要在某个规定的时间执行,但是由于某种原因导致任务 A 未执行,称为 MisFire。


而 Quartz 判断一个任务是 MisFire,提供了一个配置项:org.quartz.jobStore.misfireThreshold,默认是 60000ms(即 60 秒)。


misfire 产生需要有 2 个前置条件:


  • 一个是 job 到达触发时间时没有被执行;

  • 二是被执行的延迟时间超过了 Quartz 配置的 misfireThreshold 阀值;


如果延迟执行的时间小于阀值,则 Quartz 不认为发生了 misfire,立即执行 job;如果延迟执行的时间大于或者等于阀值,则被判断为 misfire,然后会按照指定的策略来执行。


那 misfire 产生的原因一般如下:


  • 当 job 达到触发时间时,所有线程都被其他 job 占用,没有可用线程。;

  • 在 job 需要触发的时间点,scheduler 停止了(可能是意外停止的);【——当前的问题属于这种类型】

  • job 使用了 @DisallowConcurrentExecution 注解,job 不能并发执行,当达到下一个 job 执行点的时候,上一个任务还没有完成;

  • job 指定了过去的开始执行时间,例如当前时间是 8 点 00 分 00 秒,指定开始时间为 7 点 00 分 00 秒;


而判定了任务是 MisFire 后,会有一个补偿机制,补偿机制只有在任务确认为 MisFire 状态后,才会被执行。补偿机制配置在 Quartz 源码的 Trigger 中:


public interface Trigger extends Serializable, Cloneable, Comparable<Trigger> {    long serialVersionUID = -3904243490805975570L;    int MISFIRE_INSTRUCTION_SMART_POLICY = 0;    int MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY = -1;    int DEFAULT_PRIORITY = 5;    ......
复制代码


但是这个补偿机制需要根据 Trigger 来判定,如下是不同的 Trigger:


在 DolphinScheduler 中,各种类型的 Trigg 都会涉及到:


Trigger 的类型:


  • SimpleTrigger 是一个简单的触发器,用于执行重复任务。它可以指定一个起始时间,然后按照固定的间隔时间重复执行任务,直到达到指定的重复次数。SimpleTrigger 的属性包括重复间隔(repeatInterval)和重复次数(repeatCount),实际执行次数是 repeatCount + 1,因为在开始时间(startTime)时会执行一次。

  • CronTrigger:CronTrigger 使用 Cron 表达式来定义复杂的调度计划。Cron 表达式由 6 或 7 个空格分隔的时间字段组成,分别表示秒、分、小时、一个月中的日期、月份、一周中的日期和可选的年份。CronTrigger 允许设定非常复杂的触发时间表,基本上覆盖了其他触发器的绝大部分能力。

  • CalendarIntervalTrigger:CalendarIntervalTrigger 指定从某一个时间开始,以一定的时间间隔执行的任务。不同于 SimpleTrigger 只支持毫秒单位的时间间隔,CalendarIntervalTrigger 支持的间隔单位有秒、分钟、小时、天、月、年。它适合的任务类似于每周执行一次。

  • DailyTimeIntervalTrigger:DailyTimeIntervalTrigger 指定每天的某个时间段内,以一定的时间间隔执行任务。并且它可以支持指定星期。它适合的任务类似于每天 9:00 到 18:00,每隔 70 秒执行一次,并且只要周一至周五执行。

  • ......


所以,因为不同的 Trigger 类型其参数是不一样的,所以当 Trigger 触发 Misfire 机制时,根据 Trigger 的不同,策略也会不同:


/** 公共的Misfire机制,在Trigger类中 **/ // 这是一个智能策略,Quartz会根据Trigger的类型自动选择一个合适的misfire策略。对于CronTrigger,默认使用MISFIRE_INSTRUCTION_FIRE_ONCE_NOW。int MISFIRE_INSTRUCTION_SMART_POLICY = 0;// 这个策略会将所有错过的触发事件,立即执行所有补偿动作。即使定时任务执行的时间已经结束,它也会把所有应该执行的任务一次性全部执行完。int MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY = -1;/** SimpleTrigger的Misfire机制,在SimpleTrigger类中 **/ // 如果触发器错过了预定的触发时间,这个策略会立即执行一次任务,然后按照原计划继续执行后续的任务。int MISFIRE_INSTRUCTION_FIRE_NOW = 1;// 这个策略会将触发器的开始时间设置为当前时间,并立即执行错过的任务,包括已经错过的重复次数。int MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT = 2;// 类似于MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT,但是会忽略已经错过的触发次数,只执行剩余的重复次数。int MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_REMAINING_REPEAT_COUNT = 3;// 这个策略会忽略已经错过的触发次数,并在下一个预定的触发时间执行任务,执行剩余的重复次数。int MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT = 4;// 类似于MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT,但是会包括所有错过的重复次数。int MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_EXISTING_COUNT = 5;/** CronTrigger的Misfire机制,在CronTrigger类中**/// 如果触发器错过了预定的触发时间,这个策略会立即执行一次任务,然后按照原计划继续执行后续的任务。int MISFIRE_INSTRUCTION_FIRE_ONCE_NOW = 1;// 对于CronTrigger,这个策略会忽略所有错过的触发事件,直接等待下一次预定的触发时间。int MISFIRE_INSTRUCTION_DO_NOTHING = 2;......
复制代码


在 QuartzScheduler 的 insertOrUpdateScheduleTask()中,用的只有 CronTrigger,其源码如下:


CronTrigger cronTrigger = newTrigger()        .withIdentity(triggerKey)        .startAt(startDate)        .endAt(endDate)        .withSchedule(                cronSchedule(cronExpression)                        .withMisfireHandlingInstructionIgnoreMisfires()                        .inTimeZone(DateUtils.getTimezone(timezoneId)))        .forJob(jobDetail).build();// 往下走public CronScheduleBuilder withMisfireHandlingInstructionIgnoreMisfires() {    this.misfireInstruction = -1;    return this;}
复制代码


其补偿机制采用的-1 编码,也就是会将所有错过的触发事件,立即执行所有补偿动作。所以此时就可以解释,为什么 Master 重启后,会将所有的未执行的周期任务,全部执行一次!!!


这个设置根据 Trigger 的不同,也可以分别设置不同的参数:


/** CronTrigger,引用CronScheduleBuilder中的设置 **/public CronScheduleBuilder withMisfireHandlingInstructionIgnoreMisfires() {    this.misfireInstruction = -1;    return this;}public CronScheduleBuilder withMisfireHandlingInstructionDoNothing() {    this.misfireInstruction = 2;    return this;}public CronScheduleBuilder withMisfireHandlingInstructionFireAndProceed() {    this.misfireInstruction = 1;    return this;}/** SimpleTrigger,引用SimpleScheduleBuilder的设置**/public SimpleScheduleBuilder withMisfireHandlingInstructionIgnoreMisfires() {    this.misfireInstruction = -1;    return this;}public SimpleScheduleBuilder withMisfireHandlingInstructionFireNow() {    this.misfireInstruction = 1;    return this;}public SimpleScheduleBuilder withMisfireHandlingInstructionNextWithExistingCount() {    this.misfireInstruction = 5;    return this;}public SimpleScheduleBuilder withMisfireHandlingInstructionNextWithRemainingCount() {    this.misfireInstruction = 4;    return this;}public SimpleScheduleBuilder withMisfireHandlingInstructionNowWithExistingCount() {    this.misfireInstruction = 2;    return this;}public SimpleScheduleBuilder withMisfireHandlingInstructionNowWithRemainingCount() {    this.misfireInstruction = 3;    return this;}
复制代码

05 解决方案

  • 将任务设置为“串行等待”。——可行,但无法发挥出大数据集群的并行化优势。并且有一个致命的缺陷,就是串行等待的任务无法在页面数手动停止,需要去到 t_ds_process_instance 中更改状态或删除数据。


  • Master HA:单机 Master 时,对 Master 设置守护,宕机后自动拉起(但也有无法拉起的时候);多机 Master 时,部署多台 Master,使其可以实现 HA。

  • DolphinScheduler 监控告警:持续监控 DolphinScheduler 运行状态,当有角色宕机后,及时发出告警信息(会有半夜宕机而运维人员没有及时发现告警信息的情况)。

  • 设置 DolphinScheduler 的 CPU 和内存使用阈值:在配置文件中,默认的 CPU 和内存阈值是 70%,以为着当服务器的 CPU 和内存占用达到了 70%后,DolphinScheduler 就不会在这台服务器上调度任务了。这种方式的好处是可以保证服务器资源不被打满,弊端是如果 Master 容错的旧任务打满了资源,那就会影响 DolphinScheduler 正常状态下的新任务了。并且有的任务是非常关键的任务,必须要跑成功的。

  • 设置 DolphinScheduler 的任务数:在配置文件中,DolphinScheduler 默认的任务数是单 Worker100 个,单 Master 是 1000 个。而在现网中,无法对任务数去做到精细的控制,并且 DolphinScheduler 也无法做到自动调配。

  • 在宕机后重新启动前删除 t_ds_command 表中的数据:经过验证,Master 在宕机后是不会往 t_ds_command 中写数据了。其会在重启启动后,将数据写到 t_ds_command 后执行,但其中的时间大概就 1~2 秒钟,手工无法去执行删除。

  • 修改 t_ds_process_instance 中的数据:根据时间周期,修改 t_ds_process_instance 中所有这个范围内的工作流的状态,人工使其结束(但如果 DolphinScheduler 和元数据库在一台服务器上,容易 DolphinScheduler 启动后里面把服务器资源打满,造成无法操作元数据库了)。


上面的解决方案主要是分为:


  • 避免或减少 Master 的宕机;

  • 在 Master 宕机后,不要运行 MisFire 的任务;


首先是“避免或减少 Master 宕机”,这在生产环境中是很难做到的,计算机程序的假设就是 100%会在某一个时刻产生某些问题,所以才有了各种微服务架构、高可用 HA、多活、容灾等等机制。


其次是“不要运行 MisFire 的任务”,依照前面的解决方案,没有一个方案能解决这个问题。所以,根据之前的源码解析,需要考虑采用源码修改+重新编译打包的方式进行解决。

06 修改源码

将关键源码修改为:


            CronTrigger cronTrigger = newTrigger()                    .withIdentity(triggerKey)                    .startAt(startDate)                    .endAt(endDate)                    .withSchedule(                            cronSchedule(cronExpression)                                    .withMisfireHandlingInstructionDoNothing()//                                    .withMisfireHandlingInstructionIgnoreMisfires()                                    .inTimeZone(DateUtils.getTimezone(timezoneId)))                    .forJob(jobDetail).build();
复制代码

07 开发环境验证

使用 Java8 进行。


更改 Master、worker、API 下的 application.yaml 中的 MySQL 链接信息:


spring:  config:    activate:      on-profile: mysql  datasource:    driver-class-name: com.mysql.cj.jdbc.Driver    url: jdbc:mysql://IP地址:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8&useSSL=false    username: 账号    password: 密码  quartz:    properties:      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
复制代码


更改 Master、Worker、API 下的 Zookeeper 信息:


registry:  type: zookeeper  zookeeper:    namespace: dolphinscheduler_dev    connect-string: IP地址:2181    retry-policy:      base-sleep-time: 60ms      max-sleep: 300ms      max-retries: 5    session-timeout: 30s    connection-timeout: 9s    block-until-connected: 600ms    digest: ~
复制代码


更改 bom 下面的 pom:


            <dependency>                <groupId>mysql</groupId>                <artifactId>mysql-connector-java</artifactId>                <version>${mysql-connector.version}</version><!--                <scope>test</scope>-->            </dependency>
复制代码


更改 api、master、worker 下的 logback-spring.xml,开启运行日志:


    <root level="INFO"><!--        <if condition="${DOCKER:-false}">--><!--            <then>--><!--                <appender-ref ref="STDOUT"/>--><!--            </then>--><!--        </if>-->        <appender-ref ref="STDOUT"/>        <appender-ref ref="APILOGFILE"/>    </root>        <root level="INFO"><!--        <if condition="${DOCKER:-false}">--><!--            <then>--><!--                <appender-ref ref="STDOUT"/>--><!--            </then>--><!--        </if>-->        <appender-ref ref="STDOUT"/>        <appender-ref ref="TASKLOGFILE"/>        <appender-ref ref="MASTERLOGFILE"/>    </root>    <root level="INFO"><!--        <if condition="${DOCKER:-false}">--><!--            <then>--><!--                <appender-ref ref="STDOUT"/>--><!--            </then>--><!--        </if>-->        <appender-ref ref="STDOUT"/>        <appender-ref ref="TASKLOGFILE"/>        <appender-ref ref="WORKERLOGFILE"/>    </root>
复制代码


启动 Master、Worker、Api:


  • Master VM Options:-Dlogging.config=classpath:logback-spring.xml -Ddruid.mysql.usePingMethod=false -Dspring.profiles.active=mysql

  • Worker VM Options:-Dlogging.config=classpath:logback-spring.xml -Ddruid.mysql.usePingMethod=false -Dspring.profiles.active=mysql

  • Api VM Options:-Dlogging.config=classpath:logback-spring.xml -Dspring.profiles.active=api,mysql



如果报错:


Error running 'ApiApplicationServer'Error running ApiApplicationServer.Command line is too long. Shorten the command line via JAR manifest or via a classpath file and rerun.则加入:


如果还是报错缺少 MySQL 的 JDBC 驱动包,则在 Master、Woker、API 的 Pom 下面添加:


<dependency>    <groupId>mysql</groupId>    <artifactId>mysql-connector-java</artifactId>    <version>8.0.33</version></dependency>
复制代码

08 整体编译打包

需要注意,此时打包的项目,需要只是经过了《修改源码》环境的,不是进行了《开发环境验证》环节的!!!


使用 Java8 进行。


在项目根目录下执行命令,打包时间较长:


mvn spotless:apply clean package -Dmaven.test.skip=true -Prelease


打包后的二进制文件在 dolphinscheduler-dist/target 下 bin.tar.gz 后缀文件。


之后就可以尝试重新部署,验证是否解决上面的问题了。

09 只编译单个模块

去到 dolphinscheduler-scheduler-quartz 根目录下,执行:


mvn spotless:apply clean package -Dmaven.test.skip=true -Prelease


打包后的文件在 dolphinscheduler-scheduler-quartz/target 目录下:


将其在服务器上进行替换:


su dolphinscheduler -
mv /opt/module/dolphinscheduler-3.2.1/master-server/libs/dolphinscheduler-scheduler-quartz-3.2.1.jar /opt/module/dolphinscheduler-3.2.1/master-server/libs/dolphinscheduler-scheduler-quartz-3.2.1.jar.bakmv /opt/module/dolphinscheduler-3.2.1/api-server/libs/dolphinscheduler-scheduler-quartz-3.2.1.jar /opt/module/dolphinscheduler-3.2.1/api-server/libs/dolphinscheduler-scheduler-quartz-3.2.1.jar.bak
cp dolphinscheduler-scheduler-quartz-3.2.1.jar /opt/module/dolphinscheduler-3.2.1/master-server/libs/dolphinscheduler-scheduler-quartz-3.2.1.jarcp dolphinscheduler-scheduler-quartz-3.2.1.jar /opt/module/dolphinscheduler-3.2.1/api-server/libs/dolphinscheduler-scheduler-quartz-3.2.1.jar
chown -R dolphinscheduler:dolphinscheduler /opt/module/dolphinscheduler-3.2.1/
复制代码


之后就可以尝试重启 DolphinScheduler,验证是否解决上面的问题了。

10 问题解决

再次进行问题复现,发现问题已经被解决了:



至此,本次问题排查及修复完成。

用户头像

白鲸开源

关注

一家开源原生的DataOps商业公司。 2022-03-18 加入

致力于打造下一代开源原生的DataOps 平台,助力企业在大数据和云时代,智能化地完成多数据源、多云及信创环境的数据集成、调度开发和治理,以提高企业解决数据问题的效率,提升企业分析洞察能力和决策能力。

评论

发布
暂无评论
DolphinScheduler自身容错导致的服务器持续崩溃重大问题的排查与解决_大数据_白鲸开源_InfoQ写作社区