一扳埂、線(xiàn)程池簡(jiǎn)介:
多線(xiàn)程技術(shù)主要解決處理器單元內(nèi)多個(gè)線(xiàn)程執(zhí)行的問(wèn)題查近,它可以顯著減少處理器單元的閑置時(shí)間竿奏,增加處理器單元的吞吐能力荠耽。
假設(shè)一個(gè)服務(wù)器完成一項(xiàng)任務(wù)所需時(shí)間為:T1 創(chuàng)建線(xiàn)程時(shí)間瞻颂,T2 在線(xiàn)程中執(zhí)行任務(wù)的時(shí)間豺谈,T3 銷(xiāo)毀線(xiàn)程時(shí)間。
如果:T1 + T3 遠(yuǎn)大于 T2贡这,則可以采用線(xiàn)程池茬末,以提高服務(wù)器性能。
一個(gè)線(xiàn)程池包括以下四個(gè)基本組成部分:
1盖矫、線(xiàn)程池管理器(ThreadPool):用于創(chuàng)建并管理線(xiàn)程池丽惭,包括 創(chuàng)建線(xiàn)程池,銷(xiāo)毀線(xiàn)程池辈双,添加新任務(wù);
2责掏、工作線(xiàn)程(PoolWorker):線(xiàn)程池中線(xiàn)程,在沒(méi)有任務(wù)時(shí)處于等待狀態(tài)湃望,可以循環(huán)的執(zhí)行任務(wù);
3换衬、任務(wù)接口(Task):每個(gè)任務(wù)必須實(shí)現(xiàn)的接口,以供工作線(xiàn)程調(diào)度任務(wù)的執(zhí)行证芭,它主要規(guī)定了任務(wù)的入口瞳浦,任務(wù)執(zhí)行完后的收尾工作,任務(wù)的執(zhí)行狀態(tài)等;
4废士、任務(wù)隊(duì)列(taskQueue):用于存放沒(méi)有處理的任務(wù)叫潦。提供一種緩沖機(jī)制。
代碼實(shí)現(xiàn)中并沒(méi)有實(shí)現(xiàn)任務(wù)接口官硝,而是把Runnable對(duì)象加入到線(xiàn)程池管理器(ThreadPool)矗蕊,然后剩下的事情就由線(xiàn)程池管理器(ThreadPool)來(lái)完成了四敞。
二、java類(lèi)庫(kù)中提供的線(xiàn)程池簡(jiǎn)介:
java提供的線(xiàn)程池更加強(qiáng)大拔妥,相信理解線(xiàn)程池的工作原理忿危,看類(lèi)庫(kù)中的線(xiàn)程池就不會(huì)感到陌生了。基本框架的了解:Executor框架
- Executor: 所有線(xiàn)程池的接口,只有一個(gè)方法没龙。
- ExecutorService: 增加Executor的行為铺厨,是Executor實(shí)現(xiàn)類(lèi)的最直接接口。
- Executors: 提供了一系列工廠(chǎng)方法用于創(chuàng)先線(xiàn)程池硬纤,返回的線(xiàn)程池都實(shí)現(xiàn)了ExecutorService 接口解滓。
- ThreadPoolExecutor:線(xiàn)程池的具體實(shí)現(xiàn)類(lèi),一般用的各種線(xiàn)程池都是基于這個(gè)類(lèi)實(shí)現(xiàn)的。
三筝家、線(xiàn)程池實(shí)現(xiàn)原理:
線(xiàn)程池的優(yōu)點(diǎn):
- 重用線(xiàn)程池中的線(xiàn)程,減少因?qū)ο髣?chuàng)建,銷(xiāo)毀所帶來(lái)的性能開(kāi)銷(xiāo);
- 能有效的控制線(xiàn)程的最大并發(fā)數(shù),提高系統(tǒng)資源利用率,同時(shí)避免過(guò)多的資源競(jìng)爭(zhēng),避免堵塞;
- 能夠多線(xiàn)程進(jìn)行簡(jiǎn)單的管理,使線(xiàn)程的使用簡(jiǎn)單洼裤、高效。
線(xiàn)程池的實(shí)現(xiàn)過(guò)程沒(méi)有用到Synchronized關(guān)鍵字溪王,用的都是Volatile,Lock和同步(阻塞)隊(duì)列,Atomic相關(guān)類(lèi)腮鞍,F(xiàn)utureTask等等,因?yàn)楹笳叩男阅芨鼉?yōu)莹菱。理解的過(guò)程可以很好的學(xué)習(xí)源碼中并發(fā)控制的思想移国。
在開(kāi)篇提到過(guò)線(xiàn)程池的優(yōu)點(diǎn)是可總結(jié)為以下三點(diǎn):
- 線(xiàn)程復(fù)用
- 控制最大并發(fā)數(shù)
- 管理線(xiàn)程
1.線(xiàn)程復(fù)用過(guò)程
在線(xiàn)程的生命周期中,它要經(jīng)過(guò)新建(New)道伟、就緒(Runnable)迹缀、運(yùn)行(Running)、阻塞(Blocked)和死亡(Dead)5種狀態(tài)蜜徽。
Thread通過(guò)new來(lái)新建一個(gè)線(xiàn)程祝懂,這個(gè)過(guò)程是是初始化一些線(xiàn)程信息,如線(xiàn)程名拘鞋,id,線(xiàn)程所屬group等砚蓬,可以認(rèn)為只是個(gè)普通的對(duì)象。調(diào)用Thread的start()后Java虛擬機(jī)會(huì)為其創(chuàng)建方法調(diào)用棧和程序計(jì)數(shù)器掐禁,同時(shí)將hasBeenStarted為true,之后調(diào)用start方法就會(huì)有異常怜械。
處于這個(gè)狀態(tài)中的線(xiàn)程并沒(méi)有開(kāi)始運(yùn)行,只是表示該線(xiàn)程可以運(yùn)行了傅事。至于該線(xiàn)程何時(shí)開(kāi)始運(yùn)行缕允,取決于JVM里線(xiàn)程調(diào)度器的調(diào)度。當(dāng)線(xiàn)程獲取cpu后蹭越,run()方法會(huì)被調(diào)用障本。不要自己去調(diào)用Thread的run()方法。之后根據(jù)CPU的調(diào)度在就緒——運(yùn)行——阻塞間切換,直到run()方法結(jié)束或其他方式停止線(xiàn)程驾霜,進(jìn)入dead狀態(tài)案训。
所以實(shí)現(xiàn)線(xiàn)程復(fù)用的原理應(yīng)該就是要保持線(xiàn)程處于存活狀態(tài)(就緒,運(yùn)行或阻塞)粪糙。接下來(lái)來(lái)看下ThreadPoolExecutor是怎么實(shí)現(xiàn)線(xiàn)程復(fù)用的强霎。
- 在ThreadPoolExecutor主要Worker類(lèi)來(lái)控制線(xiàn)程的復(fù)用∪馗裕看下Worker類(lèi)簡(jiǎn)化后的代碼城舞,這樣方便理解:
private final class Worker implements Runnable {
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
while (task != null || (task = getTask()) != null){
task.run();
}
}
Worker是一個(gè)Runnable,同時(shí)擁有一個(gè)thread寞酿,這個(gè)thread就是要開(kāi)啟的線(xiàn)程家夺,在新建Worker對(duì)象時(shí)同時(shí)新建一個(gè)Thread對(duì)象,同時(shí)將Worker自己作為參數(shù)傳入TThread伐弹,這樣當(dāng)Thread的start()方法調(diào)用時(shí)拉馋,運(yùn)行的實(shí)際上是Worker的run()方法,接著到runWorker()中,有個(gè)while循環(huán)惨好,一直從getTask()里得到Runnable對(duì)象煌茴,順序執(zhí)行。getTask()又是怎么得到Runnable對(duì)象的呢昧狮?
- 依舊是簡(jiǎn)化后的代碼:
private Runnable getTask() {
if(一些特殊情況) {
return null;
}
Runnable r = workQueue.take();
return r;
}
這個(gè)workQueue就是初始化ThreadPoolExecutor時(shí)存放任務(wù)的BlockingQueue隊(duì)列景馁,這個(gè)隊(duì)列里的存放的都是將要執(zhí)行的Runnable任務(wù)。因?yàn)锽lockingQueue是個(gè)阻塞隊(duì)列逗鸣,BlockingQueue.take()得到如果是空,則進(jìn)入等待狀態(tài)直到BlockingQueue有新的對(duì)象被加入時(shí)喚醒阻塞的線(xiàn)程绰精。所以一般情況Thread的run()方法就不會(huì)結(jié)束,而是不斷執(zhí)行從workQueue里的Runnable任務(wù)撒璧,這就達(dá)到了線(xiàn)程復(fù)用的原理了。
2.控制最大并發(fā)數(shù)
那Runnable是什么時(shí)候放入workQueue笨使?Worker又是什么時(shí)候創(chuàng)建卿樱,Worker里的Thread的又是什么時(shí)候調(diào)用start()開(kāi)啟新線(xiàn)程來(lái)執(zhí)行Worker的run()方法的呢?有上面的分析看出Worker里的runWorker()執(zhí)行任務(wù)時(shí)是一個(gè)接一個(gè)硫椰,串行進(jìn)行的繁调,那并發(fā)是怎么體現(xiàn)的呢?
很容易想到是在execute(Runnable runnable)時(shí)會(huì)做上面的一些任務(wù)靶草√阋龋看下execute里是怎么做的。
- execute:簡(jiǎn)化后的代碼
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 當(dāng)前線(xiàn)程數(shù) < corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 直接啟動(dòng)新的線(xiàn)程奕翔。
if (addWorker(command, true))
return;
c = ctl.get();
}
// 活動(dòng)線(xiàn)程數(shù) >= corePoolSize
// runState為RUNNING && 隊(duì)列未滿(mǎn)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次檢驗(yàn)是否為RUNNING狀態(tài)
// 非RUNNING狀態(tài) 則從workQueue中移除任務(wù)并拒絕
if (!isRunning(recheck) && remove(command))
reject(command);// 采用線(xiàn)程池指定的策略拒絕任務(wù)
// 兩種情況:
// 1.非RUNNING狀態(tài)拒絕新的任務(wù)
// 2.隊(duì)列滿(mǎn)了啟動(dòng)新的線(xiàn)程失斣U(workCount > maximumPoolSize)
} else if (!addWorker(command, false))
reject(command);
}
- addWorker:簡(jiǎn)化后的代碼
private boolean addWorker(Runnable firstTask, boolean core) {
int wc = workerCountOf(c);
if (wc >= (core ? corePoolSize : maximumPoolSize)) {
return false;
}
w = new Worker(firstTask);
final Thread t = w.thread;
t.start();
}
根據(jù)代碼再來(lái)看上面提到的線(xiàn)程池工作過(guò)程中的添加任務(wù)的情況:
* 如果正在運(yùn)行的線(xiàn)程數(shù)量小于 corePoolSize,那么馬上創(chuàng)建線(xiàn)程運(yùn)行這個(gè)任務(wù);
* 如果正在運(yùn)行的線(xiàn)程數(shù)量大于或等于 corePoolSize宾袜,那么將這個(gè)任務(wù)放入隊(duì)列捻艳;
* 如果這時(shí)候隊(duì)列滿(mǎn)了,而且正在運(yùn)行的線(xiàn)程數(shù)量小于 maximumPoolSize庆猫,那么還是要?jiǎng)?chuàng)建非核心線(xiàn)程立刻運(yùn)行這個(gè)任務(wù)认轨;
* 如果隊(duì)列滿(mǎn)了,而且正在運(yùn)行的線(xiàn)程數(shù)量大于或等于 maximumPoolSize月培,那么線(xiàn)程池會(huì)拋出異常RejectExecutionException嘁字。
這就是Android的AsyncTask在并行執(zhí)行是在超出最大任務(wù)數(shù)是拋出RejectExecutionException的原因所在,詳見(jiàn)基于最新版本的AsyncTask源碼解讀及AsyncTask的黑暗面
通過(guò)addWorker如果成功創(chuàng)建新的線(xiàn)程成功节视,則通過(guò)start()開(kāi)啟新線(xiàn)程拳锚,同時(shí)將firstTask作為這個(gè)Worker里的run()中執(zhí)行的第一個(gè)任務(wù)。
雖然每個(gè)Worker的任務(wù)是串行處理寻行,但如果創(chuàng)建了多個(gè)Worker霍掺,因?yàn)楣灿靡粋€(gè)workQueue,所以就會(huì)并行處理了拌蜘。
所以根據(jù)corePoolSize和maximumPoolSize來(lái)控制最大并發(fā)數(shù)杆烁。
3.管理線(xiàn)程
通過(guò)線(xiàn)程池可以很好的管理線(xiàn)程的復(fù)用,控制并發(fā)數(shù)简卧,以及銷(xiāo)毀等過(guò)程,線(xiàn)程的復(fù)用和控制并發(fā)上面已經(jīng)講了兔魂,而線(xiàn)程的管理過(guò)程已經(jīng)穿插在其中了,也很好理解举娩。
在ThreadPoolExecutor有個(gè)ctl的AtomicInteger變量析校。通過(guò)這一個(gè)變量保存了兩個(gè)內(nèi)容:
所有線(xiàn)程的數(shù)量
每個(gè)線(xiàn)程所處的狀態(tài)
其中低29位存線(xiàn)程數(shù),高3位存runState铜涉,通過(guò)位運(yùn)算來(lái)得到不同的值智玻。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//得到線(xiàn)程的狀態(tài)
private static int runStateOf(int c) {
return c & ~CAPACITY;
}
//得到Worker的的數(shù)量
private static int workerCountOf(int c) {
return c & CAPACITY;
}
// 判斷線(xiàn)程是否在運(yùn)行
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
這里主要通過(guò)shutdown和shutdownNow()來(lái)分析線(xiàn)程池的關(guān)閉過(guò)程。首先線(xiàn)程池有五種狀態(tài)來(lái)控制任務(wù)添加與執(zhí)行芙代。主要介紹以下三種:
RUNNING狀態(tài):線(xiàn)程池正常運(yùn)行吊奢,可以接受新的任務(wù)并處理隊(duì)列中的任務(wù);
SHUTDOWN狀態(tài):不再接受新的任務(wù)纹烹,但是會(huì)執(zhí)行隊(duì)列中的任務(wù)页滚;
STOP狀態(tài):不再接受新任務(wù),不處理隊(duì)列中的任務(wù)
shutdown 這個(gè)方法會(huì)將runState置為SHUTDOWN铺呵,會(huì)終止所有空閑的線(xiàn)程裹驰,而仍在工作的線(xiàn)程不受影響,所以隊(duì)列中的任務(wù)人會(huì)被執(zhí)行陪蜻。
shutdownNow 方法將runState置為STOP邦马。和shutdown方法的區(qū)別,這個(gè)方法會(huì)終止所有的線(xiàn)程,所以隊(duì)列中的任務(wù)也不會(huì)被執(zhí)行了滋将。
總結(jié)
通過(guò)對(duì)ThreadPoolExecutor源碼的分析邻悬,從總體上了解了線(xiàn)程池的創(chuàng)建,任務(wù)的添加随闽,執(zhí)行等過(guò)程父丰,熟悉這些過(guò)程,使用線(xiàn)程池就會(huì)更輕松了掘宪。
而從中學(xué)到的一些對(duì)并發(fā)控制蛾扇,以及生產(chǎn)者——消費(fèi)者模型任務(wù)處理的使用,對(duì)以后理解或解決其他相關(guān)問(wèn)題會(huì)有很大的幫助魏滚。比如Android中的Handler機(jī)制镀首,而Looper中的Messager隊(duì)列用一個(gè)BlookQueue來(lái)處理同樣是可以的,這寫(xiě)就是讀源碼的收獲吧。