写点什么

架构师训练营—第十二周学习总结

用户头像
Geek_shu1988
关注
发布于: 2020 年 12 月 13 日

本周开始讲大数据平台,是这个架构师训练营我最喜欢听的部分,因为跟工作中遇到的戚戚相关。本周主要介绍了大数据相关的存储系统(HDFS)、编程模型和计算框架(MapReduce)以及相关工具(YARN、Hive)。



HDFS(Hadoop Distributed File System)



大数据的基础是数据存储,有了数据才能够进行计算,有了计算才能够支持应用。

HDFS 的系统架构

NameNode 存储文件系统的元数据(文件分配表、文件控制块等),对文件系统的数据进行管理(接收客户端申请、分配磁盘资源等)

DataNode 存储文件系统的具体数据,接收客户端的读写请求,进行实际的读写操作并返回响应给客户端

Client 接收用户的读写请求,并在必要的时候向 NameNode 发送申请,向 DataNode 发送读写请求

默认配置情况下,DataNode 上的每一份数据都会在另外两台 DataNode 服务器上各存在一个备份

HDFS 的设计目标

以流式数据访问模式存储超大文件,运行于商用硬件集群上(节点失效是常态)。

允许商用硬件失效宕机,由 HDFS 来处理硬件失效宕机的情况(自动备份和切换)。

流式数据访问模式即:批量顺序,一次写入,多次读取;一次写入即不再修改。

它主要是一个用于大数据计算的,而非随机访问的存储系统。

最佳实践是不建议修改已存储的数据,多用户随机写入、快速数据访问也不现实,小文件存储不合适。

HDFS 的数据一般是利用 MapReduce 生成写入的,再利用 MapReduce 读取。

因此:HDFS 主要是面向 MapReduce 或 Spark 应用的,而非直接面向开发者接口编程使用的。

HDFS 的失效策略

  • DataNode 上的磁盘失效:DataNode 正常工作,尽快通知 NameNode,由 NameNode 调度其它 DataNode 再形成三个备份,继续保证文件系统高可用。

  • DataNode 的服务器失效:DataNode 正常情况下每 3 秒钟向 NameNode 发送心跳,当 NameNode 10 分钟没有收到心跳,则认为相应 DataNode 已经失效,并取出该 DataNode 上对应的数据 block,找到对应的备份,调度其它 DataNode 再形成三个备份,继续保证文件系统高可用。

  • NameNode 的服务器失效:

一种方法是:NameNode 运行过程中会持久化一份操作日志(edit log)和文件系统镜像(FsImage),并同步给其它服务器;

当 NameNode 失效时,其它服务器通过 FsImage 恢复 NameNode 到某个时间点,并从操作日志中该时间点开始重做进行 NameNode 上元数据信息的完整恢复。

其中:

- 操作日志记录了文件创建、删除、修改文件属性等操作

- FsImage 包含了完整的命名空间、File-Block 映射关系、文件的属性(ACL、quota、修改时间等)

另一种方法是:主从复制,主服务器宕机时,由从服务器接管主服务器,主从通过操作日志(share edit log)复制信息,ZooKeeper 进行失效切换

  • Client 失效:即数据写入到一半失效的数据一致性问题

  • 跨机架备份

MapReduce

MapReduce 基本概念



用于大规模分布式存储数据处理的编程模型或计算框架

特点:

  • 处理大规模数据(1TB)

  • 分布式 CPU 并行处理

原理:

  • 移动计算比移动数据更划算

  • 分而治之(Divide and Conquer)



作为编程模型的 MapReduce

以 WordCount(词频统计)为例

普通数据的处理方式:读入数据 -> 分词 -> 逐词累加该词出现次数(存储在“词-次”键值对哈希表中) -> 输出结果

大数据的处理方式:基于 MapReduce 的大数据计算框架(如 Spark、Flink 等)

基于 MapReduce 编程模型的代码:



WordCount Mapper:

public class WordCount {  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {    private final static IntWritable one = new IntWritable(1);    private Text word = new Text();    public void map(Object key, Text value, Context context)    throws IOException, InterruptException {      StringTokenizer itr = new StringTokenizer(value.toString());      while (itr.hasMoreTokens()) {        word.set(itr.nextToken());        context.write(word, one);      }    }  }}
复制代码



(框架对所有 Mappers 的输出 key-value 进行分类合并成 key-values 输入到同一个 Reducers)

(MapReduce 的核心关键点就在于:Shuffle:Mappers 的输出,相同的 key 合并输入到相同的 Reducer)



WrodSum Reducer:

public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>   private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptException { int sum = 0; for (IntWritable val: values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
复制代码



main 函数:

public static void main(String[] args) throws Exception {  Configuration conf = new Configuration();  // 得到集群配置参数  Job job = Job.getInstance(conf, "WordCount");  // 设置到本次的 job 实例中  job.setJarByClass(WordCount.class);  // 指定本次执行的主类是 WordCount  job.setMapperClass(TokenizerMapper.class);  // 指定 map 类  job.setCombinerClass(IntSumReducer.class);  // 指定 combiner 类  job.setReducerClass(IntSumReducer.class);  // 指定 reducer 类  job.setOutputKeyClass(Text.class);  job.setOutputValueClass(IntWritable.class);  FileInputFormat.addInputPath(job, new Path(args[0]));  // 指定输入数据的路径  FileOutputFormat.setOutputPath(job, new Path(args[1]));  // 指定输出路径  System.exit(job.waitForCompletion(true) ? 0: 1);  // 指定 job 执行模式,等待任务执行完成后,提交任务的客户端才会退出}
复制代码



MapReduce 编程模型



作为架构师,进行架构设计或参加架构评审,对 MapReduce 等大数据技术应该抓住核心关键点进行掌握。如:

  • MapReduce 的编程模型是什么样子的

  • MapReduce 的输入输出是什么样子的

  • MapReduce 框架是如何完成这些技术处理的,如何把这些 Map 和 Reduce 函数在整个分布式集群中调度起来的。

MapReduce 框架架构

适合 MapReduce 的计算任务类型:

  • TopK(数据排序)

  • K-means(数据聚类)

  • Bayes(数据分类)

  • SQL(数据分类)

  • π



不适合的:(数据不适合分片,每条数据之间存在关联的)

  • Fibonacci

MapReduce 框架组件



inputFormater

outputFormater

partitioner

MapReduce 技术细节



JobScheduler 调度方法

JobTracker 内部实现

JobTracker 容错

  • JobTracker 失效,则未完成的 Job 亦失败

通过 Job 日志,Job 可部分恢复,重新执行一次 Job 会更好

  • 超时机制:TaskTracker 10 分钟(mapred.tasktracker.expirt.interval)未汇报心跳,则将其从集群中移除,其上未完成任务将在其它 TaskTracker 上重新运行

  • 灰名单和黑名单机制:在 TaskTracker 上部署性能监控脚本,表现太差则被 JobTracker 减少或暂停调度,避免个别性能差的 TaskTracker 拖慢整个 Job 作业的进度



Task 容错

  • 根据任务特性(如 TopK)允许部分 Task 失败,通过设置允许失败任务占比,即 mapred.max.map.failers.percent 默认 0

  • Task 由 TaskInProcess 监控,失败任务多次尝试(TaskAttempt),慢任务启动备份任务,最大允许尝试次数为 mapred.map.max.attempts



Record 容错

  • 跳过导致 Task 失败的坏记录,如:

- K,V 超大,导致 OOM,可配置最大长度(mapred.linerecordreader.maxlength),超出则截断

- 异常数据引发程序 bug,task 重试几次(mapred.skip.attempts.to.start.skipping)后,自动进入 skip mode,跳过导致失败的记录

YARN



分布式集群资源管理工具

Hive



支持 SQL 语法的大数据仓库



用户头像

Geek_shu1988

关注

还未添加个人签名 2020.02.02 加入

还未添加个人简介

评论

发布
暂无评论
架构师训练营—第十二周学习总结