写点什么

万字长文 | Apache SeaTunnel 分离集群模式部署 K8s 集群实践

作者:白鲸开源
  • 2025-04-22
    广东
  • 本文字数:10244 字

    阅读完需:约 34 分钟

文章作者:雷宝鑫


整理排版:白鲸开源 曾辉


Apache SeaTunnel 官网链接: https://seatunnel.apache.org/


Apache SeaTunnel(以下简称 SeaTunnel)是一款新一代高性能、分布式的数据集成同步工具,正受到业界广泛关注和应用。SeaTunnel 支持三种部署模式:本地模式(Local)、混合集群模式(Hybrid Cluster Mode)和分离集群模式(Separated Cluster Mode)。


本文尝试介绍如何在 K8s 上以分离集群模式部署 SeaTunnel,为有相关需求的伙伴提供完整的部署流程和配置案例参考。

前期准备

在开始部署之前,需要确保以下环境和组件已经准备就绪:


  • Kubernetes 集群环境

  • kubectl 命令行工具

  • docker

  • helm (option)


对于熟悉和具有 Helm 环境的部署,可以直接参考官网中使用 Helm 部署教程:



本文主要介绍基于Kubernetes环境和kubectl工具的方式实现部署。

构建 SeaTunnel Docker 镜像

目前官方已提供各版本的 Docker 镜像,可直接拉取,详细信息可参考官方文档:Set Up With Docker


docker pull apache/seatunnel:<version_tag>
复制代码


由于我们需要部署的是集群模式,接下来需要配置集群间的网络通信。SeaTunnel 集群的网络服务是通过Hazelcast实现的,所以接下来对这部分内容进行配置。

Hazelcast 集群相关配置

Headless Service 配置

Hazelcast 集群是由运行 Hazelcast 的集群成员组成的网络,集群成员自动联合起来形成一个集群,这种自动加入是通过集群成员用于查找彼此的各种发现机制实现的。


Hazelcast 支持以下发现机制:


  • 自动发现机制,支持以下环境:

  • AWS

  • Azure

  • GCP

  • Kubernetes

  • TCP

  • Multicast

  • Eureka

  • Zookeeper


在本文的集群部署中,我们基于HazelcastKubernetes自动发现机制来配置文件,详细的原理可以参考官网文档:Kubernetes Auto Discovery


Hazelcast 的 k8s 自动发现机制(DNS Lookup mode)需要借助于 k8s 的Headless Service功能来实现。


Headless Service在查询服务域名时,会将域名解析为所有匹配PodIP地址列表,以此来实现 Hazelcast 集群成员互相发现彼此。


为此,首先我们创建K8s Headless Service服务:


# use for hazelcast cluster joinapiVersion: v1kind: Servicemetadata:  name: seatunnel-clusterspec:  type: ClusterIP  clusterIP: None  selector:    app.kubernetes.io/instance: seatunnel-cluster-app    app.kubernetes.io/version: 2.3.10  ports:  - port: 5801    name: hazelcast
复制代码


上述配置中的关键部分:


  • metadata.name: seatunnel-cluster: 服务名称,Hazelcast 客户端/节点将通过该名称发现集群

  • spec.clusterIP: None:关键配置,声明为 Headless Service,不分配虚拟 IP

  • spec.selector: 选择器匹配的 Pod 标签,包含相应标签的 pod 会被该 Service 识别和代理

  • spec.port:Hazelcast 的暴露端口


同时,为了能从系统外部利用rest api访问集群,我们定义另一个 Service 来包含 Master 的节点pod


# use for access seatunnel from outside system via rest apiapiVersion: v1kind: Servicemetadata:  name: seatunnel-cluster-masterspec:  type: ClusterIP  clusterIP: None  selector:    app.kubernetes.io/instance: seatunnel-cluster-app    app.kubernetes.io/version: 2.3.10    app.kubernetes.io/name: seatunnel-cluster-master    app.kubernetes.io/component: master  ports:  - port: 8080    name: "master-port"    targetPort: 8080    protocol: TCP
复制代码


定义好上述 K8s 的 Service 服务后,接下来根据 Hazelcast 的 k8s 发现机制来配置hazelcast-master.yamlhazelcast-worker.yaml文件。

Hazelcast master 和 worker 的 yaml 配置

对于 SeaTunnel 分离集群模式来说,所有网络相关的配置都在hazelcast-master.yamlhazelcast-worker.yaml文件中。


hazelcast-master.yaml的配置如下所示:


hazelcast:  cluster-name: seatunnel-cluster  network:    rest-api:      enabled: true      endpoint-groups:        CLUSTER_WRITE:          enabled: true        DATA:          enabled: true    join:      kubernetes:        enabled: true        service-dns: seatunnel-cluster.bigdata.svc.cluster.local        service-port: 5801    port:      auto-increment: false      port: 5801  properties:    hazelcast.invocation.max.retry.count: 20    hazelcast.tcp.join.port.try.count: 30    hazelcast.logging.type: log4j2    hazelcast.operation.generic.thread.count: 50    hazelcast.heartbeat.failuredetector.type: phi-accrual    hazelcast.heartbeat.interval.seconds: 30    hazelcast.max.no.heartbeat.seconds: 300    hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 15    hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200    hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 200
复制代码


上述配置文件中的关键配置项如下:

cluster-name

该配置用于确定多个节点是否属于同一个集群,即只有相同 cluster-name 的节点才会属于同一个集群。如果两个节点之间的 cluster-name 名称不同,Hazelcast 将会拒绝服务请求。

网络配置
  • rest-api.enabled:在 ST 2.3.10 版本中 Hazelcast REST 服务默认在配置中禁用,需要手动显式指定开启。

  • service-dns(必填):Headless Service 的完整域名,通常为 {NAMESPACE}.svc.cluster.local。

  • service-port(可选):Hazelcast 端口;如果指定的值大于 0,则覆盖默认值(默认端口 = 5701)


使用上述基于 k8s 的 join 机制,在 Hazelcast Pod 启动时会解析 service-dns,获取所有成员 pod 的 IP 列表(通过Headless Service),然后成员之间通过5801端口尝试建立 TCP 连接。


同样的,对于hazelcast-worker.yaml配置文件如下所示:


hazelcast:  cluster-name: seatunnel-cluster  network:    rest-api:      enabled: true      endpoint-groups:        CLUSTER_WRITE:          enabled: true        DATA:          enabled: true    join:      kubernetes:        enabled: true        service-dns: seatunnel-cluster.bigdata.svc.cluster.local        service-port: 5801    port:      auto-increment: false      port: 5801  properties:    hazelcast.invocation.max.retry.count: 20    hazelcast.tcp.join.port.try.count: 30    hazelcast.logging.type: log4j2    hazelcast.operation.generic.thread.count: 50    hazelcast.heartbeat.failuredetector.type: phi-accrual    hazelcast.heartbeat.interval.seconds: 30    hazelcast.max.no.heartbeat.seconds: 300    hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 15    hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200    hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 200  member-attributes:    rule:      type: string      value: worker
复制代码


通过上述流程,我们就创建好了与 Hazelcast 集群相关的配置和服务,实现了 Hazecast 基于 Kubernetes 的集群成员发现。


接下来,继续完成有关 SeaTunnel 引擎的相关配置。

配置 SeaTunnel 引擎

SeaTunnel 引擎的相关配置都在seatunnel.yaml文件中,下面给出seatunnel.yaml配置示例以供参考:


seatunnel:  engine:    history-job-expire-minutes: 1440    backup-count: 1    queue-type: blockingqueue    print-execution-info-interval: 60    print-job-metrics-info-interval: 60    classloader-cache-mode: true    http:      enable-http: true      port: 8080      enable-dynamic-port: false      port-range: 100    slot-service:      dynamic-slot: true    checkpoint:      interval: 300000      timeout: 60000      storage:        type: hdfs        max-retained: 3        plugin-config:          namespace: /tmp/seatunnel/checkpoint_snapshot          storage.type: hdfs          fs.defaultFS: hdfs://xxx:8020 # Ensure that the directory has written permission    telemetry:      metric:        enabled: true
复制代码


包含以下配置信息:


  • history-job-expire-minutes:任务历史记录保留时长为 24 小时(1440 分钟),超时自动清理。

  • backup-count: 1:任务状态备份副本数为 1。

  • queue-type: blockingqueue:使用阻塞队列管理任务,避免资源耗尽。

  • print-execution-info-interval: 60:每分钟打印一次任务执行状态。

  • print-job-metrics-info-interval: 60:每分钟输出一次任务指标(如吞吐量、延迟)。

  • classloader-cache-mode: true:启用类加载缓存,减少重复加载开销,提升性能。

  • dynamic-slot: true:允许根据负载动态调整任务槽(Slot)数量,优化资源利用率。

  • checkpoint.interval: 300000:每 5 分钟触发一次检查点(Checkpoint)。

  • checkpoint.timeout: 60000:检查点超时时间为 1 分钟。

  • telemetry.metric.enabled: true:启用任务运行指标采集(如延迟、吞吐量),便于监控。

创建 k8s yaml 文件部署应用

在完成上面的工作流程后,我们就可以进入到最后一步:创建 Master 和 Worker 节点的 k8s yaml 文件定义部署的相关配置。


为了将配置文件与应用程序解耦,我们将上文中列出的配置文件合并到一个 ConfigMap 中,并挂载到容器的配置路径下,便于对配置文件的统一管理和更新。


以下是针对 seatunnel-cluster-master.yamlseatunnel-cluster-worker.yaml 的配置示例,涵盖了配置 ConfigMap 挂载、容器启动命令以及部署资源定义等相关内容。


seatunnel-cluster-master.yaml:


apiVersion: apps/v1kind: Deploymentmetadata:  name: seatunnel-cluster-masterspec:  replicas: 2  # modify replicas according to your case  strategy:    type: RollingUpdate    rollingUpdate:      maxUnavailable: 25%      maxSurge: 50%  selector:    matchLabels:      app.kubernetes.io/instance: seatunnel-cluster-app      app.kubernetes.io/version: 2.3.10      app.kubernetes.io/name: seatunnel-cluster-master      app.kubernetes.io/component: master  template:    metadata:      annotations:        prometheus.io/path: /hazelcast/rest/instance/metrics        prometheus.io/port: "5801"        prometheus.io/scrape: "true"        prometheus.io/role: "seatunnel-master"      labels:        app.kubernetes.io/instance: seatunnel-cluster-app        app.kubernetes.io/version: 2.3.10        app.kubernetes.io/name: seatunnel-cluster-master        app.kubernetes.io/component: master    spec:      affinity:        nodeAffinity:          requiredDuringSchedulingIgnoredDuringExecution:            nodeSelectorTerms:            - matchExpressions:              - key: nodeAffinity-key                operator: Exists      containers:        - name: seatunnel-master          image: seatunnel:2.3.10          imagePullPolicy: IfNotPresent          ports:            - containerPort: 5801              name: hazelcast            - containerPort: 8080              name: "master-port"          command:            - /opt/seatunnel/bin/seatunnel-cluster.sh            - -r            - master          resources:            requests:              cpu: "1"              memory: 4G          volumeMounts:            - mountPath: "/opt/seatunnel/config/hazelcast-master.yaml"              name: seatunnel-configs              subPath: hazelcast-master.yaml            - mountPath: "/opt/seatunnel/config/hazelcast-worker.yaml"              name: seatunnel-configs              subPath: hazelcast-worker.yaml            - mountPath: "/opt/seatunnel/config/seatunnel.yaml"              name: seatunnel-configs              subPath: seatunnel.yaml            - mountPath: "/opt/seatunnel/config/hazelcast-client.yaml"              name: seatunnel-configs              subPath: hazelcast-client.yaml            - mountPath: "/opt/seatunnel/config/log4j2_client.properties"              name: seatunnel-configs              subPath: log4j2_client.properties            - mountPath: "/opt/seatunnel/config/log4j2.properties"              name: seatunnel-configs              subPath: log4j2.properties
volumes: - name: seatunnel-configs configMap: name: seatunnel-cluster-configs
复制代码
部署策略
  • 采用多副本(replicas=2)部署确保服务高可用

  • 滚动更新策略(RollingUpdate)实现零停机部署:

  • maxUnavailable: 25%:保证更新期间至少 75%的 Pod 保持运行

  • maxSurge: 50%:允许临时增加 50%的 Pod 资源用于平滑过渡

标签选择器
  • 采用 Kubernetes 推荐的标准标签体系

  • spec.selector.matchLabels: 根据标签定义 Deployment 管理 Pod 的范围

  • spec.template.labels: 定义新创建 Pod 的标签,标识 Pod 的元数据。

节点亲和性
  • 配置affinity属性指定 Pod 调度的节点,需要根据自己 k8s 环境的节点标签进行替换。

配置文件挂载
  • 核心配置文件统一管理在ConfigMap中,便于管理以及与应用程序解耦

  • 通过 subPath 指定挂载的单个文件


seatunnel-cluster-worker.yaml配置文件如下:


apiVersion: apps/v1kind: Deploymentmetadata:  name: seatunnel-cluster-workerspec:  replicas: 3  # modify replicas according to your case  strategy:    type: RollingUpdate    rollingUpdate:      maxUnavailable: 25%      maxSurge: 50%  selector:    matchLabels:      app.kubernetes.io/instance: seatunnel-cluster-app      app.kubernetes.io/version: 2.3.10      app.kubernetes.io/name: seatunnel-cluster-worker      app.kubernetes.io/component: worker  template:    metadata:      annotations:        prometheus.io/path: /hazelcast/rest/instance/metrics        prometheus.io/port: "5801"        prometheus.io/scrape: "true"        prometheus.io/role: "seatunnel-worker"      labels:        app.kubernetes.io/instance: seatunnel-cluster-app        app.kubernetes.io/version: 2.3.10        app.kubernetes.io/name: seatunnel-cluster-worker        app.kubernetes.io/component: worker    spec:      affinity:        nodeAffinity:          requiredDuringSchedulingIgnoredDuringExecution:            nodeSelectorTerms:            - matchExpressions:              - key: nodeAffinity-key                operator: Exists      containers:        - name: seatunnel-worker          image: seatunnel:2.3.10          imagePullPolicy: IfNotPresent          ports:            - containerPort: 5801              name: hazelcast          command:            - /opt/seatunnel/bin/seatunnel-cluster.sh            - -r            - worker          resources:            requests:              cpu: "1"              memory: 10G          volumeMounts:            - mountPath: "/opt/seatunnel/config/hazelcast-master.yaml"              name: seatunnel-configs              subPath: hazelcast-master.yaml            - mountPath: "/opt/seatunnel/config/hazelcast-worker.yaml"              name: seatunnel-configs              subPath: hazelcast-worker.yaml            - mountPath: "/opt/seatunnel/config/seatunnel.yaml"              name: seatunnel-configs              subPath: seatunnel.yaml            - mountPath: "/opt/seatunnel/config/hazelcast-client.yaml"              name: seatunnel-configs              subPath: hazelcast-client.yaml            - mountPath: "/opt/seatunnel/config/log4j2_client.properties"              name: seatunnel-configs              subPath: log4j2_client.properties            - mountPath: "/opt/seatunnel/config/log4j2.properties"              name: seatunnel-configs              subPath: log4j2.properties
volumes: - name: seatunnel-configs configMap: name: seatunnel-cluster-configs
复制代码


定义好上述 master 和 worker 的 yaml 文件后,就可以执行以下命令进行部署到 k8s 集群了:


kubectl apply -f seatunnel-cluster-master.yamlkubectl apply -f seatunnel-cluster-worker.yaml
复制代码


正常情况下会看到 SeaTunnel 集群中共有 2 个 master 节点和 3 个 worker 节点:


$ kubectl get pods | grep seatunnel-cluster
seatunnel-cluster-master-6989898f66-6fjz8 1/1 Running 0 156mseatunnel-cluster-master-6989898f66-hbtdn 1/1 Running 0 155mseatunnel-cluster-worker-87fb469f7-5c96x 1/1 Running 0 156mseatunnel-cluster-worker-87fb469f7-7kt2h 1/1 Running 0 155mseatunnel-cluster-worker-87fb469f7-drm9r 1/1 Running 0 156m
复制代码


至此,我们已成功在 Kubernetes 环境中以分离集群模式部署了 SeaTunnel 集群。


如今,集群已就绪,如何在客户端向其提交任务呢?

客户端提交任务到集群

使用命令行工具提交任务

有关 SeaTunnel 客户端的配置都在 hazelcast-client.yaml 文件中。


首先需要在客户端本地下载二进制安装包(包含 bin、config 文件),并保证 SeaTunnel 的安装路径与服务端一致,这也就是官网中所说的:Setting the SEATUNNEL_HOME the same as the server,否则,可能会导致出现诸如无法在服务器端找到连接器插件路径等错误(因为服务端插件路径与客户端路径不一致)。


进入安装路径下,只需要修改config/hazelcast-client.yaml文件,配置指向刚刚创建的Headless Service服务地址即可:


hazelcast-client:      cluster-name: seatunnel-cluster      properties:        hazelcast.logging.type: log4j2      connection-strategy:        connection-retry:          cluster-connect-timeout-millis: 3000      network:        cluster-members:          - seatunnel-cluster.bigdata.svc.cluster.local:5801
复制代码


客户端配置完成后,即可将任务提交至集群执行。任务提交时的 JVM 参数配置方式主要有两种:


  • config/jvm_client_options文件中配置任务提交时的 JVM 参数

  • 此方法配置的 JVM 参数将应用于所有通过seatunnel.sh提交的任务,无论运行于本地模式还是集群模式。所有提交的任务都将共享相同的 JVM 参数配置。

  • 在提交任务的命令行中指定 JVM 参数。

  • 使用seatunnel.sh提交任务时,可在命令行中直接指定 JVM 参数,例如:sh bin/seatunnel.sh --config $SEATUNNEL_HOME/config/v2.batch.config.template -DJvmOption=-Xms2G -Xmx2G。此方法允许为每个提交的任务独立配置 JVM 参数。


接下来通过一个案例来演示客户端提交任务至集群执行的完整流程:


env {  parallelism = 2  job.mode = "STREAMING"  checkpoint.interval = 2000}
source { FakeSource { parallelism = 2 plugin_output = "fake" row.num = 16 schema = { fields { name = "string" age = "int" } } }}
sink { Console { }}
复制代码


在客户端使用以下命令提交任务:


sh bin/seatunnel.sh --config config/v2.streaming.example.template -m cluster -n st.example.template -DJvmOption="-Xms2G -Xmx2G"
复制代码


在 Master 节点,使用如下命令列出正在运行的任务列表:


$ sh bin/seatunnel.sh -l
Job ID Job Name Job Status Submit Time Finished Time ------------------ ------------------- ---------- ----------------------- ----------------------- 964354250769432580 st.example.template RUNNING 2025-04-15 10:39:30.588
复制代码


可以看到,我们刚刚向集群中提交的st.example.template任务已经处于 RUNNING 状态了。现在我们可以在 Worker 节点日志中看到如下日志打印:


2025-04-15 10:34:41,998 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=1:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : bdaUB, 1103480492025-04-15 10:34:41,998 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=1  rowIndex=1:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : mOifY, 19745390872025-04-15 10:34:41,999 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=2:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : jKFrR, 18280477422025-04-15 10:34:41,999 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=1  rowIndex=2:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : gDiqR, 11775447962025-04-15 10:34:41,999 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=3:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : bCVxc, 909343602...
复制代码


说明我们的任务成功提交至所创建的 SeaTunnel 集群,并且确认其正常运行。

使用 Rest Api 接口提交任务

SeaTunnel 提供了通过 Rest Api 接口的方式来查询运行作业的状态和统计信息,以及提交/停止作业等操作。


在上文中我们配置了只包含 Master 节点的 Headless Service,并指定暴露的端口为8080。因此,我们就可以在客户端使用Rest API接口的方式来实现任务的提交。


SeaTunnel Rest API接口提供了通过上传配置文件来提交任务,命令如下:


 $ curl 'http://seatunnel-cluster-master.bigdata.svc.cluster.local:8080/submit-job/upload' --form 'config_file=@"/opt/seatunnel/config/v2.streaming.example.template"' --form 'jobName=st.example.template'  {"jobId":"964553575034257409","jobName":"st.example.template"}
复制代码


如果作业提交成功,会返回jobIdjobName,如上所示。


接下来,通过Rest API接口获取集群正在运行的所有任务,观察刚刚提交的任务信息:


curl 'http://seatunnel-cluster-master.bigdata.svc.colo.gzgalocal:8080/running-jobs'[{"jobId":"964553575034257409","jobName":"st.example.template","jobStatus":"RUNNING","envOptions":{"job.mode":"STREAMING","checkpoint.interval":"2000","parallelism":"2"}, ...]
复制代码


可以看到接口返回显示了任务状态和其他额外的元数据信息,说明我们通过 Rest Api 接口提交任务的方式也成功执行。更多 Rest Api 接口介绍可以参考官网:RESTful API V2

总结

本文着重介绍了如何以推荐的分离集群模式(Separated Cluster Mode)部署 k8s 集群的实践,总结下来,部署过程主要包含以下步骤:


  1. 准备 Kubernetes 环境

  2. 确保已搭建并运行一个可用的 Kubernetes 集群,并安装所有必要的组件。

  3. 构建 SeaTunnel Docker 镜像

  4. 如果没有二次开发需求,可直接使用官方提供的镜像。否则,在本地编译打包后,编写 Dockerfile 并构建 SeaTunnel 镜像。

  5. 配置 Headless Service 和 Hazelcast 集群

  6. Hazelcast 的 k8s 自动发现机制的 DNS Lookup 模式是基于 k8s 的 Headless Service 功能来实现的,因此首先创建 Headless Service 服务,并在 hazelcast 的 yaml 配置文件中通过 service-dns 来指定服务地址。

  7. Headless Service 会在域名解析时解析成所包含 pod 的 IP 地址集合,以此实现 hazelcast 集群成员之间的彼此发现。

  8. 配置 SeaTunnel 引擎

  9. 修改 seatunnel.yaml 文件,配置 SeaTunnel 引擎参数。

  10. 创建 k8s yaml 部署文件

  11. 分别创建 Master 和 Worker 的 k8s yaml 文件,配置节点标签、启动命令、资源和数据卷挂载等内容,最终将其部署到 k8s 集群。

  12. 配置 SeaTunnel 客户端

  13. 在客户端安装 SeaTunnel,并确保客户端的安装路径 (SEATUNNEL_HOME) 与服务端一致。修改 hazelcast-client.yaml 文件,配置客户端连接到集群 Service 服务的地址。

  14. 任务提交与执行:

  15. 完成以上步骤后,即可在客户端提交任务并由 SeaTunnel 集群执行。


本文上述配置案例仅供参考,可能仍有很多配置项和配置内容未涉及,欢迎各位补充与讨论,希望有各位有所帮助!

用户头像

白鲸开源

关注

一家开源原生的DataOps商业公司。 2022-03-18 加入

致力于打造下一代开源原生的DataOps 平台,助力企业在大数据和云时代,智能化地完成多数据源、多云及信创环境的数据集成、调度开发和治理,以提高企业解决数据问题的效率,提升企业分析洞察能力和决策能力。

评论

发布
暂无评论
万字长文 | Apache SeaTunnel 分离集群模式部署 K8s 集群实践_白鲸开源_InfoQ写作社区