服務暴露將做哪些事情愤诱?
- 注冊ZK芜果,監(jiān)聽動態(tài)配置節(jié)點
- 開啟Server端
- 創(chuàng)建代理服務
- Exporter -> Invoker -> proxyService
服務引用將做哪些事情妆兑?
- 注冊ZK榜贴,監(jiān)聽動態(tài)配置節(jié)點鉴分、providr節(jié)點、路由節(jié)點
- 開啟Client端
- 創(chuàng)建代理服務
- proxyService -> Invoker
客戶端請求
ConsumerProxyService -> Invoker【DubboInvoker】 -> Exchanger【HeaderExchangeClient】 -> Transporter【NettyClient】 -> 編碼 -> SEND-TO-SERVER (創(chuàng)建了DefaultFuture惧蛹,Request帶唯一標識)
服務端響應
解碼 -> Transporter【NettyServer】-> 系列Handlers -> 線程池 -> Exporter#getInvoker -> Invoker#invoke -> ProviderProxyService -> callback
最開始看的就是覺得這一塊涉及到的類好多啊扇救,傻傻分不清楚,沒事多看幾遍吧
Exchanger
Exchangers
門面類香嗓,提供各種便捷方法迅腔,先通過SPI獲取Exchanger
,然后調(diào)用Exchanger
的相關方法創(chuàng)建ExchangeServer
靠娱、ExchangeClient
Exchanger
SPI接口钾挟,默認實現(xiàn)類HeaderExchanger
,提供了兩個快捷方法創(chuàng)建ExchangeServer
饱岸、ExchangeClient
@SPI(HeaderExchanger.NAME)
public interface Exchanger {
@Adaptive({Constants.EXCHANGER_KEY})
ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;
@Adaptive({Constants.EXCHANGER_KEY})
ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
}
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
ExchangeServer
Server端使用,默認實現(xiàn)類HeaderExchangeServer
徽千,內(nèi)部調(diào)用Transporter
開啟Server服務
public interface ExchangeServer extends Server {
Collection<ExchangeChannel> getExchangeChannels();
ExchangeChannel getExchangeChannel(InetSocketAddress remoteAddress);
}
ExchangeClient
Client端使用苫费,默認實現(xiàn)類HeaderExchangeClient
,核心request
方法双抽,內(nèi)部調(diào)用Transporter
發(fā)送請求
public interface ExchangeClient extends Client, ExchangeChannel {
}
ExchangeChannel
默認實現(xiàn)類 HeaderExchangeChannel
百框,作為HeaderExchangeClient
的一個屬性
Transporter
Transporters
門面類,提供各種便捷方法牍汹,先通過SPI獲取Transporter
铐维,然后調(diào)用Transporter
的相關方法創(chuàng)建Server
柬泽、Client
Transporter
SPI接口,默認實現(xiàn)類NettyTransporter
嫁蛇,提供了兩個快捷方法創(chuàng)建Server
锨并、Client
@SPI("netty")
public interface Transporter {
@Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
Server bind(URL url, ChannelHandler handler) throws RemotingException;
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
Client connect(URL url, ChannelHandler handler) throws RemotingException;
}
public class NettyTransporter implements Transporter {
public static final String NAME = "netty";
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
}
Server
Server端使用,默認實現(xiàn)類NettyServer
睬棚,用于開啟Server服務第煮,核心方法doOpen
public class NettyServer extends AbstractServer implements Server {
}
Client
Client端使用,默認實現(xiàn)類NettyClient
抑党,核心request
方法用于發(fā)送請求包警,doOpen
用于與服務端建立連接
public class NettyClient extends AbstractClient {
}
服務端啟動服務
DubboProtocol#export =>
DubboProtocol#openServer =>
DubboProtocol#createServer =>
Exchangers#bind =>
NettyServer#doOpen
最終,在NettyServer#doOpen
中通過Netty開啟了一個Server端
DubboProtocol#createServer
=> Exchangers#bind(url, requestHandler)
=> HeaderExchanger#bind(url, requestHandler)
=> return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))))
// Transporters#bind 語句可以拆解為
Transporters#bind
=> NettyTransporter#bind(url, handler)
=> return new NettyServer(url, handler)
=> NettyServer#doOpen【NettyServer構造函數(shù)中調(diào)用了doOpen方法】
即NettyServer
中的hander
屬性底靠,最終指向的是new DecodeHandler(new HeaderExchangeHandler(handler))
害晦。最終Server端返回HeaderExchangeServer
,然后在NettyServer
的構造函數(shù)中暑中,對handle
其實還做了一些封裝
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
public class ChannelHandlers {
private static ChannelHandlers INSTANCE = new ChannelHandlers();
protected ChannelHandlers() {}
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected static ChannelHandlers getInstance() {
return INSTANCE;
}
static void setTestingChannelHandlers(ChannelHandlers instance) {
INSTANCE = instance;
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
}
所以壹瘟,最終NettyServer
中的hander
屬性指向MultiMessageHandler -> HeartbeatHandler -> AllDispatcher -> DecodeHandler
客戶端連接服務
調(diào)用鏈太長了,而且隱藏的非常深痒芝,重點省略了一些俐筋,在應用啟動時為Reference對象生成Invoker時創(chuàng)建的
RegistryProtocol#doRefer =>
RegistryDirectory#subscribe =>
RegistryDirectory#toInvokers =>
ProtocolFilterWrapper#refer =>
AbstractProtocol#refer =>
DubboProtocol#protocolBindingRefer =>
DubboProtocol#getClients =>
DubboProtocol#getSharedClient =>
DubboProtocol#buildReferenceCountExchangeClientList =>
DubboProtocol#buildReferenceCountExchangeClient =>
DubboProtocol#initClient =>
Exchangers#connect =>
HeaderExchanger#connect =>
Transporters#connect =>
NettyTransporter#connect =>
NettyClient#<init> =>
NettyClient#doOpen
最終,在NettyClient#doOpen
中通過Netty與Server建立連接
Exchangers#connect
=> HeaderExchanger#connect(url, handler)
=> return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true)
// Transporters#connect 語句可以拆解為
Transporters#connect
=> NettyTransporter#connect(url, handler)
=> return new NettyClient(url, handler)
=> NettyClient#doOpen【NettyClient構造函數(shù)中調(diào)用了doOpen方法】
即NettyClient
中的hander
屬性严衬,最終指向的是new DecodeHandler(new HeaderExchangeHandler(handler))
澄者。最終Client端返回HeaderExchangeClient
,其中的client
屬性也對NettyClient
做了包裝處理
不過在DubboProtocol#buildReferenceCountExchangeClient
方法中對HeaderExchangeClient
包裝了一層请琳,最終Invoker中的Client類型是ReferenceCountExchangeClient
private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
ExchangeClient exchangeClient = initClient(url);
return new ReferenceCountExchangeClient(exchangeClient);
}
ReferenceCountExchangeClient
與HeaderExchangeClient
沒什么區(qū)別花鹅,只不過包裝了一層,然后還有一個比較重要的屬性referenceCount
买猖,用于記錄客戶端的個數(shù)膨蛮?
客戶端發(fā)送請求
調(diào)用方代理類 ->
InvokerInvocationHandler#invoke ->
MockClusterInvoker#invoke ->
AbstractClusterInvoker#invoke【獲取LoadBalance】 ->
FailoverClusterInvoker#doInvoke【處理重試次數(shù)】 ->
ProtocolFilterWrapper#invoke【處理Filter鏈路】 ->
AbstractInvoker#invoke【設置Attachments參數(shù)】 ->
DubboInvoker#doInvoke【Exchange交接層】 ->
ReferenceCountExchangeClient#request ->
HeaderExchangeClient#request ->
HeaderExchangeChannel#request【return CompletableFuture】 ->
AbstractPeer#send ->
AbstractClient#send ->
NettyChannel#send ->
Channel#writeAndFlush【發(fā)消息給服務端】
從DubboInvoker#doInvoke
開始與Exchange層交互,核心代碼如下
protected Result doInvoke(final Invocation invocation) throws Throwable {
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
// return = false竖慧,即oneWay 嫌套,可以減少不必要的Future對象創(chuàng)建
if (isOneway) {
// send=true,即客戶端發(fā)送之后再返回圾旨,否則直接返回
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
asyncRpcResult.subscribeTo(responseFuture);
RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
return asyncRpcResult;
}
}
ReferenceCountExchangeClient#request =>
HeaderExchangeClient#request =>
HeaderExchangeChannel#request
// HeaderExchangeChannel.java
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
在這個方法中踱讨,有以下幾個需要注意的點:
- 在
Request
構造函數(shù)內(nèi)部,會為Request
生成一個遞增唯一的ID砍的,用于標識該請求 -
channel#send
調(diào)用過程中痹筛,涉及到NettyChannel#getOrAddChannel
方法的調(diào)用,NettyChannel
中有一個ConcurrentMap<Channel, NettyChannel> CHANNEL_MAP
緩存,用于維護io.netty.channel.Channel
和NettyChannel
的關系 -
channel#send
調(diào)用過程中帚稠,最終會調(diào)用到NettyChannel#send
方法谣旁,該方法真正的將消息發(fā)給Server端 - 返回的
DefaultFuture
是一個CompletableFuture
// NettyChannel.java
public void send(Object message, boolean sent) throws RemotingException {
boolean success = true;
int timeout = 0;
try {
// 將消息發(fā)給Server
ChannelFuture future = channel.writeAndFlush(message);
if (sent) {
// 如果配置了 send=true 參數(shù),客戶端需要等待消息發(fā)出之后再返回
timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
success = future.await(timeout);
}
Throwable cause = future.cause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
if (!success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}
從上面消息發(fā)送的流程中滋早,好像沒有看到對消息的編碼工作榄审,那是因為在Netty客戶端初始化的時候,已經(jīng)設置了編解碼器
// NettyClient.java
protected void doOpen() throws Throwable {
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
bootstrap = new Bootstrap();
bootstrap.group(nioEventLoopGroup)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.channel(NioSocketChannel.class);
if (getConnectTimeout() < 3000) {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
} else {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
}
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
.addLast("handler", nettyClientHandler);
String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
if(socksProxyHost != null) {
int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
ch.pipeline().addFirst(socks5ProxyHandler);
}
}
});
}
先經(jīng)過編碼器馆衔,即InternalEncoder#encode
方法瘟判,InternalEncoder
實現(xiàn)了MessageToByteEncoder
接口,該方法內(nèi)部調(diào)用了Codec2
的相關方法角溃,而Codec2
是一個SPI接口拷获,默認實現(xiàn)DubboCodec
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
protected static Codec2 getChannelCodec(URL url) {
String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
} else {
return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class).getExtension(codecName));
}
}
服務端響應請求
上面提到了NettyServer
中的hander
屬性指向 MultiMessageHandler -> HeartbeatHandler -> AllDispatcher -> DecodeHandler -> HeaderExchangeHandler
而NettyServer
開啟Server端的代碼如下
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
- 先經(jīng)過解碼器,即
InternalDecoder#decode
方法减细,InternalDecoder
實現(xiàn)了ByteToMessageDecoder
接口匆瓜,該方法內(nèi)部調(diào)用了Codec2
的相關方法,而Codec2
是一個SPI接口未蝌,默認實現(xiàn)DubboCodec
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
protected static Codec2 getChannelCodec(URL url) {
String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
} else {
return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class).getExtension(codecName));
}
}
-
MultiMessageHandler
用于處理數(shù)組消息驮吱,如果是消息是MultiMessage
類型,MultiMessage
實現(xiàn)了Iterable
數(shù)組萧吠,則遍歷調(diào)用handle的received方法左冬;否則直接調(diào)用下一個handle的received方法 -
AllChannelHandler
收到消息,將channel handler message
封裝成state為ChannelState.RECEIVED
類型的ChannelEventRunnable
對象纸型,然后交給線程池執(zhí)行 -
ChannelEventRunnable#run
方法中判斷state為ChannelState.RECEIVED
類型拇砰,直接執(zhí)行下一個handler的received方法,即DecodeHandler
狰腌,這個過程是由線程池執(zhí)行 -
DecodeHandler#received
方法中除破,如果消息是Decodeable
類型,對整個消息進行解碼琼腔;如果消息是Request
類型瑰枫,對Request.getData()
進行解碼;如果消息是Response
類型丹莲,對Response.getResult()
進行解碼 -
HeaderExchangeHandler#received
->HeaderExchangeHandler#handleRequest
->requestHandler#reply
光坝,requestHandler
是DubboProtocol
中的一個屬性,ExchangeHandlerAdapter
類型 -
HeaderExchangeHandler#handleRequest
中會創(chuàng)建一個Response
對象甥材,它的ID屬性值盯另,就是Request
對象的ID值,這樣請求和響應就關聯(lián)起來了 -
requestHandler#reply
方法中擂达,從exporterMap
緩存中獲取對應的DubboExporter
對象,然后從DubboExporter
獲取Invoker
,最后執(zhí)行Invoker#invoke
方法板鬓,然后返回一個CompletableFuture
對象 -
HeaderExchangeHandler#handleRequest
方法中接收返回的CompletableFuture
對象悲敷,對它添加回調(diào)處理,在回調(diào)中將返回結果封裝到Response
對象中俭令,然后通過channel將Response
發(fā)出
// ChannelEventRunnable.java
public void run() {
if (state == ChannelState.RECEIVED) {
try {
// RECEIVED 類型后德,直接執(zhí)行下一個handle的received方法,即 DecodeHandler
handler.received(channel, message);
} catch (Exception e) {}
} else {
switch (state) {
case CONNECTED:
try {
handler.connected(channel);
} catch (Exception e) {}
break;
case DISCONNECTED:
try {
handler.disconnected(channel);
} catch (Exception e) {}
break;
case SENT:
try {
handler.sent(channel, message);
} catch (Exception e) {}
break;
case CAUGHT:
try {
handler.caught(channel, exception);
} catch (Exception e) {}
break;
default:
logger.warn("unknown state: " + state + ", message is " + message);
}
}
}
InternalDecoder#decode
=> NettyServerHandler#channelRead
=> AbstractPeer#received
=> MultiMessageHandler#received
=> HeartbeatHandler#received
=> AllChannelHandler#received
------------------ 異步執(zhí)行抄腔,放到線程池 ----------------------
=> ChannelEventRunnable#run
=> DecodeHandler#received
=> DecodeHandler#decode
=> DecodeableRpcInvocation#decode
=> HeaderExchangeHandler#received
=> HeaderExchangeHandler#handleRequest
=> DubboProtocol.requestHandler#reply
------------------ 異步執(zhí)行 -----------------------
----------------擴展點-------------------
=> ProtocolFilterWrapper.invoke
=> EchoFilter.invoke
=> ClassLoaderFilter.invoke
=> GenericFilter.invoke
=> TraceFilter.invoke
=> MonitorFilter.invoke
=> TimeoutFilter.invoke
=> ExceptionFilter.invoke
=> InvokerWrapper.invoke
-----------------擴展點-------------------
=> AbstractProxyInvoker#invoke
=> JavassistProxyFactory.AbstractProxyInvoker#doInvoke
=> 代理類#invokeMethod
=> 真正的service方法
//把接收處理的結果瓢湃,數(shù)據(jù)發(fā)回consumer future#whenComplete
=> channel.send(response)
=> HeaderExchangeChannel
=> NettyChannel.send
=> NioSocketChannel#writeAndFlush(message)
服務端發(fā)送結果
HeaderExchangeChannel#send =>
NettyChannel#send =>
NioSocketChannel#writeAndFlush(message)
客戶端響應結果
在客戶端啟動的時候,入?yún)andler和服務端的handler是同一個
// DubboProtocol#initClient
Exchangers.connect(url, requestHandler);
// HeaderExchanger#connect
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
Transporters#connect =>
NettyTransporter#connect
return NettyClient
在NettyClient
構造函數(shù)中赫蛇,對handler做了包裝
ChannelHandlers.wrap(handler, url)
public class ChannelHandlers {
private static ChannelHandlers INSTANCE = new ChannelHandlers();
protected ChannelHandlers() {
}
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected static ChannelHandlers getInstance() {
return INSTANCE;
}
static void setTestingChannelHandlers(ChannelHandlers instance) {
INSTANCE = instance;
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
}
所以绵患,最終NettyClient
中的handler屬性指向 MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler -> DecodeHandler -> HeaderExchangeHandler -> requestHandler
,和服務端處理流程一樣一樣
- 接收消息悟耘,經(jīng)過
MultiMessageHandler
落蝙、HeartbeatHandler
處理,到達AllDispatcher
-
AllChannelHandler
中將消息封裝成new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)
類型暂幼,交由線程池執(zhí)行 - 線程池執(zhí)行任務筏勒,經(jīng)過
DecodeHandler
到達HeaderExchangeHandler
-
HeaderExchangeHandler#received -> HeaderExchangeHandler#handleResponse -> DefaultFuture#received
,DefaultFuture
中維護了一個請求ID和DefaultFuture的映射關系
旺嬉,Request和Response通過請求ID可以一一對應
public static void received(Channel channel, Response response, boolean timeout) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
Timeout t = future.timeoutCheckTask;
if (!timeout) {
t.cancel();
}
future.doReceived(response);
} else {
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
if (res.getStatus() == Response.OK) {
this.complete(res.getResult());
} else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
} else {
this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
}
}
- 通過
response.Id
獲取DefaultFuture
- 執(zhí)行
CompletableFuture#complete
方法可以讓 執(zhí)行了CompletableFuture#get
的用戶線程得到響應管行,獲取結果返回。至此整個調(diào)用過程完成
同步轉異步
可是我們在代碼中很多時候都是同步調(diào)用邪媳,很少自己去調(diào)用CompletableFuture#get
方法捐顷,這一部分邏輯又是怎么處理的。在DubboInvoker#doInvoke
方法中悲酷,返回的是一個AsyncRpcResult
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
// return = false套菜,即oneWay ,可以減少不必要的Future對象創(chuàng)建
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {c
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
// 訂閱 responseFuture 设易,當 responseFuture 完成的之后逗柴,執(zhí)行 asyncRpcResult 的complete方法, 這樣用戶線程就可以響應了
asyncRpcResult.subscribeTo(responseFuture);
RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
return asyncRpcResult;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
AsyncToSyncInvoker
在AsyncToSyncInvoker#invoke
方法中顿肺,會判斷是同步調(diào)用還是異步調(diào)用戏溺,如果是同步調(diào)用,將調(diào)用AsyncRpcResult#get
方法阻塞用戶線程屠尊,以達到同步效果
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult = invoker.invoke(invocation);
try {
if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
// 如果是同步調(diào)用旷祸,調(diào)用 asyncResult#get 阻塞用戶線程
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return! method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof TimeoutException) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} else if (t instanceof RemotingException) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
} catch (Throwable e) {
throw new RpcException(e.getMessage(), e);
}
return asyncResult;
}