Future、CompletableFuture與dubbo異步調(diào)用

Future

在java 8之前,我們可以使用Callable+Future來異步執(zhí)行任務(wù)和獲取結(jié)果,比如

ExecutorService service = new ThreadPoolExecutor(5,5,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(100));
Future<String> f = service.submit(()->{
                    Thread.sleep(200);
                    return "helloWorld";
                }
        );
        System.out.println(f.get(300,TimeUnit.MILLISECONDS));

其獲取結(jié)果玉控,get方法實(shí)現(xiàn)本質(zhì)是輪詢校驗(yàn)結(jié)果狀態(tài)積,阻塞實(shí)現(xiàn)依賴的是LockSupport.park()方法狮惜。
那么在dubbo交給Apache進(jìn)行孵化之前的版本中高诺,比如2.6.1版本中,其異步調(diào)用機(jī)制ResponseFuture的實(shí)現(xiàn)就借鑒了jdk的Future的模式碾篡,以DubboInvoker#doInvoke方法為例

if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }

可以看到虱而,同步與異步的本質(zhì)區(qū)別就是調(diào)用get()方法的時(shí)機(jī)不同,同步調(diào)用的話开泽,請(qǐng)求的同時(shí)由dubbo線程直接調(diào)用get方法阻塞牡拇,獲取結(jié)果;而異步調(diào)用穆律,dubbo直接返回RpcResult惠呼,后續(xù)由業(yè)務(wù)線程再來調(diào)用get方法獲取結(jié)果。

dubbo雖然借鑒了jdk的Future峦耘,但是代碼全部是自己寫的剔蹋,以DefaultFuture#get()為例

public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

可以看到,dubbo的DefaultFuture實(shí)現(xiàn)贡歧,主要依賴lock+condition的模式滩租,不是jdk Future的LockSupport.park()模式赋秀。
這種模式的缺點(diǎn)有很多,最大的缺點(diǎn)就是結(jié)果獲取是阻塞的律想。

CompletableFuture

在java 8之后猎莲,jdk引入了CompletableFuture類,可以看到其實(shí)現(xiàn)了Future和CompletionStage技即,所以我們可以繼續(xù)像使用Future一樣使用CompletableFuture著洼。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> 

那么CompletionStage 是做什么的呢,用類文件注釋的第一句話說,其代表一種異步階段而叼,執(zhí)行一些行為或者計(jì)算身笤,執(zhí)行完畢后,會(huì)觸發(fā)其他CompletionStage的執(zhí)行葵陵。

A stage of a possibly asynchronous computation, that performs an
 action or computes a value when another CompletionStage completes.

相較于Future液荸,CompletableFuture提供的很多新特性都依賴與這個(gè)CompletionStage,這里主要介紹其在dubbo異步調(diào)用中的應(yīng)用脱篙,其他特性不多介紹娇钱,重點(diǎn)介紹下其回調(diào)機(jī)制,先看用法

CompletableFuture<String> f = new CompletableFuture();
        try {
            f.whenComplete((v,t)->{
                if(t!=null){
                    System.out.println("Exception");

                }else{
                    System.out.println(v);
                }

            });
            f.complete("HelloWorld");

當(dāng)CompletableFuture拿到結(jié)果的時(shí)候绊困,會(huì)回調(diào)whenComplete方法注冊(cè)的回調(diào)邏輯文搂,其核心實(shí)現(xiàn)見CompletableFuture#postComplete, 用注釋的話說,每一步秤朗,這個(gè)stack會(huì)pop and run煤蹭。回調(diào)也是基于此實(shí)現(xiàn)(Doug Lea大神的作品不是簡(jiǎn)單能說明白的取视,后續(xù)再開一文研究)

/**
     * Pops and tries to trigger all reachable dependents.  Call only
     * when known to be done.
     */
    final void postComplete() {
        /*
         * On each step, variable f holds current dependents to pop
         * and run.  It is extended along only one path at a time,
         * pushing others to avoid unbounded recursion.
         */
        CompletableFuture<?> f = this; Completion h;
        while ((h = f.stack) != null ||
               (f != this && (h = (f = this).stack) != null)) {
            CompletableFuture<?> d; Completion t;
            if (f.casStack(h, t = h.next)) {
                if (t != null) {
                    if (f != this) {
                        pushStack(h);
                        continue;
                    }
                    h.next = null;    // detach
                }
                f = (d = h.tryFire(NESTED)) == null ? this : d;
            }
        }
    }

那么dubbo的異步調(diào)用是怎么利用這個(gè)回調(diào)機(jī)制的呢硝皂?見DubboInvoker#doInvoke (2.7.3版本)

 if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
                CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
                asyncRpcResult.subscribeTo(responseFuture);
                // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
                FutureContext.getContext().setCompatibleFuture(responseFuture);
                return asyncRpcResult;
            }

之前2.6.1版本中,同步異步的區(qū)別是誰來調(diào)get()方法贫途,那么在2.7.3版本吧彪,DubboInvoker對(duì)同步異步調(diào)用的處理直接統(tǒng)一了,都會(huì)返回一個(gè)AsyncRpcResult丢早, 這個(gè)AsyncRpcResult本身就繼承自CompletableFuture姨裸,同時(shí)其會(huì)subscribe一個(gè)響應(yīng)的CompletableFuture,這里就有了兩個(gè)CompletableFuture怨酝;那么subscribe做了什么呢傀缩?

public void subscribeTo(CompletableFuture<?> future) {
        future.whenComplete((obj, t) -> {
            if (t != null) {
                this.completeExceptionally(t);
            } else {
                this.complete((Result) obj);
            }
        });
    }

subscribe會(huì)對(duì)響應(yīng)CompletableFuture注冊(cè)了一個(gè)回調(diào),響應(yīng)完成時(shí)农猬,觸發(fā)這個(gè)回調(diào)赡艰;這個(gè)回調(diào)邏輯就是執(zhí)行AsyncRpcResult自身的complete方法,那么如果AsyncRpcResult也有注冊(cè)回調(diào)斤葱,此時(shí)就會(huì)被鏈?zhǔn)接|發(fā)慷垮。
新版本的dubbo既然在DubboInvoker這里對(duì)于同步異步的處理是一樣的揖闸,都是直接返回一個(gè)AsyncRpcResult,那么對(duì)于我們使用者來說料身,怎么來區(qū)別同步和異步呢汤纸?其實(shí)關(guān)鍵就在于怎么用這個(gè)AsyncRpcResult。如果我們拿到AsyncRpcResult直接get芹血,可以認(rèn)為這就是同步調(diào)用贮泞,如果我們拿到AsyncRpcResult,不去調(diào)用get幔烛,而是去注冊(cè)一個(gè)回調(diào)函數(shù)啃擦,等待鏈?zhǔn)接|發(fā),用回調(diào)的方式拿結(jié)果饿悬,那么這就是異步令蛉。

總結(jié):老版本dubbo的異步調(diào)用可以認(rèn)為是假異步,因?yàn)榻Y(jié)果的獲取是阻塞的乡恕,新版本隨著jdk引入CompletableFuture言询,由于回調(diào)機(jī)制的存在,我們業(yè)務(wù)代碼使用dubbo時(shí)候傲宜,也可以注冊(cè)回調(diào),實(shí)現(xiàn)真正的異步非阻塞夫啊。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末函卒,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子撇眯,更是在濱河造成了極大的恐慌报嵌,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,807評(píng)論 6 518
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件熊榛,死亡現(xiàn)場(chǎng)離奇詭異锚国,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)玄坦,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,284評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門血筑,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人煎楣,你說我怎么就攤上這事豺总。” “怎么了择懂?”我有些...
    開封第一講書人閱讀 169,589評(píng)論 0 363
  • 文/不壞的土叔 我叫張陵喻喳,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我困曙,道長(zhǎng)表伦,這世上最難降的妖魔是什么谦去? 我笑而不...
    開封第一講書人閱讀 60,188評(píng)論 1 300
  • 正文 為了忘掉前任,我火速辦了婚禮蹦哼,結(jié)果婚禮上哪轿,老公的妹妹穿的比我還像新娘。我一直安慰自己翔怎,他們只是感情好窃诉,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,185評(píng)論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著赤套,像睡著了一般飘痛。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上容握,一...
    開封第一講書人閱讀 52,785評(píng)論 1 314
  • 那天宣脉,我揣著相機(jī)與錄音,去河邊找鬼剔氏。 笑死塑猖,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的谈跛。 我是一名探鬼主播羊苟,決...
    沈念sama閱讀 41,220評(píng)論 3 423
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼感憾!你這毒婦竟也來了蜡励?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 40,167評(píng)論 0 277
  • 序言:老撾萬榮一對(duì)情侶失蹤阻桅,失蹤者是張志新(化名)和其女友劉穎凉倚,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體嫂沉,經(jīng)...
    沈念sama閱讀 46,698評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡稽寒,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,767評(píng)論 3 343
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了趟章。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片杏糙。...
    茶點(diǎn)故事閱讀 40,912評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖尤揣,靈堂內(nèi)的尸體忽然破棺而出搔啊,到底是詐尸還是另有隱情,我是刑警寧澤北戏,帶...
    沈念sama閱讀 36,572評(píng)論 5 351
  • 正文 年R本政府宣布负芋,位于F島的核電站,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏旧蛾。R本人自食惡果不足惜莽龟,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,254評(píng)論 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望锨天。 院中可真熱鬧毯盈,春花似錦、人聲如沸病袄。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,746評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽益缠。三九已至脑奠,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間幅慌,已是汗流浹背宋欺。 一陣腳步聲響...
    開封第一講書人閱讀 33,859評(píng)論 1 274
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留胰伍,地道東北人齿诞。 一個(gè)月前我還...
    沈念sama閱讀 49,359評(píng)論 3 379
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像骂租,于是被迫代替她去往敵國(guó)和親祷杈。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,922評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容