java.util.concurrent(J.U.C)大大提高了并發(fā)性能捐腿,AQS 被認為是 J.U.C 的核心课锌。?????
在多線程程序設計中厨内,經(jīng)常會遇到一個線程等待一個或多個線程的場景,遇到這樣的場景應該如何解決渺贤?
?? 如果是一個線程等待一個線程雏胃,則可以通過await()和notify()來實現(xiàn);
?? 如果是一個線程等待多個線程志鞍,則就可以使用CountDownLatch和CyclicBarrier來實現(xiàn)比較好的控制瞭亮。
CountDownLatch:一個線程(或者多個), 等待另外N個線程完成某個事情之后才能執(zhí)行固棚。?
CyclicBarrier: N個線程相互等待统翩,任何一個線程完成之前仙蚜,所有的線程都必須等待。
這樣應該就清楚一點了厂汗,對于CountDownLatch來說委粉,重點是那個“一個線程”, 是它在等待,而另外那N的線程在把“某個事情”做完之后可以繼續(xù)等待娶桦,可以終止贾节。而對于CyclicBarrier來說,重點是那N個線程衷畦,他們之間任何一個沒有完成栗涂,所有的線程都必須等待。? ?
CountDownLatch是計數(shù)器祈争,線程完成一個就記一個斤程,就像報數(shù)一樣,只不過是遞減的菩混。而CyclicBarrier更像一個水閘暖释,線程執(zhí)行就像水流,在水閘處都會堵住, 等到水滿(線程到齊)了墨吓,才開始泄流。
1纹磺、CountDownLatch
用來控制一個線程等待多個線程帖烘。
維護了一個計數(shù)器 cnt,每次調(diào)用 countDown() 方法會讓計數(shù)器的值減 1橄杨,減到 0 的時候秘症,那些因為調(diào)用 await() 方法而在等待的線程就會被喚醒。??? ??
示例:
public class CountDownLatchTest {
?????? static CountDownLatch latch = new CountDownLatch(2);
? ? ? ?public static void main(String[] args) throws InterruptedException {
?????????????????????? newThread(new Runnable() {
?????????????????????? ??? @Override
?????????????????????? ??? public void run() {
????????????????????????????????????????????? System.out.println(1);
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? latch.countDown();
????????????????????????????????????????????? System.out.println(2);
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? latch.countDown();
?????????????????????? ???? }
?????????????????????? }).start();
? ? ? ? ? //等待到上面兩個操作結(jié)束式矫,latch減少到0時才能通過
? ? ? ? ? latch.await();
????????? System.out.println(3);
???? }
}
1
2
3
public class CountdownLatchExample {
??? public static voidmain(String[] args) throws InterruptedException {
??????? final inttotalThread = 10;
??????? CountDownLatch countDownLatch = new CountDownLatch(totalThread);
??????? ExecutorService executorService = Executors.newCachedThreadPool();
??????? for (int i = 0; i< totalThread; i++) {
???????????executorService.execute(() -> {
???????????????System.out.print("run..");
???????????????countDownLatch.countDown();
????????? ??});
??????? }
???????countDownLatch.await();
???????System.out.println("end");
???????executorService.shutdown();
??? }
}
run..run..run..run..run..run..run..run..run..run..end
CountDownLatch的構(gòu)造函數(shù)接收一個int類型的參數(shù)作為計數(shù)器乡摹,如果你想等待N個點完成,這里就傳入N采转。
當我們調(diào)用一次CountDownLatch的countDown方法時聪廉,N就會減1,CountDownLatch的await會阻塞當前線程故慈,直到N變成零板熊。由于countDown方法可以用在任何地方,所以這里說的N個點察绷,可以是N個線程干签,也可以是1個線程里的N個執(zhí)行步驟。用在多個線程時拆撼,你只需要把這個CountDownLatch的引用傳遞到線程里容劳。
2喘沿、CyclicBarrier
用來控制多個線程互相等待,只有當多個線程都到達時竭贩,這些線程才會繼續(xù)執(zhí)行蚜印。
和 CountdownLatch 相似,都是通過維護計數(shù)器來實現(xiàn)的娶视。線程執(zhí)行 await() 方法之后計數(shù)器會減 1晒哄,并進行等待,直到計數(shù)器為 0肪获,所有調(diào)用 await() 方法而在等待的線程才能繼續(xù)執(zhí)行寝凌。
CyclicBarrier 和 CountdownLatch的一個區(qū)別是,CyclicBarrier 的計數(shù)器通過調(diào)用 reset() 方法可以循環(huán)使用孝赫,所以它才叫做循環(huán)屏障较木。
CyclicBarrier 有兩個構(gòu)造函數(shù),其中 parties 指示計數(shù)器的初始值青柄,barrierAction 在所有線程都到達屏障的時候會執(zhí)行一次伐债。
public CyclicBarrier(int parties, Runnable barrierAction) {
??? if (parties <= 0)throw new IllegalArgumentException();
??? this.parties = parties;
??? this.count = parties;
??? this.barrierCommand =barrierAction;
}
public CyclicBarrier(int parties) {
??? this(parties, null);
}
示例代碼:
public class CyclicBarrierTest {
?????????? staticCyclicBarrier c = new CyclicBarrier(2);
? ? ? ? ? ?public static void main(String[] args) {
?????????????????????? newThread(new Runnable() {
? ? ? ? ? ? ? ? ? ? ? ?@Override
?????????????????????? publicvoid run() {
?????????????????????? ??????? try {
?????????????????????? ????????????? c.await();
?????????????????????? ??????? } catch (Exception e) {
???????????????????? ?????????}
?????????????????????? System.out.println(1);
?????????? ????????}
????????? }).start();
?????????? try {
??? ??????????????????c.await();
?????????? ?} catch (Exception e) {
? ? ? ? ? ? ? }
????????????? System.out.println(2);
???????? }
}
輸出
1 | 2
2 | 1
或者輸出
1 | 1
2 | 2
如果把new CyclicBarrier(2)修改成new CyclicBarrier(3)則主線程和子線程會永遠等待,因為沒有第三個線程執(zhí)行await方法致开,即沒有第三個線程到達屏障峰锁,所以之前到達屏障的兩個線程都不會繼續(xù)執(zhí)行。
CyclicBarrier還提供一個更高級的構(gòu)造函數(shù)CyclicBarrier(int parties, Runnable barrierAction)双戳,用于在線程到達屏障時虹蒋,優(yōu)先執(zhí)行barrierAction,方便處理更復雜的業(yè)務場景飒货。代碼如下:
public class CyclicBarrierTest2 {
?????????? staticCyclicBarrier c = new CyclicBarrier(2, new A());
?????????? public static void main(String[] args) {
?????????????????????? newThread(new Runnable() {
????????????????????????? ????@Override
?????????????????????? ???????? public void run() {
????????????????????????????????????????????? try{
?????????????????????? ??????????????????????????? c.await();
????????????????????????????????????????????? }catch (Exception e) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? }
????????????????????????????????????????????? System.out.println(1);
?????????????????????????????}
?????????????????????? }).start();
?????????????????????? try {
?????????????????????? ??????? c.await();
?????????????????????? }catch (Exception e) {
? ? ? ? ? ? ? ? ? ? ? }
?????????????????????System.out.println(2);
????????? }
??????????? static class A implements Runnable {
?????????????????????? @Override
?????????????????????? public void run() {
???????????????????????????????System.out.println(3);
?????????????????????? }
??????????? }
}
輸出
1 | 3
2 | 1
3 | 2
比較:
??? 1)CountDownLatch是把主干線程掛起魄衅,在任務線程中進行倒數(shù)計數(shù),直到任務線程執(zhí)行完才喚醒主干線程繼續(xù)執(zhí)行塘辅;
?????? CyclicBarrier是把任務線程掛起晃虫,直到所有任務線程執(zhí)行到屏障處再放行繼續(xù)執(zhí)行;
??? 2)CountDownLatch達到屏障放行標準后放行的是主干線程扣墩;
?????? CyclicBarrier達到屏障放行標準后放行的是任務線程哲银,并且還會額外地觸發(fā)一個達到標準后執(zhí)行的響應線程;
3呻惕、Semaphore
Semaphore(信號量)是用來控制同時訪問特定資源的線程數(shù)量盘榨,它通過協(xié)調(diào)各個線程,以保證合理的使用公共資源蟆融。
很多年以來草巡,我都覺得從字面上很難理解Semaphore所表達的含義,只能把它比作是控制流量的紅綠燈,比如XX馬路要限制流量山憨,只允許同時有一百輛車在這條路上行使查乒,其他的都必須在路口等待,所以前一百輛車會看到綠燈郁竟,可以開進這條馬路玛迄,后面的車會看到紅燈,不能駛?cè)隭X馬路棚亩,但是如果前一百輛中有五輛車已經(jīng)離開了XX馬路蓖议,那么后面就允許有5輛車駛?cè)腭R路,這個例子里說的車就是線程讥蟆,駛?cè)腭R路就表示線程在執(zhí)行勒虾,離開馬路就表示線程執(zhí)行完成,看見紅燈就表示線程被阻塞瘸彤,不能執(zhí)行修然。
下面說一下Semaphore類中比較重要的幾個方法,首先是acquire()质况、release()方法:
public void acquire() throws InterruptedException {? }????//獲取一個許可
public void acquire(int permits) throws InterruptedException {}??? //獲取permits個許可
public void release() { }?????????//釋放一個許可
public void release(int permits) { }??? //釋放permits個許可
acquire()用來獲取一個許可愕宋,若無許可能夠獲得,則會一直等待结榄,直到獲得許可中贝。
release()用來釋放許可。注意臼朗,在釋放許可之前邻寿,必須先獲獲得許可。
這4個方法都會被阻塞依溯,如果想立即得到執(zhí)行結(jié)果,可以使用下面幾個方法:
public boolean tryAcquire() { };???//嘗試獲取一個許可瘟则,若獲取成功黎炉,則立即返回true,若獲取失敗醋拧,則立即返回false
public boolean tryAcquire(long timeout, TimeUnit unit) throwsInterruptedException { };? //嘗試獲取一個許可慷嗜,若在指定的時間內(nèi)獲取成功,則立即返回true丹壕,否則則立即返回false
public boolean tryAcquire(int permits) { }; //嘗試獲取permits個許可庆械,若獲取成功,則立即返回true菌赖,若獲取失敗缭乘,則立即返回false
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }; //嘗試獲取permits個許可,若在指定的時間內(nèi)獲取成功琉用,則立即返回true堕绩,否則則立即返回false
Semaphore可以用于做流量控制策幼,特別公用資源有限的應用場景,比如數(shù)據(jù)庫連接奴紧。假如有一個需求特姐,要讀取幾萬個文件的數(shù)據(jù),因為都是IO密集型任務黍氮,我們可以啟動幾十個線程并發(fā)的讀取唐含,但是如果讀到內(nèi)存后,還需要存儲到數(shù)據(jù)庫中沫浆,而數(shù)據(jù)庫的連接數(shù)只有10個捷枯,這時我們必須控制只有十個線程同時獲取數(shù)據(jù)庫連接保存數(shù)據(jù),否則會報錯無法獲取數(shù)據(jù)庫連接件缸。這個時候铜靶,我們就可以使用Semaphore來做流控,代碼如下:
public class SemaphoreTest {
????????????? private staticfinal int THREAD_COUNT = 30;
? ? ? ? ? ? ? private staticExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
? ? ? ? ? ? ? private staticSemaphore s = new Semaphore(10);
????????????? public static void main(String[] args) {
????????????? ?????????? for (int i = 0; i < THREAD_COUNT;i++) {
???????????????????????????????????? threadPool.execute(new Runnable() {
???????????????????????????????????? ??????????? @Override
???????????????????????????????????? ??????????? public void run() {
??????????????????????????????????????????????????????????? try{
??????????????????????????????????????????????????????????? ?????????? s.acquire();??????? ?????????? ???????
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?System.out.println("save data");
??????????????????????????????????????????????????????????? ?????????? s.release();
??????????????????????????????????????????????????????????? }
????????????????????????????????????????????????????????catch (InterruptedException e) {
?????????????????????????????????????????????????????????}
???????????????????????????????????? ???????????? }
???????????????????????????????????? ?});
????????????? ????????? }
????????????? ???????? threadPool.shutdown();
????????????? }
}
在代碼中他炊,雖然有30個線程在執(zhí)行争剿,但是只允許10個并發(fā)的執(zhí)行。Semaphore的構(gòu)造方法Semaphore(int permits) 接受一個整型的數(shù)字痊末,表示可用的許可證數(shù)量蚕苇。Semaphore(10)表示允許10個線程獲取許可證,也就是最大并發(fā)數(shù)是10凿叠。Semaphore的用法也很簡單涩笤,首先線程使用Semaphore的acquire()獲取一個許可證,使用完之后調(diào)用release()歸還許可證盒件。還可以用tryAcquire()方法嘗試獲取許可證蹬碧。
public class SemaphoreExample{
??? public static void main(String[] args) {
??????? final int clientCount = 3;
??????? final int totalRequestCount = 10;
??????? Semaphoresemaphore= newSemaphore(clientCount);
??????? ExecutorServiceexecutorService=Executors.newCachedThreadPool();
??????? for (int i = 0; i < totalRequestCount; i++) {
??????????? executorService.execute(()->{
??????????????? try{
??????????????????? semaphore.acquire();
??????????????????? System.out.print(semaphore.availablePermits() + " ");
??????????????? }catch(InterruptedExceptione) {
??????????????????? e.printStackTrace();
??????????????? }finally{
??????????????????? semaphore.release();
??????????????? }
??????????? });
??????? }
??????? executorService.shutdown();
??? }
}
2 1 2 2 2 2 2 1 2 2
下面對上面說的三個輔助類進行一個總結(jié):
1)CountDownLatch和CyclicBarrier都能夠?qū)崿F(xiàn)線程之間的等待,只不過它們側(cè)重點不同:
? CountDownLatch一般用于某個線程A等待若干個其他線程執(zhí)行完任務之后炒刁,它才執(zhí)行恩沽;
? 而CyclicBarrier一般用于一組線程互相等待至某個狀態(tài),然后這一組線程再同時執(zhí)行翔始;
另外罗心,CountDownLatch是不能夠重用的,而CyclicBarrier是可以重用的城瞎。
2)Semaphore其實和鎖有點類似渤闷,它一般用于控制對某組資源的訪問權(quán)限。