JAVA concurrency -- 阻塞隊列ArrayBlockingQueue源碼詳解

概述

ArrayBlockingQueue顧名思義存炮,使用數(shù)組實現(xiàn)的阻塞隊列。今天我們就來詳細講述下他的代碼實現(xiàn)

阻塞隊列

什么是阻塞隊列?

阻塞隊列是一種特殊的隊列,使用場景為并發(fā)環(huán)境下。在某種情況下(當線程無法獲取鎖的時候)線程會被掛起并且在隊列中等待,如果條件具備(鎖被釋放)那么就會喚醒掛起的線程泞边。

通俗點來講的話勺卢,阻塞隊列類似于理發(fā)店的等待區(qū),當沒有理發(fā)師空閑的時候尽狠,客人會在等待區(qū)等待榴鼎,一旦有了空閑,就會有人自動遞補晚唇。

類的繼承關系

ArrayBlockingQueue繼承關系.png

ArrayBlockingQueue繼承了抽象隊列巫财,并且實現(xiàn)了阻塞隊列,因此它具備隊列的所有基本特性哩陕。

基本實現(xiàn)原理

ArrayBlockingQueue的實現(xiàn)是基于ReentrantLock以及AQS內部實現(xiàn)的鎖機制以及Condition機制平项。
ArrayBlockingQueue內部聲明了兩個Condition變量,一個叫notEmpty悍及,一個叫notFull闽瓢,當有數(shù)據(jù)加入隊列時嘗試喚醒notEmpty,當有數(shù)據(jù)移除隊列時則喚醒notFull心赶,從而實現(xiàn)一個類似于生產(chǎn)者消費者模型的機制扣讼。

源碼分析

類成員變量

    // 隊列的存儲對象數(shù)組
    final Object[] items;

    // 下一個取出的序號
    int takeIndex;

    // 下一個放入隊列的序號
    int putIndex;

    // 隊列中的元素數(shù)目
    int count;

    // 鎖以及用來控制隊列的兩個條件變量
    final ReentrantLock lock;

    private final Condition notEmpty;

    private final Condition notFull;

    transient Itrs itrs = null;

構造函數(shù)

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
    
    // 通用的構造函數(shù),以容量和是否公平鎖為參數(shù)缨叫,余下兩個構造函數(shù)均調用此函數(shù)
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        // 調用構造函數(shù)
        this(capacity, fair);

        // 為阻塞隊列初始化數(shù)據(jù)(此操作需要上鎖)
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = 0;
            try {
                // 將集合中的數(shù)據(jù)存放到數(shù)組中并且進行判空操作
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            // 修改count和putIndex的值
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

這里有一點疑問椭符,這里明明是構造函數(shù)荔燎,是類初始化的地方,照理來說不會產(chǎn)生競爭销钝,為什么要進行加鎖操作呢有咨?此處原本有一句原版的注釋 Lock only for visibility, not mutual exclusion 鎖是為了可見性而不是互斥。這句話怎么理解呢蒸健?我們仔細觀察代碼座享,發(fā)現(xiàn)當我們把集合中的數(shù)據(jù)全部插入隊列中之后,我們會修改相應的count以及putIndex的數(shù)值似忧,但是如果我們沒有加鎖渣叛,那么在集合插入完成前count以及putIndex沒有完成初始化操作的時候如果有其他線程進行了插入等操作的話,會造成數(shù)據(jù)同步問題從而使得數(shù)據(jù)不準確盯捌,因此這里的鎖是必要的淳衙。

隊列操作

基礎隊列操作enqueue和dequeue

    // 隊列的插入操作
    private void enqueue(E x) {
        // 本地聲明一個item數(shù)組的引用
        final Object[] items = this.items;
        // 將元素放入數(shù)組中
        items[putIndex] = x;
        // 如果此時已經(jīng)到了數(shù)組的末尾了,將putIndex重置為0
        if (++putIndex == items.length)
            putIndex = 0;
        // 元素數(shù)目加1
        count++;
        // 發(fā)出通知告訴所有取數(shù)據(jù)的線程可以取數(shù)據(jù)
        notEmpty.signal();
    }

    // 隊列的移除操作
    private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        // 找到要移除的數(shù)據(jù)置空
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        // 如果此時已經(jīng)到了數(shù)組的末尾了挽唉,將takeIndex重置為0
        if (++takeIndex == items.length)
            takeIndex = 0;
        // 元素數(shù)目減1
        count--;
        // 迭代器操作滤祖,這個之后再說
        if (itrs != null)
            itrs.elementDequeued();
        // 發(fā)出通知告知插入線程可以工作
        notFull.signal();
        return x;
    }

這兩個方法是隊列操作的基本方法,基本上就是常規(guī)的數(shù)組數(shù)據(jù)插入移除瓶籽,只是有一點很讓人困惑 final Object[] items = this.items; 這段代碼實現(xiàn)將類成員對象在本地創(chuàng)建了一個引用匠童,然后在本地使用引用進行操作,為什么要多此一舉呢塑顺?除此之外汤求,代碼中大量用到了這種手法,例如: final ReentrantLock lock = this.lock; 這又是為了什么呢严拒?對此筆者猜測可能是和優(yōu)化相關扬绪,因為jdk7中的實現(xiàn)與之不同,是使用的類變量直接操作裤唠。在進行了資料查閱后挤牛,筆者找到了一個相對靠譜的解釋:

這是ArrayBlockingQueue的作者Doug Lea的習慣,他認為這種書寫習慣是對機器更加友好的書寫

當然也有一些大神有一些其他的解釋:

final本身是不可變的种蘸,但是由于反射以及序列化操作的存在墓赴,final的不可變性就變得捉摸不定,除此之外一些編譯器層面上在final上優(yōu)化的不夠好航瞭,導致會在使用到數(shù)據(jù)的時候反復重載導致緩存失效

希望大家可以自己認真思考下诫硕,然后嘗試下,得到自己的結論刊侯。

阻塞隊列的插入操作

    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 如果阻塞隊列已滿章办,那么插入失敗
            if (count == items.length)
                return false;
            else {
                // 否則插入成功
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

阻塞隊列插入操作大致就以上幾種,這幾種的區(qū)別在代碼中也體現(xiàn)得比較清楚了:

  1. offer返回的是布爾值,插入成功返回true否則(隊列已滿)返回false
  2. put沒有返回值藕届,假如隊列是滿的挪蹭,他會一直阻塞直到隊列為空的時候執(zhí)行插入操作
  3. add實際上調用的就是offer,只是他在加入失敗后會拋出異常

阻塞隊列的移除操作

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex);
        } finally {
            lock.unlock();
        }
    }
  1. poll執(zhí)行成功會返回隊列元素翰舌,如果隊列為空則直接返回null
  2. take執(zhí)行成功會返回隊列元素嚣潜,但是如果隊列為空他不會返回而是等待有數(shù)據(jù)插入冬骚,然后取出
  3. peek則是直接獲取隊列元素椅贱,并且執(zhí)行后不會將元素從隊列中刪除

迭代器實現(xiàn)

由于迭代器和內部隊列共享數(shù)據(jù),再加上阻塞隊列的特性只冻,導致為了實現(xiàn)迭代器功能庇麦,需要新增一些很復雜的代碼實現(xiàn)。

內部聲明了兩個類來實現(xiàn)迭代器喜德,一個是Itr繼承Iterator<E>山橄,一個則是Itrs

Itrs

Itrs是用來管理迭代器的舍悯。由于阻塞隊列內部可能會有多個迭代器在同時工作航棱,在迭代器內部發(fā)生刪除或者是一些不常見的操作時可能會產(chǎn)生一些問題,比如他們會丟失自己的數(shù)據(jù)之類的萌衬。所以Itrs內部會維護一個變量用于記錄循環(huán)的圈數(shù)饮醇,并且在刪除操作removeAt的時候會通知所有的迭代器。

    class Itrs {
        // 創(chuàng)建一個Node類作為單向鏈表(節(jié)點是弱引用)來管理迭代器
        private class Node extends WeakReference<Itr> {
            Node next;

            Node(Itr iterator, Node next) {
                super(iterator);
                this.next = next;
            }
        }

        // 循環(huán)圈數(shù)
        int cycles = 0;

        // 鏈表頭
        private Node head;

        // 清理相關的變量
        private Node sweeper = null;

        private static final int SHORT_SWEEP_PROBES = 4;
        private static final int LONG_SWEEP_PROBES = 16;

        Itrs(Itr initial) {
            register(initial);
        }

        // 清理無效的迭代器(如果sweeper為空秕豫,則從頭開始朴艰,否則從sweeper記錄的節(jié)點開始)
        void doSomeSweeping(boolean tryHarder) {
            
        }

        // 新增加一個迭代器
        void register(Itr itr) {
            head = new Node(itr, head);
        }

        // 當takeIndex為0時調用此方法
        void takeIndexWrapped() {
            // cycle數(shù)+1,內部實現(xiàn)通知所有迭代器并進行清理(鏈表遍歷)
        }

        // 有移除操作的時候調用此方法混移,并通知所有迭代器進行清理
        void removedAt(int removedIndex) {
            // 簡單的鏈表遍歷祠墅,內部調用Itr的removedAt方法
        }

        // 當發(fā)現(xiàn)隊列為空的時候調用此方法,清理迭代器內的弱引用
        void queueIsEmpty() {
            
        }

        // 有元素被取時是調用
        void elementDequeued() {
            // 如果數(shù)組為空調用queueIsEmpty進行清理
            if (count == 0)
                queueIsEmpty();
            // 如果takeIndex為0歌径,調用takeIndexWrapped毁嗦,來進行循環(huán)+1操作
            else if (takeIndex == 0)
                takeIndexWrapped();
        }
    }

Itr

Itrs是管理迭代器的,Itr則是迭代器的具體實現(xiàn)

    private class Itr implements Iterator<E> {
        // 游標回铛,用于尋找下一個元素
        private int cursor;

        // 下一個元素
        private E nextItem;

        // 下一個元素的下標
        private int nextIndex;

        // 上一個元素
        private E lastItem;

        // 上一個元素的下標
        private int lastRet;

        // 上一個take的下標
        private int prevTakeIndex;

        // 上一個循環(huán)
        private int prevCycles;

        // 標記為空
        private static final int NONE = -1;

        // 刪除標記
        private static final int REMOVED = -2;

        // DETACH標記專用于prevTakeIndex
        private static final int DETACHED = -3;

        Itr() {
            // 這是構造函數(shù)狗准,內部實現(xiàn)主要是初始化為主,
            // 并且在Itrs不為空的時候進行一波清理操作
        }

        boolean isDetached() {
            return prevTakeIndex < 0;
        }

        private int incCursor(int index) {
            // 游標+1勺届,并重新計算值(判斷是否走完一個循環(huán)驶俊,是否等于putIndex)
            if (++index == items.length)
                index = 0;
            if (index == putIndex)
                index = NONE;
            return index;
        }

        // 判斷給的刪除數(shù)是否是有效值
        private boolean invalidated(int index, int prevTakeIndex,
                                    long dequeues, int length) {
            
        }

        // 計算在迭代器的上一次操作后所有的刪除(出隊)操作
        private void incorporateDequeues() {
            // 主要方法為通過當前圈數(shù)和之前的圈數(shù)以及偏移量計算
            // 真實的刪除數(shù),并且和prevTakeIndex以及index的偏移量進行比較
        }

        // 進行detach操作并進行清理
        private void detach() {
            
        }

        // 判斷是否有下一個節(jié)點
        public boolean hasNext() {
            
        }

        // 沒有下一個節(jié)點(沒有detach的節(jié)點將會被執(zhí)行detach操作)
        private void noNext() {
            
        }

        // 找到下個節(jié)點
        public E next() {
            // 實現(xiàn)不復雜免姿,主要是需要判斷節(jié)點是否是detach模式
        }

        // 刪除節(jié)點
        public void remove() {
            
        }

        // 當隊列為空或者后續(xù)很難找到下個節(jié)點的時候通知迭代器
        void shutdown() {
            
        }

        // 輔助計算游標和prevTakeIndex之間的距離
        private int distance(int index, int prevTakeIndex, int length) {
            
        }

        // 刪除節(jié)點
        boolean removedAt(int removedIndex) {
            
        }

        // 當takeIndex歸0時調用
        boolean takeIndexWrapped() {
            
        }
    }

總結

ArrayBlockingQueue的實現(xiàn)可以說是比較的簡單清晰饼酿,主要是利用了ReentrantLock內部的Condition,通過設置兩個條件來巧妙地完成阻塞隊列的實現(xiàn),只要能夠理解這兩個條件的工作原理故俐,源碼的理解就沒有太大的難度想鹰。ArrayBlockingQueue較難理解的反而是它內部的迭代器,由于阻塞隊列的特性药版,他的迭代器可能會有丟失當前數(shù)據(jù)的風險辑舷,因此,作者創(chuàng)作的時候加入了許多復雜的方法來保證可靠性槽片,但是在這里由于篇幅限制何缓,以及迭代器在阻塞隊列中的地位和重要性并不高,所以簡單講述还栓,如果有興趣可以自己找一份源碼閱讀碌廓。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市剩盒,隨后出現(xiàn)的幾起案子谷婆,更是在濱河造成了極大的恐慌,老刑警劉巖辽聊,帶你破解...
    沈念sama閱讀 206,214評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件纪挎,死亡現(xiàn)場離奇詭異,居然都是意外死亡跟匆,警方通過查閱死者的電腦和手機异袄,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來贾铝,“玉大人隙轻,你說我怎么就攤上這事」缚” “怎么了玖绿?”我有些...
    開封第一講書人閱讀 152,543評論 0 341
  • 文/不壞的土叔 我叫張陵,是天一觀的道長叁巨。 經(jīng)常有香客問我斑匪,道長,這世上最難降的妖魔是什么锋勺? 我笑而不...
    開封第一講書人閱讀 55,221評論 1 279
  • 正文 為了忘掉前任蚀瘸,我火速辦了婚禮,結果婚禮上庶橱,老公的妹妹穿的比我還像新娘贮勃。我一直安慰自己,他們只是感情好苏章,可當我...
    茶點故事閱讀 64,224評論 5 371
  • 文/花漫 我一把揭開白布寂嘉。 她就那樣靜靜地躺著奏瞬,像睡著了一般。 火紅的嫁衣襯著肌膚如雪泉孩。 梳的紋絲不亂的頭發(fā)上硼端,一...
    開封第一講書人閱讀 49,007評論 1 284
  • 那天,我揣著相機與錄音寓搬,去河邊找鬼珍昨。 笑死,一個胖子當著我的面吹牛句喷,可吹牛的內容都是我干的镣典。 我是一名探鬼主播,決...
    沈念sama閱讀 38,313評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼脏嚷,長吁一口氣:“原來是場噩夢啊……” “哼骆撇!你這毒婦竟也來了瞒御?” 一聲冷哼從身側響起父叙,我...
    開封第一講書人閱讀 36,956評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎肴裙,沒想到半個月后趾唱,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,441評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡蜻懦,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 35,925評論 2 323
  • 正文 我和宋清朗相戀三年甜癞,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片宛乃。...
    茶點故事閱讀 38,018評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡悠咱,死狀恐怖,靈堂內的尸體忽然破棺而出征炼,到底是詐尸還是另有隱情析既,我是刑警寧澤,帶...
    沈念sama閱讀 33,685評論 4 322
  • 正文 年R本政府宣布谆奥,位于F島的核電站眼坏,受9級特大地震影響,放射性物質發(fā)生泄漏酸些。R本人自食惡果不足惜宰译,卻給世界環(huán)境...
    茶點故事閱讀 39,234評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望魄懂。 院中可真熱鬧沿侈,春花似錦、人聲如沸市栗。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至智厌,卻和暖如春诲泌,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背铣鹏。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評論 1 261
  • 我被黑心中介騙來泰國打工敷扫, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人诚卸。 一個月前我還...
    沈念sama閱讀 45,467評論 2 352
  • 正文 我出身青樓葵第,卻偏偏與公主長得像,于是被迫代替她去往敵國和親合溺。 傳聞我的和親對象是個殘疾皇子卒密,可洞房花燭夜當晚...
    茶點故事閱讀 42,762評論 2 345

推薦閱讀更多精彩內容