写点什么

流批一体向量化引擎 Flex

作者:Apache Flink
  • 2025-06-11
    陕西
  • 本文字数:8095 字

    阅读完需:约 27 分钟

流批一体向量化引擎Flex

摘要:本文整理自蚂蚁分布式计算引擎技术专家,Calcite Comitter、Flink Contributor 刘勇老师,在 Flink Forward Asia 2024 核心技术 (一) 专场中的分享,内容分为以下三个部分:

1、向量化技术背景

2、架构

3、未来规划

01. 向量化技术背景

1.1 什么是向量化计算

1.1.1 并行数据处理 SIMD 指令

以下面循环代码为例,计算在 CPU 内完成需经三步:

  • 加载(Load),从内存加载 2 个源操作数(a[i]和 b[i])到 2 个寄存器。

  • 计算(Compute),执行加法指令,作用于 2 个寄存器里的源操作数副本,结果产生到目标寄存器。

  • 存储(Store),将目标寄存器的数据存入(拷贝)到目标内存位置(c[i])。

void addArrays(const int* a, const int* b, int* c, int num) {  for (int i = 0; i < num; ++i) {    c[i] = a[i] + b[i];  }}
复制代码

该流程即对应传统的计算架构:单指令单数据(SISD)顺序架构,任意时间点只有一条指令作用于一条数据流。如果有更宽的寄存器(超机器字长,比如 256 位 16 字节),一次性从源内存同时加载更多的数据到寄存器,一条指令作用于寄存器 x 和 y,在 x 和 y 的每个分量(比如 32 位 4 字节)上并行进行加,并将结果存入寄存器 z 的各对应分量,最后一次性将寄存器 z 里的内容存入目标内存,那么就能实现单指令并行处理数据的效果,这就是单指令多数据(SIMD)。



1.1.2 向量化执行框架具有的特性

执行引擎常规按行处理的方式,存在以下问题:

  • CPU Cache 命中率差。一行的多列(字段)数据的内存紧挨在一起,哪怕只对其中的一个字段做操作,其他字段所占的内存也需要加载进来,这会抢占稀缺的 Cache 资源。Cache 命失会导致被请求的数据从内存加载进 Cache,等待内存操作完成会导致 CPU 执行指令暂停(Memory Stall),这会增加延时,还可能浪费内存带宽。

  • 变长字段影响计算效率。假设一行包括 int、string、int 三列,其中 int 类型是固定长度,而 string 是变长的(一般表示为 int len + bytes content),变长列的存在会导致无法通过行号算 offset 做快速定位。

  • 虚函数开销。对一行的多列进行处理通常会封装在一个循环里,会抽象出一个类似 handle 的接口(C++虚函数)用于处理某类型数据,各字段类型会 override 该 handle 接口。虚函数的调用多一步查表,且无法被内联,循环内高频调用虚函数的性能影响不可忽视。



因此,要让向量化计算发挥威力,只使用 SIMD 指令还不够,还需要对框架层面进行改造,数据按列组织

  • 数据按列组织将提高数据局部性。参与计算的列的多行数据会内存紧凑的保存在一起,CPU 可以通过预取指令将接下来要处理的数据加载进 Cache,从而减少 Memory Stall。不参与计算的列的数据不会与被处理的列竞争 Cache,这种内存交互的隔离能提高 Cache 亲和性。

  • 同一列数据在循环里被施加相同的计算。批量迭代将减少函数调用次数,通过模版能减少虚函数调用,降低运行时开销。针对固定长度类型的列很容易被并行处理(通过行号 offset 到数据),这样的执行框架也有利于让编译器做自动向量化代码生成,显著减少分支,减轻预测失败的惩罚。结合模板,编译器会为每个实参生成特定实例化代码,避免运行时查找虚函数表,并且由于编译器知道了具体的类型信息,可以对模板函数进行内联展开。



1.2 向量化计算在分布式计算领域的现状

随着技术的发展,向量化技术在硬件、指令集、配套工具、类库等得到多方位协同发展。

  • 在指令集层面,当前大多数机器都支持 SIMD 指令集,比如 x86 平台的 SSE、AVX 指令集,ARM 平台的 NEON 指令集。

  • 在编译器层面,现代编译器如 GCC、LLVM 可以自动将代码中可向量化的部分转成 SIMD 指令。比如开源有一个基于 arrow 的表达式计算框架 gandiva 就是基于 LLVM 生成 SIMD 指令。

  • 在类库层面,有支持跨平台的库 XSIMD,可同时运行在 x8 6 和 arm 架构。



上面讲述了向量化的技术原理和可供使用的方案,我们再看下大数据领域在向量化方向做了哪些探索。首先看下 Photon, Photon 是 Databricks 闭源的一个全新的向量化引擎,完全用 C++ 编写。Databricks 在过去几年里,一直通过各种技术手段对内部的 Spark 版本 Databricks Runtimes 进行优化。左下图展示了不同 DBR 版本相对于 2.1 版本的性能提升情况,可以看到未开启 Photon 的版本性能提升在 1-2 倍,自从上了 Photon 后,性能提升可以达到 3-8 倍。自从 Databricks 论文公布其方案和效果后,大数据领域进行了各种尝试和探索。主流分布式计算引擎都在往向量化方向发力且效果显然,Flink 作为流批一体实时计算引擎在湖仓一体中起关键核心作用,但缺少核心的向量化能力,因此我们蚂蚁向这块发起了挑战。



1.3 流批一体向量化引擎在蚂蚁的探索

今年我们在 Flink 1.18 中首次引入了流批一体向量化计算,通过使用 C++开发,利用 SIMD、向量化、列计算等技术,在完全兼容 Flink SQL 语法的基础之上,可以提供比原生的执行引擎更高的吞吐,并可以降低用户 Flink 作业的执行成本。同时,SQL 语义对齐,对用户来说,仅仅需要增加配置即可开启,降低存量用户使用成本。

02. 架构

2.1 引擎选型



由于目前开源社区已经有比较成熟的 Native Engine,如 ClickHouse、Velox,具备了优秀的向量化、批处理、列计算等能力,并经过业界广泛验证和实践。因此,我们不打算重头造轮子,而是站在巨人的肩膀上。采用了类似 gluten 的实现方式,基于 velox 上构建。同时,为了防止闭门造车,在立项之出选择和 gluten 社区合作,讨论构建方案以及加强紧密型合作。内部代号 Flex,Flex 是 Fink 和 Velox 的全称,也是 Flexible 的前缀,我们希望它能够做到灵活可插拔,像胶水一样扮演其工作。

同时,velox 算子面向传统批处理场景,算子本身无法处理回撤消息,即天然和流式计算场景想违背,因此无法直接基于 gluten 的 substrain plan 方案。改方案实现复杂工作量较大,也没法支持状态算子。同时,为了复用 velox 一些现有能力,不必重复造轮子,因此我们另辟蹊径采用了新的方案,新方案可以做到真正的流批一体,同时,向量化核心效果在于表达式计算。因此非常适合 Calc 算子,而且 Calc 算子不区分流和批,也就是说流任务和批任务都可以享受到向量化带来的技术红利,也可以使用到 velox 的表达式计算能力。

最后,我们在内部完成相应的 POC 之后,通过使用基准测试函数进行测试,发现端到端的性能 TPS 能够提升四倍以上。所以我们开始立项,代号叫 Flex,它是 Flink 加 Velox 的全称,也是 Flexible(灵活的)的前缀,我们希望它能够做到灵活可插拔,做好中间层的工作。

2.2 Flex 引擎架构



Flex 的架构主要分为六个模块。第一层是 JNI 胶水层,其主要负责实现一套通用的高性能 JIN 库和 Velox 进行交互。第二层是 native 算子层,当前支持 native source、native sink、native calc。第三层是 plan conversion,负责将 Flink 的算子、数据等转换成 Velox 能处理的形式。第四是数据转换层,因为 Flink SQL 是面向 RowData 的数据结构,需要把它转变成面向列的 Velox RowVector 数据结构。第五层是 Fallback 层,由于状态算子当前还不支持需要进行 fallback 处理。第六层是统一的管理层,对内存等进行统一管理。

2.3 关键工作

然后我们从功能性、正确性、易用性和稳定性四个方面进行全方位建设。



2.3.1 功能性建设



2.3.1.1 Native 算子层优化

NativeCalc 算子优化

在用户向量化作业验证过程中,发现对于任务中含有很多 RexInputRef 引用字段,这些字段数据内容长且没有加工逻辑,如果把使用到的 Scheme 字段全部做攒批和数据转换,转换开销将近 11%。我们可以将引用字段单独拆出来,拆成两个 Calc 算子,在 NativeCalc 算子层仅仅将表达式中使用的字段进行数据转换,其他字段仅仅 forward 到原有费 Calc codegen 计算进行取值,最后再拼接 JoinedRowData。为了高效的拼接 JoinedRowData,自然需要插入 Calc 算子对字段重排序,因此需要增加规则支持 projection reorder。通过该优化数据转换层开销降低到 6.68%

同时对于表达式里面含有 udf,复用此方案把 udf 也拆到引用字段的 Calc 里面,就不需要整个 Calc 算子 fallback。



支持 NativeSource/NativeSink

对于蚂蚁内部有类似 kafka 的消息队列 sls, sls source/sink 当前支持 Arrow 模式,为了避免冗余的数据行转列开销,插件层直接读写 Arrow 格式数据,从而避免额外的行转列开销。

2.3.1.2 Plan 层优化

为了尽可能多的将 DAG 中各种算子中包含 projection 和 condition 的逻辑中包含 SIMD 函数的计算逻辑委托给 velox 进行计算,我们在 plan 层增加了多种基于规则的 Rule 对算子进行拆分合并和交互,全方位的发挥性能极致。目前 Calc、Join、Correlate 算子都已经支持 Native 执行

由于流式计算场景需要攒批,发挥列计算的优势,但是攒批和转换数据层存在一定开销,因此对于 calc 算子中整个表达式 tree 都没有使用到 SIMD 函数,将不翻译成 native 算子

下面的 sql RexCall 以 blink_json_value 为例进行阐述优化规则

projection reorder

以下面 sql 为例,该规则将 RexInputRef 引用字段排在 RexCall 前面,通过一个非 Native 的 Cal 记录下引用关系。主要用于和算子层优化结合。

SELECT a, blink_json_value(a, c), b FROM MyTable
复制代码


Calc(select=[a, f0 AS EXPR$1, b])+- NativeCalc(select=[a, b, blink_json_value(a, c) AS f0])   +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
复制代码
projections 包含 SIMD 函数

该 sql 仅仅 projections 含有 SIMD 函数,condition 没有使用,因此 filter 逻辑还是使用非 native 的算子。

SELECT a, blink_json_value(b, d) FROM MyTableWHERE concat(a, '1') is not null
复制代码

经过优化后的执行计划如下所示

NativeCalc(select=[a, blink_json_value(b, d) AS EXPR$1])+- Calc(select=[a, b, d], where=[CONCAT(a, '1') IS NOT NULL])   +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
复制代码
condition 包含 SIMD 函数

该 sql projections 没有使用 SIMD 函数,condition 含有,因此 condition 部分使用 native 算子执行。

SELECT a, b, concat(a, '1') FROM MyTableWHERE blink_json_value(a, c) is not null
复制代码

经过优化后的执行计划如下所示

Calc(select=[a, b, concat(a, '1') AS EXPR$1], where=[f0])+- NativeCalc(select=[a, b, blink_json_value(a, c) IS NOT NULL AS f0])   +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
复制代码
projections 和 condition 都包含 SIMD 函数
  • 该 sql projections 和 condition 都含有 SIMD 函数,因此拆成两个 Native 算子执行。

SELECT blink_json_value(a, b), concat(c, '1') FROM MyTableWHERE blink_json_value(a, c) is not null
复制代码

经过优化后的执行计划如下所示

NativeCalc(select=[blink_json_value(a, b) AS EXPR$0, CONCAT(c, '1') AS EXPR$1])+- Calc(select=[a, b, c], where=[f0])   +- NativeCalc(select=[a, b, c, blink_json_value(a, c) IS NOT NULL AS f0])      +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
复制代码
Inner Join 的 condition 中包含 SIMD 函数



下面的双流 join 例子,不仅仅 On 条件中包含 SIMD 函数,后面的 Where 逻辑也有。

    SELECT a, concat(d, '1') FROM(            SELECT a, d FROM(            SELECT a, d                    FROM leftTable JOIN rightTable ON                    a = d and blink_json_value(a, a) = concat(a, d))    WHERE blink_json_value(a, d) = concat(a, d))
复制代码

calcite 解析后的 AST 树如下

LogicalProject(a=[$0], EXPR$1=[CONCAT($1, _UTF-16LE'1')])+- LogicalProject(a=[$0], d=[$1])   +- LogicalFilter(condition=[=(blink_json_value($0, $1), CONCAT($0, $1))])      +- LogicalProject(a=[$0], d=[$3])         +- LogicalJoin(condition=[AND(=($0, $3), =(blink_json_value($0, $0), CONCAT($0, $3)))], joinType=[inner])            :- LogicalTableScan(table=[[default_catalog, default_database, leftTable]])            +- LogicalTableScan(table=[[default_catalog, default_database, rightTable]])
复制代码

通过 logical plan 优化,可以看到两个 SIMD 函数,已经抽取到单独的 Calc 算子,跟在 Join 后面

FlinkLogicalCalc(select=[a, CONCAT(d, '1') AS EXPR$1], where=[f0])+- FlinkLogicalCalc(select=[a, d, AND(=(blink_json_value(a, a), CONCAT(a, d)), =(blink_json_value(a, d), CONCAT(a, d))) AS f0])   +- FlinkLogicalJoin(condition=[=($0, $1)], joinType=[inner])      :- FlinkLogicalCalc(select=[a])      :  +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, leftTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])      +- FlinkLogicalCalc(select=[d])         +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, rightTable, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
复制代码

如果不开启 native 能力,翻译后的 ExecPlan 如下,其中 Calc 推到了下面的 Join 算子

Calc(select=[a, CONCAT(d, '1') AS EXPR$1])+- Join(joinType=[InnerJoin], where=[((a = d) AND (blink_json_value(a, a) = CONCAT(a, d)) AND (blink_json_value(a, d) = CONCAT(a, d)))], select=[a, d], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])   :- Exchange(distribution=[hash[a]])   :  +- Calc(select=[a])   :     +- TableSourceScan(table=[[default_catalog, default_database, leftTable]], fields=[a, b, c])   +- Exchange(distribution=[hash[d]])      +- Calc(select=[d])         +- TableSourceScan(table=[[default_catalog, default_database, rightTable]], fields=[d, e, f])
复制代码

开启 native 能力后,SIMD 表达式是委托给了 native 计算

Calc(select=[a, CONCAT(d, '1') AS EXPR$1], where=[f0])+- NativeCalc(select=[a, d, ((blink_json_value(a, a) = CONCAT(a, d)) AND (blink_json_value(a, d) = CONCAT(a, d))) AS f0])   +- Join(joinType=[InnerJoin], where=[(a = d)], select=[a, d], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])      :- Exchange(distribution=[hash[a]])      :  +- Calc(select=[a])      :     +- TableSourceScan(table=[[default_catalog, default_database, leftTable, nativeOperator=[false]]], fields=[a, b, c])      +- Exchange(distribution=[hash[d]])         +- Calc(select=[d])            +- TableSourceScan(table=[[default_catalog, default_database, rightTable, nativeOperator=[false]]], fields=[d, e, f])
复制代码
Correlate 节点对应物理算子 condition 中包含 SIMD 函数



同理,对于 Flink 内置 udtf 函数物理算子实现,由于包含 condition,对于该场景也是可以使用 native 进行加速

Calc(select=[a, b, c, f0, f1], where=[((CAST(f1 AS BIGINT) = a) AND (c = f0))])+- Correlate(invocation=[func($cor0.c)], correlate=[table(func($cor0.c))], select=[a,b,c,f0,f1], joinType=[INNER], condition=[AND(=(blink_json_value($0, _UTF-16LE'$.id'), 2), =(+($1, 1), *($1, $1)))])   +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
复制代码

经过优化后的执行计划如下所示

Calc(select=[a, b, c, f0, f1], where=[f00])+- NativeCalc(select=[a, b, c, f0, f1, ((blink_json_value(f0, '$.id') = 2) AND (CAST(f1 AS BIGINT) = a) AND (c = f0)) AS f00])   +- Correlate(invocation=[func($cor0.c)], correlate=[table(func($cor0.c))], select=[a,b,c,f0,f1], joinType=[INNER], condition=[=(+($1, 1), *($1, $1))])      +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
复制代码

2.3.1.3 Native 层

在 native 层,我们支持了 18 个 SIMD 函数,其中字符串函数 15 个,数学函数 3 个,也补齐了大量 velox 不支持的 Flink 内置函数。

2.3.1.4 细粒度 Fallback 机制

  • 支持细粒度的 fallback 机制,全部做到可配置

  • 仅含有 SIMD 函数的表达式才翻译成 NativeCalc

  • 支持细粒度的函数签名级黑名单机制

  • 对于 SQL timestamp/decimal 类型 fallback 机制,对这种容易出现正确性问题的类型先 fallback

2.3.1.5 其他

  • 支持配置化的函数映射机制,函数覆盖优先级机制

  • 支持配置化的函数映射机制对于 Flink velox spark/presto 函数语义一直但是函数名不一样,可以不用改动 c++代码,修改配置即可,函数覆盖优先级机制持配置化的函数映射机制。新引入的函数或者修复 velox 函数 bug 无需加入 velox,导致整个编译时间很久。在 flex 内部加入即可,编译时间从 2 个小时到 2 分钟。

2.3.2 正确性建设



这个是重中之重。两套引擎函数行为语义无法保证是对齐的,如何通过自动化手段发现二者行为上的差异?因此我们支持了函数级和作业级两套自动化比对框架。

如何复用 Flink 原有函数单元测试代码,而不需要改动和重写测试逻辑。然后当前 Flink 内置函数有两套机制 oldstack/newstack,测试框架也不同。为此我们改造原有引擎 2 套测试框架逻辑,通过反射方式将这些单元测试自动注入向量化配置方式进行自动化比对。每天定时跑脚本将语义不一致的函数输出出来。通过这个工具,我们发现了 Flink 引擎本身函数正确性问题 4 个,都已经提给社区,Flink 和 Velox 函数语义不对齐问题 15 个。

也支持了作业级端到端比对框架,本质就是上线前双跑比对。对重要作业 mock 两个作业消费固定的数据集输出到不同表,每天定时跑并输出比对结果报告到钉钉群。

2.3.3 易用性建设



为了全方位提升易用性,我们在引擎层,尽最大程度将简单交给用户,复杂留给自己

  • 如何提前发现用户作业中使用的函数 velox 是否支持,可以翻译成 Native 算子?我们开发了一套自动化编译工具捞取线上作业自动化执行,将各种函数不支持问题问题提前解决掉,从而减少用户干扰。

  • 如何提前知道开发的 SIMD 函数效果怎么样,是否有性能回退?因此我们基于 JMH 框架实现了一套端到端的性能测试框架,因为数据转换层有一定开销,因此需要对比整体性能更合理,目前支持 GenericRow 和 ColumnRow。下图是函数使用 SIMD 实现端到端的性能效果数据。



还搭建了向量化大盘,可以看到作业级别的效果数据。

易用的 DAG 中 native calc/source/sink 算子展示

2.3.4 稳定性建设



对向量化作业配置监控告警,提前发现问题。同时,由于 Native 算子需要额外的 native 内存,我们 plan 层自动注入额外资源。

2.3.5 效果

我们从线上捞取符合向量化场景的部分作业跑成向量化方式,下面这张图是真实的效果数据,有 13%的作业端到端 TPS 可以提升 1 被以上,37%的作业可以提升 40%以上,作业平均提升了 75%。其中效果最好的作业提升了 14 倍。



03. 未来规划

最后我来展望一下未来规划:



  • 全新的数据转换层支持 RowData 直接转 velox RowVector,减少转换层数据拷贝开销

    目前 Flink 的数据转换是基于 Arrow 的数据转换,存在额外的数据转换开销

  • 和 Paimon 结合,支持 Native Parquet/Orc Reader

  • 支持更多算子,非状态算子如维表算子,状态算子等

  • 支持更多 SIMD,支持 SQL 全类型,对齐 Flink 所有内置函数

更多内容




活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:新用户复制点击下方链接或者扫描二维码即可 0 元免费试用 Flink + Paimon实时计算 Flink 版(3000CU*小时,3 个月内)了解活动详情:https://free.aliyun.com/?utm_content=g_1000395379&productCode=sc



发布于: 刚刚阅读数: 4
用户头像

Apache Flink

关注

Apache Flink 中文社区 2020-04-29 加入

官方微信号:Ververica2019 微信公众号:Apache Flink 微信视频号:ApacheFlink Apache Flink 学习网站:https://flink-learning.org.cn/ Apache Flink 官方帐号,Flink PMC 维护

评论

发布
暂无评论
流批一体向量化引擎Flex_大数据_Apache Flink_InfoQ写作社区