Spark Shuffle 原理
什么是 Shuffle?
Shuffle 在 Spark 中代表 Stage 之间的数据分发,map task 和 reduce task 之间数据交换的过程。
为什么会产生 Shuffle?
本质上就是,要把 key 相同的数据放在一起。其实,计算过程之所以需要 Shuffle,往往是由计算逻辑、或者说业务逻辑决定的。
举个栗子:凌晨 0:00 ~ 3:00 之间,App 的访问量激增,这时候想看一下这个时间段内,哪个业务的量暴涨,所以我们可以通过这段时间内的 Nginx 日志,来统计一下各个 URI 接口的访问量,这三个小时内,产生了 100 个日志文件,一共 500G 的数据,在 Spark 分布式程序处理的过程中,要把分散在各个 Executor 上的数据,按照某种规则,将相同的 URL 分发到同一个 Executor 上,这样才可以完成分组聚合,所以 Shuffle 是某些场景下无法避免的存在。到这里,其实就是 WordCount 的思想。
何时产生 Shuffle?
回到 RDD 的概念中,Spark 在分布式计算中,RDD 与 RDD 之间的关系分为窄依赖和宽依赖。区别就是父 RDD 的分区和子 RDD 分区是一对一的关系还是一对多的关系。
宽依赖也叫 shuffle 依赖,所以说,发生宽依赖必然会发生 Shuffle 操作,也就是宽依赖算子必然会发生 Shuffle 操作。
Shuffle 是划分 Stage 的依据,两个 Stage 之间产生了 Shuffle,在逻辑上,需要划分开两个不同的 Stage,每个 Stage 中的算子进行串行。
宽依赖算子:
Distinct、reduceByKey、groupByKey、sortByKey、repartition、repartitionAndSortWithinPartitions、coalesce(shuffle=true)、join、cogroup 等。
Shuffle 的工作原理
屏蔽细节,先看整体,Shuffle 的几种类型后续单独来分析,此文只针对 Sort-Based Shuffle 来记录一些通用概念,Shuffle 处于两个 Stage 之间,也可以说是处于 map task 和 reduce task 之间,起到分发数据的作用。Shuffle 过程分为 Shuffle Write 和 Shuffle Read 两个过程。
Map 阶段
如果使用的算子是 reduceByKey、aggregateByKey、combineBykey 等算子,则 map 端会使用用户自定义的函数对每个节点本地的相同 key 进行 map 端预聚合。而 groupByKey 算子是不会进行 map 端预聚合。
这里问一句,为什么要 map 端预聚合?
为了减少 shuffle 数据量以及 reduce 端的压力
Reduce 阶段
将各个节点 read 过来的数据,在 reduce tasks 所在 Executor 节点上做全局聚合
Shuffle Write
总的来说,Shuffle Write 就是生成 Shuffle 中间文件的过程。逐条读入 map task 预聚合后的数据,按照数据分区方法(Hash 取模)计算每条数据应该分发到哪个 reduce task 中,并将写入到中间文件,持久化在磁盘上。
之所以要持久化,一方面是要减少内存存储空间压力,另一方面也是为了 fault-tolerance。Map 和 Reduce 过程之间还是需要通过磁盘来过渡中间态数据, 这些缓冲区减少了在创建中间 shuffle 文件时进行的磁盘查找和系统调用的次数
内存结构
在生成中间文件之前,先要经过一段内存缓冲区,根据是否有预聚合,决定采用 Map 还是 Buffer 数组,如果需要聚合,Spark 会借助一种类似于 Map 的数据结构 AppendOnlyMap,来计算、缓存并排序数据分区中的数据记录。这种 Map 结构的 Key 是(Partition ID, Key),而 Value 是原数据记录中的数据值
溢写
RDD 每个 partition 会不断的向内存缓冲 Map 或者 buffer 中插入数据,直到当前缓冲区集合大小超过阈值,并且申请不到更多的内存后,则将内存中的集合数据溢写到临时文件中,随后清空内存缓冲区,循环往复直到数据全部插入。
临时文件归并排序
在所有数据插入完成后,经过 n 轮的溢写,每个 map task 都会生成多个临时文件,以及内存中还有部分没有溢写的数据,为了减少文件数,会通过归并排序的方式对临时文件和内存中的数据进行合并,生成 shuffle_x_x.data 文件
关于 Shuffle Write,分为如下几个步骤:
1. 对于数据分区中的数据记录,逐一计算其目标分区,然后填充内存数据结构;
2. 当数据结构填满后,如果分区中还有未处理的数据记录,就对结构中的数据记录按(目标分区 ID,Key)排序,将所有数据溢出到临时文件,同时清空数据结构;
3. 重复前 2 个步骤,直到分区中所有的数据记录都被处理为止;
4. 对所有临时文件和内存数据结构中剩余的数据记录做归并排序,生成 .data 数据文件和 .index 索引文件。
Shuffle 中间文件
Map 阶段与 Reduce 阶段,通过生产与消费 Shuffle 中间文件的方式,来完成集群范围内的数据交换。换句话说,Map 阶段生产 Shuffle 中间文件,Reduce 阶段消费 Shuffle 中间文件,二者以中间文件为媒介,完成数据交换。
Shuffle 中间文件指的是 Shuffle Write 阶段生成的两类磁盘文件,一个是记录(Key,Value)键值对的 data 文件,另一个是记录键值对所属 Reduce Task 的 index 文件。下图中,shuffle data 文件是有序的,article ~ clickhouse 之间的数据对应的分区长度为 1845,也就是数据分区后的第 0 分区,这个区间则指定了某一个 Reduce Task 对应读取的数据区间。clickhouse~spark 区间数据长度为 1644,则对应 index 文件中末尾的 offset 为 3489。
产生中间文件,先写内存,再溢写磁盘。减少磁盘 IO,为了避免大量的分区文件占用大量的内存而导致 oom。 保存到磁盘可以保证数据的安全性。
Shuffle Read
对于所有 Map Task 生成的中间文件,Reduce Task 需要通过网络从不同节点的硬盘中下载并拉取属于自己的数据内容
什么时候 fetch?
Shuffle Read 何时进行 fetch,不妨从这个角度来看,Shuffle 是发生在父 Stage 与子 Stage 之间的,换个问法也就是子 Stage 何时读取父 Stage 的数据?一个 Stage 只有在它的父 stage 执行完之后才会执行,所以很直观的说,fetch 操作是在前一个 stage 的所有 ShuffleMapTasks 完成之后才开始的。
边 fetch 边处理还是一次性 fetch 完再处理?
fetch 和处理同时进行。在 MapReduce 中,shuffle 阶段获取数据,然后同时应用 combine() 逻辑。在 MapReduce 中,reducer 输入数据需要排序,因此在 shuffle-sort 过程之后应用 reduce() 逻辑。
由于 Spark 对 reducer 输入的数据不需要排序,所以我们不需要等到所有数据都拿到了才开始处理。那么 Spark 是如何实现 fetch 和处理同时进行的呢?其实类似于 Shuffle Map 预聚合,利用 Map 这样的数据结构来实现更新聚合。
fetch 来的数据存放到哪里?
Fetch 来的数据会先在 softBuffer 中缓冲。然后进行数据处理,并将其写入可配置的位置。如果 spark.shuffle.spilis 为 false,则写入位置仅为内存。一种特殊的数据结构,AppendOnlyMap,用于将这些处理后的数据保存在内存中。否则,处理后的数据将被写入内存和磁盘,使用 ExternalAppendOnlyMap。
当没有足够的可用内存时,这种数据结构可以将排序后的键值对溢出到磁盘上。同时使用内存和磁盘的一个关键问题是如何找到两者的平衡点。在 Hadoop 中,默认情况下 70%内存是为随机数据保留的。一旦使用了这部分内存的 66%,Hadoop 就会启动 merge-combine-spill 过程。在 Spark 中使用了类似的策略。
如何知道 fetch 数据的位置的呢?
Shuffle Write 完成后,会将中间数据生成位置,注册到 Driver 中, reduce task 则可以在 driver 中进行获取。
最后,现在默认的都是 Sort-based shuffle。Shuffle 的目的不是排序,单纯的 Shuffle,做不到全局有序。Sort-based 只是 shuffle 的一种实现方式~ Sort-based 实现方式至少有两个收益:
1)为后续可能的 Sort Merge Join 奠定基础
2)为后续可能的全局排序,奠定基础
参考:https://mallikarjuna_g.gitbooks.io/sparkinternals/content/shuffle-details.html
推荐:https://www.zhihu.com/question/23079001
评论