Spark 版本:3.0.0
找到入口
bin
文件夹中在存放了 spark-shell
等其他入口方式。
可以看到是一个 shell 脚本,加载的类是 org.apache.spark.deploy.SparkSubmit
。 $@
把执行的参数带进去。接下来我们找到具体的类 SparkSubmit
,在 deploy 这个包下。
SparkSubmit Object
上一步的 spark-submit
会创建一个 Java 进程,执行SparkSumbit.scala
文件的逻辑,这个文件有 1300 多行代码,一上来看哪里呢?还是抓主线,找入口,我们找到 main
函数。
main
方法是 JVM 执行的入口。
override def main(args: Array[String]): Unit = {
// Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
// be reset before the application starts.
val uninitLog = initializeLogIfNecessary(true, silent = true)
val appArgs = new SparkSubmitArguments(args)
if (appArgs.verbose) {
// scalastyle:off println
printStream.println(appArgs)
// scalastyle:on println
}
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}
}
复制代码
在main
函数中,主要做了这几件事
1. 准备参数
这部分会 parse 命令行中的参数,如 --master local[*]
转成 SparkSubmitArguments
对象中的成员变量。同时检查参数输入的正不正确等逻辑。
2. 执行任务
如果是提交任务,着重看submit
函数。
@tailrec
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
// 该方法准备执行任务所需的一些环境变量等,返回四元祖
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
if (args.isStandaloneCluster && args.useRest) {
try {
...
doRunMain()
} catch {
// Fail over to use the legacy submission gateway
...
}
// In all other modes, just run the main class as prepared
} else {
doRunMain()
}
复制代码
返回的四元组分别是:
childArgs
:the arguments for the child process 子进程参数
childClasspath
:a list of classpath entries for the child 子进程 classpath 列表
sparkConf
:a map of system properties 系统属性 map
childMainClass
:the main class for the child 子进程 main 方法
着重看一下 childMainClass
。
2.1 部署模式的差异
// 模式 1
if (deployMode == CLIENT) {
childMainClass = args.mainClass
if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
childClasspath += localPrimaryResource
}
if (localJars != null) { childClasspath ++= localJars.split(",") }
}
...
if (args.isStandaloneCluster) {
// 模式 2
if (args.useRest) {
childMainClass = REST_CLUSTER_SUBMIT_CLASS
childArgs += (args.primaryResource, args.mainClass)
} else {
// 模式 3
// In legacy standalone cluster mode, use Client as a wrapper around the user class
childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
...
// 模式 4
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
childMainClass = YARN_CLUSTER_SUBMIT_CLASS
if (args.isPython) {
复制代码
如果部署模式为client
,直接运行我们设置的主类的名字
比如 com.chixiaodou.wordcount
如果是StandaloneCluster
,且使用了rest
风格,
REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName()
如果是StandaloneCluster
, 没有使用rest
STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
如果是Yarn
集群上运行,则为
YARN_CLUSTER_SUBMIT_CLASS = "org.apache.spark.deploy.yarn.YarnClusterApplication"
代码后面还有其他的部署模式,不再罗列了。
2.2 runMain 方法
def doRunMain(): Unit = {
if (args.proxyUser != null) {
val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
UserGroupInformation.getCurrentUser())
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
}
})
} catch {
case e: Exception =>
...
}
} else {
runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
}
}
复制代码
接下来执行doRunMain
, 把准备好的参数传递到runMain
方法中。
runMain
方法执行 child class 的 main 方法,使用 submit
函数中准备好的四元组。
private def runMain(
childArgs: Seq[String],
childClasspath: Seq[String],
sparkConf: SparkConf,
childMainClass: String,
verbose: Boolean): Unit = {
...
var mainClass: Class[_] = null
try {
// 上一步准备environment的时候,选择了一个childMainClass,在
// 这里加载这个 childMainClass 类
// 通过一个类的字符串名称,构建一个类,也就是反射的方式创建一个类
mainClass = Utils.classForName(childMainClass)
} catch {...}
// 根据 mainClass,构造 SparkApplication 对象
// 两种情况,1. mainClass有没有去继承 classOf[SparkApplication],如果有的话,反射创建一个实例并转换成asInstanceOf[SparkApplication]
// 2. mainClass没有继承,通过new JavaMainApplication(mainClass) 构造
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
mainClass.newInstance().asInstanceOf[SparkApplication]
} else {
// SPARK-4170
if (classOf[scala.App].isAssignableFrom(mainClass)) {
printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}
new JavaMainApplication(mainClass)
}
...
try {
// 关键,调用 SparkApplication.start()
app.start(childArgs.toArray, sparkConf)
} catch {
case t: Throwable =>
findCause(t) match {
case SparkUserAppException(exitCode) =>
System.exit(exitCode)
case t: Throwable =>
throw t
}
}
}
复制代码
在这一步,生成 Application,调用 Start
方法。最后这里逻辑比较复杂,我们来一点点掰开梳理
假设我们以 StandaloneCluster
, 没有使用 rest
的方式启动集群,那么 mainClass = classOf[ClientApp].getName()
,这个可以看上面 2.1 提到的详解
所以这里app.start(childArgs.toArray, sparkConf)
调用的应该是ClientApp
, 跳到Client.scala
文件看看start()
都做了什么
代码执行到这里,Spark-Submit 的任务就完成了。我们看一下上面提及的所有调用的 UML 图。接下来我们以 Standalone Client 为例,接下去展开分析。
3. Driver - Rpc 环境准备
private[spark] class ClientApp extends SparkApplication {
override def start(args: Array[String], conf: SparkConf): Unit = {
val driverArgs = new ClientArguments(args)
if (!conf.contains("spark.rpc.askTimeout")) {
conf.set("spark.rpc.askTimeout", "10s")
}
Logger.getRootLogger.setLevel(driverArgs.logLevel)
val rpcEnv =
RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
rpcEnv.awaitTermination()
}
}
复制代码
这段代码具体做了如下几件事:
代码中涉及 Spark RPC
通信的知识,RPC 是一种进程间的通信方式。如果对这部分知识没有了解的话,会看得有点一头雾水。这部分知识也非常庞大,需要另外开一个文档来详细介绍。
3.1 Spark RPC 设计
这里就先简单介绍一下,让这段代码看起来不那么尴尬。
Spark 3.0
的 RPC 框架是基于 Netty,Netty
使用经典的 Actor 模型
做消息传递。图中画了若干个组建。
RpcEnv
:为 RpcEndpoint 提供了一个处理消息的环境,负责 Endpoint 的生命周期管理,注册 Endpoint,消息间的路由,停止 Endpoint 等。
RpcEndpoint
:代表具体的通信节点。例如 Master,Worker,DriverEndpoint 等,它们都实现该接口。一个 RpcEndpoint 的生命周期是create -> onStart -> receive* -> onStop
。receive
和receiveAndReply
分别用来接收其他 endpoint 发过来的 send
和 ask
消息。
RpcEndpointRef
:顾名思义,是一个 RpcEndpoint 的引用。当我们想向 master,worker 发送消息的时候,需要先获取到如 master Endpoint 的引用。
RpcAddress
:表示远程 RpcEndpointRef 的地址,包含 Host 和 Port。
Spark 是一个分布式的系统
,想要跟其他的端点通信,需要按照如上的步骤,建立通信环境,通信地址及引用。接下来才可以通过发送消息请求的方式,在其他 Endpoint
执行如 launchDriver
,registerWorker
等动作。
4. 启动 Driver
接上面 ClientApp start
继续往下看。在 RpcEnv 中构建一个 ClientEndpoint
。
private[spark] class ClientApp extends SparkApplication {
override def start(args: Array[String], conf: SparkConf): Unit = {
...
rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
...}
}
复制代码
而我们前面又讲过,一个 RpcEndpoint 的会调用onStart()
。进入 ClientEndpoint
的 onStart()
进行查看。
4.1 向 Master 发送请求
private class ClientEndpoint(
override val rpcEnv: RpcEnv,
driverArgs: ClientArguments,
masterEndpoints: Seq[RpcEndpointRef],
conf: SparkConf)
extends ThreadSafeRpcEndpoint with Logging {
...
override def onStart(): Unit = {
driverArgs.cmd match {
// 匹配到 launch driver
case "launch" =>
....
// 敲重点,这里重点记忆一下,DriverWrapper 作为 mainClass
val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
...
// 准备通信的请求信息
val driverDescription = new DriverDescription(
driverArgs.jarUrl,
driverArgs.memory,
driverArgs.cores,
driverArgs.supervise,
command)
//向master发送提交启动 Driver 的请求
asyncSendToMasterAndForwardReply[SubmitDriverResponse](
RequestSubmitDriver(driverDescription))
...
复制代码
在onStart()
中匹配到 launch
这个动作,并向 MasterEndpoint 发送 RequestSubmitDriver
消息。
4.2 Master
接下来我们跳到 Master.scala
(package org.apache.spark.deploy.master) 去看看 Master 节点如何处理这个请求。
在 Spark 的 Rpc 设计中,receive
和 receiveAndReply
分别用来接收 send
和 ask
类型的消息。
Sends a one-way asynchronous message. Fire-and-forget semantics.
def send(message: Any): Unit
Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to receive the reply within the specified timeout.
def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
:
5. Master 接收消息并回复
RequestSubmitDriver
在下面这个方法中调用。这里调用了我们上面提到的 ask()
方法。也就是说会触发 Master 的 receiveAndReply()
。
/** * Send the message to master and forward the reply to self asynchronously. */
private def asyncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = { for (masterEndpoint <- masterEndpoints) { masterEndpoint.ask[T](message).onComplete { case Success(v) => self.send(v) case Failure(e) => logWarning(s"Error sending messages to master $masterEndpoint", e) }(forwardMessageExecutionContext) } }
复制代码
Master 的 receiveAndReply()
方法。
private[deploy] class Master( override val rpcEnv: RpcEnv, address: RpcAddress, webUiPort: Int, val securityMgr: SecurityManager, val conf: SparkConf) extends ThreadSafeRpcEndpoint with Logging with LeaderElectable { .... // 重点 override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case RequestSubmitDriver(description) => ... // 根据请求的description创建一个 DriverInfo val driver = createDriver(description) persistenceEngine.addDriver(driver) // 重点:先把driver存一份到 waiting list waitingDrivers += driver drivers.add(driver) // 重点关注 schedule()
复制代码
把 Driver 信息准备好之后,最后来到 schedule()
方法,该方法中,会把所有的 waitingDrivers
等待被分配的 driver 一一分配出去,分配到 alive 的 worker 中。
private def schedule(): Unit = {
if (state != RecoveryState.ALIVE) {
return
}
// Drivers take strict precedence over executors
// 打散 worker
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0
for (driver <- waitingDrivers.toList) {
var launched = false
var numWorkersVisited = 0
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
// 关键:启动 driver
launchDriver(worker, driver)
waitingDrivers -= driver
launched = true
}
curPos = (curPos + 1) % numWorkersAlive
}
}
startExecutorsOnWorkers()
}
复制代码
我们来看 launchDriver()
,又是一个远程调用。
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
logInfo("Launching driver " + driver.id + " on worker " + worker.id)
worker.addDriver(driver)
driver.worker = Some(worker)
// 远程调用
worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
driver.state = DriverState.RUNNING
}
复制代码
这里调用了 worker endpoint 的 send
方法,因此要看接收方的 receive()
详情。同理,因为这个是往 Worker 发消息,那就去 Worker.scala
看看 Worker 怎么处理这个消息。
6. Worker 接收消息
终于扒到这个非常关键的信息了。终于要开始构建我们的 Driver 了。
private[deploy] class Worker(
override val rpcEnv: RpcEnv,
webUiPort: Int,
...
val securityMgr: SecurityManager)
extends ThreadSafeRpcEndpoint with Logging {
...
override def receive: PartialFunction[Any, Unit] = synchronized {
// 启动 Driver
case LaunchDriver(driverId, driverDesc) =>
logInfo(s"Asked to launch driver $driverId")
// 创建 DriverRunner
val driver = new DriverRunner(...)
drivers(driverId) = driver
driver.start()
// 记录 core 和 memory 使用了多少
coresUsed += driverDesc.cores
memoryUsed += driverDesc.mem
复制代码
接下去就是比较细节的启动工作。Driver 启动的详细信息又可以单开一个篇幅来讲了。
至此,我们从找到 Spark 的入口
, spark 提交任务开始,准备参数,执行 submit 任务,根据特定的部署模式,启动相应的 Spark Application
,又通过了解 Spark 的远程调用设计,摸索到 Client
,Master
,Worker
之间如何发送启动 Driver
的信息。一步一步找到 launchDriver
的入口。
最后还是画一张 UML 图来更加清晰的展现整体交互。
常驻小尾巴
都看到这里了,不关注一下嘛 👇 👇 👇
我的免费星球,欢迎来看我的日常碎碎念
参考资料
陈凯 https://zhuanlan.zhihu.com/p/84506391
简书 https://www.jianshu.com/p/4d4964c505fe?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation
尚硅谷 P131 源码阅读 https://www.bilibili.com/video/BV11A411L7CK?p=131&spm_id_from=pageDriver
Solve https://cloud.tencent.com/developer/article/1574348
评论