mongodb 源码实现系列 - 网络传输层模块实现二

发布于: 2020 年 10 月 26 日
mongodb源码实现系列-网络传输层模块实现二

关于作者

 前滴滴出行技术专家,现任OPPO文档数据库mongodb负责人,负责oppo千万级峰值TPS/十万亿级数据量文档数据库mongodb内核研发及运维工作,一直专注于分布式缓存、高性能服务端、数据库、中间件等相关研发。后续持续分享《MongoDB内核源码设计、性能优化、最佳运维实践》,Github账号地址:https://github.com/y123456yz

1. 说明

    mongodb源码实现系列文章有前后逻辑关系,阅读本文前,请提前阅读<<mongodb网络模块源码实现及性能调优一>>

在之前的<<mongodb网络模块源码实现及性能调优一>>一文中分析了如何阅读百万级大工程源码、Asio网络库实现、transport传输层网络模块中线程模型实现,但是由于篇幅原因,传输层网络模块中的以下模块实现原理没有分析,本文降将继续分析遗留的以下子模块:

  1. transport_layer套接字处理及传输层管理子模块

  2. session会话子模块

  3. Ticket数据收发子模块

  4. service_entry_point服务入口点子模块

  5. service_state_machine状态机子模块(该《模块在网络传输层模块源码实现三》中分析)

  6. service_executor线程模型子模块(该《模块在网络传输层模块源码实现四》中分析)

2. transport_layer套接字处理及传输层管理子模块

transport_layer套接字处理及传输层管理子模块功能包括套接字相关初始化处理、结合asio库实现异步accept处理、不同线程模型管理及初始化等,该模块的源码实现主要由以下几个文件实现:

上图是套接字处理及传输层管理子模块源码实现的相关文件,其中mock和test文件主要用于模拟测试等,所以真正核心的代码实现只有下表的几个文件,对应源码文件功能说明如下表所示:

2.1核心代码实现

该子模块核心代码主要由TransportLayerManager类和TransportLayerASIO类相关接口实现。

2.1.1  TransportLayerManager类核心代码实现

TransportLayerManager类主要成员及接口如下:

1.//网络会话链接,消息处理管理相关的类,在createWithConfig构造该类存入_tls  
2.class TransportLayerManager final : public TransportLayer {  
3.    //以下四个接口真正实现在TransportLayerASIO类中具体实现  
4.    Ticket sourceMessage(...) override;  
5.    Ticket sinkMessage(...) override;      
6.    Status wait(Ticket&& ticket) override;  
7.    void asyncWait(...) override;  
8.    //配置初始化实现  
9.    std::unique_ptr<TransportLayer> createWithConfig(...);  
10.  
11.    //createWithConfig中赋值,对应TransportLayerASIO,  
12.    //实际上容器中就一个成员,就是TransportLayerASIO  
13.    std::vector<std::unique_ptr<TransportLayer>> _tls;  
14.};  

TransportLayerManager类包含一个_tls成员,该类最核心的createWithConfig接口代码实现如下:

15.//根据配置构造相应类信息  _initAndListen中调用  
16.std::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig(...) {  
17.    std::unique_ptr<TransportLayer> transportLayer;  
18.    //服务类型,也就是本实例是mongos还是mongod  
19.    //mongos对应ServiceEntryPointMongod,mongod对应ServiceEntryPointMongos  
20.    auto sep = ctx->getServiceEntryPoint();  
21.    //net.transportLayer配置模式,默认asio, legacy模式已淘汰  
22.    if (config->transportLayer == "asio") {  
23.     //同步方式还是异步方式,默认synchronous  
24.        if (config->serviceExecutor == "adaptive") {  
25.         //动态线程池模型,也就是异步模式  
26.            opts.transportMode = transport::Mode::kAsynchronous;  
27.        } else if (config->serviceExecutor == "synchronous") {  
28.            //一个链接一个线程模型,也就是同步模式  
29.            opts.transportMode = transport::Mode::kSynchronous;  
30.        }   
31.     //如果配置是asio,构造TransportLayerASIO类  
32.     auto transportLayerASIO = stdx::make_unique<transport::TransportLayerASIO>(opts, sep);  
33.     if (config->serviceExecutor == "adaptive") { //异步方式  
34.         //构造动态线程模型对应的执行器ServiceExecutorAdaptive  
35.            ctx->setServiceExecutor(stdx::make_unique<ServiceExecutorAdaptive>(  
36.                ctx, transportLayerASIO->getIOContext()));  
37.         } else if (config->serviceExecutor == "synchronous") { //同步方式  
38.            //构造一个链接一个线程模型对应的执行器ServiceExecutorSynchronous  
39.            ctx->setServiceExecutor(stdx::make_unique<ServiceExecutorSynchronous>(ctx));  
40.         }  
41.     //transportLayerASIO转换为transportLayer类  
42.         transportLayer = std::move(transportLayerASIO);  
43.    }   
44.   //transportLayer转存到对应retVector数组中并返回  
45.    std::vector<std::unique_ptr<TransportLayer>> retVector;  
46.    retVector.emplace_back(std::move(transportLayer));  
47.    return stdx::make_unique<TransportLayerManager>(std::move(retVector));  
48.}  

createWithConfig函数根据配置文件来确定对应的TransportLayer,如果net.transportLayer配置为”asio”,则选用TransportLayerASIO类来进行底层的网络IO处理,如果配置为”legacy”,则选用TransportLayerLegacy。”legacy”模式当前已淘汰,本文只分析”asio”模式实现。

“asio”模式包含两种线程模型:adaptive(动态线程模型)和synchronous(同步线程模型)。adaptive模式线程设计采用动态线程方式,线程数和mongodb压力直接相关,如果mongodb压力大,则线程数增加;如果mongodb压力变小,则线程数自动减少。同步线程模式也就是一个链接一个线程模型,线程数的多少和链接数的多少成正比,链接数越多则线程数也越大。

Mongodb内核实现中通过opts.transportMode来标记asio的线程模型,这两种模型对应标记如下:

     说明:adaptive线程模型被标记为KAsynchronous,synchronous被标记为KSynchronous是有原因的,adaptive动态线程模型网络IO处理借助epoll异步实现,而synchronous一个链接一个线程模型网络IO处理是同步读写操作。Mongodb网络线程模型具体实现及各种优缺点可以参考:Mongodb网络传输处理源码实现及性能调优-体验内核性能极致设计

2.1.2 TransportLayerASIO类核心代码实现

TransportLayerASIO类核心成员及接口如下:

1.class TransportLayerASIO final : public TransportLayer {  
2.    //以下四个接口主要和套接字数据读写相关  
3.    Ticket sourceMessage(...);  
4.    Ticket sinkMessage(...);  
5.    Status wait(Ticket&& ticket);  
6.    void asyncWait(Ticket&& ticket, TicketCallback callback);  
7.    void end(const SessionHandle& session);  
8.    //新链接处理  
9.    void _acceptConnection(GenericAcceptor& acceptor);  
10.      
11.    //adaptive线程模型网络IO上下文处理  
12.    std::shared_ptr<asio::io_context> _workerIOContext;   
13.    //accept接收客户端链接对应的IO上下文  
14.    std::unique_ptr<asio::io_context> _acceptorIOContext;    
15.    //bindIp配置中的ip地址列表,用于bind监听,accept客户端请求  
16.    std::vector<std::pair<SockAddr, GenericAcceptor>> _acceptors;  
17.    //listener线程负责接收客户端新链接  
18.    stdx::thread _listenerThread;  
19.    //服务类型,也就是本实例是mongos还是mongod  
20.    //mongos对应ServiceEntryPointMongod,mongod对应ServiceEntryPointMongos  
21.    ServiceEntryPoint* const _sep = nullptr;  
22.    //当前运行状态  
23.    AtomicWord<bool> _running{false};  
24.    //listener处理相关的配置信息  
25.    Options _listenerOptions;  
26.}  

从上面的类结构可以看出,该类主要通过listenerThread线程完成bind绑定及listen监听操作,同时部分接口实现新连接上的数据读写。

套接字初始化代码实现如下:

1.Status TransportLayerASIO::setup() {  
2.    std::vector<std::string> listenAddrs;  
3. //如果没有配置bindIp,则默认监听"127.0.0.1:27017"
4.    if (_listenerOptions.ipList.empty()) {  
5.        listenAddrs = {"127.0.0.1"};  
6.    } else {  
7.        //配置文件中的bindIp:1.1.1.1,2.2.2.2,以逗号分隔符获取ip列表存入ipList  
8.        boost::split(listenAddrs, _listenerOptions.ipList, boost::is_any_of(","), boost::token_compress_on);  
9.    }  
10.    //遍历ip地址列表  
11.    for (auto& ip : listenAddrs) {  
12.     //根据IP和端口构造对应SockAddr结构  
13.        const auto addrs = SockAddr::createAll(  
14.            ip, _listenerOptions.port, _listenerOptions.enableIPv6 ? AF_UNSPEC : AF_INET);  
15.        ......  
16.        //根据addr构造endpoint  
17.        asio::generic::stream_protocol::endpoint endpoint(addr.raw(), addr.addressSize);  
18.     //_acceptorIOContext和_acceptors关联  
19.        GenericAcceptor acceptor(*_acceptorIOContext);  
20.     //epoll注册,也就是fd和epoll关联  
21.        //basic_socket_acceptor::open  
22.        acceptor.open(endpoint.protocol());   
23.     //SO_REUSEADDR配置 basic_socket_acceptor::set_option  
24.        acceptor.set_option(GenericAcceptor::reuse_address(true));  
25.     //非阻塞设置 basic_socket_acceptor::non_blocking  
26.        acceptor.non_blocking(true, ec);    
27.        //bind绑定    
28.        acceptor.bind(endpoint, ec);   
29.        if (ec) {  
30.            return errorCodeToStatus(ec);  
31.        }  
32.    }  
}

从上面的分析可以看出,代码实现首先解析出配置文件中bindIP中的ip:port列表,然后遍历列表绑定所有服务端需要监听的ip:port,每个ip:port对应一个GenericAcceptor ,所有acceptor和全局accept IO上下文_acceptorIOContext关联,同时bind()绑定所有ip:port。

      Bind()绑定所有配置文件中的Ip:port后,然后通过TransportLayerASIO::start()完成后续处理,该接口代码实现如下:

1.//_initAndListen中调用执行   
2.Status TransportLayerASIO::start() //listen线程处理  
3.    ......  
4.    //这里专门起一个线程做listen相关的accept事件处理  
5.    _listenerThread = stdx::thread([this] {  
6.        //修改线程名  
7.        setThreadName("listener");   
8.        //该函数中循环处理accept事件  
9.        while (_running.load()) {  
10.            asio::io_context::work work(*_acceptorIOContext);   
11.            try {  
12.                //accept事件调度处理  
13.         _acceptorIOContext->run();    
14.            } catch (...) { //异常处理  
15.                severe() << "Uncaught exception in the listener: " << exceptionToStatus();  
16.                fassertFailed(40491);  
17.            }  
18.        }  
19.    });   
20.   遍历_acceptors,进行listen监听处理  
21.   for (auto& acceptor : _acceptors) {   
22.        acceptor.second.listen(serverGlobalParams.listenBacklog);  
23.     //异步accept回调注册在该函数中  
24.        _acceptConnection(acceptor.second);       
25.    }  
26.}

从上面的TransportLayerASIO::start()接口可以看出,mongodb特地创建了一个listener线程用于客户端accept事件处理,然后借助ASIO网络库的acceptorIOContext->run()接口来调度,当有新链接到来的时候,就会执行相应的accept回调处理,accept回调注册到iocontext的流程由acceptConnection()完成,该接口核心源码实现如下:

1.//accept新连接到来的回调注册 
2.void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) {  
3.     //新链接到来时候的回调函数,服务端接收到新连接都会执行该回调
4. //注意这里面是递归执行,保证所有accept事件都会一次处理完毕
5.    auto acceptCb = [this, &acceptor](const std::error_code& ec, GenericSocket peerSocket) mutable {  
6.        if (!_running.load())  
7.            return;  
8.  
9.        ......  
10.     //每个新的链接都会new一个新的ASIOSession  
11.        std::shared_ptr<ASIOSession> session(new ASIOSession(thisstd::move(peerSocket)));  
12.     //新的链接处理ServiceEntryPointImpl::startSession,  
13.        //和ServiceEntryPointImpl服务入口点模块关联起来  
14.        _sep->startSession(std::move(session));  
15.        //递归,直到处理完所有的网络accept事件  
16.        _acceptConnection(acceptor);   
17.    };  
18.    //accept新连接到来后服务端的回调处理在这里注册  
19.    acceptor.async_accept(*_workerIOContext, std::move(acceptCb));  
20.}  

TransportLayerASIO::_acceptConnection的新连接处理过程借助ASIO库实现,通过acceptor.async_accept实现所有监听的acceptor回调异步注册。

当服务端接收到客户端新连接事件通知后,会触发执行acceptCb()回调,该回调中底层ASIO库通过 epoll_wait获取到所有的accept事件,每获取到一个accept事件就代表一个新的客户端链接,然后调用ServiceEntryPointImpl::startSession()接口处理这个新的链接事件,整个过程递归执行,保证一次可以处理所有的客户端accept请求信息。

每个链接都会构造一个唯一的session信息,该session就代表一个唯一的新连接,链接和session一一对应。此外,最终会调用ServiceEntryPointImpl::startSession()进行真正的accept()处理,从而获取到一个新的链接。

注意:TransportLayerASIO::_acceptConnection()中实现了TransportLayerASIO类和ServiceEntryPointImpl类的关联,这两个类在该接口实现了关联。

此外,从前面的TransportLayerASIO类结构中可以看出,该类还包含如下四个接口:sourceMessage(...)、sinkMessage(...)、wait(Ticket&& ticket)、asyncWait(Ticket&& ticket, TicketCallback callback),这四个接口入参都和Ticket数据分发子模块相关联,具体核心代码实现如下:

1.//根据asioSession, expiration, message三个信息构造数据接收类ASIOSourceTicket  
2.Ticket TransportLayerASIO::sourceMessage(...) {  
3.    ......  
4.    auto asioSession = checked_pointer_cast<ASIOSession>(session);  
5.    //根据asioSession, expiration, message三个信息构造ASIOSourceTicket  
6.    auto ticket = stdx::make_unique<ASIOSourceTicket>(asioSession, expiration, message);  
7.    return {thisstd::move(ticket)};  
8.}  
9.  
10.//根据asioSession, expiration, message三个信息构造数据发送类ASIOSinkTicket  
11.Ticket TransportLayerASIO::sinkMessage(...) {  
12.    auto asioSession = checked_pointer_cast<ASIOSession>(session);  
13.    auto ticket = stdx::make_unique<ASIOSinkTicket>(asioSession, expiration, message);  
14.    return {thisstd::move(ticket)};  
15.}  
16.  
17.//同步接收或者发送,最终调用ASIOSourceTicket::fill 或者 ASIOSinkTicket::fill  
18.Status TransportLayerASIO::wait(Ticket&& ticket) {  
19.    //获取对应Ticket,接收对应ASIOSourceTicket,发送对应ASIOSinkTicket  
20.    auto ownedASIOTicket = getOwnedTicketImpl(std::move(ticket));  
21.    auto asioTicket = checked_cast<ASIOTicket*>(ownedASIOTicket.get());  
22.    ......  
23.    //调用对应fill接口 同步接收ASIOSourceTicket::fill 或者 同步发送ASIOSinkTicket::fill  
24.    asioTicket->fill(true, [&waitStatus](Status result) { waitStatus = result; });  
25.    return waitStatus;  
26.}  
27.//异步接收或者发送,最终调用ASIOSourceTicket::fill 或者 ASIOSinkTicket::fill  
28.void TransportLayerASIO::asyncWait(Ticket&& ticket, TicketCallback callback) {  
29.    //获取对应数据收发的Ticket,接收对应ASIOSourceTicket,发送对应ASIOSinkTicket  
30.    auto ownedASIOTicket = std::shared_ptr<TicketImpl>(getOwnedTicketImpl(std::move(ticket)));  
31.    auto asioTicket = checked_cast<ASIOTicket*>(ownedASIOTicket.get());  
32.  
33.   //调用对应ASIOTicket::fill  
34.    asioTicket->fill(  
35.        false,   [ callback = std::move(callback),  
36.        ownedASIOTicket = std::move(ownedASIOTicket) ](Status status) { callback(status); });  
37.}  

上面四个接口中的前两个接口主要通过Session, expiration, message这三个参数来获取对应的Ticket 信息,实际上mongodb内核实现中把接收数据的Ticket和发送数据的Ticket分别用不同的继承类ASIOSourceTicket和ASIOSinkTicket来区分,三个参数的作用如下表所示:

数据收发包括同步收发和异步收发,同步收发通过TransportLayerASIO::wait()实现,异步收发通过TransportLayerASIO::asyncWait()实现。

注意:以上四个接口把TransportLayerASIO类和Ticket 数据收发类的关联。   

2.2总结



     transport_layer套接字处理及传输层管理子模块主要由transport_layer_manager和transport_layer_asio两个核心类组成,这两个类的核心接口功能总结如下表所示:

Transport_layer_manager中初始化TransportLayer和serviceExecutor,net.TransportLayer配置可以为legacy和asio,其中legacy已经淘汰,当前内核只支持asio模式。asio配置对应的TransportLayer由TransportLayerASIO实现,对应的serviceExecutor线程模型可以是adaptive动态线程模型,也可以是synchronous同步线程模型。

套接字创建、bind()绑定、listen()监听、accept事件注册等都由本类实现,同时数据分发Ticket模块也与本模块关联,一起配合完成整个后续Ticket模块模块的同步及异步数据读写流程。此外,本模块还通过ServiceEntryPoint服务入口子模块联动,保证了套接字初始化、accept事件注册完成后,服务入口子模块能有序的进行新连接接收处理。

接下来继续分析本模块相关联的ServiceEntryPoint服务入口子模块和Ticket数据分发子模块实现。

3. service_entry_point服务入口点子模块

service_entry_point服务入口点子模块主要负责如下功能:新连接处理、Session会话管理、接收到一个完整报文后的回调处理(含报文解析、认证、引擎层处理等)。

该模块的源码实现主要包含以下几个文件:

     service_entry_point开头的代码文件都和本模块相关,其中service_entry_point_utils*负责工作线程创建,service_entry_point_impl*完成新链接回调处理及sesseion会话管理。

3.1核心源码实现

     服务入口子模块相关代码实现比较简洁,主要由ServiceEntryPointImpl类和service_entry_point_utils中的线程创建函数组成。

3.1.1 ServiceEntryPointImpl类核心代码实现

ServiceEntryPointImpl类主要成员和接口如下:

1.class ServiceEntryPointImpl : public ServiceEntryPoint {  
2.    MONGO_DISALLOW_COPYING(ServiceEntryPointImpl);  
3.public:  
4.    //构造函数  
5.    explicit ServiceEntryPointImpl(ServiceContext* svcCtx);     
6.    //以下三个接口进行session会话处理控制  
7.    void startSession(transport::SessionHandle session) final;  
8.    void endAllSessions(transport::Session::TagMask tags) final;  
9.    bool shutdown(Milliseconds timeout) final;  
10.    //session会话统计  
11.    Stats sessionStats() const final;  
12.    ......  
13.private:  
14.    //该list结构管理所有的ServiceStateMachine信息  
15.    using SSMList = stdx::list<std::shared_ptr<ServiceStateMachine>>;  
16.    //SSMList对应的迭代器  
17.    using SSMListIterator = SSMList::iterator;  
18.    //赋值ServiceEntryPointImpl::ServiceEntryPointImpl  
19.    //对应ServiceContextMongoD(mongod)或者ServiceContextNoop(mongos)类  
20.    ServiceContext* const _svcCtx;   
21.    //该成员变量在代码中没有使用  
22.    AtomicWord<std::size_t> _nWorkers;  
23.    //锁  
24.    mutable stdx::mutex _sessionsMutex;  
25.    //一个新链接对应一个ssm保存到ServiceEntryPointImpl._sessions中  
26.    SSMList _sessions;  
27.    //最大链接数控制  
28.    size_t _maxNumConnections{DEFAULT_MAX_CONN};  
29.    //当前的总链接数,不包括关闭的链接  
30.    AtomicWord<size_t> _currentConnections{0};  
31.    //所有的链接,包括已经关闭的链接  
32.    AtomicWord<size_t> _createdConnections{0};  
33.};  

该类的几个接口主要是session相关控制处理,该类中的变量成员说明如下:



ServiceEntryPointImpl类最核心的startSession()接口负责每个新连接到来后的内部回调处理,具体实现如下:

1.//新链接到来后的回调处理  
2.void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {   
3.    //获取该新连接对应的服务端和客户端地址信息  
4.    const auto& remoteAddr = session->remote().sockAddr();  
5.    const auto& localAddr = session->local().sockAddr();  
6.    //服务端和客户端地址记录到session中  
7.    auto restrictionEnvironment =  stdx::make_unique<RestrictionEnvironment>(*remoteAddr, *localAddr);  
8.    RestrictionEnvironment::set(session, std::move(restrictionEnvironment));  
9.    ......  
10.  
11.    //获取transportMode,kAsynchronous或者kSynchronous  
12.    auto transportMode = _svcCtx->getServiceExecutor()->transportMode();  
13.    //构造ssm  
14.    auto ssm = ServiceStateMachine::create(_svcCtx, session, transportMode);  
15.    {//该{}体内实现链接计数,同时把ssm统一添加到_sessions列表管理  
16.        stdx::lock_guard<decltype(_sessionsMutex)lk(_sessionsMutex);  
17.        connectionCount = _sessions.size() + 1//连接数自增  
18.        if (connectionCount <= _maxNumConnections) {  
19.            //新来的链接对应的session保存到_sessions链表    
20.            //一个新链接对应一个ssm保存到ServiceEntryPointImpl._sessions中  
21.            ssmIt = _sessions.emplace(_sessions.begin(), ssm);  
22.            _currentConnections.store(connectionCount);  
23.            _createdConnections.addAndFetch(1);  
24.        }  
25.    }  
26.    //链接超限,直接退出  
27.    if (connectionCount > _maxNumConnections) {   
28.        ......  
29.        return;  
30.    }  
31.    //链接关闭的回收处理  
32.    ssm->setCleanupHook([ this, ssmIt, session = std::move(session) ] {  
33.         ......  
34.    });  
35.    //获取transport模式为同步模式还是异步模式,也就是adaptive线程模式还是synchronous线程模式  
36.    auto ownership = ServiceStateMachine::Ownership::kOwned;  
37.    if (transportMode == transport::Mode::kSynchronous) {  
38.        ownership = ServiceStateMachine::Ownership::kStatic;  
39.    }  
40.    //ServiceStateMachine::start,这里和服务状态机模块衔接起来  
41.    ssm->start(ownership);  
42.}  

     该接口拿到该链接对应的服务端和客户端地址后,记录到该链接对应session中,然后根据该session、transportMode、_svcCtx构建一个服务状态机ssm(ServiceStateMachine)。一个新链接对应一个唯一session,一个session对应一个唯一的服务状态机ssm,这三者保持唯一的一对一关系。

      最终,startSession()让服务入口子模块、session会话子模块、ssm状态机子模块关联起来。   

3.1.2  service_entry_point_utils核心代码实现

service_entry_point_utils源码文件只有launchServiceWorkerThread一个接口,该接口主要负责工作线程创建,并设置每个工作线程的线程栈大小,如果系统默认栈大于1M,则每个工作线程的线程栈大小设置为1M,如果系统栈大小小于1M,则以系统堆栈大小为准,同时warning打印提示。该函数实现如下:

1.Status launchServiceWorkerThread(stdx::function<void()> task) {  
2.        static const size_t kStackSize = 1024 * 1024;  //1M  
3.        struct rlimit limits;  
4.        //或者系统堆栈大小  
5.        invariant(getrlimit(RLIMIT_STACK, &limits) == 0);  
6.        //如果系统堆栈大小大于1M,则默认设置线程栈大小为1M  
7.        if (limits.rlim_cur > kStackSize) {  
8.            size_t stackSizeToSet = kStackSize;  
9.            int failed = pthread_attr_setstacksize(&attrs, stackSizeToSet);  
10.            if (failed) {  
11.                const auto ewd = errnoWithDescription(failed);  
12.                warning() << "pthread_attr_setstacksize failed: " << ewd;  
13.            }  
14.        } else if (limits.rlim_cur < 1024 * 1024) {  
15.            //如果系统栈大小小于1M,则已系统堆栈为准,同时给出告警  
16.            warning() << "Stack size set to " << (limits.rlim_cur / 1024) << "KB. We suggest 1MB";  
17.        }}  
18.        ......  
19.        //task参数传递给新建线程  
20.        auto ctx = stdx::make_unique<stdx::function<void()>>(std::move(task));  
21.        int failed = pthread_create(&thread, &attrs, runFunc, ctx.get());   
22.        ......  
23.

3.2总结

service_entry_point服务入口点子模块主要负责新连接后的回调处理及工作线程创建,该模块和后续的session会话模块、SSM服务状态机模块衔接配合,完成数据收发的正常逻辑转换处理。上面的分析只列出了服务入口点子模块的核心接口实现,下表总结该模块所有的接口功能:

3. Ticket数据收发子模块

     Ticket数据收发子模块主要功能如下:调用session子模块进行底层asio库处理、拆分数据接收和数据发送到两个类、完整mongodb报文读取 、接收或者发送mongodb报文后的回调处理。

3.1 ASIOTicket类核心代码实现

Ticket数据收发模块相关实现主要由ASIOTicket类完成,该类结构如下:

1.//下面的ASIOSinkTicket和ASIOSourceTicket继承该类,用于控制数据的发送和接收  
2.class TransportLayerASIO::ASIOTicket : public TicketImpl {  
3.public:  
4.    //初始化构造  
5.    explicit ASIOTicket(const ASIOSessionHandle& session, Date_t expiration);  
6.    //获取sessionId  
7.    SessionId sessionId() const final {  
8.        return _sessionId;  
9.    }  
10.    //asio模式没用,针对legacy模型  
11.    Date_t expiration() const final {  
12.        return _expiration;  
13.    }  
14.
15.    //以下四个接口用于数据收发相关处理  
16.    void fill(bool sync, TicketCallback&& cb);  
17.protected:  
18.    void finishFill(Status status);  
19.    bool isSync() const;  
20.    virtual void fillImpl() 0;  
21.private:  
22.    //会话信息,一个链接一个session  
23.    std::weak_ptr<ASIOSession> _session;  
24.    //每个session有一个唯一id  
25.    const SessionId _sessionId;  
26.    //asio模型没用,针对legacy生效  
27.    const Date_t _expiration;  
28.    //数据发送或者接收成功后的回调处理  
29.    TicketCallback _fillCallback;  
30.    //同步方式还是异步方式进行数据处理,默认异步  
31.    bool _fillSync;  
32.};  

该类保护多个成员变量,这些成员变量功能说明如下:



mongodb在具体实现上,数据接收和数据发送分开实现,分别是数据接收类ASIOSourceTicket和数据发送类ASIOSinkTicket,这两个类都继承自ASIOTicket类,这两个类的主要结构如下:

1.//数据接收的ticket  
2.class TransportLayerASIO::ASIOSourceTicket : public TransportLayerASIO::ASIOTicket {  
3.public:  
4.    //初始化构造  
5.    ASIOSourceTicket(const ASIOSessionHandle& session, Date_t expiration, Message* msg);  
6.protected:  
7.    //数据接收Impl  
8.    void fillImpl() final;  
9.private:  
10.    //接收到mongodb头部数据后的回调处理  
11.    void _headerCallback(const std::error_code& ec, size_t size);  
12.    //接收到mongodb包体数据后的回调处理    
13.    void _bodyCallback(const std::error_code& ec, size_t size);  
14.  
15.    //存储数据的buffer,网络IO读取到的原始数据内容  
16.    SharedBuffer _buffer;  
17.    //数据Message管理,数据来源为_buffer  
18.    Message* _target;  
19.};  
1.
2.  
20.//数据发送的ticket  
21.class TransportLayerASIO::ASIOSinkTicket : public TransportLayerASIO::ASIOTicket {  
22. public:  
23.    //初始化构造  
24.    ASIOSinkTicket(const ASIOSessionHandle& session, Date_t expiration, const Message& msg);  
25.protected:  
26. //数据发送Impl  
27.    void fillImpl() final;  
28.private:  
29.    //发送数据完成的回调处理  
30.    void _sinkCallback(const std::error_code& ec, size_t size);  
31.    //需要发送的数据message信息  
32.    Message _msgToSend;  
33.}; 

    从上面的代码实现可以看出,ASIOSinkTicket 和ASIOSourceTicket 类接口及成员实现几乎意义,只是具体的实现方法不同,下面对ASIOSourceTicket和ASIOSinkTicket 相关核心代码实现进行分析。

3.1.2 ASIOSourceTicket 数据接收核心代码实现

数据接收过程核心代码如下:

1.//数据接收的fillImpl接口实现  
2.void TransportLayerASIO::ASIOSourceTicket::fillImpl() {    
3.    //获取对应session信息  
4.    auto session = getSession();  
5.    if (!session)  
6.        return;  
7.    //收到读取mongodb头部数据,头部数据长度是固定的kHeaderSize字节  
8.    const auto initBufSize = kHeaderSize;  
9.    _buffer = SharedBuffer::allocate(initBufSize);  
10.  
11.    //调用TransportLayerASIO::ASIOSession::read读取底层数据存入_buffer  
12.    //读完头部数据后执行对应的_headerCallback回调函数  
13.    session->read(isSync(),  
14.                  asio::buffer(_buffer.get(), initBufSize), //先读取头部字段出来  
15.                  [this](const std::error_code& ec, size_t size) { _headerCallback(ec, size); });  
16.}  
17.  
18.//读取到mongodb header头部信息后的回调处理  
19.void TransportLayerASIO::ASIOSourceTicket::_headerCallback(const std::error_code& ec, size_t size) {  
20.    ......  
21.    //获取session信息  
22.    auto session = getSession();  
23.    if (!session)  
24.        return;  
25.    //从_buffer中获取头部信息  
26.    MSGHEADER::View headerView(_buffer.get());  
27.    //获取message长度  
28.    auto msgLen = static_cast<size_t>(headerView.getMessageLength());  
29.    //长度太小或者太大,直接报错  
30.    if (msgLen < kHeaderSize || msgLen > MaxMessageSizeBytes) {  
31.        .......  
32.        return;  
33.    }  
34.    ....  
35.   //内容还不够一个mongo协议报文,继续读取body长度字节的数据,读取完毕后开始body处理  
36.   //注意这里是realloc,保证头部和body在同一个buffer中  
37.    _buffer.realloc(msgLen);   
38.    MsgData::View msgView(_buffer.get());  
39.  
40.    //调用底层TransportLayerASIO::ASIOSession::read读取数据body   
41.    session->read(isSync(),  
42.      //数据读取到该buffer                  
43.      asio::buffer(msgView.data(), msgView.dataLen()),  
44.      //读取成功后的回调处理  
45.      [this](const std::error_code& ec, size_t size) { _bodyCallback(ec, size); });  
46.}  
47.  
48.//_headerCallback对header读取后解析header头部获取到对应的msg长度,然后开始body部分处理  
49.void TransportLayerASIO::ASIOSourceTicket::_bodyCallback(const std::error_code& ec, size_t size) {  
50.    ......  
51.    //buffer转存到_target中  
52.    _target->setData(std::move(_buffer));  
53.    //流量统计  
54.    networkCounter.hitPhysicalIn(_target->size());  
55.    //TransportLayerASIO::ASIOTicket::finishFill    
56.    finishFill(Status::OK()); //包体内容读完后,开始下一阶段的处理    
57.    //报文读取