写点什么

HDFS 源码解析:教你用 HDFS 客户端写数据

  • 2021 年 12 月 30 日
  • 本文字数:13936 字

    阅读完需:约 46 分钟

摘要:终于开始了这个很感兴趣但是一直觉得困难重重的源码解析工作,也算是一个好的开端。


本文分享自华为云社区《hdfs源码解析之客户端写数据》,作者: dayu_dls。

 

在我们客户端写数据的代码大致如下:


Configuration conf = new Configuration(); conf.set("fs.defaultFS","hdfs://172.16.40.119:8020"); String a = "This is my first hdfs file!";//① 得到DistributedFileSystemFileSystem filesytem = FileSystem.get(conf);   //② 得到输出流FSDataOutputStreamFSDataOutputStream fs = filesytem.create(new Path("/a.txt"),true); //③ 开始写数据fs.write(a.getBytes()); fs.flush();
复制代码


​最重要的三步已经在上面标注,通过源码分析每一步所发生的细节是什么?


FileSystem filesytem = FileSystem.get(conf); 
复制代码


​其中 conf 是一个 Configuration 对象。执行这行代码后就进入到 FileSystem.get(Configuration conf)方法中,可以看到,在这个方法中先通过 getDefaultUri()方法获取文件系统对应的的 URI,该 URI 保存了与文件系统对应的协议和授权信息,如:hdfs://localhost:9000。这个 URI 又是如何得到的呢?是在 CLASSPATH 中的配置文件中取得的,看 getDefaultUri()方法中有 conf.get(FS_DEFAULT_NAME_KEY, "file:///") 这么一个实参,在笔者项目的 CLASSPATH 中的 core-site.xml 文件中有这么一个配置: 


 <property>        <name>fs.default.name</name>        <value>hdfs://localhost:9000</value>    </property>    <property> 
复制代码


​而常量 FS_DEFAULT_NAME_KEY 对应的值是 fs.default.name,所以 conf.get(FS_DEFAULT_NAME_KEY, "file:///")得到的值是 hdfs://localhost:9000。

URI 创建完成之后就进入到 FileSystem.get(URI uri, Configuration conf)方法。在这个方法中,先执行一些检查,检查 URI 的协议和授权信息是否为空,然后再直接或简介调用该方法,最重要的是执行


String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);    if (conf.getBoolean(disableCacheName, false)) {//是否使用被Cache的文件系统      return createFileSystem(uri, conf);    }     return CACHE.get(uri, conf);
复制代码


​常量 CACHE 用于缓存已经打开的、可共享的文件系统,它是 FileSystem 类的静态内部类 FileSystem.Cache 的对象,在其内部使用一个 Map 存储文件系统 private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();这个键值对映射的键是 FileSystem.Cache.Key 类型,它有三个成员变量:

/**URI 模式**/final String scheme;/**URI 的授权部分**/final String authority;/**保存了打开具体文件系统的本地用户信息,不同本地用户打开的具体文件系统也是不能共享的**/final UserGroupInformation ugi;


由于 FileSystem.Cache 表示可共享的文件系统,所以这个 Key 就用于区别不同的文件系统对象,如一个一个文件系统对象可共享,那么 FileSystem.Cache.Key 的三个成员变量相等,在这个类中重写了 hashCode()方法和 equals()方法,就是用于判断这三个变量是否相等。根据《Hadoop 技术内幕:深入解析 Hadoop Common 和 HDFS 架构设计与实现原理》这本书的介绍,在 Hadoop1.0 版本中 FileSystem.Cache.Key 类还有一个 unique 字段,这个字段表示,如果其他 3 个字段相等的情况,下如果用户不想共享这个文件系统,就设置这个值(默认为 0),但是不知道现在为什么去除了,还没搞清楚,有哪位同学知道的话麻烦告知,谢谢。


回到 FileSystem.get(final URI uri, final Configuration conf)方法的最后一行语句 return CACHE.get(uri, conf),调用了 FileSystem.Cahce.get()方法获取具体的文件系统对象,该方法代码如下:


FileSystem get(URI uri, Configuration conf) throws IOException{      Key key = new Key(uri, conf);      FileSystem fs = null;      synchronized (this) {        fs = map.get(key);      }      if (fs != null) {        return fs;      }            fs = createFileSystem(uri, conf);      synchronized (this) {  // refetch the lock again        FileSystem oldfs = map.get(key);        if (oldfs != null) { // a file system is created while lock is releasing          fs.close(); // close the new file system          return oldfs;  // return the old file system        }         // now insert the new file system into the map        if (map.isEmpty() && !clientFinalizer.isAlive()) {          Runtime.getRuntime().addShutdownHook(clientFinalizer);        }        fs.key = key;        map.put(key, fs);        return fs;      }    } 
复制代码


​在这个方法中先查看已经 map 中是否已经缓存了要获取的文件系统对象,如果已经有了,直接从集合中去除,如果没有才进行创建,由于 FileSystem.CACHE 为 static 类型,所以在同一时刻可能有多个线程在访问,所以需要在 Cache 类的方法中使用同步的操作来取值和设置值。这个方法比较简单,最核心的就是


fs = createFileSystem(uri, conf); 
复制代码


​这行语句,它执行了具体的文件系统对象的创建的功能。createFileSystem()方法是 FileSystem 的一个私有方法,其代码如下:


private static FileSystem createFileSystem(URI uri, Configuration conf      ) throws IOException {    Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);    LOG.debug("Creating filesystem for " + uri);    if (clazz == null) {      throw new IOException("No FileSystem for scheme: " + uri.getScheme());    }    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);    fs.initialize(uri, conf);    return fs;  }
复制代码


​其实现就是先从配置文件取得 URI 对应的类,如在 core-default.xml 文件中属性(键)fs.hdfs.impl 对应的值是 org.apache.hadoop.hdfs.DistributedFileSystem,相应的 XML 代码如下:


<property>  <name>fs.hdfs.impl</name>  <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>  <description>The FileSystem for hdfs: uris.</description></property>
复制代码


​所以若 uri 对应 fs.hdfs.impl,那么 createFileSystem 中的 clazz 就是 org.apache.hadoop.hdfs.DistributedFileSystem 的 Class 对象。然后再利用反射,创建 org.apache.hadoop.hdfs.DistributedFileSystem 的对象 fs。然后执行 fs.initialize(uri, conf);初始化 fs 对象。DistributedFileSystem 是 Hadoop 分布式文件系统的实现类,实现了 Hadoop 文件系统的界面,提供了处理 HDFS 文件和目录的相关事务。


这行代码


FSDataOutputStream fs = filesytem.create(new Path("/a.txt"),true); 
复制代码


​主要做了两件事:

①通过 rpc 调用在 namenode 命名空间创建文件条目;

②创建该文件对应的输出流。

filesytem.create()最终调用的是 DistributedFileSystem 的 create 方法


@Override  //返回HdfsDataOutputStream对象,继承FSDataOutputStream  public FSDataOutputStream create(final Path f, final FsPermission permission,    final EnumSet<CreateFlag> cflags, final int bufferSize,    final short replication, final long blockSize, final Progressable progress,    final ChecksumOpt checksumOpt) throws IOException {   //此文件系统的统计信息,每次写操作增加1	 /* 跟踪有关在FileSystem中完成了多少次读取,写入等操作的统计信息。 由于每个FileSystem只有一个这样的对象,	  因此通常会有许多线程写入此对象。 几乎打开文件上的每个操作都将涉及对该对象的写入。 相比之下,大多数程序不经常阅读统计数据,	  而其他程序则根本不这样做。 因此,这针对写入进行了优化。 每个线程都写入自己的线程本地内存区域。 这消除了争用,	  并允许我们扩展到许多线程。 为了读取统计信息,读者线程总计了所有线程本地数据区域的内容。*/	statistics.incrementWriteOps(1);	//获取绝对路径    Path absF = fixRelativePart(f);   /* 尝试使用指定的FileSystem和Path调用重写的doCall(Path)方法。 如果调用因UnresolvedLinkException失败,               它将尝试解析路径并通过调用next(FileSystem,Path)重试该调用。*/    return new FileSystemLinkResolver<FSDataOutputStream>() {      @Override      public FSDataOutputStream doCall(final Path p)          throws IOException, UnresolvedLinkException {        final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,                cflags, replication, blockSize, progress, bufferSize,                checksumOpt);        //返回HdfsDataOutputStream对象,并传入DFSOutputStream对象        return dfs.createWrappedOutputStream(dfsos, statistics);      }      @Override      public FSDataOutputStream next(final FileSystem fs, final Path p)          throws IOException {        return fs.create(p, permission, cflags, bufferSize,            replication, blockSize, progress, checksumOpt);      }    }.resolve(this, absF);  }
复制代码


在上面代码中首先构建 DFSOutputStream,然后传给 dfs.createWrappedOutputStream 构建 HdfsDataOutputStream,看下 dfs.create(getPathName(p), permission,cflags, replication, blockSize, progress, bufferSize, checksumOpt)是如何构建输出流 DFSOutputStream 的。


public DFSOutputStream create(String src,                              FsPermission permission,                             EnumSet<CreateFlag> flag,                              boolean createParent,                             short replication,                             long blockSize,                             Progressable progress,                             int buffersize,                             ChecksumOpt checksumOpt,                             InetSocketAddress[] favoredNodes) throws IOException {    checkOpen();    if (permission == null) {      permission = FsPermission.getFileDefault();    }    FsPermission masked = permission.applyUMask(dfsClientConf.uMask);    if(LOG.isDebugEnabled()) {      LOG.debug(src + ": masked=" + masked);    }    String[] favoredNodeStrs = null;    if (favoredNodes != null) {      favoredNodeStrs = new String[favoredNodes.length];      for (int i = 0; i < favoredNodes.length; i++) {        favoredNodeStrs[i] =             favoredNodes[i].getHostName() + ":"                          + favoredNodes[i].getPort();      }    }//DFSOutputStream.newStreamForCreate构建DFSOutputStream    final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,        src, masked, flag, createParent, replication, blockSize, progress,        buffersize, dfsClientConf.createChecksum(checksumOpt),        favoredNodeStrs);    beginFileLease(result.getFileId(), result);    return result;  }
复制代码


​再进到 DFSOutputStream.newStreamForCreate 方法中


static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,      FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,      short replication, long blockSize, Progressable progress, int buffersize,      DataChecksum checksum, String[] favoredNodes) throws IOException {    HdfsFileStatus stat = null;     // Retry the create if we get a RetryStartFileException up to a maximum    // number of times    boolean shouldRetry = true;    int retryCount = CREATE_RETRY_COUNT;    while (shouldRetry) {      shouldRetry = false;      try {        stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,            new EnumSetWritable<CreateFlag>(flag), createParent, replication,            blockSize, SUPPORTED_CRYPTO_VERSIONS);        break;      } catch (RemoteException re) {        IOException e = re.unwrapRemoteException(            AccessControlException.class,            DSQuotaExceededException.class,            FileAlreadyExistsException.class,            FileNotFoundException.class,            ParentNotDirectoryException.class,            NSQuotaExceededException.class,            RetryStartFileException.class,            SafeModeException.class,            UnresolvedPathException.class,            SnapshotAccessControlException.class,            UnknownCryptoProtocolVersionException.class);        if (e instanceof RetryStartFileException) {          if (retryCount > 0) {            shouldRetry = true;            retryCount--;          } else {            throw new IOException("Too many retries because of encryption" +                " zone operations", e);          }        } else {          throw e;        }      }    }    Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");    final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,        flag, progress, checksum, favoredNodes);    out.start();    return out;  }
复制代码


​在 newStreamForCreate 方法中,先定义一个文件状态变量 stat,然后不停的尝试通过 namenode 创建文件条目,创建成功后再创建改文件的输出流,然后通过 out.start()启动 DataQueue 线程开始发送数据。我们重点看一下 namenode 是怎么创建文件条目的。打开 dfsClient.namenode.create 方法,dfsClient.namenode 是在 dfsClient 中声明的 ClientProtocol 对象。ClientProtocol 是客户端协议接口,namenode 端需要实现该接口的 create 方法,通过动态代理的方式把结果返回给客户端,即是 rpc 远程调用。那么看下 namenode 端是怎么实现这个 create 方法的,打开这个方法的实现类我们发现了 NameNodeRpcServer 这个类,这个类是实现 namenode rpc 机制的核心类,继承了各种协议接口并实现。


打开 NameNodeRpcServer 的 create 方法:


@Override // ClientProtocol  public HdfsFileStatus create(String src, FsPermission masked,      String clientName, EnumSetWritable<CreateFlag> flag,      boolean createParent, short replication, long blockSize,       CryptoProtocolVersion[] supportedVersions)      throws IOException {    String clientMachine = getClientMachine();    if (stateChangeLog.isDebugEnabled()) {      stateChangeLog.debug("*DIR* NameNode.create: file "                         +src+" for "+clientName+" at "+clientMachine);    }    if (!checkPathLength(src)) {      throw new IOException("create: Pathname too long.  Limit "          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");    }    //经过一系列的检查最终调用了namesystem.startFile方法,    HdfsFileStatus fileStatus = namesystem.startFile(src, new PermissionStatus(        getRemoteUser().getShortUserName(), null, masked),        clientName, clientMachine, flag.get(), createParent, replication,        blockSize, supportedVersions);    metrics.incrFilesCreated();    metrics.incrCreateFileOps();    return fileStatus;  }
复制代码


​打开 namesystem.startFile,namesystem 是 NameNodeRpcServer 中声明的 FSNamesystem 对象:


/**   * Create a new file entry in the namespace.   * 在命名空间创建一个文件条目   *    * For description of parameters and exceptions thrown see   * {@link ClientProtocol#create}, except it returns valid file status upon   * success   */  HdfsFileStatus startFile(String src, PermissionStatus permissions,      String holder, String clientMachine, EnumSet<CreateFlag> flag,      boolean createParent, short replication, long blockSize,       CryptoProtocolVersion[] supportedVersions)      throws AccessControlException, SafeModeException,      FileAlreadyExistsException, UnresolvedLinkException,      FileNotFoundException, ParentNotDirectoryException, IOException {    HdfsFileStatus status = null;    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,        null);    if (cacheEntry != null && cacheEntry.isSuccess()) {      return (HdfsFileStatus) cacheEntry.getPayload();    }        try {        //调用的startFileInt      status = startFileInt(src, permissions, holder, clientMachine, flag,          createParent, replication, blockSize, supportedVersions,          cacheEntry != null);    } catch (AccessControlException e) {      logAuditEvent(false, "create", src);      throw e;    } finally {      RetryCache.setState(cacheEntry, status != null, status);    }    return status;  }最后打开startFileInt方法中,可以看到又调用了startFileInternal方法:
try { checkOperation(OperationCategory.WRITE); checkNameNodeSafeMode("Cannot create file" + src); src = resolvePath(src, pathComponents); toRemoveBlocks = startFileInternal(pc, src, permissions, holder, clientMachine, create, overwrite, createParent, replication, blockSize, isLazyPersist, suite, protocolVersion, edek, logRetryCache); stat = dir.getFileInfo(src, false, FSDirectory.isReservedRawName(srcArg), true); } catch (StandbyException se) { skipSync = true; throw se;打开startFileInternal:
/** * Create a new file or overwrite an existing file<br> * * Once the file is create the client then allocates a new block with the next * call using {@link ClientProtocol#addBlock}. * <p> * For description of parameters and exceptions thrown see * {@link ClientProtocol#create} */ private BlocksMapUpdateInfo startFileInternal(FSPermissionChecker pc, String src, PermissionStatus permissions, String holder, String clientMachine, boolean create, boolean overwrite, boolean createParent, short replication, long blockSize, boolean isLazyPersist, CipherSuite suite, CryptoProtocolVersion version, EncryptedKeyVersion edek, boolean logRetryEntry) throws FileAlreadyExistsException, AccessControlException, UnresolvedLinkException, FileNotFoundException, ParentNotDirectoryException, RetryStartFileException, IOException { //检查当前线程是否有写锁,没有退出 assert hasWriteLock(); // Verify that the destination does not exist as a directory already. //判断文件是否已经作为目录存在 //INodesInPath:包含从给定路径解析的INode信息。 //获取给定文件或目录的inode信息 final INodesInPath iip = dir.getINodesInPath4Write(src); final INode inode = iip.getLastINode(); if (inode != null && inode.isDirectory()) { throw new FileAlreadyExistsException(src + " already exists as a directory"); }//FileEncryptionInfo封装加密文件的所有加密相关信息 FileEncryptionInfo feInfo = null; if (dir.isInAnEZ(iip)) { // The path is now within an EZ, but we're missing encryption parameters if (suite == null || edek == null) { throw new RetryStartFileException(); } // Path is within an EZ and we have provided encryption parameters. // Make sure that the generated EDEK matches the settings of the EZ. String ezKeyName = dir.getKeyName(iip); if (!ezKeyName.equals(edek.getEncryptionKeyName())) { throw new RetryStartFileException(); } feInfo = new FileEncryptionInfo(suite, version, edek.getEncryptedKeyVersion().getMaterial(), edek.getEncryptedKeyIv(), ezKeyName, edek.getEncryptionKeyVersionName()); Preconditions.checkNotNull(feInfo); } final INodeFile myFile = INodeFile.valueOf(inode, src, true); if (isPermissionEnabled) { if (overwrite && myFile != null) { checkPathAccess(pc, src, FsAction.WRITE); } /* * To overwrite existing file, need to check 'w' permission * of parent (equals to ancestor in this case) */ checkAncestorAccess(pc, src, FsAction.WRITE); } if (!createParent) { verifyParentDir(src); } try { BlocksMapUpdateInfo toRemoveBlocks = null; if (myFile == null) { if (!create) { throw new FileNotFoundException("Can't overwrite non-existent " + src + " for client " + clientMachine); } } else { if (overwrite) { toRemoveBlocks = new BlocksMapUpdateInfo(); List<INode> toRemoveINodes = new ChunkedArrayList<INode>(); long ret = dir.delete(src, toRemoveBlocks, toRemoveINodes, now()); if (ret >= 0) { incrDeletedFileCount(ret); removePathAndBlocks(src, null, toRemoveINodes, true); } } else { // If lease soft limit time is expired, recover the lease //如果租约软限制时间到期,则恢复租约 recoverLeaseInternal(myFile, src, holder, clientMachine, false); throw new FileAlreadyExistsException(src + " for client " + clientMachine + " already exists"); } } checkFsObjectLimit(); INodeFile newNode = null; // Always do an implicit mkdirs for parent directory tree. Path parent = new Path(src).getParent(); if (parent != null && mkdirsRecursively(parent.toString(), permissions, true, now())) { //获取文件的inode newNode = dir.addFile(src, permissions, replication, blockSize, holder, clientMachine); } if (newNode == null) { throw new IOException("Unable to add " + src + " to namespace"); } leaseManager.addLease(newNode.getFileUnderConstructionFeature() .getClientName(), src); // Set encryption attributes if necessary if (feInfo != null) { dir.setFileEncryptionInfo(src, feInfo); newNode = dir.getInode(newNode.getId()).asFile(); } //设置存储策略 setNewINodeStoragePolicy(newNode, iip, isLazyPersist); // record file record in log, record new generation stamp //把操作写入到EditLog getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry); if (NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " + src + " inode " + newNode.getId() + " " + holder); } return toRemoveBlocks; } catch (IOException ie) { NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: " + src + " " + ie.getMessage()); throw ie; } }
复制代码


​这个方法就是生成文件条目的核心方法,首先判断检查当前线程是否有写锁,没有退出。FSDirectory dir 是一个命名空间的内存树。


fs.write(a.getBytes());
复制代码


​上面 write 方法郑振调用的是 FSOutputSummer.write,FSOutputSummer 维护了一个本地缓冲区 buf,大小初始为 9*chunkSize,append 文件时初始化方法不同。循环写 buf.length 字节数据,buf 满了就开始调用 writeChecksumChunks 写 packet。


@Override  public synchronized void write(byte b[], int off, int len)      throws IOException {        checkClosed();        if (off < 0 || len < 0 || off > b.length - len) {      throw new ArrayIndexOutOfBoundsException();    }//循环写buf.length字节数据,buf满了就开始写packet    for (int n=0;n<len;n+=write1(b, off+n, len-n)) {    }  } private int write1(byte b[], int off, int len) throws IOException {    if(count==0 && len>=buf.length) {      // local buffer is empty and user buffer size >= local buffer size, so      // simply checksum the user buffer and send it directly to the underlying      // stream      final int length = buf.length;      writeChecksumChunks(b, off, length);      return length;    }        // copy user data to local buffer    int bytesToCopy = buf.length-count;    bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;    System.arraycopy(b, off, buf, count, bytesToCopy);    count += bytesToCopy;    if (count == buf.length) {      // local buffer is full      flushBuffer();    }     return bytesToCopy;  }
复制代码


​创建文件时,是每次写 getBytesPerChecksum,刚好一个 chunk 的大小,追加文件时第一次写文件最后一个 block 的最后一个 chunk 空的部分,这样就可以组成一个完整的 chunk,后面就按照 create 文件一样每次写 chunk 大小。所以每次写的大小是根据 create 还是 append 区别的。 


//创建文件时,是每次写 getBytesPerChecksum,刚好一个 chunk 的大小,追加文件时第一次写文件最后一个 block 的最后一个 chunk 空的部分,这样就可以组成一个完整的 chunk,后面就按照 create 文件一样每次写 chunk 大小。所以每次写的大小是根据 create 还是 append 区别的。


private void writeChecksumChunks(byte b[], int off, int len)  throws IOException {    sum.calculateChunkedSums(b, off, len, checksum, 0);    for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {      int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);      int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();      writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());    }  }
复制代码


​最核心的方法是 writeChunk() 


/**   * cklen :校验和大小   * 写数据到packet,每次只写一个chunk大小的数据   */  @Override  protected synchronized void writeChunk(byte[] b, int offset, int len,      byte[] checksum, int ckoff, int cklen) throws IOException {    dfsClient.checkOpen();//检查DFSClient对象的状态    checkClosed();//检查DFSOutputStream对象的状态//输出的数据比一个校验块(chunk)还大    if (len > bytesPerChecksum) {      throw new IOException("writeChunk() buffer size is " + len +                            " is larger than supported  bytesPerChecksum " +                            bytesPerChecksum);    }    //要写入的校验和大小与给定的大小不一致    if (cklen != 0 && cklen != getChecksumSize()) {      throw new IOException("writeChunk() checksum size is supposed to be " +                            getChecksumSize() + " but found to be " + cklen);    }//当前要写入的packet为空,则新建    if (currentPacket == null) {      currentPacket = createPacket(packetSize, chunksPerPacket,           bytesCurBlock, currentSeqno++);      if (DFSClient.LOG.isDebugEnabled()) {        DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +             currentPacket.seqno +            ", src=" + src +            ", packetSize=" + packetSize +            ", chunksPerPacket=" + chunksPerPacket +            ", bytesCurBlock=" + bytesCurBlock);      }    }//开始写数据,先写校验和checksum,再写chunkdata    currentPacket.writeChecksum(checksum, ckoff, cklen);    currentPacket.writeData(b, offset, len);    //chunk个数自增    currentPacket.numChunks++;    //block中的偏移量    bytesCurBlock += len;     // If packet is full, enqueue it for transmission    //packet的chunk个数等于packet设置的最大chunk个数,则packet满了,就开始传输,如果bytesCurBlock大于blockSize呢?    //如何处理    //已解决:通过computePacketChunkSize我们知道,    if (currentPacket.numChunks == currentPacket.maxChunks ||        bytesCurBlock == blockSize) {      if (DFSClient.LOG.isDebugEnabled()) {        DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +            currentPacket.seqno +            ", src=" + src +            ", bytesCurBlock=" + bytesCurBlock +            ", blockSize=" + blockSize +            ", appendChunk=" + appendChunk);      }      //当前packet放入到队列,等待消费      waitAndQueueCurrentPacket();       // If the reopened file did not end at chunk boundary and the above      // write filled up its partial chunk. Tell the summer to generate full       // crc chunks from now on.      //如果重新打开的文件没有在chunk块边界结束,并且上面的写入填满了它的部分块。 告诉夏天从现在开始生成完整的crc块。      //block中刚好存储整数个完整的chunk块,如果分配的block中已经存在数据      //通过对文件进行追加操作,然后逐步调试,终于明白了appendChunk的含义,在对已经存在的文件进行append操作时,会构建DFSOutputStream对象,而这个对象的初始化和新建      //文件时的方法是不同的。append操作的对象初始化会从namenode把文件最后一个block(block存在一个list中)的信息拿到,然后把这个block的信息初始化给DFSOutputStream      //本地缓冲区buf就是blockSize-bytesCurBlock,且当前packet的chunksize=blockSize-bytesCurBlock      //如果是追加数据,且追加后构成一个完整的chunk块,那么就需要把之前指定的buf重置成正常值      if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {        appendChunk = false;        // 为何这个操作?buf置空        //使用指定大小的新缓冲区重置现有缓冲区。        resetChecksumBufSize();      }       if (!appendChunk) {          //计算下一个packet的大小,保证写入时不会超过blocksize        /*  就是说,在new每个新的Packet之前,都会重新计算一下新的Packet的大小,          以保证新的Packet大小不会超过Block的剩余大小          如果block还有不到一个Packet的大小(比如还剩3kb的空间),则最后一个Packet的大小就是:          blockSize-bytesCurBlock,也就是3kb*/         int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize);        computePacketChunkSize(psize, bytesPerChecksum);      }      //      // if encountering a block boundary, send an empty packet to       // indicate the end of block and reset bytesCurBlock.      //      //如果block满了,发送空包,重置变量      if (bytesCurBlock == blockSize) {        currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);        currentPacket.lastPacketInBlock = true;        currentPacket.syncBlock = shouldSyncBlock;        waitAndQueueCurrentPacket();        bytesCurBlock = 0;        lastFlushOffset = 0;      }    }  }
复制代码


​如果重新打开的文件没有在 chunk 块边界结束,并且上面的写入填满了它的部分块。 告诉夏天从现在开始生成完整的 crc 块。block 中刚好存储整数个完整的 chunk 块,如果分配的 block 中已经存在数据。通过对文件进行追加操作,然后逐步调试,终于明白了 appendChunk 的含义,在对已经存在的文件进行 append 操作时,会构建 DFSOutputStream 对象,而这个对象的初始化和新建文件时的方法是不同的。append 操作的对象初始化会从 namenode 把文件最后一个 block(block 存在一个 list 中)的信息拿到,然后把这个 block 的信息初始化给 DFSOutputStream。本地缓冲区 buf 就是 blockSize-bytesCurBlock,且当前 packet 的 chunksize=blockSize-bytesCurBlock。如果是追加数据,且追加后构成一个完整的 chunk 块,那么就需要把之前指定的 buf 重置成正常值。


点击关注,第一时间了解华为云新鲜技术~

发布于: 刚刚
用户头像

提供全面深入的云计算技术干货 2020.07.14 加入

华为云开发者社区,提供全面深入的云计算前景分析、丰富的技术干货、程序样例,分享华为云前沿资讯动态,方便开发者快速成长与发展,欢迎提问、互动,多方位了解云计算! 传送门:https://bbs.huaweicloud.com/

评论

发布
暂无评论
HDFS源码解析:教你用HDFS客户端写数据