写点什么

CDC ChangeLog Stream 实时流入 CLICKHOUSE 最佳姿势

作者:水滴
  • 2022-10-25
    广东
  • 本文字数:3507 字

    阅读完需:约 12 分钟

CDC ChangeLog Stream实时流入CLICKHOUSE最佳姿势

承接上个专题 clickhosue准实时数仓能力探索 遗留问题“上游实时数据怎么 sink 到 clickhouse?”,在这

里一起探索 CDC ChangeLog Stream 实时流 sink 到 CLICKHOUSE 最佳姿势。


在进行技术选型、方案设计与实操之前,先简单概述下数据库变更日志是怎么流入 click house 的:


CDC 技术通过实时捕捉数据变更日志作为流计算引擎(如 flink,spark)数据源,这些实时流数据源 ChangeLog Stream 由包含变更操作列(用于插入、删除、更新(先前)、更新(新)标识)的行和实际的元数据列组成,流入 flink 引擎。flink 再将 ChangeLog Stream 转换为 Dynamic Table 的 Append 或 Retract 或 Upsert 模式,然后再 sink 到外部系统,如:clickhouse


这里涉及到几个术语解释:


  • Dynamic Table & ChangeLog Stream

  • Append-only stream

  • Retract stream

  • Upsert stream

Dynamic Table & ChangeLog Stream

Dynamic table 在 flink 中是一个逻辑概念,。下图是 ChangeLog Stream 和 dynamic table 转换关系,先将 ChangeLog Stream 转化为 dynamic table,再基于 dynamic table 进行 SQL 操作生成新的 dynamic table。



  • Dynamic Table 就是 Flink SQL 定义的动态表,动态表和流的概念是对等的。参照上图,流可以转换成动态表,动态表也可以转换成流。

  • 在 Flink SQL 中,数据在从一个算子流向另外一个算子时都是以 Changelog Stream 的形式,任意时刻的 Changelog Stream 可以翻译为一个表,也可以翻译为一个流。


上游 CDC 技术,实时捕捉数据库变更日志,flink 实时消费日志,数据库中的变更日志作为 flink 流的数据源(Changelog Stream),如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog 文件当作流的数据源在将 Changelog Stream 转换为 Dynamic Table 或将其写入外部系统时,Flink 根据数据变化类型提供三种结果的输出模式。

Append-only stream


Append-only stream: A dynamic table that is only modified by INSERT changes can be converted into a stream by emitting the inserted rows.


Append-only 是最为简单的输出模式,只支持追加结果记录的操作。结果一旦输出以后便不会再有变更,Append 输出模式的最大特性是不可变性(immutability)通常来说,Append 模式会用于写入不方便做撤回或者删除操作的存储系统的场景,比如 Kafka 等 MQ 或者打印到控制台。

Retract stream


Retract stream: A retract stream is a stream with two types of messages, add messages and retract messages. A dynamic table is converted into a retract stream by encoding an INSERT change as add message, a DELETE change as a retract message, and an UPDATE change as a retract message for the updated (previous) row, and an additional message for the updating (new) row. The following figure visualizes the conversion of a dynamic table into a retract stream.


retract 流包含两种类型的 message: add messages 和 retract messages 。通过将 INSERT 操作编码为 add message、将 DELETE 操作编码为 retract message、将 UPDATE 操作编码为更新(先前)行的 retract message 和更新(新)行的 add message,将动态表转换为 retract 流。



如上图,在 mysql 执行 update 操作


update inventory.debezium_products set weight=180 where id=101;


ChangeLog 转为 Retract stream 会在 dynamic table 写入以下数据


op          id                           name               description       weight  -U         101                        scooter                        1111111       80.000 +U         101                        scooter                        1111111      180.000
复制代码


  • -U 更新(先前)行数据

  • +U 更新(新)行的数据

Upsert stream


Upsert stream: An upsert stream is a stream with two types of messages, upsert messages and delete messages. A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key. A dynamic table with a unique key is transformed into a stream by encoding INSERT and UPDATE changes as upsert messages and DELETE changes as delete messages.


upsert 流包含两种类型的 message: upsert messages 和_delete messages_。转换为 upsert 流的动态表需要(可能是组合的)唯一键。通过将 INSERT 和 UPDATE 操作编码为 upsert message,将 DELETE 操作编码为 delete message 。



如上图,在 mysql 执行 update 操作


update inventory.debezium_products set weight=180 where id=101;


ChangeLog 转为 Retract stream 会在 dynamic table 写入以下数据


op          id                           name                    description       weight               EXPR$4 -D         101                        scooter                        1111111      180.000                    1 +I         101                        scooter                        1111111      190.000                    1
复制代码

ChangeLog Stream 写入 clickhosue 方案


由于 clickhosue 以下特性,ChangeLog Stream 写入 clickhosue 需要相应解决方案


  • clickhosue 不适合大量单条数据的写请求,因为写入过快时后台合并不过来,会报 Too many parts 等错误

  • clickhosue 不适合高频繁的数据更新和删除操作,因为变更数据的聚合处理需要时间,短期内可能出现数据不准的现象,同时 clickhosue 对事务支持也不够完善。


通过以下方案,解决 ChangeLog Stream 写入 clickhosue 存在以上局限问题


  • 使用 CollapsingMergeTree 表承接 Retract stream 与 Upsert stream 写入。


  • 使用 VersionedCollapsingMergeTree 表承接 Versioned Table 的 Retract stream 写入。


  • 使用 ReplacingMergeTree 表承接 Append-only stream 或特定场景(没有物理删除)Retract stream 写入。


  • 某些数据量小,变更不频繁维表的 Upsert stream 可以使用 other engines of this family (*MergeTree)表承接写入


  • 按数据批次大小以及批次间隔两个条件控制写入频率,在 part merge 压力和数据实时性两方面取得平衡。


基于以上解决方案,flink-connector-clickhouse 设计如下图,关注"数据万有引力"公众号,私信获取 “flink-connector-clickhouse-1.15.1-SNAPSHOT.jar”



  • 首先考虑到 ClickHouse 擅长大批量写入的特点,通过 batch option 可以支持攒批写入,避免频繁写入造成的性能下降问题;同时通过 batchTime option 兼顾数据实时性;两个 option,只要其中一个满足条件就触发 sink,从而在 part merge 压力和数据实时性两方面取得平衡


  • 其次是 ChangeLog Stream 包含大量的更新和删除操作。为了支持频繁变更的数据,将 Flink 的 Retract Stream(回撤流)、Upsert Stream(更新-插入流)含有状态标记的数据流,写入到 ClickHouse 的 CollapsingMergeTree 引擎表中。


  • 然后是 Versioned Table(dynamic table with a PRIMARY KEY constraint and time attribute),通过将 event time 生成 version,根据状态标记生成 sign,再将数据流写入 ClickHouse 的 VersionedCollapsingMergeTree 引擎表中。

CDC 技术选型



在 flink cdc connector 与 flink Debezium Format 对 CDC 技术进行选型,通过上图架构与对比


  • flink cdc connector 需要维护组件更少,实时链路更简单,部署成本低;全量阶段支持数据并发读取,并且支持全量阶段 checkpoint;可以不需要对库或表加锁来保证数据一致性。


  • Debezium 的使用人数多,社区活跃,框架也比较成熟,技术更稳定;在保证数据一致性时,需要对读取的库或表加锁;全量阶段读取阶段,只支持单并发。


虽然 flink cdc 有很多亮点能力,不过项目还在孵化阶段,有些操作不是很丝滑;如果有功力深厚的技术架构团队来驾驭它(陪社区一起成长,拥抱社区并与之合作),flink cdc 可以覆盖业务场景会更深。如果业务场景对稳定要求比较高,同时又不想投入高成本驾驭技术,其实 Debezium 已经可以覆盖很多场景了。


可以将 Debezium 作为 Flink 的嵌入式引擎,作为一个依赖包嵌入到代码库,而不用通过 kafka connector 运行,同样也可以不再需要直接与 MySQL 服务器通信,不需要处理复杂快照、GTID、锁等等优点。同时简化

全过程解决方案


根据上面探索,最终 CDC ChangeLog Stream 实时流 sink 到 CLICKHOUSE 全过程解决方案如上图


  • flink cdc connector 实时捕捉数据变更日志,实现数据全量与实时增量采集。


  • 自研 flink-connector-clickhouse 实现不同 ChangeLog Stream 模式(append\retract\upsert)输出到相应的 clickhosue 表引擎。


  • 在 clickhouse 使用相应表引擎承接上游数据输入。



CDC ChangeLog Stream实时流入CLICKHOUSE最佳姿势


关注"数据万有引力"  公众号私信获取 “flink-connector-clickhouse”


发布于: 刚刚阅读数: 4
用户头像

水滴

关注

技术无产阶级 2020-03-23 加入

弱小和无知不是生存的障碍,傲慢才是。 关注"数据万有引力"  公众号,获取更多数据技术与建设方法论知识。

评论

发布
暂无评论
CDC ChangeLog Stream实时流入CLICKHOUSE最佳姿势_水滴_InfoQ写作社区