ThreadPoolExecutor中的Worker

首先看ThreadPoolExecutor的核心方法execute

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {//workerCountOf(c) 就是當(dāng)前線程數(shù)
            if (addWorker(command, true))//步驟1
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//步驟2
            int recheck = ctl.get();
            if (!isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);//這里注意為傳入的task為null
        }
        else if (!addWorker(command, false))//步驟3
            reject(command);
    }

這個(gè)方法很簡明暑椰,就是線程池的基本原理:
1.線程池?cái)?shù)量小于核心線程池?cái)?shù)量,則通過addWorker(稍后分析該方法)將任務(wù)加入線程池中荐绝,如果成功則返回干茉。
2.如果步驟1返回失敗,則看線程池是否在running狀態(tài)很泊,如果在則把任務(wù)送進(jìn)等待隊(duì)列角虫。如果這一步成功,再檢查一次線程池狀態(tài)委造,如果線程池不是running狀態(tài)并且當(dāng)前任務(wù)從隊(duì)列移除成功戳鹅,則執(zhí)行拒絕策略,否則如果worker數(shù)量等于0的話昏兆,則相當(dāng)于新建一個(gè)線程枫虏。如果沒有這個(gè)調(diào)用,當(dāng)你把coreSize設(shè)置為0時(shí)爬虱,往線程池里添加任務(wù)隶债,任務(wù)會(huì)被放在任務(wù)隊(duì)列了,永遠(yuǎn)得不到執(zhí)行跑筝。
3.如果addWoker失敗死讹,即超過了最大線程池?cái)?shù)量,則執(zhí)行拒絕策略曲梗。

BTW赞警,線程池中的有個(gè)ctl變量妓忍,這個(gè)變量的定義是

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

道格老爺子用的這個(gè)變量的前3位定義線程池的狀態(tài),后29位作為worker的數(shù)量愧旦。

我們可以看出世剖,整個(gè)流程的核心方法就是addWoker,在看它的代碼前笤虫,先來看一看Worker這個(gè)類旁瘫,該類定義如下

private final class Worker extends AbstractQueuedSynchronizer implements Runnable

先看它的成員變量和構(gòu)造方法:
Worker的變量中有thread和runnable,那么大概可以猜出Worker就是一個(gè)Thread的包裝類琼蚯,負(fù)責(zé)運(yùn)行任務(wù)境蜕。
構(gòu)造方法中,就使用線程池的ThreadFactory來new了線程凌停。

        final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); //不希望在runWorker之前中斷
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

看到這個(gè)類也繼承了AQS,看一下實(shí)現(xiàn)的方法售滤,使用的互斥模式罚拟,也就是很正常的并發(fā)訪問控制。state=0代表解鎖狀態(tài)完箩,state=1代表上鎖狀態(tài)赐俗。但是要注意的是這個(gè)并不是像ReentrantLock一樣的重入鎖。因?yàn)楫?dāng)執(zhí)行interruptIdleWorkers時(shí)(shutdown等會(huì)調(diào)用)弊知,會(huì)獲取Worker的鎖阻逮,而我們不希望這時(shí)候Worker能獲取鎖中斷線程,因?yàn)闀?huì)增大線程管理和中斷控制的難度秩彤。
再看為什么初始化Worker時(shí)要setState(-1)叔扼,就是要避免開始執(zhí)行之前的Worker不會(huì)被中斷。那么什么時(shí)候會(huì)中斷Worker的線程漫雷?就是調(diào)用shutdownNow時(shí)(shutdownNow不像shutdown需要提前獲取worker的鎖才能中斷線程)瓜富,里面會(huì)調(diào)用interruptIfStarted方法,判斷了state>=0才會(huì)被中斷(見下interruptIfStarted方法)降盹。

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {//state=-1不會(huì)被中斷
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }

ok与柑,這個(gè)類也實(shí)現(xiàn)了Runnable接口,看下它的run方法蓄坏,一個(gè)while循環(huán)价捧,先運(yùn)行自己傳進(jìn)來的任務(wù),如果傳進(jìn)來的任務(wù)為null涡戳,則從隊(duì)列里面取任務(wù)運(yùn)行结蟋。代碼中看到如果要走出這個(gè)循環(huán)的話,要么Worker的線程被中斷渔彰,要么getTask=null椎眯。順便一提的是挠将,Worker本身不運(yùn)行run,而是里面thread通過start運(yùn)行這個(gè)方法编整。再進(jìn)去看下getTask方法舔稀。

public void run() {
            runWorker(this);
     }

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // 因?yàn)閃orker初始化state=-1,這里先設(shè)置為0掌测,否則獲取不了鎖
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {//getTask是從隊(duì)列取出任務(wù)内贮,取到就繼續(xù)運(yùn)行。
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();//線程池shutdownNow后中斷worker
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();//運(yùn)行自己的業(yè)務(wù)方法
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

getTask方法就是不斷從隊(duì)列里面取任務(wù)汞斧。如果是核心線程夜郁,就一直取任務(wù);如果是非核心線程粘勒,在keepAliveTime沒取到任務(wù)就返回null竞端。

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            //...省略代碼
            int wc = workerCountOf(c);
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;  //看到如果當(dāng)前線程數(shù)大于coreSize,則啟動(dòng)從隊(duì)列取任務(wù)時(shí)采用超時(shí)的方法取庙睡。

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {//可以看到如果線程池只有一個(gè)線程事富,那么這個(gè)線程就算是非核心線程,也不會(huì)銷毀的乘陪。
                if (compareAndDecrementWorkerCount(c))
                    return null;//如果當(dāng)前線程數(shù)大于coreSize统台,并且隊(duì)列是空,返回null
                continue;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://這里終于看到keepAliveTime的作用了啡邑。
                    //如果是非核心線程贱勃,在keepAliveTime時(shí)間內(nèi)沒有任務(wù)進(jìn)來,那么根據(jù)上面的runWorker方法谤逼,取到的值是null贵扰,那么就跳出循環(huán),線程自動(dòng)銷毀流部。
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

??分析完Worker后拔鹰,又回到addWorker方法。每一步的注釋如下

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // 省略代碼...
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))//這里就是看是否是添加的核心線程池?cái)?shù)的任務(wù)還是核心線程之外的任務(wù)贵涵。
                    return false;
            // 省略代碼...
            }
        }
        //經(jīng)過一系列校驗(yàn)后
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);//這個(gè)就是線程池中的核心內(nèi)部類列肢。
            final Thread t = w.thread;//取出new好的線程
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());//線程池的狀態(tài)

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {//再次檢查線程池狀態(tài)
                        if (t.isAlive()) //如果線程池SHUTDOWN或者RUNNING,但是線程被啟動(dòng)了宾茂,拋異常
                            throw new IllegalThreadStateException();
                        workers.add(w);//緩存worker
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;//更新池中最大線程數(shù)量
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {如果添加成功
                    t.start();//最終調(diào)用runWorker的地方瓷马。
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);//如果添加失敗就會(huì)嘗試關(guān)閉線程池。
        }
        return workerStarted;
    }

總結(jié):首先大致分析了線程池運(yùn)行的基本流程跨晴,簡單來說execute就是一直往隊(duì)列中扔任務(wù)欧聘,創(chuàng)建好的Worker不斷從中取任務(wù)運(yùn)行。這里要注意的是端盆,線程池在初始化時(shí)并沒有將核心線程數(shù)的線程一起初始化怀骤,而是來一個(gè)任務(wù)费封,創(chuàng)建一個(gè)線程。還有就是當(dāng)線程數(shù)超過核心線程數(shù)并且開始銷毀多出核心線程數(shù)的線程時(shí)蒋伦,有可能銷毀的是在小于核心線程數(shù)時(shí)創(chuàng)建出來的舊線程弓摘。
??然后是內(nèi)部類Worker,這里Worker就是thread和task的一個(gè)包裝類痕届,它的職能就是控制中斷和任務(wù)的運(yùn)行韧献。Worker是一個(gè)集成了AQS,實(shí)現(xiàn)了Runnable方法的內(nèi)部類研叫。Worker創(chuàng)建好后锤窑,通過new好的線程來運(yùn)行任務(wù)。核心Worker通過while不斷從隊(duì)列中取出任務(wù)嚷炉,任務(wù)隊(duì)列為空線程就阻塞渊啰;非核心Worker也是通過while不斷取任務(wù),只是有個(gè)取任務(wù)時(shí)keepAliveTime的超時(shí)時(shí)間申屹,在時(shí)間之內(nèi)取不到的任務(wù)的話線程就跳出循環(huán)绘证,自動(dòng)銷毀了。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末独柑,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子私植,更是在濱河造成了極大的恐慌忌栅,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,858評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件曲稼,死亡現(xiàn)場離奇詭異索绪,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)贫悄,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門瑞驱,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人窄坦,你說我怎么就攤上這事唤反。” “怎么了鸭津?”我有些...
    開封第一講書人閱讀 165,282評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵彤侍,是天一觀的道長。 經(jīng)常有香客問我逆趋,道長盏阶,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,842評(píng)論 1 295
  • 正文 為了忘掉前任闻书,我火速辦了婚禮名斟,結(jié)果婚禮上脑慧,老公的妹妹穿的比我還像新娘。我一直安慰自己砰盐,他們只是感情好闷袒,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,857評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著楞卡,像睡著了一般霜运。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上蒋腮,一...
    開封第一講書人閱讀 51,679評(píng)論 1 305
  • 那天淘捡,我揣著相機(jī)與錄音,去河邊找鬼池摧。 笑死焦除,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的作彤。 我是一名探鬼主播膘魄,決...
    沈念sama閱讀 40,406評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼竭讳!你這毒婦竟也來了创葡?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,311評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤绢慢,失蹤者是張志新(化名)和其女友劉穎灿渴,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體胰舆,經(jīng)...
    沈念sama閱讀 45,767評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡骚露,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了缚窿。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片棘幸。...
    茶點(diǎn)故事閱讀 40,090評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖倦零,靈堂內(nèi)的尸體忽然破棺而出误续,到底是詐尸還是另有隱情,我是刑警寧澤扫茅,帶...
    沈念sama閱讀 35,785評(píng)論 5 346
  • 正文 年R本政府宣布女嘲,位于F島的核電站,受9級(jí)特大地震影響诞帐,放射性物質(zhì)發(fā)生泄漏欣尼。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,420評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望愕鼓。 院中可真熱鬧钙态,春花似錦、人聲如沸菇晃。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,988評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽磺送。三九已至驻子,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間估灿,已是汗流浹背崇呵。 一陣腳步聲響...
    開封第一講書人閱讀 33,101評(píng)論 1 271
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留馅袁,地道東北人域慷。 一個(gè)月前我還...
    沈念sama閱讀 48,298評(píng)論 3 372
  • 正文 我出身青樓,卻偏偏與公主長得像汗销,于是被迫代替她去往敵國和親犹褒。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,033評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容