Java 線程池

線程池是什么?

線程池用于多線程處理中蝎宇,它可以根據(jù)系統(tǒng)的情況,可以有效控制線程執(zhí)行的數(shù)量祷安,優(yōu)化運(yùn)行效果姥芥。線程池做的工作主要是控制運(yùn)行的線程的數(shù)量,處理過(guò)程中將任務(wù)放入隊(duì)列汇鞭,然后在線程創(chuàng)建后啟動(dòng)這些任務(wù)凉唐,如果線程數(shù)量超過(guò)了最大數(shù)量超出數(shù)量的線程排隊(duì)等候,等其它線程執(zhí)行完畢霍骄,再?gòu)年?duì)列中取出任務(wù)來(lái)執(zhí)行台囱。

線程池的作用

在面向?qū)ο蟮木幊踢^(guò)程中,創(chuàng)建對(duì)象和銷毀對(duì)象是非常消耗時(shí)間和資源的读整。因此想要最小化這種消耗的一種思想就是『池化資源』簿训。線程池就是這樣的一種思想。我們通過(guò)重用線程池中的資源來(lái)減少創(chuàng)建和銷毀線程所需要耗費(fèi)的時(shí)間和資源。

線程池的一個(gè)作用是創(chuàng)建和銷毀線程的次數(shù)强品,每個(gè)工作線程可以多次使用膘侮;另一個(gè)作用是可根據(jù)系統(tǒng)情況調(diào)整執(zhí)行的線程數(shù)量,防止消耗過(guò)多內(nèi)存的榛。另外琼了,通過(guò)線程池,能有效的控制線程的最大并發(fā)數(shù)夫晌,提高系統(tǒng)資源利用率雕薪,同時(shí)避免過(guò)多的資源競(jìng)爭(zhēng),避免堵塞晓淀。

線程池的優(yōu)點(diǎn)總結(jié)如下幾個(gè)方面:

  • 線程復(fù)用
  • 控制最大并發(fā)數(shù)
  • 管理線程

線程池的組成

一般的線程池主要分為以下4個(gè)組成部分:

  1. 線程池管理器:用于創(chuàng)建并管理線程池
  2. 工作線程:線程池中的線程
  3. 任務(wù)接口:每個(gè)任務(wù)必須實(shí)現(xiàn)的接口所袁,用于工作線程調(diào)度其運(yùn)行
  4. 任務(wù)隊(duì)列:用于存放待處理的任務(wù),提供一種緩沖機(jī)制

線程池的常見(jiàn)應(yīng)用場(chǎng)景

許多服務(wù)器應(yīng)用常常需要處理大量而短小的請(qǐng)求(例如要糊,Web 服務(wù)器纲熏,數(shù)據(jù)庫(kù)服務(wù)器等等)妆丘,通常它們收到的請(qǐng)求數(shù)量很大锄俄,一個(gè)簡(jiǎn)單的模型是,當(dāng)服務(wù)器收到來(lái)自遠(yuǎn)程的請(qǐng)求時(shí)勺拣,為每一個(gè)請(qǐng)求開(kāi)啟一個(gè)線程奶赠,在請(qǐng)求完畢之后再對(duì)線程進(jìn)行銷毀。這樣處理帶來(lái)的問(wèn)題是药有,創(chuàng)建和銷毀線程所消耗的時(shí)間往往比任務(wù)本身所需消耗的資源要大得多毅戈。那么應(yīng)該怎么辦呢?

線程池為線程生命周期開(kāi)銷問(wèn)題和資源不足問(wèn)題提供了解決方案愤惰。我們可以通過(guò)線程池做到線程復(fù)用苇经,不需要頻繁的創(chuàng)建和銷毀線程,讓線程池中的線程一直存在于線程池中宦言,然后線程從任務(wù)隊(duì)列中取得任務(wù)來(lái)執(zhí)行扇单。而且這樣做的另一個(gè)好處有,通過(guò)適當(dāng)?shù)卣{(diào)整線程池中的線程數(shù)目奠旺,也就是當(dāng)請(qǐng)求的數(shù)目超過(guò)某個(gè)閾值時(shí)蜘澜,就強(qiáng)制其它任何新到的請(qǐng)求一直等待,直到獲得一個(gè)線程來(lái)處理為止响疚,從而可以防止資源不足鄙信。

Java線程池的簡(jiǎn)介

Java中提供了實(shí)現(xiàn)線程池的框架Executor,并且提供了許多種類的線程池忿晕,接下來(lái)的文章中將會(huì)做詳細(xì)介紹装诡。

Java線程池框架

Java中的線程池是通過(guò)Executor框架實(shí)現(xiàn)的,該框架中用到了Executor,Executors慎王,ExecutorService蚓土,ThreadPoolExecutor ,Callable和Future赖淤、FutureTask這幾個(gè)類蜀漆。

  • Executor:所有線程池的接口,只有一個(gè)方法
  • Executors:Executor 的工廠類咱旱,提供了創(chuàng)建各種不同線程池的方法确丢,返回的線程池都實(shí)現(xiàn)了ExecutorService 接口
  • ThreadPoolExecutor:線程池的具體實(shí)現(xiàn)類,一般所有的線程池都是基于這個(gè)類實(shí)現(xiàn)的

其中ThreadPoolExecutor的構(gòu)造方法如下:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}

其中:

  • corePoolSize:線程池的核心線程數(shù)
  • maximumPoolSize:線程池中允許的最大線程數(shù)
  • keepAliveTime:空閑線程結(jié)束的超時(shí)時(shí)間
  • unit:是一個(gè)枚舉吐限,它表示的是 keepAliveTime 的單位
  • workQueue:工作隊(duì)列鲜侥,用于任務(wù)的存放

Java線程池的工作過(guò)程

Java線程池的工作過(guò)程如下:

  1. 線程池剛創(chuàng)建時(shí),里面沒(méi)有一個(gè)線程诸典。任務(wù)隊(duì)列是作為參數(shù)傳進(jìn)來(lái)的描函。不過(guò),就算隊(duì)列里面有任務(wù)狐粱,線程池也不會(huì)馬上執(zhí)行它們舀寓。
  2. 當(dāng)調(diào)用 execute() 方法添加一個(gè)任務(wù)時(shí),線程池會(huì)做如下判斷:
    • 如果正在運(yùn)行的線程數(shù)量小于 corePoolSize肌蜻,那么馬上創(chuàng)建線程運(yùn)行這個(gè)任務(wù)互墓;
    • 如果正在運(yùn)行的線程數(shù)量大于或等于 corePoolSize,那么將這個(gè)任務(wù)放入隊(duì)列蒋搜;
    • 如果這時(shí)候隊(duì)列滿了篡撵,而且正在運(yùn)行的線程數(shù)量小于 maximumPoolSize,那么還是要?jiǎng)?chuàng)建非核心線程立刻運(yùn)行這個(gè)任務(wù)豆挽;
    • 如果隊(duì)列滿了育谬,而且正在運(yùn)行的線程數(shù)量大于或等于 maximumPoolSize,那么線程池會(huì)拋出異常RejectExecutionException帮哈。
  3. 當(dāng)一個(gè)線程完成任務(wù)時(shí)膛檀,它會(huì)從隊(duì)列中取下一個(gè)任務(wù)來(lái)執(zhí)行。
  4. 當(dāng)一個(gè)線程無(wú)事可做但汞,超過(guò)一定的時(shí)間(keepAliveTime)時(shí)宿刮,線程池會(huì)判斷,如果當(dāng)前運(yùn)行的線程數(shù)大于 corePoolSize私蕾,那么這個(gè)線程就被停掉僵缺。所以線程池的所有任務(wù)完成后,它最終會(huì)收縮到 corePoolSize 的大小踩叭。

常見(jiàn)的Java線程池

生成線程池使用的是Executors的工廠方法磕潮,以下是常見(jiàn)的 Java 線程池:

SingleThreadExecutor

SingleThreadExecutor是單個(gè)線程的線程池翠胰,即線程池中每次只有一個(gè)線程在運(yùn)行,單線程串行執(zhí)行任務(wù)自脯。如果這個(gè)唯一的線程因?yàn)楫惓=Y(jié)束之景,那么會(huì)有一個(gè)新的線程來(lái)替代它。此線程池保證所有任務(wù)的執(zhí)行順序按照任務(wù)的提交順序執(zhí)行膏潮。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
FixedThreadPool

FixedThreadPool是固定數(shù)量的線程池锻狗,只有核心線程,每提交一個(gè)任務(wù)就是一個(gè)線程焕参,直到達(dá)到線程池的最大數(shù)量轻纪,然后后面進(jìn)入等待隊(duì)列,直到前面的任務(wù)完成才繼續(xù)執(zhí)行叠纷。線程池的大小一旦達(dá)到最大值就會(huì)保持不變刻帚,如果某個(gè)線程因?yàn)閳?zhí)行異常而結(jié)束,那么線程池會(huì)補(bǔ)充一個(gè)新線程涩嚣。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

CachedThreadPool

CachedThreadPool是可緩存線程池崇众。如果線程池的大小超過(guò)了處理任務(wù)所需要的線程,那么就會(huì)回收部分空閑(60秒不執(zhí)行任務(wù))的線程航厚,當(dāng)任務(wù)數(shù)增加時(shí)顷歌,此線程池又可以智能的添加新線程來(lái)處理任務(wù)。此線程池不會(huì)對(duì)線程池大小做限制阶淘,線程池大小完全依賴于操作系統(tǒng)(或者說(shuō)JVM)能夠創(chuàng)建的最大線程大小衙吩。其中互妓,SynchronousQueue是一個(gè)是緩沖區(qū)為1的阻塞隊(duì)列溪窒。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

ScheduledThreadPool

ScheduledThreadPool是核心線程池固定,大小無(wú)限制的線程池冯勉,支持定時(shí)和周期性的執(zhí)行線程澈蚌。創(chuàng)建一個(gè)周期性執(zhí)行任務(wù)的線程池。如果閑置,非核心線程池會(huì)在DEFAULT_KEEPALIVEMILLIS時(shí)間內(nèi)回收灼狰。

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

Java 線程池的創(chuàng)建和使用

我們可以通過(guò)Executors的工廠方法來(lái)創(chuàng)建一個(gè)線程池宛瞄。但是我們?cè)撊绾巫尵€程池執(zhí)行任務(wù)呢?

線程池最常用的提交任務(wù)的方法有兩種:

  • execute:
ExecutorService.execute(Runnable runable)交胚;
  • submit:
FutureTask task = ExecutorService.submit(Runnable runnable);
FutureTask<T> task = ExecutorService.submit(Runnable runnable,T Result);
FutureTask<T> task = ExecutorService.submit(Callable<T> callable);

可以看出submit開(kāi)啟的是有返回結(jié)果的任務(wù)份汗,會(huì)返回一個(gè)FutureTask對(duì)象,這樣就能通過(guò)get()方法得到結(jié)果蝴簇。submit最終調(diào)用的也是execute(Runnable runable)杯活,submit只是將Callable對(duì)象或Runnable封裝成一個(gè)FutureTask對(duì)象,因?yàn)镕utureTask是個(gè)Runnable熬词,所以可以在execute中執(zhí)行旁钧。

下面的示例代碼演示了如何創(chuàng)建一個(gè)線程池吸重,并且使用它管理線程:

public class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " is running.");
    }
}
 
public class TestSingleThreadExecutor {
    public static void main(String[] args) {
        //創(chuàng)建一個(gè)可重用固定線程數(shù)的線程池
        ExecutorService pool = Executors.newFixedThreadPool(2);
        //創(chuàng)建實(shí)現(xiàn)了Runnable接口對(duì)象
        Thread tt1 = new MyThread();
        Thread tt2 = new MyThread();
        Thread tt3 = new MyThread();
        Thread tt4 = new MyThread();
        Thread tt5 = new MyThread();
        //將線程放入池中并執(zhí)行
        pool.execute(tt1);
        pool.execute(tt2);
        pool.execute(tt3);
        pool.execute(tt4);
        pool.execute(tt5);
        //關(guān)閉
        pool.shutdown();
    }
}

運(yùn)行結(jié)果:

pool-1-thread-1 is running.
pool-1-thread-2 is running.
pool-1-thread-1 is running.
pool-1-thread-2 is running.
pool-1-thread-1 is running.

Java線程池原理

這篇文章會(huì)分別從這三個(gè)方面,結(jié)合具體的代碼實(shí)現(xiàn)來(lái)剖析 Java 線程池的原理以及它的具體實(shí)現(xiàn)歪今。

線程復(fù)用

我們知道線程池的一個(gè)作用是創(chuàng)建和銷毀線程的次數(shù)嚎幸,每個(gè)工作線程可以多次使用。這個(gè)功能就是線程復(fù)用寄猩。想要了解 Java 線程池是如何進(jìn)行線程復(fù)用的嫉晶,我們首先需要了解線程的生命周期。

線程生命周期

下圖描述了線程完整的生命周期:

在一個(gè)線程完整的生命周期中田篇,它可能經(jīng)歷五種狀態(tài):新建(New)车遂、就緒(Runnable)、運(yùn)行(Running)斯辰、阻塞(Blocked)舶担、終止(Zombie)。

在 Java中彬呻,Thread 通過(guò)new來(lái)新建一個(gè)線程衣陶,這個(gè)過(guò)程是是初始化一些線程信息,如線程名闸氮、id剪况、線程所屬group等,可以認(rèn)為只是個(gè)普通的對(duì)象蒲跨。調(diào)用Thread的start()后Java虛擬機(jī)會(huì)為其創(chuàng)建方法調(diào)用棧和程序計(jì)數(shù)器译断,同時(shí)將hasBeenStarted為true,之后如果再次調(diào)用start()方法就會(huì)有異常或悲。

處于這個(gè)狀態(tài)中的線程并沒(méi)有開(kāi)始運(yùn)行孙咪,只是表示該線程可以運(yùn)行了。至于該線程何時(shí)開(kāi)始運(yùn)行巡语,取決于 JVM 里線程調(diào)度器的調(diào)度翎蹈。當(dāng)線程獲取CPU后,run()方法會(huì)被調(diào)用男公。不要自己去調(diào)用Thread的run()方法荤堪。之后根據(jù)CPU的調(diào)度,線程就會(huì)在就緒—運(yùn)行—阻塞間切換枢赔,直到run()方法結(jié)束或其他方式停止線程澄阳,進(jìn)入終止?fàn)顟B(tài)。

因此踏拜,如果要實(shí)現(xiàn)線程的復(fù)用碎赢,我們必須要保證線程池中的線程保持存活狀態(tài)(就緒籽孙、運(yùn)行怯伊、阻塞)筛婉。接下來(lái)却邓,我們就來(lái)看看ThreadPoolExecutor是如何實(shí)現(xiàn)線程復(fù)用的。

Worker 類

ThreadPoolExecutor主要是通過(guò)一個(gè)類來(lái)控制線程復(fù)用的:Worker 類峦嗤。

我們來(lái)看一下簡(jiǎn)化后的 Worker 類代碼:

private final class Worker implements Runnable {
 
    final Thread thread;
 
    Runnable firstTask;
 
    Worker(Runnable firstTask) {
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
 
    public void run() {
        runWorker(this);
    }
 
    final void runWorker(Worker w) {
        Runnable task = w.firstTask;
        w.firstTask = null;
        while (task != null || (task = getTask()) != null){
             task.run();
        }
    }
  ……
}

從代碼中蕊唐,我們可以看到 Worker 實(shí)現(xiàn)了 Runnable 接口,并且它還有一個(gè) Thread成員變量 thread烁设,這個(gè) thread 就是要開(kāi)啟運(yùn)行的線程替梨。我們看到 Worker 的構(gòu)造方法中傳遞了一個(gè) Runnable 參數(shù),同時(shí)它把自己作為參數(shù)傳入 newThread()装黑,這樣的話副瀑,當(dāng) Thread 的start()方法得到調(diào)用時(shí),執(zhí)行的其實(shí)是 Worker 的run()方法恋谭,即runWorker()方法糠睡。

runWorker()方法之中有一個(gè) while 循環(huán),使用 getTask()來(lái)獲取任務(wù)疚颊,并執(zhí)行狈孔。接下來(lái),我們將會(huì)看到getTask()是如何獲取到 Runnable 對(duì)象的材义。

getTask()

我們來(lái)看一下簡(jiǎn)化后的getTask()代碼:

private Runnable getTask() {
  if(一些特殊情況) {
    return null;
  }
  Runnable r = workQueue.take();
  return r;
}

我們可以看到任務(wù)是從 workQueue中獲取的均抽,這個(gè) workQueue 就是我們初始化 ThreadPoolExecutor 時(shí)存放任務(wù)的 BlockingQueue隊(duì)列,這個(gè)隊(duì)列里的存放的都是將要執(zhí)行的 Runnable任務(wù)其掂。因?yàn)?BlockingQueue 是個(gè)阻塞隊(duì)列油挥,BlockingQueue.take()返回的是空,則進(jìn)入等待狀態(tài)直到 BlockingQueue 有新的對(duì)象被加入時(shí)喚醒阻塞的線程款熬。所以一般情況下深寥,Thread的run()方法不會(huì)結(jié)束,而是不斷執(zhí)行workQueue里的Runnable任務(wù)华烟,這就達(dá)到了線程復(fù)用的目的了翩迈。

控制最大并發(fā)數(shù)

我們現(xiàn)在已經(jīng)知道了 Java 線程池是如何做到線程復(fù)用的了持灰,但是Runnable 是什么時(shí)候被放入 workQueue 隊(duì)列中的呢盔夜,Worker里的Thread的又是什么時(shí)候調(diào)用start()開(kāi)啟新線程來(lái)執(zhí)行Worker的run()方法的呢?從上面的分析中我們可以看出Worker里的runWorker()執(zhí)行任務(wù)時(shí)是一個(gè)接一個(gè)堤魁,串行進(jìn)行的喂链,那并發(fā)是怎么體現(xiàn)的呢?它又是如何做到控制最大并發(fā)數(shù)的呢妥泉?

execute()

通過(guò)查看 execute()就能解答上述的一些問(wèn)題椭微,同樣是簡(jiǎn)化后的代碼:

public void execute(Runnable command) {
    if (command == null) throw new NullPointerException();
    int c = ctl.get();
    // 當(dāng)前線程數(shù) < corePoolSize
    if (workerCountOf(c) < corePoolSize) {
        // 直接啟動(dòng)新的線程。
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 活動(dòng)線程數(shù) >= corePoolSize
    // runState為RUNNING && 隊(duì)列未滿
    if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    // 再次檢驗(yàn)是否為RUNNING狀態(tài)
    // 非RUNNING狀態(tài) 則從workQueue中移除任務(wù)并拒絕
    if (!isRunning(recheck) && remove(command))
      reject(command);
    // 采用線程池指定的策略拒絕任務(wù)
    // 兩種情況:
    // 1.非RUNNING狀態(tài)拒絕新的任務(wù)
    // 2.隊(duì)列滿了啟動(dòng)新的線程失斆ち础(workCount > maximumPoolSize)
    } else if (!addWorker(command, false))
        reject(command);
}

addWorker()

我們?cè)賮?lái)看一下addWorker()的簡(jiǎn)化代碼:

private boolean addWorker(Runnable firstTask, boolean core) {
    int wc = workerCountOf(c);
    if (wc >= (core ? corePoolSize : maximumPoolSize)) {
        return false;
    }
    w = new Worker(firstTask);
    final Thread t = w.thread;
    t.start();
}

根據(jù)上面的代碼蝇率,線程池工作過(guò)程中是如何添加任務(wù)的就很清晰了:

  • 如果正在運(yùn)行的線程數(shù)量小于 corePoolSize迟杂,那么馬上創(chuàng)建線程運(yùn)行這個(gè)任務(wù);
  • 如果正在運(yùn)行的線程數(shù)量大于或等于 corePoolSize本慕,那么將這個(gè)任務(wù)放入隊(duì)列排拷;
  • 如果這時(shí)候隊(duì)列滿了,而且正在運(yùn)行的線程數(shù)量小于 maximumPoolSize锅尘,那么還是要?jiǎng)?chuàng)建非核心線程立刻運(yùn)行這個(gè)任務(wù)监氢;
  • 如果隊(duì)列滿了,而且正在運(yùn)行的線程數(shù)量大于或等于 maximumPoolSize藤违,那么線程池會(huì)拋出異常RejectExecutionException

如果通過(guò)addWorker()成功創(chuàng)建新的線程浪腐,則通過(guò)start()開(kāi)啟新線程,同時(shí)將firstTask作為這個(gè)Worker里的run()中執(zhí)行的第一個(gè)任務(wù)顿乒。雖然每個(gè)Worker的任務(wù)是串行處理议街,但如果創(chuàng)建了多個(gè)Worker,因?yàn)楣灿靡粋€(gè)workQueue璧榄,所以就會(huì)并行處理了傍睹。所以可以根據(jù)corePoolSize和maximumPoolSize來(lái)控制最大并發(fā)數(shù)。

過(guò)程如下圖所示:

管理線程

上邊的文章已經(jīng)講了犹菱,通過(guò)線程池可以很好的管理線程的復(fù)用拾稳,控制并發(fā)數(shù),以及銷毀等過(guò)程腊脱,而線程的管理過(guò)程已經(jīng)穿插在其中了访得,也很好理解。

在 ThreadPoolExecutor 有個(gè)AtomicInteger變量 ctl陕凹,這一個(gè)變量保存了兩個(gè)內(nèi)容:

  • 所有線程的數(shù)量
  • 每個(gè)線程所處的狀態(tài)

其中低29位存線程數(shù)悍抑,高3位存runState,通過(guò)位運(yùn)算來(lái)得到不同的值杜耙。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//得到線程的狀態(tài)
private static int runStateOf(int c) { return c & ~CAPACITY; }

//得到Worker的的數(shù)量
private static int workerCountOf(int c) { return c & CAPACITY; }

// 判斷線程是否在運(yùn)行
private static boolean isRunning(int c) { return c < SHUTDOWN; }

這里主要通過(guò)shutdown和shutdownNow()來(lái)分析線程池的關(guān)閉過(guò)程搜骡。首先線程池有五種狀態(tài)來(lái)控制任務(wù)添加與執(zhí)行。主要介紹以下三種:

  • RUNNING狀態(tài):線程池正常運(yùn)行佑女,可以接受新的任務(wù)并處理隊(duì)列中的任務(wù)记靡;
  • SHUTDOWN狀態(tài):不再接受新的任務(wù),但是會(huì)執(zhí)行隊(duì)列中的任務(wù)团驱;
  • STOP狀態(tài):不再接受新任務(wù)摸吠,不處理隊(duì)列中的任務(wù)

shutdown()這個(gè)方法會(huì)將runState置為SHUTDOWN,會(huì)終止所有空閑的線程嚎花,而仍在工作的線程不受影響寸痢,所以隊(duì)列中的任務(wù)人會(huì)被執(zhí)行;shutdownNow()方法將runState置為STOP紊选。和shutdown()方法的區(qū)別是啼止,這個(gè)方法會(huì)終止所有的線程道逗,所以隊(duì)列中的任務(wù)也不會(huì)被執(zhí)行了。

Java線程池框架源碼分析

前面的文章中已經(jīng)給出了Java線程池框架中幾個(gè)重要類的關(guān)系圖:

現(xiàn)在我們基于這張圖來(lái)逐步分析献烦。

Executor

public interface Executor {
    void execute(Runnable command);
}

這個(gè)接口表示向線程池中提交一個(gè)任務(wù)憔辫。

ExecutorService

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
       throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                      long timeout, TimeUnit unit)
            throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
                throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                                long timeout, TimeUnit unit)
                    throws InterruptedException, ExecutionException, TimeoutException;
}

可以看到 ExecutorService 擴(kuò)展了 Executor 接口,在 Executor 基礎(chǔ)上提供了更多的提交任務(wù)的方式和管理線程池的一些方法仿荆。

AbstractExecutorService

public abstract class AbstractExecutorService implements ExecutorService {
 
}

ThreadPoolExecutor

ThreadPoolExecutor 是線程池框架的關(guān)鍵類. 首先來(lái)看一下 ThreadPoolExecutor 中幾個(gè)重要的屬性.

public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * 整個(gè)線程池的狀態(tài)控制類 ctl, 是一個(gè) AtomicInteger 贰您,封裝了下面2個(gè)部分:
     * workerCount 表示有效的線程數(shù)
     * runState 表示線程池當(dāng)前狀態(tài),是running, shutdown還是其他狀態(tài)
     *
     * runState表示了線程池的整個(gè)整個(gè)生命周期拢操,可以取以下值:
     * RUNNING: 接受新的task锦亦,處理隊(duì)列中的task
     * SHUTDOWN: 不接受新的 task 但會(huì)處理隊(duì)列中的 task
     * STOP: 不接受新的 task, 不處理隊(duì)列中的 task, 中斷正在執(zhí)行的 task
     * TIDYING: 所有 task 執(zhí)行結(jié)束, workerCount 是 0, 線程過(guò)渡到TIDYING會(huì)調(diào)用 terminated()鉤子方法
     * TERMINATED: terminated()執(zhí)行完成
     *
     * 狀態(tài)轉(zhuǎn)換關(guān)系:
     *
     * RUNNING -> SHUTDOWN(調(diào)用shutdown())
     * On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP(調(diào)用shutdownNow())
     * On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING(queue和pool均empty)
     * When both queue and pool are empty
     * TIDYING -> TERMINATED(調(diào)用terminated())
     * When the terminated() hook method has completed
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 
    /**
     * task 的隊(duì)列
     */
    private final BlockingQueue<Runnable> workQueue;
 
    /**
     * worker 線程的 Set, 只有持有 mainlock 時(shí)才能訪問(wèn)
     */
    private final ReentrantLock mainLock = new ReentrantLock();
    private final HashSet<Worker> workers = new HashSet<Worker>();
 
    private final Condition termination = mainLock.newCondition();
    private int largestPoolSize;
    private long completedTaskCount;
 
    private volatile ThreadFactory threadFactory;
    private volatile RejectedExecutionHandler handler;
 
    /**
     * idle 線程waiting for work的超時(shí)時(shí)間
     */
    private volatile long keepAliveTime;
    /**
     * 如果是 false,core threads將會(huì)保持 alive 即使處于 idel 狀態(tài)
     * 如果是 true,core threads會(huì)keepAliveTime作為超時(shí)時(shí)間 wait for work
     */
    private volatile boolean allowCoreThreadTimeOut;
 
    private volatile int corePoolSize;
    private volatile int maximumPoolSize;
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
}

workerCount, runState 使用一個(gè) AtomicInteger 進(jìn)行了封裝, runState用 int 的高3位標(biāo)書(shū),低位表示 workerCount, 所以我們能看到 ThreadPoolExecutor 中和 ctl 相關(guān)的常量和解析方法.

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
 
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
 
// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

ThreadPoolExecutor 最主要的構(gòu)造函數(shù),設(shè)置上面說(shuō)的重要屬性.

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

現(xiàn)在來(lái)看一下 execute() 方法, execute()方法有3個(gè)處理步驟:

  1. 線程數(shù)小于 corePoolSize 時(shí),則試圖創(chuàng)建一個(gè)新的 worker 線程
  2. 如果上面一步失敗了,則試圖將任務(wù)添加到阻塞隊(duì)列中令境,并且要再一次判斷需要不需要回滾隊(duì)列杠园,或者說(shuō)創(chuàng)建線程
  3. 如果上面兩步都失敗了,則會(huì)試圖強(qiáng)行創(chuàng)建一個(gè)線程來(lái)執(zhí)行這個(gè)任務(wù)舔庶,如果還是失敗抛蚁,扔掉這個(gè)任務(wù)
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 1.判斷有效線程數(shù)是否小于核心線程數(shù)
    if (workerCountOf(c) < corePoolSize) {
        //創(chuàng)建新線程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2.分開(kāi)來(lái)看,首先判斷當(dāng)前的池子是否是處于 running 狀態(tài)
    // 因?yàn)橹挥?running 狀態(tài)才可以接收新任務(wù)
    // 接下來(lái)判斷能否成功添加到隊(duì)列中惕橙,如果隊(duì)列滿了或者其他情況則會(huì)跳到下一步
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再次檢查池子的狀態(tài)瞧甩,如果進(jìn)入了非 running 狀態(tài),回滾隊(duì)列弥鹦,扔掉這個(gè)任務(wù)
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //如果處于 running 狀態(tài)則檢查當(dāng)前的有效線程肚逸,如果沒(méi)有則創(chuàng)建一個(gè)線程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3.前兩步失敗了,就強(qiáng)行創(chuàng)建線程彬坏,成功會(huì)返回true朦促,如果失敗扔掉這個(gè)任務(wù)
    else if (!addWorker(command, false))
        reject(command);
}

解釋一下第二步,為什么要recheck

當(dāng)這個(gè)任務(wù)被添加到了阻塞隊(duì)列前栓始,池子處于 RUNNING 狀態(tài)务冕,但如果在添加到隊(duì)列成功后,池子進(jìn)入了 SHUTDOWN 狀態(tài)或者其他狀態(tài)幻赚,這時(shí)候是不應(yīng)該再接收新的任務(wù)的禀忆,所以需要把這個(gè)任務(wù)從隊(duì)列中移除,并且 reject

同樣坯屿,在沒(méi)有添加到隊(duì)列前油湖,可能有一個(gè)有效線程,但添加完任務(wù)后领跛,這個(gè)線程閑置超時(shí)或者因?yàn)楫惓1桓傻袅耍@時(shí)候需要?jiǎng)?chuàng)建一個(gè)新的線程來(lái)執(zhí)行任務(wù)

為了更直觀的理解一個(gè)任務(wù)的執(zhí)行過(guò)程撤奸,可以參考下面這張圖

addWorker()

前一步把 execute 的流程捋了一遍吠昭,里面多次出現(xiàn)了 addWorker() 方法喊括,前文說(shuō)到這是個(gè)創(chuàng)建線程的方法,來(lái)看看 addWorker 做了些什么矢棚,這個(gè)方法代碼比較長(zhǎng)郑什,我們拆開(kāi)來(lái)一點(diǎn)一點(diǎn)看.

  • 第一部分 — 判斷各種基礎(chǔ)異常
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
 
        // Check if queue empty only if necessary.
        // 檢查線程池狀態(tài),隊(duì)列狀態(tài)蒲肋,以及 firstask 蘑拯,拆開(kāi)來(lái)看
        // 這段代碼看起來(lái)異常的蛋疼,轉(zhuǎn)換一下邏輯即
        // rs>= SHUTDOWN && (rs != SHUTDOWN || firstTask != null ||workQueue.isEmpty())
        // 總結(jié)起來(lái)就是 當(dāng)前處于非 Running 狀態(tài),并且這三種情況
        // 1. 不是處于 SHUTDOWN 狀態(tài),不能再創(chuàng)建線程
        // 2. 有新的任務(wù) (因?yàn)椴荒茉俳邮招碌娜蝿?wù))
        // 3. 阻塞隊(duì)列中已經(jīng)沒(méi)有任務(wù) (不需要再創(chuàng)建線程)
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
 
        for (;;) {
            //當(dāng)前有效線程數(shù)目
            int wc = workerCountOf(c);
            // 根據(jù)傳入的參數(shù)確定以核心線程數(shù)還是最大線程數(shù)作為判斷條件
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                // 大于容量 或者指定的線程數(shù)兜粘,不允許創(chuàng)建
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
}
  • 第二部分 — 試圖創(chuàng)建線程

創(chuàng)建一個(gè)Worker

boolean workerStarted = false; //標(biāo)記 worker 開(kāi)啟狀態(tài)
boolean workerAdded = false; //標(biāo)記 worker 添加狀態(tài)
Worker w = null;
try {
    w = new Worker(firstTask); //將這個(gè)任務(wù)作為 worker 的第一個(gè)任務(wù)傳入
    final Thread t = w.thread; //通過(guò) worker 獲取到一個(gè)線程
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.
            int rs = runStateOf(ctl.get());
            // running狀態(tài)申窘,或者 shutdown 狀態(tài)但是沒(méi)有新的任務(wù)
            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                // 將這個(gè) worker 添加到線程池中
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                // 標(biāo)記worker添加成功
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        // 如果 worker 創(chuàng)建成功,開(kāi)啟線程
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        addWorkerFailed(w);
}
return workerStarted;

上面代碼從邏輯層面來(lái)看不算難懂孔轴,到這里一個(gè)任務(wù)到達(dá)后剃法,ThreadPoolExecutor 的處理就結(jié)束了,那么任務(wù)又是怎么被添加到阻塞隊(duì)列中路鹰,線程是如何從隊(duì)列中取出任務(wù)贷洲,上文中的 Worker 又是什么東西?

一個(gè)一個(gè)來(lái)晋柱,先來(lái)看看 Worker 到底是什么.

Worker

Worker 是 ThreadPoolExecutor 的一個(gè)內(nèi)部類优构,實(shí)現(xiàn)了 Runnable 接口,繼承自 AbstractQueuedSynchronizer,這又是個(gè)什么鬼雁竞?俩块?? 這個(gè)就是經(jīng)常見(jiàn)到的 AQS 的全稱,這個(gè)暫時(shí)還沒(méi)有研究.~~~~

private final class Worker
       extends AbstractQueuedSynchronizer
       implements Runnable {
 
    final Thread thread;
    /** Initial task to run. Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;
}

簡(jiǎn)單來(lái)說(shuō),Worker實(shí)現(xiàn)了 lock 和 unLock 方法來(lái)標(biāo)示當(dāng)前線程的狀態(tài)是否為閑置

// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
public void lock()        { acquire(1); }
public boolean tryLock()  { return tryAcquire(1); }
public void unlock()      { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

上一節(jié)創(chuàng)建線程成功后調(diào)用 t.start() 而這個(gè)線程又是 Worker 的成員變量

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

可以看到這里將 Worker 作為 Runnable 參數(shù)創(chuàng)建了一個(gè)新的線程浓领,我們知道 Thread 接收一個(gè) Runnable 對(duì)象后 start 運(yùn)行的是 Runnable 的 run 方法玉凯,Worker 的 run 方法調(diào)用了 runWorker ,這個(gè)方法里面就是取出任務(wù)執(zhí)行的邏輯

public void run() {
    runWorker(this);
}
 
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask; // 獲取到 worker 的第一個(gè)任務(wù)
    w.firstTask = null;
    w.unlock(); // 標(biāo)記為閑置,還沒(méi)有開(kāi)始任務(wù) 允許打斷
    boolean completedAbruptly = true; // 異常退出標(biāo)記
    try {
        // 循環(huán)取出任務(wù)联贩,如果第一個(gè)任務(wù)不為空漫仆,或者從隊(duì)列中拿到了任務(wù)
        // 只要這兩個(gè)條件滿足,會(huì)一直循環(huán)泪幌,直到?jīng)]有任務(wù)盲厌,正常退出,或者異常退出
        while (task != null || (task = getTask()) != null) {
            w.lock();// 該線程標(biāo)記為非閑置
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 翻譯注釋:1.如果線程池STOPPING狀態(tài)祸泪,需要中斷線程
            // 2.Thread.interrupted()是一個(gè)native方法吗浩,返回當(dāng)前線程是否有被等待中斷的請(qǐng)求
            // 3.第二個(gè)條件成立時(shí),檢查線程池狀態(tài)没隘,如果為STOP懂扼,并且沒(méi)有被中斷,則中斷線程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            // 執(zhí)行任務(wù)
            try {
                beforeExecute(wt, task);// 執(zhí)行前
                Throwable thrown = null;
                try {
                    task.run(); // 執(zhí)行任務(wù)
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown); // 執(zhí)行結(jié)束
                }
            } finally {
                task = null; // 將 worker 的任務(wù)置空
                w.completedTasks++;
                w.unlock(); // 釋放鎖,進(jìn)入閑置狀態(tài)
            }
        }// 循環(huán)結(jié)束
        completedAbruptly = false; // 標(biāo)記為正常退出
    } finally {
      // 干掉 worker
        processWorkerExit(w, completedAbruptly);
    }
}

這里弄清楚了一件事情阀湿,進(jìn)入循環(huán)準(zhǔn)備執(zhí)行任務(wù)時(shí)赶熟,worker 加鎖標(biāo)記為非閑置,任務(wù)執(zhí)行完畢或者出現(xiàn)異常陷嘴,worker 釋放鎖映砖,進(jìn)入閑置狀態(tài)。

也就是當(dāng)一個(gè) worker 執(zhí)行任務(wù)前或者執(zhí)行完任務(wù)灾挨,到取出下一個(gè)任務(wù)期間邑退,都是閑置狀態(tài)可以被打斷

上面取出任務(wù)調(diào)用了 getTask() ,誒~為什么有一個(gè)死循環(huán)劳澄,別著急地技,慢慢看來(lái)。上面的代碼可以知道如果 getTask 返回任務(wù)則執(zhí)行浴骂,如果返回為 null 則 worker 需要被回收

private Runnable getTask() {
    // 標(biāo)記取任務(wù)是否超時(shí)
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        // 如果線程池狀態(tài)為 STOP 或者 SHUTDOWN 并且隊(duì)列已經(jīng)為空乓土,回收 wroker
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        //獲取當(dāng)前有效線程數(shù)
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        // timed 用來(lái)標(biāo)記當(dāng)前的 worker 是否設(shè)置超時(shí)時(shí)間,
        // 還記得獲取線程池的時(shí)候 可以設(shè)置核心線程超時(shí)時(shí)間
        //1.允許核心線程超時(shí)回收(即所有線程) 2.當(dāng)前有效線程超過(guò)核心線程數(shù)(需要回收)
        // 如果timed == false 則該worker不會(huì)被回收溯警,如果沒(méi)有取到任務(wù) 會(huì)一直阻塞
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
 
        // 回收線程條件
        // 1. 有效線程數(shù)已經(jīng)大于了線程池的最大線程數(shù)或者設(shè)置了超時(shí)回收并且已經(jīng)超時(shí)
        // 2. 有效線程數(shù)大于1或者隊(duì)列任務(wù)已經(jīng)為空
        // 只有當(dāng)上面1和2 同時(shí)滿足時(shí) 則試圖回收線程
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
          // 如果減少workercount成功 直接回收
            if (compareAndDecrementWorkerCount(c))
                return null;
          // 否則重走循環(huán)趣苏,從第一個(gè)判斷條件處回收
            continue;
        }
        // 取任務(wù)
        try {
          // 根據(jù)是否設(shè)置超時(shí)回收來(lái)選擇不同的取任務(wù)的方式
          // poll 方法取任務(wù)會(huì)有超時(shí)時(shí)間,超過(guò)時(shí)間則返回null
          // take 方法沒(méi)有超時(shí)時(shí)間梯轻,阻塞式方法
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
          // 如果任務(wù)不為空返回任務(wù)
            if (r != null)
                return r;
          // 否則標(biāo)記超時(shí) 進(jìn)入下一次循環(huán)等待回收
            timedOut = true;
        } catch (InterruptedException retry) {
          // 如果出現(xiàn)異常食磕,試圖重試
            timedOut = false;
        }
    }
}

getTask() 方法邏輯也捋得差不多了,這里又出現(xiàn)了兩個(gè)新的方法喳挑,workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 和 workQueue.take() 彬伦,這兩個(gè)都是阻塞隊(duì)列的方法,來(lái)看看它們又各自是怎么實(shí)現(xiàn)的

LinkedBlockingQueue — 阻塞隊(duì)列

ThreadPoolExecutor 使用的是鏈表結(jié)構(gòu)的阻塞隊(duì)列伊诵,實(shí)現(xiàn)了 BlockingQueue 接口单绑,而 BlockingQueue 則是繼承自 Queue 接口,再上層就是 Collection 接口曹宴。

因?yàn)楸酒P記主要是分析 ThreadPoolExecutor 的原理搂橙,所以不會(huì)詳細(xì)介紹 LinkedBlockingQueue 中的其它代碼,主要介紹這里所用的方法笛坦,首先來(lái)看一下上文所提到的 take()

public E take() throws InterruptedException {
    E x; // 任務(wù)
    int c = -1; // 取出任務(wù)后的剩余任務(wù)數(shù)量
    final AtomicInteger count = this.count; // 當(dāng)前任務(wù)數(shù)量
    final ReentrantLock takeLock = this.takeLock; // 加鎖防止并發(fā)
    takeLock.lockInterruptibly();
    try {
      // 如果隊(duì)列數(shù)量為空区转,則一直循環(huán),阻塞線程
        while (count.get() == 0) {
            notEmpty.await();
        }
      // 取出任務(wù)
        x = dequeue();
      // 任務(wù)數(shù)量減一
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();// 標(biāo)記隊(duì)列非空
    } finally {
        takeLock.unlock(); // 釋放鎖
    }
    if (c == capacity)
        signalNotFull();//標(biāo)記隊(duì)列已滿
    return x;// 返回任務(wù)
}

上面的代碼可以知道 take 方法會(huì)一直阻塞直到隊(duì)列有新的任務(wù)為止

接下來(lái)是 poll 方法版扩,可以看到幾乎與 take 方法相同废离,唯一的區(qū)別是在阻塞的循環(huán)代碼塊里面加了時(shí)間判斷,如果超時(shí)則直接返回為空礁芦,不會(huì)一直阻塞下去

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null; // 存放的任務(wù)
    int c = -1;
    long nanos = unit.toNanos(timeout); // 超時(shí)時(shí)間
    final AtomicInteger count = this.count; // 隊(duì)列中的數(shù)量
    final ReentrantLock takeLock = this.takeLock; // 加鎖防止并發(fā)
    takeLock.lockInterruptibly();
    try {
        // 如果隊(duì)列為空蜻韭,則不斷的循環(huán)
        while (count.get() == 0) {
            // 如果當(dāng)?shù)褂?jì)時(shí)小于0 即超時(shí)時(shí)間到 則返回空
            if (nanos <= 0)
                return null;
            // 讓線程等待
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue(); // 取出一個(gè)任務(wù)
        c = count.getAndDecrement(); // 取出后的隊(duì)列數(shù)量
        if (c > 1)
            notEmpty.signal(); // 標(biāo)記非空
    } finally {
        takeLock.unlock(); // 釋放鎖
    }
    if (c == capacity)
        signalNotFull(); // 標(biāo)記隊(duì)列已滿
    return x; // 返回任務(wù)
}

線程池的回收及終止

前一節(jié)分析了任務(wù)的執(zhí)行流程及原理,也留下了一個(gè)問(wèn)題,worker 是如何被回收的呢湘捎?線程池該如何管理呢诀豁?回到上一節(jié)的 runWorker() 方法中窄刘,還記得最后調(diào)用了一個(gè)方法

processWorkerExit(w, completedAbruptly);

這個(gè)方法傳入了兩個(gè)參數(shù)窥妇,第一個(gè)是當(dāng)前的 Woker ,第二個(gè)是標(biāo)記異常退出的標(biāo)識(shí)

首先判斷是否為異常退出,如果是異常退出的話需要手動(dòng)調(diào)整線程數(shù)量娩践,如果是正郴铘妫回收的,getTask 方法里面已經(jīng)手動(dòng)調(diào)整過(guò)了翻伺,不記得的小伙伴可以看看前文的代碼材泄,找找 decrementWorkerCount(),

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    // 加鎖
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    // 記錄線程池完成的任務(wù)總數(shù),從 workers 中移除該 worker
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
 
    tryTerminate(); // 嘗試關(guān)閉池子
    int c = ctl.get();
    // 以下的代碼是判斷需不需要給線程池創(chuàng)建一個(gè)新的線程
    // 如果線程池的狀態(tài)是 RUNNING 或者 SHUTDOWN 進(jìn)一步判斷需不需要?jiǎng)?chuàng)建
    if (runStateLessThan(c, STOP)) {
        // 如果為異常退出直接創(chuàng)建吨岭,如果不是異常退出進(jìn)入判斷
        if (!completedAbruptly) {
            // 獲取線程池應(yīng)該存在的最小線程數(shù) 如果設(shè)置了超時(shí) 則是0拉宗,否則是核心線程數(shù)
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 如果 min 是0 但是隊(duì)列又不為空,則 min 應(yīng)該是1
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            //如果當(dāng)前池中的有效線程數(shù)大于等于最小線程數(shù) 則不需要?jiǎng)?chuàng)建
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 創(chuàng)建線程
        addWorker(null, false);
    }
}

上面的代碼中調(diào)用了 tryTerminate() 方法辣辫,這個(gè)方法是用于終止線程池的旦事,又是一個(gè) for 循環(huán),從代碼結(jié)構(gòu)來(lái)看是異常情況的重試機(jī)制急灭。還是老方法姐浮,慢慢來(lái)看總共做了幾件事情

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
      // 如果處于這三種情況不需要關(guān)閉線程池
      // 1. Running 狀態(tài)
      // 2. SHUTDOWN 狀態(tài)并且任務(wù)隊(duì)列不為空,不能終止
      // 3. TIDYING 或者 TERMINATE 狀態(tài)葬馋,說(shuō)明已經(jīng)在關(guān)閉了 不需要重復(fù)關(guān)閉
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        // 進(jìn)入到關(guān)閉線程池的代碼卖鲤,如果線程池中還有線程,則需要打斷線程
        if (workerCountOf(c) != 0) { // Eligible to terminate 可以關(guān)閉池子
            // 打斷閑置線程畴嘶,只打斷一個(gè)
            interruptIdleWorkers(ONLY_ONE);
            return;
            // 如果有兩個(gè)以上怎么辦蛋逾?只打斷一個(gè)?
            // 這里只打斷一個(gè)是因?yàn)?worker 回收的時(shí)候都會(huì)進(jìn)入到該方法中來(lái)窗悯,可以回去再看看
            // runWorker方法最后的代碼
        }
 
        // 線程已經(jīng)回收完畢区匣,準(zhǔn)備關(guān)閉線程池
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();// 加鎖
        try {
            // 將狀態(tài)改變?yōu)?TIDYING 并且即將調(diào)用 terminated
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated(); // 終止線程池
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0)); // 改變狀態(tài)
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
            // 如果終止失敗會(huì)重試
        }
        // else retry on failed CAS
    }
}

嘗試終止線程池的代碼分析完了,好像就結(jié)束了~但作為好奇寶寶蟀瞧,我們是不是應(yīng)該看看如何打斷閑置線程沉颂,以及 terminated 中做了什么呢?來(lái)吧悦污,繼續(xù)裝逼

先來(lái)看打斷線程

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();//加鎖~
    try {
      // 遍歷線程池中的 wroker
        for (Worker w : workers) {
            Thread t = w.thread;
          // 如果線程沒(méi)有被中斷铸屉,并且能夠獲取到 worker的鎖(說(shuō)明是閑置線程)
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();// 中斷線程
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
          // 只中斷一個(gè) worker 跳出循環(huán),否則會(huì)將所有的閑置線程都中斷
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();// 釋放鎖
    }

有同學(xué)開(kāi)始裝逼了切端,說(shuō)我們是好奇寶寶彻坛,t.interrupt() 方法也應(yīng)該看,嗯~沒(méi)錯(cuò),但這里是調(diào)用了 native 方法昌屉,會(huì) c 的可以去看看裝逼钙蒙,我就算了~

好了,再來(lái)看看 terminate, 是不是很坑爹间驮? terminated 里面神躬厌!馬!也竞帽!沒(méi)扛施!干!屹篓。疙渣。。淡定堆巧,其實(shí)這個(gè)方法類似于 Activity 的生命周期方法妄荔,允許你在被終止時(shí)做一些事情,默認(rèn)的線程池沒(méi)有什么要做的事情谍肤,當(dāng)然什么也沒(méi)寫(xiě)啦~

/**
 * Method invoked when the Executor has terminated. Default
 * implementation does nothing. Note: To properly nest multiple
 * overridings, subclasses should generally invoke
 * {@code super.terminated} within this method.
 */
protected void terminated() { }

異常處理

還記得前面講到啦租,出現(xiàn)各種異常情況,添加隊(duì)列失敗等等谣沸,只是籠統(tǒng)的說(shuō)了一句扔掉刷钢,當(dāng)然代碼實(shí)現(xiàn)不可能是簡(jiǎn)單一句扔掉就完了∪楦剑回到 execute() 方法中找到 reject() 任務(wù)内地,看看究竟是怎么處理的

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

還記得在創(chuàng)建線程池的時(shí)候,初始化了一個(gè) handler — RejectedExecutionHandler

這是一個(gè)接口赋除,只有一個(gè)方法,接收兩個(gè)參數(shù)

void rejectedExecution(Runnable r, ThreadPoolExecutor executor);

既然是一個(gè)接口阱缓,那么肯定有他的實(shí)現(xiàn)類,我們先不急著看所有實(shí)現(xiàn)類举农,先來(lái)看看這里的 handler 可能是什么荆针,記得在使用 Executors 獲取線程池調(diào)用構(gòu)造方法的時(shí)候并沒(méi)有傳入 handler 參數(shù),那么 ThreadPoolExecutor 應(yīng)該會(huì)有一個(gè)默認(rèn)的 handler

private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();
public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

默認(rèn) handler 是 AbortPolicy ,這個(gè)類實(shí)現(xiàn)了 rejectedExecution() 方法颁糟,拋了一個(gè) Runtime 異常航背,也就是說(shuō)當(dāng)任務(wù)添加失敗,就會(huì)拋出異常棱貌。這個(gè)類在 AsyncTask 引發(fā)了一場(chǎng)血案~所以在 API19 以后修改了 AsyncTask 的部分代碼邏輯玖媚,這里就不細(xì)說(shuō)啦.

實(shí)際上,在 ThreadPoolExecutor 中除了 AbortPolicy 外還實(shí)現(xiàn)了三種不同類型的 handler

  • CallerRunsPolicy — 在 線程池沒(méi)有 shutdown 的前提下婚脱,會(huì)直接在執(zhí)行 execute 方法的線程里執(zhí)行這個(gè)任務(wù)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run();
    }
}
  • DiscardPolicy — 啥也不干今魔,默默地丟掉任務(wù)~不信你看
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
  • DiscardOldestPolicy — 丟棄掉隊(duì)列中未執(zhí)行的勺像,最老的任務(wù),也就是任務(wù)隊(duì)列排頭的任務(wù)错森,然后再試圖在執(zhí)行一次
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        e.getQueue().poll();
        e.execute(r);
    }
}

歡迎關(guān)注我的公眾號(hào)

我的公眾號(hào)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末吟宦,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子涩维,更是在濱河造成了極大的恐慌殃姓,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,948評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件激挪,死亡現(xiàn)場(chǎng)離奇詭異辰狡,居然都是意外死亡锋叨,警方通過(guò)查閱死者的電腦和手機(jī)垄分,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,371評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)娃磺,“玉大人薄湿,你說(shuō)我怎么就攤上這事⊥滴裕” “怎么了豺瘤?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,490評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)听诸。 經(jīng)常有香客問(wèn)我坐求,道長(zhǎng),這世上最難降的妖魔是什么晌梨? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,521評(píng)論 1 284
  • 正文 為了忘掉前任桥嗤,我火速辦了婚禮,結(jié)果婚禮上仔蝌,老公的妹妹穿的比我還像新娘泛领。我一直安慰自己,他們只是感情好敛惊,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,627評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布渊鞋。 她就那樣靜靜地躺著,像睡著了一般瞧挤。 火紅的嫁衣襯著肌膚如雪锡宋。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,842評(píng)論 1 290
  • 那天特恬,我揣著相機(jī)與錄音执俩,去河邊找鬼。 笑死鸵鸥,一個(gè)胖子當(dāng)著我的面吹牛奠滑,可吹牛的內(nèi)容都是我干的丹皱。 我是一名探鬼主播,決...
    沈念sama閱讀 38,997評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼宋税,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼摊崭!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起杰赛,我...
    開(kāi)封第一講書(shū)人閱讀 37,741評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤呢簸,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后乏屯,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體根时,經(jīng)...
    沈念sama閱讀 44,203評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,534評(píng)論 2 327
  • 正文 我和宋清朗相戀三年辰晕,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了蛤迎。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,673評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡含友,死狀恐怖替裆,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情窘问,我是刑警寧澤辆童,帶...
    沈念sama閱讀 34,339評(píng)論 4 330
  • 正文 年R本政府宣布,位于F島的核電站惠赫,受9級(jí)特大地震影響把鉴,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜儿咱,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,955評(píng)論 3 313
  • 文/蒙蒙 一庭砍、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧概疆,春花似錦逗威、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,770評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至使套,卻和暖如春罐呼,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背侦高。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,000評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工嫉柴, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人奉呛。 一個(gè)月前我還...
    沈念sama閱讀 46,394評(píng)論 2 360
  • 正文 我出身青樓计螺,卻偏偏與公主長(zhǎng)得像夯尽,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子登馒,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,562評(píng)論 2 349

推薦閱讀更多精彩內(nèi)容

  • 2017.3.16 周四 剛才跟一個(gè)不認(rèn)識(shí)的學(xué)長(zhǎng)聊天匙握,感覺(jué)情商好低,還自我認(rèn)知不清晰陈轿,感覺(jué)快要沒(méi)救了圈纺。這是一個(gè)片段...
    薇糖糖糖閱讀 115評(píng)論 0 0
  • 2 初進(jìn)小屋,是前年的5月19日麦射。 此時(shí)蛾娶,我到北京已有一個(gè)禮拜了。我必須說(shuō)明一下潜秋,這不是我第一次來(lái)北京蛔琅,而是...
    路雨飛飛閱讀 382評(píng)論 0 3
  • 姓名:黃淑宜 公司:珠海三環(huán)知識(shí)產(chǎn)權(quán) 2017年9月17日打卡 【知~學(xué)習(xí)】 無(wú) 【行~實(shí)踐】 一、修身: 睡了個(gè)...
    淑宜閱讀 316評(píng)論 0 0
  • 死亡對(duì)于某些人來(lái)說(shuō)是結(jié)束半等,是停止揍愁,所以人們害怕死亡,恐懼死亡;但有些時(shí)候杀饵,死亡,是一種解脫谬擦,是一種重獲自由的好...
    蝎子女王閱讀 104評(píng)論 0 1