1.什么是AbstractQueuedSynchronizer孵淘?
2.同步隊(duì)列中的節(jié)點(diǎn)(Node)
3.獨(dú)占式同步狀態(tài)獲取與釋放
4.并發(fā)問(wèn)題
5.掛起等待線(xiàn)程
6.一個(gè)例子
什么是AbstractQueuedSynchronizer涯冠?
AbstractQueuedSynchronizer即是同步器(簡(jiǎn)稱(chēng)AQS)行拢,同步器依賴(lài)內(nèi)部的同步隊(duì)列(一個(gè)FIFO雙向隊(duì)列)來(lái)完成同步狀態(tài)的管理囱晴,當(dāng)前線(xiàn)程獲取同步狀態(tài)失敗時(shí)详囤,同步器會(huì)將當(dāng)前線(xiàn)程以及等待狀態(tài)等信息構(gòu)造成為一個(gè)節(jié)點(diǎn)(Node)并將其加入同步隊(duì)列霜定,同時(shí)會(huì)阻塞當(dāng)前線(xiàn)程噩翠,當(dāng)同步狀態(tài)釋放時(shí),會(huì)把首節(jié)點(diǎn)中的線(xiàn)程喚醒言沐,使其再次嘗試獲取同步狀態(tài)邓嘹。
AQS的核心思想是基于volatile int state這樣的一個(gè)屬性同時(shí)配合Unsafe工具對(duì)其原子性的操作來(lái)實(shí)現(xiàn)對(duì)當(dāng)前鎖的狀態(tài)進(jìn)行修改。當(dāng)state的值為0的時(shí)候险胰,標(biāo)識(shí)改Lock不被任何線(xiàn)程所占有汹押。
同步隊(duì)列中的節(jié)點(diǎn)(Node)
同步隊(duì)列中的節(jié)點(diǎn)(Node)用來(lái)保存獲取同步狀態(tài)失敗的線(xiàn)程引用、等待狀態(tài)以及前驅(qū)和后繼節(jié)點(diǎn)起便,節(jié)點(diǎn)的屬性類(lèi)型與名稱(chēng)以及描述如下表所示
節(jié)點(diǎn)是構(gòu)成同步隊(duì)列(等待隊(duì)列)的基礎(chǔ)棚贾,同步器擁有首節(jié)點(diǎn)(head)和尾節(jié)點(diǎn)(tail)窖维,沒(méi)有成功獲取同步狀態(tài)的線(xiàn)程將會(huì)成為節(jié)點(diǎn)加入該隊(duì)列的尾部,同步隊(duì)列的基本結(jié)構(gòu)如下圖所示
同步器包含了兩個(gè)節(jié)點(diǎn)類(lèi)型的引用妙痹,一個(gè)指向頭節(jié)點(diǎn)铸史,而另一個(gè)指向尾節(jié)點(diǎn)。試想一下怯伊,當(dāng)一個(gè)線(xiàn)程成功地獲取了同步狀態(tài)(或者鎖)琳轿,其他線(xiàn)程將無(wú)法獲取到同步狀態(tài),轉(zhuǎn)而被構(gòu)造成為節(jié)點(diǎn)并加入到同步隊(duì)列中耿芹,而這個(gè)加入隊(duì)列的過(guò)程必須要保證線(xiàn)程安全崭篡,因此同步器提供了一個(gè)基于CAS的設(shè)置尾節(jié)點(diǎn)的方法:compareAndSetTail(Node expect,Node update),它需要傳遞當(dāng)前線(xiàn)程“認(rèn)為”的尾節(jié)點(diǎn)和當(dāng)前節(jié)點(diǎn)猩系,只有設(shè)置成功后媚送,當(dāng)前節(jié)點(diǎn)才正式與之前的尾節(jié)點(diǎn)建立關(guān)聯(lián)。
同步器將節(jié)點(diǎn)加入到同步隊(duì)列的過(guò)程
同步隊(duì)列遵循FIFO寇甸,首節(jié)點(diǎn)是獲取同步狀態(tài)成功的節(jié)點(diǎn),首節(jié)點(diǎn)的線(xiàn)程在釋放同步狀態(tài)時(shí)疗涉,將會(huì)喚醒后繼節(jié)點(diǎn)拿霉,而后繼節(jié)點(diǎn)將會(huì)在獲取同步狀態(tài)成功時(shí)將自己設(shè)置為首節(jié)點(diǎn),該過(guò)程
獨(dú)占式同步狀態(tài)獲取與釋放
當(dāng)多個(gè)線(xiàn)程同時(shí)去競(jìng)爭(zhēng)鎖的時(shí)候發(fā)生了什么咱扣?
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
多個(gè)線(xiàn)程同時(shí)進(jìn)來(lái)绽淘,他們會(huì)首先會(huì)通過(guò)CAS去修改state的狀態(tài),如果修改成功闹伪,那么競(jìng)爭(zhēng)成功沪铭,因此這個(gè)時(shí)候多個(gè)線(xiàn)程只有一個(gè)CAS成功,其他兩個(gè)線(xiàn)程失敗偏瓤,也就是tryAcquire返回false杀怠。
接下來(lái),addWaiter會(huì)把將當(dāng)前線(xiàn)程關(guān)聯(lián)的EXCLUSIVE類(lèi)型的節(jié)點(diǎn)入隊(duì)列:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
如果隊(duì)尾節(jié)點(diǎn)不為null厅克,則說(shuō)明隊(duì)列中已經(jīng)有線(xiàn)程在等待了赔退,那么直接入隊(duì)尾。對(duì)于我們舉的例子证舟,這邊的邏輯應(yīng)該是走enq硕旗,也就是開(kāi)始隊(duì)尾是null,其實(shí)這個(gè)時(shí)候整個(gè)隊(duì)列都是null的女责。
private Node enq(Node node) {
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return oldTail;
}
} else {
initializeSyncQueue();
}
}
}
如果Thread2和Thread3同時(shí)進(jìn)入了enq漆枚,同時(shí)t == null,則同時(shí)進(jìn)入initializeSyncQueue()方法;initializeSyncQueue方法通過(guò)HEAD.compareAndSet(this, null, (h = new Node()創(chuàng)建同步器tail抵知,第二次同步器尾節(jié)點(diǎn)不為空則都進(jìn)入if代碼塊墙基,進(jìn)行CAS加入隊(duì)尾昔榴,這個(gè)時(shí)候只有一個(gè)線(xiàn)程能夠成功,然后其他繼續(xù)進(jìn)入循環(huán)碘橘,因此這個(gè)時(shí)候又是只有一個(gè)線(xiàn)程成功互订,我們假設(shè)是Thread2成功,哈哈痘拆,Thread2開(kāi)心的返回了仰禽,Thread3失落的再進(jìn)行下一次的循環(huán),最終入隊(duì)列成功纺蛆,返回自己吐葵。
并發(fā)問(wèn)題
基于上面兩段代碼,他們是如何實(shí)現(xiàn)不進(jìn)行加鎖桥氏,當(dāng)有多個(gè)線(xiàn)程温峭,或者說(shuō)很多很多的線(xiàn)程同時(shí)執(zhí)行的時(shí)候,怎么能保證最終他們都能夠乖乖的入隊(duì)列而不會(huì)出現(xiàn)并發(fā)問(wèn)題的呢字支?
這也是這部分代碼的經(jīng)典之處凤藏,多線(xiàn)程競(jìng)爭(zhēng),熱點(diǎn)堕伪、單點(diǎn)在隊(duì)列尾部揖庄,多個(gè)線(xiàn)程都通過(guò)【CAS+死循環(huán)】這個(gè)free-lock黃金搭檔來(lái)對(duì)隊(duì)列進(jìn)行修改,每次能夠保證只有一個(gè)成功欠雌,如果失敗下次重試蹄梢,如果是N個(gè)線(xiàn)程,那么每個(gè)線(xiàn)程最多l(xiāng)oop N次富俄,最終都能夠成功禁炒。
掛起等待線(xiàn)程
上面只是addWaiter的實(shí)現(xiàn)部分,那么節(jié)點(diǎn)入隊(duì)列之后會(huì)繼續(xù)發(fā)生什么呢霍比?那就要看看acquireQueued是怎么實(shí)現(xiàn)的了幕袱,為保證文章整潔,代碼我就不貼了桂塞,同志們自行查閱凹蜂,我們還是以上面的例子來(lái)看看,Thread2和Thread3已經(jīng)被放入隊(duì)列了阁危,進(jìn)入acquireQueued之后:
對(duì)于Thread2來(lái)說(shuō)玛痊,它的prev指向HEAD,因此會(huì)首先再?lài)L試獲取鎖一次狂打,如果失敗擂煞,則會(huì)將HEAD的waitStatus值為SIGNAL,下次循環(huán)的時(shí)候再去嘗試獲取鎖趴乡,如果還是失敗对省,且這個(gè)時(shí)候prev節(jié)點(diǎn)的waitStatus已經(jīng)是SIGNAL蝗拿,則這個(gè)時(shí)候線(xiàn)程會(huì)被通過(guò)LockSupport掛起捺信。
對(duì)于Thread3來(lái)說(shuō)谎懦,它的prev指向Thread2,因此直接看看Thread2對(duì)應(yīng)的節(jié)點(diǎn)的waitStatus是否為SIGNAL痪欲,如果不是則將它設(shè)置為SIGNAL劳秋,再給自己一次去看看自己有沒(méi)有資格獲取鎖仓手,如果Thread2還是擋在前面,且它的waitStatus是SIGNAL玻淑,則將自己掛起嗽冒。
如果Thread1死死的握住鎖不放,那么Thread2和Thread3現(xiàn)在的狀態(tài)就是掛起狀態(tài)啦补履,而且HEAD添坊,以及Thread的waitStatus都是SIGNAL,盡管他們?cè)谡麄€(gè)過(guò)程中曾經(jīng)數(shù)次去嘗試獲取鎖箫锤,但是都失敗了贬蛙,失敗了不能死循環(huán)呀,所以就被掛起了麻汰。當(dāng)前狀態(tài)如下:
一個(gè)例子
在上述對(duì)同步器AbstractQueuedSynchronizer進(jìn)行了實(shí)現(xiàn)層面的分析之后速客,我們通過(guò)一個(gè)例子來(lái)加深對(duì)同步器的理解:
設(shè)計(jì)一個(gè)同步工具,該工具在同一時(shí)刻五鲫,只能有兩個(gè)線(xiàn)程能夠并行訪(fǎng)問(wèn),超過(guò)限制的其他線(xiàn)程進(jìn)入阻塞狀態(tài)岔擂。
對(duì)于這個(gè)需求位喂,可以利用同步器完成一個(gè)這樣的設(shè)定,定義一個(gè)初始狀態(tài)乱灵,為2塑崖,一個(gè)線(xiàn)程進(jìn)行獲取那么減1,一個(gè)線(xiàn)程釋放那么加1痛倚,狀態(tài)正確的范圍在[0规婆,1,2]三個(gè)之間蝉稳,當(dāng)在0時(shí)抒蚜,代表再有新的線(xiàn)程對(duì)資源進(jìn)行獲取時(shí)只能進(jìn)入阻塞狀態(tài)(注意在任何時(shí)候進(jìn)行狀態(tài)變更的時(shí)候均需要以CAS作為原子性保障)。由于資源的數(shù)量多于1個(gè)耘戚,同時(shí)可以有兩個(gè)線(xiàn)程占有資源嗡髓,因此需要實(shí)現(xiàn)tryAcquireShared和tryReleaseShared方法
public class TwinsLock implements Lock {
private final Sync sync = new Sync(2);
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -7889272986162341211L;
Sync(int count) {
if (count <= 0) {
throw new IllegalArgumentException("count must large than zero.");
}
setState(count);
}
public int tryAcquireShared(int reduceCount) {
for (;;) {
int current = getState();
int newCount = current - reduceCount;
if (newCount < 0 || compareAndSetState(current, newCount)) {
return newCount;
}
}
}
public boolean tryReleaseShared(int returnCount) {
for (;;) {
int current = getState();
int newCount = current + returnCount;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
final ConditionObject newCondition() {
return new ConditionObject();
}
}
public void lock() {
sync.acquireShared(1);
}
public void unlock() {
sync.releaseShared(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean tryLock() {
return sync.tryAcquireShared(1) >= 0;
}
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
測(cè)試用例
public class TwinsLockTest {
public void test() {
final Lock lock = new TwinsLock();
class Worker extends Thread {
public void run() {
while (true) {
lock.lock();
try {
SleepUtils.second(1);
System.out.println(Thread.currentThread().getName());
SleepUtils.second(1);
} finally {
lock.unlock();
}
}
}
}
// 啟動(dòng)10個(gè)線(xiàn)程
for (int i = 0; i < 10; i++) {
Worker w = new Worker();
w.setDaemon(true);
w.start();
}
// 每隔1秒換行
for (int i = 0; i < 10; i++) {
SleepUtils.second(1);
System.out.println();
}
}
public static void main(String[] args) {
new TwinsLockTest().test();
}
}
參考文獻(xiàn):
[1] AbstractQueuedSynchronizer的介紹和原理分析
[2] 扒一扒ReentrantLock以及AQS實(shí)現(xiàn)原理 原
[3] 分析ReentrantLock的實(shí)現(xiàn)原理
[4] 《Java并發(fā)編程藝術(shù)》