写点什么

大数据 -25 Sqoop 增量数据导入 CDC 变化数据捕获 差量同步数据

作者:武子康
  • 2025-06-29
    美国
  • 本文字数:2794 字

    阅读完需:约 9 分钟

大数据-25 Sqoop 增量数据导入 CDC 变化数据捕获 差量同步数据

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

目前 2025 年 06 月 16 日更新到:AI 炼丹日志-29 - 字节跳动 DeerFlow 深度研究框斜体样式架 私有部署 测试上手 架构研究,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 06 月 28 日更新到:Java-58 深入浅出 分布式服务 ACID 三阶段提交 3PC 对比 2PCMyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!目前 2025 年 06 月 13 日更新到:大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节我们完成了如下的内容:


  • Sqoop MySQL 迁移到 Hive

  • Sqoop Hive 迁移数据到 MySQL

  • 编写脚本进行数据导入导出测试

背景介绍

这里是三台公网云服务器,每台 2C4G,搭建一个 Hadoop 的学习环境,供我学习。


  • 2C4G 编号 h121

  • 2C4G 编号 h122

  • 2C2G 编号 h123


CDC

全称为:变化数据捕获(Change Data Capture)我们前面执行的都是全量数据的导入。


  • 如果数据量很小采取完全源数据抽取

  • 如果源数据量很大,则需要抽取发生变化的数据,这种数据抽取模式叫:“变化数据捕获”,简称 CDC。


如果CDC 是侵入式的,那么操作会给源系统带来性能的影响

核心价值

  • 批量 ETL 延迟高(T+1、T+N)将数据库变更实时转成事件流,近实时数仓 / Lakehouse

  • 全量同步开销大,增量采集(只抓日志差异),带宽与存储成本骤降

  • 业务迁移/灾备停机窗口大,持续复制并随时切换,零停机迁移、热备

  • 微服务难以获取一致数据,“Outbox” 模式把行级变更写入 Kafka/Pulsar,事务内发布领域事件

捕获方式与架构剖析

查询比对(Timestamp/Diff)

简易、无侵入;但需轮询,全量扫描易拖慢 OLTP。

触发器/存储过程

变更立即写冗余表;对数据库有侵入,DDL 维护复杂。

日志解析(推荐)

解析 binlog / WAL / redo / redo&undo 等,几乎零额外负载,可拿到行级顺序 + 事务边界,是目前主流方案。

通用拓扑

OLTP DB → CDC Connector → 消息队列/Kafka → 流处理(Flink/Spark)→ 数据仓库/湖(Iceberg / Delta)→ 实时 BI/Feature Store

开源方案

  • Flink CDC:基于 Flink Runtime;内置 MySQL/PostgreSQL/Oracle/Mongo 等 Connector;支持 Incremental Snapshot,3.0/3.1 把所有源迁到统一增量快照框架,CLI 能从 Savepoint 恢复,再平滑扩缩容

  • Debezium:Kafka Connect 插件;配套 Server/Serverless 形态;覆盖 10+ 数据源 2.5 系列支持从 PostgreSQL 16 Stand-by 捕获、MySQL 5.7 退役提示、Mongo Post-image、Serverless Sink 改进

  • Canal / Maxwell:轻量 MySQL/Oracle/Cassandra CDC;早期淘系内部项目社区活跃度下降,适合存量 MySQL 场景

  • NiFi / SeaTunnel / DataX:可视化或脚本式流水线,CDC 作为 Processor / Connector,易上手,适合异构源快速打通

落地设计要点

初始快照 vs. 增量

  • 小表:优先一次性快照。

  • TB 级表:Flink CDC Incremental Snapshot(先分片全量 → 继续拉日志)可在不停机情况下完成。

消息编解码

  • 行级 JSON / Avro + Schema Registry,方便演进。

  • 压缩列式格式(Parquet/Iceberg)在下游落湖时统一写。

Exactly-Once 语义

  • Kafka → Flink Two-Phase commit + checkpoint,写湖时用 Iceberg MERGE ON READ / Hudi MOR 避免重复。

DDL 同步

  • Debezium Schema Change Topic or Flink Catalog 自动下推。

  • 对不支持在线 DDL 的源,可配合 gh-ost / pt-osc。

监控 & 延迟治理

  • endToEnd lag = source LSN - sink committed offset

  • 自治流控:Flink - backlog > 阈值 自动降并发。

常见挑战

  • 高峰写入放大,批量 UPSERT 导致热点分区,主键 Hash + Range 二级分桶;Iceberg Clustered Write

  • Schema Drift,生产库无约束,随意加列删列,强制 PR 审核 + Pipeline DDL Gate

  • 历史补数 (Backfill),早期没开 CDC,需要补齐历史,Flink Changelog-Source + 时间旅行 (Iceberg Snapshot)

  • 多源合并顺序乱序,Shard key 不同、时钟漂移,统一使用源时间戳 + Watermark,或落 Kafka Partition-Key = PK

基于时间戳

抽取过程可以根据某些属性列来判断哪些数据是增量的,最常见的属性列有以下两种:


  • 时间戳:最好有两个列,一个插入时间戳,表示何时创建,一个更新时间戳表示最后一次更新时间。

  • 序列:大多数数据库都提供自增功能,表中的列定义成自增的,很容易得根据该列识别新插入的数据


时间戳最简单且常用的,但是有如下缺点


  • 不能记录删除记录的操作

  • 无法识别多次更新

  • 不具有实时的能力

基于触发器

当执行:INSERTUPDATEDELTE 这些 SQL 语句时,激活数据库的触发器,使用触发器可捕获变更的数据,并把数据保存中间临时表里。大多数场合下,不允许向操作性数据库里添加触发器,且这种会降低系统性能,基本不会采用。

基于快照

可以通过比较源表快照表来得到数据的变化,基于快照的 CDC 可以检测插入、更新、删除等数据,这是相对于时间戳的 CDC 方案的优点。缺点就是需要大量的空间

基于日志

最复杂没有侵入性的就是基于日志的方式,数据库把每个插入、更新、删除都记录到日志里,解析日志文件,就可以获取相关的信息。每个关系型数据库:日志格式不一致没有通用的产品。阿里巴巴的Canal可以完成MySQL日志文件解析。

Append 方式

初始化数据

删除 MySQL 中的数据


-- 删除 MySQL 表中的全部数据truncate table sqoop.goodtbl;
复制代码

删除 Hive 中的数据

-- 删除 Hive 表中的全部数据truncate table mydb.goodtbl;
复制代码

重新生成数据

这个 SQL 是之前章节写的函数方法,如果你第一次看到这里,你可能需要把前边的文章执行一次。


-- 向MySQL的表中插入100条数据call batchInsertTestData(1, 100);
复制代码

导入 Hive

sqoop import \--connect jdbc:mysql://h122.wzk.icu:3306/sqoop \--username hive --password hive@wzk.icu \--table goodtbl \--incremental append \--hive-import \--fields-terminated-by "\t" \--hive-table mydb.goodtbl \--check-column serialNumber \--last-value 50 \-m 1
复制代码


以上参数说明:


  • check-column 用来指定一些列,来检查是否可以作为增量数据进行导入,和关系型数据库自增或时间戳类似。

  • last-value 制定上一次导入检查列指定字段的最大值


检查 Hive

我们通过指令查看 Hive 同步了多少数据过来:


select count(*) from mydb.goodtbl;
复制代码

继续生成

call batchInsertTestData(200, 1000);
复制代码

增量导入

sqoop import \--connect jdbc:mysql://h122.wzk.icu:3306/sqoop \--username hive --password hive@wzk.icu \--table goodtbl \--incremental append \--hive-import \--fields-terminated-by "\t" \--hive-table mydb.goodtbl \--check-column serialNumber \--last-value 100 \-m 1
复制代码

检查 Hive

重新查看 Hive,看看目前同步了多少数据过来


select count(*) from mydb.goodtbl;
复制代码


发布于: 刚刚阅读数: 4
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-25 Sqoop 增量数据导入 CDC 变化数据捕获 差量同步数据_大数据_武子康_InfoQ写作社区