写点什么

从“分散”到“统一”,中控技术利用 SeaTunnel 构建高效数据采集框架,核心数据同步任务 0 故障运行!

作者:白鲸开源
  • 2025-09-18
    天津
  • 本文字数:8512 字

    阅读完需:约 28 分钟

作者 | 崔俊乐


引言:对企业而言,数据采集的核心挑战从来不仅仅是“同步”,而是如何在大规模、多元异构的复杂环境下,保障数据的准确性、完整性和时效性。本文将深入探讨中控技术基于 Apache SeaTunnel 构建企业级数据采集框架的实践,重点分享集群高可用配置、性能调优、容错机制及数据质量监控等方面的具体思考与方案。

1、困境:烟囱林立的采集架构与高昂的运维代价

作为深度赋能流程工业的工业 AI 平台型公司,中控技术的全球业务不断发展,目前已拥有近 40 多家全球子公司,服务超 35000 家全球客户。业务的不断扩张对数据工作提出了更高要求:数据不仅要“算得快”,更要“落得准”。为此,我们搭建了流批分离的大数据平台以应对复杂场景。然而,平台本身的复杂度却反向加剧了数据采集、开发和运维的难度,特别是在数据采集这一源头环节,我们面临着严峻挑战:


(1)架构复杂,烟囱林立:我们过去长期依赖多种工具拼凑的方案(如使用 Sqoop 进行批式数据同步至 HDFS,借助 Maxwell/StreamSets 处理数据库增量日志并写入 Kafka/Kudu)。这种“打补丁”式的架构使得技术栈碎片化,维护成本高昂。


(2)运维黑洞,疲于奔命:多套路线意味着双倍的运维监控压力。缺乏统一的监控告警机制,任何一方出现异常(如:同步延迟、资源耗尽),都需要投入大量人力进行排查和“救火”,稳定性难以保障。


(3)能力割裂,难以扩展:当面临新的数据源(国产数据库和 SAP HANA 等数据库),我们需要在不同的工具中寻找适配方案或自行开发插件,无法快速响应业务需求。



上图清晰地展示了过去分散的采集生态。 我们意识到,这种“各自为战”的模式已成为数据中最脆弱的一环,不仅无法匹配公司未来的发展速度,数据质量与时效性存在潜在威胁。打造一个统一、稳定、高效的数据采集框架,已变得至关重要且迫在眉睫。

2、破局:统一采集框架的思考与技术选型

经过深度的分析和思考,我们明确了新技术的五大核心选型标准:


(1) 全面的连接能力:能够完全覆盖公司当前与未来的所有数据源类型(从 MySQL、Oracle、HANA 到 Kafka、StarRocks 等),并同时支持离线与实时两种采集模式,从根本上解决技术栈统一的问题。


(2) 集群稳定性与高可用:框架本身必须是高可用的分布式集群,具备强大的容错能力。即使单个节点故障,整个服务也不应中断,且能够自动恢复,保障数据采集管道的持续运行。


(3) 可靠的数据一致性保障:在任务执行层面,必须提供精确一次(Exactly-Once)或至少一次(At-Least-Once)处理语义,确保在任务因异常中断后能够自动从断点恢复,杜绝数据重复或丢失,这是数据质量的基石。


(4) 强劲的吞吐性能: 必须能够轻松应对我们日均 TB 级的数据增量挑战,其架构应支持水平扩展,可通过增加节点来线性提升同步性能,满足业务高速发展带来的数据增长需求。


(5) 可观测的运维体验:必须提供完善的监控告警机制,能够对数据同步过程中的异常、延迟、吞吐量等关键指标进行实时追踪,并及时通知运维人员,变被动“救火”为主动“预警”。


基于这五大标准,我们对业界主流方案进行了深入的调研与对比测试。最终, Apache SeaTunnel 在所有维度上都表现出色,成为我们破局的最优解。



3、实践:具体实施方案与细节

我们的 Apache SeaTunnel 实践之路,也是项目的成长之路。早期,我们基于 Apache SeaTunnel v2.3.5 进行构建,当时为了满足一些特定的需求(如处理不同数据库表名或字段名的大小写敏感问题),我们进行了一些二次开发工作。


然而,随着 SeaTunnel 社区的飞速发展,新版本的功能和转换器日益完善。当我们将集群顺利升级至 Apache SeaTunnel v2.3.11 时惊喜地发现,过去那些需要定制化开发的需求,在新版本中均已得到原生支持。


目前,我们所有的数据同步任务均基于官方版本实现,实现了零改造,这极大地降低了我们的长期维护成本,并能让我们无缝享受社区带来的最新功能和性能提升。


以下是我们基于 v2.3.11 版本,经过生产环境 TB 级数据量验证的核心实施方案,为我们集群自搭建以来 0 故障的卓越表现,奠定了坚实基础。

(1)集群规划

为保障集群的高可用性,建议优先选择分离模式集群部署,以下是我们使用的资源。


(2)集群关键配置文件

  • seatunnel.yaml 该配置文件主要用于定义作业的执行行为、容错机制和运维监控设置。它通过启用类加载缓存和动态资源分配来优化性能,并通过配置基于 S3 的检查点(Checkpoint)来保障作业的容错与数据一致性。此外,还可以开启指标收集、日志管理以及设置,从而为作业的稳定运行、监控和日常管理提供全面支持。


seatunnel:  engine:    # 类加载器缓存模式:开启后可显著提升作业频繁启停时的性能,减少类加载开销。生产环境建议开启。    classloader-cache-mode: true
# 历史作业数据过期时间(单位:分钟): 3天。超过此时间的已完成作业历史信息将被自动清理。 history-job-expire-minutes: 4320
# 数据备份数量backup-count: 1
# 队列类型:阻塞队列 queue-type: blockingqueue
# 执行信息打印间隔(秒):每隔60秒在日志中打印一次作业执行信息。 print-execution-info-interval: 60
# 作业指标信息打印间隔(秒):每隔60秒在日志中打印一次详细的指标信息。 print-job-metrics-info-interval: 60
slot-service: # 动态Slot管理:开启后,引擎会根据节点资源情况动态分配计算槽位(Slot),提高资源利用率。 dynamic-slot: true
# 检查点(Checkpoint)配置。 checkpoint: interval: 60000 # 两次Checkpoint之间的时间间隔,单位毫秒(ms)。此处为1分钟。 timeout: 600000 # 执行Checkpoint的超时时间,单位毫秒(ms)。此处为10分钟。 storage: type: hdfs # 此处声明存储类型为HDFS,实际存储在下方的S3。 max-retained: 3 # 最多保留的Checkpoint历史数量。旧的Checkpoint会被自动删除以节省空间。 plugin-config: storage.type: s3 # 实际配置存储类型为S3(或MinIO等兼容S3协议的对象存储) fs.s3a.access.key: xxxxxxx # S3兼容存储的访问密钥(Access Key) fs.s3a.secret.key: xxxxxxx # S3兼容存储的私有密钥(Secret Key) fs.s3a.endpoint: http://xxxxxxxx:8060 # S3兼容存储的服务端点(Endpoint)地址 s3.bucket: s3a://seatunel-pro-bucket # 用于存储Checkpoint数据的桶(Bucket)名称 fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider # 认证凭证提供者 # 可观测性配置 telemetry: metric: enabled: true # 开启指标(Metrics)收集 logs: # 启用日志定时删除:开启日志文件的自动清理功能,防止日志占满磁盘。 scheduled-deletion-enable: true
# Web UI 和 REST API 配置 http: enable-http: true # 启用Web UI和HTTP REST API服务 port: 8080 # Web服务绑定的端口号 enable-dynamic-port: false # 禁用动态端口。如果8080被占用,是否启用其他端口。 # 以下为Web UI基础认证配置 enable-basic-auth: true # 启用基础身份认证 basic-auth-username: admin # 登录用户名 basic-auth-password: xxxxxxx # 登录密码
复制代码


  • jvm_master_options 该 JVM 参数配置文件主要用于保障 SeaTunnel 引擎在大规模数据处理时的稳定性和性能。通过设定堆内存与元空间容量来提供基础内存保障,并专门针对 G1 垃圾回收器进行了系列优化,以有效管理内存垃圾、控制回收停顿时间并提升运行效率。


# JVM 堆内存-Xms30g-Xmx30g
# 内存溢出诊断:当发生OOM时自动生成Heap Dump文件,保存至指定路径便于后续分析。-XX:+HeapDumpOnOutOfMemoryError-XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-server
# 元空间:限制最大容量为5GB,防止元数据无限膨胀占用过多本地内存。-XX:MaxMetaspaceSize=5g
# G1垃圾回收器相关配置-XX:+UseG1GC # 启用G1垃圾回收器-XX:+PrintGCDetails # 在日志中打印详细的GC信息-Xloggc:/path/to/gc.log # 将GC日志输出到指定文件-XX:+PrintGCDateStamps # 在GC日志中打印时间戳-XX:MaxGCPauseMillis=5000 # 目标最大GC暂停时间为5000毫秒(5秒)-XX:InitiatingHeapOccupancyPercent=50 # 当堆内存使用率达到50%时启动并发GC周期-XX:+UseStringDeduplication # 启用字符串去重,节省内存空间-XX:GCTimeRatio=4 # 设置GC时间与应用时间的目标比例-XX:G1ReservePercent=15 # 保留15%的堆内存-XX:ConcGCThreads=6 # 设置并发GC阶段使用的线程数为6-XX:G1HeapRegionSize=32m # 设置G1分区大小为32MB
复制代码


  • hazelcast-master.yaml(iMap 存储在自建对象存储)该配置文件定义了 SeaTunnel 引擎集群的底层分布式架构与协同机制。它主要用于建立和管理集群节点间的网络通信。配置还包含了高精度的故障检测心跳机制,以确保能快速发现并处理节点失效问题,保障集群的高可用性。同时,启用了基于 S3 兼容存储的分布式数据持久化功能,将关键状态信息可靠地保存到对象存储中。


hazelcast:  cluster-name: seatunnel  # 集群名称,所有节点需保持一致  network:    rest-api:      enabled: true  # 启用REST API      endpoint-groups:        CLUSTER_WRITE:          enabled: true          DATA:          enabled: true      join:      tcp-ip:        enabled: true  # 使用TCP/IP发现机制        member-list:  # 集群节点列表          - 10.xx.xx.xxx:5801          - 10.xx.xx.xxx:5801          - 10.xx.xx.xxx:5802          - 10.xx.xx.xxx:5802          - 10.xx.xx.xxx:5802    port:      auto-increment: false  # 禁用端口自动递增      port: 5801  # 固定使用5801端口  properties:    hazelcast.invocation.max.retry.count: 20  # 调用最大重试次数    hazelcast.tcp.join.port.try.count: 30  # TCP连接端口尝试次数    hazelcast.logging.type: log4j2  # 使用log4j2日志框架    hazelcast.operation.generic.thread.count: 50  # 通用操作线程数    hazelcast.heartbeat.failuredetector.type: phi-accrual  # 使用Phi-accrual故障检测器    hazelcast.heartbeat.interval.seconds: 2  # 心跳间隔(秒)    hazelcast.max.no.heartbeat.seconds: 180  # 无心跳超时时间(秒)    hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10  # 故障检测阈值    hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200  # 检测样本大小    hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100  # 最小标准差(毫秒)    hazelcast.operation.call.timeout.millis: 150000  # 操作调用超时时间(毫秒)  map:    engine*:      map-store:        enabled: true  # 启用Map存储持久化        initial-mode: EAGER  # 启动时立即加载所有数据        factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory  # 持久化工厂类        properties:          type: hdfs  # 存储类型          namespace: /seatunnel/imap  # 命名空间路径          clusterName: seatunnel-cluster  # 集群名称          storage.type: s3  # 实际使用S3兼容存储          fs.s3a.access.key: xxxxxxxxxxxxxxxx  # S3访问密钥          fs.s3a.secret.key: xxxxxxxxxxxxxxxx  # S3私有密钥          fs.s3a.endpoint: http://xxxxxxx:8060  # S3端点地址          s3.bucket: s3a://seatunel-pro-bucket  # S3存储桶名称          fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider  # 认证提供者
复制代码

(3)采集任务示例

① MySQL-CDC 到 StarRocks


采集 MySQL-CDC 数据需要确保源数据库需要开启了 Binlog,格式为 ROW,需要用户拥有相关权限,并且把对应的 Mysql Jar 包放入 ${SEATUNNEL_HOME}/lib 目录。详情可以参考官网:https://seatunnel.apache.org/zh-CN/docs/2.3.11/connector-v2/source/MySQL-CDC


下方是我们采集 MySQL-CDC 的配置样例。


env {  parallelism = 1 # 并行度设置为1,流式采集只能是1  job.mode = "STREAMING" # 流式作业模式  job.name = cdh2sr # 作业名称标识  job.retry.times = 3 # 作业失败重试次数  job.retry.interval.seconds=180 # 重试间隔时间(秒)}
source { MySQL-CDC { base-url = "jdbc:mysql://xxxxxxx:3306/databasename" # MySQL连接地址 username = "xxxxxxr" # 数据库用户名 password = "xxxxxx" # 数据库密码 table-names = ["databasename.table1","databasename_pro.table2"] # 需要同步的表列表,格式:数据库.表名 startup.mode = "latest" # 从最新位点开始同步 exactly_once = true # 启用精确一次语义 debezium { include.schema.changes = "false" # 不包含Schema变更 snapshot.mode = when_needed # 按需进行快照 } }}
transform { TableRename { plugin_input = "cdc" # 输入插件标识 plugin_output = "rs" # 输出插件标识 convert_case = "LOWER" # 表名转换为小写 prefix = "ods_cdh_databasename_" # 添加表名前缀 }}
sink { StarRocks { plugin_input = "rs" # 输入插件标识(与transform输出一致) nodeUrls = ["xxxxxxx:8030","xxxxxxx:8030","xxxxxxx:8030"] # StarRocks FE节点地址 base-url = "jdbc:mysql://xxxxxxx:3307" # StarRocks MySQL协议地址 username = "xxxx" # StarRocks用户名 password ="xxxxxxx" # StarRocks密码 database = "ods" # 目标数据库 enable_upsert_delete = true # 启用更新删除功能 max_retries = 3 # 写入失败重试次数 http_socket_timeout_ms = 360000 # HTTP超时时间(毫秒) retry_backoff_multiplier_ms = 2000 # 重试退避乘数 max_retry_backoff_ms = 20000 # 最大重试退避时间 batch_max_rows = 2048 # 单批次最大行数 batch_max_bytes = 50000000 # 单批次最大字节数 }}
复制代码


② Oracle-CDC 到 StarRocks


采集 Oracle-CDC 数据需要确保源数据库需要开启了 Logminer,用户拥有相关权限,并且把对应的 OJDBC.Jar 和 Orai18n.jar 包放入 ${SEATUNNEL_HOME}/lib目录。详情可以参考官网:https://seatunnel.apache.org/zh-CN/docs/2.3.11/connector-v2/source/Oracle-CDC


值得一提的是,我们采集 Oracle-CDC 过程中遇到的延迟问题,可以优先让 DBA 查询 Logminer 日志切换的次数是否很频繁,官方建议控制在每小时在十次左右,太频繁的切换会导致发生长时间延迟情况的可能。如果次数过大,可以加大单个日志文件大小。其次考虑把 QPS 值极高的表拆分到新的 SeaTunel 任务中。


-- 查询日志切换次数SELECT GROUP#, THREAD#, BYTES/1024/1024 || 'MB' "SIZE", ARCHIVED, STATUS FROM V$LOG;SELECTTO_CHAR(first_time, 'YYYY-MM-DD HH24') AS hour,COUNT(*) AS switch_countFROMv$log_historyWHEREfirst_time >= TRUNC(SYSDATE) - 1 -- 过去一天的数据GROUP BYTO_CHAR(first_time, 'YYYY-MM-DD HH24')ORDER BYhour;

-- 查询日志文件大小SELECT F.MEMBER, L.GROUP#, L.THREAD#, L.SEQUENCE#, L.BYTES/1024/1024 AS SIZE_MB, L.ARCHIVED, L.STATUS, L.FIRST_CHANGE#, L.NEXT_CHANGE#FROM V$LOG L, V$LOGFILE FWHERE F.GROUP# = L.GROUP#ORDER BY L.GROUP#;
复制代码


下方是我们采集 Oracle-CDC 的配置样例。


env {  parallelism = 1 # 并行度为1,流式采集只能是1  job.mode = "STREAMING" # 流式作业模式  job.name = bpm2sr # 作业名称标识  job.retry.times = 3 # 作业失败重试次数   job.retry.interval.seconds=180 # 重试间隔时间(秒)}
source { Oracle-CDC { plugin_output = "cdc" # 输出插件标识 base-url = "jdbc:oracle:thin:@xxxxxx:1521:DB" # Oracle连接地址 username = "xxxxxx" # 数据库用户名 password = "xxxxxx" # 数据库密码 table-names = ["DB.SC.TABLE1","DB.SC.TABLE2"] # 需要同步的表,格式:数据库.模式.表名 startup.mode = "latest" # 从最新位点开始同步 database-names = ["DB"] # 数据库名 schema-names = ["SC"] # 模式名 skip_analyze = true # 跳过表分析 use_select_count = true # 使用统计 exactly_once = true # 启用精确一次语义 connection.pool.size = 20 # 连接池大小 debezium { log.mining.strategy = "online_catalog" # 日志挖掘策略 log.mining.continuous.mine = true # 持续挖掘日志 lob.enabled = false # 禁用LOB支持 internal.log.mining.dml.parser ="legacy" # 使用传统DML解析器 } }}
transform { TableRename { plugin_input = "cdc" # 输入插件标识 plugin_output = "rs" # 输出插件标识 convert_case = "LOWER" # 表名转换为小写 prefix = "ods_crm_db_" # 添加表名前缀 }}
sink { StarRocks { plugin_input = "rs" # 输入插件标识 nodeUrls = ["xxxxxxx:8030","xxxxxxx:8030","xxxxxxx:8030"] # StarRocks FE节点 base-url = "jdbc:mysql://xxxxxxx:3307" # JDBC连接地址 username = "xxxx" # 用户名 password ="xxxxxxx" # 密码 database = "ods" # 目标数据库 enable_upsert_delete = true # 启用更新删除 max_retries = 3 # 最大重试次数 http_socket_timeout_ms = 360000 # HTTP超时时间 retry_backoff_multiplier_ms = 2000 # 重试退避乘数 max_retry_backoff_ms = 20000 # 最大重试退避时间 batch_max_rows = 2048 # 批次最大行数 batch_max_bytes = 50000000 # 批次最大字节数 }}
复制代码

(4)可观测的监控

得益于 SeaTunnel 新版本提供的强大监控指标(Metrics)和我们构建的完善监控体系,我们能够从集群全局和任务粒度两个层面,对数据采集平台的状态了如指掌。我们的监控体系主要包含以下两个维度:


① 集群监控


  • 节点状态:实时监控集群节点个数与存活状态,确保 Worker 节点无异常下线,保障集群处理能力。

  • 集群吞吐:监控集群整体的 SourceReceivedQPS 和 SinkWriteQPS,掌控全局数据流入与流出速率,评估集群负载。

  • 资源状态:监控集群节点的 CPU、内存,为资源扩容或优化提供依据。

  • 网络健康度:通过监控内部心跳与通信延迟,确保集群网络状况良好。



② 任务监控


  • 任务运行状况:实时检查所有任务的运行状态(Running/Failed/Finished),是监控的最基本要求。

  • 数据同步量:监控每个任务的 SourceReceivedCount 和 SinkWriteCount,实时掌握每条数据流水线的吞吐量。

  • 延迟时间:这是 CDC 任务最关键的指标之一,当采集端发生持续发生延迟时发送告警。



4、成效:可衡量收益

经过一段时间的稳定运行,基于 Apache SeaTunnel 构建的新一代数据采集框架为我们带来了显著且可量化的收益,主要体现在以下几个方面:

(1)稳定性:从“疲于奔命”到“高枕无忧”

  • 任务故障率降低超 99%: 旧方案下,每月需处理 1-3 次同步异常。新集群上线至今,核心数据同步任务保持 0 故障运行,未发生因框架本身导致的数据服务中断。

  • 数据一致性达到 100%:依托 Apache SeaTunnel 的 Exactly-Once 语义和强大的 Checkpoint 机制,实现了端到端的精确一次处理,彻底解决了之前可能存在的微量数据重复或丢失问题,数据质量得到根本保障。

  • 可用性大幅提升: 集群的高可用设计确保了服务 99.99% 的可用性,任何单点故障均可在分钟级内自动恢复,对业务透明无感。

(2)效率:开发运维效能的倍增

  • 开发效率提升 50%: 从过去编写维护多套脚本,转变为统一的配置化开发。新数据源的接入从原来的 1-2 人天缩短至 1 分钟内即可完成,效率提升显著。

  • 运维成本降低 70%: 现在仅需通过 Grafana 监控大屏即可掌控全局状态,日均主动运维投入小于 0.5 人时。

  • 数据时效性优化: 数据端到端延迟从分钟级优化至秒级,为实时数据分析和决策提供了坚实基础。

(3)架构:资源优化与统一框架

  • 技术栈统一: 成功将 Sqoop、StreamSets 等多种技术栈统一收口至 Apache SeaTunnel,极大降低了技术复杂度和长期维护成本。

5、展望:未来规划

  • (1)全面云原生化: 我们将积极探索 Apache SeaTunnel 在 Kubernetes 上的原生部署与调度能力,利用其弹性伸缩的特性,实现计算资源的按需分配,进一步优化成本与效率,更好地拥抱混合云与多云战略。

  • (2)智能化运维: 基于已收集的丰富 Metrics 数据,构建 AIOps 能力,实现任务性能的智能预测、故障的根因分析自动定位与智能调参。

6、致谢

在此,我们由衷地感谢 Apache SeaTunnel 开源社区。同时,也要感谢公司内部项目团队的每一位成员,你们的辛勤付出与勇于探索,是此次架构升级得以成功实施的关键。最后,我们衷心祝愿 Apache SeaTunnel 项目未来越来越好,生态愈发繁荣!

发布于: 14 分钟前阅读数: 6
用户头像

白鲸开源

关注

一家开源原生的DataOps商业公司。 2022-03-18 加入

致力于打造下一代开源原生的DataOps 平台,助力企业在大数据和云时代,智能化地完成多数据源、多云及信创环境的数据集成、调度开发和治理,以提高企业解决数据问题的效率,提升企业分析洞察能力和决策能力。

评论

发布
暂无评论
从“分散”到“统一”,中控技术利用SeaTunnel构建高效数据采集框架,核心数据同步任务0故障运行!_开源_白鲸开源_InfoQ写作社区