Hive|如何避免数据倾斜
1. hive 中桶的概述
对于每一个表(table)或者分区, Hive 可以进一步组织成桶,也就是说桶是更为细粒度的数据范围划分。Hive 也是 针对某一列进行桶的组织。Hive 采用对列值哈希,然后除以桶的个数求余的方式决定该条记录存放在哪个桶当中。
把表(或者分区)组织成桶(Bucket)有两个理由:
(1)获得更高的查询处理效率。
桶为表加上了额外的结构,Hive 在处理有些查询时能利用这个结构。具体而言,连接两个在(包含连接列的)相同列上划分了桶的表,可以使用 Map 端连接 (Map-side join)高效的实现。比如 JOIN 操作。对于 JOIN 操作两个表有一个相同的列,如果对这两个表都进行了桶操作。那么将保存相同列值的桶进行 JOIN 操作就可以,可以大大较少 JOIN 的数据量。
(2)使取样(sampling)更高效。
在处理大规模数据集时,在开发和修改查询的阶段,如果能在数据集的一小部分数据上试运行查询,会带来很多方便。
创建带桶的 table
create table bucketed_user(id int,name string) clustered by (id) sorted by(name) into 4 buckets row format delimited fields terminated by '\t' stored as textfile;
首先,我们来看如何告诉 Hive—个表应该被划分成桶。我们使用 CLUSTERED BY 子句来指定划分桶所用的列和要划分的桶的个数:
CREATE TABLE bucketed_user (id INT) name STRING)
CLUSTERED BY (id) INTO 4 BUCKETS;
在这里,我们使用用户 ID 来确定如何划分桶(Hive 使用对值进行哈希并将结果除 以桶的个数取余数。这样,任何一桶里都会有一个随机的用户集合(PS:其实也能说是随机,不是吗?)。
对于 map 端连接的情况,两个表以相同方式划分桶。处理左边表内某个桶的 mapper 知道右边表内相匹配的行在对应的桶内。因此,mapper 只需要获取那个桶 (这只是右边表内存储数据的一小部分)即可进行连接。这一优化方法并不一定要求 两个表必须桶的个数相同,两个表的桶个数是倍数关系也可以。用 HiveQL 对两个划分了桶的表进行连接,可参见“map 连接”部分(P400)。
桶中的数据可以根据一个或多个列另外进行排序。由于这样对每个桶的连接变成了高效的归并排序(merge-sort), 因此可以进一步提升 map 端连接的效率。以下语法声明一个表使其使用排序桶:
CREATE TABLE bucketed_users (id INT, name STRING)
CLUSTERED BY (id) SORTED BY (id ASC) INTO 4 BUCKETS;
我们如何保证表中的数据都划分成桶了呢?把在 Hive 外生成的数据加载到划分成 桶的表中,当然是可以的。其实让 Hive 来划分桶更容易。这一操作通常针对已有的表。
Hive 并不检查数据文件中的桶是否和表定义中的桶一致(无论是对于桶 的数量或用于划分桶的列)。如果两者不匹配,在査询时可能会碰到错 误或未定义的结果。因此,建议让 Hive 来进行划分桶的操作。
2.hive 中 join 优化
mapside join
方法一:
select /*+ MAPJOIN(time_dim) / count() from
store_sales join time_dim on (ss_sold_time_sk = t_time_sk)
方法二:这个可以由 hive 自动进行 map 端 join
set hive.auto.convert.join=true;
select count(*) from
store_sales join time_dim on (ss_sold_time_sk = t_time_sk)
执行下面这段代码将会产生两个 map-only 方法:
select /*+ MAPJOIN(time_dim, date_dim) / count() from
store_sales
join time_dim on (ss_sold_time_sk = t_time_sk)
join date_dim on (ss_sold_date_sk = d_date_sk)
where t_hour = 8 and d_year = 2002
设置下面两个属性 hive 将会进行自动执行上述过程,第一个属性默认为 true,第二个属性是设置 map 端 join 适合读取内存文件的大小。
set hive.auto.convert.join.noconditionaltask = true;
set hive.auto.convert.join.noconditionaltask.size = 10000000;
Sort-Merge-Bucket (SMB) joins 可以被转化为 SMB map joins。
我们只需要设置一下几个参数即可:
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
已设置大表选择的策略
使用下面属性:
set hive.auto.convert.sortmerge.join.bigtable.selection.policy= org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ;
几种策略设置
org.apache.hadoop.hive.ql.optimizer.AvgPartitionSizeBasedBigTableSelectorForAutoSMJ (default)
org.apache.hadoop.hive.ql.optimizer.LeftmostBigTableSelectorForAutoSMJ
org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ
详细请参考一下连接:hive 中进行连接方案详解(https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Joins)
3,数据倾斜
Hive 的执行是分阶段的,map 处理数据量的差异取决于上一个 stage 的 reduce 输出,所以如何将数据均匀的分配到各个 reduce 中,就是解决数据倾斜的根本所在。
操作
原因
1,数据在节点上分布不均
2,key 分布不均(key 中存在个别值数据量比较大,比如 NULL,那么 join 时就会容易发生数据倾斜)
3,count(disctinct key),在数据两比较大的时候容易发生数据倾斜,因为 count(distinct)是按照 group by 字段进行分组的
4,group by 的使用容易造成数据倾斜
5,业务数据本身的特性
6,建表时考虑不周
7,某些 SQL 语句本身就有数据倾斜
表现
任务进度长时间维持在 99%左右,查看任务监控页面发现只有少量 reduce 任务未完成。因为其处理的数据量和其他 reduce 差异过大。单一 reduce 的记录数与平均记录数差异过大,通常可能达到 3 倍甚至更多。最长时长远大于平均时长。
4,数据倾斜的解决方案
参数调节
set hive.map.aggr=true;
Map 端部分聚合,相当于Combiner
set hive.groupby.skewindata=true;
有数据倾斜的时候进行负载均衡,当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。
SQL 语句调节:
如何 Join 关于驱动表的选取,选用 join key 分布最均匀的表作为驱动表 做好列裁剪和 filter 操作,以达到两表做 join 的时候,数据量相对变小的效果。
大小表 Join 使用 map join 让小的维度表(1000 条以下的记录条数) 先进内存。在 map 端完成 reduce.
大表 Join 大表 把空值的 key 变成一个字符串加上随机数,把倾斜的数据分到不同的 reduce 上,由于 null 值关联不上,处理后并不影响最终结果。
count distinct 大量相同特殊值 容易倾斜,当 xx 字段存在大量的某个值时,NULL 或者空的记录
解决思路 将特定的值,进行特定的处理。比如是 null,过滤掉,where case 特定方式转换特定的值,使得这些值不一样,同时这些值不影响分析。
group by 维度过小:Group by 在 Map 端进行部分数据合并
set hive.map.aggr ; --> 是否在Map端进行数据聚合,默认设置为true; set hive.groupby.mapaggr.checkinterval ; --> 在Map端进行聚合操作的条目数。
进行负载均衡负载均衡
set hive.groupby.skewindata ; 默认值是false,需要设置成true ; 当设置为true时,会变成两个MapReduce ;
第一个 MR JOb 中,map 的输出结果会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样出来的结果相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到辅助均衡目的。
第二个 MR JOb,会根据预处理数据结果按照 key 分布到 Reduce 中,最终完成聚合操作。
5,典型的业务场景
空值产生的数据倾斜
场景:如日志中,常会有信息丢失的问题,比如日志中的 user_id,如果取其中的 user_id 和 用户表中的 user_id 关联,会碰到数据倾斜的问题。
解决方法 1:user_id 为空的不参与关联
select * from log a
join users b
on a.user_id is not null
and a.user_id = b.user_id
union all
select * from log a
where a.user_id is null;
解决方法 2 :赋与空值分新的 key 值
`select *
from log a
left outer join users b
on case when a.user_id is null then concat(‘hive’,rand() ) else a.user_id end = b.user_id;
`
结论:方法 2 比方法 1 效率更好,不但 io 少了,而且作业数也少了。解决方法 1 中 log 读取两次,jobs 是 2。解决方法 2 job 数是 1 。这个优化适合无效 id (比如 -99 , ’’, null 等) 产生的倾斜问题。把空值的 key 变成一个字符串加上随机数,就能把倾斜的数据分到不同的 reduce 上 ,解决数据倾斜问题。
不同数据类型关联产生数据倾斜
场景:用户表中 user_id 字段为 int,log 表中 user_id 字段既有 string 类型也有 int 类型。当按照 user_id 进行两个表的 Join 操作时,默认的 Hash 操作会按 int 型的 id 来进行分配,这样会导致所有 string 类型 id 的记录都分配到一个 Reducer 中。
解决方法:把数字类型转换成字符串类型
`select * from users a
left outer join logs b
on a.usr_id = cast(b.user_id as string)
`
小表不小不大,怎么用 map join 解决倾斜问题
使用 map join 解决小表(记录数少)关联大表的数据倾斜问题,这个方法使用的频率非常高,但如果小表很大,大到 map join 会出现 bug 或异常,这时就需要特别的处理。以下例子:
`select * from log a
left outer join users b
on a.user_id = b.user_id;
`
users 表有 600w+ 的记录,把 users 分发到所有的 map 上也是个不小的开销,而且 map join 不支持这么大的小表。如果用普通的 join,又会碰到数据倾斜的问题。
解决方法:
`select /+mapjoin(x)/* from log a
left outer join (
on a.user_id = b.user_id;
`
假如,log 里 user_id 有上百万个,这就又回到原来 map join 问题。所幸,每日的会员 uv 不会太多,有交易的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多等等。所以这个方法能解决很多场景下的数据倾斜问题。
6,数据倾斜总结
使 map 的输出数据更均匀的分布到 reduce 中去,是我们的最终目标。由于 Hash 算法的局限性,按 key Hash 会或多或少的造成数据倾斜。大量经验表明数据倾斜的原因是人为的建表疏忽或业务逻辑可以规避的。在此给出较为通用的步骤:
1、采样 log 表,哪些 user_id 比较倾斜,得到一个结果表 tmp1。由于对计算框架来说,所有的数据过来,他都是不知道数据分布情况的,所以采样是并不可少的。
2、数据的分布符合社会学统计规则,贫富不均。倾斜的 key 不会太多,就像一个社会的富人不多,奇特的人不多一样。所以 tmp1 记录数会很少。把 tmp1 和 users 做 map join 生成 tmp2,把 tmp2 读到 distribute file cache。这是一个 map 过程。
3、map 读入 users 和 log,假如记录来自 log,则检查 user_id 是否在 tmp2 里,如果是,输出到本地文件 a,否则生成的 key,value 对,假如记录来自 member,生成的 key,value 对,进入 reduce 阶段。
4、最终把 a 文件,把 Stage3 reduce 阶段输出的文件合并起写到 hdfs。
如果确认业务需要这样倾斜的逻辑,考虑以下的优化方案:
1、对于 join,在判断小表不大于 1G 的情况下,使用 map join
2、对于 group by 或 distinct,设定
hive.groupby.skewindata=true
版权声明: 本文为 InfoQ 作者【数据社】的原创文章。
原文链接:【http://xie.infoq.cn/article/0e1296242d04cf64fdc1c83a7】。文章转载请联系作者。
评论