Spark ShuffleManager
SparkEnv 中 Shuffle 的初始化
ShuffleManager 的几次变更
val shortShuffleMgrNames = Map(
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
引入了 tungsten-sort
更快的基于排序的 shuffle, 一个新的 shuffle 管理器,该管理器通过一种新的缓存友好排序算法来增强现有的基于排序的 shuffle,该算法直接对二进制数据进行操作。此补丁的目标是在 shuffle 期间降低内存使用和 Java 对象开销,并加快排序。它还为后续补丁奠定了基础,这些补丁将使序列化记录的端到端处理成为可能。
val shortShuffleMgrNames = Map(
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
"tungsten-sort" -> "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager")
两个 ShuffleManager 的合并
SortShuffleManager 和 UnsafeShuffleManager 之间有很多重复。鉴于这些现在提供相同的功能集,现在 UnsafeShuffleManager 支持大记录,我认为我们应该用 UnsafeShuffleManager 替换 SortShuffleManager 的序列化 shuffle 实现,并且应该将两个管理器合并在一起。
val shortShuffleMgrNames = Map(
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
"tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
2016-04-19,现在是时候移除旧的 HashShuffleManager 了
val shortShuffleMgrNames = Map(
"sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
"tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
注:tungsten-sort
https://issues.apache.org/jira/browse/SPARK-7081
SparkEnv.scala
val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER)
org.apache.spark.internal.config, 默认 sort shuffle
ShuffleManager
ShuffleManager 是 Shuffle 系统的可插拔接口。基于 spark.shuffle.manager 设置,在 Driver 和每一个 Executor 上的 SparkEnv 中创建。Driver 使用它注册 shuffle,Executor(或在 Executor 中本地运行的 task)可以通过他请求读取和写入数据。
结构如下:
registerShuffle:向管理器注册 shuffle,并获取一个句柄以将其传递给任务。
getWriter: 在给定的分区上获取 writer,由 executor 的 map task 调用
getReader:获取 reduce 分区范围(包括 startPartition 到 endPartition-1)的读取器,以便从 mapOutPut(包括 startMapIndex 到 endMapIndex-1)读取数据。
ShuffleHandle:shuffle 的 opaque handle(不透明句柄),所谓的 opaque handle 其角色类似于基类指针,隐藏实现细节,每个实现者需要提供自己的实现,由 ShuffleManager 用于将有关它的信息传递给 tasks。三种实现
关于句柄
ShuffleManager 的子类实现
org.apache.spark.shuffle.sort.SortShuffleManager 是 ShuffleManager 唯一的子类实现。
registerShuffle()
上面已经说过,registerShuffle 向管理器注册 shuffle,并获取一个句柄以将其传递给任务。在具体实现类中,会根据不同条件来返回三种不同的 ShuffleHandle,也就是对应着三种不同的 Shuffle
第一个条件:SortShuffleWriter.shouldBypassMergeSort(conf, dependency)
从 shouldBypassMergeSort 方法中可以清晰的看出,满足 BypassMergeSortShuffleHandle 有两个要求:
少于 spark.shuffle.sort.bypassMergeThreshold 分区
不需要 map-side 预聚合(如 groupByKey()算子)
返回 BypassMergeSortShuffleHandle,并且采用 BypassMergeSortShuffle,那么直接写入 numPartitions 文件,并在末尾将它们连接起来。这避免了为了合并溢出的文件而进行两次序列化和反序列化,这在正常的代码路径中会发生。缺点是一次打开多个文件,从而为缓冲区分配更多内存。
第二个条件:SortShuffleManager.canUseSerializedShuffle(dependency)
用于确定 shuffle 是否应使用优化的序列化 shuffle 路径或是否应回退到对反序列化对象进行操作的原始路径的辅助方法。
也就是说,如果同时满足以下三个条件:
使用的序列化器支持序列化对象的重定位(如 KryoSerializer)
shuffle 中没有 map-side 预聚合
分区数不大于 MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE 的值(最大分区 ID 号+1,即 2^24=16777216)
综上会返回 SerializedShuffleHandle,启用序列化 sort shuffle 机制(也就是 tungsten-sort),尝试以序列化形式缓冲映射输出,因为这样更有效。
第三个条件:BaseShuffleHandle
在不满足 bypass 和 tungsten-sort 的情况下,则返回 BaseShuffleHandle,采用基础的 sort shuffle,以反序列化形式输出的缓冲区映射
getWriter()
在给定分区上获取 writer,由 executor 上的 map task 调用
根据传入 handle 的不同类型,返回不同的 ShuffleWriter,而 handle 类型则正是上面 registerShuffle 返回的。
tungsten-sort(SerializedShuffle)对应的是 UnsafeShuffleWriter
BypassMergeSortShuffle 对应的是 BypassMergeSortShuffleWriter
BaseShuffleHandle 对应的是 SortShuffleWriter
getReader()
由 executor 上 reduce task 来调用,方法返回 ShuffleReader,首先在 mapOutputTracker 中获取要读取的数据地址(block 地址),再通过 BlockStoreShuffleReader 来读取。
版权声明: 本文为 InfoQ 作者【布兰特】的原创文章。
原文链接:【http://xie.infoq.cn/article/1537b12bcdc83053efa88c439】。文章转载请联系作者。
评论