Reactor Pattern

前言

之前聊過幾種常見的 I/O 模型,不過要說起當紅炸子雞還得是 I/O 多路復用悯衬,Java 1.4 引入的 nio package 提供了對這一模式的支持弹沽,著名開源框架 Netty 更是這一領域的扛鼎之作。

Netty is an asynchronous event-driven network application framework for rapid
development of maintainable high performance protocol servers & clients.

Netty 官網(wǎng)介紹其是一個異步事件驅(qū)動的網(wǎng)絡應用框架筋粗,用于快速開發(fā)可維護的高性能協(xié)議服務器和客戶端策橘。這里異步暫且不說,事件驅(qū)動可是大有文章娜亿,今天就聊聊這個吧丽已。

Event Driven

一個典型的網(wǎng)絡服務通常都有著如下基礎結(jié)構(gòu):

  1. read request

  2. decode request

  3. process service

  4. encode reply

  5. send reply

細究起來,每一步的性質(zhì)和成本各不相同买决,這種不均衡會極大的影響吞吐量沛婴,分治策略通常是解決這一類型問題的最佳方案。我們可以把這其中的每一步都拆分成一個任務督赤,每個任務都以非阻塞的方式運行嘁灯。這里 I/O 事件充當了觸發(fā)器的角色,想想前端火的一塌糊涂的響應式編程躲舌,不正是這樣嗎丑婿?

Reactor Pattern

基于這個設想,就誕生了 Reactor Pattern没卸。Reactor Pattern 首先是事件驅(qū)動的羹奉,它有兩個主要角色:

  1. Reactor,響應 I/O 事件并分發(fā)給對應的 Handler

  2. Handlers约计,非阻塞地執(zhí)行相關操作

從簡單到復雜诀拭,Reactor Pattern 也有很多版本:單線程、多線程以及主從結(jié)構(gòu)病蛉。純理論總是看得人頭疼炫加,我們就以一個 echo server 為例瑰煎,一點點升級它來說明 Reactor Pattern 的各種版本好了。所謂 echo server俗孝,就是客戶端發(fā)什么服務端就回什么酒甸,這里我們約定以#字符標識一條完整的消息。

單線程
single-thread-reactor.png

如上圖所示赋铝,在單線程版本中插勤, Reactor 和 Handlers 共享一個線程,因此連接的建立革骨、數(shù)據(jù)的讀取和處理以及發(fā)送響應均在同一個線程中完成农尖。這個版本的好處是編碼簡單,弊端也是顯而易見的良哲,一旦某個環(huán)節(jié)是一個長時間執(zhí)行的任務盛卡,所有的操作都會被阻塞;并且筑凫,單線程也無法發(fā)揮多核 CPU 的優(yōu)勢滑沧。

@Slf4j
public class SingleThreadedReactor implements Runnable {

    private final int port;

    public SingleThreadedReactor(int port) {
        this.port = port;
    }

    @Override
    public void run() {
        Selector sel = null;
        ServerSocketChannel ssc = null;
        try {
            // 開啟Server端通道
            ssc = ServerSocketChannel.open();
            // 配置非阻塞
            ssc.configureBlocking(false);
            // 注冊一個Acceptor來處理連接
            sel = Selector.open();
            // Acceptor作為attachment進行掛載
            ssc.register(sel, SelectionKey.OP_ACCEPT, new Acceptor(sel, ssc));
            // 綁定端口
            ssc.bind(new InetSocketAddress(port));
            log.info("Server啟動成功,正在監(jiān)聽[{}]端口", port);
            // 輪詢
            iAmListening(sel);
        } catch (IOException ex) {
            log.error("Server啟動失敗", ex);
        } finally {
                // close ssc/sel omitted...
        }
    }

    private void iAmListening(Selector sel) {
        // 輪詢
        while (!Thread.interrupted()) {
            try {
                int num = sel.select();
                // 有事件則處理
                if (num > 0) {
                    Iterator<SelectionKey> iter = sel.selectedKeys().iterator();
                    // 逐個處理
                    while (iter.hasNext()) {
                        SelectionKey sk = iter.next();
                        // 派發(fā)事件
                        dispatch(sk);
                        // 處理過即移除巍实,避免重復處理
                        iter.remove();
                    }
                }
            } catch (IOException ex) {
                log.error("監(jiān)聽失敗", ex);
            }
        }
    }

    private void dispatch(SelectionKey sk) {
        // 事件處理程序都作為attachment掛載
        // 在SelectionKey上滓技,并且包裝成Runnable
        Object attachment = sk.attachment();
        if (attachment instanceof Runnable) {
            ((Runnable) attachment).run();
        }
    }
}

SingleThreadedReactor#run()由兩大塊構(gòu)成,一是配置服務端的ServerSocketChannel棚潦,這一步的核心是掛載Acceptor(一個專門用來處理客戶端接入的 Handler )令漂;二是輪詢 I/O 事件,一旦有事件需要處理就進行派發(fā)丸边。

@Slf4j
public class Acceptor implements Runnable {

    private final Selector sel;

    private final ServerSocketChannel ssc;

    public Acceptor(Selector sel, ServerSocketChannel ssc) {
        this.sel = sel;
        this.ssc = ssc;
    }

    @Override
    public void run() {
        try {
            // 獲取與客戶端通信的SocketChannel
            SocketChannel sc = ssc.accept();
            log.info("檢測到新的客戶端連接: {}", sc.getRemoteAddress());
            // Handler同樣是一個Runnable叠必,用來
            // 處理與客戶端之間的 I/O 操作
            new Handler(sc, sel);
        } catch (IOException ex) {
            log.error("處理連接失敗", ex);
        }
    }
}

Acceptor的職責是處理客戶端的接入,一旦有新的客戶端加入就建立起通道并將其注冊到 Selector 上(通過 Handler 的構(gòu)造函數(shù))妹窖。

@Slf4j
public class Handler implements Runnable {

    enum State {
        READING,
        SENDING
    }

    private final SocketChannel sc;

    private final SelectionKey sk;

    // 初始狀態(tài)為可讀
    private State state = State.READING;

    private final ByteBuffer input = ByteBuffer.allocate(256);
    private final ByteBuffer output = ByteBuffer.allocate(256);

    public Handler(SocketChannel sc, Selector sel) {
        this.sc = sc;
        try {
            // 配置成非阻塞模式
            sc.configureBlocking(false);
            // 將自己也注冊到Selector上挠唆,如此即可供Reactor派發(fā)
            this.sk = sc.register(sel, SelectionKey.OP_READ, this);
            // 注冊完畢,讓select()調(diào)用即刻返回嘱吗,看看此時有沒有事件要處理
            sel.wakeup();
        } catch (IOException ex) {
            throw new RuntimeException("Unable to handler event", ex);
        }
    }

    @Override
    public void run() {
        try {
            // 根據(jù)狀態(tài)做處理
            if (state == State.READING) {
                read();
            } else {
                write();
            }
        } catch (IOException ex) {
            throw new RuntimeException("Unable to handler event", ex);
        }
    }

    private void read() throws IOException {
        log.info("正在讀取客戶端上送的信息");
        // 讀取數(shù)據(jù)
        int oldPos = input.position();
        sc.read(input);
        int newPos = input.position();
        // 以 # 作為結(jié)束符玄组,表示一條完整的消息
        for (int i = oldPos; i < newPos; i++) {
            byte b = input.get(i);
            // 已讀到一條消息,進行處理
            if (b == '#') {
                process(i);
                log.info("處理完畢谒麦,準備發(fā)送響應信息");
                state = State.SENDING;
                sk.interestOps(SelectionKey.OP_WRITE);
            }
        }
    }

    private void write() throws IOException {
        output.flip();
        sc.write(output);
        output.clear();
        state = State.READING;
        sk.interestOps(SelectionKey.OP_READ);
    }

    private void process(int index) throws IOException {
        byte[] bytes = new byte[index + 1];
        input.flip();
        input.get(bytes);
        input.compact();
        // # 刪掉
        String resp = new String(bytes, StandardCharsets.UTF_8)
                .replace("#", "");
        log.info("接受到客戶端[{}]的信息: {}", sc.getRemoteAddress(), resp);
        // 附加一點內(nèi)容
        output.put("Server says: ".getBytes(StandardCharsets.UTF_8));
        // 其它信息原樣返回
        output.put(resp.getBytes(StandardCharsets.UTF_8));
    }
}

Handler負責處理 read -> decode -> process -> encode -> send 這一系列流程俄讹。在我們的例子里,只要沒有讀到#字符绕德,就會一直讀下去患膛,直到客戶端發(fā)送#字符告訴服務端一條完整的消息已經(jīng)發(fā)送完了,服務端就著手進行處理并發(fā)送響應消息耻蛇。

多線程
multi-thread-reactor.png

??多線程版本對 Handler 進行了優(yōu)化踪蹬,通過增加 worker threads 來處理非 I/O 操作胞此,如此一來即使有耗時操作 Handler 也不會阻塞住 Reactor 了;同時多個 worker threads 也能更好地發(fā)揮多核 CPU 的優(yōu)勢跃捣。

@Slf4j
public class Handler implements Runnable {

    enum State {
        READING,
        SENDING
    }

    private final SocketChannel sc;

    private final SelectionKey sk;

    // 初始狀態(tài)為可讀
    private State state = State.READING;

    private final ByteBuffer input = ByteBuffer.allocate(256);
    private final ByteBuffer output = ByteBuffer.allocate(256);

    // 線程池漱牵,用來處理所有入站事件
    private final ExecutorService executor = new ThreadPoolExecutor(2,
            8,
            5, TimeUnit.MINUTES,
            new ArrayBlockingQueue<>(100),
            new ThreadPoolExecutor.CallerRunsPolicy());

    public Handler(SocketChannel sc, Selector sel) {
        this.sc = sc;
        try {
            // 配置成非阻塞模式
            sc.configureBlocking(false);
            // 將自己也注冊到Selector上,如此即可供Reactor派發(fā)
            this.sk = sc.register(sel, SelectionKey.OP_READ, this);
            // 注冊完畢疚漆,讓select()調(diào)用即刻返回酣胀,看看此時有沒有事件要處理
            sel.wakeup();
        } catch (IOException ex) {
            throw new RuntimeException("Unable to handler event", ex);
        }
    }

    @Override
    public void run() {
        try {
            // 根據(jù)狀態(tài)做處理
            if (state == State.READING) {
                // 讀取數(shù)據(jù)
                int oldPos = read();
                // 提交到線程池,異步處理
                executor.execute(() -> {
                    log.info("當前處理的線程: {}", Thread.currentThread().getName());
                    try {
                        process(oldPos);
                    } catch (IOException ex) {
                        throw new RuntimeException("Unable to read data", ex);
                    }
                });
            } else {
                // decode -> process -> encode
                // 以上流程已經(jīng)在 state == State.READING 時處理好了娶聘,此時直接發(fā)送就可以了闻镶,不用異步
                write();
            }
        } catch (IOException ex) {
            throw new RuntimeException("Unable to handler event", ex);
        }
    }

    private synchronized int read() throws IOException {
        log.info("正在讀取客戶端上送的信息");
        // 讀取數(shù)據(jù)
        int oldPos = input.position();
        sc.read(input);
        return oldPos;
    }

    private synchronized void write() throws IOException {
        log.info("正在給客戶端返回響應信息");
        output.flip();
        sc.write(output);
        output.clear();
        state = State.READING;
        sk.interestOps(SelectionKey.OP_READ);
        sk.selector().wakeup();
    }

    private synchronized void process(int oldPos) throws IOException {
        // 計算讀取的長度
        int newPos = input.position();
        // 以 # 作為結(jié)束符,表示一條完整的消息
        for (int i = oldPos; i < newPos; i++) {
            byte b = input.get(i);
            // 已讀到一條消息丸升,進行處理
            if (b == '#') {
                byte[] bytes = new byte[i + 1];
                input.flip();
                input.get(bytes);
                input.compact();
                // # 刪掉
                String resp = new String(bytes, StandardCharsets.UTF_8)
                        .replace("#", "");
                log.info("接受到客戶端[{}]的信息: {}", sc.getRemoteAddress(), resp);
                // 附加一點內(nèi)容
                output.put("Server says: ".getBytes(StandardCharsets.UTF_8));
                // 其它信息原樣返回
                output.put(resp.getBytes(StandardCharsets.UTF_8));
                log.info("處理完畢铆农,準備發(fā)送響應信息");
                state = State.SENDING;
                sk.interestOps(SelectionKey.OP_WRITE);
                sk.selector().wakeup();
            }
        }
    }
}

多線程版本的ReactorAcceptor和單線程版本是一樣的,區(qū)別只在 Handler 中引入了線程池狡耻,這些 worker threads 包辦了流程中的 decode -> process -> encode顿涣。

主從結(jié)構(gòu)
master-slave.png

??主從結(jié)構(gòu)引入了多個 Reactor,將客戶端接入和 socket 讀酝豪、寫進行了解耦:MainReactor 負責處理客戶端的接入,而 SubReactor(s) 則負責在發(fā)生讀精堕、寫事件時進行數(shù)據(jù)處理孵淘。

@Slf4j
public class MainReactor implements Runnable {

    private final int port;

    // 子Reactor數(shù)量
    private final int numberOfSubReactors;

    public MainReactor(int port, int numberOfSubReactors) {
        this.port = port;
        this.numberOfSubReactors = numberOfSubReactors;
    }

    @Override
    public void run() {
        Selector sel = null;
        ServerSocketChannel ssc = null;
        try {
            // 開啟Server端通道
            ssc = ServerSocketChannel.open();
            // 配置非阻塞
            ssc.configureBlocking(false);
            // 注冊一個Acceptor來處理連接
            sel = Selector.open();
            // Acceptor作為attachment進行掛載
            ssc.register(sel, SelectionKey.OP_ACCEPT, new Acceptor(ssc, numberOfSubReactors));
            // 綁定端口
            ssc.bind(new InetSocketAddress(port));
            log.info("Server啟動成功,正在監(jiān)聽[{}]端口", port);
            // 輪詢
            iAmListening(sel);
        } catch (IOException ex) {
            log.error("Server啟動失敗", ex);
        } finally {
            // close ssc/sel omitted...
        }
    }

    private void iAmListening(Selector sel) {
        // 輪詢
        while (!Thread.interrupted()) {
            try {
                int num = sel.select();
                // 有事件則處理
                if (num > 0) {
                    Iterator<SelectionKey> iter = sel.selectedKeys().iterator();
                    // 逐個處理
                    while (iter.hasNext()) {
                        SelectionKey sk = iter.next();
                        // 派發(fā)事件
                        dispatch(sk);
                        // 處理過即移除歹篓,避免重復處理
                        iter.remove();
                    }
                }
            } catch (IOException ex) {
                log.error("監(jiān)聽失敗", ex);
            }
        }
    }

    private void dispatch(SelectionKey sk) {
        // 事件處理程序都作為attachment掛載
        // 在SelectionKey上瘫证,并且包裝成Runnable
        Object attachment = sk.attachment();
        if (attachment instanceof Runnable) {
            ((Runnable) attachment).run();
        }
    }
}

MainReactor#run()和多線程版本的大體相同,區(qū)別主要在Acceptor庄撮。

@Slf4j
public class Acceptor implements Runnable {

    private int next = 0;

    private final Selector[] selectors;

    private final ServerSocketChannel ssc;

    public Acceptor(ServerSocketChannel ssc, int numberOfSubReactors) throws IOException {
        this.ssc = ssc;
        // 每個SubReactor持有一個Selector背捌,互不干擾,避免了潛在的線程同步需求
        Selector[] selectors = new Selector[numberOfSubReactors];
        SubReactor[] subReactors = new SubReactor[numberOfSubReactors];
        for (int i = 0; i < numberOfSubReactors; i++) {
            Selector sel = Selector.open();
            selectors[i] = sel;
            subReactors[i] = new SubReactor(sel);
            // 啟動一個獨立的線程處理SubReactor
            // SubReactor不處理連接請求洞斯,只處理OP_READ/OP_WRITE
            Thread thread = new Thread(subReactors[i]);
            thread.setName("SubReactor-thread-" + i);
            thread.start();
        }
        this.selectors = selectors;
    }

    @Override
    public void run() {
        try {
            // 獲取與客戶端通信的SocketChannel
            SocketChannel sc = ssc.accept();
            log.info("檢測到新的客戶端連接: {}毡庆,當前線程: {}", sc.getRemoteAddress(), Thread.currentThread().getName());
            // 通過簡單的輪詢算法,給每一個SocketChannel分配一個Selector
            // Selector關聯(lián)了SubReactor烙如,而SubReactor又關聯(lián)了一個獨立的
            // 線程么抗,這樣每個SocketChannel上的發(fā)生的事件就有對應的線程來處理了
            new Handler(sc, selectors[next]);
            if (++next == selectors.length) next = 0;
        } catch (IOException ex) {
            log.error("處理連接失敗", ex);
        }
    }
}

主從結(jié)構(gòu)的Acceptor就略微有點復雜了,它的主要工作有二:其一是初始化多個 SubReactor亚铁,每個 SubReactor 都綁定一個 Selector蝇刀,并且運行在獨立的線程中;其二在建立新的通道時徘溢,將其分配給某一個 SubReactor吞琐。這樣做的好處在于分攤了負載捆探,更好地均衡了 CPU 和 I/O 速率。

@Slf4j
public class SubReactor implements Runnable {

    private final Selector sel;

    public SubReactor(Selector sel) {
        this.sel = sel;
    }

    @Override
    public void run() {
        // 輪詢
        while (!Thread.interrupted()) {
            try {
                int num = sel.select(1000);
                // 有事件則處理
                if (num > 0) {
                    Iterator<SelectionKey> iter = sel.selectedKeys().iterator();
                    // 逐個處理
                    while (iter.hasNext()) {
                        SelectionKey sk = iter.next();
                        // 派發(fā)事件
                        dispatch(sk);
                        // 處理過即移除站粟,避免重復處理
                        iter.remove();
                    }
                }
            } catch (IOException ex) {
                log.error("監(jiān)聽失敗", ex);
            }
        }
    }

    private void dispatch(SelectionKey sk) {
        if (!sk.isValid()) return;
        String ops = "";
        if (sk.isReadable()) {
            ops = "有新數(shù)據(jù)可讀";
        }
        if (sk.isWritable()) {
            if (ops.length() != 0) {
                ops = ops + ","+ "有數(shù)據(jù)可寫";
            } else {
                ops = "有數(shù)據(jù)可寫";
            }
        }
        log.info("檢測到新的事件: {}黍图,當前線程: {}", ops, Thread.currentThread().getName());
        // 事件處理程序都作為attachment掛載
        // 在SelectionKey上,并且包裝成Runnable
        Object attachment = sk.attachment();
        if (attachment instanceof Runnable) {
            ((Runnable) attachment).run();
        }
    }
}

SubReactor 和 MainReactor 類似卒蘸,它也維護了一個事件循環(huán)雌隅,用來派發(fā) socket 讀、寫事件缸沃。Handler 和多線程版本保持一致恰起,同樣使用了線程池來處理非 I/O 任務,完整的示例看這里趾牧。

Netty 對 Reactor Pattern 的支持

Netty 對 Reactor Pattern 的幾個變種都有支持检盼,簡單配置一下 ServerBootstrap 即可實現(xiàn)。

Version Implementation
單線程 NioEventLoopGroup group = new NioEventLoopGroup(1);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group);
多線程 NioEventLoopGroup group = new NioEventLoopGroup(3);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group);
主從結(jié)構(gòu) NioEventLoopGroup bossGroup = new NioEventLoopGroup(2);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(4);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup);

參考

Doug Lea: Scalable IO in Java

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末翘单,一起剝皮案震驚了整個濱河市吨枉,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌哄芜,老刑警劉巖貌亭,帶你破解...
    沈念sama閱讀 217,277評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異认臊,居然都是意外死亡圃庭,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評論 3 393
  • 文/潘曉璐 我一進店門失晴,熙熙樓的掌柜王于貴愁眉苦臉地迎上來剧腻,“玉大人,你說我怎么就攤上這事涂屁∈樵冢” “怎么了?”我有些...
    開封第一講書人閱讀 163,624評論 0 353
  • 文/不壞的土叔 我叫張陵拆又,是天一觀的道長儒旬。 經(jīng)常有香客問我,道長帖族,這世上最難降的妖魔是什么义矛? 我笑而不...
    開封第一講書人閱讀 58,356評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮盟萨,結(jié)果婚禮上凉翻,老公的妹妹穿的比我還像新娘。我一直安慰自己捻激,他們只是感情好制轰,可當我...
    茶點故事閱讀 67,402評論 6 392
  • 文/花漫 我一把揭開白布前计。 她就那樣靜靜地躺著,像睡著了一般垃杖。 火紅的嫁衣襯著肌膚如雪男杈。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,292評論 1 301
  • 那天调俘,我揣著相機與錄音伶棒,去河邊找鬼。 笑死彩库,一個胖子當著我的面吹牛肤无,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播骇钦,決...
    沈念sama閱讀 40,135評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼宛渐,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了眯搭?” 一聲冷哼從身側(cè)響起窥翩,我...
    開封第一講書人閱讀 38,992評論 0 275
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎鳞仙,沒想到半個月后寇蚊,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,429評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡棍好,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,636評論 3 334
  • 正文 我和宋清朗相戀三年仗岸,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片梳玫。...
    茶點故事閱讀 39,785評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖右犹,靈堂內(nèi)的尸體忽然破棺而出提澎,到底是詐尸還是另有隱情,我是刑警寧澤念链,帶...
    沈念sama閱讀 35,492評論 5 345
  • 正文 年R本政府宣布盼忌,位于F島的核電站,受9級特大地震影響掂墓,放射性物質(zhì)發(fā)生泄漏谦纱。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,092評論 3 328
  • 文/蒙蒙 一君编、第九天 我趴在偏房一處隱蔽的房頂上張望跨嘉。 院中可真熱鬧,春花似錦吃嘿、人聲如沸祠乃。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽亮瓷。三九已至琴拧,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間嘱支,已是汗流浹背蚓胸。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留除师,地道東北人沛膳。 一個月前我還...
    沈念sama閱讀 47,891評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像馍盟,于是被迫代替她去往敵國和親于置。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,713評論 2 354