之前寫了SynchronousQueue的源碼解析霎冯,其內(nèi)部實現(xiàn)有兩個數(shù)據(jù)結(jié)構(gòu):一個是棧,一個是FIFO隊列钞瀑,在之前的文章中主要分析了棧(非公平模式)的實現(xiàn)思路而沒有分析隊列的實現(xiàn)沈撞,其實是因為JDK里面本身有LinkedTransferQueue這個隊列的實現(xiàn),所以今天單獨來說下這個隊列的實現(xiàn)思路雕什,主要目的方便自己以后快速的回顧缠俺。
首先我們回顧一下SynchronousQueue里面的TransferStack的實現(xiàn)方式:
1.棧中節(jié)點通過0显晶,1,2來標(biāo)記節(jié)點是獲取數(shù)據(jù)還是生產(chǎn)數(shù)據(jù)壹士,或者是正在匹配磷雇。
2.棧中的節(jié)點稱為waiter(等待者),等待其它線程與其匹配躏救。
3.匹配過程是將匹配節(jié)點入棧唯笙,并設(shè)置節(jié)點操作模式為FULFILLING|mode讓后面的匹配節(jié)點和等待節(jié)點都不能入棧,即達(dá)到了匹配動作的線程安全盒使。
LinkedTransferQueue的實現(xiàn)在邏輯上與TransferStack大致相同崩掘,不同點在于隊列是FIFO的,所以匹配每次操作的都是從頭節(jié)點開始尋找忠怖,直到找到一個未匹配的節(jié)點呢堰,而入隊列的節(jié)點是從隊尾進(jìn)入抄瑟,與隊頭不沖突凡泣,所以在實現(xiàn)上,不會需要像棧一定皮假,將匹配中的節(jié)點壓入棧鞋拟。
LinkedTransferQueue的實現(xiàn)也沒有像SynchronousQueue里面的實現(xiàn)那樣,在transfer(xfer)方法的處理上也有一些不同惹资。在SynchronousQueue的transfer方法上只有3個參數(shù)贺纲,方法實現(xiàn)是通過是否傳入元素e來判斷本次操作是獲取還是生產(chǎn),而在LinkedTransferQueue的xfer方法的實現(xiàn)上褪测,還需要手動的傳入一個haveData參數(shù)來表名本次的操作猴誊。(xfer是jdk1.7寫的,transfer方法是1.8寫的侮措,可以看出作者后面也發(fā)現(xiàn)這個參數(shù)真沒必要)懈叹。
先說說總體首先思路吧:
1.一次操作,先從隊列頭開始檢查分扎,看節(jié)點是否已經(jīng)匹配澄成,并且模式是否與本次操作相匹配,如果模式匹配畏吓,則進(jìn)行配對CAS操作墨状。
2.如果頭節(jié)點已經(jīng)被匹配,則循環(huán)查找下一個節(jié)點
3.如果隊列為空菲饼,或者操作模式與隊列中的節(jié)點的模式一樣肾砂,則本次操作構(gòu)造為節(jié)點進(jìn)入隊列成為等待者
4.等待者在隊列中睡眠或者自旋,當(dāng)有調(diào)用者將其喚醒后匹配成功出隊列
接下來是對代碼的詳細(xì)說明:
private E xfer(E e, boolean putData, int how, long nanos) {
//判斷參數(shù)操作模式和e是否匹配宏悦,如果是put通今,則e不為空
if (putData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
retry:
for (;;) { // restart on append race
//代碼1:從頭節(jié)點開始遍歷粥谬,直到隊列為空
for (Node h = head, p = h; p != null;) { // find & match first node
boolean nodeIsPutData = p.isData; //頭節(jié)點操作模式(false為消費,true為生產(chǎn))
Object item = p.item; //頭節(jié)點數(shù)據(jù)
//代碼2
//代碼2-1:如果頭節(jié)點的數(shù)據(jù)不等于自己(節(jié)點未被匹配辫塌,見awaitMatch方法代碼1)
//(item != null) == nodeIsPutData 表明節(jié)點的操作模式要與節(jié)點的數(shù)據(jù)對應(yīng)漏策,這樣節(jié)點才是正常的
if (item != p && (item != null) == nodeIsPutData) { // unmatched //如果頭節(jié)點已經(jīng)被匹配,則去到代碼3臼氨,繼續(xù)循環(huán)下一個節(jié)點
if (nodeIsPutData == putData) // can't match //如果頭節(jié)點的模式和本次操作的模式一樣掺喻,則不能匹配,所以跳出循環(huán)來到代碼4(入隊列)
break;
if (p.casItem(item, e)) { // match //匹配成功
//n1(head: h)->n2->n3->n4->n5(p)->n6->n7(tail) 假如找到n5才匹配上,則說明n1,n2,n3,n4都已經(jīng)
//
for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton
//將頭節(jié)點指向p節(jié)點(p為尾節(jié)點)或者指向p.next储矩,從而p之前的那些節(jié)點出隊列
//這里為什么不是直接指向p.next感耙?為什么有可能指向p,p不是已經(jīng)被匹配了么 持隧?
//因為如果讓head指向null的話還需要cas改變tail即硼,兩個cas沒辦法保證原子性),否則指向q的下一個節(jié)點屡拨。這里在代碼3處得到補(bǔ)償
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext(); //將next指向自己只酥,這里印證了代碼3
break; //
} // advance and retry
//重新判斷隊列里面是否至少還有2個節(jié)點,并且第二個是已經(jīng)被匹配了的呀狼,這種情況裂允,再次進(jìn)入循環(huán),嘗試將已經(jīng)匹配的節(jié)點出隊列哥艇。
//if((h = head) != null && (q = h.next) != null && q.isMatched()){
// continue;
//}else break;
if ((h = head) == null ||
(q = h.next) == null ||
!q.isMatched())
break; // unless slack < 2
}
//代碼2-2
//喚醒p節(jié)點绝编,節(jié)點從awaitMatch方法中醒來,然后返回配對的值
LockSupport.unpark(p.waiter);
return LinkedTransferQueue.<E>cast(item);
}
}
//代碼3貌踏,如果p!=p.next(p沒有出隊列)十饥,則取下一個節(jié)點繼續(xù)嘗試,否則從“頭”開始
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
//代碼4:如果入隊節(jié)點模式與等待者相同祖乳,或者隊列中已經(jīng)沒有待匹配的等待者
//NOW代表不等待的poll操作逗堵,除此之外
if (how != NOW) { // No matches available
if (s == null)
s = new Node(e, putData); //本次操作構(gòu)造節(jié)點
Node pred = tryAppend(s, putData); //嘗試入隊列,詳情見tryAppend方法
if (pred == null)
continue retry; //入隊失敗凡资,從新嘗試 // lost race vs opposite mode
//ASYNC代表offer, put, add
//如果入隊列成功砸捏,并且是take和timed poll方法,則進(jìn)入等待
if (how != ASYNC) //SYNC / TIMED
//讓當(dāng)前節(jié)點進(jìn)入睡眠狀態(tài)隙赁,等待配對者將其喚醒
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
/**
將本次操作封裝成隊列節(jié)點入隊
入隊列邏輯:
1.判斷隊列是否為空垦藏,為空則直接入
2.從隊尾入手,判斷隊尾節(jié)點的狀態(tài)是否與本次操作相同伞访,如果不同則不能入隊列
2.1.如果能入隊列掂骏,則判斷循環(huán)中的p節(jié)點是否還是尾節(jié)點,如果不是厚掷,則繼續(xù)往下找弟灼,直到尾節(jié)點
3.如果p是尾節(jié)點级解,則cas將s節(jié)點入隊列,如果cas失敗田绑,則p重新定位為next勤哗,又重新嘗試
4.如果入隊成功
**/
private Node tryAppend(Node s, boolean putData) {
for (Node t = tail, p = t;;) { // move p to last node and append
Node n, u; // temps for reads of next & tail
//1.隊列為空,將當(dāng)前節(jié)點設(shè)置為頭節(jié)點
if (p == null && (p = head) == null) {
if (casHead(null, s))
return s; // initialize
}
//2.來到這里說明隊列不為空掩驱,此時理論上p!=null
//p節(jié)點未被匹配芒划,并且模式與putData相反,則說明s節(jié)點無法接入隊列
else if (p.cannotPrecede(putData))
return null; // lost race vs opposite mode
//2.1 如果已經(jīng)有新節(jié)點入隊列了欧穴,或者隊尾出隊了(p.next == p)
else if ((n = p.next) != null) // not last; keep traversing
// tail發(fā)生了變化民逼,則指向新的tail(u),否則看老的tail節(jié)點是否出隊列,如果未出涮帘,則讓p指向p.next,如果已出隊列拼苍,則說明隊列空了,則讓p指向null重新嘗試
//讓t每次都指向最新的tail调缨,而如果p!=t疮鲫,則說明p不是尾節(jié)點, 如果t已經(jīng)等于了tail同蜻,則單獨設(shè)置p(如果p!=p.next,則p就指向next棚点,否則p指向null早处,重新從head開始)
//如果t不等于tail湾蔓,則將t指向tail
//這里為什么要這么復(fù)雜? 因為有代碼3.可知砌梆,節(jié)點入隊的保證是casNext, 如果保證線程安全的入隊默责,不能只拿到tail就append,因為有可能老的tail已經(jīng)不是隊尾了咸包,
//所以必須一直遍歷到p.next等于null才能入隊列桃序,并且如果發(fā)現(xiàn)p已經(jīng)出隊列,則將p指向null,然后代碼來到1.處烂瘫,從“頭”開始往后遍歷媒熊。
//為什么從頭開始能保證線程安全呢? 因為節(jié)點出隊列是casHead來保證的坟比。
p = (p != t) && t != (u = tail) ?
(t = u) :
((p != n) ? n : null; )// restart if off list
//3.入隊列
else if (!p.casNext(null, s))
p = p.next; // re-read on CAS failure
//4.如果入隊成功返回前節(jié)點
else {
if (p != t) { // update if slack now >= 2
while ((tail != t || !casTail(t, s)) &&
(t = tail) != null &&
(s = t.next) != null &&
(s = s.next) != null &&
s != t);
// advance and retry
}
return p;
}
}// for over
}
//返回true芦鳍,說明調(diào)用者節(jié)點不能作為前繼節(jié)點
final boolean cannotPrecede(boolean haveData) {
boolean d = isData; //
Object x;
//如果調(diào)用者操作模式和本次操作模式不相同
//并且調(diào)用者沒有被匹配 x = item) != this && (x != null) == d
return d != haveData && (x = item) != this && (x != null) == d;
}
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = -1; //讓出CPU的次數(shù) // initialized after first item and cancel checks
ThreadLocalRandom randomYields = null; // bound if needed
//自旋方法:睡眠節(jié)點等待超時,被中斷葛账,或者匹配成功才會返回柠衅。
for (;;) {
Object item = s.item;
//代碼1:當(dāng)節(jié)點數(shù)據(jù)放生變化時,說明節(jié)點已經(jīng)被匹配籍琳,返回匹配值
if (item != e) { // matched
// assert item != s;
s.forgetContents(); // avoid garbage
return LinkedTransferQueue.<E>cast(item);
}
//如果當(dāng)前線程被中斷菲宴,或者線程等待超時(timed && nanos <= 0)并且將自己的item指向自己(在代碼2-1處可以看到贷祈,判斷節(jié)點是否被匹配,是通過判斷item是否等于自己)
if ((w.isInterrupted() || (timed && nanos <= 0)) &&
s.casItem(e, s)) { // cancel
unsplice(pred, s);
return e;
}
//初始化spins==-1,當(dāng)其小于0時喝峦,
if (spins < 0) { // establish spins at/near front
if ((spins = spinsFor(pred, s.isData)) > 0)
randomYields = ThreadLocalRandom.current();
}
//自旋次數(shù)大于0势誊,繼續(xù)
else if (spins > 0) { // spin
--spins;
if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield(); //隨機(jī)讓出CPU時間 // occasionally yield
}
//自旋完成,準(zhǔn)備進(jìn)入睡眠谣蠢,先設(shè)置等待線程
else if (s.waiter == null) {
s.waiter = w; // request unpark then recheck
}
//如果有時間等待限制键科,判斷是否超過等待時間,如果沒有超過等待時間漩怎,則讓節(jié)點睡眠(一定時間勋颖,節(jié)點超時或者被其它節(jié)點喚醒)
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
}
else { //如果沒有時間限制,則直接將線程睡眠
LockSupport.park(this);
}
}
}
/**
通過前節(jié)點和當(dāng)前節(jié)點的操作模式來判斷自旋次數(shù)
**/
private static int spinsFor(Node pred, boolean haveData) {
//如果是多核心CPU勋锤,并且前繼節(jié)點存在饭玲,則根據(jù)前繼節(jié)點的情況設(shè)置自旋次數(shù)。
if (MP && pred != null) {
if (pred.isData != haveData) // phase change //沒看懂為什么前節(jié)點的模式和本次操作模式會不相等(因為節(jié)點都已入隊列叁执,說明前節(jié)點和本節(jié)點類型應(yīng)該一定相等才對)
return FRONT_SPINS + CHAINED_SPINS;
if (pred.isMatched()) // probably at front //如果前節(jié)點以及被匹配茄厘,那么自己就是最前面的節(jié)點了,那就使用前節(jié)前的自旋次數(shù)定義
return FRONT_SPINS;
if (pred.waiter == null) // pred apparently spinning //如果前節(jié)點的waiter等于空谈宛,說明前節(jié)點還在自旋次哈,則使用較小的CHAINED_SPINS作為自旋次數(shù)
return CHAINED_SPINS;
}
return 0;
}
最后再總結(jié)一下:
1.隊列中的節(jié)點稱為等待者,等待者首先會自旋一會兒來判斷自己是否被匹配上吆录,自旋完了還沒匹配上就線程睡眠窑滞。
2.判斷一個節(jié)點是否被匹配,直接判斷item跟isData模式上是否能匹配恢筝,但是代碼里面多加了一個item是否指向自己的判斷(我覺得沒必要)哀卫,因為匹配成功的條件就是p.casItem(item, e),見代碼2-1
3.判斷節(jié)點是否出隊列撬槽,需要判斷節(jié)點以及節(jié)點的next是否指向自己此改。
4.等待著發(fā)現(xiàn)自己被匹配上了,會將自己節(jié)點的內(nèi)容清空:即通過forgetContents方法將item指向自己侄柔,將線程指向null.
由于個人能力有限共啃,代碼的理解上可能會存在一定的錯誤,還請各位指正暂题,謝謝移剪。