關(guān)鍵字:AQS店量、自旋大年、CAS换薄、LockSupport、CLH阻塞隊(duì)列
1. AQS
Semaphore的相關(guān)操作主要由其內(nèi)部成員變量sync完成翔试,sync有兩種轻要,分別是支持公平鎖的FairSync和不公平鎖的NonfairSync,兩種都是基于AQS擴(kuò)展而來(lái)垦缅。我們?cè)诼暶饕粋€(gè)信號(hào)量對(duì)象的時(shí)候冲泥,sync便在構(gòu)造函數(shù)里被初始化。這里先簡(jiǎn)單介紹以下AQS失都,后續(xù)會(huì)出一篇文章詳細(xì)解讀柏蘑。
AQS全名為AbstractQueuedSynchronizer,即抽象隊(duì)列同步器粹庞,是并發(fā)包作者Doug Lea為了解決在Java 1.5之前synchronized性能問(wèn)題而開(kāi)發(fā)的并發(fā)框架咳焚,主要實(shí)現(xiàn)有ReentrantLock,ReentrantReadWriteLock, CountDownLatch, Semaphore等庞溜,和synchronized對(duì)標(biāo)的便是ReentrantLock革半。
AQS內(nèi)維護(hù)了一個(gè)volatile類型的int 成員變量state,以及一個(gè)雙向CLH隊(duì)列流码,線程嘗試修改state屬性值又官,修改成功便表明成功獲取鎖,否則進(jìn)入CLH隊(duì)列并阻塞漫试,直到持有鎖的線程釋放六敬,并喚醒CLH隊(duì)列中的線程。
2. 初始化信號(hào)量
我們?cè)诔跏蓟疭emaphore的時(shí)候驾荣,便指定了state的值外构,表明可以獲取的最大信號(hào)量,線程嘗試獲取信號(hào)量即對(duì)state減去相應(yīng)的值播掷,修改成功便表明成功獲取信號(hào)量审编,否則進(jìn)入CLH隊(duì)列并阻塞,直到持有信號(hào)量的線程釋放歧匈,state加1垒酬,并喚醒CLH隊(duì)列中的全部線程。
sync由構(gòu)造函數(shù)進(jìn)行初始化
//構(gòu)造permits個(gè)數(shù)量的不公平鎖
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
//根據(jù)fair構(gòu)造permits個(gè)數(shù)量的公平&不公平鎖
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
3. 加鎖操作(獲取信號(hào)量)
Semaphore提供了8中常用的加鎖操作,可分為三大類勘究,即獲取一定數(shù)量的共享鎖&是否支持中斷&獲取不到是否阻塞矮湘,以下8中操作便是其兩兩組合。
//通過(guò)sync獲取共享鎖(可中斷)
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//通過(guò)sync獲取共享鎖(不可中斷)
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
//嘗試獲取1個(gè)共享鎖乱顾,獲取不到則立刻返回false板祝,不進(jìn)行阻塞
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
//嘗試獲取1個(gè)共享鎖宫静,獲取不到則等待timeout時(shí)間后返回false
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
//嘗試獲取permits個(gè)共享鎖走净,獲取不到則立刻返回false,不進(jìn)行阻塞
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
//嘗試獲取permits個(gè)共享鎖孤里,獲取不到則等待timeout時(shí)間后返回false
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
先來(lái)看一下獲取可中斷鎖
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//線程是否被中斷,中斷則拋出中斷異常
if (Thread.interrupted())
throw new InterruptedException();
// 1.首先嘗試獲取共享鎖
// 2.獲取成功則進(jìn)行相應(yīng)的業(yè)務(wù)邏輯,獲取失敗進(jìn)入阻塞隊(duì)列
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
獲取信號(hào)量鎖時(shí)又分為公平鎖和不公平鎖陵霉,以下分別是兩種鎖是如何獲取的
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
//自旋
for (;;) {
//阻塞隊(duì)列中是否已經(jīng)有節(jié)點(diǎn)在等待貌夕,如有則直接返回獲取失敗
//這個(gè)判斷就是和不公平鎖的區(qū)別,不公平鎖不管隊(duì)列中是否有節(jié)點(diǎn)等待虏等,上來(lái)就搶鎖
if (hasQueuedPredecessors())
return -1;
//通過(guò)CAS設(shè)置state
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
//調(diào)用父方法
/*
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
*/
return nonfairTryAcquireShared(acquires);
}
}
獲取鎖失敗請(qǐng)求入隊(duì)
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
//將當(dāng)前節(jié)點(diǎn)放入阻塞隊(duì)列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 自旋
for (;;) {
//獲取當(dāng)前節(jié)點(diǎn)的上一節(jié)點(diǎn)
final Node p = node.predecessor();
//上一節(jié)點(diǎn)如果是隊(duì)頭
if (p == head) {
//再次嘗試獲取args數(shù)量的共享鎖弄唧,r為剩余的共享數(shù)量
int r = tryAcquireShared(arg);
//獲取成功
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//上一節(jié)點(diǎn)如果不是隊(duì)頭,即阻塞隊(duì)列中已經(jīng)有節(jié)點(diǎn)在等待或者是隊(duì)頭但獲取鎖失敗則執(zhí)行以下邏輯
//1.將當(dāng)前節(jié)點(diǎn)的有效前驅(qū)節(jié)點(diǎn)標(biāo)示為可喚醒狀態(tài)
//2.將當(dāng)前節(jié)點(diǎn)阻塞霍衫,等待被喚醒或中斷
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//獲取當(dāng)前節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn)
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
/*
*
* pred 上一個(gè)節(jié)點(diǎn)
* node 當(dāng)前節(jié)點(diǎn)
* CANCELLED = 1;SIGNAL = -1;CONDITION = -2;PROPAGATE = -3;
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//上一個(gè)節(jié)點(diǎn)已經(jīng)處于可喚醒狀態(tài)則直接返回
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
//節(jié)點(diǎn)已失效候引,將失效節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)賦值為當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn),直到前驅(qū)節(jié)點(diǎn)不存在已經(jīng)取消的情況
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//通過(guò)CAS將有效的前驅(qū)節(jié)點(diǎn)的狀態(tài)修改為可喚醒狀態(tài)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}
private final boolean parkAndCheckInterrupt() {
//線程阻塞在這里
LockSupport.park(this);
//線程被喚醒時(shí)從這里開(kāi)始執(zhí)行
return Thread.interrupted();
}
現(xiàn)在來(lái)對(duì)比分析一下嘗試獲取(tryAcquire)敦跌、不可中斷澄干、最大嘗試時(shí)間分別是如何處理的
tryAcquire
//嘗試獲取,獲取不到直接返回了
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
設(shè)置最大獲取時(shí)間
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
//注意這里的隊(duì)頭只是一個(gè)虛擬節(jié)點(diǎn)柠傍,真正存放線程的節(jié)點(diǎn)為隊(duì)列的第二個(gè)節(jié)點(diǎn)麸俘,以下提到的隊(duì)頭同樣
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
//區(qū)別在這里,線程只會(huì)park一定時(shí)間惧笛,過(guò)期后再次嘗試獲取失敗便直接返回
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
不可中斷鎖
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//和可中斷鎖區(qū)別在這里从媚,可中斷這里直接拋出異常了,但是不可中斷鎖只是設(shè)置一個(gè)值便又去獲取了
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
4. 解鎖操作
//調(diào)用sync的releaseShared方法進(jìn)行解鎖患整,每次解鎖數(shù)量為1
public void release() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
//通過(guò)自旋+CAS將共享鎖的數(shù)量加回去
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
//CLH隊(duì)列不為空拜效,即有線程在等待獲取鎖
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//喚醒頭節(jié)點(diǎn)的next節(jié)點(diǎn)
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//剛才被喚醒的線程已將head設(shè)置為head的下一節(jié)點(diǎn),所以這里不會(huì)相等
//所以這里一般會(huì)多喚醒一次并级,假如多喚醒的節(jié)點(diǎn)獲取到鎖拂檩,重復(fù)此邏輯,否則多喚醒的節(jié)點(diǎn)會(huì)繼續(xù)阻塞
if (h == head) // loop if head changed
break;
}
}
這里再貼一下節(jié)點(diǎn)被喚醒時(shí)的邏輯
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
//線程被喚醒后從這里再次執(zhí)行
}
} finally {
if (failed)
cancelAcquire(node);
}
}