写点什么

Apache SeaTunnel(Incubating) 与计算引擎的解耦之道,重构 API 我们做了些什么

  • 2022 年 5 月 23 日
  • 本文字数:4299 字

    阅读完需:约 14 分钟

Apache SeaTunnel(Incubating)与计算引擎的解耦之道,重构API我们做了些什么

Apache SeaTunnel (Incubating) 与 Apache Inlong (Incubating) 的 5 月联合 Meetup 中,第二位分享的嘉宾是来自白鲸开源的高级工程师李宗文。在使用 Apache SeaTunnel (Incubating) 的过程中,他发现了 Apache SeaTunnel (Incubating) 存在的四大问题:Connector 实现次数多、参数不统一、难以支持多个版本的引擎以及引擎升级难的问题。为了解决以上的难题,李宗文将目标放在将 Apache SeaTunnel (Incubating)与计算引擎进行解耦,重构其中 Source 与 Sink API,实现改良了开发体验。


本次演讲主要包含四个部分:


  • Apache SeaTunnel (Incubating)重构的背景和动机

  • Apache SeaTunnel (Incubating)重构的目标

  • Apache SeaTunnel (Incubating)重构整体的设计

  • Apache SeaTunnel (Incubating) Source API 的设计

  • Apache SeaTunnel (Incubating) Sink API 的设计



​李宗文白鲸开源 高级工程师 Apache SeaTunnel(Incubating) & Flink Contributor, Flink CDC & Debezium Contributor


01 重构的背景与动机

01 Apache SeaTunnel(Incubating)与引擎耦合


用过 Apache SeaTunnel (Incubating) 的小伙伴或者开发者应该知道,目前 Apache SeaTunnel (Incubating) 与引擎完全耦合,完全基于 Spark、Flink 开发,其中的配置文件参数都基于 Flink、Spark 引擎。从贡献者和用户的角度出发,我们能发现一些问题。


从贡献者的角度:反复实现 Connector,没有收获感;潜在贡献者由于引擎版本不一致无法贡献社区;


从用户的角度:目前很多公司采用 Lambda 架构,离线作业使用 Spark,实时作业使用 Flink, 使用中就会发现 SeaTunnel 的 Connector 可能 Spark 有,但是 Flink 没有,以及两个引擎对于同一存储引擎的 Connector 的参数也不统一,有较高的使用成本,脱离了 SeaTunnel 简单易用的初衷;还有用户提问说目前支不支持 Flink 的 1.14 版本,按照目前 SeaTunnel 的架构,想要支持 Flink 的 1.14 就必须抛弃之前的版本,因此这也会对之前版本的用户造成很大的问题。


因此,我们不管是做引擎升级或者支持更多的版本的用户都很困难。


另外 Spark 和 Flink 都采用了 Chandy-lamport 算法实现的 Checkpoint 容错机制,也在内部进行了 DataSet 与 DataStream 的统一,以此为前提我们认为解耦是可行的。


02 Apache SeaTunnel(Incubating)与引擎解耦


因此为了解决以上提出的问题,我们有了以下的目标:


  • Connector 只实现一次:针对参数不统一、Connector 多次实现的问题,我们希望实现一个统一的 Source 与 Sink API;

  • 支持多个版本的 Spark 与 Flink 引擎:在 Source 与 Sink API 上再加入翻译层去支持多个版本与 Spark 和 Flink 引擎,解耦后这个代价会小很多。

  • 明确 Source 的分片并行逻辑和 Sink 的提交逻辑:我们必须提供一个良好的 API 去支持 Connector 开发;

  • 支持实时场景下的数据库整库同步:这个是目前很多用户提到需要 CDC 支持衍生的需求。我之前参与过 Flink CDC 社区,当时有许多用户提出在 CDC 的场景中,如果直接使用 Flink CDC 的话会导致每一个表都持有一个链接,当遇到需要整库同步需求时,千张表就有千个链接,该情况无论是对于数据库还是 DBA 都是不能接受的,如果要解决这个问题,最简单的方式就是引入 Canal、Debezium 等组件,使用其拉取增量数据到 Kafka 等 MQ 做中间存储,再使用 Flink SQL 进行同步,这实际已经违背了 Flink CDC 最早减少链路的想法,但是 Flink CDC 的定位只是一个 Connector,无法做全链路的需求,所以该 proposal 在 Flink CDC 社区中没有被提出,我们借着本次重构,将 proposa 提交到了 SeaTunnel 社区中。

  • 支持元信息的自动发现与存储:这一部分用户应该有所体验,如 Kafka 这类存储引擎,没有记录数据结构的功能,但我们在读取数据时又必须是结构化的,导致每次读取一个 topic 之前,用户都必须定义 topic 的结构化数据类型,我们希望做到用户只需要完成一次配置,减少重复的操作。


可能也有同学有疑惑为什么我们不直接使用 Apache Beam,Beam 的 Source 分为 BOUNDED 与 UNBOUNDED,也就是需要实现两遍,并且有些 Source 与 Sink 的特性也不支持,具体所需的特性在后面会提到;


03 Apache SeaTunnel(Incubating)重构整体的设计


Apache SeaTunnel(Incubating) API 总体结构的设计如上图;


Source & Sink API: 数据集成的核心 API 之一,明确 Source 的分片并行逻辑和 Sink 的提交逻辑,用于实现 Connector;


Engine API:Translation: 翻译层,用于将 SeaTunnel 的 Souce 与 Sink API 翻译成引擎内部可以运行的 Connector;


Execution: 执行逻辑,用于定义 Source、Transform、Sink 等操作在引擎内部的执行逻辑;Table API:


Table SPI:主要用于以 SPI 的方式暴露 Source 与 Sink 接口,并明确 Connector 的必填与可选参数等;


DataType:SeaTunnel 的数据结构,用于隔离引擎,声明 Table Schema 等;


Catalog:用于获取 Table Scheme、Options 等;


Catalog Storage: 用于存储用户定义 Kafka 等非结构化引擎的 Table Scheme 等;


​添加图片注释,不超过 140 字(可选)


从上图是我们现在设想的执行流程:


  • 从配置文件或 UI 等方式获取任务参数;

  • 通过参数从 Catalog 中解析得到 Table Schema、Option 等信息;

  • 以 SPI 方式拉起 SeaTunnel 的 Connector,并注入 Table 信息等;

  • 将 SeaTunnel 的 Connector 翻译为引擎内部的 Connector;

  • 执行引擎的作业逻辑,图中的多表分发目前只存在 CDC 整库同步场景下,其他 Connector 都是单表,不需要分发逻辑;



从以上可以看出,最难的部分是如何将 Apache SeaTunnel(Incubating) 的 Source 和 Sink 翻译成引擎内部的 Source 和 Sink。


当下许多用户不仅把 Apache SeaTunnel (Incubating) 当做一个数据集成方向的工具,也当做数仓方向的工具,会使用很多 Spark 和 Flink 的 SQL,我们目前希望能够保留这样的 SQL 能力,让用户实现无缝升级。



根据我们的调研,如上图,是对 Source 与 Sink 的理想执行逻辑,由于 SeaTunnel 以 WaterDrop 孵化,所以图上的术语偏向 Spark;


理想情况下,在 Driver 上可以运行 Source 和 Sink 的协调器,然后 Worker 上运行 Source 的 Reader 和 Sink 的 Writer。在 Source 协调器方面,我们希望它能支持几个能力。


一、是数据的分片逻辑,可以将分片动态添加到 Reader 中。


二、是可以支持 Reader 的协调。SourceReader 用于读取数据,然后将数据发送到引擎中流转,最终流转到 Source Writer 中进行数据写入,同时 Writer 可以支持二阶段事务提交,并由 Sink 的协调器支持 Iceberg 等 Connector 的聚合提交需求;


04 Source API


通过我们的调研,发现 Source 所需要的以下特性:


  • 统一离线和实时 API:Source 只实现一次,同时支持离线和实时;


  • 能够支持并行读取:比如 Kafka 每一个分区都生成一个的读取器,并行的执行;


  • 支持动态添加分片:比如 Kafka 定于一个 topic 正则,由于业务量的需求,需要新增一个 topic,该 Source API 可以支持我们动态添加到作业中。


  • 支持协调读取器的工作:这个目前只发现在 CDC 这种 Connector 需要支持。CDC 目前都是基于 Netfilx 的 DBlog 并行算法去支持,该情况在全量同步和增量同步两个阶段的切换时需要协调读取器的工作。


  • 支持单个读取器处理多张表:即由前面提到的支持实时场景下的数据库整库同步需求;



对应以上需求,我们做出了基础的 API,如上图,目前代码以提交到 Apache SeaTunnel(Incubating)的社区中 api-draft 分支,感兴趣的可以查看代码详细了解。


如何适配 Spark 和 Flink 引擎


Flink 与 Spark 都在后面统一了 DataSet 与 DataStream API,即能够支持前两个特性,那么对于剩下的 3 个特性:


  • 如何支持动态添加分片?

  • 如何支持协调读取器?

  • 如何支持单个读取器处理多张表?


带着问题,进入目前的设计。



我们发现除了 CDC 之外,其他 Connector 是不需要协调器的,针对不需要协调器的,我们会有一个支持并行的 Source,并进行引擎翻译。


如上图中左边是一个分片的 enumerator,可以列举 source 需要哪些分片,有哪些分片,实时进行分片的枚举,随后将每个分片分发到真正的数据读取模块 SourceReader 中。对于离线与实时作业的区分使用 Boundedness 标记,Connector 可以在分片中标记是否有停止的 Offset,如 Kafka 可以支持实时,同时也可以支持离线。ParallelSource 可以在引擎设置任意并行度,以支持并行读取。



在需要协调器的场景,如上图,需要在 Reader 和 Enumerator 之间进行 Event 传输, Enumerator 通过 Reader 发送的 Event 进行协调工作。Coordinated Source 需要在引擎层面保证单并行度,以保证数据的一致性;当然这也不能良好的使用引擎的内存管理机制,但是取舍是必要的;


对于最后一个问题,我们如何支持单个读取器处理多张表。这会涉及到 Table API 层,通过 Catalog 读取到了所有需要的表后,有些表可能属于一个作业,可以通过一个链接去读取,有些可能需要分开,这个依赖于 Source 是怎么实现的。基于这是一个特殊需求,我们想要减少普通开发者的难度,在 Table API 这一层,我们会提供一个 SupportMultipleTable 接口,用于声明 Source 支持多表的读取。Source 在实现时,要根据多张表实现对应的反序列化器。针对衍生的多表数据如何分离,Flink 将采用 Side Output 机制,Spark 预想使用 Filter 或 Partition 机制。


05 Sink API


目前 Sink 所需的特性并不是很多,经过调研目前发现有三个需求:


  • 幂等写入,这个不需要写代码,主要看存储引擎是否能支持。

  • 分布式事务,主流是二阶段提交,如 Kafka 都是可以支持分布式事务的。

  • 聚合提交,对于 Iceberg、hoodie 等存储引擎而言,我们不希望有小文件问题,于是期望将这些文件聚合成一个文件,再进行提交。


基于以上三个需求,我们有对应的三个 API,分别是 SinkWriter、SinkCommitter、SinkAggregated Committer。SinkWriter 是作为基础写入,可能是幂等写入,也可能不是。SinkCommitter 支持二阶段提交。SinkAggregatedCommitter 支持聚合提交。



理想状态下,AggregatedCommitter 单并行的在 Driver 中运行,Writer 与 Committer 运行在 Worker 中,可能有多个并行度,每个并行度都有自己的预提交工作,然后把自己提交的信息发送给 Aggregated Committer 再进行聚合。


目前 Spark 和 Flink 的高版本都支持在 Driver(Job Manager)运行 AggregatedCommitter,worker(Job Manager)运行 writer 和 Committer。



但是对于 Flink 低版本,无法支持 AggregatedCommitter 在 JM 中运行,我们也进行翻译适配的设计。Writer 与 Committer 会作为前置的算子,使用 Flink 的 ProcessFunction 进行包裹,支持并发的预提交与写入工作,基于 Flink 的 Checkpoint 机制实现二阶段提交,这也是目前 Flink 的很多 Connector 的 2PC 实现方式。这个 ProcessFunction 会将预提交信息发送到下游的 Aggregated Committer 中,Aggregated Committer 可以采用 SinkFunction 或 Process Function 等算子包裹,当然,我们需要保证 AggregatedCommitter 只会启动一个,即单并行度,否则聚合提交的逻辑就会出现问题。


感谢各位的观看,如果大家对具体实现感兴趣,可以去 Apache SeaTunnel (Incubating) 的社区查看 api-draft 分支代码,谢谢大家。

发布于: 刚刚阅读数: 4
用户头像

还未添加个人签名 2022.03.07 加入

Apache SeaTunnel(Incubating) 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台。

评论

发布
暂无评论
Apache SeaTunnel(Incubating)与计算引擎的解耦之道,重构API我们做了些什么_Apache_Apache SeaTunnel_InfoQ写作社区