BlockingQueue詳解

在Java中勘伺,BlockingQueue是一個接口,它的實現(xiàn)類有 ArrayBlockingQueue、DelayQueue盹愚、 LinkedBlockingDeque、 LinkedBlockingQueue站故、PriorityBlockingQueue皆怕、 SynchronousQueue等毅舆,它們的區(qū)別主要體現(xiàn)在存儲結(jié)構(gòu)上或?qū)υ?操作上的不同,但是對于take與put操作的原理愈腾,卻是類似的憋活。

阻塞與非阻塞

  • 入隊

    offer(E e):如果隊列沒滿,立即返回true虱黄;如果隊列滿了悦即,立即返回 false --> 不阻塞

    put(E e):如果隊列滿了,一直阻塞橱乱,直到隊列不滿了或者線程被中 斷 --> 阻塞

  • 出隊

    poll():如果沒有元素辜梳,直接返回null;如果有元素泳叠,出隊 --> 不阻塞

    take():如果隊列空了作瞄,一直阻塞,直到隊列不為空或者線程被中斷 --> 阻塞

一危纫、LinkedBlockingQueue

LinkedBlockingQueue可以指定容量宗挥,如果在初始化時沒有指定容量,那么默認使用int的最大值作為隊列容量种蝶。內(nèi)部維持一個隊列属韧,所以有一個頭節(jié)點head和一個尾節(jié)點last,內(nèi)部維持兩把鎖蛤吓,一個用于入隊宵喂,一個用于出隊,還有鎖關(guān)聯(lián)的Condition對象会傲。

1锅棕、底層數(shù)據(jù)結(jié)構(gòu)

LinkedBlockingQueue內(nèi)部是使用鏈表實現(xiàn)一個隊列的,但是有別于一般的隊列淌山,在于該隊列至少是有一個節(jié)點的裸燎,頭節(jié)點不含有元素。如果隊列為空時泼疑,頭節(jié)點的next參數(shù)為null德绿。尾節(jié)點的next參數(shù)也為null。

2退渗、主要變量

// 容量限制移稳,如果沒有指定,則為 Integer.MAX_VALUE
private final int capacity;

// 當前隊列中的元素數(shù)量
private final AtomicInteger count = new AtomicInteger();

// 隊列頭節(jié)點会油,始終滿足head.item == null
transient Node<E> head;

// 隊列的尾節(jié)點个粱,始終滿足last.next == null
private transient Node<E> last;

// 由 take、poll 等持有的鎖
private final ReentrantLock takeLock = new ReentrantLock();

// 當隊列為空時翻翩,保存執(zhí)行出隊的線程
private final Condition notEmpty = takeLock.newCondition();

// 由 put都许、offer 等持有的鎖
private final ReentrantLock putLock = new ReentrantLock();

// 當隊列滿時稻薇,保存執(zhí)行入隊的線程
private final Condition notFull = putLock.newCondition();

3、put(E e)方法

put(E e)方法用于將一個元素插入到隊列的尾部胶征。在隊列滿時會阻塞塞椎,直到隊列中有元素被取出。

// 在此隊列的尾部插入指定元素睛低,如有必要忱屑,等待空間可用。
public void put(E e) throws InterruptedException {
    // 不允許元素為null
    if (e == null) throw new NullPointerException();
    
    int c = -1;
    // 以當前元素新建一個節(jié)點
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 獲得入隊的鎖
    putLock.lockInterruptibly();
    try {
        // 如果隊列已滿暇昂,那么將該線程加入到Condition的等待隊列中
        while (count.get() == capacity) {
            notFull.await();
        }
        // 當隊列未滿莺戒,然后有出隊線程取出導(dǎo)致,將節(jié)點入隊
        enqueue(node);
        // 得到插入之前隊列的元素個數(shù)急波。getAndIncrement返回的是 +1 前的值
        c = count.getAndIncrement();
        // 如果還可以插入元素从铲,那么釋放等待的入隊線程
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        // 釋放入隊的鎖
        putLock.unlock();
    }
    // 如果插入隊列之前元素個數(shù)為0,插入后就通知出隊線程隊列非空
    if (c == 0)
        signalNotEmpty();
}

put方法總結(jié):

1澄暮、LinkedBlockingQueue不允許插入的元素為null名段;

2、同一時刻只有一個線程可以進行入隊操作泣懊,putLock在將元素插入隊列尾部時加鎖了伸辟;

3、如果隊列滿了馍刮,則會調(diào)用notFull.await(),將該線程加入到Condition等待隊列中信夫。await方法會釋放線程占有的鎖,這將導(dǎo)致之前由于被阻塞的入隊線程將會獲取到鎖卡啰,執(zhí)行到while循環(huán)處静稻,不過可能因為隊列仍舊是滿的,也被進入到條件隊列中匈辱;

4振湾、一旦有出隊線程取走元素,就會通知到入隊等待隊列釋放線程亡脸。那么第一個加入到Condition隊列中的將會被釋放押搪,那么該線程將會重新獲得put鎖,繼而執(zhí)行enqueue()方法浅碾,將節(jié)點插入到隊列的尾部大州;

5、然后得到插入隊列前元素的個數(shù)及穗,如果插入后隊列中還可以繼續(xù)插入元素摧茴,那么就通知notFull條件的等待隊列中的線程绵载;

6埂陆、如果插入隊列前個數(shù)為0苛白,那現(xiàn)在插入后,就為1了焚虱,那就可以通知因為隊列為空而導(dǎo)致阻塞的出隊線程去取元素了购裙。

4、E take()方法

take()方法用于得到隊頭的元素鹃栽,在隊列為空時會阻塞躏率,直到隊列中有元素可取。

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 獲取takeLock鎖
    takeLock.lockInterruptibly();
    try {
        // 如果隊列中元素數(shù)量為0民鼓,則將出隊線程加入到notEmpty隊列中進行等待
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 得到到隊頭的元素
        x = dequeue();
        // 得到出隊列前元素的個數(shù)薇芝。getAndDecrement返回的是 -1 前的值
        c = count.getAndDecrement();
        // 如果出隊列前的元素數(shù)量大于1,那說明還可以繼續(xù)取丰嘉,那就釋放在notEmpty隊列的第一個線程
        if (c > 1)
            notEmpty.signal();
    } finally {
        // 釋放出隊鎖
        takeLock.unlock();
    }
    // 如果出隊前隊列是滿的夯到,那現(xiàn)在取走一個元素了,隊列就不滿了饮亏,就可以去通知等待中的入隊線程了耍贾。
    if (c == capacity)
        signalNotFull();
    return x;
}

take方法總結(jié):

1、同一時刻只有一個線程可以進行出隊操作路幸,takeLock在出隊之前加鎖了荐开;

2、如果隊列中元素為空简肴,那就進入notEmpty隊列中進行等待晃听。直到隊列不為空時,得到隊列中的第一個元素砰识。當發(fā)現(xiàn)取完發(fā)現(xiàn)還有元素可取時杂伟,再通知一下notEmpty隊列中等待的其他線程。最后判斷自己取元素前的是不是滿的仍翰,如果是滿的赫粥,那自己取完,就不滿了予借,就可以通知在notFull隊列中等待插入的線程進行put了越平。

5、remove()方法

remove()方法用于刪除隊列中一個元素灵迫,如果隊列中不含有該元素秦叛,那么返回 false ;有的話則刪除并返回true瀑粥。入隊和出隊都是只獲取一個鎖挣跋,而 remove()方法需要同時獲得兩把鎖,

public boolean remove(Object o) {
    // 因為隊列不包含null元素狞换,返回false
    if (o == null) return false;
    // 獲取兩把鎖
    fullyLock();
    try {
        // 從頭的下一個節(jié)點開始遍歷
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            // 如果匹配避咆,那么將節(jié)點從隊列中移除舟肉,trail表示需要刪除節(jié)點的前一節(jié)點
            if (o.equals(p.item)) {
                unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        // 釋放兩把鎖
        fullyUnlock();
    }
}
/**
 * 鎖定以防止 put 和 take.
 */
void fullyLock() {
    putLock.lock();
    takeLock.lock();
}

/**
 * 解鎖以允許 put 和 take.
 */
void fullyUnlock() {
    takeLock.unlock();
    putLock.unlock();
}

6、總結(jié)

LinkedBlockingQueue允許兩個線程同時在兩端進行入隊和出隊操作查库,但一端同時只能有一個線程進行操作路媚,是通過兩個鎖進行區(qū)分的。

為了維護底部數(shù)據(jù)的統(tǒng)一樊销,引入了AtomicInteger的一個count變量整慎,表示隊列中元素的個數(shù)。count只能在兩個地方變化围苫,一個是入隊的方法(進行+1操作)裤园,另一個是出隊的方法(進行-1操作),而AtomicInteger是原子安全的剂府,所以也就確保了底層隊列的數(shù)據(jù)同步比然。

二、ArrayBlockingQueue

// 構(gòu)造方法
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

// 創(chuàng)建具有給定(固定)容量和指定訪問策略的ArrayBlockingQueue 周循。
// 參數(shù):capacity —— 這個隊列的容量
        fair     —— 如果為true 强法,則在插入或刪除時阻塞的線程的隊列訪問將按 FIFO 順序處理;如果為false 湾笛,則未指定訪問順序饮怯。
public ArrayBlockingQueue(int capacity, boolean fair) {
    //  如果capacity < 1,拋出異常
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

1嚎研、底層數(shù)據(jù)結(jié)構(gòu)

ArrayBlockingQueue內(nèi)部是使用數(shù)組實現(xiàn)一個隊列的蓖墅,并且在構(gòu)造方法中就需要指定容量,也就意味著底層數(shù)組一旦創(chuàng)建了临扮,容量就不能改變了论矾,因此ArrayBlockingQueue是一個容量限制的阻塞隊列。因此在隊列滿的時候執(zhí)行入隊會阻塞杆勇,在隊列為空時出隊也會阻塞贪壳。

2、主要變量

// 元素數(shù)組蚜退,其長度在構(gòu)造方法中指定
final Object[] items;

// 隊列中實際的元素數(shù)量
int count;

// 保護所有通道的主鎖
final ReentrantLock lock;

// 等待take條件的隊列
private final Condition notEmpty;

// 等待put條件的隊列
private final Condition notFull;

3闰靴、put(E e)

在此隊列的尾部插入指定元素,如果隊列已滿钻注,則等待空間可用

public void put(E e) throws InterruptedException {
    // 檢查元素是否為null蚂且,如果是,拋出NullPointerException
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;
    // 加鎖
    lock.lockInterruptibly();
    try {
        // 當隊里中的元素數(shù)量等于數(shù)組長度幅恋,則隊列已滿杏死,阻塞,等待隊列成為不滿狀態(tài)
        while (count == items.length)
            notFull.await();
        // 將元素入隊
        enqueue(e);
    } finally {
        // 釋放鎖
        lock.unlock();
    }
}

put方法總結(jié):

1、ArrayBlockingQueue不允許添加null元素淑翼;

2腐巢、ArrayBlockingQueue在隊列已滿的時候,會調(diào)用notFull.await()窒舟,釋放鎖并處于阻塞狀態(tài)系忙;

3诵盼、一旦ArrayBlockingQueue在隊列不滿的時候惠豺,就立即入隊。

4风宁、E take()

取出此隊列的頭部元素洁墙,如果隊列空,則阻塞戒财,等待元素可取热监。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 加鎖
    lock.lockInterruptibly();
    try {
        // 當隊列中元素數(shù)量為0時,則進入阻塞狀態(tài)
        while (count == 0)
            notEmpty.await();
        // 隊列不為空是饮寞,調(diào)用dequeue()出隊
        return dequeue();
    } finally {
        // 釋放鎖
        lock.unlock();
    }
}

take方法總結(jié):

1孝扛、取元素時,一旦獲得鎖幽崩,隊列為空苦始, 則會阻塞,直至不為空慌申,調(diào)用dequeue()出隊陌选。

5、總結(jié)

ArrayBlockingQueue是一個底層結(jié)構(gòu)是數(shù)組的阻塞隊列蹄溉,是通過 ReentrantLockCondition 來實現(xiàn)的咨油。不可插入為null的元素,入隊和出隊使用的是同一個鎖柒爵。意味著同一時刻只能有一個線程能進行入隊或者出隊操作役电。入隊時,隊列已滿則會調(diào)用notFull.await()棉胀,進入阻塞狀態(tài)宴霸。直到隊列不滿時,再進行入隊操作膏蚓。當出隊時瓢谢,隊列為空,則調(diào)用notEmpty.await()驮瞧,進入阻塞狀態(tài)氓扛,直到隊列不為空時,則出隊。

三采郎、LinkedBlockingQueue和ArrayBlockingQueue的區(qū)別

1千所、底層實現(xiàn)不同

LinkedBlockingQueue底層實現(xiàn)是鏈表,ArrayBlockingQueue底層實現(xiàn)是數(shù)組

2蒜埋、隊列容量

LinkedBlockingQueue默認的隊列長度是Integer.Max淫痰,但是可以指定容量。在入隊與出隊都高并發(fā)的情況下整份,性能比ArrayBlockingQueue高很多待错;

ArrayBlockingQueue必須在構(gòu)造方法中指定隊列長度,不可變烈评。在只有入隊高并發(fā)或出隊高并發(fā)的情況下火俄,因為操作數(shù)組,且不需要擴容讲冠,性能很高瓜客。

3、鎖的數(shù)量

LinkedBlockingQueue有兩把鎖竿开,可以有兩個線程同時進行入隊和出隊操作谱仪,但同時只能有一個線程進行入隊或出隊操作。

ArrayBlockingQueue只有一把鎖否彩,同時只能有一個線程進行入隊和出隊操作疯攒。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市胳搞,隨后出現(xiàn)的幾起案子卸例,更是在濱河造成了極大的恐慌,老刑警劉巖肌毅,帶你破解...
    沈念sama閱讀 211,884評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件筷转,死亡現(xiàn)場離奇詭異,居然都是意外死亡悬而,警方通過查閱死者的電腦和手機呜舒,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,347評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來笨奠,“玉大人袭蝗,你說我怎么就攤上這事“闫牛” “怎么了到腥?”我有些...
    開封第一講書人閱讀 157,435評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長蔚袍。 經(jīng)常有香客問我乡范,道長配名,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,509評論 1 284
  • 正文 為了忘掉前任晋辆,我火速辦了婚禮渠脉,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘瓶佳。我一直安慰自己芋膘,他們只是感情好,可當我...
    茶點故事閱讀 65,611評論 6 386
  • 文/花漫 我一把揭開白布霸饲。 她就那樣靜靜地躺著为朋,像睡著了一般。 火紅的嫁衣襯著肌膚如雪贴彼。 梳的紋絲不亂的頭發(fā)上潜腻,一...
    開封第一講書人閱讀 49,837評論 1 290
  • 那天埃儿,我揣著相機與錄音器仗,去河邊找鬼。 笑死童番,一個胖子當著我的面吹牛精钮,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播剃斧,決...
    沈念sama閱讀 38,987評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼轨香,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了幼东?” 一聲冷哼從身側(cè)響起臂容,我...
    開封第一講書人閱讀 37,730評論 0 267
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎根蟹,沒想到半個月后脓杉,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,194評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡简逮,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,525評論 2 327
  • 正文 我和宋清朗相戀三年球散,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片散庶。...
    茶點故事閱讀 38,664評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡蕉堰,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出悲龟,到底是詐尸還是另有隱情屋讶,我是刑警寧澤,帶...
    沈念sama閱讀 34,334評論 4 330
  • 正文 年R本政府宣布须教,位于F島的核電站皿渗,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜羹奉,卻給世界環(huán)境...
    茶點故事閱讀 39,944評論 3 313
  • 文/蒙蒙 一秒旋、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧诀拭,春花似錦迁筛、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,764評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至筒占,卻和暖如春贪庙,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背翰苫。 一陣腳步聲響...
    開封第一講書人閱讀 31,997評論 1 266
  • 我被黑心中介騙來泰國打工止邮, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人奏窑。 一個月前我還...
    沈念sama閱讀 46,389評論 2 360
  • 正文 我出身青樓导披,卻偏偏與公主長得像,于是被迫代替她去往敵國和親埃唯。 傳聞我的和親對象是個殘疾皇子撩匕,可洞房花燭夜當晚...
    茶點故事閱讀 43,554評論 2 349