背景
如果你想要在同一個CPU上執(zhí)行幾個松耦合的任務曹宴,同時防止因某個任務等待過長而阻塞線程的執(zhí)行驴剔,那么你需要做的是充分利用CPU的核届巩,讓其足夠忙碌旱易,最大化程序的吞吐量從而實現(xiàn)并發(fā)。
并行與并發(fā)的區(qū)別:
并行:在同一個核上同時執(zhí)行多個任務建丧,任務不互相阻塞
并發(fā):多個任務分發(fā)給多個核去執(zhí)行
在java5中排龄,已經(jīng)引入了Future接口方便開發(fā)人員進行異步編程。由于其使用繁瑣翎朱,代碼復雜橄维,不足以讓我們編寫簡介并發(fā)代碼,因此java8引入了CompletableFuture接口闭翩。
使用CompletableFuture構(gòu)建異步應用
查詢商品價格的例子挣郭,假設(shè)獲取價格是一個遠程服務迄埃,我們使用sleep 1秒來模擬此行為疗韵。
public class Shop {
private String name;
public Double getPrice(String product){
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new Random().nextDouble() * product.charAt(0) + product.charAt(1);
}
}
List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
new Shop("LetsSaveBig"),
new Shop("MyFavoriteShop"),
new Shop("MyFavoriteShop2"),
new Shop("MyFavoriteShop3"),
new Shop("MyFavoriteShop4"),
new Shop("MyFavoriteShop5"),
new Shop("MyFavoriteShop6"),
new Shop("MyFavoriteShop7"),
new Shop("BuyItAll"));
-
情景一
試想此場景,我們需要根據(jù)某個商品名稱去查詢商品的價格侄非,發(fā)貨地等等一系列操作蕉汪,我們可能會寫出如下偽代碼:
操作A
shop.getPrice(product)
操作B
操作C
...
這些操作直接沒有什么關(guān)聯(lián)性,上述代碼中靠后的操作需要等待前面的操作執(zhí)行完之后才能執(zhí)行逞怨,造成了阻塞者疤。
那么我們其實可以使用CompletableFuture來實現(xiàn)異步執(zhí)行,下面的代碼中每個操作都不需要等待前面的操作便能執(zhí)行叠赦。
操作A
CompletableFuture<Double> completableFuture
= CompletableFuture.supplyAsync(() -> shop.getPrice(product));
操作B
操作C
...
Double d = completableFuture.get();
-
情景二
如果給定一個product和一個List<Shop>,想要獲取所有shop中對此product的定價驹马,該如何實現(xiàn)革砸?
我們已經(jīng)知道流的使用,按照常規(guī)思路糯累,寫出下列代碼應該不難
List<Double> list = shops.stream() .map( (Shop s) -> s.getPrice(product) ) .collect(Collectors.toList());
但是我們可不可以把map中獲取價格的代碼實現(xiàn)異步執(zhí)行呢算利?答案當然是可以的。
其中一種操作是將流轉(zhuǎn)為并行流泳姐,這里我們使用另一種方式:
List<CompletableFuture<Double>> list = shops.stream() .map( (Shop s) -> CompletableFuture.supplyAsync( () -> s.getPrice(product) ) ) .collect(Collectors.toList()); List<Double> list2 =list.stream() .map(CompletableFuture::join).collect(Collectors.toList());
-
情景三
如果你試過情景三種的實現(xiàn)方式后效拭,你會發(fā)現(xiàn)其執(zhí)行速度并沒有多少提升。那么有沒有方法能夠讓他更快點呢胖秒?我們可以通過調(diào)整線程池的大小缎患,確保整體的計算不會因為線程都在等待I/O而發(fā)生阻塞。
List中有10個shop阎肝,我們可以調(diào)整線程池大小為10個挤渔。
final Executor executor = Executors.newFixedThreadPool(11, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); return t; } }); // CompletableFuture.supplyAsync()方法可以設(shè)置第二個參數(shù) CompletableFuture.supplyAsync( () -> s.getPrice(product),executor )
-
情景四
對兩個異步操作進行流水線,第一個操作完成時风题,將其 結(jié)果作為參數(shù)傳遞給第二個操作蚂蕴。使用thenCompose連接。
List<CompletableFuture<String>> list = shops.stream() .map( (Shop s) -> CompletableFuture.supplyAsync( () -> s.getPrice(product), executor ) ) .map(c -> c.thenCompose( (Double d) -> CompletableFuture.supplyAsync( () ->d.doubleValue() + "---" + (d.doubleValue() - new Random().nextInt(10))) )) .collect(Collectors.toList());
-
情景五
將兩個完全不相干的CompletableFuture對象的結(jié)果整合起來俯邓,而且你也不希望等到第一個任務完全結(jié)束才開始第二項任務 骡楼。使用thenCombine連接
List<CompletableFuture<String>> list = shops.stream() .map(s -> CompletableFuture.supplyAsync(() -> s.getPrice(product), executor) .thenCombine( CompletableFuture.supplyAsync( () -> new Random().nextInt(10) ), (d, c) -> d + "-----" + c )) .collect(Collectors.toList());
-
-
情景六
響應CompletableFuture的completion事件
一旦CompletableFuture計算得到結(jié)果,就得到一個相應稽鞭。那么可以使用thenAccept
Stream<CompletableFuture<String>> list = shops.stream() .map((Shop s) -> CompletableFuture.supplyAsync(() -> s.getPrice(product), executor)) .map(c -> c.thenCompose( (Double d) -> CompletableFuture.supplyAsync(() -> d.doubleValue() + "---" + (d.doubleValue() - new Random().nextInt(10)),executor) )); list.map(c->c.thenAccept(System.out::println));