溯本求源: JAVA線程池工作原理

1. 前言

線程池是JAVA開(kāi)發(fā)中最常使用的池化技術(shù)之一,可以減少線程資源的重復(fù)創(chuàng)建與銷毀造成的開(kāi)銷。

2. 靈魂拷問(wèn):怎么做到線程重復(fù)利用?

很多同學(xué)會(huì)聯(lián)想到連接池胸遇,理所當(dāng)然的說(shuō):需要的時(shí)候從池中取出線程肮帐,執(zhí)行完任務(wù)再放回去。

如何用代碼實(shí)現(xiàn)呢?

此時(shí)就會(huì)發(fā)現(xiàn)番捂,調(diào)用線程的start方法后,生命周期就不由父線程直接控制了。線程的run方法執(zhí)行完成就銷毀了拴曲,所謂的“取出”和“放回”只不過(guò)是想當(dāng)然的操作。

這里先說(shuō)答案:生產(chǎn)者消費(fèi)者模型

3. ThreadPoolExecutor的實(shí)現(xiàn)

image

3.1 結(jié)構(gòu)

首先看下ThreadPoolExecutor的繼承結(jié)構(gòu)

頂級(jí)接口是Executor凛忿,定義execute方法

ExecutorService添加了submit方法澈灼,支持返回future獲取執(zhí)行結(jié)果,以及線程池運(yùn)行狀態(tài)的相關(guān)方法

本文著重講線程池的執(zhí)行流程店溢,因此將暫時(shí)忽略線程池的狀態(tài)相關(guān)的代碼叁熔,也建議新手看源碼時(shí)從核心流程看起。

3.2 核心方法:execute()

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // 判斷是否小于核心線程數(shù)
        if (workerCountOf(c) < corePoolSize) {
            //添加worker床牧,添加成功則退出
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 核心線程數(shù)已用完則放入隊(duì)列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 雙重檢查荣回,避免入隊(duì)完成后,所有線程已銷毀戈咳,導(dǎo)致沒(méi)有消費(fèi)者消費(fèi)當(dāng)前任務(wù)
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 隊(duì)列已滿則開(kāi)啟非核心線程心软,達(dá)到最大線程數(shù)則使用拒絕策略
        else if (!addWorker(command, false))
            reject(command);
    }

execute方法就是一個(gè)生產(chǎn)的過(guò)程壕吹,主要分為開(kāi)啟線程和入隊(duì)

開(kāi)啟線程會(huì)傳入command(即當(dāng)前任務(wù)),開(kāi)啟的線程會(huì)立即消費(fèi)該任務(wù)

入隊(duì)的任務(wù)則會(huì)由Worker消費(fèi)

主要關(guān)注corePoolSize删铃,maximumPoolSize耳贬,queueSize(任務(wù)隊(duì)列長(zhǎng)度),workerCount(當(dāng)前worker數(shù)量)這幾個(gè)參數(shù)猎唁,可以總結(jié)為以下:

已滿 未滿 操作
corePoolSize 開(kāi)啟核心線程
corePoolSize queueSize 入隊(duì)
queueSize maximumPoolSize 開(kāi)啟非核心線程
maximumPoolSize 拒絕

3.3 消費(fèi)者:Worker

image

Worker類實(shí)現(xiàn)Runnable接口咒劲,繼承AQS,主要先關(guān)注thread和firstTask兩個(gè)屬性和run方法

Worker(Runnable firstTask) {
    setState(-1);
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

從Worker的構(gòu)造方法可以看出诫隅,thread就是線程池中真正消費(fèi)任務(wù)的線程腐魂,創(chuàng)建時(shí)會(huì)傳入this(即Worker對(duì)象),而Worker實(shí)現(xiàn)了Runnable逐纬,因此線程運(yùn)行時(shí)就是執(zhí)行了Worker的run方法蛔屹。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;
    try {
        // getTask會(huì)阻塞,因此不會(huì)造成cpu飆高
        while (task != null || (task = getTask()) != null) {
            // ···
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 執(zhí)行傳入的Runnable
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                // 修改為null豁生,否則下次循環(huán)不會(huì)調(diào)用getTask
                task = null;
                // ···
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

再來(lái)看run方法判导,直接調(diào)用了ThreadPoolExecutor的runWorker方法,runWorker中有一個(gè)while循環(huán)沛硅,循環(huán)體執(zhí)行了task.run方法

task首先會(huì)獲取Worker中的firstTask屬性眼刃,并將其置為null,因此firstTask只會(huì)執(zhí)行一次摇肌,后續(xù)task將通過(guò)getTask方法獲取擂红。

因此Worker的工作流程可以概括為:消費(fèi)完Worker的firstTask后,循環(huán)執(zhí)行g(shù)etTask獲取任務(wù)并消費(fèi)围小,獲取不到task時(shí)昵骤,就退出循環(huán),線程銷毀肯适。

此處便可以看出生產(chǎn)者消費(fèi)者模型了变秦。

private Runnable getTask() {
    boolean timedOut = false;
    for (;;) {
        int c = ctl.get();
        // ···
        int wc = workerCountOf(c);
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // 嘗試減少計(jì)數(shù),失敗則會(huì)continue循環(huán)重試
            if (compareAndDecrementWorkerCount(c))
                // 此處返回null框舔,runWorker將退出循環(huán)
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

runWorker方法退出循環(huán)的條件是getTask返回null蹦玫。

觀察getTask,只有同時(shí)滿足以下情況時(shí)才會(huì)返回null

條件 解讀
1 wc > maximumPoolSize || (timed && timedOut) workQueue.poll方法超時(shí)
2 wc > 1 || workQueue.isEmpty() 隊(duì)列任務(wù)全部執(zhí)行完
3 compareAndDecrementWorkerCount(c) cas減少workerCount成功

返回的task是通過(guò)workQueue.poll和workQueue.take得到的

兩者執(zhí)行時(shí)線程均會(huì)掛起刘绣,直至workQueue中有新的任務(wù)

不同之處在于poll方法阻塞keepAliveTime時(shí)間后會(huì)自動(dòng)喚醒并返回null樱溉,此時(shí)timeOut置為true,即滿足條件1纬凤,隨后繼續(xù)循環(huán)福贞,重復(fù)檢查是否大于核心線程數(shù)且隊(duì)列為空,是則嘗試減少workerCount并退出循環(huán)

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        // ···
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();
            // ···
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // ···
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // ···
    }
    return workerStarted;
}

了解了Worker之后停士,再來(lái)看execute中調(diào)用的addWorker方法

方法有兩個(gè)參數(shù)挖帘,firstTask即為Worker的firstTask完丽,core則標(biāo)記需要新增的是否是核心線程

retry循環(huán)與線程池狀態(tài)相關(guān),內(nèi)層for循環(huán)則是重復(fù)嘗試cas增加線程拇舀,若大于corePoolSize或者maximumPoolSize則新增失敗逻族,cas成功后,new一個(gè)Worker并start

3.4 總結(jié)

image

回到最初的問(wèn)題你稚,線程是如何做到重復(fù)利用的?

并不存在取出線程使用完再歸還的操作朱躺,線程啟動(dòng)后進(jìn)入循環(huán)刁赖,主動(dòng)獲取任務(wù)執(zhí)行,退出循環(huán)則線程銷毀长搀。

execute方法控制新增Worker和任務(wù)入隊(duì)

附:手寫(xiě)簡(jiǎn)易線程池

public class MyThreadPool implements Executor {

    private int corePoolSize;
    private int maximumPoolSize;
    private BlockingQueue<Runnable> queue;
    //記錄當(dāng)前工作線程數(shù)
    private AtomicInteger count;
    private long keepAliveTime;
    private RejectHandler rejectHandler;

    public MyThreadPool(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> queue, long keepAliveTime, RejectHandler rejectHandler) {
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.queue = queue;
        this.keepAliveTime = keepAliveTime;
        this.rejectHandler = rejectHandler;
        count = new AtomicInteger(0);
    }

    @Override
    public void execute(Runnable task) {
        int ct = count.get();
        //核心線程數(shù)未滿宇弛,嘗試增加核心線程
        if (ct < corePoolSize && count.compareAndSet(ct, ct + 1)) {
            new Worker(task).start();
            return;
        }
        //入隊(duì)
        if (queue.offer(task)) {
            return;
        }
        //重新獲取一遍count,否則如果在core分支cas失敗源请,此處必然也失敗
        ct = count.get();
        //隊(duì)列已滿枪芒,嘗試增加非核心線程
        if (ct < maximumPoolSize && count.compareAndSet(ct, ct + 1)) {
            new Worker(task).start();
            return;
        }
        //已達(dá)最大線程數(shù),拒絕
        rejectHandler.reject(task);

    }

    class Worker extends Thread {

        Runnable firstTask;

        public Worker(Runnable firstTask) {
            this.firstTask = firstTask;
        }

        @Override
        public void run() {
            Runnable task = firstTask;
            firstTask = null;
            while (true) {
                try {
                    //getTask會(huì)阻塞
                    if (task != null || (task = getTask()) != null) {
                        task.run();
                    } else {
                        //getTask超時(shí)才會(huì)進(jìn)入谁尸,直接退出舅踪,線程銷毀
                        break;
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    //置空,否則不能getTask
                    task = null;
                }
            }
        }

    }

    Runnable getTask() throws InterruptedException {
        //標(biāo)記是否超時(shí)過(guò)
        boolean timedOut = false;
        while (true) {
            int ct = count.get();
            //超出核心線程數(shù)才進(jìn)入超時(shí)邏輯良蛮,即使timeOut由于線程poll超時(shí)過(guò)一次變成true抽碌,執(zhí)行到這里如果不超出corePoolSize,可以再次進(jìn)入take分支
            if (ct > corePoolSize) {
                //超出核心線程數(shù)
                if (timedOut) {
                    //已超時(shí)過(guò)决瞳,嘗試減少工作線程數(shù)货徙,失敗會(huì)continue,然后重新比較corePoolSize皮胡,重試減少線程數(shù)
                    if (count.compareAndSet(ct, ct - 1)) {
                        return null;
                    } else {
                        continue;
                    }
                }
                Runnable task = queue.poll(keepAliveTime, TimeUnit.MILLISECONDS);
                if (task == null) {
                    //poll超時(shí)才進(jìn)入
                    timedOut = true;
                    continue;
                }
                return task;
            } else {
                //必然能獲取到task
                return queue.take();
            }
        }
    }

    public static interface RejectHandler {

        void reject(Runnable r);

    }


    public static void main(String[] args) {
        MyThreadPool pool = new MyThreadPool(2, 5, new LinkedBlockingQueue<>(100), 2000, r -> {
            System.out.println(r + ": reject");
        });
        for (int i = 0; i < 3; i++) {
            final int x = i;
            new Thread(() -> {
                for (int j = 0; j < 5; j++) {
                    final int y = j;
                    pool.execute(() -> {
                        try {
                            Thread.sleep(3000L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        LocalDateTime now = LocalDateTime.now();
                        System.out.println(String.format("線程i=%s, j=%s,執(zhí)行結(jié)束: %s", x, y, now.format(DateTimeFormatter.ISO_DATE_TIME)));
                    });
                }
            }).start();

        }

    }

}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末痴颊,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子屡贺,更是在濱河造成了極大的恐慌蠢棱,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,755評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件甩栈,死亡現(xiàn)場(chǎng)離奇詭異裳扯,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)谤职,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)饰豺,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人允蜈,你說(shuō)我怎么就攤上這事冤吨≥锪” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,138評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵漩蟆,是天一觀的道長(zhǎng)垒探。 經(jīng)常有香客問(wèn)我,道長(zhǎng)怠李,這世上最難降的妖魔是什么圾叼? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,791評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮捺癞,結(jié)果婚禮上夷蚊,老公的妹妹穿的比我還像新娘。我一直安慰自己髓介,他們只是感情好惕鼓,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著唐础,像睡著了一般箱歧。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上一膨,一...
    開(kāi)封第一講書(shū)人閱讀 51,631評(píng)論 1 305
  • 那天呀邢,我揣著相機(jī)與錄音,去河邊找鬼豹绪。 笑死驼鹅,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的森篷。 我是一名探鬼主播输钩,決...
    沈念sama閱讀 40,362評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼仲智!你這毒婦竟也來(lái)了买乃?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,264評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤钓辆,失蹤者是張志新(化名)和其女友劉穎剪验,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體前联,經(jīng)...
    沈念sama閱讀 45,724評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡功戚,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了似嗤。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片啸臀。...
    茶點(diǎn)故事閱讀 40,040評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖烁落,靈堂內(nèi)的尸體忽然破棺而出乘粒,到底是詐尸還是另有隱情豌注,我是刑警寧澤,帶...
    沈念sama閱讀 35,742評(píng)論 5 346
  • 正文 年R本政府宣布灯萍,位于F島的核電站轧铁,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏旦棉。R本人自食惡果不足惜齿风,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望绑洛。 院中可真熱鬧救斑,春花似錦、人聲如沸诊笤。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,944評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)讨跟。三九已至,卻和暖如春鄙煤,著一層夾襖步出監(jiān)牢的瞬間晾匠,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,060評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工梯刚, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留凉馆,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,247評(píng)論 3 371
  • 正文 我出身青樓亡资,卻偏偏與公主長(zhǎng)得像澜共,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子锥腻,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容