1韧献、Future接口
Future接口在Java 5中被引入舌劳,設(shè)計(jì)初衷是對(duì)將來(lái)某個(gè)時(shí)刻會(huì)發(fā)生的結(jié)果進(jìn)行建模。它建模了一種異步計(jì)算军洼,返回一個(gè)執(zhí)行運(yùn)算結(jié)果的引用巩螃,當(dāng)運(yùn)算結(jié)束后,這個(gè)引用被返回給調(diào)用方匕争。在Future中觸發(fā)那些潛在耗時(shí)的操作把調(diào)用線(xiàn)程解放出來(lái)避乏,讓它能繼續(xù)執(zhí)行其他有價(jià)值的工作,不需要等待耗時(shí)的操作完成甘桑。
示例:使用Future以異步的方式執(zhí)行一個(gè)耗時(shí)的操作
ExecutorService executor = Executors.newCachedThreadPool();
Future<Double> future = executor.submit(new Callable<Double>() { //向ExecutorService提交一個(gè)Callable對(duì)象
public Double call() {
return doSomeLongComputation();//以異步方式在新線(xiàn)程中執(zhí)行耗時(shí)的操作
}
});
doSomethingElse();
try {
Double result = future.get(1, TimeUnit.SECONDS);//獲取異步操作結(jié)果拍皮,如果被阻塞,無(wú)法得到結(jié)果跑杭,在等待1秒鐘后退出
} catch (ExecutionException ee) {
// 計(jì)算拋出一個(gè)異常
} catch (InterruptedException ie) {
// 當(dāng)前線(xiàn)程在等待過(guò)程中被中斷
} catch (TimeoutException te) {
// 在Future對(duì)象完成之前超時(shí)
}
這種編程方式讓你的線(xiàn)程可以在ExecutorService以并發(fā)方式調(diào)用另一個(gè)線(xiàn)程執(zhí)行耗時(shí)操作的同時(shí)铆帽,去執(zhí)行一些其他任務(wù)。如果已經(jīng)運(yùn)行到?jīng)]有異步操作的結(jié)果就無(wú)法繼續(xù)進(jìn)行時(shí)德谅,可以調(diào)用它的get方法去獲取操作結(jié)果爹橱。如果操作已經(jīng)完成,該方法會(huì)立刻返回操作結(jié)果窄做,否則它會(huì)阻塞線(xiàn)程愧驱,直到操作完成慰技,返回相應(yīng)的結(jié)果。
為了處理長(zhǎng)時(shí)間運(yùn)行的操作永遠(yuǎn)不返回的可能性组砚,雖然Future提供了一個(gè)無(wú)需任何參數(shù)的get方法吻商,但還是推薦使用重載版本的get方法,它接受一個(gè)超時(shí)的參數(shù)糟红,可以定義線(xiàn)程等待Future結(jié)果的時(shí)間手报,而不是永無(wú)止境地等待下去。
Future接口的局限性
Future接口提供了方法來(lái)檢測(cè)異步計(jì)算是否已經(jīng)結(jié)束(使用isDone方法)改化,等待異步操作結(jié)束掩蛤,以及獲取計(jì)算的結(jié)果。但這些特性還不足以讓你編寫(xiě)簡(jiǎn)潔的并發(fā)代碼陈肛。
- 將兩個(gè)異步計(jì)算合并為一個(gè)揍鸟,這兩個(gè)異步計(jì)算之間相互獨(dú)立,同時(shí)第二個(gè)又依賴(lài)于第一個(gè)的結(jié)果句旱。
- 等待Future集合中的所有任務(wù)都完成阳藻。
- 僅等待Future集合中快結(jié)束的任務(wù)完成,并返回它的結(jié)果谈撒。
- 通過(guò)編程方式完成一個(gè)Future任務(wù)的執(zhí)行腥泥。
- 應(yīng)對(duì)Future的完成事件(即當(dāng)Future的完成事件發(fā)生時(shí)會(huì)收到通知,并能使用Future計(jì)算的結(jié)果進(jìn)行下一步操作啃匿,不只是簡(jiǎn)單地阻塞等待操作結(jié)果)蛔外。
2、實(shí)現(xiàn)異步API
示例:實(shí)現(xiàn)價(jià)格查詢(xún)器
聲明依據(jù)指定產(chǎn)品名稱(chēng)返回價(jià)格的方法:
public class Shop {
public double getPrice(String product) {
// 待實(shí)現(xiàn)
}
}
采用delay方法模擬長(zhǎng)期運(yùn)行的方法的執(zhí)行溯乒,它會(huì)人為地引入1秒鐘的延遲夹厌,方法聲明如下:
public static void delay() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
在getPrice方法中引入一個(gè)模擬的延遲:
public double getPrice(String product) {
return calculatePrice(product);
}
private double calculatePrice(String product)
delay();
return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
這個(gè)API的使用者調(diào)用該方法時(shí),它依舊會(huì)被阻塞裆悄。為等待同步事件完成而等待1秒鐘矛纹,這是無(wú)法接受的,尤其是考慮到佳價(jià)格查詢(xún)器對(duì)網(wǎng)絡(luò)中的所有商店都要重復(fù)這種操作光稼。
1)將同步方法轉(zhuǎn)換為異步方法
首先需要將getPrice轉(zhuǎn)換為getPriceAsync方法或南,并修改它的返回值:
public Future<Double> getPriceAsync(String product) { ... }
Java 5引入了java.util.concurrent.Future接口表示一個(gè)異步計(jì)算(即調(diào)用線(xiàn)程可以繼續(xù)運(yùn)行,不會(huì)因?yàn)檎{(diào)用方法而阻塞)的結(jié)果艾君。這意味著Future是一個(gè)暫時(shí)還不可知值的處理器采够,這個(gè)值在計(jì)算完成后,可以通過(guò)調(diào)用它的get方法取得腻贰。getPriceAsync方法能立刻返回吁恍,給調(diào)用線(xiàn)程一個(gè)機(jī)會(huì)扒秸,能在同一時(shí)間去執(zhí)行其他有價(jià)值的計(jì)算任務(wù)播演。新的CompletableFuture類(lèi)提供了大量的方法冀瓦,讓我們有機(jī)會(huì)以多種可能的方式輕松地實(shí)現(xiàn)這個(gè)方法:
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();//創(chuàng)建CompletableFuture對(duì)象,它會(huì)包含計(jì)算結(jié)果
new Thread(() -> {
double price = calculatePrice(product);//在另一個(gè)線(xiàn)程中以異步方式執(zhí)行計(jì)算
futurePrice.complete(price);//需長(zhǎng)時(shí)間計(jì)算的任務(wù)結(jié)束并得出結(jié)果時(shí)写烤,設(shè)置Future的返回值
}).start();
return futurePrice;//無(wú)需等待還沒(méi)結(jié)束的計(jì)算翼闽,直接返回Future對(duì)象
}
上述代碼中,創(chuàng)建了一個(gè)代表異步計(jì)算的CompletableFuture對(duì)象實(shí)例洲炊,它在計(jì)算完成時(shí)會(huì)包含計(jì)算的結(jié)果感局。接著調(diào)用fork創(chuàng)建了另一個(gè)線(xiàn)程去執(zhí)行實(shí)際的價(jià)格計(jì)算工作,不等該耗時(shí)計(jì)算任務(wù)結(jié)束暂衡,直接返回一個(gè)Future實(shí)例询微。當(dāng)請(qǐng)求的產(chǎn)品價(jià)格終計(jì)算得出時(shí),可以使用它的complete方法狂巢,結(jié)束completableFuture對(duì)象的運(yùn)行撑毛,并設(shè)置變量的值。
使用異步API:
Shop shop = new Shop("BestShop");
long start = System.nanoTime();
Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
long invocationTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Invocation returned after " + invocationTime + " msecs");
doSomethingElse(); //執(zhí)行更多任務(wù)
try {
double price = futurePrice.get();//從Future對(duì)象中讀取價(jià)格唧领,如果價(jià)格未知藻雌,會(huì)發(fā)生阻塞
System.out.printf("Price is %.2f%n", price);
} catch (Exception e) {
throw new RuntimeException(e);
}
long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Price returned after " + retrievalTime + " msecs");
上述代碼中,客戶(hù)向商店查詢(xún)了某種商品的價(jià)格斩个。由于商店提供了異步API胯杭,該調(diào)用立刻返回了一個(gè)Future對(duì)象,通過(guò)該對(duì)象客戶(hù)可以在將來(lái)的某個(gè)時(shí)刻取得商品的價(jià)格受啥。這種方式下做个,客戶(hù)在進(jìn)行商品價(jià)格查詢(xún)的同時(shí),還能執(zhí)行一些其他的任務(wù)滚局,比如查詢(xún)其他家商店中商品的價(jià)格叁温,不會(huì)阻塞在那里等待第一家商店返回請(qǐng)求的結(jié)果。最后核畴,如果所有有意義的工作都完成膝但,客戶(hù)所有要執(zhí)行的工作都依賴(lài)于商品價(jià)格時(shí),再調(diào)用Future的get方法谤草。執(zhí)行這個(gè)操作后跟束,客戶(hù)要么獲得Future中封裝的值(如果異步任務(wù)已經(jīng)完成),要么發(fā)生阻塞丑孩,直到該異步任務(wù)完成冀宴,期望的值能夠訪(fǎng)問(wèn)。
2)錯(cuò)誤處理
如果價(jià)格計(jì)算過(guò)程中產(chǎn)生了錯(cuò)誤温学,這種情況下你會(huì)得到一個(gè)相當(dāng)糟糕的結(jié)果:用于提示錯(cuò)誤的異常會(huì)被限制在試圖計(jì)算商品價(jià)格的當(dāng)前線(xiàn)程的范圍內(nèi)略贮,最終會(huì)殺死該線(xiàn)程,而這會(huì)導(dǎo)致等待get方法返回結(jié)果的客戶(hù)端永久地被阻塞。
客戶(hù)端可以使用重載版本的get方法逃延,它使用一個(gè)超時(shí)參數(shù)來(lái)避免發(fā)生這樣的情況览妖。這是值得推薦的做法,應(yīng)盡量在代碼中添加超時(shí)判斷的邏輯揽祥,避免發(fā)生類(lèi)似的問(wèn)題讽膏。這種方法能防止程序永久地等待下去,超時(shí)發(fā)生時(shí)拄丰,程序會(huì)得到通知發(fā)生了TimeoutException府树。也因?yàn)槿绱耍悴粫?huì)有機(jī)會(huì)發(fā)現(xiàn)計(jì)算商品價(jià)格的線(xiàn)程內(nèi)到底發(fā)生了什么問(wèn)題才引發(fā)了這樣的失效料按。為了讓客戶(hù)端能了解商店無(wú)法提供請(qǐng)求商品價(jià)格的原因奄侠,需要使用CompletableFuture的completeExceptionally方法將導(dǎo)致CompletableFuture內(nèi)發(fā)生問(wèn)題的異常拋出。
拋出CompletableFuture內(nèi)的異常:
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
try {
double price = calculatePrice(product);
futurePrice.complete(price);//如果價(jià)格計(jì)算正常結(jié)束载矿,完成Future操作并設(shè)置商品價(jià)格
} catch (Exception ex) {
futurePrice.completeExceptionally(ex);//否則拋出導(dǎo)致失敗的異常遭铺,完成這次Future操作
}
}).start();
return futurePrice;
}
客戶(hù)端現(xiàn)在會(huì)收到一個(gè)ExecutionException異常,該異常接收了一個(gè)包含失敗原因的Exception參數(shù)恢准,即價(jià)格計(jì)算方法初拋出的異常魂挂。
使用工廠方法supplyAsync創(chuàng)建CompletableFuture
CompletableFuture類(lèi)提供了大量精巧的工廠方法,使用這些方法能更容易地完成整個(gè)流程馁筐,不用擔(dān)心實(shí)現(xiàn)細(xì)節(jié)涂召。比如,采用supplyAsync方法后敏沉,可以用一行語(yǔ)句重寫(xiě)getPriceAsync方法:
public Future<Double> getPriceAsync(String product) {
return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}
supplyAsync方法接受一個(gè)生產(chǎn)者(Supplier)作為參數(shù)果正,返回一個(gè)CompletableFuture對(duì)象,該對(duì)象完成異步執(zhí)行后會(huì)讀取調(diào)用生產(chǎn)者方法的返回值盟迟。生產(chǎn)者方法會(huì)交由ForkJoinPool池中的某個(gè)執(zhí)行線(xiàn)程運(yùn)行秋泳,也可以使用supplyAsync方法的重載版本,傳遞第二個(gè)參數(shù)指定不同的執(zhí)行線(xiàn)程執(zhí)行生產(chǎn)者方法攒菠。一般而言迫皱,向CompletableFuture的工廠方法傳遞可選參數(shù),指定生產(chǎn)者方法的執(zhí)行線(xiàn)程是可行的辖众。
3卓起、無(wú)阻塞執(zhí)行
1)使用并行流對(duì)請(qǐng)求進(jìn)行并行操作
使用并行流來(lái)避免順序計(jì)算:
public List<String> findPrices(String product) {
return shops.parallelStream()
.map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
.collect(toList());
}
2)使用CompletableFuture發(fā)起異步請(qǐng)求
使用工廠方法supplyAsync創(chuàng)建CompletableFuture對(duì)象:
List<CompletableFuture<String>> priceFutures = shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f",shop.getName(), shop.getPrice(product))))
.collect(toList());
使用這種方式,會(huì)得到一個(gè)List<CompletableFuture<String>>凹炸,列表中的每個(gè)CompletableFuture對(duì)象在計(jì)算完成后都包含商店的String類(lèi)型的名稱(chēng)戏阅。但是,由于用CompletableFutures實(shí)現(xiàn)的findPrices方法要求返回一個(gè)List<String>啤它,需要等待所有的future執(zhí)行完畢奕筐,將其包含的值抽取出來(lái)舱痘,填充到列表中才能返回。
為了實(shí)現(xiàn)這個(gè)效果离赫,可以向List<CompletableFuture<String>>施加第二個(gè)map操作芭逝,對(duì)List中的所有future對(duì)象執(zhí)行join操作,一個(gè)接一個(gè)地等待它們運(yùn)行結(jié)束笆怠。
CompletableFuture類(lèi)中的join方法和Future接口中的get有相同的含義铝耻,并且也聲明在Future接口中誊爹,它們唯一的不同是join不會(huì)拋出任何檢測(cè)到的異常蹬刷。使用它不需要使用try/catch語(yǔ)句塊,讓傳遞給第二個(gè)map方法的Lambda表達(dá)式變得臃腫频丘。
使用CompletableFuture實(shí)現(xiàn)findPrices方法:
public List<String> findPrices(String product) {
List<CompletableFuture<String>> priceFutures = shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " +shop.getPrice(product)))
.collect(Collectors.toList());
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(toList());
}
這里使用了兩個(gè)不同的Stream流水線(xiàn)办成,而不是在同一個(gè)處理流的流水線(xiàn)上一 個(gè)接一個(gè)地放置兩個(gè)map操作。原因是考慮流操作之間的延遲特性搂漠,如果你在單一流水線(xiàn)中處理流迂卢,發(fā)向不同商家的請(qǐng)求只能以同步、順序執(zhí)行的方式才會(huì)成功桐汤。因此而克,每個(gè)創(chuàng)建CompletableFuture對(duì)象只能在前一個(gè)操作結(jié)束后執(zhí)行查詢(xún)指定商家的動(dòng)作、通知join方法返回計(jì)算結(jié)果怔毛。
3)更優(yōu)方案
并行流的版本工作得非常好员萍,那是因?yàn)樗懿⑿械貓?zhí)行四個(gè)任務(wù),所以它幾乎能為每個(gè)商家分配一個(gè)線(xiàn)程拣度。如果想要增加第五個(gè)商家到商店列表中碎绎,非常不幸,并行流版本的程序比之前也多消耗了差不多1秒鐘的時(shí)間抗果,因?yàn)榭梢圆⑿羞\(yùn)行(通用線(xiàn)程池中處于可用狀態(tài)的)的四個(gè)線(xiàn)程現(xiàn)在都處于繁忙狀態(tài)筋帖,都在對(duì)前4個(gè)商店進(jìn)行查 詢(xún)。第五個(gè)查詢(xún)只能等到前面某一個(gè)操作完成釋放出空閑線(xiàn)程才能繼續(xù)冤馏。
CompletableFuture版本的程序似乎比并行流版本的程序還快一點(diǎn)日麸,但這個(gè)版本也不太令人滿(mǎn)意。如果試圖讓代碼處理9個(gè)商店逮光,并行流版本耗時(shí)3143毫秒赘淮, 而CompletableFuture版本耗時(shí)3009毫秒。它們看起來(lái)不相伯仲睦霎,究其原因都一樣:它們內(nèi)部采用的是同樣的通用線(xiàn)程池梢卸,默認(rèn)都使用固定數(shù)目的線(xiàn)程,具體線(xiàn)程數(shù)取決于Runtime. getRuntime().availableProcessors()的返回值副女。然而蛤高,CompletableFuture具有一定的優(yōu)勢(shì),因?yàn)樗试S你對(duì)執(zhí)行器(Executor)進(jìn)行配置,尤其是線(xiàn)程池的大小戴陡,讓它以更適合應(yīng)用需求的方式進(jìn)行配置塞绿,滿(mǎn)足程序的要求,而這是并行流API無(wú)法提供的恤批。
4)使用定制的執(zhí)行器
調(diào)整線(xiàn)程池的大幸煳恰:如果線(xiàn)程池中線(xiàn)程的數(shù)量過(guò)多,最終它們會(huì)競(jìng)爭(zhēng)稀缺的處理器和內(nèi)存資源喜庞,浪費(fèi)大量的時(shí)間在上下文切換上诀浪。反之,如果線(xiàn)程的數(shù)目過(guò)少延都,處理器的一些核可能就無(wú)法充分利用雷猪。
線(xiàn)程池大小與處理器的利用率之比可以使用下面的公式進(jìn)行估算:
Nthreads = NCPU * UCPU * (1 + W/C)
NCPU:是處理器核的數(shù)目,可通過(guò)Runtime.getRuntime().availableProce- ssors()得到
UCPU:是期望的CPU利用率(該值應(yīng)該介于0和1之間)
W/C:是等待時(shí)間與計(jì)算時(shí)間的比率
你的應(yīng)用99%的時(shí)間都在等待商店的響應(yīng)晰房,所以估算出的W/C比率為100求摇。如果期望的CPU利用率是100%,需要?jiǎng)?chuàng)建一個(gè)擁有400個(gè)線(xiàn)程的線(xiàn)程池殊者。實(shí)際操作中与境,如果你創(chuàng)建的線(xiàn)程數(shù)比商店的數(shù)目更多,反而是一種浪費(fèi)猖吴。出于這種考慮摔刁,建議將執(zhí)行器使用的線(xiàn)程數(shù),與需要查詢(xún)的商店數(shù)目設(shè)定為同一個(gè)值距误,這樣每個(gè)商店都應(yīng)該對(duì)應(yīng)一個(gè)服務(wù)線(xiàn)程簸搞。不過(guò),為了避免發(fā)生由于商店的數(shù)目過(guò)多導(dǎo)致服務(wù)器超負(fù)荷而崩潰准潭,你還是需要設(shè)置一個(gè)上限趁俊,比如100個(gè)線(xiàn)程。
private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100),//創(chuàng)建一個(gè)線(xiàn)程池刑然,線(xiàn)程池中線(xiàn)程數(shù)目為100和商店數(shù)目二者中較小的一個(gè)值
new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);//使用守護(hù)線(xiàn)程寺擂,這種方式不會(huì)阻止程序的關(guān)停
return t;
}
});
現(xiàn)在創(chuàng)建的是一個(gè)由守護(hù)線(xiàn)程構(gòu)成的線(xiàn)程池。Java程序無(wú)法終止或者退出一個(gè)正在運(yùn)行中的線(xiàn)程泼掠,所以最后剩下的那個(gè)線(xiàn)程會(huì)由于一直等待無(wú)法發(fā)生的事件而引發(fā)問(wèn)題怔软。相反,如果將線(xiàn)程標(biāo)記為守護(hù)進(jìn)程择镇,意味著程序退出時(shí)它也會(huì)被回收挡逼。這二者之間沒(méi)有性能上的差異。現(xiàn)在腻豌,可以將執(zhí)行器作為第二個(gè)參數(shù)傳遞給supplyAsync工廠方法了家坎。比如可以按照下面的方式創(chuàng)建一個(gè)可查詢(xún)指定商品價(jià)格的CompletableFuture對(duì)象:
CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice(product), executor);
改進(jìn)后嘱能,使用CompletableFuture方案的程序處理5個(gè)商店僅耗時(shí)1021秒,處理9個(gè)商店耗時(shí)1022秒虱疏。一般而言惹骂,這種狀態(tài)會(huì)一直持續(xù),直到商店的數(shù)目達(dá)到之前計(jì)算的閾值400做瞪。
并行時(shí)使用流還是CompletableFutures对粪?
對(duì)集合進(jìn)行并行計(jì)算有兩種方式:要么將其轉(zhuǎn)化為并行流,利用map操作装蓬,要么枚舉出集合中的每一個(gè)元素著拭,創(chuàng)建新的線(xiàn)程,在CompletableFuture內(nèi)對(duì)其進(jìn)行操作矛物。后者提供了更多的靈活性茫死,你可以調(diào)整線(xiàn)程池的大小跪但,而這能幫助你確保整體的計(jì)算不會(huì)因?yàn)榫€(xiàn)程都在等待I/O而發(fā)生阻塞履羞。
使用這些API的建議:
- 如果你進(jìn)行的是計(jì)算密集型的操作,并且沒(méi)有I/O屡久,那么推薦使用Stream接口忆首,因?yàn)閷?shí)現(xiàn)簡(jiǎn)單被环,同時(shí)效率也可能是最高的。
- 如果并行的工作單元還涉及等待I/O的操作(包括網(wǎng)絡(luò)連接等待)浸锨,那么使用CompletableFuture靈活性更好,可以依據(jù)等待/計(jì)算柱搜,或者W/C的比率設(shè)定需要使用的線(xiàn)程數(shù)剥险。這種情況不使用并行流的另一個(gè)原因是,處理流的流水線(xiàn)中如果發(fā)生I/O等待表制,流的延遲特性會(huì)讓我們很難判斷到底什么時(shí)候觸發(fā)了等待。
4么介、對(duì)多個(gè)異步任務(wù)進(jìn)行流水線(xiàn)操作
示例:所有商店都使用一個(gè)集中式的折扣服務(wù),該折扣服務(wù)提供了五個(gè)不同的折扣代碼设拟,每個(gè)折扣代碼對(duì)應(yīng)不同的折扣率衷咽。
public class Discount {
public enum Code {
NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);
private final int percentage;
Code(int percentage) {
this.percentage = percentage;
}
}
// Discount類(lèi)的具體實(shí)現(xiàn)
}
假設(shè)所有的商店都統(tǒng)一修改getPrice方法的返回格式镶骗。getPrice現(xiàn)在以ShopName:price:DiscountCode的格式返回一個(gè)String類(lèi)型的值躲雅。
public String getPrice(String product) {
double price = calculatePrice(product);
Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];
return String.format("%s:%.2f:%s", name, price, code);
}
private double calculatePrice(String product) {
delay();
return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
1)實(shí)現(xiàn)折扣服務(wù)
對(duì)商店返 回字符串的解析操作封裝到了下面的Quote類(lèi)之中:
public class Quote {
private final String shopName;
private final double price;
private final Discount.Code discountCode;
public Quote(String shopName, double price, Discount.Code code) {
this.shopName = shopName;
this.price = price;
this.discountCode = code;
}
public static Quote parse(String s) {
String[] split = s.split(":");
String shopName = split[0];
double price = Double.parseDouble(split[1]);
Discount.Code discountCode = Discount.Code.valueOf(split[2]);
return new Quote(shopName, price, discountCode);
}
public String getShopName() { return shopName; }
public double getPrice() { return price;
public Discount.Code getDiscountCode() { return discountCode;}
}
通過(guò)傳遞shop對(duì)象返回的字符串給靜態(tài)工廠方法parse,可以得到Quote類(lèi)的一個(gè)實(shí)例相赁, 它包含了shop的名稱(chēng)相寇、折扣之前的價(jià)格,以及折扣代碼钮科。
Discount服務(wù)還提供了一個(gè)applyDiscount方法唤衫,它接收一個(gè)Quote對(duì)象,返回一個(gè)字符串绵脯,表示生成該Quote的shop中的折扣價(jià)格:
public class Discount {
public enum Code {
// 源碼省略……
}
public static String applyDiscount(Quote quote) {
return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(),quote.getDiscountCode());
}
private static double apply(double price, Code code) {
delay();
return format(price * (100 - code.percentage) / 100);
}
}
2)使用Discount服務(wù)
以簡(jiǎn)單的方式實(shí)現(xiàn)使用Discount服務(wù)的findPrices方法:
public List<String> findPrices(String product) {
return shops.stream()
.map(shop -> shop.getPrice(product)) //取得每個(gè)shop對(duì)象中商品的原始價(jià)格
.map(Quote::parse)//在Quote對(duì)象中對(duì)shop返回的字符串進(jìn)行轉(zhuǎn)換
.map(Discount::applyDiscount)//為每個(gè)Quote申請(qǐng)折扣
.collect(toList());
}
通過(guò)在shop構(gòu)成的流上采用流水線(xiàn)方式執(zhí)行三次map操作佳励,得到了期望的結(jié)果。
3)構(gòu)造同步和異步操作
使用CompletableFuture實(shí)現(xiàn)findPrices方法:
public List<String> findPrices(String product) {
List<CompletableFuture<String>> priceFutures = shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> //以異步方式取得每個(gè)shop中指定產(chǎn)品的原始價(jià)格
shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))//Quote對(duì)象存在時(shí)蛆挫,對(duì)其返回的值進(jìn)行轉(zhuǎn)換
.map(future -> future.thenCompose(quote ->//使用另一個(gè)異步任務(wù)構(gòu)造期望的Future赃承,申請(qǐng)折扣 CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)))
.collect(toList());
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(toList());
}
通常而言,名稱(chēng)中不帶Async的方法和它的前一個(gè)任務(wù)一樣悴侵,在同一個(gè)線(xiàn)程中運(yùn)行瞧剖;而名稱(chēng)以Async結(jié)尾的方法會(huì)將后續(xù)的任務(wù)提交到一個(gè)線(xiàn)程池,所以每個(gè)任務(wù)是由不同的線(xiàn)程處理的可免。
4)將兩個(gè)CompletableFuture對(duì)象整合起來(lái)
對(duì)一個(gè)CompletableFuture對(duì)象調(diào)用了thenCompose方法抓于,并向其傳遞了第二個(gè)CompletableFuture,而第二個(gè)CompletableFuture又需要使用第一個(gè)CompletableFuture的執(zhí)行結(jié)果作為輸入浇借。但是捉撮,另一種比較常見(jiàn)的情況是,你需要將兩個(gè)完 全不相干的CompletableFuture對(duì)象的結(jié)果整合起來(lái)逮刨,而且也不希望等到第一個(gè)任務(wù)完全結(jié) 束才開(kāi)始第二項(xiàng)任務(wù)呕缭。
這種情況,應(yīng)該使用thenCombine方法修己,它接收名為BiFunction的第二參數(shù)恢总,這個(gè)參數(shù)定義了當(dāng)兩個(gè)CompletableFuture對(duì)象完成計(jì)算后,結(jié)果如何合并睬愤。同thenCompose方法一樣片仿,thenCombine方法也提供有一個(gè)Async的版本。如果使用thenCombineAsync會(huì)導(dǎo)致BiFunction中定義的合并操作被提交到線(xiàn)程池中尤辱,由另一個(gè)任務(wù)以異步的方式執(zhí)行厢岂。
合并兩個(gè)獨(dú)立的CompletableFuture對(duì)象:
Future<Double> futurePriceInUSD = CompletableFuture
.supplyAsync(() -> shop.getPrice(product))//創(chuàng)建第一個(gè)任務(wù)查詢(xún) 商店取得商品的價(jià)格
.thenCombine(
CompletableFuture.supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD)),//創(chuàng)建第二個(gè)獨(dú)立任務(wù)塔粒,查詢(xún)美元和歐元之間的轉(zhuǎn)換匯率
(price, rate) -> price * rate //通過(guò)乘法整合得到的商品價(jià)格和匯率
);
5)對(duì)Future和CompletableFuture的回顧
CompletableFuture利用Lambda表達(dá)式以聲明式的API提供了一種機(jī)制,能夠用有效的方式圃酵,非常容易地將多個(gè)以同步或異步方式執(zhí)行復(fù)雜操作的任務(wù)結(jié)合到一起郭赐。
利用Java 7的方法合并兩個(gè)Future對(duì)象:
ExecutorService executor = Executors.newCachedThreadPool();
final Future<Double> futureRate = executor.submit(new Callable<Double>() {
public Double call() {
return exchangeService.getRate(Money.EUR, Money.USD);
}
});
Future<Double> futurePriceInUSD = executor.submit(new Callable<Double>() {
public Double call() {
double priceInEUR = shop.getPrice(product);
return priceInEUR * futureRate.get();
}
});
通過(guò)向執(zhí)行器提交一個(gè)Callable對(duì)象的方式創(chuàng)建了第一個(gè)Future對(duì)象捌锭,向外部服務(wù)查詢(xún)歐元和美元之間的轉(zhuǎn)換匯率舀锨。接著宛逗,創(chuàng)建了第二個(gè)Future對(duì)象雷激,查詢(xún)指定商店中特定商品的歐元價(jià)格屎暇。最終根悼,用與上述合并兩個(gè)獨(dú)立的CompletableFuture對(duì)象一樣的方式挤巡,在同一個(gè)Future中通過(guò)查詢(xún)商店得到的歐元商品價(jià)格乘以匯率得到了終的價(jià)格矿卑。
5母廷、響應(yīng)CompletableFuture的completion事件
使用randomDelay方法取代原來(lái)的固定延遲琴昆,模擬生成0.5秒至2.5秒隨機(jī)延遲的方法:
private static final Random random = new Random();
public static void randomDelay() {
int delay = 500 + random.nextInt(2000);
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
對(duì)最佳價(jià)格查詢(xún)器應(yīng)用的優(yōu)化
要避免的首要問(wèn)題是业舍,等待創(chuàng)建一個(gè)包含了所有價(jià)格的List創(chuàng)建完成勤讽。應(yīng)該做的是直接處理CompletableFuture流脚牍,這樣每個(gè)CompletableFuture都在為某個(gè)商店執(zhí)行必要的操作诸狭。
重構(gòu)findPrices方法返回一個(gè)由Future構(gòu)成的流:
public Stream<CompletableFuture<String>> findPricesStream(String product) {
return shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));
}
現(xiàn)在驯遇,為findPricesStream方法返回的Stream添加了第四個(gè)map操作叉庐,在此之前陡叠,已經(jīng)在該方法內(nèi)部調(diào)用了三次map枉阵。這個(gè)新添加的操作只是在每個(gè)CompletableFuture上注冊(cè)一個(gè)操作兴溜,該操作會(huì)在CompletableFuture完成執(zhí)行后使用它的返回值拙徽。Java 8的CompletableFuture通過(guò)thenAccept方法提供了這一功能,它接收 CompletableFuture執(zhí)行完畢后的返回值做參數(shù)已卷。
將結(jié)果打印輸出:
findPricesStream("myPhone").map(f -> f.thenAccept(System.out::println));
和thenCompose和thenCombine方法一樣侧蘸,thenAccept方法也提供 了一個(gè)異步版本讳癌,名為thenAcceptAsync。異步版本的方法會(huì)對(duì)處理結(jié)果的消費(fèi)者進(jìn)行調(diào)度逢艘,從線(xiàn)程池中選擇一個(gè)新的線(xiàn)程繼續(xù)執(zhí)行它改,不再由同一個(gè)線(xiàn)程完成CompletableFuture的所有任務(wù)央拖。
由于thenAccept方法已經(jīng)定義了如何處理CompletableFuture返回的結(jié)果鲜戒,一旦CompletableFuture計(jì)算得到結(jié)果遏餐,它就返回一個(gè)CompletableFuture<Void>赢底。所以颖系,map操作返回的是一個(gè)Stream<CompletableFuture<Void>>嘁扼。
把構(gòu)成Stream的所有CompletableFuture<Void>對(duì)象放到一個(gè)數(shù)組中趁啸,等待所有的任務(wù)執(zhí)行完成:
CompletableFuture[] futures = findPricesStream("myPhone")
.map(f -> f.thenAccept(System.out::println))
.toArray(size -> new CompletableFuture[size]); CompletableFuture.allOf(futures).join();
allOf工廠方法接收一個(gè)由CompletableFuture構(gòu)成的數(shù)組不傅,數(shù)組中的所有CompletableFuture對(duì)象執(zhí)行完成之后访娶,它返回一個(gè)CompletableFuture<Void>對(duì)象崖疤。
如果只要CompletableFuture對(duì)象數(shù)組中有任何一個(gè)執(zhí)行完畢就不再等待,可以使用一個(gè)類(lèi)似的工廠方法anyOf叮趴。該方法接收一個(gè)CompletableFuture對(duì)象構(gòu)成的數(shù)組眯亦,返回由第一個(gè)執(zhí)行完畢的CompletableFuture對(duì)象的返回值構(gòu)成的CompletableFuture<Object>妻率。
--參考文獻(xiàn)《Java8實(shí)戰(zhàn)》