写点什么

xxj-job 服务端架构流程

作者:IT巅峰技术
  • 2022-11-29
    上海
  • 本文字数:7094 字

    阅读完需:约 23 分钟

前  言

本篇我们主要讲一下《xxl-job 的调度流程》,在讲调度流程前,我们先概述一下:客户端接入流程、服务端配置流程和路由策略参数详解。

一、客户端接入流程

1 添加 Maven 依赖

<dependency>  <groupId>com.xuxueli</groupId>  <artifactId>xxl-job-core</artifactId>  <version>${选择合适的版本}</version></dependency>
复制代码

2 添加 xxl-job 的配置

在 application.yml 中添加 xxl-job 的配置


基础参数:


xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-adminxxl.job.accessToken=default_tokenxxl.job.executor.appname=xxl-job-executor-samplexxl.job.executor.address=xxl.job.executor.ip=xxl.job.executor.port=9999xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandlerxxl.job.executor.logretentiondays=30
复制代码


拓展参数,非必填


xxl.job.i18n=zh_CNxxl.job.triggerpool.fast.max=200xxl.job.triggerpool.slow.max=100
复制代码

3 代码创建执行函数

  1. 任务开发:在 Spring Bean 实例中,开发 Job 方法;

  2. 注解配置:为 Job 方法添加注解 "@XxlJob(value="自定义 jobhandler 名称", init = "JobHandler 初始化方法", destroy = "JobHandler 销毁方法")",注解 value 值对应的是调度中心新建任务的 JobHandler 属性的值。

  3. 执行日志:需要通过 "XxlJobHelper.log" 打印执行日志;

  4. 任务结果:默认任务结果为 "成功" 状态,不需要主动设置;如有诉求,比如设置任务结果为失败,可以通过:

  5. "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果;


@XxlJob(“demoJobHandler”)public void demoJobHandler() throws Exception {    XxlJobHelper.log(“XXL-JOB, Hello World.”);    for (int i = 0; i < 5; i++) {        XxlJobHelper.log(“beat at:” + i);        TimeUnit.SECONDS.sleep(2);    }    // default success}
复制代码


4 客户端配置参数说明:


二、服务端配置流程

1 执行器管理


  • AppName:执行组 Name,Name 相同的执行器视为同一个执行组

  • 名称:执行组中文名。

  • 注册方式:


  1. 自动注册:IP 地址由执行器上报,通常这样使用;

  2. 手动注册:手动输入执行器地址 IP,不建议使用。

2 任务管理


基础配置:


  • 执行器:任务的绑定的执行器,任务触发调度时将会自动发现注册成功的执行器, 实现任务自动发现功能; 另一方面也可以方便的进行任务分组。每个任务必须绑定一个执行器, 可在 "执行器管理" 进行设置;

  • 任务描述:任务的描述信息,便于任务管理;

  • 负责人:任务的负责人;

  • 报警邮件:任务调度失败时邮件通知的邮箱地址,支持配置多邮箱地址,配置多个邮箱地址时用逗号分隔。


触发配置:


  • CRON:触发任务执行的 Cron 表达式;

  • 固定速度:固件速度的时间间隔,单位为秒;

  • 固定延迟:固件延迟的时间间隔,单位为秒。


任务配置:


  • 运行模式:


(1)BEAN模式:任务以JobHandler方式维护在执行器端;需要结合 "JobHandler" 属性匹配执行器中任务;(2)GLUE模式(Java):任务以源码方式维护在调度中心;该模式的任务实际上是一段继承自IJobHandler的Java类代码并 "groovy" 源码方式维护,它在执行器项目中运行,可使用@Resource/@Autowire注入执行器里中的其他服务;(3)GLUE模式(Shell):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "shell" 脚本;(4)GLUE模式(Python):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "python" 脚本;(5)GLUE模式(PHP):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "php" 脚本;(6)GLUE模式(NodeJS):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "nodejs" 脚本;(7)GLUE模式(PowerShell):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "PowerShell" 脚本;
复制代码


  • JobHandler:运行模式为 "BEAN 模式" 时生效,对应执行器中新开发的 JobHandler 类“@JobHandler”注解自定义的 value 值;

  • 执行参数:任务执行所需的参数;


高级配置


  • 路由策略:当执行器集群部署时,提供丰富的路由策略,包括;


(1)FIRST(第一个):固定选择第一个机器;(2)LAST(最后一个):固定选择最后一个机器;(3)ROUND(轮询):;(4)RANDOM(随机):随机选择在线的机器;(5)CONSISTENT\_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。(6)LEAST\_FREQUENTLY\_USED(最不经常使用):使用频率最低的机器优先被选举;(7)LEAST\_RECENTLY\_USED(最近最久未使用):最久未使用的机器优先被选举;(8)FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;(9)BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;(10)SHARDING\_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;
复制代码


  • 子任务:每个任务都拥有一个唯一的任务 ID(任务 ID 可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发子任务 ID 所对应的任务的一次主动调度。

  • 调度过期策略:


(1)忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;(2)立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;
复制代码


  • 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;


(1)单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;(2)丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;(3)覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
复制代码


  • 任务超时时间:支持自定义任务超时时间,任务运行超时将会主动中断任务;

  • 失败重试次数;支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;

三、路由策略参数详解




上面我们讲解了客户端接入流程、服务端配置流程和路由策略参数详解,接下来我们讲一下《xxj-job 服务端架构流程》

四、xxl-job 的调度流程

任务调度器和执行器使用 http 协议通信,各自有轮询线程处理不同业务。


五、xxl-job 的调度中心详解

1 XXL-JOB 的启动和销毁逻辑:

如代码可见,xxl-job 调度中心的启动和销毁,核心是处理几个线程池的创建和销毁。对每一个业务线程池,后续有详细讲解。


public class XxlJobScheduler  {    private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);    public void init() throws Exception {        // init i18n        initI18n();        // admin trigger pool start        JobTriggerPoolHelper.toStart();        // admin registry monitor run        JobRegistryHelper.getInstance().start();        // admin fail-monitor run        JobFailMonitorHelper.getInstance().start();        // admin lose-monitor run ( depend on JobTriggerPoolHelper )        JobCompleteHelper.getInstance().start();        // admin log report start        JobLogReportHelper.getInstance().start();        // start-schedule  ( depend on JobTriggerPoolHelper )        JobScheduleHelper.getInstance().start();        logger.info(">>>>>>>>> init xxl-job admin success.");    }    public void destroy() throws Exception {        // stop-schedule        JobScheduleHelper.getInstance().toStop();        // admin log report stop        JobLogReportHelper.getInstance().toStop();        // admin lose-monitor stop        JobCompleteHelper.getInstance().toStop();        // admin fail-monitor stop        JobFailMonitorHelper.getInstance().toStop();        // admin registry stop        JobRegistryHelper.getInstance().toStop();        // admin trigger pool stop        JobTriggerPoolHelper.toStop();    }}
复制代码

2 任务触发线程池

任务触发线程池:JobTriggerPoolHelper.toStart();


启动两个执行任务的线程池,通常任务在 fastTriggerPool,统计一分钟内超时 10 次的任务,对超时任务再执行放进 slowTriggerPool。


// job-timeout 10 times in 1 min


public class JobTriggerPoolHelper {    private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class);    // ---------------------- trigger pool ---------------------    // fast/slow thread pool    private ThreadPoolExecutor fastTriggerPool = null;    private ThreadPoolExecutor slowTriggerPool = null;    public void start(){        fastTriggerPool = new ThreadPoolExecutor(                10,                XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),                60L,                TimeUnit.SECONDS,                new LinkedBlockingQueue<Runnable>(1000),                new ThreadFactory() {                    @Override                    public Thread newThread(Runnable r) {                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());                    }                });        slowTriggerPool = new ThreadPoolExecutor(                10,                XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),                60L,                TimeUnit.SECONDS,                new LinkedBlockingQueue<Runnable>(2000),                new ThreadFactory() {                    @Override                    public Thread newThread(Runnable r) {                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());                    }                });    }}
复制代码

3 执行器管理线程

执行器管理线程:


JobRegistryHelper.getInstance().start();


保证任务执行的时候拿到的执行器列表都是运行状态的


  1. 启动一个守护线程;

  2. 每隔三十秒,查询一次数据库 注册表 中自动注册的执行器;

  3. 删除超过 90 秒未再次注册(心跳)的执行器;

  4. 将执行器注册信息加载到内存 Map 中;

  5. 更新注册上了的执行器地址信息到 任务执行表 中。


public void start(){        // for monitor        registryMonitorThread = new Thread(new Runnable() {            @Override            public void run() {                while (!toStop) {                    // auto registry group                    List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);                    if (groupList!=null && !groupList.isEmpty()) {                        // remove dead address (admin/executor)                        List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());                        if (ids!=null && ids.size()>0) {                            XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);                        }                        // fresh online address (admin/executor)                        HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();                        List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());                        if (list != null) {                            for (XxlJobRegistry item: list) {                                if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {                                    String appname = item.getRegistryKey();                                    List<String> registryList = appAddressMap.get(appname);                                    if (!registryList.contains(item.getRegistryValue())) {                                        registryList.add(item.getRegistryValue());                                    }                                    appAddressMap.put(appname, registryList);                                }                            }                        }                        // fresh group address                        for (XxlJobGroup group: groupList) {                            XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);                        }                    }                    try {                        TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);                    } catch (InterruptedException e) {                        if (!toStop) {                            logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);                        }                    }                }                logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");            }        });        registryMonitorThread.setDaemon(true);        registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");        registryMonitorThread.start();    }}
复制代码

4 失败任务管理线程

失败任务管理线程:JobFailMonitorHelper.getInstance().start();


管理执行失败的任务,重试或者发送告警


  1. 每隔 10s 查询执行失败的任务;

  2. 如果设置重试次数,就进行重试操作;

  3. 如未设置重试次数,或已经重试超过重试次数,就发送告警信息(邮件或短信等)。

5 完成任务管理线程

完成任务管理线程:JobCompleteHelper.getInstance().start();


管理超时任务,或者执行器宕机的任务,做轮询补偿。


  1. 每隔 1min 查询查询状态未结束的任务;

  2. 如果距任务开始时间 10min 并且 注册执行器不在线,那么就标记任务执行结束。

6 日志管理线程

日志管理线程:JobLogReportHelper.getInstance().start();


统计任务执行成功率,删除过期日志。


  1. 每隔 1min 执行一次;

  2. 按天统计总任务数,成功和失败的个数,可通过 xxl.job.logretentiondays 配置天数 默认 30 天。

7 任务执行调度线程

任务执行调度线程:


JobScheduleHelper.getInstance().start();


scheduleThread:任务查询并计算执行时间线程


  1. 每一秒 查询数据库中执行时间在 当前时间 至 (当前时间 + 5s)区间的任务;

  2. 根据 CronHelp 类计算出下次执行时间;

  3. 将任务的下次执行时间写入数据库;

  4. 加载此次执行任务 Id 到缓存中。


使用 ConcurrentHashMap 缓存,Key 是分钟内的秒数(0-59),Value 是任务 Id 组成的数组


{    "1":[        251,        172    ],    "2":[        643,        172    ],    "39":[        273    ],    "59":[        188,        175    ]}
复制代码


ringThread: 任务执行线程


  1. 每一秒轮询一次,查找当前秒的任务 Id ;

  2. 根据任务 Id,查出任务详情,并投递到发送线程池;

  3. 发送线程池查询到执行器地址列表,根据配置的发送策略,通过 http 请求发送到执行器。


发送策略:(对应页面的路由策略)


六、附录

一致性哈希算法详解


private static int VIRTUAL_NODE_NUM = 100;public String hashJob(int jobId, List<String> addressList) {    // ------A1------A2-------A3------    // -----------J1------------------    // Address的hashCode为Key,address本身为Value;    TreeMap<Long, String> addressRing = new TreeMap<Long, String>();    for (String address: addressList) {        // 对Address进行 100 次取模,每次对Key+1,        for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {            long addressHash = hash("SHARD-" + address + "-NODE-" + i);            addressRing.put(addressHash, address);        }    }    // 对任务Id取模,以Hash树最近的Address作为选定的    long jobHash = hash(String.valueOf(jobId));    SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);    return addressRing.firstEntry().getValue();}
复制代码





程序员的核心竞争力其实还是技术,因此对技术还是要不断的学习,关注 “IT 巅峰技术” 公众号 ,该公众号内容定位:中高级开发、架构师、中层管理人员等中高端岗位服务的,除了技术交流外还有很多架构思想和实战案例。


作者是 《 消息中间件 RocketMQ 技术内幕》 一书作者,同时也是 “RocketMQ 上海社区”联合创始人,曾就职于拼多多、德邦等公司,现任上市快递公司架构负责人,主要负责开发框架的搭建、中间件相关技术的二次开发和运维管理、混合云及基础服务平台的建设。

发布于: 刚刚阅读数: 4
用户头像

一线架构师、二线开发、三线管理 2021-12-07 加入

Redis6.X、ES7.X、Kafka3.X、RocketMQ5.0、Flink1.X、ClickHouse20.X、SpringCloud、Netty5等热门技术分享;架构设计方法论与实践;作者热销新书《RocketMQ技术内幕》;

评论

发布
暂无评论
xxj-job服务端架构流程_IT巅峰技术_InfoQ写作社区