TiDB x Flink 数据集成实践
原文来源:https://tidb.net/blog/8f0c72ad
作者简介:胡梦宇,知乎大数据基础架构开发工程师.
张允禹,知乎核心架构开发工程师
盛亮,知乎核心架构开发工程师
1 背景
1.1 TiDB 简介
TiDB 是 PingCAP 公司自主设计、研发的开源分布式关系型数据库,是一款同时支持在线事务处理与在线分析处理 (Hybrid Transactional and Analytical Processing, HTAP)的融合型分布式数据库产品,具备水平扩容或者缩容、金融级高可用、实时 HTAP、云原生的分布式数据库、兼容 MySQL 5.7 协议和 MySQL 生态等重要特性。目标是为用户提供一站式 OLTP (Online Transactional Processing)、OLAP (Online Analytical Processing)、HTAP 解决方案。TiDB 适合高可用、强一致要求较高、数据规模较大等各种应用场景。
TiDB 由于其高可用,高性能,易扩展的特性,在知乎被大规模使用,并应用到一些核心业务场景。
1.2 数据集成平台简介
在大数据场景经常会有这样的需求,将存储在不同数据源(如 MySQL, TiDB, Redis, Kafka, Pulsar 等)内的数据进行联合查询分析,产出一系列重要的报表,业务方用于衡量收益或者调整战略。
为满足上述场景,知乎基于 Flink 构建了数据集成平台,利用 Flink 丰富的 connector 为用户提供了实时和离线的数据同步及清洗功能,这样用户可以选择将不同数据源的数据导入到同一种数据源(通常是 Hive 和 HDFS)中存储, 再借助一系列 OLAP 计算引擎如 MapReduce, Spark, Presto, ClickHouse, Doris 等进行进一步分析处理。
2 TiDB 给数据集成平台带来的挑战
TiDB 在知乎内部主要是用于替代 MySQL, 以解决 MySQL 扩展能力不足的问题。在功能方面,因为 TiDB 兼容 MySQL 协议,所以在绝大多数场景 TiDB 都能完美替代 MySQL, 甚至在某些特定场景(如超大表存储, 高并发写入等)更有优势。因此在很长一段时间内,数据集成平台都没有将 TiDB 作为一个单独的数据源,而是将其作为 MySQL, 使用 MySQL 的相关工具来进行兼容。
在数据集成场景将 TiDB 作为 MySQL 确实很方便省事,但是同时也为我们带来一些问题,下面将分几个小节来说明。
2.1 数据倾斜与数据版本问题
在将 TiDB 作为 MySQL 时,我们使用的是 Flink 社区提供的 flink-jdbc-connector. flink-jdbc-connector 在读取数据时,会使用如下 SQL:
在 Flink 任务真正运行的时候,每个子任务会对分区字段选取不同的范围值做数据分片,从而达到并发抽数的效果。这样进行数据分片会有两个问题:首先是不同的数据分片之间数据版本不一致,因为每一条 SQL 都会发放到不同的 task 去执行,不同的 task 执行的时间是不同的,读到的数据版本也是不同的;其次是需要保证每张表有一个适合分区的字段,一般分区字段会选择有唯一键约束的数值型字段,这样可以保证每一个分区内的数据在分区字段上不会重复,每一个分区内的数据条数可控。以下是我们使用 flink-jdbc-connector 时,遇到的痛点:
数据分区需要自己根据分区字段的上下界设置,比较依赖经验,不够智能;
表内存在唯一键约束的数值型字段,但是不连续(比如雪花算法生成的 id 以及 auto_random 生成的 id 等),这种情况在 TiDB 的超大表上出现的尤为频繁。TiDB 的超大表一般是由原来 MySQL 的分库分表迁移过来的,读写量十分巨大,业务方为了提高吞吐,一般会给主键加上 auto_random 的关键字,导致主键的值跨度非常大而且不连续,几乎无法按照范围进行数据分片;
表内有没有唯一键约束的数值型字段,强行按照非唯一键约束的数值型字段分区,当该字段重复数据较多时,会造成数据分片不均匀,导致 Flink job 运行时间较长或者 OOM 失败;
表内既没有唯一键约束的数值型字段,也没有非唯一键约束的数值型字段,只能一张表一个数据分片,无法做到并发扫表,也会导致 Flink job 运行时间较长或者 OOM 失败;
2.2 从库与多数据中心问题
为了不将所有鸡蛋放在同一个篮子里,知乎目前是采取多数据中心的方案,数据中心按业务场景分为两部分,第一部分是在线数据中心,主要是知乎主站的一些服务(如评论,已读等),存储(如 MySQL, TiDB, Redis 等)和消息系统(如 Kafka, Pulsar 等);第二部分是离线数据中心,主要是离线分析场景的一些大数据计算引擎和存储(如 HDFS, Yarn, Hive 等)。每个数据中心包含多个机房,做备灾与容错。
数据集成平台在读 MySQL 内的数据时,一般是读从库的数据,保证读写分离。其实在很多时候,用户需要从在线数据中心的 MySQL 抽数,导入到离线数据中心的 HDFS, 供 Hive 表查询分析,这样会存在着跨数据中心同步数据的情况,如果同时运行多个同步任务,专线很快就会打满,影响其他跨专线的服务。
跨数据中心数据同步方案常见的有两种,第一种是借助 CDC(Change Data Capture) 的同步方案,将数据打散到每个时间点,避免出现峰值,这种方案通常需要借助消息系统和流式计算引擎等额外的组件,链路较长容易出错,好处是可以直接落异构数据源;第二种是同构数据库的主从复制方案,将从库跨数据中心部署,向从库取数,本质上也是借助 CDC, 好处是大部分数据库原生支持,不需要做额外处理,简单好维护,坏处是从库比较浪费资源。这两种方案不能说谁好谁坏,根据业务场景灵活选取即可,知乎内部这两种方案都有,我们本次讨论的重点是第二种同步方式。
我们将 MySQL 的从库跨数据中心部署,数据集成平台在进行数据同步时,会根据目标数据源所在的数据中心来选择 MySQL 在对应数据中心的从库,架构图大致如下:
同样,数据集成平台在读取 TiDB 内数据时,我们也是从 TiDB 的从库内抽数,只不过这里的从库是由 TiCDC 同步而来。架构图大致如下:
这个方案虽然解决了跨数据中心抽数的问题,但是也引入了一些新的问题。
首先是利用从库同步的方案必定会存在成本问题,不管是 MySQL 从库还是 TiDB 从库,都需要在两个数据中心分别搭建两个同等规模的数据库集群,服务器和人力成本消耗都是非常惊人的,尤其是被当作从库的集群, 除了在抽数的时候被用到,其他时候根本不会被业务所使用,属于资源的极大浪费。
其次是 TiCDC 的问题,在某些极端的场景下,旧版本的 TiCDC 存在着以下问题:
早期版本 TiCDC 在执行 DDL 的时候会阻塞当前集群所有 CDC 任务,而某些大库的索引类 DDL 可能会执行多天,具体可参考 issue, 这是不可接受的。这个问题已经在 v4.0.15 版本被修复,现在的情况为「异步执行 DDL 语句,不阻塞其他 changefeed」;
TiCDC 在同步的过程中会和每一个 region 建立连接,单独启动一个 goroutine, 去监听扫描出来的变更数据,region 多了之后 goroutine 也会变多。当 goroutine 过多时, 一些用于心跳探活的 goroutine 就难以被调度到,这时 TiCDC Owner 会认为 changefeed 挂掉了,然后重新调度任务和检测数据,这个故障可能会重复发生,导致数据延迟。此问题预期在 v4.0.16 会得到修复。
TiCDC 修复、测试以及发版需要一定的时间,而业务方对数据同步又有比较强烈的需求,因此,我们需要重新拟定数据同步的方案。
3 数据集成平台对 TiDB 的优化
数据集成平台对 TiDB 的优化主要分为两部分,首先,我们不再使用 Flink 社区提供的 flink-jdbc-connector, 而是自研了更符合 TiDB 分布式架构的 flink-tidb-connector — TiBigData; 其次,我们对 TiDB 的部署做了一些优化,解决了从库资源浪费的问题。
3.1 TiDB 原生 Connector - TiBigData
数据集成平台对 TiDB 数据同步的第一个优化体现在引擎方面。
3.1.1 基于 TiKV Region 的数据分片
TiBigData 首先解决的就是 TiDB 的数据分片问题,这里需要了解一下 TiKV Region 的概念。
TiKV Region:TiKV 将数据按照 key 的范围划分成大致相等的切片(这些切片就称为 Region),每一个 Region 会有多个副本(通常是 3 个),其中一个副本是 Leader,提供读写服务,其他的称为 Follower,在 3.1 版本后,TiDB 提供了 follower read 功能。TiKV 会确保每个 Region 的大小在一定的范围内:当某个 Region 的大小超过一定限制(默认是 144MB)后,TiKV 会将它分裂为两个或者更多个 Region;当某个 Region 因为大量的删除请求导致 Region 的大小变得更小时,TiKV 会将比较小的两个相邻 Region 合并为一个。
由于 TiDB 的数据存储在 TiKV 上,而 TiKV 具有自己天然的数据分片单位 Region. 一个很自然的想法就是我们在读取 TiDB 内的数据时,能不能绕过 TiDB, 按照 Region 直接读取 TiKV 内的数据,答案是肯定的,tikv-java-client 为我们提供了这个可能。最后 TiBigData 实现了如下功能:
利用 Region 对 TiDB 进行数据分片,每个 Flink Task 只处理一个 Region 的数据,TiKV 会保证每一个数据分片的数据量相差不大,因此不会存在数据倾斜的情况;
分片过程完全自动,无需手动填写配置,并且支持 TiDB 任意表结构,包括无主键表以及 auto_random 主键表;
从 TiKV 内读取数据后,键值对的解码过程放在 Flink 内,节省 TiDB 宝贵的计算资源;
具有更高的并发,理论上并发数可以与 Region 数相同。
TiBigData 读取一个 Region 的时间平均在 6 秒,我们同步的最大表约为 8T, 在 Flink 20 并发,每个 taskmanager 4G 1Core 的情况下,只需要 4 小时。
3.1.2 基于 TiKV MVCC 的 Snapshot Read
TiBigData 解决的第二个问题就是数据版本的问题,在使用 flink-jdbc-connector 时,会存在每个分片的数据版本不同的情况,无法做到全局一致的事务读取。但是 TiKV 是有 MVCC 相关概念的,并且 tikv-java-client 为我们提供了访问不同版本数据的 API, 为我们实现 TiDB 的快照读取提供了可能。TiBigData 对快照读取的支持如下:
支持用户自己设置版本,用户可以读取 TiDB 当前最新版本的数据,也可以读取指定版本的旧数据,这里的版本可以理解为特定时间戳 ;
在 JobGraph 生成时,如果用户未设置版本,TiBigData 会将每个数据分片的版本设置为最新版本,读取最新版本的数据。
这样能保证所有数据分片的版本相同,可以做到分布式抽数发生在同一个事务里。
3.2 基于 Follower Read 的 TiDB “从库”
数据集成平台对 TiDB 数据同步的另一部分优化体现在部署方面。TiDB 为我们的数据同步带来了新的更优秀的方案。
TiKV 使用多副本来保证数据的安全性,使用 Raft 算法实现了分布式环境下面数据的强一致性,一个很自然的想法就是将副本跨数据中心部署,在对 TiDB 抽数时,访问对应数据中心的副本即可。这样做的好处有以下几点:
便于维护,不需要借助 TiCDC 与 第二套 TiDB, 完全通过 TiKV 自身的 Raft 协议维护数据的一致性,没有额外的组件运维负担;
节约成本,相比于需要搭建第二套 TiDB 集群,现在只需要将一个副本跨数据中心部署,无任何额外的成本;
读写分离,跨数据中心放置的副本可以是除 Leader 外的任何角色,只要多数副本在主要的数据中心,就不会对线上业务造成任何影响,完美实现读写分离。
我们在 TiBigData 内实现了 TiKV 的 Follower Read, 支持选取任意角色读取 TiKV 内的数据,包括 Leader, Follower 和 Learner. 架构图如下:
4 展望
目前 TiBigData 已完成的工作都是 TiDB 读取相关,包括:
Flink-TiDB-Connector;
Presto-TiDB-Connector;
MapReduce-TiDB-Connector.
后续我们会对 TiDB 写入做原生的支持,目前已经有一个实验性的 PR, 也欢迎大家一起参与进来。
评论