偽代碼
static List<Runnable> rList =new ArrayList<>();//用來保存task的隊列
public static void main(String[] args) throws Exception {
for( int a=0;a<10;a++) {
rList.add(new Test().new Run(a));
}
Thread thread =new Thread(new Test().new MyRun()); //創(chuàng)建線程溉委,線程池當(dāng)中是進行了一次封裝將 有個worker 類漓拾,該類實現(xiàn)了Runnable 接口,同時持有Thread 對象逼裆,在內(nèi)部實例化Thread 的時候,將自身當(dāng)做Runnable 傳給了內(nèi)部的Thread 對象,后續(xù)的工作就是在這個類的run()中進行了
thread.start();;
}
class MyRun implements Runnable{ //thread 傳入的Runnable
@Override
public void run() {// 當(dāng)thread.start() 后這個run()就會得到執(zhí)行出革,在這個方法的內(nèi)部循環(huán)task隊列叠萍,直接調(diào)用task 的run()
// TODO Auto-generated method stub
for(int i=0;i<rList.size();i++) {
rList.get(i).run();
}
}
}
class Run implements Runnable{//相當(dāng)于我們自己傳入 的runnable
private int i ;
public Run(int i) {
super();
this.i = i;
}
@Override
public void run() {
// TODO Auto-generated method stub
System.err.println(i);
}
}
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {//小于核心線程數(shù)
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//加入到隊列
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
addWorker()
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//創(chuàng)建新的線程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); //實例化 Worker 類
final Thread t = w.thread; //創(chuàng)建Thread 對象
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//啟動線程
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
看下Worker 類芝发,worker 類本身是一個Runnable
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 這里創(chuàng)建了Thread ,并將this 傳入,上面說過苛谷,Worker 本身就是一個Runnable 對象
}
//既然是一個Runnable 辅鲸,必然復(fù)寫了run()
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {// 這里會阻塞,不停的循環(huán)任務(wù)隊列
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();//執(zhí)行task 隊列中每個隊列的run()
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask()
// 為分析而簡化后的代碼
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int wc = workerCountOf(c);
// timed變量用于判斷是否需要進行超時控制腹殿。
// allowCoreThreadTimeOut默認(rèn)是false独悴,也就是核心線程不允許進行超時例书;
// wc > corePoolSize,表示當(dāng)前線程池中的線程數(shù)量大于核心線程數(shù)量刻炒;
// 對于超過核心線程數(shù)量的這些線程决采,需要進行超時控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (timed && timedOut) {
// 如果需要進行超時控制,且上次從緩存隊列中獲取任務(wù)時發(fā)生了超時坟奥,那么嘗試將workerCount減1,即當(dāng)前活動線程數(shù)減1树瞭,
// 如果減1成功,則返回null筏勒,這就意味著runWorker()方法中的while循環(huán)會被退出移迫,其對應(yīng)的線程就要銷毀了,也就是線
程池中少了一個線程了
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 注意workQueue中的poll()方法與take()方法的區(qū)別
//poll方式取任務(wù)的特點是從緩存隊列中取任務(wù),最長等待keepAliveTime的時長管行,取不到返回null(當(dāng)timed 為true的時候厨埋,就說明當(dāng)前線程數(shù)大于核心線程數(shù),這個時候需要考慮是否需要銷毀線程捐顷,等keepAliveTime時間后如果返回null就表明需要銷毀該線程了)
//take方式取任務(wù)的特點是從緩存隊列中取任務(wù)荡陷,若隊列為空,則進入阻塞狀態(tài),直到能取出對象為止(線程池中的線程數(shù)小于核心線程數(shù)迅涮,說明很閑废赞,只需要獲取任務(wù)即可)
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
總結(jié):Thread.start() 會執(zhí)行傳入Runnable 的run(),同時執(zhí)行完后,當(dāng)前THread 就結(jié)束了叮姑,這怎么實現(xiàn)復(fù)用呢唉地?多線可能會有多個Runnable 啊,每次執(zhí)行完就結(jié)束了那豈不是每一個Runnable 都要重新new Thread(runable),這跟自己new Thread 有什么區(qū)別传透?其實線程池創(chuàng)建的Thead 傳入的Runnable 并不是我們傳入的task Runanble, 而是其內(nèi)部一個封裝的一個Worker 類耘沼,我們創(chuàng)建的Runnable 并沒有被當(dāng)做參數(shù)傳入到Runnable 中,而是被加入到對比當(dāng)中朱盐,在worker 這個runnable 的run()中去循環(huán)取出群嗤,就是讓worker的run()內(nèi)部執(zhí)行while循環(huán)(循環(huán)任務(wù)隊列,這樣就能保證一直復(fù)用了)兵琳,具體分 poll(),和take()兩種形式狂秘,來區(qū)分該線程是否需要被銷毀,同時也說明核心線程的其實就是最后活下來的那幾個線程
參考:https://blog.csdn.net/MingHuang2017/article/details/79571529