写点什么

OpenMLDB: 一文了解窗口倾斜优化技术细节

发布于: 刚刚
OpenMLDB: 一文了解窗口倾斜优化技术细节

简介

OpenMLDB是针对 AI 场景优化的开源数据库项目,实现了数据与计算一致性的离线 MPP 场景和在线 OLTP 场景计算引擎。MPP 引擎可基于 Spark 实现,并通过拓展 Spark 源码实现数倍性能提升。本文主要解释OpenMLDB如何基于 Spark 来解决窗口数据的倾斜问题。

背景

数据倾斜是在大数据处理场景下常见的一种现象,它由某一分区数据量过大造成。数据倾斜会导致倾斜分区与其他分区的运算时间产生巨大差距,换句话说就是倾斜数据分区的计算任务与其 cpu 资源严重不匹配。最终会造成多等一的情况——多个小数据量的分区计算完毕后等待倾斜的大数据量分区,只有倾斜分区计算完毕才能输出结果。这对效率来说是巨大的灾难。

在机器学习的特征计算中,涉及到很多的窗口计算。在窗口计算下,如果出现单一 key 数据量过大,也会导致某一分区数据过多,从而产生数据倾斜问题。而传统数据倾斜中分区优化的方案,如:数据加前缀再分区,是不适合窗口计算场景的。它会导致窗口计算场景下最终计算结果错误。因此 OpenMLDB 提出了一种基于 Spark 的窗口数据倾斜分区优化方案——在扩充窗口数据后,再根据分区键以及时间片对倾斜数据进行再分区。

数据倾斜介绍


SELECT sum(Amount) OVER w AS sum,FROM inputWINDOW w as (PARTITION By Gender Order By Time ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
复制代码

在上图的数据中,因为主键“Gender”只有两个值,离线计算最好情况下只能将数据划分到两个 partition,即并行度只有 2。此时同样的分区资源,计算任务的数据量差距却很大。在后续的计算中,“male”所在的分区计算的时间必然比“female”所在分区计算的时间大。当倾斜分区数据量变大的时候,这个时间差距还会被不断拉大。且由于 spark 的底层执行里每个 partition 只有一个 thread,这使得整个 stage 周期里只有两个 thread 在工作,还有很多其他的 thread 一直处于空闲状态,这也会导致严重的性能浪费。

传统数据倾斜解决方案

对于倾斜数据的优化,解决根本问题的方法就是对倾斜数据进行再分区,把原本一个倾斜分区内庞大的数据块,分散成多个小的数据分区。以此来达到对大数据进行拆分从而提高计算效率的目的。

在常见的数据再分区策略中,有通过分区键加上不同前缀从而进行再分区的策略,也有通过多加几列作为分区键进行再分区的策略。但是这些简单的再分区方案,在窗口计算中,都会造成计算错误。

如果采用数据加前缀再分区的简单分区优化方案,原本同一个 partition 下的数据会被拆分到不同的 partition。而窗口计算涉及到数据之间滑动取值的情况,因此如果只是简单的将分区内的数据再拆分,窗口计算将无法取到原本相邻的数据,这会导致最终计算结果的错误。

OpenMLDB 窗口倾斜优化方案

整体思路

我们的方案总体思路是在上述倾斜数据再分区的基础上,进一步保证各个再分区的数据块在窗口计算时结果正确。方案里采用的方式是在每个再分区的数据块中,根据窗口需要滑动的数据条数,进行一定的窗口数据扩充。

在优化中,总体上采用的就是再分区+窗口补充的 repartition 策略来对数据进行分区。思路是采用空间换时间的策略,优点是计算时间短性能高,缺点是补充的窗口数据会造成一定的数据冗余,导致占用更多内存。

下面详细介绍本方案的技术细节,倾斜优化方案具体的实现主要分为五步,以下面 SQL 为例。

SELECT SUM(Amount) OVER W1 AS sumFROM InputTableWINDOW W1 AS (PARTITION BY Gender ORDER BY Time ROWS PRECEDING 2 AND CURRENT ROW)
复制代码

第一步:数据评估——统计窗口分区键的数据分布

这一步需要对总体的数据做一个评估,统计出一些相关的指标,比如数据划分的分界线,以及 partition 内数据的条数等。参数介绍如下。

总体来说,第一步的数据评估是对数据各项指标进行统计和计算,并在统计后,对数据进行判断以及处理,但由于涉及到全量数据的遍历,会比较耗时。对此我们也有一个额外的优化,我们支持通过读取提前预处理好的 distribution 表来跳过第一步中统计的部分。这样就可以在凌晨或者不需要处理业务时,执行统计任务,将数据结果统计完成,来避免用户需要执行处理逻辑时,在第一步等待时间太久。

// Use skew configval distributionDf = ctx.getSparkSession.read.parquet(ctx.getConf.windowSkewOptConfig)logger.info("Load distribution dataframe")
复制代码

第二步:数据标记——标记重新分区的编号


这一步根据 Distribution Table 中对数据的统计结果,来对数据进行划分,并对划分后的数据打上(“PART_ID”)和(“EXPANDED_ROW"),作为不同数据块重分区后的分区标号以及是否为扩充数据的标记。

在最开始的 Join 中,我们采用了 Broadcast Join,来提升 Join 时的效率。Broadcast Join 是 Spark 中一种可以避免 shuffle 的 Join,一般一张大表和一张小表进行 Join 时可以使用 Broadcast Join,它是通过将小表的数据广播到每个 Executor 计算节点上,再通过 map 聚合的方式,来避免了数据的 shuffle。在我们的表中,Distribution Table 比 Input Table 小很多,因此刚好可以采用 Broadcast Join。

在 Join 之后,可以得到数据分界线,且当 PERCENTILE_i 为第 i 条分界线时,符合(PERCENTILE_i,PERCENTILE_i+1] 的数据就为第 i 个数据块,采用固定策略划分完结果之后。就可以根据划分结果,生成新的分区标号——“PART_ID”。表数据介绍如下。

第三步:数据扩充——对不同分块的数据进行窗口数据的扩充

对窗口数据进行扩充是 OpenMLDB 关于窗口倾斜优化中,比较核心的部分。由于数据较多,为了便于理解,下面只展示“male”部分数据。

具体实现时,我们对每个需要扩充的数据块进行全体窗口数据的扩充,即通过遍历,对每个需要扩充数据的重分区数据块都扩充到第一条数据。过程图解如下,深色代表当前遍历的分区,浅色代表当前分区需要补充的窗口数据。

1.过滤出需要扩充的数据

对于 Time 为 1 和 3,“PART_ID" = 1 的第一个重分区数据块,由于是时间最先的数据块,上面已经没有数据可以给他们补充了,因此会跳过。

对于 Time 为 5,“PART_ID" = 2 的第二个重分区数据块,会将所有时间比当前数据块前的数据都取出来,也就是“PART_ID" = 1 的数据块。

对于 Time 为 7,“PART_ID" = 3 的第三个重分区数据块同理,将所有时间比当前数据块前的数据都取出来,也就是取第一个和第二个数据块作为扩充的窗口数据。

后续第四个重分区数据块也同上,将所有需要的数据取出,因此不再赘述。

2.更改过滤数据的 ID 并进行 Union

将数据取出来之后,我们还需要将(“EXPANDED_ROW")改成 true,代表是扩充的窗口数据。改完(“EXPANDED_ROW")之后,只需要不断的和原来的 AddColumn Table 进行 Union,我们就完成了一个数据块的窗口数据扩充。以第二块数据块为例子,下图 Union Table 中,不同颜色代表不同的重分区数据块,可以看到经过 filter 和 union,第二块数据块已经扩充好了数据。

对于其他数据块窗口扩充的方式和第二块数据块方式的思路一样,在过滤以及扩充完后,再和之前的 Union 表进行 Unoin 即可。

下面展示最终第四块数据块扩充完窗口上数据后得到的最终 Union Table。

第四步:数据分区——根据再分区键进行重新分区

虽然之前我们通过不同色块来标记不同的再分区数据,但实际上,到了第四步,我们才真正的对数据进行了重分区,底层我们依赖了 Spark 中的 repartition 函数进行数据重分区。在第三步后,我们可以得到最终的 Union Table,此时只需要根据分区键(“Gender”)和(“PART_ID")进行 repartition,就可以将数据拆分到不同的 executor 上。

第五步:数据计算——对分区后的数据进行计算

在第三步中,我们知道那些"EXPANDED_ROW" = false 的数据列是新补充进来的窗口数据,而且在实际计算中,他们是不需要参与计算的。因此只需要对"EXPANDED_ROW" = true 的数据进行窗口计算,最终便可得到计算结果。

值得特别说明的是,由于 OpenMLDB 底层处理引擎是自主研发设计的,因此窗口计算的内部逻辑也是由 OpenMLDB 实现的。下面贴出相关代码进行讲解。

repartitionDf.rdd.mapPartitionsWithIndex {  case (partitionIndex, iter) =>    val computer = WindowAggPlanUtil.createComputer(partitionIndex, hadoopConf, sparkFeConfig, windowAggConfig)    windowAggIter(computer, iter, sparkFeConfig, windowAggConfig)}
复制代码

对于第四步生成的 repartitionDf,我们在外层调用了 Spark 的 mapPartitionsWithIndex 方法。之后对于每个分区,OpenMLDB 都构建一个 computer 计算单元,用来处理接下来的窗口计算。之后则是正式进行窗口计算,调用 windowAggIter 方法。

InputIter.flatMap(row => {  if (lastRow != null) {    computer.checkPartition(row, lastRow)  }  lastRow = row   val orderKey = computer.extractKey(row)  val expandedFlag = row.getBoolean(config.expandedFlagIdx)  if (!isValidOrder(orderKey)) {    None  } else if (!expandedFlag) {    Some(computer.compute(row, orderKey, config.keepIndexColumn, config.unionFlagIdx))  } else {    computer.bufferRowOnly(row, orderKey)    None  }})
复制代码

在 windowAggIter 方法里,我们对传进来的迭代器 InputIter 进行了 flatMap 操作,之后再检查是否分区内数据有没有分错,如果有分错的 row,则会对 window 进行重新设置。接下来检查 orderKey 没有问题后,会对 expandedFlag 也就是上图中的(“EXPANDED_ROW")作判断,如果为 true,则证明当前 row 是扩充的数据,因此 computer 计算单元只进行 buffeRowOnly 操作,缓存扩充的窗口数据进内存,为之后真实需要计算的数据使用。如果为 false,此时 expandedFlag 也为 false,computer 计算单元就进行真正的计算 compute,在 compute 方法里会读取之前缓存的数据并进行计算,之后会返回处理完成的 row。compute 方法内部是由 c 实现的,有兴趣的同学可以去查看 OpenMLDB 里相关源码。

性能对比测试

Benchmark 性能测试使用 Kaggle 公开数据集,也就是 New York City Taxi Trip Duration 竞赛的数据集,使用测试的 SQL 语句如下:

SELECT    sum(vendor_id) over w as w_sum_vendor_id,    max(vendor_id) over w as w_max_vendor_id,    min(vendor_id) over w as w_min_vendor_id,    avg(vendor_id) over w as w_avg_vendor_id,    sum(pickup_longitude) over w as w_sum_pickup_longitude,    max(pickup_longitude) over w as w_max_pickup_longitude,    min(pickup_longitude) over w as w_min_pickup_longitude,    avg(pickup_longitude) over w as w_avg_pickup_longitudeFROM taxi_skew_allWINDOW w as (partition by vendor_id order by pickup_datetime ROWS BETWEEN 10000 PRECEDING AND CURRENT ROW)
复制代码

对比开源版本 SparkSQL 以及开源版本 OpenMLDB 进行测试,测试结果如下。

可以看到,OpenMLDB 引擎即使在不开启倾斜优化的情况下,在不同的倾斜比例中,相对于 Spark 引擎,仍然有 4 倍以上的性能提升,这种性能提升主要是通过 OpenMLDB 底层高效的引擎实现来保证的。而 OpenMLDB 在开启了窗口倾斜优化之后,通过调整不同的再分区数,相比 OpenMLDB 不开启倾斜优化也还能提升大约 60%~140%的性能。

总结

OpenMLDB 通过扩充窗口数据加上数据再分区的策略,实现了窗口计算下数据倾斜的优化。策略总体上采用了空间换时间的思想,即将原本集中在一个分区中的倾斜数据,在存储空间上进行窗口数据的扩充,之后再将数据分散至多个分区并行计算,从而增加计算的并行度,来换取更短的计算时间,并在最终实现了效率的大幅提升。此外在数据测试中,我们发现越在极端的倾斜分布下,OpenMLDB 越有更好的表现。总的来说,对于窗口计算下的数据倾斜场景,OpenMLDB 实现的数据倾斜优化有着不错的效果。

本文介绍了常见的滑动窗口数据倾斜问题,并且剖析了 OpenMLDB 解决数据倾斜的实现方案以及展示最终的性能优化结果。如果你对 Spark 优化、大规模特征计算、OpenMLDB 数据库等感兴趣,我们会分享更多类似的技术文章,欢迎大家继续关注 OpenMLDB专栏 。

发布于: 刚刚阅读数: 2
用户头像

AI for every developer,AI for everyone 2021.06.21 加入

还未添加个人简介

评论

发布
暂无评论
OpenMLDB: 一文了解窗口倾斜优化技术细节