为了防止大家找不到代码,还是先贴一下项目链接:
GitHub - skyfireitdiy/cocpp at cocpp-0.1.0
前一篇文章介绍了 ctx 的状态与标识位,本文将介绍 env 的相关状态与标识位。
标识
env 的标识定义在 include/cocpp/core/co_define.h 中:
constexpr int CO_ENV_FLAG_NO_SCHE_THREAD = 0; // 没有调度线程
constexpr int CO_ENV_FLAG_COVERTED = 1; // 从正常线程转换来的调度线程
constexpr int CO_ENV_FLAG_SCHEDULED = 2; // 被调度过
constexpr int CO_ENV_FLAG_DONT_AUTO_DESTORY = 3; // 禁止被自动清理线程选中
constexpr int CO_ENV_FLAG_MAX = 8; // 最大FLAG值
复制代码
CO_ENV_FLAG_NO_SCHE_THREAD 没有关联的调度任务
此标识说明当前的 env 没有关联的调度任务(线程),不具备调度协程的能力,但是具有等待协程或者协程休眠等能力。
设置时机
当 env 是从已有线程生成的时候,会设置 CO_ENV_FLAG_NO_SCHE_THREAD 标识,表示(暂时)没有调度能力。
source/cocpp/core/co_env_factory.cpp
co_env* co_env_factory::create_env_from_this_thread(size_t stack_size)
{
auto idle_ctx = create_idle_ctx__();
auto shared_stack = stack_factory__->create_stack(stack_size);
auto ret = env_pool__.create_obj(shared_stack, idle_ctx, false);
idle_ctx->set_state(co_state::running);
return ret;
}
复制代码
上面代码中 env_pool__.create_obj 会创建一个新的 env,参数会被传递到 env 的构造函数,注意最后一个参数。
source/cocpp/core/co_env.cpp
co_env::co_env(co_stack* shared_stack, co_ctx* idle_ctx, bool create_new_thread)
: sleep_controller__([this] { return need_sleep__(); })
, shared_stack__(shared_stack)
, idle_ctx__(idle_ctx)
{
idle_ctx__->set_env(this);
if (create_new_thread)
{
// 此处设置状态是防止 add_ctx 前调度线程还未准备好,状态断言失败
set_state(co_env_state::idle);
start_schedule();
}
else
{
set_flag(CO_ENV_FLAG_NO_SCHE_THREAD);
set_flag(CO_ENV_FLAG_COVERTED);
current_env__ = this;
}
}
复制代码
最后一个参数表示不创建关联的调度线程,会设置 CO_ENV_FLAG_NO_SCHE_THREAD 和 CO_ENV_FLAG_COVERTED(表示是由普通线程转换来的 env)。
那么还有一个问题,co_env_factory::create_env_from_this_thread 会在什么场景下调用呢?
source/cocpp/core/co_manager.cpp
void co_manager::create_env_from_this_thread__()
{
std::scoped_lock lck(env_set__.normal_lock, env_set__.mu_normal_env_count);
current_env__ = factory_set__.env_factory->create_env_from_this_thread(default_shared_stack_size__);
subscribe_env_event__(current_env__);
env_set__.normal_set.insert(current_env__);
++env_set__.normal_env_count;
env_from_this_thread_created().pub(current_env__);
}
co_env* co_manager::current_env()
{
if (current_env__ == nullptr)
{
create_env_from_this_thread__();
}
return current_env__;
}
复制代码
当调用 co_manager::current_env 获取当前的 env,当时当前线程有没有关联的 env 时,就会创建一个与当前线程关联的不具备调度能力的 env。
检测时机
此标识为为了判断 env 是否有调度能力,因此需要调度能力的地方都需要检测。
销毁的时候需要判断是否有关联的调度任务,如果有,需要设置销毁状态,让调度任务从内部退出,而对于没有调度任务的 env,就不用处理了(source/cocpp/core/co_env.cpp)。
void co_env::stop_schedule()
{
if (!test_flag(CO_ENV_FLAG_NO_SCHE_THREAD))
{
set_state(co_env_state::destorying);
}
wake_up();
schedule_stopped().pub();
}
复制代码
此时会根据这个标识来判断当前线程是否已经是一个调度线程了,如果已经是了,那么会抛出异常(source/cocpp/core/co_env.cpp):
void co_env::schedule_in_this_thread()
{
if (!test_flag(CO_ENV_FLAG_NO_SCHE_THREAD))
{
throw co_error("schedule_in_this_thread: already started");
}
this_thread_converted_to_schedule_thread().pub(std::this_thread::get_id());
start_schedule_routine__();
}
复制代码
如果是从已有线程转换的 env,是不能被自动销毁的,因为销毁一个没有调度线程关联的 env 是没有意义的,而且,目标线程如果后面还要访问 env,那么又会创建一个新的 env。
source/cocpp/core/co_env.cpp:
bool co_env::can_auto_destroy() const
{
// 如果是用户自己转换的env,不能被选中销毁
return !test_flag(CO_ENV_FLAG_COVERTED) && !test_flag(CO_ENV_FLAG_DONT_AUTO_DESTORY) && !test_flag(CO_ENV_FLAG_NO_SCHE_THREAD);
}
复制代码
从已有协程转换的 env 是不具备调度能力的(source/cocpp/core/co_env.cpp):
bool co_env::can_schedule_ctx() const
{
auto s = state();
return s != co_env_state::blocked && s != co_env_state::destorying && !test_flag(CO_ENV_FLAG_NO_SCHE_THREAD);
}
复制代码
清除时机
当使用 co_env::schedule_in_this_thread 将当前线程转换为调度线程的时候,这个标识就会被清除,表明此 env 具备了调度能力。
source/cocpp/core/co_env.cpp:
void co_env::schedule_in_this_thread()
{
if (!test_flag(CO_ENV_FLAG_NO_SCHE_THREAD))
{
throw co_error("schedule_in_this_thread: already started");
}
this_thread_converted_to_schedule_thread().pub(std::this_thread::get_id());
start_schedule_routine__();
}
void co_env::start_schedule_routine__()
{
co_interrupt_closer interrupt_closer;
schedule_thread_tid__ = gettid();
schedule_started().pub();
reset_flag(CO_ENV_FLAG_NO_SCHE_THREAD);
set_state(co_env_state::idle);
// ...
}
复制代码
CO_ENV_FLAG_COVERTED 转换标识
此标识说明此 env 是有普通线程生成的。那么这个标识和 CO_ENV_FLAG_NO_SCHE_THREAD 有什么不同呢?
CO_ENV_FLAG_NO_SCHE_THREAD 是一种状态标识,存在这种标识的时候标识不能用来调度协程,CO_ENV_FLAG_NO_SCHE_THREAD 会在线程具备调度能力之后被清除。而 CO_ENV_FLAG_COVERTED 是 env 的一种固有属性,无论后续线程的能力发生如何变化,这个标识都不会被清除。
设置时机
这个标识会在 CO_ENV_FLAG_NO_SCHE_THREAD 标识被设置的时候一同设置,可以见 CO_ENV_FLAG_NO_SCHE_THREAD 的分析。
检测时机
此标识会在判断协程是否可以被自动销毁时作为判断依据(source/cocpp/core/co_env.cpp):
bool co_env::can_auto_destroy() const
{
// 如果是用户自己转换的env,不能被选中销毁
return !test_flag(CO_ENV_FLAG_COVERTED) && !test_flag(CO_ENV_FLAG_DONT_AUTO_DESTORY) && !test_flag(CO_ENV_FLAG_NO_SCHE_THREAD);
}
复制代码
事实上,判断是否需要销毁其实没必要同时判断 CO_ENV_FLAG_COVERTED 和 CO_ENV_FLAG_NO_SCHE_THREAD,因为具有 CO_ENV_FLAG_NO_SCHE_THREAD 标识的 env 肯定也具有标识 CO_ENV_FLAG_COVERTED。这里如此判断是让逻辑更清晰,后续可能会修改。
清除时机
此标识不会被清除。
CO_ENV_FLAG_SCHEDULED 调度标识
此标识代表在一个监控周期内此 env 发生过调度。主要是作为协程是否阻塞调度的判断依据。
设置时机
在每次调度的时候都会设置此标识(source/cocpp/core/co_env.cpp):
bool co_env::prepare_to_switch(co_ctx*& from, co_ctx*& to)
{
co_ctx* curr = current_ctx();
co_ctx* next = next_ctx__();
assert(curr != nullptr);
assert(next != nullptr);
set_flag(CO_ENV_FLAG_SCHEDULED);
// ...
}
复制代码
检测时机
判断协程是否阻塞调度 env 的时候作为判断依据(source/cocpp/core/co_env.cpp):
bool co_env::is_blocked() const
{
return state() == co_env_state::busy && !test_flag(CO_ENV_FLAG_SCHEDULED);
}
复制代码
清除时机
在监控任务一轮监控之后,就会将此标识清除,进入下一轮的统计(source/cocpp/core/co_env.cpp):
void co_env::reset_scheduled_flag()
{
reset_flag(CO_ENV_FLAG_SCHEDULED);
scheduled_flag_reset().pub();
}
复制代码
此函数会在强制调度函数 co_manager::force_schedule__和协程重分配函数 co_manager::redistribute_ctx__中调用,这两个函数在后面的相关章节介绍。
CO_ENV_FLAG_DONT_AUTO_DESTORY 禁止自动销毁标识
这个标识用于禁止自动销毁(显式指定),前文的 CO_ENV_FLAG_NO_SCHE_THREAD 和 CO_ENV_FLAG_COVERTED 都可以禁止协程自动销毁,但是那两个标识都是与从普通线程产生的 env 有关。而此标识是可以将讲一个一开始就具有调度能力的 env 标识为不可自动函数。
其应用场景与运行环境绑定相关,在创建 ctx 的时候如果需要设置绑定的 env,那么就需要提供一个 env,这个 env 从哪里来呢?自然是需要管理器提供一个创建 env 的接口。那么,如果这个 env 创建出来会被自动销毁,那么在我们真正使用的时候,env 的状态就不确定了(可能已经被销毁了)。因此,当我们创建 env 用于绑定 ctx 的时候,通常需要指定禁止自动销毁标识。
需要注意的是,设置了禁止自动销毁标识,并不是说不能销毁了,只是禁止了自动销毁,所有的 env 在 manager 生命周期结束的时候也都会销毁的。
设置时机
此标识在创建 env 的时候可以指定(source/cocpp/core/co_manager.cpp):
co_env* co_manager::create_env(bool dont_auto_destory)
{
assert(!clean_up__);
auto env = factory_set__.env_factory->create_env(default_shared_stack_size__);
subscribe_env_event__(env);
if (dont_auto_destory)
{
env->set_flag(CO_ENV_FLAG_DONT_AUTO_DESTORY);
}
std::scoped_lock lck(env_set__.normal_lock, env_set__.mu_normal_env_count);
env_set__.normal_set.insert(env);
++env_set__.normal_env_count;
env_created().pub(env);
return env;
}
复制代码
检测时机
在判断 env 是否可以被自动销毁的时候,此标识作为一个判断依据(source/cocpp/core/co_env.cpp):
bool co_env::can_auto_destroy() const
{
// 如果是用户自己转换的env,不能被选中销毁
return !test_flag(CO_ENV_FLAG_COVERTED) && !test_flag(CO_ENV_FLAG_DONT_AUTO_DESTORY) && !test_flag(CO_ENV_FLAG_NO_SCHE_THREAD);
}
复制代码
清除时机
此标识不会被清除。
状态
env 的状态主要有以下几种,定义在 include/cocpp/core/co_type.h
enum class co_env_state : unsigned char
{
idle, // 空闲状态,此时可以将ctx加入到env中,或者释放此env
busy, // 繁忙状态,可以将ctx加入到env
blocked, // 阻塞状态,需要创建更多的env分担任务
destorying, // 正在销毁状态,不再调度ctx
created, // 创建完成状态,此env不能用于调度ctx,通常是普通线程适配产生的env
};
复制代码
idle 空闲
当没有协程调度的时候,就会进入空闲状态,此时如果有新的协程加入,此 env 会被优先选中进行调度。
设置或者转换时机
env 关联的调度线程创建的时候,会将 env 的状态设置为 idle(source/cocpp/core/co_env.cpp):
co_env::co_env(co_stack* shared_stack, co_ctx* idle_ctx, bool create_new_thread)
: sleep_controller__([this] { return need_sleep__(); })
, shared_stack__(shared_stack)
, idle_ctx__(idle_ctx)
{
idle_ctx__->set_env(this);
if (create_new_thread)
{
// 此处设置状态是防止 add_ctx 前调度线程还未准备好,状态断言失败
set_state(co_env_state::idle);
start_schedule();
}
else
{
set_flag(CO_ENV_FLAG_NO_SCHE_THREAD);
set_flag(CO_ENV_FLAG_COVERTED);
current_env__ = this;
}
}
复制代码
如果没有协程需要调度了,就会将当前的 env 转换为 idle 状态(source/cocpp/core/co_env.cpp):
void co_env::start_schedule_routine__()
{
co_interrupt_closer interrupt_closer;
schedule_thread_tid__ = gettid();
schedule_started().pub();
reset_flag(CO_ENV_FLAG_NO_SCHE_THREAD);
set_state(co_env_state::idle);
while (state() != co_env_state::destorying)
{
schedule_switch();
// 切换回来检测是否需要执行共享栈切换
if (shared_stack_switch_info__.need_switch)
{
continue;
}
set_state(co_env_state::idle); // 切换到idle协程,说明空闲了
sleep_if_need();
}
remove_all_ctx__();
task_finished().pub();
current_env__ = nullptr;
}
复制代码
检测时机
对于新加入的 ctx,manger 会为它选取一个最佳的 env 去调度,此时处于 idle 状态的 env 将被优先选择(source/cocpp/core/co_manager.cpp)。
co_env* co_manager::get_best_env__()
{
// ...
co_env* best_env = nullptr;
auto min_workload = std::numeric_limits<int>::max();
size_t can_schedule_env_count = 0;
for (auto& env : env_set__.normal_set)
{
if (env->state() == co_env_state::idle)
{
best_env_got().pub(env);
return env;
}
if (!env->can_schedule_ctx())
{
continue;
}
// 统计可用于调度的env数量
++can_schedule_env_count;
if (env->workload() < min_workload)
{
min_workload = env->workload();
best_env = env;
}
}
// ...
}
复制代码
自动销毁多余的 env 时,总会选择空闲的 env 销毁(source/cocpp/core/co_manager.cpp):
void co_manager::destroy_redundant_env__()
{
std::lock_guard<std::recursive_mutex> lock(env_set__.normal_lock);
// 然后删除多余的处于idle状态的env
size_t can_schedule_env_count = 0;
std::vector<co_env*> idle_env_list;
idle_env_list.reserve(env_set__.normal_set.size());
for (auto& env : env_set__.normal_set)
{
if (env->can_schedule_ctx())
{
++can_schedule_env_count;
}
if (env->state() == co_env_state::idle && env->can_auto_destroy()) // 如果状态是空闲,并且可以可以被自动销毁线程选中
{
idle_env_list.push_back(env);
}
}
// 超出max_thread_count__,需要销毁env
if (can_schedule_env_count > env_set__.max_env_count)
{
auto should_destroy_count = can_schedule_env_count - env_set__.max_env_count;
for (size_t i = 0; i < should_destroy_count && i < idle_env_list.size(); ++i)
{
idle_env_list[i]->stop_schedule();
}
}
redundant_env_destroyed().pub();
}
复制代码
当一个 env 的状态变更为 idle 之后,就会尝试从其他 env 偷取可移动的协程,而此过程由 manager 完成,manager 会先检测 env 是否空闲(source/cocpp/core/co_manager.cpp),具体细节在后面协程偷取相关章节介绍。
void co_manager::steal_ctx_routine__()
{
std::scoped_lock lock(env_set__.normal_lock);
std::vector<co_env*> idle_env_list;
idle_env_list.reserve(env_set__.normal_set.size());
for (auto& env : env_set__.normal_set)
{
if (env->state() == co_env_state::idle)
{
idle_env_list.push_back(env);
}
}
auto iter = env_set__.normal_set.begin();
for (auto& env : idle_env_list)
{
for (; iter != env_set__.normal_set.end(); ++iter)
{
if ((*iter)->state() == co_env_state::idle)
{
break;
}
auto ctx = (*iter)->take_one_movable_ctx();
if (ctx != nullptr)
{
env->move_ctx_to_here(ctx);
break;
}
}
}
}
复制代码
busy 繁忙状态
当至少有一个协程被调度的时候,env 就会被转为繁忙状态。
设置或者转换时机
当一个新的协程被加入的时候,会将当前 env 转为 busy 状态,表示有 ctx 要调度(source/cocpp/core/co_env.cpp)。
void co_env::move_ctx_to_here(co_ctx* ctx)
{
assert(ctx != nullptr);
assert(state() != co_env_state::created && state() != co_env_state::destorying);
ctx->set_env(this);
{
std::lock_guard<co_spinlock> lock(mu_normal_ctx__);
all_normal_ctx__[ctx->priority()].push_back(ctx);
}
update_min_priority__(ctx->priority());
set_state(co_env_state::busy);
wake_up();
ctx_received().pub(ctx);
}
复制代码
当发生调度时,如果下一个需要被调度的协程不是 idle 协程,就会将状态转为 busy 状态(source/cocpp/core/co_env.cpp)。
bool co_env::prepare_to_switch(co_ctx*& from, co_ctx*& to)
{
co_ctx* curr = current_ctx();
co_ctx* next = next_ctx__();
assert(curr != nullptr);
assert(next != nullptr);
set_flag(CO_ENV_FLAG_SCHEDULED);
update_ctx_state__(curr, next);
if (curr == next)
{
return false;
}
if (next != idle_ctx__)
{
set_state(co_env_state::busy);
}
// ...
}
复制代码
检测时机
busy 状态表示当前正在调度协程,所以如果在一个监控周期内没发生调度,才算是阻塞。如果在其他状态,本就没有协程调度,即使在一个监控周期内没有发生调度,也不能算是阻塞(source/cocpp/core/co_env.cpp)。
bool co_env::is_blocked() const
{
return state() == co_env_state::busy && !test_flag(CO_ENV_FLAG_SCHEDULED);
}
复制代码
blocked 阻塞状态
发生阻塞的场景是超过一个监控周期没有发生调度(即使使用外部强制调度也不奏效)。对于阻塞的 env,是不能将新的 ctx 交给他调度的。
设置或者转换时机
当协程转移时,会设置转移来源的那个 env 为 blocked 状态(source/cocpp/core/co_manager.cpp):
void co_manager::redistribute_ctx__()
{
// ......
for (auto& env : env_set__.normal_set)
{
// 如果检测到某个env被阻塞了,收集可转移的ctx
if (env->is_blocked())
{
// 设置阻塞状态,后续的add_ctx不会将ctx加入到此env
env->set_state(co_env_state::blocked);
merge_list(moved_ctx_list, env->take_all_movable_ctx()); // 将阻塞的env中可移动的ctx收集起来
}
env->reset_scheduled_flag();
}
// ......
}
复制代码
检测时机
当判断一个 env 是否有调度协程的能力时,会使用 blocked 状态作为其中的一个判断依据(source/cocpp/core/co_env.cpp):
bool co_env::can_schedule_ctx() const
{
auto s = state();
return s != co_env_state::blocked && s != co_env_state::destorying && !test_flag(CO_ENV_FLAG_NO_SCHE_THREAD);
}
复制代码
destorying 正在销毁状态
此状态表示 env 正在被销毁,不能往 env 中添加新的协程。
设置或者转换时机
当一个 env 需要停止调度的时候,需要设置销毁状态(source/cocpp/core/co_env.cpp)。
void co_env::stop_schedule()
{
if (!test_flag(CO_ENV_FLAG_NO_SCHE_THREAD))
{
set_state(co_env_state::destorying);
}
wake_up();
schedule_stopped().pub();
}
复制代码
检测时机
新的协程加入时,会检测 env 是否处于正在销毁状态,如果是,则会抛出异常(source/cocpp/core/co_env.cpp):
void co_env::add_ctx(co_ctx* ctx)
{
assert(ctx != nullptr);
if (state() != co_env_state::created && state() != co_env_state::destorying)
{
throw co_error("env state error");
}
init_ctx(shared_stack__, ctx); // 初始化ctx
ctx_initted().pub(ctx);
move_ctx_to_here(ctx);
ctx_added().pub(ctx);
}
复制代码
在选择下一个运行协程的时候,如果检测到自身的状态为 destorying,会直接切换到 idle 协程,idle 协程会负责将 env 销毁(source/cocpp/core/co_env.cpp)。
co_ctx* co_env::next_ctx__()
{
// 如果要销毁、并且可以销毁,切换到idle销毁
if (state() == co_env_state::destorying)
{
return idle_ctx__;
}
else
{
auto next = choose_ctx_from_normal_list__();
return next == nullptr ? idle_ctx__ : next;
}
}
复制代码
idle 协程的调度循环条件就是协程状态不是 destorying,因此在每一轮调度都会检测状态(source/cocpp/core/co_env.cpp)。
void co_env::start_schedule_routine__()
{
// ......
while (state() != co_env_state::destorying)
{
schedule_switch();
// ......
}
// ......
}
复制代码
destorying 状态的 env 不能调度协程(source/cocpp/core/co_env.cpp)。
bool co_env::can_schedule_ctx() const
{
auto s = state();
return s != co_env_state::blocked && s != co_env_state::destorying && !test_flag(CO_ENV_FLAG_NO_SCHE_THREAD);
}
复制代码
当状态为 destorying 时,不需要休眠(source/cocpp/core/co_env.cpp)。
bool co_env::need_sleep__()
{
return !can_schedule__() && state() != co_env_state::destorying;
}
复制代码
created 创建状态
这个状态是 env 刚创建时候的状态。有的读者就要问了,前面不是说调度线程创建的时候,对应 env 不就是 idle 状态了吗?注意一点,这里的对象不一样,前面提到的是调度线程创建,这里是 env 创建,两者有关联关系,但是却不等价,env 可以与普通线程关联,而不一定非得与调度线程关联(考虑一下 CO_ENV_FLAG_NO_SCHE_THREAD 标识的分析)。
设置或者转换时机
env 创建的初始状态就是 created,此时还没有关联调度线程(include/cocpp/core/co_env.h)。
co_state_manager<co_env_state, co_env_state::created, co_env_state::destorying> state_manager__; // 状态管理器
复制代码
检测时机
在加入 ctx 的时候,会检测当前的 env 状态,如果是 created,说明没有关联调度线程,不能用于调度,抛出异常(source/cocpp/core/co_env.cpp):
void co_env::add_ctx(co_ctx* ctx)
{
assert(ctx != nullptr);
if (state() != co_env_state::created && state() != co_env_state::destorying)
{
throw co_error("env state error");
}
init_ctx(shared_stack__, ctx); // 初始化ctx
ctx_initted().pub(ctx);
move_ctx_to_here(ctx);
ctx_added().pub(ctx);
}
复制代码
总结
本文介绍了 cocpp 调度中最重要的调度环境 env 的标识和状态的含义、设置条件等细节,代码贴的比较多,大致了解就可以了,后面还会经常遇到。
评论