概述
- JAVA 中線程創(chuàng)建和銷毀需要時(shí)間完域,如果創(chuàng)建和銷毀的時(shí)間比執(zhí)行時(shí)間還多就不劃算
- 過(guò)多的線程占用系統(tǒng)內(nèi)存
- 頻繁切換上下文影響系統(tǒng)性能
線程池原理
線程池管理器
用于創(chuàng)建并管理線程池,包含創(chuàng)建和銷毀線程池,添加新的任務(wù)。工作線程
線程池中的線程,循環(huán)執(zhí)行任務(wù)遥椿,沒(méi)有任務(wù)時(shí)處理等待狀態(tài)任務(wù)接口
每個(gè)任務(wù)必須實(shí)現(xiàn)的接口,以供工作線程調(diào)度任務(wù)的執(zhí)行继薛,主要規(guī)定的任務(wù)的入口修壕,任務(wù)執(zhí)行完后的收尾工作愈捅,任務(wù)的執(zhí)行狀態(tài)等任務(wù)隊(duì)列
用于存放沒(méi)有處理的任務(wù)遏考,提供一種緩沖機(jī)制。-
執(zhí)行過(guò)程
- 是否達(dá)到核心線程數(shù)量蓝谨?沒(méi)達(dá)到灌具,創(chuàng)建一個(gè)工作線程來(lái)執(zhí)行任務(wù)
- 任務(wù)隊(duì)列是否已滿?沒(méi)滿譬巫,則將新提交的任務(wù)存儲(chǔ)在任務(wù)隊(duì)列中
- 是否達(dá)到線程池最大數(shù)量咖楣?沒(méi)達(dá)到,創(chuàng)建新的工作線程來(lái)執(zhí)行任務(wù)
- 最后執(zhí)行拒絕策略來(lái)處理這個(gè)任務(wù)
image.png
線程池接口 API
類型 | 名稱 | 描述 |
---|---|---|
接口 | Executor | 最上層的接口芦昔,定義了執(zhí)行任務(wù)的發(fā)放 execute |
接口 | ExcutorService | 繼承了 Executor 接口诱贿,拓展了Callable,Future,關(guān)閉方法 |
接口 | ScheduledExcutorService | 繼承了 ExcutorService 接口,增加了定時(shí)任務(wù)相關(guān)方法 |
實(shí)現(xiàn)類 | ThreadPoolExecutor | 基礎(chǔ)咕缎,標(biāo)準(zhǔn)的線程實(shí)現(xiàn)類 |
實(shí)現(xiàn)類 | ScheduledThreadPoolExecutor | 繼承了 ThreadPoolExecutor 珠十,實(shí)現(xiàn)了ScheduledExcutorService 中定時(shí)任務(wù)相關(guān)方法 |
代碼實(shí)現(xiàn)
-
線程池信息: 核心線程數(shù)量5,最大數(shù)量10凭豪,無(wú)界隊(duì)列
結(jié)果: 線程池線程數(shù)量為:5,超出數(shù)量的任務(wù)焙蹭,存入隊(duì)列中等待被執(zhí)行
public class Demo8 {
private void testCommon(ThreadPoolExecutor threadPoolExecutor) throws Exception{
for (int i=0; i<15; i++) {
threadPoolExecutor.submit(new MyTask(i));
System.out.println("任務(wù)提交成功 :" + i);
}
// 查看線程數(shù)量,查看隊(duì)列等待數(shù)量
Thread.sleep(500L);
System.out.println("當(dāng)前線程池線程數(shù)量為:" + threadPoolExecutor.getPoolSize());
System.out.println("當(dāng)前線程池等待的數(shù)量為:" + threadPoolExecutor.getQueue().size());
// 等待15秒嫂伞,查看線程數(shù)量和隊(duì)列數(shù)量(理論上孔厉,會(huì)被超出核心線程數(shù)量的線程自動(dòng)銷毀)
Thread.sleep(15000L);
System.out.println("當(dāng)前線程池線程數(shù)量為:" + threadPoolExecutor.getPoolSize());
System.out.println("當(dāng)前線程池等待的數(shù)量為:" + threadPoolExecutor.getQueue().size());
}
private void threadPoolExecutorTest1() throws Exception{
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5,10,5,
TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
testCommon(threadPoolExecutor);
}
public static void main(String[] args) throws Exception{
new Demo8().threadPoolExecutorTest1();
}
}
-
線程池信息: 核心線程數(shù)量5,最大數(shù)量10帖努,隊(duì)列大小3撰豺,超出核心線程數(shù)量的線程存活時(shí)間:5秒, 指定拒絕策略的
結(jié)果: 5個(gè)線程直接分配任務(wù)開始執(zhí)行拼余,3個(gè)任務(wù)存入隊(duì)列污桦,隊(duì)列夠用,臨時(shí)加開5個(gè)線程(5秒沒(méi)活干就銷毀)姿搜,仍有2個(gè)任務(wù)沒(méi)有資源去執(zhí)行寡润,拒絕執(zhí)行捆憎。
private void threadPoolExecutorTest2() throws Exception{
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5,
10,
5,
TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(3),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.err.println("任務(wù)被拒絕");
}
});
testCommon(threadPoolExecutor);
}
-
核心線程數(shù)量5,最大數(shù)量10梭纹,無(wú)界隊(duì)列
結(jié)果:線程池線程數(shù)量為:5,超出數(shù)量的任務(wù)众辨,存入隊(duì)列中等待被執(zhí)行
private void threadPoolExecutorTest3() throws Exception{
// 和Executors.newFixedThreadPool(int nThreads)一樣的
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5,
5,
0L,
TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>());
testCommon(threadPoolExecutor);
}
-
緩存線程池负蚊,適合任務(wù)大小無(wú)法預(yù)估的情況,線程池有就復(fù)用,沒(méi)有就加開浩考。
結(jié)果:創(chuàng)建任務(wù)數(shù)相同的線程,執(zhí)行任務(wù)葛峻,60
秒后如果沒(méi)有任務(wù)两波,所有線程銷毀
private void threadPoolExecutorTest4() throws Exception{
// 相當(dāng)于 Executors.newCachedThreadPool();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<>());
testCommon(threadPoolExecutor);
Thread.sleep(60000L);
System.out.println("60秒后,再看線程池中的數(shù)量:" + threadPoolExecutor.getPoolSize());
}
-
延時(shí)任務(wù)击儡,核心線程5個(gè)塔沃,最大數(shù)量Integer.MAX_VALUE,DelayedWorkQueue延時(shí)隊(duì)列阳谍,超出核心線程數(shù)量的線程存活時(shí)間:0秒
結(jié)果:3秒執(zhí)行一次
private void threadPoolExecutorTest5() throws Exception{
// 和Executors.newScheduledThreadPool()一樣的
ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(5);
threadPoolExecutor.schedule(new Runnable() {
@Override
public void run() {
System.out.println("任務(wù)被執(zhí)行,當(dāng)前時(shí)間為:" + System.currentTimeMillis());
}
},3000,TimeUnit.MILLISECONDS);
System.out.println("定時(shí)任務(wù)執(zhí)行成功蛀柴,時(shí)間為:" + System.currentTimeMillis() + "當(dāng)前線程池中線程數(shù)量:" + threadPoolExecutor.getPoolSize());
}
- 周期執(zhí)行任務(wù),
方式1: 任務(wù)執(zhí)行時(shí)間超出周期時(shí)間矫夯,立即執(zhí)行
方式2: 任務(wù)執(zhí)行時(shí)間超出周期時(shí)間鸽疾,等待周期時(shí)間后執(zhí)行
private void threadPoolExecutorTest6() throws Exception{
ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(5);
// 效果1: 提交后,2秒后開始第一次執(zhí)行训貌,之后每間隔1秒制肮,固定執(zhí)行一次(如果發(fā)現(xiàn)上次執(zhí)行還未完畢,則等待完畢递沪,完畢后立刻執(zhí)行)豺鼻。
// threadPoolExecutor.scheduleAtFixedRate()
//效果2:提交后,2秒后開始第一次執(zhí)行区拳,之后每間隔1秒拘领,固定執(zhí)行一次(如果發(fā)現(xiàn)上次執(zhí)行還未完畢,則等待完畢樱调,等上一次執(zhí)行完畢后再開始計(jì)時(shí)约素,等待1秒)。
threadPoolExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任務(wù)被執(zhí)行,當(dāng)前時(shí)間為:" + System.currentTimeMillis());
}
},2000,1000,TimeUnit.MILLISECONDS);
}
- 線程終止
shutdown方式 不接收新的任務(wù)提交笆凌,已提交任務(wù)會(huì)執(zhí)行完
shutdownNow方式 所有任務(wù)終止
/**
* 7圣猎、 終止線程:線程池信息: 核心線程數(shù)量5,最大數(shù)量10乞而,隊(duì)列大小3送悔,超出核心線程數(shù)量的線程存活時(shí)間:5秒, 指定拒絕策略的
*
* @throws Exception
*/
private void threadPoolExecutorTest7() throws Exception {
// 創(chuàng)建一個(gè) 核心線程數(shù)量為5,最大數(shù)量為10,等待隊(duì)列最大是3 的線程池欠啤,也就是最大容納13個(gè)任務(wù)荚藻。
// 默認(rèn)的策略是拋出RejectedExecutionException異常,java.util.concurrent.ThreadPoolExecutor.AbortPolicy
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(3), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.err.println("有任務(wù)被拒絕執(zhí)行了");
}
});
// 測(cè)試: 提交15個(gè)執(zhí)行時(shí)間需要3秒的任務(wù)洁段,看超過(guò)大小的2個(gè)应狱,對(duì)應(yīng)的處理情況
for (int i = 0; i < 15; i++) {
int n = i;
threadPoolExecutor.submit(new Runnable() {
@Override
public void run() {
try {
System.out.println("開始執(zhí)行:" + n);
Thread.sleep(3000L);
System.err.println("執(zhí)行結(jié)束:" + n);
} catch (InterruptedException e) {
System.out.println("異常:" + e.getMessage());
}
}
});
System.out.println("任務(wù)提交成功 :" + i);
}
// 1秒后終止線程池
Thread.sleep(1000L);
threadPoolExecutor.shutdown();
// 再次提交提示失敗
threadPoolExecutor.submit(new Runnable() {
@Override
public void run() {
System.out.println("追加一個(gè)任務(wù)");
}
});
// 結(jié)果分析
// 1、 10個(gè)任務(wù)被執(zhí)行祠丝,3個(gè)任務(wù)進(jìn)入隊(duì)列等待疾呻,2個(gè)任務(wù)被拒絕執(zhí)行
// 2、調(diào)用shutdown后写半,不接收新的任務(wù)岸蜗,等待13任務(wù)執(zhí)行結(jié)束
// 3、 追加的任務(wù)在線程池關(guān)閉后叠蝇,無(wú)法再提交璃岳,會(huì)被拒絕執(zhí)行
}
/**
* 8、 立刻終止線程:線程池信息: 核心線程數(shù)量5蟆肆,最大數(shù)量10矾睦,隊(duì)列大小3,超出核心線程數(shù)量的線程存活時(shí)間:5秒炎功, 指定拒絕策略的
*
* @throws Exception
*/
private void threadPoolExecutorTest8() throws Exception {
// 創(chuàng)建一個(gè) 核心線程數(shù)量為5,最大數(shù)量為10,等待隊(duì)列最大是3 的線程池缓溅,也就是最大容納13個(gè)任務(wù)蛇损。
// 默認(rèn)的策略是拋出RejectedExecutionException異常,java.util.concurrent.ThreadPoolExecutor.AbortPolicy
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(3), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.err.println("有任務(wù)被拒絕執(zhí)行了");
}
});
// 測(cè)試: 提交15個(gè)執(zhí)行時(shí)間需要3秒的任務(wù)坛怪,看超過(guò)大小的2個(gè)淤齐,對(duì)應(yīng)的處理情況
for (int i = 0; i < 15; i++) {
int n = i;
threadPoolExecutor.submit(new Runnable() {
@Override
public void run() {
try {
System.out.println("開始執(zhí)行:" + n);
Thread.sleep(3000L);
System.err.println("執(zhí)行結(jié)束:" + n);
} catch (InterruptedException e) {
System.out.println("異常:" + e.getMessage());
}
}
});
System.out.println("任務(wù)提交成功 :" + i);
}
// 1秒后終止線程池
Thread.sleep(1000L);
List<Runnable> shutdownNow = threadPoolExecutor.shutdownNow();
// 再次提交提示失敗
threadPoolExecutor.submit(new Runnable() {
@Override
public void run() {
System.out.println("追加一個(gè)任務(wù)");
}
});
System.out.println("未結(jié)束的任務(wù)有:" + shutdownNow.size());
// 結(jié)果分析
// 1、 10個(gè)任務(wù)被執(zhí)行袜匿,3個(gè)任務(wù)進(jìn)入隊(duì)列等待更啄,2個(gè)任務(wù)被拒絕執(zhí)行
// 2、調(diào)用shutdownnow后居灯,隊(duì)列中的3個(gè)線程不再執(zhí)行祭务,10個(gè)線程被終止
// 3、 追加的任務(wù)在線程池關(guān)閉后怪嫌,無(wú)法再提交义锥,會(huì)被拒絕執(zhí)行
}