写点什么

Spark ShuffleManager

作者:布兰特
  • 2022 年 6 月 10 日
  • 本文字数:4529 字

    阅读完需:约 15 分钟

Spark ShuffleManager

SparkEnv 中 Shuffle 的初始化

// Let the user specify short names for shuffle managers// 让用户指定shuffle managers的简称val shortShuffleMgrNames = Map(      "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,      "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)    val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER)    val shuffleMgrClass =      shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
复制代码


ShuffleManager 的几次变更


val shortShuffleMgrNames = Map(

      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",

      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")


引入了 tungsten-sort

更快的基于排序的 shuffle, 一个新的 shuffle 管理器,该管理器通过一种新的缓存友好排序算法来增强现有的基于排序的 shuffle,该算法直接对二进制数据进行操作。此补丁的目标是在 shuffle 期间降低内存使用和 Java 对象开销,并加快排序。它还为后续补丁奠定了基础,这些补丁将使序列化记录的端到端处理成为可能。

val shortShuffleMgrNames = Map(

      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",

      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",

      "tungsten-sort" -> "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager")


两个 ShuffleManager 的合并

SortShuffleManager 和 UnsafeShuffleManager 之间有很多重复。鉴于这些现在提供相同的功能集,现在 UnsafeShuffleManager 支持大记录,我认为我们应该用 UnsafeShuffleManager 替换 SortShuffleManager 的序列化 shuffle 实现,并且应该将两个管理器合并在一起。

val shortShuffleMgrNames = Map(

      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",

      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",

      "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")


2016-04-19,现在是时候移除旧的 HashShuffleManager 了

val shortShuffleMgrNames = Map(

      "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,

      "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)


注:tungsten-sort

https://issues.apache.org/jira/browse/SPARK-7081


SparkEnv.scala

val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER)


org.apache.spark.internal.config, 默认 sort shuffle

private[spark] val SHUFFLE_MANAGER =    ConfigBuilder("spark.shuffle.manager")      .version("1.1.0")      .stringConf      .createWithDefault("sort")
复制代码


ShuffleManager

ShuffleManager 是 Shuffle 系统的可插拔接口。基于 spark.shuffle.manager 设置,在 Driver 和每一个 Executor 上的 SparkEnv 中创建。Driver 使用它注册 shuffle,Executor(或在 Executor 中本地运行的 task)可以通过他请求读取和写入数据。


结构如下:


  • registerShuffle:向管理器注册 shuffle,并获取一个句柄以将其传递给任务。

  • getWriter: 在给定的分区上获取 writer,由 executor 的 map task 调用

  • getReader:获取 reduce 分区范围(包括 startPartition 到 endPartition-1)的读取器,以便从 mapOutPut(包括 startMapIndex 到 endMapIndex-1)读取数据。

  • ShuffleHandle:shuffle 的 opaque handle(不透明句柄),所谓的 opaque handle 其角色类似于基类指针,隐藏实现细节,每个实现者需要提供自己的实现,由 ShuffleManager 用于将有关它的信息传递给 tasks。三种实现



关于句柄

https://zhuanlan.zhihu.com/p/361975054


ShuffleManager 的子类实现

org.apache.spark.shuffle.sort.SortShuffleManager 是 ShuffleManager 唯一的子类实现。

registerShuffle()

上面已经说过,registerShuffle 向管理器注册 shuffle,并获取一个句柄以将其传递给任务。在具体实现类中,会根据不同条件来返回三种不同的 ShuffleHandle,也就是对应着三种不同的 Shuffle


override def registerShuffle[K, V, C](      shuffleId: Int,      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {    if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {      new BypassMergeSortShuffleHandle[K, V](        shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])    } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {      new SerializedShuffleHandle[K, V](        shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])    } else {      new BaseShuffleHandle(shuffleId, dependency)    }  }
复制代码


第一个条件:SortShuffleWriter.shouldBypassMergeSort(conf, dependency)

private[spark] object SortShuffleWriter {  def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {    // We cannot bypass sorting if we need to do map-side aggregation.    if (dep.mapSideCombine) {      false    } else {      val bypassMergeThreshold: Int = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)      dep.partitioner.numPartitions <= bypassMergeThreshold    }  }}
复制代码


从 shouldBypassMergeSort 方法中可以清晰的看出,满足 BypassMergeSortShuffleHandle 有两个要求:

  • 少于 spark.shuffle.sort.bypassMergeThreshold 分区

  • 不需要 map-side 预聚合(如 groupByKey()算子)


返回 BypassMergeSortShuffleHandle,并且采用 BypassMergeSortShuffle,那么直接写入 numPartitions 文件,并在末尾将它们连接起来。这避免了为了合并溢出的文件而进行两次序列化和反序列化,这在正常的代码路径中会发生。缺点是一次打开多个文件,从而为缓冲区分配更多内存。

第二个条件:SortShuffleManager.canUseSerializedShuffle(dependency)

def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {    val shufId = dependency.shuffleId    val numPartitions = dependency.partitioner.numPartitions    if (!dependency.serializer.supportsRelocationOfSerializedObjects) {      log.debug(s"because does not support object relocation")      false    } else if (dependency.mapSideCombine) {      log.debug(s"because we need to do map-side aggregation")      false    } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {      log.debug(s"because it has more than $MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")      false    } else {      log.debug(s"Can use serialized shuffle for shuffle $shufId")      true    } }
复制代码


用于确定 shuffle 是否应使用优化的序列化 shuffle 路径或是否应回退到对反序列化对象进行操作的原始路径的辅助方法。

也就是说,如果同时满足以下三个条件:

  • 使用的序列化器支持序列化对象的重定位(如 KryoSerializer)

  • shuffle 中没有 map-side 预聚合

  • 分区数不大于 MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE 的值(最大分区 ID 号+1,即 2^24=16777216)

综上会返回 SerializedShuffleHandle,启用序列化 sort shuffle 机制(也就是 tungsten-sort),尝试以序列化形式缓冲映射输出,因为这样更有效。


第三个条件:BaseShuffleHandle

在不满足 bypass 和 tungsten-sort 的情况下,则返回 BaseShuffleHandle,采用基础的 sort shuffle,以反序列化形式输出的缓冲区映射


getWriter()

在给定分区上获取 writer,由 executor 上的 map task 调用

override def getWriter[K, V](      handle: ShuffleHandle,      mapId: Long,      context: TaskContext,      metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {    val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(      handle.shuffleId, _ => new OpenHashSet[Long](16))    mapTaskIds.synchronized { mapTaskIds.add(context.taskAttemptId()) }    val env = SparkEnv.get    handle match {      case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>        new UnsafeShuffleWriter(          env.blockManager,          context.taskMemoryManager(),          unsafeShuffleHandle,          mapId,          context,          env.conf,          metrics,          shuffleExecutorComponents)      case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>        new BypassMergeSortShuffleWriter(          env.blockManager,          bypassMergeSortHandle,          mapId,          env.conf,          metrics,          shuffleExecutorComponents)      case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>        new SortShuffleWriter(          shuffleBlockResolver, other, mapId, context, shuffleExecutorComponents)    }}
复制代码


根据传入 handle 的不同类型,返回不同的 ShuffleWriter,而 handle 类型则正是上面 registerShuffle 返回的。

  • tungsten-sort(SerializedShuffle)对应的是 UnsafeShuffleWriter

  • BypassMergeSortShuffle 对应的是 BypassMergeSortShuffleWriter

  • BaseShuffleHandle 对应的是 SortShuffleWriter


getReader()

由 executor 上 reduce task 来调用,方法返回 ShuffleReader,首先在 mapOutputTracker 中获取要读取的数据地址(block 地址),再通过 BlockStoreShuffleReader 来读取。

  override def getReader[K, C](      handle: ShuffleHandle,      startMapIndex: Int,      endMapIndex: Int,      startPartition: Int,      endPartition: Int,      context: TaskContext,      metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {    val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(      handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)    new BlockStoreShuffleReader(      handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,      shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))    }
复制代码


发布于: 刚刚阅读数: 3
用户头像

布兰特

关注

大数据研发工程师 2019.05.10 加入

while true{ 🍴😪💻 }

评论

发布
暂无评论
Spark ShuffleManager_大数据_布兰特_InfoQ写作社区