写点什么

Spark 数据倾斜解决方案实战(三)

用户头像
小舰
关注
发布于: 2021 年 04 月 13 日

增加随机前缀

上两期,我们分别讲了通过提高并行度和自定义分区策略来解决数据倾斜的方法,同时我们也讲到了他们的共同缺点:针对于不同 key 倾斜到同一个节点到场景。那如果是同样的 key 太大怎么办呢?如何将同一个 key 分配到不同的节点呢?答案就是通过对 key 增加前后缀的方式,这样就可以重新为这些数据划分分区了。

这种方式解决起来比较麻烦,我们可以一步一步来。

1.数据准备

由于上面两期,我们都是用的 1.2 亿条数据的单表来进行测试的,本节的场景需要涉及到两表 join,因此我们再次构造一个小表。

我们取10050000条数据,其中后5万条数据做了重复key的处理,总共有500个key,每个key重复100次INSERT OVERWRITE TABLE cyj_skew_smallSELECT     CASE WHEN id > 100000000 THEN (100000008  + (CAST (RAND() * 500 AS INT)) * 10 )    ELSE id    END,    A,    B,    CFROM cyj_testWHERE id BETWEEN 90000000 AND 100050000;
本期我们就用这两个表进行join,再简单回顾一下那1.2亿条数据的表cyj_skew,其中有1亿条数据是不重复的,后面的2000万条数据集中在了50个key中,所以这50个key每个key会重复40万次;那我们可以简单估计,cyj_skew中的50个重复key与cyj_skew_small 500个重复key相join,二者共有的一个重复key会产生100*40w=4000w条结果,这个数据量比起其它key来讲会大千万倍。所以我们预想肯定会产生数据倾斜。
复制代码

2.判断数据倾斜

如何判断数据倾斜呢?如果我们看到在任务执行的时候,大部分 task 都执行完毕,只剩若干个任务还在运行,并且运行时间和处理数据量远超其他 task,则可以认为发生了数据倾斜,如下图所示,时间和数据量相差比较大。



2.判断倾斜键

因为数据倾斜键是我们自己模拟出来的,所以我们知道cyj_skew和cyj_skew_small分别有50和500个重复键,那我们需要找出他们共有的重复键,对这些重复键进行优化。
select t1.idfrom(select distinct id from cyj_skew where id > 100000000) t1join (select distinct id from cyj_skew_small where id > 100000000) t2on t1.id=t2.id;
结果我们确实查出了共有的50个重复键[100000298 , 100000318 , 100000158 , 100000388 , 100000078 , 100000088 , 100000328 , 100000128 , 100000378 , 100000408 , 100000418 , 100000278 , 100000068 , 100000098 , 100000108 , 100000168 , 100000178 , 100000438 , 100000228 , 100000458 , 100000248 , 100000468 , 100000488 , 100000118 , 100000148 , 100000018 , 100000038 , 100000258 , 100000048 , 100000498 , 100000358 , 100000238 , 100000308 , 100000138 , 100000188 , 100000478 , 100000058 , 100000338 , 100000268 , 100000348 , 100000398 , 100000198 , 100000428 , 100000028 , 100000288 , 100000448 , 100000368 , 100000008 , 100000208 , 100000218 ]
复制代码

3.对重复键进行加前缀

首先,我们需要查询出这两个表对数据,得到两份数据 DataFrame

val db_name = Config.dbname   val sql_big = "select * from cyj_skew"   val sql_small = "select * from cyj_skew_small"
spark.sql(s"use $db_name") val dfbig = spark.sql(sql_big) val dfsmall = spark.sql(sql_small)
复制代码

然后,对这些数据的共有 key 进行加前缀操作

var skewKeys:List[Int] = List( 100000298  , 100000318  , 100000158  , 100000388  , 100000078  , 100000088      , 100000328  , 100000128  , 100000378  , 100000408  , 100000418  , 100000278  , 100000068  , 100000098      , 100000108  , 100000168  , 100000178  , 100000438  , 100000228  , 100000458  , 100000248  , 100000468      , 100000488  , 100000118  , 100000148  , 100000018  , 100000038  , 100000258  , 100000048  , 100000498      , 100000358  , 100000238  , 100000308  , 100000138  , 100000188  , 100000478  , 100000058  , 100000338      , 100000268  , 100000348  , 100000398  , 100000198  , 100000428  , 100000028  , 100000288  , 100000448      , 100000368  , 100000008  , 100000208  , 100000218  )

val rddbig = dfbig.rdd .map(x=>{ if(skewKeys.contains(x.get(0))){ //对包含在公共重复key的key进行处理,随机添加0-50的前缀 //例如: 100000298 ==> 34|100000298 var parts = new util.Random().nextInt(50) (parts+"|"+x.get(0),x.get(1).toString,x.get(2).toString) }else{ (x.get(0).toString,x.get(1).toString,x.get(2).toString) } })
val rddsmall = dfsmall.rdd .flatMap(x=>{ var buffList = ListBuffer[(String,String,String)]() if(skewKeys.contains(x.get(0))){ for(i<- 0 until 50){ //until不包括50 buffList ++= List((i+"|"+x.get(0),x.get(1).toString,x.get(2).toString)) } }else{ buffList ++= List((x.get(0).toString,x.get(1).toString,x.get(2).toString)) } buffList })
复制代码

最后,我们重新将添加前缀的数据进行 join 计算

val _dfbig = rddbig.toDF("id","A","B") val _dfsmall = rddsmall.toDF("id","A","B")_dfbig.join(_dfsmall,"id").rdd.saveAsTextFile(rePath)
复制代码

4.运行无 key 前缀优化的性能结果

我们可以看到无论是时间还是数据量上都差别很大,而且总运行时长为 11 分钟左右


经过优化后,我们再次运行

我们可以看到,整体的时间和数据量相差不大,数据倾斜现象基本消失,整个任务运行 3 分钟左右。


总结

好了,到这里我们就把数据倾斜常用的处理方式介绍完了。在实际的场景中,可能要综合这几种方法来综合调优,欢迎一起讨论~

发布于: 2021 年 04 月 13 日阅读数: 15
用户头像

小舰

关注

公众号:DLab数据实验室 2020.11.12 加入

中国人民大学硕士

评论

发布
暂无评论
Spark数据倾斜解决方案实战(三)