写点什么

[自研开源] MyData 数据集成任务的流程介绍 v0.7.1

作者:LIEN
  • 2024-03-08
    江苏
  • 本文字数:2831 字

    阅读完需:约 9 分钟

[自研开源] MyData 数据集成任务的流程介绍 v0.7.1

开源地址:gitee | github

详细介绍:MyData 基于 Web API 的数据集成平台 v0.7.0

部署文档:用 Docker 部署 MyData v0.7.1

使用手册:MyData 使用手册v0.7.1

交流 Q 群:430089673

MyData 后端结构

MyData 的后端由 3 个子服务组成,分别是管理服务任务服务业务数据服务

  • 管理服务:通过项目、数据标准、应用 API、环境的管理 配置出同步业务数据的任务;

  • 任务服务:根据配置的任务 定时调用应用 API 和数据服务 实现业务数据的传输和存储;

  • 数据服务:封装业务数据的隔离机制和读写操作;


依赖的组件:

  • MySQL:存储管理数据;

  • Redis:缓存管理数据和任务;

  • MongoDB;存储业务数据;


下图从数据流角度 展示 3 个子服务的关联:

注:开源版本采用单体 SpringBoot;


任务服务

配置任务

任务主要包括:项目环境、数据标准、应用 API、任务类型、字段映射、任务周期;

  • 项目环境:确定应用 API 的统一前缀地址;

  • 数据标准:明确集成的业务数据的数据结构;

  • 应用 API: 业务数据的传输通道;

  • 任务类型:明确数据的传输方向,提供数据表示从应用 API 读取业务员数据、消费数据表示向应用 API 发送业务数据;

  • 字段映射:配置接口响应结构中 与标准数据字段的映射关系;

  • 任务周期:定期执行任务的时间间隔,格式为 cron 表达式;


任务流程

数据集成的任务执行流程如下图:


  1. 任务服务启动时(即 MyData 系统启动),查询所有运行状态的任务;

public class JobExecutor implements ApplicationRunner {    ...        @Override    public void run(ApplicationArguments args) {        // 移除已有缓存        jobCache.removeAll();            // 查询已启动的任务        List<Task> tasks = taskService.listRunningTasks();        log.info("tasks.size() = " + tasks.size());        if (CollUtil.isNotEmpty(tasks)) {            tasks.forEach(this::startTask);        }    }
...}
复制代码


  1. 根据任务的 cron 表达式,计算任务的下次执行时间;

/** * 根据 任务的上次执行时间 和 设定间隔规则,计算任务的 下次执行时间 * * @param taskInfo 定时任务 */private void calculateNextRunTime(TaskInfo taskInfo) {    Assert.notNull(taskInfo);    Assert.notEmpty(taskInfo.getTaskPeriod());    Date date = taskInfo.getStartTime();    if (taskInfo.getFailCount() > 0) {        date = taskInfo.getNextRunTime();    }
CronExpression cronExpression = new CronExpression(taskInfo.getTaskPeriod()); Date nextRunTime = cronExpression.getNextValidTimeAfter(date); taskInfo.setNextRunTime(nextRunTime);}
复制代码


  1. 计算任务的下次执行时间 与 当前时间的时间差,以时间差作为缓存失效期 将任务存入 redis 缓存;

/** * 缓存任务 * * @param taskInfo 任务对象 * @throws IllegalArgumentException 缓存时长无效 */public void cacheJob(TaskInfo taskInfo) throws IllegalArgumentException {    // 计算任务缓存有效时长    long expire = DateUtil.between(taskInfo.getStartTime(), taskInfo.getNextRunTime(), DateUnit.SECOND);    if (expire <= 0) {        throw new IllegalArgumentException(StrUtil.format("expire <= 0, startTime = {}, nextRunTime = {}"                , DateUtil.format(taskInfo.getStartTime(), DatePattern.NORM_DATETIME_MS_PATTERN)                , DateUtil.format(taskInfo.getNextRunTime(), DatePattern.NORM_DATETIME_MS_PATTERN)));    }
redisUtil.set(CACHE_TASK + taskInfo.getId(), taskInfo); redisUtil.set(CACHE_JOB + taskInfo.getId(), taskInfo.getId(), expire); taskInfo.appendLog("任务存入redis,缓存时长 {} 秒", expire);}
复制代码


  1. 通过监听 redis 的 key 失效事件,获得待执行的任务;

public class RedisKeyExpiredListener implements MessageListener {
private final JobExecutor jobExecutor;
@Override public void onMessage(Message message, byte[] pattern) { String expiredKey = message.toString(); if (StrUtil.startWith(expiredKey, JobCache.CACHE_JOB)) { String taskId = StrUtil.subSuf(expiredKey, JobCache.CACHE_JOB.length()); jobExecutor.notify(taskId); } }}
复制代码


  1. 将任务加入待执行的线程池,随后即可执行

/** * 任务存入执行队列 * * @param taskInfo 任务 */private void executeJob(TaskInfo taskInfo) {    taskInfo.appendLog("任务存入执行队列");    Runnable runnable = new JobThread(taskInfo);    getThreadPoolExecutor().execute(runnable);}
复制代码


  1. 根据任务类型分别执行提供数据消费数据流程;

  2. 提供数据

  3. 调用应用 API,获取 json 格式数据;

  4. 根据任务中字段映射 解析 json 为业务数据 Map 集合;

  5. 调用数据服务 将业务数据存入 MongoDB;

case MdConstant.DATA_PRODUCER:    // 调用api 获取json    String json = ApiUtil.read(taskInfo);    // 将json按字段映射 解析为业务数据    jobDataService.parseData(taskInfo, json);    // 根据条件过滤数据    jobDataFilterService.doFilter(taskInfo);    // 保存业务数据    jobDataService.saveTaskData(taskInfo);    // 更新环境变量    jobVarService.saveVarValue(taskInfo, json);    break;
复制代码


  1. 消费数据

  2. 根据任务所选数据标准,查询业务数据;

  3. 再根据字段映射,将业务数据 转为指定的 json 对象集合;

  4. 调用应用 API,传输 json 数据;

case MdConstant.DATA_CONSUMER:                    List<BizDataFilter> filters = taskInfo.getDataFilters();                    if (CollUtil.isNotEmpty(filters)) {                        // 解析过滤条件值中的 自定义字符串                        parseFilterValue(filters);                        // 排除值为null的条件                        filters = filters.stream().filter(filter -> filter.getValue() != null).collect(Collectors.toList());                    }                    // 根据过滤条件 查询数据                    String dataCode = taskInfo.getDataCode();                    if (StrUtil.isNotEmpty(dataCode)) {                        List<Map> dataList = bizDataDAO.list(MdUtil.getBizDbCode(taskInfo.getTenantId(), taskInfo.getProjectId(), taskInfo.getEnvId()), dataCode, filters);                        taskInfo.setConsumeDataList(dataList);                        // 根据字段映射转换为api参数                        jobDataService.convertData(taskInfo);                    }                    // 调用api传输数据                    ApiUtil.write(taskInfo);                    break;
复制代码


  1. 保存任务执行日志;


发布于: 刚刚阅读数: 4
用户头像

LIEN

关注

还未添加个人签名 2020-04-25 加入

还未添加个人简介

评论

发布
暂无评论
[自研开源] MyData 数据集成任务的流程介绍 v0.7.1_开源_LIEN_InfoQ写作社区