写点什么

Spark 系列教程(2)运行模式介绍

用户头像
Se7en
关注
发布于: 刚刚
Spark 系列教程(2)运行模式介绍

Spark 运行模式

Apache Spark 是用于大规模数据处理的统一分析引擎,它提供了 Java、Scala、Python 和 R 语言的高级 API,以及一个支持通用的执行图计算的优化引擎。


Spark Core 是 Spark 的核心模块,负责任务调度、内存管理等功能。Spark Core 的实现依赖于 RDD(Resilient Distributed Datasets,弹性分布式数据集)的程序抽象概念。


在 Spark Core 的基础上,Spark 提供了一系列面向不同应用需求的组件,包括使用 SQL 进行结构化数据处理的 Spark SQL、用于实时流处理的 Spark Streaming、用于机器学习的 MLlib 以及用于图处理的 GraphX。


Spark 本身并没有提供分布式文件系统,因而 Spark 的数据存储主要依赖于 HDFS,也可以使用 HBase 和 S3 等作为存储层。


Spark 有多种运行模式:


  • 1.可以运行在一台机器上,称为 Local(本地)运行模式。

  • 2.可以使用 Spark 自带的资源调度系统,称为 Standalone 模式。

  • 3.可以使用 Yarn、Mesos、Kubernetes 作为底层资源调度系统,称为 Spark On Yarn、Spark On Mesos、Spark On K8S。


Client 和 Cluster 提交模式

Driver 是 Spark 中的主控进程,负责执行应用程序的 main() 方法,创建 SparkContext 对象,负责与 Spark 集群进行交互,提交 Spark 作业,并将作业转化为 Task(一个作业由多个 Task 任务组成),然后在各个 Executor 进程间对 Task 进行调度和监控。


根据应用程序提交方式的不同,Driver 在集群中的位置也有所不同,应用程序提交方式主要有两种:Client 和 Cluster,默认是 Client,可以在向 Spark 集群提交应用程序时使用 --deploy-mode 参数指定提交方式。


3 种运行模式部署

Local 模式

Local 模式的部署方式比较简单,只需下载安装包并解压就可以使用了。具体可以参考上一章的 Spark 系列教程(1)Word Count 的介绍,本文就不再赘述了。


在 spark-shell 交互式界面执行一个简单的计算,取出 0~99 之间的值。


❯ bin/spark-shell21/10/07 11:50:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicableUsing Spark's default log4j profile: org/apache/spark/log4j-defaults.propertiesSetting default log level to "WARN".To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).Spark context Web UI available at http://chengzw:4040Spark context available as 'sc' (master = local[*], app id = local-1633578611004).Spark session available as 'spark'.Welcome to      ____              __     / __/__  ___ _____/ /__    _\ \/ _ \/ _ `/ __/  '_/   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2      /_/
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_302)Type in expressions to have them evaluated.Type :help for more information.
scala> val range = spark.range(100)range: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> range.collect()res0: Array[Long] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)
复制代码

Standalone 模式

在 Spark Standalone 模式中,资源调度是由 Spark 自己实现的。 Spark Standalone 模式是 Master-Slaves 架构的集群模式,和大部分的 Master-Slaves 结构的集群一样,存在着 Master 单点故障的问题。对于单点故障的问题,Spark 提供了两种方案:


  • 基于文件系统的单点恢复(Single-Node Recovery with Local File System),将 Application 和 Worker 的注册信息写入文件中,当 Master 宕机时,可以重新启动 Master 进程恢复工作。该方式只适用于开发或测试环境。

  • 基于 Zookeeper 的 Standby Masters(Standby Masters with ZooKeeper)。ZooKeeper 提供了一个 Leader Election 机制,利用这个机制可以保证虽然集群存在多个 Master,但是只有一个是 Active 的,其他的都是 Standby。当 Active 的 Master 出现故障时,另外的一个 Standby Master 会被选举出来,对于恢复期间正在运行的应用程序,由于 Application 在运行前已经向 Master 申请了资源,运行时 Driver 负责与 Executor 进行通信,管理整个 Application,因此 Master 的故障对 Application 的运行不会造成影响,但是会影响新的 Application 的提交。接下来将介绍 Spark Standalone 模式基于 Zookeeper 的 HA 高可用部署。


前提条件

Host 设置

编辑 /etc/hosts 文件:


192.168.1.117 hadoop1192.168.1.118 hadoop2192.168.1.119 hadoop3
复制代码


拷贝到其他两台机器上:


scp  /etc/hosts root@hadoop2:/etc/hostsscp  /etc/hosts root@hadoop3:/etc/hosts
复制代码
配置免密登录

为了方便后续拷贝文件以及执行脚本,配置 SSH 免密登录。 在 hadoop1 上生成 RSA 非对称密钥对:


[root@hadoop1 hadoop]# ssh-keygen Generating public/private rsa key pair.Enter file in which to save the key (/root/.ssh/id_rsa): Enter passphrase (empty for no passphrase): Enter same passphrase again: Your identification has been saved in /root/.ssh/id_rsa.Your public key has been saved in /root/.ssh/id_rsa.pub.The key fingerprint is:SHA256:wkMiPVpbBtjoZwBIpyvvluYtfQM9hQeHtgBFVfrwL1I root@hadoop1The key's randomart image is:+---[RSA 2048]----+|+o.O+..o.        ||. *.o.+..        || o..=o*=         ||  o+oOo+o        ||...o..+oE        ||..  . o+ .       ||  .o .... .      || .=.. o. .       || +o... .         |+----[SHA256]-----+
复制代码


将公钥拷贝到集群中的其他机器:


[root@hadoop1 hadoop]# ssh-copy-id root@hadoop1[root@hadoop1 hadoop]# ssh-copy-id root@hadoop2[root@hadoop1 hadoop]# ssh-copy-id root@hadoop3
复制代码
安装 Java

进入 Oracle 官网 下载并解压 JDK 安装包。设置环境变量,编辑 vim /etc/profile:


export JAVA_HOME=/software/jdkexport PATH=$PATH:$JAVA_HOME/bin
复制代码

Zookeeper 集群部署

下载并解压安装包
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gztar -xzvf apache-zookeeper-3.5.8-bin.tar.gzmv apache-zookeeper-3.5.8 /software/zk
复制代码
编辑配置文件

编辑 zk/conf/zoo.cfg 文件:


#用于配置 Zookeeper 中最小时间单位的长度,单位是毫秒tickTime=2000#该参数用于配置 Leader 服务器等待 Follower 启动,并完成数据同步的时间#乘上 tickTime 得到具体时间:10 * 2000 = 20000 毫秒initLimit=10
#Leader 与 Follower 心跳检测的超时时间。#乘上 tickTime 得到具体时间:5 * 2000 = 10000 毫秒syncLimit=5
#数据存放目录dataDir=/software/zk/data
#客户端连接端口clientPort=2181
#Zookeeper集群成员地址#2888端口用于集群间通信,leader会监听此端口#3888端口用于leader选举server.1=hadoop1:2888:3888server.2=hadoop2:2888:3888server.3=hadoop3:2888:3888
复制代码


同步修改后的配置文件到集群的其他节点:


scp -r zk root@hadoop2:/software/scp -r zk root@hadoop3:/software/
复制代码
标识 Server ID
#在 hadoop1 节点上执行echo 1 > /root/zookeeper-cluster/zk1/myid#在 hadoop2 节点上执行echo 2 > /root/zookeeper-cluster/zk2/myid#在 hadoop3 节点上执行echo 3 > /root/zookeeper-cluster/zk3/myid
复制代码
启动 Zookeeper 集群

分别在 3 台节点上执行以下命令启动 Zookeeper:


zk/bin/zkServer.sh start
复制代码
查看 Zookeeper 集群状态

分别在 3 台节点上查看 Zookeeper 状态,可以看到此时 hadoop2 节点为 Zookeeper 的 Master 节点。


hadoop1 节点:


[root@hadoop1 software]# zk/bin/zkServer.sh statusZooKeeper JMX enabled by defaultUsing config: /software/zk/bin/../conf/zoo.cfgClient port found: 2181. Client address: localhost.Mode: follower
复制代码


hadoop2 节点:


[root@hadoop2 software]# zk/bin/zkServer.sh statusZooKeeper JMX enabled by defaultUsing config: /software/zk/bin/../conf/zoo.cfgClient port found: 2181. Client address: localhost.Mode: leader
复制代码


hadoop3 节点:


[root@hadoop3 software]# zk/bin/zkServer.sh statusZooKeeper JMX enabled by defaultUsing config: /software/zk/bin/../conf/zoo.cfgClient port found: 2181. Client address: localhost.Mode: follower
复制代码

Spark Standalone 模式 HA 集群部署

下载并解压安装包
wget https://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgztar -xzvf spark-3.1.2-bin-hadoop2.7.tgz mv spark-3.1.2-bin-hadoop2.7 /software/spark
复制代码
修改配置文件

编辑 spark/conf/spark-env.sh 文件,由于 Spark HA 使用 Zookeeper 来协调主从,因此需要指定 Zookeeper 的地址和 Spark 在 Zookeeper 中使用的目录。


export JAVA_HOME=/software/jdkexport SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hadoop1:2181,hadoop2:2181,hadoop3:2181 -Dspark.deploy.zookeeper.dir=/spark"
复制代码


编辑 spark/conf/slaves 文件:


hadoop1hadoop2hadoop3
复制代码


同步修改后的配置文件到集群的其他节点:


scp -r spark root@hadoop2:/software/scp -r spark root@hadoop3:/software/
复制代码
启动 Spark 集群

在 hadoop1 节点上启动 Spark 集群,执行 start-all.sh 脚本会在 hadoop1 节点上启动 Master 进程,并且在 spark/conf/slaves 文件中配置的所有节点上启动 Worker 进程。


[root@hadoop1 software]# spark/sbin/start-all.shstarting org.apache.spark.deploy.master.Master, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-hadoop1.outhadoop2: starting org.apache.spark.deploy.worker.Worker, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-hadoop2.outhadoop1: starting org.apache.spark.deploy.worker.Worker, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-hadoop1.outhadoop3: starting org.apache.spark.deploy.worker.Worker, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-hadoop3.out
复制代码


登录 hadoop2 节点,启动第二个 Master(Standby Master)。


[root@hadoop2 software]# spark/sbin/start-master.shstarting org.apache.spark.deploy.master.Master, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-hadoop2.out
复制代码
查看各节点的进程

在各节点执行 jps 命令查看启动的 Java 进程。可以看到 Spark 的 Master 进程分别在 hadoop1 和 hadoop2 节点上运行,Worker 进程在所有节点上运行。QuorumPeerMain 是 Zookeeper 的进程。


hadoop1 节点:


[root@hadoop1 software]# jps18528 Worker18427 Master23468 QuorumPeerMain18940 Jps
复制代码


hadoop2 节点:


[root@hadoop2 software]# jps27824 Worker29954 Jps23751 QuorumPeerMain28135 Master
复制代码


hadoop3 节点:


[root@hadoop3 software]# jps11696 Worker12939 QuorumPeerMain13021 Jps
复制代码
Zookeeper 查看节点注册状态

可以看到此时 3 个 Spark 节点都注册到 Zookeeper 上了,并且此时 192.168.1.117 hadoop1 这个节点是 Master。


[zk: localhost:2181(CONNECTED) 33] ls /spark/master_status[worker_worker-20210821150002-192.168.1.117-42360, worker_worker-20210821150002-192.168.1.118-39584, worker_worker-20210821150002-192.168.1.119-42991][zk: localhost:2181(CONNECTED) 34] get /spark/master_status192.168.1.117
复制代码
Spark HA 测试

浏览器访问 http://hadoop1:8081 进入 Spark WebUI 界面,此时 hadoop1 节点 Master 的状态为 ALIVE。



浏览器访问 http://hadoop1:8082 ,可以看到 hadoop2 节点 Master 的状态为 STANDBY。



停止 hadoop1 节点上的 Master 进程。


[root@hadoop1 software]# spark/sbin/stop-master.sh stopping org.apache.spark.deploy.master.Master
复制代码


等待几秒以后 hadoop2 节点的状态从 STANDBY 切换为 ALIVE。



Spark HA 测试完成,重新启动 hadoop1 节点的 Master 进程。


[root@hadoop1 software]# spark/sbin/start-master.sh starting org.apache.spark.deploy.master.Master, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-hadoop1.out
复制代码
Spark Shell 连接集群

--master 参数的连接地址后可以指定多个 Master 的地址,当第一个 Master 无法连接时,会依次往后尝试连接其他的 Master。


[root@hadoop1 software]# spark/bin/spark-shell --master spark://hadoop1:7077,hadoop2:707721/08/21 18:00:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicableUsing Spark's default log4j profile: org/apache/spark/log4j-defaults.propertiesSetting default log level to "WARN".To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).Spark context Web UI available at http://hadoop1:4040Spark context available as 'sc' (master = spark://hadoop1:7077,hadoop2:7077, app id = app-20210821180100-0000).Spark session available as 'spark'.Welcome to      ____              __     / __/__  ___ _____/ /__    _\ \/ _ \/ _ `/ __/  '_/   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2      /_/         Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)Type in expressions to have them evaluated.Type :help for more information.
复制代码

使用第三方资源调度系统

Spark 可以使用 Yarn、Mesos、Kubernetes 作为底层资源调度系统,目前 Mesos 使用的已经比较少了,本文将介绍 Spark 使用 Yarn 和 Kubernetes 作为调度系统的应用。

Spark On Yarn

Spark On Yarn 模式的搭建比较简单,仅需要在 Yarn 集群的一个节点上安装 Spark 客户端即可,该节点可以作为提交 Spark 应用程序到 Yarn 集群的客户端。Spark 本身的 Master 节点和 Worker 节点不需要启动。前提是我们需要准备好 Yarn 集群,关于 Yarn 集群的安装可以参考 Hadoop 分布式集群安装


使用此模式需要修改 Spark 的配置文件 conf/spark-env.sh,添加 Hadoop 相关属性,指定 Hadoop 配置文件所在的目录:


export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
复制代码


修改完毕后,即可运行 Spark 应用程序,例如运行 Spark 自带的求圆周率的例子,并以 Spark On Yarn 的 Cluster 模式运行。


bin/spark-submit \--class org.apache.spark.examples.SparkPi \--master yarn \--deploy-mode cluster \/software/spark/examples/jars/spark-examples_2.12-3.1.2.jar
复制代码


在 Yarn 的 ResourceManager 对应的 WebUI 界面中可以查看应用程序执行的详细信息。



在 Application 详情页中可以查看输出日志的详细信息。




可以看到最后计算 Pi 的输出结果。


Spark On K8S

目前基于 Kubernetes 的 Spark 的应用主要采用两种方式运行:


  • 1.基于 Kubernetes 的 Operator 的 Spark on K8S Operator,是 Google Cloud Platform 为了支持 Spark 而开发的一种 Operator。个人比较推荐使用 Spark on K8S Operator 的方式提交作业。

  • 2.Spark 原生支持的 Spark On K8S,是 Spark 社区为支持 Kubernetes 这种资源管理框架而引入的 Kubernetes Client 的实现。


Spark Operator 定义了两个 CRD(Custom Resource Definitions,自定义资源定义)对象,SparkApplication 和 ScheduledSparkApplication。 这些 CRD 是 Spark 作业的抽象,使得在 Kubernetes 集群中可以使用 YAML 来定义这些作业。另外还提供了 sparkctl 命令行工具方便我们操控 SparkApplication 和 ScheduledSparkApplication CRD 资源对象。


使用 Spark On K8S Operator 模式时,需要预先在 Kubernetes 集群中部署 Spark Operator 容器,用于将 SparkApplication 和 ScheduledSparkApplication 这些 CRD 资源对象转换为 Kubernetes 原生的资源对象,例如 Pod,Service 等等。



在 Spark On K8S 模式中,Spark 客户端需要与 Kubernetes API Server 直接通信来创建相关的 Kubernetes 资源。



Spark On K8S 和 Spark On K8S Operator 提交作业的方式如下图所示。


Spark On K8S Operator(推荐)

使用 Spark On K8S Operator 模式时,需要预先在 Kubernetes 集群中部署 Spark Operator。


部署 Spark Operator


添加 Spark On K8S Operator Helm 仓库并下载 Helm 资源文件。


helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operatorhelm pull spark-operator/spark-operator --untar
复制代码


修改 values.yaml 文件中有以下两个地方需要修改:


  • 1.repository 镜像仓库地址,由于国内拉取 Spark 相关镜像速度较慢,我已经提前下载好镜像并且上传至阿里云镜像仓库中了,大家可以直接使用我的镜像。

  • 2.sparkJobNamespace:设置 Spark 提交作业的命名空间,会为该命名空间创建一个 ServiceAccount 并赋予相应的权限,ServiceAccount 的名字为 helm 项目名-spark



使用 helm install 命令安装 Spark Operator,spark-job 命名空间是之后提交 Spark 作业时使用的。


kubectl create namespace spark-jobhelm install my-spark spark-operator \--namespace spark-operator --create-namespace
复制代码


确认 Spark Operator Pod 已经正常运行。


❯ kubectl get pod -n spark-operatorNAME                                       READY   STATUS    RESTARTS   AGEmy-spark-spark-operator-674cbc9d9c-8x22x   1/1     Running   0          5m24s
复制代码


查看在 spark-job 命名空间创建的 ServiceAccount。


❯ kubectl get serviceaccounts -n spark-jobNAME             SECRETS   AGE.....my-spark-spark   1         2m33s
复制代码


运行 SparkApplications


SparkApplications 资源对象中通常使用的 Cluster 模式来提交作业。在 YAML 文件中指定运行应由程序的 jar 包以及 main() 方法所在的类。


apiVersion: "sparkoperator.k8s.io/v1beta2"kind: SparkApplicationmetadata:  name: spark-pi  namespace: spark-jobspec:  type: Scala  mode: cluster  image: "registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1"  imagePullPolicy: Always  mainClass: org.apache.spark.examples.SparkPi  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar"  sparkVersion: "3.1.1"  restartPolicy:    type: Never  driver:    cores: 1    coreLimit: "1200m"    memory: "512m"    labels:      version: 3.1.1    serviceAccount: my-spark-spark  executor:    cores: 1    instances: 1    memory: "512m"    labels:      version: 3.1.1
复制代码


等待一会,查看 SparkApplications 状态,COMPLETED 表示已经执行完成该作业。


❯ kubectl get sparkapplications -n spark-job  spark-piNAME       STATUS      ATTEMPTS   START                  FINISH                 AGEspark-pi   COMPLETED   1          2021-10-04T13:13:27Z   2021-10-04T13:13:48Z   8h
复制代码


查看在 spark-job 命名空间创建的 Pod 的日志,可以看到本次作业执行的详情。


kubectl logs -n spark-job spark-pi-driver spark-kubernetes-driver
复制代码


Spark On K8S

使用 Spark On K8S 模式提交作业时我们通常可以使用 spark-submit 或者 spark-shell 两种命令行工具,其中 spark-submit 支持 Cluster 和 Client 两种提交方式,而 spark-shell 只支持 Client 一种提交方式。


Spark-Submit


Cluster 模式


使用 spark-submit 的 Cluster 模式提交作业时,由于我们的 Kubernetes 集群的 API Server 是使用自签名的证书进行 HTTPS 加密的,因此需要使用 spark.kubernetes.authenticate.submission.caCertFile 参数指定 Kubernetes 集群的 CA 证书,让 Spark 客户端信任自签名证书。注意这里的 ServiceAccount 需要自行创建并且赋予以下权限,如果你是按照顺序完成实验的,那么在前面 Spark On K8S Operator 中已经创建了该 ServiceAccount,可以跳过这一步。


❯ kubectl get rolebindings -n spark-job spark -o yamlapiVersion: rbac.authorization.k8s.io/v1kind: RoleBindingmetadata:  annotations:    meta.helm.sh/release-name: my-spark    meta.helm.sh/release-namespace: spark-operator  creationTimestamp: "2021-09-29T16:10:51Z"  labels:    app.kubernetes.io/instance: my-spark    app.kubernetes.io/managed-by: Helm    app.kubernetes.io/name: spark-operator    app.kubernetes.io/version: v1beta2-1.2.3-3.1.1    helm.sh/chart: spark-operator-1.1.6  name: spark  namespace: spark-job  resourceVersion: "204712527"  selfLink: /apis/rbac.authorization.k8s.io/v1/namespaces/spark-job/rolebindings/spark  uid: 225970e8-472d-4ea5-acb5-08630852f76croleRef:  apiGroup: rbac.authorization.k8s.io  kind: Role  name: spark-rolesubjects:- kind: ServiceAccount  name: my-spark-spark  namespace: spark-job❯ kubectl get role -n spark-job  spark-role -o yamlapiVersion: rbac.authorization.k8s.io/v1kind: Rolemetadata:  annotations:    meta.helm.sh/release-name: my-spark    meta.helm.sh/release-namespace: spark-operator  creationTimestamp: "2021-09-29T16:10:51Z"  labels:    app.kubernetes.io/instance: my-spark    app.kubernetes.io/managed-by: Helm    app.kubernetes.io/name: spark-operator    app.kubernetes.io/version: v1beta2-1.2.3-3.1.1    helm.sh/chart: spark-operator-1.1.6  name: spark-role  namespace: spark-job  resourceVersion: "204712525"  selfLink: /apis/rbac.authorization.k8s.io/v1/namespaces/spark-job/roles/spark-role  uid: 436afb3f-a304-4756-b64a-978d5836c3a2rules:- apiGroups:  - ""  resources:  - pods  verbs:  - '*'- apiGroups:  - ""  resources:  - services  verbs:  - '*'- apiGroups:  - ""  resources:  - configmaps  verbs:  - '*'
复制代码


执行 spark-submit 命令向 Kubernetes 集群提交作业。


 bin/spark-submit \--master  k8s://https://11.16.0.153:6443 \--conf spark.kubernetes.authenticate.submission.caCertFile=/Users/chengzhiwei/software/spark/spark-3.1.2-bin-hadoop3.2/certs/ca.crt \--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \--deploy-mode cluster \--name spark-pi-submit \--class org.apache.spark.examples.SparkPi \--conf spark.kubernetes.namespace=spark-job \--conf spark.executor.instances=2 \--conf spark.kubernetes.container.image.pullPolicy=Always \--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \ local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar
复制代码


关于证书不受信任这里也有个讨巧的方式,就是使用 kubectl proxy 命令将 API Server 的 HTTPS 转化为 HTTP。


❯ kubectl proxyStarting to serve on 127.0.0.1:8001
复制代码


然后通过 http://localhost:8001 和 API Server 进行交互,此时就无需指定 CA 证书了。


bin/spark-submit \--master  k8s://http://localhost:8001 \--deploy-mode cluster \--name spark-pi-submit \--class org.apache.spark.examples.SparkPi \--conf spark.kubernetes.namespace=spark-job \--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \--conf spark.executor.instances=2 \--conf spark.kubernetes.container.image.pullPolicy=Always \--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \ local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar
复制代码


通过查看 Kubernetes 为本次 Spark 作业创建的 Pod 的日志,可以看到运行结果。


❯ kubectl logs -n spark-job  spark-pi-submit-fc7b507c4be84351-driver......Pi is roughly 3.140075700378502......
复制代码


Client 模式


Client 模式无需指定 CA 证书,但是需要使用 spark.driver.hostspark.driver.port 指定提交作业的 Spark 客户端所在机器的地址,端口号默认就是 7078。


bin/spark-submit \--master k8s://https://11.16.0.153:6443 \--deploy-mode client \--name spark-pi-submit-client \--class org.apache.spark.examples.SparkPi \--conf spark.kubernetes.namespace=spark-job \--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \--conf spark.executor.instances=2 \--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \--conf spark.driver.host=11.8.38.43 \--conf spark.driver.port=7078 \/home/chengzw/spark-3.1.2-bin-hadoop3.2/examples/jars/spark-examples_2.12-3.1.2.jar
复制代码


使用 Client 模式提交作业在终端就可以直接看到输出结果了。



Spark-Shell


spark-shell 只支持 Client 方式,使用以下命令连接 Kubernetes API Server 并打开 spark-shell 交互式界面。


 bin/spark-shell \--master  k8s://https://11.16.0.153:6443 \--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \--deploy-mode client \--name spark-shell \--conf spark.kubernetes.namespace=spark-job \--conf spark.executor.instances=2 \--conf spark.kubernetes.container.image.pullPolicy=Always \--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \--conf spark.driver.host=11.8.38.43 \--conf spark.driver.port=7078
复制代码


在 spark-shell 交互式界面执行一个简单的计算,取出 0~99 之间的值。


21/10/05 10:44:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicableUsing Spark's default log4j profile: org/apache/spark/log4j-defaults.propertiesSetting default log level to "WARN".To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).Spark context Web UI available at http://11.8.38.43:4040Spark context available as 'sc' (master = k8s://https://11.16.0.153:6443, app id = spark-application-1633401878962).Spark session available as 'spark'.Welcome to      ____              __     / __/__  ___ _____/ /__    _\ \/ _ \/ _ `/ __/  '_/   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2      /_/         Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_101)Type in expressions to have them evaluated.Type :help for more information.
scala> val range = spark.range(100)range: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> range.collect()res1: Array[Long] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)
复制代码
Spark History Server(可选)

部署 Spark History Server


在运行 Spark Application 的时候,Spark 会提供一个 WebUI 列出应用程序的运行时信息,但是一旦该应用程序执行完毕后,将无法查看应用程序执行的历史记录。Spark History Server 就是为了处理这种情况而诞生的,我们可以将 Spark 作业的日志提交到一个统一的地方,例如 HDFS,然后 Spark History Server 就可以通过读取 HDFS 目录中的文件来重新渲染生成 WebUI 界面来展示应用程序执行的历史信息。


使用以下资源文件部署一个 Spark History Server,并且通过 NodePort Service 的方式将服务暴露到集群外部,集群外部可以通过节点地址:NodePort 来访问 Spark History Server。前提是我们需要准备好 HDFS 集群,关于 HDFS 集群的安装可以参考 Hadoop 分布式集群安装


apiVersion: apps/v1kind: Deploymentmetadata:  name: spark-history-server  namespace: spark-jobspec:  selector:    matchLabels:      run: spark-history-server  replicas: 1  template:    metadata:      labels:        run: spark-history-server    spec:      containers:        - image:  "registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1"          name: spark-history-server          args: ["/opt/spark/bin/spark-class", "org.apache.spark.deploy.history.HistoryServer"]          ports:            - containerPort: 18080              name: http          env:          - name: SPARK_HISTORY_OPTS            value: "-Dspark.history.fs.logDirectory=hdfs://11.8.36.125:8020/spark-k8s"---apiVersion: v1kind: Servicemetadata:  name: spark-history-server  namespace: spark-jobspec:  ports:  - name: http    nodePort: 30080    port: 18080    protocol: TCP    targetPort: 18080  selector:     run: spark-history-server  type: NodePort
复制代码


Spark On K8S Operator 使用 History Server


设置 spark.eventLog.enabled 参数值为 true 启用记录 Spark 日志,spark.eventLog.dir 指定输出日志的目录为 HDFS 目录。


apiVersion: "sparkoperator.k8s.io/v1beta2"kind: SparkApplicationmetadata:  name: spark-pi  namespace: spark-jobspec:  type: Scala  mode: cluster  image: "registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1"  imagePullPolicy: Always  mainClass: org.apache.spark.examples.SparkPi  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar"  sparkVersion: "3.1.1"  sparkConf:    "spark.eventLog.enabled": "true"    "spark.eventLog.dir": "hdfs://11.8.36.125:8020/spark-k8s"  restartPolicy:    type: Never  driver:    cores: 1    coreLimit: "1200m"    memory: "512m"    labels:      version: 3.1.1    serviceAccount: my-spark-spark  executor:    cores: 1    instances: 1    memory: "512m"    labels:      version: 3.1.1
复制代码


在集群外通过节点地址:30080 访问 Spark History Server,可以在应用程序执行完毕后看到详细的信息。





Spark On K8S 使用 History Server


 bin/spark-submit \--master  k8s://https://11.16.0.153:6443 \--conf spark.kubernetes.authenticate.submission.caCertFile=/Users/chengzhiwei/software/spark/spark-3.1.2-bin-hadoop3.2/certs/ca.crt \--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \--deploy-mode cluster \--name spark-pi-submit \--class org.apache.spark.examples.SparkPi \--conf spark.kubernetes.namespace=spark-job \--conf spark.executor.instances=2 \--conf spark.kubernetes.container.image.pullPolicy=Always \--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \--conf spark.eventLog.enabled=true \--conf spark.eventLog.dir=hdfs://11.8.36.125:8020/spark-k8s \ local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar
复制代码

构建镜像

上面的例子都是使用 Spark 官方自带的程序来提交作业,如果我们想要自定义一个程序可以使用 Spark 官网提供的脚本来构建镜像。

程序代码

该项目使用 Maven 来管理依赖。


<dependencies>    <dependency>        <groupId>org.apache.spark</groupId>        <artifactId>spark-core_2.11</artifactId>        <version>2.1.0</version>    </dependency>    <dependency>        <groupId>org.scala-lang</groupId>        <artifactId>scala-library</artifactId>        <version>2.11.12</version>        <scope>compile</scope>    </dependency></dependencies>
复制代码


程序代码如下,使用 Java 编写了一个 Word Count 程序。


package com.chengzw.wordcount;
import org.apache.log4j.Level;import org.apache.log4j.Logger;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import scala.Tuple2;
import java.util.Arrays;import java.util.Iterator;import java.util.List;
/** * @description WordCount 示例 * @author chengzw * @since 2021/7/25 8:39 下午 */public class MyJavaWordCount { public static void main(String[] args) { Logger.getLogger("org").setLevel(Level.OFF); System.setProperty("spark.ui.showConsoleProgress","false"); //创建配置对象 //本地运行 //SparkConf conf = new SparkConf().setAppName("MyJavaWordCount").setMaster("local"); //在Spark上运行 SparkConf conf = new SparkConf().setAppName("MyJavaWordCount"); //创建SparkContext对象 JavaSparkContext sc = new JavaSparkContext(conf);
//读取hdfs数据 //在本地运行 //JavaRDD<String> rdd1= sc.textFile("/tmp/data.txt"); //在Spark上运行 JavaRDD<String> rdd1= sc.textFile(args[0]);
//分词 JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String input) throws Exception { return Arrays.asList(input.split(" ")).iterator(); } }); //单词计数 word,1 JavaPairRDD<String, Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } });
//相同Key的值累加 JavaPairRDD<String, Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer a, Integer b) throws Exception { return a + b; } });
//触发计算 List<Tuple2<String, Integer>> result = rdd4.collect();
//打印 for(Tuple2<String,Integer> r : result){ System.out.println(r._1 + "\t" + r._2); }
//释放资源 sc.stop();
}}
复制代码

打包

点击 mvn package 将程序打成 jar 包。


构建并上传镜像

将 jar 包放到 Spark 安装包的 examples/jars 目录中,进入 Spark 目录然后执行以下命令构建镜像。


bin/docker-image-tool.sh -r registry.cn-hangzhou.aliyuncs.com/public-namespace -t my-spark:1.0.0 build
复制代码


查看构建好的镜像。


❯ docker images | grep sparkregistry.cn-hangzhou.aliyuncs.com/public-namespace/spark                v1.0.0       372341ae930d   12 minutes ago   529MB
复制代码


上传镜像。


./docker-image-tool.sh -r registry.cn-hangzhou.aliyuncs.com/public-namespace -t v1.0.0 push
复制代码


使用自己构建的镜像执行 Word Count 程序。


 bin/spark-submit \--master  k8s://https://11.16.0.153:6443 \--conf spark.kubernetes.authenticate.submission.caCertFile=/Users/chengzhiwei/software/spark/spark-3.1.2-bin-hadoop3.2/certs/ca.crt \--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \--deploy-mode cluster \--name spark-pi-submit \--class com.chengzw.wordcount.MyJavaWordCount \--conf spark.kubernetes.namespace=spark-job \--conf spark.executor.instances=2 \--conf spark.kubernetes.container.image.pullPolicy=Always \--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v1.0.0 \ local:///opt/spark/examples/jars/spark-lab-1.0-SNAPSHOT.jar /etc/security/limits.conf
复制代码


查看执行结果:


kubectl logs -n spark-job spark-pi-submit-37945f7c4f24e729-driver#返回结果......rss  2space  2priority  44  1this  1"soft"  1max  14cpu  1memlock  1apply  1......
复制代码

参考资料

欢迎关注


发布于: 刚刚阅读数: 2
用户头像

Se7en

关注

还未添加个人签名 2020.01.10 加入

还未添加个人简介

评论

发布
暂无评论
Spark 系列教程(2)运行模式介绍