因為后面要學(xué)習(xí)線程池祷膳,所以在在這里先分析下ArrayBlockingQueue,為以后做準(zhǔn)備屡立。
什么是ArrayBlockingQueue直晨?#####
首先通過名字我們可以理解到這是一個用Array實現(xiàn)的,操作是會產(chǎn)生阻塞的隊列。其實大概意思差不多勇皇。
ArrayBlockingQueue是一個由數(shù)組支持的有界阻塞隊列罩句。此隊列按 FIFO(先進(jìn)先出)原則對元素進(jìn)行排序。隊列的頭部 是在隊列中存在時間最長的元素敛摘。
Queue家族的成員:#####
上面這些是比較全的Queue體系门烂,但是我們常用的其實可能不多,主要有:
1.ArrayDeque, (數(shù)組雙端隊列)
2.PriorityQueue, (優(yōu)先級隊列)
3.ConcurrentLinkedQueue, (基于鏈表的并發(fā)隊列)
4.DelayQueue, (延期阻塞隊列)(阻塞隊列實現(xiàn)了BlockingQueue接口)
5.ArrayBlockingQueue, (基于數(shù)組的并發(fā)阻塞隊列)
6.LinkedBlockingQueue, (基于鏈表的FIFO阻塞隊列)
7.LinkedBlockingDeque, (基于鏈表的FIFO雙端阻塞隊列)
8.PriorityBlockingQueue, (帶優(yōu)先級的無界阻塞隊列)
9.SynchronousQueue (并發(fā)同步阻塞隊列)
這里主要看一下ArrayBlockingQueue的實現(xiàn)着撩。
初始化:#####
/**
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity and the specified access policy.
*
* @param capacity the capacity of this queue
* @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
* if {@code false} the access order is unspecified.
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
這里可以看出來诅福,ArrayBlockingQueue內(nèi)部是通過一個Object數(shù)組和一個ReentrantLock實現(xiàn)的。同時ReentrantLock在使用時也提供了公平和非公平兩種拖叙。因為數(shù)組是有界的氓润,所以在數(shù)組為空和數(shù)組已滿兩種情況下需要阻塞線程,所以使用了Condition來實現(xiàn)線程的阻塞薯鳍。
添加元素的方法:#####
/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
* returning {@code true} upon success and {@code false} if this queue
* is full. This method is generally preferable to method {@link #add},
* which can fail to insert an element only by throwing an exception.
*
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
insert(e);
return true;
}
} finally {
lock.unlock();
}
}
首先offer方法是加鎖的咖气,在當(dāng)前的Array的count等于數(shù)組的容量時,也就是數(shù)組滿的時候返回false挖滤,如果沒滿崩溪,那么插入數(shù)據(jù)到Array中,最后解鎖返回true斩松。
之后是add:
public boolean add(E e) {
return super.add(e);
}
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
add方法其實也是使用了offer方法伶唯,但是不同的是,如果數(shù)組是滿的那么add會拋出IllegalStateException異常惧盹,add成功后會返回true乳幸。
最后一個添加元素的方法是put:
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
在put的實現(xiàn)里用到了Condition—notFull,在put的時候钧椰,如果數(shù)組已經(jīng)滿了粹断,那么添加元素是不成功的(offer的實現(xiàn)),但是此時如果希望能等待數(shù)組有空間添加元素嫡霞,那么可以使用put瓶埋,如果數(shù)據(jù)已滿,那么在notFull上等待诊沪。如果有數(shù)組的元素移除的操作就會喚醒這個put养筒,讓元素能添加到數(shù)組中。同時在調(diào)用insert方法是會調(diào)用notEmpty.signal() 喚醒在notEmpty上等待的線程端姚。最后解鎖返回闽颇。
移除元素方法:#####
首先看poll方法:
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : extract();
} finally {
lock.unlock();
}
}
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E extract() {
final Object[] items = this.items;
E x = this.<E>cast(items[takeIndex]);
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
notFull.signal();
return x;
}
poll方法是從數(shù)組中彈出一個元素,但是如果當(dāng)前的數(shù)組內(nèi)沒有元素則直接返回null寄锐,如果有元素,那么返回指定的索引位置的元素,并刪除原來的元素橄仆。同時在彈出元素后喚醒等待在notFull上的線程剩膘。
之后看下take方法:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}
如果我們希望在獲取元素的時候,如果沒有元素盆顾,我們希望線程阻塞指導(dǎo)有元素可取怠褐。那么這個實現(xiàn)就是take方法了。在take時您宪,如果當(dāng)前的數(shù)組內(nèi)沒有元素的話奈懒,那么線程等待在notEmpty上,直到insert是喚醒在notEmpty上等待的線程宪巨。
最后看一下peek方法:
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : itemAt(takeIndex);
} finally {
lock.unlock();
}
}
peek方法是比較簡單的磷杏,只是在數(shù)組存在元素的時候,獲取指定的索引的元素捏卓,但是不會移除這個元素极祸。
這里的takeIndex是指定的數(shù)組索引,代碼如下:
/**
* Circularly increment i.
*/
final int inc(int i) {
return (++i == items.length) ? 0 : i;
}
從上面的代碼可以看出來怠晴,數(shù)組內(nèi)的元素在添加元素的時候并沒有進(jìn)行元素移動的遥金,移動的是takeIndex,如果takeIndex==items.length蒜田,那么回到0.
而putInde的移動策略和takeIndex是一致的稿械。所以整個的添加和移除操作是一個環(huán)狀的操作過程:
以put和take為例:
在put的時候利用putIndex為索引進(jìn)行元素添加观谦,每一次put都會檢測count和數(shù)組容量寿冕,如果count小于數(shù)組容量戳杀,證明可以添加元素须肆,而添加以putindex為準(zhǔn)讹躯,如果putindex增長到數(shù)組容量然评,putindex回滾到0.此時添加元素又從頭開始尘惧。而takeIndex也是一樣炬转,在takeIndex等于數(shù)組容量的時候科阎,會回滾到0述吸,從頭開始take÷啾浚可以說ArrayBlockingQueue利用數(shù)組的index的有序性和ReentrantLock實現(xiàn)的蝌矛。保證了ArrayBlockingQueue線程安全性,但是ArrayBlockingQueue是有局限性的错英,容量需要在初始化的時候指定入撒,所以必須在初始化的時候就考慮好容量,否則會對使用的性能有很大的影響椭岩。