简介
通过 MongoDB 的源码学习,了解更多细节。本篇主要关注 MongoDB 从接收请求到执行之间的过程(其实就是突然脑抽筋想写点东西)。
流程:从接收请求到执行命令
Talk is cheap, show you the picture !
对象源码浅析
mongod_main
作用是根据配置初始化其他模块对象并且启动服务。
源码文件 src/mongo/db/mongod_main.cpp
int mongod_main(int argc, char* argv[]) { // 生成ServiceEntryPoint,注册到ServiceContext中 service->setServiceEntryPoint(std::make_unique<ServiceEntryPointMongod>(service)); // 生成TransportLayer并且注册到ServiceContext中 auto tl = transport::TransportLayerManager::createWithConfig(&serverGlobalParams, serviceContext); auto res = tl->setup(); serviceContext->setTransportLayer(std::move(tl)); // 启动TransportLayer和ServiceEntryPoint auto start = serviceContext->getServiceEntryPoint()->start(); start = serviceContext->getTransportLayer()->start();}
复制代码
ServiceContext
上下文对象,mongod_main 将初始化完成的模块都注册到 ServiceContext 中,并且将这个 ServiceContext 传递到各个模块,这样就实现了不同模块之间的互相调用。
主要源码文件src/mongo/db/service_context.h
class ServiceContext final : public Decorable<ServiceContext> public: void setServiceEntryPoint(std::unique_ptr<ServiceEntryPoint> sep); void setTransportLayer(std::unique_ptr<transport::TransportLayer> tl); transport::TransportLayer* getTransportLayer() const; ServiceEntryPoint* getServiceEntryPoint() const;
private: SyncUnique<transport::TransportLayer> _transportLayer; SyncUnique<ServiceEntryPoint> _serviceEntryPoint;
}
复制代码
传输层 TransportLayer
TransportLayer 负责将 Message 从 Endpoint 传递到 database。
TransportLayer 维护着 acceptor 用于接收请求,创建一个对应的 session 传递给 database(当中经过 ServiceEntryPoint)。database 需要管理 session,使其完成读取 Message、处理 Message、回复 Message 三个步骤组成的生命周期。
主要源码文件 src/mongo/transport/transport_layer.h
class TransportLayer {public: // 开启监听连接之前的准备操作 virtual Status setup() = 0; // 开始监听连接 virtual Status start() = 0; // 不再监听连接,并且结束所有session virtual void shutdown() = 0;}
复制代码
TransportLayerManager
TransportLayerManager 实现了 TransportLayer 的接口,内部维护其他 TransportLayer,让 Mongod 和 Mongos 调用其他 TransportLayer 时候更方便
主要源码文件 src/mongo/transport/transport_layer_manager.h 和 src/mongo/transport/transport_layer_manager.cpp
// tranport_layer_manager.hclass TransportLayerManager final : public TransportLayer {private: // 维护其他TransportLayer std::vector<std::unique_ptr<TransportLayer>> _tls;public: // 在mongod_main中被调用,根据配置创建TransportLayerManager static std::unique_ptr<TransportLayer> createWithConfig(const ServerGlobalParams* config, ServiceContext* ctx);}
// tranport_layer_manager.cppstd::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig(const ServerGlobalParams* config, ServiceContext* ctx) { // 因为mongod_main中设置的是ServiceEntryPointMongod,因此这里这里会得到ServiceEntryPointMongod auto sep = ctx->getServiceEntryPoint(); // 创建TransportLayerASIO,并且作为TransportLayerManager管理的第一个TransportLayer retVector.emplace_back(std::make_unique<transport::TransportLayerASIO>(opts, sep)); return std::make_unique<TransportLayerManager>(std::move(retVector));}Status TransportLayerManager::setup() { for (auto&& tl : _tls) { // 调用所有管理的TransportLayer::setup auto status = tl->setup(); }}Status TransportLayerManager::start() { for (auto&& tl : _tls) { // 调用所有管理的TransportLayer::start auto status = tl->start(); }}
复制代码
TransportLayerASIO
TransportLayerASIO 是一个基于 ASIO 封装的 TransportLayer(ASIO 是 C++中用于网络编程的库)。
主要源码文件 src/mongo/transport/transport_layer_asio.cpp
Status TransportLayerASIO::setup() { // 从配置中获取服务端需要监听的ip、port等信息}Status TransportLayerASIO::start() { // 开始监听连接 for (auto& acceptor : _acceptors) { // 调用_acceptConnection等待连接 _acceptConnection(acceptor.second); }}void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) { auto acceptCb = [this, &acceptor](const std::error_code& ec, GenericSocket peerSocket) mutable { try { // 创建一个Session,然后调用ServiceEntryPoint::startSession处理 std::shared_ptr<ASIOSession> session( new ASIOSession(this, std::move(peerSocket), true)); _sep->startSession(std::move(session)); } catch (const DBException& e) {} // 处理完会话或异常之后,重新调用_acceptConnection _acceptConnection(acceptor); } acceptor.async_accept(*_ingressReactor, std::move(acceptCb)); // 设置有连接时候的callback}
复制代码
ServiceEntryPoint
ServiceEntryPoint 负责接收来自 TransportLayer 的 Session,按照获取 Message、处理 Message、回复 Message 三个步骤,不断循环处理 Session 接收到的 Message,直到 Session 结束。
ServiceEntryPointImpl 实现了 ServiceEntryPoint 的 start 和 startSession 方法,mongod_main 中创建的 ServiceEntryPointMongod 是继承了 ServiceEntryPoint。
主要源码文件
src/mongo/transport/service_entry_point.h
src/mong/transport/service_entry_impl.h
src/mong/transport/service_entry_impl.cpp
// service_entry.hclass ServiceEntryPoint { // 处理一个Session virtual void startSession(transport::SessionHandle session) = 0; // 启动ServiceEntryPoint virtual Status start() = 0; // 处理一个Message,并且返回DBResponse virtual Future<DbResponse> handleRequest(OperationContext* opCtx, const Message& request) noexcept = 0;}
// service_entry_impl.hclass ServiceEntryPointImpl : public ServiceEntryPoint {private: using SSMList = std::list<transport::ServiceStateMachine>; // 维护着一个ServiceStateMachine的列表 SSMList _sessions;}
// service_entry_impl.cppStatus ServiceEntryPointImpl::start() { // 创建三种类型的ServiceExecutor并且执行start方法 if (auto status = transport::ServiceExecutorSynchronous::get(_svcCtx)->start(); !status.isOK()) { return status; } if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx)) { if (auto status = exec->start(); !status.isOK()) { return status; } }
if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->start(); !status.isOK()) { return status; }}
void ServiceEntryPointImpl::startSession(transport::SessionHandle session) { // 创建一个client auto client = _svcCtx->makeClient(clientName, session); auto maybeSsmIt = [&]() -> boost::optional<SSMListIterator> { // 创建一个ServiceStateMachine并且添加到_sessions中 auto it = _sessions.emplace(_sessions.begin(), std::move(client)); // 返回这个新创建的ServiceStateMachine return it; } auto ssmIt = *maybeSsmIt; // 调用ServiceStateMachine::start auto seCtx = transport::ServiceExecutorContext{}; seCtx.setThreadingModel(transport::ServiceExecutor::getInitialThreadingModel()); // 决定了使用那个类型的ServiceExecutor seCtx.setCanUseReserved(canOverrideMaxConns); ssmIt->start(std::move(seCtx));}
复制代码
ServiceEntryPointMongod
主要文件 src/mongo/transport/service_entry_point_mongod.cpp
// service_entry_point_mongod.hclass ServiceEntryPointMongod final : public ServiceEntryPointImpl {}
// service_entry_point_mongod.cppFuture<DbResponse> ServiceEntryPointMongod::handleRequest(OperationContext* opCtx, const Message& m) noexcept { // 处理Message使用ServiceEntryPointCommon中的实现,并且注入一系列的Hooks return ServiceEntryPointCommon::handleRequest(opCtx, m, std::make_unique<Hooks>());}
复制代码
ServiceEntryPointCommon
ServiceEntryPointCommon 是一个实现了 ServiceEntryPointImpl 的公共方法集合。另外还定义了 Hooks(本次不涉及)。
主要文件 src/mongo/transport/service_entry_point_common.cpp
Future<DbResponse> ServiceEntryPointCommon::handleRequest( OperationContext* opCtx, const Message& m, std::unique_ptr<const Hooks> behaviors) { // 根据命令创建opRunner(例如QueryOpRunner),返回DBResponse,本次不介绍细节 HandleRequest hr(opCtx, m, std::move(behaviors)); auto opRunner = hr.makeOpRunner(); return opRunner->run().then([hr = std::move(hr)](DbResponse response) mutable {// ....})}
复制代码
OpRunner
定义在 service_entry_point_common.cpp 中,根据 Message 中不同的操作类型会有创建不同的 OpRunner,例如 QueryOpRunner、InsertOpRunner、UpdateOpRunner 等。
struct OpRunner { explicit OpRunner(HandleRequest* hr) : executionContext{hr->executionContext}{} virtual ~OpRunner() = default; // 执行命令,返回DbResponse virtual Future<DbResponse> run() = 0; std::shared_ptr<ExecutionContext> executionContext;};
复制代码
状态机 ServiceStateMachine
ServiceStateMachine(简称 SSM)是一个维护客户端连接的状态生命周期的状态机,每个客户端连接都会生成一个 SSM。
主要文件 src/mongo/transport/service_state_machine.cpp
// service_state_machine.hclass ServiceStateMachine { // start之后,SSM会按照获取Message、处理Message、回复Message步骤执行。维护client的状态,确保状态流转正确。 void start(ServiceExecutorContext seCtx);}
// service_state_machine.cpp// 定义一个SSM的Implclass ServiceStateMachine::Impl final : public std::enable_shared_from_this<ServiceStateMachine::Impl> {public: // 定义状态机的所有状态,整个生命周期如下 // Source -> SourceWait -> Process -> SinkWait -> Source (standard RPC) // Source -> SourceWait -> Process -> SinkWait -> Process -> SinkWait ... (exhaust) // Source -> SourceWait -> Process -> Source (fire-and-forget) enum class State { Created, // Session创建之后,还没有操作 Source, // 等待新Message SourceWait, // 从请求中读取Message,等待Message接收完成 Process, // 已经通知DB开始执行Message SinkWait, // 等待DB结果通过网络返回客户端 EndSession, // 结束Session,待Session结束后当前SSM会失效 Ended // Session已经结束 }; void start(ServiceExecutorContext seCtx); // 通知DB执行Message Future<void> processMessage(); // 以下两个方法,在TransportLayer完成网络操作之后回调 void sourceCallback(Status status); void sinkCallback(Status status); // 通过TransportLayer读取请求或者返回响应 Future<void> sourceMessage(); Future<void> sinkMessage(); // 通过ServiceExecutor开启循环的调度 void scheduleNewLoop(Status status); // 开始一个新的循环(例如source、process、最后skin) void startNewLoop(const Status& execStatus); // 根据ServiceExecutor ServiceExecutor* executor() { return ServiceExecutorContext::get(_clientStrand->getClientPointer())->getServiceExecutor(); } private: AtomicWord<State> _state{State::Created}; // 创建SSM,默认Created bool _inExhaust = false;}
void ServiceStateMachine::Impl::start(ServiceExecutorContext seCtx) { scheduleNewLoop(Status::OK());}
复制代码
循环-lookup
void ServiceStateMachine::Impl::scheduleNewLoop(Status status) try { auto cb = [this, anchor = shared_from_this()](Status executorStatus) { _clientStrand->run([&] { startNewLoop(executorStatus); }); }; try { // 调用ServiceExecutor的schedule或runOnDataAvailable调度任务执行,执行触发cb回调 if (_inExhaust) { // If we're in exhaust, we're not expecting more data. executor()->schedule(std::move(cb)); } else { executor()->runOnDataAvailable(session(), std::move(cb)); } } catch (const DBException& ex) { _state.store(State::EndSession); }}
void ServiceStateMachine::Impl::startNewLoop(const Status& executorStatus) { makeReadyFutureWith([&]() -> Future<void> { if (_inExhaust) { return Status::OK(); } else { // 读取Message return sourceMessage(); } }) .then([this]() { return processMessage(); // 处理Message }) .then([this]() -> Future<void> { if (_outMessage.empty()) { return Status::OK(); }
return sinkMessage(); // 等待Message回复 }) .getAsync([this, anchor = shared_from_this()](Status status) { scheduleNewLoop(std::move(status)); });}
复制代码
获取 Message-sourceMessage
Future<void> ServiceStateMachine::Impl::sourceMessage() { // 更新状态为SourceWait _state.store(State::SourceWait); auto sourceMsgImpl = [&] { // 读取Message if (transportMode == transport::Mode::kSynchronous) { return Future<Message>::makeReady(session()->sourceMessage()); } else { return session()->asyncSourceMessage(); } } return sourceMsgImpl().onCompletion([this](StatusWith<Message> msg) -> Future<void> { if (msg.isOK()) { // 读取完成记录到_inMessage变量中 _inMessage = std::move(msg.getValue()); invariant(!_inMessage.empty()); } // 触发sourceCallback sourceCallback(msg.getStatus()); return Status::OK(); });}
void ServiceStateMachine::Impl::sourceCallback(Status status) { // 更新状态为Process _state.store(State::Process);}
复制代码
处理 Message-handleMessage
Future<void> ServiceStateMachine::Impl::processMessage() { // 调用ServiceEntryPoint::handleRequest处理Message return _sep->handleRequest(_opCtx.get(), _inMessage).then([this, &compressorMgr = compressorMgr](DbResponse dbresponse) mutable -> void { Message& toSink = dbresponse.response; if (!toSink.empty()) { _outMessage = std::move(toSink); } else { // Message已经处理完成,等待下一次Message _state.store(State::Source); _inExhaust = false; } }}
复制代码
返回响应-replyMessage
Future<void> ServiceStateMachine::Impl::sinkMessage() { // 更新状态为SinkWait _state.store(State::SinkWait); auto toSink = std::exchange(_outMessage, {}); auto sinkMsgImpl = [&] { // 通过TransportLayer返回response if (transportMode == transport::Mode::kSynchronous) { return Future<void>::makeReady(session()->sinkMessage(std::move(toSink))); } else { return session()->asyncSinkMessage(std::move(toSink)); } } return sinkMsgImpl().onCompletion([this](Status status) { // 返回响应完成之后,触发sinkCallback sinkCallback(std::move(status)); return Status::OK(); });}
void ServiceStateMachine::Impl::sinkCallback(Status status) { _state.store(State::EndSession);}
复制代码
任务调度 ServiceExecutor
ServiceExecutor 负责调度并且回调执行 Task,例如 ServiceExecutorSynchronous 实现同步处理 Task。
主要文件 src/mongo/transport/service_executor.cpp
// service_executor.hclass ServiceExecutor : public OutOfLineExecutor {public: // 执行Task并且马上返回 virtual Status scheduleTask(Task task, ScheduleFlags flags) = 0; // 对scheduleTask的包装 void schedule(OutOfLineExecutor::Task func) override { iassert(scheduleTask([task = std::move(func)]() mutable { task(Status::OK()); }, ScheduleFlags::kEmptyFlags)); } // 等待session中有可用数据才执行schedule调度 virtual void runOnDataAvailable(const SessionHandle& session, OutOfLineExecutor::Task onCompletionCallback) = 0;
// 根据_threadingModel获取不同的ServiceExecutor ServiceExecutor* getServiceExecutor() noexcept;private: ThreadingModel _threadingModel = ThreadingModel::kDedicated; // 决定使用哪种ServiceExecutor}
复制代码
ServiceExecutorSynchronous
ServiceExecutorSynchronous 每个连接都会有对应的 work 线程负责调度。当被调用 runOnDataAvailable 时候,会创建新线程(如果当前线程超过 CPU 核数,不会创建线程,而是让出当前 CPU 时间)。
ServiceExecutorReserved
ServiceExecutorReserved 根据配置启动 serverGlobalParams.reservedThreads 个线程,并且确保无论任何时候都会有 reservedThreads 个线程处理任务(即使线程处于空闲状态)。
ServiceExecutorFixed
ServiceExecutorFixed 相当于线程池,在不超过配置数量的情况下,新任务会启动新线程调度。
to be continue
评论