layout: post
title: 《Java并發(fā)編程的藝術(shù)》筆記
categories: Java
excerpt: The Art of Java Concurrency Programming.
<img src="http://upload-images.jianshu.io/upload_images/658453-a94405da52987372.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240" width="70%">
好記性不如爛筆頭。多讀多思考准颓。
基本概念 & Java 并發(fā)機制的底層實現(xiàn)原理
上下文切換:CPU在任務(wù)切換前會保存前一個任務(wù)的狀態(tài)该默,以便下次切換回這個任務(wù)時,可以再加載這個任務(wù)的狀態(tài)。所以任務(wù)從保存到再加載的過程就是一次任務(wù)切換。
內(nèi)存屏障:一組處理器指令,用于實現(xiàn)對內(nèi)存操作的順序限制。
鎖的升級
現(xiàn)在我們應(yīng)該知道哪轿,Synchronized 是通過對象內(nèi)部的一個叫做監(jiān)視器鎖(monitor)來實現(xiàn)的。但是監(jiān)視器鎖本質(zhì)又是依賴于底層的操作系統(tǒng)的 Mutex Lock 來實現(xiàn)的翔怎。而操作系統(tǒng)實現(xiàn)線程之間的切換這就需要從用戶態(tài)轉(zhuǎn)換到核心態(tài)窃诉,這個成本非常高,狀態(tài)之間的轉(zhuǎn)換需要相對比較長的時間赤套,這就是為什么 Synchronized 效率低的原因飘痛。因此,這種依賴于操作系統(tǒng) Mutex Lock 所實現(xiàn)的鎖我們稱之為“重量級鎖”容握。JDK 中對 Synchronized 做的種種優(yōu)化宣脉,其核心都是為了減少這種重量級鎖的使用。JDK1.6 以后剔氏,為了減少獲得鎖和釋放鎖所帶來的性能消耗塑猖,提高性能,引入了“輕量級鎖”和“偏向鎖”谈跛。
每一個線程在準(zhǔn)備獲取共享資源時:
已經(jīng)獲取偏向鎖的線程為線程1羊苟, 新線程為:線程2
第一步,線程2檢查MarkWord里面是不是放的自己的ThreadId ,如果是感憾,表示當(dāng)前線程是處于 “偏向鎖” 蜡励,就可以直接執(zhí)行方法體了。
第二步阻桅,如果MarkWord不是自己的ThreadId, 用CAS來執(zhí)行切換凉倚,如果不成功,線程2根據(jù)MarkWord里現(xiàn)有的ThreadId嫂沉,通知之前線程暫停稽寒,之前線程將Markword的內(nèi)容置為空。 (線程1的同步體執(zhí)行完后 會根據(jù)線程2的請求趟章,暫停線程杏糙,置空markword里面的線程ID)
第三步,這樣線程2就以輕量級的鎖機制工作尤揣,如果這時線程3進入搔啊,就會進入自旋模式等待鎖
第四步柬祠,自旋的線程3在自旋過程中北戏,成功獲得資源(即之前獲的資源的線程執(zhí)行完成并釋放了共享資源),則整個狀態(tài)依然處于 輕量級鎖的狀態(tài)漫蛔,如果自旋失敗 嗜愈,即自旋時間結(jié)束旧蛾,仍然沒有獲取輕量級鎖,進入重量級鎖蠕嫁。
第五步锨天,線程3進入重量級鎖,將對象的markword修改為指向重量級鎖的指針剃毒,線程2執(zhí)行為同步體病袄,修改Markword時,會失敗赘阀,這樣線程2就會意識到進入重量級鎖了益缠,
第六步,線程2釋放鎖基公,通知重量級鎖喚醒阻塞隊列幅慌。
輕量級鎖是為了在線程交替執(zhí)行同步塊時提高性能,而偏向鎖則是在只有一個線程執(zhí)行同步塊時進一步提高性能轰豆。
處理器實現(xiàn)原子操作的方式:總線鎖(鎖住整個內(nèi)存)胰伍;緩存鎖(在處理器內(nèi)部緩存中實現(xiàn)原子操作,使其他處理器不能緩存 i 的緩存行)酸休。
Java 實現(xiàn)原子操作的方式:鎖和循環(huán) CAS(Compare and Swap 比較并交換)骂租;CAS 利用了處理器的 CMPXCHG 指令(該指令是原子的)。
除了偏向鎖斑司,JVM 實現(xiàn)鎖的方式都用了循環(huán) CAS菩咨,即當(dāng)一個線程想進入同步塊的時候使用循環(huán) CAS 的方式來獲取鎖,當(dāng)它退出同步塊的時候使用循環(huán) CAS 釋放鎖陡厘。
// 循環(huán)CAS
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}
Java內(nèi)存模型
3個同步原語:synchronized,volatile云茸,final标捺;
并發(fā)編程的兩個關(guān)鍵問題:線程間通信和線程間同步亡容;
在共享內(nèi)存的并發(fā)模型中冤今,線程之間共享內(nèi)存的公共狀態(tài)闺兢,通過讀-寫內(nèi)存的公共狀態(tài)進行隱式通信。在消息傳遞的并發(fā)模型中戏罢,線程之間沒有公共狀態(tài)屋谭,必須通過發(fā)送消息來顯式進行通信脚囊。
同步是指用于控制不同線程間操作發(fā)生相對順序的機制。在共享內(nèi)存并發(fā)模型里桐磁,同步是顯式進行的——程序員需要顯式指定某個方法或某段代碼需要在線程間互斥執(zhí)行悔耘。在消息傳遞的并發(fā)模型里,由于消息的發(fā)送必須在消息的接收之前我擂,因此同步是隱式進行的衬以。
Java的并發(fā)采用的是共享內(nèi)存模型,所以Java線程之間的通信總是隱式進行校摩。
Java內(nèi)存模型(JMM):
(本地內(nèi)存是JMM的一個抽象概念泄鹏,并不真實存在。它涵蓋了緩存、寫緩沖區(qū)车猬、寄存器以及其他硬件和編譯器優(yōu)化瘫辩。(不完全是內(nèi)存,也不完全是Cache))
從上圖來看军熏,線程A與線程B之間如要通信的話摩幔,必須要經(jīng)歷下面2個步驟:
- 首先,線程A把本地內(nèi)存A中更新過的共享變量刷新到主內(nèi)存中去。
- 然后,線程B到主內(nèi)存中去讀取線程A之前已更新過的共享變量。
重要概念:重排序,編譯器重排序和處理器重排序详囤,為了提高并行度该贾。
數(shù)據(jù)依賴:寫后讀,寫后寫,讀后寫卵皂;這3種情況添祸,只要重排序兩個操作的執(zhí)行順序,程序的執(zhí)行結(jié)果就會改變亚侠;所以重排序時會遵守數(shù)據(jù)依賴性铜幽,不會改變存在數(shù)據(jù)依賴關(guān)系的兩個操作的執(zhí)行順序狮杨。
控制依賴:由于處理器會采用分支預(yù)測技術(shù)來提高并行度喘漏,i = a * a
可能會被重排序到if (flag)
之前執(zhí)行——這在單線程中是沒問題的,但在多線程環(huán)境下就可能改變程序的執(zhí)行結(jié)果。
if (flag) {
i = a * a;
}
as-if-serial語義:不管怎么重排序姨涡,單線程程序的執(zhí)行結(jié)果不能被改變检诗。
happens-before
JSR-133使用happens-before的概念來闡述操作之間的內(nèi)存可見性。在JMM中鉴象,如果一個操作執(zhí)行的結(jié)果需要對另一個操作可見,那么這兩個操作之間必須要存在happens-before關(guān)系。這里提到的兩個操作既可以是在一個線程之內(nèi)稽犁,也可以是在不同線程之間来屠。
happen-before的定義如下:
- 如果一個操作happens-before另一個操作传趾,那么第一個操作的執(zhí)行結(jié)果將對第二個操作可見,而且第一個操作的執(zhí)行順序排在第二個操作之前
- 兩個操作之間存在happens-before關(guān)系,并不意味著Java平臺的具體實現(xiàn)必須要按照happens-before關(guān)系指定的順序來執(zhí)行劫恒。如果重排序之后的執(zhí)行結(jié)果與按照原來那種happens-before關(guān)系執(zhí)行的結(jié)果一致,那么JMM允許編譯器和處理器進行這種重排序
as-if-serial語義保證單線程內(nèi)的程序執(zhí)行結(jié)果不會改變,happens-before保證正確同步的多線程程序的執(zhí)行結(jié)果不會被改變喧务。
總共有六條規(guī)則:
- 程序順序規(guī)則:一個線程中的每個操作孽亲,happens-before于隨后該線程中的任意后續(xù)操作
- 監(jiān)視器鎖規(guī)則:對一個鎖的解鎖,happens-before于隨后對這個鎖的獲取
- volatile變量規(guī)則:對一個volatile域的寫,happens-before于對這個變量的讀
- 傳遞性:如果A happens-before B,B happens-before C,那么A happens-before C
- start規(guī)則:如果線程A執(zhí)行線程B的start方法,那么線程A的ThreadB.start()happens-before于線程B的任意操作
- join規(guī)則:如果線程A執(zhí)行線程B的join方法,那么線程B的任意操作happens-before于線程A從TreadB.join()方法成功返回。
順序一致性內(nèi)存模型:順序一致性內(nèi)存模型是一個被計算機科學(xué)家理想化了的理論參考模型电湘,它為程序員提供了極強的內(nèi)存可見性保證瘾晃。順序一致性內(nèi)存模型有兩大特性:
- 一個線程中的所有操作必須按照程序的順序來執(zhí)行。
- (不管程序是否同步)所有線程都只能看到一個單一的操作執(zhí)行順序妹沙。在順序一致性內(nèi)存模型中,每個操作都必須原子執(zhí)行且立刻對所有線程可見牵寺。
順序一致性內(nèi)存模型的視圖:
在JMM中悍引,臨界區(qū)內(nèi)的代碼可以重排序,但不允許臨界區(qū)內(nèi)的代碼“溢出”到臨界區(qū)之外帽氓,那樣會破壞監(jiān)視器的內(nèi)存語義趣斤。
JMM保證:單線程程序和正確同步的多線程程序的執(zhí)行結(jié)果與在順序一致性內(nèi)存模型中的執(zhí)行結(jié)果相同。
volatile
對volatile變量的單個讀寫藤树,可以看成是使用了同一個鎖對這些單個讀寫作了同步。(這樣册养,即使是64位的long/double型變量,只要用volatile修飾孙乖,對該變量的讀寫就具有了原子性店归。注意遇汞,++這種復(fù)合操作依舊不具有原子性白对。)
volatile變量自身的特性:
- 可見性。對一個volatile變量的讀,總是能看到(任意線程)對這個volatile變量最后的寫入饥脑。
- 原子性:對任意單個volatile變量的讀/寫具有原子性,但類似于volatile++這種復(fù)合操作不具有原子性问词。
volatile的內(nèi)存語義(對內(nèi)存可見性的影響)
- 當(dāng)寫一個volatile變量時,JMM會把該線程對應(yīng)的本地內(nèi)存中的共享變量刷新到主內(nèi)存派任。
- 當(dāng)讀一個volatile變量時心傀,JMM會把該線程對應(yīng)的本地內(nèi)存置為無效。線程接下來將從主內(nèi)存中讀取共享變量扎唾。
當(dāng)?shù)诙€操作是volatile寫時召川,不管第一個操作是什么,都不能重排序胸遇。這個規(guī)則確保volatile寫之前的操作不會被編譯器重排序到volatile寫之后荧呐。
當(dāng)?shù)谝粋€操作是volatile讀時,不管第二個操作是什么纸镊,都不能重排序坛增。這個規(guī)則確保volatile讀之后的操作不會被編譯器重排序到volatile讀之前。
當(dāng)?shù)谝粋€操作是volatile寫薄腻,第二個操作是volatile讀時收捣,不能重排序。
鎖的內(nèi)存語義
眾所周知庵楷,鎖可以讓臨界區(qū)互斥執(zhí)行罢艾;但鎖有一個同樣重要,但常常被忽視的功能:鎖的內(nèi)存語義尽纽。
- 當(dāng)線程釋放鎖時咐蚯,JMM會把該線程對應(yīng)的本地內(nèi)存中的共享變量刷新到主內(nèi)存中
- 當(dāng)線程獲取鎖時,JMM會把該線程對應(yīng)的本地內(nèi)存置為無效弄贿。從而使得被監(jiān)視器保護的臨界區(qū)代碼必須要從主內(nèi)存中去讀取共享變量
對比鎖釋放-獲取的內(nèi)存語義與volatile寫-讀的內(nèi)存語義春锋,可以看出:鎖釋放與volatile寫有相同的內(nèi)存語義;鎖獲取與volatile讀有相同的內(nèi)存語義差凹。
final的內(nèi)存語義
- JMM禁止編譯器把final域的寫重排序到構(gòu)造函數(shù)之外(對普通域的寫可能被重排序到構(gòu)造函數(shù)之外期奔!)
- 在一個線程中,初次讀對象引用與初次讀該對象包含的final域危尿,JMM禁止處理器重排序這兩個操作(這兩個操作之間存在間接依賴呐萌,大多數(shù)處理器會遵守間接依賴,不會重排序這兩個操作谊娇,但有少數(shù)處理器不遵守間接依賴關(guān)系肺孤,這個規(guī)則就是專門用來針對這種處理器的)
如果final域是引用類型:
public class FinalReferenceExample {
final int[] intArray; //final是引用類型
static FinalReferenceExample obj;
public FinalReferenceExample () { //構(gòu)造函數(shù)
intArray = new int[1]; //1
intArray[0] = 1; //2
}
public static void writerOne () { //寫線程A執(zhí)行
obj = new FinalReferenceExample (); //3
}
...
}
這里final域為一個引用類型,它引用一個int型的數(shù)組對象。對于引用類型赠堵,寫final域的重排序規(guī)則對編譯器和處理器增加了如下約束:
在構(gòu)造函數(shù)內(nèi)對一個final引用的對象的成員域的寫入小渊,與隨后在構(gòu)造函數(shù)外把這個被構(gòu)造對象的引用賦值給一個引用變量,這兩個操作之間不能重排序茫叭。
在上圖中酬屉,1是對final域的寫入坞靶,2是對這個final域引用的對象的成員域的寫入默伍,3是把被構(gòu)造的對象的引用賦值給某個引用變量。這里除了前面提到的1不能和3重排序外劫樟,2和3也不能重排序吗垮。
為什么final引用不能從構(gòu)造函數(shù)內(nèi)“逸出”
前面我們提到過垛吗,寫final域的重排序規(guī)則可以確保:在引用變量為任意線程可見之前,該引用變量指向的對象的final域已經(jīng)在構(gòu)造函數(shù)中被正確初始化過了(構(gòu)造函數(shù)完成烁登,對象引用才會產(chǎn)生)怯屉。其實要得到這個效果,還需要一個保證:在構(gòu)造函數(shù)內(nèi)部饵沧,不能讓這個被構(gòu)造對象的引用為其他線程可見锨络,也就是對象引用不能在構(gòu)造函數(shù)中“逸出”。為了說明問題狼牺,讓我們來看下面示例代碼:
public class FinalReferenceEscapeExample {
final int i;
static FinalReferenceEscapeExample obj;
public FinalReferenceEscapeExample () {
i = 1; //1 寫final域
obj = this; //2 this引用在此“逸出”
}
public static void writer() {
new FinalReferenceEscapeExample ();
}
public static void reader {
if (obj != null) { //3
int temp = obj.i; //4
}
}
}
這里1和2可能會發(fā)生重排序羡儿,導(dǎo)致final域在被正確初始化之前對象引用就暴露了,從而在線程B的reader中訪問到未初始化的final域是钥。
JSR-133為什么要增強final的語義
在舊的Java內(nèi)存模型中 掠归,最嚴(yán)重的一個缺陷就是線程可能看到final域的值會改變。比如悄泥,一個線程當(dāng)前看到一個整形final域的值為0(還未初始化之前的默認(rèn)值)虏冻,過一段時間之后這個線程再去讀這個final域的值時,卻發(fā)現(xiàn)值變?yōu)榱?(被某個線程初始化之后的值)弹囚。最常見的例子就是在舊的Java內(nèi)存模型中厨相,String的值可能會改變。
為了修補這個漏洞鸥鹉,JSR-133專家組增強了final的語義蛮穿。通過為final域增加寫和讀重排序規(guī)則,可以為java程序員提供初始化安全保證:只要對象是正確構(gòu)造的(被構(gòu)造對象的引用在構(gòu)造函數(shù)中沒有“逸出”)宋舷,那么不需要使用同步(指lock和volatile的使用)绪撵,就可以保證任意線程都能看到這個final域在構(gòu)造函數(shù)中被初始化之后的值。
雙重檢查鎖定與延遲初始化
延遲初始化:推遲一些高開銷的對象初始化操作祝蝠,并且只有在使用這些對象時才進行初始化。
private static Instance instance;
public synchronized static Instance getInstance() {
if (instance == null) {
instance = new Instance();
}
return instance;
}
上面的方法雖然線程安全,但用synchronized將導(dǎo)致性能開銷绎狭。
一個“聰明”的技巧:雙重檢查鎖定:
public class DoubleCheckLocking {
private static Instance instance;
public static Instance getInstance() {
if (instance == null) {
synchronized(DoubleCheckLocking.class) {
if (instance == null) {
instance = new Instance(); // 問題的根源出在這里
}
}
}
return instance;
}
}
創(chuàng)建對象的過程instance = new Instance()可以分解為以下三步:
- memory = allocate(); // 分配對象的內(nèi)存空間
- ctorInstance(memory); // 初始化對象
- instance = memory; // 返回對象地址
- 初次訪問對象
其中细溅,2和3可能會被重排序!重排序之后變成了:分配對象內(nèi)存空間儡嘶,返回對象地址喇聊,初始化對象;(在單線程內(nèi)蹦狂,只要保證2排在4的前面執(zhí)行誓篱,單線程內(nèi)的執(zhí)行結(jié)果就不會被改變,這個重排序就是被允許的)
在多線程環(huán)境下凯楔,假設(shè)2和3發(fā)生重排序窜骄,那么一個未初始化的對象引用將從同步塊中“溢出”,另一個線程可能會通過instance訪問到這個未初始化的對象摆屯!
解決方案:
1邻遏,利用volatile的內(nèi)存語義來禁止重排序
private volatile static Instance instance;
根據(jù)volatile寫的內(nèi)存語義:volatile寫之前的操作禁止被重排序到volatile寫之后。這樣上面2和3之間的重排序?qū)唤古捌铮瑔栴}根源得到解決准验。
2,利用類初始化的原子性
在執(zhí)行類的初始化期間廷没,JVM會去獲取一個鎖糊饱。這個鎖可以同步多個線程對同一個類的初始化。
public class InstanceFactory {
private static class InstanceHolder {
public static Instance instance = new Instance();
}
public static Instance getInstance() {
return InstanceHolder.instance ; // 這里將導(dǎo)致 InstanceHolder 類被初始化
}
}
Java并發(fā)編程基礎(chǔ)
設(shè)置線程優(yōu)先級時颠黎,針對頻繁阻塞(休眠或IO操作)的線程需要設(shè)置較高的優(yōu)先級另锋,而偏重計算的線程則設(shè)置較低的優(yōu)先級,確保處理器不會被獨占盏缤。
線程狀態(tài)變遷
可參考 鏈接
疑惑:貌似可以從等待態(tài)直接回到就緒/運行態(tài)砰蠢,WHY / HOW?
另唉铜,書上一句話:
阻塞狀態(tài)是線程阻塞在進入 synchronized 同步代碼塊或方法(獲取鎖)時的狀態(tài)台舱,但是阻塞在 java.concurrent 包中 Lock 接口的線程狀態(tài)卻是等待狀態(tài),因為 java.concurrent 包中的 Lock 接口對于阻塞的實現(xiàn)均使用了 LockSupport 類中的相關(guān)方法潭流。
中斷
中斷可以理解為線程的一個標(biāo)識位屬性竞惋,它表示一個線程是否被其他線程進行了中斷操作。
調(diào)用一個線程對象的interrupt()方法灰嫉,只是將該線程的中斷標(biāo)識位設(shè)為true拆宛,并不是真的“中斷“了該線程。這個地方很容易迷惑人讼撒。
一個被中斷的線程(被調(diào)用了interrupt()方法)如何響應(yīng)中斷完全取決于該線程本身浑厚。
線程有兩種方法來判斷自己是否被中斷:
- 實例方法isInterrupted()股耽,返回true/false,不對中斷標(biāo)識位復(fù)位钳幅;
- 靜態(tài)方法Thread.interrupted()物蝙,返回true/false,同時對中斷標(biāo)識位進行復(fù)位敢艰;
Object.wait()诬乞,Thread.sleep(),Thread.join()等方法均聲明拋出InterruptedException異常钠导,說明這些方法是可中斷的——這些方法在執(zhí)行時會不斷輪詢監(jiān)聽中斷標(biāo)識位震嫉,當(dāng)發(fā)現(xiàn)其為true時,會恢復(fù)中斷標(biāo)識位(即設(shè)為false)牡属,并拋出InterruptedException異常票堵。
進入synchronized塊和Lock.lock()等操作是不可被中斷的(不拋出中斷異常)。
安全地終止線程
輪詢中斷標(biāo)識位湃望,或另設(shè)一個標(biāo)志:
public class Runner implements Runnable {
private volatile boolean on = true;
private long i;
@Override
public void run() {
while (on && !Thread.currentThread().isInterrupted()) {
i++;
}
System.out.println("Count i = " + i);
}
public void cancel() {
on = false;
}
}
Runner one = new Runner();
Thread t1 = new Thread(one);
t1.start();
...
t1.interrupt();
Runner two = new Runner();
new Thread(two).start();
...
two.cancel();
等待/通知機制
等待/通知的經(jīng)典范式:
synchronized(obj) {
while(條件不滿足) {
obj.wait();
}
處理邏輯换衬;
}
synchronized(obj) {
改變條件;
obj.notifyAll();
}
在while循環(huán)中判斷條件并調(diào)用wait()是使用wait()的唯一正確方式——這樣能保證線程在睡眠前后都會檢查條件证芭。
wait()返回的前提是當(dāng)前線程獲得鎖瞳浦;返回后從wait()處繼續(xù)執(zhí)行。
注意一點:wait()會使當(dāng)前對象釋放鎖废士,notify() 和 notifyAll() 不會叫潦!
synchronized(obj) {
if (條件不滿足) {
obj.wait();
}
處理邏輯;
}
用 if 為什么錯了呢官硝?
wait()的線程被其他線程用notify()或notifyAll()喚醒后矗蕊,是需要先獲得鎖的(畢竟你是在synchronized塊里);如果在被喚醒到獲得鎖的這段時間內(nèi)氢架,條件又被另一個線程改變了傻咖,而你獲得鎖并從wait()方法返回后,直接跳出了 if 的條件判斷——這時條件是不滿足的岖研,于是產(chǎn)生了邏輯錯誤卿操。所以,線程在睡眠前后都需要檢查條件孙援。
狀態(tài)轉(zhuǎn)換圖
線程調(diào)用wait()方法釋放鎖害淤,進入等待隊列,等待狀態(tài)(WAITING)拓售;被notify()/notifyAll()喚醒后窥摄,進入同步隊列,變?yōu)樽枞麪顟B(tài)(BLOCKING)础淤;隨后可再次獲得鎖并從wait()返回繼續(xù)執(zhí)行崭放。
管道輸入/輸出流
4種實現(xiàn):PipedOutputStream, PipedInputStream, PipedReader, PipedWriter
PipedWriter out = new PipedWriter();
PipedReader in = new PipedReader();
out.connect(in); // 將輸入流和輸出流進行連接哨苛,否則在使用時會拋出IOException;
ThreadLocal
在main線程中定義一個ThreadLocal對象莹菱,在各個線程中訪問時移国,訪問到的是各個線程獨立的版本——并且是獨立初始化的ThreadLocal對象吱瘩。
默認(rèn)情況下 initValue() 返回 null 道伟。線程在沒有調(diào)用 set 之前,第一次調(diào)用 get 的時候使碾, get 方法會默認(rèn)去調(diào)用 initValue 這個方法蜜徽。所以如果沒有覆寫這個方法,可能導(dǎo)致 get 返回的是 null 票摇。當(dāng)然如果調(diào)用過 set 就不會有這種情況了拘鞋。但是往往在多線程情況下我們不能保證每個線程的在調(diào)用 get 之前都調(diào)用了 set ,所以最好對 initValue 進行覆寫矢门,以免導(dǎo)致空指針異常盆色。
public class ConcurrentProgramming {
public static ThreadLocal<Integer> threadLocalInt = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return 0;
}
};
// public static ThreadLocal<Integer> threadLocalInt = new ThreadLocal<>();
public static void main(String[] args) throws InterruptedException {
// threadLocalInt.set(0);
// System.out.println(threadLocalInt.get()); // 這里可以正常輸出,因為在當(dāng)前main線程中是先set祟剔,再get隔躲;
for (int i = 0; i < 2; i++) {
new Thread(new Worker()).start();
}
}
}
class Worker implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
// 但在這里就報空指針錯了———— 所以,并不是共享的同一個ThreadLocal對象物延,而是每個線程new一個宣旱,對嗎?
ConcurrentProgramming.threadLocalInt.set(ConcurrentProgramming.threadLocalInt.get() + 1);
System.out.println(Thread.currentThread().getName() + ": " + ConcurrentProgramming.threadLocalInt.get());
}
}
}
output:
Thread-0: 1
Thread-1: 1
Thread-1: 2
Thread-1: 3
Thread-1: 4
Thread-0: 2
Thread-0: 3
Thread-0: 4
Thread-0: 5
Thread-1: 5
注意代碼中的注釋部分叛薯。沒有重寫initialValue()時浑吟,在main中set(0)然后get,沒有問題耗溜;但在另外兩個線程中的get卻報空指針異匙榱Γ——說明在main中set的值只在main線程中可見。
This class provides thread-local variables. These variables differ from their normal counterparts in that each thread that accesses one (via its get or set method) has its own, independently initialized copy of the variable.
—— 每個線程有自己的抖拴、獨立初始化的變量拷貝燎字。
所以,每個線程會獨自new一個Threadlocal對象城舞,只是共用了同一個變量名轩触,或你寫的ThreadLocal匿名內(nèi)部類。
等待超時模式
開發(fā)人員經(jīng)常會遇到這樣的方法調(diào)用場景:調(diào)用一個方法時等待一段時間家夺,如果該方法在給定的時間段內(nèi)能夠得到結(jié)果脱柱,那么將立刻返回;反之拉馋,超時返回默認(rèn)結(jié)果榨为。
實現(xiàn)方式:在經(jīng)典的等待/通知模型的加鎖惨好、條件循環(huán)、邏輯處理的基礎(chǔ)上作出非常小的改動:
public synchronized Object get(long mills) throws InterruptedException {
long future = System.currentTimeMillis() + mills;
long remaining = mills;
while ((result == null) && remaining > 0) {
wait(remaining);
remaining = future - System.currentTimeMillis();
}
return result;
}
(數(shù)據(jù)庫連接池示例随闺、線程池示例 未)
Java中的鎖
Lock接口
- void lock() 獲取鎖,調(diào)用該方法當(dāng)前線程將會獲取鎖日川,當(dāng)鎖獲取后,該方法將返回矩乐。
- void lockInterruptibly() throws InterruptedException 可中斷獲取鎖龄句,與lock()方法不同之處在于該方法會響應(yīng)中斷,即在鎖的獲取過程中可以中斷當(dāng)前線程
- boolean tryLock() 嘗試非阻塞的獲取鎖散罕,調(diào)用該方法立即返回分歇,true表示獲取到鎖
- boolean tryLock(long time,TimeUnit unit) throws InterruptedException 超時獲取鎖,以下情況會返回:時間內(nèi)獲取到了鎖欧漱,時間內(nèi)被中斷职抡,時間到了沒有獲取到鎖。
- void unlock() 釋放鎖
- Condition newCondition() 獲取等待通知組件
隊列同步器
隊列同步器AbstractQueuedSynchronizer(AQS)是用來構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架误甚,它使用了一個int成員變量表示同步狀態(tài)缚甩,通過內(nèi)置的FIFO隊列來完成資源獲取線程的排隊工作。下圖顯示了java.concurrent包的實現(xiàn)示意圖:
隊列同步器的實現(xiàn)依賴內(nèi)部的同步隊列來完成同步狀態(tài)的管理窑邦。它是一個FIFO的雙向隊列擅威,當(dāng)線程獲取同步狀態(tài)失敗時,同步器會將當(dāng)前線程和等待狀態(tài)等信息包裝成一個節(jié)點并將其加入同步隊列奕翔,同時會阻塞當(dāng)前線程裕寨。當(dāng)同步狀態(tài)釋放時,會把首節(jié)點中的線程喚醒派继,使其再次嘗試獲取同步狀態(tài)宾袜。
共享式同步狀態(tài)獲取與釋放
共享式獲取與獨占式獲取最主要的區(qū)別在于同一時刻能否有多個線程同時獲取到同步狀態(tài)。以文件的讀寫為例驾窟,如果一個程序在對文件進行讀操作庆猫,那么這一時刻對于該文件的寫操作均被阻塞,而讀操作能夠同時進行绅络。寫操作要求對資源的獨占式訪問月培,而讀操作可以是共享式訪問。
左半部分恩急,共享式訪問資源時杉畜,其他共享式的訪問均被允許,而獨占式訪問被阻塞衷恭;右半部分是獨占式訪問資源時此叠,同一時刻其他訪問均被阻塞。
重入鎖 ReentrantLock
重入鎖 ReentrantLock随珠,顧名思義灭袁,就是支持重進入的鎖猬错,它表示該鎖能夠支持一個線程對資源的重復(fù)加鎖。除此之外茸歧,該鎖的還支持獲取鎖時的公平和非公平性選擇倦炒。
對于獨占鎖(Mutex),考慮如下場景:當(dāng)一個線程調(diào)用Mutex的lock()方法獲取鎖之后软瞎,如果再次調(diào)用lock()方法逢唤,則該線程將會被自己所阻塞,原因是Mutex在實現(xiàn)tryAcquire(int acquires)方法時沒有考慮占有鎖的線程再次獲取鎖的場景铜涉,而在調(diào)用tryAcquire(int acquires)方法時返回了false智玻,導(dǎo)致該線程被阻塞。簡單地說芙代,Mutex是一個不支持重進入的鎖。
synchronized關(guān)鍵字隱式的支持重進入盖彭,比如一個synchronized修飾的遞歸方法纹烹,在方法執(zhí)行時,執(zhí)行線程在獲取了鎖之后仍能連續(xù)多次地獲得該鎖召边,而不像Mutex由于獲取了鎖铺呵,而在下一次獲取鎖時出現(xiàn)阻塞自己的情況。
ReentrantLock雖然沒能像synchronized關(guān)鍵字一樣支持隱式的重進入隧熙,但是在調(diào)用lock()方法時片挂,已經(jīng)獲取到鎖的線程,能夠再次調(diào)用lock()方法獲取鎖而不被阻塞贞盯。
鎖獲取的公平性問題
公平性與否是針對獲取鎖而言的音念,如果一個鎖是公平的,那么鎖的獲取順序就應(yīng)該和鎖的請求順序一致躏敢,也就是FIFO闷愤。
非公平性鎖可能使線程“饑餓”,當(dāng)一個線程請求鎖時件余,只要獲取了同步狀態(tài)即成功獲取鎖讥脐。在這個前提下,剛釋放鎖的線程再次獲取同步狀態(tài)的幾率會非常大啼器,使得其他線程只能在同步隊列中等待旬渠。
非公平鎖可能使線程“饑餓”,為什么它又被設(shè)定成默認(rèn)的實現(xiàn)呢端壳?非公平性鎖模式下線程上下文切換的次數(shù)少告丢,因此其性能開銷更小。公平性鎖保證了鎖的獲取按照FIFO原則更哄,而代價是進行大量的線程切換芋齿。非公平性鎖雖然可能造成線程“饑餓”腥寇,但極少的線程切換,保證了其更大的吞吐量觅捆。
讀寫鎖
在Java并發(fā)包中常用的鎖(如ReentrantLock)赦役,基本上都是排他鎖,這些鎖在同一時刻只允許一個線程進行訪問栅炒,而讀寫鎖在同一時刻可以允許多個讀線程訪問掂摔,但是在寫線程訪問時,所有的讀線程和其他寫線程均被阻塞赢赊。讀寫鎖維護了一對鎖乙漓,一個讀鎖和一個寫鎖,通過分離讀鎖和寫鎖释移,使得并發(fā)性相比一般的排他鎖有了很大提升叭披。
除了保證寫操作對讀操作的可見性以及并發(fā)性的提升之外,讀寫鎖能夠簡化讀寫交互場景的編程方式玩讳。假設(shè)在程序中定義一個共享的數(shù)據(jù)結(jié)構(gòu)用作緩存涩蜘,它大部分時間提供讀服務(wù)(例如:查詢和搜索),而寫操作占有的時間很少熏纯,但是寫操作完成之后的更新需要對后續(xù)的讀服務(wù)可見同诫。
在沒有讀寫鎖支持的(Java 5 之前)時候,如果需要完成上述工作就要使用Java的等待通知機制樟澜,就是當(dāng)寫操作開始時误窖,所有晚于寫操作的讀操作均會進入等待狀態(tài),只有寫操作完成并進行通知之后秩贰,所有等待的讀操作才能繼續(xù)執(zhí)行(寫操作之間依靠synchronized關(guān)鍵字進行同步)霹俺,這樣做的目的是使讀操作都能讀取到正確的數(shù)據(jù),而不會出現(xiàn)臟讀萍膛。
改用讀寫鎖實現(xiàn)上述功能吭服,只需要在讀操作時獲取讀鎖,而寫操作時獲取寫鎖即可蝗罗,當(dāng)寫鎖被獲取到時艇棕,后續(xù)(非當(dāng)前寫操作線程)的讀寫操作都會被阻塞,寫鎖釋放之后串塑,所有操作繼續(xù)執(zhí)行沼琉,編程方式相對于使用等待通知機制的實現(xiàn)方式而言,變得簡單明了桩匪。
一般情況下打瘪,讀寫鎖的性能都會比排它鎖要好,因為大多數(shù)場景讀是多于寫的。在讀多于寫的情況下闺骚,讀寫鎖能夠提供比排它鎖更好的并發(fā)性和吞吐量彩扔。Java并發(fā)包提供讀寫鎖的實現(xiàn)是ReentrantReadWriteLock。
ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
Lock r = rwl.readLock();
Lock w = rwl.writeLock();
Condition接口
任何一個Java對象僻爽,都擁有一組監(jiān)視器方法虫碉,主要包括wait()、notify()胸梆、notifyAll()方法敦捧,這些方法與synchronized關(guān)鍵字配合使用可以實現(xiàn)等待/通知模式。Condition接口也提供類似的Object的監(jiān)視器的方法碰镜,主要包括await()兢卵、signal()、signalAll()方法绪颖,這些方法與Lock鎖配合使用也可以實現(xiàn)等待/通知模式秽荤。
相比Object實現(xiàn)的監(jiān)視器方法,Condition接口的監(jiān)視器方法具有一些Object所沒有的特性:
- Condition接口可以支持多個等待隊列:一個Lock實例可以綁定多個Condition菠发。
- Condition接口支持在等待時不響應(yīng)中斷:wait()是會響應(yīng)中斷的王滤;
- Condition接口支持等待到將來的某個時間點返回(和awaitNanos(long)/wait(long)不同!):awaitUntil(Date deadline)滓鸠;
class BoundedBuffer {
final Lock lock = new ReentrantLock();// 鎖對象
final Condition notFull = lock.newCondition(); //寫線程條件
final Condition notEmpty = lock.newCondition();//讀線程條件
final Object[] items = new Object[100];// 初始化一個長度為100的隊列
int putptr/* 寫索引 */, takeptr/* 讀索引 */, count/* 隊列中存在的數(shù)據(jù)個數(shù) */;
public void put(Object x) throws InterruptedException {
lock.lock(); //獲取鎖
try {
while (count == items.length)
notFull.await();// 當(dāng)計數(shù)器count等于隊列的長度時,不能再插入第喳,因此等待糜俗。阻塞寫線程。
items[putptr] = x;//賦值
putptr++;
if (putptr == items.length)
putptr = 0;// 若寫索引寫到隊列的最后一個位置了曲饱,將putptr置為0悠抹。
count++; // 每放入一個對象就將計數(shù)器加1。
notEmpty.signal(); // 一旦插入就喚醒取數(shù)據(jù)線程扩淀。
} finally {
lock.unlock(); // 最后釋放鎖
}
}
public Object take() throws InterruptedException {
lock.lock(); // 獲取鎖
try {
while (count == 0)
notEmpty.await(); // 如果計數(shù)器等于0則等待楔敌,即阻塞讀線程。
Object x = items[takeptr]; // 取值
takeptr++;
if (takeptr == items.length)
takeptr = 0; //若讀鎖應(yīng)讀到了隊列的最后一個位置了驻谆,則讀鎖應(yīng)置為0卵凑;即當(dāng)takeptr達到隊列長度時,從零開始取
count++; // 每取一個將計數(shù)器減1胜臊。
notFull.signal(); //枚取走一個就喚醒存線程勺卢。
return x;
} finally {
lock.unlock();// 釋放鎖
}
}
}
上面用了兩個Condition。(是不是很熟悉象对?王道黑忱,信號量,線程間同步)
等待隊列與同步隊列
在Object的監(jiān)視器模型上,一個對象擁有一個同步隊列和一個等待隊列甫煞,而并發(fā)包中的Lock(更確切的說是同步器)可以擁有一個同步隊列和多個等待多列菇曲。
Java并發(fā)容器和框架
ConcurrentHashMap
在并發(fā)環(huán)境下,HashMap的put操作會引起死循環(huán)抚吠。因為多線程會導(dǎo)致HashMap的Entry鏈表形成環(huán)形數(shù)據(jù)結(jié)構(gòu)常潮,使得Entry的next節(jié)點永遠(yuǎn)不為空。
HashTable容器使用synchronized來保證線程安全埃跷,但在線程競爭激烈的情況下HashTable的效率非常低下蕊玷。因為當(dāng)一個線程訪問HashTable的同步方法時,其他線程訪問HashTable的同步方法時弥雹,可能會進入阻塞或輪詢狀態(tài)垃帅。如線程1使用put進行添加元素,線程2不但不能使用put方法添加元素剪勿,并且也不能使用get方法來獲取元素贸诚,所以競爭越激烈效率越低。
ConcurrentHashMap的鎖分段技術(shù)
HashTable容器在競爭激烈的并發(fā)環(huán)境下表現(xiàn)出效率低下的原因厕吉,是因為所有訪問HashTable的線程都必須競爭同一把鎖酱固,那假如容器里有多把鎖,每一把鎖用于鎖容器其中一部分?jǐn)?shù)據(jù)头朱,那么當(dāng)多線程訪問容器里不同數(shù)據(jù)段的數(shù)據(jù)時运悲,線程間就不會存在鎖競爭,從而可以有效的提高并發(fā)訪問效率项钮,這就是ConcurrentHashMap所使用的鎖分段技術(shù)班眯,首先將數(shù)據(jù)分成一段一段的存儲,然后給每一段數(shù)據(jù)配一把鎖烁巫,當(dāng)一個線程占用鎖訪問其中一個段數(shù)據(jù)的時候署隘,其他段的數(shù)據(jù)也能被其他線程訪問。
ConcurrentHashMap是由Segment數(shù)組結(jié)構(gòu)和HashEntry數(shù)組結(jié)構(gòu)組成亚隙。Segment是一種可重入鎖ReentrantLock磁餐,在ConcurrentHashMap里扮演鎖的角色,HashEntry則用于存儲鍵值對數(shù)據(jù)阿弃。
ConcurrentHashMap的get操作
Segment的get操作實現(xiàn)非常簡單和高效诊霹。先經(jīng)過一次再哈希尊残,然后使用這個哈希值通過哈希運算定位到segment葛圃,再通過哈希算法定位到元素戳气,代碼如下:(兩次哈希)
public V get(Object key) {
int hash = hash(key.hashCode());
return segmentFor(hash).get(key, hash);
}
ConcurrentHashMap的Put操作
由于put方法里需要對共享變量進行寫入操作茁彭,所以為了線程安全甥雕,在操作共享變量時必須得加鎖欲主。Put方法首先定位到Segment挥吵,然后在Segment里進行插入操作裤唠。插入操作需要經(jīng)歷兩個步驟,第一步判斷是否需要對Segment里的HashEntry數(shù)組進行擴容泥张,第二步定位添加元素的位置然后放在HashEntry數(shù)組里呵恢。(擴容的時候首先會創(chuàng)建一個兩倍于原容量的數(shù)組,然后將原數(shù)組里的元素進行再hash后插入到新的數(shù)組里媚创。為了高效ConcurrentHashMap不會對整個容器進行擴容渗钉,而只對某個segment進行擴容)
ConcurrentHashMap的size操作
如果我們要統(tǒng)計整個ConcurrentHashMap里元素的大小,就必須統(tǒng)計所有Segment里元素的大小后求和钞钙。Segment里的全局變量count是一個volatile變量鳄橘,那么在多線程場景下,我們是不是直接把所有Segment的count相加就可以得到整個ConcurrentHashMap大小了呢芒炼?不是的瘫怜,雖然相加時可以獲取每個Segment的count的最新值,但是拿到之后可能累加前使用的count發(fā)生了變化本刽,那么統(tǒng)計結(jié)果就不準(zhǔn)了鲸湃。所以最安全的做法,是在統(tǒng)計size的時候把所有Segment的put子寓,remove和clean方法全部鎖住暗挑,但是這種做法顯然非常低效。
因為在累加count操作過程中斜友,之前累加過的count發(fā)生變化的幾率非常小炸裆,所以ConcurrentHashMap的做法是先嘗試2次通過不鎖住Segment的方式來統(tǒng)計各個Segment大小,如果統(tǒng)計的過程中鲜屏,容器的count發(fā)生了變化晒衩,則再采用加鎖的方式來統(tǒng)計所有Segment的大小。
并發(fā)隊列:ConcurrentLinkedQueue
用非阻塞的循環(huán)CAS方式實現(xiàn)墙歪。
Java中的阻塞隊列
阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時贝奇,獲取元素的線程會等待隊列變?yōu)榉强蘸绶啤.?dāng)隊列滿時,存儲元素的線程會等待隊列可用掉瞳。
插入和移除操作的四種處理方式
- 拋出異常:是指當(dāng)阻塞隊列滿時候毕源,再往隊列里插入元素,會拋出IllegalStateException(“Queue full”)異常陕习。當(dāng)隊列為空時霎褐,從隊列里獲取元素時會拋出NoSuchElementException異常 。
- 返回特殊值:插入方法會返回是否成功该镣,成功則返回true冻璃。移除方法,則是從隊列里拿出一個元素,如果沒有則返回null
- 一直阻塞:當(dāng)阻塞隊列滿時省艳,如果生產(chǎn)者線程往隊列里put元素娘纷,隊列會一直阻塞生產(chǎn)者線程,直到拿到數(shù)據(jù)跋炕,或者響應(yīng)中斷退出赖晶。當(dāng)隊列空時,消費者線程試圖從隊列里take元素辐烂,隊列也會阻塞消費者線程遏插,直到隊列可用。
- 超時退出:當(dāng)阻塞隊列滿時纠修,隊列會阻塞生產(chǎn)者線程一段時間胳嘲,如果超過一定的時間,生產(chǎn)者線程就會退出分瘾。
Java里的阻塞隊列
- ArrayBlockingQueue :一個由數(shù)組結(jié)構(gòu)組成的有界阻塞隊列胎围。
- LinkedBlockingQueue :一個由鏈表結(jié)構(gòu)組成的有界阻塞隊列。
- PriorityBlockingQueue :一個支持優(yōu)先級排序的無界阻塞隊列德召。
- DelayQueue:一個使用優(yōu)先級隊列實現(xiàn)的無界阻塞隊列白魂;支持延時獲取元素——在創(chuàng)建元素時可以指定多久才能從隊列中取出當(dāng)前元素;
- SynchronousQueue:一個不存儲元素的阻塞隊列——每一個put操作必須等待一個take操作上岗;
- LinkedTransferQueue:一個由鏈表結(jié)構(gòu)組成的無界阻塞隊列福荸。
- LinkedBlockingDeque:一個由鏈表結(jié)構(gòu)組成的雙向阻塞隊列。
// 大小1000的肴掷、線程公平的阻塞隊列敬锐;
// 傳入了大小參數(shù),這就叫有界呆瞻;
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000, true);
阻塞隊列的實現(xiàn)原理台夺,見前面BoundedBuffer的代碼。(一個隊列痴脾,一個鎖颤介,兩個Condition:notFull,notEmpty赞赖,等待通知模型)
Fork/Join框架
與MapReduce一致的思想滚朵。
ForkJoinTask(抽象類):我們要使用ForkJoin框架,必須首先創(chuàng)建一個ForkJoin任務(wù)前域。它提供在任務(wù)中執(zhí)行fork()和join()操作的機制辕近。Fork/Join框架提供了以下兩個子類:
- RecursiveAction:用于沒有返回結(jié)果的任務(wù)。
- RecursiveTask :用于有返回結(jié)果的任務(wù)匿垄。
package com.xiao;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
public class CountTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 2; // 閾值
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
// 如果任務(wù)足夠小就計算任務(wù)
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
//如果任務(wù)大于閥值移宅,就分裂成兩個子任務(wù)計算
int middle = (start + end) / 2;
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
//執(zhí)行子任務(wù)
leftTask.fork();
rightTask.fork();
//等待子任務(wù)執(zhí)行完归粉,并得到其結(jié)果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
//合并子任務(wù)
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
//生成一個計算任務(wù),負(fù)責(zé)計算1+2+3+4
CountTask task = new CountTask(1, 4);
//執(zhí)行一個任務(wù)
Future result = forkJoinPool.submit(task);
try {
System.out.println(result.get());
} catch (InterruptedException | ExecutionException e) {
}
}
}
Fork/Join框架的實現(xiàn)原理
ForkJoinPool由ForkJoinTask數(shù)組和ForkJoinWorkerThread數(shù)組組成吞杭,F(xiàn)orkJoinTask數(shù)組負(fù)責(zé)存放程序提交給ForkJoinPool的任務(wù)盏浇,而ForkJoinWorkerThread數(shù)組負(fù)責(zé)執(zhí)行這些任務(wù)。(類似于線程池的實現(xiàn))
Java中的13個原子操作類
原子更新方式
- 原子更新基本類型
- 原子更新數(shù)組
- 原子更新引用
- 原子更新屬性(字段)
1芽狗,原子更新基本類型
- AtomicBoolean :原子更新布爾類型
- AtomicInteger: 原子更新整型
- AtomicLong: 原子更新長整型
2绢掰,原子更新數(shù)組
- AtomicIntegerArray :原子更新整型數(shù)組里的元素
- AtomicLongArray :原子更新長整型數(shù)組里的元素
- AtomicReferenceArray : 原子更新引用類型數(shù)組的元素
3,原子更新引用類型
- AtomicReference :原子更新引用類型
- AtomicReferenceFieldUpdater :原子更新引用類型里的字段
- AtomicMarkableReference:原子更新帶有標(biāo)記位的引用類型童擎〉尉ⅲ可以原子更新一個布爾類型的標(biāo)記位和應(yīng)用類型
4,原子更新字段類
- AtomicIntegerFieldUpdater:原子更新整型的字段的更新器
- AtomicLongFieldUpdater:原子更新長整型字段的更新器
- AtomicStampedReference:原子更新帶有版本號的引用類型顾复。該類將整型數(shù)值與引用關(guān)聯(lián)起來班挖,可用于原子的更新數(shù)據(jù)和數(shù)據(jù)的版本號,可以解決使用CAS進行原子更新時可能出現(xiàn)的ABA問題芯砸。
(恩萧芙,是個坑,需要踩)
Java中的并發(fā)工具類
CountDownLatch
(Latch:門閂)
用于等待其他線程完成操作假丧。一個功能更強大的 join().
CountDownLatch c = new CountDownLatch(2); // 等待兩個[點]完成双揪;
...
c.countDown(); // 第一個等待的操作完成;
...
c.countDown(); // 第二個等待的操作完成包帚;
...
c.await(); // 等待兩個操作完成渔期;
...
CountDownLatch(N)等待N個點完成;這里說的N個點渴邦,可以是N個線程疯趟,也可以是一個線程里的N個執(zhí)行步驟。
同步屏障:CyclicBarrier
讓一組線程到達一個屏障(也可以叫同步點)時被阻塞谋梭,直到最后一個線程到達屏障時信峻,屏障才會打開,所有被屏障攔截的線程才會繼續(xù)運行瓮床。
CyclicBarrier c = new CyclicBarrier(2); // 屏障會攔截/等待兩個線程站欺;
// 在第一個線程中;
c.await(); // 當(dāng)前線程(執(zhí)行了某些操作后)到達屏障纤垂;
// 在第二個線程中;
c.await(); // 當(dāng)前線程(執(zhí)行了某些操作后)到達屏障磷账;
CyclicBarrier和CountDownLatch的區(qū)別
CountDownLatch的計數(shù)器只能用一次峭沦,而CyclicBarrier的計數(shù)器可以使用reset()方法重置。所以CyclicBarrier可以處理更復(fù)雜的業(yè)務(wù)場景逃糟。例如吼鱼,如果計算發(fā)生錯誤蓬豁,可以重置計數(shù)器,并讓線程重新執(zhí)行一次菇肃。
控制并發(fā)線程數(shù)的Semaphore
信號量地粪,用來控制同時訪問特定資源的線程數(shù)量。
Semaphore s = new Semaphore(10);
Executor threadPool = Executors.newFixedThreadPool(30);
for (int i = 0; i < 30; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println("Save Date");
s.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
在代碼中琐谤,雖然有30個線程在執(zhí)行蟆技,但只允許10個并發(fā)執(zhí)行。
線程間交換數(shù)據(jù)的Exchanger
Exchanger用于進行線程間的數(shù)據(jù)交換斗忌。它提供一個同步點质礼,在這個同步點,兩個線程可以交換彼此的數(shù)據(jù)织阳。如果第一個線程先執(zhí)行exchange()方法眶蕉,它會一直等待第二個線程也執(zhí)行exchange()方法,然后兩個線程交換數(shù)據(jù)唧躲。
Exchanger<String> exchanger = new Exchanger<>();
// 在線程A中造挽;
try {
String B = exchanger.exchange("A's data");
} catch (InterruptedException e) {
e.printStackTrace();
}
// 在線程B中;
try {
String A = exchanger.exchange("B's data");
} catch (InterruptedException e) {
e.printStackTrace();
}
Java中的線程池
corePool
首先理解一個[corePool 核心池]的概念:核心池是一個線程池的基本/平均能力保障弄痹。在線程池的使用初期饭入,隨著任務(wù)的提交,線程池會先盡快填滿核心池——提交一個任務(wù)就創(chuàng)建一個線程界酒,即使核心池中有空閑的線程圣拄。如果線程池有溫度的話,核心池就是線程池的“常溫”毁欣。
線程池的創(chuàng)建
我們可以通過ThreadPoolExecutor來創(chuàng)建一個線程池庇谆。
new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
keepAliveTime, milliseconds,runnableTaskQueue, threadFactory,handler);
- corePoolSize(線程池的基本大小):當(dāng)提交一個任務(wù)到線程池時凭疮,線程池會創(chuàng)建一個線程來執(zhí)行任務(wù)饭耳,即使其他空閑的基本線程能夠執(zhí)行新任務(wù)也會創(chuàng)建線程,等到需要執(zhí)行的任務(wù)數(shù)大于線程池基本大小時就不再創(chuàng)建执解。如果調(diào)用了線程池的prestartAllCoreThreads方法寞肖,線程池會提前創(chuàng)建并啟動所有基本線程。
- runnableTaskQueue(任務(wù)隊列):用于保存等待執(zhí)行的任務(wù)的阻塞隊列衰腌⌒麦。可以選擇以下幾個阻塞隊列。
- ArrayBlockingQueue:是一個基于數(shù)組結(jié)構(gòu)的有界阻塞隊列右蕊,此隊列按 FIFO(先進先出)原則對元素進行排序琼稻。
- LinkedBlockingQueue:一個基于鏈表結(jié)構(gòu)的阻塞隊列,此隊列按FIFO (先進先出) 排序元素饶囚,吞吐量通常要高于ArrayBlockingQueue帕翻。靜態(tài)工廠方法Executors.newFixedThreadPool()使用了這個隊列鸠补。
- SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作(offer())必須等到另一個線程調(diào)用移除操作(poll())嘀掸,否則插入操作一直處于阻塞狀態(tài)紫岩,吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool使用了這個隊列睬塌。
- PriorityBlockingQueue:一個具有優(yōu)先級得無限阻塞隊列泉蝌。
maximumPoolSize(線程池最大大小):線程池允許創(chuàng)建的最大線程數(shù)衫仑。如果隊列滿了梨与,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù),則線程池會再創(chuàng)建新的線程執(zhí)行任務(wù)文狱。值得注意的是如果使用了無界的任務(wù)隊列這個參數(shù)就沒什么效果粥鞋。
- ThreadFactory:用于設(shè)置創(chuàng)建線程的工廠,可以通過線程工廠給每個創(chuàng)建出來的線程設(shè)置更有意義的名字瞄崇,Debug和定位問題時非常又幫助呻粹。
- RejectedExecutionHandler(飽和策略):當(dāng)隊列和線程池都滿了,說明線程池處于飽和狀態(tài)苏研,那么必須采取一種策略處理提交的新任務(wù)等浊。這個策略默認(rèn)情況下是AbortPolicy,表示無法處理新任務(wù)時拋出異常摹蘑。以下是JDK1.5提供的四種策略筹燕。
- AbortPolicy:直接拋出異常。
- CallerRunsPolicy:只用調(diào)用者所在線程來運行任務(wù)衅鹿。
- DiscardOldestPolicy:丟棄隊列里最近的一個任務(wù)撒踪,并執(zhí)行當(dāng)前任務(wù)。
- DiscardPolicy:不處理大渤,丟棄掉制妄。
當(dāng)然也可以根據(jù)應(yīng)用場景需要來實現(xiàn)RejectedExecutionHandler接口自定義策略。如記錄日志或持久化不能處理的任務(wù)泵三。
- keepAliveTime(線程活動保持時間):線程池的工作線程空閑后耕捞,保持存活的時間。所以如果任務(wù)很多烫幕,并且每個任務(wù)執(zhí)行的時間比較短俺抽,可以調(diào)大這個時間,提高線程的利用率较曼。
- TimeUnit(線程活動保持時間的單位):可選的單位有天(DAYS)凌埂,小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS)瞳抓,微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
提交任務(wù)
void execute(Runnable command) // 沒有返回值伏恐;
<T> Future<T> submit(Callable<T> task) // 有返回值的任務(wù)孩哑;
關(guān)閉線程池
我們可以通過調(diào)用線程池的shutdown或shutdownNow方法來關(guān)閉線程池,但是它們的實現(xiàn)原理不同翠桦,shutdown的原理是只是將線程池的狀態(tài)設(shè)置成SHUTDOWN狀態(tài)横蜒,然后中斷所有沒有正在執(zhí)行任務(wù)的線程。shutdownNow的原理是遍歷線程池中的工作線程销凑,然后逐個調(diào)用線程的interrupt方法來中斷線程丛晌,所以無法響應(yīng)中斷的任務(wù)可能永遠(yuǎn)無法終止。shutdownNow會首先將線程池的狀態(tài)設(shè)置成STOP斗幼,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程澎蛛,并返回等待執(zhí)行任務(wù)的列表。
只要調(diào)用了這兩個關(guān)閉方法的其中一個蜕窿,isShutdown方法就會返回true谋逻。當(dāng)所有的任務(wù)都已關(guān)閉后,才表示線程池關(guān)閉成功,這時調(diào)用isTerminaed方法會返回true桐经。至于我們應(yīng)該調(diào)用哪一種方法來關(guān)閉線程池毁兆,應(yīng)該由提交到線程池的任務(wù)特性決定,通常調(diào)用shutdown來關(guān)閉線程池阴挣,如果任務(wù)不一定要執(zhí)行完气堕,則可以調(diào)用shutdownNow。
合理的配置線程池
要想合理的配置線程池畔咧,就必須首先分析任務(wù)特性茎芭,可以從以下幾個角度來進行分析:
- 任務(wù)的性質(zhì):CPU密集型任務(wù),IO密集型任務(wù)和混合型任務(wù)盒卸。
- 任務(wù)的優(yōu)先級:高骗爆,中和低。
- 任務(wù)的執(zhí)行時間:長蔽介,中和短摘投。
- 任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫連接虹蓄。
任務(wù)性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開處理犀呼。CPU密集型任務(wù)配置盡可能少的線程數(shù)量,如配置Ncpu+1個線程的線程池薇组。IO密集型任務(wù)則由于需要等待IO操作外臂,線程并不是一直在執(zhí)行任務(wù),則配置盡可能多的線程律胀,如2*Ncpu宋光∶部螅混合型的任務(wù),如果可以拆分罪佳,則將其拆分成一個CPU密集型任務(wù)和一個IO密集型任務(wù)逛漫,只要這兩個任務(wù)執(zhí)行的時間相差不是太大,那么分解后執(zhí)行的吞吐率要高于串行執(zhí)行的吞吐率赘艳,如果這兩個任務(wù)執(zhí)行時間相差太大酌毡,則沒必要進行分解。我們可以通過Runtime.getRuntime().availableProcessors()方法獲得當(dāng)前設(shè)備的CPU個數(shù)蕾管。
優(yōu)先級不同的任務(wù)可以使用優(yōu)先級隊列PriorityBlockingQueue來處理枷踏。它可以讓優(yōu)先級高的任務(wù)先得到執(zhí)行,需要注意的是如果一直有優(yōu)先級高的任務(wù)提交到隊列里掰曾,那么優(yōu)先級低的任務(wù)可能永遠(yuǎn)不能執(zhí)行旭蠕。
執(zhí)行時間不同的任務(wù)可以交給不同規(guī)模的線程池來處理,或者也可以使用優(yōu)先級隊列婴梧,讓執(zhí)行時間短的任務(wù)先執(zhí)行下梢。
依賴數(shù)據(jù)庫連接池的任務(wù),因為線程提交SQL后需要等待數(shù)據(jù)庫返回結(jié)果塞蹭,如果等待的時間越長CPU空閑時間就越長孽江,那么線程數(shù)應(yīng)該設(shè)置越大,這樣才能更好的利用CPU番电。
建議使用有界隊列岗屏,有界隊列能增加系統(tǒng)的穩(wěn)定性和預(yù)警能力,可以根據(jù)需要設(shè)大一點漱办,比如幾千这刷。有一次我們組使用的后臺任務(wù)線程池的隊列和線程池全滿了,不斷的拋出拋棄任務(wù)的異常娩井,通過排查發(fā)現(xiàn)是數(shù)據(jù)庫出現(xiàn)了問題暇屋,導(dǎo)致執(zhí)行SQL變得非常緩慢,因為后臺任務(wù)線程池里的任務(wù)全是需要向數(shù)據(jù)庫查詢和插入數(shù)據(jù)的洞辣,所以導(dǎo)致線程池里的工作線程全部阻塞住咐刨,任務(wù)積壓在線程池里。如果當(dāng)時我們設(shè)置成無界隊列扬霜,線程池的隊列就會越來越多定鸟,有可能會撐滿內(nèi)存,導(dǎo)致整個系統(tǒng)不可用著瓶,而不只是后臺任務(wù)出現(xiàn)問題联予。當(dāng)然我們的系統(tǒng)所有的任務(wù)是用的單獨的服務(wù)器部署的,而我們使用不同規(guī)模的線程池跑不同類型的任務(wù),但是出現(xiàn)這樣問題時也會影響到其他任務(wù)沸久。
線程池的監(jiān)控
通過線程池提供的參數(shù)進行監(jiān)控季眷。線程池里有一些屬性在監(jiān)控線程池的時候可以使用
- taskCount:線程池需要執(zhí)行的任務(wù)數(shù)量。
- completedTaskCount:線程池在運行過程中已完成的任務(wù)數(shù)量卷胯。小于或等于taskCount瘟裸。
- largestPoolSize:線程池曾經(jīng)創(chuàng)建過的最大線程數(shù)量。通過這個數(shù)據(jù)可以知道線程池是否滿過诵竭。如等于線程池的最大大小,則表示線程池曾經(jīng)滿了兼搏。
- getPoolSize:線程池的線程數(shù)量卵慰。如果線程池不銷毀的話,池里的線程不會自動銷毀佛呻,所以這個大小只增不減裳朋。
- getActiveCount:獲取活動的線程數(shù)。
通過擴展線程池進行監(jiān)控吓著。通過繼承線程池并重寫線程池的beforeExecute鲤嫡,afterExecute和terminated方法,我們可以在任務(wù)執(zhí)行前绑莺,執(zhí)行后和線程池關(guān)閉前干一些事情暖眼。如監(jiān)控任務(wù)的平均執(zhí)行時間,最大執(zhí)行時間和最小執(zhí)行時間等纺裁。這幾個方法在線程池里是空方法诫肠。
Executor框架
Executor框架的結(jié)構(gòu)和成員
Executor框架主要由3大部分組成如下:
- 任務(wù):Runnable接口和Callable接口;
- 任務(wù)的執(zhí)行:Executor接口欺缘,繼承Executor的ExecutorService接口栋豫,以及ExecutorService接口的兩個實現(xiàn)類ThreadPoolExecutor和ScheduledThreadPoolExecutor;以及一個工具類:Executors谚殊;
- 異步計算的結(jié)果:Future接口和Future接口的實現(xiàn)類FutureTask丧鸯;
ThreadPoolExecutor
ThreadPoolExecutor通常由工廠類Executors來創(chuàng)建。Executors可以創(chuàng)建3種類型的ThreadPoolExecutor:SingleThreadExecutor嫩絮,F(xiàn)ixedThreadPool丛肢,CachedThreadPool;
FixedThreadPool是使用固定線程數(shù)的線程池,Executors提供的API有如下兩個:
public static ExecutorService newFixedThreadPool(int nThreads);
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory theadFactory);
public static ExecutorService newFixedThreadPool(int nThreads){
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
// corePoolSize和maximumPoolSize都設(shè)為nThreads絮记;
// 空閑線程的存活時間為0摔踱,意味著多余的空閑線程會立即死亡;
// 使用無界的LinkedBlockingQueue怨愤,不會拒絕任務(wù)派敷;
FixedThreadPool滿足了資源管理的需求,可以限制當(dāng)前線程數(shù)量。適用于負(fù)載較重的服務(wù)器環(huán)境篮愉。
SingleThreadExecutor使用單線程執(zhí)行任務(wù)腐芍,Executors提供的API有如下兩個:
public static ExecutorService newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory);
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
// corePoolSize和maximumPoolSize均為1;
// 多余的空閑線程立即死亡试躏;
// 不拒絕任務(wù)猪勇;
SingleThreadExecutor保證了任務(wù)執(zhí)行的順序,不會存在多線程活動颠蕴。
CachedThreadPool是無界線程池泣刹,Executors提供的API有如下兩個:
public static ExecutorService newCachedThreadPool();
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory);
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
// 線程池大小不限;
// 多余的空閑線程存活60s;
// 使用不存儲元素的SynchronousQueue作為線程池的任務(wù)隊列犀被,一個offer操作必須等待另一個線程的poll操作椅您;如果主線程提交任務(wù)的速度高于線程池中處理任務(wù)的速度,CachedThreadPool會不斷創(chuàng)建新線程寡键;極端情況下掀泳,可能會因為創(chuàng)建過多的線程而耗盡CPU和內(nèi)存資源;
CachedThreadPool適用于執(zhí)行很多短期異步任務(wù)的小程序西轩,適用于負(fù)載較輕的服務(wù)器员舵。
ScheduledThreadPoolExecutor
它是ThreadPoolExecutor的子類且實現(xiàn)了ScheduledExecutorService接口,它可以在給定的延遲時間后執(zhí)行命令藕畔,或者定期執(zhí)行命令马僻,它比Timer更強大更靈活。
Executors可以創(chuàng)建的ScheduledThreadPoolExecutor的類型有ScheduledThreadPoolExecutor和SingleThreadScheduledExecutor等
ScheduledThreadPoolExecutor具有固定線程個數(shù)劫流,適用于需要多個后臺線程執(zhí)行周期任務(wù)巫玻,并且為了滿足資源管理需求而限制后臺線程數(shù)量的場景,Executors中提供的API有如下兩個:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory);
SingleThreadScheduledExecutor具有單個線程,Executors提供的創(chuàng)建API有如下兩個:
public static ScheduledExecutorService newSingleThreadScheduledExecutor();
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory);
它適用于單個后臺線程執(zhí)行周期任務(wù)祠汇,并且保證順序一致執(zhí)行的場景仍秤。
ScheduledThreadPoolExecutor
在給定延遲之后執(zhí)行任務(wù),或者定期執(zhí)行任務(wù)可很。ScheduledThreadPoolExecutor的功能與Timer類似诗力,但更強大、更靈活我抠。Timer對應(yīng)的是單個后臺線程苇本,而ScheduledThreadPoolExecutor可以在構(gòu)造函數(shù)中指定多個對應(yīng)的后臺線程數(shù)。
ScheduledThreadPoolExecutor中線程執(zhí)行某個周期任務(wù)的4個步驟:
步驟1:線程1從工作隊列DelayQueue中獲取已到期的task菜拓;
步驟2:線程1執(zhí)行該task瓣窄;
步驟3:線程1修改ScheduledFutureTask的time變量為下次被執(zhí)行的時間;
步驟4:線程1將修改后的task重新放回DelayQueue中纳鼎。
FutureTask類
Runnable接口:
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
Callable接口(可以有返回值俺夕,可以拋出異常):
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
Future接口:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
FutureTask類的構(gòu)造方法:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
ExecutorService的3個submit()方法都返回Future:
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result); // 執(zhí)行成功返回指定的值result裳凸;
Future<?> submit(Runnable task); // 線程執(zhí)行成功返回null;
Callable和Future的普通用法:
Callable<Integer> callable = new Callable<Integer>() {
public Integer call() throws Exception {
return new Random().nextInt(100);
}
};
FutureTask<Integer> future = new FutureTask<Integer>(callable);
new Thread(future).start();
int result = future.get();