SOFABolt 源碼分析5 - Oneway 單向通信方式的設(shè)計(jì)

 MyRequest request = new MyRequest();
 request.setReq("hello, bolt-server");
 client.oneway("127.0.0.1:8888", request);

注意:oneway 沒(méi)有請(qǐng)求超時(shí)的概念鸿脓,所以其調(diào)用api中沒(méi)有 int timeoutMillis 參數(shù)吩愧。但是連接超時(shí)時(shí)間是有的。

一蝶锋、線(xiàn)程模型圖

image.png

總結(jié):

  1. 客戶(hù)端用戶(hù)線(xiàn)程 user-thread 發(fā)出請(qǐng)求(實(shí)際上是將 netty task 壓到 netty 處理隊(duì)列中挑庶,netty 客戶(hù)端 worker 線(xiàn)程進(jìn)行真正的請(qǐng)求發(fā)出)言秸,然后 user-thread 就可以做其他事了,對(duì)于該請(qǐng)求的處理就結(jié)束
  2. 服務(wù)端 worker 線(xiàn)程接收請(qǐng)求迎捺,根據(jù)是否在 IO 線(xiàn)程執(zhí)行所有操作來(lái)決定是否使用一個(gè) Bolt 線(xiàn)程池(或者自定義的線(xiàn)程池)來(lái)處理業(yè)務(wù)举畸,處理完成之后,結(jié)束

二凳枝、代碼執(zhí)行流程梯形圖

2.1 客戶(hù)端發(fā)出請(qǐng)求

-->RpcClient.oneway(String addr, Object request)
  -->RpcClientRemoting.oneway(String addr, Object request, InvokeContext invokeContext) // invokeContext = null
    -->Url url = this.addressParser.parse(addr) // 將 addr 轉(zhuǎn)化為 Url
    -->RpcClientRemoting.oneway(Url url, Object request, InvokeContext invokeContext)
      <!-- 一抄沮、獲取連接 -->
      -->Connection conn = getConnectionAndInitInvokeContext(url, invokeContext)
      <!-- 二、檢查連接 -->
      -->this.connectionManager.check(conn) // 校驗(yàn) connection 不為 null && channel 不為 null && channel 是 active 狀態(tài) && channel 可寫(xiě)
      <!-- 三、創(chuàng)建請(qǐng)求對(duì)象 -->
      -->RpcCommandFactory.createRequestCommand(Object requestObject)
        -->new RpcRequestCommand(Object request) // 設(shè)置唯一id + 消息類(lèi)型為 Request(還有 Response 和 heartbeat)+ MyRequest request
        -->command.serialize() // 序列化
      -->requestCommand.setType(RpcCommandType.REQUEST_ONEWAY) // 設(shè)置請(qǐng)求響應(yīng)模型
      <!-- 四叛买、發(fā)起請(qǐng)求 -->
      -->BaseRemoting.oneway(Connection conn, RemotingCommand request, int timeoutMillis)
        <!-- 4.1 使用 netty 發(fā)送消息 -->
        -->conn.getChannel().writeAndFlush(request) // netty發(fā)送消息

關(guān)于連接 Connection 相關(guān)的砂代,放在《Connection 連接設(shè)計(jì)》章節(jié)分析,此處跳過(guò)率挣;

總結(jié):

  1. 獲取連接
  2. 檢查連接
  3. 使用 RpcCommandFactory 創(chuàng)建請(qǐng)求對(duì)象 + 序列化 + 設(shè)置請(qǐng)求響應(yīng)模型為 REQUEST_ONEWAY
  4. 使用 netty 發(fā)送消息

2.2 服務(wù)端處理請(qǐng)求

RpcHandler.channelRead(ChannelHandlerContext ctx, Object msg)
<!-- 一泊藕、創(chuàng)建上下文 -->
-->new InvokeContext()
-->new RemotingContext(ChannelHandlerContext ctx, InvokeContext invokeContext, boolean serverSide, ConcurrentHashMap<String, UserProcessor<?>> userProcessors)
<!-- 二、根據(jù) channel 中的附加屬性獲取相應(yīng)的 Protocol难礼,之后使用該 Protocol 實(shí)例的 CommandHandler 處理消息 -->
-->RpcCommandHandler.handle(RemotingContext ctx, Object msg)
  <!-- 2.1 從 CommandHandler 中獲取 CommandCode 為 REQUEST 的 RemotingProcessor 實(shí)例 RpcRequestProcessor,之后使用 RpcRequestProcessor 進(jìn)行請(qǐng)求處理-->
  -->RpcRequestProcessor.process(RemotingContext ctx, RpcRequestCommand cmd, ExecutorService defaultExecutor)
    <!-- 2.1.1 反序列化clazz(感興趣key)玫锋,用于獲取相應(yīng)的UserProcessor蛾茉;如果相應(yīng)的UserProcessor==null,創(chuàng)建異常響應(yīng)撩鹿,發(fā)送給調(diào)用端谦炬,否則,繼續(xù)執(zhí)行 -->
    -->反序列化 clazz
    <!-- 
        2.1.2 如果 userProcessor.processInIOThread()==true节沦,直接對(duì)請(qǐng)求進(jìn)行反序列化键思,然后創(chuàng)建ProcessTask任務(wù),最后直接在當(dāng)前的netty worker線(xiàn)程中執(zhí)行ProcessTask.run()甫贯;
              否則吼鳞,如果用戶(hù)自定義了ExecutorSelector,則從眾多的自定義線(xiàn)程池選擇一個(gè)線(xiàn)程池叫搁,如果沒(méi)定義赔桌,則使用自定義的線(xiàn)程池userProcessor.getExecutor(),如果最后沒(méi)有自定義的線(xiàn)程池渴逻,則使用ProcessorManager的defaultExecutor疾党,
              來(lái)執(zhí)行ProcessTask.run()
    -->
      <!-- ProcessTask.run() -->
      -->RpcRequestProcessor.doProcess(RemotingContext ctx, RpcRequestCommand cmd)
        -->反序列化header、content
        -->dispatchToUserProcessor(RemotingContext ctx, RpcRequestCommand cmd)
          <!-- 構(gòu)造用戶(hù)業(yè)務(wù)上下文 -->
          -->new DefaultBizContext(remotingCtx)
          <!-- 使用用戶(hù)自定義處理器處理請(qǐng)求 -->
          -->MyServerUserProcessor.handleRequest(BizContext bizCtx, MyRequest request)
          <!-- 創(chuàng)建響應(yīng) -->
          -->RemotingCommand response = RpcCommandFactory.createResponse(Object responseObject, RemotingCommand requestCmd) // 這里將response.id = requestCmd.id
          <!-- sync:序列化響應(yīng)并發(fā)送 oneway:logger.debug -->
          -->RpcRequestProcessor.sendResponseIfNecessary(RemotingContext ctx, byte type, RemotingCommand response)
            -->logger.debug

總結(jié):

服務(wù)端處理請(qǐng)求步驟與Sync模式幾乎完全相同惨奕,只是最后Sync會(huì) “構(gòu)造響應(yīng) + 序列化響應(yīng) + 發(fā)送響應(yīng)”雪位;而 oneway 只會(huì) “構(gòu)造響應(yīng)”(實(shí)際上,也不應(yīng)該構(gòu)造響應(yīng)

  1. 創(chuàng)建 InvokeContext 和 RemotingContext
  1. 根據(jù) channel 中的附加屬性獲取相應(yīng)的 Protocol梨撞,之后使用該 Protocol 實(shí)例的 CommandHandler 處理消息
  • 從 CommandHandler 中獲取 CommandCode 為 REQUEST 的 RemotingProcessor 實(shí)例 RpcRequestProcessor雹洗,之后使用 RpcRequestProcessor 進(jìn)行請(qǐng)求處理
  • 反序列化clazz(感興趣key),用于獲取相應(yīng)的UserProcessor聋袋;如果相應(yīng)的 UserProcessor==null队伟,創(chuàng)建異常響應(yīng),發(fā)送給調(diào)用端幽勒,否則嗜侮,繼續(xù)執(zhí)行
  • 如果 userProcessor.processInIOThread()==true,直接對(duì)請(qǐng)求進(jìn)行反序列化,然后創(chuàng)建 ProcessTask 任務(wù)锈颗,最后直接在當(dāng)前的 netty worker 線(xiàn)程中執(zhí)行 ProcessTask.run()顷霹;
    否則,如果用戶(hù) UserProcessor 自定義了 ExecutorSelector击吱,則從眾多的自定義線(xiàn)程池選擇一個(gè)線(xiàn)程池淋淀,如果沒(méi)定義,則使用 UserProcessor 自定義的線(xiàn)程池 userProcessor.getExecutor()覆醇,如果還沒(méi)有朵纷,則使用 RemotingProcessor 自定義的線(xiàn)程池 executor,如果最后沒(méi)有自定義的線(xiàn)程池永脓,則使用 ProcessorManager 的defaultExecutor袍辞,來(lái)執(zhí)行ProcessTask.run()
  • 反序列化 header、content(如果用戶(hù)自定義了 ExecutorSelector常摧,則header的反序列化需要提前搅吁,header 會(huì)作為眾多自定義線(xiàn)程池的選擇參數(shù))
  • 構(gòu)造用戶(hù)業(yè)務(wù)上下文 DefaultBizContext
  • 使用用戶(hù)自定義處理器處理請(qǐng)求
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市落午,隨后出現(xiàn)的幾起案子谎懦,更是在濱河造成了極大的恐慌,老刑警劉巖溃斋,帶你破解...
    沈念sama閱讀 217,509評(píng)論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件界拦,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡盐类,警方通過(guò)查閱死者的電腦和手機(jī)寞奸,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,806評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)在跳,“玉大人枪萄,你說(shuō)我怎么就攤上這事∶睿” “怎么了瓷翻?”我有些...
    開(kāi)封第一講書(shū)人閱讀 163,875評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)割坠。 經(jīng)常有香客問(wèn)我齐帚,道長(zhǎng),這世上最難降的妖魔是什么彼哼? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,441評(píng)論 1 293
  • 正文 為了忘掉前任对妄,我火速辦了婚禮,結(jié)果婚禮上敢朱,老公的妹妹穿的比我還像新娘剪菱。我一直安慰自己摩瞎,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,488評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布孝常。 她就那樣靜靜地躺著旗们,像睡著了一般。 火紅的嫁衣襯著肌膚如雪构灸。 梳的紋絲不亂的頭發(fā)上上渴,一...
    開(kāi)封第一講書(shū)人閱讀 51,365評(píng)論 1 302
  • 那天,我揣著相機(jī)與錄音喜颁,去河邊找鬼稠氮。 笑死,一個(gè)胖子當(dāng)著我的面吹牛半开,可吹牛的內(nèi)容都是我干的括袒。 我是一名探鬼主播,決...
    沈念sama閱讀 40,190評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼稿茉,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了芥炭?” 一聲冷哼從身側(cè)響起漓库,我...
    開(kāi)封第一講書(shū)人閱讀 39,062評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎园蝠,沒(méi)想到半個(gè)月后渺蒿,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,500評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡彪薛,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,706評(píng)論 3 335
  • 正文 我和宋清朗相戀三年茂装,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片善延。...
    茶點(diǎn)故事閱讀 39,834評(píng)論 1 347
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡少态,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出易遣,到底是詐尸還是另有隱情彼妻,我是刑警寧澤,帶...
    沈念sama閱讀 35,559評(píng)論 5 345
  • 正文 年R本政府宣布豆茫,位于F島的核電站侨歉,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏揩魂。R本人自食惡果不足惜幽邓,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,167評(píng)論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望火脉。 院中可真熱鬧牵舵,春花似錦柒啤、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,779評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至重斑,卻和暖如春兵睛,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背窥浪。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,912評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工祖很, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人漾脂。 一個(gè)月前我還...
    沈念sama閱讀 47,958評(píng)論 2 370
  • 正文 我出身青樓假颇,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親骨稿。 傳聞我的和親對(duì)象是個(gè)殘疾皇子笨鸡,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,779評(píng)論 2 354