前言
SynchronousQueue
是一個普通用戶不怎么常用的隊列,通常在創(chuàng)建無界線程池(Executors.newCachedThreadPool()
)的時候使用哪替,也就是那個非常危險的線程池 ^_^
庸娱。
它是一個非常特殊的阻塞隊列,他的模式是:在 offer
的時候,如果沒有另一個線程在 take 或者 poll
的話,就會失敗边篮,反之,如果在 take
或者 poll
的時候奏甫,沒有線程在offer
苟耻,則也會失敗,而這種特性扶檐,則非常適合用來做高響應并且線程不固定的線程池的Queue
。所以胁艰,在很多高性能服務器中款筑,如果并發(fā)很高,這時候腾么,普通的 LinkedQueue
就會成為瓶頸奈梳,性能就會出現(xiàn)毛刺,當換上 SynchronousQueue
后解虱,性能就會好很多攘须。
今天就看看這個特殊的 Queue 是怎么實現(xiàn)的。友情提示:代碼有些小復雜殴泰。于宙。浮驳。請做好心理準備。
源碼實現(xiàn)
SynchronousQueue 內(nèi)部分為公平(隊列)和非公平(棧)捞魁,隊列的性能相對而言會好點至会。構造方法中,就看出來了谱俭。默認是非公平的奉件,通常非公平(棧 FIFO)的性能會高那么一點點。
構造方法:
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
offer 方法
該方法我們通常建議使用帶有超時機制的 offer
方法昆著。
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
return true;
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}
從上面的代碼中县貌,可以看到核心方法就是 transfer
方法。如果該方法返回 true
凑懂,表示煤痕,插入成功,如果失敗征候,就返回 false
杭攻。
poll 方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E e = transferer.transfer(null, true, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
return e;
throw new InterruptedException();
}
同樣的該方法也是調用了 transfer
方法。結果返回得到的值或者null
疤坝。區(qū)別在于兆解,offer
方法的 e
參數(shù)是實體的。而 poll
方法 e
參數(shù)是 null
跑揉,我們猜測锅睛,方法內(nèi)部肯定根據(jù)這個做了判斷。所以历谍,重點在于transfer
方法的實現(xiàn)现拒。
而 transferer 有 2 種,隊列和棧望侈,我們就研究一種印蔬,知曉其原理,另一種有時間在看脱衙。
TransferQueue 源碼實現(xiàn)
構造方法:
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
構造一個 Node 節(jié)點侥猬,注釋說這是一個加的 node。并賦值給 head 和 tail 節(jié)點捐韩。形成一個初始化的鏈表退唠。
看看這個 node:
/** Node class for TransferQueue. */
static final class QNode {
volatile QNode next; // next node in queue
volatile Object item; // CAS'ed to or from null
volatile Thread waiter; // to control park/unpark
final boolean isData;
}
node 持有隊列中下一個 node,node 對應的值 value荤胁,持有該 node 的線程瞧预,擁有 park 或者 unpark,這里用的是 JUC 的工具類 LockSupport,還有一個布爾類型垢油,isData盆驹,這個非常重要,需要好好理解秸苗,到后面我們會好好講解召娜。
我們更關注的是這個類的 transfer 方法,該方法是 SynchronousQueue 的核心惊楼。
該方法接口定義如下:
/**
* Performs a put or take. put 或者 take
*
* @param e if non-null, the item to be handed to a consumer;
* if null, requests that transfer return an item
* offered by producer.
* @param timed if this operation should timeout
* @param nanos the timeout, in nanoseconds
* @return if non-null, the item provided or received; if null,
* the operation failed due to timeout or interrupt --
* the caller can distinguish which of these occurred
* by checking Thread.interrupted.
*/
abstract E transfer(E e, boolean timed, long nanos);
注釋說道 e 參數(shù)的作用:
如果 e 不是 null(說明是生產(chǎn)者調用) 玖瘸,將 item 交給消費者,并返回 e檀咙;反之雅倒,如果是 null(說明是消費者調用),將生產(chǎn)者提供的 item 返回給消費者弧可。
看看 TransferQueue 類的 transfer 方法實現(xiàn)蔑匣,樓主寫了很多的注釋嘗試解讀:
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);// 當輸入的是數(shù)據(jù)時,isData 就是 ture棕诵,表明這個操作是一個輸入數(shù)據(jù)的操作裁良;同理,當調用者輸入的是 null校套,則是在消費數(shù)據(jù)价脾。
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // 如果并發(fā)導致未"來得及"初始化
continue; // 自旋重來
// 以下分成兩個部分進行
// 1. 如果當前操作和 tail 節(jié)點的操作是一樣的;或者頭尾相同(表明隊列中啥都沒有)笛匙。
if (h == t || t.isData == isData) {
QNode tn = t.next;
if (t != tail) // 如果 t 和 tail 不一樣侨把,說明,tail 被其他的線程改了妹孙,重來
continue;
if (tn != null) { // 如果 tail 的 next 不是空秋柄。就需要將 next 追加到 tail 后面了。
advanceTail(t, tn); // 使用 CAS 將 tail.next 變成 tail,
continue;
}
if (timed && nanos <= 0) // 時間到了蠢正,不等待骇笔,返回 null,插入失敗嚣崭,獲取也是失敗的蜘拉。
return null;
if (s == null) // 如果能走到這里,說明 tail 的 next 是 null有鹿,這里的判斷是避免重復創(chuàng)建 Qnode 對象。
s = new QNode(e, isData);// 創(chuàng)建一個新的節(jié)點谎脯。
if (!t.casNext(null, s)) // 嘗試 CAS 將這個剛剛創(chuàng)建的節(jié)點追加到 tail 的 next 節(jié)點上.
continue;// 如果失敗葱跋,則重來
advanceTail(t, s); // 當新的節(jié)點成功追加到 tail 節(jié)點的 next 上了, 就嘗試將 tail.next 節(jié)點覆蓋 tail 節(jié)點,稱之為推進。
// s == 新節(jié)點娱俺,“可能”是新的 tail稍味;e 是實際數(shù)據(jù)。
Object x = awaitFulfill(s, e, timed, nanos);// 該方法作用就是荠卷,讓當前線程等待模庐。排除意外情況和超時的話,就是等待其他線程拿走數(shù)據(jù)并替換成 isData 不同的數(shù)據(jù)油宜。
if (x == s) { // x == s 是什么意思呢掂碱? 表明在 awaitFulfill 方法中,這個數(shù)據(jù)被取消了慎冤,tryCancel 方法就是將 item 覆蓋了 QNode疼燥。說明這次操作失敗了。
clean(t, s);// 操作失敗則需要清理數(shù)據(jù)蚁堤,并返回 null醉者。
return null;
}
// 如果一切順利,確實被其他線程喚醒了披诗,其他線程也交換了數(shù)據(jù)撬即。
// 這個判斷:next != this,說明了什么呈队?當這個 tail 節(jié)點的 next 不再指向自己剥槐,說明了
if (!s.isOffList()) { // not already unlinked
// 這一步是將 S 節(jié)點設置為 Head,并且將新 Head 的 next 指向自己掂咒,讓 Head 和之前的 next 斷開才沧。
advanceHead(t, s); // unlink if head
// 當 x 不是 null,表明對方線程是存放數(shù)據(jù)的绍刮。
if (x != null) // and forget fields
// 這一步操作將自己的 item 設置成自己温圆。
s.item = s;
// 將 S 節(jié)點的持有線程變成 null。
s.waiter = null;
}
// x 不是 null 表明孩革,對方線程是生產(chǎn)者岁歉,返回他生產(chǎn)的數(shù)據(jù);如果是 null膝蜈,說明對方線程是消費者锅移,那他自己就是生產(chǎn)者,返回自己的數(shù)據(jù)饱搏,表示成功非剃。
return (x != null) ? (E)x : e;
}
// 2. 如果當前的操作類型和 tail 的操作不一樣。稱之為互補推沸。
else { // complementary-mode
QNode m = h.next; // node to fulfill
// 如果下方這些判斷沒過备绽,說明并發(fā)修改了券坞,自旋重來。
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
// 如果 head 節(jié)點的 isData 和當前操作相同肺素,
// 如果 操作不同恨锚,但 head 的 item 就是自身,也就是發(fā)生了取消操作倍靡,tryCancel 方法會做這件事情猴伶。
// 如果上面2個都不滿足,嘗試使用 CAS 將 e 覆蓋 item塌西。
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
// CAS 失敗了他挎,Head 的操作類型和當前類型相同,item 被取消了雨让,都會走這里雇盖。
// 將 h.next 覆蓋 head。重來栖忠。
advanceHead(h, m); // dequeue and retry
continue;
}
// 這里也是將 h.next 覆蓋 head崔挖。能夠走到這里,說明庵寞,上面的 CAS 操作成功了狸相,當前線程已經(jīng)將 e 覆蓋了 next 的 item 。
advanceHead(h, m); // successfully fulfilled
// 喚醒 next 的 線程捐川。提醒他可以取出數(shù)據(jù)脓鹃,或者“我”已經(jīng)拿到數(shù)據(jù)了。
LockSupport.unpark(m.waiter);
// 如果 x 不是 null古沥,表明這是一次消費數(shù)據(jù)的操作瘸右,反之,這是一次生產(chǎn)數(shù)據(jù)的操作岩齿。
return (x != null) ? (E)x : e;
}
}
說實話太颤,代碼還是比較復雜的。JDK 中注釋是這么說的:
基本算法是死循環(huán)采取 2 種方式中的其中一種盹沈。
1 如果隊列是空的龄章,或者持有相同的模式節(jié)點(isData
相同),就嘗試添加節(jié)點到隊列中乞封,并讓當前線程等待做裙。
2 如果隊列中有線程在等待,那么就使用一種互補
的方式肃晚,使用 CAS 和等待者交換數(shù)據(jù)锚贱。并返回。
什么意思呢关串?
首先明確一點拧廊,隊列中杂穷,數(shù)據(jù)有 2 種情況(但同時只存在一種),要么QNode
中有實際數(shù)據(jù)(offer
的時候卦绣,是有數(shù)據(jù)的,但沒有“人”來确沈尽)滤港,要么沒有實際數(shù)據(jù)(poll
的時候,隊列中沒有數(shù)據(jù)趴拧,線程只好等待)溅漾。隊列在哪一種狀態(tài)取決于他為空后,第一個插入的是什么類型的數(shù)據(jù)
著榴。
樓主畫了點圖來表示:
- 隊列初始化的時候添履,只有一個空的
Node
。
- 此時脑又,一個線程嘗試
offer
或者poll
數(shù)據(jù)暮胧,都會插入一個Node
插入到節(jié)點中。
- 假設剛剛發(fā)生的是 offer 操作问麸,這個時候往衷,另一個線程也來 offer,這時就會有 2 個節(jié)點严卖。
- 這個時候席舍,隊列中有 2 個有真實數(shù)據(jù)(offer 操作)的節(jié)點了,注意哮笆,這個時候来颤,那 2 個線程都是
wait
的,因為沒有人接受他們的數(shù)據(jù)稠肘。此時福铅,又來一個線程,做 poll 操作启具。
從上圖可以看出本讥,poll
線程從head
開始取數(shù)據(jù),因為它的 isData
和 tail
節(jié)點的 isData 不同鲁冯,那么就會從 head 開始找節(jié)點拷沸,并嘗試將自己的 null 值和節(jié)點中的真實數(shù)據(jù)進行交換。并喚醒等待中的線程薯演。
這 4 幅圖就是 SynchronousQueue
的精華撞芍。
既然叫做同步隊列,一定是 A 線程生產(chǎn)數(shù)據(jù)的時候跨扮,有 B 線程在消費序无,否則 A 線程就需要等待验毡,反之,如果 A 線程準備消費數(shù)據(jù)帝嗡,但隊列中沒有數(shù)據(jù)晶通,線程也會等待,直到有 B 線程存放數(shù)據(jù)哟玷。
而 JDK 的實現(xiàn)原理則是:使用一個隊列狮辽,隊列中的用一個 isData
來區(qū)分生產(chǎn)還是消費,所有新操作都根據(jù) tail 節(jié)點的模式來決定到底是追加到 tail
節(jié)點還是和 tail
節(jié)點(從 head
開始)交換數(shù)據(jù)巢寡。
而所謂的交換是從head
開始喉脖,取出節(jié)點的實際數(shù)據(jù),然后使用 CAS
和匹配到的節(jié)點進行交換抑月。從而完成兩個線程直接交換數(shù)據(jù)的操作树叽。
為什么他在某些情況下,比LinkedBlockingQueue
性能高呢谦絮?其中有個原因就是沒有使用鎖题诵,減少了線程上下文切換。第二則是線程之間交換數(shù)據(jù)的方式更加的高效挨稿。
好仇轻,重點部分講完了,再看看其中線程是如何等待的奶甘。邏輯在 awaitFulfill
方法中:
// 自旋或者等待篷店,直到填充完畢
// 這里的策略是什么呢?如果自旋次數(shù)不夠了臭家,通常是 16 次疲陕,但還有超過 1 秒的時間,就阻塞等待被喚醒钉赁。
// 如果時間到了蹄殃,就取消這次的入隊行為。
// 返回的是 Node 本身
// s.item 就是 e
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = ((head.next == s) ?// 如果成功將 tail.next 覆蓋了 tail你踩,如果有超時機制诅岩,則自旋 32 次,如果沒有超時機制带膜,則自旋 32 *16 = 512次
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())// 當前線程被中斷
s.tryCancel(e);// 嘗試取消這個 item
Object x = s.item;// 獲取到這個 tail 的 item
if (x != e) // 如果不相等吩谦,說明 node 中的 item 取消了,返回這個 item膝藕。
// 這里是唯一停止循環(huán)的地方式廷。當 s.item 已經(jīng)不是當初的哪個 e 了,說明要么是時間到了被取消了芭挽,要么是線程中斷被取消了滑废。
// 當然蝗肪,不僅僅只有這2種 “意外” 情況,還有一種情況是:當另一個線程拿走了這個數(shù)據(jù)蠕趁,并修改了 item薛闪,也會通過這個判斷,返回被“修改”過的 item俺陋。
return x;
if (timed) {// 如果有時間限制
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {// 如果時間到了
s.tryCancel(e);// 嘗試取消 item逛绵,供上面的 x != e 判斷
continue;// 重來
}
}
if (spins > 0)// 如果還有自旋次數(shù)
--spins;// 減一
else if (s.waiter == null)// 如果自旋不夠,且 tail 的等待線程還沒有賦值
s.waiter = w;// 當前線程賦值給 tail 的等待線程
else if (!timed)// 如果自旋不夠倔韭,且如果線程賦值過了,且沒有限制時間瓢对,則 wait寿酌,(危險操作)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)// 如果自旋不夠,且如果限制了時間硕蛹,且時間還剩余超過 1 秒醇疼,則 wait 剩余時間。
// 主要目的就是等待法焰,等待其他線程喚醒這個節(jié)點所在的線程秧荆。
LockSupport.parkNanos(this, nanos);
}
}
該方法邏輯如下:
- 默認自旋 32 次,如果沒有超時機制埃仪,則 512 次乙濒。
- 如果時間到了,或者線程被中斷卵蛉,則取消這次的操作颁股,將
item
設置成自己。供后面判斷傻丝。 - 如果自旋結束甘有,且剩余時間還超過 1 秒窒盐,則阻塞等待至剩余時間秆吵。
- 當線程被其他的線程喚醒,說明數(shù)據(jù)被交換了渤弛。則
return
泛释,返回的是交換后的數(shù)據(jù)滤愕。
總結
好了,關于 SynchronousQueue
的核心源碼分析就到這里了胁澳,樓主沒有分析這個類的所有源碼该互,只研究了核心部分代碼,這足夠我們理解這個 Queue
的內(nèi)部實現(xiàn)了韭畸。
總結下來就是:
JDK 使用了隊列或者棧來實現(xiàn)公平或非公平模型宇智。其中蔓搞,isData
屬性極為重要,標識這這個線程的這次操作随橘,決定了他到底應該是追加到隊列中喂分,還是從隊列中交換數(shù)據(jù)。
每個線程在沒有遇到自己的另一半時机蔗,要么快速失敗蒲祈,要么進行阻塞,阻塞等待自己的另一半來萝嘁,至于對方是給數(shù)據(jù)還是取數(shù)據(jù)梆掸,取決于她自己,如果她是消費者牙言,那么他就是生產(chǎn)者酸钦。
good luck!T弁鳌1傲颉!