写点什么

Week 12 大数据应用

用户头像
evildracula
关注
发布于: 2021 年 01 月 10 日

大数据概述

大数据层次


HDFS

系统架构



设计目标

HDFS 以流式数据访问模式存储超大文件,运行于商用硬件集群上。

超大文件,流式数据访问(一次写入多次读取),商用硬件


假设:节点失效是常态

理想

1. 任何一个节点失效,不影响 HDFS 服务

2. HDFS 可以自动完成副本的复制


不适合 HDFS 场景

  • 低延迟的数据访问

  • 大量小文件(超过 namenode 处理能力)

  • 多用户随机写入修改文件


HDFS 为了做到可靠性(reliability)创建了多份数据块(data blocks)的复制(replicas ),并将它们放置在服务器群的计算节点中(compute nodes),MapReduce 就可以 在它们所在的节点上处理这些数据了。


文件

文件切分成块(默认大小 64M),以块为单位,每个块有多个副本存储在不同的机器上,副本数可在文件生成时指定(默认 3)

NameNode 是主节点,存储文件的元数据如文件名,文件目录结构,文件属性(生成时间,副本数,文件权限),以及每个文件的块列表以及块所在的 DataNode 等等

DataNode 在本地文件系统存储文件块数据,以及块数据的校验和可以创建、删除、移动或重命名文件,当文件创建、写入和关闭之后不能修改文件内容。


分而治之




NameNode

NameNode 是一个中心服务器,负责管理文件系统的名字空间(namespace)以及客户端对文件的访问。

文件操作,NameNode 负责文件元数据的操作,DataNode 负责处理文件内容的读写请求,跟文件内容相关的数据流不经过 NameNode,只会询问它跟那个 DataNode 联系,否则 NameNode 会成为系统的瓶颈。

副本存放在那些 DataNode 上由 NameNode 来控制,根据全局情况做出块放置决定,读取文件时 NameNode 尽量让用户先读取最近的副本,降低带块消耗和读取时延。

NameNode 全权管理数据块的复制,它周期性地从集群中的每个 DataNode 接收心跳信号和块状态报告(Blockreport)。接收到心跳信号意味着该 DataNode 节点工作正常。块状态报告包含了一个该 DataNode 上所有数据块的列表。


DataNode

一个数据块在 DataNode 以文件存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。

DataNode 启动后向 NameNode 注册,通过后,周期性(1 小时)的向 NameNode 上报所有的块信息。

心跳是每 3 秒一次,心跳返回结果带有 NameNode 给该 DataNode 的命令如复制块数据到另一台机器,或删除某个数据块。如果超过 10 分钟没有收到某个 DataNode 的心跳,则认为该节点不可用。

集群运行中可以安全加入和退出一些机器。


HDFS 关键运行机制 - 高可用

一个名字节点和多个数据节点数据复制(冗余机制)

  • 存放的位置(机架感知策略)

故障检测

数据节点

  • 心跳包(检测是否宕机)

  • 块报告(安全模式下检测)

  • 数据完整性检测(校验和比较)

名字节点(日志文件,镜像文件)

空间回收机制

HDFS 写文件

  • 使用 HDFS 提供的客户端开发库 Client,向远程的 NameNode 发起 RPC 请求;

  • NameNode 会检查要创建的文件是否已经存在,创建者是否有权限进行操作,成功则会为文

  • 当客户端开始写入文件的时候,开发库会将文件切分成多个 packets,并在内部以数据队列" data queue "的形式管理这些 packets,并向 NameNode 申请新的 blocks,获取用来存储 replicas 的合适的 datanodes 列表,列表的大小根据在 NameNode 中对 replication 的设置而定。

  • 开始以 pipeline(管道)的形式将 packet 写入所有的 replicas 中。开发库把 packet 以流的方式写入第一个 datanode,该 datanode 把该 packet 存储之后,再将其传递给在此 pipeline 中的下一个 datanode,直到最后一个 datanode,这种写数据的方式呈流水线的形式。

  • 最后一个 datanode 成功存储之后会返回一个 ackpacket,在 pipeline 里传递至客户端,在客户端的开发库内部维护着 “ack queue” ,成功收到 datanode 返回的 ack packet 后会从“ack queue” 移除相应的 packet。

  • 如果传输过程中,有某个 datanode 出现了故障,那么当前的 pipeline 会被关闭,出现故障的 datanode 会从当前的 pipeline 中移除,剩余的 block 会继续剩下的 datanode 中继续以 pipeline 的形式传输,同时 NameNode 会分配一个新的 datanode,保持 replicas 设定的数量。




HDFS 读文件

  • 使用 HDFS 提供的客户端开发库 Client,向远程的 NameNode 发起 RPC 请求;

  • NameNode 会视情况返回文件的部分或者全部 block 列表,对于每个 block,

  • 客户端开发库 Client 会选取离客户端最接近的 DataNode 来读取 block;如果客户端

  • 读取完当前 block 的数据后,关闭与当前的 DataNode 连接,并为读取下一个 block

  • 当读完列表的 block 后,且文件读取还没有结束,客户端开发库会继续向 NameNode

  • 读取完一个 block 都会进行 checksum 验证,如果读取 DataNode 时出现错误,客户端会通知 NameNode,然后再从下一个拥有该 block 拷贝的 DataNode 继续读。


节点失效

DataNode 中的磁盘挂了怎么办?

• DataNode

• 坏掉的磁盘上的数据尽快通知 NameNode


DataNode 所在机器挂了怎么办?

Datanode 每 3 秒钟向 NameNode 发送心跳,如果 10 分钟 DataNode 没有向 NameNode 发送心跳,则 NameNode 认为该 DataNode 已经 dead, NameNode 将取出该 DataNode 上对应的 block,对其进行复制。


NameNode 挂了怎么办?

持久化数据

  • 操作日志(edit log)

  • 记录文件创建,删除,修改文件属性等操作

  • FSImage

  • 包含完整的命名空间

  • File -> Block 的映射关系

  • 文件的属性(ACL, quota, 修改时间等)



Client 挂了怎么办?

HDFS 一致性模型

  • 文件创建以后,不保证在 NameNode 立即可见,即使文件刷新并存储,文件长度依然可能为 0

  • 当写入数据超过一个块后,新的 reader 可以看见第一个块,reader 不能看见当前正

  • HDFS 提供 sync() 方法强制缓存与数据节点同步,sync() 调用成功后,当前写入数据对所有 reader 可见且一致

  • 调用 sync() 会导致额外的开销.

副本摆放

压缩

减少存储所需的磁盘空间

加速数据在网络和磁盘上的传输

SequenceFile


Hadoop 文件系统

文件系统抽象(org.apache.hadoop)

  • fs.FileSystem

  • fs.LocalFileSystem

  • hdfs.DistributedFileSystem

  • hdfs.HftpFileSystem

  • hdfs.HsftpFileSystem

  • fs.HarFileSystem


MapReduce

它由称为 map 和 reduce 的两部分用户程序组成,然后利用框架在计算机集群上面根据需求运行多个程序实例来处理各个子任务,然后再对结果进行归并。


特性

  • 自动实现分布式并行计算

  • 容错

  • 提供状态监控工具

  • 模型抽象简洁,程序员易用


WordCount



架构




主要调用方法

单队列调度

  • 特点:FIFO

  • 优点:简单

  • 缺点:资源利用率低

容量调度(Capacity Scheduler ,Hadoop-0.19.0 )

  • 特点:

  • 多队列,每个队列分配一定系统容量(GuaranteedCapacity)

  • 空闲资源可以被动态分配给负载重的队列

  • 支持作业优先级

  • 作业选择:

  • 选择队列:资源回收请求队列优先;最多自由空间队列优先。

  • 选择作业:按提交时间、优先级排队;检查用户配额;检查内存。

  • 优点:

  • 支持多作业并行执行,提高资源利用率

  • 动态调整资源分配,提高作业执行效率

  • 缺点:

  • 队列设置和队列选择无法自动进行,用户需要了解大量系统信息


公平调度(Fair Scheduler,Hadoop-0.19.0)

  • 目标:

  • 改善小作业的响应时间

  • 确保生产性作业的服务水平

  • 特点:

  • 将作业分组——形成作业池(based on a configurable attribute , such as user name, unix group,...)

  • 给每个作业池分配最小共享资源(Minimum map slots, Minimum reduce slots )

  • 将多余的资源平均分配给每个作业

  • 作业选择:

  • 优先调度资源小于最小共享资源的作业

  • 选择分配资源与所需资源差距最大的作业

  • 优点:

  • 支持作业分类调度,使不同类型的作业获得不同的资源分配,提高服务质量

  • 动态调整并行作业数量,充分利用资源

  • 缺点:

  • 不考虑节点的实际负载状态,导致节点负载实际不均衡


JobTracker 内部实现

作业控制

  • 作业抽象成三层:作业监控层(JIP),任务控制层(TIP),任务执行层。

  • 任务可能会被尝试多次执行,每个任务实例被称作 Task Attempt(TA)

  • TA 成功,TIP 会标注该任务成功,所有 TIP 成功,JIP 成功

资源管理

  • 根据 TaskTracker 状态信息进行任务分配


JobTracker 容错

  • JobTracker 失败,那么未完成 Job 失败;

  • 通过 Job 日志,Job 可部分恢复。


TaskTracker 容错

  • 超时

  • TaskTracker 10 分钟(mapred.tasktracker.expiry.interval)未汇报心跳,则将其从集群移除

  • 灰名单,黑名单

  • TaskTracker 上部署性能监控脚本

  • 如果性能表现太差,被 JobTacker 暂停调度


Task 容错

允许部分 Task 失败

• 允许失败的任务占比,默认 0,Mapred.max.map.failers.percent,mapred.max.reduce.failures.percent

Task 由 TIP 监控,失败任务多次尝试,慢任务启动备份任务

• 每次都是一个 TA(Task Attempt),最大允许尝试次数:mapred.map.max.attempts,mapred.reduce.max.attempts


Record 容错

跳过导致 Task 失败的坏记录

  • K,V 超大,导致 OOM,配置最大长度,超出截断 mapred.linercordreader.maxlength。

  • 异常数据引发程序 bug,task 重试几次后,自动进入 skip mode,跳过导致失败的记录,mapred.skip.attempts.to.start.skipping 。


Yarn

MapReduce 的架构,在 MapReduce 应用程序的启动过程中,最重要的就是要把 MapReduce 程序分发到大数据集群的服务器上,在 Hadoop 1 中,这个过程主要是通 过 TaskTracker 和 JobTracker 通信来完成。

这种架构方案的主要缺点是,服务器集群资源调度管理和 MapReduce 执行过程耦合在 一起,如果想在当前集群中运行其他计算任务,比如 Spark 或者 Storm,就无法统一使 用集群中的资源了。


在 Hadoop 早期的时候,大数据技术就只有 Hadoop 一家,这个缺点并不明显。但随着 大数据技术的发展,各种新的计算框架不断出现,我们不可能为每一种计算框架部署一 个服务器集群,而且就算能部署新集群,数据还是在原来集群的 HDFS 上。所以我们需 要把 MapReduce 的资源管理和计算框架分开,这也是 Hadoop 2 最主要的变化,就是 将 Yarn 从 MapReduce 中分离出来,成为一个独立的资源调度框架。


MRv2 最基本的设计思想是将 JobTracker 的两个主要功能,即资源管理和作业管理分成两个独立的进程。

• 在该解决方案中包含两个组件:全局的 ResourceManager(RM)和与每个应用相关的 ApplicationMaster(AM)

• 这里的“应用”指一个单独的 MapReduce 作业或者 DAG 作业


架构


Yarn 包括两个部分:一个是资源管理器(Resource Manager),一个是节点管理器(Node Manager)。


这也是 Yarn 的两种主要进程:

  • ResourceManager 进程负责整个集群的资源调度管理,通常部署在独立的服务器上;

  • 调度器:调度器其实就是一个资源分配算法,根据应用程序(Client)提交的资源申请和当前服务器集群的资源状况进行资源分配。Yarn 内置了几种资源调度算法,包括 Fair Scheduler、Capacity Scheduler 等,你也可以开发自己的资源调度算法供 Yarn 调用。

  • Yarn 进行资源分配的单位是容器(Container),每个容器包含了一定量的内存、CPU 等计算资源,默认配置下,每个容器包含一个 CPU 核心。容器由 NodeManager 进程启动和管理,NodeManger 进程会监控本节点上容器的运行状况并向 ResourceManger 进程汇报。


  • 应用程序管理器:负责应用程序的提交、监控应用程序运行状态等。应用程序启动后需要在集群中运行一个 ApplicationMaster,ApplicationMaster 也需要运行在容器里面。每个应用程序启动后都会先启动自己的 ApplicationMaster,由 ApplicationMaster 根据应用程序的资源需求进一步向 ResourceManager 进程申请容器资源,得到容器以后就会分发自己的应用程序代码到容器上启动,进而开始分布式计算。

  • NodeManager 进程负责具体服务器上的资源和任务管理,在集群的每一台计算服务器上都会启动,基本上跟 HDFS 的 DataNode 进程一起出现。


工作流(MapReduce 为例)

  1. 我们向 Yarn 提交应用程序,包括 MapReduce ApplicationMaster、我们的 MapReduce 程序,以及 MapReduce Application 启动命令。

  2. ResourceManager 进程和 NodeManager 进程通信,根据集群资源,为用户程序分 配第一个容器,并将 MapReduce ApplicationMaster 分发到这个容器上面,并在容 器里面启动 MapReduce ApplicationMaster。

  3. MapReduce ApplicationMaster 启动后立即向 ResourceManager 进程注册,并为自 己的应用程序申请容器资源。

  4. MapReduce ApplicationMaster 申请到需要的容器后,立即和相应的 NodeManager 进程通信,将用户 MapReduce 程序分发到 NodeManager 进程所在服务器,并在容 器中运行,运行的就是 Map 或者 Reduce 任务。

  5. Map 或者 Reduce 任务在运行期和 MapReduce ApplicationMaster 通信,汇报自己 的运行状态,如果运行结束,MapReduce ApplicationMaster 向 ResourceManager 进程注销并释放所有的容器资源。


资源管理器 HA


Hive


Hive 架构



Hive 执行流程

  • 操作符(Operator)是 Hive 的最小处理单元

  • 每个操作符处理代表 HDFS 操作或 MR 作业

  • 编译器把 Hive SQL 转换成一组操作符


Hive 编译器

  • Parser:SQL 转换为抽象语法树(AST)

  • SemanOc Analyzer: 把抽象语法树转化为查询块(QB)

  • Logic Plan Generator: 把 QB 转化为逻辑执行计划(Logical Plan)

  • Logic Optimizer: 重写执行计划,带入更多的优化后的计划。

  • Physical Plan Generator:将逻辑执行计划转化为物理执行计划(M/R jobs)

  • Physical Op Omizer:适应性 Join 策略调整


Hive Metastore

Single User Mode


Multi User Mode


Remote User


发布于: 2021 年 01 月 10 日阅读数: 13
用户头像

evildracula

关注

还未添加个人签名 2019.07.29 加入

还未添加个人简介

评论

发布
暂无评论
Week 12 大数据应用