写点什么

Spark Shuffle 内部机制(一)

用户头像
hanke
关注
发布于: 2021 年 02 月 04 日
Spark Shuffle 内部机制(一)

Spark Shuffle 是什么

Spark Shuffle 是根据数据处理需求将数据按着某种方式重新混洗,以便于后面的数据处理。比如 reduceByKey()的操作,通过先将数据按照 key 进行重新分区以后,然后对每个 key 的数据进行 reduce 操作。


Spark Shuffle 共包括两部分:

  • Spark Shuffle Write

* 解决上游输出数据的分区问题

  • Spark Shuffle Read

* 通过网络拉取对应的分区数据、重新组织,然后为后续的操作提供输入数据


Spark Shuffle 要解决的问题

我们先从总体的角度来看一下 Spark Shuffle 要解决哪些问题,以及大致的解决方案是什么。

  • 如何灵活支持不同计算的场景下不同的需求?排序、聚合、分区?

* 通过灵活的框架设计来满足不同的需求

  • 如何应对大数据量的情况下面临的内存压力?

* 通过内存 + 磁盘 + 数据结构设计来解决内存问题

  • 如何保证 Shuffle 过程的高性能问题?

* 减少网络传输

* map 端的 reduce

* 减少碎小文件

* 按 partitionId 进行排序并合并多个碎文件为一个,然后加上分区文件索引供下游使用


Spark Shuffle 的框架

接下来,我们一起看看 Spark Shuffle 在 Write 和 Read 两个阶段是如何设计对应的框架,来解决上面阐述的问题


Spark Shuffle Write 框架

在 Shuffle Write 阶段,数据操作需要提供的是排序、聚合和分区三个数据功能。但可能在数据场景里,需要的功能只是其中一个或者两个,因此 Spark Shuffle 的 Write 整体框架设计成灵活的“map --> combine(可选)--> sort(可选) --> partitions”。具体的框架如下图所示:


如上图所示,根据 Write 阶段不同的数据处理需求,Spark 进行不同的流程选择和数据结构设计来解决计算需求、内存压力和性能问题。接下来会进行每个场景详细的分解介绍。


1) 仅需要 map,不需要 combine 和 sort 的场景


过程

对于输入的 record,经过 map 运算后,会输出 record 对应的 partitionId 和 map 后的数据直接放到内存里其对应分区的 buffer 里,当 buffer 数据满了以后会直接 flush 到磁盘上对应的分区文件里。因此一个 map task 的每个分区对应一个 buffer 和磁盘上的一个文件。


命名

  • BypassMergeSortShuffleWriter


优点

  • 速度快,操作是在内存中进行,直接将 map 处理后的 record 输出到对应的分区文件。


缺点

  • 每个分区需要对应内存的一个 buffer,如果分区个数较多,那么占用的内存就会比较大。另外,每个分区对应一个输出文件,当分区个数过多时,文件打开数,以及下游 Shuffle Read 时的连接数都会很大。因此容易造成资源不足的情况发生。


适用场景

  • 通过优缺点分析,可以看到,这种处理方式只适合分区个数比较小的情况下(可以通过spark.shuffle.sort.byPassMergeThreshold设置),速度比较快。另外也不需要额外的 combine 和 sort 的场景,比如简单的 partitionBy(), groupBy()类似的 transformation 操作。


内存消耗

  • 主要是每个分区对应的 buffer 的内存占用


2) 需要 map,需要 combine 的场景


过程

1) Map 和在线聚合

对于输入的 record,经过 map 运算后,会放进一个类似 HashMap 的数据结构里(PartitionedAppendOnlyMap,后面会对这个结构进行介绍),如果 HashMap 里key存在,那么会将该 record 的值和对应结构里的 value 进行 combine 操作,然后更新key在这个 HashMap 里的数值,实现在线聚合的功能。否则,直接将 record 的值更新到 HashMap 里即可。

其中这里的key = partitionId + record key


2) Sort&Spill

当 HashMap 里不够存放时,会先进行扩容,扩容为原来的两倍。如果还存放不下,然后会将 HashMap 里的 record 排序后放入磁盘里。然后清空进行 HashMap,继续进行后续的在线聚合操作。

其中这里对 record 进行排序的key:

  • 如果需要 map 端按照 record key 排序,那么这里排序的key = partitionId + record key

  • 否则,这里的排序key = partitionId


3) Sort&Merge

在输出文件时,会将 Spill 到磁盘的和内存里的数据,进行 Sort 和 Merge 操作,然后按 PartitionId 输出数据。最终会生成一个输出文件(存放该 map task 产生的所有的分区文件) + 分区数据索引文件供下游 Shuffle Read 使用。


命名

  • SortShuffleWriter的一种场景


优点

  • 通过设计的 HashMap 数据结构支持在线聚合的方式处理 map 端的 combine 操作,不用等所有的 map 数据都处理结束,提升性能,也节省了内存空间。

  • 利用内存+磁盘的方式来可以解决大数据量下面临内存不足的问题。

  • 一个 map task 只输出一个总的分区文件和分区索引,减少了碎文件的个数,提升了 I/O 资源利用率和性能,对下游也比较友好。适合分区数据较多的 Shuffle 情况。


缺点

  • 在线聚合的方式,需要对 record 一条一条的处理,相对 Hadoop 将 map 全处理结束后,再统一的进行聚合的方式相比,无法定制灵活的聚合方式。

  • 直接通过原生 HashMap 的方式的话,会存在聚合后,再次 copy 到线性数组结构时进行排序引发的额外的 copy 和内存占用问题。需要设计更好的数据结构来支持高效的 combine 和 sort 操作。


适用场景

  • 大分区且需要 map 端 combine 的操作,比如 reduceByKey(), aggregateByKey()等。


**数据结构PartitionedAppendOnlyMap**

  • 实现

* HashMap + Array的结合体。

* 仅支持对数据的增加和更改操作(Append Only),不支持删除操作。

* 底层是一个线性数组,通过对 key(partitionId + record key)的 hash,然后按照 hash 值在数组中通过探测的方式找到存放和读取的位置

* 如果 hash 值对应的位置没有存放数据,那么就把对应的数据放进去。

* 否则,看临近的下一个位置是否是空的,依次探测,直到找到一个空的位置放进去即可。

* 读取的时候也是类似的根据 hash 值探测方式读取,然后将值 combine 以后再 update 回去。

* 扩容时将底层数组再扩大一倍,然后对数组里的数据再 rehash 重新放置。

* 局部排序: 将数组的数据都移到最前方的位置,然后按照对应的 partitionId 或者 partitionId+key 进行线性数组排序。

* 全局排序: 再将 spill 的数据和内存里的数据进行全局排序和 merge 时,通过建立最小堆或者最大堆进行全局排序合并操作即可。

  • 优点

* 不需要额外占用内存空间来进行 HashMap 结构到线性数组结构的 copy 转化过程。


内存消耗

  • 在线聚合需要的 HashMap 数据结构 + 中间聚合过程用户代码所占用的数据空间 + 排序所占用的空间

  • 取决于输入数据量的大小和用户聚合函数的复杂性


3) 需要 map,需要 sort,不需要 combine 的场景


过程

1) Map

对于输入的 record,经过 map 运算后,会被放入内存一个类似线性数组的结构里(PartionedPairBuffer,下面会介绍该数据结构)


2) Sort&Spill

当数组里放不下 map 后的数据时,会先进行扩容,扩容为原来的两倍,并移动数据到新分配的空间。如果还存放不下,会把 array 的 record 进行排序后放入磁盘。然后清空数据,继续后续的 map 数据存储。

其中这里对 record 进行排序的key:

  • 如果需要 map 端按照 record key 排序,那么这里排序的key = partitionId + record key

  • 否则,这里的排序key = partitionId


3) Sort&Merge

在输出文件时,会将 Spill 到磁盘的和内存里的数据,进行 Sort 和 Merge 操作,然后按 PartitionId 输出数据。最终会生成一个输出文件(存放该 map task 产生的所有的分区文件) + 分区数据索引文件供下游 Shuffle Read 使用。


命名

  • SortShuffleWriter的一种场景


优点

  • 通过一个数组的结构来支持 map 端按照 partitionId 或者 partitionId + key 的方式进行排序。

  • 同样的利用内存+磁盘的方式来解决内存不足的问题。

  • 一个 map task 只输出一个总的分区文件和分区索引,减少了碎文件的个数,提升了 I/O 资源利用率和性能,对下游也比较友好。适合分区数据较多的 Shuffle 情况。


缺点

  • 需要额外的排序过程


适用场景

  • 大分区且需要 map 端按 key 进行 sort 且不需要 combine 的场景,比如 sortByKey()

  • 或者大分区限制下 map 端的不需要 combine 的场景,partitionBy(1000)


**数据结构PartitionedPairBuffer**

  • 实现

* 底层是特殊 array。

* 一条 record 会占用 array 里两个相临空间,第一元素是partitionId+record key,第二元素是 recordvalue

* 扩容时将底层数组再扩大一倍,然后对数组里的数据 copy 到新的空间里。

* 局部排序直接将数组里的数据然后按照对应的 partitionId 或者 partitionId+key 进行线性数组排序。

* 全局排序再将 spill 的数据和内存里的数据进行全局排序和 merge 时,通过建立最小堆或者最大堆进行全局归并排序合并操作即可。

  • 优点

* 支持内存+磁盘的方式,可以支持按 partitionId 或者 partitionId + record key 的方式进行排序。


内存消耗

  • 存放数组的 sort 所需要的数组 + 排序所占用的空间

  • 取决于输入数据量的大小


4) 更进一步优化 Serialized Shuffle


过程

1) Map&Serialized&Index

  • 输入数据通过 map 处理后得到带分区的<K, V> record,然后会将 record 序列化后放到相应的 pages 中,并在 page0 中记录 record 在对应的 page 的 offset 信息,以便能够快速找到 record,其中 page0 里会包括 partitionId, pageNum 和 pageOffset。


2) Sort&Spill

  • 按照 page0 里记录的 partitionId 进行排序,当内存中放不下 page0 和相应的存放序列化数据的 pages,会将 pages 和排序后的 page0 先 spill 到磁盘上,解决内存压力问题。


3) Sort&Merge

  • 当 map 端的数据都处理结束后,将内存排序后的 page0+pages 数据和磁盘上 spill 的文件及数据按照 partitionId 做全局的归并排序,最后输出一个文件包含排序后的所有分区数据+分区索引文件供下游 reduce task 使用。


命名

  • UnsafeShuffleWriter的一种场景


优点

  • 减少 memory 占用

* Serialized Shuffle 是针对 map 里不需要聚合和排序但是 partition 个数较多的情况下一种优化。前文中提到对于这种场景,用的是数组结构,存放的是普通的 record 的 Java 对象。当 record 比较大时,非常占用内存,也会导致 GC 频繁。Serialized Shuffle 将 record 序列化以后放在内存,减少内存的占用。

  • 提高性能

* 排序时只需要对 record 的索引 page0 按照 partitionId 进行排序,不涉及到具体 record 的操作,不会涉及额外的序列化反序列化操作,提高排序性能。

  • 分页技术可以使用非连续的内存空间

* 相对于数组,分页的方式不要求所有的 pages 分配的空间是连续的,因此可以充分利用零碎内存空间。

  • 可以使用堆外内存

* page0 和 pages 上的序列化数据可以使用堆外内存技术来存放数据。


缺点

  • 不支持 map 端的排序和聚合(因为 record 被序列化了)

  • 分区个数有限制需要小于 2<sup>24</sup> (page0 里 partitionId 用 24bits 来表示)

  • 单个 serialized 需要小于 128M


适用场景

  • 适用于 map 端不需要排序和聚合,partition 个数较大,record 本身也比较大的场景


**数据结构PointerArray**

  • 实现

* 实质上就是一个 Long Array

* 前 24bits 表示的 partitionId,接着 13bits 表示 pageNum,最后的 27bits 表示的是在 page 里 Offset

* 用来存放序列化后的 record 在对应的哪个 page 及 page 中的 offset

  • 优点

* 占的内存空间小,也能快速定位序列化后的 record


内存消耗

  • page0 + 数据 record 序列化后 pages 的大小 + 排序占用的空间

  • 取决于输入数据量的大小 + record 本身序列化后的大小


在下一篇文章里,我们会继续介绍 Spark Shuffle Read 部分的框架设计。敬请关注。


Reference

Spark 官方文档

《大数据处理框架 Spark 的设计与实现》

Hadoop的MapReduce到底有什么问题?


本网站的文章除非特别声明,全部都是原创。

原创文章版权归数据元素所有,未经许可不得转载!

**了解更多大数据相关分享,可关注微信公众号"数据元素"**


发布于: 2021 年 02 月 04 日阅读数: 703
用户头像

hanke

关注

凡是过往,皆为序章 2019.09.11 加入

热爱大数据技术沉淀和分享,致力于构建让数据业务产品更易用的大数据生态圈,为业务增值。

评论 (2 条评论)

发布
用户头像
这图是怎么画的 :)
2021 年 02 月 18 日 11:02
回复
Draw.io :)
2021 年 02 月 20 日 19:43
回复
没有更多了
Spark Shuffle 内部机制(一)