CompletionService
1 CompletionService介紹
CompletionService用于提交一組Callable任務,其take方法返回已完成的一個Callable任務對應的Future對象令蛉。
如果你向Executor提交了一個批處理任務,并且希望在它們完成后獲得結果。為此你可以將每個任務的Future保存進一個集合,然后循環(huán)這個集合調用Future的get()取出數據苦丁。幸運的是CompletionService幫你做了這件事情碟案。
CompletionService整合了Executor和BlockingQueue的功能。你可以將Callable任務提交給它去執(zhí)行笛丙,然后使用類似于隊列中的take和poll方法漾脂,在結果完整可用時獲得這個結果,像一個打包的Future胚鸯。
CompletionService的take返回的future是哪個先完成就先返回哪一個骨稿,而不是根據提交順序。
2 CompletionService源碼分析
首先看一下 構造方法:
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
構造法方法主要初始化了一個阻塞隊列,用來存儲已完成的task任務坦冠。
然后看一下 completionService.submit 方法:
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture(f));
return f;
}
可以看到形耗,callable任務被包裝成QueueingFuture,而 QueueingFuture是 FutureTask的子類辙浑,所以最終執(zhí)行了FutureTask中的run()方法激涤。來看一下該方法:
public void run() {
//判斷執(zhí)行狀態(tài),保證callable任務只被運行一次
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//這里回調我們創(chuàng)建的callable對象中的call方法
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
//處理執(zhí)行結果
set(result);
}
} finally {
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
可以看到在該 FutureTask 中執(zhí)行run方法例衍,最終回調自定義的callable中的call方法昔期,執(zhí)行結束之后,通過 set(result) 處理執(zhí)行結果:
/**
* Sets the result of this future to the given value unless
* this future has already been set or has been cancelled.
*
* <p>This method is invoked internally by the {@link #run} method
* upon successful completion of the computation.
*
* @param v the value
*/
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
繼續(xù)跟進finishCompletion()方法佛玄,在該方法中找到 done()方法:
protected void done() { completionQueue.add(task); }
可以看到該方法只做了一件事情硼一,就是將執(zhí)行結束的task添加到了隊列中,只要隊列中有元素梦抢,我們調用take()方法時就可以獲得執(zhí)行的結果般贼。
到這里就已經清晰了,異步非阻塞獲取執(zhí)行結果的實現原理其實就是通過隊列來實現的奥吩,FutureTask將執(zhí)行結果放到隊列中哼蛆,先進先出,線程執(zhí)行結束的順序就是獲取結果的順序霞赫。
CompletionService實際上可以看做是Executor和BlockingQueue的結合體腮介。CompletionService在接收到要執(zhí)行的任務時,通過類似BlockingQueue的put和take獲得任務執(zhí)行的結果端衰。CompletionService的一個實現是ExecutorCompletionService叠洗,ExecutorCompletionService把具體的計算任務交給Executor完成。
在實現上旅东,ExecutorCompletionService在構造函數中會創(chuàng)建一個BlockingQueue(使用的基于鏈表的無界隊列LinkedBlockingQueue)灭抑,該BlockingQueue的作用是保存Executor執(zhí)行的結果。當計算完成時抵代,調用FutureTask的done方法腾节。當提交一個任務到ExecutorCompletionService時,首先將任務包裝成QueueingFuture荤牍,它是FutureTask的一個子類案腺,然后改寫FutureTask的done方法,之后把Executor執(zhí)行的計算結果放入BlockingQueue中康吵。QueueingFuture的源碼如下:
/**
* FutureTask extension to enqueue upon completion
*/
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
3 CompletionService實現任務
public class CompletionServiceTest {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(10);
CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(threadPool);
for (int i = 1; i <=10; i++) {
final int seq = i;
completionService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(new Random().nextInt(5000));
return seq;
}
});
}
threadPool.shutdown();
for (int i = 0; i < 10; i++) {
try {
System.out.println(
completionService.take().get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
}
7
3
9
8
1
2
4
6
5
10
CompletionService總結
相比ExecutorService救湖,CompletionService可以更精確和簡便地完成異步任務的執(zhí)行
CompletionService的一個實現是ExecutorCompletionService,它是Executor和BlockingQueue功能的融合體涎才,Executor完成計算任務,BlockingQueue負責保存異步任務的執(zhí)行結果
在執(zhí)行大量相互獨立和同構的任務時,可以使用CompletionService
CompletionService可以為任務的執(zhí)行設置時限耍铜,主要是通過BlockingQueue的poll(long time,TimeUnit unit)為任務執(zhí)行結果的取得限制時間邑闺,如果沒有完成就取消任務
多線程 | CompletionService異步非阻塞獲取并行任務執(zhí)行結果
CompletionService簡介、原理以及小案例