写点什么

Hive 中的 GroupBy, Distinct 和 Join

用户头像
tkanng
关注
发布于: 2020 年 05 月 02 日
Hive 中的 GroupBy, Distinct 和 Join



Hive 中的 GroupBy, Distinct 和 Join

  • GroupBy几种

  • 几种Mode

  • 原理

  • 相关参数

  • Distinct

  • Single Distinct

  • Multi Distinct

  • Join

  • Common Join

  • Map Join

  • Skew Join



本文将介绍 Hive GroupBy,Distinct 和 Join 的原理与相关调优参数,帮助大家深入理解Hive。

GroupBy

几种 Mode

根据调用UDAF的不同接口,Hive GroupBy 算子分为以下几类Mode,具体请看代码注释 :)

/**
* Group-by Mode: COMPLETE: complete 1-phase aggregation: iterate, terminate
* PARTIAL1: partial aggregation - first phase: iterate, terminatePartial
* PARTIAL2: partial aggregation - second phase: merge, terminatePartial
* PARTIALS: For non-distinct the same as PARTIAL2, for distinct the same as
* PARTIAL1
* FINAL: partial aggregation - final phase: merge, terminate
* HASH: For non-distinct the same as PARTIAL1 but use hash-table-based aggregation
* MERGEPARTIAL: FINAL for non-distinct aggregations, COMPLETE for distinct
* aggregations.
*/

原理

  • Map端聚合: Map端进行预聚合,减少shuffle数据量,类似于MR中的Combiner。默认情况下,Hive 会尽可能地使用 Map 端Aggregation,但是如果 Hash Map不能有效地降低内存使用,那么会降级到普通的Aggregation,即 Map 端仅做Shuffle Write,Reducer执行真正的聚合运算。具体可参考:Hive执行过程中最后是否有map-side aggregation。 



  • 倾斜:生成的查询计划有两个 MapReduce 任务。在第一个 MapReduce 中,map 的输出结果集合会随机分布到 reduce 中, 每个 reduce 做部分聚合操作,并输出结果。这样处理的结果是,相同的 Group By Key 有可 能分发到不同的 reduce 中,从而达到负载均衡的目的;第二个 MapReduce 任务再根据预处 理的数据结果按照 Group By Key 分布到 reduce 中(这个过程可以保证相同的 Group By Key 分布到同一个 reduce 中),最后完成最终的聚合操作。(PS: 目前Hive实现中有数据质量问题,请慎用!) 



  • 相关参数

# 是否开启mapper端聚合
hive.map.aggr
# 是否开启,如果数据倾斜,是否优化group by为两个MR job
#该配置会触发hive增加额外的mr过程,随机化key后进行聚合操作得到中间结果,再对中间结果执行最终的聚合操作。
#count(distinct)操作比较特殊,无法进行中间的聚合操作,因此该参数对有count(distinct)操作的sql不适用。
hive.groupby.skewindata
# 用于map端聚合的hashtable最大可用内存,如果超过该内存比例,将flush到磁盘
hive.map.aggr.hash.force.flush.memory.threshold
# 可以用于mapper端hatable的内存比例
hive.map.aggr.hash.percentmemory (Default: 0.5) – Percent of total map task memory that can be used for hash table.
# 如果hashtable大小/输入行数 大于该阈值,那么停止hash聚合,转为sort-based aggregation
hive.map.aggr.hash.min.reduction (Default: 0.5)
# 每隔多少行,检测hashtable大小和input row比例是否超过阈值
hive.groupby.mapaggr.checkinterval
# 是否开启bucket group by
hive.optimize.groupby

Distinct

Single Distinct

当 query 只有一个distinct expression时,那么将 distinct expression作为一个partition key做shuffle,然后利用 MapReduce / Tez 的排序,在 reducer 端取最后一个key的即可完成去重功能。

Multi Distinct

如果查询中有多个 distinct expression,同一条record,会生成多条记录进行Shuffle,增大Shuffle量。考虑以下query:

select dealid, count(distinct uid), count(distinct date) from order group by dealid;


不考虑 Map Aggregation的情况下,上述 query 实际执行计划如下图所示。Reducer端接收到的 key 中元素分别是:dealiddistinct expression的序号distinct expression

https://tech.meituan.com/2014/02/12/hive-sql-to-mapreduce.html



具体代码可参考ReduceSinkOperator.process方法,代码片段如下图所示。 



Join

Common Join

  • 原理:

  • Map阶段:读取源表的数据,Map输出时候以Join on条件中的列为key,如果Join有多个关联键,则以这些关联键的组合作为key;Map输出的value为join之后所关心的(select或者where中需要用到的)列,同时在value中还会包含表的Tag信息,用于标明此value对应哪个表。

  • Shuffle阶段:根据key的值进行hash,并将key/value按照hash值推送至不同的reduce中,这样确保两个表中相同的key位于同一个reduce中。

  • Reduce阶段:根据key的值完成join操作,期间通过Tag来识别不同表中的数据。



  • 适用场景:适用于所有类型的表关联与其他类型join不支持的join类型,比如:full outer join.

Map Join——Hive MapJoin 优化历程FaceBook Join优化

  • 原理:如果关联的表足够小,那么可以将小表加载到mapper的内存中,在map端完成join,减少shuffle和reduce阶段。MapReduce Local Task会在真正的MapReduce Join Task之前,从HDFS读取小表,然后将其转成一个tar文件,最后将文件上传至HDFS Cache.MapReduce Local Task运行过程中,可能由于内存不足而失败,可以通过设置hive.mapjoin.localtask.max.memory.usage 来改变Local Task可使用的内存大小。 





  • Conditional Task: 当如果N-1张小表大小和小于“hive.mapjoin.smalltable.filesize”这个值,则创建Conditional Task。Conditional Tasks把每张表都是小表的情况都考虑进去了,然后加上一个所有表都不是小表的Common Join Task。 如:SELECT * FROM cities JOIN sales on cities.cityId=sales.cityId; 则Conditional Tasks结构如下。在作业执行过程中,Hive将获取到所有关联表的元数据,如:大小,位置等,然后选择一个从众多conditional tasks中选择一个task作为真正执行的task。 

  • BuckUp Task:在MapReduce Local Task执行过程中,如果由于内存不足,导致任务执行失败,此时会直接执行Common Join Task。(PS:生产环境上能正常走到BackUp Task的case比较少,一般会由于客户端OOM,任务直接退出了。) 



  • 相关参数:

# 是否自动转换common join为map join
set hive.auto.convert.join=true;
# 如果join的小表和小于该阈值,会尝试将Common join 转换成map join。通过explain命令,可以发现Operator树中有conditional Operator。 如果n-1张表大小和,小于该阈值,则生成conditional tasks。
hive.smalltable.filesize or hive.mapjoin.smalltable.filesize
# 如果join的小表小于该阈值,会直接将Common join转换成Map join。需要考虑到数据解压之后的实际大小,hive表在被解压后,文件大小可能会增大10倍。
hive.auto.convert.join.noconditionaltask.size
  • 适用场景:小表(维度表)join大表(事实表),不适用与Right/Full outer join.

  • 可以优化的点:

  • 根据作业中mapper的数量,来动态调整小表HashTable文件在HDFS上的副本数量。如果mapper有成千万个,可以适当提高cache的副本数量。

  • 通过提前采样小表中的值,动态调整小表大小阈值。如果采样发现小表中重复row较多,可以适度提高小表的上限。

Skew Join——Skewed Join Optimization

  • 原理:

  • 假设A、B两张表相互Join,A表在key=1上倾斜。

  • 若key为1,使用B的key=1的哈希表来计算结果,全部在mapper端完成;

  • A表其他key发送到reducer端来join,这个reduce也会从B的mapper中得到对应需要连接的数据;

  • 整个过程中,需要读两次B表,并且需要将两类结果UNION起来。 



  • 相关参数:

set hive.optimize.skewjoin=true;
set hive.skewjoin.key=100000;
  • 适用场景:

  • 其中一张表有大量数据集中在某几个Key上

  • 非倾斜表B,关于A的倾斜key的所有数据,需要能全部加载至内存中

  • 需要提前预知那些哪些Key是倾斜的

Bucket Join

  • 原理:Join全部在Mapper端进行,只有相互匹配的bucket才会聚集在一起,mapper1只会拉取a,c表的bucket1;Mapper3只会拉取a,c表的bucket2。整个任务在mapper端完成,去除了Shuffle和reduce阶段。 



  • 相关参数:

set hive.optimize.bucketmapjoin=true;
  • 适用场景分析:关联表的分桶数量成倍数表的分桶key,需要与join字段相同



附录



发布于: 2020 年 05 月 02 日阅读数: 115
用户头像

tkanng

关注

大数据研发工程师 2018.05.10 加入

欢迎私信交流。https://www.zhihu.com/people/tkanng

评论

发布
暂无评论
Hive 中的 GroupBy, Distinct 和 Join