本篇為dubbo高性能剖析的下篇钩乍,側(cè)重于剖析dubbo在網(wǎng)絡(luò)層的源碼實(shí)現(xiàn)及設(shè)計(jì)。(上篇傳送門dubbo的線程模型)
概述
Dubbo中Consumer端和Provider端由于角色和職責(zé)不盡相同逃贝,本篇將分開(kāi)介紹Consumer、Provider迫摔。希望借助示意圖再加上源碼的剖析沐扳,能讓大家對(duì)Dubbo中的網(wǎng)絡(luò)請(qǐng)求模型有個(gè)比較清晰的了解和認(rèn)識(shí)。
Consumer端
Consumer端由于支持的場(chǎng)景非常多句占,直接給出代碼分析大家可能會(huì)看的云里霧里沪摄。所以這里先給出示意圖,從整體上來(lái)了解請(qǐng)求模型有哪些纱烘,并通過(guò)簡(jiǎn)單的示例對(duì)各個(gè)模型有個(gè)簡(jiǎn)單的認(rèn)識(shí)卓起。然后再開(kāi)始深入剖析源碼實(shí)現(xiàn)。
我們先來(lái)了解下consumer端請(qǐng)求模型有哪些及其特點(diǎn):
- oneway 客戶端發(fā)出請(qǐng)求之后就結(jié)束了凹炸,不需要等待服務(wù)端響應(yīng)戏阅。
- 同步請(qǐng)求,客戶端請(qǐng)求之后當(dāng)前線程同步等待消息返回啤它,處理請(qǐng)求和響應(yīng)是同一個(gè)線程奕筐。
- 異步請(qǐng)求,一種是通過(guò)上下文獲取
Future
對(duì)象后主動(dòng)調(diào)用get
方法阻塞直到返回消息变骡。處理請(qǐng)求和響應(yīng)的也是同一個(gè)線程离赫,只不過(guò)在等待響應(yīng)的同時(shí)可以見(jiàn)縫插針的處理其他的任務(wù)。 - 異步請(qǐng)求塌碌,另一種是通過(guò)
Future
對(duì)象注冊(cè)回調(diào)方法渊胸,處理請(qǐng)求和響應(yīng)的是兩個(gè)不同的線程。(這里拋出個(gè)問(wèn)題響應(yīng)和如何關(guān)聯(lián)到對(duì)應(yīng)的請(qǐng)求的呢)
交互示意圖如下所示:
為了進(jìn)一步區(qū)分異步請(qǐng)求阻塞get()
與回調(diào)的區(qū)別台妆,我們通過(guò)代碼示例進(jìn)一步說(shuō)明
接口契約如下:
public interface DemoService {
String sayHello(String name);
default CompletableFuture<String> sayHelloAsync(String name) {
return CompletableFuture.completedFuture(sayHello(name));
}
}
服務(wù)提供方實(shí)現(xiàn):
public class DemoServiceImpl implements DemoService {
private static final Logger logger = LoggerFactory.getLogger(DemoServiceImpl.class);
@Override
public String sayHello(String name) {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(3));
return "Hello " + name + ";
}
}
調(diào)用方代碼:
DemoService demoService = ReferenceConfigCache.getCache().get(reference);
CompletableFuture<String> future = demoService.sayHelloAsync("get");
System.err.println("threadName:" + Thread.currentThread().getName() + " get 開(kāi)始");
future.get();
System.err.println("threadName:" + Thread.currentThread().getName() + " get 結(jié)束");
System.err.println("threadName:" + Thread.currentThread().getName() + " callback 開(kāi)始");
demoService.sayHelloAsync("callback").whenComplete((r, e) -> {
System.err.println("threadName:" + Thread.currentThread().getName() + " callback 結(jié)束");
});
System.err.println("threadName:" + Thread.currentThread().getName() + " 執(zhí)行完畢");
代碼中在關(guān)鍵的節(jié)點(diǎn)打印了當(dāng)前線程名翎猛,依次調(diào)用了Future.get
,CompletableFuture.whenComplete
接剩,控制臺(tái)輸出結(jié)果為:
threadName:main get 開(kāi)始
threadName:main get 結(jié)束
threadName:main callback 開(kāi)始
threadName:main 執(zhí)行完畢
threadName:DubboClientHandler-10.0.107.214:20880-thread-1 callback 結(jié)束
通過(guò)執(zhí)行結(jié)果可以發(fā)現(xiàn):通過(guò)Future.get()'會(huì)阻塞當(dāng)前線程等待結(jié)果切厘,而
CompletableFuture`的回調(diào)則使用單獨(dú)的線程不會(huì)阻塞當(dāng)前線程的執(zhí)行。
Consumer關(guān)鍵源碼分析
上面概述了Consumer中存在的幾種網(wǎng)絡(luò)模型懊缺,本小節(jié)我們側(cè)重于源碼分析疫稿,剖析上面幾種請(qǐng)求模型實(shí)現(xiàn)原理。
protected Result doInvoke(final Invocation invocation) throws Throwable {
// 省略部分代碼
try {
// 如果是單向請(qǐng)求
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
if (isOneway) {
// 是否確認(rèn)異步發(fā)送
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
// 如果是同步使用ThreadlessExecutor,其他場(chǎng)景使用Dubbo創(chuàng)建的共享線程池
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
// 創(chuàng)建CompletableFuture遗座,并通過(guò)netty work線程池將消息異步發(fā)送出去
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(o -> (AppResponse) o);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
// 設(shè)置執(zhí)行異步回調(diào)任務(wù)的線程池為executor
result.setExecutor(executor);
return result;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
可以看到整體上的代碼邏輯有兩個(gè)分支:處理單向請(qǐng)求和非單向請(qǐng)求舀凛,那么我們下面就依次展開(kāi)兩種場(chǎng)景具體分析。
單向請(qǐng)求
1. boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
2. currentClient.send(inv, isSent);
3. return AsyncRpcResult.newDefaultAsyncResult(invocation);
單向請(qǐng)求代碼很簡(jiǎn)單只有三行就完成了途蒋。第1行代碼出現(xiàn)的isSent
參數(shù)表示線程是否等待確認(rèn)異步發(fā)送猛遍。因?yàn)橄⒆罱K還是通過(guò)Netty NIO異步發(fā)送出去的,如果設(shè)置為true意味著要一直等待至Netty發(fā)送完成碎绎,默認(rèn)為false螃壤。第2行代碼跟蹤消息發(fā)送過(guò)程,調(diào)用鏈如下:
-ReferenceCountExchangeClient
--HeaderExchangeClient
---HeaderExchangeChannel
----NettyChannel
最終發(fā)送消息核心代碼如下:
1. public void send(Object message, boolean sent) throws RemotingException {
// 省略非關(guān)鍵代碼
// 這里通過(guò)調(diào)用Netty方法將消息發(fā)送出去
2. ChannelFuture future = channel.writeAndFlush(message);
// 如果方法配置sent=true則需要同步等待消息發(fā)送完畢
3. if (sent) {
// wait timeout ms
4. timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
5. success = future.await(timeout);
}
// 省略非關(guān)鍵代碼
}
在第2行可以看到消息通過(guò)Netty中的channel將消息異步發(fā)送出去筋帖,返回了一個(gè)Future對(duì)象奸晴,如果isSent
設(shè)置為true那么在執(zhí)行到第5行時(shí)就會(huì)阻塞至消息發(fā)送成功或者超時(shí)后返回。
方法send
執(zhí)行結(jié)束后程序會(huì)返回主流程的doInvoke
方法繼續(xù)執(zhí)行到AsyncRpcResult.newDefaultAsyncResult(invocation);
日麸,這里創(chuàng)建默認(rèn)返回值后就完成了單向請(qǐng)求全部過(guò)程寄啼。
除了單向請(qǐng)求,其他另外三種都是雙向的代箭,有去有回墩划。一個(gè)是請(qǐng)求報(bào)文的發(fā)送,一個(gè)是響應(yīng)報(bào)文的接收嗡综,兩者需要結(jié)合起來(lái)一起合作才能完成一個(gè)RPC的全部過(guò)程乙帮。那么后續(xù)針對(duì)其他三種請(qǐng)求模型代碼分析時(shí)我們也按照先請(qǐng)求后響應(yīng)的步驟展開(kāi)。
同步請(qǐng)求
即客戶端請(qǐng)求之后當(dāng)前線程會(huì)阻塞等待響應(yīng)极景,處理請(qǐng)求和響應(yīng)的同一個(gè)線程察净。接下來(lái)我們重點(diǎn)分析以下實(shí)現(xiàn):
- 消息發(fā)送過(guò)程。
- 消息接受過(guò)程盼樟。
- Dubbo是如何通過(guò)響應(yīng)找到請(qǐng)求氢卡。
- 如何實(shí)現(xiàn)阻塞當(dāng)前線程。
- 響應(yīng)消息返回后阻塞線程是如何喚醒的晨缴。
下面代碼是非單向請(qǐng)求的處理邏輯
// 如果是同步使用ThreadlessExecutor译秦,其他場(chǎng)景使用Dubbo創(chuàng)建的共享線程池
1. ExecutorService executor = getCallbackExecutor(getUrl(), inv);
// 創(chuàng)建CompletableFuture,并通過(guò)netty work線程池將消息異步發(fā)送出去
2. CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(o -> (AppResponse) o);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
// 設(shè)置執(zhí)行異步回調(diào)任務(wù)的線程池為executor
result.setExecutor(executor);
return result;
第1行代碼:當(dāng)前執(zhí)行的請(qǐng)求通過(guò)getCallbackExecutor
獲得對(duì)應(yīng)的線程池(用來(lái)執(zhí)行返回請(qǐng)求時(shí)異步回調(diào)過(guò)程)击碗,代碼如下:
protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {
ExecutorService sharedExecutor = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getExecutor(url);
// 如果當(dāng)前請(qǐng)求是同步請(qǐng)求筑悴,則返回ThreadlessExecutor
if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {
return new ThreadlessExecutor(sharedExecutor);
} else {
// 否則返回客戶端創(chuàng)建的共享線程池
return sharedExecutor;
}
}
關(guān)于ThreadlessExecutor
有必要介紹下它的特點(diǎn),官方文檔說(shuō)明如下:
-
ThreadlessExecutor
和其他的線程池最大的區(qū)別是它不管理任何線程延都。 - 提交給它的任務(wù)不會(huì)由單獨(dú)的線程去調(diào)度執(zhí)行雷猪。
- 被存放在阻塞隊(duì)列中的任務(wù)只有被調(diào)用了
waitAndDrain()
才會(huì)執(zhí)行,并且執(zhí)行任務(wù)的線程是當(dāng)前調(diào)用了waitAndDrain()
方法的線程晰房。
補(bǔ)充說(shuō)明,對(duì)理解同步請(qǐng)求阻塞原理非常重要:在ThreadlessExecutor
中任務(wù)使用阻塞隊(duì)列存儲(chǔ),如果隊(duì)列是空的情況下直接調(diào)用waitAndDrain()
方法時(shí)殊者,阻塞隊(duì)列會(huì)阻塞當(dāng)前線程直至有新的任務(wù)到來(lái)与境。Dubbo充分利用阻塞隊(duì)列的特性,在請(qǐng)求完成之后調(diào)用waitAndDrain()
方法阻塞住當(dāng)前線程猖吴,當(dāng)Netty的work線程收到響應(yīng)之后會(huì)將消息轉(zhuǎn)換成待處理任務(wù)加入到該線程池中摔刁。此時(shí)剛剛因?yàn)闆](méi)有任務(wù)處理而阻塞的線程會(huì)被喚醒,用來(lái)繼續(xù)處理響應(yīng)的消息海蔽。理解這段話我們來(lái)繼續(xù)分析請(qǐng)求代碼:
第2行代碼:currentClient
調(diào)用request
方法發(fā)送請(qǐng)求并返回CompletableFuture
,進(jìn)一步跟蹤request
源碼:
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// 創(chuàng)建請(qǐng)求共屈,這里隱藏了構(gòu)造方法中創(chuàng)建請(qǐng)求id的細(xì)節(jié),這也是關(guān)聯(lián)request和response的關(guān)鍵党窜。
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
// 創(chuàng)建Future對(duì)象拗引,將請(qǐng)求以及線程池保存到Future中
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
這段代碼主要完成了三件事:
- 請(qǐng)求對(duì)象的創(chuàng)建,在
new Request()
這里隱藏了構(gòu)造方法中創(chuàng)建請(qǐng)求id的細(xì)節(jié)幌衣,這也是關(guān)聯(lián)request和response的關(guān)鍵矾削,響應(yīng)消息帶上request中生成的id,consumer收到消息時(shí)就能找到對(duì)應(yīng)的request上下文豁护。 - 創(chuàng)建
DefaultFuture
對(duì)象哼凯,把請(qǐng)求、線程池等信息存儲(chǔ)起來(lái)楚里,在DefaultFuture
中有個(gè)非常重要的全局靜態(tài)變量Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
断部,FUTURES
中的key是剛剛請(qǐng)求創(chuàng)建的唯一id,value即為DefaultFuture
對(duì)象班缎。收到響應(yīng)消息之后蝴光,可以很方便的通過(guò)Response中攜帶的id信息查到對(duì)應(yīng)的信息。 - 調(diào)用消息發(fā)送吝梅。
返回future對(duì)象之后虱疏,對(duì)象會(huì)被進(jìn)一步封裝成AsyncRpcResult,到這里貌似主流程都結(jié)束了并沒(méi)有看到調(diào)用``waitAndDrain()方法產(chǎn)生阻塞關(guān)鍵代碼苏携。Dubbo將這塊代碼封裝到了
AsyncToSyncInvoker`中:
@Override
public Result invoke(Invocation invocation) throws RpcException {
1. Result asyncResult = invoker.invoke(invocation);
try {
2. if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
/**
* NOTICE!
* must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
* {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.
*/
3. asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
}
// 省略非關(guān)鍵代碼
}
可以看到第2行判斷是否為同步請(qǐng)求做瞪,如果是同步請(qǐng)求會(huì)調(diào)用第3行的get()
方法,而該方法跟進(jìn)去之后如下:
public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
1. if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
2. threadlessExecutor.waitAndDrain();
}
return responseFuture.get(timeout, unit);
}
這里可以看到第一行判斷線程池是否為ThreadlessExecutor
,如果是則會(huì)調(diào)用waitAndDrain()
阻塞等待右冻,到這里同步請(qǐng)求的過(guò)程我們已經(jīng)分析完畢装蓬,接下來(lái)我們來(lái)分析同步請(qǐng)求的響應(yīng)區(qū)里過(guò)程。
還記上篇中出現(xiàn)的AllChannelHandler
嗎纱扭,這個(gè)類將部分IO事件委派給Dubbo中的線程池處理牍帚,其中就包括了消息的接收,我們一起來(lái)看下源碼:
@Override
public void received(Channel channel, Object message) throws RemotingException {
1. ExecutorService executor = getPreferredExecutorService(message);
try {
2. executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException){
sendFeedback(channel, (Request) message, t);
return;
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
第1行通過(guò)返回的消息獲取線程池乳蛾,第2行將消息進(jìn)一步封裝成任務(wù)交給剛剛線程池處理暗赶。我們進(jìn)一步跟進(jìn)獲取線程池的代碼鄙币,剖析Dubbo是如何通過(guò)響應(yīng)找到請(qǐng)求的上下文的。
public ExecutorService getPreferredExecutorService(Object msg) {
1. if (msg instanceof Response) {
2. Response response = (Response) msg;
3. DefaultFuture responseFuture = DefaultFuture.getFuture(response.getId());
// a typical scenario is the response returned after timeout, the timeout response may has completed the future
4. if (responseFuture == null) {
5. return getSharedExecutorService();
6. } else {
7. ExecutorService executor = responseFuture.getExecutor();
8. if (executor == null || executor.isShutdown()) {
9. executor = getSharedExecutorService();
}
return executor;
}
} else {
return getSharedExecutorService();
}
}
在第3行蹂随,通過(guò)響應(yīng)消息攜帶回來(lái)的id就可以在DefaultFuture
的靜態(tài)全局變量Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
中獲得關(guān)聯(lián)請(qǐng)求的DefaultFuture
了十嘿,進(jìn)一步也就可以獲取到該DefaultFuture
中存儲(chǔ)的線程池ThreadlessExecutor
了。那么任務(wù)被Netty中的工作線程調(diào)用executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
被加入阻塞隊(duì)列后又會(huì)發(fā)生什么呢岳锁,我們來(lái)分析下waitAndDrain()
中的源碼:
public void waitAndDrain() throws InterruptedException {
if (finished) {
return;
}
1. Runnable runnable = queue.take();
2. synchronized (lock) {
3. waiting = false;
4. runnable.run();
}
runnable = queue.poll();
while (runnable != null) {
try {
runnable.run();
} catch (Throwable t) {
logger.info(t);
}
runnable = queue.poll();
}
// mark the status of ThreadlessExecutor as finished.
finished = true;
}
在請(qǐng)求時(shí)會(huì)調(diào)用一次waitAndDrain()
绩衷,由于隊(duì)列中是空的所以線程會(huì)阻塞在第1行代碼。當(dāng)收到響應(yīng)消息時(shí)新的處理任務(wù)被添加進(jìn)來(lái)激率,代碼會(huì)繼續(xù)執(zhí)行走到第4行執(zhí)行剛剛添加進(jìn)來(lái)的任務(wù)咳燕。下面我們剖析任務(wù)執(zhí)行過(guò)程,調(diào)用鏈如下:
ChannelEventRunnable.run()
-HeaderExchangeHandler.received()
--HeaderExchangeHandler.handleResponse()
---DefaultFuture.received()
public static void received(Channel channel, Response response, boolean timeout) {
try {
1. DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
Timeout t = future.timeoutCheckTask;
if (!timeout) {
// decrease Time
t.cancel();
}
2. future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
第1行通過(guò)調(diào)用id獲取future對(duì)象乒躺,第2行調(diào)用doReceived()
方法招盲,方法內(nèi)容如下:
private void doReceived(Response res) {
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
if (res.getStatus() == Response.OK) {
1. this.complete(res.getResult());
} else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
} else {
this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
}
// the result is returning, but the caller thread may still waiting
// to avoid endless waiting for whatever reason, notify caller thread to return.
if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
if (threadlessExecutor.isWaiting()) {
threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" +
" which is not an expected state, interrupt the thread manually by returning an exception."));
}
}
}
這里關(guān)鍵代碼就一行this.complete(res.getResult());。
首先通過(guò)res.getResult()
獲取返回值聪蘸,調(diào)用this.complete()
并將返回值傳入宪肖。如果該future
注冊(cè)了類如whenComplete()
回調(diào)函數(shù)就會(huì)在此時(shí)觸發(fā)。
異步請(qǐng)求
在本篇開(kāi)始的時(shí)候我們就通過(guò)簡(jiǎn)單的示例演示了Dubbo中兩種異步不同的使用方式健爬,一種是通過(guò)get()
主動(dòng)阻塞獲取返回值(是否阻塞要看get時(shí)消息是否返回了)控乾,另一種則是完全異步由其他的線程執(zhí)行回調(diào)方法。
通過(guò)上面對(duì)同步的分析娜遵,假設(shè)AsyncToSyncInvoker
中asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
這段代碼不執(zhí)行會(huì)是不是就可以直接返回了呢蜕衡?
答案是肯定,沒(méi)有了阻塞的過(guò)程设拟,在請(qǐng)求消息發(fā)送之后就立刻返回了慨仿,忽略了這段代碼就是異步請(qǐng)求了。同步請(qǐng)求與客戶端采用future.get()
這種方式獲取結(jié)果集區(qū)別在于前者是框架內(nèi)部阻塞纳胧,后者是客戶端自己主動(dòng)阻塞镰吆。同步阻塞是借助阻塞隊(duì)列實(shí)現(xiàn)的,而future.get()
是借助CompletableFuture.waitingGet
實(shí)現(xiàn)的跑慕。
而采用whenComplete
異步回調(diào)方式則不會(huì)阻塞當(dāng)前線程万皿,異步的回調(diào)是在響應(yīng)結(jié)果到達(dá)之后,通過(guò)共享線程池中的線程執(zhí)行this.complete(res.getResult());
方法核行,來(lái)回調(diào)whenCompelete
方法中的內(nèi)容牢硅。
到這里異步請(qǐng)求的剖析也就結(jié)束了,可以看到Dubbo中同步請(qǐng)求和異步請(qǐng)求代碼大部分都是一樣的芝雪,但是通過(guò)巧妙的ThreadLessExecutor
設(shè)計(jì)完成了異步轉(zhuǎn)同步的操作减余,再借助于CompletableFuture
中提供的異步特性實(shí)現(xiàn)了真正實(shí)現(xiàn)了請(qǐng)求異步。
Provider端
上面我們簡(jiǎn)單介紹了consumer端的幾種請(qǐng)求模型惩系,那么provider端有哪些響應(yīng)模型呢位岔?
一個(gè)請(qǐng)求過(guò)來(lái)時(shí)如筛,在provider端大致要經(jīng)過(guò)以下幾個(gè)處理過(guò)程:
- 首先接收請(qǐng)求
- 反射調(diào)用對(duì)應(yīng)的方法獲取返回結(jié)果
- 將返回?cái)?shù)據(jù)發(fā)送給consumer端。
在上篇線程模型中我們介紹了Dubbo在默認(rèn)策略下赃承,請(qǐng)求的接收會(huì)由單獨(dú)創(chuàng)建的線程池處理妙黍,而非Netty的工作線程處理悴侵。請(qǐng)求接收到之后瞧剖,經(jīng)過(guò)解析處理最終會(huì)通過(guò)代理類反射調(diào)用對(duì)應(yīng)的方法并拿到結(jié)果,下面為調(diào)用方法的關(guān)鍵代碼(位于AbstractProxyInvoker
)可免。
public Result invoke(Invocation invocation) throws RpcException {
try {
// 反射調(diào)用方法獲取結(jié)果
System.err.println("threadName:" + Thread.currentThread().getName() + "發(fā)起了調(diào)用");
Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
// 將返回結(jié)果封裝成CompletableFuture
CompletableFuture<Object> future = wrapWithFuture(value);
// 將結(jié)果通過(guò)AppResponse封裝
CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
AppResponse result = new AppResponse();
if (t != null) {
if (t instanceof CompletionException) {
result.setException(t.getCause());
} else {
result.setException(t);
}
} else {
result.setValue(obj);
}
return result;
});
// 返回支持異步結(jié)果
return new AsyncRpcResult(appResponseFuture, invocation);
} catch (InvocationTargetException e) {
if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
}
return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
} catch (Throwable e) {
throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
以上調(diào)用方法代碼會(huì)將結(jié)果統(tǒng)一封裝為支持異步的CompletableFuture
抓于。如果被調(diào)用的方法本身不支持異步,那么在這里主線程即調(diào)用方法的都是一個(gè)線程浇借。下面示例我們以sayHelloAsync
方法為例演示服務(wù)端異步的場(chǎng)景:
@Override
public CompletableFuture<String> sayHelloAsync(String name) {
return CompletableFuture.supplyAsync(() -> {
System.err.println("threadName:" + Thread.currentThread().getName() + "執(zhí)行了方法");
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(3));
return "Hello";
});
}
在執(zhí)行調(diào)用前以及方法內(nèi)部分別打印了線程名稱捉撮,輸出如下內(nèi)容:
threadName:DubboServerHandler-10.0.107.214:20880-thread-2發(fā)起了調(diào)用
threadName:ForkJoinPool.commonPool-worker-9執(zhí)行了方法
可以明顯看到兩者是由不同線程執(zhí)行的,而同步的輸入如下的內(nèi)容:
threadName:DubboServerHandler-10.0.107.214:20880-thread-2發(fā)起了調(diào)用
threadName:DubboServerHandler-10.0.107.214:20880-thread-2 執(zhí)行了方法
可以看到調(diào)用以及方法的執(zhí)行都是有同一個(gè)線程負(fù)責(zé)的妇垢。
上面我們分析了provider端請(qǐng)求接收即服務(wù)方法調(diào)用的過(guò)程巾遭,下面我們繼續(xù)分析結(jié)果發(fā)送的過(guò)程(HeaderExchangeHandler
)。
繼上面反射調(diào)用之后獲得返回的CompletableFuture
闯估,這里注冊(cè)了回調(diào)方法whenComplete
灼舍,方法將調(diào)用結(jié)果通過(guò)channel
發(fā)送出去。
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
// 省略無(wú)關(guān)代碼
// find handler by message class.
Object msg = req.getData();
try {
// 反射調(diào)用方法
CompletionStage<Object> future = handler.reply(channel, msg);
future.whenComplete((appResult, t) -> {
System.err.println("threadName:" + Thread.currentThread().getName() + "執(zhí)行了回調(diào)");
try {
if (t == null) {
res.setStatus(Response.OK);
res.setResult(appResult);
} else {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
// 發(fā)送返回消息
channel.send(res);
} catch (RemotingException e) {
logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
}
});
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
}
進(jìn)一步追蹤發(fā)送消息代碼涨薪,調(diào)用鏈路如下:
-HeaderExchangeHandler
--HeaderExchangeChannel
---NettyChannel
----AbstractChannel
-----DefaultChannelPipeline
------AbstractChannelHandlerContext
最終關(guān)鍵代碼如下:
private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
try {
executor.execute(runnable);
} catch (Throwable cause) {
try {
promise.setFailure(cause);
} finally {
if (msg != null) {
ReferenceCountUtil.release(msg);
}
}
}
}
這里出現(xiàn)的executor
是Netty中的worker線程池骑素,真正的消息響應(yīng)發(fā)送的工作是由該組線程池來(lái)完成的。到這里消息響應(yīng)的關(guān)鍵過(guò)程已經(jīng)分析完畢刚夺,總結(jié)示意圖如下所示:
在異步場(chǎng)景時(shí)献丑,Dubbo Server Handler Thread 只負(fù)責(zé)服務(wù)調(diào)用,方法的執(zhí)行由異步的ForkJoin Common Pool完成(業(yè)務(wù)線程)侠姑,當(dāng)然也可以手動(dòng)指定線程池创橄。當(dāng)方法執(zhí)行完畢后在回調(diào)方法中完成消息發(fā)送的方法觸發(fā),消息的發(fā)送則又由單獨(dú)的Netty Work Thread來(lái)完成了莽红。
在同步場(chǎng)景時(shí)妥畏,Dubbo Server Handler Thread 需要負(fù)責(zé)服務(wù)調(diào)用以及方法的執(zhí)行,獲取結(jié)構(gòu)后則觸發(fā)消息發(fā)送船老,具體的發(fā)送動(dòng)作由單獨(dú)的Netty Work Thread來(lái)完成咖熟,可見(jiàn)同步和異步的區(qū)別在于方法的執(zhí)行是由誰(shuí)來(lái)完成。