MyRequest request = new MyRequest(); request.setReq("hello, bolt-server"); client.oneway("127.0.0.1:8888", request);
注意:oneway 沒(méi)有請(qǐng)求超時(shí)的概念鸿脓,所以其調(diào)用api中沒(méi)有 int timeoutMillis 參數(shù)吩愧。但是連接超時(shí)時(shí)間是有的。
一蝶锋、線(xiàn)程模型圖
image.png
總結(jié):
- 客戶(hù)端用戶(hù)線(xiàn)程 user-thread 發(fā)出請(qǐng)求(實(shí)際上是將 netty task 壓到 netty 處理隊(duì)列中挑庶,netty 客戶(hù)端 worker 線(xiàn)程進(jìn)行真正的請(qǐng)求發(fā)出)言秸,然后 user-thread 就可以做其他事了,對(duì)于該請(qǐng)求的處理就結(jié)束
- 服務(wù)端 worker 線(xiàn)程接收請(qǐng)求迎捺,根據(jù)是否在 IO 線(xiàn)程執(zhí)行所有操作來(lái)決定是否使用一個(gè) Bolt 線(xiàn)程池(或者自定義的線(xiàn)程池)來(lái)處理業(yè)務(wù)举畸,處理完成之后,結(jié)束
二凳枝、代碼執(zhí)行流程梯形圖
2.1 客戶(hù)端發(fā)出請(qǐng)求
-->RpcClient.oneway(String addr, Object request)
-->RpcClientRemoting.oneway(String addr, Object request, InvokeContext invokeContext) // invokeContext = null
-->Url url = this.addressParser.parse(addr) // 將 addr 轉(zhuǎn)化為 Url
-->RpcClientRemoting.oneway(Url url, Object request, InvokeContext invokeContext)
<!-- 一抄沮、獲取連接 -->
-->Connection conn = getConnectionAndInitInvokeContext(url, invokeContext)
<!-- 二、檢查連接 -->
-->this.connectionManager.check(conn) // 校驗(yàn) connection 不為 null && channel 不為 null && channel 是 active 狀態(tài) && channel 可寫(xiě)
<!-- 三、創(chuàng)建請(qǐng)求對(duì)象 -->
-->RpcCommandFactory.createRequestCommand(Object requestObject)
-->new RpcRequestCommand(Object request) // 設(shè)置唯一id + 消息類(lèi)型為 Request(還有 Response 和 heartbeat)+ MyRequest request
-->command.serialize() // 序列化
-->requestCommand.setType(RpcCommandType.REQUEST_ONEWAY) // 設(shè)置請(qǐng)求響應(yīng)模型
<!-- 四叛买、發(fā)起請(qǐng)求 -->
-->BaseRemoting.oneway(Connection conn, RemotingCommand request, int timeoutMillis)
<!-- 4.1 使用 netty 發(fā)送消息 -->
-->conn.getChannel().writeAndFlush(request) // netty發(fā)送消息
關(guān)于連接 Connection 相關(guān)的砂代,放在《Connection 連接設(shè)計(jì)》章節(jié)分析,此處跳過(guò)率挣;
總結(jié):
- 獲取連接
- 檢查連接
- 使用 RpcCommandFactory 創(chuàng)建請(qǐng)求對(duì)象 + 序列化 + 設(shè)置請(qǐng)求響應(yīng)模型為 REQUEST_ONEWAY
- 使用 netty 發(fā)送消息
2.2 服務(wù)端處理請(qǐng)求
RpcHandler.channelRead(ChannelHandlerContext ctx, Object msg)
<!-- 一泊藕、創(chuàng)建上下文 -->
-->new InvokeContext()
-->new RemotingContext(ChannelHandlerContext ctx, InvokeContext invokeContext, boolean serverSide, ConcurrentHashMap<String, UserProcessor<?>> userProcessors)
<!-- 二、根據(jù) channel 中的附加屬性獲取相應(yīng)的 Protocol难礼,之后使用該 Protocol 實(shí)例的 CommandHandler 處理消息 -->
-->RpcCommandHandler.handle(RemotingContext ctx, Object msg)
<!-- 2.1 從 CommandHandler 中獲取 CommandCode 為 REQUEST 的 RemotingProcessor 實(shí)例 RpcRequestProcessor,之后使用 RpcRequestProcessor 進(jìn)行請(qǐng)求處理-->
-->RpcRequestProcessor.process(RemotingContext ctx, RpcRequestCommand cmd, ExecutorService defaultExecutor)
<!-- 2.1.1 反序列化clazz(感興趣key)玫锋,用于獲取相應(yīng)的UserProcessor蛾茉;如果相應(yīng)的UserProcessor==null,創(chuàng)建異常響應(yīng)撩鹿,發(fā)送給調(diào)用端谦炬,否則,繼續(xù)執(zhí)行 -->
-->反序列化 clazz
<!--
2.1.2 如果 userProcessor.processInIOThread()==true节沦,直接對(duì)請(qǐng)求進(jìn)行反序列化键思,然后創(chuàng)建ProcessTask任務(wù),最后直接在當(dāng)前的netty worker線(xiàn)程中執(zhí)行ProcessTask.run()甫贯;
否則吼鳞,如果用戶(hù)自定義了ExecutorSelector,則從眾多的自定義線(xiàn)程池選擇一個(gè)線(xiàn)程池叫搁,如果沒(méi)定義赔桌,則使用自定義的線(xiàn)程池userProcessor.getExecutor(),如果最后沒(méi)有自定義的線(xiàn)程池渴逻,則使用ProcessorManager的defaultExecutor疾党,
來(lái)執(zhí)行ProcessTask.run()
-->
<!-- ProcessTask.run() -->
-->RpcRequestProcessor.doProcess(RemotingContext ctx, RpcRequestCommand cmd)
-->反序列化header、content
-->dispatchToUserProcessor(RemotingContext ctx, RpcRequestCommand cmd)
<!-- 構(gòu)造用戶(hù)業(yè)務(wù)上下文 -->
-->new DefaultBizContext(remotingCtx)
<!-- 使用用戶(hù)自定義處理器處理請(qǐng)求 -->
-->MyServerUserProcessor.handleRequest(BizContext bizCtx, MyRequest request)
<!-- 創(chuàng)建響應(yīng) -->
-->RemotingCommand response = RpcCommandFactory.createResponse(Object responseObject, RemotingCommand requestCmd) // 這里將response.id = requestCmd.id
<!-- sync:序列化響應(yīng)并發(fā)送 oneway:logger.debug -->
-->RpcRequestProcessor.sendResponseIfNecessary(RemotingContext ctx, byte type, RemotingCommand response)
-->logger.debug
總結(jié):
服務(wù)端處理請(qǐng)求步驟與Sync模式幾乎完全相同惨奕,只是最后Sync會(huì) “構(gòu)造響應(yīng) + 序列化響應(yīng) + 發(fā)送響應(yīng)”雪位;而 oneway 只會(huì) “構(gòu)造響應(yīng)”(
實(shí)際上,也不應(yīng)該構(gòu)造響應(yīng)
)
- 創(chuàng)建 InvokeContext 和 RemotingContext
- 根據(jù) channel 中的附加屬性獲取相應(yīng)的 Protocol梨撞,之后使用該 Protocol 實(shí)例的 CommandHandler 處理消息
- 從 CommandHandler 中獲取 CommandCode 為 REQUEST 的 RemotingProcessor 實(shí)例 RpcRequestProcessor雹洗,之后使用 RpcRequestProcessor 進(jìn)行請(qǐng)求處理
- 反序列化clazz(感興趣key),用于獲取相應(yīng)的UserProcessor聋袋;如果相應(yīng)的 UserProcessor==null队伟,創(chuàng)建異常響應(yīng),發(fā)送給調(diào)用端幽勒,否則嗜侮,繼續(xù)執(zhí)行
- 如果 userProcessor.processInIOThread()==true,直接對(duì)請(qǐng)求進(jìn)行反序列化,然后創(chuàng)建 ProcessTask 任務(wù)锈颗,最后直接在當(dāng)前的 netty worker 線(xiàn)程中執(zhí)行 ProcessTask.run()顷霹;
否則,如果用戶(hù) UserProcessor 自定義了 ExecutorSelector击吱,則從眾多的自定義線(xiàn)程池選擇一個(gè)線(xiàn)程池淋淀,如果沒(méi)定義,則使用 UserProcessor 自定義的線(xiàn)程池 userProcessor.getExecutor()覆醇,如果還沒(méi)有朵纷,則使用 RemotingProcessor 自定義的線(xiàn)程池 executor,如果最后沒(méi)有自定義的線(xiàn)程池永脓,則使用 ProcessorManager 的defaultExecutor袍辞,來(lái)執(zhí)行ProcessTask.run()
- 反序列化 header、content(如果用戶(hù)自定義了 ExecutorSelector常摧,則header的反序列化需要提前搅吁,header 會(huì)作為眾多自定義線(xiàn)程池的選擇參數(shù))
- 構(gòu)造用戶(hù)業(yè)務(wù)上下文 DefaultBizContext
- 使用用戶(hù)自定義處理器處理請(qǐng)求