Scalable IO in Java

經(jīng)典的網(wǎng)絡(luò)編程

一般網(wǎng)絡(luò)編程都具有以下幾個(gè)步驟:

  • 讀取請(qǐng)求 Read request
  • 解碼請(qǐng)求 Decode request
  • 處理服務(wù) Process services
  • 加密回應(yīng) Encode reply
  • 發(fā)送回應(yīng) Send reply

但是每一步的處理的內(nèi)容和成本都不一樣酬核。xml捅位、json嗓节、file等等

image.png

每種類型的處理程序都需要在各自都線程中來(lái)進(jìn)行练湿,用代碼表示就是如下

class Server implements Runnable {
    public void run() {
        try {
            ServerSocket ss = new ServerSocket(PORT);
            while (!Thread.interrupted())
                new Thread(new Handler(ss.accept())).start();
            // or, single-threaded, or a thread pool
        } catch (IOException ex) { /* ... */ }
    }

    static class Handler implements Runnable {
        final Socket socket;
        Handler(Socket s) { socket = s; }
        public void run() {
            try {
                byte[] input = new byte[MAX_INPUT];
                socket.getInputStream().read(input);
                byte[] output = process(input);
                socket.getOutputStream().write(output);
            } catch (IOException ex) { /* ... */ }
        }
        private byte[] process(byte[] cmd) { /* ... */ }
    }
}

如果當(dāng)前運(yùn)行線程沒(méi)有被中斷就一直循環(huán)創(chuàng)建一個(gè)線程或者線程池用來(lái)處理ServerSocket里面的Socket請(qǐng)求。

注意:Thread.interrupted()和Thread.isInterrupted()

這樣會(huì)造成我們需要為每一個(gè)socket請(qǐng)求創(chuàng)建一個(gè)線程來(lái)處理對(duì)應(yīng)的數(shù)據(jù)。一旦用戶過(guò)多或者處理程序時(shí)間較長(zhǎng)就會(huì)造成各種各樣的問(wèn)題。無(wú)法并發(fā)狈谊,前面的活沒(méi)干完后面的需要等著,負(fù)載等等

優(yōu)化方向

  • 增加負(fù)載

  • 增加硬件 (CPU, memory, disk, bandwidth)

  • 同時(shí)滿足可用性和性能目標(biāo)

  • 減短延遲

  • 滿足高峰需求

  • 提高服務(wù)質(zhì)量

  • 通常來(lái)說(shuō)Divide-and-conquer(分而治之)是實(shí)現(xiàn)任何可擴(kuò)展性目標(biāo)的最佳方法

Divide-and-conquer(分而治之)

  • 將整體任務(wù)切割成小任務(wù)沟沙。每個(gè)小任務(wù)只執(zhí)行單一任務(wù)河劝,并且不會(huì)阻塞其他小任務(wù)的運(yùn)行

  • 用IO事件來(lái)觸發(fā)每個(gè)小任務(wù)的啟動(dòng)

  • java.nio 中支持的基本機(jī)制

  • 非阻塞讀取和寫入

  • 用感測(cè)到的IO事件來(lái)調(diào)度相關(guān)的任務(wù)

  • 事件驅(qū)動(dòng)設(shè)計(jì)中可能出現(xiàn)的無(wú)盡變化

Event-driven Designs 事件驅(qū)動(dòng)設(shè)計(jì)

  • 比較有效的方法

  • 占用更少的資源,每個(gè)客戶端不一定需要單獨(dú)創(chuàng)建一個(gè)線程

  • 減少開銷矛紫。減少Context的切換可以相應(yīng)的減少鎖定

  • 調(diào)度可能會(huì)更慢赎瞎,所以必須手動(dòng)將動(dòng)作綁定到事件

  • 更難的編程

  • 將動(dòng)作分解為簡(jiǎn)單非阻塞的

  • 類似于GUI事件驅(qū)動(dòng)的動(dòng)作

  • 無(wú)法消除所有阻塞。比如:GC颊咬,頁(yè)面錯(cuò)誤等

  • 必須跟蹤服務(wù)的邏輯狀態(tài)

AWT事件機(jī)制

IO事件驅(qū)動(dòng)使用相似的想法务甥,但設(shè)計(jì)不同

image.png

java.awt是一個(gè)軟件包,包含用于創(chuàng)建用戶界面和繪制圖形圖像的所有分類

Reactor Pattern(反應(yīng)堆模式)

  • Reactor通過(guò)調(diào)度來(lái)響應(yīng)IO事件喳篇。如:AWT thread
  • Handler執(zhí)行非阻塞動(dòng)作敞临。如:AWT ActionListeners
  • Manager將Handler綁定到事件上。如:AWT addActionListener

預(yù)先使用Manager將Handler綁定到指定的事件上麸澜,如onClick

用戶點(diǎn)擊按鈕的時(shí)候挺尿,Reactor獲取到事件,并調(diào)度事先綁定好的處理程序

經(jīng)典的Reactor設(shè)計(jì)

單線程版本

image.png

java.nio 支持

  • Channels

  • 支持非阻塞的讀取文件和socket連接

  • Buffers

  • Channels通過(guò)Buffers可以直接讀取或者寫入對(duì)象

  • Selectors

  • 通知一組Channels觸發(fā)了哪些IO事件

  • SelectionKeys

  • 維護(hù)IO事件的狀態(tài)和綁定

Reactor 實(shí)現(xiàn)

Setup

class Reactor implements Runnable {
        //Selector選擇器
        final Selector selector;
        //Socket服務(wù)通道
        final ServerSocketChannel serverSocket;

        Reactor(int port) throws IOException {
            //創(chuàng)建一個(gè)Selector
            selector = Selector.open();
            //創(chuàng)建一個(gè)Socket Channel
            serverSocket = ServerSocketChannel.open();
            //將Socket Channel綁定到指定端口
            serverSocket.socket().bind(
                    new InetSocketAddress(port));
            //設(shè)置Socket Channel為非阻塞
            serverSocket.configureBlocking(false);
            //將Selector和Socket Channel注冊(cè)到SelectionKey
            SelectionKey sk =
                    serverSocket.register(selector, SelectionKey.OP_ACCEPT);
            //將SelectionKey附加到接受者
            sk.attach(new Acceptor());
        }

        /*
        也可以使用SPI提供接口:
        SelectorProvider p = SelectorProvider.provider();
        selector = p.openSelector();
        serverSocket = p.openServerSocketChannel();
         */
    }

Dispatch Loop

// class Reactor continued
        public void run() { //通常在新線程中執(zhí)行
            try {
                //如果當(dāng)前線程沒(méi)有中斷就循環(huán)執(zhí)行
                while (!Thread.interrupted()) {
                    //查詢選擇器中獲取已經(jīng)準(zhǔn)備好的并且注冊(cè)過(guò)的操作
                    selector.select();
                    //獲取所有已經(jīng)準(zhǔn)備好的并且注冊(cè)過(guò)的操作
                    Set selected = selector.selectedKeys();
                    //循環(huán)遍歷
                    for (Object o : selected) {
                        //調(diào)度任務(wù)并處理事件操作
                        dispatch((SelectionKey) o);
                    }
                    //移除選擇器
                    selected.clear();
                }
            } catch (IOException ex) { /* ... */ }
        }

        //處理事件操作
        void dispatch(SelectionKey k) {
            //獲取SelectionKey中綁定的處理程序,如果不為空就執(zhí)行
            Runnable r = (Runnable) (k.attachment());
            if (r != null)
                r.run();
        }

Acceptor

// class Reactor continued
        // 創(chuàng)建接收器
        class Acceptor implements Runnable {
            public void run() {
                try {
                    //獲取連接成功到客戶端連接
                    SocketChannel c = serverSocket.accept();
                    if (c != null) {
                        //如果不為空就處理客戶端連接以及selector
                        new Handler(selector, c);
                    }
                } catch (IOException ex) { /* ... */ }
            }
        }

Handler setup

//處理程序
        final class Handler implements Runnable {
            //指定最大輸入bytes
            private static final int MAXIN = 1024;
            //指定最大輸出bytes
            private static final int MAXOUT = 1024;
            //客戶端連接
            final SocketChannel socket;
            final SelectionKey sk;
            ByteBuffer input = ByteBuffer.allocate(MAXIN);
            ByteBuffer output = ByteBuffer.allocate(MAXOUT);
            static final int READING = 0, SENDING = 1;
            int state = READING;

            Handler(Selector sel, SocketChannel c) throws IOException {
                socket = c;
                //配置非阻塞模式
                c.configureBlocking(false);
                //將客戶端連接和讀注冊(cè)到SelectionKey
                sk = socket.register(sel, SelectionKey.OP_READ);
                //將SelectionKey附加到當(dāng)前線程的run
                sk.attach(this);
                //將SelectionKey的操作設(shè)置為讀取
                sk.interestOps(SelectionKey.OP_READ);
                //喚醒Selector
                sel.wakeup();
            }
        }

Request handling

// class Handler continued

            //輸入處理完成
            boolean inputIsComplete() { /* ... */
                return true;
            }

            //輸出處理完成
            boolean outputIsComplete() { /* ... */
                return true;
            }

            //處理過(guò)程中
            void process() { /* ... */ }

            public void run() {
                try {
                    //根據(jù)不同的狀態(tài)進(jìn)行不同的處理程序
                    if (state == READING) read();
                    else if (state == SENDING) send();
                } catch (IOException ex) { /* ... */ }
            }

            //讀取數(shù)據(jù)
            void read() throws IOException {
                //從客戶端獲取數(shù)據(jù)
                socket.read(input);
                //如果讀取完成
                if (inputIsComplete()) {
                    //處理數(shù)據(jù)
                    process();
                    //標(biāo)記為發(fā)送狀態(tài)
                    state = SENDING;
                    // 將SelectionKey的操作設(shè)置為寫入
                    sk.interestOps(SelectionKey.OP_WRITE);
                }
            }

            //發(fā)送數(shù)據(jù)
            void send() throws IOException {
                //將數(shù)據(jù)寫入客戶端連接
                socket.write(output);
                //發(fā)送完成后將SelectionKey中的綁定取消
                if (outputIsComplete()) sk.cancel();
            }
        }

Per-State Handlers

GoF State-Object pattern 狀態(tài)模式编矾,針對(duì)狀態(tài)重新綁定對(duì)應(yīng)的處理程序

//處理程序
        class Handler {
            // 初始化為讀取狀態(tài)
            public void run() { 
                //客戶端讀取數(shù)據(jù)
                socket.read(input);
                //讀取完成
                if (inputIsComplete()) {
                    //處理數(shù)據(jù)
                    process();
                    //附加新的處理程序Sender
                    sk.attach(new Sender());
                    //標(biāo)記狀態(tài)為寫入
                    sk.interest(SelectionKey.OP_WRITE);
                    //喚醒SelectionKey中綁定的Selector
                    sk.selector().wakeup();
                }
            }
            
            //處理程序Sender
            class Sender implements Runnable {
                public void run(){ // ...
                    //寫入數(shù)據(jù)
                    socket.write(output);
                    //寫入完成之后將SelectionKey中的綁定取消
                    if (outputIsComplete()) sk.cancel();
                }
            }
        }

Multithreaded Designs 多線程設(shè)計(jì)

  • 戰(zhàn)略性的為擴(kuò)展性增加線程

  • 主要適用于多處理器

  • 工作線程

  • Reactor可以快速的觸發(fā)處理程序

  • 因?yàn)樘幚沓绦蜻^(guò)多或者處理時(shí)間過(guò)程會(huì)減慢Reactor的速度

  • 將非IO處理放到其他的線程

  • 多個(gè)Reactor處理線程

  • Reactor線程任務(wù)過(guò)多會(huì)導(dǎo)致IO飽和

  • 分配一些任務(wù)給其他Reactor線程

  • 負(fù)載均衡以匹配CPU和IO速率

Worker Threads 工作線程設(shè)計(jì)

  • 將非IO處理放到其他的線程來(lái)加快Reactor線程

  • 比計(jì)算綁定處理重新處理為事件驅(qū)動(dòng)的形式更簡(jiǎn)單

  • 應(yīng)該仍然是純非阻塞計(jì)算

  • 足夠的處理勝過(guò)開銷

  • 很難與IO重疊處理

  • 最好能先將所有輸入讀入緩沖區(qū)

  • 使用線程池可以進(jìn)行調(diào)優(yōu)和控制

  • 通常需要的線程數(shù)比客戶端少得多

image.png

Handler with Thread Pool 多線程處理

class Handler implements Runnable {
            // 創(chuàng)建一個(gè)線程池 
            static PooledExecutor pool = new PooledExecutor(...);
            //設(shè)置處理狀態(tài)
            static final int PROCESSING = 3;
            //讀數(shù)據(jù)操作熟史,設(shè)計(jì)到多線程讀取需要加線程鎖
            synchronized void read() { 
                //讀取數(shù)據(jù)
                socket.read(input);
                //讀取完成
                if (inputIsComplete()) {
                    //標(biāo)記為處理狀態(tài)
                    state = PROCESSING;
                    //將處理過(guò)程放到線程池中執(zhí)行
                    pool.execute(new Processer());
                }
            }

            //處理數(shù)據(jù)線程
            class Processer implements Runnable {
                public void run() { processAndHandOff(); }
            }
            
            //處理數(shù)據(jù)并關(guān)閉
            synchronized void processAndHandOff() {
                //處理數(shù)據(jù)
                process();
                //標(biāo)記處理完成并標(biāo)記發(fā)送狀態(tài)
                state = SENDING; // 或者綁定其他操作
                //將SelectionKey的操作設(shè)置為寫入
                sk.interest(SelectionKey.OP_WRITE);
            }
        }

協(xié)調(diào)任務(wù)Coordinating Tasks

  • Handoffs 傳遞

  • 循環(huán)任務(wù)的啟用、觸發(fā)或調(diào)用下一個(gè)任務(wù)

  • 通常是最快的但同時(shí)也是脆弱的

  • 給每個(gè)處理程序觸發(fā)回調(diào)

  • 設(shè)置狀態(tài)窄俏、附加處理程序等等

  • 狀態(tài)模式

  • Queues 隊(duì)列

  • 比如跨階段傳遞buffers

  • Futures

  • 當(dāng)每個(gè)任務(wù)產(chǎn)生結(jié)果時(shí)觸發(fā)

  • 協(xié)調(diào)層位于連接或等待/通知之上

Using PooledExecutor 使用線程池執(zhí)行

  • 一個(gè)可優(yōu)化的工作線程池

  • 主方法執(zhí)行(Runnable r)

  • 控制

  • 任務(wù)隊(duì)列的類型(任何通道)

  • 最大線程數(shù)

  • 最小線程數(shù)

  • "Warm" 與按需加載線程

  • 保持活動(dòng)間隔蹂匹,直到空閑線程死亡

  • 如有必要,稍后將其替換為新的

  • 飽和策略

  • 阻塞凹蜈、下降限寞、生產(chǎn)運(yùn)行等

Multiple Reactor Threads 多個(gè)Reactor線程

  • 使用Reactor線程池

  • 用于匹配CPU和IO速率

  • 靜態(tài)或動(dòng)態(tài)構(gòu)造

  • 每個(gè)Reactor都有自己的選擇器,線程踪区,調(diào)度循環(huán)

  • 主接收器分配到專用的Reactor

image.png

Using other java.nio features 使用其他的java.nio特性

  • 一個(gè)Reactor對(duì)應(yīng)多個(gè)Selectors

  • 將不同的處理程序綁定到不同的IO事件

  • 調(diào)度需要仔細(xì)處理線程安全

  • 文件傳輸

  • 自動(dòng)化的文件到網(wǎng)絡(luò)或網(wǎng)絡(luò)到文件的復(fù)制

  • 內(nèi)存映射文件

  • 通過(guò)緩沖區(qū)訪問(wèn)文件

  • 直接訪問(wèn)緩沖區(qū)

  • 有可能實(shí)現(xiàn)零拷貝傳輸嗎

  • 但是有設(shè)置和完成的開銷

  • 最適合長(zhǎng)時(shí)間連接的應(yīng)用

Connection-Based Extensions 基礎(chǔ)連接的擴(kuò)展

  • 不能使用單個(gè)服務(wù)請(qǐng)求

  • 客戶端連接

  • 客戶端發(fā)送一系列消息/請(qǐng)求

  • 客戶端斷開連接

  • 舉例

  • 數(shù)據(jù)庫(kù)和事務(wù)監(jiān)控器

  • 多人游戲昆烁,聊天等

  • 可以擴(kuò)展基本的網(wǎng)絡(luò)服務(wù)模式

  • 處理許多相對(duì)長(zhǎng)連接的客戶

  • 跟蹤客戶端和會(huì)話狀態(tài)(包括丟棄)

  • 分布式部署服務(wù)

原文:Doug Lea Scalable IO in Java

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末吊骤,一起剝皮案震驚了整個(gè)濱河市缎岗,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌白粉,老刑警劉巖传泊,帶你破解...
    沈念sama閱讀 211,123評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異鸭巴,居然都是意外死亡眷细,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,031評(píng)論 2 384
  • 文/潘曉璐 我一進(jìn)店門鹃祖,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)溪椎,“玉大人,你說(shuō)我怎么就攤上這事恬口⌒6粒” “怎么了?”我有些...
    開封第一講書人閱讀 156,723評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵祖能,是天一觀的道長(zhǎng)歉秫。 經(jīng)常有香客問(wèn)我,道長(zhǎng)养铸,這世上最難降的妖魔是什么雁芙? 我笑而不...
    開封第一講書人閱讀 56,357評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮钞螟,結(jié)果婚禮上兔甘,老公的妹妹穿的比我還像新娘。我一直安慰自己鳞滨,他們只是感情好洞焙,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,412評(píng)論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著,像睡著了一般闽晦。 火紅的嫁衣襯著肌膚如雪扳碍。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,760評(píng)論 1 289
  • 那天仙蛉,我揣著相機(jī)與錄音笋敞,去河邊找鬼。 笑死荠瘪,一個(gè)胖子當(dāng)著我的面吹牛夯巷,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播哀墓,決...
    沈念sama閱讀 38,904評(píng)論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼趁餐,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了篮绰?” 一聲冷哼從身側(cè)響起后雷,我...
    開封第一講書人閱讀 37,672評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎吠各,沒(méi)想到半個(gè)月后臀突,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,118評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡贾漏,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,456評(píng)論 2 325
  • 正文 我和宋清朗相戀三年候学,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片纵散。...
    茶點(diǎn)故事閱讀 38,599評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡梳码,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出伍掀,到底是詐尸還是另有隱情掰茶,我是刑警寧澤,帶...
    沈念sama閱讀 34,264評(píng)論 4 328
  • 正文 年R本政府宣布硕盹,位于F島的核電站符匾,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏瘩例。R本人自食惡果不足惜啊胶,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,857評(píng)論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望垛贤。 院中可真熱鬧焰坪,春花似錦、人聲如沸聘惦。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,731評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至黔漂,卻和暖如春诫尽,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背炬守。 一陣腳步聲響...
    開封第一講書人閱讀 31,956評(píng)論 1 264
  • 我被黑心中介騙來(lái)泰國(guó)打工牧嫉, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人减途。 一個(gè)月前我還...
    沈念sama閱讀 46,286評(píng)論 2 360
  • 正文 我出身青樓酣藻,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親鳍置。 傳聞我的和親對(duì)象是個(gè)殘疾皇子辽剧,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,465評(píng)論 2 348