写点什么

训练营第十二周总结

用户头像
大脸猫
关注
发布于: 2021 年 01 月 08 日

本周主要学习 hadoop 相关的内容,对于 map reduce 有了一个简单认识。


1、MapReduce 中数据流动

   (1)最简单的过程:  map - reduce

   (2)定制了 partitioner 以将 map 的结果送往指定 reducer 的过程: map - partition - reduce

   (3)增加了在本地先进性一次 reduce(优化)过程: map - combin(本地 reduce) - partition -reduce

2、Mapreduce 中 Partition 的概念以及使用。

(1)Partition 的原理和作用

        得到 map 给的记录后,他们该分配给哪些 reducer 来处理呢?hadoop 采用的默认的派发方式是根据散列值来派发的,但是实际中,这并不能很高效或者按照我们要求的去执行任务。例如,经过 partition 处理后,一个节点的 reducer 分配到了 20 条记录,另一个却分配道了 10W 万条,试想,这种情况效率如何。又或者,我们想要处理后得到的文件按照一定的规律进行输出,假设有两个 reducer,我们想要最终结果中 part-00000 中存储的是"h"开头的记录的结果,part-00001 中存储其他开头的结果,这些默认的 partitioner 是做不到的。所以需要我们自己定制 partition 来根据自己的要求,选择记录的 reducer。自定义 partitioner 很简单,只要自定义一个类,并且继承 Partitioner 类,重写其 getPartition 方法就好了,在使用的时候通过调用 Job 的 setPartitionerClass 指定一下即可


        Map 的结果,会通过 partition 分发到 Reducer 上。Mapper 的结果,可能送到 Combiner 做合并,Combiner 在系统中并没有自己的基类,而是用 Reducer 作为 Combiner 的基类,他们对外的功能是一样的,只是使用的位置和使用时的上下文不太一样而已。Mapper 最终处理的键值对<key, value>,是需要送到 Reducer 去合并的,合并的时候,有相同 key 的键/值对会送到同一个 Reducer 那。哪个 key 到哪个 Reducer 的分配过程,是由 Partitioner 规定的。它只有一个方法,


        getPartition(Text key, Text value, int numPartitions)


输入是 Map 的结果对<key, value>和 Reducer 的数目,输出则是分配的 Reducer(整数编号)。就是指定 Mappr 输出的键值对到哪一个 reducer 上去。系统缺省的 Partitioner 是 HashPartitioner,它以 key 的 Hash 值对 Reducer 的数目取模,得到对应的 Reducer。这样保证如果有相同的 key 值,肯定被分配到同一个 reducre 上。如果有 N 个 reducer,编号就为 0,1,2,3……(N-1)。


(2)Partition 的使用

        分区出现的必要性,如何使用 Hadoop 产生一个全局排序的文件?最简单的方法就是使用一个分区,但是该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了 MapReduce 所提供的并行架构的优势。事实上我们可以这样做,首先创建一系列排好序的文件;其次,串联这些文件(类似于归并排序);最后得到一个全局有序的文件。主要的思路是使用一个 partitioner 来描述全局排序的输出。比方说我们有 1000 个 1-10000 的数据,跑 10 个 ruduce 任务, 如果我们运行进行 partition 的时候,能够将在 1-1000 中数据的分配到第一个 reduce 中,1001-2000 的数据分配到第二个 reduce 中,以此类推。即第 n 个 reduce 所分配到的数据全部大于第 n-1 个 reduce 中的数据。这样,每个 reduce 出来之后都是有序的了,我们只要 cat 所有的输出文件,变成一个大的文件,就都是有序的了


基本思路就是这样,但是现在有一个问题,就是数据的区间如何划分,在数据量大,还有我们并不清楚数据分布的情况下。一个比较简单的方法就是采样,假如有一亿的数据,我们可以对数据进行采样,如取 10000 个数据采样,然后对采样数据分区间。在 Hadoop 中,patition 我们可以用 TotalOrderPartitioner 替换默认的分区。然后将采样的结果传给他,就可以实现我们想要的分区。在采样时,我们可以使用 hadoop 的几种采样工具,RandomSampler,InputSampler,IntervalSampler。


       这样,我们就可以对利用分布式文件系统进行大数据量的排序了,我们也可以重写 Partitioner 类中的 compare 函数,来定义比较的规则,从而可以实现字符串或其他非数字类型的排序,也可以实现二次排序乃至多次排序。


2、MapReduce 中分组的概念和使用

    分区的目的是根据 Key 值决定 Mapper 的输出记录被送到哪一个 Reducer 上去处理。而分组的就比较好理解了。笔者认为,分组就是与记录的 Key 相关。在同一个分区里面,具有相同 Key 值的记录是属于同一个分组的。


3、MapReduce 中 Combiner 的使用

        很多 MapReduce 程序受限于集群上可用的带宽,所以它会尽力最小化需要在 map 和 reduce 任务之间传输的中间数据。Hadoop 允许用户声明一个 combiner function 来处理 map 的输出,同时把自己对 map 的处理结果作为 reduce 的输入。因为 combiner function 本身只是一种优化,hadoop 并不保证对于某个 map 输出,这个方法会被调用多少次。换句话说,不管 combiner function 被调用多少次,对应的 reduce 输出结果都应该是一样的。


  下面我们以《权威指南》的例子来加以说明,假设 1950 年的天气数据读取是由两个 map 完成的,其中第一个 map 的输出如下:

  (1950, 0)

  (1950, 20)

  (1950, 10)


第二个 map 的输出为:

       (1950, 25)

       (1950, 15)


而 reduce 得到的输入为:(1950, [0, 20, 10, 25, 15]), 输出为:(1950, 25)


  由于 25 是集合中的最大值,我们可以使用一个类似于 reduce function 的 combiner function 来找出每个 map 输出中的最大值,这样的话,reduce 的输入就变成了:

  (1950, [20, 25])


  各个 funciton 对温度值的处理过程可以表示如下:max(0, 20, 10, 25, 15) =max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25


  注意:并不是所有的函数都拥有这个属性的(有这个属性的函数我们称之为 commutative 和 associative),例如,如果我们要计算平均温度,就不能这样使用 combiner function,因为 mean(0, 20, 10, 25, 15) =14,而 mean(mean(0, 20, 10),mean(25, 15)) = mean(10, 20) = 15


  combiner function 并不能取代 reduce function(因为仍然需要 reduce function 处理来自不同 map 的带有相同 key 的记录)。但是他可以帮助减少需要在 map 和 reduce 之间传输的数据,就为这一点 combiner function 就值得考虑使用。


4、Shuffle 阶段排序流程详解

        我们首先看一下 MapReduce 中的排序的总体流程。


        MapReduce 框架会确保每一个 Reducer 的输入都是按 Key 进行排序的。一般,将排序以及 Map 的输出传输到 Reduce 的过程称为混洗(shuffle)。每一个 Map 都包含一个环形的缓存,默认 100M,Map 首先将输出写到缓存当中。当缓存的内容达到“阈值”时(阈值默认的大小是缓存的 80%),一个后台线程负责将结果写到硬盘,这个过程称为“spill”。Spill 过程中,Map 仍可以向缓存写入结果,如果缓存已经写满,那么 Map 进行等待。


Spill 的具体过程如下:首先,后台线程根据 Reducer 的个数将输出结果进行分组,每一个分组对应一个 Reducer。其次,对于每一个分组后台线程对输出结果的 Key 进行排序。在排序过程中,如果有 Combiner 函数,则对排序结果进行 Combiner 函数进行调用。每一次 spill 都会在硬盘产生一个 spill 文件。因此,一个 Map task 有可能会产生多个 spill 文件,当 Map 写出最后一个输出时,会将所有的 spill 文件进行合并与排序,输出最终的结果文件。在这个过程中 Combiner 函数仍然会被调用。从整个过程来看,Combiner 函数的调用次数是不确定的。下面我们重点分析下 Shuffle 阶段的排序过程:


        Shuffle 阶段的排序可以理解成两部分,一个是对 spill 进行分区时,由于一个分区包含多个 key 值,所以要对分区内的<key,value>按照 key 进行排序,即 key 值相同的一串<key,value>存放在一起,这样一个 partition 内按照 key 值整体有序了。


        第二部分并不是排序,而是进行 merge,merge 有两次,一次是 map 端将多个 spill 按照分区和分区内的 key 进行 merge,形成一个大的文件。第二次 merge 是在 reduce 端,进入同一个 reduce 的多个 map 的输出 merge 在一起,该 merge 理解起来有点复杂,最终不是形成一个大文件,而且期间数据在内存和磁盘上都有。所以 shuffle 阶段的 merge 并不是严格的排序意义,只是将多个整体有序的文件 merge 成一个大的文件,由于不同的 task 执行 map 的输出会有所不同,所以 merge 后的结果不是每次都相同,不过还是严格要求按照分区划分,同时每个分区内的具有相同 key 的<key,value>对挨在一起。


        Shuffle 排序综述:如果只定义了 map 函数,没有定义 reduce 函数,那么输入数据经过 shuffle 的排序后,结果为 key 值相同的输出挨在一起,且 key 值小的一定在前面,这样整体来看 key 值有序(宏观意义的,不一定是按从大到小,因为如果采用默认的 HashPartitioner,则 key 的 hash 值相等的在一个分区,如果 key 为 IntWritable 的话,每个分区内的 key 会排序好的),而每个 key 对应的 value 不是有序的。


5、MapReduce 中辅助排序的原理与实现

(1)任务

我们需要把内容如下的 sample.txt 文件处理为下面文件:


源文件:Sample.txt


bbb 654


ccc 534


ddd 423


aaa 754


bbb 842


ccc 120


ddd 219


aaa 344


bbb 214


ccc 547


ddd 654


aaa 122


bbb 102


ccc 479


ddd 742


aaa 146


目标:part-r-00000


aaa 122


bbb 102


ccc 120


ddd 219


(2)工作原理

   过程导引:

   1、定义包含记录值和自然值的组合键,本例中为 MyPariWritable.


   2、自定义键的比较器(comparator)来根据组合键对记录进行排序,即同时利用自然键和自然值进行排序。(aaa 122 组合为一个键)。


   3、针对组合键的 Partitioner(本示例使用默认的 hashPartitioner)和分组 comparator 在进行分区和分组时均只考虑自然键。


   详细过程:

首先在 map 阶段,使用 job.setInputFormatClass 定义的 InputFormat 将输入的数据集分割成小数据块 splites,同时 InputFormat 提供一个 RecordReder 的实现。本例子中使用的是 TextInputFormat,他提供的 RecordReder 会将文本的一行的行号作为 key,这一行的文本作为 value。这就是自定义 Map 的输入是<LongWritable, Text>的原因。然后调用自定义 Map 的 map 方法,将一个个<LongWritable, Text>对输入给 Map 的 map 方法。注意输出应该符合自定义 Map 中定义的输出< MyPariWritable, NullWritable>。最终是生成一个 List< MyPariWritable, NullWritable>。在 map 阶段的最后,会先调用 job.setPartitionerClass 对这个 List 进行分区,每个分区映射到一个 reducer。每个分区内又调用 job.setSortComparatorClass 设置的 key 比较函数类排序。可以看到,这本身就是一个二次排序。在 reduce 阶段,reducer 接收到所有映射到这个 reducer 的 map 输出后,也是会调用 job.setSortComparatorClass 设置的 key 比较函数类对所有数据对排序。然后开始构造一个 key 对应的 value 迭代器。这时就要用到分组,使用 jobjob.setGroupingComparatorClass 设置的分组函数类。只要这个比较器比较的两个 key 相同,他们就属于同一个组(本例中由于要求得每一个分区内的最小值,因此比较 MyPariWritable 类型的 Key 时,只需要比较自然键,这样就能保证只要两个 MyPariWritable 的自然键相同,则它们被送到 Reduce 端时候的 Key 就认为在相同的分组,由于该分组的 Key 只取分组中的第一个,而这些数据已经按照自定义 MyPariWritable 比较器排好序,则第一个 Key 正好包含了每一个自然键对应的最小值),它们的 value 放在一个 value 迭代器,而这个迭代器的 key 使用属于同一个组的所有 key 的第一个 key。最后就是进入 Reducer 的 reduce 方法,reduce 方法的输入是所有的 key 和它的 value 迭代器。同样注意输入与输出的类型必须与自定义的 Reducer 中声明的一致。


用户头像

大脸猫

关注

还未添加个人签名 2018.04.27 加入

还未添加个人简介

评论

发布
暂无评论
训练营第十二周总结