写点什么

极光笔记丨 Spark SQL 在极光的建设实践

发布于: 1 小时前
极光笔记丨Spark SQL 在极光的建设实践

极光高级工程师——蔡祖光


前言


Spark 在 2018 开始在极光大数据平台部署使用,历经多个版本的迭代,逐步成为离线计算的核心引擎。当前在极光大数据平台每天运行的 Spark 任务有 20000+,执行的 Spark SQL 平均每天 42000 条,本文主要介绍极光数据平台在使用 Spark SQL 的过程中总结的部分实践经验,包括以下方面内容:


  • Spark Extension 的应用实践

  • Spark Bucket Table 的改造优化

  • 从 Hive 迁移到 Spark SQL 的实践方案

一、Spark Extension 应用实践


Spark Extension 作为 Spark Catalyst 扩展点在 SPARK-18127 中被引入,Spark 用户可以在 SQL 处理的各个阶段扩展自定义实现,非常强大高效


1.1 血缘关系解析


在极光我们有自建的元数据管理平台,相关元数据由各数据组件进行信息收集,其中对 Spark SQL 的血缘关系解析和收集就是通过自定义的 Spark Extension 实现的。


Spark Catalyst 的 SQL 处理分成 parser,analyzer,optimizer 以及 planner 等多个步骤,其中 analyzer,optimizer 等步骤内部也分为多个阶段,为了获取最有效的血缘关系信息,我们选择最终的 planner 阶段作为切入点,为此我们专门实现了一个 planner strategy 进行 Spark SQL 物理执行计划的解析,并提取出读写表等元数据信息并存储到元数据管理平台



1.2 权限校验


在数据安全方面,极光选择用 Ranger 作为权限管理等组件,但在实际使用的过程中我们发现目前社区版本的 Ranger 主要提供的还是 HDFS、HBase、Hive、Yarn 的相关接入插件,在 Spark 方面需要自己去实现相关功能,对于以上问题我们同样选择用 Spark Extension 去帮助我们进行权限方面的二次开发,在实现的过程中我们借助了 Ranger Hive-Plugin 的实现原理,对 Spark SQL 访问 Hive 进行了权限校验功能的实现。


1.3 参数控制


随着数据平台使用 Spark SQL 的业务同学越来越多,我们发现每个业务同学对于 Spark 的熟悉程度都有所不同,对 Spark 配置参数的理解也有好有坏,为了保障集群整体运行的稳定性,我们对业务同学提交的 Spark 任务的进行了拦截处理,提取任务设置的配置参数,对其中配置不合理的参数进行屏蔽,并给出风险提示,有效的引导业务同学进行合理的线上操作。


二、Spark Bucket Table 的改造优化


在 Spark 的实践过程中,我们也积极关注业内其它公司优秀方案,在 2020 年我们参考字节跳动对于 Spark Bucket Table 的优化思路,在此基础上我们对极光使用的 Spark 进行了二次改造,完成如下优化项:


  • Spark Bucket Table 和 Hive Bucket Table 的互相兼容

  • Spark 支持 Bucket Num 是整数倍的 Bucket Join

  • Spark 支持 Join 字段和 Bucket 字段是包含关系的 Bucket Join


上述三点的优化,丰富了 Bucket Join 的使用场景,可以让更多 Join、Aggregate 操作避免产生 Shuffle,有效的提高了 Spark SQL 的运行效率.在完成相关优化以后,如何更好的进行业务改造推广,成为了我们关心的问题。


通过对数据平台过往 SQL 执行记录的分析,我们发现用户 ID 和设备 ID 的关联查询是十分高频的一项操作,在此基础上,我们通过之前 SQL 血缘关系解析收集到的元数据信息,对每张表进行 Join、Aggregate 操作的高频字段进行了分析整理,统计出最为合适的 Bucket Cloumn,并在这些元数据的支撑下辅助我们进行 Bucket Table 的推广改造。


三、Hive 迁移 Spark


随着公司业务的高速发展,在数据平台上提交的 SQL 任务持续不断增长,对任务的执行时间和计算资源的消耗都提出了新的挑战,出于上述原因,我们提出了 Hive 任务迁移到 Spark SQL 的工作目标,由此我们总结出了如下问题需求:


  • 如何更好的定位哪些 Hive 任务可以迁移,哪些不可以

  • 如何让业务部门无感知的从 Hive 迁移到 Spark SQL

  • 如何进行对比分析,确认任务迁移前后的运行效果


3.1 Hive 迁移分析程序的实现


在迁移业务 job 时,我们需要知道这个部门有哪些人,由于 Azkaban 在执行具体 job 时会有执行人信息,所以我们可以根据执行人来推测有哪些 job。分析程序使用了元数据系统的某些表数据和 azkaban 相关的一些库表信息,用来帮助我们收集迁移的部门下有多少 hive job,以及该 hive job 有多少 sql,sql 语法通过率是多少,当然在迁移时还需要查看 Azkaban 的具体执行耗时等信息,用于帮助我们在精细化调参的时候大致判断消耗的资源是多少。


由于线上直接检测某条 sql 是否合乎 spark 语义需要具有相关的读写权限,直接开放权限给分析程序不安全。所以实现的思路是通过使用元数据系统存储的库表结构信息,以及 azkaban 上有采集业务 job 执行的 sql 信息。只要拥有某条 sql 所需要的全部库表信息,我们就能在本地通过重建库表结构分析该条 sql 是否合乎 spark 语义(当然线上环境和本地是有不同的,比如函数问题,但大多情况下是没有问题的)。



以下为某数据部通过分析程序得到的 SQL 通过率



3.2 SQL 执行引擎的无感知切换


目前业务方使用 Hive 的主要方式是通过 beeline 去连接 hiveserver2,由于 livy 也提供了 thriftserver 模块,所以 beeline 也可以直接连接 livy。迁移的策略就是先把合乎 Spark 语法的 SQL 发往 livy 执行,如果执行失败再切换到 Hive 进行兜底执行。


beeline 可获取用户 SQL,启动 beeline 时通过 thrift 接口创建 livy session,获取用户 sql 发送给 livy 执行,期间执行进度等信息可以查询 livy 获得,同时一个 job 对应一个 session,以及每启动一次 beeline 对应一个 session,当 job 执行完毕或者 beeline 被关闭时,关闭 livy session。(如果 spark 不能成功执行则走之前 hive 的逻辑)



图 3-2-1


有了以上切换思路以后,我们开始着手 beeline 程序的修改设计


beeline 重要类图如图 3-2-2 所示, Beeline 类是启动类,获取用户命令行输入并调用 Commands 类去 执行,Commands 负责调用 JDBC 接口去执行和获取结果, 单向调用流程如图 3-2-3 所示。


 图 3-2-2

 图 3-2-3


由图 3-2-2 和图 3-2-3 可知,所有的操作都是通过 DatabaseConnection 这个对象去完成的,持有这个 对象的是 DatabaseConnections 这个对象,所以多计算引擎切换,通过策略适配


DatabaseConnections 对象,这样就能在不修改其他代码的情况下切换执行引擎(即获取不同的 connection)



3.3 任务迁移黑名单


前文有说到,当一个 Hive 任务用 SQL 分析程序走通,并且在迁移程序用 livy 进行 Spark 任务提交以后,还是会有可能执行失败,这个时候我们会用 Hive 进行兜底执行保障任务稳定性。但是失败的 SQL 会有多种原因,有的 SQL 确实用 Hive 执行稳定性更好,如果每次都先用 Spark SQL 执行失败以后再用 Hive 执行会影响任务效率,基于以上目的,我们对迁移程序开发了黑名单功能,用来保障每个 SQL 可以找到它真正适合的执行引擎,考虑到 beeline 是轻量级客户端,识别的功能应该放在 livy-server 侧来做,开发一个类似 HBO 的功能来将这样的异常 SQL 加入黑名单,节省迁移任务执行时间。


目标: 基于 HBE(History-Based Executing)的异常 SQL 识别


有了上述目标以后我们主要通过如下方式进行了 SQL 黑名单的识别切换


SQL 识别限定在相同 appName 中(缩小识别范围避免识别错误)得到 SQL 抽象语法树的后续遍历内容后生成 md5 值作为该 sql 的唯一性标识把执行失败超过 N 次的 SQL 信息写入黑名单下次执行时根据赋值规则比较两条 SQL 的结构树特征对于在黑名单中的 SQL 不进行 Spark SQL 切换


3.4 迁移成果


今年经过迁移程序的迁移改造,HSQL 最大降幅为 50%+(后随今年业务增长有所回升)




四、Spark3.0 的应用


当前极光使用的 Spark 默认版本已经从 2.X 版本升级到了 3.X 版本,Spark3.X 的 AQE 特性也辅助我们更好的使用 Spark



实践配置优化:


#spark3.0.0 参数


#动态合并 shuffle partitions


spark.sql.adaptive.coalescePartitions.enabled true


spark.sql.adaptive.coalescePartitions.minPartitionNum 1


spark.sql.adaptive.coalescePartitions.initialPartitionNum 500


spark.sql.adaptive.advisoryPartitionSizeInBytes 128MB


#动态优化数据倾斜,通过实际的数据特性考虑,skewedPartitionFactor 我们设置成了 1


spark.sql.adaptive.skewJoin.enabled true


spark.sql.adaptive.skewJoin.skewedPartitionFactor 1


spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 512MB


五、后续规划


目前针对线上运行的 Spark 任务,我们正在开发一套 Spark 全链路监控平台,作为我们大数据运维平台的一部分,该平台会承担对线上 Spark 任务运行状态的采集监控工作,我们希望可以通过该平台及时定位发现资源使用浪费、写入大量小文件、存在 slow task 等问题的 Spark 任务,并以此进行有针对性的优化,让数据平台可以更高效的运行。

发布于: 1 小时前阅读数: 5
用户头像

还未添加个人签名 2021.04.23 加入

还未添加个人简介

评论

发布
暂无评论
极光笔记丨Spark SQL 在极光的建设实践