參考鏈接:
java并發(fā)編程-Executor框架+Future
1. 概述
Executor框架是指java 5中引入的一系列并發(fā)庫中與executor相關的一些功能類仙蚜,其中包括線程池怖糊,Executor,Executors孝凌,ExecutorService,CompletionService左电,Future章贞,Callable等。他們的關系為:
- Excutor 執(zhí)行器接口, Executor.execute(Runnalbe) 闺魏。Executor在執(zhí)行時使用內部的線程池完成操作
- Future<T> 接口未状,用于獲取異步任務的執(zhí)行結果 。核心方法
V get() throws InterruptedException, ExecutionException; 獲取類型為V的執(zhí)行結果析桥,如果任務還未產生結果司草,阻塞。 - Callable<T> 接口烹骨,提供一個返回類型T結果的 call()的方法
- public interface RunnableFuture<V> extends Runnable, Future<V>{} 一個繼承了Runnable和Future接口的接口(接口可以繼承其他接口翻伺,繼承接口的時候可以繼承多個),相當于帶有Future功能的線程沮焕。
- public class FutureTask<V> implements RunnableFuture<V> 具有一個Callable的成員變量。
2.創(chuàng)建線程池
Executors類拉宗,提供了一系列工廠方法用于創(chuàng)先線程池峦树,返回的線程池都實現了ExecutorService接口。
public static ExecutorService newFixedThreadPool(int nThreads)
創(chuàng)建固定數目線程的線程池旦事。
public static ExecutorService newCachedThreadPool()
創(chuàng)建一個可緩存的線程池,調用execute
將重用以前構造的線程(如果線程可用)。如果現有線程沒有可用的妓盲,則創(chuàng)建一個新線程并添加到池中超歌。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。
public static ExecutorService newSingleThreadExecutor()
創(chuàng)建一個單線程化的Executor卖鲤。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
創(chuàng)建一個支持定時及周期性的任務執(zhí)行的線程池肾扰,多數情況下可用來替代Timer類。
3. ExcutorService接口
ExecutorService接口繼承了Executor接口蛋逾,添加了一些生命周期管理和任務提交機制的方法集晚。一個Executor的生命周期有三種狀態(tài),運行 区匣,關閉 偷拔,終止 。ExecutorService創(chuàng)建時處于運行狀態(tài)。當調用ExecutorService.shutdown()后莲绰,處于關閉狀態(tài)欺旧,isShutdown()方法返回true。這時蛤签,不應該再向ExecutorService中添加任務辞友,所有已添加的任務執(zhí)行完畢后,ExecutorService處于終止狀態(tài)顷啼,isTerminated()返回true踏枣。
如果Executor處于關閉狀態(tài),往Executor提交任務會拋出unchecked exception RejectedExecutionException钙蒙。
- 提交任務: <T> Future<T> submit(Callable<T> task);
- 執(zhí)行所有任務:<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
4 一個并發(fā)計算數組和的實例
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class ConcurrentCalculator {
private ExecutorService exec;
private int cpuCoreNumber;
private List<Future<Long>> tasks = new ArrayList<Future<Long>>();
class SumCalculator implements Callable<Long> {
private int[] nums;
private int start, end;
public SumCalculator(final int[] nums, int start, int end) {
this.nums = nums;
this.start = start;
this.end = end;
}
@Override
public Long call() throws Exception {
long sum = 0l;
for (int i = start; i < end; i++) {
sum += nums[i];
}
return sum;
}
}
public ConcurrentCalculator() {
cpuCoreNumber = Runtime.getRuntime().availableProcessors();
exec = Executors.newFixedThreadPool(cpuCoreNumber);
}
public Long sum(final int[] nums) {
for (int i = 0; i < cpuCoreNumber; i++) {
int increment = nums.length / cpuCoreNumber + 1;
int start = increment * i;
int end = increment * i + increment;
if (end > nums.length) {
end = nums.length;
}
SumCalculator sumCalc = new SumCalculator(nums, start, end);
FutureTask<Long> task = new FutureTask<Long>(sumCalc);
tasks.add(task);
}
return getResult();
}
private Long getResult() {
Long result = 0l;
for (Future<Long> task : tasks) {
try {
Long subSum = task.get();
result += subSum;
} catch (InterruptedException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
} catch (ExecutionException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
return result;
}
public void close() {
exec.shutdown();
}
}