Spark 地基之 RDD
RDD 是 Spark 的基本数据抽象,相较于 Hadoop/MapReduce 的数据模型而言,各方面都有很大的提升。
数据可以尽可能地存在内存中,从而大大提高的数据处理的效率
分区存储,所以天然支持并行处理
存储了每一步骤计算结果之间的依赖关系,从而大大提升了数据容错性和错误恢复的正确率。
Spark 的开山之作《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》提出了 RDD 数据结构:
We present Resilient Distributed Datasets (RDDs), a distributed memory abstraction that lets programmers perform in-memory computations on large clusters in a fault-tolerant manner.
RDD 数据结构是一种分布式的内存抽象,可以容错方式在大型的集群中进行内存运算。
To achieve fault tolerance effificiently, RDDs provide a restricted form of shared memory, based on coarse-grained transformations rather than fifine-grained updates to shared state.
为了提供更有效的容错机制(Fault Tolerance),RDD 采用了粗粒度的(coarse-grained)转换,而不是细粒度(fine-grained)的更新一个可变状态。
在以往的设计中,会将内存进行集群抽象,比如分布式共享内存、键值存储(Redis)和数据库等,这种方式是细粒度更新一个可变状态,相应的容错方式也需要进行机器间的数据复制和日志传输,这会加大网络开销和机器负担。
而 RDD 则使用了粗粒度转换,即对于很多相同的数据项使用同一种操作(如 map、filter、join 等)。这种方式能够通过记录 RDD 之间的转换从而刻画 RDD 的继承关系(lineage),而不是真实的数据,最终构成一个 DAG(有向无环图),这种结构使得当发生 RDD 丢失时,能够利用上下图中的信息从其祖辈 RDD 中重新计算得到。
RDD 定义
an RDD is a read-only, partitioned collection of records
在 RDD 的论文中,对于 RDD 给出的定义:RDD 是一组只读的,可分区的数据集。
RDD 具有如下特性:
分区
分区代表同一个 RDD 包含的数据被存储在系统的不同节点中,这也是它可以被并行处理的前提。
逻辑上,可以认为 RDD 是一个大的数组。数组中的每个元素代表一个分区(Partition)。
在物理存储中,每个分区指向一个存放在内存或者硬盘中的数据块(Block),而这些数据块是独立的,它们可以被存放在系统中的不同节点。
RDD 只是抽象意义的数据集合。
RDD 中的每个分区存有它在该 RDD 中的 index。通过 RDD 的 ID 和分区的 index 可以唯一确定对应数据块的编号,从而通过底层存储层的接口中提取到数据进行处理。
在集群中,各个节点上的数据块会尽可能地存放在内存中,只有当内存没有空间时才会存入硬盘,最大化地减少硬盘读写的开销。
不可变
每个 RDD 分区都是只读的,其内部包含的分区信息是不可更改的,创建 RDD 只能通过如下两种方式:
读取存储设备中的数据
从一个 RDD 经过 Transformation 变换得到
这样的属性使得:
提升了 Spark 的计算效率,更易于错误恢复。对于代表中间结果的 RDD,我们需要记录它是通过哪个 RDD 进行哪些转换操作得来,即依赖关系,而不用立刻去具体存储计算出的数据本身。
系统启动一个新的任务(Backup Task)可以对执行慢的任务备份运行,类似于 MapReduce 的推测执行。
虽然 RDD 是不可变的,但是允许用户修改两方面的属性:
持久化(persistence):用户可以指定数据的存储策略,例如,数据存储在内存、磁盘等。程序员可以通过
persist
接口指定将来可能会重复使用的 RDD,默认存储在内存中,也可以在内存不足时,存放在磁盘中。也可以指定存储优先级来指定哪部分内存中的数据优先刷新到磁盘。分区(partitioning):用户可以修改分区数量,实现修改并行计算单元的划分结构,这对于数据存放位置优化十分有用,例如,确保两个 dataset 以相同的 Hash 方式进行 Hash 分区,从而进行
join
。
并行操作
RDD 的分区特性使得它天然支持并发操作,可以在不同的节点的数据分别进行处理产生新的 RDD。
RDD 数据结构
其中,SparkContext 是所有 Spark 功能的入口,它代表了与 Spark 节点的连接,可以用来创建 RDD 对象以及在节点中的广播变量等。一个线程只有一个 SparkContext。SparkConf 则是一些参数配置信息。
RDD 数据结构公开 5 部分的公共接口:
partitions()
preferredLocations(p)
:分区优先存放的节点位置。dependencies()
:依赖列表iterator(p, parentIters)
:Compute the elements of partition p given iterators for its parent partitions(不理解,这部分应该是应该是基于分区进行数据计算的)。partitioner()
:表明 RDD 分区方式是 Hash 或 Range。
内存管理
Spark 提供了三种对 RDD 持久化的管理方式:
存储在内存中反序列化的 Java 对象。该实现性能最高,Java VM 可以最快捷地访问相应的 RDD。
存储在内存中序列化的数据。在内存空间有限的情况下,以低性能换取用户可以使用比 Java 对象图更加有效的内存表示。
存储在磁盘中。该实现对于很大的 RDD 十分有用,保存在内存中,存在数据丢失的风险,进而导致高昂的重新计算的代价。
LRU 逐出策略
为了管理有限的可用内存,我们在 RDD 级别使用 LRU 逐出策略。当计算了一个新的 RDD 分区,但没有足够的空间存储,如果最近访问最少的 RDD 分区和这个刚计算出的新 RDD 分区不在同一个 RDD 中,将会从最近访问最少的 RDD 中逐出一个分区。否则,会将旧分区保存在内存中,来防止频繁地从相同的 RDD 中进出,因为大多数操作都运行在整个 RDD 上,导致在未来有极大可能需要使用已经在内存中的旧分区。同时 Spark 还为每个 RDD 为用户提供了设置“持久化优先级”的进一步控制选项。
关于 Spark 内存管理的进一步优化,在 RDD 的论文中,指出目前集群上的每个 Spark 实例目前都有自己单独的内存空间,之后计划研究通过统一的内存管理器跨 Spark 实例共享 RDD。
RDD 依赖关系
从抽象的角度看,RDD 间存在着血统继承关系,而真正实现时,其本质是 RDD 间依(Dependency)关系。依赖关系是 RDD 重要的组件,它记录了从哪个 RDD 经过哪个转换得到新的 RDD,使得 Spark 不需要对中间结果进行复制以防止数据丢失的目的。
从图的角度看,RDD 为节点,在一次转换操作中,创建得到的新 RDD 称为子 RDD,同时会产生新的边,即依赖关系,子 RDD 依赖向上依赖的 RDD 便是父 RDD,可能会存在多个父 RDD。我们可以将这种依赖关系进一步分为两类,分别是窄依赖(Narrow Dependency)和宽依赖(Wide Dependency),也称之为 Shuffle 依赖。
narrow dependencies, where each partition of the parent RDD is used by at most one partition of the child RDD, wide dependencies, where multiple child partitions may depend on it. For example, map leads to a narrow dependency, while join leads to to wide dependencies (unless the parents are hash-partitioned).
窄依赖就是父 RDD 的分区可以一一对应到子 RDD 的分区,例如map
,宽依赖就是父 RDD 的每个分区可以被多个子 RDD 的分区使用,例如union
。
为什么将将依赖关系划分为窄依赖和宽依赖?
窄依赖可以在一个节点上对多个分区以 Pipeline 方式计算所有的父分区,例如,可以在执行
map
操作后,紧接着进行filter
。而宽依赖要求先计算好所有的父分区的数据,保证所有的父分区数据都是可用的,并且还可能需要执行类似于 MapReduce 的操作在节点间进行数据的混洗(Shuffle)。从节点故障后恢复的角度,对于窄依赖,只需要对丢失的父分区重新计算,并且可以在不同的节点并行地重新计算;而对于宽依赖,涉及到 RDD 各级的多个父分区,单个失败的节点可能会导致 RDD 所有祖先的某些分区的丢失,需要完全重新执行。
窄依赖
当子 RDD 分区依赖的父 RDD 分区不被其他的子 RDD 分区依赖,就是窄依赖。
宽依赖
父 RDD 分区被多个子 RDD 分区依赖,就是宽依赖。
任务调度
Spark 调度器和 Dryad 类似,不过综合考虑了缓存在内存中的 RDD 分区。当执行 Action 操作时,调度器检查 RDD 的继承关系图(Lineage)以构建 Stage 的 DAG 来执行。每个 Stage 内部尽可能多地包含一组具有窄依赖关系的转换,并将它们流水线并行化。Stage 的边界有两种情况:
宽依赖上的 Shuffle 操作
已完成计算的分区来缩短父 RDD 的计算过程
调度器另外加载一组任务计算每个 Stage 中丢失的分区,直到完成目标 RDD。
调度器给机器分配任务采用基于数据位置的延迟调度(Delay Scheduling)策略。
如果要处理的分区位于节点内存中,那么将任务分配给该节点。
否则,如果要处理的分区含有 Preferred Location 例如 HDFS 文件,那么将任务分配给这组节点。
对于宽依赖,在拥有父 RDD 分区的节点上将中间结果物化,来简化容错处理,这一点的处理方式和 MapReduce 物化 map 处理输出类似。
如果某个任务失败,只要 Stage 中父 RDD 分区依然可用,只需要在另外一个节点重新运行,如果某些 Stage 不可用(例如,Shuffle 时某个 map 输出丢失),重新提交任务来并行地对丢失分区计算。在原论文中,尽管只需要只需要直接备份 RDD 继承图,Spark 无法接受调度器失效。
CheckPoint
RDD 的 Lineage 可以用于故障恢复,但是对于 Lineage 链很长的 RDD 来说,数据恢复需要花费很长的时间。对一些 RDD 设置检查点很有用。一般而言,将一些 Lineage 链很长、包含宽依赖的 RDD 设置检查点十分有用。Spark 为 RDD 提供了设置检查点的 API。
参考资料
Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing
《Spark Streaming 实时流式大数据处理实战》
极客时间蔡元楠的《大规模数据处理实战》课程
版权声明: 本文为 InfoQ 作者【正向成长】的原创文章。
原文链接:【http://xie.infoq.cn/article/8b5ce3e929588eb85eb6f5013】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
评论