Java NIO網(wǎng)絡(luò)編程

OSI網(wǎng)絡(luò)七層模型

  • 低三層
    物料層:使用原始數(shù)據(jù)比特流能再物理介質(zhì)上傳輸
    數(shù)據(jù)鏈路層: 通過(guò)校驗(yàn)留美、確認(rèn)和反饋重發(fā)等手段,行程穩(wěn)定的數(shù)據(jù)鏈路
    網(wǎng)絡(luò)層:進(jìn)行路由選擇和流量控制(IP協(xié)議)
  • 傳輸層:(TCP/UDP協(xié)議)提供可靠的端口到端口的數(shù)據(jù)傳輸服務(wù)
  • 高三層:一般都指JAVA程序
    會(huì)話層:負(fù)責(zé)建立伸刃、管理和終止進(jìn)程之間的會(huì)話和數(shù)據(jù)交換
    表示層:負(fù)責(zé)數(shù)據(jù)格式轉(zhuǎn)換谎砾、數(shù)據(jù)加密與解密抚太、壓縮與解壓等
    應(yīng)用層:為用戶的應(yīng)用進(jìn)程提供網(wǎng)絡(luò)服務(wù)

傳輸控制協(xié)議TCP

面向連接谨究、可靠、有序麻献、字節(jié)流傳輸服務(wù)隘道,應(yīng)用程序在使用TCP前必須建立TCP連接

  • TCP三次握手

    1. 客戶端發(fā)送消息至服務(wù)端等待確認(rèn)
    2. 服務(wù)端收到症歇,并發(fā)送請(qǐng)求等待確認(rèn)
    3. 客戶端收到,建立連接
  • TCP四次揮手

    1. 客戶端發(fā)送斷開(kāi)連接至服務(wù)端等待確認(rèn)
    2. 服務(wù)端收到谭梗,半關(guān)閉狀態(tài)忘晤,服務(wù)端等待釋放
    3. 服務(wù)端發(fā)送等待確認(rèn)
    4. 客戶端等待一會(huì),發(fā)送確認(rèn)關(guān)閉激捏,等待一會(huì)關(guān)閉

用戶數(shù)據(jù)報(bào)協(xié)議UDP

UDP是Internet傳輸協(xié)議设塔,提供無(wú)連接、不可靠远舅、數(shù)據(jù)報(bào)盡力傳輸服務(wù)

  • 無(wú)需建立連接
  • 無(wú)需連接狀態(tài)
  • 數(shù)據(jù)不可靠

Socket編程

  • 數(shù)據(jù)報(bào)類型套接字SOCK_DGRAM(面向UDP接口)
  • 流式套接字SOCK_STREAM(面向TCP接口)
  • 原始套接字SOCK_RAW(面向網(wǎng)絡(luò)層協(xié)議接口IP闰蛔、ICMP等)
  • Socket API及其調(diào)用過(guò)程

socket
  • Socket API函數(shù)定義

    • listen()、accept()函數(shù)只能用于服務(wù)端
    • connect()函數(shù)只能用于客戶端
    • socket() 图柏、bind()序六、send()、recv()蚤吹、sendto()例诀、recvfrom()随抠、close()

網(wǎng)絡(luò)編程

  • 阻塞(bloking)IO
    資源不可用時(shí),IO請(qǐng)求一直阻塞繁涂,直到反饋結(jié)果(有數(shù)據(jù)或超時(shí))

  • 非阻塞(non-bloking)IO
    資源不可用時(shí)拱她,IO請(qǐng)求離開(kāi)返回,返回?cái)?shù)據(jù)標(biāo)識(shí)資源不可用

  • 同步(synchronous)IO
    應(yīng)用阻塞在發(fā)送或接受數(shù)據(jù)的狀態(tài)扔罪,直到數(shù)據(jù)成功傳輸或返回失敗

  • 異步(asynchronous)IO
    應(yīng)用發(fā)送或接受數(shù)據(jù)后立即返回秉沼,實(shí)際處理是異步執(zhí)行


BIO網(wǎng)絡(luò)編程

客戶端阻塞 - OutputStream.write()
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.Scanner;

public class BIOClient {
    private static Charset charset = Charset.forName("UTF-8");

    public static void main(String[] args) throws Exception {
        Socket s = new Socket("localhost", 8080);
        OutputStream out = s.getOutputStream();

        Scanner scanner = new Scanner(System.in);
        System.out.println("請(qǐng)輸入:");
        String msg = scanner.nextLine();
        out.write(msg.getBytes(charset)); // 阻塞,寫完成
        scanner.close();
        s.close();
    }
}
服務(wù)端阻塞 - ServerSocket.accept() 與 BufferedReader.readLine()
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;

public class BIOServer {

    public static void main(String[] args) throws Exception {
        ServerSocket serverSocket = new ServerSocket(8080);
        System.out.println("服務(wù)器啟動(dòng)成功");
        while (!serverSocket.isClosed()) {
            Socket request = serverSocket.accept();// 阻塞
            System.out.println("收到新連接 : " + request.toString());
            try {
                // 接收數(shù)據(jù)步势、打印
                InputStream inputStream = request.getInputStream(); // net + i/o
                BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, "utf-8"));
                String msg;
                while ((msg = reader.readLine()) != null) { // 沒(méi)有數(shù)據(jù)氧猬,阻塞
                    if (msg.length() == 0) {
                        break;
                    }
                    System.out.println(msg);
                }
                System.out.println("收到數(shù)據(jù),來(lái)自:"+ request.toString());
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    request.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        serverSocket.close();
    }
}
服務(wù)端 - 加入多線程并模擬HTTP請(qǐng)求協(xié)議返回?cái)?shù)據(jù) - 子線程阻塞
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BIOServer2 {

    private static ExecutorService threadPool = Executors.newCachedThreadPool();

    public static void main(String[] args) throws Exception {
        ServerSocket serverSocket = new ServerSocket(8080);
        System.out.println("服務(wù)器啟動(dòng)成功");
        while (!serverSocket.isClosed()) {
            Socket request = serverSocket.accept();
            System.out.println("收到新連接 : " + request.toString());
            threadPool.execute(() -> {
                try {
                    // 接收數(shù)據(jù)背犯、打印
                    InputStream inputStream = request.getInputStream();
                    BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, "utf-8"));
                    String msg;
                    while ((msg = reader.readLine()) != null) { // 沒(méi)有數(shù)據(jù)阻塞
                        if (msg.length() == 0) {
                            break;
                        }
                        System.out.println(msg);
                    }

                    System.out.println("收到數(shù)據(jù),來(lái)自:"+ request.toString());
                    // 響應(yīng)結(jié)果 200
                    OutputStream outputStream = request.getOutputStream();
                    outputStream.write("HTTP/1.1 200 OK\r\n".getBytes());
                    outputStream.write("Content-Length: 11\r\n\r\n".getBytes());
                    outputStream.write("Hello World".getBytes());
                    outputStream.flush();
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    try {
                        request.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        serverSocket.close();
    }
}

NIO網(wǎng)絡(luò)編程

NIO中三個(gè)核心組件:Buffer緩沖區(qū)坏瘩、Channel通道、Sellector選擇器

Buffer緩沖區(qū)

緩沖區(qū)是一個(gè)可以寫入數(shù)據(jù)的內(nèi)存塊漠魏,然后可以再次讀取倔矾,Buffer三個(gè)重要屬性:

  • capacity容量:做位一個(gè)內(nèi)存塊,Buffer具有一定的固定大小
  • position位置:寫入模式時(shí)代表寫數(shù)據(jù)的位置柱锹,讀取模式時(shí)代表讀取數(shù)據(jù)的位置
  • limit限制:寫入模式哪自,限制等于buffer的容量;讀取模式下禁熏,limit等于寫入的數(shù)據(jù)量
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
import java.nio.LongBuffer;

public class BufferDemo {
    public static void main(String[] args) {
        // 構(gòu)建一個(gè)byte字節(jié)緩沖區(qū)壤巷,容量是4
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(4);
        // 默認(rèn)寫入模式,查看三個(gè)重要的指標(biāo)
        System.out.println(String.format("初始化:capacity容量:%s, position位置:%s, limit限制:%s", byteBuffer.capacity(),
                byteBuffer.position(), byteBuffer.limit()));
        // 寫入2字節(jié)的數(shù)據(jù)
        byteBuffer.put((byte) 1);
        byteBuffer.put((byte) 2);
        byteBuffer.put((byte) 3);
        // 再看數(shù)據(jù)
        System.out.println(String.format("寫入3字節(jié)后瞧毙,capacity容量:%s, position位置:%s, limit限制:%s", byteBuffer.capacity(),
                byteBuffer.position(), byteBuffer.limit()));

        // 轉(zhuǎn)換為讀取模式(不調(diào)用flip方法胧华,也是可以讀取數(shù)據(jù)的,但是position記錄讀取的位置不對(duì))
        System.out.println("#######開(kāi)始讀取");
        byteBuffer.flip();
        byte a = byteBuffer.get();
        System.out.println(a);
        byte b = byteBuffer.get();
        System.out.println(b);
        System.out.println(String.format("讀取2字節(jié)數(shù)據(jù)后宙彪,capacity容量:%s, position位置:%s, limit限制:%s", byteBuffer.capacity(),
                byteBuffer.position(), byteBuffer.limit()));

        // 繼續(xù)寫入3字節(jié)矩动,此時(shí)讀模式下,limit=3释漆,position=2.繼續(xù)寫入只能覆蓋寫入一條數(shù)據(jù)
        // clear()方法清除整個(gè)緩沖區(qū)悲没。compact()方法僅清除已閱讀的數(shù)據(jù)。轉(zhuǎn)為寫入模式
        byteBuffer.compact(); // buffer : 1 , 3
        byteBuffer.put((byte) 3);
        byteBuffer.put((byte) 4);
        byteBuffer.put((byte) 5);
        System.out.println(String.format("最終的情況男图,capacity容量:%s, position位置:%s, limit限制:%s", byteBuffer.capacity(),
                byteBuffer.position(), byteBuffer.limit()));

        // rewind() 重置position為0
        // mark() 標(biāo)記position的位置
        // reset() 重置position為上次mark()標(biāo)記的位置

    }
}

ByteBuffer內(nèi)存類型

ByteBuffer為性能關(guān)鍵型代碼提供了直接內(nèi)存(direct堆外)和非直接內(nèi)存(heap堆)兩種實(shí)現(xiàn)
堆外內(nèi)存獲取方式:ByteBuffer buffer = ByteBuffer.allocateDirect(noBytes);

堆外內(nèi)存的好處

  • 進(jìn)行網(wǎng)絡(luò)IO或者文件IO時(shí)比heapBuffer少一次拷貝示姿。因?yàn)镚C會(huì)移動(dòng)對(duì)象內(nèi)存,在寫file或者socket的過(guò)程中逊笆,JVM的實(shí)現(xiàn)中會(huì)把數(shù)據(jù)復(fù)制到堆外栈戳,再進(jìn)行寫入
  • 堆外內(nèi)存不收GC控制,能降低GC的壓力览露,實(shí)現(xiàn)自動(dòng)管理荧琼。DirectByteBuffer中有一個(gè)Cleaner對(duì)象(PhantomReference),Cleaner被GC前會(huì)執(zhí)行clean方法,能觸發(fā)DirectByteBuffer中定義的Deallocator

Channel通道

Channel的API涵蓋了UDP/TCP網(wǎng)絡(luò)和文件IO,他可以創(chuàng)建連接與傳輸數(shù)據(jù),可以非阻塞的讀取和寫入通道命锄,通道始終讀取或吸入緩沖區(qū)

SocketChannel

SocketChannel用戶建立TCP網(wǎng)絡(luò)連接堰乔,有兩種創(chuàng)建形式:

  1. 客戶端主動(dòng)發(fā)起服務(wù)器連接
  2. 服務(wù)器獲取新的連接

讀和寫都變成了非阻塞的方法,wirte()在尚未寫數(shù)據(jù)就可能返回了脐恩,read()可能返回空數(shù)據(jù)镐侯。所以需要再循環(huán)中使用

ServerSocketChannel

ServerSocketChannel可監(jiān)聽(tīng)新建立的TCP連接通道,通過(guò)open()方法創(chuàng)建

ServerSocketChannel.accept() : 如果該通道處于非阻塞模式驶冒,那么如果沒(méi)有掛起的連接苟翻,該方法將立即返回NULL

NIO服務(wù)端 - 基礎(chǔ)實(shí)現(xiàn)
  • 循環(huán)內(nèi)只能實(shí)現(xiàn)單線程處理一個(gè)任務(wù)
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/**
 * 直接基于非阻塞的寫法
 */
public class NIOServer {

    public static void main(String[] args) throws Exception {
        // 創(chuàng)建網(wǎng)絡(luò)服務(wù)端
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false); // 設(shè)置為非阻塞模式 - 默認(rèn)都是阻塞的
        serverSocketChannel.socket().bind(new InetSocketAddress(8080)); // 綁定端口
        System.out.println("啟動(dòng)成功");
        while (true) {
            SocketChannel socketChannel = serverSocketChannel.accept(); // 獲取新tcp連接通道
            // tcp請(qǐng)求 讀取/響應(yīng)
            if (socketChannel != null) {
                System.out.println("收到新連接 : " + socketChannel.getRemoteAddress());
                socketChannel.configureBlocking(false); // 默認(rèn)是阻塞的,一定要設(shè)置為非阻塞
                try {
                    ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
                    while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
                        // 長(zhǎng)連接情況下,需要手動(dòng)判斷數(shù)據(jù)有沒(méi)有讀取結(jié)束 (此處做一個(gè)簡(jiǎn)單的判斷: 超過(guò)0字節(jié)就認(rèn)為請(qǐng)求結(jié)束了)
                        if (requestBuffer.position() > 0) break;
                    }
                    if(requestBuffer.position() == 0) continue; // 如果沒(méi)數(shù)據(jù)了, 則不繼續(xù)后面的處理
                    requestBuffer.flip();
                    byte[] content = new byte[requestBuffer.limit()];
                    requestBuffer.get(content);
                    System.out.println(new String(content));
                    System.out.println("收到數(shù)據(jù),來(lái)自:"+ socketChannel.getRemoteAddress());

                    // 響應(yīng)結(jié)果 200
                    String response = "HTTP/1.1 200 OK\r\n" +
                            "Content-Length: 11\r\n\r\n" +
                            "Hello World";
                    ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
                    while (buffer.hasRemaining()) {
                        socketChannel.write(buffer);// 非阻塞
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        // 用到了非阻塞的API, 在設(shè)計(jì)上,和BIO可以有很大的不同.繼續(xù)改進(jìn)
    }
}
NIO客戶端
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

public class NIOClient {

    public static void main(String[] args) throws Exception {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
        while (!socketChannel.finishConnect()) {
            // 沒(méi)連接上,則一直等待
            Thread.yield();
        }
        Scanner scanner = new Scanner(System.in);
        System.out.println("請(qǐng)輸入:");
        // 發(fā)送內(nèi)容
        String msg = scanner.nextLine();
        ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
        while (buffer.hasRemaining()) {
            socketChannel.write(buffer);
        }
        // 讀取響應(yīng)
        System.out.println("收到服務(wù)端響應(yīng):");
        ByteBuffer requestBuffer = ByteBuffer.allocate(1024);

        while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
            // 長(zhǎng)連接情況下,需要手動(dòng)判斷數(shù)據(jù)有沒(méi)有讀取結(jié)束 (此處做一個(gè)簡(jiǎn)單的判斷: 超過(guò)0字節(jié)就認(rèn)為請(qǐng)求結(jié)束了)
            if (requestBuffer.position() > 0) break;
        }
        requestBuffer.flip();
        byte[] content = new byte[requestBuffer.limit()];
        requestBuffer.get(content);
        System.out.println(new String(content));
        scanner.close();
        socketChannel.close();
    }
}
NIO服務(wù)端優(yōu)化
  • 用到了非阻塞的API, 再設(shè)計(jì)上,和BIO可以有很大的不同
  • 問(wèn)題: 輪詢通道的方式,低效,浪費(fèi)CPU
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;

/**
 * 直接基于非阻塞的寫法,一個(gè)線程處理輪詢所有請(qǐng)求
 */
public class NIOServer1 {
    /**
     * 已經(jīng)建立連接的集合
     */
    private static ArrayList<SocketChannel> channels = new ArrayList<>();

    public static void main(String[] args) throws Exception {
        // 創(chuàng)建網(wǎng)絡(luò)服務(wù)端
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false); // 設(shè)置為非阻塞模式
        serverSocketChannel.socket().bind(new InetSocketAddress(8080)); // 綁定端口
        System.out.println("啟動(dòng)成功");
        while (true) {
            SocketChannel socketChannel = serverSocketChannel.accept(); // 獲取新tcp連接通道
                // tcp請(qǐng)求 讀取/響應(yīng)
                if (socketChannel != null) {
                System.out.println("收到新連接 : " + socketChannel.getRemoteAddress());
                socketChannel.configureBlocking(false); // 默認(rèn)是阻塞的,一定要設(shè)置為非阻塞
                channels.add(socketChannel);
            } else {
                // 沒(méi)有新連接的情況下,就去處理現(xiàn)有連接的數(shù)據(jù),處理完的就刪除掉
                Iterator<SocketChannel> iterator = channels.iterator();
                while (iterator.hasNext()) {
                    SocketChannel ch = iterator.next();
                    try {
                        ByteBuffer requestBuffer = ByteBuffer.allocate(1024);

                        if (ch.read(requestBuffer) == 0) {
                            // 等于0,代表這個(gè)通道沒(méi)有數(shù)據(jù)需要處理,那就待會(huì)再處理
                            continue;
                        }
                        while (ch.isOpen() && ch.read(requestBuffer) != -1) {
                            // 長(zhǎng)連接情況下,需要手動(dòng)判斷數(shù)據(jù)有沒(méi)有讀取結(jié)束 (此處做一個(gè)簡(jiǎn)單的判斷: 超過(guò)0字節(jié)就認(rèn)為請(qǐng)求結(jié)束了)
                            if (requestBuffer.position() > 0) break;
                        }
                        if(requestBuffer.position() == 0) continue; // 如果沒(méi)數(shù)據(jù)了, 則不繼續(xù)后面的處理
                        requestBuffer.flip();
                        byte[] content = new byte[requestBuffer.limit()];
                        requestBuffer.get(content);
                        System.out.println(new String(content));
                        System.out.println("收到數(shù)據(jù),來(lái)自:" + ch.getRemoteAddress());

                        // 響應(yīng)結(jié)果 200
                        String response = "HTTP/1.1 200 OK\r\n" +
                                "Content-Length: 11\r\n\r\n" +
                                "Hello World";
                        ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
                        while (buffer.hasRemaining()) {
                            ch.write(buffer);
                        }
                        iterator.remove();
                    } catch (IOException e) {
                        e.printStackTrace();
                        iterator.remove();
                    }
                }
            }
        }
    }
}


Selector選擇器

Selector可以堅(jiān)持一個(gè)或者多個(gè)NIO通道,并確定哪些通道已經(jīng)準(zhǔn)備好進(jìn)行讀取或?qū)懭肫郏瑢?shí)現(xiàn)一個(gè)線程處理多個(gè)通道的核心概念理解:事件驅(qū)動(dòng)機(jī)制

通過(guò)Selector 事件輪詢實(shí)現(xiàn) NIO服務(wù)端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 結(jié)合Selector實(shí)現(xiàn)的非阻塞服務(wù)端(放棄對(duì)channel的輪詢,借助消息通知機(jī)制)
 */
public class NIOServerV2 {

    public static void main(String[] args) throws Exception {
        // 1. 創(chuàng)建網(wǎng)絡(luò)服務(wù)端ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false); // 設(shè)置為非阻塞模式

        // 2. 構(gòu)建一個(gè)Selector選擇器,并且將channel注冊(cè)上去
        Selector selector = Selector.open();
        SelectionKey selectionKey = serverSocketChannel.register(selector, 0, serverSocketChannel);// 將serverSocketChannel注冊(cè)到selector
        selectionKey.interestOps(SelectionKey.OP_ACCEPT); // 對(duì)serverSocketChannel上面的accept事件感興趣(serverSocketChannel只能支持accept操作)

        // 3. 綁定端口
        serverSocketChannel.socket().bind(new InetSocketAddress(8080));

        System.out.println("啟動(dòng)成功");

        while (true) {
            // 不再輪詢通道,改用下面輪詢事件的方式.select方法有阻塞效果,直到有事件通知才會(huì)有返回
            selector.select();
            // 獲取事件
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            // 遍歷查詢結(jié)果e
            Iterator<SelectionKey> iter = selectionKeys.iterator();
            while (iter.hasNext()) {
                // 被封裝的查詢結(jié)果
                SelectionKey key = iter.next();
                iter.remove();
                // 關(guān)注 Read 和 Accept兩個(gè)事件
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.attachment();
                    // 將拿到的客戶端連接通道,注冊(cè)到selector上面
                    SocketChannel clientSocketChannel = server.accept(); // mainReactor 輪詢accept
                    clientSocketChannel.configureBlocking(false);
                    clientSocketChannel.register(selector, SelectionKey.OP_READ, clientSocketChannel); //再次注冊(cè)事件
                    System.out.println("收到新連接 : " + clientSocketChannel.getRemoteAddress());
                }

                if (key.isReadable()) {
                    SocketChannel socketChannel = (SocketChannel) key.attachment();
                    try {
                        ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
                        while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
                            // 長(zhǎng)連接情況下,需要手動(dòng)判斷數(shù)據(jù)有沒(méi)有讀取結(jié)束 (此處做一個(gè)簡(jiǎn)單的判斷: 超過(guò)0字節(jié)就認(rèn)為請(qǐng)求結(jié)束了)
                            if (requestBuffer.position() > 0) break;
                        }
                        if(requestBuffer.position() == 0) continue; // 如果沒(méi)數(shù)據(jù)了, 則不繼續(xù)后面的處理
                        requestBuffer.flip();
                        byte[] content = new byte[requestBuffer.limit()];
                        requestBuffer.get(content);
                        System.out.println(new String(content));
                        System.out.println("收到數(shù)據(jù),來(lái)自:" + socketChannel.getRemoteAddress());
                        // TODO 業(yè)務(wù)操作 數(shù)據(jù)庫(kù) 接口調(diào)用等等

                        // 響應(yīng)結(jié)果 200
                        String response = "HTTP/1.1 200 OK\r\n" +
                                "Content-Length: 11\r\n\r\n" +
                                "Hello World";
                        ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
                        while (buffer.hasRemaining()) {
                            socketChannel.write(buffer);
                        }
                    } catch (IOException e) {
                        // e.printStackTrace();
                        key.cancel(); // 取消事件訂閱
                    }
                }
            }
            selector.selectNow();
        }
        // 問(wèn)題: 此處一個(gè)selector監(jiān)聽(tīng)所有事件,一個(gè)線程處理所有請(qǐng)求事件. 會(huì)成為瓶頸! 要有多線程的運(yùn)用
    }
}

Reactor模式

Reactor模式是一種典型的事件驅(qū)動(dòng)的編程模型崇猫,是一種為處理并發(fā)服務(wù)請(qǐng)求,并將請(qǐng)求提交到一個(gè)或
者多個(gè)服務(wù)處理程序的事件設(shè)計(jì)模式

NIO與多線程結(jié)合 Reactor模式

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * NIO selector 多路復(fù)用reactor線程模型
 */
public class NIOServerV3 {
    /** 處理業(yè)務(wù)操作的線程 */
    private static ExecutorService workPool = Executors.newCachedThreadPool();

    /**
     * 封裝了selector.select()等事件輪詢的代碼
     */
    abstract class ReactorThread extends Thread {

        Selector selector;
        LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();

        /**
         * Selector監(jiān)聽(tīng)到有事件后,調(diào)用這個(gè)方法
         */
        public abstract void handler(SelectableChannel channel) throws Exception;

        private ReactorThread() throws IOException {
            selector = Selector.open();
        }

        volatile boolean running = false;

        @Override
        public void run() {
            // 輪詢Selector事件
            while (running) {
                try {
                    // 執(zhí)行隊(duì)列中的任務(wù)
                    Runnable task;
                    while ((task = taskQueue.poll()) != null) {
                        task.run();
                    }
                    selector.select(1000);

                    // 獲取查詢結(jié)果
                    Set<SelectionKey> selected = selector.selectedKeys();
                    // 遍歷查詢結(jié)果
                    Iterator<SelectionKey> iter = selected.iterator();
                    while (iter.hasNext()) {
                        // 被封裝的查詢結(jié)果
                        SelectionKey key = iter.next();
                        iter.remove();
                        int readyOps = key.readyOps();
                        // 關(guān)注 Read 和 Accept兩個(gè)事件
                        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                            try {
                                SelectableChannel channel = (SelectableChannel) key.attachment();
                                channel.configureBlocking(false);
                                handler(channel);
                                if (!channel.isOpen()) {
                                    key.cancel(); // 如果關(guān)閉了,就取消這個(gè)KEY的訂閱
                                }
                            } catch (Exception ex) {
                                key.cancel(); // 如果有異常,就取消這個(gè)KEY的訂閱
                            }
                        }
                    }
                    selector.selectNow();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        private SelectionKey register(SelectableChannel channel) throws Exception {
            // 為什么register要以任務(wù)提交的形式需忿,讓reactor線程去處理诅炉?
            // 因?yàn)榫€程在執(zhí)行channel注冊(cè)到selector的過(guò)程中,會(huì)和調(diào)用selector.select()方法的線程爭(zhēng)用同一把鎖
            // 而select()方法實(shí)在eventLoop中通過(guò)while循環(huán)調(diào)用的屋厘,爭(zhēng)搶的可能性很高涕烧,為了讓register能更快的執(zhí)行,就放到同一個(gè)線程來(lái)處理
            FutureTask<SelectionKey> futureTask = new FutureTask<>(() -> channel.register(selector, 0, channel));
            taskQueue.add(futureTask);
            return futureTask.get();
        }

        private void doStart() {
            if (!running) {
                running = true;
                start();
            }
        }
    }

    private ServerSocketChannel serverSocketChannel;
    // 1汗洒、創(chuàng)建多個(gè)線程 - accept處理reactor線程 (accept線程)
    private ReactorThread[] mainReactorThreads = new ReactorThread[1];
    // 2议纯、創(chuàng)建多個(gè)線程 - io處理reactor線程  (I/O線程)
    private ReactorThread[] subReactorThreads = new ReactorThread[8];

    /**
     * 初始化線程組
     */
    private void newGroup() throws IOException {
        // 創(chuàng)建IO線程,負(fù)責(zé)處理客戶端連接以后socketChannel的IO讀寫
        for (int i = 0; i < subReactorThreads.length; i++) {
            subReactorThreads[i] = new ReactorThread() {
                @Override
                public void handler(SelectableChannel channel) throws IOException {
                    // work線程只負(fù)責(zé)處理IO處理,不處理accept事件
                    SocketChannel ch = (SocketChannel) channel;
                    ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
                    while (ch.isOpen() && ch.read(requestBuffer) != -1) {
                        // 長(zhǎng)連接情況下,需要手動(dòng)判斷數(shù)據(jù)有沒(méi)有讀取結(jié)束 (此處做一個(gè)簡(jiǎn)單的判斷: 超過(guò)0字節(jié)就認(rèn)為請(qǐng)求結(jié)束了)
                        if (requestBuffer.position() > 0) break;
                    }
                    if (requestBuffer.position() == 0) return; // 如果沒(méi)數(shù)據(jù)了, 則不繼續(xù)后面的處理
                    requestBuffer.flip();
                    byte[] content = new byte[requestBuffer.limit()];
                    requestBuffer.get(content);
                    System.out.println(new String(content));
                    System.out.println(Thread.currentThread().getName() + "收到數(shù)據(jù),來(lái)自:" + ch.getRemoteAddress());

                    // TODO 業(yè)務(wù)操作 數(shù)據(jù)庫(kù)溢谤、接口...
                    workPool.submit(() -> {
                    });

                    // 響應(yīng)結(jié)果 200
                    String response = "HTTP/1.1 200 OK\r\n" +
                            "Content-Length: 11\r\n\r\n" +
                            "Hello World";
                    ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
                    while (buffer.hasRemaining()) {
                        ch.write(buffer);
                    }
                }
            };
        }

        // 創(chuàng)建mainReactor線程, 只負(fù)責(zé)處理serverSocketChannel
        for (int i = 0; i < mainReactorThreads.length; i++) {
            mainReactorThreads[i] = new ReactorThread() {
                AtomicInteger incr = new AtomicInteger(0);

                @Override
                public void handler(SelectableChannel channel) throws Exception {
                    // 只做請(qǐng)求分發(fā)瞻凤,不做具體的數(shù)據(jù)讀取
                    ServerSocketChannel ch = (ServerSocketChannel) channel;
                    SocketChannel socketChannel = ch.accept();
                    socketChannel.configureBlocking(false);
                    // 收到連接建立的通知之后,分發(fā)給I/O線程繼續(xù)去讀取數(shù)據(jù)
                    int index = incr.getAndIncrement() % subReactorThreads.length;
                    ReactorThread workEventLoop = subReactorThreads[index];
                    workEventLoop.doStart();
                    SelectionKey selectionKey = workEventLoop.register(socketChannel);
                    selectionKey.interestOps(SelectionKey.OP_READ);
                    System.out.println(Thread.currentThread().getName() + "收到新連接 : " + socketChannel.getRemoteAddress());
                }
            };
        }


    }

    /**
     * 初始化channel,并且綁定一個(gè)eventLoop線程
     *
     * @throws IOException IO異常
     */
    private void initAndRegister() throws Exception {
        // 1溯香、 創(chuàng)建ServerSocketChannel
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        // 2鲫构、 將serverSocketChannel注冊(cè)到selector
        int index = new Random().nextInt(mainReactorThreads.length);
        mainReactorThreads[index].doStart();
        SelectionKey selectionKey = mainReactorThreads[index].register(serverSocketChannel);
        selectionKey.interestOps(SelectionKey.OP_ACCEPT);
    }

    /**
     * 綁定端口
     *
     * @throws IOException IO異常
     */
    private void bind() throws IOException {
        //  1、 正式綁定端口玫坛,對(duì)外服務(wù)
        serverSocketChannel.bind(new InetSocketAddress(8080));
        System.out.println("啟動(dòng)完成结笨,端口8080");
    }

    public static void main(String[] args) throws Exception {
        NIOServerV3 nioServerV3 = new NIOServerV3();
        nioServerV3.newGroup(); // 1、 創(chuàng)建main和sub兩組線程
        nioServerV3.initAndRegister(); // 2湿镀、 創(chuàng)建serverSocketChannel炕吸,注冊(cè)到mainReactor線程上的selector上
        nioServerV3.bind(); // 3、 為serverSocketChannel綁定端口
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末勉痴,一起剝皮案震驚了整個(gè)濱河市赫模,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌蒸矛,老刑警劉巖瀑罗,帶你破解...
    沈念sama閱讀 217,907評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件胸嘴,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡斩祭,警方通過(guò)查閱死者的電腦和手機(jī)劣像,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,987評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)摧玫,“玉大人耳奕,你說(shuō)我怎么就攤上這事∥芟瘢” “怎么了屋群?”我有些...
    開(kāi)封第一講書人閱讀 164,298評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)坏挠。 經(jīng)常有香客問(wèn)我芍躏,道長(zhǎng),這世上最難降的妖魔是什么癞揉? 我笑而不...
    開(kāi)封第一講書人閱讀 58,586評(píng)論 1 293
  • 正文 為了忘掉前任纸肉,我火速辦了婚禮溺欧,結(jié)果婚禮上喊熟,老公的妹妹穿的比我還像新娘。我一直安慰自己姐刁,他們只是感情好芥牌,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,633評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著聂使,像睡著了一般壁拉。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上柏靶,一...
    開(kāi)封第一講書人閱讀 51,488評(píng)論 1 302
  • 那天弃理,我揣著相機(jī)與錄音,去河邊找鬼屎蜓。 笑死痘昌,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的炬转。 我是一名探鬼主播辆苔,決...
    沈念sama閱讀 40,275評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼扼劈!你這毒婦竟也來(lái)了驻啤?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書人閱讀 39,176評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤荐吵,失蹤者是張志新(化名)和其女友劉穎骑冗,沒(méi)想到半個(gè)月后赊瞬,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,619評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡贼涩,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,819評(píng)論 3 336
  • 正文 我和宋清朗相戀三年森逮,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片磁携。...
    茶點(diǎn)故事閱讀 39,932評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡褒侧,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出谊迄,到底是詐尸還是另有隱情闷供,我是刑警寧澤,帶...
    沈念sama閱讀 35,655評(píng)論 5 346
  • 正文 年R本政府宣布统诺,位于F島的核電站歪脏,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏粮呢。R本人自食惡果不足惜婿失,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,265評(píng)論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望啄寡。 院中可真熱鬧豪硅,春花似錦、人聲如沸挺物。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,871評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)识藤。三九已至砚著,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間痴昧,已是汗流浹背稽穆。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 32,994評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留赶撰,地道東北人舌镶。 一個(gè)月前我還...
    沈念sama閱讀 48,095評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像扣囊,于是被迫代替她去往敵國(guó)和親乎折。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,884評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容