写点什么

Spark Shuffle 内部机制(二)

用户头像
hanke
关注
发布于: 2021 年 02 月 22 日
Spark Shuffle 内部机制(二)

在上一篇文章里的Spark Shuffle内部机制(一)中我们介绍了 Spark Shuffle Write 的框架设计,在本篇中我们继续总结一下 Spark Shuffle Read 的框架设计。



Spark Shuffle Read 框架

Spark Shuffle Read 阶段主要解决的是从上游 Map 产生的数据里拉取对应分区的数据,然后进行重新组织和计算,为后续的操作 transformation 提供输入数据。主要包括拉取分区数据 --> Aggregate Func(可选) --> Sort(可选)三步计算顺序。


如上图所示,是一个 Spark Shuffle Read 比较通用的框架流程。接下来会为不同的数据场景需求来分析 Spark Shuffle Read 阶段具体的实现方式和细节。



1) 不需要聚合操作,也不需要 sort 操作


过程

  • 直接从上游 Map 端拉取 reduce task 对应的分区文件到 buffer 里,输出为<K, V>的 record 然后直接继续后续的 transformation 操作。



优点

  • 内存消耗小,实现简单。



缺点

  • 不支持聚合和排序操作。



适用场景

  • reduce 端不需要聚合和排序的场景,比如简单的 partitionBy()操作。



内存消耗

  • 拉取数据的 buffer 空间



2) 只需要 sort 操作,不需要聚合操作


过程

  • 直接从上游 Map 端拉取 reduce task 对应的分区文件放到内存特殊数组PartitionPairedBuffer里(具体数据结构会在下文介绍),当数组里的数据装满后会按照partitionId + record key进行排序,spill 到磁盘。然后清空数组继续装拉取过来的数据。当所有从 map 端拉取的数据都处理完后,会将内存里的数据和 spill 到磁盘里的数据进行全局归并排序,最后交给后面进行其他的 transformation 操作。



优点

  • 支持 reduce 端的 sort 操作

  • 支持内存+磁盘的方式排序,解决内存不足问题,可以处理大数据量



缺点

  • 不支持 reduce 端聚合操作

  • 排序增加时延



适用场景

  • reduce 端只需要排序不需要聚合的场景,比如 sortByKey()



**数据结构PartitionPairedBuffer**

  • 实现

* 底层是特殊 array。

* 一条 record 会占用 array 里两个相临空间,第一元素是partitionId+record key,第二元素是 recordvalue

* 扩容时将底层数组再扩大一倍,然后对数组里的数据 copy 到新的空间里。

* 局部排序直接将数组里的数据然后按照对应的 partitionId 或者 partitionId+key 进行线性数组排序。

* 全局排序再将 spill 的数据和内存里的数据进行全局排序和 merge 时,通过建立最小堆或者最大堆进行全局归并排序合并操作即可。

  • 优点

* 支持内存+磁盘的方式,可以支持按 partitionId 或者 partitionId + record key 的方式进行排序。



内存消耗

  • 存放数组的 sort 所需要的数组 + 排序所占用的空间

  • 取决于输入数据量的大小



3) 需要聚合操作,需要 sort 或者不需要 sort 的操作


过程

  • 从上游 map 端拉取的 reduce tasks 对应的分区文件的数据到 buffer 后,会建立一个 HashMap 的数据结构ExternalAppendOnlyMap对 buffer 的 record 进行在线聚合。如果 HashMap 数据放不下后,会先进行扩容,如果内存还放不下。会先按照 record 的 key 或者 hash 值进行排序,然后 spill 到本地磁盘里。最终 map 端的数据处理结果以后,会将磁盘里 spill 的文件和内存中排好序的数据进行全局的归并聚合,再交给后面的其他的 transformation 操作。



优点

  • 支持 reduce 端的聚合和 sort 操作

  • 支持内存+磁盘的方式在线聚合和排序,解决内存不足和等所有 map 的数据都拉取结束再开始聚合的时延问题,可以处理大数据量



缺点

  • 聚合排序增加内存消耗和时延



适用场景

  • reduce 端需要聚合的场景,比如 reduceByKey(), aggregateByKey()



数据结构ExternalAppendOnlyMap

同上一篇文章里的Spark Shuffle内部机制(一)PartitionedAppendOnlyMap数据结构,是一个类似的 HashMap 结构,只不过PartitionedAppendOnlyMap的 key 是partitionId + record key, 而ExternalAppendOnlyMap的 key 是record key



  • 实现

* HashMap + Array的结合体。

* 仅支持对数据的增加和更改操作(Append Only),不支持删除操作。

* 底层是一个线性数组,通过对 key(record key)的 hash,然后按照 hash 值在数组中通过探测的方式找到存放和读取的位置

* 如果 hash 值对应的位置没有存放数据,那么就把对应的数据放进去。

* 否则,看临近的下一个位置是否是空的,依次探测,直到找到一个空的位置放进去即可。

* 读取的时候也是类似的根据 hash 值探测方式读取,然后将值 combine 以后再 update 回去。

* 扩容时将底层数组再扩大一倍,然后对数组里的数据再 rehash 重新放置。

* 局部排序: 将数组的数据都移到最前方的位置,然后按照对应的 key 的 hash 值或者 key(需要按照 key 排序的场景)进行线性数组排序。

* 如果按照 key 的 hash 值排序,当 hash 值相等时,会进一步看 key 是否相等,来判断是否是 hash 冲突引发的相等。

* 全局排序: 在将 spill 的数据和内存里的数据进行全局排序和 merge 时,通过建立最小堆或者最大堆进行全局排序合并操作即可。

  • 优点

* 不需要额外占用内存空间来进行 HashMap 结构到线性结构的 copy 转化过程。



内存消耗

  • 在线聚合需要的 HashMap 数据结构 + 中间聚合过程用户代码所占用的数据空间 + 排序所占用的空间

  • 取决于分区输入数据量的大小和用户聚合函数的复杂性



到这里,我们基本上对 Spark Shuffle 的 Write 和 Read 两部分的设计有了一个总体的了解。下一篇文章里,会再进一步总结 Spark Shuffle 的前世今世,敬请关注。



Reference

Spark 官方文档

《大数据处理框架 Spark 的设计与实现》

Hadoop之MapReduce内部机制


Spark Shuffle内部机制(一)


更多大数据相关分享,可关注微信公众号,搜索“数据元素”或扫描下方二维码。




发布于: 2021 年 02 月 22 日阅读数: 16
用户头像

hanke

关注

凡是过往,皆为序章 2019.09.11 加入

热爱大数据技术沉淀和分享,致力于构建让数据业务产品更易用的大数据生态圈,为业务增值。

评论

发布
暂无评论
Spark Shuffle 内部机制(二)