技术解析 | Doris SQL 原理解析
汪细勖
小米云平台工程师
负责小米 Apache Doris 项目的开发和运维
专注于 OLAP 计算引擎的 SQL 解析和优化
导读
本文主要介绍了 Doris SQL 解析的原理。
重点讲述了生成单机逻辑计划,生成分布式逻辑计划,生成分布式物理计划的过程。对应于代码实现是 Analyze,SinglePlan,DistributedPlan,Schedule 四个部分。
Analyze 负责对 AST 进行前期的一些处理,SinglePlan 根据 AST 进行优化生成单机查询计划,DistributedPlan 将单机的查询计划拆成分布式的查询计划,Schedule 阶段负责决定查询计划下发到哪些机器上执行。
由于 SQL 类型有很多,本文侧重介绍查询 SQL 的解析,从算法原理和代码实现上深入讲解了 Doris 的 SQL 解析原理。
1 Doris 简介
Doris 是基于 MPP 架构的交互式 SQL 数据仓库,主要用于解决近实时的报表和多维分析。
Doris 分成两部分 FE 和 BE,FE 负责存储以及维护集群元数据、接收、解析、查询、设计规划整体查询流程,BE 在 Doris 的存储引擎中,用户数据被水平划分为若干个数据分片(Tablet,也称作数据分桶)。每个 Tablet 包含若干数据行。多个 Tablet 在逻辑上归属于不同的分区 Partition。一个 Tablet 只属于一个 Partition。而一个 Partition 包含若干个 Tablet。Tablet 是数据移动、复制等操作的最小物理存储单元。
2 SQL 解析简介
SQL 解析在这篇文章中指的是将一条 sql 语句经过一系列的解析最后生成一个完整的物理执行计划的过程。
这个过程包括以下四个步骤:词法分析,语法分析,生成逻辑计划,生成物理计划。
如图 1 所示:
2.1 词法分析
词法分析主要负责将字符串形式的 sql 识别成一个个 token,为语法分析做准备。
2.2 语法分析
语法分析主要负责根据语法规则,将词法分析生成的 token 转成抽象语法树(Abstract Syntax Tree),如图 2 所示。
图 2 抽象语法树示例
2.3 逻辑计划
逻辑计划负责将抽象语法树转成代数关系。代数关系是一棵算子树,每个节点代表一种对数据的计算方式,整棵树代表了数据的计算方式以及流动方向,如图 3 所示。
图 3 关系代数示例
2.4 物理计划
物理计划是在逻辑计划的基础上,根据机器的分布,数据的分布,决定去哪些机器上执行哪些计算操作。
Doris 系统的 SQL 解析也是采用这些步骤,只不过根据 Doris 系统结构的特点和数据的存储方式,进行了细化和优化,最大化发挥机器的计算能力。
3 设计目标
最大化计算的并行性
最小化数据的网络传输
最大化减少需要扫描的数据
4 总体架构
Doris SQL 解析具体包括了五个步骤:词法分析,语法分析,生成单机逻辑计划,生成分布式逻辑计划,生成物理执行计划。
具体代码实现上包含以下五个步骤:Parse, Analyze, SinglePlan, DistributedPlan, Schedule。
图 4 系统总体架构图
如图 4 所示,Parse 阶段本文不详细讲,Analyze 负责对 AST 进行前期的一些处理,SinglePlan 根据 AST 进行优化生成单机查询计划,DistributedPlan 将单机的查询计划拆成分布式的查询计划,Schedule 阶段负责决定查询计划下发到哪些机器上执行。
由于 SQL 类型有很多,本文侧重介绍查询 SQL 的解析。
图 5 展示了一个简单的查询 SQL 在 Doris 的解析实现。
5 Parse 阶段
词法分析采用 jflex 技术,语法分析采用 java cup parser 技术,最后生成抽象语法树(Abstract Syntax Tree)AST,这些都是现有的、成熟的技术,在这里不进行详细介绍。
AST 是一种树状结构,代表着一条 SQL。不同类型的查询 select, insert, show, set, alter table, create table 等经过 Parse 阶段后生成不同的数据结构(SelectStmt, InsertStmt, ShowStmt, SetStmt, AlterStmt, AlterTableStmt, CreateTableStmt 等),但他们都继承自 Statement,并根据自己的语法规则进行一些特定的处理。例如:对于 select 类型的 sql, Parse 之后生成了 SelectStmt 结构。
SelectStmt 结构包含了 SelectList,FromClause,WhereClause,GroupByClause,SortInfo 等结构。这些结构又包含了更基础的一些数据结构,如 WhereClause 包含了 BetweenPredicate(between 表达式), BinaryPredicate(二元表达式), CompoundPredicate(and or 组合表达式), InPredicate(in 表达式)等。
AST 中所有结构都是由基本结构表达式 Expr 通过多种组合而成,如图 6 所示。
6 Analyze 阶段
Analyze 主要是对 Parse 阶段生成的抽象语法树 AST 进行一些前期的处理和语义分析,为生成单机逻辑计划做准备。
抽象语法树是由 StatementBase 这个抽象类表示。这个抽象类包含一个最重要的成员函数 analyze(),用来执行 Analyze 阶段要做的事。
不同类型的查询 select, insert, show, set, alter table, create table 等经过 Parse 阶段后生成不同的数据结构(SelectStmt, InsertStmt, ShowStmt, SetStmt, AlterStmt, AlterTableStmt, CreateTableStmt 等),这些数据结构继承自 StatementBase,并实现 analyze()函数,对特定类型的 SQL 进行特定的 Analyze。
例如:select 类型的查询,会转成对 select sql 的子语句 SelectList, FromClause, GroupByClause, HavingClause, WhereClause, SortInfo 等的 analyze()。然后这些子语句再各自对自己的子结构进行进一步的 analyze(),通过层层迭代,把各种类型的 sql 的各种情景都分析完毕。例如:WhereClause 进一步分析其包含的 BetweenPredicate(between 表达式), BinaryPredicate(二元表达式), CompoundPredicate(and or 组合表达式), InPredicate(in 表达式)等。
对于查询类型的 SQL,包含以下几项重要工作:
· 元信息的识别和解析:识别和解析 sql 中涉及的 Cluster, Database, Table, Column 等元信息,确定需要对哪个集群的哪个数据库的哪些表的哪些列进行计算。
· SQL 的合法性检查:窗口函数不能 DISTINCT,投影列是否有歧义,where 语句中不能含有 grouping 操作等。
· SQL 简单重写:比如将 select * 扩展成 select 所有列,count distinct 转成 bitmap 或者 hll 函数等。
· 函数处理:检查 sql 中包含的函数和系统定义的函数是否一致,包括参数类型,参数个数等。
· Table 和 Column 的别名处理
· 类型检查和转换:例如二元表达式两边的类型不一致时,需要对其中一个类型进行转换(BIGINT 和 DECIMAL 比较,BIGINT 类型需要 Cast 成 DECIMAL)。
对 AST 进行 analyze 后,会再进行一次 rewrite 操作,进行精简或者是转成统一的处理方式。目前 rewrite 的算法是基于规则的方式,针对 AST 的树状结构,自底向上,应用每一条规则进行重写。如果重写后,AST 有变化,则再次进行 analyze 和 rewrite,直到 AST 无变化为止。
例如:常量表达式的化简:1 + 1 + 1 重写成 3,1 > 2 重写成 Flase 等。将一些语句转成统一的处理方式,比如将 where in, where exists 重写成 semi join, where not in, where not exists 重写成 anti join。
7 生成单机逻辑 Plan 阶段
这部分工作主要是根据 AST 抽象语法树生成代数关系,也就是俗称的算子数。树上的每个节点都是一个算子,代表着一种操作。
如图 7 所示,ScanNode 代表着对一个表的扫描操作,将一个表的数据读出来。HashJoinNode 代表着 join 操作,小表在内存中构建哈希表,遍历大表找到连接键相同的值。Project 表示投影操作,代表着最后需要输出的列,图 7 表示只用输出 citycode 这一列。
如果不进行优化,生成的关系代数下发到存储中执行的代价非常高。
对于查询:
未优化的关系代数,如图 8 所示,需要将所有列读出来进行一系列的计算,在最后选择输出 siteid, pv 两列,大量无用的列数据浪费了计算资源。
具体来说这个阶段主要做了如下几项工作:
· Slot 物化:指确定一个表达式对应的列需要 Scan 和计算,比如聚合节点的聚合函数表达式和 Group By 表达式需要进行物化。
· 投影下推:BE 在 Scan 时只会 Scan 必须读取的列。
· 谓词下推:在满足语义正确的前提下将过滤条件尽可能下推到 Scan 节点。
· 分区,分桶裁剪:根据过滤条件中的信息,确定需要扫描哪些分区,哪些桶的 tablet。
· Join Reorder:对于 Inner Join, Doris 会根据行数调整表的顺序,将大表放在前面。
· Sort + Limit 优化成 TopN:对于 order by limit 语句会转换成 TopN 的操作节点,方便统一处理。
· MaterializedView 选择:会根据查询需要的列,过滤,排序和 Join 的列,行数,列数等因素选择最佳的物化视图。
图 9 展示了优化的示例,Doris 是在生成关系代数的过程中优化,边生成边优化。
8 生成分布式 Plan 阶段
有了单机的 PlanNode 树之后,就需要进一步根据分布式环境,拆成分布式 PlanFragment 树(PlanFragment 用来表示独立的执行单元),毕竟一个表的数据分散地存储在多台主机上,完全可以让一些计算并行起来。
这个步骤的主要目标是最大化并行度和数据本地化。主要方法是将能够并行执行的节点拆分出去单独建立一个 PlanFragment,用 ExchangeNode 代替被拆分出去的节点,用来接收数据。拆分出去的节点增加一个 DataSinkNode,用来将计算之后的数据传送到 ExchangeNode 中,做进一步的处理。
这一步采用递归的方法,自底向上,遍历整个 PlanNode 树,然后给树上的每个叶子节点创建一个 PlanFragment,如果碰到父节点,则考虑将其中能够并行执行的子节点拆分出去,父节点和保留下来的子节点组成一个 parent PlanFragment。拆分出去的子节点增加一个父节点 DataSinkNode 组成一个 child PlanFragment,child PlanFragment 指向 parent PlanFragment。这样就确定了数据的流动方向。
对于查询操作来说,join 操作是最常见的一种操作。
Doris 目前支持 4 种 join 算法:broadcast join,hash partition join,colocate join,bucket shuffle join。
broadcast join:将小表发送到大表所在的每台机器,然后进行 hash join 操作。当一个表扫描出的数据量较少时,计算 broadcast join 的 cost,通过计算比较 hash partition 的 cost,来选择 cost 最小的方式。
hash partition join:当两张表扫描出的数据都很大时,一般采用 hash partition join。它遍历表中的所有数据,计算 key 的哈希值,然后对集群数取模,选到哪台机器,就将数据发送到这台机器进行 hash join 操作。
colocate join:两个表在创建的时候就指定了数据分布保持一致,那么当两个表的 join key 与分桶的 key 一致时,就会采用 colocate join 算法。由于两个表的数据分布是一样的,那么 hash join 操作就相当于在本地,不涉及到数据的传输,极大提高查询性能。
bucket shuffle join:当 join key 是分桶 key,并且只涉及到一个分区时,就会优先采用 bucket shuffle join 算法。由于分桶本身就代表了数据的一种切分方式,所以可以利用这一特点,只需将右表对左表的分桶数 hash 取模,这样只需网络传输一份右表数据,极大减少了数据的网络传输,如图 10 所示。
如图 11 展示了带有 HashJoinNode 的单机逻辑计划创建分布式逻辑计划的核心流程。
· 对 PlanNode,自底向上创建 PlanFragment。
· 如果是 ScanNode,则直接创建一个 PlanFragment,PlanFragment 的 RootPlanNode 是这个 ScanNode。
· 如果是 HashJoinNode,则首先计算下 broadcastCost,为选择 boracast join 还是 hash partition join 提供参考。
· 根据不同的条件判断选择哪种 Join 算法
· 如果使用 colocate join,由于 join 操作都在本地,就不需要拆分。设置 HashJoinNode 的左子节点为 leftFragment 的 RootPlanNode,右子节点为 rightFragment 的 RootPlanNode,与 leftFragment 共用一个 PlanFragment,删除掉 rightFragment。
· 如果使用 bucket shuffle join,需要将右表的数据发送给左表。所以先创建了一个 ExchangeNode,设置 HashJoinNode 的左子节点为 leftFragment 的 RootPlanNode,右子节点为这个 ExchangeNode,与 leftFragment 共用一个 PlanFragment,并且指定 rightFragment 数据发送的目的地为这个 ExchangeNode。
· 如果使用 broadcast join,需要将右表的数据发送给左表。所以先创建了一个 ExchangeNode,设置 HashJoinNode 的左子节点为 leftFragment 的 RootPlanNode,右子节点为这个 ExchangeNode,与 leftFragment 共用一个 PlanFragment,并且指定 rightFragment 数据发送的目的地为这个 ExchangeNode。
· 如果使用 hash partition join,左表和右边的数据都要切分,需要将左右节点都拆分出去,分别创建 left ExchangeNode, right ExchangeNode,HashJoinNode 指定左右节点为 left ExchangeNode 和 right ExchangeNode。单独创建一个 PlanFragment,指定 RootPlanNode 为这个 HashJoinNode。最后指定 leftFragment, rightFragment 的数据发送目的地为 left ExchangeNode, right ExchangeNode。
图 11 HashJoinNode 创建分布式逻辑计划核心流程
图 12 是两个表的 join 操作转换成 PlanFragment 树之后的示例,一共生成了 3 个 PlanFragment。最终数据的输出通过 ResultSinkNode 节点。
9 Schedule 阶段
这一步是根据分布式逻辑计划,创建分布式物理计划。主要解决以下问题:
哪个 BE 执行哪个 PlanFragment
每个 Tablet 选择哪个副本去查询
如何进行多实例并发
图 13 展示了创建分布式物理计划的核心流程:
a. prepare 阶段:给每个 PlanFragment 创建一个 FragmentExecParams 结构,用来表示 PlanFragment 执行时所需的所有参数;如果一个 PlanFragment 包含有 DataSinkNode,则找到数据发送的目的 PlanFragment,然后指定目的 PlanFragment 的 FragmentExecParams 的输入为该 PlanFragment 的 FragmentExecParams。
b. computeScanRangeAssignment 阶段:针对不同类型的 join 进行不同的处理。
computeScanRangeAssignmentByColocate:针对 colocate join 进行处理,由于 join 的两个表桶中的数据分布都是一样的,他们是基于桶的 join 操作,所以在这里是确定每个桶选择哪个 host。在给 host 分配桶时,尽量保证每个 host 分配到的桶基本平均。
computeScanRangeAssignmentByBucket:针对 bucket shuffle join 进行处理,也只是基于桶的操作,所以在这里是确定每个桶选择哪个 host。在给 host 分配桶时,同样需要尽量保证每个 host 分配到的桶基本平均。
computeScanRangeAssignmentByScheduler:针对其他类型的 join 进行处理。确定每个 scanNode 读取 tablet 哪个副本。一个 scanNode 会读取多个 tablet,每个 tablet 有多个副本。为了使 scan 操作尽可能分散到多台机器上执行,提高并发性能,减少 IO 压力,Doris 采用了 Round-Robin 算法,使 tablet 的扫描尽可能地分散到多台机器上去。例如 100 个 tablet 需要扫描,每个 tablet 3 个副本,一共 10 台机器,在分配时,保障每台机器扫描 10 个 tablet。
c. computeFragmentExecParams 阶段:这个阶段解决 PlanFragment 下发到哪个 BE 上执行,以及如何处理实例并发问题。确定了每个 tablet 的扫描地址之后,就可以以地址为维度,将 FragmentExecParams 生成多个实例,也就是 FragmentExecParams 中包含的地址有多个,就生成多个实例 FInstanceExecParam。如果设置了并发度,那么一个地址的执行实例再进一步的拆成多个 FInstanceExecParam。针对 bucket shuffle join 和 colocate join 会有一些特殊处理,但是基本思想一样。FInstanceExecParam 创建完成后,会分配一个唯一的 ID,方便追踪信息。如果 FragmentExecParams 中包含有 ExchangeNode,需要计算有多少 senders,以便知道需要接受多少个发送方的数据。最后 FragmentExecParams 确定 destinations,并把目的地址填充上去。
d. create result receiver 阶段:result receiver 是查询完成后,最终数据需要输出的地方。
e. to thrift 阶段:根据所有 PlanFragment 的 FInstanceExecParam 创建 rpc 请求,然后下发到 BE 端执行。这样一个完整的 SQL 解析过程完成了。
如图 14 所示是一个简单示例,图中的 PlanFrament 包含了一个 ScanNode,ScanNode 扫描 3 个 tablet,每个 tablet 有 2 副本,集群假设有 2 台 host。
computeScanRangeAssignment 阶段确定了需要扫描 replica 1,3,5,8,10,12,其中 replica 1,3,5 位于 host1 上,replica 8,10,12 位于 host2 上。
如果全局并发度设置为 1 时,则创建 2 个实例 FInstanceExecParam,下发到 host1 和 host2 上去执行,如果如果全局并发度设置为 3,这个 host1 上创建 3 个实例 FInstanceExecParam,host2 上创建 3 个实例 FInstanceExecParam,每个实例扫描一个 replica,相当于发起 6 个 rpc 请求。
10 总结
本文首先简单介绍了 Doris,然后介绍 SQL 解析的通用流程:词法分析,语法分析,生成逻辑计划,生成物理计划,接着从总体上介绍了 Doris 在 SQL 解析这块的总体架构,最后详细讲解了 Parse,Analyze,SinglePlan,DistributedPlan,Schedule 等 5 个过程,从算法原理和代码实现上进行了深入的讲解。
Doris 遵守了 SQL 解析的常用方法,但根据底层存储架构,以及分布式的特点,在 SQL 解析这块进行了大量的优化,实现了最大并行度和最小化网络传输,给 SQL 执行层面减少很多负担。
更多技术干货,欢迎关注“百度开发者中心“公众号
评论