Apache Kyuubi 在 B 站大数据场景下的应用实践
01 背景介绍
近几年随着 B 站业务高速发展,数据量不断增加,离线计算集群规模从最初的两百台发展到目前近万台,从单机房发展到多机房架构。在离线计算引擎上目前我们主要使用 Spark、Presto、Hive。架构图如下所示,我们的 BI、ADHOC 以及 DQC 服务都是通过自研的 Dispatcher 路由服务来实现统一 SQL 调度,Dispatcher 会结合查询 SQL 语法特征、读 HDFS 量以及当前引擎的负载情况,动态地选择当前最佳计算引擎执行任务。如果用户 SQL 失败了会做引擎自动降级,降低用户使用门槛;其中对于 Spark 查询早期我们都是走 STS,但是 STS 本身有很多性能和可用性上的问题,因此我们引入了 Kyuubi,通过 Kyuubi 提供的多租户、多引擎代理以及完全兼容 Hive Thrift 协议能力,实现各个部门 Adhoc 任务的资源隔离和权限验证。
Query 查询情况
目前在 Adhoc 查询场景下,SparkSQL 占比接近一半,依赖 Kyuubi 对于 Scala 语法的支持,目前已经有部分高级用户使用 scala 语法提交语句执行,并且可以在 SQL 和 Scala 模式做自由切换,这大大丰富了 adhoc 的使用场景。
02 Kyuubi 应用
Kyuubi 是网易数帆大数据团队贡献给 Apache 社区的开源项目。Kyuubi 主要应用在大数据领域场景,包括大数据离线计算、adhoc、BI 等方向。Kyuubi 是一个分布式、支持多用户、兼容 JDBC 或 ODBC 的大数据处理服务。
为目前热门的计算引擎(例如 Spark、Presto 或 Flink 等)提供 SQL 等查询服务。
我们选择 Kyuubi 的原因:
1. 完全兼容 Hive thrift 协议,符合 B 站已有的技术选型。
2. 高可用和资源隔离,对于大规模的生产环境必不可少。
3. 灵活可扩展,基于 kyuubi 可以做更多适配性开发。
4. 支持多引擎代理,为未来统一计算入口打下基础。
5. 社区高质量实现以及社区活跃。
Kyuubi 的架构可以分成三个部分:
1.客户端: 用户使用 jdbc 或者 restful 协议来提交作业获取结果。
2.kyuubi server: 接收、管理和调度与客户端建立的 Kyuubi Session,Kyuubi Session 最终被路由到实际的引擎执行。
3.kyuubi engine: 接受处理 kyuubi server 发送过来的任务,不同 engine 有着不同的实现方式。
03 基于 Kyuubi 的改进
Kyuubi 已经在 B 站生产环境稳定运行一年以上,目前所有的 Adhoc 查询都通过 kyuubi 来接入大数据计算引擎。 在这一年中我们经历了两次大版本的演进过程,从最初 kyuubi 1.3 到 kyuubi 1.4 版本,再从 kyuubi 1.4 升级 kyuubi 1.6 版本。与之前的 STS 相比,kyuubi 在稳定性和查询性能方面有着更好的表现。在此演进过程中,我们结合 B 站业务以及 kyuubi 功能特点,对 kyuubi 进行部分改造。
3.1 增加 QUEUE 模式
Kyuubi Engine 原生提供了 CONNECTION、USER、GROUP 和 SERVER 多种隔离级别。在 B 站大数据计算资源容量按照部门划分,不同部门在 Yarn 上对应不同的队列。我们基于 GROUP 模式进行了改造,实现 Queue 级别的资源隔离和权限控制。
用户信息和队列的映射由上层工具平台统一配置和管理,Kyuubi 只需关心上游 Dispatcher 提交过来 user 和 queue 信息,进行调度并分发到对应队列的 spark engine 上进行计算。目前我们有 20+个 adhoc 队列,每个队列都对应一个或者多个 Engine 实例(Engine pool)。
3.2 在 QUEUE 模式下支持多租户
kyuubi server 端由超级用户 Hive 启动,在 spark 场景下 driver 和 executor 共享同一个的用户名。不同的用户提交不同的 sql, driver 端和 executor 端无法区分当前的任务是由谁提交的,在数据安全、资源申请和权限访问控制方面都存在着问题。
针对该问题,我们对以下几个方面进行了改造。
3.2.1 kyuubi server 端
1. kyuubi server 以 hive principal 身份启动。
2. Dispatcher 以 username proxyUser 身份提交 SQL。
3.2.2 spark engine 端
1. Driver 和 Executor 以 hive 身份启动。
2. Driver 以 username proxyUser 身份提交 SQL。
3. Executor 启动 Task 线程需要以 username proxyUser 身份执行 Task。
4. 同时需要保证所有的公共线程池,绑定的 UGI 信息正确。如 ORC Split 线程池上,当 Orc 文件达到一定数量会启用线程池进行 split 计算,线程池是全局共享,永久绑定的是第一次触发调用的用户 UGI 信息,会导致用户 UGI 信息错乱。
3.3 kyuubi engine UI 展示功能
在日常使用中我们发现 kyuubi 1.3 Engine UI 页面展示不够友好。不同的用户执行不同的 SQL 无法区分的开,session 、job、stage、task 无法关联的起来。
导致排查定位用户问题比较困难,我们借鉴 STS 拓展了 kyuubi Engine UI 页面。我们对以下几个方面进行了改造。
1. 自定义 kyuubi Listener 监听 Spark Job、Stage、Task 相关事件以及 SparkSQL 相关事件:SessionCreate、SessionClose、executionStart、executionRunning、executionEnd 等
2. Engine 执行 SQL 相关操作时,绑定并发送相关 SQL Event,构造 SQL 相关状态事件,将采集的 Event 进行状态分析、汇总以及存储。
3. 自定义 Kyuubi Page 进行 Session 以及 SQL 相关状态实时展示。
Session Statistics 信息展示
SQL Statistics 信息展示
3.4 kyuubi 支持配置中心加载 Engine 参数
为了解决队列之间计算资源需求的差异性,如任务量大的队列需要更多计算资源(Memory、Cores),任务量小的队列需要少量资源,每个队列需求的差异,我们将所有队列的 Engine 相关资源参数统一到配置中心管理。每个队列第一次启动 Engine 前,将查询自己所属队列的参数并追加到启动命令中,进行参数的覆盖。
3.5 Engine 执行任务的进度显示与消耗资源上报功能
任务在执行过程中,用户最关心的就是自己任务的进度以及健康状况,平台比较关心的是任务所消耗的计算资源成本。我们在 Engine 端,基于事件采集 user、session、job、stage 信息并进行存储,启动定时任务将收集的 user、session、job、stage 信息进行关联并进行资源消耗成本计算,并将结果注入对应 operation log 中, 回传给前端日志展示。
任务进度信息展示
查询消耗资源上报展示
04 Kyuubi 稳定性建设
4.1 大结果集溢写到磁盘
在 adhoc 场景中用户通常会拉取大量结果到 driver 中,同一时间大量的用户同时拉取结果集,会造成大量的内存消耗,导致 spark engine 内存紧张,driver 性能下降问题,直接影响着用户的查询体验,为此专门优化了 driver fetch result 的过程,在获取结果时会实时监测 driver 内存使用情况,当 driver 内存使用量超过阈值后会先将拉取到的结果直接写出到本地磁盘文件中,在用户请求结果时再从文件中分批读出返回,增加 driver 的稳定性。
4.2 单个 SQL 的 task 并发数、执行时间和 task 数量的限制
在生产过程中,我们经常性的遇到单个大作业直接占用了整个 Engine 的全部计算资源,导致短作业长时间得不到计算资源,一直 pending 的情况,为了解决这种问题我们对以下几个方面进行优化。
Task 并发数方面:默认情况下 Task 调度时只要有资源就会全部调度分配出去,后续 SQL 过来就面临着完全无资源可用的情况,我们对单个 SQL 参与调度的 task 数进行了限制,具体的限制数随着可用资源大小进行动态调整。
单个 SQL 执行时间方面:上层 Dispatcher 和下层 Engine 都做了超时限制,规定 adhoc 任务超过 1 小时,就会将该任务 kill 掉。
单个 Stage task 数量方面:同时我们也对单个 stage 的 task 数进行限制,一个 stage 最大允许 30W 个 task。
4.3 单次 table scan 的文件数和大小的限制
为保障 kyuubi 的稳定性,我们对查询数据量过大的 SQL 进行限制。通过自定义外部 optimization rule(TableScanLimit)来达到目的。TableScanLimit 匹配 LocalLimit,收集子节点 project、filter。匹配叶子结点 HiveTableRelation 和 HadoopFsRelation。即匹配 Hive 表和 DataSource 表的 Logical relation,针对不同的表采取不同的计算方式。
1. HiveTableRelation:
非分区表, 通过 table meta 拿到表的 totalSize、numFiles、numRows 值。
分区表,判断是否有下推下来的分区。若有,则拿对应分区的数据 totalSize、numFiles、numRows。若没有,则拿全表的数据。
2. HadoopFsRelation:判断 partitionFilter 是否存在动态 filter
不存在,则通过 partitionFilter 得到需要扫描的分区
存在,则对 partitionFilter 扫描出来的分区进一步过滤得到最终需要扫描的分区
获取到 SQL 查询的 dataSize、numFiles、numRows 后, 还需要根据表存储类型、不同字段的类型、是否存在 limit、在根据下推来的 project、filter 得出最终需要扫描的列,估算出需要 table scan size,如果 table scan size 超过制定阈值则拒绝查询并告知原因。
4.4 危险 join condition 发现 &Join 膨胀率的限制
4.4.1 危险 join condition 发现
为保障 kyuubi 的稳定性,我们也对影响 Engine 性能的 SQL 进行限制。用户在写 sql 时可能并不了解 spark 对于 join 的底层实现,可能会导致程序运行的非常慢甚至 OOM,这个时候如果可以为用户提供哪些 join condition 可能是导致 engine 运行慢的原因,并提醒用户改进和方便定位问题,甚至可以拒绝这些危险的 query 提交。
在选择 join 方式的时候如果是等值 join 则按照 BHJ,SHJ,SMJ 的顺序选择,如果还没有选择 join type 则判定为 Cartesian Join,如果 join 类型是 InnerType 的就使用 Cartesian Join,Cartesian Join 会产生笛卡尔积比较慢,如果不是 InnerType,则使用 BNLJ,在判断 BHJ 时,表的大小就超过了 broadcast 阈值,因此将表 broadcast 出去可能会对 driver 内存造成压力,性能比较差甚至可能会 OOM,因此将这两种 join 类型定义为危险 join。
如果是非等值 join 则只能使用 BNLJ 或者 Cartesian Join,如果在第一次 BNLJ 时选不出 build side 说明两个表的大小都超过了 broadcast 阈值,则使用 Cartesian Join,如果 Join Type 不是 InnerType 则只能使用 BNLJ,因此 Join 策略中选择 Cartesian Join 和第二次选择 BNLJ 时为危险 join。
4.4.2 Join 膨胀率的限制
在 shareState 中的 statusScheduler 用于收集 Execution 的状态和指标,这其中的指标就是按照 nodes 汇总了各个 task 汇报上来的 metrics,我们启动了一个 join 检测的线程定时的监控 Join 节点的 "number of output rows"及 Join 的 2 个父节点的 "number of output rows" 算出该 Join 节点的膨胀率。
Join 节点的膨胀检测:
05 kyuubi 新应用场景
5.1 大查询 connection&scala 模式的使用
5.1.1 connection 模式的使用
adhoc 大任务和复杂的 SQL 会导致 kyuubi engine 在一定时间内性能下降,严重影响了其他正常的 adhoc 任务的执行效率。我们在 adhoc 前端开放了大查询模式,让这些复杂、查询量大的任务走 kyuubi connection 模式。在 kyuubi connection 模式下一个用户任务单独享有自己申请的资源,独立的 Driver,任务的大小快慢都由自身的 SQL 特征决定,不会影响到其他用户的 SQL 任务,同时我们也会适当放开前面一些限制条件。
connection 模式在 B 站的使用场景:
table scan 判定该 adhoc 任务为大任务,执行时间超过 1 个小时。
复杂的 SQL 任务, 该任务存在笛卡尔积或 Join 膨胀超过阈值。
单个 SQL 单个 stage 的 task 数超过 30W。
用户自行选择 connection 模式。
5.1.2 scala 模式的使用
SQL 模式可以解决大数据 80%的业务问题,SQL 模式加上 Scala 模式编程可以解决 99%的业务问题;SQL 是一种非常用户友好的语言,用户不用了解 Spark 内部的原理,就可以使用 SQL 进行复杂的数据处理,但是它也有一定的局限性。
SQL 模式不够灵活,无法以 dataset 以及 rdd 两种方式进行数据处理操作。无法处理更加复杂的业务,特别是非数据处理相关的需求。另一方面,用户执行 scala code 项目时必须打包项目并提交到计算集群,如果 code 出错了就需来回打包上传,非常的耗时。
Scala 模式可以直接提交 code,类似 Spark 交互式 Shell,简化流程。针对这些问题, 我们将 SQL 模式、Scala 模式的优点结合起来,两者进行混合编程,这样基本上可以解决数据分析场景下大部分的 case。
5.2 Presto on spark
Presto 为了保证集群的稳定性,每个 Query 的最大内存进行了限制,超过配置内存的 Query 会被 Presto oom kill 掉。部分 ETL 任务会出现随着业务增长,数据量增大,占用内存也会增多,当超过阈值后,流程就会出现失败。
为了解决这个问题,prestodb 社区开发了一个 presto on spark 的项目,通过将 query 提交到 Spark 来解决 query 的内存占用过大导致的扩张性问题,但是社区方案对于已经存在的查询并不是很友好,用户的提交方式有 presto-cli、pyhive 等方式,而要使用 Presto on spark 项目,则必须通过 spark-submit 方式将 query 提交到 yarn。
为了让用户无感知的执行 presto on spark 查询,我们在 presto gateway 上做了一些改造,同时借助 kyuubi restulful 的接口,和 service + engine 的调度能力,在 kyuubi 内开发了 Presto-Spark Engine,该 engine 能够比较友好的来提交查询到 Yarn。
主要实现细节如下:
1. presto gateway 将 query 的执行历史进行保存,包括 query 的资源使用情况、报错信息等。
2. presto gateway 请求 HBO 服务,判断当前 query 是否需要通过 presto on spark 提交查询。
3. presto gateway 通过 zk 获取可用的 kyuubi server 列表,随机选择一台,通过 http 向 kyuubi open 一个 session。
4. presto gateway 根据获取到的 sessionHandle 信息,再提交语句。
5. kyuubi server 接收到 query 后,会启动一个独立的 Presto-Spark Engine,构建启动命令,执行命令提交 spark-submit 到 yarn。
6. Presto gateway 根据返回的 OperatorHandle 信息, 通过 http 不断获取 operation status。
7. 作业成功,则通过 fetch result 请求将结果获取并返回给客户端。
06 kyuubi 部署方式
6.1 Kyuubi server 接入 K8S
整合 Engine on yarn label 的实践
生产实践中遇到的问题:
**1.**目前 kyuubi server/engine 部署在混部集群上,环境复杂,各组件环境相互依赖、发布过程中难免会存在环境不一致、误操作等问题,从而导致服务运行出错。
**2.**资源管理问题。最初 engine 使用的是 client 模式,不同的队列的 engine driver 使用的都是大内存 50g-100g 不等 ,同时 AM、NM 、DN、kyuubi server 都共享着同一台物理机器上的资源,当 AM 启动过多, 占满整个机器的资源,导致机器内存不足,engine 无法启动。
针对于该问题,我们研发了一套基于 Queue 模式资源分配调度实现:每个 kyuubi server 和 spark engine 在 znode 上都记录着当前资源使用情况。每个 kyuubi server znode 信息:当前 kyuubi 注册 SparkEngine 数量、当前 kyuubi server 注册 SparkEngine 实例、kyuubi server 内存总大小以及当前 kyuubi server 剩余内存总大小等。
每个 engine znode 信息:所属 kyuubi server IP/端口、当前 SparkEngine 内存、当前 SparkEngine 所属队列等 。每次 Spark engine 的启动/退出,都会获取该队列的目录锁,然后对其所属的 kyuubi server 进行资源更新操作。kyuubi server 如果宕机,在启动时,遍历获取所有 engine 在 znode 的信息,进行资源和状态的快速恢复。
**3.**针对资源管理功能也存在着一些问题: 资源碎片化问题、新功能的拓展不友好以及维护成本大。Engine 使用的是 client 模式,过多大内存的 AM 会占用客户端的过多计算资源,导致 engine 水平拓展受限。
针对以上提出的问题,我们做了对应的解决方案:
1. kyuubi server 接入 k8s
我们指定了一批机器作为 kyuubi server 在 k8s 上调度资源池,实现 kyuubi server 环境、资源的隔离。实现了 kyuubi server 快速部署、提高 kyuubi server 水平扩展能力,降低了运维成本。
2. Engine on yarn label
我们将 kyuubi engine 资源管理交给 yarn,由 yarn 负责 engine 的分配和调度。我们采用了 cluster 模式以防 engine 在水平拓展时受到资源限制。采用 cluster 模式后,我们遇到了新的问题:在 queue 模式下 engine driver 使用的都是 50g-100g 不等的大内存,但是由于 yarn 集群的配置限制,能够申请的最大 Container 资源量为<28G, 10vCore>。为了在 cluster 模式的情况下保证 Driver 能够获取到足够的资源,我们改造了 yarn 以适应此类场景。我们将需求拆分为以下三项:
将 kyuubi Driver 放置于独立的 Node Label 中,该 Node Label 中的服务器由 kyuubi driver 独立使用;
kyuubi Executor 仍然放置在 Default Label 的各对应队列的 adhoc 叶子队列内,承接 adhoc 任务处理工作;
Driver 申请的资源需要大于 MaxAllocation,即上文所述的<28G, 10vCore>。希望能够根据 Node Label 动态设置 Queue 级别的 MaxAllocation,使得 kyuubi Driver 能够获得较大资源量。
首先,我们在 yarn 上建立了 kyuubi_label,并在 label 内与 Default Label 映射建立 kyuubi 队列,以供所有的 Driver 统一提交在 kyuubi 队列上。并通过“spark.yarn.am.nodeLabelExpression=kyuubi_label”指定 Driver 提交至 kyuubi_label,通过“spark.yarn.executor.nodeLabelExpression= ”指定 Executor 提交至 default label,实现如下的效果:
其次,我们将 yarn 的资源最大值由原先的“集群”级别管控下放至“队列+Label”级别管控,通过调整"queue name + kyuubi_label"的 Conf,我们能够将 Driver 的 Container 资源量最大值提高至<200G, 72vCore>,且保证其他 Container 的最大值仍为<28G, 10vCore>。同样申请 50G 的 Driver,在 default 集群中会出现失败提示:
而在 kyuubi_lable 的同队列下则能够成功运行, 这样我们既借助了 yarn 的资源管控能力,又保证了 kyuubi driver 获得的资源量。
07 未来规划
1. 小的 ETL 任务接入 kyuubi,减少 ETL 任务资源申请时间
2. Kyuubi Engine(Spark 和 Flink)云原生,接入 K8S 统一调度
3. Spark jar 任务也统一接入 Kyuubi
以上是今天的分享内容,如果你有什么想法或疑问,欢迎大家在留言区与我们互动,如果喜欢本期内容的话,请给我们点个赞吧!
评论