1. PriorityBlockingQueue定義
PriorityBlockingQueue 是基于 二叉堆, ReentrantLock, Condition 實現(xiàn)的并發(fā)安全的優(yōu)先級隊列.
主要有以下特點:
- 數(shù)據(jù)容量沒有界限(最大值 Integer.MAX_VALUE - 8)
- 居于ReentrantLock 實現(xiàn)并發(fā)安全, 基于 Condition 實現(xiàn)線程等待喚醒
- 數(shù)據(jù)底層存放在居于數(shù)組實現(xiàn)的二叉堆上, 注意這里沒有實現(xiàn)堆排序, 只是每次有數(shù)據(jù)變更時將最小/大放在了堆的最上面的節(jié)點上(PS: 不了解二叉堆的請戳這里)
2. 初始化方法 堆化(heapify)
實現(xiàn)思路: 從堆的最后一個 parent 開始, 將最小/大值放在 parent位置, 直到最頂層的parent為止舔涎;
直接看代碼
/**
* Establishes the heap invariant (described above) in the entire tree
* assuming nothing about the order of the elements prior to the call
*/
private void heapify(){
/**
* 將這個數(shù)組進行 堆化 (heapify)
*
*/
Object[] array = queue;
int n = size;
int half = (n >>> 1) -1; // 1. 這里有個注意點 n 是數(shù)組的 length,
Comparator<? super E> cmp = comparator; // 2. 獲取 比較器, 若這里的 comparator是空, 則用元素自己實現(xiàn)的比較接口進行比較
if(cmp == null){
for(int i = half; i >= 0; i--){ // 3. 從整個數(shù)組的最后一顆樹開始, 將二叉樹的最小值放置在parent位置, 一直到最上面的那顆二叉樹
siftDownComparable(i, (E)array[i], array, n);
}
}else{
for(int i = half; i >= 0; i--){
siftDownUsingComparator(i, (E)array[i], array, n, cmp);
}
}
// 4. 經(jīng)過這個 heapify 方法后, 整個二叉堆中的最小值已經(jīng)放在的 index=0 的位置上(注意: 這時不保證 左子樹一定小于右子樹)
// 5. 若要進行二叉堆的排序, 則需要將 index=0的位置排查在外 從 index= 1的位置開始, 到最后一個位置, 再進行上面的操作
// 其實思路就是 每次將最小值放在數(shù)組的最上面, 然后排除這個節(jié)點在外, 將下面的數(shù)組作為一個整體, 然后重復(fù)上面的步驟, 直到最后一個元素
}
這個方法其實從最后一個 parent 開始進行與子節(jié)點比較, 將最小/大值放在 parent 位置, 直到 頂層的 parent 為止
我們發(fā)現(xiàn)代碼中有個 siftDownComparable 方法, 這是實現(xiàn) 堆化的重要步驟
/**
* Inserts item x at position k, maintaining heap invariant by
* demoting x down the tree repeatedly until it is less than or
* equal to its children or is a leaf
*
* @param k the position to fill
* @param x the item to insert
* @param array the heap array
* @param n the heap array
* @param <T>
*/
private static <T> void siftDownComparable(int k, T x, Object[] array, int n){
/**
* 從整個數(shù)組的 k 位置開始向下進行 比較更換操作
* 1. 獲取這個數(shù)組的中間值(大于等于它其實就是說已經(jīng)沒有子節(jié)點)
* 舉例: 數(shù)組 array 含有元素 : 1,2,3,4,5,6,7,8,9,10 共10個元素
* 其中的之間 half = n >>> 1 = 10 >>> 1 = 5; (就是下面代碼中的 half, 堆中所有parent的 index 均小于 5)
* 而最大 parent 的index 是 : (9 - 1) >>> 1 = 4;
* 再parent調(diào)整好后, 再下面的代碼中獲取的 k 就變成 9/10, 但是 9/10 > 5 (就是下面代碼的 while(k < half))
* 2. 從k位子開始不斷向下比較, 將最小值放到 parent位置, 直到 k >= half
* 3. 經(jīng)過這個方法比較后, 從k往下 都是最小值上parent上的一個棵二叉樹
*/
if(n > 0){
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // 1. 獲取整個數(shù)組的中間坐標(biāo)
while(k < half){ // 2. k這里其實表示 parent 在數(shù)組中的 index, k >= half 其實就說明 k 在數(shù)組中已經(jīng)沒有子節(jié)點
int child = (k << 1) + 1; // 3. 獲取 k 的左子節(jié)點的 index
Object c = array[child]; // 4. 獲取左子節(jié)點的值
int right = child + 1; // 5. 獲取右子節(jié)點的 index
if(right < n && // 6. 這個 if 判斷其實是 判斷左右子節(jié)點的大小, 并且找到其中的最小值, 賦值給 c;
((Comparable<? super T>)c).compareTo((T)array[right]) > 0
){
c = array[child = right];
}
if(key.compareTo((T)c) <= 0){ // 7. key <= c 則說明, 進行下面 sift 已經(jīng)完成 (父節(jié)點k已經(jīng)小于等于子節(jié)點), 直接 break 出
break;
}
array[k] = c; // 8. 代碼運行到這里說明 k > c惶看, 則將子數(shù)據(jù)c賦值到k的位置
k = child; // 9. 將上次的子節(jié)點 child作為父節(jié)點, 再次下面進行比較, 直到 k >= half
}
array[k] = key; // 10. 將key值賦值給最后一次進行 siftdown 比較的 父節(jié)點上
}
}
操作思路:
從整個數(shù)組的 k 位置開始向下進行 比較更換操作
1. 獲取這個數(shù)組的中間值(大于等于它其實就是說已經(jīng)沒有子節(jié)點)
舉例: 數(shù)組 array 含有元素 : 1,2,3,4,5,6,7,8,9,10 共10個元素
其中的之間 half = n >>> 1 = 10 >>> 1 = 5; (就是下面代碼中的 half, 堆中所有parent的 index 均小于 5)
而最大 parent 的index 是 : (9 - 1) >>> 1 = 4;
再parent調(diào)整好后, 再下面的代碼中獲取的 k 就變成 9/10, 但是 9/10 > 5 (就是下面代碼的 while(k < half))
2. 從k位子開始不斷向下比較, 將最小值放到 parent位置, 直到 k >= half
3. 經(jīng)過這個方法比較后, 從k往下 都是最小值上parent上的一個棵二叉樹
3. 添加元素 offer 方法
主要思路: 將添加的元素放置到數(shù)組的最尾端, 然后調(diào)用 siftUp 進行向上調(diào)整
/**
* Inserts the specified element into this priority queue
* As the queue is unbounded, his method will never return {@code false}
*
* @param e the lement to add
* @return {@code true} (as specified element cannot be compared
* with elements currently in the priority queue according to the
* priority queue's ordering)
* @throws NullPointerException if the specified element is null
*/
@Override
public boolean offer(E e) {
if(e != null){
throw new NullPointerException();
}
final ReentrantLock lock = this.lock; // 1. 獲取全局共享的鎖
lock.lock();
int n, cap;
Object[] array; // 2. 判斷容器是否需要擴容
while((n = size) >= (cap = (array = queue).length)){
tryGrow(array, cap); // 3. 進行擴容操作
}
try{
Comparator<? super E> cmp = comparator;
if(cmp == null){ // 4. 進行 保持 heap 性質(zhì)的 siftUp 操作
siftUpComparable(n, e, array);
}else{
siftUpUsingComparator(n, e, array, cmp);
}
size = n + 1; // 5. 數(shù)據(jù)插入后, 整個容量值 + 1;
notEmpty.signal(); // 6. Condition 釋放信號, 告知其他等待的線程: 容器中已經(jīng)有元素
}finally {
lock.unlock(); // 7. 釋放鎖
}
return true;
}
在代碼中我們看到了 tryGrow, 這個調(diào)整堆存儲空間的方法
在里面運用了 先進行鎖的釋放 lock.unlock, 然后 根據(jù) allocationSpinLock 這個指標(biāo)判斷是否其他線程在進行擴容, 基本上每次擴容都是 * 1.5
/**
* Tries to grow array to accommodate at least one more element
* (but normally expend by about 50%), giving up (allowing retry)
* on contention (which we expect to be race). Call only this while
* holding lock
*
* @param array the heap array
* @param oldCap the length of the array
*/
private void tryGrow(Object[] array, int oldCap){
/**
* tryGrow 數(shù)組容量擴容操作
* 整個方法的執(zhí)行是在已經(jīng) ReentrantLock 獲取鎖的情況下進行的
*/
lock.unlock(); // must release and then re-acquire main lock // 1. 釋放全局的鎖(為什么呢? 原因也非常簡單, 這個 lock 是全局方法共享的, 為的是更好的并發(fā)性能, 而擴容操作的并發(fā)是通過簡單的樂觀鎖 allocationSpinLock 來進行控制de)
Object[] newArray = null;
if(allocationSpinLock == 0 && // 2. 居于CAS操作, 在 allocationSpinLock 實現(xiàn)樂觀鎖, 這個也是為了在擴容時不影響容器的其他并發(fā)操作
unsafe.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)){
try{
int newCap = oldCap + ((oldCap < 64)? // 3. 容量若小于 64則直接 double + 2; 大于的話, 直接 * 1.5
(oldCap + 2): // grow faster if small
(oldCap >> 1)
);
if(newCap - MAX_ARRAY_SIZE > 0){ // possible overflow
int minCap = oldCap + 1; // 4. 擴容后超過最大容量處理
if(minCap < 0 || minCap > MAX_ARRAY_SIZE){
throw new OutOfMemoryError();
}
newCap = MAX_ARRAY_SIZE;
}
if(newCap > oldCap && queue == array){ // 5. queue == array 若數(shù)組沒變化, 直接進行新建數(shù)組
newArray = new Object[newCap];
}
}finally {
allocationSpinLock = 0;
}
}
// 6. newArray == null 說明上面的操作過程中, 有其他的線程進行了擴容的操作
if(newArray == null){ // back off if another thread is allocating
Thread.yield(); // 7. 讓出 CPU 調(diào)度(因為其他線程擴容后必定有其他的操作)
}
lock.lock(); // 8. 重新獲取鎖
if(newArray != null && queue == array){ // 9. 判斷數(shù)組 queue 有沒有在其他線程中變化過
queue = newArray; // 10. 未變化, 直接進行賦值操作
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
在進行offer元素時主要還調(diào)用了 siftUpComparable 方法
思路: 將元素與上面的 parent 進行比較, 直到 parent >= 這個元素
/**
* Insert item x at position k, maintaining heap invariant by
* promoting x up the tree until it is greater than or equal to
* its parent, or is the root
*
* To simplify and speed up coercions and comparisons. the
* Comparable and Comparator versions are separated into different
* method that are otherwise identical. (Similarly for siftDown)
* These methods are statics, with heap state as arguments, to
* simplify use in light og possible comparator exceptions
*
* @param k the position to fill
* @param x the item to insert
* @param array the heap array
* @param <T>
*/
private static <T> void siftUpComparable(int k, T x, Object[] array){
/**
* 簡單的 siftUp 操作: 大體操作就是將元素x放置到k位置, 然后對k的parent進行比較, 直到 k>=parent為止
*/
Comparable<? super T> key = (Comparable<? super T>)x;
while(k > 0){ // 1. k是否到達二叉樹的頂端
int parent = (k - 1) >>> 1; // 2. 尋找 k 的parent位置
Object e = array[parent]; // 3. 獲取parent的值
if(key.compareTo((T)e) >= 0){ // 4. key >= e說明 parent >=子節(jié)點, 則不需要 siftUp 操作
break;
}
array[k] = e; // 5. 將上次比較中 parent節(jié)點的值放在子節(jié)點上
k = parent; // 6. 將這次比較中的 parent 當(dāng)作下次比價的k(k是下次比較的子節(jié)點)
}
array[k] = key; // 7. 將值key放置合適的位置上
}
4. 刪除元素 poll 方法
思路: 將元素的首節(jié)點拿出, 作為返回, 末尾節(jié)點放置到 index=0位置, 開始 siftDown直到 滿足 parent >= child
@Override
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue(){
int n = size - 1;
if(n < 0){ // 1. 判斷元素是否未空
return null; // 2. 容器中沒有元素, 直接返回 null
}
else{
Object[] array = queue;
E result = (E)array[0]; // 3. 取出數(shù)組中的第一個元素, 作為返回值
E x = (E)array[n]; // 4. 將數(shù)組的最后一個元素取出
array[n] = null;
Comparator<? super E> cmp = comparator;
if(cmp == null){ // 5. 將剛才取出的數(shù)組中最后一個元素放到第一個index位置, 進行siftDown操作(就是向下堆化操作)
siftDownComparable(0, x, array, n);
}else{
siftDownUsingComparator(0, x, array, n, cmp);
}
size = n; // 6. 重新賦值 size值
return result; // 7. 返回取出的值
}
}
4. 刪除元素 remove 方法
思路:
- 尋找待刪除元素在數(shù)組中的位置
- 刪除待刪除的元素, 將數(shù)組中的最后一個元素放置到這個位置, 先進行 siftDown 尋找合適放置的位置, 然后再siftUp 尋找合適的放置位置
/**
* Remove a single instance of the specified element from this queue,
* if it is present. More formally, removes an element {@code e} such
* that {@code o.equal(e)}, if this queue contains one or more such
* elements. Returns {@code true} if and onlu if this queue contained
* the specified element (or equivalently, if this queue changed as
* a result of the call)
*
* @param o element to removed from this queue, if present
* @return {@code true} if this queue changed as a result of the call
*/
public boolean remove(Object o){
/**
* 刪除堆中對應(yīng)的元素
*/
final ReentrantLock lock = this.lock;
lock.lock();
try{
int i = indexOf(o); // 1. 找出元素 o 在堆中的位置
if(i == -1){
return false;
}
removeAt(i); // 2. 調(diào)用 removeAt 定點刪除元素
return true;
}finally {
lock.unlock();
}
}
/**
* Removes the ith element from queue
* @param i
*/
private void removeAt(int i){
/**
* 刪除堆中指定位置的元素
*/
Object[] array = queue;
int n = size - 1;
if(n == i){ // remove last lement // 1. 若元素是末尾元素, 則直接進行刪除操作
array[i] = null;
}else{
E moved = (E)array[n]; // 2. 獲取待堆中最后的值(這個不是最大值)
array[n] = null; // 3. 將對應(yīng)元素置空
Comparator<? super E> cmp = comparator;
if(cmp == null){
siftDownComparable(i, moved, array, n); // 4. 將最后值 moved 放在 i 位置進行 siftDown
}else{
siftDownUsingComparator(i, moved, array, n, cmp);
}
if(array[i] == moved){ // 5. array[i] = moved 說明 siftDown 沒起作用, 節(jié)點 moved可能應(yīng)該在堆上面的位置, 所以進行 siftUp, 從而將 moved 放在上面堆中某個位置
if(cmp == null){
siftUpComparable(i, moved, array);
}else{
siftUpUsingComparator(i, moved, array, cmp);
}
}
}
size = n; // 6. 進行size重新賦值操作
}
總結(jié): PriorityBlockingQueue 是一個基于二叉堆實現(xiàn)的優(yōu)先級隊列, 與其他隊列不同的是它支持高優(yōu)先級的數(shù)據(jù)線poll出, 在有優(yōu)先級需要的生產(chǎn)者消費者場景中用這個類比較合適.