Improvements of Job Scheduler and Query Execution on Flink OLAP
摘要:本文整理自字节跳动基础架构工程师方勇在 Flink Forward Asia 2021 核心技术专场的演讲。主要内容包括:
背景
问题和分析
调度执行优化
未来计划
一、背景
字节跳动的很多的业务方都有混合计算的需求,希望一套系统既支持 TP 计算,也支持 AP 计算。
上图是我们 HTAP 系统的总体架构。TP 侧使用我们内部自研的数据库作为 TP 计算引擎,AP 侧是用 Flink 作为 AP 的计算引擎。我们对外通过 MySQL 协议作为统一的入口,如果一个查询是 AP 计算,就会被转发到 Flink 的 Gateway 生成执行计划,然后提交到 Flink 引擎去执行计算。AP 侧有一个列式存储,Flink 引擎通过 Catalog 和 Connector 的接口,分别与存储端的元信息和存储层进行交互。AP 计算完成后,Client 端会向 Flink Gateway 发起 Proxy 数据的请求,然后由 Gateway 向 Flink 集群 Proxy 结果数据。至此,整个 AP 计算的查询交互和计算执行就完成了。
众所周知,Flink 是一个流批一体的计算引擎,既可以支持流式计算,也可以支持批式计算。为什么现在有很多系统选择使用 Flink 来做 OLAP 计算?
我们对比了 Flink 与 Presto 的差异,首先从架构上看,Flink 支持多种不同的部署模式,Flink 的 session 集群是一个非常典型的 MPP 架构,这是 Flink 可以支持 OLAP 计算的前提和基础。在 Flink 的计算执行上可以分为执行计划、作业 Runtime 管理、计算任务执行管理、集群部署和 Failover 管理 4 大部分。从 Presto 和 Flink OLAP 的架构以及功能模块图来看,两套系统在支持这些计算功能的具体实现上有很大的差异,但他们提供的系统能力和模块功能上是基本一致的。
所以 Flink 引擎在架构以及功能实现上,完全可以支持完整的 Flink OLAP 的计算需求。
在字节内部,Flink 最开始是被用作流式计算,后来由于 Flink 流批一体的计算能力,对于一些实时数仓场景我们也使用了 Flink 作为批式计算引擎。最终选用 Flink 作为 AP 计算引擎,主要基于三个方面的考虑:
第一,统一引擎降低运维成本。我们对 Flink 有非常丰富的运维和优化开发经验,在流批一体的基础上,使用 Flink 作为 AP 计算引擎可以降低开发和运维成本;
第二,生态支持。Flink 内部有很多存储系统,也有很多业务方使用 Flink SQL 来开发流式和批式作业,而我们内部的存储系统开发和对接了很多其他系统,所以用户使用 Flink 支持 OLAP 计算非常方便;
最后一个是性能优势。我们内部做过 TCP-DS 相关的基准测试 Benchmark,Flink 计算引擎相比 Presto 和 Spark SQL,在计算性能上并不逊色,并且在某些查询方面甚至是占优的。
二、问题和分析
首先介绍一下如何使用 Flink 做 OLAP 计算。
首先在接入层,我们使用 Flink SQL Gateway 作为接入层,提供 rest 协议直接接收 SQL 语句查询;架构上,是在 K8s 上拉起 Flink 的 session 集成,这是一个非常典型的 MPP 架构;计算模式上,我们使用 batch 模式加上计算全拉起的调度模式,减少了计算节点之间的数据落盘且能提升 OLAP 计算的性能。
在 Flink OLAP 计算过程中,主要存在以下几个问题:
首先,Flink OLAP 计算相比流式和批式计算,最大的特点是 Flink OLAP 计算是一个面向秒级和毫秒级的小作业,作业在启动过程中会频繁申请内存、网络以及磁盘资源,导致 Flink 集群内产生大量的资源碎片。
另一个 OLAP 最大的特点是查询作业对 latency 和 QPS 有要求的,需要保证作业在 latency 的前提下提供比较高的并发调度和执行能力,这就对 Flink 引擎提出了一个新的要求。
为了测试 Flink 执行 OLAP 计算的能力,我们对比了 Flink 作业调度的 Benchmark 相关测试。我们设计了三组不同复杂度的作业,分别是单节点的作业、两个节点的 wordcount 作业以及 6 个节点的 join 作业。每组作业计算节点并发度都是 128。
我们选取 5 台物理机启动一个 Flink session 集群,集群内有 1 万多个 slot,还实现了一个 Benchmarket Client 可以多线程并发提交作业,然后统计 10 分钟之内完成的作业的数量以及完成作业的平均 latency。
结果如下图所示。
先分析 QPS 的结果:
单节点作业,client 单线程的时候 QPS 是 7.81;线程数是 4 的时候,已经到达了 QPS 极限 17 左右;
Wordcount 两节点作业, client 单线程的时候,QPS 是 1.38;线程数是 32 的时候,QPS 是 7.53;
Join 作业的表现是最差的,client 单线程的时候,QPS 只有 0.44;线程数增加到 32 时,QPS 也只有 2.17。
再看一下 latency 的表现:
client 线程数增加时,单作业的 latency 从 100 多毫秒提交增加到 2 秒;Wordcount 作业的从 700 多毫秒增加到了 4 秒;Join 的作业从 2 秒增加到了 15 秒多,有数倍的增长。
这样的作业调度性能在线上使用过程中是不可接受的。
针对 Flink 并发作业调度的性能问题,我们也尝试针对一些性能的瓶颈点进行简单的优化,但效果并不理想。所以我们针对 Flink 作业的调度执行全链路进行分析,将 Flink 作业的执行分为作业管理、资源申请、计算任务三个主要的阶段,然后对每个阶段进行相应的性能优化和改进。
三、调度执行优化
3.1 作业管理优化
首先是作业管理优化。Flink 通过 Dispatcher 模块接收和管理作业,整个作业的执行过程可以分为 4 个步骤:初始化、作业执行准备、启动作业执行、结束作业执行。
Dispatcher 内部有 3 个线程池负责执行作业的 4 个步骤,分别是 Netty/Rest、Dispatcher Actor 以及 Akka 线程池。根据测试和分析:
Netty/Rest 线程池默认大小太小;
Dispatcher Actor 单点处理且执行了一些非常重量级的作业操作;
此外,Akka 线程池太繁忙,不仅要负责 dispatcher 内的作业管理,还负责了很多 jobmaster 类作业的具体执行以及 resource manager 中的资源管理。
针对上述问题,我们分别进行了相应的优化。加大了 Netty/Rest 线程池的大小,对作业进行拆解并创建了两个独立的线程池:IO 线程池和 Store 线程池,分别负责执行作业管理过程中比较重量级的操作,减轻 Dispatcher Actor 和 Akka 线程池的工作压力。
一个作业具体执行过程中会有很多定时任务,包括作业模块间的超时检查\心跳检查,作业资源申请过程中的超时检查等。在 Flink 目前的实现里,这些超时任务会被放到 Akka 线程池里,由 Akka 线程池调度和执行。即使一个作业已经结束,也没办法直接回收和释放,这会使 Akka 线程池里缓存的定时任务非常多,导致 JobManager 节点产生大量的 fullGC,JobManager 进程有 90% 左右的内存被这些定时任务占用。
针对这个问题我们也进行了相关优化,在每一个作业启动的时候都为它创建一个作业级别的本地线程池,作业相关的定时任务会先提交到本地线程池,当这些任务需要被真正执行的时候,本地线程池会将他们发送到 Akka 线程池直接执行。作业结束后会直接被释放,快速进行定时任务的回收。
3.2 资源申请优化
字节跳动公司内部目前使用的是 Flink1.11 版本,Flink 资源申请主要是基于 slot 维度,我们使用全拉起的作业调度模式,所以作业会等待 slot 资源全部申请完成之后才会进行计算任务调度。比如 resource manager 有 4 个 slot,现在有两个作业并发申请资源,每个作业都需要三个 slot,如果它们都只申请到两个 slot,就会导致两个作业相互等待 slot 资源而产生死锁。
针对这个问题,我们选择将 slot 粒度的资源申请优化为作业 batch 粒度的申请。这里的 batch 资源申请主要有两个难点:
一个是跟原先 slot 粒度的资源申请怎么做兼容,因为有很多机制是基于 slot 粒度的,比如资源申请的超时,这两块机制需要进行无缝融合;
第二是 batch 资源申请的事务性,我们需要保证一个 batch 内的资源是同时申请到或者同时释放的,如果有异常情况,这些资源申请就需要同时取消。
3.3 任务执行优化
3.3.1 作业间连接复用
首先是连接复用的问题,Flink 上下游计算任务通过 channel 传输数据,在一个 Flink 的作业内部,相同计算节点的网络连接是可以复用的,但是不同作业间的网络连接无法复用。一个作业所有的计算任务结束之后,它在 manager 之间的网络连接会被关闭并且释放。而另外一个作业执行计算的时候,TaskManager 需要创建新的网络连接,会出现作业任务执行过程中频繁创建和关闭连接,最终影响计算任务和查询作业的 latency 以及 QPS,同时在过程也会导致资源使用的不稳定,增加 CPU 的使用率以及 CPU 使用过程中的波峰波谷。
多作业复用在 TaskManager 的网络连接里主要存在以下几个难点:
第一,稳定性的问题,channel 不仅用来做数据传输,而且还跟计算任务的反压相关,所以复用连接可能会导致计算任务饿死以及死锁等问题;
第二,脏数据的问题,不同的作业复用计算连接有可能引起计算任务在执行过程中会产生脏数据;
第三,网络连接的膨胀和回收问题,对于不再使用的网络连接,我们需要及时探测并且关闭、释放资源。
我们实现 Flink 作业间的连接复用,主要方案是在 TaskManager 里面增加一个 net 连接池,作业需要创建载体连接时会向连接池发起请求,连接池会根据需要创建或复用已经存在的连接,完成计算后,计算任务会向连接池释放连接。
为了保证系统的稳定性,Flink 现有的作业内的连接使用机制是保持不变的,每个 net 连接有三个状态,分别是 idle、busy 以及 invalid。连接池会管理计算网络连接的三个状态,还支持根据需要来创建网络连接,然后发送增加校验,同时会回收网络连接,后台的定时任务会 check 连接状态。
3.3.2 PartitionRequest 优化
第二块是 partition request 优化,主要分为两个方面:batch 优化和通知机制的优化。
一个作业内上下游计算任务创建连接后,下游的计算任务会向上游发送一个 partition request 消息,告诉上游任务需要接收哪些 partition 数据的信息。 Partition request 的消息最大的问题是消息量太大,是上下游计算节点并发度的平方量级。
batch 优化的主要目的是将相同 TaskManager 内上下游计算任务间的 partition request 的消息数量进行打包处理,降低 partition request 的量级。比如在一个计算节点 100 并发的情况下,两个 TaskManager partition request 数量可以从原先的 10000 降低到现在的 4,由并发的平方降为 TaskManager 数量的平方,改善非常明显。
由于上下游的计算任务是并行部署的,所以会存在下游计算任务部署完成之后,上游的计算任务还没有开始部署的情况。当下游的计算任务向上游发送一个 partition request 的时候,上游的 TaskManager 会返回一个 partition not found 的异常,下游的计算任务根据这个异常会不断重试和轮询,直到请求完成。
这个过程存在两个问题,一个是 partition request 数量过多,另外一个是下游的计算任务在轮询重试的过程中有时间差,导致计算任务的 latency 加大。所以我们为上下游计算任务交互实现了一个 listen+notify 机制。上游的 TaskManager 接受到下游计算任务发送的 partition request 时,如果上游的计算任务还未部署,则会将 partition request 放入到一个 listen 列表里面,计算任务部署完成再从计算队列里面获取 partition request,并且回调执行完成整个交互。
3.3.3 网络内存池优化
最后一块是网络内存优化。TaskManager 启动后,会预先分配一块内存作为网络内存池,计算任务在 TaskManager 部署时会从网络内存池里分配一个本地内存池,并加入到网络内存池列表。计算任务创建本地内存池后,申请内存分片以及释放本地内存池等所有操作时,网络内存池都会遍历本地内存池列表,TaskManager 里面并行执行的计算任务很多时,这个遍历的次数会非常大,是 slot 的数量乘以上游并发度的数量,甚至会达到千万量级。
遍历的主要目的是提前释放其他本地内存池里面空闲的内存分片,提升内存的使用率。我们的主要优化是将这块遍历操作删去,虽然这会造成一部分的内存浪费,但能够极大地提升计算任务的执行性。
此外,我们还做了很多其他的优化和改造。包括计算调度方面,我们支持实现全拉起和 block 结合的调度模式;在执行计划方面,我们优化和实现了很多计算下推,将计算下推到存储去执行;在任务执行方面,我们针对任务的拉起和初始化都做了很多相关优化和实现。
3.4 Benchmark
上图是优化的效果 Benchmark。
QPS 方面,针对单节点计算 QPS 从原先的 17 提升到现在 33,Wordcount 两节点从最高的 7.5 提升到 20 左右,join 计算从原先最高的 2 左右提升到现在 11 左右,效果非常明显。
latency 方面提升效果也非常不错,32 个线程下,单节点作业的 latency 从原先的 1.8 秒降低到 200 毫秒左右,Wordcount 两节点作业从 4 秒降低为 2 秒不到,最明显的是 join 作业,从原先的 15 秒降为 2.5 秒左右,提升的幅度非常大。
四、未来计划
现在 Flink OLAP 虽然已经投入实际业务场景中使用,但我觉得还只是从 0 走到了 1。未来,我们希望做得更完善,从 1 走到 100。Flink OLAP 系统的指标主要可以分为三块:稳定性、性能和功能。
稳定性方面,首先要提升单点的稳定性问题,包括资源管理单点以及作业管理单点。其次,提升运行时的资源使用以及计算线程等管理,还有 OLAP 计算结果的优化管理。此外还有很多其他稳定性相关的优化。
性能方面,包括计划优化、细粒度计算任务执行和管理优化,还有面向行合列的计算优化等,都可以极大提升 Flink OLAP 计算的性能。
功能方面,我们希望持续完善产品化建设,包括 history server 的持续完善和建设。另一方面我们会更加完善 web 分析工具,帮助业务方更好地定位在查询过程中发现的问题。
更多 Flink 相关技术问题,可扫码加入社区钉钉交流群第一时间获取最新技术文章和社区动态,请关注公众号~
版权声明: 本文为 InfoQ 作者【Apache Flink】的原创文章。
原文链接:【http://xie.infoq.cn/article/c3a5d5906051a804431c54e78】。文章转载请联系作者。
评论