多線程之阻塞隊列

阻塞隊列

BlockingQueue

隊列主要有兩種:FIFO(先進先出)唱逢、LIFO(后進先出)。
再多線程環(huán)境中,隊列很容實現(xiàn)數(shù)據(jù)共享迂求,我們常用的"生產(chǎn)者"、"消費者"模型就可以通過隊列來傳遞數(shù)據(jù)達到數(shù)據(jù)共享晃跺。但是現(xiàn)實中揩局,大多數(shù)情況都是生產(chǎn)者產(chǎn)生消息的速度和消費的速度是不匹配的,就需要相應的對生產(chǎn)或者消費進行阻塞掀虎。當生產(chǎn)的消息積累到一定程度時凌盯,就需要對生產(chǎn)者就行阻塞,以便消費者將積累的消息進行消費烹玉。在concurrent包發(fā)布以前驰怎,在多線程環(huán)境下,我們每個程序員都必須去自己控制這些細節(jié)二打,尤其還要兼顧效率和線程安全县忌。BlockingQueue釋放了我們的雙手,他讓我們不用關系什么時候去阻塞继效,什么時候去喚醒線程症杏。

? 拋異常 返回false 阻塞 超時,拋異常
插入 add offer put offer(timeout)
移除 take remove ? poll(timeout)
檢查 ? contains ? ?

操作方法:

//--------添加 ----------
boolean add(E e);        //添加元素瑞信,加不了拋異常
boolean offer(E e);      //添加元素厉颤,加不了返回false
void put(E e) throws InterruptedException;  //添加元素,加不了一直阻塞
boolean offer(E e, long timeout, TimeUnit unit)
       throws InterruptedException; //添加元素喧伞,達到指定時間沒有加入拋異常
// -------移除-----------
boolean remove(Object o);
E poll(long timeout, TimeUnit unit)
       throws InterruptedException;
E take() throws InterruptedException;
// ------

常見的BlockingQueue

? 有界性 數(shù)據(jù)結(jié)構
ArrayBlockingQueue bounded 加鎖 ArrayList
LinkedBlockingQueue optionally-bounded 加鎖 LinkedList
DelayQueue unbounded 加鎖 heap
PriorityBlockingQueue unbounded 加鎖 heap
SynchronousQueue bounded 加鎖

1. ArrayBlockingQueue

基于數(shù)組實現(xiàn)的有界阻塞安全線程隊列走芋。
構造函數(shù)

public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        //初始化重入鎖
        lock = new ReentrantLock(fair);
        //初始化讀、寫等待隊列
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
       //初始化構造器
        this(capacity, fair);
        //獲取重入鎖
        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
           //初始化元素中的個數(shù)
            count = i;
           //插入元素的下標索引
            putIndex = (i == capacity) ? 0 : i;
        } finally {
           //釋放鎖
            lock.unlock();
        }
    }

相關屬性

final Object[] items;        //存放元素的數(shù)組
int takeIndex;                 //取元素的下標索引
int putIndex;                  //存元素的下標索引
int count;                      //數(shù)組中的元素個數(shù)
final ReentrantLock lock;   //數(shù)據(jù)讀取的可重入鎖
private final Condition notEmpty; //讀等待的隊列
private final Condition notFull;    //寫等待的隊列

核心函數(shù)
put

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
       //如果當前線程沒有中斷潘鲫,就將當前線程鎖定
        lock.lockInterruptibly();
        try {
            //當前隊列已經(jīng)滿了就一直等待
            while (count == items.length)
                notFull.await();
            //插入元素
            enqueue(e);
        } finally {
           //釋放鎖
            lock.unlock();
        }
    }

take

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

2. LinkedBlockingQueue

基于鏈表實現(xiàn)的阻塞隊列
構造函數(shù)

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

相關屬性

private final int capacity;           //元素個數(shù)
private final AtomicInteger count = new AtomicInteger();  //
transient Node<E> head;             //頭節(jié)點
private transient Node<E> last;     //尾節(jié)點
private final ReentrantLock takeLock = new ReentrantLock();  //讀可重入鎖
private final Condition notEmpty = takeLock.newCondition();  //讀等待隊列
private final ReentrantLock putLock = new ReentrantLock(); //寫可重入鎖
private final Condition notFull = putLock.newCondition();  //寫隊列

核心函數(shù)
put

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
       //獲取寫的可重入鎖
        final ReentrantLock putLock = this.putLock;
        //線程安全的原子操作類
        final AtomicInteger count = this.count;
        //判斷線程是否中斷
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                //如果加1后還小于當前容量翁逞,則喚醒一個等待的線程
                notFull.signal();
        } finally {
            //釋放鎖
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
3. DelayQueue

DelayQueue每次都是將元素加入排序隊列,以delay/過期時間為排序因素溉仑,將快過期的元素放在隊首挖函,取數(shù)據(jù)的時候每次都是先取快過期的元素。
構造方法

public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

相關屬性

private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();  //根據(jù)隊列里某些元素排序的有序隊列
private final Condition available = lock.newCondition();
private Thread leader = null;

核心函數(shù)
offer

public boolean offer(E e) {
        //獲取可重入鎖
        final ReentrantLock lock = this.lock;
        //加鎖
        lock.lock();
        try {
            //將元素加入優(yōu)先級隊列中
            q.offer(e);
           //如果當前元素為隊首浊竟,將leader=null怨喘,喚起其他線程
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
           //釋放鎖
            lock.unlock();
        }
    }

take

public E take() throws InterruptedException {
       //獲取可重入鎖
        final ReentrantLock lock = this.lock;
        //判斷當前線程是否中斷,沒有中斷就將當前線程鎖定
        lock.lockInterruptibly();
        try {
            //循環(huán)執(zhí)行
            for (;;) {
                E first = q.peek();
                //如果隊首為空振定,阻塞當前線程
                if (first == null)
                    available.await();
                else {
                    //獲取當前元素過期時間
                    long delay = first.getDelay(NANOSECONDS);
                    //小于等于0 直接彈出
                    if (delay <= 0)
                        return q.poll();
                     //將first 只為null必怜,避免內(nèi)存泄漏
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        //阻塞當前線程
                        available.await();
                    else {
                        //將當前線程賦值給leader,然后阻塞delay時間后频,等待隊首元素達到可出隊時間
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                             //釋放leader引用
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            //如果leader元素為空梳庆,優(yōu)先級隊列不為空喚起其他線程
            if (leader == null && q.peek() != null)
                available.signal();
            //釋放鎖
            lock.unlock();
        }
    }
4. PriorityBlockingQueue

無界優(yōu)先隊列
構造函數(shù)

 public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }
public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }
public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }
public PriorityBlockingQueue(Collection<? extends E> c) {
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        boolean heapify = true; // true if not known to be in heap order
        boolean screen = true;  // true if must screen for nulls
        if (c instanceof SortedSet<?>) {
            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
            this.comparator = (Comparator<? super E>) ss.comparator();
            heapify = false;
        }
        else if (c instanceof PriorityBlockingQueue<?>) {
            PriorityBlockingQueue<? extends E> pq =
                (PriorityBlockingQueue<? extends E>) c;
            this.comparator = (Comparator<? super E>) pq.comparator();
            screen = false;
            if (pq.getClass() == PriorityBlockingQueue.class) // exact match
                heapify = false;
        }
        Object[] a = c.toArray();
        int n = a.length;
        // If c.toArray incorrectly doesn't return Object[], copy it.
        if (a.getClass() != Object[].class)
            a = Arrays.copyOf(a, n, Object[].class);
        if (screen && (n == 1 || this.comparator != null)) {
            for (int i = 0; i < n; ++i)
                if (a[i] == null)
                    throw new NullPointerException();
        }
        this.queue = a;
        this.size = n;
        if (heapify)
            heapify();
    }

相關屬性

private static final int DEFAULT_INITIAL_CAPACITY = 11;   //默認容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //最大容量
private transient Object[] queue;   //存放元素數(shù)組
private transient int size;  //元素個數(shù)
private transient Comparator<? super E> comparator;  //比較器
private final ReentrantLock lock;  //可重入鎖
private final Condition notEmpty;  //非空條件
private transient volatile int allocationSpinLock; //擴容時暖途,CAS更新這個值誰更新成功誰執(zhí)行
private PriorityQueue<E> q;//不阻塞的優(yōu)先隊列,用于序列化/反序列化

核心函數(shù)
offer

public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
         //判斷是否需要擴容
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            //根據(jù)是否有比較器選擇不同方法
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            //喚醒notEmpty條件
            notEmpty.signal();
        } finally {
           //釋放鎖
            lock.unlock();
        }
        return true;
    }
private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        // 取父節(jié)點
        int parent = (k - 1) >>> 1;
        // 父節(jié)點的元素值
        Object e = array[parent];
        // 如果key大于父節(jié)點膏执,堆化結(jié)束
        if (key.compareTo((T) e) >= 0)
            break;
        // 否則驻售,交換二者的位置,繼續(xù)下一輪比較
        array[k] = e;
        k = parent;
    }
    // 找到了應該放的位置更米,放入元素
    array[k] = key;
}

take

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 加鎖
    lock.lockInterruptibly();
    E result;
    try {
        // 隊列沒有元素欺栗,就阻塞在notEmpty條件上
        // 出隊成功,就跳出這個循環(huán)
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        // 解鎖
        lock.unlock();
    }
    // 返回出隊的元素
    return result;
}
private E dequeue() {
    // 元素個數(shù)減1
    int n = size - 1;
    if (n < 0)
        // 數(shù)組元素不足征峦,返回null
        return null;
    else {
        Object[] array = queue;
        // 彈出堆頂元素
        E result = (E) array[0];
        // 把堆尾元素拿到堆頂
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        // 并做自上而下的堆化
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        // 修改size
        size = n;
        // 返回出隊的元素
        return result;
    }
}
private static <T> void siftDownComparable(int k, T x, Object[] array,
                                           int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;           // loop while a non-leaf
        // 只需要遍歷到葉子節(jié)點就夠了
        while (k < half) {
            // 左子節(jié)點
            int child = (k << 1) + 1; // assume left child is least
            // 左子節(jié)點的值
            Object c = array[child];
            // 右子節(jié)點
            int right = child + 1;
            // 取左右子節(jié)點中最小的值
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            // key如果比左右子節(jié)點都小迟几,則堆化結(jié)束
            if (key.compareTo((T) c) <= 0)
                break;
            // 否則,交換key與左右子節(jié)點中最小的節(jié)點的位置
            array[k] = c;
            k = child;
        }
        // 找到了放元素的位置眶痰,放置元素
        array[k] = key;
    }
}
5. SynchronousQueue

雙棧雙隊列算法瘤旨,一個寫SynchronousQueue需要和一個讀SynchronousQueue組隊出現(xiàn)
構造方法

public SynchronousQueue() {
        this(false);
    }
public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

相關屬性

static final int NCPUS = Runtime.getRuntime().availableProcessors();
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
static final int maxUntimedSpins = maxTimedSpins * 16;
static final long spinForTimeoutThreshold = 1000L;
private ReentrantLock qlock;
private WaitQueue waitingProducers;
private WaitQueue waitingConsumers;

核心方法
put

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }
  E transfer(E e, boolean timed, long nanos) {
            /* Basic algorithm is to loop trying to take either of
             * two actions:
             *
             * 1. If queue apparently empty or holding same-mode nodes,
             *    try to add node to queue of waiters, wait to be
             *    fulfilled (or cancelled) and return matching item.
             *
             * 2. If queue apparently contains waiting items, and this
             *    call is of complementary mode, try to fulfill by CAS'ing
             *    item field of waiting node and dequeuing it, and then
             *    returning matching item.
             *
             * In each case, along the way, check for and try to help
             * advance head and tail on behalf of other stalled/slow
             * threads.
             *
             * The loop starts off with a null check guarding against
             * seeing uninitialized head or tail values. This never
             * happens in current SynchronousQueue, but could if
             * callers held non-volatile/final ref to the
             * transferer. The check is here anyway because it places
             * null checks at top of loop, which is usually faster
             * than having them implicitly interspersed.
             */

            QNode s = null; // constructed/reused as needed
            boolean isData = (e != null);

            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin

                if (h == t || t.isData == isData) { // empty or same-mode
                    QNode tn = t.next;
                    if (t != tail)                  // inconsistent read
                        continue;
                    if (tn != null) {               // lagging tail
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)        // can't wait
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))        // failed to link in
                        continue;

                    advanceTail(t, s);              // swing tail and wait
                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }

                    if (!s.isOffList()) {           // not already unlinked
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? (E)x : e;

                } else {                            // complementary-mode
                    QNode m = h.next;               // node to fulfill
                    if (t != tail || m == null || h != head)
                        continue;                   // inconsistent read

                    Object x = m.item;
                    if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // lost CAS
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }

                    advanceHead(h, m);              // successfully fulfilled
                    LockSupport.unpark(m.waiter);
                    return (x != null) ? (E)x : e;
                }
            }
        }

take

 public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市竖伯,隨后出現(xiàn)的幾起案子存哲,更是在濱河造成了極大的恐慌,老刑警劉巖七婴,帶你破解...
    沈念sama閱讀 212,454評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件祟偷,死亡現(xiàn)場離奇詭異,居然都是意外死亡打厘,警方通過查閱死者的電腦和手機修肠,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來户盯,“玉大人嵌施,你說我怎么就攤上這事∶а迹” “怎么了吗伤?”我有些...
    開封第一講書人閱讀 157,921評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長硫眨。 經(jīng)常有香客問我足淆,道長,這世上最難降的妖魔是什么礁阁? 我笑而不...
    開封第一講書人閱讀 56,648評論 1 284
  • 正文 為了忘掉前任巧号,我火速辦了婚禮,結(jié)果婚禮上姥闭,老公的妹妹穿的比我還像新娘丹鸿。我一直安慰自己,他們只是感情好棚品,可當我...
    茶點故事閱讀 65,770評論 6 386
  • 文/花漫 我一把揭開白布卜高。 她就那樣靜靜地躺著弥姻,像睡著了一般。 火紅的嫁衣襯著肌膚如雪掺涛。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,950評論 1 291
  • 那天疼进,我揣著相機與錄音薪缆,去河邊找鬼。 笑死伞广,一個胖子當著我的面吹牛拣帽,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播嚼锄,決...
    沈念sama閱讀 39,090評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼减拭,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了区丑?” 一聲冷哼從身側(cè)響起拧粪,我...
    開封第一講書人閱讀 37,817評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎沧侥,沒想到半個月后可霎,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,275評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡宴杀,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,592評論 2 327
  • 正文 我和宋清朗相戀三年癣朗,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片旺罢。...
    茶點故事閱讀 38,724評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡旷余,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出扁达,到底是詐尸還是另有隱情正卧,我是刑警寧澤,帶...
    沈念sama閱讀 34,409評論 4 333
  • 正文 年R本政府宣布罩驻,位于F島的核電站穗酥,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏惠遏。R本人自食惡果不足惜砾跃,卻給世界環(huán)境...
    茶點故事閱讀 40,052評論 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望节吮。 院中可真熱鬧抽高,春花似錦、人聲如沸透绩。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,815評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至碳竟,卻和暖如春草丧,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背莹桅。 一陣腳步聲響...
    開封第一講書人閱讀 32,043評論 1 266
  • 我被黑心中介騙來泰國打工昌执, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人诈泼。 一個月前我還...
    沈念sama閱讀 46,503評論 2 361
  • 正文 我出身青樓懂拾,卻偏偏與公主長得像,于是被迫代替她去往敵國和親铐达。 傳聞我的和親對象是個殘疾皇子岖赋,可洞房花燭夜當晚...
    茶點故事閱讀 43,627評論 2 350

推薦閱讀更多精彩內(nèi)容