ArrayBlockingQueue是JDK1.5開始concurrent包中提供的并發(fā)工具類允悦,是一個(gè)基于數(shù)組的有界的先進(jìn)先出隊(duì)列失受,如果加入元素時(shí)哎壳,數(shù)組已滿趴樱,或數(shù)組為空時(shí)馒闷,取元素都會阻塞當(dāng)前線程,是一個(gè)典型的生產(chǎn)者消費(fèi)者模式伊佃。類似的類還有LinkedBlockingQueue窜司,PriorityBlockingQueue。
繼承關(guān)系
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
public interface BlockingQueue<E> extends Queue<E>
核心成員變量
final Object[] items; //保存元素
/** items index for next take, poll, peek or remove */
int takeIndex; //指向隊(duì)頭
/** items index for next put, offer, or add */
int putIndex; //指向隊(duì)尾
/** Number of elements in the queue */
int count; //隊(duì)中元素計(jì)數(shù)
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
final ReentrantLock lock; //可重入鎖
/** Condition for waiting takes */
private final Condition notEmpty; //條件變量
/** Condition for waiting puts */
private final Condition notFull; //條件變量
構(gòu)造方法
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity]; //初始化指定大小的有界隊(duì)列
lock = new ReentrantLock(fair); //fair指示該鎖是否是公平的
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
添加元素:put方法
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await(); //添加元素時(shí)航揉,如果已滿塞祈,則阻塞當(dāng)前線程,等待在notFull這個(gè)條件變量上帅涂,等待notFull調(diào)用signal喚醒
enqueue(e); //此時(shí)隊(duì)列肯定未滿
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x; //將元素加入隊(duì)尾
if (++putIndex == items.length) //將putIndex加1议薪,如果到達(dá)數(shù)組尾部尤蛮,則從0開始
putIndex = 0;
count++; //增加計(jì)數(shù)
notEmpty.signal(); //增加一個(gè)元素后,如果有線程等待在noEmpty條件變量上斯议,通知并喚醒一種一個(gè)線程产捞。
}
put方法首先判斷隊(duì)列是否已滿,如果已滿哼御,則阻塞當(dāng)前線程坯临,等待在notFull這個(gè)條件變量上,等待notFull調(diào)用signal喚醒恋昼。當(dāng)隊(duì)列非空時(shí)看靠,將該元素加入隊(duì)尾,增加元素計(jì)數(shù)液肌,喚醒因?yàn)檫M(jìn)行去操作時(shí)數(shù)組為空而等待在notEmpty上的線程挟炬,通知它此時(shí)隊(duì)列已經(jīng)有元素了,你可以進(jìn)行取操作了嗦哆。
取元素:take方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); //取元素但是隊(duì)列為空時(shí)谤祖,阻塞線程,等待在notEmpty上
return dequeue(); //此時(shí)老速,隊(duì)列肯定非空粥喜,進(jìn)行出隊(duì)操作
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex]; //獲取隊(duì)頭元素
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--; //計(jì)數(shù)減1
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); //通知因?yàn)樘砑釉氐顷?duì)列已滿時(shí)而阻塞的線程,隊(duì)列此時(shí)可插入了
return x; //返回隊(duì)頭元素
}
take方法和put方法類似烁峭,先檢查隊(duì)列是否為空容客,隊(duì)列為空時(shí),阻塞線程约郁,等待在notEmpty上缩挑。當(dāng)隊(duì)列非空時(shí),進(jìn)行出隊(duì)操作鬓梅。同時(shí)還要通知因?yàn)樘砑釉氐顷?duì)列已滿時(shí)而阻塞的線程供置,隊(duì)列此時(shí)可插入了。
支持原創(chuàng)绽快,轉(zhuǎn)載請注明出處芥丧。
github:https://github.com/gatsbydhn