ArrayBlockingQueue
類實(shí)現(xiàn)了BlockingQueue
接口锐涯。閱讀BlockingQueue
文本以獲取有關(guān)的更多信息。
ArrayBlockingQueue
是一個(gè)有界的阻塞隊(duì)列填物,它將元素存儲(chǔ)在數(shù)組內(nèi)部纹腌。有界意味著它無(wú)法存儲(chǔ)無(wú)限量的元素,它可以同時(shí)存儲(chǔ)的元素?cái)?shù)量有一個(gè)上限滞磺。你需要在實(shí)例化時(shí)設(shè)置上限升薯,之后無(wú)法更改,所以它和ArrayList
有些區(qū)別击困,不要因?yàn)樗鼈兊拿Q相似而將它們的功能混雜涎劈。
ArrayBlockingQueue
內(nèi)部是以FIFO(先入先出)次序來(lái)存儲(chǔ)元素的。隊(duì)列的頭部是在隊(duì)列中存活時(shí)間最長(zhǎng)的元素,而隊(duì)列的尾部是在隊(duì)列中存活時(shí)間最短的元素责语。
以下是實(shí)例化和使用ArrayBlockingQueue
的例子:
BlockingQueue queue = new ArrayBlockingQueue(1024);
queue.put("1");
Object object = queue.take();
這是一個(gè)使用Java 泛型的BlockingQueue
例子:
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024);
queue.put("1");
String string = queue.take();
源碼
ArrayBlockingQueue
中使用了這幾個(gè)成員變量來(lái)保證操作炮障,其實(shí)內(nèi)部使用了一個(gè)循環(huán)數(shù)組,其中takeIndex和putIndex其實(shí)相當(dāng)于隊(duì)列的頭部和尾部坤候。
/** 使用數(shù)組保存元素 */
final Object[] items;
/** 下一個(gè)take胁赢,poll,peek或remove方法調(diào)用時(shí)訪問(wèn)此下標(biāo)的元素 */
int takeIndex;
/** 下一個(gè)put, offer, 或add方法調(diào)用時(shí)訪問(wèn)此下標(biāo)的元素 */
int putIndex;
/**隊(duì)列中的元素?cái)?shù)量 */
int count;
/** 保護(hù)所有操作的主鎖 */
final ReentrantLock lock;
/** 獲取元素的等待條件 */
private final Condition notEmpty;
/** 放置元素的等待條件 */
private final Condition notFull;
構(gòu)造函數(shù)如下:
/**
* 使用一個(gè)固定的數(shù)值和默認(rèn)的訪問(wèn)規(guī)則創(chuàng)建白筹,默認(rèn)是使用非公平鎖
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/**
* 使用一個(gè)固定的數(shù)值和指定的訪問(wèn)規(guī)則創(chuàng)建
*
* @param capacity the capacity of this queue
* @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
* if {@code false} the access order is unspecified.
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
/**
*使用一個(gè)固定的數(shù)值和指定的訪問(wèn)規(guī)則創(chuàng)建智末,并將給定集合中的元素
* 增加到隊(duì)列中,增加的順序是指定的集合迭代器的遍歷順序
*
* @param capacity the capacity of this queue
* @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
* if {@code false} the access order is unspecified.
* @param c the collection of elements to initially contain
* @throws IllegalArgumentException if {@code capacity} is less than
* {@code c.size()}, or less than 1.
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
final Object[] items = this.items;
int i = 0;
try {
for (E e : c)
items[i++] = Objects.requireNonNull(e);
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
增加操作
public boolean add(E e) {
return super.add(e);
}
public boolean add(E e) {
// 內(nèi)部重用offer方法
if (offer(e))
return true;
// 如果增加失敗徒河,拋出異常指示隊(duì)列已滿
else
throw new IllegalStateException("Queue full");
}
-------------------------------------------------------------------------
public boolean offer(E e) {
// 檢查是否是否為null系馆,如果是拋出NPE異常
Objects.requireNonNull(e);
// 加鎖。 此處使用final的原因是將成員變量賦值為局部變量顽照,
// 然后使用此變量就不需要經(jīng)過(guò)兩次訪問(wèn)由蘑,即先訪問(wèn)this,再
// 訪問(wèn)lock代兵,輕微提升程序性能尼酿,后面此種方法的使用也是一樣。
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果隊(duì)列滿了植影,返回false
if (count == items.length)
return false;
// 否則裳擎,加入隊(duì)列
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
public static <T> T requireNonNull(T obj) {
if (obj == null)
throw new NullPointerException();
return obj;
}
private void enqueue(E e) {
// assert lock.isHeldByCurrentThread();
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
// 插入元素
items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0;
count++;
// 隨機(jī)通知一個(gè)等待的線程
notEmpty.signal();
}
-------------------------------------------------------------------------
// 阻塞方法
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果隊(duì)列已經(jīng),在notFull上阻塞自己等待通知
// 關(guān)于等待-通知機(jī)制已經(jīng)說(shuō)過(guò)很多次思币,此處不再多說(shuō)
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
-------------------------------------------------------------------------
// 超時(shí)方法
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
Objects.requireNonNull(e);
// 計(jì)算超時(shí)時(shí)間鹿响,轉(zhuǎn)換為納秒
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果隊(duì)列已滿,超時(shí)等待谷饿,如果時(shí)間用完惶我,返回false
while (count == items.length) {
if (nanos <= 0L)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
刪除操作
// 刪除指定元素
public boolean remove(Object o) {
if (o == null) return false;
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果隊(duì)列中存在元素
if (count > 0) {
final Object[] items = this.items;
// 注意此處精彩的循環(huán)使用,因?yàn)閮?nèi)部是一個(gè)循環(huán)數(shù)組
for (int i = takeIndex, end = putIndex,
to = (i < end) ? end : items.length;
; i = 0, to = end) {
for (; i < to; i++)
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (to == end) break;
}
}
return false;
} finally {
lock.unlock();
}
}
void removeAt(final int removeIndex) {
// assert lock.isHeldByCurrentThread();
// assert lock.getHoldCount() == 1;
// assert items[removeIndex] != null;
// assert removeIndex >= 0 && removeIndex < items.length;
final Object[] items = this.items;
// 如果刪除的是頭元素博投,只需修改頭元素下標(biāo)即可
if (removeIndex == takeIndex) {
// removing front item; just advance
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
// 此處是為了保持迭代器與隊(duì)列的一致性
if (itrs != null)
itrs.elementDequeued();
} else {
// an "interior" remove
// slide over all others up through putIndex.
for (int i = removeIndex, putIndex = this.putIndex;;) {
int pred = i;
if (++i == items.length) i = 0;
// 如果已經(jīng)移到了最后一個(gè)元素绸贡,跳出循環(huán)
if (i == putIndex) {
items[pred] = null;
this.putIndex = pred;
break;
}
// 將元素前移一位
items[pred] = items[i];
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}
-------------------------------------------------------------------------
public E remove() {
// 重用poll方法,如果隊(duì)列為空贬堵,拋出異常
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
-------------------------------------------------------------------------
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.isHeldByCurrentThread();
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
// 獲取頭元素恃轩,因?yàn)槭褂肙bject[]保存,所以要進(jìn)行類型轉(zhuǎn)換
// 因?yàn)橹荒茉黾又付愋偷脑乩枳觯钥梢源_保類型轉(zhuǎn)換一定
// 會(huì)成功叉跛,抑制此非受檢警告
@SuppressWarnings("unchecked")
E e = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return e;
}
-------------------------------------------------------------------------
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
-------------------------------------------------------------------------
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0L)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
阻塞方法以及超時(shí)方法和增加操作一樣,此處不多做講解蒸殿。
訪問(wèn)操作
// element()方法在AbstractQueue<E>類中筷厘,ArrayBlockingQueue繼承自此類
public E element() {
// 重用peek方法鸣峭,如果隊(duì)列為空拋出異常
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
-------------------------------------------------------------------------
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
final E itemAt(int i) {
return (E) items[i];
}
輔助方法
部分方法邏輯簡(jiǎn)單,有興趣自己查看即可酥艳。
public void clear() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int k;
// 如果隊(duì)列中存在元素摊溶,清空隊(duì)列
if ((k = count) > 0) {
circularClear(items, takeIndex, putIndex);
takeIndex = putIndex;
count = 0;
// 使迭代器保持一致
if (itrs != null)
itrs.queueIsEmpty();
// 如果有線程等待插入元素,喚醒
for (; k > 0 && lock.hasWaiters(notFull); k--)
notFull.signal();
}
} finally {
lock.unlock();
}
}
// 將存在的元素全部置為null即可充石,等待 gc回收它們莫换,此時(shí)等于清空了隊(duì)列。
private static void circularClear(Object[] items, int i, int end) {
// assert 0 <= i && i < items.length;
// assert 0 <= end && end < items.length;
for (int to = (i < end) ? end : items.length;
; i = 0, to = end) {
for (; i < to; i++) items[i] = null;
if (to == end) break;
}
}
-------------------------------------------------------------------------
public int drainTo(Collection<? super E> c) {
// 重用drainTo(Collection<? super E> c, int maxElements)方法
return drainTo(c, Integer.MAX_VALUE);
}
public int drainTo(Collection<? super E> c, int maxElements) {
Objects.requireNonNull(c);
// 如果指定的集合是自己骤铃,拋出異常拉岁,符合BlockingQueue接口文檔中的定義
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 獲取需要轉(zhuǎn)移的元素?cái)?shù)量
int n = Math.min(maxElements, count);
int take = takeIndex;
int i = 0;
try {
// 通過(guò)直接訪問(wèn)數(shù)組,比重復(fù)調(diào)用poll()方法再增加性能會(huì)高很多
while (i < n) {
@SuppressWarnings("unchecked")
E e = (E) items[take];
c.add(e);
items[take] = null;
if (++take == items.length) take = 0;
i++;
}
return n;
} finally {
// Restore invariants even if c.add() threw
// 做一些處理工作
if (i > 0) {
count -= i;
takeIndex = take;
if (itrs != null) {
if (count == 0)
itrs.queueIsEmpty();
else if (i > take)
itrs.takeIndexWrapped();
}
for (; i > 0 && lock.hasWaiters(notFull); i--)
notFull.signal();
}
}
} finally {
lock.unlock();
}
}
核心要點(diǎn)
- 內(nèi)部使用了一個(gè)循環(huán)數(shù)組
- 是一個(gè)有界數(shù)組惰爬,提供了容量后無(wú)法被更改
- 可以指定鎖的公平性