引言
Executor框架是指JDK 1.5中引入的一系列并發(fā)庫(kù)中與Executor相關(guān)的功能類,包括Executor哼转、Executors明未、ExecutorService、Future壹蔓、Callable等趟妥。
一、為什么要引入Executor框架佣蓉?
1披摄、如果使用new Thread(...).start()的方法處理多線程,有如下缺點(diǎn):
① 開銷大勇凭。對(duì)于JVM來說疚膊,每次新建線程和銷毀線程都會(huì)有很大的開銷。
② 線程缺乏管理虾标。沒有一個(gè)池來限制線程的數(shù)量寓盗,如果并發(fā)量很高,會(huì)創(chuàng)建很多線程璧函,而且線程之間可能會(huì)有相互競(jìng)爭(zhēng)傀蚌,這將會(huì)過多占用系統(tǒng)資源,增加系統(tǒng)資源的消耗量蘸吓。而且線程數(shù)量超過系統(tǒng)負(fù)荷喳张,容易導(dǎo)致系統(tǒng)不穩(wěn)定。
2美澳、使用線程池的方法销部,有如下優(yōu)點(diǎn):
① 線程復(fù)用。通過復(fù)用創(chuàng)建了的線程制跟,減少了線程的創(chuàng)建舅桩、消亡的開銷。
② 有效控制并發(fā)線程數(shù)雨膨。
③ 提供了更簡(jiǎn)單靈活的線程管理擂涛。可以提供定時(shí)執(zhí)行聊记、單線程撒妈、可變線程數(shù)等多種使用功能。
二排监、Executor框架的UML圖
三狰右、下面開始分析一下Executor框架中幾個(gè)比較重要的接口和類。
1舆床、Callable
Callable位于java.util.concurrent包下棋蚌,它是一個(gè)接口嫁佳,只聲明了一個(gè)call()方法。
Callable接口類似于Runnable谷暮,兩者都是為了可能在線程中執(zhí)行的類而設(shè)計(jì)的蒿往。和Runnable接口中的run()方法類似,Callable 提供的call()方法作為線程的執(zhí)行體湿弦。但是call()比run()方法更強(qiáng)大瓤漏,體現(xiàn)在
① call方法有返回值。
② call方法可以聲明拋出異常颊埃。
2蔬充、Future
Future 接口位于java.util.concurrent包下,是Java 1.5中引入的接口竟秫。
Future主要用來對(duì)具體的Runnable或Callable任務(wù)的執(zhí)行結(jié)果進(jìn)行取消、查詢是否完成跷乐、獲取結(jié)果肥败。必要時(shí)可以通過get()方法獲取執(zhí)行結(jié)果,get()方法會(huì)阻塞知道任務(wù)返回結(jié)果愕提。
當(dāng)你提交一個(gè)Callable對(duì)象給線程池時(shí)馒稍,將得到一個(gè)Future對(duì)象,并且它和你傳入的Callable示例有相同泛型浅侨。
Future 接口中的5個(gè)方法:
public interface Future<V> {
//用來取消任務(wù)
//參數(shù)mayInterruptIfRunning表示是否允許取消正在執(zhí)行卻沒有執(zhí)行完畢的任務(wù)纽谒。
boolean cancel(boolean mayInterruptIfRunning);
//表示任務(wù)是否被取消成功
boolean isCancelled();
//表示任務(wù)是否已經(jīng)完成
boolean isDone();
//用來獲取執(zhí)行結(jié)果,這個(gè)方法會(huì)產(chǎn)生阻塞如输,會(huì)一直等到任務(wù)執(zhí)行完畢才返回鼓黔;
V get()
//用來獲取執(zhí)行結(jié)果,如果在指定時(shí)間內(nèi)不见,還沒獲取到結(jié)果澳化,會(huì)拋出TimeoutException異常。
V get(long timeout, TimeUnit unit)
}
Future提供了三種功能:
① 判斷任務(wù)是否完成
② 能夠中斷任務(wù)
③ 能夠獲取任務(wù)執(zhí)行結(jié)果
3稳吮、Executor
Executor是一個(gè)接口缎谷,它將任務(wù)的提交與任務(wù)的執(zhí)行分離開來,定義了一個(gè)接收Runnable對(duì)象的方法executor灶似。Executor是Executor框架中最基礎(chǔ)的一個(gè)接口列林,類似于集合中的Collection接口。
4酪惭、ExecutorService
ExecutorService繼承了Executor希痴,是一個(gè)比Executor使用更廣泛的子類接口。定義了終止任務(wù)春感、提交任務(wù)润梯、跟蹤任務(wù)返回結(jié)果等方法。
一個(gè)ExecutorService是可以關(guān)閉的,關(guān)閉之后它將不能再接收任何任務(wù)纺铭,對(duì)于不在使用的ExecutorService寇钉,應(yīng)該將其關(guān)閉以釋放資源。
ExecutorService方法介紹:
package java.util.concurrent;
import java.util.List;
import java.util.Collection;
public interface ExecutorService extends Executor {
/**
* 平滑地關(guān)閉線程池舶赔,已經(jīng)提交到線程池中的任務(wù)會(huì)繼續(xù)執(zhí)行完扫倡。
*/
void shutdown();
/**
* 立即關(guān)閉線程池,返回還沒有開始執(zhí)行的任務(wù)列表竟纳。
* 會(huì)嘗試中斷正在執(zhí)行的任務(wù)(每個(gè)線程調(diào)用 interruput方法)撵溃,但這個(gè)行為不一定會(huì)成功。
*/
List<Runnable> shutdownNow();
/**
* 判斷線程池是否已經(jīng)關(guān)閉
*/
boolean isShutdown();
/**
* 判斷線程池的任務(wù)是否已經(jīng)執(zhí)行完畢锥累。
* 注意此方法調(diào)用之前需要先調(diào)用shutdown()方法或者shutdownNow()方法缘挑,否則總是會(huì)返回false
*/
boolean isTerminated();
/**
* 判斷線程池的任務(wù)是否都執(zhí)行完。
* 如果沒有任務(wù)沒有執(zhí)行完畢則阻塞桶略,直至任務(wù)完成或者達(dá)到了指定的timeout時(shí)間就會(huì)返回
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 提交帶有一個(gè)返回值的任務(wù)到線程池中去執(zhí)行(回調(diào))语淘,返回的 Future 表示任務(wù)的待定結(jié)果。
* 當(dāng)任務(wù)成功完成后际歼,通過 Future 實(shí)例的 get() 方法可以獲取該任務(wù)的結(jié)果惶翻。
* Future 的 get() 方法是會(huì)阻塞的。
*/
<T> Future<T> submit(Callable<T> task);
/**
*提交一個(gè)Runnable的任務(wù)鹅心,當(dāng)任務(wù)完成后吕粗,可以通過Future.get()獲取的是提交時(shí)傳遞的參數(shù)T result
*
*/
<T> Future<T> submit(Runnable task, T result);
/**
* 提交一個(gè)Runnable的人無語,它的Future.get()得不到任何內(nèi)容旭愧,它返回值總是Null颅筋。
* 為什么有這個(gè)方法?為什么不直接設(shè)計(jì)成void submit(Runnable task)這種方式输枯?
* 這是因?yàn)镕uture除了get這種獲取任務(wù)信息外垃沦,還可以控制任務(wù),
具體體現(xiàn)在 Future的這個(gè)方法上:boolean cancel(boolean mayInterruptIfRunning)
這個(gè)方法能夠去取消提交的Rannable任務(wù)。
*/
Future<?> submit(Runnable task);
/**
* 執(zhí)行一組給定的Callable任務(wù),返回對(duì)應(yīng)的Future列表捐晶。列表中每一個(gè)Future都將持有該任務(wù)的結(jié)果和狀態(tài)寺谤。
* 當(dāng)所有任務(wù)執(zhí)行完畢后,方法返回,此時(shí)并且每一個(gè)Future的isDone()方法都是true。
* 完成的任務(wù)可能是正常結(jié)束,也可以是異常結(jié)束
* 如果當(dāng)任務(wù)執(zhí)行過程中收夸,tasks集合被修改了,那么方法的返回結(jié)果將是不確定的血崭,
即不能確定執(zhí)行的是修改前的任務(wù)卧惜,還是修改后的任務(wù)
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
/**
* 執(zhí)行一組給定的Callable任務(wù)厘灼,返回對(duì)應(yīng)的Future列表。列表中每一個(gè)Future都將持有該任務(wù)的結(jié)果和狀態(tài)咽瓷。
* 當(dāng)所有任務(wù)執(zhí)行完畢后或者超時(shí)后设凹,方法將返回,此時(shí)并且每一個(gè)Future的isDone()方法都是true茅姜。
* 一旦方法返回闪朱,未執(zhí)行完成的任務(wù)被取消,而完成的任務(wù)可能正常結(jié)束或者異常結(jié)束钻洒,
* 完成的任務(wù)可以是正常結(jié)束奋姿,也可以是異常結(jié)束
* 如果當(dāng)任務(wù)執(zhí)行過程中,tasks集合被修改了素标,那么方法的返回結(jié)果將是不確定的
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 執(zhí)行一組給定的Callable任務(wù)称诗,當(dāng)成功執(zhí)行完(沒拋異常)一個(gè)任務(wù)后此方法便返回,返回的是該任務(wù)的結(jié)果
* 一旦此正常返回或者異常結(jié)束头遭,未執(zhí)行的任務(wù)都會(huì)被取消寓免。
* 如果當(dāng)任務(wù)執(zhí)行過程中,tasks集合被修改了任岸,那么方法的返回結(jié)果將是不確定的
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
/**
* 執(zhí)行一組給定的Callable任務(wù)再榄,當(dāng)在timeout(超時(shí))之前成功執(zhí)行完(沒拋異常)一個(gè)任務(wù)后此方法便返回狡刘,返回的是該任務(wù)的結(jié)果
* 一旦此正常返回或者異常結(jié)束享潜,未執(zhí)行的任務(wù)都會(huì)被取消。
* 如果當(dāng)任務(wù)執(zhí)行過程中嗅蔬,tasks集合被修改了剑按,那么方法的返回結(jié)果將是不確定的
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
shutdown() 和 shutdownNow() 是用來關(guān)閉連接池的兩個(gè)方法,而且這兩個(gè)方法都是在當(dāng)前線程立即返回澜术,不會(huì)阻塞至線程池中的方法執(zhí)行結(jié)束艺蝴。調(diào)用這兩個(gè)方法之后,連接池將不能再接受任務(wù)鸟废。
下面給寫幾個(gè)示例來加深ExecutorService的方法的理解猜敢。
先寫兩個(gè)任務(wù)類:ShortTask和LongTask,這兩個(gè)類都繼承了Runnable接口盒延,ShortTask的run()方法執(zhí)行很快缩擂,LongTask的run()方法執(zhí)行時(shí)間為10s。
public class LongTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(10L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("complete a long task");
}
}
public class ShortTask implements Runnable {
@Override
public void run() {
System.out.println("complete a short task...");
}
}
測(cè)試shutdown()方法
package OSChina.Client;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.*;
public class Client10 {
public static void main(String[] args) {
ExecutorService threadpool = Executors.newFixedThreadPool(4);
threadpool.submit(new ShortTask());
threadpool.submit(new ShortTask());
threadpool.submit(new LongTask());
threadpool.submit(new ShortTask());
threadpool.shutdown();
boolean isShutdown = threadpool.isShutdown();
System.out.println("線程池是否已經(jīng)關(guān)閉:" + isShutdown);
final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
try {
while (!threadpool.awaitTermination(1L, TimeUnit.SECONDS)){
System.out.println("線程池中還有任務(wù)在執(zhí)行添寺,當(dāng)前時(shí)間:" + sdf.format(new Date()));
}
System.out.println("線程池中已經(jīng)沒有在執(zhí)行的任務(wù)胯盯,線程池已完全關(guān)閉!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
測(cè)試shutdownNow()方法
package OSChina.Client;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class Client11 {
public static void main(String[] args) {
ExecutorService threadpool = Executors.newFixedThreadPool(3);
//將5個(gè)任務(wù)提交到有3個(gè)線程的線程池
threadpool.submit(new LongTask());
threadpool.submit(new LongTask());
threadpool.submit(new LongTask());
threadpool.submit(new LongTask());
threadpool.submit(new LongTask());
try {
TimeUnit.SECONDS.sleep(1L);
//關(guān)閉線程池
List<Runnable> waiteRunnables = threadpool.shutdownNow();
System.out.println("還沒有執(zhí)行的任務(wù)數(shù):" + waiteRunnables.size());
boolean isShutdown = threadpool.isShutdown();
System.out.println("線程池是否已經(jīng)關(guān)閉:" + isShutdown);
final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
while (!threadpool.awaitTermination(1L, TimeUnit.SECONDS)) {
System.out.println("線程池中還有任務(wù)在執(zhí)行计露,當(dāng)前時(shí)間:" + sdf.format(new Date()));
}
System.out.println("線程池中已經(jīng)沒有在執(zhí)行的任務(wù)博脑,線程池已完全關(guān)閉憎乙!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
當(dāng)調(diào)用shutdownNow()后,三個(gè)執(zhí)行的任務(wù)都被interrupt了叉趣。而且awaitTermination(1L, TimeUnit.SECONDS)返回的都是true泞边。
測(cè)試submit(Callable<T> task)方法
package OSChina.Client;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
public class CallableTask implements Callable{
@Override
public Object call() throws Exception {
TimeUnit.SECONDS.sleep(5L);
return "success";
}
}
package OSChina.Client;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Client12 {
public static void main(String[] args) {
ExecutorService threadpool = null;
threadpool = Executors.newFixedThreadPool(3);
final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
System.out.println("提交一個(gè)callable任務(wù)到線程池,現(xiàn)在時(shí)間是:" + sdf.format(new Date()));
Future<String> future = threadpool.submit(new CallableTask());
try {
System.out.println("獲取callable任務(wù)的結(jié)果:" + future.get() + "君账,現(xiàn)在時(shí)間是:" + sdf.format(new Date()));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
if(threadpool!=null){
threadpool.shutdown();
}
}
}
}