C++11 线程
hread/join/detach/joinable
thread::joinable()
用于判断是否可以调用thread::join()
或者thread::detach()
调用thread::detach()
后的线程在后台执行,被 C++运行时库接管,结束后由运行时库负责回收
thread::detach()
陷阱:主线程结束了,引用或者指针在其他线程就会无效
对于可调用对象,实际上是被拷贝到了其他线程,所以即使主线程的可调用对象析构了,其他线程仍能正常运行
免费订阅学习:c/c++Linux后台服务器开发高级架构师学习视频
线程传参/成员函数作线程函数
传递简单类型参数是值传递,不是引用传递(即使使用了引用符号)
如果传递类对象,应该避免隐式类型转换,只要用临时构造的对象作为参数传递给线程,就能保证在主线程执行完毕前把线程的参数构造出来
一般情况下不使用thread::detach()
,避免局部变量失效带来线程对内存的非法引用问题
std::ref() // 参数作引用传递
// unique_ptr要使用std::move()
复制代码
#include <iostream>
#include <thread>
using namespace std;
class Print {
public:
void operator()(int value) {
cout << "operator value = " << value << endl;
}
void func(int value) {
cout << "func value = " << value << endl;
}
};
int main() {
Print pt;
thread t1(pt, 314);
t1.join();
thread t2(&Print::func, pt, 314);
t2.join();
return 0;
}
复制代码
#include <iostream>
#include <thread>
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/syscall.h>
using namespace std;
#define gettid() syscall(__NR_gettid)
void print() {
printf("print::process id: %d\n", getpid());
printf("print::kernel id: %ld\n", gettid());
cout << "print::std thread id: " << std::this_thread::get_id() << endl;
printf("print::pthread id: %lu\n", pthread_self());
}
int main() {
thread t(print);
printf("print::process id: %d\n", getpid());
printf("kernel id: %ld\n", gettid());
cout << "print::std thread id: " << std::this_thread::get_id() << endl;
printf("pthread id: %lu\n", pthread_self());
t.join();
return 0;
}
复制代码
互斥量/死锁
mutex::lock()
和mutex::unlock()
必须成对使用,mutex::lock()
默认阻塞
std::lock_guard
的原理很简单,std::lock_guard
构造函数里执行了mutex::lock()
,析构函数里执行了mutex::unlock()
保证两个互斥量上锁的顺序一致就不会死锁
std::lock()
可用于同时锁住两个互斥量(一般不用)
std::adopt_lock
是个结构体对象,起一个标记作用,表示这个互斥量之前已经锁住,不需要在std::lock_guard
构造函数里执行mutex::lock()
std::lock(mtx1, mtx2);
std::lock_guard<std::mutex> lg1(mtx1, std::adopt_lock);
std::lock_guard<std::mutex> lg2(mtx2, std::adopt_lock);
复制代码
// mutex example
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex
std::mutex mtx; // mutex for critical section
void print_block (int n, char c) {
// critical section (exclusive access to std::cout signaled by locking mtx):
mtx.lock();
for (int i=0; i<n; ++i) { std::cout << c; }
std::cout << '\n';
mtx.unlock();
}
int main ()
{
std::thread th1 (print_block,50,'*');
std::thread th2 (print_block,50,'$');
th1.join();
th2.join();
return 0;
}
复制代码
unique_lock
std::unique_lock
比std::lock_guard
灵活,但效率差
std::unique_lock
的第二个参数
std::adopt_lock
:含义和std::lock_guard
相同
std::try_to_lock
:没有锁成功会立即返回
std::defer_lock
:初始化一个没有加锁的std::mutex
std::unique_lock
的成员函数
unique_lock::lock()
:体现了std::unique_lock
的灵活性
unique_lock::unlock()
:体现了std::unique_lock
的灵活性
unique_lock::try_lock()
:锁成功返回true
,锁失败返回false
unique_lock::release()
:解绑std::unique_lock
和std::mutex
,返回std::mutex
指针,如果解绑前std::mutex
已加锁,需要手动解锁
锁的粒度:锁住的代码少,粒度细,执行效率高
std::unqiue_lock
所有权的转移,把自己对std::mutex
的所有权转移给其他std::unique_lock
,不能复制
文章福利 Linux 后端开发网络底层原理知识学习提升 点击 学习资料 获取,完善技术栈,内容知识点包括 Linux,Nginx,ZeroMQ,MySQL,Redis,线程池,MongoDB,ZK,Linux 内核,CDN,P2P,epoll,Docker,TCP/IP,协程,DPDK 等等。
单例模式/call once
template< class Callable, class... Args >
void call_once( std::once_flag& flag, Callable&& f, Args&&... args );
复制代码
std::call_once()
保证函数只被调用一次,std::once_flag
标记对应的函数是否被执行
condition_variable/wait/notify_one/notify_all
condition_variable::wait()
如果第二个参数的 lambda 表达式返回false
,则wait
解锁互斥量并阻塞到本行,直到其他线程调用notify
为止;如果第二个参数的 lambda 表达式返回true
,则wait
直接返回;如果没有第二个参数,相当于解锁互斥量并阻塞到本行,直到其他线程调用notify
为止
其他线程调用notify
将wait
唤醒后,wait
不断尝试获取互斥锁,如果获取到了就继续执行
如果某个线程正在运行而不是阻塞在wait
等待唤醒,那个另外一个线程的notify
会没有效果
async/future/packaged_task/promise
std::async()
用来启动一个异步任务,返回一个std::future
对象
// async example
#include <iostream> // std::cout
#include <future> // std::async, std::future
// a non-optimized way of checking for prime numbers:
bool is_prime (int x) {
std::cout << "Calculating. Please, wait...\n";
for (int i=2; i<x; ++i) if (x%i==0) return false;
return true;
}
int main ()
{
// call is_prime(313222313) asynchronously:
std::future<bool> fut = std::async (is_prime,313222313);
std::cout << "Checking whether 313222313 is prime.\n";
// ...
bool ret = fut.get(); // waits for is_prime to return
if (ret) std::cout << "It is prime!\n";
else std::cout << "It is not prime.\n";
return 0;
}
复制代码
std::future
提供了一种访问异步操作结果的机制,std::future
对象包含了线程入口函数的返回结果,可调用future::get()
来获取结果
future::get()
阻塞直到获得结果,只能调用一次;future::wait()
等待线程返回,不返回结果
template <class Fn, class... Args> future<typename result_of<Fn(Args...)>::type> async (Fn&& fn, Args&&... args);
template <class Fn, class... Args> future<typename result_of<Fn(Args...)>::type> async (launch policy, Fn&& fn, Args&&... args);
复制代码
std::launch::deferred
:表示线程入口函数调用被延迟到future::wait()
或者future::get()
调用时才执行
如果future::wait()
或者future::get()
没被调用,则线程根本没有创建;延迟调用本质上没有创建新线程,由主线程调用线程入口函数
std::launch::async
:创建新线程并立即执行
std::launch::async | std::launch::deferred
:默认标记,由系统决定采用哪种运行方式
std::packaged_task
的模板参数是各种可调用对象,通过std::packaged_task
包装起来,作为线程入口函数
通过packaged_task::get_future()
获取std::future
对象
std::packaged_task
对象也可以直接调用
// packaged_task example
#include <iostream> // std::cout
#include <future> // std::packaged_task, std::future
#include <chrono> // std::chrono::seconds
#include <thread> // std::thread, std::this_thread::sleep_for
// count down taking a second for each value:
int countdown (int from, int to) {
for (int i=from; i!=to; --i) {
std::cout << i << '\n';
std::this_thread::sleep_for(std::chrono::seconds(1));
}
std::cout << "Lift off!\n";
return from-to;
}
int main () {
std::packaged_task<int(int,int)> tsk (countdown); // set up packaged_task
std::future<int> ret = tsk.get_future(); // get future
std::thread th (std::move(tsk),10,0); // spawn thread to count down from 10 to 0
// ...
int value = ret.get(); // wait for the task to finish and get result
std::cout << "The countdown lasted for " << value << " seconds.\n";
th.join();
return 0;
}
复制代码
std::promise
能够在某个线程赋值,然后在其他线程中把这个值取出来
promise::set_value()
绑定一个值,在将来某个时刻通过std::future
绑定到这个promise
上来得到这个值
// promise example
#include <iostream> // std::cout
#include <functional> // std::ref
#include <thread> // std::thread
#include <future> // std::promise, std::future
void print_int (std::future<int>& fut) {
int x = fut.get();
std::cout << "value: " << x << '\n';
}
int main ()
{
std::promise<int> prom; // create promise
std::future<int> fut = prom.get_future(); // engagement with future
std::thread th1 (print_int, std::ref(fut)); // send future to new thread
prom.set_value (10); // fulfill promise
// (synchronizes with getting the future)
th1.join();
return 0;
}
复制代码
future 扩展/atomic
future::wait_for()
返回std::future_status
std::shared_future
future::get()
设计是一个移动语义,所以只能使用一次,shared_future::get()
是拷贝语义,可以使用多次
原子操作一般针对变量而不是代码段
一般原子操作针对++
,--
,+=
,-=
是支持的,其他的可能不支持
#include <iostream>
#include <thread>
using namespace std;
int cnt = 0;
void func() {
for(int i = 0; i < 10000; ++i) {
__sync_fetch_and_add(&cnt, 1);
}
}
int main() {
thread t1(func);
thread t2(func);
t1.join();
t2.join();
cout << cnt << endl;
return 0;
}
复制代码
#include <iostream>
#include <thread>
#include <atomic>
using namespace std;
atomic<int> cnt;
void func() {
for(int i = 0; i < 10000; ++i) {
cnt++;
}
}
int main() {
thread t1(func);
thread t2(func);
t1.join();
t2.join();
cout << cnt << endl;
return 0;
}
复制代码
atomic 拓展/async 拓展
// C++11提供了6种内存模型
enum memory_order {
memory_order_relaxed,
memory_order_consume,
memory_order_acquire,
memory_order_release,
memory_order_acq_rel,
memory_order_seq_cst
}
复制代码
std::async()
和std::thread
区别
std::async()
并不一定创建新线程
std::thread
不容易拿到线程返回值
recursive_mutex/timed_mutex
std::recursive_mutex
:递归的独占互斥锁
std::timed_mutex
:带超时功能的互斥锁
补充
虚假唤醒:在 lambda 表达式中正确处理
atomic::load()
以原子方式读取atomic
对象的值;atomic::store()
以原子方式写入atomic
对象的值
POSIX 线程
线程概念
线程实现原理
从内核里看进程和线程是一样的,都有各自不同的 PCB,但是 PCB 中指向内存资源的三级页表是相同的
# 查看指定线程的LWP号,跟线程id不一样
ps -Lf PID
复制代码
线程共享/非共享资源
文件描述符表
信号处理方式
当前工作目录
用户 ID 和组 ID
内存地址空间(不共享栈)
线程 id
处理器现场和内核栈指针
独立的用户空间栈
errno 变量
信号屏蔽字
调度优先级
线程优缺点
提高程序并发性
开销小
数据通信,共享数据方便
调试,编写困难
对信号支持不友好
API
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine) (void *), void *arg);
// 参数1:传出参数,保存分配好的线程ID
// 参数2:通常传NULL,表示使用线程默认属性
// 参数3:函数指针
// 参数4:线程主函数执行期间所使用的参数
// 返回值:成功返回0,失败返回错误号
void pthread_exit(void *retval);
// retval表示线程退出状态,通常传NULL
pthread_t pthread_self(void);
// 获取线程ID:pthread_t类型,在linux下为无符号整数(%lu)
// 线程ID是进程内部识别标志(和LWP号不一样),两个进程间线程ID允许相同
int pthread_join(pthread_t thread, void **retval);
// 线程回收,retval接受线程退出状态
int pthread_detach(pthread_t thread);
// 线程分离,线程与主控线程断开关系
// 分离的线程不能用pthread_join()回收
int pthread_cancel(pthread_t thread);
// 取消线程,线程的取消不是实时的,而有一定的延时,需要等待线程到达某个取消点
// 取消点:检查线程是否被取消,通常是一些系统调用,例如open、read、write等
// 执行man 7 pthreads可以查看具备这些取消点的系统调用列表
复制代码
线程属性设置分离
typedef struct {
int etachstate; // 线程的分离状态
int schedpolicy; // 线程调度策略
struct sched_param schedparam; // 线程的调度参数
int inheritsched; // 线程的继承性
int scope; // 线程的作用域
size_t guardsize; // 线程栈末尾的警戒缓冲区大小
int stackaddr_set; // 线程的栈设置
void* stackaddr; // 线程栈的位置
size_t stacksize; // 线程栈的大小
} pthread_attr_t;
// ulimit -a查看用户栈的大小
int pthread_attr_init(pthread_attr_t *attr);
int pthread_attr_destroy(pthread_attr_t *attr);
int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate);
// PTHREAD_CREATE_DETACHED,分离状态
// PTHREAD_CREATE_JOINABLE,非分离状态
// 如果设置一个线程为分离线程,而这个线程运行又非常快,它可能在pthread_create函数返回之前就终止了
// 它终止以后就可能将线程号和系统资源移交给其他线程使用,这样调用pthread_create的线程就得到了错误的线程号
int pthread_attr_setstack(pthread_attr_t *attr, void *stackaddr, size_t stacksize);
int pthread_attr_getstack(const pthread_attr_t *attr, void **stackaddr, size_t *stacksize);
int pthread_attr_setstacksize(pthread_attr_t *attr, size_t stacksize);
int pthread_attr_getstacksize(const pthread_attr_t attr, size_t stacksize);
复制代码
NPTL
从 Linux2.6 开始,glibc 采用了新的线程库 NPTL(Native POSIX Thread Library)
线程库由两部分组成:内核的线程支持和用户的线程支持(glibc)
Linux 早期内核不支持线程时,glibc 就在库中(用户态)以纤程(用户态线程)的方式支持多线程了
POSIX thread 只要求了用户编程的调用接口,对内核接口没有要求。Linux 下线程的实现就是在内核支持的基础上以 POSIX thread 的方式对外封装了接口,所以才会有两个 ID
查看当前线程库的版本
getconf GNU_LIBPTHREAD_VERSION
复制代码
线程使用注意事项
线程同步
协同步调,按预定的先后次序运行
多个控制流共同操作一个共享资源的情况,都需要同步
数据混乱原因
资源共享(独享资源则不会)
调度随机(意味着数据访问会出现竞争)
线程间缺乏必要的同步机制
互斥量
pthread_mutex_t mutex;
// 变量mutex只有两种取值1、0,mutex锁init成功时为1。lock将mutex--,unlock将mutex++
int pthread_mutex_init(pthread_mutex_t * restrict mutex, const pthread_mutexattr_t * restrict attr);
// 参数1:传出参数,调用时应传&mutex
// 参数2:传入参数,互斥量属性,通常传NULL
int pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_lock(pthread_mutex_t *mutex);
// lock尝试加锁,如果加锁不成功,线程阻塞,阻塞到持有该互斥量的其他线程解锁为止
int pthread_mutex_trylock(pthread_mutex_t *mutex);
// trylock加锁失败直接返回错误号,不阻塞
int pthread_mutex_unlock(pthread_mutex_t *mutex);
// unlock主动解锁,同时将阻塞在该锁上的所有线程全部唤醒,不确定哪个线程先被唤醒
复制代码
在访问共享资源前加锁,访问结束后立即解锁。锁的“粒度”应越小越好
静态初始化:如果 mutex 定义在全局或者是静态分配的,直接使用宏进行初始化
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
复制代码
动态初始化:局部变量使用pthread_mutex_init
进行初始化
死锁产生
读写锁
写独占,读共享,写锁优先级高
写模式加锁时,解锁前,所有对该锁加锁的线程都会被阻塞
读模式加锁时,如果线程以读模式对其加锁会成功,如果线程以写模式加锁会阻塞
读模式加锁时,既有尝试以写模式加锁的线程,也有尝试以读模式加锁的线程。那么读写锁会阻塞随后的读模式锁请求,优先满足写模式锁
pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
复制代码
条件变量
通常和互斥锁配合使用
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
// 释放已经掌握的互斥锁,相当于pthread_mutex_unlock(&mutex)
// 阻塞等待条件变量cond
// 前两步是一个原子操作
// 当被唤醒返回时,解除阻塞并重新申请获取互斥锁pthread_mutex_lock(&mutex);
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);
// abstime表示绝对时间
复制代码
互斥量+条件变量实现生产者消费者模型
信号量
进化版的互斥锁
sem_t sem;
int sem_init(sem_t *sem, int pshared, unsigned int value);
// pshared取0用于线程间,取1用于进程间
// 信号量的初值决定了占用信号量的线程的个数
int sem_destroy(sem_t *sem);
int sem_wait(sem_t *sem);
// 信号量大于0则减1,等于0则造成进程阻塞
int sem_trywait(sem_t *sem);
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
int sem_post(sem_t *sem);
// 信号量加1,同时唤醒阻塞在信号量上的线程
复制代码
信号量实现生产者消费者模型
进程间同步
进程间也能使用互斥量来达到同步的目的,在使用pthread_mutex_init
初始化时修改其属性为进程间共享
pthread_mutexattr_t mattr;
int pthread_mutexattr_init(pthread_mutexattr_t *attr);
int pthread_mutexattr_destroy(pthread_mutexattr_t *attr);
int pthread_mutexattr_setpshared(pthread_mutexattr_t *attr, int pshared);
// PTHREAD_PROCESS_PRIVATE 线程锁
// PTHREAD_PROCESS_SHARED 进程锁
复制代码
文件锁
借助 fcntl 实现文件锁
哲学家就餐问题
__thread(c++11 thread_local)
__thread 是 GCC 内置的线程局部存储设施(thread local storage),它的实现非常高效
__thread 变量是每个线程都有一份独立实体,各个线程的变量值互不干扰
除了这个主要用途,它还可以修饰那些值可能会变,带有全局性,但是又不值得用全局锁保护的变量
评论