写点什么

一个 cpp 协程库的前世今生(十)调度的流程

作者:SkyFire
  • 2022 年 1 月 04 日
  • 本文字数:7143 字

    阅读完需:约 23 分钟

一个cpp协程库的前世今生(十)调度的流程

为了防止大家找不到代码,还是先贴一下项目链接:


GitHub - skyfireitdiy/cocpp at cocpp-0.1.0


上一篇介绍了任意协程入口函数是怎么准换到规范化的入口函数的,本篇分析一下一个协程从加入调度环境到运行结束被清理的整个流程。

创建 ctx

一个协程与一个 ctx 关联,所以创建协程就是创建 ctx。


让一个 ctx 运行起来共以下几个步骤:


  • 创建 ctx

  • 选择 env

  • 初始化 ctx

  • 加入调度列表


具体实现见 source/cocpp/core/co_manager.cpp:


co_ctx* co_manager::create_and_schedule_ctx(const co_ctx_config& config, std::function<void(co_any&)> entry, bool lock_destroy){    auto ctx = factory_set__.ctx_factory->create_ctx(config, entry);    subscribe_ctx_event__(ctx);    if (lock_destroy)    {        ctx->lock_destroy();    }    auto bind_env = ctx->config().bind_env;    if (bind_env != nullptr)    {        bind_env->add_ctx(ctx);    }    else    {        get_best_env__()->add_ctx(ctx);    }
ctx_created().pub(ctx); return ctx;}
复制代码


接下来就对这里的细节做一下分析。

创建 ctx

上面实现可以看出,ctx 是由 manager 通过工厂创建的,创建完成后,ctx 的大多数成员就都初始化了,除了:


  • 共享栈

  • 寄存器


创建 ctx 的具体实现在 source/cocpp/core/co_ctx_factory.cpp:


co_ctx* co_ctx_factory ::create_ctx(const co_ctx_config& config, std::function<void(co_any&)> entry){    auto ret = ctx_pool__.create_obj(config.shared_stack ? nullptr : co_stack_factory::instance()->create_stack(config.stack_size), config, entry);    assert(ret != nullptr);    if (config.bind_env != nullptr)    {        ret->set_flag(CO_CTX_FLAG_BIND);    }    if (config.shared_stack)    {        ret->set_flag(CO_CTX_FLAG_SHARED_STACK);    }    return ret;}
复制代码


ctx_pool__.create_obj 会调用 ctx 的构造函数(source/cocpp/core/co_ctx.cpp):


co_ctx::co_ctx(co_stack* stack, const co_ctx_config& config, std::function<void(co_any&)> entry)    : stack__(stack)    , config__(config)    , entry__(entry){    set_priority(config.priority);}
复制代码

选择 env

在第一阶段初始化完成后,manager 就会为新的 ctx 选择一个 env,选择的方法在 source/cocpp/core/co_manager.cpp,此处先不关注绑定 env 的这种场景,在这种场景下,不用选择 env:


co_env* co_manager::get_best_env__(){    std::lock_guard<std::recursive_mutex> lck(env_set__.normal_lock);    if (env_set__.normal_set.empty())    {        return create_env(true);    }
co_env* best_env = nullptr; auto min_workload = std::numeric_limits<int>::max(); size_t can_schedule_env_count = 0; for (auto& env : env_set__.normal_set) { if (env->state() == co_env_state::idle) { best_env_got().pub(env); return env; } if (!env->can_schedule_ctx()) { continue; } // 统计可用于调度的env数量 ++can_schedule_env_count; if (env->workload() < min_workload) { min_workload = env->workload(); best_env = env; } } // 如果没有可用的env,就创建 if (best_env == nullptr) { auto ret = create_env(true); best_env_got().pub(ret); return ret; }
// 如果可用于调度的env数量小于基础线程数量,创建一个来调度新的ctx if (can_schedule_env_count < env_set__.base_env_count) { auto ret = create_env(true); best_env_got().pub(ret); return ret; } best_env_got().pub(best_env); return best_env;}
复制代码


这里有几个步骤:


  1. 目前 env 列表为空时,创建一个 env 直接返回

  2. 遍历 env 列表,找到具有 ctx 调度能力并且工作负载最小的 env(注意:此处不是直接返回的)

  3. 如果上面步骤 2 没有找到合适的 env,创建一个新的 env 返回

  4. 如果上面步骤 2 找到了合适的 env,但是目前 env 的数量小于 env 数量的最小阈值,就创建 env 并返回

  5. 返回找到的 env


可以看出,虽然步骤 2 尝试找工作负载最小的 env,但是即使找到了也不一定要用。


工作负载的计算方法目前是以可调度的 ctx 数量为评价标准,见 source/cocpp/core/co_env.cpp:


int co_env::workload() const{    std::scoped_lock lock(mu_normal_ctx__, mu_min_priority__);
size_t ret = 0; for (unsigned int i = min_priority__; i < all_normal_ctx__.size(); ++i) { ret += all_normal_ctx__[i].size(); } return ret;}
复制代码

初始化

ctx 的初始化在 co_env::add_ctx 中完成,见 source/cocpp/core/co_env.cpp:


void co_env::add_ctx(co_ctx* ctx){    assert(ctx != nullptr);    if (state() == co_env_state::created || state() == co_env_state::destorying)    {        throw co_error("env state error");    }    init_ctx(shared_stack__, ctx); // 初始化ctx    ctx_initted().pub(ctx);
move_ctx_to_here(ctx);
ctx_added().pub(ctx);}
复制代码


这里调用了 ctx 的初始化函数 init_ctx,此函数主要初始化共享栈指针和 ctx 的寄存器信息(source/cocpp/core/co_vos_gcc_x86_64.cpp):


void init_ctx(co_stack* shared_stack, co_ctx* ctx){    auto      context = get_sigcontext_64(ctx);    co_stack* stack   = ctx->stack();    if (ctx->test_flag(CO_CTX_FLAG_SHARED_STACK))    {        stack = shared_stack;    }    auto config = ctx->config();
CO_SETREG(context, sp, stack->stack_top()); CO_SETREG(context, bp, stack->stack_top()); CO_SETREG(context, ip, &co_ctx::real_entry); CO_SETREG(context, di, ctx);}
复制代码


如果 ctx 有共享栈标识,就将传入的共享栈设置到 stack__成员上,如果没有共享栈标识,忽略这个参数,这种场景下 stack__以及被指向 ctx 的私有栈空间了。


然后是设置一些寄存器的值:rsp、rbp、rip、rdi,这个前面的文章有分析过,这个不做赘述。

添加到调度列表

co_env::move_ctx_to_here 会将初始化好的 ctx 加入 env 的调度列表中(source/cocpp/core/co_env.cpp)。


void co_env::move_ctx_to_here(co_ctx* ctx){    assert(ctx != nullptr);    assert(state() != co_env_state::created && state() != co_env_state::destorying);
ctx->set_env(this);
{ std::lock_guard<co_spinlock> lock(mu_normal_ctx__); all_normal_ctx__[ctx->priority()].push_back(ctx); }
update_min_priority__(ctx->priority()); set_state(co_env_state::busy);
wake_up(); ctx_received().pub(ctx);}
复制代码


此函数会将 ctx 中的 env 指针赋值,将 ctx 加入调度列表,然后更新最新小优先级(可能新增的 ctx 优先级比目前最高的还高),然后设置 env 状态为 busy,最后将 env 唤醒(此时 env 可能由于没有协程调度陷入休眠,所以需要唤醒,唤醒细节在后面的锁相关章节介绍)。

调度

调度循环

调度线程启动之后,会在新线程里面运行 co_env::start_schedule_routine__这个函数(source/cocpp/core/co_env.cpp):


void co_env::start_schedule_routine__(){    co_interrupt_closer interrupt_closer;    schedule_thread_tid__ = gettid();    schedule_started().pub();    reset_flag(CO_ENV_FLAG_NO_SCHE_THREAD);    set_state(co_env_state::idle);    while (state() != co_env_state::destorying)    {        schedule_switch();
// 切换回来检测是否需要执行共享栈切换 if (shared_stack_switch_info__.need_switch) { continue; }
set_state(co_env_state::idle); // 切换到idle协程,说明空闲了
sleep_if_need(); }
remove_all_ctx__(); task_finished().pub(); current_env__ = nullptr;}
复制代码


切换就是将执行流从当前协程切换到选中的协程,这就是 schedule_switch 函数承担的职责。


schedule_switch 的实现如下(source/cocpp/core/co_env.cpp):


void co_env::schedule_switch(){    lock_schedule();    co_interrupt_closer interrupt_closer;    remove_detached_ctx__();    if (shared_stack_switch_info__.need_switch)    {        switch_shared_stack_ctx__();    }    else    {        switch_normal_ctx__();    }    unlock_schedule();}
复制代码


此处我们暂不分析共享栈切换。所以直接分析 switch_normal_ctx__函数(source/cocpp/core/co_env.cpp):


void co_env::switch_normal_ctx__(){    co_ctx* curr = nullptr;    co_ctx* next = nullptr;    {        if (!prepare_to_switch(curr, next))        {            return;        }    }    unlock_schedule();    switch_to(curr->regs(), next->regs());    lock_schedule();    switched_to().pub(curr);}
复制代码

选择下一个 ctx

此函数首先调用 prepare_to_switch 选择 ctx,然后使用 switch_to 切换。先看一下 prepare_to_switch 是如何选择下一个 ctx 的(source/cocpp/core/co_env.cpp):


bool co_env::prepare_to_switch(co_ctx*& from, co_ctx*& to){    co_ctx* curr = current_ctx();    co_ctx* next = next_ctx__();
assert(curr != nullptr); assert(next != nullptr);
set_flag(CO_ENV_FLAG_SCHEDULED);
update_ctx_state__(curr, next); if (curr == next) { return false; } if (next != idle_ctx__) { set_state(co_env_state::busy); }
if (curr->test_flag(CO_CTX_FLAG_SHARED_STACK) || next->test_flag(CO_CTX_FLAG_SHARED_STACK)) { // 共享栈相关代码,暂时不看 }
from = curr; to = next;
return true;}
复制代码


可以看到此处调用了 next_ctx__决定下一个执行哪个 ctx(source/cocpp/core/co_env.cpp):


co_ctx* co_env::next_ctx__(){    // 如果要销毁、并且可以销毁,切换到idle销毁    if (state() == co_env_state::destorying)    {        return idle_ctx__;    }    else    {        auto next = choose_ctx_from_normal_list__();        return next == nullptr ? idle_ctx__ : next;    }}
co_ctx* co_env::choose_ctx_from_normal_list__(){ std::scoped_lock lock(mu_normal_ctx__, mu_curr_ctx__, mu_min_priority__); for (unsigned int i = min_priority__; i < all_normal_ctx__.size(); ++i) { for (auto& ctx : all_normal_ctx__[i]) { if (ctx->can_schedule()) { auto ret = ctx; all_normal_ctx__[i].remove(ctx); all_normal_ctx__[i].push_back(ret); min_priority__ = i; curr_ctx__ = ret; return ret; } } } curr_ctx__ = nullptr; return nullptr;}
复制代码


从上面的实现我们可以看到选择下一个 ctx 的流程就是从当前最高优先级的位置开始遍历,找到第一个可以被调度的 ctx,然后更新当前最高的优先级。注意这里有一个细节,在选中下一个 ctx 的时候,会将它移动到当前优先级列表的末尾,这是为了实现轮询,防止每次都选中同一个 ctx。


在 co_env::prepare_to_switch 选中了下一个 ctx 以后,就会更新 ctx 状态,然后使用 switch_to 进行切换。


更新状态的实现如下(source/cocpp/core/co_env.cpp):


void co_env::update_ctx_state__(co_ctx* curr, co_ctx* next){    // 如果当前运行的ctx已经完成,状态不变    if (curr->state() != co_state::finished)    {        curr->set_state(co_state::suspended);    }    next->set_state(co_state::running);}
复制代码


对于当前状态为非 finished 状态的,状态转换为 co_state::suspended,等待下一次调度。选中的下一个 ctx 更新为 co_state::running 状态。

切换

switch_to 函数完成切换,是由汇编完成的(source/cocpp/core/co_vos_gcc_x86_64.cpp)。



void switch_to(co_byte** curr, co_byte** next){
__asm volatile("" :: : "memory");
__asm volatile("popq %rbp");
__asm volatile("movq %r8, 0(%rdi)"); __asm volatile("movq %r9, 8(%rdi)"); __asm volatile("movq %r10, 16(%rdi)"); __asm volatile("movq %r11, 24(%rdi)"); __asm volatile("movq %r12, 32(%rdi)"); __asm volatile("movq %r13, 40(%rdi)"); __asm volatile("movq %r14, 48(%rdi)"); __asm volatile("movq %r15, 56(%rdi)"); __asm volatile("movq %rdi, 64(%rdi)"); __asm volatile("movq %rsi, 72(%rdi)"); __asm volatile("movq %rbp, 80(%rdi)"); __asm volatile("movq %rbx, 88(%rdi)"); __asm volatile("movq %rdx, 96(%rdi)"); __asm volatile("movq %rax, 104(%rdi)"); __asm volatile("movq %rcx, 112(%rdi)"); __asm volatile("popq 128(%rdi)"); __asm volatile("pushf"); __asm volatile("popq 136(%rdi)");
__asm volatile("movq %rsp, 120(%rdi)"); // rsp必须在rip后保存,先恢复 ///////////////////////////////////////////////// __asm volatile("movq 120(%rsi), %rsp");
__asm volatile("pushq 136(%rsi)"); __asm volatile("popf"); __asm volatile("pushq 128(%rsi)"); __asm volatile("movq 112(%rsi), %rcx"); __asm volatile("movq 104(%rsi), %rax"); __asm volatile("movq 96(%rsi), %rdx"); __asm volatile("movq 88(%rsi), %rbx"); __asm volatile("movq 80(%rsi), %rbp"); __asm volatile("movq 64(%rsi), %rdi"); __asm volatile("movq 56(%rsi), %r15"); __asm volatile("movq 48(%rsi), %r14"); __asm volatile("movq 40(%rsi), %r13"); __asm volatile("movq 32(%rsi), %r12"); __asm volatile("movq 24(%rsi), %r11"); __asm volatile("movq 16(%rsi), %r10"); __asm volatile("movq 8(%rsi), %r9"); __asm volatile("movq 0(%rsi), %r8"); __asm volatile("movq 72(%rsi), %rsi"); // 必须放在最后恢复
__asm volatile("pushq %rbp");
__asm volatile("" :: : "memory");}
复制代码


这里保存和恢复的寄存器较多,具体原因在后面强制调度章节再介绍。

完成

开始调度后,流程就会走到规范化协程入口函数(source/cocpp/core/co_ctx.cpp):


void co_ctx::real_entry(co_ctx* ctx){    ctx->entry()(ctx->ret_ref());    // CO_DEBUG("ctx %s %p finished", ctx->config().name.c_str(), ctx);    ctx->set_state(co_state::finished);    ctx->finished().pub();    assert(ctx->env() != nullptr);    ctx->env()->schedule_switch(); // 此处的ctx对应的env不可能为空,如果为空,这个ctx就不可能被调度}
复制代码


在函数执行过程中,可能会经历多次切出切入,但是最终都会执行结束返回到此处。


在执行结束后,会将状态设置为 co_state::finished,然后再进行一次切换,这次切换出去后就不再切换回来了(选择下一个 ctx 会跳过 finished 状态的 ctx)。

分离

注意:分离阶段不一定是在完成阶段之后才发生的。


之所以有分离这个阶段,是因为只有过了这个阶段,finished 状态的 ctx 才能被销毁。


先看一下等分离的实现。

等待

等待的实现如下(source/cocpp/interface/co.cpp):


void co::detach(){    if (ctx__ == nullptr)    {        return;    }    ctx__->unlock_destroy();    ctx__ = nullptr;}
复制代码


实现非常简单,就是调用了 unlock_destroy 函数(source/cocpp/core/co_ctx.cpp):


void co_ctx::unlock_destroy(){    reset_flag(CO_CTX_FLAG_LOCKED);    unlocked_destroy().pub();}
复制代码


此函数清除了 CO_CTX_FLAG_LOCKED 标识,清除这个标识说明这个 ctx 可以被框架回收了。

销毁

在每次调度的时候,框架都会尝试清理掉可以被销毁的 ctx(source/cocpp/core/co_env.cpp):


void co_env::remove_detached_ctx__(){    auto curr    = current_ctx();    auto all_ctx = all_scheduleable_ctx__();    for (auto& ctx : all_ctx)    {        // 注意:此处不能删除当前的ctx,如果删除了,switch_to的当前上下文就没地方保存了        if (ctx->state() == co_state::finished && ctx->can_destroy() && ctx != curr)        {            remove_ctx(ctx);        }    }}
复制代码


注意,这里跳过了正在运行的 ctx,因为本次切换还需要将正在运行的上下文保存到当前 ctx 中。


can_destroy 的结果的决定性因素之一就是上面提到的 CO_CTX_FLAG_LOCKED 标识,所以才会说因为只有过了分离这个阶段,finished 状态的 ctx 才能被销毁。

总结

调度流程可以简单以下图表示:



可以对照源码理解。


本文分析一个协程的整个生命周期从创建到销毁的主要流程,设计到大量代码实现,可以运行一下项目中的例子,使用调试器跟踪以加深理解。

发布于: 3 小时前
用户头像

SkyFire

关注

这个cpper很懒,什么都没留下 2018.10.13 加入

会一点点cpp的苦逼码农

评论

发布
暂无评论
一个cpp协程库的前世今生(十)调度的流程