NIO來(lái)源:
傳統(tǒng)的io操作性能低凭舶,從jdk1.4開(kāi)始引入nio概念,?????Nio顧名思義就是Non-Blocking IO,非阻塞型IO操作堕扶,與傳統(tǒng)的java io操作一樣,NIO也提供SocketChannel和ServerSocketChannel兩種不同的套接字通道實(shí)現(xiàn),這兩種都支持阻塞和非阻塞兩種模式。
與IO的區(qū)別:
標(biāo)準(zhǔn)的IO基于字節(jié)流和字符流進(jìn)行操作的否灾,而NIO是基于通道(Channel)和緩沖區(qū)(Buffer)進(jìn)行操作卖擅,數(shù)據(jù)總是從通道讀取到緩沖區(qū)中,或者從緩沖區(qū)寫(xiě)入到通道中。當(dāng)線程從通道讀取數(shù)據(jù)到緩沖區(qū)時(shí)惩阶,線程還是可以進(jìn)行其他事情挎狸。當(dāng)數(shù)據(jù)被寫(xiě)入到緩沖區(qū)時(shí),線程可以繼續(xù)處理它断楷。從緩沖區(qū)寫(xiě)入通道也類(lèi)似.
核心概念:
一锨匆、緩沖區(qū)Buffer
在傳統(tǒng)的io操作中,請(qǐng)求是面向流的操作,數(shù)據(jù)直接寫(xiě)入到Stream或者程序直接從Stream中讀取數(shù)據(jù),在NIO操作中,所有數(shù)據(jù)都是用緩沖區(qū)處理,任何訪問(wèn)NIO的數(shù)據(jù)都是通過(guò)緩沖區(qū)進(jìn)行冬筒。
緩沖區(qū)在NIO中抽象為Buffer類(lèi).
我們看下Buffer類(lèi)的注釋信息
/**
* A container for data of a specific primitive type.
*
* <p> A buffer is a linear, finite sequence of elements of a specific
* primitive type.? Aside from its content, the essential properties of a
* buffer are its capacity, limit, and position: </p>
Buffer是一個(gè)存儲(chǔ)數(shù)據(jù)的特殊原始類(lèi)型,并且是線性恐锣、存儲(chǔ)有限元素的數(shù)據(jù)結(jié)構(gòu)(數(shù)組),Buffer核心的屬性除了存儲(chǔ)內(nèi)容之外還有容量capacity,限制Limit舞痰,和當(dāng)前位置position
Buffer是一個(gè)頂層抽象,最常用的API有以下幾個(gè):
1土榴、Buffer clear()?
清除buffer,將讀寫(xiě)位置position置為0,將最大可存儲(chǔ)數(shù)量limit置為最大容量capacity,一般在讀寫(xiě)操作之前都需要調(diào)用改方法
2、Buffer flip()
翻轉(zhuǎn)Buffer.意思大概是這樣的:調(diào)換這個(gè)buffer的當(dāng)前位置响牛,并且設(shè)置當(dāng)前位置是0鞭衩。說(shuō)的意思就是:將緩存字節(jié)數(shù)組的指針設(shè)置為數(shù)組的開(kāi)始序列即數(shù)組下標(biāo)0。這樣就可以從buffer開(kāi)頭娃善,對(duì)該buffer進(jìn)行遍歷(讀嚷垩堋)了。 一般用于當(dāng)讀寫(xiě)操作后調(diào)用改方法進(jìn)行寫(xiě)和讀操作聚磺。
說(shuō)白了就是當(dāng)向buffer寫(xiě)入數(shù)據(jù)完畢坯台,如果要從Buffer中讀取數(shù)據(jù),那么需要調(diào)用flip翻轉(zhuǎn)成可讀狀態(tài),否則讀取的數(shù)據(jù)不正確
更詳細(xì)的結(jié)束 大家請(qǐng)閱讀這篇文章??Java nio 之 Buffer反轉(zhuǎn)flip - 枯鴉專欄 - CSDN博客
由于傳輸?shù)臄?shù)據(jù)結(jié)構(gòu)不同具體實(shí)現(xiàn)的Buffer也很多,其中最常用的就是ByteBuffer
下面通過(guò)一個(gè)簡(jiǎn)歷的例子熟悉下ByteBuffer的常用的API
```
public static void main(String[] args) {
//分配88字節(jié)空間
? ? ByteBuffer byteBuffer = ByteBuffer.allocate(88);
? ? System.out.println("此時(shí) limit = "+byteBuffer.limit()+" position="+byteBuffer.position()+" capacity="+byteBuffer.capacity());
? ? String value ="Netty權(quán)威指南";
? ? //向buffer寫(xiě)數(shù)據(jù)
? ? byteBuffer.put(value.getBytes());
? ? System.out.println("寫(xiě)入數(shù)據(jù)后 limit = "+byteBuffer.limit()+" position="+byteBuffer.position()+" capacity="+byteBuffer.capacity());
? ? //翻轉(zhuǎn)buffer
? ? byteBuffer.flip();
? ? System.out.println("翻轉(zhuǎn)后 limit = "+byteBuffer.limit()+" position="+byteBuffer.position()+" capacity="+byteBuffer.capacity());
? ? byte[] tbyte =new byte[17];
? ? //從buffer讀數(shù)據(jù)
? ? byteBuffer.get(tbyte);
? ? System.out.println("讀后 limit = "+byteBuffer.limit()+" position="+byteBuffer.position()+" capacity="+byteBuffer.capacity());
}
```
執(zhí)行結(jié)果:
二、通道Channel
Channel是一個(gè)通道,可以通過(guò)他讀取或者寫(xiě)入數(shù)據(jù),并且可以全雙工同時(shí)讀寫(xiě)操作,Channel的狀態(tài)除了打開(kāi)就是關(guān)閉,在NIO中Channel一般注冊(cè)在多路復(fù)用器Selector中
主要有以下API
1瘫寝、isOpen 通道是否關(guān)閉
2蜒蕾、close 關(guān)閉通道
Channel主要有兩類(lèi),分別用于網(wǎng)絡(luò)讀寫(xiě)SelectableChannel和文件讀寫(xiě)的FileChannel
三焕阿、多路復(fù)用器Selector
Selector提供選擇已經(jīng)就緒任務(wù)的能力咪啡,它會(huì)不斷輪詢注冊(cè)在其上的Channel,如果某個(gè)Channel上面有新的Tcp連接暮屡,讀寫(xiě)事件撤摸,這個(gè)Channel就處于就緒狀態(tài)會(huì)被Selector選擇出來(lái),然后通過(guò)SelectionKey獲取就緒Channel的集合后續(xù)進(jìn)行對(duì)應(yīng)IO操作褒纲。使用Selector的好處在于:?使用更少的線程來(lái)就可以來(lái)處理通道了准夷, 相比使用多個(gè)線程,避免了線程上下文切換帶來(lái)的開(kāi)銷(xiāo)莺掠。
常用API:
1衫嵌、Selector open() 創(chuàng)建一個(gè)Selector
2、select()? 返回準(zhǔn)備好的channel個(gè)數(shù)彻秆,該方法會(huì)阻塞直到有Channel就緒
3楔绞、selectedKeys 返回就緒的selectedKeys
將Channel注冊(cè)到Selector上:
serverChannel.configureBlocking(false);//開(kāi)啟非阻塞模式
serverChannel.socket().bind(new InetSocketAddress(port),1024);//綁定端口
serverChannel.register(selector, SelectionKey.OP_ACCEPT);//將channel 注冊(cè)到selector,監(jiān)聽(tīng)客戶端連接ACCEPT請(qǐng)求
其中register第二個(gè)參數(shù)是一個(gè)事件的集合结闸。
可以監(jiān)聽(tīng)四種不同類(lèi)型的事件:Connect、Accept酒朵、Read桦锄、Write,通過(guò)SelectKey封裝實(shí)現(xiàn)
示例:
public class Server {
private static int DEFAULT_PORT =12345;
? ? private static ServerHandlerserverHandle;
? ? public static void start() {
start(DEFAULT_PORT);
? ? }
public static synchronized void start(int port) {
if (serverHandle !=null) {serverHandle.stop(); }
serverHandle =new ServerHandler(port);
? ? ? ? new Thread(serverHandle, "Server").start();
? ? }
public static void main(String[] args) {
start();
? ? }
}
```
```
/**
* NIO服務(wù)端
* @author wcs
* @version 1.0
*/
public class ServerHandlerimplements Runnable{
private Selectorselector;
? ? private ServerSocketChannelserverChannel;
? ? private volatile boolean started;
? ? /**
* 構(gòu)造方法
? ? * @param port 指定要監(jiān)聽(tīng)的端口號(hào)
*/
? ? public ServerHandler(int port) {
try{
//創(chuàng)建reactor線程矾缓,創(chuàng)建多路復(fù)用器
? ? ? ? ? ? selector = Selector.open();
? ? ? ? ? ? //打開(kāi)監(jiān)聽(tīng)通道 監(jiān)聽(tīng)客戶端連接
? ? ? ? ? ? serverChannel = ServerSocketChannel.open();
? ? ? ? ? ? //如果為 true,則此通道將被置于阻塞模式识颊;如果為 false兆衅,則此通道將被置于非阻塞模式
? ? ? ? ? ? serverChannel.configureBlocking(false);//開(kāi)啟非阻塞模式
//綁定監(jiān)聽(tīng)端口 設(shè)置連接為非阻塞模式 backlog設(shè)為1024
? ? ? ? ? ? serverChannel.socket().bind(new InetSocketAddress(port),1024);
? ? ? ? ? ? //將channel 注冊(cè)到selector,監(jiān)聽(tīng)客戶端連接ACCEPT請(qǐng)求
? ? ? ? ? ? serverChannel.register(selector, SelectionKey.OP_ACCEPT);
? ? ? ? ? ? //標(biāo)記服務(wù)器已開(kāi)啟
? ? ? ? ? ? started =true;
? ? ? ? ? ? System.out.println("服務(wù)器已啟動(dòng),端口號(hào):" + port);
? ? ? ? }catch(IOException e){
e.printStackTrace();
? ? ? ? ? ? System.exit(1);
? ? ? ? }
}
public void stop(){
started =false;
? ? }
@Override
? ? public void run() {
//循環(huán)遍歷selector
? ? ? ? while(started){
try{
//無(wú)論是否有讀寫(xiě)事件發(fā)生秦叛,selector每隔1s被喚醒一次
? ? ? ? ? ? ? ? selector.select(1000);
? ? ? ? ? ? ? ? //阻塞,只有當(dāng)至少一個(gè)注冊(cè)的事件發(fā)生的時(shí)候才會(huì)繼續(xù).
//? ? ? ? ? selector.select();
? ? ? ? ? ? ? ? Set keys =selector.selectedKeys();
? ? ? ? ? ? ? ? Iterator it = keys.iterator();
? ? ? ? ? ? ? ? SelectionKey key =null;
? ? ? ? ? ? ? ? while(it.hasNext()){
key = it.next();
? ? ? ? ? ? ? ? ? ? it.remove();
? ? ? ? ? ? ? ? ? ? try{
handleInput(key);
? ? ? ? ? ? ? ? ? ? }catch(Exception e){
if(key !=null){
key.cancel();
? ? ? ? ? ? ? ? ? ? ? ? ? ? if(key.channel() !=null){
key.channel().close();
? ? ? ? ? ? ? ? ? ? ? ? ? ? }
}
}
}
}catch(Throwable t){
t.printStackTrace();
? ? ? ? ? ? }
}
//selector關(guān)閉后會(huì)自動(dòng)釋放里面管理的資源
? ? ? ? if(selector !=null)
try{
selector.close();
? ? ? ? ? ? }catch (Exception e) {
e.printStackTrace();
? ? ? ? ? ? }
}
private void handleInput(SelectionKey key)throws IOException{
if(key.isValid()){
//處理新接入的請(qǐng)求消息
? ? ? ? ? ? if(key.isAcceptable()){
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
? ? ? ? ? ? ? ? //通過(guò)ServerSocketChannel的accept創(chuàng)建SocketChannel實(shí)例
//完成該操作意味著完成TCP三次握手,TCP物理鏈路正式建立
? ? ? ? ? ? ? ? SocketChannel sc = ssc.accept();
? ? ? ? ? ? ? ? //設(shè)置為非阻塞的
? ? ? ? ? ? ? ? sc.configureBlocking(false);
? ? ? ? ? ? ? ? //注冊(cè)為讀
? ? ? ? ? ? ? ? sc.register(selector, SelectionKey.OP_READ);
? ? ? ? ? ? }
//讀消息
? ? ? ? ? ? if(key.isReadable()){
SocketChannel sc = (SocketChannel) key.channel();
? ? ? ? ? ? ? ? //創(chuàng)建ByteBuffer,并開(kāi)辟一個(gè)1K的緩沖區(qū)
? ? ? ? ? ? ? ? ByteBuffer buffer = ByteBuffer.allocate(1024);
? ? ? ? ? ? ? ? //讀取請(qǐng)求碼流慰枕,返回讀取到的字節(jié)數(shù)
? ? ? ? ? ? ? ? int readBytes = sc.read(buffer);
? ? ? ? ? ? ? ? //讀取到字節(jié),對(duì)字節(jié)進(jìn)行編解碼
? ? ? ? ? ? ? ? if(readBytes>0){
//將緩沖區(qū)當(dāng)前的limit設(shè)置為position=0即纲,用于后續(xù)對(duì)緩沖區(qū)的讀取操作
? ? ? ? ? ? ? ? ? ? buffer.flip();
? ? ? ? ? ? ? ? ? ? //根據(jù)緩沖區(qū)可讀字節(jié)數(shù)創(chuàng)建字節(jié)數(shù)組
? ? ? ? ? ? ? ? ? ? byte[] bytes =new byte[buffer.remaining()];
? ? ? ? ? ? ? ? ? ? //將緩沖區(qū)可讀字節(jié)數(shù)組復(fù)制到新建的數(shù)組中
? ? ? ? ? ? ? ? ? ? buffer.get(bytes);
? ? ? ? ? ? ? ? ? ? String expression =new String(bytes,"UTF-8");
? ? ? ? ? ? ? ? ? ? System.out.println("服務(wù)器收到消息:" + expression);
? ? ? ? ? ? ? ? ? ? //處理數(shù)據(jù)
? ? ? ? ? ? ? ? ? ? String result =null;
? ? ? ? ? ? ? ? ? ? try{
result = expression+"nio hshs";
? ? ? ? ? ? ? ? ? ? }catch(Exception e){
result ="計(jì)算錯(cuò)誤:" + e.getMessage();
? ? ? ? ? ? ? ? ? ? }
//發(fā)送應(yīng)答消息
? ? ? ? ? ? ? ? ? ? doWrite(sc,result);
? ? ? ? ? ? ? ? }
//沒(méi)有讀取到字節(jié) 忽略
//? ? ? ? ? else if(readBytes==0);
//鏈路已經(jīng)關(guān)閉具帮,釋放資源
? ? ? ? ? ? ? ? else if(readBytes<0){
key.cancel();
? ? ? ? ? ? ? ? ? ? sc.close();
? ? ? ? ? ? ? ? }
}
}
}
//異步發(fā)送應(yīng)答消息
? ? private void doWrite(SocketChannel channel,String response)throws IOException{
//將消息編碼為字節(jié)數(shù)組
? ? ? ? byte[] bytes = response.getBytes();
? ? ? ? //根據(jù)數(shù)組容量創(chuàng)建ByteBuffer
? ? ? ? ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
? ? ? ? //將字節(jié)數(shù)組復(fù)制到緩沖區(qū)
? ? ? ? writeBuffer.put(bytes);
? ? ? ? //flip操作
? ? ? ? writeBuffer.flip();
? ? ? ? //發(fā)送緩沖區(qū)的字節(jié)數(shù)組
? ? ? ? channel.write(writeBuffer);
? ? ? ? //****此處不含處理“寫(xiě)半包”的代碼
? ? }
}
```
客戶端代碼
```
public class Client {
private static StringDEFAULT_HOST ="127.0.0.1";
? ? private static int DEFAULT_PORT =12345;
? ? private static ClientHandlerclientHandle;
? ? public static void start(){
start(DEFAULT_HOST,DEFAULT_PORT);
? ? }
public static synchronized void start(String ip,int port){
if(clientHandle!=null)
clientHandle.stop();
? ? ? ? clientHandle =new ClientHandler(ip,port);
? ? ? ? new Thread(clientHandle,"Server").start();
? ? }
//向服務(wù)器發(fā)送消息
? ? public static boolean sendMsg(String msg)throws Exception{
if(msg.equals("q"))return false;
? ? ? ? clientHandle.sendMsg(msg);
return true;
? ? }
public static void main(String[] args){
start();
? ? }
}
```
```
package com.wcs.learn.NIO;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* NIO客戶端
* @author wcs
* @version 1.0
*/
public class ClientHandlerimplements Runnable{
private Stringhost;
? ? private int port;
? ? private Selectorselector;
? ? private SocketChannelsocketChannel;
? ? private volatile boolean started;
? ? public ClientHandler(String ip,int port) {
this.host = ip;
? ? ? ? this.port = port;
? ? ? ? try{
//創(chuàng)建選擇器
? ? ? ? ? ? selector = Selector.open();
? ? ? ? ? ? //打開(kāi)監(jiān)聽(tīng)通道
? ? ? ? ? ? socketChannel = SocketChannel.open();
? ? ? ? ? ? //如果為 true,則此通道將被置于阻塞模式低斋;如果為 false蜂厅,則此通道將被置于非阻塞模式
? ? ? ? ? ? socketChannel.configureBlocking(false);//開(kāi)啟非阻塞模式
? ? ? ? ? ? started =true;
? ? ? ? }catch(IOException e){
e.printStackTrace();
? ? ? ? ? ? System.exit(1);
? ? ? ? }
}
public void stop(){
started =false;
? ? }
@Override
? ? public void run() {
try{
doConnect();
? ? ? ? }catch(IOException e){
e.printStackTrace();
? ? ? ? ? ? System.exit(1);
? ? ? ? }
//循環(huán)遍歷selector
? ? ? ? while(started){
try{
//無(wú)論是否有讀寫(xiě)事件發(fā)生,selector每隔1s被喚醒一次
? ? ? ? ? ? ? ? selector.select(1000);
? ? ? ? ? ? ? ? //阻塞,只有當(dāng)至少一個(gè)注冊(cè)的事件發(fā)生的時(shí)候才會(huì)繼續(xù).
//? ? ? ? ? selector.select();
? ? ? ? ? ? ? ? Set keys =selector.selectedKeys();
? ? ? ? ? ? ? ? Iterator it = keys.iterator();
? ? ? ? ? ? ? ? SelectionKey key =null;
? ? ? ? ? ? ? ? while(it.hasNext()){
key = it.next();
? ? ? ? ? ? ? ? ? ? it.remove();
? ? ? ? ? ? ? ? ? ? try{
handleInput(key);
? ? ? ? ? ? ? ? ? ? }catch(Exception e){
if(key !=null){
key.cancel();
? ? ? ? ? ? ? ? ? ? ? ? ? ? if(key.channel() !=null){
key.channel().close();
? ? ? ? ? ? ? ? ? ? ? ? ? ? }
}
}
}
}catch(Exception e){
e.printStackTrace();
? ? ? ? ? ? ? ? System.exit(1);
? ? ? ? ? ? }
}
//selector關(guān)閉后會(huì)自動(dòng)釋放里面管理的資源
? ? ? ? if(selector !=null)
try{
selector.close();
? ? ? ? ? ? }catch (Exception e) {
e.printStackTrace();
? ? ? ? ? ? }
}
private void handleInput(SelectionKey key)throws IOException{
if(key.isValid()){
SocketChannel sc = (SocketChannel) key.channel();
? ? ? ? ? ? if(key.isConnectable()){
if(sc.finishConnect());
? ? ? ? ? ? ? ? else System.exit(1);
? ? ? ? ? ? }
//讀消息
? ? ? ? ? ? if(key.isReadable()){
//創(chuàng)建ByteBuffer膊畴,并開(kāi)辟一個(gè)1M的緩沖區(qū)
? ? ? ? ? ? ? ? ByteBuffer buffer = ByteBuffer.allocate(1024);
? ? ? ? ? ? ? ? //讀取請(qǐng)求碼流掘猿,返回讀取到的字節(jié)數(shù)
? ? ? ? ? ? ? ? int readBytes = sc.read(buffer);
? ? ? ? ? ? ? ? //讀取到字節(jié),對(duì)字節(jié)進(jìn)行編解碼
? ? ? ? ? ? ? ? if(readBytes>0){
//將緩沖區(qū)當(dāng)前的limit設(shè)置為position=0唇跨,用于后續(xù)對(duì)緩沖區(qū)的讀取操作
? ? ? ? ? ? ? ? ? ? buffer.flip();
? ? ? ? ? ? ? ? ? ? //根據(jù)緩沖區(qū)可讀字節(jié)數(shù)創(chuàng)建字節(jié)數(shù)組
? ? ? ? ? ? ? ? ? ? byte[] bytes =new byte[buffer.remaining()];
? ? ? ? ? ? ? ? ? ? //將緩沖區(qū)可讀字節(jié)數(shù)組復(fù)制到新建的數(shù)組中
? ? ? ? ? ? ? ? ? ? buffer.get(bytes);
? ? ? ? ? ? ? ? ? ? String result =new String(bytes,"UTF-8");
? ? ? ? ? ? ? ? ? ? System.out.println("客戶端收到消息:" + result);
? ? ? ? ? ? ? ? }
//沒(méi)有讀取到字節(jié) 忽略
//? ? ? ? ? else if(readBytes==0);
//鏈路已經(jīng)關(guān)閉稠通,釋放資源
? ? ? ? ? ? ? ? else if(readBytes<0){
key.cancel();
? ? ? ? ? ? ? ? ? ? sc.close();
? ? ? ? ? ? ? ? }
}
}
}
//異步發(fā)送消息
? ? private void doWrite(SocketChannel channel,String request)throws IOException{
//將消息編碼為字節(jié)數(shù)組
? ? ? ? byte[] bytes = request.getBytes();
? ? ? ? //根據(jù)數(shù)組容量創(chuàng)建ByteBuffer
? ? ? ? ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
? ? ? ? //將字節(jié)數(shù)組復(fù)制到緩沖區(qū)
? ? ? ? writeBuffer.put(bytes);
? ? ? ? //flip操作
? ? ? ? writeBuffer.flip();
? ? ? ? //發(fā)送緩沖區(qū)的字節(jié)數(shù)組
? ? ? ? channel.write(writeBuffer);
? ? ? ? //****此處不含處理“寫(xiě)半包”的代碼
? ? }
private void doConnect()throws IOException{
if(socketChannel.connect(new InetSocketAddress(host,port)));
? ? ? ? else socketChannel.register(selector, SelectionKey.OP_CONNECT);
? ? }
public void sendMsg(String msg)throws Exception{
socketChannel.register(selector, SelectionKey.OP_READ);
? ? ? ? doWrite(socketChannel, msg);
? ? }
}
```