简介
接上一回,mongod 在接收到用户的 request 后,到了 ServiceEntryPoint 模块(service_entry_point_common.cpp)之后,会根据 request 的类型创建出不同的 OpRunner 处理 request。那么这个 OpRunner 是什么?
OpRunner
我们先回顾下ServiceEntryPointCommon::handleRequest
是调用hr.makeOpRunner
得到一个 OpRunner,然后再执行OpRunner::run
方法的。以下是 makeOpRunner 的源码
std::unique_ptr<HandleRequest::OpRunner> HandleRequest::makeOpRunner() {
switch (executionContext->op()) {
case dbQuery:
if (!executionContext->nsString().isCommand())
return std::make_unique<QueryOpRunner>(this);
// FALLTHROUGH: it's a query containing a command
case dbMsg:
return std::make_unique<CommandOpRunner>(this);
case dbGetMore:
return std::make_unique<GetMoreOpRunner>(this);
case dbKillCursors:
return std::make_unique<KillCursorsOpRunner>(this);
case dbInsert:
return std::make_unique<InsertOpRunner>(this);
case dbUpdate:
return std::make_unique<UpdateOpRunner>(this);
case dbDelete:
return std::make_unique<DeleteOpRunner>(this);
default:
return std::make_unique<UnsupportedOpRunner>(this);
}
}
复制代码
接下来看下 OpRunner 家族的情况。
可以看到 OpRunner 是一个抽象,主要分 3 大类。
CommandOpRunner
负责处理命令的 OpRunner,例如db.createCollection
命令。当中主要逻辑在 receivedCommands 方法完成。
struct CommandOpRunner : HandleRequest::OpRunner {
using HandleRequest::OpRunner::OpRunner;
Future<DbResponse> run() override {
return receivedCommands(executionContext);
}
};
复制代码
SynchronousOpRunner
增加了 try-catch 逻辑,让子类不需要再单独处理 DBException。其子类分别有:
QueryOpRunner
GetMoreOpRunner
FireAndForgetOpRunner
UnsupportedOpRunner
// Allows wrapping synchronous code in futures without repeating the try-catch block.
struct SynchronousOpRunner : HandleRequest::OpRunner {
using HandleRequest::OpRunner::OpRunner;
virtual DbResponse runSync() = 0;
Future<DbResponse> run() final try { return runSync(); } catch (const DBException& ex) {
return ex.toStatus();
}
};
复制代码
FireAndForgetOpRunner
是 SynchronousOpRunner 的子类,因为 SynchronousOpRunner 会捕获 DBException,而 FireAndForgetOpRunner 会处理成只有NotPrimaryError
会抛出异常,其余情况只做错误记录。其子类分别有:
KillCursorsOpRunner
InsertOpRunner
UpdateOpRunner
DeleteOpRunner
/**
* Fire and forget network operations don't produce a `DbResponse`.
* They override `runAndForget` instead of `run`, and this base
* class provides a `run` that calls it and handles error reporting
* via the `LastError` slot.
*/
struct FireAndForgetOpRunner : SynchronousOpRunner {
using SynchronousOpRunner::SynchronousOpRunner;
virtual void runAndForget() = 0;
DbResponse runSync() final;
};
DbResponse FireAndForgetOpRunner::runSync() {
try {
warnDeprecation(executionContext->client(), networkOpToString(executionContext->op()));
runAndForget();
} catch (const AssertionException& ue) {
LastError::get(executionContext->client()).setLastError(ue.code(), ue.reason());
LOGV2_DEBUG(21969,
3,
"Caught Assertion in {networkOp}, continuing: {error}",
"Assertion in fire-and-forget operation",
"networkOp"_attr = networkOpToString(executionContext->op()),
"error"_attr = redact(ue));
executionContext->currentOp().debug().errInfo = ue.toStatus();
}
// A NotWritablePrimary error can be set either within
// receivedInsert/receivedUpdate/receivedDelete or within the AssertionException handler above.
// Either way, we want to throw an exception here, which will cause the client to be
// disconnected.
if (LastError::get(executionContext->client()).hadNotPrimaryError()) {
notPrimaryLegacyUnackWrites.increment();
uasserted(ErrorCodes::NotWritablePrimary,
str::stream() << "Not-master error while processing '"
<< networkOpToString(executionContext->op()) << "' operation on '"
<< executionContext->nsString() << "' namespace via legacy "
<< "fire-and-forget command execution.");
}
return {};
}
复制代码
细说一下 CommandOpRunner
接下来细说一下 CommandOpRunner 的处理逻辑,回想一下 CommandOpRunner.Run,就只有调用了 receivedCommands 方法。那 receivedCommands 到底做了什么,以及由如何处理命令的呢?
receivedCommands
receivedCommands 的逻辑主要分为 3 步:
Future<DbResponse> receivedCommands(std::shared_ptr<HandleRequest::ExecutionContext> execContext) {
// 设置一个ReplyBuilder处理回复的信息
execContext->setReplyBuilder(
rpc::makeReplyBuilder(rpc::protocolForMessage(execContext->getMessage())));
return parseCommand(execContext)
.then([execContext]() mutable { return executeCommand(std::move(execContext)); })
.onError([execContext](Status status) {
// 省略了代码,这里处理了错误的情况
})
.then([execContext]() mutable { return makeCommandResponse(std::move(execContext)); });
}
复制代码
parseCommand - 解析命令
根据用户 request 中的 message,解析成 OpMsgRequest 对象(这里不太想看,有兴趣的朋友自行阅读吧)。
makeCommandResponse - 创建 DbResponse
根据处理结果,封装成 DbResponse 返回(没错,这里我也不想看)。
executeCommand - 执行命令
这个方法主要做的事情就是:
找到 Command
封装成 ExecCommandDatabase,然后执行 run 方法。这一步骤也可以分为 4 个主要步骤:
_parseCommand - 解析 Command。在初始化 ExecCommandDatabase 的时候会调用。在这一步骤中会将 Command 解析得到一个CommandInvocation
对象。
_initiateCommand - 初始化 Command,在执行 ExecCommandDatabase::run 时候调用。
_commandExec - 执行 Command,同样在执行 ExecCommandDatabase::run 时候调用。
Future<void> executeCommand(std::shared_ptr<HandleRequest::ExecutionContext> execContext) {
auto future =
std::move(present)
.then([execContext]() -> Future<void> {
// 省略了部分逻辑,例如检查command是否存在
Command* c = execContext->getCommand();
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
CurOp::get(opCtx)->setLogicalOp_inlock(c->getLogicalOp());
}
opCtx->setExhaust(
OpMsg::isFlagSet(execContext->getMessage(), OpMsg::kExhaustSupported));
return Status::OK();
})
.then([execContext]() mutable {
// 封装成ExecCommandDatabase,然后调用其run方法
return future_util::makeState<ExecCommandDatabase>(std::move(execContext))
.thenWithState([](auto* runner) { return runner->run(); });
})
.tapError([execContext](Status status) { // 处理错误的情况})
past.emplaceValue();
return future;
}
复制代码
to be continue
评论