?? 本文以及示例源碼已歸檔在 javacore
一寒锚、并發(fā)鎖簡介
確保線程安全最常見的做法是利用鎖機制(Lock
拴测、sychronized
)來對共享數(shù)據(jù)做互斥同步乓旗,這樣在同一個時刻,只有一個線程可以執(zhí)行某個方法或者某個代碼塊集索,那么操作必然是原子性的拯田,線程安全的川蒙。
在工作晌畅、面試中中剩,經常會聽到各種五花八門的鎖屈芜,聽的人云里霧里。鎖的概念術語很多红选,它們是針對不同的問題所提出的,通過簡單的梳理,也不難理解间学。
可重入鎖
可重入鎖又名遞歸鎖仍律,是指 同一個線程在外層方法獲取了鎖,在進入內層方法會自動獲取鎖畔师。
可重入鎖可以在一定程度上避免死鎖姿锭。
-
ReentrantLock
呻此、ReentrantReadWriteLock
是可重入鎖腔寡。這點,從其命名也不難看出。 -
synchronized
也是一個可重入鎖糯彬。
synchronized void setA() throws Exception{
Thread.sleep(1000);
setB();
}
synchronized void setB() throws Exception{
Thread.sleep(1000);
}
上面的代碼就是一個典型場景:如果使用的鎖不是可重入鎖的話,setB
可能不會被當前線程執(zhí)行撩扒,從而造成死鎖。
公平鎖與非公平鎖
- 公平鎖 - 公平鎖是指 多線程按照申請鎖的順序來獲取鎖偶器。
- 非公平鎖 - 非公平鎖是指 多線程不按照申請鎖的順序來獲取鎖 术裸。這就可能會出現(xiàn)優(yōu)先級反轉(后來者居上)或者饑餓現(xiàn)象(某線程總是搶不過別的線程,導致始終無法執(zhí)行)亭枷。
公平鎖為了保證線程申請順序袭艺,勢必要付出一定的性能代價,因此其吞吐量一般低于非公平鎖叨粘。
公平鎖與非公平鎖 在 Java 中的典型實現(xiàn):
-
synchronized
只支持非公平鎖猾编。 -
ReentrantLock
、ReentrantReadWriteLock
升敲,默認是非公平鎖答倡,但支持公平鎖。
獨享鎖與共享鎖
獨享鎖與共享鎖是一種廣義上的說法驴党,從實際用途上來看瘪撇,也常被稱為互斥鎖與讀寫鎖。
- 獨享鎖 - 獨享鎖是指 鎖一次只能被一個線程所持有港庄。
- 共享鎖 - 共享鎖是指 鎖可被多個線程所持有倔既。
獨享鎖與共享鎖在 Java 中的典型實現(xiàn):
-
synchronized
、ReentrantLock
只支持獨享鎖鹏氧。 -
ReentrantReadWriteLock
其寫鎖是獨享鎖渤涌,其讀鎖是共享鎖。讀鎖是共享鎖使得并發(fā)讀是非常高效的把还,讀寫实蓬,寫讀 茸俭,寫寫的過程是互斥的。
悲觀鎖與樂觀鎖
樂觀鎖與悲觀鎖不是指具體的什么類型的鎖安皱,而是處理并發(fā)同步的策略调鬓。
- 悲觀鎖 - 悲觀鎖對于并發(fā)采取悲觀的態(tài)度,認為:不加鎖的并發(fā)操作一定會出問題练俐。悲觀鎖適合寫操作頻繁的場景袖迎。
- 樂觀鎖 - 樂觀鎖對于并發(fā)采取樂觀的態(tài)度,認為:不加鎖的并發(fā)操作也沒什么問題腺晾。對于同一個數(shù)據(jù)的并發(fā)操作,是不會發(fā)生修改的辜贵。在更新數(shù)據(jù)的時候悯蝉,會采用不斷嘗試更新的方式更新數(shù)據(jù)。樂觀鎖適合讀多寫少的場景托慨。
悲觀鎖與樂觀鎖在 Java 中的典型實現(xiàn):
悲觀鎖在 Java 中的應用就是通過使用
synchronized
和Lock
顯示加鎖來進行互斥同步鼻由,這是一種阻塞同步。樂觀鎖在 Java 中的應用就是采用 CAS 機制(CAS 操作通過
Unsafe
類提供厚棵,但這個類不直接暴露為 API蕉世,所以都是間接使用,如各種原子類)婆硬。
輕量級鎖狠轻、重量級鎖與偏向鎖
所謂輕量級鎖與重量級鎖,指的是鎖控制粒度的粗細彬犯。顯然向楼,控制粒度越細,阻塞開銷越小谐区,并發(fā)性也就越高湖蜕。
Java 1.6 以前,重量級鎖一般指的是 synchronized
宋列,而輕量級鎖指的是 volatile
昭抒。
Java 1.6 以后,針對 synchronized
做了大量優(yōu)化炼杖,引入 4 種鎖狀態(tài): 無鎖狀態(tài)灭返、偏向鎖、輕量級鎖和重量級鎖嘹叫。鎖可以單向的從偏向鎖升級到輕量級鎖婆殿,再從輕量級鎖升級到重量級鎖 。
偏向鎖 - 偏向鎖是指一段同步代碼一直被一個線程所訪問罩扇,那么該線程會自動獲取鎖婆芦。降低獲取鎖的代價怕磨。
輕量級鎖 - 是指當鎖是偏向鎖的時候,被另一個線程所訪問消约,偏向鎖就會升級為輕量級鎖肠鲫,其他線程會通過自旋的形式嘗試獲取鎖,不會阻塞或粮,提高性能导饲。
重量級鎖 - 是指當鎖為輕量級鎖的時候,另一個線程雖然是自旋氯材,但自旋不會一直持續(xù)下去渣锦,當自旋一定次數(shù)的時候,還沒有獲取到鎖氢哮,就會進入阻塞袋毙,該鎖膨脹為重量級鎖。重量級鎖會讓其他申請的線程進入阻塞冗尤,性能降低听盖。
分段鎖
分段鎖其實是一種鎖的設計,并不是具體的一種鎖裂七。所謂分段鎖皆看,就是把鎖的對象分成多段,每段獨立控制背零,使得鎖粒度更細腰吟,減少阻塞開銷,從而提高并發(fā)性捉兴。這其實很好理解蝎困,就像高速公路上的收費站,如果只有一個收費口倍啥,那所有的車只能排成一條隊繳費禾乘;如果有多個收費口,就可以分流了虽缕。
Hashtable
使用 synchronized
修飾方法來保證線程安全性始藕,那么面對線程的訪問,Hashtable 就會鎖住整個對象氮趋,所有的其它線程只能等待伍派,這種阻塞方式的吞吐量顯然很低。
Java 1.7 以前的 ConcurrentHashMap
就是分段鎖的典型案例剩胁。ConcurrentHashMap
維護了一個 Segment
數(shù)組诉植,一般稱為分段桶。
final Segment<K,V>[] segments;
當有線程訪問 ConcurrentHashMap
的數(shù)據(jù)時昵观,ConcurrentHashMap
會先根據(jù) hashCode 計算出數(shù)據(jù)在哪個桶(即哪個 Segment)晾腔,然后鎖住這個 Segment
舌稀。
顯示鎖和內置鎖
Java 1.5 之前,協(xié)調對共享對象的訪問時可以使用的機制只有 synchronized
和 volatile
灼擂。這兩個都屬于內置鎖壁查,即鎖的申請和釋放都是由 JVM 所控制。
Java 1.5 之后剔应,增加了新的機制:ReentrantLock
睡腿、ReentrantReadWriteLock
,這類鎖的申請和釋放都可以由程序所控制峻贮,所以常被稱為顯示鎖席怪。
??
synchronized
的用法和原理可以參考:Java 并發(fā)基礎機制 - synchronized 。:bell: 注意:如果不需要
ReentrantLock
月洛、ReentrantReadWriteLock
所提供的高級同步特性何恶,應該優(yōu)先考慮使用synchronized
。理由如下:
- Java 1.6 以后嚼黔,
synchronized
做了大量的優(yōu)化,其性能已經與ReentrantLock
惜辑、ReentrantReadWriteLock
基本上持平唬涧。- 從趨勢來看,Java 未來更可能會優(yōu)化
synchronized
盛撑,而不是ReentrantLock
碎节、ReentrantReadWriteLock
,因為synchronized
是 JVM 內置屬性抵卫,它能執(zhí)行一些優(yōu)化狮荔。ReentrantLock
、ReentrantReadWriteLock
申請和釋放鎖都是由程序控制介粘,如果使用不當殖氏,可能造成死鎖,這是很危險的姻采。
以下對比一下顯示鎖和內置鎖的差異:
-
主動獲取鎖和釋放鎖
-
synchronized
不能主動獲取鎖和釋放鎖婚瓜。獲取鎖和釋放鎖都是 JVM 控制的刑棵。 -
ReentrantLock
可以主動獲取鎖和釋放鎖巴刻。(如果忘記釋放鎖,就可能產生死鎖)蛉签。
-
-
響應中斷
-
synchronized
不能響應中斷。 -
ReentrantLock
可以響應中斷督弓。
-
-
超時機制
-
synchronized
沒有超時機制蒂阱。 -
ReentrantLock
有超時機制。ReentrantLock
可以設置超時時間狂塘,超時后自動釋放鎖录煤,避免一直等待。
-
-
支持公平鎖
-
synchronized
只支持非公平鎖荞胡。 -
ReentrantLock
支持非公平鎖和公平鎖妈踊。
-
-
是否支持共享
- 被
synchronized
修飾的方法或代碼塊,只能被一個線程訪問(獨享)泪漂。如果這個線程被阻塞廊营,其他線程也只能等待 -
ReentrantLock
可以基于Condition
靈活的控制同步條件。
- 被
-
是否支持讀寫分離
-
synchronized
不支持讀寫鎖分離萝勤; -
ReentrantReadWriteLock
支持讀寫鎖露筒,從而使阻塞讀寫的操作分開,有效提高并發(fā)性敌卓。
-
二慎式、AQS
AbstractQueuedSynchronizer
(簡稱 AQS)是隊列同步器,顧名思義趟径,其主要作用是處理同步瘪吏。它是并發(fā)鎖和很多同步工具類的實現(xiàn)基石(如ReentrantLock
、ReentrantReadWriteLock
蜗巧、Semaphore
等)掌眠。因此,要想深入理解
ReentrantLock
惧蛹、ReentrantReadWriteLock
等并發(fā)鎖和同步工具扇救,必須先理解 AQS 的要點和原理。
AQS 的要點
在 java.util.concurrent.locks
包中的相關鎖(常用的有 ReentrantLock
香嗓、 ReadWriteLock
)都是基于 AQS 來實現(xiàn)迅腔。這些鎖都沒有直接繼承 AQS,而是定義了一個 Sync
類去繼承 AQS靠娱。為什么要這樣呢沧烈?因為鎖面向的是使用用戶,而同步器面向的則是線程控制像云,那么在鎖的實現(xiàn)中聚合同步器而不是直接繼承 AQS 就可以很好的隔離二者所關注的事情锌雀。
AQS 提供了對獨享鎖與共享鎖的支持蚂夕。
獨享鎖 API
獲取、釋放獨享鎖的主要 API 如下:
public final void acquire(int arg)
public final void acquireInterruptibly(int arg)
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
public final boolean release(int arg)
-
acquire
- 獲取獨占鎖腋逆。 -
acquireInterruptibly
- 獲取可中斷的獨占鎖婿牍。 -
tryAcquireNanos
- 嘗試在指定時間內獲取可中斷的獨占鎖。在以下三種情況下回返回:- 在超時時間內惩歉,當前線程成功獲取了鎖等脂;
- 當前線程在超時時間內被中斷;
- 超時時間結束撑蚌,仍未獲得鎖返回 false上遥。
-
release
- 釋放獨占鎖。
共享鎖 API
獲取争涌、釋放共享鎖的主要 API 如下:
public final void acquireShared(int arg)
public final void acquireSharedInterruptibly(int arg)
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
public final boolean releaseShared(int arg)
-
acquireShared
- 獲取共享鎖粉楚。 -
acquireSharedInterruptibly
- 獲取可中斷的共享鎖。 -
tryAcquireSharedNanos
- 嘗試在指定時間內獲取可中斷的共享鎖亮垫。 -
release
- 釋放共享鎖模软。
AQS 的原理
AQS 的數(shù)據(jù)結構
閱讀 AQS 的源碼,可以發(fā)現(xiàn):AQS 繼承自 AbstractOwnableSynchronize
饮潦。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/** 等待隊列的隊頭撵摆,懶加載。只能通過 setHead 方法修改害晦。 */
private transient volatile Node head;
/** 等待隊列的隊尾,懶加載暑中。只能通過 enq 方法添加新的等待節(jié)點壹瘟。*/
private transient volatile Node tail;
/** 同步狀態(tài) */
private volatile int state;
}
-
state
- AQS 使用一個整型的volatile
變量來 維護同步狀態(tài)。- 這個整數(shù)狀態(tài)的意義由子類來賦予鳄逾,如
ReentrantLock
中該狀態(tài)值表示所有者線程已經重復獲取該鎖的次數(shù)稻轨,Semaphore
中該狀態(tài)值表示剩余的許可數(shù)量。
- 這個整數(shù)狀態(tài)的意義由子類來賦予鳄逾,如
-
head
和tail
- AQS 維護了一個Node
類型(AQS 的內部類)的雙鏈表來完成同步狀態(tài)的管理雕凹。這個雙鏈表是一個雙向的 FIFO 隊列殴俱,通過head
和tail
指針進行訪問。當 有線程獲取鎖失敗后枚抵,就被添加到隊列末尾线欲。
再來看一下 Node
的源碼
static final class Node {
/** 該等待同步的節(jié)點處于共享模式 */
static final Node SHARED = new Node();
/** 該等待同步的節(jié)點處于獨占模式 */
static final Node EXCLUSIVE = null;
/** 線程等待狀態(tài),狀態(tài)值有: 0汽摹、1李丰、-1、-2逼泣、-3 */
volatile int waitStatus;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
/** 前驅節(jié)點 */
volatile Node prev;
/** 后繼節(jié)點 */
volatile Node next;
/** 等待鎖的線程 */
volatile Thread thread;
/** 和節(jié)點是否共享有關 */
Node nextWaiter;
}
很顯然趴泌,Node 是一個雙鏈表結構舟舒。
-
waitStatus
-Node
使用一個整型的volatile
變量來 維護 AQS 同步隊列中線程節(jié)點的狀態(tài)。waitStatus
有五個狀態(tài)值:-
CANCELLED(1)
- 此狀態(tài)表示:該節(jié)點的線程可能由于超時或被中斷而 處于被取消(作廢)狀態(tài)嗜憔,一旦處于這個狀態(tài)秃励,表示這個節(jié)點應該從等待隊列中移除。 -
SIGNAL(-1)
- 此狀態(tài)表示:后繼節(jié)點會被掛起吉捶,因此在當前節(jié)點釋放鎖或被取消之后夺鲜,必須喚醒(unparking
)其后繼結點。 -
CONDITION(-2)
- 此狀態(tài)表示:該節(jié)點的線程 處于等待條件狀態(tài)帚稠,不會被當作是同步隊列上的節(jié)點谣旁,直到被喚醒(signal
),設置其值為 0滋早,再重新進入阻塞狀態(tài)榄审。 -
PROPAGATE(-3)
- 此狀態(tài)表示:下一個acquireShared
應無條件傳播。 - 0 - 非以上狀態(tài)杆麸。
-
獨占鎖的獲取和釋放
獲取獨占鎖
AQS 中使用 acquire(int arg)
方法獲取獨占鎖搁进,其大致流程如下:
- 先嘗試獲取同步狀態(tài),如果獲取同步狀態(tài)成功昔头,則結束方法饼问,直接返回。
- 如果獲取同步狀態(tài)不成功揭斧,AQS 會不斷嘗試利用 CAS 操作將當前線程插入等待同步隊列的隊尾莱革,直到成功為止。
- 接著讹开,不斷嘗試為等待隊列中的線程節(jié)點獲取獨占鎖盅视。
詳細流程可以用下圖來表示,請結合源碼來理解(一圖勝千言):
釋放獨占鎖
AQS 中使用 release(int arg)
方法釋放獨占鎖旦万,其大致流程如下:
- 先嘗試獲取解鎖線程的同步狀態(tài)闹击,如果獲取同步狀態(tài)不成功,則結束方法成艘,直接返回赏半。
- 如果獲取同步狀態(tài)成功,AQS 會嘗試喚醒當前線程節(jié)點的后繼節(jié)點淆两。
獲取可中斷的獨占鎖
AQS 中使用 acquireInterruptibly(int arg)
方法獲取可中斷的獨占鎖断箫。
acquireInterruptibly(int arg)
實現(xiàn)方式相較于獲取獨占鎖方法( acquire
)非常相似,區(qū)別僅在于它會通過 Thread.interrupted
檢測當前線程是否被中斷琼腔,如果是瑰枫,則立即拋出中斷異常(InterruptedException
)。
獲取超時等待式的獨占鎖
AQS 中使用 tryAcquireNanos(int arg)
方法獲取超時等待的獨占鎖。
doAcquireNanos 的實現(xiàn)方式 相較于獲取獨占鎖方法( acquire
)非常相似光坝,區(qū)別在于它會根據(jù)超時時間和當前時間計算出截止時間尸诽。在獲取鎖的流程中,會不斷判斷是否超時盯另,如果超時瓢捉,直接返回 false硼讽;如果沒超時奄毡,則用 LockSupport.parkNanos
來阻塞當前線程出革。
共享鎖的獲取和釋放
獲取共享鎖
AQS 中使用 acquireShared(int arg)
方法獲取共享鎖。
acquireShared
方法和 acquire
方法的邏輯很相似芝发,區(qū)別僅在于自旋的條件以及節(jié)點出隊的操作有所不同绪商。
成功獲得共享鎖的條件如下:
-
tryAcquireShared(arg)
返回值大于等于 0 (這意味著共享鎖的 permit 還沒有用完)。 - 當前節(jié)點的前驅節(jié)點是頭結點辅鲸。
釋放共享鎖
AQS 中使用 releaseShared(int arg)
方法釋放共享鎖格郁。
releaseShared
首先會嘗試釋放同步狀態(tài),如果成功独悴,則解鎖一個或多個后繼線程節(jié)點例书。釋放共享鎖和釋放獨享鎖流程大體相似,區(qū)別在于:
對于獨享模式刻炒,如果需要 SIGNAL决采,釋放僅相當于調用頭節(jié)點的 unparkSuccessor
。
獲取可中斷的共享鎖
AQS 中使用 acquireSharedInterruptibly(int arg)
方法獲取可中斷的共享鎖坟奥。
acquireSharedInterruptibly
方法與 acquireInterruptibly
幾乎一致树瞭,不再贅述。
獲取超時等待式的共享鎖
AQS 中使用 tryAcquireSharedNanos(int arg)
方法獲取超時等待式的共享鎖爱谁。
tryAcquireSharedNanos
方法與 tryAcquireNanos
幾乎一致移迫,不再贅述。
三管行、ReentrantLock
ReentrantLock
類是Lock
接口的具體實現(xiàn),它是一個可重入鎖邪媳。與內置鎖synchronized
不同捐顷,ReentrantLock
提供了一組無條件的、可輪詢的雨效、定時的以及可中斷的鎖操作迅涮,所有獲取鎖、釋放鎖的操作都是顯式的操作徽龟。
ReentrantLock 的特性
ReentrantLock
的特性如下:
-
ReentrantLock
提供了與synchronized
相同的互斥性叮姑、內存可見性和可重入性。 -
ReentrantLock
支持公平鎖和非公平鎖(默認)兩種模式。 -
ReentrantLock
實現(xiàn)了Lock
接口传透,支持了synchronized
所不具備的靈活性耘沼。-
synchronized
無法中斷一個正在等待獲取鎖的線程 -
synchronized
無法在請求獲取一個鎖時無休止地等待
-
Lock
的接口定義如下:
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
-
lock()
- 獲取鎖。 -
unlock()
- 釋放鎖朱盐。 -
tryLock()
- 嘗試獲取鎖群嗤,僅在調用時鎖未被另一個線程持有的情況下,才獲取該鎖兵琳。 -
tryLock(long time, TimeUnit unit)
- 和tryLock()
類似狂秘,區(qū)別僅在于限定時間,如果限定時間內未獲取到鎖躯肌,視為失敗者春。 -
lockInterruptibly()
- 鎖未被另一個線程持有,且線程沒有被中斷的情況下清女,才能獲取鎖钱烟。 -
newCondition()
- 返回一個綁定到Lock
對象上的Condition
實例。
ReentrantLock 的用法
前文了解了 ReentrantLock
的特性校仑,接下來忠售,我們要講述其具體用法。
ReentrantLock 的構造方法
ReentrantLock
有兩個構造方法:
public ReentrantLock() {}
public ReentrantLock(boolean fair) {}
-
ReentrantLock()
- 默認構造方法會初始化一個非公平鎖(NonfairSync)迄沫; -
ReentrantLock(boolean)
-new ReentrantLock(true)
會初始化一個公平鎖(FairSync)稻扬。
lock 和 unlock 方法
-
lock()
- 無條件獲取鎖。如果當前線程無法獲取鎖羊瘩,則當前線程進入休眠狀態(tài)不可用泰佳,直至當前線程獲取到鎖。如果該鎖沒有被另一個線程持有尘吗,則獲取該鎖并立即返回逝她,將鎖的持有計數(shù)設置為 1。 -
unlock()
- 用于釋放鎖睬捶。
:bell: 注意:請務必牢記黔宛,獲取鎖操作
lock()
必須在try catch
塊中進行,并且將釋放鎖操作unlock()
放在finally
塊中進行擒贸,以保證鎖一定被被釋放臀晃,防止死鎖的發(fā)生。
示例:ReentrantLock
的基本操作
public class ReentrantLockDemo {
public static void main(String[] args) {
Task task = new Task();
MyThread tA = new MyThread("Thread-A", task);
MyThread tB = new MyThread("Thread-B", task);
MyThread tC = new MyThread("Thread-C", task);
tA.start();
tB.start();
tC.start();
}
static class MyThread extends Thread {
private Task task;
public MyThread(String name, Task task) {
super(name);
this.task = task;
}
@Override
public void run() {
task.execute();
}
}
static class Task {
private ReentrantLock lock = new ReentrantLock();
public void execute() {
lock.lock();
try {
for (int i = 0; i < 3; i++) {
System.out.println(lock.toString());
// 查詢當前線程 hold 住此鎖的次數(shù)
System.out.println("\t holdCount: " + lock.getHoldCount());
// 查詢正等待獲取此鎖的線程數(shù)
System.out.println("\t queuedLength: " + lock.getQueueLength());
// 是否為公平鎖
System.out.println("\t isFair: " + lock.isFair());
// 是否被鎖住
System.out.println("\t isLocked: " + lock.isLocked());
// 是否被當前線程持有鎖
System.out.println("\t isHeldByCurrentThread: " + lock.isHeldByCurrentThread());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
lock.unlock();
}
}
}
}
輸出結果:
java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-A]
holdCount: 1
queuedLength: 2
isFair: false
isLocked: true
isHeldByCurrentThread: true
java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-C]
holdCount: 1
queuedLength: 1
isFair: false
isLocked: true
isHeldByCurrentThread: true
// ...
tryLock 方法
與無條件獲取鎖相比介劫,tryLock 有更完善的容錯機制徽惋。
-
tryLock()
- 可輪詢獲取鎖。如果成功座韵,則返回 true险绘;如果失敗,則返回 false。也就是說宦棺,這個方法無論成敗都會立即返回瓣距,獲取不到鎖(鎖已被其他線程獲取)時不會一直等待渺氧。 -
tryLock(long, TimeUnit)
- 可定時獲取鎖旨涝。和tryLock()
類似,區(qū)別僅在于這個方法在獲取不到鎖時會等待一定的時間侣背,在時間期限之內如果還獲取不到鎖白华,就返回 false。如果如果一開始拿到鎖或者在等待期間內拿到了鎖贩耐,則返回 true弧腥。
示例:ReentrantLock
的 tryLock()
操作
修改上個示例中的 execute()
方法
public void execute() {
if (lock.tryLock()) {
try {
for (int i = 0; i < 3; i++) {
// 略...
}
} finally {
lock.unlock();
}
} else {
System.out.println(Thread.currentThread().getName() + " 獲取鎖失敗");
}
}
示例:ReentrantLock
的 tryLock(long, TimeUnit)
操作
修改上個示例中的 execute()
方法
public void execute() {
try {
if (lock.tryLock(2, TimeUnit.SECONDS)) {
try {
for (int i = 0; i < 3; i++) {
// 略...
}
} finally {
lock.unlock();
}
} else {
System.out.println(Thread.currentThread().getName() + " 獲取鎖失敗");
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " 獲取鎖超時");
e.printStackTrace();
}
}
lockInterruptibly 方法
-
lockInterruptibly()
- 可中斷獲取鎖〕碧可中斷獲取鎖可以在獲得鎖的同時保持對中斷的響應管搪。可中斷獲取鎖比其它獲取鎖的方式稍微復雜一些铡买,需要兩個try-catch
塊(如果在獲取鎖的操作中拋出了InterruptedException
更鲁,那么可以使用標準的try-finally
加鎖模式)。- 舉例來說:假設有兩個線程同時通過
lock.lockInterruptibly()
獲取某個鎖時奇钞,若線程 A 獲取到了鎖澡为,則線程 B 只能等待。若此時對線程 B 調用threadB.interrupt()
方法能夠中斷線程 B 的等待過程景埃。由于lockInterruptibly()
的聲明中拋出了異常媒至,所以lock.lockInterruptibly()
必須放在try
塊中或者在調用lockInterruptibly()
的方法外聲明拋出InterruptedException
。
- 舉例來說:假設有兩個線程同時通過
:bell: 注意:當一個線程獲取了鎖之后谷徙,是不會被
interrupt()
方法中斷的拒啰。單獨調用interrupt()
方法不能中斷正在運行狀態(tài)中的線程,只能中斷阻塞狀態(tài)中的線程完慧。因此當通過lockInterruptibly()
方法獲取某個鎖時谋旦,如果未獲取到鎖,只有在等待的狀態(tài)下屈尼,才可以響應中斷蛤织。
示例:ReentrantLock
的 lockInterruptibly()
操作
修改上個示例中的 execute()
方法
public void execute() {
try {
lock.lockInterruptibly();
for (int i = 0; i < 3; i++) {
// 略...
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "被中斷");
e.printStackTrace();
} finally {
lock.unlock();
}
}
newCondition 方法
newCondition()
- 返回一個綁定到 Lock
對象上的 Condition
實例。Condition
的特性和具體方法請閱讀下文 Condition
鸿染。
ReentrantLock 的原理
ReentrantLock 的數(shù)據(jù)結構
閱讀 ReentrantLock
的源碼,可以發(fā)現(xiàn)它有一個核心字段:
private final Sync sync;
-
sync
- 內部抽象類ReentrantLock.Sync
對象乞巧,Sync
繼承自 AQS涨椒。它有兩個子類: -
ReentrantLock.FairSync
- 公平鎖。 -
ReentrantLock.NonfairSync
- 非公平鎖。
查看源碼可以發(fā)現(xiàn)蚕冬,ReentrantLock
實現(xiàn) Lock
接口其實是調用 ReentrantLock.FairSync
或 ReentrantLock.NonfairSync
中各自的實現(xiàn)免猾,這里不一一列舉。
ReentrantLock 的獲取鎖和釋放鎖
ReentrantLock 獲取鎖和釋放鎖的接口囤热,從表象看猎提,是調用 ReentrantLock.FairSync
或 ReentrantLock.NonfairSync
中各自的實現(xiàn);從本質上看旁蔼,是基于 AQS 的實現(xiàn)锨苏。
仔細閱讀源碼很容易發(fā)現(xiàn):
void lock()
調用 Sync 的 lock() 方法。void lockInterruptibly()
直接調用 AQS 的 獲取可中斷的獨占鎖 方法lockInterruptibly()
棺聊。boolean tryLock()
調用 Sync 的nonfairTryAcquire()
伞租。boolean tryLock(long time, TimeUnit unit)
直接調用 AQS 的 獲取超時等待式的獨占鎖 方法tryAcquireNanos(int arg, long nanosTimeout)
。void unlock()
直接調用 AQS 的 釋放獨占鎖 方法release(int arg)
限佩。
直接調用 AQS 接口的方法就不再贅述了葵诈,其原理在 [AQS 的原理](#AQS 的原理) 中已經用很大篇幅進行過講解。
nonfairTryAcquire
方法源碼如下:
// 公平鎖和非公平鎖都會用這個方法區(qū)嘗試獲取鎖
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
// 如果同步狀態(tài)為0祟同,將其設為 acquires作喘,并設置當前線程為排它線程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
處理流程很簡單:
- 如果同步狀態(tài)為 0,設置同步狀態(tài)設為 acquires晕城,并設置當前線程為排它線程泞坦,然后返回 true,獲取鎖成功广辰。
- 如果同步狀態(tài)不為 0 且當前線程為排它線程暇矫,設置同步狀態(tài)為當前狀態(tài)值+acquires 值,然后返回 true择吊,獲取鎖成功李根。
- 否則,返回 false几睛,獲取鎖失敗房轿。
lock 方法在公平鎖和非公平鎖中的實現(xiàn):
二者的區(qū)別僅在于申請非公平鎖時,如果同步狀態(tài)為 0所森,嘗試將其設為 1囱持,如果成功,直接將當前線程置為排它線程焕济;否則和公平鎖一樣纷妆,調用 AQS 獲取獨占鎖方法 acquire
。
// 非公平鎖實現(xiàn)
final void lock() {
if (compareAndSetState(0, 1))
// 如果同步狀態(tài)為0晴弃,將其設為1掩幢,并設置當前線程為排它線程
setExclusiveOwnerThread(Thread.currentThread());
else
// 調用 AQS 獲取獨占鎖方法 acquire
acquire(1);
}
// 公平鎖實現(xiàn)
final void lock() {
// 調用 AQS 獲取獨占鎖方法 acquire
acquire(1);
}
四逊拍、ReentrantReadWriteLock
ReentrantReadWriteLock
類是ReadWriteLock
接口的具體實現(xiàn),它是一個可重入的讀寫鎖际邻。ReentrantReadWriteLock
維護了一對讀寫鎖芯丧,將讀寫鎖分開,有利于提高并發(fā)效率世曾。
ReentrantLock
實現(xiàn)了一種標準的互斥鎖:每次最多只有一個線程能持有ReentrantLock
缨恒。但對于維護數(shù)據(jù)的完整性來說,互斥通常是一種過于強硬的加鎖策略轮听,因此也就不必要地限制了并發(fā)性骗露。大多數(shù)場景下,讀操作比寫操作頻繁蕊程,只要保證每個線程都能讀取到最新數(shù)據(jù)椒袍,并且在讀數(shù)據(jù)時不會有其它線程在修改數(shù)據(jù),那么就不會出現(xiàn)線程安全問題藻茂。這種策略減少了互斥同步驹暑,自然也提升了并發(fā)性能,ReentrantReadWriteLock
就是這種策略的具體實現(xiàn)辨赐。
ReentrantReadWriteLock 的特性
ReentrantReadWriteLock 的特性如下:
-
ReentrantReadWriteLock
適用于讀多寫少的場景优俘。如果是寫多讀少的場景,由于ReentrantReadWriteLock
其內部實現(xiàn)比ReentrantLock
復雜掀序,性能可能反而要差一些帆焕。如果存在這樣的問題,需要具體問題具體分析不恭。由于ReentrantReadWriteLock
的讀寫鎖(ReadLock
叶雹、WriteLock
)都實現(xiàn)了Lock
接口,所以要替換為ReentrantLock
也較為容易换吧。 -
ReentrantReadWriteLock
實現(xiàn)了ReadWriteLock
接口折晦,支持了ReentrantLock
所不具備的讀寫鎖分離。ReentrantReadWriteLock
維護了一對讀寫鎖(ReadLock
沾瓦、WriteLock
)满着。將讀寫鎖分開,有利于提高并發(fā)效率贯莺。ReentrantReadWriteLock
的加鎖策略是:允許多個讀操作并發(fā)執(zhí)行风喇,但每次只允許一個寫操作。 -
ReentrantReadWriteLock
為讀寫鎖都提供了可重入的加鎖語義缕探。 -
ReentrantReadWriteLock
支持公平鎖和非公平鎖(默認)兩種模式魂莫。
ReadWriteLock
接口定義如下:
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
-
readLock
- 返回用于讀操作的鎖(ReadLock
)。 -
writeLock
- 返回用于寫操作的鎖(WriteLock
)爹耗。
在讀寫鎖和寫入鎖之間的交互可以采用多種實現(xiàn)方式耙考,ReadWriteLock
的一些可選實現(xiàn)包括:
- 釋放優(yōu)先 - 當一個寫入操作釋放寫鎖秽誊,并且隊列中同時存在讀線程和寫線程,那么應該優(yōu)先選擇讀線程琳骡、寫線程,還是最先發(fā)出請求的線程讼溺?
- 讀線程插隊 - 如果鎖是由讀線程持有楣号,但有寫線程正在等待,那么新到達的讀線程能否立即獲得訪問權怒坯,還是應該在寫線程后面等待炫狱?如果允許讀線程插隊到寫線程之前,那么將提高并發(fā)性剔猿,但可能造成線程饑餓問題视译。
- 重入性 - 讀鎖和寫鎖是否是可重入的?
- 降級 - 如果一個線程持有寫入鎖归敬,那么它能否在不釋放該鎖的情況下獲得讀鎖酷含?這可能會使得寫鎖被降級為讀鎖,同時不允許其他寫線程修改被保護的資源汪茧。
- 升級 - 讀鎖能否優(yōu)先于其他正在等待的讀線程和寫線程而升級為一個寫鎖椅亚?在大多數(shù)的讀寫鎖實現(xiàn)中并不支持升級,因為如果沒有顯式的升級操作舱污,那么很容易造成死鎖呀舔。
ReentrantReadWriteLock 的用法
前文了解了 ReentrantReadWriteLock
的特性,接下來扩灯,我們要講述其具體用法媚赖。
ReentrantReadWriteLock 的構造方法
ReentrantReadWriteLock
和 ReentrantLock
一樣,也有兩個構造方法珠插,且用法相似惧磺。
public ReentrantReadWriteLock() {}
public ReentrantReadWriteLock(boolean fair) {}
-
ReentrantReadWriteLock()
- 默認構造方法會初始化一個非公平鎖(NonfairSync)。在非公平的鎖中丧失,線程獲得鎖的順序是不確定的豺妓。寫線程降級為讀線程是可以的,但讀線程升級為寫線程是不可以的(這樣會導致死鎖)布讹。 -
ReentrantReadWriteLock(boolean)
-new ReentrantLock(true)
會初始化一個公平鎖(FairSync)琳拭。對于公平鎖,等待時間最長的線程將優(yōu)先獲得鎖描验。如果這個鎖是讀線程持有白嘁,則另一個線程請求寫鎖,那么其他讀線程都不能獲得讀鎖膘流,直到寫線程釋放寫鎖絮缅。
ReentrantReadWriteLock 的使用實例
在 ReentrantReadWriteLock
的特性 中已經介紹過鲁沥,ReentrantReadWriteLock
的讀寫鎖(ReadLock
、WriteLock
)都實現(xiàn)了 Lock
接口耕魄,所以其各自獨立的使用方式與 ReentrantLock
一樣画恰,這里不再贅述。
ReentrantReadWriteLock
與 ReentrantLock
用法上的差異吸奴,主要在于讀寫鎖的配合使用允扇。本文以一個典型使用場景來進行講解。
示例:基于 ReentrantReadWriteLock
實現(xiàn)一個簡單的本地緩存
/**
* 簡單的無界緩存實現(xiàn)
* <p>
* 使用 WeakHashMap 存儲鍵值對则奥。WeakHashMap 中存儲的對象是弱引用考润,JVM GC 時會自動清除沒有被引用的弱引用對象。
*/
static class UnboundedCache<K, V> {
private final Map<K, V> cacheMap = new WeakHashMap<>();
private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
public V get(K key) {
cacheLock.readLock().lock();
V value;
try {
value = cacheMap.get(key);
String log = String.format("%s 讀數(shù)據(jù) %s:%s", Thread.currentThread().getName(), key, value);
System.out.println(log);
} finally {
cacheLock.readLock().unlock();
}
return value;
}
public V put(K key, V value) {
cacheLock.writeLock().lock();
try {
cacheMap.put(key, value);
String log = String.format("%s 寫入數(shù)據(jù) %s:%s", Thread.currentThread().getName(), key, value);
System.out.println(log);
} finally {
cacheLock.writeLock().unlock();
}
return value;
}
public V remove(K key) {
cacheLock.writeLock().lock();
try {
return cacheMap.remove(key);
} finally {
cacheLock.writeLock().unlock();
}
}
public void clear() {
cacheLock.writeLock().lock();
try {
this.cacheMap.clear();
} finally {
cacheLock.writeLock().unlock();
}
}
}
說明:
- 使用
WeakHashMap
而不是HashMap
來存儲鍵值對读处。WeakHashMap
中存儲的對象是弱引用糊治,JVM GC 時會自動清除沒有被引用的弱引用對象。 - 向
Map
寫數(shù)據(jù)前加寫鎖罚舱,寫完后井辜,釋放寫鎖。 - 向
Map
讀數(shù)據(jù)前加讀鎖馆匿,讀完后抑胎,釋放讀鎖。
測試其線程安全性:
/**
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @since 2020-01-01
*/
public class ReentrantReadWriteLockDemo {
static UnboundedCache<Integer, Integer> cache = new UnboundedCache<>();
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 20; i++) {
executorService.execute(new MyThread());
cache.get(0);
}
executorService.shutdown();
}
/** 線程任務每次向緩存中寫入 3 個隨機值渐北,key 固定 */
static class MyThread implements Runnable {
@Override
public void run() {
Random random = new Random();
for (int i = 0; i < 3; i++) {
cache.put(i, random.nextInt(100));
}
}
}
}
說明:示例中阿逃,通過線程池啟動 20 個并發(fā)任務。任務每次向緩存中寫入 3 個隨機值赃蛛,key 固定恃锉;然后主線程每次固定讀取緩存中第一個 key 的值。
輸出結果:
main 讀數(shù)據(jù) 0:null
pool-1-thread-1 寫入數(shù)據(jù) 0:16
pool-1-thread-1 寫入數(shù)據(jù) 1:58
pool-1-thread-1 寫入數(shù)據(jù) 2:50
main 讀數(shù)據(jù) 0:16
pool-1-thread-1 寫入數(shù)據(jù) 0:85
pool-1-thread-1 寫入數(shù)據(jù) 1:76
pool-1-thread-1 寫入數(shù)據(jù) 2:46
pool-1-thread-2 寫入數(shù)據(jù) 0:21
pool-1-thread-2 寫入數(shù)據(jù) 1:41
pool-1-thread-2 寫入數(shù)據(jù) 2:63
main 讀數(shù)據(jù) 0:21
main 讀數(shù)據(jù) 0:21
// ...
ReentrantReadWriteLock 的原理
前面了解了 ReentrantLock
的原理呕臂,理解 ReentrantReadWriteLock
就容易多了破托。
ReentrantReadWriteLock 的數(shù)據(jù)結構
閱讀 ReentrantReadWriteLock 的源碼,可以發(fā)現(xiàn)它有三個核心字段:
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
-
sync
- 內部類ReentrantReadWriteLock.Sync
對象歧蒋。與ReentrantLock
類似土砂,它有兩個子類:ReentrantReadWriteLock.FairSync
和ReentrantReadWriteLock.NonfairSync
,分別表示公平鎖和非公平鎖的實現(xiàn)谜洽。 -
readerLock
- 內部類ReentrantReadWriteLock.ReadLock
對象萝映,這是一把讀鎖。 -
writerLock
- 內部類ReentrantReadWriteLock.WriteLock
對象阐虚,這是一把寫鎖序臂。
ReentrantReadWriteLock 的獲取鎖和釋放鎖
public static class ReadLock implements Lock, java.io.Serializable {
// 調用 AQS 獲取共享鎖方法
public void lock() {
sync.acquireShared(1);
}
// 調用 AQS 釋放共享鎖方法
public void unlock() {
sync.releaseShared(1);
}
}
public static class WriteLock implements Lock, java.io.Serializable {
// 調用 AQS 獲取獨占鎖方法
public void lock() {
sync.acquire(1);
}
// 調用 AQS 釋放獨占鎖方法
public void unlock() {
sync.release(1);
}
}
五、Condition
前文中提過 Lock
接口中 有一個 newCondition()
方法用于返回一個綁定到 Lock
對象上的 Condition
實例实束。Condition
是什么奥秆?有什么作用逊彭?本節(jié)將一一講解。
在單線程中构订,一段代碼的執(zhí)行可能依賴于某個狀態(tài)侮叮,如果不滿足狀態(tài)條件,代碼就不會被執(zhí)行(典型的場景悼瘾,如:if ... else ...)签赃。在并發(fā)環(huán)境中,當一個線程判斷某個狀態(tài)條件時分尸,其狀態(tài)可能是由于其他線程的操作而改變,這時就需要有一定的協(xié)調機制來確保在同一時刻歹嘹,數(shù)據(jù)只能被一個線程鎖修改箩绍,且修改的數(shù)據(jù)狀態(tài)被所有線程所感知。
Java 1.5 之前尺上,主要是利用 Object
類中的 wait
材蛛、notify
、notifyAll
配合 synchronized
來進行線程間通信(如果不了解其特性怎抛,可以參考:Java 線程基礎 - wait/notify/notifyAll)卑吭。
wait
、notify
马绝、notifyAll
需要配合 synchronized
使用豆赏,不適用于 Lock
。而使用 Lock
的線程富稻,彼此間通信應該使用 Condition
掷邦。這可以理解為,什么樣的鎖配什么樣的鑰匙椭赋。內置鎖(synchronized
)配合內置條件隊列(wait
抚岗、notify
、notifyAll
)哪怔,顯式鎖(Lock
)配合顯式條件隊列(Condition
)宣蔚。
Condition 的特性
Condition
接口定義如下:
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
其中,await
认境、signal
胚委、signalAll
與 wait
、notify
元暴、notifyAll
相對應篷扩,功能也相似。除此以外茉盏,Condition
相比內置條件隊列( wait
鉴未、notify
枢冤、notifyAll
),提供了更為豐富的功能:
- 每個鎖(
Lock
)上可以存在多個Condition
铜秆,這意味著鎖的狀態(tài)條件可以有多個淹真。 - 支持公平的或非公平的隊列操作。
- 支持可中斷的條件等待连茧,相關方法:
awaitUninterruptibly()
核蘸。 - 支持可定時的等待,相關方法:
awaitNanos(long)
、await(long, TimeUnit)
饶套、awaitUntil(Date)
胧瓜。
Condition 的用法
這里以 Condition
來實現(xiàn)一個消費者、生產者模式徙鱼。
:bell: 注意:事實上,解決此類問題使用
CountDownLatch
针姿、Semaphore
等工具更為便捷袱吆、安全。想了解詳情距淫,可以參考 Java 并發(fā)工具類 绞绒。
產品類
class Message {
private final Lock lock = new ReentrantLock();
private final Condition producedMsg = lock.newCondition();
private final Condition consumedMsg = lock.newCondition();
private String message;
private boolean state;
private boolean end;
public void consume() {
//lock
lock.lock();
try {
// no new message wait for new message
while (!state) { producedMsg.await(); }
System.out.println("consume message : " + message);
state = false;
// message consumed, notify waiting thread
consumedMsg.signal();
} catch (InterruptedException ie) {
System.out.println("Thread interrupted - viewMessage");
} finally {
lock.unlock();
}
}
public void produce(String message) {
lock.lock();
try {
// last message not consumed, wait for it be consumed
while (state) { consumedMsg.await(); }
System.out.println("produce msg: " + message);
this.message = message;
state = true;
// new message added, notify waiting thread
producedMsg.signal();
} catch (InterruptedException ie) {
System.out.println("Thread interrupted - publishMessage");
} finally {
lock.unlock();
}
}
public boolean isEnd() {
return end;
}
public void setEnd(boolean end) {
this.end = end;
}
}
消費者
class MessageConsumer implements Runnable {
private Message message;
public MessageConsumer(Message msg) {
message = msg;
}
@Override
public void run() {
while (!message.isEnd()) { message.consume(); }
}
}
生產者
class MessageProducer implements Runnable {
private Message message;
public MessageProducer(Message msg) {
message = msg;
}
@Override
public void run() {
produce();
}
public void produce() {
List<String> msgs = new ArrayList<>();
msgs.add("Begin");
msgs.add("Msg1");
msgs.add("Msg2");
for (String msg : msgs) {
message.produce(msg);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
message.produce("End");
message.setEnd(true);
}
}
測試
public class LockConditionDemo {
public static void main(String[] args) {
Message msg = new Message();
Thread producer = new Thread(new MessageProducer(msg));
Thread consumer = new Thread(new MessageConsumer(msg));
producer.start();
consumer.start();
}
}