AQS 全稱 AbstractQueuedSynchronizer 中文翻譯同步器 同步器是java中各種鎖實現(xiàn)的基礎(chǔ)川背,非常重用的 思喊。比如我們線程池里面的Worker 類 我們的重入鎖ReentrantLock暮顺,信號量Semaphore隆箩,CountDownLatch 等
- Java-AQS同步器 源碼解讀<一>獨占鎖加鎖
- Java-AQS同步器 源碼解讀<二>獨占鎖解鎖
- Java-AQS同步器 源碼解讀<三>共享鎖
- Java-AQS同步器 源碼解讀<四>-條件隊列上
- Java-AQS同步器 源碼解讀<五>-條件隊列下
我們一步步的來看 是怎么實現(xiàn)的
首先 我們先看下 這個類文件
abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable
從文件中我們看出 AbstractQueuedSynchronizer是個抽象類繼承了AbstractOwnableSynchronizer抽象類
那我們就去看看AbstractOwnableSynchronizer有什么
AbstractOwnableSynchronizer
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
/** Use serial ID even though all fields transient. */
private static final long serialVersionUID = 3737899427754241961L;
/**
* 構(gòu)造器
*/
protected AbstractOwnableSynchronizer() { }
/**
* The current owner of exclusive mode synchronization.
* 從上面的英文翻譯 和下面字段的命名 我們能知道 這個是一個線程對象
* 是獨占鎖模式下 同步器的當(dāng)前擁有者線程
*/
private transient Thread exclusiveOwnerThread;
/**
* 設(shè)置 獨占鎖 同步器的擁著線程 沒什么好說的
*/
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
/**
* 獲取當(dāng)前 同步器的擁著線程 也沒什么
*/
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
從以上的代碼中 我們可以分析得到 AQS 基礎(chǔ)這個類 是為了 得到獨占鎖模式擁有鎖的線程 方便監(jiān)控
AbstractQueuedSynchronizer
那看完 上面的代碼 那我們進(jìn)入AbstractQueuedSynchronizer 里面看下 這個類里面又有什么
從源碼中 我們可以將AQS的內(nèi)部屬性分為4類
- 同步器的簡單屬性凉蜂,
- 同步隊列屬性一姿,
- 條件隊列屬性,
- 公用的Node類跃惫,和一個ConditionObject類
一個一個來看吧
先看Node這個類 這個類很重要 它即是同步隊列的節(jié)點 也是條件隊列的節(jié)點
AQS 內(nèi)部Node
/**
* 等待隊列的節(jié)點類叮叹,通過Node 可以實現(xiàn)2個隊列 一個是線程同步隊列(雙向隊列) 一個是條件線程隊列(單向隊列)
*/
static final class Node {
/**
* 標(biāo)識這個節(jié)點用于共享模式
* Marker to indicate a node is waiting in shared mode
*/
static final AbstractQueuedSynchronizer.Node SHARED = new AbstractQueuedSynchronizer.Node();
/**
* 標(biāo)識這個節(jié)點用于 獨占模式(排它 反正一個意思)
* Marker to indicate a node is waiting in exclusive mode
*/
static final AbstractQueuedSynchronizer.Node EXCLUSIVE = null;
/** 下面是 waitStatus 的幾個常量值 */
/**
* 表明等待線程已經(jīng)取消
* waitStatus value to indicate thread has cancelled
*/
static final int CANCELLED = 1;
/**
* 表述如果當(dāng)前節(jié)點的前一個節(jié)點狀態(tài)是 SIGNAL 那么就可以阻塞當(dāng)前自己的線程 不用去爭搶資源了 沒用 不然會一直嘗試去獲取資源
* waitStatus value to indicate successor's thread needs unparking
*/
static final int SIGNAL = -1;
/**
* 線程在條件隊列中等待
* waitStatus value to indicate thread is waiting on condition
*/
static final int CONDITION = -2;
/**
* 共享模式下 無條件傳播 該狀態(tài)的進(jìn)程處于可運行狀態(tài)
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* 當(dāng)前node 狀態(tài)
*/
volatile int waitStatus;
/**
* 獨占模式下 節(jié)點指向的上一個節(jié)點
*/
volatile AbstractQueuedSynchronizer.Node prev;
/**
* 獨占模式下 節(jié)點指向的下一個節(jié)點
*/
volatile AbstractQueuedSynchronizer.Node next;
/**
* 入隊時的線程
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
/**
* condition 條件隊列中的后繼節(jié)點
*/
AbstractQueuedSynchronizer.Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 返回當(dāng)前節(jié)點的前驅(qū)節(jié)點
*
* @return the predecessor of this node
*/
final AbstractQueuedSynchronizer.Node predecessor() throws NullPointerException {
AbstractQueuedSynchronizer.Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
/**
* 用于addWaiter
*/
Node(Thread thread, AbstractQueuedSynchronizer.Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
/**
* 用于Condition
*/
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
AQS 實例屬性
看完了 Node 節(jié)點 我看看下AQS 本身的實例屬性 有哪些
/**
* 同步隊列的頭 初始化 或者setHead方法可修改
*/
private transient volatile Node head;
/**
* 同步隊列的尾節(jié)點 通過enq方法去修改尾節(jié)點 后面會分析到
*/
private transient volatile Node tail;
/**
* 同步器的狀態(tài) 這里 可以去 去看下 Semaphore 這個類 我們初始化的時候 實際
上就是調(diào)用的setState 方法 給同步器的狀態(tài)賦值的 每個線程獲取的時候都會去消 耗這個值 達(dá)到控制并發(fā)效果
*/
private volatile int state;
/**
* 獲取同步器狀態(tài)
*/
protected final int getState() {
return state;
}
/**
* 同步器狀態(tài)賦值
*/
protected final void setState(int newState) {
state = newState;
}
看完了 上面的2中模式 解鎖下獨占和共享什么意思吧
獨占的意思的同一時刻 只能有一個線程可以獲得鎖 其余的都排隊阻塞,釋放資源的時候也只有線程可以釋放 比如重入鎖ReentrantLock
共享就是允許多個線程獲得同一個鎖爆存,并且可以設(shè)置獲取鎖的線程數(shù)量 比如我 用Semaphore初始狀態(tài)設(shè)置為4 每個線程獲取2個 那同時可以有2個線程獲取鎖
AQS 獨占鎖
看完了上面的一些接受 下面我們來跟著代碼去分析下獨占鎖 是怎么實現(xiàn)的
獨占鎖 加鎖
-
acquire方法
為什么首先 我們要看這個方法呢蛉顽,因為這是鎖獲取資源的入口,那我們就看下代碼先较,用ReentrantLock來舉例看看
我們使用ReentrantLock 開始加鎖
public void lock() {
sync.lock();
}
我們看下sync.lock 是什么
abstract static class Sync extends AbstractQueuedSynchronizer {
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
......
}
可以看到sync的lock 方法也是個抽象方法 應(yīng)該是Sync的子類去實現(xiàn)的
那我們就看下ReentrantLock的構(gòu)造方法
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
其中得到ReentrantLock 默認(rèn)構(gòu)造函數(shù)中 sync的實現(xiàn)是NonfairSync公平鎖携冤,那我們繼續(xù)看NonfairSync這個類
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
}
如果 我們找到了Sync的抽象方法lock 在子類中的實現(xiàn)悼粮,其實就是執(zhí)行了acquire 方法
acquire方法在那呢 ?
FairSync 繼承了內(nèi)部抽象類 Sync類 而從上面的截取的代碼中
我們看到Sync類其實就繼承了AbstractQueuedSynchronizer 類 這樣最終我們會定位到AQS類中acquire方法
那下面我們就來看下acquire 這個方法
/*
*以獨占鎖的方式 去獲取資源 響應(yīng)中斷
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- tryAcquire(arg) 方法
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
首先我們看到這個AQS里面 雖然寫了tryAcquire方法 但是 拋出一個異常曾棕,為什么要這么做呢 其實JDK 作者 是為了方便開發(fā)者 我什么這么說呢 因為AQS是個抽象類 這個方法其實可以寫成抽象方法 讓子類來實現(xiàn) 但是這帶來一個問題 因為AQS是很多鎖實現(xiàn)的基礎(chǔ) 如果寫成抽象方法 子類就必須實現(xiàn) 但是可能有些鎖 并不需要實現(xiàn)這個方法 沒用扣猫。所以AQS里面這么去寫,是OK的翘地,因為要使用的這個功能的鎖 必須去實現(xiàn)重寫這個方法申尤,不讓就報錯 ,不要使用到這個方法的子類 不用管衙耕。這樣的思想 我們可以去參考 在以后得開發(fā)中昧穿!
那我們?nèi)タ聪翿eentrantLock 類中是否是這樣做的
看下代碼:
從上面的截圖中 我們可以看到 確實是實現(xiàn)了的 ReentrantLock 有一個抽象類Sync 2個靜態(tài)類 且繼承抽象類 Sync 繼承了我們AQS 并且實現(xiàn)釋放資源的方法,加鎖去獲取資源的方法tryAcquire 分別由2個子類實現(xiàn)就 一個實現(xiàn)FairSync 公平鎖 一個實現(xiàn)NonfairSync非公平鎖
tryAcquire 方法是嘗試以獨占的方式去獲取資源
具體各個鎖里面的方法實現(xiàn) 自己可以去看下 我們還是看下吧ReentrantLock
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
上面的代碼是非公平鎖 實現(xiàn)的TryAcquire方法的版本 其實也很好讀懂就是判斷我們之前提到過的AQS里面的state 橙喘,上面我提到過ReentrantLock 默認(rèn)是非公平鎖时鸵,初始化的state值是0,這個一開始用getState 去獲取方法state當(dāng)前的值厅瞎,如果值等于0饰潜,說明鎖沒有其他線程占用,可以獲取鎖和簸,使用CAS原子操作State 新增1彭雾,如果CAS 成功就設(shè)置當(dāng)前鎖的所屬的線程 這個一開始我描述過AbstractOwnableSynchronizer里面的功能 ,如果獲取的時候state不為0比搭,就判斷下當(dāng)前的線程是否等于鎖所屬的線程,因為是獨占的 南誊,如果相等就把State 再次加1身诺,這個判斷也就是說 我們調(diào)了同個線程多次lock了,這個時非公平鎖的代碼抄囚,公平鎖的實現(xiàn)和上面的差不多 就是多了一個隊列的判斷霉赡,要從阻塞隊列里面的頭部節(jié)點開始一個個地排隊獲取,而非公平鎖就沒有幔托,只要釋放了 看誰先來 就誰先獲取到鎖穴亏,看多不公平啊,哈哈重挑!
是否滿足嗓化,如果滿足就返回True說明獲取到了鎖,不滿足就返回false acquire后面的方法會繼續(xù)執(zhí)行
好的 我們繼續(xù)回到acquire方法中 如果tryacquire方法返回false 也就是沒有獲取到鎖,執(zhí)行下一個方法acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 那進(jìn)入這個方法的時候 又先執(zhí)行addWaiter 后執(zhí)行acquireQueued
- addWaiter(Node.EXCLUSIVE)方法
/**
* 這個方法就是加入當(dāng)前節(jié)點到隊列的尾節(jié)點
*/
private Node addWaiter(Node mode) {
/*構(gòu)建node 節(jié)點 還記得 我們上面寫的方法 用于addWaiter么 可以上面去看下
這里的node 幾點里面的nextWaiter 這個字段被復(fù)用了谬哀,在排他鎖模式下用于記錄鎖類型*/
Node node = new Node(Thread.currentThread(), mode);
/*Try the fast path of enq; backup to full enq on failure
從上面的英文注解上面 我們能得到下面的方法是先嘗試快速加入隊尾刺覆,
成功則返回當(dāng)前節(jié)點,如果失敗就進(jìn)入的 enq方法 最后返回節(jié)點*/
Node pred = tail;
if (pred != null) {
//當(dāng)前節(jié)點的頭節(jié)點指向隊列的尾部節(jié)點史煎,相當(dāng)于加入到最后了
node.prev = pred;
//CAS 賦值隊列尾部節(jié)點
if (compareAndSetTail(pred, node)) {
// 如果成功將尾部節(jié)點的下個節(jié)點指向我們新加的節(jié)點谦屑,因為是雙向鏈表驳糯,所以要互相指一下的
pred.next = node;
return node;
}
}
// 如果 尾部節(jié)點是null 或者CAS賦值尾部節(jié)點失敗,則進(jìn)入enq 方法氢橙,其實就是采用了自旋的方法加入 走我們進(jìn)去看下怎么做的
enq(node);
return node;
}
-enq 方法
老規(guī)矩 上源碼
/*
* 使用自旋的方法 加入尾部節(jié)點
*/
private Node enq(final Node node) {
//無條件的循環(huán) 自旋 好好理解下這個方法 其實循環(huán)只有一個出口 就是成功加入尾部節(jié)點返回
for (;;) {
Node t = tail;
if (t == null) { // 這邊尾部節(jié)點是null 說明整個隊列都是空的 必須初始化一下
/*CAS 賦值一個頭部節(jié)點 如果成功 就頭節(jié)點賦值給尾節(jié)點酝枢,
也很好理解初始化么 沒有任何節(jié)點,當(dāng)然頭就是尾悍手,尾就是頭,不理解的可以看看 鏈表結(jié)構(gòu)如果CAS失敗帘睦,
就不做什么 反正又出不來這個循環(huán),就繼續(xù)循壞吧谓苟,總會成功的官脓,不要覺得這邊會死循環(huán),因為就算CAS不成功涝焙,
可能是有別的線程修改了值導(dǎo)致的,下次循環(huán)的時候就走不到這邊了 t也就是尾節(jié)點就不為null了卑笨,走到下面的判斷*/
if (compareAndSetHead(new Node()))
tail = head;
} else {
//走呀走 終于到這邊了,看看這部代碼 是不是很熟悉仑撞,好好想想赤兴,往上面看看,其實addWaiter的方法是一樣的隧哮,
//只不過這邊做了自旋操作桶良,保證了一定加入隊尾成功
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
ok 看到這邊 小伙伴們 不知道有沒有暈哈,不管怎么樣沮翔,革命尚未成功陨帆,繼續(xù)哈,好的 我們回到acquire中
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上面我們看了tryAcquire方法 和addWaiter方法采蚀,那最后我們看下acquireQueued方法是什么 acquireQueued方法的參數(shù)一個就是addWaiter 返回加入尾部的節(jié)點疲牵,還有一個arg是我們各個實現(xiàn)鎖方法傳進(jìn)來的
-acquireQueued方法
/**
* 當(dāng)前節(jié)點加入的隊尾后,嘗試自旋的方式去獲取資源榆鼠,
* 這個方法還有個作用就是自己的前置節(jié)點變?yōu)镾IGNAL 這樣他自己就能阻塞了
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;//表述是否拿到資源失敗 也就是說false表述拿到資源了
try {
//標(biāo)記 是否執(zhí)行中斷 為什么有這個是因為此方法沒有拋出異常 如果內(nèi)部發(fā)生異常 要通知上一級調(diào)用纲爸,所有有此標(biāo)識
boolean interrupted = false;
for (;;) {//這邊又是自旋的方法 方法出口只有一個 就是獲取到資源
/*
P 是當(dāng)前節(jié)點node的前一個節(jié)點 這邊判斷的意思 是如果當(dāng)前節(jié)點的前一個節(jié)點就是頭部節(jié)點,那就嘗試去獲取鎖資源妆够,
如果能獲取到 說明頭節(jié)點已經(jīng)釋放了鎖识啦,那就迅速讓自己成為隊列的頭部節(jié)點,讓之前的頭部節(jié)點next指向null
是為了方便GC的回收神妹,然后賦值failed 為true 表示已經(jīng)獲取到資源颓哮,并且返回中斷狀態(tài)
*/
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果P不是head節(jié)點或者是head節(jié)點但是嘗試獲取資源失敗,說明鎖資源還沒被釋放鸵荠,那就執(zhí)行到這邊
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
那我們在進(jìn)入shouldParkAfterFailedAcquire 方法中看看 是什么
-shouldParkAfterFailedAcquire 方法
/**
* 更新prev節(jié)點狀態(tài) 题翻,并根據(jù)prev節(jié)點狀態(tài)判斷是否自己當(dāng)前線程需要阻塞
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;// node的prev節(jié)點的狀態(tài)
if (ws == Node.SIGNAL)
// 如果SIGNAL 就返回true 就會執(zhí)行到parkAndCheckInterrupt方法里面
return true;
if (ws > 0) {
/*
* 如果ws 大于0 這里是只能為1,如果是1說明線程已經(jīng)取消,相當(dāng)于無效節(jié)點
* 者說明 當(dāng)前node 節(jié)點加入到了一個無效節(jié)點后面嵌赠,那這個必須處理一下
node.prev = pred = pred.prev
* 這個操作 我們拆解下來看塑荒,下看 pred = pred.prev這個的意思是把prev節(jié)點的prev*節(jié)點 賦值給prev節(jié)點
*后面再看 node.prev = pred 聯(lián)合 剛才的賦值 這個的意思就是把prev節(jié)點的prev節(jié)點和node關(guān)聯(lián)起來,
*原因我上面也說了因為pre節(jié)點線程取消了,所以node節(jié)點不能指向pre節(jié)點 只能一個一個的往前找姜挺,
*找到waitStatus 小于或者等于0的結(jié)束循環(huán)最后再把找到的pre節(jié)點執(zhí)行node節(jié)點 ,這樣就跳過了所有無效的節(jié)點
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
*這邊的操作就是把pred 節(jié)點的狀態(tài)設(shè)置為SIGNAL齿税,這樣返回false 這樣可以返回到上面的自旋中
*再次執(zhí)行一次,如果還是獲取不到鎖炊豪,那么又回到當(dāng)前的shouldParkAfterFailedAcquire方法 執(zhí)行到方法最上面的判斷
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
-parkAndCheckInterrupt方法
回到剛才 如果shouldParkAfterFailedAcquire方法返回true 那么就會執(zhí)行parkAndCheckInterrupt方法
這個方法很簡單
/**
* Convenience method to park and then check if interrupted
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
/*
* 這邊線程阻塞凌箕,只有2中方式喚醒當(dāng)前線程,
* 一種方式就是 當(dāng)前線程發(fā)生中斷
* 另外一個情況就是 資源釋放的時候會調(diào)unpark 方法 喚醒當(dāng)前的線程 這個會在下一篇會講到
*/
LockSupport.park(this);
return Thread.interrupted();
}
沒什么好說的 就是阻塞當(dāng)前線程词渤,并且檢查當(dāng)前線程有沒有過中斷 返回 正常的話 只有release釋放資源后 會喚醒線程 但是線程發(fā)生中斷也是可以的 所以這邊要堅持下 是那種情況 喚醒當(dāng)前線程的方式 并返回
最后的最后我們再次回到剛才的方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
最后這邊 currentThread做出線程中斷 如果上面都返回true的話
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
不知道 大家有沒有了解acquire 怎么去實現(xiàn)獲取資源的了
如果還是不知道 AQS里面的獨占鎖怎么加鎖的 最后做下簡短的總結(jié):
1.首先第一步 用tryAcquire方法嘗試去獲取資源 此方法由各個繼承類重寫牵舱,如果成功就直接返回 獲取鎖成功,如果失敗 則進(jìn)入步驟2
2.進(jìn)入addWaiter方法 這個方法主要做的就是把當(dāng)前的線程包裝成node節(jié)點放入阻塞隊列的隊尾缺虐,里面有enq方法里面有自旋的操作
3.然后進(jìn)入acquireQueued方法芜壁,入?yún)⒁皇莂ddWaiter方法返回的no的節(jié)點,入?yún)⒍褪莂cquire 方法的參數(shù)arg高氮,因為里面還要調(diào)用tryAcquire方法慧妄,所以此參數(shù)要傳的,這個方法主要做的就是 嘗試自旋去獲取資源剪芍,但是嘗試獲取資源也是條件的 就是當(dāng)前node節(jié)點的prev節(jié)點是head,這樣就避免了無腦去循環(huán)去獲取資源塞淹,這個方法里面還做了一個事情 就是判斷下 如果當(dāng)前node 節(jié)點的prev節(jié)點 waitStatus 如果是SIGNAL 那就直接執(zhí)行下面的parkAndCheckInterrupt方法去阻塞當(dāng)前線程,為什么會這么寫呢罪裹,因為如果當(dāng)前節(jié)點狀態(tài)是SIGNAL 說明自己已經(jīng)獲取過鎖饱普,并且沒獲取到,所以后面的節(jié)點 就不要再去獲取了 直接阻塞吧状共!
4.最后 如果阻塞的線程 有中斷套耕,那會執(zhí)行到最后一步selfInterrupt中,就是做出線程中斷口芍,好讓當(dāng)前的線程 后面的方法 知道這個中斷箍铲,并做出回應(yīng)雇卷,如果不清楚這個的話 就要好好去了解下線程的interrupt 的作用 和三個常用的方法
好的簡單的總結(jié)完畢鬓椭,如果還是不了解 自己一遍看著我的解鎖一個看著源碼,自己理解下关划,我相信一定會有收獲的小染!加油!