MapReduce 的运行机制详解
MapTask 工作机制
整个 Map 阶段流程大体如上图所示。
简单概述:inputFile 通过 split 被逻辑切分为多个 split 文件,通过 Record 按行读取内容给 map(用户自己实现的)进行处理,数据被 map 处理结束之后交给 OutputCollector 收集器,对其结果 key 进行分区(默认使用 hash 分区),然后写入 buffer,每个 map task 都有一个内存缓冲区,存储着 map 的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个 map task 结束后再对磁盘中这个 map task 产生的所有临时文件做合并,生成最终的正式输出文件,然后等待 reduce task 来拉数据
详细步骤
1) 读取数据组件 InputFormat (默认 TextInputFormat) 会通过
getSplits
方法对输入目录中文件进行逻辑切片规划得到block
, 有多少个block
就对应启动多少个MapTask
.2) 将输入文件切分为
block
之后, 由RecordReader
对象 (默认是 LineRecordReader) 进行读取, 以\n
作为分隔符, 读取一行数据, 返回<key,value>
. Key 表示每行首字符偏移值, Value 表示这一行文本内容3) 读取
block
返回<key,value>
, 进入用户自己继承的 Mapper 类中,执行用户重写的 map 函数, RecordReader 读取一行这里调用一次4) Mapper 逻辑结束之后, 将 Mapper 的每条结果通过
context.write
进行 collect 数据收集. 在 collect 中, 会先对其进行分区处理,默认使用 HashPartitioner
5) 接下来, 会将数据写入内存, 内存中这片区域叫做环形缓冲区, 缓冲区的作用是批量收集 Mapper 结果, 减少磁盘 IO 的影响. 我们的 Key/Value 对以及 Partition 的结果都会被写入缓冲区. 当然, 写入之前,Key 与 Value 值都会被序列化成字节数组
6) 当溢写线程启动后, 需要对这 80MB 空间内的 Key 做排序 (Sort). 排序是 MapReduce 模型默认的行为, 这里的排序也是对序列化的字节做的排序
7) 合并溢写文件, 每次溢写会在磁盘上生成一个临时文件 (写之前判断是否有 Combiner), 如果 Mapper 的输出结果真的很大, 有多次这样的溢写发生, 磁盘上相应的就会有多个临时文件存在. 当整个数据处理结束之后开始对磁盘中的临时文件进行 Merge 合并, 因为最终的文件只有一个, 写入磁盘, 并且为这个文件提供了一个索引文件, 以记录每个 reduce 对应数据的偏移量
版权声明: 本文为 InfoQ 作者【五分钟学大数据】的原创文章。
原文链接:【http://xie.infoq.cn/article/2006c36923ecdf055817b0968】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
评论