写点什么

MongoDB 源码学习:mongod 如何处理请求

作者:云里有只猫
  • 2022-11-01
    广东
  • 本文字数:8037 字

    阅读完需:约 26 分钟

MongoDB源码学习:mongod如何处理请求

简介

通过 MongoDB 的源码学习,了解更多细节。本篇主要关注 MongoDB 从接收请求到执行之间的过程(其实就是突然脑抽筋想写点东西)。

流程:从接收请求到执行命令

Talk is cheap, show you the picture !



对象源码浅析

mongod_main

作用是根据配置初始化其他模块对象并且启动服务。


源码文件 src/mongo/db/mongod_main.cpp


int mongod_main(int argc, char* argv[]) {    // 生成ServiceEntryPoint,注册到ServiceContext中    service->setServiceEntryPoint(std::make_unique<ServiceEntryPointMongod>(service));        // 生成TransportLayer并且注册到ServiceContext中    auto tl =            transport::TransportLayerManager::createWithConfig(&serverGlobalParams, serviceContext);    auto res = tl->setup();    serviceContext->setTransportLayer(std::move(tl));        // 启动TransportLayer和ServiceEntryPoint    auto start = serviceContext->getServiceEntryPoint()->start();    start = serviceContext->getTransportLayer()->start();}
复制代码

ServiceContext

上下文对象,mongod_main 将初始化完成的模块都注册到 ServiceContext 中,并且将这个 ServiceContext 传递到各个模块,这样就实现了不同模块之间的互相调用。


主要源码文件src/mongo/db/service_context.h


class ServiceContext final : public Decorable<ServiceContext>     public:    void setServiceEntryPoint(std::unique_ptr<ServiceEntryPoint> sep);    void setTransportLayer(std::unique_ptr<transport::TransportLayer> tl);        transport::TransportLayer* getTransportLayer() const;    ServiceEntryPoint* getServiceEntryPoint() const;

private: SyncUnique<transport::TransportLayer> _transportLayer; SyncUnique<ServiceEntryPoint> _serviceEntryPoint;
}
复制代码

传输层 TransportLayer

TransportLayer 负责将 Message 从 Endpoint 传递到 database。


TransportLayer 维护着 acceptor 用于接收请求,创建一个对应的 session 传递给 database(当中经过 ServiceEntryPoint)。database 需要管理 session,使其完成读取 Message、处理 Message、回复 Message 三个步骤组成的生命周期。


主要源码文件 src/mongo/transport/transport_layer.h


class TransportLayer {public:    // 开启监听连接之前的准备操作    virtual Status setup() = 0;    // 开始监听连接    virtual Status start() = 0;    // 不再监听连接,并且结束所有session    virtual void shutdown() = 0;}
复制代码

TransportLayerManager

TransportLayerManager 实现了 TransportLayer 的接口,内部维护其他 TransportLayer,让 Mongod 和 Mongos 调用其他 TransportLayer 时候更方便


主要源码文件 src/mongo/transport/transport_layer_manager.hsrc/mongo/transport/transport_layer_manager.cpp


// tranport_layer_manager.hclass TransportLayerManager final : public TransportLayer {private:    // 维护其他TransportLayer    std::vector<std::unique_ptr<TransportLayer>> _tls;public:    // 在mongod_main中被调用,根据配置创建TransportLayerManager    static std::unique_ptr<TransportLayer> createWithConfig(const ServerGlobalParams* config, ServiceContext* ctx);}

// tranport_layer_manager.cppstd::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig(const ServerGlobalParams* config, ServiceContext* ctx) { // 因为mongod_main中设置的是ServiceEntryPointMongod,因此这里这里会得到ServiceEntryPointMongod auto sep = ctx->getServiceEntryPoint(); // 创建TransportLayerASIO,并且作为TransportLayerManager管理的第一个TransportLayer retVector.emplace_back(std::make_unique<transport::TransportLayerASIO>(opts, sep)); return std::make_unique<TransportLayerManager>(std::move(retVector));}Status TransportLayerManager::setup() { for (auto&& tl : _tls) { // 调用所有管理的TransportLayer::setup auto status = tl->setup(); }}Status TransportLayerManager::start() { for (auto&& tl : _tls) { // 调用所有管理的TransportLayer::start auto status = tl->start(); }}
复制代码

TransportLayerASIO

TransportLayerASIO 是一个基于 ASIO 封装的 TransportLayer(ASIO 是 C++中用于网络编程的库)。


主要源码文件 src/mongo/transport/transport_layer_asio.cpp


Status TransportLayerASIO::setup() {    // 从配置中获取服务端需要监听的ip、port等信息}Status TransportLayerASIO::start() {    // 开始监听连接    for (auto& acceptor : _acceptors) {        // 调用_acceptConnection等待连接        _acceptConnection(acceptor.second);    }}void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) {    auto acceptCb = [this, &acceptor](const std::error_code& ec, GenericSocket peerSocket) mutable {        try {            // 创建一个Session,然后调用ServiceEntryPoint::startSession处理            std::shared_ptr<ASIOSession> session(                new ASIOSession(this, std::move(peerSocket), true));            _sep->startSession(std::move(session));        } catch (const DBException& e) {}        // 处理完会话或异常之后,重新调用_acceptConnection        _acceptConnection(acceptor);    }    acceptor.async_accept(*_ingressReactor, std::move(acceptCb)); // 设置有连接时候的callback}
复制代码

ServiceEntryPoint

ServiceEntryPoint 负责接收来自 TransportLayer 的 Session,按照获取 Message、处理 Message、回复 Message 三个步骤,不断循环处理 Session 接收到的 Message,直到 Session 结束。


ServiceEntryPointImpl 实现了 ServiceEntryPoint 的 start 和 startSession 方法,mongod_main 中创建的 ServiceEntryPointMongod 是继承了 ServiceEntryPoint。


主要源码文件


  • src/mongo/transport/service_entry_point.h

  • src/mong/transport/service_entry_impl.h

  • src/mong/transport/service_entry_impl.cpp


// service_entry.hclass ServiceEntryPoint {    // 处理一个Session    virtual void startSession(transport::SessionHandle session) = 0;    // 启动ServiceEntryPoint    virtual Status start() = 0;    // 处理一个Message,并且返回DBResponse    virtual Future<DbResponse> handleRequest(OperationContext* opCtx,                                             const Message& request) noexcept = 0;}
// service_entry_impl.hclass ServiceEntryPointImpl : public ServiceEntryPoint {private: using SSMList = std::list<transport::ServiceStateMachine>; // 维护着一个ServiceStateMachine的列表 SSMList _sessions;}
// service_entry_impl.cppStatus ServiceEntryPointImpl::start() { // 创建三种类型的ServiceExecutor并且执行start方法 if (auto status = transport::ServiceExecutorSynchronous::get(_svcCtx)->start(); !status.isOK()) { return status; } if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx)) { if (auto status = exec->start(); !status.isOK()) { return status; } }
if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->start(); !status.isOK()) { return status; }}

void ServiceEntryPointImpl::startSession(transport::SessionHandle session) { // 创建一个client auto client = _svcCtx->makeClient(clientName, session); auto maybeSsmIt = [&]() -> boost::optional<SSMListIterator> { // 创建一个ServiceStateMachine并且添加到_sessions中 auto it = _sessions.emplace(_sessions.begin(), std::move(client)); // 返回这个新创建的ServiceStateMachine return it; } auto ssmIt = *maybeSsmIt; // 调用ServiceStateMachine::start auto seCtx = transport::ServiceExecutorContext{}; seCtx.setThreadingModel(transport::ServiceExecutor::getInitialThreadingModel()); // 决定了使用那个类型的ServiceExecutor seCtx.setCanUseReserved(canOverrideMaxConns); ssmIt->start(std::move(seCtx));}
复制代码

ServiceEntryPointMongod

主要文件 src/mongo/transport/service_entry_point_mongod.cpp


// service_entry_point_mongod.hclass ServiceEntryPointMongod final : public ServiceEntryPointImpl {}
// service_entry_point_mongod.cppFuture<DbResponse> ServiceEntryPointMongod::handleRequest(OperationContext* opCtx, const Message& m) noexcept { // 处理Message使用ServiceEntryPointCommon中的实现,并且注入一系列的Hooks return ServiceEntryPointCommon::handleRequest(opCtx, m, std::make_unique<Hooks>());}
复制代码

ServiceEntryPointCommon

ServiceEntryPointCommon 是一个实现了 ServiceEntryPointImpl 的公共方法集合。另外还定义了 Hooks(本次不涉及)。


主要文件 src/mongo/transport/service_entry_point_common.cpp


Future<DbResponse> ServiceEntryPointCommon::handleRequest(    OperationContext* opCtx,    const Message& m,    std::unique_ptr<const Hooks> behaviors) {    // 根据命令创建opRunner(例如QueryOpRunner),返回DBResponse,本次不介绍细节    HandleRequest hr(opCtx, m, std::move(behaviors));    auto opRunner = hr.makeOpRunner();    return opRunner->run().then([hr = std::move(hr)](DbResponse response) mutable {// ....})}
复制代码

OpRunner

定义在 service_entry_point_common.cpp 中,根据 Message 中不同的操作类型会有创建不同的 OpRunner,例如 QueryOpRunner、InsertOpRunner、UpdateOpRunner 等。


struct OpRunner {        explicit OpRunner(HandleRequest* hr) : executionContext{hr->executionContext}{}    virtual ~OpRunner() = default;    // 执行命令,返回DbResponse    virtual Future<DbResponse> run() = 0;    std::shared_ptr<ExecutionContext> executionContext;};
复制代码

状态机 ServiceStateMachine

ServiceStateMachine(简称 SSM)是一个维护客户端连接的状态生命周期的状态机,每个客户端连接都会生成一个 SSM。


主要文件 src/mongo/transport/service_state_machine.cpp


// service_state_machine.hclass ServiceStateMachine {    // start之后,SSM会按照获取Message、处理Message、回复Message步骤执行。维护client的状态,确保状态流转正确。    void start(ServiceExecutorContext seCtx);}
// service_state_machine.cpp// 定义一个SSM的Implclass ServiceStateMachine::Impl final : public std::enable_shared_from_this<ServiceStateMachine::Impl> {public: // 定义状态机的所有状态,整个生命周期如下 // Source -> SourceWait -> Process -> SinkWait -> Source (standard RPC) // Source -> SourceWait -> Process -> SinkWait -> Process -> SinkWait ... (exhaust) // Source -> SourceWait -> Process -> Source (fire-and-forget) enum class State { Created, // Session创建之后,还没有操作 Source, // 等待新Message SourceWait, // 从请求中读取Message,等待Message接收完成 Process, // 已经通知DB开始执行Message SinkWait, // 等待DB结果通过网络返回客户端 EndSession, // 结束Session,待Session结束后当前SSM会失效 Ended // Session已经结束 }; void start(ServiceExecutorContext seCtx); // 通知DB执行Message Future<void> processMessage(); // 以下两个方法,在TransportLayer完成网络操作之后回调 void sourceCallback(Status status); void sinkCallback(Status status); // 通过TransportLayer读取请求或者返回响应 Future<void> sourceMessage(); Future<void> sinkMessage(); // 通过ServiceExecutor开启循环的调度 void scheduleNewLoop(Status status); // 开始一个新的循环(例如source、process、最后skin) void startNewLoop(const Status& execStatus); // 根据ServiceExecutor ServiceExecutor* executor() { return ServiceExecutorContext::get(_clientStrand->getClientPointer())->getServiceExecutor(); } private: AtomicWord<State> _state{State::Created}; // 创建SSM,默认Created bool _inExhaust = false;}
void ServiceStateMachine::Impl::start(ServiceExecutorContext seCtx) { scheduleNewLoop(Status::OK());}
复制代码

循环-lookup

void ServiceStateMachine::Impl::scheduleNewLoop(Status status) try {    auto cb = [this, anchor = shared_from_this()](Status executorStatus) {        _clientStrand->run([&] { startNewLoop(executorStatus); });    };        try {        // 调用ServiceExecutor的schedule或runOnDataAvailable调度任务执行,执行触发cb回调        if (_inExhaust) {            // If we're in exhaust, we're not expecting more data.            executor()->schedule(std::move(cb));        } else {            executor()->runOnDataAvailable(session(), std::move(cb));        }    } catch (const DBException& ex) {        _state.store(State::EndSession);    }}
void ServiceStateMachine::Impl::startNewLoop(const Status& executorStatus) { makeReadyFutureWith([&]() -> Future<void> { if (_inExhaust) { return Status::OK(); } else { // 读取Message return sourceMessage(); } }) .then([this]() { return processMessage(); // 处理Message }) .then([this]() -> Future<void> { if (_outMessage.empty()) { return Status::OK(); }
return sinkMessage(); // 等待Message回复 }) .getAsync([this, anchor = shared_from_this()](Status status) { scheduleNewLoop(std::move(status)); });}
复制代码

获取 Message-sourceMessage

Future<void> ServiceStateMachine::Impl::sourceMessage() {    // 更新状态为SourceWait    _state.store(State::SourceWait);        auto sourceMsgImpl = [&] {        // 读取Message        if (transportMode == transport::Mode::kSynchronous) {            return Future<Message>::makeReady(session()->sourceMessage());        } else {            return session()->asyncSourceMessage();        }    }    return sourceMsgImpl().onCompletion([this](StatusWith<Message> msg) -> Future<void> {        if (msg.isOK()) {            // 读取完成记录到_inMessage变量中            _inMessage = std::move(msg.getValue());            invariant(!_inMessage.empty());        }        // 触发sourceCallback        sourceCallback(msg.getStatus());        return Status::OK();    });}
void ServiceStateMachine::Impl::sourceCallback(Status status) { // 更新状态为Process _state.store(State::Process);}
复制代码

处理 Message-handleMessage

Future<void> ServiceStateMachine::Impl::processMessage() {    // 调用ServiceEntryPoint::handleRequest处理Message    return _sep->handleRequest(_opCtx.get(), _inMessage).then([this, &compressorMgr = compressorMgr](DbResponse dbresponse) mutable -> void {        Message& toSink = dbresponse.response;        if (!toSink.empty()) {            _outMessage = std::move(toSink);        } else {            // Message已经处理完成,等待下一次Message            _state.store(State::Source);            _inExhaust = false;        }    }}
复制代码

返回响应-replyMessage

Future<void> ServiceStateMachine::Impl::sinkMessage() {    // 更新状态为SinkWait    _state.store(State::SinkWait);    auto toSink = std::exchange(_outMessage, {});            auto sinkMsgImpl = [&] {        // 通过TransportLayer返回response        if (transportMode == transport::Mode::kSynchronous) {            return Future<void>::makeReady(session()->sinkMessage(std::move(toSink)));        } else {            return session()->asyncSinkMessage(std::move(toSink));        }    }        return sinkMsgImpl().onCompletion([this](Status status) {        // 返回响应完成之后,触发sinkCallback        sinkCallback(std::move(status));        return Status::OK();    });}
void ServiceStateMachine::Impl::sinkCallback(Status status) { _state.store(State::EndSession);}
复制代码

任务调度 ServiceExecutor

ServiceExecutor 负责调度并且回调执行 Task,例如 ServiceExecutorSynchronous 实现同步处理 Task。


主要文件 src/mongo/transport/service_executor.cpp


// service_executor.hclass ServiceExecutor : public OutOfLineExecutor {public:    // 执行Task并且马上返回    virtual Status scheduleTask(Task task, ScheduleFlags flags) = 0;            // 对scheduleTask的包装    void schedule(OutOfLineExecutor::Task func) override {        iassert(scheduleTask([task = std::move(func)]() mutable { task(Status::OK()); },                             ScheduleFlags::kEmptyFlags));    }        // 等待session中有可用数据才执行schedule调度    virtual void runOnDataAvailable(const SessionHandle& session,                                    OutOfLineExecutor::Task onCompletionCallback) = 0;
// 根据_threadingModel获取不同的ServiceExecutor ServiceExecutor* getServiceExecutor() noexcept;private: ThreadingModel _threadingModel = ThreadingModel::kDedicated; // 决定使用哪种ServiceExecutor}
复制代码

ServiceExecutorSynchronous

ServiceExecutorSynchronous 每个连接都会有对应的 work 线程负责调度。当被调用 runOnDataAvailable 时候,会创建新线程(如果当前线程超过 CPU 核数,不会创建线程,而是让出当前 CPU 时间)。

ServiceExecutorReserved

ServiceExecutorReserved 根据配置启动 serverGlobalParams.reservedThreads 个线程,并且确保无论任何时候都会有 reservedThreads 个线程处理任务(即使线程处于空闲状态)。

ServiceExecutorFixed

ServiceExecutorFixed 相当于线程池,在不超过配置数量的情况下,新任务会启动新线程调度。

to be continue

发布于: 刚刚阅读数: 5
用户头像

还未添加个人签名 2020-03-31 加入

还未添加个人简介

评论

发布
暂无评论
MongoDB源码学习:mongod如何处理请求_mongodb_云里有只猫_InfoQ写作社区