【精】徹底理解HDFS寫文件流程

本文包含如下內(nèi)容:
① Pipeline的建立過程街望,以及下游節(jié)點(diǎn)如何給上游節(jié)點(diǎn)發(fā)Ack
② DFSOutputStream榆骚、DataStreamer的原理
③ Sender、BlockReceiver闰蚕、PacketResponder的原理

作為引子两踏,先從最上游談起:

我們使用HDFS API創(chuàng)建文件,寫文件時艰垂,首先會調(diào)用FileSystem的create方法泡仗,獲得一個FSDataOutputStream流,然后通過用這個流來write數(shù)據(jù)即可猜憎。

別看API這么簡單娩怎,這后面發(fā)生的事情可是十分復(fù)雜!比如這后面涉及到Client通過RPC調(diào)用在NameNode側(cè)的文件系統(tǒng)目錄中創(chuàng)建文件拉宗、addBlock峦树,NameNode給客戶端返回文件的LocatedBlock、客戶端根據(jù)LocatedBlock創(chuàng)建輸出流旦事、Datanode建立Pipeline魁巩、Client發(fā)送Packet到Pipeline,Sender發(fā)送數(shù)據(jù)到DataXeicver等等姐浮。

接下來讓我們一起來深入探索吧谷遂!

一、寫數(shù)據(jù)Pipeline總覽

我們跳過客戶端調(diào)用RPC添加數(shù)據(jù)塊卖鲤,NameNode側(cè)為數(shù)據(jù)塊選擇最優(yōu)存放的DataNode列表這一步驟的講解肾扰,直接來到已經(jīng)選好DataNode列表后畴嘶,客戶端向列表中的DataNode寫數(shù)據(jù)的步驟。

這個步驟需要客戶端與DataNode列表之間建立一個Pipeline集晚。如下圖所示:

數(shù)據(jù)是以DFSPacket對象的形式封裝的窗悯,一個Block可能由很多個Packet組成,Packet的具體格式參照HDFS源碼中的DFSPacket.java的注釋偷拔。

Client發(fā)送Packet給Pipeline中的第一個DataNode A蒋院,A收到數(shù)據(jù)后轉(zhuǎn)發(fā)給B,B再轉(zhuǎn)發(fā)給C莲绰。 當(dāng)Pipeline中的最下游節(jié)點(diǎn)收到數(shù)據(jù)包后欺旧,會按照數(shù)據(jù)包傳送方向的反方向發(fā)送對數(shù)據(jù)包的Ack信息。這個Ack信息是一個數(shù)據(jù)包的序列號蛤签,在Client側(cè)是單調(diào)遞增的辞友。上圖是從單個數(shù)據(jù)包的視角出發(fā)。我們來從多個數(shù)據(jù)包的視角出發(fā)來看震肮,下圖引自雅虎實(shí)驗(yàn)室的HDFS論文:

OK称龙,從整體上看,整個寫pipeline的過程很容易理解戳晌,但這里面的代碼量非常多茵瀑,涉及到Client側(cè)、DataNode側(cè)的很多線程類躬厌。后文中,我們將逐一擊破竞帽。

二扛施、DFSOutputStream的獲取

首先來看看DFSClient中如何獲取寫數(shù)據(jù)的流:DFSOutputStream

  /**
   * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
   * Progressable, int, ChecksumOpt, InetSocketAddress[])} with the addition of
   * ecPolicyName that is used to specify a specific erasure coding policy
   * instead of inheriting any policy from this new file's parent directory.
   * This policy will be persisted in HDFS. A value of null means inheriting
   * parent groups' whatever policy.
   */
  public DFSOutputStream create(String src, FsPermission permission,
      EnumSet<CreateFlag> flag, boolean createParent, short replication,
      long blockSize, Progressable progress, int buffersize,
      ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes,
      String ecPolicyName) throws IOException {
    checkOpen();
    final FsPermission masked = applyUMask(permission);
    LOG.debug("{}: masked={}", src, masked);
    final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
        src, masked, flag, createParent, replication, blockSize, progress,
        dfsClientConf.createChecksum(checksumOpt),
        getFavoredNodesStr(favoredNodes), ecPolicyName);
    beginFileLease(result.getFileId(), result);
    return result;
  }

通過DFSOutputStream#newStreamForCreate方法獲取到一個DFSOutputStream流對象。參數(shù)比較多屹篓,關(guān)注一下this代表DFSClient對象疙渣,還有favoredNodes列表代表優(yōu)先選擇的DataNode。

在newStreamForCreate方法中主要做兩個工作:
①調(diào)用Namenode代理對象上的create方法堆巧,在NameNode文件系統(tǒng)中創(chuàng)建一個文件妄荔,并獲得此文件的元數(shù)據(jù)信息HdfsFileStatus對象。
②將①中的HdfsFileStatus對象傳入DFSOutputStream構(gòu)造方法中谍肤,構(gòu)造出一個DFSOutputStream流對象啦租,同時啟動DFSOutputStream對象中的DataStreamer線程。

  static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
      FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
      short replication, long blockSize, Progressable progress,
      DataChecksum checksum, String[] favoredNodes, String ecPolicyName)
      throws IOException {
    try (TraceScope ignored =
             dfsClient.newPathTraceScope("newStreamForCreate", src)) {
      HdfsFileStatus stat = null;

      // Retry the create if we get a RetryStartFileException up to a maximum
      // number of times
      boolean shouldRetry = true;
      int retryCount = CREATE_RETRY_COUNT;
      while (shouldRetry) {
        shouldRetry = false;
        try {
          stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
              new EnumSetWritable<>(flag), createParent, replication,
              blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
          break;
        } catch (RemoteException re) {
          IOException e = re.unwrapRemoteException(
              AccessControlException.class,
              DSQuotaExceededException.class,
              QuotaByStorageTypeExceededException.class,
              FileAlreadyExistsException.class,
              FileNotFoundException.class,
              ParentNotDirectoryException.class,
              NSQuotaExceededException.class,
              RetryStartFileException.class,
              SafeModeException.class,
              UnresolvedPathException.class,
              SnapshotAccessControlException.class,
              UnknownCryptoProtocolVersionException.class);
          if (e instanceof RetryStartFileException) {
            if (retryCount > 0) {
              shouldRetry = true;
              retryCount--;
            } else {
              throw new IOException("Too many retries because of encryption" +
                  " zone operations", e);
            }
          } else {
            throw e;
          }
        }
      }
      Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
      // 構(gòu)造一個DFSOutputStream對象
      final DFSOutputStream out;
      if(stat.getErasureCodingPolicy() != null) {
        out = new DFSStripedOutputStream(dfsClient, src, stat,
            flag, progress, checksum, favoredNodes);
      } else {
        out = new DFSOutputStream(dfsClient, src, stat,
            flag, progress, checksum, favoredNodes, true);
      }
      // 啟動DataStreamer線程
      out.start();
      return out;
    }
  }

那我們就一路跟蹤DFSOutputStream的構(gòu)造方法吧:

也是主要做了兩件事:
①設(shè)置一下DFSOutputStream的成員變量的值荒揣,比如對應(yīng)的fileId篷角、src、對應(yīng)的DFSClient對象系任、數(shù)據(jù)packet大小等等
②創(chuàng)建DataStreamer線程(外層去start這個線程)

/** Construct a new output stream for creating a file. */
  protected DFSOutputStream(DFSClient dfsClient, String src,
      HdfsFileStatus stat, EnumSet<CreateFlag> flag, Progressable progress,
      DataChecksum checksum, String[] favoredNodes, boolean createStreamer) {
    this(dfsClient, src, flag, progress, stat, checksum);
    this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);

    computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
        bytesPerChecksum);

    if (createStreamer) {
      streamer = new DataStreamer(stat, null, dfsClient, src, progress,
          checksum, cachingStrategy, byteArrayManager, favoredNodes,
          addBlockFlags);
    }
  }

OK恳蹲,到這里似乎斷線了虐块?不知道接下來改怎么看了?DataStreamer線程的run方法啊嘉蕾。為了讓行文邏輯更順暢贺奠,我進(jìn)行了段落的重排,下面的輸出流write數(shù)據(jù)這塊代碼是我DataStreamer源碼后回來寫的错忱。

DataStreamer線程從dataQueue中提取數(shù)據(jù)包儡率,并將數(shù)據(jù)包發(fā)送到將管道中的第一個datanode,然后把這個數(shù)據(jù)包從dataQueue移動到ackQueue航背。那我們不就得看看數(shù)據(jù)包是怎么添加到dataQueue中的喉悴。全局代碼搜索dataQueue.add,發(fā)現(xiàn)dataQueue.addLast是我們想要的方法玖媚。通過查看調(diào)用關(guān)系:

最后你會發(fā)現(xiàn):是在調(diào)用FSOutputSummer#write方法時箕肃,才會將數(shù)據(jù)封裝成DFSPacket對象入到dataQueue里等待DataStreamer去發(fā)送給DataNode的。

而DFSOutputStream類恰恰就是FSOutputSummer的子類今魔,且沒有重寫write方法勺像。也就是說我們最后的write方法是走FSOutputSummer的write方法,也就會把數(shù)據(jù)封裝成DFSPacket對象入隊到dataQueue中了错森。

notes
①我們在使用HDFS API的時候吟宦,雖然流對象是FSDataOutputStream類型的,但是本質(zhì)上底層的流還是DFSOutputStream的涩维,F(xiàn)SDataOutputStream這個流是個包裝流殃姓,是DFSClient#createWrappedOutputStream方法中把DFSOutputStream類包裝成了FSDataOutputStream類。
②構(gòu)造流的時候瓦阐,我們根據(jù)配置參數(shù)獲得到max packet的大小蜗侈,然后再write數(shù)據(jù)的時候,由于流是FSOutputSummer的子類睡蟋,會記錄當(dāng)前已經(jīng)寫了的字節(jié)數(shù)踏幻,如果達(dá)到配置設(shè)置的最大上限,那么就構(gòu)造DFSPacket戳杀,入隊到dataQueue中该面,等待DataStreamer線程進(jìn)行處理。

三、DataStream類

DataStreamer是Client寫數(shù)據(jù)的核心類。它本質(zhì)上是個Thread類闻镶。它的主要功能在JavaDoc中描述的很詳細(xì),這里摘錄一下:

/*********************************************************************
 *
 * The DataStreamer class is responsible for sending data packets to the
 * datanodes in the pipeline. It retrieves a new blockid and block locations
 * from the namenode, and starts streaming packets to the pipeline of
 * Datanodes. Every packet has a sequence number associated with
 * it. When all the packets for a block are sent out and acks for each
 * if them are received, the DataStreamer closes the current block.
 *
 * The DataStreamer thread picks up packets from the dataQueue, sends it to
 * the first datanode in the pipeline and moves it from the dataQueue to the
 * ackQueue. The ResponseProcessor receives acks from the datanodes. When an
 * successful ack for a packet is received from all datanodes, the
 * ResponseProcessor removes the corresponding packet from the ackQueue.
 *
 * In case of error, all outstanding packets are moved from ackQueue. A new
 * pipeline is setup by eliminating the bad datanode from the original
 * pipeline. The DataStreamer now starts sending packets from the dataQueue.
 *
 *********************************************************************/

翻譯一下蚕泽,什么tmd叫tmd驚喜:

DataStreamer類負(fù)責(zé)給在pipeline中的datanode發(fā)送數(shù)據(jù)包。它從namenode檢索一個新的blockid和block位置,并開始把數(shù)據(jù)包以流的形式發(fā)送到datanodes組成的pipeline中须妻。每個數(shù)據(jù)包都有一個與之相關(guān)聯(lián)的序列號仔蝌。當(dāng)一個塊的所有包都被發(fā)送出去并且每個包的ack信息也都被收到,則DataStreamer會關(guān)閉當(dāng)前塊荒吏。

DataStreamer線程從dataQueue中提取數(shù)據(jù)包敛惊,并將數(shù)據(jù)包發(fā)送到將管道中的第一個datanode,然后把這個數(shù)據(jù)包從dataQueue移動到ackQueue绰更。ResponseProcessor線程接收來自datanode的響應(yīng)瞧挤。當(dāng)一個數(shù)據(jù)包的成功的ack信息從所有datanode發(fā)送過來時,ResponseProcessor會從ackQueue中移除相應(yīng)的數(shù)據(jù)包儡湾。

在出現(xiàn)錯誤的情況下特恬,所有未完成的數(shù)據(jù)包將從ackQueue中被移除。接著在原來的出錯的pipeline中消除掉bad datanode的基礎(chǔ)上構(gòu)建一個新的pipeline徐钠。DataStreamer再繼續(xù)開始發(fā)送dataQueue中的數(shù)據(jù)包癌刽。

OK,看完JavaDoc知道了DataStreamer的作用之后尝丐,來看下流程圖和一些關(guān)鍵源碼显拜。

從整體上看,DataStream#run方法的執(zhí)行邏輯如下圖:

它是一個線程類爹袁,那我們就看它的run方法就好了远荠。run方法很長,大概有200行代碼左右失息,讀者可以不用全看譬淳,只需關(guān)注我在代碼中注釋中拋出的問題即可,后面我會針對每個問題進(jìn)行解答:

  /*
   * streamer thread is the only thread that opens streams to datanode,
   * and closes them. Any error recovery is also done by this thread.
   */
  @Override
  public void run() {
    long lastPacket = Time.monotonicNow();
    TraceScope scope = null;
    while (!streamerClosed && dfsClient.clientRunning) {
      // if the Responder encountered an error, shutdown Responder
      if (errorState.hasError()) {
        closeResponder();
      }

      DFSPacket one;
      try {
        // process datanode IO errors if any
        boolean doSleep = processDatanodeOrExternalError();

        final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
        synchronized (dataQueue) {
          // wait for a packet to be sent.
          long now = Time.monotonicNow();
          // ①halfSocketTimeout和timeout時間是怎么回事盹兢?
          while ((!shouldStop() && dataQueue.size() == 0 &&
              (stage != BlockConstructionStage.DATA_STREAMING ||
                  now - lastPacket < halfSocketTimeout)) || doSleep) {
            long timeout = halfSocketTimeout - (now-lastPacket);
            timeout = timeout <= 0 ? 1000 : timeout;
            timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
                timeout : 1000;
            try {
              dataQueue.wait(timeout);
            } catch (InterruptedException  e) {
              LOG.warn("Caught exception", e);
            }
            doSleep = false;
            now = Time.monotonicNow();
          }
          if (shouldStop()) {
            continue;
          }
          // get packet to be sent.
          if (dataQueue.isEmpty()) {
            one = createHeartbeatPacket();
          } else {
            try {
              backOffIfNecessary();
            } catch (InterruptedException e) {
              LOG.warn("Caught exception", e);
            }
            one = dataQueue.getFirst(); // regular data packet
            SpanId[] parents = one.getTraceParents();
            if (parents.length > 0) {
              scope = dfsClient.getTracer().
                  newScope("dataStreamer", parents[0]);
              scope.getSpan().setParents(parents);
            }
          }
        }

        // get new block from namenode.
        if (LOG.isDebugEnabled()) {
          LOG.debug("stage=" + stage + ", " + this);
        }
        // ②如何建立起Pipeline的瘦赫,Socket怎么創(chuàng)建的?
        if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
          LOG.debug("Allocating new block: {}", this);
          setPipeline(nextBlockOutputStream());
          initDataStreaming();
        } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
          LOG.debug("Append to block {}", block);
          setupPipelineForAppendOrRecovery();
          if (streamerClosed) {
            continue;
          }
          initDataStreaming();
        }

        long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
        if (lastByteOffsetInBlock > stat.getBlockSize()) {
          throw new IOException("BlockSize " + stat.getBlockSize() +
              " < lastByteOffsetInBlock, " + this + ", " + one);
        }

        if (one.isLastPacketInBlock()) {
          // wait for all data packets have been successfully acked
          synchronized (dataQueue) {
            while (!shouldStop() && ackQueue.size() != 0) {
              try {
                // wait for acks to arrive from datanodes
                dataQueue.wait(1000);
              } catch (InterruptedException  e) {
                LOG.warn("Caught exception", e);
              }
            }
          }
          if (shouldStop()) {
            continue;
          }
          stage = BlockConstructionStage.PIPELINE_CLOSE;
        }

        // send the packet
        SpanId spanId = SpanId.INVALID;
        synchronized (dataQueue) {
          // move packet from dataQueue to ackQueue
          if (!one.isHeartbeatPacket()) {
            if (scope != null) {
              spanId = scope.getSpanId();
              scope.detach();
              one.setTraceScope(scope);
            }
            scope = null;
            dataQueue.removeFirst();
            ackQueue.addLast(one);
            packetSendTime.put(one.getSeqno(), Time.monotonicNow());
            dataQueue.notifyAll();
          }
        }

        LOG.debug("{} sending {}", this, one);

        // write out data to remote datanode
        // 此處是寫蛤迎,調(diào)了flush刷流
        try (TraceScope ignored = dfsClient.getTracer().
            newScope("DataStreamer#writeTo", spanId)) {
          one.writeTo(blockStream);
          blockStream.flush();
        } catch (IOException e) {
          // HDFS-3398 treat primary DN is down since client is unable to
          // write to primary DN. If a failed or restarting node has already
          // been recorded by the responder, the following call will have no
          // effect. Pipeline recovery can handle only one node error at a
          // time. If the primary node fails again during the recovery, it
          // will be taken out then.
          errorState.markFirstNodeIfNotMarked();
          throw e;
        }
        lastPacket = Time.monotonicNow();

        // update bytesSent
        long tmpBytesSent = one.getLastByteOffsetBlock();
        if (bytesSent < tmpBytesSent) {
          bytesSent = tmpBytesSent;
        }

        if (shouldStop()) {
          continue;
        }
         // ③如果是block的最后一個packet,則等待ackQueue中的Packet都被移除(代表接收到了DataNode的響應(yīng))
        // Is this block full?
        if (one.isLastPacketInBlock()) {
          // wait for the close packet has been acked
          synchronized (dataQueue) {
            while (!shouldStop() && ackQueue.size() != 0) {
              dataQueue.wait(1000);// wait for acks to arrive from datanodes
            }
          }
          if (shouldStop()) {
            continue;
          }

          endBlock();
        }
        if (progress != null) { progress.progress(); }

        // This is used by unit test to trigger race conditions.
        if (artificialSlowdown != 0 && dfsClient.clientRunning) {
          Thread.sleep(artificialSlowdown);
        }
      } catch (Throwable e) {
        // Log warning if there was a real error.
        if (!errorState.isRestartingNode()) {
          // Since their messages are descriptive enough, do not always
          // log a verbose stack-trace WARN for quota exceptions.
          if (e instanceof QuotaExceededException) {
            LOG.debug("DataStreamer Quota Exception", e);
          } else {
            LOG.warn("DataStreamer Exception", e);
          }
        }
        lastException.set(e);
        assert !(e instanceof NullPointerException);
        errorState.setInternalError();
        if (!errorState.isNodeMarked()) {
          // Not a datanode issue
          streamerClosed = true;
        }
      } finally {
        if (scope != null) {
          scope.close();
          scope = null;
        }
      }
    }
    closeInternal();
  }

OK含友,上面我們拋出了三個問題:
① wait的參數(shù)timeout時間是怎么確定的替裆?

為什么需要wait呢?看while的條件:

while ((!shouldStop() && dataQueue.size() == 0 &&
              (stage != BlockConstructionStage.DATA_STREAMING ||
                  now - lastPacket < halfSocketTimeout)) || doSleep) 

dataQueue的size是0窘问,證明沒有數(shù)據(jù)需要發(fā)送辆童,因此不需要執(zhí)行后面的發(fā)送邏輯,所以線程可以wait進(jìn)入等待狀態(tài)惠赫。至于為什么要用halfSocketTimeout這個值把鉴,我覺得單純是HDFS這塊開發(fā)者做的一個trade-off,你也可以減小這個值,這樣無非就是多發(fā)送幾個heartbeat Packet而已庭砍。(而且场晶,最新的HDFS社區(qū)代碼這里已經(jīng)移除了wait halfSocketTimeout的邏輯)。

做個簡單的數(shù)學(xué)計算
long timeout = halfSocketTimeout - (now-lastPacket);
timeout = timeout <= 0 ? 1000 : timeout;
那如果timeout>0實(shí)際上結(jié)束wait的時間是:
now + timeout = now + halfSocketTimeout - now + lastPacket = lastPacket + halfSocketTimeout;
也即保證在上一個Packet包發(fā)送后怠缸,在wait至少halfSocketTimeout時長诗轻。

②如何構(gòu)建的DN網(wǎng)絡(luò)連接,Pipeline

這塊是重中之重揭北,以create文件為例:

Pipeline建立過程.png

這里有個細(xì)節(jié)扳炬,需要了解一下。就是假設(shè)數(shù)據(jù)塊存放的DataNode列表是:【A搔体、B恨樟、C】三臺。 Client首先和A節(jié)點(diǎn)建立Socket連接疚俱,然后A節(jié)點(diǎn)和B節(jié)點(diǎn)再建立Socket連接劝术,B節(jié)點(diǎn)再和C節(jié)點(diǎn)建立Socket連接。這里實(shí)現(xiàn)的方法如下:

在構(gòu)造參數(shù)的序列化對象時计螺,涉及到生成發(fā)送到下一步的目標(biāo)節(jié)點(diǎn)列表時夯尽,會調(diào)用PBHelperClient.convert方法,并傳入原始列表和startIndex=1登馒〕孜眨看這個convert方法的實(shí)現(xiàn),startIndex = 1的意思就是每次都從列表的第二個元素開始構(gòu)造新的target列表陈轿。舉個例子說明:開始是target DataNode列表是:【A圈纺、B、C】麦射,客戶端和A建立Socket連接時蛾娶,發(fā)送的target DataNode列表就是:【B、C】了潜秋,這樣A就可以和B建立連接蛔琅,以此類推 B -> C。

OK峻呛,到這里我們就知道Pipeline的整個構(gòu)造過程了罗售。

③ pipeline下游的DataNode怎么給上游發(fā)送ack的,以及ackQueue這個數(shù)據(jù)結(jié)構(gòu)相關(guān)的操作钩述。 這塊放到第三章講寨躁。

三、BlockReceiver & PacketResponder

上面提到過牙勘,DataStream線程從dataQueue隊列中取出待發(fā)送的DFSPacket對象時职恳,會把packet加入到ackQueue中,表示此Packet需要等待pipeline中的DataNode都返回ACK信息。那DataNode是如果給上游的DataNode以及Client返回Ack信息的呢放钦?下面我們就來看看色徘。

在DataStreamer中不是調(diào)用了new Sender(xxx).writeBlock方法么?這個東西會被DataNode側(cè)的DataXceiver類中的writeBlock方法處理最筒。writeBlock方法中又會委托給BlockReceiver#receiveBlock方法贺氓。receiveBlock方法中啟動了PacketResponder線程用來對接收到的packet進(jìn)行響應(yīng)。

觀察一下receiveBlock方法的參數(shù):

這三個流分別代表給下游datanode發(fā)送數(shù)據(jù)的輸出流床蜘、接收下游datanode數(shù)據(jù)的輸入流辙培、以及回復(fù)上游數(shù)據(jù)的輸出流⌒暇猓看一下PacketResponder的構(gòu)造函數(shù)扬蕊,將其中的兩個流傳入作為參數(shù)了:

type表示此responder是pipeline中的最后一個節(jié)點(diǎn)還是中間節(jié)點(diǎn)。

回到receiveBlock里丹擎,會有這樣一行代碼:

while (receivePacket() >= 0) { /* Receive until the last packet */ }

這是一個空循環(huán)尾抑,只要還有數(shù)據(jù)過來,那我就一直調(diào)用receivePacket方法蒂培。這個receivePacket方法返回的是接收的數(shù)據(jù)字節(jié)數(shù)再愈。里面會從輸入流中讀取Packet的各種信息,然后入隊护戳,等待packet線程后臺去處理翎冲,如下圖:

那接下來就是看PacketResponeder怎么給上游的DataNode或者Client返回響應(yīng)的吧。其實(shí)是在PacketResponeder線程的run方法中最終調(diào)用了sendAckUpstreamUnprotected方法給上游發(fā)送Ack媳荒,如下圖所示:

OK抗悍,到這里我們也就知道了DataNode是如何回復(fù)Ack信息的了。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末钳枕,一起剝皮案震驚了整個濱河市缴渊,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌鱼炒,老刑警劉巖衔沼,帶你破解...
    沈念sama閱讀 221,198評論 6 514
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異昔瞧,居然都是意外死亡指蚁,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,334評論 3 398
  • 文/潘曉璐 我一進(jìn)店門硬爆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人擎鸠,你說我怎么就攤上這事缀磕。” “怎么了?”我有些...
    開封第一講書人閱讀 167,643評論 0 360
  • 文/不壞的土叔 我叫張陵袜蚕,是天一觀的道長糟把。 經(jīng)常有香客問我,道長牲剃,這世上最難降的妖魔是什么遣疯? 我笑而不...
    開封第一講書人閱讀 59,495評論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮凿傅,結(jié)果婚禮上缠犀,老公的妹妹穿的比我還像新娘。我一直安慰自己聪舒,他們只是感情好辨液,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,502評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著箱残,像睡著了一般滔迈。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上被辑,一...
    開封第一講書人閱讀 52,156評論 1 308
  • 那天燎悍,我揣著相機(jī)與錄音,去河邊找鬼盼理。 笑死谈山,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的榜揖。 我是一名探鬼主播勾哩,決...
    沈念sama閱讀 40,743評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼举哟!你這毒婦竟也來了思劳?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,659評論 0 276
  • 序言:老撾萬榮一對情侶失蹤妨猩,失蹤者是張志新(化名)和其女友劉穎潜叛,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體壶硅,經(jīng)...
    沈念sama閱讀 46,200評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡威兜,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,282評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了庐椒。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片椒舵。...
    茶點(diǎn)故事閱讀 40,424評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖约谈,靈堂內(nèi)的尸體忽然破棺而出笔宿,到底是詐尸還是另有隱情犁钟,我是刑警寧澤,帶...
    沈念sama閱讀 36,107評論 5 349
  • 正文 年R本政府宣布泼橘,位于F島的核電站涝动,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏炬灭。R本人自食惡果不足惜醋粟,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,789評論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望重归。 院中可真熱鬧米愿,春花似錦、人聲如沸提前。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,264評論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽狈网。三九已至宙搬,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間拓哺,已是汗流浹背勇垛。 一陣腳步聲響...
    開封第一講書人閱讀 33,390評論 1 271
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留士鸥,地道東北人闲孤。 一個月前我還...
    沈念sama閱讀 48,798評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像烤礁,于是被迫代替她去往敵國和親讼积。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,435評論 2 359

推薦閱讀更多精彩內(nèi)容