写点什么

Unix/Linux 编程: 网络编程之 线程池

用户头像
赖猫
关注
发布于: 2021 年 05 月 18 日

一,线程池的基本组成

  1. 工作线程队列

  2. 任务队列

  3. 线程管理器(封装了线程池的一些基本方法,创建,销毁,加入新任务等)


比如一个常用的网络服务,通常主循环用于处理接收与基本的 recv/send 操作。可是如果某些业务操作需要长时间的处理数据。


比如等待数据库查询结果,对数据进行编解码等业务逻辑事。为了不让主线程进行长时间的阻塞,从而引入了线程池。让线程池的工作队列来处理具体的业务。


主线程则继续处理网络连接。


应用:线程池是生产者与消费者模型的一个最典型的应用。最常见的 nginx 服务中就使用了线程池。



二、线程池的基本工作流程

比如主循环是一个服务器,主循环首先处理各种网络 IO,当需要实际处理业务数据的情况。将业务数据封装成“task(有些场合也叫消息)",并加入线程池的工作队列中就, 这个过程也叫做”生产“。

线程池中的工作线程会循环取出工作队列中的任务,并进行实际的业务处理,这个过程也叫”消费“。

1. task 的定义

比如对于一个任务可以采用以下的方式来定义:

// define the task nodetypedef void* (*callback)(void* arg);struct node_task {    callback func;   // call back which will be executed by the worker thread.    void* user_data;     struct node_task* prev; // link to the prv task    struct node_task* next; // link to the next task};
复制代码

其中回调函数则是会被工作线程具体调用的函数。

user_data 则是传递给回调函数处理的业务数据。

2. 线程队列的定义

// define the work node which will process the task.struct node_worker {    pthread_t tid;  // thread id.    int terminate;  // flag to indicate whether the worker would be terminate.     struct _threadPool* pool;    struct node_worker* prev;    struct node_worker* next;};
复制代码

线程队列就是一个双向链表数据结构,tid 队应了相应的工作线程

3. 线程池管理对象

// manager componentstruct _threadPool {    pthread_mutex_t mtx;    pthread_cond_t  cond;     struct node_task* tasks;    struct node_worker* workers;};
复制代码

线程池主要包含任务队列和工作队列,由于对任务队列中进行读取是一个并发的过程,即任务队列是”临界资源",因此需要引入 mutex 互斥锁,以及条件变量。

4. 线程池的基本操作

线程池支持 3 种基本操作:

// initialize the thread pool object.int nThreadPoolCreate(nThreadPool* pool, int numWorkers); // destory the thread pool.int nThreadPoolDestory(nThreadPool* pool); // insert task into the pool task queue.int nThreadPoolPushTask(nThreadPool* pool, struct node_task* task);
复制代码

分别是创建

销毁

以及插入任务

5. 线程池的工作原理

1)工作线程在任务队列为空的时候默认是处于休眠状态的。若退出标志位被设置为 1,则表示工作线程要退出。

2)主线程往线程池添加任务以后会调用 pthread_cond_signal ,这会唤醒一个工作线程,该线程同时会取得互斥锁,从任务队列中"取出"任务, 并释放锁。然后进行后续的任务处理。

3)销毁工作线程的时候,会释放各种资源,将工作线程退出标记设置为 1。并通过 pthread_cond_broadcast 唤醒所有工作线程.此时工作线程将判断并退出。

4)拓展应用:根据任务队列的数量可以动态的创建或者销毁线程。

整理了一些个人觉得比较好 Linux 服务器/架构师学习书籍、大厂面试题、和热门技术教学视频资料(资料包括 C/C++,Linux,golang 技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg 等),免费分享有需要的可以自行添加群973961276


三、线程池的实现

/* File name: threadpool.c * Author: sesiria   2021-05-05 * A Implementation of thread pool. including task queue, worker queue, management component **/#include <stdio.h>#include <stdlib.h>#include <string.h>#include <errno.h>#include <unistd.h>#include <pthread.h>  // insert item into the list.#define LL_ADD(item, list)  do {        \    item->prev = NULL;                  \    item->next = list;                  \    if(list != NULL) list->prev = item; \    list = item;                        \} while(0) // remove item from list.#define LL_REMOVE(item, list) do {                            \    if(item->prev != NULL) item->prev->next = item->next;   \    if(item->next != NULL) item->next->prev = item->prev;   \    if(list == item)  list = item->next;                    \    item->prev = item->next = NULL;                         \} while(0) //===============start of declaration============== // define the task nodetypedef void* (*callback)(void* arg);struct node_task {    callback func;   // call back which will be executed by the worker thread.    void* user_data;     struct node_task* prev; // link to the prv task    struct node_task* next; // link to the next task}; // declaration at first.struct _threadPool; // define the work node which will process the task.struct node_worker {    pthread_t tid;  // thread id.    int terminate;  // flag to indicate whether the worker would be terminate.     struct _threadPool* pool;    struct node_worker* prev;    struct node_worker* next;}; // manager componentstruct _threadPool {    pthread_mutex_t mtx;    pthread_cond_t  cond;     struct node_task* tasks;    struct node_worker* workers;}; typedef struct _threadPool nThreadPool;//===============end of declaration============== //=============function definition===============// normal callback function for worker threadvoid* thread_callback(void* arg) {    if (arg == NULL) {        perror("Illegal parameters, and exit worker thread\n");        pthread_exit(NULL);    }    struct node_worker* worker = (struct node_worker*)arg;    nThreadPool* pool = worker->pool;    if (pool == NULL) {        perror("Illegal poll, and exit worker thread\n");        pthread_exit(NULL);    }     while (1) {        // the task queue is a critical resource.        pthread_mutex_lock(&pool->mtx);        while (pool->tasks == NULL && !worker->terminate) {            pthread_cond_wait(&pool->cond, &pool->mtx);        }         // the current worker has been tagged to be terminated.        if (worker->terminate) {            pthread_mutex_unlock(&pool->mtx);            // wake up other threads.            break;        }        // pick one task.        struct node_task* task = pool->tasks;        if (task != NULL) {            LL_REMOVE(task, pool->tasks); // remove task from queue.        }         pthread_mutex_unlock(&pool->mtx);        // execute the call back        task->func(task);        // should we free the current task?        free(task);    }    free(worker);    printf("thread ID: %lu exit!\n", pthread_self());    pthread_exit(NULL);} // initialize the thread pool object.int nThreadPoolCreate(nThreadPool* pool, int numWorkers) {    // check the paramters    if (pool == NULL || numWorkers < 0)        return -1; // illegal parameters.    memset(pool, 0, sizeof(nThreadPool));     // init mutex    pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;    memcpy(&pool->mtx, &blank_mutex, sizeof(pthread_mutex_t));     // init cond    pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;    memcpy(&pool->cond, &blank_cond, sizeof(pthread_cond_t));     int i = 0;    // create workers    for (i = 0; i < numWorkers; i++) {        struct node_worker* worker = (struct node_worker*)malloc(sizeof(struct node_worker));        if (worker == NULL) {            perror("malloc");            return -1;        }        memset(worker, 0, sizeof(struct node_worker));        worker->pool = pool;         int ret = pthread_create(&worker->tid, NULL, thread_callback, worker);        if (ret) {            perror("pthread_create");            free(worker);            return -2;        }         // add the current worker into workers        LL_ADD(worker, pool->workers);    }} // destory the thread.int nThreadPoolDestory(nThreadPool* pool) {    // shut down all of the workers.    struct node_worker* worker = NULL;    for (worker = pool->workers; worker != NULL; worker = worker->next) {        worker->terminate = 1; // set terminate.    }     pthread_mutex_lock(&pool->mtx);     // clear the workers queue    // because each struct worker will be released by each thread call back function.    pool->workers = NULL;    // clear the task queue    struct node_task* task;    for (task = pool->tasks; task != NULL; task = task->next) {        free(task);    }    pool->tasks = NULL;     // wake up all of the threads.    pthread_cond_broadcast(&pool->cond);     pthread_mutex_unlock(&pool->mtx);} // insert task into the pool task queue.int nThreadPoolPushTask(nThreadPool* pool, struct node_task* task) {    pthread_mutex_lock(&pool->mtx);     LL_ADD(task, pool->tasks);    pthread_cond_signal(&pool->cond);    pthread_mutex_unlock(&pool->mtx);} //==========end function definition============#if 1       // debug #define MAX_THREADS      10#define COUNTER_SIZE    1000  void counter(struct node_task* task) {    int index = *(int*)task->user_data;    printf("index : %d, selfid : %lu\n", index, pthread_self());    free(task->user_data);    // the task will be released by the worker thread.} int main(int argc, char* argv[]) {    nThreadPool pool;     nThreadPoolCreate(&pool, MAX_THREADS);     int i = 0;    for (i = 0; i < COUNTER_SIZE; i++) {        struct node_task* task = (struct node_task*)malloc(sizeof(struct node_task));        if (task == NULL) {            perror("malloc");            exit(EXIT_FAILURE);        }         task->func = (callback)counter;        task->user_data = malloc(sizeof(int));        *(int*)task->user_data = i;        nThreadPoolPushTask(&pool, task);    }     printf("Press any key to destory the thread pool\n");    nThreadPoolDestory(&pool);    getchar();    exit(EXIT_SUCCESS);} #endif 
复制代码

测试代码运行结果:

所有工作线程都能正确退出。

用户头像

赖猫

关注

C/C++Linux服务器开发学习群960994558 2020.11.28 加入

纸上得来终觉浅,绝知此事要躬行

评论

发布
暂无评论
Unix/Linux 编程:网络编程之 线程池