写点什么

K8s Application 模式下的 flink 任务执行精要

  • 2025-09-18
    北京
  • 本文字数:4069 字

    阅读完需:约 13 分钟

本文分享自天翼云开发者社区《K8s Application模式下的flink任务执行精要》,作者:l****n

构键 k8s 集群

  1. 在这里,我们需要搭建一个 K8S 环境用于提供 flink 任务的运行时环境。在这里推荐使用 kubeadm 或者一些脚本工具搭建,可参考本自动k8s脚本工具。具体过程在这里省略,可以参考上述链接中的文档进行操作。

  2. 需要注意的是,我们需要在相应用户的目录下提供一个 kubeconfig 文件,如下图所示,通过该文件,StreamPark 才能顺利地调用 K8S 客户端提交任务,该 config 的内容为与 K8S 的 ApiServer 进行连接时需要使用的信息。


提供 flink 运行任务的环境

  1. 将 kubeconfig 提供出来,供 flink 客户端调用

  2. 在这里主要提供一个供 flink 使用命名空间、和 sa

# 创建namespacekubectl create ns flink-dev# 创建serviceaccountkubectl create serviceaccount flink-service-account -n flink-dev# 用户授权kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink-dev:flink-service-account
复制代码

下载 flink 客户端

flink 客户端是控制 flink 的核心,需要下载并部署

wget https://archive.apache.org/dist/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgztar -xf flink-1.14.3-bin-scala_2.12.tgz
复制代码

任务编程

任务 jar 生成过程

在这里,主要提供一个 flink 任务案例供 flink k8s application 进行调用

  1. 开发 java 代码,供使用,本示例项目较为简单,仅为将数据输出至 mysql 中,调用 mysql-connector 进行实现

package cn.ctyun.demo;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.connector.jdbc.JdbcConnectionOptions;import org.apache.flink.connector.jdbc.JdbcSink;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SinkToMySQL {   public static void main(String[] args) throws Exception {//       从启动参数中获取连接信息       ParameterTool parameterTool = ParameterTool.fromArgs(new String[]{"url", "passwd", "user"});       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();       env.setParallelism(1);       DataStreamSource<Event> stream = env.fromElements(               new Event("Mary", "./home", 1000L),               new Event("Bob", "./cart", 2000L),               new Event("Alice", "./prod?id=100", 3000L),               new Event("Alice", "./prod?id=200", 3500L),               new Event("Bob", "./prod?id=2", 2500L),               new Event("Alice", "./prod?id=300", 3600L),               new Event("Bob", "./home", 3000L),               new Event("Bob", "./prod?id=1", 2300L),               new Event("Bob", "./prod?id=3", 3300L));       stream.addSink(               JdbcSink.sink(                       "INSERT INTO clicks (user, url) VALUES (?, ?)",                      (statement, r) -> {                           statement.setString(1, r.user);                           statement.setString(2, r.url);                      },                       new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()                              .withUrl(parameterTool.get("url"))                              .withDriverName("com.mysql.jdbc.Driver")                              .withUsername(parameterTool.get("user"))                              .withPassword(parameterTool.get("passwd"))                              .build()              )      );       env.execute();  }}
复制代码
  1. 项目打包

防止一些依赖缺失,这里使用 fatjar 的方式进行打包,注意,这里使用了 jar-with-dependencies 方法进行打包,即将依赖全部打入到相应的 jar 包中,这样可以防止平台上的 flink 因为以来缺失问题导致无法使用 flink 程序。maven 相关的设置如下所示:

<plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin>
复制代码

之后通过命令mvn package进行打包,注意将打包后带有 with-dependencies.jar 后缀的 jar 包留下。以供使用

  1. 制作镜像,在这里通过官方镜像作为基础镜像进行构建,

使用 docker 进行镜像生成,使用命令为 docker build -t /flink-demo-jar-job:1.0-SNAPSHOT .

FROM apache/flink:1.14.3-scala_2.12RUN mkdir -p $FLINK_HOME/usrlibCOPY lib $FLINK_HOME/lib/COPY flink-demo-jar-job-1.0-SNAPSHOT-jar-with-dependencies.jar $FLINK_HOME/usrlib/flink-demo-jar-job-1.0-SNAPSHOT-jar-with-dependencies.jar
复制代码
  1. 推送镜像

这里推送镜像一般会推送到默认的 dockerhub 相应的仓库。如果需要 push 到自己的镜像仓,则需要修改相应的镜像前缀 ${docker_repository}为自己的镜像仓位置

docker push ${docker_repository}/flink-demo-jar-job:1.0-SNAPSHOT

docker push ${docker_repository}/flink-demo-jar-job:1.0-SNAPSHOT
复制代码


k8s Application 运行

Application 模式架构

在 k8s application 模式下,用户只需要通过 Flink Client/CLI 启动作业。首先通过 K8s 启动 JobManager(deployment)的同时启动作业,然后通过 JobManager 内部的 K8sResourceManager 模块向 K8s 直接申请 TaskManager 的资源并启动,最后当 TM 注册到 JM 后作业就提交到 TM。用户在整个过程无需指定 TaskManager 资源的数量,而是由 JobManager 向 K8s 按需申请的。

启动命令

这里我们可以指定一定的运行参数,相关的参数设定方案请参考官方文档https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/deployment/config/#kubernetes

./bin/flink run-application \--target kubernetes-application \-Dkubernetes.cluster-id=flink-cluster \# 指定容器启动的镜像(与之前提交的保持一致)-Dkubernetes.container.image=****/flink-demo-jar-job:1.0-SNAPSHOT \-Dkubernetes.jobmanager.replicas=1 \# 指定容器运行的命名空间-Dkubernetes.namespace=flink-dev \-Dkubernetes.jobmanager.service-account=flink-service-account \-Dkubernetes.taskmanager.cpu=1 \-Dtaskmanager.memory.process.size=4096mb \-Dkubernetes.jobmanager.cpu=1 \-Djobmanager.memory.process.size=4096mb \-Dkubernetes.rest-service.exposed.type=NodePort \-Dclassloader.resolve-order=parent-first \# yaml 模板,为解决hosts映射,后续可以通过编排此yaml文件,实现动态替换启动jar包、配置文件和持久化一些数据# -Dkubernetes.pod-template-file=/opt/flink-1.14.2/flink-templeta.yaml \# Main方法-c cn.ctyun.demo.SinkToMySQL \# 启动Jar包和启动配置文件的绝对路径(容器内部,不是宿主机)local:///usr/local/flink/lib/flink-realtime-1.0-SNAPSHOT.jar \# 如下将提供mysql的连接信息,通过参数的方式传递给jar包--passwd ****** \--user ******\--url ******
复制代码

PodTemplate

PodTemplate 主要是通过指定 pod 的启动样例,在 podtemplate 中可以指定域名、挂载路径、配置文件、初始化容器等信息,如下给出一个提供一个持久化保存日志的 PodTemplate。

apiVersion: v1kind: Podmetadata:name: jobmanager-pod-templatespec:initContainers:  - name: artifacts-fetcher    image: artifacts-fetcher:latest     # Use wget or other tools to get user jars from remote storage    command: [ 'wget', 'https://path/of/StateMachineExample.jar', '-O', '/flink-artifact/myjob.jar' ]    volumeMounts:      - mountPath: /flink-artifact        name: flink-artifactcontainers:   # Do not change the main container name  - name: flink-main-container    resources:      requests:        ephemeral-storage: 2048Mi      limits:        ephemeral-storage: 2048Mi    volumeMounts:      - mountPath: /opt/flink/volumes/hostpath        name: flink-volume-hostpath      - mountPath: /opt/flink/artifacts        name: flink-artifact      - mountPath: /opt/flink/log        name: flink-logs     # Use sidecar container to push logs to remote storage or do some other debugging things  - name: sidecar-log-collector    image: sidecar-log-collector:latest    command: [ 'command-to-upload', '/remote/path/of/flink-logs/' ]    volumeMounts:      - mountPath: /flink-logs        name: flink-logsvolumes:  - name: flink-volume-hostpath    hostPath:      path: /tmp      type: Directory  - name: flink-artifact    emptyDir: { }  - name: flink-logs    emptyDir: { }
复制代码

可知,通过如上的配置文件,启动 taskmanager、JobManager 后将能够提供挂载功能,能够将主容器中存储日志的目录进行挂载,供另一个容器 artifacts-fetcher 获取并通过其内置脚本 command-to-upload 实时将日志进行上传。该功能是 flink 官方提供的一种通过 podtemplate 方法解决 flink 中日志持久化问题的一个案例,具体 podTemplate 的使用需要结合实际需求场景进行调整。

用户头像

还未添加个人签名 2022-02-22 加入

天翼云是中国电信倾力打造的云服务品牌,致力于成为领先的云计算服务提供商。提供云主机、CDN、云电脑、大数据及AI等全线产品和场景化解决方案。

评论

发布
暂无评论
K8s Application模式下的flink任务执行精要_k8s_天翼云开发者社区_InfoQ写作社区