写点什么

如何使用 DATAX 以 UPSERT 语义更新下游 ORACLE 数据库中的数据

  • 2022 年 6 月 21 日
  • 本文字数:2506 字

    阅读完需:约 8 分钟

如何使用 DATAX 以 UPSERT 语义更新下游 ORACLE 数据库中的数据?

1 业务背景

在数据架构上,很多大数据项目,都会将 HIVE/SPARK 等离线计算引擎计算获得的结果数据同步到下游业务系统的线上数据库,以对外提供服务,而且很多业务系统需要为客户提供稳定的 7*24 小时的数据查询功能,要求底层数据库中的数据,需要是准确的,不能出现部分数据缺失的情况。


具体到数据同步工具的选型上,datax 是阿里开源的一款流行的数据集成工具,通过插件机制实现了多种异构数据之间的高效的离线数据同步,目前开源版本 datax 支持的插件已经将近 30 多种了,所以很多大数据项目都选用了 DATAX 来做离线数据的同步。


为保证数据的完整性与准确性,在使用 DATAX 进行数据同步时,目前很多项目都采用了先删除旧数据再插入计算生成的新数据的方式 (通过在作业中配置 preSql 执行旧数据的删除),此时当需要同步的数据量比较大时,旧数据的删除与新数据的插入,都需要一段时间,此时下游数据库中的表不可避免地会有一段时间的空档期,查询不到对应的数据。


怎么解决这个问题呢?


DATAX 官方推荐的一种方式是配置使用临时表,先向临时表导入数据,完成后再 rename 到线上表(可以通过在作业中配置 postSql 完成这类操作)。


除了临时表这种曲线救国的方式,也可以尝试以 UPSERT 语义直接更新下游数据库中线上的目标表数据。


那么 DATAX 中,不同数据库 WRITER 插件都是怎么实现 UPSERT 语义的呢?

2 DATAX 常见数据库 WRITER 插件是怎么实现 UPSERT 语义的?

  • datax 的 MysqlWriter 和 oceanbasev10writer, 支持配置 writeMode 参数为 insert/replace/update,可以通过该参数控制写入数据到目标表时,底层采用 insert into/replace into/INSERT INTO ... ON DUPLICATE KEY UPDATE 语句:

  • 其中 insert into 当主键/唯一性索引冲突时会写不进去冲突的行;

  • 后两者没有遇到主键/唯一性索引冲突时与 insert into 行为一致,遇到冲突时会用新行替换原有行所有字段;

  • datax 原生的 OracleWriter 和 PostgresqlWriter,不支持配置 writeMode 参数,在底层实现上都是通过 JDBC 连接远程 Oracle/PG 数据库,并执行相应的 insert into ... sql 语句将数据写入 Oracle/pg,在内部会分批次提交入库。


那么,能不能更改原生的 OracleWriter 以支持 UPSERT 语义插入 ORALCE 呢?

3. ORACLE 的 MERGE INTO 语句

Oracle 9i 引入了对 merge 语句的支持, 通过 merge 能够在一个 SQL 语句中对一个表同时执行 inserts 和 updates 操作,Oracle 10g 对 MERGE 语句又做了如下增强:


  • UPDATE 或 INSERT 子句是可选的

  • UPDATE 和 INSERT 子句可以加 WHERE 子句

  • 在 ON 条件中可以使用常量过滤谓词来 insert 所有的行到目标表中,不需要连接源表和目标表

  • UPDATE 子句后面可以跟 DELETE 子句来去除一些不需要的行。


merge into 语句语法如下:


MERGE INTO [target-table] A USING [source-table sql] B ON([conditional expression] and [...]...) WHEN MATCHED THEN [UPDATE sql] WHEN NOT MATCHED THEN [INSERT sql]
复制代码


merge into 语句实例如下:


MERGE INTO member_staging xUSING (SELECT member_id, first_name, last_name, rank FROM members) yON (x.member_id  = y.member_id)WHEN MATCHED THEN    UPDATE SET x.first_name = y.first_name,                         x.last_name = y.last_name,                         x.rank = y.rank    WHERE x.first_name <> y.first_name OR            x.last_name <> y.last_name OR            x.rank <> y.rank WHEN NOT MATCHED THEN    INSERT(x.member_id, x.first_name, x.last_name, x.rank)      VALUES(y.member_id, y.first_name, y.last_name, y.rank);    

复制代码


所以,虽然 oracle 不支持类似 MYSQL 的 REPLACE INTO 和 INSERT ... ON DUPLICATE KEY UPDATE,但由于 ORACLE 原生支持 MERGE INTO 语句,我们完全可以更改 datax 的 OracleWriter 源码,通过 merge into 语句,实现 UPSERT 语义。

4. 更改 DATAX oracleWriter 以通过 MERGE INTO 语句实现 UPSERT 语义

涉及改动的 datax 源码中类和方法的改动点主要有:


  • com.alibaba.datax.plugin.writer.oraclewriter.OracleWriter.Job#init:更改该方法以允许用户配置 writeMode;

  • com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil#dealWriteMode:更改该方法以获取用户配置的 uniqueKeys 并在调用 WriterUtil.getWriteTemplate 时传递 uniqueKeys;

  • com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil#getWriteTemplate: 更改该方法,以在用户配置 writeMode 使用 replace 且配置了 uniqueKeys 时,拼接获取 ORACLE MERGE INTO 语句对应的 preparedStatement 字符串;

  • com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task#init:更改该方法以获取用户配置的 uniqueKeys;

  • com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task#calcWriteRecordSql:更改该方法以在调用 WriterUtil.getWriteTemplate 时传递 uniqueKeys;

  • com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task#fillPreparedStatementColumnType(PreparedStatement, int, int, String, Column): 更改该方法以在用户配置 writeMode 使用 replace 且配置了 uniqueKeys 时,对 ORACLE MERGE INTO 语句对应的 preparedStatement 的变量进行 setString 等赋值操作;


com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil#getWriteTemplate 方法拼接获取的 ORACLE MERGE INTO 语句对应的 preparedStatement 字符串,示例内容如下:


MERGE INTO %s xUSING (SELECT ? as member_id, ? as first_name, ? as last_name, ? as rank FROM dual) yON (x.member_id  = y.member_id and x.xxx = y.xx)WHEN MATCHED THEN UPDATE SET                 x.first_name = y.first_name,                 x.last_name = y.last_name,                 x.rank = y.rankWHEN NOT MATCHED THEN INSERT(x.member_id, x.first_name, x.last_name, x.rank)      VALUES(?,?,?,?);    
复制代码


!关注不迷路~ 各种福利、资源定期分享!欢迎小伙伴们扫码添加明哥微信,后台加群交流学习。



发布于: 刚刚阅读数: 4
用户头像

Keep Striving! 2018.04.25 加入

明哥,十四年 IT经验,六年大数据经验; 做过大数据集群的搭建运维,大数据应用系统的开发优化,也做过大数据平台的技术选型以及架构咨询; 目前聚焦于泛大数据生态,包括数据仓库/数据湖,云计算和人工智能。

评论

发布
暂无评论
如何使用 DATAX 以 UPSERT 语义更新下游 ORACLE 数据库中的数据_oracle_明哥的IT随笔_InfoQ写作社区