写点什么

技术实践干货 | 从工作流到工作流

作者:观远数据
  • 2022 年 7 月 27 日
  • 本文字数:7992 字

    阅读完需:约 26 分钟

技术实践干货 | 从工作流到工作流

本文作者:葱油饼,观远数据前端工程师,落地团队开发规范,开发质量与速度并存,致力于打造更易用的 ABI 产品。


背景


先举个简单的例子,因为工作需要,你可能每天要从数据库抽取数据,然后做成报表,最后以邮件的形式发送给相关的领导。但是每个领导可能需要看的东西不一样,你需要在做成报表前对数据做下筛选和处理,那么每天这个重复的流程,是不是可以抽象成为一个具体的工作流程,把每个步骤具象成一个功能结点,然后以任务的形式串联起来,通过 DAG 的可视化形式展现出来,每天定时跑一下就可以了呢?为此,我们会需要一个工作流来标准化和自动化这个流程。


那工作流是什么?DAG 又是什么?下面让我们进入今天的内容。


前言


这篇文章会讲解我们 Universe(观远三大产品线之一,即观远数据的智能数据开发平台)里的工作流和 DAG 这两个概念,然后展开介绍一些其他内容。整体分为四个部分:

1. 开发平台里的工作流;

2. 如何抽象实现 DAG;

3. 其他工作流介绍;

4. 基于工作流和 DAG 的总结与思考。

接下来让我们开始吧~


一、工作流


首先简单介绍下 Universe 里的工作流:


实现各类任务的依赖关系、调度顺序设计,对流程进行可视化、低代码的设计及管理,对任务节点进行快速且高可用的配置,来处理一系列的数据任务;并且可以在约定时间/满足事件依赖后运行,有序调起各个任务节点,自动完成数据处理过程,具有简单易用、高可靠性及高扩展性等优势。


根据这段描述,我们可以简单总结出工作流的两个核心能力:

1. 调度;

2. 配置化(节点)。

下面详细介绍下这两个核心能力。


1.1 调度

开发平台支持基于 Cron 表达式的定时调度和基于输入源数据依赖的事件调度,其中定时调度采用 quartz 分布式调度器。


具备以下几点特性:

  • 高可用性

  • 通过 DAG 实现任务节点的可视化编排,无须复杂的平台语言学习成本,任务调度开箱即用;

  • 支持配置顺序调度、成功调度、失败调度等多种调度关系,灵活调整调度策略;

  • 支持按时、按天、按周、按月等定时运行工作流,运行结果可快速推送至钉钉、企业微信等平台,一次配置,持续可用。

  • 高可靠性:去中心化的多 Master 和多 Worker 分布式架构, 避免单点故障,增强系统可靠性。

  • 高扩展性:可以基于 SDK 开发自定义任务类型和流程无缝衔接。


1.1.1 定时调度

支持以每天/每周/每月/每年并精确到分钟的形式和间隔时长(时/分)的形式去设置定时。

举个例子:我期望工作流每天早上 7 时和晚上 21 时去运行,那我就可以选择 每天 - 7 时/21 时 - 00 分的形式,也可以设置分钟/时的间隔时间去运行。


1.1.2 事件调度

一般工作流都会有数据源依赖,比如数据集/数据库,当开启依赖的数据源全部更新的时候,工作流可以自动去运行一次。


1.2 配置化

基于一个约定式的配置描述,产出一个可交互的 UI,用于构建目标对象。


调度的目的是运行工作流,工作流的运行依赖于不同任务节点的配置,不同的配置必然会存在不一样的 UI 组件,那如何能用已知的数据结构去组装一个可视化的 UI 呢?答案就是配置化。

我们基于一个配置描述(对象)去进行读取,然后根据配置渲染对应的组件,同时把组件的值集中设置到一个总的配置对象里,从而完成了从描述到 UI 再到目标对象构建的一个过程。下面我会简单的举三个例子来说明配置化的强大与魅力。


1.2.1 基础能力

如果我们需要构建如下的一个目标对象:

{    name: '',    description: '',}
复制代码


然后我们就会有以下一段配置描述:

[    {        fieldName: 'name',        label: '名称',        type: 'STRING',        defaultValue: '',    },    {        fieldName: 'description',        label: '描述',        type: 'TEXT',        defaultValue: '',    },]
复制代码

生成的 UI 如下:



1.2.2 动态能力

很多时候我们会需要动态实现一个目标对象,什么意思呢?就是选择一个属性的不同值,动态使用一个属性组合成一个新的目标对象,那对应到 UI 上就是选择不同属性值对应展示不同的组件,那光靠我们的基础能力去实现,显然无法做到。

比如我想计算一个图形的面积,如正方形需要的是边长属性,而圆需要的是半径属性,那目标对象和 UI 就会变成:

  • 选择正方形时


{    shape: 'square',    side: 8,}
复制代码



  • 选择圆形时

{    shape: 'circle',    radius: 4,}
复制代码



可以看到 side 和 radius 是随着 shape 而动态出现的,那我们可以简单改造下配置描述:

[    {        fieldName: 'shape',        label: '图形',        type: 'MODEL',        model: {            modelType: 'SELECT',            labels: [ '圆', '正方形' ],            values: [ 'circle', 'square' ],        },    },    {        fieldName: 'radius',        label: '半径',        type: 'NUMBER',        dependsOnMap: {            shape: [ 'circle' ],        },        defaultValue: 4,    },    {        fieldName: 'side',        label: '边长',        type: 'NUMBER',        dependsOnMap: {            shape: [ 'square' ],        },        defaultValue: 8,    },]
复制代码

可以看到,我们仅仅添加了 dependsOnMap 属性,然后内部渲染和构建对象的时候稍微适配下,就可以实现选择不同属性展示不同组件的需求了。


这里简单说明下 dependsOnMap 属性,它的 key 值应该是某一个 fieldName,value 是一个数组,方便扩展允许多个值的情况,这样就可以根据 fieldName 去获取 value,与配置里的值去比较,如果一样那就展示该组件,核心逻辑如下:

function isDependsOnMap (dependsOnMap, config) {  const fieldNames = Object.keys(dependsOnMap || {})  if (fieldNames.length === 0) return true  return fieldNames.every(fieldName => {    const values = dependsOnMap[fieldName] || []    return values.indexOf(_get(config, fieldName)) > -1  })}
复制代码


1.2.3 复杂能力

我们在日常编写中可能还会存在组件之间的数据传递。因为由于配置描述的对象约束,我们在渲染每个组件的时候其实都是独立的,组件之间并不存在联系,为此我们只需要在最上层实现一个数据共享层即可,组件 3 把需要传递的数据放在数据共享层,需要该数据的组件 1 直接去获取即可。



配置如下:

[    {        fieldName: 'fieldName1',        label: '组件1',        type: 'MODEL',        model: {            modelType: 'SELECT',            labels: [ '圆', '正方形' ],            values: [ 'circle', 'square' ],            from: { fieldName: 'disabledFieldName' }, // 依赖于组件3里的设置,判读当前组件是否需要 disabled        },    },    {        fieldName: 'fieldName2',        label: '组件2',        type: 'NUMBER',    },    {        fieldName: 'fieldName3',        label: '组件3',        type: 'MODEL',        model: {            modelType: 'BOOLEAN',            targetSharedFieldName: 'disabledFieldName', // 往数据共享层设置数据的字段        },    },]
复制代码


关键的配置属性就是组件 3 里的 model.targetSharedFieldName 和组件 1 里的 model.from,两者相互对应即可,大体实现如下:

const SharedContext = React.createContext({  updateFieldValue: () => {}, // 更新字段 value  getFieldValue: () => {}, // 获取字段 value})
function Comp1 ({ definition }) {  const { targetSharedFieldName } = definition.model  const { updateFieldValue } = useContext(SharedContext)
  useEffect(() => {    updateFieldValue(targetSharedFieldName, value)  }, [ deps ])}
function Comp2 ({ definition }) {  const { from } = definition.model  const { getFieldValue } = useContext(SharedContext)  const value = getFieldValue(from)}
复制代码


最后简单上个开发平台中一个复杂的配置化 UI 动图,感受下配置化的强大和魅力:


1.2.4 服务能力

当我们需要构建一些数组类目标对象时,第一时间想到的肯定是以列表的形式去展示 UI,因此我们设计了一些服务类型的组件,只负责对列表的渲染,但是每个列表的组件根据数组元素的类型去决定。

比如我们需要这样一个数组类目标对象:

{    list: [        { name: 'a', age: 12 },        { name: 'b', age: 18 },    ],}
复制代码

那对应的配置描述可以写成这样:

[    {        fieldName: 'list',        label: '列表',        type: 'MODEL',        model: {            modelType: 'LIST',            definitions: [                {                    fieldName: 'name',                    label: '名称',                    type: 'STRING',                },                {                    fieldName: 'age',                    label: '年龄',                    type: 'NUMBER',                },            ],        },    },]
复制代码

而对应的 UI 如下:


这个 LIST 组件就是一个服务类型的组件,把数组对象通过列表形式展现出来。


1.2.5 注册能力

内置组件可能并不能完全满足配置化的需求,因为配置化只是一种约定,但是通过构建对象绘制 UI 属于自由化,展现形式千差万别,为此我们提供了注册机制。用户可以自定义注册组件类型,去绘制对应的目标对象。


1.3 总结

基于这么优秀的配置化能力应该被抽象出来,所以也被运用在了 BI 的自定义图表上。基于此,我们写了一个库叫 Lego,正如名字的含义,我们期望在搭建一些专门用于配置的 UI 时如同搭积木一样简单,约定好描述(接口),你去拼拼凑凑就可以了。


介绍完工作流,我们还需要一个可视化的界面来描述这个流程,那么 DAG 无疑是一个很好的展示形式了。


二、DAG


DAG 全称 Directed Acyclic Graph,中文为有向无环图。它由有限个顶点和“有向边”组成,从任意顶点出发,经过若干条有向边,都无法回到该顶点。举例如下图:



简单理解了 DAG 的概念,如何来针对开发平台的工作流场景来抽象出一个简单好用的 DAG 呢?首先整理下绘制一个 DAG 需要哪些信息及状态:

  • 节点信息(nodes)

  • 节点位置(location)

  • 连线信息(edges)

  • 编辑和只读状态


前三点很好理解,应该是绘制 DAG 必不可少的三要素,关于第四点解释下,因为开发平台的工作流有上下线的概念,开发完成后上线运行,不允许修改,来作为数仓开发中的一个规范,那么我们的工作流就存在了下线可编辑,上线只读的区分。


首先从编辑和只读下手,我们可以把 DAG 分为 Playground 和 Renderer 两部分,并且可独立使用。Playground 对应编辑态,Renderer 对应只读态。Playground 应该去实时生成编辑状态中的绘制信息,而 Renderer 则负责根据绘制信息去实时渲染。然后我们来梳理下编辑和只读状态下应该具备什么能力:

  • Playground

    节点拖动

    连线增删

    新增/复制节点

    框选节点进行批量复制/删除

    自动布局/撤销操作

  • Renderer

    放大缩小

    画布拖动

    节点点击


那再往上考虑一下,我们的 DAG 还应该具备什么能力?这里我结合开发平台的使用简单的列了以下几点:

  • 提供样式配置(如节点大小/连线宽度等)

  • 支持宽高自适应

  • 自定义绘制节点和连线

  • 其他绘制能力增强(如注释功能,本身并不属于 DAG 的功能,而是考虑成扩展功能实现)


至此,我们的 DAG 大概有了一个完整的结构和实现方向:

|- ConfigContext              --- 配置层     |- Playground            --- 编辑层        |- ResponsiveProvider --- 自适应宽高层(可选)           |- Renderer        --- 只读层,只做展示              |- Nodes        --- 节点              |- Edges        --- 连线
复制代码

使用上大概是这样的:

2.1 只读使用

<ConfigContext.Provider value={{ node: { width: 56, height: 56 } }}> <ResponsiveProvider>  <Renderer nodes={nodes} location={location} edges={edges} /> </ResponsiveProvider></ConfigContext.Provider>
复制代码

2.2 编辑使用

<ConfigContext.Provider value={{ node: { width: 60, height: 60 } }}> <Playground nodes={nodes} location={location} edges={edges} /></ConfigContext.Provider>
复制代码

2.3 自定义节点和连线使用

<ConfigContext.Provider value={{ node: { width: 56, height: 56 } }}> <Renderer nodes={nodes} location={location} edges={edges}>  <Nodes>   {(props) => <CustomNode />}  </Nodes>  <Edges>   {(props) => <CustomEdge />}  </Edges> </Renderer></ConfigContext.Provider>
复制代码

2.4 底层绘制

这里我们选择了 svg,是因为 svg 在绘制上足够强大,支持 css 去自定义样式,同时也方便事件的绑定。有了这个方向,我们可以确定下元素依次对应哪些标签:

画出来大致是下面这样的结构:



其中画布的放大缩小及移动是通过 viewBox 属性设置


根据 html 结构,连线是我们需要关心如何生成的,这里主要是通过两个节点的位置来计算一条二次贝塞尔曲线(Quadratic Curves)来得到一条反向对称的完美曲线,如下:


这里说下二次贝塞尔曲线在 path 标签中如何实现。首先绘制需要三个点的信息,如下动图:


其次因为我们的曲线是反向对称的,那么其实只需要绘制一半就行,这一半就是一条二次贝塞尔曲线,那么三个点的位置就很好确认了,如下:


其中 P0 为起点,P4 为终点,为方便计算,P1 对应 1/4 水平间距,高度同起点,P2 则是 1/2 的水平间距和垂直间距,然后通过计算 path 路径的 d 属性,分别把 3 个点代入即可:d = M P0x P0y Q P1x P1y P2x P2y T P4x P4y这样我们就得到了一条完整的曲线,由两条二次贝塞尔曲线构成。


2.5 布局

有了节点和连线,布局方面也是很重要的一环,人工拖拽显然有时候会显得不够整齐,如果有一个自动布局的算法,那么就会轻松许多,这里我们选择了  dagre  来作为自动布局的计算工具。主要有以下三种算法:

function rank(g) { switch(g.graph().ranker) {  case "network-simplex": networkSimplexRanker(g); break;  case "tight-tree": tightTreeRanker(g); break;  case "longest-path": longestPathRanker(g); break;  default: networkSimplexRanker(g); }}
复制代码

network-simplex

 和 tight-tree 在布局上类似,都是以紧凑的方式去实现布局,longest-path 的区别在于如果有多个末端结点,则保证这些结点从上而下对齐,而不是就近去布局,如下图:

  • network-simplex 和 tight-tree

  • longest-path


三、其他工作流


这里不会详细介绍这些工作流如何使用,只是会借鉴它们在对工作流绘制及应用上的一些想法。


3.1 n8n

The workflow automation platform that doesn't box you in, that you never outgrow.


n8n 支持以事件驱动(一般通过第三方应用的 hooks/本地文件修改监听等)和  cron  表达式的定时调度工作流,同时以数据传递的顺序确定结点之间的依赖关系。和我们的工作流很像,只是我们的工作流是结点任务调度上的依赖,而不是数据上的依赖。


3.1.1 应用

那它适合来干什么呢?如下图:


如果你是一个开源爱好者,希望知道自己的 Github Repo 被 star 或者移除 star 的时候第一时间知道消息,那么就可以使用 github 开放的 star hook,然后通过 slack 给自己发送消息。通过对第三方平台的集成,能很好把各种没有关系的应用串联起来,开发出便捷的工作流。


3.1.2 总结

n8n 目前已经集成了 200+ 的应用,覆盖了大部分主流的应用。但是国内的一些应用还是缺失的,比如钉钉/企业微信等,所以它也就顺利成章的支持了自定义开发结点,有兴趣的可以点击 这里 。整体来说,n8n 更像是一个集成应用的工作流,当然也支持一部分本地功能,如读写文件/使用 git 操作等。它可以把我们日常工作或者开发中需要点点写写的一些常用操作,整合成一个工作流,便捷日常生活。


3.1.3 借鉴

从它的工作流设计上,或许有些点可以借鉴过来:

  • 结点配置时可以看到上一个结点的输出数据是什么,方便当前步骤进行配置

  • 结点配置完可以立即执行,看到对应的输出数据

  • 连线上有一些数据可视化增强,如输出数据有多少行

  • 结点可以直接点击添加选择后置结点,省去一部分连线操作


3.1.4 其他

后面我试了下结点能否成环,结果是可以,陷入无限循环的运行中,应用卡死了,如下:

数据无限增长,运行无限循环。


3.2 Orange

Open source machine learning and data visualization. Build data analysis workflows visually, with a large, diverse toolbox.


3.2.1 应用

Orange 比较适合做 ML 相关的工作,有点像我们的 AI Flow,但是又把数据流/数据探查/图表分析等功能集成在里面,不用去其他页面单独配置处理查看,以工作流的形式对数据进行查看、处理和分析。简单上个图:

有个很有意思的小点,它的连线支持全量数据或者选中数据进行传递,如下图:

然后会把数据传递的方式体现在连线上。在连接上,它是以圆弧的形式来展现端点(我猜用圆弧是为了增加结点的连接面积,同时也适配圆形结点),有连接则为实线,无连接则为虚线,对于状态的展示上很友好。


3.2.2 总结

Orange 功能集成十分强大,除了基本的数据转换,还有图表/模型/评估等功能,很适合做 AI 方向的数据分析工作。


四、总结与思考

工作流这个概念已经提出很久了,它是对流程及其各操作步骤之间业务规则的抽象、概括和描述。工作流的出现让我们的流程得到规范,步骤变得清晰。而数据开发上面的工作流更是避免了一系列的重复操作,同时以 DAG 的形式去展现,让流程变得更为直观。当然 DAG 也不一定用在调度这类有着先后顺序限制的系统中,也可以用在其他形式中,比如数据血缘这类有着因果关系的展示上,也可以用于家族图谱的展示上,再提升一层,甚至可以用在数据处理网络中,数据从一个点流到另一个点,而并不一定需要以可视化的形式展现出来,仅仅需要这个概念就可以了。


4.1 可能性

其实我们的工作流的调度能力和配置化功能很强大,但是受限于有限的功能结点,如果我们可以支持自定义配置结点,能让用户在数据开发层面有更大的想象空间,而不是只受限于这些已有的结点去做工作流的开发。

参考资料

[1] https://dolphinscheduler.apache.org/zh-cn/docs/latest/user_doc/about/introduction.html

[2] https://en.wikipedia.org/wiki/Workflow

[3] https://en.wikipedia.org/wiki/Directed_acyclic_graph

[4] https://github.com/biolab/orange3

[5] https://github.com/n8n-io/n8n

发布于: 刚刚阅读数: 3
用户头像

观远数据

关注

还未添加个人签名 2022.07.14 加入

还未添加个人简介

评论

发布
暂无评论
技术实践干货 | 从工作流到工作流_工作流_观远数据_InfoQ写作社区