上一篇文章剖析了 LinkedBlockingQueue 的相關(guān)源碼掸绞,那這篇文章接著看另外一個常見的阻塞隊列 —— SynchronousQueue
簡介
SynchronousQueue 是一個比較特殊的阻塞隊列類,為什么這樣說呢?我們不妨從官方的類注釋說起...
根據(jù)類注釋可大概得出以下幾點:
- 每一個插入操作都必須等待另一個線程完成刪除操作
- 隊列沒有內(nèi)部容量哭当,所以不能迭代數(shù)據(jù)
- 可以選擇公平策略。公平策略是使用
隊列
先入先出生均,非公平策略是使用堆棧
先入后出
咦矢否?SynchronousQueue 對象沒有容量,那這個阻塞隊列的使用場景是什么呢眼虱?
其實線程池的其中一種實現(xiàn)——Executors.newCachedThreadPool
就使用了SynchronousQueue作為阻塞隊列
那先從一個demo開始揭開 SynchronousQueue 的廬山真面目吧或舞!
public class SynchronousQueueDemo {
public static void main(String[] args) throws InterruptedException {
SynchronousQueue synchronousQueue = new SynchronousQueue();
new Thread(() -> {
try {
synchronousQueue.put("Hello World!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "線程一").start();
new Thread(() -> {
try {
System.out.println(synchronousQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "線程二").start();
}
}
示例中為什么在main方法里使用兩個線程分別執(zhí)行put和take操作呢?因為在同一線程中蒙幻,有可能存在先執(zhí)行take操作映凳,當程序執(zhí)行take方法的時候發(fā)現(xiàn)隊列為空就會阻塞當前線程,那么之后的put方法就不會執(zhí)行邮破,線程將會一直等待诈豌。
源碼剖析
成員變量
// SynchronousQueue 定義的抽象類,由 TransferStack 和 TransferQueue 實現(xiàn)
private transient volatile Transferer<E> transferer;
// CPU 數(shù)量
static final int NCPUS = Runtime.getRuntime().availableProcessors();
// 自旋次數(shù)抒和,如果transfer指定了timeout時間矫渔,則使用maxTimeSpins,如果CPU數(shù)量小于2則自旋次數(shù)為0,否則為32摧莽。不會隨CPU數(shù)量增加而變化
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
// 自旋次數(shù)庙洼,如果沒有指定時間設(shè)置,則使用maxUntimedSpins。如果NCPUS數(shù)量大于等于2則設(shè)定為為32*16油够,否則為0
static final int maxUntimedSpins = maxTimedSpins * 16;
// 為了防止自定義的時間限過長蚁袭,為了優(yōu)化而設(shè)置,如果自定義時間長于這個值則取默認的 spinForTimeoutThreshold 石咬,單位為納秒揩悄。
static final long spinForTimeoutThreshold = 1000L;
構(gòu)造函數(shù)
// 默認使用非公平策略
public SynchronousQueue() {
this(false);
}
// fair為false是非公平策略,使用的數(shù)據(jù)結(jié)構(gòu)是棧鬼悠;fair為true是公平策略删性,使用的數(shù)據(jù)結(jié)構(gòu)是隊列
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
先列出相關(guān)方法的源碼,但并沒有加上注釋焕窝,因為核心方法都在 Transferer 對象中聲明5磐Α(值得注意的是,SynchronousQueue類并沒有實現(xiàn)remove它掂、removeAll汗侵、peek、clear等方法群发,都是使用默認值)
添加(add晰韵、offer、put)熟妓、刪除雪猪、查找元素
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
return true;
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
return transferer.transfer(e, true, 0) != null;
}
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
public E poll() {
return transferer.transfer(null, true, 0);
}
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
相對于 ArrayBlockingQueue 和 LinkedBlockingQueue,可以發(fā)現(xiàn)類似poll起愈、take等相關(guān)方法都被抽象成統(tǒng)一方法來進行操作只恨,通過抽象出內(nèi)部類 Transferer 實現(xiàn)不同的操作。接下來抬虽,咱們重點看看公平模式與非公平模式下的源碼官觅。
1.SynchronousQueue 的非公平模式(TransferStack)
眾所周知,堆棧是FILO(First in last out)的方式阐污,所以也可以理解為非公平模式為什么使用棧這種數(shù)據(jù)結(jié)構(gòu)休涤,如果排隊的時候第一個進來,最后一個才能走笛辟,這很不公平嘛功氨!
在 TransferStack 內(nèi)部有 REQUEST、DATA手幢、FULFILLING
這三個狀態(tài)捷凄。
REQUEST 表示請求從棧獲取數(shù)據(jù)操作的消費者,如:take 方法围来;
DATA 表示往棧內(nèi)部放數(shù)據(jù)的生產(chǎn)者跺涤,如:put 方法匈睁;
FULFILLING 表示正在交易的生產(chǎn)者或消費者
REQUEST 和 DATA 這兩種狀態(tài)理解起來還不難,但或許 FULFILLING 還是不太清楚有什么用桶错,先帶著疑問往下去看航唆,現(xiàn)在只需要簡單的理解為:不同狀態(tài) REQUEST 和 DATA 可以相互匹配的,當與棧頂匹配后就會將他們狀態(tài)轉(zhuǎn)換為 FULFILLING牛曹,當匹配成功后就會將棧頂和匹配的元素一同出棧佛点。
成員變量
//表示一個未填充的消費者
static final int REQUEST = 0;
//表示一個未填充的生產(chǎn)者
static final int DATA = 1;
// 表示生產(chǎn)者正在給等待資源的消費者補給資源醇滥,或生產(chǎn)者在等待消費者消費資源
static final int FULFILLING = 2;
//棧的頭結(jié)點
volatile SNode head;
棧節(jié)點
// 棧節(jié)點
static final class SNode {
// 節(jié)點的后繼
volatile SNode next;
// 相匹配的節(jié)點
volatile SNode match;
// 等待的線程
volatile Thread waiter;
// item和mode不需要可見黎比,由于他們總是在其他可見/原子操作寫之前,讀之后
Object item;// 數(shù)據(jù)
int mode;//節(jié)點模式
SNode(Object item) {
this.item = item;
}
// cas保證線程安全設(shè)置節(jié)點后繼節(jié)點
boolean casNext(SNode cmp, SNode val) {
return cmp == next && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
//嘗試匹配目標節(jié)點與本節(jié)點鸳玩,如果匹配阅虫,可以喚醒線程。補給者調(diào)用tryMatch方法不跟,確定它們的等待線程颓帝。等待線程阻塞到它們自己被匹配。如果匹配返回true
boolean tryMatch(SNode s) {
// 設(shè)置本節(jié)點的匹配為s節(jié)點
if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) {
waiter = null;
LockSupport.unpark(w);
}
return true;
}
return match == s;
}
// 節(jié)點嘗試取消等待窝革,match 從原來的 null 變?yōu)閠his
void tryCancel() {
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
// match 指向自己购城,則取消等待
boolean isCancelled() {
return match == this;
}
}
核心方法
- isFulfilling:判斷指定類型是否是互補模式
- casHead(SNode h, SNode nh):替換當前頭結(jié)點
- SNode snode(SNode s, Object e, SNode next, int mode):生成SNode節(jié)點對象
- transfer(E e, boolean timed, long nanos): 主要處理邏輯
- awaitFulfill(SNode s, boolean timed, long nanos): 等待fulfill操作
- shouldSpin(SNode s):判斷節(jié)點s是頭結(jié)點或是fulfill節(jié)點則返回true
- clean(SNode s):將head節(jié)點到S節(jié)點之間所有已經(jīng)取消的節(jié)點全部移出
// 如果m是一個填充為單元,則返回true
static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
// 比較head是否為h虐译,并且CAS操作nh為當前head
boolean casHead(SNode h, SNode nh) {
return h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
}
//創(chuàng)建或重新設(shè)置節(jié)點的變量瘪板。在節(jié)點入棧時創(chuàng)建,在當可能需要保證減少intervals(間隔)讀和head的CAS操或避免由于競爭CAS操作節(jié)點入棧引起的垃圾時漆诽,此方法會被transfer調(diào)用
static SNode snode(SNode s, Object e, SNode next, int mode) {
if (s == null) s = new SNode(e);
s.mode = mode;
s.next = next;
return s;
}
E transfer(E e, boolean timed, long nanos) {
// 1.如果隊列為空或已經(jīng)包含相同模式的節(jié)點侮攀,則嘗試節(jié)點入棧,等待匹配返回厢拭,如果取消返回null兰英。
// 2.如果包含一個互補模式的節(jié)點(take(REQUEST)->put(DATA);put(DATA)->take(REQUEST))供鸠,則嘗試一個FULFILLING節(jié)點入棧畦贸,同時匹配等待的協(xié)同節(jié)點,兩個節(jié)點同時出棧楞捂,返回匹配的元素家制。由于其他線程執(zhí)行步驟3,實際匹配和解除鏈接指針動作不會發(fā)生泡一。
// 3.如果棧頂存在另外一個FULFILLING的節(jié)點颤殴,則匹配節(jié)點,并出棧鼻忠。這段的代碼與fulfilling相同涵但,除非沒有元素返回
SNode s = null;
// 根據(jù)元素判斷節(jié)點模式杈绸,元素不為null,則為DATA矮瘟,否則為REQUEST
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
//剛開始頭節(jié)點為null瞳脓,第一個進來的節(jié)點就是頭節(jié)點。
SNode h = head;
if (h == null || h.mode == mode) {// 如果是空隊列澈侠,或棧頭節(jié)點的模式與要放入的節(jié)點模式相同
if (timed && nanos <= 0) {
//如果超時劫侧,則取消等待,出棧哨啃,設(shè)置棧頭為其后繼
if (h != null && h.isCancelled())
casHead(h, h.next);
else
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {
//如果非超時烧栋,則將創(chuàng)建的新節(jié)點入棧成功,即放在棧頭拳球,自旋等待匹配節(jié)點(timed決定是否超時)
SNode m = awaitFulfill(s, timed, nanos);
// 返回的m == s 表示該節(jié)點被取消了或者超時审姓、中斷了
if (m == s) {
// 如果返回的是自己,節(jié)點取消等待祝峻,從棧中移除魔吐,并遍歷棧移除取消等待的節(jié)點
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
//s節(jié)點匹配成功,則設(shè)置棧頭為s的后繼
casHead(h, s.next);
// 匹配成功莱找,REQUEST模式返回酬姆,匹配到的節(jié)點元素(DATA),DATA模式返回當前節(jié)點元素
return (E) ((mode == REQUEST) ? m.item : s.item);
}
} else if (!isFulfilling(h.mode)) { // 如果棧頭節(jié)點模式不為Fulfilling奥溺,判斷是否取消等待辞色,是則出棧
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { //非取消等待,則是節(jié)點入棧
for (;;) { // 自旋直到節(jié)點匹配或者等待節(jié)點都沒有
SNode m = s.next;
//后繼節(jié)點為null谚赎,則出棧
if (m == null) { // 堆棧中沒有等待節(jié)點
casHead(s, null); // 將棧頭節(jié)點位置設(shè)置為null
s = null; // 棧頭節(jié)點設(shè)置為null淫僻,便于GC
break; // 跳出當前循環(huán),重新執(zhí)行主循環(huán)
}
SNode mn = m.next;
// 嘗試匹配 s 節(jié)點
if (m.tryMatch(s)) {
//匹配成功兩個節(jié)點則出棧
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else
// 如果沒有匹配成功壶唤,則說明已經(jīng)有其它線程與 m 節(jié)點匹配了雳灵,將 mn 作為 s 的后繼節(jié)點
s.casNext(m, mn); // help unlink
}
}
} else {
//如果棧頭節(jié)點模式為Fulfilling,則代表棧頭節(jié)點正在與其它節(jié)點匹配闸盔,可以理解為協(xié)助棧頭節(jié)點匹配成功
SNode m = h.next; // m is h's match
if (m == null)
//如果無后繼節(jié)點悯辙,則棧頭出棧
casHead(h, null); // pop fulfilling node
else {
//嘗試匹配,如果匹配成功迎吵,棧頭和匹配節(jié)點出棧躲撰,否則跳過后繼節(jié)點
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}
// 自旋或阻塞,直到節(jié)點被一個fulfill操作匹配
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
//獲取自旋的次數(shù)
int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 如果線程被中斷击费,則取消等待
if (w.isInterrupted())
s.tryCancel();
SNode m = s.match;
// 如果節(jié)點的匹配節(jié)點不為null拢蛋,則返回匹配節(jié)點
if (m != null)
return m;
if (timed) {
nanos = deadline - System.nanoTime();
//如果超時,則取消等待
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
// 如果自旋次數(shù)大于零蔫巩,且可以自旋谆棱,則自旋次數(shù)減1
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0;
else if (s.waiter == null)
//如果節(jié)點S的等待線程為空快压,則設(shè)置當前節(jié)點為S節(jié)點的等待線程,以便可以park后繼節(jié)點垃瞧。
s.waiter = w; // establish waiter so can park next iter
else if (!timed)
//非超時等在者蔫劣,park當前線程
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
//如果超時時間大于,最大自旋閾值个从,則超時park當前線程
LockSupport.parkNanos(this, nanos);
}
}
// 如果節(jié)點在棧頭或棧頭為FULFILLING的節(jié)點脉幢,則返回true
boolean shouldSpin(SNode s) {
//因為很可能立刻就會有新的線程到來,那么就會立刻進行交易而不需要進行阻塞嗦锐,然后被喚醒嫌松,這是需要過程的,所以這樣的自旋等待是值得的意推。
SNode h = head;
return (h == s || h == null || isFulfilling(h.mode));
}
// 將head節(jié)點到S節(jié)點之間所有已經(jīng)取消的節(jié)點全部移出豆瘫。
void clean(SNode s) {
s.item = null; // forget item
s.waiter = null; // forget thread
SNode past = s.next;
if (past != null && past.isCancelled())
past = past.next;
// 如果取消的是頭節(jié)點則運行下面的清理操作珊蟀,操作邏輯很簡單就是判斷頭結(jié)點是不是取消節(jié)點菊值,如果是則將節(jié)點一定到下一個節(jié)點
SNode p;
while ((p = head) != null && p != past && p.isCancelled())
//p是從頭節(jié)點開始第一個不移除的節(jié)點
casHead(p, p.next);
// 取消不是頭結(jié)點的嵌套節(jié)點
while (p != null && p != past) {
SNode n = p.next;
if (n != null && n.isCancelled())
//移除節(jié)點n
p.casNext(n, n.next);
else
p = n;
}
}
因為堆棧的出棧和入棧操作都在 transfer 方法里面,所以不容易理解育灸。建議讀者多看幾遍腻窒,結(jié)合下面的圖和我個人分析的思路,一步一步的debug磅崭,這樣理解起來應(yīng)該就不難啦~
1.根據(jù)節(jié)點模式判斷是入棧(put)還是出棧(take)操作
2.判斷棧頭是否為空或棧頭節(jié)點操作是否和本次一樣儿子,是的話執(zhí)行第3步,否則執(zhí)行第6步
3.判斷是否是超時操作砸喻,如果是超時操作的話則執(zhí)行第4步柔逼,否則執(zhí)行第5步
4.判斷棧頭是否非空并且是否可以取消,是的話將棧頭后繼節(jié)點cas操作成為棧頭節(jié)點后執(zhí)行第1步割岛,否則返回null
5.cas操作創(chuàng)建節(jié)點并將該節(jié)點入棧愉适,自旋等待匹配節(jié)點
6.判斷棧頭節(jié)點模式是否為Fulfilling,如果不是的話執(zhí)行第7步癣漆,否則執(zhí)行第10步
7.判斷棧頭節(jié)點是否需要取消等待维咸,需要取消等待的話將棧頭節(jié)點的后繼節(jié)點cas操作成為頭節(jié)點后重新執(zhí)行第1步,否則執(zhí)行第8步
8.棧頭節(jié)點不需取消等待惠爽,將當前(take or put or poll)操作封裝為一個節(jié)點入棧后自旋堆棧癌蓖,直到棧頭節(jié)點與棧中其它節(jié)點匹配后兩個節(jié)點都出棧返回節(jié)點信息或者所有的等待節(jié)點都沒有后跳出子循環(huán)重新執(zhí)行第1步
9.棧頭節(jié)點模式為Fulfilling,如果棧頭節(jié)點的后繼節(jié)點為null婚肆,cas設(shè)置棧頭節(jié)點為null并執(zhí)行第1步租副,否則繼續(xù)嘗試匹配棧中其它節(jié)點
2.SynchronousQueue 的公平模式(TransferQueue)
公平模式下使用的數(shù)據(jù)結(jié)構(gòu)是隊列,其方式是先進先出(FIFO:First In First Out)较性。就比如說咱們在結(jié)賬排隊的時候用僧,肯定是先排隊的人先結(jié)賬呀讨越,這樣才公平!
隊列節(jié)點
// 隊列節(jié)點
static final class QNode {
// 下一個節(jié)點
volatile QNode next;
// 元素信息
volatile Object item;
// 當前等待的線程
volatile Thread waiter;
// 是否是數(shù)據(jù)(put的時候是true永毅,take的時候是false)
final boolean isData;
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
// 替換當前節(jié)點的next節(jié)點
boolean casNext(QNode cmp, QNode val) {
return next == cmp &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// 替換當前節(jié)點的item數(shù)據(jù)
boolean casItem(Object cmp, Object val) {
return item == cmp &&
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
// 取消當前操作把跨,將當前item賦值為this(當前QNode節(jié)點)
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
// 如果item是this(當前QNode節(jié)點)的話就返回true,反之返回false
boolean isCancelled() {
return item == this;
}
// 如果已知此節(jié)點離隊列沼死,判斷next節(jié)點是不是為this着逐,則返回true
boolean isOffList() {
return next == this;
}
}
成員變量
// 隊列頭節(jié)點
transient volatile QNode head;
// 隊列尾節(jié)點
transient volatile QNode tail;
// 節(jié)點被取消但沒有從隊列中移除
transient volatile QNode cleanMe;
核心方法
- advanceHead(QNode h, QNode nh):更新頭節(jié)點
- advanceTail(QNode t, QNode nt):更新尾節(jié)點
- casCleanMe(QNode cmp, QNode val):更新 cleanMe 節(jié)點
- awaitFulfill(QNode s, E e, boolean timed, long nanos):等待fulfill操作
- clean(QNode pred, QNode s):清空cleanMe節(jié)點
- transfer(E e, boolean timed, long nanos): 主要處理邏輯
void advanceHead(QNode h, QNode nh) {
if (h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
h.next = h; // forget old next
}
void advanceTail(QNode t, QNode nt) {
if (tail == t)
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}
boolean casCleanMe(QNode cmp, QNode val) {
return cleanMe == cmp && UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
}
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
// 和 TransferStack.awaitFulfill 方法的邏輯一樣,因此就不顯示方法的邏輯啦
}
void clean(QNode pred, QNode s) {
s.waiter = null; // forget thread
while (pred.next == s) { // Return early if already unlinked
QNode h = head;
QNode hn = h.next; // Absorb cancelled first node as head
if (hn != null && hn.isCancelled()) {
advanceHead(h, hn);
continue;
}
QNode t = tail; // Ensure consistent read for tail
if (t == h)
return;
QNode tn = t.next;
// 判斷現(xiàn)在的t是不是末尾節(jié)點意蛀,可能其他線程插入了內(nèi)容導(dǎo)致不是最后的節(jié)點耸别。
if (t != tail)
continue;
// 如果不是最后節(jié)點的話將其現(xiàn)在t.next節(jié)點作為tail尾節(jié)點。
if (tn != null) {
advanceTail(t, tn);
continue;
}
// 如果當前節(jié)點不是尾節(jié)點進入到這里面县钥。
if (s != t) { // If not tail, try to unsplice
// 獲取當前節(jié)點(被取消的節(jié)點)的下一個節(jié)點秀姐。
QNode sn = s.next;
// 修改上一個節(jié)點的next(下一個)元素為下下個節(jié)點。
if (sn == s || pred.casNext(s, sn))
return;
}
QNode dp = cleanMe;
// 嘗試清除上一個標記為清除的節(jié)點
if (dp != null) { // Try unlinking previous cancelled node
//1.獲取要被清除的節(jié)點
QNode d = dp.next;
QNode dn;
if (d == null || // d is gone or
d == dp || // d is off list or
!d.isCancelled() || // d not cancelled or
(d != t && // d not tail and
(dn = d.next) != null && // has successor
dn != d && // that is on list
dp.casNext(d, dn))) // d unspliced
casCleanMe(dp, null);
if (dp == pred)
return; // s is already saved node
} else if (casCleanMe(null, pred))
return; // Postpone cleaning s
}
}
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
// 標識此次操作是存數(shù)據(jù)(put)還是取數(shù)據(jù)(take)
boolean isData = (e != null);
// 自旋匹配節(jié)點
for (;;) {
QNode t = tail;
QNode h = head;
// 如果頭節(jié)點或尾節(jié)點為空繼續(xù)自旋(在TransferQueue初始化的時候已經(jīng)賦值頭尾結(jié)點)
if (t == null || h == null) // saw uninitialized value
continue; // spin
// h == t 說明頭尾結(jié)點相同若贮,是空隊列
// t.isData == isData 說明尾節(jié)點與當前操作一樣
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
// 如果臨時變量 t 不等于尾節(jié)點省有,說明有其它線程改變了尾節(jié)點,則重新自旋匹配節(jié)點
if (t != tail) // inconsistent read
continue;
// 如果尾節(jié)點之后的節(jié)點值不為空谴麦,說明也是有其它線程改變了尾節(jié)點蠢沿,將tn節(jié)點賦值給尾節(jié)點
if (tn != null) { // lagging tail
advanceTail(t, tn);
continue;
}
//超時直接返回 null
if (timed && nanos <= 0)
return null;
// 創(chuàng)建node節(jié)點
if (s == null)
s = new QNode(e, isData);
// 將新創(chuàng)建的node節(jié)點添加到隊列尾部,如果失敗則重新自旋
if (!t.casNext(null, s))
continue;
// 更新新創(chuàng)建節(jié)點為尾節(jié)點
advanceTail(t, s);
// 調(diào)用 awaitFulfill 方法自旋匹配等待節(jié)點
Object x = awaitFulfill(s, e, timed, nanos);
// 如果返回當前節(jié)點匾效,則說明節(jié)點由于被取消舷蟀、超時、中斷導(dǎo)致匹配失敗
if (x == s) { // wait was cancelled
// 清除當前等待匹配節(jié)點
clean(t, s);
return null;
}
// 判斷節(jié)點是否已從隊列離開
if (!s.isOffList()) { // not already unlinked
// 嘗試將s節(jié)點設(shè)置為head面哼,移出t
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
// 釋放 s 節(jié)點當前的等待線程
s.waiter = null;
}
// 返回節(jié)點值(put返回put操作的值野宜,take返回匹配到的節(jié)點值)
return (x != null) ? (E)x : e;
} else {// 隊列不為空,并且當前操作與尾節(jié)點的操作不一致魔策。所以當前操作與尾節(jié)點的操作是互相匹配的
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
// isData == (x != null):判斷isData與x的模式是否相同匈子,相同表示已經(jīng)完成匹配,繼續(xù)自旋
// x == m :m節(jié)點被取消了
// !m.casItem(x, e):如果嘗試將數(shù)據(jù)e設(shè)置到m上失敗
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
// 將m設(shè)置為頭結(jié)點代乃,h出列旬牲,然后重試
advanceHead(h, m); // dequeue and retry
continue;
}
// 成功匹配了,節(jié)點m 設(shè)置為頭結(jié)點搁吓,h出列
advanceHead(h, m); // successfully fulfilled
// 喚醒節(jié)點 m 的等待線程
LockSupport.unpark(m.waiter);
// 返回節(jié)點值(put返回put操作的值原茅,take返回匹配到的節(jié)點值)
return (x != null) ? (E)x : e;
}
}
}
OMG!源碼又是那么長堕仔。但其實也不是很難擂橘,知道一個規(guī)律就行:由于是使用隊列作為公平策略,所以在put的時候會在隊列尾部添加數(shù)據(jù)摩骨,而take的時候會從隊列尾部向隊列頭部方向?qū)ふ业谝粋€被阻塞的線程通贞,這樣就可以保證公平的朗若、按順序的釋放被阻塞的線程。
先簡單的總結(jié)一下核心流程(TransferQueue.trasnfer方法):
1.獲取當前操作是存數(shù)據(jù)還是取數(shù)據(jù)
2.自旋尋找匹配的節(jié)點(put操作匹配take操作昌罩、take操作匹配put操作)
3.如果頭節(jié)點或尾節(jié)點為空哭懈,則繼續(xù)執(zhí)行第2步
4.如果是空隊列或尾節(jié)點和當前操作一樣,執(zhí)行第5步茎用,否則執(zhí)行第13步
5.如果尾節(jié)點被其它線程更改遣总,重新執(zhí)行第2步,否則執(zhí)行第6步
6.如果尾節(jié)點的后繼節(jié)點不為空轨功,說明有其它線程更改旭斥,則設(shè)置后繼節(jié)點為尾節(jié)點,并重新執(zhí)行第2步古涧;否則執(zhí)行第7步
7.如果超時直接返回null垂券,否則往下執(zhí)行第8步
8.為當前操作創(chuàng)建節(jié)點并添加到隊列尾部,如果添加成功往下執(zhí)行第9步羡滑,否則執(zhí)行第2步
9.自旋匹配等待節(jié)點菇爪,當返回節(jié)點與當前節(jié)點不一樣,說明節(jié)點匹配成功啄栓,執(zhí)行第10步娄帖;否則說明由于被取消也祠、超時昙楚、中斷導(dǎo)致匹配失敗,則清除當前節(jié)點并返回null
10.如果節(jié)點已從隊列離開诈嘿,執(zhí)行第11步堪旧,否則執(zhí)行第12步
11.返回節(jié)點值(put返回put操作的值,take返回匹配到的節(jié)點值)
12.將節(jié)點從隊列中移除奖亚,并重新設(shè)置隊列頭節(jié)點后執(zhí)行第11步
13.執(zhí)行到這一步淳梦,說明當前操作與尾節(jié)點的操作是互相匹配;那么如果隊列頭節(jié)點是否匹配完成或隊列頭節(jié)點被取消昔字,又或者cas更新頭節(jié)點操作失敗爆袍,則執(zhí)行第14步,否則執(zhí)行第15步
14.重新設(shè)置頭節(jié)點并執(zhí)行第2步
15.執(zhí)行到這一步代表匹配成功作郭,重新設(shè)置隊列的頭節(jié)點陨囊,并喚醒頭節(jié)點的等待線程,最后返回節(jié)點值(put返回put操作的值夹攒,take返回匹配到的節(jié)點值)
還是依照個人習(xí)慣蜘醋,喜歡通過畫圖分析一下源碼流程!
總結(jié):
- SynchronousQueue 是一個沒有隊列大小的概念咏尝,所有的操作都必須與其匹配的節(jié)點共同入隊出隊(公平模式)或入棧出棧(非公平模式)
- SynchronousQueue 是輕量級的阻塞隊列压语。因為SynchronousQueue是沒有使用到鎖啸罢,都是通過CAS方法保證線程安全
其實也不難發(fā)現(xiàn),SynchronousQueue 的缺點也是十分明顯胎食。如果同一個模式的節(jié)點多的話扰才,就會一直阻塞,這是會損耗性能厕怜,所以需要根據(jù)實際業(yè)務(wù)場景使用训桶。
最后希望讀者們看源碼的時候,親自debug酣倾,這樣才會加深源碼的理解舵揭,讀任何文章都只是輔助,自己真正理解才是學(xué)會東西躁锡。
如果覺得源碼剖析不錯的話午绳,麻煩點個贊哈!對于文章有哪里不清楚或者有誤的地方映之,歡迎在評論區(qū)留言~
參考資料:
https://www.cnblogs.com/dwlsxj/p/Thread.html
慕課網(wǎng):面試官系統(tǒng)精講Java源碼及大廠真題: https://www.imooc.com/read/47/article/862