CompleteService是一個(gè)生產(chǎn)者消費(fèi)者接口,用來(lái)使生產(chǎn)和消費(fèi)分離最域,生產(chǎn)者通過(guò)submit()方法用來(lái)提交任務(wù)系吭,消費(fèi)者通過(guò)take()方法獲取執(zhí)行完成的任務(wù)。CompleteService通常用來(lái)管理異步任務(wù)的IO隊(duì)列房官,生產(chǎn)者和消費(fèi)者可以屬于系統(tǒng)的不同部分,具有生產(chǎn)者消費(fèi)者類型的需求都可以考慮用這個(gè)模型來(lái)處理续滋,F(xiàn)uture控制任務(wù)的取消執(zhí)行翰守,以及執(zhí)行完畢時(shí)候結(jié)果的獲取,以及管理結(jié)果獲取隊(duì)列疲酌。
通過(guò)Excutor去執(zhí)行任務(wù)蜡峰,
public interface CompleteService<V>{
Future<V> submit(Callable<V> task);//提交一個(gè)任務(wù),這個(gè)任務(wù)可能被執(zhí)行也可能被取消執(zhí)行朗恳,當(dāng)任務(wù)為空的時(shí)候拋出空指針異常湿颅,當(dāng)任務(wù)不會(huì)調(diào)度執(zhí)行的時(shí)候會(huì)拋出RejectedExecutionException.
Future<V>submit(Runnable task,V result);
Future<V> take() throws InterruptedException;//檢索下一個(gè)任務(wù)的Future并返回,如果沒(méi)有檢索到下個(gè)任務(wù)的Future就等待粥诫,如果等待的時(shí)候被Interrupt了就拋出InterruptedException
Future<V> poll()://檢索下一個(gè)任務(wù)的Future并返回油航,如果沒(méi)有檢索到下個(gè)任務(wù)的Future就返回空
Future<V>poll(long timeout,TimeUnit timeUnit) throws InterruptedException;//檢索下一個(gè)任務(wù)的Future并返回,如果沒(méi)有檢索到下個(gè)任務(wù)的Future臀脏,就等待timeout時(shí)間劝堪,沒(méi)等到就返回空,等待過(guò)程中線程被Interrupt 就拋出InterruptedException
}
CompleteService的實(shí)現(xiàn)類揉稚,三個(gè)成員變量,用來(lái)執(zhí)行任務(wù)的執(zhí)行器熬粗,用來(lái)創(chuàng)建Future的搀玖,用來(lái)存儲(chǔ)執(zhí)行結(jié)果的內(nèi)部隊(duì)列。
public class ExecutorCompletionService<V> implements CompletionService<V> {
private final Executor executor;//執(zhí)行器
private final AbstractExecutorService aes;//調(diào)用這個(gè)的創(chuàng)建Future的方法
private final BlockingQueue<Future<V>> completionQueue;//內(nèi)部隊(duì)列
/**
* FutureTask extension to enqueue upon completion.
*/
private static class QueueingFuture<V> extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task,
BlockingQueue<Future<V>> completionQueue) {
super(task, null);
this.task = task;
this.completionQueue = completionQueue;//任務(wù)執(zhí)行的隊(duì)列
}
private final Future<V> task;
private final BlockingQueue<Future<V>> completionQueue;
protected void done() { completionQueue.add(task); //外部調(diào)用驻呐,因?yàn)槭莗rotected的灌诅,Executor執(zhí)行完任務(wù)后,將結(jié)果放進(jìn)completionQueue里面
}
}
private RunnableFuture<V> newTaskFor(Callable<V> task) {
if (aes == null)
return new FutureTask<V>(task);
else
return aes.newTaskFor(task);
}
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
if (aes == null)
return new FutureTask<V>(task, result);
else
return aes.newTaskFor(task, result);
}
/**
* Creates an ExecutorCompletionService using the supplied
* executor for base task execution and a
* {@link LinkedBlockingQueue} as a completion queue.
*
* @param executor the executor to use
* @throws NullPointerException if executor is {@code null}
*/
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
/**
* Creates an ExecutorCompletionService using the supplied
* executor for base task execution and the supplied queue as its
* completion queue.
*
* @param executor the executor to use
* @param completionQueue the queue to use as the completion queue//任務(wù)結(jié)束隊(duì)列
* normally one dedicated for use by this service. This
* queue is treated as unbounded -- failed attempted
* {@code Queue.add} operations for completed tasks cause
* them not to be retrievable.
* @throws NullPointerException if executor or completionQueue are {@code null}
*/
public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture<V>(f, completionQueue));
return f;
}
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture<V>(f, completionQueue));
return f;
}
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
public Future<V> poll() {
return completionQueue.poll();
}
public Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException {
return completionQueue.poll(timeout, unit);
}
}