写点什么

大数据培训:hadoop 中 shuffle 过程面试题

  • 2022 年 1 月 05 日
  • 本文字数:3784 字

    阅读完需:约 12 分钟

Shuffle 阶段是指从 Map 的输出开始,包括系统执行排序以及传送 Map 输出到 Reduce 作为输入的过程。Sort 阶段是指对 Map 端输出的 Key 进行排序的过程。大数据培训不同的 Map 可能输出相同的 Key,相同的 Key 必须发送到同一个 Reduce 端处理。Shuffle 阶段可以分为 Map 端的 Shuffle 和 Reduce 端的 Shuffle。



​一、Map 端的 shuffle   

Map 端会处理输入数据并产生中间结果,这个中间结果会写到本地磁盘,而不是 HDFS。每个 Map 的输出会先写到内存缓冲区中,当写入的数据达到设定的阈值时,系统将会启动一个线程将缓冲区的数据写到磁盘,这个过程叫做 spill。

在 spill 写入之前,会先进行二次排序,首先根据数据所属的 partition 进行排序,然后每个 partition 中的数据再按 key 来排序。partition 的目是将记录划分到不同的 Reducer 上去,以期望能够达到负载均衡,以后的 Reducer 就会根据 partition 来读取自己对应的数据。接着运行 combiner(如果设置了的话),combiner 的本质也是一个 Reducer,其目的是对将要写入到磁盘上的文件先进行一次处理,这样,写入到磁盘的数据量就会减少。最后将数据写到本地磁盘产生 spill 文件(spill 文件保存在{mapred.local.dir}指定的目录中,Map 任务结束后就会被删除)。  

最后,每个 Map 任务可能产生多个 spill 文件,在每个 Map 任务完成前,会通过多路归并算法将这些 spill 文件归并成一个文件。至此,Map 的 shuffle 过程就结束了。

二、Reduce 端的 shuffle

Reduce 端的 shuffle 主要包括三个阶段,copy、sort(merge)和 reduce。

首先要将 Map 端产生的输出文件拷贝到 Reduce 端,但每个 Reducer 如何知道自己应该处理哪些数据呢?因为 Map 端进行 partition 的时候,实际上就相当于指定了每个 Reducer 要处理的数据(partition 就对应了 Reducer),所以 Reducer 在拷贝数据的时候只需拷贝与自己对应的 partition 中的数据即可。每个 Reducer 会处理一个或者多个 partition,但需要先将自己对应的 partition 中的数据从每个 Map 的输出结果中拷贝过来。

接下来就是 sort 阶段,也成为 merge 阶段,因为这个阶段的主要工作是执行了归并排序。从 Map 端拷贝到 Reduce 端的数据都是有序的,所以很适合归并排序。最终在 Reduce 端生成一个较大的文件作为 Reduce 的输入。

最后就是 Reduce 过程了,在这个过程中产生了最终的输出结果,并将其写到 HDFS 上。

说一下 zookeeper 的选举过程

Zookeeper 虽然在配置文件中并没有指定 Master 和 Slave。但是,Zookeeper 工作时,是有一个节点为 Leader,其他则为 Follower,Leader 是通过内部的选举机制临时产生的。

选举过程

假设有五台服务器组成的 Zookeeper 集群,它们的 id 从 1-5,同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是一样的。假设这些服务器依序启动,来看看会发生什么。

(1)服务器 1 启动,发起一次选举。服务器 1 投自己一票。此时服务器 1 票数一票,不够半数以上(3 票),选举无法完成,服务器 1 状态保持为 LOOKING;

(2)服务器 2 启动,再发起一次选举。服务器 1 和 2 分别投自己一票并交换选票信息:此时服务器 1 发现服务器 2 的 ID 比自己目前投票推举的(服务器 1)大,更改选票为推举服务器 2。此时服务器 1 票数 0 票,服务器 2 票数 2 票,没有半数以上结果,选举无法完成,服务器 1,2 状态保持 LOOKING

(3)服务器 3 启动,发起一次选举。此时服务器 1 和 2 都会更改选票为服务器 3。此次投票结果:服务器 1 为 0 票,服务器 2 为 0 票,服务器 3 为 3 票。此时服务器 3 的票数已经超过半数,服务器 3 当选 Leader。服务器 1,2 更改状态为 FOLLOWING,服务器 3 更改状态为 LEADING;

(4)服务器 4 启动,发起一次选举。此时服务器 1,2,3 已经不是 LOOKING 状态,不会更改选票信息。交换选票信息结果:服务器 3 为 3 票,服务器 4 为 1 票。此时服务器 4 服从多数,更改选票信息为服务器 3,并更改状态为 FOLLOWING;

(5)服务器 5 启动,同 4 一样当小弟。

为什么要对数仓进行分层

数仓进行分层的一个主要原因就是希望在管理数据的时候,能对数据有一个更加清晰的掌控。主要有以下优点:

1.划清层次结构:每一个数据分层都有它的作用域,这样我们在使用表的时候能更方便地定位和理解。2.数据血缘追踪:简单来讲可以这样理解,我们最终给下游是直接能使用的业务表,但是它的来源有很多,如果有一张来源表出问题了,我们希望能够快速准确地定位到问题,并清楚它的危害范围。3.减少重复开发:规范数据分层,开发一些通用的中间层数据,能够减少极大的重复计算。4.把复杂问题简单化。将一个复杂的任务分解成多个步骤来完成,每一层只处理单一的步骤,比较简单和容易理解。而且便于维护数据的准确性,当数据出现问题之后,可以不用修复所有的数据,只需要从有问题的步骤开始修复。5.屏蔽原始数据的异常。屏蔽业务的影响,不必改一次业务就需要重新接入数据。

数据仓库都分哪几层

如果划分细致,数据仓库总共可以划分为 5 层:

ODS 层

ODS 层: Operation Data Store,数据准备区,贴源层。直接接入源数据的:业务库、埋点日志、消息队列等。ODS 层数数据仓库的准备区

DW 数仓

DWD 层:Data Warehouse Details,数据明细层,属于业务层和数据仓库层的隔离层,把持和 ODS 层相同颗粒度。进行数据清洗和规范化操作,去空值/脏数据、离群值等。

DWM 层:Data Warehouse middle,数据中间层,在 DWD 的基础上进行轻微的聚合操作,算出相应的统计指标

DWS 层:Data warehouse service,数据服务层,在 DWM 的基础上,整合汇总一个主题的数据服务层。汇总结果一般为宽表,用于 OLAP、数据分发等。

ADS 层

ADS 层:Application data service, 数据应用层,存放在 ES,Redis、PostgreSql 等系统中,供数据分析和挖掘使用。

给定 a、b 两个文件,各存放 50 亿个 url,每个 url 各占 64 字节,内存限制是 4G,让你找出 a、b 文件共同的 url?

每个 url 大小为 64bytes,那么可以估计每个文件的大小为 5G×64=320G,远远大于内存限制的 4G,所以不可能将其完全加载到内存中处理,可以采用分治的思想来解决。

Step1:遍历文件 a,对每个 url 求取 hash(url)%1000,然后根据所取得的值将 url 分别存储到 1000 个小文件(记为 a0,a1,...,a999 ,每个小文件约 300M);

Step2: 遍历文件 b,采取和 a 相同的方式将 url 分别存储到 1000 个小文件(记为 b0,b1,...,b999);

巧妙之处:这样处理后,所有可能相同的 url 都被保存在对应的小文件(a0 vs b0, a1 vs b1 ,...,a999 vs b999)中,不对应的小文件不可能有相同的 url。然后我们只要求出这个 1000 对小文件中相同的 url 即可。

Step3:求每对小文件 ai 和 bi 中相同的 url 时,可以把 ai 的 url 存储到 hash_set/hash_map 中。然后遍历 bi 的每个 url,看其是否在刚才构建的 hash_set 中,如果是,那么就是共同的 url,存到文件里面就可以了。

有一个 1G 大小的一个文件,里面每一行是一个词,词的大小不超过 16 字节,内存限制大小是 1M,要求返回频数最高的 100 个词。

Step1:顺序读文件中,对于每个词 x,取 hash(x)%5000,然后按照该值存到 5000 个小文件(记为 f0 ,f1 ,... ,f4999)中,这样每个文件大概是 200k 左右,如果其中的有的文件超过了 1M 大小,还可以按照类似的方法继续往下分,直到分解得到的小文件的大小都不超过 1M;

Step2:对每个小文件,统计每个文件中出现的词以及相应的频率(可以采用 trie 树/hash_map 等),并取出出现频率最大的 100 个词(可以用含 100 个结点的最小堆),并把 100 词及相应的频率存入文件,这样又得到了 5000 个文件;

Step3:把这 5000 个文件进行归并(类似与归并排序);

现有海量日志数据保存在一个超级大的文件中,该文件无法直接读入内存,要求从中提取某天出访问百度次数最多的那个 IP。

Step1:从这一天的日志数据中把访问百度的 IP 取出来,逐个写入到一个大文件中;

Step2:注意到 IP 是 32 位的,最多有 2^32 个 IP。同样可以采用映射的方法,比如模 1000,把整个大文件映射为 1000 个小文件;

Step3:找出每个小文中出现频率最大的 IP(可以采用 hash_map 进行频率统计,然后再找出频率最大的几个)及相应的频率;

Step4:在这 1000 个最大的 IP 中,找出那个频率最大的 IP,即为所求。

现有 1TB 文本文件 words.txt,文件每行为若干个英文单词,单词间用空格分隔,文件中存在单词 word1 占据了总单词量的 30%以上,其他单词出现频率较为平均。根据以上场景,请描述 mapreduce 如何统计每个单词出现的频次。

题中所述文本文件存在明显的数据倾斜问题,word1 出现频次远大于其他单词,因此需要对 word1 在 map 阶段的输出 key 值进行构造,从而将 word1 均分给多个 reduce 计算。注:如果只答出一般的 wordcount 步骤,没有考虑到数据倾斜问题,严格来说应不得分。

Step1:map 阶段 map 方法按行读取文件,每行文件按空格分隔为一个单词列表,依次读取每个单词. 若单词为 word1,则 map 阶段的输出为<word1_randomInt(50),1>,即"word1_“加 0-50 之间的随机整数。其他单词直接输出<单词,1>。注:只要答出 map 阶段对单词 word1 的输出 key 值进行构造,以达到将 word1 均分为多个不同的 key 输出的目的即可,具体方法可有所区别。

Step2:combine 阶段 注:计算方法同 reduce 阶段,该步骤可省去,答出+1 分。

Step3:reduce 阶段 对同一 key 值得 value 进行累加,得出各个 key 值的出现次数。

Step4:计算最终结果。依次逐行读取 reduce 阶段输出目录中的所有文件:a.若 key 不是形同"word1_XX”,直接输出 key 和 value,即得出对应单词的出现频次。b.若 key 类似"word1_XX",对所有 key 的 value 值累加,即可得出 word1 的出现频次。

文章来源:java 大数据学习

用户头像

关注尚硅谷,轻松学IT 2021.11.23 加入

还未添加个人简介

评论

发布
暂无评论
大数据培训:hadoop中shuffle过程面试题