写点什么

一个 cpp 协程库的前世今生(二十五)channel

作者:SkyFire
  • 2022 年 2 月 06 日
  • 本文字数:4083 字

    阅读完需:约 13 分钟

一个cpp协程库的前世今生(二十五)channel

为了防止大家找不到代码,还是先贴一下项目链接:


GitHub - skyfireitdiy/cocpp at cocpp-0.1.0


本文将介绍协程间的通信机制 channel 的实现方式。

机制

channel 的机制本质上就是队列(FIFO),为了可以安全地在不同协程间通信,需要将其设计为协程安全的,这可以借助前面介绍的协程同步机制完成。


cocpp 中的 channel 分为 3 类:无限制缓冲 channel,固定缓冲 channle,以及无缓冲 channel。


  • 无限制缓冲 channel


无限制缓冲 channel 的特点是发送端可以一直发送而不阻塞,接收端只要 channel 中有数据就可以取到数据。


  • 固定缓冲区 channle


固定缓冲区 channel 是指 channel 的缓冲区大小是有限制的,当 channel 的缓冲区满了之后,就会阻塞发送端。


  • 无缓冲 channel


无缓冲 channel 的特点是,只有当发送端与接收端同时操作的时候,才会继续往下运行。

接口

channel 的接口定义如下(include/cocpp/comm/co_chan.h):



template <typename ValueType, int MaxSize>class co_chan : private co_noncopyable{private: std::deque<ValueType> data__; bool closed__ { false }; mutable co_mutex mu__; co_condition_variable full_cond__; co_condition_variable empty_cond__;
co_binary_semaphore read_sem__ { 0 }; co_binary_semaphore write_sem__ { 0 };
public: class iterator { private: co_chan* ch__; ValueType value__; bool closed__ { true };
public: iterator(co_chan* ch); iterator() = default; iterator operator++(); ValueType& operator*(); ValueType* operator->(); bool operator!=(const iterator& other); };
iterator begin(); iterator end();
bool push(ValueType value); std::optional<ValueType> pop(); void close(); bool closed() const;};
复制代码


此类型是一个模板类型,其中两个模板参数的含义如下:


  • ValueType:channel 中可以存取的数据类型。

  • MaxSize:channel 的大小,此值如果为正值,表示固定大小缓冲的 channel,如果为 0,则表示为无缓冲的 channel,而如果为负值,表示缓冲区大小没有限制


接下来看一下成员函数,成员函数如下:


  • begin、end:这两个成员函数是为了支持迭代器方式访问。

  • push:将数据添加到 channel,返回操作结果(当 channel 被关闭的场景下,push 会失败)。

  • pop:从 channel 中取出数据,返回结果 optional,当 channel 关闭并且其中没有数据的时候,返回无效 optional。

  • close:关闭 channel,关闭后无法存入。

  • closed:查询当前 channel 是否关闭。


然后是成员变量:


  • data__:实际的数据,使用 deque 作为底层数据结构

  • closed__:关闭标识

  • mu__:用于保护数据的互斥量。

  • full_cond__、empty_cond__分别是 channel 在满和空的时候用于等待的条件变量。

  • read_sem__、write_sem__ 用于无缓冲 channel 的同步二值信号量。


最后看一下一些非成员的辅助函数。


template <typename ValueType, int MaxSize>bool operator<(co_chan<ValueType, MaxSize>& ch, ValueType value);
template <typename ValueType, int MaxSize>bool operator>(co_chan<ValueType, MaxSize>& ch, ValueType& value);
template <typename ValueType, int MaxSize>co_chan<ValueType, MaxSize>& operator<<(co_chan<ValueType, MaxSize>& ch, ValueType value);
template <typename ValueType, int MaxSize>co_chan<ValueType, MaxSize>& operator>>(co_chan<ValueType, MaxSize>& ch, ValueType& value);
复制代码


其中<和<<操作符是 push 的简化写法,>和>>是 pop 的简化写法。


区别是<<和>>会忽略错误,返回 channel 对象引用,以便于进行链式调用。

实现

首先看一下有缓冲的 channel 实现。

有缓冲 channel

无限制缓冲 channel 和固定缓冲区 channel 属于有缓冲 channel,其实现如下(include/cocpp/comm/co_chan.h),忽略无关代码:

push

template <typename ValueType, int MaxSize>bool co_chan<ValueType, MaxSize>::push(ValueType value){    std::unique_lock<co_mutex> lock(mu__);    if (closed__)    {        return false;    }    if constexpr (MaxSize > 0) // 当MaxSize < 0,是无限长度的chan    {        if (data__.size() == MaxSize)        {            full_cond__.wait(lock, [this] { return closed__ || data__.size() < MaxSize; });            if (closed__)            {                return false;            }        }    }    else if constexpr (MaxSize == 0) // 无缓冲chan    {        // ...    }    data__.push_back(value);    if constexpr (MaxSize == 0)    {        // ...    }    else    {        empty_cond__.notify_one();    }    return true;}
复制代码


步骤如下:


  • 获取数据保护锁,这个锁保护了 channel 中所有的数据,是一把“大”锁。

  • 判断 channel 是否已经被关闭,如果被关闭,那么就返回 push 失败。

  • 如果是固定缓冲大小的 channel(MaxSize > 0),判断是否缓冲区已满,如果满了,就等待条件变量 full_cond__,直到元素被取出或者 channel 被关闭。

  • 往缓冲区中插入一个元素。

  • 唤醒等待在 empty_cond__上的协程。

pop

pop 的操作与 push 是相反的,逻辑有一定的相似度,但不完全一样。



template <typename ValueType, int MaxSize>std::optional<ValueType> co_chan<ValueType, MaxSize>::pop(){ std::optional<ValueType> ret; std::unique_lock<co_mutex> lock(mu__);
if constexpr (MaxSize != 0) { if (data__.empty()) { if (closed__) { return ret; } empty_cond__.wait(lock, [this] { return closed__ || !data__.empty(); }); if (data__.empty()) { return ret; } } } else { // ... } ret = data__.front(); data__.pop_front(); if constexpr (MaxSize != 0) { full_cond__.notify_one(); } return ret;}
复制代码


整个流程如下:


  • 获取数据保护锁

  • 判断缓冲区中是否有数据,如果没有数据,再判断 channel 是否被关闭,如果被关闭,返回空的 optional,如果没有被关闭,就等到被关闭或者 channel 中有数据为止

  • 如果缓冲区中有数据,就取出数据

  • 唤醒等待在 full_cond__上的协程。


这里需要注意与 push 的一点不同是对 channel 关闭的处理。当 channel 关闭后,调用 push 会返回失败,而调用 pop 却不一定会返回无效数据,因为 pop 先判断的是 channel 中是否有数据,因此,当 channel 被关闭后,依然可以取出 channel 中已有的数据。

close

channel 关闭的逻辑很简单,如下:


template <typename ValueType, int MaxSize>void co_chan<ValueType, MaxSize>::close(){    std::lock_guard<co_mutex> lock(mu__);    closed__ = true;    if constexpr (MaxSize == 0)    {        // ...    }    else    {        full_cond__.notify_all();        empty_cond__.notify_all();    }}
复制代码


整个逻辑将关闭的标志位设置上,然后唤醒所有的等待中的协程。

无缓冲的 channel

相对于有缓冲的 channel,有缓冲的 channel 实现会复杂一些。

push


template <typename ValueType, int MaxSize>bool co_chan<ValueType, MaxSize>::push(ValueType value){ std::unique_lock<co_mutex> lock(mu__); if (closed__) { return false; } if constexpr (MaxSize > 0) // 当MaxSize < 0,是无限长度的chan { // ... } else if constexpr (MaxSize == 0) // 无缓冲chan { lock.unlock(); read_sem__.acquire(); lock.lock(); } data__.push_back(value); if constexpr (MaxSize == 0) { lock.unlock(); write_sem__.release(); lock.lock(); } else { // ... } return true;}
复制代码


与有缓冲的 channel 不同的是,无缓冲的 channel 需要发送端与接收端同时解锁,整体的逻辑如下:


  • 当 read_sem__ acquire 成功之后,说明有读者在等待数据

  • 写数据到 data__中

  • 释放一个 write_sem__(说明有数据准备好了)


接下来将 pop 的逻辑也加上。

pop


template <typename ValueType, int MaxSize>std::optional<ValueType> co_chan<ValueType, MaxSize>::pop(){ std::optional<ValueType> ret; std::unique_lock<co_mutex> lock(mu__);
if constexpr (MaxSize != 0) { // ... } else { if (closed__) { return ret; } lock.unlock(); read_sem__.release(); write_sem__.acquire(); lock.lock(); if (data__.empty()) { return ret; } } ret = data__.front(); data__.pop_front(); if constexpr (MaxSize != 0) { // ... } return ret;}
复制代码


逻辑如下:


  • 释放一个 read_sem__,说明在等待数据

  • 获取 write_sem__,返回后说明发送端已经将数据准备好了

  • 取出数据

整体流程

将 push 与 pop 流程交叉起来如下:


  • 接收端:释放一个 read_sem__,说明在等待数据

  • 发送端:当 read_sem__ acquire 成功之后,说明有读者在等待数据

  • 发送端:写数据到 data__中

  • 发送端:释放一个 write_sem__(说明有数据准备好了)

  • 接收端:获取 write_sem__,返回后说明发送端已经将数据准备好了

  • 接收端:取出数据

已知问题

以上的无缓冲 channel 实现有一些问题,当 lock 在解锁后,可能会有其他的协程将 channel 关闭,整个流程中对这种情况没有做处理,在此场景下会出问题。这个问题留在后续的版本中修复。

总结

本文讨论了用于协程通信的 c 三种不同类型的 channel 的实现。

发布于: 2022 年 02 月 06 日阅读数: 3
用户头像

SkyFire

关注

这个cpper很懒,什么都没留下 2018.10.13 加入

会一点点cpp的苦逼码农

评论

发布
暂无评论
一个cpp协程库的前世今生(二十五)channel