写点什么

【Nacos 源码之配置管理 三】TaskManager 任务管理的使用

  • 2022 年 10 月 06 日
    江西
  • 本文字数:2107 字

    阅读完需:约 7 分钟

【Nacos源码之配置管理 三】TaskManager 任务管理的使用

作者石臻臻,CSDN 博客之星 Top5Kafka Contributornacos Contributor华为云 MVP,腾讯云 TVP,滴滴 Kafka 技术专家 KnowStreaming


KnowStreaming 是滴滴开源的Kafka运维管控平台, 有兴趣一起参与参与开发的同学,但是怕自己能力不够的同学,可以联系我,当你导师带你参与开源!

1 任务管理类

因为 Nacos 中有很多地方使用了这个 TaskManager,所以我们得先了解一下这个类是干啥用的,方便后面阅读源码时候不会吃力;

先说结论:TaskManager 可以看成是一个待执行的任务集合,用于处理一定要执行成功的任务 单线程的方式处理任务,保证任务一定被成功处理; 如果执行失败了,任务会被重新放入集合中等待下一次被消费;

AbstractTask

AbstractTask 是个抽象类,所有的需要被执行的任务都继续这个类; 这个类主要提供执行任务所需要的数据和方法;例如

   /* 一个任务两次处理的间隔,单位是毫秒*/    private long taskInterval;    /*任务上次被处理的时间,用毫秒表示*/    private long lastProcessTime;/* TaskManager 判断当前是否需要处理这个Task,子类可以Override这个函数实现自己的逻辑     */    public boolean shouldProcess() {        return (System.currentTimeMillis() - this.lastProcessTime >= this.taskInterval);    }
复制代码

TaskProcessor 任务处理器

TaskProcessor 是任务处理器接口,它有个方法

boolean process(String taskType, AbstractTask task);
复制代码

用于执行对应的 AbstractTask 任务类; 不同的任务类型,可以实现自己的执行任务逻辑;

TaskManager 任务管理类

TaskManager 是个任务管理类;它里面有两个属性保存了待消费的任务 AbstractTask,和任务执行需要的 TaskProcessor;

/**待消费的任务AbstractTask**/private final ConcurrentHashMap<String, AbstractTask> tasks = new ConcurrentHashMap<String, AbstractTask>();/**任务AbstractTask对应的任务执行器TaskProcessor**/ private final ConcurrentHashMap<String, TaskProcessor> taskProcessors =new ConcurrentHashMap<String, TaskProcessor>();
复制代码

如果 taskProcessors 中没有找到对应的任务执行器,那么它里面有一个默认执行器会执行

 /**默认执行器**/ private TaskProcessor defaultTaskProcessor;
复制代码

2 使用用例

Nacos 配置中心模块很重要一个功能就是,在初始化的时候以及每隔一段时间就会去数据库中把所有数据 Dump 到磁盘中;Dump 就是一个任务类 AbstractTask; 我们上面说过 AbstractTask 就是一个信息承载对象,主要给 TaskProcessor 提供执行所需要的数据;我们看看 DumpTask;

DumpTask


DumpTask 定义了自己的一些属性; 再看看其他的例如 DumpAllTaskDumpAllBetaTask


这两个任务类只定义了 TASK_ID 既然有 DumpTask 任务类,那肯定就有对应的任务处理器类 DumpProcessor;

DumpProcessor

DumpProcessor 是 DumpTask 任务的执行器;执行器中的方法

public boolean process(String taskType, AbstractTask task)
复制代码

代码太长就不在这里分析了,它里面主要做的操作就是保存配置文件到本地磁盘中,并缓存 md5 详细可以看文章【Nacos源码之配置管理 四】DumpService如何将配置文件全部Dump到磁盘中

对应 DumpAllTaskDumpAllBetaTask 任务的任务执行器有 DumpAllProcessorDumpAllBetaProcessor

3DumpAllTask 任务触发执行的地方

上面是 DumpAllTask 的定义和 DumpAllTaskProcessor 执行器的定义;定义好了之后是怎么被触发的呢?

DumpService 初始化 Dump 配置信息

这个类就是专门 Dump 配置信息的服务类;上面提及的 DumpAll 就是在这里被调用的;我们来看下他主要方法;

    @PostConstruct    public void init() {        DumpAllProcessor dumpAllProcessor = new DumpAllProcessor(this);      /**在new这个TaskManager类的时候,专门执行任务的一个线程就已经开始启动了,这不过这个时候还没有任务Task添加进去**/       dumpAllTaskMgr = new TaskManager( "com.alibaba.nacos.server.DumpAllTaskManager");dumpAllTaskMgr.setDefaultTaskProcessor(dumpAllProcessor);      Runnable dumpAll = new Runnable() {            @Override            public void run() {dumpAllTaskMgr.addTask(DumpAllTask.TASK_ID, new DumpAllTask());            }        };            /**每10分钟执行一次DumpAll操作**/      TimerTaskService.scheduleWithFixedDelay(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE,                TimeUnit.MINUTES);    }
复制代码

DumpService 在初始化的时候回调用这个 init 方法;1.先 new 了一个 DumpAllProcessor 执行器;2.再 new 了一个 TaskManager 任务管理器;在 new 这个任务管理器的时候,就会启动一个线程专门去执行所有待执行的任务;只不过这个时候还没有添加任务;3.将这个任务管理器的默认执行器设置为 DumpAllProcessor;4.每十分钟执行一次往 TaskManager 中添加一个 DumpAllTask 的任务;一经添加就会被 TaskManager 中的线程processingThread执行 process 方法;


发布于: 刚刚阅读数: 3
用户头像

关注公众号: 石臻臻的杂货铺 获取最新文章 2019.09.06 加入

进高质量滴滴技术交流群,只交流技术不闲聊 加 szzdzhp001 进群 20w字《Kafka运维与实战宝典》PDF下载请关注公众号:石臻臻的杂货铺

评论

发布
暂无评论
【Nacos源码之配置管理 三】TaskManager 任务管理的使用_nacos_石臻臻的杂货铺_InfoQ写作社区