写点什么

Spark Shuffle 内部机制(三)

用户头像
hanke
关注
发布于: 2021 年 02 月 24 日
Spark Shuffle 内部机制(三)

在上两篇文章Spark Shuffle 内部机制(一)Spark Shuffle 内部机制(二)中我们分别介绍了 Spark Shuffle Write 和 Read 的框架设计,在本篇中我们继续总结一下 Spark Shuffle 整个的发展历史。


Spark Shuffle 的前世今生

Spark 的 Shuffle 在 Write 和 Read 两阶段有今天灵活的框架设计也是经过一步步不断完善和努力的结果,我们一起来回顾一下它的前世今生。


1) Spark 0.8 之前 HashShuffleWriter

在 Spark 0.8 以前用的是 basic 的 HashShuffleWriter,整体的实现就是 Shuffle Write 框架里介绍是仅需要map,不需要combine和sort的场景。实现上比较简单直接,每个 map task 按照下游 reduce tasks 个数即 reduce 分区个数,每个分区生成一个文件写入磁盘。如果有 M 个 map tasks, R 个 reduce tasks,那么就会产生M * R个磁盘文件。因此对于大分区情况非常不友好,会生成大量的碎文件,造成 I/O 性能下降,文件连接数过大,导致 resource 吃紧,进而影响整体性能。具体可参考下图:


2) Spark 0.8.1 引入文件 Consolidation 的 HashShuffleWriter

由于 basic 的 HashShuffleWriter 生成的碎小文件过多,为了解决这个问题引入了文件 Consolidation 机制。在同一个 core 上运行的所有的 map tasks 对应的相同的分区数据会写到相同的 buffer 里最终对应分区的一个分区文件。如果有 M 个 map tasks, R 个 reduce tasks,C 个 cores,那么最终会产生C * R个磁盘文件。如果 C 比 M 小,那么对比 basic 的 HashShuffleWriter,文件个数有所下降,性能会得到提升。具体过程可参照下图:


3) Spark 1.1 引入 SortShuffleWriter

虽然 Consolidation 的机制在一定程度上减少文件个数,但是当 cores 和 reduce 的 task 过多的时候一个 map task 依然会产生大量的文件。在 Spark 1.1 里首次引入了基于 sort 的 Shuffle Writer,整体的实现是 Shuffle Writer 框架里介绍的需要map,需要sort,不需要combine的场景。每个 map task 的输出数据会按照 partitionId 排序,最终一个 map task 只会输出一个分区文件包括这个 map task 里的所有分区数据 + 分区索引文件供下游 shuffle read 使用,大大减少了文件个数。具体过程可参照下图:


4) BypassMergeSortShuffleWriter

SortShuffleWriter 的引入大大减少了文件个数,但是也额外增加了按 partitionId 排序的操作,加大了时延。对于分区个数不是太大的场景,简单直接的 HashShuffleWriter 还是有可借鉴之处的。BypassMergeSortShuffleWriter 融合了 HashShuffleWriter 和 SortShuffleWriter 的优势,每个 map task 首先按照下游 reduce tasks 的个数,生成对应的分区数据和分区文件(每一个分区对应一个分区文件),在最终提供给下游 shuffle read 之前,会将 map task 产生的这些中间分区文件做一个合并(Consolidation),只输出一个分区文件包含所有的分区数据 + 分区索引文件供下游 shuffle read 使用。具体过程可参照下图:


需要注意的是 BypassMergeSortShuffleWriter 不适合分区比较大的场景,因为在 Shuffle Writer 阶段,一个 map task 会为每个分区开一个对应的 buffer,如果分区过大,那么占用的内存比较大,性能也会有影响。具体可以参照 Spark Shuffle Writer 框架里仅需要map,不需要combine和sort的场景的解释,这里不再赘述。


5) Spark 1.4 引入 UnsafeShuffleWriter

UnsafeShuffleWriter 是一种 Serialized Shuffle,主要是对于 map 里不需要聚合和排序但是 partition 个数较多的情况下一种优化。在Shuffle Writer框架里需要map需要sort的场景中提到对于这种场景,用的是数组结构,存放的是普通的 record 的 Java 对象。当 record 比较大时,非常占用内存,也会导致 GC 频繁。Serialized Shuffle 将 record 序列化以后放在内存,进一步减少内存的占用、降低 GC 频率。具体可参考下图和前篇关于 Shuffle 优化部分 Serialized Shuffle 的介绍:


6) 今天的 Spark Shuffle

在 Spark 2.0 里,第一版的 HashShuffleWriter 彻底退出历史舞台。今天的 Spark Shuffle Writer 只有三种 Writer 方式:

  • Sort

* SortShuffleWriter(Default)

* UnsafeShuffleWriter

* 也叫 Tungsten-sort

  • BypassMergeSortShuffleWriter


是否需要 Sort?

默认模式下用的是 SortShuffleWriter 的方式,但用户也可以通过指定的方式来选择更适合的 Shuffle 方式。

  • 如果分区个数不超过 BypassMergeSort 的阈值spark.shuffle.sort.bypassMergeThreshold,就用BypassMergeSortShuffleWriter

  • 否则就用 Sort 的方式。


样例代码参见: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala

override def registerShuffle[K, V, C](    shuffleId: Int,    numMaps: Int,    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {    // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't    // need map-side aggregation, then write numPartitions files directly and just concatenate    // them at the end. This avoids doing serialization and deserialization twice to merge    // together the spilled files, which would happen with the normal code path. The downside is    // having multiple files open at a time and thus more memory allocated to buffers.    new BypassMergeSortShuffleHandle[K, V](      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {    // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:    new SerializedShuffleHandle[K, V](      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])  } else {    // Otherwise, buffer map outputs in a deserialized form:    new BaseShuffleHandle(shuffleId, numMaps, dependency)  }}
复制代码

用哪种 Sort 方式?

可以通过spark.shuffle.manager来设置 SortShuffleManager,默认是用的普通的 sort 方式。如果需要用序列化的 sort 方式进行优化的话,可以将该参数设置成tungsten-sort即可。

    // Let the user specify short names for shuffle managers    val shortShuffleMgrNames = Map(      "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,      "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)    val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER)    val shuffleMgrClass =      shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
复制代码


至此,关于 Spark Shuffle 相关的内部机制和历史都基本介绍完毕。具体使用哪种 Shuffle 的方式还需要根据场景实际需求来进行进一步的调优和配置,希望以上几篇文章能对大家有所启发。敬请关注本公众号后续数据相关技术分享。


Reference

Spark 官方文档

《大数据处理框架 Spark 的设计与实现》

Hadoop之MapReduce内部机制

Spark Shuffle 内部机制(一)

Spark Shuffle 内部机制(二)


更多大数据相关分享,可关注微信公众号,搜索“数据元素”或扫描下方二维码。



发布于: 2021 年 02 月 24 日阅读数: 21
用户头像

hanke

关注

凡是过往,皆为序章 2019.09.11 加入

热爱大数据技术沉淀和分享,致力于构建让数据业务产品更易用的大数据生态圈,为业务增值。

评论

发布
暂无评论
Spark Shuffle 内部机制(三)