写点什么

MO 干货 | shuffle 执行计划解析(上篇)

作者:MatrixOrigin
  • 2024-03-05
    上海
  • 本文字数:5697 字

    阅读完需:约 19 分钟

MO干货 | shuffle执行计划解析(上篇)

作者:倪涛 MO 产品布道师

目录

Part 1. 背景

Part 2. Shuffle 实现原理

Part 3. Colocate shuffle


Part 1 背景

shuffle 算法的基本原理,就是给输入的数据进行分桶,每个桶内的数据都可以单独处理并且输出结果。这样可以减少汇总数据时 hash 表的大小。

以 group 算子为例,实现方式有 sort group 和 hash group 两种。hash group 在多并发时,每个并发都会对输入数据进行 hash 并放到 hash 表中进行处理。所有 hash 表需要汇总到一起再做一次 merge,以得到最终结果。这种方式叫做 merge group。另外一种方式是先将数据进行 shuffle 分发,每个并发接收并处理不同的数据,并且直接输出结果,不需要将 hash 表 merge 到一起。这种方式叫做 shuffle group。

为什么需要支持 shuffle?原因主要有以下三点。:

  1. 解决 hash 表太大导致分配内存失败的问题:以 merge group 为例,由于需要将所有数据 merge 到一起,如果最后 merge 的 hash 表基数太大,则需要分配的内存就会非常大,会导致分配内存失败。如果超过单 CN(compute node, 表示计算节点)的内存上限,且不考虑 spill 的情况下,还会导致 oom。而 shuffle 则可以将一个大的 hash 表切分成多个小的 hash 表进行处理。

  2. 提高计算节点的横向扩展性:仍以 merge group 为例,由于需要将所有数据 merge 到一起,在单 CN 上进行单并发处理。如果 hash 表基数比较大,这一步处理比较慢,就会成为瓶颈,即使增加再多 CN 也无法加速这样的 query。而 shuffle group 则不存在这样的单点瓶颈。

  3. 提升性能:当 hash 表太大时,随机访问 hash 表会导致非常高的 cache miss 概率,而 cache miss 会导致性能显著下降。通过 shuffle 将 hash 表切分成多个小 hash 表进行处理,对每个小 hash 表的随机访问 cache 命中率大大提高,进而提升整体性能。

Part 2 Shuffle 实现原理

shuffle 对数据进行分桶,桶的数目在编译 pipeline(在执行阶段,算子被组成流水线来执行。一道流水线被称为一个 pipeline)时,由运行环境的 CN 数量,CPU 核数动态决定。例如当前租户的环境是 3CN,每个 CN10 核,shuffle 算子会自动计算分桶数量为 30。

shuffle 的分桶算法有两种,range shuffle 和 hash shuffle。hash shuffle 直接通过一个 hash 函数得到数据的分桶号。range shuffle 则需要优化器在编译阶段给出分桶策略。对于均匀分布的数据,分桶策略较为简单。以 tpch1T,lineitem 表为例。l_orderkey 的取值范围是 1 到 60 亿,且均匀分布。对该列进行 shuffle 的策略则是按照最小最大值进行平均分配,1-2 亿分到桶 1,2 亿到 4 亿分到桶 2,依次类推。对于非均匀分布的数据处理算法比较复杂,将在后文详细介绍。

MO 在编译阶段由优化器决定使用 range shuffle 还是 hash shuffle,通常优先使用 range shuffle,如果出于某些原因无法使用 range shuffle,则会使用 hash shuffle,保证分桶的均匀性。

在 MO Pipeline 中对应 shuffle 算法的算子有两个,shuffle 和 dispatch。

Shuffle 算子的功能是将输入的数据进行分桶,并且输出分桶后的数据。例如输入一个 batch(MO 是列存数据库,算子对数据的处理以 batch 为基本单位),batch 内包含 8192 行数据,值为 1、2、3 ... 一直到 8192。如果需要分两个桶,可能的分桶结果是将 1-4096 放入一个新 batch,4097-8192 放入一个新 batch 内,然后将这两个新的 batch 再发送出去。

由于并发可能非常多,分桶后每个桶内数据可能很少,所以直接输出是不太合适的。因此 shuffle 算子需要在内部维护一个内存池,将分桶后的数据存放起来,攒到一定程度的时候再发送出去。或者是当 shuffle 算子读完所有输入数据后,就需要将内部所有数据都发送出去。

在 pipeline 里大部分算子都是一个 batch 输入对应一个 batch 的输出,但是 shuffle 算子并不会这样。由于内部维护的内存池,shuffle 可能一段时间内没有输出,也可能连续输出多个 batch。内存池的策略对 shuffle 算子的性能有比较重要的影响。

Dispatch 算子负责将 shuffle 分桶后的数据发送到指定的位置。shuffle 算子会在 batch 中设置一个字段 shuffleIDX,dispatch 算子读取这个字段,并且根据指定的策略找到需要发送的目的地并将 batch 发送出去。需要注意的是 shuffleIDX 这个字段不会参与 batch 的序列化和反序列化,因此 shuffle 算子和 dispatch 算子之间一定不能有跨 CN 的网络传输,否则会导致错误。

dispatch 算子有三种发送数据的策略,绝大部分情况下使用第一种策略,后面两种策略在 hybrid shuffle 中使用,将在后续进行介绍。

  1. 直接根据 shuffleIDX 找到唯一的对应目的地(可能在本地 CN,也可能在远端 CN),并发送过去。可能会有跨 CN 的网络传输。

  2. 根据 shuffleIDX 找到当前 CN 的对应目的地,并发送过去。这种情况下一定只给本地发送一份,不会有跨 CN 的网络传输。

  3. 根据 shuffleIDX 找到每个 CN 的对应目的地,并发送过去。这种情况下会给每个 CN 都发送一份,必定会有跨 CN 的网络传输。

目前 MO 支持开启 shuffle 的算子有四个:load、scan、group 和 join。不过理论上任何算子对 shuffle 都是无感知的,以后 MO 将会支持对更多的算子开启 shuffle 算法。

Load 算子

Load 算子中部分复用了 shuffle 的分发逻辑来保证多并发读以及多并发写的情况下,写入 s3 的数据能保留数据原始排序。

Scan 算子

对 scan 算子的 shuffle,是在编译 pipeline 的阶段,将所有需要读的 block 分发到多个 CN 上。这里的 shuffle 是以 block 为单位进行,还没有到实际读取 block 数据的阶段。如果是 hash shuffle,则对 blockname 做 hash。如果是 range shuffle,则取 block zonemap 里 min max 的中间值进行 range 计算。

Group 算子

对 group 算子的 shuffle,是将 group 算子在编译 pipeline 时静态展开成多个 pipeline,对 scan 算子实际读取的 block 数据进行 shuffle,理论上以每行数据为单位。每个 pipeline 接收 shuffle 输出的数据。此时 group 算子不需要 merge group,每个单独的 pipeline 都可以直接输出数据到下一个 pipeline。

Join 算子

对 join 算子的 shuffle,同样是将 join 算子在编译 pipeline 时静态展开成多个 pipeline。join 的输入分为 probe 和 build 两边,对两边的输入数据采用同样的 shuffle 算法,保证相同数据会分到同一个桶内。每个 join 算子也可以直接将 join 结果输出,而不需要做 merge。

如果需要查看一条语句的执行计划是否开启 shuffle,可以通过 explain 语句,查看执行计划中是否有 shuffle 关键字。例如 tpch q4,可以看到 lineitem 表 join orders 表,join 类型是 right semi,lineitem 是 probe 表,orders 是 build 表。并且开启了 shuffle join,在 lineitem 表上对 l_orderkey 这一列做了 range shuffle。由于 join 两表采用同样的 shuffle 算法,所以在 orders 表上同样会对 o_orderkey 这一列做 shuffle。

QUERY PLANProject  ->  Sort        Sort Key: orders.o_orderpriority INTERNAL        ->  Aggregate              Group Key: orders.o_orderpriority              Aggregate Functions: starcount(1)              ->  Join                    Join Type: RIGHT SEMI   hashOnPK                    Join Cond: (lineitem.l_orderkey = orders.o_orderkey) shuffle: range(lineitem.l_orderkey)                    ->  Table Scan on tpch_10g.lineitem                          Filter Cond: (lineitem.l_commitdate < lineitem.l_receiptdate)                          Block Filter Cond: (lineitem.l_commitdate < lineitem.l_receiptdate)                    ->  Table Scan on tpch_10g.orders                          Filter Cond: (orders.o_orderdate >= 1997-07-01), (orders.o_orderdate < 1997-10-01)                          Block Filter Cond: (orders.o_orderdate >= 1997-07-01), (orders.o_orderdate < 1997-10-01)
复制代码

Part 3 Colocate shuffle

shuffle 的执行计划在多 CN 上虽然可以减少 hash 表的开销,但是同样可能会导致数据在网络传输上传输的开销增加。一个好的 shuffle 执行计划,必须要尽可能减少数据在网络上的传输。所以接下来要介绍的 colocate shuffle 优化对性能是至关重要的。

仍以 tpch1T,q4 为例。lineitem 表输出行数大约为 38 亿行,orders 表输出行数大约为 5 千万行。如果采用 broadcast join,hash 表太大难以处理,并且需要广播 5 千万行的 hash 表,开销也不算低。如果采用普通的 shuffle join,在 3CN 上,可以计算出需要走网络传输的数据量大约为(38 亿+5 千万)/3*2,大约是 26 亿行左右。虽然 hash 表变小了,但是网络传输开销太大,性能会降低到难以接受。此时使用 colocate shuffle join 是最优选择。

colocate shuffle join 的具体原理是,从 scan 开始,利用 block zonemap 将数据分布到对应的 CN 上,并且尽可能保证后续的 shuffle 算法会将数据 shuffle 到当前 CN 上进行处理,最大程度减少数据跨网络发送。在这个例子中,优化器会告诉 scan 算子,block zonemap 处于 1-20 亿之间的数据在 CN1 上读取,20 亿-40 亿之间的数据在 CN2 上读取,40 亿到 60 亿之间的数据在 CN3 上读取。同时优化器会告诉 shuffle join,1-20 亿之间的数据需要 shuffle 到 CN1 上进行计算,20 亿-40 亿之间的数据 shuffle 到 CN2 上计算,40 亿到 60 亿之间的数据在 CN3 上计算。如果每个 CN 有 10 核,那么具体到 CN1 是 1-2 亿之间在第一个 pipeline 上处理,2 亿-4 亿在第二个 pipeline 上处理,以此类推。具体到 CN2 是 20 亿-22 亿之间在第一个 pipeline 上处理,22 亿-24 亿在第二个 pipeline 上处理,以此类推。具体到 CN3 是 40 亿-42 亿之间在第一个 pipeline 上处理,42 亿-44 亿在第二个 pipeline 上处理,以此类推。

此时,只有极少数的 block,由于数据正好跨越在 shuffle 的边界上,需要进行跨网络传输。例如某个 block zonemap 为 19 亿-21 亿,这个 block 可能是在 CN1 上进行读取,读取到的 8192 行里一部分应该在 CN1 上计算,一部分应该发送到 CN2 进行计算。由于 l_orderkey 列是主键,TAE(MO 的存储引擎)会保证主键在 S3 文件里的的排序,block 与 block 之间应该是重叠非常少,需要跨网络传输的数据应该是极少数。整体流程示意如下图所示。

由此可见,想要开启 colocate shuffle ,必须是 range shuffle。如果使用 hash shuffle,则一定无法开启 colocate shuffle。

另外 colocate shuffle 还有个优化是,对于绝大多数 block,可以直接通过 zonemap 就判断出这个 block 可以整体 shuffle 到某个桶内,而不需要对每一行进行读取,计算,重新组成新的 batch。在这种情况下,colocate shuffle join 几乎没有引入新的开销,但是比 broadcast join 减少了广播 hash 表的开销,显著降低了 hash 表随机访问导致 cache miss 的开销。实测多 CN 场景下性能几乎是 broadcast join 的十倍以上。并且 colocate shuffle join 具有良好的多 CN 扩展性,可以达到线性扩展,甚至某些场景下可以超过线性扩展性。

Colocate shuffle 在分布式场景下对性能提升很大,不过具体提升程度是与数据的排序性,block 与 block 之间的重叠度有关的。通常主键或者 clusterby 列具有良好的排序性,提升最大,其他情况则 colocate shuffle 可能退化成普通 shuffle。另外由于 join 是对两边做同样的 shuffle 策略,所以有可能是两边都开启 colocate,也有可能只有一边可以开启 colocate。由于 join order 的策略总是将小表放在 build 端,所以通常是优先对 probe 端的表开启 colocate。

目前 MO 对 colocate shuffle join 的支持是优化器自动识别计算的。比起同类数据库的实现,优势在于不依赖手工收集统计信息,不需要修改 DDL,对计算节点数量(CN 数)以及并发数没有任何限制,扩缩容时也可以自动计算出新的 shuffle 执行计划,用户无需进行任何额外配置或调整,从而实现了完全透明的操作体验。主要体现在以下几个方面:

首先是统计信息,colocate shuffle join 必须使用 range shuffle,range shuffle 又依赖优化器给出合理的 shuffle 策略,这就必须依赖准确的 stats(统计信息)。而 MO 的优化器会在访问 block zonemap 时自动计算统计信息,不依赖任何定时或者人工更新统计信息的策略,可以保证在任何情况下都可以通过准确的 stats 来计算 range shuffle 策略。

其次不依赖分区表的分区策略。这里包含几个部分,一是可以支持对任意表进行 colocate shuffle join,不需要用户改动任何 DDL。二是不限制在特定的列上。例如 lineitem join orders on l_orderkey = o_orderkey, 可能应该在 l_orderkey 和 o_orderkey 这两列上开启 shuffle, 但是如果 lineitem join part on l_partkey=p_partkey where l_shipdate < xxx, lineitem 表过滤后输出行数小于 part 表输出,此时应该在 lineitem 表上做 build,part 表做 probe,在 p_partkey 列和 l_partkey 列上做 shuffle,保证 part 表的数据可以实现 colocate。这个策略应当由优化器来进行计算,如果依赖用户指定,就可能无法得到最优执行计划。三是 shuffle 分桶数量不依赖分区的数量,有些情况下用户在建表时很难立刻给出最合适的分区数,还有些情况下,分区和分区之间的数据可能不是很均衡。MO 的实现方式是用户无需感知这一切,全部交给优化器进行计算。

最后是对于 serverless 数据库,CN 的数量随时会发生变化。第一次执行的时候可能是 3CN10 核的场景,需要 shuffle 分 30 个桶,下一次执行可能变成了 5CN10 核的场景,需要分 50 个桶。仍以 tpch1T lineitem 表为例,MO 会自动计算出新的 shuffle 策略为第一个桶处理 1-1.2 亿,第二个桶处理 1.2 亿到 2.4 亿,依次类推。再下一次执行可能又变成了 7CN8 核,MO 依旧会自动计算新的 shuffle 策略。

下篇预告

下篇将继续分享如何处理不均匀数据、hybrid shuffle、shuffle reuse、join reorder 等相关内容,感兴趣的同学可以加入 MO 社区群持续关注


MatrixOne

MatrixOne 是一款基于云原生技术,可同时在公有云和私有云部署的多模数据库。该产品使用存算分离、读写分离、冷热分离的原创技术架构,能够在一套存储和计算系统下同时支持事务、分析、流、时序和向量等多种负载,并能够实时、按需的隔离或共享存储和计算资源。云原生数据库 MatrixOne 能够帮助用户大幅简化日益复杂的 IT 架构,提供极简、极灵活、高性价比和高性能的数据服务。

MatrixOne 企业版和 MatrixOne 云服务自发布以来,已经在互联网、金融、能源、制造、教育、医疗等多个行业得到应用。得益于其独特的架构设计,用户可以降低多达 70%的硬件和运维成本,增加 3-5 倍的开发效率,同时更加灵活的响应市场需求变化和更加高效的抓住创新机会。在相同硬件投入时,MatrixOne 可获得数倍以上的性能提升

关键词:超融合数据库、多模数据库、云原生数据库、国产数据库。

用户头像

MatrixOrigin

关注

还未添加个人签名 2021-12-06 加入

一个以技术创新和用户价值为核心的基础软件技术公司。

评论

发布
暂无评论
MO干货 | shuffle执行计划解析(上篇)_数据库_MatrixOrigin_InfoQ写作社区