Spark RPC Client Request 和 Server Response 流程

需要結(jié)合下面這幾篇文章看坪蚁,下面是自己學(xué)習(xí)的記錄。
https://blog.csdn.net/u011564172/article/details/62043236
https://blog.csdn.net/u011564172/article/details/60875013
https://blog.csdn.net/u011564172/article/details/60143168
https://blog.csdn.net/u011564172/article/details/59113617

Master Main 方法中,調(diào)用 RpcEnv 的 create 方法纺荧,返回 NettyRpcEnv 實(shí)例晴埂,NettyRpcEnv 繼承自 RpcEnv痕囱,create 方法最終啟動(dòng)了 Netty 服務(wù)(具體請參考 Spark RPC之Netty啟動(dòng))诞吱,流程入下圖:

RpcEnv create 方法 返回的 NettyRpcEnv 實(shí)例剂陡,隨后調(diào)用了 setupEndpoint 方法:

    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))

其實(shí)是調(diào)用了 Dispatcher 的 registerRpcEndpoint 方法:

  //NettyRpcEnv.scala 中的代碼
  override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
    dispatcher.registerRpcEndpoint(name, endpoint)
  }

在 NettyRpcEnv.scala 中創(chuàng)建了 TransportContext:

  private val transportContext = new TransportContext(transportConf,
    new NettyRpcHandler(dispatcher, this, streamManager))

TransportContext 構(gòu)造函數(shù)中創(chuàng)建了 NettyRpcHandler狈涮,NettyRpcHandler 繼承自 RpcHandler,看下 NettyRpcHandler 類的部分代碼:

private[netty] class NettyRpcHandler(
    dispatcher: Dispatcher,
    nettyEnv: NettyRpcEnv,
    streamManager: StreamManager) extends RpcHandler with Logging {

  override def receive(
      client: TransportClient,
      message: ByteBuffer,
      callback: RpcResponseCallback): Unit = {
    val messageToDispatch = internalReceive(client, message)
    dispatcher.postRemoteMessage(messageToDispatch, callback)
  }

  override def receive(
      client: TransportClient,
      message: ByteBuffer): Unit = {
    val messageToDispatch = internalReceive(client, message)
    dispatcher.postOneWayMessage(messageToDispatch)
  }
}

可以看到 有兩個(gè) 重寫的 receive 方法鸭栖,我們知道 receive 方法用來接收 遠(yuǎn)端發(fā)來的 RPC消息,最終調(diào)用了 Dispatcher 的 postMessage 方法握巢。
那 receive 最終由哪里調(diào)用呢晕鹊?其實(shí)最終是從 TransportRequestHandler 的 rpcHandler 調(diào)用的。
TransportRequestHandler 類 的 rpcHandler 成員暴浦,持有了 NettyRpcHandler 的引用溅话。我們看下 NettyRpcHandler 如何一步步把自己傳給 TransportRequestHandler 的 rpcHandle 的:

TransportContext 的 rpcHandler 成員持有了 NettyRpcHandler 的引用:

  public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
    this(conf, rpcHandler, false);
  }

  public TransportContext(...RpcHandler rpcHandler) {
    ...
    this.rpcHandler = rpcHandler;
  }

TransportContext 把 rpcHandler 傳給了 TransportServer:

  public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps) {
    return new TransportServer(this, null, port, rpcHandler, bootstraps);
  }

TransportServer 的成員 appRpcHandler 持有了 NettyRpcHandler 的引用:

  public TransportServer(...RpcHandler appRpcHandler) {
    ...
    this.appRpcHandler = appRpcHandler;
  }

在 TransportServer 的 init 方法中,把 appRpcHandler 傳給了 TransportContext 的initializePipeline 方法:

private void init(String hostToBind, int portToBind) {
  ...
  context.initializePipeline(ch, rpcHandler);
}

我們看下 TransportContext 的initializePipeline 方法:

public TransportChannelHandler initializePipeline(SocketChannel channel, RpcHandler channelRpcHandler) {
  ...
  TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
  //下面把 TransportChannelHandler 添加到 pipeline 中歌焦。
  channel.pipeline()
        .addLast("encoder", ENCODER)
        .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
        .addLast("decoder", DECODER)
        .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
        // NOTE: Chunks are currently guaranteed to be returned in the order of request, but this
        // would require more logic to guarantee if this were not part of the same event loop.
        .addLast("handler", channelHandler);
   return channelHandler;
}

initializePipeline 方法創(chuàng)建了 TransportChannelHandler飞几,并返回。
看下 createChannelHandler 方法:

  private TransportChannelHandler createChannelHandler(...RpcHandler rpcHandler) {
    TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
      rpcHandler);
    return new TransportChannelHandler(client, responseHandler, requestHandler,
      conf.connectionTimeoutMs(), closeIdleConnections);
  }

最終 TransportRequestHandler 的成員 rpcHandler 持有了 NettyRpcHandler 的引用独撇。

我們 看下 TransportRequestHandler 中 使用 rpcHandler 的地方:

  private void processRpcRequest(final RpcRequest req) {
      rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
        @Override
        public void onSuccess(ByteBuffer response) {
          respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
        }
      });
  }

  private void processOneWayMessage(OneWayMessage req) {
    rpcHandler.receive(reverseClient, req.body().nioByteBuffer());
  }

NettyRpcHandler 重寫的 receive 方法屑墨,最終在這里被回調(diào)的:rpcHandler.receive
上面兩個(gè)方法在這里調(diào)用:

  @Override
  public void handle(RequestMessage request) {
    if (request instanceof ChunkFetchRequest) {
      processFetchRequest((ChunkFetchRequest) request);
    } else if (request instanceof RpcRequest) {
      processRpcRequest((RpcRequest) request);
    } else if (request instanceof OneWayMessage) {
      processOneWayMessage((OneWayMessage) request);
    } else if (request instanceof StreamRequest) {
      processStreamRequest((StreamRequest) request);
    } else {
      throw new IllegalArgumentException("Unknown request type: " + request);
    }
  }

handle 方法對 RequestMessage 做了區(qū)分纷铣,驗(yàn)證了我們上面提到的卵史。
在 TransportChannelHandler.java 中調(diào)用了 handle 方法:

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception {
    if (request instanceof RequestMessage) {
      requestHandler.handle((RequestMessage) request);
    } else if (request instanceof ResponseMessage) {
      responseHandler.handle((ResponseMessage) request);
    } else {
      ctx.fireChannelRead(request);
    }
  }

而 channelRead 中的消息是 client 通過 RPC 發(fā)過來的。

處理client 的 RpcRequest 請求

RpcEndpointRefRpcEndpoint不在一臺機(jī)器


上圖的過程3搜立,簡化了流程以躯,這個(gè)簡化的流程就是我們上面分析的。

不在同一臺機(jī)器時(shí)啄踊,需要借助于netty忧设,大致步驟如下

  1. Spark RPC之Netty啟動(dòng) 所述,創(chuàng)建RpcEnv時(shí)啟動(dòng)netty server颠通,同時(shí)將TransportChannelHandler添加到pipeline中
  2. 如上圖址晕,TransportChannelHandler處理netty接收到的數(shù)據(jù),依次交給TransportRequestHandler蒜哀、NettyRpcHandler處理斩箫。
  3. 最后交由Dispatcher、Inbox撵儿,請參考Spark RPC之Dispatcher乘客、Inbox、Outbox 淀歇∫缀耍看下 Dispatcher 流程圖:

RpcEndpointRefRpcEndpoint在一臺機(jī)器

在同一臺機(jī)器時(shí),不需要netty浪默,直接訪問RpcEndpoint牡直,如上圖缀匕,依然交給Dispatcher、Inbox處理碰逸。

https://blog.csdn.net/u011564172/article/details/62043236

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末乡小,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子饵史,更是在濱河造成了極大的恐慌满钟,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,110評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件胳喷,死亡現(xiàn)場離奇詭異湃番,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)吭露,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,443評論 3 395
  • 文/潘曉璐 我一進(jìn)店門吠撮,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人讲竿,你說我怎么就攤上這事泥兰。” “怎么了戴卜?”我有些...
    開封第一講書人閱讀 165,474評論 0 356
  • 文/不壞的土叔 我叫張陵逾条,是天一觀的道長。 經(jīng)常有香客問我投剥,道長师脂,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,881評論 1 295
  • 正文 為了忘掉前任江锨,我火速辦了婚禮吃警,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘啄育。我一直安慰自己酌心,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,902評論 6 392
  • 文/花漫 我一把揭開白布挑豌。 她就那樣靜靜地躺著安券,像睡著了一般。 火紅的嫁衣襯著肌膚如雪氓英。 梳的紋絲不亂的頭發(fā)上侯勉,一...
    開封第一講書人閱讀 51,698評論 1 305
  • 那天,我揣著相機(jī)與錄音铝阐,去河邊找鬼址貌。 笑死,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的练对。 我是一名探鬼主播遍蟋,決...
    沈念sama閱讀 40,418評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼螟凭!你這毒婦竟也來了虚青?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,332評論 0 276
  • 序言:老撾萬榮一對情侶失蹤赂摆,失蹤者是張志新(化名)和其女友劉穎挟憔,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體烟号,經(jīng)...
    沈念sama閱讀 45,796評論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,968評論 3 337
  • 正文 我和宋清朗相戀三年政恍,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了汪拥。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,110評論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡篙耗,死狀恐怖迫筑,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情宗弯,我是刑警寧澤脯燃,帶...
    沈念sama閱讀 35,792評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站蒙保,受9級特大地震影響辕棚,放射性物質(zhì)發(fā)生泄漏东臀。R本人自食惡果不足惜赛蔫,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,455評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望炉菲。 院中可真熱鬧详恼,春花似錦补君、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,003評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至敞掘,卻和暖如春叽掘,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背渐逃。 一陣腳步聲響...
    開封第一講書人閱讀 33,130評論 1 272
  • 我被黑心中介騙來泰國打工够掠, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人茄菊。 一個(gè)月前我還...
    沈念sama閱讀 48,348評論 3 373
  • 正文 我出身青樓疯潭,卻偏偏與公主長得像赊堪,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子竖哩,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,047評論 2 355

推薦閱讀更多精彩內(nèi)容

  • JPA 多數(shù)據(jù)源實(shí)現(xiàn) 線程同步方法 有幾種線程池 CAS機(jī)制 CAS有3個(gè)操作數(shù)哭廉,內(nèi)存值V,舊的預(yù)期值A(chǔ)相叁,要修改的...
    Gxgeek閱讀 226評論 1 0
  • 朝陽區(qū):秒拍 陌陌(第一優(yōu)先級)粉筆網(wǎng) 快看漫畫 馬蜂窩(第一優(yōu)先級) Keep 海淀區(qū):火幣網(wǎng) 頭條(第一優(yōu)先級...
    TonyLan閱讀 283評論 1 1
  • 小時(shí)候增淹,你躲在爸爸的雨衣下椿访,靜靜地看著外面的世界, 長大后虑润,你環(huán)抱著愛人成玫,盡全力給予她作為男人所賦予的安全感。 小...
    生活的橡皮擦閱讀 200評論 0 0
  • 易姑娘拳喻,長的不漂亮哭当,但氣質(zhì)古典,溫潤靜雅冗澈,屬于站在人群中不會(huì)被一眼認(rèn)出來钦勘,可一旦認(rèn)出來就容易喜歡的姑娘。 姑娘在5...
    易秋姑娘閱讀 246評論 0 0
  • 理想和堅(jiān)持亚亲,太簡單的兩個(gè)詞彻采。今天的分享看得我心潮澎湃。 想起來上次出差早晨早起徒步四個(gè)小時(shí)去看景色朵栖,雖然不是多么的...
    張磊沙閱讀 116評論 0 0