transport_layer网络传输层模块源码实现四
关于作者
前滴滴出行技术专家,现任OPPO文档数据库mongodb负责人,负责oppo千万级峰值TPS/十万亿级数据量文档数据库mongodb内核研发及运维工作,一直专注于分布式缓存、高性能服务端、数据库、中间件等相关研发。后续持续分享《MongoDB内核源码设计、性能优化、最佳运维实践》,Github账号地址:https://github.com/y123456yz
《mongodb内核源码实现、性能调优、最佳运维实践系列》文章有前后逻辑关系,请阅读本篇文章前,提前阅读如下模块:
mongodb网络传输层模块源码实现一
mongodb网络传输层模块源码实现二
mongodb网络传输层模块源码实现三
1. 说明
本文分析网络传输层模块中的最后一个子模块:service_executor服务运行子模块,即线程模型子模块。在阅读该文章前,请提前阅读下<<Mongodb网络传输处理源码实现及性能调优-体验内核性能极致设计>>、<<transport_layer网络传输层模块源码实现二>>、<<transport_layer网络传输层模块源码实现三>>,这样有助于快速理解本文分享的线程模型子模块。
线程模型设计在数据库性能指标中起着非常重要的作用,因此本文将重点分析mongodb服务层线程模型设计,体验mongodb如何通过优秀的工作线程模型来达到多种业务场景下的性能极致表现。该模块主要代码实现文件如下:
service_executor线程模型子模块,在代码实现中,把线程模型分为两种:synchronous线程模式和adaptive线程模型,这两种线程模型中用于任务调度运行的线程统称为worker工作线程。Mongodb启动的时候通过配置参数net.serviceExecutor来确定采用那种线程模式运行mongo实例,配置方式如下:
1.
2.net:
3. serviceExecutor: synchronous
4.
5.
6.net:
7. serviceExecutor: adaptive
2. synchronous同步线程模型(一个链接已给线程)设计原理及核心代码实现
Synchronous同步线程模型也就是每接收到一个链接,就创建一个线程专门负责该链接对应所有的客户端请求,也就是该链接的所有访问至始至终由同一个线程负责处理。
2.1核心代码实现原理
该线程模型核心代码实现由ServiceExecutorSynchronous类负责,该类注意成员变量和重要接口如下:
1.
2.class ServiceExecutorSynchronous final : public ServiceExecutor {
3.public:
4.
5. explicit ServiceExecutorSynchronous(ServiceContext* ctx);
6.
7. Status start() override;
8.
9. Status shutdown(Milliseconds timeout) override;
10.
11. Status schedule(Task task, ScheduleFlags flags) override;
12.
13. Mode transportMode() const override {
14. return Mode::kSynchronous;
15. }
16.
17. void appendStats(BSONObjBuilder* bob) const override;
18.
19.private:
20.
21. static thread_local std::deque<Task> _localWorkQueue;
22.
23. static thread_local int _localRecursionDepth;
24.
25. static thread_local int64_t _localThreadIdleCounter;
26.
27. AtomicBool _stillRunning{false};
28.
29. AtomicWord<size_t> _numRunningWorkerThreads{0};
30.
31. size_t _numHardwareCores{0};
32.};
ServiceExecutorSynchronous类核心成员变量及其功能说明如下:
每个链接对应的线程都有三个私有成员,分别是:线程队列、递归深度、idle频度,这三个线程私有成员的作用如下:
1) _localWorkQueue:线程私有队列,task任务入队及出队执行都是通过该队列完成
2) _localRecursionDepth:任务递归深度控制,避免堆栈溢出
3) _localThreadIdleCounter:当线程运行多少次任务后,需要短暂的休息一会儿,默认运行0xf次task任务就调用markThreadIdle()一次
同步线程模型子模块最核心的代码实现如下:
1.
2.Status ServiceExecutorSynchronous::schedule(Task task, ScheduleFlags flags) {
3.
4. if (!_stillRunning.load()) {
5. return Status{ErrorCodes::ShutdownInProgress, "Executor is not running"};
6. }
7.
8.
9. if (!_localWorkQueue.empty()) {
10.
11. if (flags & ScheduleFlags::kMayYieldBeforeSchedule) {
12.
13. if ((_localThreadIdleCounter++ & 0xf) == 0) {
14.
15.
16. markThreadIdle();
17. }
18.
19. if (_numRunningWorkerThreads.loadRelaxed() > _numHardwareCores) {
20. stdx::this_thread::yield();
21. }
22. }
23.
24.
25. if ((flags & ScheduleFlags::kMayRecurse) &&
26. (_localRecursionDepth < synchronousServiceExecutorRecursionLimit.loadRelaxed())) {
27. ++_localRecursionDepth;
28.
29. task();
30. } else {
31.
32. _localWorkQueue.emplace_back(std::move(task));
33. }
34. return Status::OK();
35. }
36.
37.
38. Status status = launchServiceWorkerThread([ this, task = std::move(task) ] {
39.
40. int ret = _numRunningWorkerThreads.addAndFetch(1);
41.
42. _localWorkQueue.emplace_back(std::move(task));
43. while (!_localWorkQueue.empty() && _stillRunning.loadRelaxed()) {
44.
45. _localRecursionDepth = 1;
46.
47. _localWorkQueue.front()();
48.
49. _localWorkQueue.pop_front();
50. }
51.
52. ......
53. });
54. return status;
55.}
从上面的代码可以看出,worker工作线程通过_localRecursionDepth控制task任务的递归深度,当递归深度超过最大深度synchronousServiceExecutorRecursionLimit值,则把任务到_localWorkQueue队列,然后从队列获取task任务执行。
此外,为了达到性能的极致发挥,在每次执行task任务的时候做了如下细节设计,这些细节设计在高压力情况下,可以提升5%的性能提升:
1) 每运行oxf次任务,就通过markThreadIdle()让线程idle休息一会儿
2) 如果线程数大于CPU核数,则每执行一个任务前都让线程yield()一次
2.2该模块函数接口总结大全
synchronous同步线程模型所有接口及其功能说明如下表所示:
3. Adaptive动态线程模型设计原理及核心代码实现
adaptive动态线程模型,会根据当前系统的访问负载动态的调整线程数,当线程CPU工作比较频繁的时候,控制线程增加工作线程数;当线程CPU比较空闲后,本线程就会自动销毁退出,总体worker工作线程数就会减少。
3.1动态线程模型核心源码实现
动态线程模型核心代码实现由ServiceExecutorAdaptive负责完成,该类核心成员变量及核心函数接口如下:
1.class ServiceExecutorAdaptive : public ServiceExecutor {
2.public:
3.
4. explicit ServiceExecutorAdaptive(...);
5. explicit ServiceExecutorAdaptive(...);
6. ServiceExecutorAdaptive(...) = default;
7. ServiceExecutorAdaptive& operator=(ServiceExecutorAdaptive&&) = default;
8. virtual ~ServiceExecutorAdaptive();
9.
10. Status start() final;
11.
12. Status shutdown(Milliseconds timeout) final;
13.
14. Status schedule(Task task, ScheduleFlags flags) final;
15.
16. Mode transportMode() const final {
17. return Mode::kAsynchronous;
18. }
19.
20. void appendStats(BSONObjBuilder* bob) const final;
21.
22. int threadsRunning() {
23. return _threadsRunning.load();
24. }
25.
26. void _startWorkerThread();
27.
28. void _workerThreadRoutine(int threadId, ThreadList::iterator it);
29.
30. void _controllerThreadRoutine();
31.
32. bool _isStarved() const;
33.
34. std::shared_ptr<asio::io_context> _ioContext;
35.
36. std::unique_ptr<Options> _config;
37.
38. mutable stdx::mutex _threadsMutex;
39. ThreadList _threads;
40.
41. stdx::thread _controllerThread;
42.
43.
44.
45. TickSource* const _tickSource;
46.
47. AtomicWord<bool> _isRunning{false};
48.
49. AtomicWord<int> _threadsRunning{0};
50.
51. AtomicWord<int> _threadsPending{0};
52.
53. AtomicWord<int> _threadsInUse{0};
54.
55. AtomicWord<int> _tasksQueued{0};
56.
57. AtomicWord<int> _deferredTasksQueued{0};
58.
59.
60. TickTimer _lastScheduleTimer;
61.
62. AtomicWord<TickSource::Tick> _pastThreadsSpentExecuting{0};
63.
64. AtomicWord<TickSource::Tick> _pastThreadsSpentRunning{0};
65.
66. static thread_local ThreadState* _localThreadState;
67.
68.
69. AtomicWord<int64_t> _totalQueued{0};
70.
71. AtomicWord<int64_t> _totalExecuted{0};
72.
73. AtomicWord<TickSource::Tick> _totalSpentQueued{0};
74.
75.
76. stdx::condition_variable _deathCondition;
77.
78.
79. stdx::condition_variable _scheduleCondition;
80.};
ServiceExecutorAdaptive类核心成员变量及其功能说明如下:
从上面的成员变量列表看出,队列、线程这两个大类可以进一步细化为不同的小类,如下:
1) 线程:_threadsRunning、threadsPending、_threadsInUsed
2) 队列:_totalExecuted、_tasksQueued、deferredTasksQueued
从上面的ServiceExecutorAdaptive类中的核心接口函数代码实现可以归纳为如下三类:
1) 时间计数相关核心代码实现
2) Worker工作线程创建及任务调度相关核心接口代码实现
3) controler控制线程设计原理及核心代码实现
3.1.1线程运行时间计算相关核心代码实现
线程运行时间计算核心算法如下:
1.
2.enum class ThreadTimer
3.{
4.
5. Running,
6.
7. Executing
8.};
9.
10.
11.
12.struct ThreadState {
13.
14. ThreadState(TickSource* ts) : running(ts), executing(ts) {}
15.
16. CumulativeTickTimer running;
17.
18. CumulativeTickTimer executing;
19.
20. int recursionDepth = 0;
21.};
22.
23.
24.
25.
26.TickSource::Tick ServiceExecutorAdaptive::_getThreadTimerTotal(ThreadTimer which) const {
27.
28. TickSource::Tick accumulator;
29.
30. switch (which) {
31.
32. case ThreadTimer::Running:
33. accumulator = _pastThreadsSpentRunning.load();
34. break;
35.
36. case ThreadTimer::Executing:
37. accumulator = _pastThreadsSpentExecuting.load();
38. break;
39. }
40.
41. stdx::lock_guard<stdx::mutex> lk(_threadsMutex);
42. for (auto& thread : _threads) {
43. switch (which) {
44.
45. case ThreadTimer::Running:
46. accumulator += thread.running.totalTime();
47. break;
48.
49. case ThreadTimer::Executing:
50. accumulator += thread.executing.totalTime();
51. break;
52. }
53. }
54.
55. return accumulator;
56.}
Worker工作线程启动后的时间可以包含两类:1.线程运行task任务的时间;2.线程等待客户端请求的时间。一个线程创建起来,如果没有客户端请求,则线程就会等待接收数据。如果有客户端请求,线程就会通过队列获取task任务运行。这两类时间分别代表线程”忙”和“空闲”。
线程总的“忙”状态时间=所有线程运行task任务的时间,包括已经销毁的线程。线程总的“空闲”时间=所有线程等待获取任务执行的时间,也包括已销毁的线程,线程空闲一般是没有客户端请求,或者客户端请求很少。Worker工作线程对应while(){}循环每循环一次都会进行线程私有运行时间ThreadState计数,总的时间统计就是以该线程私有统计信息为基准求和而来。
3.1.2 worker工作线程创建、销毁及task任务处理
worker工作线程在如下情况下创建或者销毁:1.线程池初始化;2. controler控制线程发现当前线程池中线程比较”忙”,则会动态创建新的工作线程;3.工作线程在while体中每循环一次都会判断当前线程池是否很”闲”,如果很”闲”则本线程直接销毁退出。
Worker工作线程创建核心源码实现如下:
1.Status ServiceExecutorAdaptive::start() {
2. invariant(!_isRunning.load());
3.
4. _isRunning.store(true);
5.
6. _controllerThread = stdx::thread(&ServiceExecutorAdaptive::_controllerThreadRoutine, this);
7.
8. for (auto i = 0; i < _config->reservedThreads(); i++) {
9.
10. _startWorkerThread();
11. }
12. return Status::OK();
}
worker工作线程默认初始化为CPU/2个,初始工作线程数也可以通过指定的命令行参数来配置:adaptiveServiceExecutorReservedThreads。此外,start()接口默认也会创建一个controler控制线程。
Task任务通过SSM状态机调用ServiceExecutorAdaptive::schedule()接口入队,该函数接口核心代码实现如下:
1.Status ServiceExecutorAdaptive::schedule(ServiceExecutorAdaptive::Task task, ScheduleFlags flags) {
2.
3. auto scheduleTime = _tickSource->getTicks();
4.
5.
6.
7. auto pendingCounterPtr = (flags & kDeferredTask) ? &_deferredTasksQueued : &_tasksQueued;
8.
9. pendingCounterPtr->addAndFetch(1);
10. ......
11.
12. auto wrappedTask = [ this, task = std::move(task), scheduleTime, pendingCounterPtr ] {
13.
14. pendingCounterPtr->subtractAndFetch(1);
15. auto start = _tickSource->getTicks();
16.
17.
18. _totalSpentQueued.addAndFetch(start - scheduleTime);
19.
20. if (_localThreadState->recursionDepth++ == 0) {
21.
22. _localThreadState->executing.markRunning();
23.
24. _threadsInUse.addAndFetch(1);
25. }
26.
27. const auto guard = MakeGuard([this, start] {
28.
29. if (--_localThreadState->recursionDepth == 0) {
30.
31.
32. _localThreadState->executingCurRun += _localThreadState->executing.markStopped();
33.
34. _threadsInUse.subtractAndFetch(1);
35. }
36.
37. _totalExecuted.addAndFetch(1);
38. });
39.
40. task();
41. };
42.
43. if ((flags & kMayRecurse) &&
44.
45. (_localThreadState->recursionDepth + 1 < _config->recursionLimit())) {
46.
47.
48. _ioContext->dispatch(std::move(wrappedTask));
49. } else {
50.
51. _ioContext->post(std::move(wrappedTask));
52. }
53.
54. _lastScheduleTimer.reset();
55.
56. _totalQueued.addAndFetch(1);
57.
58.
59. if (_isStarved() && !(flags & kDeferredTask)) {
60.
61. _scheduleCondition.notify_one();
62. }
63. return Status::OK();
}
从上面的分析可以看出,schedule()主要完成task任务入队处理。如果带有递归标识kMayRecurse,则通过_ioContext->dispatch()接口入队,该接口再ASIO底层实现的时候实际上没有真正把任务添加到全局队列,而是直接当前线程继续处理,这样就实现了递归调用。如果没有携带kMayRecurse递归标识,则task任务通过_ioContext->post()需要入队到全局队列。ASIO库的dispatch接口和post接口的具体实现可以参考:
<<Mongodb网络传输处理源码实现及性能调优-体验内核性能极致设计>>
如果任务入队到全局队列,则线程池中的worker线程就会通过全局锁竞争从队列中获取task任务执行,该流程通过如下接口实现:
1.
2.void ServiceExecutorAdaptive::_workerThreadRoutine(
3. int threadId, ServiceExecutorAdaptive::ThreadList::iterator state) {
4.
5. _localThreadState = &(*state);
6. {
7.
8. std::string threadName = str::stream() << "worker-" << threadId;
9. setThreadName(threadName);
10. }
11.
12.
13. bool stillPending = true;
14.
15.
16. const auto guard = MakeGuard([this, &stillPending, state] {
17.
18.
19. }
20. while (_isRunning.load()) {
21. ......
22.
23. state->executingCurRun = 0;
24. try {
25.
26. asio::io_context::work work(*_ioContext);
27.
28. state->running.markRunning();
29.
30.
31.
32. if (stillPending) {
33.
34. _ioContext->run_one_for(runTime.toSystemDuration());
35. } else {
36.
37. _ioContext->run_for(runTime.toSystemDuration());
38. }
39. ......
40. }
41.
42. if (stillPending) {
43. _threadsPending.subtractAndFetch(1);
44. stillPending = false;
45.
46. } else if (_threadsRunning.load() > _config->reservedThreads()) {
47.
48. double executingToRunning = state->executingCurRun / static_cast<double>(spentRunning);
49. executingToRunning *= 100;
50. dassert(executingToRunning <= 100);
51.
52. int pctExecuting = static_cast<int>(executingToRunning);
53.
54.
55. if (pctExecuting < _config->idlePctThreshold()) {
56. log() << "Thread was only executing tasks " << pctExecuting << "% over the last "
57. << runTime << ". Exiting thread.";
58. break;
59. }
60. }
61. }
62.}
线程主循环主要工作内容:1.从ASIO库的全局队列获取任务执行;2.判断本线程是否比较”闲”,如果是则直接销毁退出。3.线程创建起来进行初始线程名设置、线程主循环一些计数处理等。
3.2.3 controller控制线程核心代码实现
控制线程用于判断线程池是线程是否压力很大,是否比较”忙”,如果是则增加线程数来减轻全局队列中task任务积压引起的延迟处理问题。控制线程核心代码实现如下:
1.
2.void ServiceExecutorAdaptive::_controllerThreadRoutine() {
3.
4. setThreadName("worker-controller"_sd);
5. ......
6.
7. while (_isRunning.load()) {
8.
9. const auto timerResetGuard =
10. MakeGuard([&sinceLastControlRound] { sinceLastControlRound.reset(); });
11.
12. _scheduleCondition.wait_for(fakeLk, _config->stuckThreadTimeout().toSystemDuration());
13. ......
14. double utilizationPct;
15. {
16.
17. auto spentExecuting = _getThreadTimerTotal(ThreadTimer::Executing);
18.
19. auto spentRunning = _getThreadTimerTotal(ThreadTimer::Running);
20.
21.
22.
23. auto diffExecuting = spentExecuting - lastSpentExecuting;
24.
25.
26. auto diffRunning = spentRunning - lastSpentRunning;
27. if (spentRunning == 0 || diffRunning == 0)
28. utilizationPct = 0.0;
29. else {
30. lastSpentExecuting = spentExecuting;
31. lastSpentRunning = spentRunning;
32.
33.
34. utilizationPct = diffExecuting / static_cast<double>(diffRunning);
35. utilizationPct *= 100;
36. }
37. }
38.
39.
40. if (sinceLastControlRound.sinceStart() >= _config->stuckThreadTimeout()) {
41.
42. if ((_threadsInUse.load() == _threadsRunning.load()) &&
43. (sinceLastSchedule >= _config->stuckThreadTimeout())) {
44. log() << "Detected blocked worker threads, "
45. << "starting new reserve threads to unblock service executor";
46.
47.
48.
49. for (int i = 0; i < _config->reservedThreads(); i++) {
50.
51. _startWorkerThread();
52. }
53. }
54. continue;
55. }
56.
57. auto threadsRunning = _threadsRunning.load();
58.
59. if (threadsRunning < _config->reservedThreads()) {
60.
61. while (_threadsRunning.load() < _config->reservedThreads()) {
62. _startWorkerThread();
63. }
64. }
65.
66. if (utilizationPct < _config->idlePctThreshold()) {
67. continue;
68. }
69.
70.
71.
72.
73. do {
74. stdx::this_thread::sleep_for(_config->maxQueueLatency().toSystemDuration());
75. } while ((_threadsPending.load() > 0) &&
76. (sinceLastControlRound.sinceStart() < _config->stuckThreadTimeout()));
77.
78. if (_isStarved()) {
79. _startWorkerThread();
80. }
81. }
}
Mongodb服务层有个专门的控制线程用于判断线程池中工作线程的压力情况,以此来决定是否在线程池中创建新的工作线程来提升性能。
控制线程每过一定时间循环检查线程池中的线程压力状态,实现原理就是简单的实时记录线程池中的线程当前运行情况,为以下两类计数:总线程数_threadsRunning、
当前正在运行task任务的线程数_threadsInUse。如果_threadsRunning=_threadsRunning,说明所有工作线程当前都在处理task任务,这时候就会创建新的worker线程来减轻任务因为排队引起的延迟。
2.1.4 adaptive线程模型函数接口大全
前面只分析了核心的几个接口,下表列出了该模块的完整接口功能说明:
3. 总结
adaptive动态线程池模型,内核实现的时候会根据当前系统的访问负载动态的调整线程数。当线程CPU工作比较频繁的时候,控制线程增加工作线程数;当线程CPU比较空闲后,本线程就会自动消耗退出。下面一起体验adaptive线程模式下,mongodb是如何做到性能极致设计的。
3.1 synchronous同步线程模型总结
Sync线程模型也就是一个链接一个线程,实现比较简单。该线程模型,listener线程每接收到一个链接就会创建一个线程,该链接上的所有数据读写及内部请求处理流程将一直由本线程负责,整个线程的生命周期就是这个链接的生命周期。
3.2 adaptive线程模型worker线程运行时间相关的几个统计
3.6状态机调度模块中提到,一个完整的客户端请求处理可以转换为2个任务:通过asio库接收一个完整mongodb报文、接收到报文后的后续所有处理(含报文解析、认证、引擎层处理、发送数据给客户端等)。假设这两个任务对应的任务名、运行时间分别如下表所示:
客户端一次完整请求过程中,mongodb内部处理过程=task1 + task2,整个请求过程中mongodb内部消耗的时间T1+T2。
实际上如果fd上没有数据请求,则工作线程就会等待数据,等待数据的过程就相当于空闲时间,我们把这个时间定义为T3。于是一个工作线程总运行时间=内部任务处理时间+空闲等待时间,也就是线程总时间=T1+T2+T3,只是T3是无用等待时间。
步骤2中提到,线程运行总时间=T1 + T2 +T3,其中T3是无用等待时间。如果T3的无用等待时间占比很大,则说明线程比较空闲。
Mongodb工作线程每次运行完一次task任务后,都会判断本线程的有效运行时间占比,有效运行时间占比=(T1+T2)/(T1+T2+T3),如果有效运行时间占比小于某个阀值,则该线程自动退出销毁,该阀值由adaptiveServiceExecutorIdlePctThreshold参数指定。该参数在线调整方式:
db.adminCommand( { setParameter: 1, adaptiveServiceExecutorIdlePctThreshold: 50} )
Mongodb服务层有个专门的控制线程用于判断线程池中工作线程的压力情况,以此来决定是否在线程池中创建新的工作线程来提升性能。
控制线程每过一定时间循环检查线程池中的线程压力状态,实现原理就是简单的实时记录线程池中的线程当前运行情况,为以下两类计数:总线程数_threadsRunning、
当前正在运行task任务的线程数_threadsInUse。如果_threadsRunning=_threadsRunning,说明所有工作线程当前都在处理task任务,这时候已经没有多余线程去asio库中的全局任务队列op_queue_中取任务执行了,这时候队列中的任务就不会得到及时的执行,就会成为响应客户端请求的瓶颈点。
control控制线程会在收集线程池中所有工作线程的有效运行时间占比,如果占比小于指定配置的阀值,则代表整个线程池空闲。
前面已经说明一个线程的有效时间占比为:(T1+T2)/(T1+T2+T3),那么所有线程池中的线程总的有效时间占比计算方式如下:
所有线程的总有效时间TT1 = (线程池中工作线程1的有效时间T1+T2) + (线程池中工作线程2的有效时间T1+T2) + ..... + (线程池中工作线程n的有效时间T1+T2)
所有线程总运行时间TT2 = (线程池中工作线程1的有效时间T1+T2+T3) + (线程池中工作线程2的有效时间T1+T2+T3) + ..... + (线程池中工作线程n的有效时间T1+T2+T3)
线程池中所有线程的总有效工作时间占比= TT1/TT2
Mongodb在启动初始化的时候,会创建一个线程名为”worker-controller”的控制线程,该线程主要工作就是判断线程池中是否有充足的工作线程来处理asio库中全局队列op_queue_中的task任务,如果发现线程池比较忙,没有足够的线程来处理队列中的任务,则在线程池中动态增加线程来避免task任务在队列上排队等待。
1.control控制线程循环主体主要压力判断控制流程如下:
2.while {
3. #等待工作线程唤醒条件变量,最长等待stuckThreadTimeout
4. _scheduleCondition.wait_for(stuckThreadTimeout)
5. ......
6. #获取线程池中所有线程最近一次运行任务的总有效时间TT1
7. Executing = _getThreadTimerTotal(ThreadTimer::Executing);
8. #获取线程池中所有线程最近一次运行任务的总运行时间TT2
9. Running = _getThreadTimerTotal(ThreadTimer::Running);
10. #线程池中所有线程的总有效工作时间占比 = TT1/TT2
11. utilizationPct = Executing / Running;
12. ......
13. #代表control线程太久没有进行线程池压力检查了
14. if(本次循环到该行代码的时间 > stuckThreadTimeout阀值) {
15. #说明太久没做压力检查,造成工作线程不够用了
16. if(_threadsInUse == _threadsRunning) {
17. #批量创建一批工作线程
18. for(; i < reservedThreads; i++)
19. #创建工作线程
20. _startWorkerThread();
21. }
22. #control线程继续下一次循环压力检查
23. continue;
24. }
25. ......
26. #如果当前线程池中总线程数小于最小线程数配置
27. #则创建一批线程,保证最少工作线程数达到要求
28. if (threadsRunning < reservedThreads) {
29. while (_threadsRunning < reservedThreads) {
30. _startWorkerThread();
31. }
32. }
33. ......
34. #检查上一次循环到本次循环这段时间范围内线程池中线程的工作压力
35. #如果压力不大,则说明无需增加工作线程数,则继续下一次循环
36. if (utilizationPct < idlePctThreshold) {
37. continue;
38. }
39. ......
40. #如果发现已经有线程创建起来了,但是这些线程还没有运行任务
41. #这说明当前可用线程数可能足够了,我们休息sleep_for会儿在判断一下
42. #该循环最多持续stuckThreadTimeout时间
43. do {
44. stdx::this_thread::sleep_for();
45. } while ((_threadsPending.load() > 0) &&
46. (sinceLastControlRound.sinceStart() < stuckThreadTimeout)
47.
48. #如果tasksQueued队列中的任务数大于工作线程数,说明任务在排队了
49. #该扩容线程池中线程了
50. if (_isStarved()) {
51. _startWorkerThread();
52. }
53.}
本文分析的mongodb版本为3.6.1,其network.serviceExecutorTaskStats网络线程模型相关统计通过db.serverStatus().network.serviceExecutorTaskStats可以查看,如下图所示:
上图的几个信息功能可以分类为三大类,说明如下:
上表中各个字段的都有各自的意义,我们需要注意这些参数的以下情况:
1. threadsRunning - threadsInUse的差值越大说明线程池中线程比较空闲,差值越小说明压力越大
2. threadsPending越大,表示线程池越空闲
3. tasksQueued - totalExecuted的差值越大说明任务队列上等待执行的任务越多,说明任务积压现象越明显
4. deferredTasksQueued越大说明工作线程比较空闲,在等待客户端数据到来
5. totalTimeRunningMicros - totalTimeExecutingMicros差值越大说明越空闲
上面三个大类中的总体反映趋势都是一样的,任何一个差值越大就说明越空闲。
在后续mongodb最新版本中,去掉了部分重复统计的字段,同时也增加了以下字段,如下图所示:
新版本增加的几个统计项实际上和3.6.1大同小异,只是把状态机任务按照不通类型进行了更加详细的统计。新版本中,更重要的一个功能就是control线程在发现线程池压力过大的时候创建新线程的触发情况也进行了统计,这样我们就可以更加直观的查看动态创建的线程是因为什么原因创建的。
从步骤6中可以看出,control控制线程创建工作线程的第一个条件为:如果该线程超过stuckThreadTimeout阀值都没有做线程压力控制检查,并且线程池中线程数全部在处理任务队列中的任务,这种情况control线程一次性会创建reservedThreads个线程。reservedThreads由adaptiveServiceExecutorReservedThreads配置,如果没有配置,则采用初始值CPU/2。
那么问题来了,如果我提前通过命令行配置了这个值,并且这个值配置的非常大,例如一百万,这里岂不是要创建一百万个线程,这样会造成操作系统负载升高,更容易引起耗尽系统pid信息,这会引起严重的系统级问题。
不过,不用担心,最新版本的mongodb代码,内核代码已经做了限制,这种情况下创建的线程数变为了1,也就是这种情况只创建一个线程。
3.3 adaptive线程模型实时参数调优
动态线程模设计的时候,mongodb设计者考虑到了不通应用场景的情况,因此在核心关键点增加了实时在线参数调整设置,主要包含如下7种参数,如下表所示:
命令行实时参数调整方法如下,以adaptiveServiceExecutorReservedThreads为例,其他参数调整方法类似:db.adminCommand( { setParameter: 1, adaptiveServiceExecutorReservedThreads: xx} )
Mongodb服务层的adaptive动态线程模型设计代码实现非常优秀,有很多实现细节针对不同应用场景做了极致优化。
3.4不同线程模型性能多场景PK
详见:<<Mongodb网络传输处理源码实现及性能调优-体验内核性能极致设计>>
3.5 Asio网络库全局队列锁优化,性能进一步提升
通过<<Mongodb网络传输处理源码实现及性能调优-体验内核性能极致设计>>一文中的ASIO库实现和adaptive动态线程模型实现,可以看出为了获取全局任务队列上的任务,需要进行全局锁竞争,这实际上是整个线程池从队列获取任务运行最大的一个瓶颈。
优化思路:我们可以通过优化队列和锁来提升整体性能,当前的队列只有一个,我们可以把单个队列调整为多个队列,每个队列一把锁,任务入队的时候通过把链接session散列到多个队列,通过该优化,锁竞争及排队将会得到极大的改善。
优化前队列架构:
优化后队列架构:
如上图,把一个全局队列拆分为多个队列,任务入队的时候把session按照hash散列到各自的队列,工作线程获取任务的时候,同理通过hash的方式去对应的队列获取任务,通过这种方式减少锁竞争,同时提升整体性能。
由于篇幅原因,本文只分析了主要核心接口源码实现,更多接口的源码实现可以参考如下地址,详见:mongodb adaptive动态线程模型源码详细分析
评论