「Hive 进阶篇」二、万字长文超详述 hive 企业级优化
大家好,我是 Akin,肝了几个晚上,梳理总结了一份万字长文超详述 hive 企业级优化文章,也整理了一份
hive优化总结思维导图
和hive优化详细PDF文档
,有需要可 评论区留言 or 文末原文链接 获取 PDF 文档保存本地吧,学习和复习都是绝佳,不断分享技术相关文章。话不多说,👇🏻下面就直接开讲吧!
一、问题背景
hive 离线数仓开发,一个良好的数据任务,它的运行时长一般是在合理范围内的,当发现报表应用层的指标数据总是产出延迟,排查定位发现是有些任务执行了超 10 小时这样肯定是不合理的,此时就该想想如何优化数据任务链路,主要从以下几个角度来考虑问题解决:
从数据任务本身 hive 逻辑代码出发,即 hive 逻辑优化,偏理解业务角度
从集群的资源设置出发,即 hive 参数调优,偏理解技术角度
从全局数据链路的任务设置出发,观测是否任务执行调度设置不合理
从数仓的数据易用性和模型复用性的角度出发,针对某些中间逻辑过程可以复用的就落地中间模型表
附上一份个人梳理总结的思维导图部分截图
下面就先分享下常见的 hive 优化策略吧~ 会附带案例实践帮助理解
hive 优化文章大纲
列裁剪和分区裁剪
提前数据收敛
谓词下推(PPD)
多路输出,减少表读取次数写多个结果表
合理选择排序
join 优化
合理选择文件存储格式和压缩方式
解决小文件过多问题
distinct 和 group by
参数调优
解决数据倾斜问题
二、hive 优化
1. 列裁剪和分区裁剪
裁剪 顾名思义就是不需要的数据不要多查。
列裁剪,尽量减少直接select * from table
这种操作,首先可读性不好,根本不知道具体用到哪几个列,其次列选择多了也会增大 IO 传输;分区裁剪就是针对分区表切记要加上分区过滤条件,比如表以时间作为分区字段,要加上分区筛选。
2. 提前数据收敛
在子查询中,有些条件能先过滤的尽量放在子查询里先过滤,减少子查询输出的数据量。
3. 谓词下推(Predicate Pushdown)
谓词下推 Predicate Pushdown 是什么?简称 PPD,指的是在不影响数据结果的情况下,将过滤表达式尽可能移动至靠近数据源的位置,以使真正执行时能直接跳过无关的数据,这样在 map 执行过滤条件,可以减少 map 端数据输出,起到了数据收敛的作用,降低了数据在集群上传输的量,节约了集群的资源,也提升了任务的性能。hive 默认是开启谓词下推该参数设置的,hive.optimize.ppd=true
所谓下推,即谓词过滤在 map 端执行;所谓不下推,即谓词过滤在 reduce 端执行。关于谓词下推的规则,主要分为 join 条件过滤下推和 where 条件过滤下推,我整理了一张图方便理解。
核心判断逻辑:on 条件过滤不能下推到保留行表中;where 条件过滤不能下推到 null 补充表中。
谓词下推注意事项:如果在表达式中含有不确定函数,整个表达式的谓词将不会被下推。例如下面脚本,则整个条件过滤都是在 reduce 端执行:
因为上面unix_timestamp()
是不确定函数,在编译的时候无法得知,所以,整个表达式不会被下推,即 ds='2022-07-04'也不会被提前过滤。类似的不确定函数还有rand()
函数等。
附上一篇关于谓词下推的案例分析讲解!上链接:https://cloud.tencent.com/developer/article/1616687
4. 多路输出
当我们有使用一次查询,多次插入
的场景时,则可以采用多路输出的写法,减少表的读取次数,起到性能优化的作用。
多路输出注意事项:
一般情况下,一个 sql 里面最多支持 128 路输出,超过了则会报错
在多插往同一张分区表的不同分区时,不允许在一个 sql 里面多路输出时既包含 insert overwrite 和 insert into,要统一操作
5. 合理选择排序
order by
全局排序,只走一个reducer
,当表数据量较大时容易计算不出来,性能不佳慎用,在严格模式下需要加 limitsort by
局部排序,即保证单个 reduce 内结果有序,但没有全局排序的能力。
distribute by
按照指定的字段把数据划分输出到不同的reducer中,是控制数据如何从map端输出到reduce端
,hive 会根据 distribute by 后面的字段和对应 reducer 的个数进行 hash 分发cluster by
拥有 distrubute by 的能力,同时也拥有 sort by 的能力,
所以可以理解cluster by是 distrubute by+sort by
以下举个排序方式优化案例,取用户信息表(10 亿数据量)中年龄排前 100 的用户信息:
以下案例实现也体现了一个大数据思想,分而治之,大job拆分小job。
排序选择的小结:
order by 全局排序,但只有一个 reducer 执行,数据量大的话容易计算不过来,慎用
sort by 局部排序,单个 reducer 内有序,把 map 端随机分发给 reduce 端执行,如果是要实现全局排序且走多个 reducer 的优化需求时,可以在外层嵌套一层,
例如:select * from (select * from 表名 sort by 字段名 limit N) order by 字段名 limit N
,这样就有 2 个 Job,一个是内层的局部排序,一个是外层的归并全局排序distribute by 可以按照指定字段将数据进行 hash 分发到对应的 reducer 去执行
当分区字段和排序字段相同时可以使用cluster by来简化distribute by+sort by的写法
,但是 cluster by 排序只能是升序排序,不能指定排序规则是 ASC 或者 DESC
6. join 优化
hive 在 redurce 阶段完成的 join 就是 common join,在 map 阶段完成的 join 就是 map join。
提前收敛数据量,保证在 join 关联前无用数据不参与关联
这块可以跟前面的数据收敛模块 &谓词下推模块 搭配起来看,主要就是提前收敛数据量,不止在 join 场景,在其他复杂计算前同样适用。
left semi join 左半关联
left semi join
一开始出现的使用场景其实是解决 hive 不支持 in/exists 子查询的高效实现,虽然 left semi join 含有 left,但其实不是保留左表全部数据,效果类似于 join 吧,只是最终结果只取左表中的列,还有最终结果某些场景下会跟 join 结果不同。
left semi join 注意事项:
右表的条件过滤只能写在 on 后面,不能写在 where 后面
最终结果只能展示左表的列,右表的列不能展示
left semi join 与 join 的差异:主要在于右表有重复数据时,left semi join 是遍历到右表一条数据后就跳过,只取一条,而 join 是一直遍历至右表最后一条数据,这也就是要注意实际数据场景是否有重复和是否要保留
大表 join 小表场景
大表 join 小表的话,
要把小表放在左边,大表放在右边
,这是因为 join 操作发生在 reduce 阶段,在 hive2.x 版本以前,位于左边的表会被加载进内存中,所以如果是大表放左边被加载进内存的话就会有内存溢出的风险,不过在 hive2.x 版本后就已经优化好这块了,无需关注,底层帮我们优化好这个问题了。启用 mapjoin
mapjoin就是把join的表直接分发到map端的内存中,即在map端来执行join操作
,就不用在 reduce 阶段进行 join 了,提高了执行效率。如果表比较小的话最好是启用 mapjoin,hive 默认是开启自动 mapjoin 的。
大表 join 大表场景
举例,假设 a 表是包括许多空值的数据,b 表是不包含空值的数据
1、空key过滤,过滤空key的数据
关联的过程是相同 key 对应的数据都会发送到相同的 reducer 上,如果某些空 key 过多是会导致内存不够的,从而引发 join 超时,所以如果不需要这类空 key 数据的时候,可以先过滤掉这些异常数据。
2、空key转换,转换key的数据进行关联时打散key
当然,有时候空值的数据又不一定是异常数据,还是需要保留的,但是空 key 过多都分配到一个 reducer 去了,这样执行起来就算不内存溢出也会发生数据倾斜情况,数据倾斜的话对集群资源的利用率来看的话是极其不利的,我们可以通过把空 key 虚拟成随机数,但要保证不是同一个空 key,从而降低数据倾斜概率,虽然这样在对关联键做处理反而会总体增长执行时间,但却减轻了 reducer 负担。
避免笛卡尔积
尽量避免笛卡尔积,即避免 join 的时候不加 on 条件,或者无效的 on 条件,因为 Hive 只能使用 1 个 reducer 来完成笛卡尔积,不过这点 hive 会通过严格模式下来提醒,在严格模式下出现笛卡尔积时报错。
7. 合理选择文件存储格式和压缩方式
关于这点,我专门写过一篇文章介绍 hive 常见的几种存储格式和压缩方式,具体可以去上次我写过的这篇文章看看!上链接:https://mp.weixin.qq.com/s/RndQKF5y9Mto7QfgiiAOvQ
8. 解决小文件过多问题
先来说一说什么是小文件,怎么发生的
顾名思义,小文件就是文件很小的文件,小文件的产生一定是发生在向hive表导入数据的时候
,比如:
MR 中 reduce 有多少个就输出多少个文件,文件数量 = reduce数量 * 分区数
,如果说某些简单 job 没有 reduce 阶段只有 map 阶段,那文件数量 = map数量 * 分区数
。从公式上看,reduce 的个数和分区数最终决定了输出的文件的个数,所以可以调整 reduce 的个数以及分区 达到控制 hive 表的文件数量。
小文件过多有什么影响
首先第一点从HDFS底层来看
,小文件过多会给集群 namenode 带来负担,即 namenode 元数据大占用内存,影响 HDFS 的性能第二点从hive来看
,在进行查询时,每个小文件都会当成一个块,启动一个 Map 任务来完成,而一个 Map 任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费如何解决小文件过多问题
1、使用hive自带的 concatenate 命令,来合并小文件
不过要注意的是 concatenate 命令只支持 hive 表存储格式是 orcfile 或者 rcfile,还有该方式不支持指定合并后的文件数量
2、调整参数减少Map数
设置 map 输入合并小文件
设置 map 输出和 reduce 输出合并小文件
3、调整参数减少Reduce数
9. count(distinct ) 和 group by
在计算去重指标的时候,比如不同年龄段的用户数这个指标,一般都是采用count(distinct user_id)
直接计算,当表数据量不大的话影响不大,但如果数据量大 count distinct 就很耗性能了,因为其只会用一个 reduce task 来执行,容易 reduce 端数据倾斜,通常优化就使用里层group by age然后再外层count(user_id)
来替代。
注意事项:关于使用
里层group by age然后再外层count(user_id)
来替代count(distinct user_id)
直接去重计算是否一定就起到优化效果这也是看情况的,假设表数据量不是特别大,有些情况下里层group by age然后再外层count(user_id)
未必就见得比count(distinct user_id)
好。所以还是具体业务场景具体分析为好,优化从来不是考虑局部就好,要全局考虑。
hive3.x 版本里已经新增了对
count(distinct )
的优化,通过set hive.optimize.countdistinct
配置,即使真的出现数据倾斜也可以自动优化,自动改变 SQL 执行的逻辑
里层group by age然后再外层count(user_id)
这种方式会生成 2 个 job 任务,会消耗更多的磁盘网络 I/O 资源
10. 参数调优
set hive.optimize.countdistinct=true
开启对 count(distinct )的自动优化set hive.auto.convert.join = true;
开启自动 mapjoinset hive.mapjoin.smalltable.filesize=26214400;
大表小表的阈值设置(默认 25M 一下认为是小表)set hive.exec.parallel=true;
打开任务并行执行set hive.exec.parallel.thread.number=16;
同一个 sql 允许最大并行度,默认值为 8。默认情况下,Hive 一次只会执行一个阶段。开启并行执行时会把一个 sql 语句中没有相互依赖的阶段并行去运行,这样可能使得整个 job 的执行时间缩短。提高集群资源利用率,不过这当然得是在系统资源比较空闲的时候才有优势,否则没资源,并行也起不来。set hive.map.aggr=true;
默认值是 true,当选项设定为 true 时,开启 map 端部分聚合set hive.groupby.skewindata = ture;
默认值是 false,当有数据倾斜的时候进行负载均衡,生成的查询计划有两个 MapReduce 任务,第一个 MR Job 中,Map 的输出结果会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作set hive.mapred.mode=strict;
设置严格模式,默认值是 nonstrict 非严格模式。严格模式下会禁止以下 3 种类型不合理查询,即以下 3 种情况会报错对于查询分区表,必须 where 加上分区限制条件使用 order by 全局排序时,必须加上 limit 限制数据查询条数限制了笛卡尔积查询set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
设置 map 端执行前合并小文件set hive.exec.compress.output=true;
设置 hive 的查询结果输出是否进行压缩set mapreduce.output.fileoutputformat.compress=true;
设置 MapReduce Job 的结果输出是否使用压缩set hive.cbo.enable=false;
关闭 CBO 优化,默认值 true 开启,可以自动优化 HQL 中多个 JOIN 的顺序,并选择合适的 JOIN 算法
11. 解决数据倾斜问题
什么是数据倾斜
数据倾斜是大量的相同key被partition分配到同一个reduce里,造成了'一个人累死,其他人闲死'的情况
,违背了并行计算的初衷,而且当其他节点计算好了还要等待这个忙碌节点的计算,效率就被拉低了数据倾斜的明显表现
任务进度长时间维持在99%
,查看任务监控页面,发现只有少量(1 个或几个)reduce 子任务未完成。因为其处理的数据量和其他 reduce 差异过大数据倾斜的根本原因是什么?
key 分布不均匀,redurce 数据处理不均匀
如何尽量避免数据倾斜
如何将数据均匀的分配到各个 reduce 中,就是避免数据倾斜的根本所在。举例下 2 个典型案例,关于 join 操作发生的数据倾斜和解决方案:
就在文章上面的第六点join优化【大表join大表场景】
,还有合理设置 map 数和 reduce 数的解决方案。合理设置 map 数和 reduce 数
1、Map端优化
通常情况下,Job 会通过 input 目录产生一个或多个 map 任务,map数主要取决与input的文件总个数,文件总大小,集群设置的文件块大小。
从 hadoop2.7.3 版本开始,HDFS 的默认块大小 block size 是 128M。每张 hive 表在 hdfs 上对应存储都是一个文件,关于执行 task 时,每一个 128M 的文件都是一个块 block,每个块就用一个 map 任务来完成,若文件超过 128M 就分块,若小于 128M 则独立成块。那么:①当小文件过多怎么办?
答案是 map 任务增多,map 任务的启动和初始化时间远大于执行逻辑处理时间,从而集群造成资源浪费。②是不是让每个文件都接近128M大小就毫无问题了呢?
答案是不可能,假设一个文件大小 127M,但表只有一两个字段,文件大小是由几千万条记录撑大的,如果数据处理逻辑复杂则用一个 map 任务去执行也是很耗时的。③是不是map数越多越好?
答案是这种说法是片面的,map 数增多有利于提升并行度,但一个 map 在启动和初始化时间是远大于执行逻辑处理时间,越多的 map 启动初始化就造成很大的集群资源浪费。
减少map数量,降低资源浪费,如何做?
以下相当于是把小文件合并成大文件处理 (多合一)
有时候对 hive 进行优化,在执行时间上可能没什么大的改观,但是在计算资源上就有很大改善。
增大map数量,分担每个map处理的数据量提升任务效率,如何做?
以下相当于是把小文件合并成大文件处理 (一拆多)
根据 mapreduce 切片的公式:computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))
,从公式可以看出调整 maxSize 最大值,让 maxSize 最大值低于 blocksize 就可以增加 map 的个数。
mapreduce.input.fileinputformat.split.minsize(切片最小值)
,默认值=1,参数调的比 blockSize 大,则可以让切片变得比 blocksize 还大,从而减少 map 数mapreduce.input.fileinputformat.split.maxsize(切片最大值)
,默认值=blocksize 块大小,参数如果调到比 blocksize 小,则会让切片变小,从而增大 map 数
2、Reduce端优化
reduce 个数设置过大也会产生很多小文件对 namenode 有影响,且输出的小文件偶尔也会作为下一个任务的输入导致出现小文件过多问题,设置过小又会导致单个 reduce 处理的数据量过大导致 OOM 异常。不指定时则 hive 会默认根据计算公式hive.exec.reducers.bytes.per.reducer
(每个 reduce 任务处理数据量,默认 1G)和hive.exec.reducers.max
(每个任务的最大 reduce 数,默认 1009 个),来做min(hive.exec.reducers.max值,总输入数据量/hive.exec.reducers.bytes.per.reducer值)
计算,得出结果确定 reduce 个数,所以可以通过调整参数 1 和参数 2 来调整 reduce 个数,不过最简便的还是通过下面的参数来直接控制 reduce 个数。
那么:①reduce数是不是越多越好?
答案是错误的,同 map 数一样,启动 reduce 和初始化同样耗时和占资源,而且过多的 reduce 会生成多个文件,同样会出现小文件问题。②什么情况下当设置了参数指定reduce个数后还是只有单个reduce在跑?
本身输入数据量就小于 1G
在做测数据量验证时没加 group by 分组汇总。比如
select count(1) from test_table where dt = 20201228;
用了 order by 排序
关联出现了笛卡尔积
合理设置 map 数和 reduce 数的小结:
set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
//系统默认格式,设置在 map 执行前合并小文件,减少 map 数
set mapreduce.input.fileinputformat.split.maxsize = 100;
//调整最大切片值,让 maxSize 值低于 blocksize 就可以增加 map 数根据 mapreduce 切片的公式:
computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))
,从公式可以看出调整 maxSize 最大值,让 maxSize 最大值低于 blocksize,从而使切片变小,就可以增加 map 的个数
三、总结
日常 hive 开发中时刻养成提前数据收敛的习惯,避免无用数据参与到计算中
不要过度进行优化,有可能做的是无用功甚至产生负效应,在调优上投入的工作成本和回报不成正比
对于公共可复用的逻辑代码,可以抽取出来落地临时表或者中间表,提升复用性,
强调复用!
理解 hiveQL 底层执行的原理,优化起来才有章可循
理透需求是代码优化的前提,关注全局数据链路,一些常见的 hive 优化策略要懂
做 hive 优化的时候,涉及到参数调优时要慎重,比如把内存都申请抢占满了,避免因为你自己的任务调优了但影响到整个集群其他任务的资源分配,
全局优才是优!
整理了一份
hive优化总结思维导图
和hive优化详细PDF文档
,有需要可 评论区留言 or 文末原文链接 获取 PDF 文档保存本地吧,学习和复习都是绝佳。
分享就到此结束了,建议收藏吸纳消化,博文不易,欢迎👏🏻点赞+转发
微信公众号推文链接:https://mp.weixin.qq.com/s/GHwYVEwAS8WgNBLb14NC4A
版权声明: 本文为 InfoQ 作者【大数据阶梯之路】的原创文章。
原文链接:【http://xie.infoq.cn/article/c4bb62622c0b6cdcd566f72f0】。文章转载请联系作者。
评论