netty極簡教程(五):Netty的Reactor模型演進(jìn)及JDK nio聊天室實(shí)現(xiàn)

介紹了jdk實(shí)現(xiàn)nio的關(guān)鍵Selector以及SelectableChannel罐盔,了解了它的原理耗跛,就明白了netty為什么是事件驅(qū)動模型:(netty極簡教程(四):Selector事件驅(qū)動以及SocketChannel
的使用
斤儿,接下來將它的使用更深入一步裹虫, nio reactor模型演進(jìn)以及聊天室的實(shí)現(xiàn)飞盆;


示例源碼: https://github.com/jsbintask22/netty-learning

nio server

對于io消耗而言娄琉,我們知道提升效率的關(guān)鍵在于服務(wù)端對于io的使用;而nio壓榨cpu的關(guān)鍵在于使用Selector實(shí)現(xiàn)的reactor事件模型以及多線程的加入時(shí)機(jī):

單線程reactor模型

image

省略Selector以及ServerSocketChannel的獲取注冊吓歇; 將所有的操作至于reactor主線程

 while (true) {   // 1
    if (selector.select(1000) == 0) {   // 2
        continue;
    }

    Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();    // 3
    while (selectedKeys.hasNext()) {
        SelectionKey selectionKey = selectedKeys.next();
        SelectableChannel channel = selectionKey.channel();

        if (selectionKey.isAcceptable()) {    // 4
            ServerSocketChannel server = (ServerSocketChannel) channel;
            SocketChannel client = server.accept();
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(CLIENT_BUFFER_SIZE));
            String serverGlobalInfo = "系統(tǒng)消息:用戶[" + client.getRemoteAddress() + "]上線了";
            System.err.println(serverGlobalInfo);

            forwardClientMsg(serverGlobalInfo, client);   //  5
        } else if (selectionKey.isReadable()) {

                SocketChannel client = (SocketChannel) channel;
                SocketAddress remoteAddress = null;
                try {
                    remoteAddress = client.getRemoteAddress();
                    String clientMsg = retrieveClientMsg(selectionKey);
                    if (clientMsg.equals("")) {
                        return;
                    }
                    System.err.println("收到用戶[" + remoteAddress + "]消息:" + clientMsg);

                    forwardClientMsg("[" + remoteAddress + "]:" + clientMsg, client);   // 6
                } catch (Exception e) {
                    String msg = "系統(tǒng)消息:" + remoteAddress + "下線了";
                    forwardClientMsg(msg, client);            
                    System.err.println(msg);
                    selectionKey.cancel();    // 7
                    try {
                        client.close();
                    } catch (IOException ex) {
                        ex.printStackTrace();
                    }
                }
        }

        selectedKeys.remove();
    }
}
  1. 開啟一個(gè)while循環(huán)孽水,讓Selector不斷的詢問操作系統(tǒng)是否有對應(yīng)的事件已經(jīng)準(zhǔn)備好
  2. Selector檢查事件(等待時(shí)間為1s),如果沒有直接開啟下一次循環(huán)
  3. 獲取已經(jīng)準(zhǔn)備好的事件(SelectionKey)城看,然后依次循環(huán)遍歷處理
  4. 如果是Accept事件女气,說明是ServerSocketChannel注冊的,說明新的連接已經(jīng)建立好了测柠,從中獲取新的連接并將新連接再次注冊到Selector
  5. 注冊后炼鞠,然后生成消息給其它Socket,表示有新用戶上線了
  6. 如果是Read事件轰胁,說明客戶端Socket有新的數(shù)據(jù)可讀取谒主,讀取然后廣播該消息到其它所有客戶端
  7. 如果發(fā)生異常,表示該客戶端斷開連接了(粗略的處理)赃阀,同樣廣播一條消息霎肯,并且將該Socket從Selector上注銷

讀取以及廣播消息方法如下:

SocketChannel client = (SocketChannel) selectionKey.channel();
        ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
        int len = client.read(buffer);
        if (len == 0) {
            return "";
        }
        buffer.flip();
        byte[] data = new byte[buffer.remaining()];
        int index = 0;
        while (len != index) {
            data[index++] = buffer.get();
        }
        buffer.clear();
        return new String(data, StandardCharsets.UTF_8);

Set<SelectionKey> allClient = selector.keys();
allClient.forEach(selectionKey -> {
    SelectableChannel channel = selectionKey.channel();
    if (!(channel instanceof ServerSocketChannel) && channel != client) {  // 1
        SocketChannel otherClient = (SocketChannel) channel;
        try {
            otherClient.write(ByteBuffer.wrap(clientMsg.getBytes(StandardCharsets.UTF_8)));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
});

從Selector上獲取所有注冊的Channel然后遍歷,如果不是ServerSocketChannel或者當(dāng)前消息的Channel榛斯,就將消息發(fā)送出去.


以上观游,所有代碼放在同一線程中,對于單核cpu而言肖抱,相比于bio的Socket編程备典,我們主要有一個(gè)方面的改進(jìn)

  • 雖然accept方法依然是阻塞的,可是我們已經(jīng)知道了肯定會有新的連接進(jìn)來意述,所以調(diào)用改方法不會再阻塞而是直接獲取一個(gè)新連接
  • 對于read方法而言同樣如此提佣,雖然該方法依然是一個(gè)阻塞的方法,可是我們已經(jīng)知道了接下來調(diào)用必定會有有效數(shù)據(jù)荤崇,這樣cpu不用再進(jìn)行等待
  • 通過Selector在一個(gè)線程中便管理了多個(gè)Channel

而對于多核cpu而言拌屏,Selector雖然能夠有效規(guī)避accept和read的無用等待時(shí)間,可是它依然存在一些問題术荤;

  1. 上面的操作關(guān)鍵在于Selector的select操作倚喂,該方法必須能夠快速循環(huán)調(diào)用,不宜和其它io讀取寫入放在一起
  2. channel的io(read和write)操作較為耗時(shí),不宜放到同一線程中處理

多線程reactor模型

Reactor多線程模型

基于上面的單線程問題考慮端圈,我們可以將io操作放入線程池中處理:

  1. 將accept事件的廣播放入線程池中處理
  2. 將read事件的所有io操作放入線程池中處理
if (selectionKey.isAcceptable()) {
        ServerSocketChannel server = (ServerSocketChannel) channel;
        SocketChannel client = server.accept();
        client.configureBlocking(false);
        client.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(CLIENT_BUFFER_SIZE));
        String serverGlobalInfo = "系統(tǒng)消息:用戶[" + client.getRemoteAddress() + "]上線了";
        System.err.println(serverGlobalInfo);

        executorService.submit(() -> {    // 1
            forwardClientMsg(serverGlobalInfo, client);
        });
    } else if (selectionKey.isReadable()) {

        executorService.submit(() -> {    // 2
            SocketChannel client = (SocketChannel) channel;
            SocketAddress remoteAddress = null;
            try {
                remoteAddress = client.getRemoteAddress();
                String clientMsg = retrieveClientMsg(selectionKey);
                if (clientMsg.equals("")) {
                    return;
                }
                System.err.println("收到用戶[" + remoteAddress + "]消息:" + clientMsg);

                forwardClientMsg("[" + remoteAddress + "]:" + clientMsg, client);  
            } catch (Exception e) {
                String msg = "系統(tǒng)消息:" + remoteAddress + "下線了";
                forwardClientMsg(msg, client);
                System.err.println(msg);
                selectionKey.cancel();
                try {
                    client.close();
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
            }
        });
    }

    selectedKeys.remove();
}

在 1與2處焦读,我們加入了線程池處理,不再在reactor主線程中做任何io操作舱权。 這便是reactor多線程模型


雖然模型2有效利用了多核cpu優(yōu)勢矗晃,可是依然能夠找到瓶頸

  • 雖然廣播消息是在一個(gè)獨(dú)立線程中,可是我們需要將Selector上注冊的所有的channel全部遍歷宴倍,如果Selector注冊了太多的channel张症,依舊會有效率問題
  • 因?yàn)镾elector注冊了過多的Channel,所以在進(jìn)行select選取時(shí)對于主線程而言依舊會有很多的循環(huán)操作鸵贬,存在瓶頸

基于以上問題俗他,我們可以考慮引入多個(gè)Selector,這樣主Selector只負(fù)責(zé)讀取accept操作阔逼,而其他的io操作均有子Selector負(fù)責(zé)兆衅,這便是多Reactor多線程模型

多Reactor多線程模型

Reactor多線程模型

基于上面的思考,我們要在單Reactor多線程模型上主要需要以下操作

  1. 對于accept到的新連接不再放入主Selector颜价,將其加入多個(gè)子Selector
  2. 子Selector操作應(yīng)該在異步線程中進(jìn)行.
  3. 所有子Selector只進(jìn)行read write操作

基于以上涯保,會增加一個(gè)子Selector列表,并且將原來的accept以及讀取廣播分開周伦;
private List<Selector> subSelector = new ArrayList<>(8); 定義一個(gè)包含8個(gè)子selector的列表并進(jìn)行初始化

image


如圖夕春,分別開啟了一個(gè)reactor主線程,以及8個(gè)子selector子線程专挪,其中及志,主線程現(xiàn)在只進(jìn)行accept然后添加至子selector

 while (true) {
    if (mainSelector.select(1000) == 0) {
        continue;
    }

    Iterator<SelectionKey> selectedKeys = mainSelector.selectedKeys().iterator();
    while (selectedKeys.hasNext()) {
        SelectionKey selectionKey = selectedKeys.next();
        SelectableChannel channel = selectionKey.channel();

        if (selectionKey.isAcceptable()) {

            ServerSocketChannel server = (ServerSocketChannel) channel;
            SocketChannel client = server.accept();
            client.configureBlocking(false);
            client.register(subSelector.get(index++), SelectionKey.OP_READ,     // 1
                    ByteBuffer.allocate(CLIENT_BUFFER_SIZE));
            if (index == 8) {   // 2
                index = 0;
            }

            String serverGlobalInfo = "系統(tǒng)消息:用戶[" + client.getRemoteAddress() + "]上線了";
            System.err.println(serverGlobalInfo);

            forwardClientMsg(serverGlobalInfo, client);
        }
    }

    selectedKeys.remove();
}
  1. 將新連接注冊至從Selector.
  2. 如果當(dāng)前的selector已經(jīng)全部添加了一遍則重新從第一個(gè)開始

所有的從Selector只進(jìn)行io操作,并且本身已經(jīng)在異步線程中運(yùn)行

while (true) {
    if (subSelector.select(1000) == 0) {
        continue;
    }

    Iterator<SelectionKey> selectedKeys = subSelector.selectedKeys().iterator();
    while (selectedKeys.hasNext()) {
        SelectionKey selectionKey = selectedKeys.next();
        SelectableChannel channel = selectionKey.channel();

        if (selectionKey.isReadable()) {
            SocketChannel client = (SocketChannel) channel;
            SocketAddress remoteAddress = null;
            try {
                remoteAddress = client.getRemoteAddress();
                String clientMsg = retrieveClientMsg(selectionKey);  // 1
                if (clientMsg.equals("")) {
                    return;
                }
                System.err.println("收到用戶[" + remoteAddress + "]消息:" + clientMsg);
            
                forwardClientMsg("[" + remoteAddress + "]:" + clientMsg, client);  // 2
            } catch (Exception e) {
                String msg = "系統(tǒng)消息:" + remoteAddress + "下線了";
                forwardClientMsg(msg, client);
                System.err.println(msg);
                selectionKey.cancel();
                try {
                    client.close();
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
            }
        }

        selectedKeys.remove();
    }
  1. 讀取消息
  2. 廣播消息
    啟動server寨腔,并且打開三個(gè)客戶端:


    image

    image

    image

    如圖所示速侈,上線通知,消息轉(zhuǎn)發(fā)迫卢,下線通知成功倚搬, 主Selector與從Selector交互成功

netty線程模型思考

事實(shí)上,在netty的線程模型中乾蛤,與上方的多Reactor多線程模型類似每界,一個(gè)改進(jìn)版的多路復(fù)用多Reactor模型; Reactor主從線程模型

  1. 一個(gè)主線程不斷輪詢進(jìn)行accept操作家卖,將channel注冊至子Selector
  2. 一個(gè)線程持有一個(gè)Selector
  3. 一個(gè)子Selector又可以管理多個(gè)channel
  4. 在斷開連接前眨层,一個(gè)channel總是在同一個(gè)線程中進(jìn)行io操作處理

基于以上思考,我們將在后面在netty源碼中進(jìn)行一一驗(yàn)證上荡。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末趴樱,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌叁征,老刑警劉巖纳账,帶你破解...
    沈念sama閱讀 217,277評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異航揉,居然都是意外死亡塞祈,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評論 3 393
  • 文/潘曉璐 我一進(jìn)店門帅涂,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人尤蛮,你說我怎么就攤上這事媳友。” “怎么了产捞?”我有些...
    開封第一講書人閱讀 163,624評論 0 353
  • 文/不壞的土叔 我叫張陵醇锚,是天一觀的道長。 經(jīng)常有香客問我坯临,道長焊唬,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,356評論 1 293
  • 正文 為了忘掉前任看靠,我火速辦了婚禮赶促,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘挟炬。我一直安慰自己鸥滨,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,402評論 6 392
  • 文/花漫 我一把揭開白布谤祖。 她就那樣靜靜地躺著婿滓,像睡著了一般。 火紅的嫁衣襯著肌膚如雪粥喜。 梳的紋絲不亂的頭發(fā)上凸主,一...
    開封第一講書人閱讀 51,292評論 1 301
  • 那天,我揣著相機(jī)與錄音额湘,去河邊找鬼卿吐。 笑死,一個(gè)胖子當(dāng)著我的面吹牛缩挑,可吹牛的內(nèi)容都是我干的但两。 我是一名探鬼主播,決...
    沈念sama閱讀 40,135評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼供置,長吁一口氣:“原來是場噩夢啊……” “哼谨湘!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,992評論 0 275
  • 序言:老撾萬榮一對情侶失蹤紧阔,失蹤者是張志新(化名)和其女友劉穎坊罢,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體擅耽,經(jīng)...
    沈念sama閱讀 45,429評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡活孩,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,636評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了乖仇。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片憾儒。...
    茶點(diǎn)故事閱讀 39,785評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖乃沙,靈堂內(nèi)的尸體忽然破棺而出起趾,到底是詐尸還是另有隱情,我是刑警寧澤警儒,帶...
    沈念sama閱讀 35,492評論 5 345
  • 正文 年R本政府宣布训裆,位于F島的核電站,受9級特大地震影響蜀铲,放射性物質(zhì)發(fā)生泄漏边琉。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,092評論 3 328
  • 文/蒙蒙 一记劝、第九天 我趴在偏房一處隱蔽的房頂上張望变姨。 院中可真熱鬧,春花似錦隆夯、人聲如沸钳恕。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽忧额。三九已至,卻和暖如春愧口,著一層夾襖步出監(jiān)牢的瞬間睦番,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評論 1 269
  • 我被黑心中介騙來泰國打工耍属, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留托嚣,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,891評論 2 370
  • 正文 我出身青樓厚骗,卻偏偏與公主長得像示启,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子领舰,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,713評論 2 354