1. 前言
線程池是JAVA開(kāi)發(fā)中最常使用的池化技術(shù)之一,可以減少線程資源的重復(fù)創(chuàng)建與銷毀造成的開(kāi)銷。
2. 靈魂拷問(wèn):怎么做到線程重復(fù)利用?
很多同學(xué)會(huì)聯(lián)想到連接池胸遇,理所當(dāng)然的說(shuō):需要的時(shí)候從池中取出線程肮帐,執(zhí)行完任務(wù)再放回去。
如何用代碼實(shí)現(xiàn)呢?
此時(shí)就會(huì)發(fā)現(xiàn)番捂,調(diào)用線程的start方法后,生命周期就不由父線程直接控制了。線程的run方法執(zhí)行完成就銷毀了拴曲,所謂的“取出”和“放回”只不過(guò)是想當(dāng)然的操作。
這里先說(shuō)答案:生產(chǎn)者消費(fèi)者模型
3. ThreadPoolExecutor的實(shí)現(xiàn)
3.1 結(jié)構(gòu)
首先看下ThreadPoolExecutor的繼承結(jié)構(gòu)
頂級(jí)接口是Executor凛忿,定義execute方法
ExecutorService添加了submit方法澈灼,支持返回future獲取執(zhí)行結(jié)果,以及線程池運(yùn)行狀態(tài)的相關(guān)方法
本文著重講線程池的執(zhí)行流程店溢,因此將暫時(shí)忽略線程池的狀態(tài)相關(guān)的代碼叁熔,也建議新手看源碼時(shí)從核心流程看起。
3.2 核心方法:execute()
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 判斷是否小于核心線程數(shù)
if (workerCountOf(c) < corePoolSize) {
//添加worker床牧,添加成功則退出
if (addWorker(command, true))
return;
c = ctl.get();
}
// 核心線程數(shù)已用完則放入隊(duì)列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 雙重檢查荣回,避免入隊(duì)完成后,所有線程已銷毀戈咳,導(dǎo)致沒(méi)有消費(fèi)者消費(fèi)當(dāng)前任務(wù)
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 隊(duì)列已滿則開(kāi)啟非核心線程心软,達(dá)到最大線程數(shù)則使用拒絕策略
else if (!addWorker(command, false))
reject(command);
}
execute方法就是一個(gè)生產(chǎn)的過(guò)程壕吹,主要分為開(kāi)啟線程和入隊(duì)
開(kāi)啟線程會(huì)傳入command(即當(dāng)前任務(wù)),開(kāi)啟的線程會(huì)立即消費(fèi)該任務(wù)
入隊(duì)的任務(wù)則會(huì)由Worker消費(fèi)
主要關(guān)注corePoolSize删铃,maximumPoolSize耳贬,queueSize(任務(wù)隊(duì)列長(zhǎng)度),workerCount(當(dāng)前worker數(shù)量)這幾個(gè)參數(shù)猎唁,可以總結(jié)為以下:
已滿 | 未滿 | 操作 |
---|---|---|
corePoolSize | 開(kāi)啟核心線程 | |
corePoolSize | queueSize | 入隊(duì) |
queueSize | maximumPoolSize | 開(kāi)啟非核心線程 |
maximumPoolSize | 拒絕 |
3.3 消費(fèi)者:Worker
Worker類實(shí)現(xiàn)Runnable接口咒劲,繼承AQS,主要先關(guān)注thread和firstTask兩個(gè)屬性和run方法
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
從Worker的構(gòu)造方法可以看出诫隅,thread就是線程池中真正消費(fèi)任務(wù)的線程腐魂,創(chuàng)建時(shí)會(huì)傳入this(即Worker對(duì)象),而Worker實(shí)現(xiàn)了Runnable逐纬,因此線程運(yùn)行時(shí)就是執(zhí)行了Worker的run方法蛔屹。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
// getTask會(huì)阻塞,因此不會(huì)造成cpu飆高
while (task != null || (task = getTask()) != null) {
// ···
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執(zhí)行傳入的Runnable
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
// 修改為null豁生,否則下次循環(huán)不會(huì)調(diào)用getTask
task = null;
// ···
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
再來(lái)看run方法判导,直接調(diào)用了ThreadPoolExecutor的runWorker方法,runWorker中有一個(gè)while循環(huán)沛硅,循環(huán)體執(zhí)行了task.run方法
task首先會(huì)獲取Worker中的firstTask屬性眼刃,并將其置為null,因此firstTask只會(huì)執(zhí)行一次摇肌,后續(xù)task將通過(guò)getTask方法獲取擂红。
因此Worker的工作流程可以概括為:消費(fèi)完Worker的firstTask后,循環(huán)執(zhí)行g(shù)etTask獲取任務(wù)并消費(fèi)围小,獲取不到task時(shí)昵骤,就退出循環(huán),線程銷毀肯适。
此處便可以看出生產(chǎn)者消費(fèi)者模型了变秦。
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
// ···
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 嘗試減少計(jì)數(shù),失敗則會(huì)continue循環(huán)重試
if (compareAndDecrementWorkerCount(c))
// 此處返回null框舔,runWorker將退出循環(huán)
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
runWorker方法退出循環(huán)的條件是getTask返回null蹦玫。
觀察getTask,只有同時(shí)滿足以下情況時(shí)才會(huì)返回null
條件 | 解讀 | |
---|---|---|
1 | wc > maximumPoolSize || (timed && timedOut) | workQueue.poll方法超時(shí) |
2 | wc > 1 || workQueue.isEmpty() | 隊(duì)列任務(wù)全部執(zhí)行完 |
3 | compareAndDecrementWorkerCount(c) | cas減少workerCount成功 |
返回的task是通過(guò)workQueue.poll和workQueue.take得到的
兩者執(zhí)行時(shí)線程均會(huì)掛起刘绣,直至workQueue中有新的任務(wù)
不同之處在于poll方法阻塞keepAliveTime時(shí)間后會(huì)自動(dòng)喚醒并返回null樱溉,此時(shí)timeOut置為true,即滿足條件1纬凤,隨后繼續(xù)循環(huán)福贞,重復(fù)檢查是否大于核心線程數(shù)且隊(duì)列為空,是則嘗試減少workerCount并退出循環(huán)
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// ···
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
// ···
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// ···
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// ···
}
return workerStarted;
}
了解了Worker之后停士,再來(lái)看execute中調(diào)用的addWorker方法
方法有兩個(gè)參數(shù)挖帘,firstTask即為Worker的firstTask完丽,core則標(biāo)記需要新增的是否是核心線程
retry循環(huán)與線程池狀態(tài)相關(guān),內(nèi)層for循環(huán)則是重復(fù)嘗試cas增加線程拇舀,若大于corePoolSize或者maximumPoolSize則新增失敗逻族,cas成功后,new一個(gè)Worker并start
3.4 總結(jié)
回到最初的問(wèn)題你稚,線程是如何做到重復(fù)利用的?
并不存在取出線程使用完再歸還的操作朱躺,線程啟動(dòng)后進(jìn)入循環(huán)刁赖,主動(dòng)獲取任務(wù)執(zhí)行,退出循環(huán)則線程銷毀长搀。
execute方法控制新增Worker和任務(wù)入隊(duì)
附:手寫(xiě)簡(jiǎn)易線程池
public class MyThreadPool implements Executor {
private int corePoolSize;
private int maximumPoolSize;
private BlockingQueue<Runnable> queue;
//記錄當(dāng)前工作線程數(shù)
private AtomicInteger count;
private long keepAliveTime;
private RejectHandler rejectHandler;
public MyThreadPool(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> queue, long keepAliveTime, RejectHandler rejectHandler) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.queue = queue;
this.keepAliveTime = keepAliveTime;
this.rejectHandler = rejectHandler;
count = new AtomicInteger(0);
}
@Override
public void execute(Runnable task) {
int ct = count.get();
//核心線程數(shù)未滿宇弛,嘗試增加核心線程
if (ct < corePoolSize && count.compareAndSet(ct, ct + 1)) {
new Worker(task).start();
return;
}
//入隊(duì)
if (queue.offer(task)) {
return;
}
//重新獲取一遍count,否則如果在core分支cas失敗源请,此處必然也失敗
ct = count.get();
//隊(duì)列已滿枪芒,嘗試增加非核心線程
if (ct < maximumPoolSize && count.compareAndSet(ct, ct + 1)) {
new Worker(task).start();
return;
}
//已達(dá)最大線程數(shù),拒絕
rejectHandler.reject(task);
}
class Worker extends Thread {
Runnable firstTask;
public Worker(Runnable firstTask) {
this.firstTask = firstTask;
}
@Override
public void run() {
Runnable task = firstTask;
firstTask = null;
while (true) {
try {
//getTask會(huì)阻塞
if (task != null || (task = getTask()) != null) {
task.run();
} else {
//getTask超時(shí)才會(huì)進(jìn)入谁尸,直接退出舅踪,線程銷毀
break;
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//置空,否則不能getTask
task = null;
}
}
}
}
Runnable getTask() throws InterruptedException {
//標(biāo)記是否超時(shí)過(guò)
boolean timedOut = false;
while (true) {
int ct = count.get();
//超出核心線程數(shù)才進(jìn)入超時(shí)邏輯良蛮,即使timeOut由于線程poll超時(shí)過(guò)一次變成true抽碌,執(zhí)行到這里如果不超出corePoolSize,可以再次進(jìn)入take分支
if (ct > corePoolSize) {
//超出核心線程數(shù)
if (timedOut) {
//已超時(shí)過(guò)决瞳,嘗試減少工作線程數(shù)货徙,失敗會(huì)continue,然后重新比較corePoolSize皮胡,重試減少線程數(shù)
if (count.compareAndSet(ct, ct - 1)) {
return null;
} else {
continue;
}
}
Runnable task = queue.poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (task == null) {
//poll超時(shí)才進(jìn)入
timedOut = true;
continue;
}
return task;
} else {
//必然能獲取到task
return queue.take();
}
}
}
public static interface RejectHandler {
void reject(Runnable r);
}
public static void main(String[] args) {
MyThreadPool pool = new MyThreadPool(2, 5, new LinkedBlockingQueue<>(100), 2000, r -> {
System.out.println(r + ": reject");
});
for (int i = 0; i < 3; i++) {
final int x = i;
new Thread(() -> {
for (int j = 0; j < 5; j++) {
final int y = j;
pool.execute(() -> {
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
LocalDateTime now = LocalDateTime.now();
System.out.println(String.format("線程i=%s, j=%s,執(zhí)行結(jié)束: %s", x, y, now.format(DateTimeFormatter.ISO_DATE_TIME)));
});
}
}).start();
}
}
}