Java NIO 三件套
在 NIO 中有幾個(gè)核心對(duì)象需要掌握:緩沖區(qū)(Buffer),選擇器(Selector),通道(Channel)
緩沖區(qū) Buffer
Buffer 操作基本 API
緩沖區(qū)實(shí)際上是一個(gè)容器對(duì)象,更直接的說,其實(shí)就是一個(gè)數(shù)組,在 NIO 庫(kù)中,所有數(shù)據(jù)都是用緩沖區(qū)處理的.在讀取數(shù)據(jù)時(shí),它是直接讀到緩沖區(qū)中的篙议;在寫入數(shù)據(jù)時(shí),它也是寫入到緩沖區(qū)中的;任何時(shí)候訪問 NIO 中的數(shù)據(jù),都是將它放到緩沖區(qū)中.而在面向流 I/O 系統(tǒng)中,所有數(shù)據(jù)都是直接寫入或者直接將數(shù)據(jù)讀取到 Stream 對(duì)象中.在 NIO 中,所有的緩沖區(qū)類型都繼承于抽象類 Buffer,最常用的就是 ByteBuffer,對(duì)于 Java 中的基本類型,基本都有一個(gè)具體 Buffer 類型與之相對(duì)應(yīng),它們之間的繼承關(guān)系如下圖所示:
下面是一個(gè)簡(jiǎn)單的使用 IntBuffer 的例子:
public class IntBufferDemo {
public static void main(String[] args){
// 分配新的 int 緩沖區(qū),參數(shù)為緩沖區(qū)容量
// 新緩沖區(qū)的當(dāng)前位置將為零,其界限(限制位置)將為其容量。它將具有一個(gè)底層實(shí)現(xiàn)數(shù)組粱栖,其數(shù)組偏移量將為零
IntBuffer buffer = IntBuffer.allocate(8);
for (int i = 0; i < buffer.capacity(); ++i){
int j = 2 * (i + 1);
// 將給定整數(shù)寫入此緩沖區(qū)的當(dāng)前位置,當(dāng)前位置遞增
buffer.put(j);
}
// 重設(shè)此緩沖區(qū),將限制設(shè)置為當(dāng)前位置赖草,然后將當(dāng)前位置設(shè)置為 0
buffer.flip();
// 查看在當(dāng)前位置和限制位置之間是否有元素
while(buffer.hasRemaining()){
// 讀取此緩沖區(qū)當(dāng)前位置的整數(shù),然后當(dāng)前位置遞增
int j = buffer.get();
System.out.print(j + " ");
}
}
運(yùn)行后可以看到:
Buffer 的基本的原理
在談到緩沖區(qū)時(shí),我們說緩沖區(qū)對(duì)象本質(zhì)上是一個(gè)數(shù)組,但它其實(shí)是一個(gè)特殊的數(shù)組,緩沖區(qū)對(duì)象內(nèi)置了一些機(jī)制,能夠跟蹤和記錄緩沖區(qū)的狀態(tài)變化情況,如果我們使用 get()方法從緩沖區(qū)獲取數(shù)據(jù)或者使用 put()方法把數(shù)據(jù)寫入緩沖區(qū),都會(huì)引起緩沖區(qū)狀態(tài)的變化
在緩沖區(qū)中,最重要的屬性有下面三個(gè),它們一起合作完成對(duì)緩沖區(qū)內(nèi)部狀態(tài)的變化跟蹤:
position:指定下一個(gè)將要被寫入或者讀取的元素索引,它的值由 get()/put()方法自動(dòng)更新,在新創(chuàng)建一個(gè) Buffer 對(duì)象時(shí),position 被初始化為 0
limit:指定還有多少數(shù)據(jù)需要取出(在從緩沖區(qū)寫入通道時(shí)),或者還有多少空間可以放入數(shù)據(jù)(在從通道讀入緩沖區(qū)時(shí))
capacity:指定了可以存儲(chǔ)在緩沖區(qū)中的最大數(shù)據(jù)容量,實(shí)際上,它指定了底層數(shù)組的大小,或者至少是指定了準(zhǔn)許我們使用的底層數(shù)組的容量
以上三個(gè)屬性值之間有一些相對(duì)大小的關(guān)系:0<=position<=limit<=capacity.如果我們創(chuàng)建一個(gè)新的容量大小為 10 的 ByteBuffer 對(duì)象,在初始化的時(shí)候,position 設(shè)置為 0,limit 和 capacity 被設(shè)置為 10,在以后使用 ByteBuffer 對(duì)象過程中,capacity 的值不會(huì)再發(fā)生變化,而其它兩個(gè)個(gè)將會(huì)隨著使用而變化
下面我們用代碼來演示一遍,準(zhǔn)備一個(gè) txt 文檔,存放的 E 盤,輸入以下內(nèi)容:
Tom
下面我們用一段代碼來驗(yàn)證 position,limit 和 capacity 這幾個(gè)值的變化過程,代碼如下:
public class BufferDemo {
public static void main(String args[])throws Exception {
//這用用的是文件 IO 處理
FileInputStream fin = new FileInputStream("E://test.txt");
//創(chuàng)建文件的操作管道
FileChannel fc = fin.getChannel();
//分配一個(gè) 10 個(gè)大小緩沖區(qū)剪个,說白了就是分配一個(gè) 10 個(gè)大小的 byte 數(shù)組
ByteBuffer buffer = ByteBuffer.allocate(10);
output("初始化", buffer);
//先讀一下
fc.read(buffer);
output("調(diào)用 read()", buffer);
//準(zhǔn)備操作之前秧骑,先鎖定操作范圍
buffer.flip();
output("調(diào)用 flip()", buffer);
//判斷有沒有可讀數(shù)據(jù)
while (buffer.remaining()> 0){
byte b = buffer.get();
// System.out.print(((char)b));
}
output("調(diào)用 get()", buffer);
//可以理解為解鎖
buffer.clear();
output("調(diào)用 clear()", buffer);
//最后把管道關(guān)閉
fin.close();
}
//把這個(gè)緩沖里面實(shí)時(shí)狀態(tài)給答應(yīng)出來
public static void output(String step, Buffer buffer){
System.out.println(step + " : ");
//容量,數(shù)組大小
System.out.print("capacity: " + buffer.capacity()+ ", ");
//當(dāng)前操作數(shù)據(jù)所在的位置扣囊,也可以叫做游標(biāo)
System.out.print("position: " + buffer.position()+ ", ");
//鎖定值乎折,flip,數(shù)據(jù)操作范圍索引只能在 position - limit 之間
System.out.println("limit: " + buffer.limit());
System.out.println();
}
}
完成的輸出結(jié)果為:
運(yùn)行結(jié)果我們已經(jīng)看到,下面呢對(duì)以上結(jié)果進(jìn)行圖解,四個(gè)屬性值分別如圖所示:
我們可以從通道中讀取一些數(shù)據(jù)到緩沖區(qū)中,注意從通道讀取數(shù)據(jù),相當(dāng)于往緩沖區(qū)中寫入數(shù)據(jù).如果讀取 4 個(gè)自己的數(shù)據(jù),則此時(shí) position 的值為 4,即下一個(gè)將要被寫入的字節(jié)索引為 4,而 limit 仍然是 10,如下圖所示:
下一步把讀取的數(shù)據(jù)寫入到輸出通道中,相當(dāng)于從緩沖區(qū)中讀取數(shù)據(jù),在此之前,必須調(diào)用 flip()方法,該方法將會(huì)完成兩件事情:
- 把 limit 設(shè)置為當(dāng)前的 position 值
- 把 position 設(shè)置為 0
由于 position 被設(shè)置為 0,所以可以保證在下一步輸出時(shí)讀取到的是緩沖區(qū)中的第一個(gè)字節(jié),而 limit 被設(shè)置為當(dāng)前的 position,可以保證讀取的數(shù)據(jù)正好是之前寫入到緩沖區(qū)中的數(shù)據(jù),如下圖所示:
現(xiàn)在調(diào)用 get()方法從緩沖區(qū)中讀取數(shù)據(jù)寫入到輸出通道,這會(huì)導(dǎo)致 position 的增加而 limit 保持不變,但 position 不會(huì)超過 limit 的值,所以在讀取我們之前寫入到緩沖區(qū)中的 4 個(gè)自己之后,position 和 limit 的值都為 4,如下圖所示:
在從緩沖區(qū)中讀取數(shù)據(jù)完畢后,limit 的值仍然保持在我們調(diào)用 flip()方法時(shí)的值,調(diào)用 clear()方法能夠把所有的狀態(tài)變化設(shè)置為初始化時(shí)的值,如下圖所示:
緩沖區(qū)的分配
在前面的幾個(gè)例子中,我們已經(jīng)看過了,在創(chuàng)建一個(gè)緩沖區(qū)對(duì)象時(shí),會(huì)調(diào)用靜態(tài)方法 allocate()來指定緩沖區(qū)的容量,其實(shí)調(diào)用 allocate()相當(dāng)于創(chuàng)建了一個(gè)指定大小的數(shù)組,并把它包裝為緩沖區(qū)對(duì)象.或者我們也可以直接將一個(gè)現(xiàn)有的數(shù)組,包裝為緩沖區(qū)對(duì)象,如下示例代碼所示:
/** 手動(dòng)分配緩沖區(qū) */
public class BufferWrap {
public void myMethod(){
// 分配指定大小的緩沖區(qū)
ByteBuffer buffer1 = ByteBuffer.allocate(10);
// 包裝一個(gè)現(xiàn)有的數(shù)組
byte array[] = new byte[10];
ByteBuffer buffer2 = ByteBuffer.wrap(array);
}
}
緩沖區(qū)分片
在 NIO 中,除了可以分配或者包裝一個(gè)緩沖區(qū)對(duì)象外,還可以根據(jù)現(xiàn)有的緩沖區(qū)對(duì)象來創(chuàng)建一個(gè)子緩沖區(qū),即在現(xiàn)有緩沖區(qū)上切出一片來作為一個(gè)新的緩沖區(qū),但現(xiàn)有的緩沖區(qū)與創(chuàng)建的子緩沖區(qū)在底層數(shù)組層面上是數(shù)據(jù)共享的,也就是說,子緩沖區(qū)相當(dāng)于是現(xiàn)有緩沖區(qū)的一個(gè)視圖窗口.調(diào)用 slice()方法可以創(chuàng)建一個(gè)子緩沖區(qū),讓我們通過例子來看一下:
/*** 緩沖區(qū)分片 */
public class BufferSlice {
static public void main(String args[])throws Exception {
ByteBuffer buffer = ByteBuffer.allocate(10);
// 緩沖區(qū)中的數(shù)據(jù) 0-9
for (int i = 0; i < buffer.capacity(); ++i){
buffer.put((byte)i);
}
// 創(chuàng)建子緩沖區(qū)
buffer.position(3);
buffer.limit(7);
ByteBuffer slice = buffer.slice();
// 改變子緩沖區(qū)的內(nèi)容
for (int i = 0; i < slice.capacity(); ++i){
byte b = slice.get(i);
b *= 10;
slice.put(i, b);
}
buffer.position(0);
buffer.limit(buffer.capacity());
while (buffer.remaining()> 0){
System.out.println(buffer.get());
}
}
}
在該示例中,分配了一個(gè)容量大小為10的緩沖區(qū),并在其中放入了數(shù)據(jù)0-9,而在該緩沖區(qū)基礎(chǔ)之上又創(chuàng)建了一個(gè)子緩沖區(qū),并改變子緩沖區(qū)中的內(nèi)容,從最后輸出的結(jié)果來看,只有子緩沖區(qū)“可見的”那部分?jǐn)?shù)據(jù)發(fā)生了變化,并且說明子緩沖區(qū)與原緩沖區(qū)是數(shù)據(jù)共享的,輸出結(jié)果如下所示:
只讀緩沖區(qū)
只讀緩沖區(qū)非常簡(jiǎn)單,可以讀取它們,但是不能向它們寫入數(shù)據(jù).可以通過調(diào)用緩沖區(qū)的 asReadOnlyBuffer()方法,將任何常規(guī)緩沖區(qū)轉(zhuǎn)換為只讀緩沖區(qū),這個(gè)方法返回一個(gè)與原緩沖區(qū)完全相同的緩沖區(qū),并與原緩沖區(qū)共享數(shù)據(jù),只不過它是只讀的.如果原緩沖區(qū)的內(nèi)容發(fā)生了變化,只讀緩沖區(qū)的內(nèi)容也隨之發(fā)生變化:
/** 只讀緩沖區(qū) */
public class ReadOnlyBuffer {
static public void main(String args[])throws Exception {
ByteBuffer buffer = ByteBuffer.allocate(10);
// 緩沖區(qū)中的數(shù)據(jù) 0-9
for (int i=0; i<buffer.capacity(); ++i){
buffer.put((byte)i);
}
// 創(chuàng)建只讀緩沖區(qū)
ByteBuffer readonly = buffer.asReadOnlyBuffer();
// 改變?cè)彌_區(qū)的內(nèi)容
for (int i=0; i<buffer.capacity(); ++i){
byte b = buffer.get(i);
b *= 10; buffer.put(i, b);
}
readonly.position(0);
readonly.limit(buffer.capacity());
// 只讀緩沖區(qū)的內(nèi)容也隨之改變
while (readonly.remaining()>0){
System.out.println(readonly.get());
}
}
}
如果嘗試修改只讀緩沖區(qū)的內(nèi)容,則會(huì)報(bào) ReadOnlyBufferException 異常.只讀緩沖區(qū)對(duì)于保護(hù)數(shù)據(jù)很有用.在將緩沖區(qū)傳遞給某個(gè)對(duì)象的方法時(shí),無法知道這個(gè)方法是否會(huì)修改緩沖區(qū)中的數(shù)據(jù).創(chuàng)建一個(gè)只讀的緩沖區(qū)可以保證該緩沖區(qū)不會(huì)被修改.只可以把常規(guī)緩沖區(qū)轉(zhuǎn)換為只讀緩沖區(qū),而不能將只讀的緩沖區(qū)轉(zhuǎn)換為可寫的緩沖區(qū)
直接緩沖區(qū)
直接緩沖區(qū)是為加快 I/O 速度,使用一種特殊方式為其分配內(nèi)存的緩沖區(qū),JDK 文檔中的描述為:給定一個(gè)直接字節(jié)緩沖區(qū),Java 虛擬機(jī)將盡最大努力直接對(duì)它執(zhí)行本機(jī) I/O 操作.也就是說,它會(huì)在每一次調(diào)用底層操作系統(tǒng)的本機(jī) I/O 操作之前(或之后),嘗試避免將緩沖區(qū)的內(nèi)容拷貝到一個(gè)中間緩沖區(qū)中或者從一個(gè)中間緩沖區(qū)中拷貝數(shù)據(jù).要分配直接緩沖區(qū),需要調(diào)用 allocateDirect()方法,而不是 allocate()方法,使用方式與普通緩沖區(qū)并無區(qū)別,如下面的拷貝文件示例:
/*** 直接緩沖區(qū) */
public class DirectBuffer {
static public void main(String args[])throws Exception {
//首先我們從磁盤上讀取剛才我們寫出的文件內(nèi)容
String infile = "E://test.txt";
FileInputStream fin = new FileInputStream(infile);
FileChannel fcin = fin.getChannel();
//把剛剛讀取的內(nèi)容寫入到一個(gè)新的文件中
String outfile = String.format("E://testcopy.txt");
FileOutputStream fout = new FileOutputStream(outfile);
FileChannel fcout = fout.getChannel();
// 使用 allocateDirect侵歇,而不是 allocate
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
while (true){
buffer.clear();
int r = fcin.read(buffer);
if (r==-1){
break;
}
buffer.flip();
fcout.write(buffer);
}
}
}
內(nèi)存映射
內(nèi)存映射是一種讀和寫文件數(shù)據(jù)的方法,它可以比常規(guī)的基于流或者基于通道的 I/O 快的多.內(nèi)存映射文件 I/O 是通過使文件中的數(shù)據(jù)出現(xiàn)為內(nèi)存數(shù)組的內(nèi)容來完成的,這其初聽起來似乎不過就是將整個(gè)文件讀到內(nèi)存中,但是事實(shí)上并不是這樣.一般來說,只有文件中實(shí)際讀取或者寫入的部分才會(huì)映射到內(nèi)存中.如下面的示例代碼:
/*** IO 映射緩沖區(qū) */
public class MappedBuffer {
static private final int start = 0;
static private final int size = 1024;
static public void main(String args[])throws Exception {
RandomAccessFile raf = new RandomAccessFile( "E://test.txt", "rw");
FileChannel fc = raf.getChannel();
//把緩沖區(qū)跟文件系統(tǒng)進(jìn)行一個(gè)映射關(guān)聯(lián)
//只要操作緩沖區(qū)里面的內(nèi)容骂澄,文件內(nèi)容也會(huì)跟著改變
MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE,start, size);
mbb.put(0, (byte)97);
mbb.put(1023, (byte)122);
raf.close();
}
}
選擇器 Selector
傳統(tǒng)的 Server/Client 模式會(huì)基于 TPR(ThreadperRequest),服務(wù)器會(huì)為每個(gè)客戶端請(qǐng)求建立一個(gè)線程,由該線程單獨(dú)負(fù)責(zé)處理一個(gè)客戶請(qǐng)求.這種模式帶來的一個(gè)問題就是線程數(shù)量的劇增,大量的線程會(huì)增大服務(wù)器的開銷.大多數(shù)的實(shí)現(xiàn)為了避免這個(gè)問題,都采用了線程池模型,并設(shè)置線程池線程的最大數(shù)量,這又帶來了新的問題,如果線程池中有 200 個(gè)線程,而有 200 個(gè)用戶都在進(jìn)行大文件下載,會(huì)導(dǎo)致第 201 個(gè)用戶的請(qǐng)求無法及時(shí)處理,即便第 201 個(gè)用戶只想請(qǐng)求一個(gè)幾 KB 大小的頁(yè)面.傳統(tǒng)的 Server/Client 模式如下圖所示:
NIO 中非阻塞 I/O 采用了基于 Reactor 模式的工作方式,I/O 調(diào)用不會(huì)被阻塞,相反是注冊(cè)感興趣的特定 I/O 事件,如可讀數(shù)據(jù)到達(dá),新的套接字連接等等,在發(fā)生特定事件時(shí),系統(tǒng)再通知我們.NIO 中實(shí)現(xiàn)非阻塞 I/O 的核心對(duì)象就是 Selector,Selector 就是注冊(cè)各種 I/O 事件地方,而且當(dāng)那些事件發(fā)生時(shí),就是這個(gè)對(duì)象告訴我們所發(fā)生的事件,如下圖所示:
從圖中可以看出,當(dāng)有讀或?qū)懙热魏巫?cè)的事件發(fā)生時(shí),可以從 Selector 中獲得相應(yīng)的 SelectionKey,同時(shí)從 SelectionKey 中可以找到發(fā)生的事件和該事件所發(fā)生的具體的 SelectableChannel,以獲得客戶端發(fā)送過來的數(shù)據(jù)
使用 NIO 中非阻塞 I/O 編寫服務(wù)器處理程序,大體上可以分為下面三個(gè)步驟:
- 向 Selector 對(duì)象注冊(cè)感興趣的事件
- 從 Selector 中獲取感興趣的事件
- 根據(jù)不同的事件進(jìn)行相應(yīng)的處理
接下來我們用一個(gè)簡(jiǎn)單的示例來說明整個(gè)過程.首先是向 Selector 對(duì)象注冊(cè)感興趣的事件:
/** 注冊(cè)事件 */
private Selector getSelector()throws IOException {
// 創(chuàng)建 Selector 對(duì)象
Selector sel = Selector.open();
// 創(chuàng)建可選擇通道,并配置為非阻塞模式
ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false);
// 綁定通道到指定端口
ServerSocket socket = server.socket();
InetSocketAddress address = new InetSocketAddress(port);
socket.bind(address);
// 向 Selector 中注冊(cè)感興趣的事件
server.register(sel, SelectionKey.OP_ACCEPT);
return sel;
}
創(chuàng)建了 ServerSocketChannel 對(duì)象,并調(diào)用 configureBlocking()方法,配置為非阻塞模式,接下來的三行代碼把該通道綁定到指定端口,最后向 Selector 中注冊(cè)事件,此處指定的是參數(shù)是 OP_ACCEPT,即指定我們想要監(jiān)聽 accept 事件,也就是新的連接發(fā)生時(shí)所產(chǎn)生的事件,對(duì)于 ServerSocketChannel 通道來說,我們唯一可以指定的參數(shù)就是 OP_ACCEPT
從 Selector 中獲取感興趣的事件,即開始監(jiān)聽,進(jìn)入內(nèi)部循環(huán):
/** 開始監(jiān)聽 */
public void listen(){
System.out.println("listen on " + port);
try {
while(true){
// 該調(diào)用會(huì)阻塞盒至,直到至少有一個(gè)事件發(fā)生
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()){
SelectionKey key = (SelectionKey)iter.next();
iter.remove();
process(key);
}
}
} catch (IOException e){
e.printStackTrace();
}
}
在非阻塞 I/O 中,內(nèi)部循環(huán)模式基本都是遵循這種方式.首先調(diào)用 select()方法,該方法會(huì)阻塞,直到至少有一個(gè)事件發(fā)生,然后再使用 selectedKeys()方法獲取發(fā)生事件的 SelectionKey,再使用迭代器進(jìn)行循環(huán)
最后一步就是根據(jù)不同的事件,編寫相應(yīng)的處理代碼:
/** 根據(jù)不同的事件做處理 */
private void process(SelectionKey key)throws IOException{
// 接收請(qǐng)求
if (key.isAcceptable()){
ServerSocketChannel server = (ServerSocketChannel)key.channel();
SocketChannel channel = server.accept();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
}
// 讀信息
else if (key.isReadable()){
SocketChannel channel = (SocketChannel)key.channel();
int len = channel.read(buffer);
if (len > 0){
buffer.flip();
content = new String(buffer.array(),0,len);
SelectionKey sKey = channel.register(selector, SelectionKey.OP_WRITE);
sKey.attach(content);
} else {
channel.close();
}
buffer.clear();
}
// 寫事件
else if (key.isWritable()){
SocketChannel channel = (SocketChannel)key.channel();
String content = (String)key.attachment();
ByteBuffer block = ByteBuffer.wrap(("輸出內(nèi)容:" + content).getBytes());
if(block != null){
channel.write(block);
}else{
channel.close();
}
}
}
此處分別判斷是接受請(qǐng)求,讀數(shù)據(jù)還是寫事件,分別作不同的處理.在 Java1.4 之前的 I/O 系統(tǒng)中,提供的都是面向流的 I/O 系統(tǒng),系統(tǒng)一次一個(gè)字節(jié)地處理數(shù)據(jù),一個(gè)輸入流產(chǎn)生一個(gè)字節(jié)的數(shù)據(jù),一個(gè)輸出流消費(fèi)一個(gè)字節(jié)的數(shù)據(jù),面向流的 I/O 速度非常慢,而在 Java1.4 中推出了 NIO,這是一個(gè)面向塊的 I/O 系統(tǒng),系統(tǒng)以塊的方式處理處理,每一個(gè)操作在一步中產(chǎn)生或者消費(fèi)一個(gè)數(shù)據(jù)庫(kù),按塊處理要比按字節(jié)處理數(shù)據(jù)快的多
通道 Channel
通道是一個(gè)對(duì)象,通過它可以讀取和寫入數(shù)據(jù),當(dāng)然了所有數(shù)據(jù)都通過 Buffer 對(duì)象來處理.我們永遠(yuǎn)不會(huì)將字節(jié)直接寫入通道中,相反是將數(shù)據(jù)寫入包含一個(gè)或者多個(gè)字節(jié)的緩沖區(qū).同樣不會(huì)直接從通道中讀取字節(jié),而是將數(shù)據(jù)從通道讀入緩沖區(qū),再?gòu)木彌_區(qū)獲取這個(gè)字節(jié)
在 NIO 中,提供了多種通道對(duì)象,而所有的通道對(duì)象都實(shí)現(xiàn)了 Channel 接口.它們之間的繼承關(guān)系如下圖所示:
使用 NIO 讀取數(shù)據(jù)
在前面我們說過,任何時(shí)候讀取數(shù)據(jù),都不是直接從通道讀取,而是從通道讀取到緩沖區(qū).所以使用 NIO 讀取數(shù)據(jù)可以分為下面三個(gè)步驟:
- 從 FileInputStream 獲取 Channel
- 創(chuàng)建 Buffer
- 將數(shù)據(jù)從 Channel 讀取到 Buffer 中
下面是一個(gè)簡(jiǎn)單的使用 NIO 從文件中讀取數(shù)據(jù)的例子:
使用 NIO 寫入數(shù)據(jù)
使用 NIO 寫入數(shù)據(jù)與讀取數(shù)據(jù)的過程類似,同樣數(shù)據(jù)不是直接寫入通道,而是寫入緩沖區(qū),可以分為下面三個(gè)步驟:
- 從 FileInputStream 獲取 Channel
- 創(chuàng)建 Buffer
- 將數(shù)據(jù)從 Channel 寫入到 Buffer 中
public class FileInputDemo {
static public void main(String args[])throws Exception {
FileInputStream fin = new FileInputStream("E://test.txt");
// 獲取通道
FileChannel fc = fin.getChannel();
// 創(chuàng)建緩沖區(qū)
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 讀取數(shù)據(jù)到緩沖區(qū)
fc.read(buffer);
buffer.flip();
while (buffer.remaining()> 0){
byte b = buffer.get();
System.out.print(((char)b));
}
fin.close();
}
}
下面是一個(gè)簡(jiǎn)單的使用 NIO 向文件中寫入數(shù)據(jù)的例子:
public class FileOutputDemo {
static private final byte message[] = { 83, 111, 109, 101, 32, 98, 121, 116, 101, 115, 46 };
static public void main(String args[])throws Exception {
FileOutputStream fout = new FileOutputStream("E://test.txt");
FileChannel fc = fout.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
for (int i=0; i<message.length; ++i){
buffer.put(message[I]);
}
buffer.flip();
fc.write(buffer);
fout.close();
}
}
IO 多路復(fù)用
我們?cè)囅胍幌逻@樣的現(xiàn)實(shí)場(chǎng)景:
一個(gè)餐廳同時(shí)有100位客人到店,當(dāng)然到店后第一件要做的事情就是點(diǎn)菜.但是問題來了,餐廳老板為了節(jié)約人力成本目前只有一位大堂服務(wù)員拿著唯一的一本菜單等待客人進(jìn)行服務(wù)
那么最笨(但是最簡(jiǎn)單)的方法是(方法 A),無論有多少客人等待點(diǎn)餐,服務(wù)員都把僅有的一份菜單遞給其中一位客人,然后站在客人身旁等待這個(gè)客人完成點(diǎn)菜過程.在記錄客人點(diǎn)菜內(nèi)容后,把點(diǎn)菜記錄交給后堂廚師.然后是第二位客人....然后是第三位客人.很明顯,只有腦袋被門夾過的老板,才會(huì)這樣設(shè)置服務(wù)流程.因?yàn)殡S后的 80 位客人,再等待超時(shí)后就會(huì)離店(還會(huì)給差評(píng))
于是還有一種辦法(方法 B),老板馬上新雇傭 99 名服務(wù)員,同時(shí)印制 99 本新的菜單.每一名服務(wù)員手持一本菜單負(fù)責(zé)一位客人(關(guān)鍵不只在于服務(wù)員,還在于菜單.因?yàn)闆]有菜單客人也無法點(diǎn)菜).在客人點(diǎn)完菜后,記錄點(diǎn)菜內(nèi)容交給后堂廚師(當(dāng)然為了更高效,后堂廚師最好也有 100 名).這樣每一位客人享受的就是 VIP 服務(wù)咯,當(dāng)然客人不會(huì)走,但是人力成本可是一個(gè)大頭哦(虧死你)
另外一種辦法(方法 C),就是改進(jìn)點(diǎn)菜的方式,當(dāng)客人到店后,自己申請(qǐng)一本菜單.想好自己要點(diǎn)的才后,就呼叫服務(wù)員.服務(wù)員站在自己身邊后記錄客人的菜單內(nèi)容.將菜單遞給廚師的過程也要進(jìn)行改進(jìn),并不是每一份菜單記錄好以后,都要交給后堂廚師.服務(wù)員可以記錄號(hào)多份菜單后,同時(shí)交給廚師就行了.那么這種方式,對(duì)于老板來說人力成本是最低的酗洒;對(duì)于客人來說,雖然不再享受 VIP 服務(wù)并且要進(jìn)行一定的等待,但是這些都是可接受的;對(duì)于服務(wù)員來說,基本上她的時(shí)間都沒有浪費(fèi),基本上被老板壓桿了最后一滴油水
如果您是老板,您會(huì)采用哪種方式呢?
到店情況:并發(fā)量.到店情況不理想時(shí),一個(gè)服務(wù)員一本菜單,當(dāng)然是足夠了.所以不同的老板在不同的場(chǎng)合下,將會(huì)靈活選擇服務(wù)員和菜單的配置
客人:客戶端請(qǐng)求
點(diǎn)餐內(nèi)容:客戶端發(fā)送的實(shí)際數(shù)據(jù)
老板:操作系統(tǒng)
人力成本:系統(tǒng)資源
菜單:文件狀態(tài)描述符(FD).操作系統(tǒng)對(duì)于一個(gè)進(jìn)程能夠同時(shí)持有的文件狀態(tài)描述符的個(gè)數(shù)是有限制的,在 linux 系統(tǒng)中$ulimit-n 查看這個(gè)限制值,當(dāng)然也是可以(并且應(yīng)該)進(jìn)行內(nèi)核參數(shù)調(diào)整的
服務(wù)員:操作系統(tǒng)內(nèi)核用于 IO 操作的線程(內(nèi)核線程)
廚師:應(yīng)用程序線程(當(dāng)然廚房就是應(yīng)用程序進(jìn)程咯)
方法 A:同步 IO 方法
B:同步 IO 方法
C:多路復(fù)用 IO
目前流行的多路復(fù)用 IO 實(shí)現(xiàn)主要包括四種:select,poll,epoll,kqueue.下表是他們的一些重要特性的比較:
IO 模型 | 相對(duì)性能 | 關(guān)鍵思路 | 操作系統(tǒng) | JAVA 支持 |
---|---|---|---|---|
select | 較高 | Reactor | windows/Linux | 支持,Reactor 模式(反應(yīng)器設(shè)計(jì)模式).Linux 操作系統(tǒng)的 kernels2.4 內(nèi)核版本之前,默認(rèn)使用 select枷遂;而目前 windows 下對(duì)同步 IO 的支持,都是 select 模型 |
poll | 較高 | Reactor | Linux | Linux 下的 JAVANIO 框架,Linuxkernels2.6 內(nèi)核版本之前使用 poll 進(jìn)行支持.也是使用的 Reactor 模式 |
epoll | 高 | Reactor/Proactor | Linux | Linuxkernels2.6 內(nèi)核版本及以后使用 epoll 進(jìn)行支持樱衷;Linuxkernels2.6 內(nèi)核版本之前使用 poll 進(jìn)行支持;另外一定注意,由于 Linux 下沒有 Windows 下的 IOCP 技術(shù)提供真正的異步 IO 支持,所以 Linux 下使用 epoll 模擬異步 IO |
kqueue | 高 | Proactor | Linux | 目前 JAVA 的版本不支持 |
多路復(fù)用 IO 技術(shù)最適用的是“高并發(fā)”場(chǎng)景,所謂高并發(fā)是指 1 毫秒內(nèi)至少同時(shí)有上千個(gè)連接請(qǐng)求準(zhǔn)備好.其他情況下多路復(fù)用 IO 技術(shù)發(fā)揮不出來它的優(yōu)勢(shì).另一方面,使用 JAVANIO 進(jìn)行功能實(shí)現(xiàn),相對(duì)于傳統(tǒng)的 Socket 套接字實(shí)現(xiàn)要復(fù)雜一些,所以實(shí)際應(yīng)用中,需要根據(jù)自己的業(yè)務(wù)需求進(jìn)行技術(shù)選擇
NIO 源碼初探
說到源碼先得從 Selector 的 open 方法開始看起,java.nio.channels.Selector:
public static Selector open()throws IOException {
return SelectorProvider.provider().openSelector();
}
看看 SelectorProvider.provider()做了什么:
public static SelectorProvider provider(){
synchronized (lock){
if (provider != null){
return provider;
}
return AccessController.doPrivileged(new PrivilegedAction<SelectorProvider>(){
public SelectorProvider run(){
if (loadProviderFromProperty()){
return provider;
}
if (loadProviderAsService()){
return provider;
}
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
其中 provider=sun.nio.ch.DefaultSelectorProvider.create();會(huì)根據(jù)操作系統(tǒng)來返回不同的實(shí)現(xiàn)類,windows 平臺(tái)就返回 WindowsSelectorProvider
而 if(provider!=null)returnprovider;保證了整個(gè) server 程序中只有一個(gè) WindowsSelectorProvider 對(duì)象
再看看 WindowsSelectorProvider.openSelector():
public AbstractSelector openSelector()throws IOException {
return new WindowsSelectorImpl(this);
}
newWindowsSelectorImpl(SelectorProvider)代碼:
WindowsSelectorImpl(SelectorProvider sp)throws IOException {
super(sp);
pollWrapper = new PollArrayWrapper(INIT_CAP);
wakeupPipe = Pipe.open();
wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
// Disable the Nagle algorithm so that the wakeup is more immediate
SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
(sink.sc).socket().setTcpNoDelay(true);
wakeupSinkFd = ((SelChImpl)sink).getFDVal();
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
其中 Pipe.open()是關(guān)鍵,這個(gè)方法的調(diào)用過程是:
public static Pipe open()throws IOException {
return SelectorProvider.provider().openPipe();
}
SelectorProvider 中:
public Pipe openPipe()throws IOException {
return new PipeImpl(this);
}
再看看怎么 newPipeImpl()的:
PipeImpl(SelectorProvider sp){
long pipeFds = IOUtil.makePipe(true);
int readFd = (int)(pipeFds >>> 32);
int writeFd = (int)pipeFds;
FileDescriptor sourcefd = new FileDescriptor();
IOUtil.setfdVal(sourcefd, readFd);
source = new SourceChannelImpl(sp, sourcefd);
FileDescriptor sinkfd = new FileDescriptor();
IOUtil.setfdVal(sinkfd, writeFd);
sink = new SinkChannelImpl(sp, sinkfd);
}
其中 IOUtil.makePipe(true)是個(gè) native 方法:
/**
* Returns two file descriptors for a pipe encoded in a long.
* The read end of the pipe is returned in the high 32 bits,
* while the write end is returned in the low 32 bits.
*/
staticnativelong makePipe(boolean blocking);
具體實(shí)現(xiàn):
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_IOUtil_makePipe(JNIEnv *env, jobject this, jboolean blocking){
int fd[2];
if (pipe(fd)< 0){
JNU_ThrowIOExceptionWithLastError(env, "Pipe failed");
return 0;
}
if (blocking == JNI_FALSE){
if ((configureBlocking(fd[0], JNI_FALSE)< 0)|| (configureBlocking(fd[1], JNI_FALSE)< 0)){
JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed");
close(fd[0]);
close(fd[1]);
return 0;
}
}
return ((jlong)fd[0] << 32)| (jlong)fd[1];
}
static int configureBlocking(int fd, jboolean blocking){
int flags = fcntl(fd, F_GETFL);
int newflags = blocking ? (flags & ~O_NONBLOCK): (flags | O_NONBLOCK);
return (flags == newflags)? 0 : fcntl(fd, F_SETFL, newflags);
}
正如這段注釋所描述的:
/**
* Returns two file descriptors for a pipe encoded in a long.
* The read end of the pipe is returned in the high 32 bits,
* while the write end is returned in the low 32 bits.
*/
High32 位存放的是通道 read 端的文件描述符 FD(filedescriptor),low32bits 存放的是 write 端的文件描述符.所以取到 makepipe()返回值后要做移位處理
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
這行代碼把返回的 pipe 的 write 端的 FD 放在了 pollWrapper 中(后面會(huì)發(fā)現(xiàn),這么做是為了實(shí)現(xiàn) selector 的 wakeup())ServerSocketChannel.open()的實(shí)現(xiàn):
public static ServerSocketChannel open()throws IOException {
return SelectorProvider.provider().openServerSocketChannel();
}
SelectorProvider:
public ServerSocketChannel openServerSocketChannel()throws IOException {
return new ServerSocketChannelImpl(this);
}
可見創(chuàng)建的 ServerSocketChannelImpl 也有 WindowsSelectorImpl 的引用
public ServerSocketChannelImpl(SelectorProvider sp)throws IOException {
super(sp);
this.fd = Net.serverSocket(true);
this.fdVal = IOUtil.fdVal(fd);
this.state = ST_INUSE;
}
然后通過 serverChannel1.register(selector,SelectionKey.OP_ACCEPT);把 selector 和 channel 綁定在一起,也就是把 newServerSocketChannel 時(shí)創(chuàng)建的 FD 與 selector 綁定在了一起
到此,server 端已啟動(dòng)完成了,主要?jiǎng)?chuàng)建了以下對(duì)象:WindowsSelectorProvider:單例
WindowsSelectorImpl 中包含:
- pollWrapper:保存 selector 上注冊(cè)的 FD,包括 pipe 的 write 端 FD 和 ServerSocketChannel 所用的 FD
- wakeupPipe:通道(其實(shí)就是兩個(gè) FD,一個(gè) read,一個(gè) write)
再到 Server 中的 run():
selector.select();主要調(diào)用了 WindowsSelectorImpl 中的這個(gè)方法:
protected int doSelect(long timeout)throws IOException {
if (channelArray == null){
throw new ClosedSelectorException();
}
this.timeout = timeout; // set selector timeout
processDeregisterQueue();
if (interruptTriggered){
resetWakeupSocket(); return 0;
}
// Calculate number of helper threads needed for poll. If necessary
// threads are created here and start waiting on startLock
adjustThreadsCount();
finishLock.reset(); // reset finishLock
// Wakeup helper threads, waiting on startLock, so they start polling
// Redundant threads will exit here after wakeup
startLock.startThreads();
// do polling in the main thread. Main thread is responsible for
// first MAX_SELECTABLE_FDS entries in pollArray
try {
begin();
try {
subSelector.poll();
} catch (IOException e){
finishLock.setException(e); // Save this exception
}
// Main thread is out of poll(). Wakeup others and wait for them
if (threads.size()> 0){
finishLock.waitForHelperThreads();
}
} finally {
end();
}
// Done with poll(). Set wakeupSocket to nonsignaled for the next run
finishLock.checkForException();
processDeregisterQueue();
int updated = updateSelectedKeys();
// Done with poll(). Set wakeupSocket to nonsignaled for the next run
resetWakeupSocket();
return updated;
}
其中 subSelector.poll()是核心,也就是輪訓(xùn) pollWrapper 中保存的 FD酒唉;具體實(shí)現(xiàn)是調(diào)用 native 方法 poll0:
private int poll()throws IOException{
// poll for the main thread
return poll0(pollWrapper.pollArrayAddress, Math.min(totalChannels, MAX_SELECTABLE_FDS), readFds, writeFds, exceptFds, timeout);
}
private native int poll0(long pollAddress, int numfds, int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
// These arrays will hold result of native select()
// The first element of each array is the number of selected sockets
// Other elements are file descriptors of selected sockets
private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];//保存發(fā)生 read 的 FD
private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1]; //保存發(fā)生 write 的 FD
private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1]; //保存發(fā)生 except 的 FD
這個(gè) poll0()會(huì)監(jiān)聽 pollWrapper 中的 FD 有沒有數(shù)據(jù)進(jìn)出,這會(huì)造成 IO 阻塞,直到有數(shù)據(jù)讀寫事件發(fā)生.比如,由于 pollWrapper 中保存的也有 ServerSocketChannel 的 FD,所以只要 ClientSocket 發(fā)一份數(shù)據(jù)到 ServerSocket,那么 poll0()就會(huì)返回矩桂;又由于 pollWrapper 中保存的也有 pipe 的 write 端的 FD,所以只要 pipe 的 write 端向 FD 發(fā)一份數(shù)據(jù),也會(huì)造成 poll0()返回;如果這兩種情況都沒有發(fā)生,那么 poll0()就一直阻塞,也就是 selector.select()會(huì)一直阻塞痪伦;如果有任何一種情況發(fā)生,那么 selector.select()就會(huì)返回,所有在 OperationServer 的 run()里要用 while(true){,這樣就可以保證在 selector 接收到數(shù)據(jù)并處理完后繼續(xù)監(jiān)聽 poll();
這時(shí)再來看看 WindowsSelectorImpl.Wakeup():
public Selector wakeup(){
synchronized (interruptLock){
if (!interruptTriggered){
setWakeupSocket();
interruptTriggered = true;
}
}return this;
}
// Sets Windows wakeup socket to a signaled state
private void setWakeupSocket(){
setWakeupSocket0(wakeupSinkFd);
}
private native void setWakeupSocket0(int wakeupSinkFd);
JNIEXPORT void JNICALL
Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this, jint scoutFd){
/* Write one byte into the pipe */
const char byte = 1;
send(scoutFd, &byte, 1, 0);
}
可見 wakeup()是通過 pipe 的 write 端 send(scoutFd,&byte,1,0),發(fā)生一個(gè)字節(jié) 1,來喚醒 poll().所以在需要的時(shí)候就可以調(diào)用 selector.wakeup()來喚醒 selector
反應(yīng)堆 Reactor
現(xiàn)在我們已經(jīng)對(duì)阻塞 I/O 已有了一定了解,我們知道阻塞 I/O 在調(diào)用 InputStream.read()方法時(shí)是阻塞的,它會(huì)一直等到數(shù)據(jù)到來時(shí)(或超時(shí))才會(huì)返回侄榴;同樣,在調(diào)用 ServerSocket.accept()方法時(shí),也會(huì)一直阻塞到有客戶端連接才會(huì)返回,每個(gè)客戶端連接過來后,服務(wù)端都會(huì)啟動(dòng)一個(gè)線程去處理該客戶端的請(qǐng)求.阻塞 I/O 的通信模型示意圖如下:
如果你細(xì)細(xì)分析,一定會(huì)發(fā)現(xiàn)阻塞 I/O 存在一些缺點(diǎn).根據(jù)阻塞 I/O 通信模型,我總結(jié)了它的兩點(diǎn)缺點(diǎn):
- 當(dāng)客戶端多時(shí),會(huì)創(chuàng)建大量的處理線程.且每個(gè)線程都要占用棧空間和一些 CPU 時(shí)間
- 阻塞可能帶來頻繁的上下文切換,且大部分上下文切換可能是無意義的.在這種情況下非阻塞式 I/O 就有了它的應(yīng)用前景
JavaNIO 是在 jdk1.4 開始使用的,它既可以說成“新 I/O”,也可以說成非阻塞式 I/O.下面是 JavaNIO 的工作原理:
- 由一個(gè)專門的線程來處理所有的 IO 事件,并負(fù)責(zé)分發(fā)
- 事件驅(qū)動(dòng)機(jī)制:事件到的時(shí)候觸發(fā),而不是同步的去監(jiān)視事件
- 線程通訊:線程之間通過 wait,notify 等方式通訊.保證每次上下文切換都是有意義的.減少無謂的線程切換
下面貼出我理解的 JavaNIO 反應(yīng)堆的工作原理圖:
(注:每個(gè)線程的處理流程大概都是讀取數(shù)據(jù),解碼,計(jì)算處理,編碼,發(fā)送響應(yīng).)
Netty 與 NIO
Netty 支持的功能與特性
按照定義來說,Netty 是一個(gè)異步,事件驅(qū)動(dòng)的用來做高性能,高可靠性的網(wǎng)絡(luò)應(yīng)用框架.主要的優(yōu)點(diǎn)有:
- 框架設(shè)計(jì)優(yōu)雅,底層模型隨意切換適應(yīng)不同的網(wǎng)絡(luò)協(xié)議要求
- 提供很多標(biāo)準(zhǔn)的協(xié)議,安全,編碼解碼的支持
- 解決了很多 NIO 不易用的問題
- 社區(qū)更為活躍,在很多開源框架中使用,如 Dubbo,RocketMQ,Spark 等
上圖體現(xiàn)的主要是 Netty 支持的功能或者特性:
- 底層核心有:Zero-Copy-CapableBuffer,非常易用的靈拷貝 Buffer(這個(gè)內(nèi)容很有意思,稍后專門來說)网沾;統(tǒng)一的 API癞蚕;標(biāo)準(zhǔn)可擴(kuò)展的時(shí)間模型
- 傳輸方面的支持有:管道通信(具體不知道干啥的,還請(qǐng)老司機(jī)指教);Http 隧道辉哥;TCP 與 UDP
- 協(xié)議方面的支持有:基于原始文本和二進(jìn)制的協(xié)議桦山;解壓縮攒射;大文件傳輸;流媒體傳輸恒水;protobuf 編解碼会放;安全認(rèn)證;http 和 websocket
Netty 采用 NIO 而非 AIO 的理由
- Netty 不看重 Windows 上的使用,在 Linux 系統(tǒng)上,AIO 的底層實(shí)現(xiàn)仍使用 EPOLL,沒有很好實(shí)現(xiàn) AIO,因此在性能上沒有明顯的優(yōu)勢(shì),而且被 JDK 封裝了一層不容易深度優(yōu)化
- Netty 整體架構(gòu)是 reactor 模型,而 AIO 是 proactor 模型,混合在一起會(huì)非扯ち瑁混亂,把 AIO 也改造成 reactor 模型看起來是把 epoll 繞個(gè)彎又繞回來
- AIO 還有個(gè)缺點(diǎn)是接收數(shù)據(jù)需要預(yù)先分配緩存,而不是 NIO 那種需要接收時(shí)才需要分配緩存,所以對(duì)連接數(shù)量非常大但流量小的情況,內(nèi)存浪費(fèi)很多
- Linux 上 AIO 不夠成熟,處理回調(diào)結(jié)果速度跟不到處理需求,比如外賣員太少,顧客太多,供不應(yīng)求,造成處理速度有瓶頸(待驗(yàn)證)