Future
在java 8之前,我們可以使用Callable+Future來異步執(zhí)行任務(wù)和獲取結(jié)果,比如
ExecutorService service = new ThreadPoolExecutor(5,5,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(100));
Future<String> f = service.submit(()->{
Thread.sleep(200);
return "helloWorld";
}
);
System.out.println(f.get(300,TimeUnit.MILLISECONDS));
其獲取結(jié)果玉控,get方法實(shí)現(xiàn)本質(zhì)是輪詢校驗(yàn)結(jié)果狀態(tài)積,阻塞實(shí)現(xiàn)依賴的是LockSupport.park()方法狮惜。
那么在dubbo交給Apache進(jìn)行孵化之前的版本中高诺,比如2.6.1版本中,其異步調(diào)用機(jī)制ResponseFuture的實(shí)現(xiàn)就借鑒了jdk的Future的模式碾篡,以DubboInvoker#doInvoke方法為例
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
可以看到虱而,同步與異步的本質(zhì)區(qū)別就是調(diào)用get()方法的時(shí)機(jī)不同,同步調(diào)用的話开泽,請(qǐng)求的同時(shí)由dubbo線程直接調(diào)用get方法阻塞牡拇,獲取結(jié)果;而異步調(diào)用穆律,dubbo直接返回RpcResult惠呼,后續(xù)由業(yè)務(wù)線程再來調(diào)用get方法獲取結(jié)果。
dubbo雖然借鑒了jdk的Future峦耘,但是代碼全部是自己寫的剔蹋,以DefaultFuture#get()為例
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
可以看到,dubbo的DefaultFuture實(shí)現(xiàn)贡歧,主要依賴lock+condition的模式滩租,不是jdk Future的LockSupport.park()模式赋秀。
這種模式的缺點(diǎn)有很多,最大的缺點(diǎn)就是結(jié)果獲取是阻塞的律想。
CompletableFuture
在java 8之后猎莲,jdk引入了CompletableFuture類,可以看到其實(shí)現(xiàn)了Future和CompletionStage技即,所以我們可以繼續(xù)像使用Future一樣使用CompletableFuture著洼。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
那么CompletionStage 是做什么的呢,用類文件注釋的第一句話說,其代表一種異步階段而叼,執(zhí)行一些行為或者計(jì)算身笤,執(zhí)行完畢后,會(huì)觸發(fā)其他CompletionStage的執(zhí)行葵陵。
A stage of a possibly asynchronous computation, that performs an
action or computes a value when another CompletionStage completes.
相較于Future液荸,CompletableFuture提供的很多新特性都依賴與這個(gè)CompletionStage,這里主要介紹其在dubbo異步調(diào)用中的應(yīng)用脱篙,其他特性不多介紹娇钱,重點(diǎn)介紹下其回調(diào)機(jī)制,先看用法
CompletableFuture<String> f = new CompletableFuture();
try {
f.whenComplete((v,t)->{
if(t!=null){
System.out.println("Exception");
}else{
System.out.println(v);
}
});
f.complete("HelloWorld");
當(dāng)CompletableFuture拿到結(jié)果的時(shí)候绊困,會(huì)回調(diào)whenComplete方法注冊(cè)的回調(diào)邏輯文搂,其核心實(shí)現(xiàn)見CompletableFuture#postComplete, 用注釋的話說,每一步秤朗,這個(gè)stack會(huì)pop and run煤蹭。回調(diào)也是基于此實(shí)現(xiàn)(Doug Lea大神的作品不是簡(jiǎn)單能說明白的取视,后續(xù)再開一文研究)
/**
* Pops and tries to trigger all reachable dependents. Call only
* when known to be done.
*/
final void postComplete() {
/*
* On each step, variable f holds current dependents to pop
* and run. It is extended along only one path at a time,
* pushing others to avoid unbounded recursion.
*/
CompletableFuture<?> f = this; Completion h;
while ((h = f.stack) != null ||
(f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d; Completion t;
if (f.casStack(h, t = h.next)) {
if (t != null) {
if (f != this) {
pushStack(h);
continue;
}
h.next = null; // detach
}
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
那么dubbo的異步調(diào)用是怎么利用這個(gè)回調(diào)機(jī)制的呢硝皂?見DubboInvoker#doInvoke (2.7.3版本)
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
asyncRpcResult.subscribeTo(responseFuture);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
FutureContext.getContext().setCompatibleFuture(responseFuture);
return asyncRpcResult;
}
之前2.6.1版本中,同步異步的區(qū)別是誰來調(diào)get()方法贫途,那么在2.7.3版本吧彪,DubboInvoker對(duì)同步異步調(diào)用的處理直接統(tǒng)一了,都會(huì)返回一個(gè)AsyncRpcResult丢早, 這個(gè)AsyncRpcResult本身就繼承自CompletableFuture姨裸,同時(shí)其會(huì)subscribe一個(gè)響應(yīng)的CompletableFuture,這里就有了兩個(gè)CompletableFuture怨酝;那么subscribe做了什么呢傀缩?
public void subscribeTo(CompletableFuture<?> future) {
future.whenComplete((obj, t) -> {
if (t != null) {
this.completeExceptionally(t);
} else {
this.complete((Result) obj);
}
});
}
subscribe會(huì)對(duì)響應(yīng)CompletableFuture注冊(cè)了一個(gè)回調(diào),響應(yīng)完成時(shí)农猬,觸發(fā)這個(gè)回調(diào)赡艰;這個(gè)回調(diào)邏輯就是執(zhí)行AsyncRpcResult自身的complete方法,那么如果AsyncRpcResult也有注冊(cè)回調(diào)斤葱,此時(shí)就會(huì)被鏈?zhǔn)接|發(fā)慷垮。
新版本的dubbo既然在DubboInvoker這里對(duì)于同步異步的處理是一樣的揖闸,都是直接返回一個(gè)AsyncRpcResult,那么對(duì)于我們使用者來說料身,怎么來區(qū)別同步和異步呢汤纸?其實(shí)關(guān)鍵就在于怎么用這個(gè)AsyncRpcResult。如果我們拿到AsyncRpcResult直接get芹血,可以認(rèn)為這就是同步調(diào)用贮泞,如果我們拿到AsyncRpcResult,不去調(diào)用get幔烛,而是去注冊(cè)一個(gè)回調(diào)函數(shù)啃擦,等待鏈?zhǔn)接|發(fā),用回調(diào)的方式拿結(jié)果饿悬,那么這就是異步令蛉。
總結(jié):老版本dubbo的異步調(diào)用可以認(rèn)為是假異步,因?yàn)榻Y(jié)果的獲取是阻塞的乡恕,新版本隨著jdk引入CompletableFuture言询,由于回調(diào)機(jī)制的存在,我們業(yè)務(wù)代碼使用dubbo時(shí)候傲宜,也可以注冊(cè)回調(diào),實(shí)現(xiàn)真正的異步非阻塞夫啊。