一次打通 FlinkCDC 同步 Mysql 数据
业务痛点
离开了业务谈技术都是耍流氓。我们来聊聊基于业务的痛点,衍生出来的多种数据同步方案。
业务中常见的需要数据同步的场景
1、多个库的表合并到一张表。不同的业务线或者微服务在不同的数据库里开发,但是此时有些报表需要将多个库的类似的数据合并后做查询统计。或者,某些历史原因,类似刚开始的商业模式不清晰,导致一些业务线分分合合。或者某些边缘业务逐步融合到了主业务。早起的数据是分开的,业务运营也是分开,后来又合并成了一个大块业务。
2、某个数据需要写到多个存储中。业务数据需要写入到多个中间件或者存储中,比如业务的数据存储再 Mysql 的数据中,后来为了方便检索需要写入到 ES,或者为了缓存需要写入到 Redis,或者是 Mysql 分表的数据合并写入到 Doris 中。
3、数据仓库的场景。比如将表里的数据实时写入到 DWS 数据仓库的宽表中。
4、应急场景。如果不采专用 CDC 的方案,那么要达到实时查询的效果,只能在 BFF 层的代码调用多个中心层的查询 API,然后再 BFF 层做各种聚合,运算。这种方式开发效率低下,万一有的中心层没有提供合适的查询 API,临时开发的话,会让开发进度不可控。
总之,不管是数据多写、还是多表合并、还是建立数据仓库,都属于数据同步任务。
数据同步为什么需要独立的系统来做
这种任务放在业务代码里做,是不可持续的。你要尽量让业务系统解耦,专注于做业务的事情,这种数据同步的任务应该交给专门的系统来做。如果在业务系统中增加额外的数据同步功能,同时为了提高数据同步的可用性,就需要写许多数据同步的代码和容错的代码(效率问题、并发问题、数据一致性问题、集群问题等等),这会让业务系统不堪重负,到后期业务系统几乎会达到不可维护的地步。
CDC 登场
基于以上问题,本场数据同步的主角 FlinkCDC 就登场了,FlinkCDC 是专门为数据同步(同步+计算)而生。通过 CDC 工具,可以将数据同步任务从业务系统中解耦出来,同时还可以将一份变动的数据,写入到多个存储中。这种方式不但让业务系统解耦,而且可以让数据同步任务更加健壮,方便后续的维护。
CDC 原理
CDC 是什么
CDC 是变更数据捕获(Change Data Capture)技术的缩写,它可以将源数据库(Source)的增量变动记录,同步到一个或多个数据目的(Sink)。在同步过程中,还可以对数据进行一定的处理,例如过滤、关联、分组、统计等。
目前专业做数据库事件接受和解析的中间件是 Debezium,如果是捕获 Mysql,还有 Canal。
Debezium 官方https://debezium.io/
Debezium 官方定义:Debezium is an open source distributed platform for change data capture. Start it up, point it at your databases, and your apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases. Debezium is durable and fast, so your apps can respond quickly and never miss an event, even when things go wrong。翻译过来则是:Debezium 是一个用于变更数据捕获的开源分布式平台。 启动它,将其指向您的数据库,您的应用程序就可以开始响应其他应用程序提交给您的数据库的所有插入、更新和删除操作。 Debezium 耐用且快速,因此即使出现问题,您的应用程序也可以快速响应并且不会错过任何事件。
CDC 原理
CDC 的原理是,当数据源表发生变动时,会通过附加在表上的触发器或者 binlog 等途径,将操作记录下来。下游可以通过数据库底层的协议,订阅并消费这些事件,然后对数据库变动记录做重放,从而实现同步。这种方式的优点是实时性高,可以精确捕捉上游的各种变动。
FlinkCDC
FlinkCDC 是什么
官网地址:https://ververica.github.io/flink-cdc-connectors/
官方定义:This project provides a set of source connectors for Apache Flink® directly ingesting changes coming from different databases using Change Data Capture(CDC)。根据 FlinkCDC 官方给出的定义,FlinkCDC 提供一组源数据的连接器,使用变更数据捕获的方式,直接吸收来自不同数据库的变更数据。
为什么是 FlinkCDC
1、FlinkCDC 提供了对 Debezium 连接器的封装和集成,简化了配置和使用的过程,并提供了更高级的 API 和功能,例如数据格式转换、事件时间处理等。Flink CDC 使用 Debezium 连接器作为底层的实现,将其与 Flink 的数据处理能力结合起来。通过配置和使用 Flink CDC,您可以轻松地将数据库中的变化数据流转化为 Flink 的 DataStream 或 Table,并进行实时的数据处理、转换和分析。
2、Flink 的 DataStream 和 SQL 比较成熟和易用
3、Flink 支持状态后端(State Backends),允许存储海量的数据状态
4、Flink 有更好的生态,更多的 Source 和 Sink 的支持
数据流向对比
数据合并流向:
数据多写流向:
技术方案比较
网上有数据同步的多种技术方案的比较,我只挑选我实践过的 2 种做个比较,Canal 和 FlinkCDC。
数据链路对比
通过下图,我们可以看到 Canal 处理数据的链路比 FlinkCDC 更长,数据链路一旦变长意味着,出错的可能性更高。
我在实践 Canal 的过程中,监听到 Kafka 之后,通过一个 Springboot 项目的微服务项目去监听 Kafka 处理业务逻辑,这种负责度更高,内部数据关联啥的也是调用 Dubbo API,我不建议你也使用这种方法。当然啦,这是我没遇到 Flink 之前的方案,嘻嘻。当然还用过更差的方案,定时任务扫描,再写入别的库,哈哈。
变更数据的结构
Mysql 单次提交多条数据的时候,Canal 拿到的数据是 1 条数据,FlinkCDC 拿到的是多条数据。FlinkCDC 的这种方式更便于处理。
canal 数据格式:
FlinkCDC 数据格式:
如何使用
FlinkCDC 同步数据,有两种方式,一种是 FlinkSQL 的方式,一种是 Flink DataStream 和 Table API 的方式。为了方便管理,这两种方式我都写在代码里。
前置准备
1、准备好 Flink 集群。FlinkCDC 也是以任务的形式提交到 Flink 集群去执行的。可以按照 Flink 官网进行下载安装:https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/try-flink/local_installation/
2、开启 Mysql 的 binlog。这一步自行解决。
FlinkSQL 方式
为了方便管理,FlinkSQL 方式也是用 Java 代码写
1、创建 database
2、创建 source 表
注意类型是'connector' = 'mysql-cdc'
。
3、创建 sink 表
注意类型是'connector' = 'jdbc'
。
4、执行 insert。
如果需要多表关联的,可以注册多个'connector' = 'jdbc'
的源表,然后这里编写类似insert into select join
这样代码
FlinkSQL 方式结束,此时只要 source 表有变动,那么会自动监听到数据,自动插入到新的表中。
DataStream 和 Table API 方式
个人觉得这种方式虽说有些繁琐,但是灵活度更好,可以用 Java 代码处理很多逻辑,比 SQL 更灵活些。
1、监听 source
2、清洗数据(过滤、转换等等)
此处逻辑比较自定义,文中是过滤掉了不相关的表,然后过滤掉了删除数据的 log。
过滤掉不相关的表。
过滤掉删除数据的 log
处理业务逻辑,过滤掉了部分数据
3、创建自定义 sink,将数据写出去
代码地址
代码里有 2 个夹子,一个是 API 方式的,一个是 SQL 方式的,每种方式了放了 2 个例子,代码地址如下:https://github.com/yclxiao/flink-cdc-demo.git
如果在实践的过程中碰到问题,可以在这里找到我:http://www.mangod.top/articles/2023/03/15/1678849930601.html
评论