写点什么

Linux 线程 - 生产消费模型 / 线程池

作者:可口也可樂
  • 2022-10-23
    湖南
  • 本文字数:13220 字

    阅读完需:约 43 分钟

Linux线程-生产消费模型/线程池

@TOC

零、前言

本章主要讲解学习 Linux 线程章节的后一部分,主要介绍生产消费者模型以及信号量的学习

一、生产消费者模型

  • 什么是生产消费者模型:


  1. 三种关系:生产者和生产者(互斥关系);消费者和消费者(互斥关系);生产者和消费者(互斥关系、同步关系)

  2. 两种角色:生产者和消费者(通常由进程或线程构成)

  3. 一个交易场所:通常指的是内存中的一段缓冲区,或者某种数据的组织方式

  4. 主要过程:生产者将生产的数据或者任务放入到交易场所中,消费者从交易产所拿取数据或者任务


  • 生产者和生产者/消费者和消费者/生产者和消费者的互斥关系:


交易产所是被多个生产者和消费者共同所见的,即被多个执行流同时访问,为了避免交易产所中的数据混乱,我们需要将该临界资源用互斥锁保护起来。其中,所有的生产者和消费者都会竞争式的申请锁,因此生产者和生产者、消费者和消费者、生产者和消费者之间都存在互斥关系


  • 生产者和消费者之间的同步关系:


  1. 由于交易产所的容量有限,如果让生产者一直生产,那中当生产者生产的数据将空间塞满后,生产者再频繁访问交易场所也会是没有实际的效用;如果让消费者一直消费,那么当容器当中的数据被消费完后,消费者再频繁的访问交易产所也会是没有实际的效用

  2. 虽然这样没什么问题,但是不合理,是非常低效的。我们需要应该让生产者和消费者访问交易产所按照一定的顺序,当没有数据时,让消费者等待,生产者进行生产;当容量满了,让生产者等待,消费者进行消费

注:互斥关系保证的是数据的访问正常,而同步关系是为了让多线程(生产和消费者)之间协同起来


  • 为何要使用生产者消费者模型:


  1. 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题

  2. 生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取

  3. 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力,这个阻塞队列就是用来给生产者和消费者解耦的


  • 生产者消费者模型优点:


  1. 解耦(阻塞队列的作用)

  2. 支持并发(解耦后,消费和生产可以各自运行)

  3. 支持忙闲不均


  • 示图:


二、阻塞队列生产消费模型

  • BlockingQueue:


  1. 在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构

  2. 其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)


  • 示图:



  • 示例:单生产者-单消费者


main.cc:#include "BlockQueue.hpp"#include<cstdio>#include<ctime>#include<unistd.h>#define NUM 5void* consumer(void* c){    blockqueue<int>* bq=(blockqueue<int>*)c;    while(true)    {         //取出数据        int t=0;        bq->Get(&t);        std::cout<<"consumer["<<pthread_self()<<"]:生产数据->"<<t<<std::endl;    }    return nullptr;}void* producter(void* p){    srand((unsigned)time(nullptr));    blockqueue<int>* bq=(blockqueue<int>*)p;    while(true)    {        //生成数据和放入数据        int t=rand()%100+1;        bq->Put(t);        std::cout<<"producter["<<pthread_self()<<"]:消费数据->"<<t<<std::endl;        sleep(1);    }    return nullptr;}int main(){    blockqueue<int>* bq=new blockqueue<int>(NUM);    pthread_t cid,pid;    pthread_create(&cid,nullptr,consumer,(void*)bq);    pthread_create(&pid,nullptr,producter,(void*)bq);    pthread_join(cid,nullptr);    pthread_join(pid,nullptr);    delete bq;}BlockQueue.hpp:#include <iostream>#include <queue>#include <pthread.h>template<class T>class blockqueue{private:    int _cap;//容量    std::queue<T> _q;//交易场所-临界资源    //同步与互斥    pthread_cond_t _data;    pthread_cond_t _space;    pthread_mutex_t _lock;public:    blockqueue(int cap):_cap(cap)    {        pthread_cond_init(&_data,nullptr);        pthread_cond_init(&_space,nullptr);        pthread_mutex_init(&_lock,nullptr);    }    bool IsFull()    {        return _q.size()==_cap;    }    bool IsEmpty()    {        return _q.size()==0;    }    //获取数据    void Get(T* out)    {        pthread_mutex_lock(&_lock);        while(IsEmpty())//唤醒后还需检测        {            pthread_cond_wait(&_data,&_lock);//同步        }        //有数据        *out=_q.front();        _q.pop();        if(_q.size()<=_cap/2)//空间过半            pthread_cond_signal(&_space);//唤醒该条件变量下的线程        pthread_mutex_unlock(&_lock);    }    //放数据    void Put(const T& in)    {        pthread_mutex_lock(&_lock);        while(IsFull())        {            pthread_cond_wait(&_space,&_lock);        }        _q.push(in);        if(_q.size()>=_cap/2)//数据过半        pthread_cond_signal(&_data);//有数据,唤醒该条件变量下的等待线程        pthread_mutex_unlock(&_lock);    }
~blockqueue() { pthread_cond_destroy(&_data); pthread_cond_destroy(&_space); pthread_mutex_destroy(&_lock); }};
复制代码


  • 效果:



  • 注意:


  1. 对于单生产者、单消费者的生产者消费者模型只需要维护生产者和消费者之间的同步与互斥关系;阻塞队列是会被生产者和消费者同时访问的临界资源,进行访问时需要申请互斥锁

  2. 生产者线程要向阻塞队列当中 Push 数据,前提是阻塞队列里面有空间,若阻塞队列已经满了,那么此时该生产者线程就需要进行等待,直到阻塞队列中有空间时再将其唤醒;消费者线程要从阻塞队列当中 Pop 数据,前提是阻塞队列里面有数据,若阻塞队列为空,那么此时该消费者线程就需要进行等待,直到阻塞队列中有新的数据时再将其唤醒

  3. 因此在这里我们需要用到两个条件变量进行描述临界资源的状态,一个条件变量用来描述队列是否有空间,另一个条件变量用来描述是否有数据:当阻塞队列满了的时候,要进行生产的生产者线程就应该在 space 条件变量下进行等待;当阻塞队列为空的时候,要进行消费的消费者线程就应该在 data 条件变量下进行等待;当放入数据时就可以进行唤醒 data 下等待的线程,当取出数据是就可以唤醒 space 下等待的线程

  4. 不论是生产者线程还是消费者线程,它们都是先申请到锁进入临界区后再判断是否满足生产或消费条件的,如果对应条件不满足,那么对应线程就会被挂起。但此时该线程是拿着锁的,为了避免死锁问题,在调用pthread_cond_wait函数时就需要传入当前线程手中的互斥锁,此时当该线程被挂起时就会自动释放手中的互斥锁,而当该线程被唤醒时又会自动获取到该互斥锁


  • 示例:多生产者-多消费者


main.cc:#include "BlockQueue.hpp"#include<cstdio>#include<ctime>#include<unistd.h>
#define NUM 10
void* consumer(void* c){ blockqueue<int>* bq=(blockqueue<int>*)c; while(true) { //取出数据 int t=0; bq->Get(&t); std::cout<<"consumer["<<pthread_self()<<"]:生产数据->"<<t<<std::endl; } return nullptr;}
void* producter(void* p){ srand((unsigned)time(nullptr)); blockqueue<int>* bq=(blockqueue<int>*)p; while(true) { //生成数据和放入数据 int t=rand()%100+1; bq->Put(t); std::cout<<"producter["<<pthread_self()<<"]:消费数据->"<<t<<std::endl; sleep(1); } return nullptr;}int main(){ blockqueue<int>* bq=new blockqueue<int>(NUM); pthread_t cid1,cid2,cid3,cid4,pid1,pid2; pthread_create(&cid1,nullptr,consumer,(void*)bq); pthread_create(&cid2,nullptr,consumer,(void*)bq); pthread_create(&cid3,nullptr,consumer,(void*)bq); pthread_create(&cid4,nullptr,consumer,(void*)bq); pthread_create(&pid1,nullptr,producter,(void*)bq); pthread_create(&pid2,nullptr,producter,(void*)bq);
pthread_join(cid1,nullptr); pthread_join(cid2,nullptr); pthread_join(cid3,nullptr); pthread_join(cid4,nullptr); pthread_join(pid1,nullptr); pthread_join(pid2,nullptr); delete bq;}BlockQueue.hpp:#include <iostream>#include <queue>#include <pthread.h>
template<class T>class blockqueue{private: int _cap;//容量 std::queue<T> _q;//交易场所-临界资源 //同步与互斥 pthread_cond_t _data; pthread_cond_t _space; pthread_mutex_t _lock;//生产-消费的互斥 pthread_mutex_t c_lock;//消费之间互斥 pthread_mutex_t p_lock;//生产之间互斥
public: blockqueue(int cap):_cap(cap) { pthread_cond_init(&_data,nullptr); pthread_cond_init(&_space,nullptr); pthread_mutex_init(&_lock,nullptr); pthread_mutex_init(&c_lock,nullptr); pthread_mutex_init(&p_lock,nullptr); } bool IsFull() { return _q.size()==_cap; } bool IsEmpty() { return _q.size()==0; } //获取数据 void Get(T* out) { pthread_mutex_lock(&c_lock); pthread_mutex_lock(&_lock); while(IsEmpty())//唤醒后还需检测 { pthread_cond_wait(&_data,&_lock);//同步 } //有数据 *out=_q.front(); _q.pop(); if(_q.size()<=_cap/2)//空间过半 pthread_cond_signal(&_space);//唤醒该条件变量下的线程 pthread_mutex_unlock(&_lock); pthread_mutex_unlock(&c_lock); } //放数据 void Put(const T& in) { pthread_mutex_lock(&p_lock); pthread_mutex_lock(&_lock); while(IsFull()) { pthread_cond_wait(&_space,&_lock); } _q.push(in); if(_q.size()>=_cap/2)//数据过半 pthread_cond_signal(&_data);//有数据,唤醒该条件变量下的等待线程 pthread_mutex_unlock(&_lock); pthread_mutex_unlock(&p_lock); }
~blockqueue() { pthread_cond_destroy(&_data); pthread_cond_destroy(&_space); pthread_mutex_destroy(&_lock); pthread_mutex_destroy(&c_lock); pthread_mutex_destroy(&p_lock); }};
复制代码


注:当然可以只使用一把互斥锁,这里为了更好表示生产与消费者们之间的关系使用了三个互斥锁


  • 效果:


三、环形队列生产消费模型

  • 概念:


环形队列采用数组模拟,用模运算来模拟环状特性


  • 示图:



  • 注意:


  1. 在这个生产消费者模型中,我们依旧是需要维护三个关系:生产者和生产者(互斥关系);消费者和消费者(互斥关系);生产者和消费者(互斥关系、同步关系)

  2. 对于生产和消费两个角色,我们使用两个变量记录下标位置;而这下标变量对于生产者们和消费者们各自来说是临界资源,对下标变量的操作并不是原子的,需要使用互斥锁进行保护

  3. 对于生产者关注的是队列的空间资源,而消费者关注的是数据资源;对于生产和消费之间,使用信号量进行描述空间和数据资源的数量,维护生产和消费的同步与互斥

  4. 申请信号量是申请资源的使用的权限,相当于预定;申请互斥锁保证多线程的生产者或者消费者之间访问的安全,相当于进行排队


  • 示例:


RingQueue.hpp:#include<iostream>#include<semaphore.h>#include<pthread.h>#include<vector>
#define NUM 10
template<class T>class RingQueue{private: int _cap;//容量 std::vector<T> _rq;//临界资源 //保证消费者和生产者之间的同步与互斥关系 sem_t _space;//描述空间资源 sem_t _data;//描述数据资源 //记录下标位置-同样也是一种临界资源 int c_index; int p_index; //保证多线程下消费者之间和生产者之间的互斥关系(使用的下标是临界资源,并且对下标的操作不是原子的) pthread_mutex_t c_lock; pthread_mutex_t p_lock;public: RingQueue(int cap=NUM):_cap(cap),_rq(cap) { sem_init(&_space,0,_cap); sem_init(&_data,0,0); pthread_mutex_init(&c_lock,nullptr); pthread_mutex_init(&p_lock,nullptr); } ~RingQueue() { sem_destroy(&_space); sem_destroy(&_data); pthread_mutex_destroy(&c_lock); pthread_mutex_destroy(&p_lock); } void Put(const T& in) { sem_wait(&_space);//申请空间资源-预定 pthread_mutex_lock(&c_lock);//生产者之间竞争互斥锁-排队 _rq[c_index++]=in; c_index%=_cap; sem_post(&_data);//发布数据资源 pthread_mutex_unlock(&c_lock);//解锁 } void Get(T* out) { sem_wait(&_data);//申请数据资源 pthread_mutex_lock(&p_lock);//消费者竞争互斥锁-排队 *out=_rq[p_index++]; p_index%=_cap; sem_post(&_space);//发布空间资源 pthread_mutex_unlock(&p_lock);//解锁 }};main.cc:#include "RingQueue.hpp"#include<cstdio>#include<ctime>#include<unistd.h>
class Task {private: int _cnt;public: Task(){} Task(int cnt):_cnt(cnt){} int Run() { int sum=0; for(int i=1;i<=_cnt;i++) sum+=i; return sum; } void Show() { printf("Thread:%p is running...Task done: counting from 1 to %d",pthread_self(),_cnt); }};void* consumer(void* c){ RingQueue<Task>* rq=(RingQueue<Task>*)c; while(true) { //取出数据+计算结果 Task t; rq->Get(&t); int ret=t.Run(); std::cout<<"consumer->"; t.Show(); printf(" ret=%d\n",ret); sleep(1); } return nullptr;}
void* producter(void* p){ srand((unsigned)time(nullptr)); RingQueue<Task>* rq=(RingQueue<Task>*) p; while(true) { //生成数据和放入数据 int t=rand()%100+1; rq->Put(Task(t)); std::cout<<"producter->Thread:"<<pthread_self()<<" is running...Task made: count from 1 to "<<t<<std::endl; //sleep(1); } return nullptr;}int main(){ RingQueue<int>* rq=new RingQueue<int>(NUM); pthread_t cid1,cid2,cid3,cid4,pid1,pid2; pthread_create(&cid1,nullptr,consumer,(void*)rq); pthread_create(&cid2,nullptr,consumer,(void*)rq); pthread_create(&cid3,nullptr,consumer,(void*)rq); pthread_create(&cid4,nullptr,consumer,(void*)rq); pthread_create(&pid1,nullptr,producter,(void*)rq); pthread_create(&pid2,nullptr,producter,(void*)rq);
pthread_join(cid1,nullptr); pthread_join(cid2,nullptr); pthread_join(cid3,nullptr); pthread_join(cid4,nullptr); pthread_join(pid1,nullptr); pthread_join(pid2,nullptr); delete rq;}
复制代码


  • 效果:


四、线程池 threadpool

  • 线程池概念:


  1. 线程池是一种线程使用模式

  2. 线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价

  3. 线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络 sockets 等的数量


  • 线程池的应用场景:


  1. 需要大量的线程来完成任务,且完成任务的时间比较短


示例:WEB 服务器完成网页请求这样的任务,使用线程池技术是非常合适的,因为单个任务小,而任务数量巨大;但对于长时间的任务,比如一个 Telnet 连接请求,线程池的优点就不明显了,因为 Telnet 会话时间比线程的创建时间大多了


  1. 对性能要求苛刻的应用,但不至于使服务器因此产生大量线程的应用


示例:要求服务器迅速响应客户请求接受突发性的大量请求,突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误


  • 线程池示例:创建固定数量线程池,循环从任务队列中获取任务对象,获取到任务对象后,执行任务对象中的任务接口


ThreadPool.hpp:#pragma once #include<pthread.h>#include<iostream>#include<queue>
#define NUM 10
template<class T>class ThreadPool{private: int _num;//线程数量 std::queue<T> task_queue;//任务阻塞队列->临界资源 //线程同步与互斥 pthread_mutex_t _lock; pthread_cond_t _cond;public: //封装接口便于传入的对象调用 void Lock() { pthread_mutex_lock(&_lock); } void UnLock() { pthread_mutex_unlock(&_lock); } void CondWait() { pthread_cond_wait(&_cond,&_lock); } void CondWake() { pthread_cond_signal(&_cond);
} bool IsEmpty() { return task_queue.size()==0; }public: ThreadPool(int num):_num(num) { pthread_mutex_init(&_lock,nullptr); pthread_cond_init(&_cond,nullptr); } ~ThreadPool() { pthread_mutex_destroy(&_lock); pthread_cond_destroy(&_cond); } void InitThreadPool() { pthread_t tid; for(int i=0;i < _num;i++)//创建一批线程 { pthread_create(&tid,nullptr,Routine,this);//传入线程池地址 pthread_detach(tid);//线程分离 } } //线程执行例程 static void* Routine(void* arg) { ThreadPool* tp=(ThreadPool*)arg; //检测是否有任务-没任务等待;有任务进行获取并执行 while(true) { //检查 while(tp->IsEmpty()) tp->CondWait(); //取出任务 T task; tp->Get(&task); task();//执行逻辑-仿函数 } } void Put(const T& t) { Lock(); task_queue.push(t); CondWake();//有任务进行唤醒 UnLock(); } void Get(T* t) { //取任务已经是获取到锁资源了 *t=task_queue.front(); task_queue.pop(); }};main.cc:#include "ThreadPool.hpp"#include<cstdio>#include<ctime>#include<unistd.h>class Task {private: int x; int y; char op;public: Task(){} Task(int _x,int _y,char _op):x(_x),y(_y),op(_op){} void operator()() { Handler(); } void Handler() { int res; switch(op) { case '+': res=x+y; break; case '-': res=x-y; break; case '*': res=x*y; break; case '/': res=x/y; break; default: break; } printf("thread:%p done the task: %d %c %d = %d\n",pthread_self(),x,op,y,res); }};int main(){ srand((unsigned)time(nullptr)); ThreadPool<Task>* tp=new ThreadPool<Task>(NUM); tp->InitThreadPool(); char ch[]="+-*/"; while(true) { int x=rand()%1000+1; int y=rand()%1000+1; int op=ch[rand()%4]; tp->Put(Task(x,y,op)); sleep(1); } delete tp;}
复制代码


  • 效果:



  • 为什么设置例程函数为静态:


  1. 例程函数需要设置成静态的成员函数,因为线程创建的执行函数的类型是返回值和参数都是 void *,对于普通成员函数来说,每个函数的参数列表都带有一个 this 指针类型,参数类型不一致,所以设置成静态成员

  2. 由于静态成员函数只能调用静态属性的成员或者通过对象调用的方式访问内部方法,由此创建线程池后将线程池对象的地址传入线程执行函数的参数中,便于在例程中直接使用对象进行调用函数进行访问任务队列

  3. 多线程在访问任务队列时需要维护同步与互斥,所以需要使用条件变量与互斥锁接口,为了更方便在静态例程函数中使用条件变量和互斥锁,我们需要进一步封装接口便于调用

五、线程安全的单例模式

  • 设计模式的概念:


设计模式(Design Pattern)是一套被反复使用、多数人知晓的、经过分类的、代码设计经验的总结


  • 使用设计模式的目的:


为了代码可重用性、让代码更容易被他人理解、保证代码可靠性;设计模式使代码编写真正工程化


  • 单例模式:


一个类只能创建一个对象,即单例模式,该模式可以保证系统中该类只有一个实例,并提供一个访问它的全局访问点,该实例被所有程序模块共享


  • 比如:


在某个服务器程序中,该服务器的配置信息存放在一个文件中,这些配置数据由一个单例对象统一读取,然后服务进程中的其他对象再通过这个单例对象获取这些配置信息,这种方式简化了在复杂环境下的配置管理


  • 单例模式有两种实现模式:


饿汉模式和懒汉模式

1、饿汉模式

当程序启动时就创建一个唯一的实例对象


  • 示例代码:


template <typename T>class Singleton{public:    static Singleton& GetInstance()//获取实例地址    {        return _s;    }private:    Singleton();//构造私有化,禁止随意构造    //delete拷贝构造和赋值函数,防拷贝赋值    Singleton(const Singleton& s) = delete;    Singleton& operator=(const Singleton& s) = delete;
static T _s;//在类里的变量都是声明,在cpp文件中进行定义};
复制代码


  • 解释:


类里面的成员变量只是声明,而静态成员对象需要在类外进行定义,并且不能在.h 文件中定义,如果多个.cpp 文件包含该头文件,那么则会报重复定义的错误


  • 优势:


实现简单


  • 劣势:


  1. 如果单例对象构造十分耗时或者占用很多资源,比如加载插件啊, 初始化网络连接啊,读取文件啊等等,而有可能该对象程序运行时不会用到,那么也要在程序一开始就进行初始化,就会导致程序启动时非常的缓慢

  2. 对于多个单例类的如果具有依赖关系的话,则无法进行控制定义顺序(静态变量)

2、懒汉模式

懒汉模式则是需要的时候在第一次调用的时候进行创建


  • 示例代码:


template <typename T>class Singleton{public:    //提供获取对象以及释放对象的静态方法    static Singleton& GetInstance()    {        //提高效率,避免多次锁住及解锁        if (_s == nullptr)        {            //保证线程安全            _m.lock();//锁住            if (_s == nullptr)            {                _s = new Singleton;            }            _m.unlock();//解锁        }        return *_s;    }    static void DelInstance()    {        //提高效率,避免多次锁住及解锁        if (_s != nullptr)//设置volatile关键字避免被编译器优化        {            //保证线程安全            _m.lock();//锁住            if (_s != nullptr)            {                delete _s;                _s = nullptr;            }            _m.unlock();//解锁        }    }    vector<int> _v;private:    Singleton() {};//要有函数体,否则只是声明,当new的时候找不到对应的实体    Singleton(const Singleton& s) = delete;    Singleton& operator=(const Singleton& s) = delete;    volatile static T* _s;//储存实例对象地址    static mutex _m;//互斥锁};
复制代码


  • 解释:


对于懒汉模式需要注意的是要保证线程安全,当多个进行调用 GetInstance()/DelInstance()时,可能多次进行 new 和 delete,可能造成数据的丢失


  • 优势:


无启动负载;可以自由控制多个单例类的定义顺序


  • 劣势:


实现复杂


  • 注意事项:


  1. 加锁解锁的位置

  2. 双重 if 判定,避免不必要的锁竞争

  3. volatile 关键字防止过度优化


  • 单例模式的线程池:


ThreadPool.hpp:#pragma once #include<pthread.h>#include<iostream>#include<queue>
#define NUM 10
template<class T>class ThreadPool{private: std::queue<T> task_queue;//任务阻塞队列->临界资源 //线程同步与互斥 pthread_mutex_t _lock; pthread_cond_t _cond;private: static ThreadPool<T>* _inst; //避免构造调用 ThreadPool() { pthread_mutex_init(&_lock,nullptr); pthread_cond_init(&_cond,nullptr); } //防拷贝构造 ThreadPool(const ThreadPool<T>&)=delete; ThreadPool<T>& operator=(const ThreadPool<T>&)=delete;public: //封装接口便于传入的对象调用 void Lock() { pthread_mutex_lock(&_lock); } void UnLock() { pthread_mutex_unlock(&_lock); } void CondWait() { pthread_cond_wait(&_cond,&_lock); } void CondWake() { pthread_cond_signal(&_cond);
} bool IsEmpty() { return task_queue.size()==0; }public: static ThreadPool<T>* GetInstance() { //静态变量,全局只有一个,只在最开始初始化 static pthread_mutex_t mtx=PTHREAD_MUTEX_INITIALIZER;//静态初始化 if(_inst==nullptr) { pthread_mutex_lock(&mtx); if(_inst==nullptr) { _inst=new ThreadPool<T>(); } pthread_mutex_unlock(&mtx); } return _inst; } ~ThreadPool() { pthread_mutex_destroy(&_lock); pthread_cond_destroy(&_cond); } void InitThreadPool(int num) { pthread_t tid; for(int i=0;i < num;i++)//创建一批线程 { pthread_create(&tid,nullptr,Routine,this);//传入线程池地址 pthread_detach(tid);//线程分离 } } //线程执行例程 static void* Routine(void* arg) { ThreadPool* tp=(ThreadPool*)arg; //检测是否有任务-没任务等待;有任务进行获取并执行 while(true) { //检查 while(tp->IsEmpty()) tp->CondWait(); //取出任务 T task; tp->Get(&task); task();//执行逻辑-仿函数 } } void Put(const T& t) { Lock(); task_queue.push(t); CondWake();//有任务进行唤醒 UnLock(); } void Get(T* t) { //取任务已经是获取到锁资源了 *t=task_queue.front(); task_queue.pop(); }};//静态成员初始化template<class T>ThreadPool<T>* ThreadPool<T>::_inst=nullptr;main.cc:#include "ThreadPool.hpp"#include<cstdio>#include<ctime>#include<unistd.h>
class Task {private: int x; int y; char op;public: Task(){} Task(int _x,int _y,char _op):x(_x),y(_y),op(_op){} void operator()() { Handler(); } void Handler() { int res; switch(op) { case '+': res=x+y; break; case '-': res=x-y; break; case '*': res=x*y; break; case '/': res=x/y; break; default: break; } printf("thread:%p done the task: %d %c %d = %d\n",pthread_self(),x,op,y,res); }};
int main(){ srand((unsigned)time(nullptr)); ThreadPool<Task>* tp=ThreadPool<Task>::GetInstance(); tp->InitThreadPool(NUM); char ch[]="+-*/"; while(true) { int x=rand()%1000+1; int y=rand()%1000+1; int op=ch[rand()%4]; tp->Put(Task(x,y,op)); sleep(1); } delete tp;}
复制代码


  • 效果:


六、STL 智能指针和线程安全

STL 中的容器不是线程安全的


  • 原因:


  1. STL 的设计初衷是将性能挖掘到极致,而一旦涉及到加锁保证线程安全,会对性能造成巨大的影响而且对于不同的容器,加锁方式的不同,性能可能也不同(例如 hash 表的锁表和锁桶),因此 STL 默认不是线程安全

  2. 如果需要在多线程环境下使用,往往需要调用者自行保证线程安全


  • 智能指针是否是线程安全的:


  1. 对于 unique_ptr, 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题

  2. 对于 shared_ptr, 多个对象需要共用一个引用计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数

七、其他常见的各种锁

  1. 悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起

  2. 乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和 CAS 操作

  3. CAS 操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试

  4. 自旋锁:对于占用互斥锁的时间长短来决定是否选择使用自旋锁,如果占用锁时间长那么不用自旋锁,让线程进行挂起等待就好;如果占用时间短,使用自旋锁进行间断性获取锁申请,也就是自旋

八、读者写者问题

  • 读写锁概念:


  1. 在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读的机会反而高的多

  2. 通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地降低我们程序的效率。读写锁可以专门处理这种多读少写的情况

  • 示图:



注:写独占,读共享,读锁优先级高


  • 读写锁接口:


  1. 设置读写优先:


int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref);
复制代码


  • pref 共有的选择 :


PTHREAD_RWLOCK_PREFER_READER_NP (默认设置) 读者优先,可能会导致写者饥饿情况PTHREAD_RWLOCK_PREFER_WRITER_NP 写者优先,目前有 BUG,导致表现行为和PTHREAD_RWLOCK_PREFER_READER_NP 一致PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP 写者优先,但写者不能递归加锁
复制代码


注:读者优先:读者和写着一起来,优先读者进入临界区;写者优先:写者来了之后,等之前的读者出临界区,后面来的读者进行等待


  1. 初始化:


int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t *restrict attr);
复制代码


  1. 销毁:


int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
复制代码


  1. 加锁和解锁:


int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
复制代码


发布于: 刚刚阅读数: 3
用户头像

还未添加个人签名 2022-04-28 加入

还未添加个人简介

评论

发布
暂无评论
Linux线程-生产消费模型/线程池_Linux_可口也可樂_InfoQ写作社区