背景
面試的時(shí)候經(jīng)常會(huì)被三連問(wèn)。用過(guò)嗎箩退?如何用的?場(chǎng)景是什么佳谦?所以有必要好好的研究下線程池迫在眉睫戴涝。
1、講解之前先了解下 retry: 因?yàn)樵创a中有這個(gè)retry標(biāo)記
先看一個(gè)簡(jiǎn)單的例子
/**
* @author shuliangzhao
* @Title: RetryTest
* @ProjectName design-parent
* @Description: TODO
* @date 2019/6/1 23:43
*/
public class RetryTest {
public static void main(String[] args) {
testRetry();
}
public static void testRetry() {
//retry:注釋1
for (int i = 0; i < 10; i++) {
retry: //注釋2
while (i == 5) {
continue retry;
}
System.out.print(i + " ");
}
}
}
如上如果只保留注釋1钻蔑,循環(huán)到 i==5的時(shí)候啥刻,程序跳到retry的那一行開(kāi)始執(zhí)行,此時(shí) i 的值未變咪笑,然后又是i==5可帽,程序進(jìn)入死循環(huán)一直執(zhí)行4到6行;執(zhí)行結(jié)果為0 1 2 3 4
如果直流注釋2窗怒,循環(huán)到 i==5的時(shí)候映跟,程序跳到retry的那一行開(kāi)始執(zhí)行,注意此時(shí) i 的值還是5扬虚,接著 i++(i 不是從0開(kāi)始了)申窘,所以輸出 0 1 2 3 4 6 7 8 9
說(shuō)明:其實(shí)retry就是一個(gè)標(biāo)記,標(biāo)記程序跳出循環(huán)的時(shí)候從哪里開(kāi)始執(zhí)行孔轴,功能類似于goto。retry一般都是跟隨者for循環(huán)出現(xiàn)碎捺,第一個(gè)retry的下面一行就是for循環(huán)路鹰,而且第二個(gè)retry的前面一般是 continue或是 break。
2收厨、為什么要使用線程池
缺點(diǎn)
a晋柱、每次new Thread新建對(duì)象,性能差诵叁。
b雁竞、缺乏統(tǒng)一管理,可能無(wú)限制的新建線程,過(guò)多占用系統(tǒng)資源導(dǎo)致死機(jī)或OOM
優(yōu)點(diǎn)
a碑诉、重用存在的線程彪腔,減少對(duì)象創(chuàng)建,消亡的開(kāi)銷
b进栽、有效控制最大并發(fā)線程數(shù)德挣,提高系統(tǒng)資源利用率
3、線程池實(shí)現(xiàn)原理
當(dāng)線程提交一個(gè)任務(wù)時(shí)候快毛,如果處理請(qǐng)看下圖
ThreadPoolExecutor執(zhí)行execute()分4種情況
a格嗅、若當(dāng)前運(yùn)行的線程少于corePoolSize,則創(chuàng)建新線程來(lái)執(zhí)行任務(wù)(執(zhí)行這一步需要獲取全局鎖)
b、若運(yùn)行的線程多于或等于corePoolSize,則將任務(wù)加入BlockingQueue
c唠帝、若無(wú)法將任務(wù)加入BlockingQueue,則創(chuàng)建新的線程來(lái)處理任務(wù)(執(zhí)行這一步需要獲取全局鎖)
d屯掖、若創(chuàng)建新線程將使當(dāng)前運(yùn)行的線程超出maximumPoolSize,任務(wù)將被拒絕,并調(diào)用RejectedExecutionHandler.rejectedExecution()
采取上述思路,是為了在執(zhí)行execute()時(shí),盡可能避免獲取全局鎖
在ThreadPoolExecutor完成預(yù)熱之后(當(dāng)前運(yùn)行的線程數(shù)大于等于corePoolSize),幾乎所有的execute()方法調(diào)用都是執(zhí)行步驟b,而步驟b不需要獲取全局鎖
源碼分析execute()
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
//表示 “線程池狀態(tài)” 和 “線程數(shù)” 的整數(shù)
int c = ctl.get();
// 如果當(dāng)前線程數(shù)少于核心線程數(shù),直接添加一個(gè) worker 執(zhí)行任務(wù)襟衰,
// 創(chuàng)建一個(gè)新的線程贴铜,并把當(dāng)前任務(wù) command 作為這個(gè)線程的第一個(gè)任務(wù)(firstTask)
if (workerCountOf(c) < corePoolSize) {
// 添加任務(wù)成功,即結(jié)束
// 執(zhí)行的結(jié)果右蒲,會(huì)包裝到 FutureTask
// 返回 false 代表線程池不允許提交任務(wù)
if (addWorker(command, true))
return;
c = ctl.get();
}
// 到這說(shuō)明阀湿,要么當(dāng)前線程數(shù)大于等于核心線程數(shù),要么剛剛 addWorker 失敗
// 如果線程池處于 RUNNING 瑰妄,把這個(gè)任務(wù)添加到任務(wù)隊(duì)列 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
/* 若任務(wù)進(jìn)入 workQueue陷嘴,我們是否需要開(kāi)啟新的線程
* 線程數(shù)在 [0, corePoolSize) 是無(wú)條件開(kāi)啟新線程的
* 若線程數(shù)已經(jīng)大于等于 corePoolSize,則將任務(wù)添加到隊(duì)列中间坐,然后進(jìn)到這里
*/
int recheck = ctl.get();
// 若線程池不處于 RUNNING 灾挨,則移除已經(jīng)入隊(duì)的這個(gè)任務(wù),并且執(zhí)行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 若線程池還是 RUNNING 竹宋,且線程數(shù)為 0劳澄,則開(kāi)啟新的線程
// 這塊代碼的真正意圖:擔(dān)心任務(wù)提交到隊(duì)列中了,但是線程都關(guān)閉了
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 若 workQueue 滿蜈七,到該分支
// 以 maximumPoolSize 為界創(chuàng)建新 worker秒拔,
// 若失敗,說(shuō)明當(dāng)前線程數(shù)已經(jīng)達(dá)到 maximumPoolSize飒硅,執(zhí)行拒絕策略
else if (!addWorker(command, false))
reject(command);
}
其他源碼暫不貼出來(lái)了砂缩,自己可以認(rèn)真閱讀下。
4三娩、線程池創(chuàng)建
我們可以通過(guò)ThreadPoolExecutor來(lái)創(chuàng)建一個(gè)線程池
創(chuàng)建一個(gè)線程池時(shí)需要的參數(shù)
corePoolSize(核心線程數(shù)量)
線程池中應(yīng)該保持的主要線程的數(shù)量.即使線程處于空閑狀態(tài)庵芭,除非設(shè)置了allowCoreThreadTimeOut這個(gè)參數(shù),當(dāng)提交一個(gè)任務(wù)到線程池時(shí),若線程數(shù)量<corePoolSize,線程池會(huì)創(chuàng)建一個(gè)新線程放入works(一個(gè)HashSet)中執(zhí)行任務(wù),即使其他空閑的基本線程能夠執(zhí)行新任務(wù)也還是會(huì)創(chuàng)建新線程,等到需要執(zhí)行的任務(wù)數(shù)大于線程池基本大小時(shí)就不再創(chuàng)建,會(huì)嘗試放入等待隊(duì)列workQueue(一個(gè)BlockingQueue),如果調(diào)用了線程池的prestartAllCoreThreads(),線程池會(huì)提前創(chuàng)建并啟動(dòng)所有核心線程
workQueue
存儲(chǔ)待執(zhí)行任務(wù)的阻塞隊(duì)列,這些任務(wù)必須是Runnable的對(duì)象(如果是Callable對(duì)象雀监,會(huì)在submit內(nèi)部轉(zhuǎn)換為Runnable對(duì)象)
runnableTaskQueue(任務(wù)隊(duì)列):用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列.可以選擇以下幾個(gè)阻塞隊(duì)列.
LinkedBlockingQueue:一個(gè)基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,此隊(duì)列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue.靜態(tài)工廠方法Executors.newFixedThreadPool()使用了這個(gè)隊(duì)列
SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列.每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于Linked-BlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool使用了這個(gè)隊(duì)列
maximumPoolSize(線程池最大線程數(shù))
線程池允許創(chuàng)建的最大線程數(shù)
若隊(duì)列滿,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù),則線程池會(huì)再創(chuàng)建新的線程放入works中執(zhí)行任務(wù),CashedThreadPool的關(guān)鍵,固定線程數(shù)的線程池?zé)o效
若使用了無(wú)界任務(wù)隊(duì)列,這個(gè)參數(shù)就沒(méi)什么效果
ThreadFactory:用于設(shè)置創(chuàng)建線程的工廠,可以通過(guò)線程工廠給每個(gè)創(chuàng)建出來(lái)的線程設(shè)置更有意義的名字.使用開(kāi)源框架guava提供ThreadFactoryBuilder可以快速給線程池里的線程設(shè)置有意義的名字,代碼如下
new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
RejectedExecutionHandler(飽和策略):當(dāng)隊(duì)列和線程池都滿,說(shuō)明線程池處于飽和,必須采取一種策略處理提交的新任務(wù).策略默認(rèn)AbortPolicy,表無(wú)法處理新任務(wù)時(shí)拋出異常.在JDK 1.5中Java線程池框架提供了以下4種策略
AbortPolicy:丟棄任務(wù)双吆,拋出 RejectedExecutionException
CallerRunsPolicy:只用調(diào)用者所在線程來(lái)運(yùn)行任務(wù),有反饋機(jī)制,使任務(wù)提交的速度變慢)。
DiscardOldestPolicy
若沒(méi)有發(fā)生shutdown,嘗試丟棄隊(duì)列里最近的一個(gè)任務(wù),并執(zhí)行當(dāng)前任務(wù), 丟棄任務(wù)緩存隊(duì)列中最老的任務(wù)好乐,并且嘗試重新提交新的任務(wù)
DiscardPolicy:不處理,丟棄掉, 拒絕執(zhí)行匾竿,不拋異常
當(dāng)然,也可以根據(jù)應(yīng)用場(chǎng)景需要來(lái)實(shí)現(xiàn)RejectedExecutionHandler接口自定義策略.如記錄日志或持久化存儲(chǔ)不能處理的任務(wù)
keepAliveTime(線程活動(dòng)保持時(shí)間)
線程沒(méi)有任務(wù)執(zhí)行時(shí)最多保持多久時(shí)間終止
線程池的工作線程空閑后,保持存活的時(shí)間曹宴。
所以搂橙,如果任務(wù)很多,并且每個(gè)任務(wù)執(zhí)行的時(shí)間比較短笛坦,可以調(diào)大時(shí)間区转,提高線程的利用率
TimeUnit(線程活動(dòng)保持時(shí)間的單位):指示第三個(gè)參數(shù)的時(shí)間單位;可選的單位有天(DAYS)版扩、小時(shí)(HOURS)废离、分鐘(MINUTES)、毫秒(MILLISECONDS)礁芦、微秒(MICROSECONDS蜻韭,千分之一毫秒)和納秒(NANOSECONDS,千分之一微秒)
可以使用Executors創(chuàng)建線程池
使用線程池例子
/**
* @author shuliangzhao
* @Title: ThreadTaskId
* @ProjectName design-parent
* @Description: TODO
* @date 2019/6/1 23:03
*/
public class ThreadTaskId implements Runnable {
private final int id;
public ThreadTaskId(int id) {
this.id = id;
}
@Override
public void run() {
for (int i = 0;i < 5;i++) {
System.out.println("TaskInPool-["+id+"] is running phase-"+i);
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("TaskInPool-["+id+"] is over");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
客戶端
/**
* @author shuliangzhao
* @Title: ThreadPoolExample
* @ProjectName design-parent
* @Description: TODO
* @date 2019/6/1 23:03
*/
public class ThreadPoolExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
executorService.execute(new ThreadTaskId(i));
}
executorService.shutdown();
}
}
執(zhí)行結(jié)果
以上就是線程池的簡(jiǎn)單介紹柿扣,這個(gè)不是完善版本肖方,會(huì)繼續(xù)補(bǔ)充的。_