Spark RDD 模型

弹性分布式数据集
物理上:RDD 是一组记录的集合。 一个 RDD 可以分成多个分区, 每个分区是不可变的,分散在集群的各个节点。
逻辑上:RDD 是一个编程的数据抽象, 可以对它进行各种函数操作。 RDD 操作都是高阶函数,这些操作内部都是并发执行 , 由两种类型的操作: 转换和执行。
为什么会出现 RDD 这种抽象模型?
以当时的背景,提出了弹性分布式数据集 RDD,这个分布式内存抽象,让程序员以容错的方式在大型集群上进行内存计算。
RDD 是由两种当前计算框架:迭代算法和交互式数据挖掘工具处理效率低下提出的,当时的大部分框架对计算之间的数据复用的处理方式就是将中间数据写到一个靠稳定的系统中(比如分布式文件系统),这样会由于数据的复制备份,磁盘的 I/O 以及数据的序列化而致应用任务执行很费时间,所以 RDD 的提出,将数据保存在内存中都可以将性能提高一个数量级。
分布式存储器
Spark 的弹性分布式数据集可以被看做一个抽象的分布式共享存储器(DSM),这种概念已被广泛的研究。RDDS 是由 DSM 的两种不同的接口方式发展而来。首先,RDDS 提供了更加严格的编程模型,如果发生了集群节点故障,能够有效的重构相关的数据集。虽然一些 DSM 系统,可以通过检查点的方式来实现容错,但是 Spark 通过使用 RDD 对象获取其沿袭的父对象的信息来对丢失的 RDD 数据分块进行重建。这意味着,仅仅是丢失的数据分块需要被重新计算,并且他们可以在不同的节点上并行重新计算,而不需要将程序恢复到检查点。此外,如果没有节点失败,就不存在额外开销。第二,RDDS 将数据推送到 MapReduce 进行计算,而不是让任意的节点都去访问全局地址空间。
虽然还有其他的系统也提供了 DSM 编程模型,但是在性能、可靠性和可编程上都有很多的限制。Munin(Munin 也是一个 DSM 系统,允许共享内存,并且在多处理器上执行并行程序的系统)要求程序员们使用的变量与访问模式,都要选一个一致性的协议。而 Linda(一种分布式编程的语言,主要以网络计算机为基础)要求,使用元组空间的编程模型来实现容错。Thor(一种面向对象的数据库存储系统,设计为在一个异构的分布式环境下使用,提供了高度可靠且高可用的持久存储对象,并支持安全的共享这些对象)是通过提供了一个接口来持久化共享对象。
RDD 容错
设计 RDD 的主要挑战是定义一个可以有效提供容错的编程接口。集群上内存存储的现有抽象,例如分布式共享内存 [24]、键值存储 [25]、数据库和 Piccolo [27],提供了一个基于对可变状态的细粒度更新的接口。使用此接口,提供容错的唯一方法是跨机器复制数据或跨机器记录更新。这两种方法对于数据密集型工作负载来说都是昂贵的,因为它们需要通过集群网络复制大量数据,其带宽远低于 RAM 的带宽,并且会产生大量的存储开销。
与这些系统相反,RDDs 提供了基于粗粒度转换(比如 map,filter 以及 join)的接口,这些接口可以对多的数据条目应用相同的操作。这样就可以通过记录来生成某个数据集的一系列转换(就是这个数据集 lineage)而不是记录真实的数据来达到提供高效的容错机制。这个 RDD 就有足够的信息知道它是从哪 RDDs 转换计算来的,如果一个 RDD 的分区数据丢失掉了,那么重新计算这个 RDD 所依赖的那个 RDD 对应的区就行了。因此可以很快且不用通过复制备份方式来恢复丢失的数据。
RDD 的延时性
延时性指构建 DAG 的时候是不进行填充数据的,只有当调用的时候,才去进行数据填充
RDD 五个重要组成
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
RDD 实现源码


RDD 为抽象类,下图为其中部分的实现

RDD 调试


Partitioner
在多个 rdd 之间进行类似于 cogroup 的操作,会使用相应的 Partitioner 来进行数据的重分区
如果设置了 spark.default.parallelism,我们将使用 SparkContext defaultParallelism 的值作为默认分区数,否则我们将使用上游 RDD 最大的分区数。
当可用时,我们选择具有最大分区数的 rdds 中的 Partitioner。如果这个 Partitioner 是符合条件的(rdds 中最大分区数的分区数),或者分区数大于或等于默认分区数——我们使用这个 Partitioner。
否则,我们将使用 HashPartitioner,并且分区数为 defaultParallelism。除非设置了 spark.default.parallelism,否则分区的数量将与最大的上游 RDD 中的分区数量相同,因为这应该是最不可能导致内存不足的错误。我们使用两个方法参数(rdd,others) 来强制调用者传递至少一个 rdd。

compute
RDD 的计算是惰性的,一系列转换操作只有在遇到 action 操作是才会去计算数据,而分区作为数据计算的基本单位。在计算链中,无论一个 RDD 有多么复杂,其最终都会调用内部的 compute 函数来计算一个分区的数据。
RDD 抽象类要求其所有子类都必须实现 compute 方法,该方法介绍的参数之一是一个 Partition 对象,目的是计算该分区中的数据。以 MapPartitionsRDD 类为例,其 compute 方法如下

MapPartitionsRDD 类的 compute 方法调用当前 RDD 内的第一个父 RDD 的 iterator 方法,该方法的目的是拉取父 RDD 对应分区的数据,iterator 方法会返回一个迭代器对象,迭代器内部存储的每一个元素即父 RDD 对应分区内的数据记录。
RDD 的粗粒度转换体现在 map 方法上,f 函数是 map 转换操作函数,RDD 会对一个分区(而不是一条一条数据记录)内的数据执行单的的操作 f,最终返回包含所有经过转换过的数据记录的新迭代器,即新的分区。

其他 RDD 子类的 compute 方法与之类似,在需要用用到父 RDD 的分区数据时,就会调用 iterator 方法,以数据流的方式,来对数据分块进行读取。然后根据需求在得到的数据上执行相应的操作。换句话说,compute 函数负责的是父 RDD 分区数据到子 RDD 分区数据的变换逻辑。
withScope


在调试代码的时候,调用的 textFile,以及转换操作 map 中,都会有 withScope 代码块。

它是用来做 DAG 可视化的(DAG visualization on SparkUI),以前的 sparkUI 中只有 stage 的执行情况,也就是说我们不可以看到上个 RDD 到下个 RDD 的具体信息。于是为了在 sparkUI 中能展示更多的信息。所以把所有创建的 RDD 的方法都包裹起来,同时用 RDDOperationScope 记录 RDD 的操作历史和关联,就能达成目标
RDD 宽依赖 &窄依赖
窄依赖:表示父亲 RDDs 的一个分区最多被子 RDDs 一个分区所依赖.
宽依赖:表示父亲 RDDs 的一个分区可以被子 RDDs 的多个子分区所依赖.

为什么要设计宽窄依赖?
第一, 窄依赖可以使得在集群中一个机器节点的执行流计算所有父亲的分区数据, 比如, 我们可以将每一个元素应用了 map 操作后紧接着应用 filter 操作, 与此相反, 宽依赖需要父亲 RDDs 的所有分区数据准备好并且利用类似于 MapReduce 的操作将数据在不同的节点之间进行重新洗牌和网络传输.也就是说按照宽窄依赖切分 stage, 同一 stage 里面的数据,可以在同一线程中运行。
第二, 窄依赖从一个失败节点中恢复是非常高效的, 因为只需要重新计算相对应的父亲的分区数据就可以, 而且这个重新计算是在不同的节点进行并行重计算的, 与此相反, 在一个含有宽依赖的血缘关系 RDDs 图中, 一个节点的失败可能导致一些分区数据的丢失, 但是我们需要重新计算父 RDD 的所有分区的数据.
版权声明: 本文为 InfoQ 作者【布兰特】的原创文章。
原文链接:【http://xie.infoq.cn/article/4adb0a501aed6e13f68d88668】。文章转载请联系作者。
评论