Java多線程(5)-- 協(xié)作之CountDownLatch、CyclicBarrier和Semaphore

java.util.concurrent(J.U.C)大大提高了并發(fā)性能捐腿,AQS 被認為是 J.U.C 的核心课锌。?????

在多線程程序設計中厨内,經(jīng)常會遇到一個線程等待一個或多個線程的場景,遇到這樣的場景應該如何解決渺贤?

?? 如果是一個線程等待一個線程雏胃,則可以通過await()和notify()來實現(xiàn);

?? 如果是一個線程等待多個線程志鞍,則就可以使用CountDownLatch和CyclicBarrier來實現(xiàn)比較好的控制瞭亮。


CountDownLatch:一個線程(或者多個), 等待另外N個線程完成某個事情之后才能執(zhí)行固棚。?

CyclicBarrier: N個線程相互等待统翩,任何一個線程完成之前仙蚜,所有的線程都必須等待。

這樣應該就清楚一點了厂汗,對于CountDownLatch來說委粉,重點是那個“一個線程”, 是它在等待,而另外那N的線程在把“某個事情”做完之后可以繼續(xù)等待娶桦,可以終止贾节。而對于CyclicBarrier來說,重點是那N個線程衷畦,他們之間任何一個沒有完成栗涂,所有的線程都必須等待。? ?

CountDownLatch是計數(shù)器祈争,線程完成一個就記一個斤程,就像報數(shù)一樣,只不過是遞減的菩混。而CyclicBarrier更像一個水閘暖释,線程執(zhí)行就像水流,在水閘處都會堵住, 等到水滿(線程到齊)了墨吓,才開始泄流。


1纹磺、CountDownLatch

用來控制一個線程等待多個線程帖烘。

維護了一個計數(shù)器 cnt,每次調(diào)用 countDown() 方法會讓計數(shù)器的值減 1橄杨,減到 0 的時候秘症,那些因為調(diào)用 await() 方法而在等待的線程就會被喚醒。??? ??

示例:

public class CountDownLatchTest {

?????? static CountDownLatch latch = new CountDownLatch(2);

? ? ? ?public static void main(String[] args) throws InterruptedException {

?????????????????????? newThread(new Runnable() {

?????????????????????? ??? @Override

?????????????????????? ??? public void run() {

????????????????????????????????????????????? System.out.println(1);

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? latch.countDown();

????????????????????????????????????????????? System.out.println(2);

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? latch.countDown();

?????????????????????? ???? }

?????????????????????? }).start();

? ? ? ? ? //等待到上面兩個操作結(jié)束式矫,latch減少到0時才能通過

? ? ? ? ? latch.await();

????????? System.out.println(3);

???? }

}

1

2

3


public class CountdownLatchExample {

??? public static voidmain(String[] args) throws InterruptedException {

??????? final inttotalThread = 10;

??????? CountDownLatch countDownLatch = new CountDownLatch(totalThread);

??????? ExecutorService executorService = Executors.newCachedThreadPool();

??????? for (int i = 0; i< totalThread; i++) {

???????????executorService.execute(() -> {

???????????????System.out.print("run..");

???????????????countDownLatch.countDown();

????????? ??});

??????? }

???????countDownLatch.await();

???????System.out.println("end");

???????executorService.shutdown();

??? }

}

run..run..run..run..run..run..run..run..run..run..end


CountDownLatch的構(gòu)造函數(shù)接收一個int類型的參數(shù)作為計數(shù)器乡摹,如果你想等待N個點完成,這里就傳入N采转。

當我們調(diào)用一次CountDownLatch的countDown方法時聪廉,N就會減1,CountDownLatch的await會阻塞當前線程故慈,直到N變成零板熊。由于countDown方法可以用在任何地方,所以這里說的N個點察绷,可以是N個線程干签,也可以是1個線程里的N個執(zhí)行步驟。用在多個線程時拆撼,你只需要把這個CountDownLatch的引用傳遞到線程里容劳。


2喘沿、CyclicBarrier

用來控制多個線程互相等待,只有當多個線程都到達時竭贩,這些線程才會繼續(xù)執(zhí)行蚜印。

和 CountdownLatch 相似,都是通過維護計數(shù)器來實現(xiàn)的娶视。線程執(zhí)行 await() 方法之后計數(shù)器會減 1晒哄,并進行等待,直到計數(shù)器為 0肪获,所有調(diào)用 await() 方法而在等待的線程才能繼續(xù)執(zhí)行寝凌。

CyclicBarrier 和 CountdownLatch的一個區(qū)別是,CyclicBarrier 的計數(shù)器通過調(diào)用 reset() 方法可以循環(huán)使用孝赫,所以它才叫做循環(huán)屏障较木。

CyclicBarrier 有兩個構(gòu)造函數(shù),其中 parties 指示計數(shù)器的初始值青柄,barrierAction 在所有線程都到達屏障的時候會執(zhí)行一次伐债。

public CyclicBarrier(int parties, Runnable barrierAction) {

??? if (parties <= 0)throw new IllegalArgumentException();

??? this.parties = parties;

??? this.count = parties;

??? this.barrierCommand =barrierAction;

}


public CyclicBarrier(int parties) {

??? this(parties, null);

}

示例代碼:

public class CyclicBarrierTest {

?????????? staticCyclicBarrier c = new CyclicBarrier(2);

? ? ? ? ? ?public static void main(String[] args) {

?????????????????????? newThread(new Runnable() {

? ? ? ? ? ? ? ? ? ? ? ?@Override

?????????????????????? publicvoid run() {

?????????????????????? ??????? try {

?????????????????????? ????????????? c.await();

?????????????????????? ??????? } catch (Exception e) {

???????????????????? ?????????}

?????????????????????? System.out.println(1);

?????????? ????????}

????????? }).start();


?????????? try {

??? ??????????????????c.await();

?????????? ?} catch (Exception e) {

? ? ? ? ? ? ? }

????????????? System.out.println(2);

???????? }

}

輸出

1 | 2

2 | 1

或者輸出

1 | 1

2 | 2


如果把new CyclicBarrier(2)修改成new CyclicBarrier(3)則主線程和子線程會永遠等待,因為沒有第三個線程執(zhí)行await方法致开,即沒有第三個線程到達屏障峰锁,所以之前到達屏障的兩個線程都不會繼續(xù)執(zhí)行。

CyclicBarrier還提供一個更高級的構(gòu)造函數(shù)CyclicBarrier(int parties, Runnable barrierAction)双戳,用于在線程到達屏障時虹蒋,優(yōu)先執(zhí)行barrierAction,方便處理更復雜的業(yè)務場景飒货。代碼如下:

public class CyclicBarrierTest2 {

?????????? staticCyclicBarrier c = new CyclicBarrier(2, new A());


?????????? public static void main(String[] args) {

?????????????????????? newThread(new Runnable() {

????????????????????????? ????@Override

?????????????????????? ???????? public void run() {

????????????????????????????????????????????? try{

?????????????????????? ??????????????????????????? c.await();

????????????????????????????????????????????? }catch (Exception e) {

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? }

????????????????????????????????????????????? System.out.println(1);

?????????????????????????????}

?????????????????????? }).start();


?????????????????????? try {

?????????????????????? ??????? c.await();

?????????????????????? }catch (Exception e) {

? ? ? ? ? ? ? ? ? ? ? }

?????????????????????System.out.println(2);

????????? }


??????????? static class A implements Runnable {

?????????????????????? @Override

?????????????????????? public void run() {

???????????????????????????????System.out.println(3);

?????????????????????? }

??????????? }

}

輸出

1 | 3

2 | 1

3 | 2


比較:

??? 1)CountDownLatch是把主干線程掛起魄衅,在任務線程中進行倒數(shù)計數(shù),直到任務線程執(zhí)行完才喚醒主干線程繼續(xù)執(zhí)行塘辅;

?????? CyclicBarrier是把任務線程掛起晃虫,直到所有任務線程執(zhí)行到屏障處再放行繼續(xù)執(zhí)行;

??? 2)CountDownLatch達到屏障放行標準后放行的是主干線程扣墩;

?????? CyclicBarrier達到屏障放行標準后放行的是任務線程哲银,并且還會額外地觸發(fā)一個達到標準后執(zhí)行的響應線程;


3呻惕、Semaphore

Semaphore(信號量)是用來控制同時訪問特定資源的線程數(shù)量盘榨,它通過協(xié)調(diào)各個線程,以保證合理的使用公共資源蟆融。

很多年以來草巡,我都覺得從字面上很難理解Semaphore所表達的含義,只能把它比作是控制流量的紅綠燈,比如XX馬路要限制流量山憨,只允許同時有一百輛車在這條路上行使查乒,其他的都必須在路口等待,所以前一百輛車會看到綠燈郁竟,可以開進這條馬路玛迄,后面的車會看到紅燈,不能駛?cè)隭X馬路棚亩,但是如果前一百輛中有五輛車已經(jīng)離開了XX馬路蓖议,那么后面就允許有5輛車駛?cè)腭R路,這個例子里說的車就是線程讥蟆,駛?cè)腭R路就表示線程在執(zhí)行勒虾,離開馬路就表示線程執(zhí)行完成,看見紅燈就表示線程被阻塞瘸彤,不能執(zhí)行修然。


下面說一下Semaphore類中比較重要的幾個方法,首先是acquire()质况、release()方法:

public void acquire() throws InterruptedException {? }????//獲取一個許可

public void acquire(int permits) throws InterruptedException {}??? //獲取permits個許可

public void release() { }?????????//釋放一個許可

public void release(int permits) { }??? //釋放permits個許可

acquire()用來獲取一個許可愕宋,若無許可能夠獲得,則會一直等待结榄,直到獲得許可中贝。

release()用來釋放許可。注意臼朗,在釋放許可之前邻寿,必須先獲獲得許可。

這4個方法都會被阻塞依溯,如果想立即得到執(zhí)行結(jié)果,可以使用下面幾個方法:

public boolean tryAcquire() { };???//嘗試獲取一個許可瘟则,若獲取成功黎炉,則立即返回true,若獲取失敗醋拧,則立即返回false

public boolean tryAcquire(long timeout, TimeUnit unit) throwsInterruptedException { };? //嘗試獲取一個許可慷嗜,若在指定的時間內(nèi)獲取成功,則立即返回true丹壕,否則則立即返回false

public boolean tryAcquire(int permits) { }; //嘗試獲取permits個許可庆械,若獲取成功,則立即返回true菌赖,若獲取失敗缭乘,則立即返回false

public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }; //嘗試獲取permits個許可,若在指定的時間內(nèi)獲取成功琉用,則立即返回true堕绩,否則則立即返回false

Semaphore可以用于做流量控制策幼,特別公用資源有限的應用場景,比如數(shù)據(jù)庫連接奴紧。假如有一個需求特姐,要讀取幾萬個文件的數(shù)據(jù),因為都是IO密集型任務黍氮,我們可以啟動幾十個線程并發(fā)的讀取唐含,但是如果讀到內(nèi)存后,還需要存儲到數(shù)據(jù)庫中沫浆,而數(shù)據(jù)庫的連接數(shù)只有10個捷枯,這時我們必須控制只有十個線程同時獲取數(shù)據(jù)庫連接保存數(shù)據(jù),否則會報錯無法獲取數(shù)據(jù)庫連接件缸。這個時候铜靶,我們就可以使用Semaphore來做流控,代碼如下:

public class SemaphoreTest {

????????????? private staticfinal int THREAD_COUNT = 30;

? ? ? ? ? ? ? private staticExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);

? ? ? ? ? ? ? private staticSemaphore s = new Semaphore(10);


????????????? public static void main(String[] args) {

????????????? ?????????? for (int i = 0; i < THREAD_COUNT;i++) {

???????????????????????????????????? threadPool.execute(new Runnable() {

???????????????????????????????????? ??????????? @Override

???????????????????????????????????? ??????????? public void run() {

??????????????????????????????????????????????????????????? try{

??????????????????????????????????????????????????????????? ?????????? s.acquire();??????? ?????????? ???????

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?System.out.println("save data");

??????????????????????????????????????????????????????????? ?????????? s.release();

??????????????????????????????????????????????????????????? }

????????????????????????????????????????????????????????catch (InterruptedException e) {

?????????????????????????????????????????????????????????}

???????????????????????????????????? ???????????? }

???????????????????????????????????? ?});

????????????? ????????? }

????????????? ???????? threadPool.shutdown();

????????????? }

}

在代碼中他炊,雖然有30個線程在執(zhí)行争剿,但是只允許10個并發(fā)的執(zhí)行。Semaphore的構(gòu)造方法Semaphore(int permits) 接受一個整型的數(shù)字痊末,表示可用的許可證數(shù)量蚕苇。Semaphore(10)表示允許10個線程獲取許可證,也就是最大并發(fā)數(shù)是10凿叠。Semaphore的用法也很簡單涩笤,首先線程使用Semaphore的acquire()獲取一個許可證,使用完之后調(diào)用release()歸還許可證盒件。還可以用tryAcquire()方法嘗試獲取許可證蹬碧。

public class SemaphoreExample{

??? public static void main(String[] args) {

??????? final int clientCount = 3;

??????? final int totalRequestCount = 10;

??????? Semaphoresemaphore= newSemaphore(clientCount);

??????? ExecutorServiceexecutorService=Executors.newCachedThreadPool();

??????? for (int i = 0; i < totalRequestCount; i++) {

??????????? executorService.execute(()->{

??????????????? try{

??????????????????? semaphore.acquire();

??????????????????? System.out.print(semaphore.availablePermits() + " ");

??????????????? }catch(InterruptedExceptione) {

??????????????????? e.printStackTrace();

??????????????? }finally{

??????????????????? semaphore.release();

??????????????? }

??????????? });

??????? }

??????? executorService.shutdown();

??? }

}

2 1 2 2 2 2 2 1 2 2


下面對上面說的三個輔助類進行一個總結(jié):

1)CountDownLatch和CyclicBarrier都能夠?qū)崿F(xiàn)線程之間的等待,只不過它們側(cè)重點不同:

?  CountDownLatch一般用于某個線程A等待若干個其他線程執(zhí)行完任務之后炒刁,它才執(zhí)行恩沽;

? 而CyclicBarrier一般用于一組線程互相等待至某個狀態(tài),然后這一組線程再同時執(zhí)行翔始;

  另外罗心,CountDownLatch是不能夠重用的,而CyclicBarrier是可以重用的城瞎。

2)Semaphore其實和鎖有點類似渤闷,它一般用于控制對某組資源的訪問權(quán)限。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末脖镀,一起剝皮案震驚了整個濱河市飒箭,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖补憾,帶你破解...
    沈念sama閱讀 211,348評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件漫萄,死亡現(xiàn)場離奇詭異,居然都是意外死亡盈匾,警方通過查閱死者的電腦和手機腾务,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,122評論 2 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來削饵,“玉大人岩瘦,你說我怎么就攤上這事×耍” “怎么了启昧?”我有些...
    開封第一講書人閱讀 156,936評論 0 347
  • 文/不壞的土叔 我叫張陵,是天一觀的道長劈伴。 經(jīng)常有香客問我密末,道長,這世上最難降的妖魔是什么跛璧? 我笑而不...
    開封第一講書人閱讀 56,427評論 1 283
  • 正文 為了忘掉前任严里,我火速辦了婚禮,結(jié)果婚禮上追城,老公的妹妹穿的比我還像新娘刹碾。我一直安慰自己,他們只是感情好座柱,可當我...
    茶點故事閱讀 65,467評論 6 385
  • 文/花漫 我一把揭開白布迷帜。 她就那樣靜靜地躺著,像睡著了一般色洞。 火紅的嫁衣襯著肌膚如雪戏锹。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,785評論 1 290
  • 那天火诸,我揣著相機與錄音锦针,去河邊找鬼。 笑死惭蹂,一個胖子當著我的面吹牛伞插,可吹牛的內(nèi)容都是我干的割粮。 我是一名探鬼主播盾碗,決...
    沈念sama閱讀 38,931評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼舀瓢!你這毒婦竟也來了廷雅?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,696評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎航缀,沒想到半個月后商架,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,141評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡芥玉,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,483評論 2 327
  • 正文 我和宋清朗相戀三年蛇摸,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片灿巧。...
    茶點故事閱讀 38,625評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡赶袄,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出抠藕,到底是詐尸還是另有隱情饿肺,我是刑警寧澤,帶...
    沈念sama閱讀 34,291評論 4 329
  • 正文 年R本政府宣布盾似,位于F島的核電站敬辣,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏零院。R本人自食惡果不足惜溉跃,卻給世界環(huán)境...
    茶點故事閱讀 39,892評論 3 312
  • 文/蒙蒙 一泌参、第九天 我趴在偏房一處隱蔽的房頂上張望踱承。 院中可真熱鬧浓恶,春花似錦迄薄、人聲如沸嘲更。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,741評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽织阅。三九已至拟蜻,卻和暖如春绎签,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背酝锅。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評論 1 265
  • 我被黑心中介騙來泰國打工诡必, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人搔扁。 一個月前我還...
    沈念sama閱讀 46,324評論 2 360
  • 正文 我出身青樓爸舒,卻偏偏與公主長得像,于是被迫代替她去往敵國和親稿蹲。 傳聞我的和親對象是個殘疾皇子扭勉,可洞房花燭夜當晚...
    茶點故事閱讀 43,492評論 2 348

推薦閱讀更多精彩內(nèi)容