文章內(nèi)容來自李林峰的《Netty權威指南》
JDK的NIO類庫
-
緩沖區(qū)Buffer
- ByteBuffer:字節(jié)緩沖區(qū)
- CharBuffer:字符緩沖區(qū)
- ShortBuffer:短整型緩沖區(qū)
- IntBuffer:整型緩沖區(qū)
- LongBuffer:長整型緩沖區(qū)
- FloatBuffer:浮點型緩沖區(qū)
- DoubleBuffer:雙精度浮點型緩沖區(qū)
通道Channel
網(wǎng)絡數(shù)據(jù)通過channel讀取和寫入笔诵,全雙工模式多路復用器Selector
提供選擇已就緒任務的能力,selector會不斷輪詢注冊在上面的channel,某個channel發(fā)生讀或?qū)懯录?就處于就緒狀態(tài),會被selector輪詢出來,然后通過SelectionKey獲取就緒Channel的集合喘先,進行后續(xù)IO操作
NIO服務端
-
序列圖
-
步驟
- 打開ServerSocketChannel,用于監(jiān)聽客戶端的連接,是所有客戶端連接的父管道
ServerSocketChannel serverChannel=ServerSocketChannel.open();
- 綁定監(jiān)聽端口,設置為非阻塞模式
serverChannel.configureBlocking(false); serverChannel.socket().bind(new InetSocketAddress(port),1024); (注:1024指serverChannel的backlog為1024)
- 創(chuàng)建Reactor線程,創(chuàng)建多路復用器Selector并啟動線程
selector=Selector.open(); new Thread(new ReactorTask()).start();
- 將ServerSocketChannel 注冊到Reactor線程的多路復用器Selector,監(jiān)聽accept事件
serverChannel.register(selector,SelectionKey.OP_ACCEPT);
- 多路復用器在線程run方法的無限循環(huán)體內(nèi)輪詢準備就緒的key
selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectedKeys.iterator(); SelectionKey key = null; while (iterator.hasNext()){ key = iterator.next(); iterator.remove(); try { handleInput(key); } catch (Exception e) { if(key!=null){ key.cancel(); if(key.channel()!=null){ key.channel().close(); } } } }
- 多路復用器監(jiān)聽到新客戶端的接入,處理新的接入請求,完成TCP三次握手,建立物理鏈路
SocketChannel sc=serverChannel.accept();
- 設置客戶端鏈路為非阻塞模式
sc.configureBlocking(false);
- 將新接入的客戶端連接注冊到Reactor線程的多路復用器,監(jiān)聽讀操作,讀取客戶端發(fā)送的網(wǎng)絡消息
sc.register(selector, SelectionKey.OP_READ);
- 異步讀取客戶端請求消息到緩沖區(qū)
ByteBuffer readBuffer=ByteBuffer.allocate(1024); int readBytes=sc.read(readBuffer);
- 對ByteBuffer進行編解碼,如果有半包消息指針reset,繼續(xù)讀取后續(xù)的報文,將解碼成功的消息封裝成task,放入線程池中,進行業(yè)務操作
List<Object> messageList=null; while(buffer.hasRemain()){ byteBuffer.mark(); Object message=decode(byteBuffer); if(message==null){ byteBuffer.reset(); break; } messageList.add(message); } if(!byteBuffer.hasRemain()){ byteBuffer.clear(); }else{ byteBuffer.compact(); } if(!messageList!=null & !messageList.isEmpty()){ for(Object messageE : messageList){ handlerTask(messageE); } }
- 將POJO對象encode成ByteBuffer,調(diào)用SocketChannel的異步write接口,將消息異步發(fā)送給客戶端
channel.write(buffer);
NIO客戶端
-
序列圖
-
步驟
- 打開SocketChannel,綁定客戶端本地地址
SocketChannel socketChannel = SocketChannel.open();
- 設置SocketChannel 為非阻塞模式,同時設置客戶端連接TCP參數(shù)
socketChannel.configureBlocking(false); socket.setSendBufferSize(BUFFER_SIZE); ......
- 異步連接服務端
socketChannel.connect(new InetSocketAddress(host, port);
- 判斷是否連接成功,如果連接成功,注冊SelectionKey.OP_READ到多路復用器良风,如果沒有說明服務端沒返回TCP握手應答谊迄,鏈路沒有建立,需要將socketChannel注冊到selector,注冊SelectionKey.OP_CONNECT
if (socketChannel.connect(new InetSocketAddress(host, port))) { socketChannel.register(selector, SelectionKey.OP_READ); } else { socketChannel.register(selector, SelectionKey.OP_CONNECT); }
- 向Reactor線程的多路復用器Selector注冊OP_CONNECT狀態(tài)位,監(jiān)聽服務端ACK應答
socketChannel.register(selector, SelectionKey.OP_CONNECT);
- 創(chuàng)建selector烟央,創(chuàng)建多路復用器并啟動線程
selector = Selector.open();
new Thread(new ReactorTask()).start();
- selector在無限循環(huán)體輪詢就緒的key
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectedKeys.iterator();
SelectionKey key = null;
while (iterator.hasNext()){
key = iterator.next();
iterator.remove();
try {
handleInput(key);
} catch (Exception e) {
if(key!=null){
key.cancel();
if(key.channel()!=null){
key.channel().close();
}
}
}
}
- 接受connect事件并處理
if (key.isConnectable()) {
handlerConnect();
}
- 判斷連接是否完成统诺,完成則注冊讀操作到selector
if (key.isReadable()) {
registerRead();
}
- 向多路復用器selector注冊OP_READ事件
socketChannel.register(selector, SelectionKey.OP_READ);
- 異步讀請求消息到ByteBuffer
int readBytes = sc.read(byteBuffer);
- 對ByteBuffer進行編解碼,如果有半包消息指針reset,繼續(xù)讀取后續(xù)的報文,將解碼成功的消息封裝成task,放入線程池中,進行業(yè)務操作
List<Object> messageList=null;
while(buffer.hasRemain()){
byteBuffer.mark();
Object message=decode(byteBuffer);
if(message==null){
byteBuffer.reset();
break;
}
messageList.add(message);
}
if(!byteBuffer.hasRemain()){
byteBuffer.clear();
}else{
byteBuffer.compact();
}
if(!messageList!=null & !messageList.isEmpty()){
for(Object messageE : messageList){
handlerTask(messageE);
}
}
- 將POJO對象encode成ByteBuffer,調(diào)用SocketChannel的異步write接口,將消息異步發(fā)送給客戶端
channel.write(buffer);