在java.util.concurrent.locks包中有很多Lock的實(shí)現(xiàn)類, 常用的有ReentrantLock, ReadWriteLock(實(shí)現(xiàn)類ReentrantReadWriteLock). 這些鎖的實(shí)現(xiàn)思路都大同小異, 都依賴java.util.concurrent.AbstractQueuedSynchronizer類, 下面分析最常用的ReentrantLock的實(shí)現(xiàn).
ReentrantLock
ReentrantLock是一種可重入鎖(若是在同一個(gè)線程上請(qǐng)求鎖,則允許獲得鎖).
以下是官方說(shuō)明:一個(gè)可重入的互斥鎖定 Lock, 它具有與使用 synchronized 方法和語(yǔ)句所訪問(wèn)的隱式監(jiān)視器鎖定相同的一些基本行為和語(yǔ)義, 但功能更強(qiáng)大. ReentrantLock將由最近成功獲得鎖定, 并且還沒(méi)有釋放該鎖定的線程所擁有. 當(dāng)鎖定沒(méi)有被另一個(gè)線程所擁有時(shí), 調(diào)用 lock 的線程將成功獲取該鎖定并返回. 如果當(dāng)前線程已經(jīng)擁有該鎖定, 此方法將立即返回true. 可以使用 isHeldByCurrentThread() 和 getHoldCount() 方法來(lái)檢查此情況是否發(fā)生.
本文主要分析以下幾個(gè)過(guò)程:
ReentrantLock的創(chuàng)建(公平鎖/非公平鎖)
上鎖: lock()
解鎖: unlock()
ReentrantLock的內(nèi)部主要結(jié)構(gòu)如下:
ReentrantLock-->Lock
NonfairSync/FairSync-->Sync-->AbstractQueuedSynchronizer-->AbstractOwnableSynchronizer
注意: 邊這四條線, 對(duì)應(yīng)關(guān)系: "子類"-->"父類". 其中, NonfairSync, FairSync和Sync時(shí)ReentrantLock的三個(gè)內(nèi)部類.
ReentrantLock的創(chuàng)建
ReentrantLock支持兩種鎖, 公平鎖和非公平鎖, 公平鎖嚴(yán)格遵循(先進(jìn)來(lái)的線程先執(zhí)行), 非公平鎖總體原則上魚公平鎖一樣, 但在某些情況下, 有可能出現(xiàn)后進(jìn)來(lái)的線程獲得鎖的情況(所謂來(lái)得早不如來(lái)得巧).
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
默認(rèn)創(chuàng)建非公平鎖, 只有當(dāng)傳入?yún)?shù)true時(shí), 才會(huì)創(chuàng)建公平鎖.
上述源代碼中出現(xiàn)了三個(gè)內(nèi)部類Sync/NonfairSync/FairSync, 下面是三個(gè)類的定義.
/**
* 該鎖同步控制的一個(gè)基類.下邊有兩個(gè)子類:非公平機(jī)制和公平機(jī)制.使用了AbstractQueuedSynchronizer類的
*/
static abstract class Sync extends AbstractQueuedSynchronizer
/**
* 非公平鎖同步器
*/
final static class NonfairSync extends Sync
/**
* 公平鎖同步器
*/
final static class FairSync extends Sync
lock
ReentrantLock:lock()
public void lock() {
sync.lock();
}
也就是說(shuō)有內(nèi)部的同步器來(lái)決定.
非公平鎖的lock
NonfairSync: lock()
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
主要流程如下:
1)首先基于CAS將state(鎖數(shù)量)從0設(shè)置為1, 如果設(shè)置成功, 設(shè)置當(dāng)前線程為獨(dú)占鎖的線程;(此處就是非公平鎖的一次插隊(duì)).
2)如果設(shè)置失敗(即當(dāng)前的鎖數(shù)量可能已經(jīng)為1了), 這個(gè)時(shí)候當(dāng)前線程執(zhí)行acquire(1)方法;
2.1)acquire(1)方法首先調(diào)用下邊的tryAcquire(1)方法(獲取鎖數(shù)量), 如果獲取成功, 則返回.(此處就是非公平鎖的第二次嘗試插隊(duì)). 如果失敗, 繼續(xù)執(zhí)行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法.
2.2)在執(zhí)行acquireQueued方法之前,會(huì)先執(zhí)行addWaiter方法,將當(dāng)前線程加入到等待獲取鎖的隊(duì)列的尾部.
2.2.1)addWaiter(Node.EXCLUSIVE)將當(dāng)前線程封裝進(jìn)Node節(jié)點(diǎn)node惑朦,然后將該節(jié)點(diǎn)加入等待隊(duì)列(先快速入隊(duì), 如果快速入隊(duì)不成功, 其使用正常入隊(duì)方法無(wú)限循環(huán)一直到Node節(jié)點(diǎn)入隊(duì)為止).
2.2.1.1)快速入隊(duì): 如果同步等待隊(duì)列存在尾節(jié)點(diǎn), 將使用CAS嘗試將尾節(jié)點(diǎn)設(shè)置為node, 并將之前的尾節(jié)點(diǎn)插入到node之前.
2.2.1.2)正常入隊(duì): 如果同步等待隊(duì)列不存在尾節(jié)點(diǎn)或者上述CAS嘗試不成功的話, 就執(zhí)行正常入隊(duì)(該方法是一個(gè)無(wú)限循環(huán)的過(guò)程, 即直到入隊(duì)為止). 如果尾節(jié)點(diǎn)為空(初始化同步等待隊(duì)列), 創(chuàng)建一個(gè)dummy節(jié)點(diǎn), 并將該節(jié)點(diǎn)通過(guò)CAS嘗試設(shè)置到頭節(jié)點(diǎn)上去, 設(shè)置成功的話, 將尾節(jié)點(diǎn)也指向該dummy節(jié)點(diǎn)(即頭節(jié)點(diǎn)和尾節(jié)點(diǎn)都指向該dummy節(jié)點(diǎn)). 如果尾節(jié)點(diǎn)不為空, 執(zhí)行與快速入隊(duì)相同的邏輯, 即使用CAS嘗試將尾節(jié)點(diǎn)設(shè)置為node, 并將之前的尾節(jié)點(diǎn)插入到node之前.
2.2.2)node節(jié)點(diǎn)入隊(duì)之后, 就去執(zhí)行acquireQueued(final Node node, int arg)(此處會(huì)出現(xiàn)阻塞).
2.2.2.1)獲取node的前驅(qū)節(jié)點(diǎn)p, 如果p是頭節(jié)點(diǎn), 就繼續(xù)使用tryAcquire(1)方法去嘗試請(qǐng)求成功. 如果第一次請(qǐng)求就成功, interrupted=false. 如果是之后的循環(huán)中將線程掛起之后被其他線程喚醒, interrupted=true. (注意p==head&&tryAcquire(1)成功是唯一跳出循環(huán)的方法).
2.2.2.2)如果p不是頭節(jié)點(diǎn), 或者tryAcquire(1)請(qǐng)求不成功,就去執(zhí)行shouldParkAfterFailedAcquire(Node pred, Node node)來(lái)檢測(cè)當(dāng)前節(jié)點(diǎn)是不是可以安全的被掛起. 是, 則調(diào)用parkAndCheckInterrupt方法掛起當(dāng)前線程. 否, 則繼續(xù)執(zhí)行2.2.2.1(這是一個(gè)死循環(huán)).
2.2.2.3)線程調(diào)用parkAndCheckInterrupt方法后會(huì)進(jìn)入阻塞狀態(tài),等待其他線程喚醒. 喚醒后會(huì)將interrupted置為true, 并且繼續(xù)執(zhí)行2.2.2.1.
2.2.2.4)若在這個(gè)循環(huán)中產(chǎn)生了異常, 我們就會(huì)執(zhí)行cancelAcquire(Node node)取消node的獲取鎖的意圖.
涉及到的方法:
AbstractQueuedSynchronizer: compareAndSetState()
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
AbstractQueuedSynchronizer: acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
NonfairSync: tryAcquire(int acquires)
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
Sync: nonfairTryAcquire(int acquires)
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
AbstractQueuedSynchronizer: addWaiter(Node mode)
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
AbstractQueuedSynchronizer: acquireQueued(final Node node, int arg)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
公平鎖的lock
FairSync: lock()
final void lock() {
acquire(1);
}
AbstractQueuedSynchronizer: acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
主要流程比非公平鎖簡(jiǎn)單很多:
1):如果鎖數(shù)量為0, 如果當(dāng)前線程是等待隊(duì)列中的頭節(jié)點(diǎn), 基于CAS嘗試將state(鎖數(shù)量)從0設(shè)置為1一次, 如果設(shè)置成功, 設(shè)置當(dāng)前線程為獨(dú)占鎖的線程;
2):如果鎖數(shù)量不為0或者當(dāng)前線程不是等待隊(duì)列中的頭節(jié)點(diǎn)或者上邊的嘗試又失敗了, 查看當(dāng)前線程是不是已經(jīng)是獨(dú)占鎖的線程了, 如果是, 則將當(dāng)前的鎖數(shù)量+1;如果不是, 則將該線程封裝在一個(gè)Node內(nèi), 并加入到等待隊(duì)列中去, 等待被其前一個(gè)線程節(jié)點(diǎn)喚醒.
相關(guān)函數(shù)如下.
FairSync: tryAcquire()
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
AbstractQueuedSynchronizer: hasQueuedPredecessors()
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
其他函數(shù)域非公平鎖的一樣.
非公平鎖的解鎖
ReentrantLock:unlock()
public void unlock() {
sync.release(1);
}
主要流程:
1)獲取當(dāng)前的鎖數(shù)量, 然后用這個(gè)鎖數(shù)量減去解鎖的數(shù)量(這里為1), 最后得出結(jié)果c.
2)判斷當(dāng)前線程是不是獨(dú)占鎖的線程, 如果不是, 拋出異常.
3)如果c==0, 說(shuō)明鎖被成功釋放, 將當(dāng)前的獨(dú)占線程置為null, 鎖數(shù)量置為0, 返回true.
4)如果c!=0, 說(shuō)明釋放鎖失敗, 鎖數(shù)量置為c, 返回false.
5)如果鎖被釋放成功的話, 喚醒距離頭節(jié)點(diǎn)最近的一個(gè)非取消的節(jié)點(diǎn).
涉及的代碼.
AbstractQueuedSynchronizer: release(int arg)
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
Sync: tryRelease(int releases)
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
公平鎖的解鎖
與非公平鎖一致