transport_layer网络传输层模块源码实现三
关于作者
前滴滴出行技术专家,现任OPPO文档数据库mongodb负责人,负责oppo千万级峰值TPS/十万亿级数据量文档数据库mongodb研发和运维工作,一直专注于分布式缓存、高性能服务端、数据库、中间件等相关研发。Github账号地址:https://github.com/y123456yz
1.说明
在之前的Mongodb网络模块源码实现及性能调优(一)和<<transport_layer网络传输层模块源码实现二>>一文中分析了如何阅读百万级大工程源码、Asio网络库实现、线程模型、transport_layer套接字处理及传输层管理子模块、session会话子模块、Ticket数据收发子模块、service_entry_point服务入口点子模块。
本文将继续分析网络传输层模块中service_state_machine状态机调度子模块内核源码实现。
2. service_state_machine状态机调度子模块
service_state_machine状态机处理模块主要复杂一次完整请求的状态转换,确保请求可以按照指定的流程顺利进行,最终实现客户端的正常mongodb访问。该模块核心代码实现主要由以下三个源码文件实现(test为测试相关,可以忽略):
2.1核心代码实现
在service_entry_point服务入口点子模块分析中,当接收到一个新的链接后,在ServiceEntryPointImpl::startSession(...)回调函数中会构造一个ServiceStateMachine ssm类,从而实现了新链接、session、ssm的一一映射关系。其中,ServiceStateMachine 类实现对ThreadGuard(线程守护)有较多的依赖,因此本文从这两个类核心代码实现来分析整个状态机调度模块的内部设计细节。
2.1.1 ThreadGuard线程守护类核心代码实现
ThreadGuard也就是”线程守护”类,该类主要用于工作线程名的管理维护、ssm归属管理、该ssm对应session链接的回收处理等。该类核心成员及接口实现如下:
1.class ServiceStateMachine::ThreadGuard {
2. ......
3.public:
4.
5.
6. explicit ThreadGuard(ServiceStateMachine* ssm) : _ssm{ssm} {
7.
8. auto owned = _ssm->_owned.compareAndSwap(Ownership::kUnowned, Ownership::kOwned);
9.
10. if (owned == Ownership::kStatic) {
11.
12. dassert(haveClient());
13. dassert(Client::getCurrent() == _ssm->_dbClientPtr);
14.
15. _haveTakenOwnership = true;
16. return;
17. }
18. ......
19.
20.
21.
22. auto oldThreadName = getThreadName();
23.
24. if (oldThreadName != _ssm->_threadName) {
25.
26. _ssm->_oldThreadName = getThreadName().toString();
27.
28. setThreadName(_ssm->_threadName);
29. }
30.
31.
32. Client::setCurrent(std::move(_ssm->_dbClient));
33.
34. _haveTakenOwnership = true;
35. }
36. ......
37.
38. ThreadGuard& operator=(ThreadGuard&& other) {
39. if (this != &other) {
40. _ssm = other._ssm;
41. _haveTakenOwnership = other._haveTakenOwnership;
42.
43. other._haveTakenOwnership = false;
44. }
45.
46. return *this;
47. };
48.
49.
50. ~ThreadGuard() {
51.
52. if (_haveTakenOwnership)
53. release();
54. }
55.
56.
57. void markStaticOwnership() {
58. dassert(static_cast<bool>(*this));
59. _ssm->_owned.store(Ownership::kStatic);
60. }
61.
62.
63. void release() {
64. auto owned = _ssm->_owned.load();
65.
66. if (owned != Ownership::kStatic) {
67.
68. if (haveClient()) {
69. _ssm->_dbClient = Client::releaseCurrent();
70. }
71.
72. if (!_ssm->_oldThreadName.empty()) {
73.
74. setThreadName(_ssm->_oldThreadName);
75. }
76. }
77.
78. if (_ssm->state() == State::Ended) {
79.
80. auto cleanupHook = std::move(_ssm->_cleanupHook);
81. if (cleanupHook)
82. cleanupHook();
83. return;
84. }
85.
86.
87. _haveTakenOwnership = false;
88.
89. if (owned == Ownership::kOwned) {
90. _ssm->_owned.store(Ownership::kUnowned);
91. }
92. }
93.
94.private:
95.
96. ServiceStateMachine* _ssm;
97.
98. bool _haveTakenOwnership = false;
99.}
从上面的代码分析可以看出线程守护类作用比较明确,就是守护当前线程的归属状态,并记录状态机ssm不同状态变化前后的线程名。此外,状态机ssm对应的session链接如果进入end状态,则该链接的资源回收释放也由该类完成。
查看mongod或者mongos实例,如果启动实例的时候配置了”serviceExecutor: adaptive”会发现这些进程下面有很多线程名为”conn-x”和”worker-x”线程,同时同一个线程线程名可能发生改变,这个过程就是由ThreadGuard线程守护类来实现。synchronous一个链接一个线程模型只有”conn-x”线程,adaptive线程模型将同时存在有线程名为”conn-x”和”worker-x”的线程,具体原理讲在后面继续分析,如下图:
说明:synchronous线程模式对应worker初始线程名为”conn-x”,adaptive线程模型对应worker初始线程名为”worker-x”。
ThreadGuard线程守护类和状态机ssm(service_state_machine)关联紧密,客户端请求处理的内部ssm状态转换也和该类密切关联,请看后续分析。
2.1.2 ServiceStateMachine 类核心代码实现
service_state_machine状态机处理模块核心代码实现通过ServiceStateMachine类完成,该类的核心结构成员和函数接口如下:
1.
2.class ServiceStateMachine : public std::enable_shared_from_this<ServiceStateMachine> {
3. ......
4.public:
5. ......
6. static std::shared_ptr<ServiceStateMachine> create(...);
7. ServiceStateMachine(...);
8.
9. enum class State {
10.
11. Created,
12.
13.
14.
15. Source,
16.
17. SourceWait,
18.
19. Process,
20.
21. SinkWait,
22.
23. EndSession,
24.
25. Ended
26. };
27.
28. enum class Ownership {
29.
30. kUnowned,
31.
32. kOwned,
33.
34. kStatic
35. };
36.
37. ......
38.private:
39.
40. class ThreadGuard;
41. friend class ThreadGuard;
42.
43. ......
44.
45. const transport::SessionHandle& _session()
46.
47. void _scheduleNextWithGuard(...);
48. void _runNextInGuard(ThreadGuard guard);
49.
50. inline void _processMessage(ThreadGuard guard);
51.
52. void _sourceCallback(Status status);
53. void _sinkCallback(Status status);
54. void _sourceMessage(ThreadGuard guard);
55. void _sinkMessage(ThreadGuard guard, Message toSink);
56.
57.
58. AtomicWord<State> _state{State::Created};
59.
60. ServiceEntryPoint* _sep;
61.
62. transport::Mode _transportMode;
63.
64. ServiceContext* const _serviceContext;
65.
66. transport::SessionHandle _sessionHandle;
67.
68.
69. ServiceContext::UniqueClient _dbClient;
70.
71. const Client* _dbClientPtr;
72.
73. const std::string _threadName;
74.
75. std::string _oldThreadName;
76.
77.
78. stdx::function<void()> _cleanupHook;
79.
80. Message _inMessage;
81.
82.
83. AtomicWord<Ownership> _owned{Ownership::kUnowned};
84.}
该类核心成员功能说明如下表:
我们知道,链接、session、SSM状态机一一对应,他们也拥有对应的归属权,这里的归属权指的就是当前SSM归属于那个线程,也就是当前SSM状态机调度模块由那个线程实现。归属权通过Ownership类标记,该类保护如下几种状态,每种状态功能说明如下:
Mongodb服务端接收到客户端请求后的数据接收、协议解析、从db层获取数据、发送数据给客户端都是通过SSM状态机进行有序的状态转换处理,SSM调度处理过程中保护多个状态,每个状态对应一个状态码,具体状态码及其功能说明如下表所示:
以上是SSM处理请求过程的状态码信息,状态转换的具体实现过程请参考后面的核心代码分析。listerner线程接收到新的客户端链接后会调用通过serviceentrypoint服务入口点子模块的ssm->start()接口进入SSM状态机调度模块,该接口相关的源码实现如下:
1.
2.void ServiceStateMachine::start(Ownership ownershipModel) {
3.
4. _scheduleNextWithGuard(
5.
6.
7. ThreadGuard(this), transport::ServiceExecutor::kEmptyFlags, ownershipModel);
8.}
9.
10.void ServiceStateMachine::_scheduleNextWithGuard(...) {
11.
12. auto func = [ ssm = shared_from_this(), ownershipModel ] {
13.
14. ThreadGuard guard(ssm.get());
15.
16. if (ownershipModel == Ownership::kStatic)
17. guard.markStaticOwnership();
18.
19. ssm->_runNextInGuard(std::move(guard));
20. };
21.
22. guard.release();
23.
24.
25. Status status = _serviceContext->getServiceExecutor()->schedule(std::move(func), flags);
26. if (status.isOK()) {
27. return;
28. }
29.
30. ......
31.}
ServiceStateMachine::start()接口调用ServiceStateMachine::_scheduleNextWithGuard()来启动状态机运行。_scheduleNextWithGuard()接口最核心的作用就是调用service_executor服务运行子模块(线程模型子模块)的schedule()接口来把状态机调度任务入队到ASIO网络库的一个全局队列(adaptive动态线程模型),如果是一个链接一个线程模型,则任务入队到线程级私有队列。
adaptive线程模型,任务入队以及工作线程调度任务执行的流程将在后续的线程模型子模块中分析,也可以参考:<<Mongodb网络传输处理源码实现及性能调优-体验内核性能极致设计>>
此外,_scheduleNextWithGuard()入队到全局队列的任务就是本模块后面需要分析的SSM状态机任务,这些task任务通过本函数接口的func (...)进行封装,然后通过线程模型子模块入队到一个全局队列。Func(...)这个task任务中会直接调用_runNextInGuard()接口来进行状态转换处理,该接口也就是入队到ASIO全局队列的任务,核心代码功能如下:
1.void ServiceStateMachine::_runNextInGuard(ThreadGuard guard) {
2.
3. auto curState = state();
4.
5.
6. if (curState == State::Created) {
7.
8. curState = State::Source;
9. _state.store(curState);
10. }
11.
12. try {
13. switch (curState) {
14.
15. case State::Source:
16. _sourceMessage(std::move(guard));
17. break;
18.
19. case State::Process:
20. _processMessage(std::move(guard));
21. break;
22.
23. case State::EndSession:
24. _cleanupSession(std::move(guard));
25. break;
26. default:
27. MONGO_UNREACHABLE;
28. }
29. return;
30. } catch (...) {
31.
32. }
33.
34. ......
35.
36. _state.store(State::EndSession);
37. _cleanupSession(std::move(guard));
38.}
从上面的代码实现可以看出,真正入队到全局队列中的任务类型只有三个,分别是:
1) 接收mongodb数据的task任务,简称为readTask。
2) 接收到一个完整mongodb数据后的后续处理(包括协议解析、命令处理、DB获取数据、发送数据给客户端等),简称为dealTask。
3) 接收或者发送数据异常、链接关闭等引起的后续资源释放,简称为cleanTask。
下面针对这三种task任务核心代码实现进行分析:
readTask任务核心代码实现由_sourceMessage()接口实现,具体代码如下:
1.
2.void ServiceStateMachine::_sourceMessage(ThreadGuard guard) {
3. ......
4.
5. auto ticket = _session()->sourceMessage(&_inMessage);
6.
7. _state.store(State::SourceWait);
8.
9. guard.release();
10.
11. if (_transportMode == transport::Mode::kSynchronous) {
12.
13. _sourceCallback([this](auto ticket) {
14. MONGO_IDLE_THREAD_BLOCK;
15. return _session()->getTransportLayer()->wait(std::move(ticket));
16. }(std::move(ticket)));
17. } else if (_transportMode == transport::Mode::kAsynchronous) {
18.
19. _session()->getTransportLayer()->asyncWait(
20.
21. std::move(ticket), [this](Status status) { _sourceCallback(status); });
22. }
23.}
24.
25.
26.void ServiceStateMachine::_sourceCallback(Status status) {
27.
28. ThreadGuard guard(this);
29.
30.
31. dassert(state() == State::SourceWait);
32.
33. auto remote = _session()->remote();
34. if (status.isOK()) {
35.
36. _state.store(State::Process);
37.
38. return _scheduleNextWithGuard(std::move(guard), ServiceExecutor::kMayRecurse);
39. }
40. ......
41.
42. _runNextInGuard(std::move(guard));
43.}
SSM调度的第一个任务就是readTask任务,从上面的源码分析可以看出,该任务就是通过ticket数据分发模块从ASIO网络库读取一个完整长度的mongodb报文,然后执行sourceCallback回调。进入该回调函数后,即刻设置SSM状态为State::Process状态,然后调用scheduleNextWithGuard(...)把dealTask任务入队到ASIO的全局队列(adaptive线程模型),或者入队到线程级私有队列(synchronous线程模型)等待worker线程调度执行。
这里有个细节,在把dealTask入队的时候,携带了kMayRecurse标记,该标记标识该任务可以递归调用,也就是该任务可以由当前线程继续执行,这样也就可以保证同一个请求的taskRead任务和dealTask任务由同一个线程处理。任务递归调度,可以参考后面的线程模型子模块源码实现。
当读取到一个完整长度的mongodb报文后,就会把dealTask任务入队到全局队列,然后由worker线程调度执行该task任务。dealTask任务的核心代码实现如下:
1.
2.void ServiceStateMachine::_processMessage(ThreadGuard guard) {
3. ......
4.
5. networkCounter.hitLogicalIn(_inMessage.size());
6.
7. auto opCtx = Client::getCurrent()->makeOperationContext();
8.
9.
10. DbResponse dbresponse = _sep->handleRequest(opCtx.get(), _inMessage);
11.
12. opCtx.reset();
13.
14. Message& toSink = dbresponse.response;
15.
16. if (!toSink.empty()) {
17. ......
18.
19. _sinkMessage(std::move(guard), std::move(toSink));
20.
21. } else {
22.
23. ......
24. }
25.}
26.
27.
28.void ServiceStateMachine::_sinkMessage(ThreadGuard guard, Message toSink) {
29.
30. auto ticket = _session()->sinkMessage(toSink);
31.
32. _state.store(State::SinkWait);
33.
34. guard.release();
35.
36. if (_transportMode == transport::Mode::kSynchronous) {
37.
38. _sinkCallback(_session()->getTransportLayer()->wait(std::move(ticket)));
39. } else if (_transportMode == transport::Mode::kAsynchronous) {
40.
41. _session()->getTransportLayer()->asyncWait(
42. std::move(ticket), [this](Status status) { _sinkCallback(status); });
43. }
44.}
45.
46.
47.void ServiceStateMachine::_sinkCallback(Status status) {
48.
49. ThreadGuard guard(this);
50.
51. dassert(state() == State::SinkWait);
52. if (!status.isOK()) {
53.
54. _state.store(State::EndSession);
55.
56. return _runNextInGuard(std::move(guard));
57. } else if (_inExhaust) {
58.
59. _state.store(State::Process);
60. } else {
61.
62.
63. _state.store(State::Source);
64. }
65.
66. return _scheduleNextWithGuard(std::move(guard),
67. ServiceExecutor::kDeferredTask |
68. ServiceExecutor::kMayYieldBeforeSchedule);
69.}
readTask通过ticket数据分发子模块读取一个完整长度的mongodb报文后,开始dealTask任务逻辑,该任务也就是_processMessage(...)。该接口中核心实现就是调用mongod和mongos实例对应的服务入口类的handleRequest(...)接口来完成后续的command命令处理、DB层数据访问等,访问到的数据存储到DbResponse中,最终在通过_sinkMessage(...)把数据发送出去。
真正的mongodb内部处理流程实际上就是通过该dealTask任务完成,该任务也是处理整个请求中资源耗费最重的一个环节。在该task任务中,当数据成功发送给客户端后,该session链接对应的SSM状态机进入State::Source状态,继续等待worker线程调度来完成后续该链接的新请求。
在数据读写过程、客户端链接关闭、访问DB数据层等任何一个环节异常,则会进入State::EndSession状态。该状态对应得任务实现相对比较简单,具体代码实现如下:
1.
2.void ServiceStateMachine::_cleanupSession(ThreadGuard guard) {
3.
4. _state.store(State::Ended);
5.
6. _inMessage.reset();
7.
8. Client::releaseCurrent();
9.}
进入该状态后直接由本线程进行session资源回收和client资源释放处理,而无需状态机调度worker线程来回收。
2.2关于worker线程名和guardthread线程守护类
前面得分析我们知道,当线程模型为adaptive动态线程模型的时候,mongod和mongos实例对应的子线程中有很多名为“conn-xx”和”worker-xx”的线程,而且同一个线程可能一会儿线程名为“conn-xx”,下一次又变为了”worker-xx”。这个线程名的初始命名和线程名更改与ServiceStateMachine状态机调度类、guardthread线程守护类、worker线程模型等都有关系。
Worker线程由ServiceExecutor线程模型子模块创建,请参考后续线程模型子模块相关章节。默认初始化线程名为”conn-x”,初始化代码实现如下:
1.
2.ServiceStateMachine::ServiceStateMachine(...)
3. ......
4.
5. _threadName{str::stream() << "conn-" << _session()->id()} {}
6.}
7.
8.class Session {
9. ......
10.
11. const Id _id;
12.}
13.
14.
15.AtomicUInt64 sessionIdCounter(0);
16.
17.
18.Session::Session() : _id(sessionIdCounter.addAndFetch(1)) {}
SSM状态处理过程中,会把一个完整的请求过程= readTask任务+ dealTask任务,这两个任务都是通过SSM状态机和ServiceExecutor线程模型子模块的worker线程配合调度完成,在任务处理过程中处理同一个任务的线程可能会有多次线程名更改,这个就是结合guardthread线程守护类来完成,以一个线程名切换更改伪代码实现为例:
1.worker_thread_run_task(...)
2.{
3.
4. print(threadName)
5.
6. ......
7.
8.
9.
10. ThreadGuard guard(this);
11.
12. print(threadName)
13.
14. ......
15.
16. guard.release();
17.
18.
19. print(threadName)
20.}
从上面的伪代码可以看出,adaptive线程模型对应worker线程名为”worker”,在进入ThreadGuard guard(this)流程后,线程名更改为”conn-xx”线程,当guard.release()释放后恢复原有”worker-xx”线程名。
结合前面的SSM状态处理流程,adaptive线程模型可以得到如下总结:底层网络IO数据读写过程,worker线程名会改为”worker-xx”,其他非网络IO的mongodb内部逻辑处理线程名为”conn-xx”。所以,如果查看mongod或者mongos进程所有线程名的时候,如果发现线程名为”worker-xx”,说明当前线程在处理网络IO;如果发现线程名为”conn-xx”,则说明当前线程在处理内部逻辑处理,对于mongod实例可以理解为主要处理磁盘IO。
由于synchronous同步线程模型,同一链接对应的所有客户端请求至始至终都有同一线程处理,所以整个处理线程名不会改变,也没必要修改线程名,整个过程都是”conn-xx”线程名。
2.3该模块函数接口总结大全
前面分析了主要核心接口源码实现,很多其他接口没有一一列举详细分析,该模块u所有接口功能总结如下,更多接口代码实现详见Mongodb内核源码详细注释分析:
3. 总结
Mongodb网络模块源码实现及性能调优(一)
<<transport_layer网络传输层模块源码实现二>>
本文主要分析了service_state_machine状态机子模块,该模块把session对应的客户端请求转换为readTask任务、dealTask任务和cleanTask任务,前两个任务通过worker线程完成调度处理,cleanTask任务在内部处理异常或者链接关闭的时候由本线程直接执行,而不是通过worker线程调度执行。
这三个任务处理过程会分别对应到Created、Source、SourceWait、Process、SinkWait、EndSession、Ended七种状态的一种或者多种,具体详见前面的状态码分析。一个正常的客户端请求状态转换过程如下:
1) 链接刚建立的第一次请求状态转换过程:
Created->Source -> SourceWait -> Process -> SinkWait -> Source
2) 该链接后续的请求状态转换过程:
Source -> SourceWait -> Process -> SinkWait -> Source
此外,SSM状态机调度模块通过ServiceStateMachine::_scheduleNextWithGuard(...)接口和线程模型子模块关联起来。SSM通过该接口完成worker线程初始创建、task任务入队处理,下期将分析<<网络线程模型子模块>>详细源码实现。
说明:
该模块更多接口实现细节详见Mongodb内核源码注释:Mongodb内核源码详细注释分析
评论