PriorityBlockingQueue 源碼分析(基于Java 8)

1. PriorityBlockingQueue定義

PriorityBlockingQueue 是基于 二叉堆, ReentrantLock, Condition 實現(xiàn)的并發(fā)安全的優(yōu)先級隊列.
主要有以下特點:

  1. 數(shù)據(jù)容量沒有界限(最大值 Integer.MAX_VALUE - 8)
  2. 居于ReentrantLock 實現(xiàn)并發(fā)安全, 基于 Condition 實現(xiàn)線程等待喚醒
  3. 數(shù)據(jù)底層存放在居于數(shù)組實現(xiàn)的二叉堆上, 注意這里沒有實現(xiàn)堆排序, 只是每次有數(shù)據(jù)變更時將最小/大放在了堆的最上面的節(jié)點上(PS: 不了解二叉堆的請戳這里)
2. 初始化方法 堆化(heapify)

實現(xiàn)思路: 從堆的最后一個 parent 開始, 將最小/大值放在 parent位置, 直到最頂層的parent為止舔涎;
直接看代碼

/**
 * Establishes the heap invariant (described above) in the entire tree
 * assuming nothing about the order of the elements prior to the call
 */
private void heapify(){
    /**
     * 將這個數(shù)組進行 堆化 (heapify)
     *
     */
    Object[] array = queue;
    int n = size;
    int half = (n >>> 1) -1;                // 1. 這里有個注意點 n 是數(shù)組的 length,
    Comparator<? super E> cmp = comparator; // 2. 獲取 比較器, 若這里的 comparator是空, 則用元素自己實現(xiàn)的比較接口進行比較
    if(cmp == null){
        for(int i = half; i >= 0; i--){     // 3. 從整個數(shù)組的最后一顆樹開始, 將二叉樹的最小值放置在parent位置, 一直到最上面的那顆二叉樹
            siftDownComparable(i, (E)array[i], array, n);
        }
    }else{
        for(int i = half; i >= 0; i--){
            siftDownUsingComparator(i, (E)array[i], array, n, cmp);
        }
    }
                                            // 4. 經(jīng)過這個 heapify 方法后, 整個二叉堆中的最小值已經(jīng)放在的 index=0 的位置上(注意: 這時不保證 左子樹一定小于右子樹)
                                            // 5. 若要進行二叉堆的排序, 則需要將 index=0的位置排查在外 從 index= 1的位置開始, 到最后一個位置, 再進行上面的操作
                                            // 其實思路就是 每次將最小值放在數(shù)組的最上面, 然后排除這個節(jié)點在外, 將下面的數(shù)組作為一個整體, 然后重復(fù)上面的步驟, 直到最后一個元素
}

這個方法其實從最后一個 parent 開始進行與子節(jié)點比較, 將最小/大值放在 parent 位置, 直到 頂層的 parent 為止
我們發(fā)現(xiàn)代碼中有個 siftDownComparable 方法, 這是實現(xiàn) 堆化的重要步驟

/**
 * Inserts item x at position k, maintaining heap invariant by
 * demoting x down the tree repeatedly until it is less than or
 * equal to its children or is a leaf
 *
 * @param k     the position to fill
 * @param x     the item to insert
 * @param array the heap array
 * @param n     the heap array
 * @param <T>
 */
private static <T> void siftDownComparable(int k, T x, Object[] array, int n){
    /**
     * 從整個數(shù)組的 k 位置開始向下進行 比較更換操作
     * 1. 獲取這個數(shù)組的中間值(大于等于它其實就是說已經(jīng)沒有子節(jié)點)
     *      舉例: 數(shù)組 array 含有元素 : 1,2,3,4,5,6,7,8,9,10 共10個元素
     *          其中的之間 half = n >>> 1 = 10 >>> 1 = 5; (就是下面代碼中的 half, 堆中所有parent的 index 均小于 5)
     *          而最大 parent 的index 是 : (9 - 1) >>> 1 = 4;
     *          再parent調(diào)整好后, 再下面的代碼中獲取的 k 就變成 9/10, 但是 9/10 > 5 (就是下面代碼的 while(k < half))
     * 2. 從k位子開始不斷向下比較, 將最小值放到 parent位置, 直到 k >= half
     * 3. 經(jīng)過這個方法比較后, 從k往下 都是最小值上parent上的一個棵二叉樹
     */
    if(n > 0){
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;                 // 1. 獲取整個數(shù)組的中間坐標(biāo)
        while(k < half){                    // 2. k這里其實表示 parent 在數(shù)組中的 index, k >= half 其實就說明 k 在數(shù)組中已經(jīng)沒有子節(jié)點
            int child = (k << 1) + 1;       // 3. 獲取 k 的左子節(jié)點的 index
            Object c = array[child];        // 4. 獲取左子節(jié)點的值
            int right = child + 1;          // 5. 獲取右子節(jié)點的 index
            if(right < n &&                 // 6. 這個 if 判斷其實是 判斷左右子節(jié)點的大小, 并且找到其中的最小值, 賦值給 c;
                    ((Comparable<? super T>)c).compareTo((T)array[right]) > 0
                    ){
                c = array[child = right];
            }
            if(key.compareTo((T)c) <= 0){   // 7. key <= c 則說明, 進行下面 sift 已經(jīng)完成 (父節(jié)點k已經(jīng)小于等于子節(jié)點), 直接 break 出
                break;
            }
            array[k] = c;                   // 8. 代碼運行到這里說明 k > c惶看, 則將子數(shù)據(jù)c賦值到k的位置
            k = child;                      // 9. 將上次的子節(jié)點 child作為父節(jié)點, 再次下面進行比較, 直到 k >= half
        }
        array[k] = key;                     // 10. 將key值賦值給最后一次進行 siftdown 比較的  父節(jié)點上
    }
}

操作思路:

 從整個數(shù)組的 k 位置開始向下進行 比較更換操作
  1. 獲取這個數(shù)組的中間值(大于等于它其實就是說已經(jīng)沒有子節(jié)點)
       舉例: 數(shù)組 array 含有元素 : 1,2,3,4,5,6,7,8,9,10 共10個元素
           其中的之間 half = n >>> 1 = 10 >>> 1 = 5; (就是下面代碼中的 half, 堆中所有parent的 index 均小于 5)
           而最大 parent 的index 是 : (9 - 1) >>> 1 = 4;
           再parent調(diào)整好后, 再下面的代碼中獲取的 k 就變成 9/10, 但是 9/10 > 5 (就是下面代碼的 while(k < half))
  2. 從k位子開始不斷向下比較, 將最小值放到 parent位置, 直到 k >= half
  3. 經(jīng)過這個方法比較后, 從k往下 都是最小值上parent上的一個棵二叉樹
3. 添加元素 offer 方法

主要思路: 將添加的元素放置到數(shù)組的最尾端, 然后調(diào)用 siftUp 進行向上調(diào)整

  /**
 * Inserts the specified element into this priority queue
 * As the queue is unbounded, his method will never return {@code false}
 *
 * @param e the lement to add
 * @return {@code true} (as specified element cannot be compared
 *          with elements currently in the priority queue according to the
 *          priority queue's ordering)
 * @throws NullPointerException if the specified element is null
 */
@Override
public boolean offer(E e) {
    if(e != null){
        throw new NullPointerException();
    }
    final ReentrantLock lock = this.lock;       // 1. 獲取全局共享的鎖
    lock.lock();
    int n, cap;
    Object[] array;                             // 2. 判斷容器是否需要擴容
    while((n = size) >= (cap = (array = queue).length)){
        tryGrow(array, cap);                    // 3. 進行擴容操作
    }

    try{
        Comparator<? super E> cmp = comparator;
        if(cmp == null){                        // 4. 進行 保持 heap 性質(zhì)的 siftUp 操作
            siftUpComparable(n, e, array);
        }else{
            siftUpUsingComparator(n, e, array, cmp);
        }
        size = n + 1;                           // 5. 數(shù)據(jù)插入后, 整個容量值 + 1;
        notEmpty.signal();                      // 6. Condition 釋放信號, 告知其他等待的線程: 容器中已經(jīng)有元素
    }finally {
        lock.unlock();                          // 7. 釋放鎖
    }
    return true;
}

在代碼中我們看到了 tryGrow, 這個調(diào)整堆存儲空間的方法
在里面運用了 先進行鎖的釋放 lock.unlock, 然后 根據(jù) allocationSpinLock 這個指標(biāo)判斷是否其他線程在進行擴容, 基本上每次擴容都是 * 1.5

/**
 * Tries to grow array to accommodate at least one more element
 * (but normally expend by about 50%), giving up (allowing retry)
 * on contention (which we expect to be race). Call only this while
 * holding lock
 *
 * @param array the heap array
 * @param oldCap    the length of the array
 */
private void tryGrow(Object[] array, int oldCap){
    /**
     * tryGrow 數(shù)組容量擴容操作
     * 整個方法的執(zhí)行是在已經(jīng) ReentrantLock 獲取鎖的情況下進行的
     */

    lock.unlock(); // must release and then re-acquire main lock // 1. 釋放全局的鎖(為什么呢? 原因也非常簡單, 這個 lock 是全局方法共享的, 為的是更好的并發(fā)性能, 而擴容操作的并發(fā)是通過簡單的樂觀鎖 allocationSpinLock 來進行控制de)
    Object[] newArray = null;
    if(allocationSpinLock == 0 &&                                // 2. 居于CAS操作, 在 allocationSpinLock 實現(xiàn)樂觀鎖, 這個也是為了在擴容時不影響容器的其他并發(fā)操作
            unsafe.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)){
        try{
            int newCap = oldCap + ((oldCap < 64)?                // 3. 容量若小于 64則直接 double + 2; 大于的話, 直接 * 1.5
                    (oldCap + 2): // grow faster if small
                    (oldCap >> 1)
                                    );
            if(newCap - MAX_ARRAY_SIZE > 0){ // possible overflow
                int minCap = oldCap + 1;                         // 4. 擴容后超過最大容量處理
                if(minCap < 0 || minCap > MAX_ARRAY_SIZE){
                    throw new OutOfMemoryError();
                }
                newCap = MAX_ARRAY_SIZE;
            }
            if(newCap > oldCap && queue == array){              // 5. queue == array 若數(shù)組沒變化, 直接進行新建數(shù)組
                newArray = new Object[newCap];
            }
        }finally {
            allocationSpinLock = 0;
        }
    }
                                                                // 6. newArray == null 說明上面的操作過程中, 有其他的線程進行了擴容的操作
    if(newArray == null){ // back off if another thread is allocating
        Thread.yield();                                         // 7. 讓出 CPU 調(diào)度(因為其他線程擴容后必定有其他的操作)
    }
    lock.lock();                                                // 8. 重新獲取鎖
    if(newArray != null && queue == array){                     // 9. 判斷數(shù)組 queue 有沒有在其他線程中變化過
        queue = newArray;                                       // 10. 未變化, 直接進行賦值操作
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

在進行offer元素時主要還調(diào)用了 siftUpComparable 方法
思路: 將元素與上面的 parent 進行比較, 直到 parent >= 這個元素

    /**
     * Insert item x at position k, maintaining heap invariant by
     * promoting x up the tree until it is greater than or equal to
     * its parent, or is the root
     *
     * To simplify and speed up coercions and comparisons. the
     * Comparable and Comparator versions are separated into different
     * method that are otherwise identical. (Similarly for siftDown)
     * These methods are statics, with heap state as arguments, to
     * simplify use in light og possible comparator exceptions
     *
     * @param k the position to fill
     * @param x the item to insert
     * @param array the heap array
     * @param <T>
     */
    private static <T> void siftUpComparable(int k, T x, Object[] array){
        /**
         * 簡單的 siftUp 操作: 大體操作就是將元素x放置到k位置, 然后對k的parent進行比較, 直到 k>=parent為止
         */
        Comparable<? super T> key = (Comparable<? super T>)x;
        while(k > 0){                           // 1. k是否到達二叉樹的頂端
            int parent = (k - 1) >>> 1;         // 2. 尋找 k 的parent位置
            Object e = array[parent];           // 3. 獲取parent的值
            if(key.compareTo((T)e) >= 0){       // 4. key >= e說明 parent >=子節(jié)點, 則不需要 siftUp 操作
                break;
            }
            array[k] = e;                       // 5. 將上次比較中 parent節(jié)點的值放在子節(jié)點上
            k = parent;                         // 6. 將這次比較中的 parent 當(dāng)作下次比價的k(k是下次比較的子節(jié)點)
        }
        array[k] = key;                         // 7. 將值key放置合適的位置上
    }
4. 刪除元素 poll 方法

思路: 將元素的首節(jié)點拿出, 作為返回, 末尾節(jié)點放置到 index=0位置, 開始 siftDown直到 滿足 parent >= child

@Override
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();
    } finally {
        lock.unlock();
    }
}

private E dequeue(){
    int n = size - 1;
    if(n < 0){                              // 1. 判斷元素是否未空
        return null;                        // 2. 容器中沒有元素, 直接返回 null
    }
    else{
        Object[] array = queue;
        E result = (E)array[0];             // 3. 取出數(shù)組中的第一個元素, 作為返回值
        E x = (E)array[n];                  // 4. 將數(shù)組的最后一個元素取出
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if(cmp == null){                    // 5. 將剛才取出的數(shù)組中最后一個元素放到第一個index位置, 進行siftDown操作(就是向下堆化操作)
            siftDownComparable(0, x, array, n);
        }else{
            siftDownUsingComparator(0, x, array, n, cmp);
        }
        size = n;                           // 6. 重新賦值 size值
        return result;                      // 7. 返回取出的值
    }
}
4. 刪除元素 remove 方法

思路:

  1. 尋找待刪除元素在數(shù)組中的位置
  2. 刪除待刪除的元素, 將數(shù)組中的最后一個元素放置到這個位置, 先進行 siftDown 尋找合適放置的位置, 然后再siftUp 尋找合適的放置位置
/**
 * Remove a single instance of the specified element from this queue,
 * if it is present. More formally, removes an element {@code e} such
 * that {@code o.equal(e)}, if this queue contains one or more such
 * elements. Returns {@code true} if and onlu if this queue contained
 * the specified element (or equivalently, if this queue changed as
 * a result of the call)
 *
 * @param o element to removed from this queue, if present
 * @return {@code true} if this queue changed as a result of the call
 */
public boolean remove(Object o){
    /**
     * 刪除堆中對應(yīng)的元素
     */
    final ReentrantLock lock = this.lock;
    lock.lock();
    try{
        int i = indexOf(o);     // 1. 找出元素 o 在堆中的位置
        if(i == -1){
            return false;
        }
        removeAt(i);            // 2. 調(diào)用 removeAt 定點刪除元素
        return true;
    }finally {
        lock.unlock();
    }
}

/**
 * Removes the ith element from queue
 * @param i
 */
private void removeAt(int i){
    /**
     * 刪除堆中指定位置的元素
     */
    Object[] array = queue;
    int n = size - 1;
    if(n == i){ // remove last lement                           // 1. 若元素是末尾元素, 則直接進行刪除操作
        array[i] = null;
    }else{
        E moved = (E)array[n];                                  // 2. 獲取待堆中最后的值(這個不是最大值)
        array[n] = null;                                        // 3. 將對應(yīng)元素置空
        Comparator<? super E> cmp = comparator;
        if(cmp == null){
            siftDownComparable(i, moved, array, n);             // 4. 將最后值 moved 放在 i 位置進行 siftDown
        }else{
            siftDownUsingComparator(i, moved, array, n, cmp);
        }
        if(array[i] == moved){                                  // 5. array[i] = moved 說明 siftDown 沒起作用, 節(jié)點 moved可能應(yīng)該在堆上面的位置, 所以進行 siftUp, 從而將 moved 放在上面堆中某個位置
            if(cmp == null){
                siftUpComparable(i, moved, array);
            }else{
                siftUpUsingComparator(i, moved, array, cmp);
            }
        }
    }
    size = n;                                                   // 6. 進行size重新賦值操作
}

總結(jié): PriorityBlockingQueue 是一個基于二叉堆實現(xiàn)的優(yōu)先級隊列, 與其他隊列不同的是它支持高優(yōu)先級的數(shù)據(jù)線poll出, 在有優(yōu)先級需要的生產(chǎn)者消費者場景中用這個類比較合適.

參考:
二叉堆
堆排序
vickyqi PriorityBlockingQueue

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末静稻,一起剝皮案震驚了整個濱河市把敢,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖武花,帶你破解...
    沈念sama閱讀 221,820評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異杈帐,居然都是意外死亡体箕,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,648評論 3 399
  • 文/潘曉璐 我一進店門挑童,熙熙樓的掌柜王于貴愁眉苦臉地迎上來累铅,“玉大人,你說我怎么就攤上這事站叼⊥奘蓿” “怎么了?”我有些...
    開封第一講書人閱讀 168,324評論 0 360
  • 文/不壞的土叔 我叫張陵大年,是天一觀的道長换薄。 經(jīng)常有香客問我玉雾,道長,這世上最難降的妖魔是什么轻要? 我笑而不...
    開封第一講書人閱讀 59,714評論 1 297
  • 正文 為了忘掉前任复旬,我火速辦了婚禮,結(jié)果婚禮上冲泥,老公的妹妹穿的比我還像新娘驹碍。我一直安慰自己,他們只是感情好凡恍,可當(dāng)我...
    茶點故事閱讀 68,724評論 6 397
  • 文/花漫 我一把揭開白布志秃。 她就那樣靜靜地躺著,像睡著了一般嚼酝。 火紅的嫁衣襯著肌膚如雪浮还。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,328評論 1 310
  • 那天闽巩,我揣著相機與錄音钧舌,去河邊找鬼。 笑死涎跨,一個胖子當(dāng)著我的面吹牛洼冻,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播隅很,決...
    沈念sama閱讀 40,897評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼撞牢,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了叔营?” 一聲冷哼從身側(cè)響起屋彪,我...
    開封第一講書人閱讀 39,804評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎审编,沒想到半個月后撼班,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體歧匈,經(jīng)...
    沈念sama閱讀 46,345評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡垒酬,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,431評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了件炉。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片勘究。...
    茶點故事閱讀 40,561評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖斟冕,靈堂內(nèi)的尸體忽然破棺而出口糕,到底是詐尸還是另有隱情,我是刑警寧澤磕蛇,帶...
    沈念sama閱讀 36,238評論 5 350
  • 正文 年R本政府宣布景描,位于F島的核電站十办,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏超棺。R本人自食惡果不足惜向族,卻給世界環(huán)境...
    茶點故事閱讀 41,928評論 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望棠绘。 院中可真熱鬧件相,春花似錦、人聲如沸氧苍。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,417評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽让虐。三九已至紊撕,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間赡突,已是汗流浹背逛揩。 一陣腳步聲響...
    開封第一講書人閱讀 33,528評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留麸俘,地道東北人辩稽。 一個月前我還...
    沈念sama閱讀 48,983評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像从媚,于是被迫代替她去往敵國和親逞泄。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,573評論 2 359

推薦閱讀更多精彩內(nèi)容

  • 1. Java基礎(chǔ)部分 基礎(chǔ)部分的順序:基本語法拜效,類相關(guān)的語法喷众,內(nèi)部類的語法,繼承相關(guān)的語法紧憾,異常的語法到千,線程的語...
    子非魚_t_閱讀 31,664評論 18 399
  • 一憔四、基本數(shù)據(jù)類型 注釋 單行注釋:// 區(qū)域注釋:/* */ 文檔注釋:/** */ 數(shù)值 對于byte類型而言...
    龍貓小爺閱讀 4,268評論 0 16
  • PriorityQueue 一個無限的優(yōu)先級隊列基于一個優(yōu)先級堆。優(yōu)先級隊列中的元素根據(jù)它們的Comparable...
    tomas家的小撥浪鼓閱讀 2,547評論 1 2
  • 2016.9.16 今天是休閑也是馬力十足的一天般眉。 今天一共跟進/邀約/溫暖了6個非會員朋友; 4個會員朋友;約到...
    風(fēng)飄飄_閱讀 299評論 0 0
  • 修行小札 陶醉 2016.12.27 記不清楚有多久懶得動手寫下如影隨形的諸多情緒了了赵,漸漸養(yǎng)成了在心底消化所...
    閑敲棋子Ray閱讀 146評論 1 1