并發(fā)編程之線程池及隊(duì)列
問(wèn)題:
線程池作用,主要實(shí)現(xiàn)類,并說(shuō)出實(shí)現(xiàn)類場(chǎng)景以及區(qū)別乘综。
ThreadPoolExecutor使用場(chǎng)景。以及原理套硼。
Executor拒絕策略說(shuō)的是什么卡辰?
無(wú)界阻塞延遲隊(duì)列delayqueue原理是什么?
CyclicBarrier和CountDownLatch的區(qū)別邪意?
線程池作用九妈,主要實(shí)現(xiàn)類,并說(shuō)出實(shí)現(xiàn)類場(chǎng)景以及區(qū)別
作用
減少在創(chuàng)建和銷毀線程上所花的時(shí)間以及系統(tǒng)資源的開(kāi)銷雾鬼。
如果不使用線程池允蚣,可能造成系統(tǒng)創(chuàng)建大量線程。
線程池種類
newCachedThreadPool
用newCachedThreadPool()方法創(chuàng)建該線程池對(duì)象呆贿,創(chuàng)建之初里面一個(gè)線程都沒(méi)有,當(dāng)execute方法或submit方法向線程池提交任務(wù)時(shí)森渐,會(huì)自動(dòng)新建線程做入;如果線程池中有空余線程,則不會(huì)新建同衣;這種線程池一般最多情況可以容納幾萬(wàn)個(gè)線程竟块,里面的線程空余60s會(huì)被回收。
適用場(chǎng)景:執(zhí)行很多短期異步的小程序耐齐。
newFixedThreadPool
固定線程數(shù)的池子浪秘,每個(gè)線程的存活時(shí)間是無(wú)限的蒋情,當(dāng)池子滿了就不再添加線程;若池中線程均在繁忙狀態(tài)耸携,新任務(wù)會(huì)進(jìn)入阻塞隊(duì)列中(無(wú)界的阻塞隊(duì)列)棵癣。
適用場(chǎng)景:執(zhí)行長(zhǎng)期的任務(wù),性能較好夺衍。
newSingleThreadExecutor
只有一個(gè)線程的線程池狈谊,且線程的存活時(shí)間是無(wú)限的;當(dāng)線程繁忙時(shí)沟沙,對(duì)于新任務(wù)會(huì)進(jìn)入阻塞隊(duì)列中(無(wú)界的阻塞隊(duì)列)河劝。
適用:一個(gè)任務(wù)一個(gè)任務(wù)執(zhí)行的場(chǎng)景。
NewScheduledThreadPool
創(chuàng)建一個(gè)固定大小的線程池矛紫,池內(nèi)的線程存活時(shí)間無(wú)限赎瞎,線程池支持定時(shí)及周期性的任務(wù)執(zhí)行。如果所有線程均處于繁忙狀態(tài)颊咬,對(duì)于新任務(wù)會(huì)進(jìn)入DelayedWorkQueue
隊(duì)列务甥。
適用場(chǎng)景:周期性執(zhí)行任務(wù)的場(chǎng)景。
線程池任務(wù)執(zhí)行流程:
當(dāng)線程池小于
corePoolSize
時(shí)贪染,新任務(wù)將創(chuàng)建一個(gè)新的線程缓呛,即使此時(shí)線程池種存在空閑線程。當(dāng)線程池達(dá)到
corePoolSize
時(shí)杭隙,新提交的任務(wù)將被放入workQueue
中哟绊,等待線程池任務(wù)調(diào)度執(zhí)行。當(dāng)
workQueue
已滿痰憎,且maximumPoolSize
>corePoolSize
時(shí)票髓,新任務(wù)會(huì)創(chuàng)建新線程執(zhí)行任務(wù)。當(dāng)提交任務(wù)數(shù)超過(guò)
maximumPoolSize
時(shí)铣耘,新提交任務(wù)由RejectedExecutionHandler
處理洽沟。當(dāng)線程池中超過(guò)
corePoolSize
時(shí),空閑時(shí)間達(dá)到keepAliveTime
時(shí)蜗细,關(guān)閉空閑線程裆操。當(dāng)設(shè)置了
allowCoreThreadTimeOut(true)
時(shí),線程池中corePoolSize
線程空閑時(shí)間達(dá)到keepAliveTime
也將關(guān)閉炉媒。
ThreadPoolExecutor使用場(chǎng)景踪区。以及原理
ThreadPoolExecutor類實(shí)現(xiàn)了ExecutorService接口和Executor接口。
ThreadPoolExecutor
參數(shù):
參數(shù) | 含義 |
---|---|
corePoolSize | 核心線程池大小 |
maximumPoolSize | 最大線程池大小 |
keepAliveTime | 線程池中超過(guò)corePoolSize數(shù)目的空閑線程最大存活時(shí)間吊骤;可以allowCoreThreadTimeOut(true)使得核心線程有效時(shí)間 |
TimeUnit | keepAliveTime時(shí)間單位 |
workQueue | 阻塞任務(wù)隊(duì)列 |
threadFactory | 新建線程工廠 |
RejectedExecutionHandler | 當(dāng)提交任務(wù)數(shù)超過(guò)maxmumPoolSize+workQueue之和時(shí)缎岗,任務(wù)會(huì)交給RejectedExecutionHandler來(lái)處理 |
當(dāng)線程池小于corePoolSize時(shí),新提交任務(wù)將創(chuàng)建一個(gè)新線程執(zhí)行任務(wù)白粉,即使此時(shí)線程池中存在空閑線程传泊。
當(dāng)線程池達(dá)到corePoolSize時(shí)鼠渺,新提交任務(wù)將被放入workQueue中,等待線程池中任務(wù)調(diào)度執(zhí)行
當(dāng)workQueue已滿眷细,且maximumPoolSize>corePoolSize時(shí)拦盹,新提交任務(wù)會(huì)創(chuàng)建新線程執(zhí)行任務(wù)
當(dāng)提交任務(wù)數(shù)超過(guò)maximumPoolSize時(shí),新提交任務(wù)由RejectedExecutionHandler處理
當(dāng)線程池中超過(guò)corePoolSize線程薪鹦,空閑時(shí)間達(dá)到keepAliveTime時(shí)掌敬,關(guān)閉空閑線程
當(dāng)設(shè)置allowCoreThreadTimeOut(true)時(shí),線程池中corePoolSize線程空閑時(shí)間達(dá)到keepAliveTime也將關(guān)閉
Executor拒絕策略說(shuō)的是什么池磁?
線程池中的數(shù)量大于corePoolSize奔害,緩沖隊(duì)列workQueue滿,并且線程池中的數(shù)量等于maximumPoolSize地熄,那么通過(guò) handler所指定的策略來(lái)處理此任務(wù)华临。
ThreadPoolExecutor.AbortPolicy
拋出java.util.concurrent.RejectedExecutionException異常。
ThreadPoolExecutor.CallerRunsPolicy
用于被拒絕任務(wù)的處理程序端考,它直接在 execute 方法的調(diào)用線程中運(yùn)行被拒絕的任務(wù)雅潭;如果執(zhí)行程序已關(guān)閉,則會(huì)丟棄該任務(wù)却特。
ThreadPoolExecutor.DiscardOldestPolicy
丟棄任務(wù)隊(duì)列中最舊任務(wù)扶供。
ThreadPoolExecutor.DiscardPolicy
丟棄當(dāng)前將要加入隊(duì)列的任務(wù)。
無(wú)界阻塞延遲隊(duì)列delayqueue原理是什么裂明?
DelayQueue
是一個(gè)支持延時(shí)獲取元素的無(wú)界阻塞隊(duì)列椿浓。隊(duì)列使用PriorityQueue
來(lái)實(shí)現(xiàn)。隊(duì)列中的元素必須實(shí)現(xiàn)Delayed接口闽晦,在創(chuàng)建元素時(shí)可以指定多久才能從隊(duì)列中獲取當(dāng)前元素扳碍。只有在延遲期滿時(shí)才能從隊(duì)列中提取元素。
適用場(chǎng)景
緩存系統(tǒng)的設(shè)計(jì):使用DelayQueue保存緩存元素的有效期仙蛉,使用一個(gè)線程循環(huán)查詢DelayQueue笋敞,一旦能從DelayQueue中獲取元素時(shí),就表示有緩存到期了荠瘪。
定時(shí)任務(wù)調(diào)度:使用DelayQueue保存當(dāng)天要執(zhí)行的任務(wù)和執(zhí)行時(shí)間夯巷,一旦從DelayQueue中獲取到任務(wù)就開(kāi)始執(zhí)行,比如Timer就是使用DelayQueue實(shí)現(xiàn)的哀墓。
實(shí)現(xiàn)思路
以支持優(yōu)先級(jí)的PriorityQueue無(wú)界隊(duì)列作為一個(gè)容器鞭莽,因?yàn)樵囟急仨殞?shí)現(xiàn)Delayed接口,可以根據(jù)元素的過(guò)期時(shí)間來(lái)對(duì)元素進(jìn)行排列麸祷,因此,先過(guò)期的元素會(huì)在隊(duì)首褒搔,每次從隊(duì)列里取出來(lái)都是最先要過(guò)期的元素阶牍。如果延遲隊(duì)列中的消息到了延遲時(shí)間則可以從中取出消息否則無(wú)法取出消息也就無(wú)法消費(fèi)喷面。
CyclicBarrier和CountDownLatch的區(qū)別
CountDownLatch | CyclicBarrier |
---|---|
減計(jì)數(shù)方式 | 加計(jì)數(shù)方式 |
計(jì)算為0時(shí)釋放所有等待的線程 | 計(jì)數(shù)達(dá)到指定值時(shí)釋放所有等待線程 |
計(jì)數(shù)為0時(shí),無(wú)法重置 | 計(jì)數(shù)達(dá)到指定值時(shí)走孽,計(jì)數(shù)置為0重新開(kāi)始 |
調(diào)用countDown()方法計(jì)數(shù)減一惧辈,調(diào)用await()方法只進(jìn)行阻塞,對(duì)計(jì)數(shù)沒(méi)任何影響 | 調(diào)用await()方法計(jì)數(shù)加1磕瓷,若加1后的值不等于構(gòu)造方法的值盒齿,則線程阻塞 |
不可重復(fù)利用 | 可重復(fù)利用 |
CyclicBarrier
class Runner implements Runnable {
?
private CyclicBarrier barrier;
?
private String name;
?
public Runner(CyclicBarrier barrier, String name) {
super();
this.barrier = barrier;
this.name = name;
}
?
@Override
public void run() {
try {
Thread.sleep(1000 * (new Random()).nextInt(8));
System.out.println(name + " 準(zhǔn)備OK.");
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(name + " Go!!");
}
}
?
public class Race {
?
public static void main(String[] args) throws IOException, InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(3);
?
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.submit(new Thread(new Runner(barrier, "zhangsan")));
executor.submit(new Thread(new Runner(barrier, "lisi")));
executor.submit(new Thread(new Runner(barrier, "wangwu")));
?
executor.shutdown();
}
?
}
CyclicBarrier就是一個(gè)柵欄,等待所有線程到達(dá)后再執(zhí)行相關(guān)的操作困食。barrier 在釋放等待線程后可以重用边翁。
CountDownLatch
public class TestCountDownLatch {
private static final int N = 10;
?
public static void main(String[] args) throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(N);
CountDownLatch startSignal = new CountDownLatch(1);//開(kāi)始執(zhí)行信號(hào)
?
for (int i = 1; i <= N; i++) {
new Thread(new Worker(i, doneSignal, startSignal)).start();//線程啟動(dòng)了
}
System.out.println("begin------------");
startSignal.countDown();//開(kāi)始執(zhí)行啦
doneSignal.await();//等待所有的線程執(zhí)行完畢
System.out.println("Ok");
?
}
?
static class Worker implements Runnable {
private final CountDownLatch doneSignal;
private final CountDownLatch startSignal;
private int beginIndex;
?
Worker(int beginIndex, CountDownLatch doneSignal,
CountDownLatch startSignal) {
this.startSignal = startSignal;
this.beginIndex = beginIndex;
this.doneSignal = doneSignal;
}
?
public void run() {
try {
startSignal.await(); //等待開(kāi)始執(zhí)行信號(hào)的發(fā)布
beginIndex = (beginIndex - 1) * 10 + 1;
for (int i = beginIndex; i <= beginIndex + 10; i++) {
System.out.println(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
doneSignal.countDown();
}
}
}
}
CountDownLatch 是計(jì)數(shù)器, 線程完成一個(gè)就記一個(gè), 就像 報(bào)數(shù)一樣, 只不過(guò)是遞減的。
而CyclicBarrier更像一個(gè)水閘, 線程執(zhí)行就像水流, 在水閘處都會(huì)堵住, 等到水滿(線程到齊)了, 才開(kāi)始泄流硕盹。