1 ArrayBlockingQueue
ArrayBlockingQueue是一個(gè)阻塞隊(duì)列佑女,底層使用數(shù)組結(jié)構(gòu)實(shí)現(xiàn),按照先進(jìn)先出(FIFO)的原則對(duì)元素進(jìn)行排序痴柔。
ArrayBlockingQueue是一個(gè)線程安全的集合裳涛,通過(guò)ReentrantLock鎖來(lái)實(shí)現(xiàn),在并發(fā)情況下可以保證數(shù)據(jù)的一致性迁筛。
此外煤蚌,ArrayBlockingQueue的容量是有限的,數(shù)組的大小在初始化時(shí)就固定了细卧,不會(huì)隨著隊(duì)列元素的增加而出現(xiàn)擴(kuò)容的情況尉桩,也就是說(shuō)ArrayBlockingQueue是一個(gè)“有界緩存區(qū)”。
在下面圖片中贪庙,以數(shù)組形式展示了一個(gè)ArrayBlockingQueue:
當(dāng)向隊(duì)列插入元素時(shí)蜘犁,首先會(huì)插入到數(shù)組的0角標(biāo)處,再有新元素進(jìn)來(lái)時(shí)止邮,依次類推这橙,角標(biāo)1奏窑、角標(biāo)2、角標(biāo)3析恋。
整個(gè)item[]就是一個(gè)隊(duì)伍良哲,我們用時(shí)間來(lái)排序,展示入隊(duì)場(chǎng)景助隧。
而當(dāng)有元素出隊(duì)時(shí)筑凫,先移除角標(biāo)為0的元素,與入隊(duì)一樣并村,依次類推巍实,移除角標(biāo)1、角標(biāo)2...上的元素哩牍。
這也形成了“先進(jìn)先出”棚潦。
接下來(lái),我們來(lái)看看ArrayBlockingQueue的源碼實(shí)現(xiàn)膝昆!
- 構(gòu)造方法
在多線程中丸边,默認(rèn)不保證線程公平的訪問(wèn)隊(duì)列。
什么叫做公平訪問(wèn)隊(duì)列荚孵?我們都知道妹窖,在ArrayBlockingQueue中為了保證數(shù)據(jù)的安全,使用了ReentrantLock鎖收叶。由于鎖的引入骄呼,導(dǎo)致了線程之間的競(jìng)爭(zhēng)。當(dāng)有一個(gè)線程獲取到鎖時(shí)判没,其余線程處于等待狀態(tài)蜓萄。當(dāng)鎖被釋放時(shí),所有等待線程為奪鎖而競(jìng)爭(zhēng)澄峰。
而所謂的公平訪問(wèn)嫉沽,就是等待的線程在獲取鎖而競(jìng)爭(zhēng)時(shí),按照等待的先后順序進(jìn)行獲取操作俏竞,先等待的先獲取耻蛇,后等待的后獲取。
而非公平訪問(wèn)胞此,就是在獲取時(shí)候,無(wú)論是先等待還是后等待的線程跃捣,均有可能獲取到鎖漱牵。
在ArrayBlockingQueue中,由于公平鎖會(huì)降低隊(duì)列的性能疚漆,因而使用非公平鎖(默認(rèn))酣胀。
是否公平刁赦,根據(jù)ReentrantLock對(duì)象來(lái)實(shí)現(xiàn)---ReentrantLock lock = new ReentrantLock(false),具體看下構(gòu)造便可得知闻镶。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
//隊(duì)列實(shí)現(xiàn):數(shù)組
final Object[] items;
//當(dāng)讀取元素時(shí)數(shù)組的下標(biāo)(下一個(gè)被添加元素的索引)
int takeIndex;
//添加元素時(shí)數(shù)組的下標(biāo) (下一個(gè)被取出元素的索引)
int putIndex;
//隊(duì)列中元素個(gè)數(shù):
int count;
//鎖:
final ReentrantLock lock;
//控制take()操作時(shí)是否讓線程等待
private final Condition notEmpty;
//控制put()操作時(shí)是否讓線程等待
private final Condition notFull;
//初始化隊(duì)列容量構(gòu)造:
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
//帶初始容量大小和公平鎖隊(duì)列(公平鎖通過(guò)ReentrantLock實(shí)現(xiàn)):
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
}
- 插入元素
在ArrayBlockingQueue中甚脉,提供了兩種不同形式的元素插入--阻塞式和非阻塞式。
對(duì)于阻塞式插入來(lái)說(shuō)铆农,當(dāng)隊(duì)列中的元素已滿時(shí)牺氨,則會(huì)將此線程停止,讓其處于等待狀態(tài)墩剖,直到隊(duì)列中有空余位置產(chǎn)生猴凹。
//向隊(duì)列尾部添加元素,如果隊(duì)列滿了岭皂,則線程等待
public void put(E e) throws InterruptedException {
//不能插入非空元素,會(huì)拋出異常
checkNotNull(e);
//上鎖郊霎,保證數(shù)據(jù)安全
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//隊(duì)列中元素 == 數(shù)組長(zhǎng)度(隊(duì)列滿了),則線程等待
while (count == items.length)
notFull.await();
//添加隊(duì)列元素
insert(e);
} finally {
//插入完成,釋放鎖
lock.unlock();
}
}
而對(duì)于非阻塞式來(lái)說(shuō)爷绘,當(dāng)隊(duì)列中的元素已滿時(shí)书劝,并不會(huì)阻塞此線程的操作,而是讓其返回又或者是拋出異常土至。
//向隊(duì)列尾部添加元素购对,隊(duì)列滿了返回false
public boolean offer(E e) {
//不能插入非空元素,會(huì)拋出異常
checkNotNull(e);
//上鎖,保證數(shù)據(jù)安全
final ReentrantLock lock = this.lock;
lock.lock();
try {
//隊(duì)列中元素 == 數(shù)組長(zhǎng)度(隊(duì)列滿了),則返回false
if (count == items.length)
return false;
else {
//添加隊(duì)列元素
insert(e);
return true;
}
} finally {
//插入完成毙籽,釋放鎖
lock.unlock();
}
}
上面的offer(E e)并不會(huì)阻塞線程的執(zhí)行洞斯,但是如果想讓阻塞和非阻塞相結(jié)合的話,需要怎么處理?
ArrayBlockingQueue為我們提供了折中的方法--offer(E e, long timeout, TimeUnit unit);
向隊(duì)列尾部添加元素坑赡,可以設(shè)置線程等待時(shí)間烙如,如果超過(guò)指定時(shí)間隊(duì)列還是滿的,則返回false毅否;
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
//不能插入非空元素,會(huì)拋出異常
checkNotNull(e);
//轉(zhuǎn)換成超時(shí)時(shí)間閥值:
long nanos = unit.toNanos(timeout);
//上鎖亚铁,保證數(shù)據(jù)安全
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//對(duì)隊(duì)列是否元素滿了,做判斷螟加。
while (count == items.length) {
//如果隊(duì)列是滿的徘溢,則每次遍歷都去遞減一次nanos的值
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
//添加隊(duì)列元素
insert(e);
return true;
} finally {
//插入完成,釋放鎖
lock.unlock();
}
}
以上添加方法捆探,都是通過(guò)返回false/true來(lái)實(shí)現(xiàn)的然爆,而在ArrayBlockingQueue中,還提供了集合最原始的插入方法--add(E e)黍图。
該方法在插入時(shí)候曾雕,如果隊(duì)列中的元素滿了,則會(huì)拋出異常助被。如果插入成功剖张,則返回true切诀。
在add(E e)中,使用父類的add(E e),實(shí)際上其底層也是調(diào)用的offer(E e)方法搔弄。
//向隊(duì)列尾部添加元素幅虑,隊(duì)列滿了拋出異常;
public boolean add(E e) {
return super.add(e);
}
ArrayBlockingQueue中顾犹,最底層的插入方法倒庵,上面的各種實(shí)現(xiàn),都是基于insert(E x)來(lái)實(shí)現(xiàn)的蹦渣。由于insert(E x)是用private來(lái)修飾的哄芜,所以我們不能直接對(duì)其進(jìn)行調(diào)用。
//插入元素到隊(duì)尾柬唯,調(diào)整putIndex认臊,喚起等待的獲取線程
private void insert(E x) {
//向數(shù)組中插入元素
items[putIndex] = x;
//設(shè)置下一個(gè)被取出元素的索引
putIndex = inc(putIndex);
//增加隊(duì)列元素個(gè)數(shù):
++count;
//喚醒notEmpty上的等待線程
notEmpty.signal();
}
- 獲取元素
//獲取隊(duì)列頭部元素,如果隊(duì)列為空锄奢,則返回null.不為空失晴。
// 則返回隊(duì)列頭部,并從隊(duì)列中刪除拘央。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : extract();
} finally {
lock.unlock();
}
}
//返回隊(duì)列的頭部元素涂屁,并從隊(duì)列中刪除。如果隊(duì)列為空灰伟,則等待
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果隊(duì)列為空拆又,則進(jìn)行等待
while (count == 0)
notEmpty.await();
//獲取頭部元素:
return extract();
} finally {
lock.unlock();
}
}
//獲取隊(duì)列頭部元素,如果隊(duì)列為空栏账,則設(shè)置線程等待時(shí)間帖族,超過(guò)指定時(shí)間,還為空挡爵,則返回null竖般。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return extract();
} finally {
lock.unlock();
}
}
以上就是關(guān)于ArrayBlockingQueue的全部?jī)?nèi)容!下面茶鹃,我們繼續(xù)說(shuō)說(shuō)LinkedBlockingQueue