原文来源:https://tidb.net/blog/ac226af1
作者介绍:胡梦宇, 知乎大数据基础架构开发工程师.
最近在 TiBigData 实现了一把 TiDB 的流批一体 HybirdSource,其主要思想是利用 TiKV 的快照机制,从 TiKV 里以批的方式读取 TiDB 的全量数据,然后在 Kafka 里以流的方式读取 TiDB CDC 的增量数据,最后全量加增量合成一张实时的流表。
区别于 Flink 官方提供的 TiDB CDC,这种流批一体的方式更适用于大型 TiDB 表(这里指 1T 以上的表)。这是因为 Flink TiDB CDC 的实现基于 tikv-java-client 的 CDC 客户端,这个客户端目前还处于快速迭代阶段,相比于 TiDB golang 版本的 TiCDC 完善程度会低一些,可能在采集超大型 TiDB 表的 CDC 时,不能满足一些性能上的需求。而 TiBigData 不直接从 TiDB 内读 CDC 数据,而是使用 Kafka 内由 TiCDC 采集的数据,性能完全由 TiCDC 保证。当然,如果 TiDB 表的大小没有超过 1T,直接使用 Flink 官方提供的 TiDB CDC 即可,简单粗暴好使。
CDC 合成的流表输出到下游时,只要下游是支持更新的数据源,就可以构建出一张与原表相同的实时快照表,这在构建实时数据仓库方面是具有重大意义的。
本篇选取数据湖 Iceberg,介绍如何利用 Flink 在 Iceberg 内构建 TiDB 的实时数据。以下内容基于 PingCAP 开源的 TiBigData 实现。
1 部署
本节是部署相关,如果已有相关环境,或者有更好的部署方式做测试(如 docker),则可以跳过本节。
1.1 前置依赖
以下组件的版本仅限本次测试使用,生产环境可使用兼容的版本灵活替换。
1.2 部署 TiDB
本节依赖 Kafka 与 TiUP,请自行参考官方文档进行安装。
利用 TiUP 启动一个单机版 TiDB:
启动 cdc server:
tiup cdc server --pd=http://localhost:2379 --log-file=/tmp/ticdc/ticdc.log --addr=0.0.0.0:8301 --advertise-addr=127.0.0.1:8301 --data-dir=/tmp/log/ticdc
复制代码
将 change log 发送至 Kafka,这里 Kafka 的版本与地址替换成你安装的真实地址:
tiup cdc cli changefeed create --pd=http://127.0.0.1:2379 --sink-uri="kafka://localhost:9092/test_cdc?kafka-version=2.4.0&partition-num=1&max-message-bytes=67108864&replication-factor=1&protocol=default"
复制代码
1.3 部署 Flink
本节将会启动一个单机版 Flink 集群,并且给 Flink 安装相关依赖。
到 Flink 下载页面 下载 Flink 安装包,我们选择 1.13.2 的安装包,下载并解压:
wget http://archive.apache.org/dist/flink/flink-1.13.2/flink-1.13.2-bin-scala_2.12.tgz
tar -zxf flink-1.13.2-bin-scala_2.12.tgz
export FLINK_HOME=${PWD}/flink-1.13.2
复制代码
下载 Iceberg 的 Flink connector 并放入 Flink 的 lib 目录内,我们选择 0.13.1 的版本,因为本次我们测试的 Iceberg catalog 类型是 Hive,所以也需要额外下载一些 Hive 的依赖:
wget https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-flink-runtime-1.13/0.13.1/iceberg-flink-runtime-1.13-0.13.1.jar
cp iceberg-flink-runtime-1.13-0.13.1.jar ${FLINK_HOME}/lib
wget https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-hive-runtime/0.13.1/iceberg-hive-runtime-0.13.1.jar
cp iceberg-hive-runtime-0.13.1.jar ${FLINK_HOME}/lib
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.6_2.12/1.13.2/flink-sql-connector-hive-2.3.6_2.12-1.13.2.jar
cp flink-sql-connector-hive-2.3.6_2.12-1.13.2.jar ${FLINK_HOME}/lib
复制代码
下载 Hadoop 相关的依赖,如果机器已经安装 Hadoop,则可以通过配置 HADOOP_CLASSPATH 环境变量跳过此步骤:
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
cp flink-shaded-hadoop-2-uber-2.8.3-10.0.jar ${FLINK_HOME}/lib
复制代码
配置 HADOOP_CONF,用于解析 HDFS 的高可用地址,如果你是单节点 HDFS 或者是负载均衡代理的地址,则可以跳过此步骤,注意下面的 ${HADOOP_CONF_DIR} 需要换成你真实的 Hadoop 配置目录:
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR}
复制代码
编译 TiDB 的 Flink connector 并放入 Flink 的 lib 内:
git clone git@github.com:tidb-incubator/TiBigData.git
cd TiBigData
mvn clean package -DskipTests -am -pl flink/flink-1.13 -Ddep.flink.version=1.13.2 -Dmysql.driver.scope=compile -Dflink.jdbc.connector.scope=compile -Dflink.kafka.connector.scope=compile
cp TiBigData/flink/flink-1.13/target/flink-tidb-connector-1.13-0.0.5.jar ${FLINK_HOME}/lib
复制代码
此时你的 Flink lib 目录的文件应该如下:
find ${FLINK_HOME}/lib -name *.jar | sort
${FLINK_HOME}/flink-csv-1.13.2.jar
${FLINK_HOME}/flink-dist_2.12-1.13.2.jar
${FLINK_HOME}/flink-json-1.13.2.jar
${FLINK_HOME}/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
${FLINK_HOME}/flink-shaded-zookeeper-3.4.14.jar
${FLINK_HOME}/flink-sql-connector-hive-2.2.0_2.12-1.13.2.jar
${FLINK_HOME}/flink-table_2.12-1.13.2.jar
${FLINK_HOME}/flink-table-blink_2.12-1.13.2.jar
${FLINK_HOME}/flink-tidb-connector-1.13-0.0.5-SNAPSHOT.jar
${FLINK_HOME}/iceberg-flink-runtime-1.13-0.13.1.jar
${FLINK_HOME}/iceberg-hive-runtime-0.13.1.jar
${FLINK_HOME}/log4j-1.2-api-2.12.1.jar
${FLINK_HOME}/log4j-api-2.12.1.jar
${FLINK_HOME}/log4j-core-2.12.1.jar
${FLINK_HOME}/log4j-slf4j-impl-2.12.1.jar
复制代码
编辑 Flink conf,加入测试相关的配置,主要是 slot 数与 checkpoint 相关配置:
echo 'taskmanager.numberOfTaskSlots: 2' >> ${FLINK_HOME}/conf/flink-conf.yaml
echo 'execution.checkpointing.interval: 5s' >> ${FLINK_HOME}/conf/flink-conf.yaml
echo 'execution.checkpointing.timeout: 60s' >> ${FLINK_HOME}/conf/flink-conf.yaml
复制代码
最后,启动 flink:
${FLINK_HOME}/bin/start-cluster.sh
复制代码
2 创建同步任务
本节描述如何在 Flink 内创建 TiDB 到 Iceberg 的同步任务。
首先在 TiDB 内创建一张带主键的表,并插入一些数据:
mysql --host 127.0.0.1 --port 4000 -uroot --database test
CREATE TABLE `test`.`people`(
`id` bigint primary key,
`name` varchar(16)
);
INSERT INTO `test`.`people` values(1,'name1'),(2,'name2'),(3,'name3');
复制代码
启动 Flink SQL Client:
${FLINK_HOME}/bin/sql-client.sh
复制代码
以下操作均发生在 Flink SQL Client 内。
创建 Iceberg catalog,并在 Iceberg 内创建一张 Iceberg 的表,用于实时接受 TiDB 的数据:
CREATE CATALOG `iceberg` WITH(
'type' = 'iceberg',
'property-version' = '1',
'catalog-type' = 'hive',
'hive-conf-dir' = '/etc/hive/conf',
'cache-enabled' = 'false'
);
CREATE TABLE `iceberg`.`default`.`people` (
`id` BIGINT,
`name` STRING,
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'format-version'='2',
'engine.hive.enabled' = 'true'
);
复制代码
创建 TiDB catalog:
CREATE CATALOG `tidb`
WITH (
'type' = 'tidb',
'tidb.database.url' = 'jdbc:mysql://localhost:4000/',
'tidb.username' = 'root',
'tidb.password' = '',
'tidb.streaming.source' = 'kafka',
'tidb.streaming.codec' = 'json',
'tidb.streaming.kafka.bootstrap.servers' = 'localhost:9092',
'tidb.streaming.kafka.topic' = 'test_cdc',
'tidb.streaming.kafka.group.id' = 'test_cdc_group',
'tidb.streaming.ignore-parse-errors' = 'true'
);
复制代码
读取 TiDB 的数据并插入到 Iceberg 内:
INSERT INTO `iceberg`.`default`.`people` SELECT * FROM `tidb`.`test`.`people`;
复制代码
查看 Flink 的 web 页面,一般是 http://localhost:8081 ,如果任务正常启动,则可以进行实时数据的查询。
3 增删改查实时数据
实时任务启动后,我们可以尝试查询数据,此操作发生在 Flink SQL Client 内:
SET 'sql-client.execution.result-mode' = 'tableau';
SELECT * FROM `iceberg`.`default`.`people`;
复制代码
然后我们用 MySQL 客户端在 TiDB 内进行增删改查,在下面每一条 SQL 在 MySQL 客户端执行完毕后,我们都到 Flink SQL Client 内查询 Iceberg 内的数据。
INSERT INTO `test`.`people` values(4,'name4');
DELETE FROM `test`.`people` WHERE id = 1;
UPDATE `test`.`people` SET name = 'name1' WHERE id = 4;
复制代码
当然,由于大部分 OLAP 引擎均兼容 Iceberg,你也可以用 Hive/Presto/Trino/Spark 来查询 Iceberg。最后我们可以看到数据的变化如下:
4 数据校验
实时计算中最常见的问题就是如何保证数据的一致性,因为实时任务链路复杂,依赖太多,可能在任务运行了一段时间后,TiDB 在 Iceberg 中的实时数据与 TiDB 内的真实数据存在误差,本节介绍如何校验 TiDB 与 Iceberg 内的数据。
数据校验的方式简单描述如下:从 Iceberg 与 TiDB 分别导出一份特定版本的快照数据,按照 id 比较每一条数据的内容是否相同。
TiDB 导出快照数据可以直接借助 TiBigData 内的 Flink connector:
CREATE CATALOG `tidb_snapshot`
WITH (
'type' = 'tidb',
'tidb.database.url' = 'jdbc:mysql://localhost:4000/',
'tidb.username' = 'root',
'tidb.password' = ''
);
SELECT * FROM `tidb_snapshot`.`test`.`people`/*+ OPTIONS('tidb.snapshot_version'= '${snapshot}') */;
复制代码
如果表不是特别大,也可以利用 MySQL 客户端直接查询:
set @@tidb_snapshot="2016-10-08 16:45:26";
SELECT * FROM `test`.`people`;
复制代码
下面将介绍如何从 Iceberg 导出特定版本的 TiDB 快照数据。
4.1 Metadata 列介绍
TiBigData 内的 Flink connector 提供了一些 metadata 列,可以获取数据的一些额外信息,用于校验数据。
可选的 metadata 列如下:
想要开启 metadata 列,只需要在 Flink 内创建 TiDB 的 Catalog 时,设置对应属性即可(其他属性已经省略):
CREATE CATALOG `tidb`
WITH (
'tidb.metadata.included' = '*'
);
复制代码
这里配置的 * 的含义是包含所有的 metadata 列,并且不改变它们的列名,如果想选择部分 metadata 列,或者是修改它们的列名(防止列名重复),可以参考下面的配置方式。
比如我们只需要 commit_version 与 source_event,并且想将 commit_version 重命名为 _commit_version,将 source_event 重命名为 _source_event,则配置方式如下:
CREATE CATALOG `tidb`
WITH (
'tidb.metadata.included' = 'commit_version=_commit_version,source_event=_source_event'
);
复制代码
接下来我们以上面的 people 表为例,开启所有的 metadata:
CREATE CATALOG `tidb_snapshot`
WITH (
'type' = 'tidb',
'tidb.database.url' = 'jdbc:mysql://localhost:4000/',
'tidb.username' = 'root',
'tidb.password' = '',
'tidb.metadata.included' = '*'
);
复制代码
开启 metadata 后,我们在 Flink SQL Client 内查看一下表结构,metadata 列会被拼接到表字段的最后:
尝试查询一下这个表,可以看到除了 TiDB 本身已有的列外,metadata 列也被打印出来了:
也就是说,在将 TiDB 的数据写入 Iceberg 的时候,将 metadata 列也一起写入到 Iceberg,这样得到的 Iceberg 表也包含了数据的版本信息。
所以如果想要支持数据校验,Iceberg 建表时,结构应该如下:
CREATE TABLE `iceberg`.`default`.`people` (
`id` BIGINT,
`name` STRING,
`commit_version` BIGINT,
`commit_timestamp` TIMESTAMP,
`source_event` STRING
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'format-version'='2',
'engine.hive.enabled' = 'true'
);
复制代码
4.2 校验
选定一个快照时间,下面用 ${version} 代替,分别从 Iceberg 及 TiDB 内查询对应版本的数据,比较这两份数据是否相同即可。
SELECT * FROM `iceberg`.`default`.`people` WHERE commit_version <= ${snapshot};
SELECT * FROM `tidb_snapshot`.`test`.`people`/*+ OPTIONS('tidb.snapshot_version'= '${snapshot}') */;
复制代码
请注意,比较的时候,只需要按照 id 比较对应的数据,metadata 列的值不需要比较。
你也可以将这两份数据都导入到 Hive 内,利用 Spark 或者 Presto 之类的 OLAP 进行 join 来验证数据的准确性。
评论