大数据之面试篇之 Hadoop/HDFS/Yarn
MapReduce 篇
Hadoop 解决大规模数据分布式计算的方案是 MapReduce。MapReduce 既是一个编程模型,又是一个计算框架。也就是说,开发人员必须基于 MapReduce 编程模型进大数据培训行编程开发,然后将程序通过 MapReduce 计算框架分发到 Hadoop 集群中运行。我们先看一下作为编程模型的 MapReduce。
说说 MapReduce 编程模型
MapReduce 是一种非常简单又非常强大的编程模型。
简单在于其编程模型只包含 map 和 reduce 两个过程,map 的主要输入是一对< key , value>值,经过 map 计算后输出一对< key , value>值;然后将相同 key 合并,形成< key , value 集合>;再将这个< key , value 集合>输入 reduce,经过计算输出零个或多个< key , value>对。
但是 MapReduce 同时又是非常强大的,不管是关系代数运算(SQL 计算),还是矩阵运算(图计算),大数据领域几乎所有的计算需求都可以通过 MapReduce 编程来实现。
我们以 WordCount 程序为例。WordCount 主要解决文本处理中的词频统计问题,就是统计文本中每一个单词出现的次数。如果只是统计一篇文章的词频,几十 K 到几 M 的数据,那么写一个程序,将数据读入内存,建一个 Hash 表记录每个词出现的次数就可以了,如下图。
MapReduce 作业启动和运行机制
我们以 Hadoop1 为例,MapReduce 运行过程涉及以下几类关键进程:
大数据应用进程:启动用户 MapReduce 程序的主入口,主要指定 Map 和 Reduce 类、输入输出文件路径等,并提交作业给 Hadoop 集群。JobTracker 进程:根据要处理的输入数据量启动相应数量的 map 和 reduce 进程任务,并管理整个作业生命周期的任务调度和监控。JobTracker 进程在整个 Hadoop 集群全局唯一。TaskTracker 进程:负责启动和管理 map 进程以及 reduce 进程。因为需要每个数据块都有对应的 map 函数,TaskTracker 进程通常和 HDFS 的 DataNode 进程启动在同一个服务器,也就是说,Hadoop 集群中绝大多数服务器同时运行 DataNode 进程和 TaskTacker 进程。
具体作业启动和计算过程如下:
应用进程将用户作业 jar 包存储在 HDFS 中,将来这些 jar 包会分发给 Hadoop 集群中的服务器执行 MapReduce 计算。应用程序提交 job 作业给 JobTracker。JobTacker 根据作业调度策略创建 JobInProcess 树,每个作业都会有一个自己的 JobInProcess 树。JobInProcess 根据输入数据分片数目(通常情况就是数据块的数目)和设置的 reduce 数目创建相应数量的 TaskInProcess。TaskTracker 进程和 JobTracker 进程进行定时通信。如果 TaskTracker 有空闲的计算资源(空闲 CPU 核),JobTracker 就会给他分配任务。分配任务的时候会根据 TaskTracker 的服务器名字匹配在同一台机器上的数据块计算任务给它,使启动的计算任务正好处理本机上的数据。TaskRunner 收到任务后根据任务类型(map 还是 reduce),任务参数(作业 jar 包路径,输入数据文件路径,要处理的数据在文件中的起始位置和偏移量,数据块多个备份的 DataNode 主机名等)启动相应的 map 或者 reduce 进程。map 或者 reduce 程序启动后,检查本地是否有要执行任务的 jar 包文件,如果没有,就去 HDFS 上下载,然后加载 map 或者 reduce 代码开始执行。如果是 map 进程,从 HDFS 读取数据(通常要读取的数据块正好存储在本机)。如果是 reduce 进程,将结果数据写出到 HDFS。
3. HDFS 中的文件大小设置,以及有什么影响?
HDFS 中的文件在物理上是分块存储(block),块的大小可以通过配置参数( dfs.blocksize)来规定,默认大小在 hadoop2.x 版本中是 128M,老版本中是 64M。
思考:为什么块的大小不能设置的太小,也不能设置的太大?
HDFS 的块比磁盘的块大,其目的是为了最小化寻址开销。如果块设置得足够大,从磁盘传输数据的时间会明显大于定位这个块开始位置所需的时间。
因而,传输一个由多个块组成的文件的时间取决于磁盘传输速率。
如果寻址时间约为 10ms,而传输速率为 100MB/s,为了使寻址时间仅占传输时间的 1%,我们要将块大小设置约为 100MB。默认的块大小 128MB。增加文件块大小,需要增加磁盘的传输速率。
secondary namenode 工作机制
1)第一阶段:NameNode 启动
(1)第一次启动 NameNode 格式化后,创建 fsimage 和 edits 文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
(2)客户端对元数据进行增删改的请求。
(3)NameNode 记录操作日志,更新滚动日志。
(4)NameNode 在内存中对数据进行增删改查。
2)第二阶段:Secondary NameNode 工作
(1)Secondary NameNode 询问 NameNode 是否需要 checkpoint。直接带回 NameNode 是否检查结果。
(2)Secondary NameNode 请求执行 checkpoint。
(3)NameNode 滚动正在写的 edits 日志。
(4)将滚动前的编辑日志和镜像文件拷贝到 Secondary NameNode。
(5)Secondary NameNode 加载编辑日志和镜像文件到内存,并合并。
(6)生成新的镜像文件 fsimage.chkpoint。
(7)拷贝 fsimage.chkpoint 到 NameNode。
(8)NameNode 将 fsimage.chkpoint 重新命名成 fsimage。
NameNode 与 SecondaryNameNode 的区别与联系?
1)区别
(1)NameNode 负责管理整个文件系统的元数据,以及每一个路径(文件)所对应的数据块信息。
(2)SecondaryNameNode 主要用于定期合并命名空间镜像和命名空间镜像的编辑日志。
2)联系:
(1)SecondaryNameNode 中保存了一份和 namenode 一致的镜像文件(fsimage)和编辑日志(edits)。
(2)在主 namenode 发生故障时(假设没有及时备份数据),可以从 SecondaryNameNode 恢复数据。
ZKFailoverController 主要职责
1)健康监测:周期性的向它监控的 NN 发送健康探测命令,从而来确定某个 NameNode 是否处于健康状态,如果机器宕机,心跳失败,那么 zkfc 就会标记它处于一个不健康的状态。
2)会话管理:如果 NN 是健康的,zkfc 就会在 zookeeper 中保持一个打开的会话,如果 NameNode 同时还是 Active 状态的,那么 zkfc 还会在 Zookeeper 中占有一个类型为短暂类型的 znode,当这个 NN 挂掉时,这个 znode 将会被删除,然后备用的 NN,将会得到这把锁,升级为主 NN,同时标记状态为 Active。
3)当宕机的 NN 新启动时,它会再次注册 zookeper,发现已经有 znode 锁了,便会自动变为 Standby 状态,如此往复循环,保证高可靠,需要注意,目前仅仅支持最多配置 2 个 NN。
4)master 选举:如上所述,通过在 zookeeper 中维持一个短暂类型的 znode,来实现抢占式的锁机制,从而判断那个 NameNode 为 Active 状态。
7.Hadoop 序列化和反序列化及自定义 bean 对象实现序列化
1)序列化和反序列化
(1)序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储(持久化)和网络传输。
(2)反序列化就是将收到字节序列(或其他数据传输协议)或者是硬盘的持久化数据,转换成内存中的对象。
(3)Java 的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系等),不便于在网络中高效传输。所以,hadoop 自己开发了一套序列化机制(Writable),精简、高效。
2)自定义 bean 对象要想序列化传输步骤及注意事项:
(1)必须实现 Writable 接口
(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造
(3)重写序列化方法
(4)重写反序列化方法
(5)注意反序列化的顺序和序列化的顺序完全一致
(6)要想把结果显示在文件中,需要重写 toString(),且用"\t"分开,方便后续用
(7)如果需要将自定义的 bean 放在 key 中传输,则还需要实现 comparable 接口,因为 mapreduce 框中的 shuffle 过程一定会对 key 进行排序
说说 FileInputFormat 切片机制
job 提交流程源码详解 waitForCompletion()submit();
// 1、建立连接 connect();
// 1)创建提交 job 的代理 new Cluster(getConfiguration());
// (1)判断是本地 yarn 还是远程 initialize(jobTrackAddr, conf);
// 2、提交 jobsubmitter.submitJobInternal(Job.this, cluster)
// 1)创建给集群提交数据的 Stag 路径 Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// 2)获取 jobid,并创建 job 路径 JobID jobId = submitClient.getNewJobID();
// 3)拷贝 jar 包到集群 copyAndConfigureFiles(job, submitJobDir);rUploader.uploadFiles(job, jobSubmitDir);
// 4)计算切片,生成切片规划文件 writeSplits(job, submitJobDir);maps = writeNewSplits(job, jobSubmitDir);input.getSplits(job);
// 5)向 Stag 路径写 xml 配置文件 writeConf(conf, submitJobFile);conf.writeXml(out);
// 6)提交 job,返回提交状态 status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
在一个运行的 Hadoop 任务中,什么是 InputSplit?
FileInputFormat 源码解析(input.getSplits(job))
(1)找到你数据存储的目录。
(2)开始遍历处理(规划切片)目录下的每一个文件。
(3)遍历第一个文件 ss.txt。
a)获取文件大小 fs.sizeOf(ss.txt);。
b)计算切片大小 computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M。
c)默认情况下,切片大小=blocksize。
d)开始切,形成第 1 个切片:ss.txt—0:128M 第 2 个切片 ss.txt—128:256M 第 3 个切片 ss.txt—256M:300M(每次切片时,都要判断切完剩下的部分是否大于块的 1.1 倍,不大于 1.1 倍就划分一块切片)。
e)将切片信息写到一个切片规划文件中。
f)整个切片的核心过程在 getSplit()方法中完成。
g)数据切片只是在逻辑上对输入数据进行分片,并不会再磁盘上将其切分成分片进行存储。InputSplit 只记录了分片的元数据信息,比如起始位置、长度以及所在的节点列表等。
h)注意:block 是 HDFS 上物理上存储的存储的数据,切片是对数据逻辑上的划分。
(4)提交切片规划文件到 yarn 上,yarn 上的 MrAppMaster 就可以根据切片规划文件计算开启 maptask 个数。如何判定一个 job 的 map 和 reduce 的数量?
1)map 数量 splitSize=max{minSize,min{maxSize,blockSize}}map 数量由处理的数据分成的 block 数量决定 default_num = total_size / split_size;
2)reduce 数量 reduce 的数量 job.setNumReduceTasks(x);x 为 reduce 的数量。不设置的话默认为 1。Maptask 的个数由什么决定?
一个 job 的 map 阶段 MapTask 并行度(个数),由客户端提交 job 时的切片个数决定。
MapTask 和 ReduceTask 工作机制(也可回答 MapReduce 工作原理)
MapTask 工作机制
(1)Read 阶段:Map Task 通过用户编写的 RecordReader,从输入 InputSplit 中解析出一个个 key/value。
(2)Map 阶段:该节点主要是将解析出的 key/value 交给用户编写 map()函数处理,并产生一系列新的 key/value。
(3)Collect 收集阶段:在用户编写 map()函数中,当数据处理完成后,一般会调用 OutputCollector.collect()输出结果。在该函数内部,它会将生成的 key/value 分区(调用 Partitioner),并写入一个环形内存缓冲区中。
(4)Spill 阶段:即“溢写”,当环形缓冲区满后,MapReduce 会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
(5)Combine 阶段:当所有数据处理完成后,MapTask 对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
ReduceTask 工作机制
(1)Copy 阶段:ReduceTask 从各个 MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
(2)Merge 阶段:在远程拷贝数据的同时,ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
(3)Sort 阶段:按照 MapReduce 语义,用户编写 reduce()函数输入数据是按 key 进行聚集的一组数据。为了将 key 相同的数据聚在一起,Hadoop 采用了基于排序的策略。由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此,ReduceTask 只需对所有数据进行一次归并排序即可。
(4)Reduce 阶段:reduce()函数将计算结果写到 HDFS 上。说说 MapReduce 有几种排序及排序发生的阶段
1)排序的分类:
(1)部分排序:MapReduce 根据输入记录的键对数据集排序。保证输出的每个文件内部排序。
(2)全排序:如何用 Hadoop 产生一个全局排序的文件?最简单的方法是使用一个分区。但该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了 MapReduce 所提供的并行架构。替代方案:首先创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序的文件。主要思路是使用一个分区来描述输出的全局排序。例如:可以为待分析文件创建 3 个分区,在第一分区中,记录的单词首字母 a-g,第二分区记录单词首字母 h-n, 第三分区记录单词首字母 o-z。
(3)辅助排序:(GroupingComparator 分组)Mapreduce 框架在记录到达 reducer 之前按键对记录排序,但键所对应的值并没有被排序。甚至在不同的执行轮次中,这些值的排序也不固定,因为它们来自不同的 map 任务且这些 map 任务在不同轮次中完成时间各不相同。一般来说,大多数 MapReduce 程序会避免让 reduce 函数依赖于值的排序。但是,有时也需要通过特定的方法对键进行排序和分组等以实现对值的排序。
(4)二次排序:在自定义排序过程中,如果 compareTo 中的判断条件为两个即为二次排序。
2)自定义排序 WritableComparablebean 对象实现 WritableComparable 接口重写 compareTo 方法,就可以实现排序 @Overridepublic int compareTo(FlowBean o) {
// 倒序排列,从大到小 return this.sumFlow > o.getSumFlow() ? -1 : 1;}
3)排序发生的阶段:
(1)一个是在 map side 发生在 spill 后 partition 前。
(2)一个是在 reduce side 发生在 copy 后 reduce 前。
说说 MapReduce 中 shuffle 阶段的工作流程,如何优化 shuffle 阶段
分区,排序,溢写,拷贝到对应 reduce 机器上,增加 combiner,压缩溢写的文件。
说说 MapReduce 中 combiner 的作用是什么,一般使用情景,哪些情况不需要,及和 reduce 的区别?
1)Combiner 的意义就是对每一个 maptask 的输出进行局部汇总,以减小网络传输量。
2)Combiner 能够应用的前提是不能影响最终的业务逻辑,而且,Combiner 的输出 kv 应该跟 reducer 的输入 kv 类型要对应起来。
3)Combiner 和 reducer 的区别在于运行的位置。Combiner 是在每一个 maptask 所在的节点运行;Reducer 是接收全局所有 Mapper 的输出结果。如果没有定义 partitioner,那数据在被送达 reducer 前是如何被分区的?
如果没有自定义的 partitioning,则默认的 partition 算法,即根据每一条数据的 key 的 hashcode 值摸运算(%)reduce 的数量,得到的数字就是“分区号“。
MapReduce 怎么实现 TopN?
可以自定义 groupingcomparator,对结果进行最大值排序,然后再 reduce 输出时,控制只输出前 n 个数。就达到了 topn 输出的目的。
Hadoop 的缓存机制(Distributedcache)
分布式缓存一个最重要的应用就是在进行 join 操作的时候,如果一个表很大,另一个表很小,我们就可以将这个小表进行广播处理,即每个计算节点上都存一份,然后进行 map 端的连接操作,经过我的实验验证,这种情况下处理效率大大高于一般的 reduce 端 join,广播处理就运用到了分布式缓存的技术。
DistributedCache 将拷贝缓存的文件到 Slave 节点在任何 Job 在节点上执行之前,文件在每个 Job 中只会被拷贝一次,缓存的归档文件会被在 Slave 节点中解压缩。将本地文件复制到 HDFS 中去,接着 Client 会通过 addCacheFile() 和 addCacheArchive()方法告诉 DistributedCache 在 HDFS 中的位置。当文件存放到文地时,JobClient 同样获得 DistributedCache 来创建符号链接,其形式为文件的 URI 加 fragment 标识。当用户需要获得缓存中所有有效文件的列表时,JobConf 的方法 getLocalCacheFiles() 和 getLocalArchives()都返回一个指向本地文件路径对象数组。
如何使用 mapReduce 实现两个表的 join?
1)reduce side join : 在 map 阶段,map 函数同时读取两个文件 File1 和 File2,为了区分两种来源的 key/value 数据对,对每条数据打一个标签(tag),比如:tag=0 表示来自文件 File1,tag=2 表示来自文件 File2。
2)map side join : Map side join 是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个 map task 内存中存在一份(比如存放到 hash table 中),然后只扫描大表:对于大表中的每一条记录 key/value,在 hash table 中查找是否有相同的 key 的记录,如果有,则连接后输出即可。
什么样的计算不能用 mr 来提速?
1)数据量很小。2)繁杂的小文件。3)索引是更好的存取机制的时候。4)事务处理。5)只有一台机器的时候。MapReduce3.0 做了哪些改进?
1.Tasknative 优化:为 MapReduce 增加了 C/C++的 map output collector 实现(包括 Spill,Sort 和 IFile 等),通过作业级别参数调整就可切换到该实现上。对于 shuffle 密集型应用,其性能可提高约 30%。
2.MapReduce 内存参数自动推断。在 Hadoop 2.0 中,为 MapReduce 作业设置内存参数非常繁琐,涉及到两个参数:mapreduce.{map,reduce}.memory.mb 和 mapreduce.{map,reduce}.java.opts,一旦设置不合理,则会使得内存资源浪费严重,比如将前者设置为 4096MB,但后者却是“-Xmx2g”,则剩余 2g 实际上无法让 java heap 使用到。
3.Hadoop3.x 中的 MapReduce 添加了 Map 输出 collector 的本地实现,对于 shuffle 密集型的作业来说,这将会有 30%以上的性能提升。
HDFS 篇 HDFS 中的 block 默认保存几份?
默认保存 3 份
HDFS 默认 BlockSize 是多大?
从 2.7.3 版本开始,官方关于 Data Blocks 的说明中,block size 由 64 MB 变成了 128 MB 的。
负责 HDFS 数据存储的是哪一部分?
DataNode 负责数据存储
HDFS 有哪些优点
1、高容错性
数据自动保存多个副本
副本丢失后,自动恢复
2、适合批处理
移动计算而非数据
数据位置暴露给计算框架
3、适合大数据处理
GB、TB、甚至 PB 级数据
百万规模以上的文件数量
10K+节点规模
4、流式文件访问
一次性写入,多次读取
保证数据一致性
5、可构建在廉价机器上
通过多副本提高可靠性
提供了容错和恢复机制
HDFS 缺点
1、低延迟数据访问
比如毫秒级-达不到
低延迟与高吞吐率
2、小文件存取
占用 NameNode 大量内存
寻道时间超过读取时间
3、并发写入、文件随机修改
一个文件只能有一个写者
仅支持 append
HDFS 访问方式有哪些
HDFS Shell 命令
HDFS Java API
HDFS REST API
HDFS Fuse:实现了 fuse 协议
HDFS lib hdfs:C/C++访问接口
HDFS 其他语言编程 API
使用 thrift 实现
支持 C++、Python、php、C#等语言
如何增加和移除 DataNode 节点
1、加入新的 datanode
步骤 1:将已存在 datanode 上的安装包(包括配置文件等)拷贝到新 datanode 上
步骤 2:启动新 datanode
sbin/hadoop-deamon.sh start datanode
2、移除旧 datanode
步骤 1:将 datanode 加入黑名单,并更新黑名单,在 NameNode 上,将 datanode 的 host 或者 ip 加入配置选项 dfs.hosts.exclude 指定的文件中
步骤 2:移除 datanode
bin/hdfs dfsadmin -refreshNodes
HDFS 快照的作用以及如何设置
1、HDFS 上文件和目录是不断变化的,快照可以帮助用户保存某个时刻的数据
2、HDFS 快照的作用
防止用户误操作删除数据
数据备份
3、一个目录可以产生快照,当且仅当它是 Snapshottable
bin/hdfs dfsadmin allowSnapshot
4、创建/删除快照
bin/hdfs dfs -createSnapshot[]
bin/hdfs dfs -deleteSnapshot[]
5、快照存放位置和特点
快照是只读的,不可修改
快照位置:
谈谈 HDFS 的缓存
1、HDFS 自身不提供数据缓存功能,而是使用 OS 缓存
容易内存浪费,eg.一个 block 三个副本同时被缓存
多种计算框架共存,均将 HDFS 作为共享存储系统
MapReduce:离线计算,充分利用磁盘
Impala:低延迟计算,充分利用内存
Spark:内存计算框架
2、HDFS 应让多种混合计算类型共存一个集群中
合理的使用内存、磁盘等资源
比如,高频访问的特点文件应被尽可能长期缓存,防止置换到磁盘上
3、用户需通过命令显式的将一个目录或文件加入/移除缓存
不支持块级别的缓存
不支持自动化缓存
可设置缓存失效时间
4、缓存目录:仅对一级文件进行缓存
不会递归缓存所有文件与目录
5、以 pool 的形式组织缓存资源
借助 YARN 的资源管理方式,将缓存划分到不同 pool 中
每个 pool 有类 linux 权限管理机制、缓存上限、失效时间等
6、独立管理内存,未与资源管理系统 YARN 集成
用户可为每个 DN 设置缓存大小,该值独立于 YARN
HDFS 的存储机制
HDFS 存储机制,包括 HDFS 的写入数据过程和读取数据过程两部分
HDFS 写数据过程 1)客户端通过 Distributed FileSystem 模块向 NameNode 请求上传文件,NameNode 检查目标文件是否已存在,父目录是否存在。
2)NameNode 返回是否可以上传。
3)客户端请求第一个 block 上传到哪几个 datanode 服务器上。
4)NameNode 返回 3 个 datanode 节点,分别为 dn1、dn2、dn3。
5)客户端通过 FSDataOutputStream 模块请求 dn1 上传数据,dn1 收到请求会继续调用 dn2,然后 dn2 调用 dn3,将这个通信管道建立完成。
6)dn1、dn2、dn3 逐级应答客户端。
7)客户端开始往 dn1 上传第一个 block(先从磁盘读取数据放到一个本地内存缓存),以 packet 为单位,dn1 收到一个 packet 就会传给 dn2,dn2 传给 dn3;
dn1 每传一个 packet 会放入一个应答队列等待应答。
8)当一个 block 传输完成之后,客户端再次请求 NameNode 上传第二个 block 的服务器。(重复执行 3-7 步)。
HDFS 读数据过程 1)客户端通过 Distributed FileSystem 向 NameNode 请求下载文件,NameNode 通过查询元数据,找到文件块所在的 DataNode 地址。
2)挑选一台 DataNode(就近原则,然后随机)服务器,请求读取数据。
3)DataNode 开始传输数据给客户端(从磁盘里面读取数据输入流,以 packet 为单位来做校验)。
4)客户端以 packet 为单位接收,先在本地缓存,然后写入目标文件。
MapReduce 性能优化 Map 端优化
通过单个 map 的写入文件大小和任务处理时间得出
发生大量溢写时会产生性能问题和读取过载,比较 Map output records < Spilled Records
需要精确分配内存缓冲区
二进制文件和压缩文件本质上不基于块,因此不能拆分
小文件会产生大量并行任务来处理,会浪费很多资源
处理小文件的最好方法是打包为大文件
使用 Avro 对数据序列化来创建容器文件
使用 HAR 格式文件
使用序列文件把小文件存储成单个大文件
如果数据集很大但数据块很小会导致 mapper 过多,需要花时间进行拆分;因此输入文件大则数据块大小也要加大
大的数据块会加速磁盘 IO,但会增加网络传输开销,因而在 Map 阶段造成记录溢写
Map 任务的流程
输入数据和块大小的影响
处置小文件和不可拆分文件
在 Map 阶段压缩溢写记录
计算 Map 任务的吞吐量
Reduce 端优化
压缩排序和合并的数据量(combiner,数据压缩,数据过滤)
解决本地磁盘问题和网络问题
最大化内存分配以尽可能把数据保留在内存而不是输出到磁盘
造成 Reduce 低速的原因可能是未经优化的 reduce 函数,硬件问题或者不当的 Hadoop 配置
通过输入 Shuffle 除以 Reduce 运行时间得到吞吐量
Reduce 任务的流程
计算 Reduce 吞吐量
改善 Reduce 执行阶段
MapReduce 任务级别优化
使用 Combiner 类似于本地 Reduce 操作,可以提升全局 Reduce 操作效率
习惯上一般直接把 reduce 函数当做 Combiner,逻辑需满足交换律和结合律
Combiner 会在 Map 函数生成的键值对收集到列表,并经过 Combiner 运算直到 Combiner 缓冲区达到一定数目时,才会发送给 reduce。因此在数据量非常大的情况下可以很好的改善性能
使用压缩技术输入压缩:在有大量数据且计划重复处理时,应考虑输入压缩。Hadoop 会自动对合适扩展名的文件启用压缩和解压
压缩 Mapper 输出:当 map 任务中间数据量大时,应考虑在此阶段启用压缩。能改善 Shuffle 过程,降低网络开销
压缩 Reducer 输出:可以减少要存储的结果数据量,同时降低下游任务的输入数据量
如果磁盘 IO 和网络影响了 MR 作业性能,则在任意阶段(压缩输入,Mapper 或 Reduce 输出)启用压缩都可以改善处理时间,减小 IO 和网络开销
使用正确的 Writable 类型通过使用 FileInputFormat 实现原始字节比 WriteableComparable 更有优势
使用 Text 而不是 String 来消除字符串拆分所花的时间
使用 VIntWritable 或者 VLongWritable 有时比使用 int 和 long 更快
在代码中使用正确的可写类型能提高 MR 作业整体性能
在 Shuffle 和 Sort 阶段,中间键的比较可能会成为瓶颈
转载原创作者:大数据真好玩
评论