1、reactor(反應(yīng)器)模式
使用單線程模擬多線程秩彤,提高資源利用率和程序的效率叔扼,增加系統(tǒng)吞吐量。下面例子比較形象的說明了什么是反應(yīng)器模式:
一個(gè)老板經(jīng)營(yíng)一個(gè)飯店漫雷,
傳統(tǒng)模式 - 來一個(gè)客人安排一個(gè)服務(wù)員招呼瓜富,客人很滿意;(相當(dāng)于一個(gè)連接一個(gè)線程)
后來客人越來越多降盹,需要的服務(wù)員越來越多与柑,資源條件不足以再請(qǐng)更多的服務(wù)員了,傳統(tǒng)模式已經(jīng)不能滿足需求蓄坏。老板之所以為老板自然有過人之處价捧,老板發(fā)現(xiàn),服務(wù)員在為客人服務(wù)時(shí)剑辫,當(dāng)客人點(diǎn)菜的時(shí)候干旧,服務(wù)員基本處于等待狀態(tài),(阻塞線程妹蔽,不做事)。
于是乎就讓服務(wù)員在客人點(diǎn)菜的時(shí)候挠将,去為其他客人服務(wù)胳岂,當(dāng)客人菜點(diǎn)好后再招呼服務(wù)員即可。 --反應(yīng)器(reactor)模式誕生了
飯店的生意紅紅火火舔稀,幾個(gè)服務(wù)員就足以支撐大量的客流量乳丰,老板用有限的資源賺了更多的money~~~~^_^
通道:類似于流,但是可以異步讀寫數(shù)據(jù)(流只能同步讀寫)内贮,通道是雙向的产园,(流是單向的),通道的數(shù)據(jù)總是要先讀到一個(gè)buffer 或者 從一個(gè)buffer寫入夜郁,即通道與buffer進(jìn)行數(shù)據(jù)交互什燕。
通道類型:
FileChannel:從文件中讀寫數(shù)據(jù)。
DatagramChannel:能通過UDP讀寫網(wǎng)絡(luò)中的數(shù)據(jù)竞端。
SocketChannel:能通過TCP讀寫網(wǎng)絡(luò)中的數(shù)據(jù)屎即。
ServerSocketChannel:可以監(jiān)聽新進(jìn)來的TCP連接,像Web服務(wù)器那樣。對(duì)每一個(gè)新進(jìn)來的連接都會(huì)創(chuàng)建一個(gè)SocketChannel技俐。
- FileChannel比較特殊乘陪,它可以與通道進(jìn)行數(shù)據(jù)交互, 不能切換到非阻塞模式雕擂,套接字通道可以切換到非阻塞模式啡邑;
緩沖區(qū)?- 本質(zhì)上是一塊可以存儲(chǔ)數(shù)據(jù)的內(nèi)存,被封裝成了buffer對(duì)象而已井赌!
緩沖區(qū)類型:
ByteBuffer
MappedByteBuffer
CharBuffer
DoubleBuffer
FloatBuffer
IntBuffer
LongBuffer
ShortBuffer
常用方法:
allocate() - 分配一塊緩沖區(qū)
put() - 向緩沖區(qū)寫數(shù)據(jù)
get() - 向緩沖區(qū)讀數(shù)據(jù)
filp() - 將緩沖區(qū)從寫模式切換到讀模式
clear() - 從讀模式切換到寫模式谣拣,不會(huì)清空數(shù)據(jù),但后續(xù)寫數(shù)據(jù)會(huì)覆蓋原來的數(shù)據(jù)族展,即使有部分?jǐn)?shù)據(jù)沒有讀森缠,也會(huì)被遺忘;
compact() - 從讀數(shù)據(jù)切換到寫模式仪缸,數(shù)據(jù)不會(huì)被清空贵涵,會(huì)將所有未讀的數(shù)據(jù)copy到緩沖區(qū)頭部,后續(xù)寫數(shù)據(jù)不會(huì)覆蓋恰画,而是在這些數(shù)據(jù)之后寫數(shù)據(jù)
mark() - 對(duì)position做出標(biāo)記宾茂,配合reset使用
reset() - 將position置為標(biāo)記值
緩沖區(qū)的一些屬性:
capacity - 緩沖區(qū)大小,無論是讀模式還是寫模式拴还,此屬性值不會(huì)變跨晴;
position - 寫數(shù)據(jù)時(shí),position表示當(dāng)前寫的位置片林,每寫一個(gè)數(shù)據(jù)端盆,會(huì)向下移動(dòng)一個(gè)數(shù)據(jù)單元,初始為0费封;最大為capacity - 1焕妙,切換到讀模式時(shí),position會(huì)被置為0弓摘,表示當(dāng)前讀的位置
limit - 寫模式下焚鹊,limit 相當(dāng)于capacity 表示最多可以寫多少數(shù)據(jù),切換到讀模式時(shí)韧献,limit 等于原先的position末患,表示最多可以讀多少數(shù)據(jù)。
非直接緩沖區(qū):通過allocate() 方法 分配緩沖區(qū),將緩沖區(qū)建立在JVM內(nèi)存中
直接緩沖區(qū):通過allocateDirect() 方法直接緩沖區(qū) 將緩沖區(qū)建立在物理內(nèi)存中
2.1 關(guān)于緩沖區(qū)各個(gè)屬性的測(cè)試
String str ="abcde";//1. 分配一個(gè)指定大小的緩沖區(qū)ByteBuffer buf = ByteBuffer.allocate(1024);? ? ? ? ? ? ? ? System.out.println("--------------allocate()----------------");? ? ? ? System.out.println(buf.position());//0System.out.println(buf.limit());//1024System.out.println(buf.capacity());//1024//2. 利用put存入數(shù)據(jù)到緩沖區(qū)中去buf.put(str.getBytes());? ? ? ? ? ? ? ? System.out.println("----------------put()-------------------");? ? ? ? System.out.println(buf.position());//5System.out.println(buf.limit());//1024System.out.println(buf.capacity());//1024//3. 切換到讀取模式buf.flip();? ? ? ? ? ? ? ? System.out.println("----------------flip()------------------");? ? ? ? System.out.println(buf.position());//0System.out.println(buf.limit());//5System.out.println(buf.capacity());//1024//4. 利用get() 讀取緩沖區(qū)中的數(shù)據(jù)byte[] dst =newbyte[buf.limit()];? ? ? ? buf.get(dst);? ? ? ? System.out.println(newString(dst,0,dst.length));? ? ? ? ? ? ? ? System.out.println("----------------get()------------------");? ? ? ? System.out.println(buf.position());//5System.out.println(buf.limit());//5System.out.println(buf.capacity());//1024//5.可重復(fù)讀buf.rewind();? ? ? ? ? ? ? ? System.out.println("----------------rewind()------------------");? ? ? ? System.out.println(buf.position());//0System.out.println(buf.limit());//5System.out.println(buf.capacity());//1024//6.clear(): 清空緩沖區(qū), 但是緩沖區(qū)的數(shù)據(jù)依然存在, 但是處于被遺忘的狀態(tài)buf.clear();? ? ? ? ? ? ? ? System.out.println("----------------clear()-------------------");? ? ? ? System.out.println(buf.position());//0System.out.println(buf.limit());//1024System.out.println(buf.capacity());//1024byte[] newByte =newbyte[buf.limit()];? ? ? ? buf.get(newByte);? ? ? ? System.out.println(newString(newByte,0,newByte.length));
2.2 關(guān)于通道的使用
1.利用通道進(jìn)行 文件的復(fù)制 非直接緩沖區(qū)
FileInputStream fis =null;? ? ? ? FileOutputStream fos =null;? ? ? ? FileChannel inChannel =null;? ? ? ? FileChannel outChannel =null;try{? ? ? ? ? ? fis =newFileInputStream("1.jpg");? ? ? ? ? ? fos =newFileOutputStream("2.jpg");// ①獲取通道inChannel = fis.getChannel();? ? ? ? ? ? outChannel = fos.getChannel();// ②將通道中的數(shù)據(jù)存入緩沖區(qū)ByteBuffer byteBuffer = ByteBuffer.allocate(1024);// 將通道中的數(shù)據(jù)存入緩沖區(qū)while(inChannel.read(byteBuffer) != -1) {? ? ? ? ? ? ? ? byteBuffer.flip();// 切換讀取數(shù)據(jù)的模式outChannel.write(byteBuffer);? ? ? ? ? ? ? ? byteBuffer.clear();? ? ? ? ? ? }? ? ? ? }catch(IOException e) {? ? ? ? ? ? e.printStackTrace();? ? ? ? }finally{if(inChannel !=null) {try{? ? ? ? ? ? ? ? ? ? inChannel.close();? ? ? ? ? ? ? ? }catch(IOException e) {? ? ? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? ? ? }? ? ? ? ? ? }if(outChannel !=null) {try{? ? ? ? ? ? ? ? ? ? outChannel.close();? ? ? ? ? ? ? ? }catch(IOException e) {? ? ? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? ? ? }? ? ? ? ? ? }if(fis !=null) {try{? ? ? ? ? ? ? ? ? ? fis.close();? ? ? ? ? ? ? ? }catch(IOException e) {? ? ? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? ? ? }? ? ? ? ? ? }if(fos !=null) {try{? ? ? ? ? ? ? ? ? ? fos.close();? ? ? ? ? ? ? ? }catch(IOException e) {? ? ? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? }
2.通道之間的傳輸
CREATE_NEW:如果文件不存在就創(chuàng)建锤窑,存在就報(bào)錯(cuò)
CREATE:如果文件不存在就創(chuàng)建璧针,存在創(chuàng)建(覆蓋)
FileChannel inChannel =null;? ? ? ? FileChannel outChannel =null;try{? ? ? ? ? ? inChannel = FileChannel.open(Paths.get("hello.txt"), StandardOpenOption.READ);? ? ? ? ? ? outChannel = FileChannel.open(Paths.get("hello2.txt"), StandardOpenOption.READ,StandardOpenOption.WRITE,StandardOpenOption.CREATE_NEW);? ? ? ? ? ? ? ? ? ? ? ? inChannel.transferTo(0, inChannel.size(), outChannel);? ? ? ? }catch(Exception e) {? ? ? ? ? ? e.printStackTrace();? ? ? ? }finally{if(inChannel !=null){try{? ? ? ? ? ? ? ? ? ? inChannel.close();? ? ? ? ? ? ? ? }catch(IOException e) {? ? ? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? ? ? }? ? ? ? ? ? }if(outChannel !=null){try{? ? ? ? ? ? ? ? ? ? outChannel.close();? ? ? ? ? ? ? ? }catch(IOException e) {? ? ? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? }
3. 使用直接緩沖區(qū)完成內(nèi)存文件的復(fù)制
FileChannel inChannel =null;? ? ? ? FileChannel outChannel =null;try{? ? ? ? ? ? inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);? ? ? ? ? ? outChannel = FileChannel.open(Paths.get("x.jpg"), StandardOpenOption.READ,StandardOpenOption.WRITE,StandardOpenOption.CREATE_NEW);? ? ? ? ? ? ? ? ? ? ? ? MappedByteBuffer inMappedBuffer = inChannel.map(MapMode.READ_ONLY,0, inChannel.size());? ? ? ? ? ? MappedByteBuffer outMappedBuffer = outChannel.map(MapMode.READ_WRITE,0, inChannel.size());? ? ? ? ? ? ? ? ? ? ? ? System.out.println(inMappedBuffer.limit());byte[] b =newbyte[inMappedBuffer.limit()];;? ? ? ? ? ? inMappedBuffer.get(b);? ? ? ? ? ? outMappedBuffer.put(b);? ? ? ? ? ? ? ? ? ? }catch(Exception e) {? ? ? ? ? ? e.printStackTrace();? ? ? ? }finally{if(inChannel !=null){try{? ? ? ? ? ? ? ? ? ? inChannel.close();? ? ? ? ? ? ? ? }catch(IOException e) {? ? ? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? ? ? }? ? ? ? ? ? }if(outChannel !=null){try{? ? ? ? ? ? ? ? ? ? outChannel.close();? ? ? ? ? ? ? ? }catch(IOException e) {? ? ? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? }
2.3 重點(diǎn) NIO-非阻塞IO
個(gè)人認(rèn)為 NIO 最難的兩點(diǎn) 一個(gè)是對(duì)于選擇器和選擇鍵的理解 其次是對(duì)于網(wǎng)絡(luò)通信模型的理解
本章內(nèi)容以防過長(zhǎng) 只講解 NIO 的使用方法 上述兩點(diǎn)參看下回分解
阻塞IO示例:
//客戶端@Testpublicvoidclient()throwsIOException{? ? ? ? SocketChannel sChannel = SocketChannel.open(newInetSocketAddress("127.0.0.1",9898));? ? ? ? ? ? ? ? FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);? ? ? ? ? ? ? ? ByteBuffer buf = ByteBuffer.allocate(1024);while(inChannel.read(buf) != -1){? ? ? ? ? ? buf.flip();? ? ? ? ? ? sChannel.write(buf);? ? ? ? ? ? buf.clear();? ? ? ? }? ? ? ? ? ? ? ? sChannel.shutdownOutput();//接收服務(wù)端的反饋intlen =0;while((len = sChannel.read(buf)) != -1){? ? ? ? ? ? buf.flip();? ? ? ? ? ? System.out.println(newString(buf.array(),0, len));? ? ? ? ? ? buf.clear();? ? ? ? }? ? ? ? ? ? ? ? inChannel.close();? ? ? ? sChannel.close();? ? }//服務(wù)端@Testpublicvoidserver()throwsIOException{? ? ? ? ServerSocketChannel ssChannel = ServerSocketChannel.open();? ? ? ? ? ? ? ? FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);? ? ? ? ? ? ? ? ssChannel.bind(newInetSocketAddress(9898));? ? ? ? ? ? ? ? SocketChannel sChannel = ssChannel.accept();? ? ? ? ? ? ? ? ByteBuffer buf = ByteBuffer.allocate(1024);while(sChannel.read(buf) != -1){? ? ? ? ? ? buf.flip();? ? ? ? ? ? outChannel.write(buf);? ? ? ? ? ? buf.clear();? ? ? ? }//發(fā)送反饋給客戶端buf.put("服務(wù)端接收數(shù)據(jù)成功".getBytes());? ? ? ? buf.flip();? ? ? ? sChannel.write(buf);? ? ? ? ? ? ? ? sChannel.close();? ? ? ? outChannel.close();? ? ? ? ssChannel.close();? ? }
非阻塞IO示例-TCP:
//客戶端@Testpublicvoidclient()throwsIOException{//1. 獲取通道SocketChannel sChannel = SocketChannel.open(newInetSocketAddress("127.0.0.1",9898));//2. 切換非阻塞模式sChannel.configureBlocking(false);//3. 分配指定大小的緩沖區(qū)ByteBuffer buf = ByteBuffer.allocate(1024);//4. 發(fā)送數(shù)據(jù)給服務(wù)端Scanner scan =newScanner(System.in);while(scan.hasNext()){? ? ? ? ? ? String str = scan.next();? ? ? ? ? ? buf.put((newDate().toString() +"\n"+ str).getBytes());? ? ? ? ? ? buf.flip();? ? ? ? ? ? sChannel.write(buf);? ? ? ? ? ? buf.clear();? ? ? ? }//5. 關(guān)閉通道sChannel.close();? ? }//服務(wù)端@Testpublicvoidserver()throwsIOException{//1. 獲取通道ServerSocketChannel ssChannel = ServerSocketChannel.open();//2. 切換非阻塞模式ssChannel.configureBlocking(false);//3. 綁定連接ssChannel.bind(newInetSocketAddress(9898));//4. 獲取選擇器Selector selector = Selector.open();//5. 將通道注冊(cè)到選擇器上, 并且指定“監(jiān)聽接收事件”ssChannel.register(selector, SelectionKey.OP_ACCEPT);//6. 輪詢式的獲取選擇器上已經(jīng)“準(zhǔn)備就緒”的事件while(selector.select() >0){//7. 獲取當(dāng)前選擇器中所有注冊(cè)的“選擇鍵(已就緒的監(jiān)聽事件)”Iterator it = selector.selectedKeys().iterator();while(it.hasNext()){//8. 獲取準(zhǔn)備“就緒”的是事件SelectionKey sk = it.next();//9. 判斷具體是什么事件準(zhǔn)備就緒if(sk.isAcceptable()){//10. 若“接收就緒”,獲取客戶端連接SocketChannel sChannel = ssChannel.accept();//11. 切換非阻塞模式sChannel.configureBlocking(false);//12. 將該通道注冊(cè)到選擇器上sChannel.register(selector, SelectionKey.OP_READ);? ? ? ? ? ? ? ? }elseif(sk.isReadable()){//13. 獲取當(dāng)前選擇器上“讀就緒”狀態(tài)的通道SocketChannel sChannel = (SocketChannel) sk.channel();//14. 讀取數(shù)據(jù)ByteBuffer buf = ByteBuffer.allocate(1024);intlen =0;while((len = sChannel.read(buf)) >0){? ? ? ? ? ? ? ? ? ? ? ? buf.flip();? ? ? ? ? ? ? ? ? ? ? ? System.out.println(newString(buf.array(),0, len));? ? ? ? ? ? ? ? ? ? ? ? buf.clear();? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }//15. 取消選擇鍵 SelectionKeyit.remove();? ? ? ? ? ? }? ? ? ? }? ? }
非阻塞IO示例-UDP:
@Testpublicvoidsend()throwsIOException{? ? ? ? DatagramChannel dc = DatagramChannel.open();? ? ? ? ? ? ? ? dc.configureBlocking(false);? ? ? ? ? ? ? ? ByteBuffer buf = ByteBuffer.allocate(1024);? ? ? ? ? ? ? ? Scanner scan =newScanner(System.in);while(scan.hasNext()){? ? ? ? ? ? String str = scan.next();? ? ? ? ? ? buf.put((newDate().toString() +":\n"+ str).getBytes());? ? ? ? ? ? buf.flip();? ? ? ? ? ? dc.send(buf,newInetSocketAddress("127.0.0.1",9898));? ? ? ? ? ? buf.clear();? ? ? ? }? ? ? ? ? ? ? ? dc.close();? ? }@Testpublicvoidreceive()throwsIOException{? ? ? ? DatagramChannel dc = DatagramChannel.open();? ? ? ? ? ? ? ? dc.configureBlocking(false);? ? ? ? ? ? ? ? dc.bind(newInetSocketAddress(9898));? ? ? ? ? ? ? ? Selector selector = Selector.open();? ? ? ? ? ? ? ? dc.register(selector, SelectionKey.OP_READ);while(selector.select() >0){? ? ? ? ? ? Iterator it = selector.selectedKeys().iterator();while(it.hasNext()){? ? ? ? ? ? ? ? SelectionKey sk = it.next();if(sk.isReadable()){? ? ? ? ? ? ? ? ? ? ByteBuffer buf = ByteBuffer.allocate(1024);? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? dc.receive(buf);? ? ? ? ? ? ? ? ? ? buf.flip();? ? ? ? ? ? ? ? ? ? System.out.println(newString(buf.array(),0, buf.limit()));? ? ? ? ? ? ? ? ? ? buf.clear();? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? ? ? it.remove(); ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?歡迎工作一到五年的Java工程師朋友們加入Java群:?891219277
群內(nèi)提供免費(fèi)的Java架構(gòu)學(xué)習(xí)資料(里面有高可用果复、高并發(fā)陈莽、高性能及分布式、Jvm性能調(diào)優(yōu)、Spring源碼走搁,MyBatis独柑,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個(gè)知識(shí)點(diǎn)的架構(gòu)資料)合理利用自己每一分每一秒的時(shí)間來學(xué)習(xí)提升自己,不要再用"沒有時(shí)間“來掩飾自己思想上的懶惰私植!趁年輕忌栅,使勁拼,給未來的自己一個(gè)交代曲稼!