MaxCompute 执行引擎核心技术 DAG 揭秘
作为业界少有的 EB 级别数据分布式平台,MaxCompute 系统每天支撑上千万个分布式作业的运行。在这个量级的作业数目上,毫无疑问平台需要支撑的作业特点也多种多样:既有在"阿里体量"的大数据生态中独有的包含数十万计算节点的超大型作业,也有中小规模的分布式作业。同时不同用户对于不同规模/特点的作业,在运行时间,资源使用效率,数据吞吐率等方面,也有着不同的期待。
Fig.1 MaxCompute 线上数据分析
基于作业的不同规模,当前 MaxCompute 平台提供了两种不同的运行模式,下表对于这两种模式做了总结对比:
Fig.2 离线(batch)模式 vs 一体化调度准实时(smode)模式
从上图可以看到,离线作业和一体化调度的准实时作业,在调度方式,数据传输,使用资源来源等多个方面,都有非常显著的区别。可以说,这两种运行方式分别代表了在海量数据场景上按需申请资源来优化吞吐量和资源利用率,以及在处理中等(少量)数据时通过计算节点的全量预拉起来(以及数据直传等手段加速)降低执行时延的两个极端。而这些区别,最终会通过执行时间和作业资源利用率等方面体现出来。很显然,以高 Throughput 为主要优化目标的离线模式,和以追求低 Latency 的准实时模式,在各方面的性能指标会有很大的区别。比如以 1TB-TPCH 标准 benchmark 为例,此报告执行时间(性能)和资源消耗两个维度来做比较。可以看到,准实时的(SMODE)在性能上有着非常明显的优势(2.3X),但是这样的性能提升也并不是没有代价的。在 TPCH 这个特定的场景上,一体化执行的 SMODE 模式,在获取了 2.3X 性能提升的同时,也消耗了 3.2X 的系统资源(cpu * time)。
Fig.3 性能/资源消耗比较:离线(batch)模式 vs 一体化调度准实时(smode)模式
这个观察结论其实并不意外,或者从某种程度上是 by design 的。拿下图一个典型 SQL 产生的 DAG 来看,所有计算节点都在作业提交伊始就被拉起,虽然这样的调度方式允许数据得以(在需要的时候)pipeline 起来,从而可能加速数据的处理。但并不是所有的执行计划里的所有上下游计算节点都可以有理想化的 pipelined dataflow。事实上对于许多作业而言,除了 DAG 的根节点(下图中的 M 节点)以外,下游的计算节点在某种程度上都存在着一定程度的浪费。
Fig.4 一体化调度准实时(smode)模式下,可能的资源使用低效
这种空转造成的资源使用的低效,在数据的处理流程上存在 barrier 算子而无法 pipeline,以及在 DAG 图比较深的情况下会尤为明显。当然对于希望极致优化作业运行时间的场景而言,通过更多的资源消耗,来获取极致的性能优化,在一些场景上是有其合理性的。 事实上,在一些 business-critical 的在线服务系统中,为了保证服务总是能迅速响应并处理峰值数据,平均个位数的 CPU 利用率也并非少见。但是对于计算平台这种量级的分布式系统,能否在极致性能以及高效的资源利用率之间,获取一个更好的平衡呢?
答案是肯定的。这就是我们在这里要介绍的混合计算模式:Bubble Execution
1. Bubble Execution 概述
DAG 框架的核心架构思想,在于对执行计划的逻辑层与物理层的清晰分层设计。物理执行图是通过对逻辑图中的节点、边等的物理特性(如数据传输介质,调度时机,资源特性等)的物化来实现的。对比在 Fig.2 中描述的 batch 模式和 smode 模式,DAG 提供了在一套灵活的调度执行框架之上,统一离线模式和准实时一体化执行模式的实现。如同下图所示,通过调整计算节点和数据连接边的不同物理特性,不仅能对现有的两种计算模式做清晰的表述,在对其进行更通用化的扩展后,还可以探索一种全新的混合运行模式,也就是 Bubble Execution。
Fig.5 DAG 框架上的多种计算模式
直观上来理解,如果我们把一个 Bubble 当作一个大的调度单位,Bubble 内部的资源一起申请运行,并且内部上下游节点的数据均通过网络/内存直连传输。与之相对的,Bubbles 之间连接边上的数据传输,则通过落盘方式来传输。那么离线和准实时作业执行,其实可以认为是 Bubble 执行的两个极端场景:离线模式可以认为是每个 stage 都单独作为 single-bubble 的特例,而准实时框架则是将作业所有计算节点都规划到一个大 Bubble 内部,来做一体化调度执行的另一个极端。DAG AM 已经将两种计算模式统一到一套调度执行 infra 之上。使得在两种模式上进行优点互补成为可能,为引入 Bubble Execution 奠定了基础。
Bubble Execution 通过灵活自适应的子图(Bubble)切割,在现有的两个极端之间,提供了一种选取更细粒度,更通用的调度执行方法,达到作业性能和资源利用率之间获取优化的 tradeoff 的方法。在根据输入数据量、算子特性、作业规模等信息进行分析后,DAG 的 Bubble 执行模式可以将一个离线作业切分出多个 Bubbles,在 Bubble 内部充分利用网络/内存直连和计算节点预热等方式提升性能。这种切分方式下,一个 DAG 运行图中的计算节点,可以都被切入某个 Bubble,根据所在 DAG 中的位置被切入不同 Bubbles,还可以完全不被切入任何 Bubble(依然以传统离线作业模式运行)。这种高度灵活的混合运行模式,使整个作业的运行能更加灵活的自适应线上多种多样作业的特点,在实际生产中具有重要的意义:
Bubble 模式使更多作业的加速成为可能:一体化调度的准实时作业具有基于整体规模(线上默认 2000)的"一刀切"式的准入条件。这一方面是出于有限资源的公平使用,另一方面也是为了控制节点 failure 带来的 cost。但对于中大型作业,虽然整体规模可能超过准入门限,但是其内部的不同子图,有可能是规模合适,且可以通过数据 pipeline 等方法来加速的。此外线上部分计算节点由于其本身的特性(比如包含 UDF 等用户逻辑需要安全沙箱),无法使用预热的准实时资源池执行,而当前非黑即白的模式,会使得一个作业中,只要包含一个这种计算节点,整个作业都无法使用加速模式执行。Bubble 模式能较好的解决这些问题。
Bubble 模式将 enable 线上两个资源池的打通:当前离线资源(cold)和准实时资源池(warm)作为两种特性不同的线上资源,完全隔离,各自管理。这种分离的现状,可能导致资源的浪费。比如对于大规模作业,因为完全无法利用准实时资源池,排队等待离线资源,而同时准实时资源池可能正处于空闲状态,反之亦然。Bubble 模式能通过在作业内部拉通不同资源的混合使用,使得两者各自补充,削峰填谷。
Bubble 模式可以整体上提高资源的利用率:从资源利用的角度来看,对于可以满足准实时模式准入的中型作业,由于准实时模式一体式调度拉起的运行模式,虽然运行速度能有所提升,但客观上会造成一定程度资源的空转与浪费(尤其是 DAG 图较深以及计算逻辑有 barrier 时)。这种情况下,按照节点数目,计算 barrier 等条件,将一体化模式拆解成多个 Bubble。这能够有效的减少节点大量的空转消耗,而且在拆分条件合理的情况下,性能方面的损失也可以做到较低。
Bubble 模式能有效降低单个计算节点 failure 带来的代价:一体化的准实时模式执行,由于其数据 pipeline 的特性,作业的容错粒度和其调度粒度是紧密挂钩的:都是 all-in-one。也就是说,只要有一个节点运行失败,整个作业都要重新运行。因为作业规模越大,运行过程中可能有节点失败的概率也就越大,这样的 failover 粒度无疑也限制了其能支持的最大作业规模。而 Bubble 模式则提供了一个更好的平衡点:单个计算节点的失败,最多只影响同处于一个 Bubble 的节点。此外 Bubble 模式对于各种 failover 做了细粒度的各种处理,我们将在下文描述。
我们可以通过标准的 TPCH-1TB 测试 benchmark 来直观评测 Bubble 执行模式的效果。在上层计算引擎(MaxCompute 优化器以及 runtime 等)保持不变,并且 Bubble 的大小维持在 500(具体 Bubble 切分规则下文介绍)时,做一下 Bubble 执行模式与标准离线模式,以及准实时模式,在性能(Latency) 以及资源消耗(cpu * time)两个方面的比较:
Fig.6.a 性能(Latency)比较:Bubble 模式 vs 离线(batch)模式 vs 一体化调度准实时(smode)模式
从运行时间来看,Bubble 模式显然要远优于离线模式(整体 2X 的性能提升),而较准实时的一体化调度模式而言,Bubble 的执行性能也并没有太明显的下降。当然在一些数据可以非常有效利用 pipeline 处理的 query(比如 Q5, Q8 等),准实时作业还是有一定的优势。但 SMODE 作业在执行时间上的优势并不是没有代价的,如果同时考虑资源消耗,在下图中,我们可以看到,准实时作业的性能提升是建立在资源消耗远远大于 Bubble 模式的前提之上的。而 Bubble 在性能远优于离线模式的同时,其资源消耗,则整体上是相近的。
Fig.6.b 资源消耗(cpu * time)比较:
Bubble 模式 vs 离线(batch)模式 vs 一体化调度准实时(smode)模式
综合起来看,Bubble Execution 可以很好的结合 batch 模式和准实时模式的优点:
在执行时间层面,对于 TPCH 测试集中的任意 query,bubble 模式的执行时间都比 batch 模式要短,整体上 22 个 Queries 总耗时缩减将近 2X,接近 service mode 模式的耗时;
在资源消耗层面,bubble 模式基本上和 batch 模式相当,相比于 service mode 模式有大幅度的减少,整体缩减 2.6X。
Fig.6.c Bubble 模式与离线/准实时模式的整体比较
值得说明的是,在上面的 TPCH Benchmark 比较中,我们把 Bubble 切分条件简单化了,也就是整体上之限制 bubble 的大小在 500,而没有充分考虑 barrier 等条件,如果在切分 bubble 的时候进一步调优,比如对于数据可以有效 pipeline 起来的节点,尽量保证切分在 bubble 内部,那作业的执行性能和资源利用率等方面都还可以进一步得到的提升,这是我们在实际生产系统上线过程中会注重考虑的。具体上线的效果见 Section 3。
在了解了 Bubble 执行模式的整体设计思想与架构后,接下来展开来讲一下具体 Bubble 模式的实现细节,以及将这种全新的混合执行模式推上线所需要的具体工作。
2. Bubble 的切分与执行
采用 Bubble Execution 的作业(以下简称 Bubble 作业)和传统的离线作业一样,会通过一个 DAG master(aka. Application Master)来管理整个 DAG 的生命周期。AM 负责对 DAG 进行合理的 bubble 切分,以及对应的资源申请和调度运行。整体而言,Bubble 内部的计算节点,将按照计算加速度原则,包括同时使用预拉起的计算节点以及数据传输通过内存/网络直传进行 pipeline 加速。而不切在 bubble 内部的计算节点则通过经典离线模式执行,不在 bubble 内部的连接边(包括横跨 bubble boundary 的边)上的数据,均通过落盘方式进行传输。
Fig.7 混合 Bubble 执行模式
Bubble 切分方法,决定了作业的执行时间和资源利用率。需要根据计算节点的并发规模,节点内部算子属性等信息综合考虑。而在切分出 bubble 之后,Bubble 的执行则涉及到节点的执行,与数据 pipeline/barrier 的 shuffle 方式怎么做到有机的结合,这里分开做一下描述。
2.1 Bubble 切分原理
Bubble Execution 的核心思想在于将一个离线作业拆分成多个 Bubble 来执行。为了切分出有利于作业整体高效运行的 bubble,有几个因素需要综合考虑:
计算节点内部算子特性:对于同时拉起 bubble 所有计算节点的调度模式而言,数据在 bubble 内部的上下游节点之间能否有效的进行 pipeline 处理,很大程度上决定了在 bubble 内部,下游节点是否会因处于空转状态带来资源浪费。所以在切分 bubble 的逻辑中,当节点包含 barrier 特性的算子而可能阻塞数据的 pipeline 时,将考虑不将该节点与其下游切入同一个 bubble。
单个 Bubble 内部计算节点数目的多少:如同之前讨论的,一体化的资源申请/运行,当包含的计算节点过多时,可能无法申请到资源,或者即使能申请到其 failure 代价也可能无法控制。限定 Bubble 的大小,可以避免过大的一体化运行带来的负面作用。
聚合计算节点,切割 Bubble 的迭代方向:考虑到 bubble 大小的限制,从上而下切分 bubble 与从下而上切分 bubble 两种方式,可能导致切分的结果的不同。对于线上大部分作业而言,处理的数据往往呈倒三角型,对应的 DAG 也大多数是倒三角形态,所以默认采用自底向上的算法来切割 bubble,也就是从距离 root vertex 最远的节点开始迭代。
在上述的几个因素中,算子的 barrier 属性由上层计算引擎(e.g., MaxCompute 的 optimizer)给出。一般而言,依赖 global sort 操作的算子(比如 MergeJoin, SorteAggregate 等),会被认为会造成数据阻塞(barrier),而基于 hash 特性操作的算子则对于 pipeline 更加友好。对于单个 Bubble 内部允许的计算节点数目,根据我们对线上准实时作业特点的分析和 Bubble 作业的实际灰度实验,选定的默认上限在 500。这是一个在大多数场景下比较合理的值,既能保证比较快速的拿到全量资源,同时由于处理数据量和 DoP 基本成正相关关系,这个规模的 bubble 一般也不会出现内存超限的问题。当然这些参数和配置,均允许作业级别通过配置进行微调,同时 Bubble 执行框架也会后继提供作业运行期间动态实时调整的能力。
在 DAG 的体系中,边连接的物理属性之一,就是边连接的上下游节点,是否有运行上的前后依赖关系。对于传统的离线模式,上下游先后运行,对应的是 sequential 的属性,我们称之为 sequential edge。而对于 bubble 内部的上下游节点,是同时调度同时运行的,我们称连接这样的上下游节点的边,为 concurrent edge。可以注意到,这种 concurrent/sequential 的物理属性,在 bubble 应用场景上,实际与数据的传送方式(网络/内存直传 vs 数据落盘)的物理属性是重合的(Note: 但这两种依然是分开的物理属性,比如在必要的时候 concurrent edge 上也可以通过数据落盘方式传送数据)。
基于这样的分层抽象,Bubble 切分算法,本质上就是尝试聚合 DAG 图的节点,将不满足 bubble 准入条件的 concurrent edge 还原成 sequential edge 的过程。最终,由 concurrent edge 联通的子图即为 bubble。在这里我们通过一个实际的例子来展示 Bubble 切分算法的工作原理。假设存在下图所示的 DAG 图,图中的圆圈表示计算顶点(vertex),每个圆圈中的数字表示该 vertex 对应的实际计算节点并发度。其中 V1 和 V3 因为在作业提交初始,就因为其内部包含 barrier 算子,而被标注成 barrier vertex。圆圈之间的连接线表示上下游的连接边(edge)。橙色线代表(初始)concurrent edge,黑色线代表 sequential edge,初始状态图中的 sequential edge 根据 barrier vertex 的输出边均为 sequential edge 的原则确定,其他边默认均初始化为 concurrent edge。
Fig.8 示例 DAG 图(初始状态)
在这个初始 DAG 基础上,按照上面介绍过的整体原则,以及本章节最后描述的一些实现细节,上图描述的初始状态,可以经过多轮算法迭代,最终产生如下的 Bubble 切分结果。在这个结果中产生了两个 Bubbles: Bubble#0 [V2, V4, V7, V8],Bubble#1 [V6, V10], 而其他的节点则被判断将使用离线模式运行。
Fig.9 示例 DAG 图 Bubble 切分结果
在上图的切分过程中,自底向上的遍历 vertex,并秉承如下原则:
若当前 vertex 不能加入 bubble,将其输入 edge 均还原为 sequential edge(比如 DAG 图中的 V9);
若当前 vertex 能够加入 bubble,执行广度优先遍历算法聚合生成 bubble,先检索输入 edge 连接的 vertex,再检索输出 edge 连接的,对于不能联通的 vertex,将 edge 还原为 sequential edge(比如 DAG 图中遍历 V2 的输出 vertex V5 时会因为 total task count 超过 500 触发 edge 还原)。
而对任意一个 vertex,只有当满足以下条件才能被添加到 bubble 中:
vertex 和当前 bubble 之间不存在 sequential edge 连接;
vertex 和当前 bubble 不存在循环依赖,即:
Case#1:该 vertex 的所有下游 vertex 中不存在某个 vertex 是当前 bubble 的上游;
Case#2:该 vertex 的所有上游 vertex 中不存在某个 vertex 是当前 bubble 的下游;
Case#3:该 vertex 的所有下游 bubble 中不存在某个 vertex 是当前 bubble 的上游;
Case#4:该 vertex 的所有上游 bubble 中不存在某个 vertex 是当前 bubble 的下游;
注:这里的上游/下游不仅仅代表当前 vertex 的直接后继/前驱,也包含间接后继/前驱
Fig.10 切分 Bubble 过程可能存在循环依赖的几种场景
而实际线上 bubble 的切分还会考虑到实际资源和预期运行时间等信息,比如计算节点的 plan memory 是否超过一定数值,计算节点中是否包含 UDF 算子,生产作业中计算节点基于历史信息(HBO)的预估执行时间是否超长,等等,这里不再赘述。
2.2 Bubble 的调度与执行
2.2.1 Bubble 调度
为了实现计算的加速,Bubble 内部的计算节点的来源默认均来自常驻的预热资源池,这一点与准实时执行框架相同。与此同时我们提供了灵活的可插拔性,在必要的情况下,允许 Bubble 计算节点从 Resource Manager 当场申请(可通过配置切换)。
从调度时机上来看,一个 Bubble 内部的节点调度策略与其对应的输入边特性相关,可以分成下面几种情况:
不存在任何 input edge 的 bubble root vertext(比如 Fig.9 中的 V2):作业一运行就被调度拉起。
只有 sequential edge 输入 bubble root vertex(比如 Fig.9 中的 V6):等待上游节点完成度达到配置的 min fraction 比例(默认为 100%,即所有上游节点完成)才被调度。
Bubble 内部的 vertex(即所有输入边都是 concurrent edge,比如 Fig.9 中的 V4, V8, V10),因为其完全是通过 concurrent edge 进行连接的,会自然的被与上游同时触发调度。
Bubble 边界上存在 mixed-inputs 的 bubble root vertex(比如 Fig.9 中的 V7)。这种情况需要一些特殊处理,虽然 V7 与 V4 是通过 concurrent edge 链接,但是由于 V7 的调度同时被 V3 通过 sequential edge 控制,所以事实上需要等待 V3 完成 min-fraction 后才能调度 V7。对于这种场景,可以将 V3 的 min-fraction 配置为较小(甚至 0)来提前触发;此外 Bubble 内部我们也提供了 progressive 调度的能力,对这种场景也会有帮助。
比如图 7 中的 Bubble#1,只有一条 SequentialEdge 外部依赖边,当 V2 完成后,就会触发 V6 + V10(通过 concurrent edge)的整体调度,从而将整个 Bubble#1 运行起来。
在 Bubble 被触发调度后,会直接向 SMODE Admin 申请资源,默认使用的是一体化 Gang-Scheduling(GS)的资源申请模式,在这种模式下,整个 Bubble 会构建一个 request,发送给 Admin。当 Admin 有足够的资源来满足这个申请时,会将,再包含预拉起 worker 信息的调度结果发送给 bubble 作业的 AM。
Fig.11 Bubble 与 Admin 之间的资源交互
为了同时支持紧张资源上以及 Bubble 内部动态调整的场景,Bubble 同时还支持 Progressive 的资源申请模式。这种模式允许 Bubble 内的每个 Vertex 独立申请资源和调度。对于这种申请,Admin 只要有增量的资源调度即会将结果发送给 AM,直到对应 Vertex 的 request 完全满足。对于这种场景上的独特应用这里暂时不做展开。
在准实时执行框架升级后,SMODE 服务中的资源管理(Admin)和多 DAG 作业管理逻辑(MultiJobManager)已经解耦,因此 bubble 模式中的资源申请逻辑,只需要和 Admin 进行交互,而不会对于正常准实时作业的 DAG 执行管理逻辑带来任何影响。另外,为了支持线上灰度热升级能力,Admin 管理的资源池中的每个常驻计算节点均通过 Agent+多 Labor 模式运行,在调度具体资源时,还会根据 AM 版本,进行 worker 版本的匹配,并调度满足条件的 labor 给 Bubble 作业。
2.2.2 Bubble 数据 Shuffle
对于穿越 Bubble bourndary 上的 sequential edge,其上传输的数据和普通离线作业相同,都是通过落盘的方式来进行数据传输。这里我们主要讨论在 Bubble 内部的数据传输方式。根据之前描述的作业 bubble 切分原则,bubble 内部的通常具备充分的数据 pipeline 特性,且数据量不大。因此对于 bubble 内部 concurrent edge 上的数据,均采用执行速度最快的网络/内存直传方式来进行 shuffle。
这其中网络 shuffle 的方式和经典的准实时作业相同,通过上游节点和下游节点之间建立 TCP 链接,进行网络直连发送数据。这种 push-based 的网络传送数据方式,要求上下游必须同时拉起,根据链式的依赖传递,这种网络 push 模式强依赖于 Gang-Scheduling,此外在容错,长尾规避等问题上也限制了 bubble 的灵活性。
为了更好的解决以上问题,在 Bubble 模式上,探索了内存 shuffle 模式。在这一模式下,上游节点将数据直接写到集群 ShuffleAgent(SA)的内存中,而下游节点则从 SA 中读取数据。内存 shuffle 模式的容错,扩展,包括在内存不够的时候将部分数据异步落盘保证更高的可用性等能力,由 ShuffleService 独立提供。这种模式可以同时支持 Gang-Scheduling/Progressive 两种调度模式,也使其具备了较强的可扩展性,比如可以通过 SA Locality 调度实现更多的 Local 数据读取,通过基于血缘的 instance level retry 实现粒度更精细的容错机制等等。
Fig.12 Network Shuffle VS Memory Shuffle
鉴于内存 shuffle 提供的诸多可扩展优势,这也是线上 Bubble 作业选用的默认 shuffle 方式,而网络直传则作为备选方案,允许在容错代价很小的超小规模作业上,通过配置使用。
2.3 Fault-Tolerance
作为一种全新的混合执行模式,Bubble 执行探索了在离线作业和一体化调度的准实时作业间的各种细粒度平衡。在线上复杂的集群中,运行过程中各种各样的失败在所难免。而 bubble 这种全新模式下,为了保证失败的影响最小,并在可靠性和作业性能之间取得最佳的平衡,其对于失败处理的策略也更加的多样化。
针对不同的异常问题,我们设计了各种针对性容错策略,通过各种从细到粗的力度,处理执行过程中可能涉及的各种异常场景处理,比如:向 admin 申请资源失败、bubble 中的 task 执行失败(bubble-rerun)、bubble 多次执行失败的回退(bubble-renew),执行过程中 AM 发生 failover 等等。
2.3.1 Bubble Rerun
目前 Bubble 在内部计算节点失败时,默认采用的 retry 策略是 rerun bubble。即当 bubble 内的某个节点的本次执行(attempt)失败,会立即 rerun 整个 bubble,取消正在执行的同一版本的 attempt。在归还资源的同时,触发 bubble 重新执行。通过这种方式,保证 bubble 内所有计算节点对应的(retry) attempt 版本一致。
触发 bubble rerun 的场景有很多,比较常见的有以下几种:
Instance Failed:计算节点执行失败,通常由上层引擎的 runtime 错误触发(比如抛出 retryable-exception)。
Resource Revoked:在线上生产环境,有很多种场景会导致资源节点重启。比如所在的机器整机 oom、机器被加黑等。在 worker 被杀之后,重启之后的 worker 会依照最初的启动参数重新连回 admin。此时,admin 会将这个 worker 重启的消息封装成 Resource Revoked 发送给对应的 AM,触发 bubble rerun。
Admin Failover: 由于 Bubble 作业所使用的计算资源来自于 SMODE 的 admin 资源池,当 admin 由于某些原因 Failover,或者 SMODE 整体服务被重启时,分配给 AM 的计算节点会被停止。Admin 在 Failover 之后不感知当前各个节点被分配的 AM 信息,无法将这些重启的消息发送给 AM。目前的处理方法是,每个 AM 订阅了 admin 对应的 nuwa,在 admin 重启之后会更新这个文件. AM 感知到信息更新后,会触发对应的 taskAttempt Failed,从而 rerun bubble。
Input Read Error:在计算节点执行时,读不到上游数据是一个很常见的错误,对于 bubble 来说,这个错误实际上有三种不同的类型:
Bubble 内的 InputReadError:由于 shuffle 数据源也在 bubble 内,在 rerun bubble 时,对应上游 task 也会重跑。不需要再做针对性的处理。
Bubble 边界处的 InputReadError: shuffle 数据源是上游离线 vertex(或也可能是另一个 bubble)中的 task 产生,InputReadError 会触发上游的 task 重跑,当前 bubble rerun 之后会被 delay 住,直到上游血缘(lineage)的新版本数据全部 ready 之后再触发调度。
Bubble 下游的 InputReadError: 如果 bubble 下游的 task 出现了 InputReadError,这个事件会触发 bubble 内的某个 task 重跑,此时由于该 task 依赖的内存 shuffle 数据已经被释放,会触发整个 bubble rerun。
2.3.2 Bubble Renew
在 Admin 资源紧张时, Bubble 从 Admin 的资源申请可能等因为等待而超时。在一些异常情况下,比如 bubble 申请资源时刚好 onlinejob 服务处于重启的间隔,也会出现申请资源失败的情况。在这种情况下,bubble 内所有 vertex 都将回退成纯离线 vertex 状态执行。此外对于 rerun 次数超过上限的 bubble,也会触发 bubble renew。在 bubble renew 发生后,其内部所有边都还原成 sequential edge,并在所有 vertex 重新初始化之后,通过回放内部所有调度状态机触发事件,重新以纯离线的方式触发这些 vertex 的内部状态转换。确保当前 bubble 内的所有 vertex 在回退后,均会以经典离线的模式执行,从而有效的保障了作业能够正常 terminated。
Fig. 13 Bubble Renew
2.3.3 Bubble AM Failover
对于正常的离线作业,在 DAG 框架中,每个计算节点相关的内部调度事件都会被持久化存储,方便做计算节点级别的增量 failover。但是对于 bubble 作业来说,如果在 bubble 执行过程发生了 AM failover 重启,通过存储事件的 replay 来恢复出的 bubble,有可能恢复到 running 的中间状态。然而由于内部 shuffle 数据可能存储在内存而丢失,恢复成中间 running 状态的 bubble 内未完成的计算节点,会因读取不到上游 shuffle 数据而立刻失败。
这本质上是因为在 Gang-Scheduled Bubble 的场景上,bubble 整体是作为 failover 的最小粒度存在的,所以一旦发生 AM 的 failover,恢复粒度也应该在 bubble 这个层面上。所以对于 bubble 相关的所有调度事件,在运行中都会被当作一个整体,同时当 bubble 开始和结束的时候分别刷出 bubbleStartedEvent 和 bubbleFInishedEvent。一个 bubble 所有相关的 events 在 failover 后恢复时会被作为一个整体,只有结尾的 bubbleFInishedEvent 才表示这个 bubble 可以被认为完全结束,否则将重跑整个 bubble。
比如在下图这个例子中,DAG 中包含两个 Bubble(Bubble#0: {V1, V2}, Bubble#1: {V3, V4}),在发生 AM 重启时,Bubble#0 已经 TERMINATED,并且写出 BubbleFinishedEvent。而 Bubble#1 中的 V3 也已经 Terminated,但是 V4 处于 Running 状态,整个 Bubble #1 并没有到达终态。AM recover 之后,V1,V2 会恢复为 Terminated 状态,而 Bubble#1 会重头开始执行。
Fig 14. AM Failover with Bubbles
3. 上线效果
当前 Bubble 模式已经在公共云全量上线,SQL 作业中 34%执行 Bubble,日均执行包含 176K 个 Bubble。
我们针对 signature 相同的 query 在 bubble execution 关闭和打开时进行对比,我们发现在整体的资源消耗基本不变的基础上,作业的执行性能提升了 34%,每秒处理的数据量提升了 54%。
Fig 15. 执行性能/资源消耗对比
除了整体的对比之外,我们针对 VIP 用户也进行了针对性的分析,用户 Project 在打开了 Bubble 开关之后(下图中红色标记的点为打开 Bubble 的时间点),作业的平均执行性能有非常明显的提升。
Fig 16. VIP 用户开启 Bubble 后平均执行时间对比
评论