Java8新的異步編程方式 CompletableFuture

CompletableFuture特別是對微服務(wù)架構(gòu)而言漂佩,會有很大的作為脖含。舉一個具體的場景,電商的商品頁面可能會涉及到商品詳情服務(wù)投蝉、商品評論服務(wù)养葵、相關(guān)商品推薦服務(wù)等等。獲取商品的信息時(/productdetails?productid=xxx)瘩缆,需要調(diào)用多個服務(wù)來處理這一個請求并返回結(jié)果关拒。這里可能會涉及到并發(fā)編程,我們完全可以使用Java 8的CompletableFuture或者RxJava來實現(xiàn)庸娱。

使用demo


    public List<String> findPriceExecutorsCompletableFuture(String product){
        Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100));
        List<CompletableFuture<String>> priceFuture = shops.stream()
                .map(shop -> CompletableFuture
                        .supplyAsync(()-> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)), executor))
                .collect(Collectors.toList());
        return priceFuture.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

https://my.oschina.net/u/3703858/blog/1799785

建議如下:

  • 如果你進行的是計算密集型的操作,并且沒有I/O,那么推薦使用Stream接口,因為實現(xiàn)簡單,同時效率也可能是最高的
  • 反之,如果你并行的工作單元還涉及等待I/O的操作(包括網(wǎng)絡(luò)連接等待).那么使用CompletableFuture是靈活性更好,你可以像前面討論的那樣,依據(jù)等待/計算,或者W/C的比率設(shè)定需要使用的線程數(shù)

Future Callable例子:

public void renderPage(CharSequence source) {
List<ImageInfo> info = scanForImageInfo(source);
//創(chuàng)建Callable着绊,它代表了下載所有的圖片
final Callable<List<ImageData>> task = () ->
  info.stream()
        .map(ImageInfo::downloadImage)
        .collect(Collectors.toList());
    // 將下載任務(wù)提交到executor
    Future<List<ImageData>> images = executor.submit(task);
    // renderText(source);
try {
   // 獲得所有下載的圖片(在所有圖片可用之前會一直阻塞)
   final List<ImageData> imageDatas = images.get();
   // 渲染圖片
   imageDatas.forEach(this::renderImage);
} catch (InterruptedException e) {
   // 重新維護線程的中斷狀態(tài)
   Thread.currentThread().interrupt();
   // 我們不需要結(jié)果,所以取消任務(wù)
   images.cancel(true);
} catch (ExecutionException e) {
  throw launderThrowable(e.getCause()); }
}

CompletableFuture

CompletableFuture類實現(xiàn)了CompletionStage和Future接口熟尉。Future是Java 5添加的類归露,用來描述一個異步計算的結(jié)果,但是獲取一個結(jié)果時方法較少,要么通過輪詢isDone斤儿,確認(rèn)完成后剧包,調(diào)用get()獲取值,要么調(diào)用get()設(shè)置一個超時時間雇毫。但是這個get()方法會阻塞住調(diào)用線程玄捕,這種阻塞的方式顯然和我們的異步編程的初衷相違背。
為了解決這個問題棚放,JDK吸收了guava的設(shè)計思想枚粘,加入了Future的諸多擴展功能形成了CompletableFuture。
CompletionStage是一個接口飘蚯,從命名上看得知是一個完成的階段馍迄,它里面的方法也標(biāo)明是在某個運行階段得到了結(jié)果之后要做的事情。

1局骤、進行變換

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);

首先說明一下已Async結(jié)尾的方法都是可以異步執(zhí)行的攀圈,如果指定了線程池,會在指定的線程池中執(zhí)行峦甩,如果沒有指定赘来,默認(rèn)會在ForkJoinPool.commonPool()中執(zhí)行,下文中將會有好多類似的凯傲,都不詳細(xì)解釋了犬辰。關(guān)鍵的入?yún)⒅挥幸粋€Function,它是函數(shù)式接口冰单,所以使用Lambda表示起來會更加優(yōu)雅幌缝。它的入?yún)⑹巧弦粋€階段計算后的結(jié)果,返回值是經(jīng)過轉(zhuǎn)化后結(jié)果诫欠。
例如:

@Test
    public void thenApply() {
        String result = CompletableFuture.supplyAsync(() -> "hello").thenApply(s -> s + " world").join();
        System.out.println(result);
    }

結(jié)果為:

hello world

2涵卵、進行消耗

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

thenAccept是針對結(jié)果進行消耗浴栽,因為他的入?yún)⑹荂onsumer,有入?yún)o返回值轿偎。
例如:

@Test
public void thenAccept(){    
       CompletableFuture.supplyAsync(() -> "hello").thenAccept(s -> System.out.println(s+" world"));
}

結(jié)果為:hello world
3典鸡、對上一步的計算結(jié)果不關(guān)心,執(zhí)行下一個操作

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

thenRun它的入?yún)⑹且粋€Runnable的實例贴硫,表示當(dāng)?shù)玫缴弦徊降慕Y(jié)果時的操作椿每。
例如:

  @Test
    public void thenRun(){
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }).thenRun(() -> System.out.println("hello world"));
        while (true){}
    }

4、結(jié)合兩個CompletionStage的結(jié)果英遭,進行轉(zhuǎn)化后返回

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

它需要原來的處理返回值,并且other代表的CompletionStage也要返回值之后亦渗,利用這兩個返回值挖诸,進行轉(zhuǎn)換后返回指定類型的值。
例如:

 @Test
    public void thenCombine() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "world";
        }), (s1, s2) -> s1 + " " + s2).join();
        System.out.println(result);
    }

5法精、結(jié)合兩個CompletionStage的結(jié)果多律,進行消耗

public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action,     Executor executor);

它需要原來的處理返回值,并且other代表的CompletionStage也要返回值之后搂蜓,利用這兩個返回值狼荞,進行消耗。
例如:

  @Test
    public void thenAcceptBoth() {
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "world";
        }), (s1, s2) -> System.out.println(s1 + " " + s2));
        while (true){}
    }

6帮碰、在兩個CompletionStage都運行完執(zhí)行

public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);

不關(guān)心這兩個CompletionStage的結(jié)果相味,只關(guān)心這兩個CompletionStage執(zhí)行完畢,之后在進行操作(Runnable)殉挽。

例如:

 @Test
    public void runAfterBoth(){
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).runAfterBothAsync(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s2";
        }), () -> System.out.println("hello world"));
        while (true){}
    }

7丰涉、兩個CompletionStage,誰計算的快斯碌,我就用那個CompletionStage的結(jié)果進行下一步的轉(zhuǎn)化操作

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);

我們現(xiàn)實開發(fā)場景中一死,總會碰到有兩種渠道完成同一個事情,所以就可以調(diào)用這個方法傻唾,找一個最快的結(jié)果進行處理投慈。
例如:

@Test
    public void applyToEither() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).applyToEither(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello world";
        }), s -> s).join();
        System.out.println(result);
    }

8、兩個CompletionStage冠骄,誰計算的快伪煤,我就用那個CompletionStage的結(jié)果進行下一步的消耗操作。

public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);

例如:

@Test
    public void acceptEither() {
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).acceptEither(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello world";
        }), System.out::println);
        while (true){}
    }

9猴抹、兩個CompletionStage带族,任何一個完成了都會執(zhí)行下一步的操作(Runnable)


public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);

例如:

 @Test
    public void runAfterEither() {
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).runAfterEither(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s2";
        }), () -> System.out.println("hello world"));
        while (true) {
        }
    }

10、當(dāng)運行時出現(xiàn)了異常蟀给,可以通過exceptionally進行補償蝙砌。

public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);

例如:

 @Test
    public void exceptionally() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (1 == 1) {
                throw new RuntimeException("測試一下異常情況");
            }
            return "s1";
        }).exceptionally(e -> {
            System.out.println(e.getMessage());
            return "hello world";
        }).join();
        System.out.println(result);
    }

11阳堕、當(dāng)運行完成時,對結(jié)果的記錄择克。這里的完成時有兩種情況恬总,一種是正常執(zhí)行,返回值肚邢。另外一種是遇到異常拋出造成程序的中斷壹堰。這里為什么要說成記錄,因為這幾個方法都會返回CompletableFuture骡湖,當(dāng)Action執(zhí)行完畢后它的結(jié)果返回原始的CompletableFuture的計算結(jié)果或者返回異常贱纠。所以不會對結(jié)果產(chǎn)生任何的作用。

public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,Executor executor);

例如:

 @Test
    public void whenComplete() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (1 == 1) {
                throw new RuntimeException("測試一下異常情況");
            }
            return "s1";
        }).whenComplete((s, t) -> {
            System.out.println(s);
            System.out.println(t.getMessage());
        }).exceptionally(e -> {
            System.out.println(e.getMessage());
            return "hello world";
        }).join();
        System.out.println(result);
    }

結(jié)果:

null
java.lang.RuntimeException: 測試一下異常情況
java.lang.RuntimeException: 測試一下異常情況
hello world

12响蕴、運行完成時谆焊,對結(jié)果的處理。這里的完成時有兩種情況浦夷,一種是正常執(zhí)行辖试,返回值。另外一種是遇到異常拋出造成程序的中斷劈狐。

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

例如:
出現(xiàn)異常時

@Test
    public void handle() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //出現(xiàn)異常
            if (1 == 1) {
                throw new RuntimeException("測試一下異常情況");
            }
            return "s1";
        }).handle((s, t) -> {
            if (t != null) {
                return "hello world";
            }
            return s;
        }).join();
        System.out.println(result);
    }

結(jié)果:hello world
未出現(xiàn)異常時

@Test
    public void handle() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).handle((s, t) -> {
            if (t != null) {
                return "hello world";
            }
            return s;
        }).join();
        System.out.println(result);
    }

結(jié)果為:s1

上面就是CompletionStage接口中方法的使用實例罐孝,CompletableFuture同樣也同樣實現(xiàn)了Future,所以也同樣可以使用get進行阻塞獲取值肥缔,總的來說莲兢,CompletableFuture使用起來還是比較爽的,看起來也比較優(yōu)雅一點辫继。

處理自定義異常

1怒见、創(chuàng)建原子對象保存異常對象

final AtomicReference<BizException> foundException = new AtomicReference<>();
...

final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                try {
                    //todo 業(yè)務(wù)邏輯
                } catch (BizException e) {
                    foundException.set(e);
                }
            }
            return "OK";
        });

...

if(foundException.get() != null){
    throw foundException.get();
}

2、使用CompletionException

List<CompletableFuture<Object>> futures =
    tasks.stream()
        .map(task -> CompletableFuture.supplyAsync(() -> businessLogic(task)))
        .collect(Collectors.toList());
try {
    List<Object> results = futures.stream()
        .map(CompletableFuture::join)
        .collect(Collectors.toList());
} catch (CompletionException e) {
    throw e.getCause() instanceof BusinessException?
        new BadRequestException("at least one async task had an exception"): e;
}

摘自: http://www.reibang.com/p/6f3ee90ab7d3
https://leokongwq.github.io/2017/01/17/java8-CompletableFuture.html
https://www.jdon.com/idea/java/java-8-completablefuture-vs-parallel-stream.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末姑宽,一起剝皮案震驚了整個濱河市遣耍,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌炮车,老刑警劉巖舵变,帶你破解...
    沈念sama閱讀 221,406評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異瘦穆,居然都是意外死亡纪隙,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,395評論 3 398
  • 文/潘曉璐 我一進店門扛或,熙熙樓的掌柜王于貴愁眉苦臉地迎上來绵咱,“玉大人,你說我怎么就攤上這事熙兔”妫” “怎么了艾恼?”我有些...
    開封第一講書人閱讀 167,815評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長麸锉。 經(jīng)常有香客問我钠绍,道長,這世上最難降的妖魔是什么花沉? 我笑而不...
    開封第一講書人閱讀 59,537評論 1 296
  • 正文 為了忘掉前任柳爽,我火速辦了婚禮,結(jié)果婚禮上碱屁,老公的妹妹穿的比我還像新娘磷脯。我一直安慰自己,他們只是感情好娩脾,可當(dāng)我...
    茶點故事閱讀 68,536評論 6 397
  • 文/花漫 我一把揭開白布争拐。 她就那樣靜靜地躺著,像睡著了一般晦雨。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上隘冲,一...
    開封第一講書人閱讀 52,184評論 1 308
  • 那天闹瞧,我揣著相機與錄音,去河邊找鬼展辞。 笑死奥邮,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的罗珍。 我是一名探鬼主播洽腺,決...
    沈念sama閱讀 40,776評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼覆旱!你這毒婦竟也來了蘸朋?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,668評論 0 276
  • 序言:老撾萬榮一對情侶失蹤扣唱,失蹤者是張志新(化名)和其女友劉穎藕坯,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體噪沙,經(jīng)...
    沈念sama閱讀 46,212評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡炼彪,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,299評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了正歼。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片辐马。...
    茶點故事閱讀 40,438評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖局义,靈堂內(nèi)的尸體忽然破棺而出喜爷,到底是詐尸還是另有隱情冗疮,我是刑警寧澤,帶...
    沈念sama閱讀 36,128評論 5 349
  • 正文 年R本政府宣布贞奋,位于F島的核電站赌厅,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏轿塔。R本人自食惡果不足惜特愿,卻給世界環(huán)境...
    茶點故事閱讀 41,807評論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望勾缭。 院中可真熱鬧揍障,春花似錦、人聲如沸俩由。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,279評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽幻梯。三九已至兜畸,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間碘梢,已是汗流浹背咬摇。 一陣腳步聲響...
    開封第一講書人閱讀 33,395評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留煞躬,地道東北人肛鹏。 一個月前我還...
    沈念sama閱讀 48,827評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像恩沛,于是被迫代替她去往敵國和親在扰。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,446評論 2 359

推薦閱讀更多精彩內(nèi)容