平時(shí)工作中經(jīng)常碰到個(gè)各種多線(xiàn)程,有時(shí)候搞不清它們之間到底有什么區(qū)別芬失,這次來(lái)個(gè)總體的總結(jié)卷仑,主要是以下這些:
Executor,Executors麸折,ExecutorService, CompletionServie,Future,Callable锡凝,Runnable,F(xiàn)utureTask
一垢啼、Runnable(interface)
public interface Runnable {
public void run();
}
run()方法返回值為void類(lèi)型窜锯,所以在執(zhí)行完任務(wù)之后無(wú)法返回任何結(jié)果。
二芭析、Callable (interface)
public interface Callable<V> {
V call() throws Exception;
}
與 Runnable 不同的是call()函數(shù)返回的類(lèi)型就是傳遞進(jìn)來(lái)的V類(lèi)型锚扎,而且能夠拋出異常。一般情況下是配合ExecutorService來(lái)使用的
三馁启、Future( interface)
Future是對(duì)于具體的Runnable或者Callable任務(wù)的執(zhí)行結(jié)果進(jìn)行取消驾孔、查詢(xún)是否完成芍秆、獲取結(jié)果、設(shè)置結(jié)果操作翠勉。
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
- cancel方法用來(lái)取消任務(wù)妖啥,如果取消任務(wù)成功則返回true,如果取消任務(wù)失敗則返回false对碌。參數(shù)mayInterruptIfRunning表示是否允許取消正在執(zhí)行卻沒(méi)有執(zhí)行完畢的任務(wù)荆虱,如果設(shè)置true,則表示可以取消正在執(zhí)行過(guò)程中的任務(wù)朽们。如果任務(wù)已經(jīng)完成怀读,則無(wú)論mayInterruptIfRunning為true還是false,此方法肯定返回false骑脱,即如果取消已經(jīng)完成的任務(wù)會(huì)返回false菜枷;如果任務(wù)正在執(zhí)行,若mayInterruptIfRunning設(shè)置為true叁丧,則返回true犁跪,若mayInterruptIfRunning設(shè)置為false,則返回false歹袁;如果任務(wù)還沒(méi)有執(zhí)行坷衍,則無(wú)論mayInterruptIfRunning為true還是false,肯定返回true条舔。
- isCancelled方法表示任務(wù)是否被取消成功枫耳,如果在任務(wù)正常完成前被取消成功,則返回 true孟抗。
- isDone方法表示任務(wù)是否已經(jīng)完成迁杨,若任務(wù)完成,則返回true凄硼;
- get()方法用來(lái)獲取執(zhí)行結(jié)果铅协,這個(gè)方法會(huì)產(chǎn)生阻塞,會(huì)一直等到任務(wù)執(zhí)行完畢才返回摊沉;
- get(long timeout, TimeUnit unit)用來(lái)獲取執(zhí)行結(jié)果狐史,如果在指定時(shí)間內(nèi),還沒(méi)獲取到結(jié)果说墨,就直接返回null骏全。
也就是說(shuō)Future提供了三種功能:
- 判斷任務(wù)是否完成;
- 能夠中斷任務(wù)尼斧;
- 能夠獲取任務(wù)執(zhí)行結(jié)果姜贡。
四、FutureTask(Runnable棺棵, Future<V>的具體實(shí)現(xiàn))
public class FutureTask<V> implements RunnableFuture<V> 楼咳。熄捍。。
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
可以看出RunnableFuture繼承了Runnable接口和Future接口母怜,而FutureTask實(shí)現(xiàn)了RunnableFuture接口余耽。所以它既可以作為Runnable被線(xiàn)程執(zhí)行,又可以作為Future得到Callable的返回值,管理任務(wù)糙申。
其中有兩個(gè)構(gòu)造方法
//接受一個(gè) Callable 參數(shù)
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
//接受一個(gè) Runnable 宾添,利用 Executors.callable 將Runnable 轉(zhuǎn)換為Callable
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
具體使用可以參考 AsyncTask 中的使用
五船惨、Executor(interface)
在Executor框架中柜裸,使用執(zhí)行器(Exectuor)來(lái)管理Thread對(duì)象,從而簡(jiǎn)化了并發(fā)編程粱锐。并發(fā)編程的一種編程方式把任務(wù)拆分為一系列的小任務(wù)疙挺,即Runnable,然后將這些任務(wù)提交給一個(gè)Executor執(zhí)行怜浅,Executor.execute(Runnalbe) 铐然。Executor在執(zhí)行時(shí)使用其內(nèi)部的線(xiàn)程池來(lái)完成操作。
Executor 接口中之定義了一個(gè)方法 execute(Runnable command)恶座,該方法接收一個(gè) Runable 實(shí)例搀暑,它用來(lái)執(zhí)行一個(gè)任務(wù),任務(wù)即一個(gè)實(shí)現(xiàn)了 Runnable 接口的類(lèi)跨琳。
public interface Executor {
void execute(Runnable command);
}
為了避免調(diào)用 new Thread(new RunnableTask()).start()這樣的代碼我們可以
Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
...
Executor 并不是嚴(yán)格的要求一步執(zhí)行自点,我們可以簡(jiǎn)單的直接在調(diào)用者線(xiàn)程執(zhí)行運(yùn)行提交的任務(wù)
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run(); // 在調(diào)用者線(xiàn)程執(zhí)行
}}
一般來(lái)說(shuō)任務(wù)在非調(diào)用者的線(xiàn)程中執(zhí)行,比如產(chǎn)生一個(gè)新的線(xiàn)程
class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start(); //新啟一個(gè)線(xiàn)程脉让,在非調(diào)用者線(xiàn)程中執(zhí)行
}}
有很多Executor 的實(shí)現(xiàn)是為了實(shí)現(xiàn)任務(wù)的某種調(diào)度桂敛,比如 AsyncTask 中的串行任務(wù)隊(duì)列
class SerialExecutor implements Executor {
final Queue tasks = new ArrayDeque<>();
final Executor executor;
Runnable active;
SerialExecutor(Executor executor) {
this.executor = executor;
public synchronized void execute(final Runnable r) {
tasks.add(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (active == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((active = tasks.poll()) != null) {
executor.execute(active);
}
}
}}
六、ExecutorService(interface溅潜,繼承自Executor)
ExecutorService 接口繼承自 Executor 接口术唬,它提供了更豐富的實(shí)現(xiàn)多線(xiàn)程的方法,比如滚澜,ExecutorService 提供了關(guān)閉自己的方法粗仓,以及可為跟蹤一個(gè)或多個(gè)異步任務(wù)執(zhí)行狀況而生成 Future 的方法。 可以調(diào)用 ExecutorService 的 shutdown()方法來(lái)平滑地關(guān)閉 ExecutorService设捐,調(diào)用該方法后潦牛,將導(dǎo)致 ExecutorService 停止接受任何新的任務(wù)且等待已經(jīng)提交的任務(wù)執(zhí)行完成(已經(jīng)提交的任務(wù)會(huì)分兩類(lèi):一類(lèi)是已經(jīng)在執(zhí)行的,另一類(lèi)是還沒(méi)有開(kāi)始執(zhí)行的)挡育,當(dāng)所有已經(jīng)提交的任務(wù)執(zhí)行完畢后將會(huì)關(guān)閉 ExecutorService巴碗。因此我們一般用該接口來(lái)實(shí)現(xiàn)和管理多線(xiàn)程。
execute(Runnable)
submit(Runnable)
submit(Callable)
invokeAny()
invokeAll()
execute(Runnable)
方法 execute(Runnable) 接收一個(gè)java.lang.Runnable 對(duì)象作為參數(shù)即寒,并且以異步的方式執(zhí)行它橡淆。
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});
executorService.shutdown();
使用這種方式?jīng)]有辦法獲取執(zhí)行 Runnable 之后的結(jié)果召噩,如果你希望獲取運(yùn)行之后的返回值,就必須使用接收 Callable 參數(shù)的 execute() 方法逸爵。
submit(Runnable)
方法 submit(Runnable) 同樣接收一個(gè)Runnable 的實(shí)現(xiàn)作為參數(shù)具滴,但是會(huì)返回一個(gè)Future 對(duì)象。這個(gè)Future 對(duì)象可以用于判斷 Runnable 是否結(jié)束執(zhí)行师倔。如下是一個(gè)ExecutorService 的 submit() 方法的例子:
Future future = executorService.submit(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});
//如果任務(wù)結(jié)束執(zhí)行則返回 null
System.out.println("future.get()=" + future.get());
submit(Callable)
方法 submit(Callable) 和方法 submit(Runnable) 比較類(lèi)似构韵,但是區(qū)別則在于它們接收不同的參數(shù)類(lèi)型。Callable 的實(shí)例與 Runnable 的實(shí)例很類(lèi)似趋艘,但是 Callable 的 call() 方法可以返回一個(gè)結(jié)果疲恢。方法 Runnable.run() 則不能返回結(jié)果。
Callable 的返回值可以從方法 submit(Callable) 返回的 Future 對(duì)象中獲取瓷胧。如下是一個(gè) ExecutorService Callable 的樣例:
Future future = executorService.submit(new Callable(){
public Object call() throws Exception {
System.out.println("Asynchronous Callable");
return "Callable Result";
}
});
System.out.println("future.get() = " + future.get());
輸出結(jié)果
Asynchronous Callable
future.get() = Callable Result
inVokeAny()
方法 invokeAny() 接收一個(gè)包含 Callable 對(duì)象的集合作為參數(shù)显拳。調(diào)用該方法不會(huì)返回 Future 對(duì)象,而是返回集合中某一個(gè)Callable 對(duì)象的結(jié)果搓萧,而且無(wú)法保證調(diào)用之后返回的結(jié)果是哪一個(gè) Callable杂数,只知道它是這些 Callable 中一個(gè)執(zhí)行結(jié)束的 Callable 對(duì)象。如果一個(gè)任務(wù)運(yùn)行完畢或者拋出異常瘸洛,方法會(huì)取消其它的 Callable 的執(zhí)行揍移。
以下是一個(gè)樣例:
ExecutorService executorService = Executors.newSingleThreadExecutor();
Set<Callable<String>> callables = new HashSet<Callable<String>>();
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 1";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 2";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 3";
}
});
String result = executorService.invokeAny(callables);
System.out.println("result = " + result);
executorService.shutdown();
輸出結(jié)果:
以上樣例代碼會(huì)打印出在給定的集合中的某一個(gè)Callable 的返回結(jié)果。嘗試運(yùn)行后發(fā)現(xiàn)每次結(jié)果都在改變反肋。有時(shí)候返回結(jié)果是"Task 1"那伐,有時(shí)候是"Task 2",等等囚玫。
invokeAll()
方法 invokeAll() 會(huì)調(diào)用存在于參數(shù)集合中的所有 Callable 對(duì)象喧锦,并且返回一個(gè)包含 Future 對(duì)象的集合,你可以通過(guò)這個(gè)返回的集合來(lái)管理每個(gè) Callable 的執(zhí)行結(jié)果抓督。需要注意的是燃少,任務(wù)有可能因?yàn)楫惓6鴮?dǎo)致運(yùn)行結(jié)束,所以它可能并不是真的成功運(yùn)行了铃在。但是我們沒(méi)有辦法通過(guò) Future 對(duì)象來(lái)了解到這個(gè)差異阵具。
ExecutorService服務(wù)的關(guān)閉(shutdown() 或 shutdownNow())
當(dāng)使用 ExecutorService 完畢之后,我們應(yīng)該關(guān)閉它定铜,這樣才能保證線(xiàn)程不會(huì)繼續(xù)保持運(yùn)行狀態(tài)阳液。
舉例來(lái)說(shuō),如果你的程序通過(guò) main() 方法啟動(dòng)揣炕,并且主線(xiàn)程退出了你的程序帘皿,
如果還有一個(gè)活動(dòng)的 ExecutorService 存在于程序中,那么程序?qū)?huì)繼續(xù)保持運(yùn)行狀態(tài)畸陡。存在于 ExecutorService 中的活動(dòng)線(xiàn)程會(huì)阻止Java虛擬機(jī)關(guān)閉鹰溜。
為了關(guān)閉在 ExecutorService 中的線(xiàn)程虽填,需要調(diào)用** shutdown() **方法。但ExecutorService 并不會(huì)馬上關(guān)閉曹动,而是不再接收新的任務(wù)斋日,一旦所有的線(xiàn)程結(jié)束執(zhí)行當(dāng)前任務(wù),ExecutorServie 才會(huì)真的關(guān)閉墓陈。所有在調(diào)用 shutdown() 方法之前提交到 ExecutorService 的任務(wù)都會(huì)執(zhí)行恶守。
如果你希望立即關(guān)閉 ExecutorService,你可以調(diào)用** shutdownNow() **方法贡必。這個(gè)方法會(huì)嘗試馬上關(guān)閉所有正在執(zhí)行的任務(wù)兔港,并且跳過(guò)所有已經(jīng)提交但是還沒(méi)有運(yùn)行的任務(wù)。但是對(duì)于正在執(zhí)行的任務(wù)赊级,是否能夠成功關(guān)閉它是無(wú)法保證的押框,有可能他們真的被關(guān)閉掉了岔绸,也有可能它會(huì)一直執(zhí)行到任務(wù)結(jié)束理逊。這是一個(gè)最好的嘗試。
ExecutorService 接口在 java.util.concurrent 包中有如下實(shí)現(xiàn)類(lèi):
ThreadPoolExecutor
ScheduledThreadPoolExecutor
七盒揉、Executors(class)
Executors 提供了一系列工廠方法用于創(chuàng)先線(xiàn)程池晋被,返回的線(xiàn)程池都實(shí)現(xiàn)了 ExecutorService 接口。
public static ExecutorService newFixedThreadPool(int nThreads)
// 創(chuàng)建固定數(shù)目線(xiàn)程的線(xiàn)程池刚盈。
public static ExecutorService newCachedThreadPool()
// 創(chuàng)建一個(gè)可緩存的線(xiàn)程池羡洛,調(diào)用execute將重用以前構(gòu)造的線(xiàn)程(如果線(xiàn)程可用)。
// 如果現(xiàn)有線(xiàn)程沒(méi)有可用的藕漱,則創(chuàng)建一個(gè)新線(xiàn) 程并添加到池中欲侮。
// 終止并從緩存中移除那些已有 60 秒鐘未被使用的線(xiàn)程。
public static ExecutorService newSingleThreadExecutor()
// 創(chuàng)建一個(gè)單線(xiàn)程化的Executor肋联。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
// 創(chuàng)建一個(gè)支持定時(shí)及周期性的任務(wù)執(zhí)行的線(xiàn)程池威蕉,多數(shù)情況下可用來(lái)替代Timer類(lèi)。
Executors的使用:
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.execute(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});
executorService.shutdown();
該示例代碼首先使用 newFixedThreadPool() 工廠方法創(chuàng)建一個(gè)ExecutorService 橄仍,上述代碼創(chuàng)建了一個(gè)可以容納10個(gè)線(xiàn)程任務(wù)的線(xiàn)程池韧涨。其次,向 execute() 方法中傳遞一個(gè)異步的 Runnable 接口的實(shí)現(xiàn)侮繁,這樣做會(huì)讓 ExecutorService 中的某個(gè)線(xiàn)程執(zhí)行這個(gè)Runnable 線(xiàn)程虑粥。
八、CompletionServie
為什么需要CompletionServie
如果你向Executor提交了一個(gè)批處理任務(wù)宪哩,并且希望在它們完成后獲得結(jié)果娩贷。為此你可以保存與每個(gè)任務(wù)相關(guān)聯(lián)的Future,然后不斷地調(diào)用timeout為零的get锁孟,來(lái)檢驗(yàn)Future是否完成彬祖。這樣做固然可以但荤,但卻相當(dāng)乏味。幸運(yùn)的是涧至,還有一個(gè)更好的方法:完成服務(wù)(Completion service)腹躁。
CompletionService整合了Executor和BlockingQueue的功能。你可以將Callable任務(wù)提交給它去執(zhí)行南蓬,然后使用類(lèi)似于隊(duì)列中的take和poll方法纺非,在結(jié)果完整可用時(shí)獲得這個(gè)結(jié)果,像一個(gè)打包的Future赘方。ExecutorCompletionService是實(shí)現(xiàn)CompletionService接口的一個(gè)類(lèi)烧颖,并將計(jì)算任務(wù)委托給一個(gè)Executor。
CompletionService與ExecutorService的對(duì)比使用
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// case1();
// case2();
case3();
}
/**
* <一>
* 1. 用List收集任務(wù)結(jié)果 (List記錄每個(gè)submit返回的Future)
* 2. 循環(huán)查看結(jié)果, Future不一定完成, 如果沒(méi)有完成, 那么調(diào)用get會(huì)租塞
* 3. 如果排在前面的任務(wù)沒(méi)有完成, 那么就會(huì)阻塞, 這樣后面已經(jīng)完成的任務(wù)就沒(méi)法獲得結(jié)果了, 導(dǎo)致了不必要的等待時(shí)間.
* 更為嚴(yán)重的是: 第一個(gè)任務(wù)如果幾個(gè)小時(shí)或永遠(yuǎn)完成不了, 而后面的任務(wù)幾秒鐘就完成了, 那么后面的任務(wù)的結(jié)果都將得不到處理
*
* 導(dǎo)致: 已完成的任務(wù)可能得不到及時(shí)處理
*/
private static void case1() throws ExecutionException, InterruptedException {
final Random random = new Random();
ExecutorService service = Executors.newFixedThreadPool(10);
List<Future<String>> taskResultHolder = new ArrayList<>();
for(int i=0; i<50; i++) {
//搜集任務(wù)結(jié)果
taskResultHolder.add(service.submit(new Callable<String>() {
public String call() throws Exception {
Thread.sleep(random.nextInt(5000));
return Thread.currentThread().getName();
}
}));
}
// 處理任務(wù)結(jié)果
int count = 0;
System.out.println("handle result begin");
for(Future<String> future : taskResultHolder) {
System.out.println(future.get());
count++;
}
System.out.println("handle result end");
System.out.println(count + " task done !");
//關(guān)閉線(xiàn)程池
service.shutdown();
}
/**
* <二> 只對(duì)第一種情況進(jìn)行的改進(jìn)
* 1. 查看任務(wù)是否完成, 如果完成, 就獲取任務(wù)的結(jié)果, 讓后重任務(wù)列表中刪除任務(wù).
* 2. 如果任務(wù)未完成, 就跳過(guò)此任務(wù), 繼續(xù)查看下一個(gè)任務(wù)結(jié)果.
* 3. 如果到了任務(wù)列表末端, 那么就從新回到任務(wù)列表開(kāi)始, 然后繼續(xù)從第一步開(kāi)始執(zhí)行
*
* 這樣就可以及時(shí)處理已完成任務(wù)的結(jié)果了
*/
private static void case2() throws ExecutionException, InterruptedException {
final Random random = new Random();
ExecutorService service = Executors.newFixedThreadPool(10);
List<Future<String>> results = new ArrayList<>();
for(int i=0; i<50; i++) {
Callable<String> task = new Callable<String>() {
public String call() throws Exception {
Thread.sleep(random.nextInt(5000)); //模擬耗時(shí)操作
return Thread.currentThread().getName();
}
};
Future<String> future = service.submit(task);
results.add(future); // 搜集任務(wù)結(jié)果
}
int count = 0;
//自旋, 獲取結(jié)果
System.out.println("handle result begin");
for(int i=0; i<results.size(); i++) {
Future<String> taskHolder = results.get(i);
if(taskHolder.isDone()) { //任務(wù)完成
String result = taskHolder.get(); //獲取結(jié)果, 進(jìn)行某些操作
System.out.println("result: " + result);
results.remove(taskHolder);
i--;
count++; //完成的任務(wù)的計(jì)數(shù)器
}
//回到列表開(kāi)頭, 從新獲取結(jié)果
if(i == results.size() - 1) i = -1;
}
System.out.println("handle result end");
System.out.println(count + " task done !");
//線(xiàn)程池使用完必須關(guān)閉
service.shutdown();
}
/**
* <三> 使用ExecutorCompletionService管理異步任務(wù)
* 1. Java中的ExecutorCompletionService<V>本身有管理任務(wù)隊(duì)列的功能
* i. ExecutorCompletionService內(nèi)部維護(hù)列一個(gè)隊(duì)列, 用于管理已完成的任務(wù)
* ii. 內(nèi)部還維護(hù)列一個(gè)Executor, 可以執(zhí)行任務(wù)
*
* 2. ExecutorCompletionService內(nèi)部維護(hù)了一個(gè)BlockingQueue, 只有完成的任務(wù)才被加入到隊(duì)列中
*
* 3. 任務(wù)一完成就加入到內(nèi)置管理隊(duì)列中, 如果隊(duì)列中的數(shù)據(jù)為空時(shí), 調(diào)用take()就會(huì)阻塞 (等待任務(wù)完成)
* i. 關(guān)于完成任務(wù)是如何加入到完成隊(duì)列中的, 請(qǐng)參考ExecutorCompletionService的內(nèi)部類(lèi)QueueingFuture的done()方法
*
* 4. ExecutorCompletionService的take/poll方法是對(duì)BlockingQueue對(duì)應(yīng)的方法的封裝, 關(guān)于BlockingQueue的take/poll方法:
* i. take()方法, 如果隊(duì)列中有數(shù)據(jù), 就返回?cái)?shù)據(jù), 否則就一直阻塞;
* ii. poll()方法: 如果有值就返回, 否則返回null
* iii. poll(long timeout, TimeUnit unit)方法: 如果有值就返回, 否則等待指定的時(shí)間; 如果時(shí)間到了如果有值, 就返回值, 否則返回null
*
* 解決了已完成任務(wù)得不到及時(shí)處理的問(wèn)題
*/
static void case3() throws InterruptedException, ExecutionException {
Random random = new Random();
ExecutorService service = Executors.newFixedThreadPool(10);
ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(service);
for(int i=0; i<50; i++) {
completionService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(random.nextInt(5000));
return Thread.currentThread().getName();
}
});
}
int completionTask = 0;
while(completionTask < 50) {
//如果完成隊(duì)列中沒(méi)有數(shù)據(jù), 則阻塞; 否則返回隊(duì)列中的數(shù)據(jù)
Future<String> resultHolder = completionService.take();
System.out.println("result: " + resultHolder.get());
completionTask++;
}
System.out.println(completionTask + " task done !");
//ExecutorService使用完一定要關(guān)閉 (回收資源, 否則系統(tǒng)資源耗盡! .... 呵呵...)
service.shutdown();
}
}
九窄陡、參考
Java并發(fā)編程:Callable炕淮、Future和FutureTask
Java中的Runnable、Callable跳夭、Future涂圆、FutureTask的區(qū)別與示例
Java并發(fā)編程 - Executor,Executors币叹,ExecutorService, CompletionServie,Future,Callable