写点什么

大数据开发之 Spark Shuffle 原理分析

  • 2022 年 1 月 06 日
  • 本文字数:14196 字

    阅读完需:约 47 分钟

​Shuffle 的产生

在 Spark 作业中当父 RDD 与子 RDD 的分区对应关系为多对多或者一对多的情况下会发生宽依赖,也即一个父 RDD 的分区需要分发到多个子 RDD 所在的任务中大数据培训去执行,这种情况就会涉及数据的重新分布,也即产生了 shuffle。


Spark 算子是否引入 shuffle 与各算子的具体实现有关,本质上是要看父子 RDD 的分区器的实现是否相同,例如:在执行聚合类算子 reduceByKey 时判断是否会引入 shuffle,需要分析父子 rdd 的分区器 partitioner 是否一致,如果不一致则创建一个 ShuffleRDD 作为子 RDD 从而产生 shuffle:


def combineByKeyWithClassTag[C](      createCombiner: V => C,      mergeValue: (C, V) => C,      mergeCombiners: (C, C) => C,      partitioner: Partitioner,      mapSideCombine: Boolean = true,      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {    ...    if (self.partitioner == Some(partitioner)) {      self.mapPartitions(iter => {        val context = TaskContext.get()        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))      }, preservesPartitioning = true)    } else {      new ShuffledRDD[K, V, C](self, partitioner)        .setSerializer(serializer)        .setAggregator(aggregator)        .setMapSideCombine(mapSideCombine)    }  }
复制代码


类似的再以 join 算子的实现为例,在 CoGroupedRDD 的 getDependencies 方法中遍历所有的父 rdd,如果父 rdd 和子 rdd 的分区器一致则创建 OneToOneDependency,否则创建 ShuffleDependency 并引入 shuffle:

override def getDependencies: Seq[Dependency[_]] = {  rdds.map { rdd: RDD[_] =>    if (rdd.partitioner == Some(part)) {      logDebug("Adding one-to-one dependency with " + rdd)      new OneToOneDependency(rdd)    } else {      logDebug("Adding shuffle dependency with " + rdd)      new ShuffleDependency[K, Any, CoGroupCombiner](        rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)    }  }}
复制代码



Shuffle 文件的生成

我们首先来看 shuffle 的第一个阶段:Shuffle Write 阶段。通过 Spark 任务调度原理的讲解,我们知道在创建 taskset 的过程中,如果当前的 stage 是 ShuffleMapStage,则创建的任务类型为 ShuffleMapTask,否则 task 的类型为 ResultTask,两种类型的 Task 均实现了 runTask 方法;如果发生了 shuffle 则执行 ShuffleMapTask 实现的 runTask 方法,即根据 rdd、dependency、mapId 等信息调用 ShuffleWriteProcessor 的 write 方法执行 shuffle 数据的写入:

val rdd = rddAndDep._1val dep = rddAndDep._2// While we use the old shuffle fetch protocol, we use partitionId as mapId in the// ShuffleBlockId construction.val mapId = if (SparkEnv.get.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {  partitionId} else context.taskAttemptId()dep.shuffleWriterProcessor.write(rdd, dep, mapId, context, partition)
复制代码


选择 ShuffleWriter

在 ShuffleWriteProcessor 的 write 方法中首先通过 ShuffleManager 获取 writer 实例,然后再由相应的 writer 执行具体的 write 逻辑:

def write(    rdd: RDD[_],    dep: ShuffleDependency[_, _, _],    mapId: Long,    context: TaskContext,    partition: Partition): MapStatus = {  var writer: ShuffleWriter[Any, Any] = null  try {    val manager = SparkEnv.get.shuffleManager    writer = manager.getWriter[Any, Any](      dep.shuffleHandle,      mapId,      context,      createMetricsReporter(context))    writer.write(      rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
复制代码


Spark 根据 ShuffleHandle 的不同采用相应的 ShuffleWriter 的实现,包括:UnsafeShuffleWriter、BypassMergeSortShuffleWriter 和 SortShuffleWriter 三种:


​override def getWriter[K, V](handle: ShuffleHandle,mapId: Long,context: TaskContext,metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(handle.shuffleId, _ => new OpenHashSetLong)mapTaskIds.synchronized { mapTaskIds.add(mapId) }val env = SparkEnv.gethandle match {case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>new UnsafeShuffleWriter(...case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>new BypassMergeSortShuffleWriter(...case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>new SortShuffleWriter(other, mapId, context, shuffleExecutorComponents)}


而具体的 ShuffleHandle 的选择是根据 shuffle 算子实际的 partition 数、是否需要执行排序或者聚合等情况来确定的:

override def registerShuffle[K, V, C](    shuffleId: Int,    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {    new BypassMergeSortShuffleHandle[K, V](      shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {    new SerializedShuffleHandle[K, V](      shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])  } else {    new BaseShuffleHandle(shuffleId, dependency)  }}
复制代码


下面分别介绍这三种 ShuffleHandle 的选择逻辑:


1、BypassMergeSortShuffleHandle


BypassMergeSortShuffleHandle 对应 BypassMergeSortShuffleWriter,当不需要做 map 端的聚合,并且分区数小于 SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD(默认 200)时采用这种方式,可以跳过在内存中排序和聚合的过程:

if (dep.mapSideCombine) {  false} else {  val bypassMergeThreshold: Int = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)  dep.partitioner.numPartitions <= bypassMergeThreshold}
复制代码


BypassMergeSortShuffleWriter 不需要将 shuffle 记录先写入内存缓存结构中,而是根据数据的 key 值得到 reduce 分区,并创建对应的 DiskBlockObjectWriter 对象将数据记录直接写入到各分区对应的临时文件中;最后再将不同分区的临时文件合并生产 data 和 index 文件即可。


2、SerializedShuffleHandle,该方式使用了 tungsten 基于内存压缩的机制,缓解 shuffle 过程中的内存压力从而实现 shuffle 加速。


采用该方式需要满足三个条件:

  if (!dependency.serializer.supportsRelocationOfSerializedObjects) {    log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +      s"${dependency.serializer.getClass.getName}, does not support object relocation")    false  } else if (dependency.mapSideCombine) {    log.debug(s"Can't use serialized shuffle for shuffle $shufId because we need to do " +      s"map-side aggregation")    false  } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {    log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +      s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")    false  } else {    log.debug(s"Can use serialized shuffle for shuffle $shufId")    true  }

复制代码


1)dependency 的序列化器支持 relocation


如果用户程序中采用 DataFrame、DataSet 数据模型则等底层使用 SparkSQL 内核,当出现 shuffle 的情况下,优化器在制定物理计划会构建 ShuffleExchangeExec 节点,并采用 UnsafeRowSerializer,该序列化器的 supportsRelocationOfSerializedObjects 属性为 true,即支持对序列化对象进行排序;另外,如果用户指定使用 KryoSerializer 序列化器或者记录的 key 和 value 为原生数据类型或者 string 类型也采用 KryoSerializer 序列化器,此时 upportsRelocationOfSerializedObjects 属性为 true;否则使用默认的 JavaSerializer,该属性的值为 false。


2)不需要执行 map 端合并:


如果采用非聚合类算子例如 join 相关算子时对应 dependency 的 mapSideCombine 属性值为 false;如果采用聚合类算子如 reduceByKey、aggregateByKey、combineByKey 等 mapSideCombine 属性为 true;注意执行 groupByKey 算子时该属性也为 false:

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {  ...  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
复制代码


3)shuffledependency 的 partition 数小于 MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE,即 16777215。


3、如果无法使用前两种 ShuffleHandle,则采用 BaseShuffleHandle,对应 ShuffleWriter 为 SortShuffleWriter。


综合以上分析,Spark 根据 ShuffleHandle 的不同而选择相应的 ShuffleWriter 的实现,接下来我们来详细阐述这三种 ShuffleWriter 中最为典型的实现方式 SortShuffleWriter 的执行原理,在后续的 Spark 内存管理的文章中我们将对 UnsafeShuffleWriter 以内存的角度进行阐述;而 BypassMergeSortShuffleWriter 则是 SortShuffleWriter 的特殊情况,即跳过了 map 排序和聚合部分。


SortShuffleWriter

SortShuffleWriter 通过 insertAll 方法首先将参与 shuffle 的数据写入到 shuffle 缓存列表中,当缓存列表的空间增大到无法继续写入时则将数据溢写到磁盘中。


Shuffle 缓存列表的实现有两种数据结构:如果参与 shuffle 的算子需要做聚合则将数据记录写入到数据结构 PartitionedAppendOnlyMap 中,该结构是一个 HashMap,key 为 partitionId 和记录的 key 值,并且每处理一个记录均会更新对应的 key 的 value 值;如果算子不需要做聚合则采用 PartitionedPairBuffer 的数据结构,并将记录的 key 和 value 顺序插入到 buffer 数组中:

if (shouldCombine) {  // Combine values in-memory first using our AppendOnlyMap  val mergeValue = aggregator.get.mergeValue  val createCombiner = aggregator.get.createCombiner  var kv: Product2[K, V] = null  val update = (hadValue: Boolean, oldValue: C) => {        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)  }  while (records.hasNext) {    addElementsRead()    kv = records.next()    map.changeValue((getPartition(kv._1), kv._1), update) //更新hashmap中的value值    maybeSpillCollection(usingMap = true)  }} else {  // Stick values into our buffer  while (records.hasNext) {    addElementsRead()    val kv = records.next()    buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C]) //将记录顺序插入到buffer中    maybeSpillCollection(usingMap = false)  }}
复制代码


由此可见,采用 PartitionedAppendOnlyMap 这种数据结构可以节约内存空间、减少磁盘溢写以及 shuffle 拉取的网络开销,这也是 reduceByKey 比 groupByKey 性能更好的原因;同时,也是为什么 shuffle 在没有聚合的情况下需要采用 tungsten 的存储方式来进一步提升执行性能。


每次写入记录之后都会判断是否需要将内存中的数据进行溢写,主要的判断逻辑是当 shuffle 缓存的数据量达到当前的阈值之后尝试扩容 shuffle 缓存列表,当扩容之后的空间仍然不足的情况下则开始执行溢写逻辑:


if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {  // Claim up to double our current memory from the shuffle memory pool  val amountToRequest = 2 * currentMemory - myMemoryThreshold  val granted = acquireMemory(amountToRequest)  myMemoryThreshold += granted  // If we were granted too little memory to grow further (either tryToAcquire returned 0,  // or we already had more memory than myMemoryThreshold), spill the current collection  shouldSpill = currentMemory >= myMemoryThreshold}shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold// Actually spillif (shouldSpill) {    _spillCount += 1  logSpillage(currentMemory)  spill(collection)     _elementsRead = 0  _memoryBytesSpilled += currentMemory  releaseMemory()}
复制代码


排序:

如果在 shuffle 依赖中指定了排序的顺序或者聚合算法则定义排序函数 keyComparator:

private def comparator: Option[Comparator[K]] = {  if (ordering.isDefined || aggregator.isDefined) {    Some(keyComparator)  } else {    None  }}
复制代码


在具有排序函数的情况下,PartitionedAppendOnlyMap 和 PartitionedPairBuffer 分别实现了 partitionedDestructiveSortedIterator 函数,对数据记录首先根据分区排序,然后再根据 key 进行排序:

/** * A comparator for (Int, K) pairs that orders them both by their partition ID and a key ordering. */def partitionKeyComparator[K](keyComparator: Comparator[K]): Comparator[(Int, K)] =  (a: (Int, K), b: (Int, K)) => {    val partitionDiff = a._1 - b._1    if (partitionDiff != 0) {      partitionDiff    } else {      keyComparator.compare(a._2, b._2)    }  }
复制代码


磁盘刷写:

通过前面的过程将需要溢写的数据在内存中排序并封装到一个迭代器对象 inMemoryIterator 中,然后再通过 ExternalSorter 调用 spillMemoryIteratorToDisk 方法将排序后的数据写到 IO 输出缓冲区,当达到输出缓冲区的容量上限(配置项:spark.shuffle.file.buffer,默认 32K) 或者记录的个数超过 SHUFFLE_SPILL_BATCH_SIZE 的值(配置项:spark.shuffle.spill.batchSize,默认 10000),则将数据 flush 到磁盘。因此如果一个作业 shuffle 溢写的数据量较大,可以适当调大相关配置参数从而减轻磁盘 IO 的性能开销:


​val (blockId, file) = diskBlockManager.createTempShuffleBlock() //创建临时溢写文件,并为其生成 blockIdval writer: DiskBlockObjectWriter =blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics) //构建 fileBufferSize 大小的缓冲区 writer...try {while (inMemoryIterator.hasNext) {val partitionId = inMemoryIterator.nextPartition()require(partitionId >= 0 && partitionId < numPartitions,s"partition Id: {numPartitions})")inMemoryIterator.writeNext(writer) //将对象的键值对写到缓冲区 elementsPerPartition(partitionId) += 1objectsWritten += 1


    if (objectsWritten == serializerBatchSize) {    //当对象数达到上限则flush到磁盘中      flush()    }
复制代码


...


将内存中的记录先进行排序并刷写到临时磁盘文件之后,再将该文件追加到 spills 列表中,spills 列表是一个 ArrayBuffer[SpilledFile]的数据结构,表示一个 task 所有的待合并文件的集合:

val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)spills += spillFile
复制代码


合并溢写文件:

当所有需要 shuffle 的记录均处理完成并溢写之后,ExternalSorter 针对每个 map 分区调用 writePartitionedMapOutput 方法将溢写到磁盘的临时文件和以及内存中数据进行归并排序,并写入到一个 data 文件中,具体实现过程如下:


1.根据 shuffleId、mapId 创建该分区溢写文件合并后的 data 文件,文件名为:


name: String = "shuffle_" + shuffleId + "" + mapId + "" + reduceId + ".data"


2.首先针对每个 reduce 分区,遍历所有的临时溢写文件和内存中的记录,将属于该分区的记录根据 key 值进行聚合运算;如果需要排序,则先对记录进行归并排序再根据 key 值做聚合;最后生成一个(partitionId,partitionId 对应的记录列表) 的二元组迭代器。

private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])      : Iterator[(Int, Iterator[Product2[K, C]])] = {    val readers = spills.map(new SpillReader(_))    val inMemBuffered = inMemory.buffered    (0 until numPartitions).iterator.map { p =>      val inMemIterator = new IteratorForPartition(p, inMemBuffered)      val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)      if (aggregator.isDefined) {        // Perform partial aggregation across partitions        (p, mergeWithAggregation(          iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))      } else if (ordering.isDefined) {        // No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);        // sort the elements without trying to merge them        (p, mergeSort(iterators, ordering.get))      } else {        (p, iterators.iterator.flatten)      }    }  }

复制代码


3.遍历第 2 步中生成的二元组迭代器,依次为每个 reduce partitionId 创建一个 ShufflePartitionPairsWriter 对象,并将 partitionId 对应的所有记录的 key 和 value 值写入到在步骤 1 中创建的 data 文件中:


for ((id, elements) <- this.partitionedIterator) {  val blockId = ShuffleBlockId(shuffleId, mapId, id)  var partitionWriter: ShufflePartitionWriter = null  var partitionPairsWriter: ShufflePartitionPairsWriter = null  TryUtils.tryWithSafeFinally {    partitionWriter = mapOutputWriter.getPartitionWriter(id)      partitionPairsWriter = new ShufflePartitionPairsWriter(      partitionWriter,      serializerManager,      serInstance,      blockId,      context.taskMetrics().shuffleWriteMetrics)    if (elements.hasNext) {      for (elem <- elements) {        partitionPairsWriter.write(elem._1, elem._2)      }    }  } 
复制代码


需要说明的是,在将 shuffle 数据进行合并的过程还会累计各个 patition 对应数据所占用的存储空间的大小,这些信息采用 partitionLengths 数组进行记录,partitionLengths 数组是一个下标为 partitionId、值为对应分区的数据长度的长整型数组。


构建索引文件:

由于在创建的 data 文件的过程中还构建了 partitionLengths 数组,就可以方便的知道各分区的数据在 data 文件中的偏移量,以便于在 reduce 阶段快速检索 data 文件中的数据,避免了大量 shuffle 文件的全量扫描,从而提高 shuffle 读阶段的处理性能。接下来介绍为每个 data 文件构建索引文件的过程:


1.在 IndexShuffleBlockResolver 类的 writeIndexFileAndCommit 方法中,根据 shuffleId、mapId 即"shuffle_" + shuffleId + "" + mapId + "" + reduceId + ".index" 作为索引文件的名称,并且该文件名也对应存储系统中的 BlockId,然后通过对应 executor 的 DiskBlockManager 对象在 localDir(一般是 spark.local.dir 配置项)目录中创建一个 index 文件;


def getIndexFile(shuffleId: Int,mapId: Long,dirs: Option[Array[String]] = None): File = {val blockId = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID)dirs.map(ExecutorDiskUtils.getFile(_, blockManager.subDirsPerLocalDir, blockId.name)).getOrElse(blockManager.diskBlockManager.getFile(blockId))}


2.根据 partitionLengths 数组中的 length 值进行逐一累加计算,获得每个 reduce task 的数据在 data 文件中的起始偏移量 offset,并将其记录在 index 文件中,用于后续快速检索对应分区的 shuffle 数据:


var offset = 0Lout.writeLong(offset)for (length <- lengths) {offset += lengthout.writeLong(offset)}


Reduce 阶段的数据处理在 shuffle 的 Map 阶段也即 shuffle write 阶段完成了数据的溢写和合并,接下来进入 shuffle 的 Reduce 阶段也即 shuffle read 阶段。


我们知道,所有 RDD 都会执行其 compute 方法,在 ShuffleRDD 的 compute 方法中会初始化一个 reader 对象并调用其 read()方法并在实现了如上逻辑:


override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]val metrics = context.taskMetrics().createTempShuffleReadMetrics()SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context, metrics).read().asInstanceOf[Iterator[(K, C)]]}


初始化 BlockStoreShuffleReader 对象在 reduce 阶段,SortShuffleManager 首先通过 MapOutputTracker 根据 shuffleId 从 mapStatuses 中获取 blocksByAddress 对象,该对象的数据结构为:Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])],表示一个 reduce 分区所需拉取的 shuffle 文件的存储信息(包括 BlockManagerId 以及 BlockId、字节数、mapId 的集合);接下来创建 BlockStoreShuffleReader 对象用于读取 blocksByAddress 中所指定的 shuffle 文件:


override def getReader[K, C](handle: ShuffleHandle,startMapIndex: Int,endMapIndex: Int,startPartition: Int,endPartition: Int,context: TaskContext,metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)new BlockStoreShuffleReader(handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))}


其中,BlockId 为 ShuffleBlockId 的实例,其编码方式为"shuffle_" + shuffleId + "" + mapId + "" + reduceId,block 所占字节数通过 MapStatus 获取:


for (part <- startPartition until endPartition) {val size = status.getSizeForBlock(part)if (size != 0) {splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) +=((ShuffleBlockId(shuffleId, status.mapId, part), size, mapIndex))}}


执行 read()方法 BlockStoreShuffleReader 通过执行其 read()方法从本地获取或者通过网络从其他节点拉取 shuffle 数据,并对这些数据进行进一步处理,接下来我们来看一下 read()方法的具体实现:


1、获取 shuffle map 数据在 read()方法中首先构建了 ShuffleBlockFetcherIterator 实例,并通过 ShuffleBlockFetcherIterator 的 initialize()方法来实现 shuffle 记录的读取:


1)调用 partitionBlocksByFetchMode 方法根据 shuffle 文件的位置信息划分为不同的队列:


根据 blocksByAddress 中携带的 shuffle 文件的地址信息,如果 blockManager 对应的 executor 与当前 reduce 任务的 executor 一致,则将该的 blockManager 对应的 shuffle 文件存储信息放入 localBlocks 列表中;否则,如果 blockManager 所在的节点与当前 reduce 任务的节点一致,则将该 blockManager 对应的 shuffle 文件存储信息放到 hostLocalBlocks 列表中;否则 shuffle 文件信息存在于远程节点中,将对应的 shuffle 文件存储信息放到 fetchRequests 队列中:


for ((address, blockInfos) <- blocksByAddress) {//blockManager 对应的 executor 与当前 reduce 任务的 executor 一致 if (Seq(blockManager.blockManagerId.executorId, fallback).contains(address.executorId)) {checkBlockSizes(blockInfos)val mergedBlockInfos = mergeContinuousShuffleBlockIdsIfNeeded(blockInfos.map(info => FetchBlockInfo(info._1, info._2, info.3)), doBatchFetch)numBlocksToFetch += mergedBlockInfos.sizelocalBlocks ++= mergedBlockInfos.map(info => (info.blockId, info.mapIndex))localBlockBytes += mergedBlockInfos.map(.size).sum} else if (blockManager.hostLocalDirManager.isDefined &&address.host == blockManager.blockManagerId.host) { //blockManager 所在的节点与当前 reduce 任务的节点一致 checkBlockSizes(blockInfos)val mergedBlockInfos = mergeContinuousShuffleBlockIdsIfNeeded(blockInfos.map(info => FetchBlockInfo(info._1, info._2, info._3)), doBatchFetch)numBlocksToFetch += mergedBlockInfos.sizeval blocksForAddress =mergedBlockInfos.map(info => (info.blockId, info.size, info.mapIndex))hostLocalBlocksByExecutor += address -> blocksForAddresshostLocalBlocks ++= blocksForAddress.map(info => (info.1, info.3))hostLocalBlockBytes += mergedBlockInfos.map(.size).sum} else { //否则 shuffle 文件信息存在于远程节点中 remoteBlockBytes += blockInfos.map(._2).sumcollectFetchRequests(address, blockInfos, collectedRemoteRequests)}


值得注意的是,从远端拉取数据的情况下如果数据量太大容易导致网络阻塞,因此 spark 中通过 targetRemoteRequestSize 来限制 reduce task 每次远程拉取的数据量,如果超过该阈值则将当前的 block 封装为一个 FetchRequest 并放置到 collectedRemoteRequests 列表中作为后续数据拉取的一个基本单元:


if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) {curBlocks = createFetchRequests(curBlocks, address, isLast = false,collectedRemoteRequests)...


private def createFetchRequests(curBlocks: Seq[FetchBlockInfo],address: BlockManagerId,isLast: Boolean,collectedRemoteRequests: ArrayBuffer[FetchRequest]): Seq[FetchBlockInfo] = {val mergedBlocks = mergeContinuousShuffleBlockIdsIfNeeded(curBlocks, doBatchFetch)numBlocksToFetch += mergedBlocks.sizevar retBlocks = Seq.empty[FetchBlockInfo]if (mergedBlocks.length <= maxBlocksInFlightPerAddress) {collectedRemoteRequests += createFetchRequest(mergedBlocks, address)} else {mergedBlocks.grouped(maxBlocksInFlightPerAddress).foreach { blocks =>if (blocks.length == maxBlocksInFlightPerAddress || isLast) {collectedRemoteRequests += createFetchRequest(blocks, address)...


其中,targetRemoteRequestSize 的值为 math.max(maxBytesInFlight / 5, 1L),maxBytesInFlight 通过配置项 SparkEnv.get.conf.get(config.REDUCER_MAX_SIZE_IN_FLIGHT) * 1024 * 1024(默认 48M)来指定;


如果网络带宽不是瓶颈并且需要拉取的 shuffle 数据量较大,则可以适当调大 REDUCER_MAX_SIZE_IN_FLIGHT 即配置项 spark.reducer.maxSizeInFlight 的值,反之亦然。


2)前面的步骤根据 shuffle 文件存储的位置不同得到了三个请求列表,接下来分别获取各个列表中的数据:


// Send out initial requests for blocks, up to our maxBytesInFlightfetchUpToMaxBytes() //跨节点拉取数据...// Get Local BlocksfetchLocalBlocks()...if (hostLocalBlocks.nonEmpty) {blockManager.hostLocalDirManager.foreach(fetchHostLocalBlocks)}


以获取远端的 shuffle 文件拉取为例,ShuffleBlockFetcherIterator 遍历 fetchRequests,首先得到各个 request 对应的 blockManager,然后向该 blockManager 发送数据拉取请求:


while (isRemoteBlockFetchable(fetchRequests)) {val request = fetchRequests.dequeue()val remoteAddress = request.address...send(remoteAddress, request)}}


实际的请求发送是在 NettyBlockTransferService 的 fetchBlocks 方法中实现的,首先创建 TransportClient 实例,然后由 OneForOneBlockFetcher 根据 TransportClient 实例、appId、executorId 等向数据所在的 BlockManager 发送拉取消息 FetchShuffleBlocks 并处理返回的结果:


@Overridepublic void fetchBlocks(String host,int port,String execId,String[] blockIds,BlockFetchingListener listener,DownloadFileManager downloadFileManager) {...RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =(inputBlockId, inputListener) -> {// Unless this client is closed.if (clientFactory != null) {TransportClient client = clientFactory.createClient(host, port, maxRetries > 0);new OneForOneBlockFetcher(client, appId, execId,inputBlockId, inputListener, conf, downloadFileManager).start();


当对应 BlockManager 的 NettyBlockRpcServer 接收到 FetchShuffleBlocks 消息后,则根据 ShuffleBlockId 调用 BlockManager 的 getLocalBlockData 方法从本地的 shuffle 文件中读取所需的数据:


case fetchShuffleBlocks: FetchShuffleBlocks =>val blocks = fetchShuffleBlocks.mapIds.zipWithIndex.flatMap { case (mapId, index) =>if (!fetchShuffleBlocks.batchFetchEnabled) {fetchShuffleBlocks.reduceIds(index).map { reduceId =>blockManager.getLocalBlockData(ShuffleBlockId(fetchShuffleBlocks.shuffleId, mapId, reduceId))}


值得注意的是,在 getLocalBlockData 方法的实现代码中我们看到了前面提到的 IndexShuffleBlockResolver 的实例:


override def getLocalBlockData(blockId: BlockId): ManagedBuffer = {if (blockId.isShuffle) {logDebug(s"Getting local shuffle block ${blockId}")try {shuffleManager.shuffleBlockResolver.getBlockData(blockId)}


由于 IndexShuffleBlockResolver 对象在 shuffle map 阶段文件合并的过程中创建了 index 文件,在 reduce 阶段就可以根据 shuffleId、mapId 等信息得到具体的 index 文件,然后根据 reduceId 获取对应分区的数据长度值在 index 文件中的偏移量,快速地从 data 文件中定位到对应 partition 的数据:


override def getBlockData(blockId: BlockId,dirs: Option[Array[String]]): ManagedBuffer = {val (shuffleId, mapId, startReduceId, endReduceId) = blockId match {case id: ShuffleBlockId =>(id.shuffleId, id.mapId, id.reduceId, id.reduceId + 1)...val indexFile = getIndexFile(shuffleId, mapId, dirs)...val channel = Files.newByteChannel(indexFile.toPath)channel.position(startReduceId * 8L)val in = new DataInputStream(Channels.newInputStream(channel))try {val startOffset = in.readLong()channel.position(endReduceId * 8L)val endOffset = in.readLong()val actualPosition = channel.position()val expectedPosition = endReduceId * 8L + 8if (actualPosition != expectedPosition) {throw new Exception(s"SPARK-22982: Incorrect channel position after index file reads: " +s"expected actualPosition.")}new FileSegmentManagedBuffer(transportConf,getDataFile(shuffleId, mapId, dirs),startOffset,endOffset - startOffset)}


2、执行聚合如果指定了聚合函数则调用聚合器(Aggregator)的 combine CombinersByKey 方法在 reduce 端对数据进行聚合:


val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {if (dep.mapSideCombine) {// We are reading values that are already combinedval combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)} else {// We don't know the value type, but also don't care -- the dependency should// have made sure its compatible w/ this aggregator, which will convert the value// type to the combined type Cval keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)}}


3、执行排序如果需要根据 key 做排序(例如 sortByKey 算子),则调用 ExternalSorter 的 insertAll 方法对数据进行缓存,当缓存空间无法扩容时则先在内存中排序然后执行溢写,这个过程和 map 阶段 insertAll 方法类似,reduce 阶段的输出又可以作为下一个 shuffle map 阶段或者是 action 的数据源:


// Sort the output if there is a sort ordering defined.val resultIter = dep.keyOrdering match {case Some(keyOrd: Ordering[K]) =>// Create an ExternalSorter to sort the data.val sorter =new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)sorter.insertAll(aggregatedIter)context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)// Use completion callback to stop sorter if task was finished/cancelled.context.addTaskCompletionListener[Unit](_ => {sorter.stop()})CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())


总结 Shuffle 的处理过程较为复杂,并且由于其排序、聚合、磁盘溢写以及数据分发等过程必然会产生 CPU、内存、磁盘 IO 以及网络通信等方面的性能开销,我们在实际的业务开发中需要尽量避免 shuffle,或者通过数据过滤和中间结果缓存等方式尽量减少 shuffle 带来的性能影响。


下面根据本文的内容,Shuffle 的原理主要总结如下:


1、Shuffle 的产生从根本上是由父子 RDD 的分区器是否一致决定的,分区器不同则必然产生任务之间的数据分发;


2、Shuffle 的过程主要分为 map 和 reduce 两个阶段,也即 shuffle write 和 shuffle read 阶段:


在 shuffle write 阶段,根据 ShuffleHandle 的不同,shuffle 写磁盘的过程将采用不同的 ShuffleWriter 的实现类,本文详细介绍了其中最经典的实现方式 SortShuffleWriter,该模式通过 PartitionedAppendOnlyMap 数据结构在 map 端将 key 值相同的数据聚合之后再进行排序和溢写,采用该结构可以减少数据记录占用的内存空间从而提升 shuffle 的执行性能;BypassMergeSortShuffleWriter 则是跳过了内存合并和排序的过程,直接将 shuffle 数据溢写到对应分区的临时文件中;而采用 UnsafeShuffleWriter 可以利用到 Tungsten 内存模式的红利,通过字节数组来组织数据记录,不仅减少了内存空间的占用,而且大幅减少了数据对象的创建从而减轻 JVM 的运行压力。


在 shuffle read 阶段,首先根据 shuffleId 从 mapStatuses 中得到对应的 MapStatus 列表,然后结合 reduceId 获得 reduce 任务对应的所有 shuffle 文件的存储信息,并根据文件所在的存储位置将 shuffle 记录分配到不同的列表中并分别执行数据拉取;如果定义了合并和排序算法则先在内存中进行合并和排序之后再溢写到磁盘中,否则直接将该分区的数据写入对应的磁盘文件中,并作为下一个 shuffle read 阶段或者 action 算子的输入。


作者:焦媛

用户头像

关注尚硅谷,轻松学IT 2021.11.23 加入

还未添加个人简介

评论

发布
暂无评论
大数据开发之Spark Shuffle 原理分析