ReentrantLock condition 源碼分析

本篇主要介紹ReentrantLock 中 condition的await/signal方法的實(shí)現(xiàn)原理。

使用說明

public void foo() throws InterruptedException {
        ReentrantLock reentrantLock = new ReentrantLock();
        Condition condition = reentrantLock.newCondition();

        reentrantLock.lock();
            condition.await();
            //....
            condition.signal();
        reentrantLock.unlock();
    }

當(dāng)前線程在獲取到鎖后丈攒,通過await來讓自己進(jìn)入park阻塞狀態(tài)祷杈、加入等待隊(duì)列智袭,并釋放鎖献烦。
signal方法將其他在等待隊(duì)列中曙求,處于park狀態(tài)下的線程喚醒牌借,并嘗試競爭鎖度气。

源碼分析

await() #1

/**
         * Implements interruptible condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled or interrupted.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 第一部分
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 第二部分
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

await方法的代碼比較多,可以拆分成兩部分膨报。第一部分如何讓當(dāng)前線程park磷籍。第二部分是線程被unpark后的實(shí)現(xiàn)。

第一部分:

addConditionWaiter()

        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

        /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

主要目的是將線程構(gòu)建成Conditon模式下的Node现柠,加入到隊(duì)列中院领。
首先,隊(duì)列為空够吩,firstWaiter和lastWaiter都為null比然。當(dāng)?shù)谝粋€node創(chuàng)建成功后,firstWaiter和lastWaiter都指向這個node周循。后續(xù)再來節(jié)點(diǎn)强法,則讓node.next 指向新節(jié)點(diǎn),lastWaiter也指向新節(jié)點(diǎn)湾笛。如此構(gòu)建一個帶有頭尾指針的單向鏈表饮怯。

再看方法里第二行的if判斷,因?yàn)檫M(jìn)入到condition隊(duì)列的node一定都是condition(-2)狀態(tài)嚎研,如果不是蓖墅,則說明當(dāng)前node所屬線程已經(jīng)處理了其他的邏輯。一般是cancel狀態(tài)临扮。此時要從鏈表中去掉cancel態(tài)的節(jié)點(diǎn)论矾。

/** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

unlinkCancelledWaiters

/**
         * Unlinks cancelled waiter nodes from condition queue.
         * Called only while holding lock. This is called when
         * cancellation occurred during condition wait, and upon
         * insertion of a new waiter when lastWaiter is seen to have
         * been cancelled. This method is needed to avoid garbage
         * retention in the absence of signals. So even though it may
         * require a full traversal, it comes into play only when
         * timeouts or cancellations occur in the absence of
         * signals. It traverses all nodes rather than stopping at a
         * particular target to unlink all pointers to garbage nodes
         * without requiring many re-traversals during cancellation
         * storms.
         */
        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

總而言之做了一件事,將非condition狀態(tài)的node從鏈表中去掉杆勇。此時lastWaiter一定是condition狀態(tài)拇囊,賦值給t。

fullyRelease()

/**
     * Invokes release with current state value; returns saved state.
     * Cancels node and throws exception on failure.
     * @param node the condition node for this wait
     * @return previous sync state
     */
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

/**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

savedState表示的是重入的次數(shù)靶橱,可能1次,也可能多次,這里一次性全部釋放掉关霸,將全局的state=0传黄,exclusiveOwnerThread=null。并且通過unparkSuccessor獲取同步隊(duì)列中的下一個node队寇。具體過程已經(jīng)在ReentrantLock源碼分析中做了說明膘掰。
簡而言之就是當(dāng)前線程釋放鎖,讓同步隊(duì)列的下一個node開始搶占佳遣。

isOnSyncQueue()

/**
     * Returns true if a node, always one that was initially placed on
     * a condition queue, is now waiting to reacquire on sync queue.
     * @param node the node
     * @return true if is reacquiring
     */
    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }

    /**
     * Returns true if node is on sync queue by searching backwards from tail.
     * Called only when needed by isOnSyncQueue.
     * @return true if present
     */
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

此時判斷node是不是已經(jīng)在sync隊(duì)列中识埋,判斷的標(biāo)準(zhǔn)是waitStatus、prev和next零渐,以及從tail倒序查找窒舟。
這里關(guān)于倒序查詢有很大一段注釋,大意是說單純判斷node.prev是not null诵盼,并不能代表在node已經(jīng)在sync隊(duì)列中惠豺。需要從sync隊(duì)列中的tail倒序查詢,并且說明了node大概率在tail附近风宁,不會有太多性能損耗洁墙。

cas在替換prev時可能失敗,也就是我下面貼的入隊(duì)的方法實(shí)現(xiàn)戒财。因?yàn)閜rev是volatile的热监,會直接可見,但是compareAndSetTail可能會失敗饮寞,從而導(dǎo)致沒有成功入隊(duì)孝扛。


如果node并沒有在sync隊(duì)列中,則被park骂际。

      while (!isOnSyncQueue(node)) {
          LockSupport.park(this);
          if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
              break;
      }

至此第一部分說明完成疗琉,當(dāng)前占有鎖的線程被添加到了condition queue中,釋放鎖被處于park狀態(tài)歉铝。

第二部分:

既然線程已經(jīng)被park了盈简,就先說明是如何被unpark的。一般來說我們都是配置signal(signalAll)一起使用太示。先分析下signal().

signal()

/**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

如果condition隊(duì)列里有node柠贤,則開始喚醒。

doSignal()

 /**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

如果first.nextWaiter是null类缤,則說明condition隊(duì)列中只有這個node臼勉,firstWaiter、lastWaiter餐弱、nextWaiter都是null宴霸。
如果后續(xù)還有節(jié)點(diǎn)囱晴,將nextWaiter指向firstWaiter,并斷開first.nextWaiter瓢谢。
重點(diǎn)看下transferForSignal畸写。

transferForSignal()

/**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal)
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

/**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

顧名思義,這個方法的目的就是將node從condition隊(duì)列轉(zhuǎn)到sync隊(duì)列氓扛。
轉(zhuǎn)移前的狀態(tài)如果不是condition枯芬,說明是cancel,就不再執(zhí)行采郎。成功則繼續(xù)向后執(zhí)行千所,此時當(dāng)前node的waitState=0
將node節(jié)點(diǎn)enq到sync隊(duì)列中蒜埋,返回前一個node淫痰。
如果前一個node已經(jīng)被取消,或者在cas成signal的過程中失斃砭ァ(也就是可能在設(shè)置過程中cancel)黑界,那就通過unpark將當(dāng)前節(jié)點(diǎn)喚醒(相當(dāng)于被提前喚醒)。

此時皂林,當(dāng)前線程完成signal方法的調(diào)用朗鸠,如果調(diào)用了unpark,則這個線程也被喚醒础倍。兩個線程同時在執(zhí)行烛占。

doSignalAll()

/**
         * Removes and transfers all nodes.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

這個all表示把condition隊(duì)列中的所有node全部transfer到sync隊(duì)列。

至此沟启,signal(signalAll)執(zhí)行完成忆家,transfer或者unpark condition隊(duì)列中的node。

await() #2

無論以什么樣的方式喚醒德迹,await內(nèi)的park線程終究還是會被喚醒芽卿,繼續(xù)向后執(zhí)行。

      public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 第一部分
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 第二部分
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

先檢查在等待過程中是否中斷過胳搞,如果是卸例,看中斷時機(jī)。
在signal信號前被中斷返回THROW_IE肌毅,已經(jīng)在sync隊(duì)列中返回REINTERRUPT筷转。

/**
         * Checks for interrupt, returning THROW_IE if interrupted
         * before signalled, REINTERRUPT if after signalled, or
         * 0 if not interrupted.
         */
        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }

      /**
     * Transfers node, if necessary, to sync queue after a cancelled wait.
     * Returns true if thread was cancelled before being signalled.
     *
     * @param node the node
     * @return true if cancelled before the node was signalled
     */
    final boolean transferAfterCancelledWait(Node node) {
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            //如果收到了中斷信號,且當(dāng)前node還在condition隊(duì)列中悬而,則入隊(duì)到sync隊(duì)列呜舒。
            enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq().  Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

acquireQueued()

 /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

中斷標(biāo)記記錄后,嘗試獲取鎖笨奠,如果沒有達(dá)到條件袭蝗,則再次進(jìn)入park狀態(tài)唤殴。

            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);

再次被喚醒或者搶占到鎖后,清理一波cancel的condition隊(duì)列到腥。根據(jù)不同的中斷標(biāo)記向上拋出異痴0耍或者返回中斷標(biāo)記。

至此左电,await() 方法也執(zhí)行完成。

總結(jié)

condition的各種await页响、signal的處理結(jié)合了lock和unlock的狀態(tài)篓足。內(nèi)部的很多操作都是需要在獲得鎖的狀態(tài)下執(zhí)行。這也就是為什么await闰蚕、signal需要寫到lock和unlock塊中栈拖。

這四個方法需要整體看。

重點(diǎn)說明

ReentrantLock 內(nèi)部分為了兩個隊(duì)列(sync和condition), 兩種模式(EXCLUSIVE没陡、SHARED)涩哟,五種狀態(tài)(SINGAL, CONDITION, CANCELLED, PROPAGATE, 0)

sync 隊(duì)列是帶有頭尾指針的雙向鏈表,節(jié)點(diǎn)字段是

    private transient volatile Node head;
    private transient volatile Node tail;
    volatile Node prev;
    volatile Node next;

condition隊(duì)列是帶有頭尾指針的單鏈表盼玄,節(jié)點(diǎn)字段是

        private transient Node firstWaiter;
        private transient Node lastWaiter;
        Node nextWaiter;

lock()方法本質(zhì)是將未獲得鎖的node加入到sync隊(duì)列
unlock方法本質(zhì)是將sync隊(duì)列的node依次喚醒執(zhí)行贴彼。
await()方法是將node加入到condition隊(duì)列中。
signal()方法是將condition隊(duì)列中的head node(signalAll是全部node)從condition轉(zhuǎn)到sync隊(duì)列埃儿。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末器仗,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子童番,更是在濱河造成了極大的恐慌精钮,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,042評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件剃斧,死亡現(xiàn)場離奇詭異轨香,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)幼东,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,996評論 2 384
  • 文/潘曉璐 我一進(jìn)店門臂容,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人筋粗,你說我怎么就攤上這事策橘。” “怎么了娜亿?”我有些...
    開封第一講書人閱讀 156,674評論 0 345
  • 文/不壞的土叔 我叫張陵丽已,是天一觀的道長。 經(jīng)常有香客問我买决,道長沛婴,這世上最難降的妖魔是什么吼畏? 我笑而不...
    開封第一講書人閱讀 56,340評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮嘁灯,結(jié)果婚禮上泻蚊,老公的妹妹穿的比我還像新娘。我一直安慰自己丑婿,他們只是感情好性雄,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,404評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著羹奉,像睡著了一般秒旋。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上诀拭,一...
    開封第一講書人閱讀 49,749評論 1 289
  • 那天迁筛,我揣著相機(jī)與錄音,去河邊找鬼耕挨。 笑死细卧,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的筒占。 我是一名探鬼主播贪庙,決...
    沈念sama閱讀 38,902評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼赋铝!你這毒婦竟也來了插勤?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,662評論 0 266
  • 序言:老撾萬榮一對情侶失蹤革骨,失蹤者是張志新(化名)和其女友劉穎农尖,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體良哲,經(jīng)...
    沈念sama閱讀 44,110評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡盛卡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了筑凫。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片滑沧。...
    茶點(diǎn)故事閱讀 38,577評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖巍实,靈堂內(nèi)的尸體忽然破棺而出滓技,到底是詐尸還是另有隱情,我是刑警寧澤棚潦,帶...
    沈念sama閱讀 34,258評論 4 328
  • 正文 年R本政府宣布令漂,位于F島的核電站,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏叠必。R本人自食惡果不足惜荚孵,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,848評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望纬朝。 院中可真熱鬧收叶,春花似錦、人聲如沸共苛。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,726評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽隅茎。三九已至哆致,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間患膛,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,952評論 1 264
  • 我被黑心中介騙來泰國打工耻蛇, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留踪蹬,地道東北人。 一個月前我還...
    沈念sama閱讀 46,271評論 2 360
  • 正文 我出身青樓臣咖,卻偏偏與公主長得像跃捣,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子夺蛇,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,452評論 2 348

推薦閱讀更多精彩內(nèi)容