第三章 Java
內(nèi)存模型
3.1 Java
內(nèi)存模型的基礎(chǔ)
- 通信
在共享內(nèi)存的模型里,通過寫-讀內(nèi)存中的公共狀態(tài)進(jìn)行隱式通信干签;在消息傳遞的并發(fā)模型里,線程之間必須通過發(fā)送消息來進(jìn)行顯示的通信恭理。 - 同步
在共享內(nèi)存并發(fā)模型里殉挽,同步是顯示進(jìn)行的,程序員必須顯示指定某個(gè)方法或者某段代碼需要在線程之間互斥執(zhí)行裳擎;在消息傳遞的并發(fā)模型里涎永,由于消息的發(fā)送必須在接收之前,因此同步是隱式進(jìn)行的。
在Java
中羡微,所有實(shí)例域谷饿、靜態(tài)域和數(shù)組元素都存儲(chǔ)在堆內(nèi)存中,堆內(nèi)存在線程之間共享妈倔;局部變量博投、方法定義參數(shù)和異常處理器參數(shù)不會(huì)在線程之間共享。
從抽象角度來看启涯,JMM
定義了線程和主內(nèi)存之間的抽象關(guān)系:線程之間的共享變量存儲(chǔ)在主內(nèi)存中贬堵,每個(gè)線程都有一個(gè)私有的本地內(nèi)存,本地內(nèi)存涵蓋了緩存结洼、寫緩沖區(qū)黎做、寄存器以及其它的硬件和編譯器優(yōu)化。
JMM
通過控制主內(nèi)存與每個(gè)線程的本地內(nèi)存之間的交互松忍,來為Java
程序員提供內(nèi)存可見性保證蒸殿。
重排序
指編譯器和處理器為了優(yōu)化程序性能而對(duì)指令序列進(jìn)行重新排序的一種手段:
- 編譯器優(yōu)化的重排序:編譯器在不改變單線程程序語義的前提下,重新安排語句的執(zhí)行順序鸣峭。
- 處理器的指令級(jí)并行的重排序:如果不存在數(shù)據(jù)依賴性宏所,處理器可以改變語句對(duì)應(yīng)機(jī)器指令的執(zhí)行順序。
- 內(nèi)存系統(tǒng)的重排序:由于處理器使用緩存和讀/寫緩沖區(qū)摊溶,這使得加載和存儲(chǔ)操作看上去可能是在亂序執(zhí)行爬骤。
JMM
的編譯器重新排序規(guī)則會(huì)禁止特定類型的編譯器重排序,對(duì)于處理器重排序莫换,JMM
的處理器重排序規(guī)則會(huì)要求Java
編譯器在生成指令時(shí)霞玄,插入特定類型的內(nèi)存屏障。
現(xiàn)代的處理器使用寫緩沖區(qū)臨時(shí)保存向內(nèi)存寫入的數(shù)據(jù)拉岁,但每個(gè)處理器上的寫緩沖區(qū)坷剧,僅僅對(duì)它所在的處理器可見。
由于寫緩沖區(qū)僅對(duì)自己的處理器可見喊暖,它會(huì)導(dǎo)致處理器執(zhí)行內(nèi)存操作的順序可能會(huì)與內(nèi)存實(shí)際的操作執(zhí)行順序不一致惫企,由于現(xiàn)代的處理器都會(huì)使用寫緩沖區(qū),因此現(xiàn)代的處理器都會(huì)允許對(duì)寫-讀操作進(jìn)行重排序陵叽,但不允許對(duì)存在數(shù)據(jù)依賴的操作做重排序狞尔。
happens-before
簡(jiǎn)介
用來闡述操作之間的內(nèi)存可見性,如果一個(gè)操作執(zhí)行的結(jié)果需要對(duì)另一個(gè)操作可見巩掺,那么這兩個(gè)操作必須要存在happens-before
關(guān)系沪么,這兩個(gè)操作既可以在一個(gè)線程之內(nèi),也可以在不同線程之間锌半,但并不等于前一個(gè)操作必須要在后一個(gè)操作之前執(zhí)行。
數(shù)據(jù)依賴性
編譯器和處理器不會(huì)改變存在數(shù)據(jù)依賴關(guān)系的兩個(gè)操作的執(zhí)行順序,但是僅針對(duì)單個(gè)處理器中執(zhí)行的指令序列和單個(gè)線程中執(zhí)行的操作刊殉。
as-if-serial
無論怎么重排序殉摔,單線程程序的執(zhí)行結(jié)果不能改變。
在單線程中记焊,對(duì)存在控制依賴的操作重排序逸月,不會(huì)改變執(zhí)行結(jié)果;但在多線程程序中遍膜,對(duì)存在控制依賴的操作重排序碗硬,可能會(huì)改變程序的執(zhí)行結(jié)果。
順序一致性
順序一致性是一個(gè)理論參考模型瓢颅,在設(shè)計(jì)的時(shí)候恩尾,處理器的內(nèi)存模型和編程語言的內(nèi)存模型都會(huì)以順序一致性內(nèi)存作為參照。
如果程序是正確同步的挽懦,程序的執(zhí)行將具有順序一致性:即程序的執(zhí)行結(jié)果與該程序在順序一致性內(nèi)存模型中的執(zhí)行結(jié)果相同翰意。
如果程序是正確同步的,程序的執(zhí)行將具有順序一致性:即程序的執(zhí)行結(jié)果與該程序在順序一致性內(nèi)存模型中的執(zhí)行結(jié)果相同信柿。
順序一致模型有兩大特性:
- 一個(gè)線程中的所有操作必須按照程序的順序來執(zhí)行冀偶。
- 所有線程都只能看到一個(gè)單一的操作執(zhí)行順序,在順序一致性內(nèi)存模型中渔嚷,每個(gè)操作都必須原子執(zhí)行且立刻對(duì)所有線程可見进鸠。
對(duì)于未同步或未正確同步的多線程程序,JMM
只提供最小安全性:線程執(zhí)行時(shí)讀取到的值形病,要么是之前某個(gè)線程寫入的值客年,要么是默認(rèn)值。
JMM
不保證未同步程序的執(zhí)行結(jié)果與該程序在順序一致性模型中的執(zhí)行結(jié)果一致窒朋。
未同步程序在兩個(gè)模型中的執(zhí)行特征有如下差異:
- 順序一致性模型保證單線程內(nèi)的操作會(huì)按程序的順序執(zhí)行搀罢,而
JMM
不保證單線程內(nèi)的操作會(huì)按程序的順序執(zhí)行。 - 順序一致性模型保證所有線程只能看到一致的操作執(zhí)行順序侥猩,而
JMM
不保證所有線程能看到一致的操作執(zhí)行順序榔至。 -
JMM
不保證對(duì)64位的long/double
型變量的寫操作具有原子性,而順序一致性模型保證對(duì)所有內(nèi)存讀/寫操作都具有原子性欺劳。
第四章 Java
并發(fā)編程基礎(chǔ)
- 現(xiàn)代操作系統(tǒng)調(diào)度的最小單元是線程唧取,也叫輕量級(jí)進(jìn)程,在一個(gè)進(jìn)程里可以創(chuàng)建多個(gè)線程划提,這些線程都擁有各自的計(jì)數(shù)器枫弟、堆棧和局部變量等特性,并且能夠訪問共享的內(nèi)存變量鹏往。
- 設(shè)置線程優(yōu)先級(jí)時(shí)淡诗,針對(duì)頻繁阻塞(休眠或者
I/O
操作)的線程需要設(shè)置較高優(yōu)先級(jí),而偏重計(jì)算(需要較多CPU
時(shí)間或者偏運(yùn)算)的線程則設(shè)置較低的優(yōu)先級(jí),確保處理器不會(huì)被獨(dú)占韩容。 - 線程在運(yùn)行的生命周期中可能處于以下6種不同的狀態(tài):
-
New
:初始狀態(tài)款违,線程被創(chuàng)建,但是沒有調(diào)用start()
方法群凶。 -
Runnable
:運(yùn)行狀態(tài)插爹,Java
線程將操作系統(tǒng)中的就緒和運(yùn)行兩種狀態(tài)統(tǒng)稱為“運(yùn)行中”。 -
Blocked
:阻塞狀態(tài)请梢,表示線程阻塞于鎖赠尾。 -
Waiting
:等待狀態(tài),表示線程進(jìn)入等待狀態(tài)毅弧,進(jìn)入該狀態(tài)表示當(dāng)前線程需要等待其它線程做出一些指定動(dòng)作(通知或中斷)气嫁。 -
Time_Waiting
:超時(shí)等待狀態(tài),可以在指定的時(shí)間自行返回形真。 -
Terminated
:終止?fàn)顟B(tài)杉编,表示當(dāng)前線程已經(jīng)執(zhí)行完畢。 - 中斷可以理解為線程的一個(gè)標(biāo)識(shí)位屬性咆霜,它標(biāo)識(shí)一個(gè)運(yùn)行中的線程是否被其它線程進(jìn)行了中斷操作邓馒。中斷好比其他線程對(duì)該線程打了一個(gè)招呼,其他線程通過調(diào)用該線程的
interrupt()
方法對(duì)其進(jìn)行中斷操作蛾坯。 - 線程通過檢查自身是否被中斷來進(jìn)行響應(yīng)光酣,線程通過方法
isInterrupt
來進(jìn)行判斷是否被中斷,也可以調(diào)用靜態(tài)方法Thread.interrupt
對(duì)當(dāng)前線程的中斷標(biāo)識(shí)位進(jìn)行復(fù)位脉课,如果該線程已經(jīng)處于終止?fàn)顟B(tài)救军,即使該線程被中斷過,在調(diào)用該線程對(duì)象的isInterrupt
時(shí)依舊返回false
倘零。 - 在拋出
InterruptedException
異常之前唱遭,Java
虛擬機(jī)會(huì)先將該線程的中斷標(biāo)識(shí)位清除。 - 中斷狀態(tài)是線程的一個(gè)標(biāo)識(shí)位呈驶,而中斷操作是一種簡(jiǎn)便的線程間交互方式拷泽,而這種交互方式最適合用來取消或停止任務(wù),除了中斷之外袖瞻,還可以利用一個(gè)
boolean
變量來控制是否需要停止任務(wù)并終止該線程司致。 -
Java
支持多個(gè)線程同時(shí)訪問一個(gè)對(duì)象或者對(duì)象的成員變量,由于每個(gè)線程可以擁有這個(gè)變量的拷貝聋迎,所以在程序的執(zhí)行過程中脂矫,一個(gè)線程看到的變量并不一定是最新的。 -
volatile
可以用來修飾字段霉晕,就是告知程序任何對(duì)該變量的訪問需要從共享內(nèi)存中獲取庭再,而對(duì)它的改變必須同步刷新回共享內(nèi)存捞奕,它能保證所有線程對(duì)變量訪問的可見性。 -
synchronized
可以修飾方法或者以同步塊的形式來進(jìn)行使用佩微,它主要確保多個(gè)線程在同一時(shí)刻缝彬,只能有一個(gè)線程處于方法或者同步塊中,它保證了線程對(duì)變量訪問的可見性和排他性哺眯。 - 任意線程對(duì)
Object
(Object
由synchronized
保護(hù))的訪問,首先要獲得Object
的監(jiān)視器扒俯,如果獲取失敗奶卓,線程進(jìn)入同步隊(duì)列,線程狀態(tài)變?yōu)?code>Blocked撼玄,當(dāng)訪問Object
的前驅(qū)(獲得了鎖的線程)釋放了鎖夺姑,則該釋放操作喚醒阻塞在同步隊(duì)列中的線程,使其重新嘗試對(duì)監(jiān)視器的獲取掌猛。 - 等待/通知的相關(guān)方法:
-
notify()
:通知一個(gè)在對(duì)象上等待的線程盏浙,使其從wait()
方法返回,而返回的前提是該線程獲取到了對(duì)象上的鎖荔茬。 -
notifyAll()
:通知所有等待在該對(duì)象上的鎖废膘。 -
wait()
:調(diào)用該方法的線程進(jìn)入Waiting
狀態(tài),只有等待另外線程的通知或被中斷才會(huì)返回慕蔚,調(diào)用wait()
方法后丐黄,會(huì)釋放對(duì)象的鎖。 -
wait(long)
:超時(shí)等待一段時(shí)間孔飒,如果沒有通知就返回灌闺。 -
wait(long, int)
:對(duì)于超時(shí)時(shí)間更精細(xì)粒度的控制,可以達(dá)到納秒坏瞄。 - 兩個(gè)線程通過對(duì)象來完成交互桂对,而對(duì)象上的
wait
和notify/notifyAll()
的關(guān)系就如同開關(guān)信號(hào)一樣,用來完成等待方和通知方之間的交互工作鸠匀。 - 等待/通知的經(jīng)典范式:
- 等待方
(1) 獲取對(duì)象的鎖蕉斜。
(2) 如果條件不滿足,那么調(diào)用對(duì)象的wait()
方法狮崩,被通知后仍要檢查條件蛛勉。
(3) 條件滿足則執(zhí)行對(duì)應(yīng)的邏輯。
synchronized(對(duì)象) {
while(條件不滿足) {
對(duì)象.wait();
}
對(duì)應(yīng)的處理邏輯;
}
- 通知方
(1) 獲得對(duì)象的鎖
(2) 改變條件
(3) 通知所有等待在該對(duì)象上的線程睦柴。
synchronized(對(duì)象) {
改變條件;
對(duì)象.notifyAll();
}
- 管道輸入/輸出流用于線程之間的數(shù)據(jù)傳輸诽凌,而傳輸?shù)拿浇闉閮?nèi)存,主要包括了以下4種實(shí)現(xiàn):
PipedOutputStream坦敌、PipeInputStream侣诵、PipedReader痢法、PipedWriter
,前兩種面向字節(jié)杜顺,后兩種面向字符财搁。 - 如果一個(gè)線程
A
執(zhí)行了Thread.join()
,其含義是:當(dāng)前線程A
等待Thread
線程終止后躬络,才從Thread.join
返回尖奔,線程Thread
除了提供join()
方法外桦卒,還提供了join(long millis)
和join(long millis, int nanos)
兩個(gè)具備超時(shí)特性的方法筒严,如果在給定的超時(shí)時(shí)間內(nèi)沒有終止,那么將會(huì)從超時(shí)方法中返回殿如。 -
ThreadLocal
馁菜,即線程變量茴扁,是一個(gè)以ThreadLocal
對(duì)象為鍵、任意對(duì)象為值的存儲(chǔ)結(jié)構(gòu)汪疮,這個(gè)結(jié)構(gòu)被附帶在線程上峭火,也就是說一個(gè)線程可以根據(jù)一個(gè)ThreadLocal
對(duì)象查詢到綁定在這個(gè)線程上的一個(gè)值,可以通過set(T)
方法來設(shè)置一個(gè)值智嚷,在當(dāng)前線程下再通過get()
方法獲取到原先設(shè)置的值卖丸。
第五章 Java
中的鎖
5.1 Lock
接口
鎖是用來控制多個(gè)線程訪問共享資源的方式,雖然它缺少了隱式獲取釋放鎖的便捷性纤勒,但是卻擁有了鎖獲取與釋放的可操作性坯苹、可中斷地獲取鎖以及超時(shí)獲取鎖等多種
synchronized
關(guān)鍵字不具備的同步特性。在
finally
塊中釋放鎖摇天,目的是保證在獲取到鎖之后粹湃,最終能夠被釋放。Lock
接口提供的synchronized
關(guān)鍵字不具備的主要特性嘗試非阻塞地獲取鎖:當(dāng)前線程嘗試獲取鎖泉坐,如果這一時(shí)刻沒有被其它線程獲取到为鳄,則成功獲取并持有鎖。
能被中斷地獲取鎖:與
synchronized
不同腕让,獲取到鎖的線程能夠響應(yīng)中斷孤钦,當(dāng)獲取到鎖的線程被中斷時(shí),中斷異常將會(huì)被拋出纯丸,同時(shí)鎖會(huì)被釋放偏形。在指定的截止時(shí)間之前獲取鎖:如果截止時(shí)間到了仍舊無法獲取鎖,則返回觉鼻。
Lock
的API
void lock()
:獲取鎖俊扭,調(diào)用該方法當(dāng)前線程將會(huì)獲取鎖,當(dāng)鎖獲得后坠陈,從該方法返回萨惑。void lockInterruptibly()
:可中斷地獲取鎖捐康,該方法會(huì)響應(yīng)中斷,即在鎖的獲取中可以中斷當(dāng)前線程庸蔼。boolean tryLock()
:嘗試非阻塞地獲取鎖解总,調(diào)用該方法后立刻返回,如果能夠獲取則返回true
姐仅,否則返回false
花枫。boolean tryLock(long time, TimeUnit unit) throws InterruptedException
:當(dāng)前線程在超時(shí)時(shí)間內(nèi)獲得了鎖;當(dāng)前線程在超時(shí)時(shí)間內(nèi)被中斷萍嬉;超時(shí)時(shí)間結(jié)束乌昔,返回false
。void unlock()
:釋放鎖壤追。Condition newCondition()
:獲取等待/通知組件,該組件和當(dāng)前的鎖綁定供屉,當(dāng)前線程只有獲得了鎖行冰,才能調(diào)用該組件的wait()
方法,而調(diào)用后伶丐,當(dāng)前線程將釋放鎖悼做。
5.2 隊(duì)列同步器
5.2.1 隊(duì)列同步器接口
- 隊(duì)列同步器
AbstractQueuedSynchronizer
,是用來構(gòu)建鎖或者其它同步組件的基礎(chǔ)框架哗魂,它使用了一個(gè)int
成員變量表示同步狀態(tài)肛走,通過內(nèi)置的FIFO
隊(duì)列來完成資源獲取線程的排隊(duì)工作。 - 同步器是實(shí)現(xiàn)鎖的關(guān)鍵录别,在鎖的實(shí)現(xiàn)中聚合同步器朽色,利用同步器實(shí)現(xiàn)鎖的語義∽樘猓可以理解二者之間的關(guān)系:鎖是面向使用者的葫男,它定義了使用者與鎖交互的接口,隱藏了實(shí)現(xiàn)細(xì)節(jié)崔列;同步器面向鎖的實(shí)現(xiàn)者梢褐,它簡(jiǎn)化了鎖的實(shí)現(xiàn)方式,屏蔽了同步狀態(tài)管理赵讯、線程的排隊(duì)盈咳、等待與喚醒等底層操作。鎖和同步器很好地隔離了使用者和實(shí)現(xiàn)者所需關(guān)注地領(lǐng)域边翼。
- 同步器的設(shè)計(jì)是基于模板方法模式鱼响,使用者需要繼承同步器并重寫指定的方法,隨后將同步器組合在自定義同步組件的實(shí)現(xiàn)中讯私,并調(diào)用同步器的模板方法热押,而這些模板方法將會(huì)調(diào)用使用者重載的方法西傀。
- 重寫同步器指定的方法時(shí),需要使用同步器提供的3個(gè)方法來訪問或者修改同步狀態(tài):
-
getState()
:獲取當(dāng)前同步狀態(tài)桶癣。 -
setState(int newState)
:設(shè)置當(dāng)前同步狀態(tài)拥褂。 -
compareAndSetState(int except, int update)
:使用CAS
設(shè)置當(dāng)前狀態(tài),該方法能夠保證狀態(tài)設(shè)置的原始性牙寞。 - 同步器提供的模板方法基本上分為以下3類:
- 獨(dú)占式獲取與釋放同步狀態(tài)
- 共享式獲取與釋放同步狀態(tài)
- 查詢同步隊(duì)列中的等待線程情況饺鹃。
5.2.2 隊(duì)列同步器的實(shí)現(xiàn)分析
5.2.2.1 同步隊(duì)列
- 同步器依賴內(nèi)部的同步隊(duì)列來完成同步狀態(tài)的管理,當(dāng)前線程獲取同步狀態(tài)失敗時(shí)间雀,同步器會(huì)將當(dāng)前線程以及等待狀態(tài)等信息構(gòu)造稱為一個(gè)節(jié)點(diǎn)悔详,并將其加入同步隊(duì)列,同時(shí)會(huì)阻塞當(dāng)前線程惹挟,當(dāng)同步狀態(tài)釋放時(shí)茄螃,會(huì)把首節(jié)點(diǎn)中的線程喚醒,使其再次嘗試獲取同步狀態(tài)连锯。
- 同步器中包含了兩個(gè)節(jié)點(diǎn)類型的引用归苍,一個(gè)指向頭節(jié)點(diǎn),而另一個(gè)指向尾節(jié)點(diǎn)运怖。
- 當(dāng)一個(gè)線程成功地獲取了同步狀態(tài)拼弃,其他線程將無法獲取到同步狀態(tài),轉(zhuǎn)而被構(gòu)造成為節(jié)點(diǎn)并加入到同步隊(duì)列當(dāng)中摇展,而這個(gè)加入到隊(duì)列地過程必須要保證線程安全吻氧,因此同步器提供了一個(gè)基于
CAS
的設(shè)置尾節(jié)點(diǎn)的方法。 - 同步隊(duì)列遵循
FIFO
咏连,首節(jié)點(diǎn)是獲取同步狀態(tài)成功的節(jié)點(diǎn)盯孙,首節(jié)點(diǎn)的線程在釋放同步狀態(tài)時(shí),將會(huì)喚醒后繼節(jié)點(diǎn)捻勉,而后繼節(jié)點(diǎn)將會(huì)在獲取同步狀態(tài)成功時(shí)將自己設(shè)置為首節(jié)點(diǎn)镀梭。
5.2.2.2 獨(dú)占式同步狀態(tài)獲取與釋放
- 通過調(diào)用同步器的
acquire(int arg)
方法可以獲取同步狀態(tài),該方法對(duì)中斷不敏感踱启,即由于線程獲取同步狀態(tài)失敗而進(jìn)入同步隊(duì)列后报账,后續(xù)對(duì)線程進(jìn)行中斷操作時(shí),線程不會(huì)從同步隊(duì)列中移除埠偿。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
它的主要邏輯是:
- (1)調(diào)用自定義同步器實(shí)現(xiàn)的
tryAcquire
方法透罢,該方法保證線程安全的獲取同步狀態(tài),這個(gè)方法需要隊(duì)列同步器的實(shí)現(xiàn)者來重寫冠蒋。 - (2)如果同步狀態(tài)獲取失敗羽圃,則構(gòu)造同步節(jié)點(diǎn)(獨(dú)占式
Node.EXCLUSIVE
)并通過addWaiter(Node node)
方法將該節(jié)點(diǎn)加入到同步隊(duì)列的尾部。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
//1.確保節(jié)點(diǎn)能夠線程安全地被添加
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//2.通過死循環(huán)來確保節(jié)點(diǎn)的正確添加抖剿,在"死循環(huán)"中只有通過`CAS`將節(jié)點(diǎn)設(shè)置為尾節(jié)點(diǎn)之后朽寞,當(dāng)前線程才能從該方法返回识窿,否則當(dāng)前線程不斷地進(jìn)行嘗試。
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
- (3)最后調(diào)用
acquireQueued(Node node, int arg)
方法脑融,使得該節(jié)點(diǎn)以死循環(huán)的方式獲取同步狀態(tài)喻频。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//1.得到當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
final Node p = node.predecessor();
//2.如果當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn),只有在這種情況下獲取同步狀態(tài)成功
if (p == head && tryAcquire(arg)) {
//3.將當(dāng)前節(jié)點(diǎn)設(shè)為頭節(jié)點(diǎn)
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
-
可以看到肘迎,當(dāng)前線程在“死循環(huán)”中嘗試獲取同步狀態(tài)甥温,而只有前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)才能夠嘗試獲取同步狀態(tài),這是由于:
- 頭節(jié)點(diǎn)是成功獲取到同步狀態(tài)的節(jié)點(diǎn)妓布,而頭節(jié)點(diǎn)的線程釋放了同步狀態(tài)后姻蚓,將會(huì)喚醒其后繼節(jié)點(diǎn),后繼節(jié)點(diǎn)的線程被喚醒后需要檢查自己的前驅(qū)節(jié)點(diǎn)是否是頭節(jié)點(diǎn)匣沼。
- 維護(hù)同步隊(duì)列的
FIFO
原則狰挡,通過簡(jiǎn)單地判斷自己的前驅(qū)是否為頭節(jié)點(diǎn),這樣就使得節(jié)點(diǎn)的釋放規(guī)則符合FIFO
释涛,并且也便于對(duì)過早通知的處理(過早通知是指前驅(qū)節(jié)點(diǎn)不是頭節(jié)點(diǎn)的線程由于中斷而被喚醒)
當(dāng)同步狀態(tài)獲取成功之后圆兵,當(dāng)前線程從
acquire(int arg)
方法返回,如果對(duì)于鎖這種并發(fā)組件而言枢贿,代表著當(dāng)前線程獲取了鎖。通過調(diào)用同步器的
release(int arg)
方法可以釋放同步狀態(tài)刀脏,該方法執(zhí)行時(shí)局荚,會(huì)喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)線程,unparkSuccessor(Node node)
方法使用LockSupport
來喚醒處于等待狀態(tài)的線程愈污。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- (4)如果獲取不到耀态,則阻塞節(jié)點(diǎn)中的線程,而被阻塞線程的喚醒主要依靠前驅(qū)節(jié)點(diǎn)的出隊(duì)或阻塞線程被中斷來實(shí)現(xiàn)暂雹。
總結(jié):
1.在獲取同步狀態(tài)時(shí)首装,同步器維護(hù)一個(gè)同步隊(duì)列,獲取狀態(tài)失敗的線程都會(huì)被加入到隊(duì)列中進(jìn)行自旋杭跪;
2.移出隊(duì)列(或停止自旋)的條件是前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn)且成功獲取了同步狀態(tài)仙逻。
3.在釋放同步狀態(tài)時(shí),同步器調(diào)用tryRelease(int arg)
方法來釋放同步狀態(tài)涧尿,然后喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)系奉。
5.2.2.3 共享式同步狀態(tài)獲取與釋放
- 共享式獲取和獨(dú)占式獲取最主要的區(qū)別在于同一時(shí)刻能夠有多個(gè)線程同時(shí)獲取到同步狀態(tài)。
- 通過調(diào)用同步器的
acquireShared(int arg)
方法可以共享式地獲取同步狀態(tài):
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
tryAcquireShared
返回int
類型姑廉,如果同步狀態(tài)獲取成功缺亮,那么返回值大于等于0,否則進(jìn)入自旋狀態(tài)桥言;成功獲取到同步狀態(tài)并退出自旋狀態(tài)的條件是當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn)萌踱,并且返回值大于等于0.
- 共享式獲取葵礼,通過調(diào)用
releaseShared(int arg)
方法釋放同步狀態(tài),tryReleaseShared
必須要確保同步狀態(tài)線程安全釋放并鸵,一般是通過循環(huán)或CAS
來保證的鸳粉,因?yàn)獒尫磐綘顟B(tài)的操作會(huì)同時(shí)來自多個(gè)線程。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
5.2.2.4 獨(dú)占式超時(shí)獲取同步狀態(tài)
- 通過調(diào)用同步器的
doAcquireNanos(int arg, long nanosTimeout)
方法可以超時(shí)獲取同步狀態(tài)能真,即在指定的時(shí)間段內(nèi)獲取同步狀態(tài)赁严。 - 在此之前,一個(gè)線程如果獲取不到鎖而被阻塞在
synchronized
之外粉铐,對(duì)該線程進(jìn)行中斷操作疼约,此時(shí)線程中斷的標(biāo)志位會(huì)被修改,但線程依舊會(huì)阻塞在synchronized
上蝙泼;如果通過acquireInterruptibly(int arg)
方法獲取程剥,如果在等待過程中被中斷,會(huì)立刻返回汤踏,并拋出InterruptedException
異常织鲸。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
//1.計(jì)算出截止時(shí)間.
final long deadline = System.nanoTime() + nanosTimeout;
//2.加入節(jié)點(diǎn)
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
//3.取出前驅(qū)節(jié)點(diǎn)
final Node p = node.predecessor();
//4.如果獲取成功則直接返回
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
//5.如果到了超時(shí)時(shí)間,則直接返回
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
//6.如果在自旋過程中被中斷溪胶,那么拋出異常返回
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
通過上面的代碼可以知道搂擦,它和獨(dú)占式獲取的區(qū)別在于未獲取到同步狀態(tài)時(shí)的處理邏輯:獨(dú)占式獲取在獲取不到是會(huì)一直自旋等待;而超時(shí)獲取則會(huì)使當(dāng)前線程等待nanosTimeout
納秒哗脖,如果當(dāng)前線程在這個(gè)時(shí)間內(nèi)沒有獲取到同步狀態(tài)瀑踢,將會(huì)從等待邏輯中自動(dòng)返回。
5.2.2.5 自定義同步組件 - TwinsLock
TwinsLock
只允許至多兩個(gè)線程同時(shí)訪問才避,超過兩個(gè)線程的訪問將會(huì)被阻塞橱夭。
public class TwinsLock implements Lock {
private final Sync sync = new Sync(2);
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
//初始值為2.
setState(count);
}
@Override
protected int tryAcquireShared(int arg) {
for(;;) {
//1.獲得當(dāng)前的狀態(tài).
int current = getState();
//2.newCount表示剩余可獲取同步狀態(tài)的線程數(shù)
int newCount = current - arg;
//3.如果小于0,那么返回獲取同步狀態(tài)失敗;否則通過CAS確保設(shè)置的正確性.
if (newCount < 0 || compareAndSetState(current, newCount)) {
//4.當(dāng)返回值大于等于0表示獲取同步狀態(tài)成功.
return newCount;
}
}
}
@Override
protected boolean tryReleaseShared(int arg) {
for (;;) {
int current = getState();
//將可獲取同步狀態(tài)的線程數(shù)加1.
int newCount = current + current;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
}
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public boolean tryLock() {
return false;
}
@Override
public boolean tryLock(long time, @NonNull TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@NonNull
@Override
public Condition newCondition() {
return null;
}
}
測(cè)試用例:
public static void createTwinsLock() {
final Lock lock = new TwinsLock();
class TwinsLockThread extends Thread {
@Override
public void run() {
Log.d(TAG, "TwinsLockThread, run=" + Thread.currentThread().getName());
while (true) {
lock.lock();
try {
Thread.sleep(1000);
Log.d(TAG, "TwinsLockThread, name=" + Thread.currentThread().getName());
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
Log.d(TAG, "TwinsLockThread, unlock=" + Thread.currentThread().getName());
lock.unlock();
}
}
}
}
for (int i = 0; i < 10; i++) {
Thread thread = new TwinsLockThread();
thread.start();
}
}
5.3 重入鎖
- 重入鎖
ReentrantLock
表示該鎖能夠支持一個(gè)線程對(duì)資源的重復(fù)加鎖。 - 如果在絕對(duì)時(shí)間上桑逝,先對(duì)鎖獲取的請(qǐng)求一定先被滿足棘劣,那么這個(gè)鎖是公平的,公平地獲取鎖楞遏,也就是等待時(shí)間最長(zhǎng)的線程最優(yōu)先地獲取鎖茬暇。
5.3.1 實(shí)現(xiàn)重進(jìn)入
重進(jìn)入需要解決兩個(gè)問題:
- 線程再次獲取鎖,鎖需要去識(shí)別獲取鎖地線程是否為當(dāng)前占據(jù)鎖的線程橱健,如果是而钞,則再次獲取成功。
- 鎖的最終釋放拘荡,線程重復(fù)
n
次獲取了鎖臼节,隨后在第n
次釋放該鎖后,其它線程能夠獲取到該鎖。
5.3.2 公平與非公平鎖的區(qū)別
- 公平與否是針對(duì)獲取鎖而言的网缝,如果一個(gè)鎖是公平的巨税,那么鎖的獲取順序就應(yīng)該符合請(qǐng)求的絕對(duì)時(shí)間順序,即
FIFO
粉臊。 - 公平鎖的區(qū)別在于加入了同步隊(duì)列中當(dāng)前節(jié)點(diǎn)是否有前驅(qū)節(jié)點(diǎn)的判斷草添,如果該方法返回
true
,表示有線程比當(dāng)前線程更早地請(qǐng)求獲取鎖扼仲,因此需要等待前驅(qū)線程獲取并釋放鎖之后才能繼續(xù)獲取鎖远寸;而對(duì)于非公平鎖,只要CAS
設(shè)置同步狀態(tài)成功即可屠凶。 - 因此驰后,公平鎖每次都是從同步隊(duì)列中的第一個(gè)節(jié)點(diǎn)獲取到鎖,而非公平鎖出現(xiàn)了一個(gè)線程連續(xù)獲取鎖的情況矗愧。
- 非公平鎖可能使線程饑餓灶芝,但其極少的線程切換,保證了更大的吞吐量唉韭。
5.4 讀寫鎖
- 之前提到的鎖都是排它鎖夜涕,這些鎖在同一時(shí)刻只允許一個(gè)線程進(jìn)行訪問,而讀寫鎖在同一時(shí)刻可以允許多個(gè)讀線程訪問属愤,但是在寫線程訪問時(shí)女器,所有的讀線程和其他寫線程均被阻塞。讀寫鎖維護(hù)了一對(duì)鎖住诸,一個(gè)讀鎖和一個(gè)寫鎖晓避,通過分離讀鎖和寫鎖,使得并發(fā)性有很大提升只壳。
- 并發(fā)包提供的讀寫鎖的實(shí)現(xiàn)是
ReentrantReadWrireLock
,它支持公平性選擇暑塑、重進(jìn)入吼句、鎖降級(jí)(寫鎖能夠降級(jí)為讀鎖)。
ReadWriteLock
僅定義了獲取讀鎖和寫鎖的兩個(gè)方法事格,即readLock
和writeLock
惕艳,而其實(shí)現(xiàn)ReentrantReadWriteLock
:
-
getReadLockCount
:返回當(dāng)前讀鎖被獲取的次數(shù)。 -
getReadHoldCount
:返回當(dāng)前線程獲取讀鎖的次數(shù)驹愚。 -
isWriteLocked
:判斷寫鎖是否被獲取远搪。 -
getWriteHoldCount
:返回當(dāng)前線程獲取寫鎖的次數(shù)。
下面是一個(gè)讀寫鎖的簡(jiǎn)單用例:
public class ReadWriteCache {
static Map<String, Object> map = new HashMap<>();
static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
static Lock r = rwl.readLock();
static Lock w = rwl.writeLock();
public static Object get(String key) {
r.lock();
try {
return map.get(key);
} finally {
r.unlock();
}
}
public static Object put(String key, Object value) {
w.lock();
try {
return map.put(key, value);
} finally {
w.unlock();
}
}
public static void clear() {
w.lock();
try {
map.clear();
} finally {
w.unlock();
}
}
}
5.4.2 讀寫鎖的實(shí)現(xiàn)分析
- 讀寫狀態(tài)的設(shè)計(jì)
讀寫鎖需要在同步狀態(tài)(一個(gè)整形變量逢捺,高16表示讀谁鳍,低16表示寫)上維護(hù)多個(gè)讀線程和一個(gè)寫線程的狀態(tài),使得該狀態(tài)的設(shè)計(jì)成為讀寫鎖實(shí)現(xiàn)的關(guān)鍵。 - 寫鎖的獲取與釋放
寫鎖是一個(gè)支持重進(jìn)入的排它鎖倘潜,如果當(dāng)前線程已經(jīng)獲取了寫鎖绷柒,則增加寫狀態(tài)。如果當(dāng)前線程在獲取寫鎖時(shí)涮因,讀鎖已經(jīng)被獲取废睦,則當(dāng)前線程進(jìn)入等待狀態(tài)。
原因在于:讀寫鎖要確保寫鎖的操作對(duì)讀鎖可見养泡,如果允許讀鎖在已經(jīng)被獲取的情況下對(duì)寫鎖的獲取嗜湃,那么正在運(yùn)行的其它讀線程就無法感知到當(dāng)前寫線程的操作。 - 讀鎖的獲取與釋放
讀鎖是一個(gè)支持重進(jìn)入的共享鎖澜掩,它能被多個(gè)線程同時(shí)獲取购披,在沒有其它寫線程訪問(或者寫狀態(tài)為0)時(shí),讀鎖總是被成功地獲取输硝,而所做的也只是(線程安全)增加讀狀態(tài)今瀑。 - 鎖降級(jí)
鎖降級(jí)是指把持住(當(dāng)前擁有的)寫鎖点把,再獲取到讀鎖橘荠,隨后釋放(先前擁有的)寫鎖的過程。
5.6 Condition
接口
Condition
定義了等待/通知兩種類型的方法郎逃,當(dāng)前線程調(diào)用這些方法時(shí)哥童,需要提前獲取到Condition
對(duì)象關(guān)聯(lián)的鎖,Condition
是依賴Lock
對(duì)象的褒翰。
當(dāng)調(diào)用await()
方法后贮懈,當(dāng)前線程會(huì)釋放鎖并在此等待,而其他線程調(diào)用Condition
對(duì)象的signal
方法优训,通知當(dāng)前線程后朵你,當(dāng)前線程才從await
方法返回,并且在返回前已經(jīng)獲取了鎖揣非。
獲取一個(gè)Condition
必須通過Lock
的newCondition
方法抡医,下面是一個(gè)有界隊(duì)列的示例:
public class BoundedQueue<T> {
private Object[] items;
private int addIndex, removeIndex, count;
private Lock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
public BoundedQueue(int size) {
items = new Object[size];
}
public void add(T t) throws InterruptedException {
lock.lock();
try {
while (count == items.length) { //如果當(dāng)前隊(duì)列內(nèi)的個(gè)數(shù)等于最大長(zhǎng)度,那么釋放鎖.
notFull.await();
}
if (++addIndex == items.length) { //如果已經(jīng)到了尾部,那么從頭開始.
addIndex = 0;
}
++count;
notEmpty.signal(); //通知阻塞在"空"條件上的線程.
} finally {
lock.unlock();
}
}
public T remove() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await(); //如果當(dāng)前隊(duì)列的個(gè)數(shù)等于0,那么釋放鎖.
}
Object x = items[removeIndex];
if (++removeIndex == items.length) {
removeIndex = 0;
}
--count;
notFull.signal(); //通知阻塞在"滿"條件上的線程.
return (T) x;
} finally {
lock.unlock();
}
}
}
Condition
的方法:
await()
:當(dāng)前線程進(jìn)入等待狀態(tài)直到被通知signal
或中斷,當(dāng)前線程進(jìn)入運(yùn)行狀態(tài)且從await
返回的情況:其他線程調(diào)用該
Condition
的signal
或signalAll
方法早敬。其它線程中斷當(dāng)前線程(
interrupt
)忌傻。如果當(dāng)前等待線程從
await
方法返回,那么表明當(dāng)前線程已經(jīng)獲取了Condition
對(duì)象所對(duì)應(yīng)的鎖搞监。awaitUninerruptibly
:對(duì)中斷不敏感long await Nanos(long)
:加入了超時(shí)的判斷水孩,返回值是(nanosTimeout
- 實(shí)際耗時(shí)),如果返回值是0或者負(fù)數(shù)琐驴,那么可以認(rèn)定為超時(shí)俘种。boolean awaitUntil(Data)
:直到某個(gè)固定時(shí)間秤标。signal
:?jiǎn)拘岩粋€(gè)等待在Condition
上的線程。signalAll
:?jiǎn)拘阉械却?code>Condition上的線程安疗。
5.6.2 Condition
的實(shí)現(xiàn)
ConditionObject
是AbstractQueuedSynchronizer
的內(nèi)部類抛杨,每個(gè)Condition
對(duì)象都包含著一個(gè)隊(duì)列。
1.等待隊(duì)列
在隊(duì)列中的每個(gè)節(jié)點(diǎn)都包含了一個(gè)線程的引用荐类,該線程就是在Condition
對(duì)象上等待的線程怖现,同步隊(duì)列和等待隊(duì)列中節(jié)點(diǎn)的類型都是同步器的靜態(tài)內(nèi)部類AbstractQueuedSynchronizer.Node
。
由于Condition
的實(shí)現(xiàn)是同步器的內(nèi)部類玉罐,因此每個(gè)Condition
實(shí)例都能夠訪問同步器提供的方法屈嗤,相當(dāng)于每個(gè)Condition
都擁有所屬同步器的引用。
當(dāng)調(diào)用await
方法時(shí)吊输,將會(huì)以當(dāng)前線程構(gòu)造節(jié)點(diǎn)饶号,并將節(jié)點(diǎn)從尾部加入到等待隊(duì)列,也就是將同步隊(duì)列移動(dòng)到Condition
隊(duì)列當(dāng)中季蚂。
2.等待
調(diào)用該方法的前提是當(dāng)前線程必須獲取了鎖茫船,也就是同步隊(duì)列中的首節(jié)點(diǎn),它不是直接加入到等待隊(duì)列當(dāng)中扭屁,而是通過addConditionWaiter()
方法把當(dāng)前線程構(gòu)造成一個(gè)新的節(jié)點(diǎn)并將其加入到等待隊(duì)列當(dāng)中算谈。
3.通知
調(diào)用該方法的前提是當(dāng)前線程必須獲取了鎖,接著獲取等待隊(duì)列的首節(jié)點(diǎn)料滥,將其移動(dòng)到同步隊(duì)列并使用LockSupport
喚醒節(jié)點(diǎn)中的線程然眼。
被喚醒的線程,將從await
方法中的while
中返回葵腹,進(jìn)而調(diào)用同步器的acquireQueued
方法加入到獲取同步狀態(tài)的競(jìng)爭(zhēng)中高每。
Condition
的signalAll
方法,相當(dāng)于對(duì)等待隊(duì)列中的每個(gè)節(jié)點(diǎn)均執(zhí)行一次signal
方法践宴,效果就是將等待隊(duì)列中所有節(jié)點(diǎn)全部移動(dòng)到同步隊(duì)列中鲸匿,并喚醒每個(gè)節(jié)點(diǎn)。
六阻肩、Java
并發(fā)容器和框架
6.1 ConcurrentHashMap
ConcurrentHashMap
是線程安全并且高效的HashMap
晒骇,其它的類似容器有以下缺點(diǎn):
-
HashMap
在并發(fā)執(zhí)行put
操作時(shí),會(huì)導(dǎo)致Entry
鏈表形成環(huán)形數(shù)據(jù)結(jié)構(gòu)磺浙,就會(huì)產(chǎn)生死循環(huán)獲取Entry
。 -
HashTable
使用synchronized
來保證線程安全徒坡,但在線程競(jìng)爭(zhēng)激烈的情況下HashTable
的效率非常低下撕氧。
ConcurrentHashMap
高效的原因在于它采用鎖分段技術(shù),首先將數(shù)據(jù)分成一段一段地存儲(chǔ)喇完,然后給每段數(shù)據(jù)配一把鎖伦泥,當(dāng)一個(gè)線程占用鎖并且訪問一段數(shù)據(jù)的時(shí)候,其他段的數(shù)據(jù)也能被其他線程訪問。
6.1.2 ConcurrentHashMap
的結(jié)構(gòu)
ConcurrentHashMap
是由Segment
數(shù)組結(jié)構(gòu)和HashEntry
數(shù)組結(jié)構(gòu)組成:
-
Segment
是一種可重入鎖不脯,在ConcurrentHashMap
里面扮演鎖的角色府怯; -
HashEntry
則用于存儲(chǔ)鍵值對(duì)數(shù)據(jù)。
一個(gè)ConcurrentHashMap
里包含一個(gè)Segment
數(shù)組防楷,它的結(jié)構(gòu)和HashMap
類似牺丙,是一種數(shù)組和鏈表結(jié)構(gòu)。
一個(gè)Segment
里包含一個(gè)HashEntry
數(shù)組复局,每個(gè)HashEntry
是一個(gè)鏈表結(jié)構(gòu)的元素冲簿,每個(gè)Segment
守護(hù)著一個(gè)HashEntry
里的元素,當(dāng)對(duì)HashEntry
數(shù)組的數(shù)據(jù)進(jìn)行修改時(shí)亿昏,必須首先獲得與它對(duì)應(yīng)的Segment
鎖峦剔。
6.1.5 ConcurrentHashMap
的操作
get
get
的高效在于整個(gè)get
過程中不需要加鎖,除非讀到的值是空才會(huì)加鎖重讀角钩。原因是它的get
方法將要使用的共享變量都設(shè)為volatile
吝沫,能夠在線程間保持可見性,能夠被多線程同時(shí)讀递礼,并且不會(huì)讀到過期的值惨险,例如用于統(tǒng)計(jì)當(dāng)前Segment
大小的count
字段和用于存儲(chǔ)值的HashEntry
的value
。
put
put
方法里需要對(duì)共享變量進(jìn)行寫入操作宰衙,所以為了線程安全平道,在操作共享變量之前必須加鎖,put
首先定位到Segment
供炼,然后在Segment
里進(jìn)行插入操作一屋。
size
先嘗試2次通過不鎖住Segment
的方式來統(tǒng)計(jì)各個(gè)Segment
的大小,如果統(tǒng)計(jì)的過程中袋哼,容器的count
發(fā)生了變化冀墨,則再用加鎖的方式來統(tǒng)計(jì)所有Segment
的大小。
6.2 ConcurrentLinkedQueue
ConcurrentLinkedQueue
是一個(gè)基于鏈接節(jié)點(diǎn)的無界線程安全隊(duì)列涛贯,它采用先進(jìn)先出的規(guī)則對(duì)節(jié)點(diǎn)進(jìn)行排序诽嘉,它采用CAS
算法來實(shí)現(xiàn)。
6.2.1 入隊(duì)列
入隊(duì)主要做兩件事情:
- 將入隊(duì)節(jié)點(diǎn)設(shè)置成當(dāng)前隊(duì)列尾節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)弟翘。
- 更新
tail
節(jié)點(diǎn)虫腋,如果tail
節(jié)點(diǎn)的next
節(jié)點(diǎn)不為空,則將入隊(duì)節(jié)點(diǎn)設(shè)置成tail
節(jié)點(diǎn)稀余;如果tail
節(jié)點(diǎn)的next
節(jié)點(diǎn)為空悦冀,則將入隊(duì)節(jié)點(diǎn)設(shè)置成tail
的next
節(jié)點(diǎn)。
在多線程情況下睛琳,如果有一個(gè)線程正在入隊(duì)盒蟆,那么它必須先獲取尾節(jié)點(diǎn)踏烙,然后設(shè)置尾節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)為入隊(duì)節(jié)點(diǎn),但這時(shí)可能有另外一個(gè)線程插隊(duì)了历等,那么隊(duì)列的尾節(jié)點(diǎn)就會(huì)發(fā)生變化讨惩,這時(shí)第一個(gè)線程要暫停入隊(duì)操作,然后重新獲取尾節(jié)點(diǎn)寒屯。
整個(gè)入隊(duì)操作主要做兩件事:
- 定位出尾節(jié)點(diǎn)荐捻。
- 使用
CAS
算法將入隊(duì)節(jié)點(diǎn)設(shè)置成尾節(jié)點(diǎn)的next
節(jié)點(diǎn),如不成功則重試浩螺。
6.3 阻塞隊(duì)列
6.3.1 阻塞隊(duì)列
阻塞隊(duì)列是一個(gè)支持兩個(gè)附加操作的隊(duì)列靴患,這兩個(gè)附加的操作支持阻塞的插入和移除方法:
- 當(dāng)隊(duì)列滿時(shí),隊(duì)列會(huì)阻塞插入元素的線程要出,直到隊(duì)列不滿鸳君。
- 當(dāng)隊(duì)列空時(shí),獲取元素的線程會(huì)等待隊(duì)列為空患蹂。
在阻塞隊(duì)列不可用時(shí)或颊,附加操作提供了4種處理方式:拋出異常、返回特殊值传于、一直阻塞囱挑、超時(shí)退出。每種方式通過調(diào)用不同的方法來實(shí)現(xiàn)沼溜。
Java
里面提供了7種阻塞隊(duì)列平挑。
6.4 Fork/Join
框架
用于并行執(zhí)行任務(wù)的框架,是把一個(gè)大任務(wù)分割成若干個(gè)小任務(wù)系草,最終匯總每個(gè)小任務(wù)結(jié)果后得到大人物結(jié)果的框架通熄。
Fork/Join
使用兩個(gè)類來完成事情:
-
ForkJoinTask
:它提供了fork()
和join()
操作的機(jī)制,通常情況下找都,我們繼承它的子類:有返回結(jié)果的RecursiveTask
和沒有返回結(jié)果的RecursiveAction
唇辨。 -
ForkJoinPool
:ForkJoinTask
需要通過ForkJoinPool
來添加。
ForkJoinTask
在執(zhí)行的時(shí)候可能會(huì)拋出異常能耻,但是我們沒有辦法在主線程里直接捕獲異常赏枚,所以ForkJoinTask
提供了isCompletedAbnormally()
方法來檢查任務(wù)是否已經(jīng)拋出異常或已經(jīng)取消了晓猛。
ForkJoinPool
由ForkJoinTask
數(shù)組和ForkJoinWorkerThread
數(shù)組組成饿幅,ForkJoinTask
數(shù)組負(fù)責(zé)將存放程序提交給ForkJoinPool
的任務(wù),而ForkJoinWorkerThread
數(shù)組負(fù)責(zé)執(zhí)行這些任務(wù)戒职。
七栗恩、Java
中的13個(gè)原子操作類
Atomic
包里提供了:原子更新基本類型、原子更新數(shù)組帕涌、原子更新引用和原子更新屬性摄凡。
7.1 原子更新基本類型:
AtomicBoolean
AtomicInteger
AtomicLong
基本方法:
-
int addAndGet(int delta)
:以原子方式將輸入的值與當(dāng)前的值相加,并返回結(jié)果蚓曼。 -
boolean compareAndSet(int expect, int update)
:如果當(dāng)前的數(shù)值等于預(yù)期值亲澡,則以原子方式將該值設(shè)置為輸入的值。 -
int getAndIncrement()
:以原子方式加1纫版,并返回自增前的值床绪。 -
void lazySet(int newValue)
:最終會(huì)設(shè)置成newValue
,可能會(huì)導(dǎo)致其他線程在之后的一小段時(shí)間內(nèi)還是讀到舊值其弊。 -
int getAndSet(int newValue)
:以原子方式設(shè)置為newValue
的值癞己,并返回舊值。
7.2 原子更新引用類型
AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
基本方法:
-
int addAndGet(int i, int delta)
:以原子方式將輸入值和索引i
的元素相加梭伐。 -
boolean compareAndSet(int i, int expect, int update)
:如果當(dāng)前值等于預(yù)期值痹雅,則以原子方式將數(shù)組位置i
的元素設(shè)置成update
值。
7.3 原子更新引用類型
用于原子更新多個(gè)變量糊识,提供了3種類型:
-
AtomicReference
:原子更新引用類型绩社。 -
AtomicReferenceFieldUpdater
:原子更新引用類型里的字段。 -
AtomicMarkableReference
:原子更新帶有標(biāo)記位的引用類型赂苗。
7.4 原子更新字段類
-
AtomicIntegerFieldUpdater
:原子更新整形的字段的更新器愉耙。 -
AtomicLongFieldUpdater
:原子更新長(zhǎng)整形字段的更新器舌镶。 -
AtomicStampedReference
:原子更新帶有版本號(hào)的引用類型眶俩。
原子地更新字段需要兩步:
- 因?yàn)樵痈伦侄晤惗际浅橄箢悾看问褂玫臅r(shí)候必須使用靜態(tài)方法
newUpdater
創(chuàng)建一個(gè)更新器授艰,并且需要設(shè)置想要更新的類和屬性败砂。 - 更新類的字段必須使用
public volatile
來修飾赌渣。
八、Java
中的并發(fā)工具類
九吠卷、Java
中的線程池
線程池的優(yōu)點(diǎn):降低資源消耗锡垄,提高響應(yīng)速度,提高線程的可管理性祭隔。
9.1 線程池的實(shí)現(xiàn)原理
線程池的處理流程如下:
- 判斷核心線程池是否已滿货岭,如果不是,則創(chuàng)建一個(gè)新的工作線程來執(zhí)行任務(wù)疾渴;如果已滿千贯,則進(jìn)入下個(gè)流程。
- 判斷工作隊(duì)列是否已滿搞坝,如果不是搔谴,則將提交的任務(wù)存儲(chǔ)在工作隊(duì)列里;如果已滿桩撮,則進(jìn)入下個(gè)流程敦第。
- 判斷線程池的線程是否都處于工作狀態(tài)峰弹,如果沒有,則創(chuàng)建一個(gè)新的工作線程芜果;如果已滿鞠呈,則交給飽和策略來處理。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //1.添加進(jìn)入核心線程.
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //2.添加進(jìn)入隊(duì)列.
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) //3.添加進(jìn)入非核心線程.
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
在以上的三步中右钾,除了加入隊(duì)列不用獲取全局鎖以外蚁吝,其它兩種情況都需要獲取,為了盡可能地避免獲取全局鎖舀射,在ThreadPoolExecutor
完成預(yù)熱之后(當(dāng)前運(yùn)行的線程數(shù)大于corePoolSize
)窘茁,幾乎所有的execute
方法調(diào)用都是加入到隊(duì)列當(dāng)中。
9.2 線程池的使用
9.2.1 線程池的創(chuàng)建
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
-
corePoolSize
:當(dāng)提交一個(gè)任務(wù)到線程池時(shí)脆烟,線程池會(huì)創(chuàng)建一個(gè)線程來執(zhí)行任務(wù)山林,即使其它空閑的基本線程能夠執(zhí)行新任務(wù)也會(huì)創(chuàng)建。 -
runnableTaskQueue
:用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列浩淘,可以選擇: -
ArrayBlockingQueue
:基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列捌朴。 -
LinkedBlockingQueue
:基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,吞吐量高于前者张抄。 -
SynchronousQueue
:不存儲(chǔ)元素的阻塞隊(duì)列砂蔽,每個(gè)插入操作必須等待另一個(gè)線程調(diào)用了移除操作,靜態(tài)工廠方法Executors.newCachedThreadPool
使用了這個(gè)隊(duì)列署惯。 -
PriorityBlockingQueue
:一個(gè)具有優(yōu)先級(jí)的無限阻塞隊(duì)列左驾。 -
maxPoolSize
:允許創(chuàng)建的最大線程數(shù)。 -
ThreadFactory
:用于設(shè)置創(chuàng)建線程的工廠极谊。 -
RejectExecutionHandler
:飽和策略诡右。 -
keepAliveTime
:線程池的工作線程空閑后,保持存活的時(shí)間轻猖。 -
TimeUnit
:線程保持活動(dòng)的單位帆吻。
9.2.2 向線程池提交任務(wù)
-
execute(Runnable runnable)
:提交不需要返回值的任務(wù)。 -
Future<Object> future = executor.submit(haveReturnValuetask)
:用于提交需要返回值的任務(wù)咙边,線程池會(huì)返回一個(gè)future
類型任務(wù)猜煮,可以用它來判斷任務(wù)是否執(zhí)行成功,并且可以通過get
方法來獲取返回值败许,get
方法會(huì)阻塞當(dāng)前線程直到任務(wù)完成王带。
9.2.3 關(guān)閉線程池
-
shutdownNow
:首先將線程池的狀態(tài)設(shè)為STOP
,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程市殷,并返回等待執(zhí)行任務(wù)的列表愕撰。 -
shutdown
:將線程池的狀態(tài)置為SHUTDOWN
,然后中斷所有沒有正在執(zhí)行任務(wù)的線程。
十搞挣、Executor
框架
(1)在上層带迟,Java
多線程程序通常把應(yīng)用分解為若干個(gè)任務(wù),然后使用用戶級(jí)的調(diào)度器(Executor
框架)將這些任務(wù)映射為固定數(shù)量的線程囱桨。
(2)在HotSpot VM
的線程模型中邮旷,Java
線程再被一對(duì)一映射為本地操作系統(tǒng)線程,Java
線程啟動(dòng)時(shí)會(huì)創(chuàng)建一個(gè)本地操作系統(tǒng)線程蝇摸,當(dāng)該線程終止時(shí),這個(gè)操作系統(tǒng)線程也會(huì)被回收办陷。
(3)操作系統(tǒng)會(huì)調(diào)度所有線程并將它們分配給可用的CPU
貌夕。
Executor
框架
由三個(gè)部分組成:
- 任務(wù),即
Runnable
接口或Callable
接口民镜。 - 任務(wù)的執(zhí)行啡专,包括核心接口
Executor
,以及繼承自Executor
的ExecutorService
制圈,還有它的兩個(gè)關(guān)鍵類ThreadPoolExecutor
(用來執(zhí)行任務(wù))和ScheduledThreadPoolExecutor
(可以在給定的延遲后運(yùn)行命令们童,或者定期執(zhí)行命令)。 - 異步計(jì)算的結(jié)果鲸鹦,包括接口
Future
和實(shí)現(xiàn)類FutureTask
慧库。
10.2 ThreadPoolExecutor
詳解
通過工具類Executors
,可以創(chuàng)建以下三種類型的ThreadPoolExecutor
馋嗜,調(diào)用靜態(tài)創(chuàng)建方法之后齐板,會(huì)返回ExecutorService
-
FixedThreadPool
可重用固定線程數(shù)的線程池;如果當(dāng)前運(yùn)行的線程數(shù)少于corePoolSize
葛菇,則創(chuàng)建新線程來執(zhí)行任務(wù)甘磨;如果等于corePoolSize
,將任務(wù)加入到無界隊(duì)列LinkedBlockingQueue
當(dāng)中眯停;多余的空閑線程將會(huì)被立即終止济舆。 -
SingleThreadPool
單個(gè)woker
線程的executor
;corePoolSize
和maximumPoolSize
為1莺债;采用無界隊(duì)列作為工作隊(duì)列滋觉。 -
CacheThreadPool
采用沒有容量的SynchronousQueue
作為線程池的工作隊(duì)列,其corePoolSize
為0九府,maximumPool
是無界的椎瘟;其中的空閑線程最多等待60s。
如果主線程提交任務(wù)的速度高于maximumPool
中線程處理任務(wù)的速度時(shí)侄旬,CacheThreadPool
會(huì)不斷創(chuàng)建新線程肺蔚,極端情況下,CacheThreadPool
會(huì)因?yàn)閯?chuàng)建過多線程而耗盡CPU
資源儡羔。
10.3 ScheduledThreadPoolExecutor
詳解
用來在給定的延遲之后執(zhí)行任務(wù)宣羊,或者定期執(zhí)行任務(wù)璧诵,并且可以在指定的構(gòu)造函數(shù)中指定多個(gè)對(duì)應(yīng)的后臺(tái)線程數(shù)。
它采用DelayQueue
這個(gè)無界隊(duì)列作為工作隊(duì)列仇冯,其執(zhí)行分為兩個(gè)部分:
- 當(dāng)調(diào)用
ScheduledThreadPoolExecutor
的scheduleAtFixedRate()
或者scheduleWithFIxedDelay
之宿,它會(huì)向DelayQueue
中添加ScheduledFutureTask
。 - 線程池中的線程從
DelayQueue
中獲取ScheduledFutureTask
苛坚。