简介
通过 MongoDB 的源码学习,了解更多细节。本篇主要关注 MongoDB 从接收请求到执行之间的过程(其实就是突然脑抽筋想写点东西)。
流程:从接收请求到执行命令
Talk is cheap, show you the picture !
对象源码浅析
mongod_main
作用是根据配置初始化其他模块对象并且启动服务。
源码文件 src/mongo/db/mongod_main.cpp
int mongod_main(int argc, char* argv[]) {
// 生成ServiceEntryPoint,注册到ServiceContext中
service->setServiceEntryPoint(std::make_unique<ServiceEntryPointMongod>(service));
// 生成TransportLayer并且注册到ServiceContext中
auto tl =
transport::TransportLayerManager::createWithConfig(&serverGlobalParams, serviceContext);
auto res = tl->setup();
serviceContext->setTransportLayer(std::move(tl));
// 启动TransportLayer和ServiceEntryPoint
auto start = serviceContext->getServiceEntryPoint()->start();
start = serviceContext->getTransportLayer()->start();
}
复制代码
ServiceContext
上下文对象,mongod_main 将初始化完成的模块都注册到 ServiceContext 中,并且将这个 ServiceContext 传递到各个模块,这样就实现了不同模块之间的互相调用。
主要源码文件src/mongo/db/service_context.h
class ServiceContext final : public Decorable<ServiceContext>
public:
void setServiceEntryPoint(std::unique_ptr<ServiceEntryPoint> sep);
void setTransportLayer(std::unique_ptr<transport::TransportLayer> tl);
transport::TransportLayer* getTransportLayer() const;
ServiceEntryPoint* getServiceEntryPoint() const;
private:
SyncUnique<transport::TransportLayer> _transportLayer;
SyncUnique<ServiceEntryPoint> _serviceEntryPoint;
}
复制代码
传输层 TransportLayer
TransportLayer 负责将 Message 从 Endpoint 传递到 database。
TransportLayer 维护着 acceptor 用于接收请求,创建一个对应的 session 传递给 database(当中经过 ServiceEntryPoint)。database 需要管理 session,使其完成读取 Message、处理 Message、回复 Message 三个步骤组成的生命周期。
主要源码文件 src/mongo/transport/transport_layer.h
class TransportLayer {
public:
// 开启监听连接之前的准备操作
virtual Status setup() = 0;
// 开始监听连接
virtual Status start() = 0;
// 不再监听连接,并且结束所有session
virtual void shutdown() = 0;
}
复制代码
TransportLayerManager
TransportLayerManager 实现了 TransportLayer 的接口,内部维护其他 TransportLayer,让 Mongod 和 Mongos 调用其他 TransportLayer 时候更方便
主要源码文件 src/mongo/transport/transport_layer_manager.h
和 src/mongo/transport/transport_layer_manager.cpp
// tranport_layer_manager.h
class TransportLayerManager final : public TransportLayer {
private:
// 维护其他TransportLayer
std::vector<std::unique_ptr<TransportLayer>> _tls;
public:
// 在mongod_main中被调用,根据配置创建TransportLayerManager
static std::unique_ptr<TransportLayer> createWithConfig(const ServerGlobalParams* config, ServiceContext* ctx);
}
// tranport_layer_manager.cpp
std::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig(const ServerGlobalParams* config, ServiceContext* ctx) {
// 因为mongod_main中设置的是ServiceEntryPointMongod,因此这里这里会得到ServiceEntryPointMongod
auto sep = ctx->getServiceEntryPoint();
// 创建TransportLayerASIO,并且作为TransportLayerManager管理的第一个TransportLayer
retVector.emplace_back(std::make_unique<transport::TransportLayerASIO>(opts, sep));
return std::make_unique<TransportLayerManager>(std::move(retVector));
}
Status TransportLayerManager::setup() {
for (auto&& tl : _tls) {
// 调用所有管理的TransportLayer::setup
auto status = tl->setup();
}
}
Status TransportLayerManager::start() {
for (auto&& tl : _tls) {
// 调用所有管理的TransportLayer::start
auto status = tl->start();
}
}
复制代码
TransportLayerASIO
TransportLayerASIO 是一个基于 ASIO 封装的 TransportLayer(ASIO 是 C++中用于网络编程的库)。
主要源码文件 src/mongo/transport/transport_layer_asio.cpp
Status TransportLayerASIO::setup() {
// 从配置中获取服务端需要监听的ip、port等信息
}
Status TransportLayerASIO::start() {
// 开始监听连接
for (auto& acceptor : _acceptors) {
// 调用_acceptConnection等待连接
_acceptConnection(acceptor.second);
}
}
void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) {
auto acceptCb = [this, &acceptor](const std::error_code& ec, GenericSocket peerSocket) mutable {
try {
// 创建一个Session,然后调用ServiceEntryPoint::startSession处理
std::shared_ptr<ASIOSession> session(
new ASIOSession(this, std::move(peerSocket), true));
_sep->startSession(std::move(session));
} catch (const DBException& e) {}
// 处理完会话或异常之后,重新调用_acceptConnection
_acceptConnection(acceptor);
}
acceptor.async_accept(*_ingressReactor, std::move(acceptCb)); // 设置有连接时候的callback
}
复制代码
ServiceEntryPoint
ServiceEntryPoint 负责接收来自 TransportLayer 的 Session,按照获取 Message、处理 Message、回复 Message 三个步骤,不断循环处理 Session 接收到的 Message,直到 Session 结束。
ServiceEntryPointImpl 实现了 ServiceEntryPoint 的 start 和 startSession 方法,mongod_main 中创建的 ServiceEntryPointMongod 是继承了 ServiceEntryPoint。
主要源码文件
src/mongo/transport/service_entry_point.h
src/mong/transport/service_entry_impl.h
src/mong/transport/service_entry_impl.cpp
// service_entry.h
class ServiceEntryPoint {
// 处理一个Session
virtual void startSession(transport::SessionHandle session) = 0;
// 启动ServiceEntryPoint
virtual Status start() = 0;
// 处理一个Message,并且返回DBResponse
virtual Future<DbResponse> handleRequest(OperationContext* opCtx,
const Message& request) noexcept = 0;
}
// service_entry_impl.h
class ServiceEntryPointImpl : public ServiceEntryPoint {
private:
using SSMList = std::list<transport::ServiceStateMachine>;
// 维护着一个ServiceStateMachine的列表
SSMList _sessions;
}
// service_entry_impl.cpp
Status ServiceEntryPointImpl::start() {
// 创建三种类型的ServiceExecutor并且执行start方法
if (auto status = transport::ServiceExecutorSynchronous::get(_svcCtx)->start();
!status.isOK()) {
return status;
}
if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx)) {
if (auto status = exec->start(); !status.isOK()) {
return status;
}
}
if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->start(); !status.isOK()) {
return status;
}
}
void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
// 创建一个client
auto client = _svcCtx->makeClient(clientName, session);
auto maybeSsmIt = [&]() -> boost::optional<SSMListIterator> {
// 创建一个ServiceStateMachine并且添加到_sessions中
auto it = _sessions.emplace(_sessions.begin(), std::move(client));
// 返回这个新创建的ServiceStateMachine
return it;
}
auto ssmIt = *maybeSsmIt;
// 调用ServiceStateMachine::start
auto seCtx = transport::ServiceExecutorContext{};
seCtx.setThreadingModel(transport::ServiceExecutor::getInitialThreadingModel()); // 决定了使用那个类型的ServiceExecutor
seCtx.setCanUseReserved(canOverrideMaxConns);
ssmIt->start(std::move(seCtx));
}
复制代码
ServiceEntryPointMongod
主要文件 src/mongo/transport/service_entry_point_mongod.cpp
// service_entry_point_mongod.h
class ServiceEntryPointMongod final : public ServiceEntryPointImpl {
}
// service_entry_point_mongod.cpp
Future<DbResponse> ServiceEntryPointMongod::handleRequest(OperationContext* opCtx,
const Message& m) noexcept {
// 处理Message使用ServiceEntryPointCommon中的实现,并且注入一系列的Hooks
return ServiceEntryPointCommon::handleRequest(opCtx, m, std::make_unique<Hooks>());
}
复制代码
ServiceEntryPointCommon
ServiceEntryPointCommon 是一个实现了 ServiceEntryPointImpl 的公共方法集合。另外还定义了 Hooks(本次不涉及)。
主要文件 src/mongo/transport/service_entry_point_common.cpp
Future<DbResponse> ServiceEntryPointCommon::handleRequest(
OperationContext* opCtx,
const Message& m,
std::unique_ptr<const Hooks> behaviors) {
// 根据命令创建opRunner(例如QueryOpRunner),返回DBResponse,本次不介绍细节
HandleRequest hr(opCtx, m, std::move(behaviors));
auto opRunner = hr.makeOpRunner();
return opRunner->run().then([hr = std::move(hr)](DbResponse response) mutable {// ....})
}
复制代码
OpRunner
定义在 service_entry_point_common.cpp 中,根据 Message 中不同的操作类型会有创建不同的 OpRunner,例如 QueryOpRunner、InsertOpRunner、UpdateOpRunner 等。
struct OpRunner {
explicit OpRunner(HandleRequest* hr) : executionContext{hr->executionContext}{}
virtual ~OpRunner() = default;
// 执行命令,返回DbResponse
virtual Future<DbResponse> run() = 0;
std::shared_ptr<ExecutionContext> executionContext;
};
复制代码
状态机 ServiceStateMachine
ServiceStateMachine(简称 SSM)是一个维护客户端连接的状态生命周期的状态机,每个客户端连接都会生成一个 SSM。
主要文件 src/mongo/transport/service_state_machine.cpp
// service_state_machine.h
class ServiceStateMachine {
// start之后,SSM会按照获取Message、处理Message、回复Message步骤执行。维护client的状态,确保状态流转正确。
void start(ServiceExecutorContext seCtx);
}
// service_state_machine.cpp
// 定义一个SSM的Impl
class ServiceStateMachine::Impl final
: public std::enable_shared_from_this<ServiceStateMachine::Impl> {
public:
// 定义状态机的所有状态,整个生命周期如下
// Source -> SourceWait -> Process -> SinkWait -> Source (standard RPC)
// Source -> SourceWait -> Process -> SinkWait -> Process -> SinkWait ... (exhaust)
// Source -> SourceWait -> Process -> Source (fire-and-forget)
enum class State {
Created, // Session创建之后,还没有操作
Source, // 等待新Message
SourceWait, // 从请求中读取Message,等待Message接收完成
Process, // 已经通知DB开始执行Message
SinkWait, // 等待DB结果通过网络返回客户端
EndSession, // 结束Session,待Session结束后当前SSM会失效
Ended // Session已经结束
};
void start(ServiceExecutorContext seCtx);
// 通知DB执行Message
Future<void> processMessage();
// 以下两个方法,在TransportLayer完成网络操作之后回调
void sourceCallback(Status status);
void sinkCallback(Status status);
// 通过TransportLayer读取请求或者返回响应
Future<void> sourceMessage();
Future<void> sinkMessage();
// 通过ServiceExecutor开启循环的调度
void scheduleNewLoop(Status status);
// 开始一个新的循环(例如source、process、最后skin)
void startNewLoop(const Status& execStatus);
// 根据ServiceExecutor
ServiceExecutor* executor() {
return ServiceExecutorContext::get(_clientStrand->getClientPointer())->getServiceExecutor();
}
private:
AtomicWord<State> _state{State::Created}; // 创建SSM,默认Created
bool _inExhaust = false;
}
void ServiceStateMachine::Impl::start(ServiceExecutorContext seCtx) {
scheduleNewLoop(Status::OK());
}
复制代码
循环-lookup
void ServiceStateMachine::Impl::scheduleNewLoop(Status status) try {
auto cb = [this, anchor = shared_from_this()](Status executorStatus) {
_clientStrand->run([&] { startNewLoop(executorStatus); });
};
try {
// 调用ServiceExecutor的schedule或runOnDataAvailable调度任务执行,执行触发cb回调
if (_inExhaust) {
// If we're in exhaust, we're not expecting more data.
executor()->schedule(std::move(cb));
} else {
executor()->runOnDataAvailable(session(), std::move(cb));
}
} catch (const DBException& ex) {
_state.store(State::EndSession);
}
}
void ServiceStateMachine::Impl::startNewLoop(const Status& executorStatus) {
makeReadyFutureWith([&]() -> Future<void> {
if (_inExhaust) {
return Status::OK();
} else {
// 读取Message
return sourceMessage();
}
})
.then([this]() { return processMessage(); // 处理Message })
.then([this]() -> Future<void> {
if (_outMessage.empty()) {
return Status::OK();
}
return sinkMessage(); // 等待Message回复
})
.getAsync([this, anchor = shared_from_this()](Status status) {
scheduleNewLoop(std::move(status));
});
}
复制代码
获取 Message-sourceMessage
Future<void> ServiceStateMachine::Impl::sourceMessage() {
// 更新状态为SourceWait
_state.store(State::SourceWait);
auto sourceMsgImpl = [&] {
// 读取Message
if (transportMode == transport::Mode::kSynchronous) {
return Future<Message>::makeReady(session()->sourceMessage());
} else {
return session()->asyncSourceMessage();
}
}
return sourceMsgImpl().onCompletion([this](StatusWith<Message> msg) -> Future<void> {
if (msg.isOK()) {
// 读取完成记录到_inMessage变量中
_inMessage = std::move(msg.getValue());
invariant(!_inMessage.empty());
}
// 触发sourceCallback
sourceCallback(msg.getStatus());
return Status::OK();
});
}
void ServiceStateMachine::Impl::sourceCallback(Status status) {
// 更新状态为Process
_state.store(State::Process);
}
复制代码
处理 Message-handleMessage
Future<void> ServiceStateMachine::Impl::processMessage() {
// 调用ServiceEntryPoint::handleRequest处理Message
return _sep->handleRequest(_opCtx.get(), _inMessage).then([this, &compressorMgr = compressorMgr](DbResponse dbresponse) mutable -> void {
Message& toSink = dbresponse.response;
if (!toSink.empty()) {
_outMessage = std::move(toSink);
} else {
// Message已经处理完成,等待下一次Message
_state.store(State::Source);
_inExhaust = false;
}
}
}
复制代码
返回响应-replyMessage
Future<void> ServiceStateMachine::Impl::sinkMessage() {
// 更新状态为SinkWait
_state.store(State::SinkWait);
auto toSink = std::exchange(_outMessage, {});
auto sinkMsgImpl = [&] {
// 通过TransportLayer返回response
if (transportMode == transport::Mode::kSynchronous) {
return Future<void>::makeReady(session()->sinkMessage(std::move(toSink)));
} else {
return session()->asyncSinkMessage(std::move(toSink));
}
}
return sinkMsgImpl().onCompletion([this](Status status) {
// 返回响应完成之后,触发sinkCallback
sinkCallback(std::move(status));
return Status::OK();
});
}
void ServiceStateMachine::Impl::sinkCallback(Status status) {
_state.store(State::EndSession);
}
复制代码
任务调度 ServiceExecutor
ServiceExecutor 负责调度并且回调执行 Task,例如 ServiceExecutorSynchronous 实现同步处理 Task。
主要文件 src/mongo/transport/service_executor.cpp
// service_executor.h
class ServiceExecutor : public OutOfLineExecutor {
public:
// 执行Task并且马上返回
virtual Status scheduleTask(Task task, ScheduleFlags flags) = 0;
// 对scheduleTask的包装
void schedule(OutOfLineExecutor::Task func) override {
iassert(scheduleTask([task = std::move(func)]() mutable { task(Status::OK()); },
ScheduleFlags::kEmptyFlags));
}
// 等待session中有可用数据才执行schedule调度
virtual void runOnDataAvailable(const SessionHandle& session,
OutOfLineExecutor::Task onCompletionCallback) = 0;
// 根据_threadingModel获取不同的ServiceExecutor
ServiceExecutor* getServiceExecutor() noexcept;
private:
ThreadingModel _threadingModel = ThreadingModel::kDedicated; // 决定使用哪种ServiceExecutor
}
复制代码
ServiceExecutorSynchronous
ServiceExecutorSynchronous 每个连接都会有对应的 work 线程负责调度。当被调用 runOnDataAvailable 时候,会创建新线程(如果当前线程超过 CPU 核数,不会创建线程,而是让出当前 CPU 时间)。
ServiceExecutorReserved
ServiceExecutorReserved 根据配置启动 serverGlobalParams.reservedThreads 个线程,并且确保无论任何时候都会有 reservedThreads 个线程处理任务(即使线程处于空闲状态)。
ServiceExecutorFixed
ServiceExecutorFixed 相当于线程池,在不超过配置数量的情况下,新任务会启动新线程调度。
to be continue
评论