什么是線程池
線程池瘤泪,顧名思義就是裝線程的池子灶泵。其用途是為了幫我們重復(fù)管理線程,避免創(chuàng)建大量的線程增加開(kāi)銷(xiāo)均芽,提高響應(yīng)速度丘逸。
為什么要用線程池
作為一個(gè)嚴(yán)謹(jǐn)?shù)墓コ仟{,不會(huì)希望別人看到我們的代碼就開(kāi)始吐槽掀宋,new Thread().start()會(huì)讓代碼看起來(lái)混亂臃腫深纲,并且不好管理和維護(hù),那么我們就需要用到了線程池劲妙。
在編程中經(jīng)常會(huì)使用線程來(lái)異步處理任務(wù)湃鹊,但是每個(gè)線程的創(chuàng)建和銷(xiāo)毀都需要一定的開(kāi)銷(xiāo)。如果每次執(zhí)行一個(gè)任務(wù)都需要開(kāi)一個(gè)新線程去執(zhí)行镣奋,則這些線程的創(chuàng)建和銷(xiāo)毀將消耗大量的資源币呵;并且線程都是“各自為政”的,很難對(duì)其進(jìn)行控制侨颈,更何況有一堆的線程在執(zhí)行余赢。線程池為我們做的,就是線程創(chuàng)建之后為我們保留哈垢,當(dāng)我們需要的時(shí)候直接拿來(lái)用妻柒,省去了重復(fù)創(chuàng)建銷(xiāo)毀的過(guò)程。
線程池的處理邏輯
線程池ThreadPoolExecutor構(gòu)造函數(shù)
//五個(gè)參數(shù)的構(gòu)造函數(shù)publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime, TimeUnit unit, BlockingQueue workQueue)
//六個(gè)參數(shù)的構(gòu)造函數(shù)-1
publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory)
//六個(gè)參數(shù)的構(gòu)造函數(shù)-2
publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler)
//七個(gè)參數(shù)的構(gòu)造函數(shù)publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
1.corePoolSize -> 該線程池中核心線程數(shù)最大值
核心線程:在創(chuàng)建完線程池之后耘分,核心線程先不創(chuàng)建举塔,在接到任務(wù)之后創(chuàng)建核心線程绑警。并且會(huì)一直存在于線程池中(即使這個(gè)線程啥都不干),有任務(wù)要執(zhí)行時(shí)央渣,如果核心線程沒(méi)有被占用计盒,會(huì)優(yōu)先用核心線程執(zhí)行任務(wù)。數(shù)量一般情況下設(shè)置為CPU核數(shù)的二倍即可芽丹。
2.maximumPoolSize -> 該線程池中線程總數(shù)最大值
線程總數(shù)=核心線程數(shù)+非核心線程數(shù)
非核心線程:簡(jiǎn)單理解北启,即核心線程都被占用,但還有任務(wù)要做志衍,就創(chuàng)建非核心線程
3.keepAliveTime -> 非核心線程閑置超時(shí)時(shí)長(zhǎng)
這個(gè)參數(shù)可以理解為暖庄,任務(wù)少,但池中線程多楼肪,非核心線程不能白養(yǎng)著培廓,超過(guò)這個(gè)時(shí)間不工作的就會(huì)被干掉,但是核心線程會(huì)保留春叫。
4.TimeUnit -> keepAliveTime的單位
TimeUnit是一個(gè)枚舉類(lèi)型肩钠,其包括:
NANOSECONDS : 1微毫秒 = 1微秒 / 1000
MICROSECONDS : 1微秒 = 1毫秒 / 1000
MILLISECONDS : 1毫秒 = 1秒 /1000
SECONDS : 秒
MINUTES : 分
HOURS : 小時(shí)
DAYS : 天
5.BlockingQueue workQueue -> 線程池中的任務(wù)隊(duì)列
默認(rèn)情況下,任務(wù)進(jìn)來(lái)之后先分配給核心線程執(zhí)行暂殖,核心線程如果都被占用价匠,并不會(huì)立刻開(kāi)啟非核心線程執(zhí)行任務(wù),而是將任務(wù)插入任務(wù)隊(duì)列等待執(zhí)行呛每,核心線程會(huì)從任務(wù)隊(duì)列取任務(wù)來(lái)執(zhí)行踩窖,任務(wù)隊(duì)列可以設(shè)置最大值,一旦插入的任務(wù)足夠多晨横,達(dá)到最大值洋腮,才會(huì)創(chuàng)建非核心線程執(zhí)行任務(wù)。
常見(jiàn)的workQueue有四種:
1.SynchronousQueue:這個(gè)隊(duì)列接收到任務(wù)的時(shí)候手形,會(huì)直接提交給線程處理啥供,而不保留它,如果所有線程都在工作怎么辦库糠?那就新建一個(gè)線程來(lái)處理這個(gè)任務(wù)伙狐!所以為了保證不出現(xiàn)<線程數(shù)達(dá)到了maximumPoolSize而不能新建線程>的錯(cuò)誤,使用這個(gè)類(lèi)型隊(duì)列的時(shí)候瞬欧,maximumPoolSize一般指定成Integer.MAX_VALUE贷屎,即無(wú)限大
2.LinkedBlockingQueue:這個(gè)隊(duì)列接收到任務(wù)的時(shí)候,如果當(dāng)前已經(jīng)創(chuàng)建的核心線程數(shù)小于線程池的核心線程數(shù)上限艘虎,則新建線程(核心線程)處理任務(wù)豫尽;如果當(dāng)前已經(jīng)創(chuàng)建的核心線程數(shù)等于核心線程數(shù)上限,則進(jìn)入隊(duì)列等待顷帖。由于這個(gè)隊(duì)列沒(méi)有最大值限制美旧,即所有超過(guò)核心線程數(shù)的任務(wù)都將被添加到隊(duì)列中,這也就導(dǎo)致了maximumPoolSize的設(shè)定失效贬墩,因?yàn)榭偩€程數(shù)永遠(yuǎn)不會(huì)超過(guò)corePoolSize
3.ArrayBlockingQueue:可以限定隊(duì)列的長(zhǎng)度榴嗅,接收到任務(wù)的時(shí)候,如果沒(méi)有達(dá)到corePoolSize的值陶舞,則新建線程(核心線程)執(zhí)行任務(wù)嗽测,如果達(dá)到了,則入隊(duì)等候肿孵,如果隊(duì)列已滿唠粥,則新建線程(非核心線程)執(zhí)行任務(wù),又如果總線程數(shù)到了maximumPoolSize停做,并且隊(duì)列也滿了晤愧,則發(fā)生錯(cuò)誤,或是執(zhí)行實(shí)現(xiàn)定義好的飽和策略
4.DelayQueue:隊(duì)列內(nèi)元素必須實(shí)現(xiàn)Delayed接口蛉腌,這就意味著你傳進(jìn)去的任務(wù)必須先實(shí)現(xiàn)Delayed接口官份。這個(gè)隊(duì)列接收到任務(wù)時(shí),首先先入隊(duì)烙丛,只有達(dá)到了指定的延時(shí)時(shí)間舅巷,才會(huì)執(zhí)行任務(wù)
6.ThreadFactory threadFactory -> 創(chuàng)建線程的工廠
可以用線程工廠給每個(gè)創(chuàng)建出來(lái)的線程設(shè)置名字。一般情況下無(wú)須設(shè)置該參數(shù)河咽。
7.RejectedExecutionHandler handler -> 飽和策略
這是當(dāng)任務(wù)隊(duì)列和線程池都滿了時(shí)所采取的應(yīng)對(duì)策略钠右,默認(rèn)是AbordPolicy, 表示無(wú)法處理新任務(wù)忘蟹,并拋出 RejectedExecutionException 異常飒房。此外還有3種策略,它們分別如下寒瓦。
(1)CallerRunsPolicy:用調(diào)用者所在的線程來(lái)處理任務(wù)情屹。此策略提供簡(jiǎn)單的反饋控制機(jī)制,能夠減緩新任務(wù)的提交速度杂腰。
(2)DiscardPolicy:不能執(zhí)行的任務(wù)垃你,并將該任務(wù)刪除。
(3)DiscardOldestPolicy:丟棄隊(duì)列最近的任務(wù)喂很,并執(zhí)行當(dāng)前的任務(wù)惜颇。
如何使用線程池
說(shuō)了半天原理,接下來(lái)就要用了少辣,java為我們提供了4種線程池FixedThreadPool凌摄、CachedThreadPool、SingleThreadExecutor漓帅、ScheduledThreadPool锨亏,幾乎可以滿足我們大部分的需要了:
1.FixedThreadPool
可重用固定線程數(shù)的線程池痴怨,超出的線程會(huì)在隊(duì)列中等待,在Executors類(lèi)中我們可以找到創(chuàng)建方式:
publicstaticExecutorServicenewFixedThreadPool(intnThreads){returnnewThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,newLinkedBlockingQueue());? ? }
FixedThreadPool的corePoolSize和maximumPoolSize都設(shè)置為參數(shù)nThreads器予,也就是只有固定數(shù)量的核心線程浪藻,不存在非核心線程。keepAliveTime為0L表示多余的線程立刻終止乾翔,因?yàn)椴粫?huì)產(chǎn)生多余的線程爱葵,所以這個(gè)參數(shù)是無(wú)效的。FixedThreadPool的任務(wù)隊(duì)列采用的是LinkedBlockingQueue反浓。
創(chuàng)建線程池的方法萌丈,在我們的程序中只需要,后面其他種類(lèi)的同理:
publicstaticvoidmain(String[] args){// 參數(shù)是要線程池的線程最大值ExecutorService executorService = Executors.newFixedThreadPool(10);
}
2.CachedThreadPool
CachedThreadPool是一個(gè)根據(jù)需要?jiǎng)?chuàng)建線程的線程池
publicstaticExecutorServicenewCachedThreadPool(){returnnewThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,newSynchronousQueue());? ? }
CachedThreadPool的corePoolSize是0雷则,maximumPoolSize是Int的最大值辆雾,也就是說(shuō)CachedThreadPool沒(méi)有核心線程,全部都是非核心線程巧婶,并且沒(méi)有上限乾颁。keepAliveTime是60秒,就是說(shuō)空閑線程等待新任務(wù)60秒艺栈,超時(shí)則銷(xiāo)毀英岭。此處用到的隊(duì)列是阻塞隊(duì)列SynchronousQueue,這個(gè)隊(duì)列沒(méi)有緩沖區(qū),所以其中最多只能存在一個(gè)元素,有新的任務(wù)則阻塞等待湿右。
3.SingleThreadExecutor
SingleThreadExecutor是使用單個(gè)線程工作的線程池诅妹。其創(chuàng)建源碼如下:
publicstaticExecutorServicenewSingleThreadExecutor(){returnnewFinalizableDelegatedExecutorService? ? ? ? ? ? (newThreadPoolExecutor(1,1,0L, TimeUnit.MILLISECONDS,newLinkedBlockingQueue()));? ? }
我們可以看到總線程數(shù)和核心線程數(shù)都是1,所以就只有一個(gè)核心線程毅人。該線程池才用鏈表阻塞隊(duì)列LinkedBlockingQueue吭狡,先進(jìn)先出原則,所以保證了任務(wù)的按順序逐一進(jìn)行丈莺。
4.ScheduledThreadPool
ScheduledThreadPool是一個(gè)能實(shí)現(xiàn)定時(shí)和周期性任務(wù)的線程池划煮,它的創(chuàng)建源碼如下:
publicstaticScheduledExecutorServicenewScheduledThreadPool(intcorePoolSize){returnnewScheduledThreadPoolExecutor(corePoolSize);? ? }
這里創(chuàng)建了ScheduledThreadPoolExecutor,繼承自ThreadPoolExecutor缔俄,主要用于定時(shí)延時(shí)或者定期處理任務(wù)弛秋。ScheduledThreadPoolExecutor的構(gòu)造如下:
publicScheduledThreadPoolExecutor(intcorePoolSize){super(corePoolSize, Integer.MAX_VALUE,? ? ? ? ? ? ? DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,newDelayedWorkQueue());? ? }
可以看出corePoolSize是傳進(jìn)來(lái)的固定值,maximumPoolSize無(wú)限大俐载,因?yàn)椴捎玫年?duì)列DelayedWorkQueue是無(wú)解的蟹略,所以maximumPoolSize參數(shù)無(wú)效。
當(dāng)執(zhí)行scheduleAtFixedRate或者scheduleWithFixedDelay方法時(shí)遏佣,會(huì)向DelayedWorkQueue添加一個(gè)實(shí)現(xiàn)RunnableScheduledFuture接口的ScheduledFutureTask(任務(wù)的包裝類(lèi))挖炬,并會(huì)檢查運(yùn)行的線程是否達(dá)到corePoolSize。如果沒(méi)有則新建線程并啟動(dòng)ScheduledFutureTask状婶,然后去執(zhí)行任務(wù)意敛。如果運(yùn)行的線程達(dá)到了corePoolSize時(shí)馅巷,則將任務(wù)添加到DelayedWorkQueue中。DelayedWorkQueue會(huì)將任務(wù)進(jìn)行排序草姻,先要執(zhí)行的任務(wù)會(huì)放在隊(duì)列的前面令杈。在跟此前介紹的線程池不同的是,當(dāng)執(zhí)行完任務(wù)后碴倾,會(huì)將ScheduledFutureTask中的time變量改為下次要執(zhí)行的時(shí)間并放回到DelayedWorkQueue中。
如何合理配置線程池的大小
一般需要根據(jù)任務(wù)的類(lèi)型來(lái)配置線程池大械衾觥:
如果是CPU密集型任務(wù)跌榔,就需要盡量壓榨CPU,參考值可以設(shè)為 NCPU+1
如果是IO密集型任務(wù)捶障,參考值可以設(shè)置為2*NCPU
當(dāng)然僧须,這只是一個(gè)參考值,具體的設(shè)置還需要根據(jù)實(shí)際情況進(jìn)行調(diào)整项炼,比如可以先將線程池大小設(shè)置為參考值担平,再觀察任務(wù)運(yùn)行情況和系統(tǒng)負(fù)載、資源利用率來(lái)進(jìn)行適當(dāng)調(diào)整锭部。
實(shí)現(xiàn)一個(gè)ThreadManager的線程池管理類(lèi):
public?class?ThreadManager?{??
//?定義兩個(gè)池子暂论,mNormalPool?訪問(wèn)網(wǎng)絡(luò)用的,mDownloadPool?是下載用的??
private?static?ThreadPoolProxy?mNormalPool???=?new?ThreadPoolProxy(1,?3,?5?*?1000);//param?0??最大線程數(shù)拌禾,param?1?核心線程數(shù)??
private?static?ThreadPoolProxy?mDownloadPool?=?new?ThreadPoolProxy(3,?3,?5?*?1000);??
//?proxy?是代理的意思??
//?定義兩個(gè)get方法取胎,獲得兩個(gè)池子的對(duì)象?,直接get?獲得到的是代理對(duì)象??
public?static?ThreadPoolProxy?getNormalPool()?{??
return?mNormalPool;??
????}??
public?static?ThreadPoolProxy?getDownloadPool()?{??
return?mDownloadPool;??
????}??
//?代理設(shè)計(jì)模式類(lèi)似一個(gè)中介湃窍,所以在中介這里有我們真正想獲取的對(duì)象??
//?所以要先獲取代理闻蛀,再獲取這個(gè)線程池??
public?static?class?ThreadPoolProxy?{??
private?final?int????????????????mCorePoolSize;?????//?核心線程數(shù)??
private?final?int????????????????mMaximumPoolSize;??//?最大線程數(shù)??
private?final?long???????????????mKeepAliveTime;????//?所有任務(wù)執(zhí)行完畢后普通線程回收的時(shí)間間隔??
private?ThreadPoolExecutor?mPool;??//?代理對(duì)象內(nèi)部保存的是原來(lái)類(lèi)的對(duì)象??
//?賦值??
public?ThreadPoolProxy(int?corePoolSize,?int?maximumPoolSize,?long?keepAliveTime)?{??
this.mCorePoolSize?=?corePoolSize;??
this.mMaximumPoolSize?=?maximumPoolSize;??
this.mKeepAliveTime?=?keepAliveTime;??
????????}??
private?void?initPool()?{??
if?(mPool?==?null?||?mPool.isShutdown())?{??
//????????????????int?corePoolSize?=?1;//核心線程池大小??
//????????????????int?maximumPoolSize?=?3;//最大線程池大小??
//????????????????long?keepAliveTime?=?5?*?1000;//保持存活的時(shí)間??
TimeUnit?unit??????=?TimeUnit.MILLISECONDS;//單位??
BlockingQueue?workQueue?=null;//阻塞隊(duì)列??
workQueue?=new?ArrayBlockingQueue(3);//FIFO,大小有限制,為3個(gè)??
//workQueue?=?new?LinkedBlockingQueue();??//隊(duì)列類(lèi)型為linked您市,其大小不定觉痛,無(wú)限大小??
//????????????????workQueue?=?new?PriorityBlockingQueue();??
ThreadFactory?threadFactory?=?Executors.defaultThreadFactory();//線程工廠??
RejectedExecutionHandler?handler?=null;//異常捕獲器??
//????????????????handler?=?new?ThreadPoolExecutor.DiscardOldestPolicy();//去掉隊(duì)列中首個(gè)任務(wù),將新加入的放到隊(duì)列中去??
//????????????????handler?=?new?ThreadPoolExecutor.AbortPolicy();//觸發(fā)異常??
handler?=new?ThreadPoolExecutor.DiscardPolicy();//不做任何處理??
//????????????????handler?=?new?ThreadPoolExecutor.CallerRunsPolicy();//直接執(zhí)行茵休,不歸線程池控制,在調(diào)用線程中執(zhí)行??
//????????????????new?Thread(task).start();??
//?創(chuàng)建線程池??
mPool?=new?ThreadPoolExecutor(mCorePoolSize,??
????????????????????????mMaximumPoolSize,??
????????????????????????mKeepAliveTime,??
????????????????????????unit,??
????????????????????????workQueue,??
????????????????????????threadFactory,??
????????????????????????handler);??
????????????}??
????????}??
/**
?????????*?執(zhí)行任務(wù)
?????????*?@param?task
?????????*/??
public?void?execute(Runnable?task)?{??
????????????initPool();??
//執(zhí)行任務(wù)??
????????????mPool.execute(task);??
????????}??
//?提交任務(wù)??
public?Future?submit(Runnable?task)?{??
????????????initPool();??
return?mPool.submit(task);??
????????}??
//?取消任務(wù)??
public?void?remove(Runnable?task)?{??
if?(mPool?!=?null?&&?!mPool.isShutdown())?{??
????????????????mPool.getQueue()??
????????????????????????.remove(task);??
????????????}??
????????}??
????}??
}??
END