IO一直是軟件開(kāi)發(fā)中的核心部分之一昏翰,而隨著互聯(lián)網(wǎng)技術(shù)的提高苍匆,IO的重要性也越來(lái)越重刘急。縱觀開(kāi)發(fā)界浸踩,能夠巧妙運(yùn)用IO叔汁,不但對(duì)于公司,而且對(duì)于開(kāi)發(fā)人員都非常的重要检碗。Java的IO機(jī)制也是一直在不斷的完善据块,以應(yīng)對(duì)日見(jiàn)增多的流量。
一.JavaIO的方式
????第一折剃,傳統(tǒng)java.io包提供了諸如File的抽象另假,輸入,輸出流怕犁。交互方式是同步边篮,阻塞;
第二奏甫,在java1.4中引入NIO框架(java.nio包)戈轿,提供了Channel,Selector阵子,Buffer等抽象思杯,構(gòu)建多路復(fù)用的,同步非阻塞IO挠进,同時(shí)提供了更接近操作系統(tǒng)底層的高性能數(shù)據(jù)操作方式色乾;
第三颖杏,在Java 7中匪傍,NIO有了進(jìn)一步的改進(jìn),也就是NIO 2费变。其引入了異步非阻塞IO方式攘须,或者說(shuō)AIO(Asynchronous IO)漆撞。異步IO基于事件和回調(diào)機(jī)制。
二.傳統(tǒng)BIO
????傳統(tǒng)BIO于宙,采用BIO編程通信模型的服務(wù)端浮驳,通常由一個(gè)獨(dú)立的Acceptor線程負(fù)責(zé)監(jiān)聽(tīng)客戶端的連接。它接收到客戶端連接請(qǐng)求之后為每個(gè)客戶端創(chuàng)建一個(gè)新的線程進(jìn)行鏈路處理捞魁,處理完成后至会,通過(guò)輸出流返回給對(duì)應(yīng)的客戶端,然后銷毀線程谱俭。
服務(wù)端提供IP地址和監(jiān)聽(tīng)的端口奉件,客戶端通過(guò)TCP的三次握手與之連接宵蛀,連接成功后,雙方通過(guò)套接字通信县貌。
ServerSocket負(fù)責(zé)綁定IP地址术陶,啟動(dòng)監(jiān)聽(tīng)端口;Socket負(fù)責(zé)發(fā)起鏈接操作煤痕。連接成功梧宫,雙方通過(guò)輸入和輸出流進(jìn)行同步阻塞式通信“诘铮“你來(lái)了塘匣,我才往”的交流方式。
Java.net提供的API巷帝,如Socket忌卤,ServerSocket,HttpURLConnection也歸與同步阻塞IO類庫(kù)楞泼,因?yàn)榫W(wǎng)絡(luò)通信同樣是IO行為驰徊。
優(yōu)點(diǎn):代碼簡(jiǎn)單,直觀现拒,維護(hù)方便辣垒。
缺點(diǎn):IO效率極低,影響性能印蔬,卻反彈性處理勋桶。
當(dāng)吞吐量增大了,有妙招:線程池侥猬。
使用線程池管理線程例驹,避免頻繁創(chuàng)建,銷毀線程的開(kāi)銷退唠。但是鹃锈,其底層使用的依然是同步阻塞IO,通常被稱為“偽異步IO模型”瞧预,真的是治標(biāo)不治本屎债。
比如tomcat 采用的傳統(tǒng)的BIO(同步阻塞IO模型)+ 線程池 模式: 這個(gè)模式適合活動(dòng)連接數(shù)不是特別高的(連接<1000)
這個(gè)模式是每個(gè)連接每個(gè)線程,之所以用多線程垢油, 主要原因是在socket.accept(),socket.read(),socket.wirte() 三個(gè)函數(shù)都是同步阻塞的盆驹, 當(dāng)一個(gè)連接在處理IO的時(shí)候, 系統(tǒng)是阻塞的滩愁,如果是單線程的話系統(tǒng)必然死掉躯喇, 如果是單線程的話,對(duì)于多核cpu硝枉,cpu的資源沒(méi)有得到很好的利用廉丽,所以采用線程池的模式倦微,這樣線程的創(chuàng)建和回收成本相對(duì)較低;
如果對(duì)十萬(wàn)甚至百萬(wàn)級(jí)連接的時(shí)候正压,傳統(tǒng)的BIO模型是無(wú)能為力的欣福, 因?yàn)锽IO有幾個(gè)問(wèn)題:
- 線程的創(chuàng)建和銷毀成本很高,在Linux這樣的操作系統(tǒng)中蔑匣,線程本質(zhì)上就是一個(gè)進(jìn)程劣欢。創(chuàng)建和銷毀都是重量級(jí)的系統(tǒng)函數(shù)棕诵;
- 線程本身占用較大內(nèi)存裁良,像Java的線程棧,一般至少分配512K~1M的空間校套,如果系統(tǒng)中的線程數(shù)過(guò)千价脾,恐怕整個(gè)JVM的內(nèi)存都會(huì)被吃掉一半;
- 線程的切換成本是很高的笛匙。操作系統(tǒng)發(fā)生線程切換的時(shí)候侨把,需要保留線程的上下文,然后執(zhí)行系統(tǒng)調(diào)用妹孙。如果線程數(shù)過(guò)高秋柄,可能執(zhí)行線程切換的時(shí)間甚至?xí)笥诰€程執(zhí)行的時(shí)間,這時(shí)候帶來(lái)的表現(xiàn)往往是系統(tǒng)load偏高蠢正、CPU sy使用率特別高(超過(guò)20%以上)骇笔,導(dǎo)致系統(tǒng)幾乎陷入不可用的狀態(tài);
- 容易造成鋸齒狀的系統(tǒng)負(fù)載嚣崭。因?yàn)橄到y(tǒng)負(fù)載是用活動(dòng)線程數(shù)或CPU核心數(shù)笨触,一旦線程數(shù)量高但外部網(wǎng)絡(luò)環(huán)境不是很穩(wěn)定,就很容易造成大量請(qǐng)求的結(jié)果同時(shí)返回雹舀,激活大量阻塞線程從而使系統(tǒng)負(fù)載壓力過(guò)大芦劣;
概念:
Socket又稱“套接字”,應(yīng)用程序通常通過(guò)“套接字”向網(wǎng)絡(luò)發(fā)出請(qǐng)求或者應(yīng)答網(wǎng)絡(luò)請(qǐng)求说榆。
Socket和ServerSocket類庫(kù)位于java.net包中虚吟,serverSocket用于服務(wù)器端,Socket是建立網(wǎng)絡(luò)連接時(shí)使用的签财。在連接成功時(shí)串慰,應(yīng)用程序兩端都會(huì)產(chǎn)生一個(gè)Socket實(shí)例,操作這個(gè)實(shí)例荠卷,完成所需的會(huì)話模庐,對(duì)于一個(gè)網(wǎng)絡(luò)連接來(lái)說(shuō),套接字是平等的油宜,不因?yàn)樵诜?wù)器端或在客戶端而產(chǎn)生不同的級(jí)別掂碱,不管是Socket還是ServerSocket它們的工作都是通過(guò)SocketImpl類及其子類完成的
套接字之間鏈接過(guò)程分為四步:
- 服務(wù)器監(jiān)聽(tīng)
服務(wù)器監(jiān)聽(tīng):是服務(wù)端套接字并不定位具體的客戶端套接字怜姿,而是處于等待連接的狀態(tài),實(shí)時(shí)監(jiān)控網(wǎng)絡(luò)的狀態(tài) - 客戶端請(qǐng)求服務(wù)器
客戶端請(qǐng)求:是指由客戶端的套接字提出連接請(qǐng)求疼燥,要連接的目標(biāo)是服務(wù)器端的套接字沧卢。為此,客戶端的套接字必須首先描述它要連接的服務(wù)器的套接字醉者,指出服務(wù)器套接字的地址和端口號(hào)但狭,然后就想服務(wù)器端套接字提出連接請(qǐng)求 - 服務(wù)器確認(rèn)
服務(wù)器端連接確認(rèn),是指當(dāng)服務(wù)器端套接字監(jiān)聽(tīng)到或者說(shuō)接受到客戶端套接字的連接請(qǐng)求撬即,他就響應(yīng)客戶端套接字的請(qǐng)求立磁,建立一個(gè)新的線程,把服務(wù)器端套接字的描述發(fā)給客戶端剥槐。 - 客戶端進(jìn)行通信
客戶端連接確認(rèn):一旦客戶端確認(rèn)了此描述唱歧,連接就建立好了,雙方開(kāi)始通信粒竖。而服務(wù)器端套接字繼續(xù)處于監(jiān)聽(tīng)狀態(tài)颅崩,繼續(xù)接受其他客戶端套接字的連接請(qǐng)求
代碼示例:
ServerHandler方法
public class ServerHandler implements Runnable{
private Socket socket ;
public ServerHandler(Socket socket){
this.socket = socket;
}
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
out = new PrintWriter(this.socket.getOutputStream(),true);
String body = null;
while(true){
body = in.readLine();
if(body == null) break;
System.out.println("Server :"+ body);
out.println("服務(wù)器端回送響的應(yīng)數(shù)據(jù).");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if(in != null){
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(out != null){
try {
out.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if(socket != null){
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
socket = null;
}
}
}
Server方法
public class Server {
final static int PROT =8765;
public static void main(String[] args) {
ServerSocket server = null;
try {
server = new ServerSocket(PROT);
System.out.println(" server start .. ");
//進(jìn)行阻塞
Socket socket = server.accept();
//新建一個(gè)線程執(zhí)行客戶端的任務(wù)
new Thread(new ServerHandler(socket)).start();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (server !=null) {
try {
server.close();
} catch (IOException e) {
e.printStackTrace();
}
}
server = null;
}
}
}
Client方法
public class Client {
final static String ADDRESS ="127.0.0.1";
final static int PORT =8765;
public static void main(String[] args) {
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try {
socket = new Socket(ADDRESS, PORT);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(),true);
//向服務(wù)器端發(fā)送數(shù)據(jù)
out.println("接收到客戶端的請(qǐng)求數(shù)據(jù)...");
out.println("接收到客戶端的請(qǐng)求數(shù)據(jù)1111...");
String response = in.readLine();
System.out.println("Client: " + response);
} catch (Exception e) {
e.printStackTrace();
} finally {
if(in != null){
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(out != null){
try {
out.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if(socket != null){
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
socket = null;
}
}
}
網(wǎng)絡(luò)編程的基本模型Client/Server模型,也就是兩個(gè)進(jìn)程直接進(jìn)行相互通信蕊苗,其中服務(wù)端提供配置信息(綁定的IP地址和監(jiān)聽(tīng)端口)沿后,客戶端通過(guò)連接操作向服務(wù)端監(jiān)聽(tīng)的地址發(fā)起連接請(qǐng)求,通過(guò)三次握手建立連接朽砰,如果連接成功尖滚,則雙方即可進(jìn)行通信(網(wǎng)絡(luò)套接字socket)
優(yōu)化方法:使用連接池和任務(wù)隊(duì)列創(chuàng)建偽異步IO
采用線程池和任務(wù)隊(duì)列可以實(shí)現(xiàn)一種偽異步的IO通信框架。
我們學(xué)過(guò)連接池的使用和隊(duì)列的使用锅移,其實(shí)就是將客戶端的socket封裝成一個(gè)task任務(wù)(實(shí)現(xiàn)runnable接口的類)然后投遞到線程池中去熔掺,配置相應(yīng)的隊(duì)列進(jìn)行實(shí)現(xiàn)。
代碼示例
ServerHandler方法
public class ServerHandler implements Runnable {
private Socket socket;
public ServerHandler (Socket socket){
this.socket = socket;
}
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
out = new PrintWriter(this.socket.getOutputStream(), true);
String body = null;
while(true){
body = in.readLine();
if(body == null) break;
System.out.println("Server:" + body);
out.println("Server response");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if(in != null){
try {
in.close();
} catch (Exception e1) {
e1.printStackTrace();
}
}
if(out != null){
try {
out.close();
} catch (Exception e2) {
e2.printStackTrace();
}
}
if(socket != null){
try {
socket.close();
} catch (Exception e3) {
e3.printStackTrace();
}
}
socket = null;
}
}
}
HandlerExecutorPool方法
public class HandlerExecutorPool {
private ExecutorService executor;
public HandlerExecutorPool(int maxPoolSize, int queueSize){
this.executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
maxPoolSize, 120L, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(queueSize));
}
public void execute(Runnable task){
this.executor.execute(task);
}
}
Server方法
public class Server {
final static int PORT = 8765;
public static void main(String[] args) {
ServerSocket server = null;
BufferedReader in = null;
PrintWriter out = null;
try {
server = new ServerSocket(PORT);
System.out.println("server start");
Socket socket = null;
HandlerExecutorPool executorPool = new HandlerExecutorPool(50, 1000);
while(true){
socket = server.accept();
executorPool.execute(new ServerHandler(socket));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if(in != null){
try {
in.close();
} catch (Exception e1) {
e1.printStackTrace();
}
}
if(out != null){
try {
out.close();
} catch (Exception e2) {
e2.printStackTrace();
}
}
if(server != null){
try {
server.close();
} catch (Exception e3) {
e3.printStackTrace();
}
}
server = null;
}
}
}
Client方法
public class Client {
final static String ADDRESS = "127.0.0.1";
final static int PORT =8765;
public static void main(String[] args) {
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try {
socket = new Socket(ADDRESS, PORT);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
out.println("Client request");
String response = in.readLine();
System.out.println("Client:" + response);
} catch (Exception e) {
e.printStackTrace();
} finally {
if(in != null){
try {
in.close();
} catch (Exception e1) {
e1.printStackTrace();
}
}
if(out != null){
try {
out.close();
} catch (Exception e2) {
e2.printStackTrace();
}
}
if(socket != null){
try {
socket.close();
} catch (Exception e3) {
e3.printStackTrace();
}
}
socket = null;
}
}
}
注意:
????IO不僅僅時(shí)對(duì)文件的操作非剃,網(wǎng)絡(luò)編程中置逻,比如Socket通信也是典型的IO操作。
輸入流(InputStream)备绽,輸出流(OutputStream)券坞,用于讀取或?qū)懭胱止?jié)的(在java中以Stream結(jié)尾)。
而Reader/Writer操作字符肺素,增加了字符編解碼等功能(在java中以Reader/Writer結(jié)尾)恨锚。本質(zhì)上計(jì)算機(jī)操作的都是字節(jié),不管是網(wǎng)絡(luò)通信還是文件讀取倍靡,Reader/Writer相當(dāng)于構(gòu)建了應(yīng)用邏輯和原始數(shù)據(jù)之間的橋梁猴伶。
BufferdOutputStream,等帶緩沖區(qū)的實(shí)現(xiàn),可以避免頻繁的磁盤讀寫(xiě)他挎,進(jìn)而提高IO處理效率筝尾。
區(qū)分異步和同步
????同步,一種可靠有序的運(yùn)行機(jī)制办桨,在進(jìn)行同步操作時(shí)筹淫,后續(xù)的任務(wù)要等待當(dāng)前調(diào)用返回;
異步呢撞,其他任務(wù)不需要等待當(dāng)前調(diào)用是否返回损姜,依靠事件,回調(diào)機(jī)制來(lái)實(shí)現(xiàn)任務(wù)調(diào)用殊霞。
阻塞和非阻塞
????阻塞摧阅,當(dāng)前線程處于阻塞狀態(tài),無(wú)法從事其他任務(wù)脓鹃,只有當(dāng)條件就緒才能繼續(xù)逸尖;
非阻塞,不管IO操作是否結(jié)束瘸右,直接返回,相應(yīng)的操作在后臺(tái)繼續(xù)處理岩齿。
由此總結(jié):同步和異步是結(jié)果太颤,阻塞和非阻塞是過(guò)程。
三.NIO通過(guò)線程的輪詢盹沈,實(shí)現(xiàn)非阻塞IO
NIO的組成部分:
- Buffer 緩沖區(qū)龄章;高效的數(shù)據(jù)容器
不同的是BIO將數(shù)據(jù)直接讀寫(xiě)到Stream對(duì)象中
NIO的數(shù)據(jù)操作都是在緩沖區(qū)中進(jìn)行的
緩沖區(qū)實(shí)際上是一個(gè)數(shù)組,常見(jiàn)類型ByteBuffer乞封,CharBuffer做裙,ShortBuffer,IntBuffer肃晚,LongBuffer锚贱,F(xiàn)loutBuffer,DoubleBuffer - Channel 在NIO中被用來(lái)支持批量式IO操作的一種抽象关串,與流不同拧廊,通道是雙向的。
File/Socket晋修,通常被認(rèn)為是比較高層次的抽象吧碾,而Channel則是更加偏向操作系統(tǒng)底層的一種抽象,這也使得NIO得以充分利用現(xiàn)代操作系統(tǒng)底層機(jī)制墓卦,進(jìn)行性能優(yōu)化倦春。分兩大類:網(wǎng)絡(luò)讀寫(xiě)SelectableChannel(子類包括SocketChannel和ServerSocketChannel);文件操(FileChannel) - Selector 是NIO實(shí)現(xiàn)多路復(fù)用的基礎(chǔ)。它提供一種高效機(jī)制睁本,不斷輪詢注冊(cè)在其上的Channel山叮,找出處于就緒狀態(tài),通過(guò)SelectionKey取得就緒的Channel集合添履,進(jìn)行后續(xù)的IO操作屁倔。服務(wù)端只要提供一個(gè)負(fù)責(zé)Selector輪詢的線程即可,實(shí)現(xiàn)單線程對(duì)多Channel的高效管理暮胧,也是基于操作系統(tǒng)底層機(jī)制
-
Charset 提供Unicode字符串定義锐借,NIO也提供了相應(yīng)的編解碼等
2.jpeg.jpg
NIO的本質(zhì)就是避免原始的TCP建立連接使用3次握手的操作,減少連接的開(kāi)銷
NIO和IO之間一個(gè)最大區(qū)別:IO是面向流的往衷,NIO是面向緩沖區(qū)的钞翔。
通道(Channel),它就像自來(lái)水管道一樣席舍,網(wǎng)絡(luò)數(shù)據(jù)通過(guò)Channel讀取和寫(xiě)入布轿,通道與流不同之處在于通道是雙向的,而流只是一個(gè)方向上移動(dòng)(一個(gè)流必須是inputStream或者outputStream的子類)来颤,而通道可以用于讀汰扭,寫(xiě)或者二者同時(shí)進(jìn)行,最關(guān)鍵的是可以與多路復(fù)用器結(jié)合起來(lái)福铅,有多種的狀態(tài)位萝毛,方便多路復(fù)用器去識(shí)別。事實(shí)上通道分為兩大類滑黔,一類是網(wǎng)絡(luò)讀寫(xiě)的(SelectableChannel)笆包,一類是用于文件操作的(FileChannel),我們使用的SocketChannel和ServerSockerChannel都是SelectableChannel的子類
多路復(fù)用器(seletor)略荡,他是NIO編程的基礎(chǔ)庵佣,非常重要,多路復(fù)用器提供選擇已經(jīng)就緒的任務(wù)的能力汛兜。
簡(jiǎn)單說(shuō)巴粪,就是Selector會(huì)不斷地輪詢注冊(cè)在其上的通道(Channel),如果某個(gè)通道發(fā)生了讀寫(xiě)操作序无,這個(gè)通道就處于就緒狀態(tài)验毡,會(huì)被Selector輪詢出來(lái),然后通過(guò)SelectionKey可以取得就緒的Channel集合帝嗡,從而進(jìn)行后續(xù)的IO操作晶通。
Selector線程就類似一個(gè)管理者(Master),管理了成千上萬(wàn)個(gè)管道,然后輪詢哪個(gè)管道的數(shù)據(jù)已經(jīng)準(zhǔn)備好哟玷,通知CPU執(zhí)行IO的讀取或?qū)懭氩僮鳌?br>
Selector模式:當(dāng)IO事件(管理)注冊(cè)到選擇器以后狮辽,selector會(huì)分配給每個(gè)管道一個(gè)key值一也,相當(dāng)于標(biāo)簽。selector選擇器是以輪詢的方式進(jìn)行查找注冊(cè)的所有IO事件(管道)
當(dāng)我們的IO事件(管道)準(zhǔn)備就緒后喉脖,select就會(huì)識(shí)別椰苟,會(huì)通過(guò)key值來(lái)找到相應(yīng)的管道,進(jìn)行相關(guān)的數(shù)據(jù)處理操作(從管道里讀或?qū)憯?shù)據(jù)树叽,寫(xiě)道我們的數(shù)據(jù)緩沖區(qū)中)舆蝴。
每個(gè)管道都會(huì)對(duì)選擇器進(jìn)行注冊(cè)不同的事件狀態(tài),以便選擇器查找题诵。
SelectionKey.OP_CONNECT 連接狀態(tài)
SelectionKey.OP_ACCEPT 阻塞狀態(tài)
SelectionKey.OP_READ 可讀狀態(tài)
SelectionKey.OP_WRITE 可寫(xiě)狀態(tài)
代碼如下:
public class Server implements Runnable{
//1 多路復(fù)用器(管理所有的通道)
private Selector seletor;
//2 建立緩沖區(qū)
private ByteBuffer readBuf = ByteBuffer.allocate(1024);
//3
private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
public Server(int port){
try {
//1 打開(kāi)路復(fù)用器
this.seletor = Selector.open();
//2 打開(kāi)服務(wù)器通道
ServerSocketChannel ssc = ServerSocketChannel.open();
//3 設(shè)置服務(wù)器通道為非阻塞模式
ssc.configureBlocking(false);
//4 綁定地址
ssc.bind(new InetSocketAddress(port));
//5 把服務(wù)器通道注冊(cè)到多路復(fù)用器上洁仗,并且監(jiān)聽(tīng)阻塞事件
ssc.register(this.seletor, SelectionKey.OP_ACCEPT);
System.out.println("Server start, port :" + port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while(true){
try {
//1 必須要讓多路復(fù)用器開(kāi)始監(jiān)聽(tīng)
this.seletor.select();
//2 返回多路復(fù)用器已經(jīng)選擇的結(jié)果集
Iterator<SelectionKey>keys=this.seletor.selectedKeys().iterator();
//3 進(jìn)行遍歷
while(keys.hasNext()){
//4 獲取一個(gè)選擇的元素
SelectionKey key = keys.next();
//5 直接從容器中移除就可以了
keys.remove();
//6 如果是有效的
if(key.isValid()){
//7 如果為阻塞狀態(tài)
if(key.isAcceptable()){
this.accept(key);
}
//8 如果為可讀狀態(tài)
if(key.isReadable()){
this.read(key);
}
//9 寫(xiě)數(shù)據(jù)
if(key.isWritable()){
//this.write(key); //ssc
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void write(SelectionKey key){
//ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//ssc.register(this.seletor, SelectionKey.OP_WRITE);
}
private void read(SelectionKey key) {
try {
//1 清空緩沖區(qū)舊的數(shù)據(jù)
this.readBuf.clear();
//2 獲取之前注冊(cè)的socket通道對(duì)象
SocketChannel sc = (SocketChannel) key.channel();
//3 讀取數(shù)據(jù)
int count = sc.read(this.readBuf);
//4 如果沒(méi)有數(shù)據(jù)
if(count == -1){
key.channel().close();
key.cancel();
return;
}
//5 有數(shù)據(jù)則進(jìn)行讀取 讀取之前需要進(jìn)行復(fù)位方法(把position 和limit進(jìn)行復(fù)位)
this.readBuf.flip();
//6 根據(jù)緩沖區(qū)的數(shù)據(jù)長(zhǎng)度創(chuàng)建相應(yīng)大小的byte數(shù)組,接收緩沖區(qū)的數(shù)據(jù)
byte[] bytes = new byte[this.readBuf.remaining()];
//7 接收緩沖區(qū)數(shù)據(jù)
this.readBuf.get(bytes);
//8 打印結(jié)果
String body = new String(bytes).trim();
System.out.println("Server : " + body);
// 9..可以寫(xiě)回給客戶端數(shù)據(jù)
} catch (IOException e) {
e.printStackTrace();
}
}
private void accept(SelectionKey key) {
try {
//1 獲取服務(wù)通道
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//2 執(zhí)行阻塞方法
SocketChannel sc = ssc.accept();
//3 設(shè)置阻塞模式
sc.configureBlocking(false);
//4 注冊(cè)到多路復(fù)用器上性锭,并設(shè)置讀取標(biāo)識(shí)
sc.register(this.seletor, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Thread(new Server(8765)).start();;
}
}
Client方法
public class Client {
//需要一個(gè)Selector
public static void main(String[] args) {
//創(chuàng)建連接的地址
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8765);
//聲明連接通道
SocketChannel sc = null;
//建立緩沖區(qū)
ByteBuffer buf = ByteBuffer.allocate(1024);
try {
//打開(kāi)通道
sc = SocketChannel.open();
//進(jìn)行連接
sc.connect(address);
while(true){
//定義一個(gè)字節(jié)數(shù)組赠潦,然后使用系統(tǒng)錄入功能:
byte[] bytes = new byte[1024];
System.in.read(bytes);
//把數(shù)據(jù)放到緩沖區(qū)中
buf.put(bytes);
//對(duì)緩沖區(qū)進(jìn)行復(fù)位
buf.flip();
//寫(xiě)出數(shù)據(jù)
sc.write(buf);
//清空緩沖區(qū)數(shù)據(jù)
buf.clear();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if(sc != null){
try {
sc.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
創(chuàng)建NIO服務(wù)器的主要步驟:
打開(kāi)ServerSocketChannel,監(jiān)聽(tīng)客戶端連接
綁定監(jiān)聽(tīng)端口草冈,設(shè)置連接為非阻塞模式
創(chuàng)建Reactor線程她奥,創(chuàng)建多路復(fù)用器并啟動(dòng)線程
將ServerSocketChannel注冊(cè)到Reactor線程中的Selector上,監(jiān)聽(tīng)ACCEPT事件
Selector輪詢準(zhǔn)備就緒的key
Selector監(jiān)聽(tīng)到新的客戶端接入怎棱,處理新的接入請(qǐng)求哩俭,完成TCP三次握手,簡(jiǎn)歷物理鏈路
設(shè)置客戶端鏈路為非阻塞模式
將新接入的客戶端連接注冊(cè)到Reactor線程的Selector上蹄殃,監(jiān)聽(tīng)讀操作携茂,讀取客戶端發(fā)送的網(wǎng)絡(luò)消息
異步讀取客戶端消息到緩沖區(qū)
對(duì)Buffer編解碼,處理半包消息诅岩,將解碼成功的消息封裝成Task
-
將應(yīng)答消息編碼為Buffer,調(diào)用SocketChannel的write將消息異步發(fā)送給客戶端
因?yàn)閼?yīng)答消息的發(fā)送带膜,SocketChannel也是異步非阻塞的吩谦,所以不能保證一次能吧需要發(fā)送的數(shù)據(jù)發(fā)送完,此時(shí)就會(huì)出現(xiàn)寫(xiě)半包的問(wèn)題膝藕。我們需要注冊(cè)寫(xiě)操作式廷,不斷輪詢Selector將沒(méi)有發(fā)送完的消息發(fā)送完畢,然后通過(guò)Buffer的hasRemain()方法判斷消息是否發(fā)送完成芭挽。
NIO存在的問(wèn)題:
使用NIO != 高性能滑废,當(dāng)連接數(shù)<1000,并發(fā)程度不高或者局域網(wǎng)環(huán)境下NIO并沒(méi)有顯著的性能優(yōu)勢(shì)袜爪。
NIO并沒(méi)有完全屏蔽平臺(tái)差異蠕趁,它仍然是基于各個(gè)操作系統(tǒng)的I/O系統(tǒng)實(shí)現(xiàn)的,差異仍然存在辛馆。使用NIO做網(wǎng)絡(luò)編程構(gòu)建事件驅(qū)動(dòng)模型并不容易俺陋,陷阱重重。
推薦大家使用成熟的NIO框架,如Netty腊状,MINA等诱咏。解決了很多NIO的陷阱,并屏蔽了操作系統(tǒng)的差異缴挖,有較好的性能和編程模型袋狞。
四.AIO (Asynchronous IO)
NIO 2 ,在Java 7提出映屋,NIO的升級(jí)版苟鸯。實(shí)現(xiàn)非阻塞異步的通信模式;
提供異步文件通道和異步套接字通道秧荆;
其read倔毙,write方法的返回類型都是Future對(duì)象;
而Future模型是異步的乙濒,其核心思想是:去主函數(shù)等待時(shí)間陕赃;
基于事件和回調(diào)機(jī)制。
AIO編程颁股,在NIO基礎(chǔ)之上引入了異步通道的概念么库。并提供異步文件和異步套接字通道的實(shí)現(xiàn),從而在真正意義上實(shí)現(xiàn)了異步非阻塞甘有,之前我們學(xué)過(guò)的NIO只是非阻塞而非異步诉儒。而AIO它不需要通過(guò)多路復(fù)用器對(duì)注冊(cè)的通道的進(jìn)行輪訓(xùn)操作即可實(shí)現(xiàn)異步讀寫(xiě),從而簡(jiǎn)化了NIO編程模型亏掀。也可以稱為NIO2.0忱反,這種模式才是真正的屬于異步非阻塞的模型。
AsynchronousServerScoketChannel
AsynchronousScoketChanel
示例代碼:
Server端代碼
public class Server {
//線程池
private ExecutorService executorService;
//線程組
private AsynchronousChannelGroup threadGroup;
//服務(wù)器通道
public AsynchronousServerSocketChannel assc;
public Server(int port){
try {
//創(chuàng)建一個(gè)緩存池
executorService = Executors.newCachedThreadPool();
//創(chuàng)建線程組
threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
//創(chuàng)建服務(wù)器通道
assc = AsynchronousServerSocketChannel.open(threadGroup);
//進(jìn)行綁定
assc.bind(new InetSocketAddress(port));
System.out.println("server start , port : " + port);
//進(jìn)行阻塞
assc.accept(this, new ServerCompletionHandler());
//一直阻塞 不讓服務(wù)器停止
Thread.sleep(Integer.MAX_VALUE);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
Server server = new Server(8765);
}
}
ServerCompletionHandler端代碼
Public class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Server> {
@Override
public void completed(AsynchronousSocketChannel asc, Server attachment) {
//當(dāng)有下一個(gè)客戶端接入的時(shí)候 直接調(diào)用Server的accept方法滤愕,這樣反復(fù)執(zhí)行下去温算,保證多個(gè)客戶端都可以阻塞(沒(méi)有遞歸上限),1.7以后AIO才實(shí)現(xiàn)了異步非阻塞
attachment.assc.accept(attachment, this);
read(asc);
}
private void read(final AsynchronousSocketChannel asc) {
//讀取數(shù)據(jù)
ByteBuffer buf = ByteBuffer.allocate(1024);
asc.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer resultSize, ByteBuffer attachment) {
//進(jìn)行讀取之后,重置標(biāo)識(shí)位
attachment.flip();
//獲得讀取的字節(jié)數(shù)
System.out.println("Server -> " + "收到客戶端的數(shù)據(jù)長(zhǎng)度為:" + resultSize);
//獲取讀取的數(shù)據(jù)
String resultData = new String(attachment.array()).trim();
System.out.println("Server -> " + "收到客戶端的數(shù)據(jù)信息為:" + resultData);
String response = "服務(wù)器響應(yīng), 收到了客戶端發(fā)來(lái)的數(shù)據(jù): " + resultData;
write(asc, response);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
}
private void write(AsynchronousSocketChannel asc, String response) {
try {
ByteBuffer buf = ByteBuffer.allocate(1024);
buf.put(response.getBytes());
buf.flip();
asc.write(buf).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Server attachment) {
exc.printStackTrace();
}
}
Client端代碼
public class Clientimplements Runnable{
private AsynchronousSocketChannel asc ;
public Client() throws Exception {
asc = AsynchronousSocketChannel.open();
}
public void connect(){
asc.connect(new InetSocketAddress("127.0.0.1", 8765));
}
public void write(String request){
try {
asc.write(ByteBuffer.wrap(request.getBytes())).get();
read();
} catch (Exception e) {
e.printStackTrace();
}
}
private void read() {
ByteBuffer buf = ByteBuffer.allocate(1024);
try {
asc.read(buf).get();
buf.flip();
byte[] respByte = new byte[buf.remaining()];
buf.get(respByte);
System.out.println(new String(respByte,"utf-8").trim());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while(true){
}
}
public static void main(String[] args) throws Exception {
Client c1 = new Client();
c1.connect();
Client c2 = new Client();
c2.connect();
Client c3 = new Client();
c3.connect();
new Thread(c1, "c1").start();
new Thread(c2, "c2").start();
new Thread(c3, "c3").start();
Thread.sleep(1000);
c1.write("c1 aaa");
c2.write("c2 bbbb");
c3.write("c3 ccccc");
}
}
五.小結(jié)
- IO间影,阻塞同步通信模式注竿,客戶端與服務(wù)端三次握手,簡(jiǎn)單魂贬,吞吐量小巩割。關(guān)鍵詞:Socket和ServerSocket;
- NIO付燥,非阻塞同步通信模式宣谈,客戶端與服務(wù)端通過(guò)Channel連接,采用多路復(fù)用器輪詢注冊(cè)的Channel机蔗。關(guān)鍵詞:SocketChannel和ServerSocketChannel蒲祈;
- AIO甘萧,非阻塞異步通信模式,基于事件和回調(diào)機(jī)制梆掸,采用異步通道實(shí)現(xiàn)異步通信扬卷。關(guān)鍵詞:AsynchronousSocketChannel和AsynchronousServerSocketChannel。
- 適用場(chǎng)景:
- BIO方式適用于連接數(shù)目比較小且固定的架構(gòu)酸钦,這種方式對(duì)服務(wù)器資源要求比較高怪得,并發(fā)局限于應(yīng)用中,JDK1.4以前的唯一選擇卑硫,但程序直觀簡(jiǎn)單易理解徒恋。
- NIO方式適用于連接數(shù)目多且連接比較短(輕操作)的架構(gòu),比如聊天服務(wù)器欢伏,并發(fā)局限于應(yīng)用中入挣,編程比較復(fù)雜,JDK1.4開(kāi)始支持硝拧。
- AIO方式使用于連接數(shù)目多且連接比較長(zhǎng)(重操作)的架構(gòu)径筏,比如相冊(cè)服務(wù)器,充分調(diào)用OS參與并發(fā)操作障陶,編程比較復(fù)雜滋恬,JDK7開(kāi)始支持。