java多線程編程中為了滿足一些應(yīng)用場景往往需要對線程進(jìn)行調(diào)度,jdk提供了多種調(diào)度方法邓深,接下來我們一一來舉例說明浓冒。
- 信號量同步 主要是為了限流控制線程的并發(fā)數(shù),這時我們可以采用這種方法所森,信號量簡單理解為小區(qū)入口的閘門囱持,刷卡一次獲取一個信號,當(dāng)然也可以一次占用多個信號焕济,總之信號的總量不變纷妆,占用多了能通過的線程就少了,具體我們可以從代碼中來進(jìn)行分析
public class SemaphoreDemo {
private Semaphore semaphore = new Semaphore(10);
public void printNum(int i){
try {
semaphore.acquire(1);
System.out.println("queue = " + semaphore.getQueueLength());
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName()+" semaphore = " + i);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release(1);
}
}
public static void main(String[] args){
final SemaphoreDemo demo = new SemaphoreDemo();
for(int i =0; i<100; i++){
final int finalI = i;
new Thread(new Runnable() {
@Override
public void run() {
demo.printNum(finalI);
}
}).start();
}
}
}
我們初始化了一個總量為10的信號量池晴弃,每一個線程在執(zhí)行的時候都會獲取一個信號 掩幢,其中semaphore.acquire(1); 方法就是獲取信號,當(dāng)然也可以一次獲取多個上鞠。拿到信號量后际邻,線程開始執(zhí)行自身的業(yè)務(wù)邏輯,當(dāng)任務(wù)執(zhí)行完成或者發(fā)生異常后我們需要釋放信號量芍阎,不然會一直占用信號量世曾,semaphore.acquire方法是阻塞執(zhí)行,如果沒有獲取到信號量會一直阻塞代碼的執(zhí)行直到獲取信號量成功谴咸,如果我們需要讓代碼異步執(zhí)行可以嘗試semaphore.tryAcquire轮听,如果沒有獲取到信號量則會返回false告訴調(diào)用者,這樣我們可以根據(jù)結(jié)果處理對應(yīng)的邏輯岭佳,當(dāng)然也可以設(shè)置一個超時時間血巍,如果在指定的時間內(nèi)沒有獲取到信號量則返回false。
- CountDownLatch 讓指定的線程全部都執(zhí)行完成后在接著做后續(xù)的邏輯珊随,可以簡單理解為藻茂,運(yùn)動會的時候賽場上裁判必須讓運(yùn)動員就位以后才開始進(jìn)行比賽,CountDownLatch就是那個場上的裁判。我們來看看代碼實(shí)現(xiàn)
public class CountDownLatchDemo {
public void race(CountDownLatch latch){
try {
System.out.println(Thread.currentThread().getName()+ " is here");
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName()+ " is done");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
}
}
public static void main(String[] args){
final CountDownLatch latch = new CountDownLatch(4);
final CountDownLatchDemo demo = new CountDownLatchDemo();
for(int i=0;i<4;i++){
new Thread(new Runnable() {
@Override
public void run() {
demo.race(latch);
}
}).start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("all is done");
}
}
Thread-0 is here
Thread-1 is here
Thread-2 is here
Thread-3 is here
Thread-1 is done
Thread-3 is done
Thread-2 is done
Thread-0 is done
all is done
通過運(yùn)行結(jié)果我們可以看到辨赐,4個線程各自等待3s后在執(zhí)行最后的代碼优俘,每個線程在啟動的時候會進(jìn)行計數(shù),然后latch.await方法就一直處于等待狀態(tài)掀序,只有計數(shù)器為0時才會執(zhí)行之后的的代碼帆焕,latch.await方法還可以設(shè)置超時時間,超時后會自動執(zhí)行后續(xù)的代碼不恭。
- CyclicBarrier 過程同步叶雹,簡單理解就是每個線程階段完成各自任務(wù)后等待其他線程執(zhí)行完成,然后在執(zhí)行下一階段的任務(wù)换吧,完成后在等待其他線程折晦,如此循環(huán)往復(fù),就好比賽場上四名運(yùn)動員跑步沾瓦,哪個人領(lǐng)先來就停下來等其他的運(yùn)動員趕上來满着,最后一起沖刺終點(diǎn)。我們看看在代碼中是如何實(shí)現(xiàn)的
public class CyclicBarrierDemo {
private Random random = new Random();
public void race(CyclicBarrier barrier){
try {
Thread.sleep(random.nextInt(5)*1000);
System.out.println(Thread.currentThread().getName()+"到達(dá)地點(diǎn)1");
barrier.await();
Thread.sleep(random.nextInt(5)*1000);
System.out.println(Thread.currentThread().getName()+"到達(dá)地點(diǎn)2");
barrier.await();
Thread.sleep(random.nextInt(5)*1000);
System.out.println(Thread.currentThread().getName()+"到達(dá)地點(diǎn)3");
barrier.await();
Thread.sleep(random.nextInt(5)*1000);
System.out.println(Thread.currentThread().getName()+"到達(dá)地點(diǎn)4");
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
public static void main(String[] args){
final CyclicBarrierDemo demo = new CyclicBarrierDemo();
final CyclicBarrier barrier = new CyclicBarrier(4);
for(int i=0;i<4;i++){
new Thread(new Runnable() {
@Override
public void run() {
demo.race(barrier);
}
}).start();
}
}
}
pool-1-thread-2到達(dá)地點(diǎn)1
pool-1-thread-1到達(dá)地點(diǎn)1
pool-1-thread-3到達(dá)地點(diǎn)1
pool-1-thread-4到達(dá)地點(diǎn)1
pool-1-thread-1到達(dá)地點(diǎn)2
pool-1-thread-4到達(dá)地點(diǎn)2
pool-1-thread-3到達(dá)地點(diǎn)2
pool-1-thread-2到達(dá)地點(diǎn)2
pool-1-thread-3到達(dá)地點(diǎn)3
pool-1-thread-1到達(dá)地點(diǎn)3
pool-1-thread-2到達(dá)地點(diǎn)3
pool-1-thread-4到達(dá)地點(diǎn)3
pool-1-thread-2到達(dá)地點(diǎn)4
pool-1-thread-3到達(dá)地點(diǎn)4
pool-1-thread-1到達(dá)地點(diǎn)4
pool-1-thread-4到達(dá)地點(diǎn)4
通過設(shè)置屏障點(diǎn)贯莺,我們可以看到四個線程都到達(dá)屏障點(diǎn)后才執(zhí)行后續(xù)的邏輯风喇,哪一個線程到達(dá)屏障點(diǎn)后就自動等待,一直到所有的線程都到達(dá)后方才開始執(zhí)行缕探。
- Phaser是一個更強(qiáng)大的魂莫、更復(fù)雜的同步輔助類,可以代替CyclicBarrier CountDownLatch的功能爹耗,但是比他們更強(qiáng)大耙考。
Phaser類機(jī)制是在每一步結(jié)束的位置對線程進(jìn)行同步,當(dāng)所有的線程都完成了這一步潭兽,才能進(jìn)行下一步琳骡。
當(dāng)我們有并發(fā)任務(wù)并且需要分解成幾步執(zhí)行的時候,這種機(jī)制就非常適合讼溺。
CyclicBarrier CountDownLatch 只能在構(gòu)造時指定參與量楣号,而phaser可以動態(tài)的增減參與量。
phaser 使用說明:
使用phaser.arriveAndAwaitAdvance(); //等待參與者達(dá)到指定數(shù)量怒坯,才開始運(yùn)行下面的代碼
使用phaser.arriveAndDeregister(); //注銷當(dāng)前線程炫狱,該線程就不會進(jìn)入休眠狀態(tài),也會從phaser的數(shù)量中減少
模擬代替CountDownLatch功能剔猿,只需要當(dāng)前線程arriveAndAwaitAdvance()之后運(yùn)行需要的代碼之后视译,就arriveAndDeregister()取消當(dāng)前線程的注冊。
phaser有一個重大特性归敬,就是不必對它的方法進(jìn)行異常處理酷含。置于休眠的線程不會響應(yīng)中斷事件鄙早,不會拋出interruptedException異常, 只有一個方法會響應(yīng):AwaitAdvanceInterruptibly(int phaser).
其他api
arrive():這個方法通知phase對象一個參與者已經(jīng)完成了當(dāng)前階段椅亚,但是它不應(yīng)該等待其他參與者都完成當(dāng)前階段限番,必須小心使用這個方法,因?yàn)樗粫c其他線程同步呀舔。
awaitAdvance(int phase):如果傳入的階段參數(shù)與當(dāng)前階段一致弥虐,這個方法會將當(dāng)前線程至于休眠,直到這個階段的所有參與者都運(yùn)行完成媚赖。如果傳入的階段參數(shù)與當(dāng)前階段不一致霜瘪,這個方法立即返回。
awaitAdvanceInterruptibly(int phaser):這個方法跟awaitAdvance(int phase)一樣惧磺,不同處是:該訪問將會響應(yīng)線程中斷颖对。會拋出interruptedException異常
將參與者注冊到phaser中:
register():將一個新的參與者注冊到phase中,這個新的參與者將被當(dāng)成沒有執(zhí)完本階段的線程磨隘。
bulkRegister(int parties):將指定數(shù)目的參與者注冊到phaser中缤底,所有這些新的參與者都將被當(dāng)成沒有執(zhí)行完本階段的線程。
減少參與者
只提供了一個方法來減少參與者:arriveAndDeregister():告知phaser對應(yīng)的線程已經(jīng)完成了當(dāng)前階段琳拭,并它不會參與到下一階段的操作中训堆。
強(qiáng)制終止
當(dāng)一個phaser么有參與者的時候描验,它就處于終止?fàn)顟B(tài)白嘁,使用forceTermination()方法來強(qiáng)制phaser進(jìn)入終止?fàn)顟B(tài),不管是否存在未注冊的參與線程膘流,當(dāng)一個線程出現(xiàn)錯誤時絮缅,強(qiáng)制終止phaser是很有意義的。
當(dāng)phaser處于終止?fàn)顟B(tài)的時候呼股,arriveAndAwaitAdvance() 和 awaitAdvance() 立即返回一個負(fù)數(shù)耕魄,而不再是一個正值了,如果知道phaser可能會被終止彭谁,就需要驗(yàn)證這些方法的值吸奴,以確定phaser是不是被終止了。
被終止的phaser不會保證參與者的同步
public class PhaserDemo {
private Random random = new Random();
public void race(Phaser phaser){
try {
Thread.sleep(random.nextInt(5)* 1000);
System.out.println(Thread.currentThread().getName()+ "aaaaa");
phaser.arriveAndAwaitAdvance();
Thread.sleep(random.nextInt(5)* 1000);
System.out.println(Thread.currentThread().getName()+ "bbbbb");
phaser.arriveAndAwaitAdvance();
Thread.sleep(random.nextInt(5)* 1000);
System.out.println(Thread.currentThread().getName()+ "cccc");
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args){
final PhaserDemo demo = new PhaserDemo();
ExecutorService pool = Executors.newCachedThreadPool();
final Phaser phaser = new Phaser(3);
for(int i=0;i<3;i++){
pool.submit(new Runnable() {
@Override
public void run() {
demo.race(phaser);
}
});
}
pool.shutdown();
}
}