Unix/Linux 编程: 网络编程之 线程池
一,线程池的基本组成
工作线程队列
任务队列
线程管理器(封装了线程池的一些基本方法,创建,销毁,加入新任务等)
比如一个常用的网络服务,通常主循环用于处理接收与基本的 recv/send 操作。可是如果某些业务操作需要长时间的处理数据。
比如等待数据库查询结果,对数据进行编解码等业务逻辑事。为了不让主线程进行长时间的阻塞,从而引入了线程池。让线程池的工作队列来处理具体的业务。
主线程则继续处理网络连接。
应用:线程池是生产者与消费者模型的一个最典型的应用。最常见的 nginx 服务中就使用了线程池。
二、线程池的基本工作流程
比如主循环是一个服务器,主循环首先处理各种网络 IO,当需要实际处理业务数据的情况。将业务数据封装成“task(有些场合也叫消息)",并加入线程池的工作队列中就, 这个过程也叫做”生产“。
线程池中的工作线程会循环取出工作队列中的任务,并进行实际的业务处理,这个过程也叫”消费“。
1. task 的定义
比如对于一个任务可以采用以下的方式来定义:
// define the task nodetypedef void* (*callback)(void* arg);struct node_task { callback func; // call back which will be executed by the worker thread. void* user_data; struct node_task* prev; // link to the prv task struct node_task* next; // link to the next task};其中回调函数则是会被工作线程具体调用的函数。
user_data 则是传递给回调函数处理的业务数据。
2. 线程队列的定义
// define the work node which will process the task.struct node_worker { pthread_t tid; // thread id. int terminate; // flag to indicate whether the worker would be terminate. struct _threadPool* pool; struct node_worker* prev; struct node_worker* next;};线程队列就是一个双向链表数据结构,tid 队应了相应的工作线程
3. 线程池管理对象
// manager componentstruct _threadPool { pthread_mutex_t mtx; pthread_cond_t cond; struct node_task* tasks; struct node_worker* workers;};线程池主要包含任务队列和工作队列,由于对任务队列中进行读取是一个并发的过程,即任务队列是”临界资源",因此需要引入 mutex 互斥锁,以及条件变量。
4. 线程池的基本操作
线程池支持 3 种基本操作:
// initialize the thread pool object.int nThreadPoolCreate(nThreadPool* pool, int numWorkers); // destory the thread pool.int nThreadPoolDestory(nThreadPool* pool); // insert task into the pool task queue.int nThreadPoolPushTask(nThreadPool* pool, struct node_task* task);分别是创建
销毁
以及插入任务
5. 线程池的工作原理
1)工作线程在任务队列为空的时候默认是处于休眠状态的。若退出标志位被设置为 1,则表示工作线程要退出。
2)主线程往线程池添加任务以后会调用 pthread_cond_signal ,这会唤醒一个工作线程,该线程同时会取得互斥锁,从任务队列中"取出"任务, 并释放锁。然后进行后续的任务处理。
3)销毁工作线程的时候,会释放各种资源,将工作线程退出标记设置为 1。并通过 pthread_cond_broadcast 唤醒所有工作线程.此时工作线程将判断并退出。
4)拓展应用:根据任务队列的数量可以动态的创建或者销毁线程。
整理了一些个人觉得比较好 Linux 服务器/架构师学习书籍、大厂面试题、和热门技术教学视频资料(资料包括 C/C++,Linux,golang 技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg 等),免费分享有需要的可以自行添加群973961276
三、线程池的实现
/* File name: threadpool.c * Author: sesiria 2021-05-05 * A Implementation of thread pool. including task queue, worker queue, management component **/#include <stdio.h>#include <stdlib.h>#include <string.h>#include <errno.h>#include <unistd.h>#include <pthread.h> // insert item into the list.#define LL_ADD(item, list) do { \ item->prev = NULL; \ item->next = list; \ if(list != NULL) list->prev = item; \ list = item; \} while(0) // remove item from list.#define LL_REMOVE(item, list) do { \ if(item->prev != NULL) item->prev->next = item->next; \ if(item->next != NULL) item->next->prev = item->prev; \ if(list == item) list = item->next; \ item->prev = item->next = NULL; \} while(0) //===============start of declaration============== // define the task nodetypedef void* (*callback)(void* arg);struct node_task { callback func; // call back which will be executed by the worker thread. void* user_data; struct node_task* prev; // link to the prv task struct node_task* next; // link to the next task}; // declaration at first.struct _threadPool; // define the work node which will process the task.struct node_worker { pthread_t tid; // thread id. int terminate; // flag to indicate whether the worker would be terminate. struct _threadPool* pool; struct node_worker* prev; struct node_worker* next;}; // manager componentstruct _threadPool { pthread_mutex_t mtx; pthread_cond_t cond; struct node_task* tasks; struct node_worker* workers;}; typedef struct _threadPool nThreadPool;//===============end of declaration============== //=============function definition===============// normal callback function for worker threadvoid* thread_callback(void* arg) { if (arg == NULL) { perror("Illegal parameters, and exit worker thread\n"); pthread_exit(NULL); } struct node_worker* worker = (struct node_worker*)arg; nThreadPool* pool = worker->pool; if (pool == NULL) { perror("Illegal poll, and exit worker thread\n"); pthread_exit(NULL); } while (1) { // the task queue is a critical resource. pthread_mutex_lock(&pool->mtx); while (pool->tasks == NULL && !worker->terminate) { pthread_cond_wait(&pool->cond, &pool->mtx); } // the current worker has been tagged to be terminated. if (worker->terminate) { pthread_mutex_unlock(&pool->mtx); // wake up other threads. break; } // pick one task. struct node_task* task = pool->tasks; if (task != NULL) { LL_REMOVE(task, pool->tasks); // remove task from queue. } pthread_mutex_unlock(&pool->mtx); // execute the call back task->func(task); // should we free the current task? free(task); } free(worker); printf("thread ID: %lu exit!\n", pthread_self()); pthread_exit(NULL);} // initialize the thread pool object.int nThreadPoolCreate(nThreadPool* pool, int numWorkers) { // check the paramters if (pool == NULL || numWorkers < 0) return -1; // illegal parameters. memset(pool, 0, sizeof(nThreadPool)); // init mutex pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER; memcpy(&pool->mtx, &blank_mutex, sizeof(pthread_mutex_t)); // init cond pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER; memcpy(&pool->cond, &blank_cond, sizeof(pthread_cond_t)); int i = 0; // create workers for (i = 0; i < numWorkers; i++) { struct node_worker* worker = (struct node_worker*)malloc(sizeof(struct node_worker)); if (worker == NULL) { perror("malloc"); return -1; } memset(worker, 0, sizeof(struct node_worker)); worker->pool = pool; int ret = pthread_create(&worker->tid, NULL, thread_callback, worker); if (ret) { perror("pthread_create"); free(worker); return -2; } // add the current worker into workers LL_ADD(worker, pool->workers); }} // destory the thread.int nThreadPoolDestory(nThreadPool* pool) { // shut down all of the workers. struct node_worker* worker = NULL; for (worker = pool->workers; worker != NULL; worker = worker->next) { worker->terminate = 1; // set terminate. } pthread_mutex_lock(&pool->mtx); // clear the workers queue // because each struct worker will be released by each thread call back function. pool->workers = NULL; // clear the task queue struct node_task* task; for (task = pool->tasks; task != NULL; task = task->next) { free(task); } pool->tasks = NULL; // wake up all of the threads. pthread_cond_broadcast(&pool->cond); pthread_mutex_unlock(&pool->mtx);} // insert task into the pool task queue.int nThreadPoolPushTask(nThreadPool* pool, struct node_task* task) { pthread_mutex_lock(&pool->mtx); LL_ADD(task, pool->tasks); pthread_cond_signal(&pool->cond); pthread_mutex_unlock(&pool->mtx);} //==========end function definition============#if 1 // debug #define MAX_THREADS 10#define COUNTER_SIZE 1000 void counter(struct node_task* task) { int index = *(int*)task->user_data; printf("index : %d, selfid : %lu\n", index, pthread_self()); free(task->user_data); // the task will be released by the worker thread.} int main(int argc, char* argv[]) { nThreadPool pool; nThreadPoolCreate(&pool, MAX_THREADS); int i = 0; for (i = 0; i < COUNTER_SIZE; i++) { struct node_task* task = (struct node_task*)malloc(sizeof(struct node_task)); if (task == NULL) { perror("malloc"); exit(EXIT_FAILURE); } task->func = (callback)counter; task->user_data = malloc(sizeof(int)); *(int*)task->user_data = i; nThreadPoolPushTask(&pool, task); } printf("Press any key to destory the thread pool\n"); nThreadPoolDestory(&pool); getchar(); exit(EXIT_SUCCESS);} #endif 测试代码运行结果:
所有工作线程都能正确退出。
赖猫
C/C++Linux服务器开发学习群960994558 2020.11.28 加入
纸上得来终觉浅,绝知此事要躬行











评论