Java8源碼閱讀 - Executor、ExecutorService料饥、ExecutorCompletionService

Executor

public interface Executor {
    void execute(Runnable command);
}

Executor抽象提供了一種將任務(wù)提交與每個(gè)任務(wù)的運(yùn)行機(jī)制(包括線程使用蒲犬、調(diào)度)分離的方法,即Runnable代表任務(wù)岸啡,execute處理調(diào)度的邏輯暖哨;

static class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) {
        System.out.println("ThreadPerTaskExecutor - execute"); 
        new Thread(r).start();
    }
}

static class DirectExecutor implements Executor {
    public void execute(Runnable r) {
        System.out.println("DirectExecutor - execute");
        r.run();
    }
}

class SerialExecutor implements Executor {
    final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
    final Executor executor;
    Runnable active;
    SerialExecutor(Executor executor) {
        this.executor = executor;
    }
    public synchronized void execute(final Runnable r) {
        System.out.println("task offer ... ");
        tasks.offer(new Runnable() {
            public void run() {
                System.out.println("SerialExecutor - execute ... ");
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if (active == null) {
            scheduleNext();
        }
    }
    protected synchronized void scheduleNext() {
        if ((active = tasks.poll()) != null) {
            System.out.println("schedule next task ");
            executor.execute(active);
        }
    }
}

public static void main(String[] args) {
    // SerialExecutor executor = new SerialExecutor(new ThreadPerTaskExecutor());
    SerialExecutor executor = new SerialExecutor(new DirectExecutor());
    executor.execute(() -> { System.out.println(Thread.currentThread()); });
    executor.execute(() -> { System.out.println(Thread.currentThread()); });
}

#輸出
#task offer ... 
#schedule next task 
#DirectExecutor - execute
#SerialExecutor - execute ... 
#Thread[main,5,main]
#task offer ... 
#schedule next task 
#DirectExecutor - execute
#SerialExecutor - execute ... 
#Thread[main,5,main]

官方文檔上提供的示例,演示了Executor抽象的簡(jiǎn)單使用方法凰狞,示例想體現(xiàn)的除了異步同步任務(wù)外,我覺(jué)得更加重要的是任務(wù)執(zhí)行和任務(wù)調(diào)度分離的思想沛慢;

ExecutorService

public interface ExecutorService extends Executor {
    // 啟動(dòng)有序關(guān)閉赡若,在此過(guò)程中執(zhí)行以前提交的任務(wù),但不接受任何新任務(wù)团甲。
    void shutdown();
    // 嘗試停止所有正在執(zhí)行的任務(wù)逾冬,停止等待任務(wù)的處理,并返回等待執(zhí)行的任務(wù)列表躺苦。
    List<Runnable> shutdownNow();
    // executor是否被shutdown
    boolean isShutdown();
    // 如果所有任務(wù)都完成關(guān)閉則返回true
    boolean isTerminated();
    //  阻塞直到所有任務(wù)在關(guān)閉請(qǐng)求后完成執(zhí)行身腻,或超時(shí)發(fā)生后,或當(dāng)前線程中斷后匹厘;
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    // 提交任務(wù)嘀趟,返回
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    // 執(zhí)行一個(gè)集合的任務(wù),返回一個(gè)列表其中包含所有任務(wù)完成時(shí)的狀態(tài)和結(jié)果(可以是異常結(jié)果)愈诚。
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    // 執(zhí)行給定的任務(wù)她按,返回已成功完成的任務(wù)的結(jié)果(即不拋出異常),如果有的話炕柔。在正匙锰或異常返回時(shí),未完成的任務(wù)將被取消匕累。
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

ExecutorService提供了對(duì)每個(gè)Executor跟蹤陵刹、管理一個(gè)或者多個(gè)異步任務(wù)的進(jìn)度,也可以關(guān)閉服務(wù)來(lái)終止service接收新任務(wù)和回收資源欢嘿;

void shutdownAndAwaitTermination(ExecutorService pool) {
    pool.shutdown(); // 禁止提交新任務(wù)
    try {
        //等待現(xiàn)有任務(wù)終止
        if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
            pool.shutdownNow(); // 取消當(dāng)前正在執(zhí)行的任務(wù)
            // 等待一段時(shí)間衰琐,等待任務(wù)響應(yīng)被取消
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) 
                System.err.println("Pool did not terminate");
        }
    } catch (InterruptedException ie) {
        // 如果當(dāng)前線程處于中斷狀態(tài)也糊,重試shutdown
        pool.shutdownNow();
        // 保存中斷狀態(tài)
        Thread.currentThread().interrupt();
    }
}

文檔中演示了一個(gè)終止服務(wù)的示例,分兩個(gè)階段關(guān)閉一個(gè) ExecutorService碘耳,首先需要調(diào)用shutdown來(lái)拒絕傳入的任務(wù)显设,然后調(diào)用shutdownNow(如果需要的話)來(lái)取消任何延遲的任務(wù);

CompletionService

public interface CompletionService<V> {
    Future<V> submit(Callable<V> task);
    Future<V> submit(Runnable task, V result);
    Future<V> take() throws InterruptedException;
    Future<V> poll();
    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}

CompletionService旨在提供一個(gè)異步任務(wù)產(chǎn)生執(zhí)行和已完成任務(wù)結(jié)果的解耦服務(wù)辛辨,即會(huì)有一個(gè)隊(duì)列儲(chǔ)存已完成的任務(wù)結(jié)果的合集捕捂,任務(wù)的提交和執(zhí)行不會(huì)阻塞獲取結(jié)果操作;

ExecutorCompletionService

public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;
    ... 
}

ExecutorCompletionServiceCompletionService的實(shí)現(xiàn)類斗搞,內(nèi)部有個(gè)阻塞隊(duì)列儲(chǔ)存的是完成任務(wù)的結(jié)果指攒;

// 注意這個(gè)queue的默認(rèn)長(zhǎng)度為Integer.MAX_VALUE
public ExecutorCompletionService(Executor executor) {
    if (executor == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

public ExecutorCompletionService(Executor executor,
                                 BlockingQueue<Future<V>> completionQueue) {
    if (executor == null || completionQueue == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = completionQueue;
}

構(gòu)造器默認(rèn)使用LinkedBlockingQueue,那么很容易就聯(lián)想到該阻塞隊(duì)列的特性:

  • 是有界的雙端隊(duì)列
  • 提供阻塞和非阻塞方法獲取已完成的任務(wù)
public Future<V> take() throws InterruptedException {
    return completionQueue.take();
}
public Future<V> poll() {
    return completionQueue.poll();
}
public Future<V> poll(long timeout, TimeUnit unit)
        throws InterruptedException {
    return completionQueue.poll(timeout, unit);
}
  • 當(dāng)完成任務(wù)的隊(duì)列滿了僻焚,新完成的任務(wù)結(jié)果會(huì)被拋棄
private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
        super(task, null);
        this.task = task;
    }
    // FutureTask提供的鉤子
    protected void done() { completionQueue.add(task); }
    private final Future<V> task;
}

注意這里的doneFutureTask完成或者異吃试茫或者cancel都會(huì)被調(diào)用;

public Future<V> submit(Callable<V> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    executor.execute(new QueueingFuture(f));
    return f;
}
private RunnableFuture<V> newTaskFor(Callable<V> task) {
    if (aes == null)
        return new FutureTask<V>(task);
    else
        return aes.newTaskFor(task);
}

若構(gòu)造器中傳進(jìn)來(lái)的ExecutorAbstractExecutorService的子類虑啤,那么newTaskFor就會(huì)交由子類來(lái)決定FutureTask的類型隙弛,達(dá)到定制擴(kuò)展的效果;

void solve(Executor e, Collection<Callable<Result>> solvers) 
        throws InterruptedException, ExecutionException {
    CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
    for (Callable<Result> s : solvers) 
        ecs.submit(s);
        int n = solvers.size();
        for (int i = 0; i < n; ++i) { 
            Result r = ecs.take().get(); // 這里會(huì)拋出中斷異常
            if (r != null) 
                use(r);
        } 
}

官方提供的用法示例狞山,第一個(gè)比較簡(jiǎn)單全闷,遍歷任務(wù)集合中每個(gè)任務(wù)執(zhí)行后獲取結(jié)果,是一種順序執(zhí)行的過(guò)程萍启,執(zhí)行 -> 獲取結(jié)果 -> 執(zhí)行 -> 獲取結(jié)果总珠,當(dāng)然執(zhí)行任務(wù)過(guò)程可以是異步的,如果遇到中斷異常會(huì)停止任務(wù)勘纯;

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末局服,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子驳遵,更是在濱河造成了極大的恐慌淫奔,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,820評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件超埋,死亡現(xiàn)場(chǎng)離奇詭異搏讶,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)霍殴,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,648評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門媒惕,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人来庭,你說(shuō)我怎么就攤上這事妒蔚。” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,324評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵肴盏,是天一觀的道長(zhǎng)科盛。 經(jīng)常有香客問(wèn)我,道長(zhǎng)菜皂,這世上最難降的妖魔是什么贞绵? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,714評(píng)論 1 297
  • 正文 為了忘掉前任,我火速辦了婚禮恍飘,結(jié)果婚禮上榨崩,老公的妹妹穿的比我還像新娘。我一直安慰自己章母,他們只是感情好母蛛,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,724評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著乳怎,像睡著了一般彩郊。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上蚪缀,一...
    開(kāi)封第一講書(shū)人閱讀 52,328評(píng)論 1 310
  • 那天秫逝,我揣著相機(jī)與錄音,去河邊找鬼询枚。 笑死筷登,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的哩盲。 我是一名探鬼主播,決...
    沈念sama閱讀 40,897評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼狈醉,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼廉油!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起苗傅,我...
    開(kāi)封第一講書(shū)人閱讀 39,804評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤抒线,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后渣慕,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體嘶炭,經(jīng)...
    沈念sama閱讀 46,345評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,431評(píng)論 3 340
  • 正文 我和宋清朗相戀三年逊桦,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了眨猎。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,561評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡强经,死狀恐怖睡陪,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情,我是刑警寧澤兰迫,帶...
    沈念sama閱讀 36,238評(píng)論 5 350
  • 正文 年R本政府宣布信殊,位于F島的核電站,受9級(jí)特大地震影響汁果,放射性物質(zhì)發(fā)生泄漏涡拘。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,928評(píng)論 3 334
  • 文/蒙蒙 一据德、第九天 我趴在偏房一處隱蔽的房頂上張望良拼。 院中可真熱鬧,春花似錦摇邦、人聲如沸茫虽。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,417評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)仲吏。三九已至,卻和暖如春蝌焚,著一層夾襖步出監(jiān)牢的瞬間裹唆,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,528評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工只洒, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留许帐,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,983評(píng)論 3 376
  • 正文 我出身青樓毕谴,卻偏偏與公主長(zhǎng)得像成畦,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子涝开,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,573評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容