EMR Spark-SQL 性能极致优化揭秘 Native Codegen Framework
作者:周克勇,花名一锤,阿里巴巴计算平台事业部 EMR 团队技术专家,大数据领域技术爱好者,对 Spark 有浓厚兴趣和一定的了解,目前主要专注于 EMR 产品中开源计算引擎的优化工作。
背景和动机
SparkSQL 多年来的性能优化集中在 Optimizer 和 Runtime 两个领域。前者的目的是为了获得最优的执行计划,后者的目的是针对既定的计划尽可能执行的更快。相比于 Runtime,Optimizer 是更加通用的、跟实现无关的优化。无论是 Java 世界(Spark, Hive)还是 C++世界(Impala, MaxCompute),无论是 Batch-Based(Spark, Hive)还是 MPP-Based(Impala, Presto),甚至无论是大数据领域还是传统数据库领域亦或 HTAP 领域(HyPer, ADB),在 Optimizer 层面考虑的都是非常类似的问题: Stats 收集,Cost 评估以及计划选择;采用的优化技术也比较类似,如 JoinReorder, CTE, GroupKey Elimination 等。尽管因为上下文不同(如是否有索引)在 Cost Model 的构造上会有不同,或者特定场景下采用不同的空间搜索策略(如遗传算法 vs. 动态规划),但方法大体是相同的。长期以来,Runtime 的优化工作基本聚焦在解决当时的硬件瓶颈。如 MapReduce 刚出来时网络带宽是瓶颈,所以 Google 做了很多 Locality 方面的优化;Spark 刚出来时解决的问题是磁盘 IO,内存缓存的设计使得性能相比 MapReduce 有了数量级的提升;后来 CPU 成为了新的瓶颈[1],因此提升 CPU 性能成了近年来 Runtime 领域重要的优化方向。提升 CPU 性能的两个主流技术是以 MonetDB/X100[2](如今演化为 VectorWise[3])为代表的向量化(Vectorized Processing)技术和以 HyPer[5][6]为代表的代码生成(CodeGen)技术(其中 Spark 跟进的是 CodeGen[9])。简单来说,向量化技术沿用了火山模型,但与其让 SQL 算子每次计算一条 Record,向量化技术会积攒一批数据后再执行。逐批计算相比于逐条计算有了更大的优化空间,例如虚函数的开销分摊,SIMD 优化,更加 Cache 友好等。这个技术的劣势在于算子之间传递的数据从条变成了批,因此增大了中间数据的物化开销。CodeGen 技术从另外一个角度解决虚函数开销和中间数据物化问题:算子融合。简单来说,CodeGen 框架通过打破算子之间的界限把火山模型“压平”了,把原来迭代器链压缩成了大的 for 循环,同时生成语义相同的代码(Java/C++/LLVM),紧接着用对应的工具链编译生成的代码,最后用编译后的 class(Java)或 so(C++,LLVM)去执行,从而把解释执行转变成了编译执行。此外,尽管还是逐条执行,由于抹去了函数调用,一条 Record 从(Stage 内的)初始算子一直执行到结束算子都基本处于寄存器中,不会物化到内存。CodeGen 技术的劣势在于难以应用 SIMD 等优化。两个门派相爱相杀,在经历了互相发论文验证自家优于对方后[4][8]两家走向了合作,合作产出了一系列项目和论文,而目前学界的主流看法也是两者融合是最优解,一些采用融合做法的项目也应运而生,如进化版 HyPer[6], Pelonton[7]等。尽管学界已走到了融合,业界主流却没有很强的动力往融合的路子走,探究其主要原因一是目前融合的做法相比单独的优化并没有质的提升;二是融合技术目前没有一个广为接受的最优做法,还在探索阶段;三是业界在单一的技术上还没有发挥出最大潜力。以 SparkSQL 为例,从 2015 年 SparkSQL 首次露面自带的 Expression 级别的 Codegen,到后来参考 HyPer 实现的 WholeStage Codegen,再经过多年的打磨,SparkSQL 的 Codegen 技术已趋成熟,性能也获得了两次数量级的跃升。然而,也许是出于可维护性或开发者接受度的考虑,SparkSQL 的 Codegen 一直限制在生成 Java 代码,并没有尝试过 NativeCode(C/C++, LLVM)。尽管 Java 的性能已经很优,但相比于 Native Code 还是有一定的 Overhead,并缺乏 SIMD(Java 在做这方面 feature),Prefetch 等语义,更重要的是,Native Code 直接操作裸金属,易于极致压榨硬件性能,对一些加速器(如 GPU)或新硬件(如 AEP)的支持也更方便。基于以上动机,EMR 团队探索并开发了 SparkSQL Native Codegen 框架,为 SparkSQL 换了引擎,新引擎带来 20%左右的性能提升,为 EMR 再次获取世界第一立下汗马功劳,本文将详细介绍 Native Codegen 框架。
核心问题
做 Native Codegen,核心问题有三个:1.生成什么?2.怎么生成?3.如何集成到 Spark?
生成什么
针对生成什么代码,结合调研的结果以及开发同学的技术栈,有三个候选项:C/C++, LLVM, Weld IR。C/C++的优势是实现相对简单,只需对照 Spark 生成的 Java 代码逻辑改写即可,劣势是编译时间过长,下图是 HyPer 的测评数据,C++的编译时间比 LLVM 高了一个数量级。
编译时间过长对小 query 很不友好,极端 case 编译时间比运行时间还要长。基于这个考虑,我们排除了 C/C++选项。上图看上去 LLVM 的编译时间非常友好,而且很多 Native CodeGen 的引擎,如 HyPer, Impala, 以及阿里云自研大数据引擎 MaxCompute,ADB 等,均采用了 LLVM 作为目标代码。LLVM 对我们来说(对你们则不一定:D)最大的劣势就是过于底层,语法接近于汇编,试想用汇编重写 SparkSQL 算子的工作量会有多酸爽。大多数引擎也不会用 LLVM 写全量代码,如 HyPer 仅把算子核心逻辑用 LLVM 生成,其他通用功能(如 spill,复杂数据结构管理等)用 C++编写并提前编译好。即使 LLVM+C++节省了不少工作量,对我们来说依然不可接受,因此我们把目光转向了第三个选项: Weld IR(Intermediate Representation)。首先简短介绍以下 Weld。Weld 的作者 Shoumik Palkar 是 Matei Zaharia 的学生,后者大家一定很熟悉,Spark 的作者。Weld 最初想解决的问题是不同 lib 之间互相调用时数据传输的开销,例如要在 pandas 里调用 numpy 的接口,首先 pandas 把数据写入内存,然后 numpy 读取内存进行计算,对于极度优化的 lib 来说,内存的写入和读取的时间可能会远超计算本身。针对这个问题,Weld 开发了 Common Runtime 并配套提供了一组 IR,再加上惰性求值的特性,只需(简单)修改 lib 使其符合 Weld 的规范,便可以做到不同 lib 共用 Weld Runtime,Weld Runtime 利用惰性求值实现跨 lib 的 Pipeline,从而省去数据物化的开销。Weld Runtime 还做了若干优化,如循环融合,循环展开,向量化,自适应执行等。此外,Weld 支持调用 C 代码,可以方便调用三方库。
我们感兴趣的是 Weld 提供的 IR 和对应的 Runtime。Weld IR 面向数据分析进行设计,因此语义上跟 SQL 非常接近,能较好的表达算子。数据结构层面,Weld IR 最核心的数据结构是 vec 和 struct,能较好地表达 SparkSQL 的 UnsafeRow Batch;基于 struct 和 vec 可以构造 dict,能较好的表达 SQL 里重度使用的 Hash 结构。操作层面,Weld IR 提供了类函数式语言的语义,如 map, filter, iterator 等,配合 builder 语义,能方便的表达 Project, Filter, Agg, BroadCastJoin 等算子语义。例如,以下 IR 表达了 Filter + Project 语义,具体含义是若第二列大于 10,则返回第一列:
以下 IR 表达了 groupBy 的语义,具体含义是按照第一列做 groupBy 来计算第二列的 sum:
具体的语法定义请参考 Weld 文档(https://github.com/weld-project/weld/blob/master/docs/language.md)。Weld 开发者 API 提供了两个核型接口:
weld_module_compile, 把 Weld IR 编译成可执行模块(module)。
weld_module_run, 执行编译好的模块。
基本流程如下图所示,最终也是生成 LLVM 代码。
由此,Weld IR 的优势就显然易见了,既兼顾了性能(最终生成 LLVM 代码),又兼顾了易用性(CodeGen Weld IR 相比 LLVM, C++方便很多)。基于这些考虑,我们最终选择 Weld IR 作为目标代码。
怎么生成
SparkSQL 原有的 CodeGen 框架之前简单介绍过了,详见https://developer.aliyun.com/article/727277。我们参考了 Spark 原有的做法,支持了表达式级别,算子级别,以及 WholeStage 级别的 Codegen。复用 Producer-Consumer 框架,每个算子负责生成自己的代码,最后由 WholeStageCodeGenExec 负责组装。
这个过程有两个关键问题:
1.算子之间传输的介质是什么?2.如何处理 Weld 不支持的算子?
传输介质
不同于 Java,Weld IR 不提供循环结构,取而代之的是 vec 结构和其上的泛迭代器操作,因此 Weld IR 难以借鉴 Java Codegen 在 Stage 外层套个大循环,然后每个算子处理一条 Record 的模式,取而代之的做法是每个算子处理一批数据,IR 层面做假物化,然后依赖 Weld 的 Loop-Fusion 优化去消除物化。例如前面提到的 Filter 后接 Project,Filter 算子生成的 IR 如下,过滤掉第二列<=10 的数据:
Project 算子生成的 IR 如下,返回第一列数据:
表面上看上去 Filter 算子会把中间结果做物化,实际上 Weld 的 Loop-Fusion 优化器会消除此次物化,优化后代码如下:
尽管依赖 Weld 的 Loop-Fusion 优化可以极大简化 CodeGen 的逻辑,但开发中我们发现 Loop-Fusion 过程非常耗时,对于复杂 SQL(嵌套 3 层以上)甚至无法在有限时间给出结果。当时面临两个选择:修改 Weld 的实现,或者修改 CodeGen 直接生成 Loop-Fusion 之后的代码,我们选择了后者。重构后生成的代码如下,其中 1,2,11 行由 Scan 算子生成,3,4,5,6,8,9,10 行由 Filter 算子生成,7 行由 Project 算子生成。
这个优化使得编译时间重回亚秒级别。
Fallback 机制
受限于 Weld 当前的表达能力,一些算子无法用 Weld 实现,例如 SortMergeJoin,Rollup 等。即使是原版的 Java CodeGen,一些算子如 Outter Join 也不支持 CodeGen,因此如何做好 Fallback 是保证正确性的前提。我们采用的策略很直观:若当前算子不支持 Native CodeGen,则由 Java CodeGen 接管。这里涉及的关键问题是 Fallback 的粒度:是算子级别还是 Stage 级别?抛去实现难度不谈,虽然直观上算子粒度的 Fallback 更加合理,但实际上却会导致更严重的问题:Stage 内部 Pipeline 的断裂。如上文所述,CodeGen 的一个优势是把整个 Stage 的逻辑 Pipeline 化,打破算子之间的界限,单条 Record 从初始算子执行到结束算子,整个过程不存在物化。而算子粒度的 Fallback 则会导致 Stage 内部一部分走 Native Runtime,另一部分走 Java Runtime,则两者连接处无可避免存在中间数据物化,这个开销通常会大于 Native Runtime 带来的收益。基于以上考虑,我们选择了 Stage 级别的 Fallback,在 CodeGen 阶段一旦遇到不支持的算子,则整个 Stage 都 Fallback 到 Java CodeGen。统计显示,整个 TPCDS Benchmark,命中 Native CodeGen 的 Stage 达到 80%。
Spark 集成
完成了代码生成和 Fallback 机制,最后的问题就是如何跟 Spark 集成了。Spark 的 WholeStageCodegenExec 的执行可以理解为一个黑盒,无论上游是 Table Scan,Shuffle Read,还是 BroadCast,给到黑盒的输入类型只有两种: RowBatch(上游是 Table Scan)或 Row Iterator(上游非 Table Scan),而黑盒的输出固定为 Row Iterator,如下图所示:
上文介绍我们选择了 Stage 级别的 Fallback,也就决定了黑盒要么是 Java Runtime,要么是 Native Runtime,不存在混合的情况,因此我们只需要关心如何把 Row Batch/Row Iterator 转化为 Weld 认识的内存布局,以及如何把 Weld 的输出转化成 Row Iterator 即可。为了进一步简化问题,我们注意到,尽管 Shuffle Reader/BroadCast 的输入是 Row Iterator,但本质上远端序列化的数据结构是 Row Batch,只不过 Spark 反序列化后转换成 Row Iterator 后再喂给 CodeGen Module,RowBatch 包装成 Row Iterator 非常简易。因此 Native Runtime 的输入输出可以统一成 RowBatch。解决办法呼之欲出了:把 RowBatch 转换成 Weld vec!但我们更进了一步,何不直接把 Row Batch 喂给 Weld 从而省去内存转换呢?本质上 Row Batch 也是满足某种规范的字节流而已,Spark 也提供了 OffHeap 模式把内存直接存堆外(仅针对 Scan Stage。Shuffle 数据和 Broadcast 数据需要读到堆外),Weld 可以直接访问。Spark UnsafeRow 的内存布局大致如下:
针对确定的 schema,null bitmap 和 fixed-length data 的结构是固定的,可以映射成 struct,而针对 var-length data 我们的做法是把这些数据 copy 到连续的内存地址中。如此一来,针对无变长数据的 RowBatch,我们直接把内存块喂给 Weld;针对有变长部分的数据,我们也只需做大粒度的内存拷贝(把定长部分和变长部分分别拷出来),而无需做列级别的细粒度拷贝转换。继续举前文的 Filter+Project 的例子,一条 Record 包含两个 int 列,其 UnsafeRow 的内存布局如下(为了对齐,Spark 里定长部分最少使用 8 字节)。
显而易见,这个结构可以很方便映射成 Weld struct:
而整个 Row Batch 便映射成 Weld vec:
如此便解决了 Input 的问题。而 Weld Output 转 RowBatch 本质是以上过程的逆向操作,不再赘述。解决了 Java 和 Native 之间的数据转换问题,剩下的就是如何执行了。首先我们根据当前 Stage 的 Mode 来决定走 Java Runtime 还是 Native Runtime。在 Native 分支,首先会执行 StageInit 做 Stage 级别的初始化工作,包括初始化 Weld,加载编译好的 Weld Module,拉取 Broadcast 数据(若有)等;接着是一个循环,每个循环读取一个 RowBatch(来自 Scan 或 Shuffle Reader)喂给 Native Runtime 执行,Output 转换并喂给 Shuffle Writer。如下图所示:
总结
本文介绍了 EMR 团队在 Spark Native Codegen 方向的探索实践,限于篇幅若干技术点和优化没有展开,后续可另开文详解,例如:
1.极致 Native 算子优化 2.数据转换详解 3.Weld Dict 优化
大家感兴趣的任何内容欢迎沟通: )[1] Making Sense of Performance in Data Analytics Frameworks. Kay Ousterhout[2] MonetDB/X100: Hyper-Pipelining Query Execution. Peter Boncz[3] Vectorwise: a Vectorized Analytical DBMS. Marcin Zukowski[4] Efficiently Compiling Efficient Query Plans for Modern Hardware. Thomas Neumann[5] HyPer: A Hybrid OLTP&OLAP Main Memory Database System Based on Virtual Memory Snapshots. Alfons Kemper[6] Data Blocks: Hybrid OLTP and OLAP on Compressed Storage using both Vectorization and Compilation. Harald Lang[7] Relaxed Operator Fusion for In-Memory Databases: Making Compilation, Vectorization, and Prefetching Work Together At Last. Prashanth Menon[8] Vectorization vs. Compilation in Query Execution. Juliusz Sompolski[9] https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html
查看更多内容,欢迎访问天池技术圈官方地址:EMR Spark-SQL性能极致优化揭秘 Native Codegen Framework_天池技术圈-阿里云天池
评论