簡(jiǎn)述
AQS全稱AbstractQueuedSynchronizer塞栅,提供實(shí)現(xiàn)阻塞鎖和相關(guān)的框架
JDK中使用AQS來(lái)實(shí)現(xiàn)的同步工具類有ReentrantLock、ReentrantReadWriteLock作烟、Semaphore、CountDownLatch拿撩。
AQS中有一個(gè)狀態(tài)變量,子類通過(guò)改變這個(gè)狀態(tài)變量來(lái)改變鎖的狀態(tài)影暴。
AQS原理解析
通過(guò)Lock來(lái)講解,講解ReentrantLock中的非公平鎖
final void lock() {
//通過(guò)cas將狀態(tài)設(shè)置成1
if (compareAndSetState(0, 1))
//將線程設(shè)置成當(dāng)前線程
setExclusiveOwnerThread(Thread.currentThread());
else
//獲取線程
acquire(1);
}
public final void acquire(int arg) {
//嘗試獲忍胶铡(子類實(shí)現(xiàn)),獲取失敗加入CLH隊(duì)列伦吠、中斷當(dāng)前線程
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {//鎖未被占用
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//當(dāng)前線程是本線程
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//添加等待者節(jié)點(diǎn)毛仪、采用尾插法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
/**
* 通過(guò)死循環(huán)遍歷,只有遍歷到p是頭節(jié)點(diǎn)并且能夠獲得資源才可以退出谱姓。
* shouldParkAfterFailedAcquire判斷當(dāng)前節(jié)點(diǎn)的狀態(tài)刨晴,
* 通過(guò)parkAndCheckInterrupt將線程設(shè)成waitting狀態(tài)
* 需要中斷或unpack喚醒
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
release
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*找到隊(duì)列中可以運(yùn)行的狈癞,最靠近頭部的節(jié)點(diǎn)
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//喚醒該線程
if (s != null)
LockSupport.unpark(s.thread);
}
講解CLH隊(duì)列
<pre>
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
* </pre>
CLH隊(duì)列通過(guò)雙向鏈表實(shí)現(xiàn)隊(duì)列的功能茂契,將等待的線程放入到隊(duì)列中蝶桶。
AQS優(yōu)點(diǎn)
- 用戶可以通過(guò)AQS快速的實(shí)現(xiàn)自己的同步工具類
- AQS相對(duì)于內(nèi)置鎖有更高的性能。AQS通過(guò)CAS實(shí)現(xiàn)掉冶,實(shí)現(xiàn)了無(wú)鎖化真竖。