Java 優(yōu)先隊列 PriorityQueue PriorityBlockingQueue 源碼分析

基本使用

@Test
public void testPriorityQueue() throws InterruptedException {
    PriorityQueue priorityQueue = new PriorityQueue(Lists.newArrayList(5, 4, 2, 1, 3));
    System.out.println(priorityQueue);
    System.out.println(priorityQueue.poll());
    System.out.println(priorityQueue.poll());

    PriorityBlockingQueue<Integer> blockingQueue = new PriorityBlockingQueue<>();
    blockingQueue.add(5);
    System.out.println(blockingQueue.poll());
    System.out.println(blockingQueue.take());
}

輸出

[1, 3, 2, 4, 5]
1
2
5
(阻塞)

PriorityQueue

成員變量

/**
 * Priority queue represented as a balanced binary heap: the two
 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
 * priority queue is ordered by comparator, or by the elements'
 * natural ordering, if comparator is null: For each node n in the
 * heap and each descendant d of n, n <= d.  The element with the
 * lowest value is in queue[0], assuming the queue is nonempty.
 */
transient Object[] queue; // non-private to simplify nested class access

/**
 * The number of elements in the priority queue.
 */
private int size = 0;

/**
 * The comparator, or null if priority queue uses elements'
 * natural ordering.
 */
private final Comparator<? super E> comparator;

/**
 * The number of times this priority queue has been
 * <i>structurally modified</i>.  See AbstractList for gory details.
 */
transient int modCount = 0; // non-private to simplify nested class access

通過數(shù)組實現(xiàn)一個堆客蹋,元素在queue數(shù)組中并不是完全有序的塞蹭,僅堆頂元素最大或最小。

基本方法

public E poll() {
    if (size == 0)
        return null;
    int s = --size;
    modCount++;
    E result = (E) queue[0];
    E x = (E) queue[s];
    queue[s] = null;
    if (s != 0)
        siftDown(0, x);
    return result;
}

/**
 * Inserts item x at position k, maintaining heap invariant by
 * demoting x down the tree repeatedly until it is less than or
 * equal to its children or is a leaf.
 *
 * @param k the position to fill
 * @param x the item to insert
 */
private void siftDown(int k, E x) {
    if (comparator != null)
        siftDownUsingComparator(k, x);
    else
        siftDownComparable(k, x);
}

@SuppressWarnings("unchecked")
private void siftDownComparable(int k, E x) {
    Comparable<? super E> key = (Comparable<? super E>)x;
    int half = size >>> 1;        // loop while a non-leaf
    while (k < half) {
        int child = (k << 1) + 1; // assume left child is least
        Object c = queue[child];
        int right = child + 1;
        if (right < size &&
            ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
            c = queue[child = right];
        if (key.compareTo((E) c) <= 0)
            break;
        queue[k] = c;
        k = child;
    }
    queue[k] = key;
}

以poll方法為例讶坯,實際上是獲取堆頂元素番电,然后調(diào)整堆。

調(diào)整堆的方法(以大頂堆為例):

  1. 判斷是否傳入comparator辆琅,有則按照comparator排序漱办,否則按照自然順序排序
  2. 取節(jié)點左右孩子節(jié)點最大值,與父親節(jié)點交換

擴容方法

/**
 * Increases the capacity of the array.
 *
 * @param minCapacity the desired minimum capacity
 */
private void grow(int minCapacity) {
    int oldCapacity = queue.length;
    // Double size if small; else grow by 50%
    int newCapacity = oldCapacity + ((oldCapacity < 64) ?
                                     (oldCapacity + 2) :
                                     (oldCapacity >> 1));
    // overflow-conscious code
    if (newCapacity - MAX_ARRAY_SIZE > 0)
        newCapacity = hugeCapacity(minCapacity);
    queue = Arrays.copyOf(queue, newCapacity);
}

private static int hugeCapacity(int minCapacity) {
    if (minCapacity < 0) // overflow
        throw new OutOfMemoryError();
    return (minCapacity > MAX_ARRAY_SIZE) ?
        Integer.MAX_VALUE :
        MAX_ARRAY_SIZE;
}
  1. 小容量擴容1倍
  2. 大容量擴容0.5倍
  3. 快溢出時調(diào)整為Integer.MAX_VALUE - 8 或 Integer.MAX_VALUE

是否線程安全

非線程安全

PriorityBlockingQueue

其實現(xiàn)基本與PriorityQueue一致婉烟,不過PriorityBlockingQueue是線程安全的娩井,并且實現(xiàn)了BlockingQueue接口,在隊列為空時take會阻塞似袁。

/**
 * Priority queue represented as a balanced binary heap: the two
 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
 * priority queue is ordered by comparator, or by the elements'
 * natural ordering, if comparator is null: For each node n in the
 * heap and each descendant d of n, n <= d.  The element with the
 * lowest value is in queue[0], assuming the queue is nonempty.
 */
private transient Object[] queue;

/**
 * The number of elements in the priority queue.
 */
private transient int size;

/**
 * The comparator, or null if priority queue uses elements'
 * natural ordering.
 */
private transient Comparator<? super E> comparator;

/**
 * Lock used for all public operations
 */
private final ReentrantLock lock;

/**
 * Condition for blocking when empty
 */
private final Condition notEmpty;

/**
 * Spinlock for allocation, acquired via CAS.
 */
private transient volatile int allocationSpinLock;

/**
 * A plain PriorityQueue used only for serialization,
 * to maintain compatibility with previous versions
 * of this class. Non-null only during serialization/deserialization.
 */
private PriorityQueue<E> q;

和PriorityQueue的區(qū)別:增加了

  1. 重入鎖ReentrantLock
  2. Condition洞辣,用于隊列空情況下的阻塞
  3. allocationSpinLock,通過CAS手段對queue擴容
private void tryGrow(Object[] array, int oldCap) {
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0;
        }
    }
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    lock.lock();
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

可以看到與PriorityQueue的擴容函數(shù)很像昙衅,不同點:

  1. 調(diào)用函數(shù)時必須持有鎖
  2. 使用CAS方法進行擴容扬霜,在allocationSpinLock為0,并且CAS將其置為1時而涉,線程才能夠?qū)?shù)組進行擴容著瓶。如果多個線程并發(fā)擴容,其余線程會調(diào)用Thread.yield()方法婴谱。

為什么這樣實現(xiàn)PriorityBlockingQueue擴容蟹但?

因為PriorityBlockingQueue內(nèi)部使用的ReentrantLock重入鎖,同一個線程多次調(diào)用add函數(shù)谭羔,可能恰好同時調(diào)用了tryGrow函數(shù)华糖。此時通過重入鎖是無法加鎖的,僅能通過Synchronized或CAS方式控制并發(fā)瘟裸。

allocationSpinLock是transient的客叉,因為序列化時并不需要此參數(shù);同時又是volatile的话告,因為可能有多個線程同時調(diào)用兼搏。

private transient volatile int allocationSpinLock;

UNSAFE.compareAndSwapInt

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long allocationSpinLockOffset;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = PriorityBlockingQueue.class;
        allocationSpinLockOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("allocationSpinLock"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

調(diào)用方法

UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)

allocationSpinLockOffset是allocationSpinLock變量在PriorityBlockingQueue類中的偏移量。

那么使用allocationSpinLockOffset有什么好處呢沙郭?它和直接修改allocationSpinLock變量有什么區(qū)別佛呻?

獲取該字段在類中的內(nèi)存偏移量,直接將內(nèi)存中的值改為新值病线。直接修改allocationSpinLock并不是CAS吓著。JDK 1.8代碼如下:

public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

在AtomicInteger類中的調(diào)用如下,getAndAddInt方法由具體類的實現(xiàn)方法送挑,抽取到了UNSAFE類中:

public final int getAndDecrement() {
    return unsafe.getAndAddInt(this, valueOffset, -1);
}

對比 PriorityQueue 和 PriorityBlockingQueue

  1. PriorityQueue是非線程安全的绑莺,PriorityBlockingQueue是線程安全的
  2. PriorityBlockingQueue使用重入鎖,每一個操作都需要加鎖
  3. PriorityBlockingQueue擴容時使用了CAS操作
  4. 兩者都使用了堆惕耕,算法原理相同
  5. PriorityBlockingQueue可以在queue為空時阻塞take操作

JDK實現(xiàn)堆的方法

/**
 * Establishes the heap invariant (described above) in the entire tree,
 * assuming nothing about the order of the elements prior to the call.
 */
@SuppressWarnings("unchecked")
private void heapify() {
    for (int i = (size >>> 1) - 1; i >= 0; i--)
        siftDown(i, (E) queue[i]);
}

private void siftDown(int k, E x) {
    if (comparator != null)
        siftDownUsingComparator(k, x);
    else
        siftDownComparable(k, x);
}

@SuppressWarnings("unchecked")
private void siftDownComparable(int k, E x) {
    Comparable<? super E> key = (Comparable<? super E>)x;
    int half = size >>> 1;        // loop while a non-leaf
    while (k < half) {
        int child = (k << 1) + 1; // assume left child is least
        Object c = queue[child];
        int right = child + 1;
        if (right < size &&
            ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
            c = queue[child = right];
        if (key.compareTo((E) c) <= 0)
            break;
        queue[k] = c;
        k = child;
    }
    queue[k] = key;
}

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    modCount++;
    int i = size;
    if (i >= queue.length)
        grow(i + 1);
    size = i + 1;
    if (i == 0)
        queue[0] = e;
    else
        siftUp(i, e);
    return true;
}

private void siftUp(int k, E x) {
    if (comparator != null)
        siftUpUsingComparator(k, x);
    else
        siftUpComparable(k, x);
}

@SuppressWarnings("unchecked")
private void siftUpComparable(int k, E x) {
    Comparable<? super E> key = (Comparable<? super E>) x;
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = queue[parent];
        if (key.compareTo((E) e) >= 0)
            break;
        queue[k] = e;
        k = parent;
    }
    queue[k] = key;
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末纺裁,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子司澎,更是在濱河造成了極大的恐慌欺缘,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,826評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件惭缰,死亡現(xiàn)場離奇詭異浪南,居然都是意外死亡,警方通過查閱死者的電腦和手機漱受,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,968評論 3 395
  • 文/潘曉璐 我一進店門络凿,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人昂羡,你說我怎么就攤上這事絮记。” “怎么了虐先?”我有些...
    開封第一講書人閱讀 164,234評論 0 354
  • 文/不壞的土叔 我叫張陵怨愤,是天一觀的道長。 經(jīng)常有香客問我蛹批,道長撰洗,這世上最難降的妖魔是什么篮愉? 我笑而不...
    開封第一講書人閱讀 58,562評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮差导,結(jié)果婚禮上试躏,老公的妹妹穿的比我還像新娘。我一直安慰自己设褐,他們只是感情好颠蕴,可當(dāng)我...
    茶點故事閱讀 67,611評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著助析,像睡著了一般犀被。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上外冀,一...
    開封第一講書人閱讀 51,482評論 1 302
  • 那天寡键,我揣著相機與錄音,去河邊找鬼雪隧。 笑死昌腰,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的膀跌。 我是一名探鬼主播遭商,決...
    沈念sama閱讀 40,271評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼捅伤!你這毒婦竟也來了劫流?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,166評論 0 276
  • 序言:老撾萬榮一對情侶失蹤丛忆,失蹤者是張志新(化名)和其女友劉穎祠汇,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體熄诡,經(jīng)...
    沈念sama閱讀 45,608評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡可很,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,814評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了凰浮。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片我抠。...
    茶點故事閱讀 39,926評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖袜茧,靈堂內(nèi)的尸體忽然破棺而出菜拓,到底是詐尸還是另有隱情,我是刑警寧澤笛厦,帶...
    沈念sama閱讀 35,644評論 5 346
  • 正文 年R本政府宣布纳鼎,位于F島的核電站,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏贱鄙。R本人自食惡果不足惜劝贸,卻給世界環(huán)境...
    茶點故事閱讀 41,249評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望逗宁。 院中可真熱鬧悬荣,春花似錦、人聲如沸疙剑。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,866評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽言缤。三九已至,卻和暖如春禁灼,著一層夾襖步出監(jiān)牢的瞬間管挟,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,991評論 1 269
  • 我被黑心中介騙來泰國打工弄捕, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留僻孝,地道東北人。 一個月前我還...
    沈念sama閱讀 48,063評論 3 370
  • 正文 我出身青樓守谓,卻偏偏與公主長得像穿铆,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子斋荞,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,871評論 2 354

推薦閱讀更多精彩內(nèi)容

  • Java-Review-Note——4.多線程 標(biāo)簽: JavaStudy PS:本來是分開三篇的荞雏,后來想想還是整...
    coder_pig閱讀 1,649評論 2 17
  • 在一個方法內(nèi)部定義的變量都存儲在棧中,當(dāng)這個函數(shù)運行結(jié)束后平酿,其對應(yīng)的棧就會被回收凤优,此時,在其方法體中定義的變量將不...
    Y了個J閱讀 4,417評論 1 14
  • 本人經(jīng)歷了幾家大公司的Java研發(fā)的面試蜈彼,現(xiàn)就面試中筑辨,經(jīng)常遇到的問題整理如下:(java研發(fā)工程師) 不吹不捧,本...
    chansonpro閱讀 892評論 0 7
  • 第一章:Java程序設(shè)計概述 Java和C++最大的不同在于Java采用的指針模型可以消除重寫內(nèi)存和損壞數(shù)據(jù)的可能...
    loneyzhou閱讀 1,251評論 1 7
  • 窗 外 窗外---- 天兒蔚藍(lán)幸逆, 陽光明媚棍辕, 鳥兒歡笑…… 晨起---- 拉開窗簾, 窗簾如幕簾--- 人生戲又開始……
    東原郡人閱讀 170評論 0 1