必須明白的幾個(gè)概念
阻塞(Block)和非阻塞(Non-Block)
阻塞和非阻塞是進(jìn)程在訪問數(shù)據(jù)的時(shí)候,數(shù)據(jù)是否準(zhǔn)備就緒的一種處理方式,當(dāng)數(shù)據(jù)沒有準(zhǔn)備的時(shí)候
阻塞:往往需要等待緩沖區(qū)中的數(shù)據(jù)準(zhǔn)備好過后才處理其他的事情,否則一直等待在那里
非阻塞:當(dāng)我們的進(jìn)程訪問我們的數(shù)據(jù)緩沖區(qū)的時(shí)候,如果數(shù)據(jù)沒有準(zhǔn)備好則直接返回,不會(huì)等待.如果數(shù)據(jù)已經(jīng)準(zhǔn)備好,也直接返回
同步(Synchronization)和異步(Asynchronous)
同步和異步都是基于應(yīng)用程序和操作系統(tǒng)處理 IO 事件所采用的方式.比如同步:是應(yīng)用程序要直接參與 IO 讀寫的操作.異步:所有的 IO 讀寫交給操作系統(tǒng)去處理,應(yīng)用程序只需要等待通知
同步方式在處理 IO 事件的時(shí)候,必須阻塞在某個(gè)方法上面等待我們的 IO 事件完成(阻塞 IO 事件或者通過輪詢 IO 事件的方式),對于異步來說,所有的 IO 讀寫都交給了操作系統(tǒng).這個(gè)時(shí)候,我們可以去做其他的事情,并不需要去完成真正的 IO 操作,當(dāng)操作完成 IO 后,會(huì)給我們的應(yīng)用程序一個(gè)通知
同步:阻塞到 IO 事件,阻塞到 read 或則 write.這個(gè)時(shí)候我們就完全不能做自己的事情.讓讀寫方法加入到線程里面,然后阻塞線程來實(shí)現(xiàn),對線程的性能開銷比較大
BIO 與 NIO 對比
下表總結(jié)了 JavaBIO(BlockIO)和 NIO(Non-BlockIO)之間的主要差別異
| IO 模型 | BIO | NIO |
| --- | --- |
| 通信 | 面向流(鄉(xiāng)村公路) | 面向緩沖(高速公路,多路復(fù)用技術(shù)) |
| 處理 | 阻塞 IO(多線程)| 非阻塞 IO(反應(yīng)堆 Reactor)|
| 觸發(fā) | 無 | 選擇器(輪詢機(jī)制) |
面向流與面向緩沖
JavaNIO 和 BIO 之間第一個(gè)最大的區(qū)別是,BIO 是面向流的,NIO 是面向緩沖區(qū)的.JavaBIO 面向流意味著每次從流中讀一個(gè)或多個(gè)字節(jié),直至讀取所有字節(jié),它們沒有被緩存在任何地方.此外,它不能前后移動(dòng)流中的數(shù)據(jù).如果需要前后移動(dòng)從流中讀取的數(shù)據(jù),需要先將它緩存到一個(gè)緩沖區(qū).JavaNIO 的緩沖導(dǎo)向方法略有不同.數(shù)據(jù)讀取到一個(gè)它稍后處理的緩沖區(qū),需要時(shí)可在緩沖區(qū)中前后移動(dòng).這就增加了處理過程中的靈活性.但是,還需要檢查是否該緩沖區(qū)中包含所有您需要處理的數(shù)據(jù).而且,需確保當(dāng)更多的數(shù)據(jù)讀入緩沖區(qū)時(shí),不要覆蓋緩沖區(qū)里尚未處理的數(shù)據(jù)
阻塞與非阻塞
JavaBIO 的各種流是阻塞的.這意味著,當(dāng)一個(gè)線程調(diào)用 read()或 write()時(shí),該線程被阻塞,直到有一些數(shù)據(jù)被讀取,或數(shù)據(jù)完全寫入.該線程在此期間不能再干任何事情了.JavaNIO 的非阻塞模式,使一個(gè)線程從某通道發(fā)送請求讀取數(shù)據(jù),但是它僅能得到目前可用的數(shù)據(jù),如果目前沒有數(shù)據(jù)可用時(shí),就什么都不會(huì)獲取.而不是保持線程阻塞,所以直至數(shù)據(jù)變的可以讀取之前,該線程可以繼續(xù)做其他的事情.非阻塞寫也是如此.一個(gè)線程請求寫入一些數(shù)據(jù)到某通道,但不需要等待它完全寫入,這個(gè)線程同時(shí)可以去做別的事情.線程通常將非阻塞 IO 的空閑時(shí)間用于在其它通道上執(zhí)行 IO 操作,所以一個(gè)單獨(dú)的線程現(xiàn)在可以管理多個(gè)輸入和輸出通道(channel)
選擇器的問世
JavaNIO 的選擇器(Selector)允許一個(gè)單獨(dú)的線程來監(jiān)視多個(gè)輸入通道,你可以注冊多個(gè)通道使用一個(gè)選擇器,然后使用一個(gè)單獨(dú)的線程來“選擇”通道:這些通道里已經(jīng)有可以處理的輸入,或者選擇已準(zhǔn)備寫入的通道.這種選擇機(jī)制,使得一個(gè)單獨(dú)的線程很容易來管理多個(gè)通道
NIO 和 BIO 如何影響應(yīng)用程序的設(shè)計(jì)
無論您選擇 BIO 或 NIO 工具箱,可能會(huì)影響您應(yīng)用程序設(shè)計(jì)的以下幾個(gè)方面:
- 對 NIO 或 BIO 類的 API 調(diào)用
- 數(shù)據(jù)處理邏輯
- 用來處理數(shù)據(jù)的線程數(shù)
API 調(diào)用
當(dāng)然,使用 NIO 的 API 調(diào)用時(shí)看起來與使用 BIO 時(shí)有所不同,但這并不意外,因?yàn)椴⒉皇莾H從一個(gè) InputStream 逐字節(jié)讀取,而是數(shù)據(jù)必須先讀入緩沖區(qū)再處理
數(shù)據(jù)處理
使用純粹的 NIO 設(shè)計(jì)相較 BIO 設(shè)計(jì),數(shù)據(jù)處理也受到影響
在 BIO 設(shè)計(jì)中,我們從 InputStream 或 Reader 逐字節(jié)讀取數(shù)據(jù).假設(shè)你正在處理一基于行的文本數(shù)據(jù)流,例如:有如下一段文本:
Name:Tom
Age:18
Email: tom@qq.com
Phone:13888888888
該文本行的流可以這樣處理:
FileInputStream input = new FileInputStream("d://info.txt");
BufferedReader reader = new BufferedReader(new InputStreamReader(input));
String nameLine = reader.readLine();
String ageLine = reader.readLine();
String emailLine = reader.readLine();
String phoneLine = reader.readLine();
請注意處理狀態(tài)由程序執(zhí)行多久決定.換句話說,一旦 reader.readLine()方法返回,你就知道肯定文本行就已讀完,readline()阻塞直到整行讀完,這就是原因.你也知道此行包含名稱粥诫;同樣,第二個(gè) readline()調(diào)用返回的時(shí)候,你知道這行包含年齡等.正如你可以看到,該處理程序僅在有新數(shù)據(jù)讀入時(shí)運(yùn)行,并知道每步的數(shù)據(jù)是什么.一旦正在運(yùn)行的線程已處理過讀入的某些數(shù)據(jù),該線程不會(huì)再回退數(shù)據(jù)(大多如此).下圖也說明了這條原則:
(JavaBIO:從一個(gè)阻塞的流中讀數(shù)據(jù))而一個(gè) NIO 的實(shí)現(xiàn)會(huì)有所不同,下面是一個(gè)簡單的例子:
ByteBuffer buffer = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buffer);
注意第二行,從通道讀取字節(jié)到 ByteBuffer.當(dāng)這個(gè)方法調(diào)用返回時(shí),你不知道你所需的所有數(shù)據(jù)是否在緩沖區(qū)內(nèi).你所知道的是,該緩沖區(qū)包含一些字節(jié),這使得處理有點(diǎn)困難
假設(shè)第一次 read(buffer)調(diào)用后,讀入緩沖區(qū)的數(shù)據(jù)只有半行,例如,“Name:An”,你能處理數(shù)據(jù)嗎?顯然不能,需要等待,直到整行數(shù)據(jù)讀入緩存,在此之前,對數(shù)據(jù)的任何處理毫無意義
所以,你怎么知道是否該緩沖區(qū)包含足夠的數(shù)據(jù)可以處理呢?好了,你不知道.發(fā)現(xiàn)的方法只能查看緩沖區(qū)中的數(shù)據(jù).其結(jié)果是,在你知道所有數(shù)據(jù)都在緩沖區(qū)里之前,你必須檢查幾次緩沖區(qū)的數(shù)據(jù).這不僅效率低下,而且可以使程序設(shè)計(jì)方案雜亂不堪.例如:
ByteBuffer buffer = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buffer);
while(!bufferFull(bytesRead)){
bytesRead = inChannel.read(buffer);
}
bufferFull()方法必須跟蹤有多少數(shù)據(jù)讀入緩沖區(qū),并返回真或假,這取決于緩沖區(qū)是否已滿.換句話說,如果緩沖區(qū)準(zhǔn)備好被處理,那么表示緩沖區(qū)滿了
bufferFull()方法掃描緩沖區(qū),但必須保持在 bufferFull()方法被調(diào)用之前狀態(tài)相同.如果沒有,下一個(gè)讀入緩沖區(qū)的數(shù)據(jù)可能無法讀到正確的位置.這是不可能的,但卻是需要注意的又一問題
如果緩沖區(qū)已滿,它可以被處理.如果它不滿,并且在你的實(shí)際案例中有意義,你或許能處理其中的部分?jǐn)?shù)據(jù).但是許多情況下并非如此.下圖展示了“緩沖區(qū)數(shù)據(jù)循環(huán)就緒”:
設(shè)置處理線程數(shù)
NIO 可讓您只使用一個(gè)(或幾個(gè))單線程管理多個(gè)通道(網(wǎng)絡(luò)連接或文件),但付出的代價(jià)是解析數(shù)據(jù)可能會(huì)比從一個(gè)阻塞流中讀取數(shù)據(jù)更復(fù)雜
如果需要管理同時(shí)打開的成千上萬個(gè)連接,這些連接每次只是發(fā)送少量的數(shù)據(jù),例如聊天服務(wù)器,實(shí)現(xiàn) NIO 的服務(wù)器可能是一個(gè)優(yōu)勢.同樣,如果你需要維持許多打開的連接到其他計(jì)算機(jī)上,如 P2P 網(wǎng)絡(luò)中,使用一個(gè)單獨(dú)的線程來管理你所有出站連接,可能是一個(gè)優(yōu)勢.一個(gè)線程多個(gè)連接的設(shè)計(jì)方案如:
JavaNIO:單線程管理多個(gè)連接
如果你有少量的連接使用非常高的帶寬,一次發(fā)送大量的數(shù)據(jù),也許典型的 IO 服務(wù)器實(shí)現(xiàn)可能非常契合.下圖說明了一個(gè)典型的 IO 服務(wù)器設(shè)計(jì):
JavaBIO:一個(gè)典型的 IO 服務(wù)器設(shè)計(jì)-一個(gè)連接通過一個(gè)線程處理
Java AIO 詳解
jdk1.7(NIO2)才是實(shí)現(xiàn)真正的異步 AIO,把 IO 讀寫操作完全交給操作系統(tǒng),學(xué)習(xí)了 linuxepoll 模式,下面我們來做一些演示
AIO(Asynchronous IO)基本原理
服務(wù)端:AsynchronousServerSocketChannel
客服端:AsynchronousSocketChannel
用戶處理器:CompletionHandler 接口,這個(gè)接口實(shí)現(xiàn)應(yīng)用程序向操作系統(tǒng)發(fā)起 IO 請求,當(dāng)完成后處理具體邏輯,否則做自己該做的事情
“真正”的異步 IO 需要操作系統(tǒng)更強(qiáng)的支持.在 IO 多路復(fù)用模型中,事件循環(huán)將文件句柄的狀態(tài)事件通知給用戶線程,由用戶線程自行讀取數(shù)據(jù),處理數(shù)據(jù).而在異步 IO 模型中,當(dāng)用戶線程收到通知時(shí),數(shù)據(jù)已經(jīng)被內(nèi)核讀取完畢,并放在了用戶線程指定的緩沖區(qū)內(nèi),內(nèi)核在 IO 完成后通知用戶線程直接使用即可.異步 IO 模型使用了 Proactor 設(shè)計(jì)模式實(shí)現(xiàn)了這一機(jī)制,如下圖所示:
AIO 初體驗(yàn)
服務(wù)端代碼:
package com.gupaoedu.vip.netty.io.aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/*** AIO 服務(wù)端 */
public class AIOServer {
private final int port;
public static void main(String args[]){
int port = 8000; new AIOServer(port);
}
public AIOServer(int port){
this.port = port; listen();
}
private void listen(){
try {
ExecutorService executorService = Executors.newCachedThreadPool();
AsynchronousChannelGroup threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
final AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(threadGroup);
server.bind(new InetSocketAddress(port));
System.out.println("服務(wù)已啟動(dòng)侠碧,監(jiān)聽端口" + port);
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>(){
final ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
public void completed(AsynchronousSocketChannel result, Object attachment){
System.out.println("IO 操作成功,開始獲取數(shù)據(jù)");
try {
buffer.clear();
result.read(buffer).get();
buffer.flip();
result.write(buffer);
buffer.flip();
} catch (Exception e){
System.out.println(e.toString());
} finally {
try {
result.close();
server.accept(null, this);
} catch (Exception e){
System.out.println(e.toString());
}
}System.out.println("操作完成");
}
@Override
public void failed(Throwable exc, Object attachment){
System.out.println("IO 操作是失敗: " + exc);
}
});
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException ex){
System.out.println(ex);
}
} catch (IOException e){
System.out.println(e);
}
}
}
客戶端代碼:
package com.gupaoedu.vip.netty.io.aio;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
/*** AIO 客戶端 */
public class AIOClient {
private final AsynchronousSocketChannel client;
public AIOClient()throws Exception{
client = AsynchronousSocketChannel.open();
}
public void connect(String host,int port)throws Exception{
client.connect(new InetSocketAddress(host,port),null,new CompletionHandler<Void,Void>(){
@Override
public void completed(Void result, Void attachment){
try {
client.write(ByteBuffer.wrap("這是一條測試數(shù)據(jù)".getBytes())).get();
System.out.println("已發(fā)送至服務(wù)器");
} catch (Exception ex){
ex.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Void attachment){
exc.printStackTrace();
}
});
final ByteBuffer bb = ByteBuffer.allocate(1024);
client.read(bb, null, new CompletionHandler<Integer,Object>(){
@Override
public void completed(Integer result, Object attachment){
System.out.println("IO 操作完成" + result);
System.out.println("獲取反饋結(jié)果" + new String(bb.array()));
}
@Override
public void failed(Throwable exc, Object attachment){
exc.printStackTrace();
}
});
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException ex){
System.out.println(ex);
}
}
public static void main(String args[])throws Exception{
new AIOClient().connect("localhost",8000);
}
}
執(zhí)行結(jié)果:
服務(wù)端
客戶端
各 IO 模型對比與總結(jié)
最后再來一張表總結(jié)
| 屬性同步阻塞 IO(BIO)| 偽異步 IO | 非阻塞 IO(NIO)| 異步 IO(AIO)|
| --- | --- | --- | --- | --- |
| 客戶端數(shù):IO | 線程數(shù) | 1:1 | M:N(M>=N)| M:1 | M:0 |
| 阻塞類型 | 阻塞 | 阻塞 | 非阻塞 | 非阻塞 |
| 同步 | 同步 | 同步 | 同步(多路復(fù)用) | 異步 |
| API | 使用難度 | 簡單 | 簡單 | 復(fù)雜 | 一般 |
| 調(diào)試難度 | 簡單 | 簡單 | 復(fù)雜 | 復(fù)雜 |
| 可靠性 | 非 | 常差 | 差 | 高 | 高 |
| 吞吐量 | 低 | 中 | 高 | 高 |