什么是線程池
線程池:管理一組工作線程的資源池。
為什么使用線程池
1.避免反復(fù)創(chuàng)建回收線程徙邻,降低資源消耗排嫌。
2.提供線程的可管理性。
3.提高響應(yīng)速度
如何創(chuàng)建線程池
ThreadPoolExecutor是jdk提供的線程池的服務(wù)類缰犁,基于ThreadPoolExecutor可以很容易將一個實(shí)現(xiàn)Runnable接口的任務(wù)放入線程池中執(zhí)行淳地,下面是ThreadPoolExecutor實(shí)現(xiàn):
//構(gòu)造函數(shù):
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
創(chuàng)建一個線程池需要以下幾個參數(shù):
- corePoolSize:線程池中線程的個數(shù)。
- maximumPoolSize:線程池最大容量帅容。
- keepAliveTime:線程池中空閑存活時間(單位:TimeUnit定)
- workQueue:存放任務(wù)的工作隊列颇象。
- threadFactory:線程工廠
- handler:當(dāng)任務(wù)數(shù)超過maximumPoolSize+corePoolSize后,任務(wù)交給handler處理并徘。
1.corePoolSize
當(dāng)客戶端提交一個新任務(wù)時遣钳,如果此時線程池中線程的個數(shù)小于corePoolSize,則線程池會新建一個線程來處理該任務(wù)饮亏,即時此時有空閑線程耍贾。
2.maximumPoolSize
客戶端提交新任務(wù)阅爽,此時線程池的任務(wù)隊列也已經(jīng)滿了,那么如果maximumPoolSize < corePoolSize,就新建線程執(zhí)行該任務(wù)荐开。
如果線程池用的是無界隊列付翁,那么這個參數(shù)也就不起任何作用了。
3.keepAliveTime
線程池中超過corePoolSize的線程會在空閑keepAliveTime時間后被關(guān)閉晃听,keepAliveTime單位由TimeUnit指定百侧,如果allowCoreThreadTimeOut=true,即核心線程如果空閑時間超過keepAliveTime時間后同樣會被關(guān)閉能扒。
4.workQueue
當(dāng)核心線程池已滿佣渴,即當(dāng)前worker數(shù)=corePoolSize時,新提交的任務(wù)會被放入工作隊列中初斑。此時一旦有worker完成手頭的任務(wù)就會到workQueue中領(lǐng)取一個新任務(wù)繼續(xù)執(zhí)行辛润。
工作隊列可以有以下幾種選擇:
(1).ArrayBlockingQueue:基于數(shù)組的有界阻塞隊列
(2).LinkedBlockingQueue:基于鏈表的阻塞隊列,可以不指定隊列大小,默認(rèn)Integer.MAX_VALUE见秤。性能高于ArrayBlockingQueue砂竖。
(3).SynchronousQueue
(4).PriorityBlockingQueue:具有優(yōu)先級的阻塞列,基于數(shù)組實(shí)現(xiàn)鹃答,內(nèi)部實(shí)際上實(shí)現(xiàn)了一個最小堆乎澄,每次offer、poll,都需要進(jìn)行堆調(diào)整操作O(logn)测摔。隊列中元素需要實(shí)現(xiàn)Comparable接口或初始化隊列時傳入一個Comparator對象置济。雖然初始化隊列時需指定隊列大小,但PrioriityBlockingQueue支持動態(tài)擴(kuò)容锋八,所以可以認(rèn)為是無限阻塞隊列浙于。
5.threadFactory
每當(dāng)線程池需要創(chuàng)建一個線程時,都是通過線程工廠方法來完成查库。默認(rèn)的線程工廠方法將創(chuàng)建一個新的路媚、非守護(hù)的線程,并且不包含特殊配置信息樊销。通過指定一個線程工廠方法,可以定制線程池的配置信息脏款。
我們可以通過實(shí)現(xiàn)ThreadFactory接口围苫,來定制線程工廠。樣例如下:
public interface ThreadFactory {
Thread newThread(Runnable r);
}
public class MyThreadFactory implements ThreadFactory {
private final String poolName;
public MyThreadFactory(String name){
this.poolName = name;
}
@Override
public Thread newThread(Runnable r) {
// TODO Auto-generated method stub
return new MyAppThread(r,poolName);
}
}
public class MyAppThread extends Thread {
public static final String DEFAULT_NAME = "MyAppThread";
private static volatile boolean debugLifecycle = false;
private static final AtomicInteger created = new AtomicInteger();
private static final AtomicInteger alive = new AtomicInteger();
public MyAppThread(Runnable r){
this(r,DEFAULT_NAME);
}
public MyAppThread(Runnable r,String name){
super(r,name + "-" + created.incrementAndGet());
setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("UNCATGHT in Thread " + t.getName());
}
});
}
public void run(){
try {
alive.incrementAndGet();
super.run();
} finally{
alive.decrementAndGet();
}
}
public static int getThreadsCreated(){return created.get();}
public static int getThreadAlive(){return alive.get();}
public static boolean getDebug(){return debugLifecycle;}
}
6.RejectedExecutionHandler
當(dāng)線程池達(dá)到最大容量撤师,飽和策略就發(fā)揮作用剂府,ThreadPoolExecutor的飽和策略可以通過setRejectedExecutionHandler方法來修改。
JDK提供幾種不同的RejectedExecutionHandler的實(shí)現(xiàn):
- AbortPolicy:終止策略是默認(rèn)飽和策略剃盾,直接拋出RejectedExecutionException腺占。調(diào)用者可以捕獲這個異常淤袜,并做相應(yīng)處理。
- CallerRunsPolicy:“調(diào)用者運(yùn)行”策略實(shí)現(xiàn)了一種調(diào)用機(jī)制衰伯,該策略不會拋棄任務(wù)铡羡,也不拋出異常,而是將任務(wù)退回調(diào)用者意鲸,從而降低新任務(wù)的流量
- DiscardPolicy:新任務(wù)無法入隊列時,直接拋棄該任務(wù)怎顾。
- DiscardOldestPolicy:拋棄下一個將要被執(zhí)行的任務(wù)读慎,然后嘗試重新提交新任務(wù)。(若工作隊列是一個優(yōu)先隊列槐雾,那么“拋棄最舊的”策略導(dǎo)致拋棄優(yōu)先級最高的任務(wù)夭委。)
飽和策略實(shí)現(xiàn)類都需要實(shí)現(xiàn)RejectedExecutionHandler接口,下面是四種jdk內(nèi)置的四種飽和策略的源碼:
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
// 直接拋出RejectedExecutionException異常
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
//不做操作募强,直接丟棄了
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
//拋棄隊頭的任務(wù)闰靴,即最早提交且未執(zhí)行的任務(wù)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
創(chuàng)建線程池
jdk為我們提供了一個工廠類Executors,其中提供了幾個靜態(tài)工廠方法用于新建不同特性的線程池钻注。如下:
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
newFixedThreadPool:將創(chuàng)建一個固定長度的線程池蚂且,每提交一個任務(wù)時就創(chuàng)建一個線程,直到達(dá)到線程池的最大數(shù)量幅恋,此時線程池的規(guī)模不再變化(如果某個線程發(fā)生Exception而結(jié)束杏死,那么線程池會補(bǔ)充一個新的線程)
newCachedThreadPool:創(chuàng)建一個可緩存的線程池,如果當(dāng)前線程池中線程的個數(shù)超過了處理需求時捆交,那么空閑線程將被回收淑翼,而當(dāng)需要增加線程時,則可以添加新的線程品追,線程池中個數(shù)不受限制(使用時格外注意玄括,防止內(nèi)存溢出)
newSingleThreadPool:這是一個單線程的Executor,它創(chuàng)建單個工作線程來執(zhí)行任務(wù)肉瓦,如果線程異常結(jié)束遭京,會創(chuàng)建一個新的線程來替代。newSingleThreadPool能確保依照任務(wù)在隊列中的順序串行執(zhí)行泞莉。
newScheduledThreadPool:創(chuàng)建一個固定長度的線程池哪雕,而且可以延時或定時方式來執(zhí)行任務(wù)。
注意事項
1.newFixedThreadPool和newSingleThreadExecutor都是用了LinkedBlockingQueue(),默認(rèn)capacity=Integer.MAX_VALUE鲫趁,線程池工作隊列可以認(rèn)為是無限大的斯嚎,所以線程池中的線程數(shù)不會超過CorePoolSize,maximumPoolSize可以認(rèn)為是一個無效參數(shù),且飽和策略不可能執(zhí)行堡僻,這幾點(diǎn)需要注意糠惫。
2.newFixedThreadPool(1)和newSingleThreadPool區(qū)別?
newSingleThreadPool返回的是一個代理對象钉疫,屏蔽了ThreadPoolExecutor的一些set方法硼讽,即newSingleThreadPool一旦返回,就無法在重新配置線程池的參數(shù)了陌选。
3.CachedThreadPool的corePoolSize=0理郑,即核心線程池默認(rèn)為空,maximumPoolSize=Integer.MAX_VALUE,最大線程池為無限大的咨油。且空閑線程等待新任務(wù)超過60秒即被終止您炉。
Executor生命周期
由于Executor以異步方式來執(zhí)行任務(wù),因此在任意時刻役电,之前提交的任務(wù)的狀態(tài)是無法立刻得到的赚爵。有些任務(wù)可能已經(jīng)完成,有些可能正在運(yùn)行法瑟,而其他的任務(wù)可能在隊列中等待執(zhí)行冀膝。
為了解決執(zhí)行任務(wù)的生命周期問題,ExecutorService擴(kuò)展了Executor接口霎挟,添加了一些生命周期管理的方法窝剖。如下:
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// ...還有用于提交任務(wù)的一些方法
線程池的生命周期有以下幾種狀態(tài)
- RUNNING:接受新task,并且處理工作隊列中的任務(wù)酥夭。
- SHUTDOWN:不接受新task赐纱,但是繼續(xù)處理工作隊列中的任務(wù)。
- STOP:不接受新task熬北,不處理工作隊列中的任務(wù)疙描,并且中斷運(yùn)行中的線程。
- TIDYING:所有任務(wù)已被終止讶隐,線程池已清空(workerCount=0),此時線程池狀態(tài)變?yōu)門IDYING,并且準(zhǔn)備執(zhí)行terminated()方法起胰。
- TERMINATED:以完成terminated()方法。
線程池如何存儲自身狀態(tài)的巫延?
線程池的狀態(tài)信息是用一個AtomicInteger類型的變量ctl存儲的效五,定義如下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl中除了存放狀態(tài)信息,還存放了線程池當(dāng)前工作線程的個數(shù)信息烈评。下圖展示這兩個信息在ctl中的存儲形式:
下面是狀態(tài)相關(guān)信息的源碼火俄,結(jié)合上圖應(yīng)該就不難理解了
private static final int COUNT_BITS = Integer.SIZE - 3;//為啥減3
//0-28位全為1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// RUNNING : 111。讲冠。。
private static final int RUNNING = -1 << COUNT_BITS;
// SHUTDOWN : 000适瓦。竿开。谱仪。
private static final int SHUTDOWN = 0 << COUNT_BITS;
// STOP : 001。否彩。疯攒。
private static final int STOP = 1 << COUNT_BITS;
// TIDYING : 010。列荔。敬尺。
private static final int TIDYING = 2 << COUNT_BITS;
//TERMINATED :110。贴浙。砂吞。
private static final int TERMINATED = 3 << COUNT_BITS;
// 通過簡單的位運(yùn)算獲取ctl中的信息
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
runState有5種狀態(tài),所以最少需要3bit來表示崎溃。這就是為什么COUNT_BITS=32-3
各種狀態(tài)之間的轉(zhuǎn)換時機(jī):
- RUNNING -> SHUTDOWN:調(diào)用shutdown()方法
- (RUNNING or SHUTDOWN) -> STOP:調(diào)用shutdownNow()
- SHUTDOWN -> TIDYING:線程池和工作隊列都為空時
- STOP -> TIDYING:線程池為空時
- TIDYING -> TERMINATED:調(diào)用terminated()后
關(guān)閉線程池方法
- shutdown():不再接受新任務(wù)蜻直,同時已提交的任務(wù)執(zhí)行完成,包括那些還在隊列中等待袁串,未開始執(zhí)行的任務(wù)概而。
- shutdownNow():將取消所有運(yùn)行中的任務(wù),并且不再啟動隊列中尚未開始執(zhí)行的任務(wù)囱修。之后再提交任務(wù)則拋出異常:java.util.concurrent.RejectedExecutionException赎瑰。
線程池工作流程
線程池處理新提交任務(wù)的流程如下:
如果當(dāng)前運(yùn)行的線程數(shù)小于配置的corePoolSize,就新建一個線程執(zhí)行該command任務(wù)破镰,即時此時線程池中有空閑線程餐曼。
如果線程池中線程個數(shù)達(dá)到corePoolSize,新提交的任務(wù)就被放入workQueue啤咽,等待線程池任務(wù)調(diào)度晋辆。
當(dāng)workQueue滿后,且maximumPoolSize > corePoolSize宇整,新提交任務(wù)會創(chuàng)建新線程執(zhí)行任務(wù)
當(dāng)workQueue滿后瓶佳,且線程池個數(shù)達(dá)到maximumPoolSize,則提交的任務(wù)交由RejectedExecutionHandler處理鳞青。
超過corePoolSize的線程霸饲,在空閑keepAliveTime時間后,將被關(guān)閉臂拓。
當(dāng)allowCoreThreadTimeOut=true時厚脉,corePoolSize個數(shù)內(nèi)的線程空閑時間達(dá)到keepAliveTime后,也將被關(guān)閉胶惰。
ThreadPoolExecutor 中execute源碼:如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {//新建線程執(zhí)行任務(wù)
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//提交到工作隊列
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))//
reject(command);
}
線程池監(jiān)控
//線程池已完成任務(wù)數(shù)量
private long completedTaskCount;
//當(dāng)前運(yùn)行的線程數(shù)
public int getActiveCount()
此外傻工,ThreadPoolExecutor還提供了以下幾個鉤子函數(shù)用于擴(kuò)展它的行為,我們可以在子類中實(shí)現(xiàn)自己的邏輯,在每個任務(wù)執(zhí)行的前、后以及worker退出時進(jìn)行定制處理中捆。
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }
線程的生命周期
下面從線程池中的某個線程的角度出發(fā)鸯匹,分析一下線程從被創(chuàng)建一直到被銷毀,整個生命周期里的工作流程
1. 線程的創(chuàng)建時機(jī)
- 提交任務(wù)時被創(chuàng)建(即客戶端調(diào)用submit方法)泄伪。不過提交任務(wù)未必一定會創(chuàng)建線程殴蓬,這在前面線程池的工作流程里已經(jīng)提到。
- 預(yù)先啟動線程池中核心線程池蟋滴。(如:調(diào)用prestartCoreThread染厅、prestartAllCoreThreads()等方法),下面是prestartAllCoreThreads的源碼
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))//firstTask為null
++n;
return n;
}
2. 線程在線程池中的數(shù)據(jù)結(jié)構(gòu)
線程池中的工作線程是被封裝到一個Worker類中津函,部分源碼如下:
private final class Worker extends AbstractQueuedSynchronizer
implements Runnable
{
/** worker關(guān)聯(lián)的線程 */
final Thread thread;
/** worker的第一個任務(wù),可能為null */
Runnable firstTask;
/** worker完成的任務(wù)數(shù)量 */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** 將任務(wù)代理給runWorker方法 */
public void run() {
runWorker(this);
}
肖粮。。球散。尿赚。
}
//下面是ThreadPoolExecutor中的runWorker方法源碼:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);//鉤子函數(shù)
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//鉤子函數(shù)
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
當(dāng)firstTassk為null的情況下,線程的執(zhí)行流程如下圖
對于Executor框架蕉堰,需要明白以下兩點(diǎn)
Executor框架基于生產(chǎn)者-消費(fèi)者模式:提交任務(wù)的執(zhí)行者是生成者凌净,執(zhí)行任務(wù)的線程是消費(fèi)者。
Executor是異步執(zhí)行任務(wù)屋讶,這是通過隊列來實(shí)現(xiàn)的冰寻。