Spark 数据倾斜方案实战 (二)
导读:上一期讲了[Spark 数据倾斜解决方案实战(一)],这个方法有个很好的优点就是,你可以不改代码,直接通过参数 spark.default.parallelism 就可以调整。本期介绍第二种,自定义 partitioner 的方法,我们实现 spark 提供的分区接口,自定义分区。这样有个什么好处呢,我们想让哪个数据在哪个分区就能在那个分区,这个方法可真是太香了,我们来看看吧~
自定义 partitioner
01 数据准备
数据准备工作上一期已经介绍过了,这里就不多介绍了,直接开始看第二种数据倾斜解决方法。
02 自定义分区
首先我们要创建一个自定义的 partitioner 的类,继承 Spark 本身提供的接口,实现其中的方法。其实就有两个比较重要的方法,如下所示。
override def numPartitions: Int = numParts 这个方法就是来设定,你的任务并发数的; override def getPartition(key: Any): Int = {}这个方法就是来自定义实现你的分区逻辑的;
比如我这里是这样规定的,如果你的 id 超过了 1 亿,那就让这些数据平均分配到各个分区中,如果你没看第一篇文章我的倾斜数据是什么样的,这里可能比较懵(这里再简单说一遍,就是我的数据总共有 1.2 亿条,前 1 亿条数据都是连续的,后面的 2000 万条数据对 id 做了处理,使他们模 10 余 8),这里就是将这 2000 万条数据再经过处理,让他们重新平均分配到各个分区中。
看代码实现:
然后,我们运行一下看看效果,我们的并发数依然是 10,果不其然,我们的数据倾斜现象没了,都老老实实的进行了平均分配。
附一下上一期的数据倾斜的情况:
总结
所以,这种方式我们不用改变并行度,直接根据自己的意愿来安排每条数据的分区,非常棒。当然了,如果非要说缺点,跟调整并行度一样,我们都是针对不同的 key 来进行操作的,如果是由于 key 相同而引起的数据倾斜,还要想另外的办法。所以想知道么,等我下一期~
版权声明: 本文为 InfoQ 作者【小舰】的原创文章。
原文链接:【http://xie.infoq.cn/article/ba53eeef6a55d7399c21c5ed0】。文章转载请联系作者。
评论