- Java-AQS同步器 源碼解讀<一>獨占鎖加鎖
- Java-AQS同步器 源碼解讀<二>獨占鎖解鎖
- Java-AQS同步器 源碼解讀<三>共享鎖
- Java-AQS同步器 源碼解讀<四>-條件隊列上
- Java-AQS同步器 源碼解讀<五>-條件隊列下
共享鎖
前面2篇文章描述了AQS中獨占鎖的加鎖解鎖祭陷,那今篇文章我們聊下AQS 中分享鎖的加鎖解鎖
既然說道共享鎖和獨占鎖,那2者最本質的區(qū)別是什么呢趣席,大家應該記得AQS中有一個同步器狀態(tài)State 字段兵志,其實說說白了共享模式和獨占模式,就是同步器的狀態(tài)是否允許被多個線程所獲取宣肚,比如我們之前說的ReentrantLock就是獨占鎖的模式想罕,因為同步器狀態(tài)只能被一個線程所獲取,那這篇我將使用Semaphore來做分析共享鎖霉涨。
共享鎖加鎖
Semaphore初始化
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
上面是信號量的默認構造函數(shù) 默認實現(xiàn)的是非公平鎖
/**
* NonFair version
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
我們看到我們傳遞的信號量permits 最終還是調(diào)用了Sync的構造函數(shù)
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
}
setState 其實就是調(diào)用AQS中的方法 就是給State 賦值
Semaphore獲取 acquire() 方法
選取一個默認的獲取方法如下:
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
可以看到 我們這邊默認獲取的信號量是1按价,當然acquire也有帶參數(shù)的構造方法
很明顯 我們看到acquire默認的構造函數(shù)調(diào)的是Sync中acquireSharedInterruptibly方法 之前我也說過Sync是繼承了AQS的 我們IDE跟進這個方法 就進入了AQS類中
進入AQS中acquireSharedInterruptibly方法
/*這個方法就是去獲取同步鎖惭适,除非線程發(fā)送了中斷*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())//這邊就是堅持線程是否發(fā)生中斷,如果中斷則拋出異常
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
重寫tryAcquireShared的實現(xiàn)
看了上面的代碼俘枫,我們有看到一個熟悉的方法tryAcquireShared和之前獨占鎖tryAcquire很像腥沽,這個方法也是需要子類去重寫的,熟悉的套路鸠蚪,熟悉的味道今阳,哈哈!那我們就去找下tryAcquireShared這個方法
找下找 很快我找到了 在NonfairSync里面找到了 代碼上面也有茅信,NonfairSync非公平鎖的tryAcquireShared方法原來是調(diào)用的父類的方法也就是Sync的盾舌,那我就去Sync類中看一看,果然找到了
final int nonfairTryAcquireShared(int acquires) {
for (;;) {//又是一個自旋的操作,AQS 中有大量的這樣的寫法
int available = getState();
int remaining = available - acquires;
//整個自旋唯一的出口蘸鲸,就是當前的線程獲占用完同步器的狀態(tài)值后小于0或者CAS修改State值失敗就返回了
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
看到這個返回我很懵逼呀妖谴,怎么就返回了呢,那我們結合剛才上面的acquireSharedInterruptibly來看 原來他判斷如果返回值小于0 就會執(zhí)行下面的方法doAcquireSharedInterruptibly方法酌摇,翻譯成白話文 就是 就是 同步器里面5個蘋果膝舅,可能前面幾個線程都把蘋果拿了 我再來拿的時候發(fā)現(xiàn)小于0了,拿怎么辦呢 窑多,只能去排隊等待咯仍稀,和獨占鎖流程差不多,區(qū)別就是這個后面埂息,好的 我們慢慢再開看技潘,
doAcquireSharedInterruptibly 獲取失敗后排隊
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//以共享模式加入到阻塞隊列中 這里addWaiter和獨占鎖加鎖使用的是同一個方法 不清楚的 可以看之前的文章
final Node node = addWaiter(Node.SHARED);// 返回成功加入隊尾的節(jié)點
boolean failed = true;//標識是否獲取資源失敗
try {
for (;;) {//自旋
final Node p = node.predecessor();// 獲取當前節(jié)點的前置節(jié)點
if (p == head) {// 如果前置節(jié)點是head 那就去嘗試獲取資源,因為可能head已經(jīng)釋放了資源
int r = tryAcquireShared(arg);
if (r >= 0) {// 如果獲取成功且大于等于0千康,意味這資源還有剩余享幽,可喚醒其余線程獲取
setHeadAndPropagate(node, r);// 這邊方法就是和獨占鎖處理不一樣地放 我們可以重點去看下 其余的流程是一樣的
p.next = null; // help GC
failed = false;
return;
}
}
/*下面的方法和獨占鎖的是一樣的 在第一篇文章中已經(jīng)解讀過,小伙伴們?nèi)绻磺宄?可以去看下
有區(qū)別的地方就是對中斷的處理這邊是直接拋出中斷異常拾弃,獨占鎖處理是返回標記是否中斷 讓上一層處理中斷
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate方法
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; //賦值當前的head節(jié)點 因為下一步會對head 重寫賦值
setHead(node);//設置當前node 節(jié)點 為head 這個和獨占鎖的是一樣的
/*
* propagate > 0 的意思 就是同步器里面State是有剩余的 可以喚醒其他線程
* 后面的判斷意思是 當前的head節(jié)點或者之前的head節(jié)點等于null 或者狀態(tài)小于0 那也必須能喚醒后面線程去獲取資源 head等于null 說明可能被GC回收了
* 這邊的head的waitStatus 我自己模擬了下各自情況 只可能是-1或者-3
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;//當前node 的下一個節(jié)點
if (s == null || s.isShared())//如果snull
doReleaseShared();
}
}
doReleaseShared方法
private void doReleaseShared() {
/*
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {// 這個head!=tail 說明阻塞隊列中至少2個節(jié)點 不然也沒必要去傳播喚醒 如果就自己一個節(jié)點 就算資源條件滿足 還換個誰呢值桩?
int ws = h.waitStatus;// head 節(jié)點狀態(tài)SIGNAL
if (ws == Node.SIGNAL) {// 如果head狀態(tài)是
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);//是和獨占鎖釋放用的同樣的方法 喚醒的是下一個節(jié)點 上一篇有分析到
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; //這邊設置為-3 是為了喚醒的傳播 也就是滿足上一個方法有判斷waitStatus 小于0
}
if (h == head)
break;
}
}
這個條件是自旋唯一的出口就是head 沒有發(fā)送變化 說明沒有后面的線程獲取資源 那就退出自旋,如果head發(fā)生了變化 說明傳播的有效果了 后面線程獲取到到了資源
還有要注意的地方 是doReleaseShared這個方法有2個地方調(diào)用 一個是就是這邊共享鎖加鎖 還有一個就是共享鎖解鎖的地方
共享鎖解鎖
我們看下共享鎖的解鎖 其實看完了上面的內(nèi)容 這個就簡單了很多
Semaphore獲取 releaseShared() 方法
代碼在Semaphore類中
public void release() {
sync.releaseShared(1);
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();//獲取同步器的狀態(tài)
int next = current + releases;//累加釋放的資源值
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))//CAS 更新同步器狀態(tài)值 成功就退出自旋
return true;
}
}
代碼跳轉到了AQS類中
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//tryReleaseShared 還是和之前的套路一樣 子類去重寫的 也很簡單 代碼也貼在了上面
doReleaseShared();//是不是很熟悉
return true;
}
return false;
}
看到doReleaseShared 我們應該很熟悉了 剛才加鎖的時候 也用到了這個方法 具體就不多說了
總結共享鎖和獨占鎖 區(qū)別之處
看了共享鎖的加鎖 我們在回顧下獨占鎖加鎖 這邊的處理豪椿,不然想到 這邊的區(qū)別就是head獲取資源后 獨占鎖直接設置自己為head 然后返回 而共享鎖這邊head 獲取資源后 如果資源狀態(tài)還有剩余 就會喚醒其余線程去獲取颠毙,這就是2者的區(qū)別
同樣的解鎖的過程也是幾乎一樣 底層喚醒線程的unparkSuccessor方法都是公用的,解鎖的過程也是多一個喚醒傳播的過程
好的AQS的同步隊列 的共享模式和獨占模式 用了前面的3篇文章 和大家分享完了
后面 會分析下AQS中的條件隊列 具體怎么運行的~
不要吝嗇你的點贊 砂碉,點贊 給我東西 ,繼續(xù)寫作