Java中的IO,BIO趴生,NIO阀趴,AIO

IO一直是軟件開(kāi)發(fā)中的核心部分之一昏翰,而隨著互聯(lián)網(wǎng)技術(shù)的提高苍匆,IO的重要性也越來(lái)越重刘急。縱觀開(kāi)發(fā)界浸踩,能夠巧妙運(yùn)用IO叔汁,不但對(duì)于公司,而且對(duì)于開(kāi)發(fā)人員都非常的重要检碗。Java的IO機(jī)制也是一直在不斷的完善据块,以應(yīng)對(duì)日見(jiàn)增多的流量。

一.JavaIO的方式

????第一折剃,傳統(tǒng)java.io包提供了諸如File的抽象另假,輸入,輸出流怕犁。交互方式是同步边篮,阻塞;
  第二奏甫,在java1.4中引入NIO框架(java.nio包)戈轿,提供了Channel,Selector阵子,Buffer等抽象思杯,構(gòu)建多路復(fù)用的,同步非阻塞IO挠进,同時(shí)提供了更接近操作系統(tǒng)底層的高性能數(shù)據(jù)操作方式色乾;
  第三颖杏,在Java 7中匪傍,NIO有了進(jìn)一步的改進(jìn),也就是NIO 2费变。其引入了異步非阻塞IO方式攘须,或者說(shuō)AIO(Asynchronous IO)漆撞。異步IO基于事件和回調(diào)機(jī)制。

二.傳統(tǒng)BIO

????傳統(tǒng)BIO于宙,采用BIO編程通信模型的服務(wù)端浮驳,通常由一個(gè)獨(dú)立的Acceptor線程負(fù)責(zé)監(jiān)聽(tīng)客戶端的連接。它接收到客戶端連接請(qǐng)求之后為每個(gè)客戶端創(chuàng)建一個(gè)新的線程進(jìn)行鏈路處理捞魁,處理完成后至会,通過(guò)輸出流返回給對(duì)應(yīng)的客戶端,然后銷毀線程谱俭。
服務(wù)端提供IP地址和監(jiān)聽(tīng)的端口奉件,客戶端通過(guò)TCP的三次握手與之連接宵蛀,連接成功后,雙方通過(guò)套接字通信县貌。
  ServerSocket負(fù)責(zé)綁定IP地址术陶,啟動(dòng)監(jiān)聽(tīng)端口;Socket負(fù)責(zé)發(fā)起鏈接操作煤痕。連接成功梧宫,雙方通過(guò)輸入和輸出流進(jìn)行同步阻塞式通信“诘铮“你來(lái)了塘匣,我才往”的交流方式。
  Java.net提供的API巷帝,如Socket忌卤,ServerSocket,HttpURLConnection也歸與同步阻塞IO類庫(kù)楞泼,因?yàn)榫W(wǎng)絡(luò)通信同樣是IO行為驰徊。
  優(yōu)點(diǎn):代碼簡(jiǎn)單,直觀现拒,維護(hù)方便辣垒。
  缺點(diǎn):IO效率極低,影響性能印蔬,卻反彈性處理勋桶。
  當(dāng)吞吐量增大了,有妙招:線程池侥猬。
  使用線程池管理線程例驹,避免頻繁創(chuàng)建,銷毀線程的開(kāi)銷退唠。但是鹃锈,其底層使用的依然是同步阻塞IO,通常被稱為“偽異步IO模型”瞧预,真的是治標(biāo)不治本屎债。
比如tomcat 采用的傳統(tǒng)的BIO(同步阻塞IO模型)+ 線程池 模式: 這個(gè)模式適合活動(dòng)連接數(shù)不是特別高的(連接<1000)
這個(gè)模式是每個(gè)連接每個(gè)線程,之所以用多線程垢油, 主要原因是在socket.accept(),socket.read(),socket.wirte() 三個(gè)函數(shù)都是同步阻塞的盆驹, 當(dāng)一個(gè)連接在處理IO的時(shí)候, 系統(tǒng)是阻塞的滩愁,如果是單線程的話系統(tǒng)必然死掉躯喇, 如果是單線程的話,對(duì)于多核cpu硝枉,cpu的資源沒(méi)有得到很好的利用廉丽,所以采用線程池的模式倦微,這樣線程的創(chuàng)建和回收成本相對(duì)較低;
如果對(duì)十萬(wàn)甚至百萬(wàn)級(jí)連接的時(shí)候正压,傳統(tǒng)的BIO模型是無(wú)能為力的欣福, 因?yàn)锽IO有幾個(gè)問(wèn)題:

  • 線程的創(chuàng)建和銷毀成本很高,在Linux這樣的操作系統(tǒng)中蔑匣,線程本質(zhì)上就是一個(gè)進(jìn)程劣欢。創(chuàng)建和銷毀都是重量級(jí)的系統(tǒng)函數(shù)棕诵;
  • 線程本身占用較大內(nèi)存裁良,像Java的線程棧,一般至少分配512K~1M的空間校套,如果系統(tǒng)中的線程數(shù)過(guò)千价脾,恐怕整個(gè)JVM的內(nèi)存都會(huì)被吃掉一半;
  • 線程的切換成本是很高的笛匙。操作系統(tǒng)發(fā)生線程切換的時(shí)候侨把,需要保留線程的上下文,然后執(zhí)行系統(tǒng)調(diào)用妹孙。如果線程數(shù)過(guò)高秋柄,可能執(zhí)行線程切換的時(shí)間甚至?xí)笥诰€程執(zhí)行的時(shí)間,這時(shí)候帶來(lái)的表現(xiàn)往往是系統(tǒng)load偏高蠢正、CPU sy使用率特別高(超過(guò)20%以上)骇笔,導(dǎo)致系統(tǒng)幾乎陷入不可用的狀態(tài);
  • 容易造成鋸齒狀的系統(tǒng)負(fù)載嚣崭。因?yàn)橄到y(tǒng)負(fù)載是用活動(dòng)線程數(shù)或CPU核心數(shù)笨触,一旦線程數(shù)量高但外部網(wǎng)絡(luò)環(huán)境不是很穩(wěn)定,就很容易造成大量請(qǐng)求的結(jié)果同時(shí)返回雹舀,激活大量阻塞線程從而使系統(tǒng)負(fù)載壓力過(guò)大芦劣;

概念:

Socket又稱“套接字”,應(yīng)用程序通常通過(guò)“套接字”向網(wǎng)絡(luò)發(fā)出請(qǐng)求或者應(yīng)答網(wǎng)絡(luò)請(qǐng)求说榆。
Socket和ServerSocket類庫(kù)位于java.net包中虚吟,serverSocket用于服務(wù)器端,Socket是建立網(wǎng)絡(luò)連接時(shí)使用的签财。在連接成功時(shí)串慰,應(yīng)用程序兩端都會(huì)產(chǎn)生一個(gè)Socket實(shí)例,操作這個(gè)實(shí)例荠卷,完成所需的會(huì)話模庐,對(duì)于一個(gè)網(wǎng)絡(luò)連接來(lái)說(shuō),套接字是平等的油宜,不因?yàn)樵诜?wù)器端或在客戶端而產(chǎn)生不同的級(jí)別掂碱,不管是Socket還是ServerSocket它們的工作都是通過(guò)SocketImpl類及其子類完成的

套接字之間鏈接過(guò)程分為四步:

  1. 服務(wù)器監(jiān)聽(tīng)
    服務(wù)器監(jiān)聽(tīng):是服務(wù)端套接字并不定位具體的客戶端套接字怜姿,而是處于等待連接的狀態(tài),實(shí)時(shí)監(jiān)控網(wǎng)絡(luò)的狀態(tài)
  2. 客戶端請(qǐng)求服務(wù)器
    客戶端請(qǐng)求:是指由客戶端的套接字提出連接請(qǐng)求疼燥,要連接的目標(biāo)是服務(wù)器端的套接字沧卢。為此,客戶端的套接字必須首先描述它要連接的服務(wù)器的套接字醉者,指出服務(wù)器套接字的地址和端口號(hào)但狭,然后就想服務(wù)器端套接字提出連接請(qǐng)求
  3. 服務(wù)器確認(rèn)
    服務(wù)器端連接確認(rèn),是指當(dāng)服務(wù)器端套接字監(jiān)聽(tīng)到或者說(shuō)接受到客戶端套接字的連接請(qǐng)求撬即,他就響應(yīng)客戶端套接字的請(qǐng)求立磁,建立一個(gè)新的線程,把服務(wù)器端套接字的描述發(fā)給客戶端剥槐。
  4. 客戶端進(jìn)行通信
    客戶端連接確認(rèn):一旦客戶端確認(rèn)了此描述唱歧,連接就建立好了,雙方開(kāi)始通信粒竖。而服務(wù)器端套接字繼續(xù)處于監(jiān)聽(tīng)狀態(tài)颅崩,繼續(xù)接受其他客戶端套接字的連接請(qǐng)求

代碼示例:
ServerHandler方法

public class ServerHandler implements Runnable{
   private Socket socket ;
   public ServerHandler(Socket socket){
      this.socket = socket;
   }
   @Override
   public void run() {
      BufferedReader in = null;
      PrintWriter out = null;
      try {
         in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
         out = new PrintWriter(this.socket.getOutputStream(),true);
         String body = null;
         while(true){
            body = in.readLine();
            if(body == null)  break;
            System.out.println("Server :"+ body);
            out.println("服務(wù)器端回送響的應(yīng)數(shù)據(jù).");
         }     
      } catch (Exception e) {
         e.printStackTrace();
      } finally {
         if(in != null){
            try {
               in.close();
            } catch (IOException e) {
               e.printStackTrace();
            }
         }
         if(out != null){
            try {
               out.close();
            } catch (Exception e) {
               e.printStackTrace();
            }
         }
         if(socket != null){
            try {
               socket.close();
            } catch (IOException e) {
               e.printStackTrace();
            }
         }
         socket = null;
      }   
   }
}

Server方法

public class Server {
   final static int PROT =8765;
   public static void main(String[] args) {
      ServerSocket server = null;
      try {
         server = new ServerSocket(PROT);
         System.out.println(" server start .. ");
         //進(jìn)行阻塞         
        Socket socket = server.accept();
         //新建一個(gè)線程執(zhí)行客戶端的任務(wù)       
        new Thread(new ServerHandler(socket)).start();
      } catch (Exception e) {
         e.printStackTrace();
      } finally {
            if (server !=null) {
                try {
                    server.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            server = null;
        }
   }
}

Client方法

public class Client {
   final static String ADDRESS ="127.0.0.1";
   final static int PORT =8765;
   public static void main(String[] args) {
      Socket socket = null;
      BufferedReader in = null;
      PrintWriter out = null;
      try {
         socket = new Socket(ADDRESS, PORT);
         in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
         out = new PrintWriter(socket.getOutputStream(),true);
         //向服務(wù)器端發(fā)送數(shù)據(jù)        
         out.println("接收到客戶端的請(qǐng)求數(shù)據(jù)...");
         out.println("接收到客戶端的請(qǐng)求數(shù)據(jù)1111...");
         String response = in.readLine();
         System.out.println("Client: " + response);
      } catch (Exception e) {
         e.printStackTrace();
      } finally {
         if(in != null){
            try {
               in.close();
            } catch (IOException e) {
               e.printStackTrace();
            }
         }
         if(out != null){
            try {
               out.close();
            } catch (Exception e) {
               e.printStackTrace();
            }
         }
         if(socket != null){
            try {
               socket.close();
            } catch (IOException e) {
               e.printStackTrace();
            }
         }
         socket = null;
      }
   }
}

網(wǎng)絡(luò)編程的基本模型Client/Server模型,也就是兩個(gè)進(jìn)程直接進(jìn)行相互通信蕊苗,其中服務(wù)端提供配置信息(綁定的IP地址和監(jiān)聽(tīng)端口)沿后,客戶端通過(guò)連接操作向服務(wù)端監(jiān)聽(tīng)的地址發(fā)起連接請(qǐng)求,通過(guò)三次握手建立連接朽砰,如果連接成功尖滚,則雙方即可進(jìn)行通信(網(wǎng)絡(luò)套接字socket)
優(yōu)化方法:使用連接池和任務(wù)隊(duì)列創(chuàng)建偽異步IO
采用線程池和任務(wù)隊(duì)列可以實(shí)現(xiàn)一種偽異步的IO通信框架。
我們學(xué)過(guò)連接池的使用和隊(duì)列的使用锅移,其實(shí)就是將客戶端的socket封裝成一個(gè)task任務(wù)(實(shí)現(xiàn)runnable接口的類)然后投遞到線程池中去熔掺,配置相應(yīng)的隊(duì)列進(jìn)行實(shí)現(xiàn)。
代碼示例
ServerHandler方法

public class ServerHandler implements Runnable {
    private Socket socket;
    public ServerHandler (Socket socket){
    this.socket = socket;
}
@Override
public void run() {
    BufferedReader in = null;
    PrintWriter out = null;
try {
    in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
    out = new PrintWriter(this.socket.getOutputStream(), true);
    String body = null;
    while(true){
      body = in.readLine();
      if(body == null) break;
      System.out.println("Server:" + body);
      out.println("Server response");
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    if(in != null){
      try {
        in.close();
      } catch (Exception e1) {
          e1.printStackTrace();
      }
}
if(out != null){
    try {
        out.close();
    } catch (Exception e2) {
        e2.printStackTrace();
    }
}
if(socket != null){
    try {
        socket.close();
    } catch (Exception e3) {
        e3.printStackTrace();
    }
}
    socket = null; 
} 
}
}

HandlerExecutorPool方法

public class HandlerExecutorPool {
    private ExecutorService executor;
    public HandlerExecutorPool(int maxPoolSize, int queueSize){
    this.executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
      maxPoolSize, 120L, TimeUnit.SECONDS,
     new ArrayBlockingQueue<Runnable>(queueSize));
}
public void execute(Runnable task){
    this.executor.execute(task);
} 
}

Server方法

public class Server {
final static int PORT = 8765;
 
public static void main(String[] args) {
    ServerSocket server = null;
    BufferedReader in = null;
    PrintWriter out = null;
try {
    server = new ServerSocket(PORT);
    System.out.println("server start");
    Socket socket = null;
    HandlerExecutorPool executorPool = new HandlerExecutorPool(50, 1000);
    while(true){
        socket = server.accept();
        executorPool.execute(new ServerHandler(socket));
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
if(in != null){
    try {
        in.close();
    } catch (Exception e1) {
        e1.printStackTrace();
    }
}
if(out != null){
    try {
        out.close();
    } catch (Exception e2) {
        e2.printStackTrace();
    }
}
if(server != null){
    try {
        server.close();
    } catch (Exception e3) {
        e3.printStackTrace();
    }
}
    server = null; 
}
} 
}

Client方法

public class Client {
final static String ADDRESS = "127.0.0.1";
final static int PORT =8765;
public static void main(String[] args) {
    Socket socket = null;
    BufferedReader in = null;
    PrintWriter out = null;
    try {
        socket = new Socket(ADDRESS, PORT);
        in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        out = new PrintWriter(socket.getOutputStream(), true);
        out.println("Client request");
        String response = in.readLine();
        System.out.println("Client:" + response); 
}  catch (Exception e) {
      e.printStackTrace();
} finally {
    if(in != null){
      try {
          in.close();
      } catch (Exception e1) {
          e1.printStackTrace();
      }
}
if(out != null){
    try {
        out.close();
    } catch (Exception e2) {
        e2.printStackTrace();
    }
}
if(socket != null){
    try {
        socket.close();
    } catch (Exception e3) {
        e3.printStackTrace();
    }
}
      socket = null; 
} 
}
}

注意:

????IO不僅僅時(shí)對(duì)文件的操作非剃,網(wǎng)絡(luò)編程中置逻,比如Socket通信也是典型的IO操作。
  輸入流(InputStream)备绽,輸出流(OutputStream)券坞,用于讀取或?qū)懭胱止?jié)的(在java中以Stream結(jié)尾)。
  而Reader/Writer操作字符肺素,增加了字符編解碼等功能(在java中以Reader/Writer結(jié)尾)恨锚。本質(zhì)上計(jì)算機(jī)操作的都是字節(jié),不管是網(wǎng)絡(luò)通信還是文件讀取倍靡,Reader/Writer相當(dāng)于構(gòu)建了應(yīng)用邏輯和原始數(shù)據(jù)之間的橋梁猴伶。
  BufferdOutputStream,等帶緩沖區(qū)的實(shí)現(xiàn),可以避免頻繁的磁盤讀寫(xiě)他挎,進(jìn)而提高IO處理效率筝尾。


1.jpeg.jpg

區(qū)分異步和同步

????同步,一種可靠有序的運(yùn)行機(jī)制办桨,在進(jìn)行同步操作時(shí)筹淫,后續(xù)的任務(wù)要等待當(dāng)前調(diào)用返回;
  異步呢撞,其他任務(wù)不需要等待當(dāng)前調(diào)用是否返回损姜,依靠事件,回調(diào)機(jī)制來(lái)實(shí)現(xiàn)任務(wù)調(diào)用殊霞。

阻塞和非阻塞

????阻塞摧阅,當(dāng)前線程處于阻塞狀態(tài),無(wú)法從事其他任務(wù)脓鹃,只有當(dāng)條件就緒才能繼續(xù)逸尖;
  非阻塞,不管IO操作是否結(jié)束瘸右,直接返回,相應(yīng)的操作在后臺(tái)繼續(xù)處理岩齿。
  由此總結(jié):同步和異步是結(jié)果太颤,阻塞和非阻塞是過(guò)程。

三.NIO通過(guò)線程的輪詢盹沈,實(shí)現(xiàn)非阻塞IO

NIO的組成部分:

  1. Buffer 緩沖區(qū)龄章;高效的數(shù)據(jù)容器
    不同的是BIO將數(shù)據(jù)直接讀寫(xiě)到Stream對(duì)象中
    NIO的數(shù)據(jù)操作都是在緩沖區(qū)中進(jìn)行的
    緩沖區(qū)實(shí)際上是一個(gè)數(shù)組,常見(jiàn)類型ByteBuffer乞封,CharBuffer做裙,ShortBuffer,IntBuffer肃晚,LongBuffer锚贱,F(xiàn)loutBuffer,DoubleBuffer
  2. Channel 在NIO中被用來(lái)支持批量式IO操作的一種抽象关串,與流不同拧廊,通道是雙向的。
    File/Socket晋修,通常被認(rèn)為是比較高層次的抽象吧碾,而Channel則是更加偏向操作系統(tǒng)底層的一種抽象,這也使得NIO得以充分利用現(xiàn)代操作系統(tǒng)底層機(jī)制墓卦,進(jìn)行性能優(yōu)化倦春。分兩大類:網(wǎng)絡(luò)讀寫(xiě)SelectableChannel(子類包括SocketChannel和ServerSocketChannel);文件操(FileChannel)
  3. Selector 是NIO實(shí)現(xiàn)多路復(fù)用的基礎(chǔ)。它提供一種高效機(jī)制睁本,不斷輪詢注冊(cè)在其上的Channel山叮,找出處于就緒狀態(tài),通過(guò)SelectionKey取得就緒的Channel集合添履,進(jìn)行后續(xù)的IO操作屁倔。服務(wù)端只要提供一個(gè)負(fù)責(zé)Selector輪詢的線程即可,實(shí)現(xiàn)單線程對(duì)多Channel的高效管理暮胧,也是基于操作系統(tǒng)底層機(jī)制
  4. Charset 提供Unicode字符串定義锐借,NIO也提供了相應(yīng)的編解碼等


    2.jpeg.jpg

    NIO的本質(zhì)就是避免原始的TCP建立連接使用3次握手的操作,減少連接的開(kāi)銷
    NIO和IO之間一個(gè)最大區(qū)別:IO是面向流的往衷,NIO是面向緩沖區(qū)的钞翔。

通道(Channel),它就像自來(lái)水管道一樣席舍,網(wǎng)絡(luò)數(shù)據(jù)通過(guò)Channel讀取和寫(xiě)入布轿,通道與流不同之處在于通道是雙向的,而流只是一個(gè)方向上移動(dòng)(一個(gè)流必須是inputStream或者outputStream的子類)来颤,而通道可以用于讀汰扭,寫(xiě)或者二者同時(shí)進(jìn)行,最關(guān)鍵的是可以與多路復(fù)用器結(jié)合起來(lái)福铅,有多種的狀態(tài)位萝毛,方便多路復(fù)用器去識(shí)別。事實(shí)上通道分為兩大類滑黔,一類是網(wǎng)絡(luò)讀寫(xiě)的(SelectableChannel)笆包,一類是用于文件操作的(FileChannel),我們使用的SocketChannel和ServerSockerChannel都是SelectableChannel的子類

多路復(fù)用器(seletor)略荡,他是NIO編程的基礎(chǔ)庵佣,非常重要,多路復(fù)用器提供選擇已經(jīng)就緒的任務(wù)的能力汛兜。
簡(jiǎn)單說(shuō)巴粪,就是Selector會(huì)不斷地輪詢注冊(cè)在其上的通道(Channel),如果某個(gè)通道發(fā)生了讀寫(xiě)操作序无,這個(gè)通道就處于就緒狀態(tài)验毡,會(huì)被Selector輪詢出來(lái),然后通過(guò)SelectionKey可以取得就緒的Channel集合帝嗡,從而進(jìn)行后續(xù)的IO操作晶通。

Selector線程就類似一個(gè)管理者(Master),管理了成千上萬(wàn)個(gè)管道,然后輪詢哪個(gè)管道的數(shù)據(jù)已經(jīng)準(zhǔn)備好哟玷,通知CPU執(zhí)行IO的讀取或?qū)懭氩僮鳌?br> Selector模式:當(dāng)IO事件(管理)注冊(cè)到選擇器以后狮辽,selector會(huì)分配給每個(gè)管道一個(gè)key值一也,相當(dāng)于標(biāo)簽。selector選擇器是以輪詢的方式進(jìn)行查找注冊(cè)的所有IO事件(管道)
當(dāng)我們的IO事件(管道)準(zhǔn)備就緒后喉脖,select就會(huì)識(shí)別椰苟,會(huì)通過(guò)key值來(lái)找到相應(yīng)的管道,進(jìn)行相關(guān)的數(shù)據(jù)處理操作(從管道里讀或?qū)憯?shù)據(jù)树叽,寫(xiě)道我們的數(shù)據(jù)緩沖區(qū)中)舆蝴。

每個(gè)管道都會(huì)對(duì)選擇器進(jìn)行注冊(cè)不同的事件狀態(tài),以便選擇器查找题诵。
SelectionKey.OP_CONNECT 連接狀態(tài)
SelectionKey.OP_ACCEPT 阻塞狀態(tài)
SelectionKey.OP_READ 可讀狀態(tài)
SelectionKey.OP_WRITE 可寫(xiě)狀態(tài)
代碼如下:

public class Server implements Runnable{
//1 多路復(fù)用器(管理所有的通道)
private Selector seletor;
//2 建立緩沖區(qū)
private ByteBuffer readBuf = ByteBuffer.allocate(1024);
//3 
private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
public Server(int port){
try {
//1 打開(kāi)路復(fù)用器
this.seletor = Selector.open();
//2 打開(kāi)服務(wù)器通道
ServerSocketChannel ssc = ServerSocketChannel.open();
//3 設(shè)置服務(wù)器通道為非阻塞模式
ssc.configureBlocking(false);
//4 綁定地址
ssc.bind(new InetSocketAddress(port));
//5 把服務(wù)器通道注冊(cè)到多路復(fù)用器上洁仗,并且監(jiān)聽(tīng)阻塞事件
ssc.register(this.seletor, SelectionKey.OP_ACCEPT);

System.out.println("Server start, port :" + port);

} catch (IOException e) {
e.printStackTrace();
}
}
 
@Override
public void run() {
while(true){
try {
//1 必須要讓多路復(fù)用器開(kāi)始監(jiān)聽(tīng)
this.seletor.select();
//2 返回多路復(fù)用器已經(jīng)選擇的結(jié)果集
Iterator<SelectionKey>keys=this.seletor.selectedKeys().iterator();
//3 進(jìn)行遍歷
while(keys.hasNext()){
//4 獲取一個(gè)選擇的元素
SelectionKey key = keys.next();
//5 直接從容器中移除就可以了
keys.remove();
//6 如果是有效的
if(key.isValid()){
//7 如果為阻塞狀態(tài)
if(key.isAcceptable()){
this.accept(key);
}
//8 如果為可讀狀態(tài)
if(key.isReadable()){
this.read(key);
}
//9 寫(xiě)數(shù)據(jù)
if(key.isWritable()){
//this.write(key); //ssc
}
}

}
} catch (IOException e) {
e.printStackTrace();
}
}
}

private void write(SelectionKey key){
//ServerSocketChannel ssc =  (ServerSocketChannel) key.channel();
//ssc.register(this.seletor, SelectionKey.OP_WRITE);
}
 
private void read(SelectionKey key) {
try {
//1 清空緩沖區(qū)舊的數(shù)據(jù)
this.readBuf.clear();
//2 獲取之前注冊(cè)的socket通道對(duì)象
SocketChannel sc = (SocketChannel) key.channel();
//3 讀取數(shù)據(jù)
int count = sc.read(this.readBuf);
//4 如果沒(méi)有數(shù)據(jù)
if(count == -1){
key.channel().close();
key.cancel();
return;
}
//5 有數(shù)據(jù)則進(jìn)行讀取 讀取之前需要進(jìn)行復(fù)位方法(把position 和limit進(jìn)行復(fù)位)
this.readBuf.flip();
//6 根據(jù)緩沖區(qū)的數(shù)據(jù)長(zhǎng)度創(chuàng)建相應(yīng)大小的byte數(shù)組,接收緩沖區(qū)的數(shù)據(jù)
byte[] bytes = new byte[this.readBuf.remaining()];
//7 接收緩沖區(qū)數(shù)據(jù)
this.readBuf.get(bytes);
//8 打印結(jié)果
String body = new String(bytes).trim();
System.out.println("Server : " + body);

// 9..可以寫(xiě)回給客戶端數(shù)據(jù)

} catch (IOException e) {
e.printStackTrace();
}

}
 
private void accept(SelectionKey key) {
try {
//1 獲取服務(wù)通道
ServerSocketChannel ssc =  (ServerSocketChannel) key.channel();
//2 執(zhí)行阻塞方法
SocketChannel sc = ssc.accept();
//3 設(shè)置阻塞模式
sc.configureBlocking(false);
//4 注冊(cè)到多路復(fù)用器上性锭,并設(shè)置讀取標(biāo)識(shí)
sc.register(this.seletor, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {

new Thread(new Server(8765)).start();;
}


}

Client方法

public class Client {
 
//需要一個(gè)Selector
public static void main(String[] args) {

//創(chuàng)建連接的地址
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8765);

//聲明連接通道
SocketChannel sc = null;

//建立緩沖區(qū)
ByteBuffer buf = ByteBuffer.allocate(1024);

try {
//打開(kāi)通道
sc = SocketChannel.open();
//進(jìn)行連接
sc.connect(address);

while(true){
//定義一個(gè)字節(jié)數(shù)組赠潦,然后使用系統(tǒng)錄入功能:
byte[] bytes = new byte[1024];
System.in.read(bytes);

//把數(shù)據(jù)放到緩沖區(qū)中
buf.put(bytes);
//對(duì)緩沖區(qū)進(jìn)行復(fù)位
buf.flip();
//寫(xiě)出數(shù)據(jù)
sc.write(buf);
//清空緩沖區(qū)數(shù)據(jù)
buf.clear();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if(sc != null){
try {
sc.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

}

}

創(chuàng)建NIO服務(wù)器的主要步驟:

  1. 打開(kāi)ServerSocketChannel,監(jiān)聽(tīng)客戶端連接

  2. 綁定監(jiān)聽(tīng)端口草冈,設(shè)置連接為非阻塞模式

  3. 創(chuàng)建Reactor線程她奥,創(chuàng)建多路復(fù)用器并啟動(dòng)線程

  4. 將ServerSocketChannel注冊(cè)到Reactor線程中的Selector上,監(jiān)聽(tīng)ACCEPT事件

  5. Selector輪詢準(zhǔn)備就緒的key

  6. Selector監(jiān)聽(tīng)到新的客戶端接入怎棱,處理新的接入請(qǐng)求哩俭,完成TCP三次握手,簡(jiǎn)歷物理鏈路

  7. 設(shè)置客戶端鏈路為非阻塞模式

  8. 將新接入的客戶端連接注冊(cè)到Reactor線程的Selector上蹄殃,監(jiān)聽(tīng)讀操作携茂,讀取客戶端發(fā)送的網(wǎng)絡(luò)消息

  9. 異步讀取客戶端消息到緩沖區(qū)

  10. 對(duì)Buffer編解碼,處理半包消息诅岩,將解碼成功的消息封裝成Task

  11. 將應(yīng)答消息編碼為Buffer,調(diào)用SocketChannel的write將消息異步發(fā)送給客戶端

    因?yàn)閼?yīng)答消息的發(fā)送带膜,SocketChannel也是異步非阻塞的吩谦,所以不能保證一次能吧需要發(fā)送的數(shù)據(jù)發(fā)送完,此時(shí)就會(huì)出現(xiàn)寫(xiě)半包的問(wèn)題膝藕。我們需要注冊(cè)寫(xiě)操作式廷,不斷輪詢Selector將沒(méi)有發(fā)送完的消息發(fā)送完畢,然后通過(guò)Buffer的hasRemain()方法判斷消息是否發(fā)送完成芭挽。

NIO存在的問(wèn)題:

使用NIO != 高性能滑废,當(dāng)連接數(shù)<1000,并發(fā)程度不高或者局域網(wǎng)環(huán)境下NIO并沒(méi)有顯著的性能優(yōu)勢(shì)袜爪。
NIO并沒(méi)有完全屏蔽平臺(tái)差異蠕趁,它仍然是基于各個(gè)操作系統(tǒng)的I/O系統(tǒng)實(shí)現(xiàn)的,差異仍然存在辛馆。使用NIO做網(wǎng)絡(luò)編程構(gòu)建事件驅(qū)動(dòng)模型并不容易俺陋,陷阱重重。
推薦大家使用成熟的NIO框架,如Netty腊状,MINA等诱咏。解決了很多NIO的陷阱,并屏蔽了操作系統(tǒng)的差異缴挖,有較好的性能和編程模型袋狞。

四.AIO (Asynchronous IO)

NIO 2 ,在Java 7提出映屋,NIO的升級(jí)版苟鸯。實(shí)現(xiàn)非阻塞異步的通信模式;
提供異步文件通道和異步套接字通道秧荆;
其read倔毙,write方法的返回類型都是Future對(duì)象;
而Future模型是異步的乙濒,其核心思想是:去主函數(shù)等待時(shí)間陕赃;
基于事件和回調(diào)機(jī)制。

AIO編程颁股,在NIO基礎(chǔ)之上引入了異步通道的概念么库。并提供異步文件和異步套接字通道的實(shí)現(xiàn),從而在真正意義上實(shí)現(xiàn)了異步非阻塞甘有,之前我們學(xué)過(guò)的NIO只是非阻塞而非異步诉儒。而AIO它不需要通過(guò)多路復(fù)用器對(duì)注冊(cè)的通道的進(jìn)行輪訓(xùn)操作即可實(shí)現(xiàn)異步讀寫(xiě),從而簡(jiǎn)化了NIO編程模型亏掀。也可以稱為NIO2.0忱反,這種模式才是真正的屬于異步非阻塞的模型。

AsynchronousServerScoketChannel
AsynchronousScoketChanel

示例代碼:
Server端代碼

public class Server {
//線程池
private ExecutorService executorService;
//線程組
private AsynchronousChannelGroup threadGroup;
//服務(wù)器通道
public AsynchronousServerSocketChannel assc;

public Server(int port){
try {
//創(chuàng)建一個(gè)緩存池
executorService = Executors.newCachedThreadPool();
//創(chuàng)建線程組
threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
//創(chuàng)建服務(wù)器通道
assc = AsynchronousServerSocketChannel.open(threadGroup);
//進(jìn)行綁定
assc.bind(new InetSocketAddress(port));

System.out.println("server start , port : " + port);
//進(jìn)行阻塞
assc.accept(this, new ServerCompletionHandler());
//一直阻塞 不讓服務(wù)器停止
Thread.sleep(Integer.MAX_VALUE);

} catch (Exception e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
Server server = new Server(8765);
}

}

ServerCompletionHandler端代碼

Public class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Server> {

@Override
public void completed(AsynchronousSocketChannel asc, Server attachment) {
//當(dāng)有下一個(gè)客戶端接入的時(shí)候 直接調(diào)用Server的accept方法滤愕,這樣反復(fù)執(zhí)行下去温算,保證多個(gè)客戶端都可以阻塞(沒(méi)有遞歸上限),1.7以后AIO才實(shí)現(xiàn)了異步非阻塞
attachment.assc.accept(attachment, this);
read(asc);
}

private void read(final AsynchronousSocketChannel asc) {
//讀取數(shù)據(jù)
ByteBuffer buf = ByteBuffer.allocate(1024);
asc.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer resultSize, ByteBuffer attachment) {
//進(jìn)行讀取之后,重置標(biāo)識(shí)位
attachment.flip();
//獲得讀取的字節(jié)數(shù)
System.out.println("Server -> " + "收到客戶端的數(shù)據(jù)長(zhǎng)度為:" + resultSize);
//獲取讀取的數(shù)據(jù)
String resultData = new String(attachment.array()).trim();
System.out.println("Server -> " + "收到客戶端的數(shù)據(jù)信息為:" + resultData);
String response = "服務(wù)器響應(yīng), 收到了客戶端發(fā)來(lái)的數(shù)據(jù): " + resultData;
write(asc, response);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
}

private void write(AsynchronousSocketChannel asc, String response) {
try {
ByteBuffer buf = ByteBuffer.allocate(1024);
buf.put(response.getBytes());
buf.flip();
asc.write(buf).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}

@Override
public void failed(Throwable exc, Server attachment) {
exc.printStackTrace();
}

}

Client端代碼

public class Clientimplements Runnable{

private AsynchronousSocketChannel asc ;

public Client() throws Exception {
asc = AsynchronousSocketChannel.open();
}

public void connect(){
asc.connect(new InetSocketAddress("127.0.0.1", 8765));
}

public void write(String request){
try {
asc.write(ByteBuffer.wrap(request.getBytes())).get();
read();
} catch (Exception e) {
e.printStackTrace();
}
}

private void read() {
ByteBuffer buf = ByteBuffer.allocate(1024);
try {
asc.read(buf).get();
buf.flip();
byte[] respByte = new byte[buf.remaining()];
buf.get(respByte);
System.out.println(new String(respByte,"utf-8").trim());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}

@Override
public void run() {
while(true){

}
}

public static void main(String[] args) throws Exception {
Client c1 = new Client();
c1.connect();

Client c2 = new Client();
c2.connect();

Client c3 = new Client();
c3.connect();

new Thread(c1, "c1").start();
new Thread(c2, "c2").start();
new Thread(c3, "c3").start();

Thread.sleep(1000);

c1.write("c1 aaa");
c2.write("c2 bbbb");
c3.write("c3 ccccc");
}

}

五.小結(jié)

  1. IO间影,阻塞同步通信模式注竿,客戶端與服務(wù)端三次握手,簡(jiǎn)單魂贬,吞吐量小巩割。關(guān)鍵詞:Socket和ServerSocket;
  2. NIO付燥,非阻塞同步通信模式宣谈,客戶端與服務(wù)端通過(guò)Channel連接,采用多路復(fù)用器輪詢注冊(cè)的Channel机蔗。關(guān)鍵詞:SocketChannel和ServerSocketChannel蒲祈;
  3. AIO甘萧,非阻塞異步通信模式,基于事件和回調(diào)機(jī)制梆掸,采用異步通道實(shí)現(xiàn)異步通信扬卷。關(guān)鍵詞:AsynchronousSocketChannel和AsynchronousServerSocketChannel。
  4. 適用場(chǎng)景:
  • BIO方式適用于連接數(shù)目比較小且固定的架構(gòu)酸钦,這種方式對(duì)服務(wù)器資源要求比較高怪得,并發(fā)局限于應(yīng)用中,JDK1.4以前的唯一選擇卑硫,但程序直觀簡(jiǎn)單易理解徒恋。
  • NIO方式適用于連接數(shù)目多且連接比較短(輕操作)的架構(gòu),比如聊天服務(wù)器欢伏,并發(fā)局限于應(yīng)用中入挣,編程比較復(fù)雜,JDK1.4開(kāi)始支持硝拧。
  • AIO方式使用于連接數(shù)目多且連接比較長(zhǎng)(重操作)的架構(gòu)径筏,比如相冊(cè)服務(wù)器,充分調(diào)用OS參與并發(fā)操作障陶,編程比較復(fù)雜滋恬,JDK7開(kāi)始支持。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末抱究,一起剝皮案震驚了整個(gè)濱河市恢氯,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌鼓寺,老刑警劉巖勋拟,帶你破解...
    沈念sama閱讀 218,858評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異妈候,居然都是意外死亡指黎,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門州丹,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人杂彭,你說(shuō)我怎么就攤上這事墓毒。” “怎么了亲怠?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,282評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵所计,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我团秽,道長(zhǎng)主胧,這世上最難降的妖魔是什么叭首? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,842評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮踪栋,結(jié)果婚禮上焙格,老公的妹妹穿的比我還像新娘。我一直安慰自己夷都,他們只是感情好眷唉,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,857評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著囤官,像睡著了一般冬阳。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上党饮,一...
    開(kāi)封第一講書(shū)人閱讀 51,679評(píng)論 1 305
  • 那天肝陪,我揣著相機(jī)與錄音,去河邊找鬼刑顺。 笑死氯窍,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的捏检。 我是一名探鬼主播荞驴,決...
    沈念sama閱讀 40,406評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼贯城!你這毒婦竟也來(lái)了熊楼?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,311評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤能犯,失蹤者是張志新(化名)和其女友劉穎鲫骗,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體踩晶,經(jīng)...
    沈念sama閱讀 45,767評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡执泰,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了渡蜻。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片术吝。...
    茶點(diǎn)故事閱讀 40,090評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖茸苇,靈堂內(nèi)的尸體忽然破棺而出排苍,到底是詐尸還是另有隱情,我是刑警寧澤学密,帶...
    沈念sama閱讀 35,785評(píng)論 5 346
  • 正文 年R本政府宣布淘衙,位于F島的核電站,受9級(jí)特大地震影響腻暮,放射性物質(zhì)發(fā)生泄漏彤守。R本人自食惡果不足惜毯侦,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,420評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望具垫。 院中可真熱鬧侈离,春花似錦、人聲如沸做修。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,988評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)饰及。三九已至蔗坯,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間燎含,已是汗流浹背宾濒。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,101評(píng)論 1 271
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留屏箍,地道東北人绘梦。 一個(gè)月前我還...
    沈念sama閱讀 48,298評(píng)論 3 372
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像赴魁,于是被迫代替她去往敵國(guó)和親卸奉。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,033評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容