大数据培训:HiveSQL 技术优化与面试
Hive SQL 编译成 MapReduce 过程
编译 SQL 的任务是在上节中介绍的 COMPILER(编译器组件)中完成的。Hive 将 SQL 转化为 MapReduce 任务,整个编译过程分为六个阶段:
词法、语法解析: Antlr 定义 SQL 的语法规则,完成 SQL 词法,语法解析,将 SQL 转化为抽象语法树 AST Tree;
Antlr 是一种语言识别的工具,可以用来构造领域语言。使用 Antlr 构造特定的语言只需要编写一个语法文件,定义词法和语法替换规则即可,Antlr 完成了词法分析、语法分析、语义分析、中间代码生成的过程。
语义解析: 遍历 AST Tree,抽象出查询的基本组成单元 QueryBlock;
生成逻辑执行计划: 遍历 QueryBlock,翻译为执行操作树 OperatorTree;
优化逻辑执行计划: 逻辑层优化器进行 OperatorTree 变换,合并 Operator,达到减少 MapReduce Job,减少数据传输及 shuffle 数据量;
生成物理执行计划: 遍历 OperatorTree,翻译为 MapReduce 任务;
优化物理执行计划: 物理层优化器进行 MapReduce 任务的变换,生成最终的执行计划。
下面对这六个阶段详细解析:
为便于理解,我们拿一个简单的查询语句进行展示,对 5 月 23 号的地区维表进行查询:
select * from dim.dim_region where dt = '2021-05-23';
阶段一:词法、语法解析
根据 Antlr 定义的 sql 语法规则,将相关 sql 进行词法、语法解析,转化为抽象语法树 AST Tree:
ABSTRACT SYNTAX TREE:
TOK_QUERY
TOK_FROM
TOK_TABREF
TOK_TABNAME
dim
dim_region
TOK_INSERT
TOK_DESTINATION
TOK_DIR
TOK_TMP_FILE
TOK_SELECT
TOK_SELEXPR
TOK_ALLCOLREF
TOK_WHERE
=
TOK_TABLE_OR_COL
dt
'2021-05-23'
阶段二:语义解析
遍历 AST Tree,抽象出查询的基本组成单元 QueryBlock:
AST Tree 生成后由于其复杂度依旧较高,不便于翻译为 mapreduce 程序,需要进行进一步抽象和结构化,形成 QueryBlock。
QueryBlock 是一条 SQL 最基本的组成单元,包括三个部分:输入源,计算过程,输出。简单来讲一个 QueryBlock 就是一个子查询。
QueryBlock 的生成过程为一个递归过程,先序遍历 AST Tree ,遇到不同的 Token 节点(理解为特殊标记),保存到相应的属性中。
阶段三:生成逻辑执行计划
遍历 QueryBlock,翻译为执行操作树 OperatorTree:
Hive 最终生成的 MapReduce 任务,Map 阶段和 Reduce 阶段均由 OperatorTree 组成。
基本的操作符包括:
TableScanOperator
SelectOperator
FilterOperator
JoinOperator
GroupByOperator
ReduceSinkOperator
Operator 在 Map Reduce 阶段之间的数据传递都是一个流式的过程。每一个 Operator 对一行数据完成操作后之后将数据传递给 childOperator 计算。
由于 Join/GroupBy/OrderBy 均需要在 Reduce 阶段完成,所以在生成相应操作的 Operator 之前都会先生成一个 ReduceSinkOperator,将字段组合并序列化为 Reduce Key/value, Partition Key。
阶段四:优化逻辑执行计划
Hive 中的逻辑查询优化可以大致分为以下几类:
投影修剪
推导传递谓词
谓词下推
将 Select-Select,Filter-Filter 合并为单个操作
多路 Join
查询重写以适应某些列值的 Join 倾斜
阶段五:生成物理执行计划
生成物理执行计划即是将逻辑执行计划生成的 OperatorTree 转化为 MapReduce Job 的过程,主要分为下面几个阶段:
对输出表生成 MoveTask
从 OperatorTree 的其中一个根节点向下深度优先遍历
ReduceSinkOperator 标示 Map/Reduce 的界限,多个 Job 间的界限
遍历其他根节点,遇过碰到 JoinOperator 合并 MapReduceTask
生成 StatTask 更新元数据
剪断 Map 与 Reduce 间的 Operator 的关系
阶段六:优化物理执行计划
Hive 中的物理优化可以大致分为以下几类:
分区修剪(Partition Pruning)
基于分区和桶的扫描修剪(Scan pruning)
如果查询基于抽样,则扫描修剪
在某些情况下,在 map 端应用 Group By
在 mapper 上执行 Join
优化 Union,使 Union 只在 map 端执行
在多路 Join 中,根据用户提示决定最后流哪个表
删除不必要的 ReduceSinkOperators
对于带有 Limit 子句的查询,减少需要为该表扫描的文件数
对于带有 Limit 子句的查询,通过限制 ReduceSinkOperator 生成的内容来限制来自 mapper 的输出
减少用户提交的 SQL 查询所需的 Tez 作业数量
如果是简单的提取查询,避免使用 MapReduce 作业
对于带有聚合的简单获取查询,执行不带 MapReduce 任务的聚合
重写 Group By 查询使用索引表代替原来的表
当表扫描之上的谓词是相等谓词且谓词中的列具有索引时,使用索引扫描
经过以上六个阶段,SQL 就被解析映射成了集群上的 MapReduce 任务。
SQL 编译成 MapReduce 具体原理
在阶段五-生成物理执行计划,即遍历 OperatorTree,翻译为 MapReduce 任务,这个过程具体是怎么转化的呢
我们接下来举几个常用 SQL 语句转化为 MapReduce 的具体步骤:
Join 的实现原理
以下面这个 SQL 为例,讲解 join 的实现:
select u.name, o.orderid from order o join user u on o.uid = u.uid;
在 map 的输出 value 中为不同表的数据打上 tag 标记,在 reduce 阶段根据 tag 判断数据来源。MapReduce 的过程如下:
MapReduce CommonJoin 的实现
Group By 的实现原理
以下面这个 SQL 为例,讲解 group by 的实现:
select rank, isonline, count(*) from city group by rank, isonline;
将 GroupBy 的字段组合为 map 的输出 key 值,利用 MapReduce 的排序,在 reduce 阶段保存 LastKey 区分不同的 key。MapReduce 的过程如下:
Distinct 的实现原理
以下面这个 SQL 为例,讲解 distinct 的实现:
select dealid, count(distinct uid) num from order group by dealid;
当只有一个 distinct 字段时,如果不考虑 Map 阶段的 Hash GroupBy,只需要将 GroupBy 字段和 Distinct 字段组合为 map 输出 key,利用 mapreduce 的排序,大数据培训同时将 GroupBy 字段作为 reduce 的 key,在 reduce 阶段保存 LastKey 即可完成去重:
Hive 千亿级数据倾斜
数据倾斜问题剖析
数据倾斜是分布式系统不可避免的问题,任何分布式系统都有几率发生数据倾斜,但有些小伙伴在平时工作中感知不是很明显,这里要注意本篇文章的标题—“千亿级数据”,为什么说千亿级,因为如果一个任务的数据量只有几百万,它即使发生了数据倾斜,所有数据都跑到一台机器去执行,对于几百万的数据量,一台机器执行起来还是毫无压力的,这时数据倾斜对我们感知不大,只有数据达到一个量级时,一台机器应付不了这么多的数据,这时如果发生数据倾斜,那么最后就很难算出结果。
所以就需要我们对数据倾斜的问题进行优化,尽量避免或减轻数据倾斜带来的影响。
在解决数据倾斜问题之前,还要再提一句:没有瓶颈时谈论优化,都是自寻烦恼。
大家想想,在 map 和 reduce 两个阶段中,最容易出现数据倾斜的就是 reduce 阶段,因为 map 到 reduce 会经过 shuffle 阶段,在 shuffle 中默认会按照 key 进行 hash,如果相同的 key 过多,那么 hash 的结果就是大量相同的 key 进入到同一个 reduce 中,导致数据倾斜。
那么有没有可能在 map 阶段就发生数据倾斜呢,是有这种可能的。
一个任务中,数据文件在进入 map 阶段之前会进行切分,默认是 128M 一个数据块,但是如果当对文件使用 GZIP 压缩等不支持文件分割操作的压缩方式时,MR 任务读取压缩后的文件时,是对它切分不了的,该压缩文件只会被一个任务所读取,如果有一个超大的不可切分的压缩文件被一个 map 读取时,就会发生 map 阶段的数据倾斜。
所以,从本质上来说,发生数据倾斜的原因有两种:一是任务中需要处理大量相同的 key 的数据。二是任务读取不可分割的大文件。
数据倾斜解决方案
MapReduce 和 Spark 中的数据倾斜解决方案原理都是类似的,以下讨论 Hive 使用 MapReduce 引擎引发的数据倾斜,Spark 数据倾斜也可以此为参照。
空值引发的数据倾斜
实际业务中有些大量的 null 值或者一些无意义的数据参与到计算作业中,表中有大量的 null 值,如果表之间进行 join 操作,就会有 shuffle 产生,这样所有的 null 值都会被分配到一个 reduce 中,必然产生数据倾斜。
之前有小伙伴问,如果 A、B 两表 join 操作,假如 A 表中需要 join 的字段为 null,但是 B 表中需要 join 的字段不为 null,这两个字段根本就 join 不上啊,为什么还会放到一个 reduce 中呢?
这里我们需要明确一个概念,数据放到同一个 reduce 中的原因不是因为字段能不能 join 上,而是因为 shuffle 阶段的 hash 操作,只要 key 的 hash 结果是一样的,它们就会被拉到同一个 reduce 中。
解决方案:
第一种:可以直接不让 null 值参与 join 操作,即不让 null 值有 shuffle 阶段
SELECT *
FROM log a
JOIN users b
ON a.user_id IS NOT NULL
AND a.user_id = b.user_id
UNION ALL
SELECT *
FROM log a
WHERE a.user_id IS NULL;
第二种:因为 null 值参与 shuffle 时的 hash 结果是一样的,那么我们可以给 null 值随机赋值,这样它们的 hash 结果就不一样,就会进到不同的 reduce 中:
SELECT *
FROM log a
LEFT JOIN users b ON CASE
WHEN a.user_id IS NULL THEN concat('hive_', rand())
ELSE a.user_id
END = b.user_id;
不同数据类型引发的数据倾斜
对于两个表 join,表 a 中需要 join 的字段 key 为 int,表 b 中 key 字段既有 string 类型也有 int 类型。当按照 key 进行两个表的 join 操作时,默认的 Hash 操作会按 int 型的 id 来进行分配,这样所有的 string 类型都被分配成同一个 id,结果就是所有的 string 类型的字段进入到一个 reduce 中,引发数据倾斜。
解决方案:
如果 key 字段既有 string 类型也有 int 类型,默认的 hash 就都会按 int 类型来分配,那我们直接把 int 类型都转为 string 就好了,这样 key 字段都为 string,hash 时就按照 string 类型分配了:
SELECT *
FROM users a
LEFT JOIN logs b ON a.usr_id = CAST(b.user_id AS string);
不可拆分大文件引发的数据倾斜
当集群的数据量增长到一定规模,有些数据需要归档或者转储,这时候往往会对数据进行压缩;当对文件使用 GZIP 压缩等不支持文件分割操作的压缩方式,在日后有作业涉及读取压缩后的文件时,该压缩文件只会被一个任务所读取。如果该压缩文件很大,则处理该文件的 Map 需要花费的时间会远多于读取普通文件的 Map 时间,该 Map 任务会成为作业运行的瓶颈。这种情况也就是 Map 读取文件的数据倾斜。
解决方案:
这种数据倾斜问题没有什么好的解决方案,只能将使用 GZIP 压缩等不支持文件分割的文件转为 bzip 和 zip 等支持文件分割的压缩方式。
所以,我们在对文件进行压缩时,为避免因不可拆分大文件而引发数据读取的倾斜,在数据压缩的时候可以采用 bzip2 和 Zip 等支持文件分割的压缩算法。
数据膨胀引发的数据倾斜
在多维聚合计算时,如果进行分组聚合的字段过多,如下:
select a,b,c,count(1)from log group by a,b,c with rollup;
注:对于最后的 with rollup 关键字不知道大家用过没,with rollup 是用来在分组统计数据的基础上再进行统计汇总,即用来得到 group by 的汇总信息。
如果上面的 log 表的数据量很大,并且 Map 端的聚合不能很好地起到数据压缩的情况下,会导致 Map 端产出的数据急速膨胀,这种情况容易导致作业内存溢出的异常。如果 log 表含有数据倾斜 key,会加剧 Shuffle 过程的数据倾斜。
解决方案:
可以拆分上面的 sql,将 with rollup 拆分成如下几个 sql:
SELECT a, b, c, COUNT(1)
FROM log
GROUP BY a, b, c;
SELECT a, b, NULL, COUNT(1)
FROM log
GROUP BY a, b;
SELECT a, NULL, NULL, COUNT(1)
FROM log
GROUP BY a;
SELECT NULL, NULL, NULL, COUNT(1)
FROM log;
但是,上面这种方式不太好,因为现在是对 3 个字段进行分组聚合,那如果是 5 个或者 10 个字段呢,那么需要拆解的 SQL 语句会更多。
在 Hive 中可以通过参数 hive.new.job.grouping.set.cardinality 配置的方式自动控制作业的拆解,该参数默认值是 30。表示针对 grouping sets/rollups/cubes 这类多维聚合的操作,如果最后拆解的键组合大于该值,会启用新的任务去处理大于该值之外的组合。如果在处理数据时,某个分组聚合的列有较大的倾斜,可以适当调小该值。
表连接时引发的数据倾斜
两表进行普通的 repartition join 时,如果表连接的键存在倾斜,那么在 Shuffle 阶段必然会引起数据倾斜。
解决方案:
通常做法是将倾斜的数据存到分布式缓存中,分发到各个 Map 任务所在节点。在 Map 阶段完成 join 操作,即 MapJoin,这避免了 Shuffle,从而避免了数据倾斜。
MapJoin 是 Hive 的一种优化操作,其适用于小表 JOIN 大表的场景,由于表的 JOIN 操作是在 Map 端且在内存进行的,所以其并不需要启动 Reduce 任务也就不需要经过 shuffle 阶段,从而能在一定程度上节省资源提高 JOIN 效率。
在 Hive 0.11 版本之前,如果想在 Map 阶段完成 join 操作,必须使用 MAPJOIN 来标记显示地启动该优化操作,由于其需要将小表加载进内存所以要注意小表的大小。
如将 a 表放到 Map 端内存中执行,在 Hive 0.11 版本之前需要这样写:
select /* +mapjoin(a) */ a.id , a.name, b.age
from a join b
on a.id = b.id;
如果想将多个表放到 Map 端内存中,只需在 mapjoin()中写多个表名称即可,用逗号分隔,如将 a 表和 c 表放到 Map 端内存中,则 / +mapjoin(a,c) / 。
在 Hive 0.11 版本及之后,Hive 默认启动该优化,也就是不在需要显示的使用 MAPJOIN 标记,其会在必要的时候触发该优化操作将普通 JOIN 转换成 MapJoin,可以通过以下两个属性来设置该优化的触发时机:
hive.auto.convert.join=true 默认值为 true,自动开启 MAPJOIN 优化。
hive.mapjoin.smalltable.filesize=2500000 默认值为 2500000(25M),通过配置该属性来确定使用该优化的表的大小,如果表的大小小于此值就会被加载进内存中。
注意:使用默认启动该优化的方式如果出现莫名其妙的 BUG(比如 MAPJOIN 并不起作用),就将以下两个属性置为 fase 手动使用 MAPJOIN 标记来启动该优化:
hive.auto.convert.join=false (关闭自动 MAPJOIN 转换操作)
hive.ignore.mapjoin.hint=false (不忽略 MAPJOIN 标记)
再提一句:将表放到 Map 端内存时,如果节点的内存很大,但还是出现内存溢出的情况,我们可以通过这个参数 mapreduce.map.memory.mb 调节 Map 端内存的大小。
确实无法减少数据量引发的数据倾斜
在一些操作中,我们没有办法减少数据量,如在使用 collect_list 函数时:
select s_age,collect_list(s_score) list_score
from student
group by s_age
collect_list:将分组中的某列转为一个数组返回。
在上述 sql 中,s_age 有数据倾斜,但如果数据量大到一定的数量,会导致处理倾斜的 Reduce 任务产生内存溢出的异常。
collect_list 输出一个数组,中间结果会放到内存中,所以如果 collect_list 聚合太多数据,会导致内存溢出。
有小伙伴说这是 group by 分组引起的数据倾斜,可以开启 hive.groupby.skewindata 参数来优化。我们接下来分析下:
开启该配置会将作业拆解成两个作业,第一个作业会尽可能将 Map 的数据平均分配到 Reduce 阶段,并在这个阶段实现数据的预聚合,以减少第二个作业处理的数据量;第二个作业在第一个作业处理的数据基础上进行结果的聚合。
hive.groupby.skewindata 的核心作用在于生成的第一个作业能够有效减少数量。但是对于 collect_list 这类要求全量操作所有数据的中间结果的函数来说,明显起不到作用,反而因为引入新的作业增加了磁盘和网络 I/O 的负担,而导致性能变得更为低下。
解决方案:
这类问题最直接的方式就是调整 reduce 所执行的内存大小。
调整 reduce 的内存大小使用 mapreduce.reduce.memory.mb 这个配置。
Hive 执行计划
Hive SQL 的执行计划描述 SQL 实际执行的整体轮廓,通过执行计划能了解 SQL 程序在转换成相应计算引擎的执行逻辑,掌握了执行逻辑也就能更好地把握程序出现的瓶颈点,从而能够实现更有针对性的优化。此外还能帮助开发者识别看似等价的 SQL 其实是不等价的,看似不等价的 SQL 其实是等价的 SQL。可以说执行计划是打开 SQL 优化大门的一把钥匙。
要想学 SQL 执行计划,就需要学习查看执行计划的命令:explain,在查询语句的 SQL 前面加上关键字 explain 是查看执行计划的基本方法。
学会 explain,能够给我们工作中使用 hive 带来极大的便利!
查看 SQL 的执行计划
Hive 提供的执行计划目前可以查看的信息有以下几种:
explain:查看执行计划的基本信息;
explain dependency:dependency 在 explain 语句中使用会产生有关计划中输入的额外信息。它显示了输入的各种属性;
explain authorization:查看 SQL 操作相关权限的信息;
explain vectorization:查看 SQL 的向量化描述信息,显示为什么未对 Map 和 Reduce 进行矢量化。从 Hive 2.3.0 开始支持;
explain analyze:用实际的行数注释计划。从 Hive 2.2.0 开始支持;
explain cbo:输出由 Calcite 优化器生成的计划。CBO 从 Hive 4.0.0 版本开始支持;
explain locks:这对于了解系统将获得哪些锁以运行指定的查询很有用。LOCKS 从 Hive 3.2.0 开始支持;
explain ast:输出查询的抽象语法树。AST 在 Hive 2.1.0 版本删除了,存在 bug,转储 AST 可能会导致 OOM 错误,将在 4.0.0 版本修复;
explain extended:加上 extended 可以输出有关计划的额外信息。这通常是物理信息,例如文件名,这些额外信息对我们用处不大;
1. explain 的用法
Hive 提供了 explain 命令来展示一个查询的执行计划,这个执行计划对于我们了解底层原理,Hive 调优,排查数据倾斜等很有帮助。
使用语法如下:
explain query;
在 hive cli 中输入以下命令(hive 2.3.7):
explain select sum(id) from test1;
得到结果:
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: test1
Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: id (type: int)
outputColumnNames: id
Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: sum(id)
mode: hash
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
sort order:
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: bigint)
Reduce Operator Tree:
Group By Operator
aggregations: sum(VALUE._col0)
mode: mergepartial
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
看完以上内容有什么感受,是不是感觉都看不懂,不要着急,下面将会详细讲解每个参数,相信你学完下面的内容之后再看 explain 的查询结果将游刃有余。
一个 HIVE 查询被转换为一个由一个或多个 stage 组成的序列(有向无环图 DAG)。这些 stage 可以是 MapReduce stage,也可以是负责元数据存储的 stage,也可以是负责文件系统的操作(比如移动和重命名)的 stage。
我们将上述结果拆分看,先从最外层开始,包含两个大的部分:
先看第一部分 stage dependencies ,包含两个 stage,Stage-1 是根 stage,说明这是开始的 stage,Stage-0 依赖 Stage-1,Stage-1 执行完成后执行 Stage-0。
stage dependencies:各个 stage 之间的依赖性
stage plan:各个 stage 的执行计划
再看第二部分 stage plan,里面有一个 Map Reduce,一个 MR 的执行计划分为两个部分:
Map Operator Tree:MAP 端的执行计划树
Reduce Operator Tree:Reduce 端的执行计划树
这两个执行计划树里面包含这条 sql 语句的 operator:
TableScan:表扫描操作,map 端第一个操作肯定是加载表,所以就是表扫描操作,常见的属性:
alias:表名称
Statistics:表统计信息,包含表中数据条数,数据大小等
Select Operator:选取操作,常见的属性 :
expressions:需要的字段名称及字段类型
outputColumnNames:输出的列名称
Statistics:表统计信息,包含表中数据条数,数据大小等
Group By Operator:分组聚合操作,常见的属性:
aggregations:显示聚合函数信息
mode:聚合模式,值有 hash:随机聚合,就是 hash partition;partial:局部聚合;final:最终聚合
keys:分组的字段,如果没有分组,则没有此字段
outputColumnNames:聚合之后输出列名
Statistics:表统计信息,包含分组聚合之后的数据条数,数据大小等
Reduce Output Operator:输出到 reduce 操作,常见属性:
sort order:值为空 不排序;值为 + 正序排序,值为 - 倒序排序;值为 +- 排序的列为两列,第一列为正序,第二列为倒序
Filter Operator:过滤操作,常见的属性:
predicate:过滤条件,如 sql 语句中的 where id>=1,则此处显示(id >= 1)
Map Join Operator:join 操作,常见的属性:
condition map:join 方式 ,如 Inner Join 0 to 1 Left Outer Join0 to 2
keys: join 的条件字段
outputColumnNames:join 完成之后输出的字段
Statistics:join 完成之后生成的数据条数,大小等
File Output Operator:文件输出操作,常见的属性
compressed:是否压缩
table:表的信息,包含输入输出文件格式化方式,序列化方式等
Fetch Operator 客户端获取数据操作,常见的属性:
limit,值为 -1 表示不限制条数,其他值为限制的条数
2. explain 的使用场景
本节介绍 explain 能够为我们在生产实践中带来哪些便利及解决我们哪些迷惑
案例一:join 语句会过滤 null 的值吗?
现在,我们在 hive cli 输入以下查询计划语句
select a.id,b.user_name from test1 a join test2 b on a.id=b.id;
问:上面这条 join 语句会过滤 id 为 null 的值吗
执行下面语句:
explain select a.id,b.user_name from test1 a join test2 b on a.id=b.id;
我们来看结果 (为了适应页面展示,仅截取了部分输出信息):
TableScan
alias: a
Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: id is not null (type: boolean)
Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: id (type: int)
outputColumnNames: _col0
Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
HashTable Sink Operator
keys:
0 _col0 (type: int)
1 _col0 (type: int)
...
从上述结果可以看到 predicate: id is not null 这样一行,说明 join 时会自动过滤掉关联字段为 null 值的情况,但 left join 或 full join 是不会自动过滤 null 值的,大家可以自行尝试下。
案例二:group by 分组语句会进行排序吗?
看下面这条 sql
select id,max(user_name) from test1 group by id;
问:group by 分组语句会进行排序吗
直接来看 explain 之后结果 (为了适应页面展示,仅截取了部分输出信息)
TableScan
alias: test1
Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: id (type: int), user_name (type: string)
outputColumnNames: id, user_name
Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: max(user_name)
keys: id (type: int)
mode: hash
outputColumnNames: _col0, _col1
Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: _col0 (type: int)
sort order: +
Map-reduce partition columns: _col0 (type: int)
Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
value expressions: _col1 (type: string)
...
我们看 Group By Operator,里面有 keys: id (type: int) 说明按照 id 进行分组的,再往下看还有 sort order: + ,说明是按照 id 字段进行正序排序的。
案例三:哪条 sql 执行效率高呢?
观察两条 sql 语句
SELECT
a.id,
b.user_name
FROM
test1 a
JOIN test2 b ON a.id = b.id
WHERE
a.id > 2;
SELECT
a.id,
b.user_name
FROM
(SELECT * FROM test1 WHERE id > 2) a
JOIN test2 b ON a.id = b.id;
这两条 sql 语句输出的结果是一样的,但是哪条 sql 执行效率高呢?
有人说第一条 sql 执行效率高,因为第二条 sql 有子查询,子查询会影响性能;
有人说第二条 sql 执行效率高,因为先过滤之后,在进行 join 时的条数减少了,所以执行效率就高了。
到底哪条 sql 效率高呢,我们直接在 sql 语句前面加上 explain,看下执行计划不就知道了嘛!
在第一条 sql 语句前加上 explain,得到如下结果
hive (default)> explain select a.id,b.user_name from test1 a join test2 b on a.id=b.id where a.id >2;
OK
Explain
STAGE DEPENDENCIES:
Stage-4 is a root stage
Stage-3 depends on stages: Stage-4
Stage-0 depends on stages: Stage-3
STAGE PLANS:
Stage: Stage-4
Map Reduce Local Work
Alias -> Map Local Tables:
$hdt$_0:a
Fetch Operator
limit: -1
Alias -> Map Local Operator Tree:
$hdt$_0:a
TableScan
alias: a
Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: (id > 2) (type: boolean)
Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: id (type: int)
outputColumnNames: _col0
Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
HashTable Sink Operator
keys:
0 _col0 (type: int)
1 _col0 (type: int)
Stage: Stage-3
Map Reduce
Map Operator Tree:
TableScan
alias: b
Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: (id > 2) (type: boolean)
Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: id (type: int), user_name (type: string)
outputColumnNames: _col0, _col1
Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
Map Join Operator
condition map:
Inner Join 0 to 1
keys:
0 _col0 (type: int)
1 _col0 (type: int)
outputColumnNames: _col0, _col2
Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: _col0 (type: int), _col2 (type: string)
outputColumnNames: _col0, _col1
Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Local Work:
Map Reduce Local Work
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
在第二条 sql 语句前加上 explain,得到如下结果
hive (default)> explain select a.id,b.user_name from(select * from test1 where id>2 ) a join test2 b on a.id=b.id;
OK
Explain
STAGE DEPENDENCIES:
Stage-4 is a root stage
Stage-3 depends on stages: Stage-4
Stage-0 depends on stages: Stage-3
STAGE PLANS:
Stage: Stage-4
Map Reduce Local Work
Alias -> Map Local Tables:
$hdt$_0:test1
Fetch Operator
limit: -1
Alias -> Map Local Operator Tree:
$hdt$_0:test1
TableScan
alias: test1
Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: (id > 2) (type: boolean)
Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: id (type: int)
outputColumnNames: _col0
Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
HashTable Sink Operator
keys:
0 _col0 (type: int)
1 _col0 (type: int)
Stage: Stage-3
Map Reduce
Map Operator Tree:
TableScan
alias: b
Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: (id > 2) (type: boolean)
Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: id (type: int), user_name (type: string)
outputColumnNames: _col0, _col1
Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
Map Join Operator
condition map:
Inner Join 0 to 1
keys:
0 _col0 (type: int)
1 _col0 (type: int)
outputColumnNames: _col0, _col2
Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: _col0 (type: int), _col2 (type: string)
outputColumnNames: _col0, _col1
Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Local Work:
Map Reduce Local Work
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
大家有什么发现,除了表别名不一样,其他的执行计划完全一样,都是先进行 where 条件过滤,在进行 join 条件关联。说明 hive 底层会自动帮我们进行优化,所以这两条 sql 语句执行效率是一样的。
以上仅列举了 3 个我们生产中既熟悉又有点迷糊的例子,explain 还有很多其他的用途,如查看 stage 的依赖情况、排查数据倾斜、hive 调优等,小伙伴们可以自行尝试。
2. explain dependency 的用法
explain dependency 用于描述一段 SQL 需要的数据来源,输出是一个 json 格式的数据,里面包含以下两个部分的内容:
input_partitions:描述一段 SQL 依赖的数据来源表分区,里面存储的是分区名的列表,如果整段 SQL 包含的所有表都是非分区表,则显示为空。
input_tables:描述一段 SQL 依赖的数据来源表,里面存储的是 Hive 表名的列表。
使用 explain dependency 查看 SQL 查询非分区普通表,在 hive cli 中输入以下命令:
explain dependency select s_age,count(1) num from student_orc;
得到结果:
{"input_partitions":[],"input_tables":[{"tablename":"default@student_tb _orc","tabletype":"MANAGED_TABLE"}]}
使用 explain dependency 查看 SQL 查询分区表,在 hive cli 中输入以下命令:
explain dependency select s_age,count(1) num from student_orc_partition;
得到结果:
{"input_partitions":[{"partitionName":"default@student_orc_partition@ part=0"},
{"partitionName":"default@student_orc_partition@part=1"},
{"partitionName":"default@student_orc_partition@part=2"},
{"partitionName":"default@student_orc_partition@part=3"},
{"partitionName":"default@student_orc_partition@part=4"},
{"partitionName":"default@student_orc_partition@part=5"},
{"partitionName":"default@student_orc_partition@part=6"},
{"partitionName":"default@student_orc_partition@part=7"},
{"partitionName":"default@student_orc_partition@part=8"},
{"partitionName":"default@student_orc_partition@part=9"}],
"input_tables":[{"tablename":"default@student_orc_partition", "tabletype":"MANAGED_TABLE"}]
explain dependency 的使用场景有两个:
场景一:快速排除。快速排除因为读取不到相应分区的数据而导致任务数据输出异常。例如,在一个以天分区的任务中,上游任务因为生产过程不可控因素出现异常或者空跑,导致下游任务引发异常。通过这种方式,可以快速查看 SQL 读取的分区是否出现异常。
场景二:理清表的输入,帮助理解程序的运行,特别是有助于理解有多重子查询,多表连接的依赖输入。
下面通过两个案例来看 explain dependency 的实际运用:
案例一:识别看似等价的代码
对于刚接触 SQL 的程序员,很容易将
select * from a inner join b on a.no=b.no and a.f>1 and a.f<3;
等价于
select * from a inner join b on a.no=b.no where a.f>1 and a.f<3;
我们可以通过案例来查看下它们的区别:
代码 1:
select
a.s_no
from student_orc_partition a
inner join
student_orc_partition_only b
on a.s_no=b.s_no and a.part=b.part and a.part>=1 and a.part<=2;
代码 2:
select
a.s_no
from student_orc_partition a
inner join
student_orc_partition_only b
on a.s_no=b.s_no and a.part=b.part
where a.part>=1 and a.part<=2;
我们看下上述两段代码 explain dependency 的输出结果:
代码 1 的 explain dependency 结果:
{"input_partitions":
[{"partitionName":"default@student_orc_partition@part=0"},
{"partitionName":"default@student_orc_partition@part=1"},
{"partitionName":"default@student_orc_partition@part=2"},
{"partitionName":"default@student_orc_partition_only@part=1"},
{"partitionName":"default@student_orc_partition_only@part=2"}],
"input_tables": [{"tablename":"default@student_orc_partition","tabletype":"MANAGED_TABLE"}, {"tablename":"default@student_orc_partition_only","tabletype":"MANAGED_TABLE"}]}
代码 2 的 explain dependency 结果:
{"input_partitions":
[{"partitionName":"default@student_orc_partition@part=1"},
{"partitionName" : "default@student_orc_partition@part=2"},
{"partitionName" :"default@student_orc_partition_only@part=1"},
{"partitionName":"default@student_orc_partition_only@part=2"}],
"input_tables": [{"tablename":"default@student_orc_partition","tabletype":"MANAGED_TABLE"}, {"tablename":"default@student_orc_partition_only","tabletype":"MANAGED_TABLE"}]}
通过上面的输出结果可以看到,其实上述的两个 SQL 并不等价,代码 1 在内连接(inner join)中的连接条件(on)中加入非等值的过滤条件后,并没有将内连接的左右两个表按照过滤条件进行过滤,内连接在执行时会多读取 part=0 的分区数据。而在代码 2 中,会过滤掉不符合条件的分区。
案例二:识别 SQL 读取数据范围的差别
代码 1:
explain dependency
select
a.s_no
from student_orc_partition a
left join
student_orc_partition_only b
on a.s_no=b.s_no and a.part=b.part and b.part>=1 and b.part<=2;
代码 2:
explain dependency
select
a.s_no
from student_orc_partition a
left join
student_orc_partition_only b
on a.s_no=b.s_no and a.part=b.part and a.part>=1 and a.part<=2;
以上两个代码的数据读取范围是一样的吗?答案是不一样,我们通过 explain dependency 来看下:
代码 1 的 explain dependency 结果:
{"input_partitions":
[{"partitionName": "default@student_orc_partition@part=0"},
{"partitionName":"default@student_orc_partition@part=1"}, …中间省略 7 个分区
{"partitionName":"default@student_orc_partition@part=9"},
{"partitionName":"default@student_orc_partition_only@part=1"},
{"partitionName":"default@student_orc_partition_only@part=2"}],
"input_tables": [{"tablename":"default@student_orc_partition","tabletype":"MANAGED_TABLE"}, {"tablename":"default@student_orc_partition_only","tabletype":"MANAGED_TABLE"}]}
代码 2 的 explain dependency 结果:
{"input_partitions":
[{"partitionName":"default@student_orc_partition@part=0"},
{"partitionName":"default@student_orc_partition@part=1"}, …中间省略 7 个分区
{"partitionName":"default@student_orc_partition@part=9"},
{"partitionName":"default@student_orc_partition_only@part=0"},
{"partitionName":"default@student_orc_partition_only@part=1"}, …中间省略 7 个分区
{"partitionName":"default@student_orc_partition_only@part=9"}],
"input_tables": [{"tablename":"default@student_orc_partition","tabletype":"MANAGED_TABLE"}, {"tablename":"default@student_orc_partition_only","tabletype":"MANAGED_TABLE"}]}
可以看到,对左外连接在连接条件中加入非等值过滤的条件,如果过滤条件是作用于右表(b 表)有起到过滤的效果,则右表只要扫描两个分区即可,但是左表(a 表)会进行全表扫描。如果过滤条件是针对左表,则完全没有起到过滤的作用,那么两个表将进行全表扫描。这时的情况就如同全外连接一样都需要对两个数据进行全表扫描。
在使用过程中,容易认为代码片段 2 可以像代码片段 1 一样进行数据过滤,通过查看 explain dependency 的输出结果,可以知道不是如此。
3. explain authorization 的用法
通过 explain authorization 可以知道当前 SQL 访问的数据来源(INPUTS) 和数据输出(OUTPUTS),以及当前 Hive 的访问用户 (CURRENT_USER)和操作(OPERATION)。
在 hive cli 中输入以下命令:
explain authorization
select variance(s_score) from student_tb_orc;
结果如下:
INPUTS:
default@student_tb_orc
OUTPUTS:
hdfs://node01:8020/tmp/hive/hdfs/cbf182a5-8258-4157-9194- 90f1475a3ed5/-mr-10000
CURRENT_USER:
hdfs
OPERATION:
QUERY
AUTHORIZATION_FAILURES:
No privilege 'Select' found for inputs { database:default, table:student_ tb_orc, columnName:s_score}
从上面的信息可知:
上面案例的数据来源是 defalut 数据库中的 student_tb_orc 表;
数据的输出路径是 hdfs://node01:8020/tmp/hive/hdfs/cbf182a5-8258-4157-9194-90f1475a3ed5/-mr-10000;
当前的操作用户是 hdfs,操作是查询;
观察上面的信息我们还会看到 AUTHORIZATION_FAILURES 信息,提示对当前的输入没有查询权限,但如果运行上面的 SQL 的话也能够正常运行。为什么会出现这种情况?Hive 在默认不配置权限管理的情况下不进行权限验证,所有的用户在 Hive 里面都是超级管理员,即使不对特定的用户进行赋权,也能够正常查询。
通过上面对 explain 的介绍,可以发现 explain 中有很多值得我们去研究的内容,读懂 explain 的执行计划有利于我们优化 Hive SQL,同时也能提升我们对 SQL 的掌控力。
Hive SQL 底层执行原理
本节结构采用宏观着眼,微观入手,从整体到细节的方式剖析 Hive SQL 底层原理。第一节先介绍 Hive 底层的整体执行流程,然后第二节介绍执行流程中的 SQL 编译成 MapReduce 的过程,第三节剖析 SQL 编译成 MapReduce 的具体实现原理。
Hive 底层执行架构
我们先来看下 Hive 的底层执行架构图, Hive 的主要组件与 Hadoop 交互的过程:
在 Hive 这一侧,总共有五个组件:
UI:用户界面。可看作我们提交 SQL 语句的命令行界面。
DRIVER:驱动程序。接收查询的组件。该组件实现了会话句柄的概念。
COMPILER:编译器。负责将 SQL 转化为平台可执行的执行计划。对不同的查询块和查询表达式进行语义分析,并最终借助表和从 metastore 查找的分区元数据来生成执行计划.
METASTORE:元数据库。存储 Hive 中各种表和分区的所有结构信息。
EXECUTION ENGINE:执行引擎。负责提交 COMPILER 阶段编译好的执行计划到不同的平台上。
上图的基本流程是:
步骤 1:UI 调用 DRIVER 的接口;
步骤 2:DRIVER 为查询创建会话句柄,并将查询发送到 COMPILER(编译器)生成执行计划;
步骤 3 和 4:编译器从元数据存储中获取本次查询所需要的元数据,该元数据用于对查询树中的表达式进行类型检查,以及基于查询谓词修建分区;
步骤 5:编译器生成的计划是分阶段的 DAG,每个阶段要么是 map/reduce 作业,要么是一个元数据或者 HDFS 上的操作。将生成的计划发给 DRIVER。
如果是 map/reduce 作业,该计划包括 map operator trees 和一个 reduce operator tree,执行引擎将会把这些作业发送给 MapReduce :
步骤 6、6.1、6.2 和 6.3:执行引擎将这些阶段提交给适当的组件。在每个 task(mapper/reducer) 中,从 HDFS 文件中读取与表或中间输出相关联的数据,并通过相关算子树传递这些数据。最终这些数据通过序列化器写入到一个临时 HDFS 文件中(如果不需要 reduce 阶段,则在 map 中操作)。临时文件用于向计划中后面的 map/reduce 阶段提供数据。
步骤 7、8 和 9:最终的临时文件将移动到表的位置,确保不读取脏数据(文件重命名在 HDFS 中是原子操作)。对于用户的查询,临时文件的内容由执行引擎直接从 HDFS 读取,然后通过 Driver 发送到 UI。
总结
通过上面的内容我们发现,shuffle 阶段堪称性能的杀手,为什么这么说,一方面 shuffle 阶段是最容易引起数据倾斜的;另一方面 shuffle 的过程中会产生大量的磁盘 I/O、网络 I/O 以及压缩、解压缩、序列化和反序列化等。这些操作都是严重影响性能的。
所以围绕 shuffle 和数据倾斜有很多的调优点:
Mapper 端的 Buffer 设置为多大?Buffer 设置得大,可提升性能,减少磁盘 I/O ,但 是对内存有要求,对 GC 有压力;Buffer 设置得小,可能不占用那么多内存, 但是可能频繁的磁盘 I/O 、频繁的网络 I/O 。
文章来源于大数据技术团队
评论