写点什么

一个 cpp 协程库的前世今生(八)env 的状态与标识位

作者:SkyFire
  • 2022 年 1 月 02 日
  • 本文字数:9625 字

    阅读完需:约 32 分钟

一个cpp协程库的前世今生(八)env的状态与标识位

为了防止大家找不到代码,还是先贴一下项目链接:


GitHub - skyfireitdiy/cocpp at cocpp-0.1.0


前一篇文章介绍了 ctx 的状态与标识位,本文将介绍 env 的相关状态与标识位。

标识

env 的标识定义在 include/cocpp/core/co_define.h 中:


constexpr int CO_ENV_FLAG_NO_SCHE_THREAD    = 0; // 没有调度线程constexpr int CO_ENV_FLAG_COVERTED          = 1; // 从正常线程转换来的调度线程constexpr int CO_ENV_FLAG_SCHEDULED         = 2; // 被调度过constexpr int CO_ENV_FLAG_DONT_AUTO_DESTORY = 3; // 禁止被自动清理线程选中constexpr int CO_ENV_FLAG_MAX               = 8; // 最大FLAG值
复制代码

CO_ENV_FLAG_NO_SCHE_THREAD 没有关联的调度任务

此标识说明当前的 env 没有关联的调度任务(线程),不具备调度协程的能力,但是具有等待协程或者协程休眠等能力。

设置时机

当 env 是从已有线程生成的时候,会设置 CO_ENV_FLAG_NO_SCHE_THREAD 标识,表示(暂时)没有调度能力。


source/cocpp/core/co_env_factory.cpp


co_env* co_env_factory::create_env_from_this_thread(size_t stack_size){    auto idle_ctx     = create_idle_ctx__();    auto shared_stack = stack_factory__->create_stack(stack_size);    auto ret          = env_pool__.create_obj(shared_stack, idle_ctx, false);    idle_ctx->set_state(co_state::running);    return ret;}
复制代码


上面代码中 env_pool__.create_obj 会创建一个新的 env,参数会被传递到 env 的构造函数,注意最后一个参数。


source/cocpp/core/co_env.cpp


co_env::co_env(co_stack* shared_stack, co_ctx* idle_ctx, bool create_new_thread)    : sleep_controller__([this] { return need_sleep__(); })    , shared_stack__(shared_stack)    , idle_ctx__(idle_ctx){    idle_ctx__->set_env(this);    if (create_new_thread)    {        // 此处设置状态是防止 add_ctx 前调度线程还未准备好,状态断言失败        set_state(co_env_state::idle);        start_schedule();    }    else    {        set_flag(CO_ENV_FLAG_NO_SCHE_THREAD);        set_flag(CO_ENV_FLAG_COVERTED);        current_env__ = this;    }}
复制代码


最后一个参数表示不创建关联的调度线程,会设置 CO_ENV_FLAG_NO_SCHE_THREAD 和 CO_ENV_FLAG_COVERTED(表示是由普通线程转换来的 env)。


那么还有一个问题,co_env_factory::create_env_from_this_thread 会在什么场景下调用呢?


source/cocpp/core/co_manager.cpp


void co_manager::create_env_from_this_thread__(){    std::scoped_lock lck(env_set__.normal_lock, env_set__.mu_normal_env_count);    current_env__ = factory_set__.env_factory->create_env_from_this_thread(default_shared_stack_size__);
subscribe_env_event__(current_env__);
env_set__.normal_set.insert(current_env__); ++env_set__.normal_env_count;
env_from_this_thread_created().pub(current_env__);}
co_env* co_manager::current_env(){ if (current_env__ == nullptr) { create_env_from_this_thread__(); } return current_env__;}
复制代码


当调用 co_manager::current_env 获取当前的 env,当时当前线程有没有关联的 env 时,就会创建一个与当前线程关联的不具备调度能力的 env。

检测时机

此标识为为了判断 env 是否有调度能力,因此需要调度能力的地方都需要检测。


  • env 销毁


销毁的时候需要判断是否有关联的调度任务,如果有,需要设置销毁状态,让调度任务从内部退出,而对于没有调度任务的 env,就不用处理了(source/cocpp/core/co_env.cpp)。


void co_env::stop_schedule(){    if (!test_flag(CO_ENV_FLAG_NO_SCHE_THREAD))    {        set_state(co_env_state::destorying);    }
wake_up();
schedule_stopped().pub();}
复制代码


  • 转换当前线程为调度线程


此时会根据这个标识来判断当前线程是否已经是一个调度线程了,如果已经是了,那么会抛出异常(source/cocpp/core/co_env.cpp):


void co_env::schedule_in_this_thread(){    if (!test_flag(CO_ENV_FLAG_NO_SCHE_THREAD))    {        throw co_error("schedule_in_this_thread: already started");    }    this_thread_converted_to_schedule_thread().pub(std::this_thread::get_id());    start_schedule_routine__();}
复制代码


  • 判断 env 是否可以被自动销毁


如果是从已有线程转换的 env,是不能被自动销毁的,因为销毁一个没有调度线程关联的 env 是没有意义的,而且,目标线程如果后面还要访问 env,那么又会创建一个新的 env。


source/cocpp/core/co_env.cpp:


bool co_env::can_auto_destroy() const{    // 如果是用户自己转换的env,不能被选中销毁    return !test_flag(CO_ENV_FLAG_COVERTED) && !test_flag(CO_ENV_FLAG_DONT_AUTO_DESTORY) && !test_flag(CO_ENV_FLAG_NO_SCHE_THREAD);}
复制代码


  • 判断是否有调度能力


从已有协程转换的 env 是不具备调度能力的(source/cocpp/core/co_env.cpp):


bool co_env::can_schedule_ctx() const{
auto s = state(); return s != co_env_state::blocked && s != co_env_state::destorying && !test_flag(CO_ENV_FLAG_NO_SCHE_THREAD);}
复制代码

清除时机

当使用 co_env::schedule_in_this_thread 将当前线程转换为调度线程的时候,这个标识就会被清除,表明此 env 具备了调度能力。


source/cocpp/core/co_env.cpp:


void co_env::schedule_in_this_thread(){    if (!test_flag(CO_ENV_FLAG_NO_SCHE_THREAD))    {        throw co_error("schedule_in_this_thread: already started");    }    this_thread_converted_to_schedule_thread().pub(std::this_thread::get_id());    start_schedule_routine__();}
void co_env::start_schedule_routine__(){ co_interrupt_closer interrupt_closer; schedule_thread_tid__ = gettid(); schedule_started().pub(); reset_flag(CO_ENV_FLAG_NO_SCHE_THREAD); set_state(co_env_state::idle); // ...}
复制代码

CO_ENV_FLAG_COVERTED 转换标识

此标识说明此 env 是有普通线程生成的。那么这个标识和 CO_ENV_FLAG_NO_SCHE_THREAD 有什么不同呢?


CO_ENV_FLAG_NO_SCHE_THREAD 是一种状态标识,存在这种标识的时候标识不能用来调度协程,CO_ENV_FLAG_NO_SCHE_THREAD 会在线程具备调度能力之后被清除。而 CO_ENV_FLAG_COVERTED 是 env 的一种固有属性,无论后续线程的能力发生如何变化,这个标识都不会被清除。

设置时机

这个标识会在 CO_ENV_FLAG_NO_SCHE_THREAD 标识被设置的时候一同设置,可以见 CO_ENV_FLAG_NO_SCHE_THREAD 的分析。

检测时机

此标识会在判断协程是否可以被自动销毁时作为判断依据(source/cocpp/core/co_env.cpp):


bool co_env::can_auto_destroy() const{    // 如果是用户自己转换的env,不能被选中销毁    return !test_flag(CO_ENV_FLAG_COVERTED) && !test_flag(CO_ENV_FLAG_DONT_AUTO_DESTORY) && !test_flag(CO_ENV_FLAG_NO_SCHE_THREAD);}
复制代码


事实上,判断是否需要销毁其实没必要同时判断 CO_ENV_FLAG_COVERTED 和 CO_ENV_FLAG_NO_SCHE_THREAD,因为具有 CO_ENV_FLAG_NO_SCHE_THREAD 标识的 env 肯定也具有标识 CO_ENV_FLAG_COVERTED。这里如此判断是让逻辑更清晰,后续可能会修改。

清除时机

此标识不会被清除。

CO_ENV_FLAG_SCHEDULED 调度标识

此标识代表在一个监控周期内此 env 发生过调度。主要是作为协程是否阻塞调度的判断依据。

设置时机

在每次调度的时候都会设置此标识(source/cocpp/core/co_env.cpp):


bool co_env::prepare_to_switch(co_ctx*& from, co_ctx*& to){    co_ctx* curr = current_ctx();    co_ctx* next = next_ctx__();
assert(curr != nullptr); assert(next != nullptr);
set_flag(CO_ENV_FLAG_SCHEDULED);
// ...}
复制代码

检测时机

判断协程是否阻塞调度 env 的时候作为判断依据(source/cocpp/core/co_env.cpp):


bool co_env::is_blocked() const{    return state() == co_env_state::busy && !test_flag(CO_ENV_FLAG_SCHEDULED);}
复制代码

清除时机

在监控任务一轮监控之后,就会将此标识清除,进入下一轮的统计(source/cocpp/core/co_env.cpp):


void co_env::reset_scheduled_flag(){    reset_flag(CO_ENV_FLAG_SCHEDULED);    scheduled_flag_reset().pub();}
复制代码


此函数会在强制调度函数 co_manager::force_schedule__和协程重分配函数 co_manager::redistribute_ctx__中调用,这两个函数在后面的相关章节介绍。

CO_ENV_FLAG_DONT_AUTO_DESTORY 禁止自动销毁标识

这个标识用于禁止自动销毁(显式指定),前文的 CO_ENV_FLAG_NO_SCHE_THREAD 和 CO_ENV_FLAG_COVERTED 都可以禁止协程自动销毁,但是那两个标识都是与从普通线程产生的 env 有关。而此标识是可以将讲一个一开始就具有调度能力的 env 标识为不可自动函数。


其应用场景与运行环境绑定相关,在创建 ctx 的时候如果需要设置绑定的 env,那么就需要提供一个 env,这个 env 从哪里来呢?自然是需要管理器提供一个创建 env 的接口。那么,如果这个 env 创建出来会被自动销毁,那么在我们真正使用的时候,env 的状态就不确定了(可能已经被销毁了)。因此,当我们创建 env 用于绑定 ctx 的时候,通常需要指定禁止自动销毁标识。


需要注意的是,设置了禁止自动销毁标识,并不是说不能销毁了,只是禁止了自动销毁,所有的 env 在 manager 生命周期结束的时候也都会销毁的。

设置时机

此标识在创建 env 的时候可以指定(source/cocpp/core/co_manager.cpp):


co_env* co_manager::create_env(bool dont_auto_destory){    assert(!clean_up__);    auto env = factory_set__.env_factory->create_env(default_shared_stack_size__);
subscribe_env_event__(env);
if (dont_auto_destory) { env->set_flag(CO_ENV_FLAG_DONT_AUTO_DESTORY); } std::scoped_lock lck(env_set__.normal_lock, env_set__.mu_normal_env_count); env_set__.normal_set.insert(env); ++env_set__.normal_env_count;
env_created().pub(env); return env;}
复制代码

检测时机

在判断 env 是否可以被自动销毁的时候,此标识作为一个判断依据(source/cocpp/core/co_env.cpp):


bool co_env::can_auto_destroy() const{    // 如果是用户自己转换的env,不能被选中销毁    return !test_flag(CO_ENV_FLAG_COVERTED) && !test_flag(CO_ENV_FLAG_DONT_AUTO_DESTORY) && !test_flag(CO_ENV_FLAG_NO_SCHE_THREAD);}
复制代码

清除时机

此标识不会被清除。

状态

env 的状态主要有以下几种,定义在 include/cocpp/core/co_type.h


enum class co_env_state : unsigned char{    idle,       // 空闲状态,此时可以将ctx加入到env中,或者释放此env    busy,       // 繁忙状态,可以将ctx加入到env    blocked,    // 阻塞状态,需要创建更多的env分担任务    destorying, // 正在销毁状态,不再调度ctx    created,    // 创建完成状态,此env不能用于调度ctx,通常是普通线程适配产生的env};
复制代码

idle 空闲

当没有协程调度的时候,就会进入空闲状态,此时如果有新的协程加入,此 env 会被优先选中进行调度。

设置或者转换时机

  • 调度线程创建


env 关联的调度线程创建的时候,会将 env 的状态设置为 idle(source/cocpp/core/co_env.cpp):


co_env::co_env(co_stack* shared_stack, co_ctx* idle_ctx, bool create_new_thread)    : sleep_controller__([this] { return need_sleep__(); })    , shared_stack__(shared_stack)    , idle_ctx__(idle_ctx){    idle_ctx__->set_env(this);    if (create_new_thread)    {        // 此处设置状态是防止 add_ctx 前调度线程还未准备好,状态断言失败        set_state(co_env_state::idle);        start_schedule();    }    else    {        set_flag(CO_ENV_FLAG_NO_SCHE_THREAD);        set_flag(CO_ENV_FLAG_COVERTED);        current_env__ = this;    }}
复制代码


  • 没有协程需要调度


如果没有协程需要调度了,就会将当前的 env 转换为 idle 状态(source/cocpp/core/co_env.cpp):


void co_env::start_schedule_routine__(){    co_interrupt_closer interrupt_closer;    schedule_thread_tid__ = gettid();    schedule_started().pub();    reset_flag(CO_ENV_FLAG_NO_SCHE_THREAD);    set_state(co_env_state::idle);    while (state() != co_env_state::destorying)    {        schedule_switch();
// 切换回来检测是否需要执行共享栈切换 if (shared_stack_switch_info__.need_switch) { continue; }
set_state(co_env_state::idle); // 切换到idle协程,说明空闲了
sleep_if_need(); }
remove_all_ctx__(); task_finished().pub(); current_env__ = nullptr;}
复制代码

检测时机

  • 新加入的 ctx 选取最佳 env 时


对于新加入的 ctx,manger 会为它选取一个最佳的 env 去调度,此时处于 idle 状态的 env 将被优先选择(source/cocpp/core/co_manager.cpp)。


co_env* co_manager::get_best_env__(){    // ...        co_env* best_env               = nullptr;    auto    min_workload           = std::numeric_limits<int>::max();    size_t  can_schedule_env_count = 0;    for (auto& env : env_set__.normal_set)    {        if (env->state() == co_env_state::idle)        {            best_env_got().pub(env);            return env;        }        if (!env->can_schedule_ctx())        {            continue;        }        // 统计可用于调度的env数量        ++can_schedule_env_count;        if (env->workload() < min_workload)        {            min_workload = env->workload();            best_env     = env;        }    }        // ...}
复制代码


  • 自动销毁 env 时


自动销毁多余的 env 时,总会选择空闲的 env 销毁(source/cocpp/core/co_manager.cpp):


void co_manager::destroy_redundant_env__(){    std::lock_guard<std::recursive_mutex> lock(env_set__.normal_lock);    // 然后删除多余的处于idle状态的env    size_t               can_schedule_env_count = 0;    std::vector<co_env*> idle_env_list;    idle_env_list.reserve(env_set__.normal_set.size());    for (auto& env : env_set__.normal_set)    {        if (env->can_schedule_ctx())        {            ++can_schedule_env_count;        }        if (env->state() == co_env_state::idle && env->can_auto_destroy()) // 如果状态是空闲,并且可以可以被自动销毁线程选中        {            idle_env_list.push_back(env);        }    }    // 超出max_thread_count__,需要销毁env    if (can_schedule_env_count > env_set__.max_env_count)    {        auto should_destroy_count = can_schedule_env_count - env_set__.max_env_count;        for (size_t i = 0; i < should_destroy_count && i < idle_env_list.size(); ++i)        {            idle_env_list[i]->stop_schedule();        }    }    redundant_env_destroyed().pub();}
复制代码


  • 协程偷取


当一个 env 的状态变更为 idle 之后,就会尝试从其他 env 偷取可移动的协程,而此过程由 manager 完成,manager 会先检测 env 是否空闲(source/cocpp/core/co_manager.cpp),具体细节在后面协程偷取相关章节介绍。


void co_manager::steal_ctx_routine__(){    std::scoped_lock     lock(env_set__.normal_lock);    std::vector<co_env*> idle_env_list;    idle_env_list.reserve(env_set__.normal_set.size());    for (auto& env : env_set__.normal_set)    {        if (env->state() == co_env_state::idle)        {            idle_env_list.push_back(env);        }    }
auto iter = env_set__.normal_set.begin(); for (auto& env : idle_env_list) { for (; iter != env_set__.normal_set.end(); ++iter) { if ((*iter)->state() == co_env_state::idle) { break; } auto ctx = (*iter)->take_one_movable_ctx(); if (ctx != nullptr) { env->move_ctx_to_here(ctx); break; } } }}
复制代码

busy 繁忙状态

当至少有一个协程被调度的时候,env 就会被转为繁忙状态。

设置或者转换时机

  • 新协程加入


当一个新的协程被加入的时候,会将当前 env 转为 busy 状态,表示有 ctx 要调度(source/cocpp/core/co_env.cpp)。


void co_env::move_ctx_to_here(co_ctx* ctx){    assert(ctx != nullptr);    assert(state() != co_env_state::created && state() != co_env_state::destorying);
ctx->set_env(this);
{ std::lock_guard<co_spinlock> lock(mu_normal_ctx__); all_normal_ctx__[ctx->priority()].push_back(ctx); }
update_min_priority__(ctx->priority()); set_state(co_env_state::busy);
wake_up(); ctx_received().pub(ctx);}
复制代码


  • 发生调度


当发生调度时,如果下一个需要被调度的协程不是 idle 协程,就会将状态转为 busy 状态(source/cocpp/core/co_env.cpp)。


bool co_env::prepare_to_switch(co_ctx*& from, co_ctx*& to){    co_ctx* curr = current_ctx();    co_ctx* next = next_ctx__();
assert(curr != nullptr); assert(next != nullptr);
set_flag(CO_ENV_FLAG_SCHEDULED);
update_ctx_state__(curr, next); if (curr == next) { return false; } if (next != idle_ctx__) { set_state(co_env_state::busy); }
// ...}
复制代码

检测时机

  • 判断是否阻塞


busy 状态表示当前正在调度协程,所以如果在一个监控周期内没发生调度,才算是阻塞。如果在其他状态,本就没有协程调度,即使在一个监控周期内没有发生调度,也不能算是阻塞(source/cocpp/core/co_env.cpp)。


bool co_env::is_blocked() const{    return state() == co_env_state::busy && !test_flag(CO_ENV_FLAG_SCHEDULED);}
复制代码

blocked 阻塞状态

发生阻塞的场景是超过一个监控周期没有发生调度(即使使用外部强制调度也不奏效)。对于阻塞的 env,是不能将新的 ctx 交给他调度的。

设置或者转换时机

当协程转移时,会设置转移来源的那个 env 为 blocked 状态(source/cocpp/core/co_manager.cpp):


void co_manager::redistribute_ctx__(){    // ......
for (auto& env : env_set__.normal_set) { // 如果检测到某个env被阻塞了,收集可转移的ctx if (env->is_blocked()) { // 设置阻塞状态,后续的add_ctx不会将ctx加入到此env env->set_state(co_env_state::blocked);
merge_list(moved_ctx_list, env->take_all_movable_ctx()); // 将阻塞的env中可移动的ctx收集起来 } env->reset_scheduled_flag(); }
// ......}
复制代码

检测时机

当判断一个 env 是否有调度协程的能力时,会使用 blocked 状态作为其中的一个判断依据(source/cocpp/core/co_env.cpp):


bool co_env::can_schedule_ctx() const{
auto s = state(); return s != co_env_state::blocked && s != co_env_state::destorying && !test_flag(CO_ENV_FLAG_NO_SCHE_THREAD);}
复制代码

destorying 正在销毁状态

此状态表示 env 正在被销毁,不能往 env 中添加新的协程。

设置或者转换时机

当一个 env 需要停止调度的时候,需要设置销毁状态(source/cocpp/core/co_env.cpp)。


void co_env::stop_schedule(){    if (!test_flag(CO_ENV_FLAG_NO_SCHE_THREAD))    {        set_state(co_env_state::destorying);    }
wake_up();
schedule_stopped().pub();}
复制代码

检测时机

  • 新协程加入时


新的协程加入时,会检测 env 是否处于正在销毁状态,如果是,则会抛出异常(source/cocpp/core/co_env.cpp):


void co_env::add_ctx(co_ctx* ctx){    assert(ctx != nullptr);    if (state() != co_env_state::created && state() != co_env_state::destorying)    {        throw co_error("env state error");    }    init_ctx(shared_stack__, ctx); // 初始化ctx    ctx_initted().pub(ctx);
move_ctx_to_here(ctx);
ctx_added().pub(ctx);}
复制代码


  • 选择下一个运行协程时


在选择下一个运行协程的时候,如果检测到自身的状态为 destorying,会直接切换到 idle 协程,idle 协程会负责将 env 销毁(source/cocpp/core/co_env.cpp)。


co_ctx* co_env::next_ctx__(){    // 如果要销毁、并且可以销毁,切换到idle销毁    if (state() == co_env_state::destorying)    {        return idle_ctx__;    }    else    {        auto next = choose_ctx_from_normal_list__();        return next == nullptr ? idle_ctx__ : next;    }}
复制代码


  • idle 协程的调度循环条件


idle 协程的调度循环条件就是协程状态不是 destorying,因此在每一轮调度都会检测状态(source/cocpp/core/co_env.cpp)。


void co_env::start_schedule_routine__(){    // ......        while (state() != co_env_state::destorying)    {        schedule_switch();
// ...... }
// ......}
复制代码


  • 判断是否可以调度 ctx


destorying 状态的 env 不能调度协程(source/cocpp/core/co_env.cpp)。


bool co_env::can_schedule_ctx() const{
auto s = state(); return s != co_env_state::blocked && s != co_env_state::destorying && !test_flag(CO_ENV_FLAG_NO_SCHE_THREAD);}
复制代码


  • 判断调度线程是否有必要休眠


当状态为 destorying 时,不需要休眠(source/cocpp/core/co_env.cpp)。


bool co_env::need_sleep__(){    return !can_schedule__() && state() != co_env_state::destorying;}
复制代码

created 创建状态

这个状态是 env 刚创建时候的状态。有的读者就要问了,前面不是说调度线程创建的时候,对应 env 不就是 idle 状态了吗?注意一点,这里的对象不一样,前面提到的是调度线程创建,这里是 env 创建,两者有关联关系,但是却不等价,env 可以与普通线程关联,而不一定非得与调度线程关联(考虑一下 CO_ENV_FLAG_NO_SCHE_THREAD 标识的分析)。

设置或者转换时机

env 创建的初始状态就是 created,此时还没有关联调度线程(include/cocpp/core/co_env.h)。


co_state_manager<co_env_state, co_env_state::created, co_env_state::destorying> state_manager__; // 状态管理器
复制代码

检测时机

在加入 ctx 的时候,会检测当前的 env 状态,如果是 created,说明没有关联调度线程,不能用于调度,抛出异常(source/cocpp/core/co_env.cpp):


void co_env::add_ctx(co_ctx* ctx){    assert(ctx != nullptr);    if (state() != co_env_state::created && state() != co_env_state::destorying)    {        throw co_error("env state error");    }    init_ctx(shared_stack__, ctx); // 初始化ctx    ctx_initted().pub(ctx);
move_ctx_to_here(ctx);
ctx_added().pub(ctx);}
复制代码

总结

本文介绍了 cocpp 调度中最重要的调度环境 env 的标识和状态的含义、设置条件等细节,代码贴的比较多,大致了解就可以了,后面还会经常遇到。

发布于: 1 小时前
用户头像

SkyFire

关注

这个cpper很懒,什么都没留下 2018.10.13 加入

会一点点cpp的苦逼码农

评论

发布
暂无评论
一个cpp协程库的前世今生(八)env的状态与标识位