写点什么

极客时间架构师训练营 1 期 - 第 12 周总结

用户头像
Kaven
关注
发布于: 2020 年 12 月 13 日

大数据发展史


  1. 今天我们常说的大数据技术,其实起源于 Google 在 2004 年前后发表的三篇论文,也就 是我们经常听到的大数据“三驾马车”,分别是分布式文件系统 GFS、大数据分布式计 算框架 MapReduce 和 NoSQL 数据库系统 BigTable。

搜索引擎主要就做两件事情,一个是网页抓取,一个是索引构建和排序,而在这个过程 中,有大量的数据需要存储和计算。这“三驾马车”其实就是用来解决这个问题的,也 就是,一个文件系统、一个计算框架、一个数据库系统。

浏览下 Hadoop 的代码,这个纯用 Java 编写的软件其实并没有什么高深的技术难点, 使用的也都是一些基础的编程技巧,也没有什么出奇之处,但是它却给社会带来巨大 的影响,甚至带动一场深刻的科技革命,推动了人工智能的发展与进步

Lucene 开源项目的创始人 Doug Cutting 当时正在开发开源搜索引擎 Nutch,阅读了 Google 的论文后,根据论文原理初步实现了类似 GFS 和 MapReduce 的功能。

2006 年,Doug Cutting 将这些大数据相关的功能从 Nutch 中分离了出来,然后启动了 一个独立的项目专门开发维护大数据技术,这就是后来赫赫有名的 Hadoop,主要包括 Hadoop 分布式文件系统 HDFS 和大数据计算引擎 MapReduce。

Hadoop 发布之后,Yahoo 首先用了起来

  1. 2008 年,Hadoop 正式成为 Apache 的顶级项目,后来 Doug Cutting 本人也成为了 Apache 基金会的主席。自此,Hadoop 作为软件开发领域的一颗明星冉冉升起。

同年,专门运营 Hadoop 的商业公司 Cloudera 成立,Hadoop 得到进一步的商业支持。

  1. Yahoo 的一些人觉得用 MapReduce 进行大数据编程太麻烦了,于是便开发 了 Pig

Facebook 又发布了 Hive。Hive 支持使用 SQL 语法来进行大数据计算,比如说你可 以写个 Select 语句进行数据查询,然后 Hive 会把 SQL 语句转化成 MapReduce 的计算 程序。

  1. 在 Hadoop 早期,MapReduce 既是一个执行引擎,又是一个资源调度框架,服务器集 群的资源调度管理由 MapReduce 自己完成。但是这样不利于资源复用,也使得 MapReduce 非常臃肿。于是一个新项目启动了,将 MapReduce 执行引擎和资源调度 分离开来,这就是 Yarn。2012 年,Yarn 成为一个独立的项目开始运营,随后被各类大 数据产品支持,成为大数据平台上主流的资源调度系统。

  2. 在 2012 年,UC 伯克利 AMP 实验室(Algorithms、Machine 和 People 的缩写) 开发的 Spark 开始崭露头角


----

一般说来,像 MapReduce、Spark 这类计算框架处理的业务场景都被称作批处理计算


而在大数据领域,还有另外一类应用场景,它们需要对实时产生的大量数据进行即时计 算,比如对于遍布城市的监控摄像头进行人脸识别和嫌犯追踪。这类计算称为大数据流 计算,相应地,有 Storm、Flink、Spark Streaming 等流计算框架来满足此类大数据应 用的场景。流式计算要处理的数据是实时在线产生的数据,所以这类计算也被称为大数据实时计算


NoSQL 系统处理的主要也是大规模海量数据的存储与访问,所以也被归为大数据技术。

NoSQL 曾经在 2011 年左右非常火爆,涌现出 HBase、Cassandra 等许多优秀的产品, 其中 HBase 是从 Hadoop 中分离出来的、基于 HDFS 的 NoSQL 系统。


上面这些基本上都可以归类为大数据引擎或者大数据框架。而大数据处理的主要应用场 景包括数据分析、数据挖掘与机器学习。数据分析主要使用 Hive、Spark SQL 等 SQL 引擎完成;数据挖掘与机器学习则有专门的机器学习框架 TensorFlow、Mahout 以及 MLlib 等,内置了主要的机器学习和数据挖掘算法。



大数据应用发展史


  1. 大数据应用的搜索引擎时代

  2. 大数据应用的数据仓库时代

  3. 大数据应用的数据挖掘时代

  4. 大数据应用的机器学习时代


大数据应用领域


  1. 医学影像智能识别

  2. 病历大数据智能诊疗

  3. AI 外语老师

  4. 智能解题

  5. 舆情监控与分析

  6. 大数据风控

  7. 新零售

  8. 无人驾驶


------

HDFS 系统架构




HDFS 设计目标

HDFS 定义

HDFS 以流式数据访问模式存储超大文件,运行于商用硬件集群上。

  • 超大文件

  • 流式数据访问-- 一次写入多次读取

  • 商用硬件


不适合 HDFS 的场景
  • 低延迟的数据访问

  • 大量小文件- 超过 NameNode 的处理能力

  • 多用户随机写入修改文件

HDFS 为了做到可靠性(reliability)创建了多份数据块(data blocks)的复制(replicas ),并将它们放置在服务器群的计算节点中(compute nodes),MapReduce 就可以 在它们所在的节点上处理这些数据了。


设计目标

假设:节点失效是常态

理想:

  1. 任何一个节点失效,不影响 HDFS 服务

  2. HDFS 可以自动完成副本的复制

文件存储:

文件切分成块(默认大小 64M),以块为单位,每个块有多个副本存储在不同的机器上, 副本数可在文件生成时指定(默认 3)

NameNode 是主节点,存储文件的元数据如文件名,文件目录结构,文件属性(生成时 间,副本数,文件权限),以及每个文件的块列表以及块所在的 DataNode 等等

DataNode 在本地文件系统存储文件块数据,以及块数据的校验和

可以创建、删除、移动或重命名文件,当文件创建、写入和关闭之后不能修改文件内容。

分而治之(Divide and Conquer)

数据存储,分片处理,且存有多份备份

NameNode

NameNode 是一个中心服务器,负责管理文件系统的名字空间(namespace)以及客户 端对文件的访问。 文件操作,NameNode 负责文件元数据的操作,DataNode 负责处理文件内容的读写请 求,跟文件内容相关的数据流不经过 NameNode,只会询问它跟那个 DataNode 联系, 否则 NameNode 会成为系统的瓶颈。


副本存放在那些 DataNode 上由 NameNode 来控制,根据全局情况做出块放置决定, 读取文件时 NameNode 尽量让用户先读取近的副本,降低带块消耗和读取时延 NameNode 全权管理数据块的复制,它周期性地从集群中的每个 DataNode 接收心跳信 号和块状态报告(Blockreport)。接收到心跳信号意味着该 DataNode 节点工作正常。 块状态报告包含了一个该 DataNode 上所有数据块的列表。


DataNode


一个数据块在 DataNode 以文件存储在磁盘上,包括两个文件,一个是数据本身,一个 是元数据包括数据块的长度,块数据的校验和,以及时间戳。

DataNode 启动后向 NameNode 注册,通过后,周期性(1 小时)的向 NameNode 上 报所有的块信息。

心跳是每 3 秒一次,心跳返回结果带有 NameNode 给该 DataNode 的命令如复制块数据 到另一台机器,或删除某个数据块。如果超过 10 分钟没有收到某个 DataNode 的心跳, 则认为该节点不可用。

集群运行中可以安全加入和退出一些机器。


HDFS 关键运行机制–高可用

一个名字节点和多个数据节点


  • 数据复制(冗余机制) - 存放的位置(机架感知策略)

  • 故障检测

  • 数据节点

心跳包(检测是否宕机)

块报告(安全模式下检测)

数据完整性检测(校验和比较)

  • 名字节点(日志文件,镜像文件)

  • 空间回收机制


HDFS 如何写文件?




• 使用 HDFS 提供的客户端开发库 Client,向远程的 NameNode 发起 RPC 请求;

• NameNode 会检查要创建的文件是否已经存在,创建者是否有权限进行操作,成功则会为文 件创建一个记录,否则会让客户端抛出异常;、

• 当客户端开始写入文件的时候,开发库会将文件切分成多个 packets,并在内部以数据队列 " data queue "的形式管理这些 packets,并向 NameNode 申请新的 blocks,获取用来存储 replicas 的合适的 datanodes 列表,列表的大小根据在 NameNode 中对 replication 的设置 而定。

• 开始以 pipeline(管道)的形式将 packet 写入所有的 replicas 中。开发库把 packet 以流的 方式写入第一个 datanode,该 datanode 把该 packet 存储之后,再将其传递给在此 pipeline 中的下一个 datanode,直到最后一个 datanode,这种写数据的方式呈流水线的形 式。

• 最后一个 datanode 成功存储之后会返回一个 ack packet,在 pipeline 里传递至客户端,在 客户端的开发库内部维护着“ack queue” ,成功收到 datanode 返回的 ack packet 后会从 “ack queue”移除相应的 packet。

• 如果传输过程中,有某个 datanode 出现了故障,那么当前的 pipeline 会被关闭,出现故障 的 datanode 会从当前的 pipeline 中移除,剩余的 block 会继续剩下的 datanode 中继续以 pipeline 的形式传输,同时 NameNode 会分配一个新的 datanode,保持 replicas 设定的数 量。

#### HDFS 如何读文件?



• 使用 HDFS 提供的客户端开发库 Client,向远程的 NameNode 发起 RPC 请求;

• NameNode 会视情况返回文件的部分或者全部 block 列表,对于每个 block, NameNode 都会返回有该 block 拷贝的 DataNode 地址;

• 客户端开发库 Client 会选取离客户端接近的 DataNode 来读取 block;如果客户端 本身就是 DataNode,那么将从本地直接获取数据。

  • 读取完当前 block 的数据后,关闭与当前的 DataNode 连接,并为读取下一个 block 寻找佳的 DataNode; • 当读完列表的 block 后,且文件读取还没有结束,客户端开发库会继续向 NameNode 获取下一批的 block 列表。

  • 读取完一个 block 都会进行 checksum 验证,如果读取 DataNode 时出现错误,客户 端会通知 NameNode,然后再从下一个拥有该 block 拷贝的 DataNode 继续读。


节点失效是常态


  • DataNode 中的磁盘挂了怎么办?

  • DataNode 所在机器挂了怎么办?

  • NameNode 挂了怎么办?

  • Client 挂了怎么办?


##### DataNode 的磁盘挂了怎么办?


  • DataNode 正常服务

  • 坏掉的磁盘上的数据尽快通知 NameNode


DataNode 所在机器挂了怎么办?

问:NameNode 怎么知道 DataNode 挂掉了?

答:Datanode 每 3 秒钟向 NameNode 发送心跳,如果 10 分钟 DataNode 没有向 NameNode 发送心跳,则 NameNode 认为该 DataNode 已经 dead,NameNode 将取 出该 DataNode 上对应的 block,对其进行复制。


NameNode 挂了怎么办?

持久化元数据


  • 操作日志(edit log)

记录文件创建,删除,修改文件属性等操作

  • Fsimage

  1. 包含完整的命名空间

  2. File -> Block 的映射关系

  3. 文件的属性(ACL, quota, 修改时间等)



Client 挂了怎么办?

问:Client 所在机器挂了有什么影响?

答:一致性问题

###### HDFS 一致性模型


  • 文件创建以后,不保证在 NameNode 立即可见,即使文件刷新并存储,文件长度依 然可能为 0;

  • 当写入数据超过一个块后,新的 reader 可以看见第一个块,reader 不能看见当前正 在写入的块;

  • HDFS 提供 sync()方法强制缓存与数据节点同步,sync()调用成功后,当前写入数据 对所有 reader 可见且一致;

  • 调用 sync()会导致额外的开销.


###### 副本摆放策略

跨机架写

###### 压缩

减少存储所需的磁盘空间

加速数据在网络和磁盘上的传输



###### SequenceFile



数据块 block

  • 默认 64M,通常设置为 128M

  • 可以在 hdfs-site.xml 中设置

<property>     <name>dfs.block.size</name>     <value>134217728</value> </property> 
复制代码

NameNode 参数,在 hdfs-site.xml 中设置。

dfs.name.dir

<property>     <name>dfs.name.dir</name>     <value>/data0/name,/nfs/name</value>     <description>文件存储路径</description> </property> 
复制代码


DataNode 参数,在 hdfs-site.xml 中设置。

dfs.data.dir

<property>     <name>dfs.data.dir</name>     <value>/data0/hdfs,/data1/hdfs</value>     <description>数据存储目录</description> </property>
复制代码

Hadoop 文件系统

文件系统抽象(org.apache.hadoop)


  • fs.FileSystem

  • fs.LocalFileSystem

  • hdfs.DistributedFileSystem

  • hdfs.HftpFileSystem

  • hdfs.HsftpFileSystem

  • fs.HarFileSystem


JAVA 接口

通过 FileSystem API 读取数据


  • Path 对象。

hdfs://localhost:9000/user/tom/t.txt


  • 获取 FileSystem 实例

publish static FileSystem get(Configuration conf) throws IOException

publish static FileSystem get(URI uri,Configuration conf) throws IOException


  • 获取文件输入流

publish FSDataInputStream open(Path p) throws IOException

publish abstract FSDataInputStream open(Path p,int bufferSize) throws IOException


-----

MapReduce

MapReduce:大规模数据处理


  • 处理海量数据(>1TB)

  • 上百上千 CPU 实现并行处理

简单地实现以上目的-移动计算比移动数据更划算

MapReduce 特性


  • 自动实现分布式并行计算

  • 容错

  • 提供状态监控工具

  • 模型抽象简洁,程序员易用


MapReduce

它由称为 map 和 reduce 的两部分用户程序组成,然后利用框架在计算机集群上面根据 需求运行多个程序实例来处理各个子任务,然后再对结果进行归并。






适合 MapReduce 的计算类型


  • TopK

  • K-means

  • Bayes

  • SQL

不适合 MapReduce 的计算类型


  • Fibonacci


###### InputFormat

验证作业的输入的正确性

将输入文件切分成逻辑的 InputSplits,一个 InputSplit 将被分配给一个单独的 Mapper task 提供 RecordReader 的实现,这个 RecordReader 会从 InputSplit 中正确读出一条一条的 K-V对供 Mapper 使用。

###### FileInputFormat

得到分片的小值 minSize 和大值 maxSize,可以通过设置 mapred.min.split.size 和 mapred.max.split.size 来设置;

对于每个输入文件,计算 max(minSize, min(maxSize, blockSize)); 如果 minSize<=blockSize<=maxSize,则设为 blockSize。

分片信息<file,start,length,hosts>,通过 hosts 实现 map 本地性。

###### OutputFormat

OutputFormt 接口决定了在哪里以及怎样持久化作业结果。

默认的 OutputFormat 就是 TextOutputFormat,它是一种以行分隔,包含制表符界定的 键值对的文本文件格


###### Partitioner


  • 什么是 Partitioner

Mapreduce 通过 Partitioner 对 Key 进行分区,进而把数据按我们自己的需求来分发。

  • 什么情况下使用 Partitioner

1. 如果你需要 key 按照自己意愿分发,那么你需要这样的组件。

2. 框架默认的 HashPartitioner

3. 例如:数据内包含省份,而输出要求每个省份输出一个文件。


###### 主要调度方法


  • 单队列调度

特点:FIFO

优点:简单

缺点:资源利用率低


  • 容量调度(Capacity Scheduler ,Hadoop-0.19.0 )

1. 特点:

多队列,每个队列分配一定系统容量(Guaranteed Capacity)

空闲资源可以被动态分配给负载重的队列

支持作业优先级

2. 作业选择:

选择队列:资源回收请求队列优先;最多自由空间队列优先。

选择作业:按提交时间、优先级排队;检查用户配额;检查内存。

3. 优点:

支持多作业并行执行,提高资源利用率

动态调整资源分配,提高作业执行效率

4. 缺点:

队列设置和队列选择无法自动进行,用户需要了解大量系统信息

  • 公平调度(Fair Scheduler,Hadoop-0.19.0)

1. 目标:

改善小作业的响应时间

确保生产性作业的服务水平

2. 特点:

将作业分组——形成作业池(based on a configurable attribute , such as user name, unixgroup,…)

给每个作业池分配最小共享资源(Minimum map slots, Minimum reduce slots)

将多余的资源平均分配给每个作业

3. 作业选择:

优先调度资源小于最小共享资源的作业

选择分配资源与所需资源差距最大的作业

4. 优点:

支持作业分类调度,使不同类型的作业获得不同的资源分配,提高服务质量

动态调整并行作业数量,充分利用资源

5. 缺点: 不考虑节点的实际负载状态,导致节点负载实际不均衡


###### JobTracker 内部实现


  • 作业控制

作业抽象成三层:作业监控层(JIP),任务控制层(TIP),任务执行层。

任务可能会被尝试多次执行,每个任务实例被称作 Task Attempt(TA)

TA 成功,TIP 会标注该任务成功,所有 TIP 成功,JIP 成功

  • 资源管理

根据 TaskTracker 状态信息进行任务分配


###### JobTracker 容错

JobTracker 失败,那么未完成 Job 失败;

通过 Job 日志,Job 可部分恢复。


###### TaskTracker 容错


  • 超时

TaskTracker10 分钟(mapred.tasktracker.expiry.interval)未汇报心跳,则将其从集群移除

  • 灰名单,黑名单

TaskTracker 上部署性能监控脚本

如果性能表现太差,被 JobTacker 暂停调度

###### Task 容错


  • 允许部分 Task 失败

允许失败的任务占比,默认 0,Mapred.max.map.failers.percent, mapred.max.reduce.failures.percent


  • Task 由 TIP 监控,失败任务多次尝试,慢任务启动备份任务

每次都是一个 TA(Task Attempt),最大允许尝试次数:mapred.map.max.attempts, mapred.reduce.max.attempts


###### Record 容错

跳过导致 Task 失败的坏记录


  • K,V 超大,导致 OOM,配置最大长度,超出截断 mapred.linercordreader.maxlength。

  • 异常数据引发程序 bug,task 重试几次后,自动进入 skip mode,跳过导致失败的记录, mapred.skip.attempts.to.start.skipping。


------

Yarn

YARN:Yet Another Resource Negotiator

下一代 MapReduce 框架的名称。

不再是一个传统的 MapReduce 框架,甚至与 MapReduce 无关。

一个通用的运行时框架,用户可以编写自己的计算框架,在该运行环境中运行。


MapReduce 的架构,在 MapReduce 应用程序的启动过程中,重要的就是要把 MapReduce 程序分发到大数据集群的服务器上,在 Hadoop 1 中,这个过程主要是通 过 TaskTracker 和 JobTracker 通信来完成。


这种架构方案的主要缺点是,服务器集群资源调度管理和 MapReduce 执行过程耦合在 一起,如果想在当前集群中运行其他计算任务,比如 Spark 或者 Storm,就无法统一使 用集群中的资源了。


在 Hadoop 早期的时候,大数据技术就只有 Hadoop 一家,这个缺点并不明显。但随着 大数据技术的发展,各种新的计算框架不断出现,我们不可能为每一种计算框架部署一 个服务器集群,而且就算能部署新集群,数据还是在原来集群的 HDFS 上。所以我们需 要把 MapReduce 的资源管理和计算框架分开,这也是 Hadoop 2 主要的变化,就是 将 Yarn 从 MapReduce 中分离出来,成为一个独立的资源调度框架。


MRv2 基本的设计思想是将 JobTracker 的两个主要功能,即资源管理和作业管理分成 两个独立的进程


  • 在该解决方案中包含两个组件:全局的 ResourceManager(RM)和与每个应用相关的 ApplicationMaster(AM)

  • 这里的“应用”指一个单独的 MapReduce 作业或者 DAG 作业


Yarn 架构



Yarn 包括两个部分:

一个是资源管理器(Resource Manager),一个是节点管理器(Node Manager)。


这也是 Yarn 的两种主要进程:ResourceManager 进程负责整个集群的资源调度管理, 通常部署在独立的服务器上;NodeManager 进程负责具体服务器上的资源和任务管理, 在集群的每一台计算服务器上都会启动,基本上跟 HDFS 的 DataNode 进程一起出现。


资源管理器又包括两个主要组件:调度器和应用程序管理器。


调度器其实就是一个资源分配算法,根据应用程序(Client)提交的资源申请和当前服务 器集群的资源状况进行资源分配。Yarn 内置了几种资源调度算法,包括 Fair Scheduler 、CapacityScheduler 等,你也可以开发自己的资源调度算法供 Yarn 调用。


Yarn 进行资源分配的单位是容器(Container),每个容器包含了一定量的内存、CPU 等计算资源,默认配置下,每个容器包含一个 CPU 核心。容器由 NodeManager 进程 启动和管理,NodeManger 进程会监控本节点上容器的运行状况并向 ResourceManger 进程汇报。


应用程序管理器负责应用程序的提交、监控应用程序运行状态等。应用程序启动后需要 在集群中运行一个 ApplicationMaster,ApplicationMaster 也需要运行在容器里面。每 个应用程序启动后都会先启动自己的 ApplicationMaster,由 ApplicationMaster 根据应 用程序的资源需求进一步向 ResourceManager 进程申请容器资源,得到容器以后就会 分发自己的应用程序代码到容器上启动,进而开始分布式计算,


Yarn 的工作流程(MapReduce 为例)


  1. 我们向 Yarn 提交应用程序,包括 MapReduce ApplicationMaster、我们的 MapReduce 程序,以及 MapReduce Application 启动命令。

  2. ResourceManager 进程和 NodeManager 进程通信,根据集群资源,为用户程序分 配第一个容器,并将 MapReduce ApplicationMaster 分发到这个容器上面,并在容 器里面启动 MapReduce ApplicationMaster。

  3. MapReduce ApplicationMaster 启动后立即向 ResourceManager 进程注册,并为自 己的应用程序申请容器资源。

  4. MapReduce ApplicationMaster 申请到需要的容器后,立即和相应的 NodeManager 进程通信,将用户 MapReduce 程序分发到 NodeManager 进程所在服务器,并在容 器中运行,运行的就是 Map 或者 Reduce 任务。

  5. Map 或者 Reduce 任务在运行期和 MapReduce ApplicationMaster 通信,汇报自己 的运行状态,如果运行结束,MapReduce ApplicationMaster 向 ResourceManager 进程注销并释放所有的容器资源。


资源管理器 HA



----


Hive


Hive 架构




Hive 执行流程


  • 操作符(Operator)是 Hive 的小处理单元

  • 每个操作符处理代表 HDFS 操作或 MR 作业

  • 编译器把 Hive SQL 转换成一组操作符



Hive 编译器





Hive Metastore


  • Single User Mode (Default)

  • Multi User Mode

  • Remote Server

  • Hive QL –Join

Hive QL –Join in Map Reduce

  • Join Optimizations

Hive QL –Map Join


用户头像

Kaven

关注

还未添加个人签名 2019.04.13 加入

还未添加个人简介

评论

发布
暂无评论
极客时间架构师训练营 1 期 - 第 12 周总结