为了防止大家找不到代码,还是先贴一下项目链接:
GitHub - skyfireitdiy/cocpp at cocpp-0.1.0
本文将介绍协程间的通信机制 channel 的实现方式。
机制
channel 的机制本质上就是队列(FIFO),为了可以安全地在不同协程间通信,需要将其设计为协程安全的,这可以借助前面介绍的协程同步机制完成。
cocpp 中的 channel 分为 3 类:无限制缓冲 channel,固定缓冲 channle,以及无缓冲 channel。
无限制缓冲 channel 的特点是发送端可以一直发送而不阻塞,接收端只要 channel 中有数据就可以取到数据。
固定缓冲区 channel 是指 channel 的缓冲区大小是有限制的,当 channel 的缓冲区满了之后,就会阻塞发送端。
无缓冲 channel 的特点是,只有当发送端与接收端同时操作的时候,才会继续往下运行。
接口
channel 的接口定义如下(include/cocpp/comm/co_chan.h):
template <typename ValueType, int MaxSize>class co_chan : private co_noncopyable{private: std::deque<ValueType> data__; bool closed__ { false }; mutable co_mutex mu__; co_condition_variable full_cond__; co_condition_variable empty_cond__;
co_binary_semaphore read_sem__ { 0 }; co_binary_semaphore write_sem__ { 0 };
public: class iterator { private: co_chan* ch__; ValueType value__; bool closed__ { true };
public: iterator(co_chan* ch); iterator() = default; iterator operator++(); ValueType& operator*(); ValueType* operator->(); bool operator!=(const iterator& other); };
iterator begin(); iterator end();
bool push(ValueType value); std::optional<ValueType> pop(); void close(); bool closed() const;};
复制代码
此类型是一个模板类型,其中两个模板参数的含义如下:
接下来看一下成员函数,成员函数如下:
begin、end:这两个成员函数是为了支持迭代器方式访问。
push:将数据添加到 channel,返回操作结果(当 channel 被关闭的场景下,push 会失败)。
pop:从 channel 中取出数据,返回结果 optional,当 channel 关闭并且其中没有数据的时候,返回无效 optional。
close:关闭 channel,关闭后无法存入。
closed:查询当前 channel 是否关闭。
然后是成员变量:
data__:实际的数据,使用 deque 作为底层数据结构
closed__:关闭标识
mu__:用于保护数据的互斥量。
full_cond__、empty_cond__分别是 channel 在满和空的时候用于等待的条件变量。
read_sem__、write_sem__ 用于无缓冲 channel 的同步二值信号量。
最后看一下一些非成员的辅助函数。
template <typename ValueType, int MaxSize>bool operator<(co_chan<ValueType, MaxSize>& ch, ValueType value);
template <typename ValueType, int MaxSize>bool operator>(co_chan<ValueType, MaxSize>& ch, ValueType& value);
template <typename ValueType, int MaxSize>co_chan<ValueType, MaxSize>& operator<<(co_chan<ValueType, MaxSize>& ch, ValueType value);
template <typename ValueType, int MaxSize>co_chan<ValueType, MaxSize>& operator>>(co_chan<ValueType, MaxSize>& ch, ValueType& value);
复制代码
其中<和<<操作符是 push 的简化写法,>和>>是 pop 的简化写法。
区别是<<和>>会忽略错误,返回 channel 对象引用,以便于进行链式调用。
实现
首先看一下有缓冲的 channel 实现。
有缓冲 channel
无限制缓冲 channel 和固定缓冲区 channel 属于有缓冲 channel,其实现如下(include/cocpp/comm/co_chan.h),忽略无关代码:
push
template <typename ValueType, int MaxSize>bool co_chan<ValueType, MaxSize>::push(ValueType value){ std::unique_lock<co_mutex> lock(mu__); if (closed__) { return false; } if constexpr (MaxSize > 0) // 当MaxSize < 0,是无限长度的chan { if (data__.size() == MaxSize) { full_cond__.wait(lock, [this] { return closed__ || data__.size() < MaxSize; }); if (closed__) { return false; } } } else if constexpr (MaxSize == 0) // 无缓冲chan { // ... } data__.push_back(value); if constexpr (MaxSize == 0) { // ... } else { empty_cond__.notify_one(); } return true;}
复制代码
步骤如下:
获取数据保护锁,这个锁保护了 channel 中所有的数据,是一把“大”锁。
判断 channel 是否已经被关闭,如果被关闭,那么就返回 push 失败。
如果是固定缓冲大小的 channel(MaxSize > 0),判断是否缓冲区已满,如果满了,就等待条件变量 full_cond__,直到元素被取出或者 channel 被关闭。
往缓冲区中插入一个元素。
唤醒等待在 empty_cond__上的协程。
pop
pop 的操作与 push 是相反的,逻辑有一定的相似度,但不完全一样。
template <typename ValueType, int MaxSize>std::optional<ValueType> co_chan<ValueType, MaxSize>::pop(){ std::optional<ValueType> ret; std::unique_lock<co_mutex> lock(mu__);
if constexpr (MaxSize != 0) { if (data__.empty()) { if (closed__) { return ret; } empty_cond__.wait(lock, [this] { return closed__ || !data__.empty(); }); if (data__.empty()) { return ret; } } } else { // ... } ret = data__.front(); data__.pop_front(); if constexpr (MaxSize != 0) { full_cond__.notify_one(); } return ret;}
复制代码
整个流程如下:
这里需要注意与 push 的一点不同是对 channel 关闭的处理。当 channel 关闭后,调用 push 会返回失败,而调用 pop 却不一定会返回无效数据,因为 pop 先判断的是 channel 中是否有数据,因此,当 channel 被关闭后,依然可以取出 channel 中已有的数据。
close
channel 关闭的逻辑很简单,如下:
template <typename ValueType, int MaxSize>void co_chan<ValueType, MaxSize>::close(){ std::lock_guard<co_mutex> lock(mu__); closed__ = true; if constexpr (MaxSize == 0) { // ... } else { full_cond__.notify_all(); empty_cond__.notify_all(); }}
复制代码
整个逻辑将关闭的标志位设置上,然后唤醒所有的等待中的协程。
无缓冲的 channel
相对于有缓冲的 channel,有缓冲的 channel 实现会复杂一些。
push
template <typename ValueType, int MaxSize>bool co_chan<ValueType, MaxSize>::push(ValueType value){ std::unique_lock<co_mutex> lock(mu__); if (closed__) { return false; } if constexpr (MaxSize > 0) // 当MaxSize < 0,是无限长度的chan { // ... } else if constexpr (MaxSize == 0) // 无缓冲chan { lock.unlock(); read_sem__.acquire(); lock.lock(); } data__.push_back(value); if constexpr (MaxSize == 0) { lock.unlock(); write_sem__.release(); lock.lock(); } else { // ... } return true;}
复制代码
与有缓冲的 channel 不同的是,无缓冲的 channel 需要发送端与接收端同时解锁,整体的逻辑如下:
接下来将 pop 的逻辑也加上。
pop
template <typename ValueType, int MaxSize>std::optional<ValueType> co_chan<ValueType, MaxSize>::pop(){ std::optional<ValueType> ret; std::unique_lock<co_mutex> lock(mu__);
if constexpr (MaxSize != 0) { // ... } else { if (closed__) { return ret; } lock.unlock(); read_sem__.release(); write_sem__.acquire(); lock.lock(); if (data__.empty()) { return ret; } } ret = data__.front(); data__.pop_front(); if constexpr (MaxSize != 0) { // ... } return ret;}
复制代码
逻辑如下:
整体流程
将 push 与 pop 流程交叉起来如下:
接收端:释放一个 read_sem__,说明在等待数据
发送端:当 read_sem__ acquire 成功之后,说明有读者在等待数据
发送端:写数据到 data__中
发送端:释放一个 write_sem__(说明有数据准备好了)
接收端:获取 write_sem__,返回后说明发送端已经将数据准备好了
接收端:取出数据
已知问题
以上的无缓冲 channel 实现有一些问题,当 lock 在解锁后,可能会有其他的协程将 channel 关闭,整个流程中对这种情况没有做处理,在此场景下会出问题。这个问题留在后续的版本中修复。
总结
本文讨论了用于协程通信的 c 三种不同类型的 channel 的实现。
评论