Flink Job 概览
介绍 Flink 是如何调度 Job,以及如何在 JobManager 上维护并跟踪 Job 状态。
Job 调度
Flink 通过任务槽(Task slot)定义执行资源,每个 TaskManager 都有一或多个 Task Slot,每个 Task Slot 都可以运行并行 Task 的一个 pipeline。pipeline 包括多个连续的任务,参照下图说明,由一个 Data source、一个 MapFunction 和一个 ReduceFunction 组成的程序,Data source 和 MapFunction 的并发度都为 4,ReduceFunction 的并发度为 3。一个 pipeline 由 Source-Map-Reduce 组成,在具有 2 个 TaskManager,每个 TaskManager 有 3 个 Task slot 的集群上运行,程序执行情况下:
说明如下:
TaskManager 1 和 TaskManager 上,分别有 2 个 pipeline,各占用一个 Task slot
4 个 pipeline 是并行执行的
Flink 内通过 SlotSharingGroup 和 CoLocationGroup 来定义哪些任务共享一个 Task slot,哪些任务必须严格的放到一个 Task slot 中。
JobManager 数据结构
Job 执行期间,JobManager 会持续跟踪分布式任务,决定什么时候调度下一个 Task(或者一组 Task),并且对已完成的或执行失败的 Task 进行响应。
JobManager 接收 JobGraph,JobGraph 是数据流的表现形式,包括算子(JobVertex)和中间结果(IntermediateDataSet)。每个算子都有并行度和执行代码等属性。此外,JobGraph 还拥有一些在算子执行代码时所需要的附加库。
JobManager 将 JobGraph 转换为 ExecutionGraph ,ExecutionGraph 是 JobGraph 的并行版本:每个 JobVertex 包含并行 SubTask 的 ExecutionVertex 。一个并行度为 100 的算子将拥有一个 JobVertex 和 100 个 ExecutionVertex。ExecutionVertex 会跟踪特定 SubTask 的执行状态。来自一个 JobVertex 的所有 ExecutionVertex 都由一个 ExecutionJobVertex 管理保存,ExecutionJobVertex 跟踪算子整体状态。除了各个节点之外,ExecutionGraph 还包括了 IntermediateResult 和 IntermediateResultPartition,前者跟踪中间结果的状态,后者跟踪中间结果每个分区的状态。
Job 状态
每个 ExecutionGraph 都有一个与其相关联的 Job Status,指示 Job 执行的当前状态。
一个 Flink Job 状态机首先处于创建状态(created),然后切换到运行状态(running),并且在完成所有工作后,切换到完成状态(finished)。在失败的情况下,状态机首先切换到失败状态(failing),取消所有正在运行 Task。如果所有 JobVertex 都已达到最终状态(参考 Vertex 状态),并且 Job 不可重新启动,则状态机将转换为失败(failed)。如果 Job 可以重新启动,那么状态机将进入重新启动状态(restarting)。一旦完成重新启动,状态机将变成创建状态(created)。
在用户取消 Job 的情况下,将进入取消状态(cancelling),会取消所有当前正在运行的 Task 。一旦所有运行的 Task 已经达到最终状态(参考 Vertex 状态),job 状态机将转换到已取消状态(canceled)。
完成状态(finished),取消状态(canceled)和失败状态(failed)表示一个全局的终结状态,并且触发清理工作,而暂停状态(suspended)仅处于本地终止状态。意味着 job 的执行在相应的 JobManager 上终止,但集群的另一个 JobManager 可以从持久化的高可用存储中恢复这个 Job 并重新启动。因此,处于暂停状态(suspended)的 Job 将不会被完全清理。
Vertex 状态
在执行 ExecutionGraph 期间,每个并行 Task 经过多个阶段,从创建(created)到完成(finished)或失败(failed),下图说明了它们之间的状态和可能的转换。任务可以执行多次(例如故障恢复)。每个 Execution 跟踪一个 ExecutionVertex 的执行,每个 ExecutionVertex 都有一个当前 Execution(current execution)和一个前置 Execution(prior execution)。
Flink 执行图
在 Flink 中的执行图可以分为四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图
StreamGraph:Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。可以调用
env.getExecutionPlan()
输出 Json 串,将该 JSON 串粘贴到 http://flink.apache.org/visualizer/ 可视化该执行图。JobGraph:StreamGraph 经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点链接(chain)在一起作为一个节点,减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。
ExecutionGraph:JobManager 根据 JobGraph 生成 ExecutionGraph。ExecutionGraph 是 JobGraph 的并行化版本,是调度层最核心的数据结构。
物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个 TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。
四层执行图的演变过程如下图所示(来源:Flink 原理与实现:架构和拓扑概览):
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/internals/job_scheduling/
[2] http://wuchong.me/blog/2016/05/03/flink-internals-overview/
版权声明: 本文为 InfoQ 作者【Alex🐒】的原创文章。
原文链接:【http://xie.infoq.cn/article/b7687e6be271d74a93549dff7】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
评论