一、調(diào)用方式
TestResponse response = (TestResponse) client.invokeSync("127.0.0.1:8888", request, 30 * 1000);
二、源碼分析
1.final Connection conn = getConnectionAndInitInvokeContext(url, invokeContext);//創(chuàng)建連接對象
2.this.connectionManager.check(conn);//連接有效性校驗(yàn)
3.this.invokeSync(conn, request, invokeContext, timeoutMillis);
3.1RemotingCommand requestCommand = toRemotingCommand(request, conn, invokeContext,timeoutMillis);//初始化遠(yuǎn)程命令對象撇叁,包括序列化器(用戶可以自定義)和crc(可關(guān)閉)
3.2ResponseCommand responseCommand = (ResponseCommand) super.invokeSync(conn, requestCommand,timeoutMillis);
3.2.1final InvokeFuture future = createInvokeFuture(request, request.getInvokeContext());//創(chuàng)建默認(rèn)的InvokeFuture
3.2.2conn.addInvokeFuture(future);//將InvokeFuture設(shè)置到連接下的invokeFutureMap,考慮到連接的復(fù)用畦贸,invokeFutureMap通過每次請求的唯一ID來保存
3.2.3RemotingCommand response = future.waitResponse(timeoutMillis);//因?yàn)槭峭秸埱笤赡郑ㄟ^countDownLatch來阻塞等待異步獲取到請求接口楞捂,有超時(shí)時(shí)間
3.3RpcResponseResolver.resolveResponseObject(responseCommand,
RemotingUtil.parseRemoteAddress(conn.getChannel()));//字節(jié)碼反序列化
但是通過上面的代碼,我們不難發(fā)現(xiàn)趋厉,這里并沒有像netty里的inbound的channelRead寨闹,那這就要回到我們上一章講的RcpClient的new過程了,源碼如下:
RpcClient
private ConnectionFactory connectionFactory = new RpcConnectionFactory(userProcessors,this);//初始化鏈接工廠,這里就是RpcHandler初始化的地方君账,RpcHandler是在后續(xù)客戶端的初始化過程中繁堡,被添加到Bootstrap里的,詳細(xì)分析可以看上一章乡数;這里還初始化了HeartbeatHandler椭蹄,也是后面客戶端初始化過程需要用到的,心跳設(shè)計(jì)會(huì)在后續(xù)的章節(jié)講到
RpcHandler
channelRead(ChannelHandlerContext ctx, Object msg)
1.ProtocolCode protocolCode = ctx.channel().attr(Connection.PROTOCOL).get();
2.Protocol protocol = ProtocolManager.getProtocol(protocolCode);//獲取鏈接的協(xié)議净赴,協(xié)議設(shè)計(jì)會(huì)在后續(xù)講到
3.protocol.getCommandHandler().handleCommand(new RemotingContext(ctx, new InvokeContext(), serverSide, userProcessors), msg);
3.1final RemotingProcessor processor = processorManager.getProcessor(cmd.getCmdCode());//獲取遠(yuǎn)程調(diào)用處理器
3.2processor.process(ctx, cmd, processorManager.getDefaultExecutor());//使用默認(rèn)的線程池處理請求绳矩,也可以用戶自定義
AbstractRemotingProcessor
3.2.1ProcessTask task = new ProcessTask(ctx, msg);//每一次請求一個(gè)子任務(wù),負(fù)責(zé)相應(yīng)接收
RpcResponseProcessor.doProcess
3.2.1.1Connection conn = ctx.getChannelContext().channel().attr(Connection.CONNECTION).get();//獲取當(dāng)前的鏈接
3.2.1.2InvokeFuture future = conn.removeInvokeFuture(cmd.getId());//從invokeFutureMap獲取當(dāng)前請求的InvokeFuture
3.2.1.3future.putResponse(cmd)//將還未反序列化的RemotingCommand對象放入到InvokeFuture劫侧,等待用戶進(jìn)程獲取
3.2.1.4future.executeInvokeCallback()//通知回調(diào)埋酬,但因?yàn)槲覀兪峭侥J剑鋵?shí)callbackListener是空的烧栋,并沒有相應(yīng)的回調(diào)任務(wù)
總結(jié)
SOFA的這樣由一個(gè)單獨(dú)的線程對inbound進(jìn)去讀取写妥,再有線程池去分發(fā)處理,能一定的減少對于inbound區(qū)IO壓力