PriorityBlockingQueue 源碼分析(基于Java 8)

1. PriorityBlockingQueue定義

PriorityBlockingQueue 是基于 二叉堆, ReentrantLock, Condition 實現(xiàn)的并發(fā)安全的優(yōu)先級隊列.

  1. 數(shù)據(jù)容量沒有界限(最大值 Integer.MAX_VALUE - 8)
  2. 居于ReentrantLock 實現(xiàn)并發(fā)安全, 基于 Condition 實現(xiàn)線程等待喚醒
  3. 數(shù)據(jù)底層存放在居于數(shù)組實現(xiàn)的二叉堆上, 注意這里沒有實現(xiàn)堆排序, 只是每次有數(shù)據(jù)變更時將最小/大放在了堆的最上面的節(jié)點上(PS: 不了解二叉堆的請戳這里)
2. 初始化方法 堆化(heapify)

實現(xiàn)思路: 從堆的最后一個 parent 開始, 將最小/大值放在 parent位置, 直到最頂層的parent為止舔涎;

 * Establishes the heap invariant (described above) in the entire tree
 * assuming nothing about the order of the elements prior to the call
private void heapify(){
     * 將這個數(shù)組進行 堆化 (heapify)
    Object[] array = queue;
    int n = size;
    int half = (n >>> 1) -1;                // 1. 這里有個注意點 n 是數(shù)組的 length,
    Comparator<? super E> cmp = comparator; // 2. 獲取 比較器, 若這里的 comparator是空, 則用元素自己實現(xiàn)的比較接口進行比較
    if(cmp == null){
        for(int i = half; i >= 0; i--){     // 3. 從整個數(shù)組的最后一顆樹開始, 將二叉樹的最小值放置在parent位置, 一直到最上面的那顆二叉樹
            siftDownComparable(i, (E)array[i], array, n);
        for(int i = half; i >= 0; i--){
            siftDownUsingComparator(i, (E)array[i], array, n, cmp);
                                            // 4. 經(jīng)過這個 heapify 方法后, 整個二叉堆中的最小值已經(jīng)放在的 index=0 的位置上(注意: 這時不保證 左子樹一定小于右子樹)
                                            // 5. 若要進行二叉堆的排序, 則需要將 index=0的位置排查在外 從 index= 1的位置開始, 到最后一個位置, 再進行上面的操作
                                            // 其實思路就是 每次將最小值放在數(shù)組的最上面, 然后排除這個節(jié)點在外, 將下面的數(shù)組作為一個整體, 然后重復(fù)上面的步驟, 直到最后一個元素

這個方法其實從最后一個 parent 開始進行與子節(jié)點比較, 將最小/大值放在 parent 位置, 直到 頂層的 parent 為止
我們發(fā)現(xiàn)代碼中有個 siftDownComparable 方法, 這是實現(xiàn) 堆化的重要步驟

 * Inserts item x at position k, maintaining heap invariant by
 * demoting x down the tree repeatedly until it is less than or
 * equal to its children or is a leaf
 * @param k     the position to fill
 * @param x     the item to insert
 * @param array the heap array
 * @param n     the heap array
 * @param <T>
private static <T> void siftDownComparable(int k, T x, Object[] array, int n){
     * 從整個數(shù)組的 k 位置開始向下進行 比較更換操作
     * 1. 獲取這個數(shù)組的中間值(大于等于它其實就是說已經(jīng)沒有子節(jié)點)
     *      舉例: 數(shù)組 array 含有元素 : 1,2,3,4,5,6,7,8,9,10 共10個元素
     *          其中的之間 half = n >>> 1 = 10 >>> 1 = 5; (就是下面代碼中的 half, 堆中所有parent的 index 均小于 5)
     *          而最大 parent 的index 是 : (9 - 1) >>> 1 = 4;
     *          再parent調(diào)整好后, 再下面的代碼中獲取的 k 就變成 9/10, 但是 9/10 > 5 (就是下面代碼的 while(k < half))
     * 2. 從k位子開始不斷向下比較, 將最小值放到 parent位置, 直到 k >= half
     * 3. 經(jīng)過這個方法比較后, 從k往下 都是最小值上parent上的一個棵二叉樹
    if(n > 0){
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;                 // 1. 獲取整個數(shù)組的中間坐標(biāo)
        while(k < half){                    // 2. k這里其實表示 parent 在數(shù)組中的 index, k >= half 其實就說明 k 在數(shù)組中已經(jīng)沒有子節(jié)點
            int child = (k << 1) + 1;       // 3. 獲取 k 的左子節(jié)點的 index
            Object c = array[child];        // 4. 獲取左子節(jié)點的值
            int right = child + 1;          // 5. 獲取右子節(jié)點的 index
            if(right < n &&                 // 6. 這個 if 判斷其實是 判斷左右子節(jié)點的大小, 并且找到其中的最小值, 賦值給 c;
                    ((Comparable<? super T>)c).compareTo((T)array[right]) > 0
                c = array[child = right];
            if(key.compareTo((T)c) <= 0){   // 7. key <= c 則說明, 進行下面 sift 已經(jīng)完成 (父節(jié)點k已經(jīng)小于等于子節(jié)點), 直接 break 出
            array[k] = c;                   // 8. 代碼運行到這里說明 k > c惶看, 則將子數(shù)據(jù)c賦值到k的位置
            k = child;                      // 9. 將上次的子節(jié)點 child作為父節(jié)點, 再次下面進行比較, 直到 k >= half
        array[k] = key;                     // 10. 將key值賦值給最后一次進行 siftdown 比較的  父節(jié)點上


3. 添加元素 offer 方法

主要思路: 將添加的元素放置到數(shù)組的最尾端, 然后調(diào)用 siftUp 進行向上調(diào)整

 * Inserts the specified element into this priority queue
 * As the queue is unbounded, his method will never return {@code false}
 * @param e the lement to add
 * @return {@code true} (as specified element cannot be compared
 *          with elements currently in the priority queue according to the
 *          priority queue's ordering)
 * @throws NullPointerException if the specified element is null
public boolean offer(E e) {
    if(e != null){
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;       // 1. 獲取全局共享的鎖
    int n, cap;
    Object[] array;                             // 2. 判斷容器是否需要擴容
    while((n = size) >= (cap = (array = queue).length)){
        tryGrow(array, cap);                    // 3. 進行擴容操作

        Comparator<? super E> cmp = comparator;
        if(cmp == null){                        // 4. 進行 保持 heap 性質(zhì)的 siftUp 操作
            siftUpComparable(n, e, array);
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;                           // 5. 數(shù)據(jù)插入后, 整個容量值 + 1;
        notEmpty.signal();                      // 6. Condition 釋放信號, 告知其他等待的線程: 容器中已經(jīng)有元素
    }finally {
        lock.unlock();                          // 7. 釋放鎖
    return true;

在代碼中我們看到了 tryGrow, 這個調(diào)整堆存儲空間的方法
在里面運用了 先進行鎖的釋放 lock.unlock, 然后 根據(jù) allocationSpinLock 這個指標(biāo)判斷是否其他線程在進行擴容, 基本上每次擴容都是 * 1.5

 * Tries to grow array to accommodate at least one more element
 * (but normally expend by about 50%), giving up (allowing retry)
 * on contention (which we expect to be race). Call only this while
 * holding lock
 * @param array the heap array
 * @param oldCap    the length of the array
private void tryGrow(Object[] array, int oldCap){
     * tryGrow 數(shù)組容量擴容操作
     * 整個方法的執(zhí)行是在已經(jīng) ReentrantLock 獲取鎖的情況下進行的

    lock.unlock(); // must release and then re-acquire main lock // 1. 釋放全局的鎖(為什么呢? 原因也非常簡單, 這個 lock 是全局方法共享的, 為的是更好的并發(fā)性能, 而擴容操作的并發(fā)是通過簡單的樂觀鎖 allocationSpinLock 來進行控制de)
    Object[] newArray = null;
    if(allocationSpinLock == 0 &&                                // 2. 居于CAS操作, 在 allocationSpinLock 實現(xiàn)樂觀鎖, 這個也是為了在擴容時不影響容器的其他并發(fā)操作
            unsafe.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)){
            int newCap = oldCap + ((oldCap < 64)?                // 3. 容量若小于 64則直接 double + 2; 大于的話, 直接 * 1.5
                    (oldCap + 2): // grow faster if small
                    (oldCap >> 1)
            if(newCap - MAX_ARRAY_SIZE > 0){ // possible overflow
                int minCap = oldCap + 1;                         // 4. 擴容后超過最大容量處理
                if(minCap < 0 || minCap > MAX_ARRAY_SIZE){
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            if(newCap > oldCap && queue == array){              // 5. queue == array 若數(shù)組沒變化, 直接進行新建數(shù)組
                newArray = new Object[newCap];
        }finally {
            allocationSpinLock = 0;
                                                                // 6. newArray == null 說明上面的操作過程中, 有其他的線程進行了擴容的操作
    if(newArray == null){ // back off if another thread is allocating
        Thread.yield();                                         // 7. 讓出 CPU 調(diào)度(因為其他線程擴容后必定有其他的操作)
    lock.lock();                                                // 8. 重新獲取鎖
    if(newArray != null && queue == array){                     // 9. 判斷數(shù)組 queue 有沒有在其他線程中變化過
        queue = newArray;                                       // 10. 未變化, 直接進行賦值操作
        System.arraycopy(array, 0, newArray, 0, oldCap);

在進行offer元素時主要還調(diào)用了 siftUpComparable 方法
思路: 將元素與上面的 parent 進行比較, 直到 parent >= 這個元素

     * Insert item x at position k, maintaining heap invariant by
     * promoting x up the tree until it is greater than or equal to
     * its parent, or is the root
     * To simplify and speed up coercions and comparisons. the
     * Comparable and Comparator versions are separated into different
     * method that are otherwise identical. (Similarly for siftDown)
     * These methods are statics, with heap state as arguments, to
     * simplify use in light og possible comparator exceptions
     * @param k the position to fill
     * @param x the item to insert
     * @param array the heap array
     * @param <T>
    private static <T> void siftUpComparable(int k, T x, Object[] array){
         * 簡單的 siftUp 操作: 大體操作就是將元素x放置到k位置, 然后對k的parent進行比較, 直到 k>=parent為止
        Comparable<? super T> key = (Comparable<? super T>)x;
        while(k > 0){                           // 1. k是否到達二叉樹的頂端
            int parent = (k - 1) >>> 1;         // 2. 尋找 k 的parent位置
            Object e = array[parent];           // 3. 獲取parent的值
            if(key.compareTo((T)e) >= 0){       // 4. key >= e說明 parent >=子節(jié)點, 則不需要 siftUp 操作
            array[k] = e;                       // 5. 將上次比較中 parent節(jié)點的值放在子節(jié)點上
            k = parent;                         // 6. 將這次比較中的 parent 當(dāng)作下次比價的k(k是下次比較的子節(jié)點)
        array[k] = key;                         // 7. 將值key放置合適的位置上
4. 刪除元素 poll 方法

思路: 將元素的首節(jié)點拿出, 作為返回, 末尾節(jié)點放置到 index=0位置, 開始 siftDown直到 滿足 parent >= child

public E poll() {
    final ReentrantLock lock = this.lock;
    try {
        return dequeue();
    } finally {

private E dequeue(){
    int n = size - 1;
    if(n < 0){                              // 1. 判斷元素是否未空
        return null;                        // 2. 容器中沒有元素, 直接返回 null
        Object[] array = queue;
        E result = (E)array[0];             // 3. 取出數(shù)組中的第一個元素, 作為返回值
        E x = (E)array[n];                  // 4. 將數(shù)組的最后一個元素取出
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if(cmp == null){                    // 5. 將剛才取出的數(shù)組中最后一個元素放到第一個index位置, 進行siftDown操作(就是向下堆化操作)
            siftDownComparable(0, x, array, n);
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;                           // 6. 重新賦值 size值
        return result;                      // 7. 返回取出的值
  1. 尋找待刪除元素在數(shù)組中的位置
  2. 刪除待刪除的元素, 將數(shù)組中的最后一個元素放置到這個位置, 先進行 siftDown 尋找合適放置的位置, 然后再siftUp 尋找合適的放置位置
 * Remove a single instance of the specified element from this queue,
 * if it is present. More formally, removes an element {@code e} such
 * that {@code o.equal(e)}, if this queue contains one or more such
 * elements. Returns {@code true} if and onlu if this queue contained
 * the specified element (or equivalently, if this queue changed as
 * a result of the call)
 * @param o element to removed from this queue, if present
 * @return {@code true} if this queue changed as a result of the call
public boolean remove(Object o){
     * 刪除堆中對應(yīng)的元素
    final ReentrantLock lock = this.lock;
        int i = indexOf(o);     // 1. 找出元素 o 在堆中的位置
        if(i == -1){
            return false;
        removeAt(i);            // 2. 調(diào)用 removeAt 定點刪除元素
        return true;
    }finally {

 * Removes the ith element from queue
 * @param i
private void removeAt(int i){
     * 刪除堆中指定位置的元素
    Object[] array = queue;
    int n = size - 1;
    if(n == i){ // remove last lement                           // 1. 若元素是末尾元素, 則直接進行刪除操作
        array[i] = null;
        E moved = (E)array[n];                                  // 2. 獲取待堆中最后的值(這個不是最大值)
        array[n] = null;                                        // 3. 將對應(yīng)元素置空
        Comparator<? super E> cmp = comparator;
        if(cmp == null){
            siftDownComparable(i, moved, array, n);             // 4. 將最后值 moved 放在 i 位置進行 siftDown
            siftDownUsingComparator(i, moved, array, n, cmp);
        if(array[i] == moved){                                  // 5. array[i] = moved 說明 siftDown 沒起作用, 節(jié)點 moved可能應(yīng)該在堆上面的位置, 所以進行 siftUp, 從而將 moved 放在上面堆中某個位置
            if(cmp == null){
                siftUpComparable(i, moved, array);
                siftUpUsingComparator(i, moved, array, cmp);
    size = n;                                                   // 6. 進行size重新賦值操作

總結(jié): PriorityBlockingQueue 是一個基于二叉堆實現(xiàn)的優(yōu)先級隊列, 與其他隊列不同的是它支持高優(yōu)先級的數(shù)據(jù)線poll出, 在有優(yōu)先級需要的生產(chǎn)者消費者場景中用這個類比較合適.

