為了更好的控制多線程鞋既,JDK提供了一套線程框架Executor然眼,幫助開發(fā)人員有效的進(jìn)行線程控制商源。
Executors創(chuàng)建線程的方法:
newFixedThreadPool方法:該方法返回一個(gè)固定數(shù)量的線程池寂诱,該方法的線程數(shù)始終不變癣缅,當(dāng)有一個(gè)任務(wù)提交時(shí)栈戳,若有空閑的線程岂傲,則立即執(zhí)行。若無空閑的線程再會(huì)加入到一個(gè)隊(duì)列里面去子檀。
newSingleThreadExecutor : 創(chuàng)建一個(gè)線程的線程池镊掖,若空閑則執(zhí)行。否則放入一個(gè)隊(duì)列之中褂痰。
newCachedThreadPool: 返回一個(gè)可根據(jù)實(shí)際情況調(diào)整線程個(gè)數(shù)的線程池亩进,不限制最大的線程數(shù)量,若用空閑的線程則執(zhí)行任務(wù)缩歪。若無任務(wù)則不創(chuàng)建線程归薛,并且每個(gè)線程在空閑60秒后會(huì)回收。
newScheduledThreadPool方法: 返回一個(gè)ScheduledExecutorService對(duì)象,該線程池可以指定線程數(shù)主籍。
ThreadPoolExecutor 介紹:
在了解Executors的創(chuàng)建方法之前习贫,必須先了解一個(gè)類 ThreadPoolExecutor ,因?yàn)镋xecutors都是基于這個(gè)類去實(shí)現(xiàn)的千元,我們也可以通過這個(gè)類去自定義我們需要的線程池苫昌。
這個(gè)自定義線程池常用的構(gòu)造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
}
corePoolSize :核心線程數(shù)(在進(jìn)隊(duì)列之前,可以運(yùn)行的線程數(shù))
maximumPoolSize :當(dāng)隊(duì)列裝滿任務(wù)之后幸海,則會(huì)立即創(chuàng)建線程祟身,直到maximumPoolSize定義的上限。
keepAliveTime : 線程空閑之后物独,能夠等待的時(shí)間袜硫。
unit: 等待的時(shí)間單位。
workQueue :corePoolSize 線程滿了之后议纯,存放任務(wù)的隊(duì)列(主要分為有界和無界隊(duì)列)父款。
handler: 拒絕策略
有如下代碼:
public class UseThreadPoolExecutor1 implements Runnable {
private static AtomicInteger atomicInteger = new AtomicInteger(0);
public void run() {
try {
int count = atomicInteger.incrementAndGet();
System.out.println("任務(wù) : " + count);
System.out.println(System.currentTimeMillis());
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
//有界隊(duì)列
BlockingQueue blockingQueue = new ArrayBlockingQueue(10);
//核心線程數(shù)是1,最大線程數(shù)是1瞻凤,空閑線程等待時(shí)間60秒憨攒,初始大小為10的有界隊(duì)列
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, blockingQueue);
//通過線程池,創(chuàng)建線程
for(int i = 0 ; i < 20 ; i++){
threadPoolExecutor.execute(new UseThreadPoolExecutor1());
}
Thread.sleep(1000);
System.out.println("blockingQueue size :" + blockingQueue.size());
Thread.sleep(2000);
}}
運(yùn)行結(jié)果:
我們總共創(chuàng)建了20個(gè)線程阀参,但是可以看到的是總共運(yùn)行了11個(gè)線程肝集,而且線程之間每2秒執(zhí)行一次,證明了線程是被一個(gè)一個(gè)創(chuàng)建的蛛壳。其中還拋了一個(gè)異常杏瞻,另外九個(gè)線程去哪里了呢?
在這里corePoolSize 是1 衙荐,workQueue 的大小是10.當(dāng)調(diào)用線程的時(shí)候會(huì)根據(jù) corePoolSize 創(chuàng)建一個(gè)線程捞挥,當(dāng)線程多余一個(gè)時(shí)則將任務(wù)放入隊(duì)列workQueue 。但是workQueue 也只能裝10個(gè)忧吟∑龊總共20個(gè)線程,多出了九個(gè)溜族,這個(gè)時(shí)候會(huì)去判斷maximumPoolSize的大小讹俊,如果maximumPoolSize是10,則就說明超出了隊(duì)列的承受范圍煌抒,我們總共可以創(chuàng)建10個(gè)線程仍劈,這個(gè)時(shí)候就會(huì)立即創(chuàng)建九個(gè)線程來運(yùn)行。但是我們這里maximumPoolSize只是1寡壮,所以導(dǎo)致剩余的9個(gè)線程無法進(jìn)入隊(duì)列贩疙,又無法創(chuàng)建讹弯,所以出現(xiàn)了異常。出現(xiàn)這樣的異常这溅,我們可以通過RejectedExecutionHandler 去進(jìn)行一個(gè)相應(yīng)的處理闸婴。
如下我們將代碼改一下,
public class UseThreadPoolExecutor1 implements Runnable {
private static AtomicInteger atomicInteger = new AtomicInteger(0);
public void run() {
try {
int count = atomicInteger.incrementAndGet();
System.out.println("創(chuàng)建時(shí)間: " + System.currentTimeMillis());
System.out.println("任務(wù) : " + count);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
//有界隊(duì)列
BlockingQueue blockingQueue = new ArrayBlockingQueue(10);
//核心線程數(shù)是1芍躏,最大線程數(shù)是10,空閑線程等待時(shí)間60秒降狠,初始大小為10的有界隊(duì)列
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, blockingQueue);
//通過線程池对竣,創(chuàng)建線程
for (int i = 0; i < 20; i++) {
threadPoolExecutor.execute(new UseThreadPoolExecutor1());
}
Thread.sleep(1000);
System.out.println("blockingQueue size :" + blockingQueue.size());
Thread.sleep(2000);
}}
運(yùn)行結(jié)果:
在這里將maximumPoolSize的值設(shè)置為10,結(jié)果就變?yōu)榍?0個(gè)線程基本上是同一時(shí)間創(chuàng)建的榜配,后面10個(gè)線程是相隔2秒之后一同創(chuàng)建否纬。當(dāng)然,這里也可以將blockingQueue改為一個(gè)無界隊(duì)列LinkedBlockingQueue蛋褥,一樣可以保證我們的任務(wù)能夠全部運(yùn)行临燃。
分析Executors代碼:
看了自定義線程池threadPoolExecutor之后,我們可以看看Executors創(chuàng)建線程池的相應(yīng)代碼:
newFixedThreadPool:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
創(chuàng)建指定的固定線程數(shù)烙心,當(dāng)線程任務(wù)結(jié)束直接釋放線程膜廊,使用的無界隊(duì)列可以保證任務(wù)全部執(zhí)行。
newSingleThreadExecutor:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
同 newFixedThreadPool類似淫茵,但是內(nèi)部固定只創(chuàng)建一個(gè)線程
newCachedThreadPool:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
創(chuàng)建一個(gè)上不封頂?shù)木€程池爪瓜,60空閑后釋放。SynchronousQueue可以保證一開始就被take阻塞中匙瘪,當(dāng)有任務(wù)put時(shí)铆铆,立即被消費(fèi)創(chuàng)建線程。
newScheduledThreadPool:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
最終還是調(diào)用的ThreadPoolExecutor
該方法允許創(chuàng)建一個(gè)延遲隊(duì)列的線程池丹喻。