Hive 中的 GroupBy, Distinct 和 Join
Hive 中的 GroupBy, Distinct 和 Join
GroupBy几种
几种
Mode
原理
相关参数
Distinct
Single Distinct
Multi Distinct
Join
Common Join
Map Join
Skew Join
本文将介绍 Hive GroupBy,Distinct 和 Join 的原理与相关调优参数,帮助大家深入理解Hive。
GroupBy
几种 Mode
根据调用UDAF的不同接口,Hive GroupBy 算子分为以下几类Mode,具体请看代码注释 :)
原理
Map端聚合: Map端进行预聚合,减少shuffle数据量,类似于MR中的Combiner。默认情况下,Hive 会尽可能地使用 Map 端Aggregation,但是如果 Hash Map不能有效地降低内存使用,那么会降级到普通的Aggregation,即 Map 端仅做Shuffle Write,Reducer执行真正的聚合运算。具体可参考:Hive执行过程中最后是否有map-side aggregation。
倾斜:生成的查询计划有两个 MapReduce 任务。在第一个 MapReduce 中,map 的输出结果集合会随机分布到 reduce 中, 每个 reduce 做部分聚合操作,并输出结果。这样处理的结果是,相同的 Group By Key 有可 能分发到不同的 reduce 中,从而达到负载均衡的目的;第二个 MapReduce 任务再根据预处 理的数据结果按照 Group By Key 分布到 reduce 中(这个过程可以保证相同的 Group By Key 分布到同一个 reduce 中),最后完成最终的聚合操作。(PS: 目前Hive实现中有数据质量问题,请慎用!)
相关参数
Distinct
Single Distinct
当 query 只有一个distinct expression时,那么将 distinct expression作为一个partition key做shuffle,然后利用 MapReduce / Tez 的排序,在 reducer 端取最后一个key的即可完成去重功能。
Multi Distinct
如果查询中有多个 distinct expression,同一条record,会生成多条记录进行Shuffle,增大Shuffle量。考虑以下query:
不考虑 Map Aggregation的情况下,上述 query 实际执行计划如下图所示。Reducer端接收到的 key 中元素分别是:dealid
, distinct expression的序号
, distinct expression
。
具体代码可参考ReduceSinkOperator.process
方法,代码片段如下图所示。
Join
Common Join
原理:
Map阶段:读取源表的数据,Map输出时候以Join on条件中的列为key,如果Join有多个关联键,则以这些关联键的组合作为key;Map输出的value为join之后所关心的(select或者where中需要用到的)列,同时在value中还会包含表的Tag信息,用于标明此value对应哪个表。
Shuffle阶段:根据key的值进行hash,并将key/value按照hash值推送至不同的reduce中,这样确保两个表中相同的key位于同一个reduce中。
Reduce阶段:根据key的值完成join操作,期间通过Tag来识别不同表中的数据。
适用场景:适用于所有类型的表关联与其他类型join不支持的join类型,比如:full outer join.
Map Join——Hive MapJoin 优化历程、FaceBook Join优化
原理:如果关联的表足够小,那么可以将小表加载到mapper的内存中,在map端完成join,减少shuffle和reduce阶段。MapReduce Local Task会在真正的MapReduce Join Task之前,从HDFS读取小表,然后将其转成一个tar文件,最后将文件上传至HDFS Cache.MapReduce Local Task运行过程中,可能由于内存不足而失败,可以通过设置
hive.mapjoin.localtask.max.memory.usage
来改变Local Task可使用的内存大小。
Conditional Task: 当如果N-1张小表大小和小于“hive.mapjoin.smalltable.filesize”这个值,则创建Conditional Task。Conditional Tasks把每张表都是小表的情况都考虑进去了,然后加上一个所有表都不是小表的Common Join Task。 如:
SELECT * FROM cities JOIN sales on cities.cityId=sales.cityId;
则Conditional Tasks结构如下。在作业执行过程中,Hive将获取到所有关联表的元数据,如:大小,位置等,然后选择一个从众多conditional tasks中选择一个task作为真正执行的task。BuckUp Task:在MapReduce Local Task执行过程中,如果由于内存不足,导致任务执行失败,此时会直接执行Common Join Task。(PS:生产环境上能正常走到BackUp Task的case比较少,一般会由于客户端OOM,任务直接退出了。)
相关参数:
适用场景:小表(维度表)join大表(事实表),不适用与Right/Full outer join.
可以优化的点:
根据作业中mapper的数量,来动态调整小表HashTable文件在HDFS上的副本数量。如果mapper有成千万个,可以适当提高cache的副本数量。
通过提前采样小表中的值,动态调整小表大小阈值。如果采样发现小表中重复row较多,可以适度提高小表的上限。
Skew Join——Skewed Join Optimization
原理:
假设A、B两张表相互Join,A表在key=1上倾斜。
若key为1,使用B的key=1的哈希表来计算结果,全部在mapper端完成;
A表其他key发送到reducer端来join,这个reduce也会从B的mapper中得到对应需要连接的数据;
整个过程中,需要读两次B表,并且需要将两类结果UNION起来。
相关参数:
适用场景:
其中一张表有大量数据集中在某几个Key上
非倾斜表B,关于A的倾斜key的所有数据,需要能全部加载至内存中
需要提前预知那些哪些Key是倾斜的
Bucket Join
原理:Join全部在Mapper端进行,只有相互匹配的bucket才会聚集在一起,mapper1只会拉取a,c表的bucket1;Mapper3只会拉取a,c表的bucket2。整个任务在mapper端完成,去除了Shuffle和reduce阶段。
相关参数:
适用场景分析:关联表的分桶数量成倍数表的分桶key,需要与join字段相同
附录
版权声明: 本文为 InfoQ 作者【tkanng】的原创文章。
原文链接:【http://xie.infoq.cn/article/bf6f757a91b75b99db2e37308】。文章转载请联系作者。
评论