在沒有看線程池的源碼的時候,我腦子里經(jīng)常會想砰诵,線程跑完怎么之后怎么歸還給線程池雨席。其實這種思想很明顯是和對象池的概念搞混淆了菩咨。
首先講對象池。對象池的核心思想是緩存和共享。那些被頻繁使用的對象旦委,在使用完后奇徒,不立即將它們釋放,而是將它們緩存起來缨硝,以供后續(xù)的應(yīng)用程序重復(fù)使用摩钙,從而減少創(chuàng)建對象和釋放對象的次數(shù),進而改善應(yīng)用程序的性能查辩。但是這個共享并不是同時共享胖笛。而是一個線程用完之后,交給另外一個線程用宜岛。就像數(shù)據(jù)庫連接池长踊。就像一把鑰匙,多個人共用萍倡,但是一次只能一個人進入身弊。我們把這種叫做串行線程關(guān)閉
。
串行線程封閉使得任意時刻列敲,最多僅有一個線程擁有對象的所有權(quán)阱佛。當然,這不是絕對的戴而,只要線程T1事實不會再修改對象O凑术,那么就相當于僅有T2擁有對象的所有權(quán)。串行線層封閉讓對象變得可以共享(雖然只能串行的擁有所有權(quán))所意,靈活性得到大大提高淮逊;相對的,要共享對象就涉及安全發(fā)布的問題扶踊,依靠BlockingQueue等同步工具很容易實現(xiàn)這一點泄鹏。
見前面我寫的自定義數(shù)據(jù)庫連接池http://www.reibang.com/p/545f8b1194ef
對象的復(fù)用是對象的所有權(quán)的交接。
而線程池的話是生產(chǎn)者消費者模型
.
小于core個數(shù)的時候姻檀,去創(chuàng)建worker線程命满。然后調(diào)用runworker方法中的,task.run();假設(shè)core=3 ;然后不斷創(chuàng)建3個的消費者線程涝滴。即使先創(chuàng)建消費者線程1绣版,和2 。然后這個時候消費者1執(zhí)行完成了歼疮。假如這個時候再有任務(wù)杂抽。還是創(chuàng)建消費者線程3 。后面再有任務(wù)的話韩脏,堆積在BlockingQueue上缩麸。用戶不斷的submit提交任務(wù)。生產(chǎn)者赡矢。然后 3個消費者去并發(fā)消費提交的任務(wù)杭朱。他并沒有歸還給線程池阅仔。task.run();這句執(zhí)行完成之后,會進入下一個循環(huán)弧械,執(zhí)行task = getTask()方法八酒。
講完了這兩個東西的區(qū)別之后。我們來看線程池的源碼刃唐。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolTest {
private static final ExecutorService service = Executors.newFixedThreadPool(5);
public static void main(String[] args) {
service.submit(new Task());
}
}
class Task implements Runnable{
@Override
public void run() {
System.out.println("hello world");
}
}
注意羞迷,《阿里開發(fā)手冊》 上不建議用這種方式創(chuàng)建線程池。ExecutorService service = Executors.newFixedThreadPool(5); 因為這樣創(chuàng)建的LinkedBlockingQueue的長度是Integer.MaxValue画饥。會耗盡系統(tǒng)資源衔瓮。
Runnable任務(wù)的走向。
分析源碼之前抖甘,我們先畫出線程池的UML類圖热鞍。
主要的方法邏輯都在ThreadPoolExecutor上。
下面我們分析runnable task怎么走衔彻。
1 提交任務(wù)
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
1.首先不能提交空任務(wù)碍现,否者拋個異常
2.然后通過newTaskFor方法將一個Runnable類型的task包裝成一個RunnableFuture類型的ftask
3.然后調(diào)用executor方法執(zhí)行ftask
newTaskFor(task, null)這一句的目的是將Runnable接口適配成一個返回值為null的FutureTask類型的任務(wù)。當然這都不重要米奸,我們關(guān)心的是task到哪里去了
2 封裝任務(wù)成FutureTask
包裝成一個FutureTask
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
2.包裝成一個Callable交給成員變量callable
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
3.包裝成一個RunnableAdapter
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
4.交給了成員變量task,并交由call方法來執(zhí)行
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
task先封裝成 RunnableAdapter 的task屬性昼接。然后這個RunnableAdapter是FutureTask的callable屬性。
然后接下來 我們關(guān)心ftask的callable屬性
悴晰。
3 執(zhí)行exuecute方法慢睡。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
addWorker方法
addWorker方法是線程池的核心方法,它負責(zé)控制線程什么情況可以提交铡溪,什么情況下執(zhí)行漂辐,什么時候拒絕新線程的加。這個方法很重要棕硫,想要學(xué)習(xí)線程池還需要了解這個方法
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
w = new Worker(firstTask); 先封裝成worker對象髓涯。然后執(zhí)行t.start()。
執(zhí)行worker對象 的thread屬性的 run()方法哈扮。
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
這段代碼把線程的執(zhí)行機制纬纪,和任務(wù)的提交分離. this指代當前worker對象。也就是把當前worker對象(runnable對象)給thread執(zhí)行滑肉。但是這里有個問題包各,傳入的firstTask干嘛去了。
addWorker方法中執(zhí)行的t.start() 靶庙,啟動worker方法中的run()方法问畅。
調(diào)用worker對象的runwork方法。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
這里前面的疑問解決了,先拿到前面?zhèn)魅氲膄irstTask护姆,作為task變量矾端。第一次task不為空,短路或
后面語句不執(zhí)行卵皂。然后task.run(); 最后task置為空须床。while循環(huán)下次判斷。while (task != null || (task = getTask()) != null)
.執(zhí)行task =getTask()方法渐裂。
執(zhí)行下一個任務(wù)
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
workQueue.take() 從阻塞隊列中獲取豺旬,任務(wù)Runnable。作為getTask()方法返回值柒凉。繼續(xù)執(zhí)行上面的runWorker方法中的task.run()方法
參考資料:
從一個run方法的經(jīng)歷看線程池https://blog.csdn.net/u011803809/article/details/77427641