Spark 版本:3.0.0
找到入口
bin文件夹中在存放了 spark-shell 等其他入口方式。
可以看到是一个 shell 脚本,加载的类是 org.apache.spark.deploy.SparkSubmit。 $@ 把执行的参数带进去。接下来我们找到具体的类 SparkSubmit,在 deploy 这个包下。
SparkSubmit Object
上一步的 spark-submit 会创建一个 Java 进程,执行SparkSumbit.scala 文件的逻辑,这个文件有 1300 多行代码,一上来看哪里呢?还是抓主线,找入口,我们找到 main 函数。
main方法是 JVM 执行的入口。
override def main(args: Array[String]): Unit = { // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to // be reset before the application starts. val uninitLog = initializeLogIfNecessary(true, silent = true)
val appArgs = new SparkSubmitArguments(args) if (appArgs.verbose) { // scalastyle:off println printStream.println(appArgs) // scalastyle:on println } appArgs.action match { case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog) case SparkSubmitAction.KILL => kill(appArgs) case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs) } }
复制代码
在main函数中,主要做了这几件事
1. 准备参数
这部分会 parse 命令行中的参数,如 --master local[*] 转成 SparkSubmitArguments 对象中的成员变量。同时检查参数输入的正不正确等逻辑。
2. 执行任务
如果是提交任务,着重看submit函数。
@tailrec private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = { // 该方法准备执行任务所需的一些环境变量等,返回四元祖 val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
if (args.isStandaloneCluster && args.useRest) { try { ... doRunMain() } catch { // Fail over to use the legacy submission gateway ... } // In all other modes, just run the main class as prepared } else { doRunMain() }
复制代码
返回的四元组分别是:
childArgs:the arguments for the child process 子进程参数
childClasspath:a list of classpath entries for the child 子进程 classpath 列表
sparkConf:a map of system properties 系统属性 map
childMainClass:the main class for the child 子进程 main 方法
着重看一下 childMainClass。
2.1 部署模式的差异
// 模式 1 if (deployMode == CLIENT) { childMainClass = args.mainClass if (localPrimaryResource != null && isUserJar(localPrimaryResource)) { childClasspath += localPrimaryResource } if (localJars != null) { childClasspath ++= localJars.split(",") } }... if (args.isStandaloneCluster) { // 模式 2 if (args.useRest) { childMainClass = REST_CLUSTER_SUBMIT_CLASS childArgs += (args.primaryResource, args.mainClass) } else { // 模式 3 // In legacy standalone cluster mode, use Client as a wrapper around the user class childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS ...// 模式 4// In yarn-cluster mode, use yarn.Client as a wrapper around the user class if (isYarnCluster) { childMainClass = YARN_CLUSTER_SUBMIT_CLASS if (args.isPython) {
复制代码
如果部署模式为client,直接运行我们设置的主类的名字
比如 com.chixiaodou.wordcount
如果是StandaloneCluster,且使用了rest风格,
REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName()
如果是StandaloneCluster, 没有使用rest
STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
如果是Yarn集群上运行,则为
YARN_CLUSTER_SUBMIT_CLASS = "org.apache.spark.deploy.yarn.YarnClusterApplication"
代码后面还有其他的部署模式,不再罗列了。
2.2 runMain 方法
def doRunMain(): Unit = { if (args.proxyUser != null) { val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser, UserGroupInformation.getCurrentUser()) try { proxyUser.doAs(new PrivilegedExceptionAction[Unit]() { override def run(): Unit = { runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose) } }) } catch { case e: Exception => ... } } else { runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose) } }
复制代码
接下来执行doRunMain, 把准备好的参数传递到runMain方法中。
runMain方法执行 child class 的 main 方法,使用 submit函数中准备好的四元组。
private def runMain( childArgs: Seq[String], childClasspath: Seq[String], sparkConf: SparkConf, childMainClass: String, verbose: Boolean): Unit = {
... var mainClass: Class[_] = null
try { // 上一步准备environment的时候,选择了一个childMainClass,在 // 这里加载这个 childMainClass 类 // 通过一个类的字符串名称,构建一个类,也就是反射的方式创建一个类 mainClass = Utils.classForName(childMainClass) } catch {...}
// 根据 mainClass,构造 SparkApplication 对象 // 两种情况,1. mainClass有没有去继承 classOf[SparkApplication],如果有的话,反射创建一个实例并转换成asInstanceOf[SparkApplication] // 2. mainClass没有继承,通过new JavaMainApplication(mainClass) 构造 val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) { mainClass.newInstance().asInstanceOf[SparkApplication] } else { // SPARK-4170 if (classOf[scala.App].isAssignableFrom(mainClass)) { printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.") } new JavaMainApplication(mainClass) }
...
try { // 关键,调用 SparkApplication.start() app.start(childArgs.toArray, sparkConf) } catch { case t: Throwable => findCause(t) match { case SparkUserAppException(exitCode) => System.exit(exitCode)
case t: Throwable => throw t } } }
复制代码
在这一步,生成 Application,调用 Start 方法。最后这里逻辑比较复杂,我们来一点点掰开梳理
假设我们以 StandaloneCluster, 没有使用 rest 的方式启动集群,那么 mainClass = classOf[ClientApp].getName(),这个可以看上面 2.1 提到的详解
所以这里app.start(childArgs.toArray, sparkConf) 调用的应该是ClientApp, 跳到Client.scala文件看看start()都做了什么
代码执行到这里,Spark-Submit 的任务就完成了。我们看一下上面提及的所有调用的 UML 图。接下来我们以 Standalone Client 为例,接下去展开分析。
3. Driver - Rpc 环境准备
private[spark] class ClientApp extends SparkApplication {
override def start(args: Array[String], conf: SparkConf): Unit = { val driverArgs = new ClientArguments(args)
if (!conf.contains("spark.rpc.askTimeout")) { conf.set("spark.rpc.askTimeout", "10s") } Logger.getRootLogger.setLevel(driverArgs.logLevel)
val rpcEnv = RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL). map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME)) rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
rpcEnv.awaitTermination() }
}
复制代码
这段代码具体做了如下几件事:
代码中涉及 Spark RPC 通信的知识,RPC 是一种进程间的通信方式。如果对这部分知识没有了解的话,会看得有点一头雾水。这部分知识也非常庞大,需要另外开一个文档来详细介绍。
3.1 Spark RPC 设计
这里就先简单介绍一下,让这段代码看起来不那么尴尬。
Spark 3.0 的 RPC 框架是基于 Netty,Netty 使用经典的 Actor 模型做消息传递。图中画了若干个组建。
RpcEnv:为 RpcEndpoint 提供了一个处理消息的环境,负责 Endpoint 的生命周期管理,注册 Endpoint,消息间的路由,停止 Endpoint 等。
RpcEndpoint:代表具体的通信节点。例如 Master,Worker,DriverEndpoint 等,它们都实现该接口。一个 RpcEndpoint 的生命周期是create -> onStart -> receive* -> onStop。receive和receiveAndReply 分别用来接收其他 endpoint 发过来的 send 和 ask 消息。
RpcEndpointRef:顾名思义,是一个 RpcEndpoint 的引用。当我们想向 master,worker 发送消息的时候,需要先获取到如 master Endpoint 的引用。
RpcAddress:表示远程 RpcEndpointRef 的地址,包含 Host 和 Port。
Spark 是一个分布式的系统,想要跟其他的端点通信,需要按照如上的步骤,建立通信环境,通信地址及引用。接下来才可以通过发送消息请求的方式,在其他 Endpoint 执行如 launchDriver,registerWorker 等动作。
4. 启动 Driver
接上面 ClientApp start 继续往下看。在 RpcEnv 中构建一个 ClientEndpoint。
private[spark] class ClientApp extends SparkApplication {
override def start(args: Array[String], conf: SparkConf): Unit = { ... rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf)) ...}}
复制代码
而我们前面又讲过,一个 RpcEndpoint 的会调用onStart()。进入 ClientEndpoint的 onStart() 进行查看。
4.1 向 Master 发送请求
private class ClientEndpoint( override val rpcEnv: RpcEnv, driverArgs: ClientArguments, masterEndpoints: Seq[RpcEndpointRef], conf: SparkConf) extends ThreadSafeRpcEndpoint with Logging { ... override def onStart(): Unit = { driverArgs.cmd match { // 匹配到 launch driver case "launch" => .... // 敲重点,这里重点记忆一下,DriverWrapper 作为 mainClass val mainClass = "org.apache.spark.deploy.worker.DriverWrapper" ... // 准备通信的请求信息 val driverDescription = new DriverDescription( driverArgs.jarUrl, driverArgs.memory, driverArgs.cores, driverArgs.supervise, command) //向master发送提交启动 Driver 的请求 asyncSendToMasterAndForwardReply[SubmitDriverResponse]( RequestSubmitDriver(driverDescription)) ...
复制代码
在onStart()中匹配到 launch这个动作,并向 MasterEndpoint 发送 RequestSubmitDriver消息。
4.2 Master
接下来我们跳到 Master.scala(package org.apache.spark.deploy.master) 去看看 Master 节点如何处理这个请求。
在 Spark 的 Rpc 设计中,receive 和 receiveAndReply分别用来接收 send 和 ask 类型的消息。
Sends a one-way asynchronous message. Fire-and-forget semantics.
def send(message: Any): Unit
Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to receive the reply within the specified timeout.
def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]:
5. Master 接收消息并回复
RequestSubmitDriver 在下面这个方法中调用。这里调用了我们上面提到的 ask()方法。也就是说会触发 Master 的 receiveAndReply()。
/** * Send the message to master and forward the reply to self asynchronously. */ private def asyncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = { for (masterEndpoint <- masterEndpoints) { masterEndpoint.ask[T](message).onComplete { case Success(v) => self.send(v) case Failure(e) => logWarning(s"Error sending messages to master $masterEndpoint", e) }(forwardMessageExecutionContext) } }
复制代码
Master 的 receiveAndReply()方法。
private[deploy] class Master( override val rpcEnv: RpcEnv, address: RpcAddress, webUiPort: Int, val securityMgr: SecurityManager, val conf: SparkConf) extends ThreadSafeRpcEndpoint with Logging with LeaderElectable { .... // 重点 override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case RequestSubmitDriver(description) => ... // 根据请求的description创建一个 DriverInfo val driver = createDriver(description) persistenceEngine.addDriver(driver) // 重点:先把driver存一份到 waiting list waitingDrivers += driver drivers.add(driver) // 重点关注 schedule()
复制代码
把 Driver 信息准备好之后,最后来到 schedule()方法,该方法中,会把所有的 waitingDrivers等待被分配的 driver 一一分配出去,分配到 alive 的 worker 中。
private def schedule(): Unit = { if (state != RecoveryState.ALIVE) { return } // Drivers take strict precedence over executors // 打散 worker val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE)) val numWorkersAlive = shuffledAliveWorkers.size var curPos = 0 for (driver <- waitingDrivers.toList) { var launched = false var numWorkersVisited = 0 while (numWorkersVisited < numWorkersAlive && !launched) { val worker = shuffledAliveWorkers(curPos) numWorkersVisited += 1 if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { // 关键:启动 driver launchDriver(worker, driver) waitingDrivers -= driver launched = true } curPos = (curPos + 1) % numWorkersAlive } } startExecutorsOnWorkers() }
复制代码
我们来看 launchDriver(),又是一个远程调用。
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) { logInfo("Launching driver " + driver.id + " on worker " + worker.id) worker.addDriver(driver) driver.worker = Some(worker) // 远程调用 worker.endpoint.send(LaunchDriver(driver.id, driver.desc)) driver.state = DriverState.RUNNING }
复制代码
这里调用了 worker endpoint 的 send 方法,因此要看接收方的 receive() 详情。同理,因为这个是往 Worker 发消息,那就去 Worker.scala 看看 Worker 怎么处理这个消息。
6. Worker 接收消息
终于扒到这个非常关键的信息了。终于要开始构建我们的 Driver 了。
private[deploy] class Worker( override val rpcEnv: RpcEnv, webUiPort: Int, ... val securityMgr: SecurityManager) extends ThreadSafeRpcEndpoint with Logging { ... override def receive: PartialFunction[Any, Unit] = synchronized { // 启动 Driver case LaunchDriver(driverId, driverDesc) => logInfo(s"Asked to launch driver $driverId") // 创建 DriverRunner val driver = new DriverRunner(...) drivers(driverId) = driver driver.start() // 记录 core 和 memory 使用了多少 coresUsed += driverDesc.cores memoryUsed += driverDesc.mem
复制代码
接下去就是比较细节的启动工作。Driver 启动的详细信息又可以单开一个篇幅来讲了。
至此,我们从找到 Spark 的入口, spark 提交任务开始,准备参数,执行 submit 任务,根据特定的部署模式,启动相应的 Spark Application,又通过了解 Spark 的远程调用设计,摸索到 Client,Master,Worker 之间如何发送启动 Driver 的信息。一步一步找到 launchDriver 的入口。
最后还是画一张 UML 图来更加清晰的展现整体交互。
常驻小尾巴
都看到这里了,不关注一下嘛 👇 👇 👇
我的免费星球,欢迎来看我的日常碎碎念
参考资料
陈凯 https://zhuanlan.zhihu.com/p/84506391
简书 https://www.jianshu.com/p/4d4964c505fe?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation
尚硅谷 P131 源码阅读 https://www.bilibili.com/video/BV11A411L7CK?p=131&spm_id_from=pageDriver
Solve https://cloud.tencent.com/developer/article/1574348
评论