本文作者:王一飛,叩丁狼高級講師芙沥。原創(chuàng)文章优俘,轉(zhuǎn)載請注明出處京办。
概述
按api上的解釋,PriorityBlockingQueue 是有一個帶有優(yōu)先級級別的無界阻塞隊列帆焕,不支持null元素入列惭婿,并且要求隊列對象必須為可以比較對象。這點跟PriorityQueue類 類似叶雹,區(qū)別是PriorityBlockingQueue 帶有阻塞功能财饥。
PriorityBlockingQueue 出列具有優(yōu)先級之分,每次出列返回優(yōu)先級最高的元素浑娜。其底層通過是二叉樹最小堆實現(xiàn)佑力,這也導(dǎo)致了遍歷隊列時,獲取到的元素是無序的筋遭。
內(nèi)部結(jié)構(gòu):
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private transient Object[] queue; //存數(shù)據(jù)
private transient int size; //隊列大小
private transient Comparator<? super E> comparator; //隊列比較器
private final ReentrantLock lock; //線程安全鎖
private final Condition notEmpty; //鎖條件
...
}
基本使用
需求:學(xué)號越大打颤,優(yōu)先級越高
public class Student implements Comparable<Student>{
private int num; //學(xué)號
private String name;
public Student(int num, String name){
this.num = num;
this.name = name;
}
public int compareTo(Student o) {
return o.num - this.num;
}
public String toString() {
return "Student{num=" + num +", name='" + name +"}";
}
}
public class App {
public static void main(String[] args) throws InterruptedException {
//初始化隊列,容量為3
PriorityBlockingQueue<Student> queue = new PriorityBlockingQueue<Student>(3);
queue.offer(new Student(1, "zhangsan"));
queue.offer(new Student(4, "zhaoliu"));
queue.offer(new Student(3, "wangwu"));
queue.offer(new Student(2, "lisi")); //超過3漓滔,自動拓展
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take()); //超過容量编饺,阻塞
}
}
上面代碼可以大體看出PriorityBlockingQueue的特性,阻塞响驴,無界透且,帶優(yōu)先級的隊列。
源碼解析
構(gòu)造器
//空參數(shù),使用默認(rèn)的初始容量: 11
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
//單參數(shù)秽誊, 指定初始容量
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
//2參數(shù)數(shù)鲸沮, 指定初始容量與額外的比較器
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
入列-offer
PriorityBlockingQueue 是一個無界的隊列,所以隨便插入不需要進(jìn)行邊界限制锅论,一直返回true
public boolean offer(E e) {
//不允許null入列
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock(); //加鎖
int n, cap; //操作臨時變量
Object[] array;
//判斷當(dāng)前隊列元素是否超過隊列容量讼溺,超過擴(kuò)容
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap); //嘗試擴(kuò)容,可能會失敗
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
//無比較器使用自然排序
siftUpComparable(n, e, array); //根據(jù)優(yōu)先級插入數(shù)據(jù)
else
//使用指定的比較器
siftUpUsingComparator(n, e, array, cmp);//根據(jù)優(yōu)先級插入數(shù)據(jù)
size = n + 1;
notEmpty.signal(); //隊列非空最易,喚醒出列阻塞線程
} finally {
lock.unlock(); //釋放鎖
}
//入列成功返回true
return true;
}
入列一個核心點隊列擴(kuò)容: tryGrow:
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); //出于性能考慮先釋放鎖
Object[] newArray = null; //擴(kuò)容后的數(shù)組
//allocationSpinLock 自旋鎖變量怒坯,默認(rèn)為0,需要經(jīng)過原子cas判斷之后才會改值此處控制單線程進(jìn)入if語句
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
//擴(kuò)容大小藻懒,<64, +2, 如果大于64剔猿, + 50%, 封頂為int最大值-8
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
//擴(kuò)容出新數(shù)組嬉荆,此處需要queue==array, 原因:已有線程擴(kuò)容成功不必再擴(kuò)容
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
//線程不滿足allocationSpinLock 自旋鎖變量的if判斷归敬,表示擴(kuò)容失敗,讓出cpu
if (newArray == null)
Thread.yield();
//數(shù)據(jù)拷貝员寇,不允許同時出列入列弄慰,需要獲取鎖
lock.lock();
//queue==array 如果已經(jīng)擴(kuò)容, 此處擴(kuò)容放棄
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
總結(jié)一下上代碼蝶锋, tryGrow的最終的目的是進(jìn)行擴(kuò)容,先釋放鎖是出于性能考慮什往,比較擴(kuò)容操作需要消耗一定時間扳缕,而期間無法進(jìn)行入列出列,那隊列的并發(fā)性就大大打折扣了别威。但是躯舔,如果釋放鎖之后,那么擴(kuò)容的安全就需要另辟蹊徑了省古。
所以tryGrow方法采用了cas原子操作方式實現(xiàn):
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {
.....
}
allocationSpinLock 變量最初為0粥庄, UNSAFE.compareAndSwapInt方法比較與置換allocationSpinLock =1, 因為compareAndSwapInt原子操作性豺妓,導(dǎo)致同一時刻只有一個線程進(jìn)入執(zhí)行if語句塊惜互。這樣也可以達(dá)到加鎖目的。
無法進(jìn)入if語句塊的線程琳拭,則執(zhí)行Thread.yield();語句训堆,讓cpu, 別占著茅坑不拉屎白嘁。
if (newArray == null)
Thread.yield();
當(dāng)幸運(yùn)線程執(zhí)行if語句塊之后坑鱼,新隊列空間已經(jīng)開辟好了, 接下來就是數(shù)據(jù)拷貝絮缅,這里就又得注意的鲁沥,為防止數(shù)據(jù)拷貝出亂子呼股,又得爭奪鎖,保證隊列數(shù)據(jù)安全画恰,所有需要重新加鎖彭谁。
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
入列另外一個核心點,數(shù)據(jù)存儲:siftUpComparable / siftUpUsingComparator
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (key.compareTo((T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = key;
}
private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
Comparator<? super T> cmp) {
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (cmp.compare(x, (T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = x;
}
PriorityBlockingQueue使用的是二叉樹最小堆的方式進(jìn)行存儲數(shù)據(jù)阐枣。
要想理解上面代碼马靠, 先得明白二叉樹最小堆操作概念:
1>二叉樹最小堆:是一種經(jīng)過排序的完全二叉樹,其中任一非終端節(jié)點的數(shù)據(jù)值均不大于其左子節(jié)點和右子節(jié)點的值蔼两。
2>使用數(shù)組來實現(xiàn)二叉堆最小堆甩鳄,如果把最新入列元素下標(biāo)設(shè)置 i,那么該節(jié)點的父節(jié)點是i/2额划。
3>如果父節(jié)點的值大于子節(jié)點子需要進(jìn)行交互妙啃。
有這個概念之后,我們來看 siftUpComparable代碼俊戳,進(jìn)入siftUpComparable方法揖赴,先算出父parent的索引,再判斷父節(jié)點跟入列數(shù)據(jù)比對抑胎,如果父節(jié)點數(shù)據(jù)小于入列數(shù)據(jù)跳過燥滑,尋找上一個父節(jié)點。再重復(fù)剛剛判斷即可阿逃,如果父節(jié)點數(shù)據(jù)大于入列數(shù)據(jù)铭拧,則交換。
出列-take
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //加鎖
E result;
try {
//出列失敗恃锉,等待
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
出列核心方法dequeue:
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0]; //出列剔除數(shù)組第一個搀菩,也是最小堆的樹根
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
//樹根出列之后,需要向上調(diào)整堆結(jié)構(gòu)破托,弄出新的最小堆肪跋。
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
總結(jié)
PriorityBlockingQueue是一個無界阻塞隊列,出隊的元素是優(yōu)先級最高的元素土砂,而優(yōu)先級的規(guī)則可以自己指定州既,如果沒指定默認(rèn)使用自然趴下規(guī)則。
PriorityBlockingQueue內(nèi)部通過使用一個二叉樹最小堆算法來維護(hù)內(nèi)部數(shù)組瘟芝,這個數(shù)組是可擴(kuò)容的易桃,當(dāng)前元素個數(shù)>=最大容量時候會通過算法(小于64+2 大于64 + 50%)擴(kuò)容。
想獲取更多技術(shù)干貨锌俱,請前往叩丁狼官網(wǎng):http://www.wolfcode.cn/all_article.html