这是奇点云全新技术专栏「StartDT Tech Lab」的第 13 期。
在这里,我们聚焦数据技术,分享方法论与实战。一线的项目经历,丰富的实践经验,真实的总结体会…滑到文末,可以看到我们的往期内容。
本篇由奇点云资深后端开发工程师「李四」带来:
作者:李四
阅读时间:约 10 分钟
概述
为了支撑 DataNuza(奇点云的数据化消费者运营平台,戳我看详情)运营相关工作,降低业务操作风险,提升生产操作质量,我们需要建设统一的任务调度系统,来实现任务集中管理,处理进度可视、客观、可控、可追溯,形成处理信息资源的有效管控。
任务平台需要支持以下几个方面:
1. 任务统一管理,提供图形化界面对任务进行配置和调度;
2. 任务并发控制,同个任务在同一时间只能允许一个执行;
3. 任务弹性扩容,可根据繁忙情况动态增减服务器分摊压力,对大任务进行分片处理;
4. 任务依赖问题,支持配置子任务依赖,当父任务执行结束后可以触发子任务的执行;
5. 支持多类型的任务,支持 Spring Bean、Shell 等;
6. 任务节点高可用,任务节点异常或者繁忙时能够转移到其它节点执行;
7. 调度中心高可用,支持集群部署,避免出现单点故障;
8. 执行状态监控,方便查看任务执行状态,异常情况告警,支持多渠道通知。
任务发展史
定时任务随着技术发展,从单线程调度到多线程调度,从单机部署到集群部署,从独立执行发展到多任务协同执行。
第一阶段 单线程调度:
// 基于线程的等待(sleep或wait)机制定时执行,需要开发者实现调度逻辑new Thread(() -> { while (true) { try { Thread.sleep(5000L); System.out.println("定时任务触发..."); } catch (InterruptedException e) { e.printStackTrace(); } }}).start();// 一个线程(Timer)处理多个任务容易因为某个任务繁忙导致其他任务阻塞TimerTask timerTask1 = new TimerTask() { @Override public void run() { System.out.println("定时任务触发01..."); }};TimerTask timerTask2 = new TimerTask() { @Override public void run() { System.out.println("定时任务触发02..."); }};Timer timer = new Timer();timer.scheduleAtFixedRate(timerTask1, 0, 5000);timer.scheduleAtFixedRate(timerTask2, 0, 1000);
复制代码
←左滑阅读
第二阶段 线程池调度:
由于 Timer 对调度的任务只有一个处理线程,因此增加了线程池来解决单线程的问题,在使用时,我们可以根据任务数量来定义线程池大小,以便隔离各个任务。
// 调度线程池支持固定的延时和固定间隔模式,对于需要在某个时间点执行不大方便,需要计算时间间隔,转换成启动延时和固定间隔,处理起来比较麻烦。Runnable r = () -> System.out.println("定时任务触发..."); ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); scheduledExecutorService.scheduleAtFixedRate(r, 1, 1, TimeUnit.SECONDS);
复制代码
←左滑阅读
第三阶段 Spring 任务调度:
在上面阶段我们需要主动声明任务的线程池定义,但 spring 通过注解和配置的方式简化了这一步骤。
// Spring简化了任务调度,通过@Scheduled注解支持将某个Bean的方法定时执行,支持固定延时、固定间隔、cron表达式@Scheduled(cron = "0/1 * * * * *")public void springScheduled() { System.out.println("定时任务触发...");}
复制代码
←左滑阅读
第四阶段 Quartz 任务调度:
Quartz 对任务调度的领域问题进行了高度的抽象,提出了调度器、任务和触发器这 3 个核心的概念,并对这三者提供了各自的一些特性。
在任务服务集群部署下,Quartz 可以通过数据库的行锁,实现任务的调度并发控制,避免同一个任务同时执行的情况。
public static void main(String[] args) throws SchedulerException { // 1、创建Scheduler的工厂 SchedulerFactory sf = new StdSchedulerFactory(); // 2、从工厂中获取调度器实例 Scheduler scheduler = sf.getScheduler(); // 3、创建JobDetail JobDetail jb = JobBuilder.newJob(TestJob.class) .withDescription("job desc") .withIdentity("JobName", "JobGroup") .build(); // 任务运行的时间,SimpleSchedle类型触发器有效 long time = System.currentTimeMillis() + 3 * 1000L; Date statTime = new Date(time); // 4、创建Trigger Trigger t = TriggerBuilder.newTrigger() .withDescription("") .withIdentity("TriggerName", "TriggerGroup") //.withSchedule(SimpleScheduleBuilder.simpleSchedule()) .startAt(statTime) .withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?")) .build(); // 5、注册任务和定时器 scheduler.scheduleJob(jb, t); // 6、启动调度器 scheduler.start();}public class TestJob implements Job { @Override public void execute(JobExecutionContext context) { System.out.println("定时任务触发"); }}
复制代码
←左滑阅读
第五阶段 分布式任务平台:
提供一个统一的管理平台,无需再去做和调度相关的开发。业务系统只需要实现具体的任务逻辑,自动注册到任务调度平台,在上面进行相关的配置就完成了定时任务的开发。
分布式任务平台
下图是开源的任务调度平台 xxl-job 的架构图:
图源:https://www.xuxueli.com/xxl-job/
整个 xxl-job 系统,由调度中心和执行器两个角色组成。
1# 调度中心
负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。
调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块。
下面是调度中心启动过程:
com.xxl.job.admin.core.scheduler.XxlJobScheduler#initpublic void init() throws Exception { // init i18n initI18n(); // 初始化 fast/slow 任务调度线程池 JobTriggerPoolHelper.toStart(); // 启动注册监控器 JobRegistryHelper.getInstance().start(); // 启动失败日志监控(失败重试,失败邮件发送) JobFailMonitorHelper.getInstance().start(); // 启动任务完成执行日志、任务日志丢失监控 JobCompleteHelper.getInstance().start(); // 启动调度器实时日志报告 JobLogReportHelper.getInstance().start(); // 启动任务调度线程 JobScheduleHelper.getInstance().start(); logger.info(">>>>>>>>> init xxl-job admin success.");}
复制代码
←左滑阅读
2# 执行器
负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效。
执行器可以内嵌到应用服务里。例如,一个提供 RESTful API 的 Spring Boot 项目中,引入 xxl-job-core 依赖,同时也作为一个 xxl-job 执行器。
下面是执行器的启动过程
com.xxl.job.core.executor.impl.XxlJobSpringExecutor#afterSingletonsInstantiated @Override public void afterSingletonsInstantiated() { // 加载xxljob注解任务 initJobHandlerMethodRepository(applicationContext); // refresh GlueFactory GlueFactory.refreshInstance(1); // super start try { super.start(); } catch (Exception e) { throw new RuntimeException(e); }}// ---------------------- start + stop ----------------------public void start() throws Exception { // init logpath XxlJobFileAppender.initLogPath(logPath); // 初始化注册中心列表 initAdminBizList(adminAddresses, accessToken); // 启动日志文件清理线程 JobLogFileCleanThread.getInstance().start(logRetentionDays); // 执行器任务结果回调 TriggerCallbackThread.getInstance().start(); // netty http server, 接收调度中心指令(任务执行、心跳等) initEmbedServer(address, ip, port, appname, accessToken);}
复制代码
←左滑阅读
DataNuza 中的分布式任务调度基于 xxl-job,做了一些功能增加和修改,如注册动态任务、任务重试机制、在途任务的追踪等。核心流程图如下:
· 注册动态任务
在 DataNuza 运行过程中会创建很多任务,这些任务的执行时间,执行周期都是不确定的,因此需要一个 cron 表达式的转换器,对不同的时间格式进行转换;在执行器把任务注册到调度中心后,调度中心会返回 job id,执行器需要持久化该 job id,后续可以通过该标识来对任务进行区分。
· 在途任务的追踪
任务执行时长的不确定性,可以对一些可拆分逻辑的任务进行监控(区别于任务分片),根据特定的编码方式实现任务中的部分内容重试、更细粒度的任务监控日志推送。
接着看一下调度中心的调度策略:
· 调度中心高可用
调度中心支持多节点部署,基于数据库行锁,保证同时只有一个调度中心节点触发任务调度。调度中心支持水平扩容。
com.xxl.job.admin.core.thread.JobScheduleHelper#start // 数据库行锁conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();connAutoCommit = conn.getAutoCommit();conn.setAutoCommit(false);preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'schedule_lock' for update");preparedStatement.execute();
复制代码
· 路由策略
调度中心基于路由策略,路由选择一个执行器节点执行任务,xxl-job 提供了如下路由策略保证任务调度高可用:
忙碌转移策略:下发任务前,向执行器节点发起 rpc 心跳请求查询是否忙碌,如果执行器节点返回忙碌,则转移到其他执行器节点执行。(参考:com.xxl.job.admin.core.route.strategy.ExecutorRouteBusyover)
故障转移策略:下发任务前向执行器节点发起 rpc 心跳请求,查询是否在线,如果执行器节点没返回或者返回不可用,则转移到其他执行器节点执行。(参考:com.xxl.job.admin.core.route.strategy.ExecutorRouteFailover)
· 阻塞处理策略
当执行器节点存在多个相同任务 id 的任务未执行完成时,则需要基于阻塞策略对任务进行取舍:
串行策略(默认策略,任务进行排队);
丢弃旧任务策略;
丢弃新任务策略。(参考:com.xxl.job.core.biz.impl.ExecutorBizImpl#run)
评论