写点什么

快速熟悉 MatrixOne 内核前端

作者:MatrixOrigin
  • 2024-02-06
    上海
  • 本文字数:9370 字

    阅读完需:约 31 分钟

Part 1 背景

首先简单科普下数据库内核的结构,一条 sql 语句的执行,主要涉及以下几个组件:

  • Parser :将 sql 语句生成抽象语法树(ast)。

  • Binder :负责验证语句的语义是否正确,并确定语句中涉及的表、列、函数等是否存在,最后将查询中逻辑名称与数据库中的实际对象绑定。

  • Planner :在绑定阶段完成后,Planner 开始处理查询的逻辑结构,并生成查询的执行计划,执行计划描述了执行查询的方式。

  • Optimizer:负责对生成的执行计划进行进一步的优化,考虑执行计划中的多个执行路径,并根据成本估算和查询需求选择最佳的执行路径,包括选择最优的索引、连接方法、访问顺序等。

  • Executor :按照规划器生成的执行计划来执行查询。

依据之前的学习经验,个人认为熟悉一个从未接触过的全新代码库最快速方法是:照着一条最简单最常规的运行链路,看代码与单步跟踪结合,理清主线逻辑,然后从主线慢慢拓展到各个组件各个 feature 分支,直至覆盖整个项目。本文介绍的就是从最常见的 sql 语句 select … from … join … where … group by … order by … limit … 出发,快速熟悉 MO 内核前端部分(主要包括 binder,planner 和 optimizer)的代码。


Part 2 大致执行流程

MO 服务端 handle 客户端发来的请求,中间请求传递的过程我们就忽略不看,最终请求会进入到 MysqlCmdExecutor.doComQuery()方法,这个方法的大致逻辑是:

  1. Parser 解析 sql 成 statement list,也就是 ast list;

  2. 执行每一条 statement。

// pkg/frontend/mysql_cmd_executor.godoComQuery() {    cws = GetComputationWrapper()    for cw in cws {        // 执行语句        executeStmt(cw)    }}
GetComputationWrapper() { // parse SQL语句 asts = parsers.Parse(sql) return [InitTxnComputationWrapper(stmt) for stmt in asts]}
复制代码

executeStmt 方法主要做三件事:

  1. 生成执行计划

  2. 执行查询

  3. 返回结果

MO 兼容 MySQL 通讯协议,按照 MySQL 协议,返回结果先发送 column 信息 ,然后发送具体数据,最后返回一个 EOF packet 结束这次通讯。Column 信息在生成执行计划之后就可以拿到,而具体数据会在 runner 得到结果之后发送。

// pkg/frontend/mysql_cmd_executor.goexecuteStmt(cw) {    // 生成执行计划    compile = cw.Compile()    // 发送列信息    for c in cw.columns {        SendColumnDefinitionPacket(c)    }    SendEOFPacketIf()    // 执行查询    runner = compile.(ComputationRunner)    runner.Run()    // 结束通讯    sendEOFOrOkPacket()}
复制代码

Part 3 生成执行计划

上一节中提到的 Compile 方法主要做两件事:生成执行计划以及执行器相关算子的编译

// pkg/frontend/computation_wrapper.goCompile() {    // 生成及优化执行计划    cw.plan = buildPlan(..)    // 编译执行代码    cw.compile = compile.New(...)    cw.compile.Compile()}
复制代码

我们先看 buildPlan 方法,由于下面的代码很多都是做了一层封装的简单调用,调用路径:buildPlan(mysql_cmd_executor.go) → BaseOptimizer.Optimize → BuildPlan(build.go) → runBuildSelectByBinder ,我们就略过中间部分了,直接看 runBuildSelectByBinder 方法:

// pkg/sql/plan/build.gorunBuildSelectByBinder(stmt) {    // 生成执行计划    builder := NewQueryBuilder(SelectType...)    bindCtx := NewBindContext(builder)    rootId := builder.buildSelect(stmt, bindCtx, true)    builder.qry.Steps = append(builder.qry.Steps, rootId)    // 优化执行计划    query := builder.createQuery()    return &Plan{query}}
复制代码

上面介绍过 binder 的主要职责是将查询语句中的库/表/列名与数据库中对应的对象绑定,在 MO 中 bind select 语句的方法就是 buildSelect ,代码大致如下:

// pkg/sql/plan/query_builder.gobuildSelect() {    preprocess WITH    build FROM    unfold stars and generate headings    bind WHERE    bind GROUP BY    bind HAVING    bind PROJECTION    bind TIME WINDOW    bind ORDER BY    bind LIMIT/OFFSET
append AGGREGATION node append TIME WINDOW node append WINDOW node append PROJECT node append DISTINCT node append SORT node (include LIMIT/OFFSET) append result PROJECT node}
复制代码

可以看到 buildSelect 就是对 select 语句中各个部分(WITH、FROM、WHERE(HAVING)、GROUP BY、TIME WINDOW、ORDER BY 及 LIMIT/OFFSET)按顺序分别进行绑定,下面就对常用的几个部分进行介绍:

1. FROM

MO bind FROM 子句的方法是 buildTable。

// pkg/sql/plan/query_builder.gobuildTable(stmt) {    switch tbl := stmt.(type) {    case *tree.Select:    case *tree.TableName:        // 获取db,table名        schema := tbl.SchemaName        table := tbl.ObjectName          // check db        schema, err = databaseIsValid(schema, builder.compCtx)        if err != nil {            return 0, err        }        // 得到tableDef        obj, tableDef := builder.compCtx.Resolve(schema, table)        if tableDef == nil {            return 0, moerr.NewParseError("table %q does not exist", table)        }        // 生成node        nodeID = builder.appendNode(&plan.Node{            NodeType:    nodeType,            Stats:       nil,            ObjRef:      obj,            TableDef:    tableDef,            BindingTags: []int32{builder.genNewTag()},        }, ctx)
case *tree.JoinTableExpr: // 如果是单表,bind left table if tbl.Right == nil { return builder.buildTable(tbl.Left, ctx, preNodeId, leftCtx) } // 否则buildJoinTable return builder.buildJoinTable(tbl, ctx) case *tree.TableFunction: case *tree.ParenTableExpr: case *tree.AliasedTableExpr: // bind table nodeID = builder.buildTable(tbl.Expr) // addBinding builder.addBinding(nodeID, ctx)}
复制代码

根据 sql 语法,能跟在 FROM 后面表达式类型有:

  • select 表达式

  • table 名

  • join table 表达式

  • 表函数

  • 带括号的 table 表达式

  • 带别名的 table 表达式

table 表达式主要有 table 名(TableName),带别名的 table 表达式(AliasedTableExpr)和 join table 表达式(JoinTableExpr), 为了保持形式上的统一,MO 的 sql parser 会把每个 TableName 都包装成 AliasedTableExpr,如果不带 alias,则 As 设为 nil;每个 AliasedTableExpr 都会包装成 JoinTableExpr,如果只是单表没有 join,则 Right 设为 nil。

举个例子,sql 语句 SELECT * FROM t1,parse 得到的 t1 的结构就类似这样的:

JoinTableExpr {    Left: AliasedTableExpr {        Expr: t1        As: nil    }    Right: nil}
复制代码

bind 这条 sql 语句的 FROM 子句过程大概是这样的:首先 JoinTableExpr 右表为 nil,就只 bind 左表;左表是个 AliasedTableExpr 结构,先 bind Expr(这里是 TableName 结构),然后将 bind 的信息添加到 BindContext(ctx)中(addBinding 方法)。

bind TableName 的逻辑大致如下,首先验证 db 是否存在,如果存在获取 table meta 信息,得到 table 的 column 信息(即 Resolve 方法,如果 table 不存在返回 nil),然后包装成 plan.Node 返回。

addBinding 方法是将 bind 完的 node 中的信息,比如 bindingTag,nodeID,表信息(tableName,tableID)以及列信息(colName,colType,是否隐藏)等信息包装成 Binding 结构添加到 ctx 中,这样在后续的 bind 过程中,就可以依据这些 binding 信息对表名/列名进行解析。

// pkg/sql/plan/query_builder.goaddBinding(nodeID, ctx) {    // 获取node    node := getNode(nodeID)    // 生成col信息    colLength := len(node.TableDef.Cols)    cols := make([]string, colLength)    colIsHidden := make([]bool, colLength)    types := make([]*plan.Type, colLength)    tag := node.BindingTags[0]      for i, col := range node.TableDef.Cols {        if i < len(alias.Cols) {            cols[i] = string(alias.Cols[i])        } else {            cols[i] = col.Name        }        colIsHidden[i] = col.Hidden        types[i] = col.Typ        name := table + "." + cols[i]        builder.nameByColRef[[2]int32{tag, int32(i)}] = name    }    // 生成Binding结构    binding = NewBinding(tag, nodeID, table, node.TableDef.TblId, cols, colIsHidden, types)    // 将Binding添加到BindContext    ctx.bindings = append(ctx.bindings, binding)    ctx.bindingByTag[binding.tag] = binding    ctx.bindingByTable[binding.table] = binding}
复制代码

2. WHERE / HAVING

WHERE 子句的 bind 过程为:先将过滤条件以 AND 为分隔符进行切割,然后对每个表达式递归地进行 bind,比如过滤条件 A AND B AND C 可以被分割为 A,B 和 C, 然后分别 bind(A),bind(B),bind(C);过滤条件 bind 完之后,需要将包含子查询的语句进行拍平,把子查询转换成等价的 join 语句,因为子查询的执行效率极低,会拖慢整个查询的速度,转换为 join 可以极大提升系统性能。

怎么将子查询转换为 join 这个问题比较复杂,有兴趣可以参考论文:https://github.com/matrixorigin/docs/blob/main/readings/subqueries-sigmod01.pdf

HAVING 子句和 WHERE 子句一样,都是起到过滤作用,但是语法上不支持子查询,因此 bind 过程和 WHERE 前半部分一样,就不多做介绍了。

// pkg/sql/plan/query_builder.go// split过滤条件whereList := splitAndBindCondition(clause.Where.Expr, NoAlias, ctx)
var newFilterList []*plan.Exprvar expr *plan.Exprfor _, cond := range whereList { // 优化子查询 nodeID, expr, err = builder.flattenSubqueries(nodeID, cond, ctx) newFilterList = append(newFilterList, expr)}// 生成node并添加到ctxnodeID = builder.appendNode(&plan.Node{ NodeType: plan.Node_FILTER, Children: []int32{nodeID}, FilterList: newFilterList, NotCacheable: notCacheable,}, ctx)
复制代码

3. GROUP BY

GROUP BY 子句的 bind 过程比 HAVING 更简单一点,都需要将表达式切割,直接对每个 GROUP BY 子表达式进行递归 bind 即可。

下面的 PROEJCTION,ORDER BY,LIMIT/OFFSET 的 bind 过程基本都和 GROUP BY 类似,除了需要做些额外的信息设置(比如排序的顺序是正序还是倒序等)和检查(比如 limit 后面跟的常数不能为负数等),就不做重复说明了。

// pkg/sql/plan/query_builder.gogroupBinder := NewGroupBinder(builder, ctx)for _, group := range clause.GroupBy {    // 补全列名,col1 -> t1.col1    group = ctx.qualifyColumnNames(group, AliasAfterColumn)    // bind子表达式    groupBinder.BindExpr(group, 0, true)}
复制代码

4. PROJECTION

// pkg/sql/plan/query_builder.gofor i := range selectList {    // bind子表达式    expr := projectionBinder.BindExpr(selectList[i].Expr, 0, true)    ctx.projects = append(ctx.projects, expr)}
复制代码

5. ORDER BY

// pkg/sql/plan/query_builder.goorderBinder := NewOrderBinder(projectionBinder, selectList)orderBys := make([]*plan.OrderBySpec, 0, len(astOrderBy))
for _, order := range astOrderBy { // bind子表达式 expr := orderBinder.BindExpr(order.Expr) orderBy := &plan.OrderBySpec{ Expr: expr, Flag: plan.OrderBySpec_INTERNAL, } // 设置Flag set orderBy.Flag orderBys = append(orderBys, orderBy)}
复制代码

6. LIMIT/OFFSET

// pkg/sql/plan/query_builder.golimitBinder := NewLimitBinder(builder, ctx)if astLimit.Offset != nil {    // bind子表达式    offsetExpr := limitBinder.BindExpr(astLimit.Offset, 0, true)    // check非负性    if ifNegative(offsetExpr) {        return 0, moerr.NewSyntaxError(builder.GetContext(), "offset value must be nonnegative")    }}if astLimit.Count != nil {    // bind子表达式    limitExpr := limitBinder.BindExpr(astLimit.Count, 0, true)    // check非负性    if ifNegative(limitExpr) {        return 0, moerr.NewSyntaxError(builder.GetContext(), "offset value must be nonnegative")    }}
复制代码

Part 4 优化执行计划

runBuildSelectByBinder 方法中,除了 buildSelect 方法外,还有一部分工作是做查询优化,即 createQuery 方法:

// pkg/sql/plan/query_builder.gocreateQuery() {    for i, rootID := range builder.qry.Steps {        // rule 1        builder.rewriteDistinctToAGG(rootID)        // rule 2        builder.rewriteEffectlessAggToProject(rootID)        // rule 3        rootID = builder.pushdownFilters(rootID, nil, false)        // rule 4        err := foldTableScanFilters(builder.compCtx.GetProcess(), builder.qry, rootID)        if err != nil {            return nil, err        }        // rule 5        builder.pushdownLimit(rootID)        // rule 6        builder.removeSimpleProjections(rootID, plan.Node_UNKNOWN, false, make(map[[2]int32]int))        ...         // rule n    }}
复制代码

从代码上看,createQuery 方法逻辑很简单,就是将所有优化规则一条一条应用于之前生成的执行计划上,目前最常见的优化规则有:谓词下推,列裁剪,投影消除,外连接消除,聚合算子下推/上提,子查询优化(上文提到过,MO 在生成 plan 阶段已经做了这个优化)等等。

这里就介绍一种比较常见且重要的策略:谓词下推。

谓词也就是过滤条件,谓词下推是从直觉上非常容易理解的一个优化:在越靠近数据读取的位置做过滤,后续需要处理的数据量就越少,整个查询的效率就越高。

例如这条 sql: select * from t1, t2 where t1.col1 > 1 and t2.col2 > 2,假设 t1, t2 都有 1 万条数据,2 张表满足过滤条件的数据量都是 100 条,如果先对 t1,t2 做笛卡尔积再做过滤,需要处理的数据是 1 亿条,如果先对 t1,t2 做过滤再做笛卡尔积,那需要处理的数据条数就只有 1 万条,能极大减少需要处理的数据量。

MO 中谓词下推优化在 pushdownFilters 方法中:

// pkg/sql/plan/opt_misc.go// filters是指下推到当前节点的过滤条件pushdownFilters(nodeID, filters) {    node := builder.qry.Nodes[nodeID]    var canPushdown, cantPushdown []*plan.Expr    // 带limit的语句不能下推    if node.Limit != nil {        cantPushdown = filters        filters = nil    }        switch node.NodeType {    case plan.Node_FILTER:        canPushdown = filters        // 收集当前节点的过滤条件        for _, filter := range node.FilterList {            canPushdown = append(canPushdown, splitPlanConjunction(filter)...)        }        // 下推到子节点        childID, cantPushdownChild := pushdownFilters(node.Children[0], canPushdown)        // 如果所有过滤条件都下推了,当前节点可以删除        if len(cantPushdownChild) > 0 {            node.Children[0] = childID            node.FilterList = cantPushdownChild        } else {            nodeID = childID        }    case plan.Node_JOIN:        var leftPushdown, rightPushdown []*plan.Expr        for i, filter := range filters {            joinSides[i] = getJoinSide(filter, leftTags, rightTags, markTag)            // 是否能将left join转化为inner join            if node.JoinType == plan.Node_LEFT && joinSides[i]&JoinSideRight != 0 && rejectsNull(filter, builder.compCtx.GetProcess()) {                node.JoinType = plan.Node_INNER                break            }        }                node.OnList = splitPlanConjunctions(node.OnList)                if node.JoinType == plan.Node_INNER {            // 如果能转化为inner join,重新计算joinSide            for _, cond := range node.OnList {                // 收集ON子句的过滤条件                filters = append(filters, splitPlanConjunction(cond)...)            }            node.OnList = nil                    joinSides = make([]int8, len(filters))            for i, filter := range filters {                joinSides[i] = getJoinSide(filter, leftTags, rightTags, markTag)            }        } else if node.JoinType == plan.Node_LEFT {            // 如果不能转化为inner join, 收集能下推到右表的过滤条件            var newOnList []*plan.Expr            for _, cond := range node.OnList {                conj := splitPlanConjunction(cond)                for _, conjElem := range conj {                    side := getJoinSide(conjElem, leftTags, rightTags, markTag)                    if side&JoinSideLeft == 0 {                        rightPushdown = append(rightPushdown, conjElem)                    } else {                        newOnList = append(newOnList, conjElem)                    }                }            }            node.OnList = newOnList        }                // 分别收集能下推到左表右表的条件        for i, filter := range filters {            switch joinSides[i] {            case JoinSideLeft:                if node.JoinType != plan.Node_OUTER {                    leftPushdown = append(leftPushdown, filter)                } else {                    cantPushdown = append(cantPushdown, filter)                }            case JoinSideRight:                if node.JoinType == plan.Node_INNER {                    rightPushdown = append(rightPushdown, filter)                } else {                    cantPushdown = append(cantPushdown, filter)                }            // ... case XXXX                default:                cantPushdown = append(cantPushdown, filter)            }        }                // 下推到左子节点        childID, cantPushdownChild := pushdownFilters(node.Children[0], leftPushdown)        // 如果有不能下推的条件,左子节点增加一个FILTER节点        if len(cantPushdownChild) > 0 {            childID = builder.appendNode(&plan.Node{                NodeType:   plan.Node_FILTER,                Children:   []int32{node.Children[0]},                FilterList: cantPushdownChild,            }, nil)        }        node.Children[0] = childID        // 下推到右子节点,处理同上        childID, cantPushdownChild = pushdownFilters(node.Children[1], rightPushdown)        if len(cantPushdownChild) > 0 {            childID = builder.appendNode(&plan.Node{                NodeType:   plan.Node_FILTER,                Children:   []int32{node.Children[1]},                FilterList: cantPushdownChild,            }, nil)        }        node.Children[1] = childID    }
case plan.Node_PROJECT: case plan.Node_AGG: case plan.Node_WINDOW: case plan.Node_TABLE_SCAN, plan.Node_EXTERNAL_SCAN: case plan.Node_FUNCTION_SCAN: ... }
复制代码

带 limit 的语句不能将谓词下推,因为先做 limit 再做 filter,和先做 filter 再做 limit 的结果是不一样的。

然后根据节点类型做相应处理,这里看下常见的 FILTER 节点和 JOIN 节点:

FILTER 节点

FILTER 类型是最简单的情形,只需要将当前节点的过滤条件与上层传来的合并,尝试下推到子节点:如果所有条件都能下推,那当前节点是个没有过滤条件的 FILTER 节点,可以直接删除;如果有条件不能下推,那就在当前节点处理。

JOIN 节点

首先介绍一个将 left join 简化为 inner join 的小优化,left join 的结果包含左表的所有列,对于没有匹配的右表列,用 NULL 值填充;如果下推的过滤条件会将包含右表列中包含 NULL 值的行过滤掉,那这个 left join 就可以等价转化为 inner join。

这个转化的好处是,对于 inner join,ON 子句的过滤条件如果只涉及左表或者右表的列,可以将该条件下推到对应的左右子节点。

谓词下推 JOIN 节点先尝试将 left join 转化为 inner join,然后根据 join side 分别收集下推到左右表的过滤条件,递归下推到左右子节点,对于不能下推到子节点的条件,在子节点上方增加一个 FILTER 节点来处理这些条件。

Part 5 总结

本文从一个零基础学习者的视角,从最常见的 select 语句出发,简单介绍了 MO 内核代码的前端部分,包括 binder,planner 和 optimizer 3 个组件,介绍了常见 sql 子句 bind 以及 plan 生成的过程,基于 rule 的优化器代码框架以及谓词下推的优化策略。

本文主要目标还是梳理清楚主流程脉络为主,实际 MO 的源码要复杂的的多,涉及的细节及各类 corner case 非常多,想仔细研究 MO 以及有兴趣深挖细节的同学可以下载源码仔细阅读。


关于 MatrixOne

MatrixOne 是一款基于云原生技术,可同时在公有云和私有云部署的多模数据库。该产品使用存算分离、读写分离、冷热分离的原创技术架构,能够在一套存储和计算系统下同时支持事务、分析、流、时序和向量等多种负载,并能够实时、按需的隔离或共享存储和计算资源。云原生数据库 MatrixOne 能够帮助用户大幅简化日益复杂的 IT 架构,提供极简、极灵活、高性价比和高性能的数据服务。

MatrixOne 企业版和 MatrixOne 云服务自发布以来,已经在互联网、金融、能源、制造、教育、医疗等多个行业得到应用。得益于其独特的架构设计,用户可以降低多达 70%的硬件和运维成本,增加 3-5 倍的开发效率,同时更加灵活的响应市场需求变化和更加高效的抓住创新机会。在相同硬件投入时,MatrixOne 可获得数倍以上的性能提升。

MatrixOne 秉持开源开放、生态共建的理念,核心代码全部开源,全面兼容 MySQL 协议,并与合作伙伴打造了多个端到端解决方案,大幅降低用户的迁移和使用成本,也帮助用户避免了供应商锁定风险。

关键词:超融合数据库、多模数据库、云原生数据库、国产数据库

MatrixOrigin 官网:新一代超融合异构开源数据库-矩阵起源(深圳)信息科技有限公司 MatrixOne

Github 仓库:GitHub - matrixorigin/matrixone: Hyperconverged cloud-edge native database

发布于: 刚刚阅读数: 4
用户头像

MatrixOrigin

关注

还未添加个人签名 2021-12-06 加入

一个以技术创新和用户价值为核心的基础软件技术公司。

评论

发布
暂无评论
快速熟悉 MatrixOne 内核前端_分布式数据库_MatrixOrigin_InfoQ写作社区