写点什么

PingCode Flow 技术架构揭秘

  • 2022 年 5 月 06 日
  • 本文字数:6055 字

    阅读完需:约 20 分钟

PingCode Flow技术架构揭秘

作者:PingCode 研发 VP@徐子岩



本文是 PingCode Flow 系列文章的第四篇,不出意外的话,也是最后一篇。从去年五月份 PingCode Flow 正式上线,就在脑海中构思好了四篇文档的顺序和大致内容,从介绍研发自动化的现状、痛点和解决方案,到展示如何使用 PingCode Flow 来实现研发的自动化。而这最后一篇,则是希望能够从纯技术角度展示 PingCode Flow 内部是如何工作的,如何在每天将近 4000 次规则执行的压力下,保障 99%的规则都能在 1 秒内完成,同时支持顺序、并行、判断、循环等复杂的运行逻辑。



同时,我们也希望在这篇文章中分享一下我们是如何分析和思考,并最终得出现在的架构。因此本文不会直接展示最终的设计结果,而是会阐述我们为什么要如此的设计,优缺点的权衡,以及开发过程中的重构。

PingCode Flow 系列文章传送门

PingCode Flow 的本质是什么

在前几篇文章中我们提到,PingCode Flow 是一款研发自动化工具。所谓的自动化,就是指在某个事件发生后,按照预定义的流程去完成一系列的操作。所以,本质上来讲,PingCode Flow 是一个 TAP(Trigger Action Platform)系统。它由一个触发器和多个动作组成一个有序的执行规则,然后按照这个规则顺序执行。





因此,PingCode Flow 在技术架构设计的时候,就是要确保这样的流程能够顺畅的运行。

数据结构:怎么定义一个规则

确定了产品的核心目标后,第一件事就是要明确数据是如何定义的。按照上面的图示,在一个团队中用户可能会定义多个规则,而每个规则都包含了一个触发器和多个后续的动作。基于这个简单的需求,可以将规则的数据结构定义如下。





这样,一个规则就包含了一个触发器以及它所包含的动作。而动作的序号决定了他们执行的先后顺序。这样的设计看来基本满足了目前的产品需求。

但是我们知道,现在的 PingCode Flow 不仅仅支持上述的单线顺序执行流程,还支持条件、并行、判断、循环等负责的执行流程。而且不止于此,我们需要将上述这些流程自由组合,譬如并行内部有判断,判断里面有循环,循环中还有并行……通过这样几乎无限的组合,让一个规则实现几乎任意的流转。





因此,上述简单的数据结构就完全不能满足需求。而如何设计一个能够支持各种场景的规则,是摆在我们 PingCode Flow 团队面前的第一个难题。

如果我们以「一个规则就是一系列动作的集合」这个方式去考虑,那么很难设计出相对通用的数据结构。因为规则内的动作是由用户决定的,不可能穷举出所有可能的结构出来。但是可以尝试换一种思路来考虑这个问题,也就是说,我们不再把规则看做是触发器和动作的有序列表,而是将他们定义为一个链表。那么一个规则就是

  • 触发器及下一个动作

  • 当前动作及下一个动作

的集合。

假如我们再将「触发器」和「动作」合并为「步骤」。那么一个规则就是

  • 第一个步骤

  • 当前步骤的下一个步骤

这样,我们对于规则和步骤的定义就可以统一为





即规则并不关心它内部的动作都是什么以及先后顺序,它只关心第一个动作是什么。而每一个动作也仅仅关心下一个动作是什么。

对于并行、循环、判断等复杂的流程,我们只需要扩展对应动作的数据结构,就可以实现不同的排列组合。譬如对于并行,它的数据结构是这样的。





「并行」内部的每个分支的第一个步骤 ID 保存在一个数组中,表示这个分支要执行的第一个步骤是什么。「并行」本身不关心每个分支里面具体的流程是什么样的。它只关心当所有分支都执行完毕后,下一个步骤是什么。

基于这样的结构,对于上述这个复杂的规则





我们的数据大致是这样的。对于 步骤 1,它只设置了下一个步骤是 步骤 2


对于 步骤 2 ,它内部有两个分支 步骤 3 和 步骤 4 ,下一个步骤是整个分支全部执行完毕的 步骤 10 。因此它的数据是这样的。


对于 步骤 4 ,它是一个循环步骤。进入循环体的第一个步骤是 步骤 6 ,而它完成循环后就会结束当前的分支操作。因此它的数据是这样的。


 下一个步骤 ID 为空,表示当前分支结束。

执行逻辑:如何支持各种类型的步骤

当我们确定了数据结构之后,规则内步骤的执行方式也随之确定了。和结构类似,当一个规则被启动后(我们先不考虑规则是如何被触发的),它会首先找到第一个动作的 ID。熟悉 PingCode Flow 的读者们都知道,我们系统内预置了很多的动作,譬如设置工作项负责人、创建页面、变更测试用例状态等等。那么这些动作是怎么被执行起来的呢。

首先,每一个动作都会有一个全局唯一的名称。当规则执行到这个步骤的时候,我们会通过步骤的 ID 找到它的动作名。通过动作的名称定位代码中对应的实际执行逻辑。





譬如「设置工作项负责人」这个动作,它的连接器名称是 project ,动作名称是 set_assignee 。代码大致如下。



@action({    name: "set_assignee",    displayName: "设置工作项负责人",    description: "设置当前一个或多个工作项的负责人。",    isEnabled: Is.yes,    allowCreation: Is.yes,    allowDeletion: Is.yes})export class AgileActionSetWorkItemsAssignee extends AgileWorkItemsAction<AgileActionSetAssigneeDynamicPropertiesSchema, AgileActionSetAssigneeDirectives, AgileActionSetAssigneeRuleStepEntity> {    constructor() {        super(AgileActionSetAssigneeRuleStepEntity, undefined, /* ... */);    }
protected onGetDirectivesMetadata(): DirectivesMetadata<Omit<AgileActionSetAssigneeDirectives, keyof AgileWorkItemsActionDirectives>> { /* ... */ };
protected onGetDynamicPropertiesMetadata(): PropertiesMetadata<Omit<AgileActionSetAssigneeDynamicPropertiesSchema, keyof AgileWorkItemsDynamicPropertiesSchema>> { /* ... */ };
protected async onExecute(context: ExecuteContextWrapper<AgileActionSetAssigneeDynamicPropertiesSchema, AgileActionSetAssigneeDirectives, AgileActionSetAssigneeRuleStepEntity>): Promise<RuleStepResult<AgileActionSetAssigneeDynamicPropertiesSchema>> { /* ... */ }}
复制代码


其中最主要的代码是 onExecute ,它将会在执行这个步骤时候被调用。当操作执行完毕后,会将数据库中保存的的 下一个步骤 ID 返回,规则执行引擎会去调用后续的步骤。这就是一个最简单的动作步骤,由系统调用,执行具体的操作,然后返回下一个步骤的 ID。

除了普通的动作之外,PingCode Flow 还支持条件、并行、判断、循环等复杂的流程控制。和刚才提到的动作一样,都是通过重写 onExecute 这个方法来实现的。以「条件」为例,它需要在判断为真的时候继续执行后续的步骤,为假则停止当前步骤。那么它的 onExecute 就是这样的。



export abstract class Condition<D extends Directives, T extends RuleStepsConditionEntity<D>> extends Element<EmptyPropertiesSchema, D, T> {
constructor(ruleStepCtor: new (...args: any) => T, contracts: ElementContract[]) { /* ... */ }
protected abstract predicate(context: ExecuteContextWrapper<EmptyPropertiesSchema, D, T>): Promise<boolean>;
protected async onExecute(context: ExecuteContextWrapper<EmptyPropertiesSchema, D, T>): Promise<RuleStepResult<EmptyPropertiesSchema>> { if (await this.predicate(context)) { return { properties: undefined, nextStepId: context.getRuleStepEntity().next_step_id }; } else { return { properties: undefined, nextStepId: undefined }; } }
public getDynamicPropertiesMetadata(): PropertiesMetadata<EmptyPropertiesSchema> { return {}; }
}
复制代码


我们定义了一个抽象方法 predicate ,用来给派生类实现具体的判断逻辑。 onExecute 方法会调用这个 predicate 。如果结果为 TRUE ,那么将会返回数据库里面定义的下一个步骤的 ID,规则将会继续执行;如果结果为 FALSE ,那么它会返回 undefined ,表示没有后续的步骤了,执行流程到此结束。

而对于「判断」、「并行」、「循环」等类型的步骤,它内部可能包含了非常复杂的流程,也可以通过现有的数据结构和执行流程做到解耦,让每个步骤只需要专注自己的工作。

以「并行」为例,我们知道它的数据结构包含了

  • 每个分支的首个步骤 ID

  • 所有分支结束后的下一个步骤 ID

因此,「并行」步骤的执行逻辑就是同时启动每个分支的首个步骤。然后等所有分支的操作都结束了,再返回下一个步骤的 ID。

@control({    name: "parallel",    displayName: "并行(Parallel)",    description: "并行执行步骤。",    isEnabled: Is.yes,    allowCreation: Is.yes,    allowDeletion: Is.yes})export class ControlParallel extends ControlAction<EmptyPropertiesSchema, RuleStepsControlParallelEntity> {    constructor() {        /* ... */    }
public getDynamicPropertiesMetadata(): PropertiesMetadata<EmptyPropertiesSchema> { /* ... */ }
protected async onExecute(context: ExecuteContextWrapper<EmptyPropertiesSchema, EmptyDirectives, RuleStepsControlParallelEntity>): Promise<RuleStepResult<EmptyPropertiesSchema>> { const entity = context.getRuleStepEntity(); const contexts = await Promise.all(_.map(entity.parallel_next_step_ids, id => new Promise<ExecuteContext>((resolve, reject) => { const ctx = context.getRawContext(true); Executor.create().execute(entity._id, id, ctx) .then(() => { return resolve(ctx); }) .catch(error => { return reject(error); }); }))); context .mergeProperties(contexts) .mergeTargets(contexts, false); return { properties: undefined, nextStepId: entity.next_step_id }; }
}
复制代码



注意在 onExecute 方法中,我们将数据库中定义的分支步骤 ID 数组 parallel_next_step_ids 转化为异步操作 Executor.create().execute ,让他们在各自的上下文中执行。然后等所有分支的操作都执行完毕,也就是 await Promise.all 后,再返回下一个步骤的 ID。这样,对于「并行」本身则完全不用关心每个分支内的执行逻辑是什么样的。而当规则执行到某个分支内的时候,也完全不会意识到自己是处在某个「并行」的「分支」中。

模块拆分:规则是如何被调度起来的

刚才我们介绍了规则和步骤的数据是怎么保存的,以及一个规则内的步骤是怎么执行的。但是规则是如何被触发启动的呢?目前 PingCode Flow 支持自动化、手动和即时三种规则,同时自动化规则又可以分为如下三种启动场景:

  • 由 PingCode 其它子产品调用启动

  • 由第三方子产品调用启动

  • 由自定义的 Webhook 调用启动





由上图可以看出,对于一个规则来说,它并不需要关心自己是由什么途径触发的。它只需要知道在某个时刻,有一个规则需要执行。因此,我们为这个执行规则的部分单独分离了一个模块,即「Flow Engine」,它的职责很简单,就是「启动某个规则」。

而对于负责接收规则启动请求的模块,它们有一个通用的职责,就是按自己的需求去通知 Flow Engine 启动规则。上图中五个触发规则的模块,各自的职责如下:




通过这样的拆分,就可以将规则的执行和规则的触发完全隔离开。在 PingCode Flow 开发初期,我们仅支持从 PingCode 其它子产品触发的规则。但是随着产品功能的不断增强,我们陆续实现了第三方产品(GitHub、GitLab、Jenkins 等)的接入,即时规则(手动触发)和定时规则(定时触发)。而这些新的触发方式完全不影响之前其它的模块,因此最大限度的保证了产品的质量。

部署方式:让所有节点都支持横向扩展

企业级 SaaS 产品的核心诉求就是数据的安全性和服务的稳定性。对于稳定性,一方面要求我们的产出物(即代码)质量很高,另一方是要求我们的服务在各个环节都能支持横向扩展,不会出现因请求量和执行量增加的情况下导致的系统性能问题和稳定性问题。单一职责的模块划分让我们在设计 PingCode Flow 部署方式的时候有了更好的选择,更容易的达成稳定性的要求。

具体来说,之前介绍的五个接收模块和规则的执行模块(Flow Engine),本身的业务逻辑都是无状态的,因此都可以支持独立的横向扩展。





上图中的箭头,表示调用关系由各个触发模块发起,按需启动 Flow Engine 的规则。我们最初是设计是使用基础框架的 RPC 功能来实现。即当有一个事件发生时,譬如用户修改了工作项的状态,那么「PingCode 子产品」这个触发模块会通过 RPC(HTTP 或 TCP 请求)同步调用 Flow Engine 的接口,启动相应的规则。

但是 PingCode Flow 和其它的 PingCode 子产品有所不同。PingCode Flow 的执行频率和实行时间是基于客户定义的规则,由 PingCode 系统内以及各种外部系统的操作和事件驱动的,一旦启动请求量和执行量会非常大。因此就要求位于后端的 Flow Engine 有足够的弹性,能够平稳的执行每一条规则,缓冲短时间的大量操作。因此,直接使用 RPC 的方案最终在架构评审会中被否定了。

既然架构目标是需要 PingCode Flow 系统能够在高峰期保护后端的 Engine 模块,因此,我们决定在调用层和实际执行层之间使用了消息队列。





通过消息队列,所有规则的执行请求会被加入队列中,然后由多个侦听队列的 Flow Engine 实例进行读取和处理。这样的好处是,首先,一旦出现短时间执行量过大的情况,执行请求会被缓冲在消息队列中,不会对 Flow Engine 造成冲击。其次,调用方和执行方完全通过数据进行交互,二者之间彻底的解耦。最后,在操作量有波动的时候,我们可以将新的 Flow Engine 接入消息队列来完成扩容,无需额外的配置 IP、端口号、请求转发和负载均衡等信息。

最终,我们 PingCode Flow 的整体架构如下所示。





写在最后:合理的架构是不断演进出来的

在与我们 PingCode 的客户进行沟通的时候,经常会被问及的一个问题是,研发团队是如何得出一个好的架构设计的?既满足了未来扩展的需要,同时避免过度的设计。对此,我个人的观点是,世界上就没有所谓好的设计,只有合理的设计。而一个合理的设计不是出自于某个架构师的空想,而要基于现有的业务需求和可预见的场景,逐步发现的。

尤其是在敏捷开发的大场景下,我们每一个迭代都是为了完成能够体现客户价值的一个一个用户故事。因此,架构设计也不是一蹴而就,而是要在每一迭代中不断的思考、设计、实践、反馈和修改,最终得到一个当前看来最为合理的答案。


Pingcode官网

Worktile官网

发布于: 刚刚阅读数: 4
用户头像

还未添加个人签名 2021.02.01 加入

还未添加个人简介

评论

发布
暂无评论
PingCode Flow技术架构揭秘_PingCode研发中心_InfoQ写作社区