[架构师训练营第 1 期] 第 12 周学习总结
本周主要介绍了大数据相关的存储系统(HDFS)、编程模型和计算框架(MapReduce)以及相关工具(YARN、Hive)。
HDFS(Hadoop Distributed File System)
大数据的基础是数据存储,有了数据才能够进行计算,有了计算才能够支持应用。
那么如何进行数据存储,数据存储有哪些特性?
或者说,如何进行数据存储才能保证数据的高速读写和高可用性?
HDFS 的系统架构
NameNode 存储文件系统的元数据(文件分配表、文件控制块等),对文件系统的数据进行管理(接收客户端申请、分配磁盘资源等)
DataNode 存储文件系统的具体数据,接收客户端的读写请求,进行实际的读写操作并返回响应给客户端
Client 接收用户的读写请求,并在必要的时候向 NameNode 发送申请,向 DataNode 发送读写请求
默认配置情况下,DataNode 上的每一份数据都会在另外两台 DataNode 服务器上各存在一个备份
HDFS 的设计目标
以流式数据访问模式存储超大文件,运行于商用硬件集群上(节点失效是常态)。
允许商用硬件失效宕机,由 HDFS 来处理硬件失效宕机的情况(自动备份和切换)。
流式数据访问模式即:批量顺序,一次写入,多次读取;一次写入即不再修改。
它主要是一个用于大数据计算的,而非随机访问的存储系统。
因此:最佳实践是不建议修改已存储的数据,多用户随机写入、快速数据访问也不现实,小文件存储不合适。
HDFS 的数据一般是利用 MapReduce 生成写入的,再利用 MapReduce 读取。
因此:HDFS 主要是面向 MapReduce 或 Spark 应用的,而非直接面向开发者接口编程使用的。
HDFS 的失效策略
DataNode 上的磁盘失效:DataNode 正常工作,尽快通知 NameNode,由 NameNode 调度其它 DataNode 再形成三个备份,继续保证文件系统高可用。
DataNode 的服务器失效:DataNode 正常情况下每 3 秒钟向 NameNode 发送心跳,当 NameNode 10 分钟没有收到心跳,则认为相应 DataNode 已经失效,并取出该 DataNode 上对应的数据 block,找到对应的备份,调度其它 DataNode 再形成三个备份,继续保证文件系统高可用。
NameNode 的服务器失效:
一种方法是:
NameNode 运行过程中会持久化一份操作日志(edit log)和文件系统镜像(FsImage),并同步给其它服务器;
当 NameNode 失效时,其它服务器通过 FsImage 恢复 NameNode 到某个时间点,并从操作日志中该时间点开始重做进行 NameNode 上元数据信息的完整恢复。
其中:
- 操作日志记录了文件创建、删除、修改文件属性等操作
- FsImage 包含了完整的命名空间、File-Block 映射关系、文件的属性(ACL、quota、修改时间等)
另一种方法是:主从复制,主服务器宕机时,由从服务器接管主服务器,主从通过操作日志(share edit log)复制信息,ZooKeeper 进行失效切换
Client 失效:即数据写入到一半失效的数据一致性问题
跨机架备份
MapReduce
MapReduce 基本概念
用于大规模分布式存储数据处理的编程模型或计算框架
特点:
处理大规模数据(1TB)
分布式 CPU 并行处理
原理:
移动计算比移动数据更划算
分而治之(Divide and Conquer)
作为编程模型的 MapReduce
以 WordCount(词频统计)为例
普通数据的处理方式:读入数据 -> 分词 -> 逐词累加该词出现次数(存储在“词-次”键值对哈希表中) -> 输出结果
大数据的处理方式:基于 MapReduce 的大数据计算框架(如 Spark、Flink 等)
基于 MapReduce 编程模型的代码:
WordCount Mapper:
(框架对所有 Mappers 的输出 key-value 进行分类合并成 key-values 输入到同一个 Reducers)
(MapReduce 的核心关键点就在于:Shuffle:Mappers 的输出,相同的 key 合并输入到相同的 Reducer)
WrodSum Reducer:
main 函数:
MapReduce 编程模型
作为架构师,进行架构设计或参加架构评审,对 MapReduce 等大数据技术应该掌握到何种程度?
答:应该抓住核心关键点进行掌握。如:
MapReduce 的编程模型是什么样子的
MapReduce 的输入输出是什么样子的
MapReduce 框架是如何完成这些技术处理的,如何把这些 Map 和 Reduce 函数在整个分布式集群中调度起来的
等
再通过自己的架构思想对这些关键点进行推导和分析,就可以快速把大数据技术掌握起来,否则就会迷失在技术细节中
[图 1]
[图 2]
[图 3]
[图 4]
用自己的语言描述以上三幅图的原理。
MapReduce 框架架构
[系统协作图]
大数据应用程序服务器 Main 函数启动 JobClient 将 Jar 包分发到 HDFS DataNode 存储为 Job Jar 文件
上述 Main 函数向 JobTracker 服务器提交作业
注:在整个 MapReduce 大数据服务器集群中,只有一台 JobTracker 服务器,负责整个集群的作业管理和调度
JobTracker 服务器内部由 JobScheduler 对所有提交的作业进行调度管理、排队处理、安排运行
当作业被安排开始运行时,JobScheduler 就会为该作业创建一个 JobInProcess 数据结构(由数据块组成的树状结构)
JobInProcess 树状结构分三层:根节点(JobInProcess)、中间节点(TaskInProcess)、叶节点(TaskAttempt)
JobScheduler 再根据 JobInProcess 中的数据块分别创建其各自的 TaskInProcess 数据结构用于运行 Map 或 Reduce 任务
注:在 JobInProcess 中,每个叶子节点都是一个 Map 或 Reduce 任务(TaskInProcess),其子节点还可能包括一个或多个 TaskAttempt
当 TaskInProcess 任务执行失败时,就会执行这些多个 TaskAttempt 去完成同一个任务,所以正常情况下,只有 TaksInProcess 一个节点会被执行
TaskInProcess 数据中存放着 Map 或 Reduce 要处理的数据文件名、数据偏移量、数据长度等
TaksInProcess 分发一个 task 任务到 TaskTracker 服务器,与上述 DataNode 是同一台服务器
即每一台服务器既运行着 DataNode 进程,也运行着 TaskTracker 进程
当 TaskTracker 空闲时,就会向 JobTracker 发送心跳,告知 JobTrakcer 自己当前的空闲状态,由 JobTracker 为它分配相应的任务
注:相应的任务是指该任务中指定的数据块所存储的 DataNode 的服务器正是该 TaskTracker 所在服务器(通过 IP 地址匹配)
TaskTracker 领到任务后,就会启动一个 TaskRunner 子进程,子进程通过任务中指定的 Job Jar 包文件路径反射式地加载 Map 或 Reduce 函数并代入任务携带的参数运行
Map 任务携带的参数包括上述的:要处理的数据文件名、数据偏移量、数据长度等
Map 函数处理完数据后,输出会由 TaskTracker 复制给其它服务器,由各服务器进行 shuffle 后交给 Reduce 进行处理
注:任务和数据最好在一起,被称作数据的本地化(Data Locality),这可以通过设置合适的数据分片来确保
所有 TaskInProcess 执行完,则 JobInProcess 执行完
适合 MapReduce 的计算任务类型:
TopK(数据排序)
K-means(数据聚类)
Bayes(数据分类)
SQL(数据分类)
π
不适合的:(数据不适合分片,每条数据之间存在关联的)
Fibonacci
MapReduce 框架组件
inputFormater
outputFormater
partitioner
MapReduce 技术细节
JobScheduler 调度方法
JobTracker 内部实现
JobTracker 容错
JobTracker 失效,则未完成的 Job 亦失败
通过 Job 日志,Job 可部分恢复,重新执行一次 Job 会更好
超时机制:TaskTracker 10 分钟(mapred.tasktracker.expirt.interval)未汇报心跳,则将其从集群中移除,其上未完成任务将在其它 TaskTracker 上重新运行
灰名单和黑名单机制:在 TaskTracker 上部署性能监控脚本,表现太差则被 JobTracker 减少或暂停调度,避免个别性能差的 TaskTracker 拖慢整个 Job 作业的进度
Task 容错
根据任务特性(如 TopK)允许部分 Task 失败,通过设置允许失败任务占比,即 mapred.max.map.failers.percent 默认 0
Task 由 TaskInProcess 监控,失败任务多次尝试(TaskAttempt),慢任务启动备份任务,最大允许尝试次数为 mapred.map.max.attempts
Record 容错
跳过导致 Task 失败的坏记录,如:
- K,V 超大,导致 OOM,可配置最大长度(mapred.linerecordreader.maxlength),超出则截断
- 异常数据引发程序 bug,task 重试几次(mapred.skip.attempts.to.start.skipping)后,自动进入 skip mode,跳过导致失败的记录
YARN
分布式集群资源管理工具
Hive
支持 SQL 语法的大数据仓库
版权声明: 本文为 InfoQ 作者【猫切切切切切】的原创文章。
原文链接:【http://xie.infoq.cn/article/9506f6c080a895facca4cbb7f】。文章转载请联系作者。
评论