Semaphore源碼解讀

關(guān)鍵字:AQS店量、自旋大年、CAS换薄、LockSupport、CLH阻塞隊(duì)列

1. AQS

Semaphore的相關(guān)操作主要由其內(nèi)部成員變量sync完成翔试,sync有兩種轻要,分別是支持公平鎖的FairSync和不公平鎖的NonfairSync,兩種都是基于AQS擴(kuò)展而來(lái)垦缅。我們?cè)诼暶饕粋€(gè)信號(hào)量對(duì)象的時(shí)候冲泥,sync便在構(gòu)造函數(shù)里被初始化。這里先簡(jiǎn)單介紹以下AQS失都,后續(xù)會(huì)出一篇文章詳細(xì)解讀柏蘑。

AQS全名為AbstractQueuedSynchronizer,即抽象隊(duì)列同步器粹庞,是并發(fā)包作者Doug Lea為了解決在Java 1.5之前synchronized性能問(wèn)題而開(kāi)發(fā)的并發(fā)框架咳焚,主要實(shí)現(xiàn)有ReentrantLock,ReentrantReadWriteLock, CountDownLatch, Semaphore等庞溜,和synchronized對(duì)標(biāo)的便是ReentrantLock革半。

AQS內(nèi)維護(hù)了一個(gè)volatile類型的int 成員變量state,以及一個(gè)雙向CLH隊(duì)列流码,線程嘗試修改state屬性值又官,修改成功便表明成功獲取鎖,否則進(jìn)入CLH隊(duì)列并阻塞漫试,直到持有鎖的線程釋放六敬,并喚醒CLH隊(duì)列中的線程。

2. 初始化信號(hào)量

我們?cè)诔跏蓟疭emaphore的時(shí)候驾荣,便指定了state的值外构,表明可以獲取的最大信號(hào)量,線程嘗試獲取信號(hào)量即對(duì)state減去相應(yīng)的值播掷,修改成功便表明成功獲取信號(hào)量审编,否則進(jìn)入CLH隊(duì)列并阻塞,直到持有信號(hào)量的線程釋放歧匈,state加1垒酬,并喚醒CLH隊(duì)列中的全部線程。

sync由構(gòu)造函數(shù)進(jìn)行初始化


//構(gòu)造permits個(gè)數(shù)量的不公平鎖
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

//根據(jù)fair構(gòu)造permits個(gè)數(shù)量的公平&不公平鎖
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
    

3. 加鎖操作(獲取信號(hào)量)

Semaphore提供了8中常用的加鎖操作,可分為三大類勘究,即獲取一定數(shù)量的共享鎖&是否支持中斷&獲取不到是否阻塞矮湘,以下8中操作便是其兩兩組合。

//通過(guò)sync獲取共享鎖(可中斷)
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

//通過(guò)sync獲取共享鎖(不可中斷)
public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

//嘗試獲取1個(gè)共享鎖乱顾,獲取不到則立刻返回false板祝,不進(jìn)行阻塞    
public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}
    
//嘗試獲取1個(gè)共享鎖宫静,獲取不到則等待timeout時(shí)間后返回false  
public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
 
//嘗試獲取permits個(gè)共享鎖走净,獲取不到則立刻返回false,不進(jìn)行阻塞   
public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
}

//嘗試獲取permits個(gè)共享鎖孤里,獲取不到則等待timeout時(shí)間后返回false  
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

先來(lái)看一下獲取可中斷鎖


public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //線程是否被中斷,中斷則拋出中斷異常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 1.首先嘗試獲取共享鎖
    // 2.獲取成功則進(jìn)行相應(yīng)的業(yè)務(wù)邏輯,獲取失敗進(jìn)入阻塞隊(duì)列
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

獲取信號(hào)量鎖時(shí)又分為公平鎖和不公平鎖陵霉,以下分別是兩種鎖是如何獲取的


static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        //自旋
        for (;;) {
            //阻塞隊(duì)列中是否已經(jīng)有節(jié)點(diǎn)在等待貌夕,如有則直接返回獲取失敗
            //這個(gè)判斷就是和不公平鎖的區(qū)別,不公平鎖不管隊(duì)列中是否有節(jié)點(diǎn)等待虏等,上來(lái)就搶鎖
            if (hasQueuedPredecessors())
                return -1;
            //通過(guò)CAS設(shè)置state
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        //調(diào)用父方法
        /*
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
        */
        return nonfairTryAcquireShared(acquires);
    }
}

獲取鎖失敗請(qǐng)求入隊(duì)

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    //將當(dāng)前節(jié)點(diǎn)放入阻塞隊(duì)列
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        // 自旋
        for (;;) {
            //獲取當(dāng)前節(jié)點(diǎn)的上一節(jié)點(diǎn)
            final Node p = node.predecessor();
            //上一節(jié)點(diǎn)如果是隊(duì)頭
            if (p == head) {
                //再次嘗試獲取args數(shù)量的共享鎖弄唧,r為剩余的共享數(shù)量
                int r = tryAcquireShared(arg);
                //獲取成功
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //上一節(jié)點(diǎn)如果不是隊(duì)頭,即阻塞隊(duì)列中已經(jīng)有節(jié)點(diǎn)在等待或者是隊(duì)頭但獲取鎖失敗則執(zhí)行以下邏輯
            //1.將當(dāng)前節(jié)點(diǎn)的有效前驅(qū)節(jié)點(diǎn)標(biāo)示為可喚醒狀態(tài)
            //2.將當(dāng)前節(jié)點(diǎn)阻塞霍衫,等待被喚醒或中斷
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
    }
//獲取當(dāng)前節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn)
final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
        throw new NullPointerException();
    else
        return p;
}


/*
 *
 * pred 上一個(gè)節(jié)點(diǎn)
 * node 當(dāng)前節(jié)點(diǎn)
 * CANCELLED =  1;SIGNAL    = -1;CONDITION = -2;PROPAGATE = -3;
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //上一個(gè)節(jié)點(diǎn)已經(jīng)處于可喚醒狀態(tài)則直接返回
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
         //節(jié)點(diǎn)已失效候引,將失效節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)賦值為當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn),直到前驅(qū)節(jié)點(diǎn)不存在已經(jīng)取消的情況
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        //通過(guò)CAS將有效的前驅(qū)節(jié)點(diǎn)的狀態(tài)修改為可喚醒狀態(tài) 
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}


private static final boolean compareAndSetWaitStatus(Node node,
                                                     int expect,
                                                     int update) {
    return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                    expect, update);
}

private final boolean parkAndCheckInterrupt() {
    //線程阻塞在這里
    LockSupport.park(this);
    //線程被喚醒時(shí)從這里開(kāi)始執(zhí)行
    return Thread.interrupted();
}

現(xiàn)在來(lái)對(duì)比分析一下嘗試獲取(tryAcquire)敦跌、不可中斷澄干、最大嘗試時(shí)間分別是如何處理的

tryAcquire

//嘗試獲取,獲取不到直接返回了
public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

設(shè)置最大獲取時(shí)間

private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            //注意這里的隊(duì)頭只是一個(gè)虛擬節(jié)點(diǎn)柠傍,真正存放線程的節(jié)點(diǎn)為隊(duì)列的第二個(gè)節(jié)點(diǎn)麸俘,以下提到的隊(duì)頭同樣
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                //區(qū)別在這里,線程只會(huì)park一定時(shí)間惧笛,過(guò)期后再次嘗試獲取失敗便直接返回
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

不可中斷鎖


private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                //和可中斷鎖區(qū)別在這里从媚,可中斷這里直接拋出異常了,但是不可中斷鎖只是設(shè)置一個(gè)值便又去獲取了
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

4. 解鎖操作

//調(diào)用sync的releaseShared方法進(jìn)行解鎖患整,每次解鎖數(shù)量為1
public void release() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
//通過(guò)自旋+CAS將共享鎖的數(shù)量加回去
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        //CLH隊(duì)列不為空拜效,即有線程在等待獲取鎖
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                //喚醒頭節(jié)點(diǎn)的next節(jié)點(diǎn)
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        //剛才被喚醒的線程已將head設(shè)置為head的下一節(jié)點(diǎn),所以這里不會(huì)相等
        //所以這里一般會(huì)多喚醒一次并级,假如多喚醒的節(jié)點(diǎn)獲取到鎖拂檩,重復(fù)此邏輯,否則多喚醒的節(jié)點(diǎn)會(huì)繼續(xù)阻塞
        if (h == head)                   // loop if head changed
            break;
    }
}

這里再貼一下節(jié)點(diǎn)被喚醒時(shí)的邏輯

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
            //線程被喚醒后從這里再次執(zhí)行    
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末嘲碧,一起剝皮案震驚了整個(gè)濱河市稻励,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖望抽,帶你破解...
    沈念sama閱讀 219,110評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件加矛,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡煤篙,警方通過(guò)查閱死者的電腦和手機(jī)斟览,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,443評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)辑奈,“玉大人苛茂,你說(shuō)我怎么就攤上這事○埃” “怎么了妓羊?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,474評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)稍计。 經(jīng)常有香客問(wèn)我躁绸,道長(zhǎng),這世上最難降的妖魔是什么臣嚣? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,881評(píng)論 1 295
  • 正文 為了忘掉前任净刮,我火速辦了婚禮,結(jié)果婚禮上硅则,老公的妹妹穿的比我還像新娘淹父。我一直安慰自己,他們只是感情好抢埋,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,902評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布弹灭。 她就那樣靜靜地躺著,像睡著了一般揪垄。 火紅的嫁衣襯著肌膚如雪穷吮。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,698評(píng)論 1 305
  • 那天饥努,我揣著相機(jī)與錄音捡鱼,去河邊找鬼。 笑死酷愧,一個(gè)胖子當(dāng)著我的面吹牛驾诈,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播溶浴,決...
    沈念sama閱讀 40,418評(píng)論 3 419
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼乍迄,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了士败?” 一聲冷哼從身側(cè)響起闯两,我...
    開(kāi)封第一講書(shū)人閱讀 39,332評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤褥伴,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后漾狼,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體重慢,經(jīng)...
    沈念sama閱讀 45,796評(píng)論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,968評(píng)論 3 337
  • 正文 我和宋清朗相戀三年逊躁,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了似踱。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,110評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡稽煤,死狀恐怖核芽,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情念脯,我是刑警寧澤狞洋,帶...
    沈念sama閱讀 35,792評(píng)論 5 346
  • 正文 年R本政府宣布弯淘,位于F島的核電站绿店,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏庐橙。R本人自食惡果不足惜假勿,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,455評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望态鳖。 院中可真熱鬧转培,春花似錦、人聲如沸浆竭。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,003評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)邦泄。三九已至删窒,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間顺囊,已是汗流浹背肌索。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,130評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留特碳,地道東北人诚亚。 一個(gè)月前我還...
    沈念sama閱讀 48,348評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像午乓,于是被迫代替她去往敵國(guó)和親站宗。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,047評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容