Java-線程池實(shí)現(xiàn)線程復(fù)用和工作線程的關(guān)閉

1.ThreadPoolExecutor.execute

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

這里主要做兩件事:

  • 小于corePoolSize的時(shí)候創(chuàng)建核心線程
  • 當(dāng)前核心線程都正在執(zhí)行則入隊(duì)循签,判斷是否需要?jiǎng)?chuàng)建工作線程

2.ThreadPoolExecutor.addWorker

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        // 判斷是否可以創(chuàng)建worker
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        // 創(chuàng)建Worker并啟動(dòng)
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

這里采用顯示鎖的方式實(shí)現(xiàn)同步創(chuàng)建Worker

3.Worker的run()

Worker其實(shí)也是一個(gè)Runnable的子類,那么從執(zhí)行到添加Worker的過(guò)程都沒(méi)有線程復(fù)用的邏輯祖今,那么該邏輯就應(yīng)該是在Worker的run方法中。
在這里可以看到耙饰,Worker的runWorker會(huì)調(diào)用ThreadPoolExecutor的runWorker方法挠阁。
而在runWorker方法中,會(huì)采用一個(gè)while循環(huán)從task任務(wù)隊(duì)列中取出待執(zhí)行的任務(wù)塑崖,如果任務(wù)是沒(méi)有被中斷的,則會(huì)調(diào)用task.run()方法執(zhí)行任務(wù)痛倚。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

getTask()的內(nèi)部是采用一個(gè)無(wú)限for循環(huán)的方式進(jìn)行循環(huán)遍歷

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // 判斷是否允許核心線程超時(shí)或者當(dāng)前工作線程是否大于核心線程數(shù)
            // 如果為true
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 如果存在的線程數(shù)大于核心線程數(shù)规婆,則從隊(duì)列中取任務(wù)時(shí)沒(méi)有任務(wù)返回null
                // 如果存在的線程數(shù)小于核心線程數(shù),則從隊(duì)列中取任務(wù)時(shí)沒(méi)有任務(wù)將阻塞隊(duì)列
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

而工作線程的關(guān)閉蝉稳,其實(shí)可以從上面兩種情況知道抒蚜,在線程通過(guò)while無(wú)限循環(huán)的時(shí)候,如果是非核心線程(即工作線程)調(diào)用BlockingQueue的poll方法耘戚,則會(huì)在隊(duì)列為空的時(shí)候返回null而不會(huì)阻塞隊(duì)列嗡髓,當(dāng)返回的task是null的時(shí)候,那么runWorker中的while循環(huán)就會(huì)退出毕莱,最終就會(huì)執(zhí)行processWorkerExit(w, completedAbruptly)而completedAbruptly=false進(jìn)行線程的刪除器贩;如果是核心線程的話,則會(huì)調(diào)用BlockingQueue的take()方法朋截,那么在隊(duì)列為空的時(shí)候就會(huì)通過(guò)take()方法調(diào)用而阻塞等待隊(duì)列結(jié)果的返回,而不會(huì)返回一個(gè)task=null吧黄,這樣runWorker中的while循環(huán)就不會(huì)退出部服,就會(huì)一直等待結(jié)果。

Runnable r = timed ?
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    workQueue.take();

所以工作線程會(huì)被關(guān)閉拗慨,而核心線程不會(huì)被關(guān)閉廓八,就是因?yàn)楣ぷ骶€程的run()方法會(huì)通過(guò)task=null退出循環(huán)而執(zhí)行完成run()的方法體內(nèi)容,從而自動(dòng)關(guān)閉赵抢;而核心線程因?yàn)樽枞却?duì)列返回的原因而不會(huì)導(dǎo)致while循環(huán)的結(jié)束剧蹂,所以不會(huì)關(guān)閉。

BlockingQueue的方法都是成對(duì)出現(xiàn):
add和remove:這兩個(gè)是非阻塞的烦却,當(dāng)隊(duì)列滿的時(shí)候宠叼,add會(huì)拋出異常,當(dāng)隊(duì)列為空的時(shí)候,remove會(huì)拋出異常冒冬。
offer和poll:使用offer往滿的隊(duì)列里放入元素伸蚯,會(huì)返回false;poll方法往空的隊(duì)列里拿元素简烤,會(huì)返回一個(gè)null
put和take:這是真正的阻塞方法剂邮,使用put往滿的隊(duì)列里放元素,會(huì)被阻塞横侦;使用take往空的隊(duì)列里拿方法挥萌,會(huì)被阻塞。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末枉侧,一起剝皮案震驚了整個(gè)濱河市瑞眼,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌棵逊,老刑警劉巖伤疙,帶你破解...
    沈念sama閱讀 223,126評(píng)論 6 520
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異辆影,居然都是意外死亡徒像,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,421評(píng)論 3 400
  • 文/潘曉璐 我一進(jìn)店門蛙讥,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)锯蛀,“玉大人,你說(shuō)我怎么就攤上這事次慢∨缘樱” “怎么了?”我有些...
    開(kāi)封第一講書人閱讀 169,941評(píng)論 0 366
  • 文/不壞的土叔 我叫張陵迫像,是天一觀的道長(zhǎng)劈愚。 經(jīng)常有香客問(wèn)我,道長(zhǎng)闻妓,這世上最難降的妖魔是什么菌羽? 我笑而不...
    開(kāi)封第一講書人閱讀 60,294評(píng)論 1 300
  • 正文 為了忘掉前任,我火速辦了婚禮由缆,結(jié)果婚禮上注祖,老公的妹妹穿的比我還像新娘。我一直安慰自己均唉,他們只是感情好是晨,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,295評(píng)論 6 398
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著舔箭,像睡著了一般罩缴。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書人閱讀 52,874評(píng)論 1 314
  • 那天靴庆,我揣著相機(jī)與錄音时捌,去河邊找鬼。 笑死炉抒,一個(gè)胖子當(dāng)著我的面吹牛奢讨,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播焰薄,決...
    沈念sama閱讀 41,285評(píng)論 3 424
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼拿诸,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了塞茅?” 一聲冷哼從身側(cè)響起亩码,我...
    開(kāi)封第一講書人閱讀 40,249評(píng)論 0 277
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎野瘦,沒(méi)想到半個(gè)月后描沟,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,760評(píng)論 1 321
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡鞭光,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,840評(píng)論 3 343
  • 正文 我和宋清朗相戀三年吏廉,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片惰许。...
    茶點(diǎn)故事閱讀 40,973評(píng)論 1 354
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡席覆,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出汹买,到底是詐尸還是另有隱情佩伤,我是刑警寧澤,帶...
    沈念sama閱讀 36,631評(píng)論 5 351
  • 正文 年R本政府宣布晦毙,位于F島的核電站生巡,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏结序。R本人自食惡果不足惜障斋,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,315評(píng)論 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望徐鹤。 院中可真熱鬧,春花似錦邀层、人聲如沸返敬。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 32,797評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)劲赠。三九已至,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間凛澎,已是汗流浹背霹肝。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 33,926評(píng)論 1 275
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留塑煎,地道東北人沫换。 一個(gè)月前我還...
    沈念sama閱讀 49,431評(píng)論 3 379
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像最铁,于是被迫代替她去往敵國(guó)和親讯赏。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,982評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容