同步隊列:它繼承了一般的AbstractQueue和實現(xiàn)了BlockingQueue接口痘拆。它與其它的BlockingQueue最大的區(qū)別就在它不存儲任何數(shù)據(jù)狱杰,它的內(nèi)部是一個棧(非公平)或者一個隊列(公平策略)用來存儲訪問SynchronousQueue的線程憔披,而訪問它的線程有消費(fèi)者和生產(chǎn)者柱搜,對應(yīng)于方法put和take辩诞。當(dāng)一個生產(chǎn)者或者消費(fèi)者試圖訪問SynchronousQueue的時候课幕,如果找不到與之能夠配對的消費(fèi)者或者生產(chǎn)者锨并,則當(dāng)前線程會阻塞露该,直到對應(yīng)的線程將其喚醒,或者等待超時第煮,或者中斷解幼。
以下是它的put和take方法的實現(xiàn)抑党,不管是put還是take,其核心都是調(diào)用transferer對象的transfer方法撵摆,所以要弄明白SynchronousQueue底靠,就需要弄清楚Transferer。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
我們先從整體上來看一下SynchronousQueue的內(nèi)部結(jié)構(gòu):
從類圖上可以看出特铝,SynchronousQueue內(nèi)部引用了一個Transfer暑中,而Transfer的實現(xiàn)有兩種,一個是TransferStack鲫剿,一個是TransferQueue.
今天我們分析的重點(diǎn)對象是TransferStack.
1.先來看下TransferStack是如何初始化的:
//SynchronousQueue的構(gòu)造方法
public SynchronousQueue(boolean fair) {
//如果初始化為非公平策略鳄逾,則使用TransferStack
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
2.仔細(xì)看一下TransferStack的實現(xiàn):
static final class TransferStack<E> extends Transferer<E> {
//代表本次transfer的操作是獲取數(shù)據(jù)
static final int REQUEST = 0;
//代筆本次transfer的操作是放入數(shù)據(jù)
static final int DATA = 1;
//代表節(jié)點(diǎn)正在配對
static final int FULFILLING = 2;
//判斷節(jié)點(diǎn)目前是否在匹配中,3&2 == 2(匹配中)
//2&2 == 2(匹配中) 1&2==0 灵莲, 0&2==0 (等待匹配)
static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
/** 內(nèi)部類雕凹,棧節(jié)點(diǎn)*/
static final class SNode {
volatile SNode next; // next node in stack
volatile SNode match; // 配對的節(jié)點(diǎn)泳炉,如果未配對則未空
volatile Thread waiter; // to control park/unpark
Object item; // data; or null for REQUESTs
int mode;
SNode(Object item) {
this.item = item;
}
boolean casNext(SNode cmp, SNode val) {
return cmp == next &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
/**
嘗試配對
s:被配對的節(jié)點(diǎn)
配對的邏輯為:通過CAS設(shè)置節(jié)點(diǎn)的match屬性紊服,如果能設(shè)置成功,則說明配對成功造烁,配對成功后明场,再通過LockSupport.unpark(w);
將其對應(yīng)的等待線程喚醒汽摹。
這個方法比較簡答,就不再累述
**/
boolean tryMatch(SNode s) {
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
LockSupport.unpark(w);
}
return true;
}
return match == s;
}
/**
* Tries to cancel a wait by matching node to itself.
如果一個節(jié)點(diǎn)等待超時榕堰,或者線程被中斷竖慧,則需要取消節(jié)點(diǎn)的等待,而取消等待的標(biāo)志就是將match指向自己
*/
void tryCancel() {
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
boolean isCancelled() {
return match == this;
}
// Unsafe mechanics
//Unsafe機(jī)制逆屡, 不懂的就不要看juc包了圾旨,先搞懂Unsafe再看
private static final sun.misc.Unsafe UNSAFE;
private static final long matchOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = SNode.class;
matchOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("match"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
/** The head (top) of the stack */
//棧頂節(jié)點(diǎn)
volatile SNode head;
//重新CAS棧頂節(jié)點(diǎn),一般都用在棧頂元素出棧的時候
boolean casHead(SNode h, SNode nh) {
return h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
}
//構(gòu)造棧節(jié)點(diǎn)
static SNode snode(SNode s, Object e, SNode next, int mode) {
if (s == null) s = new SNode(e);
s.mode = mode;
s.next = next;
return s;
}
//關(guān)鍵方法transfer魏蔗, 通過e是否為空來判斷本次操作是獲取元素還是放入元素
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA; //e為空砍的, 表示該次請求為獲取元素,e不為空則是生產(chǎn)放入元素
for (;;) {
SNode h = head;
//①當(dāng)頭節(jié)點(diǎn)為空莺治,則新加入節(jié)點(diǎn)直接入棧廓鞠,然后進(jìn)入等待,當(dāng)頭節(jié)點(diǎn)不為空谣旁,并且本次操作的模式和頭節(jié)點(diǎn)模式一樣床佳,則繼續(xù)進(jìn)入棧等待,否則去到代碼②
if (h == null || h.mode == mode) { // empty or same-mode
if (timed && nanos <= 0) { // can't wait //不需要等待
if (h != null && h.isCancelled()) //頭節(jié)點(diǎn)已經(jīng)中斷或者超時榄审,重新設(shè)置頭節(jié)點(diǎn)
casHead(h, h.next); // pop cancelled node
else
return null; //由于不能等待砌们,而頭節(jié)點(diǎn)與本次入隊列的節(jié)點(diǎn)操作模式相同,所以直接返回空(如果可以等待的話會進(jìn)入自旋)
} else if (casHead(h, s = snode(s, e, h, mode))) { //新節(jié)點(diǎn)入棧
SNode m = awaitFulfill(s, timed, nanos); //等待該節(jié)點(diǎn)被配對或者超時中斷,詳情見下面的方法具體解釋浪感。
if (m == s) { // wait was cancelled
clean(s); //清理從head節(jié)點(diǎn)到s.next節(jié)點(diǎn)之間的“取消”節(jié)點(diǎn)
return null; //返回null昔头,線程被中斷或者節(jié)點(diǎn)等待超時,調(diào)用者(上層應(yīng)用)能處理線程中斷邏輯影兽。
}
//s節(jié)點(diǎn)從awaitFulfill中出來揭斧,說明是配對成功節(jié)點(diǎn)將其喚醒,而喚醒的節(jié)點(diǎn)一定是棧頂節(jié)點(diǎn)(見代碼②邏輯峻堰,因為每次只允許有一個節(jié)點(diǎn)進(jìn)入配對狀態(tài)讹开,
//進(jìn)入配對狀態(tài)后,其它節(jié)點(diǎn)無法加入棧捐名,只有等到配對成功萧吠,重新casHead后其它節(jié)點(diǎn)才能入棧,所以這里直接判斷頭節(jié)點(diǎn))
if ((h = head) != null && h.next == s) //因為匹配線程那邊也可能在調(diào)用casHead,所以這里h.next == s判斷一下head是否已經(jīng)被重置
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item); //如果是消費(fèi)者則返回匹配上的m.item, 如果是生產(chǎn)者桐筏,則返回生產(chǎn)的節(jié)點(diǎn)的item
}
}
//②
//本次操作模式和頭節(jié)點(diǎn)模式不相等,先檢查頭節(jié)點(diǎn)模式是否為匹配中拇砰。
//頭節(jié)點(diǎn)模式為 3或者2時為匹配中梅忌,這時任何新增的節(jié)點(diǎn)都無法入棧,因為任何新增操作的模式都不等于3或者2除破,所以在這次匹配完成之前牧氮,head節(jié)點(diǎn)是不會變化的。
else if (!isFulfilling(h.mode)) {
//頭節(jié)點(diǎn)模式為:0或者1
//head節(jié)點(diǎn)已經(jīng)取消瑰枫,所以將head指向下一個節(jié)點(diǎn)踱葛,head出棧
if (h.isCancelled())
casHead(h, h.next); // pop and retry
//②-1
//如果頭節(jié)點(diǎn)模式為0或1,則本次操作的節(jié)點(diǎn)入棧光坝,入棧的模式為:2或者3尸诽, ps(這個時候頭節(jié)點(diǎn)判斷isFullfilling則返回true),
//所以一旦有節(jié)點(diǎn)在匹配中,則其它新增節(jié)點(diǎn)都會直接去到代碼③
//FULFILLING=2, 2|0 == 2, 2|1==3
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // m is s's match
//如果新的頭節(jié)點(diǎn)后面沒有等待節(jié)點(diǎn)了(等待節(jié)點(diǎn)有可能被中斷或者超時出棧了)盯另,則清空棧性含,自旋重新開始
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
//②-2如果下一個節(jié)點(diǎn)不為空,則嘗試匹配
SNode mn = m.next;
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m //匹配成功鸳惯,則彈出頭節(jié)點(diǎn)和等待節(jié)點(diǎn)
//如果本次操作是獲取數(shù)據(jù)商蕴,則返回匹配上的生產(chǎn)數(shù)據(jù)的item,相反芝发, 如果本次操作是生產(chǎn)數(shù)據(jù)绪商,則直接返回自己生產(chǎn)的數(shù)據(jù)。
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
//如果匹配失敻ňā(理論上一次只有一個節(jié)點(diǎn)能夠進(jìn)行匹配格郁,見②-1,所以理論上不應(yīng)該會匹配失敗,但是為什么會失敗呢理张?
//因為匹配的m還有可能超時或者被中斷)赫蛇,所以如果匹配失敗,彈出m節(jié)點(diǎn)雾叭。
s.casNext(m, mn); // help unlink
}
}
}
//③如果頭節(jié)點(diǎn)不為空悟耘,并且本次操作的模式和頭節(jié)點(diǎn)的模式不相同,并且頭節(jié)點(diǎn)是正在匹配
else { // help a fulfiller
SNode m = h.next; // m is h's match
//當(dāng)②中新的匹配節(jié)點(diǎn)進(jìn)入后织狐,有可能等待節(jié)點(diǎn)超時或者被中斷了暂幼,即waiter is gone
if (m == null) // waiter is gone
//刪除匹配節(jié)點(diǎn),棧清空重新開始移迫,因為最外層有一個自旋旺嬉,所以又會重新安置這個節(jié)點(diǎn)。
casHead(h, null); // pop fulfilling node
else {
//這一段代碼邏輯與②-2相同
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}
//讓節(jié)點(diǎn)進(jìn)入等待(通常發(fā)生在節(jié)點(diǎn)入棧時發(fā)現(xiàn)沒有對應(yīng)的匹配者)
//根據(jù)設(shè)置的是否等待超時條件厨埋,讓當(dāng)前線程進(jìn)入等待配對的自旋邏輯中邪媳,只有當(dāng)配對成功,或者線程中斷才會退出荡陷。
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted()) //當(dāng)前線程已經(jīng)中斷雨效,則取消當(dāng)前節(jié)點(diǎn)的配對等待
s.tryCancel();
SNode m = s.match;
if (m != null) //當(dāng)前節(jié)點(diǎn)已經(jīng)配對成功,或者取消(m==s)
return m;
if (timed) { //有超時設(shè)置
nanos = deadline - System.nanoTime();
if (nanos <= 0L) { //已經(jīng)超時
s.tryCancel(); //取消s節(jié)點(diǎn)的等待
continue;
}
}
//不超時(一直等待)废赞,或者設(shè)置超時時間后還沒有超時徽龟。
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0; //減少循環(huán)次數(shù)
else if (s.waiter == null) //不再自旋,設(shè)置線程
s.waiter = w; // establish waiter so can park next iter
else if (!timed) //如果設(shè)置的是一直等待唉地,則睡眠當(dāng)前線程
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold) //如果是有時間的等待据悔,但是如果還剩下小于1秒的時間,則不再park耘沼,直接繼續(xù)自旋
LockSupport.parkNanos(this, nanos);
}
}
//判斷是否還需要繼續(xù)自旋還是睡眠
//這里要注意的是該方法只在awaitFulfill方法被調(diào)用极颓,而awaitFulfill只在入棧元素和棧頂元素操作模式一致時(者說明棧頂元素不是匹配中狀態(tài),見代碼②處的說明)
boolean shouldSpin(SNode s) {
SNode h = head;
//這個邏輯尤其是h==null確實沒看明白什么時候會出現(xiàn)h!=s然后h == null,因為s入棧時head會指向s耕拷,就算s在自旋過程中有新的head節(jié)點(diǎn)加入讼昆,那h也不會為null。 先打骚烧?浸赫??
//但是如果我來寫這個方法的話赃绊, 我我會寫一個shouldNotSpin,如下
return (h == s || h == null || isFulfilling(h.mode));
}
//s不在棧頂既峡,并且棧頂元素不在匹配中,則棧頂以后的元素可以暫時不用自旋碧查,這樣邏輯就很容易說得通
//因為棧頂元素還沒有進(jìn)來匹配者运敢,說明棧頂元素和s一樣屬于等待者(前者都還沒被匹配校仑,s自然不會被匹配,所以先睡一會兒)
boolean shouldNotSpin(Snode s){
Snode h = head;
return (h!=null && h!=s && !isFulfilling(h.mode));
}
//清理節(jié)點(diǎn)出棧
void clean(SNode s) {
s.item = null; // forget item
s.waiter = null; // forget thread
//這里為什么要嘗試下一個传惠,而不到下下一個迄沫??卦方?羊瘩?
SNode past = s.next;
if (past != null && past.isCancelled())
past = past.next;
// Absorb cancelled nodes at head
//清理頭節(jié)點(diǎn)到past節(jié)點(diǎn)之間,從頭節(jié)點(diǎn)開始連續(xù)的“取消”狀態(tài)的節(jié)點(diǎn)(主要是清理頭節(jié)點(diǎn))
SNode p;
while ((p = head) != null && p != past && p.isCancelled())
casHead(p, p.next);
// Unsplice embedded nodes
//清理頭節(jié)點(diǎn)到past節(jié)點(diǎn)之間盼砍,跳躍的被“取消”的節(jié)點(diǎn)(主要是清理頭節(jié)點(diǎn)與past中間的節(jié)點(diǎn))
while (p != null && p != past) {
SNode n = p.next;
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
p = n;
}
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = TransferStack.class;
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
} catch (Exception e) {
throw new Error(e);
}
}
}
核心邏輯就是以上的代碼尘吗,只要記住:SynchronousQueue不存儲實際的數(shù)據(jù)浇坐,它的棽谴罚或者隊列中存儲的是生產(chǎn)者和消費(fèi)者節(jié)點(diǎn),最開始進(jìn)入棧的節(jié)點(diǎn)就成為了等待者(這里不管是生產(chǎn)還是消費(fèi))近刘,而后面進(jìn)入的節(jié)點(diǎn)都需要根據(jù)操作的mode來判斷是繼續(xù)入棧等待還是入棧后立即進(jìn)行匹配擒贸。