写点什么

配置 Flink 流式应用 (九)

发布于: 刚刚
配置Flink流式应用(九)

写在前面:

大家好,我是强哥,一个热爱分享的技术狂。目前已有 12 年大数据与 AI 相关项目经验, 10 年推荐系统研究及实践经验。平时喜欢读书、暴走和写作。

业余时间专注于输出大数据、AI 等相关文章,目前已经输出了 40 万字的推荐系统系列精品文章,强哥的畅销书「构建企业级推荐系统:算法、工程实现与案例分析」已经出版,需要提升可以私信我呀。如果这些文章能够帮助你快速入门,实现职场升职加薪,我将不胜欢喜。

想要获得更多免费学习资料或内推信息,一定要看到文章最后喔。

内推信息

如果你正在看相关的招聘信息,请加我微信:liuq4360,我这里有很多内推资源等着你,欢迎投递简历。

免费学习资料

如果你想获得更多免费的学习资料,请关注同名公众号【数据与智能】,输入“资料”即可!

学习交流群

如果你想找到组织,和大家一起学习成长,交流经验,也可以加入我们的学习成长群。群里有老司机带你飞,另有小哥哥、小姐姐等你来勾搭!加小姐姐微信:epsila,她会带你入群。


今天的数据基础设施是非常多样化的,像 Apache Flink 这样的分布式数据处理框架的出现和发展显得非常有必要,通过 Flink 可以很方便地与资源管理器、文件系统和分布式协调服务等许多组件进行交互。

 

在本章中,我们会就 Flink 集群的多种部署模式进行讨论,以及如何安全地配置它们使得它们具有高可用性。我们针对不同 Hadoop 版本和文件系统的 Flink 设置进行了阐述,并讨论了 Flink master 进程和 worker 进程最重要的配置参数。通过阅读本章之后,你将了解到如何对部署好的 Flin 集群进行设置和配置。

部署模式

Flink 可以部署在不同的环境中,比如本地机器、裸机集群、Hadoop yarn 集群或 Kubernetes 集群。在“Flink 组件配置”这一部分内容中,我们介绍了 Flink 集群的不同组件构成,其中包括以下的组件:JobManager、TaskManager、ResourceManager 和 Dispatcher。在本节中,我们将解释如何在不同的环境中进行配置和启动 Flink,包括独立集群、Docker、Apache Hadoop Yarn 和 Kubernetes,以及针对 Flink 集群的各个组件做配置。

Standalone 集群

Standalone Flink 集群包括至少一个 master 进程和至少一个 TaskManager 进程组成,这两个进程可以运行在在一台或多台机器上。所有进程都作为常规的 Java JVM 进程运行。图 9-1 显示了一个 Standalone Flink 设置。


master 会启动并运行一个 Dispatcher 线程和一个 ResourceManager 线程。一旦两个线程开始运行,TaskManagers 就会向 ResourceManager 进行注册。图 9-2 显示了如何将作业提交到 standalone 集群。

在独立集群部署模式中,如果一旦出现单点故障,则会引起服务不可用,这种情况下,master 服务器和 worker 服务器不会自动重新启动。如果集群中有足够数量的资源 slots 可用,则作业可以从 worker 故障中进行恢复。可以通过运行一个或多个备用 worker 来保证故障恢复。从 master 故障中恢复作业进行高可用配置,即 HA,本章后面将对此进行讨论。

 

为了构建一个 standalone 的 Flink 集群,我们可以从 Apache Flink 网站下载一个二进制发行版,并使用以下命令解压 tar 包:

tar xfz ./flink-1.7.1-bin-scala_2.12.tgz

解压后的目录包括一个./bin 文件夹,其中包含用于启动和停止 Flink 进程的 bash 脚本。通过 ./bin/start-cluster.sh 脚本在本地机器上启动一个 master 进程,同时在本地机器或远程机器上启动一个或多个 TaskManagers 进程。

Flink 默认配置为本地运行,在本地机器上启动单个 master 和单个 TaskManager。Flink 的运行需要依赖 JVM 环境,启动脚本必须能够启动 Java 进程。如果 java 二进制文件没有在 PATH 中,可以通过设置 JAVA_HOME 环境变量或在./conf/flink-conf.yaml 中设置 env.java.home 参数。接着通过执行 ./bin/start-cluster.sh 脚本启动本地 Flink 集群。你可以通过 http://localhost:8081 访问 Flink 的 Web UI 界面,并检查存活的 TaskManagers 数量和可用 slots 的数量。

为了在多台机器上启动并运行分布式 Flink 集群,你需要调整 flink 的默认配置并完成如下几个步骤。

 需要在./conf/slaves 配置文件中列出运行 taskmanager 的所有机器的主机名(或 IP 地址)。

 需要在所有机器上进行无密码认证的 SSH 配置(集群节点之间免密登录),以便能够 start-cluster.sh 脚本启动 TaskManager 进程。

 Flink 安装包所在文件夹必须位于所有计算机上同一路径下。一种常见的方法是使用每个机器上的 Flink 安装包挂载一个 networkshared 目录。

 需要在./conf/flinkconf. yaml 文件中配置运行 master 进程的机器的主机名(或 IP 地址),配置 key 为:jobmanager.rpc.address。

一旦上面提到的配置都设置好了,那么你就可以通过执行./bin/start-cluster.sh 脚本来启动 Flink 集群。该脚本将启动一个本地 JobManager,并为 slaves 文件中的服务器映射启动一个 TaskManager。通过访问运行 master 进程的机器上的 Web UI,你可以检查 master 进程是否已启动,并且看到集群中所有 TaskManager 是否已成功注册。最后你可以通过执行 ./bin/stopcluster.sh 脚本来停止本地或分布式 standalone 集群。

 

Docker

Docker 是一个用于在容器中打包和运行应用程序的流行平台。Docker 容器由主机系统的操作系统内核运行,因此比虚拟机更轻量级。此外,它们是隔离的,仅通过定义良好的通道进行通信。容器是从定义容器中的软件的镜像开始的。

Flink 社区的成员为 Apache Flink 配置和构建 Docker 镜像,并将它们上传到 Docker Hub,一个 Docker 镜像的公共存储库,存储库承载了最新版本 Flink 的 Docker 镜像。在 Docker 中运行 Flink 是在本地机器上设置 Flink 集群的一种简单方法。对于一个本地 Docker 设置,你必须启动两种类型的容器,一个运行 Dispatcher 和 ResourceManager 的主容器以及运行 TaskManager 以及一个或多个运行任务管理器的工作容器。这些容器像独立部署一样协同工作(请参阅独立集群)。启动后,TaskManager 会在 ResourceManager 上注册自己。当一个作业提交给 Dispatcher 时,它会产生一个 JobManager 线程,该线程从 ResourceManager 请求处理 slot。ResourceManager 将 TaskManagers 分配给 JobManager, JobManager 在所有所需资源可用时部署作业。

Master 和 worker 容器从相同的 Docker 镜像启动,使用不同的参数,如下面例子 9-1 所示。

// start master process

docker run -d --name flink-jobmanager \

-e JOB_MANAGER_RPC_ADDRESS=jobmanager \

-p 8081:8081 flink:1.7 jobmanager

 

// start worker process (adjust the name to start more than one TM)

docker run -d --name flink-taskmanager-1 \

--link flink-jobmanager:jobmanager \

-e JOB_MANAGER_RPC_ADDRESS=jobmanager flink:1.7 taskmanager

例 9.1:在 Docker 中启动 master 和 worker 容器

 

第一个命令的-p 8081:8081 参数将主容器的端口 8081 映射到主机的端口 8081,以便从主机访问 Web UI。你可以通过在浏览器中打开 http://localhost:8081 来访问 Web UI。Web UI 可用于上传应用程序 JAR 文件和运行应用程序。该端口还公开了 Flink 的 REST API。因此,你也可以在/bin/ Flink 上使用 Flink 的 CLI 客户端提交应用程序,管理正在运行的应用程序,或者请求集群或正在运行的应用程序的信息。

                                  注意

请注意,目前无法将自定义配置传递到 Flink Docker 映像中。如果要调整某些参数,则需要构建自己的 Docker 映像。Docker Flink 映像的构建脚本是定制映像的良好起点。

 

除了手动启动两个(或更多)容器,还可以创建 Docker Compose 配置脚本,该脚本自动启动并配置在 Docker 容器中运行的 Flink 群集,可能还有 ZooKeeper 和 Kafka 等其他服务。我们将不深入讨论此模式的细节,但 Docker Compose 配置需要指定网络配置,以便在隔离容器中运行的 Flink 进程可以相互通信。有关详细信息,请参阅 Apache Flink 的文档。

 

Apache Hadoop YARN

YARN 是 Apache Hadoop 的资源管理组件。它负责管理集群环境的计算资源——集群机器的 cpu 和内存,并将它们提供给请求资源的应用程序。YARN 以 container 的形式提供资源,container 分布在集群中,应用程序在 container 中运行它们的进程。由于 YARN 起源于 Hadoop 生态系统,所以 YARN 通常会被数据处理框架使用。

 

Flink 在 YARN 上运行有两种模式:job(作业)模式和 session(会话)模式。在 job 模式下,将启动一个 Flink 集群来运行单个作业。作业终止后,将停止 Flink 集群并释放所有占用的资源。图 9-3 显示了 Flink 作业是如何提交到 YARN 集群。

1.客户端提交作业执行时,它首先会连接到 YARN 资源管理器(ResourceManager)

2.启动一个新的 Yarn Application Master 进程,该进程包括了一个 JobManager 线程和一个 ResourceManager。

3.JobManager 向 ResourceManager 请求所需的 slots 资源来运行 Flink 作业。

4.Flink 的 ResourceManager 向 YARN 的 ResourceManager 请求容器资源。

5.并启动 TaskManager 进程。

6.一旦启动,TaskManagers 就会在 Flink 的 ResourceManager 上注册它们的 slots 信息。

7.Flink 的 ResourceManager 将它们提供给 JobManager。

8.JobManager 将作业的任务提交给 TaskManagers 执行。

会话模式则会启动一个长时间运行的 Flink 集群,该集群可以运行多个作业,并且需要手动停止。如果以会话模式启动,Flink 连接到 YARN 的 ResourceManager 来启动一个 Application Master,它会运行一个 Dispatcher 线程和一个 Flink ResourceManager 线程。空闲的 Flink YARN 会话设置如图 9-4 所示。

当作业提交执行时,Dispatcher 启动一个 JobManager 线程,该线程从 Flink 的 ResourceManager 请求 slots。如果没有足够的 slots 可用,Flink 的 ResourceManager 请求来自 YARN ResourceManager 的额外容器来启动 TaskManager 进程,这些进程在 Flink ResourceManager 上注册。一旦有足够的 slots 可用,Flink 的 ResourceManager 就会将它们分配给 JobManager,然后开始执行作业。图 9-5 显示了在 Flink 的 YARN 会话模式下如何执行作业。

对于 Flink 的两种运行模式,作业模式和会话模式,失败的 TaskManagers 将由 Flink 的 ResourceManager 自动重新启动。你可以使用在./conf/flink-conf.yaml 中的几个参数用于控制 Flink 在 YARN 上的恢复行为。例如,你可以配置在应用程序终止之前允许失败的容器的最大数量。为了从主节点发生故障时进行恢复,需要按照后面的部分中描述的那样配置高可用的设置。

无论是以作业模式还是以会话模式在 Yarn 上运行 Flink,它都需要访问 Hadoop 依赖项对应的版本以及 Hadoop 配置的路径。“与 Hadoop 组件的集成”章节详细描述了所需的配置。

给定一个运行中的和配置良好的 YARN 和 HDFS 设置,可以在 Flink 的命令行客户端使用以下命令提交 Flink 作业在 YARN 上执行:

./bin/flink run -m yarn-cluster ./path/to/job.jar

参数-m 定义提交作业到哪个主机。如果设置为关键字 yarn-cluster,那么客户端会将作业提交给 Hadoop 配置的 YARN 集群。Flink 的 CLI 客户端可以支持更多的参数,比如控制 TaskManager 容器内存的能力。有关可用参数的参考资料,请参阅文档。已经启动的 Flink 集群的 Web UI 由在 YARN 集群的某个节点上运行的 master 进程提供服务。你可以通过 YARN 的 Web UI 进行访问,它在“Tracking URL:ApplicationMaster”下的“应用程序概览”页上提供了一个链接。

Flink YARN 会话由./bin/yarnsession.sh 脚本启动,该脚本也可以使用各种参数来控制容器的大小、YARN 应用程序的名称以及其他动态属性。默认情况下,脚本会打印会话集群的连接信息,并且不返回。当脚本终止时,会话将被停止,所有资源将被释放。也可以使用-d 符合以分离模式启动 YARN 会话。可以使用 YARN 的应用程序工具终止一个分离的 Flink 会话。

当 Flink YARN 会话运行时,你可以使用./bin/flink  run ./path/to/job.jar 命令向会话提交作业。

 

注意,你不需要提供连接信息,因为 Flink 记住了在 YARN 上运行的 Flink 会话的连接细节。与作业模式类似,Flink 的 Web UI 也会链接到 YARN 的 Web UI 的应用概览页面。

Kubernetes

Kubernetes 是一个开源平台,它使用户能够在分布式环境中部署和扩展容器化的应用程序,给定 Kubernetes 集群和打包到容器镜像中的应用程序,你可以创建应用程序的部署,告诉 Kubernetes 启动多少个应用程序实例。Kubernetes 将利用其自身资源运行所请求的容器数,并在发生故障时重新启动它们。Kubernetes 还可以负责为内部和外部通信打开网络端口,并可以为进程发现和负载平衡提供服务。


Kubernetes 在内部部署,在云环境中运行,或在混合基础设施上运行。

在 Kubernetes 上部署数据处理框架和应用程序已经变得非常流行。Apache Flink 也可以部署在 Kubernetes 上。在深入了解如何在 Kubernetes 上部署 Flink 的细节之前,我们需要简要解释一些 Kubernetes 的术语:

 pod 是由 Kubernetes 管理和启动的容器

 deployment 定义了指定数量的 pod 或者 container 去运行

 Kubernetes 可能在集群的任何地方运行一个 pod。当 pod 在发生故障后或出现 deployment 扩缩容时重新启动,这时 IP 地址可能会发生更改。这显然是一个问题,如果 pod 之间需要进行互相通信。因此 Kubernetes 提供 service 来克服这个问题。service 定义了如何访问某一组 pod 的策略。当一个 pod 在集群中的其他节点上启动时,它负责更新路由。


  在本地机器上运行 Kubernetes

Kubernetes 是为集群操作而设计的。然而,Kubernetes 项目提供了 Minikube,一个在一台机器上本地运行单节点 Kubernetes 集群进行测试、开发的环境。如果你想

尝试在 Kubernetes 上运行 Flink 并且手头没有 Kubernetes 集群,我们建议设置 Minikube。                    

为了让部署在 Minikube 上的 Flink 集群上的应用程序成功运行,需要在部署 Flink 之前运行以下命令:

minikube ssh 'sudo ip link set docker0 promisc on'.

Kubernetes 的 Flink 设置定义为两个 deployment,一个用于运行主进程的 pod,另一个用于 worker 进程 pod。还有一个 service 将 master  Pod 的端口暴露给 worker  Pod。两种类型的 pod——master 和 worker,这两者的行为就像我们之前描述的 standalone 或 Docker 部署过程。示例 9-2 显示了 master deployment 的配置。

apiVersion: extensions/v1beta1

kind: Deployment

metadata:

name: flink-master

spec:

replicas: 1

template:

metadata:

labels:

app: flink

component: master

spec:

containers:

- name: master

image: flink:1.7

args:

- jobmanager

ports:

- containerPort: 6123

name: rpc

- containerPort: 6124

name: blob

- containerPort: 6125

name: query

- containerPort: 8081

name: ui

env:

- name: JOB_MANAGER_RPC_ADDRESS

value: flink-master

例 9.2:Flink master 的 kubernetes 部署

该 deployment 指定了运行单个 master 容器,副本数为 1,master 容器通过 Flink 1.7 Docker 镜像(镜像:flink:1.7)启动,带有启动主进程的参数(args:-jobmanager)。此外,deployment 配置容器的哪些端口要打开以进行 RPC 通信、blob 管理器(用于交换大文件)、可查询状态服务器以及 Web UI 和 REST 接口。 示例 9-3 显示了 worker  Pod 的 deployment。

apiVersion: extensions/v1beta1

kind: Deployment

metadata:

name: flink-worker

spec:

replicas: 2

template:

metadata:

labels:

app: flink

component: worker

spec:

containers:

- name: worker

image: flink:1.7

args:

- taskmanager

ports:

- containerPort: 6121

name: data

- containerPort: 6122

name: rpc

- containerPort: 6125

name: query

env:

- name: JOB_MANAGER_RPC_ADDRESS

value: flink-master

例 9.3:2 个 Flink worker 的 kubernetes 部署

worker  deployment 看起来几乎与 master deployment 相同,但有一些不同。 首先,worker deployment 指定了两个副本,这意味着启动了两个 worker 容器。 worker 容器基于相同的 Flink Docker 镜像,但以不同的参数(参数:-taskmanager)启动。而且,deployment 还开放了几个端口,并传递了 Flink master 部署的服务名称,以便 worker 可以访问 master。 暴露主进程并使其可供工作容器访问的服务定义如例 9-4 所示。

apiVersion: v1

kind: Service

metadata:

name: flink-master

spec:

ports:

- name: rpc

port: 6123

- name: blob

port: 6124

- name: query

port: 6125

- name: ui

port: 8081

selector:

app: flink

component: master

例 9.4:Flink mster 的 kubernetes 服务

你可以通过将每个定义存储在单独的文件中来为 Kubernetes 创建 Flink deployment,例如 master-deployment.yaml、worker-deployment.yaml 或 master-service.yaml。 这些文件也在我们的存储库中提供。 获得定义文件后,你可以使用 kubectl 命令将它们注册到 Kubernetes:

kubectl create -f master-deployment.yaml

kubectl create -f worker-deployment.yaml

kubectl create -f master-service.yaml

运行这些命令时,Kubernetes 开始部署请求的容器。 你可以通过运行以下命令来显示所有部署的状态:

kubectl get deployments

首次创建 deployment 时,需要一段时间用于下载 Flink 容器镜像。 一旦所有 pod 都启动了,你将拥有一个在 Kubernetes 上运行的 Flink 集群。 但是,使用给定的配置,Kubernetes 不会将任何端口暴露到外部环境。 因此,你无法访问主容器来提交应用程序或访问 Web UI。 你首先需要告诉 Kubernetes 创建从主容器到本地机器的端口转发。 这时你可以通过运行以下命令来完成:

kubectl port-forward deployment/flink-master 8081:8081

当端口转发运行时,你可以通过 http://localhost:8081 访问 Web UI。

现在你可以将作业上传和提交到在 Kubernetes 上运行的 Flink 集群。 此外,你可以使用 Flink CLI 客户端(./bin/flink)提交应用程序并访问 REST 接口以请求有关 Flink 集群的信息或管理正在运行的应用程序。

当工作 Pod 发生故障时,Kubernetes 将自动重启故障的 Pod,应用程序将被恢复(假设检查点已激活并正确配置)。 为了从主 Pod 故障中恢复,你需要配置高可用设置。

你可以通过运行以下命令关闭在 Kubernetes 上运行的 Flink 集群:

kubectl delete -f master-deployment.yaml

kubectl delete -f worker-deployment.yaml

kubectl delete -f master-service.yaml

你无法使用我们在本节中使用的 Flink Docker 镜像自定义 Flink deployment 的配置。 你需要使用调整后的配置构建自定义 Docker 镜像。 所提供镜像的构建脚本是自定义镜像的良好起点。

 

高可用设置

在理想情况下,大多数流应用程序连续执行,停机时间尽可能短。因此,许多应用程序必须能够从执行中涉及的任何进程的故障中自动恢复。虽然 worker 故障由 ResourceManager 处理,但 JobManager 组件的故障需要配置高可用 (HA) 设置。

 

Flink 的 JobManager 保存有关应用程序及其执行的元数据,例如应用程序 JAR 文件、JobGraph 和指向已完成检查点的指针。 如果主设备发生故障,则需要恢复此信息。 Flink 的 HA 模式依赖于 Apache ZooKeeper,一种用于分布式协调和一致存储的服务,以及一个持久性远程存储,例如 HDFS、NFS 或 S3。 JobManager 将所有相关数据存储在持久存储中,并将指向信息的指针(存储路径)写入 ZooKeeper。 如果出现故障,新的 JobManager 会从 ZooKeeper 查找指针并从持久存储加载元数据。

 

我们在“高可用设置”中更详细地介绍了 Flink HA 设置的操作模式和内部结构。 在本节中,我们将为不同的部署选项配置此模式。

 

Flink HA 设置需要一个正在运行的 Apache ZooKeeper 集群和一个持久性远程存储,例如 HDFS、NFS 或 S3。 为了帮助用户快速启动 ZooKeeper 集群以进行测试,Flink 提供了一个用于引导的帮助脚本。 首先需要通过调整./conf/zoo.cfg 文件来配置集群中涉及的所有 ZooKeeper 进程的 hosts 和 ports。 完成后,你可以调用 ./bin/start-zookeeper-quorum.sh 在每个配置的节点上启动 ZooKeeper 进程。

注意

你不应该在生产环境中使用 Flink 的 ZooKeeper 脚本,而应该自己配置和部署一个 ZooKeeper 集群。

 

Flink HA 模式是在./conf/flink-conf.yaml 文件中配置的。通过设置参数,如例 9-5 所示。

# REQUIRED: enable HA mode via ZooKeeper

high-availability: zookeeper

# REQUIRED: provide a list of all ZooKeeper servers of the

quorum

high-availability.zookeeper.quorum:

address1:2181[,...],addressX:2181

# REQUIRED: set storage location for job metadata in remote

storage

high-availability.storageDir: hdfs:///flink/recovery

# RECOMMENDED: set the base path for all Flink clusters in

ZooKeeper.

# Isolates Flink from other frameworks using the ZooKeeper

cluster.

high-availability.zookeeper.path.root: /flink

例 9.5:Flink cluster HA 配置

独立集群的 HA 设置

Flink 独立部署不依赖于外部资源管理器,例如 YARN 或 Kubernetes。 所有进程都是手动启动的,并且没有任何组件来监视这些进程并在出现故障时重新启动它们。 因此,独立的 Flink 集群需要备用的 Dispatcher 和 TaskManager 进程,可以接管失败进程的工作。

除了启动备用 TaskManager 之外,独立部署不需要额外的配置即可从 TaskManager 故障中恢复。 所有启动的 TaskManager 进程都在活动的 ResourceManager 中注册自己。 只要有足够的 slot 资源处于待机状态以补偿丢失的 TaskManager,应用程序就可以从 TaskManager 故障中恢复。 ResourceManager 分发之前空闲的 slot,然后重新启动应用程序。

如果为 HA 配置,则独立设置的所有 Dispatcher 在 ZooKeeper 注册。 ZooKeeper 选举一个负责执行应用程序的 leader Dispatcher。 提交应用程序时,负责的 Dispatcher 会启动一个 JobManager 线程,该线程将其元数据存储在配置的持久存储中,并将指针存储在 ZooKeeper 中,如前所述。如果运行 active Dispatcher 和 JobManager 的 master 进程失败,则 ZooKeeper 将选择一个新的 Dispatcher 作为 leader。leader Dispatcher 通过启动一个新的 JobManager 线程来恢复失败的应用程序,该线程在 ZooKeeper 中查找元数据指针并从持久存储中加载元数据。

除了前面讨论的配置之外,HA 独立部署还需要以下配置更改。 在 ./conf/flink-conf.yaml 中,你需要为每个正在运行的集群设置一个集群标识符。 如果多个 Flink 集群依赖同一个 ZooKeeper 实例进行故障恢复,则需要这样做:

# RECOMMENDED: set the path for the Flink cluster in

ZooKeeper.

# Isolates multiple Flink clusters from each other.

# The cluster id is required to look up the metadata of a failed cluster.

high-availability.cluster-id: /cluster-1

如果运行了 ZooKeeper quorum 并正确配置了 Flink,那么可以通过在./conf/masters 文件中添加额外的主机名和端口,使用常规的./bin/start-cluster.sh 脚本启动 HA standalone 集群。

YARN 上的 HA 设置

YARN 是一个集群资源和容器管理器。 默认情况下,它会自动重启失败的 master 和 TaskManager 容器。 因此,你不需要在 YARN 设置中运行备用进程来实现 HA。

Flink 的 master 进程作为 YARN ApplicationMaster 启动。YARN 会自动重启失败的 ApplicationMaster,但会跟踪和限制重启次数以防止无限恢复周期。 需要在 YARN 配置文件 yarn-site.xml 中配置 ApplicationManager 最大重启次数,如下图所示:

<property>

<name>yarn.resourcemanager.am.max-attempts</name>

<value>4</value>

<description>

The maximum number of application master execution attempts.

Default value is 2, i.e., an application is restarted at most once.

</description>

</property>

此外,你需要调整 Flink 的配置文件./conf/flinkconf.yaml,并配置应用程序重启尝试的次数:

# Restart an application at most 3 times (+ the initial start).

# Must be less or equal to the configured maximum number of attempts.

yarn.application-attempts: 4

YARN 只计算由于应用程序故障而导致的重启次数——由于抢占、硬件故障或重新启动而导致的重启不计入应用程序尝试次数。 如果你运行 Hadoop YARN 2.6 或更高版本,Flink 会自动配置尝试失败的有效间隔。 此参数指定应用程序仅在超过有效间隔内的重新启动尝试时才完全取消,这意味着不考虑间隔之前的尝试。 Flink 将时间间隔配置为与 ./conf/flink-conf.yaml 中的 akka.ask.timeout 参数相同的值,默认值为 10 秒。

 

给定一个运行的 ZooKeeper 集群和正确配置的 YARN 和 Flink 设置,你可以通过使用./bin/flink  run -m yarn-cluster 和./bin/yarn-session.sh.在作业模式或会话模式下启动 Flink 集群,就像没有启用 HA 一样。

 

请注意,必须为连接到同一 ZooKeeper 集群的所有 Flink 会话集群配置不同的群集 ID。在作业模式下启动 Flink 集群时,集群 ID 会自动设置为已启动应用程序的 ID,因此是唯一的。

 

Kubernetes 的 HA 设置

当在 Kubernetes 上运行 Flink 时,有一个 master 部署和一个 worker 部署,如“Kubernetes”中所述,Kubernetes 将自动重启失败的容器,以确保正确数量的 pod 启动并运行。 这足以从 ResourceManager 处理的 worker 故障中恢复。 但是,从 master 故障中恢复需要额外的配置,如前所述。

为了启用 Flink 的 HA 模式,你需要调整 Flink 的配置并提供诸如 ZooKeeper quorum

节点的主机名、持久存储的路径以及 Flink 的集群 ID 等信息。 所有这些参数都需要添加到 Flink 的配置文件(./conf/flink-conf.yaml)中。

                    自定义 Flink 容器配置

不幸的是,我们之前在 Docker 和 Kubernetes 示例中使用的 Flink Docker 镜像不支持设置自定义配置参数。 因此,该镜像不能用于在 Kubernetes 上设置 HA Flink 集群。 相反,你需要构建一个自定义镜像,该镜像要么对所需参数进行“硬编码”,要么足够灵活以通过参数或环境变量动态调整配置。 标准的 Flink Docker 镜像是定制你自己的 Flink 镜像的一个很好的起点。

 

集成 Hadoop 组件

Apache Flink 可以轻松地与 Hadoop YARN 和 HDFS 以及 Hadoop 生态系统的其他组件(例如 HBase)集成。 在所有这些情况下,Flink 都需要 Hadoop 对其类路径的依赖。

 

有三种方法为 Flink 提供 Hadoop 依赖:

1. 使用为特定 Hadoop 版本构建的 Flink 二进制分发版。 Flink 为最常用的 vanilla Hadoop 版本提供构建。

2. 为特定的 Hadoop 版本构建 Flink。 如果 Flink 的二进制发行版都不适用于你环境中部署的 Hadoop 版本,自己构建将非常有用; 例如,如果你运行修补过的 Hadoop 版本或经销商的 Hadoop 版本,例如 Cloudera、Hortonworks 或 MapR。

为了给特定的 Hadoop 版本构建 Flink,你需要 Flink 的源代码,可以通过从网站下载源发行版或从项目的 Git 存储库中克隆一个稳定的发布分支,至少需要 Java8 JDK,获得 Flink 的源代码, 和 Apache Maven 3.2。 进入 Flink 源代码文件夹并运行以下命令之一:

// build Flink for a specific official Hadoop version

mvn clean install -DskipTests -Dhadoop.version=2.6.1

// build Flink for a Hadoop version of a distributor

mvn clean install -DskipTests -Pvendor-repos \

-Dhadoop.version=2.6.1-cdh5.0.0

编译完成后会存放在./build-target 文件夹中。

  

3. 使用 Flink 的 Hadoop-free 发行版并手动配置 Hadoop 依赖项的类路径。 如果提供的构建都不适用于你的设置,则此方法很有用。 Hadoop 依赖项的类路径必须在 HADOOP_CLASSPATH 环境变量中声明。 如果未配置该变量,如果可以访问 hadoop 命令,你可以使用以下命令自动设置它:export HADOOP_CLASSPATH=`hadoop classpath`。hadoop 命令的 classpath 选项打印其配置的类路径

 

除了配置 Hadoop 依赖项之外,你还需要提供 Hadoop 配置目录的位置。这应该通过导出 HADOOP_CONF_DIR(首选)或 HADOOP_CONF_PATH 环境变量来完成。 一旦 Flink 知道 Hadoop 的配置,它就可以连接到 YARN 的 ResourceManager 和 HDFS。

文件系统配置

Apache Flink 使用文件系统来完成各种任务。 应用程序可以从文件中读取输入并将结果写入文件(参见“文件系统源连接器”),应用程序检查点和元数据保存在远程文件系统中以进行恢复(参见“检查点、保存点和状态恢复”),并且一些内部组件利用文件系统将数据分发给任务,例如应用程序 JAR 文件。

Flink 支持多种文件系统。 由于 Flink 是分布式系统并在集群或云环境中运行进程,因此文件系统通常需要可全局访问。 因此,Hadoop HDFS、S3 和 NFS 是常用的文件系统。

与其他数据处理系统类似,Flink 查看路径的 URI 模式来识别路径所引用的文件系统。例如,file:///home/user/data.txt 指向本地文件系统中的一个文件,而 hdfs:///namenode:50010/home/user/data.txt 指向指定 hdfs 集群中的一个文件。

文件系统在 Flink 中由 org.apache.flink.core.fs.FileSystem 类的实现表示。 FileSystem 类实现文件系统操作,例如读取和写入文件、创建目录或文件以及列出目录的内容。 Flink 进程(JobManager 或 TaskManager)为每个配置的文件系统实例化一个 FileSystem 对象,并在所有本地任务之间共享它,以确保强制执行配置的约束,例如对打开连接数量的限制。

Flink 为最常用的文件系统提供了如下实现:

本地文件系统

Flink 内置了对本地文件系统的支持,包括本地挂载的网络文件系统,例如 NFS 或 SAN,并且不需要额外的配置。 本地文件系统由 file:// URI 方案引用。

Hadoop 的 HDFS

Flink 的 HDFS 连接器始终在 Flink 的类路径中。 但是,它需要 Hadoop 依赖类路径才能工作。 “与 Hadoop 组件的集成”解释了如何确保加载 Hadoop 依赖项。 HDFS 路径以 hdfs:// 方案为前缀。

Amazon S3

Flink 提供了两种替代的文件系统连接器来连接到 S3,它们基于 Apache Hadoop 和 Presto。 两个连接器都是完全独立的,不需要任何依赖项。 要安装这两个连接器中的任何一个,请将相应的 JAR 文件从 ./opt 文件夹移动到 ./lib 文件夹中。 Flink 文档提供了有关 S3 文件系统配置的更多详细信息。 S3 路径使用 s3:// scehme 指定

OpenStack Swift FS

Flink 提供了一个连接到 Swift FS 的连接器,它基于 Apache Hadoop。 连接器是完全独立的,不公开任何依赖项。 它是通过将 swift-connector JAR 文件从 ./opt 移动到 ./lib 文件夹来安装的。 Swift FS 路径由 swift:// scheme 标识。

对于 Flink 没有提供专用连接器的文件系统,如果配置正确,Flink 可以委托给 Hadoop 文件系统连接器。 这就是 Flink 能够支持所有 HCFS 的原因。

Flink 在 ./conf/flink-conf.yaml 中提供了一些配置选项来指定默认文件系统并限制文件系统连接的数量。 你可以指定默认文件系统 scheme (fs.default-scheme),如果路径不提供模式,该 scheme 会自动添加为前缀。例如,如果你指定 fs.default-scheme: hdfs://nnode1:9000,则路径 /result 将扩展到 hdfs://nnode1:9000/result。

你可以限制从(输入)和写入(输出)文件系统的连接数。可以根据 URI 模式定义配置。相关的配置 key 为:

fs.<scheme>.limit.total: (number, 0/-1 mean no limit)

fs.<scheme>.limit.input: (number, 0/-1 mean no limit)

fs.<scheme>.limit.output: (number, 0/-1 mean no limit)

fs.<scheme>.limit.timeout: (milliseconds, 0 means infinite)

fs.<scheme>.limit.stream-timeout: (milliseconds, 0 means infinite)

每个 TaskManager 进程和路径权限跟踪连接数——分别跟踪 hdfs://nnode1:50010 和 hdfs://nnode2:50010。 连接限制可以单独配置为输入和输出连接,也可以配置为连接总数。 当文件系统达到其连接限制并尝试打开新连接时,它将阻塞并等待另一个连接关闭。 超时参数定义了等待连接请求失败的时间(fs.<scheme>.limit.timeout)和等待空闲连接关闭的时间(fs.<scheme>.limit.stream-timeout)。

你还可以提供自定义的文件系统连接器。更多可以参考 Flink 文档,以了解如何实现和注册自定义文件系统。

 

系统配置

Apache Flink 提供了许多参数来配置其行为和调整其性能。 所有参数都可以在 ./conf/flink-conf.yaml 文件中定义,该文件组织为键值对的扁平化 YAML 文件。 配置文件由不同的组件读取,例如启动脚本、master 和 worker JVM 进程以及 CLI 客户端。 例如./bin/start-cluster.sh 等启动脚本解析配置文件提取 JVM 参数和堆大小设置,CLI 客户端(./bin/flink)提取连接信息访问 master 进程。在重新启动 Flink 之前,配置文件中的更改是不会生效的。

为了改善开箱即用的体验,Flink 预先配置为本地设置。 你需要调整配置才能在分布式环境中成功运行 Flink。 在本节中,我们将讨论在设置 Flink 集群时通常需要配置的不同方面。 我们建议你参阅官方文档以获取所有参数的完整列表和详细说明。

 

Java 和类加载

默认情况下,Flink 使用 PATH 环境变量链接的 Java 可执行文件启动 JVM 进程。 如果 Java 不在 PATH 中,或者如果你想使用不同的 Java 版本,你可以通过 JAVA_HOME 环境变量或配置文件中的 env.java.home 配置项指定 Java 安装的根文件夹。 Flink 的 JVM 进程可以使用自定义 Java 选项启动——例如,微调垃圾收集器或启用远程调试,通过 env.java.opts, env.java.opts.jobmanager, and env.java.opts.taskmanager 这些参数进行配置。

 

运行具有外部依赖项的作业时,类加载问题并不少见。为了执行 Flink 应用程序,应用程序 JAR 文件中的所有类都必须由类加载器加载。 Flink 将每个作业的类注册到单独的用户代码类加载器中,以确保作业的依赖关系不会与 Flink 的运行时依赖关系或其他作业的依赖关系相互干涉。 当相应的作业终止时,用户代码类加载器将被处理掉。 Flink 的系统类加载器加载 ./lib 文件夹中的所有 JAR 文件,用户代码类加载器源自系统类加载器。

 

默认情况下,Flink 首先在子(用户代码)类加载器中查找用户代码类,然后在父(系统)类加载器中查找用户代码类,以防止在作业使用与 Flink 相同的依赖关系时发生版本冲突。 但是,你也可以使用 classloader.resolve-order 配置键反转查找顺序。

 

请注意,某些类始终首先在父类加载器(classloader.parent-first-patterns.default)中解析。 你可以通过提供首先从父类加载器 (classloader.parent-first-patterns.additional) 解析的类名模式白名单来扩展该列表。

CPU

Flink 不会主动限制其消耗的 CPU 资源量。 然而,它使用处理 slot(详见“任务执行”)来控制可以分配给 worker 进程(TaskManager)的任务数量。 TaskManager 提供一定数量的 slot,这些 slot 在 ResourceManager 处注册并受其管理。 JobManager 请求一个或多个 slot 来执行应用程序。 每个 slot 可以处理一个应用程序的一个切片,该应用程序的每个算子的一个并行任务。 因此,JobManager 需要获取至少与应用程序的最大算子并行度一样多的 slot。任务作为工作线程 (TaskManager) 进程内的线程执行,并根据需要占用尽可能多的 CPU 资源。

 

TaskManager 提供的 slots 的数量由配置文件中的 taskmanager.numberOfTaskSlots 控制。默认是每个 TaskManager 一个 slot。通常只需要为 standalone 模式配置 slots 的数量,因为在集群资源管理器(YARN、Kubernetes、Mesos)上运行 Flink 可以很容易地为每个计算节点启动多个任务管理器(每个都有一个 slot)。

 

内存和网络缓冲

Flink 的 master 和 worker 进程有不同的内存需求。 master 进程主要管理计算资源 (ResourceManager) 并协调应用程序的执行 (JobManager),而 worker 进程负责繁重的计算工作并处理潜在的大量数据。

通常,主进程对内存的要求适中。默认情况下,它以 1 GB JVM 堆内存启动。 如果 master 进程需要管理多个应用程序或具有多个算子的应用程序,你可能需要使用 jobmanager.heap.size 配置项增加 JVM 堆大小。

配置 worker 进程的内存有点复杂,因为有多个组件分配不同类型的内存。最重要的参数是 JVM 堆内存的大小,用 taskmanager.heap.size 设置。堆内存用于所有对象,包括 TaskManager 运行时、应用程序的运算符和函数以及动态数据。使用内存或文件系统状态后端的应用程序的状态也存储在 JVM 上。一个任务可能会消耗其正在运行的 JVM 的整个堆内存。 Flink 不保证或授予每个任务或 slot 的堆内存。每个 TaskManager 具有一个 slot 的配置具有更好的资源隔离,并且可以防止行为不端的应用程序干扰不相关的应用程序。如果运行具有许多依赖项的应用程序,JVM 的非堆内存也会显着增长,因为它存储所有 TaskManager 和用户代码类。

 

除了 JVM 之外,还有另外两个主要的内存消费者,Flink 的网络堆栈和 RocksDB,当它用作状态后端时。 Flink 的网络堆栈基于 Netty 库,它从本机(堆外)内存中分配网络缓冲区。 Flink 需要足够数量的网络缓冲区才能将记录从一个 worker 进程传送到另一个 worker 进程。 缓冲区的数量取决于算子任务之间的网络连接总数。 对于通过分区或广播连接的两个运营商,网络缓冲区的数量取决于发送和接收算子并行度的乘积。 对于具有多个分区步骤的应用程序,这种二次依赖可以快速累积为网络传输所需的大量内存。

 

Flink 的默认配置只适用于较小规模的分布式设置,针对更大规模集群。需要调整配置。如果缓冲区的数量配置不当,作业提交将失败,并出现 java.io.IOException: Insufficient number of network buffers。 在这种情况下,你应该为网络堆栈提供更多内存。

 

分配给网络缓冲区的内存量是使用 taskmanager.network.memory.fraction 配置的,它决定了分配给网络缓冲区的 JVM 大小的比例。 默认情况下,使用 10% 的 JVM 堆大小。 由于缓冲区被分配为堆外内存,因此 JVM 堆减少了该数量。 配置键 taskmanager.memory.segment-size 决定了网络缓冲区的大小,默认为 32 KB。 减少网络缓冲区的大小会增加缓冲区的数量,但会降低网络堆栈的效率。 你还可以指定用于网络缓冲区的最小 (taskmanager.network.memory.min) 和最大 (taskmanager.network.memory.max) 内存量(默认情况下分别为 64 MB 和 1 GB)以设置相对配置值的绝限制。

 

RocksDB 是另一个内存消费者,在配置 worker 进程的内存时需要考虑。 不幸的是,计算 RocksDB 的内存消耗并不简单,因为它取决于应用程序中键控状态的数量。 Flink 为 基于 key 的 operator 的每个任务创建一个单独的(嵌入式)RocksDB 实例。 在每个实例中,运算符的每个不同状态都存储在单独的列族(或表)中。 在默认配置下,每个列族需要大约 200 MB 到 240 MB 的堆外内存。 你可以调整 RocksDB 的配置并使用许多参数调整其性能。

 

在配置 TaskManager 的内存设置时,你应该调整 JVM 堆内存的大小,以便为 JVM 非堆内存(类和元数据)和 RocksDB(如果将其配置为状态后端)留出足够的内存。 网络内存会自动从配置的 JVM 堆大小中减去。 请记住,某些资源管理器(例如 YARN)会在容器超出其内存预算时立即终止该容器。

磁盘存储

Flink worker 进程出于多种原因将数据存储在本地文件系统上,包括接收应用程序 JAR 文件、写入日志文件以及在配置了 RocksDB 状态后端的情况下维护应用程序状态。 使用 io.tmp.dirs 配置键,你可以指定一个或多个用于在本地文件系统中存储数据的目录(以冒号分隔)。 默认情况下,数据写入默认临时目录,由 Java 系统属性 java.io.tmpdir 或 Linux 和 MacOS 上的 /tmp 确定。 io.tmp.dirs 参数作为 Flink 大部分组件本地存储路径的默认值。 但是,这些路径也可以单独配置。

 

                        确保临时目录没有被自动清除

某些 Linux 发行版会定期清理临时目录 /tmp。 如果你计划运行连续的 Flink 应用程序,请确保禁用此行为或配置不同的目录。 否则作业恢复可能会丢失存储在临时目录中的元数据并失败。

blob.storage.directory 键配置 blob 服务器的本地存储目录,用于交换较大的文件,例如应用程序 JAR 文件。 env.log.dir 键配置了 TaskManager 将其日志文件写入的目录(默认情况下,Flink 设置中的 ./log 目录)。 最后,RocksDB 状态后端在本地文件系统中维护应用程序状态。 该目录是使用 state.backend.rocksdb.localdir 键配置的。 如果存储目录没有明确配置,RocksDB 使用 io.tmp.dirs 参数的值。

 

检查点和状态后端

Flink 提供了一些选项来配置状态后端如何检查它们的状态。 所有参数都可以在应用程序代码中明确指定,如“调整检查点和恢复”中所述。 但是,你也可以通过 Flink 的配置文件为 Flink 集群提供默认设置,如果未声明作业特定选项,则应用这些设置。

 

影响应用程序性能的一个重要选择是维护其状态的状态后端。你可以使用 state.backend 键定义集群的默认状态后端。 此外,你可以启用异步检查点 (state.backend.async) 和增量检查点 (state.backend.incremental)。 某些后端不支持所有选项,可能会忽略它们。 你还可以配置检查点 (state.checkpoints.dir) 和保存点 (state.savepoints.dir) 写入的远程存储的根目录。

一些检查点选项是后端特定的。对于 RocksDB 状态后端,你可以定义一个或多个 RocksDB 存储其本地文件的路径(state.backend.rocksdb.localdir),以及计时器状态是存储在堆上(默认)还是 RocksDB 中(state.backend.rocksdb.timer-service.factory)。

最后,你可以默认为 Flink 集群启用和配置本地恢复。 要启用本地恢复,请将参数 state.backend.local- recovery 设置为 true。 也可以指定本地状态副本的存储位置(taskmanager.state.local.root-dirs)。

 

安全性

数据处理框架是公司 IT 基础设施的敏感组件,需要保护以防止未经授权的使用和访问数据。 Apache Flink 支持 Kerberos 身份验证,并且可以配置为使用 SSL 加密所有网络通信。

Flink 具有 Kerberos 与 Hadoop 及其组件(YARN、HDFS、HBase)、ZooKeeper 和 Kafka 的集成。 你可以分别为每个服务启用和配置 Kerberos 支持。 Flink 支持两种身份验证模式——keytabs 和 Hadoop 委托令牌。 Keytabs 是首选方法,因为令牌会在一段时间后过期,这可能会导致长时间运行的流处理应用程序出现问题。 请注意,凭据与 Flink 集群相关联,而不是与正在运行的作业相关联; 在同一集群上运行的所有应用程序都使用相同的身份验证令牌。 如果你需要使用不同的凭据,你应该启动一个新集群。 有关启用和配置 Kerberos 身份验证的详细说明,请参阅 Flink 文档。

 

Flink 支持通信伙伴的相互认证和使用 SSL 的网络通信加密,用于内部和外部通信。 对于内部通信(RPC 调用、数据传输和用于分发库或其他工件的 Blob 服务通信),所有 Flink 进程(Dispatcher、ResourceManager、JobManager 和 TaskManager)都执行相互身份验证——发送方和接收方通过 SSL 证书相互验证。 该证书充当共享机密,可以嵌入到容器中或附加到 YARN 设置。

 

所有与 Flink 服务的外部通信——提交和控制应用程序以及访问 REST 接口——都发生在 REST/HTTP 端点上。你也可以为这些连接启用 SSL 加密。 也可以启用相互身份验证。 但是,推荐的方法是设置和配置控制对 REST 端点的访问的专用代理服务。 原因是代理服务比 Flink 提供了更多的身份验证和配置选项。 尚不支持用于与可查询状态通信的加密和身份验证。

默认情况下,不启用 SSL 身份验证和加密。 由于设置需要几个步骤,例如生成证书、设置 TrustStores 和 KeyStores、配置密码套件,我们建议你参考 Flink 官方文档。 该文档还包括针对不同环境(例如独立集群、Kubernetes 和 YARN)的操作方法和技巧。

 

小结

在本章中,我们讨论了如何在不同环境中设置 Flink 以及如何配置 HA 设置。 我们解释了如何启用对各种文件系统的支持以及如何将它们与 Hadoop 及其组件集成。 最后,我们讨论了最重要的配置选项。 我们没有提供全面的配置指南; 相反,我们建议你参阅 Apache Flink 的官方文档,以获取所有配置选项的完整列表和详细说明。

 

发布于: 刚刚阅读数: 3
用户头像

还未添加个人签名 2018.05.14 加入

公众号【数据与智能】主理人,个人微信:liuq4360 12 年大数据与 AI相关项目经验, 10 年推荐系统研究及实践经验,目前已经输出了40万字的推荐系统系列精品文章,并有新书即将出版。

评论

发布
暂无评论
配置Flink流式应用(九)