写点什么

读 NebulaGraph 源码 | 查询语句 LOOKUP 的一生

作者:NebulaGraph
  • 2023-01-05
    浙江
  • 本文字数:20982 字

    阅读完需:约 69 分钟


本文由社区用户 Milittle 供稿


LOOKUP 是图数据库 NebulaGraph 的一个查询语句。它依赖索引,可以查询点或者边的信息。在本文,我将着重从源码的角度解析一下 LOOKUP 语句的一生是如何度过的。


本文源码阅读基于内核源码的 v3.3.0 版本,详见 GitHub https://github.com/vesoft-inc/nebula/releases/tag/v3.3.0

读源码之前

首先,我们需要明确 NebulaGraph 中 LOOKUP 语句的语法:


LOOKUP ON {<vertex_tag> | <edge_type>}[WHERE <expression> [AND <expression> ...]]YIELD <return_list> [AS <alias>][<clause>];
<return_list> <prop_name> [AS <col_alias>] [, <prop_name> [AS <prop_alias>] ...];
复制代码


  • <vertex_tag> 是 Tag 的类型,比如:数据集 basketballplayer 中的 player 和 team;

  • <edge_type> 是 EdgeType 的类型,比如:数据集 basketballplayer 中的 follow 和 serve;

  • <expression> 是表达式;

  • <return_list> 是返回的列表,比如:id(vertex),这部分内容详细参见 nGQL 的 Schema 函数 nGQL Schema 函数详解

  • <clause> 是子句,可以是 ORDER BYLIMIT 等子句,子句详情参见 子句


这里有个 LOOKUP 使用注意事项:


  1. 如果已经存在点、边,但是没有索引。必须在新建索引后再通过 REBUILD INDEX 重建索引,才能使其生效;

读语句解析原理

为了便于大家理解这里放一张 NebulaGraph 计算层的服务架构:



我们再来看下此次阅读的语句,是一个比较简单的 LOOKUP Sentence。用比较简单的语句来解析 LOOKUP 语句的基本原理,后面可以慢慢扩展条件语句和子句:


// 我们需要分析以下语句LOOKUP ON player YIELD id(vertex);
复制代码

1. 从 Parser 开始

我们先从 Parser 入手分析 LOOKUP Sentence 的组成部分。这里不介绍 lex 词法分析和 yacc 语法分析,感兴趣的小伙伴自己可以了解一下。下面,我们直接上我们关心的部分:


我们打开源码,找到文件 src/parser/parser.yy 文件,里面有所有语句的定义。我们 定位到 LOOKUP Sentence,是这里 https://github.com/Milittle/nebula/blob/90a3107044ce1621c7834a0f36a4eef273ec2f31/src/parser/parser.yy#L2176。下面便是 LOOKUP 语句的定义,你也可以拷贝上面的链接访问 GitHub 查看。来,我们分析分析每个部分:


/// LOOKUP 语句的语法定义
lookup_sentence : KW_LOOKUP KW_ON name_label lookup_where_clause yield_clause { $$ = new LookupSentence($3, $4, $5); } ;
// KW_LOOKUP 是 LOOKUP 的关键字,大小写不敏感的// KW_ON 是 ON 的关键字,大小写不敏感的// name_label 是 LABEL 的定义,也是 strval,简单的说就是字符串// lookup_where_clause 是 WHERE 子句的定义,这个我们后面有机会扩展介绍,也有一个对应的语义定义// yield_clause 这个是 YIELD 输出数据的关键语句,在 v3.x 版本以后,YIELD 子句是必须要指定的,不指定会报语法错误
/// YIELD clause 的语法定义,其实 YIELD clause 用在了很多其他语句中,比如 GO、FIND PATH、GET SUBGRAPH
yield_clause : %empty { $$ = nullptr; } | KW_YIELD yield_columns { if ($2->hasAgg()) { delete($2); throw nebula::GraphParser::syntax_error(@2, "Invalid use of aggregating function in yield clause."); } $$ = new YieldClause($2); } | KW_YIELD KW_DISTINCT yield_columns { if ($3->hasAgg()) { delete($3); throw nebula::GraphParser::syntax_error(@3, "Invalid use of aggregating function in yield clause."); } $$ = new YieldClause($3, true); } ;
// 可以为 empty,但是后面 validator 会进行校验,不指定就会报 Error// KW_YIELD 是 YIELD 的关键字,大小写不敏感// yield_columns 是输出的列信息,也有对应的一个语法定义// KW_DISTINCT 是 distinct 关键字,表示是否去除重复数据的语义,大小写不敏感
// LOOKUP Sentence 就是上面所有的信息组成,都会被构造在这个类里面,也就是 LOOKUP 语句的内容了
复制代码


下面,我们继续从 lookup_sentence 语句的定义往下规约看,可以看到它属于 src/parser/parser.yy:2917: traverse_sentence → src/parser/parser.yy:2936: piped_sentence → src/parser/parser.yy:2942: set_sentence → src/parser/parser.yy:3924: sentence → src/parser/parser.yy:3933: seq_sentence


其实,上面这些你可以暂时忽略,因为这些都是对 sentence 的规约抽象,有些集合语句和管道语句。这里,我想表达的是这些语句一定会映射到 seq_sentence 上的,即,序列语句。你可以把它理解为用分号分隔的复合语句,只不过这里面只包含了一条 lookup_sentence 而已。这样子,就好理解为什么下文在 seq_sentence 寻找入口代码,而不是 lookup_sentence.

2. 从 nGQL 解析看 LOOKUP 语句

第二,从 nGQL 的解析过程继续看 LOOKUP Sentence。其实,刚才已经强调过了,这里解析出来的对象一定是 seq_sentence


/// src/graph/service/QueryInstance.cpp
void QueryInstance::execute() { Status status = validateAndOptimize(); // 1. 负责 validate、执行计划生成、执行计划优化等工作 if (!status.ok()) { onError(std::move(status)); return; }
// Sentence is explain query, finish if (!explainOrContinue()) { // 6. 判断是否是 explain 语句。如果是,直接输出执行计划,不做实际物理算子执行 onFinish(); return; }
// The execution engine converts the physical execution plan generated by the Planner into a // series of Executors through the Scheduler to drive the execution of the Executors. scheduler_->schedule() // 7. 实际物理算子调度执行的部分,通过 DAG,对每一个 plan -> executor 的转换执行(后续步骤会进行详解) .thenValue([this](Status s) { if (s.ok()) { this->onFinish(); // 8. 这里是干完了所有物理执行计划,然后开始处理客户端 resp 了 } else { this->onError(std::move(s)); // 9. 这里是上面的过程出错了,需要处理 Error 信息 } }) // 10. 下面是处理一些异常情况,也是走错误分支 .thenError(folly::tag_t<ExecutionError>{}, [this](const ExecutionError &e) { onError(e.status()); }) .thenError(folly::tag_t<std::exception>{}, [this](const std::exception &e) { onError(Status::Error("%s", e.what())); });}
// 这个函数执行的是注释 1 的内容Status QueryInstance::validateAndOptimize() { auto *rctx = qctx()->rctx(); auto &spaceName = rctx->session()->space().name; VLOG(1) << "Parsing query: " << rctx->query(); // Result of parsing, get the parsing tree // 2. 第一步中的语法解析就是这里的解释,对 nGQL 进行词法语法解析,出来的 result 就是 Sentence*,通过我们上面的分析,这里吐出来的就是 seq_sentence 了 auto result = GQLParser(qctx()).parse(rctx->query()); NG_RETURN_IF_ERROR(result); sentence_ = std::move(result).value(); // 3. 这里是做指标的统计。这个可以在 dashboard 里面展示 if (sentence_->kind() == Sentence::Kind::kSequential) { size_t num = static_cast<const SequentialSentences *>(sentence_.get())->numSentences(); stats::StatsManager::addValue(kNumSentences, num); if (FLAGS_enable_space_level_metrics && spaceName != "") { stats::StatsManager::addValue( stats::StatsManager::counterWithLabels(kNumSentences, {{"space", spaceName}}), num); } } else { stats::StatsManager::addValue(kNumSentences); if (FLAGS_enable_space_level_metrics && spaceName != "") { stats::StatsManager::addValue( stats::StatsManager::counterWithLabels(kNumSentences, {{"space", spaceName}})); } }
// Validate the query, if failed, return // 4. 这个是源码校验 nGQL 解析出来的内容是否符合我们的预期,如果不符合预期就报语法错误 // validate 过程还会涉及到执行计划的生成,重点函数 NG_RETURN_IF_ERROR(Validator::validate(sentence_.get(), qctx())); // Optimize the query, and get the execution plan // 5. 对上面生成的执行计划进行 RBO 规则的优化,这个留在后面有机会再介绍 NG_RETURN_IF_ERROR(findBestPlan()); stats::StatsManager::addValue(kOptimizerLatencyUs, *(qctx_->plan()->optimizeTimeInUs())); if (FLAGS_enable_space_level_metrics && spaceName != "") { stats::StatsManager::addValue( stats::StatsManager::histoWithLabels(kOptimizerLatencyUs, {{"space", spaceName}})); }
return Status::OK();}
复制代码


我们按照上面的注释部分进行讲解,有的比较容易的部分,像注释 1、2、3、5。我们下面重点介绍注释 4 的部分


// src/graph/validator/Validator.cpp
// Entry of validating sentence.// Check session, switch space of validator context, create validators and validate.// static// 1. 参数 sentence 就是刚才我们从语法解析器中拿到的 seq_sentence// 2. 参数 qctx 是我们查询上下文,一个语句进来对应一个查询上下文,这个是在 QueryEngine 里面生成的,感兴趣可以自行阅读一下Status Validator::validate(Sentence* sentence, QueryContext* qctx) { DCHECK(sentence != nullptr); DCHECK(qctx != nullptr);
// Check if space chosen from session. if chosen, add it to context. auto session = qctx->rctx()->session(); if (session->space().id > kInvalidSpaceID) { auto spaceInfo = session->space(); qctx->vctx()->switchToSpace(std::move(spaceInfo)); }
// 3. 既然我们需要校验该 sentence 是否符合我们的预期,则需要根据 sentence 的类型,创建一个 validator,记住目前是 seq_sentence // 所以生成的就是 SequentialValidator,可以直接看下 makeValidator 函数的 switch case auto validator = makeValidator(sentence, qctx); // 4. 调用 validator 进行校验,我们切换到下面的函数中 NG_RETURN_IF_ERROR(validator->validate());
auto root = validator->root(); if (!root) { return Status::SemanticError("Get null plan from sequential validator"); } qctx->plan()->setRoot(root); return Status::OK();}
// 5. 所有子类 validator,调用 validate 方法,进行校验// Validate current sentence.// Check validator context, space, validate, duplicate reference columns,// check permission according to sentence kind and privilege of user.Status Validator::validate() { if (!vctx_) { VLOG(1) << "Validate context was not given."; return Status::SemanticError("Validate context was not given."); }
if (!sentence_) { VLOG(1) << "Sentence was not given"; return Status::SemanticError("Sentence was not given"); }
if (!noSpaceRequired_ && !spaceChosen()) { VLOG(1) << "Space was not chosen."; return Status::SemanticError("Space was not chosen."); }
if (!noSpaceRequired_) { space_ = vctx_->whichSpace(); VLOG(1) << "Space chosen, name: " << space_.spaceDesc.space_name_ref().value() << " id: " << space_.id; }
auto vidType = space_.spaceDesc.vid_type_ref().value().type_ref().value(); vidType_ = SchemaUtil::propTypeToValueType(vidType);
// 6. 调用子类 validateImpl NG_RETURN_IF_ERROR(validateImpl());
// Check for duplicate reference column names in pipe or var statement NG_RETURN_IF_ERROR(checkDuplicateColName());
// Execute after validateImpl because need field from it if (FLAGS_enable_authorize) { NG_RETURN_IF_ERROR(checkPermission()); }
// 7. 这里是生成执行计划调用 NG_RETURN_IF_ERROR(toPlan());
return Status::OK();}
复制代码


讲了这么久了,啥时候到 LOOKUP。只能说快了,因为第一次讲源码,一些上下文信息需要讲清楚,不然大家一看就看得云里雾里了。

3. 深入到 validator

下面,我们要进入 SequentialValidator.cppvalidateImpl() 去一探究竟。


// src/graph/validator/SequentialValidator.cpp
// Validator of sequential sentences which combine multiple sentences, e.g. GO ...; GO ...;// Call validator of sub-sentences.Status SequentialValidator::validateImpl() { Status status; if (sentence_->kind() != Sentence::Kind::kSequential) { return Status::SemanticError( "Sequential validator validates a SequentialSentences, but %ld is " "given.", static_cast<int64_t>(sentence_->kind())); } auto seqSentence = static_cast<SequentialSentences*>(sentence_); auto sentences = seqSentence->sentences();
if (sentences.size() > static_cast<size_t>(FLAGS_max_allowed_statements)) { return Status::SemanticError("The maximum number of statements allowed has been exceeded"); }
DCHECK(!sentences.empty());
// 我们的 StartNode 就是这里创建出来的 seqAstCtx_->startNode = StartNode::make(seqAstCtx_->qctx); // 一般序列语句中会放很多语句,也就是分号分隔的语句,这里我们只有一条语句就是 lookup_sentence // LOOKUP 语句创建出来 LookupValidator,终于看到曙光了 for (auto* sentence : sentences) { auto validator = makeValidator(sentence, qctx_); NG_RETURN_IF_ERROR(validator->validate()); seqAstCtx_->validators.emplace_back(std::move(validator)); }
return Status::OK();}
复制代码

4. 读一读 LookupValidator

终于,看到点 LOOKUP 的影子了,LookupValidator 驾到:


// src/graph/validator/LookupValidator.cpp
// LOOKUP 的 validateImpl 比较简洁,直接对 From Where Yield e分别进行校验
Status LookupValidator::validateImpl() { lookupCtx_ = getContext<LookupContext>();
// 详情请见下面的子函数分析 NG_RETURN_IF_ERROR(validateFrom()); // 此次不涉及,我们先不做分析 NG_RETURN_IF_ERROR(validateWhere()); // 详情请见下面的子函数分析 NG_RETURN_IF_ERROR(validateYield()); return Status::OK();}
// Validate specified schema(tag or edge) from sentenceStatus LookupValidator::validateFrom() { auto spaceId = lookupCtx_->space.id; auto from = sentence()->from(); // 根据 spaceId 和指定的 label_name 查询 Schema auto ret = qctx_->schemaMng()->getSchemaIDByName(spaceId, from); NG_RETURN_IF_ERROR(ret); // 指定的是不是边类型 lookupCtx_->isEdge = ret.value().first; // 指定的 schemaId lookupCtx_->schemaId = ret.value().second; schemaIds_.emplace_back(ret.value().second); return Status::OK();}
// Validate yield clause.Status LookupValidator::validateYield() { auto yieldClause = sentence()->yieldClause(); if (yieldClause == nullptr) { return Status::SemanticError("Missing yield clause."); } // 这个是判断是否指定了 distinct 关键字,用于后续生成 dedup lookupCtx_->dedup = yieldClause->isDistinct(); lookupCtx_->yieldExpr = qctx_->objPool()->makeAndAdd<YieldColumns>();
// 如果是边类型,返回的列中,有 src、dst、rank、type if (lookupCtx_->isEdge) { idxReturnCols_.emplace_back(nebula::kSrc); idxReturnCols_.emplace_back(nebula::kDst); idxReturnCols_.emplace_back(nebula::kRank); idxReturnCols_.emplace_back(nebula::kType); // 校验边类型 NG_RETURN_IF_ERROR(validateYieldEdge()); } else { // 如果点类型、返回的列中有 vid idxReturnCols_.emplace_back(nebula::kVid); // 校验点类型,这次我们介绍点类型的校验 NG_RETURN_IF_ERROR(validateYieldTag()); } if (exprProps_.hasInputVarProperty()) { return Status::SemanticError("unsupport input/variable property expression in yield."); } if (exprProps_.hasSrcDstTagProperty()) { return Status::SemanticError("unsupport src/dst property expression in yield."); } extractExprProps(); return Status::OK();}
// Validate yield clause when lookup on tag.// Disable invalid expressions, check schema name, rewrites expression to fit semantic,// check type and collect properties.Status LookupValidator::validateYieldTag() { auto yield = sentence()->yieldClause(); auto yieldExpr = lookupCtx_->yieldExpr; // yield 子句里面的每一个逗号分隔的就是一个 col、我们的示例语句是 id(vertex) // src/parser/parser.yy:1559 对 col 进行了定义 for (auto col : yield->columns()) { // 如果发现表达式有 Edge 类型的,则直接把语义错误 if (ExpressionUtils::hasAny(col->expr(), {Expression::Kind::kEdge})) { return Status::SemanticError("illegal yield clauses `%s'", col->toString().c_str()); } // 如果是 label 属性,则进行表达式名字的校验,比如 yield player.name 这种语句 if (col->expr()->kind() == Expression::Kind::kLabelAttribute) { const auto& schemaName = static_cast<LabelAttributeExpression*>(col->expr())->left()->name(); if (schemaName != sentence()->from()) { return Status::SemanticError("Schema name error: %s", schemaName.c_str()); } } // 这块应该是重写表达式,有 label 属性转换为 Tag 的 prop,这里不是特别清楚,后续精读一下 col->setExpr(ExpressionUtils::rewriteLabelAttr2TagProp(col->expr())); NG_RETURN_IF_ERROR(ValidateUtil::invalidLabelIdentifiers(col->expr()));
auto colExpr = col->expr(); // 推测表达式的类型 auto typeStatus = deduceExprType(colExpr); NG_RETURN_IF_ERROR(typeStatus); // 组织输出,由名字和类型组成的集合对象 outputs_.emplace_back(col->name(), typeStatus.value()); yieldExpr->addColumn(col->clone().release()); NG_RETURN_IF_ERROR(deduceProps(colExpr, exprProps_, &schemaIds_)); } return Status::OK();}
复制代码


到这里,LOOKUP 的 validator 工作差不多完事了。

5. 语句如何变成执行计划

介绍得不够细致,我还在熟悉过程,接下来就是介绍将 sentence 转换成执行计划的过程了。

执行计划生成

执行计划的生成,像是一些简单的语句,就通过子类的 validatortoPlan 直接生成了,比如:SHOW HOSTS 这个语句,就是直接在 ShowHostsValidator::toPlan 方法中直接生成执行计划。但是,对于一些比较复杂的语句来说,子类 validator 都没有实现 toPlan 方法,也就是需要借助父类的 toPlan 方法来生成执行计划。比如,本文在读的 LOOKUP 语句也属于复杂语句:


// src/graph/validator/Validator.cpp
// 这里就是复杂语句生成执行计划的入口// 需要配合 AstContext 来生成,对于 LOOKUP 语句来说,就是 LookupContext// Call planner to get final execution plan.Status Validator::toPlan() { // **去子类 LookupValidator 的 getAstContext() 方法看下,是不是返回的是 LookupContext** auto* astCtx = getAstContext(); if (astCtx != nullptr) { astCtx->space = space_; } // 利用抽象语法树上下文,借用 Planner 的 toPlan 生成具体的执行计划 auto subPlanStatus = Planner::toPlan(astCtx); NG_RETURN_IF_ERROR(subPlanStatus); auto subPlan = std::move(subPlanStatus).value(); // 将返回的 subPlan 对 root 和 tail 进行填充 root_ = subPlan.root; tail_ = subPlan.tail; VLOG(1) << "root: " << root_->kind() << " tail: " << tail_->kind(); return Status::OK();}
复制代码

6. 进入 toPlan() 一探究竟

从章节 5. 上面获知,需要进入 Planner 的 toPlan 方法一探究竟


// src/graph/planner/Planner.cpp
StatusOr<SubPlan> Planner::toPlan(AstContext* astCtx) { if (astCtx == nullptr) { return Status::Error("AstContext nullptr."); } const auto* sentence = astCtx->sentence; DCHECK(sentence != nullptr); // 从抽象语法树的执行上下文取到我们的 sentence // 下面的 plannerMap 是我们在 src/graph/planner/PlannersRegister.cpp 注册好的,一些复杂的语句都在这里注册好了 auto planners = plannersMap().find(sentence->kind()); if (planners == plannersMap().end()) { return Status::Error("No planners for sentence: %s", sentence->toString().c_str()); } for (auto& planner : planners->second) { // second 是语句具体对应的 planner 的实例化对象: MatchAndInstantiate if (planner.match(astCtx)) { // match 方法是具体 planner 的 match 方法,对应到 LookupPlaner,就是 match // 这里的 instantiate 是 LookupPlanner 的 make 方法 // 这里的 transform 是拿着 lookupcontext 生成执行计划的函数 return planner.instantiate()->transform(astCtx); } } return Status::Error("No planner matches sentence: %s", sentence->toString().c_str());}
复制代码

7. 计划中的 transform()

我们分析到这里,使用了 Planner 的 toPlan 方法生成一些复杂语句的执行计划。接下来,就是进去 LookupPlanner 的 transform 方法从 LookupContext 转换到执行计划的过程了。我们直接定位到 LookupPlanner 的 transform 方法上:


// src/graph/planner/ngql/LookupPlanner.cpp
StatusOr<SubPlan> LookupPlanner::transform(AstContext* astCtx) { // 是不是我们上面提到的 lookupContext auto lookupCtx = static_cast<LookupContext*>(astCtx); auto qctx = lookupCtx->qctx; // ON 后面的 name_label auto from = static_cast<const LookupSentence*>(lookupCtx->sentence)->from(); SubPlan plan; // 如果是边的话,生成的是 EdgeIndexFullScan if (lookupCtx->isEdge) { auto* edgeIndexFullScan = EdgeIndexFullScan::make(qctx, nullptr, from, lookupCtx->space.id, {}, lookupCtx->idxReturnCols, lookupCtx->schemaId, lookupCtx->isEmptyResultSet); edgeIndexFullScan->setYieldColumns(lookupCtx->yieldExpr); plan.tail = edgeIndexFullScan; plan.root = edgeIndexFullScan; } else { // 如果是点的话,生成的是 TagIndexFullScan auto* tagIndexFullScan = TagIndexFullScan::make(qctx, nullptr, from, lookupCtx->space.id, {}, lookupCtx->idxReturnCols, lookupCtx->schemaId, lookupCtx->isEmptyResultSet); tagIndexFullScan->setYieldColumns(lookupCtx->yieldExpr); plan.tail = tagIndexFullScan; plan.root = tagIndexFullScan; } plan.tail->setColNames(lookupCtx->idxColNames);
// 我们没有指定 where 语句,所以不会有 filter 算子 if (lookupCtx->filter) { plan.root = Filter::make(qctx, plan.root, lookupCtx->filter); } // 会有 Project 算子生成:对输出列做一个映射 plan.root = Project::make(qctx, plan.root, lookupCtx->yieldExpr); // 这里是 distinct 关键字,我们没有指定,默认是没有这个算子的 if (lookupCtx->dedup) { plan.root = Dedup::make(qctx, plan.root); }
return plan;}
复制代码

8. explain 验证生成的执行计划

通过我们上述的介绍,执行计划已经生成了。那么,我们是不是可以通过 explain 或者 profile 来验证我们分析生成的执行计划就是 Project→TagIndexFullScan→Start 呢。下面是我们通过 explain 生成的执行计划,它验证了我们分析的源码和生成的执行计划是一致的。 大喜😊


(root@nebula) [basketballplayer]> explain lookup on player yield id(vertex)Execution succeeded (time spent 615µs/1.057064ms)
Execution Plan (optimize time 42 us)
-----+------------------+--------------+----------------+-----------------------------------| id | name | dependencies | profiling data | operator info |-----+------------------+--------------+----------------+-----------------------------------| 2 | Project | 3 | | outputVar: { || | | | | "colNames": [ || | | | | "id(VERTEX)" || | | | | ], || | | | | "type": "DATASET", || | | | | "name": "__Project_2" || | | | | } || | | | | inputVar: __TagIndexFullScan_1 || | | | | columns: [ || | | | | "id(VERTEX)" || | | | | ] |-----+------------------+--------------+----------------+-----------------------------------| 3 | TagIndexFullScan | 0 | | outputVar: { || | | | | "colNames": [ || | | | | "_vid", || | | | | "player._tag", || | | | | "player.age", || | | | | "player.name" || | | | | ], || | | | | "type": "DATASET", || | | | | "name": "__TagIndexFullScan_1" || | | | | } || | | | | inputVar: || | | | | space: 6 || | | | | dedup: false || | | | | limit: 9223372036854775807 || | | | | filter: || | | | | orderBy: [] || | | | | schemaId: 7 || | | | | isEdge: false || | | | | returnCols: [ || | | | | "_vid", || | | | | "_tag", || | | | | "age", || | | | | "name" || | | | | ] || | | | | indexCtx: [ || | | | | { || | | | | "columnHints": [], || | | | | "filter": "", || | | | | "index_id": 11 || | | | | } || | | | | ] |-----+------------------+--------------+----------------+-----------------------------------| 0 | Start | | | outputVar: { || | | | | "colNames": [], || | | | | "type": "DATASET", || | | | | "name": "__Start_0" || | | | | } |-----+------------------+--------------+----------------+-----------------------------------
复制代码

阶段小结

源码阅读到这里,我们知道 Graph 层从一个 nGQL 语句,到生成执行计划的所有过程。当中可能有一些细节没有面面俱到,但是,我们应该整体对代码有了初步了解。

9. 调度执行计划

接下来,我们要了解执行计划是如何被物理执行、Executor 是如何调度执行计划的。目前,我们只涉及到三个物理算子的执行,而且 Start 节点是一个没有实际语义的算子。这里我们仔细分析一下 TagIndexScan 和 Project 算子。


我们需要先回到第二章节的注释 7 那里了。注释 5 我们就不讲了,那里是内核语句 RBO 规则对执行计划进行优化的子模块,我们的简单语句的执行计划不涉及这块,留下后续扩展介绍吧。


// src/graph/scheduler/AsyncMsgNotifyBasedScheduler.cpp// 我们回到了注释 7 那里,对 scheduler_ 的 shcedule 方法解读一下// 然后我们再看 LOOKUP 语句的两个物理算子在这里是怎么执行的// 目前内核只实现了基于消息的异步调度器folly::Future<Status> AsyncMsgNotifyBasedScheduler::schedule() {  // 拿到执行计划的 root 节点,在这次的语句中,就是 Project  auto root = qctx_->plan()->root();  // 这块还没有深入解读过,后续再扩展吧  if (FLAGS_enable_lifetime_optimize) {    // special for root    root->outputVarPtr()->userCount.store(std::numeric_limits<uint64_t>::max(),                                          std::memory_order_relaxed);    analyzeLifetime(root);  }  // 递归将执行计划 convert 到物理执行计划 Executor,也就是 Project->ProjectExecutor, TagindexFullScan->IndexScanExecutor  // 把物理 Executor 的拓扑结构创建出来  //    ProjectExecutor 依赖 IndexScanExecutor IndexScanExecutor 的后继是 ProjectExecutor  //    IndexScanExecutor 依赖 StartExecutor StartExecutor 的后继是 IndexScanExecutor  auto executor = Executor::create(root, qctx_);  // 这里开始 DAG 的物理计划执行  // 调度是基于 folly 的 Promise 和 Future 异步调用展开的  return doSchedule(executor);}
folly::Future<Status> AsyncMsgNotifyBasedScheduler::doSchedule(Executor* root) const { // 这个是按照算子的 id,承诺给别的算子的 promise(你可以理解为谁依赖这个算子,那么就给谁一个 promise) std::unordered_map<int64_t, std::vector<folly::Promise<Status>>> promiseMap; // 这个是当前算子,被谁许诺过的 future,是从 promise 那里或者的结果值。也就是说,如果这个算子依赖了某些算子,只有它们的许诺兑现了(promise set value),这里的 future 才能得到处理 std::unordered_map<int64_t, std::vector<folly::Future<Status>>> futureMap; // 这个 queue 是为了辅助算子生成 promiseMap 和 futureMap 的 std::queue<Executor*> queue; // 这个 queue2 是为结合刚才生成的 promiseMap 和 futureMap 实际进行调度运行的 std::queue<Executor*> queue2; // 算子节点访问标记,避免重复遍历 std::unordered_set<Executor*> visited;
auto* runner = qctx_->rctx()->runner(); // 首先把 root 的 promise 出来,这个对于我们的执行计划中的算子就是 Project folly::Promise<Status> promiseForRoot; auto resultFuture = promiseForRoot.getFuture(); promiseMap[root->id()].emplace_back(std::move(promiseForRoot)); queue.push(root); visited.emplace(root); // 开始 DAG 访问图计算节点,生成每一个节点的 promise 和 future while (!queue.empty()) { auto* exe = queue.front(); queue.pop(); queue2.push(exe);
std::vector<folly::Future<Status>>& futures = futureMap[exe->id()]; if (exe->node()->kind() == PlanNode::Kind::kArgument) { auto nodeInputVar = exe->node()->inputVar(); const auto& writtenBy = qctx_->symTable()->getVar(nodeInputVar)->writtenBy; for (auto& node : writtenBy) { folly::Promise<Status> p; futures.emplace_back(p.getFuture()); auto& promises = promiseMap[node->id()]; promises.emplace_back(std::move(p)); } } else { for (auto* dep : exe->depends()) { auto notVisited = visited.emplace(dep).second; if (notVisited) { queue.push(dep); } folly::Promise<Status> p; futures.emplace_back(p.getFuture()); auto& promises = promiseMap[dep->id()]; promises.emplace_back(std::move(p)); } } } // 开始调度执行,下面的 scheduleExecutor 这个方法是关键 // 这个方法是纯异步运行的,比如运行 ProjectExecutor,它的依赖是 IndexScanExecutor // 那么 ProjectExecutor 的 future 就来自于 IndexScanExecutor 的 promise // ProjectExecutor 需要在 folly::collect 出等待 IndexScanExecutor 的执行结束 // 这样 ProjectExecutor 才可以得到执行的机会 while (!queue2.empty()) { auto* exe = queue2.front(); queue2.pop();
auto currentFuturesFound = futureMap.find(exe->id()); DCHECK(currentFuturesFound != futureMap.end()); auto currentExeFutures = std::move(currentFuturesFound->second);
auto currentPromisesFound = promiseMap.find(exe->id()); DCHECK(currentPromisesFound != promiseMap.end()); auto currentExePromises = std::move(currentPromisesFound->second);
scheduleExecutor(std::move(currentExeFutures), exe, runner) .thenTry([this, pros = std::move(currentExePromises)](auto&& t) mutable { if (t.hasException()) { notifyError(pros, Status::Error(std::move(t).exception().what())); } else { auto v = std::move(t).value(); if (v.ok()) { notifyOK(pros); // **Promise填充:成功以后具体填充promise的地方** } else { notifyError(pros, v); } } }); }
return resultFuture;}
// 你可以把这个函数理解为异步调度器,上面把所有的算子通过这个函数进行了调度// 第一个参数包含了该算子所有的 futures,也就是这个算子依赖算子的 promise 需要执行结束,这里的 futures 才可以获取到结果// 第二个参数是该算子的 Executor// 第三个参数是执行器,你可以理解为线程池
// 根据不同的算子类型,实现不同的分支运行,我们上面的语句是走 default 分支// lookup on player yield id(vertex);语句整体的调度过程// ProjectExecutor(P)->IndexScanExecutor(I)->Start(S)执行计划。下面我们用简写来表示三个算子// 首先 P 算子调度以后,它到了 default 分支,depends 不为空,那么走 runExecutor// P 算子的 future 就来自于 I 算子的 promise,所以需要等待 I 算子的执行结束// I 算子调度到这个函数以后,它到了 default 分支,depends 不为空,那么走 runExecutor// I 算子的 future 就来自于 S 算子的 promise,所以需要等待 S 算子的执行结束// S 算子调度到这个函数以后,它到了 default 分支,depends 为空,那么走 runLeafExecutor// S 算子就开始 execute 的逻辑了,可以去看看 StartExecutor 的 executor 方法,啥也没干,所以之前说 start 算子没啥语义// S 算子结束以后,它的 promise 被填充,其实是上面那个函数的回调填充的,具体看我上面的注释 **Promise 填充**// 那么 I 算子的 future 就得到了响应,去 runExecutor 看看,是不是也是有一个回调,立马发起了 I 算子的调用// 当 I 算子的 promise 也被上面的函数填充// 那么 P 算子的 executor 也得到了执行,这下就算执行完folly::Future<Status> AsyncMsgNotifyBasedScheduler::scheduleExecutor( std::vector<folly::Future<Status>>&& futures, Executor* exe, folly::Executor* runner) const { switch (exe->node()->kind()) { case PlanNode::Kind::kSelect: { auto select = static_cast<SelectExecutor*>(exe); return runSelect(std::move(futures), select, runner); } case PlanNode::Kind::kLoop: { auto loop = static_cast<LoopExecutor*>(exe); return runLoop(std::move(futures), loop, runner); } case PlanNode::Kind::kArgument: { return runExecutor(std::move(futures), exe, runner); } default: { if (exe->depends().empty()) { return runLeafExecutor(exe, runner); } else { return runExecutor(std::move(futures), exe, runner); } } }}
复制代码

10. LOOKUP 语句的算子在执行什么?

上面我介绍了物理算子通过 folly 三方库的 Promise 和 Future 异步编程模型来实现调度执行。接下来,重点介绍一下我们本次 LOOKUP 语句中两个算子执行了什么。源码走起:上面的语句主要介绍了三个物理算子:ProjectExecutorIndexScanExecutorStartExecutor。这里多说一句,因为和 IndexScan 有关的算子都会映射到 IndexScanExecutor


// StartExecutor:啥也没干
// IndexScanExecutor:是主要干活的,需要 graph 和 storage 的 rpc,拉取数据
// ProjectExecutor:这个物理执行算子不需要和 storage 交互,直接在 graph 层闭环计算
// 这三个算子,我们只分析后两个算子的源码:
// src/graph/executor/query/IndexScanExecutor.cpp
folly::Future<Status> IndexScanExecutor::execute() { return indexScan();}
folly::Future<Status> IndexScanExecutor::indexScan() { // 拿到和 storage 交互的 storageClient StorageClient *storageClient = qctx_->getStorageClient(); auto *lookup = asNode<IndexScan>(node()); if (lookup->isEmptyResultSet()) { DataSet dataSet({"dummy"}); return finish(ResultBuilder().value(Value(std::move(dataSet))).build()); }
const auto &ictxs = lookup->queryContext(); auto iter = std::find_if( ictxs.begin(), ictxs.end(), [](auto &ictx) { return !ictx.index_id_ref().is_set(); }); if (ictxs.empty() || iter != ictxs.end()) { return Status::Error("There is no index to use at runtime"); } // Req 的公共请求参数 StorageClient::CommonRequestParam param(lookup->space(), qctx()->rctx()->session()->id(), qctx()->plan()->id(), qctx()->plan()->isProfileEnabled()); return storageClient ->lookupIndex(param, ictxs, lookup->isEdge(), // 是不是边类型 lookup->schemaId(), // schemaId lookup->returnColumns(), // resp 返回的列数据 lookup->orderBy(), // 是否带有 orderBy,为了下推 TopN 算子 lookup->limit(qctx_)) // 是否带有 limit,为了下推 limit 算子 .via(runner()) .thenValue([this](StorageRpcResponse<LookupIndexResp> &&rpcResp) { addStats(rpcResp, otherStats_); return handleResp(std::move(rpcResp)); });}
// TODO(shylock) merge the handler with GetProptemplate <typename Resp>Status IndexScanExecutor::handleResp(storage::StorageRpcResponse<Resp> &&rpcResp) { auto completeness = handleCompleteness(rpcResp, FLAGS_accept_partial_success); if (!completeness.ok()) { return std::move(completeness).status(); } auto state = std::move(completeness).value(); nebula::DataSet v; // 把每一个 resp 拉出来处理,因为我们 storage 是可以分布式部署的 // 这里有一个问题重点提出一下,结果集会维护在 ectx_ 中,供 ProjectExecutor 一会取 for (auto &resp : rpcResp.responses()) { if (resp.data_ref().has_value()) { nebula::DataSet &data = *resp.data_ref(); // TODO: convert the column name to alias. if (v.colNames.empty()) { v.colNames = data.colNames; } v.rows.insert(v.rows.end(), data.rows.begin(), data.rows.end()); } else { state = Result::State::kPartialSuccess; } } if (!node()->colNames().empty()) { DCHECK_EQ(node()->colNames().size(), v.colNames.size()); v.colNames = node()->colNames(); } return finish( ResultBuilder().value(std::move(v)).iter(Iterator::Kind::kProp).state(state).build());}
// src/graph/executor/query/ProjectExecutor.cpp
folly::Future<Status> ProjectExecutor::execute() { SCOPED_TIMER(&execTime_); auto *project = asNode<Project>(node()); // 刚才说从 storage 获取的结果数据都放在 ectx_ 里面了 auto iter = ectx_->getResult(project->inputVar()).iter(); DCHECK(!!iter); QueryExpressionContext ctx(ectx_);
// 默认 max_job_size 是 1,我们先看 if 分支,看 handleJob 到底干了啥 if (FLAGS_max_job_size <= 1) { auto ds = handleJob(0, iter->size(), iter.get()); return finish(ResultBuilder().value(Value(std::move(ds))).build()); } else { DataSet ds; ds.colNames = project->colNames(); ds.rows.reserve(iter->size());
auto scatter = [this](size_t begin, size_t end, Iterator *tmpIter) -> StatusOr<DataSet> { return handleJob(begin, end, tmpIter); };
auto gather = [this, result = std::move(ds)](auto &&results) mutable { for (auto &r : results) { auto &&rows = std::move(r).value(); result.rows.insert(result.rows.end(), std::make_move_iterator(rows.begin()), std::make_move_iterator(rows.end())); } finish(ResultBuilder().value(Value(std::move(result))).build()); return Status::OK(); };
return runMultiJobs(std::move(scatter), std::move(gather), iter.get()); }}
DataSet ProjectExecutor::handleJob(size_t begin, size_t end, Iterator *iter) { auto *project = asNode<Project>(node()); auto columns = project->columns()->clone(); DataSet ds; ds.colNames = project->colNames(); QueryExpressionContext ctx(qctx()->ectx()); ds.rows.reserve(end - begin); // 从头到尾遍历数据,去除关心的数据 for (; iter->valid() && begin++ < end; iter->next()) { Row row; for (auto &col : columns->columns()) { Value val = col->expr()->eval(ctx(iter)); // 这个是表达式的 eval 执行,对于我们 id(vertex) 对应的是:src/common/function/FunctionManager.cpp:1832 auto &attr = functions_["id"]; row.values.emplace_back(std::move(val)); // 这个对于 id(vertex) 的 val 来说,就是 vertex.id ds.rows.emplace_back(std::move(row)); } return ds;}
复制代码

11. 数据结果显示

我们通过物理执行算子,把数据放在最后一个算子的 ProjectExecutor 的 ectx_(ExecutionContext) 里面了。我们接下来就是要知道,哪个流程把这个执行上下文的数据取走了:给客户端的 resp 填充这些数据,最终显示到我们的 nebula-console,或者其他客户端中。Its time to go back to 章节 2. 的注释 8:


// 请看第二步的注释 8:this->onFinish(); // 8. 这里是干完了所有物理执行计划,然后开始处理客户端 resp 了
// 我们进到 onFinish 函数看下:void QueryInstance::onFinish() { auto rctx = qctx()->rctx(); VLOG(1) << "Finish query: " << rctx->query(); auto &spaceName = rctx->session()->space().name; rctx->resp().spaceName = std::make_unique<std::string>(spaceName); // 这个函数做了填充结果数据到 resp 中 fillRespData(&rctx->resp());
auto latency = rctx->duration().elapsedInUSec(); rctx->resp().latencyInUs = latency; addSlowQueryStats(latency, spaceName); rctx->finish();
rctx->session()->deleteQuery(qctx_.get()); // The `QueryInstance' is the root node holding all resources during the // execution. When the whole query process is done, it's safe to release this // object, as long as no other contexts have chances to access these resources // later on, e.g. previously launched uncompleted async sub-tasks, EVEN on // failures. delete this;}
// 把执行的数据从 ectx 中取出,然后填充到执行 resp 中,这次语句执行就结束了// Get result from query context and fill the responsevoid QueryInstance::fillRespData(ExecutionResponse *resp) { auto ectx = DCHECK_NOTNULL(qctx_->ectx()); auto plan = DCHECK_NOTNULL(qctx_->plan()); const auto &name = plan->root()->outputVar(); if (!ectx->exist(name)) return;
auto &&value = ectx->moveValue(name); if (!value.isDataSet()) return;
// Fill dataset auto result = value.moveDataSet(); if (!result.colNames.empty()) { // 结果填充 resp->data = std::make_unique<DataSet>(std::move(result)); } else { // 如果有错误,错误码和错误信息 resp->errorCode = ErrorCode::E_EXECUTION_ERROR; resp->errorMsg = std::make_unique<std::string>("Internal error: empty column name list"); LOG(ERROR) << "Empty column name list"; }}
复制代码

小结

目前为止,我们把 LOOKUP 是怎么在内核中执行的一生的源码解读就做完了。有很多细节没有展开,后续的文章中我们将不断展开。其实,对于任意一个语句,基本执行的流程和 LOOKUP 的一生都类似,其中有不同的地方就是额外的算子不同,算子之间处理的逻辑不同。而且,这次我们没有打开 Storage 服务的代码,可以作为一个遗留项。


祝大家都可以在 NebulaGraph 图数据库的源码世界里面翱翔,欢迎大家和我来进行交流,学习 Wey Gu 的方式,给大家留一个微信联系方式:echo TWlsaXR0bGVUaW1l | base64 -d Call me.




谢谢你读完本文 (///▽///)


要来近距离体验一把图数据库吗?现在可以用用 NebulaGraph Cloud 来搭建自己的图数据系统哟,快来节省大量的部署安装时间来搞定业务吧~ NebulaGraph 阿里云计算巢现 30 天免费使用中,点击链接来用用图数据库吧~


想看源码的小伙伴可以前往 GitHub 阅读、使用、(^з^)-☆ star 它 -> GitHub;和其他的 NebulaGraph 用户一起交流图数据库技术和应用技能,留下「你的名片」一起玩耍呢~

发布于: 刚刚阅读数: 4
用户头像

NebulaGraph

关注

一款开源的分布式图数据库 2020-04-28 加入

Follow me, here is my GitHub profile: https://github.com/vesoft-inc/nebula

评论

发布
暂无评论
读 NebulaGraph源码 | 查询语句 LOOKUP 的一生_图数据库_NebulaGraph_InfoQ写作社区