在java io中,核心概念為流(Stream)筷弦,面向流的編程,一個流要么是輸出流送讲,要么是輸入流奸笤,不能夠同時是輸出流又同時是輸入流惋啃。
java nio中有三個核心概念,Selector,Channel监右,Buffer边灭,在Nio中,我們是面向塊(block)或者緩沖區(qū)(buffer)編程健盒。Buffer本身是一塊內存绒瘦,底層實際就是數組,數據的讀寫扣癣,都是通過Buffer來實現的惰帽。
Buffer:提供了對于數據的結構化訪問方式,并且可以追蹤到系統(tǒng)的讀寫過程父虑。在java中该酗,七種原生數據類型都有各自的Buffer類型,如IntBuffer,LongBuffer,ByteBuffer...Buffer使用flip()方法來改變讀寫狀態(tài)
Channel:可以向其中寫入或是從中讀取數據的對象士嚎,類似于java.io中的Stream呜魄。所有的Channel的數據讀寫都是通過Buffer來進行的,永遠不能直接向Channel直接讀取對象或者直接寫入對象莱衩。
Selector:Selector 一般稱 為選擇器 爵嗅,當然你也可以翻譯為 多路復用器 。它是Java NIO核心組件中的一個笨蚁,用于檢查一個或多個NIO Channel(通道)的狀態(tài)是否處于可讀睹晒、可寫。如此可以實現單線程管理多個channels,也就是可以管理多個網絡鏈接括细。
Nio結構模型
-
Channel與Buffer的關系
1554441824184.png
-
Selector:
1554604133948.png
Buffer&Channel _Examples
-
Buffer的讀寫
public class NioTest1 { public static void main(String[] args) { IntBuffer intBuffer = IntBuffer.allocate(10); for (int i = 0; i < intBuffer.capacity(); i++) { int randomNumber = new Random().nextInt(20); intBuffer.put(randomNumber); } //狀態(tài)反轉伪很,使buffer成為可讀狀態(tài) intBuffer.flip(); while (intBuffer.hasRemaining()) { System.out.println(intBuffer.get()); } } }
-
通過Channel把文件讀取到程序中(需要使用FileInputStream)
public class NioTest2 { public static void main(String[] args) throws IOException { //傳統(tǒng)io讀取數據 BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream("src/main/java/com/zakl/nio/NioTest2.txt"),"UTF-8")); while (bufferedReader.ready()) { System.out.println(bufferedReader.readLine()); } System.out.println("----------"); //nio讀取數據 ByteBuffer byteBuffer = ByteBuffer.allocate(512); FileChannel fileChannel = new FileInputStream("src/main/java/com/zakl/nio/NioTest2.txt").getChannel(); //將Channel中的數據讀取到byteBuffer中 fileChannel.read(byteBuffer); //改變bytebuffer狀態(tài),由可寫變成可讀 byteBuffer.flip(); while (byteBuffer.hasRemaining()) { System.out.print((char) byteBuffer.get()); } } }
-
通過Channel將數據寫入到文件中(需要使用FileOutputStream)
public class NioTest3 { public static void main(String[] args) throws IOException { FileOutputStream fileOutputStream = new FileOutputStream("src/main/java/com/zakl/nio/NioTest3.txt"); FileChannel channel = fileOutputStream.getChannel(); ByteBuffer byteBuffer = ByteBuffer.allocate(512); byte[] messages = "message test".getBytes(); for (int i = 0; i < messages.length; i++) { byteBuffer.put(messages[i]); } byteBuffer.flip(); //將byteBuffer中的消息寫入到channel中 channel.write(byteBuffer); channel.close(); fileOutputStream.close(); } }
Buffer的三個特性:capacity, limit , position (要理解讀寫模式奋单,是相對的是掰,channel.read(buffer)是將channel中數據讀取到buffer中,此時buffer是寫模式辱匿,buffer的put也是寫模式键痛。反之亦然)
? 0<=mark<=position<=limit<=capacity
- capacity:構建Buffer時進行初始化,全局不可變匾七。
- position: 永遠指向下一個讀或者寫的元素索引絮短。
- limit:默認位置為capacity的值,當調用flip()時昨忆,limit位置指向方法調用前的position位置丁频。
- 1554450906686.png
- flip()使得buffer由可寫變成可讀狀態(tài),limit=position,position=0
- clear()使得變成可寫狀態(tài),limit=capacity,position=0
- rewind()重新使得position的位置置于0;數據不變,可以繼續(xù)執(zhí)行讀操作席里,重復取出buffer中的數據
單個Channel與多個Buffer的交互(Scattering與Gathering):
public class NioTest11 {
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(8899);
//服務器監(jiān)聽8899端口
serverSocketChannel.socket().bind(address);
int messageLength = 2 + 3 + 4;
ByteBuffer[] byteBuffers = new ByteBuffer[3];
byteBuffers[0] = ByteBuffer.allocate(2);
byteBuffers[1] = ByteBuffer.allocate(3);
byteBuffers[2] = ByteBuffer.allocate(4);
SocketChannel socketChannel = serverSocketChannel.accept();
while (true) {
int bytesRead = 0;
while (bytesRead < messageLength) {
//將buffer數組進行寫入操作叔磷。第一個滿了就寫第二個,返回該次寫入的總長度
long r = socketChannel.read(byteBuffers);
bytesRead += r;
System.out.println("byteRead:" + bytesRead);
//stream將每一個buffer對象轉化成String對象
Arrays.asList(byteBuffers).stream().map(buffer->"position:"+buffer.position()+"limit:"+buffer.limit()).forEach(System.out::println);
}
Arrays.asList(byteBuffers).forEach(byteBuffer -> {
byteBuffer.flip();
});
long byteWritten =0;
while (byteWritten < messageLength) {
long r = socketChannel.write(byteBuffers);
byteWritten += r;
}
Arrays.asList(byteBuffers).forEach(byteBuffer -> byteBuffer.clear());
System.out.println("bytesRead:" + bytesRead + ",bytesWritten:" + byteWritten + ",messageLength:"+messageLength);
}
}
}
?
selector(多路復用器)
? 用于檢查一個或多個NIO Channel的狀態(tài)是否處于可讀奖磁、可寫改基。如此可以實現單線程管理多個channels,也就是可以管理多個網絡鏈接。
-
Registered key-set:
? 所有與選擇器關聯的通道所生成的鍵的集合稱為已經注冊的鍵的集合咖为。并不是所有注冊過的鍵都仍然有效秕狰。這個 集合通過 keys() 方法返回,并且可能是空的躁染。這個已注冊的鍵的集合不是可以直接修改的鸣哀;試圖這么做的話將 引發(fā)java.lang.UnsupportedOperationException。
?
-
selected-key:
? 所有與選擇器關聯的通道所生成的鍵的集合稱為已經注冊的鍵的集合吞彤。并不是所有注冊過的鍵都仍然有效我衬。這個 集合通過 keys() 方法返回,并且可能是空的饰恕。這個已注冊的鍵的集合不是可以直接修改的低飒;試圖這么做的話將 引發(fā)java.lang.UnsupportedOperationException。
-
cancelled-key:
? 已注冊的鍵的集合的子集懂盐,這個集合包含了 cancel() 方法被調用過的鍵(這個鍵已經被無效化),但它們還沒有被 注銷糕档。這個集合是選擇器對象的私有成員莉恼,因而無法直接訪問。
? 注意: 當鍵被取消( 可以通過isValid( ) 方法來判斷)時速那,它將被放在相關的選擇器的已取消的鍵的集合里俐银。注 冊不會立即被取消,但鍵會立即失效端仰。當再次調用 select( ) 方法時(或者一個正在進行的select()調用結束 時)捶惜,已取消的鍵的集合中的被取消的鍵將被清理掉,并且相應的注銷也將完成荔烧。通道會被注銷吱七,而新的 SelectionKey將被返回。當通道關閉時鹤竭,所有相關的鍵會自動取消(記住踊餐,一個通道可以被注冊到多個選擇器 上)。當選擇器關閉時臀稚,所有被注冊到該選擇器的通道都將被注銷吝岭,并且相關的鍵將立即被無效化(取消)。一 旦鍵被無效化,調用它的與選擇相關的方法就將拋出CancelledKeyException窜管。
-
Channel與Selector與SelectionKey的關系
-
Channel的四個events:
SelectionKey.OP_ACCEPT //接收就緒,一般用于服務器判斷是否可以接收 SelectionKey.OP_CONNECT //連接就緒散劫,一般用戶客戶端判斷是否連接成功 SelectionKey.OP_READ //讀取就緒 SelectionKey.OP_WRITE //寫入就緒
-
-
注冊Channel到Selector
int ops = SelectionKey.OP_ACCEPT | SelectionKey.OP_CONNECT;//表示要關注的channel狀態(tài)集合 SelectionKey register(Selector sel, int ops) //所需參數為selector對象,需要關注的Channel集合幕帆。由非阻塞Channel調用获搏, FileChannel(阻塞Channel)對象不可調用.返回當前一個SelectKey對象
-
SelectionKey:表示了一個特定的通道對象和一個特定的選擇器對象之間的注冊關系。
key.attachment(); //返回SelectionKey的attachment蜓肆,attachment可以在注冊channel的時候指定颜凯。 key.channel(); // 返回該SelectionKey對應的channel。 key.selector(); // 返回該SelectionKey對應的Selector仗扬。 key.interestOps(); //返回代表需要Selector監(jiān)控的IO操作的bit mask key.readyOps(); // 返回一個bit mask症概,代表在相應channel上可以進行的IO操作。
-
判斷Selector是否對某個Channel的某種event感興趣
int interestSet = selectionKey.interestOps(); boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT早芭; boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT; boolean isInterestedInRead = interestSet & SelectionKey.OP_READ; boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
-
判斷某個Channel的操作是否準備就緒(readyOps())
//創(chuàng)建ready集合的方法 int readySet = selectionKey.readyOps(); //檢查這些操作是否就緒的方法 boolean isAcceptable=key.isAcceptable();//是否可讀彼城,是返回 true boolean isWritable()://是否可寫,是返回 true boolean isConnectable()://是否可連接退个,是返回 true boolean isAcceptable()://是否可接收募壕,是返回 true
-
通過SelectionKey訪問其對應的Chnnel與Selector
Channel channel = key.channel(); Selector selector = key.selector();
-
將更多的信息附著在SelectionKey上:
key.attach(theObject); Object attachedObj = key.attachment(); //或者在注冊時想Selector中附加對象 SelectionKey key = channel.register(selector, interestSet, theObject);
一個典型的Nio聊天程序
客戶端
public class NioClient {
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
Selector selector = Selector.open();
//對連接感興趣
socketChannel.register(selector, SelectionKey.OP_CONNECT);
//連接到遠程服務器
socketChannel.connect(new InetSocketAddress(8989));
while (true) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
selectionKeys.forEach(selectionKey -> {
try {
if (selectionKey.isConnectable()) {
SocketChannel client = (SocketChannel) selectionKey.channel();
//判斷是否處于連接過程中
if (client.isConnectionPending()) {
//完成連接
client.finishConnect();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put((LocalDateTime.now() + client.getLocalAddress().toString() + "連接成功").getBytes());
byteBuffer.flip();
client.write(byteBuffer);
//接受鍵盤輸入
ExecutorService executorService = Executors.newSingleThreadExecutor(Executors.defaultThreadFactory());
executorService.submit(() -> {
while (true) {
try {
InputStreamReader inputStreamReader = new InputStreamReader(System.in);
BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
String msg = bufferedReader.readLine();
byteBuffer.clear();
byteBuffer.put(msg.getBytes());
byteBuffer.flip();
client.write(byteBuffer);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
client.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
SocketChannel socketChannel1 = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
try {
int read = socketChannel1.read(byteBuffer);
if (read > 0) {
byteBuffer.flip();
System.out.println(new String(byteBuffer.array(), 0, read));
}
} catch (Exception e) {
System.out.println(socketChannel1.getRemoteAddress().toString() + "斷開連接");
socketChannel1.close();
}
}
} catch (Exception e) {
e.printStackTrace();
}
});
selectionKeys.clear();
}
}
}
服務端:
public class NioServer {
private static Map<String, SocketChannel> clientMap = new HashMap<>();
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
//綁定監(jiān)聽8899端口
ServerSocket socket = serverSocketChannel.socket();
socket.bind(new InetSocketAddress(8989));
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
try {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
selectionKeys.forEach(
selectionKey -> {
final SocketChannel socketChannel;
try {
if (selectionKey.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
socketChannel = server.accept();
System.out.println("客戶端:" + socketChannel.getRemoteAddress() + "已經通過端口:" + socketChannel.getLocalAddress() + "連接");
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
clientMap.put("[" + UUID.randomUUID().toString() + "]", socketChannel);
selectionKeys.remove(selectionKey);
} else if (selectionKey.isReadable()) {
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
socketChannel = (SocketChannel) selectionKey.channel();
try {
int read = socketChannel.read(byteBuffer);
if (read > 0) {
byteBuffer.flip();
Charset charset = Charset.forName("utf-8");
String receiveMsg = String.valueOf(charset.decode(byteBuffer).array());
System.out.println(socketChannel + ":" + receiveMsg);
for (Map.Entry<String, SocketChannel> entry : clientMap.entrySet()) {
SocketChannel returnMsg = entry.getValue();
ByteBuffer msgByffer = ByteBuffer.allocate(1024);
msgByffer.put(("senderkey:" + entry.getKey() + receiveMsg).getBytes());
msgByffer.flip();
returnMsg.write(msgByffer);
}
}
} catch (Exception e) {
System.out.println(socketChannel.getRemoteAddress().toString() + "斷開連接");
socketChannel.close();
String senderkey = null;
for (Map.Entry<String, SocketChannel> entry : clientMap.entrySet()) {
if (entry.getValue() == socketChannel) {
senderkey = entry.getKey();
break;
}
}
clientMap.remove(senderkey, socketChannel);
}
selectionKeys.remove(selectionKey);
}
} catch (Exception e) {
e.printStackTrace();
}
}
);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}