写点什么

分布式任务调度的应用分享 | StartDT Tech Lab 13

用户头像
奇点云
关注
发布于: 2021 年 09 月 08 日
分布式任务调度的应用分享 | StartDT Tech Lab 13

这是奇点云全新技术专栏「StartDT Tech Lab」的第 13 期。

在这里,我们聚焦数据技术,分享方法论与实战。一线的项目经历,丰富的实践经验,真实的总结体会…滑到文末,可以看到我们的往期内容。

本篇由奇点云资深后端开发工程师「李四」带来:

作者:李四

阅读时间:约 10 分钟

概述

为了支撑 DataNuza(奇点云的数据化消费者运营平台,戳我看详情)运营相关工作,降低业务操作风险,提升生产操作质量,我们需要建设统一的任务调度系统,来实现任务集中管理,处理进度可视、客观、可控、可追溯,形成处理信息资源的有效管控。


任务平台需要支持以下几个方面:

1. 任务统一管理,提供图形化界面对任务进行配置和调度;


2. 任务并发控制,同个任务在同一时间只能允许一个执行;


3. 任务弹性扩容,可根据繁忙情况动态增减服务器分摊压力,对大任务进行分片处理;


4. 任务依赖问题,支持配置子任务依赖,当父任务执行结束后可以触发子任务的执行;


5. 支持多类型的任务,支持 Spring Bean、Shell 等;


6. 任务节点高可用,任务节点异常或者繁忙时能够转移到其它节点执行;


7. 调度中心高可用,支持集群部署,避免出现单点故障;


8. 执行状态监控,方便查看任务执行状态,异常情况告警,支持多渠道通知。

任务发展史

定时任务随着技术发展,从单线程调度到多线程调度,从单机部署到集群部署,从独立执行发展到多任务协同执行。

第一阶段 单线程调度:

// 基于线程的等待(sleep或wait)机制定时执行,需要开发者实现调度逻辑new Thread(() -> {  while (true) {    try {      Thread.sleep(5000L);      System.out.println("定时任务触发...");    } catch (InterruptedException e) {      e.printStackTrace();    }  }}).start();​​// 一个线程(Timer)处理多个任务容易因为某个任务繁忙导致其他任务阻塞TimerTask timerTask1 = new TimerTask() {  @Override  public void run() {    System.out.println("定时任务触发01...");  }};TimerTask timerTask2 = new TimerTask() {  @Override  public void run() {    System.out.println("定时任务触发02...");  }};Timer timer = new Timer();timer.scheduleAtFixedRate(timerTask1, 0, 5000);timer.scheduleAtFixedRate(timerTask2, 0, 1000);
复制代码

←左滑阅读

第二阶段 线程池调度:

由于 Timer 对调度的任务只有一个处理线程,因此增加了线程池来解决单线程的问题,在使用时,我们可以根据任务数量来定义线程池大小,以便隔离各个任务。

// 调度线程池支持固定的延时和固定间隔模式,对于需要在某个时间点执行不大方便,需要计算时间间隔,转换成启动延时和固定间隔,处理起来比较麻烦。Runnable r = () -> System.out.println("定时任务触发...");        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);        scheduledExecutorService.scheduleAtFixedRate(r, 1, 1, TimeUnit.SECONDS);
复制代码

←左滑阅读

第三阶段 Spring 任务调度:

在上面阶段我们需要主动声明任务的线程池定义,但 spring 通过注解和配置的方式简化了这一步骤。

// Spring简化了任务调度,通过@Scheduled注解支持将某个Bean的方法定时执行,支持固定延时、固定间隔、cron表达式@Scheduled(cron = "0/1 * * * * *")public void springScheduled() {  System.out.println("定时任务触发...");}
复制代码

←左滑阅读

第四阶段 Quartz 任务调度:

Quartz 对任务调度的领域问题进行了高度的抽象,提出了调度器、任务和触发器这 3 个核心的概念,并对这三者提供了各自的一些特性。

在任务服务集群部署下,Quartz 可以通过数据库的行锁,实现任务的调度并发控制,避免同一个任务同时执行的情况。

public static void main(String[] args) throws SchedulerException {  // 1、创建Scheduler的工厂  SchedulerFactory sf = new StdSchedulerFactory();  // 2、从工厂中获取调度器实例  Scheduler scheduler = sf.getScheduler();​  // 3、创建JobDetail  JobDetail jb = JobBuilder.newJob(TestJob.class)    .withDescription("job desc")    .withIdentity("JobName", "JobGroup")    .build();​  // 任务运行的时间,SimpleSchedle类型触发器有效  long time = System.currentTimeMillis() + 3 * 1000L;  Date statTime = new Date(time);​  // 4、创建Trigger  Trigger t = TriggerBuilder.newTrigger()    .withDescription("")    .withIdentity("TriggerName", "TriggerGroup")    //.withSchedule(SimpleScheduleBuilder.simpleSchedule())    .startAt(statTime)    .withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?"))    .build();​  // 5、注册任务和定时器  scheduler.scheduleJob(jb, t);​  // 6、启动调度器  scheduler.start();}​​public class TestJob implements Job {    @Override    public void execute(JobExecutionContext context) {        System.out.println("定时任务触发");    }}
复制代码

←左滑阅读

第五阶段 分布式任务平台:

提供一个统一的管理平台,无需再去做和调度相关的开发。业务系统只需要实现具体的任务逻辑,自动注册到任务调度平台,在上面进行相关的配置就完成了定时任务的开发。


分布式任务平台

下图是开源的任务调度平台 xxl-job 的架构图:

图源:https://www.xuxueli.com/xxl-job/


整个 xxl-job 系统,由调度中心和执行器两个角色组成。

1# 调度中心


负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。

调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块。

下面是调度中心启动过程:

com.xxl.job.admin.core.scheduler.XxlJobScheduler#init​public void init() throws Exception {  // init i18n  initI18n();​  // 初始化 fast/slow 任务调度线程池  JobTriggerPoolHelper.toStart();​  // 启动注册监控器  JobRegistryHelper.getInstance().start();​  // 启动失败日志监控(失败重试,失败邮件发送)  JobFailMonitorHelper.getInstance().start();​  // 启动任务完成执行日志、任务日志丢失监控  JobCompleteHelper.getInstance().start();​  // 启动调度器实时日志报告  JobLogReportHelper.getInstance().start();​  // 启动任务调度线程  JobScheduleHelper.getInstance().start();​  logger.info(">>>>>>>>> init xxl-job admin success.");}
复制代码

←左滑阅读

2# 执行器


负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效。


执行器可以内嵌到应用服务里。例如,一个提供 RESTful API 的 Spring Boot 项目中,引入 xxl-job-core 依赖,同时也作为一个 xxl-job 执行器。

下面是执行器的启动过程

com.xxl.job.core.executor.impl.XxlJobSpringExecutor#afterSingletonsInstantiated​ @Override public void afterSingletonsInstantiated() {​  // 加载xxljob注解任务  initJobHandlerMethodRepository(applicationContext);​  // refresh GlueFactory  GlueFactory.refreshInstance(1);​  // super start  try {    super.start();  } catch (Exception e) {    throw new RuntimeException(e);  }}​// ---------------------- start + stop ----------------------public void start() throws Exception {​  // init logpath  XxlJobFileAppender.initLogPath(logPath);​  // 初始化注册中心列表  initAdminBizList(adminAddresses, accessToken);​  // 启动日志文件清理线程  JobLogFileCleanThread.getInstance().start(logRetentionDays);​  // 执行器任务结果回调  TriggerCallbackThread.getInstance().start();​  // netty http server, 接收调度中心指令(任务执行、心跳等)  initEmbedServer(address, ip, port, appname, accessToken);}
复制代码

←左滑阅读

DataNuza 中的分布式任务调度基于 xxl-job,做了一些功能增加和修改,如注册动态任务、任务重试机制、在途任务的追踪等。核心流程图如下:

· 注册动态任务

在 DataNuza 运行过程中会创建很多任务,这些任务的执行时间,执行周期都是不确定的,因此需要一个 cron 表达式的转换器,对不同的时间格式进行转换;在执行器把任务注册到调度中心后,调度中心会返回 job id,执行器需要持久化该 job id,后续可以通过该标识来对任务进行区分。

· 在途任务的追踪

任务执行时长的不确定性,可以对一些可拆分逻辑的任务进行监控(区别于任务分片),根据特定的编码方式实现任务中的部分内容重试、更细粒度的任务监控日志推送。

 

接着看一下调度中心的调度策略:

· 调度中心高可用

调度中心支持多节点部署,基于数据库行锁,保证同时只有一个调度中心节点触发任务调度。调度中心支持水平扩容。


com.xxl.job.admin.core.thread.JobScheduleHelper#start  // 数据库行锁conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();connAutoCommit = conn.getAutoCommit();conn.setAutoCommit(false);​preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'schedule_lock' for update");preparedStatement.execute();
复制代码

· 路由策略

调度中心基于路由策略,路由选择一个执行器节点执行任务,xxl-job 提供了如下路由策略保证任务调度高可用:

    忙碌转移策略:下发任务前,向执行器节点发起 rpc 心跳请求查询是否忙碌,如果执行器节点返回忙碌,则转移到其他执行器节点执行。(参考:com.xxl.job.admin.core.route.strategy.ExecutorRouteBusyover)

    故障转移策略:下发任务前向执行器节点发起 rpc 心跳请求,查询是否在线,如果执行器节点没返回或者返回不可用,则转移到其他执行器节点执行。(参考:com.xxl.job.admin.core.route.strategy.ExecutorRouteFailover)

· 阻塞处理策略

当执行器节点存在多个相同任务 id 的任务未执行完成时,则需要基于阻塞策略对任务进行取舍:

    串行策略(默认策略,任务进行排队);

    丢弃旧任务策略;

    丢弃新任务策略。(参考:com.xxl.job.core.biz.impl.ExecutorBizImpl#run)

用户头像

奇点云

关注

AI驱动的数据中台创导者 2019.08.05 加入

还未添加个人简介

评论

发布
暂无评论
分布式任务调度的应用分享 | StartDT Tech Lab 13