在學(xué)習(xí)了CAS、原子類旱物、AQS遥缕、各種鎖、并發(fā)隊(duì)列之后我們開始學(xué)習(xí)線程池宵呛,既是對(duì)前面的鞏固也是對(duì)技術(shù)更進(jìn)一步探索单匣,可以說(shuō)大部分的框架都離不開線程池,
所以理解它對(duì)我們后期的學(xué)習(xí)有非常大的幫助
介紹
線程池主要解決兩個(gè)問題:
- 當(dāng)執(zhí)行大量并發(fā)任務(wù)時(shí)宝穗,線程池能提供較好的性能户秤,不需要每次使用new來(lái)創(chuàng)建線程對(duì)象,減少開銷
- 線程池實(shí)現(xiàn)了對(duì)線程的管理和資源限制逮矛,以及一些統(tǒng)計(jì)數(shù)據(jù)
在concurrent包中我們可以使用Executors工具類創(chuàng)建線程池鸡号,工具包中有不同的實(shí)現(xiàn),根據(jù)需要须鼎,返回不同的線程池實(shí)例鲸伴。
ThreadPoolExecutor是其中最基礎(chǔ)也是用的最多的一種,所以我們從它開始入手
類圖構(gòu)成
從類圖中可以看到晋控,它有一個(gè)原子變量ctl汞窗,它是用來(lái)記錄線程池狀態(tài)和線程個(gè)數(shù)的,有點(diǎn)類似讀寫鎖中的state
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- private static final int COUNT_BITS = Integer.SIZE - 3; // 計(jì)數(shù)位為int類型位數(shù)-3(下面我們假設(shè)是32位)赡译,也就是低29位
- private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 容量是29個(gè)1
- private static final int RUNNING = -1 << COUNT_BITS; // 高三位11100000000000000000000000000000
- private static final int SHUTDOWN = 0 << COUNT_BITS; // 000
- private static final int STOP = 1 << COUNT_BITS; // 001
- private static final int TIDYING = 2 << COUNT_BITS; // 010
- private static final int TERMINATED = 3 << COUNT_BITS; // 011
private static int runStateOf(int c) { return c & ~CAPACITY; } // 獲取高三位
private static int workerCountOf(int c) { return c & CAPACITY; } // 獲取低29位
private static int ctlOf(int rs, int wc) { return rs | wc; } // 獲取ctl值
如果看過讀寫鎖部分仲吏,就應(yīng)該覺得很熟悉的操作了,都是通過位運(yùn)算來(lái)獲取狀態(tài)捶朵,其高三位代表狀態(tài):
- RUNNING:處理阻塞隊(duì)列里的任務(wù)蜘矢,并且接受新任務(wù)
- SHUTDOWN:處理阻塞隊(duì)列里的任務(wù),但不接受新任務(wù) 調(diào)用shutdown()方法時(shí)
- STOP:中斷任務(wù)综看,且不處理阻塞隊(duì)列里的任務(wù)品腹,并且不接受新任務(wù) 顯式調(diào)用shutdownNow()
- TIDYING:表示在所有任務(wù)執(zhí)行完后(包括阻塞隊(duì)列)執(zhí)行terminated方法,當(dāng)線程池為空
- TERMINATED 終止?fàn)顟B(tài)3
其他核心參數(shù):
- BlockingQueue<Runnable> workQueue; 用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列,也就是我們之前講到過的那些并發(fā)隊(duì)列
- ReentrantLock mainLock 獨(dú)占鎖红碑,用來(lái)控制worder的原子性舞吭,比如新增worker
- HashSet<Worker> workers 線程池執(zhí)行任務(wù)的對(duì)象,也就是真正工作的線程
- int corePoolSize; 核心線程個(gè)數(shù)
- int maximumPoolSize; 線程池最大線程數(shù)量
- RejectedExecutionHandler handler 飽和策略析珊,也就是隊(duì)列滿了羡鸥,并且線程個(gè)數(shù)達(dá)到最大值采取的處理方式,如拋出異常忠寻、丟棄但不拋出異常
- keepAliveTime 當(dāng)線程數(shù)量超過了corePoolSize指定的線程數(shù)惧浴,并且空閑線程空閑的時(shí)間達(dá)到當(dāng)前參數(shù)指定的時(shí)間時(shí)該線程就會(huì)被銷毀,如果調(diào)用過allowCoreThreadTimeOut(boolean value)方法允許核心線程過期奕剃,那么該策略針對(duì)核心線程也是生效的
- threadFactory: 創(chuàng)建線程的工廠衷旅,如果未指定則使用默認(rèn)的線程工廠
方法源碼分析
execute
線程池使用的核心方法是execute捐腿,該方法的作用是提交任務(wù)到線程池中執(zhí)行。
- execute
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //三種情況柿顶,第一種是當(dāng)前線程個(gè)數(shù)小于corePollSize茄袖,開啟新的新的線程 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true))// 有可能線程數(shù)達(dá)到核心線程限制,則失敗 return; c = ctl.get(); } // 第二種情況嘁锯,如果狀態(tài)是Running宪祥,則添加任務(wù)到阻塞隊(duì)列。 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 二次檢查家乘,如果不是Running狀態(tài)蝗羊,則執(zhí)行刪除并拒絕 if (! isRunning(recheck) && remove(command)) reject(command); // 否則如果當(dāng)前線程為空,則添加一個(gè)線程 else if (workerCountOf(recheck) == 0) addWorker(null, false); // 如果是 } // 第三種情況烤低,如果隊(duì)列嗎肘交,則新增線程,新增失敗再執(zhí)行拒絕策略 else if (!addWorker(command, false)) reject(command); }
總結(jié)上面的調(diào)度策略:
- 如果線程池中的線程數(shù)小于corePoolSize扑馁,那么每來(lái)一個(gè)任務(wù)都會(huì)創(chuàng)建一個(gè)新的線程
- 若當(dāng)前執(zhí)行的任務(wù)達(dá)到了corePoolSize指定的線程數(shù)時(shí)涯呻,也即所有的核心線程都在執(zhí)行任務(wù)時(shí),此時(shí)來(lái)的新任務(wù)會(huì)保存在workQueue指定的任務(wù)隊(duì)列中腻要;也就是第二種情況
- 當(dāng)所有的核心線程都在執(zhí)行任務(wù)复罐,并且任務(wù)隊(duì)列中存滿了任務(wù),此時(shí)若新來(lái)了任務(wù)雄家,那么線程池將會(huì)創(chuàng)建新線程執(zhí)行任務(wù)效诅;也就是第三種情況
- 如果隊(duì)列滿了,并且線程數(shù)達(dá)到了maximumPoolSize趟济,則reject 乱投,也就是第三種情況但新增失敗。(所以任務(wù)的最大數(shù)是queueSize+maximumPoolSize)
由上面代碼可知顷编,主要邏輯集中在addworker上
- addWorker(Runnable firstTask, boolean core)
addWorker方法中由于沒有做同步双肤,因此有很多判斷線程狀態(tài)的邏輯,我們注意到一般判斷都是以shutdown為分界線钮惠,shutdown以上自不必說(shuō)茅糜,肯定不能再創(chuàng)建線程了,只要注意如果線程池為shutdown狀態(tài)時(shí)如何處理就行素挽,只有當(dāng)入?yún)irstTask為null蔑赘,且隊(duì)列不為空時(shí)才會(huì)增加工作線程,其他不增加。 那么重點(diǎn)就在于什么時(shí)候會(huì)入?yún)閚ull呢米死?我們先往下看锌历,再回來(lái)解答這個(gè)問題贮庞。private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) // 走到這就代表可以安全的創(chuàng)建線程了 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 主要分兩部分戚炫,前半部分使用cas來(lái)更新工作線程數(shù)量,后半部分使用加鎖來(lái)創(chuàng)建線程媳纬。 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); // work線程 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); // 增加工作線程 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; // 更新已使用的最大線程數(shù) workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); // 啟動(dòng)線程 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
Worker執(zhí)行
要了解Worker這個(gè)類峦筒,我就可以從它的官方注釋說(shuō)起
- 為什么要繼承AQS,也就是為什么要鎖窗慎?
This protects against interrupts that are intended to wake up a worker thread waiting for a task from instead interrupting a task being run.
翻譯過來(lái)就是物喷,為了中斷正在等待任務(wù)的線程,而不是中斷正在運(yùn)行的線程遮斥。所以峦失,實(shí)現(xiàn)鎖,主要是為了運(yùn)行中的線程不被中斷(加鎖就不會(huì))术吗。 - 為什么不使用ReentrantLock而是自己實(shí)現(xiàn)呢尉辑?
because we do not want worker tasks to be able to reacquire the lock when they invoke pool control methods like setCorePoolSize.
翻譯過來(lái)就是,不想讓在調(diào)用線程池方法時(shí)较屿,重入獲取鎖隧魄。(會(huì)調(diào)用interruptIdleWorkers,tryLock會(huì)重入隘蝎,來(lái)中斷線程)
- 構(gòu)造函數(shù)
Worker(Runnable firstTask) { setState(-1); // 防止執(zhí)行runWorker之前的中斷 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
- run 這個(gè)是現(xiàn)成的方法购啄,實(shí)際里面執(zhí)行的是runWorker(this)
public void run() { runWorker(this); }
結(jié)合上文中說(shuō)到再addWorker成功后會(huì)啟動(dòng)線程,也就是執(zhí)行這里的runWorker(this)方法嘱么。
- runWorker
函數(shù)的作用就是不斷的循環(huán)几迄,去getTask取任務(wù),如果有任務(wù)則執(zhí)行任務(wù)冰评,在沒有任務(wù)后映胁,線程會(huì)執(zhí)行清理并退出函數(shù)final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts state設(shè)置為0可以相應(yīng)中斷了狮含。 boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { // 到這說(shuō)明還有任務(wù),開始執(zhí)行任務(wù) w.lock(); // 防止被shutdown if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); // 執(zhí)行前的鉤子函數(shù)曼振,默認(rèn)是什么都不做 Throwable thrown = null; try { task.run(); // 實(shí)際執(zhí)行任務(wù) } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; // 統(tǒng)計(jì)完成了多少個(gè)任務(wù) w.unlock(); } } completedAbruptly = false; // 到這說(shuō)明沒有異常 } finally { processWorkerExit(w, completedAbruptly);// 清理函數(shù) } }
- getTasK 獲取任務(wù)
上面這個(gè)函數(shù)主要關(guān)注超時(shí)部分,兩種情況盅惜,一種是核心線程需要超時(shí)中剩,另一種就是線程數(shù)超過核心線程忌穿。private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 到這說(shuō)明沒有任務(wù)了,則將工作線程worker減1 decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 如果設(shè)置了超時(shí)時(shí)間集索,或者線程數(shù)超出核心線程數(shù)則為true屿愚。都使用超時(shí)時(shí)間 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 到這說(shuō)明,線程數(shù)量已經(jīng)足夠多务荆,或者已經(jīng)超時(shí)妆距,則返回null,并減少1 if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // 是否使用超時(shí)函匕,采用的獲取方式不一樣娱据。 if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
- processWorkerExit
注意之前的問題,什么時(shí)候addWorker參數(shù)為null時(shí)焦蘑,shutdown狀態(tài)也是可以添加的盯拱,也就是要去完成隊(duì)列里的任務(wù)。private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted,表示意外結(jié)束,則表示在run()發(fā)生了異常结啼,則需要數(shù)量-1 掠剑,如果不是,則不需要郊愧,因?yàn)樵趃etTask()中-1了 decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock();// 釋放線程朴译,需要加鎖。 try { completedTaskCount += w.completedTasks; workers.remove(w); // 從workers中移除 } finally { mainLock.unlock(); } tryTerminate();// 嘗試終止線程池属铁,因?yàn)橛锌赡墁F(xiàn)在已經(jīng)沒有任務(wù)了 int c = ctl.get(); if (runStateLessThan(c, STOP)) { // running狀態(tài)或shutdown狀態(tài)眠寿,則還要增加一個(gè)線程 if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // 如果大于核心線程數(shù)則退出 } // 也就是說(shuō)小于核心線程數(shù)才會(huì)增加 addWorker(null, false); } }
關(guān)閉線程池
-
shutdown
調(diào)用shutdown方法后例嘱,線程池不再接受任務(wù)狡逢,但工作隊(duì)列中的任務(wù)還是會(huì)被執(zhí)行public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess();// 權(quán)限檢查 advanceRunState(SHUTDOWN); // 設(shè)置狀態(tài) interruptIdleWorkers(); // 中斷等待任務(wù)的線程(1) onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); // 可重入鎖 try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); // 中斷線程 } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
1)這里可能會(huì)疑惑,怎么中斷的蝶防?回到runWorker()函數(shù)中甚侣,我們發(fā)現(xiàn)能執(zhí)行到這里說(shuō)明拿到鎖了,那么中斷可能在
task != null || (task = getTask()) != nul
這條語(yǔ)句中间学。其實(shí)就是在getTask()中往getTask()函數(shù)下面看殷费,找到,worQueue.poll或者workQueue.take();這兩個(gè)函數(shù)都是獲取任務(wù)隊(duì)列中的下個(gè)任務(wù)低葫,而且都是加鎖且響應(yīng)中斷详羡。所以我們中斷的作用是是在這。
當(dāng)然肯定還會(huì)問嘿悬,只是結(jié)束這一次的循環(huán)獲取任務(wù)而已实柠,并不會(huì)中斷啊。確實(shí)沒錯(cuò)善涨,它僅僅是結(jié)束這一次的獲取窒盐。但是,如果這是隊(duì)列處在SHUTDOWN并且隊(duì)列為空時(shí)钢拧,那么就會(huì)退出蟹漓,并且移除線程,從而達(dá)到中斷清理線程的目的源内。那如果隊(duì)列中還存在呢葡粒,這又回到我們之前說(shuō)的,shutdown狀態(tài),僅僅是不再接受任務(wù)嗽交,但還是會(huì)處理隊(duì)列中的任務(wù)卿嘲。所以還是會(huì)繼續(xù)獲取任務(wù),只有等任務(wù)處理完畢夫壁,才開始一個(gè)一個(gè)退出2)在shutdown執(zhí)行完拾枣,仍有任務(wù),隊(duì)列不會(huì)退出掌唾,那最后是如何清理的呢放前。
答案是之前講過的processWorkerExit(w, completedAbruptly); 在沒有任務(wù)之后,線程會(huì)自動(dòng)退出從workers中移除糯彬,然后執(zhí)行tryTerminate()
-
tryTerminate()
final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); //中斷workers中的一個(gè) return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 到這說(shuō)明已經(jīng)沒有線程了 workerCountOf(c) == 0,則設(shè)置狀態(tài)為TERMINATED if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); // 喚醒所有等待terminate的線程 } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
-
shutdownNow
shutdownNow和shutdown函數(shù)不一樣葱她,它會(huì)結(jié)束所有的線程撩扒,public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } private void interruptWorkers() { // 不是interruptIdleWorkers final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); // 無(wú)需獲取鎖,也就是執(zhí)行中的任務(wù)也會(huì)被中斷吨些。 } finally { mainLock.unlock(); } }
總結(jié)
創(chuàng)建線程池
使用Executors工具類創(chuàng)建搓谆,根據(jù)最大線程數(shù)、核心線程數(shù)豪墅、超時(shí)空閑時(shí)間來(lái)創(chuàng)建泉手。-
添加任務(wù)
- 如果線程數(shù)小于核心線程數(shù),則直接添加線程
- 如果線程數(shù)達(dá)到核心線程數(shù)偶器,則將任務(wù)添加到任務(wù)阻塞隊(duì)列
- 如果線程數(shù)達(dá)到核心線程數(shù)斩萌,且任務(wù)隊(duì)列滿了,則創(chuàng)建新線程
- 如果線程數(shù)達(dá)到最大線程數(shù)屏轰,且任務(wù)隊(duì)列滿了颊郎,則拒絕任務(wù)
-
執(zhí)行任務(wù)
- 在創(chuàng)建線程時(shí)獲取任務(wù)或者循環(huán)從任務(wù)隊(duì)列中獲取任務(wù),并執(zhí)行
- 在線程數(shù)小于核心線程數(shù)時(shí)霎苗,會(huì)一直阻塞獲取任務(wù)
-
回收線程
- 如果一直沒有獲取到線程且超過核心線程則退出
- 調(diào)用shutdown后姆吭,會(huì)中斷等待任務(wù)的線程,如果沒有任務(wù)唁盏,則全部退出内狸。如果有任務(wù),則核心線程數(shù)內(nèi)的線程會(huì)繼續(xù)去等待任務(wù)厘擂,所有任務(wù)完成后昆淡,退出