并發(fā)編程08--Java中的并發(fā)工具類

在JDK的并發(fā)包里提供了幾個(gè)非常有用的并發(fā)工具類。CountDownLatch景东、CyclicBarrier和Semaphore工具類提供了一種并發(fā)流程控制的手段砂轻,Exchanger工具類則提供了在線程間交換數(shù)據(jù)的一種手段。


等待多線程完成的CountDownLatch

CountDownLatch允許一個(gè)或多個(gè)線程等待其他線程完成操作斤吐。

假設(shè)有這樣一個(gè)需求:我們需要解析一個(gè)Excel里多個(gè)sheet的數(shù)據(jù)搔涝,此時(shí)可以考慮使用多線程,每個(gè)線程解析一個(gè)sheet里的數(shù)據(jù)和措,等到所有的sheet都解析完之后庄呈,程序需要提示解析完成诬留。在這個(gè)需求中文兑,要實(shí)現(xiàn)主線程等待所有線程完成sheet的解析操作彩届,最簡(jiǎn)單的做法是使用join()方法


JoinCountDownLatchTest.java

join用于讓當(dāng)前執(zhí)行線程等待join線程執(zhí)行結(jié)束。其實(shí)現(xiàn)原理是不停檢查join線程是否存活歼冰,如果join線程存活則讓當(dāng)前線程永遠(yuǎn)等待。其中,wait(0)表示永遠(yuǎn)等待下去秸滴,代碼片段如下。


直到j(luò)oin線程中止后,線程的this.notifyAll()方法會(huì)被調(diào)用.
在JDK 1.5之后的并發(fā)包中提供的CountDownLatch也可以實(shí)現(xiàn)join的功能,并且比join的功能更多

public class CountDownLatchTest {
    static CountDownLatch c = new CountDownLatch(2);
    public static void main(String[] args) throws InterruptedException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(1);
                c.countDown();
                System.out.println(2);
                c.countDown();
            }
        }).start();
        c.await();
        System.out.println("3");
    }
}

CountDownLatch的構(gòu)造函數(shù)接收一個(gè)int類型的參數(shù)作為計(jì)數(shù)器,如果你想等待N個(gè)點(diǎn)完成,這里就傳入N周叮。

調(diào)用CountDownLatch的countDown方法時(shí)各薇,N就會(huì)減1开缎,CountDownLatch的await方法會(huì)阻塞當(dāng)前線程疗认,直到N變成零。由于countDown方法可以用在任何地方铝宵,所以這里說(shuō)的N個(gè)點(diǎn),可以是N個(gè)線程,也可以是1個(gè)線程里的N個(gè)執(zhí)行步驟。用在多個(gè)線程時(shí),只需要把這個(gè)CountDownLatch的引用傳遞到線程里即可地回。

如果有某個(gè)解析sheet的線程處理得比較慢细睡,我們不可能讓主線程一直等待,所以可以使用另外一個(gè)帶指定時(shí)間的await方法——await(long time购公,TimeUnit unit)萌京,這個(gè)方法等待特定時(shí) 間后,就會(huì)不再阻塞當(dāng)前線程靠瞎。join也有類似的方法比庄。

注意:計(jì)數(shù)器必須大于等于0,只是等于0時(shí)候,計(jì)數(shù)器就是零藻三,調(diào)用await方法時(shí)不會(huì)阻塞當(dāng)前線程棵帽。CountDownLatch不可能重新初始化或者修改CountDownLatch對(duì)象的內(nèi)部計(jì)數(shù)器的值熄求。一個(gè)線程調(diào)用countDown方法happen-before,另外一個(gè)線程調(diào)用await方法仗谆。


CyclicBarrier的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)指巡。它要做的事情是,讓一組線程到達(dá)一個(gè)屏障(也可以叫同步點(diǎn))時(shí)被阻塞隶垮,直到最后一個(gè)線程到達(dá)屏障時(shí)藻雪,屏障才會(huì)開門,所有被屏障攔截的線程才會(huì)繼續(xù)運(yùn)行狸吞。

CyclicBarrier簡(jiǎn)介

CyclicBarrier默認(rèn)的構(gòu)造方法是CyclicBarrier(int parties)勉耀,其參數(shù)表示屏障攔截的線程數(shù)量,每個(gè)線程調(diào)用await方法告訴CyclicBarrier我已經(jīng)到達(dá)了屏障蹋偏,然后當(dāng)前線程被阻塞便斥。

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {
    static CyclicBarrier c = new CyclicBarrier(2);
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    c.await();
                } catch (Exception e) {
                }
                System.out.println(1);
            }
        }).start();

        try {
            c.await();
        } catch (Exception e) {

        }
        System.out.println(2);
    }
}

因?yàn)橹骶€程和子線程的調(diào)度是由CPU決定的,兩個(gè)線程都有可能先執(zhí)行威始,所以會(huì)產(chǎn)生兩種
輸出枢纠,第一種可能輸出如下。

第二種可能輸出如下黎棠。


如果把new CyclicBarrier(2)修改成new CyclicBarrier(3)晋渺,則主線程和子線程會(huì)永遠(yuǎn)等待,因?yàn)闆](méi)有第三個(gè)線程執(zhí)行await方法脓斩,即沒(méi)有第三個(gè)線程到達(dá)屏障木西,所以之前到達(dá)屏障的兩個(gè)線程都不會(huì)繼續(xù)執(zhí)行。

CyclicBarrier還提供一個(gè)更高級(jí)的構(gòu)造函數(shù)CyclicBarrier(int parties随静,Runnable barrier?Action)八千,用于在線程達(dá)屏障時(shí),優(yōu)先執(zhí)行barrierAction,方便處理更復(fù)雜的業(yè)務(wù)場(chǎng)景


CyclicBarrier的應(yīng)用場(chǎng)景

CyclicBarrier可以用于多線程計(jì)算數(shù)據(jù)恋捆,最后合并計(jì)算結(jié)果的場(chǎng)景照皆。等待所有指定數(shù)量完成后進(jìn)行統(tǒng)一操作.

CyclicBarrier和CountDownLatch的區(qū)別

CountDownLatch的計(jì)數(shù)器只能使用一次,而CyclicBarrier的計(jì)數(shù)器可以使用reset()方法重置鸠信。所以CyclicBarrier能處理更為復(fù)雜的業(yè)務(wù)場(chǎng)景纵寝。例如,如果計(jì)算發(fā)生錯(cuò)誤星立,可以重置計(jì)數(shù)器爽茴,并讓線程重新執(zhí)行一次。

CyclicBarrier還提供其他有用的方法绰垂,比如getNumberWaiting方法可以獲得Cyclic-Barrier阻塞的線程數(shù)量室奏。isBroken()方法用來(lái)了解阻塞的線程是否被中斷。

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest3 {

    static CyclicBarrier barrier = new CyclicBarrier(2);

    public static void main(String[] args) throws Exception{
        Thread thread = new Thread(() -> {
            try {
                barrier.await();
            } catch (Exception e) {
            }
        });
        thread.start();
        thread.interrupt();
        try {
            barrier.await();
        } catch (Exception e) {
            System.out.println(barrier.isBroken());
        }
    }
}

輸出結(jié)果:

true


控制并發(fā)線程數(shù)的Semaphore

Semaphore(信號(hào)量)是用來(lái)控制同時(shí)訪問(wèn)特定資源的線程數(shù)量劲装,它通過(guò)協(xié)調(diào)各個(gè)線程胧沫,以保證合理的使用公共資源。

應(yīng)用場(chǎng)景

Semaphore可以用于做流量控制占业,特別是公用資源有限的應(yīng)用場(chǎng)景绒怨,比如數(shù)據(jù)庫(kù)連接。

假如,有一個(gè)需求谦疾,要讀取幾萬(wàn)個(gè)文件的數(shù)據(jù)南蹂,因?yàn)槎际荌O密集型任務(wù),我們可以啟動(dòng)幾十個(gè)線程并發(fā)地讀取念恍,但是如果讀到內(nèi)存后六剥,還需要存儲(chǔ)到數(shù)據(jù)庫(kù)中,而數(shù)據(jù)庫(kù)的連接數(shù)只有10個(gè)峰伙,這時(shí)我們必須控制只有10個(gè)線程同時(shí)獲取數(shù)據(jù)庫(kù)連接保存數(shù)據(jù)疗疟,否則會(huì)報(bào)錯(cuò)無(wú)法獲取數(shù)據(jù)庫(kù)連接。這個(gè)時(shí)候瞳氓,就可以使用Semaphore來(lái)做流量控制.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreTest {
    private static final int THREAD_COUNT = 30;
    private static ExecutorService threadPool = Executors
            .newFixedThreadPool(THREAD_COUNT);

    static int counter  = 1;

    private static Semaphore s = new Semaphore(10);
    public static void main(String[] args) {
        for (int i = 0; i< THREAD_COUNT; i++) {
            threadPool.execute(() -> {
                try {
                    s.acquire();
                    System.out.println("save data "+counter++);
                    s.release();
                } catch (InterruptedException ignored) {
                }
            });
        }
        threadPool.shutdown();
    }
}

在代碼中策彤,雖然有30個(gè)線程在執(zhí)行,但是只允許10個(gè)并發(fā)執(zhí)行匣摘。Semaphore的構(gòu)造方法Semaphore(int permits)接受一個(gè)整型的數(shù)字锅锨,表示可用的許可證數(shù)量。Semaphore(10)表示允許10個(gè)線程獲取許可證恋沃,也就是最大并發(fā)數(shù)是10。Semaphore的用法也很簡(jiǎn)單必指,首先線程使用Semaphore的acquire()方法獲取一個(gè)許可證囊咏,使用完之后調(diào)用release()方法歸還許可證。還可以用tryAcquire()方法嘗試獲取許可證。

其他方法

Semaphore還提供一些其他方法梅割,具體如下霜第。

  • intavailablePermits():返回此信號(hào)量中當(dāng)前可用的許可證數(shù)。
  • intgetQueueLength():返回正在等待獲取許可證的線程數(shù)户辞。
  • booleanhasQueuedThreads():是否有線程正在等待獲取許可證泌类。
  • void reducePermits(int reduction):減少reduction個(gè)許可證,是個(gè)protected方法底燎。
  • Collection getQueuedThreads():返回所有等待獲取許可證的線程集合刃榨,是個(gè)protected方法。

線程間交換數(shù)據(jù)的Exchanger

Exchanger(交換者)是一個(gè)用于線程間協(xié)作的工具類双仍。

Exchanger用于進(jìn)行線程間的數(shù)據(jù)交換枢希。它提供一個(gè)同步點(diǎn),在這個(gè)同步點(diǎn)朱沃,兩個(gè)線程可以交換彼此的數(shù)據(jù)苞轿。

  • 這兩個(gè)線程通過(guò)exchange方法交換數(shù)據(jù),如果第一個(gè)線程先執(zhí)行exchange()方法逗物,它會(huì)一直等待第二個(gè)線程也執(zhí)行exchange方法搬卒,當(dāng)兩個(gè)線程都到達(dá)同步點(diǎn)時(shí),這兩個(gè)線程就可以交換數(shù)據(jù)翎卓,將本線程生產(chǎn)出來(lái)的數(shù)據(jù)傳遞給對(duì)方契邀。

Exchanger的應(yīng)用場(chǎng)景

  • Exchanger可以用于遺傳算法,遺傳算法里需要選出兩個(gè)人作為交配對(duì)象莲祸,這時(shí)候會(huì)交換兩人的數(shù)據(jù)蹂安,并使用交叉規(guī)則得出2個(gè)交配結(jié)果。
  • Exchanger也可以用于校對(duì)工作锐帜,比如我們需要將紙制銀行流水通過(guò)人工的方式錄入成電子銀行流水田盈,為了避免錯(cuò)誤,采用AB崗兩人進(jìn)行錄入缴阎,錄入到Excel之后允瞧,系統(tǒng)需要加載這兩個(gè)Excel,并對(duì)兩個(gè)Excel數(shù)據(jù)進(jìn)行校對(duì)蛮拔,看看是否錄入一致
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExchangerTest {
    private static final Exchanger<String> exgr = new Exchanger<String>();
    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
    public static void main(String[] args) {
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String A = "銀行流水A"; // A錄入銀行流水?dāng)?shù)據(jù)
                    exgr.exchange(A);
                } catch (InterruptedException e) {
                }
            }
        });
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String B = "銀行流水B"; // B錄入銀行流水?dāng)?shù)據(jù)
                    String A = exgr.exchange("B");
                    System.out.println("A和B數(shù)據(jù)是否一致:" + A.equals(B) + "述暂,A錄入的是:"
                            + A + ",B錄入是:" + B);
                } catch (InterruptedException e) {
                }
            }
        });
        threadPool.shutdown();
    }
}

如果兩個(gè)線程有一個(gè)沒(méi)有執(zhí)行exchange()方法建炫,則會(huì)一直等待畦韭,如果擔(dān)心有特殊情況發(fā)生,避免一直等待肛跌,可以使用exchange(V x艺配,longtimeout察郁,TimeUnit unit)設(shè)置最大等待時(shí)長(zhǎng)。


參考書籍:《Java并發(fā)編程的藝術(shù)》

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末转唉,一起剝皮案震驚了整個(gè)濱河市皮钠,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌赠法,老刑警劉巖麦轰,帶你破解...
    沈念sama閱讀 221,548評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異砖织,居然都是意外死亡款侵,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,497評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門镶苞,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)喳坠,“玉大人,你說(shuō)我怎么就攤上這事茂蚓『攫模” “怎么了?”我有些...
    開封第一講書人閱讀 167,990評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵聋涨,是天一觀的道長(zhǎng)晾浴。 經(jīng)常有香客問(wèn)我,道長(zhǎng)牍白,這世上最難降的妖魔是什么脊凰? 我笑而不...
    開封第一講書人閱讀 59,618評(píng)論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮茂腥,結(jié)果婚禮上狸涌,老公的妹妹穿的比我還像新娘。我一直安慰自己最岗,他們只是感情好帕胆,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,618評(píng)論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著般渡,像睡著了一般懒豹。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上驯用,一...
    開封第一講書人閱讀 52,246評(píng)論 1 308
  • 那天脸秽,我揣著相機(jī)與錄音,去河邊找鬼蝴乔。 笑死记餐,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的薇正。 我是一名探鬼主播剥扣,決...
    沈念sama閱讀 40,819評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼巩剖,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了钠怯?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,725評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤曙聂,失蹤者是張志新(化名)和其女友劉穎晦炊,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體宁脊,經(jīng)...
    沈念sama閱讀 46,268評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡断国,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,356評(píng)論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了榆苞。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片稳衬。...
    茶點(diǎn)故事閱讀 40,488評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖坐漏,靈堂內(nèi)的尸體忽然破棺而出薄疚,到底是詐尸還是另有隱情,我是刑警寧澤赊琳,帶...
    沈念sama閱讀 36,181評(píng)論 5 350
  • 正文 年R本政府宣布街夭,位于F島的核電站,受9級(jí)特大地震影響躏筏,放射性物質(zhì)發(fā)生泄漏板丽。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,862評(píng)論 3 333
  • 文/蒙蒙 一趁尼、第九天 我趴在偏房一處隱蔽的房頂上張望埃碱。 院中可真熱鬧,春花似錦酥泞、人聲如沸砚殿。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,331評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)瓮具。三九已至,卻和暖如春凡人,著一層夾襖步出監(jiān)牢的瞬間名党,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,445評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工挠轴, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留传睹,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,897評(píng)論 3 376
  • 正文 我出身青樓岸晦,卻偏偏與公主長(zhǎng)得像欧啤,于是被迫代替她去往敵國(guó)和親睛藻。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,500評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容