为了防止大家找不到代码,还是先贴一下项目链接:
GitHub - skyfireitdiy/cocpp at cocpp-0.1.0
上一篇文章介绍了如何从外部调度一个长时间运行的协程,但是那个方法并不能完全解决问题,比如陷入一个长时间运行的系统调用中。应用系统调用自身是在内核中运行,所以没有从内核态切往用户态的时机,因此信号得不到处理,也就无法切换。
所以本文将介绍另一种方法防止单个协程阻塞当前调度线程上的所有协程。
如果当前调度线程正在运行的协程陷入了系统调用而无法回应信号强制调度,那么就将当前调度线程上其他可移动的协程移动到其他线程上运行就可以了,这个过程被称为协程重分配。
定时任务
协程重分配也是一个定时任务,如下(source/cocpp/core/co_manager.cpp):
void co_manager::subscribe_manager_event__()
{
timing_routine_timout().sub([this] {
// 每两次超时重新分配一次
static bool double_timeout = false;
// 强制重新调度
force_schedule__();
// 如果是第二次超时
if (double_timeout)
{
// 重新调度
redistribute_ctx__();
// 偷取ctx
steal_ctx_routine__();
// 销毁多余的env
destroy_redundant_env__();
// 释放内存
free_mem__();
}
double_timeout = !double_timeout;
});
}
复制代码
这个函数在上一篇文章中其实已经见过了。本文将重点介绍 redistribute_ctx__函数。
从上面的逻辑可以看出,每发生两次强制调度会触发一次协程重分配。这是因为重分配的代价还是比较大的,如果可以通过强制调度来解决问题,就尽量不要触发重分配。
接下来我们来看一下 redistribute_ctx__函数的实现。
redistribute_ctx__
redistribute_ctx__的实现如下(source/cocpp/core/co_manager.cpp):
void co_manager::redistribute_ctx__()
{
std::scoped_lock lock(env_set__.expired_lock, env_set__.normal_lock, clean_up_lock__);
if (clean_up__)
{
return;
}
std::list<co_ctx*> moved_ctx_list; // 需要被移动的ctx
auto merge_list = [](std::list<co_ctx*>& target, const std::list<co_ctx*>& src) {
target.insert(target.end(), src.begin(), src.end());
};
for (auto& env : env_set__.normal_set)
{
// 如果检测到某个env被阻塞了,收集可转移的ctx
if (env->is_blocked())
{
// 设置阻塞状态,后续的add_ctx不会将ctx加入到此env
env->set_state(co_env_state::blocked);
merge_list(moved_ctx_list, env->take_all_movable_ctx()); // 将阻塞的env中可移动的ctx收集起来
}
env->reset_scheduled_flag();
}
// 重新选择合适的env进行调度
for (auto& ctx : moved_ctx_list)
{
get_best_env__()->move_ctx_to_here(ctx);
}
ctx_redistributed().pub();
}
复制代码
不过一些整体条件的判断,这个函数分为两个部分:
收集被阻塞的 env 中可移动的 ctx
重新分配这些 ctx
收集
该过程遍历所有的 env(除过即将被销毁的),检测其是否被阻塞(一个检测周期内没有发生调度),如果被阻塞了,调用 take_all_movable_ctx 函数将阻塞 env 中可移动的 ctx 取出来。
take_all_movable_ctx 的实现如下(source/cocpp/core/co_env.cpp):
std::list<co_ctx*> co_env::take_all_movable_ctx()
{
std::scoped_lock lock(mu_normal_ctx__, mu_min_priority__, schedule_lock__);
std::list<co_ctx*> ret;
for (unsigned int i = min_priority__; i < all_normal_ctx__.size(); ++i)
{
auto backup = all_normal_ctx__[i];
for (auto& ctx : backup)
{
if (ctx->can_move())
{
all_normal_ctx__[i].remove(ctx);
ret.push_back(ctx);
}
}
}
all_moveable_ctx_taken().pub(ret);
return ret;
}
复制代码
在函数判断 ctx 是否可移动,如果可以的话,就将它从调度的队列中删除,最终返回可移动 ctx 的列表。
重分配
当收集完所有阻塞 env 中可以移动的 ctx 之后。就开始重分配了。
重分配的逻辑非常简单,每次选一个 ctx,然后插入到当前条件下负载最低的 env 中(通过 get_best_env__函数获取),这部分的逻辑在 add_ctx 中是类似的,不做赘述。
不支持的场景
这个手段也并不是一定能解决所有问题的,注意一点的是,这里可以重分配的是当前线程上可以移动的 ctx,换句话说,如果 ctx 自身不支持移动,此手段也无能为力。接下来看一下哪些 ctx 不支持移动(source/cocpp/core/co_ctx.cpp)。
bool co_ctx::can_move() const
{
return !(state() == co_state::running || state() == co_state::finished || test_flag(CO_CTX_FLAG_BIND) || test_flag(CO_CTX_FLAG_SHARED_STACK));
}
复制代码
所以看到以下 ctx 不支持移动:
当前正在运行的协程
已经完成的协程
绑定了 env 的协程
共享栈的协程
总结
本文介绍了当协程陷入系统调用,阻塞当前调度线程的时候,如何防止将同一个调度线程上其他的协程堵塞。这个手段是上一节中外部调度的一个良好补充。
评论