写点什么

【论文分享】Presto: SQL on Everything(二)

用户头像
小舰
关注
发布于: 22 小时前

4.查询优化:

那接下来需要怎样的处理呢?Coordinator 的查询优化器会对逻辑计划进行优化。Coordinator 将一系列的优化策略(例如剪枝操作、谓词下推、条件下推等)应用于与逻辑计划的各个子计划,从而将逻辑计划转换成更加适合物理执行的结构,形成更加高效的执行策略。Presto 已经提供了基于统计信息的优化器,优化器可以通过 API 提供一些表和列的统计信息来帮助制定合适的 join 策略等。下面具体来说说优化器在几个方面所做的工作:

1)自适应:Presto 的 Connector 可以通过 Data Layout API 提供数据的物理分布信息(例如数据的位置、分区、排序、分组以及索引等属性),如果一个表有多种不同的数据存储分布方式,Connector 也可以将所有的数据布局全部返回,这样 Presto 优化器就可以根据 query 的特点来选择最高效的数据分布来读取数据并进行处理。举个例子,在 OLAP 场景中,可能优先会从列存格式的数据源中读取数据。

2)谓词下推:谓词下推是一个应用非常普遍的优化方式,就是将一些条件或者列尽可能的下推到叶子结点,最终将这些交给数据源去执行,从而可以大大减少计算引擎和数据源之间的 I/O,提高效率。

图 4 图 3 的逻辑计划进一步转换后的执行计划(未进行)

3)节点间并行:不同 stage 之间的数据 shuffle 会带来很大的内存和 CPU 开销,因此,将 shuffle 数优化到最小是一个非常重要的目标。围绕这个目标,Presto 可以借助一下两类信息:

*数据布局信息:上面我们提到的数据物理分布信息同样可以用在这里以减少 shuffle 数。例如,如果进行 join 连接的两个表的字段同属于分区字段,则可以将连接操作在在各个节点分别进行,从而可以大大减少数据的 shuffle。再比如两个表的连接键加了索引,可以考虑采用嵌套循环的连接策略。

4)节点内并行:优化器通过在节点内部使用多线程的方式来提高节点内对并行度,而且节点内并行由于具有更小的延迟会比节点间并行效率更高。节点内并行对以下两种场景会有比较大的性能提升。

*交互式分析:交互式查询的负载大部分是一次执行的短查询,查询负载一般不会经过优化,这就会导致数据倾斜的现象时有发生。典型的表现为少量的节点被分到了大量的数据。

*批量 ETL:这类的查询特点是任务会不加过滤的从叶子结点拉取大量的数据到上层节点进行转换操作,致使上层节点压力非常大。

针对以上两种场景遇到的问题,节点内的多线程机制可以在一定程度上缓解并发瓶颈。这样,引擎就可以通过多线程来运行单个操作符序列(或 pipeline),如图 5 所示的,pipeline1 和 2 通过多线程并行执行来加速 build 端的 hash-join。

图 5 pipeline1 和 2 通过多线程并行执行来加速 build 端的 hash-join

当然,除了上述列举的 Presto 优化器已经实现的优化策略,Presto 也正在积极探索 Cascades framework(一个基于代价估算的优化器),相信未来优化器会得到进一步的改进。

5.查询调度:

Presto 通过 Coordinator 将 stage 以 task 的形式分发到 worker 节点,coordinator 将 task 以 stage 为单位进行串联,通过将不同 stage 按照先后执行顺序串联成一棵执行树,确保数据流能够顺着 stage 进行流动。

Presto 引擎处理一条查询需要进行两套调度,第一套是如何调度 stage 的执行顺序,第二套是判断每个 stage 有多少需要调度的 task 以及每个 task 应该分发到哪个 worker 节点上进行处理。

(1)stage 调度

Presto 支持两种 stage 调度策略:All-at-once 和 Phased 两种。All-at- once 策略针对所有的 stage 进行统一调度,不管 stage 之间的数据流顺序,只要该 stage 里的 task 数据准备好了就可以进行处理;Phased 策略是需要以 stage 调度的有向图为依据按序执行,只要前序任务执行完毕开会开始后续任务的调度执行。例如一个 hash-join 操作,在 hash 表没有准备好之前,Presto 不会调度 left side 表。

(2)task 调度

在进行 task 调度的时候,调度器会首先区分 task 所在的 stage 是哪一类 stage:Leaf Stage 和 intermediate stage。Leaf Stage 负责通过 Connector 从数据源读取数据,intermediate stage 负责处理来此其他上游 stage 的中间结果;

1)leaf stages:在分发 leaf stages 中的 task 到 worker 节点的时候需要考虑网络和 connector 的限制。例如蚕蛹 shared- nothing 部署的时候,worker 节点和存储是同地协作,这时候调度器就可以根据 connector data Layout API 来决定将 task 分发到哪些 worker 节点。资料表明在一个生产集群大部分的 CPU 消耗都是花费在了对从 connector 读取到的数据的解压缩、编码、过滤以及转换等操作上,因此对于此类操作,要尽可能的提高并行度,调动所有的 worker 节点来并行处理。

2)intermediate stages:这里的 task 原则上可以被分发到任意的 worker 节点,但是 Presto 引擎仍然需要考虑每个 stage 的 task 数量,这也会取决于一些相关配置,当然,有时候引擎也可以在运行的时候动态改变 task 数。

(3)split 调度

当 Leaf stage 中的一个 task 在一个工作节点开始执行的时候,它会收到一个或多个 split 分片,不同 connector 的 split 分片所包含的信息也不一样,最简单的比如一个分片会包含该分片 IP 以及该分片相对于整个文件的偏移量。对于 Redis 这类的键值数据库,一个分片可能包含表信息、键值格式以及要查询的主机列表。Leaf stage 中的 task 必须分配一个或多个 split 才能够运行,而 intermediate stage 中的 task 则不需要。

1)split 分配

当 task 任务分配到各个工作节点后,coordinator 就开始给每个 task 分配 split 了。Presto 引擎要求 Connector 将小批量的 split 以懒加载的方式分配给 task。这是一个非常好的特点,会有如下几个方面的优点:

a)解耦时间:将前期的 split 准备工作与实际的查询执行时间分开;

b)减少不必要的数据加载:有时候一个查询可能刚出结果但是还没完全查询完就被取消了,或者会通过一些 limit 条件限制查询到部分数据就结束了,这样的懒加载方式可以很好的避免过多加载数据;

c)维护 split 队列:工作节点会为分配到工作进程的 split 维护一个队列,Coordinator 会将新的 split 分配给具有最短队列的 task,Coordinator 分给最短的。

d)减少元数据维护:这种方式可以避免在查询的时候将所有元数据都维护在内存中,例如对于 Hive Connector 来讲,处理 Hive 查询的时候可能会产生百万级的 split,这样就很容易把 Coordinator 的内存给打满。

当然,这种方式也不是没有缺点,他的缺点是可能会导致难以准确估计和报告查询进度。

6.查询执行

(1)本地数据流动

一旦一个 split 被分配给线程,它就会被 driver loop 执行。Presto 的 driver loog 比递归迭代器 Volcano 模型更复杂,但提供了重要的功能。driver loop 处理的是以页(page)为单位的数据单元,当 Connector 数据源 API 收到一个 split 的时候会返回一个页(page),然后 operators 会将这个 page 进行计算等操作,然后输出这个 page(图 6 展示了内存中 page 的具体结构),driver loop 会不断的讲 page 在 operator 算子之间“搬运”来进行 page 的处理。

图 6 一个 page 里的不同块类型

(2)Shuffle

Presto 被设计成最小化端到端延迟以及最大化资源利用,它的节点间数据流动机制就反映了这个设计。Presto 用 http 协议来 shuffle 交换缓冲区内的中间结果。task 产生的数据存储缓冲区供其他的节点消费,其他节点通过 http 协议的长轮询机制来请求中间结果。服务器会保存数据,直到客户端带着上一次请求获得的 token 请求下一个数据片段。

Presto 引擎会通过调整并发度来平衡输入/输出缓冲区的利用率。它会持续监控输出缓冲区,当使用持续保持高位时,会通过减少 split 的数量来压低并发度,这有利于提高网络资源共享的公平性。

(3)写

对于 ETL 任务,经常需要讲产生的数据写入另外的表中,那么影响向远端存储写性能的一个很重要的因素就是就是并发度,例如通过 Connector Data Sink API 进行写数据的总的线程数。那自然就会出现一个新的问题:如果写的并发太大,每一个并发都会创建一个新文件,那不可避免会出现小文件过多的问题,另一方面,如果将并发调小,又会影响吞吐量,是的任务运行效率降低。因此,Presto 再次采取了一种自适应的方法,通过设置一个缓冲区监控的阀值,一旦某个 stage 写入缓冲区的数据超过了这个阀值,引擎就会在更多的节点上启动新的任务来动态增加写并发。这种方式对于写任务繁重的批量 ETL 任务非常有效。

7.资源管理

Presto 适用于多租户部署的一个很重要的因素就是它完全整合了细粒度资源管理系统。一个单集群可以并发执行上百条查询以及最大化的利用 CPU、IO 和内存资源。

(1)CPU 调度

Presto 首要任务是优化所有集群的吞吐量,例如在处理数据是的 CPU 总利用量。本地(节点级别)调度又为低成本的计算任务的周转时间优化到更低,以及对于具有相似 CPU 需求的任务采取 CPU 公平调度策略。一个 task 的资源使用是这个线程下所有 split 的执行时间的累计,为了最小化协调时间,Presto 的 CPU 使用最小单位为 task 级别并且进行节点本地调度。

Presto 通过在每个节点并发调度任务来实现多租户,并且使用合作的多任务模型。任何一个 split 任务在一个运行线程中只能占中最大 1 秒钟时长,超时之后就要放弃该线程重新回到队列。如果该任务的缓冲区满了或者 OOM 了,即使还没有到达占用时间也会被切换至另一个任务,从而最大化 CPU 资源的利用。

当一个 split 离开了运行线程,Presto 需要去定哪一个 task(包含一个或多个 split)排在下一位运行。

Presto 通过合计每个 task 任务的总 CPU 使用时间,从而将他们分到五个不同等级的队列而不是仅仅通过提前预测一个新的查询所需的时间的方式。如果累积的 Cpu 使用时间越多,那么它的分层会越高。Presto 会为每一个曾分配一定的 CPU 总占用时间。

调度器也会自适应的处理一些情况,如果一个操作占用超时,调度器会记录他实际占用线程的时长,并且会临时减少它接下来的执行次数。这种方式有利于处理多种多样的查询类型。给一些低耗时的任务更高的优先级,这也符合低耗时任务往往期望尽快处理完成,而高耗时的任务对时间敏感性低的实际。

(2)内存管理

在像 Presto 这样的多租户系统中,内存是主要的资源管理挑战之一。

1)内存池

在 Presto 中,内存被分成用户内存和系统内存,这两种内存被保存在内存池中。用户内存是指用户可以仅根据系统的基本知识或输入数据进行推理的内存使用情况(例如,聚合的内存使用与其基数成比例)。另一方面,系统内存是实现决策(例如 shuffle 缓冲区)的副产品,可能与查询和输入数据量无关。换句话说,用户内存是与任务运行有关的,我们可以通过自己的程序推算出来运行时会用到的内存,系统内存可能更多的是一些不可变的。

Presto 引擎对单独对用户内存和总的内存(用户+系统)进行不同的规则限制,如果一个查询超过了全局总内存或者单个节点内存限制,这个查询将会被杀掉。当一个节点的内存耗尽时,该查询的预留内存会因为任务停止而被阻塞。

有时候,集群的内存可能会因为数据倾斜等原因造成内存不能充分利用,那么 Presto 提供了两种机制来缓解这种问题--溢写和保留池。

2)溢写

当某一个节点内存用完的时候,引擎会启动内存回收程序,现将执行的任务序列进行升序排序,然后找到合适的 task 任务进行内存回收(也就是将状态进行溢写磁盘),知道有足够的内存来提供给任务序列的后一个请求。

3)预留池

如果集群的没有配置溢写策略,那么当一个节点内存用完或者没有可回收的内存的时候,预留内存机制就来解除集群阻塞了。这种策略下,查询内存池被进一步分成了两个池:普通池和预留池。这样当一个查询把普通池的内存资源用完之后,会得到所有节点的预留池内存资源的继续加持,这样这个查询的内存资源使用量就是普通池资源和预留池资源的加和。为了避免死锁,一个集群中同一时间只有一个查询可以使用预留池资源,其他的任务的预留池资源申请会被阻塞。这在某种情况下是优点浪费,集群可以考虑配置一下去杀死这个查询而不是阻塞大部分节点。

8.容错

Presto 可以对一些临时的报错采用低级别的重试来恢复,然而,截止 2018 年还并没有任何有效的内嵌容错机制来解决 coordinator 或者 worker 节点坏掉的情况。Presto 依靠的是客户端的自动重跑失败查询。

在 Facebook 的产品应用中,也是根据不同的场景来采取不同的可用性模式。交互式查询和批量 ETL 会使用主备 Coordinator 的方式,A/B Test 会采用多活集群的模式。同时辅以外部的监控系统将一些频繁产生错误的节点及时移出集群,修复好的节点重新加入集群。这些方式只能从不同程度上降低不可用风险,但是并不能解决。

标准检查点或者部分修复技术是计算代价比较高的,而且很难在这种一旦结果可用就返回给客户端(即时查询类)的系统中实现。基于复制的容错机制会造成较大的资源消耗,考虑到成本,这些技术的期望值并不明确,特别是考虑到节点平均故障时间、集群大小约 1000 个节点规模,统计数据显示大多数查询在几个小时内完成(包括批处理 ETL)。

当然,Presto 也在积极的提高对长查询的容错机制,例如评估添加可选检查点和限制重新启动到执行计划的子树,这些子树可能不会以流水线方式运行。

四、总结

以上是 Facebook 从一条 SQL 语句在 Presto 引擎被处理的整个生命周期的角度进行的总结,通过这篇文章我们可以大致了解 Presto 的整个运行机制了。英文阅读水平有限,如有歧义欢迎交流~

参考文献:

[1] Raghav Sethi, Martin Traverso, Dain Sundstrom, David Phillips, Wenlei Xie, Yutian Sun, Nezih Yegitbasi, Haozhun Jin, Eric Hwang, Nileema Shingte, Christopher Berner: Presto: SQL on Everything. ICDE 2019: 1802-1813

[2] https://analytics.facebook.com


可以关注我

发布于: 22 小时前阅读数: 10
用户头像

小舰

关注

公众号:DLab数据实验室 2020.11.12 加入

中国人民大学硕士

评论 (1 条评论)

发布
用户头像
作者该系列(一)传送门:https://xie.infoq.cn/article/d3df5e510feebb7f954ca81c4
15 小时前
回复
没有更多了
【论文分享】Presto: SQL on Everything(二)