Java并發(fā)之阻塞隊(duì)列

隊(duì)列

隊(duì)列是先進(jìn)先出(FIFO)的線性表枝誊。在具體應(yīng)用中通常用鏈表或者數(shù)組來(lái)實(shí)現(xiàn)。隊(duì)列只允許在后端(稱為rear)進(jìn)行插入操作惜纸,在前端(稱為front)進(jìn)行刪除操作叶撒。隊(duì)列的操作方式和堆棧類似绝骚,唯一的區(qū)別在于隊(duì)列只允許新數(shù)據(jù)在后端進(jìn)行添加。

操作 拋出異常 有返回值
Insert add(e) offer(e)
Remove remove() poll()
Examine element() peek()

阻塞隊(duì)列

阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)列,這兩個(gè)附加操作支持阻塞的插入和移除方法.

  • 支持阻塞的插入方法: 當(dāng)隊(duì)列滿時(shí),隊(duì)列會(huì)阻塞插入元素的線程,直到隊(duì)列不滿為止.
  • 支持阻塞的移除方法: 當(dāng)隊(duì)列為空時(shí),獲取元素的線程阻塞等待線程非空.

阻塞隊(duì)列通常用于生產(chǎn)者和消費(fèi)者的場(chǎng)景,生產(chǎn)者就是向隊(duì)列里添加元素,而消費(fèi)者就是從隊(duì)列里取出元素. 阻塞隊(duì)列就是生產(chǎn)者存儲(chǔ)元素而消費(fèi)者用來(lái)獲取元素的容器.

操作方式 拋出異常 返回特殊值 一直阻塞 超時(shí)退出
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time,unit)
檢查 element() peek() 不可用 不可用

注意: 如果是無(wú)界阻塞隊(duì)列,隊(duì)列永遠(yuǎn)都不會(huì)出現(xiàn)滿的情況,所以使用put或者take方法永遠(yuǎn)都不會(huì)被阻塞,而且使用put方法時(shí),該方法永遠(yuǎn)返回為true.

JDK提供的阻塞隊(duì)列

從上面的UML圖可以看到,JKD7提供了7個(gè)阻塞隊(duì)列:

  • ArrayBlockingQueue: 由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列
  • LinkedBlockingQueue: 由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列
  • PriorityBlockingQueue: 支持優(yōu)先級(jí)排序的無(wú)界阻塞隊(duì)列
  • DelayQueue: 使用優(yōu)先級(jí)隊(duì)列隊(duì)列實(shí)現(xiàn)的無(wú)界阻塞隊(duì)列
  • SynchronousQueue: 不存儲(chǔ)元素的阻塞隊(duì)列
  • LinkedTransferQueue: 由鏈表結(jié)構(gòu)組成的無(wú)界阻塞隊(duì)列
  • LinkedBlockingDeque: 由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列

ArrayBlockingQueue

ArrayBlockingQueue是一個(gè)用數(shù)組實(shí)現(xiàn)的有界隊(duì)列,此隊(duì)列按照先進(jìn)先出的原則對(duì)元素進(jìn)行排序.

默認(rèn)情況下不保證線程公平的訪問(wèn)隊(duì)列,所謂公平訪問(wèn)隊(duì)列是指阻塞的線程,可以按照阻塞的先后順序訪問(wèn)隊(duì)列,即先阻塞線程先訪問(wèn)隊(duì)列.非公平性對(duì)先等待的線程是非公平的,當(dāng)隊(duì)列可用時(shí),阻塞的線程都可以爭(zhēng)奪訪問(wèn)隊(duì)列的資格,有可能先阻塞的線程最后才訪問(wèn)隊(duì)列.

為了保證公平性,通常會(huì)降低吞吐量,可以使用以下代碼創(chuàng)建一個(gè)公平的阻塞隊(duì)列.


    ArrayBlockingQueue fairQueue= new ArrayBlockingQueue(1000,true);

訪問(wèn)者的公平性是使用可重入鎖實(shí)現(xiàn)的,代碼如下:


    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

LinkedBlockingQueue

LinkedBlockingQueue是一個(gè)用鏈表實(shí)現(xiàn)的有界阻塞隊(duì)列,此隊(duì)列默認(rèn)最大長(zhǎng)度為Integer.MAX_VALUE,按照先進(jìn)先出(FIFO)的原則對(duì)元素進(jìn)行排序

PriorityBlockingQueue

PriorityBlockingQueue是一個(gè)支持優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列,默認(rèn)情況下元素采用自然排序升序排列,也可以自定義類實(shí)現(xiàn)compareTo()方法來(lái)指定元素排序規(guī)則,或者初始化PriorityBlockingQueue時(shí),指定構(gòu)造參數(shù)Comparator來(lái)對(duì)元素進(jìn)行排序,需要注意的是不能保證同優(yōu)先級(jí)的元素排序.

DelayQueue

DelayQueue是一個(gè)支持延時(shí)獲取元素的無(wú)界阻塞隊(duì)列,隊(duì)列使用PriorityQueue來(lái)實(shí)現(xiàn). 隊(duì)列中元素必須實(shí)現(xiàn)Delayed接口,在創(chuàng)建元素時(shí)可以指定多久才能從隊(duì)列中獲取當(dāng)前元素.只有延遲期滿時(shí)才能從隊(duì)列中提出元素.

DelayQueue非常有用,可以將DelayQueue運(yùn)用在一下場(chǎng)景:

  • 緩存系統(tǒng)的設(shè)計(jì): 可以送DelayQueue保存緩存元素的有效期,使用一個(gè)線程循環(huán)查詢DelayQueue,一旦從DelayQueue獲取元素,就表示緩存到期了.
  • 定時(shí)任務(wù)調(diào)度:使用DelayQueue保存當(dāng)前將會(huì)執(zhí)行的任務(wù)和執(zhí)行時(shí)間,一旦從DelayQueue中獲取到任務(wù)就開(kāi)始執(zhí)行,比如TimeQueue就是使用DelayQueue實(shí)現(xiàn)的.

SynchronousQueue

SynchronousQueue是一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列,每個(gè)put操作必須等待一個(gè)take操作,否則不能繼續(xù)添加元素.

支持公平訪問(wèn)隊(duì)列,默認(rèn)情況下線程采用非公平性策略,使用帶boolean參數(shù)的構(gòu)造方法可以實(shí)現(xiàn)等待線程采用先進(jìn)先出(FIFO)的順序訪問(wèn)隊(duì)列.


    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

LinkedTransferQueue

LinkedTransferQueue是一個(gè)由鏈表結(jié)構(gòu)組成的無(wú)界阻塞TransferQueue隊(duì)列,相當(dāng)于其他阻塞隊(duì)列,LinkedTransferQueue多了一tryTransfer和transfer方法.

  • transfer方法

    如果當(dāng)前有消費(fèi)者正在等待接收元素(消費(fèi)者使用take()方法或者帶時(shí)間限制的poll方式時(shí))transfer()方法可以吧生產(chǎn)者傳入的元素立即transfer(傳輸)給消費(fèi)者,如果沒(méi)有消費(fèi)者在等待接收元素,transfer方法將元素存放在隊(duì)列的tail節(jié)點(diǎn),并等待該元素被消費(fèi)者消費(fèi)了才返回.

  • tryTransfer方法

    tryTransfer方法用來(lái)試探生產(chǎn)者傳入元素是否能夠直接傳遞給消費(fèi)者,如果沒(méi)有消費(fèi)者等待接收元素.則返回false, 和transfer方法的區(qū)別是tryTransfer方法無(wú)論消費(fèi)者是否接收,方法立即返回,而transfer需要等待消費(fèi)者消費(fèi)了才返回.

LinkedBlockingDeque

LinkedBlockingDeque是一由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列,所謂雙向隊(duì)列指的是可以從隊(duì)列兩端插入和移除元素,雙端隊(duì)列因?yàn)槎嗔艘粋€(gè)操作隊(duì)列的入口,在多線程同時(shí)入隊(duì)時(shí),也就減少了一般競(jìng)爭(zhēng).相比其他阻塞隊(duì)列,LinkedBlockingDeque多了addFirst, addLast,offerFirst,offerLast,peekFirst,peekLast等方法.

在初始化LinkedBlockingDeque時(shí)可以設(shè)置容量防止其過(guò)渡膨脹, 另外,雙向阻塞隊(duì)列可以運(yùn)行在"工作竊取"模式中.

阻塞隊(duì)列實(shí)現(xiàn)的原理

通知模式實(shí)現(xiàn): 所謂通知模式,就是當(dāng)生產(chǎn)者從滿的隊(duì)列里添加元素時(shí)會(huì)阻塞生產(chǎn)者,而當(dāng)消費(fèi)者消費(fèi)了一個(gè)隊(duì)列中的元素后,就會(huì)通知生產(chǎn)者當(dāng)前隊(duì)列可用. ArrayBlockingQueue使用ReentrantLock和Condition實(shí)現(xiàn).


    /** Main lock guarding all access */
    final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;

     public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

     public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length) putIndex = 0;
        count++;
        notEmpty.signal();
    }

    /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length) takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

當(dāng)往隊(duì)列里插入一個(gè)元素時(shí),如果隊(duì)列不可用,那么阻塞生產(chǎn)者主要通過(guò)LockSupport.part(this)實(shí)現(xiàn):


    public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

然后看看LockSupport的源碼:發(fā)現(xiàn)調(diào)研setBlocker先保存一下將要阻塞的線程,然后代用unsafe.park阻塞當(dāng)前線程:


    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        U.park(false, 0L);
        setBlocker(t, null);
    }

park是個(gè)native方法,會(huì)阻塞當(dāng)前線程,只有以下四種情況中一種發(fā)生時(shí),該返回才會(huì)返回.

  • 與park相對(duì)的unpark執(zhí)行或者已經(jīng)執(zhí)行. "已經(jīng)執(zhí)行"是指執(zhí)行unpark,再執(zhí)行park的情況
  • 線程被中斷時(shí)
  • 等待完time參數(shù)指定的毫秒數(shù)時(shí)
  • 異踌艄唬現(xiàn)象發(fā)生時(shí),這個(gè)異逞雇簦現(xiàn)象沒(méi)有任何原因
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市古瓤,隨后出現(xiàn)的幾起案子止剖,更是在濱河造成了極大的恐慌,老刑警劉巖落君,帶你破解...
    沈念sama閱讀 218,036評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件穿香,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡绎速,警方通過(guò)查閱死者的電腦和手機(jī)皮获,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,046評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)纹冤,“玉大人洒宝,你說(shuō)我怎么就攤上這事≌哉埽” “怎么了奴愉?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,411評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵壳快,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我,道長(zhǎng)撒蟀,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,622評(píng)論 1 293
  • 正文 為了忘掉前任荸频,我火速辦了婚禮留晚,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘扒最。我一直安慰自己丑勤,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,661評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布吧趣。 她就那樣靜靜地躺著法竞,像睡著了一般。 火紅的嫁衣襯著肌膚如雪强挫。 梳的紋絲不亂的頭發(fā)上岔霸,一...
    開(kāi)封第一講書(shū)人閱讀 51,521評(píng)論 1 304
  • 那天,我揣著相機(jī)與錄音俯渤,去河邊找鬼呆细。 笑死,一個(gè)胖子當(dāng)著我的面吹牛八匠,可吹牛的內(nèi)容都是我干的絮爷。 我是一名探鬼主播趴酣,決...
    沈念sama閱讀 40,288評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼坑夯!你這毒婦竟也來(lái)了岖寞?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,200評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤渊涝,失蹤者是張志新(化名)和其女友劉穎慎璧,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體跨释,經(jīng)...
    沈念sama閱讀 45,644評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡胸私,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,837評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了鳖谈。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片岁疼。...
    茶點(diǎn)故事閱讀 39,953評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖缆娃,靈堂內(nèi)的尸體忽然破棺而出捷绒,到底是詐尸還是另有隱情,我是刑警寧澤贯要,帶...
    沈念sama閱讀 35,673評(píng)論 5 346
  • 正文 年R本政府宣布暖侨,位于F島的核電站,受9級(jí)特大地震影響崇渗,放射性物質(zhì)發(fā)生泄漏字逗。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,281評(píng)論 3 329
  • 文/蒙蒙 一宅广、第九天 我趴在偏房一處隱蔽的房頂上張望葫掉。 院中可真熱鬧,春花似錦跟狱、人聲如沸俭厚。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,889評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)挪挤。三九已至,卻和暖如春关翎,著一層夾襖步出監(jiān)牢的瞬間扛门,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,011評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工笤休, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人症副。 一個(gè)月前我還...
    沈念sama閱讀 48,119評(píng)論 3 370
  • 正文 我出身青樓店雅,卻偏偏與公主長(zhǎng)得像政基,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子闹啦,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,901評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 相關(guān)文章Java并發(fā)編程(一)線程定義沮明、狀態(tài)和屬性 Java并發(fā)編程(二)同步Java并發(fā)編程(三)volatil...
    劉望舒閱讀 5,235評(píng)論 1 31
  • 阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)列。這兩個(gè)附加的操作是:在隊(duì)列為空時(shí)窍奋,獲取元素的線...
    端木軒閱讀 1,004評(píng)論 0 2
  • 1.阻塞隊(duì)列定義阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場(chǎng)景荐健,生產(chǎn)者是往隊(duì)列里添加元素的線程,消費(fèi)者是從隊(duì)列里拿元素的線程琳袄。...
    SDY_0656閱讀 429評(píng)論 0 1
  • 目前網(wǎng)絡(luò)上的理財(cái)app有很多江场,投資得收益都是余額寶得幾倍有得甚至更高,面對(duì)這么多得理財(cái)app得時(shí)候我們?nèi)绾稳ミx擇窖逗,...
    阮策策閱讀 255評(píng)論 0 0