簡單介紹 Asynchronous I/O
JDK7 已經(jīng)大致確定發(fā)布時(shí)間别惦。JSR 203 提出很久了允趟。2009.11.13屎慢,JDK7 M5(b76)已經(jīng)發(fā)布亚享。JSR 203 習(xí)慣上稱為 NIO.2咽块,主要包括新的:
異步 I/O(簡稱 AIO);
Multicase 多播欺税;
Stream Control Transport Protocol(SCTP)侈沪;
文件系統(tǒng) API揭璃;
以及一些 I/O API 的更新,例如:java.io.File.toPath亭罪,NetworkChannel 的完整抽象瘦馍,等等。
本文將主要關(guān)注 AIO应役。AIO 包括 Sockets 和 Files 兩部分的異步通道接口及其實(shí)現(xiàn)情组,并盡量使用操作系統(tǒng)提供的原生本地 I/O 功能進(jìn)行實(shí)現(xiàn)。例如 Windows 版本的實(shí)現(xiàn)就使用了所謂的完成端口模型(IOCP)箩祥。其實(shí) JDK 7 中 AIO 實(shí)現(xiàn)本質(zhì)上說應(yīng)該是 Proactor 模式的實(shí)現(xiàn)院崇。Alexander Libman 提供 NIO 版本的 JProactor 的實(shí)現(xiàn)。NIO.2 版本 JProactor 正在進(jìn)行袍祖。Grizzly 也已經(jīng)提供新的基于 AIO 的實(shí)現(xiàn)底瓣。如果您只想檢查這些最新的 API,?NIO.2 項(xiàng)目的 Javadoc 站點(diǎn)?只列出了 NIO.2 部分的 API蕉陋。
AIO 的核心概念:發(fā)起非阻塞方式的 I/O 操作濒持。當(dāng) I/O 操作完成時(shí)通知。
應(yīng)用程序的責(zé)任就是:什么時(shí)候發(fā)起操作寺滚? I/O 操作完成時(shí)通知誰?
AIO 的 I/O 操作屈雄,有兩種方式的 API 可以進(jìn)行:
Future 方式村视;
Callback 方式。
進(jìn)群:697699179可以獲取Java各類入門學(xué)習(xí)資料酒奶!
這是我的微信公眾號【編程study】各位大佬有空可以關(guān)注下蚁孔,每天更新Java學(xué)習(xí)方法,感謝惋嚎!
學(xué)習(xí)中遇到問題有不明白的地方杠氢,推薦加小編Java學(xué)習(xí)群:697699179內(nèi)有視頻教程 ,直播課程 另伍,等學(xué)習(xí)資料鼻百,期待你的加入
下面我們分別對這兩種方式的 API 進(jìn)行舉例說明。
Future 方式
Future 方式:即提交一個(gè) I/O 操作請求摆尝,返回一個(gè) Future温艇。然后您可以對 Future 進(jìn)行檢查,確定它是否完成堕汞,或者阻塞 IO 操作直到操作正常完成或者超時(shí)異常勺爱。使用 Future 方式很簡單,比較典型的代碼通常像清單 1 所示讯检。
清單 1. 使用 Future 方式的代碼示例
AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();// 連接遠(yuǎn)程服務(wù)器琐鲁,等待連接完成或者失敗Future result = ch.connect(remote);// 進(jìn)行其他工作卫旱,例如,連接后的準(zhǔn)備環(huán)境围段,f.e. //prepareForConnection(); //Future 返回 null 表示連接成功if(result.get()!=null){// 連接失敗顾翼,清理剛才準(zhǔn)備好的環(huán)境,f.e. //clearPreparation(); return;? }// 網(wǎng)絡(luò)連接正常建立...? ByteBuffer buffer = ByteBuffer.allocateDirect(8192);// 進(jìn)行讀操作Future result = ch.read(buffer);// 此時(shí)可以進(jìn)行其他工作蒜撮,f.e. //prepareLocalFile(); // 然后等待讀操作完成try{? ? int bytesRead = result.get();if(bytesRead==-1){// 返回 -1 表示沒有數(shù)據(jù)了而且通道已經(jīng)結(jié)束暴构,即遠(yuǎn)程服務(wù)器正常關(guān)閉連接。//clear(); return;? ? }// 處理讀到的內(nèi)容段磨,例如取逾,寫入本地文件,f.e. //writeToLocolFile(buffer); }catch(ExecutionExecption x) {//failed }
需要注意的是苹支,因?yàn)?Future.get()?是同步的砾隅,所以如果不仔細(xì)考慮使用場合,使用 Future 方式可能很容易進(jìn)入完全同步的編程模式债蜜,從而使得異步操作成為一個(gè)擺設(shè)晴埂。如果這樣,那么原來舊版本的 Socket API 便可以完全勝任寻定,大可不必使用異步 I/O儒洛。
Callback 方式
Callback 方式:即提交一個(gè) I/O 操作請求,并且指定一個(gè)?CompletionHandler?狼速。當(dāng)異步 I/O 操作完成時(shí)琅锻,便發(fā)送一個(gè)通知,此時(shí)這個(gè) CompletionHandler 對象的 completed 或者 failed 方法將會被調(diào)用向胡,樣例代碼如清單 2 所示恼蓬。
清單 2. 完成處理接口
publicinterfaceCompletionHandler{// 當(dāng)操作完成后被調(diào)用,result 參數(shù)表示操作結(jié)果僵芹,//attachment 參數(shù)表示提交操作請求時(shí)的參數(shù)处硬。voidcompleted(V result, A attachment);// 當(dāng)操作失敗是調(diào)用,exc 參數(shù)表示失敗原因拇派。attachment 參數(shù)同上荷辕。voidfailed(Throwable exc, A attachment);? }
關(guān)于 Attachment 參數(shù)
Attachment 參數(shù)是不是看著十分眼熟呢?是的件豌,NIO 中也使用類似的方法桐腌。當(dāng)然 I/O 操作是不會對這個(gè)參數(shù)進(jìn)行任何操作的,可以用于在不同的 CompletionHandler 對象之間進(jìn)行通信苟径。
V?表示結(jié)果值的類型案站。對于異步網(wǎng)絡(luò)通道的讀寫操作而言,這個(gè)結(jié)果值 V 都是整數(shù)類型,表示已經(jīng)操作的卦數(shù)蟆盐,如果是 -1承边,NIO.2 內(nèi)核實(shí)現(xiàn)保證傳遞的?ByteBuffer?參數(shù)不會有變化。
A?表示關(guān)聯(lián)到 I/O 操作的對象的類型石挂。用于傳遞操作環(huán)境博助。通常會封裝一個(gè)連接環(huán)境。
如果成功則 completed 方法被調(diào)用痹愚。如果失敗則 failed 方法被調(diào)用富岳。
準(zhǔn)備好 CompletionHandler 之后,如何使用 CompletionHandler 呢拯腮? AIO 提供四種類型的異步通道以及不同的 I/O 操作接受一個(gè) CompletionHandler 對象窖式,它們分別是:
AsynchronousSocketChannel:connect,read动壤,write
AsynchronousFileChannel:lock萝喘,read,write
AsynchronousServerSocketChannel:accept
AsynchronousDatagramChannel:read琼懊,write阁簸,send,receive
本文重點(diǎn)關(guān)注 AsynchronousSocketChannel 的使用哼丈,首先簡單瀏覽一下該類型的 API启妹。
AsynchronousSocketChannel
publicabstractclassAsynchronousSocketChannelimplementsAsynchronousByteChannel,NetworkChannel
創(chuàng)建一個(gè)異步網(wǎng)絡(luò)通道,并且綁定到一個(gè)默認(rèn)組醉旦。
publicstaticAsynchronousSocketChannelopen() throws IOException
將異步網(wǎng)絡(luò)通道連接到遠(yuǎn)程服務(wù)器翅溺,使用指定的 CompletionHandler 聽候完成通知。
publicabstractvoidconnect(SocketAddress remote,? ? A attachment,? ? CompletionHandler handler)
從異步網(wǎng)絡(luò)通道讀取數(shù)據(jù)到指定的緩沖區(qū)髓抑,使用指定的 CompletionHandler 聽候完成通知。
publicfinalvoidread(ByteBuffer dst,? ? A attachment,? ? CompletionHandler handler)
向異步網(wǎng)絡(luò)通道寫緩沖區(qū)中的數(shù)據(jù)优幸,使用指定的 CompletionHandler 聽候完成通知吨拍。
publicfinalvoidwrite(ByteBuffer src,? ? A attachment,? ? CompletionHandler handler)
開始簡單的異步 I/O 網(wǎng)絡(luò)客戶端程序
本文重點(diǎn)關(guān)注 AIO 的 socket 部分。接下來网杆,我們以 AIO 方式的 FTP 客戶端程序?yàn)槔危_始體會異步執(zhí)行的快樂。需要提醒的是:快樂和痛苦如影隨行碳却。好队秩,那“痛并快樂著”吧。
為什么選擇客戶端編程的主題呢昼浦?
為什么選擇客戶端編程的主題呢馍资?相對來說,其他文章和資料通常使用網(wǎng)絡(luò)服務(wù)器作為主題关噪∧裥罚客戶端的相對較少乌妙。 使用 AIO 進(jìn)行客戶端編程有什么好處呢?想象一個(gè) UI 的客戶端程序建钥,再看看時(shí)下流行的下載工具藤韵,線程一大堆,搞得你手上的工作做得不爽熊经。好有一比泽艘,工廠希望有訂單的時(shí)候多些工人,訂單少的時(shí)候就少些工人镐依。 使用 AIO匹涮,程序通常可以使用更少的線程馋吗。
使用 AIO焕盟,可以想象一個(gè)在線視頻播放的應(yīng)用場景。使用異步 I/O 回調(diào)方式宏粤,可以這樣描述一邊下載視頻一邊播放的功能:
準(zhǔn)備好網(wǎng)絡(luò)連接
準(zhǔn)備一個(gè)緩沖區(qū)脚翘,提交讀操作希望下載部分視頻內(nèi)容,(這個(gè)讀請求馬上完成)
等待讀請求完成操作绍哎,此時(shí)可以進(jìn)行其他工作来农,比如播放廣告
讀操作真正完成,得到通知崇堰,CompletionHandler#completed 方法被調(diào)用沃于,
啟動(dòng)另外的播放線程,從下載的緩沖區(qū)讀取內(nèi)容播放視頻海诲。
再準(zhǔn)備一個(gè)另外的緩沖區(qū)繁莹,回到第二步
這樣,第二步到第六步自動(dòng)構(gòu)成一個(gè)執(zhí)行循環(huán)特幔,但不是 while 之類的代碼循環(huán)咨演。
本文以 FTP 客戶端程序?yàn)槔瑏黻U述如何使用異步 I/O 進(jìn)行網(wǎng)絡(luò)程序的編寫蚯斯。
FTP 分為兩個(gè)通道進(jìn)行處理:控制通道和數(shù)據(jù)通道薄风。
首先,開始 FTP 的控制通道的編程拍嵌。FTP 的控制通道使用 telnet 行命令方式進(jìn)行請求和響應(yīng)處理遭赂。 第一個(gè)例子不會復(fù)雜,我們只是連接到一個(gè)遠(yuǎn)程服務(wù)器横辆,并且檢查某個(gè)文件的大小撇他,然后退出。基本步驟如下:
連接到 FTP 服務(wù)器逆粹。為了便于測試募疮,本文將“攻擊”ftp.gnu.org 服務(wù)器。
讀取服務(wù)器的歡迎信息僻弹,檢查遠(yuǎn)程服務(wù)器是否已經(jīng)準(zhǔn)備就緒阿浓。
如果服務(wù)器沒有準(zhǔn)備好,關(guān)閉連接蹋绽,退出
如果服務(wù)器沒有問題芭毙,發(fā)送登錄命令。
檢查登錄命令結(jié)果卸耘。如果登錄失敗退敦,轉(zhuǎn)到第 8 步。
如果服務(wù)器沒有問題蚣抗,發(fā)送檢查文件大小的命令侈百。
檢查命令結(jié)果。并且顯示結(jié)果翰铡。
發(fā)送退出命令
關(guān)閉連接钝域。
使用進(jìn)行一個(gè)簡單的設(shè)計(jì):
第一個(gè)簡單有問題的例子
importjava.io.IOException;importjava.net.InetSocketAddress;importjava.nio.ByteBuffer;importjava.nio.channels.AsynchronousSocketChannel;importjava.nio.channels.CompletionHandler;publicclassFTPClient1{publicstaticvoidmain(String[] args)throwsIOException{// 第一個(gè),創(chuàng)建異步網(wǎng)絡(luò)通道AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();// 連接到服務(wù)器锭魔,以 ftp.gnu.org 為目標(biāo)channel.connect(newInetSocketAddress("ftp.gnu.org",21), channel,// 使用連接完成的回調(diào)newCompletionHandler() {@Overridepublicvoidcompleted(Void result, AsynchronousSocketChannel attachment){// 完成連接后例证,啟動(dòng) FTP 的控制邏輯FTPClient1 client =newFTPClient1();? ? ? ? ? ? ? ? client.start(attachment);? ? ? ? ? ? }@Overridepublicvoidfailed(Throwable exc, AsynchronousSocketChannel attachment){? ? ? ? ? ? ? ? exc.printStackTrace();? ? ? ? ? ? }? ? ? ? });//connect 的調(diào)用異步執(zhí)行,馬上完成迷捧,阻止 JVM 退出System.in.read();? ? }? ? AsynchronousSocketChannel channel;publicvoidstart(AsynchronousSocketChannel channel){this.channel = channel;// 準(zhǔn)備讀緩沖區(qū)ByteBuffer buffer = ByteBuffer.allocateDirect(128);// 發(fā)出讀操作請求织咧,channel.read(buffer, buffer,// 讀操作完成后通知newCompletionHandler() {@Overridepublicvoidcompleted(Integer result, ByteBuffer attachment){if(result >0) {// 簡單處理讀到的響應(yīng)結(jié)果intposition = attachment.position() -1;if(attachment.get(position -1) ==13&&? ? ? ? ? ? ? ? ? ? attachment.get(position) ==10) {if(isValidReply(attachment)) {? ? ? ? ? ? ? ? ? ? ? ? ? ? attachment.flip();? ? ? ? ? ? ? ? ? ? ? ? ? ? showReply(attachment);if(getReplyCode(attachment) ==220)? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? login();? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? }else{// 繼續(xù)讀FTPClient1.this.channel.read(attachment, attachment,this);? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? }else{? ? ? ? ? ? ? ? ? ? ? System.out.println("remote server closed");? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? }@Overridepublicvoidfailed(Throwable exc, ByteBuffer attachment){? ? ? ? ? ? ? ? ? exc.printStackTrace();? ? ? ? ? ? ? }? ? ? ? ? });? ? ? }protectedvoidlogin(){// 準(zhǔn)備寫緩沖區(qū)String user ="user anonymous\r\n";? ? ? ? ? ByteBuffer buffer = ByteBuffer.wrap(user.getBytes());// 發(fā)出寫操作請求channel.write(buffer, buffer,// 寫操作完成通知newCompletionHandler() {@Overridepublicvoidcompleted(Integer result, ByteBuffer attachment){if(attachment.hasRemaining())? ? ? ? ? ? ? ? ? ? ? channel.write(attachment, attachment,this);else{// channel.read(dst, attachment, handler); // readReply(); // 此處有問題}? ? ? ? ? ? ? }@Overridepublicvoidfailed(Throwable exc, ByteBuffer attachment){? ? ? ? ? ? ? ? ? exc.printStackTrace();? ? ? ? ? ? ? }? ? ? ? ? });? ? ? }protectedvoidshowReply(ByteBuffer attachment){while(attachment.hasRemaining())? ? ? ? ? ? ? System.out.print((char) attachment.get());? ? ? }publicstaticintgetReplyCode(ByteBuffer buffer){returnCharacter.digit(buffer.get(0),10) *100+? ? ? ? ? Character.digit(buffer.get(1),10) *10+ Character.digit(buffer.get(2),10);? ? ? }publicstaticbooleanisValidReply(ByteBuffer buffer){returnbuffer.get(3) ==32&& Character.isDigit(buffer.get(0))? ? ? ? && Character.isDigit(buffer.get(1))? ? ? ? ? ? ? ? ? && Character.isDigit(buffer.get(2));? ? ? }? }
問題:上面的代碼中,login 方法中漠秋,完成 login 命令之后笙蒙,如何繼續(xù)?
答案是:不能繼續(xù)庆锦。實(shí)際上捅位,上面的例子代碼回到了同步處理時(shí)代。典型的錯(cuò)誤使用方式肥荔。痛。 同時(shí)朝群,CompletionHandler 的創(chuàng)建也成了問題燕耿,需要不停地創(chuàng)建這種類型的對象嗎?痛姜胖! 回顧前面提到的核心:?應(yīng)用程序的責(zé)任:什么時(shí)候發(fā)起操作誉帅? I/O 操作完成時(shí)通知誰?
就本例而言,F(xiàn)TPClient 本身應(yīng)該承擔(dān)應(yīng)用程序的責(zé)任蚜锨,正如 Client 名稱所示档插,應(yīng)該由 Client 來實(shí)現(xiàn) CompletionHandler。 Client 負(fù)責(zé)發(fā)出 I/O 操作請求亚再,I/O 操作完成通知 Client郭膛。正如世界上其他諸多問題一樣,名稱本身就是個(gè)問題氛悬。此處的 Client 的意思是真正的顧客则剃。
可以想象另外一個(gè)場景:去一個(gè)有叫號機(jī)的銀行大廳辦理業(yè)務(wù)∪缤保“我”到銀行棍现,“我”決定辦理個(gè)人業(yè)務(wù),所以取個(gè)人業(yè)務(wù)的號碼镜遣。然后看看前面等待的其他客人還不少己肮,計(jì)算一下時(shí)間,“我”決定去隔壁饞嘴一個(gè)冰淇淋悲关,回來后谎僻,在大廳到處晃晃,這時(shí)候坚洽,大廳廣播通知戈稿,333 號顧客請到 3 號窗口辦理業(yè)務(wù),“我”聽到了讶舰,檢查一下號碼鞍盗,“我”持有 333 號,所以“我”去 3 號窗口跳昼。
上面這個(gè)場景中有幾個(gè)非常重要的事實(shí)?“我”決定取個(gè)人業(yè)務(wù)號碼般甲,“我”聽到了,“我”是顧客鹅颊。因此敷存,上面例子應(yīng)該讓 FTPClient1 實(shí)現(xiàn) CompletionHandler。這是對的堪伍。但是 FTPClient1 有兩個(gè)重要的職責(zé):發(fā)出讀操作請求和發(fā)出寫操作請求锚烦。需要兩個(gè) CompletionHandler 的角色,但是不能重復(fù)實(shí)現(xiàn) CompletionHandler 接口帝雇,此時(shí)內(nèi)部類是個(gè)不錯(cuò)的選擇涮俄。修改上面的例子,如下:
第二個(gè)簡單的例子
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler;publicclassFTPClient2{publicstaticvoidmain(String[] args) throws IOException{? ? ? ? AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();? ? ? ? channel.connect(newInetSocketAddress("ftp.gnu.org",21), channel,newCompletionHandler() {? ? ? ? ? ? ? ? ? ? @Overridepublicvoidcompleted(Void result,
? ? ? ? ? ? ? ? ? ? ? ? AsynchronousSocketChannel attachment){? ? ? ? ? ? ? ? ? ? ? ? FTPClient2 client =newFTPClient2();? ? ? ? ? ? ? ? ? ? ? ? client.start(attachment);? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? @Overridepublicvoidfailed(Throwable exc,
? ? ? ? ? ? ? ? ? ? ? ? AsynchronousSocketChannel attachment){? ? ? ? ? ? ? ? ? ? ? ? exc.printStackTrace();? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? });? ? ? ? System.in.read();? ? }? ? AsynchronousSocketChannel channel;voidreadResponse(){? ? ? ? ByteBuffer buffer = ByteBuffer.allocateDirect(128);? ? ? ? read(buffer);? ? }voidread(ByteBuffer buffer){? ? ? ? channel.read(buffer, buffer, reader);? ? }// 使用內(nèi)部類接收讀操作完成通知? ? CompletionHandler reader =newCompletionHandler() {? ? ? ? @Overridepublicvoidcompleted(Integer result, ByteBuffer attachment){if(result >0) {intposition = attachment.position() -1;if(attachment.get(position -1) ==13&&? ? ? ? ? ? ? ? ? ? attachment.get(position) ==10) {if(isValidReply(attachment,0)) {? ? ? ? ? ? ? ? ? ? ? ? attachment.flip();? ? ? ? ? ? ? ? ? ? ? ? showReply(attachment);// 狀態(tài)邏輯尸闸,處理響應(yīng)onReply(getReplyCode(attachment,0));? ? ? ? ? ? ? ? ? ? }else{? ? ? ? ? ? ? ? ? ? ? ? removeLine(attachment, position -2);if(isValidReply(attachment,0)) {? ? ? ? ? ? ? ? ? ? ? ? ? ? attachment.flip();? ? ? ? ? ? ? ? ? ? ? ? ? ? showReply(attachment);? ? ? ? ? ? ? ? ? ? ? ? ? ? onReply(getReplyCode(attachment,0));? ? ? ? ? ? ? ? ? ? ? ? }elseread(attachment);? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }else{if(!attachment.hasRemaining())? ? ? ? ? ? ? ? ? ? ? ? removeLine(attachment, position -1);? ? ? ? ? ? ? ? ? ? read(attachment);? ? ? ? ? ? ? ? }? ? ? ? ? ? }else{? ? ? ? ? ? ? ? System.out.println("remote server closed");? ? ? ? ? ? }? ? ? ? }? ? ? ? ? @Overridepublicvoidfailed(Throwable exc, ByteBuffer attachment){? ? ? ? ? ? exc.printStackTrace();? ? ? ? }? ? };publicvoidstart(AsynchronousSocketChannel channel){this.channel = channel;? ? ? ? readResponse();? ? }protectedvoidonReply(intreplyCode){// 按照前面定義好的步驟彻亲,處理狀態(tài)邏輯if(replyCode ==220)? ? ? ? ? ? login();if(replyCode ==230)? ? ? ? ? ? writeCommand("size README");elseif(replyCode ==213)writeCommand("QUIT");elseif(replyCode ==221)try{? ? ? ? ? ? ? ? channel.close();? ? ? ? ? ? }catch(IOException e) {? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? }? ? }voidwriteCommand(Stringstring){? ? ? ? System.out.print("==>");? ? ? ? System.out.println(string);? ? ? ? ByteBuffer buffer = ByteBuffer.wrap((string+"\r\n").getBytes());? ? ? ? write(buffer);? ? }voidwrite(ByteBuffer buffer){? ? ? ? channel.write(buffer, buffer, writer);? ? }// 使用內(nèi)部類接收寫操作完成通知CompletionHandler writer =newCompletionHandler() {? ? ? ? @Overridepublicvoidcompleted(Integer result, ByteBuffer attachment){if(attachment.hasRemaining())? ? ? ? ? ? ? ? channel.write(attachment, attachment,this);elsereadResponse();? ? ? ? }? ? ? ? @Overridepublicvoidfailed(Throwable exc, ByteBuffer attachment){? ? ? ? ? ? ? exc.printStackTrace();? ? ? ? }? ? };protectedvoidlogin(){? ? ? ? String user ="user anonymous";? ? ? ? writeCommand(user);? ? }// 處理多行響應(yīng)protectedvoidremoveLine(ByteBuffer buffer,intposition){intlimit = buffer.position();bytec;while(position >=0) {? ? ? ? ? ? c = buffer.get(position);if(c ==13|| c ==10) {? ? ? ? ? ? ? ? showReply(buffer, position);? ? ? ? ? ? ? ? buffer.position(position +1);? ? ? ? ? ? ? ? buffer.limit(limit);? ? ? ? ? ? ? ? buffer.compact();break;? ? ? ? ? ? }? ? ? ? ? ? position--;? ? ? ? }? ? }// 針對多數(shù) FTP 服務(wù)器的響應(yīng)的偷懶的方法孕锄,不用費(fèi)勁處理 String。protectedvoidshowReply(ByteBuffer buffer){while(buffer.hasRemaining())? ? ? ? ? ? System.out.print((char) buffer.get());? ? }protectedvoidshowReply(ByteBuffer buffer,intposition){for(inti =0; i < position; i++)? ? ? ? ? ? System.out.print((char) buffer.get(i));? ? }publicstaticintgetReplyCode(ByteBuffer buffer,intstart){returnCharacter.digit(buffer.get(start),10) *100+? ? ? ? Character.digit(buffer.get(start +1),10) *10+ Character.digit(buffer.get(start +2),10);? ? }publicstaticbooleanisValidReply(ByteBuffer buffer,intstart){returnbuffer.get(start +3) ==32&&? ? ? ? Character.isDigit(buffer.get(start))? ? ? ? ? ? ? ? && Character.isDigit(buffer.get(start +1))? ? ? ? ? ? ? ? && Character.isDigit(buffer.get(start +2));? ? }publicstaticintfindCRLF(ByteBuffer buffer,intstart,intend){while(start < end) {if(buffer.get(start++) ==13) {if(start < end) {if(buffer.get(start) ==10) {returnstart +1;? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? }return-1;? ? } }
對比兩個(gè)代碼苞尝,可以發(fā)現(xiàn):修改后的代碼的 onReply 方法畸肆,與上文中描述的需求步驟基本上一模一樣。與使用阻塞模式編寫的代碼相比宙址,應(yīng)該更加簡潔轴脐。阻塞模式下,你至少需要一個(gè)控制循環(huán)曼氛。似乎有點(diǎn)快樂了豁辉。
繼續(xù) FTP 的編程,升華完成通知類型
因?yàn)樽x寫操作的使用遠(yuǎn)遠(yuǎn)多于其他類型的操作舀患,所以重點(diǎn)考慮如何處理讀寫操作徽级。 回顧前面的第二個(gè)例子中的 reader 和 writer 成員, 其實(shí)與對象編程理論和實(shí)踐中的一個(gè)很重要的原理“單一職責(zé)原理”比較吻合聊浅。 但是餐抢,如果需要寫很多的網(wǎng)絡(luò)程序,或者提供一個(gè)網(wǎng)絡(luò)編程的框架(雖然現(xiàn)在有不少低匙,例如:grizzly旷痕,JProactor),那么內(nèi)部類的方式顯然顯得局限顽冶。
重用欺抗?重用什么?如何重用强重?
完成讀寫操作的回調(diào)次數(shù)绞呈?
發(fā)送 1K 的數(shù)據(jù),到底需要幾次回調(diào)呢间景?鬼知道佃声。實(shí)際上鬼也不知道。
需要注意的是:AIO 讀寫操作并不保證操作一次全部完成倘要。單個(gè)讀寫操作請求可能收到多次完成通知
多數(shù)網(wǎng)絡(luò)應(yīng)用程序發(fā)送響應(yīng)或者請求消息圾亏,都需要將準(zhǔn)備好的緩沖區(qū)全部內(nèi)容發(fā)送出去》馀。可以預(yù)見志鹃,前面的 writer 內(nèi)部類成員可以獨(dú)立,改編為抽象的 Writer 類型泽西。這時(shí)候曹铃,前文中內(nèi)部類的隱式引用好處就會失去,而且誕生出新的回調(diào)接口尝苇。
BUG 铛只?太重復(fù)的勞動(dòng)
很多情況下都應(yīng)該檢查?CompletionHandler.completed?的 Integer 類型結(jié)果是否為 -1,看看是否通常已經(jīng)關(guān)閉糠溜。
if(result==-1) {? // 通道已經(jīng)關(guān)閉,執(zhí)行 onChannelClosed }else{? // 正常處理}
實(shí)際上 AIO 的內(nèi)核實(shí)現(xiàn)已經(jīng)對 result 是否等于 -1 做出了判斷,不知道基于何種考慮寝杖,completed 方法的 result 參數(shù)包含 -1 值酬凳。read() 方法的傳統(tǒng)嗎?這種處理直接導(dǎo)致人類的資源浪費(fèi):重復(fù)考慮這種判斷红柱,重復(fù)考慮判斷后的處理承匣。作者在 NIO2 的 dev list 中已經(jīng)提起“訴訟”,但是看樣子不果锤悄。
FTP 使用 telnet 協(xié)議的消息格式韧骗。消息以 <CRLF> 結(jié)束。 Telnet 協(xié)議家族的響應(yīng)消息基本上都使用“code<SPACE>message<CRLF>"零聚。
從處理 FTP 或者 telnet 協(xié)議家族的響應(yīng)消息來看袍暴,前文的 reader 成員應(yīng)該可以獨(dú)立,至少可以抽象一個(gè)專門用于讀取 Telnet 響應(yīng)的 TelnetReader 類型隶症。同樣政模,也誕生出新的回調(diào)接口。對于 Reader 類型蚂会,還可以想象幾種應(yīng)用模式:
讀取指定長度的數(shù)據(jù)淋样,SizeReader;
一直讀直到對方關(guān)閉通道胁住,EOFReader趁猴。
多數(shù)情況下讀操作都會去檢查讀到的數(shù)據(jù)長度是否為 -1,以檢測對方是否已經(jīng)關(guān)閉通道
這樣措嵌,對于 Reader 類型躲叼,某種程度的策略模式的應(yīng)用需求已經(jīng)浮現(xiàn)出來。
但是 Client 類型本身至少也可以實(shí)現(xiàn)一種類型的 CompletionHandler企巢。如果這樣枫慷,將產(chǎn)生一個(gè)爭論:繼承還是委托? 很多情況下這實(shí)際上是口味的問題浪规,并非優(yōu)劣的選擇或听。
同時(shí),對于讀寫操作而言笋婿,CompletionHandler 的類型是確定的 Integer 類型誉裆,似乎增加一個(gè)新的派生接口 Callback<T> 更加滿足需要。
新的讀寫操作回調(diào)接口
publicinterfaceCallbackextendsCompletionHandler{@Overridevoidcompleted(Integer result, T context);@Overridevoidfailed(Throwable cause, T context);? }
除上述考慮之外缸濒,最重要的一點(diǎn)是足丢,有狀態(tài)還是無狀態(tài)粱腻。CompletionHandler 或者 Callback 接口本身無狀態(tài)可言,但其實(shí)現(xiàn)存在有無狀態(tài)的選擇斩跌。AIO 內(nèi)核并不關(guān)心 CompletionHandler 的 attachment 參數(shù)绍些,內(nèi)核不會使用也不會施加任何限制。但是實(shí)現(xiàn)類則大不同耀鸦。有狀態(tài)和無狀態(tài)的設(shè)計(jì)將直接影響到 attachment 參數(shù)的使用柬批。如您所看見,Callback 接口已經(jīng)將 attachment 參數(shù)更名為 context袖订。同時(shí)氮帐,因?yàn)?AsynchronousChannel 都需要?ByteBuffer?,attachment 的使用也必須考慮 ByteBuffer 的使用方式洛姑。對于每一個(gè)讀寫操作而言上沐,有三個(gè)因素是必須考慮的:AsynchronousChannel,ByteBuffer楞艾,attachment奄容。普通應(yīng)用程序也好,還是框架产徊,實(shí)際上只考慮一個(gè)問題昂勒,就是如何組合這三個(gè)因素。某種程度上說舟铜,AIO 編程其實(shí)是?attachment?編程戈盈,實(shí)不為過。怎一個(gè)痛字了得谆刨!
與此同時(shí)塘娶,因?yàn)檎Q生新的回調(diào)接口,預(yù)示著 Client 的層次在不斷增加痊夭,也意味著 Client 的職責(zé)在進(jìn)行分化刁岸。某些網(wǎng)絡(luò)應(yīng)用框架中的 filter 類型與此類似。
在沒有更好的方案的時(shí)候她我,作者選擇有狀態(tài)方式的設(shè)計(jì)虹曙。
簡單的有狀態(tài)寫操作類型
publicclassBufferWriterimplementsCallback{privateAsynchronousSocketChannel channel;privateByteBuffer buffer;privateCharset charset;publicBufferWriter(AsynchronousSocketChannel channel, Charset charset){this.channel = channel;this.charset = charset;? }publicvoidwrite(String string, WriteCallback write){? buffer = ByteBuffer.wrap(string.getBytes(charset));? channel.write(buffer, write,this);? }@Overridepublicvoidcompleted(Integer result, WriteCallback context){if(buffer.hasRemaining())? channel.write(buffer, context,this);else{? buffer =null;? context.writeCompleted();? }? }@Overridepublicvoidfailed(Throwable cause, WriteCallback context){? buffer =null;? context.writeFailed(cause);? }? }
抽象讀操作模板類型
publicabstractclassAbstractReadCallbackimplementsCallback{protectedabstractvoidreadCompleted(Integer result, T context);protectedabstractvoidonChannelClose(T context);@Overridepublicvoidcompleted(Integer result, T context){// 重新分發(fā)回調(diào)通知if(result >0)? readCompleted(result, context);elseonChannelClose(context);? }? }
簡單的有狀態(tài)讀操作類型
publicclassTelnetReplyReaderextendsAbstractReadCallback>{privateAsynchronousSocketChannel channel;privateCharsetDecoder decoder;// 簡單的 ByteBuffer 工廠,來自 JDK 的 corba 中的實(shí)現(xiàn)privateByteBufferPool pool;privateByteBuffer buffer;//FTP 響應(yīng)數(shù)據(jù)對象privateReply reply =newReply();publicTelnetReplyReader(AsynchronousSocketChannel channel,
? ? ByteBufferPool pool, Charset charset){this.channel = channel;this.pool = pool;? ? ? ? decoder = charset.newDecoder();? ? }publicvoidread(ResponseCallback<Reply> protocol){? ? ? ? reply.reset();if(buffer ==null)? ? ? ? ? ? buffer = pool.get(1024);? ? ? ? buffer.clear();? ? ? ? channel.read(buffer, protocol,this);? ? }@OverrideprotectedvoidonChannelClose(ResponseCallback<Reply> context){try{? ? ? ? ? ? channel.close();? ? ? ? }catch(IOException e) {// ignore; }// 轉(zhuǎn)換為特定的異常類型failed(newClosedChannelException(), context);? ? }@OverrideprotectedvoidreadCompleted(Integer result,ResponseCallback<Reply> context){? ? ? ? ByteBuffer buffer =this.buffer;try{// 響應(yīng)代碼的處理邏輯番舆,直到獲得有效的響應(yīng)代碼酝碳,否則哭到長城intposition = buffer.position();if(buffer.get(position -2) ==13&&? ? ? ? ? ? ? ? buffer.get(position -1) ==10) {// Yes check reply code; if(findReplyCode(buffer, position -2)) {// buffer position at the code first char; intfirst = buffer.position();? ? ? ? ? ? ? ? ? ? reply.code = getReplyCode(buffer, first);if(first >0) {? ? ? ? ? ? ? ? ? ? ? ? buffer.flip();? ? ? ? ? ? ? ? ? ? ? ? reply.other.append(decoder.decode(buffer));? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? buffer.limit(position -2);? ? ? ? ? ? ? ? ? ? buffer.position(first +4);? ? ? ? ? ? ? ? ? ? reply.message = decoder.decode(buffer).toString();? ? ? ? ? ? ? ? ? ? returnBuffer();? ? ? ? ? ? ? ? ? ? context.onResponse(reply);return;? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? buffer.flip();? ? ? ? ? ? ? ? reply.other.append(decoder.decode(buffer));? ? ? ? ? ? ? ? buffer.clear();? ? ? ? ? ? ? ? channel.read(buffer, context,this);return;? ? ? ? ? ? }// No reply code, consider cache other message if(buffer.hasRemaining()) {? ? ? ? ? ? ? ? channel.read(buffer, context,this);return;? ? ? ? ? ? }// Have to cache some message, but may be have reply code, so just check CRLF;intindex = findLF(buffer, position -2);if(index == -1) {? ? ? ? ? ? ? ? buffer.flip();? ? ? ? ? ? ? ? reply.other.append(decoder.decode(buffer));? ? ? ? ? ? }else{? ? ? ? ? ? ? ? buffer.position(0).limit(index +1);? ? ? ? ? ? ? ? reply.other.append(decoder.decode(buffer));? ? ? ? ? ? ? ? buffer.position(index);? ? ? ? ? ? }? ? ? ? ? ? buffer.limit(position);? ? ? ? ? ? buffer.compact();? ? ? ? ? ? channel.read(buffer, context,this);? ? ? ? }catch(CharacterCodingException ex) {? ? ? ? ? ? failed(ex, context);? ? ? ? }? ? }@Overridepublicvoidfailed(Throwable cause, ResponseCallback<Reply> context){? ? ? ? returnBuffer();? ? ? ? context.failed(cause);? ? }privatevoidreturnBuffer(){? ? ? ? pool.releaseBuffer(buffer);? ? ? ? buffer =null;? ? } ...
使用有狀態(tài)讀寫操作類型的控制類
publicclassFTPClientimplementsResponseCallback,WriteCallback,CommandProvider{privateTelnetReplyReader reader;privateBufferWriter writer;privateSemaphore semaphore =newSemaphore(0);// 傳輸通道的處理環(huán)境privateTransferContext transferContext;protectedvoidstart(Context context, AsynchronousSocketChannel channel){? InetSocketAddress remote;try{? remote = (InetSocketAddress) channel.getRemoteAddress();? }catch(IOException e) {? failed(e);return;? }? InetSocketAddress local;try{? local = (InetSocketAddress) channel.getLocalAddress();? }catch(IOException e) {? failed(e);return;? }? Charset charset = Charset.forName("UTF-8");? reader =newTelnetReplyReader(channel, context.pool(), charset);? writer =newBufferWriter(channel, charset);// 發(fā)起讀操作請求reader.read(this);// 同時(shí),預(yù)備傳輸通道環(huán)境transferContext =newSimpleTransferContext(context,? remote.getAddress(), local.getAddress());? }@OverridepublicvoidonResponse(Reply reply){// 簡單的響應(yīng)處理邏輯try{? transferContext.check(reply);? }catch(Throwable ex) {? ex.printStackTrace();? }// If reply not process right, just pending any advance operation. if(reply.code /100==1)? reader.read(this);elsesemaphore.release();? }@OverridepublicvoidwriteCompleted(){//FTP 規(guī)則恨狈,發(fā)出請求命令后疏哗,開始等待對方的響應(yīng)reader.read(this);? }? ...
除協(xié)議相關(guān)的部分代碼,其余的看上去還蠻簡單禾怠,似乎抽象 Reader 和 Writer 的代價(jià)值得的返奉。上面代碼中的 Context贝搁,Reply 等小類型,可以在完整的源代碼中檢查芽偏。
繼續(xù) FTP徘公,處理傳輸通道
本文之所以選擇 FTP 作為 AIO 的實(shí)踐例子,F(xiàn)TP 的控制通道必須協(xié)調(diào)單獨(dú)的數(shù)據(jù)傳輸通道哮针。不僅如此,使用 Port 方式的話坦袍,客戶端程序還需要建立一個(gè)簡單的網(wǎng)絡(luò)服務(wù)器十厢。
關(guān)于防火墻
本文的編程環(huán)境是 Windows7,由于防火墻的原因捂齐,忽略服務(wù)器方式蛮放。 如果網(wǎng)絡(luò)程序出現(xiàn)故障,防火墻是否為問題的根源奠宜,可以優(yōu)先考慮包颁。 作者在實(shí)踐過程中,曾經(jīng)遭遇到這樣的問題压真。Windows 7 自帶的防火墻識別 FTP 的 PASV 命令娩嚼,并且阻止該命令的執(zhí)行。 而且滴肿,AIO 的內(nèi)核實(shí)現(xiàn)使用轉(zhuǎn)換后系統(tǒng)錯(cuò)誤消息作為異常消息岳悟,會讓你痛的哭。但是泼差,請放心贵少,長城不會倒。
上文中堆缘,我們盡量回避建立網(wǎng)絡(luò)連接的 CompletionHandler 的再處理問題滔灶。FTP 的數(shù)據(jù)傳輸通道
要么使用服務(wù)器方式,使用 AsynchronousServerSocketChannel.accept 方法
要么使用客戶端方式吼肥,使用 AsynchronousSocketChannel.connect 方法录平,與前文類似
延續(xù)上文的處理思路,繼續(xù)抽象用于 connect 的 CompletionHandler 類型缀皱。與前不同的是萄涯,該連接回調(diào)類型使用無狀態(tài)方式設(shè)計(jì)。該例演示下載文件的處理唆鸡。
無狀態(tài)類型的連接回調(diào)類型
publicclassSocketConnectorimplementsConnector {publicvoidconnect(InetSocketAddress remote, ConnectionCallback client)throwsIOException {// 創(chuàng)建新的異步網(wǎng)絡(luò)通道AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();// 無狀態(tài)方式處理涝影,將所有需要的參數(shù)打包為單個(gè) attachment 參數(shù)Object[] attachment = { client, remote, channel };// 啟動(dòng)連接操作channel.connect(remote, attachment,this);? }publicvoidconnect(InetSocketAddress remote, InetSocketAddress local,? ConnectionCallback client)throwsIOException {? AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();// 綁定本地網(wǎng)絡(luò)地址,對于客戶端而言争占,通常是 IP燃逻,對于服務(wù)器而言序目,一定需要端口號channel.bind(local);? Object[] attachment = { client, remote, channel };? channel.connect(remote, attachment,this);? }@Overridepublicvoidcompleted(Void result, Object[] attachment) {// 連接完成,通知 Client 啟動(dòng)協(xié)議控制邏輯((ConnectionCallback) attachment[0]).? start((AsynchronousSocketChannel) attachment[2]);? }@Overridepublicvoidfailed(Throwable cause, Object[] attachment) {? ((ConnectionCallback) attachment[0]).? connectFailed(newException(attachment[1].toString(), cause));? }? }
但是伯襟,有些 FTP 服務(wù)器要求數(shù)據(jù)傳輸通道必須使用與控制通道相同的 ip 地址猿涨,導(dǎo)致連接必須知道并保持控制通道的 ip 地址。唉姆怪!又痛到有狀態(tài)方式了叛赚。
有狀態(tài)類型的連接回調(diào)類型
publicclassTransferConnectorextendsSocketConnector{privateInetAddress localAddress;privateInetAddress remoteAddress;publicTransferConnector(InetAddress remoteAddress, InetAddress localAddress){this.remoteAddress = remoteAddress;this.localAddress = localAddress;? ? ? }protectedInetSocketAddresscreateRemoteAddress(intport){returnnewInetSocketAddress(remoteAddress, port);? ? ? }protectedInetSocketAddresscreateLocalAddress(){returnnewInetSocketAddress(localAddress,0);? ? ? }publicvoidconnect(intport, ConnectionCallback client)throwsIOException{if(port <1)thrownewIOException("Error remote server port number: "+ port);super.connect(createRemoteAddress(port), createLocalAddress(), client);? ? ? }? }
使用連接回調(diào)類型建立數(shù)據(jù)傳輸通道
publicclassSimpleTransferContextimplementsTransferContext,ConnectionCallback,FileLockCallback{? ? ...// 使用單獨(dú)的傳輸連接回調(diào)對象再次進(jìn)行連接完成通知connector =newTransferConnector(remoteAddress, localAddress);? ...// 發(fā)起傳輸通道的連接操作請求caseRETR:? connector.connect(port,this);// clear for next time port =0;? ...// 傳輸通道連接完成 @Overridepublicvoidstart(AsynchronousSocketChannel channel){this.channel = channel;? }
因?yàn)樯婕暗轿募奶幚恚現(xiàn)TP 的數(shù)據(jù)傳輸通道起始控制看起來相當(dāng)簡單稽揭“掣剑快樂其實(shí)是很簡單的東西。
繼續(xù) FTP溪掀,使用 AIO 的異步文件操作
AsynchronousFileChannel 沒有 connect 方法事镣,但是有一個(gè)類似的方法 lock。JDK7 中該方法的聲明如下:
異步文件通道的 lock 方法 API
無狀態(tài)的文件連接回調(diào)類型
publicclassFileLockerimplementsCompletionHandler{publicvoidlock(String filename,longposition,longsize,booleanshared, FileLockCallback client,? ? ? ? ? OpenOption... options)throwsIOException{// 使用新的 AIO 中的 Path API Path path = Paths.get(filename);// 創(chuàng)建異步文件通道對象AsynchronousFileChannel file = AsynchronousFileChannel.open(path, options);// 鎖定要寫的區(qū)域file.lock(position, size, shared, client,this);? ? ? }@Overridepublicvoidcompleted(FileLock result, FileLockCallback attachment){// 文件鎖(或者文件連接)完成通知傳輸通道環(huán)境可以工作attachment.start(result);? ? ? }@Overridepublicvoidfailed(Throwable cause, FileLockCallback attachment){? ? ? ? attachment.lockFailed(cause);? ? ? } }
使用文件連接回調(diào)類型建立文件通道
publicclassSimpleTransferContextimplementsTransferContext,ConnectionCallback,FileLockCallback{ ...@Overridepublicvoidstart(FileLock fileLock){this.fileLock = fileLock;// at here socket channel already prepared // 啟動(dòng)下載過程startDownload();? ? }privateDownloader download;privatevoidstartDownload(){? ? ? ? download =newDownloader(context, channel, fileLock, size);? ? ? ? channel =null;? ? ? ? fileLock =null;? ? ? ? download.run();? ? }publicvoidcheck(Reply reply){if(currentCommand ==null) {? ? ? ? ? ? System.out.println(reply);return;? ? ? ? }intcode = reply.code;? ? ? ? String message = reply.message;switch(currentCommand) {caseSIZE...caseRETR:if(code ==150) {// 150 Opening BINARY mode data connection for README (1765 bytes).intend = message.lastIndexOf(')');if(end != -1) {intstart = message.lastIndexOf('(', end -1);if(start != -1) {//RETR 命令響應(yīng)正確揪胃,檢查本地文件璃哟,預(yù)備下載lockFile(checkSize(message.substring(start +1, end -6)));break;? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ...? ? }protectedvoidlockFile(longsize){try{? ? ? ? ? ? locker.lock(filename,0, size,false,this,? ? ? ? ? ? ? ? StandardOpenOption.CREATE,? ? ? ? ? ? ? ? StandardOpenOption.READ,? ? ? ? ? ? ? ? StandardOpenOption.WRITE);? ? ? ? }catch(IOException e) {? ? ? ? ? ? e.printStackTrace();? ? ? ? }? ? }
當(dāng) FTP 的 RETR 命令正確響應(yīng)后,準(zhǔn)備下載文件喊递。首先準(zhǔn)備好要寫入的本地文件通道随闪,鎖住文件。 文件鎖完成后骚勘,創(chuàng)建新的 Downloader 對象蕴掏,開始真正的下載操作。
使用文件連接回調(diào)類型建立文件通道
publicabstractclassTransfer{// 用于 Socket 和 File 讀寫操作使用的 ByteBuffer 的交換隊(duì)列protectedBlockingDeque bufferQueue =newLinkedBlockingDeque();protectedContext context;publicTransfer(Context context){this.context = context;? }publicByteBuffergetBuffer(intsize){returncontext.pool().get(size);? }protectedvoidreleaseBuffer(ByteBuffer buffer){? context.pool().releaseBuffer(buffer);? }? }
下載實(shí)現(xiàn)调鲸,讀和寫
publicclassDownloaderextendsTransferimplementsReadCallback,FileWriteCallback2,Runnable{// 讀入指定長度內(nèi)容的回調(diào)對象盛杰,處理網(wǎng)絡(luò)內(nèi)容privateSizeReader reader;// 寫入指定長度內(nèi)容的回調(diào)對象,處理文件內(nèi)容privateFileWriter2 writer;privateAtomicBoolean writable =newAtomicBoolean(true);// 用于顯示網(wǎng)絡(luò)數(shù)據(jù)傳輸速率的工具privateConsoleProgress progress =newConsoleProgress();publicDownloader(Context context, AsynchronousSocketChannel socket,? FileLock fileLock,longsize){super(context);? reader =newSizeReader(socket, size,this);? writer =newFileWriter2(fileLock,this);? progress.reset(size);? }@Overridepublicvoidrun(){? reader.read();? }@OverridepublicvoidwriteCompleted(ByteBuffer buffer){// 一個(gè)緩沖區(qū)寫入文件完畢releaseBuffer(buffer);? buffer = bufferQueue.poll();if(buffer !=null)// 如果網(wǎng)絡(luò)已經(jīng)讀好一個(gè)緩沖區(qū)藐石,繼續(xù)寫入文件writer.write(buffer);else// 否則清除寫狀態(tài)writable.set(true);? }@OverridepublicvoidreadCompletedBytes(Integer bytes,longstart,longend){// 顯示網(wǎng)絡(luò)傳輸進(jìn)度progress.update(bytes, start, end);? progress.run();? }@OverridepublicvoidcompletedReadBuffer(ByteBuffer buffer){if(writable.compareAndSet(true,false)) {// 從網(wǎng)絡(luò)下載了一個(gè)緩沖區(qū)的內(nèi)容即供,如果寫文件空閑,通知寫文件writer.write(buffer);? }else{// 如果文件正在寫于微,將當(dāng)前緩沖區(qū)放入后備隊(duì)列bufferQueue.offer(buffer);? }? }@OverridepublicvoidwriteCompleted(){? System.out.println("file saved OK");? }@OverridepublicvoidreadCompleted(){? System.out.println("file transfer OK");? }? ...
Downloader 使用 SizeReader 讀取網(wǎng)絡(luò)數(shù)據(jù)逗嫡。SizeReader 使用自主的緩沖區(qū)申請,不需要調(diào)用者傳遞 ByteBuffer 參數(shù)株依。
Downloader 使用 FileWriter2 寫文件內(nèi)容驱证。FileWriter2 使用一次性寫完外部傳遞的緩沖區(qū)的策略。 需要調(diào)用者傳遞 ByteBuffer 參數(shù)恋腕。
限于篇幅抹锄,具體實(shí)現(xiàn)可以在源代碼中檢查。
總結(jié)
線程池和 Group
前文提到到 group,但是沒有解釋伙单。group 指 AsynchronousChannelGroup获高,用于管理異步通道資源的環(huán)境對象,封裝一個(gè)處理 I/O 完成的機(jī)制吻育。 這個(gè)組對象關(guān)聯(lián)一個(gè)線程池念秧。可以將處理 I/O 事件的任務(wù)提交到這個(gè)線程池布疼,通過 channel 的 read摊趾,write,connect 等方法進(jìn)行游两。線程池中的工作線程將會帶著 channel 上 I/O 操作結(jié)果調(diào)用?CompletionHandler.complete?方法砾层。除了處理 I/O 事件,組關(guān)聯(lián)的線程池可能會執(zhí)行其他與 I/O 操作相關(guān)的任務(wù)器罐。這個(gè) group 對象相當(dāng)于 Proactor 模式中 Dispatcher。
四種異步通道的 open 方法可以指定 group 參數(shù)渐行,或者不指定轰坊。 每個(gè)異步通道都必須關(guān)聯(lián)一個(gè)組,要么是系統(tǒng)默認(rèn)組祟印,要么是創(chuàng)建的一個(gè)特定的組肴沫。例如,不能直接從一個(gè) socket 對象上創(chuàng)建一個(gè) AsynchronousSocketChannel蕴忆。 如果不使用 group 參數(shù)颤芬,java 使用一個(gè)默認(rèn)的系統(tǒng)范圍的組對象。系統(tǒng)默認(rèn)的組對象的線程池參數(shù)可以使用兩個(gè)屬性進(jìn)行配置:
java.nio.channels.DefaultThreadPool.threadFactory 默認(rèn)組對象不會將其關(guān)聯(lián)的線程池中的線程進(jìn)行額外的配置套鹅,因此站蝠,這些線程都是 daemon 線程。
java.nio.channels.DefaultThreadPool.initialSize: 處理 I/O 事件的最大線程數(shù)量卓鹿。
是否使用自定義的 group 對象菱魔,各有優(yōu)劣,由你決定吟孙。
使用 group澜倦,好處是你可以將文件通常與網(wǎng)絡(luò)通道分開,避免線程干擾杰妓。缺點(diǎn)是:使用者通常必須負(fù)責(zé)關(guān)閉組藻治,多數(shù)時(shí)候取決于使用的現(xiàn)成工廠類型。組與 ExecutorService 類似巷挥,這意味著關(guān)閉過程通常是兩步關(guān)閉方法桩卵。 在多層次 Client 結(jié)構(gòu)(例如 FTP 的控制通道需要衍生新的數(shù)據(jù)傳輸通道)中,如果要使用 group,很討厭的一點(diǎn)就是 group 參數(shù)傳遞吸占。沒有環(huán)境編程之類的工具進(jìn)行輔助的話晴叨,使用者必須考慮如何有效傳遞 group 參數(shù)。
不使用 group矾屯,最大的好處是不用傳遞 group 參數(shù)兼蕊。缺點(diǎn)是:必須注意處理非 daemon 線程的完成和退出,不小心的話件蚕,將會導(dǎo)致異步通道的工作丟失孙技;同時(shí)還需要處理線程工廠和最大線程數(shù)的配置。
*PendingException 和 AsynchronousChannel
AsynchronousChannel 設(shè)計(jì)為線程安全的排作,即可以同時(shí)進(jìn)行讀寫操作牵啦,全雙工模式操作。不少協(xié)議使用半雙工模式妄痪。讀完寫或者寫完讀哈雏。什么時(shí)候會進(jìn)行并發(fā)訪問 AsynchronousChannel,即使用全雙工模式衫生?主要看協(xié)議的實(shí)現(xiàn)裳瘪。例如 FTP 的 abort 命令,要求可以控制連接可以同時(shí)進(jìn)行讀寫罪针。數(shù)據(jù)連接在進(jìn)行文件傳輸?shù)臅r(shí)候彭羹,控制連接等待服務(wù)器響應(yīng)。實(shí)際上此時(shí)也可以進(jìn)行寫操作泪酱,發(fā)送一個(gè) abort 命令派殷,促使數(shù)據(jù)傳輸過程中斷。這個(gè) abort 可以從 UI 線程或者從 UI 事件產(chǎn)生的線程中發(fā)出墓阀。雖然如此毡惜,但是不少系統(tǒng)實(shí)現(xiàn)最多只允許一個(gè)寫操作和一個(gè)讀操作。如果一個(gè)讀寫操作沒有完成斯撮,程序又發(fā)送一個(gè)讀寫操作命令虱黄,則導(dǎo)致 ReadPendingException 或者 WritePendingException。如果你的程序非要這樣的話吮成,只有一個(gè)解決辦法橱乱,將讀寫操作的命令使用隊(duì)列排隊(duì)進(jìn)行。通常應(yīng)該不會出現(xiàn)這種需求粱甫,如果有的話泳叠,很有可能是設(shè)計(jì)上的缺陷。
讀寫超時(shí)茶宵。AsynchronousChannel 的讀寫操作可以指定超時(shí)參數(shù)危纫,但是超時(shí)發(fā)生之后,傳遞給讀寫操作的 ByteBuffer 參數(shù)不應(yīng)該向正常讀寫完成一樣進(jìn)行處理。通常設(shè)計(jì)如果超時(shí)發(fā)生种蝶,一般應(yīng)該丟棄當(dāng)前期望數(shù)據(jù)結(jié)果契耿。
ByteBuffer 和解碼
AIO 鼓勵(lì)使用 DirectByteBuffer。就算應(yīng)用程序代碼中不使用 DirectByteBuffer螃征,AIO 內(nèi)核實(shí)現(xiàn)也會使用 DirectByteBuffer 來復(fù)制外部傳入的 HeadByteBuffer 內(nèi)容搪桂。在某些情況下完全可以利用這一特征,偷懶而不會有損失盯滚。例如:傳輸協(xié)議中發(fā)送普通命令踢械,完全可以不使用 DirectByteBuffer,這些命令的提供通常以 String 類型出現(xiàn)魄藕,而 String 到 DirectByteBuffer 無論如何必須經(jīng)過兩個(gè)步驟: String--byte[]--DirectByteBuffer. 第二步完全可以由 AIO 內(nèi)核進(jìn)行内列。
如果需要從 DirectByteBuffer 解碼到 String,有選擇余地:
使用 Decoder 和 CharBuffer:DirectByteBuffer--CharBuffer--(char[])String背率。
使用 String 和 byte[]:DirectByteBuffer--byte[]--(char[])String
可以看出话瞧,這種情況數(shù)組復(fù)制的工作量不小。如果沒有使用 Javolution 方式的棧內(nèi)存分配和對象工廠寝姿,其實(shí)沒有什么區(qū)別交排。
關(guān)于性能
Java 已經(jīng)不少的 NIO 類型的框架, 這里有個(gè)很有意思:“?Announcement: Java NIO Framework?” “也許您想要確認(rèn)某些:?performance comparison: nio v nio2?” 從第二個(gè)例子可以看出会油,使用 AIO 方式進(jìn)行有時(shí)候出奇的簡單个粱,真讓人快樂古毛。 本文提供的 FTPClient 的例子 main 演示了單個(gè)目標(biāo)下載翻翩,在測試過程中與 c 語言實(shí)現(xiàn)的 wget 比較毫不遜色。 簡單修改一下就可以執(zhí)行多個(gè)目標(biāo)下載稻薇,應(yīng)該更快樂嫂冻。