Unblocking IO(New IO): 同步非阻塞的編程方式。
NIO本身是基于事件驅(qū)動(dòng)思想來完成的,其主要想解決的是BIO的大并發(fā)問題,NIO基于Reactor耻煤,當(dāng)socket有流可讀或可寫入socket時(shí),操作系統(tǒng)會(huì)相應(yīng)的通知引用程序進(jìn)行處理准颓,應(yīng)用再將流讀取到緩沖區(qū)或?qū)懭氩僮飨到y(tǒng)哈蝇。也就是說,這個(gè)時(shí)候攘已,已經(jīng)不是一個(gè)連接就要對(duì)應(yīng)一個(gè)處理線程了炮赦,而是有效的請(qǐng)求,對(duì)應(yīng)一個(gè)線程样勃,當(dāng)連接沒有數(shù)據(jù)時(shí)吠勘,是沒有工作線程來處理的。
NIO的最重要的地方是當(dāng)一個(gè)連接創(chuàng)建后峡眶,不需要對(duì)應(yīng)一個(gè)線程剧防,這個(gè)連接會(huì)被注冊(cè)到多路復(fù)用器上面,所以所有的連接只需要一個(gè)線程就可以搞定辫樱,當(dāng)這個(gè)線程中的多路復(fù)用器進(jìn)行輪詢的時(shí)候峭拘,發(fā)現(xiàn)連接上有請(qǐng)求的話,才開啟一個(gè)線程進(jìn)行處理狮暑,也就是一個(gè)請(qǐng)求一個(gè)線程模式棚唆。
在NIO的處理方式中,當(dāng)一個(gè)請(qǐng)求來的話心例,開啟線程進(jìn)行處理,可能會(huì)等待后端應(yīng)用的資源(JDBC連接等)鞋囊,其實(shí)這個(gè)線程就被阻塞了止后,當(dāng)并發(fā)上來的話,還是會(huì)有BIO一樣的問題溜腐。
同步非阻塞译株,服務(wù)器實(shí)現(xiàn)模式為一個(gè)請(qǐng)求一個(gè)通道,即客戶端發(fā)送的連接請(qǐng)求都會(huì)注冊(cè)到多路復(fù)用器上挺益,多路復(fù)用器輪詢到連接有I/O請(qǐng)求時(shí)才啟動(dòng)一個(gè)線程進(jìn)行處理歉糜。
NIO方式適用于連接數(shù)目多且連接比較短(輕操作)的架構(gòu),比如聊天服務(wù)器望众,并發(fā)局限于應(yīng)用中匪补,編程復(fù)雜伞辛,JDK1.4開始支持。
Buffer:ByteBuffer,CharBuffer,ShortBuffer,IntBuffer,LongBuffer,FloatBuffer,DoubleBuffer夯缺。
Channel:SocketChannel,ServerSocketChannel蚤氏。
Selector:Selector,AbstractSelector
SelectionKey:OP_READ,OP_WRITE,OP_CONNECT,OP_ACCEPT
package com.bjsxt.socket.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
public class NIOClient {
public static void main(String[] args) {
// 遠(yuǎn)程地址創(chuàng)建
InetSocketAddress remote = new InetSocketAddress("localhost", 9999);
SocketChannel channel = null;
// 定義緩存。
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
// 開啟通道
channel = SocketChannel.open();
// 連接遠(yuǎn)程服務(wù)器踊兜。
channel.connect(remote);
Scanner reader = new Scanner(System.in);
while(true){
System.out.print("put message for send to server > ");
String line = reader.nextLine();
if(line.equals("exit")){
break;
}
// 將控制臺(tái)輸入的數(shù)據(jù)寫入到緩存竿滨。
buffer.put(line.getBytes("UTF-8"));
// 重置緩存游標(biāo)
buffer.flip();
// 將數(shù)據(jù)發(fā)送給服務(wù)器
channel.write(buffer);
// 清空緩存數(shù)據(jù)。
buffer.clear();
// 讀取服務(wù)器返回的數(shù)據(jù)
int readLength = channel.read(buffer);
if(readLength == -1){
break;
}
// 重置緩存游標(biāo)
buffer.flip();
byte[] datas = new byte[buffer.remaining()];
// 讀取數(shù)據(jù)到字節(jié)數(shù)組捏境。
buffer.get(datas);
System.out.println("from server : " + new String(datas, "UTF-8"));
// 清空緩存于游。
buffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
} finally{
if(null != channel){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
package com.bjsxt.socket.nio;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
public class NIOServer implements Runnable {
// 多路復(fù)用器, 選擇器垫言。 用于注冊(cè)通道的贰剥。
private Selector selector;
// 定義了兩個(gè)緩存。分別用于讀和寫骏掀。 初始化空間大小單位為字節(jié)鸠澈。
private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
private ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
public static void main(String[] args) {
new Thread(new NIOServer(9999)).start();
}
public NIOServer(int port) {
init(port);
}
private void init(int port){
try {
System.out.println("server starting at port " + port + " ...");
// 開啟多路復(fù)用器
this.selector = Selector.open();
// 開啟服務(wù)通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 非阻塞, 如果傳遞參數(shù)true截驮,為阻塞模式笑陈。
serverChannel.configureBlocking(false);
// 綁定端口
serverChannel.bind(new InetSocketAddress(port));
// 注冊(cè),并標(biāo)記當(dāng)前服務(wù)通道狀態(tài)
/*
* register(Selector, int)
* int - 狀態(tài)編碼
* OP_ACCEPT : 連接成功的標(biāo)記位葵袭。
* OP_READ : 可以讀取數(shù)據(jù)的標(biāo)記
* OP_WRITE : 可以寫入數(shù)據(jù)的標(biāo)記
* OP_CONNECT : 連接建立后的標(biāo)記
*/
serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
System.out.println("server started.");
} catch (IOException e) {
e.printStackTrace();
}
}
public void run(){
while(true){
try {
// 阻塞方法涵妥,當(dāng)至少一個(gè)通道被選中,此方法返回坡锡。
// 通道是否選擇蓬网,由注冊(cè)到多路復(fù)用器中的通道標(biāo)記決定。
this.selector.select();
// 返回以選中的通道標(biāo)記集合鹉勒, 集合中保存的是通道的標(biāo)記帆锋。相當(dāng)于是通道的ID。
Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
while(keys.hasNext()){
SelectionKey key = keys.next();
// 將本次要處理的通道從集合中刪除禽额,下次循環(huán)根據(jù)新的通道列表再次執(zhí)行必要的業(yè)務(wù)邏輯
keys.remove();
// 通道是否有效
if(key.isValid()){
// 阻塞狀態(tài)
try{
if(key.isAcceptable()){
accept(key);
}
}catch(CancelledKeyException cke){
// 斷開連接锯厢。 出現(xiàn)異常。
key.cancel();
}
// 可讀狀態(tài)
try{
if(key.isReadable()){
read(key);
}
}catch(CancelledKeyException cke){
key.cancel();
}
// 可寫狀態(tài)
try{
if(key.isWritable()){
write(key);
}
}catch(CancelledKeyException cke){
key.cancel();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void write(SelectionKey key){
this.writeBuffer.clear();
SocketChannel channel = (SocketChannel)key.channel();
Scanner reader = new Scanner(System.in);
try {
System.out.print("put message for send to client > ");
String line = reader.nextLine();
// 將控制臺(tái)輸入的字符串寫入Buffer中脯倒。 寫入的數(shù)據(jù)是一個(gè)字節(jié)數(shù)組实辑。
writeBuffer.put(line.getBytes("UTF-8"));
writeBuffer.flip();
channel.write(writeBuffer);
channel.register(this.selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
private void read(SelectionKey key){
try {
// 清空讀緩存。
this.readBuffer.clear();
// 獲取通道
SocketChannel channel = (SocketChannel)key.channel();
// 將通道中的數(shù)據(jù)讀取到緩存中藻丢。通道中的數(shù)據(jù)剪撬,就是客戶端發(fā)送給服務(wù)器的數(shù)據(jù)。
int readLength = channel.read(readBuffer);
// 檢查客戶端是否寫入數(shù)據(jù)悠反。
if(readLength == -1){
// 關(guān)閉通道
key.channel().close();
// 關(guān)閉連接
key.cancel();
return;
}
/*
* flip残黑, NIO中最復(fù)雜的操作就是Buffer的控制馍佑。
* Buffer中有一個(gè)游標(biāo)。游標(biāo)信息在操作后不會(huì)歸零萍摊,如果直接訪問Buffer的話挤茄,數(shù)據(jù)有不一致的可能。
* flip是重置游標(biāo)的方法冰木。NIO編程中穷劈,flip方法是常用方法。
*/
this.readBuffer.flip();
// 字節(jié)數(shù)組踊沸,保存具體數(shù)據(jù)的歇终。 Buffer.remaining() -> 是獲取Buffer中有效數(shù)據(jù)長(zhǎng)度的方法。
byte[] datas = new byte[readBuffer.remaining()];
// 是將Buffer中的有效數(shù)據(jù)保存到字節(jié)數(shù)組中逼龟。
readBuffer.get(datas);
System.out.println("from " + channel.getRemoteAddress() + " client : " + new String(datas, "UTF-8"));
// 注冊(cè)通道评凝, 標(biāo)記為寫操作。
channel.register(this.selector, SelectionKey.OP_WRITE);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
try {
key.channel().close();
key.cancel();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
private void accept(SelectionKey key){
try {
// 此通道為init方法中注冊(cè)到Selector上的ServerSocketChannel
ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();
// 阻塞方法腺律,當(dāng)客戶端發(fā)起請(qǐng)求后返回奕短。 此通道和客戶端一一對(duì)應(yīng)。
SocketChannel channel = serverChannel.accept();
channel.configureBlocking(false);
// 設(shè)置對(duì)應(yīng)客戶端的通道標(biāo)記狀態(tài)匀钧,此通道為讀取數(shù)據(jù)使用的翎碑。
channel.register(this.selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
}
package com.bjsxt.socket.nio;
import java.nio.ByteBuffer;
/**
*
* Buffer的應(yīng)用固定邏輯
* 寫操作順序
* 1. clear()
* 2. put() -> 寫操作
* 3. flip() -> 重置游標(biāo)
* 4. SocketChannel.write(buffer); -> 將緩存數(shù)據(jù)發(fā)送到網(wǎng)絡(luò)的另一端
* 5. clear()
*
* 讀操作順序
* 1. clear()
* 2. SocketChannel.read(buffer); -> 從網(wǎng)絡(luò)中讀取數(shù)據(jù)
* 3. buffer.flip() -> 重置游標(biāo)
* 4. buffer.get() -> 讀取數(shù)據(jù)
* 5. buffer.clear()
*
*/
public class TestBuffer {
public static void main(String[] args) throws Exception {
ByteBuffer buffer = ByteBuffer.allocate(8);
byte[] temp = new byte[]{3,2,1};
// 寫入數(shù)據(jù)之前 : java.nio.HeapByteBuffer[pos=0 lim=8 cap=8]
// pos - 游標(biāo)位置, lim - 限制數(shù)量之斯, cap - 最大容量
System.out.println("寫入數(shù)據(jù)之前 : " + buffer);
// 寫入字節(jié)數(shù)組到緩存
buffer.put(temp);
// 寫入數(shù)據(jù)之后 : java.nio.HeapByteBuffer[pos=3 lim=8 cap=8]
// 游標(biāo)為3日杈, 限制為8, 容量為8
System.out.println("寫入數(shù)據(jù)之后 : " + buffer);
// 重置游標(biāo) 佑刷, lim = pos ; pos = 0;
buffer.flip();
// 重置游標(biāo)之后 : java.nio.HeapByteBuffer[pos=0 lim=3 cap=8]
// 游標(biāo)為0莉擒, 限制為3, cap為8
System.out.println("重置游標(biāo)之后 : " + buffer);
// 清空Buffer瘫絮, pos = 0; lim = cap;
// buffer.clear();
// get() -> 獲取當(dāng)前游標(biāo)指向的位置的數(shù)據(jù)涨冀。
// System.out.println(buffer.get());
/*for(int i = 0; i < buffer.remaining(); i++){
// get(int index) -> 獲取指定位置的數(shù)據(jù)。
int data = buffer.get(i);
System.out.println(i + " - " + data);
}*/
}
}