写点什么

用 SeaTunnel 同步 MySQL 到 Doris:全量增量 + SQL 过滤

作者:白鲸开源
  • 2025-09-25
    天津
  • 本文字数:1075 字

    阅读完需:约 4 分钟

Apache SeaTunnel 能够实现 MySQL 到 Doris 的全量和增量数据同步,同时也支持 SQL 级别的数据过滤。以下是具体实现方式及功能特点:

全量与增量同步支持

1. 全量同步

实现方式:通过 SeaTunnel 的批处理模式(job.mode = "BATCH"),将 MySQL 的历史数据一次性导入 Doris。支持分片读取(如按主键分片)以提升效率,并可通过配置参数优化并行度、批量写入大小等。


source {  Jdbc {    query = "SELECT * FROM orders"  -- 全量数据读取    partition_column = "id"        -- 分片字段    split.size = 5000              -- 每分片读取行数  }}
复制代码

2. 增量同步

  • 基于时间戳字段:通过WHERE update_time >= '${last_update_time}'动态参数筛选增量数据,需外部系统记录时间点并触发定期任务。

  • 基于 CDC(变更数据捕获):使用 MySQL CDC 连接器实时捕获 Binlog 变更(如插入、更新、删除),同步至 Doris。支持精确一次语义(Exactly-Once)和流式处理。


source {  MySQL-CDC {    startup.mode = "latest"  -- 从最新位点开始同步    table-names = ["db.table"]  }}
复制代码

SQL 级数据过滤支持

1. 数据源层过滤

  • 在源端 SQL 中定义过滤条件:通过 source 模块的 query 参数直接筛选数据,例如仅同步特定状态或时间范围的数据。


source {  Jdbc {    query = """      SELECT * FROM orders       WHERE status = 1 AND create_time > '2025-01-01'    """  }}
复制代码


  1. 转换层过滤 Transform 模块的 SQL 转换:在 transform 阶段通过自定义 SQL 对数据进行清洗、过滤或字段映射。


transform {  Sql {    query = "SELECT id, name FROM source WHERE amount > 1000"  -- 过滤金额小于1000的数据  }}
复制代码

关键配置与注意事项

  1. Doris Sink 参数


需配置 Doris FE 节点地址、批量写入参数(batch_size)、数据合并策略(merge_type)等。


sink {  Doris {    fenodes = "doris_fe:8030"    batch_size = 10000    stream_load_properties = { "merge_type" = "MERGE" }  }}
复制代码


  1. CDC 同步依赖条件


MySQL 需开启 Binlog 并配置ROW模式,用户需具备SELECT, REPLICATION SLAVE权限。


  1. 性能调优


全量同步建议分片读取避免单节点压力;增量同步可调整 Flink 或 Zeta 引擎的并行度以提升吞吐量。

与其他工具的对比

  • SeaTunnel 优势:支持批流一体、SQL 级灵活性、精确一次语义,适合复杂场景;相比之下,DataX 仅支持离线批量,Sqoop 局限于 Hadoop 生态 123。

  • 与 CloudCanal 对比:虽然 CloudCanal 提供可视化界面,但 SeaTunnel 的插件化架构和开源特性更适合自定义开发。


原文链接:https://blog.csdn.net/a772304419/article/details/146341445

用户头像

白鲸开源

关注

一家开源原生的DataOps商业公司。 2022-03-18 加入

致力于打造下一代开源原生的DataOps 平台,助力企业在大数据和云时代,智能化地完成多数据源、多云及信创环境的数据集成、调度开发和治理,以提高企业解决数据问题的效率,提升企业分析洞察能力和决策能力。

评论

发布
暂无评论
用 SeaTunnel 同步 MySQL 到 Doris:全量增量 + SQL 过滤_MySQL_白鲸开源_InfoQ写作社区