原文链接: Apache Calcite整体架构及处理流程 - Liebing's Homepage
Apache Calcite是一个动态的数据管理框架, 它可以实现 SQL 的解析, 验证, 优化和执行. 称之为"动态"是因为 Calcite 是模块化和插件式的, 上述任何一个步骤在 Calcite 中都对应着一个相对独立的模块. 用户可以选择使用其中的一个或多个模块, 也可以对任意模块进行定制化的扩展. 正是这种灵活性使得 Calcite 可以在现有的存储或计算系统上方便地构建 SQL 访问层, 甚至在已有 SQL 能力的系统中也可引入 Calcite 中的某个模块实现相应的功能, 比如 Apche Hive 就仅使用了 Calcite 进行优化, 但却有自己的 SQL 解析器. Calcite 的这种特性使其在大数据系统中得到了广泛的运用, 比如Apache Flink, Apache Drill等都大量使用了 Calcite, 因此理解 Calcite 的原理已经成为理解大数据系统中 SQL 访问层实现原理的必备条件.
笔者在学习 Calcite 的过程中发现关于 Calcite 的实践案例十分稀缺, Calcite 文档中对于原理和使用方法的介绍也比较笼统, 因此准备对 Calcite 的相关内容进行总结整理, 由于整体内容较多, 后续计划每个模块安排一到两篇文章进行详细介绍. 本文是这一系列的第一篇, 重点介绍 Calcite 的架构, 并用一个可运行的例子来一步步分析 Calcite 在 SQL 解析, 验证, 优化和执行各个阶段所做的工作和输出的结果, 以形成对 Calcite 的整体了解. 关于 Calcite 的历史背景, 可以阅读参考[1], 本文不再赘述.
Calcite 整体架构
Calcite 的整体架构如下图(图片来自Calcite论文)所示, 它包含以下组成部分:
JDBC 接口: 用于使用标准的 JDBC 接口访问 Calcite 获取数据, 为了提供 JDBC/ODBC 接口, Calcite 构建了一个独立的Avatica框架.
SQL Parser 和 SQL Validator: 用于进行 SQL 的解析和验证, 将原始的 SQL 字符串解析并转化为内部的SqlNode
树表示.
Query Optimizer: 用于进行查询优化, 查询优化是在关系代数的基础上进行的, 在 Calcite 内部有一种关系代数表示方法, 即将关系代数表示为RelNode
树. RelNode
树既可由SqlNode
树转化而来, 也可通过 Calcite 提供的 Expressions Builder 接口构建.
Enumerator 执行计划: Calcite 提供了一种将优化后的RelNode
树生成为 Enumerator 执行计划的方法, Enumerator 执行计划基于 Linq4j 实现, 这部分并未在图中画出. 由于多数系统有自己的执行接口, 因此 Calcite 的这部分组件在成熟的系统中较少使用. Calcite 的一些Adapter使用了 Enumerator 执行计划.
述架构可以看出, Calcite 包含许多组成典型数据库管理系统的部件. 不过, 它省略了一些关键的组成部分, 例如, 数据的存储, 处理数据的算法和存储元数据的存储库. 这些省略是有意为之的, 因为在大数据时代, 对不同的数据类型有不同的存储和计算引擎, 想要将它们统一到一个框架中是不太可能的. Calcite 的目的是仅提供构建 SQL 访问的框架, 这也是其广泛适用的原因. 这种省略带来的另一个好处是, 使用 Calcite 可以十分方便地构建联邦查询引擎, 即屏蔽底层物理存储和计算引擎, 使用一个统一的 SQL 接口实现数据访问.
Calcite 处理流程
Calcite 的完整处理流程实际上就是 SQL 的解析, 优化与执行流程, 具体步骤如下图所示.
从图中可以看出, Calcite 的处理流程主要分为 5 个阶段:
Parser 用于解析 SQL, 将输入的 SQL 字符串转化为抽象语法树(AST), Calcite 中用SqlNode
树表示.
Validator 根据元数据信息对SqlNode
树进行验证, 其输出仍是SqlNode
树.
Converter 将SqlNode
树转化为关系代数, 以方便进一步优化, Calcite 中使用RelNode
树表示关系代数.
Optimizer 对输入的关系代数进行优化, 输出优化后的RelNode
树.
Execute 阶段会根据优化后的RelNode
生成执行计划, 在 Calcite 中内置了一种基于 Enumerator 的执行计划生成方法.
为了更加深入地理解上述步骤, 笔者设计了一个小而全的案例, 通过 Calcite 实现使用 SQL 访问 CSV 文件. 后文我们将依据 Calcite 处理的 5 个阶段, 逐步分析每个阶段所作的工作和输出结果. 为方便阅读本文只给出了核心代码, 完整的可执行代码可参考这里.
如果读者阅读过 Calcite 的文档或源代码, 可能会发现 Calcite 已经包含了一个CSV Adapter, 可以实现 CSV 文件的访问. 不过 Adapter 需要借助 JDBC 接口使用, 而 JDBC 的调用链十分复杂, 不利于我们理解 Calcite 的核心处理步骤. 因此这里绕过了 JDBC, 使用了一个更简易的案例. 通过本案例也可帮助理解 Calcite Adapter 的实现原理.
案例数据及 SQL
在本文的案例中, 我们使用了两张表. 其中users
表存储用户数据, orders
表存储订单数据.
users
表的内容如下.
id:string,name:string,age:int
1,Jack,28
2,John,21
3,Tom,32
4,Peter,24
复制代码
orders
表的内容如下.
id:string,user_id:string,goods:string,price:double
001,1,Cola,3.5
002,1,Hat,38.9
003,2,Shoes,199.9
004,3,Book,39.9
005,4,Phone,2499.9
复制代码
后文将基于上述两张表, 逐步分析以下 SQL 语句的解析和执行流程, 这一 SQL 语句用于查询每个用户的订单消费总额, 并按用户id
排序后输出.
SELECT u.id, name, age, sum(price)
FROM users AS u join orders AS o ON u.id = o.user_id
WHERE age >= 20 AND age <= 30
GROUP BY u.id, name, age
ORDER BY u.id
复制代码
SQL 解析
SQL 语句处理的第一步便是通过词法分析和语法分析将 SQL 字符串转化为 AST. 在 Calcite 中, 借助JavaCC实现了 SQL 的解析, 并转化为SqlNode
表示. JavaCC 是一种解析器生成器工具, 可以根据用户提供的语法规则文件自动生成解析器, 如果对如何使用 JavaCC 生成抽象语法树感兴趣可阅读笔者之前的博文编译原理实践 - JavaCC解析表达式并生成抽象语法树.
在 Calcite 中, SqlNode
是 AST 节点的抽象基类, 不同类型的节点有对应的实现类. 比如上述 SQL 语句便会生成SqlSelect
和SqlOrderBy
两个主要的节点. 在 Calcite 中, 我们可以简单地使用如下代码将 SQL 字符串转化为SqlNode
.
String sql = "SELECT u.id, name, age, sum(price) " +
"FROM users AS u join orders AS o ON u.id = o.user_id " +
"WHERE age >= 20 AND age <= 30 " +
"GROUP BY u.id, name, age " +
"ORDER BY u.id";
// 创建SqlParser, 用于解析SQL字符串
SqlParser parser = SqlParser.create(sql, SqlParser.Config.DEFAULT);
// 解析SQL字符串, 生成SqlNode树
SqlNode sqlNode = parser.parseStmt();
复制代码
上述代码中的sqlNode
是 AST 的根节点, 下图是将其展开后的结果. 可以看到sqlNode
其实是SqlOrderBy
类型, 它的query
字段是一个SqlSelect
类型, 即代表原始的 SQL 语句去掉ORDER BY
部分. 图中红色矩形框内的其实都是SqlNode
类型.
SQL 验证
SQL 解析阶段只是简单地将 SQL 字符串转化为SqlNode
树, 并没有对 SQL 语句进行语义上的检查, 比如 SQL 中指定的表是否存在于数据库中, 字段是否存在于表中等. Calcite 中的 SQL 验证阶段一方面会借助元数据信息执行上述验证, 另一方面会对SqlNode
树进行一些改写, 以转化为统一的格式, 方便下一步处理.
在 Calcite 中可通过以下代码进行 SQL 验证.
// 创建Schema, 一个Schema中包含多个表. Calcite中的Schema类似于RDBMS中的Database
SimpleTable userTable = SimpleTable.newBuilder("users")
.addField("id", SqlTypeName.VARCHAR)
.addField("name", SqlTypeName.VARCHAR)
.addField("age", SqlTypeName.INTEGER)
.withFilePath("/path/to/user.csv")
.withRowCount(10)
.build();
SimpleTable orderTable = SimpleTable.newBuilder("orders")
.addField("id", SqlTypeName.VARCHAR)
.addField("user_id", SqlTypeName.VARCHAR)
.addField("goods", SqlTypeName.VARCHAR)
.addField("price", SqlTypeName.DECIMAL)
.withFilePath("/path/to/order.csv")
.withRowCount(10)
.build();
SimpleSchema schema = SimpleSchema.newBuilder("s")
.addTable(userTable)
.addTable(orderTable)
.build();
CalciteSchema rootSchema = CalciteSchema.createRootSchema(false, false);
rootSchema.add(schema.getSchemaName(), schema);
RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();
// 创建CatalogReader, 用于指示如何读取Schema信息
Prepare.CatalogReader catalogReader = new CalciteCatalogReader(
rootSchema,
Collections.singletonList(schema.getSchemaName()),
typeFactory,
config);
// 创建SqlValidator, 用于执行SQL验证
SqlValidator.Config validatorConfig = SqlValidator.Config.DEFAULT
.withLenientOperatorLookup(config.lenientOperatorLookup())
.withSqlConformance(config.conformance())
.withDefaultNullCollation(config.defaultNullCollation())
.withIdentifierExpansion(true);
SqlValidator validator = SqlValidatorUtil.newValidator(
SqlStdOperatorTable.instance(), catalogReader, typeFactory, validatorConfig);
// 执行SQL验证
SqlNode validateSqlNode = validator.validate(node);
复制代码
从上述代码中可以看到, SQL 验证后的输出结果仍是SqlNode
树, 不过其内部结构发生了改变. 一个明显的变化是验证后的SqlOrderBy
节点被改写为了SqlSelect
节点, 并在其orderBy
变量中记录了排序字段.
另外, 如果把验证前后的SqlNode
完全打印出来, 我们可以发现 Calcite 在验证时会为每个字段加上表名限定, 为每个表加上 Schema 限定. 读者可以试着故意把表名或者字段写错, 运行时在这一阶段就会报错.
-- 验证前的SqlNode树打印结果
SELECT `u`.`id`, `name`, `age`, SUM(`price`)
FROM `users` AS `u`
INNER JOIN `orders` AS `o` ON `u`.`id` = `o`.`user_id`
WHERE `age` >= 20 AND `age` <= 30
GROUP BY `u`.`id`, `name`, `age`
ORDER BY `u`.`id`
-- 验证后的SqlNode树打印结果
SELECT `u`.`id`, `u`.`name`, `u`.`age`, SUM(`o`.`price`)
FROM `s`.`users` AS `u`
INNER JOIN `s`.`orders` AS `o` ON `u`.`id` = `o`.`user_id`
WHERE `u`.`age` >= 20 AND `u`.`age` <= 30
GROUP BY `u`.`id`, `u`.`name`, `u`.`age`
ORDER BY `u`.`id`
复制代码
转化为关系代数
关系代数是 SQL 背后的理论基础, 如果读者不了解关系代数可阅读Introduction of Relational Algebra in DBMS作简单了解, "数据库系统概念"中对关系代数有更深入的介绍.
在 Calcite 中, 关系代数由RelNode
表示, 我们可以通过以下代码, 将验证后的SqlNode
树转化为RelNode
树. 可以看到创建SqlToRelConverter
的代码其实设计的并不十分优雅, VolcanoPlanner
其实是在优化阶段使用的, 但是在转化为关系代数的时候就必须创建一个 Planner, 这也是 Calcite 在抽象上有待提升的地方.
// 创建VolcanoPlanner, VolcanoPlanner在后面的优化中还需要用到
VolcanoPlanner planner = new VolcanoPlanner(RelOptCostImpl.FACTORY, Contexts.of(config));
planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
// 创建SqlToRelConverter
RelOptCluster cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory));
SqlToRelConverter.Config converterConfig = SqlToRelConverter.config()
.withTrimUnusedFields(true)
.withExpand(false);
SqlToRelConverter converter = new SqlToRelConverter(
null,
validator,
catalogReader,
cluster,
StandardConvertletTable.INSTANCE,
converterConfig);
// 将SqlNode树转化为RelNode树
RelNode relNode = converter.convertQuery(validateSqlNode, false, true);
复制代码
RelNode
树其实可以理解为一个逻辑执行计划, 上述 SQL 对应的逻辑执行计划如下, 其中每一行都表示一个节点, 是RelNode
的实现类, 缩进表示父子关系.
LogicalSort(sort0=[$0], dir0=[ASC])
LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[SUM($3)])
LogicalProject(id=[$0], name=[$1], age=[$2], price=[$6])
LogicalFilter(condition=[AND(>=($2, 20), <=($2, 30))])
LogicalJoin(condition=[=($0, $4)], joinType=[inner])
LogicalTableScan(table=[[s, users]])
LogicalTableScan(table=[[s, orders]])
复制代码
查询优化
查询优化是 Calcite 的核心模块, 它主要有三部分组成:
Planner rules: 即优化规则, Calcite 已经内置了很多优化规则, 如谓词下推, 投影下推等. 用户也可定义自己的优化规则.
Metadata providers: 这里的元数据主要用于基于成本的优化(Cost-based Optimize, CBO)中, 包括表的行数, 表的大小, 给定列的值是否唯一等信息,
Planner engines: Calcite 提供了两种优化器实现, HepPlanner
用于实现基于规则的优化(Rule-based Optimize, RBO), VolcanoPlanner
用于实现基于成本的优化.
本文中暂且只使用VolcanoPlanner
, 执行优化的代码如下. EnumerableRules
中的规则用于将逻辑计划中的节点转化为对应的EnumerableRel
节点, 它是 Calcite 中提供的物理节点, 可用于生成执行代码.
// 优化规则
RuleSet rules = RuleSets.ofList(
CoreRules.FILTER_TO_CALC,
CoreRules.PROJECT_TO_CALC,
CoreRules.FILTER_CALC_MERGE,
CoreRules.PROJECT_CALC_MERGE,
CoreRules.FILTER_INTO_JOIN, // 过滤谓词下推到Join之前
EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE,
EnumerableRules.ENUMERABLE_PROJECT_TO_CALC_RULE,
EnumerableRules.ENUMERABLE_FILTER_TO_CALC_RULE,
EnumerableRules.ENUMERABLE_JOIN_RULE,
EnumerableRules.ENUMERABLE_SORT_RULE,
EnumerableRules.ENUMERABLE_CALC_RULE,
EnumerableRules.ENUMERABLE_AGGREGATE_RULE);
Program program = Programs.of(RuleSets.ofList(rules));
RelNode optimizerRelTree = program.run(
planner,
relNode,
relNode.getTraitSet().plus(EnumerableConvention.INSTANCE),
Collections.emptyList(),
Collections.emptyList());
复制代码
经过优化后的输出如下, 可以看到所有的节点都变成了Enumerable
开头的物理节点, 它们的基类是EnumerableRel
.
EnumerableSort(sort0=[$0], dir0=[ASC])
EnumerableAggregate(group=[{0, 1, 2}], EXPR$3=[SUM($3)])
EnumerableCalc(expr#0..6=[{inputs}], proj#0..2=[{exprs}], price=[$t6])
EnumerableHashJoin(condition=[=($0, $4)], joinType=[inner])
EnumerableCalc(expr#0..2=[{inputs}], expr#3=[Sarg[[20..30]]], expr#4=[SEARCH($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])
EnumerableTableScan(table=[[s, users]])
EnumerableTableScan(table=[[s, orders]])
复制代码
对比优化前后的计划, 另一个值得注意的变化是对users
表的过滤位置发生了变动, 从先 Join 后过滤变成了先过滤后 Join, 如下图所示.<br />
生成执行计划
一般的存储或计算系统都有自己的执行计划, 因此为了将物理计划转化为执行计划通常需要用户编写代码. 不过 Calcite 中也提供了一种执行计划生成方法, 为了完整性我们这里使用它来生成访问 CSV 文件的执行计划. 通过如下代码即可生成执行计划并读取 CSV 文件中的数据.
EnumerableRel enumerable = (EnumerableRel) optimizerRelTree;
Map<String, Object> internalParameters = new LinkedHashMap<>();
EnumerableRel.Prefer prefer = EnumerableRel.Prefer.ARRAY;
Bindable bindable = EnumerableInterpretable.toBindable(internalParameters,
null, enumerable, prefer);
Enumerable bind = bindable.bind(new SimpleDataContext(rootSchema.plus()));
Enumerator enumerator = bind.enumerator();
while (enumerator.moveNext()) {
Object current = enumerator.current();
Object[] values = (Object[]) current;
StringBuilder sb = new StringBuilder();
for (Object v : values) {
sb.append(v).append(",");
}
sb.setLength(sb.length() - 1);
System.out.println(sb);
}
复制代码
执行上述代码后可以看到如下结果.
1,Jack,28,42.40
2,John,21,199.90
4,Peter,24,2499.90
复制代码
至此, 我们成功地借助 Calcite 实现了使用 SQL 查询 CSV 文件中的数据, 其核心代码也就 200 行左右.
总结
本文通过一个小而全的案例囫囵地过了一遍 Calcite 的整个处理流程, 希望能借此构建对 Calcite 的整体理解. 具体到各个模块, Calcite 其实还有很多细节和概念, 笔者计划在后续的文章中再深入讲解. 通过本文的案例也可以看到, 借助 Calcite 可以十分方便地构建 SQL 访问层. 不过 Calcite 最强大的还是在于其可扩展性, 本文所述的每个步骤都可以进行自定义的扩展, 这也使其在业界得到了广泛的运用, 比如 Apache Flink 的 SQL API 就重度依赖 Calcite, 相信未来更多的大数据系统在构建 SQL 访问层时都会优先考虑 Calcite.
参考
[1] Apache Calcite: Hadoop中新型大数据查询引擎
[2] SQL over anything with an Optiq Adapter
[3] Apache Calcite: A Foundational Framework for Optimized Query Processing Over Heterogeneous Data Sources
[4] Apache Calcite 处理流程详解(一)
评论