(六)手撕并發(fā)編程之基于Semaphore與CountDownLatch分析AQS共享模式實(shí)現(xiàn)

引言

在上篇文章深入剖析并發(fā)之AQS獨(dú)占鎖&重入鎖(ReetrantLock)及Condition實(shí)現(xiàn)原理中我們曾基于ReetrantLock鎖分析了AQS獨(dú)占模式的實(shí)現(xiàn)原理,本章則準(zhǔn)備從Semaphore信號量的角度出發(fā)一探AQS共享模式的具體實(shí)現(xiàn)忍坷。共享模式與獨(dú)占模式區(qū)別在于:共享模式下允許多條線程同時(shí)獲取鎖資源佩研,而在之前分析的獨(dú)占模式中,在同一時(shí)刻只允許一條線程持有鎖資源晰骑。

一硕舆、快速認(rèn)識Semaphore信號量及實(shí)戰(zhàn)

Semaphore信號量是java.util.concurrent(JUC)包下的一個(gè)并發(fā)工具類抚官,可以用來控制同一時(shí)刻訪問臨界資源(共享資源)的線程數(shù)耗式,以確保訪問臨界資源的線程能夠正確、合理的使用公共資源彪见。而其內(nèi)部則于ReetrantLock一樣余指,都是通過直接或間接的調(diào)用AQS框架的方法實(shí)現(xiàn)酵镜。在Semaphore中存在一個(gè)“許可”的概念:

在初始化Semaphore信號量需要為這個(gè)許可傳入一個(gè)數(shù)值,該數(shù)值表示表示同一時(shí)刻可以訪問臨界資源的最大線程數(shù)垢粮,也被稱為許可集蜡吧。一條線程想要訪問臨界資源則需要先執(zhí)行acquire()獲取一個(gè)許可昔善,如果線程在獲取時(shí)許可集已經(jīng)被分配完了君仆,那么該線程則會進(jìn)入阻塞等待狀態(tài),直至有其他持有許可的線程釋放后才有可能獲取到許可钥庇。當(dāng)線程訪問完成臨界資源后則需要執(zhí)行release()方法釋放已獲取的許可上沐。

其實(shí)通過如上這段描述参咙,我們不難發(fā)現(xiàn)蕴侧,Semaphore信號量里面的“許可”概念與前面我們文章中净宵,分析的互斥鎖中的“同步狀態(tài)標(biāo)識”有著異曲同工之妙择葡,其實(shí)也就是我們所談的“鎖資源”剃氧。下面我們可以簡單看看Semaphore類中所提供的方法:

// 調(diào)用該方法后線程會從許可集中嘗試獲取一個(gè)許可
public void acquire()

// 線程調(diào)用該方法時(shí)會釋放已獲取的許可
public void release()

// Semaphore構(gòu)造方法:permits→許可集數(shù)量
Semaphore(int permits) 

// Semaphore構(gòu)造方法:permits→許可集數(shù)量朋鞍,fair→公平與非公平
Semaphore(int permits, boolean fair) 

// 從信號量中獲取許可滥酥,該方法不響應(yīng)中斷
void acquireUninterruptibly() 

// 返回當(dāng)前信號量中未被獲取的許可數(shù)
int availablePermits() 

// 獲取并返回當(dāng)前信號量中立即未被獲取的所有許可
int drainPermits() 

// 返回等待獲取許可的所有線程Collection集合
protected  Collection<Thread> getQueuedThreads();

// 返回等待獲取許可的線程估計(jì)數(shù)量
int getQueueLength() 

// 查詢是否有線程正在等待獲取當(dāng)前信號量中的許可
boolean hasQueuedThreads() 

// 返回當(dāng)前信號量的公平類型坎吻,如為公平鎖返回true禾怠,非公平鎖為false
boolean isFair() 

// 獲取當(dāng)前信號量中一個(gè)許可,當(dāng)沒有許可可用時(shí)直接返回false不阻塞線程
boolean tryAcquire() 

// 在給定時(shí)間內(nèi)獲取當(dāng)前信號量中一個(gè)許可芽偏,超時(shí)還未獲取成功則返回false
boolean tryAcquire(long timeout, TimeUnit unit) 

如上便是Semaphore信號量提供的一些主要方法污尉,下面我們可以上個(gè)簡單小案例演示被碗,需求如下:

現(xiàn)在項(xiàng)目中有個(gè)需求,每晚需要長時(shí)間處理大量的Excel表數(shù)據(jù)與數(shù)據(jù)庫中數(shù)據(jù)對賬請求兴喂,由于文件讀取屬于IO密集型任務(wù)衣迷,我們可以使用多線程的方式優(yōu)化壶谒,加速處理速度汗菜。但是在該項(xiàng)目中因?yàn)檫€有其他業(yè)務(wù)要處理陨界,為了保證整體性能痛阻,所以對于該業(yè)務(wù)的實(shí)現(xiàn)最多只能使用三個(gè)數(shù)據(jù)庫連接對象录平。因?yàn)槿绻?dāng)前業(yè)務(wù)線程同一時(shí)刻獲取的數(shù)據(jù)庫連接數(shù)量過多斗这,會導(dǎo)致其他業(yè)務(wù)線程需要操作數(shù)據(jù)庫時(shí)獲取不到連接對象阻塞(因?yàn)閿?shù)據(jù)庫連接對象與線程對象一樣數(shù)據(jù)珍惜資源/資源有限)表箭,從而引發(fā)整體程序堆積大量客戶端請求導(dǎo)致系統(tǒng)整體癱瘓免钻。這時(shí)我們就需要控制同一時(shí)刻最多只有三條線程拿到數(shù)據(jù)庫連接進(jìn)行操作极舔,此時(shí)就可以使用Semaphore做流量控制拆魏。

public class SemaphoreDemo {
    public static void main(String[] args) {
        // 自定義線程池(后續(xù)文章會詳細(xì)分析到)
        // 環(huán)境:四核四線程CPU 任務(wù)阻塞系數(shù)0.9
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                4*2, 40,
                30, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1024*10),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        // 設(shè)置信號量同一時(shí)刻最大線程數(shù)為3
        final Semaphore semaphore = new Semaphore(3);
        // 模擬100個(gè)對賬請求
        for (int index = 0; index < 100; index++) {
            final int serial = index;
            threadPool.execute(()->{
                try {
                    // 使用acquire()獲取許可
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() +
                            "線程成功獲取許可!請求序號: " + serial);
                    // 模擬數(shù)據(jù)庫IO
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }  finally {
                    // 臨界資源訪問結(jié)束后釋放許可
                    semaphore.release();
                }
            });
        }
        // 關(guān)閉線程池資源
        threadPool.shutdown();
    }
}

上述代碼中拥峦,在創(chuàng)建Semaphore信號量對象時(shí)為該對象初始化了三個(gè)許可略号,也就意味著在同一時(shí)刻允許三條線程同時(shí)訪問臨界資源玄柠。線程在訪問臨界資源之前随闪,需要使用acquire()先成功獲取一個(gè)許可铐伴,才能訪問臨界資源当宴。如果一條線程獲取許可泽疆,該信號量對象的許可集已經(jīng)被分配完時(shí)殉疼,新來的線程需進(jìn)入等待狀態(tài)瓢娜。之前獲取許可成功的線程在操作完成之后需執(zhí)行release()方法釋放已獲取的許可眠砾。我們執(zhí)行如上案例則可看到執(zhí)行結(jié)果幾乎每隔一千毫秒會出現(xiàn)三條線程同時(shí)訪問,如下:

/*
第一秒:程序運(yùn)行初
  pool-1-thread-1線程成功獲取許可柒巫!請求序號: 0
  pool-1-thread-2線程成功獲取許可堡掏!請求序號: 1
  pool-1-thread-3線程成功獲取許可布疼!請求序號: 2
第二秒:程序運(yùn)行1000ms后
  pool-1-thread-4線程成功獲取許可!請求序號: 3
  pool-1-thread-5線程成功獲取許可砾层!請求序號: 4
  pool-1-thread-6線程成功獲取許可肛炮!請求序號: 5
第三秒:程序運(yùn)行2000ms后
  pool-1-thread-7線程成功獲取許可侨糟!請求序號: 6
  pool-1-thread-8線程成功獲取許可秕重!請求序號: 7
  pool-1-thread-2線程成功獲取許可溶耘!請求序號: 8
第四秒:程序運(yùn)行3000ms后
  ........
*/

如上便是一個(gè)簡單使用Demo服鹅,總體看來關(guān)于Semaphore信號量的用法還是比較簡單的企软。不過我們也在前面提到過這么一句話:

Semaphore信號量里的“許可”概念與前面我們文章中分析的互斥鎖的“同步狀態(tài)標(biāo)識”有著異曲同工之妙仗哨。

那我們能否使用Semaphore信號量實(shí)現(xiàn)一把獨(dú)占鎖呢厌漂?答案也是肯定的桩卵,可以。我們只需要在創(chuàng)建信號量對象時(shí)高职,只給許可集分配一個(gè)數(shù)量即可,如下:

final Semaphore semaphore = new Semaphore(1);

二寥粹、Semaphore信號量中AQS的共享模式實(shí)現(xiàn)

Semaphore信號量其實(shí)與我們上篇文章所分析的ReetrantLock類結(jié)構(gòu)大致相同,其內(nèi)部存在繼承自AbstractQueuedSynchronizer內(nèi)部Sync類以及它的兩個(gè)子類:FairSync公平鎖類和NofairSync非公平鎖類媚狰,從這我們也可以看出崭孤,Semaphore的內(nèi)部實(shí)現(xiàn)其實(shí)與ReetrantLock一樣都是基于AQS組件實(shí)現(xiàn)的辨宠。在上一篇文章中我們也曾提到货裹,AQS設(shè)計(jì)的初衷并不打算直接作為調(diào)用類對外暴露服務(wù)弧圆,而只是作為并發(fā)包基礎(chǔ)組件墓阀,為其他并發(fā)工具類提供基礎(chǔ)設(shè)施,如維護(hù)同步隊(duì)列经伙、控制/修改同步狀態(tài)等帕膜。具體的獲取鎖和釋放鎖的邏輯則交給子類自己去實(shí)現(xiàn)溢十,從而也能最大程度的保留框架的靈活性张弛。因此無論是Semaphore還是ReetrantLock都需要獨(dú)自實(shí)現(xiàn)tryAcquireShared(int arg)獲取鎖方法以及tryReleaseShared(int arg)釋放鎖方法吞鸭。AQS總體類圖關(guān)系如下:

AQS DML類圖結(jié)構(gòu)

如上圖刻剥,Semaphore與ReetrantLock的結(jié)構(gòu)大致相同造虏,而實(shí)現(xiàn)思路也大致相同,獲取鎖(許可)的方法tryAcquireShared(arg)分別由兩個(gè)子類FairSync和NofairSync實(shí)現(xiàn)陶珠,因?yàn)楣芥i和非公平鎖的加鎖方式畢竟存在些許不同背率,而釋放鎖tryReleaseShared(arg)的邏輯則交由Sync實(shí)現(xiàn)寝姿,因?yàn)獒尫挪僮鞫际窍嗤模虼朔旁诟割怱ync中實(shí)現(xiàn)自然是最好的方式埃篓。下面我們就從Semaphore源碼的角度分析AQS共享模式的具體實(shí)現(xiàn)原理架专,我們先從非公平鎖的獲取鎖實(shí)現(xiàn)開始部脚。

2.1委刘、AQS共享模式之Semaphore的NofairSync非公平鎖實(shí)現(xiàn)

我們在創(chuàng)建Semaphore對象時(shí)鹰椒,也和ReetrantLock一樣手動(dòng)選擇公平鎖和非公平鎖:

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

通過Semaphore的構(gòu)造函數(shù)我們不難發(fā)現(xiàn)淆珊,如果在我們創(chuàng)建時(shí)不選擇聲明公平類型奸汇,Semaphore默認(rèn)創(chuàng)建的是非公平鎖類型擂找,NonfairSync構(gòu)造如下:

static final class NonfairSync extends Sync {
    // 構(gòu)造函數(shù):將給定的許可數(shù)permits傳給父類同步狀態(tài)標(biāo)識state
    NonfairSync(int permits) {
          super(permits);
    }
   // 釋放鎖的方法實(shí)現(xiàn)則是直接調(diào)用父類Sync的釋放鎖方法
   protected int tryAcquireShared(int acquires) {
       return nonfairTryAcquireShared(acquires);
   }
}

從如上源碼中婴洼,我們可以得知Semaphore中的非公平鎖NonfairSync類的構(gòu)造函數(shù)是基于調(diào)用父類Sync構(gòu)造函數(shù)完成的柬采,而在創(chuàng)建Semaphore對象時(shí)傳入的許可數(shù)permits最終則會傳遞給AQS同步器的同步狀態(tài)標(biāo)識state,如下:

// 父類 - Sync類構(gòu)造函數(shù)
Sync(int permits) {
    setState(permits); // 調(diào)用AQS內(nèi)部的set方法
}

// AQS(AbstractQueuedSynchronizer)同步器
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer {
    // 同步狀態(tài)標(biāo)識
    private volatile int state;
    
    protected final int getState() {
        return state;
    }
    protected final void setState(int newState) {
        state = newState;
    }
    // 對state變量進(jìn)行CAS操作
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
}

從上述分析中可知礁遣,Semaphore對象創(chuàng)建時(shí)傳入的許可數(shù)permits祟霍,實(shí)則其實(shí)最終是在對AQS內(nèi)部的state進(jìn)行初始化沸呐。初始化完成后崭添,state代表著當(dāng)前信號量對象的可用許可數(shù)呼渣。

2.1.1屁置、信號量中非公平鎖NonfairSync獲取許可/鎖實(shí)現(xiàn)

我們在使用Semaphore時(shí)獲取鎖是調(diào)用Semaphore.acquire()方法仁连,怖糊,調(diào)用該方法的線程會開始獲取鎖/許可伍伤,嘗試對permits/state進(jìn)行CAS加一,CAS成功則代表獲取成功麦乞。下面我們來分析一下Semaphore獲取許可的方法acquire()的具體實(shí)現(xiàn)姐直,源碼如下:

// Semaphore類 → acquire()方法
public void acquire() throws InterruptedException {
      // Sync類繼承AQS声畏,此處直接調(diào)用AQS內(nèi)部的acquireSharedInterruptibly()方法
      sync.acquireSharedInterruptibly(1);
  }

// AbstractQueuedSynchronizer類 → acquireSharedInterruptibly()方法
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // 判斷是否出現(xiàn)線程中斷信號(標(biāo)志)
    if (Thread.interrupted())
        throw new InterruptedException();
    // 如果tryAcquireShared(arg)執(zhí)行結(jié)果不小于0愿棋,則線程獲取同步狀態(tài)成功
    if (tryAcquireShared(arg) < 0)
        // 未獲取成功加入同步隊(duì)列阻塞等待
        doAcquireSharedInterruptibly(arg);
}

信號量獲取許可的方法acquire()最終是通過Sync對象調(diào)用AQS內(nèi)部的acquireSharedInterruptibly()方法完成的糠雨,而acquireSharedInterruptibly()在獲取同步狀態(tài)標(biāo)識的過程中是可以響應(yīng)線程中斷操作的甘邀,如果該操作沒有沒中斷垮庐,則首先調(diào)用tryAcquireShared(arg)嘗試獲取一個(gè)許可數(shù)突硝,獲取成功則返回執(zhí)行業(yè)務(wù)解恰,方法結(jié)束护盈。如果獲取失敗,則調(diào)用doAcquireSharedInterruptibly(arg)將當(dāng)前線程加入同步隊(duì)列阻塞等待紊服。不過值得我們注意的是:tryAcquireShared(arg)方法是AQS提供的模板方法欺嗤,并沒有提供具體實(shí)現(xiàn)煎饼,而是把具體實(shí)現(xiàn)的邏輯交由子類完成吆玖,我們先看看信號量中非公平鎖NonfairSync類的實(shí)現(xiàn):

    // Semaphore類 → NofairSync內(nèi)部類 → tryAcquireShared()方法
protected int tryAcquireShared(int acquires) {
    // 調(diào)用了父類Sync中的實(shí)現(xiàn)方法
    return nonfairTryAcquireShared(acquires);
}

// Syn類 → nonfairTryAcquireShared()方法
abstract static class Sync extends AbstractQueuedSynchronizer {
    final int nonfairTryAcquireShared(int acquires) {
         // 開啟自旋死循環(huán)
         for (;;) {
             int available = getState();
             int remaining = available - acquires;
             // 判斷信號量中可用許可數(shù)是否已<0或者CAS執(zhí)行是否成功
             if (remaining < 0 ||
                 compareAndSetState(available, remaining))
                 return remaining;
         }
     }
}

nonfairTryAcquireShared(acquires)方法首先獲取到state值后马篮,減去一得到remaining值沾乘,如果不小于0則代表著當(dāng)前信號量中還存在可用許可,當(dāng)前線程開始嘗試cas更新state值浑测,cas成功則代表獲取同步狀態(tài)成功翅阵,返回remaining值。反之,如果remaining值小于0則代表著信號量中的許可數(shù)已被其他線程獲取掷匠,目前不存在可用許可數(shù)槐雾,直接返回小于0的remaining值,nonfairTryAcquireShared(acquires)方法執(zhí)行結(jié)束擎值,回到AQS的acquireSharedInterruptibly()方法鸠儿。當(dāng)返回的remaining值小于0時(shí),if(tryAcquireShared(arg)<0)條件成立田晚,進(jìn)入if執(zhí)行doAcquireSharedInterruptibly(arg)方法將當(dāng)前線程加入同步隊(duì)列阻塞贤徒,等待其他線程釋放同步狀態(tài)。線程入列方法如下:

// AbstractQueuedSynchronizer類 → doAcquireSharedInterruptibly()方法
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // 創(chuàng)建節(jié)點(diǎn)狀態(tài)為Node.SHARED共享模式的節(jié)點(diǎn)并將其加入同步隊(duì)列
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
     // 開啟自旋操作
     for (;;) {
         final Node p = node.predecessor();
         // 判斷前驅(qū)節(jié)點(diǎn)是否為head
         if (p == head) {
             // 嘗試獲取同步狀態(tài)state
             int r = tryAcquireShared(arg);
             // 如果r不小于0說明獲取同步狀態(tài)成功
             if (r >= 0) {
                 // 將當(dāng)前線程結(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn)并喚醒后繼節(jié)點(diǎn)線程             
                 setHeadAndPropagate(node, r);
                 p.next = null; // 置空方便GC
                 failed = false;
                 return;
             }
         }
       // 調(diào)整同步隊(duì)列中node節(jié)點(diǎn)的狀態(tài)并判斷是否應(yīng)該被掛起 
       // 并判斷是否存在中斷信號序宦,如果需要中斷直接拋出異常結(jié)束執(zhí)行
         if (shouldParkAfterFailedAcquire(p, node) &&
             parkAndCheckInterrupt())
             throw new InterruptedException();
     }
    } finally {
     if (failed)
         // 結(jié)束該節(jié)點(diǎn)線程的請求
         cancelAcquire(node);
    }
}

// AbstractQueuedSynchronizer類 → setHeadAndPropagate()方法
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // 獲取同步隊(duì)列中原本的head頭節(jié)點(diǎn)
    setHead(node); // 將傳入的node節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn)
    /*
     * propagate=剩余可用許可數(shù),h=舊的head節(jié)點(diǎn)
     * h==null,(h=head)==null:
     *      非空判斷的標(biāo)準(zhǔn)寫法疫剃,避免原本head以及新的頭節(jié)點(diǎn)node為空
     * 如果當(dāng)前信號量對象中剩余可用許可數(shù)大于0或者
     * 原本頭節(jié)點(diǎn)h或者新的頭節(jié)點(diǎn)node不是結(jié)束狀態(tài)則喚醒后繼節(jié)點(diǎn)線程
     * 
     * 寫兩個(gè)if的原因在于避免造成不必要的喚醒,因?yàn)楹苡锌赡軉拘蚜撕罄m(xù)
     * 節(jié)點(diǎn)的線程之后壤躲,還沒有線程釋放許可/鎖凌唬,從而導(dǎo)致再次陷入阻塞
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 避免傳入的node為同步隊(duì)列的唯一節(jié)點(diǎn),
        // 因?yàn)殛?duì)列中如果只存在node一個(gè)節(jié)點(diǎn)更耻,那么后驅(qū)節(jié)點(diǎn)s必然為空
        if (s == null || s.isShared())
            doReleaseShared(); // 喚醒后繼節(jié)點(diǎn)
    }
}

doAcquireSharedInterruptibly(arg)方法中總共做了三件事:

  • 一秧均、創(chuàng)建一個(gè)狀態(tài)為Node.SHARED共享模式的節(jié)點(diǎn),并通過addWaiter()加入隊(duì)列
  • 二讶隐、加入成功后開啟自旋巫延,判斷前驅(qū)節(jié)點(diǎn)是否為head,是則嘗試獲取同步狀態(tài)標(biāo)識疼阔,獲取成功后婆廊,將自己設(shè)置為head節(jié)點(diǎn),如果可用許可數(shù)大于0則喚醒后繼節(jié)點(diǎn)的線程
  • 三宾舅、如果前驅(qū)節(jié)點(diǎn)不為head的節(jié)點(diǎn)以及前驅(qū)節(jié)點(diǎn)為head節(jié)點(diǎn)但獲取同步狀態(tài)失敗的節(jié)點(diǎn)扶平,則調(diào)用shouldParkAfterFailedAcquire(p,node)判斷前驅(qū)節(jié)點(diǎn)的狀態(tài)是否為SIGNAL狀態(tài)(一般shouldParkAfterFailedAcquire(p,node)中的for循環(huán)至少需要執(zhí)行兩次以上才會返回ture结澄,第一次把前驅(qū)節(jié)點(diǎn)設(shè)置為SIGNAL狀態(tài),第二次檢測到SIGNAL狀態(tài)),如果是則調(diào)用parkAndCheckInterrupt()掛起當(dāng)前線程并返回線程中斷狀態(tài)

如上便是doAcquireSharedInterruptibly(arg)方法的大概工作破镰,接下來我們可以看看shouldParkAfterFailedAcquire()以及parkAndCheckInterrupt()方法:

// AbstractQueuedSynchronizer類 → shouldParkAfterFailedAcquire()方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 獲取當(dāng)前節(jié)點(diǎn)的等待狀態(tài)
    int ws = pred.waitStatus;
    // 如果為等待喚醒(SIGNAL)狀態(tài)則返回true
    if (ws == Node.SIGNAL)
        return true;
    // 如果當(dāng)前節(jié)點(diǎn)等待狀態(tài)大于0則說明是結(jié)束狀態(tài),
    // 遍歷前驅(qū)節(jié)點(diǎn)直到找到?jīng)]有結(jié)束狀態(tài)的節(jié)點(diǎn)
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 如果當(dāng)前節(jié)點(diǎn)等待狀態(tài)小于0又不是SIGNAL狀態(tài)孕似,
        // 則將其設(shè)置為SIGNAL狀態(tài),代表該節(jié)點(diǎn)的線程正在等待喚醒
        // 也就是代表節(jié)點(diǎn)是剛從Condition的條件等待隊(duì)列轉(zhuǎn)移到同步隊(duì)列泛烙,
        // 節(jié)點(diǎn)狀態(tài)為CONDITION狀態(tài)(Semaphore中不存在condition的概念蔽氨,
        // 所以同步隊(duì)列不會出現(xiàn)這個(gè)狀態(tài)的節(jié)點(diǎn),此處代碼不會執(zhí)行)
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

// AbstractQueuedSynchronizer類 → parkAndCheckInterrupt()方法
private final boolean parkAndCheckInterrupt() {
    // 將當(dāng)前線程掛起
    LockSupport.park(this);
    // 獲取線程中斷狀態(tài),interrupted()是判斷當(dāng)前中斷狀態(tài)自赔,
    // 而并不是中斷線程蟋滴,線程需要中斷返回true津函,反之false
    return Thread.interrupted();
}

LockSupport → park()方法
public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    // 設(shè)置當(dāng)前線程的監(jiān)視器blocker
    setBlocker(t, blocker);
    // 調(diào)用了native方法到JVM級別的阻塞機(jī)制阻塞當(dāng)前線程
    UNSAFE.park(false, 0L);
    // 阻塞結(jié)束后把blocker置空
    setBlocker(t, null);
}

shouldParkAfterFailedAcquire()方法的作用是判斷節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是否為等待喚醒狀態(tài)(SIGNAL狀態(tài)),如果是則返回true允坚。如果前驅(qū)節(jié)點(diǎn)的waitStatus大于0(只有CANCELLED結(jié)束狀態(tài)=1>0)稠项,既代表該前驅(qū)節(jié)點(diǎn)已沒有用了展运,應(yīng)該從同步隊(duì)列移除拗胜,執(zhí)行do/while循環(huán)遍歷所有前前驅(qū)節(jié)點(diǎn),直到尋找到非CANCELLED結(jié)束狀態(tài)的節(jié)點(diǎn)勘畔。如果節(jié)點(diǎn)狀態(tài)為SIGNAL等待喚醒狀態(tài)則直接調(diào)用parkAndCheckInterrupt()掛起當(dāng)前線程。至此整個(gè)Semaphore.acquire()獲取許可的方法流程結(jié)束诉字。如下圖:

AQS之圖解共享式獲取鎖過程

如上圖琅轧,在AQS同步器中存在一個(gè)變量state冲杀,Semaphore信號量對象在初始化時(shí)傳遞的permits許可數(shù)會間接的賦值給AQS中的state同步標(biāo)識剩檀,而permits/state則代表著同一時(shí)刻可同時(shí)訪問臨界/共享資源的最大線程數(shù)。當(dāng)一條線程調(diào)用Semaphore.acquire()獲取許可時(shí)运嗜,會首先判斷state是否大于0,如果大于則代表還有可用許可數(shù)奋救,state減1冗荸,線程獲取成功并返回執(zhí)行盔粹。直到state為零時(shí),代表著當(dāng)前信號量已經(jīng)不存在可用許可數(shù)了进萄,后續(xù)請求的線程則需要封裝成Node節(jié)點(diǎn)并將其加入同步隊(duì)列開啟自旋操作直至有線程釋放許可(state加一)。

至此援雇,AQS共享模式中非公平鎖的獲取鎖原理分析完畢具温。但是我們?nèi)缟戏治龅氖强身憫?yīng)線程中斷請求的獲取許可方式,而Semaphore中也實(shí)現(xiàn)了一套不可中斷式的獲取方法剂习,如下:

// Semaphore類 → acquireUninterruptibly()方法
public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

// AbstractQueuedSynchronizer類 → acquireShared()方法
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

// AbstractQueuedSynchronizer類 → doAcquireShared()方法
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 在前面的可中斷式獲取鎖方法中此處是直接拋出異常強(qiáng)制中斷線程的
                // 而在不可中斷式的獲取方法中尸曼,這里是沒有拋出異常中斷線程的
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

觀察如上源碼不難發(fā)現(xiàn)冤竹,可響應(yīng)線程中斷的方法與不可響應(yīng)線程中斷的方法區(qū)別在于:

可響應(yīng)線程中斷的方法在每次操作之前會先檢測線程中斷信號,如果線程需要中斷操作,則直接拋出異常強(qiáng)制中斷線程的執(zhí)行肠阱。反之,不可響應(yīng)線程中斷的方法不會檢測線程中斷信號噪伊,而且不會拋出異常強(qiáng)制中斷。

2.1.2拙寡、信號量中非公平鎖NonfairSync釋放許可/鎖實(shí)現(xiàn)

使用Semaphore時(shí)釋放鎖則調(diào)用的是Semaphore.release()方法般堆,調(diào)用該方法之后線程持有的許可會被釋放,同時(shí)permits/state加一和橙,接下來Semaphore獲取許可的方法release()的具體實(shí)現(xiàn),源碼如下:

// Semaphore類 → release()方法
public void release() {
    sync.releaseShared(1);
}

// AbstractQueuedSynchronizer類 → releaseShared(arg)方法
public final boolean releaseShared(int arg) {
    // 調(diào)用子類Semaphore中tryReleaseShared()方法實(shí)現(xiàn)
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

與之前獲取許可的方法一樣,Semaphore釋放許可的方法release()也是通過間接調(diào)用AQS內(nèi)部的releaseShared(arg)完成乡翅。因?yàn)锳QS的releaseShared(arg)是魔法方法,所以最終的邏輯實(shí)現(xiàn)由Semaphore的子類Sync完成靶累,如下:

// Semaphore類 → Sync子類 → tryReleaseShared()方法
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        // 獲取AQS中當(dāng)前同步狀態(tài)state值
        int current = getState();
        // 對當(dāng)前的state值進(jìn)行增加操作
        int next = current + releases;
        // 不可能出現(xiàn)拴曲,除非傳入的releases為負(fù)數(shù)
        if (next < current) 
            throw new Error("Maximum permit count exceeded");
        // CAS更新state值為增加之后的next值
        if (compareAndSetState(current, next))
            return true;
    }
}

釋放鎖/許可的方法邏輯相對來說比較簡單,對AQS中的state加一釋放獲取的同步狀態(tài)委乌。不過值得注意的是:在我們上篇文章分享的AQS獨(dú)占模式實(shí)現(xiàn)中心软,釋放鎖的邏輯中是沒有保證線程安全的耳贬,因?yàn)楠?dú)占模式的釋放鎖邏輯永遠(yuǎn)只會存在一條線程同時(shí)操作顷蟆。而在共享模式中,可能會存在多條線程同時(shí)釋放許可/鎖資源肮街,所以在此處使用了CAS+自旋的方式保證線程安全問題。

如果此處tryReleaseShared(releases)CAS更新成功,那么則會進(jìn)入if(tryReleaseShared(arg))中執(zhí)行doReleaseShared();喚醒后繼節(jié)點(diǎn)線程仪际。

// AbstractQueuedSynchronizer類 → doReleaseShared()方法
private void doReleaseShared() {
    /*
     * 為了防止釋放過程中有其他線程進(jìn)入隊(duì)列,這里必須開啟自旋
     * 如果頭節(jié)點(diǎn)設(shè)置失敗則重新檢測繼續(xù)循環(huán)
     */
    for (;;) {
        // 獲取隊(duì)列head頭節(jié)點(diǎn)
        Node h = head; 
        // 如果頭節(jié)點(diǎn)不為空并且隊(duì)列中還存在其他節(jié)點(diǎn)
        if (h != null && h != tail) { 
            // 獲取頭節(jié)點(diǎn)的節(jié)點(diǎn)狀態(tài)
            int ws = h.waitStatus; 
            // 如果節(jié)點(diǎn)狀態(tài)為SIGNAL等待喚醒狀態(tài)則代表
            if (ws == Node.SIGNAL) { 
                // 嘗試cas修改節(jié)點(diǎn)狀態(tài)值為0
                // 失敗則繼續(xù)下次循環(huán)
                // 成功則喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;         
                unparkSuccessor(h);  // 喚醒后繼節(jié)點(diǎn)線程
            }
            // 節(jié)點(diǎn)狀態(tài)為0時(shí)嘗試將節(jié)點(diǎn)狀態(tài)修改為PROPAGATE傳播狀態(tài)
            // 失敗則跳出循環(huán)繼續(xù)下次循環(huán)
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;               
        }
        // 如果當(dāng)前隊(duì)列頭節(jié)點(diǎn)發(fā)生變化繼續(xù)循環(huán),反之則終止自旋
        if (h == head)
            break;
    }
}
// AbstractQueuedSynchronizer類 → unparkSuccessor()方法
// 參數(shù):傳入需要喚醒后繼節(jié)點(diǎn)的節(jié)點(diǎn)
private void unparkSuccessor(Node node) {
    // 獲取node節(jié)點(diǎn)的線程狀態(tài)
    int ws = node.waitStatus;
    if (ws < 0)
        // 設(shè)置head節(jié)點(diǎn)為0
        compareAndSetWaitStatus(node, ws, 0);
    // 獲取后繼節(jié)點(diǎn)
    Node s = node.next;
    // 如果后繼節(jié)點(diǎn)為空或線程狀態(tài)已經(jīng)結(jié)束
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 遍歷整個(gè)隊(duì)列拿到可喚醒的節(jié)點(diǎn)
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 喚醒后繼節(jié)點(diǎn)線程
        LockSupport.unpark(s.thread);
}

doReleaseShared()方法直接在執(zhí)行中獲取head節(jié)點(diǎn),通過調(diào)用unparkSuccessor()方法喚醒head后繼節(jié)點(diǎn)中的線程挣输。而因?yàn)樵摲椒w的邏輯是一個(gè)for(;;){}死循環(huán),退出的條件為:只當(dāng)隊(duì)頭不發(fā)生改變時(shí)才退出向瓷,如果發(fā)生改變了則代表著一定有其他線程在當(dāng)前線程釋放鎖/許可的過程中獲取到了鎖猖任,所以當(dāng)前循環(huán)會繼續(xù),在第二次循環(huán)過程中长搀,在滿足條件h.waitStauts==0的情況下彻况,這里會把head節(jié)點(diǎn)的waitStauts設(shè)置為Node.PROPAGATE傳播狀態(tài)是為了保證喚醒傳遞良蛮。因?yàn)锳QS共享模式下是會出現(xiàn)多個(gè)線程同時(shí)對同步狀態(tài)標(biāo)識state進(jìn)行操作,如線程T1在執(zhí)行release()→doReleaseShared()釋放許可操作皮胡,剛喚醒后繼線程準(zhǔn)備替換為head頭節(jié)點(diǎn)(準(zhǔn)備替換但是還沒替換),此時(shí)另外一條線程T2正好在同一時(shí)刻執(zhí)行acquire()→doAcquireShared()→setHeadAndPropagate()獲取鎖操作烹笔,假設(shè)T2線程獲取的是最后一個(gè)可用許可饰豺,在執(zhí)行到setHeadAndPropagate()方法(這個(gè)方法中存在一個(gè)超長判斷)饶套,傳入的propagate=0

// AbstractQueuedSynchronizer類 → setHeadAndPropagate()方法
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // 獲取同步隊(duì)列中原本的head頭節(jié)點(diǎn)
    setHead(node); // 將傳入的node節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn)
    /*
     * propagate=剩余可用許可數(shù),h=舊的head節(jié)點(diǎn)
     * h==null,(h=head)==null:
     *      非空判斷的標(biāo)準(zhǔn)寫法,避免原本head以及新的頭節(jié)點(diǎn)node為空
     * 如果當(dāng)前信號量對象中剩余可用許可數(shù)大于0或者
     * 原本頭節(jié)點(diǎn)h或者新的頭節(jié)點(diǎn)node不是結(jié)束狀態(tài)則喚醒后繼節(jié)點(diǎn)線程
     * 
     * 寫兩個(gè)if的原因在于避免造成不必要的喚醒构挤,因?yàn)楹苡锌赡軉拘蚜撕罄m(xù)
     * 節(jié)點(diǎn)的線程之后,還沒有線程釋放許可/鎖矾飞,從而導(dǎo)致再次陷入阻塞
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 避免傳入的node為同步隊(duì)列的唯一節(jié)點(diǎn)驼鹅,
        // 因?yàn)殛?duì)列中如果只存在node一個(gè)節(jié)點(diǎn)豺型,那么后驅(qū)節(jié)點(diǎn)s必然為空
        if (s == null || s.isShared())
            doReleaseShared(); // 喚醒后繼節(jié)點(diǎn)
    }
}

// 超長判斷:該判斷的作用在于面對各種特殊情況能夠時(shí)保證及時(shí)獲取鎖
if (propagate > 0 || h == null || h.waitStatus < 0 ||
    (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    // 避免傳入的node為同步隊(duì)列的唯一節(jié)點(diǎn)剪验,
    // 因?yàn)殛?duì)列中如果只存在node一個(gè)節(jié)點(diǎn)娶眷,那么后驅(qū)節(jié)點(diǎn)s必然為空
    if (s == null || s.isShared())
        doReleaseShared(); // 喚醒后繼節(jié)點(diǎn)
}

根據(jù)這個(gè)超長判斷的邏輯,因?yàn)閭魅氲?code>propagate=0代表著當(dāng)前已經(jīng)沒有可用許可數(shù)了伤塌,不滿足超長判斷中的第一個(gè)條件propagate>0,所以T2線程獲取鎖之后理論上是不需要再喚醒其他線程獲取鎖/許可熊痴,但是因?yàn)門1線程已經(jīng)訪問完成臨界資源了正在釋放持有的許可,那么就會造成一種情況:隊(duì)列中head節(jié)點(diǎn)的后繼節(jié)點(diǎn)如果此時(shí)嘗試獲取鎖/許可,那么有很大幾率獲取到T1線程釋放的許可鄙煤。所以在釋放鎖時(shí),將head節(jié)點(diǎn)的waitStauts設(shè)置為Node.PROPAGATE傳播狀態(tài)值為-3,滿足這個(gè)超長判斷中的第三個(gè)條件h.waitStatus < 0锥腻,所以此時(shí)T2也會喚醒head后繼節(jié)點(diǎn)中等待獲取鎖/許可資源的線程。這樣去實(shí)現(xiàn)的好處在于:能夠充分照顧到head的后繼節(jié)點(diǎn)同時(shí)也能保證喚醒的傳遞。

至于這里為什么獲取到鎖/許可的線程需要繼續(xù)喚醒后繼節(jié)點(diǎn)線程甲葬?因?yàn)檫@里是共享鎖,而不是獨(dú)占鎖。一個(gè)線程剛獲得了共享鎖/許可棉钧,那么很有可能還有剩余的共享鎖可供排隊(duì)在后面的線程獲得,所以需要喚醒后面的線程。

至此休溶,釋放許可邏輯結(jié)束,對比獲取許可的邏輯相對來說要簡單許多,只需要更新state值后調(diào)用doReleaseShared()方法喚醒后繼節(jié)點(diǎn)線程即可杉女。但是在理解doReleaseShared()方法時(shí)需要額外注意:調(diào)用doReleaseShared()方法的線程會存在兩種:

  • 一是釋放共享鎖/許可數(shù)的線程层释。調(diào)用release()方法釋放許可時(shí)必然調(diào)用它喚醒后繼線程
  • 二是剛獲取到共享鎖/許可數(shù)的線程廉白。一定情況下楣嘁,在滿足“超長判斷”的任意條件時(shí)也會調(diào)用它喚醒后繼線程

2.2、AQS共享模式之Semaphore的FairSync公平鎖實(shí)現(xiàn)

AQS共享模式中的公平鎖實(shí)現(xiàn)除開在獲取鎖的邏輯上與非公平鎖的有些許不同外,其他的實(shí)現(xiàn)大致相同漱病。

2.2.1、信號量中公平鎖FairSync獲取許可/鎖實(shí)現(xiàn)

公平鎖的概念是指先請求鎖的線程一定比后請求鎖的線程要先執(zhí)行,先獲取到鎖資源,從時(shí)間上需要保證執(zhí)行的先后順序沿量。

公平鎖獲取許可執(zhí)行邏輯:Semaphore.acquire()獲取許可方法 → AQS.acquireSharedInterruptibly()方法 → AQS.tryAcquireShared()獲取共享鎖模板方法 → FairSync.tryAcquireShared()方法

// Semaphore類 → 構(gòu)造方法
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
// Semaphore類 → acquire()獲取鎖方法
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// AbstractQueuedSynchronizer類 → acquireSharedInterruptibly()方法
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 調(diào)用AQS定義的獲取共享鎖的模板方法
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
// AbstractQueuedSynchronizer類 → tryAcquireShared()模板方法
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

// Semaphore類 → FairSync公平鎖類
static final class FairSync extends Sync {
    FairSync(int permits) {
        super(permits);
    }
    
    // Semaphore類 → FairSync內(nèi)部類 → tryAcquireShared()子類實(shí)現(xiàn)
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 不同點(diǎn):先判斷隊(duì)列中是否存在節(jié)點(diǎn)后再執(zhí)行獲取鎖操作
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

從獲取鎖/許可的代碼中可以非常明顯的看出乌妒,公平鎖的實(shí)現(xiàn)相對來說损话,與非公平鎖的唯一不同點(diǎn)在于:公平鎖的模式下獲取鎖,會先調(diào)用hasQueuedPredecessors()方法判斷同步隊(duì)列中是否存在節(jié)點(diǎn)忘闻,如果存在則直接返回-1回到acquireSharedInterruptibly()方法if(tryAcquireShared(arg)<0)判斷,調(diào)用doAcquireSharedInterruptibly(arg)方法將當(dāng)前線程封裝成Node.SHARED共享節(jié)點(diǎn)加入同步隊(duì)列等待。反之永高,如果隊(duì)列中不存在節(jié)點(diǎn)則嘗試直接獲取鎖/許可辐脖。

2.2.2、信號量中公平鎖FairSync釋放許可/鎖實(shí)現(xiàn)

公平鎖釋放許可的邏輯與非公平鎖的實(shí)現(xiàn)是一致的,因?yàn)槎际荢ync類的子類,而釋放鎖的邏輯都是對state減一更新后歹苦,喚醒后繼節(jié)點(diǎn)的線程蚪腋。所以關(guān)于釋放鎖的具體實(shí)現(xiàn)則是交由Sync類實(shí)現(xiàn),這里不再重復(fù)贅述。

三哩簿、ReetrantLock與Semaphore的區(qū)別

對比項(xiàng) ReetrantLock Semaphore
實(shí)現(xiàn)模式 獨(dú)占模式 共享模式
獲取鎖方法 tryAcquire() tryAcquireShared()
釋放鎖方法 tryRelease() tryAcquireShared()
是否支持重入 支持 不支持
線程中斷 支持 支持
Condition 支持 不支持
隊(duì)列數(shù)量 一個(gè)同步+多個(gè)等待 單個(gè)同步
節(jié)點(diǎn)類型 Node.EXCLUSIVE Node.SHARED

四稼稿、共享模式的其他實(shí)現(xiàn)者

除開Semaphore信號量的實(shí)現(xiàn)是基于AQS的共享模式之外,在JUC并發(fā)包中CountDownLatch、ReetrantReadWriteLock讀寫鎖的Read讀鎖等都是基于AQS的共享模式實(shí)現(xiàn)的改执,下面我們也可以簡單的看看關(guān)于CountDownLatch的用法裹粤。

4.1、CountDownLatch應(yīng)用場景實(shí)戰(zhàn)

在CountDownLatch初始化時(shí)和Semaphore一樣,我們需要傳入一個(gè)數(shù)字count作為最大線程數(shù)

CountDownLatch countDownLatch = new CountDownLatch(3);

這個(gè)參數(shù)同樣會間接的賦值給AQS內(nèi)部的state同步狀態(tài)標(biāo)識。一般我們會調(diào)用它的兩個(gè)方法:await()與countDown()

  • await():調(diào)用await()方法的線程會被封裝成共享節(jié)點(diǎn)加入同步隊(duì)列阻塞等待葱弟,直至state=0時(shí)才會喚醒同步隊(duì)列中所有的線程
  • countDown():調(diào)用countDown()方法的線程會對state減一

而關(guān)于CountDownLatch有兩種用法:

  • 一射窒、多等一:初始化count=1点寥,多條線程await()阻塞弟疆,一條線程調(diào)用countDown()喚醒所有阻塞線程
  • 二历葛、一等多:初始化count=x帜羊,多線程countDown()對count進(jìn)行減一稠集,一條線程await()阻塞,當(dāng)count=0時(shí)阻塞的線程開始執(zhí)行

如上兩種用法在我們的項(xiàng)目中也可以有很多的應(yīng)用場景,多等一的用法我們可以用來在一定程度上模擬并發(fā)測試接口并發(fā)安全問題线定、死鎖問題等湾趾,如:

final CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 1; i <= 3; i++) {
    new Thread(() -> {
        try {
            System.out.println("線程:" + Thread.currentThread().getName()
            + "....阻塞等待澳眷!");
            countDownLatch.await();
            // 可以在此處調(diào)用需要并發(fā)測試的方法或接口
            System.out.println("線程:" + Thread.currentThread().getName()
            + "....開始執(zhí)行勿侯!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, "T" + i).start();
}
Thread.sleep(1000);
countDownLatch.countDown();

/*
   程序開始運(yùn)行:
    線程:T2....阻塞等待!
    線程:T1....阻塞等待!
    線程:T3....阻塞等待!
   程序運(yùn)行一秒后(三條線程幾乎同時(shí)執(zhí)行):
    線程:T2....開始執(zhí)行睦焕!
    線程:T1....開始執(zhí)行本谜!
    線程:T3....開始執(zhí)行!
*/

如上案例中,創(chuàng)建了一個(gè)CountDownLatch對象,初始化時(shí)傳遞count=1籽腕,循環(huán)創(chuàng)建三條線程T1,T2,T3阻塞等待揍很,主線程在一秒后調(diào)用countDown()喚醒了同步隊(duì)列中的三條線程繼續(xù)執(zhí)行,原理如下:

CountDownLatch多等一原理

我們在實(shí)際開發(fā)過程中,往往很多并發(fā)任務(wù)都存在前后依賴關(guān)系膘融,如詳情頁需要調(diào)用多個(gè)接口完成數(shù)據(jù)聚合攘宙、并行執(zhí)行獲取到數(shù)據(jù)后需要進(jìn)行結(jié)果合并铺韧、多個(gè)操作完成后需要進(jìn)行數(shù)據(jù)檢查等等讯壶,而這些場景下我們可以使用一等多的用法:

final CountDownLatch countDownLatch = new CountDownLatch(3);
Map data = new HashMap();
for (int i = 1; i <= 3; i++) {
    final int page = i;
    new Thread(() -> {
        System.out.println("線程:" + Thread.currentThread().getName() +
                "....讀取分段數(shù)據(jù):"+(page-1)*200+"-"+page*200+"行");
        // 數(shù)據(jù)加入結(jié)果集:data.put();
        countDownLatch.countDown();
    }, "T" + i).start();
}
countDownLatch.await();
System.out.println("線程:" + Thread.currentThread().getName() 
        + "....對數(shù)據(jù)集:data進(jìn)行處理");
        
/*
運(yùn)行結(jié)果:
    線程:T1....讀取分段數(shù)據(jù):0-200行
    線程:T2....讀取分段數(shù)據(jù):200-400行
    線程:T3....讀取分段數(shù)據(jù):400-600行

    線程main....對數(shù)據(jù)集:data進(jìn)行處理
*/

如上一等多的案例中帐萎,for循環(huán)開啟三個(gè)線程T1,T2,T3并行執(zhí)行同時(shí)讀取數(shù)據(jù)增快處理效率葛躏,讀取完成之后將數(shù)據(jù)加入data結(jié)果集中匯總,主線程等待三條線程讀取完成后對數(shù)據(jù)集data進(jìn)行處理,如下:

CountDownLatch一等多原理

4.2匪蟀、CountDownLatch實(shí)現(xiàn)原理

前面我們曾提到過段化,CountDownLatch也是基于AQS共享模式實(shí)現(xiàn)的,與Semaphore一樣,會將傳入的count間接的賦值給AQS內(nèi)部的state同步狀態(tài)標(biāo)識。

private final Sync sync;

// CountDownLatch構(gòu)造方法
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    // 對其內(nèi)部Sync對象進(jìn)行初始化
    this.sync = new Sync(count);
}

// CountDownLatch類 → Sync內(nèi)部類
private static final class Sync extends AbstractQueuedSynchronizer {
    // Sync構(gòu)造函數(shù):對AQS內(nèi)部的state進(jìn)行賦值
    Sync(int count) {setState(count);}
    
    // 調(diào)用await()方法最終會調(diào)用到這里
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
}

// CountDownLatch類 → await()方法
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

在CountDownLatch中,存在一個(gè)Sync內(nèi)部類,當(dāng)我們創(chuàng)建一個(gè)CountDownLatch對象時(shí),實(shí)則其內(nèi)部的構(gòu)造函數(shù)是在對其sync對象進(jìn)行初始化,與我們前面所說的一樣

CountDownLatch countDownLatch = new CountDownLatch(count);

初始化時(shí)傳遞的count數(shù)字最終會通過調(diào)用setState(state)方法賦值給AQS內(nèi)部的同步狀態(tài)標(biāo)識state變量,而當(dāng)線程調(diào)用await()方法時(shí),會調(diào)用AQS的acquireSharedInterruptibly()方法:

// AbstractQueuedSynchronizer類 → acquireSharedInterruptibly()方法
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 最終調(diào)用到CountDownLatch內(nèi)部Sync類的tryAcquireShared()方法
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

// CountDownLatch類 → Sync內(nèi)部類 → tryAcquireShared()方法
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

因?yàn)锳QS中tryAcquireShared(arg)方法僅是模板方法的原因,所以線程在執(zhí)行acquireSharedInterruptibly()方法時(shí)最終會調(diào)用到CountDownLatch內(nèi)部Sync類的tryAcquireShared()方法眠菇。當(dāng)count/state為0時(shí)返回true,反之,count不為0時(shí)返回false,最終回到if(tryAcquireShared(arg)<0)執(zhí)行時(shí)急凰,如果count不為0則執(zhí)行doAcquireSharedInterruptibly(arg)方法將當(dāng)前線程信息封裝成Node.SHARED共享節(jié)點(diǎn)加入同步隊(duì)列阻塞等待。原理如下:

CountDownLatch.await原理

如上便是CountDownLatch.await()的實(shí)現(xiàn)原理聂渊,大體來說還是比較簡單。下面我們接著分析一下CountDownLatch.countDown()方法的實(shí)現(xiàn)撵孤。

// CountDownLatch類 → countDown()方法
public void countDown() {
    sync.releaseShared(1);
}

// AbstractQueuedSynchronizer類 → releaseShared()方法
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

// AbstractQueuedSynchronizer類 → 模板方法:tryReleaseShared()
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

// CountDownLatch類 → Sync內(nèi)部類 → tryReleaseShared()方法
// 調(diào)用countDown()方法最終會調(diào)用到這里
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

調(diào)用CountDownLatch.countDown()方法后會調(diào)用AQS的tryReleaseShared()模板方法奴潘,最終調(diào)用CountDownLatchSync內(nèi)部類tryReleaseShared()方法奈虾。在該方法中首先會先對于state/count進(jìn)行一次為0判斷碉纳,如果不為0則對count/state減一铁孵,然后再次對更新之后的state/count進(jìn)行為0判斷熙宇,如果減一后state等于0返回true馆蠕,回到releaseShared()if(tryReleaseShared(arg))執(zhí)行doReleaseShared()喚醒同步隊(duì)列中的阻塞線程吼渡。反之寄雀,如果減一后不為0阿趁,當(dāng)前線程則直接返回呜呐,方法結(jié)束洋魂。如下:

CountDownLatch.countDown原理

4.3、CountDownLatch與CyclicBarrier的區(qū)別

在JUC包中還存在另一個(gè)和CountDownLatch作用相同的工具類:CyclicBarrier。與CountDownLatch不同的是:CyclicBarrier是基于AQS的獨(dú)占模式實(shí)現(xiàn)的,其內(nèi)部通過ReetrantLock與Condition實(shí)現(xiàn)線程阻塞與喚醒。對比如下:

對比項(xiàng) CountDownLatch CyclicBarrier
實(shí)現(xiàn)模式 共享模式 獨(dú)占模式
計(jì)數(shù)方式 減法 減法
復(fù)用支持 不可復(fù)用 計(jì)數(shù)可置0
重置支持 不可重置 可重置
設(shè)計(jì)重點(diǎn) 一等多 多等多

五再膳、總結(jié)

通過Semaphore與CountDownLatch原理進(jìn)行分析后灾杰,不難得知凛篙,在初始化時(shí)傳遞的許可數(shù)/計(jì)數(shù)器最終都會間接的傳遞給AQS的同步狀態(tài)標(biāo)識state填物。當(dāng)一條線程嘗試獲取共享鎖時(shí)雁刷,會對state減一目派,當(dāng)state為0時(shí)代表沒有可用共享鎖了,其他后續(xù)請求的線程會被封裝成共享節(jié)點(diǎn)加入同步隊(duì)列等待,直至其他持有共享鎖的線程釋放(state加一)。不過與獨(dú)占模式不同的是:共享模式中裳擎,除開釋放鎖時(shí)會喚醒后繼節(jié)點(diǎn)的線程外拷淘,獲取共享鎖成功的線程也會在滿足一定條件下喚醒后繼節(jié)點(diǎn)结洼。至于共享模式中的公平鎖與非公平鎖則與之前的獨(dú)占模式的公平鎖與非公平鎖相同鸣峭,公平鎖情況下,先判斷隊(duì)列是否存在Node再獲取鎖惰爬,從而保證每條線程獲取共享鎖時(shí)都是先到先得的順序執(zhí)行的咨跌。而非公平鎖情況下殉摔,通過線程競爭的方式獲取,不管隊(duì)列中是否已經(jīng)存在Node節(jié)點(diǎn)挽懦,請求的線程都會先執(zhí)行一遍獲取鎖的邏輯蔫磨,只要執(zhí)行成功就能獲取到共享鎖搀罢,獲得線程執(zhí)行權(quán)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末淡诗,一起剝皮案震驚了整個(gè)濱河市座掘,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌蛾坯,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,013評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件聋迎,死亡現(xiàn)場離奇詭異萌焰,居然都是意外死亡掌猛,警方通過查閱死者的電腦和手機(jī)孔飒,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,205評論 2 382
  • 文/潘曉璐 我一進(jìn)店門睦柴,熙熙樓的掌柜王于貴愁眉苦臉地迎上來躬络,“玉大人丹弱,你說我怎么就攤上這事。” “怎么了?”我有些...
    開封第一講書人閱讀 152,370評論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長。 經(jīng)常有香客問我伶丐,道長葫男,這世上最難降的妖魔是什么利职? 我笑而不...
    開封第一講書人閱讀 55,168評論 1 278
  • 正文 為了忘掉前任,我火速辦了婚禮饺鹃,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘。我一直安慰自己研底,他們只是感情好埠偿,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,153評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著榜晦,像睡著了一般。 火紅的嫁衣襯著肌膚如雪乾胶。 梳的紋絲不亂的頭發(fā)上抖剿,一...
    開封第一講書人閱讀 48,954評論 1 283
  • 那天朽寞,我揣著相機(jī)與錄音,去河邊找鬼斩郎。 笑死脑融,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的缩宜。 我是一名探鬼主播肘迎,決...
    沈念sama閱讀 38,271評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼锻煌!你這毒婦竟也來了妓布?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,916評論 0 259
  • 序言:老撾萬榮一對情侶失蹤炼幔,失蹤者是張志新(化名)和其女友劉穎秋茫,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體乃秀,經(jīng)...
    沈念sama閱讀 43,382評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡肛著,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,877評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了跺讯。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片枢贿。...
    茶點(diǎn)故事閱讀 37,989評論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖刀脏,靈堂內(nèi)的尸體忽然破棺而出局荚,到底是詐尸還是另有隱情,我是刑警寧澤愈污,帶...
    沈念sama閱讀 33,624評論 4 322
  • 正文 年R本政府宣布耀态,位于F島的核電站,受9級特大地震影響暂雹,放射性物質(zhì)發(fā)生泄漏首装。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,209評論 3 307
  • 文/蒙蒙 一杭跪、第九天 我趴在偏房一處隱蔽的房頂上張望仙逻。 院中可真熱鬧,春花似錦涧尿、人聲如沸系奉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,199評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽缺亮。三九已至,卻和暖如春庄蹋,著一層夾襖步出監(jiān)牢的瞬間瞬内,已是汗流浹背迷雪。 一陣腳步聲響...
    開封第一講書人閱讀 31,418評論 1 260
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留虫蝶,地道東北人章咧。 一個(gè)月前我還...
    沈念sama閱讀 45,401評論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像能真,于是被迫代替她去往敵國和親赁严。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,700評論 2 345

推薦閱讀更多精彩內(nèi)容