說(shuō)明
對(duì)于JDK源碼分析的文章扛点,僅僅記錄我認(rèn)為重要的地方。源碼的細(xì)節(jié)實(shí)在太多笨使,不可能面面俱到地寫(xiě)清每個(gè)邏輯。所以我的JDK源碼分析
僚害,著重在JDK的體系架構(gòu)層面硫椰,具體源碼可以參考:http://www.cnblogs.com/skywang12345/category/455711.html。
架構(gòu)圖
![](http://static.zybuluo.com/Yano/q803e4cwr6brh3c7hzyak2cr/image.png)
Executor 函數(shù)接口
Executor:提供一種將"任務(wù)提交"與"任務(wù)如何運(yùn)行"分離開(kāi)來(lái)的機(jī)制萨蚕。
void execute(Runnable command)
ExecutorService
ExecutorService提供了"將任務(wù)提交給執(zhí)行者的接口(submit方法)"靶草,"讓執(zhí)行者執(zhí)行任務(wù)(invokeAll, invokeAny方法)"的接口等等。
AbstractExecutorService
抽象類(lèi)岳遥,為ExecutorService中的函數(shù)接口提供了默認(rèn)實(shí)現(xiàn)奕翔。
ThreadPoolExecutor
大名鼎鼎的“線程池”。
ScheduledExecutorService
接口浩蓉,提供了“延時(shí)”和“周期執(zhí)行”接口派继。
ScheduledThreadPoolExecutor
繼承于ThreadPoolExecutor,并且實(shí)現(xiàn)了ScheduledExecutorService接口捻艳。它相當(dāng)于提供了"延時(shí)"和"周期執(zhí)行"功能的ScheduledExecutorService互艾。
Executors
靜態(tài)工廠類(lèi)
。它通過(guò)靜態(tài)工廠方法返回 ExecutorService讯泣、ScheduledExecutorService纫普、ThreadFactory 和 Callable 等類(lèi)的對(duì)象。
示例
class MyThreadPool extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " is running... ");
}
}
@Test
public void testThreadPoolExecutor() throws InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
TimeUnit.SECONDS.sleep(2);
pool.execute(new MyThreadPool());
}
pool.shutdown();
}
輸出:
pool-1-thread-1 is running...
pool-1-thread-2 is running...
pool-1-thread-1 is running...
pool-1-thread-2 is running...
pool-1-thread-1 is running...
分析:
示例代碼中好渠,新建了一個(gè)大小固定為2的線程池昨稼,并將5個(gè)線程依次放入線程池。從輸出結(jié)果中可以看出拳锚,是線程pool-1-thread-1
和線程pool-1-thread-2
相互交替假栓,并沒(méi)有新建多余的線程。
靜態(tài)工廠類(lèi)
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
創(chuàng)建一個(gè)線程池霍掺,線程池的容量是nThreads匾荆。已提交但是沒(méi)有執(zhí)行的任務(wù),會(huì)被阻塞杆烁,直到有任務(wù)運(yùn)行完成牙丽。
newFixedThreadPool()在調(diào)用ThreadPoolExecutor()時(shí),會(huì)傳遞一個(gè)LinkedBlockingQueue()對(duì)象兔魂,而LinkedBlockingQueue
是單向鏈表實(shí)現(xiàn)的阻塞隊(duì)列
烤芦。在線程池中,就是通過(guò)該阻塞隊(duì)列來(lái)實(shí)現(xiàn)"當(dāng)線程池中任務(wù)數(shù)量超過(guò)允許的任務(wù)數(shù)量時(shí)析校,部分任務(wù)會(huì)阻塞等待"构罗。
ThreadFactory
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
// 提供創(chuàng)建線程的API铜涉。
public Thread newThread(Runnable r) {
// 線程對(duì)應(yīng)的任務(wù)是Runnable對(duì)象r
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
// 設(shè)為“非守護(hù)線程”
if (t.isDaemon())
t.setDaemon(false);
// 將優(yōu)先級(jí)設(shè)為“Thread.NORM_PRIORITY”
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
excute 添加任務(wù)到“線程池”
public void execute(Runnable command) {
// 如果任務(wù)為null,則拋出異常遂唧。
if (command == null)
throw new NullPointerException();
// 獲取ctl對(duì)應(yīng)的int值芙代。該int值保存了"線程池中任務(wù)的數(shù)量"和"線程池狀態(tài)"信息
int c = ctl.get();
// 當(dāng)線程池中的任務(wù)數(shù)量 < "核心池大小"時(shí),即線程池中少于corePoolSize個(gè)任務(wù)盖彭。
// 則通過(guò)addWorker(command, true)新建一個(gè)線程链蕊,并將任務(wù)(command)添加到該線程中;然后谬泌,啟動(dòng)該線程從而執(zhí)行任務(wù)滔韵。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 當(dāng)線程池中的任務(wù)數(shù)量 >= "核心池大小"時(shí),
// 而且掌实,"線程池處于允許狀態(tài)"時(shí)陪蜻,則嘗試將任務(wù)添加到阻塞隊(duì)列中。
if (isRunning(c) && workQueue.offer(command)) {
// 再次確認(rèn)“線程池狀態(tài)”贱鼻,若線程池異常終止了宴卖,則刪除任務(wù);然后通過(guò)reject()執(zhí)行相應(yīng)的拒絕策略的內(nèi)容邻悬。
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
// 否則症昏,如果"線程池中任務(wù)數(shù)量"為0,則通過(guò)addWorker(null, false)嘗試新建一個(gè)線程父丰,新建線程對(duì)應(yīng)的任務(wù)為null肝谭。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 通過(guò)addWorker(command, false)新建一個(gè)線程,并將任務(wù)(command)添加到該線程中蛾扇;然后攘烛,啟動(dòng)該線程從而執(zhí)行任務(wù)。
// 如果addWorker(command, false)執(zhí)行失敗镀首,則通過(guò)reject()執(zhí)行相應(yīng)的拒絕策略的內(nèi)容坟漱。
else if (!addWorker(command, false))
reject(command);
}
shutdown 關(guān)閉線程池
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
// 獲取鎖
mainLock.lock();
try {
// 檢查終止線程池的“線程”是否有權(quán)限。
checkShutdownAccess();
// 設(shè)置線程池的狀態(tài)為關(guān)閉狀態(tài)更哄。
advanceRunState(SHUTDOWN);
// 中斷線程池中空閑的線程芋齿。
interruptIdleWorkers();
// 鉤子函數(shù),在ThreadPoolExecutor中沒(méi)有任何動(dòng)作成翩。
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
// 釋放鎖
mainLock.unlock();
}
// 嘗試終止線程池
tryTerminate();
}
線程池狀態(tài)
- Running
- SHUTDOWN
- STOP
- TIDYING
- TERMINATED
rivate final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl是一個(gè)AtomicInteger類(lèi)型的原子對(duì)象觅捆。ctl記錄了"線程池中的任務(wù)數(shù)量"和"線程池狀態(tài)"2個(gè)信息。
ctl共包括32位捕传。其中惠拭,高3位表示"線程池狀態(tài)",低29位表示"線程池中的任務(wù)數(shù)量"庸论。
RUNNING -- 對(duì)應(yīng)的高3位值是111职辅。
SHUTDOWN -- 對(duì)應(yīng)的高3位值是000。
STOP -- 對(duì)應(yīng)的高3位值是001聂示。
TIDYING -- 對(duì)應(yīng)的高3位值是010域携。
TERMINATED -- 對(duì)應(yīng)的高3位值是011。