- ArrayBlockingQueue基于數(shù)組,先進(jìn)先出挡爵,從尾部插入到隊(duì)列荡含,從頭部開始返回咒唆。
- 線程安全的有序阻塞隊(duì)列,內(nèi)部通過“互斥鎖”保護(hù)競爭資源内颗。
- 指定時間的阻塞讀寫
- 容量可限制
定義
ArrayBlockingQueue繼承AbstractQueue钧排,實(shí)現(xiàn)了BlockingQueue敦腔,Serializable接口均澳,內(nèi)部元素使用Object[]數(shù)組保存。初始化時候需要指定容量ArrayBlockingQueue(int capacity)
符衔,ArrayBlockingQueue默認(rèn)會使用非公平鎖找前。
ArrayBlockingQueue只使用一把鎖,造成在存取兩種操作時會競爭同一把鎖判族,而使得性能相對低下躺盛。
add(E)方法和offer(E)
調(diào)用父類中的add方法,查看源碼可知父類中的add方法是調(diào)用offer方法實(shí)現(xiàn),所以查看offer方法源碼形帮,如下:
public boolean offer(E e) {
//檢查元素不為null
checkNotNull(e);
//加鎖槽惫,獨(dú)占鎖保護(hù)競態(tài)資源。
final ReentrantLock lock = this.lock;
lock.lock();
try {
//隊(duì)列已滿辩撑,返回false
if (count == items.length)
return false;
else {
//插入元素,返回true
insert(e);
return true;
}
} finally {
//釋放鎖
lock.unlock();
}
}
insert源碼如下:
private void insert(E x) {
//將元素添加到隊(duì)列中
items[putIndex] = x;
//putIndex表示下一個被添加元素的索引界斜,設(shè)置下一個被添加元素的索引,若隊(duì)列滿了合冀,就設(shè)置下一個被添加元素索引為0
putIndex = inc(putIndex);
//隊(duì)列的元素?cái)?shù)加1
++count;
//喚醒notEmpty上的等待線程,也就是取元素的線程各薇。
notEmpty.signal();
}
take()方法
public E take() throws InterruptedException {
//獲取獨(dú)占鎖,加鎖君躺,線程是中斷狀態(tài)的話會拋異常
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//隊(duì)列為空峭判,會一直等待
while (count == 0)
notEmpty.await();
//取元素的方法
return extract();
} finally {
//釋放鎖
lock.unlock();
}
}
private E extract() {
final Object[] items = this.items;
E x = this.<E>cast(items[takeIndex]);
//取完之后,刪除元素
items[takeIndex] = null;
//設(shè)置下一個被取出的元素索引棕叫,若是最后一個元素林螃,下一個被取出的元素索引為0
takeIndex = inc(takeIndex);
//元素?cái)?shù)減1
--count;
//喚醒添加元素的線程
notFull.signal();
return x;
}
源碼分析
jdk1.7.0_71
//隊(duì)列元素
final Object[] items;
//下次被take,poll,remove的索引
int takeIndex;
//下次被put,offer,add的索引
int putIndex;
//隊(duì)列中元素的個數(shù)
int count;
//保護(hù)所有訪問的主鎖
final ReentrantLock lock;
//等待take鎖,讀線程條件
private final Condition notEmpty;
//等待put鎖,寫線程條件
private final Condition notFull;
ArrayBlockingQueue(int capacity) 給定容量和默認(rèn)的訪問規(guī)則初始化
public ArrayBlockingQueue(int capacity){}
ArrayBlockingQueue(int capacity, boolean fair)知道你跟容量和訪問規(guī)則
//fair為true,在插入和刪除時,線程的隊(duì)列訪問會阻塞,并且按照先進(jìn)先出的順序,false,訪問順序是不確定的
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) 指定容量,訪問規(guī)則,集合來初始化
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {}
add(E e) 添加元素到隊(duì)列末尾,成功返回true,隊(duì)列滿了拋異常IllegalStateException
public boolean add(E e) {
return super.add(e);
}
offer(E e)添加元素到隊(duì)列末尾,成功返回true,隊(duì)列滿了返回false
public boolean offer(E e) {}
put(E e) 添加元素到隊(duì)列末尾,隊(duì)列滿了,等待.
public void put(E e) throws InterruptedException {}
offer(E e, long timeout, TimeUnit unit)添加元素到隊(duì)列末尾,如果隊(duì)列滿了,等待指定的時間
public boolean offer(E e, long timeout, TimeUnit unit){}
poll() 移除隊(duì)列頭
public E poll() {}
take() 移除隊(duì)列頭,隊(duì)列為空的話就等待
public E take() throws InterruptedException {}
poll(long timeout, TimeUnit unit)移除隊(duì)列頭,隊(duì)列為空,等待指定的時間
public E poll(long timeout, TimeUnit unit) throws InterruptedException {}
peek()返回隊(duì)列頭,不刪除
public E peek() {}
size()
public int size(){}
remainingCapacity() 返回?zé)o阻塞情況下隊(duì)列能接受容量的大小
public int remainingCapacity() {}
remove(Object o)從隊(duì)列中刪除元素
public boolean remove(Object o) {}
contains(Object o) 是否包含元素
public boolean contains(Object o) {}
toArray()
public Object[] toArray(){}
toArray(T[] a)
public <T> T[] toArray(T[] a) {}
toString()
public String toString(){}
clear()
public void clear(){}
drainTo(Collection<? super E> c)移除隊(duì)列中可用元素,添加到集合中
public int drainTo(Collection<? super E> c) {}
drainTo(Collection<? super E> c, int maxElements)移除隊(duì)列中給定數(shù)量的可用元素,添加到集合中
public int drainTo(Collection<? super E> c, int maxElements) {}
iterator() 返回一個迭代器
public Iterator<E> iterator() {
return new Itr();
}