Hadoop 源碼學(xué)習(xí)筆記(3)--Hdfs的RPC通信框架

前言

單個(gè) Hdfs 集群中可能存在成百上千個(gè) DataNode ,但默認(rèn)情況下 NameNode 只有一個(gè) , 各個(gè)節(jié)點(diǎn)不斷的進(jìn)行內(nèi)部通信霉翔,如果不能快速的處理掉通信消息世舰,可能會(huì)導(dǎo)致掉節(jié)點(diǎn)续搀,或者數(shù)據(jù)傳輸緩慢等問(wèn)題端衰。因此Hdfs內(nèi)部集群對(duì)內(nèi)部RPC通信具有較高的性能要求。

本文會(huì)對(duì) Hdfs 集群的RPC通信框架進(jìn)行分析凫海,看看它是如何保證節(jié)點(diǎn)通信的效率呛凶。

Protobuf 簡(jiǎn)介

在 Hdfs 中,為了提升內(nèi)部通信的傳輸效率行贪,整個(gè)RPC通信框架使用 Google 的 Protobuf 序列化框架進(jìn)行數(shù)據(jù)傳輸漾稀。為了方便后續(xù)理解模闲,這里先對(duì) Protobuf 進(jìn)行簡(jiǎn)單介紹。

數(shù)據(jù)傳輸

Protobuf 首先是一個(gè)跨語(yǔ)言的數(shù)據(jù)傳輸框架崭捍。把它和 XML 和 JSON進(jìn)行對(duì)比可以看出:

語(yǔ)言 特點(diǎn) 可讀性 數(shù)據(jù)Size 解析效率
Protobuf 將數(shù)據(jù)內(nèi)容解析成純字節(jié)形式傳輸 數(shù)據(jù)以字節(jié)形式存在尸折,不具備可讀性 占用數(shù)據(jù)量少 直接讀取數(shù)據(jù)內(nèi)容,效率高
XML和JSON 引入額外文本構(gòu)造出格式化數(shù)據(jù) 額外文本使得數(shù)據(jù)具備良好的可讀性 數(shù)據(jù)以字符形式存在殷蛇,且額外文本占用大量空間 需要解析剔除額外數(shù)據(jù)实夹,效率低

XML和Json都是將數(shù)據(jù)封裝成一個(gè)格式化文本,因此在必要的傳輸數(shù)據(jù)之外粒梦,還有大量的額外文本進(jìn)行狀態(tài)描述亮航。而 Protobuf 通過(guò)將數(shù)據(jù)字段序列化成為一串不可讀的字節(jié)碼,同XML和Json相比匀们,對(duì)于同樣的數(shù)據(jù)缴淋,它所需要傳輸?shù)臄?shù)據(jù)量更小,解析的速度更快泄朴。

Protobuf 也是一門(mén)天生的跨語(yǔ)言數(shù)據(jù)傳輸框架重抖。 對(duì)于不同的語(yǔ)言,都用同一個(gè) .proto 的文件進(jìn)行數(shù)據(jù)描述叼旋,如下:

message Person {
  string name = 1;
  int32 id = 2;  // Unique ID number for this person.
  string email = 3;
}

代碼中的 Person 數(shù)據(jù),可以通過(guò) Google 或者三方的 protobuf 處理工具沦辙,被轉(zhuǎn)化為特定編程語(yǔ)言下的數(shù)據(jù)對(duì)象夫植。

例如,在Java代碼中油讯,通過(guò) .proto 文件生成一個(gè) AddressBook 數(shù)據(jù)類(lèi)详民,那么生成的 Java 文件中會(huì)自帶 mergeFromwriteTo 方法如下:

// 從輸入流中反序列化數(shù)據(jù)
AddressBook.Builder addressBook = AddressBook.newBuilder();
addressBook.mergeFrom(new FileInputStream(args[0]));

// 序列化數(shù)據(jù)到輸入流
FileOutputStream output = new FileOutputStream(args[0]);
addressBook.build().writeTo(output);

通過(guò)Protobuf內(nèi)部的IO邏輯,我們可以將指定的數(shù)據(jù)轉(zhuǎn)化為少量的字節(jié)碼進(jìn)行傳輸陌兑,從而提升整體的傳輸效率沈跨。

對(duì)于任意語(yǔ)言,只要以同樣的方式記錄和讀取同一份字節(jié)碼數(shù)據(jù)就可以得到同樣的數(shù)據(jù)對(duì)象兔综,從而保證序列化數(shù)據(jù)的可還原性饿凛。同時(shí),在數(shù)據(jù)的序列化過(guò)程中软驰,由于沒(méi)有額外文本的參與涧窒,也不需要保持?jǐn)?shù)據(jù)在傳輸過(guò)程中的可讀性,因此對(duì)于同一個(gè)數(shù)據(jù)锭亏,Protobuf擁有比XML和Json更小的數(shù)據(jù)量和更快的解析速度纠吴。

RPC 調(diào)用

Protobuf 除了實(shí)現(xiàn)數(shù)據(jù)的傳輸作用以外,還實(shí)現(xiàn)了一套R(shí)PC遠(yuǎn)程調(diào)用框架慧瘤。

定義一個(gè) .proto 文件如下

option java_generic_services = true;

service ReconfigurationProtocolService {
    rpc getReconfigurationStatus(GetReconfigurationStatusRequestProto)
      returns(GetReconfigurationStatusResponseProto);
}

使用Protobuf編譯工具進(jìn)行處理之后戴已,可以得到一個(gè) ReconfigurationProtocolService 接口固该,例如上方代碼對(duì)應(yīng)的接口中會(huì)有一個(gè)叫做 getReconfigurationStatus,參數(shù)類(lèi)型為GetReconfigurationStatusRequestProto, 返回值為GetReconfigurationStatusResponseProto 的方法。

// 構(gòu)造BlockingService
ReconfigurationProtocolServerSideTranslatorPB reconfigurationProtocolXlator
        = new ReconfigurationProtocolServerSideTranslatorPB(this);
BlockingService reconfigurationPbService = ReconfigurationProtocolService
        .newReflectiveBlockingService(reconfigurationProtocolXlator);

// 調(diào)用BlockingService
service.callBlockingMethod(methodDescriptor, null, param);

在 Java 文件中糖儡,通過(guò)動(dòng)態(tài)代理得到一個(gè)BlockingService對(duì)象伐坏,內(nèi)部包裹一個(gè)實(shí)現(xiàn)了 ReconfigurationProtocolService.BlockingInterface 接口的對(duì)象。

當(dāng)需要使用RPC服務(wù)時(shí)休玩,系統(tǒng)通過(guò)傳輸需要調(diào)用的方法名和相關(guān)的調(diào)用參數(shù)著淆,使用 BlockingService::callBlockingMethod,就可以在Server端解析調(diào)用邏輯,實(shí)現(xiàn)RPC遠(yuǎn)程調(diào)用拴疤。

RPC通信的邏輯實(shí)現(xiàn)

總覽

言歸正傳永部,我們回到 Hdfs 的內(nèi)部通信機(jī)制本身。

RPC操作

如上圖中呐矾,ProxyImpl是對(duì)同一個(gè)RPC調(diào)用接口的實(shí)現(xiàn)類(lèi)苔埋,當(dāng)Proxy中的接口被調(diào)用時(shí),通過(guò)Client發(fā)送消息到 Server 蜒犯,Server 會(huì)按照標(biāo)準(zhǔn)數(shù)據(jù)格式進(jìn)行解析组橄,再調(diào)用Server側(cè)的 Impl方法進(jìn)行執(zhí)行,并返回結(jié)果數(shù)據(jù)罚随。Client 發(fā)送消息到 Server 的過(guò)程對(duì)于接口訪問(wèn)而言是透明的玉工,對(duì)于使用者來(lái)說(shuō),他在本地執(zhí)行 Proxy 的接口,會(huì)得到具有相同接口的 Impl 的調(diào)用結(jié)果摹量。

不同的RPC框架的具體實(shí)現(xiàn)邏輯不盡相同瘾婿,在Hdfs中,RPC.Server類(lèi)扮演RPC框架中的 Server 角色狭郑,處理響應(yīng)內(nèi)部通信請(qǐng)求; Client 類(lèi)扮演RPC框架中的 Client 角色汇在,負(fù)責(zé)調(diào)用消息的發(fā)送和結(jié)果數(shù)據(jù)接收翰萨。

接下來(lái)會(huì)針對(duì) Server 和 Client 的進(jìn)行代碼邏輯的走讀。

Server

RPC.Server的源碼路徑是 $src/hadooop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java糕殉。

RPC 類(lèi)中有一個(gè) Builder 類(lèi)負(fù)責(zé)構(gòu)造 RPC.Server亩鬼,在構(gòu)造方法中我們看到:

public Server build() throws IOException, HadoopIllegalArgumentException {
    return getProtocolEngine(this.protocol, this.conf).getServer(
          this.protocol, this.instance, this.bindAddress, this.port,
          this.numHandlers, this.numReaders, this.queueSizePerHandler,
          this.verbose, this.conf, this.secretManager, this.portRangeConfig);
}

默認(rèn)情況下,通過(guò) getProtocolEngine 都是得到一個(gè) ProtobufRpcEngine 對(duì)象阿蝶,再通過(guò)ProtobufRpcEngine::getServer構(gòu)造出 ProtobufRpcEngine.Server 對(duì)象辛孵。

ProtobufRpcEngine.ServerServer 的子類(lèi),整個(gè)內(nèi)部通信機(jī)制在 Server 類(lèi)中就已經(jīng)實(shí)現(xiàn)了赡磅,下面是 Server 中的數(shù)據(jù)處理流程魄缚。

Server通信邏輯

Server類(lèi)中使用了四種類(lèi)型的線程類(lèi),分別是Listener,Reader,HandlerResponder。如上圖所示冶匹,為了方便表示各個(gè)線程間的通信邏輯习劫,使用泳道代表著對(duì)應(yīng)類(lèi)型的線程類(lèi)操作時(shí)鎖使用的關(guān)鍵方法。

Listener

Listener 作為單線程任務(wù)負(fù)責(zé)監(jiān)聽(tīng)指定端口的socketACCEPT 請(qǐng)求嚼隘,當(dāng)新的 socket鏈接到來(lái)時(shí)诽里,將其封裝成一個(gè) Connection 對(duì)象,通過(guò)addConnection添加Reader的處理隊(duì)列中飞蛹。

Server 中只有一個(gè) Listener 線程負(fù)責(zé)接收新的socket請(qǐng)求谤狡,但有多個(gè) Reader 線程,在Listener::doAccept 中會(huì)根據(jù)以下代碼盡可能將 Connection 平均分配到各個(gè) Reader中卧檐,讓多個(gè)線程可以同時(shí)讀取不同的 socket 數(shù)據(jù)墓懂,從而避免Listener單線程引起的性能瓶頸。

Reader getReader() {
      currentReader = (currentReader + 1) % readers.length;
      return readers[currentReader];
}

Reader

Reader負(fù)責(zé)內(nèi)部通信數(shù)據(jù)的解析工作霉囚,它不斷嘗試從Connection所包裝的socket對(duì)象中讀取數(shù)據(jù)捕仔。當(dāng)發(fā)現(xiàn)某個(gè) socket 可讀時(shí),通過(guò) readAndProcess-> processOneRpc 處理到來(lái)的消息盈罐。

private void processOneRpc(ByteBuffer bb) throws IOException, WrappedRpcServerException, InterruptedException {
    final RpcWritable.Buffer buffer = RpcWritable.Buffer.wrap(bb);
    final RpcRequestHeaderProto header = getMessage(RpcRequestHeaderProto.getDefaultInstance(), buffer);
    callId = header.getCallId();
    if (callId < 0) { 
        processRpcOutOfBandRequest(header, buffer);
    } else if(!connectionContextRead) {
        throw new WrappedRpcServerException();
    } else {
        processRpcRequest(header, buffer);
    }
}

從上面的代碼可以看出榜跌,每次從 socket 請(qǐng)求傳來(lái)的數(shù)據(jù)請(qǐng)求都必然帶著一個(gè) RpcRequestHeaderProto 對(duì)象,這個(gè)對(duì)象中封裝著后續(xù)參數(shù)的相關(guān)信息盅粪,就像 Http 協(xié)議中的頭信息钓葫。

當(dāng) socket 初次建立鏈接時(shí),需要通過(guò) procesRpcOutOfBandRequest 進(jìn)行鏈接初始化票顾,初始化時(shí)的 callId < 0础浮。初始化完成之后,后續(xù)請(qǐng)求通過(guò) processRpcRequest 進(jìn)行消費(fèi)。

private void processRpcRequest(RpcRequestHeaderProto header,
    RpcWritable.Buffer buffer) throws WrappedRpcServerException,
    InterruptedException {
    Class<? extends Writable> rpcRequestClass =  getRpcRequestWrapper(header.getRpcKind());
    Writable rpcRequest;
    rpcRequest = buffer.newInstance(rpcRequestClass, conf);
    RpcCall call = new RpcCall(this, header.getCallId(),
          header.getRetryCount(), rpcRequest,
          ProtoUtil.convert(header.getRpcKind()),
          header.getClientId().toByteArray(), traceScope, callerContext);
    queueCall(call);
}

這里根據(jù)RpcRequestHeaderProto中包含的body類(lèi)型解析出對(duì)應(yīng)的數(shù)據(jù)類(lèi)库物,將其封裝成一個(gè) RpcCall 對(duì)象霸旗,放入 Handler 的消費(fèi)隊(duì)列中贷帮。

Handler

Handler 線程負(fù)責(zé)具體指令的執(zhí)行工作戚揭。

final Call call = callQueue.take(); // pop the queue; maybe blocked here
CurCall.set(call);
// always update the current call context
CallerContext.setCurrent(call.callerContext);
UserGroupInformation remoteUser = call.getRemoteUser();
if (remoteUser != null) {
    remoteUser.doAs(call);
} else {
    call.run();
}

Handler 的循環(huán)隊(duì)列中,不斷從 callQueue 中獲取需要消費(fèi)的任務(wù)信息撵枢,然后通過(guò) call.run() 進(jìn)行任務(wù)執(zhí)行民晒。

@Override
public Void run() throws Exception {
    Writable value = null;
    ResponseParams responseParams = new ResponseParams();
    value = call(rpcKind, connection.protocolName, rpcRequest, timestamp);
    if (!isResponseDeferred()) {
        setupResponse(this, responseParams.returnStatus, responseParams.detailedErr, value, responseParams.errorClass, responseParams.error);
        sendResponse();
    }
}    

RpcCall::run 中我們看到,系統(tǒng)實(shí)際上是通過(guò)Server::call方法執(zhí)行的锄禽,這個(gè)方法在 RPC.Server 中被實(shí)現(xiàn)潜必。

static { // Register the rpcRequest deserializer for ProtobufRpcEngine
    org.apache.hadoop.ipc.Server.registerProtocolEngine(
        RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcProtobufRequest.class,
        new Server.ProtoBufRpcInvoker());
}

@Override
public Writable call(RPC.RpcKind rpcKind, String protocol,
        Writable rpcRequest, long receiveTime) throws Exception {
    return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
          receiveTime);
}

// Server.ProtoBufRpcInvoker
public Writable call(RPC.Server server, String connectionProtocolName,
          Writable writableRequest, long receiveTime) throws Exception {
    RpcProtobufRequest request = (RpcProtobufRequest) writableRequest;
    RequestHeaderProto rpcRequest = request.getRequestHeader();
    String methodName = rpcRequest.getMethodName();
    
    String declaringClassProtoName = 
            rpcRequest.getDeclaringClassProtocolName();
    long clientVersion = rpcRequest.getClientProtocolVersion();
    
    ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, 
                              declaringClassProtoName, clientVersion);
    BlockingService service = (BlockingService) protocolImpl.protocolImpl;
    result = service.callBlockingMethod(methodDescriptor, null, param);
}

從源碼中可以看到,RPC.Server::call經(jīng)過(guò)層層路徑沃但,最終在Server.ProtoBufRpcInvoker 根據(jù)傳入的數(shù)據(jù)找到對(duì)應(yīng)的BlockingService,利用 Protobuf (這里沒(méi)有使用Protobuf內(nèi)置的RpcChannel,而是自己手動(dòng)調(diào)用BlockingService::callBlockingMethod)實(shí)現(xiàn)方法的調(diào)用磁滚。

Responder

Reponder 線程的 while 循環(huán)中,我們看到當(dāng)socket可寫(xiě)時(shí),會(huì)嘗試調(diào)用 doAsyncWrite->processResponse 進(jìn)行寫(xiě)入操作

private boolean processResponse(LinkedList<RpcCall> responseQueue,
                                    boolean inHandler) throws IOException {
    call = responseQueue.removeFirst();
    SocketChannel channel = call.connection.channel;
    int numBytes = channelWrite(channel, call.rpcResponse);
    if (numBytes < 0) {
        return true;
    }
    if (!call.rpcResponse.hasRemaining()) {
        ...
    } else {
        call.connection.responseQueue.addFirst(call);
    }
    return done;
}

private int channelWrite(WritableByteChannel channel, 
                           ByteBuffer buffer) throws IOException {
    int count =  (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
                 channel.write(buffer) : channelIO(null, channel, buffer);
    if (count > 0) {
        rpcMetrics.incrSentBytes(count);
    }
    return count;
}

Responder會(huì)將得到的 response 寫(xiě)入socket 的輸出流中垂攘,返回給Client维雇。

Client

Client 的源碼路徑是 $src/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
      SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
      AtomicBoolean fallbackToSimpleAuth) throws IOException {

    final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
        rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth);
    return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
        protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
}

Client 端通過(guò) ProtobufRpcEngine::getProxy 構(gòu)建出一個(gè)動(dòng)態(tài)代理的接口對(duì)象晒他。當(dāng) Client 訪問(wèn)接口時(shí)吱型,通過(guò) Invoker 類(lèi)通知 Client 發(fā)送請(qǐng)求給 Server。

public Message invoke(Object proxy, final Method method, Object[] args) throws ServiceException {
    RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
    final Message theRequest = (Message) args[1];
    final RpcWritable.Buffer val;
    val = (RpcWritable.Buffer) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
            new RpcProtobufRequest(rpcRequestHeader, theRequest), remoteId,
            fallbackToSimpleAuth);
    return getReturnMessage(method, val);
}

Invoker 會(huì)根據(jù)訪問(wèn)接口的簽名信息構(gòu)造出一個(gè) RequestHeaderProto 對(duì)象陨仅,在上一小節(jié)中津滞,我們看到當(dāng) Server 接收到 socket 信息時(shí),會(huì)先讀取這個(gè) RequestHeaderProto灼伤,了解當(dāng)前調(diào)用的方法名稱(chēng)触徐,然后進(jìn)行后續(xù)分發(fā)。

RequestHeaderProto 對(duì)象隨著 Message 對(duì)象一起被封裝成一個(gè) Call 對(duì)象傳遞給 Client 進(jìn)行發(fā)送饺蔑,每一個(gè) Call 對(duì)象會(huì)有一個(gè)唯一的 callId锌介, 便于在接收到返回信息中,返回給指定的 Call猾警。

Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
      ConnectionId remoteId, int serviceClass,
      AtomicBoolean fallbackToSimpleAuth) throws IOException {
  final Connection connection = getConnection(remoteId, call, serviceClass,
  fallbackToSimpleAuth);
  connection.sendRpcRequest(call);
}

private Connection getConnection(ConnectionId remoteId,
      Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
      throws IOException {
  connection = connections.get(remoteId);
  Connection existing = connections.putIfAbsent(remoteId, connection);
  if (connection == null) {
        connection = new Connection(remoteId, serviceClass);
  }
  connection.setupIOstreams(fallbackToSimpleAuth);
  return connection;
}

Client 有一個(gè) connectionsConnection 隊(duì)列負(fù)責(zé)同各個(gè)節(jié)點(diǎn)的NameNode 進(jìn)行通信孔祸,首次構(gòu)造 Connection 對(duì)象后,通過(guò) setupIOstreams初始化鏈接信息发皿,同時(shí)發(fā)送相關(guān)的設(shè)置信息到 Server::processRpcOutOfBandRequest 中進(jìn)行Server側(cè)的初始化崔慧。

當(dāng)有一個(gè)可用的Connection 后,通過(guò) connection::sendRpcRequest將請(qǐng)求發(fā)送給對(duì)應(yīng)的Server穴墅。

同時(shí)Connection 也是一個(gè)線程類(lèi)惶室,在 setupIOstreams 的時(shí)候會(huì)啟動(dòng)接收線程。接收線程在收到消息之后玄货,根據(jù)消息中的唯一callId將返回?cái)?shù)據(jù)返回給指定的 Call 對(duì)象皇钞,完成整個(gè) Client 的通信流程。

NameNode 和 DataNode的心跳邏輯

接下來(lái)松捉,以 NameNodeDataNode的心跳發(fā)送機(jī)制為例夹界,舉例說(shuō)明內(nèi)部通信的流程。

在 Hdfs 中隘世,心跳是單向的可柿,總是由DataNode主動(dòng)上報(bào)當(dāng)前狀態(tài)到NameNode中,因此對(duì)于心跳而言丙者,NameNode是Server,DataNode是Client复斥。

DataNode

在前一篇文章中,我介紹了DataNode 在啟動(dòng)的時(shí)候械媒,會(huì)構(gòu)造一個(gè) BlockPoolManager 對(duì)象目锭,在 BlockPoolManager 中有一個(gè) BPOfferService的集合對(duì)象。

BPOfferService(List<InetSocketAddress> nnAddrs, List<InetSocketAddress> lifelineNnAddrs, DataNode dn) {
    for (int i = 0; i < nnAddrs.size(); ++i) {
        this.bpServices.add(new BPServiceActor(nnAddrs.get(i),
          lifelineNnAddrs.get(i), this));
    }
}

void start() {
    for (BPServiceActor actor : bpServices) {
        actor.start();
    }
}

每一個(gè)BPOfferService對(duì)應(yīng)著一個(gè) NameService , 對(duì)于 NameService 的每一個(gè) NameNode 節(jié)點(diǎn),會(huì)對(duì)應(yīng) BPServiceActor 的Runnable類(lèi)痢虹。在啟動(dòng)BPOfferService的時(shí)候键俱,其實(shí)就是啟動(dòng)每一個(gè)BPServiceActor類(lèi)。

void start() {
    bpThread = new Thread(this, formatThreadName("heartbeating", nnAddr));
    bpThread.start();
}

@Override
public void run() {
    connectToNNAndHandshake();
    while (shouldRun()) {
        offerService();
    }
}

private void offerService() throws Exception {
    while (shouldRun()) {
        final long startTime = scheduler.monotonicNow();
         final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
         HeartbeatResponse resp = null;
         if (sendHeartbeat) {
            resp = sendHeartBeat(requestBlockReportLease);
         }
         ....
    }
}

BPServiceActor類(lèi)本身是一個(gè)Runnable的實(shí)現(xiàn)類(lèi)世分,在線程循環(huán)中编振,先鏈接到NameNode ,再在 while 循環(huán)中不斷offerService臭埋。

offerService中踪央,通過(guò) sendHeartBeat 進(jìn)行周期性的心跳發(fā)送。

private void connectToNNAndHandshake() throws IOException {
    // get NN proxy
    bpNamenode = dn.connectToNN(nnAddr);

    // First phase of the handshake with NN - get the namespace
    // info.
    NamespaceInfo nsInfo = retrieveNamespaceInfo();

    // Verify that this matches the other NN in this HA pair.
    // This also initializes our block pool in the DN if we are
    // the first NN connection for this BP.
    bpos.verifyAndSetNamespaceInfo(this, nsInfo);
    
    // Second phase of the handshake with the NN.
    register(nsInfo);
}

HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
      throws IOException {
    scheduler.scheduleNextHeartbeat();
    scheduler.updateLastHeartbeatTime(monotonicNow());
    return bpNamenode.sendHeartbeat(bpRegistration,
        reports,
        dn.getFSDataset().getCacheCapacity(),
        dn.getFSDataset().getCacheUsed(),
        dn.getXmitsInProgress(),
        dn.getXceiverCount(),
        numFailedVolumes,
        volumeFailureSummary,
        requestBlockReportLease);
}

// DatanodeProtocolClientSideTranslatorPB.java
@Override
public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
      StorageReport[] reports, long cacheCapacity, long cacheUsed,
      int xmitsInProgress, int xceiverCount, int failedVolumes,
      VolumeFailureSummary volumeFailureSummary,
      boolean requestFullBlockReportLease) throws IOException {
  HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
        .setRegistration(PBHelper.convert(registration))
        .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
        .setFailedVolumes(failedVolumes)
        .setRequestFullBlockReportLease(requestFullBlockReportLease);
  resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build());
  return new HeartbeatResponse(cmds, PBHelper.convert(resp.getHaStatus()),
        rollingUpdateStatus, resp.getFullBlockReportLeaseId());
}

connectToNNAndHandshake中瓢阴,通過(guò)ProtobufRpcEngine::getProxy 獲得一個(gè)bpNamenode 的RPC代理類(lèi)畅蹂,調(diào)用 bpNamenode.sendHeartbeat時(shí),通過(guò)動(dòng)態(tài)代理將消息通過(guò) Client 發(fā)送出去荣恐。

NameNode

DataNode發(fā)送了心跳之后液斜,對(duì)應(yīng)的NameNode會(huì)接收到一條對(duì)應(yīng)的請(qǐng)求信息。

通過(guò)走讀代碼叠穆,我們找到了同樣實(shí)現(xiàn) DatanodeProtocolService 接口的是DatanodeProtocolServerSideTranslatorPB 類(lèi)少漆。

public HeartbeatResponseProto sendHeartbeat(RpcController controller,
      HeartbeatRequestProto request) throws ServiceException {
  return namesystem.handleHeartbeat(nodeReg, report,
        dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
        failedVolumes, volumeFailureSummary, requestFullBlockReportLease);
}

DatanodeProtocolServerSideTranslatorPB::sendHeartbeat 中通過(guò)事件分發(fā)將心跳事件交給 FSNamesystem 進(jìn)行消費(fèi),從而完成了 DataNodeNameNode 的心跳事件硼被。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末示损,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子嚷硫,更是在濱河造成了極大的恐慌检访,老刑警劉巖,帶你破解...
    沈念sama閱讀 207,248評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件仔掸,死亡現(xiàn)場(chǎng)離奇詭異脆贵,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)起暮,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,681評(píng)論 2 381
  • 文/潘曉璐 我一進(jìn)店門(mén)卖氨,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人鞋怀,你說(shuō)我怎么就攤上這事双泪〕炙眩” “怎么了密似?”我有些...
    開(kāi)封第一講書(shū)人閱讀 153,443評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)葫盼。 經(jīng)常有香客問(wèn)我残腌,道長(zhǎng),這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,475評(píng)論 1 279
  • 正文 為了忘掉前任抛猫,我火速辦了婚禮蟆盹,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘闺金。我一直安慰自己逾滥,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,458評(píng)論 5 374
  • 文/花漫 我一把揭開(kāi)白布败匹。 她就那樣靜靜地躺著寨昙,像睡著了一般。 火紅的嫁衣襯著肌膚如雪掀亩。 梳的紋絲不亂的頭發(fā)上舔哪,一...
    開(kāi)封第一講書(shū)人閱讀 49,185評(píng)論 1 284
  • 那天,我揣著相機(jī)與錄音槽棍,去河邊找鬼捉蚤。 笑死,一個(gè)胖子當(dāng)著我的面吹牛炼七,可吹牛的內(nèi)容都是我干的缆巧。 我是一名探鬼主播,決...
    沈念sama閱讀 38,451評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼豌拙,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼盅蝗!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起姆蘸,我...
    開(kāi)封第一講書(shū)人閱讀 37,112評(píng)論 0 261
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤墩莫,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后逞敷,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體狂秦,經(jīng)...
    沈念sama閱讀 43,609評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,083評(píng)論 2 325
  • 正文 我和宋清朗相戀三年推捐,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了裂问。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,163評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡牛柒,死狀恐怖堪簿,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情皮壁,我是刑警寧澤椭更,帶...
    沈念sama閱讀 33,803評(píng)論 4 323
  • 正文 年R本政府宣布,位于F島的核電站蛾魄,受9級(jí)特大地震影響虑瀑,放射性物質(zhì)發(fā)生泄漏湿滓。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,357評(píng)論 3 307
  • 文/蒙蒙 一舌狗、第九天 我趴在偏房一處隱蔽的房頂上張望叽奥。 院中可真熱鬧,春花似錦痛侍、人聲如沸朝氓。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,357評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)膀篮。三九已至,卻和暖如春岂膳,著一層夾襖步出監(jiān)牢的瞬間誓竿,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,590評(píng)論 1 261
  • 我被黑心中介騙來(lái)泰國(guó)打工谈截, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留筷屡,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,636評(píng)論 2 355
  • 正文 我出身青樓簸喂,卻偏偏與公主長(zhǎng)得像毙死,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子喻鳄,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,925評(píng)論 2 344

推薦閱讀更多精彩內(nèi)容