線程池學(xué)習(xí)看這篇就夠了!!!

本文目錄

  • 為什么要使用線程池抗俄?
  • 線程池參數(shù)詳解
  • 6種常見的線程池
  • 為什么不能直接自動(dòng)創(chuàng)建線程
  • 如果自定義合適的線程池?
  • 如何正確關(guān)閉線程池?
  • 線程池實(shí)現(xiàn)線程復(fù)用原理

為什么要使用線程池刨仑?

為什么要使用線程池占婉?

反復(fù)創(chuàng)建線程系統(tǒng)開銷比較大,而且每個(gè)線程的創(chuàng)建和銷毀都需要時(shí)間,如果任務(wù)比較簡(jiǎn)單,那么有可能導(dǎo)致線程的創(chuàng)建和銷毀占用的資源超過執(zhí)行任務(wù)所消耗的資源.

如果當(dāng)要執(zhí)行的任務(wù)比較多時(shí),每個(gè)線程負(fù)責(zé)一個(gè)任務(wù),那么需要?jiǎng)?chuàng)建很多線程去執(zhí)行任務(wù),過多的線程會(huì)占用過多的內(nèi)存資源等,還會(huì)帶來上下文切換,同時(shí)還會(huì)導(dǎo)致系統(tǒng)不穩(wěn)定.

線程池好處

線程池解決了線程生命周期的系統(tǒng)開銷問題,線程池中的線程可以反復(fù)使用,可以用少量的線程去執(zhí)行大量的任務(wù),減少了線程創(chuàng)建和銷毀的開銷,而且線程都是創(chuàng)建好的,來任務(wù)就可以執(zhí)行.

通過設(shè)置合適的線程池的線程數(shù),可以避免資源使用不當(dāng),線程池可以通過線程數(shù)和任務(wù)靈活的控制線程數(shù)量,任務(wù)多的時(shí)候可以繼續(xù)創(chuàng)建線程,任務(wù)少的時(shí)候只保留核心線程,這樣可以避免系統(tǒng)資源浪費(fèi)和線程過多導(dǎo)致內(nèi)存溢出.

線程池可以統(tǒng)一管理資源,通過線程書和任務(wù)隊(duì)列,可以統(tǒng)一開始和結(jié)束,并設(shè)置相關(guān)的拒絕策略.

線程池參數(shù)詳解

介紹線程池各個(gè)參數(shù)含義

corePoolSize:核心線程數(shù),常駐線程池的線程數(shù)量

maxPoolSize:線程池最大線程數(shù)量,當(dāng)任務(wù)特別多的時(shí)候,corePoolSize線程數(shù)量無法滿足需求的時(shí)候,就會(huì)繼續(xù)創(chuàng)建線程,最大不超過maxPoolSize.

KeepAliveTime+時(shí)間單位:空閑線程的存活時(shí)間

ThreadFactory:線程工廠,用來創(chuàng)建線程

WorkQueue:任務(wù)隊(duì)列,用來存放任務(wù)

Handler:處理被拒絕的策略

線程池處理任務(wù)流程圖:

如上圖所示,流程如下:


在這里插入圖片描述

當(dāng)提交任務(wù)后,,線程池首先會(huì)檢查當(dāng)前線程數(shù),如果當(dāng)前線程數(shù)小于核心線程數(shù),則新建線程并執(zhí)行任務(wù).

隨著任務(wù)不斷增加,線程數(shù)達(dá)到了核心線程數(shù)的數(shù)量,此時(shí)任務(wù)依然在增加,那么新來的任務(wù)將會(huì)放到workQueue等待隊(duì)列中,等核心線程執(zhí)行完任務(wù)后重新從隊(duì)列中提取出等待被執(zhí)行的任務(wù)

如果已經(jīng)達(dá)到了核心線程數(shù),且任務(wù)隊(duì)列也滿了,則線程池就會(huì)繼續(xù)創(chuàng)建線程來執(zhí)行任務(wù),如果任務(wù)不斷提交,線程池會(huì)持續(xù)創(chuàng)建線程直到達(dá)到maximumPoolSize最大線程數(shù),當(dāng)達(dá)到了最大線程數(shù)后,任務(wù)仍不斷提交,那么此時(shí)就超過了線程池的最大處理能力,這個(gè)時(shí)候線程池就會(huì)拒絕處理這些任務(wù),處理策略就是handler.

corePoolSize和maximumPoolSize:

從上面的流程中可以看出,線程池初始化時(shí),默認(rèn)的線程數(shù)是0,當(dāng)有任務(wù)提交后,開始創(chuàng)建核心線程去執(zhí)行任務(wù),當(dāng)線程數(shù)達(dá)到核心線程數(shù)時(shí)且任務(wù)隊(duì)列滿了后,開始創(chuàng)建非核心線程執(zhí)行任務(wù),最大可以達(dá)到maximumPoolSize,如果這是任務(wù)不提交了,線程開始空閑,那么默認(rèn)情況下大于corePoolSize的線程在超過設(shè)置的KeepAliveTime時(shí)間后會(huì)被合理的收回,所以默認(rèn)情況下,線程池中的線程數(shù)量處于corePoolSize和maximumPoolSize之間.
KeepAliveTime+時(shí)間單位:

默認(rèn)情況下,當(dāng)線程池中的數(shù)量多于核心線程數(shù)時(shí),而此時(shí)有沒有任務(wù)可做,那么線程池就會(huì)檢測(cè)線程的KeepAliveTime,如果超過了規(guī)定的時(shí)間,則無事可做的線程就會(huì)被銷毀,以便減少內(nèi)存的占用和資源消耗,如果后期任務(wù)又多了起來,則線程池根據(jù)規(guī)則重新創(chuàng)建線程,通過這個(gè)可伸縮的功能,可以實(shí)現(xiàn)對(duì)資源的合理使用,我們可以通過setKeepAliveTime設(shè)置keepAliveTime時(shí)間,還可以通過設(shè)置allowCoreThreadTimeOut參數(shù),這個(gè)參數(shù)默認(rèn)是false,如果設(shè)置成ture,則會(huì)給核心線程數(shù)設(shè)置超時(shí)等待時(shí)間, 如果超過時(shí)間了核心線程就會(huì)銷毀.
ThreadFactory:

ThreadFactory是一個(gè)線程工廠,負(fù)責(zé)生產(chǎn)線程去執(zhí)行任務(wù),默認(rèn)的線程工廠,創(chuàng)建的線程會(huì)在同一個(gè)線程組,并且擁有一樣的優(yōu)先級(jí),且都不是守護(hù)線程,我們也可自定義線程工廠,以便給線程自定義名字.
workQueue:

阻塞隊(duì)列,用來存放任務(wù),我們主要分析一下5種阻塞隊(duì)列:

  • ArrayBlockingQueue是基于數(shù)組的有界阻塞隊(duì)列,按照FIFO排序,新來的隊(duì)列會(huì)放到隊(duì)列尾部,有界的數(shù)組可以防止資源被耗盡問題,當(dāng)線程達(dá)到了核心線程數(shù),再來任務(wù)的時(shí)候就放到隊(duì)列的尾部,當(dāng)隊(duì)列滿了的時(shí)候,則繼續(xù)創(chuàng)建非核心線程,如果線程數(shù)量達(dá)到了maxPoolSize,則會(huì)執(zhí)行拒絕策略.

  • LinkedBlockingQueue是基于鏈表的無界阻塞隊(duì)列(最大容量是Integer.MAX),按照FIFO排序,當(dāng)線程池中線程數(shù)量達(dá)到核心線程數(shù)時(shí),繼續(xù)來了新任務(wù)會(huì)一直存放到隊(duì)列中,而不會(huì)創(chuàng)建新線程.因此使用此隊(duì)列時(shí),maxPoolSize是不起做的

  • SynchronousQueue是一個(gè)不緩存任務(wù)的阻塞隊(duì)列,當(dāng)來了新任務(wù)的時(shí)候,不會(huì)緩存到隊(duì)列中,而是直接被線程執(zhí)行該任務(wù),如果沒有核心線程可用就創(chuàng)建新線程去執(zhí)行任務(wù),達(dá)到了maxPoolSize時(shí),就執(zhí)行拒絕策略.

  • PriorityBlockingQueue是一個(gè)具有優(yōu)先級(jí)的無界阻塞隊(duì)列,優(yōu)先級(jí)通過參數(shù)Comparator實(shí)現(xiàn)

  • DelayedWorkQueu隊(duì)列的特點(diǎn)是內(nèi)部的任務(wù)并不是按照放入的時(shí)間排序,而是會(huì)按照延遲的時(shí)間長(zhǎng)短對(duì)任務(wù)進(jìn)行排序,內(nèi)部采用的是“堆”數(shù)據(jù)結(jié)構(gòu).而且它也是一個(gè)無界隊(duì)列.

handler:

拒絕策略是當(dāng)線程池中任務(wù)達(dá)到了隊(duì)列最大容量,且線程數(shù)量也達(dá)到了最大maxPoolSize的時(shí)候,如果繼續(xù)有新任務(wù)來了,則執(zhí)行這個(gè)拒絕策略來處理新來的任務(wù),jdk提供4種拒絕策略,它們都實(shí)現(xiàn)了RejectedExecutionHandler接口:
CallRunsPolicy:該策略下,在調(diào)用者線程中直接執(zhí)行被拒絕任務(wù)的run方法,就是誰提交的任務(wù),誰負(fù)責(zé)執(zhí)行任務(wù),這樣任務(wù)不會(huì)丟失,而且執(zhí)行任務(wù)比較費(fèi)時(shí),那么提交任務(wù)的線程也會(huì)被占用,就可以減緩任務(wù)提交速度.

  • AbortPolicy:該策略下,直接拋棄任務(wù),并拋RejectedExecutionException異常.

  • DiscardPolicy:該策略下,直接拋棄任務(wù).

  • DiscardOldestPolicy:該策略下,拋棄最早進(jìn)入隊(duì)列中的那個(gè)任務(wù),然后嘗試把這次拒絕的任務(wù)放入隊(duì)列.

除此之外虱疏,我們還可以通過實(shí)現(xiàn) RejectedExecutionHandler 接口來實(shí)現(xiàn)自己的拒絕策略,在接口中我們需要實(shí)現(xiàn)rejectedExecution方法,在rejectedExecution方法中,執(zhí)行例如暫存任務(wù)、重新執(zhí)行等自定義拒絕策略.

六種常見的線程池

FixedThreadPool

這個(gè)線程池的核心線程數(shù)和最大線程數(shù)是一樣的,所以可以看作是固定線程數(shù)的線程池,特點(diǎn)是當(dāng)線程達(dá)到核心線程數(shù)后,如果任務(wù)隊(duì)列滿了,也不會(huì)創(chuàng)建額外的非核心線程去執(zhí)行任務(wù),而是執(zhí)行拒絕策略.

CachedThreadPool

這個(gè)線程池叫做緩存線程池,特點(diǎn)是線程數(shù)幾乎是可以無限增加的(最大值是Integer.MAX_VALUE,基本不會(huì)達(dá)到),當(dāng)線程閑置時(shí)還可以進(jìn)行回收,而且它采用的存儲(chǔ)任務(wù)的隊(duì)列是SynchronousQueue隊(duì)列,隊(duì)列容量是0,實(shí)際不存儲(chǔ)任務(wù),只負(fù)責(zé)對(duì)任務(wù)的中轉(zhuǎn)和傳遞,所以來一個(gè)任務(wù)線程池就看是否有空閑的線程,有的話就用空閑的線程去執(zhí)行任務(wù),否則就創(chuàng)建一個(gè)線程去執(zhí)行,效率比較高.

ScheduledThreadPool

通過這個(gè)線程池的名字可以看出,它支持定時(shí)或者周期性執(zhí)行任務(wù),實(shí)現(xiàn)這種功能的方法主要有三種:

ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
service.schedule(new Task(), 10, TimeUnit.SECONDS);
service.scheduleAtFixedRate(new Task(), 10, 10, TimeUnit.SECONDS);
service.scheduleWithFixedDelay(new Task(), 10, 10, TimeUnit.SECONDS);

第一種是schedule,通過延遲指定時(shí)間后執(zhí)行一次任務(wù),代碼中設(shè)置的是10秒,所以10秒后執(zhí)行一次任務(wù)就結(jié)束.
第二種是scheduleAtFixedRate,通過名稱我們可以看出,第二種是以固定頻率去執(zhí)行任務(wù),它的第二個(gè)參數(shù)initialDelay表示第一次延遲時(shí)間,第三個(gè)參數(shù)period表示周期,總體按照上面的代碼意思就是,第一次延遲10秒后執(zhí)行任務(wù),然后,每次延遲10秒執(zhí)行一次任務(wù).
第三種方法是scheduleWithFixeddelay這種與第二種方法類似,也是周期執(zhí)行任務(wù),不同的是對(duì)周期的定義,之前的scheduleAtFixedRate是以任務(wù)的開始時(shí)間為起點(diǎn)開始計(jì)時(shí),時(shí)間到了就開始執(zhí)行第二次任務(wù),而不管任務(wù)需要多久執(zhí)行,而scheduleWithFixeddelay是以任務(wù)結(jié)束的時(shí)間作為下一次循環(huán)開始的時(shí)間起點(diǎn).

SingleThreadExecutor

第四種線程池中只有一個(gè)線程去執(zhí)行任務(wù),如果執(zhí)行任務(wù)過程中發(fā)生了異常,則線程池會(huì)創(chuàng)建一個(gè)新線程來執(zhí)行后續(xù)任務(wù),這個(gè)線程因?yàn)橹挥幸粋€(gè)線程,所以可以保證任務(wù)執(zhí)行的有序性.

SingleThreadScheduleExecutor

這個(gè)線程池它和ScheduledThreadPool很相似,只不過它的內(nèi)部也只有一個(gè)線程,他只是將核心線程數(shù)設(shè)置為了1,如果執(zhí)行期間發(fā)生異常,同樣會(huì)創(chuàng)建一個(gè)新線程去執(zhí)行任務(wù).

ForkJoinPool

最后一種線程池是ForkJoinPool,這個(gè)線程池是來支持將一個(gè)任務(wù)拆分成多個(gè)“小任務(wù)”并行計(jì)算昙读,這個(gè)線程池是在jdk1.7之后加入的,它主要用于實(shí)現(xiàn)“分而治之”的算法擎椰,特別是分治之后遞歸調(diào)用的函數(shù),這里只是對(duì)ForkJoinPool做了一個(gè)簡(jiǎn)單的介紹,我們先來介紹一下ForkJoinPool和之前的線程池主要的兩個(gè)特點(diǎn)施流。

第一點(diǎn)是fork和join:

我們現(xiàn)來看看fork和join的含義响疚,fork就是將任務(wù)分解成多個(gè)子任務(wù),多個(gè)子任務(wù)互相獨(dú)立瞪醋,不受影響忿晕,執(zhí)行的時(shí)候可以利用 CPU 的多核優(yōu)勢(shì),并行計(jì)算银受,計(jì)算完成后各個(gè)子任務(wù)在調(diào)用join方法進(jìn)行結(jié)果匯總践盼,第一步是拆分也就是 Fork,第二步是匯總也就是 Join宾巍,我們通過下圖來理解:


在這里插入圖片描述

我們通過舉例斐波那契數(shù)列來展示這個(gè)線程池的使用咕幻。

1.首先我們創(chuàng)建任務(wù)類FibonacciTask繼承RecursiveTask類,重寫compute方法顶霞。其中的ForkJoinTask代表一個(gè)可以并行肄程、合并的任務(wù),F(xiàn)orkJoinTask是一個(gè)抽象類选浑,它還有兩個(gè)抽象子類:RecusiveAction和RecusiveTask蓝厌。其中RecusiveTask代表有返回值的任務(wù),而RecusiveAction代表沒有返回值的任務(wù)古徒,

2.我們?cè)赾ompute方法中實(shí)現(xiàn)斐波那契數(shù)列計(jì)算并獲取返回值拓提。

3.在main方法中創(chuàng)建ForkJoinPool,并調(diào)用線程池的submit(ForkJoinTask<T>task)方法隧膘,通過獲取返回的task.get()方法獲取計(jì)算的返回值代态。

任務(wù)類:FibonacciTask

/**
 * 這里我們的定義任務(wù)類繼承RecursiveTask寺惫,需要重寫一個(gè)compute方法,或者任務(wù)執(zhí)行的返回值
 * RecursiveAction和RecursiveTask是ForkJoinTask的兩個(gè)抽象子類蹦疑,
 * 其中的ForkJoinTask西雀,代表一個(gè)可以并行、合并的任務(wù)其中RecursiveAction
 * 表示沒有返回值的任務(wù)必尼,RecursiveTask是有返回值的任務(wù)
 */
public class FibonacciTask extends RecursiveTask<Integer> {
    private int i;
    FibonacciTask(int i){
        this.i=i;
    }
    @Override
        protected Integer compute() {
        if(i<=1){
            return i;
        }
        FibonacciTask f1=new FibonacciTask(i-1);
        //用 fork() 方法分裂任務(wù)并分別執(zhí)行
        f1.fork();
        FibonacciTask f2=new FibonacciTask(i-2);
        f2.fork();
        //使用 join() 方法把結(jié)果匯總
        return f1.join()+f2.join();
    }
}

main方法:

public static void main(String[] args) {
        ForkJoinPool forkJoinPool=new ForkJoinPool();
        for(int i=0;i<10;i++){
            ForkJoinTask<Integer> task = forkJoinPool.submit(new FibonacciTask(i));
            try {
                System.out.println(task.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

計(jì)算結(jié)果如下:

在這里插入圖片描述

第二點(diǎn)是內(nèi)部結(jié)構(gòu)不同:

之前的線程池所有的線程共用一個(gè)隊(duì)列蒋搜,但 ForkJoinPool 線程池中每個(gè)線程都有自己獨(dú)立的任務(wù)隊(duì)列,這個(gè)隊(duì)列是雙端隊(duì)列,如圖下所示:


在這里插入圖片描述

ForkJoinPool 線程池內(nèi)部除了有一個(gè)共用的任務(wù)隊(duì)列之外判莉,每個(gè)線程還有一個(gè)對(duì)應(yīng)的雙端隊(duì)列 deque,這時(shí)一旦線程中的任務(wù)被 Fork 分裂了育谬,分裂出來的子任務(wù)放入線程自己的 deque 里券盅,而不是放入公共的任務(wù)隊(duì)列中(公共任務(wù)隊(duì)列采用數(shù)組存放),如果此時(shí)有三個(gè)子任務(wù)放入線程 t1 的 deque 隊(duì)列中膛檀,對(duì)于線程 t1 而言獲取任務(wù)的成本就降低了锰镀,可以直接在自己的任務(wù)隊(duì)列中獲取而不必去公共隊(duì)列中爭(zhēng)搶也不會(huì)發(fā)生阻塞(除了后面會(huì)講到的 steal 情況外),減少了線程間的競(jìng)爭(zhēng)和切換咖刃,是非常高效的泳炉。

我們?cè)倏紤]一種情況,此時(shí)線程有多個(gè)嚎杨,而線程 t1 的任務(wù)特別繁重花鹅,分裂了數(shù)十個(gè)子任務(wù),但是 t0 此時(shí)卻無事可做枫浙,它自己的 deque 隊(duì)列為空刨肃,這時(shí)為了提高效率,t0 就會(huì)想辦法幫助 t1 執(zhí)行任務(wù)箩帚,這就是“work-stealing”的含義真友。

雙端隊(duì)列 deque 中,線程 t1 獲取任務(wù)的邏輯是后進(jìn)先出紧帕,也就是LIFO(Last In Frist Out)盔然,而線程 t0 在“steal”偷線程 t1 的 deque 中的任務(wù)的邏輯是先進(jìn)先出,也就是FIFO(Fast In Frist Out)是嗜,如圖所示愈案,圖中很好的描述了兩個(gè)線程使用雙端隊(duì)列分別獲取任務(wù)的情景。你可以看到叠纷,使用 “work-stealing” 算法和雙端隊(duì)列很好地平衡了各線程的負(fù)載刻帚。

在這里插入圖片描述

最后,我們用一張全景圖來描述 ForkJoinPool 線程池的內(nèi)部結(jié)構(gòu)涩嚣,你可以看到 ForkJoinPool 線程池和其他線程池很多地方都是一樣的崇众,但重點(diǎn)區(qū)別在于它每個(gè)線程都有一個(gè)自己的雙端隊(duì)列來存儲(chǔ)分裂出來的子任務(wù)掂僵。ForkJoinPool 非常適合用于遞歸的場(chǎng)景,例如樹的遍歷顷歌、最優(yōu)路徑搜索等場(chǎng)景锰蓬。

為什么不能直接自動(dòng)創(chuàng)建線程池

首先自動(dòng)創(chuàng)建線程池通過直接調(diào)用Executors.newCachedThreadPool()方法直接創(chuàng)建線程池.但是開發(fā)中我們不能直接使用創(chuàng)建的線程池,原因如下:

FixedThreadPool

通過下面FiexdThreadPool內(nèi)部代碼可以看出,FixedThreadPool內(nèi)部調(diào)用的是ThreadPoolExecutor的構(gòu)造函數(shù),構(gòu)造函數(shù)中是的的阻塞隊(duì)列是LinkedBlockingQueue,那么這就帶來了問題,當(dāng)任務(wù)處理速度比較慢的時(shí)候,雖然新增任務(wù)越來越多,隊(duì)列中堆積的任務(wù)就越來越多,最終會(huì)占用大量?jī)?nèi)存,并發(fā)生OOM,就會(huì)嚴(yán)重影響到程序運(yùn)行.

public static ExecutorService newFixedThreadPool(int nThreads) { 
    return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}

SingleThreadExecutor

通過看下面SingleThreadExecutor的內(nèi)部代碼可以發(fā)現(xiàn),newSingleThreadExecutor和newFixedThreadPool的原理是一樣的,只不過是核心線程數(shù)和最大線程數(shù)都設(shè)置成了1,但是任務(wù)隊(duì)列還是無界的LinkedBlockingQueue,所以也會(huì)導(dǎo)致任務(wù)堆積,發(fā)生OOM問題.

public static ExecutorService newSingleThreadExecutor() { 
    return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}

CachedThreadPool

繼續(xù)看下圖CachedThreadPool內(nèi)部代碼,從代碼中可以看出,CachedThreadPool使用的任務(wù)阻塞隊(duì)列是SynchronousQueue,SynchronousQueue隊(duì)列我們前面介紹過,并不存儲(chǔ)任務(wù),只是對(duì)任務(wù)進(jìn)行直接轉(zhuǎn)發(fā),這個(gè)隊(duì)列不會(huì)引發(fā)OOM問題,但是我們?cè)诳醋畲缶€程數(shù)設(shè)置成了Integer.MAX_VALUE,所以CachedThreadPool線程池并不線程的數(shù)量,那么任務(wù)特別多的時(shí)候,就會(huì)創(chuàng)建非常多的線程,進(jìn)而導(dǎo)致系統(tǒng)內(nèi)存不足.

public static ExecutorService newCachedThreadPool() { 
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}

ScheduledThreadPool和SingleThreadScheduledExector

ScheduledThreadPool和SingleThreadScheduledExector差不多,只不過是后者線程池中只有一個(gè)線程眯漩,ScheduledThreadPool的源碼如下:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

我們?cè)谶M(jìn)入ScheduledThreadPoolExecutor構(gòu)造方法中去,從下圖可以看出,它采用的任務(wù)隊(duì)列是DelayWorkQueue,上面我們說過這個(gè)隊(duì)列一個(gè)延遲隊(duì)列同時(shí)也是一個(gè)無界隊(duì)列,所以它和LinkedBlockingQueue一樣,如果任務(wù)過多就可能OOM,代碼如下:

public ScheduledThreadPoolExecutor(int corePoolSize) { 
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
}

SingleThreadExecutor

第四種線程池中只有一個(gè)線程去執(zhí)行任務(wù),如果執(zhí)行任務(wù)過程中發(fā)生了異常,則線程池會(huì)創(chuàng)建一個(gè)新線程來執(zhí)行后續(xù)任務(wù),這個(gè)線程因?yàn)橹挥幸粋€(gè)線程,所以可以保證任務(wù)執(zhí)行的有序性.

如何自定義合適的線程池

這個(gè)也是面試中會(huì)被問到的問題,如果自定義合適的線程池呢?首先我們要調(diào)整線程池中的線程數(shù)量以便充分并合理的使用CPU和內(nèi)存資源,從而最大限度的提高性能.

CPU密集型任務(wù)

如果任務(wù)是一些列比較消耗CPU資源的任務(wù),比如加密芹扭、解密、壓縮赦抖、計(jì)算等,那么最佳線程數(shù)是CPU核心數(shù)的1~2倍,過多很導(dǎo)致占用大量CPU資源,這時(shí)每個(gè)CPU的核心工作基本都是滿負(fù)荷,設(shè)置過多的線程會(huì)造成不必要的上下文切換,而導(dǎo)致性能下降,而且在同一臺(tái)機(jī)器上,我們還要考慮到其他會(huì)占用較多CPU資源的程序運(yùn)行,然后做整體平衡.

耗時(shí)IO任務(wù)

例如數(shù)據(jù)庫舱卡、文件的讀寫,網(wǎng)絡(luò)通信等任務(wù),這種任務(wù)的特點(diǎn)是不會(huì)消耗很多CPU資源,但是IO操作很費(fèi)時(shí).這個(gè)時(shí)候可以設(shè)置最大線程數(shù)一般會(huì)大于CPU核心線程數(shù)很多倍,因?yàn)镮O速度相比于CPU速度比較慢,我們?cè)O(shè)置較少的線程數(shù),就會(huì)浪費(fèi)CPU資源,如果設(shè)置更多的線程數(shù),那么一部分線程正在等待IO的時(shí)候,他們此時(shí)不需要CPU計(jì)算,就能有更多線程去執(zhí)行IO操作,任務(wù)隊(duì)列中的等待任務(wù)就會(huì)減少,更合理的利用了資源.

java并發(fā)編程實(shí)戰(zhàn)中有推薦:線程數(shù) = CPU核心數(shù) *(1+平均等待時(shí)間/平均工作時(shí)間),我們可以通過這個(gè)式子計(jì)算出一個(gè)合理的線程數(shù)量,同時(shí)也可以根據(jù)進(jìn)行壓測(cè)队萤、監(jiān)控jvm的線程情況等方式,確定線程數(shù),更合理的利用資源.

總結(jié)以上特點(diǎn)可以得出以下幾點(diǎn):

線程的平均工作時(shí)間所占比例越高,就需要越少線程.

線程的平均等待時(shí)間所占比例越高,就需要越多的線程

針對(duì)不同的程序,進(jìn)行對(duì)應(yīng)的實(shí)際測(cè)試就可以獲得更合適的選擇.

如何正確關(guān)閉線程池

首先有5種在ThreadPoolExecutor中涉及的關(guān)閉線程的方法轮锥,我們挨個(gè)來分析。

void shutdown()

它可以安全的關(guān)閉一個(gè)線程池,調(diào)用shutdown()方法后,線程池不會(huì)立刻關(guān)閉,而是等執(zhí)行完正在執(zhí)行的任務(wù)和隊(duì)列中等待的任務(wù)后才徹底關(guān)閉,而且調(diào)用shutdown()方法后,如果還有新的任務(wù)繼續(xù)到來,那么線程池會(huì)根據(jù)拒絕策略直接拒絕后面來的新任務(wù).

boolean isShutdown()

這個(gè)方法可以返回ture或者false來判斷是否已經(jīng)開始了關(guān)閉工作,也就是是否執(zhí)行了shutdown或者shutdownNow方法,調(diào)用isShutdown()方法后如果返回true,并不代表線程池已經(jīng)徹底關(guān)閉了,僅僅代表開始了關(guān)閉流程,仍然可能有線程正在執(zhí)行任務(wù),隊(duì)列里也可能有任務(wù)等待被執(zhí)行.

boolean isTerminated()

這個(gè)方法可以檢測(cè)是否真正關(guān)閉了,不僅代表線程池是否已經(jīng)關(guān)閉,同時(shí)也代表線程池中的所有任務(wù)是否已經(jīng)都執(zhí)行完畢,比如已經(jīng)調(diào)用了shutdown()方法,但是有一個(gè)線程正在執(zhí)行任務(wù),則此時(shí)調(diào)用isShutdown方法返回true,而調(diào)用isTerminated方法便返回false,,因?yàn)榫€程池中還有任務(wù)再執(zhí)行,線程池沒有真正關(guān)閉,直到所有線程都執(zhí)行完畢,任務(wù)都執(zhí)行完畢,再調(diào)用isTermainted就返回ture.

boolean awaitTermination(long timeout,TimeUnit unit)要尔,throws IntereuptedException

awaitTermination并不是用來關(guān)閉線程池的,而是用來判斷線程池狀態(tài)的,參數(shù)需要傳入一個(gè)時(shí)間,如果我們?cè)O(shè)置10秒鐘,那么會(huì)有以下幾種情況:

等待期間,線程池已經(jīng)關(guān)閉且所有提交的任務(wù)都執(zhí)行完畢,那么方法就返回ture,相當(dāng)于線程池真正關(guān)閉了.

等待時(shí)間超時(shí)后,第一種情況未發(fā)生,那么方法返回false.

等待時(shí)間中,執(zhí)行任務(wù)的線程被中斷了,方法會(huì)拋出InterruptedException異常.

所以綜上可以看出,調(diào)用 awaitTermination 方法后當(dāng)前線程會(huì)嘗試等待一段指定的時(shí)間舍杜,如果在等待時(shí)間內(nèi),線程池已關(guān)閉并且內(nèi)部的任務(wù)都執(zhí)行完畢了赵辕,也就是說線程池真正“終結(jié)”了既绩,那么方法就返回 true,否則超時(shí)返回 fasle,我們則可以根據(jù) awaitTermination() 返回的布爾值來判斷下一步應(yīng)該執(zhí)行的操作还惠。

List<Runnable> shutdownNow()

調(diào)用shutdownNow()方法后,首先會(huì)給所有線程池中的線程發(fā)送interrupt中斷信號(hào),嘗試中斷這些任務(wù)的執(zhí)行,然后就任務(wù)隊(duì)列中在等待被執(zhí)行的任務(wù)轉(zhuǎn)移到一個(gè)List中并返回,我們可以再根據(jù)List做一些操作,shutdownNow() 的源碼如下所示:

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            //轉(zhuǎn)換線程運(yùn)行狀態(tài)
            advanceRunState(STOP);
            //讓每一個(gè)已經(jīng)啟動(dòng)的線程都中斷,如果被中斷的線程對(duì)于中斷信號(hào)不理不睬
            //那么依然有可能導(dǎo)致任務(wù)不會(huì)停止
            interruptWorkers();
            //將隊(duì)列中任務(wù)放入tasks集合中,并返回.
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

線程池復(fù)用原理

線程池復(fù)用原理

線程池可以把線程和任務(wù)進(jìn)行解耦饲握,線程歸線程,任務(wù)歸任務(wù)吸重,擺脫了之前通過 Thread 創(chuàng)建線程時(shí)的一個(gè)線程必須對(duì)應(yīng)一個(gè)任務(wù)的限制互拾。在線程池中,同一個(gè)線程可以從 BlockingQueue 中不斷提取新任務(wù)來執(zhí)行嚎幸,其核心原理在于線程池對(duì) Thread 進(jìn)行了封裝颜矿,并不是每次執(zhí)行任務(wù)都會(huì)調(diào)用 Thread.start() 來創(chuàng)建新線程,而是讓每個(gè)線程去執(zhí)行一個(gè)“循環(huán)任務(wù)”嫉晶,在這個(gè)“循環(huán)任務(wù)”中骑疆,不停地檢查是否還有任務(wù)等待被執(zhí)行,如果有則直接去執(zhí)行這個(gè)任務(wù)替废,也就是調(diào)用任務(wù)的 run 方法箍铭,把 run 方法當(dāng)作和普通方法一樣的地位去調(diào)用,相當(dāng)于把每個(gè)任務(wù)的 run() 方法串聯(lián)了起來椎镣,所以線程數(shù)量并不增加诈火。其中execute代碼如下:

public void execute(Runnable command) { 
    //如果傳入的Runnable的空,就拋出異常
    if (command == null) 
        throw new NullPointerException();
    int c = ctl.get();
    /**
    * 當(dāng)前線程數(shù)是否小于核心線程數(shù)状答,如果小于核心線程數(shù)就調(diào)用 addWorker() 
    * 方法增加一個(gè) Worker冷守,這里的 Worker 就可以理解為一個(gè)線程
    */
    if (workerCountOf(c) < corePoolSize) { 
        if (addWorker(command, true)) 
            return;
        c = ctl.get();
    } 
    if (isRunning(c) && workQueue.offer(command)) { 
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command)) 
            reject(command);
        else if (workerCountOf(recheck) == 0) 
            addWorker(null, false);
    } 
    else if (!addWorker(command, false)) 
        reject(command);
}

我們先來分析上面部分代碼刀崖,我們先分析下面這一段代碼:

if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
我們主要看addWorK(comond,tue)方法,addWorker 方法的主要作用是在線程池中創(chuàng)建一個(gè)線程并執(zhí)行第一個(gè)參數(shù)傳入的任務(wù)拍摇,它的第二個(gè)參數(shù)是個(gè)布爾值亮钦,如果布爾值傳入 true 代表增加線程時(shí)判斷當(dāng)前線程是否少于 corePoolSize,小于則增加新線程充活,大于等于則不增加蜂莉;同理,如果傳入 false 代表增加線程時(shí)判斷當(dāng)前線程是否少于 maxPoolSize混卵,小于則增加新線程映穗,大于等于則不增加,所以這里的布爾值的含義是以核心線程數(shù)為界限還是以最大線程數(shù)為界限進(jìn)行是否新增線程的判斷淮菠。addWorker() 方法如果返回 true 代表添加成功男公,如果返回 false 代表添加失敗。

接下來我們看下面這部分代碼

 if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
            //執(zhí)行拒絕策略
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

如果代碼執(zhí)行到這里合陵,說明當(dāng)前線程數(shù)大于或等于核心線程數(shù)或者 addWorker 失敗了,那么就需要通過 if (isRunning(c) && workQueue.offer(command)) 檢查線程池狀態(tài)是否為 Running澄阳,如果線程池狀態(tài)是 Running 就把任務(wù)放入任務(wù)隊(duì)列中拥知,也就是 workQueue.offer(command)。如果線程池已經(jīng)不處于 Running 狀態(tài)碎赢,說明線程池被關(guān)閉低剔,那么就移除剛剛添加到任務(wù)隊(duì)列中的任務(wù),并執(zhí)行拒絕策略肮塞。

接下來我們上面這部分代碼的else分支邏輯:

else if (workerCountOf(recheck) == 0)
                addWorker(null, false);

能進(jìn)入這個(gè) else 說明前面判斷到線程池狀態(tài)為 Running襟齿,那么當(dāng)任務(wù)被添加進(jìn)來之后就需要防止沒有可執(zhí)行線程的情況發(fā)生(比如之前的線程被回收了或意外終止了),所以此時(shí)如果檢查當(dāng)前線程數(shù)為 0枕赵,也就是 workerCountOf**(recheck) == 0猜欺,那就執(zhí)行 addWorker() 方法新建線程。

接著我們?cè)倏醋詈笠徊糠执a:

else if (!addWorker(command, false)) 
    reject(command);

執(zhí)行到這里拷窜,說明線程池不是 Running 狀態(tài)或線程數(shù)大于或等于核心線程數(shù)并且任務(wù)隊(duì)列已經(jīng)滿了开皿,所以此時(shí)需要添加新線程,直到線程數(shù)達(dá)到“最大線程數(shù)”篮昧,所以此時(shí)就會(huì)再次調(diào)用 addWorker 方法并將第二個(gè)參數(shù)傳入 false赋荆,傳入 false 代表增加線程時(shí)判斷當(dāng)前線程數(shù)是否少于 maxPoolSize,小于則增加新線程懊昨,大于等于則不增加窄潭,也就是以 maxPoolSize 為上限創(chuàng)建新的 worker;addWorker 方法如果返回 true 代表添加成功酵颁,如果返回 false 代表任務(wù)添加失敗嫉你,說明當(dāng)前線程數(shù)已經(jīng)達(dá)到 maxPoolSize月帝,然后執(zhí)行拒絕策略 reject 方法。如果執(zhí)行到這里線程池的狀態(tài)不是 Running均抽,那么 addWorker 會(huì)失敗并返回 false嫁赏,所以也會(huì)執(zhí)行拒絕策略 reject 方法。

所以看到這里我們需要著重分析addWorker()方法油挥,這里的 Worker 可以理解為是對(duì) Thread 的包裝潦蝇,Worker 內(nèi)部有一個(gè) Thread 對(duì)象,它正是最終真正執(zhí)行任務(wù)的線程深寥,所以一個(gè) Worker 就對(duì)應(yīng)線程池中的一個(gè)線程攘乒,addWorker 就代表增加線程。我們看部分addWorker內(nèi)的方法:

boolean workerStarted = false;
        boolean workerAdded = false;
        //worker是內(nèi)部類實(shí)現(xiàn)了接口Runnable惋鹅,封裝了Thread
        Worker w = null;
        try {
            //獲取隊(duì)列第一個(gè)任務(wù)
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //集合则酝,包含池中的所有工作線程。只有當(dāng)持有主鎖時(shí)才能訪問闰集。
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                //調(diào)用線程的start方法沽讹。
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }

通過上圖中的注釋我們可以看出,addWork方法實(shí)際上是調(diào)用自己封裝的線程的start方法來啟動(dòng)線程武鲁,我們繼續(xù)看worker內(nèi)部類的run方法是如何實(shí)現(xiàn)的:

public void run() {
            runWorker(this);
        }
?
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //獲取第一個(gè)要執(zhí)行的任務(wù)爽雄,先進(jìn)先出
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //實(shí)現(xiàn)線程復(fù)用的邏輯主要在一個(gè)不停循環(huán)的 while 循環(huán)體中
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                    //直接調(diào)用task的run方法,而不是新建線程
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

從上圖中我們可以看到內(nèi)部類worker的run()方法實(shí)際上是調(diào)用runWorker(this)方法沐鼠,實(shí)現(xiàn)線程復(fù)用的邏輯主要是在一個(gè)不同的循環(huán)體while中進(jìn)行挚瘟,所以在runWorker(this)方法中主要做了兩件事:

通過取 Worker 的 firstTask 或者通過 getTask 方法從 workQueue 中獲取待執(zhí)行的任務(wù)。

直接調(diào)用 task 的 run 方法來執(zhí)行具體的任務(wù)(而不是新建線程饲梭,調(diào)用線程的start()方法)乘盖。

好了,本篇文章主要分析了線程池的基本概念和核心原理,也是作者對(duì)線程池學(xué)習(xí)的各方面的總結(jié)憔涉,基本上看完本篇文章能應(yīng)對(duì)很多線程池的相關(guān)面試以及日常開發(fā)需求订框,如果有什么不足或者錯(cuò)誤的地方希望讀者能給出建議!

-END

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末监氢,一起剝皮案震驚了整個(gè)濱河市布蔗,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌浪腐,老刑警劉巖纵揍,帶你破解...
    沈念sama閱讀 219,427評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異议街,居然都是意外死亡泽谨,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,551評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來吧雹,“玉大人骨杂,你說我怎么就攤上這事⌒劬恚” “怎么了搓蚪?”我有些...
    開封第一講書人閱讀 165,747評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)丁鹉。 經(jīng)常有香客問我妒潭,道長(zhǎng),這世上最難降的妖魔是什么揣钦? 我笑而不...
    開封第一講書人閱讀 58,939評(píng)論 1 295
  • 正文 為了忘掉前任雳灾,我火速辦了婚禮,結(jié)果婚禮上冯凹,老公的妹妹穿的比我還像新娘谎亩。我一直安慰自己,他們只是感情好宇姚,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,955評(píng)論 6 392
  • 文/花漫 我一把揭開白布匈庭。 她就那樣靜靜地躺著,像睡著了一般浑劳。 火紅的嫁衣襯著肌膚如雪嚎花。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,737評(píng)論 1 305
  • 那天呀洲,我揣著相機(jī)與錄音,去河邊找鬼啼止。 笑死道逗,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的献烦。 我是一名探鬼主播滓窍,決...
    沈念sama閱讀 40,448評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼巩那!你這毒婦竟也來了吏夯?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,352評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤即横,失蹤者是張志新(化名)和其女友劉穎噪生,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體东囚,經(jīng)...
    沈念sama閱讀 45,834評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡跺嗽,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,992評(píng)論 3 338
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片桨嫁。...
    茶點(diǎn)故事閱讀 40,133評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡植兰,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出璃吧,到底是詐尸還是另有隱情楣导,我是刑警寧澤,帶...
    沈念sama閱讀 35,815評(píng)論 5 346
  • 正文 年R本政府宣布畜挨,位于F島的核電站筒繁,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏朦促。R本人自食惡果不足惜膝晾,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,477評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望务冕。 院中可真熱鬧血当,春花似錦、人聲如沸禀忆。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,022評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽箩退。三九已至离熏,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間戴涝,已是汗流浹背滋戳。 一陣腳步聲響...
    開封第一講書人閱讀 33,147評(píng)論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留啥刻,地道東北人奸鸯。 一個(gè)月前我還...
    沈念sama閱讀 48,398評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像可帽,于是被迫代替她去往敵國和親娄涩。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,077評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容