Executor
public interface Executor {
void execute(Runnable command);
}
Executor抽象提供了一種將任務(wù)提交與每個(gè)任務(wù)的運(yùn)行機(jī)制(包括線程使用蒲犬、調(diào)度)分離的方法,即Runnable
代表任務(wù)岸啡,execute
處理調(diào)度的邏輯暖哨;
static class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
System.out.println("ThreadPerTaskExecutor - execute");
new Thread(r).start();
}
}
static class DirectExecutor implements Executor {
public void execute(Runnable r) {
System.out.println("DirectExecutor - execute");
r.run();
}
}
class SerialExecutor implements Executor {
final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
final Executor executor;
Runnable active;
SerialExecutor(Executor executor) {
this.executor = executor;
}
public synchronized void execute(final Runnable r) {
System.out.println("task offer ... ");
tasks.offer(new Runnable() {
public void run() {
System.out.println("SerialExecutor - execute ... ");
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (active == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((active = tasks.poll()) != null) {
System.out.println("schedule next task ");
executor.execute(active);
}
}
}
public static void main(String[] args) {
// SerialExecutor executor = new SerialExecutor(new ThreadPerTaskExecutor());
SerialExecutor executor = new SerialExecutor(new DirectExecutor());
executor.execute(() -> { System.out.println(Thread.currentThread()); });
executor.execute(() -> { System.out.println(Thread.currentThread()); });
}
#輸出
#task offer ...
#schedule next task
#DirectExecutor - execute
#SerialExecutor - execute ...
#Thread[main,5,main]
#task offer ...
#schedule next task
#DirectExecutor - execute
#SerialExecutor - execute ...
#Thread[main,5,main]
官方文檔上提供的示例,演示了Executor
抽象的簡(jiǎn)單使用方法凰狞,示例想體現(xiàn)的除了異步同步任務(wù)外,我覺(jué)得更加重要的是任務(wù)執(zhí)行和任務(wù)調(diào)度分離的思想沛慢;
ExecutorService
public interface ExecutorService extends Executor {
// 啟動(dòng)有序關(guān)閉赡若,在此過(guò)程中執(zhí)行以前提交的任務(wù),但不接受任何新任務(wù)团甲。
void shutdown();
// 嘗試停止所有正在執(zhí)行的任務(wù)逾冬,停止等待任務(wù)的處理,并返回等待執(zhí)行的任務(wù)列表躺苦。
List<Runnable> shutdownNow();
// executor是否被shutdown
boolean isShutdown();
// 如果所有任務(wù)都完成關(guān)閉則返回true
boolean isTerminated();
// 阻塞直到所有任務(wù)在關(guān)閉請(qǐng)求后完成執(zhí)行身腻,或超時(shí)發(fā)生后,或當(dāng)前線程中斷后匹厘;
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// 提交任務(wù)嘀趟,返回
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
// 執(zhí)行一個(gè)集合的任務(wù),返回一個(gè)列表其中包含所有任務(wù)完成時(shí)的狀態(tài)和結(jié)果(可以是異常結(jié)果)愈诚。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// 執(zhí)行給定的任務(wù)她按,返回已成功完成的任務(wù)的結(jié)果(即不拋出異常),如果有的話炕柔。在正匙锰或異常返回時(shí),未完成的任務(wù)將被取消匕累。
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
ExecutorService提供了對(duì)每個(gè)Executor
跟蹤陵刹、管理一個(gè)或者多個(gè)異步任務(wù)的進(jìn)度,也可以關(guān)閉服務(wù)來(lái)終止service接收新任務(wù)和回收資源欢嘿;
void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // 禁止提交新任務(wù)
try {
//等待現(xiàn)有任務(wù)終止
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // 取消當(dāng)前正在執(zhí)行的任務(wù)
// 等待一段時(shí)間衰琐,等待任務(wù)響應(yīng)被取消
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// 如果當(dāng)前線程處于中斷狀態(tài)也糊,重試shutdown
pool.shutdownNow();
// 保存中斷狀態(tài)
Thread.currentThread().interrupt();
}
}
文檔中演示了一個(gè)終止服務(wù)的示例,分兩個(gè)階段關(guān)閉一個(gè) ExecutorService
碘耳,首先需要調(diào)用shutdown
來(lái)拒絕傳入的任務(wù)显设,然后調(diào)用shutdownNow
(如果需要的話)來(lái)取消任何延遲的任務(wù);
CompletionService
public interface CompletionService<V> {
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
CompletionService旨在提供一個(gè)異步任務(wù)產(chǎn)生執(zhí)行和已完成任務(wù)結(jié)果的解耦服務(wù)辛辨,即會(huì)有一個(gè)隊(duì)列儲(chǔ)存已完成的任務(wù)結(jié)果的合集捕捂,任務(wù)的提交和執(zhí)行不會(huì)阻塞獲取結(jié)果操作;
ExecutorCompletionService
public class ExecutorCompletionService<V> implements CompletionService<V> {
private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
...
}
ExecutorCompletionService是CompletionService的實(shí)現(xiàn)類斗搞,內(nèi)部有個(gè)阻塞隊(duì)列儲(chǔ)存的是完成任務(wù)的結(jié)果指攒;
// 注意這個(gè)queue的默認(rèn)長(zhǎng)度為Integer.MAX_VALUE
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}
構(gòu)造器默認(rèn)使用LinkedBlockingQueue
,那么很容易就聯(lián)想到該阻塞隊(duì)列的特性:
- 是有界的雙端隊(duì)列
- 提供阻塞和非阻塞方法獲取已完成的任務(wù)
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
public Future<V> poll() {
return completionQueue.poll();
}
public Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException {
return completionQueue.poll(timeout, unit);
}
- 當(dāng)完成任務(wù)的隊(duì)列滿了僻焚,新完成的任務(wù)結(jié)果會(huì)被拋棄
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
// FutureTask提供的鉤子
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
注意這里的done
在FutureTask
完成或者異吃试茫或者cancel
都會(huì)被調(diào)用;
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
private RunnableFuture<V> newTaskFor(Callable<V> task) {
if (aes == null)
return new FutureTask<V>(task);
else
return aes.newTaskFor(task);
}
若構(gòu)造器中傳進(jìn)來(lái)的Executor
是AbstractExecutorService
的子類虑啤,那么newTaskFor
就會(huì)交由子類來(lái)決定FutureTask
的類型隙弛,達(dá)到定制擴(kuò)展的效果;
void solve(Executor e, Collection<Callable<Result>> solvers)
throws InterruptedException, ExecutionException {
CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
for (Callable<Result> s : solvers)
ecs.submit(s);
int n = solvers.size();
for (int i = 0; i < n; ++i) {
Result r = ecs.take().get(); // 這里會(huì)拋出中斷異常
if (r != null)
use(r);
}
}
官方提供的用法示例狞山,第一個(gè)比較簡(jiǎn)單全闷,遍歷任務(wù)集合中每個(gè)任務(wù)執(zhí)行后獲取結(jié)果,是一種順序執(zhí)行的過(guò)程萍启,執(zhí)行 -> 獲取結(jié)果 -> 執(zhí)行 -> 獲取結(jié)果总珠,當(dāng)然執(zhí)行任務(wù)過(guò)程可以是異步的,如果遇到中斷異常會(huì)停止任務(wù)勘纯;