概述
現(xiàn)在機(jī)器基本都是多核的,開啟多線程可以有效地增加系統(tǒng)的吞吐量和性能旅挤,如下是開啟一個線程最簡單的方式
new Thread(new Runnable() {
@Override
public void run() {
// do something
}
}).start();
這個線程使用完后瓤逼,就會被系統(tǒng)所回收笼吟。線程雖是輕量級的,但其創(chuàng)建霸旗、關(guān)閉依然需要花費時間贷帮、資源。當(dāng)任務(wù)粒度不大的時候诱告,大量創(chuàng)建線程會得不償失撵枢。而且當(dāng)線程數(shù)超過核心數(shù),創(chuàng)建后的線程還會處于等待狀態(tài)精居。
因此線程的數(shù)量最好是能控制的锄禽,且能夠復(fù)用,線程池即能滿足需求靴姿。下面看看創(chuàng)建一個線程池沃但,并提交一個任務(wù)去執(zhí)行的方式
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(new Runnable() {
@Override
public void run() {
// do something
}
});
實現(xiàn)原理
上面創(chuàng)建了一個只擁有一個線程的線程池,并提交一個任務(wù)去執(zhí)行佛吓。線程池的創(chuàng)建可以使用線程池工廠 Executors 里面的 new... 系列方法宵晚,含義如下
Executors.newSingleThreadExecutor(); // 創(chuàng)建一個只有單個線程的線程池垂攘。任務(wù)提交后,若這個線程空閑坝疼,則執(zhí)行搜贤,否則加入隊列,待線程空閑后執(zhí)行
Executors.newFixedThreadPool(numbersOfThread); // 創(chuàng)建一個有numbersOfThread個線程的線程池钝凶。任務(wù)提交后仪芒,若有空閑線程,則執(zhí)行耕陷,否則加入隊列掂名,待有線程空閑后執(zhí)行
Executors.newCachedThreadPool(); // 創(chuàng)建一個有無限個線程的線程池。任務(wù)提交后哟沫,若有空閑線程饺蔑,則執(zhí)行,否則創(chuàng)建新的線程去執(zhí)行
Executors.newSingleThreadScheduledExecutor(); // 在newSingleThreadExecutor之上擴(kuò)展了在給定時間執(zhí)行某任務(wù)的功能
...
查看線程工廠 Executors new... 方法的實現(xiàn)嗜诀,我們可以看到最終都是調(diào)用了 ThreadPoolExecutor 的構(gòu)造方法
public ThreadPoolExecutor(int corePoolSize, // 核心線程數(shù)量
int maximumPoolSize, // 最大線程數(shù)量
long keepAliveTime, // 當(dāng)線程數(shù)量超過核心線程數(shù)量時猾警,其余線程保活時間
TimeUnit unit, // 時間單位
BlockingQueue<Runnable> workQueue, // 任務(wù)隊列
ThreadFactory threadFactory, // 線程工廠
RejectedExecutionHandler handler // 任務(wù)提交失敗時的執(zhí)行策略
)
這里先說下任務(wù)提交后隆敢,線程池的執(zhí)行策略
(1) 當(dāng)線程池的實際線程數(shù)量小于corePoolSize時发皿,則優(yōu)先創(chuàng)建核心線程;
(2) 若大于等于corePooSize拂蝎,則將新的任務(wù)加入等待隊列穴墅;
(3) 若加入隊列失敗,并且線程數(shù)小于maximumPoolSize温自,則創(chuàng)建新的線程執(zhí)行任務(wù)玄货;
(4) 否則執(zhí)行拒絕策略。
下面我們看看具體的源碼
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // workerCountOf:當(dāng)前線程總數(shù)
if (addWorker(command, true)) // 創(chuàng)建一個核心線程并執(zhí)行當(dāng)前提交任務(wù)
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // 把任務(wù)加到任務(wù)隊列里面
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) // 創(chuàng)建一個非核心線程并執(zhí)行任務(wù)
reject(command);
}
接下來我們看看 Worker 工作線程的執(zhí)行過程
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable // 實現(xiàn)了Runnable接口
{
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 用一開始設(shè)置的線程工廠創(chuàng)建了線程悼泌,addWorker之后會調(diào)用這個線程的start方法啟動線程
}
public void run() {
runWorker(this); // 執(zhí)行工作線程
}
final void runWorker(Worker w) {
try {
while (task != null || (task = getTask()) != null) { // 如果當(dāng)前任務(wù)未執(zhí)行松捉,則先執(zhí)行當(dāng)前任務(wù);否則去任務(wù)隊列里面拿任務(wù)執(zhí)行券躁,拿不到任務(wù)時惩坑,這個線程就退出了
...
try {
beforeExecute(wt, task); //任務(wù)執(zhí)行前的回調(diào)
Throwable thrown = null;
try {
task.run(); // 任務(wù)執(zhí)行
} finally {
afterExecute(task, thrown); // 任務(wù)執(zhí)行完的回掉
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
接下來看看 getTask() 獲取任務(wù)的過程
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 要等待 or 阻塞
try {
Runnable r = timed ?
// 從隊列里面拿任務(wù),如果隊列為空也拜,最長等待時間為 keepAliveTime
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// 從隊列里面拿任務(wù)以舒,如果隊列為空,則阻塞慢哈,直到隊列有新任務(wù)添加為止
workQueue.take();
// 所以線程池的核心線程為什么不會銷毀蔓钟、
// 非核心線程為什么能存活 keepAliveTime 時間,
// 超時后會被回收卵贱,是不是豁然開朗了滥沫?
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
可以看到侣集,線程池實現(xiàn)的相關(guān)特性,主要是通過 getTask() 時從任務(wù)隊列里面拿任務(wù)的等待阻塞來實現(xiàn)的
至于線程工廠兰绣,這里不再贅述世分。
下面最后再看看拒絕策略的種類
(1) AbortPolicy:直接拋異,阻止系統(tǒng)正常工作
(2) CallerRunsPolicy:只要線程池未關(guān)閉缀辩,直接在調(diào)用者線程執(zhí)行當(dāng)前任務(wù)
(3) DiscardOledesPolicy:丟棄最老的請求臭埋,嘗試重新提交任務(wù)
(4) DiscardPolicy:默默丟棄當(dāng)前提交的任務(wù)
或者可以自己實現(xiàn)RejectedExecutionHandler接口
至此,線程池的大體實現(xiàn)基本就清晰了