#include <pthread.h> /* POSIX threads; on Linux link with -lpthread */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/*
 * LIST_INSERT - push `item` onto the front of the doubly linked list whose
 * head pointer is `list`.
 *
 * item: pointer to a node that has `prev` and `next` pointer members.
 * list: modifiable lvalue holding the current head (may be NULL); it is
 *       updated to point at `item`.
 *
 * Both arguments are evaluated more than once, so they must not have side
 * effects. Every use is parenthesized so expressions such as `&nodes[i]`
 * expand correctly.
 */
#define LIST_INSERT(item, list) do {                \
    (item)->prev = NULL;                            \
    (item)->next = (list);                          \
    if ((list) != NULL) (list)->prev = (item);      \
    (list) = (item);                                \
} while (0)
/*
 * LIST_REMOVE - unlink `item` from the doubly linked list whose head pointer
 * is `list`.
 *
 * item: pointer to a node currently on the list (has `prev`/`next` members).
 * list: modifiable lvalue holding the head; advanced to item->next when
 *       `item` is the head.
 *
 * After removal item->prev and item->next are both NULL. Both arguments are
 * evaluated more than once, so they must not have side effects, and `item`
 * must be a separate expression from `list` (the head is reassigned before
 * the item's links are cleared).
 */
#define LIST_REMOVE(item, list) do {                                    \
    if ((item)->prev != NULL) (item)->prev->next = (item)->next;        \
    if ((item)->next != NULL) (item)->next->prev = (item)->prev;        \
    if ((item) == (list)) (list) = (item)->next;                        \
    (item)->prev = (item)->next = NULL;                                 \
} while (0)
/*
 * One unit of work. The struct is also its own list node: the prev/next
 * members are the links manipulated by LIST_INSERT / LIST_REMOVE.
 */
struct nTask {
    /* Routine invoked with a pointer to this task. */
    void (*task_func)(struct nTask *task);
    /* Opaque payload for task_func to interpret. */
    void *user_data;

    /* Doubly linked list links. */
    struct nTask *prev;
    struct nTask *next;
};
/*
 * Per-thread bookkeeping for one worker of the pool. The struct is also a
 * node of the pool's worker list (prev/next are used by LIST_INSERT).
 */
struct nWorker {
    /* Handle filled in by pthread_create for this worker's thread. */
    pthread_t threadid;
    /* Non-zero asks the worker loop to stop instead of taking more tasks. */
    int terminate;
    /* Owning pool; the worker reads its task list, mutex and condition. */
    struct nManager *manager;

    /* Doubly linked list links. */
    struct nWorker *prev;
    struct nWorker *next;
};
/*
 * The thread pool itself: the list of pending tasks, the list of workers,
 * and the synchronization objects guarding the task list.
 */
typedef struct nManager {
    /* Head of the pending-task list (NULL when no work is queued). */
    struct nTask *tasks;
    /* Head of the worker list. */
    struct nWorker *workers;

    /* Mutex held while the task list is examined or modified. */
    pthread_mutex_t mutex;
    /* Condition variable workers wait on while the task list is empty. */
    pthread_cond_t cond;
} ThreadPool;
//callback!=taskstatic void* nThreadPoolCallback(void* arg) {
struct nWorker* worker = (struct nWorker*)arg; //接受pthread_create传来的参数
while (1) //默认一直执行 { pthread_mutex_lock(&worker->manager->mutex);//加锁 while (worker->manager->tasks == NULL) { if (worker->terminate)break; pthread_cond_wait(&worker->manager->cond, &worker->manager->mutex);//如果没有任务就等待 }
if (worker->terminate) { pthread_mutex_unlock(&worker->manager->mutex);//解锁 break; }
struct nTask* task = worker->manager->tasks; LIST_REMOVE(task, worker->manager->tasks);
pthread_mutex_unlock(&worker->manager->mutex);
task->task_func(task); }
free(worker); return;}
//APIint nThreadPoolCreate(ThreadPool* pool, int numWorkers) {
if (pool == NULL) return -1; if (numWorkers < 1) numWorkers = 1;//如果任务数量小于1,就默认为1 memset(pool, 0, sizeof(ThreadPool));
pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;//创建互斥量cond,静态全局 memcpy(&pool->cond, &blank_cond, sizeof(pthread_cond_t));
//pthread_mutex_init(&pool->mutex, NULL); pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;//创建互斥锁,静态全局 memcpy(&pool->mutex, &blank_mutex, sizeof(pthread_mutex_t));
int i = 0; for (i = 0; i < numWorkers; i++) { struct nWorker* worker = (struct nWorker*)malloc(sizeof(struct nWorker)); if (worker == NULL) { perror("malloc"); return -2; } memset(worker, 0, sizeof(struct nWorker)); worker->manager = pool;
int ret = pthread_create(&worker->threadid, NULL, nThreadPoolCallback, worker); if (ret) { perror("pthread_create"); free(worker); return -3; } LIST_INSERT(worker, pool->workers); }
return 0;}
//APIint nThreadPoolDestory(ThreadPool* pool, int nWorker) {
struct nWorker* worker = NULL;
for (worker = pool->workers; worker != NULL; worker = worker->next) { worker->terminate; }
pthread_mutex_lock(&pool->mutex);
pthread_cond_broadcast(&pool->cond);//唤醒所有线程
pthread_mutex_unlock(&pool->mutex);
pool->workers = NULL;//置空 pool->tasks = NULL;//置空
return 0;
}
//APIint nThreadPoolPushTask(ThreadPool* pool, struct nTask* task) {
pthread_mutex_lock(&pool->mutex);
LIST_INSERT(task, pool->tasks);
pthread_cond_signal(&pool->cond);//唤醒一个线程
pthread_mutex_unlock(&pool->mutex);
}
#if 1
/* Demo parameters: number of worker threads and number of tasks to submit. */
#define THREADPOOL_INIT_COUNT 20
#define TASK_INIT_SIZE 1000
/*
 * Demo task body: prints the int stored in task->user_data, then releases
 * both the payload and the task itself.
 */
void task_entry(struct nTask* task) {
    int *payload = task->user_data;

    printf("idx: %d\n", *payload);

    free(payload);
    free(task);
}
/*
 * Demo driver: starts a pool of THREADPOOL_INIT_COUNT workers, submits
 * TASK_INIT_SIZE tasks that each print their index, then waits for a key
 * press so the process does not exit while workers are still running.
 */
int main(void) {
    ThreadPool pool = { 0 };

    if (nThreadPoolCreate(&pool, THREADPOOL_INIT_COUNT) != 0) {
        fprintf(stderr, "nThreadPoolCreate failed\n");
        return 1;
    }

    for (int i = 0; i < TASK_INIT_SIZE; i++) {
        struct nTask *task = malloc(sizeof(*task));
        if (task == NULL) {
            perror("malloc");
            exit(1);
        }
        memset(task, 0, sizeof(*task));

        /* The payload is freed by task_entry, together with the task. */
        int *idx = malloc(sizeof(*idx));
        if (idx == NULL) {
            perror("malloc");
            free(task);
            exit(1);
        }
        *idx = i;

        task->task_func = task_entry;
        task->user_data = idx;

        nThreadPoolPushTask(&pool, task);
    }

    getchar(); /* keep the main thread alive while the workers run */

    return 0;
}
#endif
评论