写点什么

MongoDB 源码学习:Mongo 中的 OpRunner

作者:云里有只猫
  • 2022-11-20
    广东
  • 本文字数:3470 字

    阅读完需:约 11 分钟

MongoDB源码学习:Mongo中的OpRunner

简介

接上一回,mongod 在接收到用户的 request 后,到了 ServiceEntryPoint 模块(service_entry_point_common.cpp)之后,会根据 request 的类型创建出不同的 OpRunner 处理 request。那么这个 OpRunner 是什么?

OpRunner

我们先回顾下ServiceEntryPointCommon::handleRequest是调用hr.makeOpRunner得到一个 OpRunner,然后再执行OpRunner::run方法的。以下是 makeOpRunner 的源码


std::unique_ptr<HandleRequest::OpRunner> HandleRequest::makeOpRunner() {    switch (executionContext->op()) {        case dbQuery:            if (!executionContext->nsString().isCommand())                return std::make_unique<QueryOpRunner>(this);            // FALLTHROUGH: it's a query containing a command        case dbMsg:            return std::make_unique<CommandOpRunner>(this);        case dbGetMore:            return std::make_unique<GetMoreOpRunner>(this);        case dbKillCursors:            return std::make_unique<KillCursorsOpRunner>(this);        case dbInsert:            return std::make_unique<InsertOpRunner>(this);        case dbUpdate:            return std::make_unique<UpdateOpRunner>(this);        case dbDelete:            return std::make_unique<DeleteOpRunner>(this);        default:            return std::make_unique<UnsupportedOpRunner>(this);    }}

复制代码


接下来看下 OpRunner 家族的情况。


可以看到 OpRunner 是一个抽象,主要分 3 大类。

CommandOpRunner

负责处理命令的 OpRunner,例如db.createCollection命令。当中主要逻辑在 receivedCommands 方法完成。


struct CommandOpRunner : HandleRequest::OpRunner {    using HandleRequest::OpRunner::OpRunner;    Future<DbResponse> run() override {        return receivedCommands(executionContext);    }};
复制代码

SynchronousOpRunner

增加了 try-catch 逻辑,让子类不需要再单独处理 DBException。其子类分别有:


  • QueryOpRunner

  • GetMoreOpRunner

  • FireAndForgetOpRunner

  • UnsupportedOpRunner


// Allows wrapping synchronous code in futures without repeating the try-catch block.struct SynchronousOpRunner : HandleRequest::OpRunner {    using HandleRequest::OpRunner::OpRunner;    virtual DbResponse runSync() = 0;    Future<DbResponse> run() final try { return runSync(); } catch (const DBException& ex) {        return ex.toStatus();    }};
复制代码

FireAndForgetOpRunner

是 SynchronousOpRunner 的子类,因为 SynchronousOpRunner 会捕获 DBException,而 FireAndForgetOpRunner 会处理成只有NotPrimaryError会抛出异常,其余情况只做错误记录。其子类分别有:


  • KillCursorsOpRunner

  • InsertOpRunner

  • UpdateOpRunner

  • DeleteOpRunner


/** * Fire and forget network operations don't produce a `DbResponse`. * They override `runAndForget` instead of `run`, and this base * class provides a `run` that calls it and handles error reporting * via the `LastError` slot. */struct FireAndForgetOpRunner : SynchronousOpRunner {    using SynchronousOpRunner::SynchronousOpRunner;    virtual void runAndForget() = 0;    DbResponse runSync() final;};
DbResponse FireAndForgetOpRunner::runSync() { try { warnDeprecation(executionContext->client(), networkOpToString(executionContext->op())); runAndForget(); } catch (const AssertionException& ue) { LastError::get(executionContext->client()).setLastError(ue.code(), ue.reason()); LOGV2_DEBUG(21969, 3, "Caught Assertion in {networkOp}, continuing: {error}", "Assertion in fire-and-forget operation", "networkOp"_attr = networkOpToString(executionContext->op()), "error"_attr = redact(ue)); executionContext->currentOp().debug().errInfo = ue.toStatus(); } // A NotWritablePrimary error can be set either within // receivedInsert/receivedUpdate/receivedDelete or within the AssertionException handler above. // Either way, we want to throw an exception here, which will cause the client to be // disconnected. if (LastError::get(executionContext->client()).hadNotPrimaryError()) { notPrimaryLegacyUnackWrites.increment(); uasserted(ErrorCodes::NotWritablePrimary, str::stream() << "Not-master error while processing '" << networkOpToString(executionContext->op()) << "' operation on '" << executionContext->nsString() << "' namespace via legacy " << "fire-and-forget command execution."); } return {};}
复制代码

细说一下 CommandOpRunner

接下来细说一下 CommandOpRunner 的处理逻辑,回想一下 CommandOpRunner.Run,就只有调用了 receivedCommands 方法。那 receivedCommands 到底做了什么,以及由如何处理命令的呢?

receivedCommands

receivedCommands 的逻辑主要分为 3 步:


  • parseCommand - 解析命令

  • executeCommand - 执行命令

  • makeCommandResponse - 创建 DbResponse


Future<DbResponse> receivedCommands(std::shared_ptr<HandleRequest::ExecutionContext> execContext) {    // 设置一个ReplyBuilder处理回复的信息    execContext->setReplyBuilder(        rpc::makeReplyBuilder(rpc::protocolForMessage(execContext->getMessage())));    return parseCommand(execContext)        .then([execContext]() mutable { return executeCommand(std::move(execContext)); })        .onError([execContext](Status status) {            // 省略了代码,这里处理了错误的情况        })        .then([execContext]() mutable { return makeCommandResponse(std::move(execContext)); });}
复制代码

parseCommand - 解析命令

根据用户 request 中的 message,解析成 OpMsgRequest 对象(这里不太想看,有兴趣的朋友自行阅读吧)。

makeCommandResponse - 创建 DbResponse

根据处理结果,封装成 DbResponse 返回(没错,这里我也不想看)。

executeCommand - 执行命令

这个方法主要做的事情就是:


  • 找到 Command

  • 封装成 ExecCommandDatabase,然后执行 run 方法。这一步骤也可以分为 4 个主要步骤:

  • _parseCommand - 解析 Command。在初始化 ExecCommandDatabase 的时候会调用。在这一步骤中会将 Command 解析得到一个CommandInvocation对象。

  • _initiateCommand - 初始化 Command,在执行 ExecCommandDatabase::run 时候调用。

  • _commandExec - 执行 Command,同样在执行 ExecCommandDatabase::run 时候调用。


Future<void> executeCommand(std::shared_ptr<HandleRequest::ExecutionContext> execContext) {    auto future =        std::move(present)            .then([execContext]() -> Future<void> {                // 省略了部分逻辑,例如检查command是否存在                Command* c = execContext->getCommand();                {                    stdx::lock_guard<Client> lk(*opCtx->getClient());                    CurOp::get(opCtx)->setLogicalOp_inlock(c->getLogicalOp());                }
opCtx->setExhaust( OpMsg::isFlagSet(execContext->getMessage(), OpMsg::kExhaustSupported));
return Status::OK(); }) .then([execContext]() mutable { // 封装成ExecCommandDatabase,然后调用其run方法 return future_util::makeState<ExecCommandDatabase>(std::move(execContext)) .thenWithState([](auto* runner) { return runner->run(); }); }) .tapError([execContext](Status status) { // 处理错误的情况}) past.emplaceValue(); return future;}
复制代码

to be continue

发布于: 2022-11-20阅读数: 22
用户头像

还未添加个人签名 2020-03-31 加入

还未添加个人简介

评论

发布
暂无评论
MongoDB源码学习:Mongo中的OpRunner_mongodb_云里有只猫_InfoQ写作社区