Flink 资源调度模型
作者:王刚,腾讯 CSIG 高级工程师
Flink 资源模型 / 调度设计背景知识
首先,我们来简单回顾一下 Flink 作业的运行时模型,然后再来探讨在这种运行模型下,Flink 的资源模型和调度架构的设计和实现。
我们引用官网非常经典的一张图,来说明一个 Flink 流作业简化后的运行视图。
Tasks 和 Operator Chains (部分译自官网)我们知道,一个 Flink 作业可以看做是由 Operators 组成的 DAG,一个 Operator 代表对数据流的进行的某个数据变化操作( Sources 和 Sinks 也是代表数据流流入和数据流流出的特殊 Operator )。在实际的分布式运行中,Flink 会把符合聚合规则的相邻 Operator 的 SubTask 聚合成 Tasks,每一个 Task 都会被单独的线程执行。这种把多个 Operator 的 SubTask 聚合成 Tasks 优化通常非常有效:能有效减少线程间切换(相比单独的每个 operator 的每个 subtask 占用一个线程)、数据缓存的成本,从而在降低数据处理延迟的同时增加系统的吞吐。
下图代表了数据流在 Operator Chain 之后,会实际产生 5 个 SubTask,相应的需要 5 个并发线程来处理该数据流。
在此,我们简要的区分下 Task 和 SubTask 的异同:
Task Task 是 Flink Runtime 的运行的基本单元,Task 封装了一个 Operator 或 Operator Chain 的某一并行实例 SubTask 一个 SubTask 是负责处理某一数据流的一部分的 Task,SubTask 术语强调对于同一个 Operator 或 Operator Chain 这里有多个并行的 Tasks。所以,一个 Flink 的作业,最终会转化为一个个 Task 在集群上运行。我们接下来从 Task 运行维度分析,一层层来看 Flink 的资源模型设计。
资源模型首先,我们介绍 Flink 基本的几个运行时概念。
Flink Job:
Flink 程序提交到 Flink Cluster 运行后,会生成一个或者多个 Flink jobs。根据上文的介绍,我们知道一个 Flink job 其实是数据流变换的运行时抽象。具体来讲,是由 operator 或者 operator-chain 组成的一个个 Task 进行数据处理的有向图。
Flink Cluster:
一个 Flink Cluster 一般是由一个 JobManager 和多个 TaskManager 组成的分布式系统。
JobManager:
JobManager 是 Flink Cluster 资源的编排器,负责协调 Flink Application 的分布式执行,具体职责有:
1) 决定什么时候调度下一个 Task
2)处理 Task 运行结束或者失败的情形
3)协调 Checkpoint 的触发和执行
4)协调 Flink Job 在发生失败时的恢复行为
5)其它情形。JobMananger 进程主要由三个不同的组件组成:
Flink ResourceManager
ResourceManager 负责 Flink Clutster 资源的分配和回收工作,它管理着 Flink Cluster 的基本资源调度单元 Task Slots。Flink 针对不同的资源环境和运行环境(YARN、Kubernetes 和 standalone 模式等),有不同的 ResourceManager 实现。
Flink Dispatcher
Dispatcher 提供了提交 Flink 应用的 REST 接口,并且负责为每一个提交的 Job 启动一个新的 JobMaster。另外,Dispatcher 还提供了用来查询 Job 运行状态的 Flink WebUI。
Flink JobMaster
JobMaster,顾名思义,负责管理一个单独的 Flink Job 的运行。多个 Flink Jobs 可以同时运行在一个 Flink Cluster 中,每一个 Flink Job 都会有一个对应的 JobMaster。
TaskManager:
TaskManager 负责执行组成 Job 的 Tasks,并且会负责数据流之间的数据转发和缓存工作。Flink 运行时,必须有至少一个 TaskManager。一个 Task Manager 可能会被划分成多个 Slots,Slot 是 TaskManager 资源的一个子集, 也是 TaskManager 中最小的资源调度单位,Slot 的概念贯彻了资源调度过程的始终。
下面引用官网的一段材料来说明 Task Slot 和 Task 运行之间的关系。
Task Slots 和资源(摘自官网)
每个 TaskManager 都是一个 JVM 进程,可以在单独的线程中执行一个或多个 SubTask。为了控制一个 TaskManager 中接受多少个 Task,就有了所谓的 Task Slots(至少一个)。
每个 Task Slot 代表 TaskManager 中资源的固定子集。例如,具有 3 个 Slot 的 TaskManager,会将其托管内存 1/3 用于每个 Slot。分配资源意味着 subtask 不会与其他作业的 subtask 竞争托管内存,而是具有一定数量的保留托管内存。注意此处没有 CPU 隔离;当前 Slot 仅分离 Task 的托管内存通过调整 Task Slot 的数量,用户可以定义 subtask 如何互相隔离。每个 TaskManager 有一个 Slot,这意味着每个 Task 组都在单独的 JVM 中运行(例如,可以在单独的容器中启动)。具有多个 Slot 意味着更多 subtask 共享同一 JVM。同一 JVM 中的 Task 共享 TCP 连接(通过多路复用)和心跳信息。它们还可以共享数据集和数据结构,从而减少了每个 Task 的开销。
默认情况下,Flink 允许 SubTask 共享 Slot,即便它们是不同的 Task 的 SubTask,只要是来自于同一作业即可。结果就是一个 Slot 可以持有整个作业管道。允许 Slot 共享有两个主要优点:
Flink 集群所需的 Task Slot 和作业中使用的最大并行度恰好一样。无需计算程序总共包含多少个 Task(具有不同并行度)。
容易获得更好的资源利用。如果没有 Slot 共享,非密集 subtask(source/map()) 将阻塞和密集型 subtask(window) 一样多的资源。通过 Slot 共享,我们示例中的基本并行度从 2 增加到 6,可以充分利用分配的资源,同时确保繁重的 subtask 在 TaskManager 之间公平分配。
调度模型该小结部分内容引自 深入解读 Flink 资源管理机制 [4]
概览 Flink 的资源调度是一个典型的两层模型。其中从 Cluster 到 Job 的分配过程是由 Slot Manager 来完成,Job 内部分配给 Task 资源的过程则是由 Scheduler 来完成。如下图,Scheduler 向 Slot Pool 发出 Slot Request(资源请求),Slot Pool 如果不能满足该资源需求则会进一步请求 Resource Manager,具体来满足该请求的组件是 Slot Manager。
Flink Cluster 到 Flink Job 资源调度过程如下图,Cluster 到 Job 的资源调度过程中主要包含两个过程。
Slot Allocation(下图红色箭头)Scheduler 向 Slot Pool 发送请求,如果 Slot 资源足够则直接分配,如果 Slot 资源不够,则由 Slot Pool 再向 Slot Manager 发送请求(此时即为 Job 向 Cluster 请求资源),如果 Slot Manager 判断集群当中有足够的资源可以满足需求,那么就会向 Task Manager 发送 Assign 指令,Task Manager 就会提供 Slot 给 Slot Pool,Slot Pool 再去满足 Scheduler 的资源请求。
Starting TaskManagers(下图蓝色箭头)在 Active Resource Manager 资源部署模式下,当 Resource Manager 判定 Flink Cluster 中没有足够的资源去满足需求时,它会进一步去底层的资源调度系统请求资源,由调度系统把新的 Task Manager 启动起来,并且 TaskManager 向 Resource Manager 注册,则完成了新 Slot 的补充。
Flink Job 到 Task 调度过程 JobMaster 中的 Scheduler 组件,会根据 Execution Graph 和 Task 的执行状态,决定接下来要调度的 Task。
我们已经知道 Flink 是通过 Task Slots 来定义执行资源。每个 TaskManager 有一到多个 task slot,每个 task slot 可以运行一条由多个并行 task 组成的流水线。这样一条流水线由多个连续的 task 组成,比如并行度为 n 的 MapFunction 和 并行度为 n 的 ReduceFunction。需要注意的是 Flink 经常并发执行连续的 task,不仅在流式作业中到处都是,在批量作业中也很常见。
下图很好的阐释了这一点,一个由数据源、MapFunction 和 ReduceFunction 组成的 Flink 作业,其中数据源和 MapFunction 的并行度为 4 ,ReduceFunction 的并行度为 3 。流水线由一系列的 Source - Map - Reduce 组成,运行在 2 个 TaskManager 组成的集群上,每个 TaskManager 包含 3 个 slot,整个作业的运行如下图所示。
Flink 内部通过 SlotSharingGroup [5] 和 CoLocationGroup [6] 来定义哪些 Task 可以共享一个 Slot, 哪些 Task 必须严格放到同一个 Slot。
注:本文主要内容收集整理自 Flink 官网和公开的技术博客
参考资料[1] Flink 官网--Flink Architecture:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/concepts/flink-architecture/
[2] Flink 官网--Jobs and Scheduling:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/internals/job_scheduling/
[3] Flink 官网--Task Lifecycle:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/internals/task_lifecycle/
[4] 深入解读 Flink 资源管理机制:https://www.infoq.cn/article/tnq4vystluqfkqzczesa
[5] SlotSharingGroup:https://github.com/apache/flink/blob/release-1.15//flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
[6] CoLocationGroup:https://github.com/apache/flink/blob/release-1.15//flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
评论