关于作者
前滴滴出行技术专家,现任 OPPO 文档数据库 mongodb 负责人,负责 oppo 千万级峰值 TPS/十万亿级数据量文档数据库 mongodb 内核研发及运维工作,一直专注于分布式缓存、高性能服务端、数据库、中间件等相关研发。后续持续分享《MongoDB 内核源码设计、性能优化、最佳运维实践》,Github 账号地址:https://github.com/y123456yz
1. 说明
mongodb 源码实现系列文章有前后逻辑关系,阅读本文前,请提前阅读<<mongodb 网络模块源码实现及性能调优一>>
在之前的<<mongodb 网络模块源码实现及性能调优一>>一文中分析了如何阅读百万级大工程源码、Asio 网络库实现、transport 传输层网络模块中线程模型实现,但是由于篇幅原因,传输层网络模块中的以下模块实现原理没有分析,本文降将继续分析遗留的以下子模块:
transport_layer 套接字处理及传输层管理子模块
session 会话子模块
Ticket 数据收发子模块
service_entry_point 服务入口点子模块
service_state_machine 状态机子模块(该《模块在网络传输层模块源码实现三》中分析)
service_executor 线程模型子模块(该《模块在网络传输层模块源码实现四》中分析)
2. transport_layer 套接字处理及传输层管理子模块
transport_layer 套接字处理及传输层管理子模块功能包括套接字相关初始化处理、结合 asio 库实现异步 accept 处理、不同线程模型管理及初始化等,该模块的源码实现主要由以下几个文件实现:
上图是套接字处理及传输层管理子模块源码实现的相关文件,其中 mock 和 test 文件主要用于模拟测试等,所以真正核心的代码实现只有下表的几个文件,对应源码文件功能说明如下表所示:
2.1 核心代码实现
该子模块核心代码主要由 TransportLayerManager 类和 TransportLayerASIO 类相关接口实现。
2.1.1 TransportLayerManager 类核心代码实现
TransportLayerManager 类主要成员及接口如下:
1.//网络会话链接,消息处理管理相关的类,在createWithConfig构造该类存入_tls 2.class TransportLayerManager final : public TransportLayer { 3. //以下四个接口真正实现在TransportLayerASIO类中具体实现 4. Ticket sourceMessage(...) override; 5. Ticket sinkMessage(...) override; 6. Status wait(Ticket&& ticket) override; 7. void asyncWait(...) override; 8. //配置初始化实现 9. std::unique_ptr<TransportLayer> createWithConfig(...); 10. 11. //createWithConfig中赋值,对应TransportLayerASIO, 12. //实际上容器中就一个成员,就是TransportLayerASIO 13. std::vector<std::unique_ptr<TransportLayer>> _tls; 14.};
复制代码
TransportLayerManager 类包含一个_tls 成员,该类最核心的 createWithConfig 接口代码实现如下:
15.//根据配置构造相应类信息 _initAndListen中调用 16.std::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig(...) { 17. std::unique_ptr<TransportLayer> transportLayer; 18. //服务类型,也就是本实例是mongos还是mongod 19. //mongos对应ServiceEntryPointMongod,mongod对应ServiceEntryPointMongos 20. auto sep = ctx->getServiceEntryPoint(); 21. //net.transportLayer配置模式,默认asio, legacy模式已淘汰 22. if (config->transportLayer == "asio") { 23. //同步方式还是异步方式,默认synchronous 24. if (config->serviceExecutor == "adaptive") { 25. //动态线程池模型,也就是异步模式 26. opts.transportMode = transport::Mode::kAsynchronous; 27. } else if (config->serviceExecutor == "synchronous") { 28. //一个链接一个线程模型,也就是同步模式 29. opts.transportMode = transport::Mode::kSynchronous; 30. } 31. //如果配置是asio,构造TransportLayerASIO类 32. auto transportLayerASIO = stdx::make_unique<transport::TransportLayerASIO>(opts, sep); 33. if (config->serviceExecutor == "adaptive") { //异步方式 34. //构造动态线程模型对应的执行器ServiceExecutorAdaptive 35. ctx->setServiceExecutor(stdx::make_unique<ServiceExecutorAdaptive>( 36. ctx, transportLayerASIO->getIOContext())); 37. } else if (config->serviceExecutor == "synchronous") { //同步方式 38. //构造一个链接一个线程模型对应的执行器ServiceExecutorSynchronous 39. ctx->setServiceExecutor(stdx::make_unique<ServiceExecutorSynchronous>(ctx)); 40. } 41. //transportLayerASIO转换为transportLayer类 42. transportLayer = std::move(transportLayerASIO); 43. } 44. //transportLayer转存到对应retVector数组中并返回 45. std::vector<std::unique_ptr<TransportLayer>> retVector; 46. retVector.emplace_back(std::move(transportLayer)); 47. return stdx::make_unique<TransportLayerManager>(std::move(retVector)); 48.}
复制代码
createWithConfig 函数根据配置文件来确定对应的 TransportLayer,如果 net.transportLayer 配置为”asio”,则选用 TransportLayerASIO 类来进行底层的网络 IO 处理,如果配置为”legacy”,则选用 TransportLayerLegacy。”legacy”模式当前已淘汰,本文只分析”asio”模式实现。
“asio”模式包含两种线程模型:adaptive(动态线程模型)和 synchronous(同步线程模型)。adaptive 模式线程设计采用动态线程方式,线程数和 mongodb 压力直接相关,如果 mongodb 压力大,则线程数增加;如果 mongodb 压力变小,则线程数自动减少。同步线程模式也就是一个链接一个线程模型,线程数的多少和链接数的多少成正比,链接数越多则线程数也越大。
Mongodb 内核实现中通过 opts.transportMode 来标记 asio 的线程模型,这两种模型对应标记如下:
说明:adaptive 线程模型被标记为 KAsynchronous,synchronous 被标记为 KSynchronous 是有原因的,adaptive 动态线程模型网络 IO 处理借助 epoll 异步实现,而 synchronous 一个链接一个线程模型网络 IO 处理是同步读写操作。Mongodb 网络线程模型具体实现及各种优缺点可以参考:Mongodb网络传输处理源码实现及性能调优-体验内核性能极致设计
2.1.2 TransportLayerASIO 类核心代码实现
TransportLayerASIO 类核心成员及接口如下:
1.class TransportLayerASIO final : public TransportLayer { 2. //以下四个接口主要和套接字数据读写相关 3. Ticket sourceMessage(...); 4. Ticket sinkMessage(...); 5. Status wait(Ticket&& ticket); 6. void asyncWait(Ticket&& ticket, TicketCallback callback); 7. void end(const SessionHandle& session); 8. //新链接处理 9. void _acceptConnection(GenericAcceptor& acceptor); 10. 11. //adaptive线程模型网络IO上下文处理 12. std::shared_ptr<asio::io_context> _workerIOContext; 13. //accept接收客户端链接对应的IO上下文 14. std::unique_ptr<asio::io_context> _acceptorIOContext; 15. //bindIp配置中的ip地址列表,用于bind监听,accept客户端请求 16. std::vector<std::pair<SockAddr, GenericAcceptor>> _acceptors; 17. //listener线程负责接收客户端新链接 18. stdx::thread _listenerThread; 19. //服务类型,也就是本实例是mongos还是mongod 20. //mongos对应ServiceEntryPointMongod,mongod对应ServiceEntryPointMongos 21. ServiceEntryPoint* const _sep = nullptr; 22. //当前运行状态 23. AtomicWord<bool> _running{false}; 24. //listener处理相关的配置信息 25. Options _listenerOptions; 26.}
复制代码
从上面的类结构可以看出,该类主要通过 listenerThread 线程完成 bind 绑定及 listen 监听操作,同时部分接口实现新连接上的数据读写。
套接字初始化代码实现如下:
1.Status TransportLayerASIO::setup() { 2. std::vector<std::string> listenAddrs; 3. //如果没有配置bindIp,则默认监听"127.0.0.1:27017"4. if (_listenerOptions.ipList.empty()) { 5. listenAddrs = {"127.0.0.1"}; 6. } else { 7. //配置文件中的bindIp:1.1.1.1,2.2.2.2,以逗号分隔符获取ip列表存入ipList 8. boost::split(listenAddrs, _listenerOptions.ipList, boost::is_any_of(","), boost::token_compress_on); 9. } 10. //遍历ip地址列表 11. for (auto& ip : listenAddrs) { 12. //根据IP和端口构造对应SockAddr结构 13. const auto addrs = SockAddr::createAll( 14. ip, _listenerOptions.port, _listenerOptions.enableIPv6 ? AF_UNSPEC : AF_INET); 15. ...... 16. //根据addr构造endpoint 17. asio::generic::stream_protocol::endpoint endpoint(addr.raw(), addr.addressSize); 18. //_acceptorIOContext和_acceptors关联 19. GenericAcceptor acceptor(*_acceptorIOContext); 20. //epoll注册,也就是fd和epoll关联 21. //basic_socket_acceptor::open 22. acceptor.open(endpoint.protocol()); 23. //SO_REUSEADDR配置 basic_socket_acceptor::set_option 24. acceptor.set_option(GenericAcceptor::reuse_address(true)); 25. //非阻塞设置 basic_socket_acceptor::non_blocking 26. acceptor.non_blocking(true, ec); 27. //bind绑定 28. acceptor.bind(endpoint, ec); 29. if (ec) { 30. return errorCodeToStatus(ec); 31. } 32. } }
复制代码
从上面的分析可以看出,代码实现首先解析出配置文件中 bindIP 中的 ip:port 列表,然后遍历列表绑定所有服务端需要监听的 ip:port,每个 ip:port 对应一个 GenericAcceptor ,所有 acceptor 和全局 accept IO 上下文_acceptorIOContext 关联,同时 bind()绑定所有 ip:port。
Bind()绑定所有配置文件中的 Ip:port 后,然后通过 TransportLayerASIO::start()完成后续处理,该接口代码实现如下:
1.//_initAndListen中调用执行 2.Status TransportLayerASIO::start() { //listen线程处理 3. ...... 4. //这里专门起一个线程做listen相关的accept事件处理 5. _listenerThread = stdx::thread([this] { 6. //修改线程名 7. setThreadName("listener"); 8. //该函数中循环处理accept事件 9. while (_running.load()) { 10. asio::io_context::work work(*_acceptorIOContext); 11. try { 12. //accept事件调度处理 13. _acceptorIOContext->run(); 14. } catch (...) { //异常处理 15. severe() << "Uncaught exception in the listener: " << exceptionToStatus(); 16. fassertFailed(40491); 17. } 18. } 19. }); 20. 遍历_acceptors,进行listen监听处理 21. for (auto& acceptor : _acceptors) { 22. acceptor.second.listen(serverGlobalParams.listenBacklog); 23. //异步accept回调注册在该函数中 24. _acceptConnection(acceptor.second); 25. } 26.}
复制代码
从上面的 TransportLayerASIO::start()接口可以看出,mongodb 特地创建了一个 listener 线程用于客户端 accept 事件处理,然后借助 ASIO 网络库的 acceptorIOContext->run()接口来调度,当有新链接到来的时候,就会执行相应的 accept 回调处理,accept 回调注册到 iocontext 的流程由 acceptConnection()完成,该接口核心源码实现如下:
1.//accept新连接到来的回调注册 2.void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) { 3. //新链接到来时候的回调函数,服务端接收到新连接都会执行该回调4. //注意这里面是递归执行,保证所有accept事件都会一次处理完毕5. auto acceptCb = [this, &acceptor](const std::error_code& ec, GenericSocket peerSocket) mutable { 6. if (!_running.load()) 7. return; 8. 9. ...... 10. //每个新的链接都会new一个新的ASIOSession 11. std::shared_ptr<ASIOSession> session(new ASIOSession(this, std::move(peerSocket))); 12. //新的链接处理ServiceEntryPointImpl::startSession, 13. //和ServiceEntryPointImpl服务入口点模块关联起来 14. _sep->startSession(std::move(session)); 15. //递归,直到处理完所有的网络accept事件 16. _acceptConnection(acceptor); 17. }; 18. //accept新连接到来后服务端的回调处理在这里注册 19. acceptor.async_accept(*_workerIOContext, std::move(acceptCb)); 20.}
复制代码
TransportLayerASIO::_acceptConnection 的新连接处理过程借助 ASIO 库实现,通过 acceptor.async_accept 实现所有监听的 acceptor 回调异步注册。
当服务端接收到客户端新连接事件通知后,会触发执行 acceptCb()回调,该回调中底层 ASIO 库通过 epoll_wait 获取到所有的 accept 事件,每获取到一个 accept 事件就代表一个新的客户端链接,然后调用 ServiceEntryPointImpl::startSession()接口处理这个新的链接事件,整个过程递归执行,保证一次可以处理所有的客户端 accept 请求信息。
每个链接都会构造一个唯一的 session 信息,该 session 就代表一个唯一的新连接,链接和 session 一一对应。此外,最终会调用 ServiceEntryPointImpl::startSession()进行真正的 accept()处理,从而获取到一个新的链接。
注意:TransportLayerASIO::_acceptConnection()中实现了 TransportLayerASIO 类和 ServiceEntryPointImpl 类的关联,这两个类在该接口实现了关联。
此外,从前面的 TransportLayerASIO 类结构中可以看出,该类还包含如下四个接口:sourceMessage(...)、sinkMessage(...)、wait(Ticket&& ticket)、asyncWait(Ticket&& ticket, TicketCallback callback),这四个接口入参都和 Ticket 数据分发子模块相关联,具体核心代码实现如下:
1.//根据asioSession, expiration, message三个信息构造数据接收类ASIOSourceTicket 2.Ticket TransportLayerASIO::sourceMessage(...) { 3. ...... 4. auto asioSession = checked_pointer_cast<ASIOSession>(session); 5. //根据asioSession, expiration, message三个信息构造ASIOSourceTicket 6. auto ticket = stdx::make_unique<ASIOSourceTicket>(asioSession, expiration, message); 7. return {this, std::move(ticket)}; 8.} 9. 10.//根据asioSession, expiration, message三个信息构造数据发送类ASIOSinkTicket 11.Ticket TransportLayerASIO::sinkMessage(...) { 12. auto asioSession = checked_pointer_cast<ASIOSession>(session); 13. auto ticket = stdx::make_unique<ASIOSinkTicket>(asioSession, expiration, message); 14. return {this, std::move(ticket)}; 15.} 16. 17.//同步接收或者发送,最终调用ASIOSourceTicket::fill 或者 ASIOSinkTicket::fill 18.Status TransportLayerASIO::wait(Ticket&& ticket) { 19. //获取对应Ticket,接收对应ASIOSourceTicket,发送对应ASIOSinkTicket 20. auto ownedASIOTicket = getOwnedTicketImpl(std::move(ticket)); 21. auto asioTicket = checked_cast<ASIOTicket*>(ownedASIOTicket.get()); 22. ...... 23. //调用对应fill接口 同步接收ASIOSourceTicket::fill 或者 同步发送ASIOSinkTicket::fill 24. asioTicket->fill(true, [&waitStatus](Status result) { waitStatus = result; }); 25. return waitStatus; 26.} 27.//异步接收或者发送,最终调用ASIOSourceTicket::fill 或者 ASIOSinkTicket::fill 28.void TransportLayerASIO::asyncWait(Ticket&& ticket, TicketCallback callback) { 29. //获取对应数据收发的Ticket,接收对应ASIOSourceTicket,发送对应ASIOSinkTicket 30. auto ownedASIOTicket = std::shared_ptr<TicketImpl>(getOwnedTicketImpl(std::move(ticket))); 31. auto asioTicket = checked_cast<ASIOTicket*>(ownedASIOTicket.get()); 32. 33. //调用对应ASIOTicket::fill 34. asioTicket->fill( 35. false, [ callback = std::move(callback), 36. ownedASIOTicket = std::move(ownedASIOTicket) ](Status status) { callback(status); }); 37.}
复制代码
上面四个接口中的前两个接口主要通过 Session, expiration, message 这三个参数来获取对应的 Ticket 信息,实际上 mongodb 内核实现中把接收数据的 Ticket 和发送数据的 Ticket 分别用不同的继承类 ASIOSourceTicket 和 ASIOSinkTicket 来区分,三个参数的作用如下表所示:
数据收发包括同步收发和异步收发,同步收发通过 TransportLayerASIO::wait()实现,异步收发通过 TransportLayerASIO::asyncWait()实现。
注意:以上四个接口把 TransportLayerASIO 类和 Ticket 数据收发类的关联。
2.2 总结
transport_layer 套接字处理及传输层管理子模块主要由 transport_layer_manager 和 transport_layer_asio 两个核心类组成,这两个类的核心接口功能总结如下表所示:
Transport_layer_manager 中初始化 TransportLayer 和 serviceExecutor,net.TransportLayer 配置可以为 legacy 和 asio,其中 legacy 已经淘汰,当前内核只支持 asio 模式。asio 配置对应的 TransportLayer 由 TransportLayerASIO 实现,对应的 serviceExecutor 线程模型可以是 adaptive 动态线程模型,也可以是 synchronous 同步线程模型。
套接字创建、bind()绑定、listen()监听、accept 事件注册等都由本类实现,同时数据分发 Ticket 模块也与本模块关联,一起配合完成整个后续 Ticket 模块模块的同步及异步数据读写流程。此外,本模块还通过 ServiceEntryPoint 服务入口子模块联动,保证了套接字初始化、accept 事件注册完成后,服务入口子模块能有序的进行新连接接收处理。
接下来继续分析本模块相关联的 ServiceEntryPoint 服务入口子模块和 Ticket 数据分发子模块实现。
3. service_entry_point 服务入口点子模块
service_entry_point 服务入口点子模块主要负责如下功能:新连接处理、Session 会话管理、接收到一个完整报文后的回调处理(含报文解析、认证、引擎层处理等)。
该模块的源码实现主要包含以下几个文件:
service_entry_point 开头的代码文件都和本模块相关,其中 service_entry_point_utils*负责工作线程创建,service_entry_point_impl*完成新链接回调处理及 sesseion 会话管理。
3.1 核心源码实现
服务入口子模块相关代码实现比较简洁,主要由 ServiceEntryPointImpl 类和 service_entry_point_utils 中的线程创建函数组成。
3.1.1 ServiceEntryPointImpl 类核心代码实现
ServiceEntryPointImpl 类主要成员和接口如下:
1.class ServiceEntryPointImpl : public ServiceEntryPoint { 2. MONGO_DISALLOW_COPYING(ServiceEntryPointImpl); 3.public: 4. //构造函数 5. explicit ServiceEntryPointImpl(ServiceContext* svcCtx); 6. //以下三个接口进行session会话处理控制 7. void startSession(transport::SessionHandle session) final; 8. void endAllSessions(transport::Session::TagMask tags) final; 9. bool shutdown(Milliseconds timeout) final; 10. //session会话统计 11. Stats sessionStats() const final; 12. ...... 13.private: 14. //该list结构管理所有的ServiceStateMachine信息 15. using SSMList = stdx::list<std::shared_ptr<ServiceStateMachine>>; 16. //SSMList对应的迭代器 17. using SSMListIterator = SSMList::iterator; 18. //赋值ServiceEntryPointImpl::ServiceEntryPointImpl 19. //对应ServiceContextMongoD(mongod)或者ServiceContextNoop(mongos)类 20. ServiceContext* const _svcCtx; 21. //该成员变量在代码中没有使用 22. AtomicWord<std::size_t> _nWorkers; 23. //锁 24. mutable stdx::mutex _sessionsMutex; 25. //一个新链接对应一个ssm保存到ServiceEntryPointImpl._sessions中 26. SSMList _sessions; 27. //最大链接数控制 28. size_t _maxNumConnections{DEFAULT_MAX_CONN}; 29. //当前的总链接数,不包括关闭的链接 30. AtomicWord<size_t> _currentConnections{0}; 31. //所有的链接,包括已经关闭的链接 32. AtomicWord<size_t> _createdConnections{0}; 33.};
复制代码
该类的几个接口主要是 session 相关控制处理,该类中的变量成员说明如下:
ServiceEntryPointImpl 类最核心的 startSession()接口负责每个新连接到来后的内部回调处理,具体实现如下:
1.//新链接到来后的回调处理 2.void ServiceEntryPointImpl::startSession(transport::SessionHandle session) { 3. //获取该新连接对应的服务端和客户端地址信息 4. const auto& remoteAddr = session->remote().sockAddr(); 5. const auto& localAddr = session->local().sockAddr(); 6. //服务端和客户端地址记录到session中 7. auto restrictionEnvironment = stdx::make_unique<RestrictionEnvironment>(*remoteAddr, *localAddr); 8. RestrictionEnvironment::set(session, std::move(restrictionEnvironment)); 9. ...... 10. 11. //获取transportMode,kAsynchronous或者kSynchronous 12. auto transportMode = _svcCtx->getServiceExecutor()->transportMode(); 13. //构造ssm 14. auto ssm = ServiceStateMachine::create(_svcCtx, session, transportMode); 15. {//该{}体内实现链接计数,同时把ssm统一添加到_sessions列表管理 16. stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex); 17. connectionCount = _sessions.size() + 1; //连接数自增 18. if (connectionCount <= _maxNumConnections) { 19. //新来的链接对应的session保存到_sessions链表 20. //一个新链接对应一个ssm保存到ServiceEntryPointImpl._sessions中 21. ssmIt = _sessions.emplace(_sessions.begin(), ssm); 22. _currentConnections.store(connectionCount); 23. _createdConnections.addAndFetch(1); 24. } 25. } 26. //链接超限,直接退出 27. if (connectionCount > _maxNumConnections) { 28. ...... 29. return; 30. } 31. //链接关闭的回收处理 32. ssm->setCleanupHook([ this, ssmIt, session = std::move(session) ] { 33. ...... 34. }); 35. //获取transport模式为同步模式还是异步模式,也就是adaptive线程模式还是synchronous线程模式 36. auto ownership = ServiceStateMachine::Ownership::kOwned; 37. if (transportMode == transport::Mode::kSynchronous) { 38. ownership = ServiceStateMachine::Ownership::kStatic; 39. } 40. //ServiceStateMachine::start,这里和服务状态机模块衔接起来 41. ssm->start(ownership); 42.}
复制代码
该接口拿到该链接对应的服务端和客户端地址后,记录到该链接对应 session 中,然后根据该 session、transportMode、_svcCtx 构建一个服务状态机 ssm(ServiceStateMachine)。一个新链接对应一个唯一 session,一个 session 对应一个唯一的服务状态机 ssm,这三者保持唯一的一对一关系。
最终,startSession()让服务入口子模块、session 会话子模块、ssm 状态机子模块关联起来。
3.1.2 service_entry_point_utils 核心代码实现
service_entry_point_utils 源码文件只有 launchServiceWorkerThread 一个接口,该接口主要负责工作线程创建,并设置每个工作线程的线程栈大小,如果系统默认栈大于 1M,则每个工作线程的线程栈大小设置为 1M,如果系统栈大小小于 1M,则以系统堆栈大小为准,同时 warning 打印提示。该函数实现如下:
1.Status launchServiceWorkerThread(stdx::function<void()> task) { 2. static const size_t kStackSize = 1024 * 1024; //1M 3. struct rlimit limits; 4. //或者系统堆栈大小 5. invariant(getrlimit(RLIMIT_STACK, &limits) == 0); 6. //如果系统堆栈大小大于1M,则默认设置线程栈大小为1M 7. if (limits.rlim_cur > kStackSize) { 8. size_t stackSizeToSet = kStackSize; 9. int failed = pthread_attr_setstacksize(&attrs, stackSizeToSet); 10. if (failed) { 11. const auto ewd = errnoWithDescription(failed); 12. warning() << "pthread_attr_setstacksize failed: " << ewd; 13. } 14. } else if (limits.rlim_cur < 1024 * 1024) { 15. //如果系统栈大小小于1M,则已系统堆栈为准,同时给出告警 16. warning() << "Stack size set to " << (limits.rlim_cur / 1024) << "KB. We suggest 1MB"; 17. }} 18. ...... 19. //task参数传递给新建线程 20. auto ctx = stdx::make_unique<stdx::function<void()>>(std::move(task)); 21. int failed = pthread_create(&thread, &attrs, runFunc, ctx.get()); 22. ...... 23.}
复制代码
3.2 总结
service_entry_point 服务入口点子模块主要负责新连接后的回调处理及工作线程创建,该模块和后续的 session 会话模块、SSM 服务状态机模块衔接配合,完成数据收发的正常逻辑转换处理。上面的分析只列出了服务入口点子模块的核心接口实现,下表总结该模块所有的接口功能:
3. Ticket 数据收发子模块
Ticket 数据收发子模块主要功能如下:调用 session 子模块进行底层 asio 库处理、拆分数据接收和数据发送到两个类、完整 mongodb 报文读取 、接收或者发送 mongodb 报文后的回调处理。
3.1 ASIOTicket 类核心代码实现
Ticket 数据收发模块相关实现主要由 ASIOTicket 类完成,该类结构如下:
1.//下面的ASIOSinkTicket和ASIOSourceTicket继承该类,用于控制数据的发送和接收 2.class TransportLayerASIO::ASIOTicket : public TicketImpl { 3.public: 4. //初始化构造 5. explicit ASIOTicket(const ASIOSessionHandle& session, Date_t expiration); 6. //获取sessionId 7. SessionId sessionId() const final { 8. return _sessionId; 9. } 10. //asio模式没用,针对legacy模型 11. Date_t expiration() const final { 12. return _expiration; 13. } 14.15. //以下四个接口用于数据收发相关处理 16. void fill(bool sync, TicketCallback&& cb); 17.protected: 18. void finishFill(Status status); 19. bool isSync() const; 20. virtual void fillImpl() = 0; 21.private: 22. //会话信息,一个链接一个session 23. std::weak_ptr<ASIOSession> _session; 24. //每个session有一个唯一id 25. const SessionId _sessionId; 26. //asio模型没用,针对legacy生效 27. const Date_t _expiration; 28. //数据发送或者接收成功后的回调处理 29. TicketCallback _fillCallback; 30. //同步方式还是异步方式进行数据处理,默认异步 31. bool _fillSync; 32.};
复制代码
该类保护多个成员变量,这些成员变量功能说明如下:
mongodb 在具体实现上,数据接收和数据发送分开实现,分别是数据接收类 ASIOSourceTicket 和数据发送类 ASIOSinkTicket,这两个类都继承自 ASIOTicket 类,这两个类的主要结构如下:
1.//数据接收的ticket 2.class TransportLayerASIO::ASIOSourceTicket : public TransportLayerASIO::ASIOTicket { 3.public: 4. //初始化构造 5. ASIOSourceTicket(const ASIOSessionHandle& session, Date_t expiration, Message* msg); 6.protected: 7. //数据接收Impl 8. void fillImpl() final; 9.private: 10. //接收到mongodb头部数据后的回调处理 11. void _headerCallback(const std::error_code& ec, size_t size); 12. //接收到mongodb包体数据后的回调处理 13. void _bodyCallback(const std::error_code& ec, size_t size); 14. 15. //存储数据的buffer,网络IO读取到的原始数据内容 16. SharedBuffer _buffer; 17. //数据Message管理,数据来源为_buffer 18. Message* _target; 19.}; 1.2. 20.//数据发送的ticket 21.class TransportLayerASIO::ASIOSinkTicket : public TransportLayerASIO::ASIOTicket { 22. public: 23. //初始化构造 24. ASIOSinkTicket(const ASIOSessionHandle& session, Date_t expiration, const Message& msg); 25.protected: 26. //数据发送Impl 27. void fillImpl() final; 28.private: 29. //发送数据完成的回调处理 30. void _sinkCallback(const std::error_code& ec, size_t size); 31. //需要发送的数据message信息 32. Message _msgToSend; 33.};
复制代码
从上面的代码实现可以看出,ASIOSinkTicket 和 ASIOSourceTicket 类接口及成员实现几乎意义,只是具体的实现方法不同,下面对 ASIOSourceTicket 和 ASIOSinkTicket 相关核心代码实现进行分析。
3.1.2 ASIOSourceTicket 数据接收核心代码实现
数据接收过程核心代码如下:
1.//数据接收的fillImpl接口实现 2.void TransportLayerASIO::ASIOSourceTicket::fillImpl() { 3. //获取对应session信息 4. auto session = getSession(); 5. if (!session) 6. return; 7. //收到读取mongodb头部数据,头部数据长度是固定的kHeaderSize字节 8. const auto initBufSize = kHeaderSize; 9. _buffer = SharedBuffer::allocate(initBufSize); 10. 11. //调用TransportLayerASIO::ASIOSession::read读取底层数据存入_buffer 12. //读完头部数据后执行对应的_headerCallback回调函数 13. session->read(isSync(), 14. asio::buffer(_buffer.get(), initBufSize), //先读取头部字段出来 15. [this](const std::error_code& ec, size_t size) { _headerCallback(ec, size); }); 16.} 17. 18.//读取到mongodb header头部信息后的回调处理 19.void TransportLayerASIO::ASIOSourceTicket::_headerCallback(const std::error_code& ec, size_t size) { 20. ...... 21. //获取session信息 22. auto session = getSession(); 23. if (!session) 24. return; 25. //从_buffer中获取头部信息 26. MSGHEADER::View headerView(_buffer.get()); 27. //获取message长度 28. auto msgLen = static_cast<size_t>(headerView.getMessageLength()); 29. //长度太小或者太大,直接报错 30. if (msgLen < kHeaderSize || msgLen > MaxMessageSizeBytes) { 31. ....... 32. return; 33. } 34. .... 35. //内容还不够一个mongo协议报文,继续读取body长度字节的数据,读取完毕后开始body处理 36. //注意这里是realloc,保证头部和body在同一个buffer中 37. _buffer.realloc(msgLen); 38. MsgData::View msgView(_buffer.get()); 39. 40. //调用底层TransportLayerASIO::ASIOSession::read读取数据body 41. session->read(isSync(), 42. //数据读取到该buffer 43. asio::buffer(msgView.data(), msgView.dataLen()), 44. //读取成功后的回调处理 45. [this](const std::error_code& ec, size_t size) { _bodyCallback(ec, size); }); 46.} 47. 48.//_headerCallback对header读取后解析header头部获取到对应的msg长度,然后开始body部分处理 49.void TransportLayerASIO::ASIOSourceTicket::_bodyCallback(const std::error_code& ec, size_t size) { 50. ...... 51. //buffer转存到_target中 52. _target->setData(std::move(_buffer)); 53. //流量统计 54. networkCounter.hitPhysicalIn(_target->size()); 55. //TransportLayerASIO::ASIOTicket::finishFill 56. finishFill(Status::OK()); //包体内容读完后,开始下一阶段的处理 57. //报文读取完后的下一阶段就是报文内容处理,开始走ServiceStateMachine::_processMessage 58.}
复制代码
Mongodb 协议由 msg header + msg body 组成,一个完整的 mongodb 报文内容格式如下:
上图所示各个字段及 body 内容部分功能说明如下表:
ASIOSourceTicket 类的几个核心接口都是围绕这一原则展开,整个 mongodb 数据接收流程如下:
1. 读取 mongodb 头部 header 数据,解析出 header 中的 messageLength 字段。
2. 检查 messageLength 字段是否在指定的合理范围,该字段不能小于 Header 整个头部大小,也不能超过 MaxMessageSizeBytes 最大长度。
3. Header len 检查通过,说明读取 header 数据完成,于是执行_headerCallback 回调。
4. realloc 更多的空间来存储 body 内容。
5. 继续读取 body len 长度数据,读取 body 完成后,执行_bodyCallback 回调处理。
3.1.3 ASIOSinkTicket 数据发送类核心代码实现
ASIOSinkTicket 发送类相比接收类,没有数据解析相关的流程,因此实现过程会更加简单,具体源码实现如下:
1.//发送数据成功后的回调处理 2.void TransportLayerASIO::ASIOSinkTicket::_sinkCallback(const std::error_code& ec, size_t size) { 3. //发送的网络字节数统计 4. networkCounter.hitPhysicalOut(_msgToSend.size()); 5. //执行SSM中对应的状态流程 6. finishFill(ec ? errorCodeToStatus(ec) : Status::OK()); 7.} 8. 9.//发送数据的fillImpl 10.void TransportLayerASIO::ASIOSinkTicket::fillImpl() { 11. //获取对应session 12. auto session = getSession(); 13. if (!session) 14. return; 15. 16. //调用底层TransportLayerASIO::ASIOSession::write发送数据,发送成功后执行_sinkCallback回调 17. session->write(isSync(), 18. asio::buffer(_msgToSend.buf(), _msgToSend.size()), 19. //发送数据成功后的callback回调 20. [this](const std::error_code& ec, size_t size) { _sinkCallback(ec, size); }); 21.}
复制代码
3.2 总结
从上面的分析可以看出,Ticket 数据收发模块主要调用 session 会话模块来进行底层数据的读写、解析,当读取或者发送一个完整的 mongodb 报文后最终交由 SSM 服务状态机模块调度处理。
ticket 模块主要接口功能总结如下表所示:
前面的分析也可以看出,Ticket 数据收发模块会调用 session 处理模块来进行真正的数据读写,同时接收或者发送完一个完整 mongodb 报文后的后续回调处理讲交由 SSM 服务状态机模块处理。
4. Session 会话子模块
Session 会话模块功能主要如下:负责记录 HostAndPort、和底层 asio 库直接互动,实现数据的同步或者异步收发。一个新连接 fd 对应一个唯一的 session,对 fd 的操作直接映射为 session 操作。Session 会话子模块主要代码实现相关文件如下:
4.1 session 会话子模块核心代码实现
1.class TransportLayerASIO::ASIOSession : public Session { 2. //初始化构造 3. ASIOSession(TransportLayerASIO* tl, GenericSocket socket); 4. //获取本session使用的tl 5. TransportLayer* getTransportLayer(); 6. //以下四个接口套接字相关,本端/对端地址获取,获取fd,关闭fd等 7. const HostAndPort& remote(); 8. const HostAndPort& local(); 9. GenericSocket& getSocket(); 10. void shutdown(); 11. 12. //以下四个接口调用asio网络库实现数据的同步收发和异步收发 13. void read(...) 14. void write(...) 15. void opportunisticRead(...) 16. void opportunisticWrite(...) 17. 18. //远端地址信息 19. HostAndPort _remote; 20. //本段地址信息 21. HostAndPort _local; 22. //赋值见TransportLayerASIO::_acceptConnection 23. //也就是fd,一个新连接对应一个_socket 24. GenericSocket _socket; 25. //SSL相关不做分析, 26.#ifdef MONGO_CONFIG_SSL 27. boost::optional<asio::ssl::stream<decltype(_socket)>> _sslSocket; 28. bool _ranHandshake = false; 29.#endif 30. 31. //本套接字对应的tl,赋值建TransportLayerASIO::_acceptConnection(...) 32. TransportLayerASIO* const _tl; 33.}
复制代码
该类最核心的三个接口 ASIOSession(...)、opportunisticRead(...)、opportunisticWrite(..)分别完成套接字处理、调用 asio 库接口实现底层数据读和底层数据写。这三个核心接口源码实现如下:
1.//初始化构造 TransportLayerASIO::_acceptConnection调用 2.ASIOSession(TransportLayerASIO* tl, GenericSocket socket) 3. //fd描述符及TL初始化赋值 4. : _socket(std::move(socket)), _tl(tl) { 5. std::error_code ec; 6. 7. //异步方式设置为非阻塞读 8. _socket.non_blocking(_tl->_listenerOptions.transportMode == Mode::kAsynchronous, ec); 9. fassert(40490, ec.value() == 0); 10. 11. //获取套接字的family 12. auto family = endpointToSockAddr(_socket.local_endpoint()).getType(); 13. //满足AF_INET14. if (family == AF_INET || family == AF_INET6) { 15. //no_delay keep_alive套接字系统参数设置 16. _socket.set_option(asio::ip::tcp::no_delay(true)); 17. _socket.set_option(asio::socket_base::keep_alive(true)); 18. //KeepAlive系统参数设置 19. setSocketKeepAliveParams(_socket.native_handle()); 20. } 21. 22. //获取本端和对端地址 23. _local = endpointToHostAndPort(_socket.local_endpoint()); 24. _remote = endpointToHostAndPort(_socket.remote_endpoint(ec)); 25. if (ec) { 26. LOG(3) << "Unable to get remote endpoint address: " << ec.message(); 27. } 28.}
复制代码
该类初始化的时候完成新连接_socket 相关的初始化设置,包括阻塞读写还是非阻塞读写。如果是同步线程模型(一个链接一个线程),则读写方式位阻塞读写;如果是异步线程模型(adaptive 动态线程模型),则调用 asio 网络库接口实现异步读写。
此外,该链接 socket 对应的客户端 ip:port 和服务端 ip:port 也在该初始化类中获取,最终保存到本 session 的 remote 和_local 成员中。
数据读取核心代码实现如下:
1.//读取指定长度数据,然后执行handler回调 2.void opportunisticRead(...) { 3. std::error_code ec; 4. //如果是异步线程模型,在ASIOSession构造初始化的时候会设置non_blocking非阻塞模式 5. //异步线程模型这里实际上是非阻塞读取,如果是同步线程模型,则没有non_blocking设置,也就是阻塞读取 6. auto size = asio::read(stream, buffers, ec); 7. 8. //如果是异步读,并且read返回would_block或者try_again说明指定长度的数据还没有读取完毕 9. if ((ec == asio::error::would_block || ec == asio::error::try_again) && !sync) { 10. //buffers有大小size,实际读最多读size字节 11. MutableBufferSequence asyncBuffers(buffers); 12. if (size > 0) { 13. asyncBuffers += size; //buffer offset向后移动 14. } 15. 16. //继续异步方式读取数据,读取到指定长度数据后执行handler回调处理 17. asio::async_read(stream, asyncBuffers, std::forward<CompleteHandler>(handler)); 18. } else { 19. //阻塞方式读取read返回后可以保证读取到了size字节长度的数据 20. //直接read获取到size字节数据,则直接执行handler 21. handler(ec, size); 22. } 23.}
复制代码
opportunisticRead 首先调用 asio::read(stream, buffers, ec)读取 buffers 对应 size 长度的数据,buffers 空间大小就是需要读取的数据 size 大小。如果是同步线程模型,则这里为阻塞式读取,直到读到 size 字节才会返回;如果是异步线程模型,这这里是非阻塞读取,非阻塞读取当内核网络协议栈数据读取完毕后,如果还没有读到 size 字节,则继续进行 async_read 异步读取。
当 buffers 指定的 size 字节全部读取完整后,不管是同步模式还是异步模式,最终都会执行 handler 回调,开始后续的数据解析及处理流程。
发送数据核心代码实现如下:
1.//发送数据 2.void opportunisticWrite(...) { 3. std::error_code ec; 4. //如果是同步模式,则阻塞写,直到全部写成功。异步模式则非阻塞写 5. auto size = asio::write(stream, buffers, ec); 6. 7. //异步写如果返回try_again说明数据还没有发送完,则继续异步写发送 8. if ((ec == asio::error::would_block || ec == asio::error::try_again) && !sync) { 9. ConstBufferSequence asyncBuffers(buffers); 10. if (size > 0) { //buffer中数据指针偏移计数11. asyncBuffers += size; 12. } 13. //异步写发送完成,执行handler回调 14. asio::async_write(stream, asyncBuffers, std::forward<CompleteHandler>(handler)); 15. } else { 16. //同步写成功,则直接执行handler回调处理 17. handler(ec, size); 18. } 19.}
复制代码
数据发送流程和数据接收流程类似,也分位同步模式发送和异步模式发送,同步模式发送为阻塞式写,只有当所有数据通过 asio::write()发送成功后才返回;异步模式发送为非阻塞写,asio::write()不一定全部发送出去,因此需要再次调用 asio 库的 asio::async_write()进行异步发送。
不管是同步模式还是异步模式发送数据,最终数据发送成功后,都会调用 handler()回调来执行后续的流程。
4.2 总结
从上面的代码分析可以看出,session 会话模块最终直接和 asio 网络库交互实现数据的读写操作。该模块核心接口功能总结如下表:
5. 总结
《Mongodb网络传输处理源码实现及性能调优-体验内核性能极致设计》一文对 mongodb 网络传输模块中的 ASIO 网络库实现、service_executor 服务运行子模块(即线程模型子模块)进行了详细的分析,加上本文的 transport_layer 套接字处理及传输层管理子模块、session 会话子模块、Ticket 数据收发子模块、service_entry_point 服务入口点子模块。
transport_layer 套接字处理及传输层管理子模块主要由 transport_layer_manager 和 transport_layer_asio 两个核心类组成。分别完成 net 相关的配置文件初始化操作,套接字初始化及处理,最终 transport_layer_asio 的相应接口实现了和 ticket 数据分发子模块、服务入口点子模块的关联。
服务入口子模块主要由 ServiceEntryPointImpl 类和 service_entry_point_utils 中的线程创建函数组成,实现新连接 accept 处理及控制。该模块通过 startSession()让服务入口子模块、session 会话子模块、ssm 状态机子模块关联起来。
ticket 数据收发子模块主要功能如下:调用 session 子模块进行底层 asio 库处理、拆分数据接收和数据发送到两个类、完整 mongodb 报文读取 、接收或者发送 mongodb 报文后的回调处理,回调处理由 SSM 服务状态机模块管理,当读取或者发送一个完整的 mongodb 报文后最终交由 SSM 服务状态机模块调度处理。。
Session 会话模块功能主要如下:负责记录 HostAndPort、和底层 asio 库直接互动,实现数据的同步或者异步收发。一个新连接 fd 对应一个唯一的 session,对 fd 的操作直接映射为 session 操作。
到这里,整个 mongodb 网络传输层模块分析只差 service_state_machine 状态机调度子模块,状态机调度子模块相比本文分析的几个子模块更加复杂,因此将在下期《mongodb 网络传输层模块源码分析三》中单独分析。
本文所有源码注释分析详见如下链接:mongodb网络传输模块详细源码分析
评论