線程數(shù)量和隊(duì)列的關(guān)系
流程的示意圖
線程池的優(yōu)勢(shì)
小劉老實(shí)講源碼
線程池的優(yōu)勢(shì)
線程池是 Java 中對(duì)線程進(jìn)行統(tǒng)一管理,復(fù)用和監(jiān)控的組件芬为。它有幾個(gè)重要的參數(shù)疚俱,sizeCtl 它的高3位表示線程池的運(yùn)行狀態(tài)饰恕,剩余27位表示線程的運(yùn)行數(shù)量。它將執(zhí)行的線程封裝成了 Worker 對(duì)象盖奈,Worker 對(duì)象本身也是一個(gè)鎖,是獨(dú)占鎖模式狐援,主要是為了去確認(rèn)線程運(yùn)行的狀態(tài)钢坦,lock 狀態(tài)時(shí),表示 worker 在執(zhí)行啥酱,unlock 狀態(tài)時(shí) worker 沒有在執(zhí)行场钉。所有的 Worker 對(duì)象都由一個(gè) HashSet 管理。在添加一個(gè)線程時(shí)懈涛,會(huì)采用自旋和 CAS 的方式檢查線程的運(yùn)行狀態(tài)和運(yùn)行數(shù)量逛万,只有在狀態(tài)是 Runing 且線程數(shù)量符合要求時(shí),才可以將任務(wù)交給線程池。
線程池中核心線程和普通線程都是 Worker 對(duì)象宇植,區(qū)別是他們是否會(huì)執(zhí)行退出邏輯得封。當(dāng)設(shè)置超時(shí)時(shí)間,并且允許核心線程退出時(shí)指郁,每一個(gè) Worker 都會(huì)通過 BlockingQueue 的帶超時(shí)的 poll 去取任務(wù)忙上,當(dāng)沒有任務(wù)時(shí),就會(huì)執(zhí)行退出邏輯闲坎,此時(shí)就是一個(gè)普通線程疫粥。
當(dāng)不允許核心線程退出時(shí),如果當(dāng)前線程池中的線程沒有達(dá)到核心線程的上限腰懂,Worker 在取消息時(shí)就使用 take 方法梗逮,阻塞到它取到任務(wù),這樣 Worker 就不會(huì)執(zhí)行退出邏輯绣溜,它就是一個(gè)核心線程慷彤。
在 Worker 退出時(shí),如果當(dāng)前線程池是 Running 或者 ShutDown 狀態(tài)怖喻,線程池沒有工作線程了底哗,但是隊(duì)列中仍然有任務(wù),就再創(chuàng)建一個(gè)臨時(shí)的 Worker 去執(zhí)行這些任務(wù)锚沸。
線程池的狀態(tài)更新是通過自旋+ CAS 的方式更新的
線程池的原理
1.將一個(gè)任務(wù)交給線程池時(shí)跋选,線程池會(huì)經(jīng)過以下判斷
[圖片上傳失敗...(image-302947-1597385094797)]
2.創(chuàng)建新的工作線程后,工作線程會(huì)首先執(zhí)行傳遞進(jìn)來的任務(wù)哗蜈,然后再?gòu)娜蝿?wù)隊(duì)列中不斷取新的任務(wù)執(zhí)行
- shutDown 不在接收新的任務(wù)野建,但是可以繼續(xù)執(zhí)行已經(jīng)存在的任務(wù)
- shutDownNow 方法會(huì)停止所有線程,并且停止線程入隊(duì)
一恬叹、線程池的主要參數(shù)
corePoolSize:線程中的核心線程數(shù)候生,代表可以工作的線程數(shù)
maximumPoolSize:線程池中允許存在的最大線程數(shù)(coreThread + idle thread),當(dāng)?shù)却?duì)列滿后绽昼,將使用 maximumPoolSize 作為邊界條件
keeplive + unit : 允許空閑線程存活的時(shí)間
workQueue: 等待隊(duì)列唯鸭,當(dāng)核心線程滿后,新的任務(wù)加入到等待隊(duì)列
threadFactory 線程工廠硅确,用來創(chuàng)建線程
handler 拒絕策略
線程池的創(chuàng)建策略
[圖片上傳失敗...(image-7c730d-1597385094797)]
- 判斷核心線程池是否已滿目溉,如果不是,則創(chuàng)建線程執(zhí)行任務(wù)
- 如果核心線程池滿了菱农,判斷隊(duì)列是否滿了缭付,如果隊(duì)列沒滿,將任務(wù)放在隊(duì)列中
- 如果隊(duì)列滿了循未,則判斷線程池(maximumPoolSize 是否達(dá)到上限)是否已滿陷猫,如果沒滿,創(chuàng)建線程執(zhí)行任務(wù)
- 如果線程池也滿了,則按照拒絕策略對(duì)任務(wù)進(jìn)行處理
[圖片上傳失敗...(image-d33eef-1597385094797)]
隊(duì)列和最大
使用提交隊(duì)列 SynchronousQueue 時(shí)绣檬,它無法設(shè)置任務(wù)數(shù)足陨,當(dāng)任務(wù)來臨時(shí)立即提交執(zhí)行,當(dāng)任務(wù)數(shù)大于最大線程數(shù)時(shí)會(huì)拋出異常
public class ThreadPool {
private static ExecutorService pool;
public static void main( String[] args )
{
//maximumPoolSize設(shè)置為2 娇未,拒絕策略為AbortPolic策略墨缘,直接拋出異常
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
for(int i=0;i<3;i++) {
pool.execute(new ThreadTask());
}
}
}
public class ThreadTask implements Runnable{
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
output
pool-1-thread-1
pool-1-thread-2
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.hhxx.test.ThreadTask@55f96302 rejected from java.util.concurrent.ThreadPoolExecutor@3d4eac69[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
使用有界隊(duì)列 ArrayBlockingQueue 時(shí),當(dāng)有任務(wù)來臨時(shí)會(huì)創(chuàng)建線程執(zhí)行任務(wù)零抬,當(dāng)達(dá)到 coreThreadSize 上限時(shí)會(huì)將任務(wù)加入到任務(wù)隊(duì)列中镊讼。如果等待隊(duì)列也滿,會(huì)繼續(xù)創(chuàng)建閑置線程直到達(dá)到 maximumPoolSize 的上限平夜。如果達(dá)到上限后仍然有任務(wù)到來蝶棋,會(huì)執(zhí)行拒絕的策略
線程池的生命狀態(tài)
Running:運(yùn)行的狀態(tài)
ShutDown:不接受新任務(wù),可以處理已經(jīng)添加的請(qǐng)求
Stop:不接受新的任務(wù)褥芒,停止正在運(yùn)行的任務(wù)嚼松,不處理隊(duì)列中的任務(wù)
Tidying:所有任務(wù)都已經(jīng)終止
Terminated:線程池徹底終止
線程池復(fù)用線程的原理
將一個(gè) Runnable 任務(wù)對(duì)象交給線程池后嫡良,如果線程池的核心線程數(shù)還沒有滿锰扶,它會(huì)創(chuàng)建一個(gè) Worker 對(duì)象,并將這個(gè) Runnable 交給這個(gè) Worker 對(duì)象寝受。Worker 的 Run 方法會(huì)啟動(dòng)一個(gè)循環(huán)坷牛,先執(zhí)行我們提交的任務(wù),當(dāng)我們提交的任務(wù)執(zhí)行完后很澄,會(huì)繼續(xù)從隊(duì)列中獲取新的任務(wù)去執(zhí)行京闰。
線程池源碼
// ctl 表示當(dāng)前線程池的運(yùn)行狀態(tài),它的低27位表示當(dāng)前運(yùn)行的線程數(shù)量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 線程運(yùn)行的狀態(tài)
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// ctl 的狀態(tài)是否比指定的 s 狀態(tài)小
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// ctl 的狀態(tài)是否不小于指定的 s 狀態(tài)
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// 任務(wù)隊(duì)列甩苛,當(dāng)核心線程數(shù)已經(jīng)滿時(shí)蹂楣,任務(wù)會(huì)加入到 workQueue 中
private final BlockingQueue<Runnable> workQueue;
// 線程池的全局鎖,向線程池添加或者減少線程讯蒲,修改線程池狀態(tài)都需要 lock痊土,保證在同一時(shí)刻是能有一個(gè)線程向線程池加入線程
private final ReentrantLock mainLock = new ReentrantLock();
// 存放所有線程的地方
private final HashSet<Worker> workers = new HashSet<>();
// termination.await 阻塞線程 termination.signalAll 喚醒所有阻塞線程
private final Condition termination = mainLock.newCondition();
// 記錄線程池生命周期內(nèi)運(yùn)行的線程的最大值
private int largestPoolSize;
// 完成過的所有任務(wù),當(dāng)線程被回收時(shí)計(jì)數(shù)
private long completedTaskCount;
// 線程空閑的時(shí)間墨林,如果超出空閑時(shí)間 allowCoreThreadTimeOut = false赁酝,核心線程處于閑置狀態(tài),也不會(huì)被回收
// allowCoreThreadTimeOut = true旭等,核心線程處于閑置狀態(tài)酌呆,會(huì)被回收
// 非核心線程超時(shí)后都會(huì)被回收
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
// 核心線程數(shù)量
private volatile int corePoolSize;
// 線程池中最大的線程數(shù)量,超過后加入的任務(wù)搔耕,會(huì)被執(zhí)行拒絕策略
private volatile int maximumPoolSize;
// 采用了 AQS 的獨(dú)占模式
// 獨(dú)占模式的兩個(gè)重要屬性 state 和 ExclusiveOwnerThread
// state 0 表示可以占用 >0 表示已經(jīng)被占用 <0 表示初始化中
// ExclusiveOwnerThread 表示獨(dú)占鎖的線程
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// Worker 中封裝的工作線程
final Thread thread;
// 工作線程要執(zhí)行的第一個(gè)任務(wù)
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
// 設(shè)置初始化狀態(tài)
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 將 Worker 交給 Thread隙袁,當(dāng) Thread start 時(shí),會(huì)執(zhí)行 Worker 的 run 方法
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
// 是否被獨(dú)占 0 沒有 1 被獨(dú)占
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
// 申請(qǐng)占用鎖 設(shè)置 state 為 1
if (compareAndSetState(0, 1)) {
// 設(shè)置占有線程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 外部不會(huì)直接調(diào)用,當(dāng)外部調(diào)用 unlock 時(shí)藤乙,tryRelease 被調(diào)用
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
execute方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// ctl 高3位線程運(yùn)行狀態(tài) 低 27 位表示線程數(shù)量
int c = ctl.get();
// 工作的線程數(shù)量是否小于核心線程數(shù)
if (workerCountOf(c) < corePoolSize) {
// 添加一個(gè)核心線程猜揪,并啟動(dòng)線程
if (addWorker(command, true))
return;
// 執(zhí)行到這里表示發(fā)生了并發(fā)調(diào)用,導(dǎo)致 addWorker 失敗
// 當(dāng)前線程狀態(tài)發(fā)生了改變
c = ctl.get();
}
// 核心線程數(shù)已經(jīng)滿了
// addWorker 失敗了
// 如果線程是 running 狀態(tài)坛梁,將任務(wù)加入到隊(duì)列中
if (isRunning(c) && workQueue.offer(command)) {
// 再次獲取 ctl
int recheck = ctl.get();
// 如果是非 running 狀態(tài)而姐,需要把剛提交的任務(wù)移除掉
if (! isRunning(recheck) && remove(command))
// 拒絕這個(gè)任務(wù)
reject(command);
// 如果線程池是運(yùn)行狀態(tài),為了保證當(dāng)前有線程工作划咐,加入了一層檢查
// 如果工作線程數(shù)是 0 拴念,啟動(dòng)一個(gè)線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果線程入隊(duì)失敗,嘗試 add 一個(gè)非工作線程褐缠,如果達(dá)到最大線程數(shù)政鼠,則會(huì)失敗
// 如果線程處于非運(yùn)行狀態(tài)也會(huì)失敗
else if (!addWorker(command, false))
reject(command);
}
addWorker 操作
// firstTask 代表要執(zhí)行的任務(wù),可以為 null队魏,如果為空 Worker 會(huì)自動(dòng)從消息隊(duì)列中取消息
// core 代表啟動(dòng)的是否為核心線程公般,true 使用核心線程數(shù)量限制 false 使用最大線程數(shù)限制
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 使用自旋
for (;;) {
// 獲取線程池的狀態(tài)
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果當(dāng)前線程池不處于運(yùn)行狀態(tài),任務(wù)隊(duì)列沒有任務(wù)可以執(zhí)行胡桨,addWorker 失敗官帘,執(zhí)行拒絕策略
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 內(nèi)部自旋
for (;;) {
// 獲取當(dāng)前線程運(yùn)行的數(shù)量
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
// 根據(jù) core 來判斷是選用核心線程數(shù)還是最大線程數(shù)限制
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 通過 CAS 的方式更新執(zhí)行的線程數(shù)
// CAS 失敗,可能是其他線程修改了這個(gè)值
if (compareAndIncrementWorkerCount(c))
// 如果成功昧谊,直接跳出外部循環(huán) 因?yàn)?retry 標(biāo)記的是外部循環(huán)
break retry;
c = ctl.get(); // Re-read ctl
// 判斷線程池狀態(tài)是否發(fā)生了變化刽虹,如果發(fā)生了變化,繼續(xù)執(zhí)行外部循環(huán)
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 創(chuàng)建 Worker 的過程
// 創(chuàng)建 worker 是否成功
boolean workerStarted = false;
// 添加 Worker 到 HashSet 是否成功
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
// 防止 ThreadFactory 類有 bug呢诬,因?yàn)?ThreadFactory 是可以通過外部傳入的
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 獲取線程狀態(tài)
int rs = runStateOf(ctl.get());
// 如果當(dāng)前線程是運(yùn)行狀態(tài) 可以將 Worker 加入到 HashSet 隊(duì)列中
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 當(dāng)線程已經(jīng)被啟動(dòng)過了涌哲,拋出異常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// add 成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果 add 成功 啟動(dòng)這個(gè)線程
if (workerAdded) {
t.start();
// 執(zhí)行線程成功
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker 的 runWorker 方法
final void runWorker(Worker w) {
// 獲取 Worker 內(nèi)部的 thread 的對(duì)象
Thread wt = Thread.currentThread();
// 獲取我們傳入的任務(wù)
Runnable task = w.firstTask;
w.firstTask = null;
// 啟動(dòng) worker 之前,調(diào)用 unlock 將鎖的狀態(tài)和獨(dú)占線程 release 掉尚镰,進(jìn)行初始化
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// Worker 有外部傳入的任務(wù)阀圾,或者工作隊(duì)列中有任務(wù)可以執(zhí)行
while (task != null || (task = getTask()) != null) {
// 給 worker 加獨(dú)占鎖,主要是為了確認(rèn) Worker 是否有任務(wù)在執(zhí)行
w.lock();
// 如果當(dāng)前線程狀態(tài)大于 stop 給線程一個(gè)中斷信號(hào)
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
// 任務(wù)執(zhí)行完狗唉,釋放鎖
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
** getTask 獲取待執(zhí)行的任務(wù)**
getTask 使用一個(gè)自旋的操作
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果線程池不處于運(yùn)行狀態(tài)初烘,或者隊(duì)列中沒有任務(wù)
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
使用 CAS 的方式將計(jì)數(shù)減 1
decrementWorkerCount();
return null;
}
// 當(dāng)前運(yùn)行線程的數(shù)量
int wc = workerCountOf(c);
// Are workers subject to culling?
// 是否允許核心線程被回收
// true 表示核心線程也會(huì)被回收
// false 表示核心不會(huì)被回收
// 1. 我們?cè)O(shè)置了允許核心線程被回收
// 2. 當(dāng)前核心線程數(shù)量已經(jīng)超過了上限
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// 核心線程是否回收取決于執(zhí)行 poll 超時(shí)方法,還是 take 阻塞方法敞曹。
// 如果使用 poll 在超時(shí)時(shí)間內(nèi)沒有獲取到任務(wù)账月,Worker 會(huì)執(zhí)行退出方法
// 如果使用 take 會(huì)一直阻塞在這里,直到獲取到任務(wù)澳迫,所以 Worker 不會(huì)退出局齿,這時(shí) Worker 就是一個(gè)核心線程
try {
// 如果執(zhí)行 poll 操作,會(huì)有可能到時(shí) worker 取不到任務(wù)橄登,執(zhí)行退出邏輯
// take 會(huì)一直阻塞在這里抓歼,直到取出任務(wù)
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
processWorkerExit 執(zhí)行 Worker 的退出
processWorkerExit 分為正常突出和異常退出
completedAbruptly true 表示異常退出 false 表示正常退出
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果是異常退出的讥此,讓工作線程計(jì)數(shù)減 1
if (completedAbruptly)
decrementWorkerCount();
// 需要操作線程池,所以要加鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 從隊(duì)列中移除
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
// 嘗試終止線程
tryTerminate();
// 獲取當(dāng)前線程的狀態(tài)
// 如果是 Running 或者 ShutDown 狀態(tài)
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
// 如果不是異常退出的
if (!completedAbruptly) {
// 隊(duì)列中還有任務(wù)要執(zhí)行谣妻,就在創(chuàng)建一個(gè) Worker 去執(zhí)行任務(wù)
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 如果仍然有工作線程萄喳,不需要補(bǔ)充線程了
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 補(bǔ)充一個(gè)線程
addWorker(null, false);
}
}
shuDown 函數(shù)
根據(jù)鎖狀態(tài)判斷 Worker 是否閑置,閑置的線程直接發(fā)送中斷信號(hào)
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 使用自旋和 CAS 的方式更新線程池狀態(tài)
advanceRunState(SHUTDOWN);
// 中斷所有空閑線程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 迭代所有 Worker
for (Worker w : workers) {
Thread t = w.thread;
// !t.isInterrupted() 表示當(dāng)前線程處于非中斷狀態(tài)
// tryLock 成功代表 Worker 處于空閑狀態(tài)
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}