写点什么

Flink 和流式应用运维(十 - 上)

发布于: 2 小时前
Flink 和流式应用运维(十-上)

写在前面:

大家好,我是强哥,一个热爱分享的技术狂。目前已有 12 年大数据与 AI 相关项目经验, 10 年推荐系统研究及实践经验。平时喜欢读书、暴走和写作。

业余时间专注于输出大数据、AI 等相关文章,目前已经输出了 40 万字的推荐系统系列精品文章,强哥的畅销书「构建企业级推荐系统:算法、工程实现与案例分析」已经出版,需要提升可以私信我呀。如果这些文章能够帮助你快速入门,实现职场升职加薪,我将不胜欢喜。

想要获得更多免费学习资料或内推信息,一定要看到文章最后喔。

内推信息

如果你正在看相关的招聘信息,请加我微信:liuq4360,我这里有很多内推资源等着你,欢迎投递简历。

免费学习资料

如果你想获得更多免费的学习资料,请关注同名公众号【数据与智能】,输入“资料”即可!

学习交流群

如果你想找到组织,和大家一起学习成长,交流经验,也可以加入我们的学习成长群。群里有老司机带你飞,另有小哥哥、小姐姐等你来勾搭!加小姐姐微信:epsila,她会带你入群。


流应用程序通常是需要长时间不间断运行,而且它们的工作负载通常是不可预测的。流式作业连续运行数月的情况并不少见,因此其操作需求与短期批处理作业的需求大不相同。假设一个场景,你在部署的应用程序中检测到错误。如果你的应用程序是批处理作业,你可以通过离线的方式轻松地修复错误,然后在当前作业实例完成后重新部署修复后的应用程序代码。但是,如果你的作业是长时间运行的流作业怎么办?你如何在保证正确性的同时轻松应用更新后的配置?

 

如果你使用的是 Flink,则无需担心上面的问题。 Flink 将会完成所有繁重的工作,因此你可以轻松地监控、操作和重新配置你的作业,同时保留恰好一次的状态语义。 在本章中,我们将介绍 Flink 提供的用于操作和维护持续运行的流应用程序的工具。 我们将向你展示如何收集指标和监控你的应用程序,以及如何在你想要更新应用程序代码或调整应用程序资源时保持结果一致性。

 

运行和管理流式应用

正如你所料,维护流应用程序比维护批处理应用程序更具挑战性。虽然流应用程序是有状态的并且持续运行,但批处理应用程序会定期执行。可以在两次执行之间重新配置、扩展或更新批处理应用程序,这比升级一个不断摄取、处理和发出数据的应用程序要容易得多。

 

然而,Apache Flink 有许多特性可以显著简化流式应用程序的维护。 大多数这些功能都基于保存点。 Flink 公开了以下接口来监控和控制其 master 进程和工作进程以及应用程序:

 

1. 命令行客户端是用于提交和控制应用程序的工具。

2. REST API 是命令行客户端和 Web UI 使用的底层接口。 它可由用户和脚本访问,并提供对所有系统和应用程序指标以及端点的访问,以提交和管理应用程序。

3. Web UI 是一个 Web 界面,提供有关 Flink 集群和正在运行的应用程序的详细信息和指标。 它还提供提交和管理应用程序的基本功能。 Web UI 在“Flink Web UI”中有描述。

 

在本节中,我们将解释保存点的实用应用,并讨论如何使用 Flink 的命令行客户端和 Flink 的 REST API 启动、停止、暂停和恢复、扩展和升级有状态的流应用程序。

保存点

保存点(savepoint)与检查点(checkpoint)基本相同——它是应用程序状态的一致且完整的快照。 但是,检查点和保存点的生命周期不同。 检查点由 Flink 自动创建、加载和自动删除(取决于应用程序的配置)。 此外,取消应用程序时会自动删除检查点,除非应用程序明确启用检查点保留。 相比之下,保存点必须由用户或外部服务手动触发,并且永远不会被 Flink 自动删除。

 

保存点是持久数据存储中的一个目录。它由一个子目录组成,该子目录包含所有任务状态的数据文件和一个包含所有数据文件绝对路径的二进制元数据文件。由于元数据文件中的路径是绝对路径,因此将保存点移动到不同的路径将使其无法使用。以下是一个保存点的结构:

# Savepoint root path

/savepoints/

# Path of a particular savepoint

/savepoints/savepoint-:shortjobid-:savepointid/

# Binary metadata file of a savepoint

/savepoints/savepoint-:shortjobid-:savepointid/_metadata

# Checkpointed operator states

/savepoints/savepoint-:shortjobid-:savepointid/:xxx

通过命令行客户端管理应用

Flink 的命令行客户端提供启动、停止和管理 Flink 应用程序的功能。它从./conf/flink-conf.yaml 文件中读取配置(参见“系统配置”章节)。可以从 Flink 安装目录中调用./bin/flink 命令。

在没有额外参数的情况下运行时,客户端会打印一条帮助消息。

                       WINDOWS 上的命令行客户端

命令行客户端基于 bash 脚本。 因此,它不适用于 Windows 命令行。Windows 命令行的./bin/flink.bat 脚本仅提供非常有限的功能。 如果你是 Windows 用户,我们建议使用常规命令行客户端并在 WSL 或 Cygwin 上运行它。

启动一个应用程序

你可以在命令行客户端使用 run 命令启动一个应用程序:

./bin/flink run ~/myApp.jar

上面的命令从 JAR 文件的 META-INF/MANIFEST.MF 文件的 program-class 属性中引用的类的 main() 方法启动应用程序,而不向应用程序传递任何参数。 客户端将 JAR 文件提交给 master 进程,master 进程将其分发给 worker 节点。

 

你可以通过将参数附加到命令的末尾来将参数传递给应用程序的 main() 方法:

./bin/flink run ~/myApp.jar my-arg1 my-arg2 my-arg3

默认情况下,客户端在提交应用程序后不返回,而是等待应用程序终止。你可以使用-d 标志以独立(detached)模式提交申请,如下图所示:

./bin/flink run -d ~/myApp.jar

客户端不会等待应用程序终止,而是返回并打印已提交作业的 JobID。JobID 用于在获取保存点、取消或重新扩展应用程序时指定作业。 你可以使用-p 标志指定应用程序的默认并行度:

./bin/flink run -p 16 ~/myApp.jar

上述命令将执行环境的默认并行度设置为 16。 执行环境的默认并行度被应用程序源代码明确指定的所有设置覆盖——通过在 StreamExecutionEnvironment 上调用 setParallelism() 方法或在算子上指定并行度,这两者都优先于默认值。

如果你的应用程序 JAR 文件的清单文件没有指定一个入口类,你可以使用-c 参数指定这个类:

./bin/flink run -c my.app.MainClass ~/myApp.jar

此时客户端会启动 my.app.MainClass 类的 main 方法。

默认情况下,客户端向./conf/flink-conf.yaml 文件指定的 Flink master 提交应用程序(不同设置见“系统配置”中的配置)。 你可以使用 -m 标志将应用程序提交给特定的 master 进程:

./bin/flink run -m myMasterHost:9876 ~/myApp.jar

此命令将应用程序提交给运行在主机 myMasterHost,端口 9876 上的 master。

请注意,如果你第一次启动应用程序或不提供保存点或检查点来初始化状态,则应用程序的状态将为空。在这种情况下,一些有状态的算子运行特殊的逻辑来初始化它们的状态。例如,如果没有可用的恢复读取位置,Kafka 源需要选择从其消费主题的分区偏移量。

 

列出运行中的应用程序

对于你想要应用于正在运行的作业的所有操作,你需要提供标识应用程序的 JobID。 可以从 Web UI、REST API 或使用命令行客户端获取作业的 ID。 当你运行以下命令时,客户端会打印所有正在运行的作业的列表,包括它们的 JobID:

./bin/flink list -r

Waiting for response...

------------------ Running/Restarting Jobs ------------------

- 17.10.2018 21:13:14 :

bc0b2ad61ecd4a615d92ce25390f61ad :

Socket Window WordCount (RUNNING)

-------------------------------------------------------------

在上面这个例子中,JobID 是 bc0b2ad61ecd4a615d92ce25390f61ad。


获取和处理保存点

使用命令行客户端,可以为一个运行的应用程序,设置一个保存点,命令如下:

./bin/flink savepoint <jobId> [savepointPath]

该命令使用提供的 JobID 触发作业的保存点。 如果你明确指定保存点路径,则它存储在提供的目录中。 否则,将使用 flink-conf.yaml 文件中配置的默认保存点目录。

为了触发作业 bc0b2ad61ecd4a615d92ce25390f61ad 的保存点并将其存储在目录 hdfs:///xxx:50070/savepoints 中,我们调用命令行客户端:

./bin/flink savepoint bc0b2ad61ecd4a615d92ce25390f61ad hdfs:///xxx:50070/savepoints

Triggering savepoint for job

bc0b2ad61ecd4a615d92ce25390f61ad.

Waiting for response...

Savepoint completed.

Path: hdfs:///xxx:50070/savepoints/savepoint-bc0b2a-

63cf5d5ccef8

You can resume your program from this savepoint with the run command.

保存点会占用大量磁盘空间,savepoint 不会被 Flink 自动删除。需要手动删除它们以释放占用的存储空间。使用以下命令删除保存点:

./bin/flink savepoint -d <savepointPath>

例如,删除我们前面创建的保存点,调用命令:

./bin/flink savepoint -d hdfs:///xxx:50070/savepoints/savepoint-bc0b2a-63cf5d5ccef8

Disposing savepoint 'hdfs:///xxx:50070/savepoints/savepointbc0b2a-

63cf5d5ccef8'.

Waiting for response...

Savepoint 'hdfs:///xxx:50070/savepoints/savepoint-bc0b2a-

63cf5d5ccef8' disposed.

删除保存点

在另一个检查点或保存点完成之前,你不得删除保存点。由于系统处理保存点与常规检查点类似,因此算子还会收到已完成保存点的检查点完成通知并对其采取行动。例如,当保存点完成时,事务接收器将更改提交给外部系统。为了保证恰好一次输出,Flink 必须从最近完成的检查点或保存点恢复。 如果 Flink 尝试从已删除的保存点恢复,则故障恢复将失败。 一旦另一个检查点(或保存点)完成,你就可以安全地删除保存点。

取消运行应用程序

可以通过两种方式取消应用程序:有或没有保存点都适用。要取消正在运行的应用程序而不使用保存点,请运行以下命令:

./bin/flink cancel <jobId>

为了在取消正在运行的应用程序之前获取保存点,将-s 标志添加到取消命令:

./bin/flink cancel -s [savepointPath] <jobId>

如果不指定保存点路径,则使用./conf/flink-conf.yaml 文件中配置的默认保存点目录(请参阅“系统配置”)。 如果保存点文件夹既未在命令中明确指定,也未从配置中可用,则该命令将失败。 要取消 JobID 为 bc0b2ad61ecd4a615d92ce25390f61ad 的应用程序并将保存点存储在 hdfs:///xxx:50070/savepoints,请运行以下命令:

./bin/flink cancel -s hdfs:///xxx:50070/savepoints d5fdaff43022954f5f02fcd8f25ef855

Cancelling job bc0b2ad61ecd4a615d92ce25390f61ad

with savepoint to hdfs:///xxx:50070/savepoints.

Cancelled job bc0b2ad61ecd4a615d92ce25390f61ad.

Savepoint stored in hdfs:///xxx:50070/savepoints/savepointbc0b2a-d08de07fbb10.

 取消应用程序可能会失败

请注意,如果获取保存点失败,作业将继续运行。你将需要再次尝试取消作业。

从保存点启动应用程序

从保存点启动应用程序相当简单。你所要做的就是使用 run 命令启动一个应用程序,并另外使用 -s 选项提供一个保存点的路径:

./bin/flink run -s <savepointPath> [options] <jobJar> [arguments]

当作业启动时,Flink 将保存点的各个状态快照与启动的应用程序的所有状态进行匹配。 这种匹配分两步完成。 首先,Flink 比较保存点和应用程序算子的唯一标识符。 其次,它为每个算子操作匹配保存点和应用程序的状态标识符(详见“保存点”)。

应该定义唯一的算子 ids

当作业启动时,Flink 将保存点的各个状态快照与启动的应用程序的所有状态进行匹配。 这种匹配分两步完成。 首先,Flink 比较保存点和应用程序算子的唯一算子标识符。 其次,它为每个算子匹配保存点和应用程序的状态标识符(详见“保存点”)。

如前所述,只有与保存点兼容的应用程序才能从保存点启动。未修改的应用程序始终可以从其保存点重新启动。但是,如果重新启动的应用程序与从中获取保存点的应用程序不同,则需要考虑三种情况:

 

 如果你在应用中添加了一个新的 state 或者改变了 stateful operator 的唯一标识符,Flink 将不会在保存点中找到对应的 state 快照。 在这种情况下,新状态被初始化为空。

 如果你从应用程序中删除了一个状态或更改了有状态算子的唯一标识符,则保存点中存在无法与应用程序匹配的状态。在这种情况下,Flink 不会启动应用程序以避免丢失保存点中的状态。 你可以通过向 run 命令添加 -n 选项来禁用此安全检查。

 如果你更改了应用程序中的状态——更改了状态原语或修改了状态的数据类型——应用程序将无法启动。 这意味着你不能轻易地在你的应用程序中演化状态的数据类型,除非你从一开始设计你的应用程序就考虑到状态演化。 Flink 社区目前正在努力改进对状态演化的支持。 (参见“修改算子的状态”。)


扩展和缩减应用程序

减少或增加应用程序的并行度并不困难。你需要获取一个保存点,取消应用程序,然后从保存点以调整后的并行度重新启动它。应用程序的状态会自动重新分配给更多或更少数量的并行算子任务。有关如何缩放不同类型的算子状态和键状态的详细信息,请参阅“缩放有状态算子”。 但是,有一些事情需要考虑。

 

如果你需要精确一次的结果,你应该使用集成的 savepoint-and-cancel 命令获取保存点并停止应用程序。这将阻止另一个检查点在保存点之后完成,保存点将触发精确一次的 sinks 输出保存点之后的数据。

 

如“设置并行性”中所述,应用程序及其算子的并行性可以用不同的方式指定。 默认情况下,算子以其关联的 StreamExecutionEnvironment 的默认并行度运行。 可以在启动应用程序时指定默认并行度(例如,在 CLI 客户端中使用 -p 参数)。 如果你实现的应用程序使得其算子的并行度取决于默认环境并行度,则可以通过从相同的 JAR 文件启动应用程序并指定新的并行度来简单地扩展应用程序。 但是,如果你对 StreamExecutionEnvironment 或某些算子的并行性进行了硬编码,则可能需要调整源代码并在提交执行之前重新编译和重新打包你的应用程序。

 

如果你的应用程序的并行度取决于环境的默认并行度,Flink 提供了一个原子 rescale 命令,该命令获取一个保存点,取消应用程序,并使用新的默认并行度重新启动它:

./bin/flink modify <jobId> -p <newParallelism>

若要使 jobId 为 bc0b2ad61ecd4a615d92ce25390f61ad 的应用程序,并行度扩展为 16,请运行以下命令:

 

./bin/flink modify bc0b2ad61ecd4a615d92ce25390f61ad -p 16

Modify job bc0b2ad61ecd4a615d92ce25390f61ad.

Rescaled job bc0b2ad61ecd4a615d92ce25390f61ad. Its new parallelism is 16.

 

正如“Scaling Stateful Operators”中所述,Flink 在所谓的 key group 的粒度上分配 key 状态。 因此,有状态算子的 key group 数量决定了它的最大并行度。 使用 setMaxParallelism() 方法为每个算子配置 key group 的数量。(参见“定义键状态算子的最大并行度”。)

 

通过 REST API 管理应用

REST API 可以由用户或脚本直接访问,并公开有关 Flink 集群及其应用程序的信息,包括指标以及用于提交和控制应用程序的端点。 Flink 从同一个 Web 服务器提供 REST API 和 Web UI,它作为 Dispatcher 进程的一部分运行。 默认情况下,两者都在端口 8081 上公开。你可以在 ./conf/flink-conf.yaml 文件中使用 rest.port 配置不同的端口。 值为 -1 将禁用 REST API 和 Web UI。

与 REST API 交互的一个常用命令行工具是 curl。典型的 curl REST 命令如下:

curl -X <HTTP-Method> [-d <parameters>] http://hostname:port/v1/<REST-point>

v1 表示 REST API 的版本。Flink 1.7 公开了 API 的第一个版本(v1)。假设你正在运行一个本地 Flink,它在端口 8081 上公开了它的 REST API,下面的 curl 命令向/overview REST 提交一个 GET 请求:

curl -X GET http://localhost:8081/v1/overview


该命令返回关于集群的一些基本信息,如 Flink 的版本、正在运行、完成、取消或失败的 TaskManagers、slots 和 jobs 的数量:

{

   "taskmanagers":2,

   "slots-total":8,

   "slots-available":6,

   "jobs-running":1,

   "jobs-finished":2,

   "jobs-cancelled":1,

   "jobs-failed":0,

   "flink-version":"1.7.1",

   "flink-commit":"89eafb4"

}

在下文中,我们列出并简要描述了最重要的 REST 调用。 有关支持调用的完整列表,请参阅 Apache Flink 的官方文档。 “使用命令行客户端管理应用程序”提供了有关某些操作的更多详细信息,例如升级或扩展应用程序。

管理和监控 FLINK 集群

REST API 公开端点来查询关于正在运行的集群的信息并将其关闭。表 10-1、10-2 和 10-3 显示了获取关于 Flink 集群的信息的 REST 请求,比如 slots 的数量、运行和完成的作业、JobManager 的配置或所有可连接的 TaskManagers 列表。

 

表 10-1 获取基本的集群信息的 REST 请求

表 10-2 获取 JobManager 配置的 REST 请求

表 10-3 列出所有可连接的任务管理器的 REST 请求

表 10-4 显示的 REST 请求,列出了为 JobManager 收集的所有指标。

表 10-4 列出可用的 JobManager 指标的 REST 请求

为了检索一个或多个 JobManager 指标,将带有所有请求指标的 get 查询参数添加到请求中:

curl -X GET http://hostname:port/v1/jobmanager/metrics?get=metric1,metric2

表 10-5 显示了 REST 请求,以列出为 TaskManagers 收集的所有指标。

表 10-5  列出可用的 TaskManager 指标的 REST 请求

要检索 TaskManager 的一个或多个指标,请将 get 查询参数与所有请求的指标一起添加到请求中:

curl -X GET http://hostname:port/v1/taskmanagers/<tmId>/metrics?get=metric1

还可以使用表 10-6 所示的 REST 调用来关闭集群。

表 10-6  关闭集群的 REST 请求

管理和监控 FLINK 应用程序

REST API 还可以用于管理和监控 Flink 应用程序。要启动应用程序,首先需要将应用程序的 JAR 文件上传到集群。表 10-7、10-8 和 10-9 显示了管理这些 JAR 文件的 REST 请求。

表 10-7  上传 JAR 文件的 REST 请求

curl 命令上传一个 JAR 文件:

curl -X POST -H "Expect:" -F "jarfile=@path/to/flink-job.jar"  http://hostname:port/v1/jars/upload

表 10-8 列出所有上传的 JAR 文件的 REST 请求

表 10-9 删除 JAR 文件的 REST 请求

使用 REST 请求从上传的 JAR 文件启动应用程序,如表 10-10 所示。

表 10-10 启动应用程序的 REST 请求

curl 命令启动一个默认并行度为 4 的应用程序:

curl -d '{"parallelism":"4"}' -X POST http://localhost:8081/v1/jars/43e844ef-382f-45c3-aa2f-00549acd961e_App.jar/run

表 10-11、10-12 和 10-13 展示了如何使用 REST API 管理正在运行的应用程序。

表 10-11 列出所有应用程序的 REST 请求

表 10-12 显示应用程序细节的 REST 请求


REST API 还提供了关于应用程序的以下方面的更详细的信息:

 应用程序的 operator 计划

 应用程序的配置

 收集应用程序在各个详细级别的指标

 检查点指标

 背压指标

 导致应用程序失败的异常

有关如何访问这些信息的详细信息,请参阅官方文档。

表 10-13 取消应用程序的 REST 请求

你还可以通过表 10-14 中所示的 REST 请求对正在运行的应用程序生成保存点。

表 10-14  生成应用程序保存点的 REST 请求

curl 命令在不取消应用程序的情况下触发保存点:

curl -d '{"target-directory":"file:///savepoints", "canceljob":"false"}' -X POST

http://localhost:8081/v1/jobs/e99cdb41b422631c8ee2218caa6af1cc/savepoints

{"request-id":"ebde90836b8b9dc2da90e9e7655f4179"}

取消具有保存点的应用程序可能会失败

取消应用程序的请求只有在成功获取保存点时才会成功。如果保存点命令失败,应用程序将继续运行。

检查 ID 为 ebde90836b8b9dc2da90e9e7655f4179 的请求是否成功,并检索保存点路径:

curl -X GET

http://localhost:8081/v1/jobs/e99cdb41b422631c8ee2218caa6af1cc/savepoints/ebde90836b8b9dc2da90e9e7655f4179

{"status":{"id":"COMPLETED"}

"operation":{"location":"file:///savepoints/savepoint-e99cdb-34410597dec0"}}

 

要释放保存点,请使用表 10-15 中所示的 REST 请求

表 10-15 释放一个保存点的 REST 请求

要使用 curl 释放保存点,请运行:

curl -d '{"savepoint-path":"file:///savepoints/savepointe99cdb-34410597"}'

-X POST http://localhost:8081/v1/savepoint-disposal

{"request-id":"217a4ffe935ceac2c281bdded76729d6"}

 

表 10-16 显示了扩展应用程序的 REST 请求

表 10-16  扩展应用程序的 REST 请求

扩展一个应用程序的 curl 到一个新的默认并行度为 16,运行:

curl -X PATCH

http://localhost:8081/v1/jobs/129ced9aacf1618ebca0ba81a4b222c6/rescaling?parallelism=16

{"request-id":"39584c2f742c3594776653f27833e3eb"}

应用程序可能不会被扩展

如果触发的保存点失败,应用程序将继续以原始并行度运行。你可以使用请求 ID 检查重新调整请求的状态。

在容器中打包并部署应用

到目前为止,我们已经解释了如何在正在运行的 Flink 集群上启动应用程序。 这就是我们所说的部署应用程序的框架风格。 在“应用部署”中,我们简单介绍了一种替代方案——不需要运行的 Flink 集群来提交作业的库模式。

在库(library)模式下,应用程序被捆绑到一个 Docker 镜像中,该镜像还包含所需的 Flink 二进制文件。 镜像可以通过两种方式启动——作为 JobMaster 容器或 TaskManager 容器。 当镜像部署为 JobMaster 时,容器会启动一个 Flink master 进程,该进程会立即拿起捆绑的应用程序来启动它。 TaskManager 容器在 JobMaster 上注册自己并提供它的 slot。 一旦有足够的 slot 可用,JobMaster 容器就会部署应用程序以供执行。

运行 Flink 应用程序的库风格类似于在容器化环境中部署微服务。 当部署在容器编排框架上时,例如 Kubernetes 框架。

 

构建作业相关的 Flink Docker 镜像

Apache Flink 提供了一个脚本来构建特定于作业的 Flink Docker 镜像。 该脚本包含在源代码分发和 Flink 的 Git 存储库中。 它不是 Flink 二进制发行版的一部分。

你可以下载并提取 Flink 的源代码发行版,也可以克隆 Git 存储库。 从发行版的基本文件夹开始,脚本位于 ./flink-container/docker/build.sh。

构建脚本创建并注册一个新的 Docker 镜像,该镜像基于 Java Alpine 镜像,这是一个提供 Java 的最小基础镜像。 该脚本需要以下参数:

 Flink 存档的路径

 应用程序 JAR 文件的路径

 新镜像的名称

要使用 Flink 1.7.1 构建包含本书示例应用程序的镜像,请执行以下脚本:

cd ./flink-container/docker

./build.sh \

--from-archive <path-to-Flink-1.7.1-archive> \

--job-jar <path-to-example-apps-JAR-file> \

--image-name flink-book-apps

如果在构建脚本完成后运行 docker images 命令,你应该会看到一个名为 flink-book-apps 的新 Docker 镜像。

./flink-container/docker 目录还包含一个 docker-compose.yml 文件,用于使用 docker-compose 部署 Flink 应用程序。

如果运行以下命令,快速查看 Flink”中的示例应用程序将部署在一个主容器和三个工作容器上到 Docker:

看看 Flink 部署在 Docker 的一个 master 和三个 worker 容器上:

FLINK_DOCKER_IMAGE_NAME=flink-book-jobs \

FLINK_JOB=io.github.streamingwithflink.chapter1.AverageSensorReadings

\

DEFAULT_PARALLELISM=3 \

docker-compose up -d

你可以通过访问运行 http://localhost:8081 的 Web UI 来监控和控制应用程序。

在 Kubernetes 上运行作业相关的 Docker 镜像

如“Kubernetes”中所述,在 Kubernetes 上运行作业相关的 Docker 镜像与在 Kubernetes 上启动 Flink 集群非常相似。 原则上,你只需要调整描述你的部署的 YAML 文件以使用包含作业代码的镜像,并将其配置为在容器启动时自动启动作业。

Flink 为源代码分发中提供的或在项目的 Git 存储库中找到的 YAML 文件提供模板。 从基本目录开始,模板位于:

./flink-container/kubernetes

该目录包含两个模板文件:

 job-cluster-job.yaml.template 将主容器配置为 Kubernetes 作业

 task-manager-deployment.yaml.template 将工作容器配置为 Kubernetes deployment

两个模板文件都包含需要用实际值替换的占位符:

 ${FLINK_IMAGE_NAME}:作业相关的镜像的名称。

 

这个参数还决定了启动的工作容器的数量。如你所见,这些参数与我们在使用 docker-compose 部署特定于作业的镜像时使用的参数相同。 该目录还包含定义 Kubernetes 服务的 YAML 文件 job-cluster-service.yaml。 复制模板文件并配置所需值后,你可以像以前一样使用 kubectl 将应用程序部署到 Kubernetes:

kubectl create -f job-cluster-service.yaml

kubectl create -f job-cluster-job.yaml

kubectl create -f task-manager-deployment.yaml

 在 MINIKUBE 上运行特定于作业的图像

在 Minikube 集群上运行特定于作业的镜像需要比“Kubernetes”中讨论的步骤更多的步骤。 问题在于 Minikube 试图从公共 Docker 镜像注册表而不是你本地机器的 Docker 注册表中获取自定义镜像。

但是,你可以通过运行以下命令将 Docker 配置为将其镜像部署到 Minikube 自己的注册表:

eval $(minikube docker-env)

你之后在此 shell 中构建的所有图像都部署到 Minikube 的图像注册表。 Minikube 需要运行。

此外,你需要将 YAML 文件中的 ImagePullPolicy 设置为 Never 以确保 Minikube 从其自己的注册表中获取图像。                      

一旦特定作业的容器开始运行,你可以将集群视为常规 Flink 集群,如“Kubernetes”中所述。

 

发布于: 2 小时前阅读数: 7
用户头像

还未添加个人签名 2018.05.14 加入

公众号【数据与智能】主理人,个人微信:liuq4360 12 年大数据与 AI相关项目经验, 10 年推荐系统研究及实践经验,目前已经输出了40万字的推荐系统系列精品文章,并有新书即将出版。

评论

发布
暂无评论
Flink 和流式应用运维(十-上)