写点什么

深入浅出 c++ 协程丨 C++ 协程实现

发布于: 2021 年 05 月 12 日

1|0 一些实现的 c++协程

C++协程实现相关视频讲解:(视频代码资料点击 正在跳转 获取)

协程的实现与原理剖析(上)

协程的实现与原理剖析(下)

协程是一种函数对象,可以设置锚点做暂停,然后再该锚点恢复继续运行,我觉得这是最合适的定义,用户态线程,轻量级线程,可中断恢复的函数,这些都不够精确,先来认识一个 boost 1.75 的一个例子

#include <iostream>#include <boost/coroutine2/all.hpp>
void coroutine_function(boost::coroutines2::coroutine<void>::pull_type & coro_back){ std::cout << "a "; coro_back(); // 锚点,返回 std::cout << "b "; coro_back(); //锚点 返回 std::cout << "c ";}
int main(){ boost::coroutines2::coroutine<void>::push_type coroutine_object(coroutine_function); // 创建协程 std::cout << "1 "; coroutine_object(); // 运行协程 std::cout << "2 "; coroutine_object(); // 返回锚点,继续运行协程 std::cout << "3 "; coroutine_object(); // 返回锚点,继续运行协程 return 0;}g++ test.cpp -lboost_coroutine -lboost_context -o test./pull--------------输出分割线-------------1 a 2 b 3 c
复制代码

在 main( )中创建了一个协程 coroutine_object,然后调用 coroutine_object()去运行,实际上运行的 coroutine_function( )函数,而且每次运行到 coro_back();就中断当前的执行返回,下次调用 coroutine_object()就从这个断点继续运行,这就是协程的全部了

为什么会有协程是轻量级线程的说法呢?因为协程具有中断可恢复的特性,那么只需要在开一个全局的数组存储所有的协程,在协程中断时,不断轮转调用下一个协程继续运行即可; 这看起来似乎和线程无异,但其实有巨大的区别,因为协程本质是函数,调用协程后原来的地方就会被阻塞,协程处理完了才返回结果,这是天然同步的,而多线程无法做到这点,因为多线程的调度受内核控制,触发点来自于硬件时钟中断不可预见,同时又运行在多核心下,调用后运行次序是不确定的,想实现同步调用就必须通过 std::promise/future 去辅佐,但为了性能往往见到的是异步+回调的方式进行多线程的交互,异步回调代码的可读性是很差的而且还需要考虑一大堆并发上锁的情况,协程因其函数本质,是天然同步的,而在遇到阻塞条件时候,把 cpu 让给别的协程,等条件满足了再通过中断可恢复的特性再继续运行,就实现了并发,同步+并发就是协程强大的地方,其使用范式和轮转+同步非阻塞很像

接下来会介绍一些目前的实现的协程,有非官方的: boost.coroutine2 的协程,使用起来方便,让我们可以直观了解协程;微信的 libco, 源码很好阅读,资料多,可以进一步学习到协程是如何实现运行的;而官方本身的 c++20 协程,还不成熟,使用起来比较复杂,官方的东西还是需要提前了解;

C/C++ Linux 服务器开发高级架构学习视频点击观看:C/C++Linux服务器开发/Linux后台架构师-学习视频

2|0 一些实现的 c++协程

2|1boost 中的协程


push_type 和 pull_type

boost 自己早就实现了一套协程,先后推出了两个版本 boost coroutine 和 boost coroutine2,现在第一个版本 boost coroutine 已经弃用, 直接看看 coroutin2 的简单例子

#include <iostream>#include <boost/coroutine2/all.hpp>
void foo(boost::coroutines2::coroutine<int>::push_type & sink){ std::cout<<"start coroutine\n"; sink(1); std::cout<<"finish coroutine\n";}

int main(){ boost::coroutines2::coroutine<int>::pull_type source(foo); std::cout<<source.get()<<std::endl; std::cout<<source()<<std::endl; std::cout<<"finish\n"; return 0;}
复制代码

编译链接运行后

g++ pull.cpp -lboost_coroutine -lboost_context -o pull./pull--------------输出分割线-------------start coroutine1finish coroutinefinish
复制代码


boost.corountine2 中的协程增加了 push_type 和 pull_type 用于提供协程数据的流转,约束了数据的从 push_type 流入,从 pull_type 流出, 上面的 demo 定义协程对象 source 的时候使用了 pull_type,所以协程函数参数类型是 push_type



当协程对象被创建之后就直接运行,直到 sink(1)的时候暂停返回到 main 中,main 中使用 source.get()获取数据,继续使用 source()调用协程对象,协程从 sink(1)之后继续运行执行完毕,返回 main,main 也执行完毕。


上面是一个 pull 的例子,接下来是一个 push 的例子

#include <iostream>#include <boost/coroutine2/all.hpp>
void foo(boost::coroutines2::coroutine<int>::pull_type& sink){ std::cout<<"start coroutine\n"; //sink(); int a = sink().get(); std::cout<<a<<std::endl; std::cout<<"finish coroutine\n";}
复制代码


int main(){    boost::coroutines2::coroutine<int>::push_type source(foo);        std::cout<<"finish\n";    source(0);    source(5);    return 0;}
复制代码

编译输出

g++ push.cpp -lboost_coroutine -lboost_context./push --------------输出分割线-------------finishstart coroutine5finish coroutine
复制代码

也可以看到一个细节, 当 source 为 pull_type 的时候,协程是马上运行的,因为此时不用传递数据进行,而 push_type 的时候,需要 source()才会运行,第一次需要放一个没用的数据

为了使用方便,boost::coroutine2 实现了协程迭代器,如下

push_coroutine< T > &push_coroutine< T >::operator()( T const& t) { //() 切换协程    cb_->resume( t);    return * this;}

class iterator{ // 实现迭代器 .... iterator & operator++() noexcept { return *this; } }
复制代码


所以其支持如下用法,直接在 range-for 切换元素的时候就能恢复运行

boost::coroutines2::coroutine<void>::push_type source(foo);    for(auto& s : source){    	std::cout<<"run"    }
复制代码

关于 C/C++ Linux 后端开发网络底层原理知识 点击 学习资料 获取,内容知识点包括 Linux,Nginx,ZeroMQ,MySQL,Redis,线程池,MongoDB,ZK,Linux 内核,CDN,P2P,epoll,Docker,TCP/IP,协程,DPDK 等等。

fiber

因为 push_type 和 pull_type 这样的简洁组合已经可以解决基本问题---同步调用的中断恢复,但是只有多协程并发才能发挥其真正威力,为此需要同步和调度,boost 搞了个 fiber(纤程,这才是轻量级线程)出来,是在 coroutine2 的基础上添加了协程调度器以及 barrier mutex channel promise future condition_variable, sleep yield 等协程同步工具,这些和线程同步工具很像,因为在多协程场景下,它两模型和解决的问题都是一样的,都是通过调度多实体实现并发,但是协程有很多好处,开销很小,而且调度是运行的协程自己控制让出 cpu 给下一个要运行的线程,是可预见的,同时调用上是同步的,保证了顺序性就可以避免锁,

下面是 boost 的 fiber 的一个例子

#include <boost/fiber/all.hpp>#include <iostream>
using namespace std;using namespace boost; void callMe(fibers::buffered_channel<string>& pipe) { pipe.push("hello world");} int main() { fibers::buffered_channel<string> pipe(2); fibers::fiber f([&]() {callMe(pipe); }); f.detach(); string str; std::cout<<"start pop"<<std::endl; pipe.pop(str); //切换协程运行 std::cout<<"get str:"<<str<<std::endl; return 0;}
复制代码

编译运行

g++ channel.cpp -o channel -lboost_fiber -lboost_context./channel -------------------输出分割线-------------------start popget str:hello world
复制代码

这是一个最简单的例子,并没有去体现使用一个 loop 去做调度协程,调度还是由一些函数手动触发的

注意 pull_type 和 push_type 的操作已经没有了,那协程是如何切换的呢? 切换发生在 pipe.pop( )中, fibers::buffered_channel 是一个缓存队列,用来传输数据,pop 的底层检测到没有数据,会就开始让出 cpu,底层的协程调度器就开始调度别的协程进行运行,没有看过源码不知道执行到 pipe.push 的时候是否有没有发生调度,也许有也许没有,但都不太重要,因为这就和线程是一样的;

由于 fiber 中有调度器的存在,当前协程主动让出 cpu,调度器让别的协程运行,比如上面的 pipe.pop(),相当执行了一个协程的 co_yield()操作让出 cpu;所以,某个协程中如果有阻塞操作,将导致整个线程都处于阻塞,所有协程都被阻塞, 此文提出两种解决方法

同步改成非阻塞,一旦发现未达到条件直接 yield()让出 cpu,再后面轮转调度还能回到该店

int read_chunk( NonblockingAPI & api, std::string & data, std::size_t desired) {    int error;    while ( EWOULDBLOCK == ( error = api.read( data, desired) ) ) {        boost::this_fiber::yield();    }    return error;}
复制代码

2|2asio 中的协程

asio 的协程总感觉有两个版本,一个是 c++20 之前就有的版本,还有一个是在 c++20 的提供的协程的基础上封装的版本;asio 的协程是无栈协程(后文会介绍),无栈协程除了运行高效,节省内存之外,还能通过 gdb 查看到调用堆栈,有栈协程的堆栈因为被汇编切换走了是没法看到的,asio 基于其 io_context(详见 asio 的异步与线程模型解析)实现了多协程调度,所以要使用它的协程就就需要用到它的 io_context(在此可以理解成一个跑着 loop 的协程调度器),该例子取自 asio/src/examples/cpp17/coroutines_ts/echo_server.cpp

#include <asio/co_spawn.hpp>#include <asio/detached.hpp>#include <asio/io_context.hpp>#include <asio/ip/tcp.hpp>#include <asio/signal_set.hpp>#include <asio/write.hpp>#include <cstdio>#include <iostream>
using asio::ip::tcp;using asio::awaitable;using asio::co_spawn;using asio::detached;using asio::use_awaitable;namespace this_coro = asio::this_coro;
#if defined(ASIO_ENABLE_HANDLER_TRACKING)# define use_awaitable \ asio::use_awaitable_t(__FILE__, __LINE__, __PRETTY_FUNCTION__)#endif
awaitable<void> echo(tcp::socket socket){ try { char data[1024]; for (;;) { std::size_t n = co_await socket.async_read_some(asio::buffer(data), use_awaitable); co_await async_write(socket, asio::buffer(data, n), use_awaitable); } } catch (std::exception& e) { std::printf("echo Exception: %s\n", e.what()); }}void fn2(){ std::cout<<"hhh\n";}
void fn(){ fn2();}
awaitable<void> listener(){ auto executor = co_await this_coro::executor; fn(); tcp::acceptor acceptor(executor, {tcp::v4(), 8988}); for (;;) { tcp::socket socket = co_await acceptor.async_accept(use_awaitable); //调用协程,体现同步性 co_spawn(executor, echo(std::move(socket)), detached);// 创建连接处理线程 }}
int main(){ try { asio::io_context io_context(1);
asio::signal_set signals(io_context, SIGINT, SIGTERM); signals.async_wait([&](auto, auto){ io_context.stop(); });
co_spawn(io_context, listener(), detached); // 创建纤程,体现并发性
io_context.run(); // 开始调度 } catch (std::exception& e) { std::printf("Exception: %s\n", e.what()); }}
复制代码

代码很长,但只需要看 main( )就可以了,co_spawn( )创建了一个协程,然后使用 io_context.run( ),对基于该 io_context 创建的协程进行调度, 上面实现的协程函数 listener( )中,使用 co_await acceptor.async_accept(use_awaitable)做一个协程的阻塞同步调用,async_accept( )中发现没有新的连接就让出 cpu 给当前 io_context 下别的协程继续运行,当时间片又切回到该协程时,发现有新的链接时候,往 io_context 中创建一个新的协程去处理该连接,这里就能很好的体现了协程的同步和并发的应用场景,调度过程;

asio 的协程是基于 c++20 实现的,简单的介绍因为 asio 库很通用,还没有精力继续研究,但可以先来看看 c++20 的协程给的基础设施。

2|3c++20 的协程

c++20 的协程目前只是一套框架基础,远未成熟,最好的文档参考还是 cppreference,同时的这里两篇很好的文章进行了介绍文章 1 和文章 2

先看一个非常简化的例子看整体

#include <iostream>#include <thread>#include <coroutine>#include <future>#include <chrono>#include <functional>
struct Result{ struct promise_type { Result get_return_object() { return {}; } std::suspend_never initial_suspend() { return {}; } std::suspend_never final_suspend() noexcept { return {}; } void return_void() {} void unhandled_exception() {} };};
std::coroutine_handle<> coroutine_handle;
struct AWaitableObject{ AWaitableObject() {} bool await_ready() const {return false;}
int await_resume() { return 0; }
void await_suspend(std::coroutine_handle<> handle){ coroutine_handle = handle; }
};

Result CoroutineFunction(){ std::cout<<"start coroutine\n"; int ret = co_await AWaitableObject(); std::cout<<"finish coroutine\n";}


int main(){ std::cout<<"start \n"; auto coro = CoroutineFunction(); std::cout<<"coroutine co_await\n"; coroutine_handle.resume();
return 0;}
复制代码

对该程序使用如下方式进行编译运行(需 g++10.2.0 及以上)

g++ test4.cpp -O0 -g -o test4 -fcoroutines -std=c++20start start coroutinecoroutine co_awaitfinish coroutine
复制代码

我们可以看到它的运行正如一般协程一样, 在使用了关键字 co_await 后会返回到 caller, 在 main 中使用 resume()后,回到 co_await 的赋值等式中运行

接下来,介绍目前 c++协程的设计思想和细节

Results CoroutineFunction(){		co_await AwaitatbleObject();		co_return {};}
复制代码

一个协程函数形式如上,当函数体内出现了 co_await, co_yield,co_return 这三个关键字之后,就会被当成一个协程函数;此时,编译器要求返回值类型是否包含一个 promise_type 的结构以及需要实现必要的函数,以上一个例子中的 Result 类型为例:

struct Result{  struct promise_type {    Result get_return_object() { return {}; }    std::suspend_never initial_suspend() { return {}; }    std::suspend_never final_suspend() noexcept { return {}; }    void unhandled_exception() {}        suspend_aways yield_value(){} // 对应co_yield    void return_void() {}	//对应co_return    Result return_value(const Result& res){ return res;}  };};
复制代码

c++20 的编译器对于协程的运行有一套流程,我们可以通过提供 promise_type 去控制这个流程,同时对于协程的 caller 而言,协程运行后的只能获得返回值,所以希望通过它与协程进行后续交互的主要对象, 获取返回值, 处理异常等功能,所以这个很重要的控制器 struct promise_type 就放在了返回值类型 Result 中;

下面介绍 promise_type 的接口在协程运行如何交互,从头到尾,主要分成下面三个阶段

开头初始化准备:

  • 协程函数运行后,首先生成一个 promise_type 对象

  • 调用 get_return_object()函数创建返回值对象,这个对象会在协程第一次返回时就会把这个对象返回给 caller;

  • 调用 initial_suspend()函数,这个返回值有两个选择 suspend_never/suspend_always,never 表示继续运行,always 表示协程挂起,同时把返回值对象返回,所以这个接口的语义是,协程创建后是否马上运行

运行:

  • 开始运行协程函数,如果出现异常会调用 unhandled_exception()去处理

  • 如果遇到 co_yield var 这样的表达式,表示想要挂起当前协程,返回一个值给 caller 店, 编译器调用 yield_value(var)方法,我们可以此时将值设置到 Result 的相关变量中,编译器会继续根据函数的返回值判断是否为 suspend_always 判断要返回到 caller 点

  • 如果 co_return 这样的表达式,想要结束协程返回一个对象,则会调用 return_value()这个函数,设置好要返回的相关值; 如果整个协程都没有出现 co_return,则会调用 return_void()

结束善后:

  • 最后调用 final_suspend() 判断协程已处理完毕释放前是否要挂起

其中有一个重要的关键字--co_await, 这是一个一元操作符,操作的对象为 awaitable 类型,就是实现 await_ready(), await_resume(), await_suspend( ) 的类型,如例子所示的 AWaitableObject

struct AWaitableObject{	AWaitableObject() {}    	bool await_ready() const {return false;}
int await_resume() { return 0; }
void await_suspend(std::coroutine_handle<> handle){ coroutine_handle = handle; }};
复制代码

当使用 co_await awaitable_object 时:

  • 首先运行 await_ready( )函数,判断是否要挂起当前线程: 如果是 false,则不挂起; 如果是 true,则表示要挂起,然后会调用 await_suspend(),用于提供挂起前的处理,然后协程就被挂在这个点

  • 一旦协程被恢复运行时,继续调用 await_resume()在返回一个值到协程挂起点,如例子所示

co_await 除了显示使用之外,promise_type 的接口中凡是返回了 suspend_never/suspend_always 的地方,编译器都是通过 co_await 的方式调用这些函数的,suspend_never/suspend_always 是 awaitable 类型

struct suspend_always  {    bool await_ready() { return false; }
void await_suspend(coroutine_handle<>) {}
void await_resume() {} };
struct suspend_never { bool await_ready() { return true; }
void await_suspend(coroutine_handle<>) {}
void await_resume() {} };
复制代码

每个协程都对应一个 handle,用来管理协程的挂起和恢复,比如说 handle.resume()就是用来恢复协程的运行的

协程 handle 的获取有两种方式:

  • 第一种是通过 co_await 的 await_suspend( )方法,该方法被调用时就能拿到协程的 handle,但是这个方法肯定是不太好;

  • 另一种方法是可以从 promise_type 对象中拿到,需要使用这个方法 coroutine_handle<promise_type>::from_promise(promise_type obj)基于此,我们可以对返回值做如下改造

struct Result{  //add  Result(promise_type* obj):promise_type_ptr(obj){}  //add  void resume(){  	promise_type_ptr->resume();  }
struct promise_type { // mod Result get_return_object() { return Reuslt(this); } // add void resume(){ coroutine_handle<promise_type>::from_promise(*this).resume(); } std::suspend_never initial_suspend() { return {}; } std::suspend_never final_suspend() noexcept { return {}; } void unhandled_exception() {} suspend_aways yield_value(){} void return_void() {} Result return_value(const Result& res){ return res;} }; // add promise_type *promise_type_ptr;};
复制代码

则可以通过如下方式使用

auto result = CoroutineFunction();result.resume();
复制代码

从 promise_type 到 awaitable object,c++20 的协程目前提供的更多的是一个灵活的基础框架,离使用上还有一段距离

除此之外还有大量的优秀的协程库,比如基于 c++20 的 libcopp, cppcoro,以及不依赖微信自己实现的 libco(由于篇幅原因,libco 介绍与实现分析不放在当前文章)

3|0 协程的一些应用场景

awaitable<void> listener(){  auto executor = co_await this_coro::executor;  fn();   tcp::acceptor acceptor(executor, {tcp::v4(), 8988});  for (;;)  {    tcp::socket socket = co_await acceptor.async_accept(use_awaitable); //调用协程,体现同步性    co_spawn(executor, echo(std::move(socket)), detached);// 创建连接处理线程  }}
int main(){ try { asio::io_context io_context(1);
asio::signal_set signals(io_context, SIGINT, SIGTERM); signals.async_wait([&](auto, auto){ io_context.stop(); });
co_spawn(io_context, listener(), detached); // 创建协程,体现并发性
io_context.run(); // 开始调度 } catch (std::exception& e) { std::printf("Exception: %s\n", e.what()); }}
复制代码

在 asio 的例子中很好的介绍了协程的使用方式了,主要是不断的创建协程,让调度器调度运行,在协程运行过程对于一些会阻塞的条件,做一个非阻塞的检测中,发现条件不满足就让出 cpu,这就是常见轮转+非阻塞同步。

4|0 协程的分类

4|1 有栈协程和无栈协程

协程可以分成有栈 stackful 和无栈 stackless 两种,比如,libco 就是有栈协程, 每个协程创建的时候都会获得一块 128k 的堆内存,协程运行的时候就是使用这块堆内存当作运行栈使用,切换时候保存/恢复运行栈和相应寄存器,而无栈协程不需要这些,因为无栈协程的实现原理并不是通过切换时保存/恢复运行栈和寄存器实现的,它的实现见下,由于协程的每个中断点都是确定,那其实只需要将函数的代码再进行细分,保存好局部变量,做好调用过程的状态变化, 下面就将一个协程函数 fn 进行切分后变成一个 Struct,这样的实现相对于有栈协程而言使用的内存更少,因为有栈协程的运行栈由堆获得,必须要保证运行栈充足,然而很多时候用不到这么多的内存,会造成内存浪费;

void fn(){	int a, b, c;	a = b + c;	yield();	b = c + a;	yield();	c = a + b;}
----------------------------分割线---------------------------------Struct fn{ int a, b, c; int __state = 0; void resume(){ switch(__state) { case 0: return fn1(); case 1: return fn2(); case 2: return fn3(); } } void fn1(){ a = b + c; } void fn2(){ b = c + a; } void fn3(){ c = a + b; }};
复制代码

4|2 对称和非对称

boost.coroutine2 和 libco 这类属于非对称协程,这类协程的特点是存在调用链,有调用和返回的关系,比如说 coroutine2 中进行 source()的时候去调用协程了,协程执行到阻塞点 sink()返回,而不是让出 cpu,随便执行别的协程;

用户头像

Linux服务器开发qun720209036,欢迎来交流 2020.11.26 加入

专注C/C++ Linux后台服务器开发。

评论

发布
暂无评论
深入浅出c++协程丨C++协程实现