簡(jiǎn)介
SynchronousQueue 沒(méi)有長(zhǎng)度萎馅,每一個(gè)入隊(duì)操作必須對(duì)應(yīng)一個(gè)出隊(duì)操作盆犁,或者每一個(gè)出隊(duì)操作必須對(duì)應(yīng)一個(gè)入棧操作砸捏,否則阻塞。SynchronousQueue內(nèi)部提供兩種模式TransferStack非公平模式(LIFO)和TransferQueue公平模式(FIFO)离福。
SynchronousQueue 類(lèi)
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
SynchronousQueue 繼承AbstractQueue抽象類(lèi),并實(shí)現(xiàn)BlockingQueue接口
重要內(nèi)部類(lèi)Transferer
abstract static class Transferer<E>
Transferer是抽象類(lèi)炼蛤,它有兩個(gè)實(shí)現(xiàn)TransferStack妖爷、TransferQueue
Transferer 方法
abstract E transfer(E e, boolean timed, long nanos);
此方法可以既可以執(zhí)行put也可以執(zhí)行take操作。
重要內(nèi)部類(lèi)TransferStack
static final class TransferStack<E> extends Transferer<E>
TransferStack 內(nèi)部類(lèi)SNode
static final class SNode
TransferStack.SNode 屬性
// 后面節(jié)點(diǎn)
volatile SNode next;
// 匹配節(jié)點(diǎn)
volatile SNode match;
// 等待線程
volatile Thread waiter;
// 元素
Object item;
// 節(jié)點(diǎn)模式
int mode;
// 內(nèi)存操作不安全類(lèi)
private static final sun.misc.Unsafe UNSAFE;
// 匹配節(jié)點(diǎn)偏移量
private static final long matchOffset;
// 后續(xù)節(jié)點(diǎn)偏移量
private static final long nextOffset;
TransferStack.SNode 靜態(tài)加載偏移量
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = SNode.class;
matchOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("match"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
TransferStack.SNode 構(gòu)造函數(shù)
SNode(Object item) {
this.item = item;
}
TransferStack.SNode 方法
// 修改后續(xù)節(jié)點(diǎn)
boolean casNext(SNode cmp, SNode val) {
return cmp == next &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// 嘗試匹配
boolean tryMatch(SNode s) {
// 修改匹配節(jié)點(diǎn)
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
// 喚醒等待線程
Thread w = waiter;
if (w != null) {
waiter = null;
LockSupport.unpark(w);
}
return true;
}
return match == s;
}
// 嘗試取消等待
void tryCancel() {
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
// 是否匹配
boolean isCancelled() {
return match == this;
}
TransferStack 屬性
// 消費(fèi)者
static final int REQUEST = 0;
// 生產(chǎn)者
static final int DATA = 1;
// 生產(chǎn)者在等待消費(fèi)者消費(fèi)
static final int FULFILLING = 2;
// 頭節(jié)點(diǎn)
volatile SNode head;
// 內(nèi)存操作不安全類(lèi)
private static final sun.misc.Unsafe UNSAFE;
// head偏移量
private static final long headOffset;
TransferStack 靜態(tài)加載偏移量
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = TransferStack.class;
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
} catch (Exception e) {
throw new Error(e);
}
}
TransferStack 基礎(chǔ)方法
// 更新頭節(jié)點(diǎn)
boolean casHead(SNode h, SNode nh) {
// 把頭節(jié)點(diǎn)從h更新為nh
return h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
}
// 判斷是否為FULFILLING模式
static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
// 設(shè)置節(jié)點(diǎn)屬性理朋,節(jié)點(diǎn)為空創(chuàng)建新節(jié)點(diǎn)
static SNode snode(SNode s, Object e, SNode next, int mode) {
// s 為空赠涮,創(chuàng)建新節(jié)點(diǎn)
if (s == null) s = new SNode(e);
// 設(shè)置s屬性
s.mode = mode;
s.next = next;
return s;
}
// 如果節(jié)點(diǎn)在棧頭或棧頭為FULFILLING的節(jié)點(diǎn)子寓,則返回true
boolean shouldSpin(SNode s) {
SNode h = head;
return (h == s || h == null || isFulfilling(h.mode));
}
TransferStack 重要方法
入隊(duì)出隊(duì)
E transfer(E e, boolean timed, long nanos) {
// 根據(jù)元素判斷模式
SNode s = null;
int mode = (e == null) ? REQUEST : DATA;
// 自旋
for (;;) {
SNode h = head;
// 頭節(jié)點(diǎn)模式與當(dāng)前模式一樣
if (h == null || h.mode == mode) {
// 如果超時(shí),則取消等待
if (timed && nanos <= 0) {
if (h != null && h.isCancelled())
casHead(h, h.next);
else
return null;
}
// 沒(méi)有超時(shí)笋除,入棧斜友,head指向他
else if (casHead(h, s = snode(s, e, h, mode))) {
// 自旋等待匹配
SNode m = awaitFulfill(s, timed, nanos);
// 取消等待
if (m == s) {
// 清理取消等待的節(jié)點(diǎn)
clean(s);
return null;
}
// 頭節(jié)點(diǎn)匹配成功,頭后移
if ((h = head) != null && h.next == s)
casHead(h, s.next);
// REQUEST返回匹配元素垃它,DATA返回本身
return (E) ((mode == REQUEST) ? m.item : s.item);
}
}
// 頭節(jié)點(diǎn)不為Fulfilling模式
else if (!isFulfilling(h.mode)) {
// 頭節(jié)點(diǎn)是否取消等待
if (h.isCancelled())
// 頭節(jié)點(diǎn)后移
casHead(h, h.next);
// 入棧修改頭節(jié)點(diǎn)
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
// 自旋
for (;;) {
SNode m = s.next;
// next節(jié)點(diǎn)為null鲜屏,則出棧
if (m == null) {
casHead(s, null);
s = null;
break;
}
SNode mn = m.next;
// 嘗試匹配是s節(jié)點(diǎn)
if (m.tryMatch(s)) {
// s出棧
casHead(s, mn);
return (E) ((mode == REQUEST) ? m.item : s.item);
} else
// 節(jié)點(diǎn)后移
s.casNext(m, mn);
}
}
}
// 頭節(jié)點(diǎn)為Fulfilling模式
else {
SNode m = h.next;
// 頭節(jié)點(diǎn)next為空,修改頭節(jié)點(diǎn)
if (m == null)
casHead(h, null);
else {
SNode mn = m.next;
// 試匹配国拇,如果匹配成功洛史,
// 棧頭和匹配節(jié)點(diǎn)出棧,否則跳過(guò)后繼節(jié)點(diǎn)
if (m.tryMatch(h))
casHead(h, mn);
else
h.casNext(m, mn);
}
}
}
}
自旋或阻塞
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
// 獲取剩余等待時(shí)間
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
// 獲取自旋次數(shù)
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
// 自旋
for (;;) {
// 響應(yīng)中斷
if (w.isInterrupted())
s.tryCancel();
// 獲取匹配節(jié)點(diǎn)
SNode m = s.match;
if (m != null)
return m;
// 是否需要等待
if (timed) {
nanos = deadline - System.nanoTime();
// 等待時(shí)間小于0酱吝,出隊(duì)
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
// 自旋次數(shù)減一
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0;
// 數(shù)組等待線程
else if (s.waiter == null)
s.waiter = w;
// 等待
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
移除隊(duì)列中取消等待的線程節(jié)點(diǎn)
void clean(SNode s) {
// 置空元素和等待線程
s.item = null;
s.waiter = null;
SNode past = s.next;
// past已取消節(jié)點(diǎn)后移
if (past != null && past.isCancelled())
past = past.next;
// 循環(huán)修改頭節(jié)點(diǎn)
SNode p;
while ((p = head) != null && p != past && p.isCancelled())
// 設(shè)置棧頭節(jié)點(diǎn)的next為第一個(gè)非取消等待的節(jié)點(diǎn)
casHead(p, p.next);
// 遍歷棧
while (p != null && p != past) {
SNode n = p.next;
// 移除取消等待的節(jié)點(diǎn)
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
// 節(jié)點(diǎn)后移
p = n;
}
}
這里說(shuō)一下大概邏輯喻圃,TransferStack在執(zhí)行讀寫(xiě)時(shí),首先判斷元素是否為空袋狞,為空REQUEST模式吓笙,否則DATA模式。隊(duì)列為空或當(dāng)前模式與隊(duì)頭模式一樣崎岂,自旋阻塞捆毫;隊(duì)列不為空且與隊(duì)頭的模式不同,匹配成功冲甘,出隊(duì)操作绩卤;隊(duì)列不為空且隊(duì)頭為FULFILLING模式,從隊(duì)頭往后遍歷找第一個(gè)非FULFILLING模式匹配江醇,匹配成功出隊(duì)濒憋。
重要內(nèi)部類(lèi)TransferQueue
static final class TransferQueue<E> extends Transferer<E>
TransferQueue 內(nèi)部類(lèi)QNode
static final class QNode
TransferQueue.QNode 屬性
// 下一個(gè)節(jié)點(diǎn)
volatile QNode next;
// 節(jié)點(diǎn)元素
volatile Object item;
// 等待線程
volatile Thread waiter;
// 是否為DATA模式
final boolean isData;
// 內(nèi)存操作不安全類(lèi)
private static final sun.misc.Unsafe UNSAFE;
// item偏移量
private static final long itemOffset;
// next偏移量
private static final long nextOffset;
TransferQueue.QNode 加載獲取偏移量
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = QNode.class;
itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
TransferQueue.QNode 構(gòu)造函數(shù)
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
TransferQueue.QNode 方法
// CAS設(shè)置next屬性
boolean casNext(QNode cmp, QNode val) {
return next == cmp &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// CAS設(shè)置元素值
boolean casItem(Object cmp, Object val) {
return item == cmp &&
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
// 取消節(jié)點(diǎn)等待(方便GC)
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
// 是否取消等待
boolean isCancelled() {
return item == this;
}
// 是否已出隊(duì)
boolean isOffList() {
return next == this;
}
TransferQueue 屬性
// 頭節(jié)點(diǎn)
transient volatile QNode head;
// 尾節(jié)點(diǎn)
transient volatile QNode tail;
// 待取消節(jié)點(diǎn)
transient volatile QNode cleanMe;
// 內(nèi)存操作不安全類(lèi)
private static final sun.misc.Unsafe UNSAFE;
// 頭節(jié)點(diǎn)偏移量
private static final long headOffset;
// 尾節(jié)點(diǎn)偏移量
private static final long tailOffset;
// 待取消節(jié)點(diǎn)偏移量
private static final long cleanMeOffset;
TransferQueue 加載獲取偏移量
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = TransferQueue.class;
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
tailOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("tail"));
cleanMeOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("cleanMe"));
} catch (Exception e) {
throw new Error(e);
}
}
TransferQueue 構(gòu)造函數(shù)
TransferQueue() {
// 初始化一個(gè)空的QNode
// isData為false
QNode h = new QNode(null, false);
// 設(shè)置頭節(jié)點(diǎn)和尾節(jié)點(diǎn)
head = h;
tail = h;
}
TransferQueue 基礎(chǔ)方法
// 嘗試修改新頭節(jié)點(diǎn)
void advanceHead(QNode h, QNode nh) {
if (h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
h.next = h; // forget old next
}
// 嘗試修改新尾節(jié)點(diǎn)
void advanceTail(QNode t, QNode nt) {
if (tail == t)
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}
// 嘗試修改新取消等待節(jié)點(diǎn)
boolean casCleanMe(QNode cmp, QNode val) {
return cleanMe == cmp &&
UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
}
TransferQueue 重要方法
入隊(duì)出隊(duì)
E transfer(E e, boolean timed, long nanos) {
QNode s = null;
// e不為null,則為DATA模式陶夜,否則為REQUEST模式
boolean isData = (e != null);
for (;;) {
// 獲取當(dāng)前頭尾節(jié)點(diǎn)
QNode t = tail;
QNode h = head;
// 頭或尾節(jié)點(diǎn)為空
if (t == null || h == null)
continue;
// 頭尾節(jié)點(diǎn)一樣凛驮,模式一樣
if (h == t || t.isData == isData) {
QNode tn = t.next;
// 尾節(jié)點(diǎn)已變化
if (t != tail)
continue;
// 尾節(jié)點(diǎn)next不為空
if (tn != null) {
// 嘗試修改尾節(jié)點(diǎn)
advanceTail(t, tn);
continue;
}
// 超時(shí),并且剩余時(shí)間小于0
if (timed && nanos <= 0)
// 返回null
return null;
// 新節(jié)點(diǎn)為空律适,初始化新節(jié)點(diǎn)
if (s == null)
s = new QNode(e, isData);
// 尾節(jié)點(diǎn)的next為null辐烂,就把新節(jié)點(diǎn)加到后面
if (!t.casNext(null, s))
// 替換失敗,開(kāi)始下輪自旋
continue;
// 嘗試修改尾節(jié)點(diǎn)
advanceTail(t, s);
// 阻塞等待
Object x = awaitFulfill(s, e, timed, nanos);
// 如果s指向自己捂贿,s出隊(duì)列
if (x == s) {
// 清除隊(duì)列中取消等待的線程節(jié)點(diǎn)
clean(t, s);
return null;
}
// 是否已出隊(duì)
if (!s.isOffList()) {
// 修改頭節(jié)點(diǎn)
advanceHead(t, s);
// 元素指向自己
if (x != null)
s.item = s;
// 等待取消
s.waiter = null;
}
// 返回元素
return (x != null) ? (E)x : e;
}
// 頭尾節(jié)點(diǎn)不一樣
else {
QNode m = h.next;
// 頭尾節(jié)點(diǎn)變化
if (t != tail || m == null || h != head)
continue;
// 獲取當(dāng)前元素
Object x = m.item;
// 后續(xù)節(jié)點(diǎn)模式跟當(dāng)前模式一樣
// 或者已經(jīng)嘗試取消
// 或者修改后續(xù)節(jié)點(diǎn)元素為當(dāng)前元素(交換元素)
if (isData == (x != null) || x == m ||
!m.casItem(x, e)) {
// 出隊(duì)
advanceHead(h, m);
// 進(jìn)入下次自旋纠修,修改對(duì)方線程
continue;
}
// 直接修改頭節(jié)點(diǎn)
advanceHead(h, m);
// 解除后續(xù)節(jié)點(diǎn)阻塞
LockSupport.unpark(m.waiter);
// 返回后續(xù)節(jié)點(diǎn)元素
return (x != null) ? (E)x : e;
}
}
}
自旋阻塞
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
// 計(jì)算超時(shí)時(shí)間
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
// 自旋次數(shù)
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 如果中斷,則取消等待
if (w.isInterrupted())
// 把s的item從e修改為s
s.tryCancel(e);
// 獲取s的元素
Object x = s.item;
// s的item不為e厂僧,直接返回x
if (x != e)
return x;
// 超時(shí)
if (timed) {
// 計(jì)算剩余超時(shí)時(shí)間
nanos = deadline - System.nanoTime();
// 超時(shí)時(shí)間小于等于0扣草,取消節(jié)點(diǎn)等待
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
// 自旋次數(shù)減一
if (spins > 0)
--spins;
// 等待線程不為空
else if (s.waiter == null)
// 設(shè)置等待線程為當(dāng)前線程
s.waiter = w;
// 沒(méi)有超時(shí)
else if (!timed)
// 開(kāi)始阻塞
LockSupport.park(this);
// 超時(shí)時(shí)間大于1000
else if (nanos > spinForTimeoutThreshold)
// 阻塞
LockSupport.parkNanos(this, nanos);
}
}
移除隊(duì)列中取消等待的線程節(jié)點(diǎn)
void clean(QNode pred, QNode s) {
s.waiter = null;
// 遍歷
while (pred.next == s) {
// 獲取頭節(jié)點(diǎn)和頭節(jié)點(diǎn)next
QNode h = head;
QNode hn = h.next;
// 頭節(jié)點(diǎn)next不為空,并且是取消等待節(jié)點(diǎn)
if (hn != null && hn.isCancelled()) {
// 修改頭節(jié)點(diǎn)
advanceHead(h, hn);
continue;
}
// 獲取尾節(jié)點(diǎn)
QNode t = tail;
// 頭尾一樣時(shí)返回
if (t == h)
return;
// 尾節(jié)點(diǎn)有next
QNode tn = t.next;
// 非一致性讀
if (t != tail)
continue;
// 尾節(jié)點(diǎn)next不為空,修改尾節(jié)點(diǎn)
if (tn != null) {
advanceTail(t, tn);
continue;
}
// s不是尾節(jié)點(diǎn)
if (s != t) {
// 修改pred下級(jí)節(jié)點(diǎn)
QNode sn = s.next;
if (sn == s || pred.casNext(s, sn))
return;
}
// 獲取取消節(jié)點(diǎn)
QNode dp = cleanMe;
// 取消節(jié)點(diǎn)不為空
if (dp != null) {
// 獲取取消節(jié)點(diǎn)next
QNode d = dp.next;
QNode dn;
// 移除前一個(gè)取消等待的節(jié)點(diǎn)
if (d == null ||
d == dp ||
!d.isCancelled() ||
(d != t &&
(dn = d.next) != null &&
dn != d &&
dp.casNext(d, dn)))
casCleanMe(dp, null);
if (dp == pred)
return;
}
// 設(shè)置取消節(jié)點(diǎn)
else if (casCleanMe(null, pred))
return;
}
}
這里說(shuō)一下大概邏輯辰妙,TransferQueue在執(zhí)行讀寫(xiě)時(shí)鹰祸,首先判斷元素是否為空,為空REQUEST模式密浑,否則DATA模式蛙婴。隊(duì)列為空或當(dāng)前模式與隊(duì)尾模式一樣,自旋阻塞尔破;隊(duì)列不為空且與隊(duì)頭的模式不同街图,匹配成功,出隊(duì)操作懒构。
SynchronousQueue 屬性
// CPU的數(shù)量
static final int NCPUS = Runtime.getRuntime().availableProcessors();
// 有超時(shí)的情況自旋多少次餐济,當(dāng)CPU數(shù)量小于2的時(shí)候不自旋
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
// 沒(méi)有超時(shí)的情況自旋多少次
static final int maxUntimedSpins = maxTimedSpins * 16;
// 剩余時(shí)間閾值常量(沒(méi)有超時(shí)時(shí)間會(huì)用到)
static final long spinForTimeoutThreshold = 1000L;
// Transferer模式
private transient volatile Transferer<E> transferer;
// 下面三個(gè)都是序列化時(shí)使用
private ReentrantLock qlock;
private WaitQueue waitingProducers;
private WaitQueue waitingConsumers;
SynchronousQueue 構(gòu)造函數(shù)
// 默認(rèn)初始化
public SynchronousQueue() {
// 默認(rèn)非公平模式
this(false);
}
// 設(shè)置是否使用公平模式
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
SynchronousQueue 基礎(chǔ)方法
public boolean isEmpty() {
return true;
}
public int size() {
return 0;
}
public int remainingCapacity() {
return 0;
}
public void clear() {
}
public boolean contains(Object o) {
return false;
}
public boolean remove(Object o) {
return false;
}
public boolean containsAll(Collection<?> c) {
return c.isEmpty();
}
public boolean removeAll(Collection<?> c) {
return false;
}
public boolean retainAll(Collection<?> c) {
return false;
}
public E peek() {
return null;
}
可以說(shuō)SynchronousQueue是沒(méi)有容量的(只有生成者線程或者消費(fèi)者線程),所以長(zhǎng)度可以看做0胆剧,也不能peek絮姆。
SynchronousQueue 入隊(duì)出隊(duì)
// 入隊(duì),沒(méi)有出隊(duì)操作一直阻塞
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// 返回為null秩霍,則put失敗篙悯,中斷當(dāng)前線程
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
// 入隊(duì),超時(shí)阻塞
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
return true;
// 返回為null前域,則offer失敗辕近,中斷當(dāng)前線程
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}
// 入隊(duì)
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
// 入隊(duì)
return transferer.transfer(e, true, 0) != null;
}
// 出隊(duì)韵吨,沒(méi)有入隊(duì)操作一直阻塞
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
// 出隊(duì)匿垄,超時(shí)阻塞
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 出隊(duì),獲取返回值
E e = transferer.transfer(null, true, unit.toNanos(timeout));
// 返回值不為空归粉,或者線程未中斷椿疗,返回結(jié)果
if (e != null || !Thread.interrupted())
return e;
throw new InterruptedException();
}
// 出隊(duì)
public E poll() {
// 出隊(duì)
return transferer.transfer(null, true, 0);
}