此篇博客所有源碼均來自JDK 1.8
在沒有Lock之前已慢,我們使用synchronized來控制同步,配合Object的wait()旭咽、notify()系列方法可以實(shí)現(xiàn)等待/通知模式请垛。在Java SE5后混稽,Java提供了Lock接口膳叨,相對(duì)于Synchronized而言龄坪,Lock提供了條件Condition,對(duì)線程的等待抄课、喚醒操作更加詳細(xì)和靈活。下圖是Condition與Object的監(jiān)視器方法的對(duì)比(摘自《Java并發(fā)編程的藝術(shù)》):
Condition提供了一系列的方法來對(duì)阻塞和喚醒線程:
- await() :造成當(dāng)前線程在接到信號(hào)或被中斷之前一直處于等待狀態(tài)哎榴。
- **await(long time, TimeUnit unit) **:造成當(dāng)前線程在接到信號(hào)、被中斷或到達(dá)指定等待時(shí)間之前一直處于等待狀態(tài)。
- **awaitNanos(long nanosTimeout) **:造成當(dāng)前線程在接到信號(hào)尚蝌、被中斷或到達(dá)指定等待時(shí)間之前一直處于等待狀態(tài)迎变。返回值表示剩余時(shí)間,如果在nanosTimesout之前喚醒飘言,那么返回值 = nanosTimeout - 消耗時(shí)間衣形,如果返回值 <= 0 ,則可以認(rèn)定它已經(jīng)超時(shí)了。
- **awaitUninterruptibly() **:造成當(dāng)前線程在接到信號(hào)之前一直處于等待狀態(tài)姿鸿∽晃猓【注意:該方法對(duì)中斷不敏感】。
- **awaitUntil(Date deadline) **:造成當(dāng)前線程在接到信號(hào)苛预、被中斷或到達(dá)指定最后期限之前一直處于等待狀態(tài)句狼。如果沒有到指定時(shí)間就被通知,則返回true热某,否則表示到了指定時(shí)間腻菇,返回返回false。
- signal():喚醒一個(gè)等待線程昔馋。該線程從等待方法返回前必須獲得與Condition相關(guān)的鎖筹吐。
- signal()All:喚醒所有等待線程。能夠從等待方法返回的線程必須獲得與Condition相關(guān)的鎖秘遏。
Condition是一種廣義上的條件隊(duì)列骏令。他為線程提供了一種更為靈活的等待/通知模式,線程在調(diào)用await方法后執(zhí)行掛起操作垄提,直到線程等待的某個(gè)條件為真時(shí)才會(huì)被喚醒。Condition必須要配合鎖一起使用周拐,因?yàn)閷?duì)共享狀態(tài)變量的訪問發(fā)生在多線程環(huán)境下铡俐。一個(gè)Condition的實(shí)例必須與一個(gè)Lock綁定,因此Condition一般都是作為Lock的內(nèi)部實(shí)現(xiàn)妥粟。
Condtion的實(shí)現(xiàn)
獲取一個(gè)Condition必須要通過Lock的newCondition()方法审丘。該方法定義在接口Lock下面,返回的結(jié)果是綁定到此 Lock 實(shí)例的新 Condition 實(shí)例勾给。Condition為一個(gè)接口滩报,其下僅有一個(gè)實(shí)現(xiàn)類ConditionObject,由于Condition的操作需要獲取相關(guān)的鎖播急,而AQS則是同步鎖的實(shí)現(xiàn)基礎(chǔ)脓钾,所以ConditionObject則定義為AQS的內(nèi)部類。定義如下:
public class ConditionObject implements Condition, java.io.Serializable {
}
等待隊(duì)列
每個(gè)Condition對(duì)象都包含著一個(gè)FIFO隊(duì)列桩警,該隊(duì)列是Condition對(duì)象通知/等待功能的關(guān)鍵可训。在隊(duì)列中每一個(gè)節(jié)點(diǎn)都包含著一個(gè)線程引用,該線程就是在該Condition對(duì)象上等待的線程。我們看Condition的定義就明白了:
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
//頭節(jié)點(diǎn)
private transient Node firstWaiter;
//尾節(jié)點(diǎn)
private transient Node lastWaiter;
public ConditionObject() {
}
/** 省略方法 **/
}
從上面代碼可以看出Condition擁有首節(jié)點(diǎn)(firstWaiter)握截,尾節(jié)點(diǎn)(lastWaiter)飞崖。當(dāng)前線程調(diào)用await()方法,將會(huì)以當(dāng)前線程構(gòu)造成一個(gè)節(jié)點(diǎn)(Node)谨胞,并將節(jié)點(diǎn)加入到該隊(duì)列的尾部固歪。結(jié)構(gòu)如下:
Node里面包含了當(dāng)前線程的引用。Node定義與AQS的CLH同步隊(duì)列的節(jié)點(diǎn)使用的都是同一個(gè)類(AbstractQueuedSynchronized.Node靜態(tài)內(nèi)部類)胯努。
Condition的隊(duì)列結(jié)構(gòu)比CLH同步隊(duì)列的結(jié)構(gòu)簡單些牢裳,新增過程較為簡單只需要將原尾節(jié)點(diǎn)的nextWaiter指向新增節(jié)點(diǎn),然后更新lastWaiter即可康聂。
等待
調(diào)用Condition的await()方法會(huì)使當(dāng)前線程進(jìn)入等待狀態(tài)贰健,同時(shí)會(huì)加入到Condition等待隊(duì)列同時(shí)釋放鎖。當(dāng)從await()方法返回時(shí)恬汁,當(dāng)前線程一定是獲取了Condition相關(guān)連的鎖伶椿。
public final void await() throws InterruptedException {
// 當(dāng)前線程中斷
if (Thread.interrupted())
throw new InterruptedException();
//當(dāng)前線程加入等待隊(duì)列
Node node = addConditionWaiter();
//釋放鎖
long savedState = fullyRelease(node);
int interruptMode = 0;
/**
* 檢測此節(jié)點(diǎn)的線程是否在同步隊(duì)上,如果不在氓侧,則說明該線程還不具備競爭鎖的資格脊另,則繼續(xù)等待
* 直到檢測到此節(jié)點(diǎn)在同步隊(duì)列上
*/
while (!isOnSyncQueue(node)) {
//線程掛起
LockSupport.park(this);
//如果已經(jīng)中斷了,則退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//競爭同步狀態(tài)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//清理下條件隊(duì)列中的不是在等待條件的節(jié)點(diǎn)
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
此段代碼的邏輯是:首先將當(dāng)前線程新建一個(gè)節(jié)點(diǎn)同時(shí)加入到條件隊(duì)列中约巷,然后釋放當(dāng)前線程持有的同步狀態(tài)偎痛。然后則是不斷檢測該節(jié)點(diǎn)代表的線程釋放出現(xiàn)在CLH同步隊(duì)列中(收到signal信號(hào)之后就會(huì)在AQS隊(duì)列中檢測到),如果不存在則一直掛起独郎,否則參與競爭同步狀態(tài)踩麦。
加入條件隊(duì)列(addConditionWaiter())源碼如下:
private Node addConditionWaiter() {
Node t = lastWaiter; //尾節(jié)點(diǎn)
//Node的節(jié)點(diǎn)狀態(tài)如果不為CONDITION,則表示該節(jié)點(diǎn)不處于等待狀態(tài)氓癌,需要清除節(jié)點(diǎn)
if (t != null && t.waitStatus != Node.CONDITION) {
//清除條件隊(duì)列中所有狀態(tài)不為Condition的節(jié)點(diǎn)
unlinkCancelledWaiters();
t = lastWaiter;
}
//當(dāng)前線程新建節(jié)點(diǎn)谓谦,狀態(tài)CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
/**
* 將該節(jié)點(diǎn)加入到條件隊(duì)列中最后一個(gè)位置
*/
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
該方法主要是將當(dāng)前線程加入到Condition條件隊(duì)列中。當(dāng)然在加入到尾節(jié)點(diǎn)之前會(huì)清楚所有狀態(tài)不為Condition的節(jié)點(diǎn)贪婉。
fullyRelease(Node node)反粥,負(fù)責(zé)釋放該線程持有的鎖。
final long fullyRelease(Node node) {
boolean failed = true;
try {
//節(jié)點(diǎn)狀態(tài)--其實(shí)就是持有鎖的數(shù)量
long savedState = getState();
//釋放鎖
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
isOnSyncQueue(Node node):如果一個(gè)節(jié)點(diǎn)剛開始在條件隊(duì)列上疲迂,現(xiàn)在在同步隊(duì)列上獲取鎖則返回true
final boolean isOnSyncQueue(Node node) {
//狀態(tài)為Condition才顿,獲取前驅(qū)節(jié)點(diǎn)為null,返回false
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//后繼節(jié)點(diǎn)不為null尤蒿,肯定在CLH同步隊(duì)列中
if (node.next != null)
return true;
return findNodeFromTail(node);
}
unlinkCancelledWaiters():負(fù)責(zé)將條件隊(duì)列中狀態(tài)不為Condition的節(jié)點(diǎn)刪除
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
通知
調(diào)用Condition的signal()方法郑气,將會(huì)喚醒在等待隊(duì)列中等待最長時(shí)間的節(jié)點(diǎn)(條件隊(duì)列里的首節(jié)點(diǎn)),在喚醒節(jié)點(diǎn)前优质,會(huì)將節(jié)點(diǎn)移到CLH同步隊(duì)列中竣贪。
public final void signal() {
//檢測當(dāng)前線程是否為擁有鎖的獨(dú)
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//頭節(jié)點(diǎn)军洼,喚醒條件隊(duì)列中的第一個(gè)節(jié)點(diǎn)
Node first = firstWaiter;
if (first != null)
doSignal(first); //喚醒
}
該方法首先會(huì)判斷當(dāng)前線程是否已經(jīng)獲得了鎖,這是前置條件演怎。然后喚醒條件隊(duì)列中的頭節(jié)點(diǎn)匕争。
doSignal(Node first):喚醒頭節(jié)點(diǎn)
private void doSignal(Node first) {
do {
//修改頭結(jié)點(diǎn),完成舊頭結(jié)點(diǎn)的移出工作
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
doSignal(Node first)主要是做兩件事:1.修改頭節(jié)點(diǎn)爷耀,2.調(diào)用transferForSignal(Node first) 方法將節(jié)點(diǎn)移動(dòng)到CLH同步隊(duì)列中甘桑。transferForSignal(Node first)源碼如下:
final boolean transferForSignal(Node node) {
//將該節(jié)點(diǎn)從狀態(tài)CONDITION改變?yōu)槌跏紶顟B(tài)0,
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//將節(jié)點(diǎn)加入到syn隊(duì)列中去,返回的是syn隊(duì)列中node節(jié)點(diǎn)前面的一個(gè)節(jié)點(diǎn)
Node p = enq(node);
int ws = p.waitStatus;
//如果結(jié)點(diǎn)p的狀態(tài)為cancel 或者修改waitStatus失敗歹叮,則直接喚醒
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
整個(gè)通知的流程如下:
- 判斷當(dāng)前線程是否已經(jīng)獲取了鎖跑杭,如果沒有獲取則直接拋出異常,因?yàn)楂@取鎖為通知的前置條件咆耿。
- 如果線程已經(jīng)獲取了鎖德谅,則將喚醒條件隊(duì)列的首節(jié)點(diǎn)
- 喚醒首節(jié)點(diǎn)是先將條件隊(duì)列中的頭節(jié)點(diǎn)移出,然后調(diào)用AQS的enq(Node node)方法將其安全地移到CLH同步隊(duì)列中
- 最后判斷如果該節(jié)點(diǎn)的同步狀態(tài)是否為Cancel萨螺,或者修改狀態(tài)為Signal失敗時(shí)窄做,則直接調(diào)用LockSupport喚醒該節(jié)點(diǎn)的線程。
總結(jié)
一個(gè)線程獲取鎖后慰技,通過調(diào)用Condition的await()方法椭盏,會(huì)將當(dāng)前線程先加入到條件隊(duì)列中,然后釋放鎖吻商,最后通過isOnSyncQueue(Node node)方法不斷自檢看節(jié)點(diǎn)是否已經(jīng)在CLH同步隊(duì)列了掏颊,如果是則嘗試獲取鎖,否則一直掛起艾帐。當(dāng)線程調(diào)用signal()方法后乌叶,程序首先檢查當(dāng)前線程是否獲取了鎖,然后通過doSignal(Node first)方法喚醒CLH同步隊(duì)列的首節(jié)點(diǎn)柒爸。被喚醒的線程枉昏,將從await()方法中的while循環(huán)中退出來,然后調(diào)用acquireQueued()方法競爭同步狀態(tài)揍鸟。
Condition的應(yīng)用
只知道原理,如果不知道使用那就坑爹了句旱,下面是用Condition實(shí)現(xiàn)的生產(chǎn)者消費(fèi)者問題:
public class ConditionTest {
private LinkedList<String> buffer; //容器
private int maxSize ; //容器最大
private Lock lock;
private Condition fullCondition;
private Condition notFullCondition;
ConditionTest(int maxSize){
this.maxSize = maxSize;
buffer = new LinkedList<String>();
lock = new ReentrantLock();
fullCondition = lock.newCondition();
notFullCondition = lock.newCondition();
}
public void set(String string) throws InterruptedException {
lock.lock(); //獲取鎖
try {
while (maxSize == buffer.size()){
notFullCondition.await(); //滿了阳藻,添加的線程進(jìn)入等待狀態(tài)
}
buffer.add(string);
fullCondition.signal();
} finally {
lock.unlock(); //記得釋放鎖
}
}
public String get() throws InterruptedException {
String string;
lock.lock();
try {
while (buffer.size() == 0){
fullCondition.await();
}
string = buffer.poll();
notFullCondition.signal();
} finally {
lock.unlock();
}
return string;
}
}