Java并發(fā)編程源碼分析系列:
前幾篇文章分析了線程池的原理竞滓,接下來研究鎖的方面曲秉。顯式鎖ReentrantLock和同步工具類的實現(xiàn)基礎(chǔ)都是AQS速侈,所以合起來一齊研究。
什么是AQS
AQS即是AbstractQueuedSynchronizer,一個用來構(gòu)建鎖和同步工具的框架,包括常用的ReentrantLock、CountDownLatch粱挡、Semaphore等。
AQS沒有鎖之類的概念俄精,它有個state變量询筏,是個int類型,在不同場合有著不同含義竖慧。本文研究的是鎖嫌套,為了好理解,姑且先把state當(dāng)成鎖圾旨。
AQS圍繞state提供兩種基本操作“獲取”和“釋放”踱讨,有條雙向隊列存放阻塞的等待線程,并提供一系列判斷和處理方法砍的,簡單說幾點:
- state是獨占的痹筛,還是共享的;
- state被獲取后挨约,其他線程需要等待味混;
- state被釋放后,喚醒等待線程诫惭;
- 線程等不及時翁锡,如何退出等待。
至于線程是否可以獲得state夕土,如何釋放state馆衔,就不是AQS關(guān)心的了,要由子類具體實現(xiàn)怨绣。
直接分析AQS的代碼會比較難明白角溃,所以結(jié)合子類ReentrantLock來分析。AQS的功能可以分為獨占和共享篮撑,ReentrantLock實現(xiàn)了獨占功能减细,是本文分析的目標(biāo)。
ReentrantLock對比synchronized
Lock lock = new ReentranLock();
lock.lock();
try{
//do something
}finally{
lock.unlock();
}
ReentrantLock實現(xiàn)了Lock接口赢笨,加鎖和解鎖都需要顯式寫出未蝌,注意一定要在適當(dāng)時候unlock。
和synchronized相比茧妒,ReentrantLock用起來會復(fù)雜一些萧吠。在基本的加鎖和解鎖上,兩者是一樣的桐筏,所以無特殊情況下纸型,推薦使用synchronized。ReentrantLock的優(yōu)勢在于它更靈活、更強(qiáng)大狰腌,除了常規(guī)的lock()除破、unlock()之外,還有l(wèi)ockInterruptibly()癌别、tryLock()方法皂岔,支持中斷、超時展姐。
公平鎖和非公平鎖
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
ReentrantLock的內(nèi)部類Sync繼承了AQS躁垛,分為公平鎖FairSync和非公平鎖NonfairSync。
- 公平鎖:線程獲取鎖的順序和調(diào)用lock的順序一樣圾笨,F(xiàn)IFO教馆;
- 非公平鎖:線程獲取鎖的順序和調(diào)用lock的順序無關(guān),全憑運(yùn)氣擂达。
ReentrantLock默認(rèn)使用非公平鎖是基于性能考慮土铺,公平鎖為了保證線程規(guī)規(guī)矩矩地排隊,需要增加阻塞和喚醒的時間開銷板鬓。如果直接插隊獲取非公平鎖悲敷,跳過了對隊列的處理,速度會更快俭令。
嘗試獲取鎖
final void lock() { acquire(1);}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
先來看公平鎖的實現(xiàn)后德,lock方法很簡單的一句話調(diào)用AQS的acquire方法:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
噢,AQS的tryAcquire不能直接調(diào)用抄腔,因為是否獲取鎖成功是由子類決定的瓢湃,直接看ReentrantLock的tryAcquire的實現(xiàn)。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
獲取鎖成功分為兩種情況赫蛇,第一個if判斷AQS的state是否等于0绵患,表示鎖沒有人占有。接著悟耘,hasQueuedPredecessors判斷隊列是否有排在前面的線程在等待鎖落蝙,沒有的話調(diào)用compareAndSetState使用cas的方式修改state,傳入的acquires寫死是1暂幼。最后線程獲取鎖成功掘殴,setExclusiveOwnerThread將線程記錄為獨占鎖的線程。
第二個if判斷當(dāng)前線程是否為獨占鎖的線程粟誓,因為ReentrantLock是可重入的,線程可以不停地lock來增加state的值起意,對應(yīng)地需要unlock來解鎖鹰服,直到state為零。
如果最后獲取鎖失敗,下一步需要將線程加入到等待隊列悲酷。
線程進(jìn)入等待隊列
AQS內(nèi)部有一條雙向的隊列存放等待線程套菜,節(jié)點是Node對象。每個Node維護(hù)了線程设易、前后Node的指針和等待狀態(tài)等參數(shù)逗柴。
線程在加入隊列之前,需要包裝進(jìn)Node顿肺,調(diào)用方法是addWaiter:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
每個Node需要標(biāo)記是獨占的還是共享的戏溺,由傳入的mode決定,ReentrantLock自然是使用獨占模式Node.EXCLUSIVE屠尊。
創(chuàng)建好Node后旷祸,如果隊列不為空,使用cas的方式將Node加入到隊列尾讼昆。注意托享,這里只執(zhí)行了一次修改操作,并且可能因為并發(fā)的原因失敗浸赫。因此修改失敗的情況和隊列為空的情況闰围,需要進(jìn)入enq。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq是個死循環(huán)既峡,保證Node一定能插入隊列羡榴。注意到,當(dāng)隊列為空時涧狮,會先為頭節(jié)點創(chuàng)建一個空的Node炕矮,因為頭節(jié)點代表獲取了鎖的線程,現(xiàn)在還沒有者冤,所以先空著肤视。
阻塞等待線程
線程加入隊列后,下一步是調(diào)用acquireQueued阻塞線程涉枫。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//1
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//2
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
標(biāo)記1是線程喚醒后嘗試獲取鎖的過程邢滑。如果前一個節(jié)點正好是head,表示自己排在第一位愿汰,可以馬上調(diào)用tryAcquire嘗試困后。如果獲取成功就簡單了,直接修改自己為head衬廷。這步是實現(xiàn)公平鎖的核心摇予,保證釋放鎖時,由下個排隊線程獲取鎖吗跋。(看到線程解鎖時侧戴,再看回這里啦)
標(biāo)記2是線程獲取鎖失敗的處理宁昭。這個時候,線程可能等著下一次獲取酗宋,也可能不想要了积仗,Node變量waitState描述了線程的等待狀態(tài),一共四種情況:
static final int CANCELLED = 1; //取消
static final int SIGNAL = -1; //下個節(jié)點需要被喚醒
static final int CONDITION = -2; //線程在等待條件觸發(fā)
static final int PROPAGATE = -3; //(共享鎖)狀態(tài)需要向后傳播
shouldParkAfterFailedAcquire傳入當(dāng)前節(jié)點和前節(jié)點蜕猫,根據(jù)前節(jié)點的狀態(tài)寂曹,判斷線程是否需要阻塞。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
- 前節(jié)點狀態(tài)是SIGNAL時回右,當(dāng)前線程需要阻塞隆圆;
- 前節(jié)點狀態(tài)是CANCELLED時,通過循環(huán)將當(dāng)前節(jié)點之前所有取消狀態(tài)的節(jié)點移出隊列楣黍;
- 前節(jié)點狀態(tài)是其他狀態(tài)時匾灶,需要設(shè)置前節(jié)點為SIGNAL。
如果線程需要阻塞租漂,由parkAndCheckInterrupt方法進(jìn)行操作阶女。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
parkAndCheckInterrupt使用了LockSupport,和cas一樣哩治,最終使用UNSAFE調(diào)用Native方法實現(xiàn)線程阻塞(以后有機(jī)會就分析下LockSupport的原理秃踩,park和unpark方法作用類似于wait和notify)。最后返回線程喚醒后的中斷狀態(tài)业筏,關(guān)于中斷憔杨,后文會分析。
到這里總結(jié)一下獲取鎖的過程:線程去競爭一個鎖蒜胖,可能成功也可能失敗消别。成功就直接持有資源,不需要進(jìn)入隊列台谢;失敗的話進(jìn)入隊列阻塞寻狂,等待喚醒后再嘗試競爭鎖。
釋放鎖
通過上面詳細(xì)的獲取鎖過程分析朋沮,釋放鎖過程大概可以猜到:頭節(jié)點是獲取鎖的線程蛇券,先移出隊列,再通知后面的節(jié)點獲取鎖樊拓。
public void unlock() {
sync.release(1);
}
ReentrantLock的unlock方法很簡單地調(diào)用了AQS的release:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
和lock的tryAcquire一樣纠亚,unlock的tryRelease同樣由ReentrantLock實現(xiàn):
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
因為鎖是可以重入的,所以每次lock會讓state加1筋夏,對應(yīng)地每次unlock要讓state減1蒂胞,直到為0時將獨占線程變量設(shè)置為空,返回標(biāo)記是否徹底釋放鎖条篷。
最后啤誊,調(diào)用unparkSuccessor將頭節(jié)點的下個節(jié)點喚醒:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
尋找下個待喚醒的線程是從隊列尾向前查詢的岳瞭,找到線程后調(diào)用LockSupport的unpark方法喚醒線程。被喚醒的線程重新執(zhí)行acquireQueued里的循環(huán)蚊锹,就是上文關(guān)于acquireQueued標(biāo)記1部分,線程重新嘗試獲取鎖稚瘾。
中斷鎖
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
在acquire里還有最后一句代碼調(diào)用了selfInterrupt牡昆,功能很簡單,對當(dāng)前線程產(chǎn)生一個中斷請求摊欠。
為什么要這樣操作呢丢烘?因為LockSupport.park阻塞線程后,有兩種可能被喚醒些椒。
第一種情況播瞳,前節(jié)點是頭節(jié)點,釋放鎖后免糕,會調(diào)用LockSupport.unpark喚醒當(dāng)前線程赢乓。整個過程沒有涉及到中斷,最終acquireQueued返回false時石窑,不需要調(diào)用selfInterrupt牌芋。
第二種情況,LockSupport.park支持響應(yīng)中斷請求松逊,能夠被其他線程通過interrupt()喚醒躺屁。但這種喚醒并沒有用,因為線程前面可能還有等待線程经宏,在acquireQueued的循環(huán)里犀暑,線程會再次被阻塞。parkAndCheckInterrupt返回的是Thread.interrupted()烁兰,不僅返回中斷狀態(tài)耐亏,還會清除中斷狀態(tài),保證阻塞線程忽略中斷缚柏。最終acquireQueued返回true時苹熏,真正的中斷狀態(tài)已經(jīng)被清除,需要調(diào)用selfInterrupt維持中斷狀態(tài)币喧。
因此普通的lock方法并不能被其他線程中斷轨域,ReentrantLock是可以支持中斷,需要使用lockInterruptibly杀餐。
兩者的邏輯基本一樣干发,不同之處是parkAndCheckInterrupt返回true時,lockInterruptibly直接throw new InterruptedException()史翘。
非公平鎖
分析完公平鎖的實現(xiàn)枉长,還剩下非公平鎖冀续,主要區(qū)別是獲取鎖的過程不同。
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
在NonfairSync的lock方法里必峰,第一步直接嘗試將state修改為1洪唐,很明顯,這是搶先獲取鎖的過程吼蚁。如果修改state失敗凭需,則和公平鎖一樣,調(diào)用acquire肝匆。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
nonfairTryAcquire和tryAcquire乍一看幾乎一樣粒蜈,差異只是缺少調(diào)用hasQueuedPredecessors。這點體驗出公平鎖和非公平鎖的不同旗国,公平鎖會關(guān)注隊列里排隊的情況枯怖,老老實實按照FIFO的次序;非公平鎖只要有機(jī)會就搶占能曾,才不管排隊的事度硝。
總結(jié)
從ReentrantLock的實現(xiàn)完整分析了AQS的獨占功能,總的來講并不復(fù)雜借浊。別忘了AQS還有共享功能塘淑,下一篇是--分析CountDownLatch的實現(xiàn)原理。