
Apache SeaTunnel Script Upgrade and Parameter Tuning in Practice

  • 2025-03-20 · Guangdong

The author recently completed a dual-engine architecture upgrade and end-to-end parameter tuning of the Apache SeaTunnel synchronization pipeline feeding a real-time data warehouse. I hope this write-up offers some useful ideas; corrections and feedback are welcome!



Apache SeaTunnel version: 2.3.9

Doris version: 2.0.6

MySQL JDBC Connector: 8.0.28

Architecture Upgrade

  • Batch pipeline: raised JDBC parallelism with ID-based partitioned reads, combined with batch parameters (fetch_size=10000 + batch_size=5000), substantially increasing full-sync throughput (see the sketch after this list)

  • Real-time incremental pipeline: introduced the MySQL-CDC connector; the initial snapshot mode plus chunk.size.rows=8096 enables a smooth switch from full to incremental sync, pushing event latency below 500 ms
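
To make the ID-based split concrete, here is a minimal sketch (plain Python, not SeaTunnel code) of how a partition_column/partition_num pair can be turned into per-reader range predicates. The splitting logic inside the real JDBC connector may differ; this only illustrates the idea.

# Hypothetical illustration of ID-range partitioning for parallel JDBC reads.
# SeaTunnel's JDBC source implements this internally; this sketch only shows
# the idea behind partition_column = "id", partition_num = 8.

def split_ranges(min_id: int, max_id: int, partition_num: int):
    """Divide [min_id, max_id] into partition_num contiguous ranges."""
    step = (max_id - min_id + partition_num) // partition_num  # ceiling division
    ranges = []
    for i in range(partition_num):
        lo = min_id + i * step
        if lo > max_id:
            break
        hi = min(lo + step - 1, max_id)
        ranges.append((lo, hi))
    return ranges

base_query = ("select id, activity_name, activity_type, activity_desc, "
              "start_time, end_time, create_time from gmall.activity_info")

# Each range becomes an independent query executed by one parallel reader.
for lo, hi in split_ranges(min_id=1, max_id=80_000, partition_num=8):
    print(f"{base_query} where id >= {lo} and id <= {hi}")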

Stability Enhancements

  • Resource control: dynamic JDBC connection-pool sizing (max_size=20) plus CDC rate limiting (rows_per_second=1000) cut the source database's peak CPU load by 40% (a throttling sketch follows this list)

  • Fault tolerance: Doris two-phase commit (enable-2pc=true) together with checkpointing (checkpoint.interval=10s) shortened failure recovery time by 80%
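
As a rough mental model of the rows_per_second limit, the sketch below (plain Python, illustrative only, not connector code) throttles a reader to a fixed row budget per second; the connector's actual rate limiter may use a different algorithm.

import time

def throttled_read(rows, rows_per_second=1000):
    """Yield rows while keeping throughput at or below rows_per_second."""
    window_start = time.monotonic()
    emitted = 0
    for row in rows:
        if emitted >= rows_per_second:
            # Budget for this one-second window is exhausted: sleep out the rest.
            elapsed = time.monotonic() - window_start
            if elapsed < 1.0:
                time.sleep(1.0 - elapsed)
            window_start = time.monotonic()
            emitted = 0
        emitted += 1
        yield row

The point of the limit is to flatten read pressure on the source database: throughput is capped, but the OLTP workload sharing that MySQL instance is protected.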

Write Optimization

  • Three-way buffer control (buffer-size=10000 + buffer-count=3 + flush.interval=5s) improves the quality of Doris write batches (see the flush sketch after this list)

  • Tablet-granularity control (request_tablet_size=5) improves load balance across BE nodes
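
The three buffer parameters act as independent flush triggers: rows per buffer, number of full buffers, and elapsed time. Here is a minimal sketch of that interplay (plain Python; the names mirror the sink options, but the connector's internals differ, e.g. the real sink flushes on a timer thread rather than only inside write calls):

import time

class BufferedSink:
    """Flush when ANY trigger fires: row count, buffer count, or interval."""

    def __init__(self, buffer_size=10000, buffer_count=3, flush_interval_s=5.0):
        self.buffer_size = buffer_size          # rows per buffer
        self.buffer_count = buffer_count        # full buffers held before flushing
        self.flush_interval_s = flush_interval_s
        self.buffers, self.current = [], []
        self.last_flush = time.monotonic()

    def write(self, row):
        self.current.append(row)
        if len(self.current) >= self.buffer_size:          # trigger 1: buffer full
            self.buffers.append(self.current)
            self.current = []
        if (len(self.buffers) >= self.buffer_count         # trigger 2: buffer count
                or time.monotonic() - self.last_flush >= self.flush_interval_s):  # trigger 3: interval
            self.flush()

    def flush(self):
        batch = [r for buf in self.buffers for r in buf] + self.current
        if batch:
            print(f"stream-load {len(batch)} rows to Doris")  # stand-in for the real request
        self.buffers, self.current = [], []
        self.last_flush = time.monotonic()

Larger, less frequent batches mean fewer Stream Load transactions and fewer small files for Doris to compact, which is what "write batch quality" refers to here.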

Hands-on Demo

Create the Doris table before syncing


-- DROP TABLE IF EXISTS ods.ods_activity_info_full;
CREATE TABLE ods.ods_activity_info_full
(
    `id`            VARCHAR(255) COMMENT 'activity id',
    `k1`            DATE NOT NULL COMMENT 'partition column',
    `activity_name` STRING COMMENT 'activity name',
    `activity_type` STRING COMMENT 'activity type',
    `activity_desc` STRING COMMENT 'activity description',
    `start_time`    STRING COMMENT 'start time',
    `end_time`      STRING COMMENT 'end time',
    `create_time`   STRING COMMENT 'create time'
)
ENGINE = OLAP                -- Doris OLAP engine, suited to high-concurrency analytics
UNIQUE KEY(`id`, `k1`)       -- unique-key model: the (id, k1) combination is unique
COMMENT 'activity info full table'
PARTITION BY RANGE(`k1`) ()  -- range partitioning by date (actual partitions come from the dynamic-partition settings)
DISTRIBUTED BY HASH(`id`)    -- hash bucketing by id, so rows with the same id land on the same node
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1",  -- replica allocation: 1 replica on the default tag
    "is_being_synced" = "false",                           -- whether the table is being synced (normally false)
    "storage_format" = "V2",                               -- storage format version (V2 gives better compression and indexing)
    "light_schema_change" = "true",                        -- lightweight schema change (metadata-only, no data rewrite)
    "disable_auto_compaction" = "false",                   -- keep auto compaction on (merging small files speeds up queries)
    "enable_single_replica_compaction" = "false",          -- no single-replica compaction (keeps replicas consistent)
    "dynamic_partition.enable" = "true",                   -- enable dynamic partitioning
    "dynamic_partition.time_unit" = "DAY",                 -- create one partition per day
    "dynamic_partition.start" = "-60",                     -- keep the last 60 days of history partitions
    "dynamic_partition.end" = "3",                         -- pre-create partitions 3 days ahead
    "dynamic_partition.prefix" = "p",                      -- partition name prefix (e.g. p20240101)
    "dynamic_partition.buckets" = "32",                    -- buckets per partition (affects parallelism)
    "dynamic_partition.create_history_partition" = "true", -- create missing history partitions automatically
    "bloom_filter_columns" = "id,activity_name",           -- bloom filters on hot filter columns to speed up WHERE queries
    "compaction_policy" = "time_series",                   -- time-series compaction policy, a good fit for time-ordered data
    "enable_unique_key_merge_on_write" = "true",           -- merge-on-write for the unique key (less read amplification under real-time updates)
    "in_memory" = "false"                                  -- no full in-memory storage (only worth enabling for small tables)
);
Configure the SeaTunnel MySQL-CDC sync script


env {
  # Environment settings
  parallelism = 8                     # higher parallelism for better throughput
  job.mode = "STREAMING"              # streaming mode for real-time sync
  checkpoint.interval = 10000         # checkpoint interval in milliseconds

  # Rate limiting - avoid putting too much pressure on the source database
  read_limit.bytes_per_second = 10000000  # byte read limit, roughly 10 MB/s
  read_limit.rows_per_second = 1000       # row read limit per second

  # Local checkpoint storage
  execution.checkpoint.data-uri = "file:///opt/seatunnel/checkpoints"
  execution.checkpoint.max-concurrent = 1  # max concurrent checkpoints

  # Performance tuning
  execution.buffer-timeout = 5000     # buffer timeout in milliseconds
  execution.jvm-options = "-Xms4g -Xmx8g -XX:+UseG1GC -XX:MaxGCPauseMillis=100"
}
source {
  MySQL-CDC {
    # Basic connection settings
    # server-id = 5652-5657           # unique ID range for the MySQL replication client
    username = "root"                 # database user
    password = ""                     # database password
    table-names = ["gmall.activity_info"]  # tables to sync
    base-url = "jdbc:mysql://192.168.241.128:3306/gmall?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"

    # CDC-specific settings
    schema-changes.enabled = true     # capture schema changes
    server-time-zone = "Asia/Shanghai"  # server time zone

    # Performance tuning
    snapshot.mode = "initial"         # initial snapshot mode
    snapshot.fetch.size = 10000       # snapshot fetch size
    chunk.size.rows = 8096            # chunk size for parallel snapshotting
    connection.pool.size = 10         # connection pool size

    # Advanced settings
    include.schema.changes = true     # include schema-change events
    scan.startup.mode = "initial"     # startup mode: initial (full + incremental)
    scan.incremental.snapshot.chunk.size = 8096  # incremental snapshot chunk size
    debezium.min.row.count.to.stream.results = 1000  # minimum row count before streaming results

    # Fault tolerance
    connect.timeout = 30000           # connection timeout in milliseconds
    connect.max-retries = 3           # max retry attempts

    # Output table name
    result_table_name = "mysql_cdc_source"
  }
}
# Optional transform step, if the data needs processing
transform {
  Sql {
    source_table_name = "mysql_cdc_source"
    result_table_name = "doris_sink_data"

    # Transform fields as needed; here a partition column k1 is derived
    query = """
      select
        id,
        formatdatetime(create_time,'yyyy-MM-dd') as k1,
        activity_name,
        activity_type,
        activity_desc,
        start_time,
        end_time,
        create_time
      from mysql_cdc_source
    """
  }
}
sink {
  Doris {
    # Basic connection settings
    source_table_name = "doris_sink_data"  # or use "mysql_cdc_source" directly
    fenodes = "192.168.241.128:8030"
    username = "root"
    password = ""
    table.identifier = "ods.ods_activity_info_full"  # Doris target table

    # Transaction and label settings
    sink.enable-2pc = "true"          # two-phase commit for consistency
    sink.label-prefix = "cdc_sync"    # import label prefix

    # Write-mode settings
    sink.properties {
      format = "json"
      read_json_by_line = "true"
      column_separator = "\t"         # column separator
      line_delimiter = "\n"           # row delimiter
      max_filter_ratio = "0.1"        # maximum tolerated error ratio

      # CDC-specific settings - handle the different operation types
      # Doris applies CDC events in upsert fashion
      merge_type = "MERGE"            # merge type: APPEND or MERGE
      delete_enable = "true"          # allow delete operations
    }

    # Performance tuning
    sink.buffer-size = 10000          # buffer size
    sink.buffer-count = 3             # number of buffers
    sink.flush.interval-ms = 5000     # flush interval
    sink.max-retries = 3              # max retry attempts
    sink.parallelism = 8              # write parallelism

    # Doris connection tuning
    doris.config = {
      format = "json"
      read_json_by_line = "true"
      request_connect_timeout_ms = "5000"  # connect timeout
      request_timeout_ms = "30000"         # request timeout
      request_tablet_size = "5"            # tablets per request
    }
  }
}
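
To illustrate what merge_type = "MERGE" and delete_enable = "true" imply downstream, here is a conceptual sketch (plain Python, not the connector's actual code) of how CDC row events can map onto Doris's hidden delete-sign column for a merge-on-write unique-key table; the real connector handles this internally and its exact behavior may differ.

# Conceptual mapping of CDC operations to Doris rows (illustrative only).
# Doris unique-key tables expose a hidden __DORIS_DELETE_SIGN__ column;
# rows flagged with 1 are treated as deletes during the merge.

def cdc_event_to_doris_row(op: str, row: dict):
    """Map a CDC op (INSERT / UPDATE_BEFORE / UPDATE_AFTER / DELETE) to a Doris JSON row."""
    if op in ("INSERT", "UPDATE_AFTER"):
        return {**row, "__DORIS_DELETE_SIGN__": 0}  # upsert by unique key
    if op == "UPDATE_BEFORE":
        return None  # superseded by UPDATE_AFTER on a unique-key table
    if op == "DELETE":
        return {**row, "__DORIS_DELETE_SIGN__": 1}  # tombstone: mark row as deleted
    raise ValueError(f"unsupported CDC operation: {op}")

# Example: a delete event for id=42 becomes a tombstone row.
print(cdc_event_to_doris_row("DELETE", {"id": "42", "k1": "2025-03-20"}))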
Configure the SeaTunnel JDBC sync script


env {
  parallelism = 8
  job.mode = "BATCH"
  checkpoint.interval = 30000

  # Local-filesystem checkpoints
  execution.checkpoint.data-uri = "file:///opt/seatunnel/checkpoints"
  execution.buffer-timeout = 5000

  # JVM tuning
  execution.jvm-options = "-Xms4g -Xmx8g -XX:+UseG1GC -XX:MaxGCPauseMillis=100"
}
source {
  Jdbc {
    result_table_name = "mysql_seatunnel"
    url = "jdbc:mysql://192.168.241.128:3306/gmall?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false&rewriteBatchedStatements=true&useServerPrepStmts=true&cachePrepStmts=true"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 30
    user = "gmall"
    password = "gmall"

    # Partitioned parallel read
    query = "select id, activity_name, activity_type, activity_desc, start_time, end_time, create_time from gmall.activity_info"
    partition_column = "id"
    partition_num = 8

    # Connection pool settings
    connection_pool {
      max_size = 20
      min_idle = 5
      max_idle_ms = 60000
    }

    # Batch settings
    fetch_size = 10000
    batch_size = 5000
    is_exactly_once = true
  }
}
transform {
  Sql {
    source_table_name = "mysql_seatunnel"
    result_table_name = "seatunnel_doris"

    query = """
      select
        id,
        formatdatetime(create_time,'yyyy-MM-dd') as k1,
        activity_name,
        activity_type,
        activity_desc,
        start_time,
        end_time,
        create_time
      from mysql_seatunnel
    """
  }
}
sink {
  Doris {
    source_table_name = "seatunnel_doris"
    fenodes = "192.168.241.128:8030"
    username = "root"
    password = ""
    table.identifier = "ods.ods_activity_info_full"
    sink.enable-2pc = "true"
    sink.label-prefix = "test_json"

    # Tuned Doris write settings
    sink.properties {
      format = "json"
      read_json_by_line = "true"
      column_separator = "\t"
      line_delimiter = "\n"
      max_filter_ratio = "0.1"
    }

    # Batched write settings
    sink.buffer-size = 10000
    sink.buffer-count = 3
    sink.flush.interval-ms = 5000
    sink.max-retries = 3
    sink.parallelism = 8

    doris.config = {
      format = "json"
      read_json_by_line = "true"
      request_connect_timeout_ms = "5000"
      request_timeout_ms = "30000"
      request_tablet_size = "5"
    }
  }
}
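
Once saved, each job is typically submitted with SeaTunnel's own launcher, e.g. ./bin/seatunnel.sh --config <job-file> -m local for the local Zeta engine; the exact flags can vary slightly between SeaTunnel releases and deployment modes, so check the quick-start docs for your version.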


Final data in Apache Doris:



That's all for this article!
