LinkedTransferQueue 是單向鏈表結(jié)構(gòu)的無界阻塞隊列秤涩, 從JDK1.7開始加入到J.U.C的行列中禀苦。通過 CAS 和 LockSupport 實現(xiàn)線程安全矫俺,元素操作按照 FIFO (first-in-first-out 先入先出) 的順序呻澜。內(nèi)存一致性遵循對LinkedTransferQueue的插入操作先行發(fā)生于(happen-before)訪問或移除操作滥崩。相對于其他傳統(tǒng) Queue据忘,LinkedTransferQueue 有它獨特的性質(zhì)鹦牛,本章將對其進行詳細的講解。
概述
LinkedTransferQueue(后稱LTQ) 采用一種預(yù)占模式勇吊。意思就是消費者線程取元素時曼追,如果隊列為空,那就生成一個節(jié)點(節(jié)點元素為null)入隊汉规,然后消費者線程被等待在這個節(jié)點上礼殊,后面生產(chǎn)者線程入隊時發(fā)現(xiàn)有一個元素為null的節(jié)點,生產(chǎn)者線程就不入隊了针史,直接就將元素填充到該節(jié)點晶伦,并喚醒該節(jié)點等待的線程,被喚醒的消費者線程取走元素悟民,從調(diào)用的方法返回坝辫。我們稱這種節(jié)點操作為“匹配”方式。
LTQ的算法實現(xiàn)可以總結(jié)為以下幾點:
雙重隊列:
和典型的單向鏈表結(jié)構(gòu)不同射亏,LTQ 的 Node 存儲了一個isData
的 boolean 型字段近忙,也就是說它的節(jié)點可以代表一個數(shù)據(jù)或者是一個請求竭业,稱為雙重隊列(Dual Queue)。上面說過及舍,在消費者獲取元素時未辆,如果隊列為空,當前消費者就會作為一個“元素為null”的節(jié)點被放入隊列中等待锯玛,所以 LTQ中 的節(jié)點存儲了生產(chǎn)者節(jié)點(item不為null)和消費者節(jié)點(item為null)咐柜,這兩種節(jié)點就是通過isData
來區(qū)分的。松弛度:
為了節(jié)省 CAS 操作的開銷攘残,LTQ 引入了“松弛度”的概念:在節(jié)點被匹配(被刪除)之后拙友,不會立即更新head/tail,而是當 head/tail 節(jié)點和最近一個未匹配的節(jié)點之間的距離超過一個“松弛閥值”之后才會更新(在 LTQ 中歼郭,這個值為 2)遗契。這個“松弛閥值”一般為1-3,如果太大會降低緩存命中率病曾,并且會增加遍歷鏈的長度牍蜂;太小會增加 CAS 的開銷。節(jié)點自鏈接:
已匹配節(jié)點的 next 引用會指向自身泰涂。
如果GC延遲回收鲫竞,已刪除節(jié)點鏈會積累的很長,此時垃圾收集會耗費高昂的代價逼蒙,并且所有剛匹配的節(jié)點也不會被回收从绘。為了避免這種情況,我們在 CAS 向后推進 head 時其做,會把已匹配的 head 的"next"引用指向自身(即“自鏈接節(jié)點”)顶考,這樣就限制了連接已刪除節(jié)點的長度(我們也采取類似的方法赁还,清除在其他節(jié)點字段中可能的垃圾保留值)妖泄。如果在遍歷時遇到一個自鏈接節(jié)點,那就表明當前線程已經(jīng)滯后于另外一個更新 head 的線程艘策,此時就需要重新獲取 head 來遍歷蹈胡。
所以,在 LTQ 中朋蔫,數(shù)據(jù)在某個線程的“某一時刻”可能存在下面這種形式:
unmatched node:未被匹配的節(jié)點罚渐。可能是一個生產(chǎn)者節(jié)點(item不為null)驯妄,也可能是一個消費者節(jié)點(item為null)荷并。
matched node:已經(jīng)被匹配的節(jié)點∏嗳樱可能是一個生產(chǎn)者節(jié)點(item不為null)的數(shù)據(jù)已經(jīng)被一個消費者拿走源织;也可能是一個消費者節(jié)點(item為null)已經(jīng)被一個生產(chǎn)者填充上數(shù)據(jù)翩伪。
數(shù)據(jù)結(jié)構(gòu)
LTQ 繼承自AbstractQueue,支持傳統(tǒng)Queue的所有操作谈息;實現(xiàn)了 TransferQueue 接口缘屹,并且是 TransferQueue 的唯一實現(xiàn),TransferQueue 定義了一種“預(yù)占模式”侠仇,允許消費者在節(jié)點上等待轻姿,直到生產(chǎn)者把元素放入節(jié)點。
核心參數(shù)
//隊列頭節(jié)點逻炊,第一次入列之前為空
transient volatile Node head;
//隊列尾節(jié)點互亮,第一次添加節(jié)點之前為空
private transient volatile Node tail;
//累計到一定次數(shù)再清除無效node
private transient volatile int sweepVotes;
//當一個節(jié)點是隊列中的第一個waiter時,在多處理器上進行自旋的次數(shù)(隨機穿插調(diào)用thread.yield)
private static final int FRONT_SPINS = 1 << 7;
// 當前繼節(jié)點正在處理余素,當前節(jié)點在阻塞之前的自旋次數(shù)胳挎,也為FRONT_SPINS
// 的位變化充當增量,也可在自旋時作為yield的平均頻率
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
//sweepVotes的閥值
static final int SWEEP_THRESHOLD = 32;
/*
* Possible values for "how" argument in xfer method.
* xfer方法類型
*/
private static final int NOW = 0; // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer
這里我們重點說一下sweepVotes這個屬性溺森,其他的都很簡單慕爬,就不一一介紹了。
上面我們提到屏积,head/tail 節(jié)點并不是及時更新的医窿,在并發(fā)操作時鏈表內(nèi)部可能存在已匹配節(jié)點,此時就需要一個閥值來決定何時清除已匹配的內(nèi)部節(jié)點鏈炊林,這就是sweepVotes
和SWEEP_THRESHOLD
的作用姥卢。
我們通過節(jié)點自鏈接的方式來減少垃圾滯留,同樣也會解除內(nèi)部已移除節(jié)點的鏈接渣聚。在匹配超時独榴、線程中斷或調(diào)用remove
時,這也些節(jié)點也會被清除(解除鏈接)奕枝。例如棺榔,在某一時刻有一個節(jié)點 s 已經(jīng)被移除,我們可以通過 CAS 修改 s 的前繼節(jié)點的 next 引用的方式來解除 s 的鏈接隘道。 但是有兩種情況并不能保證節(jié)點 s 被解除鏈接:
1. 如果 s 節(jié)點是一個 next 為 null 的節(jié)點(trailing node)症歇,但是它被作為入列時的目標節(jié)點,所以只有在其他節(jié)點入列之后才能移除它
2. 通過給定 s 的前繼節(jié)點谭梗,不一定會移除 s 節(jié)點:因為前繼節(jié)點有可能已經(jīng)被解除鏈接忘晤,這種情況下前繼節(jié)點的前繼節(jié)點有可能指向了s。
所以激捏,通過這兩點设塔,說明在 s 節(jié)點或它的前繼節(jié)點已經(jīng)出列時,并不是必須要移除它們远舅。對于這些情況闰蛔,我們記錄了一個解除節(jié)點鏈接失敗的值-sweepVotes竞思,并且為其定義了一個閥值-SWEEP_THRESHOLD,當解除鏈接失敗次數(shù)超過這個閥值時就會對隊列進行一次“大掃除”(通過sweep()
方法)钞护,解除所有已取消的節(jié)點鏈接盖喷。
xfer方法類型:
在 LTQ 中,所有的入隊/出隊操作都是通過xfer
方法來控制难咕,并且通過一個類型區(qū)分offer, put, poll, take, transfer
课梳,這樣做大大簡化了代碼。來看一下xfer
的方法類型:
NOW
:不等待余佃,直接返回匹配結(jié)果暮刃。用在poll, tryTransfer
中。
ASYNC
:異步操作爆土,直接把元素添加到隊列尾椭懊,不等待匹配。用在offer, put, add
中步势。
SYNC
:等待元素被消費者接收氧猬。用在transfer, take
中。
TIMED
:附帶超時時間的NOW
坏瘩,等待指定時間后返回匹配結(jié)果盅抚。用在附帶超時時間的poll, tryTransfer
中。
源碼解析
由于 LTQ 的入列/出列方法都是由xfer
來實現(xiàn)倔矾,所以我們這里只對xfer
進行解析妄均。
xfer(E e, boolean haveData, int how, long nanos)
/**
* Implements all queuing methods. See above for explanation.
*
* @param e the item or null for take
* @param haveData true if this is a put, else a take
* @param how NOW, ASYNC, SYNC, or TIMED
* @param nanos timeout in nanosecs, used only if mode is TIMED
* @return an item if matched, else e
* @throws NullPointerException if haveData mode but e is null
*/
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
retry:
for (;;) { // restart on append race
//從head開始向后匹配
for (Node h = head, p = h; p != null;) { // find & match first node
boolean isData = p.isData;
Object item = p.item;
if (item != p && (item != null) == isData) { // 找到有效節(jié)點,進入匹配
if (isData == haveData) //節(jié)點與此次操作模式一致哪自,無法匹配 can't match
break;
if (p.casItem(item, e)) { // 匹配成功丰包,cas修改為指定元素 match
for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton
if (head == h && casHead(h, n == null ? q : n)) {//更新head為匹配節(jié)點的next節(jié)點
h.forgetNext();//舊head節(jié)點指向自身等待回收
break;
} // cas失敗,重新獲取head advance and retry
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())//如果head的next節(jié)點未被匹配壤巷,跳出循環(huán)邑彪,不更新head,也就是松弛度<2
break; // unless slack < 2
}
LockSupport.unpark(p.waiter);//喚醒在節(jié)點上等待的線程
return LinkedTransferQueue.<E>cast(item);
}
}
//匹配失敗隙笆,繼續(xù)向后查找節(jié)點
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
//未找到匹配節(jié)點锌蓄,把當前節(jié)點加入到隊列尾
if (how != NOW) { // No matches available
if (s == null)
s = new Node(e, haveData);
//將新節(jié)點s添加到隊列尾并返回s的前繼節(jié)點
Node pred = tryAppend(s, haveData);
if (pred == null)
continue retry; //與其他不同模式線程競爭失敗重新循環(huán) lost race vs opposite mode
if (how != ASYNC)//同步操作升筏,等待匹配
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
說明:xfer
的基本流程如下:
- 從head開始向后匹配撑柔,找到一個節(jié)點模式跟本次操作的模式不同的未匹配的節(jié)點(生產(chǎn)或消費)進行匹配;
- 匹配節(jié)點成功 CAS 修改匹配節(jié)點的 item 為給定元素 e您访;
- 如果此時所匹配節(jié)點向后移動铅忿,則 CAS 更新 head 節(jié)點為匹配節(jié)點的 next 節(jié)點,舊 head 節(jié)點鏈接指向自身等待被回收(
forgetNext()
方法)灵汪;如果CAS 失敗檀训,并且松弛度大于等于2柑潦,就需要重新獲取 head 重試。 - 匹配成功峻凫,喚醒匹配節(jié)點 p 的等待線程
waiter
渗鬼,返回匹配的 item。 - 如果在上述操作中沒有找到匹配節(jié)點荧琼,則根據(jù)參數(shù)
how
做不同的處理:
NOW:立即返回譬胎。
SYNC:通過tryAppend
方法插入一個新的節(jié)點 s(item=e,isData = haveData
)到隊列尾,然后自旋或阻塞當前線程直到節(jié)點被匹配或者取消返回命锄。
ASYNC:通過tryAppend
方法插入一個新的節(jié)點 s(item=e,isData = haveData
)到隊列尾堰乔,異步直接返回。
TIMED:通過tryAppend
方法插入一個新的節(jié)點 s(item=e,isData = haveData
)到隊列尾脐恩,然后自旋或阻塞當前線程直到節(jié)點被匹配或者取消或等待超時返回镐侯。
tryAppend(Node s, boolean haveData)
/**
* Tries to append node s as tail.
* 嘗試添加給定節(jié)點s作為尾節(jié)點
*
* @param s the node to append
* @param haveData true if appending in data mode
* @return null on failure due to losing race with append in
* different mode, else s's predecessor, or s itself if no
* predecessor
*/
private Node tryAppend(Node s, boolean haveData) {
for (Node t = tail, p = t;;) { // move p to last node and append
Node n, u; // temps for reads of next & tail
if (p == null && (p = head) == null) {//head和tail都為null
if (casHead(null, s))//修改head為新節(jié)點s
return s; // initialize
}
else if (p.cannotPrecede(haveData))
return null; // lost race vs opposite mode
else if ((n = p.next) != null) // not last; keep traversing
p = p != t && t != (u = tail) ? (t = u) : // stale tail
(p != n) ? n : null; // restart if off list
else if (!p.casNext(null, s))
p = p.next; // re-read on CAS failure
else {
if (p != t) { // update if slack now >= 2
while ((tail != t || !casTail(t, s)) &&
(t = tail) != null &&
(s = t.next) != null && // advance and retry
(s = s.next) != null && s != t);
}
return p;
}
}
}
說明:添加給定節(jié)點 s 到隊列尾并返回 s 的前繼節(jié)點,失敗時(與其他不同模式線程競爭失斒幻啊)返回null苟翻,沒有前繼節(jié)點返回自身。
awaitMatch(Node s, Node pred, E e, boolean timed, long nanos)
/**
* Spins/yields/blocks until node s is matched or caller gives up.
* 自旋/讓步/阻塞,直到給定節(jié)點s匹配到或放棄匹配
*
* @param s the waiting node
* @param pred the predecessor of s, or s itself if it has no
* predecessor, or null if unknown (the null case does not occur
* in any current calls but may in possible future extensions)
* @param e the comparison value for checking match
* @param timed if true, wait only until timeout elapses
* @param nanos timeout in nanosecs, used only if timed is true
* @return matched item, or e if unmatched on interrupt or timeout
*/
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
//在首個item和取消檢查后初始
int spins = -1; // initialized after first item and cancel checks
ThreadLocalRandom randomYields = null; // bound if needed
for (;;) {
Object item = s.item;
if (item != e) { //matched
// assert item != s;
s.forgetContents(); // avoid garbage
return LinkedTransferQueue.<E>cast(item);
}
if ((w.isInterrupted() || (timed && nanos <= 0)) &&
s.casItem(e, s)) { //取消匹配骗污,item指向自身 cancel
unsplice(pred, s);//解除s節(jié)點和前繼節(jié)點的鏈接
return e;
}
if (spins < 0) { // establish spins at/near front
if ((spins = spinsFor(pred, s.isData)) > 0)
randomYields = ThreadLocalRandom.current();
}
else if (spins > 0) { // spin
--spins;
if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield(); //不定期讓步袜瞬,給其他線程執(zhí)行機會 occasionally yield
}
else if (s.waiter == null) {
s.waiter = w; // request unpark then recheck
}
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
}
else {
LockSupport.park(this);
}
}
}
說明:當前操作為同步操作時,會調(diào)用awaitMatch
方法阻塞等待匹配身堡,成功返回匹配節(jié)點 item邓尤,失敗返回給定參數(shù)e(s.item)。在等待期間如果線程被中斷或等待超時贴谎,則取消匹配汞扎,并調(diào)用unsplice
方法解除節(jié)點s
和其前繼節(jié)點的鏈接。
/**
* Unsplices (now or later) the given deleted/cancelled node with
* the given predecessor.
*
* 解除給定已經(jīng)被刪除/取消節(jié)點和前繼節(jié)點的鏈接(可能延遲解除)
* @param pred a node that was at one time known to be the
* predecessor of s, or null or s itself if s is/was at head
* @param s the node to be unspliced
*/
final void unsplice(Node pred, Node s) {
s.forgetContents(); // forget unneeded fields
if (pred != null && pred != s && pred.next == s) {
Node n = s.next;
if (n == null ||
(n != s && pred.casNext(s, n) && pred.isMatched())) {//解除s節(jié)點的鏈接
for (;;) { // check if at, or could be, head
Node h = head;
if (h == pred || h == s || h == null)
return; // at head or list empty
if (!h.isMatched())
break;
Node hn = h.next;
if (hn == null)
return; // now empty
if (hn != h && casHead(h, hn))//更新head
h.forgetNext(); // advance head
}
if (pred.next != pred && s.next != s) { // recheck if offlist
for (;;) { // sweep now if enough votes
int v = sweepVotes;
if (v < SWEEP_THRESHOLD) {
if (casSweepVotes(v, v + 1))
break;
}
else if (casSweepVotes(v, 0)) {//達到閥值擅这,進行"大掃除"澈魄,清除隊列中的無效節(jié)點
sweep();
break;
}
}
}
}
}
}
說明:首先把給定節(jié)點s
的next引用指向自身,如果s
的前繼節(jié)點pred
還是指向s
(pred.next == s
)仲翎,嘗試解除s
的鏈接痹扇,把pred
的 next 引用指向s
的 next 節(jié)點。如果s
不能被解除(由于它是尾節(jié)點或者pred
可能被解除鏈接溯香,并且pred
和s
都不是head
節(jié)點或已經(jīng)出列)鲫构,則添加到sweepVotes
,sweepVotes
累計到閥值SWEEP_THRESHOLD
之后就調(diào)用sweep()
對隊列進行一次“大掃除”玫坛,清除隊列中所有的無效節(jié)點结笨。sweep()
源碼如下:
/**
* Unlinks matched (typically cancelled) nodes encountered in a
* traversal from head.
* 解除(通常是取消)從頭部遍歷時遇到的已經(jīng)被匹配的節(jié)點的鏈接
*/
private void sweep() {
for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
if (!s.isMatched())
// Unmatched nodes are never self-linked
p = s;
else if ((n = s.next) == null) // trailing node is pinned
break;
else if (s == n) // stale
// No need to also check for p == s, since that implies s == n
p = head;
else
p.casNext(s, n);
}
}
小結(jié)
本章重點:理解 LinkedTransferQueue 的特性:雙重隊列、松弛度、節(jié)點的移除操作炕吸。
在 ConcurrentLinkedQueue 伐憾、 ConcurrentLinkeDeque 以及 SynchronousQueue 中都用到了 LinkedTransferQueue 的某些特性,如果同學們對它們感興趣赫模,理解本章對之后的源碼解析會有很大的幫助树肃。