写点什么

mongodb 源码实现系列 - command 命令处理模块源码实现一

发布于: 2020 年 11 月 25 日
mongodb 源码实现系列 - command命令处理模块源码实现一

关于作者

前滴滴出行技术专家,现任 OPPO 文档数据库 mongodb 负责人,负责 oppo 千万级峰值 TPS/十万亿级数据量文档数据库 mongodb 内核研发及运维工作,一直专注于分布式缓存、高性能服务端、数据库、中间件等相关研发。后续持续分享《MongoDB 内核源码设计、性能优化、最佳运维实践》,Github 账号地址:https://github.com/y123456yz

1. 背景

     <<transport_layer 网络传输层模块源码实现>>中分享了 mongodb 内核底层网络 IO 处理相关实现,包括套接字初始化、一个完整 mongodb 报文的读取、获取到 DB 数据发送给客户端等。Mongodb 支持多种增、删、改、查、聚合处理、cluster 处理等操作,每个操作在内核实现中对应一个 command,每个 command 有不同的功能,mongodb 内核如何进行 command 源码处理将是本文分析的重点

此外,mongodb 提供了 mongostat 工具来监控当前集群的各种操作统计。Mongostat 监控统计如下图所示:

其中,insert、delete、update、query 这四项统计比较好理解,分别对应增、删、改、查。但是,comand、getmore 不是很好理解,command 代表什么统计?getMore 代表什么统计?,这两项相对比较难理解。

此外,通过本文字分析,我们将搞明白这六项统计的具体含义,同时弄清这六项统计由那些操作进行计数。

Command 命令处理模块分为:mongos 操作命令、mongod 操作命令、mongodb 集群内部命令,具体定义如下:

① mongos 操作命令,客户端可以通过 mongos 访问集群相关的命令。

② mongod 操作命令:客户端可以通过 mongod 复制集和 cfg server 访问集群的相关命令。

③ mongodb 集群内部命令:mongos、mongod、mongo-cfg 集群实例之间交互的命令。

     Command 命令处理模块核心代码实现如下:


     《command 命令处理模块源码实现》相关文章重点分析命令处理模块核心代码实现,也就是上面截图中的命令处理源码文件实现。

2. <<transport_layer 网络传输层模块源码实现>>衔接回顾

<<transport_layer 网络传输层模块源码实现三>>一文中,我们对 service_state_machine 状态机调度子模块进行了分析,该模块中的 dealTask 任务进行 mongodb 内部业务逻辑处理,其核心实现如下:

1.//dealTask处理  2.void ServiceStateMachine::_processMessage(ThreadGuard guard) {  3.    ......4.    //command处理、DB访问后的数据通过dbresponse返回  5.    DbResponse dbresponse = _sep->handleRequest(opCtx.get(), _inMessage);  6.    ......}
复制代码

上面的 sep 对应 mongod 或者 mongos 实例的服务入口实现,该 seq 成员分别在如下代码中初始化为 ServiceEntryPointMongod 和 ServiceEntryPointMongod 类实现。SSM 状态机的_seq 成员初始化赋值核心代码实现如下:

1.//mongos实例启动初始化  2.static ExitCode runMongosServer() {  3.    ......  4.    //mongos实例对应sep为ServiceEntryPointMongos  5.    auto sep = stdx::make_unique<ServiceEntryPointMongos>(getGlobalServiceContext());  6.    getGlobalServiceContext()->setServiceEntryPoint(std::move(sep));  7.    ......  8.}  9.  10.//mongod实例启动初始化  11.ExitCode _initAndListen(int listenPort) {  12.    ......  13.    //mongod实例对应sep为ServiceEntryPointMongod  14.    serviceContext->setServiceEntryPoint(  15.        stdx::make_unique<ServiceEntryPointMongod>(serviceContext));  16.    ......  17.}  18.  19.//SSM状态机初始化  20.ServiceStateMachine::ServiceStateMachine(...)  21.    : _state{State::Created},  22.      //mongod和mongos实例的服务入口通过这里赋值给_seq成员变量  23.      _sep{svcContext->getServiceEntryPoint()},  24.      ......  25.} 
复制代码

通过上面的几个核心接口把 mongos 和 mongod 实例的服务入口与状态机 SSM(ServiceStateMachine)联系起来,最终和下面的 command 命令处理模块关联。

  dealTask 进行一次 mongodb 请求的内部逻辑处理,该处理由_sep->handleRequest()接口实现。由于 mongos 和 mongod 服务入口分别由 ServiceEntryPointMongos 和 ServiceEntryPointMongod 两个类实现,因此 dealTask 也就演变为如下接口处理:

① mongos 实例:ServiceEntryPointMongos::handleRequest(...)

② Mongod 实例::ServiceEntryPointMongod::handleRequest(...)

这两个接口入参都是 OperationContext 和 Message,分别对应操作上下文、请求原始数据内容。下文会分析 Message 解析实现、OperationContext 服务上下文实现将在后续章节分析。

Mongod 和 mongos 实例服务入口类都继承自网络传输模块中的 ServiceEntryPointImpl 类,如下图所示:


Tips: mongos 和 mongod 服务入口类为何要继承网络传输模块服务入口类?

原因是一个请求对应一个链接 session,该 session 对应的请求又和 SSM 状态机唯一对应。所有客户端请求对应的 SSM 状态机信息全部保存再 ServiceEntryPointImpl._sessions 成员中,而 command 命令处理模块为 SSM 状态机任务中的 dealTask 任务,通过该继承关系,ServiceEntryPointMongod 和 ServiceEntryPointMongos 子类也就可以和状态机及任务处理关联起来,同时也可以获取当前请求对应的 session 链接信息。

3. Mongodb 协议解析

在《transport_layer 网络传输层模块源码实现二》中的数据收发子模块完成了一个完整 mongodb 报文的接收,一个 mongodb 报文由 Header 头部+opCode 包体组成,如下图所示:

上图中各个字段说明如下表:

opCode 取值比较多,早期版本中 OPINSERT、OPDELETE、OPUPDATE、OPQUERY 分别针对增删改查请求,Mongodb 从 3.6 版本开始默认使用 OPMSG 操作作为默认 opCode,是一种可扩展的消息格式,旨在包含其他操作码的功能,新版本读写请求协议都对应该操作码。本文以 OPMSG 操作码对应协议为例进行分析,其他操作码协议分析过程类似,OP_MSG 请求协议格式如下:

1.OP_MSG {  2.    //mongodb报文头部  3.    MsgHeader header;            4.    //位图,用于标识报文是否需要校验 是否需要应答等  5.    uint32 flagBits;           // message flags  6.    //报文内容,例如find write等命令内容通过bson格式存在于该结构中  7.    Sections[] sections;       // data sections  8.    //报文CRC校验  9.    optional<uint32> checksum; // optional CRC-32C checksum  }  
复制代码

OP_MSG 各个字段说明如下表:

一个完整 OP_MSG 请求格式如下:

除了通用头部 header 外,客户端命令请求实际上都保存于 sections 字段中,该字段存放的是请求的原始 bson 格式数据。BSON 是由 10gen 开发的一个数据格式,目前主要用于 MongoDB 中,是 MongoDB 的数据存储格式。BSON 基于 JSON 格式,选择 JSON 进行改造的原因主要是 JSON 的通用性及 JSON 的 schemaless 的特性。BSON 相比 JSON 具有以下特性:

① Lightweight(更轻量级)

② Traversable(易操作)

③ Efficient(高效性能)

本文重点不是分析 bson 协议格式,bson 协议实现细节将在后续章节分享。bson 协议更多设计细节详见:http://bsonspec.org/

总结:一个完整 mongodb 报文由 header+body 组成,其中 header 长度固定为 16 字节,body 长度等于 messageLength-16。Header 部分协议解析由 message.cpp 和 message.h 两源码文件实现,body 部分对应的 OP_MSG 类请求解析由 op_msg.cpp 和 op_msg.h 两源码文件实现。

4. mongodb 报文通用头部解析及封装源码实现

Header 头部解析由 src/mongo/util/net 目录下 message.cpp 和 message.h 两文件完成,该类主要完成通用 header 头部和 body 部分的解析、封装。因此报文头部核心代码分为以下两类:

① 报文头部内容解析及封装(MSGHEADER 命名空间实现)

② 头部和 body 内容解析及封装(MsgData 命名空间实现)

4.1 mongodb 报文头部解析及封装核心代码实现

mongodb 报文头部解析由 namespace MSGHEADER {...}实现,该类主要成员及接口实现如下:

1.namespace MSGHEADER {  2.//header头部各个字段信息  3.struct Layout {  4.    //整个message长度,包括header长度和body长度  5.    int32_t messageLength;     6.    //requestID 该请求id信息  7.    int32_t requestID;         8.    //getResponseToMsgId解析  9.    int32_t responseTo;        10.    //操作类型:OP_UPDATE、OP_INSERT、OP_QUERY、OP_DELETE、OP_MSG等  11.    int32_t opCode;  12.};  13.  14.//ConstView实现header头部数据解析  15.class ConstView {   16.public:  17.    ......  18.    //初始化构造  19.    ConstView(const char* data) : _data(data) {}  20.    //获取_data地址  21.    const char* view2ptr() const {  22.        return data().view();  23.    }  24.    //TransportLayerASIO::ASIOSourceTicket::_headerCallback调用  25.    //解析header头部的messageLength字段  26.    int32_t getMessageLength() const {  27.        return data().read<LittleEndian<int32_t>>(offsetof(Layout, messageLength));  28.    }  29.    //解析header头部的requestID字段  30.    int32_t getRequestMsgId() const {  31.        return data().read<LittleEndian<int32_t>>(offsetof(Layout, requestID));  32.    }  33.    //解析header头部的getResponseToMsgId字段  34.    int32_t getResponseToMsgId() const {  35.        return data().read<LittleEndian<int32_t>>(offsetof(Layout, responseTo));  36.    }  37.    //解析header头部的opCode字段  38.    int32_t getOpCode() const {  39.        return data().read<LittleEndian<int32_t>>(offsetof(Layout, opCode));  40.    }  41.  42.protected:  43.    //mongodb报文数据起始地址  44.    const view_type& data() const {  45.        return _data;  46.    }  47.private:  48.    //数据部分  49.    view_type _data;  50.};  51.  52.//View填充header头部数据  53.class View : public ConstView {  54.public:  55.    ......  56.    //构造初始化  57.    View(char* data) : ConstView(data) {}  58.    //header起始地址  59.    char* view2ptr() {  60.        return data().view();  61.    }  62.    //以下四个接口进行header填充  63.    //填充header头部messageLength字段  64.    void setMessageLength(int32_t value) {  65.        data().write(tagLittleEndian(value), offsetof(Layout, messageLength));  66.    }  67.    //填充header头部requestID字段  68.    void setRequestMsgId(int32_t value) {  69.        data().write(tagLittleEndian(value), offsetof(Layout, requestID));  70.    }  71.    //填充header头部responseTo字段  72.    void setResponseToMsgId(int32_t value) {  73.        data().write(tagLittleEndian(value), offsetof(Layout, responseTo));  74.    }  75.    //填充header头部opCode字段  76.    void setOpCode(int32_t value) {  77.        data().write(tagLittleEndian(value), offsetof(Layout, opCode));  78.    }  79.private:  80.    //指向header起始地址  81.    view_type data() const {  82.        return const_cast<char*>(ConstView::view2ptr());  83.    }  84.};  85.}
复制代码

从上面的 header 头部解析、填充的实现类可以看出,header 头部解析由 MSGHEADER::ConstView 实现;header 头部填充由 MSGHEADER::View 完成。实际上代码实现上,通过 offsetof 来进行移位,从而快速定位到头部对应字段。

4.2 mongodb 报文头部+body 解析封装核心代码实现

Namespace MSGHEADER{...}命名空间只负责 header 头部的处理,namespace MsgData{...}命名空间相对 MSGHEADER 命名空间更加完善,除了处理头部解析封装外,还负责 body 数据起始地址维护、body 数据封装、数据长度检查等。MsgData 命名空间核心代码实现如下:

1.namespace MsgData {  2.struct Layout {  3.    //数据填充组成:header部分  4.    MSGHEADER::Layout header;  5.    //数据填充组成: body部分,body先用data占位置  6.    char data[4];  7.};  8.  9.//解析header字段信息及body其实地址信息  10.class ConstView {  11.public:  12.    //初始化构造  13.    ConstView(const char* storage) : _storage(storage) {}  14.    //获取数据起始地址  15.    const char* view2ptr() const {  16.        return storage().view();  17.    }  18.  19.    //以下四个接口间接执行前面的MSGHEADER中的头部字段解析  20.    //填充header头部messageLength字段  21.    int32_t getLen() const {  22.        return header().getMessageLength();  23.    }  24.    //填充header头部requestID字段  25.    int32_t getId() const {  26.        return header().getRequestMsgId();  27.    }  28.    //填充header头部responseTo字段  29.    int32_t getResponseToMsgId() const {  30.        return header().getResponseToMsgId();  31.    }  32.    //获取网络数据报文中的opCode字段  33.    NetworkOp getNetworkOp() const {  34.        return NetworkOp(header().getOpCode());  35.    }  36.    //指向body起始地址  37.    const char* data() const {  38.        return storage().view(offsetof(Layout, data));  39.    }  40.    //messageLength长度检查,opcode检查  41.    bool valid() const {  42.        if (getLen() <= 0 || getLen() > (4 * BSONObjMaxInternalSize))  43.            return false;  44.        if (getNetworkOp() < 0 || getNetworkOp() > 30000)  45.            return false;  46.        return true;  47.    }  48.    ......  49.protected:  50.    //获取_storage  51.    const ConstDataView& storage() const {  52.        return _storage;  53.    }  54.    //指向header起始地址  55.    MSGHEADER::ConstView header() const {  56.        return storage().view(offsetof(Layout, header));  57.    }  58.private:  59.    //mongodb报文存储在这里  60.    ConstDataView _storage;  61.};  62.  63.//填充数据,包括Header和body  64.class View : public ConstView {  65.public:  66.    //构造初始化  67.    View(char* storage) : ConstView(storage) {}  68.    ......  69.    //获取报文起始地址  70.    char* view2ptr() {  71.        return storage().view();  72.    }  73.  74.    //以下四个接口间接执行前面的MSGHEADER中的头部字段构造  75.    //以下四个接口完成msg header赋值  76.    //填充header头部messageLength字段  77.    void setLen(int value) {  78.        return header().setMessageLength(value);  79.    }  80.    //填充header头部messageLength字段  81.    void setId(int32_t value) {  82.        return header().setRequestMsgId(value);  83.    }  84.    //填充header头部messageLength字段  85.    void setResponseToMsgId(int32_t value) {  86.        return header().setResponseToMsgId(value);  87.    }  88.    //填充header头部messageLength字段  89.    void setOperation(int value) {  90.        return header().setOpCode(value);  91.    }  92.  93.    using ConstView::data;  94.    //指向data  95.    char* data() {  96.        return storage().view(offsetof(Layout, data));  97.    }  98.private:  99.    //也就是报文起始地址  100.    DataView storage() const {  101.        return const_cast<char*>(ConstView::view2ptr());  102.    }  103.    //指向header头部  104.    MSGHEADER::View header() const {  105.        return storage().view(offsetof(Layout, header));  106.    }  107.};  108.  109.......  110.//Value为前面的Layout,减4是因为有4字节填充data,所以这个就是header长度  111.const int MsgDataHeaderSize = sizeof(Value) - 4;  112.  113.//除去头部后的数据部分长度  114.inline int ConstView::dataLen() const {   115.    return getLen() - MsgDataHeaderSize;  116.}  117.}  // namespace MsgData  
复制代码

     和 MSGHEADER 命名空间相比,MsgData 这个 namespace 命名空间接口实现和前面的 MSGHEADER 命名空间实现大同小异。MsgData 不仅仅处理 header 头部的解析组装,还负责 body 部分数据头部指针指向、头部长度检查、opCode 检查、数据填充等。其中,MsgData 命名空间中 header 头部的解析构造底层依赖 MSGHEADER 实现。

4.3 Message/DbMessage 核心代码实现

在《transport_layer 网络传输层模块源码实现二》中,从底层 ASIO 库接收到的 mongodb 报文是存放在 Message 结构中存储,最终存放在 ServiceStateMachine._inMessage 成员中。

在前面第 2 章我们知道 mongod 和 mongso 实例的服务入口接口 handleRequest(...)中都带有 Message 入参,也就是接收到的 Message 数据通过该接口处理。Message 类主要接口实现如下:

1.//DbMessage._msg成员为该类型  2.class Message {  3.public:  4.    //message初始化  5.    explicit Message(SharedBuffer data) : _buf(std::move(data)) {}  6.    //头部header数据  7.    MsgData::View header() const {  8.        verify(!empty());  9.        return _buf.get();  10.    }  11.    //获取网络数据报文中的op字段  12.    NetworkOp operation() const {  13.        return header().getNetworkOp();  14.    }  15.    //_buf释放为空  16.    bool empty() const {  17.        return !_buf;  18.    }  19.    //获取报文总长度messageLength  20.    int size() const {  21.        if (_buf) {  22.            return MsgData::ConstView(_buf.get()).getLen();  23.        }  24.        return 0;  25.    }  26.    //body长度  27.    int dataSize() const {  28.        return size() - sizeof(MSGHEADER::Value);  29.    }  30.    //buf重置  31.    void reset() {  32.        _buf = {};  33.    }  34.    // use to set first buffer if empty  35.    //_buf直接使用buf空间  36.    void setData(SharedBuffer buf) {  37.        verify(empty());  38.        _buf = std::move(buf);  39.    }  40.     //把msgtxt拷贝到_buf中  41.    void setData(int operation, const char* msgtxt) {  42.        setData(operation, msgtxt, strlen(msgtxt) + 1);  43.    }  44.    //根据operation和msgdata构造一个完整mongodb报文  45.    void setData(int operation, const char* msgdata, size_t len) {  46.        verify(empty());  47.        size_t dataLen = len + sizeof(MsgData::Value) - 4;  48.        _buf = SharedBuffer::allocate(dataLen);  49.        MsgData::View d = _buf.get();  50.        if (len)  51.            memcpy(d.data(), msgdata, len);  52.        d.setLen(dataLen);  53.        d.setOperation(operation);  54.    }  55.    ......  56.    //获取_buf对应指针  57.    const char* buf() const {  58.        return _buf.get();  59.    }  60.  61.private:  62.    //存放接收数据的buf  63.    SharedBuffer _buf;  64.};  
复制代码

Message 是操作 mongodb 收发报文最直接的实现类,该类主要完成一个完整 mongodb 报文封装。有关 mongodb 报文头后面的 body 更多的解析实现在 DbMessage 类中完成,DbMessage 类包含 Message 类成员 msg。实际上,Message 报文信息在 handleRequest(...)实例服务入口中赋值给 DbMessage.msg,报文后续的 body 处理继续由 DbMessage 类相关接口完成处理。DbMessage 和 Message 类关系如下:

1.class DbMessage {  2.    ......  3.    //包含Message成员变量  4.    const Message& _msg;  5.    //mongodb报文起始地址6.    const char* _nsStart; 7.    //报文结束地址8.    const char* _theEnd; 9.}  10.  11.DbMessage::DbMessage(const Message& msg) : _msg(msg),   12.  _nsStart(NULL), _mark(NULL), _nsLen(0) {  13.    //一个mongodb报文(header+body)数据的结束地址  14.    _theEnd = _msg.singleData().data() + _msg.singleData().dataLen();  15.    //报文起始地址 [_nextjsobj, _theEnd ]之间的数据就是一个完整mongodb报文  16.    _nextjsobj = _msg.singleData().data();  17.    ......  }
复制代码

DbMessage._msg 成员为 DbMessage 类型,DbMessage 的_nsStart 和_theEnd 成员分别记录完整 mongodb 报文的起始地址和结束地址,通过这两个指针就可以获取一个完整 mongodb 报文的全部内容,包括 header 和 body。

注意:DbMessage 是早期 mongodb 版本(version<3.6)中用于报文 body 解析封装的类,这些类针对 opCode=[dbUpdate, dbDelete]这个区间的操作。在 mongodb 新版本(version>=3.6)中,body 解析及封装由 op_msg.h 和 op_msg.cpp 代码文件中的 clase OpMsgRequest{}完成处理。

4.4 OpMsg 报文解析封装核心代码实现

      Mongodb 从 3.6 版本开始默认使用 OP_MSG 操作作为默认 opCode,是一种可扩展的消息格式,旨在包含其他操作码的功能,新版本读写请求协议都对应该操作码。OP_MSG 对应 mongodb 报文 body 解析封装处理由 OpMsg 类相关接口完成,OpMsg::parse(Message)从 Message 中解析出报文 body 内容,其核心代码实现如下:

1.struct OpMsg {   2.      ......  3.    //msg解析赋值见OpMsg::parse     4.    //各种命令(insert update find等)都存放在该body中  5.    BSONObj body;    6.    //sequences用法暂时没看懂,感觉没什么用?先跳过  7.    std::vector<DocumentSequence> sequences; //赋值见OpMsg::parse  8.}  
复制代码


1.//从message中解析出OpMsg信息  2.OpMsg OpMsg::parse(const Message& message) try {  3.    //message不能为空,并且opCode必须为dbMsg  4.    invariant(!message.empty());  5.    invariant(message.operation() == dbMsg);  6.    //获取flagBits  7.    const uint32_t flags = OpMsg::flags(message);  8.    //flagBits有效性检查,bit 0-15中只能对第0和第1位操作  9.    uassert(ErrorCodes::IllegalOpMsgFlag,  10.            str::stream() << "Message contains illegal flags value: Ob"  11.                          << std::bitset<32>(flags).to_string(),  12.            !containsUnknownRequiredFlags(flags));  13.  14.    //校验码默认4字节  15.    constexpr int kCrc32Size = 4;  16.    //判断该mongo报文body内容是否启用了校验功能  17.    const bool haveChecksum = flags & kChecksumPresent;  18.    //如果有启用校验功能,则报文末尾4字节为校验码  19.    const int checksumSize = haveChecksum ? kCrc32Size : 0;  20.    //sections字段内容  21.    BufReader sectionsBuf(message.singleData().data() + sizeof(flags),  22.                          message.dataSize() - sizeof(flags) - checksumSize);  23.  24.    //默认先设置位false  25.    bool haveBody = false;  26.    OpMsg msg;  27.    //解析sections对应命令请求数据  28.    while (!sectionsBuf.atEof()) {  29.        //BufReader::read读取kind内容,一个字节  30.        const auto sectionKind = sectionsBuf.read<Section>();  31.        //kind为0对应命令请求body内容,内容通过bson报错  32.        switch (sectionKind) {  33.            //sections第一个字节是0说明是body  34.            case Section::kBody: {  35.                //默认只能有一个body  36.                uassert(40430, "Multiple body sections in message", !haveBody);  37.                haveBody = true;  38.                //命令请求的bson信息保存在这里  39.                msg.body = sectionsBuf.read<Validated<BSONObj>>();  40.                break;  41.            }  42.  43.            //DocSequence暂时没看明白,用到的地方很少,跳过,后续等  44.            //该系列文章主流功能分析完成后,从头再回首分析  45.            case Section::kDocSequence: {  46.                  ......  47.            }  48.        }  49.    }  50.    //OP_MSG必须有body内容  51.    uassert(40587, "OP_MSG messages must have a body", haveBody);  52.    //body和sequence去重判断  53.    for (const auto& docSeq : msg.sequences) {  54.        ......  55.    }  56.    return msg;  }  
复制代码

OpMsg 类被 OpMsgRequest 类继承,OpMsgRequest 类中核心接口就是解析出 OpMsg.body 中的库信息和表信息,OpMsgRequest 类代码实现如下:

1.//协议解析得时候会用到,见runCommands  2.struct OpMsgRequest : public OpMsg {  3.    ......  4.    //构造初始化  5.    explicit OpMsgRequest(OpMsg&& generic) : OpMsg(std::move(generic)) {}  6.    //opMsgRequestFromAnyProtocol->OpMsgRequest::parse   7.    //从message中解析出OpMsg所需成员信息  8.    static OpMsgRequest parse(const Message& message) {  9.        //OpMsg::parse  10.        return OpMsgRequest(OpMsg::parse(message));  11.    }  12.    //根据db body extraFields填充OpMsgRequest  13.    static OpMsgRequest fromDBAndBody(... {  14.        OpMsgRequest request;  15.        request.body = ([&] {  16.            //填充request.body  17.            ......  18.        }());  19.        return request;  20.    }  21.    //从body中获取db name  22.    StringData getDatabase() const {  23.        if (auto elem = body["$db"])  24.            return elem.checkAndGetStringData();  25.        uasserted(40571, "OP_MSG requests require a $db argument");  26.    }  27.    //find  insert 等命令信息  body中的第一个elem就是command 名  28.    StringData getCommandName() const {  29.        return body.firstElementFieldName();  30.    }  }; 
复制代码

OpMsgRequest 通过 OpMsg::parse(message)解析出 OpMsg 信息,从而获取到 body 内容,GetCommandName()接口和 getDatabase()则分别从 body 中获取库 DB 信息、命令名信息。通过该类相关接口,命令名(find、write、update 等)和 DB 库都获取到了。

OpMsg 模块除了 OPMSG 相关报文解析外,还负责 OPMSG 报文组装填充,该模块接口功能大全如下表:

5. Mongod 实例服务入口核心代码实现

Mongod 实例服务入口类 ServiceEntryPointMongod 继承 ServiceEntryPointImpl 类,mongod 实例的报文解析处理、命令解析、命令执行都由该类负责处理。ServiceEntryPointMongod 核心接口可以细分为:opCode 解析及回调处理、命令解析及查找、命令执行三个子模块。

5.1 opCode 解析及回调处理

     OpCode 操作码解析及其回调处理由 ServiceEntryPointMongod::handleRequest(...)接口实现,核心代码实现如下:

1.//mongod服务对于客户端请求的处理    2.//通过状态机SSM模块的如下接口调用:ServiceStateMachine::_processMessage  3.DbResponse ServiceEntryPointMongod::handleRequest(OperationContext* opCtx, const Message& m) {  4.    //获取opCode,3.6版本对应客户端默认使用OP_MSG  5.    NetworkOp op = m.operation();   6.    ......  7.    //根据message构造DbMessage  8.    DbMessage dbmsg(m);  9.    //根据操作上下文获取对应的client  10.    Client& c = *opCtx->getClient();    11.    ......  12.    //获取库.表信息,注意只有dbUpdate<opCode<dbDelete的opCode请求才通过dbmsg直接获取库和表信息  13.    const char* ns = dbmsg.messageShouldHaveNs() ? dbmsg.getns() : NULL;  14.    const NamespaceString nsString = ns ? NamespaceString(ns) : NamespaceString();  15.    ....  16.    //CurOp::debug 初始化opDebug,慢日志相关记录  17.    OpDebug& debug = currentOp.debug();  18.    //慢日志阀值  19.    long long logThresholdMs = serverGlobalParams.slowMS;  20.    //时mongodb将记录这次慢操作,1为只记录慢操作,即操作时间大于了设置的配置,2表示记录所有操作    21.    bool shouldLogOpDebug = shouldLog(logger::LogSeverity::Debug(1));  22.    DbResponse dbresponse;  23.    if (op == dbMsg || op == dbCommand || (op == dbQuery && isCommand)) {  24.        //新版本op=dbMsg,因此走这里  25.        //从DB获取数据,获取到的数据通过dbresponse返回  26.        dbresponse = runCommands(opCtx, m);     27.    } else if (op == dbQuery) {  28.        ......   29.        //早期mongodb版本查询走这里  30.        dbresponse = receivedQuery(opCtx, nsString, c, m);  31.    } else if (op == dbGetMore) {    32.        //早期mongodb版本查询走这里  33.        dbresponse = receivedGetMore(opCtx, m, currentOp, &shouldLogOpDebug);  34.    } else {  35.        ......  36.        //早期版本增 删 改走这里处理  37.         if (op == dbInsert) {  38.              receivedInsert(opCtx, nsString, m); //插入操作入口   新版本CmdInsert::runImpl  39.         } else if (op == dbUpdate) {  40.              receivedUpdate(opCtx, nsString, m); //更新操作入口    41.         } else if (op == dbDelete) {  42.              receivedDelete(opCtx, nsString, m); //删除操作入口    43.         }   44.    }  45.    //获取runCommands执行时间,也就是内部处理时间  46.    debug.executionTimeMicros = durationCount<Microseconds>(currentOp.elapsedTimeExcludingPauses());  47.    ......  48.    //慢日志记录  49.    if (shouldLogOpDebug || (shouldSample && debug.executionTimeMicros > logThresholdMs * 1000LL)) {  50.        Locker::LockerInfo lockerInfo;    51.        //OperationContext::lockState  LockerImpl<>::getLockerInfo  52.        opCtx->lockState()->getLockerInfo(&lockerInfo);   53.  54.    //OpDebug::report 记录慢日志到日志文件  55.        log() << debug.report(&c, currentOp, lockerInfo.stats);   56.    }  57.    //各种统计信息  58.    recordCurOpMetrics(opCtx);  59.}  
复制代码

Mongod 的 handleRequest()接口主要完成以下工作:

① 从 Message 中获取 OpCode,早期版本每个命令又对应取值,例如增删改查早期版本分别对应:dbInsert、dbDelete、dbUpdate、dbQuery;Mongodb 3.6 开始,默认请求对应 OpCode 都是 OP_MSG,本文默认只分析 OpCode=OP_MSG 相关的处理。

② 获取本操作对应的 Client 客户端信息。

③ 如果是早期版本,通过 Message 构造 DbMessage,同时解析出库.表信息。

④ 根据不同 OpCode 执行对应回调操作,OP_MSG 对应操作为 runCommands(...),获取的数据通过 dbresponse 返回。

⑤ 获取到 db 层返回的数据后,进行慢日志判断,如果 db 层数据访问超过阀值,记录慢日志。

⑥ 设置 debug 的各种统计信息。

5.2 命令解析及查找

从上面的分析可以看出,接口最后调用 runCommands(...),该接口核心代码实现如下所示:

1.//message解析出对应command执行  2.DbResponse runCommands(OperationContext* opCtx, const Message& message) {  3.    //获取message对应的ReplyBuilder,3.6默认对应OpMsgReplyBuilder  4.    //应答数据通过该类构造  5.    auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(message));  6.    [&] {  7.        OpMsgRequest request;  8.        try {  // Parse.  9.            //协议解析 根据message获取对应OpMsgRequest  10.            request = rpc::opMsgRequestFromAnyProtocol(message);  11.        }   12.    }  13.    try {  // Execute.  14.        //opCtx初始化  15.        curOpCommandSetup(opCtx, request);  16.        //command初始化为Null  17.        Command* c = nullptr;  18.        //OpMsgRequest::getCommandName查找  19.        if (!(c = Command::findCommand(request.getCommandName()))) {   20.             //没有找到相应的command的后续异常处理  21.             ......  22.        }  23.        //执行command命令,获取到的数据通过replyBuilder.get()返回  24.        execCommandDatabase(opCtx, c, request, replyBuilder.get());  25.    }  26.    //OpMsgReplyBuilder::done对数据进行序列化操作  27.    auto response = replyBuilder->done();  28.    //responseLength赋值  29.    CurOp::get(opCtx)->debug().responseLength = response.header().dataLen();  30.    // 返回  31.    return DbResponse{std::move(response)};  
复制代码

RunCommands(...)接口从 message 中解析出 OpMsg 信息,然后获取该 OpMsg 对应的 command 命令信息,最后执行该命令对应的后续处理操作。主要功能说明如下:

① 获取该 OpCode 对应 replyBuilder,OP_MSG 操作对应 builder 为 OpMsgReplyBuilder。

② 根据 message 解析出 OpMsgRequest 数据,OpMsgRequest 来中包含了真正的命令请求 bson 信息。

③ opCtx 初始化操作。

④ 通过 request.getCommandName()返回命令信息(如“find”、“update”等字符串)。

⑤ 通过 Command::findCommand(command name)从 CommandMap 这个 map 表中查找是否支持该 command 命令。如果没找到说明不支持,如果找到说明支持。

⑥ 调用 execCommandDatabase(...)执行该命令,并获取命令的执行结果。

⑦ 根据 command 执行结果构造 response 并返回

5.3 命令执行

1.void execCommandDatabase(...) {  2.    ......  3.    //获取dbname  4.    const auto dbname = request.getDatabase().toString();  5.    ......  6.    //mab表存放从bson中解析出的elem信息  7.    StringMap<int> topLevelFields;  8.    //body elem解析  9.    for (auto&& element : request.body) {  10.        //获取bson中的elem信息  11.        StringData fieldName = element.fieldNameStringData();  12.        //如果elem信息重复,则异常处理  13.        ......  14.    }  15.    //如果是help命令,则给出help提示  16.    if (Command::isHelpRequest(helpField)) {  17.        //给出help提示  18.        Command::generateHelpResponse(opCtx, replyBuilder, *command);  19.        return;  20.    }  21.    //权限认证检查,检查该命令执行权限  22.    uassertStatusOK(Command::checkAuthorization(command, opCtx, request));  23.    ......  24.  25.    //该命令执行次数统计  db.serverStatus().metrics.commands可以获取统计信息  26.    command->incrementCommandsExecuted();  27.    //真正的命令执行在这里面  28.    retval = runCommandImpl(opCtx, command, request, replyBuilder, startOperationTime);  29.    //该命令执行失败次数统计  30.    if (!retval) {  31.        command->incrementCommandsFailed();  32.     }  33.     ......  }
复制代码

execCommandDatabase(...)最终调用 RunCommandImpl(...)进行对应命令的真正处理,该接口核心代码实现如下:

1.bool runCommandImpl(...) {  2.    //获取命令请求内容body  3.    BSONObj cmd = request.body;  4.    //获取请求中的DB库信息  5.    const std::string db = request.getDatabase().toString();  6.    //ReadConcern检查  7.    Status rcStatus = waitForReadConcern(  8.        opCtx, repl::ReadConcernArgs::get(opCtx), command->allowsAfterClusterTime(cmd));  9.    //ReadConcern检查不通过,直接异常提示处理  10.    if (!rcStatus.isOK()) {  11.         //异常处理  12.         return;  13.    }  14.    if (!command->supportsWriteConcern(cmd)) {  15.        //命令不支持WriteConcern,但是对应的请求中却带有WriteConcern配置,直接报错不支持  16.        if (commandSpecifiesWriteConcern(cmd)) {  17.            //异常处理"Command does not support writeConcern"  18.            ......  19.            return result;  20.        }  21.    //调用Command::publicRun执行不同命令操作  22.        result = command->publicRun(opCtx, request, inPlaceReplyBob);  23.    }  24.    //提取WriteConcernOptions信息  25.    auto wcResult = extractWriteConcern(opCtx, cmd, db);  26.    //提取异常,直接异常处理  27.    if (!wcResult.isOK()) {  28.        //异常处理  29.        ......  30.        return result;  31.    }  32.    ......  33.    //执行对应的命令Command::publicRun,执行不同命令操作  34.    result = command->publicRun(opCtx, request, inPlaceReplyBob);  35.    ......  36.}
复制代码

     RunCommandImpl(...)接口最终调用该接口入参的 command,执行 command->publicRun(...)接口,也就是命令模块的公共 publicRun。

5.4 总结

Mongod 服务入口首先从 message 中解析出 opCode 操作码,3.6 版本对应客户端默认操作码为 OP_MSQ,解析出该操作对应 OpMsgRequest 信息。然后从 message 原始数据中解析出 command 命令字符串后,继续通过全局 Map 表种查找是否支持该命令操作,如果支持则执行该命令;如果不支持,直接异常打印,同时返回。

6. Mongos 实例服务入口核心代码实现

       mongos 服务入口核心代码实现过程和 mongod 服务入口代码实现流程几乎相同,mongos 实例 message 解析、OP_MSG 操作码处理、command 命令查找等流程和上一章节 mongod 实例处理过程类似,本章节不在详细分析。Mongos 实例服务入口处理调用流程如下:

ServiceEntryPointMongos::handleRequest(...)->Strategy::clientCommand(...)-->runCommand(...)->execCommandClient(...)

最后的接口核心代码实现如下:

1.void runCommand(...) {  2.    ......  3.    //获取请求命令name  4.    auto const commandName = request.getCommandName();  5.    //从全局map表中查找  6.    auto const command = Command::findCommand(commandName);  7.    //没有对应的command存在,抛异常说明不支持该命令  8.    if (!command) {   9.        ......  10.        return;  11.    }   12.    ......  13.    //执行命令  14.    execCommandClient(opCtx, command, request, builder);   15.    ......  16.}  17.18.void execCommandClient(...)  19.{   20.    ......  21.    //认证检查,是否有操作该command命令的权限,没有则异常提示  22.    Status status = Command::checkAuthorization(c, opCtx, request);    23.    if (!status.isOK()) {  24.        Command::appendCommandStatus(result, status);  25.        return;  26.    }  27.    //该命令的执行次数自增,代理上面也是要计数的  28.    c->incrementCommandsExecuted();   29.    //如果需要command统计,则加1  30.    if (c->shouldAffectCommandCounter()) {  31.        globalOpCounters.gotCommand();  32.    }  33.    ......  34.    //有部分命令不支持writeconcern配置,报错  35.    bool supportsWriteConcern = c->supportsWriteConcern(request.body);  36.    //不支持writeconcern又带有该参数的请求,直接异常处理"Command does not support writeConcern"  37.    if (!supportsWriteConcern && !wcResult.getValue().usedDefault) {  38.        ......  39.        return;  40.    }  41.    //执行本命令对应的公共publicRun接口,Command::publicRun  42.    ok = c->publicRun(opCtx, request, result);   43.    ......  44.}  
复制代码
  • Tips: mongos 和 mongod 实例服务入口核心代码实现的一点小区别

① Mongod 实例 opCode 操作码解析、OpMsg 解析、command 查找及对应命令调用处理都由 class ServiceEntryPointMongod{...}类一起完成。

② mongos 实例则把 opCode 操作码解析交由 class ServiceEntryPointMongos{...}类实现,OpMsg 解析、command 查找及对应命令调用处理放到了 clase Strategy{...}类来处理。

7.总结

   Mongodb 报文解析及组装流程总结

① 一个完整 mongodb 报文由通用报文 header 头部+body 部分组成。

② Body 部分内容,根据报文头部的 opCode 来决定不同的 body 内容。

③ 3.6 版本对应客户端请求 opCode 默认为 OP_MSG,该操作码对应 body 部分由 flagBits + sections + checksum 组成,其中 sections 中存放的是真正的命令请求信息,已 bson 数据格式保存。

④ Header 头部和 body 报文体封装及解析过程由 class Message {...}类实现

⑤ Body 中对应 command 命令名、库名、表名的解析在 mongodb(version<3.6)低版本协议中由 class DbMessage {...}类实现

⑥ Body 中对应 command 命令名、库名、表名的解析在 mongodb(version<3.6)低版本协议中由 struct OpMsgRequest{...}结构和 struct OpMsg {...}类实现

 

   Mongos 和 mongod 实例的服务入口处理流程大同小异,整体处理流程如下:

① 从 message 解析出 opCode 操作码,根据不同操作码执行对应操作码回调。

② 根据 message 解析出 OpMsg request 信息,mongodb 报文的命令信息就存储在该 body 中,该 body 已 bson 格式存储。

③ 从 body 中解析出 command 命令字符串信息(如“insert”、“update”等)。

④ 从全局_commands map 表中查找是否支持该命令,如果支持则执行该命令处理,如果不支持则直接报错提示。

⑤ 最终找到对应 command 命令后,执行 command 的功能 run 接口。

   图形化总结如下:

说明:第 3 章的协议解析及封装过程实际上应该算是网络处理模块范畴,本文为了分析 command 命令处理模块方便,把该部分实现归纳到了命令处理模块,这样方便理解。

 

Tips: 下期继续分享不同 command 命令执行细节。

8.遗留问题

     第 1 章节中的统计信息,将在 command 模块核心代码分析完毕后揭晓答案,《mongodb command 命令处理模块源码实现二》中继续分析,敬请关注。


发布于: 2020 年 11 月 25 日阅读数: 836
用户头像

万亿级mongodb集群性能优化实践 2020.10.13 加入

Qcon、Gdevops、dbaplus等讲师,滴滴出行专家工程师/OPPO-mongodb负责人,负责数万亿级mongodb内核研发、性能优化及运维工作。持续分享《MongoDB内核源码设计、性能优化、最佳实践》,https://github.com/y123456yz

评论

发布
暂无评论
mongodb 源码实现系列 - command命令处理模块源码实现一