Java - NIO網(wǎng)絡(luò)編程

JAVA NIO

始于Java1.4, 提供了新的Java IO 操作非阻塞API。目的是替代Java IO 和 JAVA Networking相關(guān)的API艘款。

NIO中有三個核心的組件:

  • Buffer 緩沖區(qū)
  • Channel 通道
  • Selector 選擇器

1. Buffer 緩沖區(qū)

緩沖區(qū)本質(zhì)上是一個可以寫入數(shù)據(jù)的內(nèi)存塊(類似數(shù)組)离熏,然后可以再次讀取。此內(nèi)存塊包含在NIO Buffer對象中凤壁,該對象提供了一組方法,可以更輕松地使用內(nèi)存塊跪另。
相比較直接對數(shù)組的操作拧抖,Buffer API更加容易操作和管理

使用Buffer進行數(shù)據(jù)寫入與讀取免绿,需要進行如下四個步驟

  1. 將數(shù)據(jù)寫入緩沖區(qū)
  2. 調(diào)用buffer.flip()唧席,轉(zhuǎn)換為讀取模式
  3. 緩沖區(qū)讀取數(shù)據(jù)
  4. 調(diào)用buffer.clear() 或 buffer.compact() 清楚緩沖區(qū)
1.1 Buffer工作原理

Buffer 三個重要屬性:

  • capacity 容量:作為一個內(nèi)存塊,Buffer具有一定的固定大小嘲驾,也叫容量
  • position 位置:寫入模式時代表的是寫數(shù)據(jù)的位置淌哟。讀取模式時代表的是讀取數(shù)據(jù)的位置。
  • limit 限制:寫入模式辽故,limit等于buffer的容量徒仓。讀取模式下,limit等于寫入的數(shù)據(jù)量誊垢。


    Buffer工作原理
1.2 Buffer的例子代碼
package cn.lazyfennec.net.nio;

import java.nio.ByteBuffer;

public class BufferDemo {
    public static void main(String[] args) {
        // 構(gòu)建一個byte字節(jié)緩沖區(qū)掉弛,容量是4
        ByteBuffer byteBuffer = ByteBuffer.allocate(4);
        // 默認寫入模式,查看三個重要的指標(biāo)
        System.out.println(String.format("初始化:capacity容量:%s, position位置:%s, limit限制:%s", byteBuffer.capacity(),
                byteBuffer.position(), byteBuffer.limit()));
        // 寫入2字節(jié)的數(shù)據(jù)
        byteBuffer.put((byte) 1);
        byteBuffer.put((byte) 2);
        byteBuffer.put((byte) 3);
        // 再看數(shù)據(jù)
        System.out.println(String.format("寫入3字節(jié)后喂走,capacity容量:%s, position位置:%s, limit限制:%s", byteBuffer.capacity(),
                byteBuffer.position(), byteBuffer.limit()));

        // 轉(zhuǎn)換為讀取模式(不調(diào)用flip方法殃饿,也是可以讀取數(shù)據(jù)的,但是position記錄讀取的位置不對)
        System.out.println("#######開始讀取");
        byteBuffer.flip();
        byte a = byteBuffer.get();
        System.out.println(a);
        byte b = byteBuffer.get();
        System.out.println(b);
        System.out.println(String.format("讀取2字節(jié)數(shù)據(jù)后芋肠,capacity容量:%s, position位置:%s, limit限制:%s", byteBuffer.capacity(),
                byteBuffer.position(), byteBuffer.limit()));

        // 繼續(xù)寫入3字節(jié)壁晒,此時讀模式下,limit=3,position=2.繼續(xù)寫入只能覆蓋寫入一條數(shù)據(jù)
        // clear()方法清除整個緩沖區(qū)秒咐。compact()方法僅清除已閱讀的數(shù)據(jù)谬晕。轉(zhuǎn)為寫入模式
        byteBuffer.compact(); // buffer : 1 , 3
        byteBuffer.put((byte) 3);
        byteBuffer.put((byte) 4);
        byteBuffer.put((byte) 5);
        System.out.println(String.format("最終的情況,capacity容量:%s, position位置:%s, limit限制:%s", byteBuffer.capacity(),
                byteBuffer.position(), byteBuffer.limit()));

        // rewind() 重置position為0
        // mark() 標(biāo)記position的位置
        // reset() 重置position為上次mark()標(biāo)記的位置
    }
}

運行結(jié)果

初始化:capacity容量:4, position位置:0, limit限制:4
寫入3字節(jié)后携取,capacity容量:4, position位置:3, limit限制:4
#######開始讀取
1
2
讀取2字節(jié)數(shù)據(jù)后攒钳,capacity容量:4, position位置:2, limit限制:3
最終的情況,capacity容量:4, position位置:4, limit限制:4
1.3 ByteBuffer的內(nèi)存類型

ByteBuffer為性能關(guān)鍵型代碼提供了直接內(nèi)存(direct堆外)和非直接內(nèi)存(heap堆)兩種實現(xiàn)雷滋。堆外內(nèi)存獲取的方式: ByteBuffer directByteBuffer = ByteBuffer.allocateDirect(noBytes);

  • 好處:
    1不撑、進行網(wǎng)絡(luò)IO或者文件IO時比heapBuffer少一次拷貝。(file/socket ----OS memory ---- jvm heap)GC會移動對象內(nèi)存晤斩,在寫file或socket的過程中焕檬,JVM的實現(xiàn)中,會先把數(shù)據(jù)復(fù)制到堆外澳泵,再進行寫入实愚。
    2、GC范圍之外兔辅,降低GC壓力腊敲,但實現(xiàn)了自動管理。DirectByteBuffer中有一個Cleaner對象(PhantomReference)维苔,Cleaner被GC前會執(zhí)行clean方法碰辅,觸發(fā)DirectByteBuffer中定義的Deallocator

  • 建議:
    1、性能確實可觀的時候才去使用;分配給大型介时、長壽命没宾;(網(wǎng)絡(luò)傳輸、文件讀寫場景)
    2沸柔、通過虛擬機參數(shù)MaxDirectMemorySize限制大小榕吼,防止耗盡整個機器的內(nèi)存;

  • 使用堆外內(nèi)存的方式:

        // 構(gòu)建一個byte字節(jié)緩沖區(qū),容量是4勉失,使用堆外內(nèi)存
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(4);

2. Channel 通道
BIO和NIO的網(wǎng)絡(luò)傳輸對比
  • Channel的API涵蓋了UDP/TCP網(wǎng)絡(luò)和文件IO
  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel
  • 和標(biāo)準IO Stream操作的區(qū)別:
  • 在一個通道內(nèi)進行讀取和寫入
  • stream通常是單向的(input或output)
  • 可以非阻塞讀取和寫入通道
  • 通道始終讀取或?qū)懭刖彌_區(qū)
2.1 SocketChannel
  • 用于建立TCO網(wǎng)絡(luò)連接
  • 兩種創(chuàng)建socketChannel的形式:
    1.客戶端主動發(fā)起和服務(wù)器的連接羹蚣。
    2.服務(wù)端獲取的新鏈接。
        // 客戶端主動發(fā)起連接的方式
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false); // 設(shè)置為非阻塞模式
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));

        socketChannel.write(byteBuffer);// 發(fā)送請求數(shù)據(jù) - 向通道寫入數(shù)據(jù)

        int bytesRead = socketChannel.read(byteBuffer); // 讀取服務(wù)端返回 - 讀取緩沖區(qū)的數(shù)據(jù)

        socketChannel.close(); // 關(guān)閉連接

write寫:write()在尚未寫入任何內(nèi)容時就可能返回了乱凿。需要在循環(huán)中調(diào)用write()顽素。
read讀: read()方法可能直接返回而根本不讀取任何數(shù)據(jù),根據(jù)返回的int值判斷讀取了多少字節(jié)徒蟆。

2.2 ServerSocketChannel

ServerSocketChannel可以監(jiān)聽新建的TCP連接通道胁出,類似ServerSocket。

        // 創(chuàng)建網(wǎng)絡(luò)服務(wù)端
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false); // 設(shè)置為非阻塞模式
        serverSocketChannel.socket().bind(new InetSocketAddress(8080)); // 綁定端口
        while (true) {
            SocketChannel socketChannel = serverSocketChannel.accept(); // 獲取新tcp連接通道
            if (socketChannel != null) {
               // tcp請求 讀取/響應(yīng)
            }
        }

serverSocketChannel.accept():如果該通道處于非阻塞模式段审,那么如果沒有掛起的連接全蝶,該方法將立即返回null。必須檢查返回的SocketChannel是否為null。

  • 代碼示例

服務(wù)端

package cn.lazyfennec.net.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/**
 * 直接基于非阻塞的寫法
 */
public class NIOServer {

    public static void main(String[] args) throws Exception {
        // 創(chuàng)建網(wǎng)絡(luò)服務(wù)端
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false); // 設(shè)置為非阻塞模式
        serverSocketChannel.socket().bind(new InetSocketAddress(8080)); // 綁定端口
        System.out.println("啟動成功");
        while (true) {
            SocketChannel socketChannel = serverSocketChannel.accept(); // 獲取新tcp連接通道
            // tcp請求 讀取/響應(yīng)
            if (socketChannel != null) {
                System.out.println("收到新連接 : " + socketChannel.getRemoteAddress());
                socketChannel.configureBlocking(false); // 默認是阻塞的,一定要設(shè)置為非阻塞
                try {
                    ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
                    while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
                        // 長連接情況下,需要手動判斷數(shù)據(jù)有沒有讀取結(jié)束 (此處做一個簡單的判斷: 超過0字節(jié)就認為請求結(jié)束了)
                        if (requestBuffer.position() > 0) break;
                    }
                    if (requestBuffer.position() == 0) continue; // 如果沒數(shù)據(jù)了, 則不繼續(xù)后面的處理
                    requestBuffer.flip();
                    byte[] content = new byte[requestBuffer.limit()];
                    requestBuffer.get(content);
                    System.out.println(new String(content));
                    System.out.println("收到數(shù)據(jù),來自:" + socketChannel.getRemoteAddress());

                    // 響應(yīng)結(jié)果 200
                    String response = "HTTP/1.1 200 OK\r\n" +
                            "Content-Length: 11\r\n\r\n" +
                            "Hello World";
                    ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
                    while (buffer.hasRemaining()) {
                        socketChannel.write(buffer);// 非阻塞
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        // 用到了非阻塞的API, 在設(shè)計上,和BIO可以有很大的不同.繼續(xù)改進
    }
}

客戶端

package cn.lazyfennec.net.nio;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

public class NIOClient {

    public static void main(String[] args) throws Exception {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
        while (!socketChannel.finishConnect()) {
            // 沒連接上,則一直等待
            Thread.yield();
        }
        Scanner scanner = new Scanner(System.in);
        System.out.println("請輸入:");
        // 發(fā)送內(nèi)容
        String msg = scanner.nextLine();
        ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
        while (buffer.hasRemaining()) {
            socketChannel.write(buffer);
        }
        // 讀取響應(yīng)
        System.out.println("收到服務(wù)端響應(yīng):");
        ByteBuffer requestBuffer = ByteBuffer.allocate(1024);

        while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
            // 長連接情況下,需要手動判斷數(shù)據(jù)有沒有讀取結(jié)束 (此處做一個簡單的判斷: 超過0字節(jié)就認為請求結(jié)束了)
            if (requestBuffer.position() > 0) break;
        }
        requestBuffer.flip();
        byte[] content = new byte[requestBuffer.limit()];
        requestBuffer.get(content);
        System.out.println(new String(content));
        scanner.close();
        socketChannel.close();
    }
}

以上的Server類中抑淫,連接依舊會阻塞绷落,可以做以下調(diào)整升級:

  • 輪詢通道的方式
package cn.lazyfennec.net.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;

/**
 * 直接基于非阻塞的寫法,一個線程處理輪詢所有請求
 */
public class NIOServer1 {
    /**
     * 已經(jīng)建立連接的集合
     */
    private static ArrayList<SocketChannel> channels = new ArrayList<>();

    public static void main(String[] args) throws Exception {
        // 創(chuàng)建網(wǎng)絡(luò)服務(wù)端
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false); // 設(shè)置為非阻塞模式
        serverSocketChannel.socket().bind(new InetSocketAddress(8080)); // 綁定端口
        System.out.println("啟動成功");
        while (true) {
            SocketChannel socketChannel = serverSocketChannel.accept(); // 獲取新tcp連接通道
            // tcp請求 讀取/響應(yīng)
            if (socketChannel != null) {
                System.out.println("收到新連接 : " + socketChannel.getRemoteAddress());
                socketChannel.configureBlocking(false); // 默認是阻塞的,一定要設(shè)置為非阻塞
                channels.add(socketChannel);
            } else {
                // 沒有新連接的情況下,就去處理現(xiàn)有連接的數(shù)據(jù),處理完的就刪除掉
                Iterator<SocketChannel> iterator = channels.iterator();
                while (iterator.hasNext()) {
                    SocketChannel ch = iterator.next();
                    try {
                        ByteBuffer requestBuffer = ByteBuffer.allocate(1024);

                        if (ch.read(requestBuffer) == 0) {
                            // 等于0,代表這個通道沒有數(shù)據(jù)需要處理,那就待會再處理
                            continue;
                        }
                        while (ch.isOpen() && ch.read(requestBuffer) != -1) {
                            // 長連接情況下,需要手動判斷數(shù)據(jù)有沒有讀取結(jié)束 (此處做一個簡單的判斷: 超過0字節(jié)就認為請求結(jié)束了)
                            if (requestBuffer.position() > 0) break;
                        }
                        if (requestBuffer.position() == 0) continue; // 如果沒數(shù)據(jù)了, 則不繼續(xù)后面的處理
                        requestBuffer.flip();
                        byte[] content = new byte[requestBuffer.limit()];
                        requestBuffer.get(content);
                        System.out.println(new String(content));
                        System.out.println("收到數(shù)據(jù),來自:" + ch.getRemoteAddress());

                        // 響應(yīng)結(jié)果 200
                        String response = "HTTP/1.1 200 OK\r\n" +
                                "Content-Length: 11\r\n\r\n" +
                                "Hello World";
                        ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
                        while (buffer.hasRemaining()) {
                            ch.write(buffer);
                        }
                        iterator.remove();
                    } catch (IOException e) {
                        e.printStackTrace();
                        iterator.remove();
                    }
                }
            }
        }
        // 用到了非阻塞的API, 再設(shè)計上,和BIO可以有很大的不同
        // 問題: 輪詢通道的方式,低效,浪費CPU
    }
}
Selector 選擇器

可以檢查一個或多個NIO通道,并確定哪些通道已準備好進行讀取或?qū)懭搿?strong>實現(xiàn)了單線程管理多個通道始苇,從而管理多個網(wǎng)絡(luò)連接砌烁。

1.1 Selector監(jiān)聽多個channel的多個事件:
  • 四個事件對應(yīng)的四個SelectionKey常量:
  1. Connect連接(SelectionKey.OP_CONNECT)
  2. Accept準備就緒(OP_ACCEPT)
  3. Read讀取(OP_READ)
  4. Write 寫入(OP_WRITE)
Selector選擇器
  • 使用Selector 避免輪詢處理
package cn.lazyfennec.net.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * 結(jié)合Selector實現(xiàn)的非阻塞服務(wù)端(放棄對channel的輪詢,借助消息通知機制)
 */
public class NIOServerV2 {

    public static void main(String[] args) throws Exception {
        // 1. 創(chuàng)建網(wǎng)絡(luò)服務(wù)端ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false); // 設(shè)置為非阻塞模式

        // 2. 構(gòu)建一個Selector選擇器,并且將channel注冊上去
        Selector selector = Selector.open();
        SelectionKey selectionKey = serverSocketChannel.register(selector, 0, serverSocketChannel);// 將serverSocketChannel注冊到selector
        selectionKey.interestOps(SelectionKey.OP_ACCEPT); // 對serverSocketChannel上面的accept事件感興趣(serverSocketChannel只能支持accept操作)

        // 3. 綁定端口
        serverSocketChannel.socket().bind(new InetSocketAddress(8080));

        System.out.println("啟動成功");

        while (true) {
            // 不再輪詢通道,改用下面輪詢事件的方式.select方法有阻塞效果,直到有事件通知才會有返回
            selector.select();
            // 獲取事件
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            // 遍歷查詢結(jié)果e
            Iterator<SelectionKey> iter = selectionKeys.iterator();
            while (iter.hasNext()) {
                // 被封裝的查詢結(jié)果
                SelectionKey key = iter.next();
                iter.remove();
                // 關(guān)注 Read 和 Accept兩個事件
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.attachment();
                    // 將拿到的客戶端連接通道,注冊到selector上面
                    SocketChannel clientSocketChannel = server.accept(); // mainReactor 輪詢accept
                    clientSocketChannel.configureBlocking(false);
                    clientSocketChannel.register(selector, SelectionKey.OP_READ, clientSocketChannel);
                    System.out.println("收到新連接 : " + clientSocketChannel.getRemoteAddress());
                }

                if (key.isReadable()) {
                    SocketChannel socketChannel = (SocketChannel) key.attachment();
                    try {
                        ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
                        while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
                            // 長連接情況下,需要手動判斷數(shù)據(jù)有沒有讀取結(jié)束 (此處做一個簡單的判斷: 超過0字節(jié)就認為請求結(jié)束了)
                            if (requestBuffer.position() > 0) break;
                        }
                        if (requestBuffer.position() == 0) continue; // 如果沒數(shù)據(jù)了, 則不繼續(xù)后面的處理
                        requestBuffer.flip();
                        byte[] content = new byte[requestBuffer.limit()];
                        requestBuffer.get(content);
                        System.out.println(new String(content));
                        System.out.println("收到數(shù)據(jù),來自:" + socketChannel.getRemoteAddress());
                        // TODO 業(yè)務(wù)操作 數(shù)據(jù)庫 接口調(diào)用等等

                        // 響應(yīng)結(jié)果 200
                        String response = "HTTP/1.1 200 OK\r\n" +
                                "Content-Length: 11\r\n\r\n" +
                                "Hello World";
                        ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
                        while (buffer.hasRemaining()) {
                            socketChannel.write(buffer);
                        }
                    } catch (IOException e) {
                        // e.printStackTrace();
                        key.cancel(); // 取消事件訂閱
                    }
                }
            }
            selector.selectNow();
        }
        // 問題: 此處一個selector監(jiān)聽所有事件,一個線程處理所有請求事件. 會成為瓶頸! 要有多線程的運用
    }
}

NIO 對比 BIO

NIO 對比 BIO

NIO與多線程結(jié)合的改進方案

Doug Lea的著名文章《 Scalable l0 in Java》

image.png
  • Reactors 的方式
package com.study.hc.net.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * NIO selector 多路復(fù)用reactor線程模型
 */
public class NIOServerV3 {
    /** 處理業(yè)務(wù)操作的線程 */
    private static ExecutorService workPool = Executors.newCachedThreadPool();

    /**
     * 封裝了selector.select()等事件輪詢的代碼
     */
    abstract class ReactorThread extends Thread {

        Selector selector;
        LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();

        /**
         * Selector監(jiān)聽到有事件后,調(diào)用這個方法
         */
        public abstract void handler(SelectableChannel channel) throws Exception;

        private ReactorThread() throws IOException {
            selector = Selector.open();
        }

        volatile boolean running = false;

        @Override
        public void run() {
            // 輪詢Selector事件
            while (running) {
                try {
                    // 執(zhí)行隊列中的任務(wù)
                    Runnable task;
                    while ((task = taskQueue.poll()) != null) {
                        task.run();
                    }
                    selector.select(1000);

                    // 獲取查詢結(jié)果
                    Set<SelectionKey> selected = selector.selectedKeys();
                    // 遍歷查詢結(jié)果
                    Iterator<SelectionKey> iter = selected.iterator();
                    while (iter.hasNext()) {
                        // 被封裝的查詢結(jié)果
                        SelectionKey key = iter.next();
                        iter.remove();
                        int readyOps = key.readyOps();
                        // 關(guān)注 Read 和 Accept兩個事件
                        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                            try {
                                SelectableChannel channel = (SelectableChannel) key.attachment();
                                channel.configureBlocking(false);
                                handler(channel);
                                if (!channel.isOpen()) {
                                    key.cancel(); // 如果關(guān)閉了,就取消這個KEY的訂閱
                                }
                            } catch (Exception ex) {
                                key.cancel(); // 如果有異常,就取消這個KEY的訂閱
                            }
                        }
                    }
                    selector.selectNow();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        private SelectionKey register(SelectableChannel channel) throws Exception {
            // 為什么register要以任務(wù)提交的形式,讓reactor線程去處理催式?
            // 因為線程在執(zhí)行channel注冊到selector的過程中函喉,會和調(diào)用selector.select()方法的線程爭用同一把鎖
            // 而select()方法實在eventLoop中通過while循環(huán)調(diào)用的,爭搶的可能性很高荣月,為了讓register能更快的執(zhí)行管呵,就放到同一個線程來處理
            FutureTask<SelectionKey> futureTask = new FutureTask<>(() -> channel.register(selector, 0, channel));
            taskQueue.add(futureTask);
            return futureTask.get();
        }

        private void doStart() {
            if (!running) {
                running = true;
                start();
            }
        }
    }

    private ServerSocketChannel serverSocketChannel;
    // 1、創(chuàng)建多個線程 - accept處理reactor線程 (accept線程)
    private ReactorThread[] mainReactorThreads = new ReactorThread[1];
    // 2哺窄、創(chuàng)建多個線程 - io處理reactor線程  (I/O線程)
    private ReactorThread[] subReactorThreads = new ReactorThread[8];

    /**
     * 初始化線程組
     */
    private void newGroup() throws IOException {
        // 創(chuàng)建IO線程,負責(zé)處理客戶端連接以后socketChannel的IO讀寫
        for (int i = 0; i < subReactorThreads.length; i++) {
            subReactorThreads[i] = new ReactorThread() {
                @Override
                public void handler(SelectableChannel channel) throws IOException {
                    // work線程只負責(zé)處理IO處理捐下,不處理accept事件
                    SocketChannel ch = (SocketChannel) channel;
                    ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
                    while (ch.isOpen() && ch.read(requestBuffer) != -1) {
                        // 長連接情況下,需要手動判斷數(shù)據(jù)有沒有讀取結(jié)束 (此處做一個簡單的判斷: 超過0字節(jié)就認為請求結(jié)束了)
                        if (requestBuffer.position() > 0) break;
                    }
                    if (requestBuffer.position() == 0) return; // 如果沒數(shù)據(jù)了, 則不繼續(xù)后面的處理
                    requestBuffer.flip();
                    byte[] content = new byte[requestBuffer.limit()];
                    requestBuffer.get(content);
                    System.out.println(new String(content));
                    System.out.println(Thread.currentThread().getName() + "收到數(shù)據(jù),來自:" + ch.getRemoteAddress());

                    // TODO 業(yè)務(wù)操作 數(shù)據(jù)庫、接口...
                    workPool.submit(() -> {
                    });

                    // 響應(yīng)結(jié)果 200
                    String response = "HTTP/1.1 200 OK\r\n" +
                            "Content-Length: 11\r\n\r\n" +
                            "Hello World";
                    ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
                    while (buffer.hasRemaining()) {
                        ch.write(buffer);
                    }
                }
            };
        }

               // 創(chuàng)建mainReactor線程, 只負責(zé)處理serverSocketChannel
        for (int i = 0; i < mainReactorThreads.length; i++) {
            mainReactorThreads[i] = new ReactorThread() {
                AtomicInteger incr = new AtomicInteger(0);

                @Override
                public void handler(SelectableChannel channel) throws Exception {
                    // 只做請求分發(fā)堂氯,不做具體的數(shù)據(jù)讀取
                    ServerSocketChannel ch = (ServerSocketChannel) channel;
                    SocketChannel socketChannel = ch.accept();
                    socketChannel.configureBlocking(false);
                    // 收到連接建立的通知之后,分發(fā)給I/O線程繼續(xù)去讀取數(shù)據(jù)
                    int index = incr.getAndIncrement() % subReactorThreads.length;
                    ReactorThread workEventLoop = subReactorThreads[index];
                    workEventLoop.doStart();
                    SelectionKey selectionKey = workEventLoop.register(socketChannel);
                    selectionKey.interestOps(SelectionKey.OP_READ);
                    System.out.println(Thread.currentThread().getName() + "收到新連接 : " + socketChannel.getRemoteAddress());
                }
            };
        }


    }

    /**
     * 初始化channel,并且綁定一個eventLoop線程
     *
     * @throws IOException IO異常
     */
    private void initAndRegister() throws Exception {
        // 1牌废、 創(chuàng)建ServerSocketChannel
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        // 2咽白、 將serverSocketChannel注冊到selector
        int index = new Random().nextInt(mainReactorThreads.length);
        mainReactorThreads[index].doStart();
        SelectionKey selectionKey = mainReactorThreads[index].register(serverSocketChannel);
        selectionKey.interestOps(SelectionKey.OP_ACCEPT);
    }

    /**
     * 綁定端口
     *
     * @throws IOException IO異常
     */
    private void bind() throws IOException {
        //  1、 正式綁定端口鸟缕,對外服務(wù)
        serverSocketChannel.bind(new InetSocketAddress(8080));
        System.out.println("啟動完成晶框,端口8080");
    }

    public static void main(String[] args) throws Exception {
        NIOServerV3 nioServerV3 = new NIOServerV3();
        nioServerV3.newGroup(); // 1、 創(chuàng)建main和sub兩組線程
        nioServerV3.initAndRegister(); // 2懂从、 創(chuàng)建serverSocketChannel授段,注冊到mainReactor線程上的selector上
        nioServerV3.bind(); // 3、 為serverSocketChannel綁定端口
    }
}

如果覺得有收獲就點個贊吧番甩,更多知識侵贵,請點擊關(guān)注查看我的主頁信息哦~

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市缘薛,隨后出現(xiàn)的幾起案子窍育,更是在濱河造成了極大的恐慌,老刑警劉巖宴胧,帶你破解...
    沈念sama閱讀 218,858評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件漱抓,死亡現(xiàn)場離奇詭異,居然都是意外死亡恕齐,警方通過查閱死者的電腦和手機乞娄,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人仪或,你說我怎么就攤上這事确镊。” “怎么了溶其?”我有些...
    開封第一講書人閱讀 165,282評論 0 356
  • 文/不壞的土叔 我叫張陵骚腥,是天一觀的道長。 經(jīng)常有香客問我瓶逃,道長束铭,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,842評論 1 295
  • 正文 為了忘掉前任厢绝,我火速辦了婚禮契沫,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘昔汉。我一直安慰自己懈万,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,857評論 6 392
  • 文/花漫 我一把揭開白布靶病。 她就那樣靜靜地躺著会通,像睡著了一般。 火紅的嫁衣襯著肌膚如雪娄周。 梳的紋絲不亂的頭發(fā)上涕侈,一...
    開封第一講書人閱讀 51,679評論 1 305
  • 那天,我揣著相機與錄音煤辨,去河邊找鬼裳涛。 笑死,一個胖子當(dāng)著我的面吹牛众辨,可吹牛的內(nèi)容都是我干的端三。 我是一名探鬼主播,決...
    沈念sama閱讀 40,406評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼鹃彻,長吁一口氣:“原來是場噩夢啊……” “哼郊闯!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起蛛株,我...
    開封第一講書人閱讀 39,311評論 0 276
  • 序言:老撾萬榮一對情侶失蹤虚婿,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后泳挥,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體然痊,經(jīng)...
    沈念sama閱讀 45,767評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年屉符,在試婚紗的時候發(fā)現(xiàn)自己被綠了剧浸。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片锹引。...
    茶點故事閱讀 40,090評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖唆香,靈堂內(nèi)的尸體忽然破棺而出嫌变,到底是詐尸還是另有隱情,我是刑警寧澤躬它,帶...
    沈念sama閱讀 35,785評論 5 346
  • 正文 年R本政府宣布腾啥,位于F島的核電站,受9級特大地震影響冯吓,放射性物質(zhì)發(fā)生泄漏倘待。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,420評論 3 331
  • 文/蒙蒙 一组贺、第九天 我趴在偏房一處隱蔽的房頂上張望凸舵。 院中可真熱鬧,春花似錦失尖、人聲如沸啊奄。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,988評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽菇夸。三九已至,卻和暖如春仪吧,著一層夾襖步出監(jiān)牢的瞬間庄新,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,101評論 1 271
  • 我被黑心中介騙來泰國打工邑商, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留摄咆,地道東北人凡蚜。 一個月前我還...
    沈念sama閱讀 48,298評論 3 372
  • 正文 我出身青樓人断,卻偏偏與公主長得像,于是被迫代替她去往敵國和親朝蜘。 傳聞我的和親對象是個殘疾皇子恶迈,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,033評論 2 355