一蚓让、基本概念
隊(duì)列同步器在Java
并發(fā)包中的實(shí)現(xiàn)是AbstractQueuedSynchronizer
,簡(jiǎn)稱(chēng)為AQS
,它是用來(lái)構(gòu)建鎖或者其它同步組件的基礎(chǔ)框架余指。了解其實(shí)現(xiàn)原理有助于:
- 理解同步組件
ReentrantLock
和ReentrantReadWriteLock
的原理 - 理解
Condition
實(shí)現(xiàn)等待通知/模式的原理 - 根據(jù)業(yè)務(wù)場(chǎng)景,自定義同步組件(較少用到)
隊(duì)列同步器AQS
和同步組件(ReentrantLock
、ReentrantReadWriteLock
等)的區(qū)別在于:
- 同步組件面向使用者,它定義了使用者與鎖交互的接口余舶,隱藏了實(shí)現(xiàn)細(xì)節(jié)。
- 隊(duì)列同步器面向鎖的實(shí)現(xiàn)者锹淌,它簡(jiǎn)化了鎖的實(shí)現(xiàn)方式匿值,屏蔽了同步狀態(tài)管理、線(xiàn)程排隊(duì)赂摆、等待與喚醒等底層操作挟憔。
隊(duì)列同步器基于模板方法模式,它的使用方式為如下:
- 創(chuàng)建自定義同步組件的實(shí)現(xiàn)類(lèi)烟号。
- 實(shí)現(xiàn)者繼承
AbstractQueuedSynchronizer
(一般會(huì)將它作為自定義同步組件的內(nèi)部類(lèi))绊谭,根據(jù)業(yè)務(wù)場(chǎng)景,重寫(xiě)指定的模板方法tryAcquire(int acquires)
汪拥、tryRelease(int releases)
等达传。 - 創(chuàng)建同步器子類(lèi)的對(duì)象,作為自定義同步組件的成員變量迫筑,在對(duì)外提供的公共方法中宪赶,調(diào)用隊(duì)列同步器的指定方法。
以ReentrantLock
為例脯燃,ReentrantLock
為自定義的同步組件搂妻,NonfairSync
和FairSync
就是隊(duì)列同步器的實(shí)現(xiàn)類(lèi)。
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
//...
}
static final class NonfairSync extends Sync {
//...
}
static final class FairSync extends Sync {
//...
}
public void lock() {
sync.lock();
}
}
二辕棚、同步狀態(tài) & 同步隊(duì)列
在隊(duì)列同步器中有兩個(gè)關(guān)鍵的元素:
- 同步狀態(tài)
- 同步隊(duì)列
2.1 同步狀態(tài)
同步狀態(tài)欲主,用一個(gè)int
型表示,訪(fǎng)問(wèn)或者修改同步狀態(tài)需要使用指定的方法:
-
getState()
:獲取當(dāng)前同步狀態(tài)逝嚎。 -
setState(int newState)
:設(shè)置當(dāng)前同步狀態(tài)扁瓢。 -
compareAndSetState(int except, int update)
:使用CAS
設(shè)置當(dāng)前狀態(tài),該方法可以保證狀態(tài)設(shè)置的原子性懈糯。
2.2 同步隊(duì)列
同步隊(duì)列用來(lái)完成同步狀態(tài)的管理涤妒,當(dāng)前線(xiàn)程獲取同步狀態(tài)失敗時(shí),會(huì)將當(dāng)前線(xiàn)程以及等待狀態(tài)等信息構(gòu)造成一個(gè)結(jié)點(diǎn)赚哗,將其加入同步隊(duì)列她紫,同時(shí)會(huì)阻塞當(dāng)前線(xiàn)程,當(dāng)同步狀態(tài)釋放時(shí)屿储,會(huì)把首結(jié)點(diǎn)的后繼結(jié)點(diǎn)的線(xiàn)程喚醒贿讹,使其再次嘗試獲取同步狀態(tài)。
同步隊(duì)列中的結(jié)點(diǎn)保存了以下的信息:
-
thread
:線(xiàn)程引用 -
waitStatus
:等待狀態(tài)-
CANCELLED
:由于在同步隊(duì)列中等待的線(xiàn)程等待超時(shí)或者被中斷够掠,需要從同步隊(duì)列中取消等待民褂。 -
SIGNAL
:后繼結(jié)點(diǎn)的線(xiàn)程處于等待狀態(tài),而當(dāng)前結(jié)點(diǎn)的線(xiàn)程如果釋放了同步狀態(tài)或取消,將會(huì)通知后繼結(jié)點(diǎn)赊堪,使后繼結(jié)點(diǎn)的線(xiàn)程運(yùn)行面殖。 -
CONDITION
:結(jié)點(diǎn)在等待隊(duì)列中,結(jié)點(diǎn)線(xiàn)程等待在Condition
上哭廉,當(dāng)其他線(xiàn)程對(duì)Condition
調(diào)用了signal
方法后脊僚,該結(jié)點(diǎn)會(huì)從等待隊(duì)列移到同步隊(duì)列中。 -
PROPAGATE
:表示下一次共享式同步狀態(tài)獲取將會(huì)無(wú)條件傳播下去遵绰。 -
INITIAL
:初始狀態(tài)辽幌。
-
-
prev & next
:前驅(qū) & 后繼結(jié)點(diǎn)
同步器包含了兩個(gè)類(lèi)型結(jié)點(diǎn)的引用:頭結(jié)點(diǎn)和尾結(jié)點(diǎn)。設(shè)置的區(qū)別在于:
- 頭結(jié)點(diǎn)是獲取同步狀態(tài)成功的結(jié)點(diǎn)椿访,頭結(jié)點(diǎn)的線(xiàn)程在釋放同步狀態(tài)時(shí)乌企,將會(huì)喚醒后繼結(jié)點(diǎn),而后繼結(jié)點(diǎn)將會(huì)在獲取同步狀態(tài)成功時(shí)將自己設(shè)置成頭結(jié)點(diǎn)成玫。由于頭結(jié)點(diǎn)是通過(guò)獲取同步狀態(tài)成功的線(xiàn)程來(lái)完成的加酵,因此設(shè)置頭結(jié)點(diǎn)的方式不需要
CAS
來(lái)保證。 - 如果已經(jīng)有一個(gè)線(xiàn)程獲取了同步狀態(tài)梁剔,那么其他線(xiàn)程無(wú)法獲取同步狀態(tài)虽画,就會(huì)構(gòu)造結(jié)點(diǎn)假如到同步隊(duì)列當(dāng)中,為了保證線(xiàn)程安全荣病,因此需要采用
compareAndSetTail(Node expect, Node update)
的CAS
方式來(lái)設(shè)置尾結(jié)點(diǎn)码撰。
三、同步器提供的模板方法
- 獨(dú)占式獲取 & 釋放同步狀態(tài)
- 共享式獲取 & 釋放同步狀態(tài)
- 獨(dú)占式超時(shí)獲取同步狀態(tài)
3.1 獨(dú)占式同步狀態(tài)獲取與釋放
3.1.1 獲取
通過(guò)調(diào)用同步器的acquire(int arg)
方法可以獲取同步狀態(tài)个盆,該方法對(duì)中斷不敏感脖岛。當(dāng)同步狀態(tài)獲取成功后,當(dāng)前線(xiàn)程從acquire(int arg)
方法返回颊亮,如果對(duì)于鎖這種并發(fā)組件柴梆,代表著當(dāng)前線(xiàn)程獲取了鎖。其實(shí)現(xiàn)為:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
我們將其工作拆解稱(chēng)為三個(gè)部分:
Step 1 tryAcquire(arg)
調(diào)用 自定義同步器 實(shí)現(xiàn)的tryAcquire(int arg)
方法终惑,該方法需要保證線(xiàn)程安全的獲取同步狀態(tài)绍在。
Step 2 addWaiter(Node.EXCLUSIVE), arg)
如果同步狀態(tài)獲取失敗(tryAcquire
方法返回false
)雹有,則構(gòu)造同步結(jié)點(diǎn)偿渡,并通過(guò)addWaiter(Node node)
方法加入到同步隊(duì)列的尾部。為了保證能夠線(xiàn)程安全地添加霸奕,它采用了兩層保護(hù)機(jī)制:
-
CAS
:compareAndSetTail
溜宽。 - 死循環(huán):
enq
。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
//1.確保節(jié)點(diǎn)能夠線(xiàn)程安全地被添加
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//2.通過(guò)死循環(huán)來(lái)確保節(jié)點(diǎn)的正確添加质帅,在"死循環(huán)"中只有通過(guò)`CAS`將節(jié)點(diǎn)設(shè)置為尾節(jié)點(diǎn)之后适揉,當(dāng)前線(xiàn)程才能從該方法返回留攒,否則當(dāng)前線(xiàn)程不斷地進(jìn)行嘗試。
enq(node);
return node;
}
Step 3 acquireQueued(Node node, int arg)
調(diào)用acquireQueued(Node node, int arg)
方法嫉嘀,使得新構(gòu)造的同步結(jié)點(diǎn)以“死循環(huán)”的方式獲取同步狀態(tài)炼邀。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//1.1 得到當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
final Node p = node.predecessor();
//1.2 如果當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn),只有在這種情況下獲取同步狀態(tài)成功
if (p == head && tryAcquire(arg)) {
//1.3 將當(dāng)前節(jié)點(diǎn)設(shè)為頭節(jié)點(diǎn)
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//2 前驅(qū)結(jié)點(diǎn)不是頭結(jié)點(diǎn)吃沪,或者無(wú)法獲取到同步狀態(tài)的情況汤善。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
這里需要注意的有以下幾點(diǎn):
- 只有前驅(qū)結(jié)點(diǎn)是頭結(jié)點(diǎn)時(shí),才會(huì)嘗試再次調(diào)用
tryAcquire(int arg)
來(lái)獲取同步狀態(tài)票彪。這么做是為了維護(hù)同步隊(duì)列的FIFO
原則,并且也便于對(duì)過(guò)早通知的處理不狮。 - 如果當(dāng)前結(jié)點(diǎn)成功獲取到同步狀態(tài)降铸,那么會(huì)通過(guò)
setHead
方法將其設(shè)為新的頭結(jié)點(diǎn),并斷開(kāi)舊的頭結(jié)點(diǎn)與后繼結(jié)點(diǎn)的關(guān)系(p.next = null
)摇零。 - 如果無(wú)法獲取到同步狀態(tài)推掸,那么會(huì)通過(guò)
shouldParkAfterFailedAcquire
判斷是否要調(diào)用parkAndCheckInterrupt
進(jìn)入阻塞狀態(tài)。waitStatus
的默認(rèn)值是0
驻仅,因此第一次會(huì)進(jìn)入最后一個(gè)判斷谅畅,并將其waitStatue
設(shè)置為Node.SIGNAL
,之后進(jìn)入第二次的for
循環(huán)噪服,假如其前驅(qū)結(jié)點(diǎn)仍然不是頭結(jié)點(diǎn)或者無(wú)法獲取到同步狀態(tài)毡泻,那么將會(huì)進(jìn)入第一個(gè)判斷,阻塞當(dāng)前線(xiàn)程粘优。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
//阻塞當(dāng)前線(xiàn)程仇味。
LockSupport.park(this);
//如果當(dāng)前線(xiàn)程已經(jīng)被中斷,那么返回 true雹顺。
return Thread.interrupted();
}
- 如果當(dāng)前線(xiàn)程在獲取同步狀態(tài)時(shí)丹墨,已經(jīng)被中斷,即
interrupted=true
嬉愧,那么會(huì)調(diào)用selfInterrupt
方法贩挣。
3.1.2 釋放
當(dāng)前線(xiàn)程獲取同步狀態(tài)并執(zhí)行了相應(yīng)的邏輯后,就需要釋放同步狀態(tài)没酣,并調(diào)用unparkSuccessor
來(lái)喚醒后繼結(jié)點(diǎn)王财。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
LockSupport.unpark(s.thread);
}
3.1.3 小結(jié)
由以上兩點(diǎn)分析,可以看出獨(dú)占式獲取 & 釋放同步狀態(tài)采用了模板方法的設(shè)計(jì)模式四康,AQS
內(nèi)部會(huì)基于當(dāng)前線(xiàn)程創(chuàng)建一個(gè)結(jié)點(diǎn)搪搏,并負(fù)責(zé)管理線(xiàn)程的入隊(duì)和出隊(duì),線(xiàn)程的阻塞和喚醒闪金,我們只需要重寫(xiě)重要的模板方法:
-
protected boolean tryAcquire(int arg)
:獨(dú)占式獲取同步狀態(tài)疯溺,實(shí)現(xiàn)該方法需要查詢(xún)當(dāng)前狀態(tài)并判斷同步狀態(tài)是否符合預(yù)期论颅,然后再進(jìn)行CAS
設(shè)置同步狀態(tài)。 -
protected boolean tryRelease(int arg)
:獨(dú)占式釋放同步狀態(tài)囱嫩。
下面是一個(gè)獨(dú)占式獲取 & 釋放同步狀態(tài)的簡(jiǎn)單實(shí)現(xiàn)示例:
/**
* @author lizejun
**/
public class ExclusiveLock implements Lock {
private static class Sync extends AbstractQueuedSynchronizer {
protected boolean isHeldExclusively() {
return getState() == 1;
}
@Override
protected boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int releases) {
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
Condition newCondition() {
return new ConditionObject();
}
}
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public void lockInterruptibly() throws InterruptedException{
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public static void run() {
final ExclusiveLock lock = new ExclusiveLock();
for (int i = 0; i < 3; i++) {
final int index = i;
new Thread() {
@Override
public void run() {
lock.lock();
System.out.println("begin:" + index);
try {
System.out.println("run:" + index);
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("end:" + index);
lock.unlock();
}
}
}.start();
}
}
}
運(yùn)行結(jié)果為如下恃疯,可以看到,處于lock ~ unlock
之間的邏輯在同一時(shí)間只能由一個(gè)線(xiàn)程執(zhí)行墨闲。
假如我們注釋掉lock.lock()
和lock.unlock()
今妄,那么打印的結(jié)果為,這時(shí)不能保證是線(xiàn)程安全的鸳碧。
3.2 共享式獲取 & 釋放同步狀態(tài)
3.2.1 獲取
共享式獲取與獨(dú)占式獲取最主要的區(qū)別在于 同一時(shí)刻能否有多個(gè)線(xiàn)程同時(shí)獲取到同步狀態(tài)盾鳞。
調(diào)用同步器的acquireShared(int arg)
方法可以共享式地獲取同步狀態(tài)。鎖的實(shí)現(xiàn)者需要重寫(xiě)tryAcquireShared
方法瞻离,如果該方法的返回值大于0
腾仅,表示能夠獲取到同步狀態(tài)。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//1.如果前驅(qū)結(jié)點(diǎn)是頭結(jié)點(diǎn)套利,那么會(huì)嘗試獲取同步狀態(tài)推励。
int r = tryAcquireShared(arg);
if (r >= 0) {
//2.獲取同步狀態(tài)成功,設(shè)為頭結(jié)點(diǎn)肉迫。
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//3.獲取同步狀態(tài)失敗验辞,那么阻塞當(dāng)前線(xiàn)程。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3.2.2 釋放
通過(guò)調(diào)用releaseShared(int args)
方法可以釋放同步狀態(tài)喊衫,該方法在釋放同步狀態(tài)后跌造,將會(huì)喚醒后續(xù)處于阻塞狀態(tài)的結(jié)點(diǎn)。對(duì)于支持多個(gè)線(xiàn)程同時(shí)訪(fǎng)問(wèn)的并發(fā)組件格侯,它和獨(dú)占式的區(qū)別在于鼻听,tryReleaseShared(int arg)
方法必須保證同步狀態(tài)安全釋放,一般是通過(guò)循環(huán)和CAS
來(lái)保證的联四,因?yàn)獒尫磐綘顟B(tài)的操作會(huì)同時(shí)來(lái)自多個(gè)線(xiàn)程撑碴。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
3.2.3 小結(jié)
與獨(dú)占式獲取 & 釋放同步狀態(tài)類(lèi)似,在繼承AQS
時(shí)主要關(guān)注兩個(gè)重要的模板方法:
-
protected int tryAcquireShared(int arg)
:共享式獲取同步狀態(tài)朝墩,返回值>=0
表示獲取成功醉拓,否則失敗。 -
protected boolean tryReleaseShared(int arg)
:共享式釋放同步狀態(tài)收苏。
下面是一個(gè)簡(jiǎn)單地共享式獲取 & 釋放同步狀態(tài)的示例:
/**
* @author lizejun
**/
public class TwinsLock implements Lock {
private Sync sync = new Sync(2);
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
@Override
protected int tryAcquireShared(int reduceCount) {
for (;;) {
int current = getState();
int newCount = current - reduceCount;
if (newCount < 0 || compareAndSetState(current, newCount)) {
return newCount;
}
}
}
@Override
protected boolean tryReleaseShared(int returnCount) {
for (;;) {
int current = getState();
int newCount = current + returnCount;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
Condition newCondition() {
return new ConditionObject();
}
}
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
@Override
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
@Override
public boolean tryLock() {
return sync.tryAcquireShared(1) >= 0;
}
public static void run() {
final TwinsLock lock = new TwinsLock();
for (int i = 0; i < 5; i++) {
final int index = i;
new Thread() {
@Override
public void run() {
lock.lock();
System.out.println("begin:" + index);
try {
System.out.println("run:" + index);
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("end:" + index);
lock.unlock();
}
}
}.start();
}
}
}
運(yùn)行結(jié)果為如下亿卤,可以看到在同一時(shí)刻只有兩個(gè)線(xiàn)程能夠獲取到鎖:
3.3 獨(dú)占式超時(shí)獲取同步狀態(tài)
通過(guò)調(diào)用同步器的doAcquireNanos(int arg, long nanosTimeout)
可以超時(shí)獲取同步狀態(tài)。
-
synchronized
:當(dāng)一個(gè)線(xiàn)程獲取不到鎖而被阻塞在synchronized
之外時(shí)鹿霸,對(duì)該線(xiàn)程進(jìn)行中斷操作排吴,此時(shí)線(xiàn)程中斷標(biāo)志位會(huì)被修改,但線(xiàn)程仍然會(huì)阻塞在synchronized
方法上懦鼠。 -
acquireInterruptibly(int arg)
:在等待獲取同步狀態(tài)時(shí)钻哩,如果當(dāng)前線(xiàn)程被中斷屹堰,會(huì)立刻返回InterruptedException
。 -
acquireInterruptibly(int arg, long nanosTimeout)
:與獨(dú)占式獲取的區(qū)別在于街氢,在獲取同步狀態(tài)失敗后扯键,不是簡(jiǎn)單地進(jìn)入阻塞狀態(tài),而是會(huì)判斷是否超時(shí)珊肃。- 如果超時(shí) >
1000ns
荣刑,重新計(jì)算超時(shí)時(shí)間,通過(guò)LockSupport.parkNanos
等待對(duì)應(yīng)的超時(shí)時(shí)間后返回 - 如果超時(shí) <=
1000ns
伦乔,不進(jìn)入超時(shí)等待厉亏,而是進(jìn)入快速的自旋過(guò)程。
- 如果超時(shí) >
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
//1.計(jì)算出截止時(shí)間.
final long deadline = System.nanoTime() + nanosTimeout;
//2.加入節(jié)點(diǎn)
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
//3.取出前驅(qū)節(jié)點(diǎn)
final Node p = node.predecessor();
//4.如果獲取成功則直接返回
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
//5.如果到了超時(shí)時(shí)間烈和,則直接返回
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
//6.如果在自旋過(guò)程中被中斷叶堆,那么拋出異常返回
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (faisled)
cancelAcquire(node);
}
}
四、阻塞和喚醒線(xiàn)程
在前面斥杜,我們多次提到了阻塞和喚醒線(xiàn)程,它的實(shí)現(xiàn)依靠的是LockSupport
工具類(lèi)沥匈,它定義了一組的公共靜態(tài)方法蔗喂,提供了最基本的線(xiàn)程阻塞和喚醒功能:
-
park
:阻塞當(dāng)前線(xiàn)程,直到調(diào)用unpark(Thread thread)
或者當(dāng)前線(xiàn)程被中斷才返回高帖。 -
parkNanos(long nanos)
:在park
基礎(chǔ)上增加了超時(shí)返回缰儿。 -
parkUntil(long deadline)
:直到deadline
時(shí)間才返回。 -
unpark(Thread thread)
:?jiǎn)拘烟幱谧枞麪顟B(tài)的線(xiàn)程thread
散址。
五乖阵、Condition
ConditionObject
是同步器AQS
的內(nèi)部類(lèi),每個(gè)Condition
對(duì)象都包含著一個(gè)等待隊(duì)列预麸,該隊(duì)列是實(shí)現(xiàn)等待/通知功能的關(guān)鍵瞪浸,其用法也很簡(jiǎn)單,就是通過(guò)await
和signal/signalAll
方法來(lái)進(jìn)行等待和通知吏祸。
5.1 等待隊(duì)列
每一個(gè)Condition
包含了一個(gè)等待隊(duì)列对蒲,該隊(duì)列結(jié)點(diǎn)的類(lèi)型為AbstractQueuedSynchronizer.Node
,與AQS
中同步隊(duì)列結(jié)點(diǎn)的類(lèi)型相同贡翘,頭結(jié)點(diǎn)firstWaiter
和尾結(jié)點(diǎn)lastWaiter
蹈矮。
這里需要注意的是 一個(gè)AQS
對(duì)象只有一個(gè)同步隊(duì)列,但是它可以關(guān)聯(lián)到多個(gè)等待隊(duì)列上鸣驱。
5.2 等待
調(diào)用Condition
的await()
方法的前提是當(dāng)前線(xiàn)程已經(jīng)獲取了同步狀態(tài)泛鸟。它會(huì)執(zhí)行下面的操作:
- 使用當(dāng)前線(xiàn)程創(chuàng)建一個(gè)新的結(jié)點(diǎn)加入等待隊(duì)列。
- 從同步隊(duì)列中移除當(dāng)前線(xiàn)程踊东,即釋放當(dāng)前線(xiàn)程的同步狀態(tài)北滥,再喚醒其在同步隊(duì)列中的后繼結(jié)點(diǎn)刚操。
public final void await() throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
//1.創(chuàng)建新的結(jié)點(diǎn)加入等待隊(duì)列。
Node node = addConditionWaiter();
//2.釋放同步狀態(tài)碑韵。
long savedState = fullyRelease(node);
int interruptMode = 0;
//3.判斷是否在同步隊(duì)列當(dāng)中赡茸,如果不再則阻塞。
while (!isOnSyncQueue(node)) {
//3.1.等待被喚醒祝闻,喚醒的方法就是調(diào)用 signal 方法占卧。
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//4.被喚醒后從等待隊(duì)列移到同步隊(duì)列中,繼續(xù)參與同步狀態(tài)的競(jìng)爭(zhēng)联喘。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
5.3 通知
調(diào)用Condition
的signal()
方法的前提條件是當(dāng)前線(xiàn)程已經(jīng)獲取了同步狀態(tài)华蜒,它會(huì)執(zhí)行下面的操作:
- 調(diào)用同步器的
end(Node node)
方法,將等待隊(duì)列中的頭結(jié)點(diǎn)(注意是等待隊(duì)列的頭結(jié)點(diǎn)豁遭,而不是當(dāng)前線(xiàn)程關(guān)聯(lián)的結(jié)點(diǎn))線(xiàn)程安全地移動(dòng)到同步隊(duì)列叭喜。 - 結(jié)點(diǎn)移動(dòng)到同步隊(duì)列后,當(dāng)前線(xiàn)程再使用
LockSupport
喚醒該結(jié)點(diǎn)的線(xiàn)程蓖谢。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;
//1.加入到同步隊(duì)列中捂蕴。
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
//2.喚醒。
LockSupport.unpark(node.thread);
return true;
}
- 被喚醒的線(xiàn)程闪幽,從
await
方法第3
步中的while
循環(huán)中退出啥辨,進(jìn)而調(diào)用同步器的acquireQueued()
方法加入到獲取同步狀態(tài)的競(jìng)爭(zhēng)中(也就是5.2
小結(jié)中從3
循環(huán)退出后的邏輯)。
signalAll
相當(dāng)于對(duì)等待隊(duì)列中的每個(gè)結(jié)點(diǎn)均執(zhí)行一次signal
方法盯腌,效果就是將等待隊(duì)列中所有節(jié)點(diǎn)全部移動(dòng)到同步隊(duì)列中溉知,并喚醒每個(gè)節(jié)點(diǎn)的線(xiàn)程。
5.4 生產(chǎn)者消費(fèi)者模型
生產(chǎn)者消費(fèi)者模型是多線(xiàn)程協(xié)作的經(jīng)典示例腕够。在這個(gè)模型中包含三個(gè)角色级乍。
- 生產(chǎn)者:向緩沖區(qū)中寫(xiě)入,當(dāng)緩沖區(qū)滿(mǎn)時(shí)等待帚湘;當(dāng)緩沖區(qū)有新的數(shù)據(jù)后玫荣,通知消費(fèi)者。
- 消費(fèi)者:從緩存區(qū)獲取客们,當(dāng)緩沖區(qū)為空時(shí)等待崇决,并通知生產(chǎn)者。
- 緩沖區(qū):存儲(chǔ)數(shù)據(jù)底挫,這里我們用
LinkedList
來(lái)表示恒傻。
下面是實(shí)現(xiàn)的代碼:
/**
* @author lizejun
**/
public class Demo {
private static final int MAX_SIZE = 5;
private final LinkedList<Integer> factory = new LinkedList<>();
private Lock lock = new ReentrantLock();
private Condition consumer = lock.newCondition();
private Condition producer = lock.newCondition();
private void producer2() {
lock.lock();
try {
while (factory.size() == MAX_SIZE) {
try {
System.out.println("-- factory is full --");
producer.await();
Thread.sleep(Math.round(Math.random() * 100));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("produce product");
factory.add(factory.size());
consumer.signalAll();
} finally {
lock.unlock();
}
}
private void consumer2() {
lock.lock();
try {
while (factory.size() == 0) {
try {
System.out.println("-- factory is empty --");
consumer.await();
Thread.sleep(Math.round(Math.random() * 100));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("consume product");
factory.removeLast();
producer.signalAll();
} finally {
lock.unlock();
}
}
public static void run2() {
final Demo demo = new Demo();
Thread pThread = new Thread() {
@Override
public void run() {
for (int i = 0; i < 100; i++) {
demo.producer2();
}
}
};
Thread cThread = new Thread() {
@Override
public void run() {
for (int i = 0; i < 100; i++) {
demo.consumer2();
}
}
};
pThread.start();
cThread.start();
}
}
運(yùn)行結(jié)果為:
5.5 Condition 提供的其它方法
-
void await() throw InterruptedException
:當(dāng)前線(xiàn)程進(jìn)入等待隊(duì)列直到被signal
通知或中斷。 -
void awaitUninterruptibly()
:當(dāng)前線(xiàn)程進(jìn)入等待狀態(tài)直到被通知建邓,對(duì)中斷不敏感盈厘。 -
long awaitNanos(long nanosTimeout) throws InterruptedException
:當(dāng)前線(xiàn)程進(jìn)入等待狀態(tài)直到被通知,返回值表示剩余的時(shí)間官边,如果返回值<=0
沸手,那么就是超時(shí)外遇。 -
boolean waitUntil(Data deadline) throws InterruptedException
:當(dāng)前線(xiàn)程進(jìn)入等待狀態(tài)直到被通知、中斷或者到某個(gè)時(shí)間契吉,如果沒(méi)有到指定時(shí)間就通知跳仿,則返回true
。 -
void signal()
:?jiǎn)拘岩粋€(gè)等待在Condition
上的線(xiàn)程捐晶,該線(xiàn)程從等待方法返回前必須獲得和Condition
相關(guān)的鎖菲语。 -
void signalAll()
:與signal()
的區(qū)別在于,它會(huì)喚醒所有等待在Condition
上的線(xiàn)程惑灵。
這里容易混淆的是響應(yīng)中斷的概念山上,根據(jù)await()
調(diào)用后所處的階段,可以分為下面三種情況:
- 調(diào)用
awaiXXt()
之前已經(jīng)被中斷:-
await()
:拋出異常英支。 -
awaitUninterruptibly()
:不拋出異常佩憾。
-
- 調(diào)用完
awaitXX()
之后,仍然處于等待隊(duì)列中:-
await()
:被喚醒干花,并拋出異常妄帘。 -
awaitUninterruptibly()
:被喚醒,設(shè)置標(biāo)志位interrupted
為true
后池凄,繼續(xù)在等待隊(duì)列中等待寄摆。
-
- 調(diào)用完
awaitXX()
之后,并且調(diào)用了signal
(將它等待隊(duì)列移動(dòng)到了同步隊(duì)列):-
await()
:繼續(xù)在同步隊(duì)列中等待修赞,并不會(huì)拋出異常,只是在返回后Thread
的interrupted
為true
桑阶。 -
awaitUninterruptibly()
:和await()
的表現(xiàn)是一樣的柏副。
-
也就是說(shuō)是響應(yīng)中斷是針對(duì) 調(diào)用之前和處于等待隊(duì)列 這兩種情況而言的,當(dāng)它已經(jīng)移到到同步隊(duì)列后蚣录,兩者都不會(huì)響應(yīng)中斷割择。