Hadoop 的 MapReduce 到底有什么问题?
作为 Hadoop 里重要的分布式计算组件 MapReduce 到底存在什么样的问题,大家纷纷都转投其他技术栈?我们来一起探个究竟。本文会先详细解析一下整个 MapReduce 的过程,编程方式,然后再去分析一下存在的问题和其中可以借鉴的点。
Map Reduce 的过程详细解析
① : 每个数据的 Split 对应一个 Map 任务作为 Map 的输入,一般来说是 HDFS 的一个 Block。
② : Map 产生的数据会先写入到一个环形的内存的 Buffer 空间里。
③ : 当 Buffer 满了以后, 会 Spill 溢出数据到磁盘里。在溢出之前会先按照 Partition 函数对数据进行分区(默认是取 key 的 hash 值然后根据 Reducer 的个数进行取模),然后按照 Key 进行排序(快速排序)。如果设置了 Combiner 会在写入磁盘前,对数据进行 Combine 操作,通过减少 key 的数据量来减轻 Reducer 拉取数据的网络传输。
④ : 最后将所有的溢出文件合并为一个文件,合并的过程中按照分区按照 key 进行排序(归并排序), 如果溢出文件超过一定的数量(可配置), 会在合并的前还会执行 Combine 操作(如果设置了 Combiner)。
⑤ : 当 Map 端有任务完成后,Reducer 端就会启动对应的 fetch & copy 线程去从 Map 端复制数据。
⑥ : 当 Copy 过来的数据内存中放不下后,会往磁盘写,写之前会先进行 merge 和 sort 操作(归并排序),combiner 操作,最终会合并得到一份 Reduce 的输入数据。
⑦ : 当输入数据准备好后,进行 Reduce 操作。
⑧ : 输出数据到指定的位置。
Map Reduce 的编程
Map 输入是<Key, Value>, 输出是一个或者多个<Key, Value>Reduce 的输入是<Key, Iteratorable<Value>> 输出是<Key, Value>。总结来说:
input<k1, v1>-->Map--><k2,v2>-->combine<k2,v2>-->Reduce--><k3, v3>(output)
实现一个 Map 接口类
实现一个 Reducer 的接口类
以 Wordcount 为例:
实现 Map 接口类
实现 Reducer 接口类
处理任务的 Job
用 Spark 来实现 Wordcount
Map Reduce 存在什么样的问题?
存在的问题
操作符或者算子固定单一只有 Map 和 Reduce 两种,无法支持复杂的操作,比如表之间
Join
操作,需要开发人员自己写Join
的逻辑实现:Reduce 端的 Join,在 Map 的时候对两个表的两一个 key 的数据分别打上表的标签,放进同一个 key 里,然后在 Reduce 阶段按标签分成两组,再进行 Join 输出操作。
Map 端 Join, 适合一端表小的情况,将小表在 Map 端作为其中一个 lookup 输入,进行 Join 操作。
读入输入数据然后产生的中间输出结果必须输出到磁盘,无法在中间内存中处理(Spark 可以选择 cache 在内存里,用空间换时间),重复写磁盘,I/O 开销比较大。
Key 需要是可比较,会用来排序。
Shuffle 和 Reduce 阶段强制要求对数据按照某 key 进行排序,某些场景(比如数据量不是特别大的时候,简单 hash 就够了)会有性能的损失。
不能在线聚合,不论是 Map 端的 combine 还是 Reduce 端的都需要等所有的数据都存放到内存或者磁盘后,再执行聚合操作,存储这些数据需要消耗大量的内存和磁盘空间。如果能够一边获取 record 一边聚合,那么就会大大的减少存储空间,减少延时。
总结:个人觉得 Map Reduce 主要的问题在于函数太过于底层,对用户的使用和操作上来说不够灵活,另外强制约束了需要按 key 排序和输出到磁盘使得其有性能上损失。但是也并不是全部一无是处,其中
简单清晰的分治 MapReduce(Map 即为分,Reduce 即为合)进行分布式计算的思想被多个框架借鉴,各个阶段读什么数据,进行什么操作,输出数据都是确定的。
内存优势:
内存使用固定,基本开销就是 Map 端输出数据的 Spill Buffer,Reduce 端需要一个大的数据来存放复制过来的分区数据两部分。用户聚合函数这部分的内存消耗是不确定的。
排序后的数据在进行聚合的时候可以用最大堆或者最小堆来做,省空间且比较快。
按照 Key 进行排序并 Spill 到磁盘的功能,可以保证 Shuffle 在大规模的数据时仍然能够顺利运行,不会那么容易出现 OOM 之类的问题。
通过归并排序来减少碎文件的提升 I/O 性能的思想其实也在 Spark 的 SortMergeShuffle 里使用。
Spark 是如何来解决这些问题的?
提供丰富的操作符和分层的 DAG 并发处理层
通过抽象数据结构 RDD 和[丰富的操作符][2]来提升用户的操作体验。
除了 Map, Reduce,还提供了 filter(), join(), cogroup(), flatMap(), union(), distinct()等等用户常用的操作符, 可参考 Reference 里 Spark 的 transformation 文档。
会将用户的代码分为逻辑处理层和物理执行层。
逻辑处理层将用户的代码(定义的各种操作符)解析成一个 DAG(有向无环图)来定义数据及数据之间的流动/操作关系。其中结点是 RDD 数据,箭头是在 RDD 上的一些数据操作和数据之间的关系。
物理执行层会根据数据之间的依赖关系将 DAG 整个流程图划分成多个小的执行阶段(stage),然后按照各个执行阶段执行并处理数据。
更灵活的 Shuffle 框架
解决强制按 Key 排序的问题
Spark 提供按 PartitionId 排序、按 Key 排序等多种方式来灵活应对不同操作的排序需求。
提供在线聚合的方式
采用 hash-based 的聚合,利用 HashMap 的在线聚合特性,在将 record 插入 HashMap 时自动完成聚合过程,这也是 Spark 为什么设计 AppendOnlyMap 等数据结构的原因。
通过将最终临时文件合并成一个文件,按 PartitionId 顺序存储来减少碎文件
Shuffle 产生的临时文件会按照
PartitionId
去排序,最终会按照 PartiontionId 的顺序将一个 Map 产生的所有文件合成一个文件,来减少碎文件。
Reference
[Hadoop 文档][1]
[Spark 文档 Transformation][2]
[2]: https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations
更多大数据相关分享,可关注微信公众号,搜索“数据元素”或扫描下方二维码。
版权声明: 本文为 InfoQ 作者【hanke】的原创文章。
原文链接:【http://xie.infoq.cn/article/d1f9dca888d6b119fd484c894】。文章转载请联系作者。
评论