Spark RDD 分区数与分区器源码解析
Spark 从 2014 年诞生时的“星星之火”到如今的“燎原之势”,仅仅用了八年时间,其发展速度之快,以及受欢迎程度之高,由此可见一斑。现如今 Spark 已经得到了几乎所有大数据企业的认可,而这些企业也迅速将自己的产品与 Spark 进行了紧密地集成。所以,作为现在最热门的几大分布式大数据计算引擎之一,Spark 几乎是大数据工程师的必修课,而 RDD 作为 Spark 框架的灵魂所在,也是我们所必须熟悉并掌握的,今天就通过本文一起来了解下 Spark RDD 的重要属性之分区器的源码级解析吧。
一、Spark 基础知识补充介绍
1. Spark RDD 的简介
RDD 叫做弹性分布式数据集合,是 Spark 中最基本的数据抽象,它代表一个不可变、只读的,被分区的数据集。操作 RDD 就像操作本地集合一样,有很多的方法可以调用,使用方便,而且无需关心底层的调度细节。也就是相当于将每一步处理完的数据进行封装,让开发者只需要关注数据的处理过程以及计算逻辑,而不需要去考虑其底层该如何实现。
2. Spark 中算子类型
spark 中的算子,分为两大类型:
transformation 算子即转换算子,如:map / flatMap / filter 等
action 算子即行动算子,如:collect / take / saveAsTextFile 等
transformation 类型算子操作都是懒执行的(lazy),也就是说 transformation 操作并不会立即计算他们的结果,而是记录了这个转换和计算逻辑,然后返回一个新的 RDD;
只有当通过调用一个 action 算子来获取结果返回给驱动程序的时候这些 transformation 类型算子的操作才开始计算,然后返回具体的计算结果数据。这种设计可以使 Spark 运行起来更加的高效。
3. Spark RDD 的五大属性
(1)compute 计算函数(描述本 RDD 的数据该如何计算出来)
在源码中的定义:f:(TaskContext, Int, Iterator[T]) => Iterator[U]
一个用来对每个 partition(分区)数据进行逻辑运算的函数;(本质上就是运算逻辑的迭代器)
每个 partition(分区)之间是并行的,所以计算逻辑是相同的;
(2)依赖 RDD 列表
在源码中的定义:var deps: Seq[Dependency[_]
存储着当前 RDD 所依赖的一个或多个前序 RDD
( 3)分区列表
在源码中的定义:private var partitions_ : Array[Partition]
Spark 是基于海量数据分布式计算的场景设计的;
会把数据分给多个 task 并行计算,因此,也就需要把数据划分成多个任务片(partition)
处理 HDFS 上的文件时,会按文件及偏移量范围划分 partition,通常一个 hdfs 的 block 块对应一个 partition,比如:
PARTITION_0:/inputdata/aaa.txt ,0~128M
PARTITION_1:/inputdata/aaa.txt,128M~200M
处理 MySQL 等数据库中的数据时,会按照用户指定的某个字段的取值范围和指定的分区数进行 partition 划分,比如:
PARTITION_0: from xdb.table_a where id < 100000
PARTITION_1: from xdb.table_a where id >=100000 and id<200000
PARTITION_2: from xdb.table_a where id >=200000
(4)[可选]RDD 的分区器
在源码中的定义:val partitioner: Option[Partitioner]
直接决定了 RDD 中分区的个数,RDD 中每条数据经过 Shuffle 过程属于哪个分区以及 Reduce 的个数。只有 Key-Value 类型的 RDD 才有分区的,非 Key-Value 类型的 RDD 分区的值是 None 的。
(5)[可选] 每个分区的首选计算执行位置
在源码中的定义:
val partitioner: Option[Partitioner]finaldef preferredLocations(split: Partition): Seq[String]
Spark 在任务调度时,会优先选择在同一个节点机器的数据传输到同一个节点,也就是尽量使用本地传输计算。即子 RDD 会优先取在同一台机器的父 RDD 中的数据进行计算,所以才会设计这个属性。
二、Spark RDD 分区(Partition)与分区器(Partitioner)
分区 Partition :分区代表 RDD 数据集的一部分,也是五大属性之一;每个分区的数据都需要一个 task 并行实例来计算;RDD 有几个分区,则在计算数据时就需要几个 task 并行度。其实分布式计算,基本上离不开任务划分,任务划分就离不开数据分片(只不过有些框架中称之为 Split,有些框架中称之为 Partition);
分区器 Partitioner :描述的是 RDD 的数据在各个分区之间的分布规则,比如上游数据 shuffle 到下游时,分区器就决定了上游的哪些数据需要进入下游对应的哪些分区,但是只有 kv 类型 RDD 才有分区器,其它 RDD 分区器都为 none。Spark 中实现的分区器有两种,HashPartitioner(哈希分区器)和 RangePartitioner(范围分区器),其中最常用的分区器是 HashPartitioner。
1. RDD 的 partitions 数是如何决定的
RDD 的分区数一般分为三种情况,数据源 RDD 分区数,窄依赖算子 RDD 分区数,宽依赖算子 RDD 分区数。
(1)数据源 RDD 的分区数,由数据源的读取器决定
①如果数据源为读取文件类型(可以是本地文件或 HDFS):即 sc.textFile 产生的 rdd,分区数是由 TextInputFormat.getInputSplits()方法决定的;
getInputSplits()方法的分区逻辑:挨个遍历输入目录下的文件,默认按 blocksize 进行切片,分区的数量一般等于块的数量,且分区的数量至少是 2 个。
②如果数据源为数据库类型,即通过 jdbc 的方式从各种数据库读取数据,这时会根据读取数据总量的上界和下界进行划分分区,例如:
from db.tablewhere id > 10000 and id < 20000;
③如果数据源 RDD 为集合类型,那么分区数首先会判断是否配置运行时参数 spark.default.parallelism,如果配置了则取该参数作为分区数,如果未配置,则参考运行时能用的 cpu 核数;
(2)后续的窄依赖子 RDD(map/flatmap/mappartitions/filter/mapvalues),分区数是一路传承不改变的;
特例:val rdd2 = rdd1.coalesce(2,false)
rdd2 中的一个分区,映射了 rdd1 的多个固定的分区;
比如 rdd2 的 p0 映射了 rdd1 的 p0,p3,p4
比如 rdd2 的 p1 映射了 rdd1 的 p1,p2
(3)后续宽依赖子 RDD,分区数是由 shuffle 算子传入“分区数参数”来决定的,例如下列示例中参数“4”就是指定的分区数:
reduceByKey(f,4) /join(rdd2,4)
如果没有传入“分区数参数”,该如何决定分区数呢?
策略:
①优先把 spark.default.parallelism 参数值作为默认分区数
②如果参数没配,则参考运行时能用的 cpu 核数;
local 运行模式下:
getInt("spark.default.parallelism",totalCores)
集群运行模式:
getInt("spark.default.parallelism",math.max(totalCoreCount.get(), 2)
2. RDD 的 Partitioner 是如何决定的
(1) 源头 RDD 的分区数是由数据源的读取器机制内部决定的,而且通常没有分区器;
(2) 窄依赖子 RDD 分区器通常为 None,也可以选择让它保留父 RDD 的分区器;
(3) 宽依赖子 RDD(ShuffledRDD)的分区数、分区器(因为宽依赖在 shuffle 的时候需要把上游分区中的数据混洗,然后分配到下游的多个分区,就需要一个分区规则 Partitioner 来决定一条数据究竟该放到下游的哪一个分区):
①可以由算子传入:reduceByKey(_+ _ ,4),分区器就是 HashPartitioner;
②可以由算子传入分区器:reduceByKey(new HashPartitioner(4), _ +_ );
③也可以什么都不传,通过调用 Partitioner.defaultPartitioner( )这个方法来得到一个分区器,该方法的主要部分源码如下 (已做过注释,可放心食用):
因为源码比较晦涩难懂,在此用流程图的方式来解释源码中调用 defaultPartitioner( )方法的逻辑:
如果:父 RDD 中有分区器,且最大分区器的分区数>默认并行度或者是父 RDD 最大分区数/最大分区器分区数 < 10 ,就使用父 RDD 中的最大分区器(分区数),作为子 RDD 的分区器;
否则:新建 HashPartitioner 作为子 RDD 的分区器;分区数优先用 spark.default.parallelis,如无,则用上游最大分区数 ;
源码中之所以这样设计,是为了在调用 shuffle 算子的时候在开发人员没有指定分区器和分区数的情况下,尽量使用父 RDD 中的分区器,而且如此设计也能保证上下游的分区数过渡平稳,即上游 RDD 的分区数不大于下游分区数的十倍,因为每一个分区的数据将来都对应一个 task 去处理,如果下游分区数少于上游太多的话,下游 task 数量就会太少,每一份 task 要处理的数据就会过多,这样就很容易导致某个 Executor 内存溢出,也会拖慢整个任务处理速度。
3. shuffle 算子不一定需要 shuffle 的原因
分区数与分区器的知识已经介绍完了,接下来我们通过一些实验去深入理解分区器和分区数的原理:
在我们印象中,join/reduceByKey/cogroup 这些算子是不是应该都是需要 shuffle 的,那么我们来观察下实验的代码,看看是否真的就 shuffle 了呢。
内在原因:
因为 rdd1 和 rdd2 都没有分区器,所以在调用 defaultPartitioner( )方法获取默认分区器时,由于父 RDD 没有分区器,会给他们都新建一个 hashpartitioner 作为他们的子 RDD——rdd11 和 rdd12 的分区器(所以它们在这个过程中就已经 shuffle 过了),因此它们的分区器是相同的。
因为没有指定分区数所以会去取配置的 spark.default.parallelis 参数作为分区数,因为运行环境相同,所以该参数也相同,也就是分区数也相同。
而 join/reduceByKey/cogroup 等算子的底层原理中都会判断,如果上游 RDD 和下游 RDD、分区器和分区数都相等,则在执行分组聚合或者 join 等逻辑时,数据在上下游的分区分布规则就会完全相同,上游分区数据就会直达下游指定分区,因此也就不需要 Shuffle。
相信经过上文的介绍与实践的思考,各位小伙伴都对 spark 中 RDD 的分区器和分区数有了全新的认知,恭喜你 get 到了这个新知识点,这意味着你离架构师的路又近了一小步。以后再谈起 spark 中 shuffle 算子是否一定会引起 shuffle 的问题时,就可以给朋友从底层 RDD 分区器和分区数的原理上解答该问题了,如果在面试中被问到该类问题,相信如果能从源码的角度答出,也会给自己增分不少。
版权声明: 本文为 InfoQ 作者【数新网络官方账号】的原创文章。
原文链接:【http://xie.infoq.cn/article/a249a7c1b6fd00ea1d86c7dd1】。文章转载请联系作者。
评论