簡單介紹
ArrayBlockingQueue
是基于數(shù)組的有界阻塞隊列脚仔。
-
有界
指它不能夠存儲無限多數(shù)量的元素,在創(chuàng)建ArrayBlockingQueue
時舆绎,必須要給它指定一個隊列的大小 -
阻塞
指在添加 / 取走元素時鲤脏,當(dāng)隊列 沒有空間 / 為空的時候會阻塞,知道隊列有空間 / 有新的元素加入時再繼續(xù)
源碼解讀
屬性
- 隊列集合吕朵,是一個數(shù)組猎醇,用來存放元素
/** The queued items */
final Object[] items;
- 調(diào)用
take
,poll
努溃,peek
或者remove
方法所取元素的下標(biāo)位置
/** items index for next take, poll, peek or remove */
int takeIndex;
- 調(diào)用
put
硫嘶,offer
或者add
方法添加元素時,所添加的位置
/** items index for next put, offer, or add */
int putIndex;
- 隊列的所有元素數(shù)目
/** Number of elements in the queue */
int count;
- 全局鎖梧税,掌管所有訪問操作
/** Main lock guarding all access */
final ReentrantLock lock;
- 取元素操作的等待條件沦疾,如果隊列中沒有元素,會調(diào)用
notEmpty.await()
方法讓當(dāng)前線程處于等待狀態(tài)
/** Condition for waiting takes */
private final Condition notEmpty;
- 新增元素操作的等待條件第队,如果隊列中已經(jīng)滿元素曹鸠,會調(diào)用
notFull.await()
方法讓當(dāng)前線程處于等待狀態(tài)
/** Condition for waiting puts */
private final Condition notFull;
- 這個
Itrs
是迭代器和隊列之間數(shù)據(jù)共享的工具類(這塊代碼太多了有空再看把 - -||| )
transient Itrs itrs = null;
其他方法
- 參數(shù)自減的方法,就是當(dāng)傳入的參數(shù)
i
為0
時返回 數(shù)組長度減1
斥铺,否則返回i
減1
final int dec(int i) {
return ((i == 0) ? items.length : i) - 1;
}
- 返回對應(yīng)位置上的元素
@SuppressWarnings("unchecked")
final E itemAt(int i) {
return (E) items[i];
}
- 參數(shù)
v
的非空判斷
private static void checkNotNull(Object v) {
if (v == null)
throw new NullPointerException();
}
核心方法
向隊列中添加元素
add(e)
、offer(e)
和 put(e)
都是添加元素的方法坛善, add(e)
和 offer(e)
是無阻塞的添加晾蜘, put(e)
是阻塞添加
-
add(e)
方法,實際上調(diào)用了offer(e)
方法
public boolean add(E e) {
return super.add(e);
}
-
offe(e)
方法眠屎,將元素添加到BlockingQueue
里剔交,如果可以容納返回true
否則返回false
public boolean offer(E e) {
checkNotNull(e); // 檢查元素是否為 null
final ReentrantLock lock = this.lock;
lock.lock(); // 加鎖
try {
if (count == items.length) // 如果隊列已經(jīng)滿了返回 false
return false;
else { // 隊列還沒有滿,則添加到隊列中
enqueue(e); // 進隊
return true;
}
} finally {
lock.unlock(); // 釋放鎖
}
}
-
put(e)
方法改衩,將元素添加到BlockingQueue
里岖常,如果BlockQueue
沒有空間,則調(diào)用此方法的線程被阻塞葫督,直到BlockingQueue
里面有空間
public void put(E e) throws InterruptedException {
checkNotNull(e); // 檢查元素是否為 null
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 加鎖
try {
while (count == items.length)
notFull.await(); //如果隊列已經(jīng)滿了竭鞍,就阻塞(添加到 notFull 條件隊列中等待喚醒)
enqueue(e); // 如果隊列沒有滿直接添加
} finally {
lock.unlock(); // 釋放鎖
}
}
-
enqueue
進隊操作
private void enqueue(E x) {
// 獲取當(dāng)前數(shù)組
final Object[] items = this.items;
// 通過索引賦值
items[putIndex] = x;
// 如果當(dāng)前添加對象的位置 +1 等于 數(shù)組的長度板惑,也就是當(dāng)前對象的位置在數(shù)組的最后一個
// 那么下一個應(yīng)該從數(shù)組的第一個添加
if (++putIndex == items.length)
putIndex = 0;
count++;
// 喚醒正在等待獲取對象的線程
notEmpty.signal();
}
- 除了以上三種常用的添加方法之外,還有個帶超時時間的添加方法
offer(e, timeout, unit)
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
從隊列中取元素
-
poll()
方法偎快,取隊頭(首個)元素并刪除冯乘,沒有則返回null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果隊列中有元素,則執(zhí)行 dequeue 操作晒夹,否則返回 null
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
-
take()
方法裆馒,如果隊列中有元素,則獲取并刪除丐怯,如果沒有元素喷好,則阻塞等待
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); // 如果隊列中沒有元素,則添加到 notEmpty 條件隊列中等待
return dequeue();
} finally {
lock.unlock();
}
}
-
poll(timeout, unit)
帶阻塞超時的取首個元素的方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
-
peek()
方法读跷,只取不刪梗搅,當(dāng)隊列中沒有元素時會返回null
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex);
} finally {
lock.unlock();
}
}
-
dequeue
出隊操作
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 根據(jù)索引獲取對象
E x = (E) items[takeIndex];
// 當(dāng)前位置的對象被取走,位置就騰出來了
items[takeIndex] = null;
// 如果被取走的是數(shù)組的最后一個舔亭,那下一個要從第一個取
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 喚醒正在等待添加對象的線程
notFull.signal();
return x;
}
刪除隊列中某個元素
-
remove(o)
方法些膨,如果隊列為空或者沒有找到該元素返回false
,否則刪除元素并且返回true
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex; // 從取得位置開始钦铺,到添加的位置
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length) // 若果判斷的位置到了隊列最末尾订雾,那下一個從第一個判斷
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
-
removeAt(index)
方法,刪除指定位置上的元素
void removeAt(final int removeIndex) {
final Object[] items = this.items;
if (removeIndex == takeIndex) { // 當(dāng)刪除的元素是下次取操作要取到的元素時矛洞,既隊頭元素
items[takeIndex] = null; // 刪除隊頭元素洼哎,并且 takeIndex 加 1
if (++takeIndex == items.length) // 如果刪除得是數(shù)組最后一個元素,則 takeIndex 從數(shù)組第一個元素開始
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
// 如果刪除的不是隊頭元素
// 則從刪除元素的后面一直到添加元素的位置(removeIndex ~ putIndex)期間的元素都要往前挪一個位置
// 取 putIndex 作為循環(huán)結(jié)束判斷條件
final int putIndex = this.putIndex;
for (int i = removeIndex;;) { // 順序往前挪一個位置
int next = i + 1;
if (next == items.length)// 當(dāng)循環(huán)到數(shù)組最后一個元素沼本,下一個元素應(yīng)該是數(shù)組第一個元素
next = 0;
if (next != putIndex) { // 如果查找的索引不等于要添加元素的索引噩峦,說明元素可以再移動
items[i] = items[next];
i = next;
} else { // 在 removeIndex 索引之后的元素都往前移動完畢后清空最后一個元素
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}
啊,累了抽兆,不寫了 汪