一、概述
??有關(guān)優(yōu)先級隊列PriorityQueue的學(xué)習(xí)挫鸽,我們前面已經(jīng)學(xué)習(xí)過了恭应,而PriorityBlockingQueue 同樣是優(yōu)先級隊列,并且是一種無界的阻塞隊列阴孟,該隊列使用了和PriorityQueue相同的排序規(guī)則,在此基礎(chǔ)上提供了阻塞的操作税迷;雖然該隊列在邏輯上是無容量限制的永丝,但實際上是有最大容量限制的,超過最大容量有可能會導(dǎo)致OutOfMemoryError箭养。我們再來簡單看下優(yōu)先級隊列的一些特性:
- 和普通的先進(jìn)先出(FIFO)的隊列不同慕嚷,優(yōu)先隊列每次取出的元素都是隊列中優(yōu)先級最高的,PriorityBlockingQueue默認(rèn)優(yōu)先級最高的是元素最小的值毕泌,當(dāng)然也可以按照我們指定的規(guī)則來自定義優(yōu)先級喝检;
- 隊列不允許null元素;
- 默認(rèn)情況下該隊列根據(jù)元素的自然順序進(jìn)行排序撼泛,或者根據(jù)傳入的比較器進(jìn)行排序挠说,但該隊列不保證具有相同優(yōu)先級的元素的排序;
- 和PriorityQueue很像愿题,PriorityBlockingQueue底層也是通過數(shù)組來實現(xiàn)的损俭;默認(rèn)情況下,PriorityBlockingQueue有默認(rèn)的隊列容量大小潘酗,但當(dāng)隊列滿了之后杆兵,隊列會自動擴容,直到最大容量仔夺;
- 隊列是基于小頂堆(或者最小堆)來實現(xiàn)的琐脏,也就是說小頂堆堆根結(jié)點是所有數(shù)據(jù)中最小的元素,并且堆中每個結(jié)點的值總是不大于其孩子結(jié)點的值。
用圖來簡單的對最小堆舉個例子:
由于PriorityBlockingQueue對底層存儲結(jié)構(gòu)是數(shù)組日裙,所以我們特地對元素標(biāo)了編號吹艇,其實這也就是在數(shù)組中的下標(biāo)值,最終存儲格式可以看下圖:
可以觀察下父子之間的編號阅签,也就是在數(shù)組中的下標(biāo)值掐暮,可以發(fā)現(xiàn):
如果節(jié)點的下標(biāo)是
i
,那么i
節(jié)點對應(yīng)的子節(jié)點在數(shù)組中的位置分別是:2i + 1
政钟,2i + 2
路克,同時i
的父節(jié)點的位置是(i -1)/2
,根據(jù)這個公式养交,我們可以很方便的找到每個節(jié)點的父親節(jié)點和孩子節(jié)點精算。
其中,對堆最主要的操作有兩種:上浮和下沉碎连,但前提是堆是有序的灰羽,后續(xù)我們結(jié)合方法再來學(xué)習(xí)這兩個主要操作。
二鱼辙、PriorityBlockingQueue
接下來我們來學(xué)習(xí)PriorityBlockingQueue的源碼實現(xiàn)廉嚼,首先我們還是先來看下繼承結(jié)構(gòu)。
1. 繼承結(jié)構(gòu)
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
可以看到倒戏,和其他的阻塞隊列差不多怠噪,實現(xiàn)了BlockingQueue,然后繼承了AbstractQueue杜跷,并且支持序列化傍念。接下來來看下屬性。
2. 屬性
/**
* 默認(rèn)隊列的容量
*/
private static final int DEFAULT_INITIAL_CAPACITY = 11;
/**
* 隊列的最大容量葛闷,超過該容量會導(dǎo)致OutOfMemoryError
*/
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
/**
* 隊列底層通過數(shù)組來實現(xiàn)
*/
private transient Object[] queue;
/**
* 隊列中元素數(shù)量
*/
private transient int size;
/**
* 比較器
*/
private transient Comparator<? super E> comparator;
/**
* Lock used for all public operations
* 用于隊列操作的可重入鎖
*/
private final ReentrantLock lock;
/**
* 隊列不為空的Condition條件
*/
private final Condition notEmpty;
/**
* 專為隊列擴容時用的鎖
*/
private transient volatile int allocationSpinLock;
/**
* 普通優(yōu)先級隊列PriorityQueue憋槐,僅用于序列化
*/
private PriorityQueue<E> q;
可以看到,PriorityBlockingQueue 底層還是通過數(shù)組來實現(xiàn)淑趾,并且支持設(shè)置初始化容量和指定相應(yīng)的比較器阳仔,但該容量只是初始容量,當(dāng)隊列滿了之后扣泊,數(shù)組會進(jìn)行擴容操作驳概,直到達(dá)到最大容量限制,達(dá)到最大容量限制后旷赖,再添加就會拋出異常顺又,而不是進(jìn)行阻塞。
3. 構(gòu)造方法
接下來我們來簡單看下該隊列的構(gòu)造方法:
public PriorityBlockingQueue() {
// 默認(rèn)初始化容量
this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
// 指定初始化容量
this(initialCapacity, null);
}
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
// 構(gòu)造可重入鎖
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
// 比較器
this.comparator = comparator;
// 數(shù)組初始化容量
this.queue = new Object[initialCapacity];
}
前三個構(gòu)造方法都比較簡單等孵,來看下最后一個構(gòu)造方法:
public PriorityBlockingQueue(Collection<? extends E> c) {
// 構(gòu)建可重入鎖及對應(yīng)的Condition條件
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
// 堆是否進(jìn)行有序化操作稚照,因為原先集合可能不是有序的
boolean heapify = true; // true if not known to be in heap order
// 堆是否要掃描null值
boolean screen = true; // true if must screen for nulls
// 如果集合屬于有序的SortedSet集合,獲取SortedSet的比較器,并將heapify設(shè)置為false
if (c instanceof SortedSet<?>) {
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
heapify = false;
}
// 否則果录,如果集合本身就是一個 PriorityBlockingQueue隊列
else if (c instanceof PriorityBlockingQueue<?>) {
PriorityBlockingQueue<? extends E> pq =
(PriorityBlockingQueue<? extends E>) c;
// 獲取集合的比較器上枕,并將screen設(shè)置為false,因為PriorityBlockingQueue本身就不包含null弱恒,不用再次掃描
this.comparator = (Comparator<? super E>) pq.comparator();
screen = false;
// 并且PriorityBlockingQueue本身就有序辨萍,也不用再次有序化
if (pq.getClass() == PriorityBlockingQueue.class) // exact match
heapify = false;
}
// 將集合轉(zhuǎn)化成數(shù)組
Object[] a = c.toArray();
// 獲取數(shù)組長度
int n = a.length;
// If c.toArray incorrectly doesn't return Object[], copy it.
// 如果此時a的數(shù)據(jù)類型不是Object[],拷貝一份
if (a.getClass() != Object[].class)
a = Arrays.copyOf(a, n, Object[].class);
// 如果需要掃描null并且(要么元素就1個返弹,要么元素可排序)
if (screen && (n == 1 || this.comparator != null)) {
for (int i = 0; i < n; ++i)
if (a[i] == null)
throw new NullPointerException();
}
// 數(shù)組賦值
this.queue = a;
// 容量賦值
this.size = n;
// 有序化操作
if (heapify)
heapify();
}
最后一個構(gòu)造方法有點小復(fù)雜锈玉,需要先判斷是否需要排序,是否需要掃描集合中值為null的元素义起,然后這兩者操作完成之后拉背,如果需要排序,再進(jìn)行有序化操作:
private void heapify() {
Object[] array = queue;
int n = size;
// 計算非葉子結(jié)點元素的最大位置默终,循環(huán)的開始條件(在最后一個非葉子節(jié)點處開始椅棺,直到根節(jié)點)
int half = (n >>> 1) - 1;
Comparator<? super E> cmp = comparator;
if (cmp == null) {
// 循環(huán)進(jìn)行下沉操作
for (int i = half; i >= 0; i--)
siftDownComparable(i, (E) array[i], array, n);
}
else {
for (int i = half; i >= 0; i--)
siftDownUsingComparator(i, (E) array[i], array, n, cmp);
}
}
所謂的有序化,其實就是循環(huán)下沉或者上浮操作齐蔽,這里采用的是循環(huán)下沉操作两疚,從非葉子結(jié)點元素的最大位置開始依次向上比較調(diào)整,直到根節(jié)點為止含滴。
4. 方法
4.1 add/put/offer方法
首先诱渤,我們來看下入隊的幾個方法add/put/offer,由于最終都是通過offer方法來實現(xiàn)的蛙吏,所以我們主要來看下offer方法源哩。
public boolean add(E e) {
return offer(e);
}
public void put(E e) {
offer(e); // never need to block
}
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e); // never need to block
}
public boolean offer(E e) {
// 元素不能為空
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
// 獲取鎖
lock.lock();
int n, cap;
Object[] array;
// 如果元素數(shù)量大于數(shù)組容量鞋吉,進(jìn)行擴容操作
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
// 獲取比較器
Comparator<? super E> cmp = comparator;
// 如果沒有比較器鸦做,根據(jù)默認(rèn)自然規(guī)則進(jìn)行排序
// 然后進(jìn)行上浮操作
if (cmp == null)
siftUpComparable(n, e, array);
else
// 有比較器的上浮操作
siftUpUsingComparator(n, e, array, cmp);
// 隊列中元素數(shù)量+1
size = n + 1;
// 元素添加完成,說明隊列不為空了谓着,喚醒在notEmpty條件上的線程
notEmpty.signal();
} finally {
lock.unlock();
}
// 由于隊列無容量限制泼诱,所以會一直返回true
return true;
}
可以看到,添加元素的時候還是挺簡單的:
- 首先對元素進(jìn)行非空校驗赊锚,然后獲取可重入鎖治筒;
- 然后循環(huán)判斷隊列元素的數(shù)量是否大于數(shù)組容量,如果大于進(jìn)行擴容操作舷蒲;
- 獲取比較器耸袜,添加元素,然后進(jìn)行上浮操作牲平;
- 最后喚醒notEmpty條件上的線程堤框;
這里會調(diào)用擴容方法進(jìn)行擴容操作,我們來看下這個方法。
4.2 tryGrow方法
private void tryGrow(Object[] array, int oldCap) {
// 擴容前必須先釋放鎖蜈抓,然后再重新獲取
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
// 如果allocationSpinLock == 0启绰,通過CAS方法設(shè)置為1,表示同一時刻沟使,只有一個線程可以擴容操作
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
// 這里擴容的時候做了一點小處理委可,后續(xù)詳細(xì)說
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
// 該模塊是為了保證數(shù)組的容量不超過最大容量,如果超過了腊嗡,提示OutOfMemoryError錯誤
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
// 如果數(shù)組沒有被修改着倾,生成新的容量的數(shù)組
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
// 將allocationSpinLock重新設(shè)置為0,表示當(dāng)前線程操作完成
allocationSpinLock = 0;
}
}
// 如果其他線程對隊列進(jìn)行了操作叽唱,則放棄擴容屈呕,并退出
if (newArray == null) // back off if another thread is allocating
Thread.yield();
// 再次加鎖
lock.lock();
// 再次判斷queue == array是否相等,相等的話棺亭,進(jìn)行元素的復(fù)制
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
??可以看到擴容方法中虎眨,使用了allocationSpinLock
這個變量,在擴容前镶摘,該方法會釋放鎖嗽桩,那么這時候就可能有其他的線程進(jìn)行操作,為了保證擴容時的線程安全凄敢,所以添加了這個變量碌冶。在擴容的時候,會通過CAS操作把這個值設(shè)置為1涝缝,表示只有一個線程可以執(zhí)行擴容操作扑庞,擴容操作完成后,會重置該值為1拒逮。
而要判斷其他線程是否對隊列元素進(jìn)行了修改的話罐氨,是通過
queue == array
來判斷,如果其他線程對隊列進(jìn)行了修改滩援,那么就會放棄擴容栅隐,因此在offer
中會看到有一個 while 循環(huán)來判斷是否真正需要擴容。
另外玩徊,針對擴容的容量這點代碼租悄,來簡單說下:
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
如果原先隊列的容量小于64,則擴充相當(dāng)于1倍恩袱;如果容量大于64泣棋,則擴容原先容量的50%。也就是說畔塔,如果隊列原先容量很小的話潭辈,那就多擴充些纪吮,如果容量大一點的話,那就少擴充些萎胰。
借用PriorityQueue 的說法碾盟,就是:Double size if small; else grow by 50%
4.3 siftUpComparable方法
然后這里還會調(diào)用到上浮的操作:沒有比較器的siftUpComparable
和有比較器的siftUpUsingComparator
,因為兩者操作相似技竟,這里我們直接來看 siftUpComparable 方法即可:
private static <T> void siftUpComparable(int k, T x, Object[] array) {
// 首先獲取元素本身默認(rèn)的比較器對象
Comparable<? super T> key = (Comparable<? super T>) x;
// 這里的k表示元素要插入的位置冰肴,第一次循環(huán)的時候表示的array[size]位置,也就是數(shù)組中最后一個元素
// 的下一個元素的位置榔组,這里會循環(huán)操作
while (k > 0) {
// 找到該元素的父元素的位置熙尉,parent = (thisNode-1)/2,這里使用了無符號右移
int parent = (k - 1) >>> 1;
// 獲取父元素
Object e = array[parent];
// 如果當(dāng)前元素大于等于父元素搓扯,直接退出循環(huán)检痰,說明元素不需要上浮
if (key.compareTo((T) e) >= 0)
break;
// 如果當(dāng)前元素小于父元素,將父元素進(jìn)行下沉到k所在的節(jié)點處
array[k] = e;
// 然后重置k锨推,從父元素位置再次向上接著進(jìn)行判斷
k = parent;
}
// 找到最終的位置k铅歼,將元素保存在這
array[k] = key;
}
上浮的操作其實很簡單,我們在前面學(xué)習(xí) PriorityQueue 的時候就已經(jīng)介紹過了换可,這里只需要看一下流程操作圖即可:
4.4 poll方法
接下來椎椰,我們來看下出隊的幾個方法,首先來看下poll方法沾鳄。poll方法本身沒什么好說的慨飘,主要是該方法會調(diào)用出隊的主要方法 dequeue
,我們主要就是來看下這個方法:
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// 數(shù)組中元素數(shù)量-1
int n = size - 1;
// 如果數(shù)組中沒有元素译荞,直接返回null
if (n < 0)
return null;
else {
Object[] array = queue;
// 獲取數(shù)組的第一個元素瓤的,也就是要出隊的元素
E result = (E) array[0];
// 獲取數(shù)組的最后一個元素 array[size - 1]
E x = (E) array[n];
// 將數(shù)組的最后一個元素設(shè)置為null
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
// 進(jìn)行元素的下沉操作
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
// 數(shù)組中數(shù)量 - 1
size = n;
// 返回數(shù)組原先的第一個元素
return result;
}
}
其實,出隊的操作流程也很簡單吞歼,獲取第一個元素出隊圈膏,然后將最后一個元素移動到第一個元素位置(并不是真的放到第一個位置),然后進(jìn)行下沉操作即可浆熔。這里主要的方法是下沉方法siftDownComparable
和siftDownUsingComparator
本辐,同樣桥帆,我們來看下 siftDownComparable方法医增。
4.5 siftDownComparable方法
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
// 計算非葉子結(jié)點元素的最大位置,循環(huán)的終止條件(在最后一個非葉子節(jié)點處結(jié)束)
int half = n >>> 1; // loop while a non-leaf
// 其中k表示數(shù)組中原先 最后一個元素要放置的位置
while (k < half) {
// 計算k位置的左孩子的位置 => 2 * k + 1
int child = (k << 1) + 1; // assume left child is least
// 獲取k位置的左孩子的值
Object c = array[child];
// 獲取k位置的右孩子的位置
int right = child + 1;
// 獲取左右孩子中值較小的值
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
// 如果x 比左右孩子都小老虫,不用下沉了叶骨,跳出循環(huán)
if (key.compareTo((T) c) <= 0)
break;
// c節(jié)點下沉
array[k] = c;
// 重置k值
k = child;
}
// 替換k位置處的值
array[k] = key;
}
}
由于PriorityQueue中也已經(jīng)介紹過了,所以這里也就不多介紹了祈匙,來簡單看下操作圖即可:
4.6 poll(long, TimeUnit)方法
這個是超時的poll方法忽刽,表示獲取并移除隊列的隊頭元素天揖,如果沒有獲取到對應(yīng)的值,等待相應(yīng)的超時時間跪帝。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 獲取超時時間今膊,納秒
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// 可中斷鎖
lock.lockInterruptibly();
E result;
try {
// 如果沒有獲取到元素,并且沒有超時伞剑,進(jìn)行等待
while ( (result = dequeue()) == null && nanos > 0)
nanos = notEmpty.awaitNanos(nanos);
} finally {
lock.unlock();
}
return result;
}
4.7 take方法
take是出隊的阻塞方法斑唬,表示獲取并移除隊列的隊頭元素,如果沒有獲取到元素黎泣,一直等待:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 可中斷鎖
lock.lockInterruptibly();
E result;
try {
// 如果沒有獲取到元素恕刘,一直等待,直到被喚醒
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
4.8 peek方法
peek方法只獲取隊頭元素抒倚,不移除元素:
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 獲取隊頭元素
return (size == 0) ? null : (E) queue[0];
} finally {
lock.unlock();
}
}
4.9 writeObject方法
從上面的操作我們可以看到褐着,PriorityBlockingQueue中的屬性private PriorityQueue<E> q;
一直沒有用到,因為該屬性在該對象中只有一個用處就是用于序列化的托呕,我們來看下序列化方法writeObject:
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {
lock.lock();
try {
// avoid zero capacity argument
q = new PriorityQueue<E>(Math.max(size, 1), comparator);
q.addAll(this);
s.defaultWriteObject();
} finally {
q = null;
lock.unlock();
}
}
之所以這樣做含蓉,是為了避免過多的null值被序列化。
5. 總結(jié)
到這里一些主要的方法都學(xué)習(xí)過了项郊,其他一些方法谴餐,大家有興趣的可以自己了解下,或者參考原先學(xué)習(xí)PriorityQueue時的內(nèi)容呆抑,而有關(guān)PriorityQueue原先學(xué)習(xí)的鏈接地址是:Java1.8-PriorityQueue源碼解析岂嗓。不過方法中最主要的還是隊列的上浮和下沉兩個操作。
本文參考除了包含官方文檔之外鹊碍,還包括:
Java 并發(fā) --- 阻塞隊列之PriorityBlockingQueuey源碼分析 - csdn.net
另外本文畫圖工具:ProcessOn