写点什么

异步编程框架:Workflow 的计算调度算法

作者:1412
  • 2023-07-23
    北京
  • 本文字数:3599 字

    阅读完需:约 12 分钟

异步编程框架:Workflow的计算调度算法

让大家好奇了这么久,今天终于写被 cue 到最多的话题:Workflow 的计算调度,包括独创的调度算法与相关数据结构。P.S. 原文写于 2022 年 5 月份~


C++ Workflow 作为一个异步调度编程范式,对调度的拆解是有几个层次的:


  • 用户代码层:提供任务流级别的结构化并发,包括串并联、DAG 和复合任务等,用于管理业务逻辑,组织要做的事情的依赖关系;

  • 资源管理层:对不同资源内部做了协调和管理,比如网络、CPU、GPU、文件 IO 等,用最少的代价、做最高效、最通用的资源复用。


今天就重点介绍一下,Workflow 内部独创的计算调度算法,包括 Executor 模块(仅 200 行代码)及相关模块,整体是如何管理计算资源、协调不同计算任务,从而做到无论任务耗时长短,都可以尽可能均衡调度的最通用的方案。


而且看完之后,也可以对上一篇《一个逻辑完备的线程池》中一直强调的生命周期有一个更好的理解。架构设计一直要强调每一个模块本身做到完备和自洽,是因为更有利于演化出上层模块。


https://github.com/sogou/workflow/blob/master/src/kernel/Executor.cc

1. 计算调度面临的问题

无论是用何种计算硬件,计算调度要解决的问题是:


  1. 充分使用资源;

  2. 不同类别的任务的资源分配;

  3. 优先级管理;


第一点很好理解,以 CPU 为例,只要来任务了就要尽量跑满 CPU 上的若干核。


第二点,不同类别是比如:每件事情由 3 种步骤** A->B->C** 组成,耗费的计算资源是 1:2:3,简单的做法是可以分别给予 3 个线程池,线程比例 1:2:3(假设 24 核的机器,我们可以分别把 3 个线程池中的线程数配置为 4,8,12)。


但由于线上环境是复杂多变的如果耗费资源变成了 7:2:3,固定线程数方案显然不可取,不改动代码是难以匹配这样的状况。


这么做的另一个弊端是,如果一批提交 100 个 a,那么显然只有 4 个线程可以工作,难以做到“资源应用即用”;


还有没有解决办法呢?更复杂地,可以引入动态监测耗时,然而引入任何复杂方案都会有新的 overhead,绝大部分情况下这些都是浪费。


继续看第三点,优先级管理是比如:还是 A->B->C 三种任务。现在增加了一个 D,我想要尽快被调起来,简单的做法往往是给所有任务一个优先级编号,比如 1-32。


但这并不是长久的解决办法,编号是固定的总会往更高优的用完,而且任务自己都是贪心的,只要有最高优先级,最终大家都会卷起来(不是


我们需要的,是一个灵活配置线程比例充分调度 CPU、且可以公平处理优先级的方案。

2. 创新的数据结构:多维调度队列

Workflow 内部几乎所有的方案都是往通用了做,对于 CPU 计算,则是:全局一个线程池,和统一的管理方式。使用上,计算任务只需要带一个队列名,即可交给 Workflow 帮你做到最均衡的调度。


基本原理图如下:


Executor 内部,有一个线程池和一个基本的主队列。而每个任务本身,同时也是属于一个 ExecQueue,可以看到其结构是一组以名字区分的子队列。


这种数据结构的组合,可以做到以下三点:


  1. 首先,只要有空闲计算线程可用,任务将实时调起,计算队列名不起作用。

  2. 当计算线程无法实时调起每个任务的时候,那么同一队列名下的任务将按 FIFO 的顺序被调起,而队列与队列之间则是平等对待。

  3. 例如,先连续提交 n 个队列名为 A 的任务,再连续提交 n 个队列名为 B 的任务。那么无论每个任务的 cpu 耗时分别是多少,也无论计算线程数多少,这两个队列将近倾向于同时执行完毕。

  4. 这个规律可以扩展到任意队列数量以及任意提交顺序。


分别来看看算法是什么。


第一点:Executor 的线程不停从 Executor 内部的主队列中拿任务出来执行;


第二点:线程从主队列把任务取走、并准备执行任务之前,也把任务从它自己的子队列里拿走。并且,如果该子队列后面还有任务,就把下一个任务出来,放到主队列中。


第三点:外部用户给 Workflow 提交任务的时候,Workflow 会先把任务按名字放到子队列。并且如果发现这是子队列中的第一个任务(说明刚才子队列是空的),便立刻提交到主队列中。


算法本身相当简单,而提交任务时,只需要给调度器轻微的指导,既队列名(对应 Executor 的一个 ExecQueue),无需指定优先级或计算时间预估等信息。


当我们收到的 A, B, C 任务数足够多而且数量相等,无论任务以什么顺序到达,也无论每个(注意是每个而不是每种)任务的计算时间多少,A, B, C 三个子队列将同时计算完成。


而主队列长度,永远不超过子队列的个数,且主队列中,每个子队列的任务永远只有一个,这是算法的必然结果。

3. 源码简析

我们用最简单的 WFGoTask 为例子,把抽象的调度算法从外到里一层层落实到代码上。

1) 用法示例

void func(int a);
// 使用时WTGoTask *task = WFTaskFactory::create_go_task("queue_name_test", func, 4); task->start();
复制代码

2) 派生关系

了解过 Workflow 任务的小伙伴一定知道,Workflow 任何任务都是行为派生,而中间有一层,是基本单元,即由 SubTask 和具体执行单元双派生,这样既可以让上层任务被 SubTask 串到任务流里、也可以做具体执行单元做的事情。


对计算调度来说,具体执行单元那肯定是每个可以被线程调度起来的计算任务。


我们可以看到 WFGoTask 是从 ExecRequest 派生的,而 ExecRequest 就是执行的基本单元。(复习到网络层面,基本单元是 CommRequest,一个代表执行,一个代表网络,对称性无处不在~)打开src/kernel/ExecRequest.h文件可以找到它,这里只看dispatch()里做了什么:


 // 这里可以看到,具体执行单元是ExecSession,它负责和Executor等打交道class ExecRequest : public SubTask, public ExecSession {public:    virtual void dispatch()    {        if (this->executor->request(this, this->queue) < 0)        ...    }
复制代码


dispath()做的事情,就是把自己和自己的队列,通过request()接口提交到 Executor。

3) Executor 的生产接口:request()

int Executor::request(ExecSession *session, ExecQueue *queue){    ...    // 把任务加到对应的子队列后面      list_add_tail(&entry->list, &queue->task_list);
if (queue->task_list.next == &entry->list) { // 子队列刚才是空的,那么可以立刻把此任务提交到Executor中 struct thrdpool_task task = { .routine = Executor::executor_thread_routine, .context = queue };
if (thrdpool_schedule(&task, this->thrdpool) < 0) //提交 ... } return -!entry; }
复制代码

4) Executor 的消费接口:routine()

刚才看到线程真正执行一个任务的时候,是调用的executor_thread_routine(),传进去的 context 就是这个任务所在的子队列。


void Executor::executor_thread_routine(void *context){    ExecQueue *queue = (ExecQueue *)context;    ...    entry = list_entry(queue->task_list.next, struct ExecTaskEntry, list);      list_del(&entry->list); // 从子队列里删掉当前正要执行的任务    ...     if (!list_empty(&queue->task_list))    { // 如果子队列后面还有任务(也就是同名任务),放进来主队列中        struct thrdpool_task task = {        .routine = Executor::executor_thread_routine,        .context = queue        };         __thrdpool_schedule(&task, entry, entry->thrdpool);    }    ...    session->execute(); // 执行当前任务... 跑啊跑...    session->handle(ES_STATE_FINISHED, 0); // 执行完,调用接口,会打通后续任务流}
复制代码

4. 改造案例分享:用任务流替代传统流水线模式

在公司内部,最经典的改造案例就是用 Workflow 的任务调度替换掉传统的流水线模式,和开头介绍的按资源比例分配不同模块的线程数量是类似的,这对于某一个步骤突发增加的耗时、想要额外增加另一个步骤/module 等,都是非常不灵活的方案。


而 Workflow 的方案相比起来,则可以完美避开传统做法的许多弊端。


除此之外,实际的计算调度中还有一些问题,是非常考验框架的实现细节的。比如,错误处理好不好做,依赖和取消好不好做,生命周期好不好管理。虽然这些不是计算模块本身的事情,但 Workflow 的任务流这层都提供了很好的解决方案。


在上一篇线程池的文章 po 上网时,也有小伙伴问到:


做法如下简单分享一下:


5. 最后

如上总结一些心得吧。


我们做计算调度时往往忘了,根本要解决问题是 A->B->C ,而不是关心 1:2:3。如果常常要担心其他问题,往往是因为调度方案本身做得不够通用。只有一个最通用、最回归问题本质的架构方案,才可以让开发者不用关心其他问题,专注于提升自己的模块本身,也更方便上层做二次开发,为开发者提供充满想象力的无穷可能性。


另外也是作为上一篇逻辑完备的线程池的场景补充吧,把 Executor 等上层代码拿出来分析,才能真正感受到底层线程池中让任务本身可以调起下一个任务等做法的重要性。


Workflow 内部有许多创新的做法,也许我本身有许多表达不好或者技术不到位的地方,但技术文章都是抱着一种分享新思路新做法的心态~ 那么,很希望可以看到大家的不同意见,欢迎发到项目的 issue 中~~~


https://github.com/sogou/workflow

发布于: 2023-07-23阅读数: 35
用户头像

1412

关注

鶸鶸的架构师 2018-08-07 加入

专注于异步调度框架开发和分布式存储技术,开源框架Workflow和srpc的作者之一。

评论

发布
暂无评论
异步编程框架:Workflow的计算调度算法_c++_1412_InfoQ写作社区