SynchronousQueue實(shí)現(xiàn)原理
Java 6的并發(fā)編程包中的SynchronousQueue是一個(gè)沒有數(shù)據(jù)緩沖的BlockingQueue跨算,生產(chǎn)者線程對其的插入操作put必須等待消費(fèi)者的移除操作take疏唾,反過來也一樣兄春。
不像ArrayBlockingQueue或LinkedListBlockingQueue基协,SynchronousQueue內(nèi)部并沒有數(shù)據(jù)緩存空間遏弱,你不能調(diào)用peek()方法來看隊(duì)列中是否有數(shù)據(jù)元素房匆,因?yàn)閿?shù)據(jù)元素只有當(dāng)你試著取走的時(shí)候才可能存在卦尊,不取走而只想偷窺一下是不行的叛拷,當(dāng)然遍歷這個(gè)隊(duì)列的操作也是不允許的。隊(duì)列頭元素是第一個(gè)排隊(duì)要插入數(shù)據(jù)的線程岂却,而不是要交換的數(shù)據(jù)忿薇。數(shù)據(jù)是在配對的生產(chǎn)者和消費(fèi)者線程之間直接傳遞的,并不會將數(shù)據(jù)緩沖到隊(duì)列中躏哩∈鸷疲可以這樣來理解:生產(chǎn)者和消費(fèi)者互相等待對方,握手扫尺,然后一起離開筋栋。
SynchronousQueue的一個(gè)使用場景是在線程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue正驻,這個(gè)線程池根據(jù)需要(新任務(wù)到來時(shí))創(chuàng)建新的線程弊攘,如果有空閑線程則會重復(fù)使用,線程空閑了60秒后會被回收姑曙。
實(shí)現(xiàn)原理
SynchronousQueue是一種很特別的BlockingQueue襟交,任何一個(gè)添加元素的操作都必須等到另外一個(gè)線程拿走元素才會結(jié)束。也就是SynchronousQueue本身不會存儲任何元素伤靠,相當(dāng)于生產(chǎn)者和消費(fèi)者手遞手直接交易捣域。
SynchronousQueue有一個(gè)fair選項(xiàng),如果fair為true宴合,稱為fair模式焕梅,否則就是unfair模式。
在fair模式下卦洽,所有等待的生產(chǎn)者線程或者消費(fèi)者線程會按照開始等待時(shí)間依次排隊(duì)贞言,然后按照等待先后順序進(jìn)行匹配交易。這種情況用隊(duì)列實(shí)現(xiàn)阀蒂。
在unfair模式下蜗字,則剛好相反打肝,后來先匹配,這種情況用棧實(shí)現(xiàn)挪捕。
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue() : new TransferStack();
}
因?yàn)樘砑釉睾湍米咴厥穷愃剖诌f手交易的粗梭,所以對于拿走元素和添加元素操作,SynchronousQueue調(diào)用的是Transferer同一個(gè)方法transfer级零。
當(dāng)object為null時(shí)表示是拿走元素断医,用于消費(fèi)者線程,否則則是添加元素奏纪,用于生產(chǎn)者線程鉴嗤。因此transfer方法是分析的重點(diǎn)。
abstract Object transfer(Object e, boolean timed, long nanos);
首先來看用于fair模式的TransferQueue的transfer方法:
看代碼之前序调,來理一下邏輯:
開始隊(duì)列肯定是空醉锅。
線程進(jìn)入隊(duì)列,如果隊(duì)列是空的发绢,那么就添加該線程進(jìn)入隊(duì)列硬耍,然后進(jìn)行等待(要么有匹配線程出現(xiàn),要么就是該請求超時(shí)取消)
第二個(gè)線程進(jìn)入边酒,如果前面一個(gè)線程跟它屬于不同類型经柴,也就是說兩者是可以匹配的,那么就從隊(duì)列刪除第一個(gè)線程墩朦。如果是相同的線程坯认,那么做法參照2。
理清了基本邏輯氓涣,也就是會有兩種情況:
隊(duì)列為空或者隊(duì)列中的等待線程是相同類型
隊(duì)列中的等待線程是匹配的類型
Object transfer(Object e, boolean timed, long nanos) {
QNode s = null;
// e不是null表示是生成者線程牛哺,e就是產(chǎn)品,反之就是消費(fèi)者線程
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
// tail和head在隊(duì)列創(chuàng)建時(shí)會被初始化成一個(gè)虛擬節(jié)點(diǎn)
// 因此發(fā)現(xiàn)沒有初始化劳吠,重新循環(huán)等待直到初始化完成
if (t == null || h == null)
continue;
// 隊(duì)列為空或等待線程類型相同(不同類型才能匹配)
// 這兩種情況都要把當(dāng)前線程加入到等待隊(duì)列中
if (h == t || t.isData == isData) {
QNode tn = t.next;
// tail對象已經(jīng)被更新引润,出現(xiàn)不一致讀的現(xiàn)象,重新循環(huán)
if (t != tail)
continue;
// 添加線程到等待隊(duì)列時(shí)會先更新當(dāng)前tail的next赴背,然后
// 更新tail本身椰拒,因此出現(xiàn)只有next被更新的情況晶渠,應(yīng)該
// 更新tail凰荚,然后重新循環(huán)
if (tn != null) {
advanceTail(t, tn);
continue;
}
// 設(shè)定了超時(shí),剩余等待時(shí)間耗盡的時(shí)候褒脯,就無需再等待
if (timed && nanos <= 0)
return null;
// 首次使用s的時(shí)候便瑟,新建一個(gè)節(jié)點(diǎn)保存當(dāng)前線程和數(shù)據(jù)來初始化s
if (s == null)
s = new QNode(e, isData);
// 嘗試更新tail的next,把新建節(jié)點(diǎn)添加到tail的后面番川,如果失敗了到涂,就重新循環(huán)
if (!t.casNext(null, s))
continue;
// 把新建的節(jié)點(diǎn)設(shè)置為tail
advanceTail(t, s);
// 等待匹配線程脊框,成功匹配則返回的匹配的值
// 否則返回當(dāng)前節(jié)點(diǎn),因此s和x相同表示請求被取消
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) {
clean(t, s);
return null;
}
// 這個(gè)時(shí)候已經(jīng)匹配成功了践啄,s應(yīng)該是排在第一個(gè)的等待線程
// 如果s依然在隊(duì)列中浇雹,那么需要更新head。
// 更新head的方法是把s這個(gè)排在第一位的節(jié)點(diǎn)作為新的head
// 因此需要重置一些屬性使它變成虛擬節(jié)點(diǎn)
if (!s.isOffList()) {
advanceHead(t, s);
if (x != null)
s.item = s;
s.waiter = null;
}
// x不為null表示拿到匹配線程的數(shù)據(jù)(消費(fèi)者拿到生產(chǎn)者的數(shù)據(jù))屿讽,
// 因此返回該數(shù)據(jù)昭灵,否則返回本身的數(shù)據(jù)(生成者返回自己的數(shù)據(jù))
return (x != null) ? x : e;
} else { // 線程可以匹配
// 因?yàn)槭顷?duì)列,因此匹配的是第一個(gè)節(jié)點(diǎn)
QNode m = h.next;
// 同樣需要檢查不一致讀的情況
if (t != tail || m == null || h != head)
continue;
Object x = m.item;
// 匹配失敗時(shí)伐谈,把m從隊(duì)列中移走烂完,重新循環(huán)
if (isData == (x != null) || // m已經(jīng)被匹配了
x == m || // m已經(jīng)被取消了
!m.casItem(x, e)) { // 用CAS設(shè)置m的數(shù)據(jù)為null
advanceHead(h, m);
continue;
}
// 匹配成功,更新head
advanceHead(h, m);
// 解除m的線程等待狀態(tài)
LockSupport.unpark(m.waiter);
// 返回匹配的數(shù)據(jù)
return (x != null) ? x : e;
}
}
}
接著來用于Unfair模式的TransferStack的transfer方法
大體邏輯應(yīng)該是一樣的诵棵,不同就是隊(duì)列的入隊(duì)和出隊(duì)操作對應(yīng)到棧時(shí)就是入棧和出棧的操作抠蚣。
Object transfer(Object e, boolean timed, long nanos) {
SNode s = null;
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
// 棧為空或者節(jié)點(diǎn)類型相同的情況
if (h == null || h.mode == mode) {
if (timed && nanos <= 0) {
// 檢查棧頂節(jié)點(diǎn)是否已經(jīng)取消,如果已經(jīng)取消履澳,彈出節(jié)點(diǎn)
// 重新循環(huán)嘶窄,接著檢查新的棧頂節(jié)點(diǎn)
if (h != null && h.isCancelled())
casHead(h, h.next);
else
return null;
// 新建節(jié)點(diǎn),并且嘗試把新節(jié)點(diǎn)入棧
} else if (casHead(h, s = snode(s, e, h, mode))) {
// 等待匹配奇昙,如果發(fā)現(xiàn)是被取消的情況护侮,則釋放節(jié)點(diǎn),返回null
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) {
clean(s);
return null;
}
// 如果匹配的成功兩個(gè)節(jié)點(diǎn)是棧頂?shù)膬蓚€(gè)節(jié)點(diǎn)
// 把這兩個(gè)節(jié)點(diǎn)都彈出
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return (mode == REQUEST) ? m.item : s.item;
}
} else if (!isFulfilling(h.mode)) { // 棧頂節(jié)點(diǎn)沒有和其他線程在匹配储耐,可以匹配
if (h.isCancelled()) // 棧頂節(jié)點(diǎn)的請求已經(jīng)被取消
casHead(h, h.next); // 移除棧頂元素重新循環(huán)
// 嘗試把該節(jié)點(diǎn)也入棧羊初,該節(jié)點(diǎn)設(shè)置為正在匹配的狀態(tài)
// 也就是isFulfilling返回true
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) {
// 棧頂節(jié)點(diǎn)(當(dāng)前線程的節(jié)點(diǎn))和它的下一個(gè)節(jié)點(diǎn)進(jìn)行匹配,m為null意味著
// 棧里沒有其他節(jié)點(diǎn)了什湘,因?yàn)榍懊嬖摴?jié)點(diǎn)入棧了长赞,需要彈出這個(gè)節(jié)點(diǎn)重新循環(huán)
SNode m = s.next;
if (m == null) {
casHead(s, null);
s = null;
break;
}
// 這個(gè)時(shí)候是有節(jié)點(diǎn)可以匹配的,嘗試為這兩個(gè)節(jié)點(diǎn)做匹配
SNode mn = m.next;
// m和s匹配成功闽撤,彈出這兩個(gè)節(jié)點(diǎn)得哆,返回?cái)?shù)據(jù);匹配失敗哟旗,把m移除
if (m.tryMatch(s)) {
casHead(s, mn);
return (mode == REQUEST) ? m.item : s.item;
} else
s.casNext(m, mn);
}
}
// 棧頂正在匹配贩据,參見代碼:
// else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
// 做法基本類似,只是這里幫助其他線程匹配闸餐,無論成功與否
// 都要重新循環(huán)
} else {
SNode m = h.next;
if (m == null)
casHead(h, null);
else {
SNode mn = m.next;
if (m.tryMatch(h))
casHead(h, mn);
else
h.casNext(m, mn);
}
}
}
}