Java并發(fā)編程--ReentrantLock可重入性探索
我們直接先看其公平鎖情況下的可重入性到底是怎么回事,由于我們討論的是公平鎖的情況逐哈,而相關的代碼在ReentrantLock的內部類FairSync中芬迄。
1. lock()
public void lock() {
sync.lock();
}
由于是公平鎖,所以我們需要重FairSync中查看lock方法:
final void lock() {
acquire(1);
}
而這里的acquire方法繼承自AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先我們先提前說一下tryAcquire返回值是一個boolean昂秃,為true說明當前線程成功獲取了ReentrantLock的鎖禀梳,并且ReentrantLock鎖是一個獨占鎖,而這個if條件肠骆,如果成功獲取了鎖算途,那么acquire方法就直接返回了。
AQS已經為該方法做了方法的實現(xiàn)蚀腿,在FairSync中我們只要實現(xiàn)tryAcquire方法即可:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 說明當前鎖還未被持有
// hasQueuedPredecessors返回false的情況為:當前線程在等待隊列的頭部或者等待隊列為空
// 這就說明了:只有等待隊列的頭結點可以獲取鎖
// Q:什么情況下當前線程已經在頭結點了嘴瓤,但是還沒有獲取鎖扫外?
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 這里說明當前線程就是獨占的線程
int nextc = c + acquires; // 持有鎖的線程獲取鎖的次數,這也表明了可重入性
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc); // 原來可重入性是這么個意思
return true; // 如果current再次使用該所對象加鎖廓脆,那么會直接返回true筛谚,可重入就是這么個意思
}
return false;
}
如注釋中所說,當前線程能夠成功獲得鎖有兩種情況停忿,分別代表首次加鎖和重入鎖:
- 如果c為0的話驾讲,說明當前鎖還沒有被任何線程獨占,這時候會對隊列進行判斷
- hasQueuedPredecessors方法返回false有兩種情況:當前等待隊列為空或者當前線程就是隊列頭部席赂;此時才可以嘗試加鎖
- 一個CAS操作將ReentrantLock的state屬性設置為acquire值(調用的時候傳遞的為1)
- setExclusiveOwnerThread方法將當前線程設置為獨占鎖的線程
- 如果當前線程為獨占線程吮铭,則保證可重入性:
- 將state進行加一操作
從這一個成功加鎖的過程我們可以產生一些大膽的推斷:
- 獨占該鎖的線程位于等待隊列的頭部
- state屬性表示獨占的線程加鎖的次數,在之后解鎖的時候可能也要這么多次數的unlock才可以釋放鎖
- 可重入性的保證就是一句current == getExclusiveOwnerThread()
注意:有沒有發(fā)現(xiàn)颅停,對于第一個加鎖的線程谓晌,它會加鎖成功,但是這個第一次加鎖的線程便监,沒有被封裝成一個Node節(jié)點放到隊列中扎谎;所以說,持有鎖的線程是隊列的頭結點這句話有問題:因為持有鎖的線程根本不在隊列中烧董,何來頭結點一說毁靶。在下文分析加鎖失敗的情況會證明這一論斷。
關于hasQueuedPredecessors方法:
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
首先對于第一次加鎖的線程逊移,此時由于h == t == null所以返回false预吆,而對于后面的返回false的情況大都是h != t但是s != null、s.thread == Thread.currentThread()返回的false胳泉,也就是只有頭結點的后繼節(jié)點調用該方法時拐叉,才會返回false表示可以嘗試加鎖。同時這也是可重入鎖中公平鎖的來源扇商,對于之前已經在隊列中的節(jié)點凤瘦,那么新來的節(jié)點想要加鎖,該方法會返回true說明隊列中在你之前還有人在等待案铺,得前面沒人等待了你才能返回false蔬芥,才能嘗試去加鎖,保證了先來后到的公平性控汉。
注意:對于第二個嘗試加鎖的線程笔诵,由于此時前面有一個人持有鎖,所以它在調用lock-acquire-tryAcquire方法時由于判斷state姑子!=0且當前線程不是獨占鎖的線程直接判斷了加鎖失敗乎婿,從而被添加到了隊列中,這條路線中不涉及到hasQueuedPredecessors方法的調用(顯然此時它是滿足h==t這個判斷的)
加鎖失敗情況
還記得acquire方法嗎:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
如果tryAcquire方法返回false的話街佑,說明加鎖失敗谢翎,同時通過上面的代碼我們知道捍靠,如果加鎖失敗的話,當前線程沒有被執(zhí)行各種處理岳服,所以我們在分析acquireQueued方法的時候沒有任何后顧之憂剂公,它的代碼沒有收到tryAcquire的影響:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // 注意,這里的意思是頭結點的后繼節(jié)點tryAcquire成功吊宋,也就是獲取了鎖
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted; // 這里說明了纲辽,如果頭結點的后繼節(jié)點成功獲得了鎖,直接返回false
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // parkAndCheckInterrupt方法最后一句return Thread.currentThread().interrupted()
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
對于這個入隊操作璃搜,有幾點需要說明:
- 只有頭結點的next節(jié)點才會主動調用tryAcquire方法取申請獲取鎖
- 當頭結點的next節(jié)點成功的得到了鎖之后拖吼,通過setHead方法會將自己設為頭結點
- 移除原來的頭結點之后return false
好了,接下來我們就來說一說為啥上面說持有鎖的線程不在隊列中这吻,如果說上文對于首次加鎖的線程沒有加入隊列產生懷疑的話吊档,那么這里的setHead方法會使你幡然醒悟:
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
看到了沒:node.thread = null把頭結點的線程置為空了!M倥础怠硼!所以,對于獨占到鎖的線程來說移怯,它此時已經不再隊列中了O懔А!所以說舟误,只有頭結點時持有鎖的節(jié)點這句話不準確F厦搿!
那么問題來了嵌溢,這個頭結點時怎么初始化的呢眯牧?
頭結點的初始化
答案就在調用acquireQueue方法時的addWaiter方法:
private Node addWaiter(Node mode) {
// 思路:新鍵當前線程的Node節(jié)點;如果tail被初始化過赖草,則直接添加到尾部学少;否則執(zhí)行enq操作先初始化tail再入隊
// 根據注釋:mode傳遞的值要么是Node.SHARED要么是Node.EXCLUSIVE
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
這里的代碼思路很簡單,就是構建Node節(jié)點秧骑,然后插入到尾部版确,對于這個if判斷語句,為false的情況會執(zhí)行enq方法腿堤,在enq方法里面會有頭結點的初始化:
private Node enq(final Node node) {
// 思路:獲取尾部,將參數中的node節(jié)點直接加入尾部如暖;然后CAS更新tail引用
for (;;) {
// 獲取尾部
Node t = tail;
// 如果tail為空笆檀,說明是第一次入隊操作,通過CAS初始化節(jié)點
// 這里初始化只是調用了默認構造器盒至,目的是為了第二次for循環(huán)時tail不為null
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// 因為tail是AQS在并發(fā)環(huán)境下的共享資源酗洒,所以修改tail變量要使用CAS操作
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
有沒有發(fā)現(xiàn)士修,這個頭結點的thread沒有傳遞是參數,是一個null樱衷,這也證明了我們之前說的是正確的棋嘲。
所以我們就知道了,在第一次有線程加鎖的時候矩桂,它會成功得到鎖沸移,那么當第二個線程需要等待這個鎖的時候,調用addWaiter的時候會初始化鏈表的頭結點侄榴。
阻塞線程的掛起
可能有的人會問雹锣,每一個阻塞的節(jié)點都有一個無限for循環(huán)自旋,那么線程沒有被掛起的話豈不是很浪費cpu資源癞蚕?掛起的操作就在for循環(huán)的后面
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // parkAndCheckInterrupt方法最后一句return Thread.currentThread().interrupted()
interrupted = true;
其中shouldParkAfterFailedAcquire(p, node)方法用來判斷是否需要掛起獲取鎖失敗的線程:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 判斷是否可以掛起的思想是:如果該節(jié)點的pred節(jié)點的waitStatus已經被設置為了SIGNAL
// 那么說明該節(jié)點已經料理好后事了蕊爵,可以在某個時刻被喚醒,所以可以安全的掛起
return true;
if (ws > 0) { // 說明pred已經被cancelled了桦山,直接移除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 說明pred的waitStatue是0或者PROPAGATE攒射,此時設置為SINGAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
看到了嗎,只有參數中的pred恒水,也就是需要被掛起的節(jié)點node的前一個節(jié)點的waitStatus為SINGLE的時候会放,才會返回true。而我們調用addWaiter方法創(chuàng)建Node節(jié)點的時候寇窑,waitStatue都是默認值0鸦概,所以在該方法的后面else語句中有一個CAS操作將其設置為SINGLE,這樣的話甩骏,在acquireQueued方法的for自旋中窗市,需要被掛起的線程經歷兩個for循環(huán)就可以使得shouldParkAfterFailedAcquire方法返回true。
之后饮笛,真正為node執(zhí)行掛起的操作位于parkAndCheckInterrupt方法咨察。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
這樣的話,沒有獲取到鎖的線程就真的被掛起了福青。
2. unlock()
public void unlock() {
sync.release(1);
}
release方法同acquire方法一樣摄狱,都是在AQS中又方法實現(xiàn)的:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
AQS的實現(xiàn)類只需要重寫tryRelease方法即可。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
tryRelease方法很簡單无午,就是將ReentrantLock的state值減去一媒役,然后如果此時state為0說明獨占的線程已經完全釋放了鎖,此時可以解除綁定宪迟,否則返回false酣衷。
而真正實現(xiàn)釋放鎖后喚醒其他線程的方法位于release中的
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
我們上面在分析線程的掛起的時候說到了要想掛起,那么node的前一個節(jié)點的waitStatus必須為SINGLE次泽,而SINGLE在Node這個類中的值為-1.是小于0的穿仪,所以一定會執(zhí)行到unparkSuccessor方法席爽。
private void unparkSuccessor(Node node) { // 通過名稱,該方法是喚醒后繼節(jié)點
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 根據注釋和代碼:這里是處理node參數的next節(jié)點為null或者已經取消的情況
// 此時從尾部開始遍歷啊片,找沒有被canclled的節(jié)點喚醒
// PS:為何不從node開始往后找只锻,而是從尾部開始往前找?
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 代表參數node的next節(jié)點不為空紫谷,則喚醒其中的線程
if (s != null)
LockSupport.unpark(s.thread);
}
這里的LockSupport.unpark方法就是喚起其他線程的地方齐饮,且一次只會喚醒一個線程,大部分情況就是頭結點的后繼節(jié)點碴里。