理解NIO及Reactor模式

目前移動互聯(lián)網(wǎng)應用非常流行,如微信炭晒,抖音等都需要實時與服務器通信崎页,獲取實時的信息,這種獲取信息的方式就是通過長連接socket實現(xiàn)的腰埂,那就需要了解服務端是如何處理高并發(fā)IO的飒焦。

IO(BIO)

在jdk1.4之前,java中的IO類庫實在是超級原始,很多我們現(xiàn)在熟知的概念都還沒有出現(xiàn)牺荠,比如說管道翁巍、緩沖區(qū)等等。正是由于這些等等原因休雌,C語言和C++一直都是IO方面的首選灶壶。這是原始的IO方式,也叫作BIO杈曲,它的原理很簡單驰凛,我們使用一張圖來表示一下:

1.png

也就是說BIO時代,每次有一個客戶端連接進來的時候担扑,都會有一個新的線程去處理恰响,缺點顯而易見,如果連接比較多的時候涌献,我們就要建立大量的線程去一一處理胚宦。

NIO

jdk1.4開始被正式發(fā)布了,做出的一個巨大的改變就是新增了NIO包燕垃。它提供了很多異步的IO操作方法枢劝,比如說緩沖區(qū)ByteBuffer、Pipe卜壕、Channel還有多路復用器Selector等等您旁。新的NIO類庫的出現(xiàn),極大地促進了java對異步非阻塞式編程的發(fā)展轴捎。NIO的原理也是很簡單被冒。在這里同樣使用一張圖來演示一遍:


2.png

現(xiàn)在我們可以看到,所有的客戶端連接都可以只用一個線程就可以實現(xiàn)了轮蜕。

AIO

在2011年7月28日昨悼,官方將用了將近十年的NIO類庫做了升級,也被稱為NIO2.0跃洛。后來也叫作AIO率触。AIO的原理是在之前的基礎上進行的改進,意思是異步非阻塞式IO汇竭,也就是說你的客戶端在進行讀寫操作的時候葱蝗,只需要給服務器發(fā)送一個請求,不用一直等待回答就可以去做其他的事了细燎。
由于AIO過于復雜两曼,并沒有廣泛使用,這里就不展開說了玻驻。

NIO解決的問題

Nio要解決的問題網(wǎng)上的解釋一大堆悼凑,諸如銀行取號偿枕、餐廳點餐等等。這些列子就不再具體地重復了户辫,實際上就是為了使用現(xiàn)有的資源提供更高的生產(chǎn)效率渐夸。

如何提高呢?舉個簡單例子渔欢,一個汽車生產(chǎn)廠商有若干條生產(chǎn)線(一條生產(chǎn)線負責汽車制造的所有環(huán)節(jié))墓塌,每個生產(chǎn)線都有相同的工人數(shù)目,每個工人都負責一個生產(chǎn)環(huán)節(jié)奥额,也就是說生產(chǎn)發(fā)動機和生產(chǎn)輪胎的工人數(shù)目是一樣的苫幢,但是很明顯生產(chǎn)發(fā)動機需要的時間肯定比輪胎要長很多,那么在每一條生產(chǎn)線上生產(chǎn)發(fā)動機的那個工人往往滿負荷工作垫挨,而生產(chǎn)輪胎的工人卻很閑韩肝,這樣生產(chǎn)效率很低。因此廠家打破了這種一條生產(chǎn)線生產(chǎn)汽車所有環(huán)節(jié)的模式棒拂,改為一個汽車零部件一條生產(chǎn)線,那么在發(fā)動機生產(chǎn)線雇傭的工人數(shù)目一定多于輪胎生產(chǎn)線玫氢,這樣每條生產(chǎn)線的工人都不會閑著帚屉,通過資源的合理分配最大化利用了工人的價值,提高了生產(chǎn)效率漾峡,賺取了剩余價值攻旦。

而如何通過資源合理分配來提高生產(chǎn)效率就是nio在計算機io領域要解決的問題。

同步/異步生逸、阻塞/非阻塞

同步和異步針對應用程序來牢屋,關注的是程序中間的協(xié)作關系;
阻塞與非阻塞更關注的是單個進程的執(zhí)行狀態(tài)槽袄。

同步:執(zhí)行一個操作之后烙无,等待結果,然后才繼續(xù)執(zhí)行后續(xù)的操作遍尺。
異步:執(zhí)行一個操作后截酷,可以去執(zhí)行其他的操作,然后等待通知再回來執(zhí)行剛才沒執(zhí)行完的操作乾戏。

阻塞:進程給CPU傳達一個任務之后迂苛,一直等待CPU處理完成,然后才執(zhí)行后面的操作鼓择。
非阻塞:進程給CPU傳達任我后三幻,繼續(xù)處理后續(xù)的操作,隔斷時間再來詢問之前的操作是否完成呐能。這樣的過程其實也叫輪詢念搬。

什么是Reactor

后期隨著jdk的一步步發(fā)展,nio非堵塞技術開始變多越來越加廣泛,可分為三種reactor模式锁蠕。

Reactor可以理解為反應器模式夷野。當一個主體發(fā)生改變時,所有依屬體都得到通知荣倾。不過悯搔,觀察者模式與單個事件源關聯(lián),而反應器模式則與多個事件源關聯(lián) 舌仍。

NIO 有一個主要的類Selector妒貌,這個類似一個觀察者,只要把需要探知的SocketChannel告訴Selector铸豁,接著做別的事情灌曙,當有事件發(fā)生時,他會通知我們节芥,傳回一組SelectionKey在刺,讀取這些Key,就會獲得剛剛注冊過的SocketChannel头镊,然后蚣驼,我們從這個Channel中讀取數(shù)據(jù),接著可以處理這些數(shù)據(jù)相艇。

單線程的Reactor模式

Reactor里面的單線程模式的結構圖:


1.png

當有多個請求發(fā)送到server的時候颖杏,會經(jīng)過反應器對其進行處理,相應的代碼如下所示:

 
import lombok.extern.slf4j.Slf4j;
 
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
 
/**
 * @author idea
 * @data 2019/4/11
 */
@Slf4j
public class NioServer {
 
    public static void main(String[] args) {
        try {
            Selector selector = Selector.open();
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(9090));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("server is open!");
            while (true) {
                if (selector.select() > 0) {
                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = keys.iterator();
                    while (iterator.hasNext()) {
 
                        SelectionKey selectionKey = iterator.next();
                        if (selectionKey.isReadable()) {
                            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                            int len = 0;
                            //當管道的數(shù)據(jù)都讀取完畢了
                            while ((len = (socketChannel.read(byteBuffer))) > 0) {
                                byteBuffer.flip();
                                System.out.println(new String(byteBuffer.array(), 0, len));
                                byteBuffer.clear();
                            }
                        } else if (selectionKey.isAcceptable()) {
                            //第一次鏈接到server坛芽,需要構建一個通道
                            ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) selectionKey.channel();
                            //開通通道
                            SocketChannel socketChannel = acceptServerSocketChannel.accept();
                            //設置為非堵塞
                            socketChannel.configureBlocking(false);
                            //注冊可讀的監(jiān)聽事件
                            socketChannel.register(selector, SelectionKey.OP_READ);
                            System.out.println("[server]接收到新的鏈接");
                        }
                        iterator.remove();
                    }
                }
 
            }
        } catch (IOException e) {
            log.error("[server]異常出現(xiàn)留储,信息為{}", e);
        }
 
    }
}

單線程模式的Reactor有一個很明顯的缺點,那就是在處理請求的時候咙轩,對于不同狀態(tài)的通道處理获讳,以及請求的監(jiān)聽全部都放在了單個線程上進行,(多個Channel可以注冊到同一個Selector對象上活喊,實現(xiàn)了一個線程同時監(jiān)控多個請求狀態(tài)(Channel))因此效率很低下赔嚎。因此就會有了第二種Reactor模式。

多線程的Reactor模式

在原先的單線程模式中胧弛,一個線程同時處理多個請求尤误,但是所有的讀寫請求以及對于數(shù)據(jù)的處理都在同一線程中,無法充分利用多cpu的優(yōu)勢结缚,因此誕生了這種多線程的Reactor模式损晤。

多線程的Reactor模式基本結構圖如下所示:


2.png

代碼如下所示:

 
 
import lombok.extern.slf4j.Slf4j;
 
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
 
/**
 * @author idea
 * @data 2019/4/11
 */
@Slf4j
public class Server {
 
    public static void main(String[] args) {
        try {
            Selector selector = Selector.open();
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(9090));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("[server]開始啟動服務器");
            while (true) {
                if (selector.selectNow() < 0) {
                    continue;
                }
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = keys.iterator();
                while (iterator.hasNext()) {
 
                    SelectionKey selectionKey = iterator.next();
                    if (selectionKey.isReadable()) {
                        Processor processor = (Processor) selectionKey.attachment();
                        processor.process(selectionKey);
                    } else if (selectionKey.isAcceptable()) {
                        ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) selectionKey.channel();
                        SocketChannel socketChannel = acceptServerSocketChannel.accept();
                        socketChannel.configureBlocking(false);
                        SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
 
                        //綁定處理器線程
                        key.attach(new Processor());
                        System.out.println("[server]接收到新的鏈接");
 
                    }
                    iterator.remove();
                }
            }
        } catch (IOException e) {
            log.error("[server]異常出現(xiàn),信息為{}", e);
        }
 
    }
}

從代碼可以看到红竭,每次當系相應的channel注冊完相應的OP_READ事件后尤勋,可以對相應的SelectionKey attach一個對象(本例中attach了一個Processor對象喘落,該對象處理讀請求),并且在獲取到可讀事件后最冰,可以取出該對象瘦棋。

再看到相應的Processor對象代碼:

 
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
/**
 * 處理器
 *
 * @author idea
 * @data 2019/4/11
 */
public class Processor {
 
    private static final ExecutorService service = new ThreadPoolExecutor(16, 16,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
 
    public void process(SelectionKey selectionKey) {
        service.submit(() -> {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            int count = socketChannel.read(buffer);
            if (count < 0) {
                socketChannel.close();
                selectionKey.cancel();
                System.out.println("讀取結束!");
                return null;
            } else if (count == 0) {
                return null;
            }
            System.out.println("讀取內(nèi)容:" + new String(buffer.array()));
            return null;
        });
    }
}

需要開啟一個線程池來進行數(shù)據(jù)處理的任務暖哨。這里面就將數(shù)據(jù)處理的壓力分擔給了線程池來執(zhí)行赌朋,充分利用了多線程的優(yōu)勢,將新線程的連接和數(shù)據(jù)的io操作分別放在了不同的線程中進行運行篇裁。

在上述的多線程Reactor模式中沛慢,有專門的nio-acceptor線程來用于監(jiān)聽服務器,接收客戶端的tcp連接达布。然后又有專門的線程池來處理消息的讀取团甲,發(fā)送,編碼解碼等工作黍聂。一個nio同時處理N條鏈路躺苦,每個鏈路只對應一個NIO線程。(防止了并發(fā)操作的發(fā)生)产还∑ダ澹看似這樣的安排很美好,也確實能解決大多數(shù)應用場景的問題雕沉。

但是在極端情況下仍然會有弊端集乔,單獨的NIO線程負責監(jiān)聽和處理所有的客戶端連接可能會存在性能問題去件。例如并發(fā)百萬客戶端連接坡椒,或者服務端需要對客戶端握手進行安全認證躏尉,但是認證本身非常損耗性能涯保。在這類場景下,單獨一個Acceptor線程可能會存在性能不足問題空郊,為了解決性能問題宫莱,產(chǎn)生了第三種Reactor線程模型-主從Reactor多線程模型丈攒。

主從Reactor多線程模式

3.png

主從Reactor線程模型的特點是:服務端用于接收客戶端連接的不再是個1個單獨的NIO線程,而是一個獨立的NIO線程池授霸。Acceptor接收到客戶端TCP連接請求處理完成后(可能包含接入認證等)巡验,將新創(chuàng)建的SocketChannel注冊到IO線程池(subreactor線程池)的某個IO線程上,由它負責SocketChannel的讀寫和編解碼工作碘耳。Acceptor線程池僅僅只用于客戶端的登陸显设、握手和安全認證,一旦鏈路建立成功辛辨,就將鏈路注冊到后端SubReactor線程池的IO線程上捕捂,由IO線程負責后續(xù)的IO操作瑟枫。

相應代碼:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
 
/**
 * @author idea
 * @data 2019/4/11
 */
public class Server {

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(1234));
        //初始化通道,標志為accept類型
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
 
        int coreNum = Runtime.getRuntime().availableProcessors();
        Processor[] processors = new Processor[coreNum];
        for (int i = 0; i < processors.length; i++) {
            processors[i] = new Processor();
        }
 
        int index = 0;
        //一直處于堵塞的狀態(tài)
        while (selector.select() > 0) {
            //獲取到selectionkey的集合
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = keys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                if (key.isAcceptable()) {
                    ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
                    SocketChannel socketChannel = acceptServerSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    System.out.println("Accept request from {}" + socketChannel.getRemoteAddress());
                    Processor processor = processors[(int) ((index++) / coreNum)];
                    processor.addChannel(socketChannel);
                }
                iterator.remove();
            }
        }
    }
}

處理器部分的代碼:

 
import lombok.extern.slf4j.Slf4j;
 
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
@Slf4j
public class Processor {
 
    private static final ExecutorService service =
            Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());
 
    private Selector selector;
 
    public Processor() throws IOException {
        this.selector = SelectorProvider.provider().openSelector();
        start();
    }
 
    public void addChannel(SocketChannel socketChannel) throws ClosedChannelException {
        socketChannel.register(this.selector, SelectionKey.OP_READ);
    }
 
    public void start() {
        service.submit(() -> {
            while (true) {
                if (selector.selectNow() <= 0) {
                    continue;
                }
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = keys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if (key.isReadable()) {
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        int count = socketChannel.read(buffer);
                        if (count < 0) {
                            socketChannel.close();
                            key.cancel();
                            System.out.println("讀取結束" + socketChannel);
                            continue;
                        } else if (count == 0) {
                            System.out.println("客戶端信息大兄冈堋:" + socketChannel);
                            continue;
                        } else {
                            System.out.println("客戶端信息:" + new String(buffer.array()));
                        }
                    }
                }
            }
        });
    }
}

通常在互聯(lián)網(wǎng)公司中慷妙,對于一些高并發(fā)的應用場景里面都會使用到了Reactor模式,其代替了常用的多線程處理方式允悦,節(jié)省系統(tǒng)的資源膝擂,提高系統(tǒng)的吞吐量。類似于一些netty框架的核心原理其實就是通過nio的Reactor模式來進行設計和開發(fā)澡屡。

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末猿挚,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子驶鹉,更是在濱河造成了極大的恐慌绩蜻,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,525評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件室埋,死亡現(xiàn)場離奇詭異办绝,居然都是意外死亡,警方通過查閱死者的電腦和手機姚淆,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,203評論 3 395
  • 文/潘曉璐 我一進店門孕蝉,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人腌逢,你說我怎么就攤上這事降淮。” “怎么了搏讶?”我有些...
    開封第一講書人閱讀 164,862評論 0 354
  • 文/不壞的土叔 我叫張陵佳鳖,是天一觀的道長。 經(jīng)常有香客問我媒惕,道長系吩,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,728評論 1 294
  • 正文 為了忘掉前任妒蔚,我火速辦了婚禮穿挨,結果婚禮上,老公的妹妹穿的比我還像新娘肴盏。我一直安慰自己科盛,他們只是感情好,可當我...
    茶點故事閱讀 67,743評論 6 392
  • 文/花漫 我一把揭開白布菜皂。 她就那樣靜靜地躺著贞绵,像睡著了一般。 火紅的嫁衣襯著肌膚如雪幌墓。 梳的紋絲不亂的頭發(fā)上但壮,一...
    開封第一講書人閱讀 51,590評論 1 305
  • 那天冀泻,我揣著相機與錄音,去河邊找鬼蜡饵。 笑死弹渔,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的溯祸。 我是一名探鬼主播肢专,決...
    沈念sama閱讀 40,330評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼焦辅!你這毒婦竟也來了博杖?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,244評論 0 276
  • 序言:老撾萬榮一對情侶失蹤筷登,失蹤者是張志新(化名)和其女友劉穎剃根,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體前方,經(jīng)...
    沈念sama閱讀 45,693評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡狈醉,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,885評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了惠险。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片苗傅。...
    茶點故事閱讀 40,001評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖班巩,靈堂內(nèi)的尸體忽然破棺而出渣慕,到底是詐尸還是另有隱情,我是刑警寧澤抱慌,帶...
    沈念sama閱讀 35,723評論 5 346
  • 正文 年R本政府宣布逊桦,位于F島的核電站,受9級特大地震影響遥缕,放射性物質(zhì)發(fā)生泄漏卫袒。R本人自食惡果不足惜宵呛,卻給世界環(huán)境...
    茶點故事閱讀 41,343評論 3 330
  • 文/蒙蒙 一单匣、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧宝穗,春花似錦户秤、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,919評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至须鼎,卻和暖如春鲸伴,著一層夾襖步出監(jiān)牢的瞬間府蔗,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,042評論 1 270
  • 我被黑心中介騙來泰國打工汞窗, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留姓赤,地道東北人。 一個月前我還...
    沈念sama閱讀 48,191評論 3 370
  • 正文 我出身青樓仲吏,卻偏偏與公主長得像不铆,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子裹唆,可洞房花燭夜當晚...
    茶點故事閱讀 44,955評論 2 355