并發(fā)十三:并發(fā)容器Queue實現(xiàn)分析

Queue

J.U.C中分為阻塞隊里和非阻塞隊列。
阻塞隊列在滿時進(jìn)行入列操作會被阻塞,空時進(jìn)行出列操作會被阻塞,很適合并發(fā)編程中最常見的生產(chǎn)者-消費(fèi)者模式戏蔑。
非阻塞隊使用CAS無鎖算法避免鎖競爭,相比同步方式實現(xiàn)的隊列鲁纠,提高了吞吐量辛臊。

阻塞隊列:

  1. ArrayBlockingQueue基于數(shù)組實現(xiàn)的有界阻塞隊列。
  2. LinkedBlockingQueue基于鏈表實現(xiàn)的有界阻塞隊列房交。
  3. PriorityBlockingQueue基于數(shù)組實現(xiàn)的彻舰,支持優(yōu)先級排序的無界阻塞隊列。
  4. LinkedBlockingDeque基于鏈表實現(xiàn)的雙端阻塞隊列候味。
  5. SynchronousQueue不存儲元素的阻塞隊列刃唤。
  6. LinkedTransferQueue基于鏈表實現(xiàn)的無界阻塞隊列。

非阻塞隊列:

  1. ConcurrentLinkedQueue基于鏈表實現(xiàn)的無界非阻塞隊列白群。
  2. ConcurrentLinkedDeque基于鏈表實現(xiàn)的無界非阻塞雙端隊列尚胞。

隊列的入列、出列方法及處理方式(阻塞和超時只適用于阻塞隊列):

方法\處理方式 異常 特殊值 阻塞 超時
入列方法 add(e) offer(e) put(e) offer(e, time, unit)
出列方法 remove() poll() take poll(time, unit)
查看方法 element() peek()

ArrayBlockingQueue

基于數(shù)組實現(xiàn)的有界阻塞隊列:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /** 數(shù)組容器 */
    final Object[] items;
    /** 出列下標(biāo) */
    int takeIndex;
    /** 入列下標(biāo) */
    int putIndex;
    /** 元素個數(shù) */
    int count;
    /** 重入鎖 */
    final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;

    public ArrayBlockingQueue(int capacity) {}
    public ArrayBlockingQueue(int capacity, boolean fair) {}
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {}
    ... ... 
}

ArrayBlockingQueue沒有默認(rèn)長度帜慢,初始化的時候必須指定笼裳。
fair參是用來設(shè)置重入鎖lock的公平性,重入鎖默認(rèn)是非公平鎖所以不能保證線程公平的訪問隊列粱玲」恚可以通過fair將重入鎖設(shè)置為公平鎖,但是會降低部分吞吐量抽减。
生產(chǎn)者線程和消費(fèi)者線程線程的協(xié)調(diào)工作是由兩個Condition完成的允青。

阻塞入列:

public void put(E e) throws InterruptedException {
    checkNotNull(e);//元素為空拋異常
    final ReentrantLock lock = this.lock;
    //加鎖,鎖響應(yīng)中斷
    lock.lockInterruptibly();
    try {
        //隊列已滿卵沉,入列線程在notFull上等待
        while (count == items.length){
            notFull.await();
        }
        //插入元素
        insert(e);
    } finally {
        lock.unlock();//釋放鎖
    }
}
private void insert(E x) {
    //加入到數(shù)組
    items[putIndex] = x;
    //inc() putIndex下標(biāo)等于capacity如果等于隊列長度返回0
    putIndex = inc(putIndex);
    ++count;//元素數(shù)量遞增
    //喚醒在notEmpty上等待的出列線程
    notEmpty.signal();
}

阻塞出列:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();//加鎖颠锉,響應(yīng)中斷
    try {
        //隊列已空法牲,出列線程在notEmpty上等待
        while (count == 0){
            notEmpty.await();
        }
        //出列
        return extract();
    } finally {
        lock.unlock();//解鎖
    }
}
private E extract() {
    //數(shù)組
    final Object[] items = this.items;
    //元素類型泛型轉(zhuǎn)換
    E x = this.<E>cast(items[takeIndex]);
    //置空下標(biāo)為takeIndex的元素
    items[takeIndex] = null;
    //inc() putIndex下標(biāo)等于capacity如果等于隊列長度返回0
    takeIndex = inc(takeIndex);
    --count;//元素個數(shù)遞減
    //喚醒在notFull上等待的入列線程
    notFull.signal();
    return x;
}

當(dāng)隊列滿時,入列的線程會阻塞在notFull上琼掠,當(dāng)有出列操作時喚醒notFull上等待的線程拒垃,隊列空時出列線程會阻塞在notEmpty上,當(dāng)有入列操作時喚醒在notEmpty上等待的線程瓷蛙,典型的生產(chǎn)者消費(fèi)者邏輯悼瓮,關(guān)鍵點在于線程的協(xié)調(diào)。

LinkedBlockingQueue

基于單向鏈表實現(xiàn)的有界阻塞隊列:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable{
    /** 內(nèi)部類 節(jié)點 */
    static class Node<E> {}
    /** 容量 */
    private final int capacity;
    /** 元素數(shù)量 計數(shù)器 */
    private final AtomicInteger count = new AtomicInteger(0);
    /** 頭節(jié)點 */
    private transient Node<E> head;
    /** 尾節(jié)點 */
    private transient Node<E> last;
    /** 出列鎖 */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** takeLock->condition */
    private final Condition notEmpty = takeLock.newCondition();
    /** 入列鎖 */
    private final ReentrantLock putLock = new ReentrantLock();
    /** putLock->condition */
    private final Condition notFull = putLock.newCondition();
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    public LinkedBlockingQueue(int capacity) {}
    public LinkedBlockingQueue(Collection<? extends E> c) {}
    ... ...
}

LinkedBlockingQueue在初始化時可以不指定長度速挑,默認(rèn)長為整數(shù)的最大值 2147483647 谤牡。

使用了兩把鎖對對出列和入列進(jìn)行了鎖分離副硅,takeLock出列鎖姥宝、putLock入列鎖。

LinkedBlockingQueue沒有公平性設(shè)置恐疲,只能使用非公平鎖腊满。

阻塞入列:

public void put(E e) throws InterruptedException {
    if (e == null)//入列元素不能為空
        throw new NullPointerException();
    int c = -1;//計數(shù)
    Node<E> node = new Node(e);//構(gòu)造節(jié)點
    //入列鎖
    final ReentrantLock putLock = this.putLock;
    //元素數(shù)量
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();//加鎖,響應(yīng)中斷
    try {
        //隊列已滿,入列線程在notFull上等待
        while (count.get() == capacity) {
            notFull.await();
        }
        //入列培己,在尾節(jié)點后鏈入node
        enqueue(node);
        //獲取元素數(shù)量 后加1
        c = count.getAndIncrement();
        //如果隊列還沒滿
        //喚醒在notFull等待的入列線程碳蛋,表示可繼續(xù)入列
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();//解鎖
    }
    //原本為空的隊列,即使加入一個元素
    //喚醒在notEmpty上等待的出列線程
    if (c == 0)
        signalNotEmpty();
    }
//鏈入尾節(jié)點
private void enqueue(Node<E> node) {
    last = last.next = node;
}
//喚醒在notEmpty上等待的出列線程
private void signalNotEmpty() {
    //出列鎖
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        //喚醒
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

阻塞出列:

public E take() throws InterruptedException {
    E x;
    int c = -1;
    //元素數(shù)量
    final AtomicInteger count = this.count;
    //出列鎖
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();//加鎖省咨,響應(yīng)中斷
    try {
        //隊列為空肃弟,出列線程在notEmpty上等待
        while (count.get() == 0) {
            notEmpty.await();
        }
        //出列
        x = dequeue();
        //獲取元素數(shù)量 后減1
        c = count.getAndDecrement();
        //出列之后,隊列還沒空零蓉,表示可繼續(xù)出列
        //喚醒在notEmpty等待的出列線程
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    //隊列有一個空位笤受,喚醒入列線程
    if (c == capacity)
        signalNotFull();
    return x;
}
//喚醒入列線程
private void signalNotFull() {
    //入列鎖
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

當(dāng)隊列滿時,入列的線程會阻塞在notFull上敌蜂,當(dāng)有出列操作時喚醒notFull上等待的線程箩兽,隊列空時出列線程會阻塞在notEmpty上,當(dāng)有入列操作時喚醒在notEmpty上等待的線程章喉,入列和出列使用了兩把鎖汗贫,喚醒notFull時要在putLock監(jiān)視范圍,喚醒notEmpty要做takeLock的監(jiān)視范圍秸脱。

ArrayBlockingQueue和LinkedBlockingQueue的差異

  1. ArrayBlockingQueue使用循環(huán)數(shù)組必須指定容量落包,LinkedBlockingQueue使用鏈表可以不指定容量,能預(yù)判隊列容量使用ArrayBlockingQueue可以更有效的利用內(nèi)存摊唇。LinkedBlockingQueue如果沒有指定容量妥色,過快大批量的入列有可能會導(dǎo)致內(nèi)存溢出。
  2. ArrayBlockingQueue可以設(shè)置為公平鎖遏片,使得線程能夠公平地訪問隊列嘹害。
  3. LinkedBlockingQueue使用鎖分離撮竿,入列和出列使用不同的鎖,之間互不干擾笔呀,減少了鎖爭用的次數(shù)幢踏,吞吐量比ArrayBlockingQueue更高。

PriorityBlockingQueue

基于數(shù)組實現(xiàn)的無界阻塞隊列许师,因為是無界隊列當(dāng)數(shù)組長度不夠時會自動擴(kuò)容所以put方法不會阻塞房蝉,但是隊列空時進(jìn)行take會阻塞。

PriorityBlockingQueue不再是FIFO微渠,而是根據(jù)元素的排序來確定元素出列的優(yōu)先級搭幻,元素必須實現(xiàn)Comparable接口。

LinkedBlockingDeque

基于鏈表實現(xiàn)的組成的雙向阻塞隊列逞盆,同時支持FIFO和FILO兩種操作方式檀蹋。

SynchronousQueue

SynchronousQueue是一個沒有容器的隊列,所謂沒有容器就是指它不能存儲任何元素云芦。不像ArrayBlockingQueue或LinkedBlockingQueue如果隊列沒有滿俯逾,生產(chǎn)線程入列之后就返回了,而SynchronousQueue不同舅逸,因為它沒有緩沖存儲區(qū)所以生產(chǎn)者線程入列之后會一直阻塞桌肴,直到有消費(fèi)線程取走數(shù)據(jù)。

就像一手交錢一手交貨的過程琉历,賣方拿著貨物不松手坠七,直到買房把錢給他,買方也是一樣的拿著錢不松手旗笔,直到賣方把貨物給他彪置。
所以SynchronousQueue從線程的角度看是一個配對的過程一個生成線程必須匹配一個消費(fèi)線程,一個消費(fèi)線程必須匹配一個生成線程换团,從數(shù)據(jù)的角度看是一個數(shù)據(jù)傳遞的過程生成線程將數(shù)據(jù)傳遞給消費(fèi)線程悉稠。

SynchronousQueue:

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable{
    /** Transferer */
    abstract static class Transferer {
        abstract Object transfer(Object e, boolean timed, long nanos);
    }
    /** Transferer子類 棧 */
    static final class TransferStack extends Transferer {}
        /** Transferer子類 隊列 */
    static final class TransferQueue extends Transferer {}
        /** transferer實例 */
    private transient volatile Transferer transferer;
        /** 默認(rèn)構(gòu)造 */
    public SynchronousQueue() {
        this(false);
    }
        /** fair 公平性參數(shù) */
    public SynchronousQueue(boolean fair) {
         transferer = fair ? new TransferQueue() : new TransferStack();
    }
    ... ...
}

SynchronousQueue可以設(shè)置公平性策略,默認(rèn)是非公平隊列艘包。
Transferer是核心設(shè)置的猛,實現(xiàn)線程數(shù)據(jù)傳遞的基礎(chǔ),公平性隊列用TransferQueue新入列的節(jié)點會在隊尾或者和隊頭節(jié)點批量想虎,非公平隊列用TransferStack新入列的節(jié)點會在棧頂進(jìn)行匹配卦尊。因為沒有緩沖存儲所以容器類的常用方法size()、contains(Object o)舌厨、remove(Object o)等對其來說根本沒用岂却。

TransferStack:

static final class TransferStack extends Transferer {
    /** 消費(fèi)端 consumer */
    static final int REQUEST = 0;
    /** 生成端 producer */
    static final int DATA = 1;
    /** 匹配 */
    static final int FULFILLING = 2;
    /** true 匹配中 */
    static boolean isFulfilling(int m) {
        return (m & FULFILLING) != 0;
    }
    /** TransferStacks節(jié)點 */
    static final class SNode {}
    /** 棧頂節(jié)點 */
    volatile SNode head;
    /** CAS設(shè)置棧頂 */
    boolean casHead(SNode h, SNode nh) {}
    /** 構(gòu)造節(jié)點 */
    static SNode snode(SNode s, Object e, SNode next, int mode) {}
    /** 交換方法 */
    Object transfer(Object e, boolean timed, long nanos) {}
    /** 線程等待 */
    SNode awaitFulfill(SNode s, boolean timed, long nanos){}
    /** 是否自旋 */
    boolean shouldSpin(SNode s){}
    /** 將節(jié)點從棧清除 */
    void clean(SNode s){}
    //Unsafe 相關(guān)初始化 ... ..
}

SNode :

static final class SNode {
    volatile SNode next; //后繼節(jié)點
    volatile SNode match; //匹配節(jié)點
    volatile Thread waiter; //等待線程
    Object item; //生產(chǎn)端:data;消費(fèi)端:null
    int mode;//模式:DATA/REQUEST/FULFILLING
    SNode(Object item) {
        this.item = item;
    }
    /** CAS設(shè)置后繼節(jié)點 */
    boolean casNext(SNode cmp, SNode val) {
        return cmp == next 
            && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }
    /** 當(dāng)前節(jié)點和s節(jié)點匹配,匹配成功喚醒當(dāng)前節(jié)點等待的線程 */
    boolean tryMatch(SNode s) {
        //當(dāng)前節(jié)點的match設(shè)置為s
        if (match == null  
        &&UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
            Thread w = waiter;
            if (w != null) { //waiter不為空 喚醒。
                waiter = null;
                LockSupport.unpark(w);
            }
            return true;
        }
        //如果match == s則說明已經(jīng)匹配成功
        return match == s;
    }
    //取消 將match設(shè)置為自身
    void tryCancel() {
        UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
    }
    //是否已取消
    boolean isCancelled() {
        return match == this;
    }
    // Unsafe mechanics ... ...
}

TransferStack的transfer方法:

/** Puts 和 takes 數(shù)據(jù)交換 */
Object transfer(Object e, boolean timed, long nanos) {
    SNode s = null;
    // 0消費(fèi)端躏哩,1生產(chǎn)端
    int mode = (e == null) ? REQUEST : DATA;
    for (;;) {
        SNode h = head;// 頭節(jié)點
        // 棧為空,當(dāng)前線程進(jìn)入等待
        // 或者棧不為空署浩,但是棧頂元素模式與當(dāng)前線程模式相同
        // 即同為生成著或消費(fèi)者,比如線程put線程
        // 當(dāng)前線程進(jìn)入等待
        if (h == null || h.mode == mode) {
            if (timed && nanos <= 0) { // 不等待
                //h不為空并被取消
                if (h != null && h.isCancelled())
                    //出棧
                    casHead(h, h.next);
                else
                    return null;
            // 壓棧 更新棧頂為s
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                // 進(jìn)入等待扫尺,等待一個互補(bǔ)的節(jié)點進(jìn)行匹配
                SNode m = awaitFulfill(s, timed, nanos);
                // 取消的時候?qū)atch設(shè)置成了this
                // 所以m==s即被取消筋栋,清除,返回正驻。
                if (m == s) {
                    clean(s);
                    return null;
                }
                //已經(jīng)完成了批量
                if ((h = head) != null && h.next == s) {
                    casHead(h, s.next);
                }
                // 如果是消費(fèi)者則返回生成值的值
                // 如果是生產(chǎn)者返回自身的值
                return (mode == REQUEST) ? m.item : s.item;
            }
        // 棧頂和當(dāng)前節(jié)點互補(bǔ)即模式不同弊攘,進(jìn)入匹配邏輯
        } else if (!isFulfilling(h.mode)) {
            if (h.isCancelled()) {// 已取消,出棧姑曙,置換棧頂為h.next
                casHead(h, h.next);
            //構(gòu)造當(dāng)前“正在匹配"狀態(tài)的節(jié)點s
            } else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) {
                for (;;) { // 循環(huán)直到找到一個可以匹配的節(jié)點
                    SNode m = s.next; // m即與s匹配的節(jié)點
                    //m==null說明棧s之后無元素襟交,可能被其他線程匹配了。
                    //s出棧伤靠,s置空捣域,進(jìn)行最外層的循環(huán).
                    if (m == null) { 
                        casHead(s, null); 
                        s = null; 
                        break; 
                    }
                    //mn為后備的棧頂
                    //匹配成功,將s和m同時出棧醋界,mn為棧頂
                    SNode mn = m.next;
                    if (m.tryMatch(s)) {
                        //匹配成功竟宋,mn設(shè)置為棧頂
                        casHead(s, mn);
                        // 如果是消費(fèi)者則返回生成值的值
                        // 如果是生產(chǎn)者返回自身的值
                        return (mode == REQUEST) ? m.item : s.item;
                    } else
                        // 設(shè)置匹配失敗提完,則說明m已經(jīng)被其他節(jié)點匹配了
                        s.casNext(m, mn); // help unlink
                }
            }
        } else { // 非棧頂匹配形纺,邏輯與棧頂匹配一致
            SNode m = h.next; // m is h's match
            if (m == null) // waiter is gone
                casHead(h, null); // pop fulfilling node
            else {
                SNode mn = m.next;
                if (m.tryMatch(h)) // help match
                    casHead(h, mn); // pop both h and m
                else // lost match
                    h.casNext(m, mn); // help unlink
            }
        }
    }
}
// 等待
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    long lastTime = timed ? System.nanoTime() : 0;
    // 當(dāng)前線程
    Thread w = Thread.currentThread();
    // 頭節(jié)點
    SNode h = head;
    // 自旋次數(shù)
    int spins = (shouldSpin(s) ? 
        (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        if (w.isInterrupted()) {// 當(dāng)前線程中斷
            s.tryCancel();//取消節(jié)點
        }
        SNode m = s.match;
        if (m != null) {//匹配成功,返回匹配的節(jié)點
            return m;
        }
        // 超時
        if (timed) {
            long now = System.nanoTime();
            nanos -= now - lastTime;
            lastTime = now;
            if (nanos <= 0) {
                s.tryCancel();//取消
                continue;
            }
        }
        // 自旋隶症,直到spins==0缺亮,進(jìn)入等待景用,自旋的目的是為了減少線程掛起的次數(shù)
        // 如果線程掛起前,匹配線程來了脂新,則線程不需要掛起
        if (spins > 0) {
            spins = shouldSpin(s) ? (spins - 1) : 0;
        }
        // 設(shè)置節(jié)點的等待線程
        else if (s.waiter == null) {
            s.waiter = w; // establish waiter so can park next iter
        }
        // 掛起操作
        else if (!timed) {
            LockSupport.park(this);
        } else if (nanos > spinForTimeoutThreshold) {
            LockSupport.parkNanos(this, nanos);
        }
    }
}

TransferStack的transfer大致邏輯是:線程A進(jìn)行put(A),此時棧為空粗梭,將節(jié)點A入棧争便,線程A掛起,棧頂為節(jié)點A断医。線程B進(jìn)行put(B),和節(jié)點A模式一樣滞乙,同為DATA,將節(jié)點B入棧線程B掛起,棧頂為節(jié)點B鉴嗤。線程C進(jìn)行take(),和棧頂B模式互補(bǔ)斩启,將節(jié)點C的狀態(tài)設(shè)置為FULFILLING入棧,開始進(jìn)行匹配操作醉锅,匹配成則線程B被喚醒兔簇、節(jié)點B和節(jié)點C出棧,并返回節(jié)點B的值。

如果節(jié)點B和節(jié)點C正在匹配中垄琐,即棧頂節(jié)點的狀態(tài)為ULFILLING边酒,線程D進(jìn)行take(),那么線程D將幫助節(jié)點B和節(jié)點C完成匹配和出棧,自己在留在下一輪循環(huán)中匹配狸窘。

線程A是先入棧的反而后匹配甚纲,所以TransferStack的匹配過程是非公平的。TransferQueue則是在隊尾入列朦前,從隊列頭匹配介杆,能保證先入列的線程可以盡早的得到匹配,阻塞和匹配邏輯和上述差不多韭寸,只是入列過程不一樣春哨,不再贅述。

SynchronousQueue不能使用在緩沖場景恩伺,但是非常適合用在傳遞場景赴背,由于其阻塞過程沒有鎖的競爭吞吐量高于ArrayBlockingQueue和LinkedBlockingQueue。

LinkedTransferQueue

LinkedTransferQueue基于鏈表的無界阻塞隊列晶渠。同時它還實現(xiàn)了TransferQueue接口凰荚,這是一個在JDK1.7中新增的接口,接口中的transfer系列方法會使生產(chǎn)者一直阻塞直到所添加到隊列的元素被某一個消費(fèi)者所消費(fèi)褒脯。和SynchronousQueue中實現(xiàn)的TransferQue意思差不多便瑟。

與SynchronousQueue相比LinkedTransferQueue的應(yīng)用更廣泛,可以使用put/take方法用作緩沖番川,還可以使用transfer方法用作傳遞到涂,可以看做是ConcurrentLinkedQueue、SynchronousQueue(公平模式)和LinkedBlockingQueue的超集颁督。

ConcurrentLinkedQueue

使用單向鏈表構(gòu)造的無界非阻塞隊列:

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
      implements Queue<E>, java.io.Serializable {
    //內(nèi)部類 節(jié)點
    private static class Node<E> {
        volatile E item;
        volatile Node<E> next;
        ... ...
    }
    //頭節(jié)點
    private transient volatile Node<E> head;
    //尾節(jié)點
    private transient volatile Node<E> tail;
    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }
    public ConcurrentLinkedQueue(Collection<? extends E> c){}
    ... ...
}

入列操作:

public boolean offer(E e) {
    checkNotNull(e);// 檢查e是否為空践啄,為空直接拋異常
    // 構(gòu)造新節(jié)點
    final Node<E> newNode = new Node<E>(e);
    // 循環(huán),移動p節(jié)點沉御、確保CAS操作成功
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;// p的后繼節(jié)點
        if (q == null) {// q為空屿讽,說明p為尾節(jié)點
            // CAS更新p的next節(jié)點為新入節(jié)點
            if (p.casNext(null, newNode)) {
                // 當(dāng)p為尾節(jié)點時只進(jìn)行了p.casNext()操作,
                // 并沒有移動尾節(jié)點吠裆。p和t中間至少隔了一個節(jié)點伐谈。
                if (p != t) {
                    // CAS 更新尾節(jié)點
                    casTail(t, newNode);
                }
                return true;
            }
        } else if (p == q) {// 尾節(jié)點被出列
            p = (t != (t = tail)) ? t : head;
        } else {// p節(jié)點后移
            p = (p != t && t != (t = tail)) ? t : q;
        }
    }
}

出列操作:

public E poll() {
    restartFromHead: for (;;) {//循環(huán)體,移動p節(jié)點硫痰、確保CAS成功
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
            // 頭節(jié)點的內(nèi)容不為空衩婚,將其置空
            if (item != null && p.casItem(item, null)) {
                // 出列時,進(jìn)行了p.casItem()但并沒有移動頭節(jié)點
                // p節(jié)點和h節(jié)點中間至少隔了一個節(jié)點
                if (p != h) {
                    // 設(shè)置頭節(jié)點
                    updateHead(h, ((q = p.next) 
                                    != null) ? q : p);
                }
                return item;
            } else if ((q = p.next) == null) {//空隊為空
                updateHead(h, p);
                return null;
            } else if (p == q) {//從隊列頭重新開始
                continue restartFromHead;
            } else {// p后移
                p = q;
            }
        }
    }
}

入列操作offer(E e)只做了兩件事情效斑,第一是將新節(jié)點鏈到隊列尾部非春,第二是定位尾節(jié)點將其指向新入列的節(jié)點,這兩個操作都是使用CAS方式,出列poll()操作也類似奇昙。入列和出列都只需要動用一個節(jié)點护侮,并且是無鎖的,所以ConcurrentLinkedQueue在并發(fā)環(huán)境下出列和入列效率極高储耐。

獲取長度的方法羊初,需要遍歷整個鏈表,效率極低什湘,所以慎用长赞。如果想實時獲取列表長度,不妨使用一個AtomicInteger在入列和出列時記錄下闽撤,好過整表遍歷得哆。

public int size() {
    int count = 0;
    //從頭節(jié)點開始遍歷,p!=null說明p還沒到隊列尾
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE)
            break;
    return count;
}

ConcurrentLinkedDeque

使用雙向鏈表構(gòu)造的無界非阻塞雙端隊列哟旗,雙端隊列中的元素可以從兩端彈出贩据,入列和出列操作可以在表的兩端進(jìn)行,支持FIFO和FILO兩種出列模式闸餐。

盡管看起來比隊列更靈活饱亮,但實際上在應(yīng)用中遠(yuǎn)不及隊列有用。

碼字不易舍沙,轉(zhuǎn)載請保留原文連接http://www.reibang.com/p/0120984dea81

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末近上,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子场勤,更是在濱河造成了極大的恐慌戈锻,老刑警劉巖歼跟,帶你破解...
    沈念sama閱讀 217,907評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件和媳,死亡現(xiàn)場離奇詭異,居然都是意外死亡哈街,警方通過查閱死者的電腦和手機(jī)留瞳,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,987評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來骚秦,“玉大人她倘,你說我怎么就攤上這事∽鞴浚” “怎么了硬梁?”我有些...
    開封第一講書人閱讀 164,298評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長胞得。 經(jīng)常有香客問我荧止,道長,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,586評論 1 293
  • 正文 為了忘掉前任跃巡,我火速辦了婚禮危号,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘素邪。我一直安慰自己外莲,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,633評論 6 392
  • 文/花漫 我一把揭開白布兔朦。 她就那樣靜靜地躺著偷线,像睡著了一般。 火紅的嫁衣襯著肌膚如雪沽甥。 梳的紋絲不亂的頭發(fā)上淋昭,一...
    開封第一講書人閱讀 51,488評論 1 302
  • 那天,我揣著相機(jī)與錄音安接,去河邊找鬼翔忽。 笑死,一個胖子當(dāng)著我的面吹牛盏檐,可吹牛的內(nèi)容都是我干的歇式。 我是一名探鬼主播,決...
    沈念sama閱讀 40,275評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼胡野,長吁一口氣:“原來是場噩夢啊……” “哼材失!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起硫豆,我...
    開封第一講書人閱讀 39,176評論 0 276
  • 序言:老撾萬榮一對情侶失蹤龙巨,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后熊响,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體旨别,經(jīng)...
    沈念sama閱讀 45,619評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,819評論 3 336
  • 正文 我和宋清朗相戀三年汗茄,在試婚紗的時候發(fā)現(xiàn)自己被綠了秸弛。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,932評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡洪碳,死狀恐怖递览,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情瞳腌,我是刑警寧澤绞铃,帶...
    沈念sama閱讀 35,655評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站嫂侍,受9級特大地震影響儿捧,放射性物質(zhì)發(fā)生泄漏冷离。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,265評論 3 329
  • 文/蒙蒙 一纯命、第九天 我趴在偏房一處隱蔽的房頂上張望西剥。 院中可真熱鬧,春花似錦亿汞、人聲如沸瞭空。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,871評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽咆畏。三九已至,卻和暖如春吴裤,著一層夾襖步出監(jiān)牢的瞬間旧找,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,994評論 1 269
  • 我被黑心中介騙來泰國打工麦牺, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留钮蛛,地道東北人。 一個月前我還...
    沈念sama閱讀 48,095評論 3 370
  • 正文 我出身青樓剖膳,卻偏偏與公主長得像魏颓,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子吱晒,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,884評論 2 354

推薦閱讀更多精彩內(nèi)容