HDFS 源码解析:教你用 HDFS 客户端写数据
摘要:终于开始了这个很感兴趣但是一直觉得困难重重的源码解析工作,也算是一个好的开端。
本文分享自华为云社区《hdfs源码解析之客户端写数据》,作者: dayu_dls。
在我们客户端写数据的代码大致如下:
最重要的三步已经在上面标注,通过源码分析每一步所发生的细节是什么?
其中 conf 是一个 Configuration 对象。执行这行代码后就进入到 FileSystem.get(Configuration conf)方法中,可以看到,在这个方法中先通过 getDefaultUri()方法获取文件系统对应的的 URI,该 URI 保存了与文件系统对应的协议和授权信息,如:hdfs://localhost:9000。这个 URI 又是如何得到的呢?是在 CLASSPATH 中的配置文件中取得的,看 getDefaultUri()方法中有 conf.get(FS_DEFAULT_NAME_KEY, "file:///") 这么一个实参,在笔者项目的 CLASSPATH 中的 core-site.xml 文件中有这么一个配置:
而常量 FS_DEFAULT_NAME_KEY 对应的值是 fs.default.name,所以 conf.get(FS_DEFAULT_NAME_KEY, "file:///")得到的值是 hdfs://localhost:9000。
URI 创建完成之后就进入到 FileSystem.get(URI uri, Configuration conf)方法。在这个方法中,先执行一些检查,检查 URI 的协议和授权信息是否为空,然后再直接或简介调用该方法,最重要的是执行
常量 CACHE 用于缓存已经打开的、可共享的文件系统,它是 FileSystem 类的静态内部类 FileSystem.Cache 的对象,在其内部使用一个 Map 存储文件系统 private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();这个键值对映射的键是 FileSystem.Cache.Key 类型,它有三个成员变量:
/**URI 模式**/final String scheme;/**URI 的授权部分**/final String authority;/**保存了打开具体文件系统的本地用户信息,不同本地用户打开的具体文件系统也是不能共享的**/final UserGroupInformation ugi;
由于 FileSystem.Cache 表示可共享的文件系统,所以这个 Key 就用于区别不同的文件系统对象,如一个一个文件系统对象可共享,那么 FileSystem.Cache.Key 的三个成员变量相等,在这个类中重写了 hashCode()方法和 equals()方法,就是用于判断这三个变量是否相等。根据《Hadoop 技术内幕:深入解析 Hadoop Common 和 HDFS 架构设计与实现原理》这本书的介绍,在 Hadoop1.0 版本中 FileSystem.Cache.Key 类还有一个 unique 字段,这个字段表示,如果其他 3 个字段相等的情况,下如果用户不想共享这个文件系统,就设置这个值(默认为 0),但是不知道现在为什么去除了,还没搞清楚,有哪位同学知道的话麻烦告知,谢谢。
回到 FileSystem.get(final URI uri, final Configuration conf)方法的最后一行语句 return CACHE.get(uri, conf),调用了 FileSystem.Cahce.get()方法获取具体的文件系统对象,该方法代码如下:
在这个方法中先查看已经 map 中是否已经缓存了要获取的文件系统对象,如果已经有了,直接从集合中去除,如果没有才进行创建,由于 FileSystem.CACHE 为 static 类型,所以在同一时刻可能有多个线程在访问,所以需要在 Cache 类的方法中使用同步的操作来取值和设置值。这个方法比较简单,最核心的就是
这行语句,它执行了具体的文件系统对象的创建的功能。createFileSystem()方法是 FileSystem 的一个私有方法,其代码如下:
其实现就是先从配置文件取得 URI 对应的类,如在 core-default.xml 文件中属性(键)fs.hdfs.impl 对应的值是 org.apache.hadoop.hdfs.DistributedFileSystem,相应的 XML 代码如下:
所以若 uri 对应 fs.hdfs.impl,那么 createFileSystem 中的 clazz 就是 org.apache.hadoop.hdfs.DistributedFileSystem 的 Class 对象。然后再利用反射,创建 org.apache.hadoop.hdfs.DistributedFileSystem 的对象 fs。然后执行 fs.initialize(uri, conf);初始化 fs 对象。DistributedFileSystem 是 Hadoop 分布式文件系统的实现类,实现了 Hadoop 文件系统的界面,提供了处理 HDFS 文件和目录的相关事务。
这行代码
主要做了两件事:
①通过 rpc 调用在 namenode 命名空间创建文件条目;
②创建该文件对应的输出流。
filesytem.create()最终调用的是 DistributedFileSystem 的 create 方法
在上面代码中首先构建 DFSOutputStream,然后传给 dfs.createWrappedOutputStream 构建 HdfsDataOutputStream,看下 dfs.create(getPathName(p), permission,cflags, replication, blockSize, progress, bufferSize, checksumOpt)是如何构建输出流 DFSOutputStream 的。
再进到 DFSOutputStream.newStreamForCreate 方法中
在 newStreamForCreate 方法中,先定义一个文件状态变量 stat,然后不停的尝试通过 namenode 创建文件条目,创建成功后再创建改文件的输出流,然后通过 out.start()启动 DataQueue 线程开始发送数据。我们重点看一下 namenode 是怎么创建文件条目的。打开 dfsClient.namenode.create 方法,dfsClient.namenode 是在 dfsClient 中声明的 ClientProtocol 对象。ClientProtocol 是客户端协议接口,namenode 端需要实现该接口的 create 方法,通过动态代理的方式把结果返回给客户端,即是 rpc 远程调用。那么看下 namenode 端是怎么实现这个 create 方法的,打开这个方法的实现类我们发现了 NameNodeRpcServer 这个类,这个类是实现 namenode rpc 机制的核心类,继承了各种协议接口并实现。
打开 NameNodeRpcServer 的 create 方法:
打开 namesystem.startFile,namesystem 是 NameNodeRpcServer 中声明的 FSNamesystem 对象:
这个方法就是生成文件条目的核心方法,首先判断检查当前线程是否有写锁,没有退出。FSDirectory dir 是一个命名空间的内存树。
上面 write 方法郑振调用的是 FSOutputSummer.write,FSOutputSummer 维护了一个本地缓冲区 buf,大小初始为 9*chunkSize,append 文件时初始化方法不同。循环写 buf.length 字节数据,buf 满了就开始调用 writeChecksumChunks 写 packet。
创建文件时,是每次写 getBytesPerChecksum,刚好一个 chunk 的大小,追加文件时第一次写文件最后一个 block 的最后一个 chunk 空的部分,这样就可以组成一个完整的 chunk,后面就按照 create 文件一样每次写 chunk 大小。所以每次写的大小是根据 create 还是 append 区别的。
//创建文件时,是每次写 getBytesPerChecksum,刚好一个 chunk 的大小,追加文件时第一次写文件最后一个 block 的最后一个 chunk 空的部分,这样就可以组成一个完整的 chunk,后面就按照 create 文件一样每次写 chunk 大小。所以每次写的大小是根据 create 还是 append 区别的。
最核心的方法是 writeChunk()
如果重新打开的文件没有在 chunk 块边界结束,并且上面的写入填满了它的部分块。 告诉夏天从现在开始生成完整的 crc 块。block 中刚好存储整数个完整的 chunk 块,如果分配的 block 中已经存在数据。通过对文件进行追加操作,然后逐步调试,终于明白了 appendChunk 的含义,在对已经存在的文件进行 append 操作时,会构建 DFSOutputStream 对象,而这个对象的初始化和新建文件时的方法是不同的。append 操作的对象初始化会从 namenode 把文件最后一个 block(block 存在一个 list 中)的信息拿到,然后把这个 block 的信息初始化给 DFSOutputStream。本地缓冲区 buf 就是 blockSize-bytesCurBlock,且当前 packet 的 chunksize=blockSize-bytesCurBlock。如果是追加数据,且追加后构成一个完整的 chunk 块,那么就需要把之前指定的 buf 重置成正常值。
版权声明: 本文为 InfoQ 作者【华为云开发者社区】的原创文章。
原文链接:【http://xie.infoq.cn/article/1e24cb3aa21b840985262762a】。文章转载请联系作者。
评论