Redis高性能原理探秘-IO模型

Redis的性能由哪些因素決定?

  • 內(nèi)存

    由于Redis是基于內(nèi)存的操作,因此內(nèi)存大小是決定其性能的一個重要因素髓考。

  • CPU

    CPU 是另一個重要的影響因素,由于是單線程模型汞贸,Redis 更喜歡大緩存快速 CPU绳军, 而不是多核。

  • 網(wǎng)絡(luò)通信

    網(wǎng)絡(luò)帶寬和延遲通常是最大短板矢腻。

網(wǎng)絡(luò)通信模型

最終目標: 增加客戶端的訪問連接數(shù)量

  • BIO(阻塞IO模型)
    • ServerSocket

    • Socket

    • 阻塞體現(xiàn)在兩個地方:

      連接阻塞

      IO阻塞

    • 使用場景

      zookeeper的leader選舉(3個節(jié)點门驾, 5個節(jié)點)
      nacos的注冊地址信息同步

  • NIO(非阻塞IO)

    把連接阻塞和IO阻塞改成非阻塞

Redis 為什么那么快

Redis的高性能主要依賴于幾個方面。

  • C語言實現(xiàn)多柑,C語言在一定程度上還是比Java語言性能要高一些奶是,因為C語言不需要經(jīng)過JVM進行翻譯。
  • 純內(nèi)存I/O竣灌,內(nèi)存I/O比磁盤I/O性能更快
  • I/O多路復(fù)用聂沙,基于epoll的I/O多路復(fù)用技術(shù),實現(xiàn)高吞吐網(wǎng)絡(luò)I/O
  • 單線程模型初嘹,單線程無法利用到多核CPU及汉,但是在Redis中,性能瓶頸并不是在計算上屯烦,而是在I/O
  • 能力坷随,所以單線程能夠滿足高并發(fā)的要求。 從另一個層面來說驻龟,單線程可以避免多線程的頻繁上
  • 下文切換以及同步鎖機制帶來的性能開銷温眉。
  • 下面我們分別從上述幾個方面進行展開說明,先來看網(wǎng)絡(luò)I/O的多路復(fù)用模型翁狐。

從請求處理開始分析

當我們在客戶端向Redis Server發(fā)送一條指令类溢,并且得到Redis回復(fù)的整個過程中,Redis做了什么呢露懒?

redis-req.png

要處理命令闯冷,則redis必須完整地接收客戶端的請求砂心,并將命令解析出來,再將結(jié)果讀出來蛇耀,通過網(wǎng)絡(luò)回
寫到客戶端计贰。整個工序分為以下幾個部分:

  • 接收,通過TCP接收到命令蒂窒,可能會歷經(jīng)多次TCP包、ack荞怒、IO操作
  • 解析洒琢,將命令取出來
  • 執(zhí)行,到對應(yīng)的地方將value讀出來
  • 返回褐桌,將value通過TCP返回給客戶端衰抑,如果value較大,則IO負荷會更重

其中解析執(zhí)行是純cpu/內(nèi)存操作荧嵌,而接收和返回主要是IO操作呛踊,首先我們先來看通信的過程。

網(wǎng)絡(luò)IO的通信原理

網(wǎng)絡(luò)通信.png

同樣啦撮,用一幅圖來描述網(wǎng)絡(luò)數(shù)據(jù)的傳輸流程

  • 首先谭网,對于TCP通信來說,每個TCP Socket的內(nèi)核中都有一個發(fā)送緩沖區(qū)和一個接收緩沖區(qū)接收緩沖區(qū)把數(shù)據(jù)緩存到內(nèi)核赃春,若應(yīng)用進程一直沒有調(diào)用Socket的read方法進行讀取愉择,那么該數(shù)據(jù)會一直被緩存在接收緩沖區(qū)內(nèi)。不管進程是否讀取Socket织中,對端發(fā)來的數(shù)據(jù)都會經(jīng)過內(nèi)核接收并緩存到Socket的內(nèi)核接收緩沖區(qū)锥涕。
  • read所要做的工作,就是把內(nèi)核接收緩沖區(qū)中的數(shù)據(jù)復(fù)制到應(yīng)用層用戶的Buffer里狭吼。
  • 進程調(diào)用Socket的send發(fā)送數(shù)據(jù)的時候层坠,一般情況下是將數(shù)據(jù)從應(yīng)用層用戶的Buffer里復(fù)制到Socket的內(nèi)核發(fā)送緩沖區(qū),然后send就會在上層返回刁笙。換句話說破花,send返回時,數(shù)據(jù)不一定會被發(fā)送到對端采盒。
  • 網(wǎng)卡中的緩沖區(qū)既不屬于內(nèi)核空間旧乞,也不屬于用戶空間。它屬于硬件緩沖磅氨,允許網(wǎng)卡與操作系統(tǒng)之間有
    個緩沖尺栖; 內(nèi)核緩沖區(qū)在內(nèi)核空間,在內(nèi)存中烦租,用于內(nèi)核程序延赌,做為讀自或?qū)懲布臄?shù)據(jù)緩沖區(qū)除盏; 用
    戶緩沖區(qū)在用戶空間,在內(nèi)存中挫以,用于用戶程序者蠕,做為讀自或?qū)懲布臄?shù)據(jù)緩沖區(qū)。
  • 網(wǎng)卡芯片收到網(wǎng)絡(luò)數(shù)據(jù)會以中斷的方式通知CPU掐松,我有數(shù)據(jù)了踱侣,存在我的硬件緩沖里了,來讀我啊大磺。
    CPU收到這個中斷信號后抡句,會調(diào)用相應(yīng)的驅(qū)動接口函數(shù)從網(wǎng)卡的硬件緩沖里把數(shù)據(jù)讀到內(nèi)核緩沖區(qū),正
    常情況下會向上傳遞給TCP/IP模塊一層一層的處理杠愧。

IO多路復(fù)用機制

Redis的通信采用的是多路復(fù)用機制待榔,什么是多路復(fù)用機制呢?

  • 由于Redis是C語言實現(xiàn)流济,為了方便理解锐锣,我們采用Java語言來描述這個過程。

在理解多路復(fù)用之前绳瘟,我們先來了解一下BIO雕憔。

BIO模型

在Java中,如果要實現(xiàn)網(wǎng)絡(luò)通信稽荧,我們會采用Socket套接字來完成橘茉。
Socket不是一個協(xié)議,而是一個通信模型姨丈。其實它最初是BSD發(fā)明的畅卓,主要用于一臺電腦的兩個進程間通信,后來把它用到了兩臺電腦的進程間通信蟋恬。所以翁潘,可以把它簡單理解為進程間通信,不是什么高級的東西歼争。主要做的事情不就是:

  • A發(fā)包:發(fā)請求包給某個已經(jīng)綁定的端口(所以我們經(jīng)常會訪問這樣的地址182.13.15.16:1235拜马,1235就是端口);收到B的允許沐绒;然后正式發(fā)送俩莽;發(fā)送完了,告訴B要斷開鏈接乔遮;收到斷開允許扮超,馬上斷開,然后發(fā)送已經(jīng)斷開信息給B。
  • B收包:綁定端口和IP出刷;然后在這個端口監(jiān)聽璧疗;接收到A的請求,發(fā)允許給A馁龟,并做好接收準備崩侠,主要就是清理緩存等待接收新數(shù)據(jù);然后正式接收坷檩;接收到斷開請求却音,允許斷開;確認斷開后矢炼,繼續(xù)監(jiān)聽其它請求僧家。

可見,Socket其實就是I/O操作裸删,Socket并不僅限于網(wǎng)絡(luò)通信,在網(wǎng)絡(luò)通信中阵赠,它涵蓋了網(wǎng)絡(luò)層涯塔、傳輸層、會話層清蚀、表示層匕荸、應(yīng)用層——其實這都不需要記,因為Socket通信時候用到了IP和端口枷邪,僅這兩個就表明了它用到了網(wǎng)絡(luò)層和傳輸層榛搔;而且它無視多臺電腦通信的系統(tǒng)差別,所以它涉及了表示層东揣;一般Socket都是基于一個應(yīng)用程序的践惑,所以會涉及到會話層和應(yīng)用層。

構(gòu)建基礎(chǔ)的BIO通信模型
@Slf4j
public class BIOServerSocket {
    public static final int PORT = 8080;
    public static void main(String[] args) {
        try(ServerSocket serverSocket = new ServerSocket(PORT)) {
            log.info("啟動服務(wù):監(jiān)聽端口:{}",PORT);
            //阻塞等待監(jiān)聽一個客戶端連接,返回的socket表示連接的客戶端信息
            Socket socket = serverSocket.accept();
            log.info("客戶端:{}連接成功",socket.getPort());
            //阻塞(InputStream是阻塞的)等待獲取客戶端請求報文
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String clientStr = bufferedReader.readLine();
            log.info("收到客戶端發(fā)送的消息:{}",clientStr);
            //構(gòu)建輸出流,寫回客戶端
            BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            bufferedWriter.write("server return message:" + clientStr + "\n");
            //清空緩沖區(qū)嘶卧,發(fā)送消息
            bufferedWriter.flush();
        } catch (IOException ioException) {
            ioException.printStackTrace();
        }
    }
}

測試效果:

bio-1.png

我們通過對BIOServerSocket進行改造尔觉,關(guān)注case1和case2部分。

  • case1: 增加了while循環(huán)芥吟,實現(xiàn)重復(fù)監(jiān)聽
  • case2: 當服務(wù)端收到客戶端的請求后侦铜,不直接返回,而是等待20s钟鸵。
@Slf4j
public class BIOServerSocket2 {
    public static final int PORT = 8080;
    public static void main(String[] args) {
        try(ServerSocket serverSocket = new ServerSocket(PORT)) {
            log.info("啟動服務(wù):監(jiān)聽端口:{}",PORT);
            //循環(huán)接收請求
            while (true){
                //阻塞等待監(jiān)聽一個客戶端連接,返回的socket表示連接的客戶端信息
                Socket socket = serverSocket.accept();
                log.info("客戶端:{}連接成功",socket.getPort());
                //阻塞(InputStream是阻塞的)等待獲取客戶端請求報文
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String clientStr = bufferedReader.readLine();
                log.info("收到客戶端發(fā)送的消息:{}",clientStr);
                //等待20秒
                Thread.sleep(20*1000);
                //構(gòu)建輸出流,寫回客戶端
                BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
                bufferedWriter.write("server return message:" + clientStr + "\n");
                //清空緩沖區(qū)钉稍,發(fā)送消息
                bufferedWriter.flush();
            }
        } catch (IOException ioException) {
            ioException.printStackTrace();
        } catch (InterruptedException interruptedException) {
            interruptedException.printStackTrace();
        }
    }
}

客戶端代碼:BIOClientSocket

@Slf4j
public class BIOClientSocket {
    public static final String HOST = "127.0.0.1";
    public static final int PORT = 8080;
    public static void main(String[] args) {
        try(Socket socket = new Socket(HOST,PORT)) {
            log.info("客戶端連接端口:{}:{}",HOST,PORT);
            //構(gòu)建輸出流,請求服務(wù)端
            BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            bufferedWriter.write("client1 msg: hello \n");
            //清空緩沖區(qū),發(fā)送消息
            bufferedWriter.flush();
            //阻塞(InputStream是阻塞的)等待獲取客戶端請求報文
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String clientStr = bufferedReader.readLine();
            log.info("收到服務(wù)端返回的消息:{}",clientStr);

        } catch (IOException ioException) {
            ioException.printStackTrace();
        }
    }
}

接著棺耍,把BIOClientSocket復(fù)制兩份(client1贡未、client2),同時向BIOServerSocket發(fā)起請求。

現(xiàn)象: client1先發(fā)送請求到Server端羞秤,由于Server端等待20s才返回缸托,導(dǎo)致client2的請求一直被阻塞。

bio-2.png

這個情況會導(dǎo)致一個問題瘾蛋,如果服務(wù)端在同一個時刻只能處理一個客戶端的連接俐镐,而如果一個網(wǎng)站同時有1000個用戶訪問,那么剩下的999個用戶都需要等待哺哼,而這個等待的耗時取決于前面的請求的處理時長佩抹,如圖所示。

bio-req.png
基于多線程優(yōu)化BIO

為了讓服務(wù)端能夠同時處理更多的客戶端連接取董,避免因為某個客戶端連接阻塞導(dǎo)致后續(xù)請求被阻塞棍苹,于是引入多線程技術(shù),代碼如下茵汰。

BIOServerSocketWithThread

@Slf4j
public class BIOServerSocketWithThread {
    public static TaskExecutor taskExecutor = new TaskExecutor() {
        ExecutorService executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(),
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(), Executors.defaultThreadFactory());
        @Override
        public void execute(Runnable task) {
            executorService.execute(task);
        }
    };
    public static final int PORT = 8080;
    public static void main(String[] args) {
        try(ServerSocket serverSocket = new ServerSocket(PORT)) {
            log.info("啟動服務(wù):監(jiān)聽端口:{}",PORT);
            //循環(huán)接收請求
            while (true){
                //阻塞等待監(jiān)聽一個客戶端連接,返回的socket表示連接的客戶端信息
                Socket socket = serverSocket.accept();
                log.info("客戶端:{}連接成功",socket.getPort());
                // I/O異步執(zhí)行
                taskExecutor.execute(new SocketThread(socket));
            }
        } catch (IOException ioException) {
            ioException.printStackTrace();
        }
    }
}

SocketThread

@Slf4j
public class SocketThread implements Runnable{
    private Socket socket;

    public SocketThread(Socket socket) {
        this.socket = socket;
    }
    @Override
    public void run() {
        try {
            //阻塞(InputStream是阻塞的)等待獲取客戶端請求報文
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String clientStr = bufferedReader.readLine();
            log.info("收到客戶端發(fā)送的消息:{}",clientStr);
            //等待20秒
            Thread.sleep(20*1000);
            //構(gòu)建輸出流,寫回客戶端
            BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            bufferedWriter.write("server return message:" + clientStr + "\n");
            //清空緩沖區(qū)枢里,發(fā)送消息
            bufferedWriter.flush();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            //TODO 關(guān)閉IO流
        }
    }
}

結(jié)果:

bio-3.png

當引入了多線程之后,每個客戶端的鏈接(Socket)蹂午,我們可以直接給到線程池去執(zhí)行栏豺,而由于這個過程是異步的,所以并不會同步阻塞影響后續(xù)鏈接的監(jiān)聽豆胸,因此在一定程度上可以提升服務(wù)端鏈接的處理數(shù)量奥洼。

bi-req-2.png
NIO非阻塞IO
  • 使用多線程的方式來解決這個問題,仍然有一個缺點晚胡,線程的數(shù)量取決于硬件配置灵奖,所以線程數(shù)量是有限的,如果請求量比較大的時候估盘,線程本身會收到限制從而并發(fā)量也不會太高瓷患。那怎么辦呢,我們可以采用非阻塞IO遣妥。

  • NIO 從JDK1.4 提出的尉尾,本意是New IO,它的出現(xiàn)為了彌補原本IO的不足燥透,提供了更高效的方式沙咏,提出一個通道(channel)的概念,在IO中它始終以流的形式對數(shù)據(jù)的傳輸和接收班套,下面我們演示一下NIO的使用肢藐。

    NIOServerSocket

    @Slf4j
    public class NIOServerSocket {
        public static final int PORT = 8080;
        public static void main(String[] args) {
            try {
                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                //設(shè)置連接非阻塞
                serverSocketChannel.configureBlocking(false);
                serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
                while(true){
                    //獲得一個客戶端連接,非阻塞
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    if(socketChannel != null){
                        log.info("客戶端連接:{}",socketChannel.getRemoteAddress());
                        ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
                        socketChannel.read(byteBuffer);
                        log.info("Server Receive Msg:"+new String(byteBuffer.array()));
                        Thread.sleep(10000);
                        //反轉(zhuǎn)
                        byteBuffer.flip();
                        socketChannel.write(byteBuffer);
                    }else{
                        Thread.sleep(1000);
                        log.info("客戶端未連接");
                    }
                }
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    NIOClientSocket

    @Slf4j
    public class NIOClientSocket {
        public static final String HOST = "127.0.0.1";
        public static final int PORT = 8080;
        public static void main(String[] args) {
            try(SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(HOST,PORT))) {
                socketChannel.configureBlocking(false);
                log.info("客戶端連接端口:{}:{}",HOST,PORT);
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                String hello = "NIOClient msg: hello \n";
                byteBuffer.put(hello.getBytes());
                byteBuffer.flip();
                socketChannel.write(byteBuffer);
                while(true) {
                    byteBuffer.clear();
                    int i = socketChannel.read(byteBuffer);
                    if (i > 0) {
                        log.info("收到服務(wù)端的數(shù)據(jù):{}",new String(byteBuffer.array()));
                    } else {
                        log.info("服務(wù)端數(shù)據(jù)未準備好");
                        Thread.sleep(1000);
                    }
                }
    
            } catch (IOException | InterruptedException ioException) {
                ioException.printStackTrace();
            }
        }
    }
    

    演示效果:

    服務(wù)端

nio-1.png

客戶端

nio-2.png
  • 所謂的NIO(非阻塞IO),其實就是取消了IO阻塞和連接阻塞吱韭,當服務(wù)端不存在阻塞的時候吆豹,就可以不斷輪詢處理客戶端的請求鱼的,如圖所示,表示NIO下的運行流程痘煤。
nio-loop.png

上述這種NIO的使用方式凑阶,仍然存在一個問題,就是客戶端或者服務(wù)端需要通過一個線程不斷輪詢才能
獲得結(jié)果衷快,而這個輪詢過程中會浪費線程資源宙橱。

多路復(fù)用IO
  • 大家站在全局的角度再思考一下整個過程,有哪些地方可以優(yōu)化呢蘸拔?

  • 我們回到NIOClientSocket中下面這段代碼师郑,當客戶端通過 read 方法去讀取服務(wù)端返回的數(shù)據(jù)時,如果此時服務(wù)端數(shù)據(jù)未準備好调窍,對于客戶端來說就是一次無效的輪詢宝冕。

    while(true) {
                    byteBuffer.clear();
                    int i = socketChannel.read(byteBuffer);
                    if (i > 0) {
                        log.info("收到服務(wù)端的數(shù)據(jù):{}",new String(byteBuffer.array()));
                    } else {
                        log.info("服務(wù)端數(shù)據(jù)未準備好");
                        Thread.sleep(1000);
                    }
                }
    
  • 我們能不能夠設(shè)計成,當客戶端調(diào)用 read 方法之后邓萨,不僅僅不阻塞地梨,同時也不需要輪詢。而是等到服務(wù)端的數(shù)據(jù)就緒之后缔恳, 告訴客戶端湿刽。然后客戶端再去讀取服務(wù)端返回的數(shù)據(jù)呢?所以為了優(yōu)化這個問題褐耳,引入了多路復(fù)用機制。

  • I/O多路復(fù)用的本質(zhì)是通過一種機制(系統(tǒng)內(nèi)核緩沖I/O數(shù)據(jù))渴庆,讓單個進程可以監(jiān)視多個文件描述符铃芦,一旦某個描述符就緒(一般是讀就緒或?qū)懢途w),能夠通知程序進行相應(yīng)的讀寫操作襟雷。

    什么是文件描述符(fd):在linux中刃滓,內(nèi)核把所有的外部設(shè)備都當成是一個文件來操作,對一個文件的讀寫會調(diào)用內(nèi)核提供的系統(tǒng)命令耸弄,返回一個fd(文件描述符)咧虎。而對于一個socket的讀寫也會有相應(yīng)的文件描述符,成為socketfd计呈。

  • 常見的IO多路復(fù)用方式有【select砰诵、poll、epoll】捌显,都是Linux API提供的IO復(fù)用方式茁彭,那么接下來重點講一下select、和epoll這兩個模型扶歪。

    • select:進程可以通過把一個或者多個fd傳遞給select系統(tǒng)調(diào)用理肺,進程會阻塞在select操作上,這樣select可以幫我們檢測多個fd是否處于就緒狀態(tài),這個模式有兩個缺點

      • 由于他能夠同時監(jiān)聽多個文件描述符妹萨,假如說有1000個年枕,這個時候如果其中一個fd 處于就緒狀態(tài)了,那么當前進程需要線性輪詢所有的fd乎完,也就是監(jiān)聽的fd越多熏兄,性能開銷越大。

      • 同時囱怕,select在單個進程中能打開的fd是有限制的霍弹,默認是1024,對于那些需要支持單機上萬的TCP連接來說確實有點少娃弓。

    • epoll:linux還提供了epoll的系統(tǒng)調(diào)用典格,epoll是基于事件驅(qū)動方式來代替順序掃描,因此性能相對來說更高台丛,主要原理是耍缴,當被監(jiān)聽的fd中,有fd就緒時挽霉,會告知當前進程具體哪一個fd就緒防嗡,那么當前進程只需要去從指定的fd上讀取數(shù)據(jù)即可,另外侠坎,epoll所能支持的fd上線是操作系統(tǒng)的最大文件句柄蚁趁,這個數(shù)字要遠遠大于1024。

      由于epoll能夠通過事件告知應(yīng)用進程哪個fd是可讀的实胸,所以我們也稱這種IO為異步非阻塞IO他嫡,當然它是偽異步的,因為它還需要去把數(shù)據(jù)從內(nèi)核同步復(fù)制到用戶空間中庐完,真正的異步非阻塞钢属,應(yīng)該是數(shù)據(jù)已經(jīng)完全準備好了,我只需要從用戶空間讀就行门躯。

  • I/O多路復(fù)用的好處是可以通過把多個I/O的阻塞復(fù)用到同一個select的阻塞上淆党,從而使得系統(tǒng)在單線程的情況下可以同時處理多個客戶端請求。它的最大優(yōu)勢是系統(tǒng)開銷小讶凉,并且不需要創(chuàng)建新的進程或者線程染乌,降低了系統(tǒng)的資源開銷,它的整體實現(xiàn)思想如圖所示

多路復(fù)用.png
  • 客戶端請求到服務(wù)端后懂讯,此時客戶端在傳輸數(shù)據(jù)過程中慕匠,為了避免Server端在read客戶端數(shù)據(jù)過程中阻塞,服務(wù)端會把該請求注冊到Selector復(fù)路器上域醇,服務(wù)端此時不需要等待台谊,只需要啟動一個線程蓉媳,通過selector.select()阻塞輪詢復(fù)路器上就緒的channel即可,也就是說锅铅,如果某個客戶端連接數(shù)據(jù)傳輸完成酪呻,那么select()方法會返回就緒的channel,然后執(zhí)行相關(guān)的處理即可盐须。代碼如下:

    @Slf4j
    public class NIOSelectorServerSocket implements Runnable{
        public static final int PORT = 8080;
        Selector selector;
        ServerSocketChannel serverSocketChannel;
    
        public NIOSelectorServerSocket(int port) throws IOException {
            selector = Selector.open();
            serverSocketChannel=ServerSocketChannel.open();
            //如果采用selector模型玩荠,必須要設(shè)置非阻塞
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        }
    
        @Override
        public void run() {
            while(!Thread.interrupted()){
                try {
                    //阻塞等待事件就緒
                    selector.select();
                    //事件列表
                    Set selected = selector.selectedKeys();
                    Iterator it = selected.iterator();
                    while(it.hasNext()){
                        //說明有連接進來
                        dispatch((SelectionKey) it.next());
                        //移除當前就緒的事件
                        it.remove();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        private void dispatch(SelectionKey key) throws IOException {
            //是連接事件?
            if(key.isAcceptable()){
                log.info("客戶端注冊事件:{}",key);
                register(key);
            }else if(key.isReadable()){
                log.info("讀事件:{}",key);
                //讀事件
                read(key);
            }else if(key.isWritable()){
                log.info("寫事件:{}",key);
                //寫事件
                write(key);
            }
        }
        private void register(SelectionKey key) throws IOException {
            //客戶端連接
            ServerSocketChannel channel= (ServerSocketChannel) key.channel();
            //獲得客戶端連接
            SocketChannel socketChannel = channel.accept();
            socketChannel.configureBlocking(false);
            socketChannel.register(selector,SelectionKey.OP_READ);
        }
        private void read(SelectionKey key) throws IOException {
            //得到的是socketChannel
            SocketChannel channel= (SocketChannel) key.channel();
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            channel.read(byteBuffer);
            log.info("Server Receive Msg:{}",new String(byteBuffer.array()));
            //反轉(zhuǎn)
            byteBuffer.flip();
            channel.write(byteBuffer);
        }
        private void write(SelectionKey key) throws IOException {
            //得到的是socketChannel
         
        }
    
        public static void main(String[] args) throws IOException {
            NIOSelectorServerSocket selectorServerSocket=new NIOSelectorServerSocket(PORT);
            new Thread(selectorServerSocket).start();
        }
    }
    
  • 事實上NIO已經(jīng)解決了上述BIO暴露的下面兩個問題:

    • 同步阻塞IO贼邓,讀寫阻塞阶冈,線程等待時間過長。
    • 在制定線程策略的時候塑径,只能根據(jù)CPU的數(shù)目來限定可用線程資源女坑,不能根據(jù)連接并發(fā)數(shù)目來制定,也就是連接有限制统舀。否則很難保證對客戶端請求的高效和公平匆骗。
  • 到這里為止,通過NIO的多路復(fù)用機制誉简,解決了IO阻塞導(dǎo)致客戶端連接處理受限的問題碉就,服務(wù)端只需要一個線程就可以維護多個客戶端,并且客戶端的某個連接如果準備就緒時闷串,會通過事件機制告訴應(yīng)用程序某個channel可用瓮钥,應(yīng)用程序通過select方法選出就緒的channel進行處理。

單線程Reactor 模型(高性能I/O設(shè)計模式)

  • 了解了NIO多路復(fù)用后烹吵,就有必要再和大家說一下Reactor多路復(fù)用高性能I/O設(shè)計模式碉熄,Reactor本質(zhì)上就是基于NIO多路復(fù)用機制提出的一個高性能IO設(shè)計模式,它的核心思想是把響應(yīng)IO事件和業(yè)務(wù)處理進行分離年叮,通過一個或者多個線程來處理IO事件,然后將就緒得到事件分發(fā)到業(yè)務(wù)處理handlers線程去異步非阻塞處理玻募,如圖所示只损。

    reactor-1.png

Reactor模型有三個重要的組件:

  • Reactor :將I/O事件發(fā)派給對應(yīng)的Handler

  • Acceptor :處理客戶端連接請求

  • Handlers :執(zhí)行非阻塞讀/寫

  • 一個單線程的Reactor模型,代碼如下:

    ReactorMain

    public class ReactorMain {
        public static final int PORT = 8080;
        public static void main(String[] args) throws IOException {
            new Thread(new Reactor(PORT),"Main-Thread").start();
        }
    }
    

    Reactor

    @Slf4j
    public class Reactor implements Runnable{
        private final Selector selector;
        private final ServerSocketChannel serverSocketChannel;
    
        public Reactor(int port) throws IOException {
            selector=Selector.open();
            serverSocketChannel= ServerSocketChannel.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT,new Acceptor(selector,serverSocketChannel));
        }
    
        @Override
        public void run() {
            while(!Thread.interrupted()){
                try {
                    selector.select();
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while(iterator.hasNext()){
                        dispatch(iterator.next());
                        iterator.remove();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        private void dispatch(SelectionKey key){
            //可能拿到的對象有兩個
            // Acceptor
            // Handler
            Runnable runnable = (Runnable)key.attachment();
            if(runnable != null){
                runnable.run();
            }
        }
    }
    
    

    Acceptor

    @Slf4j
    public class Acceptor implements Runnable{
    
        private final Selector selector;
        private final ServerSocketChannel serverSocketChannel;
    
        public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
            this.selector = selector;
            this.serverSocketChannel = serverSocketChannel;
        }
    
        @Override
        public void run() {
            SocketChannel channel;
            try {
                //得到一個客戶端連接
                channel = serverSocketChannel.accept();
                log.info("{}:收到一個客戶端連接",channel.getRemoteAddress());
                channel.configureBlocking(false);
                channel.register(selector, SelectionKey.OP_READ,new Handler(channel));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    Handler

    @Slf4j
    public class Handler implements Runnable{
        SocketChannel channe;
    
        public Handler(SocketChannel channe) {
            this.channe = channe;
        }
    
        @Override
        public void run() {
            log.info("{}------",Thread.currentThread().getName());
            ByteBuffer buffer=ByteBuffer.allocate(1024);
            /*try {
                Thread.sleep(1000000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }*/
            int len=0,total=0;
            String msg="";
            try {
                do {
                    len = channe.read(buffer);
                    if(len>0){
                        total+=len;
                        msg+=new String(buffer.array());
                    }
                } while (len > buffer.capacity());
                log.info("total:{}",total);
    
                //msg=表示通信傳輸報文
                //耗時2s
                //登錄: username:password
                //ServetRequets: 請求信息
                //數(shù)據(jù)庫的判斷
                //返回數(shù)據(jù)七咧,通過channel寫回到客戶端
    
                log.info("{}: Server receive Msg:{}",channe.getRemoteAddress(),msg);
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                if(channe!=null){
                    try {
                        channe.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

    代碼是最基本的單Reactor單線程模型(整體的I/O操作是由同一個線程完成的)跃惫。

    其中Reactor線程,負責(zé)多路分離套接字艾栋,有新連接到來觸發(fā)connect 事件之后爆存,交由Acceptor進行處理,有IO讀寫事件之后交給hanlder 處理蝗砾。
    Acceptor主要任務(wù)就是構(gòu)建handler 先较,在獲取到和client相關(guān)的SocketChannel之后 携冤,綁定到相應(yīng)的hanlder上隐孽,對應(yīng)的SocketChannel有讀寫事件之后耐薯,基于racotor 分發(fā),hanlder就可以處理了(所有的IO事件都綁定到selector上攒发,由Reactor分發(fā))手幢。

    Reactor 模式本質(zhì)上指的是使用 I/O 多路復(fù)用(I/O multiplexing) + 非阻塞 I/O(nonblocking I/O) 的模式琳钉。

多線程單Reactor模型

  • 單線程Reactor這種實現(xiàn)方式有存在著缺點籍铁,從實例代碼中可以看出冕碟,handler的執(zhí)行是串行的滚粟,如果其中一個handler處理線程阻塞將導(dǎo)致其他的業(yè)務(wù)處理阻塞癌幕。由于handler和reactor在同一個線程中的執(zhí)行衙耕,這也將導(dǎo)致新的無法接收新的請求,我們做一個小實驗:

    • 在上述Reactor代碼的DispatchHandler的run方法中勺远,增加一個Thread.sleep()橙喘。
    • 打開多個客戶端窗口連接到Reactor Server端,其中一個窗口發(fā)送一個信息后被阻塞谚中,另外一個窗口再發(fā)信息時由于前面的請求阻塞導(dǎo)致后續(xù)請求無法被處理渴杆。
  • 為了解決這種問題,有人提出使用多線程的方式來處理業(yè)務(wù)宪塔,也就是在業(yè)務(wù)處理的地方加入線程池異步處理磁奖,將reactor和handler在不同的線程來執(zhí)行,如圖所示某筐。

reactor-2.png

多線程改造-MultiDispatchHandler比搭,我們直接將前面的Reactor單線程模型改成多線程,其實我們就是把IO阻塞的問題通過異步的方式做了優(yōu)化南誊,源碼如下:

@Slf4j
public class MutilDispatchHandler implements Runnable{

    SocketChannel channel;

    private Executor executor= Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    public MutilDispatchHandler(SocketChannel channel) {
        this.channel = channel;
    }

    @Override
    public void run() {
        processor();
    }
    private void processor(){
        executor.execute(new ReaderHandler(channel));
    }
    static class ReaderHandler implements Runnable{
        private SocketChannel channel;

        public ReaderHandler(SocketChannel channel) {
            this.channel = channel;
        }

        @Override
        public void run() {
            log.info("{}------",Thread.currentThread().getName());
            ByteBuffer buffer=ByteBuffer.allocate(1024);
            try {
                Thread.sleep(100000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            int len=0,total=0;
            String msg="";
            try {
                do {
                    len = channel.read(buffer);
                    if(len>0){
                        total+=len;
                        msg+=new String(buffer.array());
                    }
                } while (len > buffer.capacity());
                System.out.println("total:"+total);

                //msg=表示通信傳輸報文
                //耗時2s
                //登錄: username:password
                //ServetRequets: 請求信息
                //數(shù)據(jù)庫的判斷
                //返回數(shù)據(jù)身诺,通過channel寫回到客戶端
                log.info("{}: Server receive Msg:{}",channel.getRemoteAddress(),msg);
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                if(channel!=null){
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}
  • 在多線程Reactor模型中,添加了一個工作者線程池抄囚,并將非I/O操作從Reactor線程中移出轉(zhuǎn)交給工作者線程池來執(zhí)行霉赡。這樣能夠提高Reactor線程的I/O響應(yīng),不至于因為一些耗時的業(yè)務(wù)邏輯而延遲對后面I/O請求的處理幔托。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末穴亏,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子重挑,更是在濱河造成了極大的恐慌嗓化,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件谬哀,死亡現(xiàn)場離奇詭異刺覆,居然都是意外死亡,警方通過查閱死者的電腦和手機史煎,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進店門谦屑,熙熙樓的掌柜王于貴愁眉苦臉地迎上來驳糯,“玉大人,你說我怎么就攤上這事伦仍〗峋剑” “怎么了?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵充蓝,是天一觀的道長隧枫。 經(jīng)常有香客問我,道長谓苟,這世上最難降的妖魔是什么官脓? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮涝焙,結(jié)果婚禮上卑笨,老公的妹妹穿的比我還像新娘。我一直安慰自己仑撞,他們只是感情好赤兴,可當我...
    茶點故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著隧哮,像睡著了一般桶良。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上沮翔,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天陨帆,我揣著相機與錄音,去河邊找鬼采蚀。 笑死疲牵,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的榆鼠。 我是一名探鬼主播纲爸,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼妆够!你這毒婦竟也來了识啦?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤责静,失蹤者是張志新(化名)和其女友劉穎袁滥,沒想到半個月后盖桥,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體灾螃,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年揩徊,在試婚紗的時候發(fā)現(xiàn)自己被綠了腰鬼。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片嵌赠。...
    茶點故事閱讀 39,690評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖熄赡,靈堂內(nèi)的尸體忽然破棺而出姜挺,到底是詐尸還是另有隱情,我是刑警寧澤彼硫,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布炊豪,位于F島的核電站,受9級特大地震影響拧篮,放射性物質(zhì)發(fā)生泄漏词渤。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一串绩、第九天 我趴在偏房一處隱蔽的房頂上張望缺虐。 院中可真熱鬧,春花似錦礁凡、人聲如沸高氮。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽剪芍。三九已至,卻和暖如春韧掩,著一層夾襖步出監(jiān)牢的瞬間紊浩,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工疗锐, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留坊谁,地道東北人。 一個月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓滑臊,卻偏偏與公主長得像口芍,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子雇卷,可洞房花燭夜當晚...
    茶點故事閱讀 44,577評論 2 353

推薦閱讀更多精彩內(nèi)容