写点什么

大数据 -140 ClickHouse CollapsingMergeTree 详解 外部数据源最小闭环 HDFS/MySQL/Kafka

作者:武子康
  • 2025-10-31
    山东
  • 本文字数:3346 字

    阅读完需:约 11 分钟

大数据-140 ClickHouse CollapsingMergeTree详解 外部数据源最小闭环HDFS/MySQL/Kafka

TL;DR

  • 场景:高并发“以增代删/更新”审计流量,如何既省存储又查得准?

  • 结论:更新场景用 VersionedCollapsing + argMax,删除/撤销用 Collapsing;FINAL 只用于小范围核对。

  • 产出:标准 DDL、写入规范、argMax 查询模板、OPTIMIZE ... FINAL ,HDFS、MySQL、Kafka 数据源对接



CollapsingMergeTree

核心工作原理:以增代删

CollapsingMergeTree 是 ClickHouse 中一种特殊的 MergeTree 引擎,其核心设计理念是"以增代删"。Yandex 官方对此引擎的定义是:


CollapsingMergeTree 会异步地删除(折叠)除了特定列的 Sign 列值为 1 和-1 以外,其他所有字段值都相等的成对行。没有成对的行会被保留。这种设计可以显著降低存储量并提高 SELECT 查询效率。

状态列机制详解

CollapsingMergeTree 引擎的关键在于其特殊的状态列 Sign,该列的值决定行的状态:


  • 1(状态行):表示这是一条有效的数据记录

  • -1(取消行):表示这是一条需要被删除/撤销的记录

运行机制示例

假设我们有一个用户行为跟踪表:


CREATE TABLE user_actions (    user_id UInt64,    action_date Date,    action String,    duration UInt32,    sign Int8) ENGINE = CollapsingMergeTree(sign)ORDER BY (user_id, action_date, action)
复制代码


写入示例


  1. 先插入一条状态行:


   INSERT INTO user_actions VALUES (123, '2023-01-01', 'login', 30, 1)
复制代码


  1. 随后发现数据有误,插入取消行:


   INSERT INTO user_actions VALUES (123, '2023-01-01', 'login', 30, -1)
复制代码


后台合并时,这两条记录会被折叠删除。

应用场景

这种引擎特别适合以下场景:


  • 需要频繁更新的时序数据

  • 需要保留历史变更记录的审计场景

  • 需要高效存储大量状态变更的数据

查询注意事项

由于折叠是异步进行的,查询时应使用:


SELECT * FROM user_actions FINAL
复制代码


来获取已折叠的最终结果,或者使用条件过滤:


SELECT * FROM user_actions WHERE sign = 1
复制代码

案例

创建新表

CREATE TABLE cmt_tab (  id UInt32,  sign Int8,  date Date,  name String,  point String) ENGINE = CollapsingMergeTree(sign)PARTITION BY toYYYYMM(date)ORDER BY (name, id)SAMPLE BY id;
复制代码


执行结果如下图:


插入数据

INSERT INTO cmt_tab (id, sign, date, name, point) VALUES(1, 1, '2024-01-01', 'Alice', '10'),(2, 1, '2024-01-01', 'Bob', '15'),(3, 1, '2024-01-02', 'Charlie', '20'),(4, 1, '2024-01-02', 'David', '25'),(5, 1, '2024-01-03', 'Eve', '30');
-- Mark Alice's row as deleted-- Mark Bob's row as deletedINSERT INTO cmt_tab (id, sign, date, name, point) VALUES(1, -1, '2024-01-01', 'Alice', '10'),(2, -1, '2024-01-01', 'Bob', '15');
-- Insert Alice's updated row-- Insert Bob's updated rowINSERT INTO cmt_tab (id, sign, date, name, point) VALUES(1, 1, '2024-01-01', 'Alice', '12'),(2, 1, '2024-01-01', 'Bob', '18');
复制代码


运行结果如下所示:


optimize

OPTIMIZE TABLE cmt_tab;SELECT  *FROM  cmt_tab;
复制代码


执行结果如下图所示:


使用场景

大数据中对于数据更新很难做到,比如统计一个网站或 TV 的用户数,更多场景都是选择用记录每个点的数据,再对数据进行聚合查询。而 ClickHouse 通过 CollapsingMergeTree 就可以实现,使得 CollapsingMergeTreeTree 大部分用于 OLAP 场景。

VersionedCollapsingMergeTree

这个引擎和 CollapsingMergeTree 差不多,只是对 CollapsingMergeTree 引擎加了一个版本,比如可以适用于非实时的在线统计,统计每个节点用户在线的业务。

其他数据源

端口冲突

我们的 ClickHouse 和 Hadoop 的 9000 端口冲突了,看大家是更改 ClickHouse 的端口,还是 Hadoop 的端口。我这里选择修改 ClickHouse 的端口,从 9000 到 9001。不过如果你不做 HDFS 的相关实验,这块冲突不管直接跳过就好。


我这里选择修改 ClickHouse,我已经集群都修改完毕了,所以我连接方式修改为:


clickhouse-client -m --host h121.wzk.icu --port 9001 --user default --password clickhouse@wzk.icu
复制代码

HDFS

该引擎提供了集成了 Apache Hadoop 生态系统通过允许管理数据 HDFS 通过 ClickHouse,这个引擎是相似的到文件和 URL 引擎,但提供 Hadoop 特定的功能。

用途介绍

ENGINE = HDFS(URI, format)该 URI 参数是 HDFS 中整个文件的 URI,该 format 参数指定一种可用的文件格式。执行 SELECT 查询时,格式必须支持输入。

示例 1

添加新表

设置 HDFS_ENGINE_TABLE 表:


CREATE TABLE hdfs_engine_table(  name String,  value UInt32) ENGINE = HDFS('hdfs://h121.wzk.icu:9000/clickhouse', 'TSV');
复制代码


运行之后的截图为:


插入数据

INSERT INTO hdfs_engine_table VALUES('one', 1), ('two', 2), ('three', 3);
复制代码


运行之后截图为:


查询数据

SELECT  *FROM  hdfs_engine_table;
复制代码


运行之后的截图为:


HDFS 数据查看

实施细节

  • 读取和写入可以并行

  • 不支持:ALTER、SELECT SAMPLE、索引、复制

MySQL

介绍

MySQL 引擎可以对存储在远程 MySQL 服务器上的数据执行 SELECT 查询。

调用参数

  • host:port MySQL 服务器地址

  • database 数据库名称

  • table 表名称

  • user 数据库用户

  • password 用户密码

  • replace_query 将 INSERT INTO 查询是否替换为 REPLACE_INFO 的标志,如果 REPLACE_QUERY=1 则替换查询

  • on_duplicate_clause 将 ON DUPLCATE KEY UPDATE 表达式添加到 INSERT 查询语句中。

示例

创建新表

CREATE TABLE mysql_table2 (  `id` UInt32,  `name` String,  `age` UInt32) ENGINE = MySQL('h122.wzk.icu:3306', 'clickhouse', 'mysql_table2', 'hive', 'hive@wzk.icu')
复制代码


执行结果如下图所示:


数据库配置

在数据库中,我们要建立好对应的数据库和表:


插入数据

INSERT INTO mysql_table2 VALUES(1, 'wzk', 18);INSERT INTO mysql_table2 VALUES(2, 'icu', 18);
复制代码

查询数据

SELECT  *FROM  mysql_table2;
复制代码


运行之后截图:


Kafka

Apache Kafka 是一个分布式流处理平台,广泛用于构建实时数据管道和流应用。它能够高效地处理大量的实时数据流,常用于日志收集、事件监控、实时分析等场景。ClickHouse 提供了专门的 Kafka 引擎,使其能够直接从 Kafka 中读取数据,实现实时数据流的处理与分析。

创建新表

CREATE TABLE kafka_events(    `timestamp` DateTime,    `event_type` String,    `user_id` UInt64,    `event_data` String)ENGINE = KafkaSETTINGS    kafka_broker_list = 'broker1:9092,broker2:9092',    kafka_topic_list = 'events_topic',    kafka_group_name = 'clickhouse_group',    kafka_format = 'JSONEachRow',    kafka_num_consumers = 1;
复制代码


创建目标表并设置 Materialized View 为了将 Kafka 中的数据持久化到 ClickHouse 的表中,通常会创建一个目标表,并通过 Materialized View 实现自动插入。


CREATE TABLE events (    `timestamp` DateTime,    `event_type` String,    `user_id` UInt64,    `event_data` String) ENGINE = MergeTree()ORDER BY timestamp;
CREATE MATERIALIZED VIEW kafka_to_eventsTO eventsAS SELECT * FROM kafka_events;
复制代码

插入数据

INSERT INTO events SELECT * FROM kafka_events;
复制代码

应用场景

  • 实时日志分析:通过 Kafka 收集应用日志,ClickHouse 实时消费并分析日志数据,支持快速故障排查和性能监控。

  • 事件驱动的业务分析:实时跟踪用户行为事件,进行实时的用户行为分析和推荐系统。

  • 实时监控与报警:将监控数据流入 Kafka,ClickHouse 处理并生成实时报警指标。

错误速查

引擎对比

其他系列

🚀 AI 篇持续更新中(长期更新)

AI 炼丹日志-29 - 字节跳动 DeerFlow 深度研究框斜体样式架 私有部署 测试上手 架构研究,持续打造实用 AI 工具指南!AI-调查研究-108-具身智能 机器人模型训练全流程详解:从预训练到强化学习与人类反馈🔗 AI模块直达链接

💻 Java 篇持续更新中(长期更新)

Java-154 深入浅出 MongoDB 用 Java 访问 MongoDB 数据库 从环境搭建到 CRUD 完整示例 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!🔗 Java模块直达链接

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解🔗 大数据模块直达链接

发布于: 14 分钟前阅读数: 5
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-140 ClickHouse CollapsingMergeTree详解 外部数据源最小闭环HDFS/MySQL/Kafka_MySQL_武子康_InfoQ写作社区