12.4 大数据计算框架 MapReduce- 架构
1.MapReduce整体架构及处理过程
问题:MepReduce在集群中是如何运行的?处理过程是什么样子的?
解析:1.分发MapReduce程序到集群:
main函数启动后,把自己的jar包分发到HDFS。
即分发给DataNode,存储在job.jar文件。比如:wordcount.jar
2.提交作业到JobTracker。
整个大数据服务集群中,只有一台JobTacker。JobTracker管理整个集群的作业管理和调度。
作业提交后,并不是立即执行,作业排队等待调度。
3.调度作业:JobScheduler. 如果资源空闲,就可以调度执行。
4.安排运行:
4.1为作业构建JobInProcess:JobInProcess数据结构。JobInProcess是一棵树。
JobInProcess解析被提交作业的input文件路径,查找能够处理的数据块有多少个split。
每个split将来都要启动一个map或者reduce来处理。
4.2.创建TaskProcess:
先解析input文件路径,文件的数据块有多少。为每个数据块创建TaskInProcess。
(Reduce任务也会分配一个TaskProcess)
TaskInProcess任务总数=Map的TaskInProcess总数+Reduce的TaskInProcess总数。
JobInProcess树的业务含义:一个作业被切分为多个任务。
每个任务下有任务尝试TaskAttempt(任务执行可能不成功)。
任务计算过程中可能异常,任务失败。
TaskInProcess会启动另一个TaskAttempt.
==>一个TaskInProcess下可能挂载多个TaskAttempt叶子节点.
4.3.作业初始化完成后,等待任务分配到分布式集群中。
4.4.分配任务到在分布式集群中:分布式计算任务,分布在集群中的服务器上。
分布式服务器在MapReduce场景中承担两种角色:TaskTracker与DataNode。
MapReduce的TaskTracker:
HDFS的DataNode:
5.TaskTracker发送心跳给JobTracker:表示有空闲CPU资源可以分配任务。
6.JobTracker分配任务:任务匹配.
任务中已经记录了要处理的数据块(数据库所在的文件名称,偏移量,大小)
TaskTracker所在服务器的IP地址是什么,IP地址上有那些数据块。
JobTracker通信HDFS的NameNode,获取数据块的IP地址。
然后再和TaskInProcess里面记录的任务所在的文件的偏移量(文件块地址),文件块进行匹配。
使相同的文件块,任务块,数据块,刚好在同一个服务器上,就会分配任务给这个TaskTracker。
TaskTracker领到任务后,通常数据块和任务在同一台服务器上。(不是绝对)
JobTracker分配任务优先给TaskTracker分配的任务有本地数据。
==>Data Locality数据本地化使数据能够在本地完成。
7.TaskTacker领取任务后,启动TaskRunner进程:
加载map函数。(DataNode中的jar包加载到本地进程中--使用反射技术)。
TaskRunner读取数据分片,交给map函数处理。
8.TaskTracker拷贝数据到其他服务器的reducer。
合并处理。
2.适合MapReduce计算类型
TopK:数据排序
K-means:数据聚类
Bayes:数据分配
SQL:
不适合MapReduce的计算类型:(不适合数据分片)
Fibonacci数列
3.InputFormat
验证作业的输入的正确性
将输入文件切分成逻辑的InputSplits,一个InputSplit将被分配给一个单独的Mappertask
提供RecordReader的实现,这个RecordReader会从InputSplit中正确读出一条一条的K-V对公Mapper使用。
InputFormat:
getSplits:List<InputSplit>()
createRecordReader: RecordReader<K,V>()
3.1.FileInputFormat
得到分片的最小值minSize和最大值maxSize,可以通过设置mappered.min.split.size和mppered.max.split.size设置。
对于每个输入文件,计算max(minSize,min(maxSize,blockSize));
如果minSize<=blockSize<=maxSize,则设置为blockSize。
==>实现Data Locality设计。数据和计算在一起。通常设置为文件块大小blockSize。
分片信息<file,start,length,hosts>,通过hosts实现map本地性。
4.OutputFormat
OutputFormat接口决定了在哪里以及怎样持久化作业结果。
默认的OutputFormat就是TextOutputFormat,它是一种以行分离,包含制表符界定的键值对的文本文件格式。
5.Partitioner
什么是Partitioner:
MapReduce通过Partitioner对Key进行分区,进而把数据按我们自己的需求来分发。
什么情况下使用Partitioner
如果需要key按照自己意愿分发,那么你需要这样的组件。
框架默认的HashPartitioner
例如:数据内包含省份,而输出要求每个省份输出一个文件:
public int getPartition(K key,V value,int numReduceTasks){
return (key.hashCode() & Integer.MAX_VALUE)%numReduceTasks;
}
6.主要调度方法
单队列调度:
特点:FIFO
优点:简单
缺点:资源利用率低
容量调度(Capacity Scheduler,Hadoop-0.19.0)
特点:多队列,每个队列分配一定系统容量(Guaranteed Capacity)
空闲资源可以被动态分配给负载重的队列
支持作业优先级
作业选择:
选择队列:资源回收请求队列优先;最多自由空间队列优先。
选择作业:按提交时间,优先级排队;检查用户配额;检查内存。
优点:支持多作业并行执行,提交作业利用率
动态调整资源分配,提高作业执行效率
缺点:队列设置和队列选择无法自动进行,用户需要了解大量系统信息。
7.JobTacker内部实现
作业控制
作业抽象成三层:作业监控层(JIP:JobInProcess),任务控制层(TIP:TaskInProcess),任务执行层。
任务可能会被尝试多次执行,每个任务实例被称作Task Attempt(TA)
TA成功,TIP会标注该任务成功,所有TIP成功,JIP成功。
资源管理:
根据TaskTacker状态信息分配任务。
7.JobTacker容错
JobTracker失败,那么未完成Job失败;
通过Job日志,Job可部分恢复。
8.TaskTacker容错
超时:
TaskTracker 10分钟(mapered.tasktracker.expiry.interval)未汇报心跳,则将其从集群中移除。
灰名单,黑名单:
TaskTracker上部署性能监控脚本
如果性能表现太差,被JobTracker暂停调度。
9.Task容错
允许部分Task失败:
允许失败的任务占比,默认0,Mapered.max.map.failers.percent和Mapered.max.reduce.failers.percent
Task由TIP监控,失败任务多次尝试,慢任务启动备份任务
每次都是一个TA(Task Attempt),最大允许尝试次数:mapered.map.max.attempts和mapered.reduce.max.attempts.
10.Record容错
跳过导致Task失败的坏记录
K,V超大,导致OOM,配置最大长度,超出截断mapered.linecordreader.maxlength.
异常数据引发程序BUG,task重试几次后,自动进入skip mode,跳过导致失败的记录,mapered.skip.attempts.to.start.skipping.
评论