在上一篇文章中對(duì)于I/O模型已經(jīng)講的比較清楚了宰缤,在I/O密集型應(yīng)用中使用Reactor模式可以明顯提高系統(tǒng)的性能(我們這里談到的性能很大程度上指的是吞吐量)莫湘,但是在具體的開(kāi)發(fā)過(guò)程中模式還是要落地成真實(shí)的代碼,使用傳統(tǒng)的I/O庫(kù)肯定是不行的殃姓,在Java中需要使用java.nio
包下的庫(kù)。
雖然是講NIO的實(shí)現(xiàn),但本文將不會(huì)把所有Java NIO中的主要API全部過(guò)一遍鹿榜,而是通過(guò)例子理清NIO到底可以做什么事情。
本文中提到的JDK源代碼都可以在
%JAVA_HOME%/jre/lib/rt.jar
中看到锦爵。
Java NIO最初在Java4中被引入舱殿,但是到今天還是有很大部分的開(kāi)發(fā)者從來(lái)沒(méi)使用過(guò)NIO的API,因?yàn)榛A(chǔ)I/O已經(jīng)能滿足了我們?nèi)粘5拈_(kāi)發(fā)需求险掀。但如果要開(kāi)發(fā)I/O密集型應(yīng)用的場(chǎng)景下沪袭,NIO可以明顯的提升程序的性能,另外NIO與基礎(chǔ)I/O有本質(zhì)思想上的區(qū)別樟氢。
本文主要講Java中的NIO冈绊,內(nèi)容包含:
- Oracle官方對(duì)NIO的說(shuō)法
- Java中NIO的歷史進(jìn)程
- NIO和NIO.2的區(qū)別在哪里
- NIO中的主要類的介紹
- 使用NIO的API構(gòu)建一個(gè)Socket服務(wù)器
Oracle官方對(duì)NIO的說(shuō)法
首先看看Oracle的官方文檔中是怎么說(shuō)的:
Java中對(duì)于I/O的支持主要包括
java.io
和java.nio
兩個(gè)包的內(nèi)容,它們共同提供了如下特性:
- 通過(guò)數(shù)據(jù)流和序列化從文件系統(tǒng)中讀取和寫數(shù)據(jù)埠啃。
- 提供Charsets死宣,解碼器和編碼器,用于在字節(jié)和Unicode字符之間的翻譯碴开。
- 訪問(wèn)文件毅该、文件的屬性、文件系統(tǒng)潦牛。
- 提供異步的或者非阻塞多路復(fù)用I/O的API眶掌,用于構(gòu)建可擴(kuò)展的服務(wù)器程序。
這里并沒(méi)有提到網(wǎng)絡(luò)I/O的東西巴碗,在Java1.4以前朴爬,網(wǎng)絡(luò)I/O的API都是被放在java.net
包下,在NIO中才被一起放入了java.nio
包下橡淆。
Java中NIO的歷史進(jìn)程
- 最開(kāi)始Java中使用I/O來(lái)訪問(wèn)文件系統(tǒng)只有通過(guò)
java.io.File
類來(lái)做召噩,其中包含了一些對(duì)文件和目錄基本的操作母赵。對(duì)于開(kāi)發(fā)中常碰到的I/O需求一般都能覆蓋到,所以這也是日常開(kāi)發(fā)工作中最常使用的I/O API蚣常。官方文檔中稱之為基礎(chǔ)I/O(Basic I/O)市咽。
基礎(chǔ)I/O是基于各種流的概念的,其基本模型就是上一篇中講到的阻塞I/O抵蚊。 - 為了進(jìn)一步豐富I/O操作的API施绎,也是為了提升在I/O密集型應(yīng)用中的性能,基于Reactor模式贞绳,在Java1.4中引入了
java.nio
包谷醉,其中重點(diǎn)包含幾個(gè)類:
-
java.nio.Buffer
,用來(lái)存儲(chǔ)各種緩沖數(shù)據(jù)的容器冈闭。 -
java.nio.channels.Channel
俱尼,用于連接程序和I/O設(shè)備的數(shù)據(jù)通道。 -
java.nio.channels.Selector
萎攒,多路復(fù)用選擇器遇八,在上一篇中講到過(guò)。 -
java.nio.charset.Charset
耍休,用來(lái)編解碼刃永。
- 在Java7中引入了NIO.2,引入了一系列新的API(主要在新加入的包
Java.nio.file
)羊精,對(duì)于訪問(wèn)文件系統(tǒng)提供了更多的API實(shí)現(xiàn)斯够,更加豐富的文件屬性類,增加了一些異步I/O的API喧锦。同時(shí)读规,還添加了很多實(shí)用方法。
例如:以前簡(jiǎn)單的拷貝一個(gè)文件就必須要寫一大堆的代碼燃少,現(xiàn)在實(shí)用
java.nio.file.Files.copy(Path, Path, CopyOption...)
就可以很輕松的做到了
NIO和NIO.2的區(qū)別在哪里
在上一節(jié)中已經(jīng)簡(jiǎn)單介紹了這兩個(gè)概念的不同束亏,這里再簡(jiǎn)單羅列一下。NIO中引入的一個(gè)重要概念就是Reactor模式阵具,而NIO.2對(duì)NIO本身不是一次升級(jí)碍遍,而是一次擴(kuò)充,NIO.2中新增了很多實(shí)用方法(utilities)怔昨,以支持更多的功能需求雀久,并不是說(shuō)能夠提升多少的性能宿稀。主要增加了如下兩點(diǎn):
- 新的訪問(wèn)文件的API趁舀。
在Java.nio.file
包和其子包中新增了大量的與訪問(wèn)文件相關(guān)的類,其中比較重要的有以下幾個(gè)祝沸,更完整的更新可以在Oracle的官網(wǎng)文檔中查看矮烹。
-
java.nio.file.Path
越庇,它可以用來(lái)取代早期的java.io.File
用來(lái)訪問(wèn)文件。 -
java.nio.file.Files
奉狈,其中包含了大量的對(duì)文件操作的API卤唉。
-
異步I/O的API
在NIO原來(lái)的API的基礎(chǔ)上,增加了對(duì)Proactor模式的支持仁期,可以在包java.nio.channels
中看到新加入的java.nio.channels.AsynchronousChannel
和java.nio.channels.CompletionHandler<V, A>
桑驱。使用這些類可以實(shí)現(xiàn)異步編程,如代碼1中所示://代碼1 //定義一個(gè)處理文件內(nèi)容的函數(shù)式接口 @FunctionalInterface static interface ProcessBuffer{ void process(int result, ByteBuffer bb); } //遞歸地讀取文件的全部?jī)?nèi)容 static void readFileThrough(AsynchronousFileChannel ch, ProcessBuffer runn, int position) { ByteBuffer bb = ByteBuffer.allocate(512); ch.read(bb, position, null, new CompletionHandler<Integer, Object>() { @Override public void completed(Integer result, Object attachment) { System.out.println("成功了"); bb.flip(); runn.process(result, bb); bb.clear(); if (result == bb.capacity()) readFileThrough(ch, runn, position + result); } @Override public void failed(Throwable exc, Object attachment) { System.err.println("失敗了u说啊0镜摹!"); } }); } //讀取文件內(nèi)容赊级,并打印 static void testAIOReadFile() throws IOException { Path p = Paths.get(fileDir, fileName); AsynchronousFileChannel channel = AsynchronousFileChannel.open(p, StandardOpenOption.READ); Thread daemon = new Thread(() -> { try { System.out.println("守護(hù)"); Thread.sleep(10000); } catch (Exception e) { } }); readFileThrough(channel, (result, bb) -> { if (result < bb.capacity()) { System.out.println(new String(Arrays.copyOf(bb.array(), result))); System.out.println("已讀完押框。。理逊。"); daemon.interrupt(); }else { System.out.print(new String(bb.array())); } }, 0); daemon.start(); }
NIO中的主要類的介紹
NIO的基本思想是要構(gòu)建一個(gè)Reactor模式的實(shí)現(xiàn)橡伞,具體落實(shí)到API,在Java中主要有以下幾個(gè)類:
1. java.nio.Buffer
這是一個(gè)容器類晋被,用來(lái)存儲(chǔ)「基礎(chǔ)數(shù)據(jù)類型」兑徘,所有從Channel中讀取出來(lái)的數(shù)據(jù)都要使用Buffer的子類來(lái)作為存儲(chǔ)單元,可以把它想象成一個(gè)帶著很多屬性的數(shù)組(和ArrayList很類似墨微,其實(shí)它的實(shí)現(xiàn)機(jī)制也差不多就是這樣)道媚。
第一次看到介紹Buffer是在一本書上,書上畫了好多方框和指向這些方框的屬性值翘县,看著就頭暈最域。其實(shí)很簡(jiǎn)單,Buffer就是一個(gè)數(shù)組锈麸。
在讀寫交換時(shí)镀脂,必不可少的要批量地去讀取并寫入到目標(biāo)對(duì)象,這個(gè)道理是不變的忘伞。在基礎(chǔ)I/O中如果我們要把一個(gè)輸入流寫入一個(gè)輸出流薄翅,可能會(huì)這么做:
//代碼2
public static void copy(File src, File dest) throws IOException {
FileInputStream in = new FileInputStream(src);
FileOutputStream out = new FileOutputStream(dest);
byte[] buffer = new byte[1024];
int bytes = 0;
while ((bytes = in.read(buffer)) > -1){
out.write(buffer, 0, bytes);
}
out.close();
in.close();
}
以上代碼中使用了一個(gè)真實(shí)的數(shù)組用來(lái)做讀寫切換,從而達(dá)到批量(緩沖)讀寫的目標(biāo)氓奈。
而在NIO中(如代碼1)翘魄,讀寫切換也同樣是使用了一個(gè)數(shù)組進(jìn)行暫存(緩沖),只不過(guò)在這個(gè)數(shù)組之上舀奶,封裝了一些屬性(java.nio.Buffer
源碼中的一些屬性如代碼3所示)和操作暑竟。
//代碼3 - Buffer類中定義的一些屬性
// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
關(guān)于Buffer類詳細(xì)的繼承關(guān)系和其主要方法,可以參考下圖:
2. java.nio.channels.Channel
Channel可以看做是代碼2中InputStream和OutStream的合體育勺,在實(shí)際使用中但荤,我們往往針對(duì)同一個(gè)I/O設(shè)備同時(shí)存在讀和寫的操作罗岖,在基礎(chǔ)I/O中我們就需要針對(duì)同一個(gè)目標(biāo)對(duì)象生成一個(gè)輸入流和輸出流的對(duì)象,可是在NIO中就可以只建立一個(gè)Channel對(duì)象了腹躁。
Channel抽象的概念是對(duì)于某個(gè)I/O設(shè)備的「連接」桑包,可以使用這個(gè)連接進(jìn)行一些I/O操作,java.nio.channels.Channel
本身是一個(gè)接口纺非,只有兩個(gè)方法哑了,但是在Java的的環(huán)境中,往往最簡(jiǎn)單的接口最煩人烧颖,因?yàn)樗膶?shí)現(xiàn)類總是會(huì)異常的多垒手。
//代碼4 - 去除了所有注釋的Channel類
package java.nio.channels;
import java.io.IOException;
import java.io.Closeable;
public interface Channel extends Closeable {
public boolean isOpen();
public void close() throws IOException;
}
當(dāng)然,這是享受多態(tài)帶來(lái)的好處的同時(shí)必須承受的倒信。詳細(xì)的Channel繼承和實(shí)現(xiàn)關(guān)系如下:
3. java.nio.channels.Selector
如果你是使用NIO來(lái)做網(wǎng)絡(luò)I/O科贬,Selector是JavaNIO中最重要的類,正如它的注釋里第一句說(shuō)的鳖悠,Selector是SelectableChannel的「多路復(fù)用器」榜掌。
多路復(fù)用,這是在上一篇介紹過(guò)的概念乘综,在不同的操作系統(tǒng)也有不同的底層實(shí)現(xiàn)憎账。用戶也可以自己實(shí)現(xiàn)自己的Selector(通過(guò)類
java.nio.channels.spi.SelectorProvider
)
//代碼5 - provider構(gòu)造方法
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
//如果設(shè)置了屬性java.nio.channels.spi.SelectorProvider,則會(huì)載入響應(yīng)的類
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
如果你不實(shí)現(xiàn)自己的SelectorProvidor卡辰,在代碼5中可以看到JDK會(huì)使用類sun.nio.ch.DefaultSelectorProvider
來(lái)創(chuàng)建胞皱,這里會(huì)根據(jù)你的操作系統(tǒng)的類別不同而選擇不同的實(shí)現(xiàn)類。openJDK中也有相應(yīng)的實(shí)現(xiàn)九妈,有興趣的可以去GrepCode查看一下反砌,Mac OS下是使用KQueueSelectorProvider
。
Selector的使用比較簡(jiǎn)單萌朱,同時(shí)要配合SelectionKey使用宴树,它們的繼承結(jié)構(gòu)圖也比較簡(jiǎn)單,如下:
4. 其他
其他一些類如Charset個(gè)人感覺(jué)屬于實(shí)用性很強(qiáng)的類晶疼,但是在NIO與基礎(chǔ)I/O的比較中就顯得不那么重要了酒贬。
使用NIO的API構(gòu)建一個(gè)Socket服務(wù)器
Java1.4引入的NIO中已經(jīng)可以實(shí)現(xiàn)Reactor模式,在NIO.2中又引入了AIO的API翠霍,所以本節(jié)將分別使用兩種模式來(lái)實(shí)現(xiàn)一個(gè)Socket服務(wù)器锭吨,這里重點(diǎn)介紹Java中NIO API的使用,至于NIO和基礎(chǔ)I/O的性能對(duì)比寒匙,網(wǎng)上有很多零如,這里就不再做比較了。
首先定義一些基礎(chǔ)類,將從Socket中獲取的數(shù)據(jù)解析成TestRequest對(duì)象埠况,然后再找到響應(yīng)的Handler】醚ⅲ看代碼:
我這里為了偷懶辕翰,將很多基礎(chǔ)類和方法定義在了一個(gè)類中,這種方法其實(shí)十分不可取狈谊。
//代碼6
/**
* 執(zhí)行計(jì)算工作的線程池
*/
private static ExecutorService workers = Executors.newFixedThreadPool(10);
/**
* 解析出來(lái)的請(qǐng)求對(duì)象
* @author lk
*
*/
public static class TestRequest{
/**
* 根據(jù)解析到的method來(lái)獲取響應(yīng)的Handler
*/
String method;
String args;
public static TestRequest parseFromString(String req) {
System.out.println("收到請(qǐng)求:" + req);
TestRequest request = new TestRequest();
request.method = req.substring(0, 512);
request.args = req.substring(512, req.length());
return request;
}
}
/**
* 具體的邏輯需要實(shí)現(xiàn)此接口
* @author lk
*
*/
public static interface SockerServerHandler {
ByteBuffer handle(TestRequest req);
}
主要的邏輯其實(shí)就是使用ServerSocketChannel
的實(shí)例監(jiān)聽(tīng)本地端口喜命,并且設(shè)置其為非阻塞(默認(rèn)為阻塞模式)。代碼7中的parse()
函數(shù)是一個(gè)典型的「使用Buffer讀取Channel中數(shù)據(jù)」的方法河劝,這里為了簡(jiǎn)(tou)單(lan)壁榕,默認(rèn)只讀取1024個(gè)字節(jié),所以并沒(méi)有實(shí)際去循環(huán)讀取赎瞎。
//代碼7
private static void useNIO() {
Selector dispatcher = null;
ServerSocketChannel serverChannel = null;
try {
dispatcher = Selector.open();
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().setReuseAddress(true);
serverChannel.socket().bind(LOCAL_8080);
//ServerSocketChannel只支持這一種key牌里,因?yàn)閟erver端的socket只能去accept
serverChannel.register(dispatcher, SelectionKey.OP_ACCEPT);
while (dispatcher.select() > 0) {
operate(dispatcher);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 在分發(fā)器上循環(huán)獲取連接事件
* @param dispatcher
* @throws IOException
*/
private static void operate(Selector dispatcher) throws IOException {
//Set<SelectionKey> keys = dispatcher.keys();
Set<SelectionKey> keys = dispatcher.selectedKeys();
Iterator<SelectionKey> ki = keys.iterator();
while(ki.hasNext()) {
SelectionKey key = ki.next();
ki.remove();
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
//針對(duì)此socket的IO就是BIO了
final SocketChannel socket = channel.accept();
workers.submit(() -> {
try {
TestRequest request = TestRequest.parseFromString(parse(socket));
SockerServerHandler handler = (SockerServerHandler) Class.forName(getClassNameForMethod(request.method)).newInstance();
socket.write(handler.handle(request));
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
}
}
private static String parse(SocketChannel socket) throws IOException {
String req = null;
try {
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
byte[] bytes;
int count = 0;
if ((count = socket.read(buffer)) >= 0) {
buffer.flip();
bytes = new byte[count];
buffer.get(bytes);
req = new String(bytes, Charset.forName("utf-8"));
buffer.clear();
}
} finally {
socket.socket().shutdownInput();
}
return req;
}
Java的程序有個(gè)通病,寫出來(lái)的程序又臭又長(zhǎng)务甥,同樣是使用JavaNIO的API實(shí)現(xiàn)一個(gè)非阻塞的Socket服務(wù)器牡辽,使用NIO.2中AIO(異步I/O)的API就很簡(jiǎn)單了,但是卻陷入了回調(diào)地獄(當(dāng)然可以通過(guò)別的方式避免回調(diào)敞临,但是其本質(zhì)還是一樣的)态辛。和上邊介紹的Reactor模式相比,簡(jiǎn)直就是拿核武器比步槍挺尿,有點(diǎn)降維攻擊的意味了奏黑。Reactor中那么復(fù)雜的概念和邏輯所實(shí)現(xiàn)的功能,使用AIO的API很輕松就搞定了编矾,而且概念比較少熟史,邏輯更清晰。
//代碼8
private static void useAIO() {
AsynchronousServerSocketChannel server;
try {
server = AsynchronousServerSocketChannel.open();
server.bind(LOCAL_8080);
while (true) {
Future<AsynchronousSocketChannel> socketF = server.accept();
try {
final AsynchronousSocketChannel socket = socketF.get();
workers.submit(() -> {
try {
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
socket.read(buffer, null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer count, Object attachment) {
byte[] bytes;
if (count >= 0) {
buffer.flip();
bytes = new byte[count];
buffer.get(bytes);
String req = new String(bytes, Charset.forName("utf-8"));
TestRequest request = TestRequest.parseFromString(req);
try {
SockerServerHandler handler = (SockerServerHandler) Class.forName(getClassNameForMethod(request.method)).newInstance();
ByteBuffer bb = handler.handle(request);
socket.write(bb, null, null);
} catch (InstantiationException | IllegalAccessException
| ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
buffer.clear();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
// TODO Auto-generated method stub
}
});
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
}
});
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
break;
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
最后是測(cè)試用的客戶端程序窄俏,NIO在客戶端同樣也可以發(fā)揮很重要的作用以故,這里就先略過(guò)了,代碼9中客戶端測(cè)試使用的是基礎(chǔ)I/O:
//代碼9
private volatile static int succ = 0;
public static void main(String[] args) throws UnknownHostException, IOException {
CountDownLatch latch = new CountDownLatch(100);
for (int i = 0; i < 100; i++) {
new Thread( () -> {
Socket soc;
try {
soc = new Socket("localhost", 8080);
if (soc.isConnected()) {
OutputStream out = soc.getOutputStream();
byte[] req = "hello".getBytes("utf-8");
out.write(Arrays.copyOf(req, 1024));
InputStream in = soc.getInputStream();
byte[] resp = new byte[1024];
in.read(resp, 0, 1024);
String result = new String(resp, "utf-8");
if (result.equals("haha")) {
succ++;
}
System.out.println(Thread.currentThread().getName() + "收到回復(fù):" + result);
out.flush();
out.close();
in.close();
soc.close();
}
try {
System.out.println(Thread.currentThread().getName() + "去睡覺(jué)等待裆操。怒详。。");
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
latch.countDown();
}
Runnable hook = () -> {
System.out.println("成功個(gè)數(shù):" + succ);
};
Runtime.getRuntime().addShutdownHook(new Thread(hook));
}
總結(jié)
原本只是想寫一篇Netty在RPC框架中的使用踪区,寫著寫著就寫多了昆烁。本文從Java中引入NIO的歷史講起,梳理了Java對(duì)NIO支持的具體的API缎岗,最后通過(guò)一個(gè)典型的Socket服務(wù)器的例子具體的展示了Java中NIO相關(guān)API的使用静尼,將Reactor模式和Proactor模式從理論落地到實(shí)際的代碼。
由于作者比較懶,貼圖全部都是在網(wǎng)上找的(代碼大部分是自己寫的)鼠渺,如侵刪鸭巴。下一篇將講到比較火的一個(gè)NIO框架Netty的實(shí)現(xiàn)與使用。