java線程池源碼分析

從線程池使用進(jìn)行實(shí)現(xiàn)分析
一.自定義線程池
1.自定義線程池
2.構(gòu)造完成之后狀態(tài)
3.關(guān)鍵參數(shù)介紹
二.執(zhí)行任務(wù)
1.execute一個(gè)任務(wù)
2.執(zhí)行分析
三.線程池停止
1.shutDown分析
2.shutDownNow分析
四.線程池常見(jiàn)問(wèn)題

一.自定義線程池

1.自定義線程池

//阻塞隊(duì)列
LinkedBlockingQueue blockingQueue= new LinkedBlockingQueue<Runnable>(10);
//線程工廠
ThreadFactory threadFactory=new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("test");
            return t;
        }
    };
//拒絕策略
RejectedExecutionHandler rejectedExecutionHandler=new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("自定義拒絕策略");
        }
    };

//核心線程數(shù)
Integer corePoolSize =3;
//最大線程數(shù)
Integer maxPoolSize=10;
//空閑線程等待時(shí)間
Integer keepAliveTime=7;
//構(gòu)造方法   
 ExecutorService threadPoolExecutor=new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime,
            TimeUnit.SECONDS, blockingQueue, threadFactory, rejectedExecutionHandler);

2.構(gòu)造完成之后狀態(tài)

構(gòu)造方法完成之后狀態(tài)

3.幾個(gè)關(guān)鍵參數(shù)

ctl:保存線程池存活狀態(tài),線程池內(nèi)線程數(shù)

workqueque:自定義的任務(wù)隊(duì)列

mainLock:線程池的鎖

termination:mainLock.newCondition()阻塞/通知線程;

keepAliveTime:非核心線程的存活時(shí)間

corePoolSize:3 

maximumPoolSize:10

workers:線程集合 線程池內(nèi)存活的線程數(shù)=0 

二.執(zhí)行任務(wù)

1.execute一個(gè)任務(wù)


execute之后狀態(tài)

workers: 0 到 1

2.執(zhí)行分析

1.首先檢查存活線程數(shù)量
2.根據(jù)存活數(shù)量進(jìn)行不同處理
3.開(kāi)辟新的線程/添加到任務(wù)隊(duì)列/開(kāi)新線程至最大線程數(shù)/拒絕任務(wù)
4.如果成功的開(kāi)了線程,調(diào)用線程的start()開(kāi)始處理
5.worker用while循環(huán)不斷獲取任務(wù).
6.直到當(dāng)前任務(wù)和任務(wù)隊(duì)列都為空,判斷是否阻塞直至有新任務(wù)到來(lái).

下面開(kāi)始就源碼進(jìn)行分析

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
         1.獲取線程池狀態(tài)
        int c = ctl.get();
        2.取workers數(shù)量 和如果小于核心線程數(shù),生成一個(gè)新的worker
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        3.workers.count到達(dá)了核心線程數(shù),線程池是運(yùn)行狀態(tài),把任務(wù)添加到任務(wù)隊(duì)列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            4.再次檢查狀態(tài),如果不是運(yùn)行狀態(tài),將任務(wù)移除出隊(duì)列
            if (! isRunning(recheck) && remove(command))
                reject(command);
                5.如果此時(shí)線程池內(nèi)沒(méi)有線程了,再生成一個(gè)線程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        6.如果worker加不上去了,執(zhí)行拒絕策略
        else if (!addWorker(command, false))
            reject(command);
    }
從上面可以看到每來(lái)一個(gè)任務(wù)都線程數(shù)量會(huì)判斷使用策略
1.如果沒(méi)有到核心線程,添加新線程執(zhí)行任務(wù)
2.如果到核心線程,添加到任務(wù)隊(duì)列
3.如果隊(duì)列都滿了,那就添加新線程,知道最大線程數(shù).

分析addworker之前需要對(duì)worker這個(gè)內(nèi)部類進(jìn)行了解

worker結(jié)構(gòu)
可以看到,本質(zhì)上就是對(duì)線程的包裝.主要是繼承了AQS,通過(guò)鎖對(duì)線程進(jìn)行保護(hù).

下面開(kāi)始看addWorks方法

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
        1.進(jìn)來(lái)先獲取線程池狀態(tài)
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            2.校驗(yàn)線程池狀態(tài)
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            for (;;) {
                int wc = workerCountOf(c);
                3.先看是否達(dá)到最大容量.再判斷是否到達(dá)自定義規(guī)定容量
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if
            4.這里采取的cas算法讓worker計(jì)數(shù)器+1.如果cas失敗,重復(fù)1234.
            (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        5.下面開(kāi)始正式創(chuàng)建worker
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
        6.獲得線程池的鎖
            final ReentrantLock mainLock = this.mainLock;
            7.初始化worker,構(gòu)造方法里創(chuàng)建了線程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
            8.因?yàn)閣orks是hashSet不是線程安全的,所以這里需要加鎖處理.里面添加worker,更新相關(guān)計(jì)數(shù)器
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                9.如果worker添加成功.直接啟動(dòng)線程
                if (workerAdded) {
                11.start方法實(shí)際調(diào)用worker runWorker()方法
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
        10.如果線程啟動(dòng)失敗,worker數(shù)量減少,嘗試中斷失敗線程.
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

對(duì)runWorker進(jìn)行分析

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        猜測(cè)這里unlock主要避免執(zhí)行shutdownNow時(shí)并發(fā)問(wèn)題
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
        1.這里有兩種方式獲取任務(wù) getTask會(huì)根據(jù)情況是否阻塞線程.
        getTask下面分析
            while (task != null || (task = getTask()) != null) {
        2.這里采用的獨(dú)占鎖,保證了任務(wù)開(kāi)始執(zhí)行后,只有在鎖之前已經(jīng)產(chǎn)生的終止?fàn)顟B(tài)才能使自我中斷设江。
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                    3.這里終于開(kāi)始執(zhí)行任務(wù)了!<枇痢岂却!
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
        4.最后根據(jù)狀態(tài)判斷線程是否可以正常退出.
            processWorkerExit(w, completedAbruptly);
        }
    }
線程不斷獲取任務(wù)執(zhí)行,并且用獨(dú)占鎖保護(hù)線程執(zhí)行過(guò)程,這樣就保證了shutdown時(shí)后,正在執(zhí)行任務(wù)的縣城不會(huì)被中斷,但是shutdownNow不受此影響.可以直接粗暴的中斷線程.

getTask()分析

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            1.如果線程池調(diào)用了shutDown,線程池狀態(tài)就是shutdown.如果調(diào)用shutdown線程池狀態(tài)就是stop,這2個(gè)策略主要是后續(xù)任務(wù)處理方式不一樣.
            如果是shutdown狀態(tài),隊(duì)列必須不為空,那么可以繼續(xù)從任務(wù)隊(duì)列獲取任務(wù)進(jìn)行處理
            如果是stop狀態(tài),隊(duì)列不為空也不進(jìn)行處理.(shutDownNow時(shí)任務(wù)隊(duì)列已經(jīng)把任務(wù)返回了)
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            boolean timed;      // Are workers subject to culling?

            for (;;) {
                int wc = workerCountOf(c);
                timed = allowCoreThreadTimeOut || wc > corePoolSize;

                if (wc <= maximumPoolSize && ! (timedOut && timed))
                    break;
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {
            2.判斷是阻塞一定時(shí)間還是一直阻塞直到取到任務(wù).
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
這里通過(guò)多重判斷,來(lái)決定線程是阻塞在這里等任務(wù),還是存活keepAliveTime.
1.如果線程數(shù)是最大線程數(shù),并且任務(wù)隊(duì)列為空,那么阻塞空閑線程還是存活keepAliveTime時(shí)間,期間沒(méi)有任務(wù)到來(lái),讓其向后執(zhí)行,自然中斷.
2.如果線程數(shù)是核心線程數(shù),根據(jù)allowCoreThreadTimeOut處理站欺。true情況下阻塞keepAliveTime時(shí)間,期間沒(méi)有任務(wù)到來(lái),讓其向后執(zhí)行,自然中斷.false情況下,一直阻塞,直到有任務(wù)加入隊(duì)列.

三.線程池停止

1.shutDown分析

 public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            1.這里應(yīng)該是做一些權(quán)限檢查
            checkShutdownAccess();
            2.利用CAS將變線程池轉(zhuǎn)變成shutDown狀態(tài)
            advanceRunState(SHUTDOWN);
            3.中斷空閑線程
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        4.最后嘗試將線程池狀態(tài)設(shè)置為TIDYING狀態(tài)
        tryTerminate();
    }

interruptIdleWorkers

 private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                1.首先線程不是中斷狀態(tài),嘗試獲取線程的鎖.這里tryLock的實(shí)現(xiàn)就決定了
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                    2.如果能拿到鎖終端線程
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
這里是先獲取線程鎖,獲取不到不終止,所以不能中斷任務(wù)中的線程,并且會(huì)把隊(duì)列中的任務(wù)處理完

tryLock

tryLock里面調(diào)用了tryAcquire
protected boolean tryAcquire(int unused) {
            1.這里采用了獨(dú)占鎖的實(shí)現(xiàn)方式 0到1才能獲取鎖.所以保證了正在執(zhí)行任務(wù)的線程不會(huì)被shutdown中斷
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
這是自定義了aqs獲取鎖的方式

2.shutDownNow分析

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            1.第1個(gè)區(qū)別嘗試中斷所有線程
            interruptWorkers();
            1.第2個(gè)區(qū)別,把隊(duì)列中的任務(wù)返回給調(diào)用者處理
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

interruptWorkers();

    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
            1.這里就粗暴多了,直接終止,可能導(dǎo)致有些線程不能正常終止,處于異常狀態(tài).
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }
    
    ====================================================
    void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }

直接粗暴的終止線程,不管線程上沒(méi)上鎖.

線程池常見(jiàn)問(wèn)題
1.線程池對(duì)線程包裝的作用

2.關(guān)鍵參數(shù)(allowCoreThreadTimeOut,keepAliveTime,corePoolSize,maximumPoolSize,ctl)

3.關(guān)鍵技術(shù)(CAS,AbstractQueuedSynchronizer,ReentrantLock,LinkedB lockingQueue)

4.線程池線程數(shù)動(dòng)態(tài)變化過(guò)程

5.shutDown和shutDownNow執(zhí)行上有什么區(qū)別

6.excute和submit區(qū)別

7.核心線程是什么情況下減少到0

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末鹤耍,一起剝皮案震驚了整個(gè)濱河市哮缺,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌妓忍,老刑警劉巖虏两,帶你破解...
    沈念sama閱讀 211,348評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異世剖,居然都是意外死亡定罢,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,122評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門旁瘫,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)祖凫,“玉大人,你說(shuō)我怎么就攤上這事酬凳』菘觯” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,936評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵宁仔,是天一觀的道長(zhǎng)稠屠。 經(jīng)常有香客問(wèn)我,道長(zhǎng)翎苫,這世上最難降的妖魔是什么权埠? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,427評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮煎谍,結(jié)果婚禮上攘蔽,老公的妹妹穿的比我還像新娘。我一直安慰自己呐粘,他們只是感情好秩彤,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,467評(píng)論 6 385
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著事哭,像睡著了一般漫雷。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上鳍咱,一...
    開(kāi)封第一講書(shū)人閱讀 49,785評(píng)論 1 290
  • 那天降盹,我揣著相機(jī)與錄音,去河邊找鬼谤辜。 笑死蓄坏,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的丑念。 我是一名探鬼主播涡戳,決...
    沈念sama閱讀 38,931評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼脯倚!你這毒婦竟也來(lái)了渔彰?” 一聲冷哼從身側(cè)響起嵌屎,我...
    開(kāi)封第一講書(shū)人閱讀 37,696評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎恍涂,沒(méi)想到半個(gè)月后宝惰,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,141評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡再沧,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,483評(píng)論 2 327
  • 正文 我和宋清朗相戀三年尼夺,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片炒瘸。...
    茶點(diǎn)故事閱讀 38,625評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡淤堵,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出顷扩,到底是詐尸還是另有隱情粘勒,我是刑警寧澤,帶...
    沈念sama閱讀 34,291評(píng)論 4 329
  • 正文 年R本政府宣布屎即,位于F島的核電站庙睡,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏技俐。R本人自食惡果不足惜乘陪,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,892評(píng)論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望雕擂。 院中可真熱鬧啡邑,春花似錦、人聲如沸井赌。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,741評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)仇穗。三九已至流部,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間纹坐,已是汗流浹背枝冀。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,977評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留耘子,地道東北人果漾。 一個(gè)月前我還...
    沈念sama閱讀 46,324評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像谷誓,于是被迫代替她去往敵國(guó)和親绒障。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,492評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容

  • 1.1 為什么要線程池 我們?cè)趫?zhí)行大規(guī)模任務(wù)時(shí)捍歪,如安卓中的多圖下載户辱,網(wǎng)絡(luò)請(qǐng)求鸵钝,都少不了使用線程。而線程作為進(jìn)程下面...
    Armstrong_Q閱讀 570評(píng)論 0 9
  • 這篇文章是轉(zhuǎn)載InfoQ上方騰飛老師的一篇文章焕妙,這篇文章已經(jīng)寫(xiě)的非常詳細(xì)了蒋伦,閱讀原文請(qǐng)點(diǎn)擊這里. 1. 引言 合理...
    AlstonWilliams閱讀 313評(píng)論 0 0
  • 從北京坐車來(lái)易縣項(xiàng)目弓摘,在車上的時(shí)候就一直在想一個(gè)問(wèn)題焚鹊,為什么我這么"忙"?總是把工作和生活弄的一團(tuán)糟韧献,把自己也折騰...
    Henry韓閱讀 137評(píng)論 0 0
  • 文/蒙山樵夫 夏日的夜晚末患,喜歡在河邊獨(dú)步。手機(jī)里響著“喜馬拉雅FM”動(dòng)聽(tīng)的朗讀锤窑,我享受這種邊...
    蒙山樵夫下山來(lái)閱讀 509評(píng)論 2 2
  • 自從離開(kāi)校園進(jìn)入社會(huì)后渊啰,便很難再有一種清閑的時(shí)光讓自己心無(wú)旁騖去專注的閱讀探橱。工作之余,偶爾我會(huì)看看電視劇绘证、電影隧膏,用...
    懷襄閱讀 401評(píng)論 0 5