上一篇博客纺涤,我們介紹了ArrayBlockQueue桦锄,知道了它是基于數(shù)組實(shí)現(xiàn)的有界阻塞隊(duì)列扎附,既然有基于數(shù)組實(shí)現(xiàn)的,那么一定有基于鏈表實(shí)現(xiàn)的隊(duì)列了结耀,沒(méi)錯(cuò)留夜,當(dāng)然有,這就是我們今天的主角:LinkedBlockingQueue图甜。ArrayBlockQueue是有界的碍粥,那么LinkedBlockingQueue是有界還是無(wú)界的呢?我覺(jué)得可以說(shuō)是有界的黑毅,也可以說(shuō)是無(wú)界的即纲,為什么這么說(shuō)呢?看下去你就知道了博肋。
和上篇博客一樣低斋,我們還是先看下LinkedBlockingQueue的基本應(yīng)用,然后解析LinkedBlockingQueue的核心代碼匪凡。
LinkedBlockingQueue基本應(yīng)用
public static void main(String[] args) throws InterruptedException {
LinkedBlockingQueue<Integer> linkedBlockingQueue = new LinkedBlockingQueue();
linkedBlockingQueue.add(15);
linkedBlockingQueue.add(60);
linkedBlockingQueue.offer(50);
linkedBlockingQueue.put(100);
System.out.println(linkedBlockingQueue);
System.out.println(linkedBlockingQueue.size());
System.out.println(linkedBlockingQueue.take());
System.out.println(linkedBlockingQueue);
System.out.println(linkedBlockingQueue.poll());
System.out.println(linkedBlockingQueue);
System.out.println(linkedBlockingQueue.peek());
System.out.println(linkedBlockingQueue);
System.out.println(linkedBlockingQueue.remove(50));
System.out.println(linkedBlockingQueue);
}
運(yùn)行結(jié)果:
[15, 60, 50, 100]
4
15
[60, 50, 100]
60
[50, 100]
50
[50, 100]
true
[100]
代碼比較簡(jiǎn)單膊畴,先試著分析下:
- 創(chuàng)建了一個(gè)LinkedBlockingQueue 。
- 分別使用add/offer/put方法向LinkedBlockingQueue中添加元素病游,其中add方法執(zhí)行了兩次唇跨。
- 打印出LinkedBlockingQueue:[15, 60, 50, 100]稠通。
- 打印出LinkedBlockingQueue的size:4。
- 使用take方法彈出第一個(gè)元素买猖,并打印出來(lái):15改橘。
- 打印出LinkedBlockingQueue:[60, 50, 100]。
- 使用poll方法彈出第一個(gè)元素玉控,并打印出來(lái):60飞主。
- 打印出LinkedBlockingQueue:[50, 100]。
- 使用peek方法彈出第一個(gè)元素高诺,并打印出來(lái):50碌识。
- 打印出LinkedBlockingQueue:[50, 10]。
- 使用remove方法虱而,移除值為50的元素筏餐,返回true。
- 打印出LinkedBlockingQueue:100牡拇。
代碼比較簡(jiǎn)單魁瞪,但是還是有些細(xì)節(jié)不明白:
- 底層是如何保證線程安全性的?
- 數(shù)據(jù)保存在哪里,以什么形式保存的?
- offer/add/put都是往隊(duì)列里面添加元素惠呼,區(qū)別是什么导俘?
- poll/take/peek都是彈出隊(duì)列的元素,區(qū)別是什么罢杉?
要解決上面的疑問(wèn)趟畏,最好的途徑還是看源碼贡歧,下面我們就來(lái)看看LinkedBlockingQueue的核心源碼滩租。
LinkedBlockingQueue源碼解析
構(gòu)造方法
LinkedBlockingQueue提供了三個(gè)構(gòu)造方法,如下圖所示:
我們一個(gè)一個(gè)來(lái)分析利朵。
LinkedBlockingQueue()
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
無(wú)參的構(gòu)造方法竟然直接把“鍋”甩出去了律想,甩給了另外一個(gè)構(gòu)造方法,但是我們要注意傳的參數(shù):Integer.MAX_VALUE绍弟。
LinkedBlockingQueue(int capacity)
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
- 判斷傳入的capacity是否合法技即,如果不大于0,直接拋出異常樟遣。
- 把傳入的capacity賦值給capacity而叼。
- 新建一個(gè)Node節(jié)點(diǎn),并且把此節(jié)點(diǎn)賦值給head和last字段豹悬。
這個(gè)capacity是什么呢葵陵?如果大家對(duì)代碼有一定的感覺(jué)的話,應(yīng)該很容易猜到這是LinkedBlockingQueue的最大容量瞻佛。如果我們調(diào)用無(wú)參的構(gòu)造方法來(lái)創(chuàng)建LinkedBlockingQueue的話脱篙,那么它的最大容量就是Integer.MAX_VALUE娇钱,我們把它稱為“無(wú)界”,但是我們也可以指定最大容量绊困,那么此隊(duì)列又是一個(gè)“有界”隊(duì)列了文搂,所以有些博客很草率的說(shuō)LinkedBlockingQueue是有界隊(duì)列,或者是無(wú)界隊(duì)列秤朗,個(gè)人認(rèn)為這是不嚴(yán)謹(jǐn)?shù)摹?/p>
我們?cè)賮?lái)看看這個(gè)Node是個(gè)什么鬼:
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
是不是有一種莫名的親切感煤蹭,很明顯,這是單向鏈表的實(shí)現(xiàn)呀川梅,next指向的就是下一個(gè)Node疯兼。
LinkedBlockingQueue(Collection<? extends E> c)
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);//調(diào)用第二個(gè)構(gòu)造方法,傳入的capacity是Int的最大值贫途,可以說(shuō) 是一個(gè)無(wú)界隊(duì)列吧彪。
final ReentrantLock putLock = this.putLock;
putLock.lock(); //開(kāi)啟排他鎖
try {
int n = 0;//用于記錄LinkedBlockingQueue的size
//循環(huán)傳入的c集合
for (E e : c) {
if (e == null)//如果e==null,則拋出空指針異常
throw new NullPointerException();
if (n == capacity)//如果n==capacity丢早,說(shuō)明到了最大的容量姨裸,則拋出“Queue full”異常
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));//入隊(duì)操作
++n;//n自增
}
count.set(n);//設(shè)置count
} finally {
putLock.unlock();//釋放排他鎖
}
}
- 調(diào)用第二個(gè)構(gòu)造方法,傳入了int的最大值怨酝,所以可以說(shuō)此時(shí)LinkedBlockingQueue是無(wú)界隊(duì)列傀缩。
- 開(kāi)啟排他鎖putLock 。
- 定義了一個(gè)變量n农猬,用來(lái)記錄當(dāng)前LinkedBlockingQueue的size赡艰。
- 循環(huán)傳入的集合,如果其中的元素為null斤葱,則拋出空指針異常慷垮,如果n==capacity,說(shuō)明到了最大的容量揍堕,則拋出“Queue full”異常料身,否則執(zhí)行enqueue操作來(lái)進(jìn)行入隊(duì),然后n進(jìn)行自增衩茸。
- 設(shè)置count為n芹血,由此可知,count就是LinkedBlockingQueue的size了楞慈。
- 在finally中釋放排他鎖putLock 幔烛。
offer
public boolean offer(E e) {
if (e == null) throw new NullPointerException();//如果傳入的元素為NULL,拋出異常
final AtomicInteger count = this.count;//取出count
if (count.get() == capacity)//如果count==capacity囊蓝,說(shuō)明到了最大容量饿悬,直接返回false
return false;
int c = -1;//表示size
Node<E> node = new Node<E>(e);//新建Node節(jié)點(diǎn)
final ReentrantLock putLock = this.putLock;
putLock.lock();//開(kāi)啟排他鎖
try {
if (count.get() < capacity) {//如果count<capacity,說(shuō)明還沒(méi)有達(dá)到最大容量
enqueue(node);//入隊(duì)操作
c = count.getAndIncrement();//獲得count慎颗,賦值給c后完成自增操作
if (c + 1 < capacity)//如果c+1 <capacity乡恕,說(shuō)明還有剩余的空間言询,喚醒因?yàn)檎{(diào)用notFull的await方法而被阻塞的線程
notFull.signal();
}
} finally {
putLock.unlock();//在finally中釋放排他鎖
}
if (c == 0)//如果c==0,說(shuō)明釋放putLock的時(shí)候傲宜,隊(duì)列中有一個(gè)元素运杭,則調(diào)用signalNotEmpty
signalNotEmpty();
return c >= 0;
}
- 如果傳進(jìn)來(lái)的元素為null,則拋出異常函卒。
- 把本類(lèi)實(shí)例的count賦值給局部變量count辆憔。
- 如果count==capacity,說(shuō)明到了最大的容量报嵌,直接返回false虱咧。
- 定義局部變量c,用來(lái)表示size锚国,初始值是-1腕巡。
- 新建Node節(jié)點(diǎn)。
- 開(kāi)啟排他鎖putLock血筑。
- 如果count>=capacity绘沉,說(shuō)明到了最大的容量,釋放排他鎖后豺总,返回false车伞,因?yàn)榇藭r(shí)c=-1,c>=0為false喻喳;如果count<capacity另玖,說(shuō)明還有剩余空間,繼續(xù)往下執(zhí)行表伦。這里需要思考一個(gè)問(wèn)題谦去,為什么第三步已經(jīng)判斷過(guò)了是否還有剩余空間,這里還要再判斷一次呢绑榴?因?yàn)榭赡苡卸鄠€(gè)線程都在執(zhí)行add/offer/put方法哪轿,當(dāng)隊(duì)列沒(méi)有滿的時(shí)候盈魁,多個(gè)線程同時(shí)執(zhí)行到第三步(第三步的時(shí)候還沒(méi)有開(kāi)啟排他鎖)翔怎,然后同時(shí)往下走,所以開(kāi)啟排他鎖后杨耙,還需要重新判斷下赤套。
- 執(zhí)行入隊(duì)操作。
- 獲得count珊膜,并且賦值給c后容握,完成自增的操作。注意车柠,是先賦值后自增剔氏,賦值和自增的先后順序會(huì)直接影響到后面的判斷邏輯塑猖。
- 如果c+1<capacity,說(shuō)明還有剩余的空間谈跛,喚醒因?yàn)檎{(diào)用notFull的await方法而被阻塞的線程羊苟。這里為什么要+1再進(jìn)行判斷?因?yàn)樵诘?步中感憾,是先賦值后自增蜡励,也就是說(shuō)局部變量c保存的還是入隊(duì)之前LinkedBlockingQueue的size,所以要先進(jìn)行+1操作阻桅,得到的才是當(dāng)前LinkedBlockingQueue的size凉倚。
- 在finally中,釋放排他鎖putLock嫂沉。
- 如果c==0稽寒,說(shuō)明在釋放putLock排他鎖的時(shí)候,隊(duì)列中有且只有一個(gè)元素趟章,則調(diào)用signalNotEmpty方法瓦胎。讓我們來(lái)看看signalNotEmpty方法:
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
代碼比較簡(jiǎn)單,就是開(kāi)啟排他鎖尤揣,喚醒因?yàn)檎{(diào)用notEmpty的await方法而被阻塞的線程搔啊,但是這里需要注意,這里獲得的排他鎖已經(jīng)不再是putLock北戏,而是takeLock负芋。
add
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
add方法直接調(diào)用了offer方法,但是add和offer還不完全一樣嗜愈,當(dāng)隊(duì)列滿了旧蛾,如果調(diào)用offer方法,會(huì)直接返回false蠕嫁,但是調(diào)用add方法锨天,會(huì)拋出"Queue full"的異常。
put
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();//如果傳入的元素為NULL剃毒,拋出異常
int c = -1;//表示size
Node<E> node = new Node<E>(e);//新建Node節(jié)點(diǎn)
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;//獲得count
putLock.lockInterruptibly();//開(kāi)啟排他鎖
try {
//如果到了最大容量病袄,調(diào)用notFull的await方法,等待喚醒
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);//入隊(duì)
c = count.getAndIncrement();//count先賦值給c后赘阀,再進(jìn)行自增操作
if (c + 1 < capacity)//如果c+1<capacity益缠,調(diào)用notFull的signal方法,喚醒因?yàn)檎{(diào)用notFull的await方法而被阻塞的線程
notFull.signal();
} finally {
putLock.unlock();//釋放排他鎖
}
if (c == 0)//如果隊(duì)列中有一個(gè)元素基公,喚醒因?yàn)檎{(diào)用notEmpty的await方法而被阻塞的線程
signalNotEmpty();
}
- 如果傳入的元素為NULL幅慌,則拋出異常。
- 定義一個(gè)局部變量c轰豆,來(lái)表示size胰伍,初始值是-1齿诞。
- 新建Node節(jié)點(diǎn)。
- 把本類(lèi)實(shí)例中的count賦值給局部變量count骂租。
- 開(kāi)啟排他鎖putLock掌挚。
- 如果到了最大容量,則調(diào)用notFull的await方法菩咨,阻塞當(dāng)前線程吠式,等待其他線程調(diào)用notFull的signal方法來(lái)喚醒自己。
- 執(zhí)行入隊(duì)操作抽米。
- count先賦值給c后特占,再進(jìn)行自增操作。
- 如果c+1<capacity云茸,說(shuō)明還有剩余的空間是目,則調(diào)用notFull的signal方法,喚醒因?yàn)檎{(diào)用notFull的await方法而被阻塞的線程标捺。
- 釋放排他鎖putLock懊纳。
- 如果隊(duì)列中有且只有一個(gè)元素,喚醒因?yàn)檎{(diào)用notEmpty的await方法而被阻塞的線程亡容。
enqueue
private void enqueue(Node<E> node) {
last = last.next = node;
}
入隊(duì)操作是不是特別簡(jiǎn)單嗤疯,就是把傳入的Node節(jié)點(diǎn),賦值給last節(jié)點(diǎn)的next字段闺兢,再賦值給last字段茂缚,從而形成一個(gè)單向鏈表。
小總結(jié)
至此offer/add/put的核心源碼已經(jīng)分析完畢屋谭,我們來(lái)做一個(gè)小總結(jié)脚囊,offer/add/put都是添加元素的方法,不過(guò)他們之間還是有所區(qū)別的桐磁,當(dāng)隊(duì)列滿了悔耘,調(diào)用以上三個(gè)方法會(huì)出現(xiàn)不同的情況:
- offer:直接返回false。
- add:雖然內(nèi)部也調(diào)用了offer方法我擂,但是隊(duì)列滿了衬以,會(huì)拋出異常。
- put:線程會(huì)阻塞住扶踊,等待喚醒泄鹏。
size
public int size() {
return count.get();
}
沒(méi)什么好說(shuō)的郎任,count記錄著LinkedBlockingQueue的size秧耗,獲得后返回就是了。
take
public E take() throws InterruptedException {
E x;
int c = -1;//size
final AtomicInteger count = this.count;//獲得count
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//開(kāi)啟排他鎖
try {
while (count.get() == 0) {//說(shuō)明目前隊(duì)列中沒(méi)有數(shù)據(jù)
notEmpty.await();//阻塞舶治,等待喚醒
}
x = dequeue();//出隊(duì)
c = count.getAndDecrement();//先賦值分井,后自減
if (c > 1)//如果size>1车猬,說(shuō)明在出隊(duì)之前,隊(duì)列中有至少兩個(gè)元素
notEmpty.signal();//喚醒因?yàn)檎{(diào)用notEmpty的await方法而被阻塞的線程
} finally {
takeLock.unlock();//釋放排他鎖
}
if (c == capacity)//如果隊(duì)列中還有一個(gè)剩余空間
signalNotFull();
return x;
}
- 定義局部變量c尺锚,用來(lái)表示size珠闰,初始值是-1。
- 把本類(lèi)實(shí)例的count字段賦值給臨時(shí)變量count瘫辩。
- 開(kāi)啟響應(yīng)中斷的排他鎖takeLock 伏嗜。
- 如果count==0,說(shuō)明目前隊(duì)列中沒(méi)有數(shù)據(jù)伐厌,就阻塞當(dāng)前線程承绸,等待喚醒,直到其他線程調(diào)用了notEmpty的signal方法喚醒了當(dāng)前線程挣轨。
- 進(jìn)行出隊(duì)操作军熏。
- count先賦值給c后,在進(jìn)行自減操作卷扮,這里需要注意是先賦值荡澎,后自減。
- 如果c>1晤锹,也就是size>1摩幔,結(jié)合上面的先賦值,后自減鞭铆,可知如果滿足條件热鞍,說(shuō)明在出隊(duì)之前,隊(duì)列中至少有兩個(gè)元素衔彻,則調(diào)用notEmpty的signal方法薇宠,喚醒因?yàn)檎{(diào)用notEmpty的await方法而被阻塞的線程。
- 釋放排他鎖takeLock 艰额。
- 如果執(zhí)行出隊(duì)后澄港,隊(duì)列中有且只有一個(gè)剩余空間,換個(gè)說(shuō)法柄沮,就是執(zhí)行出隊(duì)操作前回梧,隊(duì)列是滿的,則調(diào)用signalNotFull方法祖搓。
我們?cè)賮?lái)看下signalNotFull方法:
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
- 開(kāi)啟排他鎖狱意,注意這里的排他鎖是putLock 。
- 調(diào)用notFull的signal方法拯欧,喚醒因?yàn)檎{(diào)用notFull的await方法而被阻塞的線程详囤。
- 釋放排他鎖putLock 。
poll
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
相比take方法,最大的區(qū)別就如果隊(duì)列為空藏姐,執(zhí)行take方法會(huì)阻塞當(dāng)前線程隆箩,直到被喚醒,而poll方法羔杨,直接返回null捌臊。
peek
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
peek方法,只是拿到頭節(jié)點(diǎn)的值兜材,但是不會(huì)移除該節(jié)點(diǎn)理澎。
dequeue
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
沒(méi)什么好說(shuō)的,就是彈出元素曙寡,并且移除彈出的元素矾端。
小總結(jié)
至此take/poll/peek的核心源碼已經(jīng)分析完畢,我們來(lái)做一個(gè)小總結(jié)卵皂,take/poll/peek都是獲得頭節(jié)點(diǎn)值的方法秩铆,不過(guò)他們之間還是有所區(qū)別的:
- take:當(dāng)隊(duì)列為空,會(huì)阻塞當(dāng)前線程灯变,直到被喚醒殴玛。會(huì)進(jìn)行出隊(duì)操作,移除獲得的節(jié)點(diǎn)添祸。
- poll:當(dāng)隊(duì)列為空滚粟,直接返回null。會(huì)進(jìn)行出隊(duì)操作刃泌,移除獲得的節(jié)點(diǎn)凡壤。
- peek:當(dāng)隊(duì)列為空,直接返回null耙替。不會(huì)移除節(jié)點(diǎn)亚侠。
LinkedBlockingQueue的核心源碼分析到這里完畢了,謝謝大家俗扇。