T3 出行 Apache Kyuubi Flink SQL Engine 设计和相关实践
在日前的 Apache SeaTunnel & Kyuubi 联合 Meetup 上,T3 出行大数据平台负责人、 Apache Kyuubi committer 杨华和 T3 出行高级大数据工程师李心恺共同分享了 Apache Kyuubi(Incubating) 在 T3 出行的最新实践与应用,包括基于 Kyuubi 设计的 Flink SQL Engine,Kyuubi 与 Apache Linkis 的集成,以及在 T3 出行的落地实践。
JDBC 之于 Flink 的现状
首先我们来聊一下 Apache Flink 社区 JDBC 的现状,Flink 在最初是一个流式计算(data flow)的模型,后来将批处理也引入了 data flow 上来,形成了“流批一体”的理念。
Flink 的批处理逐步在发展,但相比于一些成熟的离线计算引擎,比如 Hive、Spark SQL 而言,还不是很成熟,表现之一就是对 JDBC 规范的支持比较欠缺。当然 Flink 社区为此也做出了一些努力,这里给大家分享两来自 Flink 的德国的母公司 Ververica 所开源的两个项目,一个叫做 Flink SQL Gateway,一个叫做 Flink JDBC Driver。
这两个项目结合起来是能够让 Flink 支持 JDBC 的,但是这两个项目已经有一年多的时间不太活跃了,最后的一个贡献还停留在 2020 年。根据 Ververica 的现状估计,这两个项目再次活跃的可能性并不大。
Flink SQL Gateway 和 Flink JDBC Driver 的设计和实现,这是我基于源码画出的,大致的就是这两个组件结合起来,跟 Flink 交互提供对 JDBC 的支持。我们可以分成三层来看,最底下的 JDBC Driver 内部是通过 RESTful API 跟 Flink SQL Gateway 的进程所提供的服务去交互,走的是一个 HTTP 的协议。Flink SQL Gateway 可以看作是一个围绕 flink-client 包装出来的服务,在内部引入了 Session 的概念,并且提供了 SessionManager,可以认为它对资源做了一些 share 或者 cache 的能力,主入口是 SQLGatewayEndpoint,内部也提供了很多 Operation 的实现。
这些 Operation 可以分为两大类,一类是本地的 Operation,在内存中对元数据进行管理,比如说 catalog 数据;另外一类是 JobOperation。这两类 Operation 去跟 Flink Cluster 交互,一个是 Insert ,一个是 Selector,向 Flink 集群去提交作业。
我们可以看到这两个设计和实现是专门针对 Flink 的。虽然 Flink 提出了流批一体的概念,但据我们了解,目前只用 Flink 来把流和批处理的能力全部实现的公司并不多,绝大部分公司还处于 Spark 和 Flink 两个引擎共存的局面。所以对于一些通用能力,比如说多租户或者是对 JDBC 的支持,我们希望能够提供一个抽象了的实现,能够同时 Flink 和 Spark 这样的引擎,未来能够支持更多的引擎,所以我们的考虑,就是直接基于 Kyuubi 来 Flink SQL Engine 的集成。
Flink SQL Engine 的设计与实现
下面介绍 Flink SQL Engine 的设计和实现。这是我画的一个高层抽象的组件的交互图,也可以拆成三个层次。最底下是一些 JDBC 或者 REST 的 client,他们会直接跟 Kyuubi Server 来交互。Kyuubi Server 内部分了两层的实现,最前端是 frontend layer,提供了不同协议的 frontend 的实现,后面是 backend layer,再往后这个 backend 会去跟 Engine 的 frontend 去进行交互。
Engine 在 Kyuubi 里面的设计其实也分为 frontend 和 backend 两个层次。在 Flink Engine 的实现,我们提供了 FlinkThrift Frontend Service, backend layer 我们提供了一个 Flink SQL Backend Service,它会去跟 Flink SQL Manager 和 Operation Manager 去交互。
Kyuubi Flink Engine 在整个 Kyuubi 的生态里面所处的位置,是在纵向的第五排 Kyuubi Engine .我们也可以看到社区即将推出的对 Trino(Presto SQL) 的支持,整体来看 Kyuubi 对其他技术的态度其实是很开放的,它并不要求我们对其他组件去做很多的改动来适配这套体系,这在企业技术选型中,对异构的架构特别友好。
Flink SQL QuickStart DEMO
接下来我们来看 Flink SQL QuickStart 的 DEMO,第一步我们是基于 Kyuubi 的源码来构建一个二进制的发布包,因为在 1.5.0 版本之前 release 的包里面, Flink SQL Engine 还没有被包含进去。(目前 Kyuubi 1.5.0 版本已经发布,包含了 Flink SQL Engine (Beta) 的支持,详见https://kyuubi.apache.org/release/1.5.0-incubating.html)
这是 Mac 本机的一个 DEMO,所以我们需要 Hadoop classpath 添加到加到 PATH 环境变量,并启动一个 Flink 本地集群。下一步就可以启动 Kyuubi Server,再利用 Kyuubi 自带的 Beeline 去连上 Flink Engine。Kyuubi 会自动拉起一个 Flink Engine,它是一个 Java 进程。
进入到 Beeline 的交互式命令行里面去,选择特定的数据库之后,就可以执行一些 DDL 和 DML 的操作。这里我们的示例是创建一张表,然后调了一个 Insert 的语句。执行结束之后,我们打开 Flink 的 WebUI,就可以看到一个 Insert 的 Job 被提交了,并且已经执行完成。
这个 DEMO 比较简单,最近社区也有小伙伴提供了一个 on YARN 模式的 QuickStart 的支持和文档,有兴趣的小伙伴也可以基于这个文档去了解 on YARN 的提交方式。
Flink SQL Engine 的实现离不开 Flink 社区大佬们的鼎力支持。这里感谢一下阿里巴巴 Blink 团队的蒋晓峰同学,和网易游戏实时团队的林小柏同学,一直在社区里面比较活跃,贡献了很多 PR。
Flink SQL Engine 目前提供的功能,我大致列了列了一些:
常规的 DDL 和 DML 操作,基本上都是支持的
JDBC 规范的一些 Get 或者 Show 相关的方法也已经支持
Flink 所实现的一些设置或者重置属性相关的方法
UDF 的支持
目前的支持的 deploy 模式 Flink Session Standalone / on YARN 模式
Flink SQL Engine 展望
Flink SQL Engine 未来的计划,首先它的 deploy mode 会有一个很大的变化,主要是往 Application 模式和 Session 模式去提供支持,业界已经用得最多的,也是比较成熟的 Per-Job 模式会被废弃掉。Kyuubi 的 Flink Engine 未来应该主要是集中在 Application 模式,Session 模式也支持,但并不是第一选择。
其他的一些规划,包括 Flink SQL Engine on YARN(application) 的支持,Flink SQL Engine on Kubernetes 的支持,另外是对 Kyuubi 共享级别的增强,还有一些使用上的提升,比如支持 Session 里面的 JAR 的管理。当然还有一些可能是需要大家使用之后去反馈的,也欢迎大家参与进来,一起贡献。
Why Kyuubi
下面介绍 Kyuubi 在 T3 出行的一些应用场景。
T3 出行是一个基于车联网驱动的平台,以车联网的数据为主,基于车联网数据的多样性,T3 出行构建了一个以 Apache Hudi 为基础的企业级数据湖的平台,而且在此之上构建了 BI 平台,任务调度,机器学习,数据质量等等一系列的平台,为业务提供了支撑。随着业务的发展,平台越来越多,对于这些平台的统一管理也越来越复杂,业务小伙伴的使用体验也变差。我们经过一系列的调研选型,决定选择了微众银行开源的 DSS(DataSphere Studio) 作为一站式数据应用的交互管理平台,并且根据公司的实际场景进行了一些定制化的开发。
下图是 DSS 引入 Kyuubi 之前的架构,我们是通过 Kafka 和 CANAL 来定位数据,然后数据进入对象存储,以 Hudi 的格式保存在数据湖之上,资源编排是 YARN,计算引擎主要是 Spark、Hive 和 Flink。计算引擎通过计算中间件 Linkis 来统一管理交互,同时和 DSS 之间做了一个打通,在 Linkis 计算中间件之上构建了 BI 平台和数据地图,数据开发,机器学习等等,这多种平台通过 DSS 这个一站式的入口来统一进入管理。
这个架构在实际使用中遇到了一些问题,比如说跨存储的问题,现在数据分布在 OBS 对象存储, Hudi 的格式存储,还有 ClickHouse、MongoDB 等不同的成熟数据,开发小伙伴就需要写各种代码进行关联分析,或者 ETL 的导入,Linkis 对此解决还是比较有限的。还有 SQL 语法的不统一,比如 Hive 或者原生的 Spark SQL 不支持 upsert、update、delete 等语法操作,还有 MongoDB、ClickHouse 等语法也各不相同,开发转换成本比较高。同时 Linkis 和 Hive、Spark 版本是强耦合的,如果升级 Spark 版本,就需要修改一系列的源码,重新编译,升级的难度比较大;同时 Linkis 中的 Spark 引擎对于 cluster 运行模式、 AQE、动态资源等特性的支持还不完善,改造的成本都比较大。
所以在此之上我们引入 Apache Kyuubi,T3 出行使用了很长时间 Kyuubi,所以我们决定调研一下,看能否把 Kyuubi 和 Linkis 互相打通。Kyuubi 是一个统一的 Thrift JDBC 的服务,对接了 Spark 引擎和 Flink 引擎,Trino 社区也做了一些对接,所以能够管理多种引擎,可以满足 BI、ETL、ad-hoc 的一些场景,同时已经是进入了 Apache 孵化器,为企业级数据湖探索提供了一个标准化的接口,赋予用户调动整个数据湖数据的一个能力,让用户能够像处理普通数据一样处理大数据,是一个 Serverless 的服务。
对比一下 Kyuubi 和 Linkis、Hive on Spark,如下表,Hive 和 Kyuubi 都是 JDBC 的接口,Linkis 自己提供了一些 HTTP 的 REST API,Linkis 主要基于 Spark 引擎或者 HiveQL 之类的语法,Kyuubi 主要是 Spark SQL 或者 Flink SQL 语法,Hive 还是自己的 HQL 语法。
SQL 解析 Hive 是 Server 端,Linkis 和 Kyuubi 都是 Engine 端。任务提交,Hive 拆分了多个 RemoteDriver 提交,Linkis 基于 Server 端进行的一个分布式的线程调度,Kyuubi 自己有一个资源隔离的机制,通过 USER、GROUP 或者 CONNECTION 的策略来隔离资源。
Linkis 和 Spark、Hive 的版本是绑定的,Kyuubi 就比较灵活,支持多版本适配,它和 Server 端和引擎端是分离的,比较便于我们升级和更新迭代。计算资源方面,Linkis 是基于自己的引擎管理,同时它使用的时候它是会把资源给锁住了,Kyuubi 是基于 YARN 和 Kubernetes 的资源调度,在此之上通过 Engine 的维度来进行资源的管理,比 Linkis 要灵活很多。
Linkis 集成 Kyuubi 实践过程
下面介绍 Linkis 和 Kyuubi 的集成流程,Linkis 支持新增一个自定义引擎的,支持多种的引擎类型,我们选择了其中的ComputationExecutor:
这个类型,集成了它的方法,因为它是常用的交互式引擎 Executor,能够处理交互式执行任务,并且具备状态查询、任务 kill 等交互式能力。而 Kyuubi 引擎是一个交互式引擎,所以在此之上实现这个 Execute 是比较合适的。
实现这个引擎,主要是 com 文件要引入 Linkis 的一个相关包,具体可以看 Linkis 的官方文档《如何快速实现新的底层引擎》。
我们要实现这几个模块,KyuubiEngineConnPlugin 是启动引擎的一个连接的入口;KyuubiEngineConnFactory 是实现一个引擎的管理,启动引擎的一个整体的逻辑;KyuubiEngineLaunchBuilder 模块用于封装引擎端管理,是解析启动命令的;实际执行的场景得是 KyuubiExecutor,它直接和 Kyuubi Server 来交互,实现计算逻辑的执行的单元。主要是这几大块的实现,大致的代码架构如下图所示。
Linkis 启动和 Kyuubi 之间的交互主要是通过 Linkis 的 Gateway 转发引擎的一个管理管理模块,通过 Linkis 的引擎管理模块会启动引擎的连接器管理,连接器管理是多个的,对接不同的用户、不同的引擎,来启动执行 Kyuubi 引擎。Kyuubi 引擎再和 Kyuubi Server 之间建立会话,然后通过和 Kyuubi Server 之间交互,比如说查询或者是 DDL 操作,会把这些返回的结果存储到一个 HDFS 临时目录里,该临时目录会返回之前的查询结果给 Gateway,Gateway 再返回给用户的一个实际客户端。
引入 Kyuubi 之后整体架构如下图。主要的变化是计算中间件,是由 Kyuubi 和 Linkis 共同来实现,SQL 模块都交予 Kyuubi 来管理,其他的模块还是 Linkis 来管理,同时和任务调度、计算引擎之间都打通了。后续我们希望能把 Scala 或者 Flink 之类的任务也都集成在 Kyuubi 之上。
Kyuubi 在一站式平台使用场景
Kyuubi 在一站式平台的实际使用场景,可以看到 DSS 数据开发模块,我们直接增加了一个 Kyyubi 的类型。Kyuubi 的类型直接对接 Kyuubi 服务,可以在上面通过一些 SQL 语句进行数据开发,并且实现一站式的开发和 CI/CD 的管理。
开发好的脚本可以任务编排,任务编排里集成了 Kyuubi 类型,可以直接把 Kyuubi 作为一个组件来进行任务编排,关联已经编写好的脚本。这些编写好的脚本有一个发布的功能,这个发布功能是和 DSS 打通的,发布到 DSS 之上的时候相当于一个 SQL 模块,就是把已经写好的一些脚本,使用 Kyuubi 的数据源就发布到发布到 DS 之上,这就形成了一整套 CI/CD 的过程。
之前 DSS 之上对 Linkis 有一系列的 WebUI 的监控管理,而 Kyuubi 在这方面是没有的,所以我们在这基础上加强了 Kyuubi 的后台管理功能,单独开发了一个 Kyuubi Web 的 Server 模块,通过 UI 对用户的操作进行统一的管理监控,主要是对 Kyuubi Server 端进行连接数、引擎数量、JVM 的监控。
还有对用户会话的 Session 能进行接通,这个 Session 直接是调用 Server 端 Session API 获取一些 Session 状态的接口,然后存到一个 MySQL 的持久化存储,同时也可以手动调 Server 端的 API 去关闭 Session。
此外对用户提交使用的一些语句也都能展示,或者统一管理,这样方便后台管理员来管控用户的操作,同时回溯问题的时候也会比较方便。
小结一下,T3 出行大数据平台引入 Apache Kyuubi 后,和 Linkis 功能互补,实现了代码开发、业务上线与调度系统的打通,同时可以收口做到大数据开发 CI/CD 管理,帮助业务部门低门槛上线大数据相关的需求,减轻了数据开发的压力,向我们一站式开发平台的目标更进了一步。也期望 Apache kyuubi 和 Linkis 作为计算中间件的 引领者越来越好!
作者:杨华,李心恺
附视频回放及 PPT 下载:
T3 出行 Apache Kyuubi FlinkSQLEngine 设计和相关实践
延伸阅读:
eBay 基于 Apache Kyuubi 构建统一 Serverless Spark 网关的实践
评论