以LinkedBlockingQueue為例淺談阻塞隊列的實現(xiàn)

目錄

最近在閱讀Spark源碼的過程中孽糖,又重新接觸到了一些Java并發(fā)方面的知識纵装,于是就見縫插針地將它們記錄下來右蕊,當做復(fù)習(xí)與備忘足丢。

阻塞隊列簡介

阻塞隊列的定義

根據(jù)Doug Lea在JavaDoc中的解釋,所謂阻塞隊列脓钾,就是在普通隊列的基礎(chǔ)之上,支持以下兩種操作的隊列:

  • 當某線程從隊列獲取元素時桩警,如果隊列為空可训,就等待(阻塞)直至隊列中有元素;
  • 當某線程向隊列插入元素時捶枢,如果隊列已滿握截,就等待(阻塞)直至隊列中有空間。

也就是說烂叔,阻塞隊列是自帶同步機制的隊列谨胞。它最常用來解決線程同步中經(jīng)典的生產(chǎn)者-消費者問題,前面講過的Spark Core異步事件總線中蒜鸡,就采用阻塞隊列作為事件存儲胯努。

Java中的阻塞隊列

Java中阻塞隊列的基類是j.u.c.BlockingQueue接口牢裳,它繼承自Queue接口,并且定義了額外的方法實現(xiàn)同步:

    void put(E e) throws InterruptedException;

    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

    E take() throws InterruptedException;

    E poll(long timeout, TimeUnit unit) throws InterruptedException;

上述put()與offer()方法用于向隊列插入元素叶沛,take()與poll()方法則是從隊列獲取元素蒲讯。不同的是,put()與take()方法在插入/獲取時灰署,如果必須等待判帮,就會一直阻塞下去;而offer()與poll()方法可以指定阻塞的時間長度溉箕。

以BlockingQueue接口為中心的繼承關(guān)系如下圖所示脊另。


平時開發(fā)中比較常用的阻塞隊列是基于數(shù)組實現(xiàn)的ArrayBlockingQueue,與基于單鏈表實現(xiàn)的LinkedBlockingQueue约巷。本文選擇后者來深入看一下阻塞隊列的實現(xiàn)細節(jié),因為它的性能在多數(shù)情況下更優(yōu)旱捧,可以自行寫benchmark程序來測測独郎。

LinkedBlockingQueue

LinkedBlockingQueue(以下簡稱LBQ)是基于單鏈表實現(xiàn)的,先進先出(FIFO)的有界阻塞隊列枚赡。

單鏈表定義

LBQ的單鏈表結(jié)點數(shù)據(jù)結(jié)構(gòu)定義在靜態(tài)內(nèi)部類Node中氓癌。

    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

在類的內(nèi)部還定義了單鏈表的頭結(jié)點與尾結(jié)點。

    transient Node<E> head;
    private transient Node<E> last;

head始終指向鏈表的第一個結(jié)點贫橙,該結(jié)點是哨兵結(jié)點贪婉,不存儲數(shù)據(jù),只標記鏈表的開始卢肃,即head.item == null疲迂。這樣可以避免只有一個結(jié)點時造成混亂。
tail始終指向鏈表的最后一個結(jié)點莫湘,該結(jié)點是有數(shù)據(jù)的尤蒿,并滿足last.next == null

LBQ在隊頭獲取及彈出元素幅垮,在隊尾插入元素腰池。

鎖和等待隊列

LBQ采用雙鎖機制保證入隊和出隊可以同時進行,互不干擾忙芒。

    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

可見定義有兩個ReentrantLock示弓,takeLock用于控制出隊,putLock用于控制入隊呵萨。另外奏属,還有這兩個鎖分別對應(yīng)的條件變量notEmpty和notFull,分別維護出隊和入隊線程的等待隊列甘桑。ReentrantLock和Condition都是Java AQS機制的重要組成部分拍皮,之后也會細說歹叮。

值得注意的是,在某些方法中需要同時對takeLock與putLock加鎖與解鎖铆帽,所以LBQ內(nèi)部也提供了這樣的方法咆耿。

    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

這兩個方法總會成對調(diào)用,保證所有需要同時加鎖和解鎖的地方爹橱,其順序都一致并且不可中斷萨螺,也防止了前一個鎖操作成功執(zhí)行,后一個鎖操作被打斷導(dǎo)致死鎖的風(fēng)險愧驱。

另外慰技,LBQ也對條件變量的Condition.signal()方法進行了簡單封裝,分別用來喚醒阻塞的出隊操作線程和入隊操作線程组砚。

    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

容量和計數(shù)

    private final int capacity;
    private final AtomicInteger count = new AtomicInteger();

capacity是LBQ的最大容量吻商,可以在構(gòu)造方法中隨同名參數(shù)傳入,默認值是Integer.MAX_VALUE糟红。

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

count則是LBQ內(nèi)當前元素的計數(shù)艾帐,由于入隊和出隊動作可以并發(fā)執(zhí)行,所以要用原子類型AtomicInteger保證線程安全盆偿。

入隊操作

由于put()和offer()方法的邏輯基本相同柒爸,所以只看offer()方法就好了。

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

在入隊時事扭,首先將putLock加鎖捎稚,然后用衛(wèi)語句count.get() == capacity判斷隊列是否已滿,若已滿求橄,則進入等待循環(huán)今野。當阻塞的時間超時后,判定入隊操作失敗谈撒,并返回false腥泥。
如果隊列未滿,或者在超時時間未到時有了空間啃匿,就調(diào)用enqueue()方法在隊尾插入元素蛔外,并將計數(shù)器自增。入隊后若還有更多的剩余空間溯乒,則喚醒其他等待的入隊線程夹厌。
最后將putLock解鎖,并檢查由count.getAndIncrement()返回的值是否為0裆悄。如果為0矛纹,表示隊列剛剛由空變?yōu)榉强諣顟B(tài),因此也要喚醒等待的出隊線程光稼。

出隊操作

同理或南,只看poll()方法孩等。

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

將講解入隊方法時的描述反著說一遍就行了:
在出隊時,首先將takeLock加鎖采够,然后用衛(wèi)語句count.get() == 0判斷隊列是否為空肄方,若為空,則進入等待循環(huán)蹬癌。當阻塞的時間超時后权她,判定出隊操作失敗,并返回false逝薪。
如果隊列不為空隅要,或者在超時時間未到時進了新元素,就調(diào)用dequeue()方法彈出隊頭元素董济,并將計數(shù)器自減步清。出隊后若還有更多的剩余元素,則喚醒其他等待的出隊線程虏肾。
最后將takeLock解鎖尼啡,并檢查由count.getAndDecrement()返回的值是否為capacity。如果為capacity询微,表示隊列剛剛由滿變?yōu)椴粷M狀態(tài),因此也要喚醒等待的入隊線程狂巢。

需要操作雙鎖的情況

以remove()方法為例撑毛。

    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    void unlink(Node<E> p, Node<E> trail) {
        // assert isFullyLocked();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.
        p.item = null;
        trail.next = p.next;
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

由于單鏈表刪除結(jié)點涉及到對鏈表的遍歷,以及對前驅(qū)和后繼結(jié)點的斷鏈和補鏈唧领,因此必須將兩個鎖都加上藻雌,禁止一切修改。待刪除成功后才能解鎖斩个,繼續(xù)正常的入隊和出隊操作胯杭。

生產(chǎn)者-消費者問題示例

生產(chǎn)者-消費者問題的解決方法用操作系統(tǒng)理論中的信號量PV(wait-signal)原語描述如下:

semaphore filled = 0;
semaphore empty = BUF_CAPACITY;
mutex_semaphore mutex = 1;

procedure producer() {
  while (true) {
    item = produce();
    wait(empty);
    wait(mutex);
    buffer.put(item);
    signal(mutex);
    signal(filled);
  }
}

procedure consumer() {
  while (true) {
    wait(filled);
    wait(mutex);
    item = buffer.get();
    signal(mutex);
    signal(empty);
    consume(item);
  }
}

利用阻塞隊列可以免去自己實現(xiàn)同步機制的麻煩,從而非常方便地實現(xiàn)受啥。一個極簡的示例如下:

public class ProducerConsumerExample {
    private static final int BUF_CAPACITY = 16;

    public static void main(String[] args) {
        BlockingQueue<Long> blockingQueue = new LinkedBlockingQueue<>(BUF_CAPACITY);

        Thread producerThread = new Thread(() -> {
            try {
                while (true) {
                    long value = System.currentTimeMillis() % 1000;
                    blockingQueue.put(value);
                    Thread.sleep(value);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "producer");

        Thread consumerThread = new Thread(() -> {
            try {
                while (true) {
                    System.out.println(blockingQueue.take());
                    Thread.sleep(System.currentTimeMillis() % 1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "consumer");

        producerThread.start();
        consumerThread.start();
    }
}

一個凶龈觥(?)問題

在上面的代碼(以及j.u.c包中很多類的代碼)的方法體中滚局,經(jīng)常能看到類似以下的語句:

        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;

也就是有些類中定義的字段居暖,在方法中使用時會先賦值給一個局部變量。這樣做到底是為了什么藤肢?以目前我所了解到的而言太闺,還沒有特別確切的答案,但可以確定是一個非常微小的優(yōu)化嘁圈,與JVM及緩存有關(guān)省骂。

以下是reference傳送門:

順便蟀淮,StackOverflow最近(不知道是哪一天)改版成了1998年的樣式,滿滿的懷舊感钞澳。上面concurrency-interest郵件列表中關(guān)于這個問題也是眾說紛紜怠惶,如果仔細爬樓還會發(fā)現(xiàn)Doug Lea本人的回復(fù),不過有些令人費解略贮。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末甚疟,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子逃延,更是在濱河造成了極大的恐慌览妖,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,525評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件揽祥,死亡現(xiàn)場離奇詭異讽膏,居然都是意外死亡,警方通過查閱死者的電腦和手機拄丰,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,203評論 3 395
  • 文/潘曉璐 我一進店門府树,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人料按,你說我怎么就攤上這事奄侠。” “怎么了载矿?”我有些...
    開封第一講書人閱讀 164,862評論 0 354
  • 文/不壞的土叔 我叫張陵垄潮,是天一觀的道長。 經(jīng)常有香客問我闷盔,道長弯洗,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,728評論 1 294
  • 正文 為了忘掉前任逢勾,我火速辦了婚禮牡整,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘溺拱。我一直安慰自己逃贝,他們只是感情好,可當我...
    茶點故事閱讀 67,743評論 6 392
  • 文/花漫 我一把揭開白布迫摔。 她就那樣靜靜地躺著秋泳,像睡著了一般。 火紅的嫁衣襯著肌膚如雪攒菠。 梳的紋絲不亂的頭發(fā)上迫皱,一...
    開封第一講書人閱讀 51,590評論 1 305
  • 那天,我揣著相機與錄音,去河邊找鬼卓起。 笑死和敬,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的戏阅。 我是一名探鬼主播昼弟,決...
    沈念sama閱讀 40,330評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼奕筐!你這毒婦竟也來了舱痘?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,244評論 0 276
  • 序言:老撾萬榮一對情侶失蹤离赫,失蹤者是張志新(化名)和其女友劉穎芭逝,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體渊胸,經(jīng)...
    沈念sama閱讀 45,693評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡旬盯,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,885評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了翎猛。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片胖翰。...
    茶點故事閱讀 40,001評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖切厘,靈堂內(nèi)的尸體忽然破棺而出萨咳,到底是詐尸還是另有隱情,我是刑警寧澤疫稿,帶...
    沈念sama閱讀 35,723評論 5 346
  • 正文 年R本政府宣布某弦,位于F島的核電站,受9級特大地震影響而克,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜怔毛,卻給世界環(huán)境...
    茶點故事閱讀 41,343評論 3 330
  • 文/蒙蒙 一员萍、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧拣度,春花似錦碎绎、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,919評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至冤馏,卻和暖如春日麸,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,042評論 1 270
  • 我被黑心中介騙來泰國打工代箭, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留墩划,地道東北人。 一個月前我還...
    沈念sama閱讀 48,191評論 3 370
  • 正文 我出身青樓嗡综,卻偏偏與公主長得像乙帮,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子极景,可洞房花燭夜當晚...
    茶點故事閱讀 44,955評論 2 355