十一哼绑、Java中的并發(fā)工具類

轉(zhuǎn)《Java并發(fā)編程的藝術-第8章》

1.等待多線程完成的CountDownLatch

JDk1.5提供了一個非常有用的包,Concurrent包倒淫,這個包主要用來操作一些并發(fā)操作伙菊,提供一些并發(fā)類,可以方便在項目當中傻瓜式應用敌土。

JDK1.5以前镜硕,使用并發(fā)操作,都是通過Thread返干,Runnable來操作多線程兴枯;但是在JDK1.5之后,提供了非常方便的線程池(ThreadExecutorPool)矩欠,主要代碼由大牛Doug Lea完成财剖,其實是在jdk1.4時代,由于java語言內(nèi)置對多線程編程的支持比較基礎和有限癌淮,所以他寫了這個躺坟,因為實在太過于優(yōu)秀,所以被加入到jdk之中乳蓄;

這次主要對CountDownLatch進行系統(tǒng)的講解

使用場景:比如對于馬拉松比賽咪橙,進行排名計算,參賽者的排名虚倒,肯定是跑完比賽之后美侦,進行計算得出的,翻譯成Java識別的預發(fā)魂奥,就是N個線程執(zhí)行操作菠剩,主線程等到N個子線程執(zhí)行完畢之后,在繼續(xù)往下執(zhí)行捧弃。

代碼示例

public static void testCountDownLatch(){
        
        int threadCount = 10;
        
        final CountDownLatch latch = new CountDownLatch(threadCount);
        
        for(int i=0; i< threadCount; i++){
             
            new Thread(new Runnable() {
                
                @Override
                public void run() {

                    System.out.println("線程" + Thread.currentThread().getId() + "開始出發(fā)");

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    System.out.println("線程" + Thread.currentThread().getId() + "已到達終點");

                    latch.countDown();
                }
            }).start();
        }
        
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("10個線程已經(jīng)執(zhí)行完畢赠叼!開始計算排名");
    }

執(zhí)行結果:

線程10開始出發(fā)
線程13開始出發(fā)
線程12開始出發(fā)
線程11開始出發(fā)
線程14開始出發(fā)
線程15開始出發(fā)
線程16開始出發(fā)
線程17開始出發(fā)
線程18開始出發(fā)
線程19開始出發(fā)
線程14已到達終點
線程15已到達終點
線程13已到達終點
線程12已到達終點
線程10已到達終點
線程11已到達終點
線程16已到達終點
線程17已到達終點
線程18已到達終點
線程19已到達終點
10個線程已經(jīng)執(zhí)行完畢擦囊!開始計算排名

主要方法:

  • void await() //當前線程等待計數(shù)器為0
  • boolean await(long timeout, TimeUnit unit) //與上面的方法不同违霞,它加了一個時間限制嘴办。
  • void countDown() //計數(shù)器減1
  • long getCount() //獲取計數(shù)器的值

實現(xiàn)方式:
它的內(nèi)部有一個輔助的內(nèi)部類:Sync(繼承至AQS)

await() 方法的實現(xiàn):

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg); //加入到等待隊列中
    }

countDown() 方法的實現(xiàn):
    public void countDown() {
        sync.releaseShared(1);
    }

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared(); //解鎖
            return true;
        }
        return false;
    }

2.同步屏障CyclicBarrier

CyclicBarrier的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。它要做的事情是买鸽,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞涧郊,直到最后一個線程到達屏障時,屏障才會開門眼五,所有被屏障攔截的線程才會繼續(xù)運行妆艘。

CyclicBarrier簡介

CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數(shù)表示屏障攔截的線程數(shù)量看幼,每個線程調(diào)用await方法告訴CyclicBarrier我已經(jīng)到達了屏障批旺,然后當前線程被阻塞。示例代碼如下所示诵姜。

public class CyclicBarrierTest {
    static CyclicBarrier c = new CyclicBarrier(2);

    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    c.await();
                } catch (Exception e) {
                    e.printStackTrace();
                } 
                System.out.println(1);
            }
        }).start();
        try {
            c.await();
        } catch (Exception e) {
            e.printStackTrace();
        }  
        System.out.println(2);
    }
}

因為主線程和子線程的調(diào)度是由CPU決定的汽煮,兩個線程都有可能先執(zhí)行,所以會產(chǎn)生兩種輸出棚唆,第一種可能輸出如下暇赤。

1
2

第二種可能輸出如下。

2
1

如果把new CyclicBarrier(2)修改成new CyclicBarrier(3)宵凌,則主線程和子線程會永遠等待鞋囊,因為沒有第三個線程執(zhí)行await方法,即沒有第三個線程到達屏障瞎惫,所以之前到達屏障的兩個線程都不會繼續(xù)執(zhí)行溜腐。

CyclicBarrier還提供一個更高級的構造函數(shù)CyclicBarrier(int parties,Runnable barrier-Action)瓜喇,用于在線程到達屏障時逗扒,優(yōu)先執(zhí)行barrierAction,方便處理更復雜的業(yè)務場景

public class CyclicBarrierTest2 {
    static CyclicBarrier c = new CyclicBarrier(2,new A());

    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    c.await();
                } catch (Exception e) {
                }
                System.out.println(1);
            }
        }).start();
        try {
            c.await();
        } catch (Exception e) {
        }
        System.out.println(2);
    }

    static class A implements Runnable {
        @Override
        public void run() {
            System.out.println(3);
        }
    }
}

因為CyclicBarrier設置了攔截線程的數(shù)量是2欠橘,所以必須等代碼中的第一個線程和線程A都執(zhí)行完之后矩肩,才會繼續(xù)執(zhí)行主線程,然后輸出2肃续,所以代碼執(zhí)行后的輸出如下黍檩。

3
1
2

CyclicBarrier可以用于多線程計算數(shù)據(jù),最后合并計算結果的場景始锚。例如刽酱,用一個Excel保存了用戶所有銀行流水,每個Sheet保存一個賬戶近一年的每筆銀行流水瞧捌,現(xiàn)在需要統(tǒng)計用戶的日均銀行流水棵里,先用多線程處理每個sheet里的銀行流水润文,都執(zhí)行完之后,得到每個sheet的日均銀行流水殿怜,最后典蝌,再用barrierAction用這些線程的計算結果,計算出整個Excel的日均銀行流水头谜,如代碼清單8-5所示骏掀。

public class BankWaterService implements Runnable{

    /**
     * 創(chuàng)建4個屏障,處理完之后執(zhí)行當前類的run方法
     */
    private CyclicBarrier c = new CyclicBarrier(4, this);
    /**
     * 假設只有4個sheet柱告,所以只啟動4個線程
     */
    private Executor executor = Executors.newFixedThreadPool(4);
    /**
     * 保存每個sheet計算出的銀流結果
     */
    private ConcurrentHashMap<String, Integer>sheetBankWaterCount = new ConcurrentHashMap<String, Integer>();
    private void count(){
        for (int i = 0; i < 4; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    // 計算當前sheet的銀流數(shù)據(jù)截驮,計算代碼省略
                    sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
                    // 銀流計算完成,插入一個屏障
                    try {
                        c.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    @Override
    public void run() {
        int result = 0;
        for (Map.Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) {
            result += sheet.getValue();
        }
        sheetBankWaterCount.put("result", result);
        System.out.println(result);
    }

    public static void main(String[] args) {
        BankWaterService bankWaterService = new BankWaterService();
        bankWaterService.count();
    }
}

使用線程池創(chuàng)建4個線程际度,分別計算每個sheet里的數(shù)據(jù)葵袭,每個sheet計算結果是1,再由BankWaterService線程匯總4個sheet計算出的結果乖菱,輸出結果如下坡锡。

4

3.CyclicBarrier和CountDownLatch的區(qū)別

應用場景區(qū)別

CountDownLatch : 一個線程(或者多個), 等待另外N個線程完成某個事情之后才能執(zhí)行块请。
CyclicBarrier : N個線程相互等待娜氏,任何一個線程完成之前,所有的線程都必須等待墩新。

這樣應該就清楚一點了贸弥,對于CountDownLatch來說,重點是那個“一個線程”, 是它在等待海渊, 而另外那N的線程在把“某個事情”做完之后可以繼續(xù)等待绵疲,可以終止。

而對于CyclicBarrier來說臣疑,重點是那N個線程盔憨,他們之間任何一個沒有完成,所有的線程都必須等待讯沈。

CountDownLatch 是計數(shù)器, 線程完成一個就記一個, 就像 報數(shù)一樣, 只不過是遞減的.

而CyclicBarrier更像一個水閘, 線程執(zhí)行就想水流, 在水閘處都會堵住, 等到水滿(線程到齊)了, 才開始泄流.

使用區(qū)別

CountDownLatch的計數(shù)器只能使用一次郁岩,而CyclicBarrier的計數(shù)器可以使用reset()方法重置。所以CyclicBarrier能處理更為復雜的業(yè)務場景缺狠。例如问慎,如果計算發(fā)生錯誤,可以重置計數(shù)器挤茄,并讓線程重新執(zhí)行一次如叼。

CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得Cyclic-Barrier阻塞的線程數(shù)量穷劈。isBroken()方法用來了解阻塞的線程是否被中斷

4.控制并發(fā)線程數(shù)的Semaphore

Semaphore(信號量)是用來控制同時訪問特定資源的線程數(shù)量笼恰,它通過協(xié)調(diào)各個線程踊沸,以保證合理的使用公共資源。

應用場景

Semaphore可以用于做流量控制社证,特別是公用資源有限的應用場景逼龟,比如數(shù)據(jù)庫連接。假如有一個需求猴仑,要讀取幾萬個文件的數(shù)據(jù)审轮,因為都是IO密集型任務肥哎,我們可以啟動幾十個線程并發(fā)地讀取辽俗,但是如果讀到內(nèi)存后,還需要存儲到數(shù)據(jù)庫中篡诽,而數(shù)據(jù)庫的連接數(shù)只有10個崖飘,這時我們必須控制只有10個線程同時獲取數(shù)據(jù)庫連接保存數(shù)據(jù),否則會報錯無法獲取數(shù)據(jù)庫連接杈女。這個時候朱浴,就可以使用Semaphore來做流量控制

public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);

private static Semaphore s = new Semaphore(10);

public static void main(String[] args) {
    for (int i = 0; i <THREAD_COUNT ; i++) {
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    s.acquire();
                    System.out.println("save data");
                    s.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
            }
        });
    }
    threadPool.shutdown();
}

}

在代碼中,雖然有30個線程在執(zhí)行达椰,但是只允許10個并發(fā)執(zhí)行翰蠢。Semaphore的構造方法Semaphore(int permits)接受一個整型的數(shù)字,表示可用的許可證數(shù)量啰劲。Semaphore(10)表示允許10個線程獲取許可證梁沧,也就是最大并發(fā)數(shù)是10。Semaphore的用法也很簡單蝇裤,首先線程使用Semaphore的acquire()方法獲取一個許可證廷支,使用完之后調(diào)用release()方法歸還許可證。還可以用tryAcquire()方法嘗試獲取許可證栓辜。

Semaphore還提供一些其他方法恋拍,具體如下。

  • intavailablePermits():返回此信號量中當前可用的許可證數(shù)藕甩。
  • intgetQueueLength():返回正在等待獲取許可證的線程數(shù)施敢。
  • booleanhasQueuedThreads():是否有線程正在等待獲取許可證。
  • void reducePermits(int reduction):減少reduction個許可證狭莱,是個protected方法僵娃。
  • Collection getQueuedThreads():返回所有等待獲取許可證的線程集合,是個protected方法贩毕。
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末悯许,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子辉阶,更是在濱河造成了極大的恐慌先壕,老刑警劉巖瘩扼,帶你破解...
    沈念sama閱讀 221,273評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異垃僚,居然都是意外死亡集绰,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,349評論 3 398
  • 文/潘曉璐 我一進店門谆棺,熙熙樓的掌柜王于貴愁眉苦臉地迎上來栽燕,“玉大人,你說我怎么就攤上這事改淑“恚” “怎么了?”我有些...
    開封第一講書人閱讀 167,709評論 0 360
  • 文/不壞的土叔 我叫張陵朵夏,是天一觀的道長蔼啦。 經(jīng)常有香客問我,道長仰猖,這世上最難降的妖魔是什么捏肢? 我笑而不...
    開封第一講書人閱讀 59,520評論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮饥侵,結果婚禮上鸵赫,老公的妹妹穿的比我還像新娘。我一直安慰自己躏升,他們只是感情好辩棒,可當我...
    茶點故事閱讀 68,515評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著煮甥,像睡著了一般盗温。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上成肘,一...
    開封第一講書人閱讀 52,158評論 1 308
  • 那天卖局,我揣著相機與錄音,去河邊找鬼双霍。 笑死砚偶,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的洒闸。 我是一名探鬼主播染坯,決...
    沈念sama閱讀 40,755評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼丘逸!你這毒婦竟也來了单鹿?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,660評論 0 276
  • 序言:老撾萬榮一對情侶失蹤深纲,失蹤者是張志新(化名)和其女友劉穎仲锄,沒想到半個月后劲妙,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,203評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡儒喊,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,287評論 3 340
  • 正文 我和宋清朗相戀三年镣奋,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片怀愧。...
    茶點故事閱讀 40,427評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡侨颈,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出芯义,到底是詐尸還是另有隱情哈垢,我是刑警寧澤,帶...
    沈念sama閱讀 36,122評論 5 349
  • 正文 年R本政府宣布毕贼,位于F島的核電站温赔,受9級特大地震影響蛤奢,放射性物質(zhì)發(fā)生泄漏鬼癣。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,801評論 3 333
  • 文/蒙蒙 一啤贩、第九天 我趴在偏房一處隱蔽的房頂上張望待秃。 院中可真熱鬧,春花似錦痹屹、人聲如沸章郁。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,272評論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽暖庄。三九已至,卻和暖如春楼肪,著一層夾襖步出監(jiān)牢的瞬間培廓,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,393評論 1 272
  • 我被黑心中介騙來泰國打工春叫, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留肩钠,地道東北人。 一個月前我還...
    沈念sama閱讀 48,808評論 3 376
  • 正文 我出身青樓暂殖,卻偏偏與公主長得像价匠,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子呛每,可洞房花燭夜當晚...
    茶點故事閱讀 45,440評論 2 359

推薦閱讀更多精彩內(nèi)容