写点什么

[架构师训练营第 1 期] 第 12 周学习总结

发布于: 2020 年 12 月 13 日

本周主要介绍了大数据相关的存储系统(HDFS)、编程模型和计算框架(MapReduce)以及相关工具(YARN、Hive)。

HDFS(Hadoop Distributed File System)


大数据的基础是数据存储,有了数据才能够进行计算,有了计算才能够支持应用。


那么如何进行数据存储,数据存储有哪些特性?

或者说,如何进行数据存储才能保证数据的高速读写和高可用性?

HDFS 的系统架构

NameNode 存储文件系统的元数据(文件分配表、文件控制块等),对文件系统的数据进行管理(接收客户端申请、分配磁盘资源等)

DataNode 存储文件系统的具体数据,接收客户端的读写请求,进行实际的读写操作并返回响应给客户端

Client 接收用户的读写请求,并在必要的时候向 NameNode 发送申请,向 DataNode 发送读写请求

默认配置情况下,DataNode 上的每一份数据都会在另外两台 DataNode 服务器上各存在一个备份

HDFS 的设计目标

以流式数据访问模式存储超大文件,运行于商用硬件集群上(节点失效是常态)。

允许商用硬件失效宕机,由 HDFS 来处理硬件失效宕机的情况(自动备份和切换)。

流式数据访问模式即:批量顺序,一次写入,多次读取;一次写入即不再修改。

它主要是一个用于大数据计算的,而非随机访问的存储系统。

因此:最佳实践是不建议修改已存储的数据,多用户随机写入、快速数据访问也不现实,小文件存储不合适。

HDFS 的数据一般是利用 MapReduce 生成写入的,再利用 MapReduce 读取。

因此:HDFS 主要是面向 MapReduce 或 Spark 应用的,而非直接面向开发者接口编程使用的。

HDFS 的失效策略

  • DataNode 上的磁盘失效:DataNode 正常工作,尽快通知 NameNode,由 NameNode 调度其它 DataNode 再形成三个备份,继续保证文件系统高可用。

  • DataNode 的服务器失效:DataNode 正常情况下每 3 秒钟向 NameNode 发送心跳,当 NameNode 10 分钟没有收到心跳,则认为相应 DataNode 已经失效,并取出该 DataNode 上对应的数据 block,找到对应的备份,调度其它 DataNode 再形成三个备份,继续保证文件系统高可用。

  • NameNode 的服务器失效:

一种方法是:

NameNode 运行过程中会持久化一份操作日志(edit log)和文件系统镜像(FsImage),并同步给其它服务器;

当 NameNode 失效时,其它服务器通过 FsImage 恢复 NameNode 到某个时间点,并从操作日志中该时间点开始重做进行 NameNode 上元数据信息的完整恢复。

其中:

- 操作日志记录了文件创建、删除、修改文件属性等操作

- FsImage 包含了完整的命名空间、File-Block 映射关系、文件的属性(ACL、quota、修改时间等)

另一种方法是:主从复制,主服务器宕机时,由从服务器接管主服务器,主从通过操作日志(share edit log)复制信息,ZooKeeper 进行失效切换

  • Client 失效:即数据写入到一半失效的数据一致性问题

  • 跨机架备份

MapReduce

MapReduce 基本概念


用于大规模分布式存储数据处理的编程模型或计算框架

特点:

  • 处理大规模数据(1TB)

  • 分布式 CPU 并行处理

原理:

  • 移动计算比移动数据更划算

  • 分而治之(Divide and Conquer)


作为编程模型的 MapReduce

以 WordCount(词频统计)为例

普通数据的处理方式:读入数据 -> 分词 -> 逐词累加该词出现次数(存储在“词-次”键值对哈希表中) -> 输出结果

大数据的处理方式:基于 MapReduce 的大数据计算框架(如 Spark、Flink 等)

基于 MapReduce 编程模型的代码:


WordCount Mapper:

public class WordCount {  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {    private final static IntWritable one = new IntWritable(1);    private Text word = new Text();    public void map(Object key, Text value, Context context)    throws IOException, InterruptException {      StringTokenizer itr = new StringTokenizer(value.toString());      while (itr.hasMoreTokens()) {        word.set(itr.nextToken());        context.write(word, one);      }    }  }}
复制代码


(框架对所有 Mappers 的输出 key-value 进行分类合并成 key-values 输入到同一个 Reducers)

(MapReduce 的核心关键点就在于:Shuffle:Mappers 的输出,相同的 key 合并输入到相同的 Reducer)


WrodSum Reducer:

public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>   private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptException { int sum = 0; for (IntWritable val: values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
复制代码


main 函数:

public static void main(String[] args) throws Exception {  Configuration conf = new Configuration();  // 得到集群配置参数  Job job = Job.getInstance(conf, "WordCount");  // 设置到本次的 job 实例中  job.setJarByClass(WordCount.class);  // 指定本次执行的主类是 WordCount  job.setMapperClass(TokenizerMapper.class);  // 指定 map 类  job.setCombinerClass(IntSumReducer.class);  // 指定 combiner 类  job.setReducerClass(IntSumReducer.class);  // 指定 reducer 类  job.setOutputKeyClass(Text.class);  job.setOutputValueClass(IntWritable.class);  FileInputFormat.addInputPath(job, new Path(args[0]));  // 指定输入数据的路径  FileOutputFormat.setOutputPath(job, new Path(args[1]));  // 指定输出路径  System.exit(job.waitForCompletion(true) ? 0: 1);  // 指定 job 执行模式,等待任务执行完成后,提交任务的客户端才会退出}
复制代码


MapReduce 编程模型


作为架构师,进行架构设计或参加架构评审,对 MapReduce 等大数据技术应该掌握到何种程度?

答:应该抓住核心关键点进行掌握。如:

  • MapReduce 的编程模型是什么样子的

  • MapReduce 的输入输出是什么样子的

  • MapReduce 框架是如何完成这些技术处理的,如何把这些 Map 和 Reduce 函数在整个分布式集群中调度起来的

再通过自己的架构思想对这些关键点进行推导和分析,就可以快速把大数据技术掌握起来,否则就会迷失在技术细节中


[图 1]


[图 2]


[图 3]


[图 4]


用自己的语言描述以上三幅图的原理。

MapReduce 框架架构


[系统协作图]


  1. 大数据应用程序服务器 Main 函数启动 JobClient 将 Jar 包分发到 HDFS DataNode 存储为 Job Jar 文件

  2. 上述 Main 函数向 JobTracker 服务器提交作业

注:在整个 MapReduce 大数据服务器集群中,只有一台 JobTracker 服务器,负责整个集群的作业管理和调度

JobTracker 服务器内部由 JobScheduler 对所有提交的作业进行调度管理、排队处理、安排运行

  1. 当作业被安排开始运行时,JobScheduler 就会为该作业创建一个 JobInProcess 数据结构(由数据块组成的树状结构)

JobInProcess 树状结构分三层:根节点(JobInProcess)、中间节点(TaskInProcess)、叶节点(TaskAttempt)

  1. JobScheduler 再根据 JobInProcess 中的数据块分别创建其各自的 TaskInProcess 数据结构用于运行 Map 或 Reduce 任务

注:在 JobInProcess 中,每个叶子节点都是一个 Map 或 Reduce 任务(TaskInProcess),其子节点还可能包括一个或多个 TaskAttempt

当 TaskInProcess 任务执行失败时,就会执行这些多个 TaskAttempt 去完成同一个任务,所以正常情况下,只有 TaksInProcess 一个节点会被执行

TaskInProcess 数据中存放着 Map 或 Reduce 要处理的数据文件名、数据偏移量、数据长度等

  1. TaksInProcess 分发一个 task 任务到 TaskTracker 服务器,与上述 DataNode 是同一台服务器

即每一台服务器既运行着 DataNode 进程,也运行着 TaskTracker 进程

  1. 当 TaskTracker 空闲时,就会向 JobTracker 发送心跳,告知 JobTrakcer 自己当前的空闲状态,由 JobTracker 为它分配相应的任务

注:相应的任务是指该任务中指定的数据块所存储的 DataNode 的服务器正是该 TaskTracker 所在服务器(通过 IP 地址匹配)

  1. TaskTracker 领到任务后,就会启动一个 TaskRunner 子进程,子进程通过任务中指定的 Job Jar 包文件路径反射式地加载 Map 或 Reduce 函数并代入任务携带的参数运行

Map 任务携带的参数包括上述的:要处理的数据文件名、数据偏移量、数据长度等

Map 函数处理完数据后,输出会由 TaskTracker 复制给其它服务器,由各服务器进行 shuffle 后交给 Reduce 进行处理

注:任务和数据最好在一起,被称作数据的本地化(Data Locality),这可以通过设置合适的数据分片来确保

  1. 所有 TaskInProcess 执行完,则 JobInProcess 执行完


适合 MapReduce 的计算任务类型:

  • TopK(数据排序)

  • K-means(数据聚类)

  • Bayes(数据分类)

  • SQL(数据分类)

  • π


不适合的:(数据不适合分片,每条数据之间存在关联的)

  • Fibonacci

MapReduce 框架组件


inputFormater


outputFormater


partitioner

MapReduce 技术细节


JobScheduler 调度方法


JobTracker 内部实现


JobTracker 容错

  • JobTracker 失效,则未完成的 Job 亦失败

通过 Job 日志,Job 可部分恢复,重新执行一次 Job 会更好

  • 超时机制:TaskTracker 10 分钟(mapred.tasktracker.expirt.interval)未汇报心跳,则将其从集群中移除,其上未完成任务将在其它 TaskTracker 上重新运行

  • 灰名单和黑名单机制:在 TaskTracker 上部署性能监控脚本,表现太差则被 JobTracker 减少或暂停调度,避免个别性能差的 TaskTracker 拖慢整个 Job 作业的进度


Task 容错

  • 根据任务特性(如 TopK)允许部分 Task 失败,通过设置允许失败任务占比,即 mapred.max.map.failers.percent 默认 0

  • Task 由 TaskInProcess 监控,失败任务多次尝试(TaskAttempt),慢任务启动备份任务,最大允许尝试次数为 mapred.map.max.attempts


Record 容错

  • 跳过导致 Task 失败的坏记录,如:

- K,V 超大,导致 OOM,可配置最大长度(mapred.linerecordreader.maxlength),超出则截断

- 异常数据引发程序 bug,task 重试几次(mapred.skip.attempts.to.start.skipping)后,自动进入 skip mode,跳过导致失败的记录

YARN


分布式集群资源管理工具

Hive


支持 SQL 语法的大数据仓库


发布于: 2020 年 12 月 13 日阅读数: 22
用户头像

还未添加个人签名 2018.03.26 加入

还未添加个人简介

评论

发布
暂无评论
[架构师训练营第 1 期] 第12周学习总结