從網(wǎng)絡(luò)通信的演進(jìn)過程徹底搞懂Redis高性能通信的原理(全網(wǎng)最詳細(xì)车伞,建議收藏)

我們一直說Redis的性能很快,那為什么快运敢?Redis為了達(dá)到性能最大化,做了哪些方面的優(yōu)化呢忠售?
深度解析Redis的數(shù)據(jù)結(jié)構(gòu)
這篇文章中者冤,其實從數(shù)據(jù)結(jié)構(gòu)上分析了Redis性能高的一方面原因。

在目前的k-v數(shù)據(jù)庫的技術(shù)選型中档痪,Redis幾乎是首選的用來實現(xiàn)高性能緩存的方案,它的性能有多快呢邢滑?

根據(jù)官方的基準(zhǔn)測試數(shù)據(jù)腐螟,一臺普通硬件配置的Linux機(jī)器上運(yùn)行單個Redis實例愿汰,處理簡單命令(O(n)或者O(logn)),QPS可以達(dá)到8W乐纸,如果使用pipeline批處理功能衬廷,QPS最高可以達(dá)到10W。

Redis 為什么那么快

Redis的高性能主要依賴于幾個方面汽绢。

  • C語言實現(xiàn)吗跋,C語言在一定程度上還是比Java語言性能要高一些,因為C語言不需要經(jīng)過JVM進(jìn)行翻譯宁昭。
  • 純內(nèi)存I/O跌宛,內(nèi)存I/O比磁盤I/O性能更快
  • I/O多路復(fù)用,基于epoll的I/O多路復(fù)用技術(shù)积仗,實現(xiàn)高吞吐網(wǎng)絡(luò)I/O
  • 單線程模型疆拘,單線程無法利用到多核CPU,但是在Redis中寂曹,性能瓶頸并不是在計算上哎迄,而是在I/O能力,所以單線程能夠滿足高并發(fā)的要求隆圆。 從另一個層面來說漱挚,單線程可以避免多線程的頻繁上下文切換以及同步鎖機(jī)制帶來的性能開銷。

下面我們分別從上述幾個方面進(jìn)行展開說明渺氧,先來看網(wǎng)絡(luò)I/O的多路復(fù)用模型旨涝。

從請求處理開始分析

當(dāng)我們在客戶端向Redis Server發(fā)送一條指令,并且得到Redis回復(fù)的整個過程中阶女,Redis做了什么呢颊糜?

image-20210707221959664

<center>圖4-1</center>

要處理命令,則redis必須完整地接收客戶端的請求秃踩,并將命令解析出來衬鱼,再將結(jié)果讀出來,通過網(wǎng)絡(luò)回寫到客戶端憔杨。整個工序分為以下幾個部分:

  • 接收鸟赫,通過TCP接收到命令,可能會歷經(jīng)多次TCP包消别、ack抛蚤、IO操作
  • 解析,將命令取出來
  • 執(zhí)行寻狂,到對應(yīng)的地方將value讀出來
  • 返回岁经,將value通過TCP返回給客戶端,如果value較大蛇券,則IO負(fù)荷會更重

其中解析執(zhí)行是純cpu/內(nèi)存操作缀壤,而接收和返回主要是IO操作樊拓,首先我們先來看通信的過程。

網(wǎng)絡(luò)IO的通信原理

同樣塘慕,我也畫了一幅圖來描述網(wǎng)絡(luò)數(shù)據(jù)的傳輸流程

首先筋夏,對于TCP通信來說,每個TCP Socket的內(nèi)核中都有一個發(fā)送緩沖區(qū)和一個接收緩沖區(qū)

接收緩沖區(qū)把數(shù)據(jù)緩存到內(nèi)核图呢,若應(yīng)用進(jìn)程一直沒有調(diào)用Socket的read方法進(jìn)行讀取条篷,那么該數(shù)據(jù)會一直被緩存在接收緩沖區(qū)內(nèi)。不管進(jìn)程是否讀取Socket蛤织,對端發(fā)來的數(shù)據(jù)都會經(jīng)過內(nèi)核接收并緩存到Socket的內(nèi)核接收緩沖區(qū)赴叹。

read所要做的工作,就是把內(nèi)核接收緩沖區(qū)中的數(shù)據(jù)復(fù)制到應(yīng)用層用戶的Buffer里瞳筏。

進(jìn)程調(diào)用Socket的send發(fā)送數(shù)據(jù)的時候稚瘾,一般情況下是將數(shù)據(jù)從應(yīng)用層用戶的Buffer里復(fù)制到Socket的內(nèi)核發(fā)送緩沖區(qū),然后send就會在上層返回姚炕。換句話說摊欠,send返回時,數(shù)據(jù)不一定會被發(fā)送到對端柱宦。

1576066931883

網(wǎng)卡中的緩沖區(qū)既不屬于內(nèi)核空間些椒,也不屬于用戶空間。它屬于硬件緩沖掸刊,允許網(wǎng)卡與操作系統(tǒng)之間有個緩沖免糕;
內(nèi)核緩沖區(qū)在內(nèi)核空間,在內(nèi)存中忧侧,用于內(nèi)核程序石窑,做為讀自或?qū)懲布臄?shù)據(jù)緩沖區(qū);
用戶緩沖區(qū)在用戶空間蚓炬,在內(nèi)存中松逊,用于用戶程序,做為讀自或?qū)懲布臄?shù)據(jù)緩沖區(qū)

網(wǎng)卡芯片收到網(wǎng)絡(luò)數(shù)據(jù)會以中斷的方式通知CPU肯夏,我有數(shù)據(jù)了经宏,存在我的硬件緩沖里了,來讀我啊驯击。
CPU收到這個中斷信號后烁兰,會調(diào)用相應(yīng)的驅(qū)動接口函數(shù)從網(wǎng)卡的硬件緩沖里把數(shù)據(jù)讀到內(nèi)核緩沖區(qū),正常情況下會向上傳遞給TCP/IP模塊一層一層的處理徊都。

NIO多路復(fù)用機(jī)制

Redis的通信采用的是多路復(fù)用機(jī)制沪斟,什么是多路復(fù)用機(jī)制呢?

由于Redis是C語言實現(xiàn)暇矫,為了簡化大家的理解币喧,我們采用Java語言來描述這個過程轨域。

在理解多路復(fù)用之前,我們先來了解一下BIO杀餐。

BIO模型

在Java中,如果要實現(xiàn)網(wǎng)絡(luò)通信朱巨,我們會采用Socket套接字來完成史翘。

Socket這不是一個協(xié)議,而是一個通信模型冀续。其實它最初是BSD發(fā)明的琼讽,主要用來一臺電腦的兩個進(jìn)程間通信,然后把它用到了兩臺電腦的進(jìn)程間通信洪唐。所以钻蹬,可以把它簡單理解為進(jìn)程間通信,不是什么高級的東西凭需。主要做的事情不就是:

  • A發(fā)包:發(fā)請求包給某個已經(jīng)綁定的端口(所以我們經(jīng)常會訪問這樣的地址182.13.15.16:1235问欠,1235就是端口);收到B的允許粒蜈;然后正式發(fā)送顺献;發(fā)送完了,告訴B要斷開鏈接枯怖;收到斷開允許注整,馬上斷開,然后發(fā)送已經(jīng)斷開信息給B度硝。

  • B收包:綁定端口和IP肿轨;然后在這個端口監(jiān)聽;接收到A的請求蕊程,發(fā)允許給A椒袍,并做好接收準(zhǔn)備,主要就是清理緩存等待接收新數(shù)據(jù)存捺;然后正式接收槐沼;接受到斷開請求,允許斷開捌治;確認(rèn)斷開后岗钩,繼續(xù)監(jiān)聽其它請求。

可見肖油,Socket其實就是I/O操作兼吓,Socket并不僅限于網(wǎng)絡(luò)通信,在網(wǎng)絡(luò)通信中森枪,它涵蓋了網(wǎng)絡(luò)層视搏、傳輸層审孽、會話層、表示層浑娜、應(yīng)用層——其實這都不需要記佑力,因為Socket通信時候用到了IP和端口,僅這兩個就表明了它用到了網(wǎng)絡(luò)層和傳輸層筋遭;而且它無視多臺電腦通信的系統(tǒng)差別打颤,所以它涉及了表示層;一般Socket都是基于一個應(yīng)用程序的漓滔,所以會涉及到會話層和應(yīng)用層编饺。

構(gòu)建基礎(chǔ)的BIO通信模型

BIOServerSocket

public class BIOServerSocket {
    //先定義一個端口號,這個端口的值是可以自己調(diào)整的响驴。
    static final int DEFAULT_PORT=8080;
    public static void main(String[] args) throws IOException {
        //先定義一個端口號透且,這個端口的值是可以自己調(diào)整的。
        //在服務(wù)器端豁鲤,我們需要使用ServerSocket秽誊,所以我們先聲明一個ServerSocket變量
        ServerSocket serverSocket=null;
        //接下來,我們需要綁定監(jiān)聽端口, 那我們怎么做呢畅形?只需要創(chuàng)建使用serverSocket實例
        //ServerSocket有很多構(gòu)造重載养距,在這里,我們把前邊定義的端口傳入日熬,表示當(dāng)前
        //ServerSocket監(jiān)聽的端口是8080
        serverSocket=new ServerSocket(DEFAULT_PORT);
        System.out.println("啟動服務(wù)棍厌,監(jiān)聽端口:"+DEFAULT_PORT);
        //回顧一下前面我們講的內(nèi)容,接下來我們就需要開始等待客戶端的連接了竖席。
        //所以我們要使用的是accept這個函數(shù)耘纱,并且當(dāng)accept方法獲得一個客戶端請求時,會返回
        //一個socket對象毕荐, 這個socket對象讓服務(wù)器可以用來和客戶端通信的一個端點束析。

        //開始等待客戶端連接,如果沒有客戶端連接憎亚,就會一直阻塞在這個位置
        Socket socket=serverSocket.accept();
        //很可能有多個客戶端來發(fā)起連接员寇,為了區(qū)分客戶端,咱們可以輸出客戶端的端口號
        System.out.println("客戶端:"+socket.getPort()+"已連接");
        //一旦有客戶端連接過來第美,我們就可以用到IO來獲得客戶端傳過來的數(shù)據(jù)蝶锋。
        //使用InputStream來獲得客戶端的輸入數(shù)據(jù)
        //bufferedReader大家還記得吧,他維護(hù)了一個緩沖區(qū)可以減少數(shù)據(jù)源讀取的頻率
        BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream()));
        String clientStr=bufferedReader.readLine(); //讀取一行信息
        System.out.println("客戶端發(fā)了一段消息:"+clientStr);
        //服務(wù)端收到數(shù)據(jù)以后什往,可以給到客戶端一個回復(fù)扳缕。這里咱們用到BufferedWriter
        BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
        bufferedWriter.write("我已經(jīng)收到你的消息了\n");
        bufferedWriter.flush(); //清空緩沖區(qū)觸發(fā)消息發(fā)送
    }
}

BIOClientSocket

public class BIOClientSocket {
    static final int DEFAULT_PORT=8080;
    public static void main(String[] args) throws IOException {

        //在客戶端這邊,咱們使用socket來連接到指定的ip和端口
        Socket socket=new Socket("localhost",8080);
        //使用BufferedWriter,像服務(wù)器端寫入一個消息
        BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
        bufferedWriter.write("我是客戶端Client-01\n");
        bufferedWriter.flush();
        BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream()));
        String serverStr=bufferedReader.readLine(); //通過bufferedReader讀取服務(wù)端返回的消息
        System.out.println("服務(wù)端返回的消息:"+serverStr);
    }
}

上述代碼構(gòu)建了一個簡單的BIO通信模型躯舔,也就是服務(wù)端建立一個監(jiān)聽驴剔,客戶端向服務(wù)端發(fā)送一個消息,實現(xiàn)簡單的網(wǎng)絡(luò)通信粥庄,那BIO有什么弊端呢丧失?

我們通過對BIOServerSocket進(jìn)行改造,關(guān)注case1和case2部分飒赃。

  • case1: 增加了while循環(huán)利花,實現(xiàn)重復(fù)監(jiān)聽
  • case2: 當(dāng)服務(wù)端收到客戶端的請求后,不直接返回载佳,而是等待20s。
public class BIOServerSocket {
    //先定義一個端口號臀栈,這個端口的值是可以自己調(diào)整的蔫慧。
    static final int DEFAULT_PORT=8080;
    public static void main(String[] args) throws IOException, InterruptedException {
        ServerSocket serverSocket=null;
        serverSocket=new ServerSocket(DEFAULT_PORT);
        System.out.println("啟動服務(wù),監(jiān)聽端口:"+DEFAULT_PORT);

        while(true) { //case1: 增加循環(huán)权薯,允許循環(huán)接收請求
            Socket socket = serverSocket.accept();
            System.out.println("客戶端:" + socket.getPort() + "已連接");
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String clientStr = bufferedReader.readLine(); //讀取一行信息
            System.out.println("客戶端發(fā)了一段消息:" + clientStr);
            Thread.sleep(20000); //case2: 修改:增加等待時間
            BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            bufferedWriter.write("我已經(jīng)收到你的消息了\n");
            bufferedWriter.flush(); //清空緩沖區(qū)觸發(fā)消息發(fā)送
        }
    }
}

接著姑躲,把BIOClientSocket復(fù)制兩份(client1、client2)盟蚣,同時向BIOServerSocket發(fā)起請求黍析。

運(yùn)行后看到的現(xiàn)象應(yīng)該是: client1先發(fā)送請求到Server端,由于Server端等待20s才返回屎开,導(dǎo)致client2的請求一直被阻塞阐枣。

這個情況會導(dǎo)致一個問題,如果服務(wù)端在同一個時刻只能處理一個客戶端的連接奄抽,而如果一個網(wǎng)站同時有1000個用戶訪問蔼两,那么剩下的999個用戶都需要等待,而這個等待的耗時取決于前面的請求的處理時長逞度,如圖4-2所示额划。

image-20210708152538953

<center>圖4-2</center>

基于多線程優(yōu)化BIO

為了讓服務(wù)端能夠同時處理更多的客戶端連接,避免因為某個客戶端連接阻塞導(dǎo)致后續(xù)請求被阻塞档泽,于是引入多線程技術(shù)俊戳,代碼如下。

ServerSocket

public static void main(String[] args) throws IOException, InterruptedException {
    final int DEFAULT_PORT=8080;
    ServerSocket serverSocket=null;
    serverSocket=new ServerSocket(DEFAULT_PORT);
    System.out.println("啟動服務(wù)馆匿,監(jiān)聽端口:"+DEFAULT_PORT);
    ExecutorService executorService= Executors.newFixedThreadPool(5);
    while(true) {
        Socket socket = serverSocket.accept();
        executorService.submit(new SocketThread(socket));
    }
}

SocketThread

public class SocketThread implements Runnable{
    Socket socket;

    public SocketThread(Socket socket) {
        this.socket = socket;
    }
    @Override
    public void run() {
        System.out.println("客戶端:" + socket.getPort() + "已連接");
        try {
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String clientStr = null; //讀取一行信息
            clientStr = bufferedReader.readLine();
            System.out.println("客戶端發(fā)了一段消息:" + clientStr);
            Thread.sleep(20000);
            BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            bufferedWriter.write("我已經(jīng)收到你的消息了\n");
            bufferedWriter.flush(); //清空緩沖區(qū)觸發(fā)消息發(fā)送
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

如圖4-3所示抑胎,當(dāng)引入了多線程之后,每個客戶端的鏈接(Socket)甜熔,我們可以直接給到線程池去執(zhí)行圆恤,而由于這個過程是異步的,所以并不會同步阻塞影響后續(xù)鏈接的監(jiān)聽,因此在一定程度上可以提升服務(wù)端鏈接的處理數(shù)量盆昙。

image-20210708160026412

<center>圖4-3</center>

NIO非阻塞IO

使用多線程的方式來解決這個問題羽历,仍然有一個缺點,線程的數(shù)量取決于硬件配置淡喜,所以線程數(shù)量是有限的秕磷,如果請求量比較大的時候,線程本身會收到限制從而并發(fā)量也不會太高炼团。那怎么辦呢澎嚣,我們可以采用非阻塞IO。

NIO 從JDK1.4 提出的瘟芝,本意是New IO易桃,它的出現(xiàn)為了彌補(bǔ)原本IO的不足,提供了更高效的方式锌俱,提出一個通道(channel)的概念晤郑,在IO中它始終以流的形式對數(shù)據(jù)的傳輸和接受,下面我們演示一下NIO的使用贸宏。

NioServerSocket

public class NioServerSocket {
    public static void main(String[] args) {
        try {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(8080));
            while (true) {
                SocketChannel socketChannel = serverSocketChannel.accept();
                if (socketChannel != null) {
                    //讀取數(shù)據(jù)
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    socketChannel.read(buffer);
                    System.out.println(new String(buffer.array()));
                    //寫出數(shù)據(jù)
                    Thread.sleep(10000); //阻塞一段時間
                    //當(dāng)數(shù)據(jù)讀取到緩沖區(qū)之后造寝,接下來就需要把緩沖區(qū)的數(shù)據(jù)寫出到通道,而在寫出之前必須要調(diào)用flip方法吭练,實際上就是重置一個有效字節(jié)范圍诫龙,然后把這個數(shù)據(jù)接觸到通道。
                    buffer.flip();
                    socketChannel.write(buffer);//寫出數(shù)據(jù)
                } else {
                    Thread.sleep(1000);
                    System.out.println("連接未就緒");
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

NioClientSocket

public class NioClientSocket {
    public static void main(String[] args) {
        try {
            SocketChannel socketChannel= SocketChannel.open();
            socketChannel.configureBlocking(false);
            socketChannel.connect(new InetSocketAddress("localhost",8080));
            if(socketChannel.isConnectionPending()){
                socketChannel.finishConnect();
            }
            ByteBuffer byteBuffer= ByteBuffer.allocate(1024);
            byteBuffer.put("Hello I'M SocketChannel Client".getBytes());
            byteBuffer.flip();
            socketChannel.write(byteBuffer);
            //讀取服務(wù)端數(shù)據(jù)
            byteBuffer.clear();
            while(true) {
                int i = socketChannel.read(byteBuffer);
                if (i > 0) {
                    System.out.println("收到服務(wù)端的數(shù)據(jù):" + new String(byteBuffer.array()));
                } else {
                    System.out.println("服務(wù)端數(shù)據(jù)未準(zhǔn)備好");
                    Thread.sleep(1000);
                }
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }

    }
}

所謂的NIO(非阻塞IO)鲫咽,其實就是取消了IO阻塞和連接阻塞签赃,當(dāng)服務(wù)端不存在阻塞的時候,就可以不斷輪詢處理客戶端的請求浑侥,如圖4-4所示姊舵,表示NIO下的運(yùn)行流程。

image-20210708165359843

<center>圖4-4</center>

上述這種NIO的使用方式寓落,仍然存在一個問題括丁,就是客戶端或者服務(wù)端需要通過一個線程不斷輪詢才能獲得結(jié)果,而這個輪詢過程中會浪費(fèi)線程資源伶选。

多路復(fù)用IO

大家站在全局的角度再思考一下整個過程史飞,有哪些地方可以優(yōu)化呢?

我們回到NIOClientSocket中下面這段代碼仰税,當(dāng)客戶端通過read方法去讀取服務(wù)端返回的數(shù)據(jù)時构资,如果此時服務(wù)端數(shù)據(jù)未準(zhǔn)備好,對于客戶端來說就是一次無效的輪詢陨簇。

我們能不能夠設(shè)計成吐绵,當(dāng)客戶端調(diào)用read方法之后,不僅僅不阻塞,同時也不需要輪詢己单。而是等到服務(wù)端的數(shù)據(jù)就緒之后唉窃, 告訴客戶端。然后客戶端再去讀取服務(wù)端返回的數(shù)據(jù)呢纹笼?

就像點外賣一樣纹份,我們在網(wǎng)上下單之后,繼續(xù)做其他事情廷痘,等到外賣到了公司蔓涧,外賣小哥主動打電話告訴你,你直接去前臺取餐即可笋额。

while(true) {
    int i = socketChannel.read(byteBuffer);
    if (i > 0) {
        System.out.println("收到服務(wù)端的數(shù)據(jù):" + new String(byteBuffer.array()));
    } else {
        System.out.println("服務(wù)端數(shù)據(jù)未準(zhǔn)備好");
        Thread.sleep(1000);
    }
}

所以為了優(yōu)化這個問題元暴,引入了多路復(fù)用機(jī)制。

I/O多路復(fù)用的本質(zhì)是通過一種機(jī)制(系統(tǒng)內(nèi)核緩沖I/O數(shù)據(jù))兄猩,讓單個進(jìn)程可以監(jiān)視多個文件描述符昨寞,一旦某個描述符就緒(一般是讀就緒或?qū)懢途w),能夠通知程序進(jìn)行相應(yīng)的讀寫操作

什么是fd:在linux中厦滤,內(nèi)核把所有的外部設(shè)備都當(dāng)成是一個文件來操作,對一個文件的讀寫會調(diào)用內(nèi)核提供的系統(tǒng)命令歼狼,返回一個fd(文件描述符)掏导。而對于一個socket的讀寫也會有相應(yīng)的文件描述符,成為socketfd羽峰。

常見的IO多路復(fù)用方式有【select趟咆、poll、epoll】梅屉,都是Linux API提供的IO復(fù)用方式值纱,那么接下來重點講一下select、和epoll這兩個模型

  • select:進(jìn)程可以通過把一個或者多個fd傳遞給select系統(tǒng)調(diào)用坯汤,進(jìn)程會阻塞在select操作上虐唠,這樣select可以幫我們檢測多個fd是否處于就緒狀態(tài),這個模式有兩個缺點

    • 由于他能夠同時監(jiān)聽多個文件描述符惰聂,假如說有1000個疆偿,這個時候如果其中一個fd 處于就緒狀態(tài)了,那么當(dāng)前進(jìn)程需要線性輪詢所有的fd搓幌,也就是監(jiān)聽的fd越多杆故,性能開銷越大。
    • 同時溉愁,select在單個進(jìn)程中能打開的fd是有限制的处铛,默認(rèn)是1024,對于那些需要支持單機(jī)上萬的TCP連接來說確實有點少
  • epoll:linux還提供了epoll的系統(tǒng)調(diào)用,epoll是基于事件驅(qū)動方式來代替順序掃描撤蟆,因此性能相對來說更高奕塑,主要原理是,當(dāng)被監(jiān)聽的fd中枫疆,有fd就緒時爵川,會告知當(dāng)前進(jìn)程具體哪一個fd就緒,那么當(dāng)前進(jìn)程只需要去從指定的fd上讀取數(shù)據(jù)即可息楔,另外寝贡,epoll所能支持的fd上線是操作系統(tǒng)的最大文件句柄,這個數(shù)字要遠(yuǎn)遠(yuǎn)大于1024

【由于epoll能夠通過事件告知應(yīng)用進(jìn)程哪個fd是可讀的值依,所以我們也稱這種IO為異步非阻塞IO圃泡,當(dāng)然它是偽異步的,因為它還需要去把數(shù)據(jù)從內(nèi)核同步復(fù)制到用戶空間中愿险,真正的異步非阻塞颇蜡,應(yīng)該是數(shù)據(jù)已經(jīng)完全準(zhǔn)備好了,我只需要從用戶空間讀就行】

I/O多路復(fù)用的好處是可以通過把多個I/O的阻塞復(fù)用到同一個select的阻塞上辆亏,從而使得系統(tǒng)在單線程的情況下可以同時處理多個客戶端請求风秤。它的最大優(yōu)勢是系統(tǒng)開銷小,并且不需要創(chuàng)建新的進(jìn)程或者線程扮叨,降低了系統(tǒng)的資源開銷缤弦,它的整體實現(xiàn)思想如圖4-5所示。

客戶端請求到服務(wù)端后彻磁,此時客戶端在傳輸數(shù)據(jù)過程中碍沐,為了避免Server端在read客戶端數(shù)據(jù)過程中阻塞,服務(wù)端會把該請求注冊到Selector復(fù)路器上衷蜓,服務(wù)端此時不需要等待累提,只需要啟動一個線程,通過selector.select()阻塞輪詢復(fù)路器上就緒的channel即可磁浇,也就是說斋陪,如果某個客戶端連接數(shù)據(jù)傳輸完成,那么select()方法會返回就緒的channel扯夭,然后執(zhí)行相關(guān)的處理即可鳍贾。

image-20210708203509498

<center>圖4-5</center>

NIOServer的實現(xiàn)如下

測試訪問的時候,直接在cmd中通過telnet連接NIOServer交洗,便可發(fā)送信息骑科。

public class NIOServer implements Runnable{
    Selector selector;
    ServerSocketChannel serverSocketChannel;
    public NIOServer(int port) throws IOException {
        selector=Selector.open(); //多路復(fù)用器
        serverSocketChannel=ServerSocketChannel.open();
        //綁定監(jiān)聽端口
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);//非阻塞配置
        //針對serverSocketChannel注冊一個ACCEPT連接監(jiān)聽事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    }
    @Override
    public void run() {
        while(!Thread.interrupted()){
            try {
                selector.select(); //阻塞等待事件就緒
                Set selected=selector.selectedKeys(); //得到事件列表
                Iterator it=selected.iterator();
                while(it.hasNext()){
                    dispatch((SelectionKey) it.next()); //分發(fā)事件
                    it.remove(); //移除當(dāng)前時間
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    private void dispatch(SelectionKey key) throws IOException {
        if(key.isAcceptable()){ //如果是客戶端的連接事件,則需要針對該連接注冊讀寫事件
            register(key);
        }else if(key.isReadable()){
            read(key);
        }else if(key.isWritable()){
            write(key);
        }
    }
    private void register(SelectionKey key) throws IOException {
        //得到事件對應(yīng)的連接
        ServerSocketChannel server=(ServerSocketChannel)key.channel();
        SocketChannel channel=server.accept(); //獲得客戶端的鏈接
        channel.configureBlocking(false);
        //把當(dāng)前客戶端連接注冊到selector上构拳,注冊事件為READ咆爽,
        // 也就是當(dāng)前channel可讀時梁棠,就會觸發(fā)事件,然后讀取客戶端的數(shù)據(jù)
        channel.register(this.selector,SelectionKey.OP_READ);
    }

    private void read(SelectionKey key) throws IOException {
        SocketChannel channel=(SocketChannel)key.channel();
        ByteBuffer byteBuffer= ByteBuffer.allocate(1024);
        channel.read(byteBuffer); //把數(shù)據(jù)從channel讀取到緩沖區(qū)
        System.out.println("server receive msg:"+new String(byteBuffer.array()));
    }
    private void write(SelectionKey key) throws IOException {
        SocketChannel channel=(SocketChannel)key.channel();
        //寫一個信息給到客戶端
        channel.write(ByteBuffer.wrap("hello Client,I'm NIO Server\r\n".getBytes()));
    }

    public static void main(String[] args) throws IOException {
        NIOServer server=new NIOServer(8888);
        new Thread(server).start();
    }
}

事實上NIO已經(jīng)解決了上述BIO暴露的下面兩個問題:

  1. 同步阻塞IO斗埂,讀寫阻塞符糊,線程等待時間過長。
  2. 在制定線程策略的時候呛凶,只能根據(jù)CPU的數(shù)目來限定可用線程資源男娄,不能根據(jù)連接并發(fā)數(shù)目來制定,也就是連接有限制漾稀。否則很難保證對客戶端請求的高效和公平模闲。

到這里為止,通過NIO的多路復(fù)用機(jī)制崭捍,解決了IO阻塞導(dǎo)致客戶端連接處理受限的問題尸折,服務(wù)端只需要一個線程就可以維護(hù)多個客戶端,并且客戶端的某個連接如果準(zhǔn)備就緒時殷蛇,會通過事件機(jī)制告訴應(yīng)用程序某個channel可用实夹,應(yīng)用程序通過select方法選出就緒的channel進(jìn)行處理。

單線程Reactor 模型(高性能I/O設(shè)計模式)

了解了NIO多路復(fù)用后粒梦,就有必要再和大家說一下Reactor多路復(fù)用高性能I/O設(shè)計模式亮航,Reactor本質(zhì)上就是基于NIO多路復(fù)用機(jī)制提出的一個高性能IO設(shè)計模式,它的核心思想是把響應(yīng)IO事件和業(yè)務(wù)處理進(jìn)行分離匀们,通過一個或者多個線程來處理IO事件塞赂,然后將就緒得到事件分發(fā)到業(yè)務(wù)處理handlers線程去異步非阻塞處理,如圖4-6所示昼蛀。

Reactor模型有三個重要的組件:

  • Reactor :將I/O事件發(fā)派給對應(yīng)的Handler
  • Acceptor :處理客戶端連接請求
  • Handlers :執(zhí)行非阻塞讀/寫
image-20210708212057895

<center>圖4-6</center>

下面演示一個單線程的Reactor模型。

Reactor

Reactor 負(fù)責(zé)響應(yīng)IO事件圆存,一旦發(fā)生叼旋,廣播發(fā)送給相應(yīng)的Handler去處理。

public class Reactor implements Runnable{
    private final Selector selector;
    private final ServerSocketChannel serverSocketChannel;

    public Reactor(int port) throws IOException {
        //創(chuàng)建選擇器
        selector= Selector.open();
        //創(chuàng)建NIO-Server
        serverSocketChannel=ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);
        SelectionKey key=serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        // 綁定一個附加對象
        key.attach(new Acceptor(selector,serverSocketChannel));
    }

    @Override
    public void run() {
        while(!Thread.interrupted()){
            try {
                selector.select(); //阻塞等待就緒事件
                Set selectionKeys=selector.selectedKeys();
                Iterator it=selectionKeys.iterator();
                while(it.hasNext()){
                    dispatch((SelectionKey) it.next());
                    it.remove();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    public void dispatch(SelectionKey key){
        //調(diào)用之前注冊時附加的對象沦辙,也就是attach附加的acceptor
        Runnable r=(Runnable)key.attachment();
        if(r!=null){
            r.run();
        }
    }

    public static void main(String[] args) throws IOException {
        new Thread(new Reactor(8888)).start();
    }
}

Acceptor

public class Acceptor implements Runnable{
    private Selector selector;
    private ServerSocketChannel serverSocketChannel;

    public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
        this.selector = selector;
        this.serverSocketChannel = serverSocketChannel;
    }

    @Override
    public void run() {
        SocketChannel channel;
        try {
            channel=serverSocketChannel.accept();
            System.out.println(channel.getRemoteAddress()+": 收到一個客戶端連接");
            channel.configureBlocking(false);
            //當(dāng)channel連接中數(shù)據(jù)就緒時夫植,調(diào)用DispatchHandler來處理channel
            //巧妙使用了SocketChannel的attach功能,將Hanlder和可能會發(fā)生事件的channel鏈接在一起油讯,當(dāng)發(fā)生事件時详民,可以立即觸發(fā)相應(yīng)鏈接的Handler。
            channel.register(selector, SelectionKey.OP_READ,new DispatchHandler(channel));
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}

Handler

public class DispatchHandler implements Runnable{
    private SocketChannel channel;

    public DispatchHandler(SocketChannel channel) {
        this.channel = channel;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+"---handler"); //case: 打印當(dāng)前線程名稱陌兑,證明I/O是同一個線程來處理沈跨。
        ByteBuffer buffer=ByteBuffer.allocate(1024);
        int len=0,total=0;
        String msg="";
        try {
            do {
                len = channel.read(buffer);
                if (len > 0) {
                    total += len;
                    msg += new String(buffer.array());
                }
                buffer.clear();
            } while (len > buffer.capacity());
            System.out.println(channel.getRemoteAddress()+":Server Receive msg:"+msg);

        }catch (Exception e){
            e.printStackTrace();
            if(channel!=null){
                try {
                    channel.close();
                } catch (IOException ioException) {
                    ioException.printStackTrace();
                }
            }
        }

    }
}

演示方式,通過window的cmd窗口兔综,使用telnet 192.168.1.102 8888 連接到Server端進(jìn)行數(shù)據(jù)通信饿凛;也可以通過下面這樣一個客戶端程序來訪問狞玛。

ReactorClient

public class ReactorClient {

    private static Selector selector;
    public static void main(String[] args) throws IOException {
        selector=Selector.open();
        //創(chuàng)建一個連接通道連接指定的server
        SocketChannel socketChannel= SocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.connect(new InetSocketAddress("192.168.1.102",8888));
        socketChannel.register(selector, SelectionKey.OP_CONNECT);
        while(true){
            selector.select();
            Set<SelectionKey> selectionKeys=selector.selectedKeys();
            Iterator<SelectionKey> iterator=selectionKeys.iterator();
            while(iterator.hasNext()){
                SelectionKey key=iterator.next();
                iterator.remove();
                if(key.isConnectable()){
                    handleConnection(key);
                }else if(key.isReadable()){
                    handleRead(key);
                }
            }
        }
    }
    private static void handleConnection(SelectionKey key) throws IOException {
        SocketChannel socketChannel=(SocketChannel)key.channel();
        if(socketChannel.isConnectionPending()){
            socketChannel.finishConnect();
        }
        socketChannel.configureBlocking(false);
        while(true) {
            Scanner in = new Scanner(System.in);
            String msg = in.nextLine();
            socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
            socketChannel.register(selector,SelectionKey.OP_READ);
        }
    }
    private static void handleRead(SelectionKey key) throws IOException {
        SocketChannel channel=(SocketChannel)key.channel();
        ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
        channel.read(byteBuffer);
        System.out.println("client receive msg:"+new String(byteBuffer.array()));
    }
}

這是最基本的單Reactor單線程模型(整體的I/O操作是由同一個線程完成的)

其中Reactor線程涧窒,負(fù)責(zé)多路分離套接字心肪,有新連接到來觸發(fā)connect 事件之后,交由Acceptor進(jìn)行處理纠吴,有IO讀寫事件之后交給hanlder 處理硬鞍。

Acceptor主要任務(wù)就是構(gòu)建handler ,在獲取到和client相關(guān)的SocketChannel之后 戴已,綁定到相應(yīng)的hanlder上固该,對應(yīng)的SocketChannel有讀寫事件之后,基于racotor 分發(fā),hanlder就可以處理了(所有的IO事件都綁定到selector上恭陡,有Reactor分發(fā))

Reactor 模式本質(zhì)上指的是使用 I/O 多路復(fù)用(I/O multiplexing) + 非阻塞 I/O(non-blocking I/O)的模式蹬音。

多線程單Reactor模型

單線程Reactor這種實現(xiàn)方式有存在著缺點,從實例代碼中可以看出休玩,handler的執(zhí)行是串行的著淆,如果其中一個handler處理線程阻塞將導(dǎo)致其他的業(yè)務(wù)處理阻塞。由于handler和reactor在同一個線程中的執(zhí)行拴疤,這也將導(dǎo)致新的無法接收新的請求永部,我們做一個小實驗:

  • 在上述Reactor代碼的DispatchHandler的run方法中,增加一個Thread.sleep()呐矾。
  • 打開多個客戶端窗口連接到Reactor Server端苔埋,其中一個窗口發(fā)送一個信息后被阻塞,另外一個窗口再發(fā)信息時由于前面的請求阻塞導(dǎo)致后續(xù)請求無法被處理蜒犯。

為了解決這種問題组橄,有人提出使用多線程的方式來處理業(yè)務(wù)钳踊,也就是在業(yè)務(wù)處理的地方加入線程池異步處理糟趾,將reactor和handler在不同的線程來執(zhí)行企孩,如圖4-7所示桶唐。

image-20210709154534593

<center>圖4-7</center>

多線程改造-MultiDispatchHandler

我們直接將4.2.5小節(jié)中的Reactor單線程模型改成多線程舌界,其實我們就是把IO阻塞的問題通過異步的方式做了優(yōu)化呛伴,代碼如下忧换,

public class MultiDispatchHandler implements Runnable{
    private SocketChannel channel;

    public MultiDispatchHandler(SocketChannel channel) {
        this.channel = channel;
    }
    private static Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() << 1);

    @Override
    public void run() {
        processor();
    }
    private void processor(){
        executor.execute(new ReaderHandler(channel));
    }
    public static class ReaderHandler implements Runnable{
        private SocketChannel channel;

        public ReaderHandler(SocketChannel socketChannel) {
            this.channel = socketChannel;
        }
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+"---handler"); //case: 打印當(dāng)前線程名稱嫩码,證明I/O是同一個線程來處理潮改。
            ByteBuffer buffer= ByteBuffer.allocate(1024);
            int len=0;
            String msg="";
            try {
                do {
                    len = channel.read(buffer);
                    if (len > 0) {
                        msg += new String(buffer.array());
                    }
                    buffer.clear();
                } while (len > buffer.capacity());

                if(len>0) {
                    System.out.println(channel.getRemoteAddress() + ":Server Receive msg:" + msg);
                }
            }catch (Exception e){
                e.printStackTrace();
                if(channel!=null){
                    try {
                        channel.close();
                    } catch (IOException ioException) {
                        ioException.printStackTrace();
                    }
                }
            }
        }
    }
}

Acceptor

public class Acceptor implements Runnable{
    private Selector selector;
    private ServerSocketChannel serverSocketChannel;

    public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
        this.selector = selector;
        this.serverSocketChannel = serverSocketChannel;
    }

    @Override
    public void run() {
        SocketChannel channel;
        try {
            channel=serverSocketChannel.accept();
            System.out.println(channel.getRemoteAddress()+": 收到一個客戶端連接");
            channel.configureBlocking(false);
            //當(dāng)channel連接中數(shù)據(jù)就緒時狭郑,調(diào)用DispatchHandler來處理channel
            //巧妙使用了SocketChannel的attach功能,將Hanlder和可能會發(fā)生事件的channel鏈接在一起汇在,當(dāng)發(fā)生事件時翰萨,可以立即觸發(fā)相應(yīng)鏈接的Handler。
            channel.register(selector, SelectionKey.OP_READ,new MultiDispatchHandler(channel));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

多線程Reactor總結(jié)

在多線程Reactor模型中糕殉,添加了一個工作者線程池缨历,并將非I/O操作從Reactor線程中移出轉(zhuǎn)交給工作者線程池來執(zhí)行以蕴。這樣能夠提高Reactor線程的I/O響應(yīng),不至于因為一些耗時的業(yè)務(wù)邏輯而延遲對后面I/O請求的處理辛孵。

多Reactor多線程模式(主從多Reactor模型)

在多線程單Reactor模型中丛肮,我們發(fā)現(xiàn)所有的I/O操作是由一個Reactor來完成,而Reactor運(yùn)行在單個線程中魄缚,它需要處理包括Accept()/read()/write/connect操作宝与,對于小容量的場景,影響不大冶匹。但是對于高負(fù)載习劫、大并發(fā)或大數(shù)據(jù)量的應(yīng)用場景時,容易成為瓶頸嚼隘,主要原因如下:

  • 一個NIO線程同時處理成百上千的鏈路诽里,性能上無法支撐,即便NIO線程的CPU負(fù)荷達(dá)到100%飞蛹,也無法滿足海量消息的讀取和發(fā)送谤狡;
  • 當(dāng)NIO線程負(fù)載過重之后,處理速度將變慢卧檐,這會導(dǎo)致大量客戶端連接超時墓懂,超時之后往往會進(jìn)行重發(fā),這更加重了NIO線程的負(fù)載霉囚,最終會導(dǎo)致大量消息積壓和處理超時捕仔,成為系統(tǒng)的性能瓶頸;

所以盈罐,我們還可以更進(jìn)一步優(yōu)化榜跌,引入多Reactor多線程模式,如圖4-8所示盅粪,Main Reactor負(fù)責(zé)接收客戶端的連接請求斜做,然后把接收到的請求傳遞給SubReactor(其中subReactor可以有多個),具體的業(yè)務(wù)IO處理由SubReactor完成湾揽。

Multiple Reactors 模式通常也可以等同于 Master-Workers 模式,比如 Nginx 和 Memcached 等就是采用這種多線程模型笼吟,雖然不同的項目實現(xiàn)細(xì)節(jié)略有區(qū)別库物,但總體來說模式是一致的。

image-20210709162516832

<center>圖4-8</center>

  • Acceptor贷帮,請求接收者戚揭,在實踐時其職責(zé)類似服務(wù)器,并不真正負(fù)責(zé)連接請求的建立撵枢,而只將其請求委托 Main Reactor 線程池來實現(xiàn)民晒,起到一個轉(zhuǎn)發(fā)的作用精居。

  • Main Reactor,主 Reactor 線程組潜必,主要負(fù)責(zé)連接事件靴姿,并將IO讀寫請求轉(zhuǎn)發(fā)到 SubReactor 線程池

  • Sub Reactor磁滚,Main Reactor 通常監(jiān)聽客戶端連接后會將通道的讀寫轉(zhuǎn)發(fā)到 Sub Reactor 線程池中一個線程(負(fù)載均衡)佛吓,負(fù)責(zé)數(shù)據(jù)的讀寫。在 NIO 中 通常注冊通道的讀(OP_READ)垂攘、寫事件(OP_WRITE)维雇。

MultiplyReactor

public class MultiplyReactor {
    public static void main(String[] args) throws IOException {
        MultiplyReactor mr = new MultiplyReactor(8888);
        mr.start();
    }
    private static final int POOL_SIZE = Runtime.getRuntime().availableProcessors();
    // Reactor(Selector) 線程池,其中一個線程被 mainReactor 使用晒他,剩余線程都被 subReactor 使用
    static Executor mainReactorExecutor = Executors.newFixedThreadPool(POOL_SIZE);
    // 主 Reactor吱型,接收連接,把 SocketChannel 注冊到從 Reactor 上
    private Reactor mainReactor;
    private int port;

    public MultiplyReactor(int port) {
        try {
            this.port = port;
            mainReactor = new Reactor();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * 啟動主從 Reactor陨仅,初始化并注冊 Acceptor 到主 Reactor
     */
    public void start() throws IOException {
        new Acceptor(mainReactor.getSelector(), port); // 將 ServerSocketChannel 注冊到 mainReactor
        mainReactorExecutor.execute(mainReactor); //使用線程池來處理main Reactor的連接請求
    }
}

Reactor

public class Reactor implements Runnable{
    private ConcurrentLinkedQueue<AsyncHandler> events=new ConcurrentLinkedQueue<>();
    private final Selector selector;

    public Reactor() throws IOException {
        this.selector = Selector.open();
    }

    public Selector getSelector(){
        return selector;
    }
    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                AsyncHandler handler;
                while ((handler = events.poll()) != null) {
                    handler.getChannel().configureBlocking(false);
                    SelectionKey sk=handler.getChannel().register(selector, SelectionKey.OP_READ);
                    sk.attach(handler);
                    handler.setSk(sk);
                }
                selector.select(); //阻塞
                Set<SelectionKey> selectionKeys=selector.selectedKeys();
                Iterator<SelectionKey> it=selectionKeys.iterator();
                while(it.hasNext()){
                    SelectionKey key=it.next();
                    //獲取attach方法傳入的附加對象
                    Runnable runnable=(Runnable)key.attachment();
                    if(runnable!=null){
                        runnable.run();
                    }
                    it.remove();
                }
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    public void register(AsyncHandler asyncHandler){
        events.offer(asyncHandler);
        selector.wakeup();
    }
}

Acceptor

public class Acceptor implements Runnable{

    final Selector sel;
    final ServerSocketChannel serverSocket;
    int handleNext = 0;

    private final int POOL_SIZE=Runtime.getRuntime().availableProcessors();
    private Executor subReactorExecutor= Executors.newFixedThreadPool(POOL_SIZE);

    private Reactor[] subReactors=new Reactor[POOL_SIZE-1];

    public Acceptor(Selector sel, int port) throws IOException {
        this.sel = sel;
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port)); // 綁定端口
        // 設(shè)置成非阻塞模式
        serverSocket.configureBlocking(false);
        // 注冊到 選擇器 并設(shè)置處理 socket 連接事件
        serverSocket.register(sel, SelectionKey.OP_ACCEPT,this);
        init();
        System.out.println("mainReactor-" + "Acceptor: Listening on port: " + port);
    }
    public void init() throws IOException {
        for (int i = 0; i < subReactors.length; i++) {
            subReactors[i]=new Reactor();
            subReactorExecutor.execute(subReactors[i]);
        }
    }
    @Override
    public synchronized void run() {
        try {
            // 接收連接津滞,非阻塞模式下,沒有連接直接返回 null
            SocketChannel sc = serverSocket.accept();
            if (sc != null) {
                // 把提示發(fā)到界面
                sc.write(ByteBuffer.wrap("Multiply Reactor Pattern Example\r\nreactor> ".getBytes()));
                System.out.println(Thread.currentThread().getName()+":Main-Reactor-Acceptor: " + sc.socket().getLocalSocketAddress() +" 注冊到 subReactor-" + handleNext);
                // 如何解決呢掂名,直接調(diào)用 wakeup据沈,有可能還沒有注冊成功又阻塞了。這是一個多線程同步的問題饺蔑,可以借助隊列進(jìn)行處理
                Reactor subReactor = subReactors[handleNext];
                subReactor.register(new AsyncHandler(sc));
                if(++handleNext == subReactors.length) {
                    handleNext = 0;
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

AsyncHandler

public class AsyncHandler implements Runnable{
    private SocketChannel channel;
    private SelectionKey sk;

    ByteBuffer inputBuffer=ByteBuffer.allocate(1024);
    ByteBuffer outputBuffer=ByteBuffer.allocate(1024);
    StringBuilder builder=new StringBuilder(); //存儲客戶端的完整消息

    public AsyncHandler(SocketChannel channel){
        this.channel=channel;
    }

    public SocketChannel getChannel() {
        return channel;
    }

    public void setSk(SelectionKey sk) {
        this.sk = sk;
    }

    @Override
    public void run() {
        try {
            if (sk.isReadable()) {
                read();
            } else if (sk.isWritable()) {
                write();
            }
        }catch (Exception e){
            try {
                this.sk.channel().close();
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }
        }
    }

    protected void read() throws IOException {
        inputBuffer.clear();
        int n=channel.read(inputBuffer);
        if(inputBufferComplete(n)){
            System.out.println(Thread.currentThread().getName()+":Server端收到客戶端的請求消息:"+builder.toString());
            outputBuffer.put(builder.toString().getBytes(StandardCharsets.UTF_8));
            this.sk.interestOps(SelectionKey.OP_WRITE); //更改服務(wù)的邏輯狀態(tài)以及處理的事件類型
        }
    }

    private boolean inputBufferComplete(int bytes) throws EOFException {
        if(bytes>0){
            inputBuffer.flip(); //轉(zhuǎn)化成讀取模式
            while(inputBuffer.hasRemaining()){ //判斷緩沖區(qū)中是否還有元素
                byte ch=inputBuffer.get(); //得到輸入的字符
                if(ch==3){ //表示Ctrl+c 關(guān)閉連接
                    throw new EOFException();
                }else if(ch=='\r'||ch=='\n'){ //表示換行符
                    return true;
                }else{
                    builder.append((char)ch); //拼接讀取到的數(shù)據(jù)
                }
            }
        }else if(bytes==-1){
            throw new EOFException(); //客戶端關(guān)閉了連接
        }
        return false;
    }

    private void write() throws IOException {
        int written=-1;
        outputBuffer.flip(); //轉(zhuǎn)化為讀模式锌介,判斷是否有數(shù)據(jù)需要發(fā)送
        if(outputBuffer.hasRemaining()){
            written=channel.write(outputBuffer); //把數(shù)據(jù)寫回客戶端
        }
        outputBuffer.clear();
        builder.delete(0,builder.length());
        if(written<=0){ //表示客戶端沒有輸信息
            this.sk.channel().close();
        }else{
            channel.write(ByteBuffer.wrap("\r\nreactor>".getBytes()));
            this.sk.interestOps(SelectionKey.OP_READ);
        }
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市猾警,隨后出現(xiàn)的幾起案子孔祸,更是在濱河造成了極大的恐慌,老刑警劉巖发皿,帶你破解...
    沈念sama閱讀 218,386評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件崔慧,死亡現(xiàn)場離奇詭異,居然都是意外死亡穴墅,警方通過查閱死者的電腦和手機(jī)惶室,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,142評論 3 394
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來玄货,“玉大人皇钞,你說我怎么就攤上這事∷勺剑” “怎么了夹界?”我有些...
    開封第一講書人閱讀 164,704評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長隘世。 經(jīng)常有香客問我可柿,道長鸠踪,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,702評論 1 294
  • 正文 為了忘掉前任复斥,我火速辦了婚禮营密,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘永票。我一直安慰自己卵贱,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,716評論 6 392
  • 文/花漫 我一把揭開白布侣集。 她就那樣靜靜地躺著键俱,像睡著了一般。 火紅的嫁衣襯著肌膚如雪世分。 梳的紋絲不亂的頭發(fā)上编振,一...
    開封第一講書人閱讀 51,573評論 1 305
  • 那天,我揣著相機(jī)與錄音臭埋,去河邊找鬼踪央。 笑死,一個胖子當(dāng)著我的面吹牛瓢阴,可吹牛的內(nèi)容都是我干的畅蹂。 我是一名探鬼主播,決...
    沈念sama閱讀 40,314評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼荣恐,長吁一口氣:“原來是場噩夢啊……” “哼液斜!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起叠穆,我...
    開封第一講書人閱讀 39,230評論 0 276
  • 序言:老撾萬榮一對情侶失蹤少漆,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后硼被,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體示损,經(jīng)...
    沈念sama閱讀 45,680評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,873評論 3 336
  • 正文 我和宋清朗相戀三年嚷硫,在試婚紗的時候發(fā)現(xiàn)自己被綠了检访。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,991評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡仔掸,死狀恐怖脆贵,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情嘉汰,我是刑警寧澤,帶...
    沈念sama閱讀 35,706評論 5 346
  • 正文 年R本政府宣布状勤,位于F島的核電站鞋怀,受9級特大地震影響双泪,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜密似,卻給世界環(huán)境...
    茶點故事閱讀 41,329評論 3 330
  • 文/蒙蒙 一焙矛、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧残腌,春花似錦村斟、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,910評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至闺金,卻和暖如春逾滥,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背败匹。 一陣腳步聲響...
    開封第一講書人閱讀 33,038評論 1 270
  • 我被黑心中介騙來泰國打工寨昙, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人掀亩。 一個月前我還...
    沈念sama閱讀 48,158評論 3 370
  • 正文 我出身青樓舔哪,卻偏偏與公主長得像,于是被迫代替她去往敵國和親槽棍。 傳聞我的和親對象是個殘疾皇子捉蚤,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,941評論 2 355

推薦閱讀更多精彩內(nèi)容