AQS概述
AbstractQueuedSynchronizer(AQS)是一個提供基礎(chǔ)框架婿崭,JDK提供的Lock是通過AQS框架完成铡原,程序員也可以利用AQS實(shí)現(xiàn)自己的鎖绣溜。以JDK提高的ReentrantLock為例鞭达,如果創(chuàng)建了一個ReentrantLock類的對象lock话肖,lock對象中就包含了AQS的一個子類的實(shí)例sync疲酌。
AQS的大致邏輯是:客戶端代碼在執(zhí)行l(wèi)ock.lock()的方法的時候蜡峰,當(dāng)前線程會去檢測AQS的鎖標(biāo)志(同步狀態(tài))來判斷鎖是否可以被持有,如果鎖被其他線程占用朗恳,那么當(dāng)前線程會被放到AQS的等待隊(duì)列中湿颅。檢測AQS所標(biāo)志位,其實(shí)就是檢查lock對象中sync變量中的鎖標(biāo)志位粥诫,AQS的等待隊(duì)列也就是sync中的等待隊(duì)列油航。
程序員可以使用AQS提供的getState(),setState()和compareAndSetState()方法來檢查和修改AQS中鎖標(biāo)志。程序員只需要重寫如下方法:
下面方法是排它鎖
- tryAcquire(int)
- tryRelease(int)
下面方法是共享鎖 - tryAcquireShared(int)
- tryReleaseShared(int)
- isHeldExclusively()
AQS的其他方法都是final怀浆,不需要改變谊囚,AQS本質(zhì)上不是個抽象類。
AQS實(shí)現(xiàn)原理
AQS保護(hù)了一個FIFO隊(duì)列执赡,該隊(duì)列是一個雙向列表镰踏,實(shí)現(xiàn)隊(duì)列的數(shù)據(jù)結(jié)構(gòu)是定義在AQS中的Node類。該隊(duì)列主要保存獲取鎖失敗的線程沙合。AQS中的基本字段:
//指向FIFO列表的頭結(jié)點(diǎn)
private transient volatile Node head;
//指向FIFO列表的尾結(jié)點(diǎn)
private transient volatile Node tail;
/**同步狀態(tài) 0表示鎖可以被線程獲取 1表示鎖被其他線程持有
*線程在執(zhí)行l(wèi)ock方法的時候奠伪,就是通過判斷和修改該值來申請鎖
/*
private volatile int state;
//該屬性是AQS父類的字段,記錄了獨(dú)占鎖的線程
private transient Thread exclusiveOwnerThread;
介紹AQS的實(shí)現(xiàn)原理首懈,需要從它的某個子類開始绊率,這里以ReentrantLock為例。首先定義一個需要同步的測試類
class LockTest{
Lock lock = new ReentrantLock();
public void p(){
try {
lock.lock();
while (true){
}
}
finally {
lock.unlock();
}
}
}
然后定義訪問同步代碼的線程類:
class LockA extends Thread{
LockTest lockTest;
public LockA(LockTest l,String name){
super(name);
lockTest = l;
}
@Override
public void run() {
lockTest.p();
}
}
下面是測試函數(shù):
public static void main(String[] args) throws Exception{
LockTest lt = new LockTest();
LockA lockA = new LockA(lt,"A");
LockA lockB = new LockA(lt,"B");
LockA lockC = new LockA(lt,"C");
LockA lockD = new LockA(lt,"D");
lockA.start();
//讓線程A先啟動
Thread.sleep(500);
lockB.start();
lockC.start();
lockD.start();
}
用idea進(jìn)行斷點(diǎn)調(diào)試究履,首先調(diào)用lockA.start(),讓lockA線程進(jìn)入臨界區(qū)滤否,再進(jìn)入臨界區(qū)之前,會調(diào)用lock.lock()方法先獲取鎖最仑。lock.lock()的實(shí)現(xiàn)如下:
final void lock() {
//CAS更新state狀態(tài)值
if (compareAndSetState(0, 1))
//由于ReenterLock是獨(dú)占鎖,下面的是調(diào)用AQS父類的方法去設(shè)置當(dāng)前擁有獨(dú)占鎖的線程 setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
compareAndSetState是CAS操作藐俺,來判斷AQS中state的狀態(tài)是否為0,如果是0則更新為1泥彤,因?yàn)閘ockA第一個進(jìn)入臨界區(qū)的線程紊搪,那么它執(zhí)行compareAndSetState會成功,然后就會去執(zhí)行if語句的其他操作:擁有獨(dú)占鎖的線程全景。如果compareAndSetState()操作失敗,那么將會去執(zhí)行acquire(1)牵囤,這個方法的邏輯等一下lockB線程的時候再分析爸黄。
lockA執(zhí)行完setExclusiveOwnerThread方法后滞伟,獲取鎖完成,開始執(zhí)行臨界區(qū)代碼炕贵。
下面lockB線程啟動梆奈,同樣調(diào)用lock.lock()方法來獲取鎖,由于lockA正在執(zhí)行臨界區(qū)代碼称开,鎖沒有釋放亩钟,因此lockB線程在執(zhí)行compareAndSetState操作時就會失敗,轉(zhuǎn)而去執(zhí)行acquire(1);下面是acquire方法的代碼:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
該方法執(zhí)行邏輯:
1.如果tryAcquire方法返回false鳖轰,然后去執(zhí)行2;返回true,方法退出
- addWaiter方法是向FIFO列表中增加一個等待節(jié)點(diǎn)清酥,然后執(zhí)行3
- acqd如果返回true,那么執(zhí)行4,否則退出
tryAcquire方法邏輯
tryAcquire方法是AQS提供的需要實(shí)現(xiàn)者重新的方法蕴侣,用于以獨(dú)占式獲取鎖焰轻,在ReenterLock中,tryAcquire的實(shí)現(xiàn)如下
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
nonfairTryAcquire(acquires)方法實(shí)現(xiàn)如下:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//再次獲取AQS鎖狀態(tài)昆雀,如果此時之前的線程已經(jīng)執(zhí)行完成,lockB可以嘗試調(diào)用
//compareAndSetState方法獲取鎖,入股返回true那么lockB線程就不需要加入到FIFO隊(duì)列
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}//如果獲取鎖狀態(tài)不為0,然后檢查當(dāng)前持有鎖的線程是不是當(dāng)前執(zhí)行的線程,
//如果是同一個線程求妹,那么就把AQS的鎖狀態(tài)值加上acquires
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//如果compareAndSetState執(zhí)行失敗或者current != getExclusiveOwnerThread()
//那么tryAcquire方法返回false止吁,然后去執(zhí)行把當(dāng)前線程加入到FIFO列表中的邏輯
return false;
}
addWaiter方法邏輯
當(dāng)tryAcquire方法返回false,然后去執(zhí)行addWaiter(Node.EXCLUSIVE)方法,Node.EXCLUSIVE值為null
/** 該值表示創(chuàng)建的節(jié)點(diǎn)是獨(dú)占式的 */
static final Node EXCLUSIVE = null;
addWaiter方法的邏輯如下:
/**
* 為當(dāng)前線程創(chuàng)建了一個FIFO隊(duì)列的節(jié)點(diǎn),并且需要給節(jié)點(diǎn)設(shè)置鎖類型
*
* 參數(shù)mode:Node.EXCLUSIVE表示排他鎖,Node.SHARED表示共享鎖
*/
private Node addWaiter(Node mode) {
//用當(dāng)前線程和鎖類型創(chuàng)建一個Node對象
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
Node是AQS的內(nèi)部類挽封,它主要包括:
//FIFO列表等待線程的鎖模式:共享鎖
static final Node SHARED = new Node();
//FIFO列表等待線程的鎖模式:獨(dú)占鎖
static final Node EXCLUSIVE = null;
//線程狀態(tài) 取消執(zhí)行
static final int CANCELLED = 1;
//線程狀態(tài) 可以接受被喚起操作
static final int SIGNAL = -1;
//線程狀態(tài) 等待CONDITION條件
static final int CONDITION = -2;
//線程狀態(tài) 等待CONDITION條件并且無條件傳播
static final int PROPAGATE = -3;
//線程狀態(tài)
volatile int waitStatus;
//前序節(jié)點(diǎn)
volatile Node prev;
//后續(xù)節(jié)點(diǎn)
volatile Node next;
//保存當(dāng)前線程
volatile Thread thread;
//節(jié)點(diǎn)鎖模式
Node nextWaiter;
由于lockB在執(zhí)行的時候已球,head和tail都是null,因此if (pred != null) 不會執(zhí)行场仲,直接執(zhí)行enq方法:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
//如果AQS尾節(jié)點(diǎn)是null和悦,說明AQS中的FIFO對里是空的,
//需要創(chuàng)建一個空的Node對象渠缕,作為FIFO的頭結(jié)點(diǎn)
if (compareAndSetHead(new Node()))
tail = head;
}
//如果tail不為null鸽素,把當(dāng)前AQS的尾節(jié)點(diǎn)賦給新創(chuàng)建的node節(jié)點(diǎn)的前序節(jié)點(diǎn),
//然后調(diào)用CAS把新創(chuàng)建的node設(shè)置為AQS的尾節(jié)點(diǎn)
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
<font color=red>注:enq方法為AQS設(shè)置FILO的頭結(jié)點(diǎn)和尾節(jié)點(diǎn)亦鳞,使用CAS+死循環(huán)馍忽,直到設(shè)置成功。因?yàn)楫?dāng)一個線程在調(diào)用CAS設(shè)置的時候燕差,可能另外一個線程已經(jīng)調(diào)用CAS成功返回遭笋,因此會出現(xiàn)調(diào)用失敗的,如果調(diào)用失敗徒探,需要循環(huán)重新設(shè)置瓦呼。enq只有在成功設(shè)置了tail之后才會返回</font>
由于本次調(diào)試是用多線程debug模式,因此测暗,不會出現(xiàn)多個線程同時執(zhí)行CAS的情況央串,因此當(dāng)lockB執(zhí)行到compareAndSetHead會成功返回磨澡,這樣就創(chuàng)建了一個空的頭結(jié)點(diǎn),然后把頭結(jié)點(diǎn)對象賦值給tail质和,此時FIFO隊(duì)列狀態(tài)如下圖:
addWaiter方法執(zhí)行完成,線程被放到FIFO列表中殿托。下面是執(zhí)行acquireQueued方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//獲取當(dāng)前線程的prev節(jié)點(diǎn)
final Node p = node.predecessor();
//如果p是頭結(jié)點(diǎn)霹菊,那么讓當(dāng)前線程再嘗試獲取一次AQS節(jié)點(diǎn)鎖狀態(tài),如果獲成功支竹,
//那么把當(dāng)前線程所在的node設(shè)置為head旋廷,并且把node節(jié)點(diǎn)其prev、next礼搁,thread
//都變?yōu)閚ull
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
根據(jù)上面源碼饶碘,p是node的前序節(jié)點(diǎn),根據(jù)當(dāng)前程序運(yùn)行狀態(tài)p==head馒吴,但是tryAcquire方法執(zhí)行返回false扎运。然后執(zhí)行shouldParkAfterFailedAcquire方法,該方法有兩個邏輯饮戳,如果當(dāng)前節(jié)點(diǎn)的前序節(jié)點(diǎn)p.waitStatus=SIGNAL,那么返回true豪治;如果是大于0,那么需要把這些大于0的node從FIFO中移除(按照從后往前遍歷的方法)并返回false。如果是0扯罐,把當(dāng)前線程鎖在的node節(jié)點(diǎn)前序節(jié)點(diǎn)的waitStatus設(shè)置為SIGNAL(-1)负拟,返回false。
程序返回后厨姚,接著循環(huán)開始處執(zhí)行。此刻當(dāng)程序重新執(zhí)行到shouldParkAfterFailedAcquire方法是因?yàn)閜節(jié)點(diǎn)的waitStatus=-1键菱,因此返回true谬墙。然后程序接著執(zhí)行parkAndCheckInterrupt()方法:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
調(diào)用LockSupport.park(this),當(dāng)前線程會被所阻塞一直到調(diào)用LockSupport.uppark()方法喚起险耀。
接著執(zhí)行測試程序的lockC.start();第三個線程啟動玖喘。第三個線程經(jīng)過上面的步驟后執(zhí)行到acquireQueued()時,F(xiàn)IFO隊(duì)列的狀態(tài)如下圖:然后執(zhí)行到shouldParkAfterFailedAcquire方法蘑志,由于當(dāng)前線程是lockC累奈,那么他的前序節(jié)點(diǎn)是lockB急但,此時lockB的waitStatus=0,因此shouldParkAfterFailedAcquire執(zhí)行修改lockB的waitStatus=-1的操作波桩,然后方法返回false戒努,然后在從循環(huán)開始處執(zhí)行。然后又會執(zhí)行到shouldParkAfterFailedAcquire方法镐躲,此時由于lockC的前序節(jié)點(diǎn)lockB的waitStatus==-1直接返回true,然后當(dāng)前線程lockC執(zhí)行parkAndCheckInterrupt()把自己阻塞萤皂。
下面分析鎖的釋放。鎖釋放是持有線程執(zhí)行:
finally {
lock.unlock();
}
unlock方法調(diào)用的sync.release(1);此方法的具體實(shí)現(xiàn)如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease()方法是在ReenterLock類中的Sync內(nèi)部實(shí)現(xiàn)端礼,他主要邏輯是用當(dāng)前AQS的鎖狀態(tài)-1的差值是否與0相等入录,如果不等于0,那說明還需要等其他線程調(diào)用unLock(),tryRelease()返回false僚稿,這個差值計(jì)算是為了實(shí)現(xiàn)共享鎖的使用贫奠。直到AQS的鎖狀態(tài)與1的差值為0,然后返回true
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
當(dāng)該方法返回true之后唤崭,執(zhí)行unparkSuccessor(h)這里參數(shù)h是FIFO列表的頭結(jié)點(diǎn)谢肾,具體代碼邏輯如下:
private void unparkSuccessor(Node node) {
//獲取頭結(jié)點(diǎn)的鎖狀態(tài)信息
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//把頭結(jié)點(diǎn)的后繼節(jié)點(diǎn)所存儲的線程從阻塞狀態(tài)中釋放出來
if (s != null)
LockSupport.unpark(s.thread);
}
當(dāng)執(zhí)行到LockSupport.unpark(s.thread)時,F(xiàn)IFO第一個阻塞線程會從阻塞狀態(tài)中返回,也就是調(diào)用LockSupport.park()方法返回微姊,執(zhí)行后面代碼分预,然后開始執(zhí)行acquireQueued方法。此方法邏輯上面已經(jīng)分析過了配喳。
到此,AQS基本加鎖解鎖已經(jīng)分析完成凳干。