轉(zhuǎn)《Java并發(fā)編程的藝術-第8章》
1.等待多線程完成的CountDownLatch
JDk1.5提供了一個非常有用的包,Concurrent包倒淫,這個包主要用來操作一些并發(fā)操作伙菊,提供一些并發(fā)類,可以方便在項目當中傻瓜式應用敌土。
JDK1.5以前镜硕,使用并發(fā)操作,都是通過Thread返干,Runnable來操作多線程兴枯;但是在JDK1.5之后,提供了非常方便的線程池(ThreadExecutorPool)矩欠,主要代碼由大牛Doug Lea完成财剖,其實是在jdk1.4時代,由于java語言內(nèi)置對多線程編程的支持比較基礎和有限癌淮,所以他寫了這個躺坟,因為實在太過于優(yōu)秀,所以被加入到jdk之中乳蓄;
這次主要對CountDownLatch進行系統(tǒng)的講解
使用場景:比如對于馬拉松比賽咪橙,進行排名計算,參賽者的排名虚倒,肯定是跑完比賽之后美侦,進行計算得出的,翻譯成Java識別的預發(fā)魂奥,就是N個線程執(zhí)行操作菠剩,主線程等到N個子線程執(zhí)行完畢之后,在繼續(xù)往下執(zhí)行捧弃。
代碼示例
public static void testCountDownLatch(){
int threadCount = 10;
final CountDownLatch latch = new CountDownLatch(threadCount);
for(int i=0; i< threadCount; i++){
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("線程" + Thread.currentThread().getId() + "開始出發(fā)");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("線程" + Thread.currentThread().getId() + "已到達終點");
latch.countDown();
}
}).start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("10個線程已經(jīng)執(zhí)行完畢赠叼!開始計算排名");
}
執(zhí)行結果:
線程10開始出發(fā)
線程13開始出發(fā)
線程12開始出發(fā)
線程11開始出發(fā)
線程14開始出發(fā)
線程15開始出發(fā)
線程16開始出發(fā)
線程17開始出發(fā)
線程18開始出發(fā)
線程19開始出發(fā)
線程14已到達終點
線程15已到達終點
線程13已到達終點
線程12已到達終點
線程10已到達終點
線程11已到達終點
線程16已到達終點
線程17已到達終點
線程18已到達終點
線程19已到達終點
10個線程已經(jīng)執(zhí)行完畢擦囊!開始計算排名
主要方法:
- void await() //當前線程等待計數(shù)器為0
- boolean await(long timeout, TimeUnit unit) //與上面的方法不同违霞,它加了一個時間限制嘴办。
- void countDown() //計數(shù)器減1
- long getCount() //獲取計數(shù)器的值
實現(xiàn)方式:
它的內(nèi)部有一個輔助的內(nèi)部類:Sync(繼承至AQS)
await() 方法的實現(xiàn):
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg); //加入到等待隊列中
}
countDown() 方法的實現(xiàn):
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared(); //解鎖
return true;
}
return false;
}
2.同步屏障CyclicBarrier
CyclicBarrier的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。它要做的事情是买鸽,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞涧郊,直到最后一個線程到達屏障時,屏障才會開門眼五,所有被屏障攔截的線程才會繼續(xù)運行妆艘。
CyclicBarrier簡介
CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數(shù)表示屏障攔截的線程數(shù)量看幼,每個線程調(diào)用await方法告訴CyclicBarrier我已經(jīng)到達了屏障批旺,然后當前線程被阻塞。示例代碼如下所示诵姜。
public class CyclicBarrierTest {
static CyclicBarrier c = new CyclicBarrier(2);
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(1);
}
}).start();
try {
c.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(2);
}
}
因為主線程和子線程的調(diào)度是由CPU決定的汽煮,兩個線程都有可能先執(zhí)行,所以會產(chǎn)生兩種輸出棚唆,第一種可能輸出如下暇赤。
1
2
第二種可能輸出如下。
2
1
如果把new CyclicBarrier(2)修改成new CyclicBarrier(3)宵凌,則主線程和子線程會永遠等待鞋囊,因為沒有第三個線程執(zhí)行await方法,即沒有第三個線程到達屏障瞎惫,所以之前到達屏障的兩個線程都不會繼續(xù)執(zhí)行溜腐。
CyclicBarrier還提供一個更高級的構造函數(shù)CyclicBarrier(int parties,Runnable barrier-Action)瓜喇,用于在線程到達屏障時逗扒,優(yōu)先執(zhí)行barrierAction,方便處理更復雜的業(yè)務場景
public class CyclicBarrierTest2 {
static CyclicBarrier c = new CyclicBarrier(2,new A());
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (Exception e) {
}
System.out.println(1);
}
}).start();
try {
c.await();
} catch (Exception e) {
}
System.out.println(2);
}
static class A implements Runnable {
@Override
public void run() {
System.out.println(3);
}
}
}
因為CyclicBarrier設置了攔截線程的數(shù)量是2欠橘,所以必須等代碼中的第一個線程和線程A都執(zhí)行完之后矩肩,才會繼續(xù)執(zhí)行主線程,然后輸出2肃续,所以代碼執(zhí)行后的輸出如下黍檩。
3
1
2
CyclicBarrier可以用于多線程計算數(shù)據(jù),最后合并計算結果的場景始锚。例如刽酱,用一個Excel保存了用戶所有銀行流水,每個Sheet保存一個賬戶近一年的每筆銀行流水瞧捌,現(xiàn)在需要統(tǒng)計用戶的日均銀行流水棵里,先用多線程處理每個sheet里的銀行流水润文,都執(zhí)行完之后,得到每個sheet的日均銀行流水殿怜,最后典蝌,再用barrierAction用這些線程的計算結果,計算出整個Excel的日均銀行流水头谜,如代碼清單8-5所示骏掀。
public class BankWaterService implements Runnable{
/**
* 創(chuàng)建4個屏障,處理完之后執(zhí)行當前類的run方法
*/
private CyclicBarrier c = new CyclicBarrier(4, this);
/**
* 假設只有4個sheet柱告,所以只啟動4個線程
*/
private Executor executor = Executors.newFixedThreadPool(4);
/**
* 保存每個sheet計算出的銀流結果
*/
private ConcurrentHashMap<String, Integer>sheetBankWaterCount = new ConcurrentHashMap<String, Integer>();
private void count(){
for (int i = 0; i < 4; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
// 計算當前sheet的銀流數(shù)據(jù)截驮,計算代碼省略
sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
// 銀流計算完成,插入一個屏障
try {
c.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
}
@Override
public void run() {
int result = 0;
for (Map.Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) {
result += sheet.getValue();
}
sheetBankWaterCount.put("result", result);
System.out.println(result);
}
public static void main(String[] args) {
BankWaterService bankWaterService = new BankWaterService();
bankWaterService.count();
}
}
使用線程池創(chuàng)建4個線程际度,分別計算每個sheet里的數(shù)據(jù)葵袭,每個sheet計算結果是1,再由BankWaterService線程匯總4個sheet計算出的結果乖菱,輸出結果如下坡锡。
4
3.CyclicBarrier和CountDownLatch的區(qū)別
應用場景區(qū)別
CountDownLatch : 一個線程(或者多個), 等待另外N個線程完成某個事情之后才能執(zhí)行块请。
CyclicBarrier : N個線程相互等待娜氏,任何一個線程完成之前,所有的線程都必須等待墩新。
這樣應該就清楚一點了贸弥,對于CountDownLatch來說,重點是那個“一個線程”, 是它在等待海渊, 而另外那N的線程在把“某個事情”做完之后可以繼續(xù)等待绵疲,可以終止。
而對于CyclicBarrier來說臣疑,重點是那N個線程盔憨,他們之間任何一個沒有完成,所有的線程都必須等待讯沈。
CountDownLatch 是計數(shù)器, 線程完成一個就記一個, 就像 報數(shù)一樣, 只不過是遞減的.
而CyclicBarrier更像一個水閘, 線程執(zhí)行就想水流, 在水閘處都會堵住, 等到水滿(線程到齊)了, 才開始泄流.
使用區(qū)別
CountDownLatch的計數(shù)器只能使用一次郁岩,而CyclicBarrier的計數(shù)器可以使用reset()方法重置。所以CyclicBarrier能處理更為復雜的業(yè)務場景缺狠。例如问慎,如果計算發(fā)生錯誤,可以重置計數(shù)器挤茄,并讓線程重新執(zhí)行一次如叼。
CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得Cyclic-Barrier阻塞的線程數(shù)量穷劈。isBroken()方法用來了解阻塞的線程是否被中斷
4.控制并發(fā)線程數(shù)的Semaphore
Semaphore(信號量)是用來控制同時訪問特定資源的線程數(shù)量笼恰,它通過協(xié)調(diào)各個線程踊沸,以保證合理的使用公共資源。
應用場景
Semaphore可以用于做流量控制社证,特別是公用資源有限的應用場景逼龟,比如數(shù)據(jù)庫連接。假如有一個需求猴仑,要讀取幾萬個文件的數(shù)據(jù)审轮,因為都是IO密集型任務肥哎,我們可以啟動幾十個線程并發(fā)地讀取辽俗,但是如果讀到內(nèi)存后,還需要存儲到數(shù)據(jù)庫中篡诽,而數(shù)據(jù)庫的連接數(shù)只有10個崖飘,這時我們必須控制只有10個線程同時獲取數(shù)據(jù)庫連接保存數(shù)據(jù),否則會報錯無法獲取數(shù)據(jù)庫連接杈女。這個時候朱浴,就可以使用Semaphore來做流量控制
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore s = new Semaphore(10);
public static void main(String[] args) {
for (int i = 0; i <THREAD_COUNT ; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println("save data");
s.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
threadPool.shutdown();
}
}
在代碼中,雖然有30個線程在執(zhí)行达椰,但是只允許10個并發(fā)執(zhí)行翰蠢。Semaphore的構造方法Semaphore(int permits)接受一個整型的數(shù)字,表示可用的許可證數(shù)量啰劲。Semaphore(10)表示允許10個線程獲取許可證梁沧,也就是最大并發(fā)數(shù)是10。Semaphore的用法也很簡單蝇裤,首先線程使用Semaphore的acquire()方法獲取一個許可證廷支,使用完之后調(diào)用release()方法歸還許可證。還可以用tryAcquire()方法嘗試獲取許可證栓辜。
Semaphore還提供一些其他方法恋拍,具體如下。
- intavailablePermits():返回此信號量中當前可用的許可證數(shù)藕甩。
- intgetQueueLength():返回正在等待獲取許可證的線程數(shù)施敢。
- booleanhasQueuedThreads():是否有線程正在等待獲取許可證。
- void reducePermits(int reduction):減少reduction個許可證狭莱,是個protected方法僵娃。
- Collection getQueuedThreads():返回所有等待獲取許可證的線程集合,是個protected方法贩毕。