Spark 运行模式
Apache Spark 是用于大规模数据处理的统一分析引擎,它提供了 Java、Scala、Python 和 R 语言的高级 API,以及一个支持通用的执行图计算的优化引擎。
Spark Core 是 Spark 的核心模块,负责任务调度、内存管理等功能。Spark Core 的实现依赖于 RDD(Resilient Distributed Datasets,弹性分布式数据集)的程序抽象概念。
在 Spark Core 的基础上,Spark 提供了一系列面向不同应用需求的组件,包括使用 SQL 进行结构化数据处理的 Spark SQL、用于实时流处理的 Spark Streaming、用于机器学习的 MLlib 以及用于图处理的 GraphX。
Spark 本身并没有提供分布式文件系统,因而 Spark 的数据存储主要依赖于 HDFS,也可以使用 HBase 和 S3 等作为存储层。
Spark 有多种运行模式:
1.可以运行在一台机器上,称为 Local(本地)运行模式。
2.可以使用 Spark 自带的资源调度系统,称为 Standalone 模式。
3.可以使用 Yarn、Mesos、Kubernetes 作为底层资源调度系统,称为 Spark On Yarn、Spark On Mesos、Spark On K8S。
Client 和 Cluster 提交模式
Driver 是 Spark 中的主控进程,负责执行应用程序的 main() 方法,创建 SparkContext 对象,负责与 Spark 集群进行交互,提交 Spark 作业,并将作业转化为 Task(一个作业由多个 Task 任务组成),然后在各个 Executor 进程间对 Task 进行调度和监控。
根据应用程序提交方式的不同,Driver 在集群中的位置也有所不同,应用程序提交方式主要有两种:Client 和 Cluster,默认是 Client,可以在向 Spark 集群提交应用程序时使用 --deploy-mode
参数指定提交方式。
3 种运行模式部署
Local 模式
Local 模式的部署方式比较简单,只需下载安装包并解压就可以使用了。具体可以参考上一章的 Spark 系列教程(1)Word Count 的介绍,本文就不再赘述了。
在 spark-shell 交互式界面执行一个简单的计算,取出 0~99 之间的值。
❯ bin/spark-shell
21/10/07 11:50:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://chengzw:4040
Spark context available as 'sc' (master = local[*], app id = local-1633578611004).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.1.2
/_/
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_302)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val range = spark.range(100)
range: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> range.collect()
res0: Array[Long] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)
复制代码
Standalone 模式
在 Spark Standalone 模式中,资源调度是由 Spark 自己实现的。 Spark Standalone 模式是 Master-Slaves 架构的集群模式,和大部分的 Master-Slaves 结构的集群一样,存在着 Master 单点故障的问题。对于单点故障的问题,Spark 提供了两种方案:
基于文件系统的单点恢复(Single-Node Recovery with Local File System),将 Application 和 Worker 的注册信息写入文件中,当 Master 宕机时,可以重新启动 Master 进程恢复工作。该方式只适用于开发或测试环境。
基于 Zookeeper 的 Standby Masters(Standby Masters with ZooKeeper)。ZooKeeper 提供了一个 Leader Election 机制,利用这个机制可以保证虽然集群存在多个 Master,但是只有一个是 Active 的,其他的都是 Standby。当 Active 的 Master 出现故障时,另外的一个 Standby Master 会被选举出来,对于恢复期间正在运行的应用程序,由于 Application 在运行前已经向 Master 申请了资源,运行时 Driver 负责与 Executor 进行通信,管理整个 Application,因此 Master 的故障对 Application 的运行不会造成影响,但是会影响新的 Application 的提交。接下来将介绍 Spark Standalone 模式基于 Zookeeper 的 HA 高可用部署。
前提条件
Host 设置
编辑 /etc/hosts 文件:
192.168.1.117 hadoop1
192.168.1.118 hadoop2
192.168.1.119 hadoop3
复制代码
拷贝到其他两台机器上:
scp /etc/hosts root@hadoop2:/etc/hosts
scp /etc/hosts root@hadoop3:/etc/hosts
复制代码
配置免密登录
为了方便后续拷贝文件以及执行脚本,配置 SSH 免密登录。 在 hadoop1 上生成 RSA 非对称密钥对:
[root@hadoop1 hadoop]# ssh-keygen
Generating public/private rsa key pair.
Enter file in which to save the key (/root/.ssh/id_rsa):
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in /root/.ssh/id_rsa.
Your public key has been saved in /root/.ssh/id_rsa.pub.
The key fingerprint is:
SHA256:wkMiPVpbBtjoZwBIpyvvluYtfQM9hQeHtgBFVfrwL1I root@hadoop1
The key's randomart image is:
+---[RSA 2048]----+
|+o.O+..o. |
|. *.o.+.. |
| o..=o*= |
| o+oOo+o |
|...o..+oE |
|.. . o+ . |
| .o .... . |
| .=.. o. . |
| +o... . |
+----[SHA256]-----+
复制代码
将公钥拷贝到集群中的其他机器:
[root@hadoop1 hadoop]# ssh-copy-id root@hadoop1
[root@hadoop1 hadoop]# ssh-copy-id root@hadoop2
[root@hadoop1 hadoop]# ssh-copy-id root@hadoop3
复制代码
安装 Java
进入 Oracle 官网 下载并解压 JDK 安装包。设置环境变量,编辑 vim /etc/profile:
export JAVA_HOME=/software/jdk
export PATH=$PATH:$JAVA_HOME/bin
复制代码
Zookeeper 集群部署
下载并解压安装包
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
tar -xzvf apache-zookeeper-3.5.8-bin.tar.gz
mv apache-zookeeper-3.5.8 /software/zk
复制代码
编辑配置文件
编辑 zk/conf/zoo.cfg 文件:
#用于配置 Zookeeper 中最小时间单位的长度,单位是毫秒
tickTime=2000
#该参数用于配置 Leader 服务器等待 Follower 启动,并完成数据同步的时间
#乘上 tickTime 得到具体时间:10 * 2000 = 20000 毫秒
initLimit=10
#Leader 与 Follower 心跳检测的超时时间。
#乘上 tickTime 得到具体时间:5 * 2000 = 10000 毫秒
syncLimit=5
#数据存放目录
dataDir=/software/zk/data
#客户端连接端口
clientPort=2181
#Zookeeper集群成员地址
#2888端口用于集群间通信,leader会监听此端口
#3888端口用于leader选举
server.1=hadoop1:2888:3888
server.2=hadoop2:2888:3888
server.3=hadoop3:2888:3888
复制代码
同步修改后的配置文件到集群的其他节点:
scp -r zk root@hadoop2:/software/
scp -r zk root@hadoop3:/software/
复制代码
标识 Server ID
#在 hadoop1 节点上执行
echo 1 > /root/zookeeper-cluster/zk1/myid
#在 hadoop2 节点上执行
echo 2 > /root/zookeeper-cluster/zk2/myid
#在 hadoop3 节点上执行
echo 3 > /root/zookeeper-cluster/zk3/myid
复制代码
启动 Zookeeper 集群
分别在 3 台节点上执行以下命令启动 Zookeeper:
查看 Zookeeper 集群状态
分别在 3 台节点上查看 Zookeeper 状态,可以看到此时 hadoop2 节点为 Zookeeper 的 Master 节点。
hadoop1 节点:
[root@hadoop1 software]# zk/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /software/zk/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
复制代码
hadoop2 节点:
[root@hadoop2 software]# zk/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /software/zk/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader
复制代码
hadoop3 节点:
[root@hadoop3 software]# zk/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /software/zk/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
复制代码
Spark Standalone 模式 HA 集群部署
下载并解压安装包
wget https://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
tar -xzvf spark-3.1.2-bin-hadoop2.7.tgz
mv spark-3.1.2-bin-hadoop2.7 /software/spark
复制代码
修改配置文件
编辑 spark/conf/spark-env.sh 文件,由于 Spark HA 使用 Zookeeper 来协调主从,因此需要指定 Zookeeper 的地址和 Spark 在 Zookeeper 中使用的目录。
export JAVA_HOME=/software/jdk
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hadoop1:2181,hadoop2:2181,hadoop3:2181 -Dspark.deploy.zookeeper.dir=/spark"
复制代码
编辑 spark/conf/slaves 文件:
同步修改后的配置文件到集群的其他节点:
scp -r spark root@hadoop2:/software/
scp -r spark root@hadoop3:/software/
复制代码
启动 Spark 集群
在 hadoop1 节点上启动 Spark 集群,执行 start-all.sh 脚本会在 hadoop1 节点上启动 Master 进程,并且在 spark/conf/slaves 文件中配置的所有节点上启动 Worker 进程。
[root@hadoop1 software]# spark/sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-hadoop1.out
hadoop2: starting org.apache.spark.deploy.worker.Worker, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-hadoop2.out
hadoop1: starting org.apache.spark.deploy.worker.Worker, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-hadoop1.out
hadoop3: starting org.apache.spark.deploy.worker.Worker, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-hadoop3.out
复制代码
登录 hadoop2 节点,启动第二个 Master(Standby Master)。
[root@hadoop2 software]# spark/sbin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-hadoop2.out
复制代码
查看各节点的进程
在各节点执行 jps 命令查看启动的 Java 进程。可以看到 Spark 的 Master 进程分别在 hadoop1 和 hadoop2 节点上运行,Worker 进程在所有节点上运行。QuorumPeerMain 是 Zookeeper 的进程。
hadoop1 节点:
[root@hadoop1 software]# jps
18528 Worker
18427 Master
23468 QuorumPeerMain
18940 Jps
复制代码
hadoop2 节点:
[root@hadoop2 software]# jps
27824 Worker
29954 Jps
23751 QuorumPeerMain
28135 Master
复制代码
hadoop3 节点:
[root@hadoop3 software]# jps
11696 Worker
12939 QuorumPeerMain
13021 Jps
复制代码
Zookeeper 查看节点注册状态
可以看到此时 3 个 Spark 节点都注册到 Zookeeper 上了,并且此时 192.168.1.117 hadoop1 这个节点是 Master。
[zk: localhost:2181(CONNECTED) 33] ls /spark/master_status
[worker_worker-20210821150002-192.168.1.117-42360, worker_worker-20210821150002-192.168.1.118-39584, worker_worker-20210821150002-192.168.1.119-42991]
[zk: localhost:2181(CONNECTED) 34] get /spark/master_status
192.168.1.117
复制代码
Spark HA 测试
浏览器访问 http://hadoop1:8081 进入 Spark WebUI 界面,此时 hadoop1 节点 Master 的状态为 ALIVE。
浏览器访问 http://hadoop1:8082 ,可以看到 hadoop2 节点 Master 的状态为 STANDBY。
停止 hadoop1 节点上的 Master 进程。
[root@hadoop1 software]# spark/sbin/stop-master.sh
stopping org.apache.spark.deploy.master.Master
复制代码
等待几秒以后 hadoop2 节点的状态从 STANDBY 切换为 ALIVE。
Spark HA 测试完成,重新启动 hadoop1 节点的 Master 进程。
[root@hadoop1 software]# spark/sbin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-hadoop1.out
复制代码
Spark Shell 连接集群
--master 参数的连接地址后可以指定多个 Master 的地址,当第一个 Master 无法连接时,会依次往后尝试连接其他的 Master。
[root@hadoop1 software]# spark/bin/spark-shell --master spark://hadoop1:7077,hadoop2:7077
21/08/21 18:00:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hadoop1:4040
Spark context available as 'sc' (master = spark://hadoop1:7077,hadoop2:7077, app id = app-20210821180100-0000).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.1.2
/_/
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
复制代码
使用第三方资源调度系统
Spark 可以使用 Yarn、Mesos、Kubernetes 作为底层资源调度系统,目前 Mesos 使用的已经比较少了,本文将介绍 Spark 使用 Yarn 和 Kubernetes 作为调度系统的应用。
Spark On Yarn
Spark On Yarn 模式的搭建比较简单,仅需要在 Yarn 集群的一个节点上安装 Spark 客户端即可,该节点可以作为提交 Spark 应用程序到 Yarn 集群的客户端。Spark 本身的 Master 节点和 Worker 节点不需要启动。前提是我们需要准备好 Yarn 集群,关于 Yarn 集群的安装可以参考 Hadoop 分布式集群安装。
使用此模式需要修改 Spark 的配置文件 conf/spark-env.sh,添加 Hadoop 相关属性,指定 Hadoop 配置文件所在的目录:
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
复制代码
修改完毕后,即可运行 Spark 应用程序,例如运行 Spark 自带的求圆周率的例子,并以 Spark On Yarn 的 Cluster 模式运行。
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
/software/spark/examples/jars/spark-examples_2.12-3.1.2.jar
复制代码
在 Yarn 的 ResourceManager 对应的 WebUI 界面中可以查看应用程序执行的详细信息。
在 Application 详情页中可以查看输出日志的详细信息。
可以看到最后计算 Pi 的输出结果。
Spark On K8S
目前基于 Kubernetes 的 Spark 的应用主要采用两种方式运行:
1.基于 Kubernetes 的 Operator 的 Spark on K8S Operator,是 Google Cloud Platform 为了支持 Spark 而开发的一种 Operator。个人比较推荐使用 Spark on K8S Operator 的方式提交作业。
2.Spark 原生支持的 Spark On K8S,是 Spark 社区为支持 Kubernetes 这种资源管理框架而引入的 Kubernetes Client 的实现。
Spark Operator 定义了两个 CRD(Custom Resource Definitions,自定义资源定义)对象,SparkApplication 和 ScheduledSparkApplication。 这些 CRD 是 Spark 作业的抽象,使得在 Kubernetes 集群中可以使用 YAML 来定义这些作业。另外还提供了 sparkctl 命令行工具方便我们操控 SparkApplication 和 ScheduledSparkApplication CRD 资源对象。
使用 Spark On K8S Operator 模式时,需要预先在 Kubernetes 集群中部署 Spark Operator 容器,用于将 SparkApplication 和 ScheduledSparkApplication 这些 CRD 资源对象转换为 Kubernetes 原生的资源对象,例如 Pod,Service 等等。
在 Spark On K8S 模式中,Spark 客户端需要与 Kubernetes API Server 直接通信来创建相关的 Kubernetes 资源。
Spark On K8S 和 Spark On K8S Operator 提交作业的方式如下图所示。
Spark On K8S Operator(推荐)
使用 Spark On K8S Operator 模式时,需要预先在 Kubernetes 集群中部署 Spark Operator。
部署 Spark Operator
添加 Spark On K8S Operator Helm 仓库并下载 Helm 资源文件。
helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
helm pull spark-operator/spark-operator --untar
复制代码
修改 values.yaml 文件中有以下两个地方需要修改:
使用 helm install
命令安装 Spark Operator,spark-job 命名空间是之后提交 Spark 作业时使用的。
kubectl create namespace spark-job
helm install my-spark spark-operator \
--namespace spark-operator --create-namespace
复制代码
确认 Spark Operator Pod 已经正常运行。
❯ kubectl get pod -n spark-operator
NAME READY STATUS RESTARTS AGE
my-spark-spark-operator-674cbc9d9c-8x22x 1/1 Running 0 5m24s
复制代码
查看在 spark-job 命名空间创建的 ServiceAccount。
❯ kubectl get serviceaccounts -n spark-job
NAME SECRETS AGE
.....
my-spark-spark 1 2m33s
复制代码
运行 SparkApplications
SparkApplications 资源对象中通常使用的 Cluster 模式来提交作业。在 YAML 文件中指定运行应由程序的 jar 包以及 main() 方法所在的类。
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: spark-pi
namespace: spark-job
spec:
type: Scala
mode: cluster
image: "registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1"
imagePullPolicy: Always
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar"
sparkVersion: "3.1.1"
restartPolicy:
type: Never
driver:
cores: 1
coreLimit: "1200m"
memory: "512m"
labels:
version: 3.1.1
serviceAccount: my-spark-spark
executor:
cores: 1
instances: 1
memory: "512m"
labels:
version: 3.1.1
复制代码
等待一会,查看 SparkApplications 状态,COMPLETED 表示已经执行完成该作业。
❯ kubectl get sparkapplications -n spark-job spark-piNAME STATUS ATTEMPTS START FINISH AGEspark-pi COMPLETED 1 2021-10-04T13:13:27Z 2021-10-04T13:13:48Z 8h
复制代码
查看在 spark-job 命名空间创建的 Pod 的日志,可以看到本次作业执行的详情。
kubectl logs -n spark-job spark-pi-driver spark-kubernetes-driver
复制代码
Spark On K8S
使用 Spark On K8S 模式提交作业时我们通常可以使用 spark-submit 或者 spark-shell 两种命令行工具,其中 spark-submit 支持 Cluster 和 Client 两种提交方式,而 spark-shell 只支持 Client 一种提交方式。
Spark-Submit
Cluster 模式
使用 spark-submit 的 Cluster 模式提交作业时,由于我们的 Kubernetes 集群的 API Server 是使用自签名的证书进行 HTTPS 加密的,因此需要使用 spark.kubernetes.authenticate.submission.caCertFile
参数指定 Kubernetes 集群的 CA 证书,让 Spark 客户端信任自签名证书。注意这里的 ServiceAccount 需要自行创建并且赋予以下权限,如果你是按照顺序完成实验的,那么在前面 Spark On K8S Operator 中已经创建了该 ServiceAccount,可以跳过这一步。
❯ kubectl get rolebindings -n spark-job spark -o yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
annotations:
meta.helm.sh/release-name: my-spark
meta.helm.sh/release-namespace: spark-operator
creationTimestamp: "2021-09-29T16:10:51Z"
labels:
app.kubernetes.io/instance: my-spark
app.kubernetes.io/managed-by: Helm
app.kubernetes.io/name: spark-operator
app.kubernetes.io/version: v1beta2-1.2.3-3.1.1
helm.sh/chart: spark-operator-1.1.6
name: spark
namespace: spark-job
resourceVersion: "204712527"
selfLink: /apis/rbac.authorization.k8s.io/v1/namespaces/spark-job/rolebindings/spark
uid: 225970e8-472d-4ea5-acb5-08630852f76c
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: spark-role
subjects:
- kind: ServiceAccount
name: my-spark-spark
namespace: spark-job
❯ kubectl get role -n spark-job spark-role -o yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
annotations:
meta.helm.sh/release-name: my-spark
meta.helm.sh/release-namespace: spark-operator
creationTimestamp: "2021-09-29T16:10:51Z"
labels:
app.kubernetes.io/instance: my-spark
app.kubernetes.io/managed-by: Helm
app.kubernetes.io/name: spark-operator
app.kubernetes.io/version: v1beta2-1.2.3-3.1.1
helm.sh/chart: spark-operator-1.1.6
name: spark-role
namespace: spark-job
resourceVersion: "204712525"
selfLink: /apis/rbac.authorization.k8s.io/v1/namespaces/spark-job/roles/spark-role
uid: 436afb3f-a304-4756-b64a-978d5836c3a2
rules:
- apiGroups:
- ""
resources:
- pods
verbs:
- '*'
- apiGroups:
- ""
resources:
- services
verbs:
- '*'
- apiGroups:
- ""
resources:
- configmaps
verbs:
- '*'
复制代码
执行 spark-submit 命令向 Kubernetes 集群提交作业。
bin/spark-submit \
--master k8s://https://11.16.0.153:6443 \
--conf spark.kubernetes.authenticate.submission.caCertFile=/Users/chengzhiwei/software/spark/spark-3.1.2-bin-hadoop3.2/certs/ca.crt \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \
--deploy-mode cluster \
--name spark-pi-submit \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.namespace=spark-job \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \
local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar
复制代码
关于证书不受信任这里也有个讨巧的方式,就是使用 kubectl proxy
命令将 API Server 的 HTTPS 转化为 HTTP。
❯ kubectl proxy
Starting to serve on 127.0.0.1:8001
复制代码
然后通过 http://localhost:8001 和 API Server 进行交互,此时就无需指定 CA 证书了。
bin/spark-submit \
--master k8s://http://localhost:8001 \
--deploy-mode cluster \
--name spark-pi-submit \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.namespace=spark-job \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \
local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar
复制代码
通过查看 Kubernetes 为本次 Spark 作业创建的 Pod 的日志,可以看到运行结果。
❯ kubectl logs -n spark-job spark-pi-submit-fc7b507c4be84351-driver
......
Pi is roughly 3.140075700378502
......
复制代码
Client 模式
Client 模式无需指定 CA 证书,但是需要使用 spark.driver.host
和 spark.driver.port
指定提交作业的 Spark 客户端所在机器的地址,端口号默认就是 7078。
bin/spark-submit \
--master k8s://https://11.16.0.153:6443 \
--deploy-mode client \
--name spark-pi-submit-client \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.namespace=spark-job \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \
--conf spark.driver.host=11.8.38.43 \
--conf spark.driver.port=7078 \
/home/chengzw/spark-3.1.2-bin-hadoop3.2/examples/jars/spark-examples_2.12-3.1.2.jar
复制代码
使用 Client 模式提交作业在终端就可以直接看到输出结果了。
Spark-Shell
spark-shell 只支持 Client 方式,使用以下命令连接 Kubernetes API Server 并打开 spark-shell 交互式界面。
bin/spark-shell \
--master k8s://https://11.16.0.153:6443 \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \
--deploy-mode client \
--name spark-shell \
--conf spark.kubernetes.namespace=spark-job \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \
--conf spark.driver.host=11.8.38.43 \
--conf spark.driver.port=7078
复制代码
在 spark-shell 交互式界面执行一个简单的计算,取出 0~99 之间的值。
21/10/05 10:44:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://11.8.38.43:4040
Spark context available as 'sc' (master = k8s://https://11.16.0.153:6443, app id = spark-application-1633401878962).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.1.2
/_/
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_101)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val range = spark.range(100)
range: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> range.collect()
res1: Array[Long] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)
复制代码
Spark History Server(可选)
部署 Spark History Server
在运行 Spark Application 的时候,Spark 会提供一个 WebUI 列出应用程序的运行时信息,但是一旦该应用程序执行完毕后,将无法查看应用程序执行的历史记录。Spark History Server 就是为了处理这种情况而诞生的,我们可以将 Spark 作业的日志提交到一个统一的地方,例如 HDFS,然后 Spark History Server 就可以通过读取 HDFS 目录中的文件来重新渲染生成 WebUI 界面来展示应用程序执行的历史信息。
使用以下资源文件部署一个 Spark History Server,并且通过 NodePort Service 的方式将服务暴露到集群外部,集群外部可以通过节点地址:NodePort 来访问 Spark History Server。前提是我们需要准备好 HDFS 集群,关于 HDFS 集群的安装可以参考 Hadoop 分布式集群安装。
apiVersion: apps/v1
kind: Deployment
metadata:
name: spark-history-server
namespace: spark-job
spec:
selector:
matchLabels:
run: spark-history-server
replicas: 1
template:
metadata:
labels:
run: spark-history-server
spec:
containers:
- image: "registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1"
name: spark-history-server
args: ["/opt/spark/bin/spark-class", "org.apache.spark.deploy.history.HistoryServer"]
ports:
- containerPort: 18080
name: http
env:
- name: SPARK_HISTORY_OPTS
value: "-Dspark.history.fs.logDirectory=hdfs://11.8.36.125:8020/spark-k8s"
---
apiVersion: v1
kind: Service
metadata:
name: spark-history-server
namespace: spark-job
spec:
ports:
- name: http
nodePort: 30080
port: 18080
protocol: TCP
targetPort: 18080
selector:
run: spark-history-server
type: NodePort
复制代码
Spark On K8S Operator 使用 History Server
设置 spark.eventLog.enabled
参数值为 true 启用记录 Spark 日志,spark.eventLog.dir
指定输出日志的目录为 HDFS 目录。
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: spark-pi
namespace: spark-job
spec:
type: Scala
mode: cluster
image: "registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1"
imagePullPolicy: Always
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar"
sparkVersion: "3.1.1"
sparkConf:
"spark.eventLog.enabled": "true"
"spark.eventLog.dir": "hdfs://11.8.36.125:8020/spark-k8s"
restartPolicy:
type: Never
driver:
cores: 1
coreLimit: "1200m"
memory: "512m"
labels:
version: 3.1.1
serviceAccount: my-spark-spark
executor:
cores: 1
instances: 1
memory: "512m"
labels:
version: 3.1.1
复制代码
在集群外通过节点地址:30080 访问 Spark History Server,可以在应用程序执行完毕后看到详细的信息。
Spark On K8S 使用 History Server
bin/spark-submit \
--master k8s://https://11.16.0.153:6443 \
--conf spark.kubernetes.authenticate.submission.caCertFile=/Users/chengzhiwei/software/spark/spark-3.1.2-bin-hadoop3.2/certs/ca.crt \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \
--deploy-mode cluster \
--name spark-pi-submit \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.namespace=spark-job \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.dir=hdfs://11.8.36.125:8020/spark-k8s \
local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar
复制代码
构建镜像
上面的例子都是使用 Spark 官方自带的程序来提交作业,如果我们想要自定义一个程序可以使用 Spark 官网提供的脚本来构建镜像。
程序代码
该项目使用 Maven 来管理依赖。
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.12</version>
<scope>compile</scope>
</dependency>
</dependencies>
复制代码
程序代码如下,使用 Java 编写了一个 Word Count 程序。
package com.chengzw.wordcount;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
* @description WordCount 示例
* @author chengzw
* @since 2021/7/25 8:39 下午
*/
public class MyJavaWordCount {
public static void main(String[] args) {
Logger.getLogger("org").setLevel(Level.OFF);
System.setProperty("spark.ui.showConsoleProgress","false");
//创建配置对象
//本地运行
//SparkConf conf = new SparkConf().setAppName("MyJavaWordCount").setMaster("local");
//在Spark上运行
SparkConf conf = new SparkConf().setAppName("MyJavaWordCount");
//创建SparkContext对象
JavaSparkContext sc = new JavaSparkContext(conf);
//读取hdfs数据
//在本地运行
//JavaRDD<String> rdd1= sc.textFile("/tmp/data.txt");
//在Spark上运行
JavaRDD<String> rdd1= sc.textFile(args[0]);
//分词
JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String input) throws Exception {
return Arrays.asList(input.split(" ")).iterator();
}
});
//单词计数 word,1
JavaPairRDD<String, Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});
//相同Key的值累加
JavaPairRDD<String, Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer a, Integer b) throws Exception {
return a + b;
}
});
//触发计算
List<Tuple2<String, Integer>> result = rdd4.collect();
//打印
for(Tuple2<String,Integer> r : result){
System.out.println(r._1 + "\t" + r._2);
}
//释放资源
sc.stop();
}
}
复制代码
打包
点击 mvn package 将程序打成 jar 包。
构建并上传镜像
将 jar 包放到 Spark 安装包的 examples/jars 目录中,进入 Spark 目录然后执行以下命令构建镜像。
bin/docker-image-tool.sh -r registry.cn-hangzhou.aliyuncs.com/public-namespace -t my-spark:1.0.0 build
复制代码
查看构建好的镜像。
❯ docker images | grep spark
registry.cn-hangzhou.aliyuncs.com/public-namespace/spark v1.0.0 372341ae930d 12 minutes ago 529MB
复制代码
上传镜像。
./docker-image-tool.sh -r registry.cn-hangzhou.aliyuncs.com/public-namespace -t v1.0.0 push
复制代码
使用自己构建的镜像执行 Word Count 程序。
bin/spark-submit \
--master k8s://https://11.16.0.153:6443 \
--conf spark.kubernetes.authenticate.submission.caCertFile=/Users/chengzhiwei/software/spark/spark-3.1.2-bin-hadoop3.2/certs/ca.crt \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \
--deploy-mode cluster \
--name spark-pi-submit \
--class com.chengzw.wordcount.MyJavaWordCount \
--conf spark.kubernetes.namespace=spark-job \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v1.0.0 \
local:///opt/spark/examples/jars/spark-lab-1.0-SNAPSHOT.jar /etc/security/limits.conf
复制代码
查看执行结果:
kubectl logs -n spark-job spark-pi-submit-37945f7c4f24e729-driver
#返回结果
......
rss 2
space 2
priority 4
4 1
this 1
"soft" 1
max 14
cpu 1
memlock 1
apply 1
......
复制代码
参考资料
欢迎关注
评论