NIO簡(jiǎn)介
*NIO到底是什么的簡(jiǎn)稱?有人喜歡稱之為New IO,因?yàn)樗鄬?duì)于以前的IO是新增的沛善,所以官方稱之為New IO。但是例证,由于之前的IO類(lèi)庫(kù)是阻塞的路呜,New IO就是要讓Java能夠支持非阻塞IO,所以织咧,也有人喜歡稱之為Non-block IO胀葱。 *
1.緩沖區(qū)Buffer
Buffer 是一個(gè)對(duì)象, 它包含一些要寫(xiě)入或者剛讀出的數(shù)據(jù)笙蒙。 在 NIO 中加入 Buffer 對(duì)象抵屿,體現(xiàn)了新庫(kù)與原 I/O 的一個(gè)重要區(qū)別。在面向流的 I/O 中捅位,您將數(shù)據(jù)直接寫(xiě)入或者將數(shù)據(jù)直接讀到 Stream 對(duì)象中轧葛。
在 NIO 庫(kù)中搂抒,所有數(shù)據(jù)都是用緩沖區(qū)處理的。在讀取數(shù)據(jù)時(shí)尿扯,它是直接讀到緩沖區(qū)中的求晶。在寫(xiě)入數(shù)據(jù)時(shí),它是寫(xiě)入到緩沖區(qū)中的衷笋。任何時(shí)候訪問(wèn) NIO 中的數(shù)據(jù)芳杏,您都是將它放到緩沖區(qū)中。
緩沖區(qū)實(shí)質(zhì)上是一個(gè)數(shù)組辟宗。通常它是一個(gè)字節(jié)數(shù)組爵赵,但是也可以使用其他種類(lèi)的數(shù)組。但是一個(gè)緩沖區(qū)不 僅僅 是一個(gè)數(shù)組泊脐。緩沖區(qū)提供了對(duì)數(shù)據(jù)的結(jié)構(gòu)化訪問(wèn)空幻,而且還可以跟蹤系統(tǒng)的讀/寫(xiě)進(jìn)程。
最常用的緩沖區(qū)類(lèi)型是 ByteBuffer容客。一個(gè) ByteBuffer 可以在其底層字節(jié)數(shù)組上進(jìn)行 get/set 操作(即字節(jié)的獲取和設(shè)置)秕铛。
2.通道Channel
Channel是一個(gè)通道,可以通過(guò)它讀取與寫(xiě)入數(shù)據(jù)耘柱,它就像自來(lái)水管一樣如捅,網(wǎng)絡(luò)數(shù)據(jù)通過(guò)Channel讀取和寫(xiě)入棍现。通道與流的不同之處在于通道是雙向的调煎,流只是在一個(gè)方向上移動(dòng)(一個(gè)流必須是 InputStream 或者 OutputStream 的子類(lèi)),而且通道可以用于讀己肮、寫(xiě)或者同時(shí)用于讀寫(xiě)士袄。
因?yàn)镃hannel是全雙工的娄柳,所以它可以比流更好的映射底層操作系統(tǒng)的API艘绍。特別是在UNIX網(wǎng)絡(luò)編程模型中诱鞠,底層操作系統(tǒng)的通道都是全雙工的,同時(shí)支持讀寫(xiě)操作蕉朵。
3.多路復(fù)用器Selector
多路復(fù)用器提供選擇已經(jīng)就緒的任務(wù)的能力始衅。簡(jiǎn)單來(lái)講汛闸,Selector會(huì)不斷地輪詢注冊(cè)在其上的Channel,如果某個(gè)channel上有新的TCP連接接入尸闸、讀和寫(xiě)事件吮廉,這個(gè)Channel就處于就緒狀態(tài)畸肆,會(huì)被Selector輪詢出來(lái)轴脐,然后通過(guò)SelectionKey可以獲取就緒Channel的集合,進(jìn)行后續(xù)的I/O操作恬涧。
一個(gè)多路復(fù)用器Selector可以同時(shí)輪詢多個(gè)Channel碴巾,由于JDK使用epool()代替?zhèn)鹘y(tǒng)的select實(shí)現(xiàn)厦瓢,所以它并沒(méi)有最大連續(xù)句柄1024/2048的限制煮仇。這也就意味著只需要一個(gè)線程負(fù)責(zé)Selector的輪詢,就可以接入成千上萬(wàn)的客戶端刨仑。
NIO服務(wù)端序列圖
一般流程
打開(kāi)ServerSocketChannel,用于監(jiān)聽(tīng)客戶端的鏈接
ServerSocketChannel acceptor = ServerSocketChannel.open();
綁定監(jiān)聽(tīng)端口,設(shè)置連接為非阻塞模式
int port = 8080;
acceptor.socket().bind(new InetSocketAddress(InetAddress.getByName("IP"),port));
acceptor.configureBlocking(false);
創(chuàng)建Reactor線程,創(chuàng)建多路復(fù)用器并啟動(dòng)線程
Selector selector = Selector.open();
new Thread(new ReactorTask()).start();
將ServerSocketChannel注冊(cè)到Reactor線程的多路復(fù)用器Selcetor上
SelectionKey key = acceptor.register(selector,SelectionKey.OP_ACCEPT,ioHandler);
輪詢
int num = selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keys = selectedKeys.iterator();
while(keys.hasNext()){
SelectionKey key = keys.next();
//doWhat
}
新的客戶端接入
SocketChannel sc = serverChannel.accept();
設(shè)置位非阻塞模式
sc.configureBlocking(false);
sc.socket().setReuseAddress(true);
將新接入的客戶端連接注冊(cè)到Reactor上的多路復(fù)用器
SelectionKey key = sc.register(selector,SelectionKey.OP_READ);
異步讀取客戶端消息到緩沖區(qū)
int number = sc.read(receivedBuffer);
最后讀取bytebuffer
while(buffer.hasRemain){
writeBuffer();
}
TimeServer示例
MultiplexerTimeServer.class
/**
* used to test nio
* Created by spark on 10/14/16.
*/
public class MultiplexerTimeServer implements Runnable {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
private volatile boolean stop;
/**
* 初始化多路復(fù)用器,綁定監(jiān)聽(tīng)端口
* @param port
*/
public MultiplexerTimeServer(int port) {
try {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
//設(shè)為異步非阻塞
serverSocketChannel.configureBlocking(false);
//backlog設(shè)為1024
serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("The time server is start in port:" + port);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
public void stop() {
this.stop = true;
}
/**
* 根據(jù)key的操作位獲取網(wǎng)絡(luò)事件的類(lèi)型 TCP三次握手過(guò)程
* @param key
* @throws IOException
*/
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
if (key.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
}
if(key.isReadable()){
SocketChannel sc = (SocketChannel) key.channel();
//通過(guò)ByteBuffer讀取客戶端的請(qǐng)求信息 開(kāi)辟1K的緩沖區(qū)
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if(readBytes > 0){
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes,"UTF-8");
System.out.println("The time server received order : " + body );
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
doWrite(sc,currentTime);
}else if(readBytes < 0){
key.cancel();
sc.close();
}else{
}
}
}
}
/**
* 通過(guò)ByteBuffer將應(yīng)答消息異步發(fā)送給客戶端
* @param socketChannel
* @param response
* @throws IOException
*/
private void doWrite(SocketChannel socketChannel,String response) throws IOException {
if(response != null && response.trim().length() > 0){
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
socketChannel.write(writeBuffer);
}
}
@Override
public void run() {
//遍歷selector,間隔為1s
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
//有就緒狀態(tài)的Channel時(shí),selector返回就緒狀態(tài)的Channel的SelectionKey集合,通過(guò)對(duì)就緒狀態(tài)的Channel集合進(jìn)行迭代,進(jìn)行異步讀寫(xiě)操作
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (IOException e) {
if(key != null){
key.cancel();
if(key.channel() != null){
key.channel().close();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
TimeServer.class
public class TimeServer {
public static void main(String[] args) {
int port = 8080;
if(args != null && args.length > 0){
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start();
}
}
NIO客戶端序列圖
一般流程
打開(kāi)SocketChannel,綁定客戶端本地地址
SocketChannel clientChannel = SocketChannel.open();
設(shè)置SocketChannel為非阻塞模式
clientChannel.configureBlocking(false);
clientChannel.socket().setReuseAddress(true);
clientChannel.socket().setReceiveBufferSize(BUFFER_SIZE);
clientChannel.socket().setSendBufferSize(BUFFER_SIZE);
異步連接服務(wù)端
boolean connected = clientChannel.connect(new InetSocketAddress("ip",port));
判斷 注冊(cè)
if(connected){
clientChannel.register(selector,SelectionKey.OP_READ,ioHandler);
}else{
clientChannel.register(selector,SelectionKey.OP_CONNECT,ioHandler);
}
創(chuàng)建Reactor線程,創(chuàng)建多路復(fù)用器并啟動(dòng)線程
Selector selector = Selector.open();
new Thread(new ReactorTask()).start();
輪詢
int num = selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keys = selectedKeys.iterator();
while(keys.hasNext()){
SelectionKey key = keys.next();
}
接受connect事件進(jìn)行處理
if(key.isConnectable()){
//handleConnect
}
連接成功,注冊(cè)讀事件
if(clientChannel.finishConnect()) registerRead();
異步讀和消息讀取
int number = sc.read(receivedBuffer);
while(buffer.hasRemain){
}
TimeClient示例
TimeClientHandler.class
public class TimeClientHandler implements Runnable {
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop;
public TimeClientHandler(int port, String host) {
this.port = port;
this.host = host == null ? "127.0.0.1" : host;
try {
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
private void handleInput(SelectionKey key) throws IOException {
if(key.isValid()){
SocketChannel sc = (SocketChannel) key.channel();
if(key.isConnectable()){
if(sc.finishConnect()){
sc.register(selector,SelectionKey.OP_READ);
doWrite(sc);
}else{
System.exit(1);
}
}
if(key.isReadable()){
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if(readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("Now is " + body);
this.stop = true;
}else if(readBytes < 0){
key.cancel();
sc.close();
}else{
}
}
}
}
private void doWrite(SocketChannel sc) throws IOException {
byte[] req = "QUERY TIME ORDER".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
sc.write(writeBuffer);
if(!writeBuffer.hasRemaining()){
System.out.println("Send order 2 server succeed.");
}
}
private void doConnect() throws IOException {
if(socketChannel.connect(new InetSocketAddress(host,port))){
socketChannel.register(selector,SelectionKey.OP_READ);
doWrite(socketChannel);
}else{
socketChannel.register(selector,SelectionKey.OP_CONNECT);
}
}
@Override
public void run() {
try {
doConnect();
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (IOException e) {
if(key != null){
key.cancel();
if(key.channel() != null){
key.channel().close();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
if(selector != null){
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
TimeClient.class
public class TimeClient {
public static void main(String[] args) {
int port = 8080;
if(args != null && args.length > 0){
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
new Thread(new TimeClientHandler(port,"127.0.0.1"),"TimeClient-001").start();
}
}
AIO
NIO2.0引入了新的異步通道概念封拧,并提供了異步文件通道和異步套接字通道的實(shí)現(xiàn)泽西。
異步通道提供兩種方式獲取操作結(jié)果
- 通過(guò)java.util.concurrent.Feature類(lèi)來(lái)表示異步操作的結(jié)果缰趋;
- 在執(zhí)行異步操作的時(shí)候傳入一個(gè)java.nio.channels秘血。
CompletionHandler接口的實(shí)現(xiàn)類(lèi)作為操作完成的回調(diào)。
AsyncTimeServerHandler.class
public class AsyncTimeServerHandler implements Runnable {
private int port;
CountDownLatch latch;
AsynchronousServerSocketChannel asynchronousServerSocketChannel;
public AsyncTimeServerHandler(int port) {
this.port = port;
try {
asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open(); // 創(chuàng)建一個(gè)異步服務(wù)端通道仔涩。
asynchronousServerSocketChannel.bind(new InetSocketAddress(port));// bind 一個(gè)監(jiān)聽(tīng)端口
System.out.println("The time server is start in port : " + port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
latch = new CountDownLatch(1); // 在完成一組正在執(zhí)行的操作之前熔脂,允許當(dāng)前的線程一直阻塞霞揉。
doAccept();
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void doAccept() {
asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler());// 處理接受消息的通知晰骑。
}
}
AcceptCompletionHandler.class
public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,AsyncTimeServerHandler> {
@Override
public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) {
attachment.asynchronousServerSocketChannel.accept(attachment, this);
ByteBuffer buffer = ByteBuffer.allocate(1024);
result.read(buffer, buffer, new ReadCompletionHandler(result));
}
@Override
public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
exc.printStackTrace();
attachment.latch.countDown();
}
}
ReadCompletionHandler.class
public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel channel;
public ReadCompletionHandler(AsynchronousSocketChannel channel) {
if (this.channel == null)
this.channel = channel;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
byte[] body = new byte[attachment.remaining()];
attachment.get(body);
try {
String req = new String(body, "UTF-8");
System.out.println("The time server receive order : " + req);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(req) ? new java.util.Date(
System.currentTimeMillis()).toString() : "BAD ORDER";
doWrite(currentTime);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
this.channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private void doWrite(String currentTime) {
if (currentTime != null && currentTime.trim().length() > 0) {
byte[] bytes = (currentTime).getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer, writeBuffer,
new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
// 如果沒(méi)有發(fā)送完成隶症,繼續(xù)發(fā)送
if (buffer.hasRemaining())
channel.write(buffer, buffer, this);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
// ingnore on close
}
}
});
}
}
}
TimeServer.class
public class TimeServer {
public static void main(String[] args) throws IOException {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默認(rèn)值
}
}
AsyncTimeServerHandler timeServer = new AsyncTimeServerHandler(port);
new Thread(timeServer, "AIO-AsyncTimeServerHandler-001").start();
}
}
AsyncTimeClientHandler.class
public class AsyncTimeClientHandler implements CompletionHandler<Void, AsyncTimeClientHandler>, Runnable{
private AsynchronousSocketChannel client;
private String host;
private int port;
private CountDownLatch latch;
public AsyncTimeClientHandler(String host, int port) {
this.host = host;
this.port = port;
try {
client = AsynchronousSocketChannel.open();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void completed(Void result, AsyncTimeClientHandler attachment) {
byte[] req = "QUERY TIME ORDER".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
client.write(writeBuffer, writeBuffer,
new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
client.write(buffer, buffer, this);
} else {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
client.read(
readBuffer,
readBuffer,
new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String body;
try {
body = new String(bytes, "UTF-8");
System.out.println("Now is : " + body);
latch.countDown();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
// ingnore on close
}
}
});
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
// ingnore on close
}
}
});
}
@Override
public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
exc.printStackTrace();
try {
client.close();
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
latch = new CountDownLatch(1);
client.connect(new InetSocketAddress(host, port), this, this);
try {
latch.await();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
TimeClient.class
public class TimeClient {
public static void main(String[] args) {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默認(rèn)值
}
}
new Thread(new AsyncTimeClientHandler("127.0.0.1", port), "AIO-AsyncTimeClientHandler-001").start();
}
}