写点什么

将 DataX 执行结果通过钉钉上报

用户头像
白粥
关注
发布于: 2021 年 06 月 04 日
将DataX执行结果通过钉钉上报

在上文《使用 Docker 运行 DataX 定时全量备份关键数据表》中我们知道了如何运行 DataX 备份程序,接下来我们实现将任务执行结果通过钉钉自定义机器人通知。


DataX 的 Hook 机制

DataX 的代码库,在“common/src/main/java/com/alibaba/datax/common/spi/Hook.java”类中定义了开放的接口:

public interface Hook {
/** * 返回名字 * * @return */ public String getName();
/** * TODO 文档 * * @param jobConf * @param msg */ public void invoke(Configuration jobConf, Map<String, Number> msg);
}
复制代码


在“core/src/main/java/com/alibaba/datax/core/container/util/HookInvoker.java”类中使用 ServiceLoader 机制加载并 Hook 的实现类。


在“core/src/main/java/com/alibaba/datax/core/job/JobContainer.java”类中,当任务结束后调用“HookInvoker”执行了所有的 Hook:

/** * Created by jingxing on 14-8-24. * <p/> * job实例运行在jobContainer容器中,它是所有任务的master,负责初始化、拆分、调度、运行、回收、监控和汇报 * 但它并不做实际的数据同步操作 */public class JobContainer extends AbstractContainer {   /**     * jobContainer主要负责的工作全部在start()里面,包括init、prepare、split、scheduler、     * post以及destroy和statistics     */    @Override    public void start() {        LOG.info("DataX jobContainer starts job.");
boolean hasException = false; boolean isDryRun = false; try { this.startTimeStamp = System.currentTimeMillis(); isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false); if(isDryRun) { LOG.info("jobContainer starts to do preCheck ..."); this.preCheck(); } else { userConf = configuration.clone(); LOG.debug("jobContainer starts to do preHandle ..."); this.preHandle();
LOG.debug("jobContainer starts to do init ..."); this.init(); LOG.info("jobContainer starts to do prepare ..."); this.prepare(); LOG.info("jobContainer starts to do split ..."); this.totalStage = this.split(); LOG.info("jobContainer starts to do schedule ..."); this.schedule(); LOG.debug("jobContainer starts to do post ..."); this.post();
LOG.debug("jobContainer starts to do postHandle ..."); this.postHandle(); LOG.info("DataX jobId [{}] completed successfully.", this.jobId); // ⚠️ 执行hook ⚠️ this.invokeHooks(); } } // 省略 } /** * 调用外部hook */ private void invokeHooks() { Communication comm = super.getContainerCommunicator().collect(); HookInvoker invoker = new HookInvoker(CoreConstant.DATAX_HOME + "/hook", configuration, comm.getCounter()); invoker.invokeAll(); } }
复制代码

其中,“configuration”为对应 job 的 json 配置信息,“comm.getCounter()”存放相关的执行统计。 我们通过实现 Hook 接口,就可以拿到相应的信息。


实现 Hook

新建一个项目,将 DataX 的“datax-common-0.0.1-SNAPSHOT.jar”和钉钉的“taobao-sdk-java-auto_1479188381469-20210528.jar”加入到 classpath 中。


然后编写继承 Hook 接口的实现类:DingTalkReport。

public class DingTalkReport implements Hook {	private static final Logger LOG = LoggerFactory.getLogger(DingTalkReport.class);	private static final SimpleDateFormat dateFormat = new SimpleDateFormat(		"yyyy-MM-dd HH:mm:ss");
@Override public String getName() { return "DingTalkReportHook"; }
@Override public void invoke(Configuration configuration, Map<String, Number> map) { LOG.debug(configuration.beautify()); //{writeSucceedRecords=1248, // readSucceedRecords=1247, // totalErrorBytes=0, // writeSucceedBytes=81477, // byteSpeed=0, // totalErrorRecords=0, // recordSpeed=0, // waitReaderTime=308600221, // writeReceivedBytes=81477, // stage=1, // waitWriterTime=6348796, // percentage=1.0, // totalReadRecords=1247, // writeReceivedRecords=1248, // readSucceedBytes=81477, // totalReadBytes=81477} LOG.debug(map.toString());
// 从job的json配置中读取自定义的参数 String accessToken = getString(configuration.get("job.dingTalkReporter.accessToken")); String title = getString(configuration.get("job.dingTalkReporter.title")); String secret = getString(configuration.get("job.dingTalkReporter.secret")); String defaultTemplate = "# %s \n > %s \n - 成功读:%d \n - 成功写:%d \n - 等待%d秒"; String time = dateFormat.format(System.currentTimeMillis()); // 好像没办法拿到执行时间,如果需要更多map中没有的信息,可以改写Hook接口 long totalWaitTime = TimeUnit.SECONDS.convert(map.get("waitReaderTime").longValue() + map.get("waitWriterTime").longValue(), TimeUnit.NANOSECONDS);
String content = String.format(defaultTemplate, title, time, map.get("readSucceedRecords").longValue(), map.get("writeSucceedRecords").longValue(), totalWaitTime);
DingTalkUtil.send(title, content, accessToken, secret); }
String getString(Object obj){ return null == obj ? "" : String.valueOf(obj); }
}
复制代码

打包

假定包名为:com.bz.datax.hook,在 resources 下创建目录“META_INF/services”,在 services 文件夹下,新建文件“com.alibaba.datax.common.spi.Hook”,文件内容为 Hook 接口实现类的路径。

com.bz.datax.hook.DingTalkReport
复制代码

然后通过 idea 打包即可。


集成 Hook

引入 Hook 实现包


在 DataX 根目录下,新建“hook”文件夹,然后在其下创建“dingtalk”文件夹(也可自定义其他名字),将我们打包好的 jar 包放到“dingtalk”文件夹下。


新增 job 配置

接入钉钉自定义机器人,需要引入额外的配置信息,我们可以在 job 任务描述文件新增钉钉的配置参数“dingTalkReporter”,利用 configuration 类提取即可。

{    "job": {        "dingTalkReporter": {            "accessToken": "d9e5b4f89xxxxxxxxxxx",            "secret": "SECc008578xxxxxxx",            "title": "Test表同步"        },        "setting": {            "speed": {                "byte":10485760            },            "errorLimit": {                "record": 0,                "percentage": 0.02            }        },……}
复制代码

其中 accessToken、title 为必填,secret 为选填。


启动效果:

2021-06-03 10:39:08.673 [job-0] INFO  HookInvoker - Invoke hook [DingTalkReportHook], path: /datax/hook/dingtalk2021-06-03 10:39:09.415 [job-0] INFO  DingTalkUtil - Send DingTalk Message Result:ok-0
复制代码


通知效果:



参考

  1. 完整项目代码参加 Github:https://github.com/mchange/datax-dingtalk-report

  2. 钉钉自定义机器人接入文档:https://developers.dingtalk.com/document/app/custom-robot-access

发布于: 2021 年 06 月 04 日阅读数: 171
用户头像

白粥

关注

还未添加个人签名 2018.05.02 加入

还未添加个人简介

评论

发布
暂无评论
将DataX执行结果通过钉钉上报