Java 多線程(九):ArrayBlockingQueue 與 LinkedBlockingQueue

什么是阻塞隊列钩乍?

  • 阻塞隊列與我們平常接觸到的普通隊列(ArrayList)的最大不同點在于阻塞隊列的添加和刪除方法都是阻塞的
    • 阻塞添加:當(dāng)阻塞隊列元素已滿時击儡,隊列會阻塞加入元素的線程,直到隊列元素不滿時才重新喚醒線程執(zhí)行元素加入操作
    • 阻塞刪除:當(dāng)隊列元素為空時嗦枢,刪除隊列元素的線程將被阻塞致开,直到隊列不為空再執(zhí)行刪除操作

BlockingQueue

  • BlockingQueue 能夠解決多線程中如何高效安全的傳輸數(shù)據(jù)的問題悼嫉,幫助我們快速搭建高質(zhì)量的多線程程序
  • 通過隊列,可以使得數(shù)據(jù)由隊列的一端輸入回还,從另一端輸出
  • 適用場景:生產(chǎn)者線程在一端生成裆泳,消費者線程在另一端消費
BlockingQueue

BlockingQueue 主要方法

  • 插入方法:
    • add(E e):
      • 將元素插入到隊尾(如果立即可行且不會超過該隊列的容量)
      • 成功返回 true,失敗拋 IllegalStateException 異常
    • offer(E e,long timeout,TimeUnit unit):
      • 將元素插入到隊尾(如果立即可行且不會超過該隊列的容量)
      • 可設(shè)置超時時間柠硕,該方法可以中斷
      • 成功返回 true工禾,如果隊列已滿,返回 false
    • put(E e):
      • 將元素插入到隊尾蝗柔,如果隊列已滿則一直等待(阻塞)
  • 刪除方法:
    • remove(Object o):
      • 移除指定元素闻葵,成功返回 true,失敗返回 false
    • poll(long timeout, TimeUnit unit):
      • 獲取并移除此隊列的頭元素癣丧,在指定等待的時間前一直等到獲取元素
    • take():
      • 獲取并移除隊列頭元素槽畔,若沒有元素則一直阻塞
  • 檢查方法:
    • element():
      • 獲取但不移除隊列的頭元素,沒有元素則拋異常
    • peek():
      • 獲取但不移除隊列的頭胁编,若隊列為空則返回 null

BlockingQueue 基本使用

  • 該例子中使用 ArrayBlockingQueue厢钧,生產(chǎn)者(Producer)將字符串插入共享隊列中,消費者將它們?nèi)〕?/li>
public class BlockingQueueExample {
    public static void main(String[] args) throws Exception {
        BlockingQueue queue = new ArrayBlockingQueue(3);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        new Thread(producer).start();
        new Thread(consumer).start();
        Thread.sleep(4000);
    }
}
  • 生產(chǎn)者通過 put() 方法將元素插入共享隊列中
public class Producer implements Runnable {

    private BlockingQueue queue;

    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            queue.put("1");
            Thread.sleep(1000);
            queue.put("2");
            Thread.sleep(1000);
            queue.put("3");
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
  • 消費者通過 take() 方法從隊列中取出元素嬉橙,take() 方法會阻塞直到獲取元素為止
public class Consumer implements Runnable {

    private BlockingQueue queue = null;

    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

BlockingQueue 接口實現(xiàn)類與源碼分析

ArrayBlockingQueue

  • 分析:
    • 基于數(shù)組的阻塞隊列實現(xiàn)早直,內(nèi)部維護(hù)了一個數(shù)組用于緩存隊列中的數(shù)據(jù)對象
    • 有兩個 Integer 類型的索引,指向添加市框、獲取下一個元素的位置的索引
    • 并通過一個重入鎖 ReentrantLock 和兩個 Condition 條件來實現(xiàn)阻塞
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** 存儲數(shù)據(jù)的數(shù)組 */
    final Object[] items;

    /**獲取數(shù)據(jù)的索引霞扬,主要用于take,poll,peek祥得,remove方法 */
    int takeIndex;

    /**添加數(shù)據(jù)的索引兔沃,主要用于 put, offer, or add 方法*/
    int putIndex;

    /** 隊列元素的個數(shù) */
    int count;


    /** 控制并非訪問的鎖 */
    final ReentrantLock lock;

    /**notEmpty條件對象,用于通知take方法隊列已有元素级及,可執(zhí)行獲取操作 */
    private final Condition notEmpty;

    /**notFull條件對象乒疏,用于通知put方法隊列未滿,可執(zhí)行添加操作 */
    private final Condition notFull;

    /**
       迭代器
     */
    transient Itrs itrs = null;

}

  • 分析:
    • add 方法實際上是調(diào)用了 offer 方法
    • enqueue(E x) 方法內(nèi)部通過 putIndex 索引直接將元素添加到數(shù)組 item 中饮焦,當(dāng) putIndex 索引大小等于數(shù)組長度時怕吴,需要將 putIndex 重新設(shè)置為 0,這是因為當(dāng)前隊列元素總是從隊頭獲取县踢,從隊尾添加
//add方法實現(xiàn)转绷,間接調(diào)用了offer(e)
public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

//offer方法
public boolean offer(E e) {
     checkNotNull(e);//檢查元素是否為null
     final ReentrantLock lock = this.lock;
     lock.lock();//加鎖
     try {
         if (count == items.length)//判斷隊列是否滿
             return false;
         else {
             enqueue(e);//添加元素到隊列
             return true;
         }
     } finally {
         lock.unlock();
     }
 }

//入隊操作
private void enqueue(E x) {
    //獲取當(dāng)前數(shù)組
    final Object[] items = this.items;
    //通過putIndex索引對數(shù)組進(jìn)行賦值
    items[putIndex] = x;
    //索引自增,如果已是最后一個位置硼啤,重新設(shè)置 putIndex = 0;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;//隊列中元素數(shù)量加1
    //喚醒調(diào)用take()方法的線程议经,執(zhí)行元素獲取操作。
    notEmpty.signal();
}

  • 分析:
    • put 方法是一個阻塞方法谴返,如果隊列元素已滿煞肾,那么當(dāng)前線程將會被 notFull 條件對象掛起加入到等待隊列中,直到有空擋才會喚醒執(zhí)行添加操作
//put方法嗓袱,阻塞時可中斷
 public void put(E e) throws InterruptedException {
     checkNotNull(e);
      final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();//該方法可中斷
      try {
          //當(dāng)隊列元素個數(shù)與數(shù)組長度相等時籍救,無法添加元素
          while (count == items.length)
              //將當(dāng)前調(diào)用線程掛起,添加到notFull條件隊列中等待喚醒
              notFull.await();
          enqueue(e);//如果隊列沒有滿直接添加渠抹。蝙昙。
      } finally {
          lock.unlock();
      }
  }

LinkedBlockingQueue

  • 基于鏈表的阻塞隊列,內(nèi)部維護(hù)著一個鏈表構(gòu)成的緩沖隊列梧却,用于緩存隊列中的數(shù)據(jù)對象
  • 在正常情況下鏈表阻塞隊列的吞吐量要高于數(shù)組的阻塞隊列(ArrayBlockingQueue)奇颠,因為其內(nèi)部實現(xiàn)添加和刪除操作使用了兩個 ReentrantLock 來控制并發(fā)執(zhí)行(插入、獲取各有一個鎖)篮幢,而 ArrayBlockingQueue 內(nèi)部只使用一個 ReentrantLock 控制并發(fā)
  • 它與 ArrayBlockingQueue 的 API 幾乎一致但內(nèi)部實現(xiàn)原理不太相同
  • 當(dāng)創(chuàng)建一個 LinkedBlockingQueue 時大刊,默認(rèn)阻塞隊列中元素的數(shù)量大小為 Interger.MAX_VALUE

LinkedBlockingQueue 和 ArrayBlockingQueue 的區(qū)別

  • 隊列大小有所不同:ArrayBlockingQueue 必須指定隊列大小,而 LinkedBlockingQueue 默認(rèn)為 Integer.MAX_VALUE(當(dāng)元素添加速度大于移除速度時三椿,需要注意一下缺菌,以免內(nèi)存溢出)
  • 實現(xiàn)結(jié)構(gòu)不同:ArrayBlockingQueue 采用數(shù)組實現(xiàn)、而 LinkedBlockingQueue 采用鏈表實現(xiàn)
  • 由于 ArrayBlockingQueue 采用數(shù)組存儲隊列元素搜锰,因此再插入伴郁、刪除元素時不會產(chǎn)生或銷毀任何額外的對象實例,而 LinkedBlockingQueue 每次插入都會生成一個新的結(jié)點(Node)對象蛋叼,這會影響日后 GC 垃圾回收
  • ArrayBlockingQueue 中添加焊傅、刪除操作只使用一個鎖(ReentrantLock)剂陡,而 LinkedBlockingQueue 添加、刪除操作各使用一個鎖狐胎,因此 LinkedBlockingQueue 的并發(fā)吞吐量大于 ArrayBlockingQueue
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末鸭栖,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子握巢,更是在濱河造成了極大的恐慌晕鹊,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,826評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件暴浦,死亡現(xiàn)場離奇詭異溅话,居然都是意外死亡,警方通過查閱死者的電腦和手機歌焦,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,968評論 3 395
  • 文/潘曉璐 我一進(jìn)店門飞几,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人独撇,你說我怎么就攤上這事屑墨。” “怎么了券勺?”我有些...
    開封第一講書人閱讀 164,234評論 0 354
  • 文/不壞的土叔 我叫張陵绪钥,是天一觀的道長。 經(jīng)常有香客問我关炼,道長,這世上最難降的妖魔是什么匣吊? 我笑而不...
    開封第一講書人閱讀 58,562評論 1 293
  • 正文 為了忘掉前任儒拂,我火速辦了婚禮,結(jié)果婚禮上色鸳,老公的妹妹穿的比我還像新娘社痛。我一直安慰自己,他們只是感情好命雀,可當(dāng)我...
    茶點故事閱讀 67,611評論 6 392
  • 文/花漫 我一把揭開白布蒜哀。 她就那樣靜靜地躺著,像睡著了一般吏砂。 火紅的嫁衣襯著肌膚如雪撵儿。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,482評論 1 302
  • 那天狐血,我揣著相機與錄音淀歇,去河邊找鬼。 笑死匈织,一個胖子當(dāng)著我的面吹牛浪默,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播,決...
    沈念sama閱讀 40,271評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼泪姨,長吁一口氣:“原來是場噩夢啊……” “哼肠槽!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起阔加,我...
    開封第一講書人閱讀 39,166評論 0 276
  • 序言:老撾萬榮一對情侶失蹤饵史,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后掸哑,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體约急,經(jīng)...
    沈念sama閱讀 45,608評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,814評論 3 336
  • 正文 我和宋清朗相戀三年苗分,在試婚紗的時候發(fā)現(xiàn)自己被綠了厌蔽。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,926評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡摔癣,死狀恐怖奴饮,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情择浊,我是刑警寧澤戴卜,帶...
    沈念sama閱讀 35,644評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站琢岩,受9級特大地震影響投剥,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜担孔,卻給世界環(huán)境...
    茶點故事閱讀 41,249評論 3 329
  • 文/蒙蒙 一江锨、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧糕篇,春花似錦啄育、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,866評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至墩崩,卻和暖如春氓英,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背泰鸡。 一陣腳步聲響...
    開封第一講書人閱讀 32,991評論 1 269
  • 我被黑心中介騙來泰國打工债蓝, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人盛龄。 一個月前我還...
    沈念sama閱讀 48,063評論 3 370
  • 正文 我出身青樓饰迹,卻偏偏與公主長得像芳誓,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子啊鸭,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,871評論 2 354

推薦閱讀更多精彩內(nèi)容