并發(fā)編程是提高程序運(yùn)行效率與響應(yīng)速度的重要手段,在多CPU條件下字管,并發(fā)編程可以使硬件得到更大程度的運(yùn)用黄橘。由于在并發(fā)環(huán)境下CPU隨時(shí)會(huì)對(duì)多線程的運(yùn)行進(jìn)行調(diào)度,因此線程中各指令執(zhí)行的順序是不確定的悼沈,出現(xiàn)問題時(shí)也難以復(fù)現(xiàn)和定位贱迟。如果開發(fā)人員了解并發(fā)的原理,就能在有并發(fā)問題隱患的地方妥善處理來規(guī)避風(fēng)險(xiǎn)絮供。
并發(fā)的知識(shí)體系很龐大衣吠,涉及到內(nèi)存模型、并發(fā)容器杯缺、線程池等一系列知識(shí)點(diǎn)蒸播,優(yōu)秀并發(fā)程序?qū)π阅芘c活躍性也有較高的要求,因此想要吃透并發(fā)并不是一件容易的事萍肆。想要寫好并發(fā)程序袍榆,不僅需要對(duì)并發(fā)的原理有所了解,更需要工程上的實(shí)踐塘揣,萬丈高樓平地起包雀,下面讓我們來一起探索并發(fā)編程。
一亲铡、原子性才写、可見性、順序性問題
1.1 原子性
提到并發(fā)奖蔓,有一個(gè)很經(jīng)典的例子就是同時(shí)啟動(dòng)2個(gè)線程對(duì)一個(gè)數(shù)循環(huán)+1赞草,程序如下所示。線程t1和t2都會(huì)循環(huán)1萬次a++吆鹤,理想狀態(tài)下a最終的值應(yīng)該是20000厨疙,但是運(yùn)行之后a的值總是小于20000,并且每次的結(jié)果都不盡相同疑务,這是為什么呢沾凄?
public class Test {
public static int a = 0;
public static void main(String[] args) {
Runnable r = () -> {
for (int i = 0; i < 10000; i++) {
a++;
}
};
Thread t1 = new Thread(r);
Thread t2 = new Thread(r);
t1.start();
t2.start();
try {
// 程序本身運(yùn)行在主線程, 這里需要讓主線程等待t1和t2運(yùn)行完再獲取a的值
t1.join();
t2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("a: " + a);
}
}
這里涉及到2個(gè)問題:
- 一是原子性問題。先明確原子操作的定義:如果某個(gè)操作要么不執(zhí)行知允,要么完全執(zhí)行且不會(huì)被打斷撒蟀,則將其稱為原子操作。上述程序中我們將
a++
當(dāng)成了原子操作温鸽,而它實(shí)際由多個(gè)原子操作組成保屯,本身并不是原子操作。 - 二是可見性問題。線程t1修改了a的值后配椭,t2可能感知不到虫溜。
可見性問題1.2中會(huì)講,這里我們先討論原子性問題股缸。a++
看起來是一個(gè)原子操作衡楞,而實(shí)際上a++
需要3條CPU指令完成,這里的CPU指令才具備原子性敦姻。
指令1: 將變量a從內(nèi)存加載到CPU寄存器
指令2: 寄存器修改a的值為a+1
指令3: 將結(jié)果寫入內(nèi)存
并發(fā)情況下瘾境,線程切換可能發(fā)生在任一CPU指令執(zhí)行完的時(shí)候。例如當(dāng)t1和t2一起運(yùn)行時(shí)镰惦,可能出現(xiàn)下面的情況迷守。
t1線程讀到a的值為0
t1線程對(duì)寄存器中的值+1得到1
----------線程切換----------
t2線程從內(nèi)存中讀到a的值為0
t2線程對(duì)寄存器的值+1得到1
t2線程將1寫入內(nèi)存
----------線程切換----------
t1線程將1寫入內(nèi)存
可以發(fā)現(xiàn)在并發(fā)環(huán)境下,某個(gè)線程從內(nèi)存中讀取到的值可能不是最新數(shù)據(jù)旺入,這也解釋了為什么程序的結(jié)果總是小于20000兑凿。要解決這個(gè)問題,只需要將a++
變?yōu)樵硬僮骷纯梢瘃ㄟ^synchronized
關(guān)鍵字即可使某塊代碼具備原子性礼华,該代碼塊也被稱為同步代碼塊,如下所示拗秘。
synchronized (Test.class) {
a++;
}
該同步代碼塊以Test.class作為互斥鎖圣絮,任何線程在進(jìn)入該代碼塊之前需要先獲取該鎖,如果獲取不到則需等待當(dāng)前持有該鎖的線程釋放鎖雕旨“缃常可以理解為,鎖是用來保護(hù)共享變量的凡涩,這里的Test.class就是用來保護(hù)共享變量a同一時(shí)刻只能被一個(gè)線程訪問的鎖棒搜。
那么如果新增一個(gè)與a無關(guān)的共享變量b,是否也可以使用Test.class來保護(hù)呢活箕?
答案是否定的帮非,如果用同一個(gè)鎖來保護(hù)它們,那么不管修改a或b之前都要得到Test.class這個(gè)鎖讹蘑,導(dǎo)致訪問a的同時(shí)不能訪問b,但這兩個(gè)變量并無關(guān)聯(lián)筑舅,反而降低了運(yùn)行效率座慰。
不過可以通過另一個(gè)鎖來保護(hù)b,由于Java中的任何對(duì)象都可以作為鎖來使用翠拣,所以可以直接新建一個(gè)final對(duì)象來保護(hù)b版仔,如下所示。
public class Test {
public static int b = 0;
public static final Object lock = new Object();
public static void main(String[] args) {
Runnable r = () -> {
for (int i = 0; i < 10000; i++) {
synchronized (lock) {
b++;
}
}
};
......
}
}
1.2 可見性與CPU緩存
可見性問題是指在并發(fā)條件下,一個(gè)線程修改共享變量后蛮粮,另一個(gè)線程無法感知到共享變量的變化益缎。該問題是由CPU緩存引起的,由于CPU與內(nèi)存的運(yùn)行速度相差太多然想,為了平衡這個(gè)差距莺奔,CPU引入了高速緩存cache作為中間層,當(dāng)CPU從內(nèi)存中讀取數(shù)據(jù)時(shí)會(huì)將其讀入cache中变泄,之后CPU不用每次讀取內(nèi)存令哟,而是直接操作cache中的數(shù)據(jù),最后在合適的時(shí)機(jī)際將cache中的數(shù)據(jù)寫入內(nèi)存妨蛹,當(dāng)然這個(gè)時(shí)機(jī)對(duì)開發(fā)人員來說是不可控的屏富。
在單核CPU條件下,CPU整體只有一個(gè)cache蛙卤,因此多線程操作的也是同一個(gè)cache狠半,不會(huì)出現(xiàn)可見性問題。而在多核CPU條件下颤难,每核CPU都對(duì)應(yīng)一個(gè)cache神年,很有可能兩個(gè)線程讀寫的是各自的cache,由此產(chǎn)生了可見性問題乐严。
下面用一個(gè)例子說明可見性問題瘤袖,子線程t1在flag為true時(shí)會(huì)一直運(yùn)行,但是主線程會(huì)在500ms后將flag改為false昂验,看起來子線程只會(huì)運(yùn)行500ms捂敌,但事實(shí)是子線程會(huì)一直運(yùn)行下去。換句話說既琴,主線程修改flag的行為對(duì)子線程來說不可見占婉。
public class Test {
private static boolean flag = true;
private static int i = 0;
public static void main(String[] args) throws InterruptedException {
Runnable r1 = () -> {
while (flag) {
i++;
}
System.out.println("子線程結(jié)束");
};
Thread t1 = new Thread(r1);
t1.start();
Thread.sleep(500);
flag = false;
System.out.println("Main Thread結(jié)束");
}
}
要解決這個(gè)問題,使用volatile關(guān)鍵字修飾flag變量即可甫恩。
volatile關(guān)鍵字表示一個(gè)變量是易變的逆济,從抽象的角度明確了該變量的可見性。作為高級(jí)語言關(guān)鍵字磺箕,volatile屏蔽了底層硬件的不一致性奖慌,在不同的環(huán)境下,volatile關(guān)鍵字在底層可能具有不同的實(shí)現(xiàn)松靡,但是作為高級(jí)語言的開發(fā)者简僧,我們只需要理解volatile在抽象層面的定義即可。
之前提到雕欺,CPU高速緩存中的數(shù)據(jù)會(huì)在合適的時(shí)機(jī)被寫入內(nèi)存岛马,那么上面程序中的flag變量即使不加volatile棉姐,主線程對(duì)flag變量的修改也會(huì)在一段時(shí)間后寫入內(nèi)存。那為什么子線程一直感知不到flag的變化呢啦逆?
我們有理由猜測伞矩,子線程在while(flag)
循環(huán)中讀取的一直是CPU緩存中的flag變量,而沒有從內(nèi)存中重新獲取夏志。為了驗(yàn)證這個(gè)猜測乃坤,嘗試在while(flag)
循環(huán)中加上一些代碼,如下所示盲镶。
public class Test {
private static boolean flag = true;
private static int i = 0;
public static void main(String[] args) throws InterruptedException {
Runnable r1 = () -> {
while (flag) {
i++;
try {
Thread.sleep(50);
System.out.println("......");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("子線程結(jié)束");
};
Thread t1 = new Thread(r1);
t1.start();
Thread.sleep(500);
flag = false;
System.out.println("Main Thread結(jié)束");
}
}
運(yùn)行后發(fā)現(xiàn)子線程運(yùn)行約500ms后停止侥袜,表示子線程在某個(gè)時(shí)機(jī)獲取到了內(nèi)存中flag的值,印證了我們的猜測溉贿。這表明如果不加volatile關(guān)鍵字枫吧,雖然cache的值與內(nèi)存的值也會(huì)同步,但同步的時(shí)機(jī)是不確定的宇色,這也是很多并發(fā)bug難以溯源的原因九杂。
這時(shí)候你可能會(huì)對(duì)1.1的例子產(chǎn)生疑惑,雖然1.1中使用synchronized關(guān)鍵字使a++
具備了原子性宣蠕,不過并沒有使用volatile修飾變量a例隆,如果a不具備可見性的話,最終的結(jié)果也應(yīng)該小于預(yù)期值才對(duì)抢蚀。
但實(shí)際情況是镀层,該程序最終的結(jié)果都是正確的,這與Java內(nèi)存模型(JMM)有關(guān)皿曲,內(nèi)存模型保證原子操作中的變量具備可見性唱逢,第2節(jié)會(huì)闡述JMM相關(guān)內(nèi)容。
1.3 順序性與指令重排
指令重排是指編譯器為了提高程序的運(yùn)行效率屋休,在不影響運(yùn)行結(jié)果的前提下對(duì)CPU的指令重新排序坞古。即使指令之間存在依賴關(guān)系居凶,編譯器也會(huì)保證運(yùn)行結(jié)果不受影響揩局。
在單線程運(yùn)行環(huán)境下,指令重排確實(shí)不會(huì)影響運(yùn)行結(jié)果凹髓,但在多線程環(huán)境下它是存在一定隱患的叠艳。拿雙重判空的單例模式來舉例奶陈,其代碼如下。假設(shè)A線程和B線程同時(shí)運(yùn)行到同步代碼塊附较,A線程成功獲取到鎖并新建Singleton尿瞭,A線程釋放鎖后B線程搶占到鎖進(jìn)入同步代碼塊,隨后B線程發(fā)現(xiàn)單例已經(jīng)初始化就退出翅睛。如果沒有指令重排声搁,這一段代碼確實(shí)沒有任何問題。
public class Singleton {
private static Singleton sInstance;
public static Singleton getInstance() {
if (sInstance == null) {
synchronized(Singleton.class) {
if (sInstance == null)
sInstance = new Singleton();
}
}
return instance;
}
}
但是sInstance= new Singleton()
是可能會(huì)被指令重排的捕发,正常來說這句代碼的指令的執(zhí)行順序是這樣的疏旨。
指令1: 分配一塊內(nèi)存
指令2: 在內(nèi)存上初始化Singleton對(duì)象
指令3: 將內(nèi)存地址賦值給sInstance變量
指令重排后的執(zhí)行順序可能是這樣的。
指令1: 分配一塊內(nèi)存
指令2: 將內(nèi)存的地址賦值給sInstance變量
指令3: 在內(nèi)存上初始化Singleton對(duì)象
假設(shè)A線程執(zhí)行到指令2處時(shí)被掛起扎酷,而B線程進(jìn)入getInstance()
方法檐涝,在第一個(gè)判空處發(fā)現(xiàn)sInstance不為空就會(huì)直接返回,但此時(shí)sInstance指向的內(nèi)存并沒有初始化法挨,訪問sInstance時(shí)很可能出現(xiàn)問題谁榜。因此需要使用volatile修飾sInstance來禁止相關(guān)的指令重排。
那么有沒有一種更簡單的實(shí)現(xiàn)單例的方法呢凡纳?自然是有的窃植,如果一個(gè)單例對(duì)象在系統(tǒng)運(yùn)行中肯定會(huì)被使用,在類初始化的時(shí)候直接新建單例的實(shí)例對(duì)象即可荐糜,不用考慮并發(fā)情況巷怜,其實(shí)現(xiàn)如下。
public class Singleton {
private static class Holder {
private static Singleton INSTANCE = new Singleton();
}
private Singleton() {
}
public static Singleton getInstance() {
return Holder.INSTANCE;
}
二暴氏、Java內(nèi)存模型
并發(fā)場景下的原子性延塑、可見性和順序性問題會(huì)導(dǎo)致程序的運(yùn)行結(jié)果不可預(yù)測,為了解決這些問題答渔,Java內(nèi)存模型(JMM)提供了volatile和synchronized關(guān)鍵字关带,其中volatile關(guān)鍵字用于實(shí)現(xiàn)共享變量的可見性與限制重排序,synchronized關(guān)鍵字用于實(shí)現(xiàn)代碼塊的原子性沼撕。JMM提供了6條Happens-Before規(guī)則來描述這兩個(gè)關(guān)鍵字的作用宋雏,以便給開發(fā)人員提供指導(dǎo)。
Happens-Before是指端朵,如果操作A Happens-Before 操作B好芭,則表示在內(nèi)存順序上,A的結(jié)果對(duì)B是可見的冲呢。Happens-Before的具體規(guī)則如下舍败。
① 對(duì)synchronized互斥鎖的解鎖Happens-Before對(duì)該互斥鎖的后續(xù)加鎖
② 對(duì)volatile變量的寫操作Happens-Before對(duì)該變量的后續(xù)讀操作
③ 主線程調(diào)用子線程start()
方法前的操作Happens-Before該子線程的start()
方法
④ 一個(gè)線程內(nèi)的所有操作Happens-Before其他線程成功join()
該線程
⑤ 任意對(duì)象的默認(rèn)初始化Happens-Before程序的其他操作
⑥ Happens-Before具有可傳遞性,如果A Happens-Before B敬拓,B Happens-Before C邻薯,則A Happens-Before C
回頭看1.1,代碼中使用synchronized關(guān)鍵字使a++具備了原子性乘凸,同時(shí)a變量也具備了可見性厕诡。這是因?yàn)榛コ怄i的解鎖Happens-Before后續(xù)加鎖,因此之后拿到互斥鎖的線程都知道上個(gè)線程對(duì)a的操作結(jié)果营勤。
三灵嫌、互斥鎖
互斥鎖用于保證代碼塊的原子性壹罚,實(shí)際是用于保護(hù)一個(gè)或一系列共享變量同一時(shí)刻只能被一個(gè)線程訪問。當(dāng)某個(gè)線程進(jìn)入同步代碼塊時(shí)寿羞,該線程需要先獲取同步代碼塊的鎖猖凛,退出同步代碼塊時(shí)則釋放鎖。如果線程嘗試獲取鎖時(shí)發(fā)現(xiàn)鎖已被占用绪穆,則該線程被掛起辨泳,直到獲取鎖之后再進(jìn)入運(yùn)行態(tài)。
Java提供了synchronized和Lock兩種互斥鎖的實(shí)現(xiàn)玖院。synchronized是Java關(guān)鍵字菠红,屬于語言特性,但是在Java6之前它的效率并不高难菌。Lock系列由并發(fā)大師Doug Lea編寫试溯,在Java5時(shí)加入Java并發(fā)包。
3.1 synchronized
3.1.1 基本使用
synchronized用于修飾方法或代碼塊扔傅,它隱式地實(shí)現(xiàn)了加鎖/解鎖操作耍共,線程進(jìn)入方法或代碼塊時(shí)會(huì)自動(dòng)加鎖,退出時(shí)會(huì)自動(dòng)解鎖猎塞。其使用示例如下试读。
public class Sample {
private static final Object lock = new Object();
// 1. 修飾代碼塊
public void fun1() {
synchronized(lock) {
// ......
}
}
// 2. 修飾非靜態(tài)方法
public synchronized void fun2() {
// ......
}
// 3. 修飾靜態(tài)方法
public synchronized static void fun3() {
// ......
}
}
可以發(fā)現(xiàn)synchronized只有在修飾代碼塊時(shí)才需要指定互斥鎖,而修飾方法時(shí)的互斥鎖是Java隱性添加的荠耽。其規(guī)則為:修飾static方法時(shí)钩骇,互斥鎖為當(dāng)前類的class對(duì)象,也就是例子中的Sample.class铝量;而修飾非static方法時(shí)倘屹,互斥鎖為當(dāng)前的對(duì)象實(shí)例。
3.1.2 鎖的選擇
首先需要明確什么樣的對(duì)象適合作為互斥鎖慢叨,由于互斥鎖是用于保護(hù)共享變量的纽匙,那么互斥鎖的生命周期應(yīng)該與它保護(hù)的共享變量一致,且在運(yùn)行過程中不可再被賦值拍谐。
synchronized修飾方法時(shí)隱式添加的鎖就遵循了這樣的規(guī)則烛缔,當(dāng)synchronized修飾靜態(tài)方法時(shí),它的鎖為當(dāng)前類的class對(duì)象轩拨,該對(duì)象在程序開始運(yùn)行時(shí)就被創(chuàng)建践瓷,且不可更改。
以下面的程序?yàn)槔鋈兀瑂taticList作為靜態(tài)變量晕翠,它的生命周期是整個(gè)程序,與互斥鎖Sample.class的生命周期相同砍濒。
public class Sample {
private static List<String> staticList = new ArrayList<>();
public synchronized static void addString(String s) {
staticList.add(s);
}
}
當(dāng)synchronized修飾非靜態(tài)方法時(shí)淋肾,它的鎖為當(dāng)前對(duì)象this硫麻。因?yàn)榉庆o態(tài)方法用于操作非靜態(tài)成員,例如下面示例中的mList樊卓,而非靜態(tài)成員的生命周期就是當(dāng)前對(duì)象this的生命周期庶香。
public class Sample {
private List<String> mList;
public Sample() {
mList = new ArrayList<>();
}
public synchronized void addString(String s) {
mList.add(s);
}
}
需要注意的是,使用synchronized修飾static方法時(shí)简识,該類所有static方法的互斥鎖都是同一個(gè)對(duì)象,所以該類所有static方法都是互斥的感猛。如果各個(gè)static方法中需要保護(hù)的資源不一樣七扰,這樣只會(huì)影響運(yùn)行效率,使用synchronized修飾非靜態(tài)方法時(shí)同理陪白。
下方程序就使用了Sample.class這把鎖保護(hù)了兩個(gè)不同的資源颈走,那么該如何修改呢?
public class Sample {
private static List<String> staticList1 = new ArrayList<>();
private static List<String> staticList2 = new ArrayList<>();
public synchronized static void addString1(String s) {
staticList1.add(s);
}
public synchronized static void addString2(String s) {
staticList2.add(s);
}
}
一般來說咱士,互斥鎖應(yīng)與被保護(hù)的資源一一對(duì)應(yīng)立由,最簡單的方法就是將被保護(hù)的對(duì)象本身作為互斥鎖,如下所示序厉。
public class Sample {
private static final List<String> staticList1 = new ArrayList<>();
private static final List<String> staticList2 = new ArrayList<>();
public static void addString1(String s) {
synchronized (staticList1) {
staticList1.add(s);
}
}
public static void addString2(String s) {
synchronized (staticList2) {
staticList2.add(s);
}
}
}
可以發(fā)現(xiàn)新的示例中锐膜,我們使用final修飾了staticList1和staticList2,這是為什么呢弛房?
這涉及到對(duì)線程加鎖的原理道盏,當(dāng)程序?qū)€程加鎖時(shí),實(shí)際是在鎖對(duì)象中寫入了該線程的id文捶,因此鎖對(duì)象在運(yùn)行期間不該被重新賦值荷逞。如果在運(yùn)行期間鎖被重新賦值(例如上方的staticList1指向了一個(gè)新的對(duì)象),就相當(dāng)于使用多個(gè)鎖保護(hù)一個(gè)資源粹排,無法達(dá)到互斥的目的种远。而被final修飾的對(duì)象是必須被初始化的,且無法被重新賦值顽耳,滿足作為鎖的條件坠敷。
3.1.3 線程"等待-通知"機(jī)制
考慮如下程序,當(dāng)fun()
方法中canExecute()
這個(gè)運(yùn)行條件不滿足時(shí)斧抱,我們可能會(huì)通過循環(huán)等待的方式常拓,直到canExecute()
方法返回true后再運(yùn)行接下來的邏輯。
public class Sample {
private boolean execute = false;
public void fun() throws InterruptedException {
while (!canExecute()) {
Thread.sleep(10);
}
// ......
}
private synchronized boolean canExecute() {
return execute;
}
public synchronized void setExecute() {
execute = true;
}
}
問題在于辉浦,線程睡眠的時(shí)間是一個(gè)固定的值弄抬。如果該值太大,線程無法在運(yùn)行條件滿足后的第一時(shí)間開始運(yùn)行宪郊;如果該值太小掂恕,線程會(huì)不斷地調(diào)用同步方法canExecute()
判斷當(dāng)前的狀態(tài)拖陆,浪費(fèi)了運(yùn)行資源。那么有沒有一種方法懊亡,能夠使不滿足運(yùn)行條件的線程進(jìn)入休眠狀態(tài)(該狀態(tài)下線程不消耗運(yùn)行資源)依啰,而在滿足條件后的第一時(shí)間開始運(yùn)行呢?這就是線程的"等待-通知"機(jī)制店枣。
Java所有的對(duì)象都有wait()
速警、notify()
和notifyAll()
這3個(gè)方法,這是對(duì)象作為互斥鎖時(shí)所使用的方法鸯两,它們與synchronized共同實(shí)現(xiàn)了線程的"等待-通知"機(jī)制闷旧,需要注意的是,這3個(gè)方法必須在synchronized代碼塊中執(zhí)行钧唐。下面介紹這3個(gè)方法的作用忙灼。
-
wait(): 用于使當(dāng)前線程進(jìn)入休眠狀態(tài)。如果當(dāng)前的運(yùn)行條件未滿足钝侠,可以調(diào)用互斥鎖的
wait()
方法主動(dòng)放棄互斥鎖并進(jìn)入休眠態(tài)该园,隨后該線程會(huì)進(jìn)入互斥鎖的等待隊(duì)列中,直到被其他線程喚醒帅韧。 -
notify(): 用于喚醒互斥鎖等待隊(duì)列中的一個(gè)線程里初。當(dāng)?shù)却?duì)列中的線程的運(yùn)行條件被滿足時(shí),可以調(diào)用
notify()
方法隨機(jī)喚醒等待隊(duì)列中的一個(gè)線程弱匪,隨后被喚醒的線程去爭奪互斥鎖青瀑。 -
notifyAll(): 喚醒互斥鎖等待隊(duì)列中的所有線程。由于
notify()
方法喚醒線程時(shí)是隨機(jī)的萧诫,可能喚醒的并不是真正滿足運(yùn)行條件的線程斥难,因此實(shí)際開發(fā)中基本只用notifyAll()
,讓等待隊(duì)列中的所有線程去爭奪互斥鎖帘饶。
舉個(gè)簡單的例子哑诊,代碼如下所示。
線程t1運(yùn)行tryExecute()
方法及刻,發(fā)現(xiàn)mShouldExecute為false镀裤,調(diào)用wait()
主動(dòng)進(jìn)入休眠態(tài)。線程t2睡眠3秒后將mShouldExecute設(shè)置為true并調(diào)用notifyAll()
通知等待隊(duì)列中的線程缴饭,t1被喚醒后發(fā)現(xiàn)運(yùn)行條件滿足暑劝,繼續(xù)執(zhí)行。
public class Sample {
private boolean mShouldExecute = false;
public void tryExecute() throws InterruptedException {
synchronized (this) {
while (!mShouldExecute) {
System.out.println("tryExecute wait...");
wait();
}
realExecute();
}
}
private void realExecute() {
System.out.println("realExecute");
}
public void setExecute() {
synchronized (this) {
mShouldExecute = true;
notifyAll();
}
}
public static void main(String[] args) {
Sample sample = new Sample();
Thread t1 = new Thread(() -> {
try {
sample.tryExecute();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread t2 = new Thread(() -> {
try {
Thread.sleep(3000);
sample.setExecute();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t1.start();
t2.start();
}
}
上述程序中的互斥鎖是當(dāng)前對(duì)象this颗搂,因此在調(diào)用wait()
和notifyAll()
方法時(shí)省略了this担猛,寫完整應(yīng)該是this.wait()
和this.notifyAll()
。
在判斷程序是否滿足運(yùn)行條件時(shí),tryExecute()
方法中使用了循環(huán)while (!mShouldExecute)
傅联,這是因?yàn)榫€程t2調(diào)用notifyAll()
后先改,線程t1并不會(huì)馬上執(zhí)行,而是要去爭奪互斥鎖蒸走。有可能爭奪到互斥鎖時(shí)仇奶,運(yùn)行條件又不滿足了,因此需要重新判斷比驻。
線程"等待-通知"機(jī)制可以用來實(shí)現(xiàn)阻塞隊(duì)列(BlockingQueue)该溯。阻塞隊(duì)列是一種"生產(chǎn)者-消費(fèi)者"模型的數(shù)據(jù)結(jié)構(gòu),當(dāng)隊(duì)列為空時(shí)别惦,所有嘗試獲取數(shù)據(jù)的線程都會(huì)休眠朗伶,直到別的線程成功添加數(shù)據(jù)后喚醒它們;當(dāng)隊(duì)列已滿時(shí)步咪,所有嘗試添加數(shù)據(jù)的線程都會(huì)休眠,直到獲取數(shù)據(jù)成功的線程喚醒它們益楼。
public class BlockingQueue<T> {
private Queue<T> mQueue;
private int mCapacity;
public BlockingQueue(int capacity) {
mQueue = new ArrayDeque<>(capacity);
mCapacity = capacity;
}
/**
* 嘗試向隊(duì)列添加數(shù)據(jù), 如果隊(duì)列已滿則休眠當(dāng)前線程
*/
public void offer(T t) throws InterruptedException {
synchronized (this) {
while (isQueueFull()) {
wait();
}
mQueue.offer(t);
notifyAll();
}
}
/**
* 嘗試獲取隊(duì)頭數(shù)據(jù), 如果隊(duì)列為空則休眠當(dāng)前線程
*/
public T take() throws InterruptedException {
synchronized (this) {
while (isQueueEmpty()) {
wait();
}
T result = mQueue.poll();
notifyAll();
return result;
}
}
/**
* 判斷隊(duì)列是否為空
*/
private boolean isQueueEmpty() {
return mQueue.isEmpty();
}
/**
* 判斷隊(duì)列是否已滿
*/
private boolean isQueueFull() {
return mQueue.size() == mCapacity;
}
}
3.2 Lock
Lock用于解決synchronized在某些場景下的缺陷猾漫,在以下場景中synchronized無法實(shí)現(xiàn)最佳效果,但是Lock可以輕松解決感凤。
- 當(dāng)線程進(jìn)入synchronized修飾的方法或代碼塊中悯周,只有等線程執(zhí)行完或者調(diào)用
wait()
方法才會(huì)釋放鎖。如果一個(gè)線程在進(jìn)行耗時(shí)任務(wù)陪竿,那么其他線程都必須等待它運(yùn)行完畢禽翼,無法中斷。 - 使用synchronized保護(hù)共享變量時(shí)族跛,在同一時(shí)刻最多只有一個(gè)線程進(jìn)行讀寫闰挡。但是多個(gè)讀線程并不沖突,如果讀線程也互斥的話會(huì)影響程序效率礁哄。針對(duì)這種情況长酗,Lock提供了讀寫鎖ReadWriteLock。
- 使用synchronized實(shí)現(xiàn)"等待-通知"機(jī)制時(shí)桐绒,調(diào)用
notifyAll()
會(huì)喚醒所有阻塞的線程夺脾,無法喚醒特定的線程。例如在3.1.3的阻塞隊(duì)列中茉继,如果某個(gè)線程執(zhí)行poll()
方法取出了隊(duì)列中的最后一個(gè)數(shù)據(jù)咧叭,隨后只需要喚醒那些調(diào)用offer(T t)
的線程即可,而notifyAll()
會(huì)喚醒所有線程烁竭,如果調(diào)用poll()
方法的線程搶占到互斥鎖菲茬,也會(huì)馬上發(fā)現(xiàn)條件不滿足,然后繼續(xù)休眠。
Lock本身是一個(gè)接口生均,針對(duì)不同的場景Lock提供了不同的實(shí)現(xiàn)類听想,接口如下所示。
/**
* 獲取鎖
*
* Lock的實(shí)現(xiàn)應(yīng)該能夠監(jiān)測到鎖的錯(cuò)誤使用, 例如可能產(chǎn)生死鎖或拋出異常
* Lock的實(shí)現(xiàn)必須記錄鎖的情況和異常類型
*/
void lock();
/**
* 獲取鎖, 除非線程被中斷
*
* 如果當(dāng)前線程在進(jìn)入方法前或者在獲取鎖的過程中被設(shè)置為interrupted
* 那么會(huì)拋出InterruptedException并且當(dāng)前線程的中斷狀態(tài)會(huì)被清除
*/
void lockInterruptibly() throws InterruptedException;
/**
* 如果當(dāng)前鎖可用, 則獲取鎖, 否則直接返回false
*
* 一個(gè)典型的用法如下所示:
* Lock lock = ...;
* if (lock.tryLock()) {
* try {
* // manipulate protected state
* } finally {
* lock.unlock();
* }
* } else {
* // 備用邏輯
* }}
*/
boolean tryLock();
/**
* 如果在給定時(shí)間內(nèi)線程沒有被中斷且獲取到鎖則返回true
* 如果在給定時(shí)間之后線程還未獲取到鎖, 則返回false
*/
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
/**
* 釋放鎖
*/
void unlock();
/**
* 返回一個(gè)綁定了當(dāng)前Lock的條件變量Condition, 用于實(shí)現(xiàn)線程的"等待-通知"機(jī)制
* 在某個(gè)Condition上被阻塞的線程可以被單獨(dú)喚醒
*/
Condition newCondition();
Lock作為并發(fā)包的實(shí)現(xiàn)马胧,在使用上與synchronized關(guān)鍵字有所不同汉买。synchronized是自動(dòng)加鎖/解鎖的,即使在同步代碼塊中出現(xiàn)異常佩脊,JVM也能保證鎖正常釋放蛙粘。但是Lock無法做到,在使用Lock時(shí)威彰,需要遵守以下的范式來保證遇到異常時(shí)也能正常釋放鎖出牧,否則其他線程永遠(yuǎn)得不到運(yùn)行的機(jī)會(huì)。
public void fun() {
lock.lock();
try {
// ......
} finally {
rtl.unlock();
}
}
3.2.1 可重入鎖ReentrantLock
可重入鎖指的是線程可以重復(fù)獲取同一把鎖歇盼,示例如下所示舔痕,一個(gè)線程運(yùn)行到fun1()
時(shí)獲取到了鎖,在未釋放鎖的情況下調(diào)用fun2()
是可以正常運(yùn)行的豹缀。
synchronized關(guān)鍵字也是可重入鎖伯复,因?yàn)樗募渔i操作本質(zhì)上就是在鎖這個(gè)Java對(duì)象的對(duì)象頭寫入當(dāng)前線程的id,表示鎖被當(dāng)前線程占有了邢笙,自然是可重入的啸如。
public class Sample {
private Lock lock = new ReentrantLock();
public void fun1() {
lock.lock();
try {
fun2(); // fun2()也需要獲取鎖
} finally {
lock.unlock();
}
}
public void fun2() {
lock.lock();
try {
// ......
} finally {
lock.unlock();
}
}
使用synchronized關(guān)鍵字時(shí),可以通過wait()
和notifyAll()
實(shí)現(xiàn)線程的"等待-通知"機(jī)制氮惯,在3.1.3中通過它們實(shí)現(xiàn)了一個(gè)阻塞隊(duì)列叮雳。但是使用wait()
和notifyAll()
存在的問題是,只能喚醒所有休眠的線程妇汗,而無法根據(jù)當(dāng)前的條件喚醒特定的線程去執(zhí)行帘不。
但是Lock解決了這個(gè)問題,Lock接口有個(gè)方法Condition newCondition()
可以新建一個(gè)與當(dāng)前Lock綁定的條件變量杨箭⊙峋可以通過Condition.await()
方法使某個(gè)線程休眠,當(dāng)該條件變量滿足后告唆,可以通過Condition.signalAll()
喚醒該條件變量下休眠的線程棺弊。
下面用ReentrantLock和Condition實(shí)現(xiàn)阻塞隊(duì)列如下,此時(shí)可以創(chuàng)建兩個(gè)Condition擒悬,一個(gè)Condition表示隊(duì)列不滿模她,如果線程添加數(shù)據(jù)時(shí)發(fā)現(xiàn)隊(duì)列已滿,那么阻塞在該Condition上懂牧,直到有數(shù)據(jù)出隊(duì)時(shí)喚醒阻塞在該條件上的線程侈净;另一個(gè)Condition表示隊(duì)列不空尊勿,如果線程獲取數(shù)據(jù)時(shí)發(fā)現(xiàn)隊(duì)列為空,則阻塞在該Condition上畜侦,直到有數(shù)據(jù)入隊(duì)時(shí)喚醒阻塞在該條件上的線程元扔。
public class BlockingQueue<T> {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition(); // 條件: 隊(duì)列不滿
final Condition notEmpty = lock.newCondition(); // 條件: 隊(duì)列不空
private Queue<T> mQueue;
private int mCapacity;
public BlockingQueue(int capacity) {
mQueue = new ArrayDeque<>(capacity);
mCapacity = capacity;
}
public void offer(T t) {
lock.lock();
try {
while (isQueueFull()) {
notFull.await();
}
mQueue.offer(t);
// 入隊(duì)后, 通知可以出隊(duì)了
notEmpty.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
private boolean isQueueFull() {
return mQueue.size() == mCapacity;
}
public T take() {
T result = null;
lock.lock();
try {
while (isQueueEmpty()) {
notEmpty.await();
}
result = mQueue.poll();
// 出隊(duì)后, 通知可以入隊(duì)了
notFull.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return result;
}
private boolean isQueueEmpty() {
return mQueue.size() == 0;
}
}
3.2.2 讀寫鎖ReadWriteLock
使用synchronized關(guān)鍵字保護(hù)共享變量時(shí),線程的讀操作也會(huì)互斥旋膳,但是多個(gè)線程的讀操作并不會(huì)產(chǎn)生并發(fā)問題澎语。針對(duì)讀寫場景,Java并發(fā)包提供了讀寫鎖ReadWriteLock验懊,它是一個(gè)接口该肴,如下所示攀痊。
public interface ReadWriteLock {
Lock readLock(); // 返回讀鎖
Lock writeLock(); // 返回寫鎖
}
ReadWriteLock的實(shí)現(xiàn)為ReentrantReadWriteLock夹纫,從命名可以看出蝌戒,這是一個(gè)可重入鎖。當(dāng)調(diào)用readLock()
或writeLock()
獲取讀鎖或?qū)戞i時(shí)碱工,返回的是ReentrantReadWriteLock中的內(nèi)部變量readerLock和writerLock娃承,它們都是Lock接口的實(shí)現(xiàn)類。
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;
// 默認(rèn)新建非公平鎖
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
......
}
在使用之前先來看一下ReadWriteLock的特性:
- 讀和讀之間不互斥怕篷,讀和寫之間互斥草慧,寫和寫之間互斥。這意味著沒有寫鎖時(shí)匙头,讀鎖可以被多個(gè)線程持有。
- 讀寫鎖只適用于讀多寫少的場景仔雷,例如某個(gè)很少被修改但是經(jīng)常被搜索的數(shù)據(jù)(如條目)就適合使用讀寫鎖蹂析。如果寫操作較為頻繁,那么數(shù)據(jù)大部分時(shí)間都被獨(dú)占鎖占據(jù)碟婆,并不會(huì)提升并發(fā)性能电抚。
- 持有讀鎖的線程無法直接獲取寫鎖,但是持有寫鎖的線程可以獲取讀鎖竖共,其他線程無法獲取讀鎖蝙叛。換句話說,讀鎖無法升級(jí)為寫鎖公给,寫鎖可以降級(jí)為讀鎖借帘。
- 讀鎖和寫鎖都實(shí)現(xiàn)了Lock接口,因此它們都支持
tryLock()
和lockInterruptibly()
等方法淌铐。但是只有寫鎖支持Condition肺然,讀鎖不支持使用條件變量。
使用ReadWriteLock的一個(gè)簡單示例如下腿准。
public class Sample<T> {
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private T mData;
public T read() {
readWriteLock.readLock().lock();
try {
return mData;
} finally {
readWriteLock.readLock().unlock();
}
}
public void write(T t) {
readWriteLock.writeLock().lock();
try {
mData = t;
} finally {
readWriteLock.writeLock().unlock();
}
}
}
3.2.3 支持樂觀讀的StampedLock
StampedLock具有三種支持讀/寫功能的模式际起,它的狀態(tài)由版本(stamp)和當(dāng)前的模式組成。StampedLock獲取鎖的方法會(huì)返回一個(gè)stamp,該值用于表示當(dāng)前鎖的狀態(tài)街望,在釋放鎖和轉(zhuǎn)換鎖時(shí)需要stamp作為參數(shù)校翔,如果它與鎖的狀態(tài)不匹配就會(huì)失敗。StampedLock支持的三種模式為寫灾前、讀和樂觀讀防症。
- 寫模式,類似讀寫鎖的寫鎖豫柬。線程調(diào)用
StampedLock.writeLock()
后獨(dú)占訪問數(shù)據(jù)告希,該方法返回一個(gè)stamp表示鎖的狀態(tài),調(diào)用StampedLock.unlockWrite(stamp)
解鎖時(shí)需要該stamp烧给。
tryWriteLock()
和tryWriteLock()
方法也會(huì)返回stamp燕偶,這2個(gè)方法獲取鎖失敗時(shí)會(huì)返回0。當(dāng)鎖處于寫模式時(shí)無法獲得讀鎖础嫡,所有樂觀讀的驗(yàn)證都將失敗指么。 - 讀模式,類似讀寫鎖的讀鎖榴鼎。線程調(diào)用
StampedLock.readLock()
后進(jìn)行讀操作伯诬,多個(gè)線程的讀操作可同時(shí)進(jìn)行。讀鎖加鎖時(shí)也會(huì)返回stamp巫财,用于解鎖時(shí)調(diào)用StampedLock.unlockRead(stamp)
使用盗似。 - 樂觀讀模式,該模式是讀鎖的弱化版本平项,使用時(shí)不需要獲取鎖赫舒。樂觀讀的思想是,假設(shè)讀的過程中數(shù)據(jù)并未被修改闽瓢。樂觀讀避免了鎖的爭用并提高了吞吐量接癌,但是它的正確性無法保證,因此讀到結(jié)果后需要驗(yàn)證在樂觀讀時(shí)是否有線程獲取了寫鎖扣讼。
調(diào)用StampedLock.tryooptimisticread()
后可進(jìn)行樂觀讀缺猛,如果當(dāng)前不處于寫入模式則返回非零stamp,樂觀讀結(jié)束后調(diào)用validate(stamp)
判斷結(jié)果是否有效椭符。
StampedLock的使用依賴于對(duì)所保護(hù)的數(shù)據(jù)荔燎、對(duì)象和方法的了解,如果樂觀讀的結(jié)果未經(jīng)驗(yàn)證销钝,不應(yīng)該用于無法忍受潛在不一致的方法湖雹。StampedLock是不可重入的,因此獲取了鎖的線程不應(yīng)該調(diào)用其他嘗試獲取鎖的方法曙搬。
來看官方提供的例子摔吏,Point類有3個(gè)方法鸽嫂,move(...)
方法描述了寫模式的一般流程;distanceFromOrigin()
描述了樂觀讀模式的流程征讲,如果樂觀讀的數(shù)據(jù)失效据某,則獲取讀鎖進(jìn)行讀操作;moveIfAtOrigin(...)
描述了StampedLock的鎖升級(jí)诗箍,在平時(shí)開發(fā)中癣籽,鎖的升級(jí)要慎重使用。
public class Point {
private double x, y;
private final StampedLock sl = new StampedLock();
// 寫鎖
void move(double deltaX, double deltaY) {
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}
// 樂觀讀
double distanceFromOrigin() {
long stamp = sl.tryOptimisticRead();
double currentX = x, currentY = y;
if (!sl.validate(stamp)) {
// 如果數(shù)據(jù)已過時(shí), 則獲取讀鎖進(jìn)行讀操作
stamp = sl.readLock();
try {
currentX = x;
currentY = y;
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
// 鎖的升級(jí)
void moveIfAtOrigin(double newX, double newY) {
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) {
long ws = sl.tryConvertToWriteLock(stamp);
if (ws != 0L) {
// 需要將升級(jí)后的stamp值更新
stamp = ws;
x = newX;
y = newY;
break;
} else {
sl.unlockRead(stamp);
stamp = sl.writeLock();
}
}
} finally {
sl.unlock(stamp);
}
}
}
四滤祖、線程安全的變量
互斥鎖是用于保護(hù)共享變量的筷狼,但如果一個(gè)變量不會(huì)被多個(gè)線程同時(shí)訪問也就沒有保護(hù)的必要了,例如方法內(nèi)的局部變量匠童。根據(jù)Java虛擬機(jī)規(guī)范埂材,每個(gè)線程的內(nèi)存中都包含程序計(jì)數(shù)器、虛擬機(jī)棧以及本地方法區(qū)這3塊內(nèi)存汤求,其中虛擬機(jī)棧用于存儲(chǔ)線程的方法調(diào)用俏险,當(dāng)調(diào)用某個(gè)方法時(shí),線程會(huì)在虛擬機(jī)棧中壓入一個(gè)棧幀扬绪;等該方法調(diào)用完畢時(shí)竖独,對(duì)應(yīng)的棧幀就會(huì)從虛擬機(jī)棧中彈出,而局部變量表就保存在棧幀中挤牛。
在大部分Java虛擬機(jī)的實(shí)現(xiàn)中莹痢,虛擬機(jī)棧的大小是固定的,只有少數(shù)JVM的虛擬機(jī)棧是可以擴(kuò)展的墓赴。虛擬機(jī)棧大小固定的情況下竞膳,如果進(jìn)行一個(gè)深度過大的遞歸調(diào)用時(shí),就可能產(chǎn)生StackOverflow異常竣蹦,就是因?yàn)樘摂M機(jī)棧沒有足夠的空間去存儲(chǔ)一個(gè)棧幀了。
當(dāng)然共享變量也不一定是線程不安全的沧奴,下面介紹3種線程安全的變量痘括,它們實(shí)現(xiàn)線程安全的方式各不相同。
4.1 不可變變量
并發(fā)環(huán)境下對(duì)共享變量的讀寫會(huì)出現(xiàn)問題滔吠,但是只讀不寫就沒有并發(fā)問題纲菌。如果一個(gè)共享變量在初始化后不會(huì)再改變,它就是線程安全變量疮绷。我們知道被final關(guān)鍵字修飾的變量是必須被初始化且初始化后不再改變的翰舌,那么被final修飾的共享變量是否就是線程安全的呢?我們以兩個(gè)例子來說明冬骚。
先來看這個(gè)例子椅贱,類中有2個(gè)共享變量分別是String和int類型的懂算,它們在構(gòu)造方法中會(huì)被初始化,并且不可更改庇麦,因此是線程安全的计技。
public class Sample {
private final String mString;
private final int mInt;
public Sample(String s, int i) {
mString = s;
mInt = i;
}
public String getString() {
return mString;
}
public int getInt() {
return mInt;
}
}
來看第二個(gè)例子,雖然Sample中將mTest修飾為final變量山橄,但是這里的final只是表示mTest對(duì)象的內(nèi)存地址不可變垮媒,對(duì)象中的值是可變的,所以通過getTest()
獲取到mTest變量后航棱,是可以對(duì)mTest中的String和int重新賦值的睡雇。而Test類中的String和int變量不是final變量,所以Sample不是線程安全的饮醇。
public class Sample {
private final Test mTest;
public Sample(Test test) {
this.mTest = test;
}
public Test getTest() {
return mTest;
}
static class Test {
public String s;
public int i;
}
}
如果想要讓類的對(duì)象是不可變的它抱,那么類的屬性應(yīng)該全部修飾為final,如果某個(gè)屬性不是基本類型驳阎,那么該類也應(yīng)將所有屬性修飾為final抗愁。
需要注意的是,String類型是不可變的呵晚,當(dāng)調(diào)用String的subString()
或replace()
方法時(shí)不是修改原來的String蜘腌,而是生成一個(gè)新的String對(duì)象。但是StringBuilder和StringBuffer是可變的饵隙,調(diào)用append()
方法時(shí)是在原對(duì)象上操作的撮珠。
4.2 原子類變量
在1.1中我們解決a++
原子性問題的方法是使用synchronized,但是加鎖/解鎖操作以及線程切換都是比較消耗性能的金矛。但是有一種變量叫原子類變量芯急,它的方法本身就是原子操作,這是通過CPU指令支持的驶俊,因此具有很好的效率娶耍。
4.2.1 基本變量原子類
原子類中的AtomicBoolean, AtomicInteger, AtomicLong對(duì)應(yīng)基本類型的Boolean, Integer, Long,這3個(gè)原子類共有的方法如下饼酿。
// 只有當(dāng)前原子變量的值是expect才更新為update, 成功返回true, 失敗返回false
boolean compareAndSet(expect, update);
// 將原子變量的值更新為newValue并返回更新前的值
boolean/int/long getAndSet(newValue);
// 將原子變量的值更新為newValue, 但是對(duì)其他線程并不一定立即可見
void lazySet(newValue);
當(dāng)然AtomicInteger和AtomicLong還有另外的方法榕酒,這些方法通常成對(duì)出現(xiàn),例如getAndIncrement()
和incrementAndGet()
故俐,它們的功能都是自增想鹰,唯一的不同就在于返回的是更新前的值還是更新后的值。如果1.1的例子使用原子類來實(shí)現(xiàn)药版,只需要將a改為原子變量a = new new AtomicInteger(0)
并將a++
改為a.getAndIncrement()
就能得到正確的結(jié)果辑舷。
// 增加delta, 返回更新后的值, getAndAdd()與其作用相同但是返回更新前的值
int/long addAndGet(int delta);
// 通過IntBinaryOperator對(duì)prev和x計(jì)算得到新的值, IntBinaryOperator的方法必須無副作用
int/long accumulateAndGet(x, IntBinaryOperator);
// 通過IntUnaryOperator對(duì)prev計(jì)算得到新的值, IntUnaryOperator的方法必須無副作用
int/long updateAndGet(IntUnaryOperator updateFunction);
原子類通過CPU提供的CAS指令實(shí)現(xiàn)了原子性,該指令本身具有原子性槽片。CAS指Compare And Swap何缓,即比較并交換肢础,該指令先比較共享變量的值與期望值,只有這兩個(gè)值相等才將共享變量的值更新歌殃。原子類的compareAndSet(expect, update)
方法就是通過native的compareAndSwapXXX(...)
方法實(shí)現(xiàn)的乔妈。
CAS+自旋是并發(fā)編程的一大利器,在Java并發(fā)包中得到了廣泛的應(yīng)用氓皱,"自旋"實(shí)際上就是通過循環(huán)不斷嘗試CAS操作直到成功路召。例如AtomicInteger的getAndAdd(delta)
方法,它實(shí)際調(diào)用Unsafe中的getAndAddInt(Object var1, long var2, int var4)
方法波材,如下所示股淡。由于在進(jìn)行CAS指令之前,原子類的值可能被其他線程修改廷区,因此需要循環(huán)獲取原子變量當(dāng)前的值并調(diào)用compareAndSwapInt(...)
直到成功唯灵。
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
4.2.2 原子數(shù)組類
數(shù)組原子類包括AtomicIntegerArray, AtomicLongArray和AtomicReferenceArray,它們的方法都大同小異隙轻。以AtomicIntegerArray為例埠帕,它有2個(gè)構(gòu)造方法,一個(gè)傳入數(shù)組長度敛瓷,數(shù)組中所有的值被初始化為0呐籽,另一個(gè)構(gòu)造方法直接傳入一個(gè)int數(shù)組蚀瘸。
public AtomicIntegerArray(int length) {
array = new int[length];
}
public AtomicIntegerArray(int[] array) {
// Visibility guaranteed by final field guarantees
this.array = array.clone();
}
相比于AtomicInteger,AtomicIntegerArray的方法在使用時(shí)只是多傳入了數(shù)組下標(biāo)參數(shù)i贮勃,看看下面的方法寂嘉,是不是似曾相識(shí)丝格?AtomicLongArray同理。
AtomicReferenceArray是原子引用的數(shù)組订咸,接下來的4.2.3會(huì)講解原子引用AtomicReference骆撇,使用原子數(shù)組類時(shí)只需加上下標(biāo)即可神郊。
boolean compareAndSet(int i, int expect, int update);
int getAndAdd(int i, int delta);
int getAndIncrement(int i);
......
4.2.3 原子引用類
引用原子類為AtomicReference,那么它是將改變引用這個(gè)操作封裝成了原子操作嗎夕晓?當(dāng)然不是!因?yàn)楦淖円眠@個(gè)操作本身就是原子的躬贡,對(duì)引用賦值只需要一條指令逗宜,它不會(huì)被CPU打斷。
這點(diǎn)也可以從AtomicReference的set(V newValue)
方法看出來熬甚,該方法只是對(duì)引用賦值。
public final void set(V newValue) {
value = newValue;
}
因此AtomicReference的價(jià)值就在于它的CAS指令诲泌,用于防止其他線程隨意篡改引用的值敷扫。
boolean compareAndSet(V expect, V update);
使用AtomicReference要注意是否存在ABA問題绘迁,先來解釋一下什么是ABA問題。
之前使用AtomicInteger時(shí)膛腐,我們將原子類的值當(dāng)成了它的狀態(tài)依疼,在調(diào)用compareAndSet(A, update)
時(shí)只要原子類的值與A相等就認(rèn)為它沒有被修改過,但這樣的判斷是存在隱患的误辑。如果T1線程在調(diào)用compareAndSet(A, update)
之前,T2線程將值從A改為B砰苍,再從B改為A赚导,雖然原子類的值沒變吼旧,但是它的狀態(tài)已經(jīng)發(fā)生了變化。隨后T1線程執(zhí)行CAS時(shí)發(fā)現(xiàn)值未變就會(huì)繼續(xù)執(zhí)行员串,但是原子類狀態(tài)上的變化會(huì)引起一些意想不到的錯(cuò)誤。
維基百科的CAS詞條[參考2]描述了一個(gè)無鎖椃梅蓿可能出現(xiàn)的ABA問題。先解釋一下無鎖棧卧斟,如果需要實(shí)現(xiàn)一個(gè)支持并發(fā)的棧,有哪些方式可以實(shí)現(xiàn)呢板乙?
顯而易見的實(shí)現(xiàn)方式是在pop()
和push()
方法上加鎖,這種實(shí)現(xiàn)簡單粗暴放接,不會(huì)出錯(cuò),但是效率較低。更好的方式是使用CAS+自旋汉操,該實(shí)現(xiàn)不需要加鎖,通過CAS+自旋實(shí)現(xiàn)的數(shù)據(jù)結(jié)構(gòu)也被稱為無鎖結(jié)構(gòu)采缚。
以無鎖棧為例,其數(shù)據(jù)結(jié)構(gòu)為鏈表贸呢,通過一個(gè)指針head指向棧頂怔鳖,棧的push和pop操作都依賴head完成,當(dāng)head指向null時(shí)棧為空献幔。
為了保證線程對(duì)head操作時(shí)的原子性,需要將head定義為原子引用AtomicReference<StackNode>
铸敏,無鎖棧代碼如下所示。
下面以push操作為例,介紹CAS+自旋是如何工作的禁筏。如下方代碼的push()
方法所示,新建pushNode后州刽,將pushNode.next指向head,并通過CAS將head指向pushNode门坷。但是執(zhí)行CAS之前head可能會(huì)被修改,因此如果CAS執(zhí)行失敗绸吸,就需要重新為pushNode賦值并再次嘗試CAS直到成功温数。
public class ConcurrentStack {
private AtomicReference<StackNode> headReference;
public ConcurrentStack() {
headReference = new AtomicReference<>(null);
}
public void push(Integer i) {
StackNode pushNode = new StackNode(i);
while (true) {
StackNode head = headReference.get();
pushNode.next = head;
// 其他線程可能在此處修改了head的值
if (headReference.compareAndSet(head, pushNode)) {
break;
}
}
}
public StackNode pop() {
while (true) {
StackNode head = headReference.get();
if (head == null) {
return null;
}
StackNode newHead = head.next;
// 其他線程可能在此處修改了head的值
if (headReference.compareAndSet(head, newHead)) {
return head;
}
}
}
private static class StackNode {
public int value;
public StackNode next;
public StackNode(Integer i) {
value = i;
}
}
}
那么在什么情況下鹉胖,無鎖棧會(huì)發(fā)生ABA問題呢?假設(shè)當(dāng)前棧如下所示。
此時(shí)T1線程調(diào)用pop()
,但是在執(zhí)行CAS之前該線程被掛起丘喻,也就是停在了上述pop()
方法的注釋處。此時(shí)head指向StackNode4,newHead指向StackNode3叽躯。
但是這時(shí)T2線程調(diào)用了2次pop()
使得StackNode4和StackNode3出棧酣难,隨后新建另一個(gè)值為4的節(jié)點(diǎn)StackNode4*并push入棧。可以看到此時(shí)StackNode4和StackNode3是游離在棧外的尾膊,但是StackNode3.next還是指向StackNode2的。T1線程中的臨時(shí)變量head指向StackNode4抓谴,臨時(shí)變量newHead指向StackNode3。
如果由于某種原因,StackNode4*的地址與T1線程中的head(也就是StackNode4)相等,那么T1線程獲得時(shí)間片后會(huì)認(rèn)為head并未發(fā)生變化并執(zhí)行CAS券犁,即使此時(shí)棧的結(jié)構(gòu)和T1掛起前已經(jīng)不一樣了。T1執(zhí)行完之后棧如下所示,該結(jié)果與預(yù)期的并不一致褂删。
問題在于,StackNode4*的地址可能與StackNode4一樣嗎钦无?換個(gè)問法,新的對(duì)象可能與之前的對(duì)象地址相同嗎?當(dāng)然有可能决记!主要是以下2種情況:
① 該類使用享元模式按价,所有值一樣的對(duì)象實(shí)際都是一個(gè)。但是StackNode不可能使用享元模式凄杯,因?yàn)楣?jié)點(diǎn)的next值都是不同的。
② 類使用了對(duì)象緩存池,以StackNode為例隔崎,被pop的節(jié)點(diǎn)會(huì)被緩存,需要新節(jié)點(diǎn)時(shí)先從緩存池中獲取实牡。
了解了ABA問題產(chǎn)生的原因后,我們通過代碼來模擬。由于該場景難以模擬携栋,因此選擇在棧的pop()
方法中進(jìn)行延時(shí),保證其他線程在CAS之前能夠修改head,修改后的ConcurrentStack如下。
public class ConcurrentStack {
省略其他代碼......
public StackNode pop(long t) {
while (true) {
StackNode head = headReference.get();
if (head == null) {
return null;
}
StackNode newHead = head.next;
try {
Thread.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (headReference.compareAndSet(head, newHead)) {
return head;
}
}
}
}
然后修改StackNode類,通過ConcurrentHashMap緩存被回收的StackNode對(duì)象蒜危,獲取StackNode對(duì)象統(tǒng)一通過obtain(Integer i)
方法。
public class StackNode {
public int value;
public StackNode next;
private static ConcurrentHashMap<Integer, StackNode> mCache = new ConcurrentHashMap<>();
public static StackNode obtain(Integer i) {
StackNode stackNode = mCache.remove(i);
if (stackNode == null) {
stackNode = new StackNode(i);
}
return stackNode;
}
public static void recycle(StackNode stackNode) {
mCache.putIfAbsent(stackNode.value, stackNode);
}
private StackNode(Integer i) {
value = i;
}
}
隨后運(yùn)行以下代碼測試,就會(huì)發(fā)現(xiàn)無鎖棧最后的結(jié)果與預(yù)期不符。代碼中的注釋詳細(xì)描述了每一步的運(yùn)行流程,不再贅述蛔翅。
public class Main {
public static void main(String[] args) throws InterruptedException {
ConcurrentStack concurrentStack = new ConcurrentStack();
// 初始化棧為 StackNode4->StackNode3->StackNode2->StackNode1->StackNode0
for (int i = 0; i < 5; i++) {
concurrentStack.push(i);
}
// 開啟 popThread, 在 CAS 前 sleep 一段時(shí)間, 讓另一個(gè)線程去修改棧
Thread popThread = new Thread(() -> {
concurrentStack.pop(1000);
});
// 開啟 abaThread 先 pop StackNode4 和 StackNode3, 并回收 StackNode4
// 再 push StackNode4, 正常情況下棧應(yīng)為 4->2->1->0
// 但是 popThread 在休眠后繼續(xù)執(zhí)行 CAS 時(shí)發(fā)現(xiàn)棧頭還是 StackNode4, 誤以為棧未發(fā)生變化
// 因此 popThread 將 headReference 賦值給已經(jīng)被 pop 的 StackNode3
// 而 StackNode3.next 指向 StackNode2, 最后棧為 3->2->1->0
Thread abaThread = new Thread(() -> {
StackNode.recycle(concurrentStack.pop(0));
concurrentStack.pop(0);
concurrentStack.push(4);
});
popThread.start();
Thread.sleep(500);
abaThread.start();
popThread.join();
// 運(yùn)行完, 查看棧的結(jié)果
StackNode s;
System.out.print("當(dāng)前棧為: ");
while ((s = concurrentStack.pop(0)) != null) {
System.out.print(s.value + " -> ");
}
}
}
針對(duì)這個(gè)問題,并發(fā)包中自然也提供了解決方案,那就是AtomicStampedReference
,該類通過stamp記錄原子類當(dāng)前的版本洁灵,每次修改都會(huì)更新版本。
4.3 ThreadLocal
ThreadLocal也被稱為線程本地變量,它為各個(gè)線程都存儲(chǔ)了一個(gè)變量琅翻。每個(gè)線程調(diào)用ThreadLocal.set(value)
或ThreadLocal.get()
時(shí)操作的都是當(dāng)前線程對(duì)應(yīng)的變量聂抢,因此ThreadLocal是線程安全的。
4.3.1 基本使用
Java提供了一個(gè)例子,通過ThreadLocal為每個(gè)線程設(shè)置一個(gè)ID,代碼如下所示篱瞎。
public class ThreadId {
// 通過原子變量為線程設(shè)置ID
private static final AtomicInteger nextId = new AtomicInteger(0);
// 重寫ThreadLocal的initialValue()方法, 用于初始化各線程對(duì)應(yīng)的變量
private static final ThreadLocal<Integer> threadId = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return nextId.getAndIncrement();
}
};
// Returns the current thread's unique ID, assigning it if necessary
public static int get() {
return threadId.get();
}
}
在新建ThreadLocal時(shí)重寫了它的initialValue()
方法,該方法的作用是,如果某個(gè)線程調(diào)用ThreadLocal.get()
時(shí)發(fā)現(xiàn)值為null腰奋,就會(huì)調(diào)用initialValue()
初始化該線程對(duì)應(yīng)的變量。
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
return value;
}
ThreadLocal還提供了一個(gè)靜態(tài)的構(gòu)造方法withInitial(Supplier<? extends S> supplier)
,也用于為ThreadLocal的每個(gè)線程設(shè)置初始值。
ThreadLocal<Integer> threadId = ThreadLocal.withInitial(new Supplier<Integer>() {
@Override
public Integer get() {
return nextId.getAndIncrement();
}
});
ThreadLocal.withInitial(...)
返回的是SuppliedThreadLocal
對(duì)象,它繼承了ThreadLocal并重寫了initialValue()
方法,其實(shí)與第一種初始化方法沒什么區(qū)別蔓挖。
static final class SuppliedThreadLocal<T> extends ThreadLocal<T> {
private final Supplier<? extends T> supplier;
SuppliedThreadLocal(Supplier<? extends T> supplier) {
this.supplier = Objects.requireNonNull(supplier);
}
@Override
protected T initialValue() {
return supplier.get();
}
}
4.3.2 ThreadLocal實(shí)現(xiàn)原理
在ThreadLocal中角溃,線程和數(shù)據(jù)是一對(duì)一的關(guān)系,這種對(duì)應(yīng)關(guān)系很容易讓我們聯(lián)想到Map。直觀上來看燕鸽,似乎是ThreadLocal中維護(hù)了一個(gè)Key為線程九昧,Value為數(shù)據(jù)的Map。這樣當(dāng)然也能實(shí)現(xiàn)ThreadLocal,不過在這種實(shí)現(xiàn)下,ThreadLocal成為了線程數(shù)據(jù)的持有者,并且持有Thread的引用胶滋,這種實(shí)現(xiàn)很容易造成內(nèi)存泄漏镀迂。
而在Java的實(shí)現(xiàn)中,Thread才是數(shù)據(jù)的持有者,ThreadLocal是線程本地變量的管理者。Thread中持有一個(gè)ThreadLocalMap類型的Map用于保存該線程的本地變量赚瘦。
public class Thread {
......
ThreadLocal.ThreadLocalMap threadLocals = null;
......
}
ThreadLocalMap是ThreadLocal的內(nèi)部類,它是一個(gè)專門設(shè)計(jì)出來,用于維護(hù)線程本地變量的Map亲善,它的Key為ThreadLocal的弱引用戏溺,Value則是數(shù)據(jù)袍睡。問題在于肋僧,為什么要專門實(shí)現(xiàn)一個(gè)Map來維護(hù)線程本地變量斑胜,難道原生的HashMap不滿足需求嗎?下面讓我們帶著問題去分析ThreadLocalMap的源碼嫌吠。
①. 成員變量和構(gòu)造方法
ThreadLocalMap通過Entry[] table
保存所有的實(shí)體。實(shí)體Entry繼承自ThreadLocal的弱引用辫诅,Entry中的value保存當(dāng)前線程與ThreadLocal關(guān)聯(lián)的變量凭戴。我們知道,當(dāng)一個(gè)對(duì)象只有弱引用指向它時(shí)炕矮,它就會(huì)被GC回收么夫。因此ThreadLocal本身不會(huì)內(nèi)存泄漏。
static class ThreadLocalMap {
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
private static final int INITIAL_CAPACITY = 16; // 初始容量, 容量必須是2的倍數(shù)
private Entry[] table; // Hash表
private int size = 0; // Hash表當(dāng)前Entry的數(shù)量
private int threshold; // size達(dá)到這個(gè)數(shù)量時(shí)擴(kuò)容
// 根據(jù)容量計(jì)算下一個(gè)索引
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}
// 根據(jù)容量計(jì)算上一個(gè)索引
private static int prevIndex(int i, int len) {
return ((i - 1 >= 0) ? i - 1 : len - 1);
}
// ThreadLocalMap是懶加載的肤视,只有當(dāng)線程需要保存本地變量時(shí)
// 才會(huì)新建ThreadLocalMap并在構(gòu)造方法傳入第一個(gè)變量的key和value档痪。
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);
}
......
}
②. 保存數(shù)據(jù)
ThreadLocl.set(value)
方法如下,首先獲取當(dāng)前線程的ThreadLocalMap邢滑,如果存在直接調(diào)用ThreadLocalMap的set(key, value)
方法腐螟,如果不存在就新建。
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
下面來看ThreadLocalMap是如何保存數(shù)據(jù)的,它使用線性探測法解決Hash沖突乐纸。保存Entry時(shí)先計(jì)算key的hash值衬廷,再通過index = hash & (len - 1)
得到Entry應(yīng)保存的位置。如果table[index]
為空則在該位置保存Entry汽绢,如果不為空說明出現(xiàn)hash沖突泵督,對(duì)index向后順移直到table[index+n] (n=1,2,3...)
為空。使用線性探測法時(shí)庶喜,如果Hash沖突較多小腊,它的效率會(huì)大幅下降。
ThreadLocalMap的set(ThreadLocal<?> key, Object value)
方法如下久窟,該方法描述了使用線性探測法時(shí)添加數(shù)據(jù)的整體邏輯秩冈。
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
// 從位置i開始向后遍歷, 直到table[i]為空
for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
// 當(dāng)前key已經(jīng)存在, 則保存新的value即可
if (k == key) {
e.value = value;
return;
}
// key為null說明ThreadLocal已經(jīng)被GC回收了
// 需要用新的數(shù)據(jù)替代之前已經(jīng)過期的
if (k == null) {
// replaceStaleEntry()方法做了大量的工作, 下面會(huì)詳細(xì)分析
replaceStaleEntry(key, value, i);
return;
}
}
// 遍歷到table[i]為空也沒有發(fā)現(xiàn)相同的key或過期的key
// 因此需要新建一個(gè)Entry放置在當(dāng)前位置
tab[i] = new Entry(key, value);
int sz = ++size;
// 如果數(shù)量達(dá)到了threshold則進(jìn)行擴(kuò)容并重新hash
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
接著來看replaceStaleEntry(key, value, staleSlot)
方法,該方法用于將要插入的數(shù)據(jù)放到合適的位置斥扛,并清除Hash表中過期的key和value入问。
private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;
// 向前遍歷, 找到最前面的Entry不為空但是key被回收的位置
int slotToExpunge = staleSlot;
for (int i = prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;
// 從staleSlot下一個(gè)位置開始向后遍歷
for (int i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// 如果找到了與當(dāng)前key相同的Entry
// 需要將其與staleSlot位置上的Entry對(duì)換, 這樣能保證Hash表的順序
if (k == key) {
e.value = value;
tab[i] = tab[staleSlot];
tab[staleSlot] = e;
// slotToExpunge == staleSlot表示向前遍歷時(shí)沒有找到過期的key
if (slotToExpunge == staleSlot)
slotToExpunge = i;
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
// slotToExpunge == staleSlot表示向前遍歷時(shí)沒有找到過期的key
// 這里找到的是staleSlot后的第1個(gè)過期的位置, 將其賦值給slotToExpunge
// staleSlot位置本身不用清除, 因?yàn)橐谠撐恢蒙戏胖眯虏迦氲臄?shù)據(jù)
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}
// 如果當(dāng)前要插入的key不存在, 那么新建一個(gè)Entry放到staleSlot位置上
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);
// slotToExpunge != staleSlot表示有過期的key, 需要將它們清除
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
這里通過一個(gè)例子來分析replaceStaleEntry(key, value, staleSlot)
方法的運(yùn)行過程,為了便于分析稀颁,假設(shè)使用index = key % 10
來計(jì)算數(shù)據(jù)存放的位置芬失。初始時(shí)情況如下圖,假設(shè)當(dāng)前調(diào)用ThreadLocalMap.set(key, value)
方法插入一個(gè)key為13的數(shù)據(jù)匾灶,該數(shù)據(jù)的index應(yīng)為3棱烂,發(fā)現(xiàn)該位置上的key已經(jīng)過時(shí),則調(diào)用replaceStaleEntry(key, value, staleSlot)
方法阶女。
首先向前遍歷颊糜,找到最前面的Entry不為空但是key已經(jīng)過時(shí)的位置,示例中就是index為1的位置秃踩,因此slotToExpunge為1衬鱼。
隨后向后遍歷,嘗試尋找key與13相等的位置憔杨,發(fā)現(xiàn)table[4]
符合條件鸟赫,隨后將table[4]
與staleSlot位置的數(shù)據(jù)table[3]
交換,得到結(jié)果如下消别。
這里交換的意義在于維持線性探測法的特性抛蚤,如果不交換的話,table[3]
上的數(shù)據(jù)之后會(huì)被清除妖啥,就不滿足線性探測法的規(guī)律了霉颠。因?yàn)橹蟛檎襨ey為13的數(shù)據(jù)的話,定位到table[3]
會(huì)發(fā)現(xiàn)為空荆虱,而實(shí)際上key為13的數(shù)據(jù)在table[4]
上。
如果在replaceStaleEntry(key, value, staleSlot)
方法中找不到key與要插入數(shù)據(jù)相等的位置,就在staleSlot位置存放要插入的數(shù)據(jù)怀读。將數(shù)據(jù)存放到合適的位置后诉位,最后調(diào)用expungeStaleEntry(slotToExpunge)
將Hash表中過期的數(shù)據(jù)清除。
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
// 清除staleSlot位置上的數(shù)據(jù)
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;
// 向后遍歷, 對(duì)各個(gè)位置上的數(shù)據(jù)重新hash, 直到Entry為null
Entry e;
int i;
for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
int h = k.threadLocalHashCode & (len - 1);
if (h != i) {
tab[i] = null;
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i;
}
③. 獲取數(shù)據(jù)
ThreadLocal獲取數(shù)據(jù)的邏輯比較簡單菜枷,直接調(diào)用了ThreadLocalMap的getEntry(key)
方法苍糠。
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e);
}
如果直接key直接命中第一個(gè)數(shù)據(jù),那么直接返回啤誊,否則調(diào)用getEntryAfterMiss()
方法岳瞭。方法內(nèi)遵循線性探測法去尋找對(duì)應(yīng)的Entry,在發(fā)現(xiàn)過時(shí)的數(shù)據(jù)時(shí)調(diào)用expungeStaleEntry()
方法進(jìn)行清理蚊锹,直到找到對(duì)應(yīng)key的Entry或遍歷到空數(shù)據(jù)瞳筏。
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
while (e != null) {
ThreadLocal<?> k = e.get();
if (k == key)
return e;
if (k == null)
expungeStaleEntry(i);
else
i = nextIndex(i, len);
e = tab[i];
}
return null;
}
4.3.3 預(yù)防ThreadLocal內(nèi)存泄漏
ThreadLocalMap內(nèi)部使用弱引用指向ThreadLocal,當(dāng)ThreadLocal被回收時(shí)牡昆,map中對(duì)應(yīng)的數(shù)據(jù)過時(shí)姚炕,Entry和value會(huì)殘留在map中造成內(nèi)存泄露。雖然在ThreadLocalMap下一次添加或刪除數(shù)據(jù)時(shí)就可能被清理丢烘,但也可能一直留在map中柱宦。
因此當(dāng)線程不需要某個(gè)ThreadLocal時(shí)需要手動(dòng)調(diào)用一下ThreadLocal.remove()
方法,該方法最終會(huì)調(diào)用ThreadLocalMap的remove(ThreadLocal<?> key)
方法去清理對(duì)應(yīng)的Entry播瞳。
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
e.clear();
expungeStaleEntry(i);
return;
}
}
}
五掸刊、并發(fā)容器
由于ArrayList、HashMap等容器不支持并發(fā)赢乓,早期JDK提供了Vector痒给、Hashtable等一系列同步容器,它們實(shí)現(xiàn)的方法是對(duì)所有公有方法進(jìn)行同步骏全,使得同一時(shí)刻只有一個(gè)線程能訪問容器的狀態(tài)苍柏。通過Collections.synchronizedXxx()
方法,可以創(chuàng)建各個(gè)基礎(chǔ)容器的同步容器姜贡。
即使Vector這樣的容器對(duì)所有方法都進(jìn)行了同步试吁,但它還是不安全的,這種情況出現(xiàn)在對(duì)容器進(jìn)行復(fù)合操作時(shí)楼咳。以下面的程序?yàn)槔ê矗僭O(shè)A線程執(zhí)行getLast()
方法,在step1處掛起母怜,隨后B線程執(zhí)行完了整個(gè)deleteLast()
方法余耽,A線程繼續(xù)執(zhí)行時(shí)size的值已經(jīng)過時(shí)了,再執(zhí)行vector.get(size - 1)
就會(huì)出錯(cuò)苹熏。
public Object getLast(Vector vector) {
int size = vector.size();
// step1
return vector.get(size - 1);
}
public void deleteLast(Vector vector) {
int size = vector.size();
vector.remove(size - 1);
}
這種“先判斷后執(zhí)行”的操作稱為競態(tài)條件碟贾,再以Hashtable為例币喧,如果有2個(gè)線程執(zhí)行如下putIfNotExist(Object key, Object value)
方法,可能會(huì)導(dǎo)致覆蓋插入袱耽。要想解決這個(gè)問題杀餐,需要將整個(gè)復(fù)合操作進(jìn)行同步。
public void putIfNotExist(Object key, Object value) {
if (!hashtable.contains(key)) {
hashtable.put(key, value);
}
}
由于同步容器的效率不高且具備競態(tài)條件這樣的隱患朱巨,Java推出了并發(fā)容器來改進(jìn)同步容器的性能史翘,不同的并發(fā)容器對(duì)應(yīng)不同的場景,并且ConcurrentHashMap這樣的并發(fā)容器還封裝了常見的復(fù)合操作冀续。
5.1 CopyOnWriteArrayList
當(dāng)通過Iterator對(duì)容器進(jìn)行迭代時(shí)琼讽,如果有別的線程修改了容器,那么正在迭代的線程會(huì)拋出ConcurrentModificationException洪唐,這是一種“及時(shí)失敗”的機(jī)制钻蹬,用于通知用戶該處代碼存在隱患,想要避免該異常桐罕,需要在迭代過程中加鎖脉让。但是如果容器數(shù)量很大或者每個(gè)元素的操作時(shí)間很長,那么其余線程就會(huì)等待很久功炮。一種解決方案是在容器發(fā)生修改時(shí)克隆該容器溅潜,并在副本上進(jìn)行寫操作,期間的迭代操作都在原容器上進(jìn)行薪伏。
上述方法被稱為寫時(shí)拷貝CopyOnWrite(COW)滚澜,CopyOnWriteArrayList中通過變量array
保存實(shí)際的數(shù)組,當(dāng)容器即將發(fā)生變化(add, remove...)時(shí)克隆當(dāng)前容器嫁怀,并在副本上進(jìn)行增刪操作设捐,操作之后再將結(jié)果賦值給array
,以add(E e)
方法為例塘淑,相關(guān)代碼如下所示萝招。
public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
private transient volatile Object[] array;
final void setArray(Object[] a) {
array = a;
}
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
......
}
當(dāng)對(duì)CopyOnWriteArrayList迭代時(shí),通過iterator()
方法新建迭代器存捺,該迭代器遍歷的是當(dāng)前array
的快照槐沼。迭代器中已經(jīng)保存了引用,即使別的線程通過setArray(newElements)
方法修改了array
的引用也不會(huì)出現(xiàn)異常捌治。
public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
public Iterator<E> iterator() {
return new COWIterator<E>(getArray(), 0);
}
static final class COWIterator<E> implements ListIterator<E> {
/** Snapshot of the array */
private final Object[] snapshot;
/** Index of element to be returned by subsequent call to next. */
private int cursor;
private COWIterator(Object[] elements, int initialCursor) {
cursor = initialCursor;
snapshot = elements;
}
......
}
......
}
根據(jù)CopyOnWriteArrayList的實(shí)現(xiàn)岗钩,可以發(fā)現(xiàn)該容器具有“弱一致性”,因?yàn)榈鷷r(shí)容器的內(nèi)容可能會(huì)發(fā)生變化肖油,如果一個(gè)場景能容忍短暫的數(shù)據(jù)不一致性兼吓,才適合使用該容器。例如通過CopyOnWriteArrayList保存監(jiān)聽器森枪,該場景對(duì)時(shí)效性的要求不高视搏,而且遍歷監(jiān)聽器并執(zhí)行是一個(gè)非常耗時(shí)的操作审孽。
5.2 ConcurrentHashMap
ConcurrentHashMap與HashMap一樣是一個(gè)基于散列的Map,JDK1.8中它采用CAS+Node鎖的同步方式來提供更高的并發(fā)性與伸縮性凶朗。ConcurrentHashMap的使用方法與HashMap相同瓷胧,并且封裝了一些常見的復(fù)合操作显拳,如下所示棚愤。
// 當(dāng)前key沒有對(duì)應(yīng)值時(shí)插入
public V putIfAbsent(K key, V value)
// 僅當(dāng)key對(duì)應(yīng)的值為value時(shí)才移除
public boolean remove(Object key, Object value)
// 僅當(dāng)key對(duì)應(yīng)的值為oldValue時(shí)才替換為newValue
public boolean replace(K key, V oldValue, V newValue)
// 僅當(dāng)key對(duì)應(yīng)到某個(gè)值時(shí)才替換為value
public V replace(K key, V value)
ConcurrentHashMap中的檢索操作一般不阻塞,因此可能與更新操作重疊杂数,檢索操作反映的是最近的更新操作完成的結(jié)果宛畦,換句話說,更新操作對(duì)檢索操作來說是可見的(happen-before)揍移。以get()
方法為例次和,代碼中并沒有進(jìn)行同步的地方,這是因?yàn)镹ode節(jié)點(diǎn)中的val和next字段都是volatile修飾的那伐,因此一個(gè)線程的更新操作是對(duì)其他線程可見的踏施。
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
......
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode()); // 得到hash值
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
// 槽中的首節(jié)點(diǎn)就是要尋找的節(jié)點(diǎn)
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// hash < 0表示正在擴(kuò)容
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
......
}
}
ConcurrentHashMap中的更新操作是通過CAS+Node鎖的方式進(jìn)行同步的,以putVal(K key, V value, boolean onlyIfAbsent)
方法為例罕邀,其主體邏輯如下畅形。
① 判斷table是否已經(jīng)初始化,如果沒有則調(diào)用initTable()
方法
② 如果當(dāng)前槽為空诉探,則調(diào)用casTabAt(...)
插入數(shù)據(jù)
③ 如果當(dāng)前正在擴(kuò)容日熬,當(dāng)前線程也去參與擴(kuò)容(ConcurrentHashMap支持并發(fā)擴(kuò)容)
④ 根據(jù)當(dāng)前是鏈表還是樹插入對(duì)應(yīng)節(jié)點(diǎn)
可以發(fā)現(xiàn)putVal(...)
方法的主體邏輯是位于一個(gè)循環(huán)中的,這是一種CAS+自旋的思路肾胯。假設(shè)2個(gè)線程同時(shí)執(zhí)行到第②步竖席,成功的線程會(huì)退出循環(huán),失敗的線程會(huì)開始自旋直到插入成功或失敗敬肚。
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0) // 如果table為空則進(jìn)行初始化
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 當(dāng)前槽為空毕荐,通過cas插入節(jié)點(diǎn),成功則退出循環(huán)
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 有hash值為MOVED的節(jié)點(diǎn)表示正在擴(kuò)容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) { // 當(dāng)前槽上是鏈表
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
else if (f instanceof TreeBin) { // 當(dāng)前槽上是樹節(jié)點(diǎn)
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
擴(kuò)容Hash表是一個(gè)相對(duì)緩慢的操作艳馒,最好在新建時(shí)提供一個(gè)預(yù)估的大小initialCapacity來構(gòu)造憎亚。
六、線程生命周期
Java中線程的生命周期與操作系統(tǒng)的有所不同鹰溜,在操作系統(tǒng)中虽填,線程只有在真正獲取到CPU使用權(quán)時(shí)才屬于運(yùn)行狀態(tài),如果一個(gè)線程可以執(zhí)行但是沒獲取到CPU曹动,它則屬于就緒狀態(tài)斋日。而在Java中,如果一個(gè)線程在廣義上能夠運(yùn)行墓陈,它就是運(yùn)行狀態(tài)恶守。
Java將線程分為以下6種狀態(tài):
NEW(初始化狀態(tài)):新建了一個(gè)線程對(duì)象第献,但是還沒有調(diào)用線程的
start()
方法,此時(shí)線程只存在于JVM兔港,在操作系統(tǒng)中它還沒有被創(chuàng)建庸毫。RUNNABLE(運(yùn)行狀態(tài)):Java線程的RUNNABLE狀態(tài)表達(dá)的含義比較寬泛,它對(duì)應(yīng)操作系統(tǒng)中線程的就緒狀態(tài)衫樊、運(yùn)行狀態(tài)以及部分休眠狀態(tài)飒赃。
當(dāng)線程在搶占CPU的使用權(quán)時(shí),其對(duì)應(yīng)操作系統(tǒng)中的就緒狀態(tài)科侈;當(dāng)線程調(diào)用操作系統(tǒng)層面的阻塞式API時(shí)(例如I/O)载佳,其對(duì)應(yīng)操作系統(tǒng)中的休眠狀態(tài)。在Java中臀栈,這些狀態(tài)統(tǒng)稱為RUNNABLE蔫慧,JVM并不關(guān)心線程是否真的在運(yùn)行,只要當(dāng)前線程不在等待鎖权薯,也沒有被其他線程阻塞姑躲,那么它就是運(yùn)行狀態(tài)。BLOCKED(阻塞狀態(tài)):當(dāng)線程正在等待進(jìn)入synchronized代碼塊時(shí)盟蚣,該線程會(huì)從RUNNABLE狀態(tài)變?yōu)锽LOCKED狀態(tài)黍析;當(dāng)搶占到鎖時(shí),再從BLOCKED狀態(tài)轉(zhuǎn)換到RUNNABLE狀態(tài)刁俭。
WAITING(無時(shí)限等待):當(dāng)獲得鎖的線程主動(dòng)調(diào)用
Object.wait()
時(shí)橄仍,線程會(huì)從RUNNABLE狀態(tài)進(jìn)入WAITING狀態(tài)直到被喚醒。還有一種情況是線程調(diào)用LockSupport.park()
從RUNNABLE狀態(tài)進(jìn)入WAITING狀態(tài)牍戚,并發(fā)包中的Lock就是依賴LockSupport實(shí)現(xiàn)的侮繁,如果要將線程恢復(fù)到RUNNABLE狀態(tài),則需調(diào)用LockSupport.unpark(Thread thread)
方法如孝。TIMED_WAITING(有時(shí)限等待):TIMED_WAITING狀態(tài)與WAITING狀態(tài)唯一的區(qū)別是宪哩,線程調(diào)用方法進(jìn)入該狀態(tài)時(shí)多了時(shí)間參數(shù)。例如
Thread.sleep(long millis)
,Object.wait(long timeout)
,LockSupport.parkNanos(Object blocker, long deadline)
等方法第晰。TERMINATED(終止?fàn)顟B(tài)):線程的
run()
方法執(zhí)行完后锁孟,或者出現(xiàn)未捕獲的異常就會(huì)進(jìn)入TERMINATED狀態(tài)。如果想要主動(dòng)終止一個(gè)線程茁瘦,可以調(diào)用Thread.interrupt()
方法通知線程停止品抽,其他類似Thread.stop()
這樣的方法因?yàn)榘踩詥栴}已經(jīng)被棄用了。
如何正確地停止線程甜熔?
上面提到通過Thread.interrupt()
方法通知線程結(jié)束運(yùn)行圆恤,該方法并不會(huì)強(qiáng)行停止線程,它只是為線程添加了一個(gè)標(biāo)志位表示該線程應(yīng)該結(jié)束了腔稀。為什么這么做呢盆昙?因?yàn)槲覀兏M€程收到通知后進(jìn)行收尾工作再停止(例如釋放Lock)羽历,這樣可以保證共享資源狀態(tài)的一致性,更加安全淡喜,而不是像已被棄用的Thread.stop()
方法那樣強(qiáng)制停止線程秕磷。
下面來看如何處理中斷異常(InterruptedException)和停止線程,當(dāng)對(duì)線程調(diào)用Thread.interrupt()
方法后炼团,RUNNABLE狀態(tài)下的線程可以通過Thread.currentThread().isInterrupted()
檢查自己的中斷標(biāo)志位澎嚣,如果發(fā)現(xiàn)自己被中斷,則進(jìn)行退出工作们镜。但是如果線程處于WAITING狀態(tài)币叹,那么當(dāng)前線程會(huì)拋出異常并清除中斷標(biāo)志位润歉,以下方法可以響應(yīng)中斷并拋出中斷異常模狭。
Object.wait() / Object.wait(long) / Object.wait(long,int)
Thread.sleep(long) / Thread.sleep(long,int)
Thread.join() / Thread.join(long) / Thread.join(long,int)
java.util.concurrent.BlockingQueue.take() / put(E)
java.util.concurrent.locks.Lock.lockInterruptibly()
java.util.concurrent.CountDownLatch.await()
java.util.concurrent.CyclicBarrier.await()
java.util.concurrent.Rxchanger.exchange(V)
java.nio.channels.InterruptibleChannel相關(guān)方法
java.nio.channels.Selector的相關(guān)方法
對(duì)于RUNNABLE狀態(tài)下的線程,可以在循環(huán)中檢查自己的狀態(tài)踩衩,發(fā)現(xiàn)被中斷后進(jìn)行收尾工作嚼鹉。
Thread thread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
// 業(yè)務(wù)邏輯
}
// 收尾工作
});
如果線程處于WAITING狀態(tài)并響應(yīng)了中斷異常,在方法內(nèi)的話一般選擇拋出驱富,讓上層處理锚赤;如果必須處理的話,則需要重置線程的中斷標(biāo)志褐鸥,以便讓線程正常退出线脚,如下所示。
Thread thread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
// 業(yè)務(wù)邏輯
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// 捕獲中斷異常會(huì)清除中斷標(biāo)志, 需要重新設(shè)置
Thread.currentThread().interrupt();
// 收尾工作...
}
}
});
七叫榕、死鎖
7.1 死鎖的產(chǎn)生條件
死鎖是指兩個(gè)線程都需要多個(gè)資源浑侥,但是每個(gè)線程只占有了其中一部分并請求另外的資源,此時(shí)就會(huì)產(chǎn)生循環(huán)等待晰绎,導(dǎo)致兩個(gè)線程都無法運(yùn)行寓落。在《操作系統(tǒng)》這門課中,歸納了4個(gè)產(chǎn)生死鎖的必要條件:
① 資源互斥:共享資源在同一時(shí)刻只能被一個(gè)線程占有荞下。
② 占有資源并等待:占有資源的線程不會(huì)釋放伶选,并請求其余資源。
③ 不可搶占:線程無法搶奪其余線程占有的資源尖昏。
④ 循環(huán)等待:每個(gè)線程都在等待其余線程占有的資源仰税。
7.2 死鎖的解決方式
7.2.1 使用Lock
使用synchronized關(guān)鍵字進(jìn)行同步時(shí),線程占有資源后不會(huì)釋放抽诉,如果在申請其他資源時(shí)阻塞陨簇,就有出現(xiàn)死鎖的風(fēng)險(xiǎn)。而使用Lock.tryLock()
方法掸鹅,線程嘗試獲取資源失敗時(shí)會(huì)直接返回false塞帐,開發(fā)人員可以使線程釋放之前占有的資源拦赠。也可以使用Lock.tryLock(long time, TimeUnit unit)
方法,讓線程嘗試在指定時(shí)間內(nèi)獲取某個(gè)資源葵姥,如果失敗則返回false荷鼠,開發(fā)人員可以根據(jù)返回值進(jìn)一步處理。
7.2.2 統(tǒng)一資源獲取順序
出現(xiàn)死鎖的代碼榔幸,一定是兩個(gè)線程循環(huán)等待允乐,這是因?yàn)閮蓚€(gè)線程獲取資源的順序不一樣,如下所示削咆。線程t1先嘗試獲取a資源牍疏,再嘗試獲取b資源;而線程t2先嘗試獲取b資源拨齐,再嘗試獲取a資源鳞陨,這就有死鎖的風(fēng)險(xiǎn)。如果這兩個(gè)線程獲取資源的順序相等瞻惋,就不可能發(fā)生死鎖了厦滤。
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (a) {
......
synchronized (b) {
......
}
}
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (b) {
......
synchronized (a) {
......
}
}
}
});
八、線程池
線程池遵循典型的"生產(chǎn)者-消費(fèi)者"模型歼狼,生產(chǎn)者是線程池的使用者掏导,消費(fèi)者是線程池本身。提到"生產(chǎn)者-消費(fèi)者"模型羽峰,我們很容易聯(lián)想到阻塞隊(duì)列趟咆,實(shí)際上線程池就是依賴阻塞隊(duì)列實(shí)現(xiàn)的。在線程池中梅屉,生產(chǎn)者提交的任務(wù)會(huì)被添加到阻塞隊(duì)列中值纱,如果阻塞隊(duì)列已滿,則生產(chǎn)者阻塞履植;消費(fèi)者會(huì)不斷從阻塞隊(duì)列中取出任務(wù)執(zhí)行计雌,如果阻塞隊(duì)列為空,則消費(fèi)者阻塞玫霎。
利用3.2.1中實(shí)現(xiàn)的BlockQueue可以實(shí)現(xiàn)一個(gè)簡單的線程池凿滤,代碼如下所示。在測試時(shí)庶近,新建了一個(gè)具有5個(gè)工作線程的線程池翁脆,運(yùn)行后發(fā)現(xiàn)各任務(wù)執(zhí)行的順序是不確定的,因此線程池一般不用于執(zhí)行一系列耦合的任務(wù)鼻种。不過如果將工作線程數(shù)量設(shè)為1個(gè)的話反番,任務(wù)的執(zhí)行順序就是確定的,當(dāng)需要執(zhí)行一連串耦合的任務(wù)時(shí),可以新建只有1個(gè)工作線程的線程池進(jìn)行處理罢缸。
public class SampleTreadPool {
private BlockingQueue<Runnable> mRunnableQueue;
private List<Worker> mWorkers = new ArrayList<>();
public SampleTreadPool(int threadSize, BlockingQueue<Runnable> queue) {
mRunnableQueue = queue;
for (int i = 0; i < threadSize; i++) {
Worker worker = new Worker();
worker.start();
mWorkers.add(worker);
}
}
public void execute(Runnable r) {
mRunnableQueue.offer(r);
}
private class Worker extends Thread {
@Override
public void run() {
while (true) {
Runnable r = mRunnableQueue.take();
r.run();
}
}
}
// 測試
public static void main(String[] args) {
BlockingQueue<Runnable> blockingQueue = new BlockingQueue<>(20);
SampleTreadPool treadPool = new SampleTreadPool(5, blockingQueue);
for (int i = 0; i < 50; i++) {
int t = i;
treadPool.execute(() -> {
System.out.println("execute: " + t);
});
}
}
}
上述線程池顯然無法投入實(shí)際使用篙贸,因?yàn)樗嬖诤芏鄦栴}:
① 在阻塞隊(duì)列已滿的情況下提交任務(wù),該線程池會(huì)阻塞主線程枫疆。雖然可以使用無界的阻塞隊(duì)列爵川,但是當(dāng)任務(wù)數(shù)量過多時(shí),可能會(huì)造成OOM息楔。
② 該線程池中工作線程的數(shù)量是固定的寝贡,我們希望能設(shè)置工作線程數(shù)量的最大值和最小值,讓它能夠隨系統(tǒng)的運(yùn)行情況靈活地增加或減少值依。
③ 當(dāng)線程運(yùn)行出現(xiàn)異常時(shí)圃泡,線程池沒有容錯(cuò)機(jī)制。
④ 該線程池只能提交Runnable這種無返回值的任務(wù)愿险,無法處理Future<T>這類有返回值的任務(wù)颇蜡。
8.1 Java線程池的使用
Java線程池中最核心的實(shí)現(xiàn)是ThreadPoolExecutor,它定義了線程池的5種狀態(tài)如下拯啦。
- RUNNING: 線程池正在運(yùn)行澡匪,接受新任務(wù)并處理已經(jīng)提交的任務(wù)。
- SHUTDOWN: 線程池即將停止褒链,此時(shí)不接受新任務(wù),但是會(huì)處理已經(jīng)提交的任務(wù)疑苔。調(diào)用線程池的
shutdown()
方法會(huì)使其狀態(tài)從RUNNING變?yōu)镾HUTDOWN甫匹。 - STOP: 線程池即將停止,此時(shí)不接受新任務(wù)惦费,也不處理已經(jīng)提交的任務(wù)兵迅,并會(huì)中斷正在處理的任務(wù)。調(diào)用線程池的
shutdownNow()
方法會(huì)使其狀態(tài)從(RUNNING/SHUTDOWN)變?yōu)镾TOP薪贫。 - TIDYING: 線程池已經(jīng)停止運(yùn)行恍箭,工作線程為0,此時(shí)會(huì)執(zhí)行用戶重載的
terminated()
函數(shù)扎唾。 - TERMINATED:
terminated()
函數(shù)運(yùn)行完畢新翎,線程池徹底停止拓型。
8.1.1 新建線程池
下面來看如何新建一個(gè)線程池,ThreadPoolExecutor中重載了4個(gè)構(gòu)造函數(shù)交洗,最完整的構(gòu)造函數(shù)有7個(gè)參數(shù),如下所示橡淑。
/**
* @param corePoolSize: 核心線程數(shù)量构拳,指線程池中最少的線程數(shù)量,即使這些線程空閑
* 但如果設(shè)置了 allowCoreThreadTimeOut, 核心線程也可以回收
* @param maximumPoolSize: 線程池中允許存在的最大線程數(shù)
* @param keepAliveTime: 當(dāng)前線程數(shù)量大于核心線程數(shù)量時(shí)
* 如果等待該時(shí)間后仍然沒有新任務(wù), 則回收空閑線程
* @param unit: keepAliveTime參數(shù)的單位
* @param workQueue: 任務(wù)隊(duì)列, 這是一個(gè)阻塞隊(duì)列
* 用于保存用戶提交但尚未執(zhí)行的 Runnable
* @param threadFactory: 線程池新建線程時(shí)使用的 ThreadFactory
* @param handler: 任務(wù)拒絕策略, 用于在線程數(shù)量和任務(wù)隊(duì)列都已滿時(shí)拒絕新任務(wù)
* 線程池提供了以下四種策略:
* CallerRunsPolicy: 提交任務(wù)的線程自己執(zhí)行
* AbortPolicy: 默認(rèn)策略, 拋出RejectedExecutionException
* DiscardPolicy: 靜默丟棄任務(wù)
* DiscardOldestPolicy: 丟棄任務(wù)隊(duì)列中最老的任務(wù)并添加新任務(wù)
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
......
}
為了避免OOM,在新建線程池時(shí)置森,強(qiáng)烈建議指定線程池的maximumPoolSize斗埂,否則在任務(wù)繁忙時(shí)會(huì)出現(xiàn)線程數(shù)暴增的情況;同時(shí)也建議使用有界的阻塞隊(duì)列凫海,不然阻塞隊(duì)列的無限制增長也會(huì)增加OOM的風(fēng)險(xiǎn)蜜笤。
8.1.2 提交任務(wù)
ExecutorService接口中定義了線程池提交任務(wù)的方法,如下所示盐碱。
// 提交一個(gè)Runnable, 該任務(wù)無返回值, 也無法查詢它的運(yùn)行情況
void execute(Runnable command);
// 提交一個(gè)Callable, 該任務(wù)有返回值, 并且可以查詢它的運(yùn)行情況
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
// 提交一個(gè)Runnable任務(wù)把兔,可以通過返回的Future查詢其運(yùn)行情況,返回null時(shí)表示運(yùn)行完畢
Future<?> submit(Runnable task);
// 執(zhí)行給定的任務(wù), 當(dāng)所有任務(wù)完成(或超時(shí)), 返回一個(gè)保存其狀態(tài)和結(jié)果的Future列表
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit);
// 執(zhí)行給定的任務(wù)瓮顽,當(dāng)任一任務(wù)完成(或超時(shí)), 返回對(duì)應(yīng)結(jié)果, 其余任務(wù)會(huì)被取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks);
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit);
需要注意的是县好,如果調(diào)用Future.get()
方法,調(diào)用線程會(huì)被阻塞暖混,直到Future任務(wù)返回結(jié)果缕贡,因此可以選擇Future.get(long timeout, TimeUnit unit)
設(shè)置最大等待時(shí)間,也可以在獲取結(jié)果前先調(diào)用Future.isDone()
查詢其運(yùn)行情況拣播。
8.1.3 關(guān)閉線程池
在介紹線程池的狀態(tài)時(shí)我們提到了shutdown()
和shutdownNow()
這兩個(gè)方法晾咪,這兩個(gè)方法都用于關(guān)閉線程池,不過處理上有所不同:shutdown()
方法會(huì)讓線程池不再接受新的任務(wù)贮配,但是會(huì)繼續(xù)處理已經(jīng)開始運(yùn)行的任務(wù)谍倦;而shutdownNow()
方法不僅會(huì)讓線程池不再接受新的任務(wù),也會(huì)去中斷當(dāng)前正在執(zhí)行任務(wù)的線程泪勒。
關(guān)閉線程池可以使用如下的兩段式關(guān)閉法昼蛀,調(diào)用shutdown()
后等待尚未完成的任務(wù)繼續(xù)運(yùn)行一段時(shí)間,如果等待后還沒運(yùn)行完圆存,則調(diào)用shutdownNow()
中斷這些線程叼旋。
這里用到了awaitTermination()
方法,它會(huì)阻塞一段時(shí)間沦辙,直到線程池中的任務(wù)全部運(yùn)行完或者等待超時(shí)夫植,如果線程池的所有任務(wù)運(yùn)行完則返回true,否則返回false油讯。需要注意的是详民,該方法會(huì)阻塞調(diào)用線程。
void shutdownAndAwaitTermination(ExecutorService pool) {
// 拒絕新任務(wù)的提交
pool.shutdown();
try {
// 等待一段時(shí)間, 讓當(dāng)前任務(wù)繼續(xù)執(zhí)行
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
// 嘗試中斷正在運(yùn)行的線程
pool.shutdownNow();
// 等待一段時(shí)間, 讓尚未結(jié)束任務(wù)的線程響應(yīng)中斷
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// 如果當(dāng)前線程被中斷了, 再次嘗試關(guān)閉線程池
pool.shutdownNow();
// 重新設(shè)置中斷標(biāo)志位
Thread.currentThread().interrupt();
}
}
8.2 如何確定線程池的參數(shù)
新建ThreadPoolExecutor時(shí)需要傳入多個(gè)參數(shù)撞羽,包括核心線程數(shù)阐斜、最大線程數(shù)、空閑線程保留時(shí)間以及阻塞隊(duì)列等诀紊,實(shí)際開發(fā)中應(yīng)該根據(jù)具體的業(yè)務(wù)類型來確定這一系列的參數(shù)谒出。
在項(xiàng)目中并不建議將所有的任務(wù)都放到一個(gè)線程池中去執(zhí)行隅俘,可以根據(jù)任務(wù)場景新建CPU線程池、IO線程池等笤喳,CPU線程池的大小可以固定為(CPU核心數(shù)+1)为居,而IO線程池的大小可以預(yù)設(shè)為(2 * CPU核心數(shù)+1),之后再根據(jù)IO的吞吐量進(jìn)行調(diào)整杀狡。美團(tuán)的技術(shù)博客Java線程池實(shí)現(xiàn)原理及其在美團(tuán)業(yè)務(wù)中的實(shí)踐介紹了動(dòng)態(tài)修改線程池參數(shù)的實(shí)現(xiàn)蒙畴,可以根據(jù)線程池的運(yùn)行情況不斷調(diào)整參數(shù),保證系統(tǒng)的吞吐量呜象。