1. 前言
BlockingQueue即阻塞隊(duì)列凳宙,它是基于ReentrantLock胚吁,依據(jù)它的基本原理雕拼,我們可以實(shí)現(xiàn)Web中的長(zhǎng)連接聊天功能度帮,當(dāng)然其最常用的還是用于實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者模式,大致如下圖所示:
在Java中定鸟,BlockingQueue是一個(gè)接口而涉,它的實(shí)現(xiàn)類有ArrayBlockingQueue、DelayQueue联予、 LinkedBlockingDeque啼县、LinkedBlockingQueue、PriorityBlockingQueue沸久、SynchronousQueue等季眷,它們的區(qū)別主要體現(xiàn)在存儲(chǔ)結(jié)構(gòu)上或?qū)υ夭僮魃系牟煌菍?duì)于take與put操作的原理卷胯,卻是類似的子刮。
2. 阻塞與非阻塞
入隊(duì)
offer(E e):如果隊(duì)列沒(méi)滿,立即返回true窑睁; 如果隊(duì)列滿了挺峡,立即返回false-->不阻塞
put(E e):如果隊(duì)列滿了,一直阻塞担钮,直到隊(duì)列不滿了或者線程被中斷-->阻塞
offer(E e, long timeout, TimeUnit unit):在隊(duì)尾插入一個(gè)元素,橱赠,如果隊(duì)列已滿,則進(jìn)入等待箫津,直到出現(xiàn)以下三種情況:-->阻塞
被喚醒
等待時(shí)間超時(shí)
當(dāng)前線程被中斷
出隊(duì)
poll():如果沒(méi)有元素狭姨,直接返回null;如果有元素鲤嫡,出隊(duì)
take():如果隊(duì)列空了送挑,一直阻塞,直到隊(duì)列不為空或者線程被中斷-->阻塞
poll(long timeout, TimeUnit unit):如果隊(duì)列不空暖眼,出隊(duì);如果隊(duì)列已空且已經(jīng)超時(shí)纺裁,返回null诫肠;如果隊(duì)列已空且時(shí)間未超時(shí),則進(jìn)入等待欺缘,直到出現(xiàn)以下三種情況:
被喚醒
等待時(shí)間超時(shí)
當(dāng)前線程被中斷
3.?LinkedBlockingQueue 源碼分析
LinkedBlockingQueue是一個(gè)基于鏈表實(shí)現(xiàn)的可選容量的阻塞隊(duì)列栋豫。隊(duì)頭的元素是插入時(shí)間最長(zhǎng)的,隊(duì)尾的元素是最新插入的谚殊。新的元素將會(huì)被插入到隊(duì)列的尾部丧鸯。?
LinkedBlockingQueue的容量限制是可選的,如果在初始化時(shí)沒(méi)有指定容量嫩絮,那么默認(rèn)使用int的最大值作為隊(duì)列容量丛肢。
底層數(shù)據(jù)結(jié)構(gòu)
LinkedBlockingQueue內(nèi)部是使用鏈表實(shí)現(xiàn)一個(gè)隊(duì)列的围肥,但是卻有別于一般的隊(duì)列,在于該隊(duì)列至少有一個(gè)節(jié)點(diǎn)蜂怎,頭節(jié)點(diǎn)不含有元素穆刻。結(jié)構(gòu)圖如下:
原理
LinkedBlockingQueue中維持兩把鎖,一把鎖用于入隊(duì)杠步,一把鎖用于出隊(duì)氢伟,這也就意味著,同一時(shí)刻幽歼,只能有一個(gè)線程執(zhí)行入隊(duì)朵锣,其余執(zhí)行入隊(duì)的線程將會(huì)被阻塞;同時(shí)甸私,可以有另一個(gè)線程執(zhí)行出隊(duì)猪勇,其余執(zhí)行出隊(duì)的線程將會(huì)被阻塞。換句話說(shuō)颠蕴,雖然入隊(duì)和出隊(duì)兩個(gè)操作同時(shí)均只能有一個(gè)線程操作泣刹,但是可以一個(gè)入隊(duì)線程和一個(gè)出隊(duì)線程共同執(zhí)行,也就意味著可能同時(shí)有兩個(gè)線程在操作隊(duì)列犀被,那么為了維持線程安全椅您,LinkedBlockingQueue使用一個(gè)AtomicInterger類型的變量表示當(dāng)前隊(duì)列中含有的元素個(gè)數(shù),所以可以確保兩個(gè)線程之間操作底層隊(duì)列是線程安全的寡键。
源碼分析
LinkedBlockingQueue可以指定容量掀泳,內(nèi)部維持一個(gè)隊(duì)列,所以有一個(gè)頭節(jié)點(diǎn)head和一個(gè)尾節(jié)點(diǎn)last西轩,內(nèi)部維持兩把鎖员舵,一個(gè)用于入隊(duì),一個(gè)用于出隊(duì)藕畔,還有鎖關(guān)聯(lián)的Condition對(duì)象马僻。主要對(duì)象的定義如下:
//容量,如果沒(méi)有指定注服,該值為Integer.MAX_VALUE;
private final int capacity;
//當(dāng)前隊(duì)列中的元素
private final AtomicInteger count =new AtomicInteger();
//隊(duì)列頭節(jié)點(diǎn)韭邓,始終滿足head.item==null
transient Node head;
//隊(duì)列的尾節(jié)點(diǎn),始終滿足last.next==null
private transient Node last;
//用于出隊(duì)的鎖
private final ReentrantLock takeLock =new ReentrantLock();
//當(dāng)隊(duì)列為空時(shí)溶弟,保存執(zhí)行出隊(duì)的線程
private final Condition notEmpty = takeLock.newCondition();
//用于入隊(duì)的鎖
private final ReentrantLock putLock =new ReentrantLock();
//當(dāng)隊(duì)列滿時(shí)女淑,保存執(zhí)行入隊(duì)的線程
private final Condition notFull = putLock.newCondition();
put(E e)方法
put(E e)方法用于將一個(gè)元素插入到隊(duì)列的尾部,其實(shí)現(xiàn)如下:
public void put(E e)throws InterruptedException {
//不允許元素為null
? ? if (e ==null)
throw new NullPointerException();
? ? int c = -1;
? ? //以當(dāng)前元素新建一個(gè)節(jié)點(diǎn)
? ? Node node =new Node(e);
? ? final ReentrantLock putLock =this.putLock;
? ? final AtomicInteger count =this.count;
? ? //獲得入隊(duì)的鎖
? ? putLock.lockInterruptibly();
? ? try {
? ? ? ?//如果隊(duì)列已滿辜御,那么將該線程加入到Condition的等待隊(duì)列中
? ? ? ? while (count.get() == capacity) {
? ? ? ? ? ? ?notFull.await();
? ? ? ? }
? ? ? ?//將節(jié)點(diǎn)入隊(duì)
? ? ? ? enqueue(node);
? ? ? ? //得到插入之前隊(duì)列的元素個(gè)數(shù)
? ? ? ? c = count.getAndIncrement();
? ? ? ? //如果還可以插入元素鸭你,那么釋放等待的入隊(duì)線程
? ? ? ? if (c +1 < capacity){
? ? ? ? ? ? ? notFull.signal();
? ? ? ? }
}finally {
//解鎖
? ? ? ? putLock.unlock();
? ? }
//通知出隊(duì)線程隊(duì)列非空
? ? if (c ==0)
signalNotEmpty();
}
3.1 具體入隊(duì)與出隊(duì)的原理圖:
圖中每一個(gè)節(jié)點(diǎn)前半部分表示封裝的數(shù)據(jù)x,后邊的表示指向的下一個(gè)引用。
初始化之后袱巨,初始化一個(gè)數(shù)據(jù)為null阁谆,且head和last節(jié)點(diǎn)都是這個(gè)節(jié)點(diǎn)。
3.2瓣窄、入隊(duì)兩個(gè)元素過(guò)后
3.3笛厦、出隊(duì)一個(gè)元素后
put方法總結(jié):?
1. LinkedBlockingQueue不允許元素為null。?
2. 同一時(shí)刻俺夕,只能有一個(gè)線程執(zhí)行入隊(duì)操作裳凸,因?yàn)閜utLock在將元素插入到隊(duì)列尾部時(shí)加鎖了?
3. 如果隊(duì)列滿了,那么將會(huì)調(diào)用notFull的await()方法將該線程加入到Condition等待隊(duì)列中劝贸。await()方法就會(huì)釋放線程占有的鎖姨谷,這將導(dǎo)致之前由于被鎖阻塞的入隊(duì)線程將會(huì)獲取到鎖,執(zhí)行到while循環(huán)處映九,不過(guò)可能因?yàn)橛捎陉?duì)列仍舊是滿的梦湘,也被加入到條件隊(duì)列中。?
4. 一旦一個(gè)出隊(duì)線程取走了一個(gè)元素件甥,并通知了入隊(duì)等待隊(duì)列中可以釋放線程了捌议,那么第一個(gè)加入到Condition隊(duì)列中的將會(huì)被釋放,那么該線程將會(huì)重新獲得put鎖引有,繼而執(zhí)行enqueue()方法瓣颅,將節(jié)點(diǎn)插入到隊(duì)列的尾部?
5. 然后得到插入一個(gè)節(jié)點(diǎn)之前的元素個(gè)數(shù),如果隊(duì)列中還有空間可以插入譬正,那么就通知notFull條件的等待隊(duì)列中的線程宫补。?
6. 通知出隊(duì)線程隊(duì)列為空了,因?yàn)椴迦胍粋€(gè)元素之前的個(gè)數(shù)為0曾我,而插入一個(gè)之后隊(duì)列中的元素就從無(wú)變成了有粉怕,就可以通知因隊(duì)列為空而阻塞的出隊(duì)線程了。
E take()方法
take()方法用于得到隊(duì)頭的元素抒巢,在隊(duì)列為空時(shí)會(huì)阻塞幻工,知道隊(duì)列中有元素可取存淫。其實(shí)現(xiàn)如下:
public E take() throws InterruptedException {
? ? ? ? E x;
? ? ? ? int c = -1;
? ? ? ? final AtomicInteger count = this.count;
? ? ? ? final ReentrantLock takeLock = this.takeLock;
? ? ? ? //獲取takeLock鎖? ? ? ?
? ? ? ? ?takeLock.lockInterruptibly();
? ? ? ? try {
? ? ? ? ? ? //如果隊(duì)列為空窘拯,那么加入到notEmpty條件的等待隊(duì)列中? ? ? ? ? ?
? ? ? ? ? ? while (count.get() == 0) {
? ? ? ? ? ? ? ? notEmpty.await();
? ? ? ? ? ? }
? ? ? ? ? ? //得到隊(duì)頭元素? ? ? ? ? ?
? ? ? ? ? ? ?x = dequeue();
? ? ? ? ? ? //得到取走一個(gè)元素之前隊(duì)列的元素個(gè)數(shù)? ? ? ? ? ?
? ? ? ? ? ? ? ?c = count.getAndDecrement();
? ? ? ? ? ? //如果隊(duì)列中還有數(shù)據(jù)可取可款,釋放notEmpty條件等待隊(duì)列中的第一個(gè)線程? ? ? ? ? ?
? ? ? ? ? ? ? ? if (c > 1)
? ? ? ? ? ? ? ? notEmpty.signal();
? ? ? ? } finally {
? ? ? ? ? ? takeLock.unlock();
? ? ? ? }
? ? ? ? //如果隊(duì)列中的元素從滿到非滿炊昆,通知put線程? ? ? ?
? ? ? ? ? ?if (c == capacity)
? ? ? ? ? ? signalNotFull();
? ? ? ? return x;
? ? }
take方法總結(jié):
當(dāng)隊(duì)列為空時(shí)尔许,就加入到notEmpty(的條件等待隊(duì)列中巫俺,當(dāng)隊(duì)列不為空時(shí)就取走一個(gè)元素偎谁,當(dāng)取完發(fā)現(xiàn)還有元素可取時(shí)筑辨,再通知一下自己的伙伴(等待在條件隊(duì)列中的線程)俺驶;最后,如果隊(duì)列從滿到非滿,通知一下put線程暮现。?
remove()方法
remove()方法用于刪除隊(duì)列中一個(gè)元素还绘,如果隊(duì)列中不含有該元素,那么返回false栖袋;有的話則刪除并返回true拍顷。入隊(duì)和出隊(duì)都是只獲取一個(gè)鎖,而remove()方法需要同時(shí)獲得兩把鎖塘幅,其實(shí)現(xiàn)如下:
public boolean remove(Object o) {
? ? ? ? //因?yàn)殛?duì)列不包含null元素昔案,返回false? ? ?
? ? ? ? ? if (o == null) return false;
? ? ? ? //獲取兩把鎖? ? ? ? fullyLock();
? ? ? ? try {
? ? ? ? ? ? //從頭的下一個(gè)節(jié)點(diǎn)開(kāi)始遍歷? ? ? ? ? ?
? ? ? ? ? ? for (Node trail = head, p = trail.next;
? ? ? ? ? ? ? ? p != null;
? ? ? ? ? ? ? ? trail = p, p = p.next) {
? ? ? ? ? ? ? ? //如果匹配,那么將節(jié)點(diǎn)從隊(duì)列中移除电媳,trail表示前驅(qū)節(jié)點(diǎn)? ? ? ? ? ? ? ?
? ? ? ? ? ? ? ?if (o.equals(p.item)) {
? ? ? ? ? ? ? ? ? ? unlink(p, trail);
? ? ? ? ? ? ? ? ? ? return true;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? return false;
? ? ? ? } finally {
? ? ? ? ? ? //釋放兩把鎖? ? ? ? ?
? ? ? ? ? ? fullyUnlock();
? ? ? ? }
? ? }
void fullyLock() {? ? ? ? putLock.lock();
? ? ? ? takeLock.lock();
? ? }
提問(wèn):為什么remove()方法同時(shí)需要兩把鎖?
LinkedBlockingQueue總結(jié):
LinkedBlockingQueue是允許兩個(gè)線程同時(shí)在兩端進(jìn)行入隊(duì)或出隊(duì)的操作的踏揣,但一端同時(shí)只能有一個(gè)線程進(jìn)行操作,這是通過(guò)兩把鎖來(lái)區(qū)分的匾乓;
為了維持底部數(shù)據(jù)的統(tǒng)一捞稿,引入了AtomicInteger的一個(gè)count變量,表示隊(duì)列中元素的個(gè)數(shù)拼缝。count只能在兩個(gè)地方變化娱局,一個(gè)是入隊(duì)的方法(可以+1),另一個(gè)是出隊(duì)的方法(可以-1)咧七,而AtomicInteger是原子安全的衰齐,所以也就確保了底層隊(duì)列的數(shù)據(jù)同步。?
4.?ArrayBlockingQueue源碼分析
ArrayBlockingQueue底層是使用一個(gè)數(shù)組實(shí)現(xiàn)隊(duì)列的猪叙,并且在構(gòu)造ArrayBlockingQueue時(shí)需要指定容量娇斩,也就意味著底層數(shù)組一旦創(chuàng)建了,容量就不能改變了穴翩,因此ArrayBlockingQueue是一個(gè)容量限制的阻塞隊(duì)列犬第。因此,在隊(duì)列全滿時(shí)執(zhí)行入隊(duì)將會(huì)阻塞芒帕,在隊(duì)列為空時(shí)出隊(duì)同樣將會(huì)阻塞歉嗓。
ArrayBlockingQueue的重要字段有如下幾個(gè):
? ? ? ? /** The queued items */?
? ? ? ? ? final Object[] items;
? ? ? /** Main lock guarding all access */?
? ? ? ?final ReentrantLock lock;
? ? /** Condition for waiting takes */? ??
? ? ? private final Condition notEmpty;
? ? /** Condition for waiting puts */? ?
? ? ? private final Condition notFull;
put(E e)方法
put(E e)方法在隊(duì)列不滿的情況下,將會(huì)將元素添加到隊(duì)列尾部背蟆,如果隊(duì)列已滿鉴分,將會(huì)阻塞,直到隊(duì)列中有剩余空間可以插入带膀。該方法的實(shí)現(xiàn)如下:
public void put(E e) throws InterruptedException {
? ? ? ? //檢查元素是否為null志珍,如果是,拋出NullPointerException? ? ? ?
? ? ? ?checkNotNull(e);
? ? ? ? final ReentrantLock lock = this.lock;
? ? ? ? //加鎖? ? ? ?
? ? ? ? lock.lockInterruptibly();
? ? ? ? try {
? ? ? ? ? ? //如果隊(duì)列已滿垛叨,阻塞伦糯,等待隊(duì)列成為不滿狀態(tài)? ? ? ? ? ?
? ? ? ? ? ? while (count == items.length)
? ? ? ? ? ? ? ? notFull.await();
? ? ? ? ? ? //將元素入隊(duì)? ? ? ? ? ?
? ? ? ? ? ? enqueue(e);
? ? ? ? } finally {
? ? ? ? ? ? lock.unlock();
? ? ? ? }
? ? }
put方法總結(jié):
1. ArrayBlockingQueue不允許元素為null?
2. ArrayBlockingQueue在隊(duì)列已滿時(shí)將會(huì)調(diào)用notFull的await()方法釋放鎖并處于阻塞狀態(tài)?
3. 一旦ArrayBlockingQueue不為滿的狀態(tài),就將元素入隊(duì)
E take()方法
take()方法用于取走隊(duì)頭的元素,當(dāng)隊(duì)列為空時(shí)將會(huì)阻塞敛纲,直到隊(duì)列中有元素可取走時(shí)將會(huì)被釋放喂击。其實(shí)現(xiàn)如下:
public E take() throws InterruptedException {
? ? ? ? final ReentrantLock lock = this.lock;
? ? ? ? //首先加鎖? ? ? ?
? ? ? ? ?lock.lockInterruptibly();
? ? ? ? try {
? ? ? ? ? ? //如果隊(duì)列為空,阻塞? ? ? ? ? ?
? ? ? ? ? ? while (count == 0)
? ? ? ? ? ? ? ? notEmpty.await();
? ? ? ? ? ? //隊(duì)列不為空淤翔,調(diào)用dequeue()出隊(duì)? ? ? ? ? ?
? ? ? ? ? ? return dequeue();
? ? ? ? } finally {
? ? ? ? ? ? //釋放鎖? ? ? ? ? ?
? ? ? ? ? lock.unlock();
? ? ? ? }
? ? }
take方法總結(jié):
一旦獲得了鎖之后翰绊,如果隊(duì)列為空,那么將阻塞旁壮;否則調(diào)用dequeue()出隊(duì)一個(gè)元素监嗜。?
ArrayBlockingQueue總結(jié):
ArrayBlockingQueue的并發(fā)阻塞是通過(guò)ReentrantLock和Condition來(lái)實(shí)現(xiàn)的,ArrayBlockingQueue內(nèi)部只有一把鎖寡具,意味著同一時(shí)刻只有一個(gè)線程能進(jìn)行入隊(duì)或者出隊(duì)的操作秤茅。
5? 總結(jié)
在上面分析LinkedBlockingQueue的源碼之后,可以與ArrayBlockingQueue做一個(gè)比較童叠。?
ArrayBlockingQueue:
一個(gè)對(duì)象數(shù)組+一把鎖+兩個(gè)條件
入隊(duì)與出隊(duì)都用同一把鎖
在只有入隊(duì)高并發(fā)或出隊(duì)高并發(fā)的情況下框喳,因?yàn)椴僮鲾?shù)組,且不需要擴(kuò)容厦坛,性能很高
采用了數(shù)組五垮,必須指定大小,即容量有限
LinkedBlockingQueue:
一個(gè)單向鏈表+兩把鎖+兩個(gè)條件
兩把鎖杜秸,一把用于入隊(duì)放仗,一把用于出隊(duì),有效的避免了入隊(duì)與出隊(duì)時(shí)使用一把鎖帶來(lái)的競(jìng)爭(zhēng)撬碟。
在入隊(duì)與出隊(duì)都高并發(fā)的情況下诞挨,性能比ArrayBlockingQueue高很多
采用了鏈表,最大容量為整數(shù)最大值呢蛤,可看做容量無(wú)限