写点什么

C/C++ 学习:C++ 并发与多线程

发布于: 2021 年 06 月 05 日

C++11 线程


hread/join/detach/joinable


  • thread::joinable()用于判断是否可以调用thread::join()或者thread::detach()

  • 调用thread::detach()后的线程在后台执行,被 C++运行时库接管,结束后由运行时库负责回收

  • thread::detach()陷阱:主线程结束了,引用或者指针在其他线程就会无效

  • 对于可调用对象,实际上是被拷贝到了其他线程,所以即使主线程的可调用对象析构了,其他线程仍能正常运行

免费订阅学习:c/c++Linux后台服务器开发高级架构师学习视频

线程传参/成员函数作线程函数


  • 传递简单类型参数是值传递,不是引用传递(即使使用了引用符号)

  • 如果传递类对象,应该避免隐式类型转换只要用临时构造的对象作为参数传递给线程,就能保证在主线程执行完毕前把线程的参数构造出来

  • 一般情况下不使用thread::detach(),避免局部变量失效带来线程对内存的非法引用问题

std::ref()   // 参数作引用传递// unique_ptr要使用std::move()
复制代码


#include <iostream>#include <thread>
using namespace std;
class Print {public: void operator()(int value) { cout << "operator value = " << value << endl; }
void func(int value) { cout << "func value = " << value << endl; }};
int main() { Print pt; thread t1(pt, 314); t1.join(); thread t2(&Print::func, pt, 314); t2.join(); return 0;}
复制代码


#include <iostream>#include <thread>#include <stdio.h>#include <unistd.h>#include <pthread.h>#include <sys/syscall.h>  
using namespace std;
#define gettid() syscall(__NR_gettid)void print() { printf("print::process id: %d\n", getpid()); printf("print::kernel id: %ld\n", gettid()); cout << "print::std thread id: " << std::this_thread::get_id() << endl; printf("print::pthread id: %lu\n", pthread_self());}
int main() { thread t(print); printf("print::process id: %d\n", getpid()); printf("kernel id: %ld\n", gettid()); cout << "print::std thread id: " << std::this_thread::get_id() << endl; printf("pthread id: %lu\n", pthread_self()); t.join(); return 0;}
复制代码

互斥量/死锁


  • mutex::lock()mutex::unlock()必须成对使用,mutex::lock()默认阻塞

  • std::lock_guard的原理很简单,std::lock_guard构造函数里执行了mutex::lock(),析构函数里执行了mutex::unlock()

  • 保证两个互斥量上锁的顺序一致就不会死锁

  • std::lock()可用于同时锁住两个互斥量(一般不用)

  • std::adopt_lock是个结构体对象,起一个标记作用,表示这个互斥量之前已经锁住,不需要在std::lock_guard构造函数里执行mutex::lock()

std::lock(mtx1, mtx2);std::lock_guard<std::mutex> lg1(mtx1, std::adopt_lock);std::lock_guard<std::mutex> lg2(mtx2, std::adopt_lock);
复制代码


// mutex example#include <iostream>       // std::cout#include <thread>         // std::thread#include <mutex>          // std::mutexstd::mutex mtx;           // mutex for critical sectionvoid print_block (int n, char c) {    // critical section (exclusive access to std::cout signaled by locking mtx):    mtx.lock();    for (int i=0; i<n; ++i) { std::cout << c; }    std::cout << '\n';    mtx.unlock();}int main (){    std::thread th1 (print_block,50,'*');    std::thread th2 (print_block,50,'$');    th1.join();    th2.join();    return 0;}
复制代码

unique_lock


std::unique_lockstd::lock_guard灵活,但效率差


std::unique_lock的第二个参数


  • std::adopt_lock:含义和std::lock_guard相同

  • std::try_to_lock:没有锁成功会立即返回

  • std::defer_lock:初始化一个没有加锁的std::mutex


std::unique_lock的成员函数


  • unique_lock::lock():体现了std::unique_lock的灵活性

  • unique_lock::unlock():体现了std::unique_lock的灵活性

  • unique_lock::try_lock():锁成功返回true,锁失败返回false

  • unique_lock::release():解绑std::unique_lockstd::mutex,返回std::mutex指针,如果解绑前std::mutex已加锁,需要手动解锁


锁的粒度:锁住的代码少,粒度细,执行效率高


std::unqiue_lock所有权的转移,把自己对std::mutex的所有权转移给其他std::unique_lock,不能复制


  • 使用std::move()

  • 从函数返回一个局部的std::unique_lock对象,返回局部对象会导致系统生成临时对象,并调用移动构造函数


文章福利 Linux 后端开发网络底层原理知识学习提升 点击 学习资料 获取,完善技术栈,内容知识点包括 Linux,Nginx,ZeroMQ,MySQL,Redis,线程池,MongoDB,ZK,Linux 内核,CDN,P2P,epoll,Docker,TCP/IP,协程,DPDK 等等。

单例模式/call once

template< class Callable, class... Args >void call_once( std::once_flag& flag, Callable&& f, Args&&... args );
复制代码

std::call_once()保证函数只被调用一次std::once_flag标记对应的函数是否被执行


condition_variable/wait/notify_one/notify_all


condition_variable::wait()如果第二个参数的 lambda 表达式返回false,则wait解锁互斥量并阻塞到本行,直到其他线程调用notify为止;如果第二个参数的 lambda 表达式返回true,则wait直接返回;如果没有第二个参数,相当于解锁互斥量并阻塞到本行,直到其他线程调用notify为止


其他线程调用notifywait唤醒后,wait不断尝试获取互斥锁,如果获取到了就继续执行


如果某个线程正在运行而不是阻塞在wait等待唤醒,那个另外一个线程的notify会没有效果


async/future/packaged_task/promise


std::async()用来启动一个异步任务,返回一个std::future对象

// async example#include <iostream>       // std::cout#include <future>         // std::async, std::future// a non-optimized way of checking for prime numbers:bool is_prime (int x) {    std::cout << "Calculating. Please, wait...\n";    for (int i=2; i<x; ++i) if (x%i==0) return false;    return true;}int main (){    // call is_prime(313222313) asynchronously:    std::future<bool> fut = std::async (is_prime,313222313);    std::cout << "Checking whether 313222313 is prime.\n";    // ...    bool ret = fut.get();      // waits for is_prime to return    if (ret) std::cout << "It is prime!\n";    else std::cout << "It is not prime.\n";    return 0;}
复制代码


std::future提供了一种访问异步操作结果的机制,std::future对象包含了线程入口函数的返回结果,可调用future::get()来获取结果


future::get()阻塞直到获得结果,只能调用一次future::wait()等待线程返回,不返回结果

template <class Fn, class... Args>                                                                           future<typename result_of<Fn(Args...)>::type> async (Fn&& fn, Args&&... args);
template <class Fn, class... Args> future<typename result_of<Fn(Args...)>::type> async (launch policy, Fn&& fn, Args&&... args);
复制代码
  • std::launch::deferred:表示线程入口函数调用被延迟到future::wait()或者future::get()调用时才执行


    如果future::wait()或者future::get()没被调用,则线程根本没有创建;延迟调用本质上没有创建新线程,由主线程调用线程入口函数

  • std::launch::async:创建新线程并立即执行

  • std::launch::async | std::launch::deferred:默认标记,由系统决定采用哪种运行方式


std::packaged_task的模板参数是各种可调用对象,通过std::packaged_task包装起来,作为线程入口函数


通过packaged_task::get_future()获取std::future对象


std::packaged_task对象也可以直接调用

// packaged_task example#include <iostream>     // std::cout#include <future>       // std::packaged_task, std::future#include <chrono>       // std::chrono::seconds#include <thread>       // std::thread, std::this_thread::sleep_for// count down taking a second for each value:int countdown (int from, int to) {	for (int i=from; i!=to; --i) {        std::cout << i << '\n';        std::this_thread::sleep_for(std::chrono::seconds(1));    }    std::cout << "Lift off!\n";    return from-to;}
int main () { std::packaged_task<int(int,int)> tsk (countdown); // set up packaged_task std::future<int> ret = tsk.get_future(); // get future std::thread th (std::move(tsk),10,0); // spawn thread to count down from 10 to 0 // ... int value = ret.get(); // wait for the task to finish and get result std::cout << "The countdown lasted for " << value << " seconds.\n"; th.join(); return 0;}
复制代码

std::promise能够在某个线程赋值,然后在其他线程中把这个值取出来


promise::set_value()绑定一个值,在将来某个时刻通过std::future绑定到这个promise上来得到这个值

// promise example#include <iostream>       // std::cout#include <functional>     // std::ref#include <thread>         // std::thread#include <future>         // std::promise, std::futurevoid print_int (std::future<int>& fut) {    int x = fut.get();    std::cout << "value: " << x << '\n';}int main (){    std::promise<int> prom;                      // create promise    std::future<int> fut = prom.get_future();    // engagement with future    std::thread th1 (print_int, std::ref(fut));  // send future to new thread    prom.set_value (10);                         // fulfill promise    // (synchronizes with getting the future)    th1.join();    return 0;}
复制代码

future 扩展/atomic


future::wait_for()返回std::future_status


  • timeout:表示线程还没执行完

  • ready:表示线程成功返回

  • deferred:用于async第一个参数设置为deferred


std::shared_future


future::get()设计是一个移动语义,所以只能使用一次,shared_future::get()是拷贝语义,可以使用多次


原子操作一般针对变量而不是代码段


一般原子操作针对++--+=-=是支持的,其他的可能不支持

#include <iostream> #include <thread> using namespace std; 
int cnt = 0; void func() { for(int i = 0; i < 10000; ++i) { __sync_fetch_and_add(&cnt, 1); } } int main() { thread t1(func); thread t2(func); t1.join(); t2.join(); cout << cnt << endl; return 0; }
复制代码


#include <iostream> #include <thread>#include <atomic>using namespace std; 
atomic<int> cnt;void func() { for(int i = 0; i < 10000; ++i) { cnt++; } } int main() { thread t1(func); thread t2(func); t1.join(); t2.join(); cout << cnt << endl; return 0; }
复制代码


atomic 拓展/async 拓展

// C++11提供了6种内存模型enum memory_order {    memory_order_relaxed,    memory_order_consume,    memory_order_acquire,    memory_order_release,    memory_order_acq_rel,    memory_order_seq_cst}
复制代码

std::async()std::thread区别


  • std::async()并不一定创建新线程

  • std::thread不容易拿到线程返回值


recursive_mutex/timed_mutex


std::recursive_mutex:递归的独占互斥锁


std::timed_mutex:带超时功能的互斥锁


补充


虚假唤醒:在 lambda 表达式中正确处理


atomic::load()以原子方式读取atomic对象的值;atomic::store()以原子方式写入atomic对象的值


POSIX 线程


线程概念


  • LWP(Light Weight Process)轻量级进程

  • 进程:拥有 PCB,独立地址空间

  • 线程:拥有 PCB,共享地址空间

  • 进程与线程的区别:是否共享地址空间

  • 线程:最小调度单位

  • 进程:最小分配资源单位


线程实现原理


从内核里看进程和线程是一样的,都有各自不同的 PCB,但是 PCB 中指向内存资源的三级页表是相同的

# 查看指定线程的LWP号,跟线程id不一样ps -Lf PID
复制代码

线程共享/非共享资源


  • 共享资源

文件描述符表

信号处理方式

当前工作目录

用户 ID 和组 ID

内存地址空间(不共享栈)

  • 非共享资源

线程 id

处理器现场和内核栈指针

独立的用户空间栈

errno 变量

信号屏蔽字

调度优先级


线程优缺点


  • 提高程序并发性

  • 开销小

  • 数据通信,共享数据方便

  • 调试,编写困难

  • 对信号支持不友好


API

int pthread_create(pthread_t *thread, const pthread_attr_t *attr,           void *(*start_routine) (void *), void *arg);// 参数1:传出参数,保存分配好的线程ID// 参数2:通常传NULL,表示使用线程默认属性// 参数3:函数指针// 参数4:线程主函数执行期间所使用的参数// 返回值:成功返回0,失败返回错误号
void pthread_exit(void *retval);// retval表示线程退出状态,通常传NULL
pthread_t pthread_self(void);// 获取线程ID:pthread_t类型,在linux下为无符号整数(%lu)// 线程ID是进程内部识别标志(和LWP号不一样),两个进程间线程ID允许相同
int pthread_join(pthread_t thread, void **retval);// 线程回收,retval接受线程退出状态
int pthread_detach(pthread_t thread);// 线程分离,线程与主控线程断开关系// 分离的线程不能用pthread_join()回收
int pthread_cancel(pthread_t thread);// 取消线程,线程的取消不是实时的,而有一定的延时,需要等待线程到达某个取消点// 取消点:检查线程是否被取消,通常是一些系统调用,例如open、read、write等// 执行man 7 pthreads可以查看具备这些取消点的系统调用列表
复制代码


线程属性设置分离

typedef struct {    int etachstate;                     // 线程的分离状态    int schedpolicy;                    // 线程调度策略    struct sched_param schedparam;      // 线程的调度参数    int inheritsched;                   // 线程的继承性    int scope;                          // 线程的作用域    size_t guardsize;                   // 线程栈末尾的警戒缓冲区大小    int stackaddr_set;                  // 线程的栈设置    void* stackaddr;                    // 线程栈的位置    size_t stacksize;                   // 线程栈的大小} pthread_attr_t;// ulimit -a查看用户栈的大小
int pthread_attr_init(pthread_attr_t *attr);int pthread_attr_destroy(pthread_attr_t *attr);
int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate);// PTHREAD_CREATE_DETACHED,分离状态// PTHREAD_CREATE_JOINABLE,非分离状态// 如果设置一个线程为分离线程,而这个线程运行又非常快,它可能在pthread_create函数返回之前就终止了// 它终止以后就可能将线程号和系统资源移交给其他线程使用,这样调用pthread_create的线程就得到了错误的线程号
int pthread_attr_setstack(pthread_attr_t *attr, void *stackaddr, size_t stacksize);int pthread_attr_getstack(const pthread_attr_t *attr, void **stackaddr, size_t *stacksize);int pthread_attr_setstacksize(pthread_attr_t *attr, size_t stacksize);int pthread_attr_getstacksize(const pthread_attr_t attr, size_t stacksize);
复制代码

NPTL


从 Linux2.6 开始,glibc 采用了新的线程库 NPTL(Native POSIX Thread Library)


  • 线程库由两部分组成:内核的线程支持和用户的线程支持(glibc)

  • Linux 早期内核不支持线程时,glibc 就在库中(用户态)以纤程(用户态线程)的方式支持多线程了

  • POSIX thread 只要求了用户编程的调用接口,对内核接口没有要求。Linux 下线程的实现就是在内核支持的基础上以 POSIX thread 的方式对外封装了接口,所以才会有两个 ID


查看当前线程库的版本

getconf GNU_LIBPTHREAD_VERSION
复制代码

线程使用注意事项


  • 主线程退出其他线程不退出,主线程退出应调用pthread_exit

  • 避免僵尸线程


    pthread_join


    pthread_detach或者在pthread_create里指定分离属性

  • 避免在多线程引入信号机制


线程同步


协同步调,按预定的先后次序运行


多个控制流共同操作一个共享资源的情况,都需要同步


数据混乱原因


  • 资源共享(独享资源则不会)

  • 调度随机(意味着数据访问会出现竞争)

  • 线程间缺乏必要的同步机制


互斥量

pthread_mutex_t mutex;// 变量mutex只有两种取值1、0,mutex锁init成功时为1。lock将mutex--,unlock将mutex++int pthread_mutex_init(pthread_mutex_t * restrict mutex, const pthread_mutexattr_t * restrict attr);// 参数1:传出参数,调用时应传&mutex// 参数2:传入参数,互斥量属性,通常传NULLint pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_lock(pthread_mutex_t *mutex);// lock尝试加锁,如果加锁不成功,线程阻塞,阻塞到持有该互斥量的其他线程解锁为止int pthread_mutex_trylock(pthread_mutex_t *mutex);// trylock加锁失败直接返回错误号,不阻塞int pthread_mutex_unlock(pthread_mutex_t *mutex);// unlock主动解锁,同时将阻塞在该锁上的所有线程全部唤醒,不确定哪个线程先被唤醒
复制代码

在访问共享资源前加锁,访问结束后立即解锁。锁的“粒度”应越小越好


静态初始化:如果 mutex 定义在全局或者是静态分配的,直接使用宏进行初始化

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
复制代码

动态初始化:局部变量使用pthread_mutex_init进行初始化


死锁产生


  • 线程试图对同一个互斥量加锁两次

  • 线程 1 拥有 A 锁,请求获得 B 锁;线程 2 拥有 B 锁,请求获得 A 锁(trylock 解决)


读写锁


写独占,读共享,写锁优先级高


  • 写模式加锁时,解锁前,所有对该锁加锁的线程都会被阻塞

  • 读模式加锁时,如果线程以读模式对其加锁会成功,如果线程以写模式加锁会阻塞

  • 读模式加锁时,既有尝试以写模式加锁的线程,也有尝试以读模式加锁的线程。那么读写锁会阻塞随后的读模式锁请求,优先满足写模式锁

pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
复制代码

条件变量


通常和互斥锁配合使用

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);int pthread_cond_destroy(pthread_cond_t *cond);int pthread_cond_signal(pthread_cond_t *cond);int pthread_cond_broadcast(pthread_cond_t *cond);int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);// 释放已经掌握的互斥锁,相当于pthread_mutex_unlock(&mutex)// 阻塞等待条件变量cond// 前两步是一个原子操作// 当被唤醒返回时,解除阻塞并重新申请获取互斥锁pthread_mutex_lock(&mutex);int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);// abstime表示绝对时间
复制代码

互斥量+条件变量实现生产者消费者模型


信号量


进化版的互斥锁

sem_t sem;int sem_init(sem_t *sem, int pshared, unsigned int value);// pshared取0用于线程间,取1用于进程间// 信号量的初值决定了占用信号量的线程的个数int sem_destroy(sem_t *sem);int sem_wait(sem_t *sem);// 信号量大于0则减1,等于0则造成进程阻塞int sem_trywait(sem_t *sem);int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);int sem_post(sem_t *sem);// 信号量加1,同时唤醒阻塞在信号量上的线程
复制代码

信号量实现生产者消费者模型


进程间同步


进程间也能使用互斥量来达到同步的目的,在使用pthread_mutex_init初始化时修改其属性为进程间共享

pthread_mutexattr_t mattr;int pthread_mutexattr_init(pthread_mutexattr_t *attr);int pthread_mutexattr_destroy(pthread_mutexattr_t *attr);int pthread_mutexattr_setpshared(pthread_mutexattr_t *attr, int pshared);// PTHREAD_PROCESS_PRIVATE 线程锁// PTHREAD_PROCESS_SHARED 进程锁
复制代码

文件锁


借助 fcntl 实现文件锁


哲学家就餐问题


__thread(c++11 thread_local)


__thread 是 GCC 内置的线程局部存储设施(thread local storage),它的实现非常高效


__thread 变量是每个线程都有一份独立实体,各个线程的变量值互不干扰


除了这个主要用途,它还可以修饰那些值可能会变,带有全局性,但是又不值得用全局锁保护的变量


  • 只能用于修饰 POD 类型(Plain old data structure),不能修饰 class 类型

  • __thread 可以用于修饰全局变量、函数内的静态变量,但是不能用于修饰函数的局部变量或者 class 的普通成员变量

  • __thread 变量的初始化只能用编译期常量

用户头像

直奔腾讯去,一起学习:Q群654378476 2021.05.20 加入

我要学完第十代《Linux C/C++服务架构开发》知识体系里的内容,直奔腾讯去,一起学习:Q群654378476 系统学习免费课程:https://ke.qq.com/course/417774?flowToken=1033508

评论

发布
暂无评论
C/C++学习:C++并发与多线程