AQS
隊列同步器(AbstractQueuedSynchronizer)簡稱AQS,是J.U.C同步構(gòu)件的基礎,包括ReentrantLock谅阿、ReentrantReadWriteLock根悼、CountDownLatch、Semaphor都是基于AQS實現(xiàn)的新博。
理解AQS是理解這些同步工具的基礎,基于AQS提供的同步語義可以定制各種功能的同步工具。
AQS原理
用一個int類型的狀態(tài)變量(volatile)state記錄同步狀態(tài)业簿,默認值是0
用一個雙向鏈表實現(xiàn)的隊列對線程進程進行排隊和調(diào)度
A線程使用compareAndSet(state,0,1)原子設置state的
值,設置成功說明state當前無其他線程爭用阳懂,A線程取鎖的使用權(quán)辖源。
設置不成功,說明B線程對state的值進行了設置希太,并且沒有復位(state!=0)克饶,B線程持有鎖的使用權(quán)(B線程還沒有釋放鎖)。A線程會構(gòu)造成一個Node節(jié)點加入隊列尾部并掛起誊辉。
當B線程執(zhí)行完同步操作后矾湃,對state進行復位(state==0),即釋放鎖堕澄,然后從隊列頭開始尋找邀跃,發(fā)現(xiàn)正在沉睡的A線程,將其喚醒蛙紫。
Node節(jié)點
static final class Node {
/**共享模式 */
static final Node SHARED = new Node();
/**獨占模式 */
static final Node EXCLUSIVE = null;
/**取消狀態(tài)拍屑,由于在同步隊列中等待的線程等待超時或被中斷,
* 需要從同步隊列中取消等待坑傅。
*/
static final int CANCELLED = 1;
/**通知狀態(tài)僵驰,當前節(jié)點的后繼節(jié)點包含的線程需要運行(unpark)
* 當前節(jié)點的線程如果釋放了同步狀態(tài)或者被取消,將通知后續(xù)節(jié)點唁毒。
*/
static final int SIGNAL = -1;
/**條件阻塞狀態(tài)蒜茴,節(jié)點線程等待在Condition上,
* 當其他線程對Condition調(diào)用了signal()方法后浆西,
* 該節(jié)點將會從等待隊列中轉(zhuǎn)移到同步隊列中粉私,
* 加入到對同步狀態(tài)的獲取中。
*/
static final int CONDITION = -2;
/**傳播狀態(tài)近零,表示當前場景下后續(xù)的acquireShared能夠得以執(zhí)行诺核。*/
static final int PROPAGATE = -3;
/**節(jié)點的的狀態(tài)
* 初始狀態(tài)為0 表示當前節(jié)點在sync隊列中抄肖,等待著獲取狀態(tài)。
*/
volatile int waitStatus;
/** 前驅(qū)節(jié)點 */
volatile Node prev;
/** 后繼節(jié)點 */
volatile Node next;
/**節(jié)點對應的線程窖杀,等待獲取同步狀態(tài)的線程漓摩。 */
volatile Thread thread;
/**下一等待節(jié)點*/
Node nextWaiter;
/**是否共享模式 */
final boolean isShared() {
return nextWaiter == SHARED;
}
/**獲取前驅(qū)節(jié)點 */
final Node predecessor() throws NullPointerException {}
Node() {}
Node(Thread thread, Node mode) {}
Node(Thread thread, int waitStatus) {}
}
EXCLUSIVE、SHARED是節(jié)點的兩種模式:獨占模式和共享模式陈瘦,分別對應獨占鎖和共享鎖這兩種典型的鎖幌甘。
thread就是節(jié)點對應的線程。
waitStatus指節(jié)點狀態(tài):
1.取消狀態(tài)CANCELLED
2.通知狀態(tài)SIGNAL
3.條件阻塞狀態(tài)CONDITION
4.傳播狀態(tài)PROPAGATE
獨占模式
獨占鎖也稱排它鎖痊项,一次只允許一個線程獲取到鎖锅风,鎖未釋放前其他線程無法獲取到鎖。
Synchronized關(guān)鍵字獲取的內(nèi)置鎖就是一個獨占鎖鞍泉,每次只能一個線程獲得對象的監(jiān)視器進入臨界區(qū)皱埠。
一個獨占鎖的例子,然后來分析AQS是如何實現(xiàn)線程同步的咖驮。
/**
* Lock是J.U.C中的接口接口边器,定義了一組體現(xiàn)完整鎖語義的方法。
*/
public class ExclusiveLock implements Lock {
private static class Sync extends AbstractQueuedSynchronizer {
// 用CAS操作設置同步狀態(tài)State托修,當前線程把state改為1
// 其他線程便修改不了忘巧,可以看成當前線程持有了鎖。
protected boolean tryAcquire(int acquires) {
if (this.compareAndSetState(0, acquires)) {
// 將持有線程設置為當前線程
this.setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 釋放同步狀態(tài)
protected boolean tryRelease(int releases) {// 釋放同步狀態(tài)
if (Thread.currentThread() != this.getExclusiveOwnerThread()) {
throw new IllegalMonitorStateException();
}
if (this.getState() == 0) {
throw new IllegalMonitorStateException();
}
this.setExclusiveOwnerThread(null);
this.setState(0);
return true;
}
// 當前線程是否持有線程
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
// 實例化Condition對象
final ConditionObject newCondition() {
return new ConditionObject();
}
}
// 隊列同步器實例
private final Sync sync = new Sync();
// 加鎖睦刃,線程請求到鎖則返回砚嘴,請求不到鎖則阻塞
public void lock() {
sync.acquire(1);
}
// 非阻塞加鎖,線程請求到鎖返回true涩拙,請求不到鎖返回false际长,不會阻塞。
public boolean tryLock() {
return sync.tryAcquire(1);
}
// 解鎖兴泥,
public void unlock() {
sync.release(1);
}
// 加鎖 響應中斷工育,在獲取鎖的過程中線程被中斷拋出InterruptedException異常,停止鎖獲取
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
// 非阻塞加鎖 時間限制
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
// 實例化 Condition
public Condition newCondition() {
return sync.newCondition();
}
}
獨占模式加鎖流程:
// s1
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
s1: 調(diào)用tryAcquire(1)加鎖失敗搓彻,構(gòu)造一個獨占模式(EXCLUSIVE)的Node準備入列,轉(zhuǎn)入s2如绸。
// s2
private Node addWaiter(Node mode) {
// 用當前線程和mode,實例化Node
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {// 隊列不為空
node.prev = pred;
if (compareAndSetTail(pred, node)) {// 快速入列,設置node為新的尾節(jié)點
pred.next = node;
return node;
}
}
enq(node);//隊列為空||CAS尾節(jié)點設置失敗
return node;
}
s2: 如果隊列已經(jīng)初始化,使用compareAndSetTail以CAS的方式將Node設置為尾節(jié)點,轉(zhuǎn)入s4;如果隊列為空或者設置尾節(jié)點失敗好唯,轉(zhuǎn)入s3
// s3
private Node enq(final Node node) {
for (;;) { // 循環(huán)確保入列成功
Node t = tail;// 尾節(jié)點
if (t == null) { // 未初始化竭沫,將頭、尾節(jié)點都指向空白節(jié)點
if (compareAndSetHead(new Node()))
tail = head;
} else { // node 入列
node.prev = t;// node的前驅(qū)指向尾節(jié)點
if (compareAndSetTail(t, node)) {// 設置尾節(jié)點為當前節(jié)點
t.next = node;
return t;
}
}
}
}
s3: 如果隊列未初始化,實例化一個空隊列tail和head都指向一個空白節(jié)點骑篙,使用compareAndSetTail以CAS的方式將Node設置為尾節(jié)點,在循環(huán)中確保設置成功森书,轉(zhuǎn)入s4
// s4
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//前驅(qū)節(jié)點
if (p == head && tryAcquire(arg)) {//前驅(qū)節(jié)點為頭節(jié)點并成功獲取鎖
setHead(node);//設置節(jié)點為頭
p.next = null; // help GC 摘除原來的頭節(jié)點
failed = false;
return interrupted;
} // 前驅(qū)非頭節(jié)點或者重試獲取鎖失敗
// 線程在此處被掛起,當線程被喚醒后也會在這里重新進入for(;;)獲取鎖
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
s4: 如果Node節(jié)點在隊列中拍第二靶端,重試獲取鎖谎势,獲取成功后,將Node設置為頭節(jié)點直接返回杨名;否則進入s5
s5返回了true說明當前節(jié)點可掛起了脏榆,調(diào)用parkAndCheckInterrupt()方法將線程掛起,線程掛起操作和CAS操作一樣都是調(diào)用Unsafe中的native方法台谍。
如果此線程被中斷了须喂,他會被喚醒并且返回中斷標識true,進入到下次循環(huán)趁蕊,如果拿不到鎖還是接著park坞生,如果拿到鎖返回到s1,會記錄下中斷狀態(tài)selfInterrupt()掷伙,用戶可以自行處理中斷狀態(tài)是己,對流程沒有任何影響。
for(;;)主要是為了保障兩點:
一是每個Node掛起前都能將前驅(qū)節(jié)點的狀態(tài)設置為SIGNAL
二是在每個Node被喚醒后再次進入鎖的獲取中
循環(huán)中如果出現(xiàn)異常任柜,將取消鎖獲取 cancelAcquire(node)卒废。
// s5
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 前驅(qū)節(jié)點狀態(tài)
if (ws == Node.SIGNAL)//狀態(tài)為SIGNAL,說明node可掛起,返回true
return true;
if (ws > 0) {//摘掉狀態(tài)為CANCELLED的前驅(qū)節(jié)點
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;// 找到非CANCELLED狀態(tài)的節(jié)點,將Node掛在其后面
} else {//為-3宙地、-2
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);// 設置前驅(qū)節(jié)點為SIGNAL狀態(tài)
}
return false;
}
s5: 此處主要是將前驅(qū)節(jié)點的waitStatus設置為SIGNAL摔认。
如果前驅(qū)節(jié)點的狀態(tài)為SIGNAL表示前驅(qū)節(jié)點為"通知狀態(tài)",前驅(qū)節(jié)點釋放鎖后會后繼等待的節(jié)點會被喚醒宅粥,所以Node可以放心的掛起参袱,直接返回true至s4 進行線程掛起。
如果前驅(qū)節(jié)點的狀態(tài)為CANCELLED表示前驅(qū)節(jié)點放棄了鎖獲取粹胯,通過循環(huán)向前查找到蓖柔,直到找到最近一個非CANCELLED狀態(tài)的節(jié)點,將node掛在它的后邊风纠,CANCELLED節(jié)點會被從隊列中摘除况鸣。
CAS設置前驅(qū)節(jié)點的狀態(tài)為SIGNAL,返回s4 false至s4,進入下次循環(huán)竹观,下次循環(huán)發(fā)現(xiàn)前驅(qū)節(jié)點的狀態(tài)已經(jīng)為SIGNAL了镐捧,可以掛起了。
獨占模式解鎖流程:
// s1
public final boolean release(int arg) {
if (tryRelease(arg)) {//解鎖成功
Node h = head;
if (h != null && h.waitStatus != 0)//頭節(jié)點不為空&&不在進行中
unparkSuccessor(h);//喚醒后續(xù)節(jié)點
return true;
}
return false;
}
s1:調(diào)用tryRelease(arg)進行解鎖邏輯臭增,解鎖成功懂酱,準備喚醒頭節(jié)點的后繼節(jié)點。
h.waitStatus ==0說明后繼節(jié)點正在喚醒中誊抛。
轉(zhuǎn)入s2列牺。
// s2
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)// head節(jié)點狀態(tài)設置為0
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {// s為空或CANCELLED
s = null;
// 從尾部開始向前查找,找到一個非CANCELLED狀態(tài)的節(jié)點
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;// 賦給s
}
if (s != null)// 喚醒s
LockSupport.unpark(s.thread);
}
s2:將head節(jié)點的狀態(tài)設置為0拗窃,表示有節(jié)點正在被喚醒瞎领。
如果后繼節(jié)點為空或者CANCELLED泌辫,從尾節(jié)點開始向前查找,找到一個非CANCELLED狀態(tài)的節(jié)點九默,將其賦值給s震放。
喚醒s節(jié)點,調(diào)用Unsafe中的unpark方法驼修。
需要注意的是被喚醒的節(jié)點還要回到acquireQueued()方法里的掛起點殿遂,再次進行鎖獲取,如果還是沒有獲取到鎖則接著被掛起。
共享模式
共享鎖是可以有多個線程獲取并持有的鎖乙各,獲取到鎖的線程都可以進入同步代碼塊執(zhí)行墨礁。
這有點像數(shù)據(jù)庫連接池,操作數(shù)據(jù)庫時先從池中借出一個連接觅丰,操作完畢饵溅,將連接歸還入池,供后續(xù)操作繼續(xù)借用妇萄。
AQS共享鎖實現(xiàn):
public class ShareLock implements Lock {
// 同步器
private static class Sync extends AbstractQueuedSynchronizer {
// 初始化蜕企,available:共享許可數(shù)
Sync(int available) {
this.setState(available);
}
// 獲取同步狀態(tài)
protected int tryAcquireShared(int acquires) {
for (;;) {
//當前許可數(shù)
int available = this.getState();
//減去1后的剩余量
int remaining = available - acquires;
//剩余量<0,直接返回
if (remaining < 0
//說明還有剩余量CAS設置available
|| compareAndSetState(available, remaining)) {
return remaining;
}
}
}
// 釋放同步狀態(tài)
protected boolean tryReleaseShared(int releases) {
for (;;) {
//當前許可數(shù)量
int current = this.getState();
//加1后的最新許可量
int available = current + releases;
if (available < current) // overflow
throw new Error("釋放不合法");
if (compareAndSetState(current, available))
return true;
}
}
}
// 同步器實例
private final Sync sync = new Sync(2);
// 加鎖
public void lock() {
sync.acquireShared(1);
}
// 非阻塞加鎖
public boolean tryLock() {
return sync.tryAcquireShared(1) > 0;
}
// 解鎖
public void unlock() {
sync.releaseShared(1);
}
// 加鎖 響應中斷
public void lockInterruptibly()
throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 非阻塞加鎖 時間限制
public boolean tryLock(long time, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
}
// 實例化 Condition
public Condition newCondition() {
return null;
}
}
共享模式實現(xiàn)邏輯:
1:初始化同步器Sync時冠句,先設定許可共享數(shù)轻掩,即有多少把鎖,許可共享數(shù)保存在共享狀態(tài)state中懦底。
2:每次加鎖許可共享數(shù)都會減1作為剩余量唇牧,當剩余量小于0時,說明沒有可用的許可了聚唐,直接返回剩余量丐重,AQS中的“acquireShared”發(fā)現(xiàn)剩余量小于0,開始構(gòu)造Node進入排隊邏輯。
當還有剩余量(remaining>=0)說明線程還能獲取共享鎖杆查,剩余量減1扮惦,直接返回,取鎖成功亲桦。
3:釋放鎖時將剩余量加1崖蜜,CAS設置state為剩余量,設置成功則釋放鎖成功客峭。
4:獨占鎖釋放時沒有使用CAS操作豫领,因為獨占鎖釋放不存在線程爭用,共享鎖會出現(xiàn)多個線程釋放鎖的情況舔琅,state存在爭用等恐。:
5:共享鎖的newCondition()方法返回null,因為Condition只能使用在獨占鎖中。后面會專門分析條件隊列Condition。
6:理解了獨占鎖的加鎖解鎖流程鼠锈,再看共享鎖的加解鎖流程闪檬,應該沒有障礙星著,這里不再累述购笆。
7:需要注意共享鎖在喚醒的節(jié)點后,如發(fā)現(xiàn)還有剩余量虚循,還有節(jié)點在排隊同欠,將繼續(xù)喚醒后繼節(jié)。
小結(jié)
- AQS是J.U.C最核心的同步構(gòu)件横缔,也是J.U.C中最難弄懂的類之一铺遂,是理解重入鎖、讀寫鎖及J.U.C中其他同步工具的基礎茎刚。
- AQS提供了基礎的同步語義襟锐,可以根據(jù)應用場景創(chuàng)建不同的同步組件。
- 基于AQS實現(xiàn)的鎖膛锭,并非有些文檔說的自旋鎖粮坞,底層是使用Unsafe的park和unpark方法來掛起和喚醒線程。
碼字不易初狰,轉(zhuǎn)載請保留原文連接http://www.reibang.com/p/d291a6a1879c