ArrayBlockingQueue屬性與構(gòu)造方法
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
final Object[] items;
int takeIndex;
int putIndex;
int count;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
...
}
ArrayBlockingQueue內(nèi)部是由Object[]數(shù)組實(shí)現(xiàn)的待秃。
takeIndex為隊(duì)列取出位置指針驻谆,putIndex為隊(duì)列插入位置指針。
同步操作依賴于ReentrantLock實(shí)現(xiàn)所踊,notEmpty為非空條件,notFull為非滿條件。
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
可以通過構(gòu)造方法的參數(shù)capacity指定數(shù)組長(zhǎng)度读处,參數(shù)fair指定ReentrantLock的公平還是非公平模式,即是否允許后來的操作插隊(duì)與剛喚醒的線程競(jìng)爭(zhēng)鎖唱矛。
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
第三個(gè)參數(shù) Collection<? extends E> c可以將現(xiàn)有集合插入ArrayBlockingQueue罚舱,如果集合的長(zhǎng)度大于參數(shù)capacity俊戳,會(huì)拋出ArrayIndexOutOfBoundsException。
阻塞插入 put
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock; //鎖
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
與LinkedBlockingQueue不同馆匿,ArrayBlockingQueue的插入與取出都是用同一個(gè)鎖抑胎,所以無法在插入的同時(shí)取出元素,吞吐量弱于LinkedBlockingQueue渐北。
當(dāng)隊(duì)列長(zhǎng)度達(dá)到上限阿逃,觸發(fā)notFull條件讓當(dāng)前線程掛起等待。當(dāng)notFull條件滿足赃蛛,執(zhí)行enqueue方法插入元素:
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
putIndex即元素插入隊(duì)列的位置恃锉,當(dāng)元素插入后 putIndex自增如果等于隊(duì)列長(zhǎng)度上限,表示putIndex指針已越界呕臂,將putIndex置于0破托,這樣就能保證重復(fù)利用數(shù)組,而不是通過創(chuàng)建新數(shù)組來擴(kuò)容歧蒋。
然后將count自增1土砂,表示隊(duì)列元素增加一個(gè),最后通過notEmpty條件喚醒掛起等待消費(fèi)元素的線程谜洽。
阻塞取出 take
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
取出操作使用與插入操作相同的ReentrantLock保證同步塊的安全萝映,當(dāng)隊(duì)列長(zhǎng)度為0,觸發(fā)notEmpty條件讓線程掛起等待阐虚。當(dāng)隊(duì)列非空序臂,執(zhí)行dequeue方法取出元素:
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
取出元素的原理同插入是相同的,首先從takeIndex標(biāo)記的位置取出元素实束,接著將takeIndex自增奥秆,如果takeIndex此時(shí)等于隊(duì)列長(zhǎng)度上限,就將其置0從頭開始咸灿。
接著將count自減1构订,表示隊(duì)列的元素減少一個(gè)。
itrs 變量表示隊(duì)列的iterator析显,如果隊(duì)列元素減少一個(gè)通過elementDequeued方法同步更新iterator遍歷器保證線程安全鲫咽。
最后通過notFull條件喚醒一個(gè)等待非滿條件的線程執(zhí)行插入。
非阻塞插入
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
與阻塞插入的區(qū)別是谷异,當(dāng)隊(duì)列元素?cái)?shù)量已滿分尸,直接返回false。
非阻塞取出
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
與阻塞取出的區(qū)別是歹嘹,當(dāng)隊(duì)列沒有元素箩绍,直接返回null。
不超時(shí)阻塞插入
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
與阻塞插入的區(qū)別是尺上,等待notFull條件并非永久的材蛛,當(dāng)過了timeout長(zhǎng)度的時(shí)間后如果隊(duì)列還是滿的圆到,直接返回false。
不超時(shí)阻塞取出
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
與阻塞取出的區(qū)別是卑吭,等待notEmpty條件并非永久的芽淡,當(dāng)過了timeout長(zhǎng)度的時(shí)間后如果隊(duì)列還是空的,直接返回null豆赏。