Hadoop 编程实战:HDFS API 编程
前言:Hadoop分布式计算平台不等同大数据。大数据是当前时代数据爆炸的一种表征,Hadoop是通过生态圈内的分布式工具,凝聚计算机集群的算力,对大数据进行计算的一个平台。
一、 浅谈HDFS原理
HDFS(Hadoop Distributed File System)是Hadoop生态圈的分布式文件系统,基于Google在2003年所发表论文《The Google File System》的开源实现。想要了解HDFS基本原理,首先要了解文件级别与块级别的分布式文件系统。
文件级别的分布式文件系统
文件级别的分布式文件系统,通俗来讲就是一个主从架构的文件系统。在一个计算机集群中,选取其中一个节点作为MASTER节点。MASTER节点主要管理文件系统的元信息,比如文件系统的目录结构,文件所存储的节点信息等。除了MASTER节点外,其他节点均为Slave节点。Slave节点主要负责文件的存储与维持MASTER节点的心跳连接。为了保证数据的可靠性,可以把文件分别存储在3个不同的Slave节点上(即3副本)。
当客户端Client向文件级别的分布式文件系统写入一个文件时,首先要跟MASTER节点通信,获取3个Slave节点的地址;接着判断文件的合法性(比如文件是否重名,写入用户对目录是否有足够权限),一旦文件合法性通过,则与所获取的3个Slave节点建立连接;最后把文件同时传输到3个Slave节点上,并更新MASTER节点的元信息,完成写入流程。
同理客户端Client向文件级别的分布式文件系统读取一个文件时的流程与上述一致。
文件级别的分布式文件系统虽然可以解决文件系统高可用问题,但同时也带来其他性能问题:
1) 难以负载均衡。因为系统是以文件级别进行存储,而文件的大小并不一致,所以各个Slave节点的存储大小难以均衡;
2) 难以并行处理。当上层计算引擎需要从多个节点提取文件时,文件系统的统一出口便会出现带宽瓶颈,影响上层计算引擎的并行处理效率。
基于上述性能问题,有科学家便提出块级别的分布式文件系统。
块级别的分布式文件系统
相比文件级别的分布式文件系统,块级别的分布式文件系统把文件切割成等大的数据块存储在各个Slave节点上,而MASTER节点从登记文件所存储的节点信息变更为登记数据块所存储的节点信息。
由于数据按块级别进行存储,各个Slave节点所存储的数据块数量差异会变得很小,从而解决负载均衡问题。同时并行读取数据时,数据块大小会比文件小得多,读取效率大大提升,从而解决数据并行处理问题。
HDFS是基于块级别的分布式文件系统而诞生的。
HDFS基本原理
HDFS主要分为3个角色,分别为NameNode(主节点),DataNode(数据节点)及客户端Client,基本架构如下所示:
NameNode主要负责管理元信息及所有DataNode。
1) 管理元信息:维护文件系统的目录结构、各个文件的数据块信息、DataNode登记信息及包含的数据块信息等。
2) 管理所有DataNode:定时检查所有DataNode的心跳连接、当DataNode出现异常时(比如节点丢失、数据块错误),向存活的DataNode节点重构异常节点的相应数据块等。
由于NameNode管理HDFS的元信息,所以一旦NameNode出现异常或元信息出现丢失,会导致HDFS变得不可用。为此一般会搭建SecondNameNode,作为NameNode的镜像节点。当NameNode出现异常时,及时恢复数据或切换到SecondNameNode,保证文件系统可用。
DataNode主要负责数据的存储及维持NameNode的心跳连接。
客户端Client主要负责HDFS的参数管理及操作HDFS内的数据。当向HDFS写入一个文件时,首先在Client对文件切成等大的数据块(Hadoop 2.x后默认数据块大小为128M),然后向NameNode申请三个DataNode的连接地址(假设副本数设置为3),并建立数据流水线,最后把数据块写入DataNode中存储。当向HDFS读取一个文件时,首先向NameNode获取相应数据块所存储的DataNode信息,并与这些DataNode建立连接,然后从DataNode上提取数据块到 Client,最后Client会整合数据块为完整文件进行展现。
详细HDFS原理未来将单独写成文章分享给大家。
二、 常用HDFS API
表2.1 HDFS常用类
表2.2 FileSystem常用方法
表2.3 FSDataInputStream常用方法
表2.4 FSDataOutputStream常用方法
表2.4 IOUtils常用方法
三、 HDFS API编程实战
通过本地与HDFS的文件交互编程实战,简单说明HDFS API编程方式。
创建HDFS访问配置对象
private static Configuration getConf(){
Configuration conf = new Configuration(); conf.set("fs.defaultFS","hdfs://namenode.cdh.com:8020");
return conf;
}
每次操作HDFS前都要创建FileSystem对象
Configuration conf = getConf();
fs = FileSystem.get(conf);
本地文件上传到HDFS方法
//把本地文件写到输入流
InputStream in=new BufferedInputStream(new FileInputStream(location));
//打开HDFS文件
fsout = fs.create(new Path(HdfsPath));
//把本地文件写入到HDFS文件上
IOUtils.copyBytes(in, fsout, 4096, true);
fs.close();
HDFS下载文件到本地方法
//把本地文件写到输出流
OutputStream out = new BufferedOutputStream(new FileOutputStream(location));
//读取HDFS指定路径文件
fsin = fs.open(new Path(HdfsPath));
//把HDFS文件写入到本地文件上
IOUtils.copyBytes(fsin, out, 1024, true);
fs.close();
完整代码
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import java.io.*;
public class hdfs_demo {
private static Configuration getConf(){
Configuration conf = new Configuration(); conf.set("fs.defaultFS","hdfs://namenode.cdh.com:8020");
return conf;
}
public static void updataToHdfs(String location,String HdfsPath) {
FileSystem fs=null;
FSDataOutputStream fsout=null;
try {
Configuration conf = getConf();
fs = FileSystem.get(conf);
//把本地文件写到输入流
InputStream in=new BufferedInputStream(new FileInputStream(location));
//打开HDFS文件
fsout = fs.create(new Path(HdfsPath));
//把本地文件写入到HDFS文件上
IOUtils.copyBytes(in, fsout, 4096, true);
fs.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void downloadFromHdfs(String location,String HdfsPath) {
FileSystem fs=null;
FSDataInputStream fsin=null;
try {
Configuration conf = getConf();
fs = FileSystem.get(conf);
//把本地文件写到输出流
OutputStream out = new BufferedOutputStream(new FileOutputStream(location));
//读取HDFS指定路径文件
fsin = fs.open(new Path(HdfsPath));
//把HDFS文件写入到本地文件上
IOUtils.copyBytes(fsin, out, 1024, true);
fs.close();
} catch (IOException e){
e.printStackTrace();
}
}
public static void main(String[] args){
String HdfsFile="/hdfstest/test.txt";
String infile="d:/data/in_test.txt";
String outfile="d:/data/out_test.txt";
hdfs_demo h=new hdfs_demo();
h.updataToHdfs(infile,HdfsFile);
h.downloadFromHdfs(outfile,HdfsFile);
}
}
编译通过后直接在idea上执行即可
四、 常见问题
CDH关闭hdfs权限检查
问题描述:使用root用户在hdfs上创建文件或目录时,提示“Permission denied: user=root, access=WRITE, inode="/":hdfs:supergroup:d”错误
问题分析:CDH开启hdfs权限检查,导致root用户没权限,只能使用hdfs用户
解决方案:CDH关闭hdfs权限检查
1)进入HDFS配置界面
2)找到“检查HDFS权限“选项,并取消勾选
3) 重启HDFS
本地配置hadoop_home变量
问题描述:本地运行代码时提示“HADOOP_HOME and hadoop.home.dir are unset“
问题分析:本地缺少配置hadoop_home变量
解决方案:
1)下载winutils-master.zip蓝奏云:https://www.lanzous.com/i55ccnc
根据所使用的hadoop版本解压到本地,比如CDH 6.0是使用hadoop 3.0版本的,就解压hadoop-3.0.0到本地
2) 打开“环境变量“,配置hadoop_home变量
新建HADOOP_HOME变量
在Path变量内添加
3) 重启IDE(idea或者eclipse)并重新执行代码
参考文献:
【1】 杨正洪《大数据技术入门》【M】北京.清华大学出版社
【2】 董西成《大数据技术体系详解》【M】北京.机械工业出版社
版权声明: 本文为 InfoQ 作者【罗小龙】的原创文章。
原文链接:【http://xie.infoq.cn/article/560f9b30720a529852c718144】。文章转载请联系作者。
评论