写点什么

架构师训练营 1 期第 12 周:数据应用(一)- 总结

用户头像
piercebn
关注
发布于: 2020 年 12 月 13 日

本文主要介绍 MapReduce 是如何实现大规模的分布式计算的,我们写的 map 和 reduce 函数如何在分布式集群中运行起来的,并完成整个计算过程的。

MapReduce 计算框架架构设计

MapReduce 计算框架核心由两部分组成

  • JobTracker

  • TaskTracker

大数据应用程序,用来编写 MapReduce 程序,通过 main 函数以进程的方式运行,大数据应用进程启动后,会在整个 MapReduce 集群中分布式的运行,下面介绍一个 MapReduce 程序如何在一个大规模的分布式集群中进行并行计算的,以及整个处理过程。

首先,启动大数据应用进程(设置好 Map Class、Reduce Class、Input Path、Output Path 等),启动后会和整个 MapReduce 集群进行通信,完成作业的分布式计算

  1. 将自己的作业 jar 包分发到 HDFS 里,和 HDFS 的 DataNode 进行通信,分发给 DataNode,存储在一个 job jar 文件里去

  2. 存储完后,向 JobTracker 提交作业,在整个 MapReduce 大数据集群中,JobTracker 只有一台服务器,它要完成整个集群的作业管理和调度

  3. JobTracker 有一个 JobScheduler 对作业进行调度管理,整个集群中的计算资源是有限的,能够启动的 map 进程和 reduce 进程也是有限的,所以并不是提交到了作业以后,它就直接在集群中运行了,它需要排队调度处理,通过 JobScheduler 对作业进行调度,如果当前服务器资源时空闲的,那么可能就会被调度执行排期,安排在整个集群中开始运行了,第一件事是在 JobTracker 服务器里为这个作业构建一个 JobInProcess 数据结构,每个 Job 都有一个 JobInProcess 数据结构,JobInProcess 是一棵树的根节点

  4. JobInProcess 去解析提交上来作业的 Input 的文件路径,然后看能够处理的数据块有多少个,然后根据这些数据块创建 TaskInProcess,每个数据分片 split 将来都要启动一个 map 或 reduce 任务去处理它,每个任务块都会创建一个 TaskInProcess 子节点。所以一开始初始化的时候,就把整个作业能够分解成的 map 和 reduce 任务都初始化好,构建成一个树,JobInProcess 是根节点,TaskInProcess 是中间节点,TaskInProcess 下面还有自己的节点,叫做 TaskAttempt(任务的一个尝试),当我们去分配一个计算资源进行任务计算的时候,这是一次任务尝试,这次计算有可能是不成功的,因为这个服务器可能会宕机,数据可能会出问题,计算过程中会异常,这次失败了以后,TaskInProcess 会启动另一个 TaskAttempt 任务尝试,所以 TaskInProcess 下面可能会有多个 TaskAttempt 叶子节点

  5. 初始化完了作业任务树以后,就等待着去分配这些任务到分布式集群中了,分布式集群的计算任务是在分布式的服务器上,一台服务器通常在 MapReduce 的作业环境下会承担两种角色,一种角色是 MapReduce 的 TaskTracker 角色,另一种是 HDFS 的 DataNode 的角色,即一台服务器既启动着 DataNode 也启动着 TaskTracker,在它上面既存储的 HDFS 数据块,也启动着将要对这些数据块进行计算的 TaskTracker 进程,这样我们才能实现移动计算比移动数据更划算,我们才能通过 TaskTracker 将要对这个数据计算的任务或作业移动到了 DataNode 的同一个服务器节点上,所以启动了很多的 TaskInProcess 以后,每个 TaskInProcess 要处理文件的文件名,在文件中的数据的偏移量和要处理的数据的长度,都记录在 TaskInProcess 里面,构建好后,就等待着 TaskTracker 去领取任务,当一个 TaskTracker 空闲的时候,有空闲的 CPU,它就可以启动一个任务去完成一个 Task 作业,它就像 JobTracker 发送一个心跳,表明可以给自己分配任务

  6. JobTracker 就会给它分配一个任务,分配任务的时候并不是随便分配的,因为任务中已经记录了要处理的数据块是哪个,JobTracker 会做一个任务匹配,TaskTracker 所在服务器的 IP 地址,以及在这个 IP 地址上有哪些数据块存在,JobTracker 会跟 HDFS 的 NameNode 进行通信,然后去获取数据块所在的 IP 地址,然后再跟 TaskInProcess 里记录的任务所在的文件的偏移量(文件块的地址)进行匹配,如果有相同的文件块、任务块正好和 TaskTracker、DataNode 在同一个服务器上,就会把这个任务分配给这个 TaskTracker,TaskTracker 领到任务以后,它要处理的数据通常和它在同一个服务器上(并不是绝对的),JobTracker 分配任务的时候,优先给 TaskTracker 分配的任务上面有它在本地有的数据,这个过程叫做 Data Locality 数据的本地化,使数据能够在本地完成

  7. TaskTracker 领到了任务以后,它就会启动一个 TaskRunner 子进程,去加载 map 函数,可以通过和 DataNode 去通信,我要执行任务的描述信息是什么,然后把 map 函数所在的 jar 包加载到本地进程中,反射的把整个 map 函数加载起来,加载起来以后由 TaskRunner 根据分配给它的任务的所在的数据块,从数据块去读一个数据分片的一行,读到以后把偏移量以及这一行记录交给 map 函数去处理,整个过程就启动起来了

  8. map 函数处理完了以后,map 函数的输出也是由 TaskTracker 进行数据的拷贝,拷贝给其他服务器的 reduce,其他 reduce 收到数据以后就开始启动,然后把相同的 key value 合并以后,key value list 交给 reduce 去处理,任务最后就完成了

适合 MapReduce 的计算类型

  • TopK(数据排序)

  • K-means(数据聚类)

  • Bayes(数据分类)

  • SQL(大部分的数据分析 SQL)

不适合 MapReduce 的计算类型

  • Fibonacci(不适合分片,单个数据之间有前后关联)

InputFormat

  • MapReduce 能够完成整个的计算,是因为对数据能够进行有效的管理,它可以从数据分片中获取正确的数据交给 map 函数去执行,主要靠的是一个 InputFormat 接口去完成的,接口中有重要的两个函数

  • getSplits:将输入文件切分成逻辑的 InputSplits,一个 InputSplit 将被分配给一个单独的 Mapper task 

  • createRecordReader:提供 RecordReader 的实现,这个 RecordReader 会从 InputSplit 中正确读出一条一条的 K-V对供 Mapper 使用。

  • MapReduce 框架靠调用 InputFormat 去完成数据的任务的切分,调用 getSplits 获取所有的 splits,有多少个 splits 就有多少个 map task,而 reduce task 是根据 Job 可以 set 的,set 多少 reduce number 就有多少个 reduce task,这样就可以把 map 和 reduce task 初始化出来了,在程序的运行中,调用 createRecordReader,由它返回 key value 交给 map 函数去处理,因此由 InputFormat 接口完成了数据的输入

FileInputFormat

  • 我们在开发过程中,有可能不需要自己去写程序去实现 InputFormat,如果我们使用的是普通的文件,我们就可以用 MapReduce 框架内已经提供的 FileInputFormat 去完成数据输入

  • 我们可以通过配置变量去设置 split 的大小,得到分片的最小值 minSize 和最大值 maxSize,可以通过设置 mapred.min.split.size 和 mapred.max.split.size 来设置;

  • 对于每个输入文件,计算 max(minSize, min(maxSize, blockSize)); 如果 minSize<=blockSize<=maxSize,则设为 blockSize。

  • 分片信息 <file,start,length,hosts>,通过 hosts 实现 map 本地性。

OutputFormat

  • OutputFormt 接口决定了在哪里以及怎样持久化作业结果。

  • 默认的 OutputFormat 就是 TextOutputFormat,它是一种以行分隔,包含制表符界定的键值对的文本文件格式。

Partitioner

什么是 Partitioner

  • Mapreduce 通过 Partitioner 对 Key 进行分区,进而把数据按我们自己的需求来分发。

什么情况下使用 Partitioner

  • 如果你需要 key 按照自己意愿分发,那么你需要这样的组件。

  • 框架默认的 HashPartitioner

  • 例如:数据内包含省份,而输出要求每个省份输出一个文件。

主要调度方法

MapReduce1.0 采用单队列调度方法

  • 特点:FIFO

  • 优点:简单

  • 缺点:资源利用率低

容量调度(Capacity Scheduler ,Hadoop-0.19.0 )

  • 特点:

  • 多队列,每个队列分配一定系统容量(Guaranteed Capacity)

  • 空闲资源可以被动态分配给负载重的队列

  • 支持作业优先级

  • 作业选择:

  • 选择队列:资源回收请求队列优先;最多自由空间队列优先。

  • 选择作业:按提交时间、优先级排队;检查用户配额;检查内存。

  • 优点:

  • 支持多作业并行执行,提高资源利用率

  • 动态调整资源分配,提高作业执行效率

  • 缺点:

  • 队列设置和队列选择无法自动进行,用户需要了解大量系统信息

生产环境中,大多数情况下我们不会用 MapReduce 提供的调度算法,在我们的实践工作中,有些作业的优先级就是要比其他的作业优先级高,这种情况下,我们就不可能用固定的这种调度方法去进行作业调度,需要更灵活的作业调度方式

JobTracker 的内部实现

作业控制

  • 作业抽象成三层:作业监控层(JIP),任务控制层(TIP),任务执行层。

  • 任务可能会被尝试多次执行,每个任务实例被称作 Task Attempt(TA)

  • TA 成功,TIP 会标注该任务成功,所有 TIP 成功,JIP 成功

资源管理

  • 根据 TaskTracker 状态信息进行任务分配

JobTracker 的容错

  • JobTracker 失败,那么未完成 Job 失败;

  • 通过 Job 日志,重新恢复整个 JIP 树,看哪些任务完成了,继续向下执行,Job 可部分恢复。

TaskTracker 的容错

超时

  • TaskTracker 10 分钟(mapred.tasktracker.expiry.interval)未汇报心跳,则将其从集群移除

灰名单,黑名单

  • TaskTracker 上部署性能监控脚本

  • 如果性能表现太差,被 JobTacker 暂停调度

Task 的容错

允许部分 Task 失败

  • 允许失败的任务占比,默认 0,Mapred.max.map.failers.percent, mapred.max.reduce.failures.percent

Task 由 TIP 监控,失败任务多次尝试,慢任务启动备份任务

  • 每次都是一个 TA(Task Attempt),最大允许尝试次数:mapred.map.max.attempts, mapred.reduce.max.attempts

Record 的容错

跳过导致 Task 失败的坏记录

  • K,V 超大,导致 OOM,配置最大长度,超出截断 mapred.linercordreader.maxlength,会导致数据不完整,得到结果不正确。

  • 异常数据引发程序 bug,task 重试几次后,自动进入 skip mode,跳过导致失败的记录, mapred.skip.attempts.to.start.skipping 。


发布于: 2020 年 12 月 13 日阅读数: 21
用户头像

piercebn

关注

还未添加个人签名 2019.07.24 加入

还未添加个人简介

评论

发布
暂无评论
架构师训练营 1 期第 12 周:数据应用(一)- 总结