写点什么

一个 cpp 协程库的前世今生(二十)外部调度

作者:SkyFire
  • 2022 年 1 月 18 日
  • 本文字数:9595 字

    阅读完需:约 31 分钟

一个cpp协程库的前世今生(二十)外部调度

为了防止大家找不到代码,还是先贴一下项目链接:


GitHub - skyfireitdiy/cocpp at cocpp-0.1.0


本文将介绍一种从外部切换协程的技巧。

切换方式

cocpp 中的切换方式分为内部主动切换与外部强制切换,下来解释一下这两者的区别。

内部主动切换

内部主动切换指的是运行中的协程在执行流中主动调用切换函数让出 cpu,发生切换,有可能是调用 this_co::yield,或者是因为使用同步手段等间接调用了切换函数, 总之,这一类切换一定是协程自身发起的。

外部强制切换

与内部主动切换不同的是,对于外部强制切换,被切换的协程自身的执行流中并没有协程切换的相关调用,或者即使有协程切换的相关调用,但是在很长一段时间内都没有被执行了,这种情况下,这个协程就会长时间占用 CPU,导致同一个执行环境下其他的协程得不到调度。


因此此时就需要外部力量干预,强行将长时间占用 CPU 的协程切出去。


想要将正在运行的线程从外部中断,最容易想到的办法是使用信号机制,那么接下来我们了解一下 linux 下的信号处理流程。

信号处理流程

信号处理时机

想要了解 linux 信号处理的细节,首先需要知道信号在什么时候会被处理,是任何时候来信号都会被处理吗?显然不是的,linux 的信号只在从内核态回到用户态的时候才会被处理。既然是从内核态返回用户态,那么首先需要知道什么场景会到内核态,进程从用户态转到用户态主要有以下场景:


  • 中断

  • 系统调用

  • 异常


因为时钟中断一直再发生,因此信号“几乎”都可以被“立即”处理,注意这里的“几乎”说明在某些场景下会有例外(比如长时间陷入内核态),“立即”指的是在一个时钟周期内。


再从用户态切换到内核态运行的时候,内核会保存当前的上下文到进程的内核栈中,然后从内核态回到用户态之前,内核会检测是否有信号需要处理,如果需要处理就会调用信号处理函数。信号处理函数调用之后,再回到用户态切初始的状态。



需要注意的地方是,整个过程并不是通过函数调用完成的(直接的函数调用并不会改变程序状态)。接下来来详述一下整个流程:


  1. 用户程序触发中断/系统调用/异常时,内核将当前程序的上下文保存到进程的内核栈中(pt_regs 结构)

  2. 内核处理完毕后,如果没有信号需要处理,就将 pt_regs 中的上下文恢复,切换到用户态

  3. 如果有信号处理,此时就完全不同了。内核会将进程内核栈中的 pt_regs 拷贝一份到用户栈中(在用户栈中为 sigcontext_64 结构),为何需要这个操作呢,因为接下来需要修改内核栈中的 pt_regs,这时将内核栈中 pt_regs 中寄存器的 sp 修改到用户栈(注意此时栈中有复制的上下文信息 sigcontext_64 ),ip 被设置为信号处理函数的地址,同时将栈顶设置为一个 rt_sigreturn 函数的函数地址。

  4. 信号处理函数执行结束,会由于栈顶是 rt_sigreturn 地址,所以执行 rt_sigreturn 函数,此函数是一个系统调用,因此又会再次陷入内核态,在系统调用中,将用户栈保存的上下文恢复到内核栈,然后返回用户态。


内核栈与用户栈的变化如下所示:


使用信号处理触发切换

从上面的介绍我们可以看到,在执行信号处理过程中,用户态上下文信息在用户栈中是有一个备份的,在信号处理函数执行结束之后,内核会使用这个备份来恢复执行上下文。因此我们可以通过修改这个备份的用户态上下文信息来实现协程切换。


接下来以几个最精简的例子来解释这一过程。

迭代一:找到对应的结构体

示例代码如下:


#include <cstdio>#include <cstdlib>#include <signal.h>#include <unistd.h>
using namespace std;
struct sigcontext_64{ unsigned long long r8; // 0*8 unsigned long long r9; // 1*8 unsigned long long r10; // 2*8 unsigned long long r11; // 3*8 unsigned long long r12; // 4*8 unsigned long long r13; // 5*8 unsigned long long r14; // 6*8 unsigned long long r15; // 7*8 unsigned long long di; // 8*8 unsigned long long si; // 9*8 unsigned long long bp; // 10*8 unsigned long long bx; // 11*8 unsigned long long dx; // 12*8 unsigned long long ax; // 13*8 unsigned long long cx; // 14*8 unsigned long long sp; // 15*8 unsigned long long ip; // 16*8 unsigned long long flags; // 17*8
// 后面这部分获取不到,暂时不用 // unsigned short cs; //18*8 // unsigned short gs; //18*8+2 // unsigned short fs; //18*8+4 // unsigned short __pad0; //18*8+6 // unsigned long long err; //19*8 // unsigned long long trapno; //20*8 // unsigned long long oldmask; //21*8 // unsigned long long cr2; //22*8 // unsigned long long fpstate; //23*8 // unsigned long long reserved1[8]; //24*8};
void printf_info(sigcontext_64* context){ printf("r8: %llx\n", context->r8); printf("r9: %llx\n", context->r9); printf("r10: %llx\n", context->r10); printf("r11: %llx\n", context->r11); printf("r12: %llx\n", context->r12); printf("r13: %llx\n", context->r13); printf("r14: %llx\n", context->r14); printf("r15: %llx\n", context->r15); printf("di: %llx\n", context->di); printf("si: %llx\n", context->si); printf("bp: %llx\n", context->bp); printf("bx: %llx\n", context->bx); printf("dx: %llx\n", context->dx); printf("ax: %llx\n", context->ax); printf("cx: %llx\n", context->cx); printf("sp: %llx\n", context->sp); printf("ip: %llx\n", context->ip); printf("flags: %llx\n", context->flags);}
static void switch_signal_handler(int signo){ sigcontext_64* context = nullptr; __asm volatile("leaq 88(%%rsp), %%rax\t\n" "movq %%rax, %0\t\n" : "=m"(context) : : "memory", "rax"); printf_info(context); abort();}
int main(){ signal(SIGSEGV, switch_signal_handler); int* p = (int*)5; *p = 0; return 0;}
复制代码


此处定义了一个 sigcontext_64 结构体,这个结构体是从内核代码抄过来的。就是上文中提到用户栈中的上下文信息。


在 main 函数中我们对地址 5 写入数据,触发了错误信号。此时进入 switch_signal_handler 处理函数。


在这里函数中,我们使用汇编取到 sigcontext_64 结构体的起始地址,这里有一个从当前 rsp 偏移 88 的操作,别问我是怎么算出来的,这个我根本就没有算,我是通过调试器打印栈内存推出来的。


拿到上下文信息之后,我们将上下文信息打印出来,与实际的上下文做比较验证(借助调试器):


(gdb) rStarting program: /home/skyfire/code/test/test_sig/build/linux/x86_64/debug/main 
Program received signal SIGSEGV, Segmentation fault.0x00005555555553b3 in main () at main.cpp:8080 *p = 0;(gdb) i registers rax 0x5 5rbx 0x0 0rcx 0x0 0rdx 0x7fffffffdd80 140737488346496rsi 0x7fffffffdce0 140737488346336rdi 0xb 11rbp 0x7fffffffdf90 0x7fffffffdf90rsp 0x7fffffffdf80 0x7fffffffdf80r8 0x7fffffffded0 140737488346832r9 0x0 0r10 0x8 8r11 0x202 514r12 0x555555555070 93824992235632r13 0x0 0r14 0x0 0r15 0x0 0rip 0x5555555553b3 0x5555555553b3 <main()+37>eflags 0x10206 [ PF IF RF ]cs 0x33 51ss 0x2b 43ds 0x0 0es 0x0 0fs 0x0 0gs 0x0 0(gdb) cContinuing.r8: 7fffffffded0r9: 0r10: 8r11: 202r12: 555555555070r13: 0r14: 0r15: 0di: bsi: 7fffffffdce0bp: 7fffffffdf90bx: 0dx: 7fffffffdd80ax: 5cx: 0sp: 7fffffffdf80ip: 5555555553b3flags: 10206
Program received signal SIGABRT, Aborted.__GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:4949 ../sysdeps/unix/sysv/linux/raise.c: No such file or directory.(gdb)
复制代码


从上面的结果来看,我们获取到的上下文是正确的。

迭代二:通过上下文信息切换协程

接下来补充协程的这一部分:


#include <chrono>#include <cstdio>#include <cstdlib>#include <signal.h>#include <sys/syscall.h>#include <sys/types.h>#include <thread>#include <unistd.h>
using namespace std;
struct sigcontext_64{ unsigned long long r8; // 0*8 unsigned long long r9; // 1*8 unsigned long long r10; // 2*8 unsigned long long r11; // 3*8 unsigned long long r12; // 4*8 unsigned long long r13; // 5*8 unsigned long long r14; // 6*8 unsigned long long r15; // 7*8 unsigned long long di; // 8*8 unsigned long long si; // 9*8 unsigned long long bp; // 10*8 unsigned long long bx; // 11*8 unsigned long long dx; // 12*8 unsigned long long ax; // 13*8 unsigned long long cx; // 14*8 unsigned long long sp; // 15*8 unsigned long long ip; // 16*8 unsigned long long flags; // 17*8
// 后面这部分获取不到,暂时不用 // unsigned short cs; //18*8 // unsigned short gs; //18*8+2 // unsigned short fs; //18*8+4 // unsigned short __pad0; //18*8+6 // unsigned long long err; //19*8 // unsigned long long trapno; //20*8 // unsigned long long oldmask; //21*8 // unsigned long long cr2; //22*8 // unsigned long long fpstate; //23*8 // unsigned long long reserved1[8]; //24*8};
sigcontext_64 co1_ctx;sigcontext_64 co2_ctx;long long co2_stack[1024];int curr_co = 1;
void switch_co(sigcontext_64* context){ if (curr_co == 1) { curr_co = 2; co1_ctx = *context; *context = co2_ctx; } else { curr_co = 1; co2_ctx = *context; *context = co1_ctx; }}
static void switch_signal_handler(int signo){ sigcontext_64* context = nullptr; __asm volatile("leaq 88(%%rsp), %%rax\t\n" "movq %%rax, %0\t\n" : "=m"(context) : : "memory", "rax"); printf("switch\n"); switch_co(context);}
void co1(){ while (true) { printf("co1, thread %d\n", gettid()); std::this_thread::sleep_for(std::chrono::milliseconds(500)); }}
void co2_entry(){ while (true) { printf("co2, thread %d\n", gettid()); std::this_thread::sleep_for(std::chrono::milliseconds(500)); }}
void switch_signal_sender(pid_t tid){ while (true) { std::this_thread::sleep_for(std::chrono::seconds(1)); ::syscall(SYS_tgkill, ::getpid(), tid, SIGUSR1); }}
int main(){ signal(SIGUSR1, switch_signal_handler); co2_ctx.ip = (long long)co2_entry; co2_ctx.sp = (long long)&co2_stack[1024] - 8; co2_ctx.bp = co2_ctx.sp; auto tid = gettid(); thread th(switch_signal_sender, tid); co1(); th.join(); return 0;}
复制代码


代码比较长,但是并不复杂。


全局变量 co1_ctx 和 co2_ctx 分别用来保存和恢复两个协程的上下文,co2_stack 是 co2 的栈(co1 的栈就是线程的栈),curr_co 表示当前正在运行的 co。


接下来看一下 main 函数。


  • 首先注册了信号处理函数,信号处理函数中做了一些修改,在获取到当前的上下文之后调用了 switch_co,该函数中根据当前运行的是哪一个 co,然后保存当前 co 上下文并且恢复下一个将要运行的 co 上下文。

  • 设置 co2 的上下文,因为 co2 不是直接调用的,而是需要在第 1 次切换的时候启动。

  • 启动一个外部切换的线程,该线程每秒会发一次 SIGUSR1 信号来触发切换。

  • 调用 co1


当 co1 正在运行的时候,收到来自切换线程的切换信号,进入信号处理函数,信号处理函数将备份的上下文修改为 co2 的上下文,然后信号处理函数返回的时候,就自然切换到 co2 中执行了,对于 co2 切换到 co1,原理相同,不再赘述。

其输出结果如下:


co1, thread 21894co1, thread 21894switchco2, thread 21894co2, thread 21894switchco1, thread 21894co1, thread 21894switchco2, thread 21894co2, thread 21894switchco1, thread 21894co1, thread 21894switchco2, thread 21894co2, thread 21894
复制代码

cocpp 中的处理

监控任务

外部强制切换的触发是由监控任务进行的,所以我们来先看一下监控任务。


监控任务的创建在 co_manager 的构造函数中(source/cocpp/core/co_manager.cpp)。


co_manager::co_manager(){    subscribe_manager_event__();    setup_switch_handler();    create_background_task__();} 
复制代码


此处调用了三个函数:


  • subscribe_manager_event__ 注册 manager 的事件处理

  • setup_switch_handler 设置外部切换的回调函数

  • create_background_task__创建后台线程


先看一下 subscribe_manager_event__(source/cocpp/core/co_manager.cpp):


void co_manager::subscribe_manager_event__(){    timing_routine_timout().sub([this] {        // 每两次超时重新分配一次        static bool double_timeout = false;
// 强制重新调度 force_schedule__();
// 如果是第二次超时 if (double_timeout) { // 重新调度 redistribute_ctx__(); // 偷取ctx steal_ctx_routine__(); // 销毁多余的env destroy_redundant_env__(); // 释放内存 free_mem__(); } double_timeout = !double_timeout; });}
复制代码


这里为 timing_routine_timout 事件注册了回调函数。


回调函数中,本文仅关心 force_schedule__函数,及外部强制调度函数。


force_schedule__函数的实现在下文会详细介绍,此时知道他是用来做强制调度的就可以了。


第二个函数 setup_switch_handler ,实现如下(source/cocpp/core/co_vos_gcc_x86_64.cpp):


void setup_switch_handler(){    ::signal(CO_SWITCH_SIGNAL, switch_signal_handler);} 
复制代码


此处使用::signal 注册了信号处理函数。


最后看一下第 3 个函数 create_background_task__(source/cocpp/core/co_manager.cpp):


void co_manager::create_background_task__(){    background_task__.emplace_back(std::async(std::launch::async, [this]() {        clean_env_routine__();    }));    background_task__.emplace_back(std::async(std::launch::async, [this]() {        timer_routine__();    }));
background_task_created().pub();}
复制代码


前面的函数为定时器超时事件注册了回调函数,而这里就是启动定时器线程,定时器执行函数为 timer_routine__。该函数实现非常简单(source/cocpp/core/co_manager.cpp):


void co_manager::timer_routine__(){    std::unique_lock<co_spinlock> lck(clean_up_lock__);    while (!clean_up__)    {        lck.unlock();        std::this_thread::sleep_for(timing_duration());        timing_routine_timout().pub();        lck.lock();    }    timing_routine_finished().pub();}
复制代码


使用循环睡眠来实现最简单的定时器,这对于外部强制调度任务来说已经足够了。


总结一下整个流程:manager 创建的时候,启动定时器、注册了定时器的超时处理函数为强制调度,同时设置了调度需要的信号。

force_schedule__

接下来看一下关键函数 force_schedule__的实现(source/cocpp/core/co_manager.cpp):


void co_manager::force_schedule__(){    std::scoped_lock lock(env_set__.expired_lock, env_set__.normal_lock, clean_up_lock__);    if (clean_up__)    {        return;    }    for (auto& env : env_set__.normal_set)    {        // 如果检测到某个env被阻塞了,先锁定对应env的调度,防止在操作的时候发生调度,然后收集可转移的ctx        if (env->is_blocked())        {            // 强行外部调度            send_switch_from_outside_signal(env);        }        env->reset_scheduled_flag();    }} 
复制代码


这个函数遍历所有的 env,检测 env 是否被阻塞,is_blocked 函数的判断逻辑是 env 状态为 busy,并且未设置 CO_ENV_FLAG_SCHEDULED 标识(该标识在每次切换的时候都会被设置),这说明在此 env 上有协程被调度,但是在一个检测周期内没有发生调度了。


如果 env 被阻塞,对 env 调用 send_switch_from_outside_signal 进行切换。


然后调用 reset_scheduled_flag 清除 CO_ENV_FLAG_SCHEDULED 标识。


接下来看一下 send_switch_from_outside_signal 函数。

send_switch_from_outside_signal

此函数非常简单(source/cocpp/core/co_vos_gcc_x86_64.cpp):


void send_switch_from_outside_signal(co_env* env){    ::syscall(SYS_tgkill, ::getpid(), static_cast<pid_t>(env->schedule_thread_tid()), CO_SWITCH_SIGNAL);}
复制代码


就是对 env 对应的调度线程发送一个 CO_SWITCH_SIGNAL 信号。这个信号的处理函数在 manager 初始化的时候被 setup_switch_handler 设置为 switch_signal_handler。


当线程收到 CO_SWITCH_SIGNAL 信号时,会触发 switch_signal_handler 执行,接下来看一下 switch_signal_handler:

switch_signal_handler

函数实现位于 source/cocpp/core/co_vos_gcc_x86_64.cpp:


static void switch_signal_handler(int signo){    sigcontext_64* context = nullptr;    __asm volatile("leaq 88(%%rsp), %%rax\t\n"                   "movq %%rax, %0\t\n"                   : "=m"(context)                   :                   : "memory", "rax");    switch_from_outside(context);}
复制代码


此函数从用户栈中获取了上下文,然后以上下文的地址为参数调用 switch_from_outside 函数。


接下来看一下 switch_from_outside 函数。

switch_from_outside

函数实现如下(source/cocpp/core/co_vos_gcc_x86_64.cpp):


void switch_from_outside(sigcontext_64* context){    auto env = co::current_env();
lock_interrupt(); if (!can_interrupt()) { unlock_interrupt(); return; }
co_ctx* curr = nullptr; co_ctx* next = nullptr;
if (!env->prepare_to_switch(curr, next)) { unlock_interrupt(); return; } save_context_to_ctx(context, curr); restore_context_from_ctx(context, next); unlock_interrupt(); // 能调用到此处,说明当前一定是在安全点内}
复制代码


注意一个调用 can_interrupt,此函数返回 env 当前状态是否可以被中断。当判断可以被中断的时候,使用 prepare_to_switch 函数选择下一个 ctx,然后调用 save_context_to_ctx 和 restore_context_from_ctx 完成上下文的保存与恢复。这两个函数的实现如下,前文有类似代码,不再赘述。


static void save_context_to_ctx(sigcontext_64* context, co_ctx* ctx){    *reinterpret_cast<sigcontext_64*>(ctx->regs()) = *context;}
static void restore_context_from_ctx(sigcontext_64* context, co_ctx* ctx){ *context = *reinterpret_cast<sigcontext_64*>(ctx->regs());}
复制代码

安全点

上面提到一个函数 can_interrupt,这个函数用于判断当前状态是否可以中断执行,与他相关实现如下(source/cocpp/core/co_vos_gcc_x86_64.cpp):


static thread_local int     interrupt_lock_count__;    // 当前中断锁的深度
bool can_interrupt(){ return interrupt_lock_count__ == 0;}
void lock_interrupt(){ mu_interrupt_lock_count__.lock();}
void unlock_interrupt(){ mu_interrupt_lock_count__.unlock();}
void increate_interrupt_lock_count(){ ++interrupt_lock_count__;}
void decreate_interrupt_lock_count(){ --interrupt_lock_count__;}
void increate_interrupt_lock_count_with_lock(){ std::scoped_lock lock(mu_interrupt_lock_count__); ++interrupt_lock_count__;}
void decreate_interrupt_lock_count_with_lock(){ std::scoped_lock lock(mu_interrupt_lock_count__); --interrupt_lock_count__;}
复制代码


只有当中断深度为 0(表示没有禁止中断)的时候,才能切换。


这里是中断的底层实现,我们看一下上层的封装 co_interrupt_closer(include/cocpp/core/co_interrupt_closer.h 和 source/cocpp/core/co_interrupt_closer.cpp):


class co_interrupt_closer final{public:    co_interrupt_closer();    ~co_interrupt_closer();};
co_interrupt_closer::co_interrupt_closer(){ lock_interrupt(); increate_interrupt_lock_count(); unlock_interrupt();}
co_interrupt_closer::~co_interrupt_closer(){ lock_interrupt(); decreate_interrupt_lock_count(); unlock_interrupt();}
复制代码


本质上就是利用 RAII 实现对范围内的中断锁控制。


我们将可中断的代码称为安全点。


接下来我们讨论一下为何需要安全点。


假设我们的代码运行到了协程切换,正在选择下一个需要执行哪个 ctx,这个流程需要对协程列表加锁保护,此时发生外部信号,打断了当前流程,然后进行调度,此时信号处理函数也需要对协程列表进行加锁,因此发生死锁。


所以,并不是每个地方都可以从外部调度。因此需要设立 interrupt_lock_count__来保护代码。


一个简单的做法就是,当代码运行到 cocpp 框架中时,我们认为是不安全的。而在用户自己的协程上下文中,我认为是安全的。


因此在调度线程启动的时候就可以将中断关闭,在切换上下文之前打开中断,从切换到上下文返回之后,再次关闭中断。


其控制代码位于 start_schedule_routine__、schedule_switch、switch_shared_stack_ctx__、switch_normal_ctx__等函数中,代码篇幅比较长就不再粘贴了。

总结

本文讨论了从外部如何切换一个长时间运行的协程,并结合具体的代码案例来深入理解其机制,这一部分功能需要对内核信号处理的实现有一定的理解。在此基础上讨论了安全点存在的必要性以及其实现方法。

发布于: 2022 年 01 月 18 日阅读数: 1270
用户头像

SkyFire

关注

这个cpper很懒,什么都没留下 2018.10.13 加入

会一点点cpp的苦逼码农

评论 (2 条评论)

发布
用户头像
"这里有一个从当前 rsp 偏移 88 的操作,别问我是怎么算出来的,这个我根本就没有算,我是通过调试器打印栈内存推出来的。"

大佬,你贴个图呢?
2022 年 01 月 31 日 19:50
回复
没有呢,我当时看着调试器一算就完了……需要的话,我专门写篇文章?
2022 年 02 月 06 日 13:51
回复
没有更多了
一个cpp协程库的前世今生(二十)外部调度_c++_SkyFire_InfoQ写作社区