写点什么

一个 cpp 协程库的前世今生(十三)互斥量

作者:SkyFire
  • 2022 年 1 月 07 日
  • 本文字数:2886 字

    阅读完需:约 9 分钟

一个cpp协程库的前世今生(十三)互斥量

为了防止大家找不到代码,还是先贴一下项目链接:


GitHub - skyfireitdiy/cocpp at cocpp-0.1.0


上一篇文章讲了自旋锁,本文将继续讲同步相关的互斥量。

接口

互斥量的接口定义如下(include/cocpp/sync/co_mutex.h):


class co_mutex : private co_noncopyable{    co_ctx*             owner__ { nullptr }; // 持有者    co_spinlock         spinlock__;          // 互斥锁    std::deque<co_ctx*> wait_deque__;        // 等待队列public:    void lock();     // 加锁    void unlock();   // 解锁    bool try_lock(); // 尝试加锁};
复制代码


其接口定义与自旋锁一致,功能也一样。那么为什么有了自旋锁还需要互斥量呢?上一篇文章提到了自旋锁的缺陷。也就是可以在线程 a 中加锁,在线程 b 中解锁。但是由于自旋锁是框架内部使用,不对外暴露,所以只要严格控制加锁和解锁的顺序,这个缺陷也就不存在了。


但是互斥量是要对用户暴露的,因此必须修复这个缺陷。


还有一点,自旋锁是循环等待的,所以它只能用于速度非常快的加锁解锁流程中,如果被保护的代码段有很耗时的操作,那么另一个等待枷锁的协程,就会疯狂占 CPU,这一点是必须避免的。


因此设计的面向用户的互质量,其应用于协程上下文中,保护的代码段之间可以有耗时操作。


这里对几个私有成员变量做一些说明:


  • owner__:表示当前互斥量的持有者,如果没有协程持有,则此值为 nullptr。

  • spinlock__:自旋锁,用于保护 owner__。

  • wait_deque__:等待队列,表示当前互斥量被哪些 ctx 等待着。


接下来我们看一下互质量的实现。

实现

lock

lock 的实现如下(source/cocpp/sync/co_mutex.cpp):


void co_mutex::lock(){    auto             ctx = co::current_ctx();    std::scoped_lock lock(spinlock__);    if (owner__ != nullptr)    {        ctx_enter_wait_state__(ctx, CO_RC_TYPE_MUTEX, this, wait_deque__);        lock_yield__(spinlock__, [this] { return owner__ != nullptr; });    }    owner__ = ctx;}
复制代码


第 1 步先获取当前线程的上下文。


第 2 步对 owner__变量使用自旋锁保护。


第 3 步判断 owner__是否是空,为空则表示没有人占用当前互斥量,因此就可以进行加锁流程。但是如果不为空的话,说明当前互斥量被其他的协程持有着,那么当前协程需要进入睡眠等待。


接下来我们看一下进入睡眠等待的两个函数。

ctx_enter_wait_state__

此函数的实现位于 source/cocpp/sync/co_sync_helper.cpp:


void ctx_enter_wait_state__(co_ctx* ctx, int rc_type, void* rc, std::deque<co_ctx*>& wait_deque){    ctx_enter_wait_state__(ctx, rc_type, rc, wait_deque, ctx);}
复制代码


这里调用了模板重载函数,位于 include/cocpp/sync/co_sync_helper.h


template <typename ContextType>void ctx_enter_wait_state__(co_ctx* ctx, int rc_type, void* rc, std::deque<ContextType>& wait_list, const ContextType& data){    ctx->enter_wait_resource_state(rc_type, rc);    wait_list.push_back(data);}
复制代码


此函数首先将 ctx 设置为等待状态(enter_wait_resource_state 在前面的 ctx 状态与标志为相关章节有讲解),然后将当前的 ctx 添加到互斥量的等待列表中。


从之前 ctx 的介绍可以了解到,当 ctx 被设置为等待状态时,env 不会再去调度此 ctx,这样就实现了等待的效果。


接下来我们来看另一个函数 lock_yield__。

lock_yield__

此函数的定义位于 source/cocpp/sync/co_sync_helper.cpp:


void lock_yield__(co_spinlock& lk, std::function<bool()> checker){    do    {        lk.unlock();        this_co::yield();        lk.lock();    } while (checker());}
复制代码


函数的逻辑非常简单,在这个锁被其他协程持有期间,一直会循环,循环过程中会切换到其他的协程。那么有的读者可能就会问了,这不还是死循环吗,和自旋锁有什么区别呢?


其区别就在于上面的 ctx_enter_wait_state__函数设置了等待状态,只有在等待状态被取消之后才会重新调度到这个 ctx,也就是说在 lock_yield__函数切出去后,直到解除等待前,当前线程都不会调度,因此也就不会占用 CPU。


然后我们再看一下这里,将之前的自旋锁解锁了才切换协程,为什么要这样做呢?因为在其他协程释放互质量的时候,也是需要加锁的,如果这里不释放,那么其他只有互斥量的协程就因为获取不到自旋锁而无法释放互斥量,引发死锁问题。

try_lock

try_lock 的实现相当于是 lock 的单次版本,如下(source/cocpp/sync/co_mutex.cpp):


bool co_mutex::try_lock(){    auto             ctx = co::current_ctx();    std::scoped_lock lock(spinlock__);    if (owner__ != nullptr)    {        return false;    }    owner__ = ctx;    return true;}
复制代码

unlock

接下来我们看一下解锁流程,其实现代码位于 source/cocpp/sync/co_mutex.cpp:


void co_mutex::unlock(){    auto             ctx = co::current_ctx();    std::scoped_lock lock(spinlock__);    if (owner__ != ctx)    {        CO_O_ERROR("ctx is not owner, this ctx is %p, owner is %p", ctx, owner__);        throw co_error("ctx is not owner[", ctx, "]");    }
owner__ = nullptr;
wake_front__(wait_deque__);}
复制代码


第 1 步仍然是获取自旋锁,只有获取的自旋锁才可以对 owner__操作。


第 2 步是判断当前协程是否是持有互斥量的协程,如果不是则直接抛出异常,这里明显与自旋锁的实现不同了。


第 3 步将 owner__设置为空,意味当前互斥量没有协程持有。


最后一步,唤醒等待队列中的 ctx。


这里我们看一下 wake_front__的实现(source/cocpp/sync/co_sync_helper.cpp):


void wake_front__(std::deque<co_ctx*>& wait_deque){    wake_front__(wait_deque, std::function([](co_ctx*& ctx) {                     ctx->leave_wait_resource_state();                 }));}
复制代码


此处依然是调用了模板重载版本(include/cocpp/sync/co_sync_helper.h):


template <typename ContextType>void wake_front__(std::deque<ContextType>& wait_list, std::function<void(ContextType&)> wake_method){    if (wait_list.empty())    {        return;    }
auto obj = wait_list.front(); wait_list.pop_front(); wake_method(obj);}
复制代码


首先判断等待队列是否为空,如果为空,说明没有协程等待,则没有协程需要唤醒。


否则从等待队列的头部取出一个 ctx,调用其对应的 leave_wait_resource_state 函数来唤醒。


leave_wait_resource_state 这个函数在前面的 ctx 状态与标识位相关章节中有介绍,作用为清除等待标志,并唤醒对应的调度线程。这样一来,被唤醒的那个 ctx 就可以去抢占这个互斥量。


此处遗留了一个问题,如果我们将队列头部的 ctx 唤醒了,但是他却没有抢到这个互斥量,会发生什么?此时会导致 lock_yield__中的那个循环,经常被触发,从而引发 CPU 升高。这个问题在后续的版本中再做修改(触发几率比较低,一般不影响使用)。

总结

本文讨论了互质量的实现方式,包括如何让协程在不消耗 CPU 的情况下去等待互斥量,如何解锁互斥量并唤醒对应的 ctx。

发布于: 刚刚阅读数: 2
用户头像

SkyFire

关注

这个cpper很懒,什么都没留下 2018.10.13 加入

会一点点cpp的苦逼码农

评论

发布
暂无评论
一个cpp协程库的前世今生(十三)互斥量