写点什么

Hadoop 的 MapReduce 到底有什么问题?

用户头像
hanke
关注
发布于: 2021 年 01 月 14 日
Hadoop的MapReduce到底有什么问题?

作为 Hadoop 里重要的分布式计算组件 MapReduce 到底存在什么样的问题,大家纷纷都转投其他技术栈?我们来一起探个究竟。本文会先详细解析一下整个 MapReduce 的过程,编程方式,然后再去分析一下存在的问题和其中可以借鉴的点。


Map Reduce 的过程详细解析

① : 每个数据的 Split 对应一个 Map 任务作为 Map 的输入,一般来说是 HDFS 的一个 Block。

② : Map 产生的数据会先写入到一个环形的内存的 Buffer 空间里。

③ : 当 Buffer 满了以后, 会 Spill 溢出数据到磁盘里。在溢出之前会先按照 Partition 函数对数据进行分区(默认是取 key 的 hash 值然后根据 Reducer 的个数进行取模),然后按照 Key 进行排序(快速排序)。如果设置了 Combiner 会在写入磁盘前,对数据进行 Combine 操作,通过减少 key 的数据量来减轻 Reducer 拉取数据的网络传输。

④ : 最后将所有的溢出文件合并为一个文件,合并的过程中按照分区按照 key 进行排序(归并排序), 如果溢出文件超过一定的数量(可配置), 会在合并的前还会执行 Combine 操作(如果设置了 Combiner)。

⑤ : 当 Map 端有任务完成后,Reducer 端就会启动对应的 fetch & copy 线程去从 Map 端复制数据。

⑥ : 当 Copy 过来的数据内存中放不下后,会往磁盘写,写之前会先进行 merge 和 sort 操作(归并排序),combiner 操作,最终会合并得到一份 Reduce 的输入数据。

⑦ : 当输入数据准备好后,进行 Reduce 操作。

⑧ : 输出数据到指定的位置。


Map Reduce 的编程

Map 输入是<Key, Value>, 输出是一个或者多个<Key, Value>Reduce 的输入是<Key, Iteratorable<Value>> 输出是<Key, Value>。总结来说:

input<k1, v1>-->Map--><k2,v2>-->combine<k2,v2>-->Reduce--><k3, v3>(output)


  • 实现一个 Map 接口类

  • 实现一个 Reducer 的接口类


以 Wordcount 为例:

实现 Map 接口类

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {	private final statck IntWritable one = new IntWritable(1);	private Text word = new Text();		public void Map(Object key, Text value, Context context) throws IOException, InterruptedException {		StringTokenizer itr = new StringTokenizer(value.toString());		while (itr.hasMoreTokens()) {			word.set(itr.nextToken());			context.write(word, one);		}	}}
复制代码

实现 Reducer 接口类

public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWriterable> {	private IntWritable result = new IntWritable();		public void Reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {		int sum = 0;		for (IntWritable val: values) {			sum += val.get();		}		result.set(sum);		context.write(key, result);	}}
复制代码

处理任务的 Job

public static void main(String[] args)throws Exception {	Configuration conf = new Configuration();	Job job = Job.getInstance(conf, "word count");	job.setJarByClass(WordCount.class);	job.setMapperClass(TokenizerMapper.class);	job.setCombinerClass(IntSumReducer.class);	job.setReducerClass(IntSumReducer.class);	job.setOutputKeyClass(Text.class);	job.setOutputValueClass(IntWritable.class);	FileInputFormat.addInputPath(job, new Path(args[0]));	FileOutputFormat.addOutputPath(job, new Path(args[1]));	System.exit(job.waitForCompletion(true)?0:1);}
复制代码


用 Spark 来实现 Wordcount

object WordCount {	def main(args: Array[String]): Unit = {		val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]"))		val spark = SparkSession.builder().config(conf).getOrCreate()				import spark.implicits._		val count = spark.read.textFile("input.txt")				.flatMap(_.split(" "))				.Map(s => (s, 1))				.rdd				.ReduceByKey((a, b) => a + b)				.collect()	}}
复制代码


Map Reduce 存在什么样的问题?

存在的问题

  • 操作符或者算子固定单一只有 Map 和 Reduce 两种,无法支持复杂的操作,比如表之间Join操作,需要开发人员自己写Join的逻辑实现:

  • Reduce 端的 Join,在 Map 的时候对两个表的两一个 key 的数据分别打上表的标签,放进同一个 key 里,然后在 Reduce 阶段按标签分成两组,再进行 Join 输出操作。

  • Map 端 Join, 适合一端表小的情况,将小表在 Map 端作为其中一个 lookup 输入,进行 Join 操作。

  • 读入输入数据然后产生的中间输出结果必须输出到磁盘,无法在中间内存中处理(Spark 可以选择 cache 在内存里,用空间换时间),重复写磁盘,I/O 开销比较大。

  • Key 需要是可比较,会用来排序。

  • Shuffle 和 Reduce 阶段强制要求对数据按照某 key 进行排序,某些场景(比如数据量不是特别大的时候,简单 hash 就够了)会有性能的损失。

  • 不能在线聚合,不论是 Map 端的 combine 还是 Reduce 端的都需要等所有的数据都存放到内存或者磁盘后,再执行聚合操作,存储这些数据需要消耗大量的内存和磁盘空间。如果能够一边获取 record 一边聚合,那么就会大大的减少存储空间,减少延时。


总结:个人觉得 Map Reduce 主要的问题在于函数太过于底层,对用户的使用和操作上来说不够灵活,另外强制约束了需要按 key 排序和输出到磁盘使得其有性能上损失。但是也并不是全部一无是处,其中

  • 简单清晰的分治 MapReduce(Map 即为分,Reduce 即为合)进行分布式计算的思想被多个框架借鉴,各个阶段读什么数据,进行什么操作,输出数据都是确定的。

  • 内存优势:

  • 内存使用固定,基本开销就是 Map 端输出数据的 Spill Buffer,Reduce 端需要一个大的数据来存放复制过来的分区数据两部分。用户聚合函数这部分的内存消耗是不确定的。

  • 排序后的数据在进行聚合的时候可以用最大堆或者最小堆来做,省空间且比较快。

  • 按照 Key 进行排序并 Spill 到磁盘的功能,可以保证 Shuffle 在大规模的数据时仍然能够顺利运行,不会那么容易出现 OOM 之类的问题。

  • 通过归并排序来减少碎文件的提升 I/O 性能的思想其实也在 Spark 的 SortMergeShuffle 里使用。


Spark 是如何来解决这些问题的?

提供丰富的操作符和分层的 DAG 并发处理层

  • 通过抽象数据结构 RDD 和[丰富的操作符][2]来提升用户的操作体验。

  • 除了 Map, Reduce,还提供了 filter(), join(), cogroup(), flatMap(), union(), distinct()等等用户常用的操作符, 可参考 Reference 里 Spark 的 transformation 文档。


  • 会将用户的代码分为逻辑处理层物理执行层

  • 逻辑处理层将用户的代码(定义的各种操作符)解析成一个 DAG(有向无环图)来定义数据及数据之间的流动/操作关系。其中结点是 RDD 数据,箭头是在 RDD 上的一些数据操作和数据之间的关系。

  • 物理执行层会根据数据之间的依赖关系将 DAG 整个流程图划分成多个小的执行阶段(stage),然后按照各个执行阶段执行并处理数据。


更灵活的 Shuffle 框架

  • 解决强制按 Key 排序的问题

  • Spark 提供按 PartitionId 排序、按 Key 排序等多种方式来灵活应对不同操作的排序需求。

  • 提供在线聚合的方式

  • 采用 hash-based 的聚合,利用 HashMap 的在线聚合特性,在将 record 插入 HashMap 时自动完成聚合过程,这也是 Spark 为什么设计 AppendOnlyMap 等数据结构的原因。

  • 通过将最终临时文件合并成一个文件,按 PartitionId 顺序存储来减少碎文件

  • Shuffle 产生的临时文件会按照PartitionId去排序,最终会按照 PartiontionId 的顺序将一个 Map 产生的所有文件合成一个文件,来减少碎文件。

Reference

  • [Hadoop 文档][1]

  • [Spark 文档 Transformation][2]


[1]: https://hadoop.apache.org/docs/r3.3.0/hadoop-MapReduce-client/hadoop-MapReduce-client-core/MapReduceTutorial.html

[2]: https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations


更多大数据相关分享,可关注微信公众号,搜索“数据元素”或扫描下方二维码。



发布于: 2021 年 01 月 14 日阅读数: 580
用户头像

hanke

关注

凡是过往,皆为序章 2019.09.11 加入

热爱大数据技术沉淀和分享,致力于构建让数据业务产品更易用的大数据生态圈,为业务增值。

评论

发布
暂无评论
Hadoop的MapReduce到底有什么问题?