源碼|HDFS之DataNode:?jiǎn)?dòng)過(guò)程

掌握Mac編譯Hadoop源碼Hadoop單步debug追源碼后,就能告別人肉調(diào)用棧撒会,利用IDE輕松愉快的追各種開(kāi)源框架的源碼啦~

今天是HDFS中DataNode的第一篇——DataNode啟動(dòng)過(guò)程乳乌。

源碼版本:Apache Hadoop 2.6.0

可參考猴子追源碼時(shí)的速記打斷點(diǎn)翻翩,親自debug一遍实牡。

開(kāi)始之前

總覽

HDFS-2.x與1.x的核心區(qū)別:

  • 為支持Federation贱傀,會(huì)為每個(gè)namespace(或稱(chēng)nameservice)創(chuàng)建BPOfferService(提供BlockPool服務(wù))
  • 為支持HA,BPOfferService還會(huì)為一個(gè)namespace下的每個(gè)namenode創(chuàng)建BPServiceActor(作為具體實(shí)例與各active胎撤、standby的namenode通信晓殊;一個(gè)BPOfferService下只有一個(gè)active的BPServiceActor)

datanode的啟動(dòng)過(guò)程主要完成以下工作:

  • 啟動(dòng)多種工作線程,主要包括:
    • 通信:BPServiceActor伤提、IpcServer巫俺、DataXceiverServer、LocalDataXceiverServer
    • 監(jiān)控:DataBlockScanner肿男、DirectoryScanner介汹、JVMPauseMonitor
    • 其他:InfoServer
  • 向namenode注冊(cè)
  • 初始化存儲(chǔ)結(jié)構(gòu),包括各數(shù)據(jù)目錄${dfs.datanode.data.dir}舶沛,及數(shù)據(jù)目錄下各塊池的存儲(chǔ)結(jié)構(gòu)
  • 【可能】數(shù)據(jù)塊恢復(fù)等(暫不討論)

LazyWriter等特性暫不討論嘹承。

文章的組織結(jié)構(gòu)

  1. 如果只涉及單個(gè)分支的分析,則放在同一節(jié)如庭。
  2. 如果涉及多個(gè)分支的分析叹卷,則在下一級(jí)分多個(gè)節(jié),每節(jié)討論一個(gè)分支坪它。
  3. 多線程的分析同多分支骤竹。
  4. 每一個(gè)分支和線程的組織結(jié)構(gòu)遵循規(guī)則1-3。

主流程

datanode的Main Class是DataNode哟楷,先找DataNode.main():

  public static void main(String args[]) {
    if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
      System.exit(0);
    }

    secureMain(args, null);
  }
  
  ...
  
  public static void secureMain(String args[], SecureResources resources) {
    int errorCode = 0;
    try {
      // 打印啟動(dòng)信息
      StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
      // 完成創(chuàng)建datanode的主要工作
      DataNode datanode = createDataNode(args, null, resources);
      if (datanode != null) {
        datanode.join();
      } else {
        errorCode = 1;
      }
    } catch (Throwable e) {
      LOG.fatal("Exception in secureMain", e);
      terminate(1, e);
    } finally {
      LOG.warn("Exiting Datanode");
      terminate(errorCode);
    }
  }

datanode封裝了非常多工作線程瘤载,但絕大多數(shù)是守護(hù)線程,DataNode#join()只需要等待BPServiceActor線程結(jié)束卖擅,就可以正常退出(略)鸣奔。

DataNode.createDataNode():

  public static DataNode createDataNode(String args[], Configuration conf,
      SecureResources resources) throws IOException {
    // 完成大部分初始化的工作窃判,并啟動(dòng)部分工作線程
    DataNode dn = instantiateDataNode(args, conf, resources);
    if (dn != null) {
      // 啟動(dòng)剩余工作線程
      dn.runDatanodeDaemon();
    }
    return dn;
  }
  • 在DataNode.instantiateDataNode()執(zhí)行的過(guò)程中會(huì)啟動(dòng)部分工作線程(見(jiàn)后)
  • DataNode#runDatanodeDaemon()啟動(dòng)剩余的DataXceiverServer放椰、localDataXceiverServer、IpcServer等:
  /** Start a single datanode daemon and wait for it to finish.
   *  If this thread is specifically interrupted, it will stop waiting.
   */
  public void runDatanodeDaemon() throws IOException {
    // 在DataNode.instantiateDataNode()執(zhí)行過(guò)程中會(huì)調(diào)用該方法(見(jiàn)后)
    blockPoolManager.startAll();

    dataXceiverServer.start();
    if (localDataXceiverServer != null) {
      localDataXceiverServer.start();
    }
    ipcServer.start();
    startPlugins(conf);
  }

回到DataNode.instantiateDataNode():

  public static DataNode instantiateDataNode(String args [], Configuration conf,
      SecureResources resources) throws IOException {
    if (conf == null)
      conf = new HdfsConfiguration();
    
    ... // 參數(shù)檢查等
    
    Collection<StorageLocation> dataLocations = getStorageLocations(conf);
    UserGroupInformation.setConfiguration(conf);
    SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
        DFS_DATANODE_KERBEROS_PRINCIPAL_KEY);
    return makeInstance(dataLocations, conf, resources);
  }

dataLocations維護(hù)的是全部${dfs.datanode.data.dir}河闰,猴子只配置了一個(gè)目錄断楷,實(shí)際使用中會(huì)在將每塊磁盤(pán)都掛載為一塊目錄锨匆。

從DataNode.makeInstance()開(kāi)始創(chuàng)建DataNode:

  static DataNode makeInstance(Collection<StorageLocation> dataDirs,
      Configuration conf, SecureResources resources) throws IOException {
    ...// 檢查數(shù)據(jù)目錄的權(quán)限

    assert locations.size() > 0 : "number of data directories should be > 0";
    return new DataNode(conf, locations, resources);
  }
  
  ...
  
  DataNode(final Configuration conf,
           final List<StorageLocation> dataDirs,
           final SecureResources resources) throws IOException {
    super(conf);
    ...// 參數(shù)設(shè)置
    
    try {
      hostName = getHostName(conf);
      LOG.info("Configured hostname is " + hostName);
      startDataNode(conf, dataDirs, resources);
    } catch (IOException ie) {
      shutdown();
      throw ie;
    }
  }
  
  ...
  
  void startDataNode(Configuration conf, 
                     List<StorageLocation> dataDirs,
                     SecureResources resources
                     ) throws IOException {
    ...// 參數(shù)設(shè)置
    
    // 初始化DataStorage
    storage = new DataStorage();
    
    // global DN settings
    // 注冊(cè)JMX
    registerMXBean();
    // 初始化DataXceiver(流式通信),DataNode#runDatanodeDaemon()中啟動(dòng)
    initDataXceiver(conf);
    // 啟動(dòng)InfoServer(Web UI)
    startInfoServer(conf);
    // 啟動(dòng)JVMPauseMonitor(反向監(jiān)控JVM情況冬筒,可通過(guò)JMX查詢(xún))
    pauseMonitor = new JvmPauseMonitor(conf);
    pauseMonitor.start();
  
    ...// 略
    
    // 初始化IpcServer(RPC通信)恐锣,DataNode#runDatanodeDaemon()中啟動(dòng)
    initIpcServer(conf);

    metrics = DataNodeMetrics.create(conf, getDisplayName());
    metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
    
    // 按照namespace(nameservice)、namenode的二級(jí)結(jié)構(gòu)進(jìn)行初始化
    blockPoolManager = new BlockPoolManager(this);
    blockPoolManager.refreshNamenodes(conf);
    
    ...// 略
  }

BlockPoolManager抽象了datanode提供的數(shù)據(jù)塊存儲(chǔ)服務(wù)舞痰。BlockPoolManager按照namespace(nameservice)土榴、namenode二級(jí)結(jié)構(gòu)組織,此處按照該二級(jí)結(jié)構(gòu)進(jìn)行初始化响牛。

重點(diǎn)是BlockPoolManager#refreshNamenodes():

  void refreshNamenodes(Configuration conf)
      throws IOException {
    LOG.info("Refresh request received for nameservices: " + conf.get
            (DFSConfigKeys.DFS_NAMESERVICES));

    Map<String, Map<String, InetSocketAddress>> newAddressMap = DFSUtil
            .getNNServiceRpcAddressesForCluster(conf);

    synchronized (refreshNamenodesLock) {
      doRefreshNamenodes(newAddressMap);
    }
  }

命名為刷新玷禽,是因?yàn)槌顺跏蓟^(guò)程主動(dòng)調(diào)用赫段,還可以由namespace通過(guò)datanode心跳過(guò)程下達(dá)刷新命令。

newAddressMap是這樣一個(gè)映射:Map<namespace, Map<namenode, InetSocketAddress>>矢赁。

BlockPoolManager#doRefreshNamenodes():

  private void doRefreshNamenodes(
      Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {
    assert Thread.holdsLock(refreshNamenodesLock);

    Set<String> toRefresh = Sets.newLinkedHashSet();
    Set<String> toAdd = Sets.newLinkedHashSet();
    Set<String> toRemove;
    
    synchronized (this) {
      // Step 1. For each of the new nameservices, figure out whether
      // it's an update of the set of NNs for an existing NS,
      // or an entirely new nameservice.
      for (String nameserviceId : addrMap.keySet()) {
        if (bpByNameserviceId.containsKey(nameserviceId)) {
          toRefresh.add(nameserviceId);
        } else {
          toAdd.add(nameserviceId);
        }
      }
      
      ...// 略
      
      // Step 3. Start new nameservices
      if (!toAdd.isEmpty()) {
        LOG.info("Starting BPOfferServices for nameservices: " +
            Joiner.on(",").useForNull("<default>").join(toAdd));
      
        for (String nsToAdd : toAdd) {
          ArrayList<InetSocketAddress> addrs =
            Lists.newArrayList(addrMap.get(nsToAdd).values());
          // 為每個(gè)namespace創(chuàng)建對(duì)應(yīng)的BPOfferService
          BPOfferService bpos = createBPOS(addrs);
          bpByNameserviceId.put(nsToAdd, bpos);
          offerServices.add(bpos);
        }
      }
      // 然后通過(guò)startAll啟動(dòng)所有BPOfferService
      startAll();
    }
    
    ...// 略
  }

addrMap即傳入的newAddressMap糯笙。Step 3為每個(gè)namespace創(chuàng)建對(duì)應(yīng)的BPOfferService(包括每個(gè)namenode對(duì)應(yīng)的BPServiceActor),然后通過(guò)BlockPoolManager#startAll()啟動(dòng)所有BPOfferService(實(shí)際是啟動(dòng)所有
BPServiceActor)撩银。

BlockPoolManager#createBPOS()

BlockPoolManager#createBPOS():

  protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs) {
    return new BPOfferService(nnAddrs, dn);
  }

BPOfferService.<init>

  BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
    Preconditions.checkArgument(!nnAddrs.isEmpty(),
        "Must pass at least one NN.");
    this.dn = dn;

    for (InetSocketAddress addr : nnAddrs) {
      this.bpServices.add(new BPServiceActor(addr, this));
    }
  }

BPOfferService通過(guò)bpServices維護(hù)同一個(gè)namespace下各namenode對(duì)應(yīng)的BPServiceActor给涕。

BlockPoolManager#startAll()

BlockPoolManager#startAll():

  synchronized void startAll() throws IOException {
    try {
      UserGroupInformation.getLoginUser().doAs(
          new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws Exception {
              for (BPOfferService bpos : offerServices) {
                bpos.start();
              }
              return null;
            }
          });
    } catch (InterruptedException ex) {
      IOException ioe = new IOException();
      ioe.initCause(ex.getCause());
      throw ioe;
    }
  }

逐個(gè)調(diào)用BPOfferService#start(),啟動(dòng)BPOfferService:

  //This must be called only by blockPoolManager
  void start() {
    for (BPServiceActor actor : bpServices) {
      actor.start();
    }
  }

逐個(gè)調(diào)用BPServiceActor#start()蜒蕾,啟動(dòng)BPServiceActor:

  //This must be called only by BPOfferService
  void start() {
    // 保證BPServiceActor線程只啟動(dòng)一次
    if ((bpThread != null) && (bpThread.isAlive())) {
      return;
    }
    bpThread = new Thread(this, formatThreadName());
    bpThread.setDaemon(true); // needed for JUnit testing
    bpThread.start();
  }

BPServiceActor#start()的線程安全性由最外層的BlockPoolManager#startAll()(synchronized方法)保證稠炬。

在完成datanode的初始化后焕阿,DataNode#runDatanodeDaemon()中又調(diào)用了一次BlockPoolManager#startAll()咪啡。猴子沒(méi)明白這次調(diào)用的作用,但BlockPoolManager#startAll()的內(nèi)部邏輯保證其只會(huì)被執(zhí)行一次暮屡,沒(méi)造成什么壞影響撤摸。

主流程小結(jié)

在datanode啟動(dòng)的主流程中,啟動(dòng)多種重要的工作線程褒纲,包括:

  • 通信:BPServiceActor准夷、IpcServer、DataXceiverServer莺掠、LocalDataXceiverServer
  • 監(jiān)控:JVMPauseMonitor
  • 其他:InfoServer

接下來(lái)討論BPServiceActor線程衫嵌,它的主要工作是:

  • 向namonode注冊(cè)
  • 啟動(dòng)DataBlockScanner、DirectoryScanner等工作線程
  • 存儲(chǔ)結(jié)構(gòu)初始化

BPServiceActor線程

在datanode啟動(dòng)的主流程中彻秆,啟動(dòng)了多種工作線程楔绞,包括InfoServer、JVMPauseMonitor唇兑、BPServiceActor等酒朵。其中,最重要的是BPServiceActor線程扎附,真正代表datanode與namenode通信的正是BPServiceActor線程蔫耽。

BPServiceActor#run():

  @Override
  public void run() {
    LOG.info(this + " starting to offer service");

    try {
      while (true) {
        // init stuff
        try {
          // 與namonode握手,注冊(cè)
          connectToNNAndHandshake();
          break;
        } catch (IOException ioe) {
          ...// 大部分握手失敗的情況都需要重試留夜,除非拋出了非IOException異吵渍。或datanode關(guān)閉
        }
      }

      runningState = RunningState.RUNNING;

      while (shouldRun()) {
        try {
          // BPServiceActor提供的服務(wù)
          offerService();
        } catch (Exception ex) {
          ...// 不管拋出任何異常,都持續(xù)提供服務(wù)(包括心跳碍粥、數(shù)據(jù)塊匯報(bào)等)鳖眼,直到datanode關(guān)閉
        }
      }
      runningState = RunningState.EXITED;
    } catch (Throwable ex) {
      LOG.warn("Unexpected exception in block pool " + this, ex);
      runningState = RunningState.FAILED;
    } finally {
      LOG.warn("Ending block pool service for: " + this);
      cleanUp();
    }
  }

此處說(shuō)的“通信”包括與握手、注冊(cè)(BPServiceActor#connectToNNAndHandshake)和后期循環(huán)提供服務(wù)(BPServiceActor#offerService()即纲,本文暫不討論)具帮。

啟動(dòng)過(guò)程中主要關(guān)注BPServiceActor#connectToNNAndHandshake():

  private void connectToNNAndHandshake() throws IOException {
    // get NN proxy
    bpNamenode = dn.connectToNN(nnAddr);

    // 先通過(guò)第一次握手獲得namespace的信息
    NamespaceInfo nsInfo = retrieveNamespaceInfo();
    
    // 然后驗(yàn)證并初始化該datanode上的BlockPool
    bpos.verifyAndSetNamespaceInfo(nsInfo);
    
    // 最后博肋,通過(guò)第二次握手向各namespace注冊(cè)自己
    register();
  }

通過(guò)兩次握手完成了datanode的注冊(cè),比較簡(jiǎn)單蜂厅,不討論匪凡。

重點(diǎn)是BPOfferService#verifyAndSetNamespaceInfo():

  /**
   * Called by the BPServiceActors when they handshake to a NN.
   * If this is the first NN connection, this sets the namespace info
   * for this BPOfferService. If it's a connection to a new NN, it
   * verifies that this namespace matches (eg to prevent a misconfiguration
   * where a StandbyNode from a different cluster is specified)
   */
  void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
    writeLock();
    try {
      if (this.bpNSInfo == null) {
        // 如果是第一次連接namenode(也就必然是第一次連接namespace),則初始化blockpool(塊池)
        this.bpNSInfo = nsInfo;
        boolean success = false;

        try {
          // 以BPOfferService為單位初始化blockpool
          dn.initBlockPool(this);
          success = true;
        } finally {
          if (!success) {
            // 如果一個(gè)BPServiceActor線程失敗了掘猿,還可以由同BPOfferService的其他BPServiceActor線程重新嘗試
            this.bpNSInfo = null;
          }
        }
      } else {
        ...// 如果不是第一次連接(刷新)病游,則檢查下信息是否正確即可
      }
    } finally {
      writeUnlock();
    }
  }

盡管是在BPServiceActor線程中,卻試圖以BPOfferService為單位初始化blockpool(包括內(nèi)存與磁盤(pán)上的存儲(chǔ)結(jié)構(gòu))稠通。如果初始化成功衬衬,萬(wàn)事大吉,以后同BPOfferService的其他BPServiceActor線程發(fā)現(xiàn)BPOfferService#bpNSInfo != null就不再初始化改橘;而如果一個(gè)BPServiceActor線程初始化blockpool失敗了滋尉,還可以由同BPOfferService的其他BPServiceActor線程重新嘗試初始化。

DataNode#initBlockPool():

  /**
   * One of the Block Pools has successfully connected to its NN.
   * This initializes the local storage for that block pool,
   * checks consistency of the NN's cluster ID, etc.
   * 
   * If this is the first block pool to register, this also initializes
   * the datanode-scoped storage.
   * 
   * @param bpos Block pool offer service
   * @throws IOException if the NN is inconsistent with the local storage.
   */
  void initBlockPool(BPOfferService bpos) throws IOException {
    ...// 略

    // 將blockpool注冊(cè)到BlockManager
    blockPoolManager.addBlockPool(bpos);
    
    // 初步初始化存儲(chǔ)結(jié)構(gòu)
    initStorage(nsInfo);

    ...// 檢查磁盤(pán)損壞

    // 啟動(dòng)掃描器
    initPeriodicScanners(conf);
    
    // 將blockpool添加到FsDatasetIpml飞主,并繼續(xù)初始化存儲(chǔ)結(jié)構(gòu)
    data.addBlockPool(nsInfo.getBlockPoolID(), conf);
  }

此時(shí)可知狮惜,blockpool是按照namespace逐個(gè)初始化的。這很必要碌识,因?yàn)橐С諪ederation的話碾篡,就必須讓多個(gè)namespace既能共用BlockManager提供的數(shù)據(jù)塊存儲(chǔ)服務(wù),又能獨(dú)立啟動(dòng)筏餐、關(guān)閉开泽、升級(jí)、回滾等魁瞪。

DataNode#initStorage()

在逐個(gè)初始化blockpool之前穆律,先以datanode整體進(jìn)行初始化。這一階段操作的主要對(duì)象是DataStorage佩番、StorageDirectory众旗、FsDatasetImpl、FsVolumeList趟畏、FsVolumeImpl等贡歧;后面的FsDatasetImpl#addBlockPool操作的主要對(duì)象才會(huì)具體到各blockpool。

DataNode#initStorage():

  private void initStorage(final NamespaceInfo nsInfo) throws IOException {
    final FsDatasetSpi.Factory<? extends FsDatasetSpi<?>> factory
        = FsDatasetSpi.Factory.getFactory(conf);
    
    if (!factory.isSimulated()) {
      ...// 構(gòu)造參數(shù)
      // 初始化DataStorage(每個(gè)datanode分別只持有一個(gè))赋秀±洌可能會(huì)觸發(fā)DataStorage級(jí)別的狀態(tài)裝換,因此猎莲,要在DataNode上加鎖
      synchronized (this) {
        storage.recoverTransitionRead(this, bpid, nsInfo, dataDirs, startOpt);
      }
      final StorageInfo bpStorage = storage.getBPStorage(bpid);
      LOG.info("Setting up storage: nsid=" + bpStorage.getNamespaceID()
          + ";bpid=" + bpid + ";lv=" + storage.getLayoutVersion()
          + ";nsInfo=" + nsInfo + ";dnuuid=" + storage.getDatanodeUuid());
    }

    ...// 檢查

    // 初始化FsDatasetImpl(同上绍弟,每個(gè)datanode分別只持有一個(gè))
    synchronized(this)  {
      if (data == null) {
        data = factory.newInstance(this, storage, conf);
      }
    }
  }

初始化DataStorage:DataStorage#recoverTransitionRead()

DataStorage#recoverTransitionRead():

  void recoverTransitionRead(DataNode datanode, String bpID, NamespaceInfo nsInfo,
      Collection<StorageLocation> dataDirs, StartupOption startOpt) throws IOException {
    // First ensure datanode level format/snapshot/rollback is completed
    recoverTransitionRead(datanode, nsInfo, dataDirs, startOpt);

    // Create list of storage directories for the block pool
    Collection<File> bpDataDirs = new ArrayList<File>();
    for(StorageLocation dir : dataDirs) {
      File dnRoot = dir.getFile();
      File bpRoot = BlockPoolSliceStorage.getBpRoot(bpID, new File(dnRoot,
          STORAGE_DIR_CURRENT));
      bpDataDirs.add(bpRoot);
    }

    // 在各${dfs.datanode.data.dir}/current下檢查并創(chuàng)建blockpool目錄
    makeBlockPoolDataDir(bpDataDirs, null);
    
    // 創(chuàng)建BlockPoolSliceStorage,并放入映射DataStorage#bpStorageMap:`Map<bpid, BlockPoolSliceStorage>`
    BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(
        nsInfo.getNamespaceID(), bpID, nsInfo.getCTime(), nsInfo.getClusterID());
    bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt);
    addBlockPoolStorage(bpID, bpStorage);
  }

根據(jù)Javadoc著洼,BlockPoolSliceStorage管理著該datanode上相同bpid的所有BlockPoolSlice樟遣。然而而叼,猴子暫時(shí)沒(méi)有發(fā)現(xiàn)這個(gè)類(lèi)與升級(jí)外的操作有關(guān)(當(dāng)然,啟動(dòng)也可能是由于升級(jí)重啟)豹悬,暫不深入葵陵。

  • BlockPoolSlice詳見(jiàn)后文FsVolumeImpl#addBlockPool。
  • DataStorage#recoverTransitionRead()瞻佛、BlockPoolSliceStorage#recoverTransitionRead()與數(shù)據(jù)節(jié)點(diǎn)恢復(fù)的關(guān)系非常大脱篙,猴子暫時(shí)還沒(méi)看懂,以后回來(lái)補(bǔ)充伤柄。

初始化FsDatasetImpl:FsDatasetFactory#newInstance()

FsDatasetFactory#newInstance():

  public FsDatasetImpl newInstance(DataNode datanode,
      DataStorage storage, Configuration conf) throws IOException {
    return new FsDatasetImpl(datanode, storage, conf);
  }

FsDatasetImpl.<init>()

  FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
      ) throws IOException {
    ...// 檢查绊困,設(shè)置參數(shù)等

    @SuppressWarnings("unchecked")
    final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
        ReflectionUtils.newInstance(conf.getClass(
            DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
            RoundRobinVolumeChoosingPolicy.class,
            VolumeChoosingPolicy.class), conf);
    volumes = new FsVolumeList(volsFailed, blockChooserImpl);
    
    ...// 略

    // 每一個(gè)Storagedirectory都對(duì)應(yīng)一個(gè)卷FsVolumeImpl,需要將這些卷添加到FsVolumeList中
    for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
      addVolume(dataLocations, storage.getStorageDir(idx));
    }
    
    ...// 設(shè)置lazyWriter适刀、cacheManager等
  }
  
  ...
  
  private void addVolume(Collection<StorageLocation> dataLocations,
      Storage.StorageDirectory sd) throws IOException {
    // 使用`${dfs.datanode.data.dir}/current`目錄
    final File dir = sd.getCurrentDir();
    final StorageType storageType =
        getStorageTypeFromLocations(dataLocations, sd.getRoot());

    FsVolumeImpl fsVolume = new FsVolumeImpl(
        this, sd.getStorageUuid(), dir, this.conf, storageType);
    
    ...// 略
    
    volumes.addVolume(fsVolume);
    
    ...// 略
    
    LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
  }

初始化DataStorage的過(guò)程中秤朗,將各${dfs.datanode.data.dir}放入了storage(即DataNode#storage)。對(duì)于datanode來(lái)說(shuō)蔗彤,${dfs.datanode.data.dir}/current目錄就是要添加的卷FsVolumeImpl川梅。

FsDatasetImpl#initPeriodicScanners()

FsDatasetImpl#initPeriodicScanners()(名為初始化,實(shí)為啟動(dòng)):

  private void initPeriodicScanners(Configuration conf) {
    initDataBlockScanner(conf);
    initDirectoryScanner(conf);
  }

初始化并啟動(dòng)DataBlockScanner然遏、DirectoryScanners

命名為init或許是考慮到有可能禁用了數(shù)據(jù)塊和目錄的掃描器吧彪,導(dǎo)致經(jīng)過(guò)FsDatasetImpl#initPeriodicScanners方法后待侵,掃描器并沒(méi)有啟動(dòng)。但仍然給人造成了誤解姨裸。

FsDatasetImpl#addBlockPool()

FsDatasetImpl#addBlockPool()操作的主要對(duì)象具體到了各blockpool秧倾,完成blockpool、current傀缩、rbw那先、tmp等目錄的檢查、恢復(fù)或初始化:

  public void addBlockPool(String bpid, Configuration conf)
      throws IOException {
    LOG.info("Adding block pool " + bpid);
    synchronized(this) {
      // 向所有卷添加blockpool(所有namespace共享所有卷)
      volumes.addBlockPool(bpid, conf);
      // 初始化ReplicaMap中blockpool的映射
      volumeMap.initBlockPool(bpid);
    }
    // 將所有副本加載到FsDatasetImpl#volumeMap中
    volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);
  }

FsVolumeList#addBlockPool()

FsVolumeList#addBlockPool()赡艰,并發(fā)向FsVolumeList中的所有卷添加blockpool(所有namespace共享所有卷):

  void addBlockPool(final String bpid, final Configuration conf) throws IOException {
    long totalStartTime = Time.monotonicNow();
    
    final List<IOException> exceptions = Collections.synchronizedList(
        new ArrayList<IOException>());
    List<Thread> blockPoolAddingThreads = new ArrayList<Thread>();
    // 并發(fā)向FsVolumeList中的所有卷添加blockpool(所有namespace共享所有卷)
    for (final FsVolumeImpl v : volumes) {
      Thread t = new Thread() {
        public void run() {
          try {
            ...// 時(shí)間統(tǒng)計(jì)
            // 向卷FsVolumeImpl添加blockpool
            v.addBlockPool(bpid, conf);
            ...// 時(shí)間統(tǒng)計(jì)
          } catch (IOException ioe) {
            ...// 異常處理售淡,循環(huán)外統(tǒng)一處理
          }
        }
      };
      blockPoolAddingThreads.add(t);
      t.start();
    }
    for (Thread t : blockPoolAddingThreads) {
      try {
        t.join();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
    ...// 異常處理。如果存在異常慷垮,僅拋出掃描卷過(guò)程中的第一個(gè)異常
    
    ...// 時(shí)間統(tǒng)計(jì)
  }

正如FsVolumeList#addBlockPool()揖闸,F(xiàn)sVolumeList封裝了很多面向所有卷的操作。

FsVolumeImpl#addBlockPool():

  void addBlockPool(String bpid, Configuration conf) throws IOException {
    File bpdir = new File(currentDir, bpid);
    // 創(chuàng)建BlockPoolSlice
    BlockPoolSlice bp = new BlockPoolSlice(bpid, this, bpdir, conf);
    // 維護(hù)FsVolumeImpl中bpid到BlockPoolSlice的映射
    bpSlices.put(bpid, bp);
  }

BlockPoolSlice是blockpool在每個(gè)卷上的實(shí)際存在形式料身。所有卷上相同bpid的BlockPoolSlice組合成小blockpool(概念上即為BlockPoolSliceStorage)汤纸,再將相關(guān)datanode(向同一個(gè)namespace匯報(bào)的datanode)上相同bpid的小blockpool組合起來(lái),就構(gòu)成了該namespace的blockpool芹血。

而FsVolumeImpl#bpSlices維護(hù)了bpid到BlockPoolSlice的映射贮泞。FsVolumeImpl通過(guò)該映射獲取bpid對(duì)應(yīng)的BlockPoolSlice楞慈,而B(niǎo)lockPoolSlice再反向借助FsDatasetImpl中的靜態(tài)方法完成實(shí)際的文件操作(見(jiàn)后續(xù)文章中的寫(xiě)數(shù)據(jù)塊過(guò)程)。

回到BlockPoolSlice.<init>

  BlockPoolSlice(String bpid, FsVolumeImpl volume, File bpDir,
      Configuration conf) throws IOException {
    this.bpid = bpid;
    this.volume = volume;
    this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT); 
    this.finalizedDir = new File(
        currentDir, DataStorage.STORAGE_DIR_FINALIZED);
    this.lazypersistDir = new File(currentDir, DataStorage.STORAGE_DIR_LAZY_PERSIST);
    // 檢查并創(chuàng)建finalized目錄
    if (!this.finalizedDir.exists()) {
      if (!this.finalizedDir.mkdirs()) {
        throw new IOException("Failed to mkdirs " + this.finalizedDir);
      }
    }

    this.deleteDuplicateReplicas = conf.getBoolean(
        DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION,
        DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION_DEFAULT);

    // 刪除tmp目錄啃擦。每次啟動(dòng)datanode都會(huì)刪除tmp目錄(并重建)抖部,重新協(xié)調(diào)數(shù)據(jù)塊的一致性。
    this.tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
    if (tmpDir.exists()) {
      FileUtil.fullyDelete(tmpDir);
    }
    
    // 檢查并創(chuàng)建rbw目錄
    this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
    final boolean supportAppends = conf.getBoolean(
        DFSConfigKeys.DFS_SUPPORT_APPEND_KEY,
        DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT);
    // 如果不支持append议惰,那么同tmp一樣慎颗,rbw里保存的必然是新寫(xiě)入的數(shù)據(jù),可以在每次啟動(dòng)datanode時(shí)刪除rbw目錄言询,重新協(xié)調(diào)
    if (rbwDir.exists() && !supportAppends) {
      FileUtil.fullyDelete(rbwDir);
    } // 如果支持append俯萎,待datanode啟動(dòng)后,有可能繼續(xù)append數(shù)據(jù)运杭,因此不能刪除夫啊,等待進(jìn)一步確定或恢復(fù)
    
    if (!rbwDir.mkdirs()) {
      if (!rbwDir.isDirectory()) {
        throw new IOException("Mkdirs failed to create " + rbwDir.toString());
      }
    }
    
    if (!tmpDir.mkdirs()) {
      if (!tmpDir.isDirectory()) {
        throw new IOException("Mkdirs failed to create " + tmpDir.toString());
      }
    }
    
    // 啟動(dòng)dfsUsage的監(jiān)控線程(詳見(jiàn)對(duì)hadoop fs shell中df、du區(qū)別的總結(jié))
    this.dfsUsage = new DU(bpDir, conf, loadDfsUsed());
    this.dfsUsage.start();
    ShutdownHookManager.get().addShutdownHook(
      new Runnable() {
        @Override
        public void run() {
          if (!dfsUsedSaved) {
            saveDfsUsed();
          }
        }
      }, SHUTDOWN_HOOK_PRIORITY);
  }

可知辆憔,每個(gè)blockpool目錄下的存儲(chǔ)結(jié)構(gòu)是在構(gòu)造BlockPoolSlice時(shí)初始化的撇眯。

關(guān)于du的作用及優(yōu)化:

在linux系統(tǒng)上,該線程將定期通過(guò)du -sk命令統(tǒng)計(jì)各blockpool目錄的占用情況虱咧,隨著心跳匯報(bào)給namenode熊榛。

執(zhí)行l(wèi)inux命令需要從JVM繼承fork出子進(jìn)程,成本較高(盡管linux使用COW策略避免了對(duì)內(nèi)存空間的完全copy)腕巡。為了加快datanode啟動(dòng)速度玄坦,此處允許使用之前緩存的dfsUsage值,該值保存在current目錄下的dfsUsed文件中绘沉;緩存的dfsUsage會(huì)定期持久化到磁盤(pán)中煎楣;在虛擬機(jī)關(guān)閉時(shí),也會(huì)將當(dāng)前的dfsUsage值持久化车伞。

ReplicaMap#initBlockPool()

ReplicaMap#initBlockPool()择懂,初始化ReplicaMap中blockpool的映射:

  void initBlockPool(String bpid) {
    checkBlockPool(bpid);
    synchronized(mutex) {
      Map<Long, ReplicaInfo> m = map.get(bpid);
      if (m == null) {
        // Add an entry for block pool if it does not exist already
        m = new HashMap<Long, ReplicaInfo>();
        map.put(bpid, m);
      }
    }
  }

FsDatasetImpl#volumeMap(ReplicaMap實(shí)例)中維護(hù)了bpid到各blockpool在該datanode上的所有副本:Map<bpid, Map<blockid, replicaInfo>>

例行挖坑

在以后的文章中另玖,猴子會(huì)陸續(xù)整理DataNode章的寫(xiě)數(shù)據(jù)塊過(guò)程困曙、讀數(shù)據(jù)塊過(guò)程,NameNode章日矫、Client章等赂弓。

由于猴子也是一步步學(xué)習(xí),難免有錯(cuò)漏之處哪轿,煩請(qǐng)讀者批評(píng)指正盈魁;隨著猴子進(jìn)一步的學(xué)習(xí)與自檢,也會(huì)隨時(shí)更新文章窃诉,重要修改會(huì)注明勘誤杨耙。


本文鏈接:源碼|HDFS之DataNode:?jiǎn)?dòng)過(guò)程
作者:猴子007
出處:https://monkeysayhi.github.io
本文基于 知識(shí)共享署名-相同方式共享 4.0 國(guó)際許可協(xié)議發(fā)布赤套,歡迎轉(zhuǎn)載,演繹或用于商業(yè)目的珊膜,但是必須保留本文的署名及鏈接容握。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市车柠,隨后出現(xiàn)的幾起案子剔氏,更是在濱河造成了極大的恐慌,老刑警劉巖竹祷,帶你破解...
    沈念sama閱讀 218,386評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件谈跛,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡塑陵,警方通過(guò)查閱死者的電腦和手機(jī)感憾,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,142評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)令花,“玉大人阻桅,你說(shuō)我怎么就攤上這事〖娑迹” “怎么了嫂沉?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,704評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)俯抖。 經(jīng)常有香客問(wèn)我输瓜,道長(zhǎng),這世上最難降的妖魔是什么芬萍? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,702評(píng)論 1 294
  • 正文 為了忘掉前任,我火速辦了婚禮搔啊,結(jié)果婚禮上柬祠,老公的妹妹穿的比我還像新娘。我一直安慰自己负芋,他們只是感情好漫蛔,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,716評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著旧蛾,像睡著了一般莽龟。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上锨天,一...
    開(kāi)封第一講書(shū)人閱讀 51,573評(píng)論 1 305
  • 那天毯盈,我揣著相機(jī)與錄音,去河邊找鬼病袄。 笑死搂赋,一個(gè)胖子當(dāng)著我的面吹牛赘阀,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播脑奠,決...
    沈念sama閱讀 40,314評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼基公,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了宋欺?” 一聲冷哼從身側(cè)響起轰豆,我...
    開(kāi)封第一講書(shū)人閱讀 39,230評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎齿诞,沒(méi)想到半個(gè)月后酸休,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,680評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡掌挚,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,873評(píng)論 3 336
  • 正文 我和宋清朗相戀三年雨席,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片吠式。...
    茶點(diǎn)故事閱讀 39,991評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡陡厘,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出特占,到底是詐尸還是另有隱情糙置,我是刑警寧澤,帶...
    沈念sama閱讀 35,706評(píng)論 5 346
  • 正文 年R本政府宣布是目,位于F島的核電站谤饭,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏懊纳。R本人自食惡果不足惜揉抵,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,329評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望嗤疯。 院中可真熱鬧冤今,春花似錦、人聲如沸茂缚。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,910評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)脚囊。三九已至龟糕,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間悔耘,已是汗流浹背讲岁。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,038評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人催首。 一個(gè)月前我還...
    沈念sama閱讀 48,158評(píng)論 3 370
  • 正文 我出身青樓扶踊,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親郎任。 傳聞我的和親對(duì)象是個(gè)殘疾皇子秧耗,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,941評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 問(wèn)題:1.job的本質(zhì)是什么? 2.任務(wù)的本質(zhì)是什么舶治? 3.文件系統(tǒng)的Namespace由誰(shuí)來(lái)管理分井,Namespa...
    時(shí)待吾閱讀 2,294評(píng)論 0 0
  • 先思考問(wèn)題 我們處在一個(gè)大數(shù)據(jù)的時(shí)代已經(jīng)是不爭(zhēng)的事實(shí),這主要表現(xiàn)在數(shù)據(jù)源多且大霉猛,如互聯(lián)網(wǎng)數(shù)據(jù)尺锚,人們也認(rèn)識(shí)到數(shù)據(jù)里往...
    墻角兒的花閱讀 7,365評(píng)論 0 9
  • (一)分布式文件系統(tǒng)概述 數(shù)據(jù)量越來(lái)越多,在一個(gè)操作系統(tǒng)管轄的范圍存不下了惜浅,那么就分配到更多的操作系統(tǒng)管理的磁盤(pán)中...
    時(shí)待吾閱讀 1,502評(píng)論 0 0
  • 免費(fèi)圖片網(wǎng)站分享來(lái)了瘫辩!最后再為各位設(shè)計(jì)師們搜羅國(guó)內(nèi)外14個(gè)高清免費(fèi)圖庫(kù),如此實(shí)用的資源設(shè)計(jì)師們趕緊收藏吧坛悉! 這些網(wǎng)...
    打豆豆閱讀 2,248評(píng)論 1 19
  • 如果沒(méi)有回憶我就不會(huì)想起初見(jiàn)時(shí)你贈(zèng)給我的那束繁花和你燦若暖陽(yáng)的笑臉 如果沒(méi)有回憶我就不會(huì)想起相戀時(shí)你陪我看的那片風(fēng)...
    月舞傾城閱讀 766評(píng)論 106 115