写点什么

一招教你如何高效批量导入与更新数据

  • 2022 年 9 月 16 日
    中国香港
  • 本文字数:6985 字

    阅读完需:约 23 分钟

一招教你如何高效批量导入与更新数据

本文分享自华为云社区《一招教你如何高效批量导入与更新数据》,作者: acydy。


当前高斯 DB (DWS) 提供了 MERGE INTO 功能。本篇文章介绍 MERGE INTO 功能与基本用法。

前言


如果有一张表,我们既想对它更新,又想对它插入应该如何操作? 可以使用 UPDATE 和 INSERT 完成你的目标。


如果你的数据量很大,想尽快完成任务执行,可否有其他方案?那一定不要错过 GaussDB (DWS) 的 MERGE INTO 功能。

合并到 概念


MERGE INTO 是 SQL 2003 引入的标准。


如果表 T 以及可更新的表是可插入的,则可以在其中插入行(受适用的访问规则和一致性规则的约束)。<插入语句>对 T 的主要影响是将指定表中包含的零行或多行中的每一行插入到 T 中。<merge 语句>对 T 的主要影响是将 T 中的零个或多个行替换为指定的行和/或插入到 T 中,将零个或多个指定的行插入,具体取决于<搜索条件的结果>以及是否指定了<<合并>和未匹配子句时<合并>。


一张表在一条语句里面既可以被更新,也可以入。是否被更新还是插入取决于 search 条件 的结果和指定的 merge when match clauseed(当条件 匹配时做什么操作)和 merge 当不匹配时做什么操作)和 merge 当条件 不匹配时做什么操作)语法。


SQL 2008 进行了扩展,可以使用多个 MATCHED 和 NOT MATCHED 。


MERGE 已扩展为支持多个匹配子句和不匹配子句,每个子句都附带一个搜索条件,这为复杂 MERGE 语句的编码提供了更大的灵活性,以处理更新冲突。


MERGE INTO 命令涉及到两张表。目标表:入或者更新的表。源表:用于跟目标表进行匹配的表,目标表的数据来源。


MERGE INTO 语句将目标表和源表中数据针对关联条件进行匹配,若关联条件匹配时对目标表进行 UPDATE,无法匹配时对目标表执行 INSERT。


使用场景:当业务中需要将一个表中大量数据添加到现有表时,使用 MERGE INTO 可以高效地导入将数据导入,避免多次 INSERT+UPDATE 操作。

合并到语法


高斯数据库 (DWS) 合并到语法如下:


MERGE INTO table_name [ [ AS ] alias ]USING { { table_name | view_name } | subquery } [ [ AS ] alias ]ON ( condition )[ WHEN MATCHED THEN UPDATE SET { column_name = { expression | DEFAULT } | ( column_name [, ...] ) = ( { expression | DEFAULT } [, ...] ) } [, ...] [ WHERE condition ]][ WHEN NOT MATCHED THEN INSERT { DEFAULT VALUES | [ ( column_name [, ...] ) ] VALUES ( { expression | DEFAULT } [, ...] ) [, ...] [ WHERE condition ] }];
复制代码


  • INTO 指定目标表。

  • USING 指定源表。源表可以是普通表,也可以是子查询。

  • ON 关联条件,用于指定目标表和源表的关联条件。

  • WHEN MATCHED 当源表和目标表中数据可以匹配关联条件时,选择 WHEN MATCHED 子句执行 UPDATE 操作。

  • 当不匹配 当源表和目标表中数据无法匹配关联条件时,选择

    当匹配时,当不匹配可以缺省一个,不能指定多个。

    当匹配时,当不匹配可以使用 WHERE 进行条件过滤。

    当匹配时,当不匹配顺序可以交换。

实战应用


首先创建好下面几张表,用于执行 MREGE INTO 操作。


gaussdb=# CREATE TABLE dst ( product_id INT, product_name VARCHAR(20),  category VARCHAR(20),  total INT) DISTRIBUTE BY HASH(product_id);gaussdb=# CREATE TABLE dst_data ( product_id INT, product_name VARCHAR(20),  category VARCHAR(20),  total INT) DISTRIBUTE BY HASH(product_id);gaussdb=# CREATE TABLE src ( product_id INT, product_name VARCHAR(20),  category VARCHAR(20),  total INT) DISTRIBUTE BY HASH(product_id);gaussdb=# INSERT INTO dst_data VALUES(1601,'lamaze','toys',100),(1600,'play gym','toys',100),(1502,'olympus','electrncs',100),(1501,'vivitar','electrnc',100),(1666,'harry potter','dvd',100);gaussdb=# INSERT INTO src VALUES(1700,'wait interface','books',200),(1666,'harry potter','toys',200),(1601,'lamaze','toys',200),(1502,'olympus camera','electrncs',200);gaussdb=# INSERT INTO dst SELECT * FROM dst_data;
复制代码

同时指定 当匹配 与 当不匹配


  • 查看计划,看下 MERGE INTO 是如何执行的。


MERGE INTO 转化成 JOIN 将两个表进行关联处理,关联条件就是 ON 后指定的条件。


gaussdb=# EXPLAIN (COSTS off)MERGE INTO dst xUSING src yON x.product_id = y.product_idWHEN MATCHED THEN UPDATE SET product_name = y.product_name, category = y.category, total = y.totalWHEN NOT MATCHED THEN INSERT VALUES (y.product_id, y.product_name, y.category, y.total);                    QUERY PLAN--------------------------------------------------  id |                operation-----+-------------------------------------------- 1 | ->  Streaming (type: GATHER) 2 | -> Merge on dst x 3 | ->  Streaming(type: REDISTRIBUTE) 4 | -> Hash Left Join (5, 6) 5 | ->  Seq Scan on src y 6 | -> Hash 7 | ->  Seq Scan on dst x  Predicate Information (identified by plan id) ------------------------------------------------ 4 --Hash Left Join (5, 6) Hash Cond: (y.product_id = x.product_id)(14 rows)
复制代码


为什么这里转化成了 左 JOIN?


由于需要在目标表与源表匹配时更新目标表,不匹配时向目标表插入数据。也就是源表的一部分数据用于更新目标表,另一部分用于向目标表插入。与左联接 语义是相似的。


 5 --Seq Scan on public.src y         Output: y.product_id, y.product_name, y.category, y.total, y.ctid         Distribute Key: y.product_id 6 --Hash         Output: x.product_id, x.product_name, x.category, x.total, x.ctid, x.xc_node_id 7 --Seq Scan on public.dst x         Output: x.product_id, x.product_name, x.category, x.total, x.ctid, x.xc_node_id         Distribute Key: x.product_id
复制代码


  • 执行 MERGE INTO,查看结果。


两张表在 product_id 是 1502,1601,1666 时可以关联,所以这三条记录被更新。src 表 product_id 是 1700 时未匹配,插入此条记录。其他未修改。


gaussdb=# SELECT * FROM dst ORDER BY 1; product_id | product_name | category  | total------------+--------------+-----------+------- 1501 | vivitar | electrnc | 100 1502 | olympus | electrncs | 100 1600 | play gym     | toys      | 100  1601 | lamaze | toys      | 100 1666 | harry potter | dvd | 100 (5 rows)gaussdb=# SELECT * FROM src ORDER BY 1; product_id | product_name | category  | total------------+----------------+-----------+------- 1502 | olympus camera | electrncs | 200 1601 | lamaze | toys      | 200  1666 | harry potter   | toys      | 200 1700 | wait interface | books     | 200 (4 rows)gaussdb=# MERGE INTO dst xUSING src yON x.product_id = y.product_idWHEN MATCHED THEN UPDATE SET product_name = y.product_name, category = y.category, total = y.totalWHEN NOT MATCHED THEN INSERT VALUES (y.product_id, y.product_name, y.category, y.total);MERGE 4gaussdb=# SELECT * FROM dst ORDER BY 1; product_id | product_name | category  | total------------+----------------+-----------+------- 1501 | vivitar | electrnc | 100 -- 未修改 1502 | olympus camera | electrncs | 200 -- 更新 1600 | play gym       | toys      | 100 -- 未修改 1601 | lamaze | toys      | 200 -- 更新 1666 | harry potter   | toys      | 200 -- 更新 1700 | wait interface | books     | 200 -- 插入(6 rows)
复制代码


  • 查看具体 UPDATE、INSERT 个数


可以通过 EXPLAIN PERFORMANCE 或者 EXPLAIN ANALYZE 查看 UPDATE、INSERT 各自个数。(这里仅显示必要部分)


在 Predicate Information 部分可以看到总共插入一条,更新三条。


在 Datanode Information 部分可以看到每个节点的信息。datanode1 上更新 2 条,datanode2 上插入一条,更新 1 条。


gaussdb=# EXPLAIN PERFORMANCEMERGE INTO dst xUSING src yON x.product_id = y.product_idWHEN MATCHED THEN UPDATE SET product_name = y.product_name, category = y.category, total = y.totalWHEN NOT MATCHED THEN INSERT VALUES (y.product_id, y.product_name, y.category, y.total);  Predicate Information (identified by plan id) ------------------------------------------------ 2 --Merge on public.dst x Merge Inserted: 1 Merge Updated: 3 Datanode Information (identified by plan id) --------------------------------------------------------------------------------------- 2 --Merge on public.dst x         datanode1 (Tuple Inserted 0, Tuple Updated 2)         datanode2 (Tuple Inserted 1, Tuple Updated 1) 
复制代码

省略 WHEN NOT MATCHED 部分


  • 这里由于没有 WHEN NOT MATCHED 部分,在两个表不匹配时不需要执行任何操作,也就不需要源表这部分的数据,所有只需要 inner join 即可。


gaussdb=# EXPLAIN (COSTS off)MERGE INTO dst xUSING src yON x.product_id = y.product_idWHEN MATCHED THEN UPDATE SET product_name = y.product_name, category = y.category, total = y.total;                    QUERY PLAN--------------------------------------------------  id |             operation ----+----------------------------------- 1 | ->  Streaming (type: GATHER) 2 | -> Merge on dst x 3 | -> Hash Join (4,5) 4 | ->  Seq Scan on dst x 5 | -> Hash 6 | ->  Seq Scan on src y  Predicate Information (identified by plan id) ------------------------------------------------ 3 --Hash Join (4,5) Hash Cond: (x.product_id = y.product_id)(13 rows)
复制代码


  • 执行后查看结果。MERGE INTO 只操作了 3 条数据。


gaussdb=# truncate dst;gaussdb=# INSERT INTO dst SELECT * FROM dst_data;gaussdb=# MERGE INTO dst xUSING src yON x.product_id = y.product_idWHEN MATCHED THEN UPDATE SET product_name = y.product_name, category = y.category, total = y.total;MERGE 3gaussdb=# SELECT * FROM dst; product_id | product_name | category  | total------------+----------------+-----------+------- 1501 | vivitar | electrnc | 100 -- 未修改 1502 | olympus camera | electrncs | 200 -- 更新 1600 | play gym       | toys      | 100 -- 未修改 1601 | lamaze | toys      | 200 -- 更新 1666 | harry potter   | toys      | 200 -- 更新(5 rows)
复制代码

省略 WHEN NOT MATCHED


  • 只有在不匹配时进行插入。结果中没有数据被更新。


gaussdb=# EXPLAIN (COSTS off)MERGE INTO dst xUSING src yON x.product_id = y.product_idWHEN NOT MATCHED THEN INSERT VALUES (y.product_id, y.product_name, y.category, y.total);                    QUERY PLAN--------------------------------------------------  id |                operation ----+----------------------------------------- 1 | ->  Streaming (type: GATHER) 2 | -> Merge on dst x 3 | ->  Streaming(type: REDISTRIBUTE) 4 | -> Hash Left Join (5, 6) 5 | ->  Seq Scan on src y 6 | -> Hash 7 | ->  Seq Scan on dst x  Predicate Information (identified by plan id) ------------------------------------------------ 4 --Hash Left Join (5, 6) Hash Cond: (y.product_id = x.product_id)(14 rows)gaussdb=# truncate dst;gaussdb=# INSERT INTO dst SELECT * FROM dst_data;gaussdb=# MERGE INTO dst xUSING src yON x.product_id = y.product_idWHEN NOT MATCHED THEN INSERT VALUES (y.product_id, y.product_name, y.category, y.total);MERGE 1gaussdb=# SELECT * FROM dst ORDER BY 1; product_id | product_name | category  | total------------+----------------+-----------+------- 1501 | vivitar | electrnc | 100 -- 未修改 1502 | olympus | electrncs | 100 -- 未修改 1600 | play gym       | toys      | 100 -- 未修改 1601 | lamaze | toys      | 100 -- 未修改 1666 | harry potter   | dvd | 100 -- 未修改 1700 | wait interface | books     | 200 -- 插入(6 rows)
复制代码

WHERE 过滤条件


语义是在进行更新或者插入前判断当前行是否满足过滤条件,如果不满足,就不进行更新或者插入。如果对于字段不想被更新,需要指定过滤条件。


下面例子在两表可关联时,只会更新 product_name = 'olympus’的行。在两表无法关联时且源表的 product_id != 1700 时才会进行插入。


gaussdb=# truncate dst;gaussdb=# INSERT INTO dst SELECT * FROM dst_data;gaussdb=# MERGE INTO dst xUSING src yON x.product_id = y.product_idWHEN MATCHED THEN UPDATE SET product_name = y.product_name, category = y.category, total = y.total WHERE x.product_name = 'olympus'WHEN NOT MATCHED THEN INSERT VALUES (y.product_id, y.product_name, y.category, y.total) WHERE y.product_id != 1700;MERGE 1gaussdb=# SELECT * FROM dst ORDER BY 1;SELECT * FROM dst ORDER BY 1; product_id | product_name | category  | total------------+----------------+-----------+------- 1501 | vivitar | electrnc | 100 1502 | olympus camera | electrncs | 200 1600 | play gym       | toys      | 100 1601 | lamaze | toys      | 100 1666 | harry potter   | dvd | 100(5 rows)
复制代码

子查询


在 USING 部分可以使用子查询,进行更复杂的关联操作。


  • 对源表进行聚合操作的结果再与目标表匹配


MERGE INTO dst xUSING ( SELECT product_id, product_name, category, sum(total) AS total FROM src group by product_id, product_name, category) yON x.product_id = y.product_idWHEN MATCHED THEN UPDATE SET product_name = x.product_name, category = x.category, total = x.totalWHEN NOT MATCHED THEN INSERT VALUES (y.product_id, y.product_name, y.category, y.total + 200);
复制代码


  • 多个表 UNION 后的结果再与目标表匹配


MERGE INTO dst xUSING ( SELECT 1501 AS product_id, 'vivitar 35mm' AS product_name, 'electrncs' AS category, 100 AS total UNION ALL SELECT 1666 AS product_id, 'harry potter' AS product_name, 'dvd' AS category, 100 AS total) yON x.product_id = y.product_idWHEN MATCHED THEN UPDATE SET product_name = x.product_name, category = x.category, total = x.totalWHEN NOT MATCHED THEN INSERT VALUES (y.product_id, y.product_name, y.category, y.total + 200);
复制代码

存储过程


gaussdb=# CREATE OR REPLACE PROCEDURE store_procedure1()ASBEGIN MERGE INTO dst x USING src y ON x.product_id = y.product_id WHEN MATCHED THEN UPDATE SET product_name = y.product_name, category = y.category, total = y.total;END;/CREATE PROCEDUREgaussdb=# CALL store_procedure1();
复制代码

MERGE INTO 背后原理


上文提到了 MREGE INTO 转化成 LEFT JOIN 或者 INNER JOIN 将目标表和源表进行关联。那么如何知道某一行要进行更新还是插入?


通过 EXPLAIN VERBOSE 查看算子的输出。扫描两张表时都输出了 ctid 列。那么 ctid 列有什么作用呢?


 5 --Seq Scan on public.src y         Output: y.product_id, y.product_name, y.category, y.total, y.ctid         Distribute Key: y.product_id 6 --Hash         Output: x.product_id, x.product_name, x.category, x.total, x.ctid, x.xc_node_id 7 --Seq Scan on public.dst x         Output: x.product_id, x.product_name, x.category, x.total, x.ctid, x.xc_node_id         Distribute Key: x.product_id
复制代码


ctid 标识了这一行在存储上具体位置,知道了这个位置就可以对这个位置的数据进行更新。GaussDB (DWS) 作为 MPP 分布式数据库,还需要知道节点的信息 (xc_node_id)。UPDATE 操作需要这两个值。


在 MREGE INTO 这里 ctid 还另有妙用。当目标表匹配时需要更新,这是就保留本行 ctid 值。如果无法匹配,插入即可。就不需要 ctid,此时可认识 ctid 值是 NULL。根据 LEFT JOIN 输出的 ctid 结果是否为 NULL,最终决定本行该被更新还是插入。


这样在两张表做完 JOIN 操作后,根据 JOIN 后输出的 ctid 列,更新或者插入某一行。

注意事项


使用 MERGE INTO 时要注意匹配条件是否合适。如果不注意,容易造成数据被非预期更新,可能整张表被更新。

总结


GAUSSDB (DWS) 提供了高效的数据导入的功能 MERGE INTO,对于数据仓库是一项非常关键的功能。可以使用 MERGE INTO 同时更新和插入一张表,在数据量非常大的情况下也能很快完成地数据导入。


点击关注,第一时间了解华为云新鲜技术~

发布于: 5 小时前阅读数: 5
用户头像

提供全面深入的云计算技术干货 2020.07.14 加入

华为云开发者社区,提供全面深入的云计算前景分析、丰富的技术干货、程序样例,分享华为云前沿资讯动态,方便开发者快速成长与发展,欢迎提问、互动,多方位了解云计算! 传送门:https://bbs.huaweicloud.com/

评论

发布
暂无评论
一招教你如何高效批量导入与更新数据_数据库_华为云开发者联盟_InfoQ写作社区