J.U.C 之AQS

圖片.png

CountDownLatch

  • CountDownLatchExample1
package com.alan.concurrency.example.aqs;


import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


@Slf4j
public class CountDownLatchExample1 {

    private final static int threadCount = 200;

    public static void main(String[] args) throws InterruptedException {

        ExecutorService exec = Executors.newCachedThreadPool();

        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            {
                int threadNum = i;
                exec.execute(() -> {
                    try {
                        test(threadNum);
                    } catch (InterruptedException e) {
                        log.error("InterruptedException", e);
                    } finally {
                        countDownLatch.countDown();
                    }
                });
            }
        }

        //通過countDown()和await()能保證所有線程執(zhí)行完成后如贷,再調(diào)用log.info("finish")
        countDownLatch.await();
        log.info("finish");
        exec.shutdown();

    }

    public static void test(int threadNum) throws InterruptedException {
        Thread.sleep(100);
        log.info("{}",threadNum);
    }

}
  • CountDownLatchExample2 限制指定時(shí)間完成
package com.alan.concurrency.example.aqs;


import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


@Slf4j
public class CountDownLatchExample2 {

    private final static int threadCount = 200;

    public static void main(String[] args) throws InterruptedException {

        ExecutorService exec = Executors.newCachedThreadPool();

        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            {
                int threadNum = i;
                exec.execute(() -> {
                    try {
                        test(threadNum);
                    } catch (InterruptedException e) {
                        log.error("InterruptedException", e);
                    } finally {
                        countDownLatch.countDown();
                    }
                });
            }
        }

        //通過countDown()和await()能保證所有線程執(zhí)行完成后,再調(diào)用log.info("finish")
        //設(shè)置超時(shí)時(shí)間10毫秒
        countDownLatch.await(10,TimeUnit.MILLISECONDS);
        log.info("finish");
        //是先讓當(dāng)前線程任務(wù)都執(zhí)行完成后,才進(jìn)行shutdown操作
        exec.shutdown();

    }

    public static void test(int threadNum) throws InterruptedException {
        Thread.sleep(100);
        log.info("{}",threadNum);
    }

}

Semaphore 同步組件-信號量

  • Semaphore是一種在多線程環(huán)境下使用的設(shè)施杠袱,該設(shè)施負(fù)責(zé)協(xié)調(diào)各個(gè)線程尚猿,以保證它們能夠正確、合理的使用公共資源的設(shè)施霞掺,也是操作系統(tǒng)中用于控制進(jìn)程同步互斥的量谊路。

  • 以一個(gè)停車場是運(yùn)作為例。為了簡單起見菩彬,假設(shè)停車場只有三個(gè)車位缠劝,一開始三個(gè)車位都是空的。這時(shí)如果同時(shí)來了五輛車骗灶,看門人允許其中三輛不受阻礙的進(jìn)入惨恭,然后放下車攔,剩下的車則必須在入口等待耙旦,此后來的車也都不得不在入口處等待脱羡。這時(shí),有一輛車離開停車場免都,看門人得知后锉罐,打開車攔,放入一輛绕娘,如果又離開兩輛脓规,則又可以放入兩輛,如此往復(fù)险领。

  • 在這個(gè)停車場系統(tǒng)中侨舆,車位是公共資源,每輛車好比一個(gè)線程绢陌,看門人起的就是信號量的作用挨下。

  • 更進(jìn)一步,信號量的特性如下:信號量是一個(gè)非負(fù)整數(shù)(車位數(shù))脐湾,所有通過它的線程(車輛)都會將該整數(shù)減一(通過它當(dāng)然是為了使用資源)臭笆,當(dāng)該整數(shù)值為零時(shí),所有試圖通過它的線程都將處于等待狀態(tài)沥割。在信號量上我們定義兩種操作: Wait(等待) 和 Release(釋放)耗啦。 當(dāng)一個(gè)線程調(diào)用Wait(等待)操作時(shí),它要么通過然后將信號量減一机杜,要么一直等下去帜讲,直到信號量大于一或超時(shí)。Release(釋放)實(shí)際上是在信號量上執(zhí)行加操作椒拗,對應(yīng)于車輛離開停車場似将,該操作之所以叫做“釋放”是因?yàn)榧硬僮鲗?shí)際上是釋放了由信號量守護(hù)的資源获黔。

  • 應(yīng)用場景:只能訪問有限的資源
    1、設(shè)置數(shù)據(jù)庫的連接數(shù)
    2在验、設(shè)置數(shù)為1玷氏,將相當(dāng)于單線程運(yùn)行了。

  • 單一許可

package com.alan.concurrency.example.aqs;


import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;


@Slf4j
public class SemaphoreExample1 {

    private final static int threadCount = 200;

    //設(shè)置允許的并發(fā)數(shù)為20
    private final static Semaphore semaphore = new Semaphore(20);

    public static void main(String[] args) throws InterruptedException {

        ExecutorService exec = Executors.newCachedThreadPool();


        for (int i = 0; i < threadCount; i++) {
            {
                int threadNum = i;
                exec.execute(() -> {
                    try {
                        semaphore.acquire();  //獲取一個(gè)許可
                        test(threadNum);
                        semaphore.release();  //釋放一個(gè)許可
                    } catch (InterruptedException e) {
                        log.error("InterruptedException", e);
                    }
                });
            }
        }

        exec.shutdown();

    }

    public static void test(int threadNum) throws InterruptedException {
        log.info("{}",threadNum);
        Thread.sleep(1000);
    }

}
  • 多個(gè)許可
package com.alan.concurrency.example.aqs;


import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;


@Slf4j
public class SemaphoreExample1 {

    private final static int threadCount = 200;

    //設(shè)置允許的并發(fā)數(shù)為20
    private final static Semaphore semaphore = new Semaphore(20);

    public static void main(String[] args) throws InterruptedException {

        ExecutorService exec = Executors.newCachedThreadPool();


        for (int i = 0; i < threadCount; i++) {
            {
                int threadNum = i;
                exec.execute(() -> {
                    try {
                        semaphore.acquire(20);
                        test(threadNum);
                        semaphore.release(20);
                    } catch (InterruptedException e) {
                        log.error("InterruptedException", e);
                    }
                });
            }
        }

        exec.shutdown();

    }

    public static void test(int threadNum) throws InterruptedException {
        log.info("{}",threadNum);
        Thread.sleep(1000);
    }

}

CyclicBarrier

  • CyclicBarrier是一個(gè)同步工具類腋舌,它允許一組線程互相等待盏触,直到到達(dá)某個(gè)公共屏障點(diǎn)。與CountDownLatch不同的是該barrier在釋放等待線程后可以重用块饺,所以稱它為循環(huán)(Cyclic)的屏障(Barrier)赞辩。
  • CyclicBarrier支持一個(gè)可選的Runnable命令,在一組線程中的最后一個(gè)線程到達(dá)之后(但在釋放所有線程之前)授艰,該命令只在每個(gè)屏障點(diǎn)運(yùn)行一次辨嗽。若在繼續(xù)所有參與線程之前更新共享狀態(tài),此屏障操作很有用淮腾。
package com.alan.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;



@Slf4j
public class CyclicBarrierExample1 {


    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor= Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {

            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(()->{
                try {
                    race(threadNum);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }


    private static  void race(int threadNum) throws Exception{
        Thread.sleep(1000);
        log.info("{} is ready",threadNum);
        barrier.await();
        log.info("{} continue",threadNum);

    }

}

ReentrantLock 與鎖

  • 可重入性:
    從名字上理解糟需,ReenTrantLock的字面意思就是再進(jìn)入的鎖,其實(shí)synchronized關(guān)鍵字所使用的鎖也是可重入的谷朝,兩者關(guān)于這個(gè)的區(qū)別不大洲押。兩者都是同一個(gè)線程沒進(jìn)入一次,鎖的計(jì)數(shù)器都自增1圆凰,所以要等到鎖的計(jì)數(shù)器下降為0時(shí)才能釋放鎖诅诱。

  • 鎖的實(shí)現(xiàn):
    Synchronized是依賴于JVM實(shí)現(xiàn)的,而ReenTrantLock是JDK實(shí)現(xiàn)的送朱,有什么區(qū)別,說白了就類似于操作系統(tǒng)來控制實(shí)現(xiàn)和用戶自己敲代碼實(shí)現(xiàn)的區(qū)別干旁。前者的實(shí)現(xiàn)是比較難見到的驶沼,后者有直接的源碼可供閱讀。

  • 性能的區(qū)別:
    在Synchronized優(yōu)化以前争群,synchronized的性能是比ReenTrantLock差很多的回怜,但是自從Synchronized引入了偏向鎖,輕量級鎖(自旋鎖)后换薄,兩者的性能就差不多了玉雾,在兩種方法都可用的情況下,官方甚至建議使用synchronized轻要,其實(shí)synchronized的優(yōu)化我感覺就借鑒了ReenTrantLock中的CAS技術(shù)复旬。都是試圖在用戶態(tài)就把加鎖問題解決,避免進(jìn)入內(nèi)核態(tài)的線程阻塞冲泥。

  • 功能區(qū)別:
    便利性:很明顯Synchronized的使用比較方便簡潔驹碍,并且由編譯器去保證鎖的加鎖和釋放壁涎,而ReenTrantLock需要手工聲明來加鎖和釋放鎖,為了避免忘記手工釋放鎖造成死鎖志秃,所以最好在finally中聲明釋放鎖怔球。

鎖的細(xì)粒度和靈活度:很明顯ReenTrantLock優(yōu)于Synchronized

  • ReenTrantLock獨(dú)有的能力:
    1、ReenTrantLock可以指定是公平鎖還是非公平鎖浮还。而synchronized只能是非公平鎖竟坛。所謂的公平鎖就是先等待的線程先獲得鎖。
    2钧舌、ReenTrantLock提供了一個(gè)Condition(條件)類担汤,用來實(shí)現(xiàn)分組喚醒需要喚醒的線程們,而不是像synchronized要么隨機(jī)喚醒一個(gè)線程要么喚醒全部線程延刘。
    3漫试、ReenTrantLock提供了一種能夠中斷等待鎖的線程的機(jī)制,通過lock.lockInterruptibly()來實(shí)現(xiàn)這個(gè)機(jī)制碘赖。
package com.alan.concurrency.example.lock;


import com.alan.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j
@ThreadSafe
public class LockExample2 {


    //請求數(shù)1000
    public static int clientTotal = 5000;
    //同時(shí)并發(fā)執(zhí)行的線程數(shù)
    public static int threadTotal = 200;

    public static int count = 0;


    //通過Lock接口實(shí)現(xiàn)
    private static Lock lock = new ReentrantLock();


    private  static void add(){

        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {

        //定義線程池ExecutorService接口
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定義信號量,傳入并發(fā)線程數(shù) final修飾不允許重新賦值
        final Semaphore semaphore = new Semaphore(threadTotal);
        //定義計(jì)數(shù)器閉鎖驾荣。傳入請求總數(shù)
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

        for (int i = 0; i < clientTotal; i++) {
            //通過匿名內(nèi)部類方式
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        //semaphore控制并發(fā)數(shù)量
                        semaphore.acquire();
                        add();
                        semaphore.release();
                    } catch (InterruptedException e) {
                        log.error("exception",e);
                    }
                    //每次執(zhí)行計(jì)數(shù)器減掉一個(gè)
                    countDownLatch.countDown();
                }

            });

        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}",count);
    }
}
  • ReentrantReadWriteLock
package com.alan.concurrency.example.lock;


import com.alan.concurrency.annoations.ThreadSafe;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.Date;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

@Slf4j
public class LockExample3 {


    private final Map<String, Data> map = new TreeMap<>();

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    //分別定義讀鎖和寫鎖

    private final Lock readLock = lock.readLock();

    private final Lock writeLock = lock.writeLock();

    public Data get(String key) {
        readLock.lock();
        try {
            return  map.get(key);
        } finally {
            readLock.unlock();
        }
    }

    public Set<String> getAllKeys(){
        readLock.lock();
        try {
            return  map.keySet();
        } finally {
            readLock.unlock();
        }
    }

    public Data put(String key, Data value){
        writeLock.lock();
        try {
            return  map.put(key,value);
        } finally {
            writeLock.unlock();
        }
    }
}
  • StampedLock

package com.alan.concurrency.example.lock;

import java.util.concurrent.locks.StampedLock;

public class LockExample4 {

    class Point {
        private double x, y;
        private final StampedLock sl = new StampedLock();

        void move(double deltaX, double deltaY) { // an exclusively locked method
            long stamp = sl.writeLock();
            try {
                x += deltaX;
                y += deltaY;
            } finally {
                sl.unlockWrite(stamp);
            }
        }

        //下面看看樂觀讀鎖案例
        double distanceFromOrigin() { // A read-only method
            long stamp = sl.tryOptimisticRead(); //獲得一個(gè)樂觀讀鎖
            double currentX = x, currentY = y;  //將兩個(gè)字段讀入本地局部變量
            if (!sl.validate(stamp)) { //檢查發(fā)出樂觀讀鎖后同時(shí)是否有其他寫鎖發(fā)生?
                stamp = sl.readLock();  //如果沒有普泡,我們再次獲得一個(gè)讀悲觀鎖
                try {
                    currentX = x; // 將兩個(gè)字段讀入本地局部變量
                    currentY = y; // 將兩個(gè)字段讀入本地局部變量
                } finally {
                    sl.unlockRead(stamp);
                }
            }
            return Math.sqrt(currentX * currentX + currentY * currentY);
        }

        //下面是悲觀讀鎖案例
        void moveIfAtOrigin(double newX, double newY) { // upgrade
            // Could instead start with optimistic, not read mode
            long stamp = sl.readLock();
            try {
                while (x == 0.0 && y == 0.0) { //循環(huán)播掷,檢查當(dāng)前狀態(tài)是否符合
                    long ws = sl.tryConvertToWriteLock(stamp); //將讀鎖轉(zhuǎn)為寫鎖
                    if (ws != 0L) { //這是確認(rèn)轉(zhuǎn)為寫鎖是否成功
                        stamp = ws; //如果成功 替換票據(jù)
                        x = newX; //進(jìn)行狀態(tài)改變
                        y = newY;  //進(jìn)行狀態(tài)改變
                        break;
                    } else { //如果不能成功轉(zhuǎn)換為寫鎖
                        sl.unlockRead(stamp);  //我們顯式釋放讀鎖
                        stamp = sl.writeLock();  //顯式直接進(jìn)行寫鎖 然后再通過循環(huán)再試
                    }
                }
            } finally {
                sl.unlock(stamp); //釋放讀鎖或?qū)戞i
            }
        }
    }
}
package com.alan.concurrency.example.lock;

import com.alan.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.StampedLock;

@Slf4j
@ThreadSafe
public class LockExample5 {

    // 請求總數(shù)
    public static int clientTotal = 5000;

    // 同時(shí)并發(fā)執(zhí)行的線程數(shù)
    public static int threadTotal = 200;

    public static int count = 0;

    private final static StampedLock lock = new StampedLock();

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private static void add() {
        long stamp = lock.writeLock();
        try {
            count++;
        } finally {
            lock.unlock(stamp);
        }
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市撼班,隨后出現(xiàn)的幾起案子歧匈,更是在濱河造成了極大的恐慌,老刑警劉巖砰嘁,帶你破解...
    沈念sama閱讀 211,948評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件件炉,死亡現(xiàn)場離奇詭異,居然都是意外死亡矮湘,警方通過查閱死者的電腦和手機(jī)斟冕,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,371評論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來缅阳,“玉大人磕蛇,你說我怎么就攤上這事∈欤” “怎么了秀撇?”我有些...
    開封第一講書人閱讀 157,490評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長向族。 經(jīng)常有香客問我呵燕,道長,這世上最難降的妖魔是什么炸枣? 我笑而不...
    開封第一講書人閱讀 56,521評論 1 284
  • 正文 為了忘掉前任虏等,我火速辦了婚禮弄唧,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘霍衫。我一直安慰自己候引,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,627評論 6 386
  • 文/花漫 我一把揭開白布敦跌。 她就那樣靜靜地躺著澄干,像睡著了一般。 火紅的嫁衣襯著肌膚如雪柠傍。 梳的紋絲不亂的頭發(fā)上麸俘,一...
    開封第一講書人閱讀 49,842評論 1 290
  • 那天,我揣著相機(jī)與錄音惧笛,去河邊找鬼从媚。 笑死,一個(gè)胖子當(dāng)著我的面吹牛患整,可吹牛的內(nèi)容都是我干的拜效。 我是一名探鬼主播,決...
    沈念sama閱讀 38,997評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼各谚,長吁一口氣:“原來是場噩夢啊……” “哼紧憾!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起昌渤,我...
    開封第一講書人閱讀 37,741評論 0 268
  • 序言:老撾萬榮一對情侶失蹤赴穗,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后膀息,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體般眉,經(jīng)...
    沈念sama閱讀 44,203評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,534評論 2 327
  • 正文 我和宋清朗相戀三年潜支,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了煤篙。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,673評論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡毁腿,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出苛茂,到底是詐尸還是另有隱情已烤,我是刑警寧澤,帶...
    沈念sama閱讀 34,339評論 4 330
  • 正文 年R本政府宣布妓羊,位于F島的核電站胯究,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏躁绸。R本人自食惡果不足惜裕循,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,955評論 3 313
  • 文/蒙蒙 一臣嚣、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧剥哑,春花似錦硅则、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,770評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至困介,卻和暖如春大审,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背座哩。 一陣腳步聲響...
    開封第一講書人閱讀 32,000評論 1 266
  • 我被黑心中介騙來泰國打工徒扶, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人根穷。 一個(gè)月前我還...
    沈念sama閱讀 46,394評論 2 360
  • 正文 我出身青樓姜骡,卻偏偏與公主長得像,于是被迫代替她去往敵國和親缠诅。 傳聞我的和親對象是個(gè)殘疾皇子溶浴,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,562評論 2 349

推薦閱讀更多精彩內(nèi)容