前言
阻塞”與"非阻塞"與"同步"與“異步"不能簡(jiǎn)單的從字面理解愿吹,提供一個(gè)從分布式系統(tǒng)角度的回答。
同步與異步
所謂同步,就是在發(fā)出一個(gè)調(diào)用時(shí)滩褥,在沒有得到結(jié)果之前,該調(diào)用就不返回炫加。換句話說(shuō)瑰煎,就是由調(diào)用者主動(dòng)等待這個(gè)調(diào)用的結(jié)果铺然,也就是必須一件一件事做,等前一件做完了才能做下一件事。
而異步則是相反酒甸,調(diào)用在發(fā)出之后探熔,這個(gè)調(diào)用就直接返回了。換句話說(shuō)烘挫,當(dāng)一個(gè)異步過程調(diào)用發(fā)出后诀艰,調(diào)用者不會(huì)立刻得到結(jié)果。而是在調(diào)用發(fā)出后饮六,被調(diào)用者通過狀態(tài)其垄、通知來(lái)通知調(diào)用者,或通過回調(diào)函數(shù)處理這個(gè)調(diào)用卤橄。典型的異步編程模型比如Node.js(感覺自從Node火起來(lái)之后绿满,java中的異步也開始火起來(lái),比如Vertx這個(gè)被稱為java版的Node庫(kù))窟扑。
看過知乎上嚴(yán)肅關(guān)于這個(gè)問題回答的通俗的例子:你打電話問書店老板有沒有《分布式系統(tǒng)》這本書喇颁,如果是同步通信機(jī)制,書店老板會(huì)說(shuō)嚎货,你稍等橘霎,”我查一下",然后開始查啊查殖属,等查好了(可能是5秒姐叁,也可能是一天)告訴你結(jié)果(返回結(jié)果)。
而異步通信機(jī)制洗显,書店老板直接告訴你我查一下啊外潜,查好了打電話給你,然后直接掛電話了(不返回結(jié)果)挠唆。然后查好了处窥,他會(huì)主動(dòng)打電話給你。在這里老板通過“回電”這種方式來(lái)回調(diào)玄组。
阻塞與非阻塞
阻塞和非阻塞關(guān)注的是程序在等待調(diào)用結(jié)果(消息滔驾,返回值)時(shí)的狀態(tài).
阻塞調(diào)用是指調(diào)用結(jié)果返回之前,當(dāng)前線程會(huì)被掛起(線程進(jìn)入非可執(zhí)行狀態(tài)巧勤,在這個(gè)狀態(tài)下嵌灰,cpu不會(huì)給線程分配時(shí)間片,即線程暫停運(yùn)行)颅悉。調(diào)用線程只有在得到結(jié)果之后才會(huì)返回沽瞭。非阻塞調(diào)用指在不能立刻得到結(jié)果之前,該調(diào)用不會(huì)阻塞當(dāng)前線程剩瓶,可以繼續(xù)執(zhí)行其他任務(wù)驹溃。
還是上面的例子城丧,你打電話問書店老板有沒有《分布式系統(tǒng)》這本書,你如果是阻塞式調(diào)用豌鹤,你會(huì)一直把自己“掛起”亡哄,直到得到這本書有沒有的結(jié)果,如果是非阻塞式調(diào)用布疙,你不管老板有沒有告訴你蚊惯,你自己先一邊去玩了, 當(dāng)然你也要偶爾過幾分鐘check一下老板有沒有返回結(jié)果灵临。在這里阻塞與非阻塞與是否同步異步無(wú)關(guān)截型,跟老板通過什么方式給你結(jié)果也無(wú)關(guān)。
先簡(jiǎn)單看下使用jdk實(shí)現(xiàn)異步的方式:
java future
要實(shí)現(xiàn)異步不然要開啟新的線程儒溉,如果是靠自己去開啟線程宦焦、執(zhí)行、線程切換等顿涣,則效率太低波闹,所以我們也需要使用線程池來(lái)管理線程。
- Executors創(chuàng)建線程池的幾種常見方式
Executors是一個(gè)工廠類涛碑,提供很多靜態(tài)方法精堕,注意區(qū)分Executor接口。Executor通過Executors可以創(chuàng)建不同類似的線程池锌唾,常見的大概有下表幾種類型锄码,在實(shí)際應(yīng)用中,主要使用newCachedThreadPool和newFixedThreadPool來(lái)創(chuàng)建線程池晌涕。
類名 | 說(shuō)明 |
---|---|
newCachedThreadPool | 緩存型線程池,先查看池中是否有以前建立的可用的線程痛悯,如果有余黎,就重用改線程;如果沒有载萌,就建一個(gè)新的線程加入池中使用惧财。緩存型池子通常用于執(zhí)行一些生存期很短的異步型任務(wù)。因此在一些面向連接的daemon型SERVER中用得不多扭仁。能reuse的線程垮衷,必須是timeout IDLE內(nèi)的池中線程,缺省timeout為60s乖坠,超過這個(gè)IDLE時(shí)長(zhǎng)搀突,線程實(shí)例將被終止并移出池子。注意:放入CachedThreadPool的線程超過TIMEOUT不活動(dòng)熊泵,其會(huì)自動(dòng)被終止仰迁。 |
newFixedThreadPool | 和cacheThreadPool類似甸昏,有可用的線程就使用,但不能隨時(shí)建新的線程徐许。其獨(dú)特之處:任意時(shí)間點(diǎn)施蜜,最多只能有固定數(shù)目的活動(dòng)線程存在,此時(shí)如果有新的線程要建立雌隅,只能放在另外的隊(duì)列中等待翻默,直到當(dāng)前的線程中某個(gè)線程終止直接被移出池子。cache池和fixed池調(diào)用的是同一個(gè)底層池恰起,只不過參數(shù)不同:fixed池線程數(shù)固定冰蘑,并且是0秒IDLE(無(wú)IDLE)。所以FixedThreadPool多數(shù)針對(duì)一些很穩(wěn)定很固定的正規(guī)并發(fā)線程村缸,多用于服務(wù)器祠肥。cache池線程數(shù)支持0-Integer.MAX_VALUE(顯然完全沒考慮主機(jī)的資源承受能力),60秒IDLE梯皿。 |
ScheduledThreadPool | 調(diào)度型線程池仇箱。這個(gè)池子里的線程可以按schedule依次delay執(zhí)行,或周期執(zhí)行东羹。 |
SingleThreadExecutor | 單例線程剂桥,任意時(shí)間池中只能有一個(gè)線程。用的是和cache池和fixed池相同的底層池属提,但線程數(shù)目是1-1,0秒IDLE(無(wú)IDLE)权逗。 |
- Executors創(chuàng)建線程池
// 創(chuàng)建一個(gè)cache線程池
ExecutorService service = Executors.newCachedThreadPool();
// 實(shí)際返回的是ThreadPoolExecutor對(duì)象
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
從下面的繼承關(guān)系可以看出,AbstractExecutorService實(shí)現(xiàn)了ExecutorService 冤议,ThreadPoolExecutor繼承自AbstractExecutorService
- ExecutorService(線程池)
ExecutorService繼承了Executor接口斟薇,在java.util.concurrent包中,在原有execute方法的基礎(chǔ)上新增了submit方法恕酸,可以傳入Callable和Runnable類型的task堪滨,返回一個(gè)Future類型的對(duì)象,獲取異步執(zhí)行的結(jié)果蕊温。
submit(Callable)和submit(Runnable)類似袱箱,都會(huì)返回一個(gè)Future對(duì)象,但是除此之外义矛,submit(Callable)接收的是一個(gè)Callable的實(shí)現(xiàn)发笔,Callable接口中的call()方法有一個(gè)返回值,可以返回任務(wù)的執(zhí)行結(jié)果凉翻,而Runnable接口中的run()方法是void的了讨,沒有返回值。submit(Runnable)如果任務(wù)執(zhí)行完成,future.get()方法會(huì)返回一個(gè)null量蕊;submit(Callable)任務(wù)執(zhí)行完成铺罢,future.get()方法會(huì)返回Callable任務(wù)的執(zhí)行結(jié)果。注意残炮,future.get()方法會(huì)產(chǎn)生阻塞韭赘。
- ExecutorService的關(guān)閉
當(dāng)我們使用完成ExecutorService之后應(yīng)該關(guān)閉它,否則它里面的線程會(huì)一直處于運(yùn)行狀態(tài)势就。
舉個(gè)例子泉瞻,如果的應(yīng)用程序是通過main()方法啟動(dòng)的,在這個(gè)main()退出之后苞冯,如果應(yīng)用程序中的ExecutorService沒有關(guān)閉袖牙,這個(gè)應(yīng)用將一直運(yùn)行。之所以會(huì)出現(xiàn)這種情況舅锄,是因?yàn)镋xecutorService中運(yùn)行的線程會(huì)阻止JVM關(guān)閉鞭达。
如果要關(guān)閉ExecutorService中執(zhí)行的線程,我們可以調(diào)用ExecutorService.shutdown()方法皇忿。在調(diào)用shutdown()方法之后畴蹭,ExecutorService不會(huì)立即關(guān)閉,但是它不再接收新的任務(wù)鳍烁,直到當(dāng)前所有線程執(zhí)行完成才會(huì)關(guān)閉叨襟,所有在shutdown()執(zhí)行之前提交的任務(wù)都會(huì)被執(zhí)行。
如果我們想立即關(guān)閉ExecutorService幔荒,我們可以調(diào)用ExecutorService.shutdownNow()方法糊闽。這個(gè)動(dòng)作將跳過所有正在執(zhí)行的任務(wù)和被提交還沒有執(zhí)行的任務(wù)。但是它并不對(duì)正在執(zhí)行的任務(wù)做任何保證爹梁,有可能它們都會(huì)停止右犹,也有可能執(zhí)行完成。
- Future(獲取異步計(jì)算結(jié)果)
方法 | 說(shuō)明 |
---|---|
boolean cancel(boolean mayInterruptIfRunning) | 取消任務(wù)的執(zhí)行卫键,如果任務(wù)已經(jīng)完成傀履,則會(huì)取消失敗莉炉;如果任務(wù)取消成功了,isDone總會(huì)返回true |
boolean isCancelled() | 任務(wù)是否已取消碴犬,任務(wù)正常完成前將其取消絮宁,返回 true |
boolean isDone() | 任務(wù)是否已完成,任務(wù)正常終止服协、異成馨海或取消,返回true |
V get() | 等待任務(wù)結(jié)束,然后獲取V類型的結(jié)果窘游,這個(gè)方法會(huì)產(chǎn)生阻塞唠椭,會(huì)一直等到任務(wù)執(zhí)行完畢才返回;任務(wù)已取消忍饰,拋出CancellationException贪嫂; |
V get(long timeout, TimeUnit unit) | 獲取結(jié)果,設(shè)置超時(shí)時(shí)間 |
也就是說(shuō)Future提供了三種功能:
- 判斷任務(wù)是否完成艾蓝;
- 能夠中斷任務(wù)力崇;
- 能夠獲取任務(wù)執(zhí)行結(jié)果。
因?yàn)镕uture只是一個(gè)接口赢织,所以是無(wú)法直接用來(lái)創(chuàng)建對(duì)象使用的亮靴,因此就有了下面的FutureTask。
- FutureTask
可以看出RunnableFuture繼承了Runnable接口和Future接口于置,而FutureTask實(shí)現(xiàn)了RunnableFuture接口茧吊。所以它既可以作為Runnable被線程執(zhí)行,又可以作為Future得到Callable的返回值八毯。
當(dāng)FutureTask處于未啟動(dòng)或者是已啟動(dòng)狀態(tài)時(shí)搓侄,此時(shí)還未得到線程執(zhí)行結(jié)果,調(diào)用FutureTask.get方法會(huì)導(dǎo)致線程阻塞宪彩;
當(dāng)FutureTask處于已完成狀態(tài)時(shí)休讳,此時(shí)已經(jīng)得到線程執(zhí)行結(jié)果,調(diào)用FutureTask.get方法會(huì)立即返回線程執(zhí)行結(jié)果尿孔;
當(dāng)FutureTask處于未啟動(dòng)狀態(tài)時(shí)俊柔,調(diào)用FutureTask.cancel方法將會(huì)導(dǎo)致該task永遠(yuǎn)不會(huì)被執(zhí)行;
當(dāng)FutureTask處于啟動(dòng)狀態(tài)時(shí)活合,調(diào)用FutureTask.cancel方法將會(huì)中斷該任務(wù)的執(zhí)行雏婶,至于會(huì)不會(huì)對(duì)任務(wù)產(chǎn)生影響由cancel方法的入?yún)Q定;
當(dāng)FutureTask處于已完成狀態(tài)白指,調(diào)用FutureTask.cancel方法返回false
FutureTask提供了2個(gè)構(gòu)造器:
public FutureTask(Callable<V> callable)
public FutureTask(Runnable runnable, V result)
當(dāng)構(gòu)造方法傳入?yún)?shù)為Runnable留晚,會(huì)通過Executors.callable方法將其轉(zhuǎn)換成Callable。
下面的例子覺得比較經(jīng)典告嘲,引用來(lái)加深理解错维。
- FutureTask高并發(fā)環(huán)境下應(yīng)用:
FutureTask在高并發(fā)環(huán)境下確保任務(wù)只執(zhí)行一次。在很多高并發(fā)的環(huán)境下橄唬,往往我們只需要某些任務(wù)只執(zhí)行一次赋焕。這種使用情景FutureTask的特性恰能勝任。舉一個(gè)例子仰楚,假設(shè)有一個(gè)帶key的連接池隆判,當(dāng)key存在時(shí)犬庇,即直接返回key對(duì)應(yīng)的對(duì)象;當(dāng)key不存在時(shí)侨嘀,則創(chuàng)建連接臭挽。對(duì)于這樣的應(yīng)用場(chǎng)景,通常采用的方法為使用一個(gè)Map對(duì)象來(lái)存儲(chǔ)key和連接池對(duì)應(yīng)的對(duì)應(yīng)關(guān)系咬腕,典型的代碼如下面所示:
private Map<String, Connection> connectionPool = new HashMap<String, Connection>();
private ReentrantLock lock = new ReentrantLock();
public Connection getConnection(String key) {
try {
lock.lock();
if (connectionPool.containsKey(key)) {
return connectionPool.get(key);
} else {
//創(chuàng)建 Connection
Connection conn = createConnection();
connectionPool.put(key, conn);
return conn;
}
} finally {
lock.unlock();
}
}
//創(chuàng)建Connection
private Connection createConnection() {
return null;
}
在上面的例子中欢峰,我們通過加鎖確保高并發(fā)環(huán)境下的線程安全,也確保了connection只創(chuàng)建一次郎汪,然而確犧牲了性能赤赊。改用ConcurrentHash的情況下,幾乎可以避免加鎖的操作煞赢,性能大大提高抛计,但是在高并發(fā)的情況下有可能出現(xiàn)Connection被創(chuàng)建多次的現(xiàn)象照筑。這時(shí)最需要解決的問題就是當(dāng)key不存在時(shí)吹截,創(chuàng)建Connection的動(dòng)作能放在connectionPool之后執(zhí)行,這正是FutureTask發(fā)揮作用的時(shí)機(jī)波俄,基于ConcurrentHashMap和FutureTask的改造代碼如下:
private ConcurrentHashMap<String, FutureTask<Connection>> connectionPool = new ConcurrentHashMap<String, FutureTask<Connection>>();
public Connection getConnection(String key) throws Exception {
FutureTask<Connection> connectionTask = connectionPool.get(key);
if (connectionTask != null) {
return connectionTask.get();
} else {
Callable<Connection> callable = new Callable<Connection>() {
@Override
public Connection call() throws Exception {
// TODO Auto-generated method stub
return createConnection();
}
};
FutureTask<Connection> newTask = new FutureTask<Connection>(callable);
connectionTask = connectionPool.putIfAbsent(key, newTask);
if (connectionTask == null) {
connectionTask = newTask;
// FutureTask 保證回調(diào)任務(wù)只執(zhí)行一次冬念,在這里的應(yīng)用還是比較巧妙的
connectionTask.run();
}
return connectionTask.get();
}
}
//創(chuàng)建Connection
private Connection createConnection() {
return null;
}
經(jīng)過這樣的改造瀑构,可以避免由于并發(fā)帶來(lái)的多次創(chuàng)建連接及鎖的出現(xiàn)呻征。
- 示例代碼
// 測(cè)試代碼 僅供參考
@Test
public void javaFutureTest() {
System.out.println("main start " + Thread.currentThread());
// 創(chuàng)建工作線程池
ExecutorService service = Executors.newCachedThreadPool();
// 子線程任務(wù)1
Future<Integer> integerTask = service.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("integerTask start " + Thread.currentThread());
Thread.sleep(100);
System.out.println("integerTask end " + Thread.currentThread());
return new Random().nextInt(100);
}
});
// 子線程任務(wù)2
Future<String> stringTask = service.submit(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("stringTask start " + Thread.currentThread());
Thread.sleep(50);
System.out.println("stringTask end " + Thread.currentThread());
return " ok";
}
});
// 主線程其他任務(wù)
for (int i = 0 ;i < 4 ;i++){
System.out.println("main other task " + i + " " + Thread.currentThread());
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (integerTask.isDone() && !integerTask.isCancelled()) {
try {
System.out.println("integerTask done " + "result:" + integerTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
if (stringTask.isDone() && !stringTask.isCancelled()) {
try {
System.out.println("stringTask done " + "result:" + stringTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
// 關(guān)閉線程池
service.shutdown();
System.out.println("main end " + Thread.currentThread());
}
輸出結(jié)果
main start Thread[main,5,main]
main other task 0 Thread[main,5,main]
integerTask start Thread[pool-1-thread-1,5,main]
stringTask start Thread[pool-1-thread-2,5,main]
main other task 1 Thread[main,5,main]
stringTask end Thread[pool-1-thread-2,5,main]
integerTask end Thread[pool-1-thread-1,5,main]
stringTask done result: ok
main other task 2 Thread[main,5,main]
integerTask done result:86
stringTask done result: ok
main other task 3 Thread[main,5,main]
integerTask done result:86
stringTask done result: ok
main end Thread[main,5,main]
可以看到stringTask 和integerTask 兩個(gè)子任務(wù)分別開了兩個(gè)子線程執(zhí)行思杯,子線程執(zhí)行過程中,主線程繼續(xù)做其他任務(wù)君旦,因?yàn)閟tringTask 耗時(shí)短先執(zhí)行完琅绅。可以看到主線程中為了得到回調(diào)結(jié)果需要一直去檢查isDone煤痕,這種寫法是很不推薦的,后續(xù)會(huì)有其他方法來(lái)監(jiān)聽futureTask的執(zhí)行結(jié)果扫夜,思想是開啟一個(gè)新的異步線程去監(jiān)聽這個(gè)子線程的執(zhí)行結(jié)果棍厂。
Guava future
ListenableFuture是可以監(jiān)聽的Future张漂,它是對(duì)java原生Future的擴(kuò)展增強(qiáng)垢油。Future表示一個(gè)異步計(jì)算任務(wù),當(dāng)任務(wù)完成時(shí)可以得到計(jì)算結(jié)果正压。如果希望計(jì)算完成時(shí)馬上就拿到結(jié)果展示給用戶或者做另外的計(jì)算嘉裤,就必須使用另一個(gè)線程不斷的查詢計(jì)算狀態(tài)典奉。這樣做會(huì)使得代碼復(fù)雜杆麸,且效率低下串慰。如果使用ListenableFuture,Guava會(huì)幫助檢測(cè)Future是否完成了滔以,如果完成就自動(dòng)調(diào)用回調(diào)函數(shù)呈队,這樣可以減少并發(fā)程序的復(fù)雜度。
- MoreExecutors
該類是final類型的工具類撼唾,提供了很多靜態(tài)方法猴伶。例如listeningDecorator方法初始化ListeningExecutorService方法办桨,使用此實(shí)例submit方法即可初始化ListenableFuture對(duì)象。
//裝飾一個(gè)自己的線程池返回
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
- ListeningExecutorService
該類是對(duì)ExecutorService的擴(kuò)展站辉,重寫ExecutorService類中的submit方法呢撞,返回ListenableFuture對(duì)象。
ListenableFuture<String> task =executorService .submit(() -> "hello");
- ListenableFuture
該接口擴(kuò)展了Future接口饰剥,增加了addListener方法殊霞,該方法在給定的excutor上注冊(cè)一個(gè)監(jiān)聽器,當(dāng)計(jì)算完成時(shí)會(huì)馬上調(diào)用該監(jiān)聽器汰蓉。不能夠確保監(jiān)聽器執(zhí)行的順序绷蹲,但可以在計(jì)算完成時(shí)確保馬上被調(diào)用。
task.addListener(() -> System.out.println("world"),executorService );
- Futures
該類提供和很多實(shí)用的靜態(tài)方法以供使用。
Futures.allAsList這個(gè)方法用來(lái)把多個(gè)ListenableFuture組合成一個(gè)祝钢。
ListenableFuture<String> future1 = executorService.submit(() -> "Hello");
ListenableFuture<Integer> future2 = executorService.submit(() -> 2);
ListenableFuture<List<Object>> future = Futures.allAsList(future1, future2);
Futures.transform[Async]這個(gè)方法用于轉(zhuǎn)換返回值比规。
ListenableFuture<String> future1 = executorService.submit(() -> "Hello");
ListenableFuture<Integer> listenableFuture = Futures.transform(future1, String::length, executorService);
System.out.println(listenableFuture.get());
這個(gè)是同步的方法,如果需要異步的執(zhí)行
ListenableFuture<String> future1 = executorService.submit(() -> "Hello");
ListenableFuture<Integer> listenableFuture = Futures.transformAsync(future1, input -> Futures.immediateFuture(input.length()), executorService);
System.out.println(listenableFuture.get());
- 示例代碼
// 測(cè)試代碼 僅供參考
@Test
public void guavaFutureTest() {
System.out.println("main start " + Thread.currentThread());
// 創(chuàng)建工作線程池
ExecutorService service = Executors.newCachedThreadPool();
ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(service);
// 子線程任務(wù)1
ListenableFuture<Integer> integerTask = listeningExecutorService.submit(() -> {
System.out.println("integerTask start " + Thread.currentThread());
Thread.sleep(100);
System.out.println("integerTask end " + Thread.currentThread());
return new Random().nextInt(100);
});
// 添加監(jiān)聽
Futures.addCallback(integerTask, new FutureCallback<Integer>() {
@Override
public void onSuccess(@Nullable Integer result) {
System.out.println("integerTask done " + "result:" + result + " " +Thread.currentThread());
}
@Override
public void onFailure(Throwable t) {
t.printStackTrace();
}
}, listeningExecutorService);
// 子線程任務(wù)2
ListenableFuture<String> stringTask = listeningExecutorService.submit(() -> {
System.out.println("stringTask start " + Thread.currentThread());
Thread.sleep(50);
System.out.println("stringTask end " + Thread.currentThread());
return " ok";
});
// 添加監(jiān)聽
Futures.addCallback(stringTask, new FutureCallback<String>() {
@Override
public void onSuccess(@Nullable String result) {
System.out.println("stringTask done " + "result:" + result + " " +Thread.currentThread());
}
@Override
public void onFailure(Throwable t) {
t.printStackTrace();
}
}, listeningExecutorService);
// 主線程其他任務(wù)
for (int i = 0 ;i < 4 ;i++){
System.out.println("main other task " + i + " " + Thread.currentThread());
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 關(guān)閉線程池
listeningExecutorService.shutdown();
System.out.println("main end " + Thread.currentThread());
}
輸出結(jié)果:
main start Thread[main,5,main]
integerTask start Thread[pool-1-thread-1,5,main]
stringTask start Thread[pool-1-thread-2,5,main]
main other task 0 Thread[main,5,main]
main other task 1 Thread[main,5,main]
stringTask end Thread[pool-1-thread-2,5,main]
stringTask done result: ok Thread[pool-1-thread-3,5,main]
integerTask end Thread[pool-1-thread-1,5,main]
integerTask done result:63 Thread[pool-1-thread-3,5,main]
main other task 2 Thread[main,5,main]
main other task 3 Thread[main,5,main]
main end Thread[main,5,main]
可以看到結(jié)果和前面java future輸出類似拦英,stringTask 和integerTask 兩個(gè)子任務(wù)分別開了兩個(gè)子線程執(zhí)行蜒什,子線程執(zhí)行過程中,主線程繼續(xù)做其他任務(wù)疤估,因?yàn)閟tringTask 耗時(shí)短先執(zhí)行完灾常。可以看到主線程中為了得到回調(diào)結(jié)果不再需要一直去檢查isDone做裙,有監(jiān)聽函數(shù)獲取子線程的執(zhí)行結(jié)果Futures.addCallback岗憋,開啟一個(gè)新的異步線程Thread[pool-1-thread-3,5,main]去監(jiān)聽這個(gè)子線程的執(zhí)行結(jié)果。
Java future 和 Guava future的對(duì)比
Future 具有局限性锚贱。在實(shí)際應(yīng)用中,當(dāng)需要下載大量圖片或視頻時(shí)关串,可以使用多線程去下載拧廊,提交任務(wù)下載后,可以從多個(gè)Future中獲取下載結(jié)果晋修,由于Future獲取任務(wù)結(jié)果是阻塞的吧碾,所以將會(huì)依次調(diào)用Future.get()方法,這樣的效率會(huì)很低墓卦。很可能第一個(gè)下載速度很慢倦春,則會(huì)拖累整個(gè)下載速度。就像前面說(shuō)的落剪,使用java future需要另起一個(gè)線程去check任務(wù)是否完成睁本,guava相當(dāng)于把這一步幫我們做掉了。
Future主要功能在于獲取任務(wù)執(zhí)行結(jié)果和對(duì)異步任務(wù)的控制忠怖。但如果要獲取批量任務(wù)的執(zhí)行結(jié)果呢堰,從上面的例子我們已經(jīng)可以看到,單使用 Future 是很不方便的凡泣。其主要原因在于:一方面是沒有好的方法去判斷第一個(gè)完成的任務(wù)枉疼;另一方面是 Future的get方法 是阻塞的,使用不當(dāng)會(huì)造成線程的浪費(fèi)鞋拟。第一個(gè)問題可以用 CompletionService 解決骂维,CompletionService 提供了一個(gè) take() 阻塞方法,用以依次獲取所有已完成的任務(wù)贺纲。第二個(gè)問題可以用 Google Guava 庫(kù)所提供的 ListeningExecutorService 和 ListenableFuture 來(lái)解決航闺。除了獲取批量任務(wù)執(zhí)行結(jié)果時(shí)不便,F(xiàn)uture另外一個(gè)不能做的事便是防止任務(wù)的重復(fù)提交哮笆。要做到這件事就需要 Future 最常見的一個(gè)實(shí)現(xiàn)類 FutureTask 了来颤。Future只實(shí)現(xiàn)了異步汰扭,而沒有實(shí)現(xiàn)回調(diào),主線程get時(shí)會(huì)阻塞福铅,可以輪詢以便獲取異步調(diào)用是否完成萝毛。
在實(shí)際的使用中建議使用Guava ListenableFuture代替JDK的 Future來(lái)實(shí)現(xiàn)異步非阻塞,目的就是多任務(wù)異步執(zhí)行滑黔,通過回調(diào)的方方式來(lái)獲取執(zhí)行結(jié)果而不需輪詢?nèi)蝿?wù)狀態(tài)笆包。
關(guān)于Guava的異步還有很多東西需要學(xué)習(xí),比如需要異步任務(wù)的計(jì)算結(jié)果繼續(xù)做計(jì)算的略荡,異步計(jì)算同步獲取結(jié)果(異步轉(zhuǎn)同步)等庵佣,后續(xù)的筆記會(huì)繼續(xù)展開。
參考文章:
https://blog.csdn.net/he90227/article/details/52210490
https://www.zhihu.com/question/19732473/answer/20851256
https://cloud.tencent.com/developer/article/1041329