UData 查询引擎优化 - 如何让一条 SQL 性能提升数倍

1 UData-解决数据使用的最后一公里
1.1 背景
在大数据的范畴,我们经历了数据产业化的历程,从各个生产系统将数据收集起来,经过实时和离线的数据处理最终汇集在一起,成为我们的主题域数据,下一步挖掘数据的价值将成为关键。
数据应用直接体现数据的价值,数据应用多种多样,它们使用数据的方式也各不相同,UData 作为数据资产和数据应用之间的桥梁,它的第一目标是解决所谓的数据使用的最后一公里问题。
UData 平台以数据指标为基本的管理单位,通过四个阶段对于数据使用提供支持,一体化整合数据链路的整个生命周期,接数据、管数据、找数据、用数据。

UData 核心聚焦数据应用场景,从数据应用倒推打通数据接入、数据管理、数据查询等环节。各种数据应用对于数据的使用方式,大部分分为两个场景:
应用在线及时访问数据,大多数以接口的形式,UData 平台相对应的提供了数据服务的模块;
业务人员通过在线查找自己需要的数据指标(数据指标地图),可视化的进行人工数据分析和展示,UData 平台同时提供了数据分析的模块。
1.2 UData 功能架构图

上图,UData 功能架构自底部向上,包含了数据流转使用的整个过程,平台内的功能模块从数据使用的流程角度,完整的涵盖了数据使用最后一公里的整个生命周期。
1.3 Udata 的数据管理

UData 对于数据的使用,从物理和逻辑两个层面进行了划分,并且对于多个租户同样进行了资源和计算的隔离。
1.4 Udata 目前能做什么?
1.4.1 指标配置化开发管理
UData 数据接入可以将外部数据实时或者定时的导入平台,同时平台提供了多种数据源的联邦查询;
在线可视化的创建数据指标,并对数据指标进行打标签;
数据指标地图使业务人员方便的查找自己需要的业务指标;
数据指标的开发,管理,使用,几个阶段相互分离,职责划分更加松耦合,业务注意力更加聚焦;
1.4.2 指标积木式编排和接口服务
UData 从底层数据源开始至最上层封装成为数据指标对外提供数据服务;
数据指标在 UData 中可以像积木一样通过可视化的方式进行任意组合;
UData 提供了接口编排能力,可以在指标组合基础之上,实现带有业务逻辑的分支条件判断;
1.4.3 指标及明细交互式关联分析和协同分享
UData 可以重用数据视图和数据指标,创建数据集,以此为基础向上进行数据分析;
数据集的配置支持 SQL 模式和可视化配置模式,分别针对不同 SQL 水平的分析人员;
面向数据分析应用,以应用场景为单位进行数据和计算函数的管理和组织,场景可共享;
数据在线化实时分析,无需线上导出数据;
在线 Excel 操作,持久化 Excel 模式,数据实时刷新,Excel 报表在线共享;
2 Udata-查询引擎执行介绍-一条 SQL 的旅行
2.1 引擎架构

Udata 查询引擎基于 StarRocks 进行了部分改造,由两部分组成 FrontEnd(FE),BackEnd(BE)组成。
FE:负责接收和返回客户端的请求,元数据和集群的管理,查询计划的生成和优化,协调 BE 进行查询。
BE: 主要负责 SR 表的数据存储和查询,外部表形式连接三方存储,并执行查询计划中的具体节点,例如 scan, 投影,聚合等。
执行主流程:
FE 收到 Sql 客户端发起的查询请求,解析 sql 并制定查询计划;
FE 下发执行计划到 BE, 并指定一个 BE 为 Coordinator;
各 BE 按照查询计划中的 PlanFragment 为执行单位,接收工作,完成工作,并将结果汇聚到 Coordinator 节点;
Coordinator 的 BE 节点将数据返回给 FE;
FE 向 Sql 客户端返回结果;
2.2 从 SQL 语句到执行的过程
2.2.1 过程概览

用户通过 Mysql 客户端工具或者 JDBC 等方式,将需要执行的 SQL 语句进行输入,输入后的 SQL 语句经过语法解析,Binder,Transformer,Optimizer 等过程,从基础的 sql 语句,经过语法树,Relation,逻辑计划,分布式物理计划等过程,最终在 FE 端通过 Coordinator 发送到 BE 侧进行执行,并后续收集 BE 返回的数据,返回给调用客户端。
2.2.2 举例介绍
表结构:
desc remote_mysql_decimal;

SQL:
2.2.3 执行过程详解
1.解析 SQL 语句
在这一步骤中,SQL 语句会进行语法检查,不符合规范的语句返回错误,之后经过语法解析,会生成一个抽象语法树,上面实例中的 SQL 语句(语句中有聚合,排序,谓词条件,limit 等元素)生成的语法树结构如下:


2.绑定数据表元数据信息-生成 Relation
生成语法树之后,只是单纯的 SQL 语法信息,在 SR 中 FE 有一个重要的作用,就是保存数据表的元数据信息(库名,表名,列名,数据类型,对应的外表)等。在这一步骤中,会将抽象语法树和 FE 中的元数据信息(Catalog)进行关联,丰富 SQL 相关的信息,将抽象语法树生成 Relation 这种数据结构。

3.Transformer - 基于 RBO,进行 Rewrite 生成逻辑执行计划
从 Relation 到逻辑计划,只是基于一些 SQL 改写规则,将树中的一些节点转变会逻辑计划节点。
如:
FromClause 会转换为逻辑计划中的 LogicalScanOperator 这种扫表操作;
WhereClause 会转换成逻辑计划中的 LOGICAL_FILTER,指导后续进行进行条件过滤;
OrderByElements 会转换成逻辑计划的 LOGICAL_TOPN,指导后续进行排序和 limit;
SelectList 会转换为逻辑计划的 LogicalProjectOperator,指导后续进行投影操作,减少网络数据传输;
本实例中的 SQL 会生成如下的逻辑计划:

4.Optimizer - 基于 CBO 优化
在这一步骤中,会根据上一步生成的逻辑计划,同时结合 FE 中保存的元数据信息,基于 CBO 优化执行计划,进行谓词下推,Join order 调整等。本实例中生成的 Optimizer Plan 如下:

5.分布式物理计划的生成- BE 执行的并行单位(PlanFragment)
BE 是分布式的,查询实际执行的时候,会将计划分配给具体的 BE。BE 之间,BE 和 FE 之间通过 RPC 通信传输数据,BE 执行的最小并行单位是 Fragment, 在这一步骤中会生成分布式的物理计划。本实例 SQL 生成的分布式物理计划如下:

2.3 数据的输出
2.3.1 PlanFragment 在 BE 侧的映射

物理执行计划切分成 PlanFragment 之后,会发送到 BE 侧执行,BE 会根据 Fragment 中的树形结构,生成对应的 Node,完成各自的算子逻辑,算子之间通过不停的调用下层算子的 get_next()函数,将数据用 chunk 的形式进行组织并流动起来,chunk 的数据结构是一种列式的批结构,非常有利于向量化的执行。
2.3.2 执行模型
1.火山模型/迭代模型 ( Volcano Model )
在这种模型中,每一种操作会抽象成一个 Operator, 在执行侧作为一个操作数,从顶到下调用 next()接口,数据从底部的 scan 节点向上传输,但是每次只传输计算一条数据,也叫做(Tuple-at-a-time),是一种拉取执行模式。优点:每个 Operator 可以单独实现逻辑,比较单间,灵活。缺点:每次传输计算一条数据,导致 next()函数调用次数过多,cpu 效率低。
2.物化模式/Materialization Model
这种模型的处理方式,仍然是调用自顶向下,数据从底向上,但是每一个操作 Operator 一次性处理所有的输入,处理完成之后,将结果一次性向上输出。此模式对于数据量较大的 OLAP 不太适合,但是比较适合数据量较小的 OLTP 系统。
3.向量化模式/批处理模型 ( Vectorized / Batch Model )
这种模型和火山模型非常类似,不同之处是每个 Operator 的 next()函数,会返回一批的 tuples 数据,相当于是一种批处理的模型,这是一种上面两种模型的折中方式。SR 的向量化执行器主要集中在算子向量化,表达式向量化,存储向量化;充分利用 SIMD 指令优化,CPU Cache 友好。
3 Udata 查询引擎-联邦查询的增强
3.1 Udata 查询引擎发展的三个阶段

3.1.1 社区版 FE + 自研 JAVA 版 BE
Udata 查询引擎的第一阶段,是参照 StarRocks 的 C++版本 BE 实现了一个 JAVA 版本的 BE,主要完成了 Udata 在第一个阶段的进行联邦查询的数据服务的任务,并且在第一个版本基础上,已经实现了聚合计算的下推,同时也经过了 618 的考验,在执行引擎层面积累了大量的经验,为我们开展引擎改造的第二阶段提供了支持;
3.1.2 原生 StarRocks + Udata 改进
鉴于 StarRocks 表的优异性能,我们将查询引擎切换回原生的 SR, 同时将之前的积累的优化经验,在原生 SR 上进行了实现,包括聚合查询和 Sort 排序的下推,额外支持了外表数据源 CK,Jsf,Http,进行了查询函数 format 等的丰富。
3.1.3 未来探索方向
在下一个阶段,Udata 查询引擎将会在 SR 的基础之上,密切地配合社区,引入新版本的功能,同时进行数据湖的使用探索和高性能的点查实践,以及跨 SR 集群的联邦查询等。
3.2 计算下推 - 极限压榨底层引擎的计算能力
3.2.1 优化背景
StarRocks 在联邦查询方面针对 MySQL, ElasticSearch 已经有了非常快的性能,StarRocks 在联邦查询方面的设计思想是针对不同的查询外部数据源,设计不同的 Scan 节点,并且尽可能的将谓词下推到 Scan 节点,在 Scan 节点查询到数据之后,上层会共用 Project 节点,Agg 节点,TopN 等这些节点的算子,基本的查询架构类似下图。

这种设计使 StarRocks 有非常好的扩展性,可以很容易的扩展到新一种的数据源,也正是这种高度可扩展的设计使我们有机会在联邦查询的细节层面,做进一步的优化,比如将一些算子的计算也尽可能的推到外部表引擎,可以节省一部分网络传输的时间,同时最大程度的压榨底层引擎原生计算能力,通过我们的测试这种计算下推也达到了数倍于原来的性能。
3.2.2 优化范围
在优化之前我们针对底层引擎和算子的特征做了调研,优化的范围包括如下:
针对 ES 引擎,进行了聚合算子的下推,但是某些特殊算子排除,不支持 sum(distinct ), avg(distinct ) 算子下推;
针对 MySQL 引擎和 ClickHouse,进行了聚合算子,TopN 算子的下推;
针对新增加的 Jsf 和 Http,进行了查询参数下推,运行时列过滤;
3.2.3 整体优化思路
目前整体的优化思路,主要分为两个部分,FE 侧的改造和 BE 侧的扩充,同时对于原生 StarRocks 计算方式保持兼容,可以轻易的切换回原来的计算模式。
1)FE 侧改造优化- Optimizer Plan 的转换
执行计划优化流程

目前 Udata 查询引擎对执行计划进行优化的节点是在原来的 Optimizer 之后,我们从 Scan 节点开始对于执行计划,进行了模式匹配,命中模式之后,进行对应的计算下推和投影的合并,同时过滤底层引擎不支持的特殊算子( 如 ES 的 sum(distinct) ),最终将转变后的物理计划发送给 BE 侧进行执行。
模式匹配和计划改写
物理计划的树状封装:

ElasticSearch:

Mysql:

查询树改写:

最终,AggScanOperator 会转变为 AggNode,发送到 BE 进行执行。
2)BE 侧改造优化
针对执行计划进行了改写之后,同样在 BE 侧我们创建了对应的 Node 节点,完成计算下推后的执行逻辑,向下对接外部执行引擎,同时向上对接类似 join 的聚合节点,最终输出结果数据。

3)原生 SR 兼容
同时执行层面,我们设置了灵活的开关 ( set agg_push_down = 0 ),可以非常容易的关闭 UData 优化。
3.2.4 改造成效-( 30 秒 vs 6 秒 )
在我们的实际过程中,我们对于计算下推,尤其是多表聚合后关联的场景进行了观察测试,计算性能随着聚合表数目的增加,会有成倍数的效果提升。

3.3 JSF&HTTP&ClickHouse 的支持 - 京东生态的对齐
3.3.1 简介
JSF 是京东内部的一种 RPC 调用服务,很多数据分析的场景中,一些维表是在其他服务中用 JSF 或者 Http 的方式提供的,或者一些已经计算好的数据指标需要在我们的 UData 计算引擎中进行关联查询,因此我们增加了对于 JSF 和 Http 的支持,来作为京东生态的一个补充。
JSF 和 HTTP 查询的两个关注点是如何将查询参数进行下推和如何将返回的结构化数据映射为表中的列数据,以便在联邦查询中进行数据关联和聚合。
同时,京东内部有不少使用 ClickHouse 的场景,我们也进行了查询支持,ClickHouse 支持 TCP 协议,http 协议,mysql wire 协议,目前 Udata 查询引擎通过 Mysql wire 协议和 ClickHouse 进行外表关联。
3.3.2 主要改造点介绍
在 FE 侧,增加了 JSF,HTTP,ClickHouse 三种外部表对应的元数据结构,可以持久化外部表查询需要的底层引擎的属性信息;
FE 侧 RBO 改造,对于 SQL 语法树对应的 FromClause 转换为对应的逻辑计划,并进一步转换为物理计划节点;
BE 侧增加对应的 ScanNode,进行数据查询;
对于 JSF 和 HTTP,通过函数,用于从 FE 侧将查询参数传输到 BE 侧真实的查询节点,查询参数下推,同时列的过滤条件在获取数据后,在 Scan 节点运行时过滤;
对于 JSF 和 HTTP,建表中增加 Mapping,将返回的 JSON 数据映射到数据列;
ClickHouse 外部表查询节点,可以支持两种模式,普通的 scan 查询和计算下推的 Agg 查询;

3.3.3 使用方式及案例展示
1)Jsf 外部表使用
Jsf 建表语句 ( 表结构+访问 JSF 必须的元信息 ):

Mapping ( Jsf 返回的 json 字串与数据表结构的映射 ):

查询 Sql 语句 ( 查询参数下推 和 列表达式运行时过滤 ):

上面的 sql 是用来查询 Jsf 外表的,同样的其他聚合函数都可以用于该 Jsf 表查询,上面主要有以下需要进行下说明:
列表达式过滤:( recv_count >= 1000 ) 这种过滤条件用于 Scan 操作获取到数据之后,在 BE 节点内运行时进行再次过滤;
查询参数下推: jsfparam 函数内置于 Udata 查询引擎,可以通过此函数,将需要带入到 Jsf 调用中的参数从调用端一直传递到 Jsf 服务中,从而减少数据的获取;
联邦查询:Jsf 表同其他外表一样可以支持联邦查询,也同样可以支持其他外表支持的聚合等查询操作;
2)Http 外部表使用
Http 建表语句:

Http 的建表语句同上面 Jsf 表,只是 Properties 有所变化,变成了 http 访问的元信息。
查询函数:

httpconfig : 第一个参数是数据表中的某一个列名,后面是一个 map, 目前仅支持 httpmethod 表示请求的方式 get/post ;
httpheader : 第一个参数是数据表中的某一个列名,后面是一个 map, json 结构,解析后,按照 key=>value 的配对,放入 http 请求的 header 中去 ;
httpbody : 第一个参数是数据表中的某一个列名,后面是参数,将直接放入 http 的请求的 body 中,这里需要注意的是 http 请求的方式是 application/json , 还是 x-www-form-urlencoded ,两种方式 body 中的写法是不一样的,x-www-form-urlencoded 写法是 key1=value&key2=value2 ;
3.4 查询代理-使 Udata 查询引擎在理论上具备了查询一切的可能性
UData 查询引擎目前支持的联邦数据源有 Es, Mysql, Ck, StarRocks, Hive, Iceberg, Hudi 等,同时对于 UData 目前不支持的数据源可以通过代理插件的形式进行扩展,我们提供了 Udata Proxy 的设计,只要遵循 Udata 代理提供的接口,实现对应的逻辑,来完成其他三方数据源的读取,便可以集成到 UData 查询引擎中,并和其他数据源一样可以完成普通查询和联邦关联查询。

3.4.1 批处理 vs 分页流式
Udata 查询引擎增加了 Proxy scan 节点,Scan 节点和 Proxy 代理之间可以通过 Http 和 RPC 两种协议进行通讯;
数据从 Proxy 传输到 Scan 节点有两种方式:
批处理:一次性获取 proxy 返回的全部数据;
分页流式:适合数据量比较大的场景,利用 scroll_id 的参数,使数据可以分页微批的方式流向 scan 节点,需要 Proxy 中逻辑代码也支持滑动查询;
3.4.2 逻辑读插件热插拔
任何异构的数据源可以通过逻辑读插件的形式来支持,Proxy runtime 提供插件的执行环境,并进行并行线程的管理,逻辑读插件可以通过 Proxy 管理端进行上传和管理,热插拔及时生效;
4 团队介绍
京东物流运营数据产品部-数据工具研发组职责为面向京东物流业务场景,打造数据相关工具及平台研发,提供数据服务、分析和产品化能力,助力业务降低数据使用成本,助力研发提升数据开发效率,让数据发挥更大价值,主打产品——服务分析一体化系统 UData。
作者:刘敬斌 贺思远
版权声明: 本文为 InfoQ 作者【京东科技开发者】的原创文章。
原文链接:【http://xie.infoq.cn/article/572b1567945c86eaca38c6a6a】。文章转载请联系作者。
评论