1 Future介紹
1.1 Future的主要功能
JDK5新增了Future接口换况,用于描述一個異步計算的結(jié)果。
Future就是對于具體的Runnable或者Callable任務的執(zhí)行結(jié)果進行取消厅各、查詢是否完成镜撩、獲取結(jié)果等操作。必要時可以通過get方法獲取執(zhí)行結(jié)果讯检,該方法會阻塞直到任務返回結(jié)果琐鲁。
Future類位于java.util.concurrent包下,它是一個接口:
public interface Future<V> {
/**
* 方法用來取消任務人灼,如果取消任務成功則返回true围段,如果取消任務失敗則返回false。 *
* @param mayInterruptIfRunning 表示是否允許取消正在執(zhí)行卻沒有執(zhí)行完畢的任務投放,如果設置true奈泪,則表示可以取消正在執(zhí)行過程中的任務。
* @return 如果任務已經(jīng)完成灸芳,則無論mayInterruptIfRunning為true還是false涝桅,此方法肯定返回false,即如果取消已經(jīng)完成的任務會返回false烙样;
* 如果任務正在執(zhí)行冯遂,若mayInterruptIfRunning設置為true,則返回true谒获,若mayInterruptIfRunning設置為false蛤肌,則返回false壁却;
* 如果任務還沒有執(zhí)行,則無論mayInterruptIfRunning為true還是false裸准,肯定返回true展东。
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 方法表示任務是否被取消成功
* @return 如果在任務正常完成前被取消成功,則返回 true
*/
boolean isCancelled();
/**
* 方法表示任務是否已經(jīng)完成
* @return 若任務完成炒俱,則返回true
*/
boolean isDone();
/**
* 方法用來獲取執(zhí)行結(jié)果盐肃,這個方法會產(chǎn)生阻塞,會一直等到任務執(zhí)行完畢才返回
* @return 任務執(zhí)行的結(jié)果值
* @throws InterruptedException 線程被中斷異常
* @throws ExecutionException 任務執(zhí)行異常权悟,如果任務被取消砸王,還會拋出CancellationException
*/
V get() throws InterruptedException, ExecutionException;
/**
* 用來獲取執(zhí)行結(jié)果,如果在指定時間內(nèi)僵芹,還沒獲取到結(jié)果处硬,就直接返回null(并不是拋出異常,需要注意)拇派。
* @param timeout 超時時間
* @param unit 超時單位
* @return
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException 如果計算超時荷辕,將拋出TimeoutException(待確認)
*/
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
從上面方法的注釋可以看出,F(xiàn)utrue提供了三種功能:
1)判斷任務是否完成件豌;
2)能夠中斷任務疮方;
3)能夠獲取任務執(zhí)行結(jié)果。(最為常用的)
1.2 Future的局限性
從本質(zhì)上說茧彤,F(xiàn)uture表示一個異步計算的結(jié)果骡显。它提供了isDone()來檢測計算是否已經(jīng)完成,并且在計算結(jié)束后曾掂,可以通過get()方法來獲取計算結(jié)果惫谤。在異步計算中,F(xiàn)uture確實是個非常優(yōu)秀的接口珠洗。但是溜歪,它的本身也確實存在著許多限制:
- 并發(fā)執(zhí)行多任務:Future只提供了get()方法來獲取結(jié)果,并且是阻塞的许蓖。所以蝴猪,除了等待你別無他法;當 for 循環(huán)批量獲取 Future 的結(jié)果時容易 block膊爪,因此get 方法調(diào)用時應使用 timeout 限制自阱。
- 無法對多個任務進行鏈式調(diào)用:如果你希望在計算任務完成后執(zhí)行特定動作,比如發(fā)郵件米酬,但Future卻沒有提供這樣的能力沛豌;
- 無法組合多個任務:如果你運行了10個任務,并期望在它們?nèi)繄?zhí)行結(jié)束后執(zhí)行特定動作赃额,那么在Future中這是無能為力的加派;
- 沒有異常處理:Future接口中沒有關于異常處理的方法阁簸;
2 CompletableFuture介紹
雖然 Future 以及相關使用方法提供了異步執(zhí)行任務的能力,但是對于結(jié)果的獲取卻是很不方便哼丈,只能通過阻塞或者輪詢的方式得到任務的結(jié)果。如果遇到前面的task執(zhí)行較慢時需要阻塞等待前面的task執(zhí)行完后面task才能取得結(jié)果筛严。
阻塞的方式顯然和我們的異步編程的初衷相違背醉旦,輪詢的方式又會耗費無謂的 CPU 資源,而且也不能及時地得到計算結(jié)果桨啃。而CompletableFuture的主要功能就是一邊生成任務,一邊獲取任務的返回值车胡。讓兩件事分開執(zhí)行,任務之間不會互相阻塞,可以實現(xiàn)先執(zhí)行完的先取結(jié)果照瘾,不再依賴任務順序了匈棘。
2.1 CompletableFuture原理
內(nèi)部通過阻塞隊列+FutureTask,實現(xiàn)了任務先完成可優(yōu)先獲取到析命,即結(jié)果按照完成先后順序排序主卫,內(nèi)部有一個先進先出的阻塞隊列,用于保存已經(jīng)執(zhí)行完成的Future鹃愤,通過調(diào)用它的take方法或poll方法可以獲取到一個已經(jīng)執(zhí)行完成的Future簇搅,進而通過調(diào)用Future接口實現(xiàn)類的get方法獲取最終的結(jié)果。
2.2 應用場景
當需要批量提交異步任務的時候建議使用CompletableFuture软吐。CompletableFuture將線程池Executor和阻塞隊列BlockingQueue的功能融合在了一起瘩将,能夠讓批量異步任務的管理更簡單。
CompletableFuture能夠讓異步任務的執(zhí)行結(jié)果有序化凹耙。先執(zhí)行完的先進入阻塞隊列姿现,利用這個特性,你可以輕松實現(xiàn)后續(xù)處理的有序性肖抱,避免無謂的等待备典。
線程池隔離。CompletionService支持自己創(chuàng)建線程池虐沥,這種隔離性能避免幾個特別耗時的任務拖垮整個應用的風險熊经。
2.3 CompletableFuture使用詳解
簡單的任務,用Future獲取結(jié)果還好欲险,但我們并行提交的多個異步任務镐依,往往并不是獨立的,很多時候業(yè)務邏輯處理存在串行[依賴]天试、并行槐壳、聚合的關系。如果要我們手動用 Fueture 實現(xiàn)喜每,是非常麻煩的务唐。
CompletableFuture是Future接口的擴展和增強雳攘。CompletableFuture實現(xiàn)了Future接口,并在此基礎上進行了豐富地擴展枫笛,完美地彌補了Future上述的種種問題吨灭。
更為重要的是,CompletableFuture實現(xiàn)了對任務的編排能力刑巧。借助這項能力喧兄,我們可以輕松地組織不同任務的運行順序、規(guī)則以及方式啊楚。從某種程度上說,這項能力是它的核心能力恭理。而在以往拯辙,雖然通過CountDownLatch等工具類也可以實現(xiàn)任務的編排,但需要復雜的邏輯處理颜价,不僅耗費精力且難以維護涯保。
3 CompletableFuture應用梳理
很多方法上,可以指定線程池拍嵌,而沒有指定Executor的方法會使用ForkJoinPool.commonPool() 作為它的線程池執(zhí)行異步代碼遭赂。如果指定線程池,則使用指定的線程池運行横辆。
默認情況下 CompletableFuture 會使用公共的 ForkJoinPool 線程池撇他,這個線程池默認創(chuàng)建的線程數(shù)是 CPU 的核數(shù)(也可以通過 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 來設置 ForkJoinPool 線程池的線程數(shù))。
如果所有 CompletableFuture 共享一個線程池狈蚤,那么一旦有任務執(zhí)行一些很慢的 I/O 操作困肩,就會導致線程池中所有線程都阻塞在 I/O 操作上,從而造成線程饑餓脆侮,進而影響整個系統(tǒng)的性能锌畸。所以,強烈建議要根據(jù)不同的業(yè)務類型創(chuàng)建不同的線程池靖避,以避免互相干擾潭枣。
-
等我們使用的時候,會注意到
CompletableFuture
的方法命名規(guī)則:-
xxx()
:表示該方法將繼續(xù)在已有的線程中執(zhí)行幻捏;
-
xxxAsync()
:表示可能會使用其它的線程去執(zhí)行(如果使用相同的線程池盆犁,也可能會被同一個線程選中執(zhí)行)。
4 使用案例
4.1 基礎使用案例
串行執(zhí)行:
定義兩個CompletableFuture
篡九,第一個CompletableFuture
根據(jù)證券名稱查詢證券代碼谐岁,第二個CompletableFuture
根據(jù)證券代碼查詢證券價格,這兩個CompletableFuture
實現(xiàn)串行操作如下:
CompletableFuture.supplyAsync():創(chuàng)建一個包含返回值的異步任務;
thenApplyAsync():獲取前一個線程的結(jié)果進行轉(zhuǎn)換伊佃,有返回值窜司;
thenAccept():獲取前一個線程的結(jié)果進行消費蹋凝,無返回值赊颠。
public class Demo {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
// 第一個任務:創(chuàng)建一個包含返回值的CompletableFuture
CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {
return queryCode("中國石油");
});
// cfQuery成功后繼續(xù)執(zhí)行下一個任務:
CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {
return fetchPrice(code);
});
// cfFetch成功后打印結(jié)果:
cfFetch.thenAccept((result) -> {
System.out.println("price: " + result);
});
// 主線程不要結(jié)束棒仍,否則CompletableFuture默認使用的線程池會立刻關閉:
countDownLatch.await();
}
public static void main2(String[] args) throws Exception {
CompletableFuture.supplyAsync(() -> queryCode("中國石油"))
.thenApplyAsync((code) -> fetchPrice(code))
.thenAccept((result) -> System.out.println("price: " + result));
countDownLatch.await();
}
static String queryCode(String name) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
String code = "601857";
System.out.println("查詢證券編碼,name:" + name + ",code:" + code);
return code;
}
static Double fetchPrice(String code) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
Double price = 5 + Math.random() * 20;
System.out.println("根據(jù)證券編碼查詢價格,code:" + code + ";price:" + price);
return price;
}
}
并行執(zhí)行:
除了串行執(zhí)行外尉姨,多個CompletableFuture
還可以并行執(zhí)行。例如词顾,我們考慮這樣的場景:
同時從新浪和網(wǎng)易查詢證券代碼剃执,只要任意一個返回結(jié)果懦底,就進行下一步查詢價格漠秋,查詢價格也同時從新浪和網(wǎng)易查詢,只要任意一個返回結(jié)果抵屿,就完成操作:
CompletableFuture.supplyAsync():創(chuàng)建一個包含返回值的異步任務庆锦;
CompletableFuture.anyOf(cf1,cf2,cf3).join():多個異步線程任一執(zhí)行完即返回,有返回值Object轧葛;
thenAccept():獲取前一個線程的結(jié)果進行消費搂抒,無返回值。
public class Demo2 {
private static CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
// 兩個CompletableFuture執(zhí)行異步查詢:
CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> {
return queryCode("中國石油", "https://finance.sina.com.cn/code/");
});
CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> {
return queryCode("中國石油", "https://money.163.com/code/");
});
// 用anyOf合并為一個新的CompletableFuture:
CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);
// 兩個CompletableFuture執(zhí)行異步查詢:
CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> {
return fetchPrice((String) code, "https://finance.sina.com.cn/price/");
});
CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> {
return fetchPrice((String) code, "https://money.163.com/price/");
});
// 用anyOf合并為一個新的CompletableFuture:
CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);
// 最終結(jié)果:
cfFetch.thenAccept((result) -> {
System.out.println("price: " + result);
});
// 主線程不要立刻結(jié)束尿扯,否則CompletableFuture默認使用的線程池會立刻關閉:
COUNT_DOWN_LATCH.await();
}
public static void main2(String[] args) throws Exception {
// 兩個CompletableFuture執(zhí)行異步查詢:
CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> queryCode("中國石油", "https://finance.sina.com.cn/code/"));
CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> queryCode("中國石油", "https://money.163.com/code/"));
// 用anyOf合并為一個新的CompletableFuture:
CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);
// 兩個CompletableFuture執(zhí)行異步查詢:
CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> fetchPrice((String) code, "https://finance.sina.com.cn/price/"));
CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> fetchPrice((String) code, "https://money.163.com/price/"));
// 用anyOf合并為一個新的CompletableFuture:
CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);
// 最終結(jié)果:
cfFetch.thenAccept((result) -> System.out.println("price: " + result));
// 主線程不要立刻結(jié)束求晶,否則CompletableFuture默認使用的線程池會立刻關閉:
COUNT_DOWN_LATCH.await();
}
static String queryCode(String name, String url) {
System.out.println(Thread.currentThread().getName() + " query code from " + url + "...");
try {
Thread.sleep((long) (Math.random() * 100));
} catch (InterruptedException e) {
e.printStackTrace();
}
String code = "601857";
System.out.println(Thread.currentThread().getName() + " 查詢證券編碼,name:" + name + ",code:" + code);
return code;
}
static Double fetchPrice(String code, String url) {
System.out.println(Thread.currentThread().getName() + " query price from " + url + "...");
try {
Thread.sleep((long) (Math.random() * 100));
} catch (InterruptedException e) {
e.printStackTrace();
}
Double price = 5 + Math.random() * 20;
System.out.println(Thread.currentThread().getName() + " 根據(jù)證券編碼查詢價格,code:" + code + ";price:" + price);
return price;
}
}
上述邏輯實現(xiàn)的異步查詢規(guī)則實際上是:
4.2 實現(xiàn)最優(yōu)的“燒水泡茶”程序
public class Demo3 {
public static void main(String[] args) {
//任務1:洗水壺 -> 燒開水
CompletableFuture<String> f11 = CompletableFuture.supplyAsync(() -> {
System.out.println("T1:洗水壺...開始");
sleep(1000);
return "T1:洗水壺...完成";
});
CompletableFuture<String> f12 = f11.thenApply((f11Result) -> {
System.out.println(f11Result);
System.out.println("T1:燒開水...開始");
sleep(3000);
return "T1:燒開水...完成";
});
//任務2:洗茶壺->洗茶杯->拿茶葉
CompletableFuture<Void> f21 = CompletableFuture.runAsync(() -> {
System.out.println("==============T2:洗茶壺...開始");
sleep(1000);
System.out.println("==============T2:洗茶壺...完成");
});
CompletableFuture<Void> f22 = f21.thenRun(() -> {
System.out.println("==============T2:洗茶杯...開始");
sleep(2000);
System.out.println("==============T2:洗茶杯...完成");
});
CompletableFuture<String> f23 = f22.thenApply(result -> {
System.out.println("==============T2:拿茶葉...開始");
sleep(1000);
System.out.println("==============T2:拿茶葉...完成");
return "龍井";
});
//任務3:任務1和任務2完成后執(zhí)行:泡茶
CompletableFuture<String> f3 = f12.thenCombine(f23, (f1Result, f2Result) -> {
System.out.println(f1Result);
System.out.println("************T2:拿到茶葉:result" + f2Result);
System.out.println("************T3:泡茶...,什么茶:" + f2Result);
return "上茶:" + f2Result;
});
//等待任務3執(zhí)行結(jié)果
System.out.println(f3.join());
}
static void sleep(int t) {
try {
Thread.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}