写点什么

分布式任务批处理技术选型与实践

作者:苏格拉格拉
  • 2022-11-07
    浙江
  • 本文字数:4136 字

    阅读完需:约 14 分钟

一、背景

后端系统有些时候需要处理一些任务,如刷新缓存;甚至有的系统天生就是为了处理任务,如保险续期系统,即不断扫描出窗口期到宽限期之间的保单,对其进行续期操作。

如何对这些任务进行调度、管理,如何找到一个合适的架构方案,是非常重要的。


前面举的两个例子,都是笔者自身经历的两个案例。

后者保险续期系统,是蚂蚁保险在职期间所维护的项目,其基于 schedulerX 开发的任务调度与消息分发,可以说是一套比较稳定合适的方案。

前者缓存刷新任务,是当前正在面临的一个技术难题,当前任务主要有两大问题:

1.基于 ElasticJob 的单机调度,并发处理能力有限,单次全量刷新的任务需要执行 3 个小时。

2.基于任务触发的调度机制,其并没有持久化的任务状态做支持,任务执行到哪了、成功还是失败不容易管理。


基于以上的问题,需要对当前的任务进行升级,使其满足两个功能:

1.负载均衡:任务能够进行分片,并且能够在集群下多机并行处理,可以通过扩容有效提升处理速度。

2.任务的持久化、可视化:可以方便的看到任务的执行进度、执行结果。

二、开源技术

基于以上需求,首先想到的就是寻找合适的开源技术框架。

1.当前的任务调度框架

2.原先的并非开源

既然没有现成合适的方案,功能也还算通用,那就自研好了。

三、自研前的调研

说是自研,倒不如说是山寨。为了确保山寨的质量,得先大致回顾一下原厂的功能。

1.系统流程

任务分片(map)

1.每日 0 点,调度系统会初始化当日的任务及分片,数据保存在任务表。

2.任务注册:有哪些任务,由业务系统在 xml 或注解中配置

3.分片规则:业务系统实现分片规则,由框架在初始化时回调

比如这里生成一个当天的续期任务,有 1000 个分片,这样理论上任务可以被拆分成 1000 次处理。


任务调度(process)

1.执行任务时,框架会回调业务系统实现的 process 接口,调用时会提供分片上下文信息

2.业务逻辑:业务系统感知当前分片,做当前分片该做的事情,然后返回继续执行或者执行完成。

比如这里回调到续期任务,分片是 012:那么当前任务需要去查找第 012 个分表,处理其中的续期任务。

因为当前分表需要处理的任务非常多,不可能一次处理完,所以一次只处理 100 条,处理完成后更新数据库状态,确保下次不会再被捞出来。然后给接口返回“继续执行”,直到查不到需要处理的数据。


任务完结(reduce)

当所有分片的任务都处理完成,框架会回调其中一台服务器的完结接口。

业务系统可以在里面进行实现,如发送通知,也可以不实现。

比如这里打了一个日志,然后预期每天下午 7 点前任务全部跑完。如果 7 点还没监控到这个日志,则报警,人工介入。

2.分片调度

因为在调度与分片这块比较复杂,所以需要单独说明一下。

可以看到,这里有 1000 张任务表,同时业务表也刚好有 1000 个分表。

这不是巧合,而是刻意设计的。因为业务表有 1000 张,所以在分片的规则上,就设计了 1000 个分片。

因为每台实例的代码都是一样的,每个分片的任务,不论是路由到哪台服务器执行,都会去找这个分片对应的表去处理其中的任务。当然你也可以反过来,让分片 1 的任务处理业务表 1000 的业务,让分片 1000 的任务处理业务表 1 的业务。

分片与业务的对应关系一定要一对一吗,能否是一对多、或是多对一呢?

如果是一个任务表对应多个业务表。那么在处理任务的时候,需要查找多个数据库,即使是其中一个表的任务完成了,但还是会被调用到。这样会造成资源浪费的情况。

如果是多个任务表对应一个业务表。那么多个任务都是执行同一个表,这样会造成并发问题,或是锁的竞争问题。


在有分表的任务场景下,根据分表进行任务分片是一个不错的选择,前提是分表数量要大于服务器实例数量,不然任务都分不均匀。

但是如果有的系统没有分表,那该怎么办?比如说我就是要刷新一批数据的缓存,而手上只有一张千万级别数据的大表。

这里有两个方案:

1.以要处理数据的某个维度作为分片。

比如说刷新 1000 个门店的缓存,这些门店又有 10 个商品作为笛卡尔积,一共 10000key。那就有 100 个分片。

这里就可以将每个 key 作为分片,考虑到 key 比较多,也可以将门店作为分片。

2.有的时候并不容易找到一个合适的维度,我们会想到对大维度进行取模。但是取模后查询走索引又不方便,则可以增加一个 shardingkey 字段,以次作为分片(变相实现了 db 的后缀索引)。

比如上述场景,缓存有商品、门店两个维度,商品一共 10 个,门店一共 10000 个。大的太大,占据数据库容量;小的太小,很容易分片不均。

我们期望有 100-1000 这个范围的分片策略。那可以把门店进行 100 取模,模后的数据作为 shardingkey 字段保存。

四、自研的方案

1.需求分析

运营一次操作更新了一批价格,价格由三个重要维度决定:门店、商品、渠道。

系统要做的是触发“刷新所有受影响维度的缓存”的任务。

非功能需求是:

1 这个任务能够被分片,分发到多个实例快速执行完成;

2 执行的进度、结果可以追溯

2.分片规则

综合数据量与实例数量的考虑,分片定在 1000 个。

数据库压力:以 3 年可用估算,运营一天操作 50 次,一共操作 5w 次,每次 1k 数据量,一共 5kw 数据量可以接受。

并行加速:集群多台机器,刷新缓存这种 io 密集型操作,可以开较多线程,分片数量可以尽量多。

因为业务的特点,运营单次操作涉及商品、渠道数量较少,所以不适合做为分片根据。

1w 个门店,门店 id 按 1000 取模作为分片规则即可。

3.系统流程

系统流程主要分为任务创建、任务调度、任务完结三部分。

任务创建:任务初始化

任务调度:任务分发、业务系统处理任务

任务完结:任务状态更新到已完成、回调业务完结接口

3.1.创建任务

任务创建可以由业务系统调用,因为此处的业务场景是运营操作,属于触发型动作;对于日常类型的任务,业务系统可以通过定时任务手动创建,这里就不做实现了。

任务分片:任务创建会调用业务系统提供的分片策略,用于生成任务分片。

		  public interface JobShardingStrategy {
List<TaskSplit> sharding(Task task);
}
复制代码

3.2.任务调度


任务分发:单台实例周期性扫描待处理任务,将任务分片丢到 mq 中,由消息队列进行负载均衡。

任务执行:任务实例只要监听 mq,就可以领取单个分片的任务,然后进行当前分片的任务执行。

关于任务单次处理与任务分多次处理:

有些任务比较简单,一个分片的任务仅需要数秒,这样单次处理就行了,就算成功了一半,全部重试就是了。

还有一些任务比较复杂,一个分片的任务需要处理个把小时。在这种场景下,业务系统可以维护每个分片的任务自身的进度状态,每次处理一部分,然后返回重试,等待下次调度。

简而言之:任务框架的功能是多机分片调度,复杂的任务可以在分片里面由业务系统自行管理进度。

三种返回结果类型:

  • 完成:当前分片任务完成,不再调度当前分片,状态标记为已完成。

  • 停止:业务需要停止当前分片任务的情况,不再调度当前分片,状态标记为已停止。

  • 重试:分片任务比较复杂,当前完成了一部分,需要重新调度。

  • 异常:会记录失败次数,并且重新调度,失败到一定次数后将不再调度。

3.3.任务完结

4.数据模型

CREATE TABLE `task` (	    `id` bigint unsigned NOT NULL AUTO_INCREMENT,	    `status` varchar(10) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,	    `create_time` datetime NOT NULL,	    `update_time` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP,	    `task_type` int NOT NULL,	    `task_time` datetime NOT NULL,	    `biz_data` varchar(1000) DEFAULT NULL,	    PRIMARY KEY (`id`)	  ) ENGINE=InnoDB AUTO_INCREMENT=13 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;	  CREATE TABLE `task_split` (	    `id` bigint unsigned NOT NULL AUTO_INCREMENT,	    `task_id` bigint NOT NULL,	    `status` varchar(10) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,	    `create_time` datetime NOT NULL,	    `update_time` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP,	    `task_type` int NOT NULL,	    `task_time` datetime NOT NULL,	    `exec_count` bigint NOT NULL DEFAULT '0',	    `biz_data` varchar(1000) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,	    PRIMARY KEY (`id`)	  ) ENGINE=InnoDB AUTO_INCREMENT=618 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
复制代码

5.状态机

创建任务:任务、分片状态为 INIT

任务分发:分片状态更新 INIT->EXEC

任务处理:分片状态更新 EXEC->SUC、EXEC->STOP

任务完结:任务状态 INIT->SUC、INIT->FIN

6.业务系统视角

这里略去引入 jar 包、初始化数据库的步骤。

6.1.注册任务

确定任务类型

实现`TaskTypeEnum`接口,通过枚举类维护不同的任务类型

@AllArgsConstructorpublic enum MyTaskTypeEnum implements TaskTypeEnum {  TYPE1(0), TYPE2(1);- private final int type;- @Override  public Integer getType() {      return type;  }}
复制代码

实现任务策略

实现`TaskStrategy`接口,实现以下功能:

  • 返回任务类型

  • 提供任务分片规则

  • 提供业务处理逻辑

  • 提供任务完结回调

	  @Service
public class MyTaskStrategy implements TaskStrategy {
TaskTypeEnum getTaskType(){}
JobShardingStrategy shardingStrategy(){}
TaskHandler handler(){}
void reduce(Task task){}
}
复制代码

6.2.配置消息

	  rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.topic=my_topic
rocketmq.producer.group=my_group
- rocketmq.consumer.topic=my_topic
rocketmq.consumer.group=my_group
复制代码

生产者跟消费者的 topic 需要对应

6.3.创建任务

	      @Resource	      private TaskApi taskClient;		      public void testCreateTask() {	          taskClient.submit(MyTaskTypeEnum.TYPE1, new Date(), String.valueOf(new Random().nextInt(100000)));	      }
复制代码

6.4.配置调度系统

	  	@Resource	      private TaskService taskService;	  	      @Test	      public void testSchedule() {	          taskService.dispatch();	          taskService.reduce();        	      }
复制代码

7.系统架构

8.技术架构

五、一些细节问题

任务调度分发,当前任务未处理完成的并发问题

mq 消息,发送者消费者策略

mq 消息,消费者是否可以长时间不返回,

发布于: 刚刚阅读数: 4
用户头像

还未添加个人签名 2018-08-22 加入

还未添加个人简介

评论

发布
暂无评论
分布式任务批处理技术选型与实践_苏格拉格拉_InfoQ写作社区