为了防止大家找不到代码,还是先贴一下项目链接:
GitHub - skyfireitdiy/cocpp at cocpp-0.1.0
前面我们提到了 env 的两个阈值,基础 env 数量 base_env_count 和最大 env 数量 max_env_count。
并提到了在超过 max_env_count 的时候,会自动销毁一些空闲的 env。本文我们介绍一下如何完成销毁 env。
定时任务
检测并标记需要销毁的 env 还是由定时任务完成的,如下(source/cocpp/core/co_manager.cpp)。
void co_manager::subscribe_manager_event__()
{
timing_routine_timout().sub([this] {
// 每两次超时重新分配一次
static bool double_timeout = false;
// 强制重新调度
force_schedule__();
// 如果是第二次超时
if (double_timeout)
{
// 重新调度
redistribute_ctx__();
// 偷取ctx
steal_ctx_routine__();
// 销毁多余的env
destroy_redundant_env__();
// 释放内存
free_mem__();
}
double_timeout = !double_timeout;
});
}
复制代码
标记函数为 destroy_redundant_env__。
接下来我们来看一下 destroy_redundant_env__的实现。
destroy_redundant_env__
该函数的实现位于 source/cocpp/core/co_manager.cpp:
void co_manager::destroy_redundant_env__()
{
std::lock_guard<std::recursive_mutex> lock(env_set__.normal_lock);
// 然后删除多余的处于idle状态的env
size_t can_schedule_env_count = 0;
std::vector<co_env*> idle_env_list;
idle_env_list.reserve(env_set__.normal_set.size());
for (auto& env : env_set__.normal_set)
{
if (env->can_schedule_ctx())
{
++can_schedule_env_count;
}
if (env->state() == co_env_state::idle && env->can_auto_destroy() && !env->has_ctx()) // 如果状态是空闲,并且可以可以被自动销毁线程选中
{
idle_env_list.push_back(env);
}
}
// 超出max_thread_count__,需要销毁env
if (can_schedule_env_count > env_set__.max_env_count)
{
auto should_destroy_count = can_schedule_env_count - env_set__.max_env_count;
for (size_t i = 0; i < should_destroy_count && i < idle_env_list.size(); ++i)
{
idle_env_list[i]->stop_schedule();
}
}
redundant_env_destroyed().pub();
}
复制代码
这个函数可以分为两个部分:
找出所有的空闲 env,并统计可以调度 ctx 的 env 数量。
清理多余的 env
下面主要讲一下后面的销毁流程,销毁流程其实很简单,就是调用 env 的 stop_schedule 成员函数。
stop_schedule
函数实现如下(source/cocpp/core/co_env.cpp):
void co_env::stop_schedule()
{
if (!test_flag(CO_ENV_FLAG_NO_SCHE_THREAD))
{
set_state(co_env_state::destorying);
}
wake_up();
schedule_stopped().pub();
}
复制代码
此函数将 state 设置为 co_env_state::destorying,然后调用 wake_up(因为此时 env 是 idle 状态,已经进入条件变量的等待中了)唤醒 env 对应的调度线程。
start_schedule_routine__
start_schedule_routine__是调度线程的执行函数,其实现如下(source/cocpp/core/co_env.cpp):
void co_env::start_schedule_routine__()
{
co_interrupt_closer interrupt_closer;
schedule_thread_tid__ = gettid();
schedule_started().pub();
reset_flag(CO_ENV_FLAG_NO_SCHE_THREAD);
set_state(co_env_state::idle);
while (state() != co_env_state::destorying)
{
decreate_interrupt_lock_count_with_lock();
schedule_switch();
increate_interrupt_lock_count_with_lock();
// 切换回来检测是否需要执行共享栈切换
if (shared_stack_switch_info__.need_switch)
{
continue;
}
set_state(co_env_state::idle); // 切换到idle协程,说明空闲了
sleep_if_need();
}
remove_all_ctx__();
task_finished().pub();
current_env__ = nullptr;
}
复制代码
线程此时运行的代码应该是 sleep_if_need 函数中,从前面的介绍中可以知道,这个函数使用条件变量的等待来让出 cpu,而 stop_schedule 中的 wake_up 函数调用将此线程从条件变量的等待状态中退出,因此此处的 sleep_if_need 函数调用返回。
然后进入下一轮的条件判断,此时状态为 co_env_state::destorying,此时循环条件不成立,因此退出循环。
接下来线程会调用 remove_all_ctx__函数来删除所有的 ctx。
而事实上,如果是因为空闲而销毁的 env,那么肯定没有 ctx,因此此处可以忽略 remove_all_ctx__的调用。
最后将 current_env__设置为 nullptr,current_env__ 是一个线程局部存储变量,用于快速访问到当前线程对应的 env。
有的读者可能会问,没看到真正删除 env 的地方呀,此函数返回只是当前调度线程退出,不会销毁 env 呀,那么 env 不就泄露了吗?能想到这一点的读者,可以说是非常细了。
这就需要再提一下前面讲过的 event 了,在 current_env__ 被设置为 nullptr 之前,会发布一个事件:
而此事件在 env 被创建的时候会设置一个回调处理函数(source/cocpp/core/co_manager.cpp):
co_env* co_manager::create_env(bool dont_auto_destory)
{
assert(!clean_up__);
auto env = factory_set__.env_factory->create_env(default_shared_stack_size__);
subscribe_env_event__(env);
// ...
}
void co_manager::subscribe_env_event__(co_env* env)
{
env->task_finished().sub([this, env]() {
remove_env__(env);
});
}
复制代码
也就是最终会调用 remove_env__来销毁 env,接下来看一下 remove_env__的实现。
remove_env__
函数的实现位于 source/cocpp/core/co_manager.cpp:
void co_manager::remove_env__(co_env* env)
{
std::scoped_lock lock(env_set__.normal_lock, env_set__.expired_lock);
env_set__.normal_set.erase(env);
env_set__.expired_set.insert(env);
env_set__.cond_expired_env.notify_one();
env_removed().pub(env);
}
复制代码
此函数将 env 从普通集合中移动到过期集合,并通过信号量唤醒一个线程。那么,唤醒的是那个线程呢?我们到 manger 构造函数中调用的 create_background_task__中看一看(source/cocpp/core/co_manager.cpp):
void co_manager::create_background_task__()
{
background_task__.emplace_back(std::async(std::launch::async, [this]() {
clean_env_routine__();
}));
background_task__.emplace_back(std::async(std::launch::async, [this]() {
timer_routine__();
}));
background_task_created().pub();
}
复制代码
上面的信号量唤醒的就是 clean_env_routine__这个线程。
clean_env_routine__
函数实现如下(source/cocpp/core/co_manager.cpp):
void co_manager::clean_env_routine__()
{
std::unique_lock<std::recursive_mutex> lck(env_set__.expired_lock);
std::unique_lock<co_spinlock> clean_lock(clean_up_lock__);
while (!clean_up__ || env_set__.normal_env_count != 0)
{
clean_lock.unlock();
env_set__.cond_expired_env.wait(lck);
clean_lock.lock();
std::lock_guard<co_spinlock> lock(env_set__.mu_normal_env_count);
for (auto& p : env_set__.expired_set)
{
factory_set__.env_factory->destroy_env(p);
--env_set__.normal_env_count;
}
env_set__.expired_set.clear();
}
env_routine_cleaned().pub();
}
复制代码
此函数中对同步的处理代码比较多,暂时忽略这些。看一下主流程起就是调用工厂的 destroy_env 函数销毁 expired_set 中所有的 env,并维护正常 env 的数量。然后清空 expired_set。
所以整个流程如下:
定时任务检测是否需要自动回收 env
如果需要,调用每个需要回收 env 的 stop_schedule 成员函数,此函数将状态设置为销毁中(co_env_state::destorying)并唤醒关联的调度线程
调度线程退出循环,发出 task_finished 信号,并将 current_env__设置为 nullptr
task_finished 对应的事件处理函数 remove_env__将 env 从正常队列移动到过期队列,并唤醒销毁线程 clean_env_routine__
clean_env_routine__调用工厂对象的 destroy_env 将过期队列中的所有 env 销毁
总结
本文介绍了 cocpp 中自动 env 自动回收的实现细节,包括策略和整个回收流程。
评论