JDK(1.8)線程池源碼探究:ThreadPoolExecutor

結(jié)合JDK(1.8)源碼分析線程池(ThreadPoolExecutor)實現(xiàn)原理


我們平時所提的線程池乖菱,大多指的是ThreadPoolExecutor静陈,而非ThreadPool,關(guān)于ThreadPool這里不做贅敘螟碎。集中探討一下ThreadPoolExecutor眉菱。

  • 帶著問題解讀源碼,先看問題:
  1. JDK中有多種線程池,我們應(yīng)該使用哪一種線程池掉分?選擇的依據(jù)是什么俭缓?
  2. 線程池中的線程會一直存活著嗎?
  3. 線程池飽和之后酥郭,無法接收新任務(wù)华坦,線程池如何處理這部分任務(wù)(線程池拒絕策略)?
  4. 線程池是如何實現(xiàn)將任務(wù)提交與任務(wù)執(zhí)行分離的不从?
  5. 線程池中使用了“生產(chǎn)者-消費者”模式惜姐,那么它是如何實現(xiàn)的?
  6. 線程池中的各個參數(shù)分別代表什么消返?并且線程池中的線程數(shù)量如何確定载弄?

如果上述問題,很清楚的話撵颊,也請同學留步宇攻,下面是從源碼角度來分析上述問題。


為了便于后續(xù)描述倡勇,先貼出使用線程池的基本代碼:

/**
 * 演示線程池基本操作
 * @author Fooisart
 * Created on 2018-12-11
 */
public class ExecutorServiceDemo {
    public static void main(String[] args) {
        //1逞刷,線程池創(chuàng)建
        ExecutorService executor = Executors.newFixedThreadPool(2);
        //2嘉涌,新建任務(wù)
        Runnable task = new Runnable() {
            @Override
            public void run() {
                System.out.println("The real task is executing!");
            }
        };
        //3,提交任務(wù)到線程池
        executor.submit(task);
        //4夸浅,線程池關(guān)閉
        executor.shutdown();
    }
}

上述代碼仑最,演示了線程池的一個簡單使用方式。文首提到帆喇,線程池一般指的是ThreadPoolExecutor,而上述示例代碼中警医,并未出現(xiàn)ThreadPoolExecutor。那是不是表明坯钦,在Java中是不是還有其他類表示線程池呢预皇?其實不然,示例代碼中婉刀,有兩個類:

  • ExecutorService
  • Executors

分別解釋一下吟温,這哥倆與ThreadPoolExecutor的關(guān)系。

  • Executors 看一下源碼注釋突颊,
Factory and utility methods for Executor

第一句話表明其真正意義鲁豪,表明自己是一個工具類,進一步說是一個工廠類律秃。這個工廠的產(chǎn)品是—ThreadPoolExecutor爬橡。組裝不同的參數(shù),生產(chǎn)出不同功能的線程池友绝。Executors源碼解讀.

  • ExecutorService 不貼它源碼了堤尾。從他的名字可以看出,它是一個服務(wù)(Service)迁客,為線程池服務(wù)郭宝。它是一個接口,提供了一些操作線程池的接口方法掷漱。
    ExecutorService.png

    結(jié)合示例代碼粘室,分析ThreadPoolExecutor源碼。
    第1步創(chuàng)建線程池卜范,已解釋衔统。第2部創(chuàng)建任務(wù),線程的基礎(chǔ)知識海雪〗蹙簦看第3步,提交任務(wù)到線程池奥裸,跟進(AbstractExecutorService):
 /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

一共四行代碼险掀,第二行是任務(wù)封裝(封裝成FutureTask);第三行湾宙,execute樟氢,把上一步封裝好的任務(wù)冈绊,在此執(zhí)行。(BTW埠啃,之前讀過1.7源碼死宣,這個方法是在ThreadPoolExecutor中的,1.8抽象出了一個AbstractExecutorService)碴开。繼續(xù)跟進毅该,進入ThreadPoolExecutor.execute()方法。此時潦牛,正式進入ThreadPoolExecutor,它是線程池的核心類鹃骂,先簡要介紹一下ThreadPoolExecutor。

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 

ThreadPoolExecutor中有多個構(gòu)造函數(shù)罢绽,其中有一個最基礎(chǔ)的構(gòu)造函數(shù),貼出的是它的說明静盅。其中有7個參數(shù)良价,分別介紹,先從簡單地講起:

  • threadFactory : 用來創(chuàng)建線程蒿叠,一般是用默認即可明垢。除非你想把你線程池中的線程編號,或者有規(guī)律地命名市咽。
  • handler : 拒絕策略痊银。所謂的拒絕策略,即當我們自己所定義的線程池無法再添加任務(wù)時(什么時候無法添加一會說)施绎,那么會使用拒絕策略溯革。看一下源碼中對拒絕策略的說明:
1,In the default ThreadPoolExecutor.AbortPolicy, the handler throws a runtime
 RejectedExecutionException upon rejection.
簡介:此為默認拒絕策略谷醉,會拋異常

2,In ThreadPoolExecutor.CallerRunsPolicy, the thread that invokes execute 
itself runs the task. This provides a simple feedback control mechanism that will 
slow down the rate that new tasks are submitted.
簡介:把并行改串行致稀,顯然會降低線程池的執(zhí)行效率。與策略1相比俱尼,優(yōu)點是不
會拋異常抖单,已不會丟任務(wù),缺點也很明細遇八,太慢啦矛绘!

3,In ThreadPoolExecutor.DiscardPolicy, a task that cannot be executed is simply dropped.
簡介:這個很痛快,直接拋棄任務(wù)刃永,也沒有異常货矮。效率是兼顧了,正確性與完整
性丟失了揽碘,或者起碼得讓我知道次屠,任務(wù)被拋棄了

4,In ThreadPoolExecutor.DiscardOldestPolicy, if the executor is not shut down, 
the task at the head of the work queue is dropped, and then execution is retried 
(which can fail again, causing this to be repeated.)
簡介:無
  • corePoolSize : 核心線程數(shù)园匹,即一直存活在線程池中。當有新任務(wù)提價時劫灶,開始創(chuàng)建裸违,直至創(chuàng)建的線程數(shù)達到corePoolSize值。
  • workQueue : 工作隊列本昏,如果說線程數(shù)已經(jīng)達到了corePoolSize,此時供汛,又有新任務(wù)進入,咋辦涌穆?那么這些任務(wù)會進入到等待隊列中怔昨。
  • maximumPoolSize : 最大線程數(shù)。前面提到了工作隊列宿稀,既然是隊列趁舀,就會有滿的那一天。滿了之后祝沸,咋辦矮烹?則會接著創(chuàng)建臨時線程(有點像快遞公司在大促期間找的臨時工),臨時處理一下這些任務(wù)罩锐。
  • unit : 時間單位奉狈。
  • keepAliveTime : 存活時間。既然是存活時間涩惑,那代表什么存活時間呢仁期?代表的是,上面提到的臨時線程存活時間竭恬,配合時間單位(unit)參數(shù)跛蛋,確定臨時線程干完活之后,還能活多久(快遞公司大促期間雇的臨時工痊硕,雇傭時間)问芬。

介紹完線程池各個參數(shù)的意義,回到ThreadPoolExecutor.execute方法:

 /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     * 介紹:該方法是任務(wù)執(zhí)行寿桨,從注釋中看出任務(wù)會在某個時間點執(zhí)行此衅。所有這個方法,只是保證了任務(wù)
     *      提交亭螟。提交了之后挡鞍,具體如何執(zhí)行,還得參照線程池中線程的實際情況预烙。是立即執(zhí)行還是排隊,
     *      是用已有線程執(zhí)行扁掸,還是創(chuàng)建新線程處理它翘县,都是未知的最域。
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         * 以下為線程池的核心三步,仔細看上文中7個參數(shù)的介紹锈麸,下面三步也就容易理解了
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         * 第一步镀脂,如果線程池中線程數(shù)量小于核心線程數(shù),則創(chuàng)建線程忘伞,然后調(diào)用執(zhí)行薄翅。
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 第二步,達到核心線程數(shù)之后氓奈,加入隊列翘魄。加入隊列成功后,等待被取出執(zhí)行舀奶。
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         * 第三步暑竟,如果隊列滿了(加入隊列失敗)育勺,繼續(xù)創(chuàng)建線程光羞,直至達到max
         * 線程數(shù)。如果達到max線程數(shù)怀大,開始使用某一種拒絕策略,拒絕任務(wù)
         */

          /**
           * ctl這個參數(shù)使用了二進制方式存了線程池的兩個信息:
           * ①當下線程池中線程數(shù)量呀闻;②線程池狀態(tài)化借,比如線程池關(guān)閉了等。
           * 
           * workerCountOf捡多,獲取線程池線程數(shù)量;
           * isRunning() 判斷線程池是否處于運行態(tài)
         */
        int c = ctl.get();
        //第一步
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //第二步
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //第三步
        else if (!addWorker(command, false))
            reject(command);
    }

源碼中已添加了一些我的個人理解蓖康,應(yīng)該很容易理解了±菔郑回想一下蒜焊,咱們使用線程池,主要是為了高效地執(zhí)行任務(wù)科贬。執(zhí)行任務(wù)泳梆,才是我們最關(guān)心的“裾疲可是至今优妙,仍未明顯地看到任務(wù)執(zhí)行,所以繼續(xù)跟進憎账。我已經(jīng)趟平了套硼,直接進入addWorder():

 private boolean addWorker(Runnable firstTask, boolean core) {
        //線程創(chuàng)建前的一些合理性校驗
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        //任務(wù)執(zhí)行
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
             
            /**
             * 其實這里的firstTask是我們最初傳進來要執(zhí)行的任務(wù),這里把它封裝成了一個Worker胞皱。
             * 看Worker源碼可以知道邪意,其實它是Runnable接口的一個實現(xiàn)類九妈。在這里,Worker充當?shù)?             * 是一個線程的角色雾鬼,它參數(shù)中有thread萌朱。那么線程的啟動肯定要調(diào)用start()方法。
             * 在下方也確實找到了start()方法呆贿,啟動之后嚷兔,根據(jù)線程的基本知識,肯定是進入了線程
             * 的run()方法做入。
             */
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

根據(jù)上文代碼注釋中冒晰,進入Worker.run()方法:

public void run() {
            runWorker(this);
        }

繼續(xù)跟進runWorker(),

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
         /**
         * firstTask是我們自己提交的任務(wù)
         * 
         * 可以看到竟块,下午中task直接調(diào)用了run()方法壶运。根據(jù)線程的基礎(chǔ)知識,只用調(diào)用線程的start()方法浪秘,
         * 才會啟動一個線程蒋情。而調(diào)用線程的run()方法,則會當做普通的方法執(zhí)行耸携,不會異步執(zhí)行棵癣。
         * 其實在這里就解釋了任務(wù)提交與執(zhí)行分離。線程池幫我們創(chuàng)建了線程夺衍,然后使用它創(chuàng)建的線程串行地
         * 調(diào)用了我們的run()方法狈谊,從而執(zhí)行了我們的任務(wù)。
         */
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //getTask()此處是不斷地獲取隊列中的任務(wù)沟沙。
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

至此河劝,線程池已基本分析完了。文章開頭介紹的那幾個問題矛紫,沒有一一解釋赎瞎,但如果是認真讀到這里的話,那幾個問題應(yīng)該已經(jīng)了然于胸了颊咬。


As always and let me know what you think, leave the comments below. Bye :)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末务甥,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子喳篇,更是在濱河造成了極大的恐慌缓呛,老刑警劉巖,帶你破解...
    沈念sama閱讀 210,978評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件杭隙,死亡現(xiàn)場離奇詭異哟绊,居然都是意外死亡,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,954評論 2 384
  • 文/潘曉璐 我一進店門票髓,熙熙樓的掌柜王于貴愁眉苦臉地迎上來攀涵,“玉大人,你說我怎么就攤上這事洽沟∫怨剩” “怎么了?”我有些...
    開封第一講書人閱讀 156,623評論 0 345
  • 文/不壞的土叔 我叫張陵裆操,是天一觀的道長怒详。 經(jīng)常有香客問我,道長踪区,這世上最難降的妖魔是什么昆烁? 我笑而不...
    開封第一講書人閱讀 56,324評論 1 282
  • 正文 為了忘掉前任,我火速辦了婚禮缎岗,結(jié)果婚禮上静尼,老公的妹妹穿的比我還像新娘。我一直安慰自己传泊,他們只是感情好鼠渺,可當我...
    茶點故事閱讀 65,390評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著眷细,像睡著了一般拦盹。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上溪椎,一...
    開封第一講書人閱讀 49,741評論 1 289
  • 那天普舆,我揣著相機與錄音,去河邊找鬼池磁。 笑死,一個胖子當著我的面吹牛楷兽,可吹牛的內(nèi)容都是我干的地熄。 我是一名探鬼主播,決...
    沈念sama閱讀 38,892評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼芯杀,長吁一口氣:“原來是場噩夢啊……” “哼端考!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起揭厚,我...
    開封第一講書人閱讀 37,655評論 0 266
  • 序言:老撾萬榮一對情侶失蹤却特,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后筛圆,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體裂明,經(jīng)...
    沈念sama閱讀 44,104評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年太援,在試婚紗的時候發(fā)現(xiàn)自己被綠了闽晦。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片扳碍。...
    茶點故事閱讀 38,569評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖仙蛉,靈堂內(nèi)的尸體忽然破棺而出笋敞,到底是詐尸還是另有隱情,我是刑警寧澤荠瘪,帶...
    沈念sama閱讀 34,254評論 4 328
  • 正文 年R本政府宣布夯巷,位于F島的核電站,受9級特大地震影響哀墓,放射性物質(zhì)發(fā)生泄漏趁餐。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,834評論 3 312
  • 文/蒙蒙 一麸祷、第九天 我趴在偏房一處隱蔽的房頂上張望澎怒。 院中可真熱鬧,春花似錦阶牍、人聲如沸喷面。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,725評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽惧辈。三九已至,卻和暖如春磕瓷,著一層夾襖步出監(jiān)牢的瞬間盒齿,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,950評論 1 264
  • 我被黑心中介騙來泰國打工困食, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留边翁,地道東北人。 一個月前我還...
    沈念sama閱讀 46,260評論 2 360
  • 正文 我出身青樓硕盹,卻偏偏與公主長得像符匾,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子瘩例,可洞房花燭夜當晚...
    茶點故事閱讀 43,446評論 2 348

推薦閱讀更多精彩內(nèi)容