
C++ 线程池

  • 2023-03-06
在做 C++开发的时候,线程池是一个经常需要使用的技术。通过它,可以充分利用多线程带来的好处,同时避免频繁创建和、销毁线程所带来的消耗。


线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络 sockets 等的数量。 例如,对于计算密集型任务,线程数一般取 cpu 数量+2 比较合适,线程数过多会导致额外的线程切换开销。

但是,在标准 C++库里,并未提供线程池的实现,于是,这世上就有了 n 种实现方式。

更糟糕的是,我又贡献了第 n+1 种。

期待以后的 C++ 2x 能统一实现一个版本。

Linux 老大 Linus Torvalds 说:"Talk is cheap. Show me the code."


// ThreadPool with C++ std::thread// Author: Yuchuan Wang// yuchuan.wang@gmail.com// 
#include <iostream>#include <atomic>#include <thread>#include <chrono>#include <mutex>#include <condition_variable>#include <functional>#include <vector>#include <queue>
// Function will be running inside thread poolusing ThreadTask = std::function<void()>;
class ThreadPool{public: // If threads_num is 0, it will use the same number of CPU cores // If tasks_num is -1, the number of tasks will be unlimited ThreadPool(int threads_num = 0, int tasks_num = -1) { if(threads_num == 0) { max_threads = std::thread::hardware_concurrency(); } else { max_threads = threads_num; } max_tasks = tasks_num; is_running = false; }
~ThreadPool() { WaitForStop(); }
// Add task to queue bool AddTask(ThreadTask task) { // Scope for lock { std::unique_lock<std::mutex> lock(tasks_guard); if(max_tasks == -1) { // Unlimited tasks.push(task); } else { if(tasks.size() >= max_tasks) { return false; } else { tasks.push(task); } } }
// Notify thread tasks_event.notify_one();
return true; }
// Start threads bool Start() { if(is_running) { // Running already return false; }
is_running = true; if(threads.empty()) { CreateThreads(); }
return true; }
void WaitForStop() { if(!is_running) { // I am not running return; }
is_running = false; tasks_event.notify_all(); for(auto &t : threads) { // Wait for all threads to exit t.join(); } threads.clear(); }
private: void CreateThreads() { for(int i = 0; i < max_threads; i++) { threads.push_back(std::thread(&ThreadPool::ThreadRoutine, this)); } }
// Thread worker function // Take task from queue, and run it static void ThreadRoutine(ThreadPool* ptr) { if(ptr == nullptr) { return; }
while(ptr->is_running || !ptr->tasks.empty()) { ThreadTask task; // Scope for lock { // Get task to run std::unique_lock<std::mutex> lock(ptr->tasks_guard); while(ptr->tasks.empty()) { // Wait until task is ready ptr->tasks_event.wait(lock); } // OK, now there is a task ready to run task = ptr->tasks.front(); ptr->tasks.pop(); } // Run it task(); } }
private: // Max threads allowed int max_threads; // Max tasks inside queue int max_tasks; // Vector of threads std::vector<std::thread> threads; // Queue of tasks std::queue<ThreadTask> tasks; // Flag of runnin status bool is_running; // Mutex to protect the tasks queue std::mutex tasks_guard; // Condition of tasks event std::condition_variable tasks_event; };


  1. using ThreadTask = std::function<void()>;

类似于以前的 typedef 用法,定义了一个 ThreadTask 的类型,代表 C++的函数对象。它会被放进线程池的任务队列里面。后续的各个线程,会从任务队列里面取出具体的任务(函数对象),然后执行这个任务。

  1. ThreadPool(int threads_num = 0, int tasks_num = -1)

初始化线程池。默认情况下,创建的线程数跟你系统的 CPU 核数一样,任务队列里的任务个数无限。

  1. bool AddTask(ThreadTask task)


  1. bool Start()

创建 threads_num 个线程,并开始运行。

  1. void WaitForStop()



#include "ThreadPool.h"
int product_sell = 0;void ProductCounter(std::mutex* task_protect){ //std::this_thread::sleep_for(std::chrono::seconds(1)); std::this_thread::sleep_for(std::chrono::microseconds(100));
std::lock_guard<std::mutex> lock(*task_protect); std::cout <<"How many products sell: " << product_sell++ << std::endl;}
int main(){ std::mutex protect_task; ThreadPool pool(0, -1); for(int i = 0; i < 100; i++) { pool.AddTask(std::bind(ProductCounter, &protect_task)); } pool.Start(); // Do more stuff... for(int i = 0; i < 50; i++) { pool.AddTask(std::bind(ProductCounter, &protect_task)); } pool.WaitForStop(); return 0;}

在这个小例子中,创建了一个线程池。先添加了 100 个任务,然后启动线程池。接着又添加了 50 个任务。线程池会以同样的线程个数(在我的电脑,有 8 个 core,所以默认是 8 个线程),依次把任务完成,最后退出。


How many products sell: 0
How many products sell: 1
How many products sell: 2
How many products sell: 3
How many products sell: 4
How many products sell: 5
How many products sell: 6
How many products sell: 7
How many products sell: 8
How many products sell: 9
...
How many products sell: 147
How many products sell: 148
How many products sell: 149





