写点什么

ReplacingMergeTree:实现 Clickhouse 数据更新

发布于: 2 小时前

摘要:Clickhouse 作为一个 OLAP 数据库,它对事务的支持非常有限。本文主要介绍通过 ReplacingMergeTree 来实现 Clickhouse 数据的更新、删除。


本文分享自华为云社区《Clickhouse如何实现数据更新》,作者: 小霸王。


Clickhouse 作为一个 OLAP 数据库,它对事务的支持非常有限。Clickhouse 提供了 MUTATION 操作(通过 ALTER TABLE 语句)来实现数据的更新、删除,但这是一种“较重”的操作,它与标准 SQL 语法中的 UPDATE、DELETE 不同,是异步执行的,对于批量数据不频繁的更新或删除比较有用,可参考https://altinity.com/blog/2018/10/16/updates-in-clickhouse。除了 MUTATION 操作,Clickhouse 还可以通过 CollapsingMergeTree、VersionedCollapsingMergeTree、ReplacingMergeTree 结合具体业务数据结构来实现数据的更新、删除,这三种方式都通过 INSERT 语句插入最新的数据,新数据会“抵消”或“替换”掉老数据,但是“抵消”或“替换”都是发生在数据文件后台 Merge 时,也就是说,在 Merge 之前,新数据和老数据会同时存在。因此,我们需要在查询时做一些处理,避免查询到老数据。Clickhouse 官方文档提供了使用 CollapsingMergeTree、VersionedCollapsingMergeTree 的指导,https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/collapsingmergetree/。相比于 CollapsingMergeTree、VersionedCollapsingMergeTree 需要标记位字段、版本字段,用 ReplacingMergeTree 来实现数据的更新删除会更加方便,这里着重介绍一下如何用 ReplacingMergeTree 来实现数据的更新删除。


我们假设一个需要频繁数据更新的场景,如某市用户用电量的统计,我们知道,用户的用电量每分每秒都有可能发生变化,所以会涉及到数据频繁的更新。首先,创建一张表来记录某市所有用户的用电量。


CREATE TABLE IF NOT EXISTS default.PowerConsumption_local ON CLUSTER default_cluster(    User_ID             UInt64                              COMMENT '用户ID',    Record_Time         DateTime    DEFAULT toDateTime(0)   COMMENT '电量记录时间',    District_Code       UInt8                               COMMENT '用户所在行政区编码',    Address             String                              COMMENT '用户地址',    Power               UInt64                              COMMENT '用电量',    Deleted             BOOLEAN     DEFAULT 0               COMMENT '数据是否被删除')ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/default.PowerConsumption_local/{shard}', '{replica}', Record_Time)ORDER BY (User_ID, Address)PARTITION BY District_Code;CREATE TABLE default.PowerConsumption ON CLUSTER default_cluster AS default.PowerConsumption_localENGINE = Distributed(default_cluster, default, PowerConsumption_local, rand());
复制代码


PowerConsumption_local 为本地表,PowerConsumption 为对应的分布式表。其中 PowerConsumption_local 使用 ReplicatedReplacingMergeTree 表引擎,第三个参数‘Record_Time’表示相同主键的多条数据,只会保留 Record_Time 最大的一条,我们正是利用 ReplacingMergeTree 的这一特性来实现数据的更新删除。因此,在选择主键时,我们需要确保主键唯一。这里我们选择(User_ID, Address)来作为主键,因为用户 ID 加上用户的地址可以确定唯一的一个电表,不会出现第二个相同的电表,所以对于某个电表多条数据,只会保留电量记录时间最新的一条。


然后我们向表中插入 10 条数据:


INSERT INTO default.PowerConsumption VALUES (0, '2021-10-30 12:00:00', 3, 'Yanta', rand64() % 1000 + 1, 0);INSERT INTO default.PowerConsumption VALUES (1, '2021-10-30 12:10:00', 2, 'Beilin', rand64() % 1000 + 1, 0);INSERT INTO default.PowerConsumption VALUES (2, '2021-10-30 12:15:00', 1, 'Weiyang', rand64() % 1000 + 1, 0);INSERT INTO default.PowerConsumption VALUES (3, '2021-10-30 12:18:00', 1, 'Gaoxin', rand64() % 1000 + 1, 0);INSERT INTO default.PowerConsumption VALUES (4, '2021-10-30 12:23:00', 2, 'Qujiang', rand64() % 1000 + 1, 0);INSERT INTO default.PowerConsumption VALUES (5, '2021-10-30 12:43:00', 3, 'Baqiao', rand64() % 1000 + 1, 0);INSERT INTO default.PowerConsumption VALUES (6, '2021-10-30 12:45:00', 1, 'Lianhu', rand64() % 1000 + 1, 0);INSERT INTO default.PowerConsumption VALUES (7, '2021-10-30 12:46:00', 3, 'Changan', rand64() % 1000 + 1, 0);INSERT INTO default.PowerConsumption VALUES (8, '2021-10-30 12:55:00', 1, 'Qianhan', rand64() % 1000 + 1, 0);INSERT INTO default.PowerConsumption VALUES (9, '2021-10-30 12:57:00', 4, 'Fengdong', rand64() % 1000 + 1, 0);
复制代码


表中数据如图所示:



假如现在我们要行政区编码为 1 的所有用户数据都需要更新,我们插入最新的数据:


INSERT INTO default.PowerConsumption VALUES (2, now(), 1, 'Weiyang', rand64() % 100 + 1, 0);INSERT INTO default.PowerConsumption VALUES (3, now(), 1, 'Gaoxin', rand64() % 100 + 1, 0);INSERT INTO default.PowerConsumption VALUES (6, now(), 1, 'Lianhu', rand64() % 100 + 1, 0);INSERT INTO default.PowerConsumption VALUES (8, now(), 1, 'Qianhan', rand64() % 100 + 1, 0);
复制代码


插入最新数据后,表中数据如图所示:



可以看到,此时新插入的数据与老数据同时存在于表中,因为后台数据文件还没有进行 Merge,“替换”还没有发生,这时就需要对查询语句做一些处理来过滤掉老数据,函数 argMax(a, b)可以按照 b 的最大值取 a 的值,所以通过如下查询语句就可以只获取到最新数据:


SELECT    User_ID,    max(Record_Time) AS R_Time,    District_Code,    Address,    argMax(Power, Record_Time) AS Power,    argMax(Deleted, Record_Time) AS DeletedFROM default.PowerConsumptionGROUP BY    User_ID,    Address,    District_CodeHAVING Deleted = 0;
复制代码


查询结果如下图:



为了更方便我们查询,这里可以创建一个视图:


CREATE VIEW PowerConsumption_view ON CLUSTER default_cluster ASSELECT    User_ID,    max(Record_Time) AS R_Time,    District_Code,    Address,    argMax(Power, Record_Time) AS Power,    argMax(Deleted, Record_Time) AS DeletedFROM default.PowerConsumptionGROUP BY    User_ID,    Address,    District_CodeHAVING Deleted = 0;
复制代码


通过该视图,可以查询到最新的数据:



假如现在我们又需要删除用户 ID 为 0 的数据,我们需要插入一条 User_ID 字段为 0,Deleted 字段为 1 的数据:


INSERT INTO default.PowerConsumption VALUES (0, now(), 3, 'Yanta', null, 1);
复制代码


查询视图,发现 User_ID 为 0 的数据已经查询不到了:



通过如上方法,我们可以实现 Clickhouse 数据的更新、删除,就好像在使用 OLTP 数据库一样,但我们应该清楚,实际上老数据真正的删除是在数据文件 Merge 时发生的,只有在 Merge 后,老数据才会真正物理意义上的删除掉。


点击关注,第一时间了解华为云新鲜技术~

发布于: 2 小时前阅读数: 4
用户头像

提供全面深入的云计算技术干货 2020.07.14 加入

华为云开发者社区,提供全面深入的云计算前景分析、丰富的技术干货、程序样例,分享华为云前沿资讯动态,方便开发者快速成长与发展,欢迎提问、互动,多方位了解云计算! 传送门:https://bbs.huaweicloud.com/

评论

发布
暂无评论
ReplacingMergeTree:实现Clickhouse数据更新