阻塞隊列

身份越來越多,自己越來越少淘正。 — 《一念天堂》

寫在前面

阻塞隊列常用于生產者和消費者的場景祸穷,生產者就是往隊列中放入元素,消費者就是從隊列中獲取元素停蕉,阻塞隊列就是生產者存放元素的容器愕鼓,而消費者也從該容器中拿元素。

阻塞隊列有兩種常見的阻塞場景慧起,滿足這兩種阻塞場景的隊列就是阻塞隊列菇晃,分別如下:

  • 當隊列中沒有元素的情況下,消費者端的所有線程會被自動阻塞蚓挤,直到生產者往隊列中放入元素磺送,線程會被自動喚醒驻子。
  • 當隊列中元素填滿的情況下,生產者端的所有線程會被自動阻塞估灿,直到消費者從隊列中獲取元素崇呵,線程會被自動喚醒。

Java中的阻塞隊列

Java中提供了7個阻塞隊列馅袁,分別如下:

  • ArrayBlockingQueue:由數(shù)組結構組成的有界阻塞隊列域慷,按照先進先出的原則對元素進行排序,支持公平鎖和非公平鎖汗销。
  • LinkedBlockingQueue:由鏈表結構組成的有界阻塞隊列犹褒,按照先進先出的原則對元素進行排序,默認長度為Integer.MAX_VALUE大溜。
  • PriorityBlockingQueue:支持優(yōu)先級排序的無界阻塞隊列化漆,默認自然序對元素進行排序,可以自定義實現(xiàn)compareTo()方法指定排序規(guī)則钦奋,不保證同優(yōu)先級元素的順序座云。
  • DelayedQueue:使用優(yōu)先級隊列實現(xiàn)的無界阻塞隊列,在創(chuàng)建元素時付材,可以指定多久才能從隊列中獲取元素朦拖,只有延時期滿后才能從隊列中獲取元素。
  • SynchronousQueue:不存儲元素的阻塞隊列厌衔,每一個put操作都要等待take操作璧帝,否則不能添加元素,支持公平鎖和非公平鎖富寿。
  • LinkedTransferQueue:由鏈表結構組成的無界阻塞隊列睬隶,相當于其他隊列,多了transfer和tryTransfer方法页徐。
  • LinkedBlockingDeque:由鏈表結構組成的雙向阻塞隊列苏潜,隊首和隊尾都可以添加和移除元素,多線程并發(fā)時变勇,可以將鎖的競爭最多降到一半恤左。

ArrayBlockingQueue和LinkedBlockingQueue一般為常用的阻塞隊列。

阻塞隊列的使用

接下來通過一個Demo演示阻塞隊列的用法搀绣。

public class MainActivity extends AppCompatActivity {

    private static final String TAG = MainActivity.class.getSimpleName();

    private ArrayBlockingQueue<String> mBlockingQueue = new ArrayBlockingQueue<>(10);

    private Producer mProducer;
    private Consumer mConsumer;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        mProducer = new Producer();
        mProducer.start();
        mConsumer = new Consumer();
        mConsumer.start();
    }

    @Override
    protected void onStop() {
        super.onStop();
        mProducer.stopProduct();
        mConsumer.stopConsume();
    }

    /**
     * 生產者
     */
    private class Producer extends Thread {

        private volatile boolean isStop;

        private int event;

        public void stopProduct() {
            isStop = true;
        }

        @Override
        public void run() {
            super.run();
            while (!isStop) {
                try {
                    // 事件 - 5 發(fā)送完成后飞袋,睡2秒
                    if (event == 5) {
                        Thread.sleep(2000);
                    }
                    // 事件 - 10 發(fā)送完成后,調用stopProduct()
                    if (event == 10) {
                        stopProduct();
                    }
                    event++;
                    // 隊列中沒有空余位置链患,生產者端的線程進入阻塞狀態(tài)巧鸭,直到消費者端的線程從隊列中拿元素,喚醒生產者端的線程繼續(xù)執(zhí)行
                    mBlockingQueue.put("事件 - " + event);
                    Log.d(TAG, "生產者 生產事件 = " + event);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 消費者
     */
    private class Consumer extends Thread {

        private volatile boolean isStop;

        public void stopConsume() {
            isStop = true;
        }

        @Override
        public void run() {
            super.run();
            while (!isStop) {
                try {
                    // 拿不到元素消費者端的線程進入阻塞狀態(tài)麻捻,直到生產者端的線程往隊列中放入元素蹄皱,喚醒消費者端的線程繼續(xù)執(zhí)行
                    String event = mBlockingQueue.take();
                    Log.d(TAG, "消費者 消費事件 = " + event);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

這里維護了一個ArrayBlockingQueue览闰,并指定其大小為10芯肤,創(chuàng)建了一個生產者線程和一個消費者線程巷折,生產者線程在生產5個事件后睡兩秒鐘,消費者線程在消費完“事件 - 5”后由于從隊列中拿不到元素崖咨,就會自動阻塞锻拘,等待生產者往隊列中放入元素,只要隊列中有生產者放入元素击蹲,就會立即喚醒消費者線程繼續(xù)獲取元素署拟,詳見以下Log:

2019-09-02 21:23:58.822 18692-18764/com.chad.blockingqueue D/MainActivity: 生產者 生產事件 = 1
2019-09-02 21:23:58.822 18692-18764/com.chad.blockingqueue D/MainActivity: 生產者 生產事件 = 2
2019-09-02 21:23:58.822 18692-18765/com.chad.blockingqueue D/MainActivity: 消費者 消費事件 = 事件 - 1
2019-09-02 21:23:58.823 18692-18765/com.chad.blockingqueue D/MainActivity: 消費者 消費事件 = 事件 - 2
2019-09-02 21:23:58.823 18692-18764/com.chad.blockingqueue D/MainActivity: 生產者 生產事件 = 3
2019-09-02 21:23:58.823 18692-18765/com.chad.blockingqueue D/MainActivity: 消費者 消費事件 = 事件 - 3
2019-09-02 21:23:58.823 18692-18764/com.chad.blockingqueue D/MainActivity: 生產者 生產事件 = 4
2019-09-02 21:23:58.823 18692-18765/com.chad.blockingqueue D/MainActivity: 消費者 消費事件 = 事件 - 4
2019-09-02 21:23:58.823 18692-18764/com.chad.blockingqueue D/MainActivity: 生產者 生產事件 = 5
2019-09-02 21:23:58.823 18692-18765/com.chad.blockingqueue D/MainActivity: 消費者 消費事件 = 事件 - 5

生產者線程睡2秒,繼續(xù)生產事件

2019-09-02 21:24:00.825 18692-18764/com.chad.blockingqueue D/MainActivity: 生產者 生產事件 = 6
2019-09-02 21:24:00.825 18692-18765/com.chad.blockingqueue D/MainActivity: 消費者 消費事件 = 事件 - 6
2019-09-02 21:24:00.825 18692-18764/com.chad.blockingqueue D/MainActivity: 生產者 生產事件 = 7
2019-09-02 21:24:00.825 18692-18764/com.chad.blockingqueue D/MainActivity: 生產者 生產事件 = 8
2019-09-02 21:24:00.825 18692-18765/com.chad.blockingqueue D/MainActivity: 消費者 消費事件 = 事件 - 7
2019-09-02 21:24:00.825 18692-18764/com.chad.blockingqueue D/MainActivity: 生產者 生產事件 = 9
2019-09-02 21:24:00.826 18692-18765/com.chad.blockingqueue D/MainActivity: 消費者 消費事件 = 事件 - 8
2019-09-02 21:24:00.826 18692-18764/com.chad.blockingqueue D/MainActivity: 生產者 生產事件 = 10
2019-09-02 21:24:00.826 18692-18764/com.chad.blockingqueue D/MainActivity: 生產者 生產事件 = 11
2019-09-02 21:24:00.826 18692-18765/com.chad.blockingqueue D/MainActivity: 消費者 消費事件 = 事件 - 9
2019-09-02 21:24:00.826 18692-18765/com.chad.blockingqueue D/MainActivity: 消費者 消費事件 = 事件 - 10
2019-09-02 21:24:00.826 18692-18765/com.chad.blockingqueue D/MainActivity: 消費者 消費事件 = 事件 - 11

阻塞隊列的原理

下面通過分析ArrayBlockingQueue的原理加深對阻塞隊列的理解歌豺。

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** 存放元素的數(shù)組 */
    final Object[] items;

    /** 隊首元素下標 */
    int takeIndex;

    /** 隊尾元素下標 */
    int putIndex;

    /** 當前隊列中存放的元素總數(shù) */
    int count;

    /** 重入鎖 */
    final ReentrantLock lock;

    /** 等待獲取元素的條件 */
    private final Condition notEmpty;

    /** 等待放入元素的條件 */
    private final Condition notFull;

    /** 構造函數(shù)推穷,參數(shù)capacity為該隊列的容量 */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    /** 構造函數(shù),參數(shù)capacity為該隊列的容量类咧,參數(shù)fair為重入鎖是公平鎖還是非公平鎖 */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    /** 構造函數(shù)馒铃,構造函數(shù),參數(shù)capacity為該隊列的容量痕惋,參數(shù)fair為重入鎖是公平鎖還是非公平鎖区宇,參數(shù)c為初始隊列的元素集合 */
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        // 得到鎖
        lock.lock(); // Lock only for visibility, not mutual exclusion
        // 遍歷元素集合,初始化元素數(shù)組
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            // 釋放鎖
            lock.unlock();
        }
    }

    /**
     * 放入元素
     * 如果元素數(shù)組沒有空余位置值戳,不會阻塞消費者端的線程议谷,直接返回false
     * 如果元素數(shù)組還有空余位置,調用enqueue()函數(shù)堕虹,并且返回true
     */
    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 得到鎖
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            // 釋放鎖
            lock.unlock();
        }
    }

    /**
     * 放入元素
     * 如果元素數(shù)組沒有空余位置卧晓,調用notFull.awaitNanos(nanos),會使生產者端的線程進入阻塞狀態(tài)等待一段時間赴捞,
     * 當?shù)却瑫r后逼裆,如果元素數(shù)組依然沒有空余位置,直接返回false
     * 如果元素數(shù)組還有空余位置螟炫,調用enqueue()函數(shù)波附,并且返回true
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        // 得到可中斷的鎖
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            // 釋放鎖
            lock.unlock();
        }
    }

    /**
     * 放入元素
     * 如果元素數(shù)組沒有空余位置,調用notFull.await()使生產者端的線程進入阻塞狀態(tài)昼钻,
     * 直到有消費者從隊列中獲取元素并且會喚醒生產者端進入阻塞狀態(tài)的線程繼續(xù)執(zhí)行
     * 如果元素數(shù)組還有空余位置掸屡,調用enqueue()函數(shù)
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 得到可中斷的鎖
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                // 使生產者端的線程進入阻塞狀態(tài)
                notFull.await();
            enqueue(e);
        } finally {
            // 釋放鎖
            lock.unlock();
        }
    }

    /**
     * 放入元素(核心函數(shù))
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        // 喚醒消費端因隊列沒有元素獲取而進入阻塞狀態(tài)的線程繼續(xù)執(zhí)行
        notEmpty.signal();
    }

    /**
     * 獲取元素
     * 如果元素數(shù)組中沒有元素,則直接返回null然评,否則調用dequeue()返回隊首元素
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        // 得到鎖
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            // 釋放鎖
            lock.unlock();
        }
    }

    /**
     * 獲取元素
     * 如果元素數(shù)組中沒有元素仅财,則調用notEmpty.awaitNanos(nanos)使消費者端線程進入阻塞狀態(tài),
     * 直到有生產者往隊列中放入元素并且會喚醒消費者端進入阻塞狀態(tài)的線程繼續(xù)執(zhí)行
     * 如果元素數(shù)組中還有元素碗淌,調用dequeue()函數(shù)返回隊首元素
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        // 得到可中斷的鎖
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            // 釋放鎖
            lock.unlock();
        }
    }
    
    /**
     * 獲取元素
     * 如果元素數(shù)組沒有元素盏求,調用notEmpty.await()使消費者端的線程進入阻塞狀態(tài)抖锥,
     * 直到有生產者往隊列中放入元素并且會喚醒消費者端進入阻塞狀態(tài)的線程繼續(xù)執(zhí)行
     * 如果元素數(shù)組還有元素,調用enqueue()函數(shù)返回隊首元素
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // 得到可中斷的鎖
        lock.lockInterruptibly();
        try {
            while (count == 0)
                // 使消費者端的線程進入阻塞狀態(tài)
                notEmpty.await();
            return dequeue();
        } finally {
            // 釋放鎖
            lock.unlock();
        }
    }

    /**
     * 獲取元素(核心函數(shù))
     */
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        // 喚醒生產者端因隊列元素填滿而進入阻塞狀態(tài)的線程繼續(xù)執(zhí)行
        notFull.signal();
        return x;
    }
}

總結

在生產者消費模型中碎罚,生產數(shù)據(jù)和消費數(shù)據(jù)的速率不一致磅废,如果生產數(shù)據(jù)速度快一些,消費不過來荆烈,就會導致數(shù)據(jù)丟失拯勉,這時候我們就可以使用阻塞隊列來解決這個問題。

阻塞隊列是一個隊列憔购,我們使用單線程生產數(shù)據(jù)宫峦,使用多線程消費數(shù)據(jù)。由于阻塞隊列的特點:隊列為空的時候消費者端阻塞玫鸟,隊列滿的時候生產者端阻塞导绷。多線程消費數(shù)據(jù)起到了加速消費的作用,使得生產的數(shù)據(jù)不會在隊列里積壓過多屎飘,而生產的數(shù)據(jù)也不會丟失處理妥曲。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市枚碗,隨后出現(xiàn)的幾起案子逾一,更是在濱河造成了極大的恐慌,老刑警劉巖肮雨,帶你破解...
    沈念sama閱讀 218,036評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件遵堵,死亡現(xiàn)場離奇詭異,居然都是意外死亡怨规,警方通過查閱死者的電腦和手機陌宿,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,046評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來波丰,“玉大人壳坪,你說我怎么就攤上這事£蹋” “怎么了爽蝴?”我有些...
    開封第一講書人閱讀 164,411評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長纫骑。 經常有香客問我蝎亚,道長,這世上最難降的妖魔是什么先馆? 我笑而不...
    開封第一講書人閱讀 58,622評論 1 293
  • 正文 為了忘掉前任发框,我火速辦了婚禮,結果婚禮上煤墙,老公的妹妹穿的比我還像新娘梅惯。我一直安慰自己宪拥,他們只是感情好,可當我...
    茶點故事閱讀 67,661評論 6 392
  • 文/花漫 我一把揭開白布铣减。 她就那樣靜靜地躺著她君,像睡著了一般。 火紅的嫁衣襯著肌膚如雪徙歼。 梳的紋絲不亂的頭發(fā)上犁河,一...
    開封第一講書人閱讀 51,521評論 1 304
  • 那天,我揣著相機與錄音魄梯,去河邊找鬼。 笑死宾符,一個胖子當著我的面吹牛酿秸,可吹牛的內容都是我干的。 我是一名探鬼主播魏烫,決...
    沈念sama閱讀 40,288評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼辣苏,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了哄褒?” 一聲冷哼從身側響起稀蟋,我...
    開封第一講書人閱讀 39,200評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎呐赡,沒想到半個月后退客,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 45,644評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡链嘀,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,837評論 3 336
  • 正文 我和宋清朗相戀三年萌狂,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片怀泊。...
    茶點故事閱讀 39,953評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡茫藏,死狀恐怖,靈堂內的尸體忽然破棺而出霹琼,到底是詐尸還是另有隱情务傲,我是刑警寧澤,帶...
    沈念sama閱讀 35,673評論 5 346
  • 正文 年R本政府宣布枣申,位于F島的核電站售葡,受9級特大地震影響,放射性物質發(fā)生泄漏糯而。R本人自食惡果不足惜天通,卻給世界環(huán)境...
    茶點故事閱讀 41,281評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望熄驼。 院中可真熱鬧像寒,春花似錦烘豹、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,889評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至筷笨,卻和暖如春憔鬼,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背胃夏。 一陣腳步聲響...
    開封第一講書人閱讀 33,011評論 1 269
  • 我被黑心中介騙來泰國打工轴或, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人仰禀。 一個月前我還...
    沈念sama閱讀 48,119評論 3 370
  • 正文 我出身青樓照雁,卻偏偏與公主長得像,于是被迫代替她去往敵國和親答恶。 傳聞我的和親對象是個殘疾皇子饺蚊,可洞房花燭夜當晚...
    茶點故事閱讀 44,901評論 2 355

推薦閱讀更多精彩內容