写点什么

Meta 项目功能测试 | 开启 PrestoDB 和 Aria 扫描优化

作者:Alluxio
  • 2022 年 8 月 19 日
    北京
  • 本文字数:3857 字

    阅读完需:约 13 分钟

Meta项目功能测试 | 开启PrestoDB和Aria扫描优化

概要速览

PrestoDB 的 Aria 项目曾于 2020 年发布过一组实验性功能,用来提高对表(通过 Hive 连接器连接并以 ORC 格式存储数据)的扫描性能。

在本文中,我们将在基于 Docker 的 PrestoDB 测试环境中对这些新功能进行基础性的测试。[1]

Presto

Presto 是一款能够大规模并行处理 (MPP) 的 SQL 执行引擎。执行引擎与数据存储是分离的,该项目包含大量插件(又称为连接器,connector),它们为 Presto 引擎提供查询的数据。数据存储中的数据被读取后,交由 Presto 执行查询操作,比如数据连接(joining)和聚合(aggregation)。这种数据存储和执行分离的架构允许单个 Presto 实例查询多个数据源,从而提供了非常强大的联合查询层。

Presto 有许多可用的连接器,社区也会定期提供用以访问数据存储的新型连接器。

Hive 连接器

Hive 连接器一般被视为 Presto 的标准连接器。我们通常用它连接到 Hive Metastore,以此来获取 Metastore 中定义的表的元数据信息。数据通常存储在 HDFS 或 S3 中,而 Metastore 提供有关文件存储位置和格式的信息;最常用的是 ORC 格式,但也支持 Avro 和 Parquet 等其他格式。Hive 连接器允许 Presto 引擎并行地将数据从 HDFS/S3 扫描到引擎中来执行查询。ORC 格式是一种非常标准且常见的数据存储格式,能提供很好的压缩比和性能。

两个用于执行查询的核心服务

Presto 有两个用于执行查询的核心服务:一个负责查询解析和任务调度等职责的 Coordinator,以及多个负责并行执行查询的 Worker。理论上,Coordinator 也可以充当 Worker 的角色,但在生产环境中不会这么操作。鉴于我们在这里测试的是 Presto,为方便起见,我们只使用一个节点,既作为 Coordinator 也作为 Worker。[2]

我们将使用单个 Docker 容器来进行本次 Presto 的测试。请点击查看部署文档,文档的末尾处有如何实现单节点 Presto 部署的示例。

下面来介绍 Presto 是如何执行一条查询语句的:

首先,Presto coordinator 先对查询语句进行解析,从而制定出一个执行计划(下文会提供示例展示)。计划制定完成之后就会被分成几个阶段(或片段),每个阶段将执行一系列操作,即引擎用来执行查询的特定函数。执行计划通常从连接器扫描数据开始,然后执行一系列操作,如数据过滤、部分聚合以及在 Presto worker 节点之间交换数据来执行数据连接和最终的数据聚合等。所有这些阶段被分成多个分片(split),即 Presto 中的并行执行单元。Worker 并行执行可配置数量的分片,从而获得所需的结果。引擎中的所有数据都保存在内存中(前提是不超过集群的容量阈值)。

Hive 连接器(以及所有其他连接器)负责将输入数据集拆分为多个分片,供 Presto 并行读取。作为一项优化措施,Presto 引擎将告知连接器查询中使用的谓词(predicate)以及选定的列(column)——称为谓词下推 (predicate pushdown),这使得连接器能够在把数据提供给 Presto 引擎之前过滤掉不必要的数据,这也是本文的重点所在。

为了演示谓词下推,我们来看一个基本查询——统计某个数据表内符合条件的行数。我们的查询示例是基于基准测试数据集 TPC-H 的 lineitem 数据表进行的。TPC-H 的 lineitem 表中大约有 6 亿行记录,它们的 shipdate 字段取值介于 1992 和 1998 之间。下面的查询语句是针对 lineitem 数据表的设置条件过滤谓词,筛选出 shipdate 字段为 1992 年的数据行。我们先在不启用 Aria 增强会话属性的情况下,通过运行 EXPLAIN 命令来观察一下查询计划:

presto:tpch> EXPLAIN (TYPE DISTRIBUTED) SELECT COUNT(shipdate) FROM lineitem WHERE shipdate BETWEEN DATE '1992-01-01' AND DATE '1992-12-31';
Fragment 0 [SINGLE] Output layout: [count] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Output[_col0] => [count:bigint] _col0 := count - Aggregate(FINAL) => [count:bigint] count := ""presto.default.count""((count_4)) - LocalExchange[SINGLE] () => [count_4:bigint] - RemoteSource[1] => [count_4:bigint]
Fragment 1 [SOURCE] Output layout: [count_4] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Aggregate(PARTIAL) => [count_4:bigint] count_4 := ""presto.default.count""((shipdate)) -ScanFilter[table = TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpch, tableName=lineitem, analyzePartitionValues=Optional.empty}', layout='Optional[tpch.lineitem{domains={shipdate=[ [[1992-01-01, 1992-12-31]] ]}}]'}, grouped = false, filterPredicate = shipdate BETWEEN (DATE 1992-01-01) AND (DATE 1992-12-31)] => [shipdate:date] Estimates: {rows: 600037902 (2.79GB), cpu: 3000189510.00, memory: 0.00, network: 0.00}/{rows: ? (?), cpu: 6000379020.00, memory: 0.00, network: 0.00} LAYOUT: tpch.lineitem{domains={shipdate=[ [[1992-01-01, 1992-12-31]] ]}} shipdate := shipdate:date:10:REGULAR
复制代码

查询计划按自下而上顺序来阅读,从 Fragment 1 开始,并行扫描 lineitem 表,使用谓词对 shipdate 列进行过滤,然后对每个分片执行部分聚合,并将该部分结果交换到下一阶段 Fragment 0 来执行最终的聚合,之后再将结果发送到客户端,查询计划流程参见下图:(图中靠近底部的水平线标示出哪些代码在 Hive 连接器中执行,哪些代码在 Presto 引擎中执行。)


现在我们来执行这个查询!

presto:tpch> SELECT COUNT(shipdate) FROM lineitem WHERE shipdate BETWEEN DATE '1992-01-01' AND DATE '1992-12-31';  _col0  ---------- 76036301(1 row)
Query 20200609_154258_00019_ug2v4, FINISHED, 1 nodeSplits: 367 total, 367 done (100.00%)0:09 [600M rows, 928MB] [63.2M rows/s, 97.7MB/s]
复制代码

我们看到,lineitem 表包含 7600 多万行 shipdate 列取值为 1992 年的记录。执行这个查询大约花费了 9 秒,总共处理了 6 亿行数据。

现在我们来激活会话属性 pushdown_subfields_enabled 和 hive.pushdown_filter_enabled,以启用 Aria 功能,下面我们来看一下查询计划发生了怎样的变化:

presto:tpch> SET SESSION pushdown_subfields_enabled=true;SET SESSIONpresto:tpch> SET SESSION hive.pushdown_filter_enabled=true;SET SESSIONpresto:tpch> EXPLAIN (TYPE DISTRIBUTED) SELECT COUNT(shipdate) FROM lineitem WHERE shipdate BETWEEN DATE '1992-01-01' AND DATE '1992-12-31';Fragment 0 [SINGLE]    Output layout: [count]    Output partitioning: SINGLE []    Stage Execution Strategy: UNGROUPED_EXECUTION    - Output[_col0] => [count:bigint]            _col0 := count        - Aggregate(FINAL) => [count:bigint]                count := ""presto.default.count""((count_4))            - LocalExchange[SINGLE] () => [count_4:bigint]                - RemoteSource[1] => [count_4:bigint]
Fragment 1 [SOURCE] Output layout: [count_4] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Aggregate(PARTIAL) => [count_4:bigint] count_4 := ""presto.default.count""((shipdate)) - TableScan[TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpch, tableName=lineitem, analyzePartitionValues=Optional.empty}', layout='Optional[tpch.lineitem{domains={shipdate=[ [[1992-01-01, 1992-12-31]] ]}}]'}, grouped = false] => [shipdate:date] Estimates: {rows: 540034112 (2.51GB), cpu: 2700170559.00, memory: 0.00, network: 0.00} LAYOUT: tpch.lineitem{domains={shipdate=[ [[1992-01-01, 1992-12-31]] ]}} shipdate := shipdate:date:10:REGULAR :: [[1992-01-01, 1992-12-31]]
复制代码

注意:查询计划的主要变化位于底部,即 TableScan 操作中包含了 shipdate 列。连接器已经接收到 shipdate 列上的谓词条件——取值介于 1992-01-01 和 1992-12-31 之间。如下图所示,该谓词被下推到连接器,免去了查询引擎过滤这些数据的必要性。

我们再一次运行这个查询!

presto:tpch> SELECT COUNT(shipdate) FROM lineitem WHERE shipdate BETWEEN DATE '1992-01-01' AND DATE '1992-12-31';  _col0  ---------- 76036301(1 row)
Query 20200609_154413_00023_ug2v4, FINISHED, 1 nodeSplits: 367 total, 367 done (100.00%)0:05 [76M rows, 928MB] [15.5M rows/s, 189MB/s]
复制代码

运行查询后,我们得到了相同的结果,但查询时间几乎缩短了一半,更重要的是,查询只扫描了 7600 万行!连接器已经将谓词应用于 shipdate 列,而不是让引擎来处理谓词,因此节省了 CPU 周期,继而加快了查询速度。针对不同的查询和数据集情况可能有所不同,但如果是通过 Hive 连接器查询 ORC 文件的场景,该方案绝对值得一试。

文章作者:Adam Shook

原文于 2020 年 6 月 15 日发表在作者的个人博客上:http://datacatessen.com

参考

  1. ^如需了解有关 Aria 项目功能的更多信息,可查看文章底部 https://engineering.fb.com/2019/06/10/data-infrastructure/aria-presto/

  2. ^[2]如需了解安装等详细信息,可查看文章底部文档 https://prestodb.io/docs/current/

发布于: 刚刚阅读数: 5
用户头像

Alluxio

关注

还未添加个人签名 2022.01.04 加入

Alluxio是全球首个面向基于云原生数据分析和人工智能的开源的资料编排技术!能够在跨集群、跨区域、跨国家的任何云中将数据更紧密地编排接近数据分析和AI/ML应用程序,从而向上层应用提供内存速度的数据访问。

评论

发布
暂无评论
Meta项目功能测试 | 开启PrestoDB和Aria扫描优化_hive_Alluxio_InfoQ写作社区