
Hadoop Source Code: HDFS Startup Flow (2) - DataNode

After the start-dfs.sh script is executed, how does the cluster start up?
This article reads and annotates the start-dfs.sh script, as well as the source code of the main DataNode startup flow.

DataNode Startup Flow

Script Code Analysis

The code in start-dfs.sh that starts the datanodes:

#---------------------------------------------------------
# datanodes (using default workers file)

echo "Starting datanodes"
hadoop_uservar_su hdfs datanode "${HADOOP_HDFS_HOME}/bin/hdfs" \
    --workers \
    --config "${HADOOP_CONF_DIR}" \
    --daemon start \
    datanode ${dataStartOpt}
(( HADOOP_JUMBO_RETCOUNTER=HADOOP_JUMBO_RETCOUNTER + $? ))

Looking at the datanode command in hadoop-hdfs > src > main > bin > hdfs:

# Command description: run a DFS datanode
  hadoop_add_subcommand "datanode" daemon "run a DFS datanode"
  
# Command handler
    datanode)
      HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
      HADOOP_SECURE_CLASSNAME="org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter"
      HADOOP_CLASSNAME='org.apache.hadoop.hdfs.server.datanode.DataNode'
      hadoop_deprecate_envvar HADOOP_SECURE_DN_PID_DIR HADOOP_SECURE_PID_DIR
      hadoop_deprecate_envvar HADOOP_SECURE_DN_LOG_DIR HADOOP_SECURE_LOG_DIR
    ;;

This locates the concrete handler classes: org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter (secure mode) and org.apache.hadoop.hdfs.server.datanode.DataNode.

Following the script into the hadoop_generic_java_subcmd_handler function in hadoop-functions.sh, we find the following code:

  # do the hard work of launching a daemon or just executing our interactive
  # java class
  if [[ "${HADOOP_SUBCMD_SUPPORTDAEMONIZATION}" = true ]]; then
    if [[ "${HADOOP_SUBCMD_SECURESERVICE}" = true ]]; then
      hadoop_secure_daemon_handler \
        "${HADOOP_DAEMON_MODE}" \
        "${HADOOP_SUBCMD}" \
        "${HADOOP_SECURE_CLASSNAME}" \
        "${daemon_pidfile}" \
        "${daemon_outfile}" \
        "${priv_pidfile}" \
        "${priv_outfile}" \
        "${priv_errfile}" \
        "${HADOOP_SUBCMD_ARGS[@]}"
    else
      hadoop_daemon_handler \
        "${HADOOP_DAEMON_MODE}" \
        "${HADOOP_SUBCMD}" \
        "${HADOOP_CLASSNAME}" \
        "${daemon_pidfile}" \
        "${daemon_outfile}" \
        "${HADOOP_SUBCMD_ARGS[@]}"
    fi
    exit $?
  else
    hadoop_java_exec "${HADOOP_SUBCMD}" "${HADOOP_CLASSNAME}" "${HADOOP_SUBCMD_ARGS[@]}"
  fi

Here we need to work out whether execution ultimately goes through hadoop_secure_daemon_handler or hadoop_daemon_handler.

The secure startup path is taken only when both HADOOP_SUBCMD_SUPPORTDAEMONIZATION = true and HADOOP_SUBCMD_SECURESERVICE = true are satisfied.

HADOOP_SUBCMD_SUPPORTDAEMONIZATION is assigned in the datanode command handler:

# In the hdfs script
datanode)
      HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
         HADOOP_SECURE_CLASSNAME="org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter"
# ......  
    ;;

HADOOP_SUBCMD_SECURESERVICE has the following default value defined in the hadoop-functions.sh script:

  HADOOP_SUBCMD_SECURESERVICE=false

In the function hadoop_generic_java_subcmd_handler (the function our script goes through), a conditional decides whether it gets set to true:

## @description Handle subcommands from main program entries
## @audience private
## @stability evolving
## @replaceable yes
function hadoop_generic_java_subcmd_handler
{
# ......

  # The default/expected way to determine if a daemon is going to run in secure
  # mode is defined by hadoop_detect_priv_subcmd.  If this returns true
  # then setup the secure user var and tell the world we're in secure mode

  if hadoop_detect_priv_subcmd "${HADOOP_SHELL_EXECNAME}" "${HADOOP_SUBCMD}"; then
    HADOOP_SUBCMD_SECURESERVICE=true
# ......

Stepping into the hadoop_detect_priv_subcmd function:

## @description autodetect whether this is a priv subcmd
## @description by whether or not a priv user var exists
## @description and if HADOOP_SECURE_CLASSNAME is defined
## @audience     public
## @stability    stable
## @replaceable  yes
## @param        command
## @param        subcommand
## @return       1 = not priv
## @return       0 = priv
function hadoop_detect_priv_subcmd
{
  declare program=$1
  declare command=$2
  # 
  if [[ -z "${HADOOP_SECURE_CLASSNAME}" ]]; then
    hadoop_debug "No secure classname defined."
    return 1
  fi

  uvar=$(hadoop_build_custom_subcmd_var "${program}" "${command}" SECURE_USER)
  if [[ -z "${!uvar}" ]]; then
    hadoop_debug "No secure user defined."
    return 1
  fi
  return 0
}

We can see that it returns 0 only when HADOOP_SECURE_CLASSNAME is defined and the secure-user variable built from the two arguments HADOOP_SHELL_EXECNAME and HADOOP_SUBCMD (here HDFS_DATANODE_SECURE_USER) is set. (With the shell pattern if function; then ..., the then branch runs when the function returns 0.)

Like HADOOP_SUBCMD_SUPPORTDAEMONIZATION, the HADOOP_SECURE_CLASSNAME parameter is assigned in the datanode command handler of the hdfs script.

The HADOOP_SHELL_EXECNAME parameter has a default value defined in the hdfs script:

# The name of the script being executed.
HADOOP_SHELL_EXECNAME="hdfs"

The HADOOP_SUBCMD parameter is set in the hdfs script as HADOOP_SUBCMD=$1, i.e. the first argument left over after the generic options have been parsed. Going back to start-dfs.sh, the full invocation is:

#---------------------------------------------------------
# datanodes (using default workers file)

echo "Starting datanodes"
hadoop_uservar_su hdfs datanode "${HADOOP_HDFS_HOME}/bin/hdfs" \
    --workers \
    --config "${HADOOP_CONF_DIR}" \
    --daemon start \
    datanode ${dataStartOpt}
(( HADOOP_JUMBO_RETCOUNTER=HADOOP_JUMBO_RETCOUNTER + $? ))

After the generic options (--workers, --config ... and --daemon start) are consumed by the option parsing, the first remaining argument is datanode, so HADOOP_SUBCMD ends up as datanode.

Putting this together: when HDFS_DATANODE_SECURE_USER is configured (i.e. on a secure cluster), running start-dfs.sh goes through hadoop_secure_daemon_handler and the datanode is started in secure mode via the SecureDataNodeStarter class; otherwise hadoop_daemon_handler launches the DataNode class directly.
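
For reference, the decision made by hadoop_detect_priv_subcmd can be mirrored in a few lines of Java. This is only an illustration of the shell logic above (the environment-variable name is the one the script would build for hdfs datanode), not Hadoop code:

  // Sketch of hadoop_detect_priv_subcmd: the secure path is chosen only when a
  // secure classname is configured AND <PROGRAM>_<SUBCMD>_SECURE_USER is set.
  static boolean isPrivilegedSubcommand(String secureClassname,
                                        String program, String subcommand) {
    if (secureClassname == null || secureClassname.isEmpty()) {
      return false;                               // "No secure classname defined."
    }
    String uvar = (program + "_" + subcommand + "_SECURE_USER").toUpperCase();
    String secureUser = System.getenv(uvar);      // e.g. HDFS_DATANODE_SECURE_USER
    return secureUser != null && !secureUser.isEmpty();
  }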

SecureDataNodeStarter

The official Javadoc describes it as follows:

A utility class to start a DataNode in a secure cluster; it first obtains privileged resources before the main startup and hands them to the DataNode.

SecureDataNodeStarter implements the Daemon interface. Since it runs as a daemon, let's first look at the methods it implements from Daemon:

  @Override
  public void init(DaemonContext context) throws Exception {
    System.err.println("Initializing secure datanode resources");
    // Create a new HdfsConfiguration object to make sure the settings in hdfs-site.xml are picked up.
    Configuration conf = new HdfsConfiguration();
    
    // Store the regular datanode command-line arguments
    args = context.getArguments();
    // Acquire the privileged resources (i.e. privileged ports) for the data node.
    resources = getSecureResources(conf);
  }

 @Override
  public void start() throws Exception {
    System.err.println("Starting regular datanode initialization");
    // Perform the normal DataNode initialization
    DataNode.secureMain(args, resources);
  }

  @Override public void destroy() {}
  @Override public void stop() throws Exception { /* Nothing to do */ }

Static fields

As we can see, the main job of SecureDataNodeStarter is to obtain the configuration and privileged resources, store them, and then pass them along as parameters when the DataNode is initialized normally. Next, let's see which fields are initialized besides the command-line arguments:

  // Command-line arguments
  private String [] args;
  private SecureResources resources;

  // Holds the resources needed for datanode operation in a secure environment
  public static class SecureResources {
    // Whether SASL is enabled
    private final boolean isSaslEnabled;
    // Whether the RPC port is a privileged port (below 1024; ordinary users may not run servers on it)
    // See //www.w3.org/Daemon/User/Installation/PrivilegedPorts.html
    private final boolean isRpcPortPrivileged;
    // Whether the HTTP port is a privileged port
    private final boolean isHttpPortPrivileged;

    // Server socket listening on the port configured by dfs.datanode.address
    private final ServerSocket streamingSocket;
    // Server socket channel listening on the port configured by dfs.datanode.http.address
    private final ServerSocketChannel httpServerSocket;

    public SecureResources(ServerSocket streamingSocket, ServerSocketChannel
        httpServerSocket, boolean saslEnabled, boolean rpcPortPrivileged,
        boolean httpPortPrivileged) {
      this.streamingSocket = streamingSocket;
      this.httpServerSocket = httpServerSocket;
      this.isSaslEnabled = saslEnabled;
      this.isRpcPortPrivileged = rpcPortPrivileged;
      this.isHttpPortPrivileged = httpPortPrivileged;
    }

    // getters / setters ... omitted
  }

getSecureResources(conf)

Next, let's look at getSecureResources(conf), the method called in init(), to see where the fields of SecureResources come from.

  // Acquire the privileged resources (i.e. privileged ports) of the data node.
  // The privileged resources consist of the port of the RPC server and the port of the HTTP (not HTTPS) server.
  @VisibleForTesting
  public static SecureResources getSecureResources(Configuration conf)
      throws Exception {
    // Get the HTTP access policy: HTTP_ONLY, HTTPS_ONLY or HTTP_AND_HTTPS
    HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf);
    // Try to build a SaslPropertiesResolver; if one can be built, SASL is considered enabled
    boolean isSaslEnabled =
        DataTransferSaslUtil.getSaslPropertiesResolver(conf) != null;
    boolean isRpcPrivileged;
    boolean isHttpPrivileged = false;

    System.err.println("isSaslEnabled:" + isSaslEnabled);
    // Get the secure port for data streaming to the datanode and create the IP socket address.
    // It is built from the dfs.datanode.address setting; default: 0.0.0.0:9866
    InetSocketAddress streamingAddr  = DataNode.getStreamingAddr(conf);
    // Get the socket write timeout.
    // Setting: dfs.datanode.socket.write.timeout, default: 8 * 60 seconds
    int socketWriteTimeout = conf.getInt(
        DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
        HdfsConstants.WRITE_TIMEOUT);
    // Get the maximum length of the queue of incoming connection requests.
    // Setting: ipc.server.listen.queue.size, default: 256
    int backlogLength = conf.getInt(
        CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
        CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
    // By default, open a ServerSocketChannel to listen on the datanode port
    ServerSocket ss = (socketWriteTimeout > 0) ?
        ServerSocketChannel.open().socket() : new ServerSocket();
    try {
      // Bind the port and set the maximum length of the incoming connection queue
      ss.bind(streamingAddr, backlogLength);
    } catch (BindException e) {
      BindException newBe = appendMessageToBindException(e,
          streamingAddr.toString());
      throw newBe;
    }

    // Check that the socket is bound to the expected port
    if (ss.getLocalPort() != streamingAddr.getPort()) {
      throw new RuntimeException(
          "Unable to bind on specified streaming port in secure "
              + "context. Needed " + streamingAddr.getPort() + ", got "
              + ss.getLocalPort());
    }

    // Check whether the given port is a privileged port.
    // On unix/linux systems, ports below 1024 are treated as privileged ports.
    // Use this method with care on other operating systems;
    // for example, Windows has no concept of privileged ports.
    // However, it can be used on a Windows client to check the port of a Linux server.
    isRpcPrivileged = SecurityUtil.isPrivilegedPort(ss.getLocalPort());
    System.err.println("Opened streaming server at " + streamingAddr);

    // Bind a port for the web server.
    // The code intends to bind the HTTP server only to a privileged port, because if the server communicates over SSL the client can authenticate it with its certificate.
    final ServerSocketChannel httpChannel;
    // Check whether HTTP access is enabled
    if (policy.isHttpEnabled()) {
      httpChannel = ServerSocketChannel.open();
      // Determine the effective address of the HTTP server.
      // It is built from the dfs.datanode.http.address setting; default: 0.0.0.0:9864
      InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);
      try {
        httpChannel.socket().bind(infoSocAddr);
      } catch (BindException e) {
        BindException newBe = appendMessageToBindException(e,
            infoSocAddr.toString());
        throw newBe;
      }
      InetSocketAddress localAddr = (InetSocketAddress) httpChannel.socket()
        .getLocalSocketAddress();
      // Verify that httpChannel is bound to the expected address
      if (localAddr.getPort() != infoSocAddr.getPort()) {
        throw new RuntimeException("Unable to bind on specified info port in " +
            "secure context. Needed " + infoSocAddr.getPort() + ", got " +
             ss.getLocalPort());
      }
      System.err.println("Successfully obtained privileged resources (streaming port = "
          + ss + " ) (http listener port = " + localAddr.getPort() +")");
      // Check whether the port number is privileged (below 1024)
      isHttpPrivileged = SecurityUtil.isPrivilegedPort(localAddr.getPort());
      System.err.println("Opened info server at " + infoSocAddr);
    } else {
      httpChannel = null;
    }
    // Wrap the acquired privileged resources in a SecureResources object
    return new SecureResources(ss, httpChannel, isSaslEnabled,
        isRpcPrivileged, isHttpPrivileged);
  }
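
As a side note, the two listen addresses bound above come straight from configuration. Here is a minimal sketch of reading them directly (defaults as noted in the comments above; assumes the usual Hadoop imports on the classpath):

  // Illustration only: DataNode.getStreamingAddr / getInfoAddr resolve these keys.
  Configuration conf = new HdfsConfiguration();
  InetSocketAddress streamingAddr = NetUtils.createSocketAddr(
      conf.get("dfs.datanode.address", "0.0.0.0:9866"));
  InetSocketAddress infoAddr = NetUtils.createSocketAddr(
      conf.get("dfs.datanode.http.address", "0.0.0.0:9864"));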

At this point, the init() method of SecureDataNodeStarter is complete.

Moving on to the start() method: it simply performs the normal DataNode startup, passing in the arguments and resources prepared in init().

  @Override
  public void start() throws Exception {
    System.err.println("Starting regular datanode initialization");
    DataNode.secureMain(args, resources);
  }

The concrete role of the resources parameter inside the DataNode is covered in the DataNode code analysis below.

DataNode

The official DataNode Javadoc reads as follows:

DataNode is a class (and program) that stores a set of blocks for a DFS deployment.
A single deployment can have one or many DataNodes.
Each DataNode communicates regularly with a single NameNode.
It also communicates with client code and other DataNodes from time to time.
DataNodes store a series of named blocks.
The DataNode allows client code to read these blocks, or to write new block data.
The DataNode may also, in response to instructions from its NameNode, delete blocks or copy blocks from other DataNodes.
The DataNode maintains just one critical table: block -> stream of block data. This information is stored on the local disk.
The DataNode reports the contents of this table to the NameNode on startup and periodically thereafter.
DataNodes spend their lives in an endless loop of asking the NameNode for something to do.
A NameNode cannot connect to a DataNode directly; the NameNode simply returns values from functions invoked by the DataNode.
DataNodes maintain an open server socket so that client code or other DataNodes can read and write data.
The host/port of this server is reported to the NameNode, which then sends that information to clients or other DataNodes that might be interested.

Static code block

The static block in DataNode is the same as the one in NameNode; it loads the default configuration files:

  static{
    HdfsConfiguration.init();
  }
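
A quick way to see the effect of this static initialization (a minimal sketch, assuming hadoop-hdfs and its default resource files are on the classpath):

  // Constructing an HdfsConfiguration also triggers HdfsConfiguration.init(),
  // so hdfs-default.xml / hdfs-site.xml are registered as configuration resources.
  Configuration conf = new HdfsConfiguration();
  // Prints the default 0.0.0.0:9866 unless overridden in hdfs-site.xml.
  System.out.println(conf.get("dfs.datanode.address"));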

main method

As seen above in SecureDataNodeStarter#start, DataNode#secureMain is called to start the datanode. The regular main method also delegates to DataNode#secureMain. Let's look at the code of main and secureMain:

  public static void main(String args[]) {
    // Parse the incoming arguments and check whether a help argument was passed
    if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
      System.exit(0);
    }
    // Delegate to secureMain
    secureMain(args, null);
  }
  public static void secureMain(String args[], SecureResources resources) {
    int errorCode = 0;
    try {
      // Print startup log information
      StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
      // Create the datanode
      DataNode datanode = createDataNode(args, null, resources);
      if (datanode != null) {
        // Join the worker threads and wait for them to finish:
        // blockPoolManager.joinAll() -> BPOfferService#join -> BPServiceActor#join
        // BPServiceActor: a thread per active or standby namenode that performs:
        // the pre-registration handshake with the namenode, then registration, periodic heartbeats, and processing of commands received from the namenode
        datanode.join();
      } else {
        errorCode = 1;
      }
    } catch (Throwable e) {
      LOG.error("Exception in secureMain", e);
      terminate(1, e);
    } finally {
      // We need to terminate the process here because either shutdown was called
      // or some disk related conditions like volumes tolerated or volumes required
      // condition was not met. Also, In secure mode, control will go to Jsvc
      // and Datanode process hangs if it does not exit.
      LOG.warn("Exiting Datanode");
      terminate(errorCode);
    }
  }

DataNode#createDataNode

Instantiate & start a datanode daemon and wait for it to finish.

  @VisibleForTesting
  @InterfaceAudience.Private
  public static DataNode createDataNode(String args[], Configuration conf,
      SecureResources resources) throws IOException {
    // Instantiate the datanode
    DataNode dn = instantiateDataNode(args, conf, resources);
    if (dn != null) {
      // Start the datanode daemon threads
      dn.runDatanodeDaemon();
    }
    return dn;
  }

Let's first look at the datanode instantiation flow:

DataNode#instantiateDataNode

// Instantiate a single datanode object along with its secure resources. It must then be started by a subsequent call to runDatanodeDaemon().
public static DataNode instantiateDataNode(String args [], Configuration conf,
    SecureResources resources) throws IOException {
  if (conf == null)
    conf = new HdfsConfiguration();
  
  if (args != null) {
    // Parse the generic Hadoop options
    GenericOptionsParser hParser = new GenericOptionsParser(conf, args);
    args = hParser.getRemainingArgs();
  }
  // Parse and verify the command-line arguments and set configuration parameters.
  if (!parseArguments(args, conf)) {
    printUsage(System.err);
    return null;
  }
  // Get the actual set of storage locations from the dfs.datanode.data.dir setting.
  // StorageLocation: encapsulates the URI describing a storage directory and its storage type; if no storage type is specified, DISK is assumed.
  // For a detailed walkthrough of how the storage directories are resolved, see: //blog.csdn.net/Androidlushangderen/article/details/51105876
  Collection<StorageLocation> dataLocations = getStorageLocations(conf);
  // UserGroupInformation: user and group information for Hadoop.
  // It wraps a JAAS Subject and provides methods to determine the user's username and groups.
  // It supports Windows, Unix and Kerberos login modules.
  // UserGroupInformation#setConfiguration: sets the static configuration for UGI; in particular the security authentication mechanism and the group lookup service.
  UserGroupInformation.setConfiguration(conf);
  // Log in as the principal specified in the config, replacing $host in the user's Kerberos principal name with the hostname. In non-secure mode this simply returns.
  SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
      DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, getHostName(conf));
  // Create the DataNode instance
  return makeInstance(dataLocations, conf, resources);
}
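
To make the argument handling above concrete, here is a small hypothetical sketch of how GenericOptionsParser separates generic Hadoop options from the arguments left over for parseArguments (the option values are made up for illustration):

  // -D options are applied to conf; whatever remains is handed to parseArguments().
  Configuration conf = new HdfsConfiguration();
  String[] args = {"-D", "dfs.datanode.data.dir=/data/dn1,/data/dn2", "-regular"};
  GenericOptionsParser hParser = new GenericOptionsParser(conf, args);
  String[] remaining = hParser.getRemainingArgs();        // ["-regular"]
  System.out.println(conf.get("dfs.datanode.data.dir"));  // /data/dn1,/data/dn2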

DataNode#makeInstance

// Create the DataNode instance after making sure that at least one of the given data directories (and their parent directories, if necessary) can be created.
static DataNode makeInstance(Collection<StorageLocation> dataDirs,
    Configuration conf, SecureResources resources) throws IOException {
  List<StorageLocation> locations;
  // StorageLocationChecker: a utility class that encapsulates storage-location checking during DataNode startup. Some of this code was extracted from the DataNode class.
  StorageLocationChecker storageLocationChecker =
      new StorageLocationChecker(conf, new Timer());
  try {
    // Start checks against the provided storage volumes and return the list of healthy ones.
    // For compatibility with existing unit tests, the StorageLocations are returned in the same order as the input.
    locations = storageLocationChecker.check(conf, dataDirs);
  } catch (InterruptedException ie) {
    throw new IOException("Failed to instantiate DataNode", ie);
  }
  // Initialize the metrics system
  DefaultMetricsSystem.initialize("DataNode");
  // There must be at least one usable data directory
  assert locations.size() > 0 : "number of data directories should be > 0";
  // Create the DataNode
  return new DataNode(conf, locations, storageLocationChecker, resources);
}

StorageLocationChecker#check

Let's look at what exactly is checked:

  // Start checks against the provided storage volumes and return the list of healthy ones.
  // For compatibility with existing unit tests, the StorageLocations are returned in the same order as the input.
  // Returns the list of healthy volumes; if there are none, an empty list is returned.
  public List<StorageLocation> check(
      final Configuration conf,
      final Collection<StorageLocation> dataDirs)
      throws InterruptedException, IOException {

    final HashMap<StorageLocation, Boolean> goodLocations =
        new LinkedHashMap<>();
    final Set<StorageLocation> failedLocations = new HashSet<>();
    final Map<StorageLocation, ListenableFuture<VolumeCheckResult>> futures =
        Maps.newHashMap();
    // Get the local file system; create one if it does not exist yet
    final LocalFileSystem localFS = FileSystem.getLocal(conf);
    final CheckContext context = new CheckContext(localFS, expectedPermission);

    // Kick off parallel disk check operations on all StorageLocations.
    for (StorageLocation location : dataDirs) {
      goodLocations.put(location, true);
      // Schedule an asynchronous check for the given Checkable. Returns a ListenableFuture if the check was scheduled successfully.
      Optional<ListenableFuture<VolumeCheckResult>> olf =
          delegateChecker.schedule(location, context);
      if (olf.isPresent()) {
        futures.put(location, olf.get());
      }
    }

    if (maxVolumeFailuresTolerated >= dataDirs.size()) {
      throw new HadoopIllegalArgumentException("Invalid value configured for "
          + DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY + " - "
          + maxVolumeFailuresTolerated + ". Value configured is >= "
          + "to the number of configured volumes (" + dataDirs.size() + ").");
    }

    final long checkStartTimeMs = timer.monotonicNow();

    // Retrieve the results of the disk checks, i.e. whether each volume is healthy.
    for (Map.Entry<StorageLocation,
             ListenableFuture<VolumeCheckResult>> entry : futures.entrySet()) {

      // Determine how much time we can allow for this check to complete.
      // The cumulative wait time cannot exceed maxAllowedTimeForCheck.
      final long waitSoFarMs = (timer.monotonicNow() - checkStartTimeMs);
      final long timeLeftMs = Math.max(0,
          maxAllowedTimeForCheckMs - waitSoFarMs);
      final StorageLocation location = entry.getKey();

      try {
        final VolumeCheckResult result =
            entry.getValue().get(timeLeftMs, TimeUnit.MILLISECONDS);
        switch (result) {
        case HEALTHY:
          break;
        case DEGRADED:
          LOG.warn("StorageLocation {} appears to be degraded.", location);
          break;
        case FAILED:
          LOG.warn("StorageLocation {} detected as failed.", location);
          failedLocations.add(location);
          goodLocations.remove(location);
          break;
        default:
          LOG.error("Unexpected health check result {} for StorageLocation {}",
              result, location);
        }
      } catch (ExecutionException|TimeoutException e) {
        LOG.warn("Exception checking StorageLocation " + location,
            e.getCause());
        failedLocations.add(location);
        goodLocations.remove(location);
      }
    }

    if (maxVolumeFailuresTolerated == DataNode.MAX_VOLUME_FAILURE_TOLERATED_LIMIT) {
      if (dataDirs.size() == failedLocations.size()) {
        throw new DiskErrorException("Too many failed volumes - "
            + "current valid volumes: " + goodLocations.size()
            + ", volumes configured: " + dataDirs.size()
            + ", volumes failed: " + failedLocations.size()
            + ", volume failures tolerated: " + maxVolumeFailuresTolerated);
      }
    } else {
      if (failedLocations.size() > maxVolumeFailuresTolerated) {
        throw new DiskErrorException("Too many failed volumes - "
            + "current valid volumes: " + goodLocations.size()
            + ", volumes configured: " + dataDirs.size()
            + ", volumes failed: " + failedLocations.size()
            + ", volume failures tolerated: " + maxVolumeFailuresTolerated);
      }
    }

    if (goodLocations.size() == 0) {
      throw new DiskErrorException("All directories in "
          + DFS_DATANODE_DATA_DIR_KEY + " are invalid: "
          + failedLocations);
    }

    return new ArrayList<>(goodLocations.keySet());
  }
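
One detail worth spelling out from the two branches above: my reading of this code is that DataNode.MAX_VOLUME_FAILURE_TOLERATED_LIMIT is the special value -1 of dfs.datanode.failed.volumes.tolerated, meaning any number of volume failures is tolerated as long as at least one volume stays healthy. A hedged sketch of that decision:

  // Sketch of the tolerance check in check() above; assumes the special limit is -1.
  static boolean tooManyFailedVolumes(int volumesConfigured, int volumesFailed,
                                      int volumeFailuresTolerated) {
    if (volumeFailuresTolerated == -1) {            // MAX_VOLUME_FAILURE_TOLERATED_LIMIT
      return volumesFailed == volumesConfigured;    // fail only when every volume failed
    }
    return volumesFailed > volumeFailuresTolerated; // otherwise honor the configured limit
  }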

DataNode constructor

// Create the DataNode given a configuration, an array of dataDirs, and a namenode proxy.
  DataNode(final Configuration conf,
           final List<StorageLocation> dataDirs,
           final StorageLocationChecker storageLocationChecker,
           final SecureResources resources) throws IOException {
    // Pass the configuration up to the parent class
    super(conf);
    // Initialize the Tracer; compared with the same spot in NameNode, only the arguments differ
    this.tracer = createTracer(conf);
    // TracerConfigurationManager provides functions to manage the tracer configuration at runtime over the RPC protocol.
    this.tracerConfigurationManager =
        new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf);
    // FileIoProvider abstracts the various file I/O operations performed by the DataNode
    // and invokes profiling (for gathering statistics) and fault-injection (for testing) event hooks before and after each file I/O.
    // Behavior can be injected into these events by enabling the profiling and/or fault-injection hooks via DFSConfigKeys.
    this.fileIoProvider = new FileIoProvider(conf, this);
    // Initialize volume scanning; the BlockScanner manages all VolumeScanners
    this.blockScanner = new BlockScanner(this);
    // Initialize various configuration parameters
    this.lastDiskErrorCheck = 0;
    this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
        DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);

    this.usersWithLocalPathAccess = Arrays.asList(
        conf.getTrimmedStrings(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY));
    this.connectToDnViaHostname = conf.getBoolean(
        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
    this.supergroup = conf.get(DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
        DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
    this.isPermissionEnabled = conf.getBoolean(
        DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
        DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
    this.pipelineSupportECN = conf.getBoolean(
        DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED,
        DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED_DEFAULT);

    confVersion = "core-" +
        conf.get("hadoop.common.configuration.version", "UNSPECIFIED") +
        ",hdfs-" +
        conf.get("hadoop.hdfs.configuration.version", "UNSPECIFIED");
    // DatasetVolumeChecker: encapsulates running disk checks against each volume of an FsDatasetSpi and allows retrieving the list of failed volumes.
    // This separates out behavior that was originally implemented across DataNode, FsDatasetImpl and FsVolumeList.
    this.volumeChecker = new DatasetVolumeChecker(conf, new Timer());
    // Create an ExecutorService used to run DataTransfer tasks.
    // HadoopExecutors: factory methods for ExecutorService / ScheduledExecutorService instances that provide extra functionality (e.g. logging uncaught exceptions).
    // DataTransfer: an inner class of DataNode that transfers a single block, sending the data to another DataNode.
    this.xferService =
        HadoopExecutors.newCachedThreadPool(new Daemon.DaemonFactory());

    // Determine whether we should try to pass file descriptors to clients.
    if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,
              HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT)) {
      String reason = DomainSocket.getLoadingFailureReason();
      if (reason != null) {
        LOG.warn("File descriptor passing is disabled because {}", reason);
        this.fileDescriptorPassingDisabledReason = reason;
      } else {
        LOG.info("File descriptor passing is enabled.");
        this.fileDescriptorPassingDisabledReason = null;
      }
    } else {
      this.fileDescriptorPassingDisabledReason =
          "File descriptor passing was not configured.";
      LOG.debug(this.fileDescriptorPassingDisabledReason);
    }
    // Get the socket factory; the setting is hadoop.rpc.socket.factory.class.default,
    // default: org.apache.hadoop.net.StandardSocketFactory
    this.socketFactory = NetUtils.getDefaultSocketFactory(conf);

    try {
      // Get this datanode's hostname
      hostName = getHostName(conf);
      LOG.info("Configured hostname is {}", hostName);
      // Start the datanode
      startDataNode(dataDirs, resources);
    } catch (IOException ie) {
      shutdown();
      throw ie;
    }
    final int dncCacheMaxSize =
        conf.getInt(DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY,
            DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT) ;
    datanodeNetworkCounts =
        CacheBuilder.newBuilder()
            .maximumSize(dncCacheMaxSize)
            .build(new CacheLoader<String, Map<String, Long>>() {
              @Override
              public Map<String, Long> load(String key) throws Exception {
                final Map<String, Long> ret = new HashMap<String, Long>();
                ret.put("networkErrors", 0L);
                return ret;
              }
            });

    initOOBTimeout();
    this.storageLocationChecker = storageLocationChecker;
  }
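
The datanodeNetworkCounts field initialized above is a Guava LoadingCache. Below is a small standalone sketch of the same pattern (assuming Guava and java.util imports), counting network errors per remote host with a bounded number of entries:

  // Entries are created lazily by the CacheLoader on first access and evicted
  // once maximumSize is exceeded.
  LoadingCache<String, Map<String, Long>> networkCounts =
      CacheBuilder.newBuilder()
          .maximumSize(64)
          .build(new CacheLoader<String, Map<String, Long>>() {
            @Override
            public Map<String, Long> load(String host) {
              Map<String, Long> ret = new HashMap<>();
              ret.put("networkErrors", 0L);
              return ret;
            }
          });
  // Usage: bump the counter for a peer when an I/O error is observed.
  networkCounts.getUnchecked("dn-host-1").merge("networkErrors", 1L, Long::sum);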

DataNode#startDataNode

// This method starts the data node with the specified conf. If conf's simulation property is set, a simulated storage-based data node is created.
void startDataNode(List<StorageLocation> dataDirectories,
                   SecureResources resources
                   ) throws IOException {

  // settings global for all BPs in the Data Node
  this.secureResources = resources;
  synchronized (this) {
    this.dataDirs = dataDirectories;
  }
  // DNConf: a simple class that encapsulates all the configuration the DataNode loads at startup.
  this.dnConf = new DNConf(this);
  // Check the configuration for secure mode
  checkSecureConfig(dnConf, getConf(), resources);
  // Check that the maximum amount of memory the DataNode may use for caching is within a sane range
  if (dnConf.maxLockedMemory > 0) {
    if (!NativeIO.POSIX.getCacheManipulator().verifyCanMlock()) {
      throw new RuntimeException(String.format(
          "Cannot start datanode because the configured max locked memory" +
          " size (%s) is greater than zero and native code is not available.",
          DFS_DATANODE_MAX_LOCKED_MEMORY_KEY));
    }
    if (Path.WINDOWS) {
      NativeIO.Windows.extendWorkingSetSize(dnConf.maxLockedMemory);
    } else {
      long ulimit = NativeIO.POSIX.getCacheManipulator().getMemlockLimit();
      if (dnConf.maxLockedMemory > ulimit) {
        throw new RuntimeException(String.format(
          "Cannot start datanode because the configured max locked memory" +
          " size (%s) of %d bytes is more than the datanode's available" +
          " RLIMIT_MEMLOCK ulimit of %d bytes.",
          DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
          dnConf.maxLockedMemory,
          ulimit));
      }
    }
  }
  LOG.info("Starting DataNode with maxLockedMemory = {}",
      dnConf.maxLockedMemory);

  int volFailuresTolerated = dnConf.getVolFailuresTolerated();
  int volsConfigured = dnConf.getVolsConfigured();
  if (volFailuresTolerated < MAX_VOLUME_FAILURE_TOLERATED_LIMIT
      || volFailuresTolerated >= volsConfigured) {
    throw new HadoopIllegalArgumentException("Invalid value configured for "
        + "dfs.datanode.failed.volumes.tolerated - " + volFailuresTolerated
        + ". Value configured is either less than -1 or >= "
        + "to the number of configured volumes (" + volsConfigured + ").");
  }
  // Initialize DataStorage: the data storage information.
  // The local storage information is kept in a separate file named VERSION.
  // It contains the node type, storage layout version, namespace id, and fs state creation time.
  // The local storage can live in multiple directories; each of them should contain the same VERSION file as the others.
  // During startup the Hadoop servers (name-node and data-nodes) read their local storage information from them.
  // At runtime the servers hold a lock on each storage directory, so that other nodes cannot share the same storage at startup.
  // The locks are released when the server stops (normally or abnormally).
  storage = new DataStorage();
  
  // global DN settings
  // Register the MXBean with JMX. For an introduction to JMX, see: //www.liaoxuefeng.com/wiki/1252599548343744/1282385687609378
  registerMXBean();
  // Initialize the DataXceiver (streaming communication); it is started later in DataNode#runDatanodeDaemon()
  initDataXceiver();
  // Start the InfoServer
  startInfoServer();
  // Start the JvmPauseMonitor (monitors JVM pauses; can be queried via JMX)
  pauseMonitor = new JvmPauseMonitor();
  pauseMonitor.init(getConf());
  pauseMonitor.start();

  // BlockPoolTokenSecretManager is required to create ipc server.
  // BlockPoolTokenSecretManager: manages a BlockTokenSecretManager per block pool and routes requests for a given block pool id to the corresponding BlockTokenSecretManager
  this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();

  // Login is done by now. Set the DN user name.
  dnUserName = UserGroupInformation.getCurrentUser().getUserName();
  LOG.info("dnUserName = {}", dnUserName);
  LOG.info("supergroup = {}", supergroup);
  // Initialize the IpcServer (RPC communication); it is started later in DataNode#runDatanodeDaemon()
  initIpcServer();

  metrics = DataNodeMetrics.create(getConf(), getDisplayName());
  peerMetrics = dnConf.peerStatsEnabled ?
      DataNodePeerMetrics.create(getDisplayName(), getConf()) : null;
  metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

  ecWorker = new ErasureCodingWorker(getConf(), this);
  blockRecoveryWorker = new BlockRecoveryWorker(this);
  // Initialize following the nameservice -> namenode structure
  blockPoolManager = new BlockPoolManager(this);
  // Heartbeat management: refresh the set of namenodes from the configuration
  blockPoolManager.refreshNamenodes(getConf());

  // Create the ReadaheadPool from the DataNode context so we can
  // exit without having to explicitly shutdown its thread pool.
  readaheadPool = ReadaheadPool.getInstance();
  saslClient = new SaslDataTransferClient(dnConf.getConf(),
      dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
  saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
  startMetricsLogger();

  if (dnConf.diskStatsEnabled) {
    diskMetrics = new DataNodeDiskMetrics(this,
        dnConf.outliersReportIntervalMs);
  }
}

DataNode#checkSecureConfig

Let's first see what checkSecureConfig(dnConf, getConf(), resources) actually checks, and how it uses the resources parameter that was passed in:

// Checks if the DataNode has a secure configuration when security is enabled. There are two possible secure configurations:
// 1. The server has bound to privileged ports for RPC and HTTP via SecureDataNodeStarter.
// 2. The configuration enables SASL on DataTransferProtocol and HTTPS (no plain-text HTTP) for the HTTP server.
// The SASL handshake guarantees authentication of the RPC server before the client transmits a secret (such as a block access token).
// Similarly, SSL guarantees authentication of the HTTP server before the client transmits a secret (such as a delegation token).

// It is not possible to run both a privileged port and SASL on DataTransferProtocol.
// For backwards compatibility, the connection logic must check whether the target port is a privileged port, and if so, skip the SASL handshake.
private static void checkSecureConfig(DNConf dnConf, Configuration conf,
    SecureResources resources) throws RuntimeException {
  if (!UserGroupInformation.isSecurityEnabled()) {
    return;
  }

  // Abort out of inconsistent state if Kerberos is enabled but block access tokens are not enabled.
  boolean isEnabled = conf.getBoolean(
      DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY,
      DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT);
  if (!isEnabled) {
    String errMessage = "Security is enabled but block access tokens " +
        "(via " + DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY + ") " +
        "aren't enabled. This may cause issues " +
        "when clients attempt to connect to a DataNode. Aborting DataNode";
    throw new RuntimeException(errMessage);
  }
  // Return early if the configuration says to skip the check for proper port configuration in a secure cluster. This is intended only for development and testing.
  if (dnConf.getIgnoreSecurePortsForTesting()) {
    return;
  }

  if (resources != null) {
    // Privileged port, or HTTPS_ONLY policy configured
    final boolean httpSecured = resources.isHttpPortPrivileged()
        || DFSUtil.getHttpPolicy(conf) == HttpConfig.Policy.HTTPS_ONLY;
    // Privileged port, or SASL enabled
    final boolean rpcSecured = resources.isRpcPortPrivileged()
        || resources.isSaslEnabled();

    // Allow secure DataNode to startup if:
    // 1. Http is secure.
    // 2. Rpc is secure
    if (rpcSecured && httpSecured) {
      return;
    }
  } else {
    // Handle cases when SecureDataNodeStarter#getSecureResources is not invoked
    SaslPropertiesResolver saslPropsResolver = dnConf.getSaslPropsResolver();
    if (saslPropsResolver != null &&
        DFSUtil.getHttpPolicy(conf) == HttpConfig.Policy.HTTPS_ONLY) {
      return;
    }
  }

  throw new RuntimeException("Cannot start secure DataNode due to incorrect "
      + "config. See //cwiki.apache.org/confluence/display/HADOOP/"
      + "Secure+DataNode for details.");
}
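
For the second secure option described in the comment above (SASL on DataTransferProtocol plus HTTPS-only web access), the relevant standard settings look roughly like this. This is a hedged sketch, not a complete secure-cluster setup (Kerberos principals, keytabs and SSL material are omitted):

  // With something like this (plus the usual Kerberos settings), checkSecureConfig
  // can pass even when the DataNode ports are not privileged.
  Configuration conf = new HdfsConfiguration();
  conf.set("dfs.data.transfer.protection", "authentication"); // makes a SaslPropertiesResolver available
  conf.set("dfs.http.policy", "HTTPS_ONLY");                  // no plain-text HTTP
  conf.setBoolean("dfs.block.access.token.enable", true);     // required once security is on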

DataNode#initDataXceiver

private void initDataXceiver() throws IOException {
    // find free port or use privileged port provided
    TcpPeerServer tcpPeerServer;
    if (secureResources != null) {
      // Create the TcpPeerServer from the streamingSocket held in secureResources
      tcpPeerServer = new TcpPeerServer(secureResources);
    } else {
      int backlogLength = getConf().getInt(
          CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
          CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
      tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
          DataNode.getStreamingAddr(getConf()), backlogLength);
    }
    if (dnConf.getTransferSocketRecvBufferSize() > 0) {
      tcpPeerServer.setReceiveBufferSize(
          dnConf.getTransferSocketRecvBufferSize());
    }
    streamingAddr = tcpPeerServer.getStreamingAddr();
    LOG.info("Opened streaming server at {}", streamingAddr);
    // Construct a new thread group; its parent is the thread group of the currently running thread.
    this.threadGroup = new ThreadGroup("dataXceiverServer");
    // DataXceiverServer: the server for receiving/sending blocks of data.
    // It is created to listen for requests from clients or other datanodes. This small server does not use the Hadoop IPC mechanism.
    xserver = new DataXceiverServer(tcpPeerServer, getConf(), this);
    // The service the DN uses to receive data sent by clients and other DNs; it creates a worker thread per request to serve it
    this.dataXceiverServer = new Daemon(threadGroup, xserver);
    this.threadGroup.setDaemon(true); // auto destroy when empty

    if (getConf().getBoolean(
        HdfsClientConfigKeys.Read.ShortCircuit.KEY,
        HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT) ||
        getConf().getBoolean(
            HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
            HdfsClientConfigKeys
              .DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
      DomainPeerServer domainPeerServer =
                getDomainPeerServer(getConf(), streamingAddr.getPort());
      if (domainPeerServer != null) {
        this.localDataXceiverServer = new Daemon(threadGroup,
            new DataXceiverServer(domainPeerServer, getConf(), this));
        LOG.info("Listening on UNIX domain socket: {}",
            domainPeerServer.getBindPath());
      }
    }
    this.shortCircuitRegistry = new ShortCircuitRegistry(getConf());
  }
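
The localDataXceiverServer branch above only kicks in when short-circuit reads or domain-socket data traffic are enabled, and getDomainPeerServer additionally needs a socket path. A hedged configuration sketch of the keys involved:

  // Either of the two boolean keys enables the UNIX domain socket branch;
  // dfs.domain.socket.path must point to a path the DataNode can create.
  Configuration conf = new HdfsConfiguration();
  conf.setBoolean("dfs.client.read.shortcircuit", true);
  conf.setBoolean("dfs.client.domain.socket.data.traffic", false);
  conf.set("dfs.domain.socket.path", "/var/lib/hadoop-hdfs/dn_socket");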

DataNode#runDatanodeDaemon

Back in DataNode#createDataNode, we continue with the datanode startup flow: dn.runDatanodeDaemon();

  public void runDatanodeDaemon() throws IOException {
    blockPoolManager.startAll();

    // start dataXceiveServer
    dataXceiverServer.start();
    if (localDataXceiverServer != null) {
      localDataXceiverServer.start();
    }
    ipcServer.setTracer(tracer);
    ipcServer.start();
    startPlugins(getConf());
  }
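
To wrap up, the flow traced in this article boils down to the following sketch, using the same calls shown in secureMain above (secureResources is null on a non-secure cluster):

  // createDataNode = instantiateDataNode (parse args, check volumes, new DataNode()
  //                  -> startDataNode) + runDatanodeDaemon (BPOfferServices,
  //                  DataXceiverServer, ipcServer).
  DataNode dn = DataNode.createDataNode(args, null, secureResources);
  if (dn != null) {
    dn.join();   // block until the DataNode shuts down
  }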