12.4 大数据计算框架 MapReduce- 架构

用户头像
张荣召
关注
发布于: 2020 年 12 月 14 日

1.MapReduce整体架构及处理过程



问题:MepReduce在集群中是如何运行的?处理过程是什么样子的?

解析:1.分发MapReduce程序到集群:

main函数启动后,把自己的jar包分发到HDFS。

即分发给DataNode,存储在job.jar文件。比如:wordcount.jar

          2.提交作业到JobTracker。   

整个大数据服务集群中,只有一台JobTacker。JobTracker管理整个集群的作业管理和调度。

              作业提交后,并不是立即执行,作业排队等待调度。

          3.调度作业:JobScheduler. 如果资源空闲,就可以调度执行。

          4.安排运行:

             4.1为作业构建JobInProcess:JobInProcess数据结构。JobInProcess是一棵树。

                  JobInProcess解析被提交作业的input文件路径,查找能够处理的数据块有多少个split。

每个split将来都要启动一个map或者reduce来处理。

             4.2.创建TaskProcess:

先解析input文件路径,文件的数据块有多少。为每个数据块创建TaskInProcess。

                (Reduce任务也会分配一个TaskProcess)

TaskInProcess任务总数=Map的TaskInProcess总数+Reduce的TaskInProcess总数。



                  JobInProcess树的业务含义:一个作业被切分为多个任务。

每个任务下有任务尝试TaskAttempt(任务执行可能不成功)。

                  任务计算过程中可能异常,任务失败。

TaskInProcess会启动另一个TaskAttempt.  

                  ==>一个TaskInProcess下可能挂载多个TaskAttempt叶子节点.

          

            4.3.作业初始化完成后,等待任务分配到分布式集群中。

            4.4.分配任务到在分布式集群中:分布式计算任务,分布在集群中的服务器上。

                  分布式服务器在MapReduce场景中承担两种角色:TaskTracker与DataNode。

                  MapReduce的TaskTracker:

                  HDFS的DataNode:

          5.TaskTracker发送心跳给JobTracker:表示有空闲CPU资源可以分配任务。

          6.JobTracker分配任务:任务匹配.     

任务中已经记录了要处理的数据块(数据库所在的文件名称,偏移量,大小)

             TaskTracker所在服务器的IP地址是什么,IP地址上有那些数据块。

             JobTracker通信HDFS的NameNode,获取数据块的IP地址。

             然后再和TaskInProcess里面记录的任务所在的文件的偏移量(文件块地址),文件块进行匹配。

             使相同的文件块,任务块,数据块,刚好在同一个服务器上,就会分配任务给这个TaskTracker。

             TaskTracker领到任务后,通常数据块和任务在同一台服务器上。(不是绝对)

             JobTracker分配任务优先给TaskTracker分配的任务有本地数据。

==>Data  Locality数据本地化使数据能够在本地完成。

           7.TaskTacker领取任务后,启动TaskRunner进程:

              加载map函数。(DataNode中的jar包加载到本地进程中--使用反射技术)。

              TaskRunner读取数据分片,交给map函数处理。

           8.TaskTracker拷贝数据到其他服务器的reducer。   

              合并处理。

2.适合MapReduce计算类型

  • TopK:数据排序

  • K-means:数据聚类

  • Bayes:数据分配

  • SQL:

      不适合MapReduce的计算类型:(不适合数据分片)

  • Fibonacci数列      

3.InputFormat

     验证作业的输入的正确性

     将输入文件切分成逻辑的InputSplits,一个InputSplit将被分配给一个单独的Mappertask

     提供RecordReader的实现,这个RecordReader会从InputSplit中正确读出一条一条的K-V对公Mapper使用。

     InputFormat:

     getSplits:List<InputSplit>()

     createRecordReader: RecordReader<K,V>()

3.1.FileInputFormat

     得到分片的最小值minSize和最大值maxSize,可以通过设置mappered.min.split.size和mppered.max.split.size设置。

     对于每个输入文件,计算max(minSize,min(maxSize,blockSize));

     如果minSize<=blockSize<=maxSize,则设置为blockSize。

     ==>实现Data Locality设计。数据和计算在一起。通常设置为文件块大小blockSize。

     分片信息<file,start,length,hosts>,通过hosts实现map本地性。



4.OutputFormat

     OutputFormat接口决定了在哪里以及怎样持久化作业结果。

     默认的OutputFormat就是TextOutputFormat,它是一种以行分离,包含制表符界定的键值对的文本文件格式。

5.Partitioner

     什么是Partitioner:

  • MapReduce通过Partitioner对Key进行分区,进而把数据按我们自己的需求来分发。

    什么情况下使用Partitioner

  • 如果需要key按照自己意愿分发,那么你需要这样的组件。

  • 框架默认的HashPartitioner

  • 例如:数据内包含省份,而输出要求每个省份输出一个文件:

           public int getPartition(K key,V value,int numReduceTasks){

                return (key.hashCode() & Integer.MAX_VALUE)%numReduceTasks;

           }        



6.主要调度方法

  • 单队列调度:

                 特点:FIFO

                 优点:简单

                 缺点:资源利用率低

  •   容量调度(Capacity Scheduler,Hadoop-0.19.0)

                特点:多队列,每个队列分配一定系统容量(Guaranteed Capacity)

                          空闲资源可以被动态分配给负载重的队列

                          支持作业优先级

                 作业选择:

                          选择队列:资源回收请求队列优先;最多自由空间队列优先。

                          选择作业:按提交时间,优先级排队;检查用户配额;检查内存。

                 优点:支持多作业并行执行,提交作业利用率

                           动态调整资源分配,提高作业执行效率

                 缺点:队列设置和队列选择无法自动进行,用户需要了解大量系统信息。



7.JobTacker内部实现

 作业控制

  • 作业抽象成三层:作业监控层(JIP:JobInProcess),任务控制层(TIP:TaskInProcess),任务执行层。

  • 任务可能会被尝试多次执行,每个任务实例被称作Task Attempt(TA)

  • TA成功,TIP会标注该任务成功,所有TIP成功,JIP成功。

资源管理:

  • 根据TaskTacker状态信息分配任务。



7.JobTacker容错

JobTracker失败,那么未完成Job失败;

通过Job日志,Job可部分恢复。



8.TaskTacker容错

超时:

  • TaskTracker 10分钟(mapered.tasktracker.expiry.interval)未汇报心跳,则将其从集群中移除。

灰名单,黑名单:

  • TaskTracker上部署性能监控脚本

  • 如果性能表现太差,被JobTracker暂停调度。



9.Task容错

  • 允许部分Task失败:

                 允许失败的任务占比,默认0,Mapered.max.map.failers.percent和Mapered.max.reduce.failers.percent

  • Task由TIP监控,失败任务多次尝试,慢任务启动备份任务       

                 每次都是一个TA(Task Attempt),最大允许尝试次数:mapered.map.max.attempts和mapered.reduce.max.attempts.



10.Record容错

跳过导致Task失败的坏记录

  • K,V超大,导致OOM,配置最大长度,超出截断mapered.linecordreader.maxlength.

  • 异常数据引发程序BUG,task重试几次后,自动进入skip mode,跳过导致失败的记录,mapered.skip.attempts.to.start.skipping.



用户头像

张荣召

关注

还未添加个人签名 2018.05.02 加入

还未添加个人简介

评论

发布
暂无评论
12.4大数据计算框架MapReduce-架构