NIO的來歷:https://juejin.im/entry/592e29a4ac502e006c9b4dc7
NIO是Java提供的非阻塞I/O API拾碌。
非阻塞的意義在于可以使用一個(gè)線程對(duì)大量的數(shù)據(jù)連接進(jìn)行處理,非常適用于"短數(shù)據(jù)長連接"的應(yīng)用場景,例如即時(shí)通訊軟件饵史。
在一個(gè)阻塞C/S系統(tǒng)中,服務(wù)器要為每一個(gè)客戶連接開啟一個(gè)線程阻塞等待客戶端發(fā)送的消息.若使用非阻塞技術(shù),服務(wù)器可以使用一個(gè)線程對(duì)連接進(jìn)行輪詢,無須阻塞等待.這大大減少了內(nèi)存資源的浪費(fèi),也避免了服務(wù)器在客戶線程中不斷切換帶來的CPU消耗,服務(wù)器對(duì)CPU的有效使用率大大提高.
NIO的核心概念包括Channel,Selector,SelectionKey,Buffer髓抑。
Channel是I/O通道,可以向其注冊(cè)Selector,應(yīng)用成功可以通過select操作獲取當(dāng)前通道已經(jīng)準(zhǔn)備好的可以無阻塞執(zhí)行的操作.這由SelectionKey表示。
SelectionKey的常量字段SelectionKey.OP_***分別對(duì)應(yīng)Channel的幾種操作例如connect(),accept(),read(),write()仗处。
select操作后得到SelectionKey.OP_WRITE或者READ即可在Channel上面無阻塞調(diào)用read和write方法,Channel的讀寫操作均需要通過Buffer進(jìn)行.即讀是講數(shù)據(jù)從通道中讀入Buffer然后做進(jìn)一步處理.寫需要先將數(shù)據(jù)寫入Buffer然后通道接收Buffer眯勾。
下面是一個(gè)使用NIO的基本C/S示例.該示例只為顯示如何使用基本的API而存在,其代碼的健壯性,合理性都不具參考價(jià)值。
這個(gè)示例,實(shí)現(xiàn)一個(gè)簡單的C/S,客戶端向服務(wù)器端發(fā)送消息,服務(wù)器將收到的消息打印到控制臺(tái)婆誓,并將該消息返回給客戶端吃环,客戶端再打印到控制臺(tái)。現(xiàn)實(shí)的應(yīng)用中需要定義發(fā)送數(shù)據(jù)使用的協(xié)議,以幫助服務(wù)器解析消息.本示例只是無差別的使用默認(rèn)編碼將收到的字節(jié)轉(zhuǎn)換字符并打印洋幻。ByteBuffer的容量越小,對(duì)一條消息的處理次數(shù)就越多,容量大就可以在更少的循環(huán)次數(shù)內(nèi)讀完整個(gè)消息.所以真是的應(yīng)用場景,要考慮適當(dāng)?shù)木彺娲笮∫蕴岣咝省?/p>
服務(wù)器端代碼:
package nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
public class Server {
private Selector selector;
private ByteBuffer readBuffer = ByteBuffer.allocate(1024);//調(diào)整緩存的大小可以看到打印輸出的變化
private ByteBuffer sendBuffer = ByteBuffer.allocate(1024);//調(diào)整緩存的大小可以看到打印輸出的變化
String str;
public void start() throws IOException {
// 打開服務(wù)器套接字通道
ServerSocketChannel ssc = ServerSocketChannel.open();
// 服務(wù)器配置為非阻塞
ssc.configureBlocking(false);
// 進(jìn)行服務(wù)的綁定
ssc.bind(new InetSocketAddress("localhost", 8001));
// 通過open()方法找到Selector
selector = Selector.open();
// 注冊(cè)到selector郁轻,等待連接
ssc.register(selector, SelectionKey.OP_ACCEPT);
while (!Thread.currentThread().isInterrupted()) {
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = keys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (!key.isValid()) {
continue;
}
if (key.isAcceptable()) {
accept(key);
} else if (key.isReadable()) {
read(key);
} else if (key.isWritable()) {
write(key);
}
keyIterator.remove(); //該事件已經(jīng)處理,可以丟棄
}
}
}
private void write(SelectionKey key) throws IOException, ClosedChannelException {
SocketChannel channel = (SocketChannel) key.channel();
System.out.println("write:"+str);
sendBuffer.clear();
sendBuffer.put(str.getBytes());
sendBuffer.flip();
channel.write(sendBuffer);
channel.register(selector, SelectionKey.OP_READ);
}
private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
// Clear out our read buffer so it's ready for new data
this.readBuffer.clear();
// readBuffer.flip();
// Attempt to read off the channel
int numRead;
try {
numRead = socketChannel.read(this.readBuffer);
} catch (IOException e) {
// The remote forcibly closed the connection, cancel
// the selection key and close the channel.
key.cancel();
socketChannel.close();
return;
}
str = new String(readBuffer.array(), 0, numRead);
System.out.println(str);
socketChannel.register(selector, SelectionKey.OP_WRITE);
}
private void accept(SelectionKey key) throws IOException {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel clientChannel = ssc.accept();
clientChannel.configureBlocking(false);
clientChannel.register(selector, SelectionKey.OP_READ);
System.out.println("a new client connected "+clientChannel.getRemoteAddress());
}
public static void main(String[] args) throws IOException {
System.out.println("server started...");
new Server().start();
}
}
客戶端代碼:
package nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
public class Client {
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
public void start() throws IOException {
// 打開socket通道
SocketChannel sc = SocketChannel.open();
//設(shè)置為非阻塞
sc.configureBlocking(false);
//連接服務(wù)器地址和端口
sc.connect(new InetSocketAddress("localhost", 8001));
//打開選擇器
Selector selector = Selector.open();
//注冊(cè)連接服務(wù)器socket的動(dòng)作
sc.register(selector, SelectionKey.OP_CONNECT);
Scanner scanner = new Scanner(System.in);
while (true) {
//選擇一組鍵文留,其相應(yīng)的通道已為 I/O 操作準(zhǔn)備就緒范咨。
//此方法執(zhí)行處于阻塞模式的選擇操作故觅。
selector.select();
//返回此選擇器的已選擇鍵集。
Set<SelectionKey> keys = selector.selectedKeys();
System.out.println("keys=" + keys.size());
Iterator<SelectionKey> keyIterator = keys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
keyIterator.remove();
// 判斷此通道上是否正在進(jìn)行連接操作渠啊。
if (key.isConnectable()) {
sc.finishConnect();
sc.register(selector, SelectionKey.OP_WRITE);
System.out.println("server connected...");
break;
} else if (key.isWritable()) { //寫數(shù)據(jù)
System.out.print("please input message:");
String message = scanner.nextLine();
//ByteBuffer writeBuffer = ByteBuffer.wrap(message.getBytes());
writeBuffer.clear();
writeBuffer.put(message.getBytes());
//將緩沖區(qū)各標(biāo)志復(fù)位,因?yàn)橄蚶锩鎝ut了數(shù)據(jù)標(biāo)志被改變要想從中讀取數(shù)據(jù)發(fā)向服務(wù)器,就要復(fù)位
writeBuffer.flip();
sc.write(writeBuffer);
//注冊(cè)寫操作,每個(gè)chanel只能注冊(cè)一個(gè)操作输吏,最后注冊(cè)的一個(gè)生效
//如果你對(duì)不止一種事件感興趣,那么可以用“位或”操作符將常量連接起來
//int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
//使用interest集合
sc.register(selector, SelectionKey.OP_READ);
sc.register(selector, SelectionKey.OP_WRITE);
sc.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()){//讀取數(shù)據(jù)
System.out.print("receive message:");
SocketChannel client = (SocketChannel) key.channel();
//將緩沖區(qū)清空以備下次讀取
readBuffer.clear();
int num = client.read(readBuffer);
System.out.println(new String(readBuffer.array(),0, num));
//注冊(cè)讀操作替蛉,下一次讀取
sc.register(selector, SelectionKey.OP_WRITE);
}
}
}
}
public static void main(String[] args) throws IOException {
new Client().start();
}
}