写点什么

C++ 中的多线程及其之后的周边

作者:EquatorCoco
  • 2024-12-09
    福建
  • 本文字数:4370 字

    阅读完需:约 14 分钟

平台差异:Linux 与 Windows,跨平台方案


在 Linux 上,有 pthread 的使用,而 C++ 11 标准中使用了<thread>,是一个良好的跨平台方案。thread 和 pthread 在实际的使用中有一些显著的差别,典型例子如:pthread_create用来创建线程,而 std::thread 可以直接被用来创建线程。


客观来说,thread 是一个更为简洁的实现,而 pthread 难免有些粗暴,本文在 C++的多线程编程中主要以 thread 风格来实现。


子线程退出与主线程退出的关系


引用:

https://blog.csdn.net/a0408152/article/details/129093394

https://blog.csdn.net/m0_56374992/article/details/119109979


detach 是将当前线程对象所代表的执行实例与该线程对象分离,使得线程的执行可以单独进行。然而,在这种情况下,即使子线程 detach,主线程退出也会导致子线程退出。具体原因是主进程通过 return 或者 exit 方式退出,进程退出导致所有线程同步退出。这里和 Linux 的进程/线程模型有关,是 posix(pthread)的具体实现,参见引用2。为了防止这种情况,在不想要在主进程中回收子进程的情况时,使用 pthread_exit(nullptr);如果仅为了测试,也可以在主进程中加循环。所以,重新理解 detach:把主进程和子线程分离,使二者能够独立的运行。


原子操作


备注:此节部分内容及代码来源于帝国理工学院 COMP60017 - L05 - Multi-core and Parallelism, Lluís Vilanova


由于现代 CPU 使用乱序流水线(out-of-order)的方式进行指令的执行,因此对于某一条单独的 a = b + 1 指令来说,其在 O2 优化层级上可以被分解为以下三条汇编指令:


mov     eax, DWORD PTR [rbp-8]add     eax, 1mov     DWORD PTR [rbp-4], eax
复制代码


模拟编译(以及编译优化):https://godbolt.org/

由此可见,在没有进行任何额外处理的情况下,有可能在 add 与 mov 操作之间出现进程的切换调度,因此就会出现伪递增现象,即为两个并行线程同时对一个变量自增 10000 次,最后结果通常要小于 20000。


atomic 关键字


atomic 是 C++中的一个关键字,作用是针对某一个具体变量,提供一组'原子的'操作。在本质上是对单条指令的临界区保护。


具体使用例子如下:

// Use atomic operations on data shared across threads#include <atomic>#include <thread>#include <iostream>int main(int argc, char** argv) {    int iters = 100000000; std::atomic<int> a = 0;    std::thread t1([&](){ for (volatile int i = 0; i < iters; i++) a++; });    std::thread t2([&](){ for (volatile int i = 0; i < iters; i++) a++; });    t1.join(); t2.join();    std::cout << "expected=" << iters*2 << " got=" << a << std::endl;}
复制代码


在 C++中,有两种 atomic 的使用方式:

  1. 在定义变量时声明为 atomic 类型 e.g. std::atomic<int> a = 0;

  2. 在使用变量时使用 atomic_系列操作 e.g. atomic_fetch_add(&a, 1);

参考:https://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange

  • atomic_{load, store}:读取/写入

  • atomic_compare_exchange_{weak, strong}:注意,在 atomic 中,提供了两个 CAS 操作:

  • compare_exchange_weak(T& expected, T desired)

  • compare_exchange_strong(T& expected, T desired)

  • 首先与第一个参数比较:若相等,则改变原子变量的值为第二个参数,返回 true。若不相等,则将第一个参数的值改成原子变量的当前值,返回 false。

  • 但是需要注意,以 weak 方式实现的操作返回 false 时,并不一定完成了实际上的 expected value 修改,可能会出现伪失败(spuriously fail)情况。

  • 在实际操作中,尤其是在应用层面,如果不是对性能极度敏感的情况下,一律使用 strong

  • atomic_fetch_{add, sub, or, xor, and}:算数/逻辑运算


int beings, legs;void enter_room(int nlegs) {    atomic_fetch_add(&beings, 1);    atomic_fetch_add(&legs, nlegs);}
复制代码


C++内存顺序:memory order

参见:https://en.cppreference.com/w/cpp/atomic/atomic

std::memory_order_seq_cst

这个面试底层经常问,最好搞明白


CAS 无锁操作

无锁相比于加锁操作来说,最大的优势是性能显著提高。

大多数现代 CPU 在硬件层面上都提供了原子实现 CAS 的机制。

shared lock: 读写锁,多个 thread 可以同时读,但只有一个能写

在 Linux 中的 pthread 库中,我们采用了 CAS 作为读写锁的实现方式

CAS 操作,全名为 Compare-and-swap(比较并交换)操作,是一个原子的操作。

在 C++中,一个简单的实现:


bool compare_and_swap(int *pAddr, int nExpected, int nNew){    if(*pAddr == nExpected)    {        *pAddr = nNew;        return true;    }    else        return false;}
复制代码


这里提供一个使用 CAS 进行自增的操作:


void atomic_inc(uint64_t* addr) {    bool swapped = false;    while (not swapped) {        auto old = *addr;        swapped = CAS(addr, old, old+1);    }}
复制代码


线程同步


线程临界区:对于只读不写的变量,不需要保护

▪ Lock/mutex▪ Semaphore▪ Shared lock (aka, read/write lock)▪ Condition variables▪ Barrier

例:使用条件变量condition_variable

例题:leetcode 1117. H2O 生成

在 C++中,condition_variable必须结合unique_lock使用,此外还有一个condition_variable_any类可以使用所有的锁,此处暂时不论。

基本使用流程:mutex lock -> wait -> mutex unlock

wait 函数阻塞完成后即自动 unlock 释放锁,不需要手动释放。

虚假唤醒:使用notify_all()函数唤醒所有 wait 状态下的线程时,发现其等待的条件并没有满足。


解决方法:

1、使用一个 while 循环在每次被唤醒时判断条件

while (g_deque.empty()){    g_cond.wait(lck);}
复制代码


2、使用一个带 predicate 判断条件的 wait

cv.wait(lck,[this]{return printo > 0;});//此处用了lambda函数,在类中所以需要this,较为方便
复制代码


异步编程


异步编程:回调函数 callback

c++11:中新增了 std::future 和 std::promise


更加轻量级:协程


c++20:提供了 co_routine(协程),在适当的时候做挂起(suspend)和恢复(resume),是个基于 state machine 的无栈协程

评价为对 golang 的拙劣模仿(原生支持,从来没见人在 C++中用过)

我在这个项目中从内核到 userspace 维护了一个支持热迁移的虚拟机状态流,就用了类似的思想,但是这里是个有栈协程:https://github.com/mahiru23/intravisor/tree/syscall/src


锁的实现


User-level


  • Acquire/lock → Loop until CAS from “released” to “acquired”

  • Release/unlock → Set value to “released”


缺点:


  1. 假设有两个线程,t1 持有锁的时候 t2 会反复循环尝试直到获取位置,存在循环浪费(叫做 busy waiting(忙等)

  2. Potential thread starvation:等待的线程可能一直在等待(可能使用 queue 来解决问题?)


这里给出一个仅使用


#include <iostream>#include <atomic>#include <thread>#include <cstdlib>#include <ctime>#include <unistd.h>
class mysem {public: mysem(uint32_t init_value); void acquire(); void release();private: std::atomic<uint32_t> counter;};
mysem::mysem(uint32_t init_value) { counter.store(init_value, std::memory_order_seq_cst);}
void mysem::acquire() { if(counter.load(std::memory_order_seq_cst) > 0) { counter.fetch_sub(1); } else { while (counter.load(std::memory_order_seq_cst) <= 0) { // busy-wait } }}
void mysem::release() { counter.fetch_add(1);}
void random_work() { usleep((rand()%1000)*10);}
int main(int argc, char**argv){ srand(time(nullptr)); mysem s(1); std::thread t1([&](){ random_work(); s.acquire(); std::cout << 1; random_work(); std::cout << 1; s.release(); }); std::thread t2([&](){ random_work(); s.acquire(); std::cout << 2; random_work(); std::cout << 2; s.release(); }); t1.join(); t2.join(); std::cout << std::endl;}
复制代码


Kernel-level


阻塞后 sleep,内核层面 awake 按照顺序,保证了阻塞线程的公平性然而,这种方法更加 expensive,因为过程中需要 syscall


hybrid


Linux的pthread_mutex_lock内部使用了Linux futex

在较短的时间内使用 user-level,对于等待时间较长的 thread 由 kernel syscall 处理(先 busy-wait,再阻塞)

glibc 的 pthread 实现方式:提前预测可能需要花费多长时间:Can adapt user-level busy-wait time dynamically

给出一段 CAS + futex 的混合代码:


#include <iostream>#include <atomic>#include <thread>#include <cstdlib>#include <ctime>#include <unistd.h>#include <sys/syscall.h>#include <linux/futex.h>#include <sys/time.h>
class mysem {public: mysem(uint32_t init_value); void acquire(); void release();private: std::atomic<uint32_t> counter;};
mysem::mysem(uint32_t init_value) { counter.store(init_value, std::memory_order_seq_cst);}
void mysem::acquire() {
for (int i = 0; i < 100; ++i) { uint32_t expected = counter.load(std::memory_order_seq_cst); if (expected > 0 && counter.compare_exchange_strong(expected, expected - 1, std::memory_order_seq_cst)) { return; } }
uint32_t* counter_ptr = reinterpret_cast<uint32_t*>(&counter); syscall(SYS_futex, counter_ptr, FUTEX_WAIT, counter.load(std::memory_order_seq_cst) > 0, nullptr, nullptr, 0);}
void mysem::release() { counter.fetch_add(1); uint32_t* counter_ptr = reinterpret_cast<uint32_t*>(&counter); syscall(SYS_futex, counter_ptr, FUTEX_WAKE, 1, nullptr, nullptr, 0);}
void random_work() { usleep((rand()%1000)*10);}
int main(int argc, char**argv){ srand(time(nullptr)); mysem s(1); std::thread t1([&](){ random_work(); s.acquire(); std::cout << 1; random_work(); std::cout << 1; s.release(); }); std::thread t2([&](){ random_work(); s.acquire(); std::cout << 2; random_work(); std::cout << 2; s.release(); }); t1.join(); t2.join(); std::cout << std::endl;}
复制代码


线程安全:

thread_local 引用:https://zhuanlan.zhihu.com/p/77585472

C++ 11 引入了 thread_local,作为线程内部的私有本地变量


文章转载自:真昼小天使 daisuki 

原文链接:https://www.cnblogs.com/kazusarua/p/18594081

体验地址:http://www.jnpfsoft.com/?from=infoq

用户头像

EquatorCoco

关注

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
C++中的多线程及其之后的周边_Java_EquatorCoco_InfoQ写作社区