Basic usage
@Test
public void testPriorityQueue() throws InterruptedException {
    // Lists.newArrayList comes from Guava; the constructor heapifies the collection.
    PriorityQueue<Integer> priorityQueue = new PriorityQueue<>(Lists.newArrayList(5, 4, 2, 1, 3));
    System.out.println(priorityQueue);          // prints the backing array in heap order
    System.out.println(priorityQueue.poll());
    System.out.println(priorityQueue.poll());
    PriorityBlockingQueue<Integer> blockingQueue = new PriorityBlockingQueue<>();
    blockingQueue.add(5);
    System.out.println(blockingQueue.poll());
    System.out.println(blockingQueue.take());   // the queue is empty here, so take() blocks
}
Output
[1, 3, 2, 4, 5]
1
2
5
(blocks)
PriorityQueue
Member variables
/**
* Priority queue represented as a balanced binary heap: the two
* children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
* priority queue is ordered by comparator, or by the elements'
* natural ordering, if comparator is null: For each node n in the
* heap and each descendant d of n, n <= d. The element with the
* lowest value is in queue[0], assuming the queue is nonempty.
*/
transient Object[] queue; // non-private to simplify nested class access
/**
* The number of elements in the priority queue.
*/
private int size = 0;
/**
* The comparator, or null if priority queue uses elements'
* natural ordering.
*/
private final Comparator<? super E> comparator;
/**
* The number of times this priority queue has been
* <i>structurally modified</i>. See AbstractList for gory details.
*/
transient int modCount = 0; // non-private to simplify nested class access
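The heap is stored in an array. The elements of the queue array are not fully sorted; the only guarantee is that the top of the heap (queue[0]) is the smallest element, or the largest if the comparator reverses the order.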
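To make the "not fully sorted" point concrete, here is a minimal sketch that compares the queue's iteration order with the order in which poll() returns elements. The class name HeapOrderDemo and the helper drain are made up for this illustration; only the java.util calls are real.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

class HeapOrderDemo {
    // Drains the queue with poll(), which always removes the current head.
    static List<Integer> drain(PriorityQueue<Integer> pq) {
        List<Integer> out = new ArrayList<>();
        while (!pq.isEmpty()) {
            out.add(pq.poll());
        }
        return out;
    }

    public static void main(String[] args) {
        PriorityQueue<Integer> pq = new PriorityQueue<>(Arrays.asList(5, 4, 2, 1, 3));
        // toString() walks the backing array, so it shows the heap layout.
        System.out.println(pq);        // [1, 3, 2, 4, 5]
        // Repeated poll() yields the elements in ascending order.
        System.out.println(drain(pq)); // [1, 2, 3, 4, 5]
    }
}
```

Code that needs sorted traversal should poll the queue (or sort a copy) rather than rely on the iterator.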
Basic methods
public E poll() {
    if (size == 0)
        return null;
    int s = --size;
    modCount++;
    E result = (E) queue[0];
    E x = (E) queue[s];
    queue[s] = null;
    if (s != 0)
        siftDown(0, x);
    return result;
}
/**
 * Inserts item x at position k, maintaining heap invariant by
 * demoting x down the tree repeatedly until it is less than or
 * equal to its children or is a leaf.
 *
 * @param k the position to fill
 * @param x the item to insert
 */
private void siftDown(int k, E x) {
    if (comparator != null)
        siftDownUsingComparator(k, x);
    else
        siftDownComparable(k, x);
}
@SuppressWarnings("unchecked")
private void siftDownComparable(int k, E x) {
    Comparable<? super E> key = (Comparable<? super E>)x;
    int half = size >>> 1;        // loop while a non-leaf
    while (k < half) {
        int child = (k << 1) + 1; // assume left child is least
        Object c = queue[child];
        int right = child + 1;
        if (right < size &&
            ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
            c = queue[child = right];
        if (key.compareTo((E) c) <= 0)
            break;
        queue[k] = c;
        k = child;
    }
    queue[k] = key;
}
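Take poll as an example: it returns the element at the top of the heap, then restores the heap by sifting the last element down from the root.
How the heap is adjusted (described for a max-heap; the default min-heap in the code above mirrors it by picking the smaller child):
- Check whether a comparator was supplied. If so, order by the comparator; otherwise order by the elements' natural ordering.
- Take the larger of the node's left and right children and, if it beats the element being sifted down, move it up into the parent's slot. Repeat from the child's position until the element fits or a leaf is reached.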
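The adjustment can be sketched on a plain int array. This is a simplified illustration rather than the JDK code: it is a min-heap like the default PriorityQueue, it has no comparator support, and the names SiftDownSketch and pollMin are invented here.

```java
import java.util.Arrays;

class SiftDownSketch {
    // Places x at index k of a min-heap stored in heap[0..size),
    // promoting the smaller child until x fits or a leaf is reached.
    static void siftDown(int[] heap, int size, int k, int x) {
        int half = size >>> 1;              // indices >= half are leaves
        while (k < half) {
            int child = 2 * k + 1;          // start with the left child
            int right = child + 1;
            if (right < size && heap[right] < heap[child]) {
                child = right;              // the right child is smaller
            }
            if (x <= heap[child]) {
                break;                      // x is no larger than both children
            }
            heap[k] = heap[child];          // move the smaller child up
            k = child;
        }
        heap[k] = x;
    }

    // Returns the minimum of heap[0..size) and restores the heap in
    // heap[0..size-1). The caller is responsible for tracking the new size.
    static int pollMin(int[] heap, int size) {
        int result = heap[0];
        int last = heap[size - 1];
        if (size - 1 > 0) {
            siftDown(heap, size - 1, 0, last);
        }
        return result;
    }

    public static void main(String[] args) {
        int[] heap = {1, 3, 2, 4, 5};
        System.out.println(pollMin(heap, 5));                        // 1
        System.out.println(Arrays.toString(Arrays.copyOf(heap, 4))); // [2, 3, 5, 4]
    }
}
```

Unlike the JDK version, the vacated last slot is not cleared, because an int array has no null to store there.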
Growing the array
/**
 * Increases the capacity of the array.
 *
 * @param minCapacity the desired minimum capacity
 */
private void grow(int minCapacity) {
    int oldCapacity = queue.length;
    // Double size if small; else grow by 50%
    int newCapacity = oldCapacity + ((oldCapacity < 64) ?
                                     (oldCapacity + 2) :
                                     (oldCapacity >> 1));
    // overflow-conscious code
    if (newCapacity - MAX_ARRAY_SIZE > 0)
        newCapacity = hugeCapacity(minCapacity);
    queue = Arrays.copyOf(queue, newCapacity);
}
private static int hugeCapacity(int minCapacity) {
    if (minCapacity < 0) // overflow
        throw new OutOfMemoryError();
    return (minCapacity > MAX_ARRAY_SIZE) ?
        Integer.MAX_VALUE :
        MAX_ARRAY_SIZE;
}
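- Small arrays (capacity below 64) roughly double: the new capacity is 2 * oldCapacity + 2.
- Larger arrays grow by 50%.
- When the new capacity would exceed MAX_ARRAY_SIZE (Integer.MAX_VALUE - 8), it is capped at MAX_ARRAY_SIZE, or at Integer.MAX_VALUE if even the minimum required capacity exceeds MAX_ARRAY_SIZE.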
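The growth arithmetic is easy to check in isolation. The sketch below copies only the capacity formula from grow and leaves out the overflow branch; the class and method names are invented for the example, and 11 is PriorityQueue's default initial capacity.

```java
class GrowthPolicySketch {
    // Same arithmetic as grow() above, without the MAX_ARRAY_SIZE handling.
    static int nextCapacity(int oldCapacity) {
        return oldCapacity + ((oldCapacity < 64)
                ? (oldCapacity + 2)    // small: roughly double
                : (oldCapacity >> 1)); // large: grow by 50%
    }

    public static void main(String[] args) {
        int capacity = 11; // default initial capacity of PriorityQueue
        for (int i = 0; i < 4; i++) {
            capacity = nextCapacity(capacity);
            System.out.println(capacity); // 24, 50, 102, 153
        }
    }
}
```

The switch from doubling to 50% growth happens once the capacity reaches 64, which is why 50 jumps to 102 but 102 only grows to 153.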
Thread safety
PriorityQueue is not thread-safe.
PriorityBlockingQueue
Its implementation is largely the same as PriorityQueue's, but PriorityBlockingQueue is thread-safe and implements the BlockingQueue interface, so take blocks while the queue is empty.
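A small sketch of the blocking behaviour, assuming one consumer thread and a producer that inserts after a short delay. BlockingTakeDemo and takeFromAnotherThread are names invented for this example, and the 100 ms sleep is only there to make it likely that the consumer is already waiting in take().

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class BlockingTakeDemo {
    // Starts a consumer that blocks in take(), then supplies one element.
    static int takeFromAnotherThread(int value) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        AtomicInteger received = new AtomicInteger(-1);
        Thread consumer = new Thread(() -> {
            try {
                received.set(queue.take()); // waits while the queue is empty
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            Thread.sleep(100);  // let the consumer reach take()
            queue.put(value);   // never blocks: the queue is unbounded
            consumer.join();    // returns once take() has completed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println(takeFromAnotherThread(42)); // 42
    }
}
```

Only the consuming side can block. Insertion always succeeds immediately because the queue has no capacity bound.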
/**
* Priority queue represented as a balanced binary heap: the two
* children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
* priority queue is ordered by comparator, or by the elements'
* natural ordering, if comparator is null: For each node n in the
* heap and each descendant d of n, n <= d. The element with the
* lowest value is in queue[0], assuming the queue is nonempty.
*/
private transient Object[] queue;
/**
* The number of elements in the priority queue.
*/
private transient int size;
/**
* The comparator, or null if priority queue uses elements'
* natural ordering.
*/
private transient Comparator<? super E> comparator;
/**
* Lock used for all public operations
*/
private final ReentrantLock lock;
/**
* Condition for blocking when empty
*/
private final Condition notEmpty;
/**
* Spinlock for allocation, acquired via CAS.
*/
private transient volatile int allocationSpinLock;
/**
* A plain PriorityQueue used only for serialization,
* to maintain compatibility with previous versions
* of this class. Non-null only during serialization/deserialization.
*/
private PriorityQueue<E> q;
Differences from PriorityQueue: it adds
- a ReentrantLock
- a Condition, used to block callers while the queue is empty
- allocationSpinLock, a flag acquired via CAS to guard growth of the queue array
private void tryGrow(Object[] array, int oldCap) {
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0;
        }
    }
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    lock.lock();
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}
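This looks very similar to PriorityQueue's grow method. The differences:
- The caller must hold the lock when it calls tryGrow. The method releases the lock while it allocates and re-acquires it before installing the new array.
- Growth is guarded by CAS: a thread may allocate the new array only if allocationSpinLock is 0 and its CAS sets it to 1. If several threads try to grow at the same time, the ones that lose the CAS call Thread.yield() and let the caller retry.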
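Why is growth in PriorityBlockingQueue implemented this way?
tryGrow releases the main ReentrantLock before allocating, so that other threads can keep calling take and poll while the new array is being allocated. Once the main lock is released, several threads that found the array full can be inside tryGrow at the same time, and the main lock can no longer serialize them. A separate, lightweight guard is needed for the allocation step, and a CAS on allocationSpinLock provides it without making the losing threads wait.
allocationSpinLock is transient because it does not need to be serialized, and volatile because several threads may read and write it concurrently.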
private transient volatile int allocationSpinLock;
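The try-once flag pattern can be sketched with public APIs. This version swaps in AtomicInteger for the Unsafe-based CAS on a volatile int field; AllocationSpinLockSketch and tryRunExclusively are hypothetical names used only in this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AllocationSpinLockSketch {
    // 0 = free, 1 = held; plays the role of allocationSpinLock.
    private final AtomicInteger allocationSpinLock = new AtomicInteger(0);

    // Runs the task only if this caller wins the 0 -> 1 CAS. Never waits.
    boolean tryRunExclusively(Runnable task) {
        if (allocationSpinLock.get() == 0
                && allocationSpinLock.compareAndSet(0, 1)) {
            try {
                task.run();
            } finally {
                allocationSpinLock.set(0); // release the flag
            }
            return true;
        }
        return false; // someone else holds the flag: back off
    }

    public static void main(String[] args) {
        AllocationSpinLockSketch lock = new AllocationSpinLockSketch();
        boolean outer = lock.tryRunExclusively(() -> {
            // While the flag is held, a second attempt fails immediately.
            boolean inner = lock.tryRunExclusively(() -> { });
            System.out.println("inner acquired: " + inner); // false
        });
        System.out.println("outer acquired: " + outer);     // true
    }
}
```

Unlike a ReentrantLock, the flag is not reentrant and has no owner, which is why a failed attempt simply returns and the caller decides whether to retry.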
UNSAFE.compareAndSwapInt
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long allocationSpinLockOffset;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = PriorityBlockingQueue.class;
        allocationSpinLockOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("allocationSpinLock"));
    } catch (Exception e) {
        throw new Error(e);
    }
}
Invocation
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)
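allocationSpinLockOffset is the offset of the allocationSpinLock field within a PriorityBlockingQueue object.
So what is the benefit of using allocationSpinLockOffset, and how does it differ from assigning to allocationSpinLock directly?
Unsafe uses the offset to locate the field in memory and atomically replaces its value with the new one, but only if the field still holds the expected value. Assigning to allocationSpinLock directly is a plain write, not a CAS, so it cannot detect that another thread changed the field first. The JDK 1.8 code for a CAS retry loop looks like this: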
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
    return var5;
}
AtomicInteger calls it as follows. The getAndAddInt loop was moved out of the individual atomic classes and into Unsafe:
public final int getAndDecrement() {
    return unsafe.getAndAddInt(this, valueOffset, -1);
}
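The same read, compute, CAS, retry loop can be written without Unsafe. The sketch below uses AtomicIntegerFieldUpdater, which addresses a volatile int field by name in much the way Unsafe addresses it by offset. CasCounterSketch and its methods are illustrative names, not JDK code.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

class CasCounterSketch {
    // Public-API stand-in for "Unsafe plus field offset".
    private static final AtomicIntegerFieldUpdater<CasCounterSketch> VALUE =
            AtomicIntegerFieldUpdater.newUpdater(CasCounterSketch.class, "value");

    private volatile int value;

    // Retries until the CAS succeeds, then returns the previous value.
    int getAndAdd(int delta) {
        int current;
        do {
            current = value; // volatile read
        } while (!VALUE.compareAndSet(this, current, current + delta));
        return current;
    }

    int get() {
        return value;
    }

    // Runs several threads that each add 1 perThread times.
    static int incrementConcurrently(int threads, int perThread) {
        CasCounterSketch counter = new CasCounterSketch();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    counter.getAndAdd(1);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(incrementConcurrently(4, 10_000)); // 40000
    }
}
```

Replacing the loop with a plain `value = value + delta` would lose updates under contention, because the read and the write would no longer form one atomic step.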
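Comparing PriorityQueue and PriorityBlockingQueue
- PriorityQueue is not thread-safe; PriorityBlockingQueue is.
- PriorityBlockingQueue uses a ReentrantLock, and every public operation acquires it.
- PriorityBlockingQueue uses a CAS operation when it grows the array.
- Both are backed by a heap and use the same algorithms.
- PriorityBlockingQueue can block take while the queue is empty.
How the JDK implements the heap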
/**
 * Establishes the heap invariant (described above) in the entire tree,
 * assuming nothing about the order of the elements prior to the call.
 */
@SuppressWarnings("unchecked")
private void heapify() {
    for (int i = (size >>> 1) - 1; i >= 0; i--)
        siftDown(i, (E) queue[i]);
}
private void siftDown(int k, E x) {
    if (comparator != null)
        siftDownUsingComparator(k, x);
    else
        siftDownComparable(k, x);
}
@SuppressWarnings("unchecked")
private void siftDownComparable(int k, E x) {
    Comparable<? super E> key = (Comparable<? super E>)x;
    int half = size >>> 1;        // loop while a non-leaf
    while (k < half) {
        int child = (k << 1) + 1; // assume left child is least
        Object c = queue[child];
        int right = child + 1;
        if (right < size &&
            ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
            c = queue[child = right];
        if (key.compareTo((E) c) <= 0)
            break;
        queue[k] = c;
        k = child;
    }
    queue[k] = key;
}
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    modCount++;
    int i = size;
    if (i >= queue.length)
        grow(i + 1);
    size = i + 1;
    if (i == 0)
        queue[0] = e;
    else
        siftUp(i, e);
    return true;
}
private void siftUp(int k, E x) {
    if (comparator != null)
        siftUpUsingComparator(k, x);
    else
        siftUpComparable(k, x);
}
@SuppressWarnings("unchecked")
private void siftUpComparable(int k, E x) {
    Comparable<? super E> key = (Comparable<? super E>) x;
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = queue[parent];
        if (key.compareTo((E) e) >= 0)
            break;
        queue[k] = e;
        k = parent;
    }
    queue[k] = key;
}
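As a rough illustration of the insertion path, here is a stripped-down int min-heap that supports only offer via sift-up, plus a helper that checks the heap invariant. IntMinHeapSketch, snapshot and isMinHeap are invented for this sketch; the growth step is simplified to plain doubling.

```java
import java.util.Arrays;

class IntMinHeapSketch {
    private int[] queue = new int[11];
    private int size;

    // Appends x at the end, then moves it up while it is smaller than its parent.
    void offer(int x) {
        if (size == queue.length) {
            queue = Arrays.copyOf(queue, size * 2); // simplified growth
        }
        int k = size++;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            if (x >= queue[parent]) {
                break;                  // heap property already holds
            }
            queue[k] = queue[parent];   // pull the parent down
            k = parent;
        }
        queue[k] = x;
    }

    // Copy of the live part of the backing array.
    int[] snapshot() {
        return Arrays.copyOf(queue, size);
    }

    // True if every element is >= its parent.
    static boolean isMinHeap(int[] a) {
        for (int i = 1; i < a.length; i++) {
            if (a[(i - 1) >>> 1] > a[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        IntMinHeapSketch heap = new IntMinHeapSketch();
        for (int x : new int[]{5, 4, 2, 1, 3}) {
            heap.offer(x);
        }
        System.out.println(Arrays.toString(heap.snapshot())); // [1, 2, 4, 5, 3]
        System.out.println(isMinHeap(heap.snapshot()));       // true
    }
}
```

Inserting 5, 4, 2, 1, 3 one at a time yields [1, 2, 4, 5, 3], whereas heapifying the same values in the basic usage example yields [1, 3, 2, 4, 5]. Both layouts satisfy the heap invariant, which only constrains each parent-child pair.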