打造次世代分析型数据库(六):如何从零实现向量化引擎
作者介绍
josehu(胡翔),腾讯云数据库高级工程师,具有多年分布式数据库内核研发经验,主要负责和参与过高可用、数据导入导出、索引等相关模块的设计和开发。博士毕业于中国科学院软件研究所,加入腾讯后主要负责 CDW PG 数据库向量化执行引擎等相关特性的设计和开发工作。
1. 什么是向量化执行
向量化是指计算从一次对一个值进行运算转换为一次对一组值进行运算的过程。
1.1 从 CPU 角度看
现代 CPU 支持将单个指令应用于多个数据(SIMD)的向量运算。例如,具有 128 位寄存器的 CPU 可以保存 4 个 32 位数并进行一次计算,比一次执行一条指令快 4 倍。
比如我们在内存当中有 4 个 32 位的 int,传统的 CPU 不支持 SIMD,进行计算时需要 4 次从内存中 Load 数据,再进行 4 次乘法计算,然后把结果写回到内存当中,这个过程同样要进行 4 次。假如 CPU 支持 SIMD,就可以一次载入多个连续的内存数据,一次计算多对操作数以及一次写入多个计算结果。这样的话理论上会比传统的 CPU 快 4 倍。随着 CPU 的发展,现在大家常用的都是 128 位的 SSE 的指令,后面又多了 256 位的 AVX 指令,以及英特尔现在最新的 AVX512 的指令,一次向量化运算的一组值可以变得越来越多,效率会越来越高。但这不一定是线性的关系,但是能够保证一次对一组值的操作是更多更快的。
1.2 从数据库角度看
类似地,对于数据库里面的一个查询语句,其向量化执行是每次运算都对一组元组进行批量运算的过程。
数据库里面的查询执行引擎使用最为广泛的模型是火山模型,即迭代模型。上层算子每次通过调用一个 next 函数,从下层算子获取一个元组并进行处理,然后递归地调用下层算子去获取元组。一些特殊的算子需要从下层算子获取所有元组之后才能继续执行,比如需要构建 hash table 的算子(HashAgg、HashJoin)或需要元组排序的算子等。火山模型的优势在于实现简单,而且方便对输出结果进行控制,在过去磁盘 IO 是主要瓶颈的情况下具有很好的效果,但是不能充分发挥当前 CPU 的能力,CPU 的有效率、利用率非常低。自上而下逐层地函数调用会造成大量的指令以及数据的 cache miss。因此,很多数据库使用向量化或者编译执行等方法来解决上述火山模型导致的问题。
向量化模型与火山模型类似,但是每次 next 调用返回的是一组元组,这样就可以将函数调用的代价均摊到多个元组上,从而减少总体函数调用次数。另外,算子内部实现或者计算函数实现可以使用更加高效的方式循环处理一组元组,比如使用编译器自动循环展开或者手动编写 SIMD 指令等方式。需要注意的是,在实际的计算中往往执行的是在特定类型的列向量上的简单计算,连续的数据可以完全放入到 cache 中,计算过程中没有数据依赖以及条件分支,这样就可以充分发挥 CPU 乱序执行的能力,减少数据和指令的 cache miss,从而将 CPU 硬件的能力充分释放。
举一个简单的例子,比如执行一个 Scan 操作,然后执行一个条件过滤,最后执行一个乘法运算,两种模型的对比如下图所示。原来的火山模型,一次只能处理一个元组,而实现向量化之后,一次就能处理多个元组,Scan 算子获取多个列向量,Filter 算子筛选出满足条件的元组并对其进行标记(使用 bool 数组),Project 算子计算出最终的乘法结果,其操作数为满足 Filter 条件的元组进行压缩后得到的列向量。可以看出,向量化模型可以大大减少函数调用次数。
2. 如何实现向量化
实现向量化的核心工作主要分为这四块:
向量化执行框架:为了让当前的执行器逻辑兼容向量化执行,需要考虑如何生成向量化计划,如何执行向量化计划,以及如何支持向量化执行和非向量化执行共存等。
向量化数据结构:为了更好地发挥向量化执行的计算加速的作用,需要合理设计向量的内存组织形式,尽可能地使用 cache 资源,加速内存的读写。同时,减少内存的拷贝。
向量化算子实现:为了适应一次处理一组元组的执行方式,需要调整原有算子的实现。基本原则是使用尽可能小的循环来处理简单的操作,这就需要对原有算子的实现进行拆分,或者重新实现一套向量化的算法。
向量化函数实现:与向量化算子实现类似,向量化函数实现也要做相应的调整。另外还需要调整一些函数计算框架,如 Qual、Project 等。
2.1 向量化执行框架
向量化执行往往会带来更多的性能优势,因此在生成向量化计划时采用贪婪的方法,尽可能将计划路径中涉及的每个算子转换成向量化执行的方式。虽然没有通过比对所有计划代价并进行最优计划选择的方式,但是这种方式简单直接,而且可以通过 GUC 参数进行灵活控制。
一个查询计划生成之后,会尝试对其进行向量化转换。一般过程是先处理子计划,然后处理整个计划。对于每个计划节点,会根据计划节点的类型递归地对其包含的左右子树计划节点进行判断和转换操作,如果一个计划节点不支持向量化,可以通过在这个计划节点上面添加一个行转向量的新的计划节点,尽可能地让上层算子支持向量化执行。此外,还需要判断该计划节点包含的表达式计算是否支持向量化。比如对于 Hashjoin 计划节点,首先判断其左右子树计划节点是否支持向量化,如果都支持,则继续判断其包含的哈希键匹配函数以及非哈希键匹配函数等是否支持向量化,如果都支持,整个 HashJoin 就可以向量化执行。如果左子树计划节点不支持向量化,通过在其上添加一个行转向量的计划节点,使得 HashJoin 可以向量化执行。如果右子树计划节点不支持向量化,由于其对应 Hash 计划节点,与 HashJoin 计划节点是紧密关联的,故 HashJoin 节点不可以向量化执行,同时需要在支持向量化的左子树计划节点上面添加一个向量转行的新的计划节点,确保计划向量化计划和非向量化计划可以兼容。
以一个简单的 SQL 为例,原有的火山模型执行流程和向量化模型执行流程如下图所示。两者都是上层算子迭代调用下层算子,但前者每次只能处理一个元组,而后者每次可以处理一组元组。
2.2 向量化数据结构
向量化数据结构指的是一个向量在内存的组织形式。设计向量在内存的组织形式的原则是尽量能把同种类型的数据连续存储在更加靠近 CPU 的位置,比如 cache,方便快速计算。原有的元组在内存中的组织形式为 TupleTableSlot,一个 TupleTableSlot 就代表一个,包含了每个列的信息和数据,执行过程中算子内部的计算过程以及算子之间的结果传递都是以 TupleTableSlot 为单位的。为了便于向量化计算,必须把多个元组组织在一起,同时,相同列的计算是一样的,必须把相同列的数据组织在一起。我们创建出新的数据结构 VectorTableSlot 来表示元组向量,其中,使用数据结构 ColumnVector 来表示每个列向量。
VectorTableSlot 可以保存多个元组,用于算子间 vector 的流转。ColumnVector 是实际计算的操作数,便于实现基于该列上的批量计算。根据数据的存储形式,可以区分为定长和非定长类型。定长数据,如果不超过 8 个字节,都可以直接存放在 ColumnVector 的 cv_vals 数组中。超过 8 个字节的定长数据以及非定长数据,由于无法通过 cv_vals 数组直接表示,需要存在 cv_buf 指向的内存中,并且在 cv_vals 保存该数据的起始位置指针。这部分内存的结构由 VectorBuffer 表示,它是一个由多个 TinyBuffer 组成的链表,其中,TinyBuffer 表示一块可供使用的连续内存。通过 buffer 的自维护,可以尽量地将变量连续存储,减少 cache miss,还可以减少算子间的内存拷贝以及方便内存的快速复用。
2.3 向量化算子实现
下面以两个使用最频繁的算子为例,即 HashAgg 和 HashJoin 算子,来介绍如何进行向量化算子的实现。向量化算子实现的原则是尽可能地将复杂的循环处理过程拆解成多个简单的小循环,以便对多个同种类型的数据进行快速循环处理。另外,还需要尽可能地减少分支以及数据、控制等依赖。
2.3.1 HashAgg 向量化
首先看一个 HashAgg 的例子,使用两个列进行分组并对每个组内进行 count*计算。整个算法流程跟原有的一致,仍然包含两个步骤:一是构建 hash table 并在每个 hash entry 上计算聚合结果;二是遍历 hash table,计算最终的聚合结果。
向量化改造之后,一些操作可以通过简单的循环来进行批量处理,包括 hash 值、hash bucket 值的计算,以及聚合结果的计算,可以显著提高计算的效率。另外,下盘的逻辑也是批量完成的,可以显著提高下盘数据的读写速度。具体如下图所示:
对输入的元组向量在分组列上批量计算 hash 值;根据计算的 hash 值批量计算 hash bucket 值;
构建 hash table,针对每个元组,首先判断其 hash bucket 位置是否存在 hash entry,如果不存在就需要创建新的 hash entry(如图示 insert 情形)并记录相应的 hash entry 位置,如果存在就需要进行 hash entry 的匹配操作,包括检查 key 和 value 是否匹配,如果匹配(如图示 match 情形),则查找结束并记录相应的 hash entry 位置,如果不匹配(如图示 conflict 情形),则需要找到当前位置的下一个位置再次进行 hash entry 的匹配操作直到完成匹配并记录相应的 hash entry 位置。另外,如果当前 hash table 已经超过了内存阈值,则对需要创建新的 hash entry 的元组进行下盘操作(如图示 spill 情形)。需要注意的是这里使用的 hash table 是采用 Open addressing 的方式处理冲突的,这种数组结构具有更好的 locality,从而更加适合向量化执行;
计算聚合结果,原有的方法是每次命中一个 hash entry(包括 insert/match 情形)就计算一次聚合结果并更新到 hash entry 上面,向量化执行的方法可以批量地对多个 hash entry 进行聚合计算,比如对 quantity 列进行 sum 操作,然后将最终的结果更新到相应的 hash entry 即可;
遍历 hash table 输出聚合结果,扫描每一个 hash entry,将聚合结果以及 group by 列和聚合列拼接成元组向量并返回;
如果存在下盘的元组,则需要重置当前已经遍历结束的 hash table,根据下盘的元组构建新的 hash table 继续执行上述步骤,直至所有元组处理完毕。
2.3.2HashJoin 算子向量化
再看一个 HashJoin 的例子,由于 HashJoin 类型比较多,当前的状态机实现是一种通用的方式,不同类型的 HashJoin 逻辑混在一起,造成代码的复杂度比较高,直接向量化比较困难。因此,我们这里选择了最常用的内连接方式进行向量化。整个流程仍然包含两个步骤,一是构建哈希表,二是探测哈希表并进行匹配,最后将匹配的结果输出。
向量化改造之后,与 HashAgg 算子类似,一些操作可以通过简单的循环来进行批量处理,包括 hash 值、hash bucket 值的计算(内外表都需要计算)。另外,哈希匹配操作也可批量进行。具体如下图所示。
对 Scan 内表得到的元组向量批量计算 hash 值和 hash bucket 值;
对内表构建 hash table;
对 Scan 外表得到的元组向量批量计算 hash 值和 hash bucket 值;
使用外表元组向量探测内表构建的 hash table,如果匹配成功,通过一个标记数组在对应位置上进行标记,如果匹配失败,需要找到 hash bucket 的下一个位置继续进行匹配,直到匹配成功或者当前 hash bucket 链匹配结束;
根据标记数组将匹配成功的行进行对应的 Proj 列输出。
2.4 向量化函数实现
设计时有两个方案。一是对每种数据类型新增加一个向量化版本的数据类型,然后实现向量化版本的函数,但是这种方案相当于重新实现所有的类型,涉及的修改点比较多,方案二只需要实现向量化版本的函数,并提供非向量化版本和向量化版本函数的一个映射表,在向量化执行时按照二分查找的方式找到并调用向量化版本的函数即可。这种方案可行性更高,故最终选择方案二。
以 32 位 int 值判等函数为例,非向量化版本的入参包括两个 int32 变量,直接返回判等结果,而向量化版本的入参是两个列向量,需要对列向量的每行进行判等,结果存储在另一个列向量中返回。另外,向量化版本需要考虑 null 值等场景的处理,而原有的逻辑是在计算函数之前执行的。具体如下图所示:
3. 总结和展望
本文首先介绍了向量化执行的由来,然后逐步地介绍了向量化的实现过程,包括向量化执行框架、向量化数据结构、向量化算子实现以及向量化函数实现,采用了通用的方式,具有普遍意义。同时,向量化执行不影响原有架构,具体的实现与社区大致保持一致,尽可能地不对原有代码浸入式修改,保持一定的独立性,方便后续与社区最新代码进行同步。
为了进一步提升查询执行效率,需要对算子的向量化算法进行更加深入的优化,尽可能地方便编译器对程序自动地进行向量化编译,通过 SIMD 指令显示编程来进一步提高向量化的粒度。另外,与向量化执行方法一样,编译执行也是解决火山模型 CPU 使用效率不高的有效手段,特别是对于表达式计算、元组解析等通用模块尤为有效,甚至可以对整体的执行计划进行编译执行。
相信通过不断地优化,查询执行的效率会不断地提升,进而可以轻松应对各种不同复杂业务的需求。
评论