Java并發(fā) - 并發(fā)工具類

AQS

可以先查看 Java并發(fā) - 讀寫鎖與AQS簡單了解 進(jìn)行簡單的了解

AQS中的隊(duì)列

通過維護(hù) state 進(jìn)行加鎖和解鎖(含讀鎖(共享鎖)和寫鎖(獨(dú)占鎖))

    /**
     * The synchronization state.
     */
    private volatile int state;

繼承 AbstractQueuedSynchronizer 后需要實(shí)現(xiàn)的方法

  • isHeldExclusively():該線程是否正在獨(dú)占資源氛魁。只有用到condition才需要去實(shí)現(xiàn)它其屏。
  • tryAcquire(int):獨(dú)占方式。嘗試獲取資源,成功則返回true拥知,失敗則返回false。
  • tryRelease(int):獨(dú)占方式蟋座。嘗試釋放資源褐澎,成功則返回true,失敗則返回false烈评。
  • tyAcquireShared(int):共享方式火俄。嘗試獲取資源。負(fù)數(shù)表示失敗;0表示成功讲冠,但沒有剩余可用資源;正數(shù)表示成功瓜客,且有剩余資源。
  • tryReleaseShared(int):共享方式竿开。嘗試釋放資源谱仪,如果釋放后允許喚醒后續(xù)等待結(jié)點(diǎn)返回true,否則返回false否彩。

1. Semaphore (依賴了 AQS 定義的模板方法)

Semaphore是一個(gè)計(jì)數(shù)信號(hào)量疯攒,常用于限制可以訪問某些資源(物理或邏輯的)線程數(shù)目。
簡單說列荔,是一種用來控制并發(fā)量的共享鎖敬尺。


例子
package concurrent;

import java.util.concurrent.Semaphore;

public class Semaphore_Demo {
     static Semaphore sp =new Semaphore(6);


    public static void main(String[] args) {
        for (int i = 0; i <1000 ; i++) {
            new Thread(()->{
                try {
                    sp.acquire(); //就是拿信號(hào)量
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("當(dāng)前信號(hào)量限制....");
                try {
                    Thread.sleep(2000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("執(zhí)行完了");
                sp.release(); //執(zhí)行完了,就釋放信號(hào)量
            }).start();
        }
    }
}
當(dāng)前信號(hào)量限制....
當(dāng)前信號(hào)量限制....
當(dāng)前信號(hào)量限制....
當(dāng)前信號(hào)量限制....
當(dāng)前信號(hào)量限制....
當(dāng)前信號(hào)量限制....
執(zhí)行完了
執(zhí)行完了
執(zhí)行完了
執(zhí)行完了
當(dāng)前信號(hào)量限制....
執(zhí)行完了
執(zhí)行完了
當(dāng)前信號(hào)量限制....
.....

以上例子表明贴浙,Semaphore(6) 限制了只能同時(shí)有六個(gè)線程執(zhí)行砂吞,也就起到了限流的作用

  • 自己實(shí)現(xiàn)Semaphore的例子
package concurrent;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class MySemaphore {

    private Sync sync;

    public MySemaphore(int permits) {
        sync = new Sync(permits);
    }

    public void acquire() {
        sync.acquireShared(1);
    }

    public void release() {
        sync.releaseShared(1);
    }


    class Sync extends AbstractQueuedSynchronizer {
        // 信號(hào)量大小
        private int permits;

        public Sync(int permits) {
            this.permits = permits;
        }

        @Override
        protected int tryAcquireShared(int arg) {
            //定義自己的方法
            int state = getState();
            int nextState = state + arg;

            if (nextState <= permits) {
                if (compareAndSetState(state, nextState))
                    return 1;
            }
            return -1;
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            int state = getState();
            if (compareAndSetState(state, state - arg)) {
                return true;
            } else {
                return false;
            }

        }
    }
}

執(zhí)行代碼

package concurrent;

public class Semaphore_Demo {
     static MySemaphore sp =new MySemaphore(6);


    public static void main(String[] args) {
        for (int i = 0; i <1000 ; i++) {
            new Thread(()->{
                sp.acquire(); //就是拿信號(hào)量
                System.out.println("當(dāng)前信號(hào)量限制....");
                try {
                    Thread.sleep(2000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("執(zhí)行完了");
                sp.release(); //執(zhí)行完了,就釋放信號(hào)量
            }).start();
        }
    }
}

執(zhí)行結(jié)果

當(dāng)前信號(hào)量限制....
當(dāng)前信號(hào)量限制....
當(dāng)前信號(hào)量限制....
當(dāng)前信號(hào)量限制....
當(dāng)前信號(hào)量限制....
當(dāng)前信號(hào)量限制....
執(zhí)行完了
執(zhí)行完了
執(zhí)行完了
執(zhí)行完了
執(zhí)行完了
執(zhí)行完了

2. CountDownLatch (依賴了 AQS 定義的模板方法)

即實(shí)現(xiàn)了一種計(jì)數(shù)器崎溃,可以認(rèn)為是倒數(shù)計(jì)時(shí)蜻直,比方說 從 6 開始依次遞減(此時(shí)只是準(zhǔn)備就緒),然后到 0 的時(shí)候再一起啟動(dòng)。
好比有這樣的一個(gè)跑步比賽概而,比賽開始之前唤殴,所有的選手都需要向裁判示意已經(jīng)準(zhǔn)備就緒,一共有六個(gè)選手到腥,那么每一個(gè)選手向裁判示意準(zhǔn)備就緒朵逝,未準(zhǔn)備就緒的人數(shù)就減1,直到所有的選手都準(zhǔn)備就緒乡范,此時(shí)未準(zhǔn)備就緒的人數(shù)為0配名,那么裁判就會(huì)扣下發(fā)令槍示意比賽開始。

例子
package concurrent;

import java.util.concurrent.CountDownLatch;

public class CountDownLatch_Demo {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(6); //計(jì)數(shù)為6


        for (int i = 0; i <6 ; i++) {
            new Thread(()->{
                System.out.println("開始準(zhǔn)備.....");
                latch.countDown();//計(jì)數(shù)減一
            }).start();
            Thread.sleep(1000);
        }

        latch.await(); //每個(gè)線程執(zhí)行一次晋辆,則-1渠脉,在latch為0的時(shí)候開始向下運(yùn)行 這是這些線程都準(zhǔn)備就緒,然后去一起干同一件事

        //還有一種方式瓶佳, 將一個(gè)活分為多段芋膘,每個(gè)線程去干一段
//        for (int i = 0; i <6 ; i++) {
//            new Thread(()->{
//                  latch.countDown(); // 計(jì)數(shù)減一
//                try {
//                    latch.await(); // 阻塞 -- > 0
//                    System.out.println("線程:"+Thread.currentThread().getName()+"執(zhí)行完畢");
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
//            }).start();
//        }

        System.out.println("開始干活....");
    }
}
開始準(zhǔn)備.....
開始準(zhǔn)備.....
開始準(zhǔn)備.....
開始準(zhǔn)備.....
開始準(zhǔn)備.....
開始準(zhǔn)備.....
開始干活....
  • 自己實(shí)現(xiàn)CountDownLatch的例子
package concurrent;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class MyCountDownLatch {

    private Sync sync;

    public MyCountDownLatch(int count) {
        sync = new Sync(count);
    }

    public void countDown() {
        sync.releaseShared(1);
    }

    public void await() {
        sync.acquireShared(1);
    }


    class Sync extends AbstractQueuedSynchronizer {

        public Sync(int count) {
            setState(count);
        }


        @Override
        protected int tryAcquireShared(int arg) {
            return getState() == 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            while (true) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc)) {
                    return nextc == 0;
                }
            }
        }
    }
}

執(zhí)行代碼

package concurrent;

import java.util.concurrent.CountDownLatch;

public class CountDownLatch_Demo {

    public static void main(String[] args) throws InterruptedException {
        MyCountDownLatch latch = new MyCountDownLatch(6); //計(jì)數(shù)為6


        for (int i = 0; i <6 ; i++) {
            new Thread(()->{
                System.out.println("開始準(zhǔn)備.....");
                latch.countDown();//計(jì)數(shù)減一
            }).start();
            Thread.sleep(1000);
        }

        latch.await(); //每個(gè)線程執(zhí)行一次,則-1霸饲,在latch為0的時(shí)候開始向下運(yùn)行 這是這些線程都準(zhǔn)備就緒为朋,然后去一起干同一件事

        //還有一種方式, 將一個(gè)活分為多段厚脉,每個(gè)線程去干一段
//        for (int i = 0; i <6 ; i++) {
//            new Thread(()->{
//                  latch.countDown(); // 計(jì)數(shù)減一
//                try {
//                    latch.await(); // 阻塞 -- > 0
//                    System.out.println("線程:"+Thread.currentThread().getName()+"執(zhí)行完畢");
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
//            }).start();
//        }

        System.out.println("開始干活....");
    }
}

執(zhí)行結(jié)果

開始準(zhǔn)備.....
開始準(zhǔn)備.....
開始準(zhǔn)備.....
開始準(zhǔn)備.....
開始準(zhǔn)備.....
開始準(zhǔn)備.....
開始干活....

3. CyclicBarrier (依賴了可重入鎖的Condition和 signalAll)

循環(huán)柵欄习寸,可以循環(huán)利用的屏障。
舉例:排隊(duì)上摩天輪時(shí)傻工,每到齊四個(gè)人霞溪,就可以上同一個(gè)車廂。


例子
package concurrent;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrier_Demo {

    public static void main(String[] args) throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(4);

        for (int i = 0; i < 100; i++) { //假設(shè)有100個(gè)任務(wù)中捆,每次只能有固定數(shù)量的線程去執(zhí)行鸯匹,可以使用這個(gè)
            new Thread(() -> {
                try {
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("任務(wù)開始執(zhí)行");

            }).start();
            Thread.sleep(500L);
        }
    }
}
  • 自己實(shí)現(xiàn)CyclicBarrier 的例子
package concurrent;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class MyCyclicBarrier {

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();

    private int count = 0; //批次

    private final int parties; //多少線程準(zhǔn)備就緒?

    private Object generation = new Object(); // 類似于版本號(hào)泄伪,解決偽喚醒的問題

    public MyCyclicBarrier(int parties) {
        this.parties = parties;
    }


    public void await() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Object g = generation;

            int index = ++count;
            if (index == parties) {
                nextGeneration();
                return;
            }

            while (true) {
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                if (g != generation) {
                    return;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    public void nextGeneration() {
        condition.signalAll();
        count = 0;
        generation = new Object();
    }
}

執(zhí)行代碼

package concurrent;

public class CyclicBarrier_Demo {

    public static void main(String[] args) throws InterruptedException {
        MyCyclicBarrier barrier = new MyCyclicBarrier(4);

        for (int i = 0; i < 100; i++) { //假設(shè)有100個(gè)任務(wù)殴蓬,每次只能有固定數(shù)量的線程去執(zhí)行,可以使用這個(gè)
            new Thread(() -> {
                barrier.await();
                System.out.println("任務(wù)開始執(zhí)行");
            }).start();
            Thread.sleep(500L);
        }
    }
}

如果覺得有收獲就點(diǎn)個(gè)贊吧臂容,更多知識(shí)科雳,請(qǐng)點(diǎn)擊關(guān)注查看我的主頁信息哦~

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市脓杉,隨后出現(xiàn)的幾起案子糟秘,更是在濱河造成了極大的恐慌,老刑警劉巖球散,帶你破解...
    沈念sama閱讀 211,290評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件尿赚,死亡現(xiàn)場離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)凌净,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,107評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門悲龟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人冰寻,你說我怎么就攤上這事须教。” “怎么了斩芭?”我有些...
    開封第一講書人閱讀 156,872評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵轻腺,是天一觀的道長。 經(jīng)常有香客問我划乖,道長贬养,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,415評(píng)論 1 283
  • 正文 為了忘掉前任琴庵,我火速辦了婚禮误算,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘迷殿。我一直安慰自己儿礼,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,453評(píng)論 6 385
  • 文/花漫 我一把揭開白布贪庙。 她就那樣靜靜地躺著蜘犁,像睡著了一般翰苫。 火紅的嫁衣襯著肌膚如雪止邮。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,784評(píng)論 1 290
  • 那天奏窑,我揣著相機(jī)與錄音导披,去河邊找鬼。 笑死埃唯,一個(gè)胖子當(dāng)著我的面吹牛撩匕,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播墨叛,決...
    沈念sama閱讀 38,927評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼止毕,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了漠趁?” 一聲冷哼從身側(cè)響起扁凛,我...
    開封第一講書人閱讀 37,691評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎闯传,沒想到半個(gè)月后谨朝,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,137評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,472評(píng)論 2 326
  • 正文 我和宋清朗相戀三年字币,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了则披。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,622評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡洗出,死狀恐怖士复,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情翩活,我是刑警寧澤判没,帶...
    沈念sama閱讀 34,289評(píng)論 4 329
  • 正文 年R本政府宣布,位于F島的核電站隅茎,受9級(jí)特大地震影響澄峰,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜辟犀,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,887評(píng)論 3 312
  • 文/蒙蒙 一俏竞、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧堂竟,春花似錦魂毁、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,741評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至税稼,卻和暖如春烦秩,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背郎仆。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評(píng)論 1 265
  • 我被黑心中介騙來泰國打工只祠, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人扰肌。 一個(gè)月前我還...
    沈念sama閱讀 46,316評(píng)論 2 360
  • 正文 我出身青樓抛寝,卻偏偏與公主長得像,于是被迫代替她去往敵國和親曙旭。 傳聞我的和親對(duì)象是個(gè)殘疾皇子盗舰,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,490評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容