HDFS源碼解析:教你用HDFS客戶端寫數據

摘要:終於開始了這個很感興趣但是一直覺得困難重重的源碼解析工作,也算是一個好的開端。

本文分享自華為雲社區《hdfs源碼解析之客戶端寫數據》,作者: dayu_dls。

在我們客戶端寫數據的程式碼大致如下:

Configuration conf = new Configuration();
 
conf.set("fs.defaultFS","hdfs://172.16.40.119:8020");
 
String a = "This is my first hdfs file!";
//① 得到DistributedFileSystem
FileSystem filesytem = FileSystem.get(conf);   
//② 得到輸出流FSDataOutputStream
FSDataOutputStream fs = filesytem.create(new Path("/a.txt"),true); 
//③ 開始寫數據
fs.write(a.getBytes());
 
fs.flush();

最重要的三步已經在上面標註,通過源碼分析每一步所發生的細節是什麼?

FileSystem filesytem = FileSystem.get(conf); 

其中conf是一個Configuration對象。執行這行程式碼後就進入到FileSystem.get(Configuration conf)方法中,可以看到,在這個方法中先通過getDefaultUri()方法獲取文件系統對應的的URI,該URI保存了與文件系統對應的協議和授權資訊,如:hdfs://localhost:9000。這個URI又是如何得到的呢?是在CLASSPATH中的配置文件中取得的,看getDefaultUri()方法中有conf.get(FS_DEFAULT_NAME_KEY, “file:///”) 這麼一個實參,在筆者項目的CLASSPATH中的core-site.xml文件中有這麼一個配置:

 <property>
        <name>fs.default.name</name>
        <value>hdfs://localhost:9000</value>
    </property>
    <property> 

而常量FS_DEFAULT_NAME_KEY對應的值是fs.default.name,所以conf.get(FS_DEFAULT_NAME_KEY, “file:///”)得到的值是hdfs://localhost:9000。URI創建完成之後就進入到FileSystem.get(URI uri, Configuration conf)方法。在這個方法中,先執行一些檢查,檢查URI的協議和授權資訊是否為空,然後再直接或簡介調用該方法,最重要的是執行

  String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
    if (conf.getBoolean(disableCacheName, false)) {//是否使用被Cache的文件系統
      return createFileSystem(uri, conf);
    }
 
    return CACHE.get(uri, conf);

常量CACHE用於快取已經打開的、可共享的文件系統,它是FileSystem類的靜態內部類FileSystem.Cache的對象,在其內部使用一個Map存儲文件系統

private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();這個鍵值對映射的鍵是FileSystem.Cache.Key類型,它有三個成員變數:

/**URI模式**/
final String scheme;
/**URI的授權部分**/
final String authority;
/**保存了打開具體文件系統的本地用戶資訊,不同本地用戶打開的具體文件系統也是不能共享的**/
final UserGroupInformation ugi;

由於FileSystem.Cache表示可共享的文件系統,所以這個Key就用於區別不同的文件系統對象,如一個一個文件系統對象可共享,那麼FileSystem.Cache.Key的三個成員變數相等,在這個類中重寫了hashCode()方法和equals()方法,就是用於判斷這三個變數是否相等。根據《Hadoop技術內幕:深入解析Hadoop Common和HDFS架構設計與實現原理》這本書的介紹,在Hadoop1.0版本中FileSystem.Cache.Key類還有一個unique欄位,這個欄位表示,如果其他3個欄位相等的情況,下如果用戶不想共享這個文件系統,就設置這個值(默認為0),但是不知道現在為什麼去除了,還沒搞清楚,有哪位同學知道的話麻煩告知,謝謝。

回到FileSystem.get(final URI uri, final Configuration conf)方法的最後一行語句return CACHE.get(uri, conf),調用了FileSystem.Cahce.get()方法獲取具體的文件系統對象,該方法程式碼如下:

 FileSystem get(URI uri, Configuration conf) throws IOException{
      Key key = new Key(uri, conf);
      FileSystem fs = null;
      synchronized (this) {
        fs = map.get(key);
      }
      if (fs != null) {
        return fs;
      }
 
      fs = createFileSystem(uri, conf);
      synchronized (this) {  // refetch the lock again
        FileSystem oldfs = map.get(key);
        if (oldfs != null) { // a file system is created while lock is releasing
          fs.close(); // close the new file system
          return oldfs;  // return the old file system
        }
 
        // now insert the new file system into the map
        if (map.isEmpty() && !clientFinalizer.isAlive()) {
          Runtime.getRuntime().addShutdownHook(clientFinalizer);
        }
        fs.key = key;
        map.put(key, fs);
        return fs;
      }
    } 

在這個方法中先查看已經map中是否已經快取了要獲取的文件系統對象,如果已經有了,直接從集合中去除,如果沒有才進行創建,由於FileSystem.CACHE為static類型,所以在同一時刻可能有多個執行緒在訪問,所以需要在Cache類的方法中使用同步的操作來取值和設置值。這個方法比較簡單,最核心的就是

   fs = createFileSystem(uri, conf); 

這行語句,它執行了具體的文件系統對象的創建的功能。createFileSystem()方法是FileSystem的一個私有方法,其程式碼如下:

private static FileSystem createFileSystem(URI uri, Configuration conf
      ) throws IOException {
    Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
    LOG.debug("Creating filesystem for " + uri);
    if (clazz == null) {
      throw new IOException("No FileSystem for scheme: " + uri.getScheme());
    }
    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
    fs.initialize(uri, conf);
    return fs;
  }

其實現就是先從配置文件取得URI對應的類,如在core-default.xml文件中屬性(鍵)fs.hdfs.impl對應的值是org.apache.hadoop.hdfs.DistributedFileSystem,相應的XML程式碼如下:

<property>
  <name>fs.hdfs.impl</name>
  <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
  <description>The FileSystem for hdfs: uris.</description>
</property>

所以若uri對應fs.hdfs.impl,那麼createFileSystem中的clazz就是org.apache.hadoop.hdfs.DistributedFileSystem的Class對象。然後再利用反射,創建org.apache.hadoop.hdfs.DistributedFileSystem的對象fs。然後執行fs.initialize(uri, conf);初始化fs對象。DistributedFileSystem是Hadoop分散式文件系統的實現類,實現了Hadoop文件系統的介面,提供了處理HDFS文件和目錄的相關事務。

這行程式碼

FSDataOutputStream fs = filesytem.create(new Path("/a.txt"),true); 

主要做了兩件事:

①通過rpc調用在namenode命名空間創建文件條目;

②創建該文件對應的輸出流。

filesytem.create()最終調用的是DistributedFileSystem的create方法

@Override
  //返回HdfsDataOutputStream對象,繼承FSDataOutputStream
  public FSDataOutputStream create(final Path f, final FsPermission permission,
    final EnumSet<CreateFlag> cflags, final int bufferSize,
    final short replication, final long blockSize, final Progressable progress,
    final ChecksumOpt checksumOpt) throws IOException {
   //此文件系統的統計資訊,每次寫操作增加1
     /* 跟蹤有關在FileSystem中完成了多少次讀取,寫入等操作的統計資訊。 由於每個FileSystem只有一個這樣的對象,
      因此通常會有許多執行緒寫入此對象。 幾乎打開文件上的每個操作都將涉及對該對象的寫入。 相比之下,大多數程式不經常閱讀統計數據,
      而其他程式則根本不這樣做。 因此,這針對寫入進行了優化。 每個執行緒都寫入自己的執行緒本地記憶體區域。 這消除了爭用,
      並允許我們擴展到許多執行緒。 為了讀取統計資訊,讀者執行緒總計了所有執行緒本地數據區域的內容。*/
    statistics.incrementWriteOps(1);
    //獲取絕對路徑
    Path absF = fixRelativePart(f);
   /* 嘗試使用指定的FileSystem和Path調用重寫的doCall(Path)方法。 如果調用因UnresolvedLinkException失敗,
               它將嘗試解析路徑並通過調用next(FileSystem,Path)重試該調用。*/
    return new FileSystemLinkResolver<FSDataOutputStream>() {
      @Override
      public FSDataOutputStream doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
                cflags, replication, blockSize, progress, bufferSize,
                checksumOpt);
        //返回HdfsDataOutputStream對象,並傳入DFSOutputStream對象
        return dfs.createWrappedOutputStream(dfsos, statistics);
      }
      @Override
      public FSDataOutputStream next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.create(p, permission, cflags, bufferSize,
            replication, blockSize, progress, checksumOpt);
      }
    }.resolve(this, absF);
  }

在上面程式碼中首先構建DFSOutputStream,然後傳給dfs.createWrappedOutputStream構建HdfsDataOutputStream,看下dfs.create(getPathName(p), permission,cflags, replication, blockSize, progress, bufferSize, checksumOpt)是如何構建輸出流DFSOutputStream的。

public DFSOutputStream create(String src, 
                             FsPermission permission,
                             EnumSet<CreateFlag> flag, 
                             boolean createParent,
                             short replication,
                             long blockSize,
                             Progressable progress,
                             int buffersize,
                             ChecksumOpt checksumOpt,
                             InetSocketAddress[] favoredNodes) throws IOException {
    checkOpen();
    if (permission == null) {
      permission = FsPermission.getFileDefault();
    }
    FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
    if(LOG.isDebugEnabled()) {
      LOG.debug(src + ": masked=" + masked);
    }
    String[] favoredNodeStrs = null;
    if (favoredNodes != null) {
      favoredNodeStrs = new String[favoredNodes.length];
      for (int i = 0; i < favoredNodes.length; i++) {
        favoredNodeStrs[i] = 
            favoredNodes[i].getHostName() + ":" 
                         + favoredNodes[i].getPort();
      }
    }
//DFSOutputStream.newStreamForCreate構建DFSOutputStream
    final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
        src, masked, flag, createParent, replication, blockSize, progress,
        buffersize, dfsClientConf.createChecksum(checksumOpt),
        favoredNodeStrs);
    beginFileLease(result.getFileId(), result);
    return result;
  }

再進到DFSOutputStream.newStreamForCreate方法中

static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
      FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
      short replication, long blockSize, Progressable progress, int buffersize,
      DataChecksum checksum, String[] favoredNodes) throws IOException {
    HdfsFileStatus stat = null;
 
    // Retry the create if we get a RetryStartFileException up to a maximum
    // number of times
    boolean shouldRetry = true;
    int retryCount = CREATE_RETRY_COUNT;
    while (shouldRetry) {
      shouldRetry = false;
      try {
        stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
            new EnumSetWritable<CreateFlag>(flag), createParent, replication,
            blockSize, SUPPORTED_CRYPTO_VERSIONS);
        break;
      } catch (RemoteException re) {
        IOException e = re.unwrapRemoteException(
            AccessControlException.class,
            DSQuotaExceededException.class,
            FileAlreadyExistsException.class,
            FileNotFoundException.class,
            ParentNotDirectoryException.class,
            NSQuotaExceededException.class,
            RetryStartFileException.class,
            SafeModeException.class,
            UnresolvedPathException.class,
            SnapshotAccessControlException.class,
            UnknownCryptoProtocolVersionException.class);
        if (e instanceof RetryStartFileException) {
          if (retryCount > 0) {
            shouldRetry = true;
            retryCount--;
          } else {
            throw new IOException("Too many retries because of encryption" +
                " zone operations", e);
          }
        } else {
          throw e;
        }
      }
    }
    Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
    final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
        flag, progress, checksum, favoredNodes);
    out.start();
    return out;
  }

在newStreamForCreate方法中,先定義一個文件狀態變數stat,然後不停的嘗試通過namenode創建文件條目,創建成功後再創建改文件的輸出流,然後通過out.start()啟動DataQueue執行緒開始發送數據。我們重點看一下namenode是怎麼創建文件條目的。打開dfsClient.namenode.create方法,dfsClient.namenode是在dfsClient中聲明的ClientProtocol對象。ClientProtocol是客戶端協議介面,namenode端需要實現該介面的create方法,通過動態代理的方式把結果返回給客戶端,即是rpc遠程調用。那麼看下namenode端是怎麼實現這個create方法的,打開這個方法的實現類我們發現了NameNodeRpcServer這個類,這個類是實現namenode rpc機制的核心類,繼承了各種協議介面並實現。

打開NameNodeRpcServer的create方法:

@Override // ClientProtocol
  public HdfsFileStatus create(String src, FsPermission masked,
      String clientName, EnumSetWritable<CreateFlag> flag,
      boolean createParent, short replication, long blockSize, 
      CryptoProtocolVersion[] supportedVersions)
      throws IOException {
    String clientMachine = getClientMachine();
    if (stateChangeLog.isDebugEnabled()) {
      stateChangeLog.debug("*DIR* NameNode.create: file "
                         +src+" for "+clientName+" at "+clientMachine);
    }
    if (!checkPathLength(src)) {
      throw new IOException("create: Pathname too long.  Limit "
          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
    }
    //經過一系列的檢查最終調用了namesystem.startFile方法,
    HdfsFileStatus fileStatus = namesystem.startFile(src, new PermissionStatus(
        getRemoteUser().getShortUserName(), null, masked),
        clientName, clientMachine, flag.get(), createParent, replication,
        blockSize, supportedVersions);
    metrics.incrFilesCreated();
    metrics.incrCreateFileOps();
    return fileStatus;
  }

打開namesystem.startFile,namesystem是NameNodeRpcServer中聲明的FSNamesystem對象:

/**
   * Create a new file entry in the namespace.
   * 在命名空間創建一個文件條目
   * 
   * For description of parameters and exceptions thrown see
   * {@link ClientProtocol#create}, except it returns valid file status upon
   * success
   */
  HdfsFileStatus startFile(String src, PermissionStatus permissions,
      String holder, String clientMachine, EnumSet<CreateFlag> flag,
      boolean createParent, short replication, long blockSize, 
      CryptoProtocolVersion[] supportedVersions)
      throws AccessControlException, SafeModeException,
      FileAlreadyExistsException, UnresolvedLinkException,
      FileNotFoundException, ParentNotDirectoryException, IOException {
    HdfsFileStatus status = null;
    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
        null);
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return (HdfsFileStatus) cacheEntry.getPayload();
    }
 
    try {
        //調用的startFileInt
      status = startFileInt(src, permissions, holder, clientMachine, flag,
          createParent, replication, blockSize, supportedVersions,
          cacheEntry != null);
    } catch (AccessControlException e) {
      logAuditEvent(false, "create", src);
      throw e;
    } finally {
      RetryCache.setState(cacheEntry, status != null, status);
    }
    return status;
  }
最後打開startFileInt方法中,可以看到又調用了startFileInternal方法:

 try {
      checkOperation(OperationCategory.WRITE);
      checkNameNodeSafeMode("Cannot create file" + src);
      src = resolvePath(src, pathComponents);
      toRemoveBlocks = startFileInternal(pc, src, permissions, holder, 
          clientMachine, create, overwrite, createParent, replication, 
          blockSize, isLazyPersist, suite, protocolVersion, edek, logRetryCache);
      stat = dir.getFileInfo(src, false,
          FSDirectory.isReservedRawName(srcArg), true);
    } catch (StandbyException se) {
      skipSync = true;
      throw se;
打開startFileInternal:

  /**
   * Create a new file or overwrite an existing file<br>
   * 
   * Once the file is create the client then allocates a new block with the next
   * call using {@link ClientProtocol#addBlock}.
   * <p>
   * For description of parameters and exceptions thrown see
   * {@link ClientProtocol#create}
   */
  private BlocksMapUpdateInfo startFileInternal(FSPermissionChecker pc, 
      String src, PermissionStatus permissions, String holder, 
      String clientMachine, boolean create, boolean overwrite, 
      boolean createParent, short replication, long blockSize, 
      boolean isLazyPersist, CipherSuite suite, CryptoProtocolVersion version,
      EncryptedKeyVersion edek, boolean logRetryEntry)
      throws FileAlreadyExistsException, AccessControlException,
      UnresolvedLinkException, FileNotFoundException,
      ParentNotDirectoryException, RetryStartFileException, IOException {
    //檢查當前執行緒是否有寫鎖,沒有退出
      assert hasWriteLock();
    // Verify that the destination does not exist as a directory already.
      //判斷文件是否已經作為目錄存在
      //INodesInPath:包含從給定路徑解析的INode資訊。
      //獲取給定文件或目錄的inode資訊
    final INodesInPath iip = dir.getINodesInPath4Write(src);
    final INode inode = iip.getLastINode();
    if (inode != null && inode.isDirectory()) {
      throw new FileAlreadyExistsException(src +
          " already exists as a directory");
    }
//FileEncryptionInfo封裝加密文件的所有加密相關資訊
    FileEncryptionInfo feInfo = null;
    if (dir.isInAnEZ(iip)) {
      // The path is now within an EZ, but we're missing encryption parameters
      if (suite == null || edek == null) {
        throw new RetryStartFileException();
      }
      // Path is within an EZ and we have provided encryption parameters.
      // Make sure that the generated EDEK matches the settings of the EZ.
      String ezKeyName = dir.getKeyName(iip);
      if (!ezKeyName.equals(edek.getEncryptionKeyName())) {
        throw new RetryStartFileException();
      }
      feInfo = new FileEncryptionInfo(suite, version,
          edek.getEncryptedKeyVersion().getMaterial(),
          edek.getEncryptedKeyIv(),
          ezKeyName, edek.getEncryptionKeyVersionName());
      Preconditions.checkNotNull(feInfo);
    }
 
    final INodeFile myFile = INodeFile.valueOf(inode, src, true);
    if (isPermissionEnabled) {
      if (overwrite && myFile != null) {
        checkPathAccess(pc, src, FsAction.WRITE);
      }
      /*
       * To overwrite existing file, need to check 'w' permission 
       * of parent (equals to ancestor in this case)
       */
      checkAncestorAccess(pc, src, FsAction.WRITE);
    }
 
    if (!createParent) {
      verifyParentDir(src);
    }
 
    try {
      BlocksMapUpdateInfo toRemoveBlocks = null;
      if (myFile == null) {
        if (!create) {
          throw new FileNotFoundException("Can't overwrite non-existent " +
              src + " for client " + clientMachine);
        }
      } else {
        if (overwrite) {
          toRemoveBlocks = new BlocksMapUpdateInfo();
          List<INode> toRemoveINodes = new ChunkedArrayList<INode>();
          long ret = dir.delete(src, toRemoveBlocks, toRemoveINodes, now());
          if (ret >= 0) {
            incrDeletedFileCount(ret);
            removePathAndBlocks(src, null, toRemoveINodes, true);
          }
        } else {
          // If lease soft limit time is expired, recover the lease
            //如果租約軟限制時間到期,則恢復租約
          recoverLeaseInternal(myFile, src, holder, clientMachine, false);
          throw new FileAlreadyExistsException(src + " for client " +
              clientMachine + " already exists");
        }
      }
 
      checkFsObjectLimit();
      INodeFile newNode = null;
 
      // Always do an implicit mkdirs for parent directory tree.
      Path parent = new Path(src).getParent();
      if (parent != null && mkdirsRecursively(parent.toString(),
              permissions, true, now())) {
          //獲取文件的inode
        newNode = dir.addFile(src, permissions, replication, blockSize,
                              holder, clientMachine);
      }
 
      if (newNode == null) {
        throw new IOException("Unable to add " + src +  " to namespace");
      }
      leaseManager.addLease(newNode.getFileUnderConstructionFeature()
          .getClientName(), src);
 
      // Set encryption attributes if necessary
      if (feInfo != null) {
        dir.setFileEncryptionInfo(src, feInfo);
        newNode = dir.getInode(newNode.getId()).asFile();
      }
      //設置存儲策略
      setNewINodeStoragePolicy(newNode, iip, isLazyPersist);
 
      // record file record in log, record new generation stamp
      //把操作寫入到EditLog
      getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
      if (NameNode.stateChangeLog.isDebugEnabled()) {
        NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
            src + " inode " + newNode.getId() + " " + holder);
      }
      return toRemoveBlocks;
    } catch (IOException ie) {
      NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: " + src + " " +
          ie.getMessage());
      throw ie;
    }
  }

這個方法就是生成文件條目的核心方法,首先判斷檢查當前執行緒是否有寫鎖,沒有退出。FSDirectory dir是一個命名空間的記憶體樹。

fs.write(a.getBytes());

上面write方法鄭振調用的是FSOutputSummer.write,FSOutputSummer維護了一個本地緩衝區buf,大小初始為9*chunkSize,append文件時初始化方法不同。循環寫buf.length位元組數據,buf滿了就開始調用writeChecksumChunks寫packet。

@Override
  public synchronized void write(byte b[], int off, int len)
      throws IOException {
 
    checkClosed();
 
    if (off < 0 || len < 0 || off > b.length - len) {
      throw new ArrayIndexOutOfBoundsException();
    }
//循環寫buf.length位元組數據,buf滿了就開始寫packet
    for (int n=0;n<len;n+=write1(b, off+n, len-n)) {
    }
  }
 
private int write1(byte b[], int off, int len) throws IOException {
    if(count==0 && len>=buf.length) {
      // local buffer is empty and user buffer size >= local buffer size, so
      // simply checksum the user buffer and send it directly to the underlying
      // stream
      final int length = buf.length;
      writeChecksumChunks(b, off, length);
      return length;
    }
 
    // copy user data to local buffer
    int bytesToCopy = buf.length-count;
    bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
    System.arraycopy(b, off, buf, count, bytesToCopy);
    count += bytesToCopy;
    if (count == buf.length) {
      // local buffer is full
      flushBuffer();
    } 
    return bytesToCopy;
  }

創建文件時,是每次寫getBytesPerChecksum,剛好一個chunk的大小,追加文件時第一次寫文件最後一個block的最後一個chunk空的部分,這樣就可以組成一個完整的chunk,後面就按照create文件一樣每次寫chunk大小。所以每次寫的大小是根據create還是append區別的。

//創建文件時,是每次寫getBytesPerChecksum,剛好一個chunk的大小,追加文件時第一次寫文件最後一個block的最後一個chunk空的部分,這樣就可以組成一個完整的chunk,後面就按照create文件一樣每次寫chunk大小。所以每次寫的大小是根據create還是append區別的。

private void writeChecksumChunks(byte b[], int off, int len)
  throws IOException {
    sum.calculateChunkedSums(b, off, len, checksum, 0);
    for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
      int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
      int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
      writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());
    }
  }

最核心的方法是writeChunk()

 /**
   * cklen :校驗和大小
   * 寫數據到packet,每次只寫一個chunk大小的數據
   */
  @Override
  protected synchronized void writeChunk(byte[] b, int offset, int len,
      byte[] checksum, int ckoff, int cklen) throws IOException {
    dfsClient.checkOpen();//檢查DFSClient對象的狀態
    checkClosed();//檢查DFSOutputStream對象的狀態
//輸出的數據比一個校驗塊(chunk)還大
    if (len > bytesPerChecksum) {
      throw new IOException("writeChunk() buffer size is " + len +
                            " is larger than supported  bytesPerChecksum " +
                            bytesPerChecksum);
    }
    //要寫入的校驗和大小與給定的大小不一致
    if (cklen != 0 && cklen != getChecksumSize()) {
      throw new IOException("writeChunk() checksum size is supposed to be " +
                            getChecksumSize() + " but found to be " + cklen);
    }
//當前要寫入的packet為空,則新建
    if (currentPacket == null) {
      currentPacket = createPacket(packetSize, chunksPerPacket, 
          bytesCurBlock, currentSeqno++);
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
            currentPacket.seqno +
            ", src=" + src +
            ", packetSize=" + packetSize +
            ", chunksPerPacket=" + chunksPerPacket +
            ", bytesCurBlock=" + bytesCurBlock);
      }
    }
//開始寫數據,先寫校驗和checksum,再寫chunkdata
    currentPacket.writeChecksum(checksum, ckoff, cklen);
    currentPacket.writeData(b, offset, len);
    //chunk個數自增
    currentPacket.numChunks++;
    //block中的偏移量
    bytesCurBlock += len;
 
    // If packet is full, enqueue it for transmission
    //packet的chunk個數等於packet設置的最大chunk個數,則packet滿了,就開始傳輸,如果bytesCurBlock大於blockSize呢?
    //如何處理
    //已解決:通過computePacketChunkSize我們知道,
    if (currentPacket.numChunks == currentPacket.maxChunks ||
        bytesCurBlock == blockSize) {
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
            currentPacket.seqno +
            ", src=" + src +
            ", bytesCurBlock=" + bytesCurBlock +
            ", blockSize=" + blockSize +
            ", appendChunk=" + appendChunk);
      }
      //當前packet放入到隊列,等待消費
      waitAndQueueCurrentPacket();
 
      // If the reopened file did not end at chunk boundary and the above
      // write filled up its partial chunk. Tell the summer to generate full 
      // crc chunks from now on.
      //如果重新打開的文件沒有在chunk塊邊界結束,並且上面的寫入填滿了它的部分塊。 告訴夏天從現在開始生成完整的crc塊。
      //block中剛好存儲整數個完整的chunk塊,如果分配的block中已經存在數據
      //通過對文件進行追加操作,然後逐步調試,終於明白了appendChunk的含義,在對已經存在的文件進行append操作時,會構建DFSOutputStream對象,而這個對象的初始化和新建
      //文件時的方法是不同的。append操作的對象初始化會從namenode把文件最後一個block(block存在一個list中)的資訊拿到,然後把這個block的資訊初始化給DFSOutputStream
      //本地緩衝區buf就是blockSize-bytesCurBlock,且當前packet的chunksize=blockSize-bytesCurBlock
      //如果是追加數據,且追加後構成一個完整的chunk塊,那麼就需要把之前指定的buf重置成正常值
      if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
        appendChunk = false;
        // 為何這個操作?buf置空
        //使用指定大小的新緩衝區重置現有緩衝區。
        resetChecksumBufSize();
      }
 
      if (!appendChunk) {
          //計算下一個packet的大小,保證寫入時不會超過blocksize
        /*  就是說,在new每個新的Packet之前,都會重新計算一下新的Packet的大小,
          以保證新的Packet大小不會超過Block的剩餘大小
          如果block還有不到一個Packet的大小(比如還剩3kb的空間),則最後一個Packet的大小就是:
          blockSize-bytesCurBlock,也就是3kb*/
 
        int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize);
        computePacketChunkSize(psize, bytesPerChecksum);
      }
      //
      // if encountering a block boundary, send an empty packet to 
      // indicate the end of block and reset bytesCurBlock.
      //
      //如果block滿了,發送空包,重置變數
      if (bytesCurBlock == blockSize) {
        currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
        currentPacket.lastPacketInBlock = true;
        currentPacket.syncBlock = shouldSyncBlock;
        waitAndQueueCurrentPacket();
        bytesCurBlock = 0;
        lastFlushOffset = 0;
      }
    }
  }

如果重新打開的文件沒有在chunk塊邊界結束,並且上面的寫入填滿了它的部分塊。 告訴夏天從現在開始生成完整的crc塊。block中剛好存儲整數個完整的chunk塊,如果分配的block中已經存在數據。通過對文件進行追加操作,然後逐步調試,終於明白了appendChunk的含義,在對已經存在的文件進行append操作時,會構建DFSOutputStream對象,而這個對象的初始化和新建文件時的方法是不同的。append操作的對象初始化會從namenode把文件最後一個block(block存在一個list中)的資訊拿到,然後把這個block的資訊初始化給DFSOutputStream。本地緩衝區buf就是blockSize-bytesCurBlock,且當前packet的chunksize=blockSize-bytesCurBlock。如果是追加數據,且追加後構成一個完整的chunk塊,那麼就需要把之前指定的buf重置成正常值。

 

點擊關注,第一時間了解華為雲新鮮技術~