大数据培训:Flink 的提交模式
前言
这篇文章应先以了解 flink 组件为开始,再以简单模式 Local 和 Standlone 正式进入正题。本篇主要是以 Yarn 方式下三种模式展开细讲,当然还有 Kubernetes 方式(本篇不细说)。
组件
在了解提交模式之前,先了解一下 Flink 组件与组件之间的协作关系。
资源管理器(Resource Manager)
(1)主要负责管理任务管理器 TaskManager 的插槽 slot。
(2) 当作业管理器 JM 申请插槽资源时, RM 会将有空闲插槽的 TM 分配给 JM。如果 RM 没有足够的插槽来满足 JM 的请求。
(3)它还可以向资源提供平台发起会话,以提供启动 TM 进程的容器。
作业管理器(JobManager)
(1) 控制一个应用程序执行的主进程,也就是说,每个应用程序 都会被一个不同的 JM 所控制执行。
(2) JM 会先接收到要执行的应用程序,这个应用程序会包括:作业图(Job Graph)、逻辑数据流图( ogical dataflow graph)和打包了所有的类、库和其它资源的 JAR 包。
(3) JM 会把 Jobgraph 转换成一个物理层面的 数据流图,这个图被叫做 “执行图”(Executiongraph),包含了所有可以并发执行的任务。Job Manager 会向资源管理器( Resourcemanager)请求执行任务必要的资源,大数据培训也就是 任务管理器(Taskmanager)上的插槽 slot。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的 TM 上。而在运行过程中 JM 会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。
任务管理器(Taskmanager)
(1) Flink 中的工作进程。通常在 Flink 中会有多个 TM 运行, 每个 TM 都包含了一定数量的插槽 slots。插槽的数量限制了 TM 能够执行的任务数量。
(2) 启动之后,TM 会向资源管理器注册它的插槽;收到资源管理器的指令后, TM 就会将一个或者多个插槽提供给 JM 调用。TM 就可以向插槽分配任务 tasks 来执行了。
(3) 在执行过程中, 一个 TM 可以跟其它运行同一应用程序的 TM 交换数据。
分发器(Dispatcher)
(1)可以跨作业运行,它为应用提交提供了 REST 接口。
(2)当一个应用被提交执行时,分发器就会启动并将应用移交给 JM。
(3)Dispatcher 他会启动一个 WebUi,用来方便地 展示和监控作业执行的信息。
Local 模式
JobManager 和 TaskManager 共用一个 JVM,只需要 jdk 支持,单节点运行,主要用来调试。
Standlone 模式
Standlone 是 Flink 自带的一个分布式集群,它不依赖其他的资源调度框架、不依赖 yarn 等。
充当 Master 角色的是 JobManager。
充当 Slave/Worker 角色是 TaskManager
配置与启动
(1)conf 目录下有两个文件:masters 和 workers 指定地址。
(2)需要配置 conf/flink-conf.yaml 的自行配置。
(3)分发各个机器。
(4)启动集群 bin/start-cluster.sh
(5)提交任务 flink run
Yarn 模式
首先认识下提交流程
(1)提交 App 之前,先上传 Flink 的 Jar 包和配置到 HDFS,以便 JobManager 和 TaskManager 共享 HDFS 的数据。
(2)客户端向 ResourceManager 提交 Job,ResouceManager 接到请求后,先分配 container 资源,然后通知 NodeManager 启动 ApplicationMaster。
(3)ApplicationMaster 会加载 HDFS 的配置,启动对应的 JobManager,然后 JobManager 会分析当前的作业图,将它转化成执行图(包含了所有可以并发执行的任务),从而知道当前需要的具体资源。
(4)接着,JobManager 会向 ResourceManager 申请资源,ResouceManager 接到请求后,继续分配 container 资源,然后通知 ApplictaionMaster 启动更多的 TaskManager(先分配好 container 资源,再启动 TaskManager)。container 在启动 TaskManager 时也会从 HDFS 加载数据。
(5)TaskManager 启动后,会向 JobManager 发送心跳包。JobManager 向 TaskManager 分配任务。
Session Mode
Session 模式提前初始化好一个集群,然后向这个集群提交应用。所有应用都在同一个集群中执行,共享资源。这里 JobManager 仅有一个。提交到这个集群的作业可以直接运行。如图所示
Session 模式共享 Dispatcher 和 ResourceManager,作业共享集群资源。
Session 多个作业之间又不是隔离的,如果有一个 TaskManager 挂掉,它上面承载着的所有作业也会失败。同样来说,启动的 Job 任务越多,JobManager 的负载也就越大。
所以,Session 模式适合生命周期短资源消耗低的场景。
提交
./bin/flink run -t yarn-session \
-Dyarn.application.id=application_XXXX_YY \
./examples/streaming/TopSpeedWindowing.jar
Per-Job Cluster Mode
在 Per-Job 模式下,每个提交到 YARN 上的作业会有单独的 Flink 集群,拥有专属的 JobManager 和 TaskManager。也即:一个作业一个集群,作业之间相互隔离。
以 Per-Job 模式提交作业的启动延迟可能会较高,因为不需要共享集群,所以在 PipelineExecutor 中执行作业提交的时候,创建集群并将 JobGraph 以及所需要的文件等一同提交给 Yarn 集群,进行一系列的初始化动作,这个时候需要些时间。提交任务的时候会把本地 flink 的所有 jar 包先上传到 hdfs 上相应的临时目录,这个也会带来大量的网络的开销。
优点就是作业之间的资源完全隔离,一个作业的 TaskManager 失败不会影响其他作业的运行,JobManager 的负载也是分散开来的,不存在单点问题。当作业运行完成,与它关联的集群也就被销毁,资源被释放。
所以,Per-Job 模式一般用来部署那些长时间运行的作业。
提交
./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar
「其他操作」
# List running job on the cluster
./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
# Cancel running job
./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
Application Mode
Application 模式尝试去将 per-job 模式的资源隔离性和轻量级,可扩展的应用提交进程相结合。为了实现这个目的,它会每个 Job 创建一个集群,但是 应用的 main()将被在 JobManager 执行。
Application 模式为每个提交的应用程序创建一个集群,该集群可以看作是在特定应用程序的作业之间共享的会话集群,并在应用程序完成时终止。在这种体系结构中 Application 模式在不同应用之间提供了资源隔离和负载平衡保证
在 JobManager 中执行 main()方法,可以节省所需的 CPU 周期。还有个好处就是,由于每个应用程序有一个 JobManager,因此可以更平均地分散网络负载。
提交
./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar
「其他操作」
# List running job on the cluster
./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
# Cancel running job
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
Application mode 中的多个 job,实际在代码上的表现就是能够允许在一个 Application 里面调用多次 execute/executeAsyc 方法。但是 execute 方法会被阻塞,也就是只有一个 job 完成之后才能继续下一个 job 的 execute,但是可以通过 executeAsync 进行异步非阻塞执行。
Yarn 模式总结
文章来源于大数据与机器学习文摘
评论