并發(fā)專題-Executor源碼詳解

Runnable && Thread

Runnable和Thread都是java.lang包最基本的線程操作類,相當于官方的,而Executor接口及其實現都是Doug Lea寫的java.util.concurrent包下菩暗,屬于民間的惑折,當然因為太牛逼了所以也在jdk中

先看官方提供的線程操作,其中Runnable是一個函數式接口

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

可以理解為一個待執(zhí)行的函數归园,或者理解為一個任務(通過調用run方法可以實際的執(zhí)行任務)

Runable是一個定義的任務黄虱,而Thread是它的一個執(zhí)行者,它提供start方法可以開啟一個新線程執(zhí)行傳入的Runable任務庸诱,這很像命令模式捻浦,用戶通過實現Runable制定一個命令,交給Thread這個執(zhí)行者去具體執(zhí)行

Runnable && Thread

所以一般開啟新線程執(zhí)行方法的方式如下

Runnable task = () -> {
    // do something
};
new Thread(task).start(); // 開啟新線程執(zhí)行

而開啟新線程執(zhí)行方法也只有這一個途徑可走桥爽,就是必須通過官方的Thread.start方法

Executor

雖然開啟線程執(zhí)行任務只能走Thread.start方法朱灿,方法只有一個,但我們能做的是可以改變任務運行的方式钠四,比如我們可以決定什么時候執(zhí)行任務盗扒,多少個任務共用某個線程排隊工作

最具代表性的就是線程池,線程池只是修改了任務執(zhí)行的方式:即所有任務共用固定數量的線程缀去,但最終的運行終歸還是通過Thread.start方法實際在線程中執(zhí)行Runnable方法

Doug Lea所寫的java.util.concurrent.Executor把各種方式的Runnable執(zhí)行器的一個抽象

public interface Executor {
    void execute(Runnable command);
}
Executor

而且提供了一些常用的執(zhí)行器供我們使用侣灶,比如ThreadPoolExecutor(線程池),ForkJoinPool缕碎,當然我們也可以自己定義一個Executor按照自己的方式執(zhí)行任務褥影,比如netty中實現的SingleThreadEventExecutor是一種單個線程依次處理所有任務的執(zhí)行器

ExecutorService

如果說Executor是一種對按自己套路執(zhí)行任務的執(zhí)行器,是一種抽象分類阎曹,那么ExecutorService就是其下的一個子分類伪阶,它是一種特殊的執(zhí)行器煞檩,從名字直譯來看:"執(zhí)行器服務",從一個執(zhí)行器升級為執(zhí)行服務栅贴,像不像某公司從賣產品業(yè)務升級為產品安裝售后整套服務

所以ExecutorService作為一個特殊的服務類Executor斟湃,不光能按照自己的方式執(zhí)行任務,還推出了一系列附加"服務"檐薯,那就看看這種特殊的執(zhí)行器都提供了什么服務

public interface ExecutorService extends Executor {

    void shutdown();
    
    Future<?> submit(Runnable task);
    
    <T> Future<T> submit(Callable<T> task);
    
    ......
}

只貼了些重要方法凝赛,首先繼承了Executor肯定是要繼承void execute(Runnable) 方法代表它首先是一個任務執(zhí)行器

shutdown方法代表這個執(zhí)行器是有狀態(tài)的,可以關閉服務的

然后就是重量級的submit方法坛缕,這也是ExecutorService提供的最具特色的服務墓猎,如果打開ExecutorService的類,注釋第一句就寫著:

/**
 * An {@link Executor} that provides methods to manage termination and
 * methods that can produce a {@link Future} for tracking progress of
 * one or more asynchronous tasks.

翻譯過來大致就是ExecutorService是一個特殊的Executor執(zhí)行器赚楚,他可以終止服務并且可以創(chuàng)建一個Future來跟蹤任務執(zhí)行進度

ExecutorService的submit不光能接受Runnable毙沾,還可以接受一種新型任務形式:Callable,即有返回結果和異常的任務

Callable

上面我們總結Runnable是一種可執(zhí)行任務宠页,而這種任務是沒有返回結果的左胞,也不能拋出異常,很顯然現實中很多任務是需要有返回結果的举户,比如計算1+1等于幾的任務烤宙,所以為了擴展任務類型,Doug Lea又定義一種新的任務:Callable俭嘁,而ExecutorService可以接受并處理這樣的任務

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}
Future

submit執(zhí)行的返回值是一個Future躺枕,從注釋看出他可以跟蹤任務的進度,它就好比任務的一個訂單供填,通過訂單可以取消任務拐云,查看任務進度等

  • boolean cancel(boolean) 取消
    取消任務,就好比在某寶買了個東西捕虽,本質就是提交一個"把東西給我送過來"的任務慨丐,而通過訂單我們就可以取消這個任務
  • isCancelled() && isDone()
    查看任務狀態(tài),是否取消和是否完成
  • V get()
    獲取任務執(zhí)行結果泄私,如果沒完成則阻塞,如果是Runnable备闲,返回的就是null
ExecutorService

AbstractExecutorService

ExecutorService制訂了一種新型執(zhí)行器晌端,它的特殊在于可以跟蹤任務進度甚至取消任務,那么如何實現吶

首先execute方法只會單純的執(zhí)行任務恬砂,按照自己執(zhí)行器的邏輯調用Thread.start方法咧纠,不會有返回值,也不支持跟蹤進度或取消

所以解決方案只有一個:調包任務泻骤,具體這樣操作:當用戶提交任務漆羔,不是直接去execute執(zhí)行梧奢,而是把任務包裝為一個新任務,新任務執(zhí)行原任務的同時演痒,還負責獲取任務結果亲轨,跟蹤任務狀態(tài)等工作,相當于是原任務的一個代理

以上即是AbstractExecutorService負責的工作鸟顺,它是ExecutorService關于任務跟蹤業(yè)務的相關實現(解決方案即是代理任務)惦蚊,繼承了AbstractExecutorService的任務執(zhí)行器既可以實現跟蹤任務的功能

AbstractExecutorService

來看一下AbstractExecutorService的submit方法

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    // 生成一個依賴于原任務的新任務
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    // 執(zhí)行新任務
    execute(ftask);
    // 返回新任務(充當任務跟蹤器)
    return ftask;
}

其中newTaskFor方法負責生成新任務,同時也是原任務的跟蹤器(訂單)

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

所以整個執(zhí)行過程就是把Runnable或Callable轉換為FutureTask的過程讯嫂,而FutureTask首先是一個新任務(繼承Runnable)蹦锋,又是原任務的跟蹤器(繼承Future),這種任務歸類為RunnableFuture

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}
FutureTask

接下來就看FutureTask是如何實現跟蹤替換原任務的欧芽,首先它的重點屬性如下

  • int state; 存儲原任務的執(zhí)行狀態(tài)
  • Callable<V> callable; 原任務Runnable也可以適配為返回null的Callable
  • Object outcome; 原任務的返回結果

再看一下重點方法

1.初始化莉掂,存儲原任務,任務狀態(tài)為NEW

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

2.run()千扔,真實的被執(zhí)行任務巫湘,加入狀態(tài)判斷,執(zhí)行原任務通過try-catch獲取異常昏鹃,通過outcome保存結果

public void run() {
    // 如果任務不是新狀態(tài)尚氛,直接返回
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        // 獲取原任務
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // 執(zhí)行原任務
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                // 有異常設置異常
                setException(ex);
            }
            if (ran)
                // 設置outcome存儲返回值
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

3.get(),獲取結果洞渤,如果狀態(tài)未完成阻塞等待阅嘶,完成則返回outcome

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 如果任務未完成,阻塞等待
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    // 任務完成载迄,返回結果    
    return report(s);
}

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x; // 返回結果即outcome
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

4.cancel()讯柔,只要把狀態(tài)設置為CANCEL,run時就會直接return而不會執(zhí)行原任務

public boolean cancel(boolean mayInterruptIfRunning) {
    // 狀態(tài)變?yōu)镃ANCELLED
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    // 如果正在運行調用interrupt阻斷正在執(zhí)行的任務
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}
總結

AbstractExecutorService只是實現了讓任務變得可跟蹤护昧,通過調包任務魂迄,而具體任務的執(zhí)行最終依然會調用execute,這個方法AbstractExecutorService并沒有實現惋耙,因為這也是所有執(zhí)行器的差異所在捣炬,即按自己的方式選擇線程執(zhí)行任務, 比如ThreadPoolExecutor線程池的固定線程數執(zhí)行所有任務的模式绽榛,也就是只需要實現execute方法湿酸,而submit則交給父類AbstractExecutorService處理

?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市灭美,隨后出現的幾起案子推溃,更是在濱河造成了極大的恐慌,老刑警劉巖届腐,帶你破解...
    沈念sama閱讀 219,366評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件铁坎,死亡現場離奇詭異蜂奸,居然都是意外死亡,警方通過查閱死者的電腦和手機硬萍,發(fā)現死者居然都...
    沈念sama閱讀 93,521評論 3 395
  • 文/潘曉璐 我一進店門扩所,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人襟铭,你說我怎么就攤上這事碌奉。” “怎么了寒砖?”我有些...
    開封第一講書人閱讀 165,689評論 0 356
  • 文/不壞的土叔 我叫張陵赐劣,是天一觀的道長。 經常有香客問我哩都,道長魁兼,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,925評論 1 295
  • 正文 為了忘掉前任漠嵌,我火速辦了婚禮咐汞,結果婚禮上,老公的妹妹穿的比我還像新娘儒鹿。我一直安慰自己化撕,他們只是感情好,可當我...
    茶點故事閱讀 67,942評論 6 392
  • 文/花漫 我一把揭開白布约炎。 她就那樣靜靜地躺著植阴,像睡著了一般。 火紅的嫁衣襯著肌膚如雪圾浅。 梳的紋絲不亂的頭發(fā)上掠手,一...
    開封第一講書人閱讀 51,727評論 1 305
  • 那天,我揣著相機與錄音狸捕,去河邊找鬼喷鸽。 笑死,一個胖子當著我的面吹牛灸拍,可吹牛的內容都是我干的做祝。 我是一名探鬼主播,決...
    沈念sama閱讀 40,447評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼株搔,長吁一口氣:“原來是場噩夢啊……” “哼剖淀!你這毒婦竟也來了?” 一聲冷哼從身側響起纤房,我...
    開封第一講書人閱讀 39,349評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎翻诉,沒想到半個月后炮姨,有當地人在樹林里發(fā)現了一具尸體捌刮,經...
    沈念sama閱讀 45,820評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,990評論 3 337
  • 正文 我和宋清朗相戀三年舒岸,在試婚紗的時候發(fā)現自己被綠了绅作。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,127評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡蛾派,死狀恐怖俄认,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情洪乍,我是刑警寧澤眯杏,帶...
    沈念sama閱讀 35,812評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站壳澳,受9級特大地震影響岂贩,放射性物質發(fā)生泄漏。R本人自食惡果不足惜巷波,卻給世界環(huán)境...
    茶點故事閱讀 41,471評論 3 331
  • 文/蒙蒙 一萎津、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧抹镊,春花似錦锉屈、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,017評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至氨菇,卻和暖如春儡炼,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背查蓉。 一陣腳步聲響...
    開封第一講書人閱讀 33,142評論 1 272
  • 我被黑心中介騙來泰國打工乌询, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人豌研。 一個月前我還...
    沈念sama閱讀 48,388評論 3 373
  • 正文 我出身青樓妹田,卻偏偏與公主長得像,于是被迫代替她去往敵國和親鹃共。 傳聞我的和親對象是個殘疾皇子鬼佣,可洞房花燭夜當晚...
    茶點故事閱讀 45,066評論 2 355

推薦閱讀更多精彩內容