Java多線程6 CompletionService

Java多線程目錄

CompletionService

1 CompletionService介紹

CompletionService用于提交一組Callable任務,其take方法返回已完成的一個Callable任務對應的Future對象令蛉。
如果你向Executor提交了一個批處理任務,并且希望在它們完成后獲得結果。為此你可以將每個任務的Future保存進一個集合,然后循環(huán)這個集合調用Future的get()取出數據苦丁。幸運的是CompletionService幫你做了這件事情碟案。
CompletionService整合了Executor和BlockingQueue的功能。你可以將Callable任務提交給它去執(zhí)行笛丙,然后使用類似于隊列中的take和poll方法漾脂,在結果完整可用時獲得這個結果,像一個打包的Future胚鸯。
CompletionService的take返回的future是哪個先完成就先返回哪一個骨稿,而不是根據提交順序。

2 CompletionService源碼分析

首先看一下 構造方法:

   public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

構造法方法主要初始化了一個阻塞隊列,用來存儲已完成的task任務坦冠。
然后看一下 completionService.submit 方法:

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }

可以看到形耗,callable任務被包裝成QueueingFuture,而 QueueingFuture是 FutureTask的子類辙浑,所以最終執(zhí)行了FutureTask中的run()方法激涤。來看一下該方法:

 public void run() {
 //判斷執(zhí)行狀態(tài),保證callable任務只被運行一次
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
            //這里回調我們創(chuàng)建的callable對象中的call方法
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
            //處理執(zhí)行結果
                set(result);
        }
    } finally {
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

可以看到在該 FutureTask 中執(zhí)行run方法例衍,最終回調自定義的callable中的call方法昔期,執(zhí)行結束之后,通過 set(result) 處理執(zhí)行結果:

    /**
     * Sets the result of this future to the given value unless
     * this future has already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon successful completion of the computation.
     *
     * @param v the value
     */
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

繼續(xù)跟進finishCompletion()方法佛玄,在該方法中找到 done()方法:

protected void done() { completionQueue.add(task); }

可以看到該方法只做了一件事情硼一,就是將執(zhí)行結束的task添加到了隊列中,只要隊列中有元素梦抢,我們調用take()方法時就可以獲得執(zhí)行的結果般贼。
到這里就已經清晰了,異步非阻塞獲取執(zhí)行結果的實現原理其實就是通過隊列來實現的奥吩,FutureTask將執(zhí)行結果放到隊列中哼蛆,先進先出,線程執(zhí)行結束的順序就是獲取結果的順序霞赫。

CompletionService實際上可以看做是Executor和BlockingQueue的結合體腮介。CompletionService在接收到要執(zhí)行的任務時,通過類似BlockingQueue的put和take獲得任務執(zhí)行的結果端衰。CompletionService的一個實現是ExecutorCompletionService叠洗,ExecutorCompletionService把具體的計算任務交給Executor完成。

在實現上旅东,ExecutorCompletionService在構造函數中會創(chuàng)建一個BlockingQueue(使用的基于鏈表的無界隊列LinkedBlockingQueue)灭抑,該BlockingQueue的作用是保存Executor執(zhí)行的結果。當計算完成時抵代,調用FutureTask的done方法腾节。當提交一個任務到ExecutorCompletionService時,首先將任務包裝成QueueingFuture荤牍,它是FutureTask的一個子類案腺,然后改寫FutureTask的done方法,之后把Executor執(zhí)行的計算結果放入BlockingQueue中康吵。QueueingFuture的源碼如下:

    /**
     * FutureTask extension to enqueue upon completion
     */
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }
3 CompletionService實現任務
public class CompletionServiceTest {
    public static void main(String[] args) {

        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(threadPool);
        for (int i = 1; i <=10; i++) {
            final int seq = i;
            completionService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {

                    Thread.sleep(new Random().nextInt(5000));

                    return seq;
                }
            });
        }
        threadPool.shutdown();
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println(
                        completionService.take().get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

    }
}
7
3
9
8
1
2
4
6
5
10

CompletionService總結

相比ExecutorService救湖,CompletionService可以更精確和簡便地完成異步任務的執(zhí)行
CompletionService的一個實現是ExecutorCompletionService,它是Executor和BlockingQueue功能的融合體涎才,Executor完成計算任務,BlockingQueue負責保存異步任務的執(zhí)行結果
在執(zhí)行大量相互獨立和同構的任務時,可以使用CompletionService
CompletionService可以為任務的執(zhí)行設置時限耍铜,主要是通過BlockingQueue的poll(long time,TimeUnit unit)為任務執(zhí)行結果的取得限制時間邑闺,如果沒有完成就取消任務

多線程 | CompletionService異步非阻塞獲取并行任務執(zhí)行結果
CompletionService簡介、原理以及小案例

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末棕兼,一起剝皮案震驚了整個濱河市陡舅,隨后出現的幾起案子,更是在濱河造成了極大的恐慌伴挚,老刑警劉巖靶衍,帶你破解...
    沈念sama閱讀 206,214評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現場離奇詭異茎芋,居然都是意外死亡颅眶,警方通過查閱死者的電腦和手機,發(fā)現死者居然都...
    沈念sama閱讀 88,307評論 2 382
  • 文/潘曉璐 我一進店門田弥,熙熙樓的掌柜王于貴愁眉苦臉地迎上來涛酗,“玉大人,你說我怎么就攤上這事偷厦∩烫荆” “怎么了?”我有些...
    開封第一講書人閱讀 152,543評論 0 341
  • 文/不壞的土叔 我叫張陵只泼,是天一觀的道長剖笙。 經常有香客問我,道長请唱,這世上最難降的妖魔是什么弥咪? 我笑而不...
    開封第一講書人閱讀 55,221評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮籍滴,結果婚禮上酪夷,老公的妹妹穿的比我還像新娘。我一直安慰自己孽惰,他們只是感情好晚岭,可當我...
    茶點故事閱讀 64,224評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著勋功,像睡著了一般坦报。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上狂鞋,一...
    開封第一講書人閱讀 49,007評論 1 284
  • 那天片择,我揣著相機與錄音,去河邊找鬼骚揍。 笑死字管,一個胖子當著我的面吹牛啰挪,可吹牛的內容都是我干的。 我是一名探鬼主播嘲叔,決...
    沈念sama閱讀 38,313評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼亡呵,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了硫戈?” 一聲冷哼從身側響起锰什,我...
    開封第一講書人閱讀 36,956評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎丁逝,沒想到半個月后汁胆,有當地人在樹林里發(fā)現了一具尸體,經...
    沈念sama閱讀 43,441評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡霜幼,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 35,925評論 2 323
  • 正文 我和宋清朗相戀三年嫩码,在試婚紗的時候發(fā)現自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片辛掠。...
    茶點故事閱讀 38,018評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡谢谦,死狀恐怖,靈堂內的尸體忽然破棺而出萝衩,到底是詐尸還是另有隱情回挽,我是刑警寧澤,帶...
    沈念sama閱讀 33,685評論 4 322
  • 正文 年R本政府宣布猩谊,位于F島的核電站千劈,受9級特大地震影響,放射性物質發(fā)生泄漏牌捷。R本人自食惡果不足惜墙牌,卻給世界環(huán)境...
    茶點故事閱讀 39,234評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望暗甥。 院中可真熱鬧喜滨,春花似錦、人聲如沸撤防。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽寄月。三九已至辜膝,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間漾肮,已是汗流浹背厂抖。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評論 1 261
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留克懊,地道東北人忱辅。 一個月前我還...
    沈念sama閱讀 45,467評論 2 352
  • 正文 我出身青樓七蜘,卻偏偏與公主長得像,于是被迫代替她去往敵國和親耕蝉。 傳聞我的和親對象是個殘疾皇子崔梗,可洞房花燭夜當晚...
    茶點故事閱讀 42,762評論 2 345

推薦閱讀更多精彩內容

  • 本文是我自己在秋招復習時的讀書筆記,整理的知識點垒在,也是為了防止忘記,尊重勞動成果扔亥,轉載注明出處哦场躯!如果你也喜歡,那...
    波波波先森閱讀 11,239評論 4 56
  • 進程和線程 進程 所有運行中的任務通常對應一個進程,當一個程序進入內存運行時,即變成一個進程.進程是處于運行過程中...
    勝浩_ae28閱讀 5,085評論 0 23
  • 在Java中旅挤,使用線程來異步執(zhí)行任務踢关。Java線程的創(chuàng)建與銷毀需要一定的開銷,如果我們?yōu)槊恳粋€任務創(chuàng)建一個新線程來...
    Steven1997閱讀 747評論 0 0
  • 進程和線程 進程 所有運行中的任務通常對應一個進程,當一個程序進入內存運行時,即變成一個進程.進程是處于運行過程中...
    小徐andorid閱讀 2,797評論 3 53
  • 現在買房也不需要等三個月過后再搬進去住了粘茄,因為現在有除甲醛的啦签舞,首先來說說甲醛的危害吧,甲醛的危害分為以...
    金雅麗呀閱讀 661評論 1 0