1 volatile 的工作原理
眾所周知律歼,在如今的計算機時代粱挡,CPU的運算處理速度與內存讀寫速度的差異非常巨大,為了解決這種差異充分利用CPU的使用效率唤衫,于是在CPU 和內存之間增加 CPU Cache(高速緩存)奄喂,將運算需要的數(shù)據(jù)先復制到CPU Cache中之剧,在進行運算時CPU不再和內存打交道,而是直接對CPU Cache 進行讀寫砍聊。除此之外還可以減少 CPU 與 I/O 設備爭搶訪存背稼,由于 CPU 和 I/O 設備會競爭同一條內存總線,有可能出現(xiàn) CPU 等待 I/O 設備訪存的情況玻蝌。而如果 CPU 能直接從緩存中獲取數(shù)據(jù)蟹肘,就可以減少競爭词疼,提高 CPU 的使用率。
1.1 緩存一致性問題
CPU Cache 會引起緩存一致性問題帘腹, 在分析緩存一致性問題時贰盗,忽略 L1 / L2 / L3 的多級緩存結構,提出緩存一致性抽象模型:
在單核 CPU 中阳欲,只需要考慮 Cache 與內存的一致性舵盈。但是在多核 CPU 中,由于每個核心都有一份獨占的 Cache球化,就會存在一個核心修改Cache 數(shù)據(jù)后秽晚,兩個核心 Cache 數(shù)據(jù)不一致的問題。因此CPU 的緩存一致性問題應該從 2 個維度考慮:
Cache 與內存的一致性問題: 在修改 Cache 數(shù)據(jù)后筒愚,如何同步回內存赴蝇?
-
多核心 Cache 的一致性問題: 在一個核心修改 Cache 數(shù)據(jù)后,如何同步給其他核心 Cache巢掺?
1.1.1 Cache 與內存的一致性問題
1> 寫直達策略(Write-Through)
寫直達策略的讀取過程:
1句伶、CPU 在訪問內存地址時,會先檢查該地址的數(shù)據(jù)是否已經(jīng)加載到 Cache 中陆淀;
2考余、如果數(shù)據(jù)在 Cache 中,則直接讀取 Cache 上的數(shù)據(jù)到 CPU 中轧苫;
-
3楚堤、如果數(shù)據(jù)不在 Cache 中:
如果 Cache 已裝滿或者 Cache Line 被占用,先執(zhí)行替換策略浸剩,騰出空閑位置钾军;
訪問內存地址鳄袍,并將內存地址所處的整個Cache LIne 寫入到映射的 Cache Line 中绢要;
讀取 Cache Line上的數(shù)據(jù)到 CPU 中。
寫直達策略的寫入過程:
1拗小、如果數(shù)據(jù)不在 Cache 中重罪,則直接將數(shù)據(jù)寫入內存;
2哀九、如果數(shù)據(jù)已經(jīng)加載到 Cache 中剿配,則不僅要將數(shù)據(jù)寫入 Cache,還要將數(shù)據(jù)寫入內存阅束。
寫直達的優(yōu)點和缺點:
優(yōu)點: 每次讀取操作就是純粹的讀取呼胚,不涉及對內存的寫入操作,讀取速度更快息裸;
-
缺點: 每次寫入操作都需要同時寫入 Cache 和寫入內存蝇更,在寫入操作上失去了 CPU 高速緩存的價值沪编,需要花費更多時間。
2> 寫回策略(Write Back)
既然寫直達策略在每次寫入操作都會寫內存年扩,那么有沒有什么辦法可以減少寫回內存的次數(shù)呢蚁廓?這就是寫回策略:
1、寫回策略會在每個 Cache Line上增加一個 “臟(Dirty)” 標記位 厨幻,當一個 Cache Line 被標記為臟時相嵌,說明它的數(shù)據(jù)與內存數(shù)據(jù)是不一致的;
2况脆、在寫入操作時饭宾,我們只需要修改 Cache Line 并將其標記為臟,而不需要寫入內存漠另;
-
3捏雌、那么,什么時候才將臟數(shù)據(jù)寫回內存呢笆搓?—— 就發(fā)生在 Cache 塊被替換出去的時候:
在寫入操作中性湿,如果目標內存塊不在 Cache 中,需要先將內存塊數(shù)據(jù)讀取到 Cache 中满败。如果替換策略換出的舊 Cache Line 是臟的肤频,就會觸發(fā)一次寫回內存操作;
在讀取操作中算墨,如果目標內存塊不在 Cache 中宵荒,且替換策略換出的舊 Cache 塊是臟的,就會觸發(fā)一次寫回內存操作净嘀;
可以看到报咳,寫回策略只有當一個 Cache 數(shù)據(jù)將被替換出去時判斷數(shù)據(jù)的狀態(tài),清(未修改過挖藏,數(shù)據(jù)與內存一致) 的 Cache 塊不需要寫回內存暑刃,臟 的 Cache 塊才需要寫回內存。這個策略能夠減少寫回內存的次數(shù)膜眠,性能會比寫直達更高岩臣。
當然,寫回策略在讀取的時候宵膨,有可能不是純粹的讀取了架谎,因為還可能會觸發(fā)一次臟 Cache Line 的寫入。
通過寫直達或寫回策略辟躏,我們已經(jīng)能夠解決 在修改 Cache 數(shù)據(jù)后谷扣,如何同步回內存 的問題
1.1.2 多核心 Cache 的一致性問題
在單核 CPU 中,我們通過寫直達策略或寫回策略保持了Cache 與內存的一致性捎琐。但是在多核 CPU 中会涎,由于每個核心都有一份獨占的 Cache涯曲,就會存在一個核心修改數(shù)據(jù)后,兩個核心 Cache 不一致的問題在塔。
舉個例子:
1幻件、Core 1 和 Core 2 讀取了同一個內存塊的數(shù)據(jù),在兩個 Core 都緩存了一份內存塊的副本蛔溃。此時Cache 和內存塊是一致的绰沥;
-
2、Core 1 執(zhí)行內存寫入操作:
在寫直達策略中贺待,新數(shù)據(jù)會直接寫回內存徽曲,此時,Cache 和內存塊一致麸塞。但由于之前 Core 2 已經(jīng)讀過這塊數(shù)據(jù)秃臣,所以 Core 2 緩存的數(shù)據(jù)還是舊的。此時哪工,Core 1 和 Core 2 不一致奥此;
在寫回策略中,新數(shù)據(jù)會延遲寫回內存雁比,此時 Cache 和內存塊不一致稚虎。不管 Core 2 之前有沒有讀過這塊數(shù)據(jù),Core 2 的數(shù)據(jù)都是舊的偎捎。此時蠢终,Core 1 和 Core 2 不一致。
-
3茴她、由于 Core 2 無法感知到 Core 1 的寫入操作寻拂,如果繼續(xù)使用過時的數(shù)據(jù),就會出現(xiàn)邏輯問題丈牢。
可以看到:由于兩個核心的工作是獨立的祭钉,在一個核心上的修改行為不會被其它核心感知到,所以不管 CPU 使用寫直達策略還是寫回策略赡麦,都會出現(xiàn)緩存不一致問題朴皆。 所以帕识,我們需要一種機制泛粹,將多個核心的工作聯(lián)合起來,共同保證多個核心下的 Cache 一致性肮疗,這就是緩存一致性機制晶姊。
1.1.3 寫傳播 & 事務串行化
緩存一致性機制必須解決的如下兩個問題:
-
寫傳播(Write Propagation): 一個CPU核心對Cache中的值進行了修改,需要傳播其他CPU核心伪货,也就是需要用到寫更新或者寫無效策略们衙。
當某個 CPU 核心的Cache中執(zhí)行寫操作時钾怔,其他 CPU 核心中對應的Cache Line 的更新策略也有兩種
寫更新(Write Update):一個CPU核心對Cache中的值進行修改時,該 CPU 核心都必須先發(fā)起一次總線請求蒙挑,通知其他 CPU 核心將它們的CPU Cache 值更新為剛寫入的值宗侦,所以寫更新會很占用總線帶寬。
寫無效(Write Invalidate) : 一個CPU核心對Cache中的值進行修改時忆蚀,該 CPU 核心都必須先發(fā)起一次總線請求矾利,通知其他 CPU 核心將它們的CPU Cache 設置為無效。MESI協(xié)議用的就是這個策略馋袜,也是絕大多數(shù) CPU 都會采用緩存一致性協(xié)議男旗。這是因為多次寫操作只需要發(fā)起一次總線事件即可,第一次寫已經(jīng)將其他緩存的值置為無效欣鳖,之后的寫不必再更新狀態(tài)察皇,這樣可以有效地節(jié)省 CPU 核間總線帶寬。
事務串行化(Transaction Serialization): 各個 CPU 核心所有寫入操作的順序泽台,在所有 CPU 核心看起來是一致什荣。
寫傳播解決了 感知 問題,如果一個核心修改了數(shù)據(jù)怀酷,就需要同步給其它核心溃睹。但只做到同步還不夠,如果各個核心收到的同步信號順序不一致胰坟,那最終的同步結果也會不一致因篇。
舉個例子:假如 CPU 有 4 個核心,Core 2 將共享數(shù)據(jù)修改為 1000笔横,隨后 Core 1 將共享數(shù)據(jù)修改為 2000竞滓。在寫傳播下,“修改為 1000” 和 “修改為 2000” 兩個事務會同步到 Core 3 和 Core 4吹缔。但是如果沒有事務串行化商佑,不同核心收到的事務順序可能是不同的,最終數(shù)據(jù)還是不一致厢塘。
1.1.4 總線嗅探 & 總線仲裁
寫傳播和事務串行化在 CPU 中是如何實現(xiàn)的呢茶没?
寫傳播 - 總線嗅探(bus snooping): 本質上就是進行Cache讀寫操作時發(fā)送到總線請求,然后讓各個核心去嗅探這些請求晚碾,再根據(jù)本地的情況進行響應抓半;
事務串行化 - 總線仲裁: 總線的獨占性要求同一時刻最多只有一個模塊占用總線,天然地會將所有核心對內存的讀寫操作串行化格嘁。如果多個核心同時發(fā)起總線事務笛求,此時總線仲裁單元會對競爭做出仲裁,未獲勝的事務只能等待獲勝的事務處理完成后才能執(zhí)行。
基于總線嗅探和總線仲裁探入,現(xiàn)代 CPU 逐漸形成了各種緩存一致性協(xié)議狡孔,例如 MESI 協(xié)議。
MESI 協(xié)議使用的是寫回策略和寫無效策略蜂嗽,MESI 協(xié)議通過Cache Line 的四種狀態(tài)非常有效地降低了 CPU 核間帶寬
1.1.5 MESI 協(xié)議
CPU Cache 是由很多個 Cache Line 組成的苗膝,CPU Line 是 CPU 從內存讀取數(shù)據(jù)的基本單位,Cache Line 大小通常是64字節(jié)植旧。
CPU對Cache的請求:
PrRd: CPU核心請求讀一個 Cache Line
PrWr: CPU核心請求寫一個 Cache Line
總線對Cache的請求:
BusRd: 其他CPU核心請求讀一個 Cache Line
BusRdX: 其他CPU核心請求寫一個該CPU核心不擁有的緩存塊
BusUpgr: 其他CPU核心請求寫一個該CPU核心擁有的緩存塊
Flush: 請求回寫整個Cache Line到內存
FlushOpt: 整個Cache Line被發(fā)到總線以發(fā)送給另外一個CPU核心(緩存到緩存的復制)
初始狀態(tài) | 操作 | 響應 |
---|---|---|
Shared(S) | PrRd | - 無總線事務生成 - 狀態(tài)保持不變 - 讀操作為緩存命中 |
Shared(S) | PrWr | - 發(fā)出總線事務BusUpgr信號 - 狀態(tài)轉換為(M)Modified - 其他緩存看到BusUpgr總線信號荚醒,標記其副本為(I)Invalid. |
Modified(M) | PrRd | - 無總線事務生成 - 狀態(tài)保持不變 - 讀操作為緩存命中 |
Modified(M) | PrWr | - 無總線事務生成 - 狀態(tài)保持不變 - 寫操作為緩存命中 |
Invalid(I) | PrRd | - 發(fā)出總線事務BusRd信號 - 其他CPU核心看到BusRd,檢查自己是否有有效的數(shù)據(jù)副本隆嗅,通知發(fā)出請求的緩存 - 狀態(tài)轉換為(S)Shared, 如果其他緩存有有效的副本 - 狀態(tài)轉換為(E)Exclusive, 如果其他緩存都沒有有效的副本 - 如果其他緩存有有效的副本, 其中一個緩存發(fā)出數(shù)據(jù)(FlushOpt)界阁;否則從主存獲得數(shù)據(jù) |
Invalid(I) | PrWr | - 發(fā)出總線事務BusRdX信號 - 狀態(tài)轉換為(M)Modified - 如果其他緩存有有效的副本, 其中一個緩存發(fā)出數(shù)據(jù)(FlushOpt);否則從主存獲得數(shù)據(jù) - 如果其他緩存有有效的副本, 見到BusRdX信號后無效其副本 - 向Cache Line中寫入修改后的值 |
Exclusive(E) | PrRd | - 無總線事務生成 - 狀態(tài)保持不變 - 讀操作為緩存命中 |
Exclusive(E) | PrWr | - 無總線事務生成 - 狀態(tài)轉換為(M)Modified - 向Cache Line中寫入修改后的值 |
緩存一致性協(xié)議定義了Cache Line的4個狀態(tài):獨占(exclusive)胖喳、共享(share)泡躯、修改(modified)、失效(invalid)丽焊。
M(Modified较剃,修改): 表明 Cache Line 被修改過,與內存中不一致技健,只有本地一個拷貝(專有)写穴,并且其它核心的同一個 Cache Line 會失效;
E(Exclusive雌贱,獨占): 表明 Cache LIne 只有本地一個拷貝(專有)啊送;
S(Share,共享): 表明 Cache Line 不僅有本地一個拷貝并且其它核心也存在其拷貝欣孤;
I(Invalid馋没,失效): 未從內存加載數(shù)據(jù)或者已失效;
在 獨占 和 共享 狀態(tài)下降传,Cache Line的數(shù)據(jù)是干凈的篷朵,任何讀取操作可以直接使用該Cache Line的數(shù)據(jù);
在 失效 和 修改 狀態(tài)下婆排,Cache Line 的數(shù)據(jù)是臟的声旺,其數(shù)據(jù)和內存中的可能不一致,在讀取或寫入 失效 Cache Line 時段只,需要先將其它核心 修改 的Cache Line寫回內存腮猖,再從內存讀取翼悴;
在 共享 和 失效 狀態(tài)缚够,核心沒有獲得 Cache Line 的獨占權(鎖)。在修改數(shù)據(jù)時不能直接修改鹦赎,而是要先向總線發(fā)起 RFO(Request For Ownership)請求 谍椅,將其它核心的 Cache 置為 失效,等到獲得回應 ACK 后才算獲得 Cache Line 的獨占權古话。
這個獨占權這有點類似于開發(fā)語言層面的鎖概念雏吭,在修改資源之前,需要先獲取資源的鎖陪踩;
在 修改 和 獨占 狀態(tài)下杖们,核心已經(jīng)獲得了 Cache Line 的獨占權(鎖)。在修改數(shù)據(jù)時不需要向總線發(fā)送RFO 請求肩狂,能夠減輕總線的通信壓力摘完。
MESI 協(xié)議有一個非常 nice 的在線體驗網(wǎng)站,你可以對照文章內容傻谁,在網(wǎng)站上操作指令區(qū)孝治,并觀察內存和緩存的數(shù)據(jù)和狀態(tài)變化。網(wǎng)站地址:(VivioJS MESI)
1.1.6 寫緩沖區(qū) & 失效隊列
MESI 協(xié)議保證了 CPU Cache 的一致性审磁,但完全地遵循協(xié)議會影響性能谈飒。 因此,現(xiàn)代的 CPU 會增加寫緩沖區(qū)和失效隊列將 MESI 協(xié)議的請求異步化态蒂,以提高效率:
- 寫緩沖區(qū)(Store Buffer)
由于在寫入操作之前杭措,CPU Core 需要先發(fā)起 RFO 請求獲得獨占權,在其它 CPU Core 回應 ACK 之前钾恢,當前 CPU Core 只能空等待重斑,這對 CPU 資源是一種浪費朵夏。因此現(xiàn)代 CPU 會采用 寫緩沖區(qū) 機制,寫入的數(shù)據(jù)放到寫緩沖區(qū)后并發(fā)送 RFO 請求后,CPU 就可以去執(zhí)行其它任務镜盯,等收到 ACK 后再將寫入數(shù)據(jù)寫到 Cache 上。
當前CPU核心如果要讀Cache Line中的數(shù)據(jù)徘溢,需要先掃描Store Buffer之后再讀取Cache Line(Store-Buffer Forwarding)盹兢。
- 失效隊列(Invalidation Queue)
Store Buffer 容量是有限的,當Store Buffer滿了之后CPU核心還是要卡住等待ACK拱礁。所以其他核心在收到 RFO 請求時琢锋,需要及時回應 ACK。如果核心很忙不能及時回復呢灶,就會造成發(fā)送 RFO 請求的核心在等待 ACK吴超。因此現(xiàn)代 CPU 會采用 失效隊列 機制,先把其它核心發(fā)過來的 RFO 請求放到失效隊列鸯乃,然后直接返回 ACK鲸阻,等當前核心處理完任務后再去處理失效隊列中的失效請求跋涣。
因此核心可能并不知道在它Cache里的某個Cache Line是Invalid狀態(tài)的,因為失效隊列包含有收到但還沒有處理的Invalidation消息鸟悴。CPU在讀取數(shù)據(jù)的時候陈辱,并不像Store buffer那樣讀取Invalidate queue。
1.2 內存屏障
CPU 已經(jīng)實現(xiàn)了 MESI 協(xié)議细诸,已經(jīng)在硬件層面實現(xiàn)了寫傳播和事務串行化沛贪,為什么 Java 語言層面還需要定義 volatile
關鍵字呢?豈不是多此一舉震贵?
MESI 協(xié)議解決了緩存一致性利赋,一致性有強弱之分:
強一致性: 保證在任意時刻任意副本上的同一份數(shù)據(jù)都是相同的,或者允許不同猩系,但是每次使用前都要刷新確保數(shù)據(jù)一致媚送,所以最終還是一致。
弱一致性: 不保證在任意時刻任意副本上的同一份數(shù)據(jù)都是相同的寇甸,也不要求使用前刷新季希,但是隨著時間的遷移,不同副本上的同一份數(shù)據(jù)總是向趨同的方向變化幽纷,最終還是趨向一致式塌。
例如,MESI 協(xié)議就是強一致性的友浸,但引入寫緩沖區(qū)或失效隊列后就變成弱一致性峰尝,隨著寫緩沖區(qū)和失效隊列被消費,各個核心 Cache 最終還是會趨向一致狀態(tài)收恢。
引入寫緩沖區(qū)或失效隊列后會導致內存一致性的問題(出現(xiàn)內存重排 Memory Reordering)武学,舉個例子:初始狀態(tài)變量 a 和變量 b 都是 0,現(xiàn)在 Core1 和 Core2 分別執(zhí)行這兩段指令伦意,最終 x 和 y 的結果是什么火窒?
Core1
a = 1; // A1
x = b; // A2
Core2
b = 2; // B1
y = a; // B2
寫緩存區(qū)造成內存重排:
可以看到:從內存的視角看,直到 Core1 執(zhí)行 A3 來刷新寫緩沖區(qū)驮肉,寫操作 A1 才算真正執(zhí)行了熏矿。雖然 Core 的執(zhí)行順序是 A1 → A2 → B1 → B2,但內存看到的順序卻是 A2 → B1 → B2 → A1离钝,變量 a 寫入沒有同步給對變量 a 的讀取票编,即發(fā)生了內存重排。內存重排是硬件上無法解決的問題卵渴,這個時候就必須加入內存屏障來解決這個問題慧域。
內存屏障 (Memory Barrier)分為寫屏障(Store Barrier)、讀屏障(Load Barrier)和全屏障(Full Barrier)浪读,寫屏障會阻塞等待Store Buffer中的數(shù)據(jù)同步刷到Cache后再執(zhí)行屏障后面的讀寫操作昔榴;讀屏障會阻塞Invalid Queue中的消息理完成后再執(zhí)行屏障后面的讀寫操作辛藻。
JVM 并不直接顯露內存屏障。(Memory barriers are not directly exposed by the JVM. )反之互订,為了保證語言級的并發(fā)原語語義吱肌,它們被 JVM 插入到指令序列中。(Instead they are inserted into the instruction sequence by the JVM in order to uphold the semantics of language level concurrency primitives. )
還是以上面的例子說明下:
Core1
// volatile 類型
a = 1; // A1
// 插入寫內存屏障
// 插入讀內存屏障
x = b; // A2
Core2
// volatile 類型
b = 2; // B1
// 插入寫內存屏障
// 插入讀內存屏障
y = a; // B2
初始狀態(tài)變量 a 和變量 b 都是 0 并且設置為volatile類型屁奏,現(xiàn)在 Core1 和 Core2 分別執(zhí)行這兩段指令
由于在 A1和B1之后插入一個寫內存屏障岩榆,所以在執(zhí)行A2和B2之前會將a=1和b=2刷到Cache
由于在 A2前插入一個讀內存屏障错负,所以在執(zhí)行A2之前會將b=1同步到Core1的Cache
由于在 B2前插入一個讀內存屏障坟瓢,所以在執(zhí)行B2之前會將a=1同步到Core2的Cache
內存屏障解決了內存重排的問題,除此意外內存屏障可以阻止屏障兩側的指令重排序(即編譯器重排)
需要注意的是犹撒,內存屏障并不是Java源代碼中的一部分折联,它們是在編譯到機器指令時由Java內存模型隱式插入的,寫Java代碼時是看不到的识颊。
x86架構CPU提供了比較強的緩存一致性支持诚镰,但有的ARM構的CPU指供的緩存一致性支持就很弱,大部分Android手機采用的CPU是ARM架構祥款,所以需要在正確的位置插入內存屏障來保證一致性清笨。
對于app開發(fā)者來說理解到這里就可以了,下面從源碼角度分析刃跛,有興趣的同學可以自己看看Hotspot虛擬機arm架構下的模板解釋器中的putfield和getfield方法抠艾。
2 synchronized 的工作原理
首先舉個例子:
public class UnSafeTest {
@SneakyThrows
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(10);
CustomService customService = new CustomService();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 100; j++) {
customService.add();
}
countDownLatch.countDown();
}).start();
}
// 等待其他線程執(zhí)行完畢
countDownLatch.await();
System.out.println("num:" + customService.getNum());
}
static class CustomService {
private int num = 0;
public void add() {
num++;
}
public int getNum() {
return num;
}
}
}
上述代碼啟動了10個線程,每個線程使num累加100次桨昙,期望結果是1000检号,但打印通常小于1000,這是一個典型的線程安全問題蛙酪。 我們在多線程環(huán)境下調用了CustomService.add()齐苛,因而導致線程不安全,那么為什么會有線程安全問題呢桂塞?我們來看關鍵代碼:
public void add() {
num++;
}
num++ 實際上并不是原子操作凹蜂,這需要看其對應的字節(jié)碼信息(可以使用javap -v xx.class進行查看):
public void add();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=3, locals=1, args_size=1
0: aload_0
1: dup
2: getfield #2 // Field num:I
5: iconst_1
6: iadd
7: putfield #2 // Field num:I
10: return
可以發(fā)現(xiàn)num++實際上由3個指令組成getfield、iadd阁危、putfield組成炊甲。
getfield:獲取變量值
iadd:執(zhí)行+1
putfield:設置變量值
既然num++實際上由3步組成,那么在多線程環(huán)境中就無法保證其執(zhí)行過程的原子性欲芹,通過下圖可以更加清晰的說明這一點:
上面這種情況是原子性問題導致線程不安全卿啡,synchronized是一個同步鎖,在同一時刻被修飾的方法或代碼塊只有一個線程能執(zhí)行菱父,以保證線程安全颈娜。很多人都稱之為重量級鎖(也叫做悲觀鎖)剑逃,但是隨著JDK1.6對synchronized進行了鎖升級的優(yōu)化后,在線程競爭不激烈的情況下性能還是不錯的官辽,接下來看看synchronized怎么解決線程安全問題:
public synchronized void add() {
num++;
}
或者
public void add() {
synchronized (CustomService.class) {
num++;
}
}
接下來看一下字節(jié)碼
public synchronized void add();
descriptor: ()V
flags: ACC_PUBLIC, ACC_SYNCHRONIZED
Code:
stack=3, locals=1, args_size=1
0: aload_0
1: dup
2: getield #2 // Field num:I
5: iconst_1
6: iadd
7: putfield #2 // Field num:I
10: return
或者
public void add();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=3, locals=3, args_size=1
0: ldc #3 // class com/cytmxk/trainproject/UnSafeTest$CustomService
2: dup
3: astore_1
4: monitorenter
5: aload_0
6: dup
7: getfield #2 // Field num:I
10: iconst_1
11: iadd
12: putfield #2 // Field num:I
15: aload_1
16: monitorexit
17: goto 25
20: astore_2
21: aload_1
22: monitorexit
23: aload_2
24: athrow
25: return
synchronized修飾方法時蛹磺,會在訪問標識符(flags)中加入ACC_SYNCHRONIZED標識
方法級同步是隱式執(zhí)行的。當調用這些方法時同仆,如果發(fā)現(xiàn)會ACC_SYNCHRONIZED標識萤捆,則會進入一個monitor,執(zhí)行方法俗批,然后退出monitor俗或。這時如果其他線程來請求執(zhí)行方法,會因為無法獲得監(jiān)視器鎖而被阻斷住岁忘。
無論方法調用正常還是發(fā)生異常辛慰,都會自動退出monitor,也就是釋放鎖干像。
synchronized修飾代碼塊時帅腌,會增加monitorenter和monitorexit指令
每個對象都與一個監(jiān)視器monitor關聯(lián),執(zhí)行monitorenter指令的線程嘗試獲取鎖對象關聯(lián)的監(jiān)視器monitor的所有權麻汰,此時其他線程將阻塞等待速客,直到執(zhí)行monitorexit退出monitor時,
其他線程才有機會來獲取監(jiān)視器monitor的所有權五鲫。
synchronized保證了有且僅有一個線程獲取執(zhí)行權(即保證了原子性)溺职。
synchronized也是基于MESI協(xié)議和內存屏障實現(xiàn)的緩存一致性
在monitorenter指令之后有一個Load內存屏障將CPU Cache更新到最新值
在monitorexit指令之后會有一個Store內存屏障將Store Buffer更新到CPU Cache
3 通過ReentrantLock解決線程安全問題
3.1 LockSupport 的工作原理
AQS使用LockSupport來控制線程的阻塞和喚醒,我們更加熟悉的阻塞喚醒操作是wait/notify方式臣镣,它是以Object的角度來設計辅愿,而LockSupport提供的park/unpark則是以線程的角度來設計。LockSupport主要提供兩類操作:
1 park 操作提供了4個方法
/**
* 許可證可用則消耗許可證并且調用立即返回忆某;否則當前線程會進入WAITING或者TIMED_WAITING狀態(tài)点待,直到以下三種情況之一發(fā)生:
* 1> 其他線程以當前線程為目標調用 unpark
* 2> 其他線程中斷當前線程
* 3> 調用錯誤地(沒有原因地)返回
*
* 此方法不報告導致方法返回的原因,調用方應首先重新檢查導致線程停止的條件弃舒,調用方還可以在返回時確定線程的中斷狀態(tài)癞埠。
*/
public static void park() {
U.park(false, 0L);
}
/**
* 同上
*/
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
U.park(false, 0L);
setBlocker(t, null);
}
/**
* 許可證可用則消耗許可證并且調用立即返回;否則當前線程會進入WAITING或者TIMED_WAITING狀態(tài)聋呢,直到以下三種情況之一發(fā)生:
* 1> 其他線程以當前線程為目標調用 unpark
* 2> 其他線程中斷當前線程
* 3> 指定的等待時間已到
* 4> 調用錯誤地(沒有原因地)返回
*
* 此方法不報告導致方法返回的原因苗踪,調用方應首先重新檢查導致線程停止的條件,調用方還可以在返回時確定線程的中斷狀態(tài)或者調用花費的時間削锰。
*/
public static void parkNanos(Object blocker, long nanos) {
if (nanos > 0) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
U.park(false, nanos);
setBlocker(t, null);
}
}
/**
* 許可證可用則消耗許可證并且調用立即返回通铲;否則當前線程會進入WAITING或者TIMED_WAITING狀態(tài),直到以下三種情況之一發(fā)生:
* 1> 其他線程以當前線程為目標調用 unpark
* 2> 其他線程中斷當前線程
* 3> 到達指定的時間線
* 4> 調用錯誤地(沒有原因地)返回
*
* 此方法不報告導致方法返回的原因器贩,調用方應首先重新檢查導致線程停止的條件颅夺,調用方還可以在返回時確定線程的中斷狀態(tài)或者調用返回時的時間朋截。
*/
public static void parkUntil(Object blocker, long deadline) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
U.park(true, deadline);
setBlocker(t, null);
}
上面三種形式的park都有blocker參數(shù),此參數(shù)在線程進入WAITING或者TIMED_WAITING狀態(tài)時被記錄吧黄,以幫助監(jiān)視工具和診斷工具確定線程受阻塞的原因(通過getBlocker方法獲取blocker)部服。
看下線程dump的結果來理解blocker的作用。
2 unpark 操作
/**
* 使給定線程的許可證可用(如果尚未可用)拗慨,如果線程在 park上被阻塞那么它將解除阻塞廓八,否則保證下一次調用park不會被阻塞。如果指定的線
* 程尚未啟動赵抢,則不能保證此操作有任何效果剧蹂。
*/
public static void unpark(Thread thread) {
if (thread != null)
U.unpark(thread);
}
歸根結底,LockSupport調用的是Unsafa的native代碼:
// park方法
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
...
// 調用parker的park方法
thread->parker()->park(isAbsolute != 0, time);
...
UNSAFE_END
// unpark方法
UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread))
...
if (p != NULL) {
HS_DTRACE_PROBE1(hotspot, thread__unpark, p);
// 調用parker的unpark方法
p->unpark();
}
UNSAFE_END
// JSR166 per-thread parker
private:
Parker* _parker;
public:
Parker* parker() { return _parker; }
park函數(shù)使當前線程進入WAITING或者TIMED_WAITING狀態(tài)昌讲,而其unpark函數(shù)則是將指定線程喚醒国夜,看一下Parker的park和unpark方法:
class Parker : public os::PlatformParker {
private:
volatile int _counter ;
...
public:
Parker() : PlatformParker() {
_counter = 0 ; //初始化為0
...
}
...
public:
// For simplicity of interface with Java, all forms of park (indefinite,
// relative, and absolute) are multiplexed into one call.
void park(bool isAbsolute, jlong time);
void unpark();
...
};
os::PlatformParker是對底層的抽象减噪,根據(jù)不同的操作系統(tǒng)有不同的實現(xiàn)短绸,在Linux系統(tǒng)下,是用Posix線程庫pthreads中的mutex(互斥量)和condition(條件變量)實現(xiàn)的筹裕,mutex和condition保護了一個counter的變量醋闭,當park時counter被設置為0,當unpark時_counter被設置為1朝卒。接下來看一下Linux系統(tǒng)中的實現(xiàn):
class PlatformParker : public CHeapObj<mtInternal> {
protected:
enum {
REL_INDEX = 0,
ABS_INDEX = 1
};
int _cur_index; // which cond is in use: -1, 0, 1
pthread_mutex_t _mutex [1] ; // pthread互斥鎖
pthread_cond_t _cond [2] ; // pthread條件變量數(shù)組,一個用于相對時間证逻,一個用于絕對時間
public: // TODO-FIXME: make dtor private
~PlatformParker() { guarantee (0, "invariant") ; }
public:
PlatformParker() {
int status;
status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr());
assert_status(status == 0, status, "cond_init rel");
status = pthread_cond_init (&_cond[ABS_INDEX], NULL);
assert_status(status == 0, status, "cond_init abs");
status = pthread_mutex_init (_mutex, NULL);
assert_status(status == 0, status, "mutex_init");
_cur_index = -1; // mark as unused
}
};
void Parker::park(bool isAbsolute, jlong time) {
// Ideally we'd do something useful while spinning, such
// as calling unpackTime().
// Optional fast-path check:
// Return immediately if a permit is available.
// We depend on Atomic::xchg() having full barrier semantics
// since we are doing a lock-free update to _counter.
if (Atomic::xchg(0, &_counter) > 0) return; // 使用 xchgl 指令將_counter的值設置為0,如果_counter原來的值大于0則返回
Thread* thread = Thread::current();
assert(thread->is_Java_thread(), "Must be JavaThread");
JavaThread *jt = (JavaThread *)thread;
// Optional optimization -- avoid state transitions if there's an interrupt pending.
// Check interrupt before trying to wait
if (Thread::is_interrupted(thread, false)) { // 如果線程處于中斷狀態(tài)抗斤,直接返回
return;
}
// Next, demultiplex/decode time arguments
timespec absTime;
// 如果time小于0囚企,或者isAbsolute是true并且time等于0則直接返回
if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
return;
}
if (time > 0) {
unpackTime(&absTime, isAbsolute, time);
}
// Enter safepoint region
// Beware of deadlocks such as 6317397.
// The per-thread Parker:: mutex is a classic leaf-lock.
// In particular a thread must never block on the Threads_lock while
// holding the Parker:: mutex. If safepoints are pending both the
// the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
ThreadBlockInVM tbivm(jt); // 構造當前線程的 ThreadBlockInVM
// 如果當前線程設置了中斷標志,或者獲取mutex互斥鎖失敗則直接返回
// 由于Parker是每個線程都有的瑞眼,所以_counter cond mutex都是每個線程都有的龙宏,不是所有線程共享的,所以加鎖失敗只有兩種情況伤疙,
// 1> unpark已經(jīng)加鎖這時只需要返回即可银酗,對應unpark先調用的情況
// 2> 調用pthread_mutex_trylock出錯
if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) { // 嘗試獲取鎖,失敗則返回
return;
}
int status ;
// 如果當前線程持有許可(即_counter大于0)徒像,說明之前已經(jīng)調用unpark方法將_counter置為了1
if (_counter > 0) {
_counter = 0; // 將許可消耗掉
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other and Java-level accesses.
OrderAccess::fence();
return;
}
#ifdef ASSERT
// Don't catch signals while blocked; let the running threads have the signals.
// (This allows a debugger to break into the running thread.)
sigset_t oldsigs;
sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();
pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
#endif
OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()
assert(_cur_index == -1, "invariant");
// 如果time等于0黍特,說明是相對時間也就是isAbsolute是fasle(否則前面就直接返回了)
if (time == 0) {
_cur_index = REL_INDEX; // arbitrary choice when not timed
// 當前線程進入WAITING狀態(tài)
status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
} else {
// 判斷isAbsolute是false還是true,false的話使用_cond[0]锯蛀,否則用_cond[1]
_cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
// 使用條件變量當前線程進入TIMED_WAITING狀態(tài)
status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
if (status != 0 && WorkAroundNPTLTimedWaitHang) {
pthread_cond_destroy (&_cond[_cur_index]) ;
pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
}
}
// 如果當前線程被喚醒則繼續(xù)向下執(zhí)行
_cur_index = -1;
assert_status(status == 0 || status == EINTR ||
status == ETIME || status == ETIMEDOUT,
status, "cond_timedwait");
#ifdef ASSERT
pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
#endif
_counter = 0 ; // 返回后 _counter 狀態(tài)位重置
status = pthread_mutex_unlock(_mutex) ;
assert_status(status == 0, status, "invariant") ;
// 使用內存屏障使_counter對其它線程可見
OrderAccess::fence();
// If externally suspended while waiting, re-suspend
if (jt->handle_special_suspend_equivalent_condition()) {
jt->java_suspend_self();
}
}
void Parker::unpark() {
int s, status ;
status = pthread_mutex_lock(_mutex);
assert (status == 0, "invariant") ;
s = _counter;
_counter = 1;
if (s < 1) {
// 說明當前parker對應的線程掛起了灭衷,因為_cur_index初始是-1,并且等待條件變量的線程被喚醒
// 后也會將_cur_index重置-1
if (_cur_index != -1) {
// 如果設置了WorkAroundNPTLTimedWaitHang先調用signal再調用unlock旁涤,在hotspot在Linux下默認使用這種方式
// 即先調用signal再調用unlock
if (WorkAroundNPTLTimedWaitHang) {
status = pthread_cond_signal (&_cond[_cur_index]);
assert (status == 0, "invariant");
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant");
} else {
// must capture correct index before unlocking
int index = _cur_index;
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant");
status = pthread_cond_signal (&_cond[index]);
assert (status == 0, "invariant");
}
} else { // 如果_cur_index == -1說明線程沒在等待條件變量翔曲,則直接解鎖
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
} else { // 如果_counter == 1,說明線程調用了一次或多次unpark但是沒調用park经备,則直接解鎖
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
}
inline jint Atomic::xchg (jint exchange_value, volatile jint* dest) {
__asm__ volatile ( "xchgl (%2),%0"
: "=r" (exchange_value)
: "0" (exchange_value), "r" (dest)
: "memory");
return exchange_value;
}
__asm__ 代表后面的xchgl是匯編指令,xchgl是一個原子操作部默,交換exchange_value和dest的值侵蒙,并且將exchange_value的值返回。
首先說明一下上面用到的三個方法:
int pthread_mutex_lock(pthread_mutex_t* mutex); // 阻塞式的加鎖
int pthread_mutex_trylock(pthread_mutex_t* mutex); // 非阻塞式的加鎖,加不上鎖就返回
int pthread_mutex_unlock(pthread_mutex_t* mutex); // 解鎖
// 以上函數(shù)成功返回0傅蹂;失敗返回錯誤碼
接下來總結一下park方法的邏輯:
1> 使用 xchgl 指令將counter修改為 0 返回纷闺,如果counter原來的值大于0則返回,即有許可直接消費掉
2> pthread_mutex_trylock 嘗試獲取鎖份蝴,失敗則返回非0的錯誤碼(其他線程unpark該線程時持有mutex互斥量或者pthread_mutex_trylock調用出錯) 3> 如果持有許可(即counter >0成立)犁功,則消費掉許可(即執(zhí)行counter = 0),釋放鎖(執(zhí)行pthread_mutex_unlock)然后返回
4> 如果 3 不成立婚夫,根據(jù)時間的不同執(zhí)行不同的等待函數(shù)浸卦,如果等待正確返回,則消費掉許可(即執(zhí)行counter = 0)案糙,釋放鎖(執(zhí)行pthread_mutex_unlock)然后返回
unpark的邏輯為:
1> pthread_mutex_lock 獲取鎖限嫌,可能會阻塞線程
2> 將參數(shù)線程設置為持有許可(即_counter 設置為 1)
3> 判斷 _counter 的舊值:
小于 1 時,調用 pthread_cond_signal 喚醒在 park 阻塞的線程时捌;
等于 1 時怒医,直接返回
3.2 CAS 的工作原理
說到CAS,第一個想到的就是原子類型奢讨,其實4.1中的問題就可以通過AtomicInteger解決:
static class CustomService {
private final AtomicInteger num = new AtomicInteger();
public void add() {
num.getAndIncrement();
}
public int getNum() {
return num.get();
}
}
為什么上面的代碼可以解決線程安全的問題呢稚叹?先看下AtomicInteger的部分源碼:
// Unsafe 對象,可以直接根據(jù)內存地址操作數(shù)據(jù)拿诸,可以突破JVM的現(xiàn)在直接操作內存扒袖,所以是不安全的
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
// 存儲value屬性在AtomicInteger類實例內部的偏移地址
private static final long VALUE;
static {
try {
// 在類初始化的時候就獲取就獲取到了value屬性在對象內部的偏移地址
VALUE = U.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
// 存儲實際的值
private volatile int value;
......
/**
* Atomically increments by one the current value.
*
* @return the previous value
*/
public final int getAndIncrement() {
return U.getAndAddInt(this, VALUE, 1);
}
1> 首先內部持有一個Unsafe對象,原子類底層的操作都是基于Unsafe進行的亩码。
2> volatile int value 是AtomicInteger實例的實際數(shù)值季率,volatile保證并發(fā)中的可見性和有序性。
3> VALUE 是 value屬性在AtomicInteger對象內部的偏移地址蟀伸,通過Unsafe類是可以在內存級別給變量賦值的蚀同,要操作AtomicInteger實例的數(shù)據(jù),首先要知道AtomicInteger實例的內存地址啊掏,其次是要知道value屬性在對象內部的偏移量VALUE蠢络,接著直接給這塊內存賦值就行了:
4> AtomicInteger的getAndIncrement()方法是基于Unsafe的getAndAddInt的,Unsafe的getAndAddInt方法源碼:
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
}
1> o + offset 得到value屬性在內存中的地址迟蜜,然后根據(jù)地址從內存中獲取value的值刹孔,保存在v中(即期望值)。
2> compareAndSwapInt : v的值跟當前內存的值(o + offsetSet 地址對應的值)進行對比,如果相等則將內存的值修改為 v + delta(即CAS操作成功)髓霞,如果值不相等卦睹,則進入下一次循環(huán),
直到CAS操作成功為止方库。
3> CAS操作是怎么保證原子性的呢(比較和修改這是兩個動作结序,如果比較的時候是一樣的,修改的時候被別的線程修改了)纵潦?
接下來hotspot中compareAndSwapInt的實現(xiàn):
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
int mp = os::is_MP();
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}
__asm_ "cmpxchg面的xchgl是匯編指令徐鹤,LOCK_IF_MP代表如果設計多核心則添加lock,使cmpxchgl指令原子性
CAS保證了加1操作的原子性邀层,volatile確保了可見性返敬、有序性,所以可以解決上面的問題
3.3 ReentrantLock的工作原理
LockSupport 底層使用互斥量mutex和condition寥院,互斥量mutex和condition是內核提供的能力劲赠,所以性能消耗是非常大的。
首先通過下圖整體看一下ReentrantLock的類結構
首先
ReentrantLock
繼承自父類Lock
秸谢,然后有3
個內部類凛澎,其中Sync
內部類繼承自AQS
,另外的兩個內部類繼承自Sync
钮追,這兩個類分別是用來實現(xiàn)公平鎖和非公平鎖的预厌,首先看一下非公平鎖的流程圖:接下來就是對應的源碼:
// ReentrantLock.java
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
sync.lock();
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
// 通過CAS的方式先判斷state == 0是否成立阿迈,如果成立則將當前state設置為1
if (compareAndSetState(0, 1))
// state成功設置為1元媚,說明當前線程獲取到資源,則將當前線程保存下來
setExclusiveOwnerThread(Thread.currentThread());
else
// 失敗的話將當前線程放到等待隊列
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
abstract void lock();
// 非公平的嘗試獲取鎖
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// c == 0 表示沒有線程占有鎖
if (c == 0) {
// 通過CAS的方式先判斷state == 0是否成立苗沧,如果成立則將當前state設置為1
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果當前線程已經(jīng)占有鎖刊棕,則state加1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
...
}
// AbstractQueuedSynchronizer.java
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long STATE;
private static final long HEAD;
private static final long TAIL;
static {
try {
STATE = U.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
HEAD = U.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
TAIL = U.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
// Reduce the risk of rare disastrous classloading in first call to
// LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
Class<?> ensureLoaded = LockSupport.class;
}
/**
* The synchronization state.
*/
private volatile int state;
protected final boolean compareAndSetState(int expect, int update) {
return U.compareAndSwapInt(this, STATE, expect, update);
}
// 請求鎖
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
Sync
通過acquire
和release
方法獲取個釋放鎖,所以 ReentrantLock
實現(xiàn)的是AQS
的獨占模式待逞,也就是獨占鎖甥角。
上面說明了非公平鎖的獲取和釋放流程,接下來看一下公平鎖:
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
// 將當前線程放到等待隊列
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// c == 0 表示沒有線程占有鎖
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 等待隊列中不存在有效節(jié)點并且CAS成功的情況下當前線程獲取到鎖
setExclusiveOwnerThread(current);
return true;
}
}
// 如果當前線程已經(jīng)獲取到鎖识樱,則state加1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
釋放鎖的流程和非公平鎖一樣嗤无,不一樣的地方體現(xiàn)在獲取鎖的流程。
接下來看一下AQS中核心等待隊列
的管理:
3.3.1 線程加入隊列的時機
當執(zhí)行Acquire(1)時怜庸,會通過tryAcquire獲取鎖当犯。在這種情況下,如果獲取鎖失敗割疾,就會調用addWaiter加入到等待隊列中去嚎卫。
獲取鎖失敗后,會執(zhí)行 addWaiter(Node.EXCLUSIVE) 加入等待隊列宏榕,具體實現(xiàn)方法如下:
private Node addWaiter(Node mode) {
// 創(chuàng)建于當前線程關聯(lián)的Node節(jié)點(即將thread字段設置為當前線程)
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
// 將node的prev設置為oldTail
U.putObject(node, Node.PREV, oldTail);
// 使用CAS的方式判斷oldTail地址和tail的地址是否相同拓诸,成立的話將tail設置為node
if (compareAndSetTail(oldTail, node)) {
// 最終完成將Node插入隊尾
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue();
}
}
}
private final void initializeSyncQueue() {
Node h;
// 創(chuàng)建第一個節(jié)點(虛節(jié)點侵佃,占位用),head和tail同時指向該節(jié)點
if (U.compareAndSwapObject(this, HEAD, null, (h = new Node())))
tail = h;
}
回到公平鎖的代碼奠支,hasQueuedPredecessors是公平鎖加鎖時判斷等待隊列中是否存在有效節(jié)點的方法馋辈。如果返回False,說明當前線程可以爭取共享資源倍谜;如果返回True首有,說明隊列中存在有效節(jié)點,當前線程必須加入到等待隊列中枢劝。
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// h != t 成立代表等待對列中存在等待的節(jié)點
// (s = h.next) == null 成立代表其他線程正在添加到等待隊列井联,進行到了tail已經(jīng)指向新Node,但是Head還沒有指向新Node您旁,此時隊列中有元素烙常,需要返回true
// s.thread != Thread.currentThread() 成立代表等待隊列的第一個有效節(jié)點線程與當前線程不同,所以當前線程不可以獲取鎖鹤盒,當前線程必須加入進等待隊列
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
回到最初的代碼:
// 請求鎖
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上文解釋了addWaiter方法蚕脏,這個方法其實就是把對應的線程以Node的數(shù)據(jù)結構形式加入到雙端隊列里,返回的是一個包含該線程的Node侦锯。而這個Node會作為參數(shù)驼鞭,進入到acquireQueued方法中。acquireQueued會把放入隊列中的線程不斷去獲取鎖尺碰,直到獲取成功或者被park掛起:
final boolean acquireQueued(final Node node, int arg) {
try {
// 標記等待過程中是否中斷過
boolean interrupted = false;
// 開始自旋挣棕,要么獲取鎖,要么中斷
for (;;) {
// 獲取當前節(jié)點的前驅節(jié)點
final Node p = node.predecessor();
// 如果p是首節(jié)點亲桥,說明當前節(jié)點在真實數(shù)據(jù)隊列的頭部洛心,就嘗試獲取鎖(別忘了頭結點是虛節(jié)點)
if (p == head && tryAcquire(arg)) {
// 獲取鎖成功,head指針移動到當前node题篷,node節(jié)點變成虛節(jié)點
setHead(node);
p.next = null; // help GC
return interrupted;
}
// 說明p為首節(jié)點且當前沒有獲取到鎖(可能是鎖被其他線程占了)或者是p不是首節(jié)點词身,這個時候就要判斷當前node是否要被阻塞(被阻塞條件:前驅節(jié)點的waitStatus為SIGNAL),
// 防止無限循環(huán)浪費資源番枚。具體兩個方法下面細細分析
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
// 將指定的節(jié)點設置為首節(jié)點(即虛節(jié)點)
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
// 靠前驅節(jié)點判斷當前線程是否應該被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 獲取前驅節(jié)點的節(jié)點狀態(tài)
int ws = pred.waitStatus;
// 說明前驅節(jié)點處于SIGNAL狀態(tài)(在獨占鎖機制中法严,waitStatus 只會使用到 CANCELLED 和 SIGNAL 兩個狀態(tài))
if (ws == Node.SIGNAL)
return true;
// 通過枚舉值我們知道waitStatus>0是取消狀態(tài)
if (ws > 0) {
do {
// 循環(huán)向前查找取消節(jié)點,把取消節(jié)點從隊列中剔除
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 設置前驅節(jié)點等待狀態(tài)為SIGNAL
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt主要用于掛起當前線程葫笼,阻塞調用棧深啤,返回當前線程的中斷狀態(tài)。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
上述方法的流程圖如下:
從上圖可以看出渔欢,跳出當前循環(huán)的條件是當
前置節(jié)點是頭結點墓塌,且當前線程獲取鎖成功
。為了防止因死循環(huán)導致CPU資源被浪費,我們會判斷前置節(jié)點的狀態(tài)來決定是否要將當前線程掛起苫幢,具體掛起流程用流程圖表示如下(shouldParkAfterFailedAcquire流程):3.3.2 等待隊列中出隊時機
// ReentrantLock.java
public void unlock() {
sync.release(1);
}
abstract static class Sync extends AbstractQueuedSynchronizer {
...
// 嘗試釋放鎖
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 如果當前線程沒有占有鎖則拋出異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// c == 0 代表當前線程要釋放鎖
if (c == 0) {
free = true;
// 清空占有鎖的線程對象
setExclusiveOwnerThread(null);
}
// 更新state的值
setState(c);
return free;
}
}
// AbstractQueuedSynchronizer.java
// 釋放鎖
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 解鎖隊列總的第一個有效節(jié)點
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
// 獲取當前節(jié)點的waitStatus
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
LockSupport.unpark(s.thread);
}