在JDK的并發(fā)包里提供了幾個非常有用的并發(fā)工具類。CountDownLatch壹无、CyclicBarrier和Semaphore工具類提供了一種并發(fā)流程控制的手段葱绒,Exchanger工具類則提供了在線程間交換數(shù)據(jù)的一種手段。
等待多線程完成的CountDownLatch(閉鎖)
假如有這樣一個需求:我們需要解析一個Excel里多個sheet的數(shù)據(jù)斗锭,此時可以考慮使用多線程地淀,每個線程解析一個sheet里的數(shù)據(jù),等到所有的sheet都解析完之后岖是,程序需要提示解析完成帮毁。在這個需求中实苞,要實現(xiàn)主線程等待所有線程完成sheet的解析操作,最簡單的做法是使用join()方法:
public class JoinCountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
Thread parser1 = new Thread(new Runnable() {
@Override
public void run() {
}
});
Thread parser2 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("parser2 finish");
}
});
parser1.start();
parser2.start();
parser1.join();
parser2.join();
System.out.println("all parser finish");
}
}
join用于讓當前執(zhí)行線程等待join線程執(zhí)行結束烈疚。其實現(xiàn)原理是不停檢查join線程是否存活黔牵,如果join線程存活則讓當前線程永遠等待。直到join線程中止后爷肝,線程的this.notifyAll()方法會被調(diào)用猾浦,調(diào)用notifyAll()方法是在JVM里實現(xiàn)的,所以在JDK里看不到灯抛,大家可以查看JVM源碼金赦。
在JDK 1.5之后的并發(fā)包中提供的CountDownLatch也可以實現(xiàn)join的功能,并且比join的功能更多:
public class CountDownLatchTest {
staticCountDownLatch c = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(1);
c.countDown();
System.out.println(2);
c.countDown();
}
}).start();
c.await();
System.out.println("3");
}
}
CountDownLatch的構造函數(shù)接收一個int類型的參數(shù)作為計數(shù)器对嚼,如果你想等待N個點完成夹抗,這里就傳入N。
當我們調(diào)用CountDownLatch的countDown方法時猪半,N就會減1兔朦,CountDownLatch的await方法會阻塞當前線程,直到N變成零磨确。由于countDown方法可以用在任何地方沽甥,所以這里說的N個點,可以是N個線程乏奥,也可以是1個線程里的N個執(zhí)行步驟摆舟。用在多個線程時,只需要把這個CountDownLatch的引用傳遞到線程里即可邓了。
如果有某個解析sheet的線程處理得比較慢恨诱,我們不可能讓主線程一直等待,所以可以使用另外一個帶指定時間的await方法——await(long time骗炉,TimeUnit unit)照宝,這個方法等待特定時間后,就會不再阻塞當前線程句葵。join也有類似的方法厕鹃。
注意:計數(shù)器必須大于等于0,只是等于0時候乍丈,計數(shù)器就是零剂碴,調(diào)用await方法時不會阻塞當前線程。CountDownLatch不可能重新初始化或者修改CountDownLatch對象的內(nèi)部計數(shù)器的值轻专。一個線程調(diào)用countDown方法happens-before另外一個線程調(diào)用await方法忆矛。
補充博客:CountDownLatch的簡單理解
同步屏障CyclicBarrier(柵欄)
CyclicBarrier的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。它要做的事情是请垛,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞催训,直到最后一個線程到達屏障時洽议,屏障才會開門,所有被屏障攔截的線程才會繼續(xù)運行漫拭。
CyclicBarrier簡介
CyclicBarrier默認的構造方法是CyclicBarrier(int parties)绞铃,其參數(shù)表示屏障攔截的線程數(shù)量捌年,每個線程調(diào)用await方法告訴CyclicBarrier我已經(jīng)到達了屏障显蝌,然后當前線程被阻塞。示例代碼如下:
因為主線程和子線程的調(diào)度是由CPU決定的挑宠,兩個線程都有可能先執(zhí)行,所以會產(chǎn)生兩種輸出:
或
如果把new CyclicBarrier(2)修改成new CyclicBarrier(3)颓影,則主線程和子線程會永遠等待各淀,因為沒有第三個線程執(zhí)行await方法,即沒有第三個線程到達屏障诡挂,所以之前到達屏障的兩個線程都不會繼續(xù)執(zhí)行碎浇。
CyclicBarrier還提供一個更高級的構造函數(shù)CyclicBarrier(int parties,Runnable barrierAction)璃俗,barrierAction將在最后一個到達屏障的線程被優(yōu)先執(zhí)行奴璃,并且執(zhí)行完了之后該線程會繼續(xù)從自己當初await的地方再次開始執(zhí)行。
示例見:CyclicBarrier(int parties, Runnable barrierAction) 詳解
CyclicBarrier的應用場景
CyclicBarrier可以用于多線程計算數(shù)據(jù)城豁,最后合并計算結果的場景苟穆。例如,用一個Excel保存了用戶所有銀行流水唱星,每個Sheet保存一個賬戶近一年的每筆銀行流水雳旅,現(xiàn)在需要統(tǒng)計用戶的日均銀行流水,先用多線程處理每個sheet里的銀行流水间聊,都執(zhí)行完之后攒盈,得到每個sheet的日均銀行流水,最后哎榴,再用barrierAction用這些線程的計算結果型豁,計算出整個Excel的日均銀行流水,如下所示:
使用線程池創(chuàng)建4個線程叹话,分別計算每個sheet里的數(shù)據(jù)偷遗,每個sheet計算結果是1,再由BankWaterService線程匯總4個sheet計算出的結果驼壶,輸出結果如下:
CyclicBarrier和CountDownLatch的區(qū)別
1氏豌、CountDownLatch用于等待事件發(fā)生, CyclicBarrier用于等待線程到來热凹。
2泵喘、CountDownLatch的計數(shù)器只能使用一次泪电,而CyclicBarrier的計數(shù)器可以使用reset()方法重置。所以CyclicBarrier能處理更為復雜的業(yè)務場景纪铺。例如相速,如果計算發(fā)生錯誤,可以重置計數(shù)器鲜锚,并讓線程重新執(zhí)行一次突诬。
3、CyclicBarrier還提供其他有用的方法芜繁,比如getNumberWaiting方法可以獲得CyclicBarrier阻塞的線程數(shù)量旺隙。isBroken()方法用來了解阻塞的線程是否被中斷。
使用isBroken()方法例子如下:
結果:
控制并發(fā)線程數(shù)的Semaphore(信號量)
應用場景
Semaphore可以用于做流量控制骏令,特別是公用資源有限的應用場景蔬捷,比如數(shù)據(jù)庫連接。假如有一個需求榔袋,要讀取幾萬個文件的數(shù)據(jù)周拐,因為都是IO密集型任務,我們可以啟動幾十個線程并發(fā)地讀取凰兑,但是如果讀到內(nèi)存后妥粟,還需要存儲到數(shù)據(jù)庫中,而數(shù)據(jù)庫的連接數(shù)只有10個聪黎,這時我們必須控制只有10個線程同時獲取數(shù)據(jù)庫連接保存數(shù)據(jù)罕容,否則會報錯無法獲取數(shù)據(jù)庫連接。這個時候稿饰,就可以使用Semaphore來做流量控制:
在代碼中锦秒,雖然有30個線程在執(zhí)行,但是只允許10個并發(fā)執(zhí)行喉镰。Semaphore的構造方法Semaphore(int permits)接受一個整型的數(shù)字旅择,表示可用的許可證數(shù)量。Semaphore(10)表示允許10個線程獲取許可證侣姆,也就是最大并發(fā)數(shù)是10生真。Semaphore的用法也很簡單,首先線程使用Semaphore的acquire()方法獲取一個許可證捺宗,使用完之后調(diào)用release()方法歸還許可證柱蟀。還可以用tryAcquire()方法嘗試獲取許可證。
補充博客:Java 并發(fā)專題:Semaphore實現(xiàn)互斥與連接池
其他方法
Semaphore還提供一些其他方法蚜厉,具體如下长已。
- int availablePermits():返回此信號量中當前可用的許可證數(shù)。
- int getQueueLength():返回正在等待獲取許可證的線程數(shù)。
- boolean hasQueuedThreads():是否有線程正在等待獲取許可證术瓮。
- void reducePermits(int reduction):減少reduction個許可證康聂,是個protected方法。
- Collection getQueuedThreads():返回所有等待獲取許可證的線程集合胞四,是個protected方法恬汁。
線程間交換數(shù)據(jù)的Exchanger
Exchanger(交換者)是一個用于線程間協(xié)作的工具類。Exchanger用于進行線程間的數(shù)據(jù)交換辜伟。它提供一個同步點氓侧,在這個同步點,兩個線程可以交換彼此的數(shù)據(jù)导狡。這兩個線程通過exchange方法交換數(shù)據(jù)甘苍,如果第一個線程先執(zhí)行exchange()方法,它會一直等待第二個線程也執(zhí)行exchange方法烘豌,當兩個線程都到達同步點時,這兩個線程就可以交換數(shù)據(jù)看彼,將本線程生產(chǎn)出來的數(shù)據(jù)傳遞給對方廊佩。
Exchanger可以用于遺傳算法,遺傳算法里需要選出兩個人作為交配對象靖榕,這時候會交換兩人的數(shù)據(jù)标锄,并使用交叉規(guī)則得出2個交配結果。代碼如下:
package com.thread;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExchangerTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final Exchanger exchanger = new Exchanger();
service.execute(new Runnable(){
public void run() {
try {
Thread.sleep((long)(Math.random()*10000));
String data1 = "a";
System.out.println("線程" + Thread.currentThread().getName() +
"正在把數(shù)據(jù)" + data1 +"換出去");
String data2 = (String)exchanger.exchange(data1);
System.out.println("線程" + Thread.currentThread().getName() +
"換回的數(shù)據(jù)為" + data2);
}catch(Exception e){
}
}
});
service.execute(new Runnable(){
public void run() {
try {
Thread.sleep((long)(Math.random()*10000));
String data1 = "b";
System.out.println("線程" + Thread.currentThread().getName() +
"正在把數(shù)據(jù)" + data1 +"換出去");
String data2 = (String)exchanger.exchange(data1);
System.out.println("線程" + Thread.currentThread().getName() +
"換回的數(shù)據(jù)為" + data2);
}catch(Exception e){
}
}
});
}
}
運行結果:
線程pool-1-thread-1正在把數(shù)據(jù)a換出去
線程pool-1-thread-2正在把數(shù)據(jù)b換出去
線程pool-1-thread-1換回的數(shù)據(jù)為b
線程pool-1-thread-2換回的數(shù)據(jù)為a
Exchanger也可以用于校對工作茁计,比如我們需要將紙制銀行流水通過人工的方式錄入成電子銀行流水料皇,為了避免錯誤,采用AB崗兩人進行錄入星压,錄入到Excel之后践剂,系統(tǒng)需要加載這兩個Excel,并對兩個Excel數(shù)據(jù)進行校對娜膘,看看是否錄入一致逊脯。