Concurrent Java 04 - JUC之AQS

AbstractQueuedSynchronizer - AQS

AQS本質(zhì)

Provides a framework for implementing blocking locks and related
synchronizers (semaphores, events, etc) that rely on
first-in-first-out (FIFO) wait queues.

AQS本質(zhì)是一個(gè)支持FIFO的同步隊(duì)列眷射,使用Node構(gòu)建鎖或其他同步組件的基礎(chǔ)框架。
CountDownLatch,SemaphoreReentrantLock內(nèi)部就實(shí)現(xiàn)了這種同步隊(duì)列。

abstract static class Sync extends AbstractQueuedSynchronizer {
    ...
}

AQS組件 - CountDownLatch

await線程等待直到某個(gè)條件值為0

使用它使用了計(jì)數(shù)器阻塞當(dāng)前線程裕循,直到計(jì)數(shù)器為0除破,只會(huì)出現(xiàn)一次鼎姐。
等待計(jì)數(shù)的線程
package com.accat.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class CountDownLatchExample1 {

    private final static int threadCount = 200;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    test(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
        log.info("finish");
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        Thread.sleep(100);
        log.info("{}", threadNum);
        Thread.sleep(100);
    }
}
countDownLatch.await(10, TimeUnit.MILLISECONDS); // 也支持指定等待時(shí)間,超時(shí)則繼續(xù)執(zhí)行

AQS組件 - Semaphore

信號(hào)量茁影,規(guī)劃一次性可同時(shí)運(yùn)行的線程個(gè)數(shù)。

四個(gè)隊(duì)伍的隊(duì)員接力跑運(yùn)動(dòng)員(一次可容納四線程)在起跑線上等待接棒(信號(hào)量)阅束。
等待信號(hào)量的線程
package com.accat.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

@Slf4j
public class SemaphoreExample1 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    semaphore.acquire(); // 獲取一個(gè)許可
                    test(threadNum);
                    semaphore.release(); // 釋放一個(gè)許可
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}

嘗試做一些操作呼胚,如果沒有及時(shí)操作則丟棄這些操作。
如:接包處理息裸,如果處理時(shí)間超時(shí)蝇更,將處理不及時(shí)的包丟棄掉。

package com.accat.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

@Slf4j
public class SemaphoreExample4 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 嘗試獲取一個(gè)許可
                        test(threadNum);
                        semaphore.release(); // 釋放一個(gè)許可
                    }
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}

AQS組件 - CyclicBarrier

image.png

計(jì)數(shù)器容許重置后再使用呼盆,多個(gè)線程等待其他線程的關(guān)系年扩。

相當(dāng)于多個(gè)運(yùn)動(dòng)員相互等待,等待其他運(yùn)動(dòng)員就位后再一起沖刺访圃,這里既有計(jì)數(shù)器功能厨幻,又有信號(hào)量功能。
相互等待和沖刺的線程
package com.accat.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class CyclicBarrierExample1 {

    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        barrier.await();
        log.info("{} continue", threadNum);
    }
}

當(dāng)所有就位的運(yùn)動(dòng)員(線程)準(zhǔn)備時(shí)腿时, 裁判員鳴槍况脆, 運(yùn)動(dòng)員搶跑。
這個(gè)過程相當(dāng)于多個(gè)線程互相等待就位后批糟,需要在之前執(zhí)行其他操作(鳴槍)格了,
之后所有線程同時(shí)執(zhí)行,CyclicBarrier支持這種操作徽鼎。

package com.accat.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class CyclicBarrierExample3 {

    private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
        log.info("callback is running");
    });

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        barrier.await();
        log.info("{} continue", threadNum);
    }
}

AQS組件 - ReentrantLock

ReentrantLocksynchronize區(qū)別
1.可重入鎖盛末,synchronize依賴JVM實(shí)現(xiàn)弹惦,ReentrantLock依賴JDK實(shí)現(xiàn),后者能查看源碼悄但。
2.兩者性能相差不大棠隐,官方推薦synchronize,實(shí)現(xiàn)更加容易檐嚣,不用手動(dòng)釋放鎖助泽。
3.ReentrantLock可以指定公平鎖和非公平鎖。
4.ReentrantLock提供一個(gè)Condition類净嘀,可以分組喚醒需要喚醒的線程
5.ReentrantLock提供一個(gè)能夠中斷等待鎖的線程的機(jī)制报咳, lock.lockInterruptibly()
ReentrantLock是一種自旋鎖實(shí)現(xiàn)侠讯,通過CAS機(jī)制不斷嘗試加鎖挖藏,避免線程進(jìn)入內(nèi)核態(tài)的阻塞狀態(tài),想盡辦法讓線程不進(jìn)入內(nèi)核態(tài)的阻塞狀態(tài)是理解鎖設(shè)計(jì)的關(guān)鍵厢漩。

package com.accat.concurrency.example.lock;

import com.accat.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j
@ThreadSafe
public class LockExample2 {

    // 請(qǐng)求總數(shù)
    public static int clientTotal = 5000;

    // 同時(shí)并發(fā)執(zhí)行的線程數(shù)
    public static int threadTotal = 200;

    public static int count = 0;

    private final static Lock lock = new ReentrantLock();

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private static void add() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }
}

AQS組件 - ReentrantReadWriteLock

讓我們想一下膜眠,我們有一個(gè)類Data, 其中getData(), setData()溜嗜,我們要保證這個(gè)類線程安全宵膨,在其上加鎖,那么我們需要getData(), setData()互斥和setData(), setData()互斥炸宵,但是如果直接使用synchronize或者ReentrantLock的話辟躏,那么getData(), getData()也將互斥,這不是我們要的土全。
所以JDK提供了一個(gè)ReentrantLock的繼承類ReentrantReadWriteLock捎琐,實(shí)現(xiàn)讀寫鎖分離的實(shí)現(xiàn),避免上述問題裹匙。

package com.mmall.concurrency.example.lock;

import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

@Slf4j
public class LockExample3 {

    private final Map<String, Data> map = new TreeMap<>();

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    private final Lock readLock = lock.readLock();

    private final Lock writeLock = lock.writeLock();

    public Data get(String key) {
        readLock.lock();
        try {
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }

    public Set<String> getAllKeys() {
        readLock.lock();
        try {
            return map.keySet();
        } finally {
            readLock.unlock();
        }
    }

    public Data put(String key, Data value) {
        writeLock.lock();
        try {
            return map.put(key, value);
        } finally {
            readLock.unlock();
        }
    }

    class Data {

    }
}

StampedLock

StampedLock 是Java樂觀鎖的一種實(shí)現(xiàn)瑞凑。樂觀鎖是為執(zhí)行操作的對(duì)象附加一個(gè)versionId,每次操作前獲取versionId概页,操作完后查看versionId是否沒被改變籽御,如果是則執(zhí)行更新,不是則放棄更新惰匙,重新操作技掏。
樂觀鎖適用于讀多寫少的場景。

package com.accat.concurrency.example.lock;

import com.accat.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.StampedLock;

@Slf4j
@ThreadSafe
public class LockExample5 {

    // 請(qǐng)求總數(shù)
    public static int clientTotal = 5000;

    // 同時(shí)并發(fā)執(zhí)行的線程數(shù)
    public static int threadTotal = 200;

    public static int count = 0;

    private final static StampedLock lock = new StampedLock();

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private static void add() {
        long stamp = lock.writeLock();
        try {
            count++;
        } finally {
            lock.unlock(stamp);
        }
    }
}

AQS組件 - Condition

兩個(gè)隊(duì)列的Node交換

本質(zhì)上就是加入Sync queue之后项鬼, Sync queueCondition queue之間Node的相互放置操作哑梳。

package com.accat.concurrency.example.lock;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j
public class LockExample6 {

    public static void main(String[] args) {
        ReentrantLock reentrantLock = new ReentrantLock();
        Condition condition = reentrantLock.newCondition();

        new Thread(() -> {
            try {
                reentrantLock.lock();  // 獲取鎖,加入 Sync queue
                log.info("wait signal"); // 1 
                condition.await();  // 將鎖釋放秃臣, 進(jìn)入Condition queue
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("get signal"); // 4
            reentrantLock.unlock();
        }).start();

        new Thread(() -> {
            reentrantLock.lock();  // 由于鎖被釋放涧衙,拿到鎖哪工, 進(jìn)入Sync queue
            log.info("get lock"); // 2
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            condition.signalAll();  // 將Condition queue中的Node放回到 Sync queue
            log.info("send signal ~ "); // 3
            reentrantLock.unlock();  // 釋放鎖, 將Node從Sync queue移除
        }).start();
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末弧哎,一起剝皮案震驚了整個(gè)濱河市雁比,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌撤嫩,老刑警劉巖偎捎,帶你破解...
    沈念sama閱讀 211,123評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異序攘,居然都是意外死亡茴她,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,031評(píng)論 2 384
  • 文/潘曉璐 我一進(jìn)店門程奠,熙熙樓的掌柜王于貴愁眉苦臉地迎上來丈牢,“玉大人,你說我怎么就攤上這事瞄沙〖号妫” “怎么了?”我有些...
    開封第一講書人閱讀 156,723評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵距境,是天一觀的道長申尼。 經(jīng)常有香客問我,道長垫桂,這世上最難降的妖魔是什么师幕? 我笑而不...
    開封第一講書人閱讀 56,357評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮诬滩,結(jié)果婚禮上霹粥,老公的妹妹穿的比我還像新娘。我一直安慰自己碱呼,他們只是感情好蒙挑,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,412評(píng)論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著愚臀,像睡著了一般忆蚀。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上姑裂,一...
    開封第一講書人閱讀 49,760評(píng)論 1 289
  • 那天馋袜,我揣著相機(jī)與錄音,去河邊找鬼舶斧。 笑死欣鳖,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的茴厉。 我是一名探鬼主播泽台,決...
    沈念sama閱讀 38,904評(píng)論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼什荣,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼!你這毒婦竟也來了怀酷?” 一聲冷哼從身側(cè)響起稻爬,我...
    開封第一講書人閱讀 37,672評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎蜕依,沒想到半個(gè)月后桅锄,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,118評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡样眠,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,456評(píng)論 2 325
  • 正文 我和宋清朗相戀三年友瘤,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片檐束。...
    茶點(diǎn)故事閱讀 38,599評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡辫秧,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出厢塘,到底是詐尸還是另有隱情茶没,我是刑警寧澤,帶...
    沈念sama閱讀 34,264評(píng)論 4 328
  • 正文 年R本政府宣布晚碾,位于F島的核電站,受9級(jí)特大地震影響喂急,放射性物質(zhì)發(fā)生泄漏格嘁。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,857評(píng)論 3 312
  • 文/蒙蒙 一廊移、第九天 我趴在偏房一處隱蔽的房頂上張望糕簿。 院中可真熱鬧,春花似錦狡孔、人聲如沸懂诗。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,731評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽殃恒。三九已至,卻和暖如春辱揭,著一層夾襖步出監(jiān)牢的瞬間离唐,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,956評(píng)論 1 264
  • 我被黑心中介騙來泰國打工问窃, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留亥鬓,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,286評(píng)論 2 360
  • 正文 我出身青樓域庇,卻偏偏與公主長得像嵌戈,于是被迫代替她去往敵國和親覆积。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,465評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容