写点什么

Spark 数据倾斜解决方案实战(一)

用户头像
小舰
关注
发布于: 2021 年 04 月 05 日

1.数据准备

1.1 准备数据首先准备一份数据,导入 hive 表中,类似下表,当然也可以创建一个外表,总之,准备一份足量数据:(以下表只用于描述过程,大家可以根据自己的表做调整)

CREATE TABLE cyj_tmp(A int,B int,C varchar(10))


1.2 新增一个有序的 id 列

CREATE TABLE cyj_tmp(id int,A int,B int,C varchar(10))

insert into cyj_testselectrow_number() over(partition by 1) as id,A,B,Cfrom cyj_tmp;


1.3 制造数据倾斜

我的数据总共 3 亿条左右,但是我用不了这么多,就用了前 1.2 亿条数据,现在做如下处理:id 为 1-1 亿的数据不做任何处理;id 为 1 亿-1.2 亿的数据将 id 进行修改,改为模 10 余 8,也就是,如果以并行度为 10 的时候,这 2000 万条数据会倾斜到 task8 里;


INSERT OVERWRITE TABLE cyj_skewSELECTCASE WHEN id > 100000000 THEN (100000008 + (CAST (RAND() * 50 AS INT)) * 10 )ELSE idEND,A,B,CFROM cyj_testWHERE id BETWEEN 0 AND 120000000;


2.数据倾斜调优-提高并发

2.1 进行 wordcount 任务 def skew(spark: SparkSession, repath:String): Unit = {


val db_name = Config.dbnameval sql = "select * from cyj_skew "
spark.sql(s"use $db_name")val dfresult = spark.sql(sql)
val rddresult = dfresult.rdd .map(x=>(x.get(0),1)) .groupByKey(10) //并行度参数 = 10 .mapValues(_.size)rddresult.saveAsTextFile(repath)
复制代码


}

2.2 查看任务运行

我们可以看到,task8 里面确实比其他 task 多了 2000w 条数据,发生了数据倾斜(ps:由于我自己数据的原因,实际数据总条数是 120773171,后面的若干条数据可以无视)


2.3 修改并发度

def skew(spark: SparkSession, repath:String): Unit = {


val db_name = Config.dbnameval sql = "select * from cyj_skew "
spark.sql(s"use $db_name")val dfresult = spark.sql(sql)
val rddresult = dfresult.rdd .map(x=>(x.get(0),1)) .groupByKey(20) //并行度参数 = 20 .mapValues(_.size)rddresult.saveAsTextFile(repath)
复制代码


}

2.4 查看任务运行发现数据倾斜得到了改善,虽然 task 数量变多了,但是每个 task 的数据量变少了,这如果在数据量很大的情况下,是会缓解某个节点的压力的;



总结调整并发度的方式是改善数据倾斜的一个方法,但是优缺点已经很明显了,他会带来 task 任务数量的增多。而且,试想一些,如果倾斜的 key 并不是都不相同,而是都相同,这种方法其实并不能起到作用。那应该怎么解决呢?期待下一期吧~

发布于: 2021 年 04 月 05 日阅读数: 18
用户头像

小舰

关注

公众号:DLab数据实验室 2020.11.12 加入

中国人民大学硕士

评论

发布
暂无评论
Spark数据倾斜解决方案实战(一)