Java8學(xué)習(xí)筆記之CompletableFuture組合式異步編程

1韧献、Future接口

Future接口在Java 5中被引入舌劳,設(shè)計(jì)初衷是對(duì)將來(lái)某個(gè)時(shí)刻會(huì)發(fā)生的結(jié)果進(jìn)行建模。它建模了一種異步計(jì)算军洼,返回一個(gè)執(zhí)行運(yùn)算結(jié)果的引用巩螃,當(dāng)運(yùn)算結(jié)束后,這個(gè)引用被返回給調(diào)用方匕争。在Future中觸發(fā)那些潛在耗時(shí)的操作把調(diào)用線(xiàn)程解放出來(lái)避乏,讓它能繼續(xù)執(zhí)行其他有價(jià)值的工作,不需要等待耗時(shí)的操作完成甘桑。
示例:使用Future以異步的方式執(zhí)行一個(gè)耗時(shí)的操作

ExecutorService executor = Executors.newCachedThreadPool();
Future<Double> future = executor.submit(new Callable<Double>() { //向ExecutorService提交一個(gè)Callable對(duì)象 
  public Double call() {
    return doSomeLongComputation();//以異步方式在新線(xiàn)程中執(zhí)行耗時(shí)的操作
  }
});
doSomethingElse();
try {
  Double result = future.get(1, TimeUnit.SECONDS);//獲取異步操作結(jié)果拍皮,如果被阻塞,無(wú)法得到結(jié)果跑杭,在等待1秒鐘后退出
} catch (ExecutionException ee) {
  // 計(jì)算拋出一個(gè)異常
} catch (InterruptedException ie) {
  // 當(dāng)前線(xiàn)程在等待過(guò)程中被中斷
} catch (TimeoutException te) {
  // 在Future對(duì)象完成之前超時(shí)
}

這種編程方式讓你的線(xiàn)程可以在ExecutorService以并發(fā)方式調(diào)用另一個(gè)線(xiàn)程執(zhí)行耗時(shí)操作的同時(shí)铆帽,去執(zhí)行一些其他任務(wù)。如果已經(jīng)運(yùn)行到?jīng)]有異步操作的結(jié)果就無(wú)法繼續(xù)進(jìn)行時(shí)德谅,可以調(diào)用它的get方法去獲取操作結(jié)果爹橱。如果操作已經(jīng)完成,該方法會(huì)立刻返回操作結(jié)果窄做,否則它會(huì)阻塞線(xiàn)程愧驱,直到操作完成慰技,返回相應(yīng)的結(jié)果。
為了處理長(zhǎng)時(shí)間運(yùn)行的操作永遠(yuǎn)不返回的可能性组砚,雖然Future提供了一個(gè)無(wú)需任何參數(shù)的get方法吻商,但還是推薦使用重載版本的get方法,它接受一個(gè)超時(shí)的參數(shù)糟红,可以定義線(xiàn)程等待Future結(jié)果的時(shí)間手报,而不是永無(wú)止境地等待下去。

image.png

Future接口的局限性
Future接口提供了方法來(lái)檢測(cè)異步計(jì)算是否已經(jīng)結(jié)束(使用isDone方法)改化,等待異步操作結(jié)束掩蛤,以及獲取計(jì)算的結(jié)果。但這些特性還不足以讓你編寫(xiě)簡(jiǎn)潔的并發(fā)代碼陈肛。

  • 將兩個(gè)異步計(jì)算合并為一個(gè)揍鸟,這兩個(gè)異步計(jì)算之間相互獨(dú)立,同時(shí)第二個(gè)又依賴(lài)于第一個(gè)的結(jié)果句旱。
  • 等待Future集合中的所有任務(wù)都完成阳藻。
  • 僅等待Future集合中快結(jié)束的任務(wù)完成,并返回它的結(jié)果谈撒。
  • 通過(guò)編程方式完成一個(gè)Future任務(wù)的執(zhí)行腥泥。
  • 應(yīng)對(duì)Future的完成事件(即當(dāng)Future的完成事件發(fā)生時(shí)會(huì)收到通知,并能使用Future計(jì)算的結(jié)果進(jìn)行下一步操作啃匿,不只是簡(jiǎn)單地阻塞等待操作結(jié)果)蛔外。
2、實(shí)現(xiàn)異步API

示例:實(shí)現(xiàn)價(jià)格查詢(xún)器
聲明依據(jù)指定產(chǎn)品名稱(chēng)返回價(jià)格的方法:

public class Shop {
  public double getPrice(String product) {
    // 待實(shí)現(xiàn)
  }
}

采用delay方法模擬長(zhǎng)期運(yùn)行的方法的執(zhí)行溯乒,它會(huì)人為地引入1秒鐘的延遲夹厌,方法聲明如下:

public static void delay() {
 try {
   Thread.sleep(1000L);
 } catch (InterruptedException e) {
   throw new RuntimeException(e);
 }
}

在getPrice方法中引入一個(gè)模擬的延遲:

public double getPrice(String product) {
  return calculatePrice(product);
}
private double calculatePrice(String product) 
  delay();
  return random.nextDouble() * product.charAt(0) + product.charAt(1);
}

這個(gè)API的使用者調(diào)用該方法時(shí),它依舊會(huì)被阻塞裆悄。為等待同步事件完成而等待1秒鐘矛纹,這是無(wú)法接受的,尤其是考慮到佳價(jià)格查詢(xún)器對(duì)網(wǎng)絡(luò)中的所有商店都要重復(fù)這種操作光稼。

1)將同步方法轉(zhuǎn)換為異步方法

首先需要將getPrice轉(zhuǎn)換為getPriceAsync方法或南,并修改它的返回值:

public Future<Double> getPriceAsync(String product) { ... }

Java 5引入了java.util.concurrent.Future接口表示一個(gè)異步計(jì)算(即調(diào)用線(xiàn)程可以繼續(xù)運(yùn)行,不會(huì)因?yàn)檎{(diào)用方法而阻塞)的結(jié)果艾君。這意味著Future是一個(gè)暫時(shí)還不可知值的處理器采够,這個(gè)值在計(jì)算完成后,可以通過(guò)調(diào)用它的get方法取得腻贰。getPriceAsync方法能立刻返回吁恍,給調(diào)用線(xiàn)程一個(gè)機(jī)會(huì)扒秸,能在同一時(shí)間去執(zhí)行其他有價(jià)值的計(jì)算任務(wù)播演。新的CompletableFuture類(lèi)提供了大量的方法冀瓦,讓我們有機(jī)會(huì)以多種可能的方式輕松地實(shí)現(xiàn)這個(gè)方法:

public Future<Double> getPriceAsync(String product) {
  CompletableFuture<Double> futurePrice = new CompletableFuture<>();//創(chuàng)建CompletableFuture對(duì)象,它會(huì)包含計(jì)算結(jié)果
  new Thread(() -> {
    double price = calculatePrice(product);//在另一個(gè)線(xiàn)程中以異步方式執(zhí)行計(jì)算 
    futurePrice.complete(price);//需長(zhǎng)時(shí)間計(jì)算的任務(wù)結(jié)束并得出結(jié)果時(shí)写烤,設(shè)置Future的返回值 
  }).start();
  return futurePrice;//無(wú)需等待還沒(méi)結(jié)束的計(jì)算翼闽,直接返回Future對(duì)象
}

上述代碼中,創(chuàng)建了一個(gè)代表異步計(jì)算的CompletableFuture對(duì)象實(shí)例洲炊,它在計(jì)算完成時(shí)會(huì)包含計(jì)算的結(jié)果感局。接著調(diào)用fork創(chuàng)建了另一個(gè)線(xiàn)程去執(zhí)行實(shí)際的價(jià)格計(jì)算工作,不等該耗時(shí)計(jì)算任務(wù)結(jié)束暂衡,直接返回一個(gè)Future實(shí)例询微。當(dāng)請(qǐng)求的產(chǎn)品價(jià)格終計(jì)算得出時(shí),可以使用它的complete方法狂巢,結(jié)束completableFuture對(duì)象的運(yùn)行撑毛,并設(shè)置變量的值。
使用異步API:

Shop shop = new Shop("BestShop");
long start = System.nanoTime();
Future<Double> futurePrice = shop.getPriceAsync("my favorite product"); 
long invocationTime = ((System.nanoTime() - start) / 1_000_000); 
System.out.println("Invocation returned after " + invocationTime                                                  + " msecs");
doSomethingElse(); //執(zhí)行更多任務(wù)
try {
  double price = futurePrice.get();//從Future對(duì)象中讀取價(jià)格唧领,如果價(jià)格未知藻雌,會(huì)發(fā)生阻塞
  System.out.printf("Price is %.2f%n", price);
} catch (Exception e) {
  throw new RuntimeException(e);
}
long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Price returned after " + retrievalTime + " msecs");

上述代碼中,客戶(hù)向商店查詢(xún)了某種商品的價(jià)格斩个。由于商店提供了異步API胯杭,該調(diào)用立刻返回了一個(gè)Future對(duì)象,通過(guò)該對(duì)象客戶(hù)可以在將來(lái)的某個(gè)時(shí)刻取得商品的價(jià)格受啥。這種方式下做个,客戶(hù)在進(jìn)行商品價(jià)格查詢(xún)的同時(shí),還能執(zhí)行一些其他的任務(wù)滚局,比如查詢(xún)其他家商店中商品的價(jià)格叁温,不會(huì)阻塞在那里等待第一家商店返回請(qǐng)求的結(jié)果。最后核畴,如果所有有意義的工作都完成膝但,客戶(hù)所有要執(zhí)行的工作都依賴(lài)于商品價(jià)格時(shí),再調(diào)用Future的get方法谤草。執(zhí)行這個(gè)操作后跟束,客戶(hù)要么獲得Future中封裝的值(如果異步任務(wù)已經(jīng)完成),要么發(fā)生阻塞丑孩,直到該異步任務(wù)完成冀宴,期望的值能夠訪(fǎng)問(wèn)。

2)錯(cuò)誤處理

如果價(jià)格計(jì)算過(guò)程中產(chǎn)生了錯(cuò)誤温学,這種情況下你會(huì)得到一個(gè)相當(dāng)糟糕的結(jié)果:用于提示錯(cuò)誤的異常會(huì)被限制在試圖計(jì)算商品價(jià)格的當(dāng)前線(xiàn)程的范圍內(nèi)略贮,最終會(huì)殺死該線(xiàn)程,而這會(huì)導(dǎo)致等待get方法返回結(jié)果的客戶(hù)端永久地被阻塞。
客戶(hù)端可以使用重載版本的get方法逃延,它使用一個(gè)超時(shí)參數(shù)來(lái)避免發(fā)生這樣的情況览妖。這是值得推薦的做法,應(yīng)盡量在代碼中添加超時(shí)判斷的邏輯揽祥,避免發(fā)生類(lèi)似的問(wèn)題讽膏。這種方法能防止程序永久地等待下去,超時(shí)發(fā)生時(shí)拄丰,程序會(huì)得到通知發(fā)生了TimeoutException府树。也因?yàn)槿绱耍悴粫?huì)有機(jī)會(huì)發(fā)現(xiàn)計(jì)算商品價(jià)格的線(xiàn)程內(nèi)到底發(fā)生了什么問(wèn)題才引發(fā)了這樣的失效料按。為了讓客戶(hù)端能了解商店無(wú)法提供請(qǐng)求商品價(jià)格的原因奄侠,需要使用CompletableFuture的completeExceptionally方法將導(dǎo)致CompletableFuture內(nèi)發(fā)生問(wèn)題的異常拋出。
拋出CompletableFuture內(nèi)的異常:

public Future<Double> getPriceAsync(String product) {
  CompletableFuture<Double> futurePrice = new CompletableFuture<>();
  new Thread(() -> {
    try {
      double price = calculatePrice(product);
      futurePrice.complete(price);//如果價(jià)格計(jì)算正常結(jié)束载矿,完成Future操作并設(shè)置商品價(jià)格 
    } catch (Exception ex) {
      futurePrice.completeExceptionally(ex);//否則拋出導(dǎo)致失敗的異常遭铺,完成這次Future操作 
    }
  }).start();
  return futurePrice;
}

客戶(hù)端現(xiàn)在會(huì)收到一個(gè)ExecutionException異常,該異常接收了一個(gè)包含失敗原因的Exception參數(shù)恢准,即價(jià)格計(jì)算方法初拋出的異常魂挂。
使用工廠方法supplyAsync創(chuàng)建CompletableFuture
CompletableFuture類(lèi)提供了大量精巧的工廠方法,使用這些方法能更容易地完成整個(gè)流程馁筐,不用擔(dān)心實(shí)現(xiàn)細(xì)節(jié)涂召。比如,采用supplyAsync方法后敏沉,可以用一行語(yǔ)句重寫(xiě)getPriceAsync方法:

public Future<Double> getPriceAsync(String product) {
  return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

supplyAsync方法接受一個(gè)生產(chǎn)者(Supplier)作為參數(shù)果正,返回一個(gè)CompletableFuture對(duì)象,該對(duì)象完成異步執(zhí)行后會(huì)讀取調(diào)用生產(chǎn)者方法的返回值盟迟。生產(chǎn)者方法會(huì)交由ForkJoinPool池中的某個(gè)執(zhí)行線(xiàn)程運(yùn)行秋泳,也可以使用supplyAsync方法的重載版本,傳遞第二個(gè)參數(shù)指定不同的執(zhí)行線(xiàn)程執(zhí)行生產(chǎn)者方法攒菠。一般而言迫皱,向CompletableFuture的工廠方法傳遞可選參數(shù),指定生產(chǎn)者方法的執(zhí)行線(xiàn)程是可行的辖众。

3卓起、無(wú)阻塞執(zhí)行
1)使用并行流對(duì)請(qǐng)求進(jìn)行并行操作

使用并行流來(lái)避免順序計(jì)算:

public List<String> findPrices(String product) {
  return shops.parallelStream()
    .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
    .collect(toList());
}
2)使用CompletableFuture發(fā)起異步請(qǐng)求

使用工廠方法supplyAsync創(chuàng)建CompletableFuture對(duì)象:

List<CompletableFuture<String>> priceFutures = shops.stream()
    .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f",shop.getName(), shop.getPrice(product))))
    .collect(toList());

使用這種方式,會(huì)得到一個(gè)List<CompletableFuture<String>>凹炸,列表中的每個(gè)CompletableFuture對(duì)象在計(jì)算完成后都包含商店的String類(lèi)型的名稱(chēng)戏阅。但是,由于用CompletableFutures實(shí)現(xiàn)的findPrices方法要求返回一個(gè)List<String>啤它,需要等待所有的future執(zhí)行完畢奕筐,將其包含的值抽取出來(lái)舱痘,填充到列表中才能返回。
為了實(shí)現(xiàn)這個(gè)效果离赫,可以向List<CompletableFuture<String>>施加第二個(gè)map操作芭逝,對(duì)List中的所有future對(duì)象執(zhí)行join操作,一個(gè)接一個(gè)地等待它們運(yùn)行結(jié)束笆怠。
CompletableFuture類(lèi)中的join方法和Future接口中的get有相同的含義铝耻,并且也聲明在Future接口中誊爹,它們唯一的不同是join不會(huì)拋出任何檢測(cè)到的異常蹬刷。使用它不需要使用try/catch語(yǔ)句塊,讓傳遞給第二個(gè)map方法的Lambda表達(dá)式變得臃腫频丘。
使用CompletableFuture實(shí)現(xiàn)findPrices方法:

public List<String> findPrices(String product) {
    List<CompletableFuture<String>> priceFutures = shops.stream()
    .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " +shop.getPrice(product)))
    .collect(Collectors.toList()); 
 
    return priceFutures.stream()
      .map(CompletableFuture::join)
      .collect(toList());
}

這里使用了兩個(gè)不同的Stream流水線(xiàn)办成,而不是在同一個(gè)處理流的流水線(xiàn)上一 個(gè)接一個(gè)地放置兩個(gè)map操作。原因是考慮流操作之間的延遲特性搂漠,如果你在單一流水線(xiàn)中處理流迂卢,發(fā)向不同商家的請(qǐng)求只能以同步、順序執(zhí)行的方式才會(huì)成功桐汤。因此而克,每個(gè)創(chuàng)建CompletableFuture對(duì)象只能在前一個(gè)操作結(jié)束后執(zhí)行查詢(xún)指定商家的動(dòng)作、通知join方法返回計(jì)算結(jié)果怔毛。

3)更優(yōu)方案

并行流的版本工作得非常好员萍,那是因?yàn)樗懿⑿械貓?zhí)行四個(gè)任務(wù),所以它幾乎能為每個(gè)商家分配一個(gè)線(xiàn)程拣度。如果想要增加第五個(gè)商家到商店列表中碎绎,非常不幸,并行流版本的程序比之前也多消耗了差不多1秒鐘的時(shí)間抗果,因?yàn)榭梢圆⑿羞\(yùn)行(通用線(xiàn)程池中處于可用狀態(tài)的)的四個(gè)線(xiàn)程現(xiàn)在都處于繁忙狀態(tài)筋帖,都在對(duì)前4個(gè)商店進(jìn)行查 詢(xún)。第五個(gè)查詢(xún)只能等到前面某一個(gè)操作完成釋放出空閑線(xiàn)程才能繼續(xù)冤馏。
CompletableFuture版本的程序似乎比并行流版本的程序還快一點(diǎn)日麸,但這個(gè)版本也不太令人滿(mǎn)意。如果試圖讓代碼處理9個(gè)商店逮光,并行流版本耗時(shí)3143毫秒赘淮, 而CompletableFuture版本耗時(shí)3009毫秒。它們看起來(lái)不相伯仲睦霎,究其原因都一樣:它們內(nèi)部采用的是同樣的通用線(xiàn)程池梢卸,默認(rèn)都使用固定數(shù)目的線(xiàn)程,具體線(xiàn)程數(shù)取決于Runtime. getRuntime().availableProcessors()的返回值副女。然而蛤高,CompletableFuture具有一定的優(yōu)勢(shì),因?yàn)樗试S你對(duì)執(zhí)行器(Executor)進(jìn)行配置,尤其是線(xiàn)程池的大小戴陡,讓它以更適合應(yīng)用需求的方式進(jìn)行配置塞绿,滿(mǎn)足程序的要求,而這是并行流API無(wú)法提供的恤批。

4)使用定制的執(zhí)行器

調(diào)整線(xiàn)程池的大幸煳恰:如果線(xiàn)程池中線(xiàn)程的數(shù)量過(guò)多,最終它們會(huì)競(jìng)爭(zhēng)稀缺的處理器和內(nèi)存資源喜庞,浪費(fèi)大量的時(shí)間在上下文切換上诀浪。反之,如果線(xiàn)程的數(shù)目過(guò)少延都,處理器的一些核可能就無(wú)法充分利用雷猪。
線(xiàn)程池大小與處理器的利用率之比可以使用下面的公式進(jìn)行估算:
Nthreads = NCPU * UCPU * (1 + W/C)
NCPU:是處理器核的數(shù)目,可通過(guò)Runtime.getRuntime().availableProce- ssors()得到
UCPU:是期望的CPU利用率(該值應(yīng)該介于0和1之間)
W/C:是等待時(shí)間與計(jì)算時(shí)間的比率
你的應(yīng)用99%的時(shí)間都在等待商店的響應(yīng)晰房,所以估算出的W/C比率為100求摇。如果期望的CPU利用率是100%,需要?jiǎng)?chuàng)建一個(gè)擁有400個(gè)線(xiàn)程的線(xiàn)程池殊者。實(shí)際操作中与境,如果你創(chuàng)建的線(xiàn)程數(shù)比商店的數(shù)目更多,反而是一種浪費(fèi)猖吴。出于這種考慮摔刁,建議將執(zhí)行器使用的線(xiàn)程數(shù),與需要查詢(xún)的商店數(shù)目設(shè)定為同一個(gè)值距误,這樣每個(gè)商店都應(yīng)該對(duì)應(yīng)一個(gè)服務(wù)線(xiàn)程簸搞。不過(guò),為了避免發(fā)生由于商店的數(shù)目過(guò)多導(dǎo)致服務(wù)器超負(fù)荷而崩潰准潭,你還是需要設(shè)置一個(gè)上限趁俊,比如100個(gè)線(xiàn)程。

private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100),//創(chuàng)建一個(gè)線(xiàn)程池刑然,線(xiàn)程池中線(xiàn)程數(shù)目為100和商店數(shù)目二者中較小的一個(gè)值
  new ThreadFactory() {
    public Thread newThread(Runnable r) {
      Thread t = new Thread(r);
      t.setDaemon(true);//使用守護(hù)線(xiàn)程寺擂,這種方式不會(huì)阻止程序的關(guān)停
      return t;
    }
});

現(xiàn)在創(chuàng)建的是一個(gè)由守護(hù)線(xiàn)程構(gòu)成的線(xiàn)程池。Java程序無(wú)法終止或者退出一個(gè)正在運(yùn)行中的線(xiàn)程泼掠,所以最后剩下的那個(gè)線(xiàn)程會(huì)由于一直等待無(wú)法發(fā)生的事件而引發(fā)問(wèn)題怔软。相反,如果將線(xiàn)程標(biāo)記為守護(hù)進(jìn)程择镇,意味著程序退出時(shí)它也會(huì)被回收挡逼。這二者之間沒(méi)有性能上的差異。現(xiàn)在腻豌,可以將執(zhí)行器作為第二個(gè)參數(shù)傳遞給supplyAsync工廠方法了家坎。比如可以按照下面的方式創(chuàng)建一個(gè)可查詢(xún)指定商品價(jià)格的CompletableFuture對(duì)象:

CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice(product), executor);

改進(jìn)后嘱能,使用CompletableFuture方案的程序處理5個(gè)商店僅耗時(shí)1021秒,處理9個(gè)商店耗時(shí)1022秒虱疏。一般而言惹骂,這種狀態(tài)會(huì)一直持續(xù),直到商店的數(shù)目達(dá)到之前計(jì)算的閾值400做瞪。
并行時(shí)使用流還是CompletableFutures对粪?
對(duì)集合進(jìn)行并行計(jì)算有兩種方式:要么將其轉(zhuǎn)化為并行流,利用map操作装蓬,要么枚舉出集合中的每一個(gè)元素著拭,創(chuàng)建新的線(xiàn)程,在CompletableFuture內(nèi)對(duì)其進(jìn)行操作矛物。后者提供了更多的靈活性茫死,你可以調(diào)整線(xiàn)程池的大小跪但,而這能幫助你確保整體的計(jì)算不會(huì)因?yàn)榫€(xiàn)程都在等待I/O而發(fā)生阻塞履羞。
使用這些API的建議:

  • 如果你進(jìn)行的是計(jì)算密集型的操作,并且沒(méi)有I/O屡久,那么推薦使用Stream接口忆首,因?yàn)閷?shí)現(xiàn)簡(jiǎn)單被环,同時(shí)效率也可能是最高的。
  • 如果并行的工作單元還涉及等待I/O的操作(包括網(wǎng)絡(luò)連接等待)浸锨,那么使用CompletableFuture靈活性更好,可以依據(jù)等待/計(jì)算柱搜,或者W/C的比率設(shè)定需要使用的線(xiàn)程數(shù)剥险。這種情況不使用并行流的另一個(gè)原因是,處理流的流水線(xiàn)中如果發(fā)生I/O等待表制,流的延遲特性會(huì)讓我們很難判斷到底什么時(shí)候觸發(fā)了等待。
4么介、對(duì)多個(gè)異步任務(wù)進(jìn)行流水線(xiàn)操作

示例:所有商店都使用一個(gè)集中式的折扣服務(wù),該折扣服務(wù)提供了五個(gè)不同的折扣代碼设拟,每個(gè)折扣代碼對(duì)應(yīng)不同的折扣率衷咽。

public class Discount {
  public enum Code {
    NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);
    private final int percentage; 
    Code(int percentage) {
      this.percentage = percentage;
    }
  }
  // Discount類(lèi)的具體實(shí)現(xiàn)
}

假設(shè)所有的商店都統(tǒng)一修改getPrice方法的返回格式镶骗。getPrice現(xiàn)在以ShopName:price:DiscountCode的格式返回一個(gè)String類(lèi)型的值躲雅。

public String getPrice(String product) {
  double price = calculatePrice(product);
  Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];
  return String.format("%s:%.2f:%s", name, price, code);
}
private double calculatePrice(String product) {
  delay();
  return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
1)實(shí)現(xiàn)折扣服務(wù)

對(duì)商店返 回字符串的解析操作封裝到了下面的Quote類(lèi)之中:

public class Quote { 
    private final String shopName;
    private final double price;
    private final Discount.Code discountCode; 
 
    public Quote(String shopName, double price, Discount.Code code) {
      this.shopName = shopName;
      this.price = price;
      this.discountCode = code;
    } 
 
    public static Quote parse(String s) {
      String[] split = s.split(":");
      String shopName = split[0];
      double price = Double.parseDouble(split[1]);
      Discount.Code discountCode = Discount.Code.valueOf(split[2]);
      return new Quote(shopName, price, discountCode);
    }
 
    public String getShopName() { return shopName; }
    public double getPrice() { return price; 
    public Discount.Code getDiscountCode() { return discountCode;}
}

通過(guò)傳遞shop對(duì)象返回的字符串給靜態(tài)工廠方法parse,可以得到Quote類(lèi)的一個(gè)實(shí)例相赁, 它包含了shop的名稱(chēng)相寇、折扣之前的價(jià)格,以及折扣代碼钮科。
Discount服務(wù)還提供了一個(gè)applyDiscount方法唤衫,它接收一個(gè)Quote對(duì)象,返回一個(gè)字符串绵脯,表示生成該Quote的shop中的折扣價(jià)格:

public class Discount {
  public enum Code {
    // 源碼省略……
  } 
 
  public static String applyDiscount(Quote quote) {
    return quote.getShopName() + " price is " +                Discount.apply(quote.getPrice(),quote.getDiscountCode());
  }
  private static double apply(double price, Code code) {
    delay();
    return format(price * (100 - code.percentage) / 100);
  }
}
2)使用Discount服務(wù)

以簡(jiǎn)單的方式實(shí)現(xiàn)使用Discount服務(wù)的findPrices方法:

public List<String> findPrices(String product) {
  return shops.stream()
    .map(shop -> shop.getPrice(product)) //取得每個(gè)shop對(duì)象中商品的原始價(jià)格
    .map(Quote::parse)//在Quote對(duì)象中對(duì)shop返回的字符串進(jìn)行轉(zhuǎn)換 
    .map(Discount::applyDiscount)//為每個(gè)Quote申請(qǐng)折扣
    .collect(toList());
}

通過(guò)在shop構(gòu)成的流上采用流水線(xiàn)方式執(zhí)行三次map操作佳励,得到了期望的結(jié)果。

3)構(gòu)造同步和異步操作

使用CompletableFuture實(shí)現(xiàn)findPrices方法:

public List<String> findPrices(String product) {
  List<CompletableFuture<String>> priceFutures = shops.stream()
    .map(shop -> CompletableFuture.supplyAsync(() -> //以異步方式取得每個(gè)shop中指定產(chǎn)品的原始價(jià)格 
 shop.getPrice(product), executor))
    .map(future -> future.thenApply(Quote::parse))//Quote對(duì)象存在時(shí)蛆挫,對(duì)其返回的值進(jìn)行轉(zhuǎn)換
    .map(future -> future.thenCompose(quote ->//使用另一個(gè)異步任務(wù)構(gòu)造期望的Future赃承,申請(qǐng)折扣 CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)))
    .collect(toList()); 
 
    return priceFutures.stream()
      .map(CompletableFuture::join)
      .collect(toList()); 
}
image.png

通常而言,名稱(chēng)中不帶Async的方法和它的前一個(gè)任務(wù)一樣悴侵,在同一個(gè)線(xiàn)程中運(yùn)行瞧剖;而名稱(chēng)以Async結(jié)尾的方法會(huì)將后續(xù)的任務(wù)提交到一個(gè)線(xiàn)程池,所以每個(gè)任務(wù)是由不同的線(xiàn)程處理的可免。

4)將兩個(gè)CompletableFuture對(duì)象整合起來(lái)

對(duì)一個(gè)CompletableFuture對(duì)象調(diào)用了thenCompose方法抓于,并向其傳遞了第二個(gè)CompletableFuture,而第二個(gè)CompletableFuture又需要使用第一個(gè)CompletableFuture的執(zhí)行結(jié)果作為輸入浇借。但是捉撮,另一種比較常見(jiàn)的情況是,你需要將兩個(gè)完 全不相干的CompletableFuture對(duì)象的結(jié)果整合起來(lái)逮刨,而且也不希望等到第一個(gè)任務(wù)完全結(jié) 束才開(kāi)始第二項(xiàng)任務(wù)呕缭。
這種情況,應(yīng)該使用thenCombine方法修己,它接收名為BiFunction的第二參數(shù)恢总,這個(gè)參數(shù)定義了當(dāng)兩個(gè)CompletableFuture對(duì)象完成計(jì)算后,結(jié)果如何合并睬愤。同thenCompose方法一樣片仿,thenCombine方法也提供有一個(gè)Async的版本。如果使用thenCombineAsync會(huì)導(dǎo)致BiFunction中定義的合并操作被提交到線(xiàn)程池中尤辱,由另一個(gè)任務(wù)以異步的方式執(zhí)行厢岂。
合并兩個(gè)獨(dú)立的CompletableFuture對(duì)象:

Future<Double> futurePriceInUSD = CompletableFuture
  .supplyAsync(() -> shop.getPrice(product))//創(chuàng)建第一個(gè)任務(wù)查詢(xún) 商店取得商品的價(jià)格
  .thenCombine(
    CompletableFuture.supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD)),//創(chuàng)建第二個(gè)獨(dú)立任務(wù)塔粒,查詢(xún)美元和歐元之間的轉(zhuǎn)換匯率 
    (price, rate) -> price * rate //通過(guò)乘法整合得到的商品價(jià)格和匯率
);
image.png
5)對(duì)Future和CompletableFuture的回顧

CompletableFuture利用Lambda表達(dá)式以聲明式的API提供了一種機(jī)制,能夠用有效的方式圃酵,非常容易地將多個(gè)以同步或異步方式執(zhí)行復(fù)雜操作的任務(wù)結(jié)合到一起郭赐。
利用Java 7的方法合并兩個(gè)Future對(duì)象:

ExecutorService executor = Executors.newCachedThreadPool();
final Future<Double> futureRate = executor.submit(new Callable<Double>() {
  public Double call() {
    return exchangeService.getRate(Money.EUR, Money.USD);
  }
});
Future<Double> futurePriceInUSD = executor.submit(new Callable<Double>() {
  public Double call() {
    double priceInEUR = shop.getPrice(product);
    return priceInEUR * futureRate.get();
  }
});  

通過(guò)向執(zhí)行器提交一個(gè)Callable對(duì)象的方式創(chuàng)建了第一個(gè)Future對(duì)象捌锭,向外部服務(wù)查詢(xún)歐元和美元之間的轉(zhuǎn)換匯率舀锨。接著宛逗,創(chuàng)建了第二個(gè)Future對(duì)象雷激,查詢(xún)指定商店中特定商品的歐元價(jià)格屎暇。最終根悼,用與上述合并兩個(gè)獨(dú)立的CompletableFuture對(duì)象一樣的方式挤巡,在同一個(gè)Future中通過(guò)查詢(xún)商店得到的歐元商品價(jià)格乘以匯率得到了終的價(jià)格矿卑。

5母廷、響應(yīng)CompletableFuture的completion事件

使用randomDelay方法取代原來(lái)的固定延遲琴昆,模擬生成0.5秒至2.5秒隨機(jī)延遲的方法:

private static final Random random = new Random();
public static void randomDelay() {
  int delay = 500 + random.nextInt(2000);
  try {
    Thread.sleep(delay);
  } catch (InterruptedException e) {
    throw new RuntimeException(e);
  }
}

對(duì)最佳價(jià)格查詢(xún)器應(yīng)用的優(yōu)化
要避免的首要問(wèn)題是业舍,等待創(chuàng)建一個(gè)包含了所有價(jià)格的List創(chuàng)建完成勤讽。應(yīng)該做的是直接處理CompletableFuture流脚牍,這樣每個(gè)CompletableFuture都在為某個(gè)商店執(zhí)行必要的操作诸狭。
重構(gòu)findPrices方法返回一個(gè)由Future構(gòu)成的流:

public Stream<CompletableFuture<String>> findPricesStream(String product) {
  return shops.stream()
    .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
    .map(future -> future.thenApply(Quote::parse))
    .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));
}

現(xiàn)在驯遇,為findPricesStream方法返回的Stream添加了第四個(gè)map操作叉庐,在此之前陡叠,已經(jīng)在該方法內(nèi)部調(diào)用了三次map枉阵。這個(gè)新添加的操作只是在每個(gè)CompletableFuture上注冊(cè)一個(gè)操作兴溜,該操作會(huì)在CompletableFuture完成執(zhí)行后使用它的返回值拙徽。Java 8的CompletableFuture通過(guò)thenAccept方法提供了這一功能,它接收 CompletableFuture執(zhí)行完畢后的返回值做參數(shù)已卷。
將結(jié)果打印輸出:

findPricesStream("myPhone").map(f -> f.thenAccept(System.out::println));

和thenCompose和thenCombine方法一樣侧蘸,thenAccept方法也提供 了一個(gè)異步版本讳癌,名為thenAcceptAsync。異步版本的方法會(huì)對(duì)處理結(jié)果的消費(fèi)者進(jìn)行調(diào)度逢艘,從線(xiàn)程池中選擇一個(gè)新的線(xiàn)程繼續(xù)執(zhí)行它改,不再由同一個(gè)線(xiàn)程完成CompletableFuture的所有任務(wù)央拖。
由于thenAccept方法已經(jīng)定義了如何處理CompletableFuture返回的結(jié)果鲜戒,一旦CompletableFuture計(jì)算得到結(jié)果遏餐,它就返回一個(gè)CompletableFuture<Void>赢底。所以颖系,map操作返回的是一個(gè)Stream<CompletableFuture<Void>>嘁扼。
把構(gòu)成Stream的所有CompletableFuture<Void>對(duì)象放到一個(gè)數(shù)組中趁啸,等待所有的任務(wù)執(zhí)行完成:

CompletableFuture[] futures = findPricesStream("myPhone") 
    .map(f -> f.thenAccept(System.out::println))
    .toArray(size -> new CompletableFuture[size]); CompletableFuture.allOf(futures).join(); 

allOf工廠方法接收一個(gè)由CompletableFuture構(gòu)成的數(shù)組不傅,數(shù)組中的所有CompletableFuture對(duì)象執(zhí)行完成之后访娶,它返回一個(gè)CompletableFuture<Void>對(duì)象崖疤。
如果只要CompletableFuture對(duì)象數(shù)組中有任何一個(gè)執(zhí)行完畢就不再等待,可以使用一個(gè)類(lèi)似的工廠方法anyOf叮趴。該方法接收一個(gè)CompletableFuture對(duì)象構(gòu)成的數(shù)組眯亦,返回由第一個(gè)執(zhí)行完畢的CompletableFuture對(duì)象的返回值構(gòu)成的CompletableFuture<Object>妻率。

--參考文獻(xiàn)《Java8實(shí)戰(zhàn)》

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末舌涨,一起剝皮案震驚了整個(gè)濱河市囊嘉,隨后出現(xiàn)的幾起案子扭粱,更是在濱河造成了極大的恐慌琢蛤,老刑警劉巖博其,帶你破解...
    沈念sama閱讀 211,561評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件慕淡,死亡現(xiàn)場(chǎng)離奇詭異沸毁,居然都是意外死亡携兵,警方通過(guò)查閱死者的電腦和手機(jī)搂誉,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,218評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)拂檩,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)死遭,“玉大人呀潭,你說(shuō)我怎么就攤上這事钠署⌒扯Γ” “怎么了身害?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,162評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵塌鸯,是天一觀的道長(zhǎng)丙猬。 經(jīng)常有香客問(wèn)我,道長(zhǎng)星持,這世上最難降的妖魔是什么钉汗? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,470評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮酒来,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘。我一直安慰自己伟墙,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,550評(píng)論 6 385
  • 文/花漫 我一把揭開(kāi)白布拱烁。 她就那樣靜靜地躺著戏自,像睡著了一般擅笔。 火紅的嫁衣襯著肌膚如雪猛们。 梳的紋絲不亂的頭發(fā)上弯淘,一...
    開(kāi)封第一講書(shū)人閱讀 49,806評(píng)論 1 290
  • 那天耳胎,我揣著相機(jī)與錄音,去河邊找鬼惕它。 笑死怕午,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的淹魄。 我是一名探鬼主播郁惜,決...
    沈念sama閱讀 38,951評(píng)論 3 407
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼甲锡!你這毒婦竟也來(lái)了兆蕉?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,712評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤缤沦,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后亡电,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體或辖,經(jīng)...
    沈念sama閱讀 44,166評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,510評(píng)論 2 327
  • 正文 我和宋清朗相戀三年捌省,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了祝高。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片陆蟆。...
    茶點(diǎn)故事閱讀 38,643評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡纫塌,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出胸嘁,到底是詐尸還是另有隱情毫胜,我是刑警寧澤口渔,帶...
    沈念sama閱讀 34,306評(píng)論 4 330
  • 正文 年R本政府宣布咧党,位于F島的核電站蛙埂,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,930評(píng)論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望儡首。 院中可真熱鬧,春花似錦舞竿、人聲如沸鄙皇。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,745評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)洲敢。三九已至询一,卻和暖如春抗俄,著一層夾襖步出監(jiān)牢的瞬間胰蝠,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,983評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工济丘, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留闪盔,地道東北人异赫。 一個(gè)月前我還...
    沈念sama閱讀 46,351評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子栅干,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,509評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容