前言
java多線程我個人覺得是javaSe中最難的一部分叼屠,我以前也是感覺學(xué)會了,但是真正有多線程的需求卻不知道怎么下手暂衡,實際上還是對多線程這塊知識了解不深刻育瓜,不知道多線程api的應(yīng)用場景,不知道多線程的運行流程等等株依,本篇文章將使用實例+圖解+源碼的方式來解析java多線程驱证。
文章篇幅較長,大家也可以有選擇的看具體章節(jié)恋腕,建議多線程的代碼全部手敲抹锄,永遠不要相信你看到的結(jié)論,自己編碼后運行出來的荠藤,才是自己的伙单。
什么是java多線程?
進程與線程
進程
- 當一個程序被運行,就開啟了一個進程哈肖, 比如啟動了qq吻育,word
- 程序由指令和數(shù)據(jù)組成,指令要運行淤井,數(shù)據(jù)要加載布疼,指令被cpu加載運行,數(shù)據(jù)被加載到內(nèi)存庄吼,指令運行時可由cpu調(diào)度硬盤缎除、網(wǎng)絡(luò)等設(shè)備
線程
- 一個進程內(nèi)可分為多個線程
- 一個線程就是一個指令流,cpu調(diào)度的最小單位总寻,由cpu一條一條執(zhí)行指令
并行與并發(fā)
并發(fā):單核cpu運行多線程時器罐,時間片進行很快的切換。線程輪流執(zhí)行cpu
并行:多核cpu運行 多線程時渐行,真正的在同一時刻運行
java提供了豐富的api來支持多線程轰坊。
為什么用多線程?
多線程能實現(xiàn)的都可以用單線程來完成,那單線程運行的好好的祟印,為什么java要引入多線程的概念呢肴沫?
多線程的好處:
程序運行的更快!快蕴忆!快颤芬!
-
充分利用cpu資源,目前幾乎沒有線上的cpu是單核的,發(fā)揮多核cpu強大的能力
image
多線程難在哪里站蝠?
單線程只有一條執(zhí)行線汰具,過程容易理解,可以在大腦中清晰的勾勒出代碼的執(zhí)行流程
多線程卻是多條線菱魔,而且一般多條線之間有交互渣刷,多條線之間需要通信翠语,一般難點有以下幾點
- 多線程的執(zhí)行結(jié)果不確定,受到cpu調(diào)度的影響
- 多線程的安全問題
- 線程資源寶貴,依賴線程池操作線程嗤无,線程池的參數(shù)設(shè)置問題
- 多線程執(zhí)行是動態(tài)的霹陡,同時的,難以追蹤過程
- 多線程的底層是操作系統(tǒng)層面的婆殿,源碼難度大
有時候希望自己變成一個字節(jié)穿梭于服務(wù)器中载矿,搞清楚來龍去脈板辽,就像無敵破壞王一樣(沒看過這部電影的可以看下,腦洞大開)栋艳。
java多線程的基本使用
定義任務(wù)恰聘、創(chuàng)建和運行線程
任務(wù): 線程的執(zhí)行體句各。也就是我們的核心代碼邏輯
定義任務(wù)
- 繼承Thread類 (可以說是 將任務(wù)和線程合并在一起)
- 實現(xiàn)Runnable接口 (可以說是 將任務(wù)和線程分開了)
- 實現(xiàn)Callable接口 (利用FutureTask執(zhí)行任務(wù))
Thread實現(xiàn)任務(wù)的局限性
- 任務(wù)邏輯寫在Thread類的run方法中吸占,有單繼承的局限性
- 創(chuàng)建多線程時,每個任務(wù)有成員變量時不共享凿宾,必須加static才能做到共享
Runnable和Callable解決了Thread的局限性
但是Runbale相比Callable有以下的局限性
- 任務(wù)沒有返回值
- 任務(wù)無法拋異常給調(diào)用方
如下代碼 幾種定義線程的方式
@Slf4j
class T extends Thread {
@Override
public void run() {
log.info("我是繼承Thread的任務(wù)");
}
}
@Slf4j
class R implements Runnable {
@Override
public void run() {
log.info("我是實現(xiàn)Runnable的任務(wù)");
}
}
@Slf4j
class C implements Callable<String> {
@Override
public String call() throws Exception {
log.info("我是實現(xiàn)Callable的任務(wù)");
return "success";
}
}
創(chuàng)建線程的方式
- 通過Thread類直接創(chuàng)建線程
- 利用線程池內(nèi)部創(chuàng)建線程
啟動線程的方式
- 調(diào)用線程的start()方法
// 啟動繼承Thread類的任務(wù)
new T().start();
// 啟動繼承Thread匿名內(nèi)部類的任務(wù) 可用lambda優(yōu)化
Thread t = new Thread(){
@Override
public void run() {
log.info("我是Thread匿名內(nèi)部類的任務(wù)");
}
};
// 啟動實現(xiàn)Runnable接口的任務(wù)
new Thread(new R()).start();
// 啟動實現(xiàn)Runnable匿名實現(xiàn)類的任務(wù)
new Thread(new Runnable() {
@Override
public void run() {
log.info("我是Runnable匿名內(nèi)部類的任務(wù)");
}
}).start();
// 啟動實現(xiàn)Runnable的lambda簡化后的任務(wù)
new Thread(() -> log.info("我是Runnable的lambda簡化后的任務(wù)")).start();
// 啟動實現(xiàn)了Callable接口的任務(wù) 結(jié)合FutureTask 可以獲取線程執(zhí)行的結(jié)果
FutureTask<String> target = new FutureTask<>(new C());
new Thread(target).start();
log.info(target.get());
以上各個線程相關(guān)的類的類圖如下
上下文切換
多核cpu下矾屯,多線程是并行工作的,如果線程數(shù)多初厚,單個核又會并發(fā)的調(diào)度線程,運行時會有上下文切換的概念
cpu執(zhí)行線程的任務(wù)時件蚕,會為線程分配時間片,以下幾種情況會發(fā)生上下文切換产禾。
- 線程的cpu時間片用完
- 垃圾回收
- 線程自己調(diào)用了 sleep排作、yield、wait亚情、join妄痪、park、synchronized楞件、lock 等方法
當發(fā)生上下文切換時衫生,操作系統(tǒng)會保存當前線程的狀態(tài),并恢復(fù)另一個線程的狀態(tài),jvm中有塊內(nèi)存地址叫程序計數(shù)器土浸,用于記錄線程執(zhí)行到哪一行代碼,是線程私有的罪针。
idea打斷點的時候可以設(shè)置為Thread模式,idea的debug模式可以看出棧幀的變化
線程的禮讓-yield()&線程的優(yōu)先級
yield()方法會讓運行中的線程切換到就緒狀態(tài),重新爭搶cpu的時間片黄伊,爭搶時是否獲取到時間片看cpu的分配泪酱。
代碼如下
// 方法的定義
public static native void yield();
Runnable r1 = () -> {
int count = 0;
for (;;){
log.info("---- 1>" + count++);
}
};
Runnable r2 = () -> {
int count = 0;
for (;;){
Thread.yield();
log.info(" ---- 2>" + count++);
}
};
Thread t1 = new Thread(r1,"t1");
Thread t2 = new Thread(r2,"t2");
t1.start();
t2.start();
// 運行結(jié)果
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129504
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129505
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129506
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129507
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129508
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129509
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129510
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129511
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129512
11:49:15.798 [t2] INFO thread.TestYield - ---- 2>293
11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129513
11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129514
11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129515
11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129516
11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129517
11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129518
如上述結(jié)果所示,t2線程每次執(zhí)行時進行了yield(),線程1執(zhí)行的機會明顯比線程2要多墓阀。
線程的優(yōu)先級
? 線程內(nèi)部用1~10的數(shù)來調(diào)整線程的優(yōu)先級愈腾,默認的線程優(yōu)先級為NORM_PRIORITY:5
? cpu比較忙時,優(yōu)先級高的線程獲取更多的時間片
? cpu比較閑時岂津,優(yōu)先級設(shè)置基本沒用
public final static int MIN_PRIORITY = 1;
public final static int NORM_PRIORITY = 5;
public final static int MAX_PRIORITY = 10;
// 方法的定義
public final void setPriority(int newPriority) {
}
cpu比較忙時
Runnable r1 = () -> {
int count = 0;
for (;;){
log.info("---- 1>" + count++);
}
};
Runnable r2 = () -> {
int count = 0;
for (;;){
log.info(" ---- 2>" + count++);
}
};
Thread t1 = new Thread(r1,"t1");
Thread t2 = new Thread(r2,"t2");
t1.setPriority(Thread.NORM_PRIORITY);
t2.setPriority(Thread.MAX_PRIORITY);
t1.start();
t2.start();
// 可能的運行結(jié)果
11:59:00.696 [t1] INFO thread.TestYieldPriority - ---- 1>44102
11:59:00.696 [t2] INFO thread.TestYieldPriority - ---- 2>135903
11:59:00.696 [t2] INFO thread.TestYieldPriority - ---- 2>135904
11:59:00.696 [t2] INFO thread.TestYieldPriority - ---- 2>135905
11:59:00.696 [t2] INFO thread.TestYieldPriority - ---- 2>135906
cpu比較閑時
Runnable r1 = () -> {
int count = 0;
for (int i = 0; i < 10; i++) {
log.info("---- 1>" + count++);
}
};
Runnable r2 = () -> {
int count = 0;
for (int i = 0; i < 10; i++) {
log.info(" ---- 2>" + count++);
}
};
Thread t1 = new Thread(r1,"t1");
Thread t2 = new Thread(r2,"t2");
t1.setPriority(Thread.MIN_PRIORITY);
t2.setPriority(Thread.MAX_PRIORITY);
t1.start();
t2.start();
// 可能的運行結(jié)果 線程1優(yōu)先級低 卻先運行完
12:01:09.916 [t1] INFO thread.TestYieldPriority - ---- 1>7
12:01:09.916 [t1] INFO thread.TestYieldPriority - ---- 1>8
12:01:09.916 [t1] INFO thread.TestYieldPriority - ---- 1>9
12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>2
12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>3
12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>4
12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>5
12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>6
12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>7
12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>8
12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>9
守護線程
默認情況下虱黄,java進程需要等待所有線程都運行結(jié)束,才會結(jié)束吮成,有一種特殊線程叫守護線程橱乱,當所有的非守護線程都結(jié)束后,即使它沒有執(zhí)行完粱甫,也會強制結(jié)束泳叠。
默認的線程都是非守護線程。
垃圾回收線程就是典型的守護線程
// 方法的定義
public final void setDaemon(boolean on) {
}
Thread thread = new Thread(() -> {
while (true) {
}
});
// 具體的api茶宵。設(shè)為true表示未守護線程危纫,當主線程結(jié)束后,守護線程也結(jié)束乌庶。
// 默認是false种蝶,當主線程結(jié)束后,thread繼續(xù)運行瞒大,程序不停止
thread.setDaemon(true);
thread.start();
log.info("結(jié)束");
線程的阻塞
線程的阻塞可以分為好多種螃征,從操作系統(tǒng)層面和java層面阻塞的定義可能不同,但是廣義上使得線程阻塞的方式有下面幾種
- BIO阻塞透敌,即使用了阻塞式的io流
- sleep(long time) 讓線程休眠進入阻塞狀態(tài)
- a.join() 調(diào)用該方法的線程進入阻塞盯滚,等待a線程執(zhí)行完恢復(fù)運行
- sychronized或ReentrantLock 造成線程未獲得鎖進入阻塞狀態(tài) (同步鎖章節(jié)細說)
- 獲得鎖之后調(diào)用wait()方法 也會讓線程進入阻塞狀態(tài) (同步鎖章節(jié)細說)
- LockSupport.park() 讓線程進入阻塞狀態(tài) (同步鎖章節(jié)細說)
sleep()
? 使線程休眠,會將運行中的線程進入阻塞狀態(tài)酗电。當休眠時間結(jié)束后魄藕,重新爭搶cpu的時間片繼續(xù)運行
// 方法的定義 native方法
public static native void sleep(long millis) throws InterruptedException;
try {
// 休眠2秒
// 該方法會拋出 InterruptedException異常 即休眠過程中可被中斷,被中斷后拋出異常
Thread.sleep(2000);
} catch (InterruptedException異常 e) {
}
try {
// 使用TimeUnit的api可替代 Thread.sleep
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
join()
? join是指調(diào)用該方法的線程進入阻塞狀態(tài)撵术,等待某線程執(zhí)行完成后恢復(fù)運行
// 方法的定義 有重載
// 等待線程執(zhí)行完才恢復(fù)運行
public final void join() throws InterruptedException {
}
// 指定join的時間背率。指定時間內(nèi) 線程還未執(zhí)行完 調(diào)用方線程不繼續(xù)等待就恢復(fù)運行
public final synchronized void join(long millis)
throws InterruptedException{}
Thread t = new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
r = 10;
});
t.start();
// 讓主線程阻塞 等待t線程執(zhí)行完才繼續(xù)執(zhí)行
// 去除該行,執(zhí)行結(jié)果為0荷荤,加上該行 執(zhí)行結(jié)果為10
t.join();
log.info("r:{}", r);
// 運行結(jié)果
13:09:13.892 [main] INFO thread.TestJoin - r:10
線程的打斷-interrupt()
// 相關(guān)方法的定義
public void interrupt() {
}
public boolean isInterrupted() {
}
public static boolean interrupted() {
}
打斷標記:線程是否被打斷退渗,true表示被打斷了,false表示沒有
isInterrupted() 獲取線程的打斷標記 ,調(diào)用后不會修改線程的打斷標記
interrupt()方法用于中斷線程
- 可以打斷sleep,wait,join等顯式的拋出InterruptedException方法的線程蕴纳,但是打斷后,線程的打斷標記還是false
- 打斷正常線程 会油,線程不會真正被中斷,但是線程的打斷標記為true
interrupted() 獲取線程的打斷標記古毛,調(diào)用后清空打斷標記 即如果獲取為true 調(diào)用后打斷標記為false (不常用)
interrupt實例: 有個后臺監(jiān)控線程不停的監(jiān)控翻翩,當外界打斷它時都许,就結(jié)束運行。代碼如下
@Slf4j
class TwoPhaseTerminal{
// 監(jiān)控線程
private Thread monitor;
public void start(){
monitor = new Thread(() ->{
// 不停的監(jiān)控
while (true){
Thread thread = Thread.currentThread();
// 判斷當前線程是否被打斷
if (thread.isInterrupted()){
log.info("當前線程被打斷,結(jié)束運行");
break;
}
try {
Thread.sleep(1000);
// 監(jiān)控邏輯中被打斷后嫂冻,打斷標記為true
log.info("監(jiān)控");
} catch (InterruptedException e) {
// 睡眠時被打斷時拋出異常 在該處捕獲到 此時打斷標記還是false
// 在調(diào)用一次中斷 使得中斷標記為true
thread.interrupt();
}
}
});
monitor.start();
}
public void stop(){
monitor.interrupt();
}
}
線程的狀態(tài)
上面說了一些基本的api的使用胶征,調(diào)用上面的方法后都會使得線程有對應(yīng)的狀態(tài)。
線程的狀態(tài)可從 操作系統(tǒng)層面分為五種狀態(tài) 從java api層面分為六種狀態(tài)桨仿。
五種狀態(tài)
- 初始狀態(tài):創(chuàng)建線程對象時的狀態(tài)
- 可運行狀態(tài)(就緒狀態(tài)):調(diào)用start()方法后進入就緒狀態(tài)睛低,也就是準備好被cpu調(diào)度執(zhí)行
- 運行狀態(tài):線程獲取到cpu的時間片,執(zhí)行run()方法的邏輯
- 阻塞狀態(tài): 線程被阻塞服傍,放棄cpu的時間片钱雷,等待解除阻塞重新回到就緒狀態(tài)爭搶時間片
- 終止狀態(tài): 線程執(zhí)行完成或拋出異常后的狀態(tài)
六種狀態(tài)
Thread類中的內(nèi)部枚舉State
public enum State {
NEW,
RUNNABLE,
BLOCKED,
WAITING,
TIMED_WAITING,
TERMINATED;
}
- NEW 線程對象被創(chuàng)建
- Runnable 線程調(diào)用了start()方法后進入該狀態(tài),該狀態(tài)包含了三種情況
- 就緒狀態(tài) :等待cpu分配時間片
- 運行狀態(tài):進入Runnable方法執(zhí)行任務(wù)
- 阻塞狀態(tài):BIO 執(zhí)行阻塞式io流時的狀態(tài)
- Blocked 沒獲取到鎖時的阻塞狀態(tài)(同步鎖章節(jié)會細說)
- WAITING 調(diào)用wait()吹零、join()等方法后的狀態(tài)
- TIMED_WAITING 調(diào)用 sleep(time)罩抗、wait(time)、join(time)等方法后的狀態(tài)
- TERMINATED 線程執(zhí)行完成或拋出異常后的狀態(tài)
六種線程狀態(tài)和方法的對應(yīng)關(guān)系
線程的相關(guān)方法總結(jié)
主要總結(jié)Thread類中的核心方法
方法名稱 | 是否static | 方法說明 |
---|---|---|
start() | 否 | 讓線程啟動灿椅,進入就緒狀態(tài),等待cpu分配時間片 |
run() | 否 | 重寫Runnable接口的方法,線程獲取到cpu時間片時執(zhí)行的具體邏輯 |
yield() | 是 | 線程的禮讓套蒂,使得獲取到cpu時間片的線程進入就緒狀態(tài),重新爭搶時間片 |
sleep(time) | 是 | 線程休眠固定時間茫蛹,進入阻塞狀態(tài)操刀,休眠時間完成后重新爭搶時間片,休眠可被打斷 |
join()/join(time) | 否 | 調(diào)用線程對象的join方法,調(diào)用者線程進入阻塞,等待線程對象執(zhí)行完或者到達指定時間才恢復(fù)麻惶,重新爭搶時間片 |
isInterrupted() | 否 | 獲取線程的打斷標記馍刮,true:被打斷,false:沒有被打斷窃蹋。調(diào)用后不會修改打斷標記 |
interrupt() | 否 | 打斷線程,拋出InterruptedException異常的方法均可被打斷静稻,但是打斷后不會修改打斷標記警没,正常執(zhí)行的線程被打斷后會修改打斷標記 |
interrupted() | 否 | 獲取線程的打斷標記。調(diào)用后會清空打斷標記 |
stop() | 否 | 停止線程運行 不推薦 |
suspend() | 否 | 掛起線程 不推薦 |
resume() | 否 | 恢復(fù)線程運行 不推薦 |
currentThread() | 是 | 獲取當前線程 |
Object中與線程相關(guān)方法
方法名稱 | 方法說明 |
---|---|
wait()/wait(long timeout) | 獲取到鎖的線程進入阻塞狀態(tài) |
notify() | 隨機喚醒被wait()的一個線程 |
notifyAll(); | 喚醒被wait()的所有線程振湾,重新爭搶時間片 |
同步鎖
線程安全
- 一個程序運行多個線程本身是沒有問題的
- 問題有可能出現(xiàn)在多個線程訪問共享資源
- 多個線程都是讀共享資源也是沒有問題的
- 當多個線程讀寫共享資源時,如果發(fā)生指令交錯杀迹,就會出現(xiàn)問題
臨界區(qū): 一段代碼如果對共享資源的多線程讀寫操作,這段代碼就被稱為臨界區(qū)。
注意的是 指令交錯指的是 java代碼在解析成字節(jié)碼文件時押搪,java代碼的一行代碼在字節(jié)碼中可能有多行树酪,在線程上下文切換時就有可能交錯。
線程安全指的是多線程調(diào)用同一個對象的臨界區(qū)的方法時大州,對象的屬性值一定不會發(fā)生錯誤续语,這就是保證了線程安全。
如下面不安全的代碼
// 對象的成員變量
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
// t1線程對變量+5000次
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
count++;
}
});
// t2線程對變量-5000次
Thread t2 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
count--;
}
});
t1.start();
t2.start();
// 讓t1 t2都執(zhí)行完
t1.join();
t2.join();
System.out.println(count);
}
// 運行結(jié)果
-1399
上面的代碼 兩個線程厦画,一個+5000次疮茄,一個-5000次滥朱,如果線程安全,count的值應(yīng)該還是0力试。
但是運行很多次徙邻,每次的結(jié)果不同,且都不是0畸裳,所以是線程不安全的缰犁。
線程安全的類一定所有的操作都線程安全嗎?
開發(fā)中經(jīng)常會說到一些線程安全的類怖糊,如ConcurrentHashMap民鼓,線程安全指的是類里每一個獨立的方法是線程安全的,但是方法的組合就不一定是線程安全的蓬抄。
成員變量和靜態(tài)變量是否線程安全?
- 如果沒有多線程共享丰嘉,則線程安全
- 如果存在多線程共享
- 多線程只有讀操作,則線程安全
- 多線程存在寫操作嚷缭,寫操作的代碼又是臨界區(qū),則線程不安全
局部變量是否線程安全?
- 局部變量是線程安全的
- 局部變量引用的對象未必是線程安全的
- 如果該對象沒有逃離該方法的作用范圍饮亏,則線程安全
- 如果該對象逃離了該方法的作用范圍,比如:方法的返回值,需要考慮線程安全
synchronized
同步鎖也叫對象鎖阅爽,是鎖在對象上的路幸,不同的對象就是不同的鎖。
該關(guān)鍵字是用于保證線程安全的付翁,是阻塞式的解決方案简肴。
讓同一個時刻最多只有一個線程能持有對象鎖,其他線程在想獲取這個對象鎖就會被阻塞百侧,不用擔(dān)心上下文切換的問題砰识。
注意: 不要理解為一個線程加了鎖 ,進入 synchronized代碼塊中就會一直執(zhí)行下去佣渴。如果時間片切換了辫狼,也會執(zhí)行其他線程,再切換回來會緊接著執(zhí)行辛润,只是不會執(zhí)行到有競爭鎖的資源膨处,因為當前線程還未釋放鎖。
當一個線程執(zhí)行完synchronized的代碼塊后 會喚醒正在等待的線程
synchronized實際上使用對象鎖保證臨界區(qū)的原子性 臨界區(qū)的代碼是不可分割的 不會因為線程切換所打斷
基本使用
// 加在方法上 實際是對this對象加鎖
private synchronized void a() {
}
// 同步代碼塊,鎖對象可以是任意的砂竖,加在this上 和a()方法作用相同
private void b(){
synchronized (this){
}
}
// 加在靜態(tài)方法上 實際是對類對象加鎖
private synchronized static void c() {
}
// 同步代碼塊 實際是對類對象加鎖 和c()方法作用相同
private void d(){
synchronized (TestSynchronized.class){
}
}
// 上述b方法對應(yīng)的字節(jié)碼源碼 其中monitorenter就是加鎖的地方
0 aload_0
1 dup
2 astore_1
3 monitorenter
4 aload_1
5 monitorexit
6 goto 14 (+8)
9 astore_2
10 aload_1
11 monitorexit
12 aload_2
13 athrow
14 return
線程安全的代碼
private static int count = 0;
private static Object lock = new Object();
private static Object lock2 = new Object();
// t1線程和t2對象都是對同一對象加鎖真椿。保證了線程安全。此段代碼無論執(zhí)行多少次乎澄,結(jié)果都是0
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
synchronized (lock) {
count++;
}
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
synchronized (lock) {
count--;
}
}
});
t1.start();
t2.start();
// 讓t1 t2都執(zhí)行完
t1.join();
t2.join();
System.out.println(count);
}
重點:加鎖是加在對象上突硝,一定要保證是同一對象,加鎖才能生效
線程通信
wait+notify
線程間通信可以通過共享變量+wait()¬ify()來實現(xiàn)
wait()將線程進入阻塞狀態(tài)三圆,notify()將線程喚醒
當多線程競爭訪問對象的同步方法時狞换,鎖對象會關(guān)聯(lián)一個底層的Monitor對象(重量級鎖的實現(xiàn))
如下圖所示 Thread0,1先競爭到鎖執(zhí)行了代碼后避咆,2,3,4,5線程同時來執(zhí)行臨界區(qū)的代碼,開始競爭鎖
- Thread-0先獲取到對象的鎖,關(guān)聯(lián)到monitor的owner修噪,同步代碼塊內(nèi)調(diào)用了鎖對象的wait()方法查库,調(diào)用后會進入waitSet等待,Thread-1同樣如此黄琼,此時Thread-0的狀態(tài)為Waitting
- Thread2樊销、3、4脏款、5同時競爭围苫,2獲取到鎖后,關(guān)聯(lián)了monitor的owner撤师,3剂府、4、5只能進入EntryList中等待剃盾,此時2線程狀態(tài)為 Runnable腺占,3、4痒谴、5狀態(tài)為Blocked
- 2執(zhí)行后衰伯,喚醒entryList中的線程,3积蔚、4意鲸、5進行競爭鎖,獲取到的線程即會關(guān)聯(lián)monitor的owner
- 3尽爆、4怎顾、5線程在執(zhí)行過程中,調(diào)用了鎖對象的notify()或notifyAll()時教翩,會喚醒waitSet的線程杆勇,喚醒的線程進入entryList等待重新競爭鎖
注意:
Blocked狀態(tài)和Waitting狀態(tài)都是阻塞狀態(tài)
Blocked線程會在owner線程釋放鎖時喚醒
wait和notify使用場景是必須要有同步,且必須獲得對象的鎖才能調(diào)用,使用鎖對象去調(diào)用,否則會拋異常
- wait() 釋放鎖 進入 waitSet 可傳入時間饱亿,如果指定時間內(nèi)未被喚醒 則自動喚醒
- notify()隨機喚醒一個waitSet里的線程
- notifyAll()喚醒waitSet中所有的線程
static final Object lock = new Object();
new Thread(() -> {
synchronized (lock) {
log.info("開始執(zhí)行");
try {
// 同步代碼內(nèi)部才能調(diào)用
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("繼續(xù)執(zhí)行核心邏輯");
}
}, "t1").start();
new Thread(() -> {
synchronized (lock) {
log.info("開始執(zhí)行");
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("繼續(xù)執(zhí)行核心邏輯");
}
}, "t2").start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("開始喚醒");
synchronized (lock) {
// 同步代碼內(nèi)部才能調(diào)用
lock.notifyAll();
}
// 執(zhí)行結(jié)果
14:29:47.138 [t1] INFO TestWaitNotify - 開始執(zhí)行
14:29:47.141 [t2] INFO TestWaitNotify - 開始執(zhí)行
14:29:49.136 [main] INFO TestWaitNotify - 開始喚醒
14:29:49.136 [t2] INFO TestWaitNotify - 繼續(xù)執(zhí)行核心邏輯
14:29:49.136 [t1] INFO TestWaitNotify - 繼續(xù)執(zhí)行核心邏輯
wait 和 sleep的區(qū)別?
二者都會讓線程進入阻塞狀態(tài),有以下區(qū)別
- wait是Object的方法 sleep是Thread的方法
- wait會立即釋放鎖 sleep不會釋放鎖
- wait后線程的狀態(tài)是Watting sleep后線程的狀態(tài)為 Time_Waiting
park&unpark
LockSupport是juc下的工具類闰靴,提供了park和unpark方法彪笼,可以實現(xiàn)線程通信
與wait和notity相比的不同點
- wait 和notify需要獲取對象鎖 park unpark不要
- unpark 可以指定喚醒線程 notify隨機喚醒
- park和unpark的順序可以先unpark wait和notify的順序不能顛倒
生產(chǎn)者消費者模型
指的是有生產(chǎn)者來生產(chǎn)數(shù)據(jù),消費者來消費數(shù)據(jù)蚂且,生產(chǎn)者生產(chǎn)滿了就不生產(chǎn)了配猫,通知消費者取,等消費了再進行生產(chǎn)杏死。
消費者消費不到了就不消費了泵肄,通知生產(chǎn)者生產(chǎn)捆交,生產(chǎn)到了再繼續(xù)消費。
public static void main(String[] args) throws InterruptedException {
MessageQueue queue = new MessageQueue(2);
// 三個生產(chǎn)者向隊列里存值
for (int i = 0; i < 3; i++) {
int id = i;
new Thread(() -> {
queue.put(new Message(id, "值" + id));
}, "生產(chǎn)者" + i).start();
}
Thread.sleep(1000);
// 一個消費者不停的從隊列里取值
new Thread(() -> {
while (true) {
queue.take();
}
}, "消費者").start();
}
}
// 消息隊列被生產(chǎn)者和消費者持有
class MessageQueue {
private LinkedList<Message> list = new LinkedList<>();
// 容量
private int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
}
/**
* 生產(chǎn)
*/
public void put(Message message) {
synchronized (list) {
while (list.size() == capacity) {
log.info("隊列已滿腐巢,生產(chǎn)者等待");
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.addLast(message);
log.info("生產(chǎn)消息:{}", message);
// 生產(chǎn)后通知消費者
list.notifyAll();
}
}
public Message take() {
synchronized (list) {
while (list.isEmpty()) {
log.info("隊列已空品追,消費者等待");
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Message message = list.removeFirst();
log.info("消費消息:{}", message);
// 消費后通知生產(chǎn)者
list.notifyAll();
return message;
}
}
}
// 消息
class Message {
private int id;
private Object value;
}
同步鎖案例
為了更形象的表達加同步鎖的概念,這里舉一個生活中的例子冯丙,盡量把以上的概念具體化出來肉瓦。
這里舉一個每個人非常感興趣的一件東西。 錢N赶АE⒗颉!(馬老師除外)船殉。
現(xiàn)實中鲫趁,我們?nèi)ャy行門口的自動取款機取錢,取款機的錢就是共享變量利虫,為了保障安全挨厚,不可能兩個陌生人同時進入同一個取款機內(nèi)取錢,所以只能一個人進入取錢列吼,然后鎖上取款機的門幽崩,其他人只能在取款機門口等待。
取款機有多個寞钥,里面的錢互不影響慌申,鎖也有多個(多個對象鎖),取錢人在多個取款機里同時取錢也沒有安全問題理郑。
假如每個取錢的陌生人都是線程蹄溉,當取錢人進入取款機鎖了門后(線程獲得鎖),取到錢后出門(線程釋放鎖)您炉,下一個人競爭到鎖來取錢柒爵。
假設(shè)工作人員也是一個線程,如果取錢人進入后發(fā)現(xiàn)取款機錢不足了,這時通知工作人員來向取款機里加錢(調(diào)用notifyAll方法)赚爵,取錢人暫停取錢棉胀,進入銀行大堂阻塞等待(調(diào)用wait方法)。
銀行大堂里的工作人員和取錢人都被喚醒冀膝,重新競爭鎖唁奢,進入后如果是取錢人,由于取款機沒錢窝剖,還得進入銀行大堂等待麻掸。
當工作人員獲得取款機的鎖進入后,加了錢后會通知大廳里的人來取錢(調(diào)用notifyAll方法)赐纱。自己暫停加錢脊奋,進入銀行大堂等待喚醒加錢(調(diào)用wait方法)熬北。
這時大堂里等待的人都來競爭鎖,誰獲取到誰進入繼續(xù)取錢诚隙。
和現(xiàn)實中不同的就是這里沒有排隊的概念讶隐,誰搶到鎖誰進去取。
ReentrantLock
可重入鎖 : 一個線程獲取到對象的鎖后最楷,執(zhí)行方法內(nèi)部在需要獲取鎖的時候是可以獲取到的整份。如以下代碼
private static final ReentrantLock LOCK = new ReentrantLock();
private static void m() {
LOCK.lock();
try {
log.info("begin");
// 調(diào)用m1()
m1();
} finally {
// 注意鎖的釋放
LOCK.unlock();
}
}
public static void m1() {
LOCK.lock();
try {
log.info("m1");
m2();
} finally {
// 注意鎖的釋放
LOCK.unlock();
}
}
synchronized 也是可重入鎖,ReentrantLock有以下優(yōu)點
- 支持獲取鎖的超時時間
- 獲取鎖時可被打斷
- 可設(shè)為公平鎖
- 可以有不同的條件變量籽孙,即有多個waitSet烈评,可以指定喚醒
api
// 默認非公平鎖,參數(shù)傳true 表示未公平鎖
ReentrantLock lock = new ReentrantLock(false);
// 嘗試獲取鎖
lock()
// 釋放鎖 應(yīng)放在finally塊中 必須執(zhí)行到
unlock()
try {
// 獲取鎖時可被打斷,阻塞中的線程可被打斷
LOCK.lockInterruptibly();
} catch (InterruptedException e) {
return;
}
// 嘗試獲取鎖 獲取不到就返回false
LOCK.tryLock()
// 支持超時時間 一段時間沒獲取到就返回false
tryLock(long timeout, TimeUnit unit)
// 指定條件變量 休息室 一個鎖可以創(chuàng)建多個休息室
Condition waitSet = ROOM.newCondition();
// 釋放鎖 進入waitSet等待 釋放后其他線程可以搶鎖
yanWaitSet.await()
// 喚醒具體休息室的線程 喚醒后 重寫競爭鎖
yanWaitSet.signal()
實例:一個線程輸出a犯建,一個線程輸出b讲冠,一個線程輸出c,abc按照順序輸出适瓦,連續(xù)輸出5次
這個考的就是線程的通信竿开,利用 wait()/notify()和控制變量可以實現(xiàn),此處使用ReentrantLock即可實現(xiàn)該功能玻熙。
public static void main(String[] args) {
AwaitSignal awaitSignal = new AwaitSignal(5);
// 構(gòu)建三個條件變量
Condition a = awaitSignal.newCondition();
Condition b = awaitSignal.newCondition();
Condition c = awaitSignal.newCondition();
// 開啟三個線程
new Thread(() -> {
awaitSignal.print("a", a, b);
}).start();
new Thread(() -> {
awaitSignal.print("b", b, c);
}).start();
new Thread(() -> {
awaitSignal.print("c", c, a);
}).start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
awaitSignal.lock();
try {
// 先喚醒a
a.signal();
} finally {
awaitSignal.unlock();
}
}
}
class AwaitSignal extends ReentrantLock {
// 循環(huán)次數(shù)
private int loopNumber;
public AwaitSignal(int loopNumber) {
this.loopNumber = loopNumber;
}
/**
* @param print 輸出的字符
* @param current 當前條件變量
* @param next 下一個條件變量
*/
public void print(String print, Condition current, Condition next) {
for (int i = 0; i < loopNumber; i++) {
lock();
try {
try {
// 獲取鎖之后等待
current.await();
System.out.print(print);
} catch (InterruptedException e) {
}
next.signal();
} finally {
unlock();
}
}
}
死鎖
說到死鎖,先舉個例子否彩,
下面是代碼實現(xiàn)
static Beer beer = new Beer();
static Story story = new Story();
public static void main(String[] args) {
new Thread(() ->{
synchronized (beer){
log.info("我有酒,給我故事");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (story){
log.info("小王開始喝酒講故事");
}
}
},"小王").start();
new Thread(() ->{
synchronized (story){
log.info("我有故事嗦随,給我酒");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (beer){
log.info("老王開始喝酒講故事");
}
}
},"老王").start();
}
class Beer {
}
class Story{
}
死鎖導(dǎo)致程序無法正常運行下去
檢測工具可以檢查到死鎖信息
java內(nèi)存模型(JMM)
jmm 體現(xiàn)在以下三個方面
- 原子性 保證指令不會受到上下文切換的影響
- 可見性 保證指令不會受到cpu緩存的影響
- 有序性 保證指令不會受并行優(yōu)化的影響
可見性
停不下來的程序
static boolean run = true;
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(() -> {
while (run) {
// ....
}
});
t.start();
Thread.sleep(1000);
// 線程t不會如預(yù)想的停下來
run = false;
}
如上圖所示列荔,線程有自己的工作緩存,當主線程修改了變量并同步到主內(nèi)存時枚尼,t線程沒有讀取到贴浙,所以程序停不下來
有序性
JVM在不影響程序正確性的情況下可能會調(diào)整語句的執(zhí)行順序,該情況也稱為 指令重排序
static int i;
static int j;
// 在某個線程內(nèi)執(zhí)行如下賦值操作
i = ...;
j = ...;
有可能將j先賦值
原子性
原子性大家應(yīng)該比較熟悉署恍,上述同步鎖的synchronized代碼塊就是保證了原子性崎溃,就是一段代碼是一個整體,原子性保證了線程安全盯质,不會受到上下文切換的影響袁串。
volatile
該關(guān)鍵字解決了可見性和有序性,volatile通過內(nèi)存屏障來實現(xiàn)的
- 寫屏障
會在對象寫操作之后加寫屏障呼巷,會對寫屏障的之前的數(shù)據(jù)都同步到主存般婆,并且保證寫屏障的執(zhí)行順序在寫屏障之前
- 讀屏障
會在對象讀操作之前加讀屏障,會在讀屏障之后的語句都從主存讀朵逝,并保證讀屏障之后的代碼執(zhí)行在讀屏障之后
注意: volatile不能解決原子性,即不能通過該關(guān)鍵字實現(xiàn)線程安全乡范。
volatile應(yīng)用場景:一個線程讀取變量配名,另外的線程操作變量啤咽,加了該關(guān)鍵字后保證寫變量后,讀變量的線程可以及時感知渠脉。
無鎖-cas
cas (compare and swap) 比較并交換
為變量賦值時宇整,從內(nèi)存中讀取到的值v,獲取到要交換的新值n芋膘,執(zhí)行 compareAndSwap()方法時鳞青,比較v和當前內(nèi)存中的值是否一致,如果一致則將n和v交換为朋,如果不一致臂拓,則自旋重試。
cas底層是cpu層面的习寸,即不使用同步鎖也可以保證操作的原子性胶惰。
private AtomicInteger balance;
// 模擬cas的具體操作
@Override
public void withdraw(Integer amount) {
while (true) {
// 獲取當前值
int pre = balance.get();
// 進行操作后得到新值
int next = pre - amount;
// 比較并設(shè)置成功 則中斷 否則自旋重試
if (balance.compareAndSet(pre, next)) {
break;
}
}
}
無鎖的效率是要高于之前的鎖的,由于無鎖不會涉及線程的上下文切換
cas是樂觀鎖的思想霞溪,sychronized是悲觀鎖的思想
cas適合很少有線程競爭的場景孵滞,如果競爭很強,重試經(jīng)常發(fā)生鸯匹,反而降低效率
juc并發(fā)包下包含了實現(xiàn)了cas的原子類
- AtomicInteger/AtomicBoolean/AtomicLong
- AtomicIntegerArray/AtomicLongArray/AtomicReferenceArray
- AtomicReference/AtomicStampedReference/AtomicMarkableReference
AtomicInteger
常用api
new AtomicInteger(balance)
get()
compareAndSet(pre, next)
// i.incrementAndGet() ++i
// i.decrementAndGet() --i
// i.getAndIncrement() i++
// i.getAndDecrement() ++i
i.addAndGet()
// 傳入函數(shù)式接口 修改i
int getAndUpdate(IntUnaryOperator updateFunction)
// cas 的核心方法
compareAndSet(int expect, int update)
ABA問題
cas存在ABA問題坊饶,即比較并交換時,如果原值為A,有其他線程將其修改為B殴蓬,在有其他線程將其修改為A匿级。
此時實際發(fā)生過交換,但是比較和交換由于值沒改變可以交換成功
解決方式
AtomicStampedReference/AtomicMarkableReference
上面兩個類解決ABA問題科雳,原理就是為對象增加版本號,每次修改時增加版本號根蟹,就可以避免ABA問題
或者增加個布爾變量標識,修改后調(diào)整布爾變量值糟秘,也可以避免ABA問題
線程池
線程池的介紹
線程池是java并發(fā)最重要的一個知識點简逮,也是難點,是實際應(yīng)用最廣泛的尿赚。
線程的資源很寶貴散庶,不可能無限的創(chuàng)建,必須要有管理線程的工具凌净,線程池就是一種管理線程的工具悲龟,java開發(fā)中經(jīng)常有池化的思想,如 數(shù)據(jù)庫連接池、Redis連接池等毯盈。
預(yù)先創(chuàng)建好一些線程嗜闻,任務(wù)提交時直接執(zhí)行乏冀,既可以節(jié)約創(chuàng)建線程的時間轻腺,又可以控制線程的數(shù)量乐疆。
線程池的好處
- 降低資源消耗,通過池化思想贬养,減少創(chuàng)建線程和銷毀線程的消耗挤土,控制資源
- 提高響應(yīng)速度,任務(wù)到達時误算,無需創(chuàng)建線程即可運行
- 提供更多更強大的功能仰美,可擴展性高
線程池的構(gòu)造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
構(gòu)造器參數(shù)的意義
參數(shù)名 | 參數(shù)意義 |
---|---|
corePoolSize | 核心線程數(shù) |
maximumPoolSize | 最大線程數(shù) |
keepAliveTime | 救急線程的空閑時間 |
unit | 救急線程的空閑時間單位 |
workQueue | 阻塞隊列 |
threadFactory | 創(chuàng)建線程的工廠,主要定義線程名 |
handler | 拒絕策略 |
線程池案例
下面 我們通過一個實例來理解線程池的參數(shù)以及線程池的接收任務(wù)的過程
如上圖 銀行辦理業(yè)務(wù)儿礼。
- 客戶到銀行時咖杂,開啟柜臺進行辦理,柜臺相當于線程蜘犁,客戶相當于任務(wù)翰苫,有兩個是常開的柜臺,三個是臨時柜臺这橙。2就是核心線程數(shù)奏窑,5是最大線程數(shù)。即有兩個核心線程
- 當柜臺開到第二個后屈扎,都還在處理業(yè)務(wù)埃唯。客戶再來就到排隊大廳排隊鹰晨。排隊大廳只有三個座位墨叛。
- 排隊大廳坐滿時,再來客戶就繼續(xù)開柜臺處理模蜡,目前最大有三個臨時柜臺漠趁,也就是三個救急線程
- 此時再來客戶,就無法正常為其 提供業(yè)務(wù)忍疾,采用拒絕策略來處理它們
- 當柜臺處理完業(yè)務(wù)闯传,就會從排隊大廳取任務(wù),當柜臺隔一段空閑時間都取不到任務(wù)時卤妒,如果當前線程數(shù)大于核心線程數(shù)時甥绿,就會回收線程。即撤銷該柜臺则披。
線程池的狀態(tài)
線程池通過一個int變量的高3位來表示線程池的狀態(tài)共缕,低29位來存儲線程池的數(shù)量
狀態(tài)名稱 | 高三位 | 接收新任務(wù) | 處理阻塞隊列任務(wù) | 說明 |
---|---|---|---|---|
Running | 111 | Y | Y | 正常接收任務(wù),正常處理任務(wù) |
Shutdown | 000 | N | Y | 不會接收任務(wù),會執(zhí)行完正在執(zhí)行的任務(wù),也會處理阻塞隊列里的任務(wù) |
stop | 001 | N | N | 不會接收任務(wù)士复,會中斷正在執(zhí)行的任務(wù),會放棄處理阻塞隊列里的任務(wù) |
Tidying | 010 | N | N | 任務(wù)全部執(zhí)行完畢图谷,當前活動線程是0,即將進入終結(jié) |
Termitted | 011 | N | N | 終結(jié)狀態(tài) |
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
線程池的主要流程
線程池創(chuàng)建、接收任務(wù)蜓萄、執(zhí)行任務(wù)隅茎、回收線程的步驟
- 創(chuàng)建線程池后,線程池的狀態(tài)是Running嫉沽,該狀態(tài)下才能有下面的步驟
- 提交任務(wù)時,線程池會創(chuàng)建線程去處理任務(wù)
- 當線程池的工作線程數(shù)達到corePoolSize時俏竞,繼續(xù)提交任務(wù)會進入阻塞隊列
- 當阻塞隊列裝滿時绸硕,繼續(xù)提交任務(wù),會創(chuàng)建救急線程來處理
- 當線程池中的工作線程數(shù)達到maximumPoolSize時魂毁,會執(zhí)行拒絕策略
- 當線程取任務(wù)的時間達到keepAliveTime還沒有取到任務(wù)玻佩,工作線程數(shù)大于corePoolSize時,會回收該線程
注意: 不是剛創(chuàng)建的線程是核心線程席楚,后面創(chuàng)建的線程是非核心線程咬崔,線程是沒有核心非核心的概念的,這是我長期以來的誤解烦秩。
拒絕策略
- 調(diào)用者拋出RejectedExecutionException (默認策略)
- 讓調(diào)用者運行任務(wù)
- 丟棄此次任務(wù)
- 丟棄阻塞隊列中最早的任務(wù)垮斯,加入該任務(wù)
提交任務(wù)的方法
// 執(zhí)行Runnable
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
// 提交Callable
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// 內(nèi)部構(gòu)建FutureTask
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
// 提交Runnable,指定返回值
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 內(nèi)部構(gòu)建FutureTask
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
// 提交Runnable,指定返回值
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
// 內(nèi)部構(gòu)建FutureTask
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
Execetors創(chuàng)建線程池
注意: 下面幾種方式都不推薦使用
1.newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- 核心線程數(shù) = 最大線程數(shù) 沒有救急線程
- 阻塞隊列無界 可能導(dǎo)致oom
2.newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- 核心線程數(shù)是0,最大線程數(shù)無限制 只祠,救急線程60秒回收
- 隊列采用 SynchronousQueue 實現(xiàn) 沒有容量兜蠕,即放入隊列后沒有線程來取就放不進去
- 可能導(dǎo)致線程數(shù)過多,cpu負擔(dān)太大
3.newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- 核心線程數(shù)和最大線程數(shù)都是1抛寝,沒有救急線程熊杨,無界隊列 可以不停的接收任務(wù)
- 將任務(wù)串行化 一個個執(zhí)行, 使用包裝類是為了屏蔽修改線程池的一些參數(shù) 比如 corePoolSize
- 如果某線程拋出異常了盗舰,會重新創(chuàng)建一個線程繼續(xù)執(zhí)行
- 可能造成oom
4.newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
- 任務(wù)調(diào)度的線程池 可以指定延遲時間調(diào)用晶府,可以指定隔一段時間調(diào)用
線程池的關(guān)閉
shutdown()
會讓線程池狀態(tài)為shutdown,不能接收任務(wù)钻趋,但是會將工作線程和阻塞隊列里的任務(wù)執(zhí)行完 相當于優(yōu)雅關(guān)閉
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
shutdownNow()
會讓線程池狀態(tài)為stop川陆, 不能接收任務(wù),會立即中斷執(zhí)行中的工作線程爷绘,并且不會執(zhí)行阻塞隊列里的任務(wù)书劝, 會返回阻塞隊列的任務(wù)列表
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
線程池的正確使用姿勢
線程池難就難在參數(shù)的配置,有一套理論配置參數(shù)
cpu密集型 : 指的是程序主要發(fā)生cpu的運算
? 核心線程數(shù): CPU核心數(shù)+1
IO密集型: 遠程調(diào)用RPC土至,操作數(shù)據(jù)庫等购对,不需要使用cpu進行大量的運算。 大多數(shù)應(yīng)用的場景
? 核心線程數(shù)=核數(shù)*cpu期望利用率 *總時間/cpu運算時間
但是基于以上理論還是很難去配置陶因,因為cpu運算時間不好估算
實際配置大小可參考下表
cpu密集型 | io密集型 | |
---|---|---|
線程數(shù)數(shù)量 | 核數(shù)<=x<=核數(shù)*2 | 核心數(shù)*50<=x<=核心數(shù) *100 |
隊列長度 | y>=100 | 1<=y<=10 |
1.線程池參數(shù)通過分布式配置骡苞,修改配置無需重啟應(yīng)用
線程池參數(shù)是根據(jù)線上的請求數(shù)變化而變化的,最好的方式是 核心線程數(shù)、最大線程數(shù) 隊列大小都是可配置的
主要配置 corePoolSize maxPoolSize queueSize
java提供了可方法覆蓋參數(shù)解幽,線程池內(nèi)部會處理好參數(shù) 進行平滑的修改
public void setCorePoolSize(int corePoolSize) {
}
2.增加線程池的監(jiān)控
3.io密集型可調(diào)整為先新增任務(wù)到最大線程數(shù)后再將任務(wù)放到阻塞隊列
代碼 主要可重寫阻塞隊列 加入任務(wù)的方法
public boolean offer(Runnable runnable) {
if (executor == null) {
throw new RejectedExecutionException("The task queue does not have executor!");
}
final ReentrantLock lock = this.lock;
lock.lock();
try {
int currentPoolThreadSize = executor.getPoolSize();
// 如果提交任務(wù)數(shù)小于當前創(chuàng)建的線程數(shù), 說明還有空閑線程,
if (executor.getTaskCount() < currentPoolThreadSize) {
// 將任務(wù)放入隊列中贴见,讓線程去處理任務(wù)
return super.offer(runnable);
}
// 核心改動
// 如果當前線程數(shù)小于最大線程數(shù),則返回 false 躲株,讓線程池去創(chuàng)建新的線程
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
// 否則片部,就將任務(wù)放入隊列中
return super.offer(runnable);
} finally {
lock.unlock();
}
}
3.拒絕策略 建議使用tomcat的拒絕策略(給一次機會)
// tomcat的源碼
@Override
public void execute(Runnable command) {
if ( executor != null ) {
try {
executor.execute(command);
} catch (RejectedExecutionException rx) {
// 捕獲到異常后 在從隊列獲取,相當于重試1取不到任務(wù) 在執(zhí)行拒絕任務(wù)
if ( !( (TaskQueue) executor.getQueue()).force(command) ) throw new RejectedExecutionException("Work queue full.");
}
} else throw new IllegalStateException("StandardThreadPool not started.");
}
建議修改從隊列取任務(wù)的方式: 增加超時時間霜定,超時1分鐘取不到在進行返回
public boolean offer(E e, long timeout, TimeUnit unit){}
結(jié)語
工作三四年了档悠,還沒有正式的寫過博客,自學(xué)一直都是通過筆記的方式積累望浩,最近重新學(xué)了一下java多線程辖所,想著周末把這部分內(nèi)容認真的寫篇博客分享出去。
文章篇幅較長磨德,給看到這里的小伙伴點個大大的贊!由于作者水平有限缘回,加之第一次寫博客,文章中難免會有錯誤之處典挑,歡迎小伙伴們反饋指正酥宴。
如果覺得文章對你有幫助,麻煩 點贊、評論搔弄、轉(zhuǎn)發(fā)幅虑、在看 走起
你的支持是我最大的動力!9擞獭倒庵!