写点什么

Spark 源码阅读 02:从 Spark-Submit 到 Driver 启动

发布于: 2 小时前
Spark 源码阅读 02:从 Spark-Submit 到 Driver 启动

Spark 版本:3.0.0

找到入口

  • 理解一个项目最好的切入点就是找到一个入口。比如跟 Spark 的交互方式,如何把任务提交到 Spark 集群。通常使用 spark-submit 脚本来提交任务。它在 Spark 源码的 bin 目录下



bin文件夹中在存放了 spark-shell 等其他入口方式。


  • 我们来看一下 spark-submit 的具体内容。



可以看到是一个 shell 脚本,加载的类是 org.apache.spark.deploy.SparkSubmit$@ 把执行的参数带进去。接下来我们找到具体的类 SparkSubmit,在 deploy 这个包下。

SparkSubmit Object

上一步的 spark-submit 会创建一个 Java 进程,执行SparkSumbit.scala 文件的逻辑,这个文件有 1300 多行代码,一上来看哪里呢?还是抓主线,找入口,我们找到 main 函数。


main方法是 JVM 执行的入口。



  override def main(args: Array[String]): Unit = {    // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to    // be reset before the application starts.    val uninitLog = initializeLogIfNecessary(true, silent = true)
val appArgs = new SparkSubmitArguments(args) if (appArgs.verbose) { // scalastyle:off println printStream.println(appArgs) // scalastyle:on println } appArgs.action match { case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog) case SparkSubmitAction.KILL => kill(appArgs) case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs) } }
复制代码


main函数中,主要做了这几件事


  • 准备参数

  • 匹配任务的执行类型

1. 准备参数


这部分会 parse 命令行中的参数,如 --master local[*] 转成 SparkSubmitArguments 对象中的成员变量。同时检查参数输入的正不正确等逻辑。


2. 执行任务

如果是提交任务,着重看submit函数。


  @tailrec  private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {    // 该方法准备执行任务所需的一些环境变量等,返回四元祖    val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
if (args.isStandaloneCluster && args.useRest) { try { ... doRunMain() } catch { // Fail over to use the legacy submission gateway ... } // In all other modes, just run the main class as prepared } else { doRunMain() }
复制代码


返回的四元组分别是:


  • childArgs:the arguments for the child process 子进程参数

  • childClasspath:a list of classpath entries for the child 子进程 classpath 列表

  • sparkConf:a map of system properties 系统属性 map

  • childMainClass:the main class for the child 子进程 main 方法


着重看一下 childMainClass

2.1 部署模式的差异

// 模式 1  if (deployMode == CLIENT) {      childMainClass = args.mainClass      if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {        childClasspath += localPrimaryResource      }      if (localJars != null) { childClasspath ++= localJars.split(",") }    }...  if (args.isStandaloneCluster) {      // 模式 2      if (args.useRest) {        childMainClass = REST_CLUSTER_SUBMIT_CLASS        childArgs += (args.primaryResource, args.mainClass)      } else {        // 模式 3        // In legacy standalone cluster mode, use Client as a wrapper around the user class        childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS        ...// 模式 4// In yarn-cluster mode, use yarn.Client as a wrapper around the user class  if (isYarnCluster) {      childMainClass = YARN_CLUSTER_SUBMIT_CLASS      if (args.isPython) {
复制代码


  • 如果部署模式为client,直接运行我们设置的主类的名字

  • 比如 com.chixiaodou.wordcount

  • 如果是StandaloneCluster,且使用了rest风格,

  • REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName()

  • 如果是StandaloneCluster, 没有使用rest

  • STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()

  • 如果是Yarn集群上运行,则为

  • YARN_CLUSTER_SUBMIT_CLASS = "org.apache.spark.deploy.yarn.YarnClusterApplication"


代码后面还有其他的部署模式,不再罗列了。

2.2 runMain 方法

    def doRunMain(): Unit = {      if (args.proxyUser != null) {        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,          UserGroupInformation.getCurrentUser())        try {          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {            override def run(): Unit = {              runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)            }          })        } catch {          case e: Exception =>            ...        }      } else {        runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)      }    }
复制代码


接下来执行doRunMain, 把准备好的参数传递到runMain方法中。


runMain方法执行 child class 的 main 方法,使用 submit函数中准备好的四元组。


  private def runMain(      childArgs: Seq[String],      childClasspath: Seq[String],      sparkConf: SparkConf,      childMainClass: String,      verbose: Boolean): Unit = {
... var mainClass: Class[_] = null
try { // 上一步准备environment的时候,选择了一个childMainClass,在 // 这里加载这个 childMainClass 类 // 通过一个类的字符串名称,构建一个类,也就是反射的方式创建一个类 mainClass = Utils.classForName(childMainClass) } catch {...}
// 根据 mainClass,构造 SparkApplication 对象 // 两种情况,1. mainClass有没有去继承 classOf[SparkApplication],如果有的话,反射创建一个实例并转换成asInstanceOf[SparkApplication] // 2. mainClass没有继承,通过new JavaMainApplication(mainClass) 构造 val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) { mainClass.newInstance().asInstanceOf[SparkApplication] } else { // SPARK-4170 if (classOf[scala.App].isAssignableFrom(mainClass)) { printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.") } new JavaMainApplication(mainClass) }
...
try { // 关键,调用 SparkApplication.start() app.start(childArgs.toArray, sparkConf) } catch { case t: Throwable => findCause(t) match { case SparkUserAppException(exitCode) => System.exit(exitCode)
case t: Throwable => throw t } } }
复制代码


在这一步,生成 Application,调用 Start 方法。最后这里逻辑比较复杂,我们来一点点掰开梳理


  • 假设我们以 StandaloneCluster, 没有使用 rest 的方式启动集群,那么 mainClass = classOf[ClientApp].getName(),这个可以看上面 2.1 提到的详解

  • 所以这里app.start(childArgs.toArray, sparkConf) 调用的应该是ClientApp, 跳到Client.scala文件看看start()都做了什么


代码执行到这里,Spark-Submit 的任务就完成了。我们看一下上面提及的所有调用的 UML 图。接下来我们以 Standalone Client 为例,接下去展开分析。


3. Driver - Rpc 环境准备

private[spark] class ClientApp extends SparkApplication {
override def start(args: Array[String], conf: SparkConf): Unit = { val driverArgs = new ClientArguments(args)
if (!conf.contains("spark.rpc.askTimeout")) { conf.set("spark.rpc.askTimeout", "10s") } Logger.getRootLogger.setLevel(driverArgs.logLevel)
val rpcEnv = RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL). map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME)) rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
rpcEnv.awaitTermination() }
}
复制代码


这段代码具体做了如下几件事:


  • 创建 RPC 通信环境 rpcEnv

  • 获取 Master 的通信邮箱 RpcEndpointRef,用于和 Master 通信

  • 注册RpcEndpoint,调用onStart方法。让 clientEndpoint 可以和 masterEndpoint 通信


代码中涉及 Spark RPC 通信的知识,RPC 是一种进程间的通信方式。如果对这部分知识没有了解的话,会看得有点一头雾水。这部分知识也非常庞大,需要另外开一个文档来详细介绍。

3.1 Spark RPC 设计

这里就先简单介绍一下,让这段代码看起来不那么尴尬。



Spark 3.0 的 RPC 框架是基于 Netty,Netty 使用经典的 Actor 模型做消息传递。图中画了若干个组建。


  • RpcEnv:为 RpcEndpoint 提供了一个处理消息的环境,负责 Endpoint 的生命周期管理,注册 Endpoint,消息间的路由,停止 Endpoint 等。

  • RpcEndpoint:代表具体的通信节点。例如 Master,Worker,DriverEndpoint 等,它们都实现该接口。一个 RpcEndpoint 的生命周期是create -> onStart -> receive* -> onStopreceivereceiveAndReply 分别用来接收其他 endpoint 发过来的 sendask 消息。

  • RpcEndpointRef:顾名思义,是一个 RpcEndpoint 的引用。当我们想向 master,worker 发送消息的时候,需要先获取到如 master Endpoint 的引用。

  • RpcAddress:表示远程 RpcEndpointRef 的地址,包含 Host 和 Port。


Spark 是一个分布式的系统,想要跟其他的端点通信,需要按照如上的步骤,建立通信环境,通信地址及引用。接下来才可以通过发送消息请求的方式,在其他 Endpoint 执行如 launchDriverregisterWorker 等动作。

4. 启动 Driver

接上面 ClientApp start 继续往下看。在 RpcEnv 中构建一个 ClientEndpoint


private[spark] class ClientApp extends SparkApplication {
override def start(args: Array[String], conf: SparkConf): Unit = { ... rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf)) ...}}
复制代码


而我们前面又讲过,一个 RpcEndpoint 的会调用onStart()。进入 ClientEndpointonStart() 进行查看。

4.1 向 Master 发送请求

private class ClientEndpoint(    override val rpcEnv: RpcEnv,    driverArgs: ClientArguments,    masterEndpoints: Seq[RpcEndpointRef],    conf: SparkConf)  extends ThreadSafeRpcEndpoint with Logging {    ...    override def onStart(): Unit = {    driverArgs.cmd match {      // 匹配到 launch driver      case "launch" =>            ....          // 敲重点,这里重点记忆一下,DriverWrapper 作为 mainClass          val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"          ...          // 准备通信的请求信息          val driverDescription = new DriverDescription(            driverArgs.jarUrl,            driverArgs.memory,            driverArgs.cores,            driverArgs.supervise,            command)         //向master发送提交启动 Driver 的请求        asyncSendToMasterAndForwardReply[SubmitDriverResponse](          RequestSubmitDriver(driverDescription))      ...
复制代码


onStart()中匹配到 launch这个动作,并向 MasterEndpoint 发送 RequestSubmitDriver消息。

4.2 Master

接下来我们跳到 Master.scala(package org.apache.spark.deploy.master) 去看看 Master 节点如何处理这个请求。


在 Spark 的 Rpc 设计中,receivereceiveAndReply分别用来接收 sendask 类型的消息。


Sends a one-way asynchronous message. Fire-and-forget semantics.

def send(message: Any): Unit


  • 发送单向的异步消息,发送了就不再返回任何东西。


Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to receive the reply within the specified timeout.

def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]:


  • 发送消息出去之后,返回一个 [[Future]],并在规定的 timeout 时间内接收回复。

5. Master 接收消息并回复

RequestSubmitDriver 在下面这个方法中调用。这里调用了我们上面提到的 ask()方法。也就是说会触发 Master 的 receiveAndReply()


  /**   * Send the message to master and forward the reply to self asynchronously.   */  private def asyncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = {    for (masterEndpoint <- masterEndpoints) {      masterEndpoint.ask[T](message).onComplete {        case Success(v) => self.send(v)        case Failure(e) =>          logWarning(s"Error sending messages to master $masterEndpoint", e)      }(forwardMessageExecutionContext)    }  }
复制代码


Master 的 receiveAndReply()方法。


private[deploy] class Master(    override val rpcEnv: RpcEnv,    address: RpcAddress,    webUiPort: Int,    val securityMgr: SecurityManager,    val conf: SparkConf)  extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {    ....      // 重点  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {    case RequestSubmitDriver(description) =>      ...        // 根据请求的description创建一个 DriverInfo        val driver = createDriver(description)        persistenceEngine.addDriver(driver)        // 重点:先把driver存一份到 waiting list        waitingDrivers += driver        drivers.add(driver)        // 重点关注        schedule()
复制代码


  • 构建 DriverInfo,保存后续 Driver 启动所需的信息

  • driver append 到 waiting list,方便后续有 worker 资源了再去创建

  • 执行 schedule()


把 Driver 信息准备好之后,最后来到 schedule()方法,该方法中,会把所有的 waitingDrivers等待被分配的 driver 一一分配出去,分配到 alive 的 worker 中。


private def schedule(): Unit = {    if (state != RecoveryState.ALIVE) {      return    }    // Drivers take strict precedence over executors    // 打散 worker    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))    val numWorkersAlive = shuffledAliveWorkers.size    var curPos = 0    for (driver <- waitingDrivers.toList) {       var launched = false      var numWorkersVisited = 0      while (numWorkersVisited < numWorkersAlive && !launched) {        val worker = shuffledAliveWorkers(curPos)        numWorkersVisited += 1        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {          // 关键:启动 driver          launchDriver(worker, driver)          waitingDrivers -= driver          launched = true        }        curPos = (curPos + 1) % numWorkersAlive      }    }    startExecutorsOnWorkers()  }
复制代码


我们来看 launchDriver(),又是一个远程调用。


  private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {    logInfo("Launching driver " + driver.id + " on worker " + worker.id)    worker.addDriver(driver)    driver.worker = Some(worker)    // 远程调用    worker.endpoint.send(LaunchDriver(driver.id, driver.desc))    driver.state = DriverState.RUNNING  }
复制代码


这里调用了 worker endpoint 的 send 方法,因此要看接收方的 receive() 详情。同理,因为这个是往 Worker 发消息,那就去 Worker.scala 看看 Worker 怎么处理这个消息。

6. Worker 接收消息

终于扒到这个非常关键的信息了。终于要开始构建我们的 Driver 了。


private[deploy] class Worker(    override val rpcEnv: RpcEnv,    webUiPort: Int,    ...    val securityMgr: SecurityManager)  extends ThreadSafeRpcEndpoint with Logging {    ...      override def receive: PartialFunction[Any, Unit] = synchronized {            // 启动 Driver            case LaunchDriver(driverId, driverDesc) =>              logInfo(s"Asked to launch driver $driverId")              // 创建 DriverRunner              val driver = new DriverRunner(...)              drivers(driverId) = driver              driver.start()              // 记录 core 和 memory 使用了多少              coresUsed += driverDesc.cores              memoryUsed += driverDesc.mem
复制代码


接下去就是比较细节的启动工作。Driver 启动的详细信息又可以单开一个篇幅来讲了。


至此,我们从找到 Spark 的入口, spark 提交任务开始,准备参数,执行 submit 任务,根据特定的部署模式,启动相应的 Spark Application,又通过了解 Spark 的远程调用设计,摸索到 ClientMasterWorker 之间如何发送启动 Driver 的信息。一步一步找到 launchDriver 的入口。


最后还是画一张 UML 图来更加清晰的展现整体交互。


常驻小尾巴

都看到这里了,不关注一下嘛 👇 👇 👇



我的免费星球,欢迎来看我的日常碎碎念


参考资料

  • 陈凯 https://zhuanlan.zhihu.com/p/84506391

  • 简书 https://www.jianshu.com/p/4d4964c505fe?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation

  • 尚硅谷 P131 源码阅读 https://www.bilibili.com/video/BV11A411L7CK?p=131&spm_id_from=pageDriver

  • Solve https://cloud.tencent.com/developer/article/1574348

发布于: 2 小时前阅读数: 12
用户头像

还未添加个人签名 2018.07.24 加入

日拱一卒,长期主义,感受编程之美。 欢迎关注公众号 :「程序员赤小豆」

评论

发布
暂无评论
Spark 源码阅读 02:从 Spark-Submit 到 Driver 启动