清明節(jié)和朋友去被抖音帶火的一個餐廳敞恋,下午兩點(diǎn)鐘取晚上的號丽啡,前面已經(jīng)有十幾桌了,四點(diǎn)半餐廳開始正式營業(yè)硬猫,等輪到我們已經(jīng)近八點(diǎn)了补箍。餐廳分為幾個區(qū)域,只有最火的區(qū)域(在小船上)需要排號浦徊,其他區(qū)域基本上是隨到隨吃的馏予,最冷清的區(qū)域幾乎都沒什么人。菜的價格異常的貴盔性,味道也并不好霞丧。最后送出兩張圖:
好了,進(jìn)入今天的正題冕香,今天要講的是ArrayBlockQueue蛹尝,ArrayBlockQueue是JUC提供的線程安全的有界的阻塞隊(duì)列,一看到Array悉尾,第一反應(yīng):這貨肯定和數(shù)組有關(guān)突那,既然是數(shù)組,那自然是有界的了构眯,我們先來看看ArrayBlockQueue的基本使用方法愕难,然后再看看ArrayBlockQueue的源碼。
ArrayBlockQueue基本使用
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue<Integer> arrayBlockingQueue=new ArrayBlockingQueue(5);
arrayBlockingQueue.offer(10);
arrayBlockingQueue.offer(50);
arrayBlockingQueue.add(20);
arrayBlockingQueue.add(60);
System.out.println(arrayBlockingQueue);
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue);
System.out.println(arrayBlockingQueue.take());
System.out.println(arrayBlockingQueue);
System.out.println(arrayBlockingQueue.peek());
System.out.println(arrayBlockingQueue);
}
運(yùn)行結(jié)果:
- 創(chuàng)建了一個長度為5的ArrayBlockQueue惫霸。
- 用offer方法猫缭,向ArrayBlockQueue添加了兩個元素,分別是10壹店,50猜丹。
- 用put方法,向ArrayBlockQueue添加了兩個元素硅卢,分別是20射窒,60藏杖。
- 打印出ArrayBlockQueue,結(jié)果是10脉顿,50蝌麸,20,60弊予。
- 用poll方法祥楣,彈出ArrayBlockQueue第一個元素,并且打印出來:10汉柒。
- 打印出ArrayBlockQueue误褪,結(jié)果是50,20碾褂,60兽间。
- 用take方法,彈出ArrayBlockQueue第一個元素正塌,并且打印出來:50嘀略。
- 打印出ArrayBlockQueue,結(jié)果是20乓诽,60帜羊。
- 用peek方法,彈出ArrayBlockQueue第一個元素鸠天,并且打印出來:20讼育。
- 打印出ArrayBlockQueue,結(jié)果是20稠集,60奶段。
代碼比較簡單,但是你肯定會有疑問
- offer/add(在上面的代碼中沒有演示)/put都是往隊(duì)列里面添加元素剥纷,區(qū)別是什么痹籍?
- poll/take/peek都是彈出隊(duì)列的元素,區(qū)別是什么晦鞋?
- 底層代碼是如何保證線程安全的蹲缠?
- 數(shù)據(jù)保存在哪里?
要解決上面幾個疑問悠垛,最好的辦法當(dāng)然是看下源碼线定,通過親自閱讀源碼所產(chǎn)生的印象遠(yuǎn)遠(yuǎn)要比看視頻,看博客鼎文,死記硬背最后的結(jié)論要深刻的多。就算真的忘記了因俐,只要再看看源碼拇惋,瞬間可以回憶起來周偎。
ArrayBlockQueue源碼解析
構(gòu)造方法
ArrayBlockQueue提供了三個構(gòu)造方法,如下圖所示:
ArrayBlockingQueue(int capacity)
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
這是最常用的構(gòu)造方法撑帖,傳入capacity蓉坎,capacity是容量的意思,也就是ArrayBlockingQueue的最大長度胡嘿,方法內(nèi)部直接調(diào)用了第二個構(gòu)造方法蛉艾,傳入的第二個參數(shù)為false。
ArrayBlockingQueue(int capacity, boolean fair)
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
這個構(gòu)造方法接受兩個參數(shù)衷敌,分別是capacity和fair勿侯,fair是boolean類型的,代表是公平鎖缴罗,還是非公平鎖助琐,可以看出如果我們用第一個構(gòu)造方法來創(chuàng)建ArrayBlockingQueue的話,采用的是非公平鎖面氓,因?yàn)楣芥i會損失一定的性能兵钮,在沒有充足的理由的情況下,是沒有必要采用公平鎖的舌界。
方法內(nèi)部做了幾件事情:
- 創(chuàng)建Object類型的數(shù)組掘譬,容量為capacity,并且賦值給當(dāng)前類對象的items呻拌。
- 創(chuàng)建排他鎖葱轩。
- 創(chuàng)建條件變量notEmpty 。
- 創(chuàng)建條件變量notFull柏锄。
至于排他鎖和兩個條件變量是做什么用的酿箭,看到后面就明白了。
ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c)
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
//調(diào)用第二個構(gòu)造方法趾娃,方法內(nèi)部就是初始化數(shù)組缭嫡,排他鎖,兩個條件變量
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // 開啟排他鎖
try {
int i = 0;
try {
// 循環(huán)傳入的集合抬闷,把集合中的元素賦值給items數(shù)組妇蛀,其中i會自增
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;//把i賦值給count
//如果i==capacity,也就是到了最大容量笤成,把0賦值給putIndex评架,否則把i賦值給putIndex
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();//釋放排他鎖
}
}
- 調(diào)用第二個構(gòu)造方法,方法內(nèi)部就是初始化數(shù)組items炕泳,排他鎖lock纵诞,以及兩個條件變量。
- 開啟排他鎖培遵。
- 循環(huán)傳入的集合浙芙,將集合中的元素賦值給items數(shù)組登刺,其中i會自增。
- 把i賦值給count嗡呼。
- 如果i==capacity纸俭,說明到了最大的容量,就把0賦值給putIndex南窗,否則把i賦值給putIndex揍很。
- 在finally中釋放排他鎖。
看到這里万伤,我們應(yīng)該明白這個構(gòu)造方法的作用是什么了窒悔,就是把傳入的集合作為ArrayBlockingQueuede初始化數(shù)據(jù),但是我們又會有一個新的疑問:count壕翩,putIndex 是做什么用的。
offer(E e)
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();//開啟排他鎖
try {
if (count == items.length)//如果count==items.length放妈,返回false
return false;
else {
enqueue(e);//入隊(duì)
return true;//返回true
}
} finally {
lock.unlock();//釋放鎖
}
}
- 開啟排他鎖北救。
- 如果count==items.length,也就是到了最大的容量芜抒,返回false珍策。
- 如果count<items.length,執(zhí)行入隊(duì)方法宅倒,并且返回true攘宙。
- 釋放排他鎖。
看到這里拐迁,我們應(yīng)該可以明白了蹭劈,ArrayBlockQueue是如何保證線程安全的,還是利用了ReentrantLock排他鎖线召,count就是用來保存數(shù)組的當(dāng)前大小的铺韧。我們再來看看enqueue方法。
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
這方法比較簡單缓淹,在代碼里面就不寫注釋了哈打,做了如下的操作:
- 把x賦值給items[putIndex] 。
- 將putIndex進(jìn)行自增讯壶,如果自增后的值 == items.length料仗,把0賦值給putIndex 。
- 執(zhí)行count++操作伏蚊。
- 調(diào)用條件變量notEmpty的signal方法立轧,說明在某個地方,必定調(diào)用了notEmpty的await方法,這里就是喚醒因?yàn)檎{(diào)用notEmpty的await方法而被阻塞的線程氛改。
這里就解答了一個疑問:putIndex是做什么的匀借,就是入隊(duì)元素的下標(biāo)。
add(E e)
public boolean add(E e) {
return super.add(e);
}
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
這個方法內(nèi)部最終還是調(diào)用的offer方法平窘。
put(E e)
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//開啟響應(yīng)中斷的排他鎖
try {
while (count == items.length)//如果隊(duì)列滿了,調(diào)用notFull的await
notFull.await();
enqueue(e);//入隊(duì)
} finally {
lock.unlock();//釋放排他鎖
}
}
- 開啟響應(yīng)中斷的排他鎖凳怨,如果在獲取鎖的過程中瑰艘,當(dāng)前的線程被中斷,會拋出異常肤舞。
- 如果隊(duì)列滿了紫新,調(diào)用notFull的await方法,說明在某個地方李剖,必定調(diào)用了notFull的signal方法來喚醒當(dāng)前線程芒率。
- 執(zhí)行入隊(duì)操作。
- 釋放排他鎖篙顺。
可以看到put方法和 offer/add方法的區(qū)別了:
- offer/add:如果隊(duì)列滿了偶芍,直接返回false。
- put:如果隊(duì)列滿了德玫,當(dāng)前線程被阻塞,等待喚醒材彪。
poll()
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
- 開啟排他鎖琴儿。
- 如果count==0,直接返回null造成,否則執(zhí)行dequeue出隊(duì)操作。
- 釋放排他鎖佃延。
我們來看dequeue方法:
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];//獲得元素的值
items[takeIndex] = null;//把null賦值給items[takeIndex]
if (++takeIndex == items.length)//如果takeIndex自增后的值== items.length履肃,就把0賦值給takeIndex
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();//喚醒因?yàn)檎{(diào)用notFull的await方法而被阻塞的線程
return x;
}
- 獲取元素的值坐桩,takeIndex保存的是出隊(duì)的下標(biāo)。
- 把null賦值給items[takeIndex]成福,也就是清空被彈出的元素。
- 如果takeIndex自增后的值== items.length奴艾,就把0賦值給takeIndex蕴潦。
- count--潭苞。
- 喚醒因?yàn)檎{(diào)用notFull的await方法而被阻塞的線程真朗。
這里調(diào)用了notFull的signal方法來喚醒因?yàn)檎{(diào)用notFull的await方法而被阻塞的線程,那到底在哪里調(diào)用了notFull的await方法呢蝗碎,還記不記得在put方法中調(diào)用了notFull的await方法衍菱,我們再看看:
while (count == items.length)
notFull.await();
當(dāng)隊(duì)列滿了肩豁,就調(diào)用 notFull.await()來等待,在出隊(duì)操作中琼锋,又調(diào)用了notFull.signal()來喚醒缕坎。
take()
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
- 開啟排他鎖谜叹。
- 如果count==0,代表隊(duì)列是空的荷腊,則調(diào)用notEmpty的await方法女仰。
- 執(zhí)行出隊(duì)操作疾忍。
- 釋放排他鎖。
這里調(diào)用了notEmpty的await方法一罩,那么哪里調(diào)用了notEmpty的signal方法呢聂渊?在enqueue入隊(duì)方法里歧沪。
我們可以看到take和poll的區(qū)別:
- take:如果隊(duì)列為空诊胞,會阻塞锹杈,直到被喚醒了。
- poll: 如果隊(duì)列為空邪码,直接返回null咬清。
peek()
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex);
} finally {
lock.unlock();
}
}
final E itemAt(int i) {
return (E) items[i];
}
- 開啟排他鎖影钉。
- 獲得元素掘剪。
- 釋放排他鎖。
我們可以看到peek和poll/take的區(qū)別:
- peek夺谁,只是獲取元素匾鸥,不會清空元素。
- poll/take岗照,獲取并清空元素。
size()
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
- 開啟排他鎖厚者。
- 返回count库菲。
- 釋放排他鎖熙宇。
總結(jié)
至此溉浙,ArrayBlockQueue的核心源碼就分析完畢了戳稽,我們來做一個總結(jié):
- ArrayBlockQueue有幾個比較重要的字段,分別是items互躬,保存的是隊(duì)列的數(shù)據(jù)吼渡,putIndex保存的是入隊(duì)的下標(biāo)乓序,takeIndex保存的是出隊(duì)的下標(biāo)替劈,count用來統(tǒng)計隊(duì)列元素的個數(shù)抬纸,lock用來保證線程的安全性湿故,notEmpty和notFull兩個條件變量實(shí)現(xiàn)喚醒和阻塞。
- offer和add是一樣的脖阵,其中add方法內(nèi)部調(diào)用的就是offer方法命黔,如果隊(duì)列滿了,直接返回false悍募。
- put坠宴,如果隊(duì)列滿了,會被阻塞副砍。
- peek豁翎,只是彈出元素,不會清空元素心剥。
- poll,彈出并清空元素胳赌,如果隊(duì)列為空疑苫,直接返回null。
- take撼短,彈出并清空元素曲横,如果隊(duì)列為空禾嫉,會被阻塞蚊丐。