写点什么

如何将 SeaTunnel MySQL-CDC 与 Databend 高效整合?格式与方案全解析

作者:白鲸开源
  • 2025-06-11
    天津
  • 本文字数:7217 字

    阅读完需:约 24 分钟

如何将SeaTunnel MySQL-CDC与Databend 高效整合?格式与方案全解析

转载自 wubx


SeaTunnel 是一款易用且高性能的分布式数据集成平台,支持实时海量数据同步,具备稳定、高效的处理能力,每天可同步数百亿级别的数据,现已在国内 3000+ 企业的生产环境中广泛应用。


Databend 则是一款云原生的存算分离数据平台,具备弹性、高并发等特性,适用于现代数据处理需求。


本文将聚焦分析 SeaTunnel 中 MySQL-CDC 插件及其 Sink 输出的数据格式,并进一步探讨在实际场景中将 SeaTunnel 与 Databend 进行集成的可行性与实现路径。


SeaTunnel 和 MySQL-CDC

SeaTunnel 的 MySQL CDC 连接器允许从 MySQL 数据库中读取快照数据和增量数据。根据不同的 sink 端,观察 MySQL-CDC 输出的数据是否可以直接被 Databend 使用。


从测试来看,SeaTunnel 所用的 MySQL 同步组件应该是 debezium-mysql-connector(Kafka Connect 也调用该组件)。

source: MySQL-CDC sink: console

任务设定是通过 SeaTunnel 从 MySQL 中同步 wubx.t01 表。配置文件 v2.mysql.streaming.conf


# v2.mysql.streaming.confenv{  parallelism = 1  job.mode = "STREAMING"  checkpoint.interval = 2000}
source { MySQL-CDC { base-url="jdbc:mysql://192.168.1.100:3306/wubx" username="wubx" password="wubxwubx" table-names=["wubx.t01"] startup.mode="initial" }}
sink { Console { }}
复制代码

启动 SeaTunnel

./bin/seatunnel.sh --config ./config/v2.mysql.streaming.conf -m local
复制代码


观察终端上日志

观察到全量同步

SELECT * FROM `wubx`.`t01`
复制代码


获取到的数据如下:


2025-05-07 14:28:21,914 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=1:  SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=INSERT : 1, databend2025-05-07 14:28:21,914 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=2:  SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=INSERT : 3, MySQL2025-05-07 14:28:21,914 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=3:  SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=INSERT : 4, Setunnel01
复制代码


全量同步完成

source 端 insert

insert into t01 values(5,'SeaTunnel');在 SeaTunnel 中可以直接捕获到增量数据,对应的动作为 kind=INSERT。


2025-05-07 14:35:48,520 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=4:  SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=INSERT : 5, SeaTunnel
复制代码

source 端 update

update t01 set c1='MySQL-CDC' where id=5;


2025-05-07 14:36:47,455 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=5:  SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=UPDATE_BEFORE : 5, SeaTunnel2025-05-07 14:36:47,455 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=6:  SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=UPDATE_AFTER : 5, MySQL-CDC
复制代码

source 端 delete

delete from t01 where id=5;


2025-05-07 14:37:33,082 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=7:  SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=DELETE : 5, MySQL-CDC
复制代码


从 console 输出的日志格式较为清晰,非常利于排查问题和后续使用。

source: MySQL-CDC sink: MySQL

通过上述 MySQL-CDC 输出终端的测试,可以确认 insert、update、delete 操作均能被正确捕获和处理。接下来我们测试 MySQL-CDC -> MySQL,对应的配置文件 v2.mysql.streaming.m.conf 如下:


#v2.mysql.streaming.m.confenv{  parallelism = 1  job.mode = "STREAMING"  checkpoint.interval = 2000}
source { MySQL-CDC { base-url="jdbc:mysql://192.168.1.100:3306/wubx" username="wubx" password="wubxwubx" table-names=["wubx.t01"] startup.mode="initial" }}
sink { jdbc { url = "jdbc:mysql://192.168.1.100:3306/wubx?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" user = "wubx" password = "wubxwubx" generate_sink_sql = true # You need to configure both database and table database = wubx table = s_t01 primary_keys = ["id"] schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" data_save_mode="APPEND_DATA" }}
复制代码

启动 SeaTunnel

./bin/seatunnel.sh --config ./config/v2.mysql.streaming.m.conf -m local
复制代码


观察终端上日志

同步过程分析

全量同步语句:


2025-05-07 14:56:01,024 INFO  [e.IncrementalSourceScanFetcher] [debezium-snapshot-reader-0] - Start snapshot read task for snapshot split: SnapshotSplit(tableId=wubx.t01, splitKeyType=ROW<id INT>, splitStart=null, splitEnd=null, lowWatermark=null, highWatermark=null) exactly-once: false2025-05-07 14:56:01,026 INFO  [f.s.MySqlSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Snapshot step 1 - Determining low watermark {ts_sec=0, file=mysql-bin.000058, pos=7737, gtids=12b437c2-ba62-11ec-a554-b4b5b694bca5:1-2215900, row=0, event=0} for split SnapshotSplit(tableId=wubx.t01, splitKeyType=ROW<id INT>, splitStart=null, splitEnd=null, lowWatermark=null, highWatermark=null)2025-05-07 14:56:01,028 INFO  [f.s.MySqlSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Snapshot step 2 - Snapshotting data2025-05-07 14:56:01,028 INFO  [f.s.MySqlSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Exporting data from split 'wubx.t01:0' of table wubx.t012025-05-07 14:56:01,028 INFO  [f.s.MySqlSnapshotSplitReadTask] [debezium-snapshot-reader-0] - For split 'wubx.t01:0' of table wubx.t01 using select statement: 'SELECT * FROM `wubx`.`t01`'2025-05-07 14:56:01,032 INFO  [f.s.MySqlSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Finished exporting 3 records for split 'wubx.t01:0', total duration '00:00:00.004'2025-05-07 14:56:01,033 INFO  [f.s.MySqlSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Snapshot step 3 - Determining high watermark {ts_sec=0, file=mysql-bin.000058, pos=7737, gtids=12b437c2-ba62-11ec-a554-b4b5b694bca5:1-2215900, row=0, event=0} for split SnapshotSplit(tableId=wubx.t01, splitKeyType=ROW<id INT>, splitStart=null, splitEnd=null, lowWatermark=null, highWatermark=null)2025-05-07 14:56:01,519 INFO  [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=972391330309210113, pipelineId=1, taskGroupId=2}] - Finished reading from splits [wubx.t01:0]
复制代码


sink 端写数据对应的 prepare 语句


2025-05-07 14:56:01,708 INFO  [.e.FieldNamedPreparedStatement] [st-multi-table-sink-writer-1] - PrepareStatement sql is:INSERT INTO `wubx`.`s_t01` (`id`, `c1`) VALUES (?, ?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`), `c1`=VALUES(`c1`)
2025-05-07 14:56:01,709 INFO [.e.FieldNamedPreparedStatement] [st-multi-table-sink-writer-1] - PrepareStatement sql is:DELETE FROM `wubx`.`s_t01` WHERE `id` = ?
复制代码


从上述语句可以看出,对应的 binlog event 可以直接处理:


  • insert ,update 可以直接用 INSERT INTO wubx.s_t01 (id, c1) VALUES (?, ?) ON DUPLICATE KEY UPDATE id=VALUES(id), c1=VALUES(c1) 处理

  • delete 使用: DELETE FROM wubx.s_t01 WHERE id = ? 处理

小结

SeaTunnel MySQL-CDC 这块应该比较稳定,底层数据读取使用的是 debezium,这是一个非常成熟的工具,值得信赖。

source: MySQL-CDC sink: s3 format json

本节也重点关注在云环境下的数据同步基座,尤其是如何以最低成本完成数据同步工作。在云上进行数据同步时,需要考虑如何以最低成本完成这项工作。在海外项目中,开发者更倾向于使用 kafka-connect,通常先将数据 Sink 到 S3 中,然后批量处理 S3 中的文件,最终得到一份完整的数据。


直接使用配置文件 v2.mysql.streaming.s3.conf:


env{  parallelism = 1  job.mode = "STREAMING"  checkpoint.interval = 2000}
source { MySQL-CDC { base-url="jdbc:mysql://192.168.1.100:3306/wubx" username="wubx" password="wubxwubx" table-names=["wubx.t01"] startup.mode="initial" }}
sink { S3File { bucket = "s3a://mystage" tmp_path = "/tmp/SeaTunnel/${table_name}" path="/mysql/${table_name}" fs.s3a.endpoint="http://192.168.1.100:9900" fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider" access_key = "minioadmin" secret_key = "minioadmin" file_format_type="json" schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" data_save_mode="APPEND_DATA" }}
复制代码


首先使用 json 格式进行 sink

启动 SeaTunnel

./bin/seatunnel.sh --config ./config/v2.mysql.streaming.s3.conf -m local
复制代码


观察终端上日志

发现全量同步

2025-05-07 15:14:41,430 INFO  [.c.s.f.h.HadoopFileSystemProxy] [hz.main.generic-operation.thread-42] - rename file :[/tmp/SeaTunnel/t01/SeaTunnel/972396021571125249/c679929b12/T_972396021571125249_c679929b12_0_1/NON_PARTITION/T_972396021571125249_c679929b12_0_1_0.json] to [/mysql/t01/T_972396021571125249_c679929b12_0_1_0.json] finish
复制代码


/mysql/t01/T_972396021571125249_c679929b12_0_1_0.json 内容:


{"id":1,"c1":"databend"}{"id":3,"c1":"MySQL"}{"id":4,"c1":"Setunnel01"}{"id":5,"c1":"SeaTunnel"}
复制代码


看到这里感觉有些失望,似乎缺少了 kind 和时间字段。

source 端 insert

接下来 insert into t01 values(6,'SeaTunnel01');


2025-05-07 15:18:59,380 INFO  [.c.s.f.h.HadoopFileSystemProxy] [hz.main.generic-operation.thread-16] - rename file :[/tmp/SeaTunnel/t01/SeaTunnel/972396021571125249/c679929b12/T_972396021571125249_c679929b12_0_130/NON_PARTITION/T_972396021571125249_c679929b12_0_130_0.json] to [/mysql/t01/T_972396021571125249_c679929b12_0_130_0.json] finish
复制代码


T_972396021571125249_c679929b12_0_130_0.json 内容为:


{"id":6,"c1":"SeaTunnel01"}
复制代码

source 端 update

update t01 set c1='MySQL-CDC' where id=5;


2025-05-07 15:20:15,386 INFO  [.c.s.f.h.HadoopFileSystemProxy] [hz.main.generic-operation.thread-9] - rename file :[/tmp/SeaTunnel/t01/SeaTunnel/972396021571125249/c679929b12/T_972396021571125249_c679929b12_0_168/NON_PARTITION/T_972396021571125249_c679929b12_0_168_0.json] to [/mysql/t01/T_972396021571125249_c679929b12_0_168_0.json] finish
复制代码


T_972396021571125249_c679929b12_0_168_0.json 对应内容:


{"id":5,"c1":"SeaTunnel"}{"id":5,"c1":"MySQL-CDC"}
复制代码


一个 update 操作在 json 文件中记录了两条数据,但由于缺少操作类型(kind)和时间字段,难以准确还原数据变更过程。如果包含时间字段,还可以选择保留最新的一条记录。

source 端 delete

delete from t01 where id=5;


2025-05-07 15:22:53,392 INFO  [.c.s.f.h.HadoopFileSystemProxy] [hz.main.generic-operation.thread-6] - rename file :[/tmp/SeaTunnel/t01/SeaTunnel/972396021571125249/c679929b12/T_972396021571125249_c679929b12_0_247/NON_PARTITION/T_972396021571125249_c679929b12_0_247_0.json] to [/mysql/t01/T_972396021571125249_c679929b12_0_247_0.json] finish
复制代码


T_972396021571125249_c679929b12_0_247_0.json 对应的内容


{"id":5,"c1":"MySQL-CDC"}
复制代码


delete 操作同样缺少操作类型(kind),仅记录了一行原始数据,因此难以用于后续的数据处理和溯源。

小结

因此,利用 SeaTunnel 的 S3File sink 以 json 格式进行数据溯源目前并不可行。建议 S3File sink 增加对 maxwell_json 和 debezium_json 格式的支持。
复制代码


https://github.com/apache/SeaTunnel/issues/9278


期待这一功能的完善,这样 SeaTunnel 就可以将所有数据同步到 S3,让 S3 承担消息队列的功能。
复制代码

source: MySQL-CDC sink: Kafka

开源世界非常有趣,如果一个功能无法实现,总会有其他替代方案。因为 MySQL-CDC 底层基于 Debezium,应该支持 Debezium format。


https://SeaTunnel.apache.org/docs/2.3.10/connector-v2/formats/debezium-json 而且还支持 https://SeaTunnel.apache.org/docs/2.3.10/connector-v2/formats/maxwell-json


也就是说,SeaTunnel 为了保持与 debezium 和 maxwell 的兼容性,支持在 sink 到 Kafka 时选择这两种格式。

debezium-json

{    "before": {        "id": 111,        "name": "scooter",        "description": "Big 2-wheel scooter ",        "weight": 5.18    },    "after": {        "id": 111,        "name": "scooter",        "description": "Big 2-wheel scooter ",        "weight": 5.17    },    "source": {        "version": "1.1.1.Final",        "connector": "mysql",        "name": "dbserver1",        "ts_ms": 1589362330000,        "snapshot": "false",        "db": "inventory",        "table": "products",        "server_id": 223344,        "gtid": null,        "file": "mysql-bin.000003",        "pos": 2090,        "row": 0,        "thread": 2,        "query": null    },    "op": "u",    "ts_ms": 1589362330904,    "transaction": null}
复制代码


上述格式的数据在 Databend 或 Snowflake 中都比较容易处理,可以根据对应的
复制代码


    "op": "u",    "ts_ms": 1589362330904,
复制代码


使用 merge into + stream 的方式把数据合并到目标表中。

maxwell-json

{    "database":"test",    "table":"product",    "type":"insert",    "ts":1596684904,    "xid":7201,    "commit":true,    "data":{        "id":111,        "name":"scooter",        "description":"Big 2-wheel scooter ",        "weight":5.18    },    "primary_key_columns":[        "id"    ]}
复制代码


这个 json 体中包含了 type、ts 和主键字段,后续利用 SQL 进行 ELT 处理也非常方便。

小结

也就是说,如果想用 SeaTunnel 输出这种标准的 CDC 格式日志,还需要引入类似 Kafka 的架构:



与群里的小伙伴交流后发现,确实有人这么做,从 Kafka 中将消息同步到 OSS 上。

用 maxwell-json 消息体举例实现和 Databend 的整合

  1. 创建一个用于记录 binlog 消息体明细的 update 表,用于记录明细


create table t01_update(  database varchar,  table varchar,  type varchar,  ts bigint,  xid bigint,  commit boolean,  data variant,  primary_key_columns array(varchar));
复制代码


该表数据源可以从 S3 获取,利用 copy into 可以把对应的数据近实时的加载到 t01_update 这张表里


  1. 创建一个目标表: t01


id int,name varchar,description varchar,weight double);
复制代码


  1. 对 t01_update 表创建一个 stream 用于记录该表的增量


create stream stream_t01_update on table t01_update;
复制代码


  1. 在 Databend 实现该数据合并到目标表中


MERGE INTO t01 AS aUSING (    SELECT         data:id AS id,        data:name AS name,        data:description AS description,        data:weight AS weight,        ts,        type    FROM stream_t01_update    QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) = 1) AS bON a.id = b.idWHEN MATCHED AND b.type = 'update' THEN    UPDATE SET        a.name = b.name,        a.description = b.description,        a.weight = b.weightWHEN MATCHED AND b.type = 'delete' THEN    DELETEWHEN NOT MATCHED THEN    INSERT (id, name, description, weight)    VALUES (b.id, b.name, b.description, b.weight);
复制代码


通过该 SQL,可以实现以窗口去重的方式,将 binlog 原始数据合并到目标表。

SeaTunnel 和 Databend 整合思路

通过对 MySQL-CDC 输出形态的分析,目前有三种方案可以实现 SeaTunnel 和 Databend 的整合:


  • 第一种方式是直接开发 Databend 的 SeaTunnel connector,支持 sink 和 source。这种方式实现简单

  • 第二种方式是在 S3File 中增加对 debezium-json 和 maxwell-json 格式的支持,这是一种较为优雅的方案,后续增量数据可以基于 Databend Stream 提供,方便外部数据源直接获取

  • 第三种方式是引入 Kafka 作为 SeaTunnel 的 Sink,这样可以直接使用 debezium-json 和 maxwell-json 格式的消息体,通过数据治理实现 MySQL-CDC 到 Databend 的同步。这种方式方便多个下游系统订阅 Kafka 中的增量数据。


通过对 SeaTunnel 多种格式输出及行为的测试,我们初步了解了 SeaTunnel MySQL-CDC 的能力,为后续与 Databend 的整合做了准备。SeaTunnel 结合 Spark、Flink 等生态,已经可以胜任大型 CDC 任务。如果有相关实践,欢迎与作者交流分享。

发布于: 刚刚阅读数: 4
用户头像

白鲸开源

关注

一家开源原生的DataOps商业公司。 2022-03-18 加入

致力于打造下一代开源原生的DataOps 平台,助力企业在大数据和云时代,智能化地完成多数据源、多云及信创环境的数据集成、调度开发和治理,以提高企业解决数据问题的效率,提升企业分析洞察能力和决策能力。

评论

发布
暂无评论
如何将SeaTunnel MySQL-CDC与Databend 高效整合?格式与方案全解析_白鲸开源_InfoQ写作社区