8-Java中的并發(fā)工具類

1.等待多線程完成的CountDownLatch

CountDownLatch允許一個(gè)或多個(gè)線程等待其他線程完成操作蝶糯。

假設(shè)有一個(gè)需求:需要解析一個(gè)Excel里多個(gè)sheet的數(shù)據(jù)费就,可以考慮使用多線程赐俗,每個(gè)線程解析一個(gè)sheet里的數(shù)據(jù)形纺,等到所有sheet解析完之后企软,程序需要提示解析完成拄丰。

使用join():

public class JoinCountDownLatchTest {
    public static void main(String[] args) throws InterruptedException {
        Thread parser1 = new Thread(() -> System.out.println("parser1 finish"));
        Thread parser2 = new Thread(() -> System.out.println("parser2 finish"));
        parser1.start();
        parser2.start();
        parser1.join();
        parser2.join();
        System.out.println("all parser finish");
    }
}

使用CountDownLatch:

public class CountDownLatchTest {
    static CountDownLatch c = new CountDownLatch(2);
    public static void main(String[] args) throws InterruptedException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(1);
                c.countDown();
                System.out.println(2);
                c.countDown();
            }
        }).start();
        c.await();
        System.out.println("3");
    }
}

CountDownLatch的構(gòu)造函數(shù)接收一個(gè)int類型的參數(shù)作為計(jì)數(shù)器现斋,如果你想等待N個(gè)點(diǎn)完成喜最,這里就傳入N。

當(dāng)調(diào)用CountDownLatch的countDown方法時(shí)庄蹋,N就會(huì)減1瞬内,CountDownLatch的await方法會(huì)阻塞當(dāng)前線程,直到N變成零限书。這里所說的N個(gè)點(diǎn)虫蝶,可以是N個(gè)線程,也可以是1個(gè)線程里的N個(gè)執(zhí)行步驟倦西。

注意:計(jì)數(shù)器必須大于等于0能真,只是等于0時(shí),調(diào)用await方法時(shí)不會(huì)阻塞當(dāng)前線程扰柠。CountDownLatch不能重新初始化或修改對(duì)象的內(nèi)部計(jì)數(shù)器的值粉铐。

2.同步屏障CyclicBarrier

CyclicBarrier的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。它要做的事情是卤档,讓一組線程到達(dá)一個(gè)屏障時(shí)被阻塞蝙泼,知道最后一個(gè)線程到達(dá)屏障時(shí),屏障才會(huì)開門劝枣,所有被屏障攔截的線程才會(huì)繼續(xù)運(yùn)行汤踏。

①CyclicBarrier簡(jiǎn)介

默認(rèn)構(gòu)造方法CyclicBarrier(int parties),parties標(biāo)識(shí)屏障攔截的線程數(shù)量舔腾,每個(gè)線程調(diào)用await方法告訴CyclicBarrier它已經(jīng)到達(dá)了屏障溪胶,然后當(dāng)前線程被阻塞。

public class CynlicBarrierTest {
    static CyclicBarrier c = new CyclicBarrier(2);
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    c.await();
                } catch (Exception e) {
                }
                System.out.println(1);
            }
        }).start();
        try{
            c.await();
        } catch (Exception e) {
        }
        System.out.println(2);
    }
}

輸出結(jié)果:

1
2

或者

2
1

如果把 new CyclicBarrier(2) 改成 new CyclicBarrier(3) 稳诚,則主線程和子線程會(huì)永遠(yuǎn)等待哗脖,因?yàn)闆]有第三個(gè)線程執(zhí)行await方法。

如果起了三個(gè)線程調(diào)用await,但是new CyclicBarrier(2)懒熙,依然會(huì)永遠(yuǎn)等待丘损,因?yàn)楫?dāng)CyclicBarrier的count為0時(shí),會(huì)重置為parties工扎。

            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

另一個(gè)構(gòu)造器:CyclicBarrier(int parties, Runnable barrierAction)徘钥,用于在線程到達(dá)屏障時(shí),優(yōu)先執(zhí)行barrierAction肢娘,方便更負(fù)責(zé)的業(yè)務(wù)場(chǎng)景:

public class CynlicBarrierTest2 {

    static CyclicBarrier c = new CyclicBarrier(2, new A());

    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    c.await();
                } catch (Exception e) {
                }
                System.out.println(1);
            }
        }).start();
        try{
            c.await();
        } catch (Exception e) {
        }
        System.out.println(2);
    }
    static class A implements Runnable {
        @Override
        public void run() {
            System.out.println(3);
        }
    }
}

輸出結(jié)果:

3
2
1

或者

3
1
2

②CyclicBarrier的應(yīng)用場(chǎng)景

CyclicBarrier可用于多線程計(jì)算數(shù)據(jù)呈础,最后合并計(jì)算結(jié)果的場(chǎng)景。

如:用一個(gè)Excel保存了用戶所有銀行流水橱健,每個(gè)sheet薄脆一個(gè)賬戶近一年的每筆銀行流水而钞,現(xiàn)在需要統(tǒng)計(jì)用戶的日均銀行流水,最后再用barrierAction用這些線程的計(jì)算結(jié)果拘荡,計(jì)算出整個(gè)Excel的日均銀行流水臼节。

/**
 * 銀行流水處理服務(wù)類
 */
public class BankWaterService implements Runnable {
    /**
     * 創(chuàng)建4個(gè)屏障,處理完之后執(zhí)行當(dāng)前類的run方法
     */
    private CyclicBarrier c = new CyclicBarrier(4, this);
    /**
     * 假設(shè)只有4個(gè)sheet珊皿,所以只啟動(dòng)4個(gè)線程
     */
    private Executor executor = Executors.newFixedThreadPool(4);
    /**
     * 保存每個(gè)sheet計(jì)算出的銀行流水結(jié)果
     */
    private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>();
    private void count() {
        for (int i = 0; i < 4; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    //計(jì)算當(dāng)前sheet的銀行流水?dāng)?shù)據(jù)网缝,計(jì)算代碼省略
                    sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
                    //銀行流水計(jì)算完成插入一個(gè)屏障
                    try {
                        c.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
    @Override
    public void run() {
        int result = 0;
        //匯總每個(gè)sheet計(jì)算出的結(jié)果
        for (Map.Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) {
            result += sheet.getValue();
        }
        //將結(jié)果輸出
        sheetBankWaterCount.put("result", result);
        System.out.println(result);
    }

    public static void main(String[] args) {
        BankWaterService bankWaterCount = new BankWaterService();
        bankWaterCount.count();
    }
}

輸出結(jié)果

4

③CyclicBarrier和CountDownLatch的區(qū)別

CountDownLatch的計(jì)數(shù)器只能使用一次,而CyclicBarrier的計(jì)數(shù)器考科一使用reset方法重置蟋定。

CyclicBarrier還提供其他有用的方法粉臊,比如:

  • getNumberWaiting():可以獲取CyclicBarrier阻塞的線程數(shù)。
  • isBroken():用來了解阻塞的線程是否被中斷驶兜。
public class CynlicBarrierTest3 {
    static CyclicBarrier c = new CyclicBarrier(2);

    public static void main(String[] args) {
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    c.await();
                } catch (Exception e) {
                }
            }
        });
        thread.start();
        thread.interrupt();
        try{
            c.await();
        } catch (Exception e) {
            System.out.println(c.isBroken());
        }
    }
}

輸出:

true

3.控制并發(fā)線程數(shù)的Semaphore

Semaphore(信號(hào)量)是用來控制同時(shí)訪問特定資源的線程數(shù)量扼仲,它通過協(xié)調(diào)各個(gè)線程,以保障合理的使用公共資源抄淑。

①應(yīng)用場(chǎng)景

Semaphore可以用于做流量控制屠凶,特別是共用資源有限的應(yīng)用場(chǎng)景,比如數(shù)據(jù)庫連接肆资。

如:讀取幾萬個(gè)文件的數(shù)據(jù)矗愧,因?yàn)槎际荌O密集型人物,我們可以啟動(dòng)幾十個(gè)線程并發(fā)地讀取迅耘,但是如果讀到內(nèi)存后贱枣,還需要存儲(chǔ)到數(shù)據(jù)庫中监署,而數(shù)據(jù)庫的連接數(shù)只有10個(gè)颤专,這時(shí)我們必須控制只有10個(gè)線程同時(shí)獲取數(shù)據(jù)庫連接保存數(shù)據(jù),否則會(huì)報(bào)錯(cuò)無法獲取數(shù)據(jù)庫連接钠乏。

public class SemaphoreTest {
    private static final int THREAD_COUNT = 30;
    private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
    private static Semaphore s = new Semaphore(10);

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        s.acquire();//獲取一個(gè)許可證
                        System.out.println("save data");
                        s.release();//歸還許可證
                    } catch (InterruptedException e) {
                    }
                }
            });
        }
        threadPool.shutdown();
    }
}

代碼中栖秕,雖然有30個(gè)線程在執(zhí)行,但是只允許10個(gè)并發(fā)執(zhí)行晓避。

構(gòu)造方法:Semaphore(int permits)簇捍,permits表示可用的許可證數(shù)量只壳。

②其他方法

  • boolean tryAcquire():嘗試獲取許可證。
  • int availablePermits():返回此信號(hào)量中當(dāng)前可用的許可證數(shù)暑塑。
  • int getQueueLength():返回正在等待獲取許可證的線程數(shù)吼句。
  • boolean hasQueuedThreads():是否有線程正在等待獲取許可證。
  • void reducePermits(int reduction):減少reduction個(gè)許可證事格,是個(gè)protected方法惕艳。
  • Collection<Thread> getQueuedThreads():返回所有等待獲取許可證的線程集合,是個(gè)protected方法驹愚。

4.線程間交換數(shù)據(jù)的Exchanger

Exchanger(交換者)是一個(gè)用于線程間協(xié)作的工具類远搪。用于進(jìn)行線程間的數(shù)據(jù)交換。它提供一個(gè)同步點(diǎn)逢捺,在這個(gè)同步點(diǎn)谁鳍,兩個(gè)線程可以交換彼此的數(shù)據(jù)。

這兩個(gè)線程通過exchange方法交換數(shù)據(jù)劫瞳,如果第一個(gè)線程先執(zhí)行exchange方法倘潜,它會(huì)一直等待第二個(gè)線程也執(zhí)行exchange方法,當(dāng)兩個(gè)線程都到達(dá)同步點(diǎn)時(shí)柠新,這兩個(gè)線程可以交換數(shù)據(jù)窍荧,將本線程生產(chǎn)出來的數(shù)據(jù)傳遞給對(duì)方。

public class ExchangerTest {
    private static final Exchanger<String> exgr = new Exchanger<>();
    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String a = "銀行流水A";//A錄入銀行流水?dāng)?shù)據(jù)
                    String exchange = exgr.exchange(a);
                    System.out.println(exchange);
                } catch (InterruptedException e) {
                }
            }
        });
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String b = "銀行流水B";//B錄入銀行流水?dāng)?shù)據(jù)
                    String a = exgr.exchange(b);
                    System.out.println("A和B數(shù)據(jù)是否一致:" + a.equals(b) + "恨憎,A錄入的是:" + a + "蕊退,B錄入的是:" + b);
                } catch (InterruptedException e) {
                }
            }
        });
        threadPool.shutdown();
    }
}

如果擔(dān)心有特殊情況發(fā)生,避免一直等待憔恳,可以使用exchange(V x, long timeout, TimeUnit unit)設(shè)置最大等待時(shí)長瓤荔。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市钥组,隨后出現(xiàn)的幾起案子输硝,更是在濱河造成了極大的恐慌,老刑警劉巖程梦,帶你破解...
    沈念sama閱讀 221,273評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件点把,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡屿附,警方通過查閱死者的電腦和手機(jī)郎逃,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,349評(píng)論 3 398
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來挺份,“玉大人褒翰,你說我怎么就攤上這事。” “怎么了优训?”我有些...
    開封第一講書人閱讀 167,709評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵蚤吹,是天一觀的道長抹缕。 經(jīng)常有香客問我吭历,道長达址,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,520評(píng)論 1 296
  • 正文 為了忘掉前任早敬,我火速辦了婚禮魂拦,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘搁嗓。我一直安慰自己芯勘,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,515評(píng)論 6 397
  • 文/花漫 我一把揭開白布腺逛。 她就那樣靜靜地躺著荷愕,像睡著了一般。 火紅的嫁衣襯著肌膚如雪棍矛。 梳的紋絲不亂的頭發(fā)上安疗,一...
    開封第一講書人閱讀 52,158評(píng)論 1 308
  • 那天,我揣著相機(jī)與錄音够委,去河邊找鬼荐类。 笑死,一個(gè)胖子當(dāng)著我的面吹牛茁帽,可吹牛的內(nèi)容都是我干的玉罐。 我是一名探鬼主播,決...
    沈念sama閱讀 40,755評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼潘拨,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼吊输!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起铁追,我...
    開封第一講書人閱讀 39,660評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤季蚂,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后琅束,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體扭屁,經(jīng)...
    沈念sama閱讀 46,203評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,287評(píng)論 3 340
  • 正文 我和宋清朗相戀三年涩禀,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了料滥。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,427評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡埋泵,死狀恐怖幔欧,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情丽声,我是刑警寧澤礁蔗,帶...
    沈念sama閱讀 36,122評(píng)論 5 349
  • 正文 年R本政府宣布,位于F島的核電站雁社,受9級(jí)特大地震影響浴井,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜霉撵,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,801評(píng)論 3 333
  • 文/蒙蒙 一磺浙、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧徒坡,春花似錦撕氧、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,272評(píng)論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至锦溪,卻和暖如春不脯,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背刻诊。 一陣腳步聲響...
    開封第一講書人閱讀 33,393評(píng)論 1 272
  • 我被黑心中介騙來泰國打工防楷, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人则涯。 一個(gè)月前我還...
    沈念sama閱讀 48,808評(píng)論 3 376
  • 正文 我出身青樓复局,卻偏偏與公主長得像,于是被迫代替她去往敵國和親粟判。 傳聞我的和親對(duì)象是個(gè)殘疾皇子肖揣,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,440評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容