原文出處http://cmsblogs.com/ 『chenssy』
【注】:SynchronousQueue實(shí)現(xiàn)算法看的暈乎乎的亭姥,寫(xiě)了好久才寫(xiě)完莺禁,如果當(dāng)中有什么錯(cuò)誤之處,忘各位指正
作為BlockingQueue中的一員只酥,SynchronousQueue與其他BlockingQueue有著不同特性:
- SynchronousQueue沒(méi)有容量哩罪。與其他BlockingQueue不同,SynchronousQueue是一個(gè)不存儲(chǔ)元素的BlockingQueue锁摔。每一個(gè)put操作必須要等待一個(gè)take操作廓旬,否則不能繼續(xù)添加元素,反之亦然谐腰。
- 因?yàn)闆](méi)有容量孕豹,所以對(duì)應(yīng) peek, contains, clear, isEmpty ... 等方法其實(shí)是無(wú)效的。例如clear是不執(zhí)行任何操作的怔蚌,contains始終返回false,peek始終返回null巩步。
- SynchronousQueue分為公平和非公平,默認(rèn)情況下采用非公平性訪問(wèn)策略桦踊,當(dāng)然也可以通過(guò)構(gòu)造函數(shù)來(lái)設(shè)置為公平性訪問(wèn)策略(為true即可)椅野。
- 若使用 TransferQueue, 則隊(duì)列中永遠(yuǎn)會(huì)存在一個(gè) dummy node(這點(diǎn)后面詳細(xì)闡述)。
SynchronousQueue非常適合做交換工作,生產(chǎn)者的線程和消費(fèi)者的線程同步以傳遞某些信息竟闪、事件或者任務(wù)离福。
SynchronousQueue
與其他BlockingQueue一樣,SynchronousQueue同樣繼承AbstractQueue和實(shí)現(xiàn)BlockingQueue接口:
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
SynchronousQueue提供了兩個(gè)構(gòu)造函數(shù):
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
// 通過(guò) fair 值來(lái)決定公平性和非公平性
// 公平性使用TransferQueue炼蛤,非公平性采用TransferStack
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
TransferQueue妖爷、TransferStack繼承Transferer,Transferer為SynchronousQueue的內(nèi)部類理朋,它提供了一個(gè)方法transfer()絮识,該方法定義了轉(zhuǎn)移數(shù)據(jù)的規(guī)范,如下:
abstract static class Transferer<E> {
abstract E transfer(E e, boolean timed, long nanos);
}
transfer()方法主要用來(lái)完成轉(zhuǎn)移數(shù)據(jù)的嗽上,如果e != null次舌,相當(dāng)于將一個(gè)數(shù)據(jù)交給消費(fèi)者,如果e == null兽愤,則相當(dāng)于從一個(gè)生產(chǎn)者接收一個(gè)消費(fèi)者交出的數(shù)據(jù)彼念。
SynchronousQueue采用隊(duì)列TransferQueue來(lái)實(shí)現(xiàn)公平性策略,采用堆棧TransferStack來(lái)實(shí)現(xiàn)非公平性策略浅萧,他們兩種都是通過(guò)鏈表實(shí)現(xiàn)的逐沙,其節(jié)點(diǎn)分別為QNode,SNode洼畅。TransferQueue和TransferStack在SynchronousQueue中扮演著非常重要的作用吩案,SynchronousQueue的put、take操作都是委托這兩個(gè)類來(lái)實(shí)現(xiàn)的土思。
TransferQueue
TransferQueue是實(shí)現(xiàn)公平性策略的核心類务热,其節(jié)點(diǎn)為QNode,其定義如下:
static final class TransferQueue<E> extends Transferer<E> {
/** 頭節(jié)點(diǎn) */
transient volatile QNode head;
/** 尾節(jié)點(diǎn) */
transient volatile QNode tail;
// 指向一個(gè)取消的結(jié)點(diǎn)
//當(dāng)一個(gè)節(jié)點(diǎn)中最后一個(gè)插入時(shí)己儒,它被取消了但是可能還沒(méi)有離開(kāi)隊(duì)列
transient volatile QNode cleanMe;
/**
* 省略很多代碼O(∩_∩)O
*/
}
在TransferQueue中除了頭、尾節(jié)點(diǎn)外還存在一個(gè)cleanMe節(jié)點(diǎn)捆毫。該節(jié)點(diǎn)主要用于標(biāo)記闪湾,當(dāng)刪除的節(jié)點(diǎn)是尾節(jié)點(diǎn)時(shí)則需要使用該節(jié)點(diǎn)。
同時(shí)绩卤,對(duì)于TransferQueue需要注意的是途样,其隊(duì)列永遠(yuǎn)都存在一個(gè)dummy node,在構(gòu)造時(shí)創(chuàng)建:
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
在TransferQueue中定義了QNode類來(lái)表示隊(duì)列中的節(jié)點(diǎn)濒憋,QNode節(jié)點(diǎn)定義如下:
static final class QNode {
// next 域
volatile QNode next;
// item數(shù)據(jù)項(xiàng)
volatile Object item;
// 等待線程何暇,用于park/unpark
volatile Thread waiter; // to control park/unpark
//模式,表示當(dāng)前是數(shù)據(jù)還是請(qǐng)求凛驮,只有當(dāng)匹配的模式相匹配時(shí)才會(huì)交換
final boolean isData;
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
/**
* CAS next域裆站,在TransferQueue中用于向next推進(jìn)
*/
boolean casNext(QNode cmp, QNode val) {
return next == cmp &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
/**
* CAS itme數(shù)據(jù)項(xiàng)
*/
boolean casItem(Object cmp, Object val) {
return item == cmp &&
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
/**
* 取消本結(jié)點(diǎn),將item域設(shè)置為自身
*/
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
/**
* 是否被取消
* 與tryCancel相照應(yīng)只需要判斷item釋放等于自身即可
*/
boolean isCancelled() {
return item == this;
}
boolean isOffList() {
return next == this;
}
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = QNode.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
上面代碼沒(méi)啥好看的,需要注意的一點(diǎn)就是isData宏胯,該屬性在進(jìn)行數(shù)據(jù)交換起到關(guān)鍵性作用羽嫡,兩個(gè)線程進(jìn)行數(shù)據(jù)交換的時(shí)候,必須要兩者的模式保持一致肩袍。
TransferStack
TransferStack用于實(shí)現(xiàn)非公平性,定義如下:
static final class TransferStack<E> extends Transferer<E> {
static final int REQUEST = 0;
static final int DATA = 1;
static final int FULFILLING = 2;
volatile SNode head;
/**
* 省略一堆代碼 O(∩_∩)O~
*/
}
TransferStack中定義了三個(gè)狀態(tài):REQUEST表示消費(fèi)數(shù)據(jù)的消費(fèi)者,DATA表示生產(chǎn)數(shù)據(jù)的生產(chǎn)者皮仁,F(xiàn)ULFILLING剃根,表示匹配另一個(gè)生產(chǎn)者或消費(fèi)者。任何線程對(duì)TransferStack的操作都屬于上述3種狀態(tài)中的一種(對(duì)應(yīng)著SNode節(jié)點(diǎn)的mode)艰管。同時(shí)還包含一個(gè)head域甫窟,表示頭結(jié)點(diǎn)。
內(nèi)部節(jié)點(diǎn)SNode定義如下:
static final class SNode {
// next 域
volatile SNode next;
// 相匹配的節(jié)點(diǎn)
volatile SNode match;
// 等待的線程
volatile Thread waiter;
// item 域
Object item; // data; or null for REQUESTs
// 模型
int mode;
/**
* item域和mode域不需要使用volatile修飾蛙婴,因?yàn)樗鼈冊(cè)趘olatile/atomic操作之前寫(xiě)粗井,之后讀
*/
SNode(Object item) {
this.item = item;
}
boolean casNext(SNode cmp, SNode val) {
return cmp == next &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
/**
* 將s結(jié)點(diǎn)與本結(jié)點(diǎn)進(jìn)行匹配,匹配成功街图,則unpark等待線程
*/
boolean tryMatch(SNode s) {
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
LockSupport.unpark(w);
}
return true;
}
return match == s;
}
void tryCancel() {
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
boolean isCancelled() {
return match == this;
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long matchOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = SNode.class;
matchOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("match"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
上面簡(jiǎn)單介紹了TransferQueue浇衬、TransferStack,由于SynchronousQueue的put餐济、take操作都是調(diào)用Transfer的transfer()方法耘擂,只不過(guò)是傳遞的參數(shù)不同而已,put傳遞的是e參數(shù)絮姆,所以模式為數(shù)據(jù)(公平isData = true醉冤,非公平mode= DATA),而take操作傳遞的是null篙悯,所以模式為請(qǐng)求(公平isData = false蚁阳,非公平mode = REQUEST),如下:
// put操作
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
// take操作
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
公平模式
公平性調(diào)用TransferQueue的transfer方法:
E transfer(E e, boolean timed, long nanos) {
QNode s = null;
// 當(dāng)前節(jié)點(diǎn)模式
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
// 頭鸽照、尾節(jié)點(diǎn) 為null螺捐,沒(méi)有初始化
if (t == null || h == null)
continue;
// 頭尾節(jié)點(diǎn)相等(隊(duì)列為null) 或者當(dāng)前節(jié)點(diǎn)和隊(duì)列節(jié)點(diǎn)模式一樣
if (h == t || t.isData == isData) {
// tn = t.next
QNode tn = t.next;
// t != tail表示已有其他線程操作了,修改了tail矮燎,重新再來(lái)
if (t != tail)
continue;
// tn != null定血,表示已經(jīng)有其他線程添加了節(jié)點(diǎn),tn 推進(jìn)诞外,重新處理
if (tn != null) {
// 當(dāng)前線程幫忙推進(jìn)尾節(jié)點(diǎn)澜沟,就是嘗試將tn設(shè)置為尾節(jié)點(diǎn)
advanceTail(t, tn);
continue;
}
// 調(diào)用的方法的 wait 類型的, 并且 超時(shí)了, 直接返回 null
// timed 在take操作闡述
if (timed && nanos <= 0)
return null;
// s == null,構(gòu)建一個(gè)新節(jié)點(diǎn)Node
if (s == null)
s = new QNode(e, isData);
// 將新建的節(jié)點(diǎn)加入到隊(duì)列中峡谊,如果不成功茫虽,繼續(xù)處理
if (!t.casNext(null, s))
continue;
// 替換尾節(jié)點(diǎn)
advanceTail(t, s);
// 調(diào)用awaitFulfill, 若節(jié)點(diǎn)是 head.next, 則進(jìn)行自旋
// 若不是的話, 直接 block, 直到有其他線程 與之匹配, 或它自己進(jìn)行線程的中斷
Object x = awaitFulfill(s, e, timed, nanos);
// 若返回的x == s表示刊苍,當(dāng)前線程已經(jīng)超時(shí)或者中斷,不然的話s == null或者是匹配的節(jié)點(diǎn)
if (x == s) {
// 清理節(jié)點(diǎn)S
clean(t, s);
return null;
}
// isOffList:用于判斷節(jié)點(diǎn)是否已經(jīng)從隊(duì)列中離開(kāi)了
if (!s.isOffList()) {
// 嘗試將S節(jié)點(diǎn)設(shè)置為head席噩,移出t
advanceHead(t, s);
if (x != null)
s.item = s;
// 釋放線程 ref
s.waiter = null;
}
// 返回
return (x != null) ? (E)x : e;
}
// 這里是從head.next開(kāi)始班缰,因?yàn)門(mén)ransferQueue總是會(huì)存在一個(gè)dummy node節(jié)點(diǎn)
else {
// 節(jié)點(diǎn)
QNode m = h.next;
// 不一致讀,重新開(kāi)始
// 有其他線程更改了線程結(jié)構(gòu)
if (t != tail || m == null || h != head)
continue;
/**
* 生產(chǎn)者producer和消費(fèi)者consumer匹配操作
*/
Object x = m.item;
// isData == (x != null):判斷isData與x的模式是否相同悼枢,相同表示已經(jīng)匹配了
// x == m :m節(jié)點(diǎn)被取消了
// !m.casItem(x, e):如果嘗試將數(shù)據(jù)e設(shè)置到m上失敗
if (isData == (x != null) || x == m || !m.casItem(x, e)) {
// 將m設(shè)置為頭結(jié)點(diǎn)埠忘,h出列,然后重試
advanceHead(h, m);
continue;
}
// 成功匹配了馒索,m設(shè)置為頭結(jié)點(diǎn)h出列莹妒,向前推進(jìn)
advanceHead(h, m);
// 喚醒m上的等待線程
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
整個(gè)transfer的算法如下:
- 如果隊(duì)列為null或者尾節(jié)點(diǎn)模式與當(dāng)前節(jié)點(diǎn)模式一致,則嘗試將節(jié)點(diǎn)加入到等待隊(duì)列中(采用自旋的方式)绰上,直到被匹配或旨怠、超時(shí)或者取消。匹配成功的話要么返回null(producer返回的)要么返回真正傳遞的值(consumer返回的)蜈块,如果返回的是node節(jié)點(diǎn)本身則表示當(dāng)前線程超時(shí)或者取消了鉴腻。
- 如果隊(duì)列不為null,且隊(duì)列的節(jié)點(diǎn)是當(dāng)前節(jié)點(diǎn)匹配的節(jié)點(diǎn)百揭,則進(jìn)行數(shù)據(jù)的傳遞匹配并返回匹配節(jié)點(diǎn)的數(shù)據(jù)
- 在整個(gè)過(guò)程中都會(huì)檢測(cè)并幫助其他線程推進(jìn)
當(dāng)隊(duì)列為空時(shí)爽哎,節(jié)點(diǎn)入列然后通過(guò)調(diào)用awaitFulfill()方法自旋,該方法主要用于自旋/阻塞節(jié)點(diǎn)器一,直到節(jié)點(diǎn)被匹配返回或者取消课锌、中斷。
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
// 超時(shí)控制
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
// 自旋次數(shù)
// 如果節(jié)點(diǎn)Node恰好是head節(jié)點(diǎn)祈秕,則自旋一段時(shí)間渺贤,這里主要是為了效率問(wèn)題,如果里面阻塞请毛,會(huì)存在喚醒志鞍、線程上下文切換的問(wèn)題
// 如果生產(chǎn)者、消費(fèi)者者里面到來(lái)的話获印,就避免了這個(gè)阻塞的過(guò)程
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
// 自旋
for (;;) {
// 線程中斷了述雾,剔除當(dāng)前節(jié)點(diǎn)
if (w.isInterrupted())
s.tryCancel(e);
// 如果線程進(jìn)行了阻塞 -> 喚醒或者中斷了,那么x != e 肯定成立兼丰,直接返回當(dāng)前節(jié)點(diǎn)即可
Object x = s.item;
if (x != e)
return x;
// 超時(shí)判斷
if (timed) {
nanos = deadline - System.nanoTime();
// 如果超時(shí)了,取消節(jié)點(diǎn),continue唆缴,在if(x != e)肯定會(huì)成立鳍征,直接返回x
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
// 自旋- 1
if (spins > 0)
--spins;
// 等待線程
else if (s.waiter == null)
s.waiter = w;
// 進(jìn)行沒(méi)有超時(shí)的 park
else if (!timed)
LockSupport.park(this);
// 自旋次數(shù)過(guò)了, 直接 + timeout 方式 park
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
在自旋/阻塞過(guò)程中做了一點(diǎn)優(yōu)化,就是判斷當(dāng)前節(jié)點(diǎn)是否為對(duì)頭元素面徽,如果是的則先自旋艳丛,如果自旋次數(shù)過(guò)了匣掸,則才阻塞,這樣做的主要目的就在如果生產(chǎn)者氮双、消費(fèi)者立馬來(lái)匹配了則不需要阻塞碰酝,因?yàn)樽枞拘褧?huì)消耗資源戴差。在整個(gè)自旋的過(guò)程中會(huì)不斷判斷是否超時(shí)或者中斷了送爸,如果中斷或者超時(shí)了則調(diào)用tryCancel()取消該節(jié)點(diǎn)。
tryCancel
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
取消過(guò)程就是將節(jié)點(diǎn)的item設(shè)置為自身(itemOffset是item的偏移量)暖释。所以在調(diào)用awaitFulfill()方法時(shí)袭厂,如果當(dāng)前線程被取消、中斷球匕、超時(shí)了那么返回的值肯定時(shí)S纹磺,否則返回的則是匹配的節(jié)點(diǎn)。如果返回值是節(jié)點(diǎn)S亮曹,那么if(x == s)必定成立橄杨,如下:
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
如果返回的x == s成立,則調(diào)用clean()方法清理節(jié)點(diǎn)S:
void clean(QNode pred, QNode s) {
//
s.waiter = null;
while (pred.next == s) {
QNode h = head;
QNode hn = h.next;
// hn節(jié)點(diǎn)被取消了照卦,向前推進(jìn)
if (hn != null && hn.isCancelled()) {
advanceHead(h, hn);
continue;
}
// 隊(duì)列為空式矫,直接return null
QNode t = tail;
if (t == h)
return;
QNode tn = t.next;
// 不一致,說(shuō)明有其他線程改變了tail節(jié)點(diǎn)窄瘟,重新開(kāi)始
if (t != tail)
continue;
// tn != null 推進(jìn)tail節(jié)點(diǎn)衷佃,重新開(kāi)始
if (tn != null) {
advanceTail(t, tn);
continue;
}
// s 不是尾節(jié)點(diǎn) 移出
if (s != t) {
QNode sn = s.next;
// 如果s已經(jīng)被移除退出循環(huán),否則嘗試斷開(kāi)s
if (sn == s || pred.casNext(s, sn))
return;
}
// s是尾節(jié)點(diǎn)蹄葱,則有可能會(huì)有其他線程在添加新節(jié)點(diǎn)氏义,則cleanMe出場(chǎng)
QNode dp = cleanMe;
// 如果dp不為null,說(shuō)明是前一個(gè)被取消節(jié)點(diǎn)图云,將其移除
if (dp != null) {
QNode d = dp.next;
QNode dn;
if (d == null || // 節(jié)點(diǎn)d已經(jīng)刪除
d == dp || // 原來(lái)的節(jié)點(diǎn) cleanMe 已經(jīng)通過(guò) advanceHead 進(jìn)行刪除
!d.isCancelled() || // 原來(lái)的節(jié)點(diǎn) s已經(jīng)刪除
(d != t && // d 不是tail節(jié)點(diǎn)
(dn = d.next) != null && //
dn != d && // that is on list
dp.casNext(d, dn))) // d unspliced
// 清除 cleanMe 節(jié)點(diǎn), 這里的 dp == pred 若成立, 說(shuō)明清除節(jié)點(diǎn)s惯悠,成功, 直接return, 不然的話要再次循環(huán)
casCleanMe(dp, null);
if (dp == pred)
return;
} else if (casCleanMe(null, pred)) // 原來(lái)的 cleanMe 是 null, 則將 pred 標(biāo)記為 cleamMe 為下次 清除 s 節(jié)點(diǎn)做標(biāo)識(shí)
return;
}
}
這個(gè)clean()方法感覺(jué)有點(diǎn)兒難度,我也看得不是很懂竣况。這里是引用http://www.reibang.com/p/95cb570c8187
刪除的節(jié)點(diǎn)不是queue尾節(jié)點(diǎn), 這時(shí) 直接 pred.casNext(s, s.next) 方式來(lái)進(jìn)行刪除(和ConcurrentLikedQueue中差不多)
刪除的節(jié)點(diǎn)是隊(duì)尾節(jié)點(diǎn)
- 此時(shí) cleanMe == null, 則 前繼節(jié)點(diǎn)pred標(biāo)記為 cleanMe, 為下次刪除做準(zhǔn)備
- 此時(shí) cleanMe != null, 先刪除上次需要?jiǎng)h除的節(jié)點(diǎn), 然后將 cleanMe至null, 讓后再將 pred 賦值給 cleanMe
非公平模式
非公平模式transfer方法如下:
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
// 棧為空或者當(dāng)前節(jié)點(diǎn)模式與頭節(jié)點(diǎn)模式一樣克婶,將節(jié)點(diǎn)壓入棧內(nèi),等待匹配
if (h == null || h.mode == mode) {
// 超時(shí)
if (timed && nanos <= 0) {
// 節(jié)點(diǎn)被取消了丹泉,向前推進(jìn)
if (h != null && h.isCancelled())
// 重新設(shè)置頭結(jié)點(diǎn)(彈出之前的頭結(jié)點(diǎn))
casHead(h, h.next);
else
return null;
}
// 不超時(shí)
// 生成一個(gè)SNode節(jié)點(diǎn)情萤,并嘗試替換掉頭節(jié)點(diǎn)head (head -> s)
else if (casHead(h, s = snode(s, e, h, mode))) {
// 自旋,等待線程匹配
SNode m = awaitFulfill(s, timed, nanos);
// 返回的m == s 表示該節(jié)點(diǎn)被取消了或者超時(shí)摹恨、中斷了
if (m == s) {
// 清理節(jié)點(diǎn)S筋岛,return null
clean(s);
return null;
}
// 因?yàn)橥ㄟ^(guò)前面一步將S替換成了head,如果h.next == s 晒哄,則表示有其他節(jié)點(diǎn)插入到S前面了,變成了head
// 且該節(jié)點(diǎn)就是與節(jié)點(diǎn)S匹配的節(jié)點(diǎn)
if ((h = head) != null && h.next == s)
// 將s.next節(jié)點(diǎn)設(shè)置為head睁宰,相當(dāng)于取消節(jié)點(diǎn)h肪获、s
casHead(h, s.next);
// 如果是請(qǐng)求則返回匹配的域,否則返回節(jié)點(diǎn)S的域
return (E) ((mode == REQUEST) ? m.item : s.item);
}
}
// 如果棧不為null柒傻,且兩者模式不匹配(h != null && h.mode != mode)
// 說(shuō)明他們是一隊(duì)對(duì)等匹配的節(jié)點(diǎn)孝赫,嘗試用當(dāng)前節(jié)點(diǎn)s來(lái)滿足h節(jié)點(diǎn)
else if (!isFulfilling(h.mode)) {
// head 節(jié)點(diǎn)已經(jīng)取消了,向前推進(jìn)
if (h.isCancelled())
casHead(h, h.next);
// 嘗試將當(dāng)前節(jié)點(diǎn)打上"正在匹配"的標(biāo)記红符,并設(shè)置為head
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
// 循環(huán)loop
for (;;) {
// s為當(dāng)前節(jié)點(diǎn)青柄,m是s的next節(jié)點(diǎn),
// m節(jié)點(diǎn)是s節(jié)點(diǎn)的匹配節(jié)點(diǎn)
SNode m = s.next;
// m == null违孝,其他節(jié)點(diǎn)把m節(jié)點(diǎn)匹配走了
if (m == null) {
// 將s彈出
casHead(s, null);
// 將s置空刹前,下輪循環(huán)的時(shí)候還會(huì)新建
s = null;
// 退出該循環(huán),繼續(xù)主循環(huán)
break;
}
// 獲取m的next節(jié)點(diǎn)
SNode mn = m.next;
// 嘗試匹配
if (m.tryMatch(s)) {
// 匹配成功雌桑,將s 喇喉、 m彈出
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else
// 如果沒(méi)有匹配成功,說(shuō)明有其他線程已經(jīng)匹配了校坑,把m移出
s.casNext(m, mn);
}
}
}
// 到這最后一步說(shuō)明節(jié)點(diǎn)正在匹配階段
else {
// head 的next的節(jié)點(diǎn)拣技,是正在匹配的節(jié)點(diǎn),m 和 h配對(duì)
SNode m = h.next;
// m == null 其他線程把m節(jié)點(diǎn)搶走了耍目,彈出h節(jié)點(diǎn)
if (m == null)
casHead(h, null);
else {
SNode mn = m.next;
if (m.tryMatch(h))
casHead(h, mn);
else
h.casNext(m, mn);
}
}
}
}
整個(gè)處理過(guò)程分為三種情況膏斤,具體如下:
- 如果當(dāng)前棧為空獲取節(jié)點(diǎn)模式與棧頂模式一樣,則嘗試將節(jié)點(diǎn)加入棧內(nèi)邪驮,同時(shí)通過(guò)自旋方式等待節(jié)點(diǎn)匹配莫辨,最后返回匹配的節(jié)點(diǎn)或者null(被取消)
- 如果棧不為空且節(jié)點(diǎn)的模式與首節(jié)點(diǎn)模式匹配,則嘗試將該節(jié)點(diǎn)打上FULFILLING標(biāo)記毅访,然后加入棧中沮榜,與相應(yīng)的節(jié)點(diǎn)匹配,成功后將這兩個(gè)節(jié)點(diǎn)彈出棧并返回匹配節(jié)點(diǎn)的數(shù)據(jù)
- 如果有節(jié)點(diǎn)在匹配喻粹,那么幫助這個(gè)節(jié)點(diǎn)完成匹配和出棧操作蟆融,然后在主循環(huán)中繼續(xù)執(zhí)行
當(dāng)節(jié)點(diǎn)加入棧內(nèi)后,通過(guò)調(diào)用awaitFulfill()方法自旋等待節(jié)點(diǎn)匹配:
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
// 超時(shí)
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 當(dāng)前線程
Thread w = Thread.currentThread();
// 自旋次數(shù)
// shouldSpin 用于檢測(cè)當(dāng)前節(jié)點(diǎn)是否需要自旋
// 如果棧為空守呜、該節(jié)點(diǎn)是首節(jié)點(diǎn)或者該節(jié)點(diǎn)是匹配節(jié)點(diǎn)型酥,則先采用自旋,否則阻塞
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 線程中斷了查乒,取消該節(jié)點(diǎn)
if (w.isInterrupted())
s.tryCancel();
// 匹配節(jié)點(diǎn)
SNode m = s.match;
// 如果匹配節(jié)點(diǎn)m不為空弥喉,則表示匹配成功,直接返回
if (m != null)
return m;
// 超時(shí)
if (timed) {
nanos = deadline - System.nanoTime();
// 節(jié)點(diǎn)超時(shí)玛迄,取消
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
// 自旋;每次自旋的時(shí)候都需要檢查自身是否滿足自旋條件档桃,滿足就 - 1,否則為0
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0;
// 第一次阻塞時(shí)憔晒,會(huì)將當(dāng)前線程設(shè)置到s上
else if (s.waiter == null)
s.waiter = w;
// 阻塞 當(dāng)前線程
else if (!timed)
LockSupport.park(this);
// 超時(shí)
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
awaitFulfill()方法會(huì)一直自旋/阻塞直到匹配節(jié)點(diǎn)藻肄。在S節(jié)點(diǎn)阻塞之前會(huì)先調(diào)用shouldSpin()方法判斷是否采用自旋方式,為的就是如果有生產(chǎn)者或者消費(fèi)者馬上到來(lái)拒担,就不需要阻塞了嘹屯,在多核條件下這種優(yōu)化是有必要的。同時(shí)在調(diào)用park()阻塞之前會(huì)將當(dāng)前線程設(shè)置到S節(jié)點(diǎn)的waiter上从撼。匹配成功州弟,返回匹配節(jié)點(diǎn)m。
shouldSpin()方法如下:
boolean shouldSpin(SNode s) {
SNode h = head;
return (h == s || h == null || isFulfilling(h.mode));
}
同時(shí)在阻塞過(guò)程中會(huì)一直檢測(cè)當(dāng)前線程是否中斷了低零,如果中斷了婆翔,則調(diào)用tryCancel()方法取消該節(jié)點(diǎn),取消過(guò)程就是將當(dāng)前節(jié)點(diǎn)的math設(shè)置為當(dāng)前節(jié)點(diǎn)掏婶。所以如果線程中斷了啃奴,那么在返回m時(shí)一定是S節(jié)點(diǎn)自身。
void tryCancel() {
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
awaitFullfill()方法如果返回的m == s雄妥,則表示當(dāng)前節(jié)點(diǎn)已經(jīng)中斷取消了最蕾,則需要調(diào)用clean()方法,清理節(jié)點(diǎn)S:
void clean(SNode s) {
// 清理item域
s.item = null;
// 清理waiter域
s.waiter = null;
// past節(jié)點(diǎn)
SNode past = s.next;
if (past != null && past.isCancelled())
past = past.next;
// 從棧頂head節(jié)點(diǎn)老厌,取消從棧頂head到past節(jié)點(diǎn)之間所有已經(jīng)取消的節(jié)點(diǎn)
// 注意:這里如果遇到一個(gè)節(jié)點(diǎn)沒(méi)有取消瘟则,則會(huì)退出while
SNode p;
while ((p = head) != null && p != past && p.isCancelled())
casHead(p, p.next); // 如果p節(jié)點(diǎn)已經(jīng)取消了,則剔除該節(jié)點(diǎn)
// 如果經(jīng)歷上面while p節(jié)點(diǎn)還沒(méi)有取消枝秤,則再次循環(huán)取消掉所有p 到past之間的取消節(jié)點(diǎn)
while (p != null && p != past) {
SNode n = p.next;
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
p = n;
}
}
clean()方法就是將head節(jié)點(diǎn)到S節(jié)點(diǎn)之間所有已經(jīng)取消的節(jié)點(diǎn)全部移出醋拧。【不清楚為何要用兩個(gè)while淀弹,一個(gè)不行么】
至此丹壕,SynchronousQueue的源碼分析完成了,說(shuō)下我個(gè)人感覺(jué)吧:個(gè)人感覺(jué)SynchronousQueue實(shí)現(xiàn)好復(fù)雜(可能是自己智商不夠吧~~~(>_<)~~~)垦页,源碼看了好久雀费,這篇博客寫(xiě)了將近一個(gè)星期,如果有什么錯(cuò)誤之處痊焊,煩請(qǐng)各位指正U蛋馈!