浅谈离线数据倾斜
作者:京东零售 荆明岚
一、数据倾斜的基本概念
1、什么是数据倾斜?
用最通俗易懂的话来说,数据倾斜无非就是大量的相同 key 被 partition 分配到一个分区里,造成了'一个人累死,其他人闲死'的情况,这种情况是我们不能接受的,这也违背了并行计算的初衷,首先一个节点要承受着巨大的压力,而其他节点计算完毕后要一直等待这个忙碌的节点,也拖累了整体的计算时间,可以说效率是十分低下的
2、数据倾斜发生时的现象
(1)绝大多数 task 执行得都非常快,但个别 task 执行的极慢。
(2)原本能正常执行的 Spark 作业,某天突然爆出 OOM(内存溢出)异常。观察异常栈,是我们写的业务代码造成的
3、通用的常规解决方案:
(1)增加 jvm 内存,这适用于第一种情况(唯一值非常少,极少数值有非常多的记录值(唯一值少于几千)),这种情况下,往往只能通过硬件的手段来进行调优,增加 jvm 内存可以显著的提高运行效率。
(2)增加 reduce 的个数,这适用于第二种情况(唯一值比较多,这个字段的某些值有远远多于其他值的记录数,但是它的占比也小于百分之一或千分之一),我们知道,这种情况下,最容易造成的结果就是大量相同 key 被 partition 到一个分区,从而一个 reduce 执行了大量的工作,而如果我们增加了 reduce 的个数,这种情况相对来说会减轻很多,毕竟计算的节点多了,就算工作量还是不均匀的,那也要小很多。
(3)自定义分区,这需要用户自己继承 partition 类,指定分区策略,这种方式效果比较显著。
(4)重新设计 key,有一种方案是在 map 阶段时给 key 加上一个随机数,有了随机数的 key 就不会被大量的分配到同一节点(小几率),待到 reduce 后再把随机数去掉即可。
(5)使用 combinner 合并,combinner 是在 map 阶段,reduce 之前的一个中间阶段,在这个阶段可以选择性的把大量的相同 key 数据先进行一个合并,可以看做是 local reduce,然后再交给 reduce 来处理,这样做的好
4、通用定位发生数据倾斜的代码?
(1)数据倾斜只会发生在 shuffle 中,下面是常用的可能会触发 shuffle 操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition 等。出现数据倾斜时,可能就是代码中使用了这些算子的原因
(2)通过观察 spark UI,定位数据倾斜发生在第几个 stage 中,如果是用 yarn-client 模式提交,那么本地是可以直接看到 log 的,可以在 log 中找到当前运行到了第几个 stage;如果用 yarn-cluster 模式提交,可以通过 Spark Web UI 来查看当前运行到了第几个 stage。此外,无论是使用了 yarn-client 模式还是 yarn-cluster 模式,我们都可以在 Spark Web UI 上深入看一下当前这个 stage 各个 task 分配的数据量,从而进一步确定是不是 task 分配的数据不均匀导致了数据倾斜
二、 Hive 数据倾斜
1、Hive 的执行是分阶段的,map 处理数据量的差异取决于上一个 stage 的 reduce 输出,所以如何将数据均匀的分配到各个 reduce 中,就是解决数据倾斜的根本所在
2 、造成数据倾斜的原因
1)、key 分布不均匀
2)、业务数据本身的特性
3)、建表时考虑不周
4)、某些 SQL 语句本身就有数据倾斜
3 、数据倾斜的表现:
数据倾斜出现在 SQL 算子中包含 join/group by/等聚合操作时,大量的相同 KEY 被分配到少量的 reduce 去处理。导致绝大多数 TASK 执行得都非常快,但个别 TASK 执行的极慢,原本能正常执行的作业,某天突然爆出 OOM(内存溢出)异常。任务进度长时间维持在 99%(或 100%)。任务监控页面,发现只有少量(1 个或几个)reduce 子任务未完成。因为其处理的数据量和其他 reduce 差异过大。单一 reduce 的记录数与平均记录数差异过大,通常可能达到 3 倍甚至更多。 最长时长远大于平均时长。可以查看具体 job 的 reducer counter 计数器协助定位。
4、数据倾斜的解决方案:
(1)参数调节:
hive.map.aggr=true (是否在 Map 端进行聚合,默认为 true),这个设置可以将顶层的聚合操作放在 Map 阶段执行,从而减轻清洗阶段数据传输和 Reduce 阶段的执行时间,提升总体性能
Set hive.groupby.skewindata=true(hive 自动进行负载均衡)
(2)SQL 语句调节
a、如何 Join:
关于驱动表的选取,选用 join key 分布最均匀的表作为驱动表
做好列裁剪和 filter 操作,以达到两表做 join 的时候,数据量相对变小的效果,避免笛卡尔积
Hive 中进行表的关联查询时,尽可能将较大的表放在 Join 之后
b、大小表 Join,开启 mapjoin
mapjoin 的原理: MapJoin 会把小表全部读入内存中,在 map 阶段直接拿另外一个表的数据和内存中表数据做匹配,由于在 map 是进行了 join 操作,省去了 reduce 阶段,运行的效率就会高很多。参与连接的小表的行数,以不超过 2 万条为宜,大小不超过 25M
设置参数
set hive.auto.convert.join=true;
hive.mapjoin.smalltable.filesize=25000000 即 25M
手动指定
-- a 表是大表,数据量是百万级别
-- b 表是小表,数据量在百级别,mapjion 括号中的 b 就是指定哪张表为小表
select
/+mapjoin(b)/
a.field1 as field1,
b.field2 as field2,
b.field3 as field3
from a left join b
on a.field1 = b.field1;
c、大表 Join 大表:
null 值不参与连接,简单举例
select field1,field2,field3…
from log a left join user b on a.userid is not null and a.userid=b.userid
union select field1,field2,field3 from log where userid is null;
将热点 key 打散,但是需要注意,尽量不要在 join 时,对关联 key 使用 rand()函数。因为在 hive 中当遇到 map 失败重算时,就会出现数据重复(数据丢失)的问题,spark 引擎使用 rand 容易导致 task 失败重新计算的时候偶发不一致的问题。可以使用 md5 加密唯一维度值的方式替代 rand(), 比如: md5(concat(coalesce(sku_id, 0), '_', coalesce(dim_store_num, 0), '_', coalesce(store_id, 0), '_',coalesce(delv_center_id, 0))),其中 concat 的字段是表的唯一粒度;也可以使用 hash。
d、count distinct 大量相同特殊值,使用 sum...group by 代替 count(distinct )
例如 select a,count(distinct b) from t group by a
可以写成 select a,sum(1) from (select a,b from t group by a,b) group by a;
select count (distinct key) from a
可以写成 Select sum(1) from (Select key from a group by key) t 特殊情况特殊处理:在业务逻辑优化效果的不大情况下,有些时候是可以将倾斜的数据单独拿出来处理。最后 union 回去
e、 不管是 join 还是 groupby 请先在内层先进行数据过滤,建议只保留需要的 key 值
f、 取最大最小值尽量使用 min/max;不要采用 row_number
g、 不要直接 select * ;在内层做好数据过滤
h、 尽量使用 sort by 替换 order by
i、 明确数据源,有上层汇总的就不要使用基础 fdm 或明细表
J、join 避免多对多关联
在 join 链接查询时,确认是否存在多对多的关联,起码保证有一个表的结果集的关联字段不重复
5、典型的业务场景举例
(1)空值产生的数据倾斜
场景:如日志中,常会有信息丢失的问题,比如日志中的 user_id,如果取其中的 user_id 和 用户表中的 user_id 关联,会碰到数据倾斜的问题。
解决方法 1: user_id 为空的不参与关联
select * from log a
join users b
on a.user_id is not null
and a.user_id = b.user_idunion allselect * from log a
where a.user_id is null;
(2)不同数据类型关联产生数据倾斜
场景:用户表中 user_id 字段为 int,log 表中 user_id 字段既有 string 类型也有 int 类型。当按照 user_id 进行两个表的 Join 操作时,默认的 Hash 操作会按 int 型的 id 来进行分配,这样会导致所有 string 类型 id 的记录都分配到一个 Reducer 中。
解决方法:把数字类型转换成字符串类型
select * from users a
left outer join logs b
on a.usr_id = cast(b.user_id as string)
(3)小表不小不大,怎么用 map join 解决倾斜问题
使用 map join 解决小表(记录数少)关联大表的数据倾斜问题,这个方法使用的频率非常高,但如果小表很大,大到 map join 会出现 bug 或异常,这时就需要特别的处理
select * from log a
left outer join users b
on a.user_id = b.user_id;
users 表有 600w+ 的记录,把 users 分发到所有的 map 上也是个不小的开销,而且 map join 不支持这么大的小表。如果用普通的 join,又会碰到数据倾斜的问题
解决方法:
select /+mapjoin(x)/* from log a
left outer join (
select /+mapjoin(c)/d.*
from ( select distinct user_id from log ) c
join users d
on c.user_id = d.user_id
) x
on a.user_id = b.user_id;
log 里 user_id 有上百万个,这就又回到原来 map join 问题。所幸,每日的会员 uv 不会太多,有交易的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多等等。所以这个方法能解决很多场景下的数据倾斜问题
(4)业务逻辑突发热 key 的处理(真实线上问题)
业务场景举例:流量数据多个设备号对应了一个安装 id,突发某几个安装 id 数量级特别大。在归一环节中,按照安装 id 进行分发 reduce,再进行处理,异常热 key 会造成单一节点处理数据量大,由于数据倾斜从而导致任务卡死的情况。
解决方案:基于小时任务,提前设置一个异常范围,把异常安装 id 和对应的 aid 捞出来,写到维表里面。按照归一逻辑,优先使用 aid 值作为归一结果,所以在归一任务中,读取异常值,随机分发到 reduce 中,并将 aid 赋值给归一字段,这样就避免了热点处理
总结:
1、对于 join,在判断小表不大于 1G 的情况下,使用 map join
2、对于 group by 或 distinct,设定 hive.groupby.skewindata=true
3、尽量使用上述的 SQL 语句调节进行优化
6、数据倾斜的监控预防
(1)测试的时候需要关注数据分布,针对不同日期、关键指标、重点 key、枚举值等
(2)增加数据质量监控,数据计算的每层任务增加数据质量监控。
(3)L0 任务,大数据平台需要有健康度巡检,对资源、参数配置,数据倾斜、稳定性等做任务健康度打分,从而发现数据倾斜的趋势,及早检查任务
三、spark 数据倾斜
Spark 优化数据倾斜的思路,join 方式从 SMJ 方式改成 BMJ 的方式,但是只适合大小表的情况。优化思路一般是: 改 join 方式,开启 spark 自适应框架,优化 sql。
1、开启 sparksql 的数据倾斜时的自适应关联优化
spark.shuffle.statistics.verbose=true --打开后 MapStatus 会采集每个 partition 条数的信息,用于倾斜处理
2 、Sortmergejoin 改成 BroadcastHashJoin。调大 BroadcastHashJoin 的阈值,
在某些场景下可以把 SortMergeJoin 转化成 BroadcastHashJoin 而避免 shuffle 产生的数据倾斜
增加参数:spark.sql.autoBroadcastJoinThreshold=524288000 --将 BHJ 的阈值提高到 500M 3、优化 sql 同 hive
4、倾斜 KEY 查找
需要结合实际业务代码,查找到引起 Shuffle 的算子,并按照以下两种方式查找大 KEY.
方式一:通过 SQL 抽样倾斜 KEY
适用场景:如果数据量比较小的情况下,通过 SQL 的方式验证比较便捷
操作步骤:
1、针对 KEY 进行数量统计。
2、按照数量从大到小进行排序。
3、直接取 limit N 即可。
方式二:通过 sample 抽样倾斜 KEY
适用场景:如果数据量很大,可以通过抽样进行抽取大 KEY。能否抽取到大 KEY 一般和抽取数据比例有关系。
操作步骤:
(1)对 KEY 赋值为 1,便于下一步进行计数
(2) 对 KEY 进行累计
(3)对 KEY 和 VALUE 交换
(4)针对 KEY 按照字典进行倒排
(5)将 KEY 和 VAlUE 位置交换,还原到真实的<KEY,VALUE>
(6)从已排序的 RDD 中,直接取前 N 条
数据倾斜一般由 Shuffle 时数据不均匀导致,一般有三类算子会产生 Shuffle:Aggregation (groupBy)、Join、Window。
Aggregation
建议打散 key 进行二次聚合:采用对 非 constant 值、与 key 无关 的列进行 hash 取模,不要使用 rand 类函数
以 DataFrame API 示例:
dataframe
.groupBy(col("key"), pmod(hash(col("some_col")), 100)).agg(max("value").as("partial_max"))
.groupBy(col("key")).agg(max("partial_max").as("max"))
Window
目前支持该模式下的倾斜 window,(仅支持 3.0)
select (... row_number() over(partition by ... order by ...) as rn) where rn [==|<=|<] k and other conditionsspark.sql.rankLimit.enabled=true (目前支持基于 row_number 的 topK 计算逻辑)
Shuffled Join
Spark 2.4 开启参数
spark.sql.adaptive.enabled=true
spark.shuffle.statistics.verbose=true
spark.sql.adaptive.skewedJoin.enabled=true
spark.sql.adaptive.allowAdditionalShuffle=true
如果不能处理,建议用户自行定位热点数据进行处理
Spark 3.0
spark.sql.adaptive.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.skewJoin.enhance.enabled=true (通用倾斜算法,可处理更多场景)
spark.sql.adaptive.forceOptimizeSkewedJoin=true (允许插入额外 shuffle,可处理更多场景)
其他参数:
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (默认为 256MB,分区大小超过该阈值才可被识别为倾斜分区,如果希望调整的倾斜分区小于该阈值,可以酌情调小)
spark.sql.adaptive.skewJoin.skewedPartitionFactor (默认为 5,分区大小超过中位数 Xfactor 才可被识别为倾斜分区,一般不需要调整)
spark.sql.adaptive.skewJoin.enhance.maxJoins (默认 5,通用倾斜算法中,如果 shuffled join 超过此阈值则不处理,一般不需要调整)
spark.sql.adaptive.skewJoin.enhance.maxSplitsPerPartition (默认 1000,通用倾斜算法中,尽量使得每个倾斜分区的划分不超过该阈值,一般不需要调整)
数据膨胀(Join)
spark.sql.adaptive.skewJoin.inflation.enabled=true (默认 false,由于采样计算会导致性能回归,正常任务不要开启)
spark.sql.adaptive.skewJoin.inflation.factor=50 (默认为 100,预估的分区输出大小超过中位数 Xfactor 才可被识别为膨胀分区,由于预估算法存在误差,一般不要低于 50)
spark.sql.adaptive.shuffle.sampleSizePerPartition=500 (默认 100,每个 Task 中的采样数,基于该采样数据预估 Join 之后的分区大小,如果 Task 数量不大,可以酌情调大)
倾斜 key 检测(Join)
由于 Join 语义限制,对于 A left join skewed B 之类的场景,无法对 B 进行划分处理,否则会导致数据正确性问题,这也是 Spark 项目所面临的难题。如果开启以上功能依然不能处理数据倾斜,可以通过开启倾斜 key 检测功能来定位是哪些 key 导致了倾斜或膨胀,继而进行过滤等处理。
spark.sql.adaptive.shuffle.detectSkewness=true (默认 false,由于采样计算会导致性能回归,正常任务不要开启)
其他参数:
spark.sql.adaptive.shuffle.sampleSizePerPartition=100 (默认 100,每个 Task 中的采样数,如果 Task 数量不大,可以酌情调大)
版权声明: 本文为 InfoQ 作者【京东科技开发者】的原创文章。
原文链接:【http://xie.infoq.cn/article/6f6495f0ff0491b2240b91b4a】。文章转载请联系作者。
评论