1.概述
ReentrantLock
、CountDownLatch
奕扣、Semaphore
底層都是基于AQS
實(shí)現(xiàn)的薪鹦,其中tryAcquire
、tryRelease
惯豆、tryAcquireShared
池磁、tryReleaseShared
即加鎖以及釋放鎖的邏輯則由這三個(gè)子類自己實(shí)現(xiàn)。AQS的實(shí)現(xiàn)細(xì)節(jié)詳見AbstractQueuedSynchronizer(AQS)中獨(dú)占模式與共享模式的設(shè)計(jì)與實(shí)現(xiàn)循帐。
2.ReentrantLock
2.1 ReentrantLock
中加鎖邏輯的實(shí)現(xiàn)
ReentrantLock
中擁有Sync
類型的內(nèi)部類框仔,其中Sync
繼承AQS
,采用獨(dú)占模式下的AQS
拄养,Sync
的子類包括NonfairSync
(非公平)與FairSync
(公平)兩種模式,所有對(duì)AQS
的操作便通過Sync
進(jìn)行。在初始化的時(shí)候會(huì)實(shí)例化一個(gè)Sync
對(duì)象瘪匿,默認(rèn)實(shí)現(xiàn)為非公平模式跛梗。
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
同時(shí)也可以指定Sync
的實(shí)現(xiàn)為公平模式or非公平模式
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
2.1.1 ReentrantLock
非公平模式下的加鎖實(shí)現(xiàn)
非公平模式下獲取鎖的邏輯,線程第一次進(jìn)入的時(shí)候會(huì)利用cas操作更新state
狀態(tài)為1棋弥,如果更新成功核偿,則設(shè)置當(dāng)前線程為owner;當(dāng)cas操作失斖缛尽(包括兩種情況1.重入鎖邏輯:已經(jīng)獲取鎖的線程再次調(diào)用lock
方法漾岳;2.當(dāng)前線程持有鎖,其他線程調(diào)用lock
方法粉寞,即持有鎖的線程和調(diào)用lock
方法的線程不是同一個(gè))尼荆,則進(jìn)入到acquire
方法中,acquire
會(huì)優(yōu)先調(diào)用子類的tryAcquire
方法唧垦,具體獲取鎖的實(shí)現(xiàn)就是在該方法中捅儒。
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
/*第一次獲取鎖時(shí),設(shè)置state為1*/
if (compareAndSetState(0, 1))
/*設(shè)置當(dāng)前線程為owner*/
setExclusiveOwnerThread(Thread.currentThread());
else
/*搶奪鎖以及重入鎖邏輯*/
acquire(1);
}
/*獲取鎖的具體實(shí)現(xiàn)*/
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
tryAcquire
中調(diào)用nonfairTryAcquire
方法振亮,采用cas操作更新state
的值巧还,成功后就設(shè)置當(dāng)前線程為owner,代表獲取鎖成功坊秸;如果當(dāng)前線程等于owner麸祷,就累加state
值(重入鎖的實(shí)現(xiàn)邏輯),多次調(diào)用lock
方法加鎖褒搔,解鎖時(shí)也要調(diào)用同樣次數(shù)的unlock
方法阶牍,直到state
值減為0。
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
/*獲取當(dāng)前線程*/
final Thread current = Thread.currentThread();
/*獲取state值*/
int c = getState();
if (c == 0) {
/*cas操作更新state狀態(tài)*/
if (compareAndSetState(0, acquires)) {
/*更新成功后設(shè)置當(dāng)前線程為owner*/
setExclusiveOwnerThread(current);
return true;
}
}
/*如果當(dāng)前線程是onwer(重入鎖邏輯)*/
else if (current == getExclusiveOwnerThread()) {
/*獲取state的值站超,并且加上本次的acquires再更新state*/
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
/*獲取鎖失敗*/
return false;
}
2.1.2 ReentrantLock
公平模式下的加鎖實(shí)現(xiàn)
公平模式下的加鎖實(shí)現(xiàn)與非公平的比較在于多了hasQueuedPredecessors
方法的判斷荸恕,該方法主要是判斷阻塞隊(duì)列的隊(duì)首是否含有元素以及如果隊(duì)首有元素是否與當(dāng)前線程相等,目的其實(shí)就是檢測(cè)有沒有排在當(dāng)前線程之前的線程需要出隊(duì)死相,嚴(yán)格保證先進(jìn)先出融求。tryAcquire
除了在阻塞線程被喚醒的時(shí)候會(huì)被調(diào)用外,外部代碼調(diào)用lock
方法的時(shí)候也會(huì)間接調(diào)用到算撮,當(dāng)外部代碼調(diào)用lock
時(shí)生宛,就會(huì)通過hasQueuedPredecessors
方法校驗(yàn)阻塞隊(duì)列中是否還有等待中的線程,如果有肮柜,就不會(huì)讓該線程獲取鎖陷舅。
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
/*hasQueuedPredecessors用于判斷AQS的阻塞隊(duì)列里是否有等待中的線程*/
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
hasQueuedPredecessors
的實(shí)現(xiàn)
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
2.2 ReentrantLock
釋放鎖邏輯
公平模式與非公平模式下釋放鎖的實(shí)現(xiàn)都是相同的,tryRelease
方法在Sync
類中實(shí)現(xiàn)审洞。釋放鎖的過程就是減少state
的值直到0(因此在重入鎖的情況下莱睁,調(diào)用unlock
的次數(shù)要與lock
的次數(shù)相同),同時(shí)將owner置空。
protected final boolean tryRelease(int releases) {
/*獲取state值并減少對(duì)應(yīng)次數(shù)*/
int c = getState() - releases;
/*判斷當(dāng)前線程是否為owner*/
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
/*state減為0后仰剿,設(shè)置owner為null*/
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
2.3 ReentrantLock
中lockInterruptibly
響應(yīng)中斷原理
當(dāng)線程調(diào)用lock
方法后并處于阻塞狀態(tài)下创淡,該線程是無法響應(yīng)中斷的,只有獲取到鎖之后才能響應(yīng)中斷南吮,但是使用lockInterruptibly
就能在阻塞狀態(tài)下響應(yīng)中斷琳彩。lockInterruptibly
方法調(diào)用acquireInterruptibly
(該方法在AQS
中實(shí)現(xiàn))
public final void acquireInterruptibly(int arg)
throws InterruptedException {
/*判斷是否被中斷過,如果被中斷過則直接拋出異常*/
if (Thread.interrupted())
throw new InterruptedException();
/*獲取鎖失敗則調(diào)用doAcquireInterruptibly方法*/
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
重點(diǎn)關(guān)注doAcquireInterruptibly
方法部凑,該方法的實(shí)現(xiàn)與acquireQueued
非常相似(acquireQueued
實(shí)現(xiàn)可以參考AQS)露乏,區(qū)別在于阻塞狀態(tài)下的線程因?yàn)橹袛啾粏拘褧r(shí),parkAndCheckInterrupt
返回true涂邀,就會(huì)拋出InterruptedException
異常瘟仿。
/**
* Acquires in exclusive interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
/*線程因?yàn)橹袛啾粏拘押笾苯訏伋霎惓?/
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3.CountDownLatch
CountDownLatch
同樣也擁有一個(gè)Sync
類型的內(nèi)部類,其中Sync
繼承AQS
必孤,使用的是共享模式下的AQS
猾骡,Sync
的子類只有一個(gè),在初始化的時(shí)候?qū)嵗?code>Sync的實(shí)現(xiàn)如下敷搪,包括構(gòu)造函數(shù)(給state
賦初始化值)兴想,獲取鎖邏輯(判斷state
值是否為0,為0則返回1否則返回-1)赡勘,釋放鎖邏輯(state
值減少1)嫂便。
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
/*初始化的時(shí)候state的值*/
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
/*獲取鎖邏輯*/
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
/*釋放鎖邏輯*/
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
/*獲取state狀態(tài)*/
int c = getState();
if (c == 0)
return false;
/*state值減1并使用cas更新*/
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
2.1.1 CountDownLatch
初始化構(gòu)造
CountDownLatch
初始化的時(shí)候會(huì)根據(jù)傳入的值設(shè)置state
的值,當(dāng)調(diào)用countDown
方法的時(shí)候會(huì)把該值減少1闸与,直到減為0的時(shí)候毙替,阻塞中的線程就能繼續(xù)執(zhí)行。
public CountDownLatch(int count) {
/*校驗(yàn)count合法性*/
if (count < 0) throw new IllegalArgumentException("count < 0");
/*設(shè)置state初始化值*/
this.sync = new Sync(count);
}
2.1.2 await
方法實(shí)現(xiàn)原理
調(diào)用await
方法時(shí)會(huì)調(diào)用acquireSharedInterruptibly
践樱,該方法會(huì)首先判斷當(dāng)前線程是否有中斷標(biāo)記厂画,如果有則直接拋出異常。接著會(huì)調(diào)用tryAcquireShared
方法獲取鎖拷邢,該方法通過state
是否等于0決定返回1還是-1袱院,因?yàn)樵跇?gòu)造函數(shù)中,會(huì)初始化state
的值為傳遞進(jìn)去的數(shù)字瞭稼,所以在沒有調(diào)用countDown
方法的前提下(同時(shí)初始化傳遞的值為非0)忽洛,tryAcquireShared
獲取鎖是一定會(huì)失敗的,因此返回-1环肘,接著進(jìn)入AQS
提供的doAcquireSharedInterruptibly
方法中欲虚。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
/*如果有中斷標(biāo)記則直接拋出異常*/
if (Thread.interrupted())
throw new InterruptedException();
/*獲取鎖失敗則進(jìn)入阻塞隊(duì)列*/
if (tryAcquireShared(arg) < 0)
/*AQS方法,讓當(dāng)前線程進(jìn)入阻塞隊(duì)列并阻塞*/
doAcquireSharedInterruptibly(arg);
}
doAcquireSharedInterruptibly
在該場(chǎng)景下主要完成兩件事情悔雹,1.將當(dāng)前線程加入到阻塞隊(duì)列中复哆;2.阻塞當(dāng)前線程欣喧。doAcquireSharedInterruptibly
與doAcquireShared
(doAcquireShared實(shí)現(xiàn)參考)的實(shí)現(xiàn)很相似,區(qū)別在于doAcquireSharedInterruptibly
如果因?yàn)橹袛啾粏拘鸭盘瘢蜁?huì)直接拋出異常续誉,該方法是能響應(yīng)中斷的莱没。
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
/*進(jìn)入阻塞隊(duì)列*/
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
/*如果是因?yàn)橹袛啾粏拘丫€程則直接拋出異常*/
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
因此當(dāng)前線程調(diào)用await
方法的時(shí)候初肉,如果state
不為0時(shí)就會(huì)被阻塞,等待其他線程調(diào)用countDown
直到state
減少到0才會(huì)繼續(xù)執(zhí)行下去饰躲,同時(shí)牙咏,如果阻塞中的線程被調(diào)用中斷方法,則會(huì)直接拋出InterruptedException
異常嘹裂。
2.1.3 countDown
方法實(shí)現(xiàn)原理
countDown
方法最終會(huì)調(diào)用CountDownLatch
實(shí)現(xiàn)的tryReleaseShared
釋放鎖方法妄壶,該方法會(huì)將state
中的值減少1,并返回state
是否等于0的信息寄狼。當(dāng)調(diào)用countDown
方法直到state
減少到0的時(shí)候丁寄,tryReleaseShared
就會(huì)返回true,通過調(diào)用AQS
中的doReleaseShared
方法泊愧,喚醒阻塞隊(duì)列中因?yàn)檎{(diào)用await
所阻塞的線程伊磺。
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
/*獲取鎖成功則喚醒等待隊(duì)列中的線程*/
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
3.Semaphore
3.1 Semaphore
的構(gòu)造方法
Semaphore
同樣擁有Sync
類型的內(nèi)部類,其中Sync
繼承AQS
删咱,采用共享模式下的AQS
屑埋,Sync
的子類與ReentrantLock
類似,包括NonfairSync
(非公平)與FairSync
(公平)兩種模式痰滋,默認(rèn)實(shí)現(xiàn)為非公平模式摘能。
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
同時(shí)也能指定使用哪一種模式
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Semaphore
初始化的時(shí)候,會(huì)傳遞permits
敲街,該值會(huì)被設(shè)置為state
的值团搞。
3.2 acquire
方法實(shí)現(xiàn)原理
acquire
會(huì)調(diào)用acquireSharedInterruptibly
方法,tryAcquireShared
方法(獲取鎖邏輯)由Semaphore
自己實(shí)現(xiàn)多艇。如果獲取鎖失敗逻恐,則進(jìn)入阻塞隊(duì)列。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
/*發(fā)現(xiàn)中斷標(biāo)記則直接拋出異常*/
if (Thread.interrupted())
throw new InterruptedException();
/*獲取鎖失敗則進(jìn)入阻塞隊(duì)列*/
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared
有兩種不同的實(shí)現(xiàn)墩蔓,先說在非公平模式下的實(shí)現(xiàn)梢莽,直接減去acquires
的大小并更新state
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
/*獲取state的值*/
int available = getState();
/*較少acquires的值并更新state*/
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
公平模式下獲取鎖的邏輯相對(duì)于非公平模式下的多了hasQueuedPredecessors
函數(shù)判斷出隊(duì)的元素是否位于隊(duì)首,如果是外部代碼調(diào)用的奸披,則判斷阻塞隊(duì)列中是否還有未出隊(duì)的元素昏名,此處邏輯與ReentrantLock
的公平鎖實(shí)現(xiàn)邏輯相似。
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
3.3 release
方法實(shí)現(xiàn)原理
release
方法會(huì)調(diào)用releaseShared
阵面,其中tryReleaseShared
(釋放鎖)成功后就會(huì)通過AQS
提供的doReleaseShared
方法喚醒阻塞隊(duì)列中的線程轻局。
public final boolean releaseShared(int arg) {
/*成功釋放則喚醒阻塞隊(duì)列中的線程*/
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared
的邏輯非常簡(jiǎn)單洪鸭,就是增加對(duì)應(yīng)的releases
值并更新state
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
/*增加state的值并更新*/
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
4.總結(jié)
對(duì)于ReentrantLock
、CountDownLatch
仑扑、Semaphore
實(shí)現(xiàn)加鎖解鎖邏輯最重要的變量是AQS
中的state
變量览爵。
-
ReentrantLock
加鎖流程就是將state
增加1,解鎖便是減1镇饮,所以調(diào)用多次lock
方法就要調(diào)用對(duì)應(yīng)次數(shù)的unlock
方法才能解鎖蜓竹。 -
CountDownLatch
初始化的時(shí)候設(shè)置state
的值,當(dāng)其他線程調(diào)用countDown
的時(shí)候就減少state
的值储藐,直到state
減少為0的時(shí)候俱济,因?yàn)檎{(diào)用await
方法而在阻塞隊(duì)列中等待的線程,就會(huì)在最后一次調(diào)用countDown
的時(shí)候被喚醒钙勃。 -
Semaphore
同樣也是初始化的時(shí)候設(shè)置state
的值蛛碌,調(diào)用acquire
成功獲取鎖的時(shí)候就會(huì)減少state
的值,如果state
減少到0則無法再成功獲取鎖辖源,線程就會(huì)進(jìn)入到阻塞隊(duì)列中蔚携;調(diào)用release
釋放鎖的時(shí)候會(huì)增加state
的值。