Netty 源碼分析之 番外篇 Java NIO 的前生今世


簡介

Java NIO 是由 Java 1.4 引進的異步 IO.
Java NIO 由以下幾個核心部分組成:

  • Channel
  • Buffer
  • Selector

NIO 和 IO 的對比

IO 和 NIO 的區(qū)別主要體現在三個方面:

  • IO 基于流(Stream oriented), 而 NIO 基于 Buffer (Buffer oriented)
  • IO 操作是阻塞的, 而 NIO 操作是非阻塞的
  • IO 沒有 selector 概念, 而 NIO 有 selector 概念.

基于 Stream 與基于 Buffer

傳統(tǒng)的 IO 是面向字節(jié)流或字符流的, 而在 NIO 中, 我們拋棄了傳統(tǒng)的 IO 流, 而是引入了 ChannelBuffer 的概念. 在 NIO 中, 我只能從 Channel 中讀取數據到 Buffer 中或將數據從 Buffer 中寫入到 Channel.
那么什么是 基于流 呢? 在一般的 Java IO 操作中, 我們以流式的方式順序地從一個 Stream 中讀取一個或多個字節(jié), 因此我們也就不能隨意改變讀取指針的位置.
基于 Buffer 就顯得有點不同了. 我們首先需要從 Channel 中讀取數據到 Buffer 中, 當 Buffer 中有數據后, 我們就可以對這些數據進行操作了. 不像 IO 那樣是順序操作, NIO 中我們可以隨意地讀取任意位置的數據.

阻塞和非阻塞

Java 提供的各種 Stream 操作都是阻塞的, 例如我們調用一個 read 方法讀取一個文件的內容, 那么調用 read 的線程會被阻塞住, 直到 read 操作完成.
而 NIO 的非阻塞模式允許我們非阻塞地進行 IO 操作. 例如我們需要從網絡中讀取數據, 在 NIO 的非阻塞模式中, 當我們調用 read 方法時, 如果此時有數據, 則 read 讀取并返回; 如果此時沒有數據, 則 read 直接返回, 而不會阻塞當前線程.

selector

selector 是 NIO 中才有的概念, 它是 Java NIO 之所以可以非阻塞地進行 IO 操作的關鍵.
通過 Selector, 一個線程可以監(jiān)聽多個 Channel 的 IO 事件, 當我們向一個 Selector 中注冊了 Channel 后, Selector 內部的機制就可以自動地為我們不斷地查詢(select) 這些注冊的 Channel 是否有已就緒的 IO 事件(例如可讀, 可寫, 網絡連接完成等). 通過這樣的 Selector 機制, 我們就可以很簡單地使用一個線程高效地管理多個 Channel 了.

Java NIO Channel

通常來說, 所有的 NIO 的 I/O 操作都是從 Channel 開始的. 一個 channel 類似于一個 stream.
java Stream 和 NIO Channel 對比

  • 我們可以在同一個 Channel 中執(zhí)行讀和寫操作, 然而同一個 Stream 僅僅支持讀或寫.
  • Channel 可以異步地讀寫, 而 Stream 是阻塞的同步讀寫.
  • Channel 總是從 Buffer 中讀取數據, 或將數據寫入到 Buffer 中.

Channel 類型有:

  • FileChannel, 文件操作
  • DatagramChannel, UDP 操作
  • SocketChannel, TCP 操作
  • ServerSocketChannel, TCP 操作, 使用在服務器端.

這些通道涵蓋了 UDP 和 TCP網絡 IO以及文件 IO.
基本的 Channel 使用例子:

public static void main( String[] args ) throws Exception
{
    RandomAccessFile aFile = new RandomAccessFile("/Users/xiongyongshun/settings.xml", "rw");
    FileChannel inChannel = aFile.getChannel();

    ByteBuffer buf = ByteBuffer.allocate(48);

    int bytesRead = inChannel.read(buf);
    while (bytesRead != -1) {
        buf.flip();

        while(buf.hasRemaining()){
            System.out.print((char) buf.get());
        }

        buf.clear();
        bytesRead = inChannel.read(buf);
    }
    aFile.close();
}

FileChannel

FileChannel 是操作文件的Channel, 我們可以通過 FileChannel 從一個文件中讀取數據, 也可以將數據寫入到文件中.
注意, FileChannel 不能設置為非阻塞模式.

打開 FileChannel

RandomAccessFile aFile     = new RandomAccessFile("data/nio-data.txt", "rw");
FileChannel      inChannel = aFile.getChannel();

從 FileChannel 中讀取數據

ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf);

寫入數據

String newData = "New String to write to file..." + System.currentTimeMillis();

ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());

buf.flip();

while(buf.hasRemaining()) {
    channel.write(buf);
}

關閉

當我們對 FileChannel 的操作完成后, 必須將其關閉

channel.close(); 

設置 position

long pos channel.position();
channel.position(pos +123);

文件大小

我們可以通過 channel.size()獲取關聯到這個 Channel 中的文件的大小. 注意, 這里返回的是文件的大小, 而不是 Channel 中剩余的元素個數.

截斷文件

channel.truncate(1024);

將文件的大小截斷為1024字節(jié).

強制寫入

我們可以強制將緩存的未寫入的數據寫入到文件中:

channel.force(true);

SocketChannel

SocketChannel 是一個客戶端用來進行 TCP 連接的 Channel.
創(chuàng)建一個 SocketChannel 的方法有兩種:

  • 打開一個 SocketChannel, 然后將其連接到某個服務器中
  • 當一個 ServerSocketChannel 接受到連接請求時, 會返回一個 SocketChannel 對象.

打開 SocketChannel

SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("http://example.com", 80));

關閉

socketChannel.close(); 

讀取數據

ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = socketChannel.read(buf);

如果 read()返回 -1, 那么表示連接中斷了.

寫入數據

String newData = "New String to write to file..." + System.currentTimeMillis();

ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());

buf.flip();

while(buf.hasRemaining()) {
    channel.write(buf);
}

非阻塞模式

我們可以設置 SocketChannel 為異步模式, 這樣我們的 connect, read, write 都是異步的了.

連接
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("http://example.com", 80));

while(! socketChannel.finishConnect() ){
    //wait, or do something else...    
}

在異步模式中, 或許連接還沒有建立, connect 方法就返回了, 因此我們需要檢查當前是否是連接到了主機, 因此通過一個 while 循環(huán)來判斷.

讀寫

在異步模式下, 讀寫的方式是一樣的.
在讀取時, 因為是異步的, 因此我們必須檢查 read 的返回值, 來判斷當前是否讀取到了數據.

ServerSocketChannel

ServerSocketChannel 顧名思義, 是用在服務器為端的, 可以監(jiān)聽客戶端的 TCP 連接, 例如:

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9999));
while(true){
    SocketChannel socketChannel =
            serverSocketChannel.accept();

    //do something with socketChannel...
}

打開 關閉

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.close();

監(jiān)聽連接

我們可以使用ServerSocketChannel.accept()方法來監(jiān)聽客戶端的 TCP 連接請求, accept()方法會阻塞, 直到有連接到來, 當有連接時, 這個方法會返回一個 SocketChannel 對象:

while(true){
    SocketChannel socketChannel =
            serverSocketChannel.accept();

    //do something with socketChannel...
}

非阻塞模式

在非阻塞模式下, accept()是非阻塞的, 因此如果此時沒有連接到來, 那么 accept()方法會返回null:

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.socket().bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);

while(true){
    SocketChannel socketChannel =
            serverSocketChannel.accept();

    if(socketChannel != null){
        //do something with socketChannel...
        }
}

DatagramChannel

DatagramChannel 是用來處理 UDP 連接的.

打開

DatagramChannel channel = DatagramChannel.open();
channel.socket().bind(new InetSocketAddress(9999));

讀取數據

ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();

channel.receive(buf);

發(fā)送數據

String newData = "New String to write to file..."
                    + System.currentTimeMillis();
    
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();

int bytesSent = channel.send(buf, new InetSocketAddress("example.com", 80));

連接到指定地址

因為 UDP 是非連接的, 因此這個的 connect 并不是向 TCP 一樣真正意義上的連接, 而是它會講 DatagramChannel 鎖住, 因此我們僅僅可以從指定的地址中讀取或寫入數據.

channel.connect(new InetSocketAddress("example.com", 80));

Java NIO Buffer

當我們需要與 NIO Channel 進行交互時, 我們就需要使用到 NIO Buffer, 即數據從 Buffer讀取到 Channel 中, 并且從 Channel 中寫入到 Buffer 中.
實際上, 一個 Buffer 其實就是一塊內存區(qū)域, 我們可以在這個內存區(qū)域中進行數據的讀寫. NIO Buffer 其實是這樣的內存塊的一個封裝, 并提供了一些操作方法讓我們能夠方便地進行數據的讀寫.
Buffer 類型有:

  • ByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

這些 Buffer 覆蓋了能從 IO 中傳輸的所有的 Java 基本數據類型.

NIO Buffer 的基本使用

使用 NIO Buffer 的步驟如下:

  • 將數據寫入到 Buffer 中.
  • 調用 Buffer.flip()方法, 將 NIO Buffer 轉換為讀模式.
  • 從 Buffer 中讀取數據
  • 調用 Buffer.clear() 或 Buffer.compact()方法, 將 Buffer 轉換為寫模式.

當我們將數據寫入到 Buffer 中時, Buffer 會記錄我們已經寫了多少的數據, 當我們需要從 Buffer 中讀取數據時, 必須調用 Buffer.flip()將 Buffer 切換為讀模式.
一旦讀取了所有的 Buffer 數據, 那么我們必須清理 Buffer, 讓其從新可寫, 清理 Buffer 可以調用 Buffer.clear() 或 Buffer.compact().
例如:

public class Test {
    public static void main(String[] args) {
        IntBuffer intBuffer = IntBuffer.allocate(2);
        intBuffer.put(12345678);
        intBuffer.put(2);
        intBuffer.flip();
        System.err.println(intBuffer.get());
        System.err.println(intBuffer.get());
    }
}

上述中, 我們分配兩個單位大小的 IntBuffer, 因此它可以寫入兩個 int 值.
我們使用 put 方法將 int 值寫入, 然后使用 flip 方法將 buffer 轉換為讀模式, 然后連續(xù)使用 get 方法從 buffer 中獲取這兩個 int 值.
每當調用一次 get 方法讀取數據時, buffer 的讀指針都會向前移動一個單位長度(在這里是一個 int 長度)

Buffer 屬性

一個 Buffer 有三個屬性:

  • capacity
  • position
  • limit

其中 positionlimit 的含義與 Buffer 處于讀模式或寫模式有關, 而 capacity 的含義與 Buffer 所處的模式無關.

Capacity

一個內存塊會有一個固定的大小, 即容量(capacity), 我們最多寫入capacity 個單位的數據到 Buffer 中, 例如一個 DoubleBuffer, 其 Capacity 是100, 那么我們最多可以寫入100個 double 數據.

Position

當從一個 Buffer 中寫入數據時, 我們是從 Buffer 的一個確定的位置(position)開始寫入的. 在最初的狀態(tài)時, position 的值是0. 每當我們寫入了一個單位的數據后, position 就會遞增一.
當我們從 Buffer 中讀取數據時, 我們也是從某個特定的位置開始讀取的. 當我們調用了 filp()方法將 Buffer 從寫模式轉換到讀模式時, position 的值會自動被設置為0, 每當我們讀取一個單位的數據, position 的值遞增1.
position 表示了讀寫操作的位置指針.

limit

limit - position 表示此時還可以寫入/讀取多少單位的數據.
例如在寫模式, 如果此時 limit 是10, position 是2, 則表示已經寫入了2個單位的數據, 還可以寫入 10 - 2 = 8 個單位的數據.

例子:

public class Test {
    public static void main(String args[]) {
        IntBuffer intBuffer = IntBuffer.allocate(10);
        intBuffer.put(10);
        intBuffer.put(101);
        System.err.println("Write mode: ");
        System.err.println("\tCapacity: " + intBuffer.capacity());
        System.err.println("\tPosition: " + intBuffer.position());
        System.err.println("\tLimit: " + intBuffer.limit());

        intBuffer.flip();
        System.err.println("Read mode: ");
        System.err.println("\tCapacity: " + intBuffer.capacity());
        System.err.println("\tPosition: " + intBuffer.position());
        System.err.println("\tLimit: " + intBuffer.limit());
    }
}

這里我們首先寫入兩個 int 值, 此時 capacity = 10, position = 2, limit = 10.
然后我們調用 flip 轉換為讀模式, 此時 capacity = 10, position = 0, limit = 2;

分配 Buffer

為了獲取一個 Buffer 對象, 我們首先需要分配內存空間. 每個類型的 Buffer 都有一個 allocate()方法, 我們可以通過這個方法分配 Buffer:

ByteBuffer buf = ByteBuffer.allocate(48);

這里我們分配了48 * sizeof(Byte)字節(jié)的內存空間.

CharBuffer buf = CharBuffer.allocate(1024);

這里我們分配了大小為1024個字符的 Buffer, 即 這個 Buffer 可以存儲1024 個 Char, 其大小為 1024 * 2 個字節(jié).

關于 Direct Buffer 和 Non-Direct Buffer 的區(qū)別

Direct Buffer:

  • 所分配的內存不在 JVM 堆上, 不受 GC 的管理.(但是 Direct Buffer 的 Java 對象是由 GC 管理的, 因此當發(fā)生 GC, 對象被回收時, Direct Buffer 也會被釋放)
  • 因為 Direct Buffer 不在 JVM 堆上分配, 因此 Direct Buffer 對應用程序的內存占用的影響就不那么明顯(實際上還是占用了這么多內存, 但是 JVM 不好統(tǒng)計到非 JVM 管理的內存.)
  • 申請和釋放 Direct Buffer 的開銷比較大. 因此正確的使用 Direct Buffer 的方式是在初始化時申請一個 Buffer, 然后不斷復用此 buffer, 在程序結束后才釋放此 buffer.
  • 使用 Direct Buffer 時, 當進行一些底層的系統(tǒng) IO 操作時, 效率會比較高, 因為此時 JVM 不需要拷貝 buffer 中的內存到中間臨時緩沖區(qū)中.

Non-Direct Buffer:

  • 直接在 JVM 堆上進行內存的分配, 本質上是 byte[] 數組的封裝.
  • 因為 Non-Direct Buffer 在 JVM 堆中, 因此當進行操作系統(tǒng)底層 IO 操作中時, 會將此 buffer 的內存復制到中間臨時緩沖區(qū)中. 因此 Non-Direct Buffer 的效率就較低.

寫入數據到 Buffer

int bytesRead = inChannel.read(buf); //read into buffer.
buf.put(127);

從 Buffer 中讀取數據

//read from buffer into channel.
int bytesWritten = inChannel.write(buf);
byte aByte = buf.get();

重置 position

Buffer.rewind()方法可以重置 position 的值為0, 因此我們可以重新讀取/寫入 Buffer 了.
如果是讀模式, 則重置的是讀模式的 position, 如果是寫模式, 則重置的是寫模式的 position.
例如:

public class Test {
    public static void main(String[] args) {
        IntBuffer intBuffer = IntBuffer.allocate(2);
        intBuffer.put(1);
        intBuffer.put(2);
        System.err.println("position: " + intBuffer.position());

        intBuffer.rewind();
        System.err.println("position: " + intBuffer.position());
        intBuffer.put(1);
        intBuffer.put(2);
        System.err.println("position: " + intBuffer.position());

        
        intBuffer.flip();
        System.err.println("position: " + intBuffer.position());
        intBuffer.get();
        intBuffer.get();
        System.err.println("position: " + intBuffer.position());

        intBuffer.rewind();
        System.err.println("position: " + intBuffer.position());
    }
}

rewind() 主要針對于讀模式. 在讀模式時, 讀取到 limit 后, 可以調用 rewind() 方法, 將讀 position 置為0.

關于 mark()和 reset()

我們可以通過調用 Buffer.mark()將當前的 position 的值保存起來, 隨后可以通過調用 Buffer.reset()方法將 position 的值回復回來.
例如:

public class Test {
    public static void main(String[] args) {
        IntBuffer intBuffer = IntBuffer.allocate(2);
        intBuffer.put(1);
        intBuffer.put(2);
        intBuffer.flip();
        System.err.println(intBuffer.get());
        System.err.println("position: " + intBuffer.position());
        intBuffer.mark();
        System.err.println(intBuffer.get());

        System.err.println("position: " + intBuffer.position());
        intBuffer.reset();
        System.err.println("position: " + intBuffer.position());
        System.err.println(intBuffer.get());
    }
}

這里我們寫入兩個 int 值, 然后首先讀取了一個值. 此時讀 position 的值為1.
接著我們調用 mark() 方法將當前的 position 保存起來(在讀模式, 因此保存的是讀的 position), 然后再次讀取, 此時 position 就是2了.
接著使用 reset() 恢復原來的讀 position, 因此讀 position 就為1, 可以再次讀取數據.

flip, rewind 和 clear 的區(qū)別

flip

方法源碼:

public final Buffer flip() {
    limit = position;
    position = 0;
    mark = -1;
    return this;
}

Buffer 的讀/寫模式共用一個 position 和 limit 變量.
當從寫模式變?yōu)樽x模式時, 原先的 寫 position 就變成了讀模式的 limit.

rewind

方法源碼

public final Buffer rewind() {
    position = 0;
    mark = -1;
    return this;
}

rewind, 即倒帶, 這個方法僅僅是將 position 置為0.

clear

方法源碼:

public final Buffer clear() {
    position = 0;
    limit = capacity;
    mark = -1;
    return this;
}

根據源碼我們可以知道, clear 將 positin 設置為0, 將 limit 設置為 capacity.
clear 方法使用場景:

  • 在一個已經寫滿數據的 buffer 中, 調用 clear, 可以從頭讀取 buffer 的數據.
  • 為了將一個 buffer 填充滿數據, 可以調用 clear, 然后一直寫入, 直到達到 limit.
例子:
IntBuffer intBuffer = IntBuffer.allocate(2);
intBuffer.flip();
System.err.println("position: " + intBuffer.position());
System.err.println("limit: " + intBuffer.limit());
System.err.println("capacity: " + intBuffer.capacity());

// 這里不能讀, 因為 limit == position == 0, 沒有數據.
//System.err.println(intBuffer.get());

intBuffer.clear();
System.err.println("position: " + intBuffer.position());
System.err.println("limit: " + intBuffer.limit());
System.err.println("capacity: " + intBuffer.capacity());

// 這里可以讀取數據了, 因為 clear 后, limit == capacity == 2, position == 0,
// 即使我們沒有寫入任何的數據到 buffer 中.
System.err.println(intBuffer.get()); // 讀取到0
System.err.println(intBuffer.get()); // 讀取到0

Buffer 的比較

我們可以通過 equals() 或 compareTo() 方法比較兩個 Buffer, 當且僅當如下條件滿足時, 兩個 Buffer 是相等的:

  • 兩個 Buffer 是相同類型的
  • 兩個 Buffer 的剩余的數據個數是相同的
  • 兩個 Buffer 的剩余的數據都是相同的.

通過上述條件我們可以發(fā)現, 比較兩個 Buffer 時, 并不是 Buffer 中的每個元素都進行比較, 而是比較 Buffer 中剩余的元素.

Selector

Selector 允許一個單一的線程來操作多個 Channel. 如果我們的應用程序中使用了多個 Channel, 那么使用 Selector 很方便的實現這樣的目的, 但是因為在一個線程中使用了多個 Channel, 因此也會造成了每個 Channel 傳輸效率的降低.
使用 Selector 的圖解如下:
![Alt text](./Selector 圖解.png)

為了使用 Selector, 我們首先需要將 Channel 注冊到 Selector 中, 隨后調用 Selector 的 select()方法, 這個方法會阻塞, 直到注冊在 Selector 中的 Channel 發(fā)送可讀寫事件. 當這個方法返回后, 當前的這個線程就可以處理 Channel 的事件了.

創(chuàng)建選擇器

通過 Selector.open()方法, 我們可以創(chuàng)建一個選擇器:

Selector selector = Selector.open();

將 Channel 注冊到選擇器中

為了使用選擇器管理 Channel, 我們需要將 Channel 注冊到選擇器中:

channel.configureBlocking(false);

SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

注意, 如果一個 Channel 要注冊到 Selector 中, 那么這個 Channel 必須是非阻塞的, 即channel.configureBlocking(false);
因為 Channel 必須要是非阻塞的, 因此 FileChannel 是不能夠使用選擇器的, 因為 FileChannel 都是阻塞的.

注意到, 在使用 Channel.register()方法時, 第二個參數指定了我們對 Channel 的什么類型的事件感興趣, 這些事件有:

  • Connect, 即連接事件(TCP 連接), 對應于SelectionKey.OP_CONNECT
  • Accept, 即確認事件, 對應于SelectionKey.OP_ACCEPT
  • Read, 即讀事件, 對應于SelectionKey.OP_READ, 表示 buffer 可讀.
  • Write, 即寫事件, 對應于SelectionKey.OP_WRITE, 表示 buffer 可寫.

一個 Channel發(fā)出一個事件也可以稱為** 對于某個事件, Channel 準備好了. 因此一個 Channel 成功連接到了另一個服務器也可以被稱為 connect ready.
我們可以使用或運算
|**來組合多個事件, 例如:

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;    

注意, 一個 Channel 僅僅可以被注冊到一個 Selector 一次, 如果將 Channel 注冊到 Selector 多次, 那么其實就是相當于更新 SelectionKey 的 interest set. 例如:

channel.register(selector, SelectionKey.OP_READ);
channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);

上面的 channel 注冊到同一個 Selector 兩次了, 那么第二次的注冊其實就是相當于更新這個 Channel 的 interest set 為 SelectionKey.OP_READ | SelectionKey.OP_WRITE.

關于 SelectionKey

如上所示, 當我們使用 register 注冊一個 Channel 時, 會返回一個 SelectionKey 對象, 這個對象包含了如下內容:

  • interest set, 即我們感興趣的事件集, 即在調用 register 注冊 channel 時所設置的 interest set.
  • ready set
  • channel
  • selector
  • attached object, 可選的附加對象

interest set

我們可以通過如下方式獲取 interest set:

int interestSet = selectionKey.interestOps();

boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;    

ready set

代表了 Channel 所準備好了的操作.
我們可以像判斷 interest set 一樣操作 Ready set, 但是我們還可以使用如下方法進行判斷:

int readySet = selectionKey.readyOps();

selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();

Channel 和 Selector

我們可以通過 SelectionKey 獲取相對應的 Channel 和 Selector:

Channel  channel  = selectionKey.channel();
Selector selector = selectionKey.selector();  

Attaching Object

我們可以在selectionKey中附加一個對象:

selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();

或者在注冊時直接附加:

SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

通過 Selector 選擇 Channel

我們可以通過 Selector.select()方法獲取對某件事件準備好了的 Channel, 即如果我們在注冊 Channel 時, 對其的可寫事件感興趣, 那么當 select()返回時, 我們就可以獲取 Channel 了.

注意, select()方法返回的值表示有多少個 Channel 可操作.

獲取可操作的 Channel

如果 select()方法返回值表示有多個 Channel 準備好了, 那么我們可以通過 Selected key set 訪問這個 Channel:

Set<SelectionKey> selectedKeys = selector.selectedKeys();

Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

while(keyIterator.hasNext()) {
    
    SelectionKey key = keyIterator.next();

    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.

    } else if (key.isConnectable()) {
        // a connection was established with a remote server.

    } else if (key.isReadable()) {
        // a channel is ready for reading

    } else if (key.isWritable()) {
        // a channel is ready for writing
    }

    keyIterator.remove();
}

注意, 在每次迭代時, 我們都調用 "keyIterator.remove()" 將這個 key 從迭代器中刪除, 因為 select() 方法僅僅是簡單地將就緒的 IO 操作放到 selectedKeys 集合中, 因此如果我們從 selectedKeys 獲取到一個 key, 但是沒有將它刪除, 那么下一次 select 時, 這個 key 所對應的 IO 事件還在 selectedKeys 中.

例如此時我們收到 OP_ACCEPT 通知, 然后我們進行相關處理, 但是并沒有將這個 Key 從 SelectedKeys 中刪除, 那么下一次 select() 返回時 我們還可以在 SelectedKeys 中獲取到 OP_ACCEPT 的 key.

注意, 我們可以動態(tài)更改 SekectedKeys 中的 key 的 interest set.

例如在 OP_ACCEPT 中, 我們可以將 interest set 更新為 OP_READ, 這樣 Selector 就會將這個 Channel 的 讀 IO 就緒事件包含進來了.

Selector 的基本使用流程

  1. 通過 Selector.open() 打開一個 Selector.
  2. 將 Channel 注冊到 Selector 中, 并設置需要監(jiān)聽的事件(interest set)
  3. 不斷重復:
  • 調用 select() 方法
  • 調用 selector.selectedKeys() 獲取 selected keys
  • 迭代每個 selected key:
  • *從 selected key 中獲取 對應的 Channel 和附加信息(如果有的話)
  • *判斷是哪些 IO 事件已經就緒了, 然后處理它們. 如果是 OP_ACCEPT 事件, 則調用 "SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept()" 獲取 SocketChannel, 并將它設置為 非阻塞的, 然后將這個 Channel 注冊到 Selector 中.
  • *根據需要更改 selected key 的監(jiān)聽事件.
  • *將已經處理過的 key 從 selected keys 集合中刪除.

關閉 Selector

當調用了 Selector.close()方法時, 我們其實是關閉了 Selector 本身并且將所有的 SelectionKey 失效, 但是并不會關閉 Channel.

完整的 Selector 例子

public class NioEchoServer {
    private static final int BUF_SIZE = 256;
    private static final int TIMEOUT = 3000;

    public static void main(String args[]) throws Exception {
        // 打開服務端 Socket
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        // 打開 Selector
        Selector selector = Selector.open();

        // 服務端 Socket 監(jiān)聽8080端口, 并配置為非阻塞模式
        serverSocketChannel.socket().bind(new InetSocketAddress(8080));
        serverSocketChannel.configureBlocking(false);

        // 將 channel 注冊到 selector 中.
        // 通常我們都是先注冊一個 OP_ACCEPT 事件, 然后在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ
        // 注冊到 Selector 中.
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            // 通過調用 select 方法, 阻塞地等待 channel I/O 可操作
            if (selector.select(TIMEOUT) == 0) {
                System.out.print(".");
                continue;
            }

            // 獲取 I/O 操作就緒的 SelectionKey, 通過 SelectionKey 可以知道哪些 Channel 的哪類 I/O 操作已經就緒.
            Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();

            while (keyIterator.hasNext()) {

                // 當獲取一個 SelectionKey 后, 就要將它刪除, 表示我們已經對這個 IO 事件進行了處理.
                keyIterator.remove();

                SelectionKey key = keyIterator.next();

                if (key.isAcceptable()) {
                    // 當 OP_ACCEPT 事件到來時, 我們就有從 ServerSocketChannel 中獲取一個 SocketChannel,
                    // 代表客戶端的連接
                    // 注意, 在 OP_ACCEPT 事件中, 從 key.channel() 返回的 Channel 是 ServerSocketChannel.
                    // 而在 OP_WRITE 和 OP_READ 中, 從 key.channel() 返回的是 SocketChannel.
                    SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
                    clientChannel.configureBlocking(false);
                    //在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ 注冊到 Selector 中.
                    // 注意, 這里我們如果沒有設置 OP_READ 的話, 即 interest set 仍然是 OP_CONNECT 的話, 那么 select 方法會一直直接返回.
                    clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE));
                }

                if (key.isReadable()) {
                    SocketChannel clientChannel = (SocketChannel) key.channel();
                    ByteBuffer buf = (ByteBuffer) key.attachment();
                    long bytesRead = clientChannel.read(buf);
                    if (bytesRead == -1) {
                        clientChannel.close();
                    } else if (bytesRead > 0) {
                        key.interestOps(OP_READ | SelectionKey.OP_WRITE);
                        System.out.println("Get data length: " + bytesRead);
                    }
                }

                if (key.isValid() && key.isWritable()) {
                    ByteBuffer buf = (ByteBuffer) key.attachment();
                    buf.flip();
                    SocketChannel clientChannel = (SocketChannel) key.channel();

                    clientChannel.write(buf);

                    if (!buf.hasRemaining()) {
                        key.interestOps(OP_READ);
                    }
                    buf.compact();
                }
            }
        }
    }
}
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末务傲,一起剝皮案震驚了整個濱河市咳焚,隨后出現的幾起案子弧腥,更是在濱河造成了極大的恐慌宏侍,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,324評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件医增,死亡現場離奇詭異慎皱,居然都是意外死亡,警方通過查閱死者的電腦和手機叶骨,發(fā)現死者居然都...
    沈念sama閱讀 92,356評論 3 392
  • 文/潘曉璐 我一進店門茫多,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人忽刽,你說我怎么就攤上這事天揖《嵊” “怎么了?”我有些...
    開封第一講書人閱讀 162,328評論 0 353
  • 文/不壞的土叔 我叫張陵今膊,是天一觀的道長些阅。 經常有香客問我,道長斑唬,這世上最難降的妖魔是什么市埋? 我笑而不...
    開封第一講書人閱讀 58,147評論 1 292
  • 正文 為了忘掉前任昌跌,我火速辦了婚禮件炉,結果婚禮上,老公的妹妹穿的比我還像新娘篡撵。我一直安慰自己褐着,他們只是感情好弓千,可當我...
    茶點故事閱讀 67,160評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著献起,像睡著了一般。 火紅的嫁衣襯著肌膚如雪镣陕。 梳的紋絲不亂的頭發(fā)上谴餐,一...
    開封第一講書人閱讀 51,115評論 1 296
  • 那天,我揣著相機與錄音呆抑,去河邊找鬼岂嗓。 笑死,一個胖子當著我的面吹牛鹊碍,可吹牛的內容都是我干的厌殉。 我是一名探鬼主播,決...
    沈念sama閱讀 40,025評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼侈咕,長吁一口氣:“原來是場噩夢啊……” “哼公罕!你這毒婦竟也來了?” 一聲冷哼從身側響起耀销,我...
    開封第一講書人閱讀 38,867評論 0 274
  • 序言:老撾萬榮一對情侶失蹤楼眷,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后熊尉,有當地人在樹林里發(fā)現了一具尸體罐柳,經...
    沈念sama閱讀 45,307評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,528評論 2 332
  • 正文 我和宋清朗相戀三年狰住,在試婚紗的時候發(fā)現自己被綠了张吉。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,688評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡催植,死狀恐怖肮蛹,靈堂內的尸體忽然破棺而出勺择,到底是詐尸還是另有隱情,我是刑警寧澤蔗崎,帶...
    沈念sama閱讀 35,409評論 5 343
  • 正文 年R本政府宣布酵幕,位于F島的核電站,受9級特大地震影響缓苛,放射性物質發(fā)生泄漏芳撒。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,001評論 3 325
  • 文/蒙蒙 一未桥、第九天 我趴在偏房一處隱蔽的房頂上張望笔刹。 院中可真熱鬧,春花似錦冬耿、人聲如沸舌菜。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,657評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽日月。三九已至,卻和暖如春缤骨,著一層夾襖步出監(jiān)牢的瞬間爱咬,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,811評論 1 268
  • 我被黑心中介騙來泰國打工绊起, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留精拟,地道東北人。 一個月前我還...
    沈念sama閱讀 47,685評論 2 368
  • 正文 我出身青樓虱歪,卻偏偏與公主長得像蜂绎,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子笋鄙,可洞房花燭夜當晚...
    茶點故事閱讀 44,573評論 2 353

推薦閱讀更多精彩內容

  • Java NIO(New IO)是從Java 1.4版本開始引入的一個新的IO API师枣,可以替代標準的Java I...
    JackChen1024閱讀 7,555評論 1 143
  • (轉載說明:本文非原創(chuàng),轉載自http://ifeve.com/java-nio-all/) Java NIO: ...
    數獨題閱讀 805評論 0 3
  • 這兩天了解了一下關于NIO方面的知識萧落,網上關于這一塊的介紹只是介紹了一下基本用法坛吁,沒有系統(tǒng)的解釋NIO與阻塞、非阻...
    Ruheng閱讀 7,124評論 5 48
  • (轉載說明:本文非原創(chuàng),轉載自http://ifeve.com/java-nio-all/) Java NIO: ...
    柳岸閱讀 818評論 0 3
  • 原文地址http://www.importnew.com/19816.html 概述 NIO主要有三個核心部分:C...
    期待現在閱讀 864評論 0 4