写点什么

简易异步任务中心 & 批量导入技术处理方案

  • 2024-02-01
    北京
  • 本文字数:9525 字

    阅读完需:约 31 分钟

一、解决什么问题

一个任务中心技术实现的参考案例,可以快速部署实现且仅需关注业务个性落库逻辑实现,其他如任务状态维护、数据解析及异常包装、结果导出均由工具自动实现。

二、基本原理


图 1 请求示意图


异步任务中心共分三个模块:


1)任务初始化, 将目标导入文件上传至云存储后得到目标文件 url 按任务类型(如类目导入、商品导入等)入库任务表并返回前台提交成功,任务初始状态为"待处理";


2)任务调度,使用开源调度组件 xxlJob 开箱即用。传送门: xxlJob


3)任务 Worker 执行器核心组成:


1.任务并行分片拉取


分片广播模式下,每个 worker 按 index 取模 获取应执行的任务 id,参考 sql :


from task where status in ('PENDING','FAILURE') and errCnt <= MAX_RETRY_CNT and mod(id,#总 worker 数量) = 当前 worker index


2.根据任务类型命中执行器策略


任务类型: 即导入业务的枚举字段,如类目导入 CATE_IMPORT、商品导入 PRODUCT_IMPORT 等


业务执行器: 执行 excel 批量导入解析落库的载体,下文介绍。


策略如何命中: 业务执行器 class 类增加 @JobExecutor 注解并指明注解值为对应任务类型; 拉取任务后寻找有 @JobExecutor 修饰的类且其注解值等于任务记录任务类型即为命中目标执行器


3.执行器设计


A.抽象任务接口并定义行为 -> BaseJob<T>


accept() 接受任务,实现后置任务状态为"处理中"


parse() 解析任务, 负责解析目标文件(zip、xlsx)为 List<Bean>,并实现数据校验


run() 将业务数据 List<Bean>数据落库


export() 生成导入结果文件,上传至云存储并更新到任务记录结果列


errHandle() 异常处理,置任务状态为"失败",累计任务失败次数,触发业务报警


B.基础抽象实现类 -> BaseExecutableAbsJob implements BaseJob


accept()export()errHandle() 步骤因其业务无关性故在此抽象类中做通用默认实现;


parse() 有一定通用性,默认实现为 excel 解析(easyExcel 实现)


run() 业务相关不做默认实现,由继承方实现


C.一次性解析抽象实现 -> DisposableAbsJob extends BaseExecutableAbsJob


特征:


解析规则为一次性解析 excel 所有记录**,不适用超大 excel 解析 job**


可以在落库前获得全部业务实体信息


导出结果可以显示原始输入


D.分批解析通用实现类 -> BatchableAbsJob extends BaseExecutableAbsJob


特征:


解析规则为按 BATCH_CNT 来分段操作数据解析及入库,适用于大 excel 导入场景的使用


解析完毕前拿不到记录总数


导出结果不显示原始输入,仅显示 MAX_ERROR_CNT 数量以内的错误记录原始信息及错误信息。

三、快速使用

业务类按场景选择继承 DisposableAbsJob 或 BatchableAbsJob,


**仅需重写落库方法,其他如拉取、解析、导出结果步骤均由系统自动执行。如需特殊解析逻辑(比如解析 zip 按特定规则拼装 bean)重写 parse()**方法即可


举个栗子,现需求场景为批量类目信息导入, 则开发过程为:


步骤一 : 落库任务类型为 TaskBizTypeEnum.CATE_BATCH_PUBLISH 的记录到任务表中,并记录前台上传的 excel 导入文件 url(常规 crud 本案例不做封装,自行实现即可)


步骤二 : 定义类目 Excel 导入实体 Bean


/** * 类目导入实体 */@Data@NoArgsConstructor@AllArgsConstructor@EqualsAndHashCode(callSuper = true)public class ImportCateExcelDTO extends BaseWorkerDTO {
/** 类目级别*/ @ExcelProperty(index = 0,converter = CateLevelConverter.class,value = "类目级别") private Integer cateLevel;
/** 类目中文名*/ @ExcelProperty(index = 1 ,value = "类目中文名") private String cateName;
/** 类目排序*/ @ExcelProperty(index = 2 ,value = "类目排序") private Integer sort;
/** 上级类目id*/ @ExcelProperty(index = 3 ,value = "上级管理类目id") private Long parentCateId;
/** 状态*/ @ExcelProperty(index = 4,converter = StatusConverter.class ,value = "状态") private Integer status;
}
复制代码


步骤三 : 编写业务实现类,并自行实现 run 落库方法.


/** * 类目批量导入(一次性解析全部excel) */@Service@Slf4j@JobExecutor(taskBizType = TaskBizTypeEnum.CATE_BATCH_PUBLISH)  // 策略注解,枚举类型全局唯一。 不加该注解则任务调度找不到策略public class DisposableCateImportHandler extends DisposableAbsJob<ImportCateExcelDTO> {
@Resource private XXXXService xxxxService;
@Override public void run(TaskDTO<ImportCategoryExcelDTO> task){ try{ if(CollectionUtils.isNotEmpty(task.getTarget())){ xxxxService.save(task.getTarget()) } }catch (BaseImportException e){ errHandle(task); } }}
复制代码


至此开发部分结束,任务执行器会自动调度拉取 CATE_BATCH_PUBLISH 类型的任务 -> 解析到 List<Bean> -> 调用你的 run()方法实现落库 -> 将结果流上传到云存储并将结果链接更新到任务表中

四、源码

1. TaskDispatcher - 任务调度派发


/** * 任务调度派发 */@Component@Slf4jpublic class TaskDispatcher {
@Resource private TaskMangeService taskMangeService; @Resource private ApplicationContext applicationContext;
@SneakyThrows @XxlJob("iscWorker") public ReturnT<String> iscWorker(String param) { TaskDTO task = taskMangeService.pullTask(); if(task!=null){ BaseJob executor = getExecutor(task.getTask().getBizType()); if(null!=executor){ executor.of(task).start(); log.info("iscWorker 执行完毕:{} " , JSON.toJSONString(task)); } } return ReturnT.SUCCESS; }
//获取执行器 public BaseJob getExecutor(TaskBizTypeEnum taskBizType){ Map<String, Object> beanMap = applicationContext.getBeansWithAnnotation(JobExecutor.class); if(beanMap.isEmpty()){ return null; } log.info("TaskDispatcher.getExecutor class list:{}" , beanMap.keySet()); for (Map.Entry<String,Object> entry : beanMap.entrySet()) { try { JobExecutor ano = AnnotationUtil.getAnnotation(entry.getValue().getClass(), JobExecutor.class); if(taskBizType.equals(ano.taskBizType()) && entry.getValue() instanceof BaseJob){ log.info("TaskDispatcher.getExecutor 当前任务:{}命中执行策略job:{}" , taskBizType, entry.getValue()); return (BaseJob) entry.getValue(); } }catch (Exception e){ e.printStackTrace(); } } return null; }
}
复制代码


2. DisposableAbsJob - 一次性解析任务执行器


/** *  一次性解析任务执行器,解析规则为一次性解析所有excel记录,不适用超大excel解析job *  使用方法: 1.使用方继承DisposableAbsJob类,并根据需要重写parse方法(当前默认是按excel解析) *           2.重写run方法,将解析好的list<Bean>推入数据库 */@Component@Slf4jpublic abstract class DisposableAbsJob<T extends BaseWorkerDTO> extends BaseExecutableAbsJob<T> {
//自有个性逻辑,默认就是空逻辑}
复制代码


3. BatchableAbsJob - 分段解析任务执行器


/** *  批次解析任务执行器,解析规则为分批解析excel记录,适用超大excel解析job *  使用方法: 1.使用方继承BatchableAbsJob类,重写saveOrUpdate方法和excel2Po方法, */@Component@Slf4jpublic abstract class BatchableAbsJob<T extends BaseWorkerDTO,K> extends BaseExecutableAbsJob<T> {
/** * 批次解析逻辑 * @param task */ @Override public void parse(TaskDTO<T> task){ if(TaskCreateTypeEnum.IMPORT.equals(task.getTaskCreateType())){ log.info("BaseExecutableAbsJob.import parse {} ",task.getTaskId()); BaseBatchExcelDataListener<T,K> listener = new BaseBatchExcelDataListener<>(this); EasyExcel.read(task.getTargetInputFile().getObjectContent(), getTargetClass(), listener).sheet().doRead(); task.setErrDataList(listener.errDataList); } }
/** 批次解析结果逻辑,仅导出有问题的记录(上限100条) */ @Override public void export(TaskDTO<T> task){ if(task!=null){ log.info("BatchableAbsJob.export {}", task.getTaskId()); if(CollectionUtils.isEmpty(task.getErrDataList())){ taskMangeService.update(new TaskVO(task.getTaskId(), TaskStatusEnum.SUCCESS)); log.info("BatchableAbsJob.export 任务{}全部执行成功" , task.getTaskId()); return; } String resultName = task.getFileName() + Constant.UNDER_LINE + System.currentTimeMillis() + ".xlsx"; ByteArrayOutputStream targetOutputStream = new ByteArrayOutputStream(); try (ExcelWriter excelWriter = EasyExcel.write(targetOutputStream).build()) { if (CollectionUtils.isNotEmpty(task.getErrDataList())) { excelWriter.write(task.getErrDataList(), EasyExcel.writerSheet(0, "result").head(BatchResultDTO.class).build()); } task.setEndTime(System.currentTimeMillis()); excelWriter.finish(); try (ByteArrayInputStream inputStream = new ByteArrayInputStream(targetOutputStream.toByteArray())) { task.setResultUrl(s3Utils.upload(inputStream, FileTypeEnum.BATCH_FILE.getCode(),resultName)); taskMangeService.update(new TaskVO(task.getTaskId(), TaskStatusEnum.SUCCESS, task.getResultUrl())); } } catch (Exception e) { log.error("BaseExecutableAbsJob.export error, target:{} ", task.getTaskId(), e); throw new TaskExportException(task.getTaskId() + e.getMessage()); } finally { log.info("BaseExecutableAbsJob.export 任务「{}」执行完毕:{},文件地址:{}", task.getTaskId(), task.getOssPutMd5(), task.getResultUrl()); } } }
public List<BatchResultDTO> saveOrUpdate(Map<Integer, K> k) { return null; }
public Map<Integer,K> excel2Po(Map<Integer, T> excel) { return null; }
}
复制代码


4. BaseExecutableAbsJob - 通用抽象任务执行器


/** * 通用抽象任务执行器 */@Component@Slf4jpublic abstract class BaseExecutableAbsJob<T extends BaseWorkerDTO> implements BaseJob<T> {
@Resource public S3Utils s3Utils; @Resource public TaskMangeService taskMangeService;
public final static String RESULT_FOLDER = "xxx";

@Override public void accept(TaskDTO<T> task){ //导入类任务 if(TaskCreateTypeEnum.IMPORT.equals(task.getTask().getCreateType())){ task.setTargetInputFile(s3Utils.download(task.getTask().getReqParam())); task.setFileName(task.getTask().getName()); //导出类任务 }else if(TaskCreateTypeEnum.EXPORT.equals(task.getTask().getCreateType())){ // 方式1. 保存 前台勾选的记录id到任务入参中 // 方式2. 根据前台勾选的查询条件命中记录id,再保存到任务入参中<限制总导出记录数> String req = task.getTask().getReqParam(); if(StringUtils.isNotBlank(req)){ task.setKey(Arrays.stream(req.split(Constant.COMMA)).map(Long::valueOf).collect(Collectors.toSet())); } } task.setTaskBizTypeEnum(task.getTask().getBizType()); task.setTaskId(task.getTask().getId()); task.setStartTime(System.currentTimeMillis()); //更新任务状态 taskMangeService.update(new TaskVO(task.getTaskId(),TaskStatusEnum.PROCESSING)); }
/** * 通用解析逻辑 * @param task */ @Override public void parse(TaskDTO<T> task){ if(TaskCreateTypeEnum.IMPORT.equals(task.getTaskCreateType())){ if(task.getTargetInputFile()!=null && task.getTargetInputFile().getObjectContent()!=null){ List<T> target = EasyExcel.read(task.getTargetInputFile().getObjectContent(), getTargetClass() , new PageReadListener<T>(dataList -> {})).sheet(0).headRowNumber(1).doReadSync(); task.setTarget(target); } } }
/** * 导入通用落库逻辑/导出构建list<Bean>逻辑 * @param task */ @Override public void run(TaskDTO<T> task){ }
@Override public void export(TaskDTO<T> task){ if(task!=null){ if(CollectionUtils.isEmpty(task.getTarget())){ taskMangeService.update(new TaskVO(task.getTaskId(), TaskStatusEnum.SUCCESS)); log.info("BaseExecutableAbsJob.export 空任务{},跳过执行" , task.getTaskId()); return; } String resultName = RESULT_FOLDER + task.getTaskBizTypeEnum().getName() + Constant.UNDER_LINE + System.currentTimeMillis() + ".xlsx"; ByteArrayOutputStream targetOutputStream = new ByteArrayOutputStream(); try (ExcelWriter excelWriter = EasyExcel.write(targetOutputStream).build()) { if (CollectionUtils.isNotEmpty(task.getTarget())) { excelWriter.write(task.getTarget(), EasyExcel.writerSheet(0, "result").head(getTargetClass()).build()); } task.setEndTime(System.currentTimeMillis()); excelWriter.finish(); try (ByteArrayInputStream inputStream = new ByteArrayInputStream(targetOutputStream.toByteArray())) { task.setResultUrl(s3Utils.upload(inputStream, FileTypeEnum.BATCH_FILE.getCode(),resultName)); taskMangeService.update(new TaskVO(task.getTaskId(), TaskStatusEnum.SUCCESS, task.getResultUrl())); } } catch (Exception e) { log.error("BaseExecutableAbsJob.export error, target:{} ", task.getTaskId(), e); throw new TaskExportException(task.getTaskId() + e.getMessage()); } finally { log.info("BaseExecutableAbsJob.export 任务「{}」执行完毕:{},文件地址:{}", task.getTaskId(), task.getOssPutMd5(), task.getResultUrl()); } } }
@Override public void errHandle(TaskDTO<T> taskDTO,Exception e){ taskMangeService.errHandle(taskDTO,e.toString()); }
public Class<T> getTargetClass(){ Type res = getClass().getGenericSuperclass(); if(res instanceof ParameterizedType){ ParameterizedType pRes = (ParameterizedType) res; Type[] type = pRes.getActualTypeArguments(); if(type.length>0){ if(type[0] instanceof Class){ Type typeE = type[0]; return (Class<T>)typeE; } } } return null; }
}
复制代码


5. BaseBatchExcelDataListener - 批处理 excel 解析监听器


/** * 批处理excel解析监听器 * @param <T> Excel DTO * @param <K> 落库 PO */@Slf4jpublic class BaseBatchExcelDataListener<T extends BaseWorkerDTO,K> implements ReadListener<T> {
private static final int BATCH_COUNT = 100; private static final int MAX_ERROR_COUNT = 100;
/** 业务服务*/ private final BatchableAbsJob<T,K> batchableAbsJob;
/** 每批待处理业务数据*/ private Map<Integer,T> cachedDataList = Maps.newHashMapWithExpectedSize(BATCH_COUNT);
/** 业务处理失败数据,行号&错误报文 */ public List<BatchResultDTO> errDataList = Lists.newArrayListWithExpectedSize(MAX_ERROR_COUNT) ;
public BaseBatchExcelDataListener(BatchableAbsJob<T,K> batchableAbsJob) { this.batchableAbsJob = batchableAbsJob; }
@Override public void invoke(T data, AnalysisContext context) { cachedDataList.put(context.readRowHolder().getRowIndex(),data); if (cachedDataList.size() >= BATCH_COUNT) { saveData(); cachedDataList = Maps.newHashMapWithExpectedSize(BATCH_COUNT); } }
@Override public void doAfterAllAnalysed(AnalysisContext analysisContext) { saveData(); }
/** 持久化 */ private void saveData() { Map<Integer, K> po = batchableAbsJob.excel2Po(cachedDataList); if(po!=null && !po.isEmpty()){ List<BatchResultDTO> errRes = batchableAbsJob.saveOrUpdate(po); if(errDataList.size()<MAX_ERROR_COUNT && CollectionUtils.isNotEmpty(errRes)){ errDataList.addAll(errRes); } } }}
复制代码


6. BaseJob - 任务接口


public interface BaseJob<T> {
void accept(TaskDTO<T> task);
void parse(TaskDTO<T> task);
void run(TaskDTO<T> task);
void export(TaskDTO<T> task);
void errHandle(TaskDTO<T> task,Exception e);
default AbsExecutor<Void> of(TaskDTO<T> task){ return () -> { try { accept(task); try { parse(task); }finally { if(task.getTargetInputFile()!=null){ task.getTargetInputFile().close(); } } run(task); export(task); }catch (Exception e){ errHandle(task,e); } return null; }; }}
复制代码


7. JobExecutor- 策略注解


/** * 任务执行器 */@Target({ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)@Documented@Inheritedpublic @interface JobExecutor {    //任务业务类型    TaskBizTypeEnum taskBizType() ;}
复制代码


8. TaskMangeService- 任务执行类


/** * 任务读写服务 */@Service@Slf4jpublic class TaskMangeServiceImpl extends BaseManageSupportService<TaskVO, TaskPO> implements TaskMangeService {
private final static Integer MAX_ERR_CNT = 2; private final static Long LIMIT = 1L;

@Override public TaskPO saveOrUpdate(TaskVO taskVO) { return taskService.save(input); }

@Override public Page<TaskPO> hashList(TaskReqVO taskReqVO) { Page<TaskPO> page = Page.of(taskReqVO.getIndex(), taskReqVO.getSize()); LambdaQueryWrapper<TaskPO> wrapper = Wrappers.<TaskPO>lambdaQuery() .in(CollectionUtils.isNotEmpty(taskReqVO.getStatus()), TaskPO::getStatus, taskReqVO.getStatus()) .eq(taskReqVO.getBizType() != null, TaskPO::getBizType, taskReqVO.getBizType()) .le(taskReqVO.getErrCnt() != null, TaskPO::getErrCnt, taskReqVO.getErrCnt()) .apply("mod(id," + taskReqVO.getShardTotal() + ") =" + taskReqVO.getShardIndex() + " ") .orderByAsc(TaskPO::getCreateTime); return taskService.page(page, wrapper); }
private TaskVO getTask(String fileName,String pin, String key,TaskBizTypeEnum bizType,TaskCreateTypeEnum taskCreateType){ // build task return res; }
@Override public TaskDTO pullTask(){ TaskDTO target = null; ShardingUtil.ShardingVO shardingVo = ShardingUtil.getShardingVo(); log.info("iscWorker.pullTask workerIndex: {}, total:{}" , shardingVo.getIndex(),shardingVo.getTotal()); TaskReqVO queryDTO = new TaskReqVO(); queryDTO.setShardIndex(shardingVo.getIndex()); queryDTO.setShardTotal(shardingVo.getTotal()); queryDTO.setStatus(Lists.newArrayList(TaskStatusEnum.PENDING,TaskStatusEnum.FAILURE)); queryDTO.setErrCnt(MAX_ERR_CNT); queryDTO.setIndex(0L); queryDTO.setSize(LIMIT); Page<TaskPO> targetList = hashList(queryDTO); if(CollectionUtils.isNotEmpty(targetList.getRecords())){ log.info("PublishMkuBySkuWorker.pullTask 准备执行:{}" , JSON.toJSONString(targetList)); target = new TaskDTO<>(targetList.getRecords().get(0)); } return target; }

@Override public Boolean error(TaskVO taskInfo) { return task.update(taskInfo); }
/** 失败处理*/ @Override public void errHandle(TaskDTO task, String errMsg){ error(new TaskVO(task.getTaskId())); Profiler.businessAlarm(UmpKeyConstant.BUSINESS_KEY_TASK_WARNING,("excel批量导入-任务执行异常:"+errMsg+task.getTaskId())); log.info("TaskMangeServiceImpl.errHandle 任务Id{}执行失败:{}", task.getTaskId(),errMsg); }
}
复制代码

五、类图


图 2 类图


作者:京东工业 于洋


来源:京东云开发者社区 转载请注明来源

用户头像

拥抱技术,与开发者携手创造未来! 2018-11-20 加入

我们将持续为人工智能、大数据、云计算、物联网等相关领域的开发者,提供技术干货、行业技术内容、技术落地实践等文章内容。京东云开发者社区官方网站【https://developer.jdcloud.com/】,欢迎大家来玩

评论

发布
暂无评论
简易异步任务中心&批量导入技术处理方案_京东科技开发者_InfoQ写作社区