写点什么

Spark 数据倾斜方案实战 (二)

用户头像
小舰
关注
发布于: 2021 年 04 月 08 日

导读:上一期讲了[Spark 数据倾斜解决方案实战(一)],这个方法有个很好的优点就是,你可以不改代码,直接通过参数 spark.default.parallelism 就可以调整。本期介绍第二种,自定义 partitioner 的方法,我们实现 spark 提供的分区接口,自定义分区。这样有个什么好处呢,我们想让哪个数据在哪个分区就能在那个分区,这个方法可真是太香了,我们来看看吧~


自定义 partitioner

01 数据准备


数据准备工作上一期已经介绍过了,这里就不多介绍了,直接开始看第二种数据倾斜解决方法。


02 自定义分区


首先我们要创建一个自定义的 partitioner 的类,继承 Spark 本身提供的接口,实现其中的方法。其实就有两个比较重要的方法,如下所示。

override def numPartitions: Int = numParts 这个方法就是来设定,你的任务并发数的;  override def getPartition(key: Any): Int = {}这个方法就是来自定义实现你的分区逻辑的;

比如我这里是这样规定的,如果你的 id 超过了 1 亿,那就让这些数据平均分配到各个分区中,如果你没看第一篇文章我的倾斜数据是什么样的,这里可能比较懵(这里再简单说一遍,就是我的数据总共有 1.2 亿条,前 1 亿条数据都是连续的,后面的 2000 万条数据对 id 做了处理,使他们模 10 余 8),这里就是将这 2000 万条数据再经过处理,让他们重新平均分配到各个分区中。

看代码实现:

//自定义分区逻辑class SkewPartitioner(numParts:Int) extends Partitioner{  override def numPartitions: Int = numParts
//返回分区id override def getPartition(key: Any): Int = { val keynum = Integer.parseInt(key.toString) if(keynum<100000000){ keynum%numParts }else{ (keynum+(new util.Random().nextInt(numParts)))%numParts } }}

//调用def skew_partitioner(spark: SparkSession, parall:Int, repath:String): Unit = {
val db_name = Config.dbname val sql = "select * from cyj_skew "
spark.sql(s"use $db_name") val dfresult = spark.sql(sql)
val rddresult = dfresult.rdd .map(x=>(x.get(0),1)) .groupByKey(new SkewPartitioner(10)) //调用自定义partitioner .mapValues(_.size) rddresult.saveAsTextFile(repath)
}
复制代码

然后,我们运行一下看看效果,我们的并发数依然是 10,果不其然,我们的数据倾斜现象没了,都老老实实的进行了平均分配。


附一下上一期的数据倾斜的情况:


总结

所以,这种方式我们不用改变并行度,直接根据自己的意愿来安排每条数据的分区,非常棒。当然了,如果非要说缺点,跟调整并行度一样,我们都是针对不同的 key 来进行操作的,如果是由于 key 相同而引起的数据倾斜,还要想另外的办法。所以想知道么,等我下一期~


发布于: 2021 年 04 月 08 日阅读数: 26
用户头像

小舰

关注

公众号:DLab数据实验室 2020.11.12 加入

中国人民大学硕士

评论

发布
暂无评论
Spark数据倾斜方案实战(二)