写点什么

Yarn 资源调度框架

用户头像
布兰特
关注
发布于: 2 小时前
Yarn资源调度框架

Yarn 是 Hadoop2.0 版本开始提出的资源管理框架,在 Hadoop1.x 中,JobTracker 的两个主要功能:资源管理和作业控制(作业监控和容错等),JobTracker 在既要负责资源和作业的情况下,扩展性就很差,尤其是作业管理方面,集群同时可能有成百上千的作业在运行,作业里面又有成百上千的 task 在跑,又要涉及到资源的管控,以及 Job 并发数的限制,所以 JobTracker 的压力负载很大,并且 JobTracker 存在单点故障问题。


Hadoop2.0 开始 Yarn 的提出,就是将 JobTracker 的两个功能进行拆分,由 Yarn 来负责资源管理的部分,并且朝着资源统一管理的方向发展,满足于不同的计算框架,不单单是 MR


Yarn 的基本组成


Resourcemanager(RM)

RM 是一个全局的资源管理器,集群只有一个,负责整个系统的资源管理和分配,包括处理客户端请求、启动/监控 APP master、监控 nodemanager、资源的分配与调度。它主要由两个组件构成:调度器(Scheduler)和应用程序管理器(Applications Manager,ASM)。


调度器(Scheduler)调度器根据容量、队列等限制条件(如每个队列分配一定的资源,最多执行一定数量的作业等),将系统中的资源分配给各个正在运行的应用程序。需要注意的是,该调度器是一个“纯调度器”,它不再从事任何与具体应用程序相关的工作,比如不负责监控或者跟踪应用的执行状态等,也不负责重新启动因应用执行失败或者硬件故障而产生的失败任务,这些均交由应用程序相关的 ApplicationMaster 完成。调度器仅根据各个应用程序的资源需求进行资源分配,而资源分配单位用一个抽象概念“资源容器”(Resource Container,简称 Container)此外,该调度器是一个可插拔的组件,用户可根据自己的需要设计新的调度器,YARN 提供了多种直接可用的调度器,比如 Fair Scheduler 和 Capacity Scheduler 等。


应用程序管理器(Applications Manager,ASM)应用程序管理器负责管理整个系统中所有应用程序,包括应用程序提交、与调度器协商资源以启动 ApplicationMaster、监控 ApplicationMaster 运行状态并在失败时重新启动它等。

  • 处理客户端请求

  • 启动/监控 ApplicationMaster

  • 监控 NodeManager 资源分配和调度


ApplicationMaster(AM)   

应用程序管理器,管理 YARN 内运行的单个应用程序实例(Application)。 

负责协调来自 resourcemanager 的资源,并通过 nodemanager 监视作业的执行和资源使用情况。

  • 数据切分

  • 为应用程序申请资源并进一步分配给内部任务。

  • 任务监控与容错


NodeManager(NM)

Nodemanager 整个集群有多个,负责每个节点上的资源和使用。Nodemanager 管理着抽象容器,这些抽象容器代表着一些特定程序使用针对每个节点的资源。Nodemanager 定时地向 RM 汇报本节点上的资源使用情况和各个 Container 的运行状态(cpu 和内存等资源)

  • 单个节点上的资源管理和任务

  • 处理来自于 Resourcemanager 的命令

  • 处理来自域 ApplicationMaster 的命令


Container

Container 是 YARN 中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等,当 AM 向 RM 申请资源时,RM 为 AM 返回的资源便是用 Container 表示的。YARN 会为每个任务分配一个 Container,且该任务只能使用该 Container 中描述的资源。需要注意的是,Container 不同于 MRv1 中的 slot,它是一个动态资源划分单位,是根据应用程序的需求动态生成的。目前为止,YARN 仅支持 CPU 和内存两种资源,且使用了轻量级资源隔离机制 Cgroups 进行资源隔离。

  • 对 task 环境的抽象

  • 描述一系列信息任务运行资源的集合(cpu、内存、io 等)

  • 任务运行环境


Container 构成


Yarn 调度流程

Yarn 调度流程理解完上面的组件,来看看 YARN 启动一个 MapReduce 作业的流程:


第 1 步:客户端向 ResourceManager 提交自己的应用,这里的应用就是指 MapReduce 作业。

第 2 步:ResourceManager 向 NodeManager 发出指令,为该应用启动第一个 Container,并在其中启动 ApplicationMaster。

第 3 步:ApplicationMaster 向 ResourceManager 注册。

第 4 步:ApplicationMaster 采用轮询的方式向 ResourceManager 的 YARN Scheduler 申领资源。

第 5 步:当 ApplicationMaster 申领到资源后(其实是获取到了空闲节点的信息),便会与对应 NodeManager 通信,请求启动计算任务。

第 6 步:NodeManager 会根据资源量大小、所需的运行环境,在 Container 中启动任务。

第 7 步:各个任务向 ApplicationMaster 汇报自己的状态和进度,以便让 ApplicationMaster 掌握各个任务的执行情况。

第 8 步:应用程序运行完成后,ApplicationMaster 向 ResourceManager 注销并关闭自己。


思考

多说几句, 大概的讲一讲自己当时比较迷惑的点

ApplicationMaster->Container

在第四步、第五步中,ApplicationMaster 向 ResourceManager 申请完资源后,ApplicationMaster 再进一步要求对应的 NodeManager 启动 container

ResourceManager 就会给 NodeManager 发送一个和安全性相关的 master key。然后,NodeManager 用这个 master key 来验证 ApplicationMaster 发送给他的分配 Container 的请求,NodeManager 的一个最重要的功能是根据 ApplicationMaster 的要求启动 container,由于各个节点上的 container 由 ResourceManager 进行统一管理和分配的,通常,ResourceManager 将 Container 分配给 ApplicationMaster,为防止 ApplicationMaster 未经授权随意要求 NodeManager 启动 container,ResourceManager 一般会为每个 container 分配一个令牌(ApplicationMaster 无法伪造),而 NodeManager 启动任何 container 之前均会对令牌的合法性进行验证,一旦通过验证后,NodeManager 才会按照一定的流程启动该 container。


Container 启动

Container 启动是由 ApplicationMaster 通过 RPC 函数 ContainerManagementProtocol 的 startContainers() 向 NM 发起的,NM 中的 ContainerManagerImpl 组件负责接收并处理该请求。Container 启动过程主要经历三个阶段:资源本地化、启动并运行 Container 和资源清理。


资源本地化主要是指分布式缓存机制完成的工作,功能包括初始化各种服务组件、创建工作目录、从 HDFS 下载运行所需的各种资源(比如文本文件、JAR 包、可执行文件)等。资源本地化主要有两部分组成,分别是应用程序初始化和 Container 本地化。其中,应用程序初始化的主要工作是初始化各类必需的服务组件(比如日志记录组件 LogHandler、资源状态追踪器 LocalResourceTrackerImpl 等),供后续 Container 使用,通常由 Application 的第一个 Container 完成;Container 本地化则是创建工作目录,从 HDFS 下载各类文件资源。


Container 启动是由 ContainerLauncher 服务完成,该服务将进一步调用插拔式组件 ContainerExecutor。Yarn 中提供了三种 ContainerExecutor 实现,一种是 DefaultContainerExecutor,一种是 LinuxContainerExecutor,另一种是 DockerContainerExecutor,由参数 yarn.nodemanager.container-executor.class 控制具体采用的方式。


资源清理则是资源本地化的逆过程,它负责清理各类资源,均由 ResourceLocalizationService 服务完成。


Container 运行是由 ContainersLauncher 服务实现的,主要过程可概括为:将待运行的 Container 所需的环境和运行命令写到 Shell 脚本 launch_container.sh 脚本中,并将启动该脚本的命令写入 default_container_executro.sh 中,然后通过该脚本启动 Container。之所以要将 Container 运行命令写到脚本中并通过运行脚本来执行它,主要是直接执行命令可能让一些特殊符号发生转义。


在集群上随便找了个任务的 Container 启动脚本

launch_container.sh

#!/bin/bash
export SPARK_YARN_STAGING_DIR="hdfs://HDFS82653/user/hadoop/.sparkStaging/application_1614357352126_428450"export LOCAL_DIRS="/data/emr/yarn/local/usercache/hadoop/appcache/application_1614357352126_428450"export HADOOP_CONF_DIR="/usr/local/service/hadoop/etc/hadoop"export NM_HTTP_PORT="5008"export JAVA_HOME="/usr/local/jdk"export LOG_DIRS="/data/emr/yarn/logs/application_1614357352126_428450/container_e36_1614357352126_428450_01_000005"export NM_AUX_SERVICE_mapreduce_shuffle="AAA0+gAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="export NM_PORT="5006"export USER="hadoop"export HADOOP_YARN_HOME="/usr/local/service/hadoop"export CLASSPATH="$PWD:$PWD/__spark_conf__:$PWD/__spark_libs__/*:$HADOOP_CONF_DIR:$HADOOP_COMMON_HOME/share/hadoop/common/*:$HADOOP_COMMON_HOME/share/hadoop/common/lib/*:$HADOOP_HDFS_HOME/share/hadoop/hdfs/*:$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*:$HADOOP_YARN_HOME/share/hadoop/yarn/*:$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*:$PWD/__spark_conf__/__hadoop_conf__"export NM_HOST="172.21.4.103"export HADOOP_TOKEN_FILE_LOCATION="/data/emr/yarn/local/usercache/hadoop/appcache/application_1614357352126_428450/container_e36_1614357352126_428450_01_000005/container_tokens"export SPARK_USER="hadoop"export HADOOP_HDFS_HOME="/usr/local/service/hadoop"export LOGNAME="hadoop"export JVM_PID="$$"export PWD="/data/emr/yarn/local/usercache/hadoop/appcache/application_1614357352126_428450/container_e36_1614357352126_428450_01_000005"export HADOOP_COMMON_HOME="/usr/local/service/hadoop"export HOME="/home/"export CONTAINER_ID="container_e36_1614357352126_428450_01_000005"export MALLOC_ARENA_MAX="4"ln -sf "/data/emr/yarn/local/usercache/hadoop/filecache/191186/__spark_conf__.zip" "__spark_conf__"hadoop_shell_errorcode=$?if [ $hadoop_shell_errorcode -ne 0 ]then exit $hadoop_shell_errorcodefiln -sf "/data/emr/yarn/local/usercache/hadoop/filecache/191187/__spark_libs__3647098556803883493.zip" "__spark_libs__"hadoop_shell_errorcode=$?if [ $hadoop_shell_errorcode -ne 0 ]then exit $hadoop_shell_errorcodefiln -sf "/data/emr/yarn/local/usercache/hadoop/filecache/191189/maxmind-db-1.1.0.jar" "maxmind-db-1.1.0.jar"hadoop_shell_errorcode=$?if [ $hadoop_shell_errorcode -ne 0 ]then exit $hadoop_shell_errorcodefiln -sf "/data/emr/yarn/local/usercache/hadoop/filecache/191190/hive-site.xml" "hive-site.xml"hadoop_shell_errorcode=$?if [ $hadoop_shell_errorcode -ne 0 ]then exit $hadoop_shell_errorcodefiln -sf "/data/emr/yarn/local/usercache/hadoop/filecache/191191/original-log2orc_dongqiudi-1.0-SNAPSHOT.jar" "original-log2orc_dongqiudi-1.0-SNAPSHOT.jar"hadoop_shell_errorcode=$?if [ $hadoop_shell_errorcode -ne 0 ]then exit $hadoop_shell_errorcodefiln -sf "/data/emr/yarn/local/usercache/hadoop/filecache/191188/geoip2-2.5.0.jar" "geoip2-2.5.0.jar"hadoop_shell_errorcode=$?if [ $hadoop_shell_errorcode -ne 0 ]then exit $hadoop_shell_errorcodefiexec /bin/bash -c "$JAVA_HOME/bin/java -server -Xmx8192m '-Dlog4j.ignoreTCL=true' -Djava.io.tmpdir=$PWD/tmp '-Dspark.driver.port=39374' -Dspark.yarn.app.container.log.dir=/data/emr/yarn/logs/application_1614357352126_428450/container_e36_1614357352126_428450_01_000005 -XX:OnOutOfMemoryError='kill %p' org.apache.spark.executor.YarnCoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@172.21.0.124:39374 --executor-id 4 --hostname 172.21.4.103 --cores 4 --app-id application_1614357352126_428450 --resourceProfileId 0 --user-class-path file:$PWD/__app__.jar --user-class-path file:$PWD/geoip2-2.5.0.jar --user-class-path file:$PWD/maxmind-db-1.1.0.jar --user-class-path file:$PWD/original-log2orc_dongqiudi-1.0-SNAPSHOT.jar 1>/data/emr/yarn/logs/application_1614357352126_428450/container_e36_1614357352126_428450_01_000005/stdout 2>/data/emr/yarn/logs/application_1614357352126_428450/container_e36_1614357352126_428450_01_000005/stderr"hadoop_shell_errorcode=$?if [ $hadoop_shell_errorcode -ne 0 ]then exit $hadoop_shell_errorcodefi
复制代码

default_container_executro.sh

#!/bin/bash/bin/bash "/data/emr/yarn/local/usercache/hadoop/appcache/application_1614357352126_428450/container_e36_1614357352126_428450_01_000005/default_container_executor_session.sh"rc=$?echo $rc > "/data/emr/yarn/local/nmPrivate/application_1614357352126_428450/container_e36_1614357352126_428450_01_000005/container_e36_1614357352126_428450_01_000005.pid.exitcode.tmp"/bin/mv -f "/data/emr/yarn/local/nmPrivate/application_1614357352126_428450/container_e36_1614357352126_428450_01_000005/container_e36_1614357352126_428450_01_000005.pid.exitcode.tmp" "/data/emr/yarn/local/nmPrivate/application_1614357352126_428450/container_e36_1614357352126_428450_01_000005/container_e36_1614357352126_428450_01_000005.pid.exitcode"exit $rc
复制代码

Application

在 YARN 看来,他所维护的所有应用程序叫 Appliction,但是到了计算框架这一层,各自有各自的名字,mapreduce 叫 job,storm 叫 topology 等等,YARN 是资源管理系统,不仅仅运行 mapreduce,还有其他应用程序,mapreduce 只是一种计算应用。但是 yarn 内部设有应用程序到计算框架应用程序的映射关系(通常是 id 的映射),你这里所说的应用程序,job 属于不同层面的概念,切莫混淆,要记住,YARN 是资源管理系统,可看做云操作系统,其他的东西,比如 mapreduce,只是跑在 yarn 上的 application,但是,mapreduce 是应用层的东西,它可以有自己的属于,比如 job task,但是 yarn 专业一层是不知道或者说看不到的


ApplicationMaster

Spark、Storm、 Flink ApplicationMaster 都是自己实现的,而 MR 是 Yarn 提供默认的


ApplicationMaster 一般来说压力不大,因为他只负责作业的调度,而不是去执行 task 任务,但是如果你的任务小文件很多,这时候 ApplicationMaster 所维护的 task 的数据就很多,并且要去监控这些 task 的运行状态,所以 ApplicationMaster 内存就会存在 OOM 的情况


关于 ApplicationMaster 的状态,ApplicationMaster 的 NodeManager 和其他 NodeManager 都需要维护,通过 Application 状态机可将节点上属于这个 App 的所有 Container 聚集在一起,当需要特殊操作,比如杀死 Application 时,可以将对应的所有 Container 销毁。


另外,需要注意,一个应用程序的 ApplicationMaster 所在的节点也可以运行它的 container,包括 Yarn 为 ApplicationMaster 分配的 Container,这都是随机的


上面写的内容比较散,Yarn 的源码没有好好的读过,以后可能会好好的读一下源码,再仔细的了解下细节。


Yarn 文档

https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html

发布于: 2 小时前阅读数: 4
用户头像

布兰特

关注

大数据研发工程师 2019.05.10 加入

while true{ 🍴😪💻 }

评论

发布
暂无评论
Yarn资源调度框架