本文所說的“核心線程”、“非核心線程”是一個虛擬的概念,是為了方便描述而虛擬出來的概念虑鼎,在代碼中并沒有哪個線程被標(biāo)記為“核心線程”或“非核心線程”耍休,所有線程都是一樣的刃永,只是當(dāng)線程池中的線程多于指定的核心線程數(shù)量時,會將多出來的線程銷毀掉羊精,池中只保留指定個數(shù)的線程斯够。那些被銷毀的線程是隨機的,可能是第一個創(chuàng)建的線程,也可能是最后一個創(chuàng)建的線程读规,或其它時候創(chuàng)建的線程抓督。一開始我以為會有一些線程被標(biāo)記為“核心線程”,而其它的則是“非核心線程”束亏,在銷毀多余線程的時候只銷毀那些“非核心線程”铃在,而“核心線程”不被銷毀。這種理解是錯誤的枪汪。
在ThreadPollExcutor類中涌穆,有一個字段 private final AtomicInteger?ctl?= new AtomicInteger(ctlOf(RUNNING, 0)); 是對線程池的運行狀態(tài)和線程池中有效線程的數(shù)量進行控制的, 它包含兩部分信息: 線程池的運行狀態(tài) (runState) 和線程池內(nèi)有效線程的數(shù)量 (workerCount)雀久,還有幾個對ctl進行計算的方法:
// 獲取運行狀態(tài)
privatestaticintrunStateOf(intc)? ? {returnc & ~CAPACITY; }
// 獲取活動線程數(shù)
privatestaticintworkerCountOf(intc)? {returnc & CAPACITY; }
以上兩個方法在源碼中經(jīng)常用到宿稀,結(jié)合我們的目標(biāo),對運行狀態(tài)的一些判斷及處理可以不用去管赖捌,而對當(dāng)前活動線程數(shù)要加以關(guān)注等等祝沸。
下面將遵循這些原則來分析源碼。
解惑
當(dāng)我們要向線程池添加一個任務(wù)時是調(diào)用ThreadPollExcutor對象的execute(Runnable command)方法來完成的越庇,所以先來看看ThreadPollExcutor類中的execute(Runnable command)方法的源碼:
publicvoid execute(Runnable command) {
? ? if(command ==null)
? ? ? ? thrownew NullPointerException();
? ? intc = ctl.get();
? ? if(workerCountOf(c) < corePoolSize) {
? ? ? ? if(addWorker(command,true))
? ? ? ? ? ? return;
? ? ? ? c = ctl.get();
? ? }
? ? if(isRunning(c) && workQueue.offer(command)) {
? ? ? ? intrecheck = ctl.get();
? ? ? ? if(! isRunning(recheck) && remove(command))
? ? ? ? ? ? reject(command);
? ? ? ? elseif(workerCountOf(recheck) == 0)
? ? ? ? ? ? addWorker(null,false);
? ? }
? ? elseif(!addWorker(command,false))
? ? ? ? reject(command);
}
按照我們在分析方法中提到的一些原則罩锐,去掉一些相關(guān)性不強的代碼,看看核心代碼是怎樣的卤唉。
// 為分析而簡化后的代碼publicvoid execute(Runnable command) {
? ? intc = ctl.get();
? ? if(workerCountOf(c) < corePoolSize) {
? ? ? ? // 如果當(dāng)前活動線程數(shù)小于corePoolSize涩惑,則新建一個線程放入線程池中,并把任務(wù)添加到該線程中if(addWorker(command,true))
? ? ? ? return;
? ? ? ? c = ctl.get();
? ? }
? ? // 如果當(dāng)前活動線程數(shù)大于等于corePoolSize桑驱,則嘗試將任務(wù)放入緩存隊列if (workQueue.offer(command)) {
? ? ? ? intrecheck = ctl.get();
? ? ? ? if(workerCountOf(recheck) == 0)
? ? ? ? addWorker(null,false);
? ? }else {
? ? ? ? // 緩存已滿竭恬,新建一個線程放入線程池,并把任務(wù)添加到該線程中(此時新建的線程相當(dāng)于非核心線程)addWorker(command,false)
? ? }
}
這樣一看熬的,邏輯應(yīng)該清晰很多了痊硕。
如果 當(dāng)前活動線程數(shù) < 指定的核心線程數(shù),則創(chuàng)建并啟動一個線程來執(zhí)行新提交的任務(wù)(此時新建的線程相當(dāng)于核心線程)押框;
如果 當(dāng)前活動線程數(shù) >= 指定的核心線程數(shù)岔绸,且緩存隊列未滿,則將任務(wù)添加到緩存隊列中橡伞;
如果 當(dāng)前活動線程數(shù) >= 指定的核心線程數(shù)盒揉,且緩存隊列已滿,則創(chuàng)建并啟動一個線程來執(zhí)行新提交的任務(wù)(此時新建的線程相當(dāng)于非核心線程)兑徘;
接下來看 addWorker(Runnable firstTask, boolean core)方法
private boolean addWorker(Runnable firstTask,boolean core) {
? ? retry:
? ? for (;;) {
? ? ? ? intc = ctl.get();
? ? ? ? intrs = runStateOf(c);
? ? ? ? // Check if queue empty only if necessary.if(rs >= SHUTDOWN &&? ? ? ? ! (rs == SHUTDOWN &&? ? ? ? firstTask ==null&&? ? ? ? ! workQueue.isEmpty()))
? ? ? ? returnfalse;
? ? ? ? for (;;) {
? ? ? ? ? ? intwc = workerCountOf(c);
? ? ? ? ? ? if(wc >= CAPACITY ||? ? ? ? ? ? wc >= (core ? corePoolSize : maximumPoolSize))
? ? ? ? ? ? returnfalse;
? ? ? ? ? ? if (compareAndIncrementWorkerCount(c))
? ? ? ? ? ? break retry;
? ? ? ? ? ? c = ctl.get();// Re-read ctlif(runStateOf(c) != rs)
? ? ? ? ? ? continue retry;
? ? ? ? ? ? // else CAS failed due to workerCount change; retry inner loop? ? ? ? }
? ? }
? ? booleanworkerStarted =false;
? ? booleanworkerAdded =false;
? ? Worker w =null;
? ? try {
? ? ? ? w =new Worker(firstTask);
? ? ? ? finalThread t = w.thread;
? ? ? ? if(t !=null) {
? ? ? ? ? ? finalReentrantLock mainLock =this.mainLock;
? ? ? ? ? ? mainLock.lock();
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? // Recheck while holding lock.
? ? ? ? ? ? ? ? // Back out on ThreadFactory failure or if
? ? ? ? ? ? ? ? // shut down before lock acquired.intrs = runStateOf(ctl.get());
? ? ? ? ? ? ? ? if(rs < SHUTDOWN ||? ? ? ? ? ? ? ? (rs == SHUTDOWN && firstTask ==null)) {
? ? ? ? ? ? ? ? ? ? if(t.isAlive())// precheck that t is startablethrownew IllegalThreadStateException();
? ? ? ? ? ? ? ? ? ? workers.add(w);
? ? ? ? ? ? ? ? ? ? ints = workers.size();
? ? ? ? ? ? ? ? ? ? if(s > largestPoolSize)
? ? ? ? ? ? ? ? ? ? largestPoolSize = s;
? ? ? ? ? ? ? ? ? ? workerAdded =true;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? mainLock.unlock();
? ? ? ? ? ? }
? ? ? ? ? ? if (workerAdded) {
? ? ? ? ? ? ? ? t.start();
? ? ? ? ? ? ? ? workerStarted =true;
? ? ? ? ? ? }
? ? ? ? }
? ? } finally {
? ? ? ? if(! workerStarted)
? ? ? ? ? ? addWorkerFailed(w);
? ? }
? ? return workerStarted;
}
同樣预烙,我們也來簡化一下:
// 為分析而簡化后的代碼private boolean addWorker(Runnable firstTask,boolean core) {
? ? intwc = workerCountOf(c);
? ? if(wc >= (core ? corePoolSize : maximumPoolSize))
? ? // 如果當(dāng)前活動線程數(shù) >= 指定的核心線程數(shù),不創(chuàng)建核心線程
? ? // 如果當(dāng)前活動線程數(shù) >= 指定的最大線程數(shù)道媚,不創(chuàng)建非核心線程 returnfalse;
? ? booleanworkerStarted =false;
? ? booleanworkerAdded =false;
? ? Worker w =null;
? ? try {
? ? ? ? // 新建一個Worker扁掸,將要執(zhí)行的任務(wù)作為參數(shù)傳進去w =new Worker(firstTask);
? ? ? ? finalThread t = w.thread;
? ? ? ? if(t !=null) {
? ? ? ? ? ? workers.add(w);
? ? ? ? ? ? workerAdded =true;
? ? ? ? ? ? if (workerAdded) {
? ? ? ? ? ? ? ? // 啟動剛剛新建的那個worker持有的線程翘县,等下要看看這個線程做了啥? ? ? ? ? ? ? ? t.start();
? ? ? ? ? ? ? ? workerStarted =true;
? ? ? ? ? ? }
? ? ? ? }
? ? } finally {
? ? ? ? if(! workerStarted)
? ? ? ? ? ? addWorkerFailed(w);
? ? }
? ? return workerStarted;
}
看到這里,我們大概能猜測到谴分,addWorker方法的功能就是新建一個線程并啟動這個線程锈麸,要執(zhí)行的任務(wù)應(yīng)該就是在這個線程中執(zhí)行。為了證實我們的這種猜測需要再來看看Worker這個類牺蹄。
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{
? ? // ....}
Worker(Runnable firstTask) {
? ? setState(-1);// inhibit interrupts until runWorkerthis.firstTask = firstTask;
? ? this.thread = getThreadFactory().newThread(this);
}
從上面的Worker類的聲明可以看到忘伞,它實現(xiàn)了Runnable接口,以及從它的構(gòu)造方法中可以知道待執(zhí)行的任務(wù)賦值給了它的變量firstTask沙兰,并以它自己為參數(shù)新建了一個線程賦值給它的變量thread氓奈,那么運行這個線程的時候其實就是執(zhí)行Worker的run()方法,來看一下這個方法:
public void run() {
? ? runWorker(this);
}
final void runWorker(Worker w) {
? ? Thread wt = Thread.currentThread();
? ? Runnable task = w.firstTask;
? ? w.firstTask =null;
? ? w.unlock(); // allow interruptsbooleancompletedAbruptly =true;
? ? try {
? ? ? ? while(task !=null|| (task = getTask()) !=null) {
? ? ? ? ? ? w.lock();
? ? ? ? ? ? // If pool is stopping, ensure thread is interrupted;
? ? ? ? ? ? // if not, ensure thread is not interrupted. This
? ? ? ? ? ? // requires a recheck in second case to deal with
? ? ? ? ? ? // shutdownNow race while clearing interruptif((runStateAtLeast(ctl.get(), STOP) ||? ? ? ? ? ? (Thread.interrupted() &&? ? ? ? ? ? runStateAtLeast(ctl.get(), STOP))) &&? ? ? ? ? ? !wt.isInterrupted())
? ? ? ? ? ? ? ? wt.interrupt();
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? beforeExecute(wt, task);
? ? ? ? ? ? ? ? Throwable thrown =null;
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? task.run();
? ? ? ? ? ? ? ? } catch (RuntimeException x) {
? ? ? ? ? ? ? ? ? ? thrown = x;throw x;
? ? ? ? ? ? ? ? } catch (Error x) {
? ? ? ? ? ? ? ? ? ? thrown = x;throw x;
? ? ? ? ? ? ? ? } catch (Throwable x) {
? ? ? ? ? ? ? ? ? ? thrown = x;thrownew Error(x);
? ? ? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? ? ? afterExecute(task, thrown);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? task =null;
? ? ? ? ? ? ? ? w.completedTasks++;
? ? ? ? ? ? ? ? w.unlock();
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? completedAbruptly =false;
? ? } finally {
? ? ? ? processWorkerExit(w, completedAbruptly);
? ? }
}
在run()方法中只調(diào)了一下 runWorker(this) 方法鼎天,再來簡化一下這個 runWorker() 方法
// 為分析而簡化后的代碼
final void runWorker(Worker w) {
? ? Runnable task = w.firstTask;
? ? w.firstTask =null;
? ? while(task !=null|| (task = getTask()) !=null) {
? ? ? ? try {
? ? ? ? ? ? task.run();
? ? ? ? } finally {
? ? ? ? ? ? task =null;
? ? ? ? }
? ? }
}
很明顯舀奶,runWorker()方法里面執(zhí)行了我們新建Worker對象時傳進去的待執(zhí)行的任務(wù),到這里為止貌似這個worker的run()方法就執(zhí)行完了斋射,既然執(zhí)行完了那么這個線程也就沒用了育勺,只有等待虛擬機銷毀了。那么回顧一下我們的目標(biāo):Java線程池中的核心線程是如何被重復(fù)利用的罗岖?好像并沒有重復(fù)利用啊涧至,新建一個線程,執(zhí)行一個任務(wù)桑包,然后就結(jié)束了南蓬,銷毀了。沒什么特別的啊哑了,難道有什么地方漏掉了赘方,被忽略了?再仔細看一下runWorker()方法的代碼垒手,有一個while循環(huán),當(dāng)執(zhí)行完firstTask后task==null了倒信,那么就會執(zhí)行判斷條件?(task = getTask()) != null科贬,我們假設(shè)這個條件成立的話,那么這個線程就不止只執(zhí)行一個任務(wù)了鳖悠,可以執(zhí)行多個任務(wù)了榜掌,也就實現(xiàn)了重復(fù)利用了。答案呼之欲出了乘综,接著看getTask()方法
private Runnable getTask() {
? ? lean timedOut =false;// Did the last poll() time out?for (;;) {
? ? ? ? intc = ctl.get();
? ? ? ? intrs = runStateOf(c);
? ? ? ? // Check if queue empty only if necessary.if(rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
? ? ? ? ? ? decrementWorkerCount();
? ? ? ? ? ? returnnull;
? ? ? ? }
? ? ? ? intwc = workerCountOf(c);
? ? ? ? // Are workers subject to culling?booleantimed = allowCoreThreadTimeOut || wc > corePoolSize;
? ? ? ? if((wc > maximumPoolSize || (timed && timedOut))
? ? ? ? ? ? && (wc > 1 || workQueue.isEmpty())) {
? ? ? ? ? ? if (compareAndDecrementWorkerCount(c))
? ? ? ? ? ? ? ? returnnull;
? ? ? ? ? ? continue;
? ? ? ? }
? ? ? ? try {
? ? ? ? ? ? Runnable r = timed ?? ? ? ? ? ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
? ? ? ? ? ? workQueue.take();
? ? ? ? ? ? if(r !=null)
? ? ? ? ? ? return r;
? ? ? ? ? ? timedOut =true;
? ? ? ? ? ? } catch (InterruptedException retry) {
? ? ? ? ? ? ? ? timedOut =false;
? ? ? ? ? ? }
? ? }
}
老規(guī)矩憎账,簡化一下代碼來看:
// 為分析而簡化后的代碼
private Runnable getTask() {
? ? booleantimedOut =false;
? ? for (;;) {
? ? ? ? intc = ctl.get();
? ? ? ? intwc = workerCountOf(c);
? ? ? ? // timed變量用于判斷是否需要進行超時控制。
? ? ? ? // allowCoreThreadTimeOut默認是false卡辰,也就是核心線程不允許進行超時胞皱;
? ? ? ? // wc > corePoolSize邪意,表示當(dāng)前線程池中的線程數(shù)量大于核心線程數(shù)量;
? ? ? ? // 對于超過核心線程數(shù)量的這些線程反砌,需要進行超時控制booleantimed = allowCoreThreadTimeOut || wc > corePoolSize;
? ? ? ? if(timed && timedOut) {
? ? ? ? ? ? // 如果需要進行超時控制雾鬼,且上次從緩存隊列中獲取任務(wù)時發(fā)生了超時,那么嘗試將workerCount減1,即當(dāng)前活動線程數(shù)減1宴树,
? ? ? ? ? ? // 如果減1成功策菜,則返回null,這就意味著runWorker()方法中的while循環(huán)會被退出酒贬,其對應(yīng)的線程就要銷毀了又憨,也就是線程池中少了一個線程了if (compareAndDecrementWorkerCount(c))
? ? ? ? ? ? returnnull;
? ? ? ? ? ? continue;
? ? ? ? }
? ? ? ? try {
? ? ? ? ? ? Runnable r = timed ?? ? ? ? ? ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
? ? ? ? ? ? workQueue.take();
? ? ? ? ? ? // 注意workQueue中的poll()方法與take()方法的區(qū)別
? ? ? ? ? ? //poll方式取任務(wù)的特點是從緩存隊列中取任務(wù),最長等待keepAliveTime的時長,取不到返回null
? ? ? ? ? ? //take方式取任務(wù)的特點是從緩存隊列中取任務(wù)锭吨,若隊列為空,則進入阻塞狀態(tài)蠢莺,直到能取出對象為止if(r !=null)
? ? ? ? ? ? return r;
? ? ? ? ? ? timedOut =true;
? ? ? ? ? ? } catch (InterruptedException retry) {
? ? ? ? ? ? ? ? timedOut =false;
? ? ? ? }
? ? }
}
從以上代碼可以看出,getTask()的作用是
如果當(dāng)前活動線程數(shù)大于核心線程數(shù)耐齐,當(dāng)去緩存隊列中取任務(wù)的時候浪秘,如果緩存隊列中沒任務(wù)了,則等待keepAliveTime的時長埠况,此時還沒任務(wù)就返回null耸携,這就意味著runWorker()方法中的while循環(huán)會被退出,其對應(yīng)的線程就要銷毀了辕翰,也就是線程池中少了一個線程了夺衍。因此只要線程池中的線程數(shù)大于核心線程數(shù)就會這樣一個一個地銷毀這些多余的線程。
如果當(dāng)前活動線程數(shù)小于等于核心線程數(shù)喜命,同樣也是去緩存隊列中取任務(wù)沟沙,但當(dāng)緩存隊列中沒任務(wù)了,就會進入阻塞狀態(tài)壁榕,直到能取出任務(wù)為止矛紫,因此這個線程是處于阻塞狀態(tài)的,并不會因為緩存隊列中沒有任務(wù)了而被銷毀牌里。這樣就保證了線程池有N個線程是活的颊咬,可以隨時處理任務(wù),從而達到重復(fù)利用的目的牡辽。
小結(jié)
通過以上的分析喳篇,應(yīng)該算是比較清楚地解答了“線程池中的核心線程是如何被重復(fù)利用的”這個問題,同時也對線程池的實現(xiàn)機制有了更進一步的理解:
當(dāng)有新任務(wù)來的時候态辛,先看看當(dāng)前的線程數(shù)有沒有超過核心線程數(shù)麸澜,如果沒超過就直接新建一個線程來執(zhí)行新的任務(wù),如果超過了就看看緩存隊列有沒有滿奏黑,沒滿就將新任務(wù)放進緩存隊列中炊邦,滿了就新建一個線程來執(zhí)行新的任務(wù)编矾,如果線程池中的線程數(shù)已經(jīng)達到了指定的最大線程數(shù)了,那就根據(jù)相應(yīng)的策略拒絕任務(wù)铣耘。
當(dāng)緩存隊列中的任務(wù)都執(zhí)行完了的時候洽沟,線程池中的線程數(shù)如果大于核心線程數(shù),就銷毀多出來的線程蜗细,直到線程池中的線程數(shù)等于核心線程數(shù)裆操。此時這些線程就不會被銷毀了,它們一直處于阻塞狀態(tài)炉媒,等待新的任務(wù)到來
圖片來源于:https://www.cnblogs.com/linguanh/p/8000063.html