10. LinkedBlockingQueue

LinkedBlockingQueue類實(shí)現(xiàn)了BlockingQueue接口邻储。閱讀BlockingQueue文本以獲取有關(guān)的更多信息蹄皱。

LinkedBlockingQueue在內(nèi)部將元素存儲在鏈接結(jié)構(gòu)(鏈接節(jié)點(diǎn))中。如果需要菌湃,該鏈接結(jié)構(gòu)可以具有一個上限问拘。如果未指定上限,則使用Integer.MAX_VALUE作為上限惧所。

LinkedBlockingQueue內(nèi)部將元素以FIFO(先入先出)次序存儲骤坐。隊列的頭部是已在隊列中的時間最長的元素,隊列的尾部是已在隊列中的時間最短的元素下愈。

以下是如何實(shí)例化和使用LinkedBlockingQueue

BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();
BlockingQueue<String> bounded   = new LinkedBlockingQueue<String>(1024);

bounded.put("Value");

String value = bounded.take();

源碼

LinkedBlockingQueue內(nèi)部使用了一個單向鏈表纽绍,同時它提供了兩個鎖,一個用于獲取并刪除元素势似,一個用于增加元素拌夏。count字段使用原子變量,避免修改它時需要同時獲取兩個鎖履因。

static class Node<E> {
    E item;

    /**
     * 下面中的一個:
     * - 真實(shí)的后繼節(jié)點(diǎn)
     * - 這個節(jié)點(diǎn)本身障簿,此時原后繼節(jié)點(diǎn)現(xiàn)在是head.next,即第一個元素
     * - null, 意味沒有后繼節(jié)點(diǎn)栅迄,此節(jié)點(diǎn)是隊列最后一個節(jié)點(diǎn)
     */
    Node<E> next;

    Node(E x) { item = x; }
}

private final int capacity;

/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

/**
 * Head of linked list.
 * Invariant: head.item == null
 */
transient Node<E> head;

/**
 * Tail of linked list.
 * Invariant: last.next == null
 */
private transient Node<E> last;

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

增加操作

注意進(jìn)行增加操作時站故,只對putLock加鎖,如果還對takeLock也進(jìn)行加鎖霞篡,那么就會影響性能世蔗。同時,為了彌補(bǔ)此方法帶來的后果朗兵,count使用原子變量污淋,進(jìn)行CAS更新,防止數(shù)據(jù)不一致余掖。

為了提升性能寸爆,在增加元素成功后礁鲁,如果隊列還沒有滿,那么便喚醒其他因隊列滿而被阻塞的插入線程赁豆。

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        // 注意即使count沒有被鎖保護(hù)仅醇,它依然可以被用作等待條件
        // 判定。因為此時count只會被減少(putLock已加鎖)魔种,如果容量
        // 改變析二,會被喚醒。count在其他地方的使用也與此相似节预。

        // 隊列已滿叶摄,阻塞自己
        while (count.get() == capacity) {
            notFull.await();
        }
        // 插入隊列中
        enqueue(node);
        // CAS更新count值
        c = count.getAndIncrement();
        // 如果隊列沒滿,喚醒其他等待插入的線程
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // 如果隊列原來是空隊列安拟,喚醒等待提取元素的線程
    if (c == 0)
        signalNotEmpty();
}

private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    // 先加鎖蛤吓,才能調(diào)用對應(yīng)Condtion的signal()方法
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    // 隊列已滿,返回false
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        // 等待-超時機(jī)制
        while (count.get() == capacity) {
            if (nanos <= 0L)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}

刪除操作

刪除操作與增加操作一樣糠赦。

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 當(dāng)隊列為空会傲,阻塞自己
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 將頭節(jié)點(diǎn)出隊
        x = dequeue();
        c = count.getAndDecrement();
        // 如果隊列還有元素,喚醒其他等待提取元素的線程
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // 如果原本隊列是滿的拙泽,喚醒增加線程淌山,因為現(xiàn)在元素已經(jīng)被取出,隊列不滿
    if (c == capacity)
        signalNotFull();
    return x;
}

private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;

    // 頭節(jié)點(diǎn)為空奔滑,其中不存儲元素
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            if (nanos <= 0L)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

// 刪除一個指定元素
public boolean remove(Object o) {
    if (o == null) return false;
    // 將兩個鎖全部加鎖
    fullyLock();
    try {
        for (Node<E> pred = head, p = pred.next;
             p != null;
             pred = p, p = p.next) {
            if (o.equals(p.item)) {
                // 從隊列中移除此節(jié)點(diǎn)
                unlink(p, pred);
                return true;
            }
        }
        return false;
    } finally {
        // 釋放全部兩個鎖
        fullyUnlock();
    }
}

void unlink(Node<E> p, Node<E> pred) {
    // assert putLock.isHeldByCurrentThread();
    // assert takeLock.isHeldByCurrentThread();
    // p.next沒有被設(shè)置為null艾岂,為了保證迭代器遍歷到p時繼續(xù)工作,
    // 保證弱一致性
    p.item = null;
    pred.next = p.next;
    if (last == p)
        last = pred;
    if (count.getAndDecrement() == capacity)
        notFull.signal();
}

訪問操作

public E peek() {
    // 隊列為空朋其,返回null
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        // 返回第一個元素
        return (count.get() > 0) ? head.next.item : null;
    } finally {
        takeLock.unlock();
    }
}

其他操作

public void clear() {
    fullyLock();
    try {
        for (Node<E> p, h = head; (p = h.next) != null; h = p) {
            // 使得next指向自己
            h.next = h;
            // 解除對元素實(shí)體的引用
            p.item = null;
        }
        head = last;
        // assert head.item == null && head.next == null;
        // 如果原來隊列是滿的王浴,喚醒等待的插入線程
        if (count.getAndSet(0) == capacity)
            notFull.signal();
    } finally {
        fullyUnlock();
    }
}


public int drainTo(Collection<? super E> c) {
    return drainTo(c, Integer.MAX_VALUE);
}

public int drainTo(Collection<? super E> c, int maxElements) {
    Objects.requireNonNull(c);
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    boolean signalNotFull = false;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        // 獲取當(dāng)前隊列中的元素數(shù)量
        int n = Math.min(maxElements, count.get());
        // count.get provides visibility to first n Nodes
        Node<E> h = head;
        int i = 0;
        try {
            // 將n個元素加入到指定集合中
            while (i < n) {
                Node<E> p = h.next;
                c.add(p.item);
                p.item = null;
                h.next = h;
                h = p;
                ++i;
            }
            return n;
        } finally {
            // Restore invariants even if c.add() threw
            if (i > 0) {
                // assert h.item == null;
                head = h;
                signalNotFull = (count.getAndAdd(-i) == capacity);
            }
        }
    } finally {
        takeLock.unlock();
        if (signalNotFull)
            signalNotFull();
    }
}

迭代器

LinkedBlockingQueue的迭代器與DelayQueue的不同,DelayQueue的迭代器與原組件沒有任何的一致性梅猿,而LinkedBlockingQueue的迭代器與內(nèi)部的鏈表保持了弱一致性氓辣。

注意它的next()方法,它會跳過內(nèi)容為null的節(jié)點(diǎn)袱蚓,回憶前面刪除操作中的remove(Object)方法钞啸,他沒有修改節(jié)點(diǎn)的next字段,如果修改了喇潘,迭代器就會無法正常工作体斩,而為了保證一致性,迭代器也需要跳過這個空節(jié)點(diǎn)颖低。

而它的forEachRemaining(Consumer<? super E> action)方法是分批次進(jìn)行處理的絮吵,每批64個元素,如果數(shù)量小于64忱屑,那就使用此數(shù)量蹬敲。

private class Itr implements Iterator<E> {
    private Node<E> next;           // 持有nextItem的節(jié)點(diǎn)
    private E nextItem;             // 下一個進(jìn)行處理的元素
    private Node<E> lastRet;        // 上一個返回的元素暇昂,即當(dāng)前正在使用的
    private Node<E> ancestor;       // Helps unlink lastRet on remove()

    Itr() {
        fullyLock();
        try {
            // 保存第一個元素
            if ((next = head.next) != null)
                nextItem = next.item;
        } finally {
            fullyUnlock();
        }
    }

    public boolean hasNext() {
        return next != null;
    }

    public E next() {
        Node<E> p;
        if ((p = next) == null)
            throw new NoSuchElementException();
        lastRet = p;
        E x = nextItem;
        fullyLock();
        try {
            E e = null;
            // 注意此處,遇到空節(jié)點(diǎn)會跳過去訪問下一個節(jié)點(diǎn)
            for (p = p.next; p != null && (e = p.item) == null; )
                p = succ(p);
            next = p;
            nextItem = e;
        } finally {
            fullyUnlock();
        }
        return x;
    }
    
    Node<E> succ(Node<E> p) {
        // 正常出隊的元素next字段會指向自己
        if (p == (p = p.next))
            p = head.next;
        return p;
    }
    
    public void forEachRemaining(Consumer<? super E> action) {
        // A variant of forEachFrom
        Objects.requireNonNull(action);
        Node<E> p;
        if ((p = next) == null) return;
        lastRet = p;
        next = null;
        final int batchSize = 64;
        Object[] es = null;
        int n, len = 1;
        do {
            fullyLock();
            try {
                if (es == null) {
                    p = p.next;
                    // 獲取真正存在的元素的數(shù)量伴嗡,如果多于64急波,分批進(jìn)行,一批為64個
                    for (Node<E> q = p; q != null; q = succ(q))
                        if (q.item != null && ++len == batchSize)
                            break;
                    es = new Object[len];
                    es[0] = nextItem;
                    nextItem = null;
                    n = 1;
                } else
                    n = 0;
                // n為1的使用只因為p=p.next瘪校,經(jīng)過此步后p已經(jīng)不是首元素澄暮,
                // 而是第二個元素。而后面批次的插入直接從0開始即可
                // 將元素放入數(shù)組中
                for (; p != null && n < len; p = succ(p))
                    if ((es[n] = p.item) != null) {
                        lastRet = p;
                        n++;
                    }
            } finally {
                fullyUnlock();
            }
            // 分別調(diào)用accept方法
            for (int i = 0; i < n; i++) {
                @SuppressWarnings("unchecked") E e = (E) es[i];
                action.accept(e);
            }
        } while (n > 0 && p != null);
    }

    public void remove() {
        // 獲取當(dāng)前元素
        Node<E> p = lastRet;
        if (p == null)
            throw new IllegalStateException();
        lastRet = null;
        fullyLock();
        try {
            if (p.item != null) {
                if (ancestor == null)
                    ancestor = head;
                // 獲取p的前驅(qū)結(jié)點(diǎn)
                ancestor = findPred(p, ancestor);
                // 從鏈表中刪除結(jié)點(diǎn)p
                unlink(p, ancestor);
            }
        } finally {
            fullyUnlock();
        }
    }
}

測試:

import org.junit.Test;

import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;

public class LinkedBlockingQueueTest {
    private static LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();

    @Test
    public void test() {
        queue.offer("1");
        queue.offer("2");
        queue.offer("3");
        queue.offer("4");

        Iterator<String> itr = queue.iterator();
        queue.remove("3");
        itr.forEachRemaining(System.out::println);
    }
}

輸出如下:

1
2
4

核心要點(diǎn)

  1. 內(nèi)部使用一個單向鏈表阱扬,以FIFO順序存儲
  2. 可以在鏈表兩頭同時進(jìn)行操作赏寇,所以使用兩個鎖分別保護(hù)
  3. 插入線程在執(zhí)行完操作后如果隊列未滿會喚醒其他等待插入的線程,同時隊列非空還會喚醒等待獲取元素的線程价认;提取線程同理。
  4. 迭代器與單向鏈表保持弱一致性自娩,調(diào)用remove(T)方法刪除一個元素后用踩,不會解除其對下一個結(jié)點(diǎn)的next引用,否則迭代器將無法工作忙迁。
  5. 迭代器的forEachRemaining(Consumer<? super E> action)以64個元素為一批進(jìn)行操作
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末脐彩,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子姊扔,更是在濱河造成了極大的恐慌惠奸,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,820評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件恰梢,死亡現(xiàn)場離奇詭異佛南,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)嵌言,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,648評論 3 399
  • 文/潘曉璐 我一進(jìn)店門嗅回,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人摧茴,你說我怎么就攤上這事绵载。” “怎么了苛白?”我有些...
    開封第一講書人閱讀 168,324評論 0 360
  • 文/不壞的土叔 我叫張陵娃豹,是天一觀的道長。 經(jīng)常有香客問我购裙,道長懂版,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,714評論 1 297
  • 正文 為了忘掉前任缓窜,我火速辦了婚禮定续,結(jié)果婚禮上谍咆,老公的妹妹穿的比我還像新娘。我一直安慰自己私股,他們只是感情好摹察,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,724評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著倡鲸,像睡著了一般供嚎。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上峭状,一...
    開封第一講書人閱讀 52,328評論 1 310
  • 那天克滴,我揣著相機(jī)與錄音,去河邊找鬼优床。 笑死劝赔,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的胆敞。 我是一名探鬼主播着帽,決...
    沈念sama閱讀 40,897評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼移层!你這毒婦竟也來了仍翰?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,804評論 0 276
  • 序言:老撾萬榮一對情侶失蹤观话,失蹤者是張志新(化名)和其女友劉穎予借,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體频蛔,經(jīng)...
    沈念sama閱讀 46,345評論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡灵迫,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,431評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了帽驯。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片龟再。...
    茶點(diǎn)故事閱讀 40,561評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖尼变,靈堂內(nèi)的尸體忽然破棺而出利凑,到底是詐尸還是另有隱情,我是刑警寧澤嫌术,帶...
    沈念sama閱讀 36,238評論 5 350
  • 正文 年R本政府宣布哀澈,位于F島的核電站,受9級特大地震影響度气,放射性物質(zhì)發(fā)生泄漏割按。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,928評論 3 334
  • 文/蒙蒙 一磷籍、第九天 我趴在偏房一處隱蔽的房頂上張望适荣。 院中可真熱鬧现柠,春花似錦、人聲如沸弛矛。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,417評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽丈氓。三九已至周循,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間万俗,已是汗流浹背湾笛。 一陣腳步聲響...
    開封第一講書人閱讀 33,528評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留闰歪,地道東北人嚎研。 一個月前我還...
    沈念sama閱讀 48,983評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像库倘,于是被迫代替她去往敵國和親嘉赎。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,573評論 2 359

推薦閱讀更多精彩內(nèi)容

  • 第一章:Java程序設(shè)計概述 Java和C++最大的不同在于Java采用的指針模型可以消除重寫內(nèi)存和損壞數(shù)據(jù)的可能...
    loneyzhou閱讀 1,260評論 1 7
  • 在一個方法內(nèi)部定義的變量都存儲在棧中于樟,當(dāng)這個函數(shù)運(yùn)行結(jié)束后,其對應(yīng)的棧就會被回收拇囊,此時迂曲,在其方法體中定義的變量將不...
    Y了個J閱讀 4,420評論 1 14
  • 文/陳澤坤 特別喜歡木心的《素履之往》里的一句話路捧,“寂寞的是,在生時传黄,沒有一個朋友杰扫。更寂寞的是,被理解的膘掰,都不可能...
    陳澤坤閱讀 2,634評論 16 22
  • 對于我來說章姓,米粉真是個不變的嗜好,走到哪吃到哪识埋。 在老家小鎮(zhèn)上凡伊,街頭菜市場那個棚棚底下攤位上的肉絲粉,再加上一根油...
    小小綠兒閱讀 586評論 0 1
  • 關(guān)于寫作窒舟,也許是你很頭疼的問題系忙,你通常會遇到以下的問題: 不知道別人喜歡看什么; 憋了半天最后連寫什么主題都還想不...
    阿MI閱讀 372評論 0 0