Dubbo在服務(wù)調(diào)用時(shí)支持同步調(diào)用和異步調(diào)用等方式坟乾。
在Dubbo2.6版本及之前的版本在實(shí)現(xiàn)異步調(diào)用時(shí)存在一定的缺點(diǎn)迹辐,實(shí)際上是一種假異步。
下面列舉一個(gè)異步案例甚侣。
// 此方法應(yīng)該返回Foo明吩,但異步后會(huì)立刻返回NULL
fooService.findFoo(fooId);
// 立刻得到當(dāng)前調(diào)用的Future實(shí)例,當(dāng)發(fā)生新的調(diào)用時(shí)這個(gè)東西將會(huì)被覆蓋
Future<Foo> fooFuture = RpcContext.getContext().getFuture();
// 調(diào)用另一個(gè)服務(wù)的方法
barService.findBar(barId);
// 立刻得到當(dāng)前調(diào)用的Future
Future<Bar> barFuture = RpcContext.getContext().getFuture();
// 此時(shí)殷费,兩個(gè)服務(wù)的方法在并發(fā)執(zhí)行
// 等待第一個(gè)調(diào)用完成印荔,線程會(huì)進(jìn)入Sleep狀態(tài),當(dāng)調(diào)用完成后被喚醒详羡。
Foo foo = fooFuture.get();
// 同上
Bar bar = barFuture.get();
// 假如第一個(gè)調(diào)用需要等待5秒剔宪,第二個(gè)等待6秒,則整個(gè)調(diào)用過(guò)程完成的時(shí)間是6秒副硅。
當(dāng)調(diào)用服務(wù)方法后桐经,Dubbo會(huì)創(chuàng)建一個(gè)DefaultFuture,并將該Future存放到RpcContext中,在用戶(hù)線程中茶行,如果用戶(hù)想獲取調(diào)用結(jié)果時(shí)躯概,會(huì)從RpcContext中獲取該Future,并調(diào)用get方法畔师,但是如果此時(shí)該服務(wù)仍沒(méi)有處理完畢娶靡,則會(huì)出現(xiàn)阻塞,直到結(jié)果返回或調(diào)用超時(shí)為止看锉。發(fā)生阻塞時(shí)姿锭,該方法的后續(xù)步驟則得不到執(zhí)行。對(duì)于異步來(lái)說(shuō)伯铣,這顯然是不合理的呻此。理想中的異步是如果服務(wù)沒(méi)有處理好,會(huì)繼續(xù)執(zhí)行用戶(hù)線程的后續(xù)方法腔寡,不會(huì)阻塞等待焚鲜。
從Dubbo2.7開(kāi)始,Dubbo的異步調(diào)用開(kāi)始以CompletableFuture為基礎(chǔ)進(jìn)行實(shí)現(xiàn)放前。
DubboInvoker是一個(gè)執(zhí)行體忿磅,通過(guò)它可以發(fā)起遠(yuǎn)程調(diào)用。
在Dubbo2.6的遠(yuǎn)程調(diào)用中凭语,部分代碼如下所示(只保留了部分代碼):
DubboInvoker類(lèi)
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
//忽略部分代碼
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
//忽略部分代碼
//單向調(diào)用葱她,無(wú)返回值
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
// 異步調(diào)用
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
// 同步調(diào)用
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
}
在Dubbo2.6版本及之前的版本中,不管同步調(diào)用還是異步調(diào)用似扔,都會(huì)調(diào)用HeaderExchangeClient.request
方法吨些,返回一個(gè)DefaultFuture
對(duì)象,不同的點(diǎn)是:異步調(diào)用會(huì)將該future存放到RpcContext中炒辉,并先返回一個(gè)空的RpcResult結(jié)果豪墅。而同步調(diào)用不會(huì)將該future存放到RpcContext中,而是直接調(diào)用該future的get方法辆脸,阻塞等待調(diào)用結(jié)果但校。
HeaderExchangeChannel類(lèi)
public ResponseFuture request(Object request, int timeout) throws RemotingException {
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
channel.send(req);
//忽略了部分代碼
return future;
}
DefaultFuture類(lèi)(忽略了部分代碼)
public Object get(int timeout) throws RemotingException {
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
return returnFromResponse();
}
當(dāng)服務(wù)端處理完信息后,HeaderExchangeHandler
會(huì)處理發(fā)送過(guò)來(lái)的Response
啡氢,根據(jù)requestId獲取對(duì)應(yīng)的DefaultFuture
對(duì)象状囱,最終調(diào)用doReceived方法對(duì)結(jié)果賦值。利用AQS的條件鎖機(jī)制倘是,喚醒阻塞線程亭枷。
DefaultFuture類(lèi)
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
在Dubbo2.7版本中,對(duì)異步調(diào)用進(jìn)行了改良搀崭,使用了CompletableFuture叨粘。
Dubbo2.7異步調(diào)用的一個(gè)樣例:
// 此調(diào)用會(huì)立即返回null
asyncService.sayHello("world");
// 拿到調(diào)用的Future引用猾编,當(dāng)結(jié)果返回后,會(huì)被通知和設(shè)置到此Future
CompletableFuture<String> helloFuture = RpcContext.getContext().getCompletableFuture();
// 為Future添加回調(diào)
helloFuture.whenComplete((retValue, exception) -> {
if (exception == null) {
System.out.println(retValue);
} else {
exception.printStackTrace();
}
});
同樣是DubboInvoker發(fā)起遠(yuǎn)程調(diào)用升敲,在doInvoke方法中進(jìn)行了改進(jìn):
DubboInvoker2.7.9版本
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
//單向調(diào)用
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
//同步調(diào)用和異步調(diào)用
} else {
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj); FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
}
在Dubbo2.7版本中答倡,DubboInvolnvoker對(duì)同步調(diào)用和異步調(diào)用進(jìn)行了統(tǒng)一處理,封裝成CompletableFuture驴党,并以 AsyncRpcResult返回瘪撇。
Dubbo2.7版本下HeaderExchangeChannel.request方法與2.6版本相差不大,只是DeafultFuture對(duì)象有一點(diǎn)不同港庄,即后續(xù)版本繼承了 CompletableFuture類(lèi)倔既。
對(duì)于同步調(diào)用和異步調(diào)用的處理交給AsyncToSyncInvoker
類(lèi)處理。
public Result invoke(Invocation invocation) throws RpcException {
// 調(diào)用DubboInvoker等Invoker返回的調(diào)用結(jié)果
Result asyncResult = invoker.invoke(invocation);
try {
// 如果是同步調(diào)用
if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
// 不能使用CompletableFuture#get()方法鹏氧,否則性能會(huì)出現(xiàn)嚴(yán)重下降渤涌。
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
}
}
//忽略了部分代碼
return asyncResult;
}
不同與Dubbo2.6版本,Dubbo2.7在處理服務(wù)端返回結(jié)果時(shí)放棄了AQS的條件鎖機(jī)制把还,改用CompletableFuture類(lèi)的complete方法去實(shí)現(xiàn)实蓬。
DefaultFuture類(lèi)
private void doReceived(Response res) {
//忽略部分代碼
if (res.getStatus() == Response.OK) {
// 對(duì)CompletableFuture賦值結(jié)果
this.complete(res.getResult());
} else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
} else {
this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
}
//忽略部分代碼
}
對(duì)于上述的Result接口,有兩個(gè)實(shí)現(xiàn)對(duì)象笨篷,我們?cè)谶@里進(jìn)行簡(jiǎn)單對(duì)比分析下瞳秽。
AsyncRpcResult
此類(lèi)表示未完成的RPC調(diào)用,它將保留此調(diào)用的一些上下文信息率翅,例如RpcContext和Invocation,因此袖迎,當(dāng)調(diào)用完成并且返回結(jié)果時(shí)冕臭,它可以確保與調(diào)用時(shí)相同地恢復(fù)所有上下文, 是在調(diào)用任何回調(diào)之前進(jìn)行的。
當(dāng)Result實(shí)現(xiàn)CompletionStage時(shí)燕锥,AsyncRpcResult允許您輕松構(gòu)建異步過(guò)濾器鏈辜贵,其狀態(tài)將完全由基礎(chǔ)RPC調(diào)用的狀態(tài)驅(qū)動(dòng)。
AsyncRpcResult不包含任何具體值(由CompletableFuture帶來(lái)的基礎(chǔ)值除外)归形,請(qǐng)將其視為狀態(tài)傳輸節(jié)點(diǎn)托慨。#getValue()
和#getException()
都是從Result接口繼承的,主要實(shí)現(xiàn)它們出于兼容性考慮暇榴。 因?yàn)樵S多舊式Filter實(shí)現(xiàn)很可能直接調(diào)用getValue厚棵。
AppResponse
Duboo3.0.0中引入了AsyncRpcResult來(lái)替換RpcResult,并且RpcResult被替換為AppResponse:AsyncRpcResult是在調(diào)用鏈中實(shí)際傳遞的對(duì)象蔼紧,
AppResponse僅代表業(yè)務(wù)結(jié)果婆硬。
AsyncRpcResult是表示未完成的RPC調(diào)用的未來(lái),而AppResponse是此調(diào)用的實(shí)際返回類(lèi)型奸例。
從理論上講彬犯,AppResponse不必實(shí)現(xiàn)Result接口,這主要是出于兼容性目的。
在Dubbo服務(wù)暴露中谐区,ProtocolFilterWrapper會(huì)構(gòu)建攔截器鏈Filter湖蜕,在調(diào)用實(shí)際的DubboInvoker之前,會(huì)先調(diào)用一些構(gòu)造的Filter宋列,比如ExecuteLimitFilter重荠,限制每個(gè)服務(wù)中每個(gè)方法的最大并發(fā)數(shù)。下面是Dubbo2.6構(gòu)建攔截器器鏈的邏輯:
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
ast = new Invoker<T>() {
//忽略部分代碼
@Override
public Result invoke(Invocation invocation) {
return filter.invoke(next, invocation);
}
};
}
}
return last;
}
但是在Dubbo2.6版本進(jìn)行異步調(diào)用中虚茶,會(huì)出現(xiàn)一些問(wèn)題戈鲁,因?yàn)镈ubbo2.6在進(jìn)行異步調(diào)用時(shí),會(huì)先返回一個(gè)空的RpcResult對(duì)象嘹叫,當(dāng)某些Filter需要對(duì)返回的結(jié)果進(jìn)行處理時(shí)婆殿,顯然在該情景下無(wú)法處理結(jié)果。Dubbo2.7對(duì)這種情況進(jìn)行了改進(jìn)罩扇。
Dubbo2.7構(gòu)建攔截器鏈的邏輯如下所示:
ProtocolFilterWrapper類(lèi)
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (Filter filter : filters) {
last = new FilterNode<T>(invoker, last, filter);
}
}
return last;
}
然后解釋下在FilterNode中的invoke方法:
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
asyncResult = filter.invoke(next, invocation);
//忽略部分代碼
return asyncResult.whenCompleteWithContext((r, t) -> {
//忽略部分代碼
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
if (t == null) {
listener.onResponse(r, invoker, invocation);
} else {
listener.onError(t, invoker, invocation);
}
}
});
}
當(dāng)異步調(diào)用時(shí)婆芦,以AsyncRpcResult對(duì)象傳遞,通過(guò)CompletableFuture#whenComplete實(shí)現(xiàn)異步下的邏輯處理喂饥。
public Result whenCompleteWithContext(BiConsumer<Result, Throwable> fn){
// 是CompletableFuture類(lèi)
this.responseFuture = this.responseFuture.whenComplete((v, t) -> {
beforeContext.accept(v, t);
fn.accept(v, t);
afterContext.accept(v, t);
});
return this;
}
Dubbo異步分析到這里就結(jié)束了消约,感謝大家的閱讀。