写点什么

Flink Sql Gateway 的原理与实践

  • 2021 年 12 月 16 日
  • 本文字数:2389 字

    阅读完需:约 8 分钟

1 背景

我们在使用 Flink 开发实时任务时,都会用到框架本身提供的 DataStream API,这使得用户不能不用 Java 或者 Scala 甚至 Python 来编写业务逻辑;这种方式虽然灵活且表达性强,但对用户具有一定的开发门槛,并且随着版本的不断更新,DataStream API 也有很多老版本不兼容的问题。所以 Flink SQL 就成了广大开发用户的最佳选择,之所以 Flink 推出 SQL API,主要是因为 SQL 有如下几个重要特性:

图:SQL重要特性
  1. 声明式 API:用户只关心做什么,而不关心怎么做;

  2. 自动优化:屏蔽底层 API 的复杂性,自动做优化;

  3. 简单易懂:SQL 应用于不同行业和领域,学习成本较低;

  4. 不易变动:语法遵循 SQL 标准规范,不易变动;

  5. 流批统一:同样的 SQL 代码,可以用流和批的方式执行。


虽然 Flink 提供了 SQL 能力,但还是有必要基于 Flink SQL 打造属于自己的平台,目前搭建 SQL 平台的方式有如下几种:

  1. Flink 原生 API:使用 Flink 提供的 SQL API,封装一个通用的 pipeline jar,利用 flink shell 脚本工具提交 sql 任务;

  2. Apache Zeppelin:一款开源产品,利用 notebook 方式管理 sql 任务,目前已经与 Flink 集成,且提供了丰富的 SDK;

  3. Flink Sql Gateway:Flink 官方出品的一个 Sql 网关,用 Rest 方式执行 Flink Sql。


第一种方式缺乏灵活性,且大量提交任务时,有性能瓶颈;而 Zeppelin 虽然功能强大,但页面功能有限,如果要基于 Zeppelin 打造 SQL 平台,要么使用 SDK,要么对 Zeppelin 做重度的二次开发;所以 Flink Sql Gateway 比较适合做平台化建设,因为它是一个独立的网关服务,方便与公司现有系统集成,完全与其它系统解耦,本文也主要阐述 Flink Sql Gateway 的实践与探索。

2 Flink Sql Gateway 简介

2.1 架构

图:Flink Sql Gateway 架构

如上图所示,Flink Sql Gateway 的架构比较简单,主要组件是 SqlGatewayEndpoint,它是基于 Flink 的 RestServerEndpoint 实现的一个 Netty 服务,通过自定义实现多种 handler 来完成 sql 任务的创建和部署,以及管理的能力。SqlGatewayEndpoint 内部主要由 SessionManager(会话管理)组成,SessionManager 维护了一个 session map,而 session 内部主要是一些上下文配置和环境信息。


  1. SqlGatewayEndpoint:基于 RestServerEndpoint 实现的 Netty 服务,对外提供 Rest Api;

  2. SessionManager :会话管理器,管理 session 创建与删除;

  3. Session:一个会话,里面存放着任务所需要的 Flink 配置和上下文环境信息,负责任务的执行;

  4. Classpath:Flink Sql Gateway 启动时会加载 flink 安装目录的 classpath,所以 flink sql gateway 基本上没有除 flink 以外的相关依赖。

2.2 执行流程

sql gateway 其实只是一个普通的 NIO 服务器,每个 Handler 都会持有 SessionManager 的引用,因此可以共同访问同一个 SessionManager 对象。当请求到达时,Handler 会获取请求中的参数,如 SessionId 等,去 SessionManager 中查询对应的 Session,从而执行提交 sql、查询任务状态等工作。请求流程如下图所示:

图:请求流程

创建 session,这是使用 sql gateway 的第一步,SessionManager 会把用户传入的任务执行模式、配置、planner 引擎方式等参数封装成 Session 对象,放入 map 中,并返回 sessionid 给用户;


用户持有 sessionid,发起 sql request 的请求,gateway 根据 sessionid 找到对应的 Session 对象,开始部署 sql job 到 yarn / kubernetes;

2.3 功能

2.3.1 任务部署

Flink Sql Gateway 作为 Flink 的客户端,任务部署直接运用了 Flink 的能力,而 Flink 目前支持三种部署模式:

  1. in Application Mode,

  2. in a Per-Job Mode,

  3. in Session Mode。


三种模式有如下两个区别:

  1. 集群生命周期和资源隔离:per-job mode 的集群生命周期与 job 相同,但有较强的资源隔离保证。

  2. 应用程序的 main()方法是在客户端还是在集群上执行:session mode 和 per-job mode 在客户端上执行,而 application mode 在集群上执行。

图:三种模式

从以上可以看出,Application Mode 为每个应用程序创建一个会话集群,并在集群上执行应用程序的 main() 方法,所以它是 session mode 和 per-job 的一个折中方案。


目前为止,Flink 只支持 jar 包任务的 application mode,所以想要实现 sql 任务的 application mode,需要自己改造实现,后面会讲实现方法。

2.3.2 SQL 能力

Flink Sql Gateway 支持的 Sql 语法如下:

图:Flink Sql Gateway 支持的 Sql 语法


Flink Sql Gateway 支持所有 Flink Sql 语法,但本身也有一些限制:

  1. 不支持多条 sql 执行,多条 insert into 执行会产生多个任务;

  2. 不完整的 set 支持,对于 set 语法支持存在 bug;

  3. Sql Hit 支持不是很友好,写在 sql 里比较容易出错。

3 平台化改造

3.1 SQL 的 application mode 实现

前面说到,flink 不支持 sql 任务的 application mode 部署,只支持 jar 包任务。jar 包任务的 application mode 实现如下图所示:

图:jar 包任务的 application mode 实现
  1. flink-clients 解析出用户的配置和 jar 包信息;

  2. ApplicationConfiguration 里指定了 main 方法的入口类名和入参;

  3. ApplicationDeployer 负责把 Jobmanager 启动,并且启动时执行 Flink Application 的 main 方法。


通过以上流程可以看出,要实现 sql 的 application mode,实现通用执行 sql 的 pipeline jar 是关键:


实现一个执行 sql 的通用 pipeline jar 包,并且预先传到 yarn 或者 k8s,如下所示:

在 ApplicationConfiguration 中指定

pepeline jar 的 main 方法入口和参数:

3.2 多 Yarn 集群支持

目前 Flink 只支持单 Yarn 环境的任务部署,对于拥有多套 Yarn 环境的场景,需要部署多套 Flink 环境,每个 Flink 对应一个 Yarn 环境配置;虽然这种方式能解决问题,但并不是最优的解决方案。熟悉 Flink 应该都知道,Flink 使用 ClusterClientFactory 的 SPI 来生成与外部资源系统(Yarn/kubernetes)的访问介质(ClusterDescriptor),通过 ClusterDescriptor 可以完成与资源系统的交互,比如 YarnClusterDescriptor,它持有 YarnClient 对象,可以完成与 Yarn 的交互;所以对于多 Yarn 环境,我们只要保证 YarnClusterDescriptor 里持有的 YarnClient 对象与 Yarn 环境一一对应即可,代码如下图所示:

作者简介

Zheng OPPO 高级数据平台工程师

主要负责基于 Flink 的实时计算平台开发, 对 Flink 有较丰富的研发经验, 也曾参与过 Flink 社区的贡献。


获取更多精彩内容,请扫码关注[OPPO 数智技术]公众号


发布于: 6 小时前阅读数: 8
用户头像

还未添加个人签名 2019.12.23 加入

OPPO数智技术干货及技术活动分享平台

评论

发布
暂无评论
Flink Sql Gateway的原理与实践