簡(jiǎn)述
??為了徹底了解線(xiàn)程池的時(shí)候泳炉,我們需要弄清楚線(xiàn)程池創(chuàng)建的幾個(gè)參數(shù)
- corepollsize : 核心池的大小,默認(rèn)情況下花鹅,在創(chuàng)建線(xiàn)程池后,每當(dāng)有新的任務(wù)來(lái)的時(shí)候翠胰,如果此時(shí)線(xiàn)程池中的線(xiàn)程數(shù)小于核心線(xiàn)程數(shù),就會(huì)去創(chuàng)建一個(gè)線(xiàn)程執(zhí)行(就算有空線(xiàn)程也不復(fù)用)之景,當(dāng)創(chuàng)建的線(xiàn)程數(shù)達(dá)到核心線(xiàn)程數(shù)之后,再有任務(wù)進(jìn)來(lái)就會(huì)放入任務(wù)緩存隊(duì)列中锻狗。當(dāng)任務(wù)緩存隊(duì)列也滿(mǎn)了的時(shí)候,就會(huì)繼續(xù)創(chuàng)建線(xiàn)程轻纪,知道達(dá)到最大線(xiàn)程數(shù)。如果達(dá)到最大線(xiàn)程數(shù)之后再有任務(wù)過(guò)來(lái)潦嘶,那么就會(huì)采取拒絕服務(wù)策略。
- Maximumpoolsize : 線(xiàn)程池中最多可以創(chuàng)建的線(xiàn)程數(shù)
- keeplivetime : 線(xiàn)程空閑狀態(tài)時(shí)掂僵,最多保持多久的時(shí)間會(huì)終止。默認(rèn)情況下顷歌,當(dāng)線(xiàn)程池中的線(xiàn)程數(shù)大于corepollsize 時(shí),才會(huì)起作用 眯漩,直到線(xiàn)程數(shù)不大于 corepollsize 麻顶。
- workQuque: 阻塞隊(duì)列舱卡,用來(lái)存放等待的任務(wù)
- rejectedExecutionHandler :任務(wù)拒絕處理器(這個(gè)注意一下)辅肾,有四種
(1)abortpolicy丟棄任務(wù)轮锥,拋出異常
(2)discardpolicy拒絕執(zhí)行,不拋異常
(3)discardoldestpolicy 丟棄任務(wù)緩存隊(duì)列中最老的任務(wù)
(4)CallerRunsPolicy 線(xiàn)程池不執(zhí)行這個(gè)任務(wù)交胚,主線(xiàn)程自己執(zhí)行盈电。
1、newFixedThreadPool 定長(zhǎng)線(xiàn)程池
一個(gè)有指定的線(xiàn)程數(shù)的線(xiàn)程池匆帚,有核心的線(xiàn)程,里面有固定的線(xiàn)程數(shù)量吸重,響應(yīng)的速度快。正規(guī)的并發(fā)線(xiàn)程嚎幸,多用于服務(wù)器。固定的線(xiàn)程數(shù)由系統(tǒng)資源設(shè)置嫉晶。核心線(xiàn)程是沒(méi)有超時(shí)機(jī)制的骑疆,隊(duì)列大小沒(méi)有限制替废,除非線(xiàn)程池關(guān)閉了核心線(xiàn)程才會(huì)被回收。
2椎镣、newCachedThreadPool 可緩沖線(xiàn)程池
只有非核心線(xiàn)程,最大線(xiàn)程數(shù)很大状答,每新來(lái)一個(gè)任務(wù),當(dāng)沒(méi)有空余線(xiàn)程的時(shí)候就會(huì)重新創(chuàng)建一個(gè)線(xiàn)程剪况,這邊有一個(gè)超時(shí)機(jī)制,當(dāng)空閑的線(xiàn)程超過(guò)60s內(nèi)沒(méi)有用到的話(huà)译断,就會(huì)被回收,它可以一定程序減少頻繁創(chuàng)建/銷(xiāo)毀線(xiàn)程,減少系統(tǒng)開(kāi)銷(xiāo),適用于執(zhí)行時(shí)間短并且數(shù)量多的任務(wù)場(chǎng)景巡语。
3淮菠、ScheduledThreadPool 周期線(xiàn)程池
創(chuàng)建一個(gè)定長(zhǎng)線(xiàn)程池男公,支持定時(shí)及周期性任務(wù)執(zhí)行合陵,通過(guò)過(guò)schedule方法可以設(shè)置任務(wù)的周期執(zhí)行
4、newSingleThreadExecutor 單任務(wù)線(xiàn)程池
創(chuàng)建一個(gè)單線(xiàn)程化的線(xiàn)程池拥知,它只會(huì)用唯一的工作線(xiàn)程來(lái)執(zhí)行任務(wù)踏拜,保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級(jí))執(zhí)行低剔,每次任務(wù)到來(lái)后都會(huì)進(jìn)入阻塞隊(duì)列,然后按指定順序執(zhí)行襟齿。
關(guān)鍵源碼解讀
1、execute方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//判斷當(dāng)前線(xiàn)程個(gè)數(shù)是否小于corePoolSize(核心線(xiàn)程數(shù))猜欺,如果小于的話(huà),就再創(chuàng)建一個(gè)線(xiàn)程替梨,每個(gè)線(xiàn)程都被封裝成一個(gè)Worker,
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果大于核心線(xiàn)程的話(huà)就嘗試把任務(wù)加入緩存隊(duì)列,這里增加了狀態(tài)出現(xiàn)異常的確認(rèn)判斷副瀑,
//如果狀態(tài)出現(xiàn)異常會(huì)繼續(xù)remove操作,如果執(zhí)行true挽鞠,則按照拒絕處理策略駁回任務(wù)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果隊(duì)列放不了,只能采用默認(rèn)的拒絕服務(wù)策略了信认,
else if (!addWorker(command, false))
reject(command);
}
源碼中出現(xiàn)ctl的次數(shù)比較多均抽,那么這是個(gè)什么呢嫁赏?
我們可以看看它的定義
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
線(xiàn)程池狀態(tài)
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
方法
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
它實(shí)質(zhì)上就是一個(gè)線(xiàn)程安全的32位的Integer,用前三位表示線(xiàn)程池的狀態(tài)油挥,后29位來(lái)表示線(xiàn)程的個(gè)數(shù)款熬,所以在計(jì)算個(gè)數(shù)的時(shí)候用到了workerCountOf攘乒,忽略前三位帶來(lái)的影響贤牛。
2则酝、Worker中的run方法
Worker就是對(duì)線(xiàn)程的封裝,線(xiàn)程池中維護(hù)了一個(gè)HashSet<Worker>的一個(gè)集合來(lái)存儲(chǔ)工作線(xiàn)程沽讹,每次addWork的時(shí)候就往這個(gè)里面加,因?yàn)镠ashSet是不安全的爽雄,所以加了ReentrantLock來(lái)做同步
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
其實(shí)這個(gè)操作很簡(jiǎn)單盲链,就是一個(gè)while不斷的去getTask,獲得任務(wù)之后,就依次執(zhí)行
beforeExecute(wt, task);
task.run();
afterExecute(task, thrown);
(發(fā)現(xiàn)每次執(zhí)行任務(wù)的時(shí)候都加了鎖刽沾,有點(diǎn)奇怪,這里我還要看一下)
那么假設(shè)任務(wù)隊(duì)列中沒(méi)有了呢侧漓?那這里就用到了我們定義的keeplivetime ,在getTask中有這樣一段代碼布蔗,
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
也就是當(dāng)超過(guò)keeplivetime 沒(méi)有拿到就會(huì)返回null,這個(gè)時(shí)候循環(huán)就會(huì)截止浪腐,這個(gè)線(xiàn)程Wo也就會(huì)結(jié)束纵揍,所以說(shuō)keepAliveTime指的是最長(zhǎng)的poll時(shí)間