ArrayBlockingQueue 與 AQS中的ConditionObject

BlockingQueue是我們在使用線程池的時(shí)候使用比較多的等待隊(duì)列,這里同時(shí)借助BlockingQueue分析下AQS中的ConditionObject誉碴。

ArrayBlockingQueue

構(gòu)造函數(shù) :

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        // 構(gòu)造函數(shù)中會new出一個(gè)新的ReentrantLock 方便后續(xù)使用
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition(); // 用于掛起生產(chǎn)節(jié)點(diǎn)
        notFull =  lock.newCondition(); // 用于掛起消費(fèi)節(jié)點(diǎn)
    }

put方法:

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 這里加鎖保證入隊(duì)的原子性
        // 由于使用Interruptibly結(jié)尾的lock 所以會拋出中斷異常
        lock.lockInterruptibly(); 
        try {
            while (count == items.length)
                notFull.await(); // 隊(duì)列已滿 阻塞自己
            enqueue(e); 入隊(duì)
        } finally {
            lock.unlock();
        }
    }

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal(); // 喚醒一個(gè)消費(fèi)節(jié)點(diǎn)
    }

poll:

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 如果 隊(duì)列為空 直接返回空  否則取出一個(gè)
            return (count == 0) ? null : dequeue(); 
        } finally {
            lock.unlock();
        }
    }

    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0; // 將takeIndex 重置為隊(duì)列頭部
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal(); // 喚醒一個(gè)生產(chǎn)節(jié)點(diǎn)
        return x;
    }

take:

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await(); // 如果隊(duì)列為空 會阻塞自己
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

可以看到叛薯,在BlockingQueue中稀轨,使用Condition做了一些阻塞操作撮慨,下面來分析下:
首先newCondition方法會生成一個(gè)ConditionObject對象毛萌,該對象是AQS中的一個(gè)內(nèi)部類:

    public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;
    }

在使用condition時(shí)航闺,AQS會維護(hù)一個(gè)ConditionObject隊(duì)列褪测,隊(duì)列中記錄了所有正在等待的節(jié)點(diǎn),并且這些節(jié)點(diǎn)不會去搶鎖潦刃。
然后來看下await方法侮措,該方法的作用是將當(dāng)前線程放入等待隊(duì)列,并從CLH隊(duì)列中取出(關(guān)于CLH隊(duì)列乖杠,其實(shí)就是AQS中維護(hù)的雙向鏈表分扎,用于等待獲取鎖):

    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 新建condition狀態(tài)的節(jié)點(diǎn)并將其入隊(duì)
        AbstractQueuedSynchronizer.Node node = addConditionWaiter();
        // 釋放當(dāng)前節(jié)點(diǎn)的鎖
        // 注意 這里會記錄下拿了幾個(gè)鎖 后面加鎖也需要同樣的數(shù)量
        int savedState = fullyRelease(node); 
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) { // 節(jié)點(diǎn)沒有在CLH隊(duì)列中里面 
            // //將線程進(jìn)行掛起,前面已經(jīng)釋放掉鎖了胧洒,并且已經(jīng)安全的添加到了condition隊(duì)列中
            LockSupport.park(this);
            //  執(zhí)行這里的條件: 被中斷 or 被前置節(jié)點(diǎn)喚醒
            // 這里只要checkInterruptWhileWaiting 返回的是0 就繼續(xù)park
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        //  重新?lián)屾i 此時(shí)節(jié)點(diǎn)已經(jīng)在CLH隊(duì)列中了 獲取成功后判斷該線程是否發(fā)生錯誤
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT; // 退出時(shí)重新中斷
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters(); // 清除隊(duì)列中的無效節(jié)點(diǎn)
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode); // 如果出現(xiàn)異常 拋出
    }

    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters(); // 清理無效節(jié)點(diǎn)
            t = lastWaiter;
        }
        // 新建condition類型的節(jié)點(diǎn)
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null) // 隊(duì)列為空 則新建的節(jié)點(diǎn)就是頭
            firstWaiter = node;
        else // 否則 將當(dāng)前節(jié)點(diǎn)加入到隊(duì)尾
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        // 注意 這里有個(gè)next 有個(gè) nextWaiter 
        // 一個(gè)用于CLH隊(duì)列 一個(gè)用于condition等待隊(duì)列 要區(qū)分開來
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }
    // 獲取隊(duì)列尾節(jié)點(diǎn)
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) { // 從尾部遍歷整個(gè)節(jié)點(diǎn) 看是否有當(dāng)前節(jié)點(diǎn)
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

釋放當(dāng)前節(jié)點(diǎn)的所有鎖:

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) { // 釋放當(dāng)前節(jié)點(diǎn)的所有鎖 并喚醒后置節(jié)點(diǎn)
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

    public final boolean release(int arg) {
        if (tryRelease(arg)) { // 釋放所有鎖
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h); // 喚醒后繼節(jié)點(diǎn)
            return true;
        }
        return false;
    }

清理隊(duì)列中的無效節(jié)點(diǎn):

    private void unlinkCancelledWaiters() {
        AbstractQueuedSynchronizer.Node t = firstWaiter;
        AbstractQueuedSynchronizer.Node trail = null;
        while (t != null) {
            AbstractQueuedSynchronizer.Node next = t.nextWaiter; // 拿到下一個(gè)節(jié)點(diǎn)
            // 若頭節(jié)點(diǎn)的狀態(tài)已經(jīng)不是CONDITION 
            if (t.waitStatus != AbstractQueuedSynchronizer.Node.CONDITION) {
                t.nextWaiter = null; // 剔除頭節(jié)點(diǎn)
                if (trail == null)
                    firstWaiter = next; // 直接將 firstWaiter 記錄為 next
                else
                    // trail已經(jīng)被記錄為CONDITION狀態(tài)的節(jié)點(diǎn)
                    // 將nextWaiter 記錄為next 即:
                    // CONDITION -> CANCELED -> UNKNOW
                    // 轉(zhuǎn)換為 CONDITION -> UNKNOW
                    trail.nextWaiter = next; 
                if (next == null) // 已經(jīng)遍歷到了隊(duì)尾
                    lastWaiter = trail;
            }
            else
                trail = t; // 如果當(dāng)前節(jié)點(diǎn)還是CONDITION狀態(tài) 則使用trail記錄下
            t = next;
        }
    }

該方法中有個(gè)疑點(diǎn)畏吓,CONDITION狀態(tài)是什么時(shí)候被重置掉的 ?
其實(shí)是在await方法中 :

// 這里 線程被喚醒后 會執(zhí)行checkInterruptWhileWaiting 方法 
 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;


    private int checkInterruptWhileWaiting(AbstractQueuedSynchronizer.Node node) {
        //  判斷節(jié)點(diǎn)是否還在condition隊(duì)列里面卫漫,如果在菲饼,將狀態(tài)變成0,放到等待返回true列赎,拋異常宏悦。
        return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
    }

    final boolean transferAfterCancelledWait(Node node) {
        // 也就是在這里 node 狀態(tài)會被重置為0
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node); // 將該節(jié)點(diǎn)入隊(duì)
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq().  Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
        // 可能有別的線程通過signal 喚醒了當(dāng)前節(jié)點(diǎn)
        // 并且正在入隊(duì) 那么這時(shí) 自旋啥也不干
        while (!isOnSyncQueue(node)) 
            Thread.yield();
        return false; // 修改node 狀態(tài)失敗 返回false
    }

     // 這個(gè)方法應(yīng)該很熟悉了 將一個(gè)節(jié)點(diǎn)放入CLH隊(duì)列中
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

接下來看下signal方法:

    public final void signal() {
        // getExclusiveOwnerThread() == Thread.currentThread(); 
        // 持有鎖的線程是否是本線程,如果不是持有鎖的線程直接拋異常
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        AbstractQueuedSynchronizer.Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }

    private void doSignal(AbstractQueuedSynchronizer.Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null) // 將頭節(jié)點(diǎn)后移一位
                lastWaiter = null; // 隊(duì)列已經(jīng)空了
            first.nextWaiter = null; // 斷開當(dāng)前節(jié)點(diǎn)對后繼節(jié)點(diǎn)的引用
        } while (!transferForSignal(first) &&
                (first = firstWaiter) != null); // 隊(duì)列不為空 則繼續(xù)
    }

    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        // 將剝離出來的節(jié)點(diǎn)改為0狀態(tài)
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) 
            return false; // 失敗的話 直接返回 操作等待隊(duì)列中下一個(gè)節(jié)點(diǎn)

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node); // 將當(dāng)前節(jié)點(diǎn)入隊(duì) 注意 這里返回的是當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
        int ws = p.waitStatus;
        // 如果前置節(jié)點(diǎn)狀態(tài)大于0(被取消)
        // 或者更新狀態(tài)為SIGNAL 失敯摺(SIGNAL表示后繼節(jié)點(diǎn)可以被喚醒)
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            // 則直接喚醒當(dāng)前線程
            LockSupport.unpark(node.thread);
        return true;
    }

signalAll:

        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }

        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first); 
                first = next;
            } while (first != null); // 主要這里不同 只要等待隊(duì)列還有節(jié)點(diǎn) 就繼續(xù)喚醒
        }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末饼煞,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子诗越,更是在濱河造成了極大的恐慌砖瞧,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,755評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件掺喻,死亡現(xiàn)場離奇詭異芭届,居然都是意外死亡储矩,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評論 3 395
  • 文/潘曉璐 我一進(jìn)店門褂乍,熙熙樓的掌柜王于貴愁眉苦臉地迎上來持隧,“玉大人,你說我怎么就攤上這事逃片÷挪Γ” “怎么了?”我有些...
    開封第一講書人閱讀 165,138評論 0 355
  • 文/不壞的土叔 我叫張陵褥实,是天一觀的道長呀狼。 經(jīng)常有香客問我,道長损离,這世上最難降的妖魔是什么哥艇? 我笑而不...
    開封第一講書人閱讀 58,791評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮僻澎,結(jié)果婚禮上貌踏,老公的妹妹穿的比我還像新娘。我一直安慰自己窟勃,他們只是感情好祖乳,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著秉氧,像睡著了一般眷昆。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上汁咏,一...
    開封第一講書人閱讀 51,631評論 1 305
  • 那天亚斋,我揣著相機(jī)與錄音,去河邊找鬼攘滩。 笑死伞访,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的轰驳。 我是一名探鬼主播厚掷,決...
    沈念sama閱讀 40,362評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼级解!你這毒婦竟也來了冒黑?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,264評論 0 276
  • 序言:老撾萬榮一對情侶失蹤勤哗,失蹤者是張志新(化名)和其女友劉穎抡爹,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體芒划,經(jīng)...
    沈念sama閱讀 45,724評論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡冬竟,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年欧穴,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片泵殴。...
    茶點(diǎn)故事閱讀 40,040評論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡涮帘,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出笑诅,到底是詐尸還是另有隱情调缨,我是刑警寧澤,帶...
    沈念sama閱讀 35,742評論 5 346
  • 正文 年R本政府宣布吆你,位于F島的核電站弦叶,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏妇多。R本人自食惡果不足惜伤哺,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望者祖。 院中可真熱鬧默责,春花似錦、人聲如沸咸包。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽烂瘫。三九已至,卻和暖如春奇适,著一層夾襖步出監(jiān)牢的瞬間坟比,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評論 1 270
  • 我被黑心中介騙來泰國打工嚷往, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留葛账,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,247評論 3 371
  • 正文 我出身青樓皮仁,卻偏偏與公主長得像籍琳,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子贷祈,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評論 2 355

推薦閱讀更多精彩內(nèi)容