背景
java.nio全稱java non-blocking IO先匪,是指jdk1.4 及以上版本里提供的新api(New IO) 敷钾,為所有的原始類型(boolean類型除外)提供緩存支持的數(shù)據(jù)容器,使用它可以提供非阻塞式的高伸縮性網絡。
概述
底層原理:
1. 由一個專門的線程(selector線程)來處理所有的 IO 事件,并負責分發(fā)司蔬。?
2. 事件驅動機制:事件到的時候觸發(fā),而不是同步的去監(jiān)視事件姨蝴。?
3. 線程通訊:線程之間通過 wait,notify 等方式通訊俊啼。保證每次上下文切換都是有意義的。減少無謂的線程切換左医。?
IO模型:
多路復用IO模型
多路復用IO是在同步非阻塞IO基礎上優(yōu)化而來授帕,在同步非阻塞的情況下,用戶線程需要輪訓的請求浮梢,來獲取返回的數(shù)據(jù)跛十,而多路復用模式這個輪訓操作交給了selector(選擇器),selector會一直阻塞秕硝,直到有scoket返回芥映,而且一個selector可以監(jiān)聽多個IO事件。
重要成員:
1.Channel(通道)
NIO中的Channel的主要實現(xiàn)有:
FileChannel
DatagramChannel
SocketChannel
ServerSocketChannel
2.Buffer(緩沖區(qū))
緩沖區(qū)缝裤,實際上是一個容器屏轰,一個連續(xù)數(shù)組颊郎。Channel提供從文件憋飞、網絡讀取數(shù)據(jù)的渠道,但是讀寫的數(shù)據(jù)都必須經過Buffer姆吭。NIO中的關鍵Buffer實現(xiàn)有:ByteBuffer, CharBuffer, DoubleBuffer, FloatBuffer, IntBuffer, LongBuffer, ShortBuffer榛做,分別對應基本數(shù)據(jù)類型: byte, char, double, float, int, long, short。當然NIO中還有MappedByteBuffer, HeapByteBuffer, DirectByteBuffer等内狸。這里介紹一下MappedByteBuffer检眯,MappedByteBuffer是NIO引入的文件內存映射方案,讀寫性能極高昆淡。NIO最主要的就是實現(xiàn)了對異步操作的支持锰瘸。其中一種通過把一個套接字通道(SocketChannel)注冊到一個選擇器(Selector)中,不時調用后者的選擇(select)方法就能返回滿足的選擇鍵(SelectionKey),鍵中包含了SOCKET事件信息。這就是select模型昂灵。
SocketChannel的讀寫是通過一個類叫ByteBuffer來操作的.這個類本身的設計是不錯的,比直接操作byte[]方便多了. ByteBuffer有兩種模式:直接/間接.間接模式最典型(也只有這么一種)的就是HeapByteBuffer,即操作堆內存 (byte[]).但是內存畢竟有限,如果我要發(fā)送一個1G的文件怎么辦?不可能真的去分配1G的內存.這時就必須使用”直接”模式,即 MappedByteBuffer,文件映射.
下面我們看看NIO讀取數(shù)據(jù)的模式:寫數(shù)據(jù)和讀數(shù)據(jù)都會先寫入buffer然后再channel再從buffer讀避凝,然后channel再寫入buffer,server從buffer中取
方法調用:
向Buffer中寫數(shù)據(jù):
從Channel寫到Buffer (fileChannel.read(buf))
通過Buffer的put()方法 (buf.put(…))
從Buffer中讀取數(shù)據(jù):
從Buffer讀取到Channel (channel.write(buf))
使用get()方法從Buffer中讀取數(shù)據(jù) (buf.get())
Buffer數(shù)組結構:
capacity數(shù)組長度眨补,position下一個需要處理的數(shù)據(jù)下標管削,limit數(shù)組中不可操作的下一個數(shù)據(jù)的位置,mark用于記錄當前position的位置
3.Selector
Selector運行單線程處理多個Channel撑螺,如果你的應用打開了多個通道含思,但每個連接的流量都很低,使用Selector就會很方便
方法調用:
Selector的創(chuàng)建:
Selector selector = Selector.open();
selector?.socket().bind(new InetSocketAddress(PORT));
?selector?.configureBlocking(false);
?selector?.register(selector, SelectionKey.OP_ACCEPT);
注意register()方法的第二個參數(shù)。這是一個“interest集合”含潘,意思是在通過Selector監(jiān)聽Channel時對什么事件感興趣饲做。可以監(jiān)聽四種不同類型的事件遏弱,這四種事件用SelectionKey的四個常量來表示:
SelectionKey.OP_CONNECT
SelectionKey.OP_ACCEPT
SelectionKey.OP_READ
SelectionKey.OP_WRITE
可以用像檢測interest集合那樣的方法艇炎,來檢測channel中什么事件或操作已經就緒。但是腾窝,也可以使用以下四個方法缀踪,它們都會返回一個布爾類型:
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
從SelectionKey訪問Channel和Selector很簡單。如下:
Channel ?channel ?= selectionKey.channel();
Selector selector = selectionKey.selector();
下面是select()方法:
int select()
int select(long timeout)
int selectNow()
select()阻塞到至少有一個通道在你注冊的事件上就緒了虹脯。
select(long timeout)和select()一樣驴娃,除了最長會阻塞timeout毫秒(參數(shù))。
selectNow()不會阻塞循集,不管什么通道就緒都立刻返回(譯者注:此方法執(zhí)行非阻塞的選擇操作唇敞。如果自從前一次選擇操作后,沒有通道變成可選擇的咒彤,則此方法直接返回零疆柔。)。
select()方法返回的int值表示有多少通道已經就緒镶柱。亦即旷档,自上次調用select()方法后有多少通道變成就緒狀態(tài)。如果調用select()方法歇拆,因為有一個通道變成就緒狀態(tài)鞋屈,返回了1,若再次調用select()方法故觅,如果另一個通道就緒了厂庇,它會再次返回1。如果對第一個就緒的channel沒有做任何操作输吏,現(xiàn)在就有兩個就緒的通道权旷,但在每次select()方法調用之間,只有一個通道就緒了贯溅。
一旦調用了select()方法拄氯,并且返回值表明有一個或更多個通道就緒了,然后可以通過調用selector的selectedKeys()方法盗迟,訪問“已選擇鍵集(selected key set)”中的就緒通道坤邪。如下所示:
Set selectedKeys = selector.selectedKeys();
當像Selector注冊Channel時,Channel.register()方法會返回一個SelectionKey 對象罚缕。這個對象代表了注冊到該Selector的通道艇纺。可以通過SelectionKey的selectedKeySet()方法訪問這些對象。
注意每次迭代末尾的keyIterator.remove()調用黔衡。Selector不會自己從已選擇鍵集中移除SelectionKey實例蚓聘。必須在處理完通道時自己移除。下次該通道變成就緒時盟劫,Selector會再次將其放入已選擇鍵集中夜牡。
SelectionKey.channel()方法返回的通道需要轉型成你要處理的類型,如ServerSocketChannel或SocketChannel等侣签。
最后上代碼:
import java.io.IOException;
import java.net.InetSocketAddress;?
import java.nio.ByteBuffer;?
import java.nio.channels.SelectionKey;?
import java.nio.channels.Selector;?
import java.nio.channels.ServerSocketChannel;?
import java.nio.channels.SocketChannel;?
import java.util.Iterator;?
/**
* NIO服務端
*/?
public class NIOServer {?
? ? //通道管理器?
? ? private Selector selector;?
? ? /**
? ? * 獲得一個ServerSocket通道塘装,并對該通道做一些初始化的工作
? ? * @param port? 綁定的端口號
? ? * @throws IOException
? ? */?
? ? public void initServer(int port) throws IOException {?
? ? ? ? // 獲得一個ServerSocket通道?
? ? ? ? ServerSocketChannel serverChannel = ServerSocketChannel.open();?
? ? ? ? // 設置通道為非阻塞?
? ? ? ? serverChannel.configureBlocking(false);?
? ? ? ? // 將該通道對應的ServerSocket綁定到port端口?
? ? ? ? serverChannel.socket().bind(new InetSocketAddress(port));?
? ? ? ? // 獲得一個通道管理器?
? ? ? ? this.selector = Selector.open();?
? ? ? ? //將通道管理器和該通道綁定,并為該通道注冊SelectionKey.OP_ACCEPT事件,注冊該事件后影所,?
? ? ? ? //當該事件到達時蹦肴,selector.select()會返回,如果該事件沒到達selector.select()會一直阻塞猴娩。?
? ? ? ? serverChannel.register(selector, SelectionKey.OP_ACCEPT);?
? ? }?
? ? /**
? ? * 采用輪詢的方式監(jiān)聽selector上是否有需要處理的事件阴幌,如果有,則進行處理
? ? * @throws IOException
? ? */?
? ? @SuppressWarnings("unchecked")?
? ? public void listen() throws IOException {?
? ? ? ? System.out.println("服務端啟動成功卷中!");?
? ? ? ? // 輪詢訪問selector?
? ? ? ? while (true) {?
? ? ? ? ? ? //當注冊的事件到達時矛双,方法返回;否則,該方法會一直阻塞?
? ? ? ? ? ? selector.select();?
? ? ? ? ? ? // 獲得selector中選中的項的迭代器蟆豫,選中的項為注冊的事件?
? ? ? ? ? ? Iterator ite = this.selector.selectedKeys().iterator();?
? ? ? ? ? ? while (ite.hasNext()) {?
? ? ? ? ? ? ? ? SelectionKey key = (SelectionKey) ite.next();?
? ? ? ? ? ? ? ? // 刪除已選的key,以防重復處理?
? ? ? ? ? ? ? ? ite.remove();?
? ? ? ? ? ? ? ? // 客戶端請求連接事件?
? ? ? ? ? ? ? ? if (key.isAcceptable()) {?
? ? ? ? ? ? ? ? ? ? ServerSocketChannel server = (ServerSocketChannel) key?
? ? ? ? ? ? ? ? ? ? ? ? ? ? .channel();?
? ? ? ? ? ? ? ? ? ? // 獲得和客戶端連接的通道?
? ? ? ? ? ? ? ? ? ? SocketChannel channel = server.accept();?
? ? ? ? ? ? ? ? ? ? // 設置成非阻塞?
? ? ? ? ? ? ? ? ? ? channel.configureBlocking(false);?
? ? ? ? ? ? ? ? ? ? //在這里可以給客戶端發(fā)送信息哦
? ? ? ? ? ? ? ? ? ? channel.write(ByteBuffer.wrap(new String("向客戶端發(fā)送了一條信息").getBytes()));?
? ? ? ? ? ? ? ? ? ? //在和客戶端連接成功之后议忽,為了可以接收到客戶端的信息,需要給通道設置讀的權限无埃。?
? ? ? ? ? ? ? ? ? ? channel.register(this.selector, SelectionKey.OP_READ);?
? ? ? ? ? ? ? ? ? ? // 獲得了可讀的事件?
? ? ? ? ? ? ? ? } else if (key.isReadable()) {?
? ? ? ? ? ? ? ? ? ? ? ? read(key);?
? ? ? ? ? ? ? ? }?
? ? ? ? ? ? }?
? ? ? ? }?
? ? }?
? ? /**
? ? * 處理讀取客戶端發(fā)來的信息 的事件
? ? * @param key
? ? * @throws IOException?
? ? */?
? ? public void read(SelectionKey key) throws IOException{?
? ? ? ? // 服務器可讀取消息:得到事件發(fā)生的Socket通道?
? ? ? ? SocketChannel channel = (SocketChannel) key.channel();?
? ? ? ? // 創(chuàng)建讀取的緩沖區(qū)?
? ? ? ? ByteBuffer buffer = ByteBuffer.allocate(10);?
? ? ? ? channel.read(buffer);?
? ? ? ? byte[] data = buffer.array();?
? ? ? ? String msg = new String(data).trim();?
? ? ? ? System.out.println("服務端收到信息:"+msg);?
? ? ? ? ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());?
? ? ? ? channel.write(outBuffer);// 將消息回送給客戶端?
? ? }?
? ? /**
? ? * 啟動服務端測試
? ? * @throws IOException?
? ? */?
? ? public static void main(String[] args) throws IOException {?
? ? ? ? NIOServer server = new NIOServer();?
? ? ? ? server.initServer(8000);?
? ? ? ? server.listen();?
? ? }?
}?
/**
* NIO客戶端
*/?
public class NIOClient {?
? ? //通道管理器?
? ? private Selector selector;?
? ? /**
? ? * 獲得一個Socket通道徙瓶,并對該通道做一些初始化的工作
? ? * @param ip 連接的服務器的ip
? ? * @param port? 連接的服務器的端口號? ? ? ? ?
? ? * @throws IOException
? ? */?
? ? public void initClient(String ip,int port) throws IOException {?
? ? ? ? // 獲得一個Socket通道?
? ? ? ? SocketChannel channel = SocketChannel.open();?
? ? ? ? // 設置通道為非阻塞?
? ? ? ? channel.configureBlocking(false);?
? ? ? ? // 獲得一個通道管理器?
? ? ? ? this.selector = Selector.open();?
? ? ? ? // 客戶端連接服務器,其實方法執(zhí)行并沒有實現(xiàn)連接毛雇,需要在listen()方法中調?
? ? ? ? //用channel.finishConnect();才能完成連接?
? ? ? ? channel.connect(new InetSocketAddress(ip,port));?
? ? ? ? //將通道管理器和該通道綁定嫉称,并為該通道注冊SelectionKey.OP_CONNECT事件。?
? ? ? ? channel.register(selector, SelectionKey.OP_CONNECT);?
? ? }?
? ? /**
? ? * 采用輪詢的方式監(jiān)聽selector上是否有需要處理的事件灵疮,如果有织阅,則進行處理
? ? * @throws IOException
? ? */?
? ? @SuppressWarnings("unchecked")?
? ? public void listen() throws IOException {?
? ? ? ? // 輪詢訪問selector?
? ? ? ? while (true) {?
? ? ? ? ? ? selector.select();?
? ? ? ? ? ? // 獲得selector中選中的項的迭代器?
? ? ? ? ? ? Iterator ite = this.selector.selectedKeys().iterator();?
? ? ? ? ? ? while (ite.hasNext()) {?
? ? ? ? ? ? ? ? SelectionKey key = (SelectionKey) ite.next();?
? ? ? ? ? ? ? ? // 刪除已選的key,以防重復處理?
? ? ? ? ? ? ? ? ite.remove();?
? ? ? ? ? ? ? ? // 連接事件發(fā)生?
? ? ? ? ? ? ? ? if (key.isConnectable()) {?
? ? ? ? ? ? ? ? ? ? SocketChannel channel = (SocketChannel) key?
? ? ? ? ? ? ? ? ? ? ? ? ? ? .channel();?
? ? ? ? ? ? ? ? ? ? // 如果正在連接,則完成連接?
? ? ? ? ? ? ? ? ? ? if(channel.isConnectionPending()){?
? ? ? ? ? ? ? ? ? ? ? ? channel.finishConnect();?
? ? ? ? ? ? ? ? ? ? }?
? ? ? ? ? ? ? ? ? ? // 設置成非阻塞?
? ? ? ? ? ? ? ? ? ? channel.configureBlocking(false);?
? ? ? ? ? ? ? ? ? ? //在這里可以給服務端發(fā)送信息哦?
? ? ? ? ? ? ? ? ? ? channel.write(ByteBuffer.wrap(new String("向服務端發(fā)送了一條信息").getBytes()));?
? ? ? ? ? ? ? ? ? ? //在和服務端連接成功之后震捣,為了可以接收到服務端的信息荔棉,需要給通道設置讀的權限。?
? ? ? ? ? ? ? ? ? ? channel.register(this.selector, SelectionKey.OP_READ);?
? ? ? ? ? ? ? ? ? ? // 獲得了可讀的事件?
? ? ? ? ? ? ? ? } else if (key.isReadable()) {?
? ? ? ? ? ? ? ? ? ? ? ? read(key);?
? ? ? ? ? ? ? ? }?
? ? ? ? ? ? }?
? ? ? ? }?
? ? }?
? ? /**
? ? * 處理讀取服務端發(fā)來的信息 的事件
? ? * @param key
? ? * @throws IOException?
? ? */?
? ? public void read(SelectionKey key) throws IOException{?
? ? ? ? //和服務端的read方法一樣?
? ? }?
? ? /**
? ? * 啟動客戶端測試
? ? * @throws IOException?
? ? */?
? ? public static void main(String[] args) throws IOException {?
? ? ? ? NIOClient client = new NIOClient();?
? ? ? ? client.initClient("localhost",8000);?
? ? ? ? client.listen();?
? ? }?