Java Socket IO演進(jìn)(一)-BIO/NIO/AIO

1. 概覽

Java中主要有三種IO模型烁设,分別是同步阻塞IO(BIO)、同步非阻塞IO(NIO)、異步非阻塞IO(AIO)。

Java IO的演進(jìn)宏多,其實(shí)是依賴于操作系統(tǒng)的IO操作接口實(shí)現(xiàn)。比如在Linux 2.6以后澡罚,Java中NIO和AIO都是通過(guò)epoll來(lái)實(shí)現(xiàn)的伸但;而在Windows上,AIO是通過(guò)IOCP來(lái)實(shí)現(xiàn)的留搔。

Linux(UNIX)操作系統(tǒng)中共有五種IO模型砌烁,分別是:阻塞IO模型、非阻塞IO模型催式、IO復(fù)用模型函喉、信號(hào)驅(qū)動(dòng)IO模型、異步IO模型荣月。

回顧
同步與異步

  • 同步:發(fā)起一個(gè)調(diào)用后管呵,被調(diào)用者未處理完請(qǐng)求前,不向調(diào)用者返回響應(yīng)哺窄。
  • 異步:發(fā)起一個(gè)調(diào)用后捐下,被調(diào)用者收到請(qǐng)求后立刻向調(diào)用者回應(yīng)已接收到請(qǐng)求账锹,但是被調(diào)用者并沒(méi)有返回結(jié)果,此時(shí)調(diào)用者可以處理其他操作坷襟,被調(diào)用者通常依靠事件奸柬,回調(diào)等機(jī)制來(lái)通知調(diào)用者其返回結(jié)果。

阻塞和非阻塞

  • 阻塞:發(fā)起一個(gè)請(qǐng)求后婴程,調(diào)用者一直等待請(qǐng)求結(jié)果返回廓奕,也就是當(dāng)前線程會(huì)被掛起,無(wú)法從事其他任務(wù)档叔,只有當(dāng)條件就緒才能繼續(xù)桌粉。
  • 非阻塞:發(fā)起一個(gè)請(qǐng)求后,調(diào)用者不用一直等著結(jié)果返回衙四,可以先去干其他事情铃肯。

這里介紹Java BIO/NIO/AIO 是結(jié)合Socket網(wǎng)絡(luò)I/O而談,位于 java.net.* 包下传蹈。BIO 使用 java.io.* 包下阻塞IO押逼;NIO使用 java.nio.* 包下非阻塞IO;AIO也就是NIO2.0版惦界,是在NIO基礎(chǔ)上提供異步支持挑格。

NIO 是 JDK1.4 提供的API。nio中n有兩層含義:

  • new: 表示新的io接口
  • Non-blocking: 非阻塞

AIO 是 JDK1.7 在 java.nio.* 包下提供的API

2. BIO

Blocking I/O表锻,同步阻塞I/O模式,數(shù)據(jù)的讀取寫入必須阻塞在一個(gè)線程內(nèi)等待其完成乞娄。
我們常說(shuō)的I/O一般都是指BIO瞬逊。

2.1. BIO 基本模型

BIO基本模型.png

2.1.1. 特點(diǎn)描述

Socket 的連接(accept()),數(shù)據(jù)的讀寫(read()/write())仪或,都是阻塞的确镊。請(qǐng)求一旦建立連接,就無(wú)法再接收其他連接范删。

2.1.2. 代碼示例

public class SocketIO {

    public static void main(String[] args) throws Exception {
        ServerSocket server = new ServerSocket(9090); // #1

        System.out.println("1. new ServerSocket(9090)");

        Socket client = server.accept(); // 阻塞1 // #2

        System.out.println("2. client connect\t" + client.getPort());

        InputStream in = client.getInputStream();
        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
        System.out.println(reader.readLine()); // 阻塞2 // #3

        while (true) {
        }
    }
}

上述代碼中蕾域,有兩處阻塞,分別是 server.accept()reader.readLine()到旦。

使用 strace -ff -o outlog java SocketIO 命令查看JVM調(diào)用了內(nèi)核(kernel)的方法過(guò)程

#1 位置 new ServerSocket(9090)旨巷,調(diào)用了內(nèi)核:

socket(···) = 5  # 創(chuàng)建一個(gè)未連接的Socket,5是個(gè)fd(文件描述符)指向這個(gè)socket
bind(5, 9090, ···)  # 綁定9090端口到socket上
listen(5, ···)  # 把這個(gè)socket監(jiān)聽(tīng)起來(lái)

#2 位置 server.accept()添忘,調(diào)用了內(nèi)核:

poll([{fd=5, ···}], 1, -1) = 1 ([{fd=5, revents=POLLIN}])  # 等待請(qǐng)求過(guò)來(lái)連接采呐。如果沒(méi)有請(qǐng)求將阻塞,-1表示無(wú)限阻塞搁骑;有請(qǐng)求時(shí)斧吐,5fd退出監(jiān)聽(tīng)交給accept建立連接
accept(5, {客戶端信息}, [28]) = 6  # 從5fd中創(chuàng)建一個(gè)新的socket又固,連接client和server,釋放5fd煤率,讓其繼續(xù)處于監(jiān)聽(tīng)狀態(tài)仰冠,等待下一個(gè)連接
# 如果使用jdk1.4之前的版本運(yùn)行,只調(diào)用了內(nèi)核的 accept 方法蝶糯,接收連接阻塞在此處

#3 位置 server.accept()洋只,調(diào)用了內(nèi)核:

recvfrom(6, "hahahaha\n", 8192, 0, NULL, NULL) = 9  # 從6fd這個(gè)socket上讀取接收到的數(shù)據(jù),如果socket上沒(méi)有消息裳涛,將在此阻塞
# 如果使用jdk1.4之前的版本運(yùn)行木张,調(diào)用了內(nèi)核的 recv 方法,讀數(shù)據(jù)阻塞在此處

2.1.3. 問(wèn)題瓶頸

主要問(wèn)題突出點(diǎn):

  1. 接收客戶端連接阻塞
  2. 讀取/寫入數(shù)據(jù)阻塞
  3. 只能接收一個(gè)連接

2.2. BIO 傳統(tǒng)模型

BIO服務(wù)通信模型.png

2.2.1. 特點(diǎn)描述

服務(wù)端使用一個(gè)Acceptor線程端三,用于監(jiān)聽(tīng)接收請(qǐng)求舷礼,收到請(qǐng)求后,創(chuàng)建Socket和一個(gè)新線程來(lái)服務(wù)客戶端郊闯。
這種模型特點(diǎn)是請(qǐng)求數(shù)與線程數(shù)1:1

2.2.2. 代碼示例

public class SocketIO2 {

    public static void main(String[] args) throws Exception {
        ServerSocket server = new ServerSocket(9090);

        System.out.println("1. new ServerSocket(9090)");

        while (true) {
            Socket client = server.accept(); // 阻塞1
            System.out.println("2. client connect\t" + client.getPort());
            Thread thread = new Thread(() -> {
                try {
                    InputStream in = client.getInputStream();
                    BufferedReader reader = new BufferedReader(new InputStreamReader(in));
                    String line;
                    while ((line = reader.readLine()) != null) {
                        System.out.println(line); // 阻塞2
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                System.out.println("client closed\t" + client.getPort());
            });
            thread.start();
        }
    }
}

上述代碼妻献,可以實(shí)現(xiàn)多個(gè)客戶都安連接,但是還存在兩處阻塞团赁。一個(gè)連接育拨,占用一個(gè)線程。

2.1.3. 問(wèn)題瓶頸

主要問(wèn)題突出點(diǎn):

  1. 接收客戶端連接阻塞
  2. 讀取/寫入數(shù)據(jù)阻塞
  3. 服務(wù)器不能連接過(guò)多請(qǐng)求(C10K問(wèn)題)

2.3. BIO 偽異步模型

BIO偽異步模型.png

2.3.1. 特點(diǎn)描述

服務(wù)端的連接響應(yīng)欢摄,使用線程池管理熬丧。可實(shí)現(xiàn)請(qǐng)求數(shù)與線程m:n(m可以大于n)

服務(wù)端性能的優(yōu)化怀挠,取決于線程池的優(yōu)化

2.3.2. 代碼示例

public class SocketIO3 {
    private static int THREAD_POOL_SIZE = 1;
    private static ThreadPoolExecutor THREAD_POOL = getThreadPoolExecutor();

    private static ThreadPoolExecutor getThreadPoolExecutor() {  // 服務(wù)性能的優(yōu)化析蝴,取決于線程池的優(yōu)化
        ThreadFactory threadFactory = new ThreadFactory() {
            private final AtomicInteger mThreadNum = new AtomicInteger(1);
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "server-thread-" + mThreadNum.getAndIncrement());
            }
        };
        return new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(THREAD_POOL_SIZE), threadFactory);
    }

    public static void main(String[] args) throws Exception {
        ServerSocket server = new ServerSocket(9090);
        System.out.println("1. new ServerSocket(9090)");

        while (true) {
            Socket client = server.accept(); // 阻塞1
            System.out.println("2. client connect\t" + client.getPort());

            try {
                THREAD_POOL.execute(() -> responseHandler(client)); // 響應(yīng)處理
            } catch (RejectedExecutionException e) {
                e.printStackTrace();
                rejectedHandler(client);    // 線程池已滿,拒絕處理
            }
        }
    }

    private static void responseHandler(Socket client) {
        try {
            InputStream in = client.getInputStream();
            BufferedReader reader = new BufferedReader(new InputStreamReader(in));
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line); // 阻塞2
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("client closed\t" + client.getPort());
    }

    private static void rejectedHandler(Socket client) throws Exception {
        OutputStream out = client.getOutputStream();
        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
        writer.write("請(qǐng)求已達(dá)上線");
        writer.close();
        client.close();
    }
}

2.3.3. 問(wèn)題瓶頸

主要問(wèn)題突出點(diǎn):

  1. 接收客戶端連接阻塞
  2. 讀取/寫入數(shù)據(jù)阻塞
  3. 服務(wù)器不能連接過(guò)多請(qǐng)求(比 2.2 會(huì)好一點(diǎn))

2.4 總結(jié)

BIO從底層看绿淋,如果不能從底層根本上結(jié)果阻塞問(wèn)題闷畸,其性能問(wèn)題就不能得到有效解決。這種模型單一簡(jiǎn)單吞滞,適用于連接不多的場(chǎng)景佑菩。

3. NIO

  • Non-blocking: NIO 流是非阻塞 IO 而 IO 流是阻塞 IO,非阻塞體現(xiàn)在 socket網(wǎng)絡(luò)裁赠,內(nèi)核機(jī)制上殿漠。
  • new: NIO 有3個(gè)核心組件/特性:Channel、Buffer佩捞、Selector凸舵,體現(xiàn)在JDK上。

3.1. NIO 基本模型

3.1.1. 特點(diǎn)描述

Java中非阻塞IO失尖,JDK中可通過(guò) sockeChannel.configureBlocking(false)accept()read()/write() 設(shè)置成非阻塞啊奄,調(diào)用系統(tǒng)內(nèi)核時(shí)渐苏,如果沒(méi)有連接/數(shù)據(jù)讀寫,就返回-1菇夸。

SocketChannel 中提供數(shù)據(jù)的流入流出兩個(gè)通道琼富。數(shù)據(jù)讀寫時(shí),先直接由操作系統(tǒng)將數(shù)據(jù)放入Buffer緩沖區(qū)中庄新。JVM中Server對(duì)數(shù)據(jù)的讀寫直接訪問(wèn)Byffer緩沖區(qū)鞠眉。

NIO數(shù)據(jù)讀寫模型.png

Channel 與 Buffer 搭配

  • 多個(gè)channel配一個(gè)buffer
  • 一個(gè)channel配一個(gè)buffer(使用最多配置)
  • 一個(gè)channel配兩個(gè)buffer(一個(gè)讀一個(gè)寫)

Buffer 緩沖區(qū)創(chuàng)建位置

  • JVM內(nèi)(堆內(nèi))
  • JVM外(堆外,由OS管理分配)

服務(wù)器與外部收發(fā)數(shù)據(jù)择诈,首先由OS管理械蹋,由系統(tǒng)將網(wǎng)卡數(shù)據(jù)放到對(duì)外,如果JVM想要使用羞芍,需要將數(shù)據(jù)拷貝到JVM內(nèi)部哗戈,存在JVM內(nèi)外來(lái)回拷貝,影響性能還浪費(fèi)空間荷科。
Linux內(nèi)核支持共享空間(mmap)唯咬,可以將網(wǎng)卡數(shù)據(jù)直接放到共享空間內(nèi)存,這樣JVM和OS就可以共同使用畏浆。Java中的 零拷貝 就是基于此胆胰。

ByteBuffer 的創(chuàng)建位置,和 channel 的搭配方式根據(jù)實(shí)際應(yīng)用場(chǎng)景靈活選擇

3.1.2. 代碼示例

public class SocketNIO {

    private static final String CHARSET = "UTF-8";

    public static void main(String[] args) throws Exception {
        List<SocketChannel> clientList = new LinkedList<>();

        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(9090)); // 完成端口綁定刻获,socket監(jiān)聽(tīng)
        ssc.configureBlocking(false); // false指定該ServerSocketChannel為非阻塞 // #1
        System.out.println("1. ServerSocketChannel 9090 non-blocking");

        while (true) {
            Thread.sleep(1000);
            SocketChannel client = ssc.accept(); // 接受連接非阻塞
            if (client == null) {
                System.out.print(".");
            } else {
                client.configureBlocking(false); // #2
                System.out.println();
                System.out.println("2. client connect\t" + client.socket().getPort());
                clientList.add(client);
            }

            ByteBuffer buffer = ByteBuffer.allocateDirect(1024);  // 配置緩沖區(qū)蜀涨,可以堆內(nèi),可以堆外(allocate())

            for (SocketChannel sc : clientList) {
                int len = sc.read(buffer); // 讀取數(shù)據(jù)蝎毡,非阻塞
                if (len > 0) {
                    buffer.flip();
                    byte[] bytes = new byte[buffer.limit()];
                    buffer.get(bytes);
                    buffer.clear();

                    String data = new String(bytes, CHARSET);
                    System.out.println();
                    System.out.println(String.format("received data[%-5d]: %s", sc.socket().getPort(), data));
                }
            }
        }
    }
}

#1 位置厚柳,讓服務(wù)端接后客戶端連接時(shí) ssc.accept() 非阻塞
#2 位置,與客戶端連接的Socket讀/寫數(shù)據(jù)時(shí) sc.read(buffer) 非阻塞

運(yùn)行代碼顶掉,JVM調(diào)用了內(nèi)核一下方法

socket(···) = 6
bind(6, 9090, ···) 
listen(6, ···)
# 以上三步同BIO一樣

# ssc.configureBlocking(false);
fcntl(6, F_SETFL, O_RDWR|O_NONBLOCK) = 0  # 設(shè)置6fd為非阻塞

# accept 不再阻塞草娜。沒(méi)有連接時(shí)挑胸,返回-1痒筒;有連接時(shí),返回建立好的連接
accept(6, xxx, xxx) = -1 EAGAIN (Resource temporarily unavailable)
accept(6, {客戶端信息}, [28]) = 7

# read 讀取數(shù)據(jù)不在阻塞茬贵。channel沒(méi)有數(shù)據(jù)返回-1
read(7, 0x7f92b428e590, 1024) = -1 EAGAIN (Resource temporarily unavailable)
read(7, "hello\n", 1024) = 6

3.1.3. 問(wèn)題瓶頸

雖然NIO接收客戶端連接和讀/寫數(shù)據(jù)都不在阻塞簿透,但是上面的額連接、讀寫都在一個(gè)線程解藻,操作是串行化的老充。高并發(fā)下,后面的讀寫操作降低了可接入連接的性能螟左》茸牵可將客戶端連接由一個(gè)線程處理觅够,數(shù)據(jù)讀寫由兩一個(gè)線程處理。

資源浪費(fèi):

  1. while(true) 中巷嚣,每次都要詢問(wèn)/調(diào)用 accept() 喘先,看是否由客戶端請(qǐng)求接入。
  2. 如果接入了 1w/100w+ 個(gè)連接廷粒,每次都需要把所有連接 read() 一下看是否有數(shù)據(jù)過(guò)來(lái)(復(fù)雜度O(n))窘拯。

3.2. NIO 多路復(fù)用模型

3.2.1. 特點(diǎn)描述

Selector 多路復(fù)用器,JDK中使用Selector對(duì)象來(lái)注冊(cè)/管理Channel(包含Server和Client)坝茎,當(dāng)有事件觸發(fā)時(shí)涤姊,使用 selector.select()selector.selectedKeys() 來(lái)獲取有事件觸發(fā)的Channel,我們根據(jù)事件類型來(lái)做相應(yīng)處理嗤放。
注意Selector獲取的是一個(gè)Channel狀態(tài)思喊,數(shù)據(jù)的讀/寫還是需要用戶自己觸發(fā),即讀寫過(guò)程依然是同步斤吐。

Selector 的實(shí)現(xiàn)依賴內(nèi)核支持搔涝,如:select/poll/epoll等

select/poll: 每次詢問(wèn)是否有事件到達(dá),需要傳入所有socket fd和措,只是所有fd的循環(huán)遍歷交給了內(nèi)存來(lái)完成(JDK不再干這個(gè)事)庄呈。
epoll: 先創(chuàng)建一個(gè)Selector fd,然后所有socket都添加到這個(gè)fd中派阱,只添加一次即可诬留。由這個(gè)fd管理所有socket的事件。

3.2.2. 代碼示例

ServerSocketChannel 的有效事件為 OP_ACCEPT贫母。
SocketChannel 的有效事件為 OP_CONNECT文兑、OP_READOP_WRITE

/**
 * NIO腺劣,單線程多路復(fù)用
 */
public class SocketNIOMultiplexing {

    private ServerSocketChannel server;
    private Selector selector;  // 多路復(fù)用器

    public void initServer() throws IOException {
        server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(9090));
        server.configureBlocking(false); // accept非阻塞
        selector = Selector.open();
        server.register(selector, SelectionKey.OP_ACCEPT); // 將ServerSocketChannel注冊(cè)到Selector中绿贞,注冊(cè)過(guò)程創(chuàng)建SelectionKey,表示Channel向Selector中注冊(cè)的token
        System.out.println("1. Server started [ServerSocketChannel Selector 9090]");
    }

    public void start() throws IOException {
        initServer();
        while (true) {
            if (selector.select(0) < 1) { // 0表示每次詢問(wèn)不阻塞橘原,立即返回籍铁;非0表示阻塞時(shí)間
                continue;
            }
            Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 取出有效的key
            System.out.println("2. selector 中有事件狀態(tài)進(jìn)來(lái) - size=" + selectionKeys.size());
            for (Iterator<SelectionKey> it = selectionKeys.iterator(); it.hasNext(); it.remove()) { // 注意,這里key處理后要從keys中remove趾断,不然下次會(huì)會(huì)獲取沒(méi)有事件key拒名,poll進(jìn)入死循環(huán)
                SelectionKey selectionKey = it.next();
                if (selectionKey.isAcceptable()) { // selectionKey中持用的Socket狀態(tài)是可連接
                    acceptHandler(selectionKey); // 從socket(Server)中建立連接
                } else if (selectionKey.isReadable()) { // selectionKey中持用的Socket狀態(tài)是可讀
                    readHandler(selectionKey); // 從socket(Client)中讀取數(shù)據(jù)
                }
            }
        }
    }

    private void acceptHandler(SelectionKey selectionKey) {
        ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
        System.out.println("3.1 接受連接");
        try {
            SocketChannel client = ssc.accept();
            client.configureBlocking(false); // 非阻塞
            ByteBuffer buffer = ByteBuffer.allocate(1024); // 為該client配置一個(gè)buffer,堆內(nèi)
            client.register(selector, SelectionKey.OP_READ, buffer); // 將SocketChannel注冊(cè)到Selector中
            System.out.println("一個(gè)新客戶端連接: client=" + client.getRemoteAddress());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void readHandler(SelectionKey selectionKey) {
        SocketChannel sc = (SocketChannel) selectionKey.channel();
        ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();

        System.out.println("3.2 讀取數(shù)據(jù)");

        buffer.clear(); // 讀之前默認(rèn)清空一次buffer
        try {
            while (true) {
                int len = sc.read(buffer); // 讀到數(shù)據(jù)返回?cái)?shù)據(jù)字節(jié)長(zhǎng)度芋酌;流沒(méi)有結(jié)束增显,沒(méi)有數(shù)據(jù)返回0,流結(jié)束返回-1
                if (len > 0) {
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        sc.write(buffer); // 把數(shù)據(jù)直接返回去
                    }
                    buffer.clear();
                } else if (len == 0) {
                    break;
                } else { // -1  注意bug脐帝,當(dāng)tcp連接處于 close_wait 時(shí)同云,selectionKey.isReadable()返回true糖权,這里出現(xiàn)死循環(huán),cpu飆高
                    sc.close(); // 去掉bug炸站,這里client close温兼,主要是使用 key.cancel(); 從多路復(fù)用器的key集合中移除
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {
        SocketNIOMultiplexing server = new SocketNIOMultiplexing();
        server.start();
    }
}

運(yùn)行代碼,JVM調(diào)用了內(nèi)核一下方法

socket(···) = 6
bind(6, 9090, ···) 
listen(6, ···)
# 以上三步同BIO一樣

fcntl(6, F_SETFL, O_RDWR|O_NONBLOCK) = 0  # 設(shè)置6fd非阻塞

pipe([7, 8])  # 創(chuàng)建一個(gè)channel武契,7fd表示管道的讀取端募判,8fd表示管道的寫端
fcntl(7, F_SETFL, O_RDONLY|O_NONBLOCK) = 0  # 設(shè)置7fd只讀端口非阻塞
fcntl(8, F_SETFL, O_WRONLY|O_NONBLOCK) = 0  # 設(shè)置8fd只寫端口非阻塞

# 創(chuàng)建 Selector 實(shí)例
epoll_create(256) = 9  # 創(chuàng)建一個(gè)新的epoll實(shí)例9fd

# channel.register(selector)
epoll_ctl(9, EPOLL_CTL_ADD, 7, ···) = 0  # 將7fd注冊(cè)到9fd上,并將事件連接到7fd上
epoll_wait(9, ···)  # 阻塞咒唆,9fd上等待事件到來(lái)觸發(fā)

# client連接
rt_sigaction(SIGRT_30, ···, 8) = 0  # 信號(hào)安裝登記
accept(6, {客戶端信息}, [28]) = 10

3.2.3. 問(wèn)題瓶頸

上面的模型不需要每次詢問(wèn) accept() 和所有client的 read()届垫,每次只需要詢問(wèn)一次多路復(fù)用器Selector即可,顯然復(fù)雜度降為O(1)全释,解決了資源浪費(fèi)問(wèn)題装处。
但是Selector詢問(wèn)、Channel客戶端連接浸船、數(shù)據(jù)讀寫依然串行化妄迁。可使用多線將這個(gè)三者分開李命,Netty的模型就是基于此設(shè)計(jì)登淘。

3.3. NIO 多路復(fù)用模型-多線程

3.3.1. 特點(diǎn)描述

3.2 中的問(wèn)題,進(jìn)行多線程分開封字。boss線程接收連接請(qǐng)求后黔州,快速記錄,交給worker線程阔籽,worker線程負(fù)責(zé)耗時(shí)的accept和數(shù)據(jù)的讀寫操作流妻。(這里就有些netty的味道了)

3.3.2. 代碼示例

/**
 * NIO,多路復(fù)用笆制,多線程
 */
public class SocketNioMultiplexing2 {

    private static final String CHARSET = "UTF-8";

    private ServerSocketChannel server;
    private NioThread boss;
    private NioThread[] workers;

    static class NioThread extends Thread {

        private static volatile int workerNum = 0; // worker線程數(shù)量
        private static BlockingQueue<SocketChannel>[] workerQueues; // SocketChannel隊(duì)列绅这,長(zhǎng)度=workerNum
        private static AtomicInteger workerBalance = new AtomicInteger();

        private Selector selector; // 多路復(fù)用器
        private Integer tid; // 線程ID,null表示boss線程

        public void config(Selector selector, Integer tid) {
            this.selector = selector;
            this.tid = tid;
            if (tid == null) {
                System.out.println("boss thread ready");
                this.setName("boss");
            } else {
                System.out.println("worker-" + tid + " thread ready");
                this.setName("worker-" + tid);
            }
        }

        @Override
        public void run() {
            System.out.println(">> " + Thread.currentThread().getName() + " start");
            try {
                if (tid == null) {
                    bossLoop();
                } else {
                    workerLoop();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        private void bossLoop() throws IOException {
            while (true) {
                if (selector.select(10) < 1) { // 詢問(wèn)是否有事件到達(dá)在辆,最多阻塞10ms
                    continue;
                }
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                System.out.println("2. boss-selector 中有請(qǐng)求事件狀態(tài)進(jìn)來(lái) - size=" + selectionKeys.size());
                for (Iterator<SelectionKey> it = selectionKeys.iterator(); it.hasNext(); it.remove()) {
                    SelectionKey selectionKey = it.next();
                    if (!selectionKey.isAcceptable()) {
                        continue;
                    }
                    ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();

                    try {
                        SocketChannel client = ssc.accept();
                        client.configureBlocking(false);
                        int tid = workerBalance.getAndIncrement() % workerNum; // 負(fù)載均衡
                        workerQueues[tid].add(client);
                        System.out.println("3. 連接請(qǐng)求加入隊(duì)列-" + tid);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        private void workerLoop() throws IOException {
            while (true) {
                acceptHandler(workerQueues[tid]); // 建立連接

                if (selector.select(10) < 1) {
                    continue;
                }
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                System.out.println("2. worker-selector 中有讀取事件狀態(tài)進(jìn)來(lái) - size=" + selectionKeys.size());
                for (Iterator<SelectionKey> it = selectionKeys.iterator(); it.hasNext(); it.remove()) {
                    SelectionKey selectionKey = it.next();
                    if (selectionKey.isReadable()) {
                        readHandler(selectionKey); // 讀取數(shù)據(jù)
                    }
                }
            }
        }

        private void acceptHandler(BlockingQueue<SocketChannel> queue) throws IOException {
            SocketChannel client = queue.poll();
            if (client == null) {
                return;
            }
            // 建立Channel連接证薇,配置緩沖器
            ByteBuffer buffer = ByteBuffer.allocate(1024); // 字節(jié)對(duì)齊?
            client.register(this.selector, SelectionKey.OP_READ, buffer);
            System.out.println("4. 一個(gè)新客戶端連接: client=" + client.getRemoteAddress());
        }

        private void readHandler(SelectionKey selectionKey) {
            SocketChannel sc = (SocketChannel) selectionKey.channel();
            ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();

            System.out.println("3 讀取數(shù)據(jù)");

            buffer.clear(); // 讀之前默認(rèn)清空一次buffer
            try {
                while (true) {
                    int len = sc.read(buffer);
                    if (len > 0) {
                        buffer.flip();
                        while (buffer.hasRemaining()) { // Echo應(yīng)答
                            ByteBuffer prefix = ByteBuffer.allocate(20);
                            prefix.put("Echo:".getBytes(CHARSET));
                            prefix.flip();
                            sc.write(new ByteBuffer[]{prefix, buffer});
                        }
                        buffer.clear();
                    } else if (len == 0) {
                        break;
                    } else {
                        sc.close();
                        break;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

    public SocketNioMultiplexing2 init(NioThread boss, NioThread[] workers) throws IOException {
        this.boss = boss;
        this.workers = workers;

        server = ServerSocketChannel.open();
        server.configureBlocking(false);

        Selector boosSelector = Selector.open();
        boss.config(boosSelector, null);
        server.register(boosSelector, SelectionKey.OP_ACCEPT);

        int workerNum = workers.length;
        NioThread.workerNum = workerNum;
        NioThread.workerQueues = new LinkedBlockingQueue[workerNum];
        for (int i = 0; i < workerNum; i++) {
            workers[i] = new NioThread();
            workers[i].config(Selector.open(), i); // worker 線程开缎,指定線程ID
            NioThread.workerQueues[i] = new LinkedBlockingQueue<>();
        }

        return this;
    }

    public SocketNioMultiplexing2 port(int port) throws IOException {
        server.bind(new InetSocketAddress(port));
        return this;
    }

    public void start() throws IOException {
        boss.start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (NioThread worker : workers) {
            worker.start();
        }
        System.out.println(String.format("1. Server started [boss: %s, workers: %d]", server.getLocalAddress().toString(), workers.length));
    }

    public static void main(String[] args) throws IOException {
        NioThread boss = new NioThread(); // boss 線程
        NioThread[] workers = new NioThread[2]; // worker 線程
        SocketNioMultiplexing2 server = new SocketNioMultiplexing2();
        server.init(boss, workers)
        .port(9090)
        .start();
    }
}

4. AIO

Asynchronous I/O棕叫,異步非阻塞I/O模型林螃。這里的 異步 是基于事件和回調(diào)機(jī)制實(shí)現(xiàn)的奕删,也就是應(yīng)用操作之后會(huì)直接返回,不會(huì)堵塞在那里疗认,當(dāng)后臺(tái)處理完成完残,操作系統(tǒng)會(huì)通知相應(yīng)的線程進(jìn)行后續(xù)的操作伏钠。

在大多數(shù)業(yè)務(wù)場(chǎng)景中,我們往往在讀/寫數(shù)據(jù)時(shí)需要阻塞谨设,獲取讀取到的數(shù)據(jù)/寫入數(shù)據(jù)狀態(tài)等熟掂。

目前來(lái)看,Linux上 AIO 的應(yīng)用還不真正的異步扎拣,可以說(shuō)時(shí)偽異步赴肚,Netty 之前也嘗試使用過(guò) AIO,不過(guò)又放棄回歸到NIO上二蓝。


參考

Linux的五種IO模型:https://mp.weixin.qq.com/s?__biz=Mzg3MjA4MTExMw==&mid=2247484746&idx=1&sn=c0a7f9129d780786cabfcac0a8aa6bb7&source=41&scene=21#wechat_redirect

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末誉券,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子刊愚,更是在濱河造成了極大的恐慌踊跟,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,383評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件鸥诽,死亡現(xiàn)場(chǎng)離奇詭異商玫,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)牡借,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,522評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門拳昌,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人钠龙,你說(shuō)我怎么就攤上這事地回。” “怎么了俊鱼?”我有些...
    開封第一講書人閱讀 157,852評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵刻像,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我并闲,道長(zhǎng)细睡,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,621評(píng)論 1 284
  • 正文 為了忘掉前任帝火,我火速辦了婚禮溜徙,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘犀填。我一直安慰自己蠢壹,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,741評(píng)論 6 386
  • 文/花漫 我一把揭開白布九巡。 她就那樣靜靜地躺著图贸,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上疏日,一...
    開封第一講書人閱讀 49,929評(píng)論 1 290
  • 那天偿洁,我揣著相機(jī)與錄音,去河邊找鬼沟优。 笑死涕滋,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的挠阁。 我是一名探鬼主播宾肺,決...
    沈念sama閱讀 39,076評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼侵俗!你這毒婦竟也來(lái)了爱榕?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,803評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤坡慌,失蹤者是張志新(化名)和其女友劉穎黔酥,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體洪橘,經(jīng)...
    沈念sama閱讀 44,265評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡跪者,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,582評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了熄求。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片渣玲。...
    茶點(diǎn)故事閱讀 38,716評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖弟晚,靈堂內(nèi)的尸體忽然破棺而出忘衍,到底是詐尸還是另有隱情,我是刑警寧澤卿城,帶...
    沈念sama閱讀 34,395評(píng)論 4 333
  • 正文 年R本政府宣布枚钓,位于F島的核電站,受9級(jí)特大地震影響瑟押,放射性物質(zhì)發(fā)生泄漏搀捷。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,039評(píng)論 3 316
  • 文/蒙蒙 一多望、第九天 我趴在偏房一處隱蔽的房頂上張望嫩舟。 院中可真熱鬧,春花似錦怀偷、人聲如沸家厌。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,798評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)饭于。三九已至蜀踏,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間镰绎,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,027評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工木西, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留畴栖,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,488評(píng)論 2 361
  • 正文 我出身青樓八千,卻偏偏與公主長(zhǎng)得像吗讶,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子恋捆,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,612評(píng)論 2 350