什么是AQS
AQS(AbstractQueuedSynchronizer)岸浑,AQS是JDK下提供的一套用于實現(xiàn)基于FIFO等待隊列的阻塞鎖和相關的同步器的一個同步框架陷猫。這個抽象類被設計為作為一些可用原子int值來表示狀態(tài)的同步器的基類秫舌。如果你有看過類似 CountDownLatch 類的源碼實現(xiàn)的妖,會發(fā)現(xiàn)其內部有一個繼承了 AbstractQueuedSynchronizer 的內部類 Sync ∽阍桑可見 CountDownLatch 是基于AQS框架來實現(xiàn)的一個同步器.類似的同步器在JUC下還有不少嫂粟。(eg. Semaphore )
AQS用法
如上所述,AQS管理一個關于狀態(tài)信息的單一整數(shù)墨缘,該整數(shù)可以表現(xiàn)任何狀態(tài)星虹。比如, Semaphore 用它來表現(xiàn)剩余的許可數(shù)镊讼,ReentrantLock 用它來表現(xiàn)擁有它的線程已經(jīng)請求了多少次鎖宽涌;FutureTask 用它來表現(xiàn)任務的狀態(tài)(尚未開始、運行蝶棋、完成和取消)
如JDK的文檔中所說卸亮,使用AQS來實現(xiàn)一個同步器需要覆蓋實現(xiàn)如下幾個方法,并且使用getState
,setState
,compareAndSetState
這幾個方法來設置獲取狀態(tài)
boolean tryAcquire(int arg)
boolean tryRelease(int arg)
int tryAcquireShared(int arg)
boolean tryReleaseShared(int arg)
-
boolean isHeldExclusively()
以上方法不需要全部實現(xiàn)玩裙,根據(jù)獲取的鎖的種類可以選擇實現(xiàn)不同的方法
J.U.C是基于AQS(AbstractQueuedSynchronizer
)實現(xiàn)的兼贸,AQS是一個同步器,設計模式是模板模式吃溅。
核心數(shù)據(jù)結構:雙向鏈表 + state(鎖狀態(tài))
底層操作:CAS
首先溶诞,我們根據(jù)API的方法功能,由我們前面階段學習的知識進行一個自己定義的AQS罕偎,來加深印象很澄。
// 抽象隊列同步器
// state, owner颜及, waiters
public class kfAqs {
// acquire甩苛、 acquireShared : 定義了資源爭用的邏輯,如果沒拿到俏站,則等待讯蒲。
// tryAcquire、 tryAcquireShared : 實際執(zhí)行占用資源的操作肄扎,如何判定一個由使用者具體去實現(xiàn)墨林。
// release、 releaseShared : 定義釋放資源的邏輯犯祠,釋放之后旭等,通知后續(xù)節(jié)點進行爭搶。
// tryRelease衡载、 tryReleaseShared: 實際執(zhí)行資源釋放的操作搔耕,具體的AQS使用者去實現(xiàn)。
// 1痰娱、 如何判斷一個資源的擁有者
public volatile AtomicReference<Thread> owner = new AtomicReference<>();
// 2弃榨、 保存正在等待的線程
public volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
// 3菩收、 記錄資源的狀態(tài)
public volatile AtomicInteger state = new AtomicInteger(0);
// 共享資源占用的邏輯,返回資源的占用情況
public int tryAcquireShared(){
throw new UnsupportedOperationException();
}
public void acquireShared(){
boolean addQ = true;
while(tryAcquireShared() < 0) {
if (addQ) {
// 沒拿到鎖鲸睛,加入到等待集合
waiters.offer(Thread.currentThread());
addQ = false;
} else {
// 阻塞 掛起當前的線程娜饵,不要繼續(xù)往下跑了
LockSupport.park(); // 偽喚醒,就是非unpark喚醒的
}
}
waiters.remove(Thread.currentThread()); // 把線程移除
}
// 共享資源釋放的邏輯官辈,返回資源是否已釋放
public boolean tryReleaseShared(){
throw new UnsupportedOperationException();
}
public void releaseShared(){
if (tryReleaseShared()) {
// 通知等待者
Iterator<Thread> iterator = waiters.iterator();
while (iterator.hasNext()) {
Thread next = iterator.next();
LockSupport.unpark(next); // 喚醒
}
}
}
// 獨占資源相關的代碼
public boolean tryAcquire() { // 交給使用者去實現(xiàn)箱舞。 模板方法設計模式
throw new UnsupportedOperationException();
}
public void acquire() {
boolean addQ = true;
while (!tryAcquire()) {
if (addQ) {
// 沒拿到鎖,加入到等待集合
waiters.offer(Thread.currentThread());
addQ = false;
} else {
// 阻塞 掛起當前的線程钧萍,不要繼續(xù)往下跑了
LockSupport.park(); // 偽喚醒褐缠,就是非unpark喚醒的
}
}
waiters.remove(Thread.currentThread()); // 把線程移除
}
public boolean tryRelease() {
throw new UnsupportedOperationException();
}
public void release() { // 定義了 釋放資源之后要做的操作
if (tryRelease()) {
// 通知等待者
Iterator<Thread> iterator = waiters.iterator();
while (iterator.hasNext()) {
Thread next = iterator.next();
LockSupport.unpark(next); // 喚醒
}
}
}
public AtomicInteger getState() {
return state;
}
public void setState(AtomicInteger state) {
this.state = state;
}
}
資源占用流程圖
源碼解析
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;// 等待超時或被中斷
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;// 釋放鎖之后哲泊,是否通知后一個節(jié)點
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;// 處于等待隊列中缸血,結點的線程等待在Condition上
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;// 共享模式中使用剃根,線程處于可運行狀態(tài)
//核心數(shù)據(jù)結構:雙向鏈表 + state(鎖狀態(tài))
//資源爭用的邏輯
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 判斷是否拿到鎖
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false; // 當前線程釋放中斷的標志位
for (;;) {// 不斷嘗試
final Node p = node.predecessor(); // 獲取前一個節(jié)點
if (p == head && tryAcquire(arg)) { // 如果前一個節(jié)點是head沥曹,嘗試搶一次鎖
setHead(node); // 更換head
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&// 檢查狀態(tài)险掀,是否需要掛起線程
parkAndCheckInterrupt())// 如果需要掛起魁索,則通過Park進入停車場掛起
interrupted = true; // 如果出現(xiàn)中斷履植,則修改標記
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//資源釋放的邏輯
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head; // 從頭開始找
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 喚醒下一個線程
return true;
}
return false;
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
/** 喚醒等待者
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus; // 正在釋放鎖的線程節(jié)點狀態(tài)
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); // 修改當前節(jié)點狀態(tài)
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next; // 找下一個節(jié)點
if (s == null || s.waitStatus > 0) { // 如果不存在或者被取消了烫堤,繼續(xù)尋找合適的下一個節(jié)點
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null) // 如果找到了合適的節(jié)點瞬雹,就喚醒它
LockSupport.unpark(s.thread);
}