1.線程池的使用
Exector框架提供了各種類型的線程池九火,主要有以下幾種方法:
public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newCachedThreadPool()
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
- newFixedThreadPool(int nThreads): 該方法返回一個(gè)固定線程數(shù)量的線程池。該線程池中的線程數(shù)量始終不變册招。當(dāng)有一個(gè)新的任務(wù)提交時(shí)岔激,線程池中若有空閑線程,則立即執(zhí)行是掰;如果沒(méi)有虑鼎,則新的任務(wù)會(huì)被暫存在一個(gè)任務(wù)隊(duì)列(LinkedBlockingQueue有序)中,等到有線程空閑的時(shí)候键痛,便處理任務(wù)隊(duì)列中的任務(wù)炫彩。
2.newSingleThreadExecutor():該方法返回一個(gè)只有一個(gè)線程的線程池。若多余一個(gè)任務(wù)被提交到線程池散休,任務(wù)會(huì)被保存到一個(gè)任務(wù)隊(duì)列(LinkedBlockingQueue)中媒楼,等到線程空閑乐尊,按先入先出的順序執(zhí)行隊(duì)列中的任務(wù)戚丸。
3.newCachedThreadPool() :該方法返回一個(gè)可根據(jù)實(shí)際情況調(diào)整線程數(shù)量的線程池。線程池中的數(shù)量不確定,但若有空閑的線程可以復(fù)用限府,則優(yōu)先使用可復(fù)用的線程夺颤。若所有線程均在工作,又有新的任務(wù)提交胁勺,則會(huì)創(chuàng)建新的線程處理任務(wù)世澜。所有線程在當(dāng)前任務(wù)執(zhí)行完畢后,返回線程池進(jìn)行復(fù)用署穗。
4.newSingleThreadScheduledExecutor() :該方法返回一個(gè)ScheduledExecutorService對(duì)象寥裂,線程池的大小為1。ScheduledExecutorService接口在ExecutorService接口之上擴(kuò)展了在給定時(shí)間執(zhí)行某任務(wù)的功能案疲,如在某個(gè)固定的延時(shí)之后執(zhí)行封恰,或者周期性執(zhí)行某個(gè)任務(wù)。
5.newScheduledThreadPool(int corePoolSize):該方法也返回一個(gè)ScheduledExecutorService對(duì)象褐啡,但可以指定線程的數(shù)量诺舔。
固定大小線程池使用實(shí)例
public class FixedThreadPoolDemo {
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(5);
for (int i=0;i<5;i++){
es.submit(()->{
System.out.println(System.currentTimeMillis()+"ThreadID:"+Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
計(jì)劃任務(wù)
另一個(gè)值得注意的方法是newScheduledThreadPool(),它返回一個(gè)ScheduledExecutorService對(duì)象备畦,可以根據(jù)時(shí)間需要對(duì)線程進(jìn)行調(diào)度低飒。它的一些方法如下:
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
schedule會(huì)在給定時(shí)間,對(duì)任務(wù)進(jìn)行一次調(diào)度懂盐。
scheduleAtFixedRate任務(wù)調(diào)度的頻率是一定的褥赊,它是以上一個(gè)任務(wù)開(kāi)始執(zhí)行時(shí)間為起點(diǎn),在之后的period時(shí)間調(diào)度下一次任務(wù)允粤。
scheduleWithFixedDelay以上一次任務(wù)結(jié)束以后的時(shí)間為起點(diǎn)崭倘,再經(jīng)過(guò)delay時(shí)間進(jìn)行任務(wù)調(diào)度。
實(shí)例代碼
public class ScheduleAtFixedRate {
public static void main(String[] args) {
ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
ses.scheduleAtFixedRate(()->{
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getId()+System.currentTimeMillis()/1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
},0,2, TimeUnit.SECONDS);
}
}
注意:如果任務(wù)的執(zhí)行時(shí)間超過(guò)調(diào)度時(shí)間的話类垫,比如調(diào)度周期是2S司光,任務(wù)執(zhí)行時(shí)間是8S,那么并不會(huì)出現(xiàn)多個(gè)任務(wù)堆疊到一起的情況悉患,如果將上述代碼的Thread.sleep改為8000残家,那么任務(wù)的執(zhí)行周期就不是2S,而是8S售躁。
2.核心線程池的內(nèi)部實(shí)現(xiàn)
無(wú)論是newFixedThreadPool方法還是newSingleThreadExecutor方法還是newCachedThreadPool方法坞淮,其內(nèi)部實(shí)現(xiàn)均使用了ThreadPoolExecutor類。
下面給出這三個(gè)線程池的實(shí)現(xiàn)方式
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
創(chuàng)建線程池主要通過(guò)ThreadPoolExecutor類完成陪捷。ThreadPoolExcetor有很多重載方法回窘,通過(guò)參數(shù)最多的構(gòu)造方法來(lái)理解創(chuàng)建線程池需要配置那些參數(shù),構(gòu)造方法如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
1.corePoolSize:指定了線程池中的線程數(shù)量市袖。
2.maximumPoolSize:指定了線程池中的最大線程數(shù)量啡直。烁涌。
3.keepAliveTime:空閑線程存活時(shí)間。如果當(dāng)前線程池的線程個(gè)數(shù)已經(jīng)超過(guò)了corePoolSize酒觅,并且線程空閑時(shí)間超過(guò)了keepAliveTime的話撮执,就會(huì)將這些空閑線程銷毀,這樣可以盡可能降低系統(tǒng)資源消耗舷丹。
4.unit:為keepAliveTime指定時(shí)間單位
- workQueue:任務(wù)隊(duì)列抒钱。被提交但尚未執(zhí)行的任務(wù)。
6.threadFactory:創(chuàng)建線程的工廠類颜凯∧北遥可以通過(guò)指定線程工廠為每個(gè)創(chuàng)建出來(lái)的線程設(shè)置更有意義的名字,如果出現(xiàn)并發(fā)問(wèn)題症概,也方便查找問(wèn)題原因瑞信。一般用默認(rèn)的就行。
7.handler:拒絕策略穴豫,當(dāng)任務(wù)太多來(lái)不及處理時(shí)凡简,如何拒絕任務(wù)。
workQueue和handler詳解
workQueue
參數(shù)workQueue指被提交但未執(zhí)行的任務(wù)隊(duì)列精肃,它是一個(gè)BlockingQueue接口的對(duì)象秤涩,僅用于存放Runnable對(duì)象∷颈В可以使用以下幾種:
1.直接提交隊(duì)列(SynchronousQueue)
SynchronousQueue是一種特殊的BlockingQueue筐眷。SynchronousQueue沒(méi)有容量,每一個(gè)插入操作都要等待一個(gè)響應(yīng)的刪除操作习柠,反之匀谣,每一個(gè)刪除操作都要等待對(duì)應(yīng)的插入操作。如果使用SynchronousQueue资溃,提交的任務(wù)不會(huì)被真實(shí)保存武翎,而總是將新任務(wù)提交給線程執(zhí)行,如果沒(méi)有空閑線程溶锭,則嘗試創(chuàng)建新的線程宝恶,則嘗試創(chuàng)建新的線程,如果線程數(shù)量達(dá)到最大值趴捅,則執(zhí)行拒絕策略垫毙。因此,使用SynchronousQueue隊(duì)列拱绑,通常要設(shè)置很大的maxumumPoolSize值综芥,否則很容易執(zhí)行拒絕策略
2.有界的任務(wù)隊(duì)列(ArrayBlockingQueue)
ArrayBlockingQueue類的構(gòu)造函數(shù)必須帶一個(gè)容量參數(shù),表示該隊(duì)列的最大容量
public ArrayBlockingQueue(int capacity)
當(dāng)使用有界的任務(wù)隊(duì)列的時(shí)候猎拨,若有新的任務(wù)需要執(zhí)行膀藐,如果線程池的實(shí)際線程數(shù)小于corePoolSize,則會(huì)優(yōu)先創(chuàng)建新的線程征峦;若大于corePoolSize,則會(huì)將新任務(wù)加入等待隊(duì)列消请。若等待隊(duì)列已滿,無(wú)法加入类腮,則在總線程數(shù)不大于maximumPoolSize的前提下臊泰,創(chuàng)建新的線程執(zhí)行任務(wù)。若大于maximumPoolSize蚜枢,則執(zhí)行拒絕策略缸逃。可見(jiàn)厂抽,有界隊(duì)列僅當(dāng)任務(wù)隊(duì)列裝滿時(shí)需频,才可能將線程數(shù)提升到corePoolSize以上。也就是說(shuō)除非系統(tǒng)特別繁忙筷凤,否則要確保核心線程數(shù)維持在corePoolSize昭殉。
3.無(wú)界的任務(wù)隊(duì)列 (LinkedBlockingQueue)
與有界任務(wù)隊(duì)列相比,除非系統(tǒng)資源耗盡藐守,否則無(wú)界的任務(wù)隊(duì)列不存在任務(wù)入隊(duì)失敗的情況挪丢。當(dāng)有新的任務(wù)到來(lái)時(shí),系統(tǒng)線程數(shù)小于corePoolSize時(shí)卢厂,線程池會(huì)生成新的線程執(zhí)行任務(wù)乾蓬,當(dāng)系統(tǒng)的線程數(shù)達(dá)到corePoolSize后,就不會(huì)繼續(xù)增加了慎恒。若后續(xù)仍有新的任務(wù)加入任内,又沒(méi)有空閑的線程資源,則任務(wù)直接進(jìn)入隊(duì)列等待融柬,若任務(wù)創(chuàng)建和處理的速度差異很大死嗦,無(wú)界隊(duì)列會(huì)保持一直增長(zhǎng),直到資源耗盡粒氧。
4.優(yōu)先任務(wù)隊(duì)列(PriorityBlockingQueue)
可以控制任務(wù)的執(zhí)行先后順序越走。它是一個(gè)特殊的無(wú)界隊(duì)列。PriorityBlockingQueue類可以根據(jù)任務(wù)自身的優(yōu)先級(jí)順序先后執(zhí)行靠欢,在確保系統(tǒng)性能的同時(shí)廊敌,也有很好額質(zhì)量保證。
回顧newFixedThreadPool方法的實(shí)現(xiàn)门怪,它返回了一個(gè)corePoolSize和maximumPoolSize大小一樣的骡澈,并且使用LinkedBlockingQueue任務(wù)隊(duì)列的線程池。對(duì)于固定大小的線程池而言掷空,不存在線程數(shù)量動(dòng)態(tài)變化肋殴,因此corePoolSize和maximumPoolSize大小相等囤锉,它使用無(wú)界隊(duì)列存放無(wú)法執(zhí)行的任務(wù),當(dāng)任務(wù)提交頻繁時(shí)护锤,該隊(duì)列迅速增長(zhǎng)官地,從而導(dǎo)致系統(tǒng)資源耗盡。
newSingleThreadExecutor方法返回的是單線程線程池烙懦,是newFixedThreadPool方法的一種退化驱入,只是將線程池線程數(shù)量設(shè)為1。
newCachedThreadPool方法的corePoolSize為0氯析,而maximumPoolSize為無(wú)窮大的線程池亏较,這意味著沒(méi)有任務(wù)時(shí),該線程池內(nèi)無(wú)線程掩缓,而當(dāng)任務(wù)被提交時(shí)雪情,該線程池會(huì)使用空閑線程執(zhí)行任務(wù),如果沒(méi)有空閑線程你辣,則將任務(wù)加入SynchronousQueue隊(duì)列巡通,而SynchronousQueue隊(duì)列是一種直接提交隊(duì)列,它會(huì)迫使線程池增加新的線程執(zhí)行任務(wù)舍哄。當(dāng)任務(wù)執(zhí)行完畢后扁达,由于corePoolSIze為0,空閑線程又會(huì)在指定60S被回收蠢熄。
對(duì)于newChchedThreadPool方法跪解,如果同時(shí)有大量任務(wù)被提交,而任務(wù)的執(zhí)行又不是很快時(shí)签孔,系統(tǒng)會(huì)開(kāi)啟等量的線程處理叉讥,這樣很快會(huì)耗盡系統(tǒng)資源。
ThreadPoolExecutor線程池的核心調(diào)度代碼如下
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
handler
handler指定了拒絕策略饥追,也就是當(dāng)任務(wù)數(shù)量超過(guò)系統(tǒng)實(shí)際承載能力時(shí)图仓,就要用到拒絕策略。拒絕策略可以說(shuō)是系統(tǒng)超負(fù)荷運(yùn)行時(shí)的補(bǔ)救措施但绕,通常由于壓力太大而引起的救崔。也就是當(dāng)?shù)却?duì)列已滿并且當(dāng)前線程數(shù)量超過(guò)maximumPoolSize時(shí),處理
的策略捏顺。
JDK內(nèi)置4種策略
AbortPolicy策略:該策略會(huì)直接拋出異常六孵,阻止系統(tǒng)正常工作。
CallerRunsPolicy策略:只要線程池未關(guān)閉幅骄,該策略直接在調(diào)用者線程中劫窒,運(yùn)行當(dāng)前被丟棄的任務(wù),顯然這樣做不會(huì)真的丟棄任務(wù)拆座,但是主巍,任務(wù)提交線程的性能極有可能急速下降冠息。
DiscardOldestPolicy策略:該策略丟棄最老的一個(gè)請(qǐng)求,也就是即將被執(zhí)行的一個(gè)任務(wù)孕索,并嘗試再次提交當(dāng)前任務(wù)
DiscardPolicy策略:該策略默默丟棄無(wú)法處理的任務(wù)逛艰,不予任何處理。如果允許任務(wù)丟棄搞旭,這是最好的方案散怖。