深入理解線程通信
開發(fā)中不免會遇到需要所有子線程執(zhí)行完畢通知主線程處理某些邏輯的場景悦析。
或者是線程 A 在執(zhí)行到某個條件通知線程 B 執(zhí)行某個操作膨处。
可以通過以下幾種方式實現(xiàn):
等待通知模式是 Java 中比較經(jīng)典的線程通信方式冕屯。
兩個線程通過對同一對象調用等待 wait() 和通知 notify() 方法來進行通訊。
如兩個線程交替打印奇偶數(shù):
public class TwoThreadWaitNotify { private int start = 1; private boolean flag = false; public static void main(String[] args) { TwoThreadWaitNotify twoThread = new TwoThreadWaitNotify(); Thread t1 = new Thread(new OuNum(twoThread)); t1.setName("A"); Thread t2 = new Thread(new JiNum(twoThread)); t2.setName("B"); t1.start(); t2.start(); } /** * 偶數(shù)線程 */ public static class OuNum implements Runnable { private TwoThreadWaitNotify number; public OuNum(TwoThreadWaitNotify number) { this.number = number; } @Override public void run() { while (number.start <= 100) { synchronized (TwoThreadWaitNotify.class) { System.out.println("偶數(shù)線程搶到鎖了"); if (number.flag) { System.out.println(Thread.currentThread().getName() + "+-+偶數(shù)" + number.start); number.start++; number.flag = false; TwoThreadWaitNotify.class.notify(); }else { try { TwoThreadWaitNotify.class.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } } /** * 奇數(shù)線程 */ public static class JiNum implements Runnable { private TwoThreadWaitNotify number; public JiNum(TwoThreadWaitNotify number) { this.number = number; } @Override public void run() { while (number.start <= 100) { synchronized (TwoThreadWaitNotify.class) { System.out.println("奇數(shù)線程搶到鎖了"); if (!number.flag) { System.out.println(Thread.currentThread().getName() + "+-+奇數(shù)" + number.start); number.start++; number.flag = true; TwoThreadWaitNotify.class.notify(); }else { try { TwoThreadWaitNotify.class.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } } }
輸出結果:
t2+-+奇數(shù)93
t1+-+偶數(shù)94
t2+-+奇數(shù)95
t1+-+偶數(shù)96
t2+-+奇數(shù)97
t1+-+偶數(shù)98
t2+-+奇數(shù)99
t1+-+偶數(shù)100
這里的線程 A 和線程 B 都對同一個對象?TwoThreadWaitNotify.class?獲取鎖进肯,A 線程調用了同步對象的 wait() 方法釋放了鎖并進入?WAITING?狀態(tài)艇肴。
B 線程調用了 notify() 方法腔呜,這樣 A 線程收到通知之后就可以從 wait() 方法中返回。
這里利用了?TwoThreadWaitNotify.class?對象完成了通信再悼。
有一些需要注意:
wait() 育谬、notify()、notifyAll() 調用的前提都是獲得了對象的鎖(也可稱為對象監(jiān)視器)帮哈。
調用 wait() 方法后線程會釋放鎖膛檀,進入?WAITING?狀態(tài),該線程也會被移動到等待隊列中娘侍。
調用 notify() 方法會將等待隊列中的線程移動到同步隊列中咖刃,線程狀態(tài)也會更新為?BLOCKED
從 wait() 方法返回的前提是調用 notify() 方法的線程釋放鎖,wait() 方法的線程獲得鎖憾筏。
等待通知有著一個經(jīng)典范式:
線程 A 作為消費者:
獲取對象的鎖嚎杨。
進入 while(判斷條件),并調用 wait() 方法氧腰。
當條件滿足跳出循環(huán)執(zhí)行具體處理邏輯枫浙。
線程 B 作為生產者:
獲取對象鎖刨肃。
更改與線程 A 共用的判斷條件。
調用 notify() 方法箩帚。
偽代碼如下:
//Thread A
synchronized(Object){
? ? while(條件){
? ? ? ? Object.wait();
? ? }
? ? //do something
}
//Thread B
synchronized(Object){
? ? 條件=false;//改變條件
? ? Object.notify();
}
privatestaticvoidjoin() throwsInterruptedException{Threadt1=newThread(newRunnable() {@Overridepublicvoidrun() {LOGGER.info("running");try{Thread.sleep(3000);? ? ? ? ? ? ? ? }catch(InterruptedExceptione) {? ? ? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? }) ;Threadt2=newThread(newRunnable() {@Overridepublicvoidrun() {LOGGER.info("running2");try{Thread.sleep(4000);? ? ? ? ? ? ? ? }catch(InterruptedExceptione) {? ? ? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? }) ;? ? ? ? t1.start();? ? ? ? t2.start();//等待線程1終止t1.join();//等待線程2終止t2.join();LOGGER.info("main over");? ? }
輸出結果:
2018-03-16 20:21:30.967 [Thread-1] INFO? c.c.actual.ThreadCommunication - running2
2018-03-16 20:21:30.967 [Thread-0] INFO? c.c.actual.ThreadCommunication - running
2018-03-16 20:21:34.972 [main] INFO? c.c.actual.ThreadCommunication - main over
在?t1.join()?時會一直阻塞到 t1 執(zhí)行完畢真友,所以最終主線程會等待 t1 和 t2 線程執(zhí)行完畢。
其實從源碼可以看出紧帕,join() 也是利用的等待通知機制:
核心邏輯:
while(isAlive()) {? ? ? ? wait(0);? ? }
在 join 線程完成后會調用 notifyAll() 方法盔然,是在 JVM 實現(xiàn)中調用,所以這里看不出來是嗜。
因為 Java 是采用共享內存的方式進行線程通信的愈案,所以可以采用以下方式用主線程關閉 A 線程:
publicclassVolatileimplementsRunnable{privatestaticvolatilebooleanflag=true;@Overridepublicvoidrun() {while(flag){System.out.println(Thread.currentThread().getName()+"正在運行。鹅搪。站绪。");? ? ? ? }System.out.println(Thread.currentThread().getName()+"執(zhí)行完畢");? ? }publicstaticvoidmain(String[]args)throwsInterruptedException{VolatileaVolatile=newVolatile();newThread(aVolatile,"thread A").start();System.out.println("main 線程正在運行") ;TimeUnit.MILLISECONDS.sleep(100) ;? ? ? ? aVolatile.stopThread();? ? }privatevoidstopThread(){? ? ? ? flag=false;? ? }}
輸出結果:
thread A正在運行。丽柿。恢准。
thread A正在運行。航厚。顷歌。
thread A正在運行。幔睬。眯漩。
thread A正在運行。麻顶。赦抖。
thread A執(zhí)行完畢
這里的 flag 存放于主內存中,所以主線程和線程 A 都可以看到辅肾。
flag 采用 volatile 修飾主要是為了內存可見性队萤,更多內容可以查看這里。
CountDownLatch 可以實現(xiàn) join 相同的功能矫钓,但是更加的靈活要尔。
privatestaticvoidcountDownLatch() throwsException{intthread=3;longstart=System.currentTimeMillis();finalCountDownLatchcountDown=newCountDownLatch(thread);for(inti=0;i<thread ; i++){newThread(newRunnable() {@Overridepublicvoidrun() {LOGGER.info("thread run");try{Thread.sleep(2000);? ? ? ? ? ? ? ? ? ? ? ? countDown.countDown();LOGGER.info("thread end");? ? ? ? ? ? ? ? ? ? }catch(InterruptedExceptione) {? ? ? ? ? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }? ? ? ? ? ? }).start();? ? ? ? }? ? ? ? countDown.await();longstop=System.currentTimeMillis();LOGGER.info("main over total time={}",stop-start);? ? }
輸出結果:
2018-03-16 20:19:44.126 [Thread-0] INFO? c.c.actual.ThreadCommunication - thread run
2018-03-16 20:19:44.126 [Thread-2] INFO? c.c.actual.ThreadCommunication - thread run
2018-03-16 20:19:44.126 [Thread-1] INFO? c.c.actual.ThreadCommunication - thread run
2018-03-16 20:19:46.136 [Thread-2] INFO? c.c.actual.ThreadCommunication - thread end
2018-03-16 20:19:46.136 [Thread-1] INFO? c.c.actual.ThreadCommunication - thread end
2018-03-16 20:19:46.136 [Thread-0] INFO? c.c.actual.ThreadCommunication - thread end
2018-03-16 20:19:46.136 [main] INFO? c.c.actual.ThreadCommunication - main over total time=2012
CountDownLatch 也是基于 AQS(AbstractQueuedSynchronizer) 實現(xiàn)的,更多實現(xiàn)參考?ReentrantLock 實現(xiàn)原理
初始化一個 CountDownLatch 時告訴并發(fā)的線程新娜,然后在每個線程處理完畢之后調用 countDown() 方法赵辕。
該方法會將 AQS 內置的一個 state 狀態(tài) -1 。
最終在主線程調用 await() 方法概龄,它會阻塞直到?state == 0?的時候返回还惠。
privatestaticvoidcyclicBarrier() throwsException{CyclicBarriercyclicBarrier=newCyclicBarrier(3) ;newThread(newRunnable() {@Overridepublicvoidrun() {LOGGER.info("thread run");try{? ? ? ? ? ? ? ? ? ? cyclicBarrier.await() ;? ? ? ? ? ? ? ? }catch(Exceptione) {? ? ? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? ? ? }LOGGER.info("thread end do something");? ? ? ? ? ? }? ? ? ? }).start();newThread(newRunnable() {@Overridepublicvoidrun() {LOGGER.info("thread run");try{? ? ? ? ? ? ? ? ? ? cyclicBarrier.await() ;? ? ? ? ? ? ? ? }catch(Exceptione) {? ? ? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? ? ? }LOGGER.info("thread end do something");? ? ? ? ? ? }? ? ? ? }).start();newThread(newRunnable() {@Overridepublicvoidrun() {LOGGER.info("thread run");try{Thread.sleep(5000);? ? ? ? ? ? ? ? ? ? cyclicBarrier.await() ;? ? ? ? ? ? ? ? }catch(Exceptione) {? ? ? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? ? ? }LOGGER.info("thread end do something");? ? ? ? ? ? }? ? ? ? }).start();LOGGER.info("main thread");? ? }
CyclicBarrier 中文名叫做屏障或者是柵欄,也可以用于線程間通信私杜。
它可以等待 N 個線程都達到某個狀態(tài)后繼續(xù)運行的效果蚕键。
首先初始化線程參與者救欧。
調用?await()?將會在所有參與者線程都調用之前等待。
直到所有參與者都調用了?await()?后锣光,所有線程從?await()?返回繼續(xù)后續(xù)邏輯笆怠。
運行結果:
2018-03-18 22:40:00.731 [Thread-0] INFO? c.c.actual.ThreadCommunication - thread run
2018-03-18 22:40:00.731 [Thread-1] INFO? c.c.actual.ThreadCommunication - thread run
2018-03-18 22:40:00.731 [Thread-2] INFO? c.c.actual.ThreadCommunication - thread run
2018-03-18 22:40:00.731 [main] INFO? c.c.actual.ThreadCommunication - main thread
2018-03-18 22:40:05.741 [Thread-0] INFO? c.c.actual.ThreadCommunication - thread end do something
2018-03-18 22:40:05.741 [Thread-1] INFO? c.c.actual.ThreadCommunication - thread end do something
2018-03-18 22:40:05.741 [Thread-2] INFO? c.c.actual.ThreadCommunication - thread end do something
可以看出由于其中一個線程休眠了五秒,所有其余所有的線程都得等待這個線程調用?await()?嫉晶。
該工具可以實現(xiàn) CountDownLatch 同樣的功能骑疆,但是要更加靈活田篇。甚至可以調用?reset()?方法重置 CyclicBarrier (需要自行捕獲 BrokenBarrierException 處理) 然后重新執(zhí)行替废。
publicclassStopThreadimplementsRunnable{@Overridepublicvoidrun() {while(!Thread.currentThread().isInterrupted()) {//線程執(zhí)行具體邏輯System.out.println(Thread.currentThread().getName()+"運行中。泊柬。");? ? ? ? }System.out.println(Thread.currentThread().getName()+"退出椎镣。。");? ? }publicstaticvoidmain(String[]args)throwsInterruptedException{Threadthread=newThread(newStopThread(),"thread A");? ? ? ? thread.start();System.out.println("main 線程正在運行") ;TimeUnit.MILLISECONDS.sleep(10) ;? ? ? ? thread.interrupt();? ? }}
輸出結果:
thread A運行中兽赁。状答。
thread A運行中。刀崖。
thread A退出惊科。。
可以采用中斷線程的方式來通信亮钦,調用了?thread.interrupt()?方法其實就是將 thread 中的一個標志屬性置為了 true馆截。
并不是說調用了該方法就可以中斷線程,如果不對這個標志進行響應其實是沒有什么作用(這里對這個標志進行了判斷)蜂莉。
但是如果拋出了 InterruptedException 異常蜡娶,該標志就會被 JVM 重置為 false。
如果是用線程池來管理線程映穗,可以使用以下方式來讓主線程等待線程池中所有任務執(zhí)行完畢:
privatestaticvoidexecutorService() throwsException{BlockingQueuequeue=newLinkedBlockingQueue<>(10) ;ThreadPoolExecutorpoolExecutor=newThreadPoolExecutor(5,5,1,TimeUnit.MILLISECONDS,queue) ;? ? ? ? poolExecutor.execute(newRunnable() {@Overridepublicvoidrun() {LOGGER.info("running");try{Thread.sleep(3000);? ? ? ? ? ? ? ? }catch(InterruptedExceptione) {? ? ? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? });? ? ? ? poolExecutor.execute(newRunnable() {@Overridepublicvoidrun() {LOGGER.info("running2");try{Thread.sleep(2000);? ? ? ? ? ? ? ? }catch(InterruptedExceptione) {? ? ? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? });? ? ? ? poolExecutor.shutdown();while(!poolExecutor.awaitTermination(1,TimeUnit.SECONDS)){LOGGER.info("線程還在執(zhí)行窖张。。蚁滋。");? ? ? ? }LOGGER.info("main over");? ? }
輸出結果:
2018-03-16 20:18:01.273 [pool-1-thread-2] INFO? c.c.actual.ThreadCommunication - running2
2018-03-16 20:18:01.273 [pool-1-thread-1] INFO? c.c.actual.ThreadCommunication - running
2018-03-16 20:18:02.273 [main] INFO? c.c.actual.ThreadCommunication - 線程還在執(zhí)行宿接。。辕录。
2018-03-16 20:18:03.278 [main] INFO? c.c.actual.ThreadCommunication - 線程還在執(zhí)行睦霎。。踏拜。
2018-03-16 20:18:04.278 [main] INFO? c.c.actual.ThreadCommunication - main over
使用這個?awaitTermination()?方法的前提需要關閉線程池碎赢,如調用了?shutdown()?方法。
調用了?shutdown()?之后線程池會停止接受新任務速梗,并且會平滑的關閉線程池中現(xiàn)有的任務肮塞。
publicstaticvoidpiped() throwsIOException{//面向于字符 PipedInputStream 面向于字節(jié)PipedWriterwriter=newPipedWriter();PipedReaderreader=newPipedReader();//輸入輸出流建立連接writer.connect(reader);Threadt1=newThread(newRunnable() {@Overridepublicvoidrun() {LOGGER.info("running");try{for(inti=0; i<10; i++) {? ? ? ? ? ? ? ? ? ? ? ? writer.write(i+"");Thread.sleep(10);? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }catch(Exceptione) {? ? ? ? ? ? ? ? }finally{try{? ? ? ? ? ? ? ? ? ? ? ? writer.close();? ? ? ? ? ? ? ? ? ? }catch(IOExceptione) {? ? ? ? ? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? });Threadt2=newThread(newRunnable() {@Overridepublicvoidrun() {LOGGER.info("running2");intmsg=0;try{while((msg=reader.read())!=-1) {LOGGER.info("msg={}", (char) msg);? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }catch(Exceptione) {? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? });? ? ? ? t1.start();? ? ? ? t2.start();? ? }
輸出結果:
2018-03-16 19:56:43.014 [Thread-0] INFO? c.c.actual.ThreadCommunication - running
2018-03-16 19:56:43.014 [Thread-1] INFO? c.c.actual.ThreadCommunication - running2
2018-03-16 19:56:43.130 [Thread-1] INFO? c.c.actual.ThreadCommunication - msg=0
2018-03-16 19:56:43.132 [Thread-1] INFO? c.c.actual.ThreadCommunication - msg=1
2018-03-16 19:56:43.132 [Thread-1] INFO? c.c.actual.ThreadCommunication - msg=2
2018-03-16 19:56:43.133 [Thread-1] INFO? c.c.actual.ThreadCommunication - msg=3
2018-03-16 19:56:43.133 [Thread-1] INFO? c.c.actual.ThreadCommunication - msg=4
2018-03-16 19:56:43.133 [Thread-1] INFO? c.c.actual.ThreadCommunication - msg=5
2018-03-16 19:56:43.133 [Thread-1] INFO? c.c.actual.ThreadCommunication - msg=6
2018-03-16 19:56:43.134 [Thread-1] INFO? c.c.actual.ThreadCommunication - msg=7
2018-03-16 19:56:43.134 [Thread-1] INFO? c.c.actual.ThreadCommunication - msg=8
2018-03-16 19:56:43.134 [Thread-1] INFO? c.c.actual.ThreadCommunication - msg=9
Java 雖說是基于內存通信的襟齿,但也可以使用管道通信。
需要注意的是枕赵,輸入流和輸出流需要首先建立連接猜欺。這樣線程 B 就可以收到線程 A 發(fā)出的消息了。
實際開發(fā)中可以靈活根據(jù)需求選擇最適合的線程通信方式拷窜。