写点什么

Spark 启动及提交流程内部核心原理剖析

作者:编程江湖
  • 2022 年 3 月 23 日
  • 本文字数:7037 字

    阅读完需:约 23 分钟

Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎,并且拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是——Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于需要迭代 MapReduce 的算法。接下来带大家探索一下 Spark 启动及提交流程的内部核心原理。

Netty

在探索 Spark 启动及提交流程的内部核心原理之前,我们得先简单介绍一下 Spark 内部的通信框架----Netty

Spark 中通信框架的发展:

  • Spark 早期版本中采用 Akka 作为内部通信部件。

  • Spark1.3 中引入 Netty 通信框架,为了解决 Shuffle 的大数据传输问题使用

  • Spark1.6 中 Akka 和 Netty 可以配置使用。Netty 完全实现了 Akka 在 Spark 中的功能。

  • Spark2 系列中,Spark 抛弃 Akka,使用 Netty。

尚硅谷大数据培训_专业的大数据培训机构_值得信赖的大数据教程

大数据

大数据教程

大数据培训

尚硅谷大数据拼课程、论口碑更给力

尚硅谷 IT 培训

立即咨询

Netty 通信架构解析

Netty 通讯架构如下:

RpcEndpoint:RPC 通信终端。Spark 针对每个节点(Client/Master/Worker)都称之为一个 RPC 终端,且都实现 RpcEndpoint 接口,内部根据不同端点的需求,设计不同的消息和不同的业务处理,如果需要发送(询问)则调用 Dispatcher。在 Spark 中,所有的终端都存在生命周期:

  1. Constructor

  2. onStart

  3. receive*

  4. onStop

RpcEnv:RPC 上下文环境,每个 RPC 终端运行时依赖的上下文环境称为 RpcEnv;在把当前 Spark 版本中使用的 NettyRpcEnv(即每个节点都有环境上下文)

Dispatcher:消息调度(分发)器,针对于 RPC 终端需要发送远程消息或者从远程 RPC 接收到的消息,分发至对应的指令收件箱(发件箱)。如果指令接收方是自己则存入收件箱,如果指令接收方不是自己,则放入发件箱;一个环境一个

Inbox:指令消息收件箱。一个本地 RpcEndpoint 对应一个收件箱,Dispatcher 在每次向 Inbox 存入消息时,都将对应 EndpointData 加入内部 ReceiverQueue 中,另外 Dispatcher 创建时会启动一个单独线程进行轮询 ReceiverQueue,进行收件箱消息消费;

RpcEndpointRef:RpcEndpointRef 是对远程 RpcEndpoint 的一个引用。当我们需要向一个具体的 RpcEndpoint 发送消息时,一般我们需要获取到该 RpcEndpoint 的引用,然后通过该应用发送消息。

OutBox:指令消息发件箱。对于当前 RpcEndpoint 来说,一个目标 RpcEndpoint 对应一个发件箱,如果向多个目标 RpcEndpoint 发送信息,则有多个 OutBox。当消息放入 Outbox 后,紧接着通过 TransportClient 将消息发送出去。消息放入发件箱以及发送过程是在同一个线程中进行;

RpcAddress:表示远程的 RpcEndpointRef 的地址,Host + Port。

TransportClient:Netty 通信客户端,一个 OutBox 对应一个 TransportClient,TransportClient 不断轮询 OutBox,根据 OutBox 消息的 receiver 信息,请求对应的远程 TransportServer;(类似 socket)

TransportServer:Netty 通信服务端,一个 RpcEndpoint 对应一个 TransportServer,接受远程消息后调用 Dispatcher 分发消息至对应收发件箱(通过本地指令);

Netty 通信流程总结

  1. 在一个 rpcEnv 里, RpcEndpoint 通过持有 RpcEndpointRef,向 Dispatcher 发送消息,Dispatcher 识别到消息是远程指令,会把消息发送到 OutBox。

  2. TransportClient 不断轮询 OutBox 的队列,一旦 OutBox 队列有消息,就会将消息发往对应 RpcEndpoint 的 TransportServer。

  3. 接收的 RpcEndpoint 的 TransportServer 会把消息发往 Dispatcher,Dispatcher 识别到本地指令后,会把消息给发往自身的 InBox 里面,这样就实现了通信。

Spark 启动流程剖析

在剖析 Spark 启动流程中,我们主要通过 StandAlone 模式下的 Master / Work 启动流程来看 Spark 是怎么通讯的。

Master 启动流程

我们首先从启动命令 start-all.sh 出发(因为他会启动 master 和 work),一步一步查看启动的调用流程:

start-all.sh

会加载 sparkhome 作为变量,所以学习 spark 安装多种模式 spark 时最好不配

start-master.sh

CLASS="org.apache.spark.deploy.master.Master"

"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \

--host $SPARK_MASTER_HOST --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \

$ORIGINAL_ARGS

run_command class "$@" --运行传过来的所有参数

${SPARK_HOME}"/bin/spark-class "$command" "$@"

java .. org.apache.spark.deploy.master.Master 最终启动这个类,启动 java 虚拟机

-- main

-- startRpcEnvAndEndpoint // master 和 worker 通讯需要现有通讯环境,先创建通讯环境和 endpoint

1. RpcEnv.create //创建环境

1.1-- RpeEnv return new NettyRpcEnvFactory().create(config) //启动创建环境的工厂

1.2-- val nettyEnv = new NettyRpcEnv //创建 netty 环境

-- dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)// 创建 dispatch(一个环境只有一个

-- endpoints: ConcurrentMap[String, MessageLoop] //存储每个 endpoint 的消息死循环

-- endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] //根据 ref 找到 endpoint 通讯实体

-- new DedicatedMessageLoop(name, e, this)//专用消息循环(每个 endpoint 一个消息循环

-- private val inbox = new Inbox(name, endpoint) //一个 endpoint 都单独享有一个收件箱

-- receiveLoop()//每个线程都会死循环等待信息

1.3-- nettyEnv.startServer(config.bindAddress, actualPort) //启动 netty 服务

-- server = transportContext.createServer //创建 transportServer


2. rpcEnv.setupEndpoint(ENDPOINT_NAME, //将 endpoint(master)放进环境

new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))


-- dispatcher.registerRpcEndpoint(name, endpoint) //将创建的 master 注册


-- new Master

//启动了一个线程, 这个线程会每 60s 检测一次所有 worker 的超时情况

-- timeOutDeadWorkers()


3. master 启动完毕

Worker 启动流程

Worker 的启动流程还是先从 start-all.sh 触发,会走进 start-woker.sh。

org.apache.spark.deploy.worker.Worker

val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)//创建 rpcenv

val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL)//与 master 相比,只多了这一行.

//worker 会从配置找到 master 地址,主动去找 master 注册.(master 有可能会是 ha,所以可能找到的是 master 地址数组)

rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory, //注册 worker 为 endpoint

masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr, resourceFileOpt))

-- onStart

registerWithMaster() //向 master 注册自己(worker)

sendRegisterMessageToMaster(masterEndpoint) //发送注册信息

-- 注册成功后,每 15 秒执行一次,发送心跳

sendToMaster(Heartbeat(workerId, self))

masterRef.send(message)

尚硅谷大数据培训_专业的大数据培训机构_值得信赖的大数据教程

大数据

大数据教程

大数据培训

尚硅谷大数据拼课程、论口碑更给力

尚硅谷 IT 培训

立即咨询

Spark 启动流程总结

  1. A 跟 B 通信,A 拿到 B 的 EndPointRef,通过 send 方法发送一个样例类进行通信。样例类携带更多信息,类似通信协议

  2. B 会有 receive 方法收到信息通过模式匹配进行匹配信息

Spark 提交流程剖析

因为 Spark 可以以多种模式运行,国内多以 YARN 模式进行提交,所以此处以 YARN 的 Cluster 模式下的 Spark 提交流程进行剖析。

SparkSubmit

SparkSubmit 的作用主要就是两个:

1. 解析参数

2. 提交参数,初始数环境,并获取"org.apache.spark.deploy.yarn.YarnClusterApplication"的对象,调用对象的 start 方法

org.apache.spark.deploy.SparkSubmit


main

-- submit.doSubmit(args) //执行提交

-- doSubmit

submit(appArgs, uninitLog)

-- doRunMain()

//执行主方法

runMain(args, uninitLog)

//准备提交环境

1. val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

//"org.apache.spark.deploy.yarn.YarnClusterApplication"赋给 childMainClass 重点!!

//如果是 client 模式,则 childMainClass 就是提交 jar 包的主类

childMainClass = YARN_CLUSTER_SUBMIT_CLASS


//反射加载 childMainClass 类

2. mainClass = Utils.classForName(childMainClass)


//创建 mainClass 实例并且转为 SparkApplication 类型

//SparkApplication 是 YarnClusterApplication 的父类

3. val app = mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]


//最终调用 YarnClusterApplication 的 start 方法

4. app.start(childArgs.toArray, sparkConf)

//client 的构造器创建 YarnClient 对象,用于连接 ResourceManager

new Client(new ClientArguments(args), conf, null).run()

--run

this.appId = submitApplication()//提交应用到 ResourceManager,返回 appid

-- submitApplication

val containerContext = createContainerLaunchContext(newAppResponse)//

//确定 applicationMaster 是谁

//如果 yarn 模式:就是 applicationMaster

//如果是 client 模式:就是 executorLauncher

-- amClass = org.apache.spark.deploy.yarn.ApplicationMaster

val appContext = createApplicationSubmissionContext(newApp, containerContext)

yarnClient.submitApplication(appContext)

//提交应用到 ResourceManager,让 resourcemanager 启动 container(applicationMaster)

此时第一个进程 spark submit 已经完成(可以直接把这个进程 kill 掉也没问题)

ApplicationMaster

此时到 am 进程启动,而这个进程主要的作用如下:

1. 封装 ApplicationMaster 的参数

2. 根据参数,创建 ApplicationMaster 对象

3. 执行 ApplicationMaster 的 run 方法,在 run 方法中,最后调用到 runDriver 方法

//解析完各种参数,new 一个 applicationMaster

master = new ApplicationMaster(amArgs, sparkConf, yarnConf)

//执行 applicationMaster 的 run 方法

master.run


if (isClusterMode) { //集群模式

runDriver()//集群模式就运行 driver

} else { // client 模式

runExecutorLauncher()

}


//am 启动第一件事就是跑 driver,启动应用程序

runDriver()


1. userClassThread = startUserApplication() //启动应用程序,也就是执行提交的 jar 包中的主函数

//加载参数穿过来的用户类 即提交时指定的--class

1.1 val mainMethod = userClassLoader.loadClass(args.userClass)

.getMethod("main", classOf[Array[String]])

1.2 new Thread ... //创建一个线程,线程名就叫 driver,并返回这个线程

2.userThread.start()//执行这个 driver 线程的 run 方法,线程执行的就是我们提交应用程序类的主函数


//等待 sc 初始化,初始化完后才继续往下执行

3. val sc: SparkContext = ThreadUtils.awaitResult(sparkContextPromise.future,

Duration(totalWaitTime, TimeUnit.MILLISECONDS))


//cs 初始化完后向 ResourceManager 注册 applicationMaster

//注册的本质就是向 rm 申请资源运行 executor 进程

4. registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)

-- client.register(host, port, yarnConf, _sparkConf, uiAddress, historyAddress)

-- amClient.registerApplicationMaster(driverHost, driverPort, trackingUrl)


//am 向 rm 注册成功后,创建分配器

5. -- createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)

-- allocator.allocateResources()//分配资源

-- val allocatedContainers = allocateResponse.getAllocatedContainers()//通过分配器响应 获取 分配到的容器列表

allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)


handleAllocatedContainers(allocatedContainers.asScala)//资源列表大于 0,就处理分配到的资源

runAllocatedContainers(containersToUse)//运行分配后的资源,封装指令

ExecutorRunnable.run//启动 executor

-- startContainer()

-- val commands = prepareCommand()//封装指令

-- bin/java org.apache.spark.executor.YarnCoarseGrainedExecutorBackend

-- nmClient.startContainer(container.get, ctx)//启动容器,也就是启动 executor


!!AM 启动完毕

AM 只有两个子线程,一个主线程,一个子线程(driver)

  1. 子线程 driver 执行用户类(用户传过来的 jar 包 main 方法)

  2. 主线程

1.主要是注册 am(向 rm 请求分配资源) , rm 返回容器给 am

2. am 拿到返回容器列表,让 nm 在容器上执行 java 命令

-- bin/java org.apache.spark.executor.YarnCoarseGrainedExecutorBackend 这个命令

3.最终在 nm 上启动 executor 进程

CoarseGrainedExecutorBackend

执行一次- bin/java org.apache.spark.executor.YarnCoarseGrainedExecutorBackend 这个命令 就会执行一个新的进程,则是属于并行执行的感觉,和之前执行的内容是分开的。类似我们在 Windows 中开了一个微信和 qq 程序一样,各自执行,互不影响。因为这就是我们平时说的 executor 进程

1. commands=/bin/java/org.apache.spark.executor.CoarseGrainedExecutorBackend,

执行这个指令,那么是调用这个类的 main 方法。

2. main 方法中:

// 1. 首先是对一些参数进行封装

// 2. 执行 run 方法

-- run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)

// 1.通过 driver 的 uri 和 Driver 进行关联

--driver = fetcher.setupEndpointRefByURI(driverUrl)

// 2.通过通信环境创建了一个终端,名字为 executor,

在底层:Executor 启动后会注册通信,并收到信息 onStart,收到消息后,会执行通信对象 CoarseGrainedExecutorBackend

的 onStart 方法,点击 CoarseGrainedExecutorBackend

--env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend( env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))

// 1.获取 driver 的引用

-- driver = Some(ref)

// 2.ExecutorBackend 向 driver 发送消息,注册 executor 的消息,也称之为反向注册

--ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))

// 3.在 driver 端会接收到这个消息,因为在 driver 端,有一个上下文的对象,sparkcontext,在这个类有一个属性:

private var _schedulerBackend: SchedulerBackend = _,点击 SchedulerBackend,是一个 trait,找到

实现类:CoarseGrainedSchedulerBackend,在这个类中,有一个方法:receiveAndReply():

// executor 的引用,在 driver 端,发送消息给到 ExecutorBackend,注册 executor 成功

--executorRef.send(RegisteredExecutor)


// ExecutorBackend 类中有一个 recive 方法,用来接收 driver 返回的 executor 注册成功的消息,executor 是一个计算对象,在这个对象里面有一个线程池,每一个线程来处理一个从 driver 端发送过来的任务

--executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

整体提交流程图如下图所示:

YARN ClusterClient 模式异同

到此,YARN 模式下的 Cluster 提交流程结束,而 Client 模式的提交与 Cluster 模式提交的不同如下:

SparkSubmit 类中:

childMainClass = args.mainClass//用户类 即提交时的 --class

//而在 cluster 模式则是 org.apache.spark.deploy.yarn.YarnClusterApplication


开始执行用户的 main 函数:其实在执行 drvier

driver 在 sparkSubmit 进程的主线程中运行

//而 cluster 的 driver 则是 ApplicationMaster 中的子线程,而 AM 一定在某一个 NM 上,所以叫 cluster 模式

//driver 在客户端上运行,所以叫 client 模式


SparkContext 类中//driver 中会创建 SparkContext

client.submitApplication()

amClass = org.apache.spark.deploy.yarn.ExecutorLauncher


开始启动 AM, 表明上是 ExecutorLauncher, 本质还是 ApplicationMaster


runExecutorLauncher()


----------------------------------------------------------------------------------------

不同:

1. driver 的位置不一样

cluset: 在 ApplicationMaster 进程中, 是它的一个子线程

client: 在 SparkSubmit 的进程中, 而且是在他的主线程中执行的.

2. AM 的名字不一样

cluster: ApplicationMaster

client: ExecutorLauntcher

Spark 提交流程总结

用大白话解释提交流程源码就是:

执行 suspark-submit 后会有三个进程

  1. SparkSubmit

  2. ApplicationMaster

  3. YarnCoarseGrainedExecutorBackend:粗粒度执行器后端, 也就是 Executor

  4. 找个客户端执行 sparksubmit

  5. 执行 SparkSubmit 类里的 main 方法,准备环境,解析一堆参数

  6. 获取一个 childMainClass 的值并且反射创建这个类

  7. 如果是 cluster 模式他的值就是 YarnClusterApplication

  8. 如果是 client 模式他的值就是提交 jar 包的主类

  9. 通过反射创建 childMainClass 得到 YarnClusterApplication 并且强转为 SparkApplication

  10. 调用 SparkApplication 的 start 方法

  11. 创建一个 client 去连接 rm,并且获取到 rm 返回 appId

  12. 封装一个指令让 rm 找一台 nm 启动一个 am

  13. 这个指令如果是 cluster 那么启动的类就是 applicationMaster,如果是 client 就是启动 executorLauncher

  14. 此时 SparkSubmit 工作完成,如果是 cluster 模式,那么直接把这个进程 kill 掉也没事

  15. ApplicationMaster

  16. 启动一个 ApplicationMaster 进程后,解析各种参数后封装一个 ApplicationMaster 对象

  17. 封装好的 ApplicationMaster 对象会开启一个线程运行用户类(提交的 jar 包)的 main 函数,这个线程就是 driver 线程,

  18. 在 am 进程主方法,会等待获取 SparkContext,等到获取后就会向 rm 注册自己并申请资源,rm 返回容器列表(这里申请资源细节比较多)

  19. am 拿到容器列表,就会在 nm 启动 executor 进程

  20. YarnCoarseGrainedExecutorBackend 进程启动成功后

  21. 启动后第一件事向 driver 线程反向注册

  22. 注册成功后,executor 进程会创建 executor 计算对象

  23. 计算对象里有一个线程池,每一个线程来处理一个 driver 端发过来的任务


关键词:大数据培训

用户头像

编程江湖

关注

IT技术分享 2021.11.23 加入

关注【IT云文化】微信公众号,获取学习资源

评论

发布
暂无评论
Spark启动及提交流程内部核心原理剖析_编程江湖_InfoQ写作平台