FutureTask
FutureTask是J.U.C中的類就轧,是一個可刪除的異步計算類柜蜈。這個類提供了Future接口的的基本實現(xiàn)姥闭,使用相關(guān)方法啟動和取消計算瓜挽,查詢計算是否完成蜕乡,并檢索計算結(jié)果蜀踏。只有在計算完成時才能使用get方法檢索結(jié)果;如果計算尚未完成屋摇,get方法將會阻塞偶洋。一旦計算完成,計算就不能重新啟動或取消(除非使用runAndReset方法調(diào)用計算)妓雾。
Runnable與Callable對比
通常實現(xiàn)一個線程我們會使用繼承Thread的方式或者實現(xiàn)Runnable接口娶吞,這兩種方式有一個共同的缺陷就是在執(zhí)行完任務(wù)之后無法獲取執(zhí)行結(jié)果。從Java1.5之后就提供了Callable與Future械姻,這兩個接口就可以實現(xiàn)獲取任務(wù)執(zhí)行結(jié)果妒蛇。
- Runnable接口:代碼非常簡單,只有一個方法run
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
- Callable泛型接口:有泛型參數(shù)楷拳,提供了一個call方法绣夺,執(zhí)行后可返回傳入的泛型參數(shù)類型的結(jié)果。
public interface Callable<V> {
V call() throws Exception;
}
Future接口
Future接口提供了一系列方法用于控制線程執(zhí)行計算欢揖,如下:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);//取消任務(wù)
boolean isCancelled();//是否被取消
boolean isDone();//計算是否完成
V get() throws InterruptedException, ExecutionException;//獲取計算結(jié)果陶耍,在執(zhí)行過程中任務(wù)被阻塞
V get(long timeout, TimeUnit unit)//timeout等待時間、unit時間單位
throws InterruptedException, ExecutionException, TimeoutException;
}
使用方法:
public class FutureExample {
static class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
log.info("do something in callable");
Thread.sleep(5000);
return "Done";
}
}
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
Future<String> future = executorService.submit(new MyCallable());//線程池提交任務(wù)
log.info("do something in main");
Thread.sleep(1000);
String result = future.get();//獲取不到一直阻塞
log.info("result:{}", result);
}
}
運行結(jié)果:阻塞效果
FutureTask
Future實現(xiàn)了RunnableFuture接口她混,而RunnableFuture接口繼承了Runnable與Future接口烈钞,所以它既可以作為Runnable被線程中執(zhí)行,又可以作為callable獲得返回值坤按。
public class FutureTask<V> implements RunnableFuture<V> {
...
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
FutureTask支持兩種參數(shù)類型毯欣,Callable和Runnable,在使用Runnable 時臭脓,還可以多指定一個返回結(jié)果類型酗钞。
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
1. FutureTask執(zhí)行多任務(wù)計算的使用場景
利用FutureTask和ExecutorService,可以用多線程的方式提交計算任務(wù)来累,主線程繼續(xù)執(zhí)行其他任務(wù)砚作,當(dāng)主線程需要子線程的計算結(jié)果時,在異步獲取子線程的執(zhí)行結(jié)果佃扼。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
public class FutureTaskForMultiCompute {
public static void main(String[] args) {
FutureTaskForMultiCompute inst=new FutureTaskForMultiCompute();
// 創(chuàng)建任務(wù)集合
List<FutureTask<Integer>> taskList = new ArrayList<FutureTask<Integer>>();
// 創(chuàng)建線程池
ExecutorService exec = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
// 傳入Callable對象創(chuàng)建FutureTask對象
FutureTask<Integer> ft = new FutureTask<Integer>(inst.new ComputeTask(i, ""+i));
taskList.add(ft);
// 提交給線程池執(zhí)行任務(wù)偎巢,也可以通過exec.invokeAll(taskList)一次性提交所有任務(wù);
exec.submit(ft);
}
System.out.println("所有計算任務(wù)提交完畢, 主線程接著干其他事情蔼夜!");
// 開始統(tǒng)計各計算線程計算結(jié)果
Integer totalResult = 0;
for (FutureTask<Integer> ft : taskList) {
try {
//FutureTask的get方法會自動阻塞,直到獲取計算結(jié)果為止
totalResult = totalResult + ft.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
// 關(guān)閉線程池
exec.shutdown();
System.out.println("多任務(wù)計算后的總結(jié)果是:" + totalResult);
}
private class ComputeTask implements Callable<Integer> {
private Integer result = 0;
private String taskName = "";
public ComputeTask(Integer iniResult, String taskName){
result = iniResult;
this.taskName = taskName;
System.out.println("生成子線程計算任務(wù): "+taskName);
}
public String getTaskName(){
return this.taskName;
}
@Override
public Integer call() throws Exception {
// TODO Auto-generated method stub
for (int i = 0; i < 100; i++) {
result =+ i;
}
// 休眠5秒鐘兼耀,觀察主線程行為,預(yù)期的結(jié)果是主線程會繼續(xù)執(zhí)行求冷,到要取得FutureTask的結(jié)果是等待直至完成瘤运。
Thread.sleep(5000);
System.out.println("子線程計算任務(wù): "+taskName+" 執(zhí)行完成!");
return result;
}
}
}
2. FutureTask在高并發(fā)環(huán)境下確保任務(wù)只執(zhí)行一次
在很多高并發(fā)的環(huán)境下,往往我們只需要某些任務(wù)只執(zhí)行一次匠题。這種使用情景FutureTask的特性恰能勝任拯坟。舉一個例子,假設(shè)有一個帶key的連接池韭山,當(dāng)key存在時郁季,即直接返回key對應(yīng)的對象冷溃;當(dāng)key不存在時,則創(chuàng)建連接梦裂。對于這樣的應(yīng)用場景似枕,通常采用的方法為使用一個Map對象來存儲key和連接池對應(yīng)的對應(yīng)關(guān)系,典型的代碼如下面所示:
private Map<String, Connection> connectionPool = new HashMap<String, Connection>();
private ReentrantLock lock = new ReentrantLock();
public Connection getConnection(String key){
try{
lock.lock();
if(connectionPool.containsKey(key)){
return connectionPool.get(key);
}
else{
//創(chuàng)建 Connection
Connection conn = createConnection();
connectionPool.put(key, conn);
return conn;
}
}
finally{
lock.unlock();
}
}
//創(chuàng)建Connection
private Connection createConnection(){
return null;
}
在上面的例子中年柠,我們通過加鎖確保高并發(fā)環(huán)境下的線程安全凿歼,也確保了connection只創(chuàng)建一次,然而確犧牲了性能冗恨。改用ConcurrentHash的情況下答憔,幾乎可以避免加鎖的操作,性能大大提高掀抹,但是在高并發(fā)的情況下有可能出現(xiàn)Connection被創(chuàng)建多次的現(xiàn)象虐拓。這時最需要解決的問題就是當(dāng)key不存在時,創(chuàng)建Connection的動作能放在connectionPool之后執(zhí)行渴丸,這正是FutureTask發(fā)揮作用的時機侯嘀,基于ConcurrentHashMap和FutureTask的改造代碼如下:
private ConcurrentHashMap<String,FutureTask<Connection>>connectionPool = new ConcurrentHashMap<String, FutureTask<Connection>>();
public Connection getConnection(String key) throws Exception{
FutureTask<Connection>connectionTask=connectionPool.get(key);
if(connectionTask!=null){
return connectionTask.get();
}
else{
Callable<Connection> callable = new Callable<Connection>(){
@Override
public Connection call() throws Exception {
// TODO Auto-generated method stub
return createConnection();
}
};
FutureTask<Connection>newTask = new FutureTask<Connection>(callable);
connectionTask = connectionPool.putIfAbsent(key, newTask);
if(connectionTask==null){
connectionTask = newTask;
connectionTask.run();
}
return connectionTask.get();
}
}
//創(chuàng)建Connection
private Connection createConnection(){
return null;
}
經(jīng)過這樣的改造,可以避免由于并發(fā)帶來的多次創(chuàng)建連接及鎖的出現(xiàn)谱轨。