為什么要使用線程池
- 反復創(chuàng)建線程開銷大
- 過多的線程會占用太多內(nèi)存
線程池的好處
- 加快響應(yīng)速度
- 合理利用CPU和內(nèi)存
- 統(tǒng)一管理
線程池適合應(yīng)用的場合
- 服務(wù)器接受到大量請求時蒸矛,使用線程池技術(shù)是非常合適的,它可以大大減少線程的創(chuàng)建和銷毀次數(shù)锻拘,提高服務(wù)器的工作效率
- 在開發(fā)中绪杏,如果需要創(chuàng)建5哥以上的線程下愈,那么就可以使用線程池來管理
線程池構(gòu)造函數(shù)的參數(shù)
參數(shù)名 | 類型 | 含義 |
---|---|---|
corePoolSize | int | 核心線程數(shù) |
maximumPoolSize | int | 最大線程數(shù) |
keepAliveTime | long | 保持存活時間 |
workQueue | BlockingQueue | 任務(wù)存儲隊列 |
threadFactory | ThreadFactory | 當線程池需要新的線程的時候,會使用threadFactory來生成新的線程 |
Handler | RejectedExecutionHandler | 由于線程池無法接受你所提交的任務(wù)的拒絕策略 |
corePoolSize
指的是核心線程數(shù)寞忿,線程池在完成初始化后驰唬,默認情況下顶岸,線程池中并沒有任何線程腔彰,線程池會等待有任務(wù)到來時叫编,再創(chuàng)建新線程去執(zhí)行任務(wù)
maximumPoolSize
線程池有可能會在核心線程數(shù)的基礎(chǔ)上,額外增加一些線程霹抛,但是這些新增加的線程數(shù)有一個上限搓逾,這就是maximumPoolSize
keepAliveTime
如果線程池當前的線程數(shù)多于corePoolSize,那么如果多余的線程空閑時間超過keepAliveTime杯拐,它們就會被終止
ThreadFactory
新的線程都是由ThreadFactory創(chuàng)建的霞篡,默認使用Executors.defaultThreadFactory,創(chuàng)建出來的線程都在同一個線程組端逼,擁有同樣的NORM_PRIORITY優(yōu)先級并且都不是守護線程朗兵。如果自己指定ThreadFactory,那么就可以改變線程名顶滩、線程組余掖、優(yōu)先級、是否是守護線程等礁鲁。通常使用默認的ThreadFactory就可以了盐欺。
workQueue
有3種最常見的隊列類型
1.直接交接:SynchronousQueue
2.無界隊列:LinkedBlockingQueue
3.有界隊列:ArrayBlockingQueue
線程池添加線程規(guī)則
- 如果線程數(shù)小于corePoolSize,即使其他工作線程處于空閑狀態(tài)仅醇,也會創(chuàng)建一個新線程來運行新任務(wù)
- 如果線程數(shù)等于(或大于)corePoolSize但少于maximumPoolSize冗美,則將任務(wù)放入隊列
- 如果隊列已滿,并且線程數(shù)小于maximumPoolSize析二,則創(chuàng)建一個新線程來運行任務(wù)
- 如果隊列已滿粉洼,并且線程數(shù)大于或等于maximumPoolSize,則拒絕該任務(wù)
增減線程的特點
- 通過設(shè)置corePoolSize和maximumPoolSize相同叶摄,就可以創(chuàng)建固定大小的線程池漆改;
- 線程池希望保持較少的線程數(shù),并且只有在負載變得很大時才增加它准谚;
- 通過設(shè)置maximumPoolSize為很高的值(例如Integer.MAX_VALUE)挫剑,可以允許線程池容納任意數(shù)量的并發(fā)任務(wù);
- 是只有在隊列填滿時才創(chuàng)建多于corePoolSize的線程柱衔,所以如果你使用的是無界隊列(例如LinkedBlockingQueue),那么線程數(shù)就不會超過corePoolSize樊破。
線程池應(yīng)該手動創(chuàng)建還是自動創(chuàng)建
newFixedThreadPool:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- 通過源碼可以看出,newFixedThreadPool 使用的是 LinkedBlockingQueue唆铐,由于 LinkedBlockingQueue 是沒有容量上限的哲戚,所以當請求數(shù)越來越多,并且無法及時處理完畢的時候艾岂,也就是請求堆積的時候顺少,會容易造成占用大量的內(nèi)存,可能會導致OOM
newSingleThreadExecutor:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- 通過源碼可以看出,這里和上面的 newFixedThreadPool 的原理基本一樣脆炎,只不過是把線程數(shù)直接設(shè)置成了1梅猿,所以這也會導致同樣的問題,也就是當請求堆積的時候秒裕,可能會占用大量的內(nèi)存袱蚓。
newCachedThreadPool:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- 可緩存線程池
- 特點:無界限線程池,具有自動回收多余線程的功能(默認時間是60秒)
- 弊端:在于第二個參數(shù) maximumPoolSize 被設(shè)置為了Integer.MAX_VALUE几蜻,這可能會創(chuàng)建數(shù)量非常多的線程喇潘,甚至導致OOM。
newScheduledThreadPool:
- 支持定時及周期性任務(wù)執(zhí)行的線程池
正確的創(chuàng)建線程池的方法
根據(jù)不同的業(yè)務(wù)場景梭稚,選擇合適的方式颖低,最后是我們自己手動創(chuàng)建線程池,自己設(shè)置線程池參數(shù)弧烤。
線程池里的線程數(shù)量設(shè)定為多少比較合適
- CPU密集型(加密枫甲、計算hash等):最佳線程數(shù)為CPU核心數(shù)的 1-2 倍左右
- 耗時IO型(讀寫數(shù)據(jù)庫、文件扼褪、網(wǎng)絡(luò)讀寫等):最佳線程數(shù)一般會大于CPU核心數(shù)很多倍想幻,以JVM線程監(jiān)控顯示繁忙情況為依據(jù),保證線程空閑可以銜接上话浇,參考 Brain Goetz 推薦的計算方法:
線程數(shù)=CPU核心數(shù)x(1+平均等待時間/平均工作時間)
停止線程池的正確方法
- shutdown
shutdown 并不是立即粗暴的結(jié)束線程脏毯,線程池仍然會繼續(xù)執(zhí)行已創(chuàng)建的任務(wù),但是不會接收新的任務(wù)了幔崖。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ShutDownDemo {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
executorService.shutdown();
// 1500毫秒后此處會拋出異常 RejectedExecutionException
executorService.execute(new ShutDownTask());
}
}
class ShutDownTask implements Runnable {
@Override
public void run() {
try {
Thread.sleep(500);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
上述代碼在1.5秒后執(zhí)行 executorService.shutdown(); 之后食店,再執(zhí)行
executorService.execute(new ShutDownTask()); 則會報錯拋出異常 RejectedExecutionException
- isShutdown
返回true或false告訴我們線程是否已經(jīng)停止
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
System.out.println(executorService.isShutdown()); // 打印false
executorService.shutdown();
System.out.println(executorService.isShutdown()); // 打印true
System.out.println(executorService.isTerminated());
}
- isTerminated
返回true或false告訴我們線程是否已經(jīng)完全停止
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
executorService.shutdown();
// 打印false,因為線程還沒有完全執(zhí)行完
System.out.println(executorService.isTerminated());
}
- awaitTermination
awaitTermination 有三種情況會返回赏寇,沒返回之前都是阻塞的
第一種情況:所有任務(wù)都執(zhí)行完畢了
第二種情況:等待的時間到了
第三種情況:等待的過程中被打斷了吉嫩,會拋出 InterruptedException
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
boolean b = executorService.awaitTermination(3L, TimeUnit.SECONDS);
// 會打印false,因為3秒鐘不夠線程全部執(zhí)行完
System.out.println(b);
}
- shutdownNow
正在執(zhí)行任務(wù)的線程繼續(xù)執(zhí)行嗅定,等待隊列中的線程直接結(jié)束并返回
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ShutDownDemo {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
executorService.execute(new ShutDownTask());
}
// 等待1.5秒
Thread.sleep(1500);
// 這里會返回已經(jīng)放到線程池隊列中還沒有執(zhí)行的Runnable
List<Runnable> runnables = executorService.shutdownNow();
}
}
class ShutDownTask implements Runnable {
@Override
public void run() {
try {
Thread.sleep(500);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
// 接收被中斷信號
System.out.println(Thread.currentThread().getName() + "被中斷了自娩!");
}
}
}
executorService.shutdownNow();會返回已經(jīng)放到線程池隊列中還沒有執(zhí)行的Runnable
運行結(jié)果:
...
...
...
pool-1-thread-8
pool-1-thread-7
pool-1-thread-6
pool-1-thread-10
pool-1-thread-1
pool-1-thread-3
pool-1-thread-1被中斷了!
pool-1-thread-4
pool-1-thread-2
pool-1-thread-7被中斷了渠退!
pool-1-thread-8被中斷了忙迁!
pool-1-thread-10被中斷了!
pool-1-thread-6被中斷了碎乃!
pool-1-thread-9被中斷了姊扔!
pool-1-thread-5被中斷了!
Process finished with exit code 0
線程池任務(wù)太多梅誓,怎么拒絕
-
拒絕時機
- 當Executor關(guān)閉時恰梢,提交新任務(wù)會被拒絕
- 以及當Executor對最大線程和工作隊列容量使用有限邊界并且已經(jīng)飽和時
-
拒絕策略
- AbortPolicy佛南,直接拋出一個異常
- DiscardPolicy,悄悄的把任務(wù)丟棄嵌言,沒有通知
- DiscardOldestPolicy嗅回,把隊列中最老的那個任務(wù)丟棄,新任務(wù)加進來
- CallerRunsPolicy呀页,誰提交的任務(wù)誰去執(zhí)行(比如說主線程給線程池提交了一個任務(wù)妈拌,但是線程池已經(jīng)飽和無法再執(zhí)行了拥坛,這時則會讓提交任務(wù)的主線程去執(zhí)行這個任務(wù))
線程池鉤子函數(shù)
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* 演示每個任務(wù)執(zhí)行前后放鉤子函數(shù)
*/
public class PauseableThreadPool extends ThreadPoolExecutor {
private boolean isPaused;
private final ReentrantLock lock = new ReentrantLock();
private Condition unpaused = lock.newCondition();
public static void main(String[] args) throws InterruptedException {
PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>());
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("我被執(zhí)行了");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 10000; i++) {
pauseableThreadPool.execute(runnable);
}
Thread.sleep(2000);
// 等待2秒蓬蝶,執(zhí)行暫停方法
pauseableThreadPool.pause();
System.out.println("線程池被暫停了!");
Thread.sleep(2000);
// 再等待2秒猜惋,執(zhí)行恢復方法
pauseableThreadPool.resume();
System.out.println("線程池被恢復了丸氛!");
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
lock.lock();
try {
while (isPaused) {
unpaused.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
/**
* 暫停線程
*/
private void pause() {
lock.lock();
try {
isPaused = true;
} finally {
lock.unlock();
}
}
/**
* 恢復線程
*/
private void resume() {
lock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
lock.unlock();
}
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
}
運行結(jié)果: