線程池續(xù):你必須要知道的線程池submit()實(shí)現(xiàn)原理之FutureTask骗村!

FutureTask思維導(dǎo)圖.png

前言

上一篇內(nèi)容寫了Java中線程池的實(shí)現(xiàn)原理及源碼分析嫌褪,說好的是實(shí)實(shí)在在的大滿足,想通過一篇文章讓大家對(duì)線程池有個(gè)透徹的了解,但是文章寫完總覺得還缺點(diǎn)什么?

上篇文章只提到線程提交的execute()方法,并沒有講解線程提交的submit()方法芒珠,submit()有一個(gè)返回值,可以獲取線程執(zhí)行的結(jié)果Future<T>缨伊,這一講就那深入學(xué)習(xí)下submit()FutureTask實(shí)現(xiàn)原理。

使用場(chǎng)景&示例

使用場(chǎng)景

我能想到的使用場(chǎng)景就是在并行計(jì)算的時(shí)候进宝,例如一個(gè)方法中調(diào)用methodA()刻坊、methodB(),我們可以通過線程池異步去提交方法A党晋、B谭胚,然后在主線程中獲取組裝方法A徐块、B計(jì)算后的結(jié)果,能夠大大提升方法的吞吐量灾而。

使用示例

/**
 * @author wangmeng
 * @date 2020/5/28 15:30
 */
public class FutureTaskTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService threadPool = Executors.newCachedThreadPool();

        System.out.println("====執(zhí)行FutureTask線程任務(wù)====");
        Future<String> futureTask = threadPool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                System.out.println("FutureTask執(zhí)行業(yè)務(wù)邏輯");
                Thread.sleep(2000);
                System.out.println("FutureTask業(yè)務(wù)邏輯執(zhí)行完畢胡控!");
                return "歡迎關(guān)注: 一枝花算不算浪漫!";
            }
        });

        System.out.println("====執(zhí)行主線程任務(wù)====");
        Thread.sleep(1000);
        boolean flag = true;
        while(flag){
            if(futureTask.isDone() && !futureTask.isCancelled()){
                System.out.println("FutureTask異步任務(wù)執(zhí)行結(jié)果:" + futureTask.get());
                flag = false;
            }
        }

        threadPool.shutdown();
    }
}

上面的使用很簡(jiǎn)單绰疤,submit()內(nèi)部傳遞的實(shí)際上是個(gè)Callable接口铜犬,我們自己實(shí)現(xiàn)其中的call()方法舞终,我們通過futureTask既可以獲取到具體的返回值轻庆。

submit()實(shí)現(xiàn)原理

submit() 是也是提交任務(wù)到線程池,只是它可以獲取任務(wù)返回結(jié)果敛劝,返回結(jié)果是通過FutureTask來實(shí)現(xiàn)的余爆,先看下ThreadPoolExecutor中代碼實(shí)現(xiàn):

public class ThreadPoolExecutor extends AbstractExecutorService {
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
}

public abstract class AbstractExecutorService implements ExecutorService {
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
}

提交任務(wù)還是執(zhí)行execute()方法,只是task被包裝成了FutureTask 夸盟,也就是在excute()中啟動(dòng)線程后會(huì)執(zhí)行FutureTask.run()方法蛾方。

再來具體看下它執(zhí)行的完整鏈路圖:

submit()全鏈路圖.png

上圖可以看到,執(zhí)行任務(wù)并返回執(zhí)行結(jié)果的核心邏輯實(shí)在FutureTask中上陕,我們以FutureTask.run/get 兩個(gè)方法為突破口桩砰,一點(diǎn)點(diǎn)剖析FutureTask的實(shí)現(xiàn)原理。

FutureTask源碼初探

先看下FutureTask中部分屬性:

FutureTask屬性.png
public class FutureTask<V> implements RunnableFuture<V> {
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    private Callable<V> callable;
    private Object outcome;
    private volatile Thread runner;
    private volatile WaitNode waiters;
}
  1. state

當(dāng)前task狀態(tài)释簿,共有7中類型亚隅。
NEW: 當(dāng)前任務(wù)尚未執(zhí)行
COMPLETING: 當(dāng)前任務(wù)正在結(jié)束,尚未完全結(jié)束庶溶,一種臨界狀態(tài)
NORMAL:當(dāng)前任務(wù)正常結(jié)束
EXCEPTIONAL: 當(dāng)前任務(wù)執(zhí)行過程中發(fā)生了異常煮纵。
CANCELLED: 當(dāng)前任務(wù)被取消
INTERRUPTING: 當(dāng)前任務(wù)中斷中..
INTERRUPTED: 當(dāng)前任務(wù)已中斷

  1. callble

用戶提交任務(wù)傳遞的Callable,自定義call方法偏螺,實(shí)現(xiàn)業(yè)務(wù)邏輯

  1. outcome

任務(wù)結(jié)束時(shí)行疏,outcome保存執(zhí)行結(jié)果或者異常信息。

  1. runner

當(dāng)前任務(wù)被線程執(zhí)行期間套像,保存當(dāng)前任務(wù)的線程對(duì)象引用

  1. waiters

因?yàn)闀?huì)有很多線程去get當(dāng)前任務(wù)的結(jié)果酿联,所以這里使用了一種stack數(shù)據(jù)結(jié)構(gòu)來保存

FutureTask.run()實(shí)現(xiàn)原理

我們已經(jīng)知道在線程池runWorker()中最終會(huì)調(diào)用到FutureTask.run()方法中,我們就來看下它的執(zhí)行原理吧:

run()執(zhí)行邏輯.png

具體代碼如下:

public class FutureTask<V> implements RunnableFuture<V> {
    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
}

首先是判斷FutureTaskstate狀態(tài)夺巩,必須是NEW才可以繼續(xù)執(zhí)行货葬。

然后通過CAS修改runner引用為當(dāng)前線程。

接著執(zhí)行用戶自定義的call()方法劲够,將返回結(jié)果設(shè)置到result中震桶,result可能為正常返回也可能為異常信息。這里主要是調(diào)用set()/setException()

FutureTask.set()實(shí)現(xiàn)原理

set()方法的實(shí)現(xiàn)很簡(jiǎn)單征绎,直接看下代碼:

public class FutureTask<V> implements RunnableFuture<V> {
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
            finishCompletion();
        }
    }
}

call()返回的數(shù)據(jù)賦值給全局變量outcome上蹲姐,然后修改state狀態(tài)為NORMAL磨取,最后調(diào)用finishCompletion()來做掛起線程的喚醒操作,這個(gè)方法等到get()后面再來講解柴墩。

FutureTask.get()實(shí)現(xiàn)原理

get().png

接著看下代碼:

public class FutureTask<V> implements RunnableFuture<V> {
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
}

如果FutureTaskstateNORMAL或者COMPLETING忙厌,說明當(dāng)前任務(wù)并沒有執(zhí)行完成,調(diào)用get()方法會(huì)被阻塞江咳,具體的阻塞邏輯在awaitDone()方法:

private int awaitDone(boolean timed, long nanos) throws InterruptedException {

        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING)
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

這個(gè)方法可以說是FutureTask中最核心的方法了逢净,一步步來分析:

如果timed不為空,這說明指定nanos時(shí)間還未返回結(jié)果歼指,線程就會(huì)退出爹土。

q是一個(gè)WaitNode對(duì)象,是將當(dāng)前引用線程封裝在一個(gè)stack數(shù)據(jù)結(jié)構(gòu)中踩身,WaitNode對(duì)象屬性如下:

 static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

接著判斷當(dāng)前線程是否中斷胀茵,如果中斷則拋出中斷異常。

下面就進(jìn)入一輪輪的if... else if...判斷邏輯挟阻,我們還是采用分支的方式去分析琼娘。

分支一:if (s > COMPLETING) {

此時(shí)get()方法已經(jīng)有結(jié)果了,無論是正常返回的結(jié)果附鸽,還是異常脱拼、中斷、取消等坷备,此時(shí)直接返回state狀態(tài)熄浓,然后執(zhí)行report()方法。

分支二:else if (s == COMPLETING)

條件成立击你,說明當(dāng)前任務(wù)接近完成狀態(tài)玉组,這里讓當(dāng)前線程再釋放cpu,進(jìn)行下一輪搶占cpu丁侄。

分支三:else if (q == null)

第一次自旋執(zhí)行惯雳,WaitNode還沒有初始化,初始化q=new WaitNode();

分支四:else if (!queued){

queued代表當(dāng)前線程是否入棧鸿摇,如果沒有入棧則進(jìn)行入棧操作石景,順便將全局變量waiters指向棧頂元素。

分支五/六:LockSupport.park

如果設(shè)置了超時(shí)時(shí)間拙吉,則使用parkNanos來掛起當(dāng)前線程潮孽,否則使用park()

經(jīng)過這么一輪自旋循環(huán)后,如果執(zhí)行call()還沒有返回結(jié)果筷黔,那么調(diào)用get()方法的線程都會(huì)被掛起往史。

被掛起的線程會(huì)等待run()返回結(jié)果后依次喚醒,具體的執(zhí)行邏輯在finishCompletion()中佛舱。

最終stack結(jié)構(gòu)中數(shù)據(jù)如下:

WaitNode.png

FutureTask.finishCompletion()實(shí)現(xiàn)原理

finishCompletion().png

具體實(shí)現(xiàn)代碼如下:

private void finishCompletion() {
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null;
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;
}

代碼實(shí)現(xiàn)很簡(jiǎn)單椎例,看過get()方法后挨决,我們知道所有調(diào)用get()方法的線程,在run()還沒有返回結(jié)果前订歪,都會(huì)保存到一個(gè)有WaitNode構(gòu)成的statck數(shù)據(jù)結(jié)構(gòu)中脖祈,而且每個(gè)線程都會(huì)被掛起。

這里是遍歷waiters棧頂元素刷晋,然后依次查詢起next節(jié)點(diǎn)進(jìn)行喚醒盖高,喚醒后的節(jié)點(diǎn)接著會(huì)往后調(diào)用report()方法。

FutureTask.report()實(shí)現(xiàn)原理

report().png

具體代碼如下:

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

這個(gè)方法很簡(jiǎn)單眼虱,因?yàn)閳?zhí)行到了這里喻奥,說明當(dāng)前state狀態(tài)肯定大于COMPLETING,判斷如果是正常返回蒙幻,那么返回outcome數(shù)據(jù)映凳。

如果state是取消狀態(tài)胆筒,拋出CancellationException異常邮破。

如果狀態(tài)都不滿足,則說明執(zhí)行中出現(xiàn)了差錯(cuò)仆救,直接拋出ExecutionException

FutureTask.cancel()實(shí)現(xiàn)原理

cancel().png
public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally {
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

cancel()方法的邏輯很簡(jiǎn)單抒和,就是修改state狀態(tài)為CANCELLED,然后調(diào)用finishCompletion()來喚醒等待的線程彤蔽。

這里如果mayInterruptIfRunning摧莽,就會(huì)先中斷當(dāng)前線程,然后再去喚醒等待的線程顿痪。

總結(jié)

FutureTask的實(shí)現(xiàn)原理其實(shí)很簡(jiǎn)單镊辕,每個(gè)方法基本上都畫了一個(gè)簡(jiǎn)單的流程圖來方便立即。

后面還打算分享一個(gè)BlockingQueue相關(guān)的源碼解讀蚁袭,這樣線程池也可以算是完結(jié)了征懈。

在這之前可能會(huì)先分享一個(gè)SpringCloud常見配置代碼分析、最佳實(shí)踐等手冊(cè)揩悄,方便工作中使用卖哎,也是對(duì)之前看過的源碼一種總結(jié)。敬請(qǐng)期待删性!
歡迎關(guān)注:

原創(chuàng)干貨分享.png

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末亏娜,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子蹬挺,更是在濱河造成了極大的恐慌维贺,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,843評(píng)論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件巴帮,死亡現(xiàn)場(chǎng)離奇詭異溯泣,居然都是意外死亡群发,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,538評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門发乔,熙熙樓的掌柜王于貴愁眉苦臉地迎上來熟妓,“玉大人,你說我怎么就攤上這事栏尚∑鹩” “怎么了?”我有些...
    開封第一講書人閱讀 163,187評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵译仗,是天一觀的道長(zhǎng)抬虽。 經(jīng)常有香客問我,道長(zhǎng)纵菌,這世上最難降的妖魔是什么阐污? 我笑而不...
    開封第一講書人閱讀 58,264評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮咱圆,結(jié)果婚禮上笛辟,老公的妹妹穿的比我還像新娘。我一直安慰自己序苏,他們只是感情好手幢,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,289評(píng)論 6 390
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著忱详,像睡著了一般围来。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上匈睁,一...
    開封第一講書人閱讀 51,231評(píng)論 1 299
  • 那天监透,我揣著相機(jī)與錄音,去河邊找鬼航唆。 笑死胀蛮,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的佛点。 我是一名探鬼主播醇滥,決...
    沈念sama閱讀 40,116評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼超营!你這毒婦竟也來了鸳玩?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,945評(píng)論 0 275
  • 序言:老撾萬榮一對(duì)情侶失蹤演闭,失蹤者是張志新(化名)和其女友劉穎不跟,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體米碰,經(jīng)...
    沈念sama閱讀 45,367評(píng)論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡窝革,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,581評(píng)論 2 333
  • 正文 我和宋清朗相戀三年购城,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片虐译。...
    茶點(diǎn)故事閱讀 39,754評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡瘪板,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出漆诽,到底是詐尸還是另有隱情侮攀,我是刑警寧澤,帶...
    沈念sama閱讀 35,458評(píng)論 5 344
  • 正文 年R本政府宣布厢拭,位于F島的核電站兰英,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏供鸠。R本人自食惡果不足惜畦贸,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,068評(píng)論 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望楞捂。 院中可真熱鬧薄坏,春花似錦、人聲如沸泡一。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,692評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽鼻忠。三九已至,卻和暖如春杈绸,著一層夾襖步出監(jiān)牢的瞬間帖蔓,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,842評(píng)論 1 269
  • 我被黑心中介騙來泰國打工瞳脓, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留塑娇,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,797評(píng)論 2 369
  • 正文 我出身青樓劫侧,卻偏偏與公主長(zhǎng)得像埋酬,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子烧栋,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,654評(píng)論 2 354