深入浅出 Spark
在数据分析和处理的过程中,我们经常会用Join操作来关联两个数据集,Spark作为一个通用的分析引擎,能够支持多种Join的应用场景。
Join操作的输入是两个数据集,A和B,将数据集A中的每一条记录和数据集B中的每一条记录进行比对,每发现一条符合条件的记录时,返回一条新的记录,新记录中的字段可以只从A中来,也可以只从B中来,也可以分别从A和B中取一部分,因此,Join后的记录可以表示两个数据集中记录的结合。
影响Spark Join操作的三个因素
具体到Spark中Join操作的执行,有三个影响较大的因素:输入数据集的大小、Join条件、Join类型。
输入数据集的大小
输入数据集的大小直接影响Join操作的效率和可靠性,不只绝对大小,数据集之间的相对大小也对效率和可靠性有影响。
Join条件
Join条件通常是两个数据集中字段的逻辑比较,一般可以分为等值Join和不等值Join。
等值Join可以包含一个相等条件或多个需要同时满足的相等条件,比如:
一个相等条件:
A.x == B.x
多个相等条件:
A.x == B.x and A.y == B.y
注:x 和 y 是数据集A和B中的字段。
不等值Join使用不相等条件或者不能同时满足的相等条件,比如:
不相等条件:
A.x < B.x
不能同时满足的相等条件:
A.x == B.x or A.y == B.y
Join类型
Join类型影响Join操作的输出,大致包括以下几类:
Inner Join:Inner Join只输出匹配的记录(满足Join条件),记录来自两个数据集
Outer Join:Outer Join除了输出匹配的记录,也输出未匹配的记录,根据如何输出未匹配的记录,outer Join可以进一步分为left out join、right out join和full outer join,记录来自两个数据集
Semi Join:Semi Join输出的记录只来自一个数据集,要么是匹配的记录,要么是未匹配的记录。如果输出的是未匹配的记录,也叫做Anti Join
Cross Join:Cross Join输出两个数据集中所有记录可能的组合,例如,A集合中有m条记录,B集合中有n条记录,则结果为m*n条记录,Cross Join又称为笛卡尔积。
根据上面的三个因素,Spark会选择合适的执行机制来完成Join操作。
Spark Join的执行机制
Spark提供了五种执行Join操作的机制,分别是:
Shuffle Hash Join
Broadcast Hash Join
Sort Merge Join
Cartesian Join
Broadcast Nested Join
Hash Join
Broadcast Hash Join和Shuffle Hash Join都基于Hash Join,Hash Join是单机上的Join操作。想象一道LeetCode算法题,数据量分别为m和n的两个数组,怎么找到两个数组的公共元素?第一种方法:对两个数组进行嵌套循环的遍历,发现相等元素则输出。第二种方法:用空间换时间,将其中一个数组转化成集合(Python的set或者Java的HashSet,实现都基于哈希表),然后遍历第二个数组中的每一个元素,判断是否包含在第一个集合中。Hash Join和第二种方法类似,将较小的数据集分区构造成哈希表,用Join的key作为哈希表的key,key所对应的记录作为哈希表的value,然后遍历较大的数据集分区,在哈希表中寻找对应的key,找到两个分区key相同的记录将其输出。因为使用了哈希表,所以叫做Hash Join。
根据进行Join的两个数据集的大小关系,Spark支持两种Hash Join。
Broadcast Hash Join
当其中一个数据集足够小时,采用Broadcast Hash Join,较小的数据集会被广播到所有Spark的executor上,并转化为一个Hash Table,之后较大数据集的各个分区会在各个executor上与Hash Table进行本地的Join,各分区Join的结果合并为最终结果。
Broadcast Hash Join 没有Shuffle阶段、效率最高。但为了保证可靠性,executor必须有足够的内存能放得下被广播的数据集,所以当进两个数据集的大小都超过一个可配置的阈值之后,Spark不会采用这种Join。控制这个阈值的参数为
spark.sql.autoBroadcastJoinThreshold
,最新版本(3.0.1)中默认值为10M。
Shuffle Hash Join
当两个数据集都小于可以使用Broadcast Hash Join的阈值时,采用Shuffle Join,先对两个数据集进行Shuffle,Shuffle是意思是根据key的哈希值,对两个数据集进行重新分区,使得两个数据集中key的哈希值相同的记录会被分配到同一个executor上,此时在每个executor上的分区都足够小,各个executor分别执行Hash Join即可。
Shuffle操作会带来大量的网络IO开销,因此效率会受到影响。同时,在executor的内存使用方面,如果executor的数量足够多,每个分区处理的数据量可以控制到比较小。
Sort Merge Join
Sort Merge Join和Shuffle Hash Join类似,会有一个Shuffle阶段,将key相同的记录重分配同一个executor上,不同的是,在每个executor上,不再构造哈希表,而是对两个分区进行排序,然后用两个下标同时遍历两个分区,如果两个下标指向的记录key相同,则输出这两条记录,否则移动key较小的下标。
Sort Merge Join也有Shuffle阶段,因此效率同样不如Broadcast Hash Join。在内存使用方面,因为不需要构造哈希表,需要的内存比Hash Join要少。
Cartesian Join
Cartesian Join机制专门用来实现cross join,结果的分区数等于输入数据集的分区数之积,结果中每一个分区的数据对应一个输入数据集的一个分区和另外一个输入数据集的一个分区。
Cartesian Join会产生非常多的分区,但如果要进行cross join,别无选择。
Broadcast Nested Loop Join
Broadcast Nested Join将一个输入数据集广播到每个executor上,然后在各个executor上,另一个数据集的分区会和第一个数据集使用嵌套循环的方式进行Join输出结果。
Broadcast Nested Join需要广播数据集和嵌套循环,计算效率极低,对内存的需求也极大,因为不论数据集大小,都会有一个数据集被广播到所有executor上。
Spark如何选择Join机制
Spark根据以下的因素选择实际执行Join的机制:
参数配置
hint参数
输入数据集大小
Join类型
Join条件
其中,hint参数是一种在join时手动指定join机制的方法,例如:
下面介绍在什么情况下使用何种Join机制。
何时使用Broadcast Hash Join
必需条件:
只用于等值Join
不能用于Full Outer Join
以下条件需要满足一个:
左边的数据集使用了broadcast hint,Join类型是Right Outer,Right Semi或Inner
没使用hint,但左边的数据集小于
spark.sql.autoBroadcastJoinThreshold
参数,Join类型是Right Outer,Right Semi或Inner右边的数据集使用了broadcast hint,Join类型是Left Outer,Left Semi或Inner
没使用hint,但右边的数据集小于
spark.sql.autoBroadcastJoinThreshold
参数,Join类型是Left Outer,Left Semi或Inner两个数据集都使用了broadcast hint,Join类型是Left Outer,Left Semi,Right Outer,Right Semi或Inner
没使用hint,但两个数据集都小于
spark.sql.autoBroadcastJoinThreshold
参数,Join类型是Left Outer,Left Semi,Right Outer,Right Semi或Inner
何时使用Shuffle Hash Join
必需条件:
只用于等值Join
不能用于Full Outer Join
spark.sql.join.prefersortmergeJoin
参数默认值为true,设置为false
以下条件需要满足一个:
左边的数据集使用了shuffle_hash hint,Join类型是Right Outer,Right Semi或Inner
没使用hint,但左边的数据集比右边的数据集显著小,Join类型是Right Outer,Right Semi或Inner
右边的数据集使用了shuffle_hash hint,Join类型是Left Outer,Left Semi或Inner
没使用hint,但右边的数据集比左边的数据集显著小,Join类型是Left Outer,Left Semi或Inner
两边的数据集都使用了shuffle_hash hint,Join类型是Left Outer,Left Semi,Right Outer,Right Semi或Inner
没使用hint,两个数据集都比较小,Join类型是Left Outer,Left Semi,Right Outer,Right Semi或Inner
何时使用Sort Merge Join
必需条件:
只用于等值Join
Join条件中的key是可排序的
spark.sql.join.prefersortmergeJoin
参数默认值为true,设置为true
以下条件需要满足一个:
有一个数据集使用了merge hint,Join类型任意
没有使用merge hint,Join类型任意
何时使用Cartesian Join
必需条件:
Cross Join
以下条件需要满足一个:
使用了shuffle_replicate_nl hint,是等值或不等值Join均可
没有使用hint,等值或不等值Join均可
何时Broadcast Nested Loop Join
Broadcast Nested Loop Join是默认的Join机制,当没有选用其他Join机制被选择时,用它来进行任意条件任意类型的Join。
当有多种Join机制可用时,选择的优先级为Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join > Cartesian Join。
在进行Inner Join和不等值Join时,如果有一个数据集可以被广播,Broadcast Nested Loop Join的优先级比Cartesian Join优先级高。
参考链接:
https://towardsdatascience.com/demystifying-joins-in-apache-spark-38589701a88e
https://blog.csdn.net/lsr40/article/details/99569049
https://www.ruanyifeng.com/blog/2019/01/table-join.html
http://hbasefly.com/2017/03/19/sparksql-basic-join/
公众号:大数志
传递最新、最有价值的大数据技术干货和资讯。
版权声明: 本文为 InfoQ 作者【大数志】的原创文章。
原文链接:【http://xie.infoq.cn/article/612ccea5c1762c351d8d139c8】。
本文遵守【CC BY-NC-SA】协议,转载请保留原文出处及本版权声明。
评论