數(shù)據(jù)結(jié)構(gòu) - SynchronousQueue 線程通信阻塞隊(duì)列

簡(jiǎn)介

SynchronousQueue 沒(méi)有長(zhǎng)度萎馅,每一個(gè)入隊(duì)操作必須對(duì)應(yīng)一個(gè)出隊(duì)操作盆犁,或者每一個(gè)出隊(duì)操作必須對(duì)應(yīng)一個(gè)入棧操作砸捏,否則阻塞。SynchronousQueue內(nèi)部提供兩種模式TransferStack非公平模式(LIFO)和TransferQueue公平模式(FIFO)离福。

SynchronousQueue 類(lèi)

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable

SynchronousQueue 繼承AbstractQueue抽象類(lèi),并實(shí)現(xiàn)BlockingQueue接口

重要內(nèi)部類(lèi)Transferer

abstract static class Transferer<E>

Transferer是抽象類(lèi)炼蛤,它有兩個(gè)實(shí)現(xiàn)TransferStack妖爷、TransferQueue

Transferer 方法

abstract E transfer(E e, boolean timed, long nanos);

此方法可以既可以執(zhí)行put也可以執(zhí)行take操作。

重要內(nèi)部類(lèi)TransferStack

static final class TransferStack<E> extends Transferer<E>

TransferStack 內(nèi)部類(lèi)SNode

static final class SNode

TransferStack.SNode 屬性

// 后面節(jié)點(diǎn)
volatile SNode next;
// 匹配節(jié)點(diǎn)
volatile SNode match;
// 等待線程
volatile Thread waiter;
// 元素
Object item;
// 節(jié)點(diǎn)模式
int mode;
// 內(nèi)存操作不安全類(lèi)
private static final sun.misc.Unsafe UNSAFE;
// 匹配節(jié)點(diǎn)偏移量
private static final long matchOffset;
// 后續(xù)節(jié)點(diǎn)偏移量
private static final long nextOffset;

TransferStack.SNode 靜態(tài)加載偏移量

static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = SNode.class;
        matchOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("match"));
        nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

TransferStack.SNode 構(gòu)造函數(shù)

SNode(Object item) {
    this.item = item;
}

TransferStack.SNode 方法

// 修改后續(xù)節(jié)點(diǎn)
boolean casNext(SNode cmp, SNode val) {
    return cmp == next &&
            UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// 嘗試匹配
boolean tryMatch(SNode s) {
    // 修改匹配節(jié)點(diǎn)    
    if (match == null &&
            UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
        // 喚醒等待線程
        Thread w = waiter;
        if (w != null) {
            waiter = null;
            LockSupport.unpark(w);
        }
        return true;
    }
    return match == s;
}
// 嘗試取消等待
void tryCancel() {
    UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
// 是否匹配
boolean isCancelled() {
    return match == this;
} 

TransferStack 屬性

// 消費(fèi)者
static final int REQUEST    = 0;
// 生產(chǎn)者
static final int DATA       = 1;
// 生產(chǎn)者在等待消費(fèi)者消費(fèi)
static final int FULFILLING = 2;
// 頭節(jié)點(diǎn)
volatile SNode head;
// 內(nèi)存操作不安全類(lèi)
private static final sun.misc.Unsafe UNSAFE;
// head偏移量
private static final long headOffset;

TransferStack 靜態(tài)加載偏移量

static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = TransferStack.class;
        headOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("head"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

TransferStack 基礎(chǔ)方法

// 更新頭節(jié)點(diǎn)
boolean casHead(SNode h, SNode nh) {
    // 把頭節(jié)點(diǎn)從h更新為nh
    return h == head &&
            UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
}
// 判斷是否為FULFILLING模式
static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
// 設(shè)置節(jié)點(diǎn)屬性理朋,節(jié)點(diǎn)為空創(chuàng)建新節(jié)點(diǎn)
static SNode snode(SNode s, Object e, SNode next, int mode) {
    // s 為空赠涮,創(chuàng)建新節(jié)點(diǎn)
    if (s == null) s = new SNode(e);
    // 設(shè)置s屬性
    s.mode = mode;
    s.next = next;
    return s;
}
// 如果節(jié)點(diǎn)在棧頭或棧頭為FULFILLING的節(jié)點(diǎn)子寓,則返回true
boolean shouldSpin(SNode s) {
    SNode h = head;
    return (h == s || h == null || isFulfilling(h.mode));
}

TransferStack 重要方法

入隊(duì)出隊(duì)
E transfer(E e, boolean timed, long nanos) {
    // 根據(jù)元素判斷模式
    SNode s = null;
    int mode = (e == null) ? REQUEST : DATA;
    // 自旋
    for (;;) {
        SNode h = head;
        // 頭節(jié)點(diǎn)模式與當(dāng)前模式一樣
        if (h == null || h.mode == mode) {
            // 如果超時(shí),則取消等待
            if (timed && nanos <= 0) {
                if (h != null && h.isCancelled())
                    casHead(h, h.next);
                else
                    return null;
            }
            // 沒(méi)有超時(shí)笋除,入棧斜友,head指向他
            else if (casHead(h, s = snode(s, e, h, mode))) {
                // 自旋等待匹配
                SNode m = awaitFulfill(s, timed, nanos);
                // 取消等待
                if (m == s) {
                    // 清理取消等待的節(jié)點(diǎn)
                    clean(s);
                    return null;
                }
                // 頭節(jié)點(diǎn)匹配成功,頭后移
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);
                // REQUEST返回匹配元素垃它,DATA返回本身
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        }
        // 頭節(jié)點(diǎn)不為Fulfilling模式
        else if (!isFulfilling(h.mode)) {
            // 頭節(jié)點(diǎn)是否取消等待
            if (h.isCancelled())
                // 頭節(jié)點(diǎn)后移
                casHead(h, h.next);
            // 入棧修改頭節(jié)點(diǎn)
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                // 自旋
                for (;;) {
                    SNode m = s.next;
                    // next節(jié)點(diǎn)為null鲜屏,則出棧
                    if (m == null) {
                        casHead(s, null);
                        s = null;
                        break;
                    }
                    SNode mn = m.next;
                    // 嘗試匹配是s節(jié)點(diǎn)
                    if (m.tryMatch(s)) {
                        // s出棧
                        casHead(s, mn);
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else
                        // 節(jié)點(diǎn)后移
                        s.casNext(m, mn);
                }
            }
        }
        // 頭節(jié)點(diǎn)為Fulfilling模式
        else {
            SNode m = h.next;
            // 頭節(jié)點(diǎn)next為空,修改頭節(jié)點(diǎn)
            if (m == null)
                casHead(h, null);
            else {
                SNode mn = m.next;
                // 試匹配国拇,如果匹配成功洛史,
                // 棧頭和匹配節(jié)點(diǎn)出棧,否則跳過(guò)后繼節(jié)點(diǎn) 
                if (m.tryMatch(h))
                    casHead(h, mn);
                else
                    h.casNext(m, mn);
            }
        }
    }
}

自旋或阻塞

SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    // 獲取剩余等待時(shí)間
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    // 獲取自旋次數(shù)
    int spins = (shouldSpin(s) ?
            (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    // 自旋
    for (;;) {
        // 響應(yīng)中斷
        if (w.isInterrupted())
            s.tryCancel();
        // 獲取匹配節(jié)點(diǎn)
        SNode m = s.match;
        if (m != null)
            return m;
        // 是否需要等待
        if (timed) {
            nanos = deadline - System.nanoTime();
            // 等待時(shí)間小于0酱吝,出隊(duì)
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        // 自旋次數(shù)減一
        if (spins > 0)
            spins = shouldSpin(s) ? (spins-1) : 0;
        // 數(shù)組等待線程
        else if (s.waiter == null)
            s.waiter = w;
        // 等待
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

移除隊(duì)列中取消等待的線程節(jié)點(diǎn)

void clean(SNode s) {
    // 置空元素和等待線程
    s.item = null;
    s.waiter = null;
    SNode past = s.next;
    // past已取消節(jié)點(diǎn)后移
    if (past != null && past.isCancelled())
        past = past.next;
    // 循環(huán)修改頭節(jié)點(diǎn)
    SNode p;
    while ((p = head) != null && p != past && p.isCancelled())
        // 設(shè)置棧頭節(jié)點(diǎn)的next為第一個(gè)非取消等待的節(jié)點(diǎn)
        casHead(p, p.next);
    // 遍歷棧
    while (p != null && p != past) {
        SNode n = p.next;
        // 移除取消等待的節(jié)點(diǎn)
        if (n != null && n.isCancelled())
            p.casNext(n, n.next);
        else
            // 節(jié)點(diǎn)后移
            p = n;
    }
}

這里說(shuō)一下大概邏輯喻圃,TransferStack在執(zhí)行讀寫(xiě)時(shí),首先判斷元素是否為空袋狞,為空REQUEST模式吓笙,否則DATA模式。隊(duì)列為空或當(dāng)前模式與隊(duì)頭模式一樣崎岂,自旋阻塞捆毫;隊(duì)列不為空且與隊(duì)頭的模式不同,匹配成功冲甘,出隊(duì)操作绩卤;隊(duì)列不為空且隊(duì)頭為FULFILLING模式,從隊(duì)頭往后遍歷找第一個(gè)非FULFILLING模式匹配江醇,匹配成功出隊(duì)濒憋。

重要內(nèi)部類(lèi)TransferQueue

static final class TransferQueue<E> extends Transferer<E>

TransferQueue 內(nèi)部類(lèi)QNode

static final class QNode

TransferQueue.QNode 屬性

// 下一個(gè)節(jié)點(diǎn)
volatile QNode next;
// 節(jié)點(diǎn)元素
volatile Object item;
// 等待線程
volatile Thread waiter;
// 是否為DATA模式
final boolean isData;
// 內(nèi)存操作不安全類(lèi)
private static final sun.misc.Unsafe UNSAFE;
// item偏移量
private static final long itemOffset;
// next偏移量
private static final long nextOffset;

TransferQueue.QNode 加載獲取偏移量

static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = QNode.class;
        itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));
        nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

TransferQueue.QNode 構(gòu)造函數(shù)

QNode(Object item, boolean isData) {
    this.item = item;
    this.isData = isData;
}

TransferQueue.QNode 方法

// CAS設(shè)置next屬性
boolean casNext(QNode cmp, QNode val) {
    return next == cmp &&
            UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// CAS設(shè)置元素值
boolean casItem(Object cmp, Object val) {
    return item == cmp &&
            UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
// 取消節(jié)點(diǎn)等待(方便GC)
void tryCancel(Object cmp) {
    UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}   
// 是否取消等待
boolean isCancelled() {
    return item == this;
}
// 是否已出隊(duì)
boolean isOffList() {
    return next == this;
}

TransferQueue 屬性

// 頭節(jié)點(diǎn)
transient volatile QNode head;
// 尾節(jié)點(diǎn)
transient volatile QNode tail;
// 待取消節(jié)點(diǎn)
transient volatile QNode cleanMe;
// 內(nèi)存操作不安全類(lèi)
private static final sun.misc.Unsafe UNSAFE;
// 頭節(jié)點(diǎn)偏移量
private static final long headOffset;
// 尾節(jié)點(diǎn)偏移量
private static final long tailOffset;
// 待取消節(jié)點(diǎn)偏移量
private static final long cleanMeOffset;

TransferQueue 加載獲取偏移量

static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = TransferQueue.class;
        headOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("head"));
        tailOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("tail"));
        cleanMeOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("cleanMe"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

TransferQueue 構(gòu)造函數(shù)

TransferQueue() {
    // 初始化一個(gè)空的QNode
    // isData為false
    QNode h = new QNode(null, false);
    // 設(shè)置頭節(jié)點(diǎn)和尾節(jié)點(diǎn)
    head = h;
    tail = h;
}

TransferQueue 基礎(chǔ)方法

// 嘗試修改新頭節(jié)點(diǎn)
void advanceHead(QNode h, QNode nh) {
    if (h == head &&
            UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
        h.next = h; // forget old next
}
// 嘗試修改新尾節(jié)點(diǎn)
void advanceTail(QNode t, QNode nt) {
    if (tail == t)
        UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}
// 嘗試修改新取消等待節(jié)點(diǎn)
boolean casCleanMe(QNode cmp, QNode val) {
    return cleanMe == cmp &&
            UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
}

TransferQueue 重要方法

入隊(duì)出隊(duì)

E transfer(E e, boolean timed, long nanos) {
    QNode s = null;
    // e不為null,則為DATA模式陶夜,否則為REQUEST模式
    boolean isData = (e != null);
    for (;;) {
        // 獲取當(dāng)前頭尾節(jié)點(diǎn)
        QNode t = tail;
        QNode h = head;
        // 頭或尾節(jié)點(diǎn)為空
        if (t == null || h == null)
            continue;
        // 頭尾節(jié)點(diǎn)一樣凛驮,模式一樣
        if (h == t || t.isData == isData) {
            QNode tn = t.next;
            // 尾節(jié)點(diǎn)已變化
            if (t != tail)
                continue;
            // 尾節(jié)點(diǎn)next不為空
            if (tn != null) {
                // 嘗試修改尾節(jié)點(diǎn)
                advanceTail(t, tn);
                continue;
            }
            // 超時(shí),并且剩余時(shí)間小于0
            if (timed && nanos <= 0)
                // 返回null
                return null;
            // 新節(jié)點(diǎn)為空律适,初始化新節(jié)點(diǎn)
            if (s == null)
                s = new QNode(e, isData);
            // 尾節(jié)點(diǎn)的next為null辐烂,就把新節(jié)點(diǎn)加到后面
            if (!t.casNext(null, s))
                // 替換失敗,開(kāi)始下輪自旋
                continue;
            // 嘗試修改尾節(jié)點(diǎn)
            advanceTail(t, s);
            // 阻塞等待
            Object x = awaitFulfill(s, e, timed, nanos);
            // 如果s指向自己捂贿,s出隊(duì)列
            if (x == s) {
                // 清除隊(duì)列中取消等待的線程節(jié)點(diǎn)
                clean(t, s);
                return null;
            }
            // 是否已出隊(duì)
            if (!s.isOffList()) {
                // 修改頭節(jié)點(diǎn)
                advanceHead(t, s);
                // 元素指向自己
                if (x != null)
                    s.item = s;
                // 等待取消
                s.waiter = null;
            }
            // 返回元素
            return (x != null) ? (E)x : e;
        }
        // 頭尾節(jié)點(diǎn)不一樣
        else {
            QNode m = h.next;
            // 頭尾節(jié)點(diǎn)變化
            if (t != tail || m == null || h != head)
                continue;
            // 獲取當(dāng)前元素
            Object x = m.item;
            // 后續(xù)節(jié)點(diǎn)模式跟當(dāng)前模式一樣
            // 或者已經(jīng)嘗試取消
            // 或者修改后續(xù)節(jié)點(diǎn)元素為當(dāng)前元素(交換元素)
            if (isData == (x != null) || x == m ||
                    !m.casItem(x, e)) {
                // 出隊(duì)
                advanceHead(h, m);
                // 進(jìn)入下次自旋纠修,修改對(duì)方線程
                continue;
            }
            // 直接修改頭節(jié)點(diǎn)
            advanceHead(h, m);
            // 解除后續(xù)節(jié)點(diǎn)阻塞
            LockSupport.unpark(m.waiter);
            // 返回后續(xù)節(jié)點(diǎn)元素
            return (x != null) ? (E)x : e;
        }
    }
}

自旋阻塞

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
    // 計(jì)算超時(shí)時(shí)間
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    // 自旋次數(shù)
    int spins = ((head.next == s) ?
            (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        // 如果中斷,則取消等待
        if (w.isInterrupted())
            // 把s的item從e修改為s
            s.tryCancel(e);
        // 獲取s的元素
        Object x = s.item;
        // s的item不為e厂僧,直接返回x
        if (x != e)
            return x;
        // 超時(shí)
        if (timed) {
            // 計(jì)算剩余超時(shí)時(shí)間
            nanos = deadline - System.nanoTime();
            // 超時(shí)時(shí)間小于等于0扣草,取消節(jié)點(diǎn)等待
            if (nanos <= 0L) {
                s.tryCancel(e);
                continue;
            }
        }
        // 自旋次數(shù)減一
        if (spins > 0)
            --spins;
        // 等待線程不為空
        else if (s.waiter == null)
            // 設(shè)置等待線程為當(dāng)前線程
            s.waiter = w;
        // 沒(méi)有超時(shí)
        else if (!timed)
            // 開(kāi)始阻塞
            LockSupport.park(this);
        // 超時(shí)時(shí)間大于1000
        else if (nanos > spinForTimeoutThreshold)
            // 阻塞
            LockSupport.parkNanos(this, nanos);
    }
}

移除隊(duì)列中取消等待的線程節(jié)點(diǎn)

void clean(QNode pred, QNode s) {
    s.waiter = null;
    // 遍歷
    while (pred.next == s) {
        // 獲取頭節(jié)點(diǎn)和頭節(jié)點(diǎn)next
        QNode h = head;
        QNode hn = h.next;
        // 頭節(jié)點(diǎn)next不為空,并且是取消等待節(jié)點(diǎn)
        if (hn != null && hn.isCancelled()) {
            // 修改頭節(jié)點(diǎn)
            advanceHead(h, hn);
            continue;
        }
        // 獲取尾節(jié)點(diǎn)
        QNode t = tail;
        // 頭尾一樣時(shí)返回
        if (t == h)
            return;
        // 尾節(jié)點(diǎn)有next
        QNode tn = t.next;
        // 非一致性讀
        if (t != tail)
            continue;
        // 尾節(jié)點(diǎn)next不為空,修改尾節(jié)點(diǎn)
        if (tn != null) {
            advanceTail(t, tn);
            continue;
        }
        // s不是尾節(jié)點(diǎn)
        if (s != t) {
            // 修改pred下級(jí)節(jié)點(diǎn)
            QNode sn = s.next;
            if (sn == s || pred.casNext(s, sn))
                return;
        }
        // 獲取取消節(jié)點(diǎn)
        QNode dp = cleanMe;
        // 取消節(jié)點(diǎn)不為空
        if (dp != null) {
            // 獲取取消節(jié)點(diǎn)next
            QNode d = dp.next;
            QNode dn;
            // 移除前一個(gè)取消等待的節(jié)點(diǎn)
            if (d == null ||
                    d == dp ||
                    !d.isCancelled() ||
                    (d != t &&
                            (dn = d.next) != null &&
                            dn != d &&
                            dp.casNext(d, dn)))
                casCleanMe(dp, null);
            if (dp == pred)
                return;
        }
        // 設(shè)置取消節(jié)點(diǎn)
        else if (casCleanMe(null, pred))
            return;
    }
}

這里說(shuō)一下大概邏輯辰妙,TransferQueue在執(zhí)行讀寫(xiě)時(shí)鹰祸,首先判斷元素是否為空,為空REQUEST模式密浑,否則DATA模式蛙婴。隊(duì)列為空或當(dāng)前模式與隊(duì)尾模式一樣,自旋阻塞尔破;隊(duì)列不為空且與隊(duì)頭的模式不同街图,匹配成功,出隊(duì)操作懒构。

SynchronousQueue 屬性

// CPU的數(shù)量
static final int NCPUS = Runtime.getRuntime().availableProcessors();
// 有超時(shí)的情況自旋多少次餐济,當(dāng)CPU數(shù)量小于2的時(shí)候不自旋
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
// 沒(méi)有超時(shí)的情況自旋多少次
static final int maxUntimedSpins = maxTimedSpins * 16;
// 剩余時(shí)間閾值常量(沒(méi)有超時(shí)時(shí)間會(huì)用到)
static final long spinForTimeoutThreshold = 1000L;
// Transferer模式
private transient volatile Transferer<E> transferer;
// 下面三個(gè)都是序列化時(shí)使用
private ReentrantLock qlock;
private WaitQueue waitingProducers;
private WaitQueue waitingConsumers;

SynchronousQueue 構(gòu)造函數(shù)

// 默認(rèn)初始化
public SynchronousQueue() {
    // 默認(rèn)非公平模式
    this(false);
}
// 設(shè)置是否使用公平模式
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

SynchronousQueue 基礎(chǔ)方法

public boolean isEmpty() {
    return true;
}
public int size() {
    return 0;
}
public int remainingCapacity() {
    return 0;
}
public void clear() {
    
}
public boolean contains(Object o) {
    return false;
}
public boolean remove(Object o) {
    return false;
}
public boolean containsAll(Collection<?> c) {
    return c.isEmpty();
}
public boolean removeAll(Collection<?> c) {
    return false;
}
public boolean retainAll(Collection<?> c) {
    return false;
}
public E peek() {
    return null;
}

可以說(shuō)SynchronousQueue是沒(méi)有容量的(只有生成者線程或者消費(fèi)者線程),所以長(zhǎng)度可以看做0胆剧,也不能peek絮姆。

SynchronousQueue 入隊(duì)出隊(duì)

// 入隊(duì),沒(méi)有出隊(duì)操作一直阻塞
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // 返回為null秩霍,則put失敗篙悯,中斷當(dāng)前線程
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}
// 入隊(duì),超時(shí)阻塞
public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
        return true;
    // 返回為null前域,則offer失敗辕近,中斷當(dāng)前線程
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}
// 入隊(duì)
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    // 入隊(duì)
    return transferer.transfer(e, true, 0) != null;
}
// 出隊(duì)韵吨,沒(méi)有入隊(duì)操作一直阻塞
public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}
// 出隊(duì)匿垄,超時(shí)阻塞
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 出隊(duì),獲取返回值
    E e = transferer.transfer(null, true, unit.toNanos(timeout));
    // 返回值不為空归粉,或者線程未中斷椿疗,返回結(jié)果
    if (e != null || !Thread.interrupted())
        return e;
    throw new InterruptedException();
}
// 出隊(duì)
public E poll() {
    // 出隊(duì)
    return transferer.transfer(null, true, 0);
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市糠悼,隨后出現(xiàn)的幾起案子届榄,更是在濱河造成了極大的恐慌,老刑警劉巖倔喂,帶你破解...
    沈念sama閱讀 219,270評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件铝条,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡席噩,警方通過(guò)查閱死者的電腦和手機(jī)班缰,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,489評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)悼枢,“玉大人埠忘,你說(shuō)我怎么就攤上這事。” “怎么了莹妒?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,630評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵名船,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我旨怠,道長(zhǎng)渠驼,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,906評(píng)論 1 295
  • 正文 為了忘掉前任鉴腻,我火速辦了婚禮渴邦,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘拘哨。我一直安慰自己谋梭,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,928評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布倦青。 她就那樣靜靜地躺著瓮床,像睡著了一般。 火紅的嫁衣襯著肌膚如雪产镐。 梳的紋絲不亂的頭發(fā)上隘庄,一...
    開(kāi)封第一講書(shū)人閱讀 51,718評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音癣亚,去河邊找鬼丑掺。 笑死,一個(gè)胖子當(dāng)著我的面吹牛述雾,可吹牛的內(nèi)容都是我干的街州。 我是一名探鬼主播,決...
    沈念sama閱讀 40,442評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼玻孟,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼唆缴!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起黍翎,我...
    開(kāi)封第一講書(shū)人閱讀 39,345評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤面徽,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后匣掸,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體趟紊,經(jīng)...
    沈念sama閱讀 45,802評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,984評(píng)論 3 337
  • 正文 我和宋清朗相戀三年碰酝,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了霎匈。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,117評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡砰粹,死狀恐怖唧躲,靈堂內(nèi)的尸體忽然破棺而出造挽,到底是詐尸還是另有隱情,我是刑警寧澤弄痹,帶...
    沈念sama閱讀 35,810評(píng)論 5 346
  • 正文 年R本政府宣布饭入,位于F島的核電站,受9級(jí)特大地震影響肛真,放射性物質(zhì)發(fā)生泄漏谐丢。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,462評(píng)論 3 331
  • 文/蒙蒙 一蚓让、第九天 我趴在偏房一處隱蔽的房頂上張望乾忱。 院中可真熱鬧,春花似錦历极、人聲如沸窄瘟。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,011評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)蹄葱。三九已至,卻和暖如春锄列,著一層夾襖步出監(jiān)牢的瞬間图云,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,139評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工邻邮, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留竣况,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,377評(píng)論 3 373
  • 正文 我出身青樓筒严,卻偏偏與公主長(zhǎng)得像丹泉,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子萝风,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,060評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容