为了防止大家找不到代码,还是先贴一下项目链接:
GitHub - skyfireitdiy/cocpp at cocpp-0.1.0
本文将介绍协程间的通信机制 channel 的实现方式。
机制
channel 的机制本质上就是队列(FIFO),为了可以安全地在不同协程间通信,需要将其设计为协程安全的,这可以借助前面介绍的协程同步机制完成。
cocpp 中的 channel 分为 3 类:无限制缓冲 channel,固定缓冲 channle,以及无缓冲 channel。
无限制缓冲 channel 的特点是发送端可以一直发送而不阻塞,接收端只要 channel 中有数据就可以取到数据。
固定缓冲区 channel 是指 channel 的缓冲区大小是有限制的,当 channel 的缓冲区满了之后,就会阻塞发送端。
无缓冲 channel 的特点是,只有当发送端与接收端同时操作的时候,才会继续往下运行。
接口
channel 的接口定义如下(include/cocpp/comm/co_chan.h):
template <typename ValueType, int MaxSize>
class co_chan : private co_noncopyable
{
private:
std::deque<ValueType> data__;
bool closed__ { false };
mutable co_mutex mu__;
co_condition_variable full_cond__;
co_condition_variable empty_cond__;
co_binary_semaphore read_sem__ { 0 };
co_binary_semaphore write_sem__ { 0 };
public:
class iterator
{
private:
co_chan* ch__;
ValueType value__;
bool closed__ { true };
public:
iterator(co_chan* ch);
iterator() = default;
iterator operator++();
ValueType& operator*();
ValueType* operator->();
bool operator!=(const iterator& other);
};
iterator begin();
iterator end();
bool push(ValueType value);
std::optional<ValueType> pop();
void close();
bool closed() const;
};
复制代码
此类型是一个模板类型,其中两个模板参数的含义如下:
接下来看一下成员函数,成员函数如下:
begin、end:这两个成员函数是为了支持迭代器方式访问。
push:将数据添加到 channel,返回操作结果(当 channel 被关闭的场景下,push 会失败)。
pop:从 channel 中取出数据,返回结果 optional,当 channel 关闭并且其中没有数据的时候,返回无效 optional。
close:关闭 channel,关闭后无法存入。
closed:查询当前 channel 是否关闭。
然后是成员变量:
data__:实际的数据,使用 deque 作为底层数据结构
closed__:关闭标识
mu__:用于保护数据的互斥量。
full_cond__、empty_cond__分别是 channel 在满和空的时候用于等待的条件变量。
read_sem__、write_sem__ 用于无缓冲 channel 的同步二值信号量。
最后看一下一些非成员的辅助函数。
template <typename ValueType, int MaxSize>
bool operator<(co_chan<ValueType, MaxSize>& ch, ValueType value);
template <typename ValueType, int MaxSize>
bool operator>(co_chan<ValueType, MaxSize>& ch, ValueType& value);
template <typename ValueType, int MaxSize>
co_chan<ValueType, MaxSize>& operator<<(co_chan<ValueType, MaxSize>& ch, ValueType value);
template <typename ValueType, int MaxSize>
co_chan<ValueType, MaxSize>& operator>>(co_chan<ValueType, MaxSize>& ch, ValueType& value);
复制代码
其中<和<<操作符是 push 的简化写法,>和>>是 pop 的简化写法。
区别是<<和>>会忽略错误,返回 channel 对象引用,以便于进行链式调用。
实现
首先看一下有缓冲的 channel 实现。
有缓冲 channel
无限制缓冲 channel 和固定缓冲区 channel 属于有缓冲 channel,其实现如下(include/cocpp/comm/co_chan.h),忽略无关代码:
push
template <typename ValueType, int MaxSize>
bool co_chan<ValueType, MaxSize>::push(ValueType value)
{
std::unique_lock<co_mutex> lock(mu__);
if (closed__)
{
return false;
}
if constexpr (MaxSize > 0) // 当MaxSize < 0,是无限长度的chan
{
if (data__.size() == MaxSize)
{
full_cond__.wait(lock, [this] { return closed__ || data__.size() < MaxSize; });
if (closed__)
{
return false;
}
}
}
else if constexpr (MaxSize == 0) // 无缓冲chan
{
// ...
}
data__.push_back(value);
if constexpr (MaxSize == 0)
{
// ...
}
else
{
empty_cond__.notify_one();
}
return true;
}
复制代码
步骤如下:
获取数据保护锁,这个锁保护了 channel 中所有的数据,是一把“大”锁。
判断 channel 是否已经被关闭,如果被关闭,那么就返回 push 失败。
如果是固定缓冲大小的 channel(MaxSize > 0),判断是否缓冲区已满,如果满了,就等待条件变量 full_cond__,直到元素被取出或者 channel 被关闭。
往缓冲区中插入一个元素。
唤醒等待在 empty_cond__上的协程。
pop
pop 的操作与 push 是相反的,逻辑有一定的相似度,但不完全一样。
template <typename ValueType, int MaxSize>
std::optional<ValueType> co_chan<ValueType, MaxSize>::pop()
{
std::optional<ValueType> ret;
std::unique_lock<co_mutex> lock(mu__);
if constexpr (MaxSize != 0)
{
if (data__.empty())
{
if (closed__)
{
return ret;
}
empty_cond__.wait(lock, [this] { return closed__ || !data__.empty(); });
if (data__.empty())
{
return ret;
}
}
}
else
{
// ...
}
ret = data__.front();
data__.pop_front();
if constexpr (MaxSize != 0)
{
full_cond__.notify_one();
}
return ret;
}
复制代码
整个流程如下:
这里需要注意与 push 的一点不同是对 channel 关闭的处理。当 channel 关闭后,调用 push 会返回失败,而调用 pop 却不一定会返回无效数据,因为 pop 先判断的是 channel 中是否有数据,因此,当 channel 被关闭后,依然可以取出 channel 中已有的数据。
close
channel 关闭的逻辑很简单,如下:
template <typename ValueType, int MaxSize>
void co_chan<ValueType, MaxSize>::close()
{
std::lock_guard<co_mutex> lock(mu__);
closed__ = true;
if constexpr (MaxSize == 0)
{
// ...
}
else
{
full_cond__.notify_all();
empty_cond__.notify_all();
}
}
复制代码
整个逻辑将关闭的标志位设置上,然后唤醒所有的等待中的协程。
无缓冲的 channel
相对于有缓冲的 channel,有缓冲的 channel 实现会复杂一些。
push
template <typename ValueType, int MaxSize>
bool co_chan<ValueType, MaxSize>::push(ValueType value)
{
std::unique_lock<co_mutex> lock(mu__);
if (closed__)
{
return false;
}
if constexpr (MaxSize > 0) // 当MaxSize < 0,是无限长度的chan
{
// ...
}
else if constexpr (MaxSize == 0) // 无缓冲chan
{
lock.unlock();
read_sem__.acquire();
lock.lock();
}
data__.push_back(value);
if constexpr (MaxSize == 0)
{
lock.unlock();
write_sem__.release();
lock.lock();
}
else
{
// ...
}
return true;
}
复制代码
与有缓冲的 channel 不同的是,无缓冲的 channel 需要发送端与接收端同时解锁,整体的逻辑如下:
接下来将 pop 的逻辑也加上。
pop
template <typename ValueType, int MaxSize>
std::optional<ValueType> co_chan<ValueType, MaxSize>::pop()
{
std::optional<ValueType> ret;
std::unique_lock<co_mutex> lock(mu__);
if constexpr (MaxSize != 0)
{
// ...
}
else
{
if (closed__)
{
return ret;
}
lock.unlock();
read_sem__.release();
write_sem__.acquire();
lock.lock();
if (data__.empty())
{
return ret;
}
}
ret = data__.front();
data__.pop_front();
if constexpr (MaxSize != 0)
{
// ...
}
return ret;
}
复制代码
逻辑如下:
整体流程
将 push 与 pop 流程交叉起来如下:
接收端:释放一个 read_sem__,说明在等待数据
发送端:当 read_sem__ acquire 成功之后,说明有读者在等待数据
发送端:写数据到 data__中
发送端:释放一个 write_sem__(说明有数据准备好了)
接收端:获取 write_sem__,返回后说明发送端已经将数据准备好了
接收端:取出数据
已知问题
以上的无缓冲 channel 实现有一些问题,当 lock 在解锁后,可能会有其他的协程将 channel 关闭,整个流程中对这种情况没有做处理,在此场景下会出问题。这个问题留在后续的版本中修复。
总结
本文讨论了用于协程通信的 c 三种不同类型的 channel 的实现。
评论