写点什么

Flink CDC 2.0 数据处理流程全面解析

  • 2021 年 11 月 15 日
  • 本文字数:4825 字

    阅读完需:约 16 分钟

8 月份 FlinkCDC 发布 2.0.0 版本,相较于 1.0 版本,在全量读取阶段支持分布式读取、支持 checkpoint,且在全量 + 增量读取的过程在不锁表的情况下保障数据一致性。

Flink CDC2.0 数据读取逻辑并不复杂,复杂的是 FLIP-27: Refactor Source Interface 的设计及对 Debezium Api 的不了解。本文重点对 Flink CDC 的处理逻辑进行介绍, FLIP-27 的设计及 Debezium 的 API 调用不做过多讲解。

本文先以 Flink SQL 案例来介绍 Flink CDC2.0 的使用,接着介绍 CDC 中的核心设计包含切片划分、切分读取、增量读取,最后对数据处理过程中涉及 flink-mysql-cdc 接口的调用及实现进行代码讲解。

案例

全量读取+增量读取 Mysql 表数据,以 changelog-json 格式写入 kafka,观察 RowKind 类型及影响的数据条数。

public static void main(String[] args) {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        EnvironmentSettings envSettings = EnvironmentSettings.newInstance()                .useBlinkPlanner()                .inStreamingMode()                .build();        env.setParallelism(3);        // note: 增量同步需要开启CK        env.enableCheckpointing(10000);        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, envSettings);                    tableEnvironment.executeSql(" CREATE TABLE demoOrders (\n" +                "         `order_id` INTEGER ,\n" +                "          `order_date` DATE ,\n" +                "          `order_time` TIMESTAMP(3),\n" +                "          `quantity` INT ,\n" +                "          `product_id` INT ,\n" +                "          `purchaser` STRING,\n" +                "           primary key(order_id)  NOT ENFORCED" +                "         ) WITH (\n" +                "          'connector' = 'mysql-cdc',\n" +                "          'hostname' = 'localhost',\n" +                "          'port' = '3306',\n" +                "          'username' = 'cdc',\n" +                "          'password' = '123456',\n" +                "          'database-name' = 'test',\n" +                "          'table-name' = 'demo_orders'," +                            //  全量 + 增量同步                   "          'scan.startup.mode' = 'initial'      " +                " )");
            tableEnvironment.executeSql("CREATE TABLE sink (\n" +                "         `order_id` INTEGER ,\n" +                "          `order_date` DATE ,\n" +                "          `order_time` TIMESTAMP(3),\n" +                "          `quantity` INT ,\n" +                "          `product_id` INT ,\n" +                "          `purchaser` STRING,\n" +                "          primary key (order_id)  NOT ENFORCED " +                ") WITH (\n" +                "    'connector' = 'kafka',\n" +                "    'properties.bootstrap.servers' = 'localhost:9092',\n" +                "    'topic' = 'mqTest02',\n" +                "    'format' = 'changelog-json' "+                ")");
            tableEnvironment.executeSql("insert into sink select * from demoOrders");}
复制代码


全量数据输出:

{"data":{"order_id":1010,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:12.189","quantity":53,"product_id":502,"purchaser":"flink"},"op":"+I"}{"data":{"order_id":1009,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:09.709","quantity":31,"product_id":500,"purchaser":"flink"},"op":"+I"}{"data":{"order_id":1008,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:06.637","quantity":69,"product_id":503,"purchaser":"flink"},"op":"+I"}{"data":{"order_id":1007,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:03.535","quantity":52,"product_id":502,"purchaser":"flink"},"op":"+I"}{"data":{"order_id":1002,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:51.347","quantity":69,"product_id":503,"purchaser":"flink"},"op":"+I"}{"data":{"order_id":1001,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:48.783","quantity":50,"product_id":502,"purchaser":"flink"},"op":"+I"}{"data":{"order_id":1000,"order_date":"2021-09-17","order_time":"2021-09-17 17:40:32.354","quantity":30,"product_id":500,"purchaser":"flink"},"op":"+I"}{"data":{"order_id":1006,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:01.249","quantity":31,"product_id":500,"purchaser":"flink"},"op":"+I"}{"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:58.813","quantity":69,"product_id":503,"purchaser":"flink"},"op":"+I"}{"data":{"order_id":1004,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:56.153","quantity":50,"product_id":502,"purchaser":"flink"},"op":"+I"}{"data":{"order_id":1003,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:53.727","quantity":30,"product_id":500,"purchaser":"flink"},"op":"+I"}
复制代码


修改表数据,增量捕获:

## 更新 1005 的值 {"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 02:51:58.813","quantity":69,"product_id":503,"purchaser":"flink"},"op":"-U"}{"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 02:55:43.627","quantity":80,"product_id":503,"purchaser":"flink"},"op":"+U"}
## 删除 1000 {"data":{"order_id":1000,"order_date":"2021-09-17","order_time":"2021-09-17 09:40:32.354","quantity":30,"product_id":500,"purchaser":"flink"},"op":"-D"}
复制代码


核心设计

切片划分

全量阶段数据读取方式为分布式读取,会先对当前表数据按主键划分成多个 Chunk,后续子任务读取 Chunk 区间内的数据。根据主键列是否为自增整数类型,对表数据划分为均匀分布的 Chunk 及非均匀分布的 Chunk。

均匀分布

主键列自增且类型为整数类型(int,bigint,decimal)。查询出主键列的最小值,最大值,按 chunkSize 大小将数据均匀划分,因为主键为整数类型,根据当前 chunk 起始位置、chunkSize 大小,直接计算 chunk 的结束位置。

//  计算主键列数据区间select min(`order_id`), max(`order_id`) from demo_orders;
//  将数据划分为 chunkSize 大小的切片chunk-0: [min,start + chunkSize)chunk-1: [start + chunkSize, start + 2chunkSize).......chunk-last: [max,null)
复制代码


非均匀分布

主键列非自增或者类型为非整数类型。主键为非数值类型,每次划分需要对未划分的数据按主键进行升序排列,取出前 chunkSize 的最大值为当前 chunk 的结束位置。

// 未拆分的数据排序后,取 chunkSize 条数据取最大值,作为切片的终止位置。chunkend = SELECT MAX(`order_id`) FROM (        SELECT `order_id`  FROM `demo_orders`         WHERE `order_id` >= [前一个切片的起始位置]         ORDER BY `order_id` ASC         LIMIT   [chunkSize]      ) AS T
复制代码


全量切片数据读取

Flink 将表数据划分为多个 Chunk,子任务在不加锁的情况下,并行读取 Chunk 数据。因为全程无锁在数据分片读取过程中,可能有其他事务对切片范围内的数据进行修改,此时无法保证数据一致性。因此,在全量阶段 Flink 使用快照记录读取+Binlog 数据修正的方式来保证数据的一致性。

快照读取

通过 JDBC 执行 SQL 查询切片范围的数据记录。

## 快照记录数据读取SQL SELECT * FROM `test`.`demo_orders` WHERE order_id >= [chunkStart] AND NOT (order_id = [chunkEnd]) AND order_id <= [chunkEnd]
复制代码


数据修正

在快照读取操作前、后执行 SHOW MASTER STATUS 查询 binlog 文件的当前偏移量,在快照读取完毕后,查询区间内的 binlog 数据并对读取的快照记录进行修正。

快照读取+Binlog 数据读取时的数据组织结构。



BinlogEvents 修正 SnapshotEvents 规则。

  • 未读取到 binlog 数据,即在执行 select 阶段没有其他事务进行操作,直接下发所有快照记录。

  • 读取到 binlog 数据,且变更的数据记录不属于当前切片,下发快照记录。

  • 读取到 binlog 数据,且数据记录的变更属于当前切片。delete 操作从快照内存中移除该数据,insert 操作向快照内存添加新的数据,update 操作向快照内存中添加变更记录,最终会输出更新前后的两条记录到下游。

修正后的数据组织结构:



以读取切片[1,11)范围的数据为例,描述切片数据的处理过程。c,d,u 代表 Debezium 捕获到的新增、删除、更新操作。

修正前数据及结构:



修正后数据及结构:



单个切片数据处理完毕后会向 SplitEnumerator 发送已完成切片数据的起始位置(ChunkStart, ChunkStartEnd)、Binlog 的最大偏移量(High watermark),用来为增量读取指定起始偏移量。

单个切片数据处理完毕后会向 SplitEnumerator 发送已完成切片数据的起始位置(ChunkStart, ChunkStartEnd)、Binlog 的最大偏移量(High watermark),用来为增量读取指定起始偏移量。

增量切片数据读取

全量阶段切片数据读取完成后,SplitEnumerator 会下发一个 BinlogSplit 进行增量数据读取。BinlogSplit 读取最重要的属性就是起始偏移量,偏移量如果设置过小下游可能会有重复数据,偏移量如果设置过大下游可能是已超期的脏数据。而 Flink CDC 增量读取的起始偏移量为所有已完成的全量切片最小的 Binlog 偏移量,只有满足条件的数据才被下发到下游。

数据下发条件:

  • 捕获的 Binlog 数据的偏移量 > 数据所属分片的 Binlog 的最大偏移量。

例如,SplitEnumerator 保留的已完成切片信息为。



增量读取时,从偏移量 800 开始读取 Binlog 数据 ,当捕获到数据 <data:123, offset:1500> 时,先找到 123 所属快照分片,并找到对应的最大 Binlog 偏移量 800。当前偏移量大于快照读的最大偏移量,则下发数据,否则直接丢弃。

发布于: 2021 年 11 月 15 日阅读数: 7
用户头像

还未添加个人签名 2021.03.07 加入

还未添加个人简介

评论

发布
暂无评论
Flink CDC 2.0 数据处理流程全面解析