写点什么

🍃【Spring 专题】「开发指南」手把手教你将 @Schedule 任务调度升级为分布式调度 @DistributeSchedule

作者:洛神灬殇
  • 2022-11-10
    江苏
  • 本文字数:3115 字

    阅读完需:约 10 分钟

🍃【Spring专题】「开发指南」手把手教你将@Schedule任务调度升级为分布式调度@DistributeSchedule

背景介绍

很多小伙伴们都跟我留言说过一个类似的问题,就是针对于任务调度框架而言的选取,很多公司都会采用任务调度框架的鼻祖 Quartz,那么我们来梳理以下 Java 领域的任务调度框架吧。

Java 领域的定时任务的框架

单机级别任务调度

  • TimeTask:是一个定时器类,通过该类可以为指定的定时任务进行配置。TimerTask 类是一个定时任务类,该类实现了 Runnable 接口,缺点异常未检查会中止线程。

  • ScheduledExecutorService:相对延迟或者周期作为定时任务调度,缺点没有绝对的日期或者时间

  • spring 定时框架:配置简单功能较多,如果系统使用单机的话可以优先考虑 spring 定时器


给予 Java 原生的一些任务调度框架后有很多组织机构推陈出新,出现了很多的优秀的任务调度框架,它们除了可以实现任务调度,还可以实现了分布式体系下的运行任务。例如以下:

分布式级别任务调度

  • Quartz:Java 实现的定时任务调度框架,但 Quartz 关注点在于定时任务而非数据,并没有一套根据数据处理而定制化的流程。虽然 Quartz 可以基于数据库实现作业的高可用,但缺少分布式并行调度的功能。

  • TBSchedule:阿里早期开源的分布式任务调度系统,代码较为陈旧,使用 timer 而非线程池执行任务调度。众所周知,timer 在处理异常状况时是有缺陷的。而且 TBSchedule 作业类型较为单一,只能是获取/处理数据一种模式。还有就是文档缺失比较严重,而且目前已经没有对应的人进行维护了,(处理方式主要分为“抢占式”和“协同分配式”,通过集群的节点分担大批量任务的处理,提高批量任务的处理效率)。

  • Elastic-job:当当开发的弹性分布式任务调度系统,功能丰富强大,采用 zookeeper 实现分布式协调,实现任务高可用以及分片,并且可以支持云开发,但是开发的模式有点侵入性,但也是一个不错的选择方向

  • Saturn:是唯品会自主研发的分布式的定时任务的调度平台,基于当当的 elastic-job 版本开发的,并且可以很好的部署到 docker 容器上,目前本人还没有做过

  • Xxl-job: 是大众点评员工徐雪里于 2015 年发布的分布式任务调度平台,是一个轻量级分布式任务调度框架,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。

回到主旨如何建立 Spring 的分布式框架

这时候你会说,如果我们已经有了很多以上的分布式能力的任务调度框架了,我们为什么还要再进行改造 Spring 模式的任务调度框架呢,好的,就这事一个问题,如果当我们的任务调度不需要那么大的资源以及不能够依靠以上的开源框架去处理的时候,我们只能采用单机版的场景下,那该怎么办呢?比如说,公司不需要提供额外的部署分布式调度、资源不足不想暴漏任务调度服务的单点问题等场景、甚至很多人习惯了使用任务调度了@Schedule,你让他直接改成以上的开发模式,成本是否也会很高?这该咋办?那么种种不能使用以上开源的分布式任务调度框架的时候,才有了我们今天的课题系列。

废话不多说直接上干货

先说以下我们所需要的原材料有哪些,首先 Spring 框架是必须的,之后还需要就是可以连接 Redis 的组件如:jedis、lettuce、Redission 等。本次我们采用的是 Redission 框架,如果有用其他的小伙伴可以直接替换理念就好。

分析以下 Spring 的 @Schedule 的工作原理

@EnableScheduling 这个注解

以下是源码:



可以看出 Schedule 调度框架主要启动需要依靠 SchedulingConfiguration 这个类进行解析扫描所有的组件。我们深入进去看一下。



别的我们都不看,我们就关注一个类:ScheduledAnnotationBeanPostProcessor,相信对 Spring 了解的小伙伴们应该知道了,这个是一个注解注入的扫描器类,专门处理 bean 的生命周期的,专门讲 @Scheduled 的方法对应的类进行动态代理植入到对应的容器内,并且分配对应的任务调度触发器机制,以下就是 ScheduledAnnotationBeanPostProcessor 的处理类的源码。



再次我们重点关注代码,后置处理的类拦截机制,他会扫描出所有相关的 @Schedule 对应的类和方法:



然后进行循环调用 processScheduled 的方法:



我们再来看一下 processScheduled 方法里面最直接封装到对应的 schedule 调用机制的方法片段。

cron 模式
delay 模式
fix 模式


以上类型分为了三种模式:Cron 模式、fix 模式、delay 模式等,相信了解任务调度的小伙伴们并不陌生吧!

最重要的改造点来了

我们如何通过以上分析得出的结论讲我们的 @Schedule 改造为分布式模式的呢,关键点就在这里,下图标黄的位置 createRunnable 方法,这也是 Spring 提供给我们唯一可以改造的地方。



createRunnable



进入 createRunnable 方法之后,我们需要看一下最后创建出的对象是什么?就是这个 ScheduledMethodRunnable,进入这里的 ScheduledMethodRunnable 的类后可以看到如下



改造位置



这里就是真正要执行的代码,我们只需要再这里加入分布式锁,进行拦截,保证多个任务调度,同时只会有一个方法进行执行是否就可以保证了呢。但是如果你说需要进行分片处理,也是可以通过数据条件进行分隔,但是那又是另外一回事了。所以是不是很好就可以解决了?那我们来实现一下啊,以下就是小编完成的分布式改造后的成品。

我的分布式任务调度

定义属于我的分布式调度注解

我才用了加入前缀 Distribute 用来区分和原来的任务调度。

启动分布式任务调度机制
@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Import(DistributeSchedulingConfiguration.class)@Documentedpublic @interface EnableDistributeScheduling {}
复制代码
标识任务调度机制(1)

源码直接拷贝过来改一下改一下名称就好


@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})@Retention(RetentionPolicy.RUNTIME)@Documentedpublic @interface DistributeScheduleds {    DistributeScheduled[] value();}
复制代码
标识任务调度机制(2)

源码直接拷贝过来改一下改一下名称就好


对应的 EnableDistributeScheduling 的配置类服务
@Configuration@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public class DistributeSchedulingConfiguration {
@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public DistributeScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() { return new DistributeScheduledAnnotationBeanPostProcessor(); }}
复制代码
定义 DistributeSchedulingConfiguration 中的 DistributeScheduledAnnotationBeanPostProcessor 处理器

只需要找原来的样子调整成为 return new DistributeScheduledMethodRunnable(target, invocableMethod,redisDistributionLock);即可。


 /**     * Create a {@link Runnable} for the given bean instance,     * calling the specified scheduled method.     * <p>The default implementation creates a {@link ScheduledMethodRunnable}.     * @param target the target bean instance     * @param method the scheduled method to call     * @since 5.1     * @see ScheduledMethodRunnable#ScheduledMethodRunnable(Object, Method)     */    protected Runnable createRunnable(Object target, Method method) {        Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @DistributeScheduled");        Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());        RedisDistributionLock redisDistributionLock = applicationContext.getBean(RedisDistributionLock.class);        return new DistributeScheduledMethodRunnable(target, invocableMethod,redisDistributionLock);    }
复制代码
定义我的 DistributeScheduledMethodRunnable 的 runnable 对象

添加分布式锁组件



改造 run 方法



亲测有效!

发布于: 刚刚阅读数: 2
用户头像

洛神灬殇

关注

🏆InfoQ写作平台-签约作者🏆 2020-03-25 加入

【个人简介】酷爱计算机科学、醉心编程技术、喜爱健身运动、热衷悬疑推理的“极客达人” 【技术格言】任何足够先进的技术都与魔法无异 【技术范畴】Java领域、Spring生态、MySQL专项、微服务/分布式体系和算法设计等

评论

发布
暂无评论
🍃【Spring专题】「开发指南」手把手教你将@Schedule任务调度升级为分布式调度@DistributeSchedule_spring_洛神灬殇_InfoQ写作社区