#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<pthread.h>//Linux编程使用线程相关的就会用到这个,编译的时候还要加-lpthread,用pthread动态库
/*
 * LIST_INSERT(item, list) - push `item` onto the front of a doubly linked list.
 *
 *   item: pointer to a node with `prev` and `next` pointer members.
 *   list: lvalue holding the head pointer; updated to point at `item`.
 *
 * Both arguments are evaluated several times, so pass expressions without
 * side effects.
 *
 * Keep comments off the continuation lines: a backslash only joins lines when
 * it is the very last character before the newline.
 */
#define LIST_INSERT(item, list) do { \
    (item)->prev = NULL; \
    (item)->next = (list); \
    if ((list) != NULL) (list)->prev = (item); \
    (list) = (item); \
} while (0)
/*
 * LIST_REMOVE(item, list) - unlink `item` from a doubly linked list.
 *
 *   item: pointer to a node with `prev` and `next` pointer members.
 *   list: lvalue holding the head pointer; advanced to item->next when
 *         `item` is the current head.
 *
 * After removal both of the item's links are reset to NULL.
 * Both arguments are evaluated several times, so pass expressions without
 * side effects.
 *
 * Keep comments off the continuation lines: a backslash only joins lines when
 * it is the very last character before the newline.
 */
#define LIST_REMOVE(item, list) do { \
    if ((item)->prev != NULL) (item)->prev->next = (item)->next; \
    if ((item)->next != NULL) (item)->next->prev = (item)->prev; \
    if ((item) == (list)) (list) = (item)->next; \
    (item)->prev = (item)->next = NULL; \
} while (0)
/*
 * One queued unit of work.
 *
 * A worker thread takes a task off the pool's task list and calls
 * task_func with the task itself as the only argument.
 */
struct nTask {
    /* Function invoked by a worker; receives this task. */
    void (*task_func)(struct nTask *task);
    /* Caller-supplied payload; the pool code never reads it. */
    void *user_data;
    /* Doubly linked list links (maintained by LIST_INSERT / LIST_REMOVE). */
    struct nTask *prev;
    struct nTask *next;
};
/*
 * Per-thread bookkeeping for one worker of the pool.
 */
struct nWorker {
    /* Thread handle filled in by pthread_create. */
    pthread_t threadid;
    /* Non-zero asks the worker loop to stop. */
    int terminate;
    /* Owning pool; gives the worker access to the task list and its lock. */
    struct nManager *manager;
    /* Doubly linked list links (maintained by LIST_INSERT / LIST_REMOVE). */
    struct nWorker *prev;
    struct nWorker *next;
};
/*
 * The pool itself: a list of pending tasks, a list of workers, and the
 * mutex / condition variable pair the pool functions use around the task list.
 */
struct nManager {
    /* Head of the pending task list. */
    struct nTask *tasks;
    /* Head of the worker list. */
    struct nWorker *workers;
    /* Locked by the pool functions around accesses to `tasks`. */
    pthread_mutex_t mutex;
    /* Workers wait on this while there is no task to run. */
    pthread_cond_t cond;
};

/* Public name for the pool type. */
typedef struct nManager ThreadPool;
//callback!=task
static void* nThreadPoolCallback(void* arg) {
struct nWorker* worker = (struct nWorker*)arg; //接受pthread_create传来的参数
while (1) //默认一直执行
{
pthread_mutex_lock(&worker->manager->mutex);//加锁
while (worker->manager->tasks == NULL) {
if (worker->terminate)break;
pthread_cond_wait(&worker->manager->cond, &worker->manager->mutex);//如果没有任务就等待
}
if (worker->terminate) {
pthread_mutex_unlock(&worker->manager->mutex);//解锁
break;
}
struct nTask* task = worker->manager->tasks;
LIST_REMOVE(task, worker->manager->tasks);
pthread_mutex_unlock(&worker->manager->mutex);
task->task_func(task);
}
free(worker);
return;
}
/*
 * API: initialise `pool` and start its worker threads.
 *
 * pool:       caller-owned storage for the pool; it is zeroed and re-initialised.
 * numWorkers: number of worker threads to start; values below 1 are treated as 1.
 *
 * Returns:
 *    0  success
 *   -1  pool is NULL
 *   -2  allocating a worker failed
 *   -3  pthread_create failed
 *   -4  initialising the mutex or condition variable failed
 *
 * On -2 / -3 the workers started so far keep running and stay on the list.
 */
int nThreadPoolCreate(ThreadPool *pool, int numWorkers) {
    if (pool == NULL) return -1;
    if (numWorkers < 1) numWorkers = 1;

    memset(pool, 0, sizeof(ThreadPool));

    /*
     * Initialise the primitives in place instead of memcpy-ing a static
     * initializer into them; copies of pthread objects are not guaranteed
     * to be usable.
     */
    int err = pthread_mutex_init(&pool->mutex, NULL);
    if (err != 0) {
        fprintf(stderr, "pthread_mutex_init: %s\n", strerror(err));
        return -4;
    }
    err = pthread_cond_init(&pool->cond, NULL);
    if (err != 0) {
        fprintf(stderr, "pthread_cond_init: %s\n", strerror(err));
        pthread_mutex_destroy(&pool->mutex);
        return -4;
    }

    for (int i = 0; i < numWorkers; i++) {
        struct nWorker *worker = (struct nWorker *)malloc(sizeof(struct nWorker));
        if (worker == NULL) {
            perror("malloc");
            return -2;
        }
        memset(worker, 0, sizeof(struct nWorker));
        worker->manager = pool;

        /* pthread_create reports failure through its return value, not errno. */
        int ret = pthread_create(&worker->threadid, NULL, nThreadPoolCallback, worker);
        if (ret != 0) {
            fprintf(stderr, "pthread_create: %s\n", strerror(ret));
            free(worker);
            return -3;
        }
        LIST_INSERT(worker, pool->workers);
    }
    return 0;
}
/*
 * API: ask every worker of `pool` to stop.
 *
 * pool:    the pool to shut down.
 * nWorker: unused; kept so existing callers keep compiling.
 *
 * Sets each worker's terminate flag, detaches the worker and task lists from
 * the pool and wakes all waiting workers. Everything happens under the pool
 * mutex: workers only read `terminate` while holding that mutex, so none of
 * them can exit and free its nWorker while the list is still being walked.
 *
 * What this function does NOT do:
 *   - it does not wait for the threads to exit;
 *   - tasks still queued are dropped from the list without being run or freed;
 *   - the mutex and condition variable are not destroyed, because workers
 *     still use them on their way out.
 * NOTE(review): the caller presumably has to keep `pool` alive until the
 * workers have exited - confirm against callers.
 *
 * Returns 0 on success, -1 if pool is NULL.
 */
int nThreadPoolDestory(ThreadPool *pool, int nWorker) {
    (void)nWorker;
    if (pool == NULL) return -1;

    pthread_mutex_lock(&pool->mutex);
    for (struct nWorker *worker = pool->workers; worker != NULL; worker = worker->next) {
        worker->terminate = 1;
    }
    pool->workers = NULL;
    pool->tasks = NULL;
    pthread_cond_broadcast(&pool->cond);
    pthread_mutex_unlock(&pool->mutex);
    return 0;
}
/*
 * API: queue `task` on `pool` and wake one waiting worker.
 *
 * pool: the pool to queue on.
 * task: the task to run; its prev/next links are overwritten.
 *
 * The task is inserted at the head of the list and workers also take from the
 * head, so the most recently pushed pending task is picked first.
 *
 * Returns 0 on success, -1 if pool or task is NULL.
 */
int nThreadPoolPushTask(ThreadPool *pool, struct nTask *task) {
    if (pool == NULL || task == NULL) return -1;

    pthread_mutex_lock(&pool->mutex);
    LIST_INSERT(task, pool->tasks);
    pthread_cond_signal(&pool->cond);
    pthread_mutex_unlock(&pool->mutex);
    return 0;
}
#if 1
/* Number of worker threads the demo in main() creates the pool with. */
#define THREADPOOL_INIT_COUNT 20
/* Number of tasks the demo in main() pushes onto the pool. */
#define TASK_INIT_SIZE 1000
/*
 * Demo task function: prints the int stored in the task's user_data, then
 * releases both the payload and the task itself.
 *
 * task: a heap-allocated task whose user_data points to a heap-allocated int;
 *       neither may be used after this call.
 */
void task_entry(struct nTask* task) {
    int *payload = (int *)task->user_data;

    printf("idx: %d\n", *payload);

    free(payload);
    free(task);
}
/*
 * Demo driver: creates a pool with THREADPOOL_INIT_COUNT workers, pushes
 * TASK_INIT_SIZE tasks that each print their index, then blocks on getchar()
 * so the process stays alive while the workers drain the queue.
 *
 * Returns 0 normally, 1 if the pool could not be created; exits with status 1
 * if an allocation fails.
 */
int main(void) {
    ThreadPool pool = { 0 };

    int rc = nThreadPoolCreate(&pool, THREADPOOL_INIT_COUNT);
    if (rc != 0) {
        fprintf(stderr, "nThreadPoolCreate failed: %d\n", rc);
        return 1;
    }

    for (int i = 0; i < TASK_INIT_SIZE; i++) {
        struct nTask *task = (struct nTask *)malloc(sizeof(struct nTask));
        if (task == NULL) {
            perror("malloc");
            exit(1);
        }
        memset(task, 0, sizeof(struct nTask));

        /* Check the payload allocation before writing through it. */
        int *idx = (int *)malloc(sizeof(int));
        if (idx == NULL) {
            perror("malloc");
            free(task);
            exit(1);
        }
        *idx = i;

        task->task_func = task_entry;
        task->user_data = idx; /* ownership passes to task_entry, which frees it */
        nThreadPoolPushTask(&pool, task);
    }

    getchar(); /* keep the main thread alive while the workers run */
    return 0;
}
#endif
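/* 评论 ("Comments") - stray non-code text found after the listing; kept as a comment so the file compiles. */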