目錄
AQS
- AQS是ReentrantLock撕予,CyclicBarrier乞封,CountDownLatch做裙,Semaphore,ArrayBlockingQueue的基礎(chǔ)肃晚,深入理解AQS很有必要
數(shù)據(jù)結(jié)構(gòu)
-
sync隊(duì)列(雙端隊(duì)列)
sync隊(duì)列.png -
condition(單向隊(duì)列)
condition.png
繼承關(guān)系
// AOS主要設(shè)置/獲取獨(dú)占線程
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable
示例
- 例子是直接照搬參考文章leesf寫的AQS源碼
package com.hust.grid.leesf.reentrantLock;
class Consumer {
private Depot depot;
public Consumer(Depot depot) {
this.depot = depot;
}
public void consume(int no) {
new Thread(new Runnable() {
@Override
public void run() {
depot.consume(no);
}
}, no + " consume thread").start();
}
}
class Producer {
private Depot depot;
public Producer(Depot depot) {
this.depot = depot;
}
public void produce(int no) {
new Thread(new Runnable() {
@Override
public void run() {
depot.produce(no);
}
}, no + " produce thread").start();
}
}
public class ReentrantLockDemo {
public static void main(String[] args) throws InterruptedException {
Depot depot = new Depot(500);
new Producer(depot).produce(500);
new Producer(depot).produce(200);
new Consumer(depot).consume(500);
new Consumer(depot).consume(200);
}
}
根據(jù)例子分析源碼
-
參考文章leesf寫的AQS源碼提供的時(shí)序圖锚贱,p1代表produce 500的那個(gè)線程,p2代表produce 200的那個(gè)線程关串,c1代表consume 500的那個(gè)線程拧廊,c2代表consume 200的那個(gè)線程
時(shí)序圖.png
lock調(diào)用分析圖
-
也是leesf寫的AQS源碼提供的分析圖
lock調(diào)用分析圖.png
p1調(diào)用lock.lock()獲取鎖
- ReentrantLock::lock --> 默認(rèn)非公平鎖ReentrantLock::NonfairSync::lock,根據(jù)時(shí)序圖這里p1獲取到鎖走if分支
final void lock() {
// state之前為0能設(shè)置成1就獲取到鎖晋修,并設(shè)置獨(dú)占鎖的線程
if (compareAndSetState(0, 1))
// AOS的方法
setExclusiveOwnerThread(Thread.currentThread());
// 獲取不到鎖吧碾,后續(xù)分析
else
acquire(1);
}
p2調(diào)用lock.lock()未獲取鎖
- 由p1獲取到鎖流程分析,p2未獲取到鎖走else流程
- ReentrantLock::lock --> 默認(rèn)非公平鎖ReentrantLock::NonfairSync::lock -->
AQS::acquire
// AQS acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- AQS::acquire --> ReentrantLock::Sync::tryAcquire, 這里有可重入鎖為啥可重入原因墓卦,根據(jù)時(shí)序圖分析p2 tryAcquire即不走if也不走if else流程倦春,直接返回fasle
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 可重入鎖可重入是因?yàn)橛性O(shè)置獨(dú)占線程的功能,如果是這個(gè)線程則state++
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
- AQS::acquire --> ReentrantLock::Sync::tryAcquire方法中addWaiter(Node.EXCLUSIVE)這里ReentrantLock是設(shè)置獨(dú)占鎖落剪,即同一時(shí)間只有一個(gè)線程能占有鎖睁本,共享鎖是同一時(shí)間有多個(gè),比如讀鎖共享忠怖,寫鎖獨(dú)占
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 剛開始都為null
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 剛開始走這里
enq(node);
return node;
}
private Node enq(final Node node) {
// 剛開始時(shí)呢堰,第一個(gè)for循環(huán)創(chuàng)建空的head節(jié)點(diǎn),第二個(gè)for頭節(jié)點(diǎn)后面加入p2
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
- AQS::acquire --> ReentrantLock::Sync::tryAcquire方法中acquireQueued凡泣,細(xì)節(jié)可以看中文注解部分
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// p2的前面一個(gè)節(jié)點(diǎn)是頭節(jié)點(diǎn)枉疼,所以會嘗試獲取鎖
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 剛開始節(jié)點(diǎn)狀態(tài)為0,shouldParkAfterFailedAcquire將其設(shè)置為singal節(jié)點(diǎn)(-1)
// 然后LockSupport.park將線程掛起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 以上是整個(gè)lock調(diào)用流程
c1鞋拟,c2調(diào)用lock
-
狀態(tài)如下(根據(jù)時(shí)序圖此時(shí)沒有調(diào)用unlock)往衷,先是c1被加入到sync隊(duì)列尾部,再是c2加入到sync隊(duì)列尾部
c1严卖,c2調(diào)用lock.png
中間穿插p1調(diào)用emptyCondition.signal
- AQS::ConditionObject::signal目前沒有emptyCondition await的部分席舍,所以不起啥作用
unlock調(diào)用分析
- ReentrantLock::unlock --> AQS::release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- AQS::release先是調(diào)用ReentrantLock::tryRelease
protected final boolean tryRelease(int releases) {
// 釋放state并釋放獨(dú)占線程
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
- AQS::release再是調(diào)用AQS::unparkSuccessor
private void unparkSuccessor(Node node) {
// 此時(shí)正常狀態(tài)是singal,將其設(shè)置為初始狀態(tài)
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 走的這步unpark head后面的線程
if (s != null)
LockSupport.unpark(s.thread);
}
此時(shí)head后面線程從park中恢復(fù)哮笆,在cpu時(shí)間片獲取到的情況下来颤,繼續(xù)for循環(huán)嘗試獲取鎖汰扭。即此線程unlcok,會解鎖下個(gè)線程嘗試獲取鎖
-
unlock后變成
p1 unlock后.png
condition的await分析
-
p2線程執(zhí)行fullCondition.await
圖示.png
AQS:ConditionObject(CO):await
public final void await() throws InterruptedException {
// 對中斷做出處理福铅,拋出讓調(diào)用方接
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
// fullyRelease最終調(diào)用tryRelease方法萝毛,會釋放當(dāng)前線程持有的鎖,并通知后繼線程unpark
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 將當(dāng)前線程park
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
-
addConditionWaiter方法是在Condition隊(duì)列生成一個(gè)滑黔,Node.CONDITION屬性
Condition隊(duì)列.png -
fullyRelease最終調(diào)用tryRelease方法笆包,會釋放當(dāng)前線程持有的鎖,并通知后繼線程unpark略荡。還會把自己線程park
最終狀態(tài).png
c1線程獲取鎖
-
c1線程被unpark之后獲取cpu時(shí)間片庵佣,執(zhí)行前面提到的acquireQueued函數(shù),之后汛兜,c1判斷自己的前驅(qū)結(jié)點(diǎn)為head巴粪,并且可以獲取鎖資源, 然后獲取鎖。 最終是Condition有p2線程節(jié)點(diǎn)
c1線程獲取鎖.png
fullCondtion.signal分析
-
圖示分析
signal分析.png - AQS::CO::signal --> AQS::CO::doSignal --> AQS::CO::transferForSignal -->
AQS::CO::compareAndSetWaitStatus
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
-
transferForSignal的enq就是將condition的p2節(jié)點(diǎn)入隊(duì)尾
enq.png -
然后是設(shè)置status, compareAndSetWaitStatus
設(shè)置status.png
c1.unclock
c2線程執(zhí)行emptyCondition.await
-
p2線程被unpark粥谬,故可以繼續(xù)運(yùn)行肛根,經(jīng)過CPU調(diào)度后,p2繼續(xù)運(yùn)行漏策,之后p2線程在AQS:await函數(shù)中被park派哲,繼續(xù)AQS.CO:await函數(shù)的運(yùn)行
p2線程被unpark.png
p2繼續(xù)運(yùn)行,執(zhí)行emptyCondition.signal
p2執(zhí)行unclock
總結(jié)
- 每一個(gè)結(jié)點(diǎn)都是由前一個(gè)結(jié)點(diǎn)喚醒
- 當(dāng)結(jié)點(diǎn)發(fā)現(xiàn)前驅(qū)結(jié)點(diǎn)是head并且嘗試獲取成功掺喻,則會輪到該線程運(yùn)行狮辽。
- condition queue中的結(jié)點(diǎn)向sync queue中轉(zhuǎn)移是通過signal操作完成的。
- 當(dāng)結(jié)點(diǎn)的狀態(tài)為SIGNAL時(shí)巢寡,表示后面的結(jié)點(diǎn)需要運(yùn)行
ReentrantLock
- AQS獨(dú)占鎖實(shí)現(xiàn)喉脖,公平是通過sync隊(duì)列按照先后順序?qū)崿F(xiàn)的,非公平有可能被其他線程CAS設(shè)置state而搶占
- 具體實(shí)現(xiàn)同AQS舉例的
CyclicBarrier
- 通過它可以實(shí)現(xiàn)讓一組線程等待至某個(gè)狀態(tài)之后再全部同時(shí)執(zhí)行抑月。
- AQS獨(dú)占鎖
CountDownLatch
- 場景: 主線程等待多個(gè)線程完成任務(wù)树叽。AQS共享模式
- 數(shù)據(jù)結(jié)構(gòu)類似ReentrantLock,但是沒有公平非公平
// 內(nèi)部類
private static final class Sync extends AbstractQueuedSynchronizer
- 構(gòu)造函數(shù)谦絮,初始化state數(shù)
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
// 初始化狀態(tài)數(shù) setState
this.sync = new Sync(count);
}
例子
- 參考自lessf的博客
package com.hust.grid.leesf.cyclicbarrier;
import java.util.concurrent.CountDownLatch;
class MyThread extends Thread {
private CountDownLatch countDownLatch;
public MyThread(String name, CountDownLatch countDownLatch) {
super(name);
this.countDownLatch = countDownLatch;
}
public void run() {
System.out.println(Thread.currentThread().getName() + " doing something");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " finish");
countDownLatch.countDown();
}
}
public class CountDownLatchDemo {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(2);
MyThread t1 = new MyThread("t1", countDownLatch);
MyThread t2 = new MyThread("t2", countDownLatch);
t1.start();
t2.start();
System.out.println("Waiting for t1 thread and t2 thread to finish");
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " continue");
}
}
總體流程
await分析
-
main線程就被park了题诵,即禁止運(yùn)行了。此時(shí)Sync queue(同步隊(duì)列)中有兩個(gè)節(jié)點(diǎn)层皱,AQS的state為2性锭,包含main線程的結(jié)點(diǎn)的nextWaiter指向SHARED結(jié)點(diǎn)
await分析.png
t1線程執(zhí)行countDownLatch.countDown
-
Sync queue隊(duì)列里的結(jié)點(diǎn)個(gè)數(shù)未發(fā)生變化,但是此時(shí)叫胖,AQS的state已經(jīng)變?yōu)?了
t1線程執(zhí)行countDownLatch.countDown.png
t2線程執(zhí)行countDownLatch.countDown
-
main線程獲取cpu資源草冈,繼續(xù)運(yùn)行,由于main線程是在parkAndCheckInterrupt函數(shù)中被禁止的,所以此時(shí)怎棱,繼續(xù)在parkAndCheckInterrupt函數(shù)運(yùn)行
繼續(xù)運(yùn)行.png
Semaphore
- 內(nèi)部類跟ReentrantLock類似有公平非公平鎖之分哩俭,使用的是AQS共享鎖
- 使用場景,數(shù)據(jù)庫連接之類的多個(gè)線程共同持有資源
示例
package com.hust.grid.leesf.semaphore;
import java.util.concurrent.Semaphore;
class MyThread extends Thread {
private Semaphore semaphore;
public MyThread(String name, Semaphore semaphore) {
super(name);
this.semaphore = semaphore;
}
public void run() {
int count = 3;
System.out.println(Thread.currentThread().getName() + " trying to acquire");
try {
semaphore.acquire(count);
System.out.println(Thread.currentThread().getName() + " acquire successfully");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(count);
System.out.println(Thread.currentThread().getName() + " release successfully");
}
}
}
public class SemaphoreDemo {
public final static int SEM_SIZE = 10;
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(SEM_SIZE);
MyThread t1 = new MyThread("t1", semaphore);
MyThread t2 = new MyThread("t2", semaphore);
t1.start();
t2.start();
int permits = 5;
System.out.println(Thread.currentThread().getName() + " trying to acquire");
try {
semaphore.acquire(permits);
System.out.println(Thread.currentThread().getName() + " acquire successfully");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
System.out.println(Thread.currentThread().getName() + " release successfully");
}
}
}
生成10個(gè)許可圖示
-
main線程執(zhí)行acquire操作拳恋,并且成功獲得許可凡资,之后t1線程執(zhí)行acquire操作,成功獲得許可谬运,之后t2執(zhí)行acquire操作隙赁,由于此時(shí)許可數(shù)量不夠,t2線程將會阻塞梆暖,直到許可可用伞访。之后t1線程釋放許可,main線程釋放許可式廷,此時(shí)的許可數(shù)量可以滿足t2線程的要求,所以芭挽,此時(shí)t2線程會成功獲得許可運(yùn)行滑废,t2運(yùn)行完成后釋放許可
生成10個(gè)許可圖示.png
semaphore::acquire分析
-
main調(diào)用semaphore::acquire
main調(diào)用semaphore::acquire.png -
t1調(diào)用semaphore::acquire
t1調(diào)用semaphore::acquire.png -
t2調(diào)用semaphore::acquire,此時(shí)資源不足
t2調(diào)用semaphore::acquire.png
semaphore::release分析
-
t1 release, t2線程將會被unpark袜爪,并且AQS的state為5蠕趁,t2獲取cpu資源后可以繼續(xù)運(yùn)行
t1 release .png -
t2獲取CPU資源,繼續(xù)運(yùn)行, setHeadAndPropagate會調(diào)用doReleaseShared辛馆,在setHeadAndPropagate的函數(shù)中會設(shè)置頭結(jié)點(diǎn)并且會unpark隊(duì)列中的其他結(jié)點(diǎn)俺陋。(與獨(dú)占模式很重要區(qū)別)
t2獲取CPU資源,繼續(xù)運(yùn)行.png
ArrayBlockingQueue
- ArrayBlockingQueue是通過ReentrantLock和Condition條件來保證多線程的正確訪問的昙篙±白矗可以參考AQS的分析。
ReentrantReadWriteLock
- ReentrantReadWriteLock底層是基于ReentrantLock和AQS來實(shí)現(xiàn)的
結(jié)構(gòu)
-
Sync繼承自AQS苔可、NonfairSync繼承自Sync類缴挖、FairSync繼承自Sync類狡逢;ReadLock實(shí)現(xiàn)了Lock接口炮车、WriteLock也實(shí)現(xiàn)了Lock接口
結(jié)構(gòu).png
總體
- 寫鎖獨(dú)占鎖
- 讀鎖共享鎖
- 讀讀不沖突,讀寫沖突
- 有讀鎖存在時(shí)袱讹,寫獲取鎖失敗進(jìn)入sync隊(duì)列原因是同蜻,寫線程存在讀線程也會進(jìn)入等待棚点。寫線程等待獲取鎖后,后續(xù)讀線程不會再獲取到鎖湾蔓,寫線程等待前面讀線程釋放鎖瘫析。才獲取到鎖,寫線程后面的讀線程才能獲取到鎖。
- 喚醒其后到下一個(gè)嘗試獲取鎖的的節(jié)點(diǎn)之間的所有嘗試獲取讀鎖的線程
// 寫鎖獲取鎖
protected final boolean tryAcquire(int acquires) {
// 獲取當(dāng)前線程
Thread current = Thread.currentThread();
// 獲取狀態(tài)
int c = getState();
// 寫線程數(shù)量
int w = exclusiveCount(c);
if (c != 0) { // 狀態(tài)不為0
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread()) // 寫線程數(shù)量為0(說明存在讀鎖)或者當(dāng)前線程沒有占有獨(dú)占資源
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT) // 判斷是否超過最高寫線程數(shù)量
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 設(shè)置AQS狀態(tài)
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires)) // 寫線程是否應(yīng)該被阻塞
return false;
// 設(shè)置獨(dú)占線程
setExclusiveOwnerThread(current);
return true;
}
// 讀鎖獲取鎖颁股,此函數(shù)表示讀鎖線程獲取讀鎖么库。首先判斷寫鎖是否為0并且當(dāng)前線程不占有獨(dú)占鎖,直接返回甘有;否則诉儒,判斷讀線程是否需要被阻塞并且讀鎖數(shù)量是否小于最大值并且比較設(shè)置狀態(tài)成功
protected final int tryAcquireShared(int unused) {
// 獲取當(dāng)前線程
Thread current = Thread.currentThread();
// 獲取狀態(tài)
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current) // 寫線程數(shù)不為0并且占有資源的不是當(dāng)前線程
return -1;
// 讀鎖數(shù)量
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) { // 讀線程是否應(yīng)該被阻塞、并且小于最大值亏掀、并且比較設(shè)置成功
if (r == 0) { // 讀鎖數(shù)量為0
// 設(shè)置第一個(gè)讀線程
firstReader = current;
// 讀線程占用的資源數(shù)為1
firstReaderHoldCount = 1;
} else if (firstReader == current) { // 當(dāng)前線程為第一個(gè)讀線程
// 占用資源數(shù)加1
firstReaderHoldCount++;
} else { // 讀鎖數(shù)量不為0并且不為當(dāng)前線程
// 獲取計(jì)數(shù)器
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) // 計(jì)數(shù)器為空或者計(jì)數(shù)器的tid不為當(dāng)前正在運(yùn)行的線程的tid
// 獲取當(dāng)前線程對應(yīng)的計(jì)數(shù)器
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0) // 計(jì)數(shù)為0
// 設(shè)置
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
- 當(dāng)讀線程執(zhí)行unlock操作后忱反,AQS的state為0(有多個(gè)讀時(shí)需要等帶多個(gè)讀unlock),寫線程將會被unpark滤愕,其獲得CPU資源就可以運(yùn)行温算。
參考文章
- 【JUC】JDK1.8源碼分析之AbstractQueuedSynchronizer(二)
- 【JUC】JDK1.8源碼分析之ReentrantLock(三)
- 【JUC】JDK1.8源碼分析之ArrayBlockingQueue(三)
- 【JUC】JDK1.8源碼分析之CountDownLatch(五)
- 【JUC】JDK1.8源碼分析之CyclicBarrier(四)
- 【JUC】JDK1.8源碼分析之Semaphore(六)
- 【JUC】JDK1.8源碼分析之ReentrantReadWriteLock(七)
- CountDownLatch、CyclicBarrier间影、Semaphore的區(qū)別
- CountDownLatch注竿、CyclicBarrier、Semaphore共同之處與區(qū)別以及各自使用場景
- 【Java并發(fā)編程】詳細(xì)分析AQS原理之共享鎖
- 理解java線程的中斷(interrupt)
- 【AQS】獨(dú)占鎖與共享鎖