Dubbo使用了CompletableFuture,實(shí)現(xiàn)了真異步

Dubbo在服務(wù)調(diào)用時(shí)支持同步調(diào)用和異步調(diào)用等方式坟乾。

在Dubbo2.6版本及之前的版本在實(shí)現(xiàn)異步調(diào)用時(shí)存在一定的缺點(diǎn)迹辐,實(shí)際上是一種假異步。

下面列舉一個(gè)異步案例甚侣。

// 此方法應(yīng)該返回Foo明吩,但異步后會(huì)立刻返回NULL
fooService.findFoo(fooId);
// 立刻得到當(dāng)前調(diào)用的Future實(shí)例,當(dāng)發(fā)生新的調(diào)用時(shí)這個(gè)東西將會(huì)被覆蓋
Future<Foo> fooFuture = RpcContext.getContext().getFuture();

// 調(diào)用另一個(gè)服務(wù)的方法
barService.findBar(barId);
// 立刻得到當(dāng)前調(diào)用的Future
Future<Bar> barFuture = RpcContext.getContext().getFuture();
 
// 此時(shí)殷费,兩個(gè)服務(wù)的方法在并發(fā)執(zhí)行
// 等待第一個(gè)調(diào)用完成印荔,線程會(huì)進(jìn)入Sleep狀態(tài),當(dāng)調(diào)用完成后被喚醒详羡。
Foo foo = fooFuture.get();
// 同上
Bar bar = barFuture.get();
// 假如第一個(gè)調(diào)用需要等待5秒剔宪,第二個(gè)等待6秒,則整個(gè)調(diào)用過(guò)程完成的時(shí)間是6秒副硅。

當(dāng)調(diào)用服務(wù)方法后桐经,Dubbo會(huì)創(chuàng)建一個(gè)DefaultFuture,并將該Future存放到RpcContext中,在用戶(hù)線程中茶行,如果用戶(hù)想獲取調(diào)用結(jié)果時(shí)躯概,會(huì)從RpcContext中獲取該Future,并調(diào)用get方法畔师,但是如果此時(shí)該服務(wù)仍沒(méi)有處理完畢娶靡,則會(huì)出現(xiàn)阻塞,直到結(jié)果返回或調(diào)用超時(shí)為止看锉。發(fā)生阻塞時(shí)姿锭,該方法的后續(xù)步驟則得不到執(zhí)行。對(duì)于異步來(lái)說(shuō)伯铣,這顯然是不合理的呻此。理想中的異步是如果服務(wù)沒(méi)有處理好,會(huì)繼續(xù)執(zhí)行用戶(hù)線程的后續(xù)方法腔寡,不會(huì)阻塞等待焚鲜。

從Dubbo2.7開(kāi)始,Dubbo的異步調(diào)用開(kāi)始以CompletableFuture為基礎(chǔ)進(jìn)行實(shí)現(xiàn)放前。

DubboInvoker是一個(gè)執(zhí)行體忿磅,通過(guò)它可以發(fā)起遠(yuǎn)程調(diào)用。
在Dubbo2.6的遠(yuǎn)程調(diào)用中凭语,部分代碼如下所示(只保留了部分代碼):

DubboInvoker類(lèi)
protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        //忽略部分代碼
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        //忽略部分代碼
        //單向調(diào)用葱她,無(wú)返回值
        if (isOneway) {
           boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
           currentClient.send(inv, isSent);
           RpcContext.getContext().setFuture(null);
           return new RpcResult();
        // 異步調(diào)用
        } else if (isAsync) {
           ResponseFuture future = currentClient.request(inv, timeout);
           RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
           return new RpcResult();
        // 同步調(diào)用
        } else {
           RpcContext.getContext().setFuture(null);
           return (Result) currentClient.request(inv, timeout).get();
        }     
}

在Dubbo2.6版本及之前的版本中,不管同步調(diào)用還是異步調(diào)用似扔,都會(huì)調(diào)用HeaderExchangeClient.request方法吨些,返回一個(gè)DefaultFuture對(duì)象,不同的點(diǎn)是:異步調(diào)用會(huì)將該future存放到RpcContext中炒辉,并先返回一個(gè)空的RpcResult結(jié)果豪墅。而同步調(diào)用不會(huì)將該future存放到RpcContext中,而是直接調(diào)用該future的get方法辆脸,阻塞等待調(diào)用結(jié)果但校。

HeaderExchangeChannel類(lèi) 
public ResponseFuture request(Object request, int timeout) throws RemotingException {
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout); 
        channel.send(req);
        //忽略了部分代碼
        return future;
}
DefaultFuture類(lèi)(忽略了部分代碼)
public Object get(int timeout) throws RemotingException {
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
        }
        return returnFromResponse();
}

當(dāng)服務(wù)端處理完信息后,HeaderExchangeHandler會(huì)處理發(fā)送過(guò)來(lái)的Response啡氢,根據(jù)requestId獲取對(duì)應(yīng)的DefaultFuture對(duì)象状囱,最終調(diào)用doReceived方法對(duì)結(jié)果賦值。利用AQS的條件鎖機(jī)制倘是,喚醒阻塞線程亭枷。

DefaultFuture類(lèi)
private void doReceived(Response res) {
        lock.lock();
        try {
            response = res;
            if (done != null) {
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
}

在Dubbo2.7版本中,對(duì)異步調(diào)用進(jìn)行了改良搀崭,使用了CompletableFuture叨粘。

Dubbo2.7異步調(diào)用的一個(gè)樣例:

// 此調(diào)用會(huì)立即返回null
asyncService.sayHello("world");
// 拿到調(diào)用的Future引用猾编,當(dāng)結(jié)果返回后,會(huì)被通知和設(shè)置到此Future
CompletableFuture<String> helloFuture = RpcContext.getContext().getCompletableFuture();
// 為Future添加回調(diào)
helloFuture.whenComplete((retValue, exception) -> {
    if (exception == null) {
        System.out.println(retValue);
    } else {
        exception.printStackTrace();
    }
});

同樣是DubboInvoker發(fā)起遠(yuǎn)程調(diào)用升敲,在doInvoke方法中進(jìn)行了改進(jìn):

DubboInvoker2.7.9版本
protected Result doInvoke(final Invocation invocation) throws Throwable {
     RpcInvocation inv = (RpcInvocation) invocation;
     final String methodName = RpcUtils.getMethodName(invocation);
     boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
     //單向調(diào)用
     if (isOneway) {
         boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
         currentClient.send(inv, isSent);
         return AsyncRpcResult.newDefaultAsyncResult(invocation);
      //同步調(diào)用和異步調(diào)用
      } else {
         ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                CompletableFuture<AppResponse> appResponseFuture =
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);           FutureContext.getContext().setCompatibleFuture(appResponseFuture);
        AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
        result.setExecutor(executor);
        return result;
    } 
}

在Dubbo2.7版本中答倡,DubboInvolnvoker對(duì)同步調(diào)用和異步調(diào)用進(jìn)行了統(tǒng)一處理,封裝成CompletableFuture驴党,并以 AsyncRpcResult返回瘪撇。

Dubbo2.7版本下HeaderExchangeChannel.request方法與2.6版本相差不大,只是DeafultFuture對(duì)象有一點(diǎn)不同港庄,即后續(xù)版本繼承了 CompletableFuture類(lèi)倔既。

對(duì)于同步調(diào)用和異步調(diào)用的處理交給AsyncToSyncInvoker類(lèi)處理。

public Result invoke(Invocation invocation) throws RpcException {
        // 調(diào)用DubboInvoker等Invoker返回的調(diào)用結(jié)果
        Result asyncResult = invoker.invoke(invocation);
        try {
            // 如果是同步調(diào)用
            if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
                // 不能使用CompletableFuture#get()方法鹏氧,否則性能會(huì)出現(xiàn)嚴(yán)重下降渤涌。
                asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
            }
        }
        //忽略了部分代碼
        return asyncResult;
    }

不同與Dubbo2.6版本,Dubbo2.7在處理服務(wù)端返回結(jié)果時(shí)放棄了AQS的條件鎖機(jī)制把还,改用CompletableFuture類(lèi)的complete方法去實(shí)現(xiàn)实蓬。

DefaultFuture類(lèi)
private void doReceived(Response res) {
        //忽略部分代碼
        if (res.getStatus() == Response.OK) {
            // 對(duì)CompletableFuture賦值結(jié)果
            this.complete(res.getResult());
        } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
        } else {
            this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
        }
      //忽略部分代碼
}

對(duì)于上述的Result接口,有兩個(gè)實(shí)現(xiàn)對(duì)象笨篷,我們?cè)谶@里進(jìn)行簡(jiǎn)單對(duì)比分析下瞳秽。

AsyncRpcResult

此類(lèi)表示未完成的RPC調(diào)用,它將保留此調(diào)用的一些上下文信息率翅,例如RpcContext和Invocation,因此袖迎,當(dāng)調(diào)用完成并且返回結(jié)果時(shí)冕臭,它可以確保與調(diào)用時(shí)相同地恢復(fù)所有上下文, 是在調(diào)用任何回調(diào)之前進(jìn)行的。

當(dāng)Result實(shí)現(xiàn)CompletionStage時(shí)燕锥,AsyncRpcResult允許您輕松構(gòu)建異步過(guò)濾器鏈辜贵,其狀態(tài)將完全由基礎(chǔ)RPC調(diào)用的狀態(tài)驅(qū)動(dòng)。

AsyncRpcResult不包含任何具體值(由CompletableFuture帶來(lái)的基礎(chǔ)值除外)归形,請(qǐng)將其視為狀態(tài)傳輸節(jié)點(diǎn)托慨。#getValue()#getException()都是從Result接口繼承的,主要實(shí)現(xiàn)它們出于兼容性考慮暇榴。 因?yàn)樵S多舊式Filter實(shí)現(xiàn)很可能直接調(diào)用getValue厚棵。

AppResponse

Duboo3.0.0中引入了AsyncRpcResult來(lái)替換RpcResult,并且RpcResult被替換為AppResponse:AsyncRpcResult是在調(diào)用鏈中實(shí)際傳遞的對(duì)象蔼紧,
AppResponse僅代表業(yè)務(wù)結(jié)果婆硬。

AsyncRpcResult是表示未完成的RPC調(diào)用的未來(lái),而AppResponse是此調(diào)用的實(shí)際返回類(lèi)型奸例。
從理論上講彬犯,AppResponse不必實(shí)現(xiàn)Result接口,這主要是出于兼容性目的。

在Dubbo服務(wù)暴露中谐区,ProtocolFilterWrapper會(huì)構(gòu)建攔截器鏈Filter湖蜕,在調(diào)用實(shí)際的DubboInvoker之前,會(huì)先調(diào)用一些構(gòu)造的Filter宋列,比如ExecuteLimitFilter重荠,限制每個(gè)服務(wù)中每個(gè)方法的最大并發(fā)數(shù)。下面是Dubbo2.6構(gòu)建攔截器器鏈的邏輯:

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (!filters.isEmpty()) {
       for (int i = filters.size() - 1; i >= 0; i--) {
           final Filter filter = filters.get(i);
           final Invoker<T> next = last;
           ast = new Invoker<T>() {
                 //忽略部分代碼
                 @Override
                 public Result invoke(Invocation invocation) {
                        return filter.invoke(next, invocation);
                 }
                };
            }
        }
        return last;
}

但是在Dubbo2.6版本進(jìn)行異步調(diào)用中虚茶,會(huì)出現(xiàn)一些問(wèn)題戈鲁,因?yàn)镈ubbo2.6在進(jìn)行異步調(diào)用時(shí),會(huì)先返回一個(gè)空的RpcResult對(duì)象嘹叫,當(dāng)某些Filter需要對(duì)返回的結(jié)果進(jìn)行處理時(shí)婆殿,顯然在該情景下無(wú)法處理結(jié)果。Dubbo2.7對(duì)這種情況進(jìn)行了改進(jìn)罩扇。

Dubbo2.7構(gòu)建攔截器鏈的邏輯如下所示:

ProtocolFilterWrapper類(lèi)
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (!filters.isEmpty()) {
            for (Filter filter : filters) {
                last = new FilterNode<T>(invoker, last, filter);
            }
        }
        return last;
}

然后解釋下在FilterNode中的invoke方法:

@Override
public Result invoke(Invocation invocation) throws RpcException {
      Result asyncResult;
      asyncResult = filter.invoke(next, invocation);
      //忽略部分代碼
      return asyncResult.whenCompleteWithContext((r, t) -> {
            //忽略部分代碼
            } else if (filter instanceof Filter.Listener) {
                Filter.Listener listener = (Filter.Listener) filter;
                if (t == null) {
                    listener.onResponse(r, invoker, invocation);
                } else {
                    listener.onError(t, invoker, invocation);
                }
            }
        });
}

當(dāng)異步調(diào)用時(shí)婆芦,以AsyncRpcResult對(duì)象傳遞,通過(guò)CompletableFuture#whenComplete實(shí)現(xiàn)異步下的邏輯處理喂饥。

public Result whenCompleteWithContext(BiConsumer<Result, Throwable> fn){
     // 是CompletableFuture類(lèi)
     this.responseFuture = this.responseFuture.whenComplete((v, t) -> {
            beforeContext.accept(v, t);
            fn.accept(v, t);
            afterContext.accept(v, t);
      });
      return this;
}

Dubbo異步分析到這里就結(jié)束了消约,感謝大家的閱讀。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末员帮,一起剝皮案震驚了整個(gè)濱河市或粮,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌捞高,老刑警劉巖氯材,帶你破解...
    沈念sama閱讀 217,509評(píng)論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異硝岗,居然都是意外死亡氢哮,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,806評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門(mén)型檀,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)冗尤,“玉大人,你說(shuō)我怎么就攤上這事胀溺×哑撸” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 163,875評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵月幌,是天一觀的道長(zhǎng)碍讯。 經(jīng)常有香客問(wèn)我,道長(zhǎng)扯躺,這世上最難降的妖魔是什么捉兴? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,441評(píng)論 1 293
  • 正文 為了忘掉前任蝎困,我火速辦了婚禮,結(jié)果婚禮上倍啥,老公的妹妹穿的比我還像新娘禾乘。我一直安慰自己,他們只是感情好虽缕,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,488評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布始藕。 她就那樣靜靜地躺著,像睡著了一般氮趋。 火紅的嫁衣襯著肌膚如雪伍派。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,365評(píng)論 1 302
  • 那天剩胁,我揣著相機(jī)與錄音诉植,去河邊找鬼。 笑死昵观,一個(gè)胖子當(dāng)著我的面吹牛晾腔,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播啊犬,決...
    沈念sama閱讀 40,190評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼灼擂,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了觉至?” 一聲冷哼從身側(cè)響起剔应,我...
    開(kāi)封第一講書(shū)人閱讀 39,062評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎康谆,沒(méi)想到半個(gè)月后领斥,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,500評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡沃暗,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,706評(píng)論 3 335
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了何恶。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片孽锥。...
    茶點(diǎn)故事閱讀 39,834評(píng)論 1 347
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖细层,靈堂內(nèi)的尸體忽然破棺而出惜辑,到底是詐尸還是另有隱情,我是刑警寧澤疫赎,帶...
    沈念sama閱讀 35,559評(píng)論 5 345
  • 正文 年R本政府宣布盛撑,位于F島的核電站,受9級(jí)特大地震影響捧搞,放射性物質(zhì)發(fā)生泄漏抵卫。R本人自食惡果不足惜狮荔,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,167評(píng)論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望介粘。 院中可真熱鬧殖氏,春花似錦、人聲如沸姻采。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,779評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)慨亲。三九已至婚瓜,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間刑棵,已是汗流浹背巴刻。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,912評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留铐望,地道東北人冈涧。 一個(gè)月前我還...
    沈念sama閱讀 47,958評(píng)論 2 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像正蛙,于是被迫代替她去往敵國(guó)和親督弓。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,779評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容