Node中stream的深度感知

背景

之前在開發(fā)ASP.NET的時候阻桅,根據(jù)源代碼依次追蹤整個程序的執(zhí)行過程楣黍,發(fā)現(xiàn)最重要的過程是基于一個管道流的附迷,像水管一樣惧互,依次處理管道流中的十幾個事件拯刁,當時對流的認知是四個字,依次執(zhí)行痘煤。那么現(xiàn)在做Node的開發(fā)哩治,對于Node中的流是另四個字,那就是源源不斷艾猜。本篇文章主要目的是帶大家手寫可讀流與可寫流买喧。

簡介

在Node中,請求流匆赃,響應流淤毛,文件流等都是基于stream模塊封裝的。簡單的理解算柳,流就是將大塊的東西低淡,分小塊依次處理。就像你需要10kg的水瞬项,水管就一點點的源源不斷的流出來給你蔗蹋。又如在程序中

fs.readFileSync('/demo.txt', {encoding:'utf8'});fs.writeFileSync('/demo.txt', data);

以上兩個方法是把文件內(nèi)容全部讀入內(nèi)存,然后再寫入文件囱淋,但是如果文件過大就會出現(xiàn)問題了猪杭,內(nèi)存容易爆掉。這里就需要用到流了妥衣,一點點的讀取或者寫入皂吮。

分類

  • Readable - 可讀的流 (例如 fs.createReadStream()).
  • Writable - 可寫的流 (例如 fs.createWriteStream()).
  • Duplex - 可讀寫的流 (例如 net.Socket).
  • Transform - 在讀寫過程中可以修改和變換數(shù)據(jù)的 Duplex 流 (例如 zlib.createDeflate()).

Readable - 可讀的流介紹與實現(xiàn)

可讀流分為兩種模式:flowing 和 paused,并且兩種模式可以相互轉(zhuǎn)換

1.在 flowing 模式下税手, 可讀流自動從系統(tǒng)底層讀取數(shù)據(jù)蜂筹,并通過 EventEmitter 接口的事件盡快將數(shù)據(jù)提供給應用。

2.在 paused 模式下冈止,必須顯式調(diào)用 stream.read() 方法來從流中讀取數(shù)據(jù)片段狂票。

3.所有初始工作模式為 paused 的 Readable 流,可以通過下面三種途徑切換到 flowing 模式:

  • 監(jiān)聽 'data' 事件
  • 調(diào)用 stream.resume() 方法
  • 調(diào)用 stream.pipe() 方法將數(shù)據(jù)發(fā)送到 Writable

4.可讀流可以通過下面途徑切換到 paused 模式:

  • 如果不存在管道目標(pipe destination)熙暴,可以通過調(diào)用 stream.pause() 方法實現(xiàn)闺属。
  • 如果存在管道目標,可以通過取消 'data' 事件監(jiān)聽周霉,并調(diào)用 stream.unpipe() 方法移除所有管道目標來實現(xiàn)掂器。

5.為了便于理解,這里分開寫兩種模式俱箱,下面為flowing模式基本實現(xiàn)

  • 定義類首先要繼承自EventEmitter国瓮,因為要發(fā)射監(jiān)聽事件,在構(gòu)造函數(shù)中依次定義各個參數(shù),其中需要說明的是this.flowing 用于切換模式乃摹,this.buffer并非是緩存禁漓,因為流動模式是默認不走緩存的,這個buffer是讀取的時候fs.read的一個參數(shù)孵睬,所有的事件監(jiān)聽到都會執(zhí)行newListener播歼。當該類被構(gòu)造的時候就打開文件,并監(jiān)聽事件掰读,如果是data秘狞,默認走流動模式開始讀取。
class ReadStream extends EventEmitter{
    constructor(path,options){
        super(path,options);
        this.path = path;//寫入路徑
        this.flags = options.flags || 'r'; //操作修飾符
        this.mode = options.mode || 0o666; //權(quán)限
        this.autoClose = options.autoClose;//是否自動關(guān)閉
        this.highWaterMark = options.highWaterMark || 64 * 1024; //默認64k
        this.pos = this.start = options.start || 0;//起始位置
        this.end = options.end;//結(jié)束位置
        this.encoding = options.encoding;//編碼
        this.flowing = null;//流動模式
        this.buffer = Buffer.alloc(this.highWaterMark);//讀取的buffer 不是緩存
        this.open();
        this.on('newListener',(type,listener)=>{
            if (type == 'data') {
                this.flowing = true;
                this.read();
            }
        })
    }
  • open方法 打開傳入路徑的文件 如果出錯并且設置自動關(guān)閉屬性蹈集,直接關(guān)閉打開烁试,發(fā)射error事件。如果成功了拢肆,發(fā)射open事件减响,供讀取方法接收。
 open() {
        fs.open(this.path,this.flags,this.mode,(err,fd)=>{
           if(err){
               if(this.autoClose){
                   this.destroy();
                   return this.emit('error',err);
               }
           }
           this.fd = fd;
           this.emit('open');
        })
    }
  • 最重要的方法Read郭怪,一進入就需要判斷文件是否已經(jīng)打開了辩蛋,因為文件打開是異步的一個過程,此時可能并未打開移盆,如果沒有打開就監(jiān)聽發(fā)射的open事件,然后在回調(diào)函數(shù)中進行read方法的調(diào)用伤为。其中核心是每次需要讀多少咒循,這個量由傳入的開始結(jié)束位置已經(jīng)最高水位線決定的。最高水位線代表依次最多讀取多少绞愚,默認值是64kb叙甸。那么每次讀取的值 howMuchToRead = this.end?Math.min(this.end - this.pos + 1,this.highWaterMark):this.highWaterMark;
read(){
        if(typeof this.fd != 'number'){
            return this.once('open',()=>this.read());
        }
        let howMuchToRead = this.end?Math.min(this.end - this.pos + 1,this.highWaterMark):this.highWaterMark;
        fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,bytes)=>{//bytes是實際讀到的字節(jié)數(shù)
            if(err){
                if(this.autoClose)
                    this.destroy();
                return this.emit('error',err);
            }
            if(bytes){
                let data = this.buffer.slice(0,bytes);
                this.pos += bytes;
                data = this.encoding?data.toString(this.encoding):data;
                this.emit('data',data);
                if(this.end && this.pos > this.end){
                   return this.endFn();
                }else{
                    if(this.flowing)
                      this.read();
                }
            }else{
                return this.endFn();
            }

        })
    }
  • pipe方法實現(xiàn) pipe方法就是邊讀取邊寫入,控制讀寫速度位衩,當可寫流的寫入返回false時裆蒸,暫停讀取,當可寫流觸發(fā)drain事件后糖驴,恢復讀取僚祷。
pipe(ws){
        this.on('data',data =>{
            let flag = ws.write(data);
            if (!flag) {
                this.pause();
            }
        })
        ws.on('drain',()=>{
            this.resume();
        })
    }
    pause(){
        this.flowing  = false;
    }
    resume(){
        this.flowing  = true;
        this.read();
    }
  1. 暫停模式 暫停模式不同的是需要走緩存,并且監(jiān)聽的是readable事件(這里我只貼出具有差異性的代碼)
  • 在構(gòu)造函數(shù)中需要加入this.buffers = [];(源碼中為了提高效率贮缕,使用的是鏈表結(jié)構(gòu)辙谜,這里我用數(shù)組代替),以及readable事件的監(jiān)聽感昼。
 this.on('newListener', (type) => {
            if (type == 'data') {
                this.flowing = true;
                this.read();
            }
            if (type == 'readable') {
                this.read(0);
            }
        });
  • 這里read方法需要傳入一個n装哆,表示需要讀取的字節(jié)數(shù)。如果判斷緩存的大小,即this.length,如果this.length == 0 || this.length < this.highWaterMark 蜕琴,執(zhí)行_read()方法,此時執(zhí)行的n為0
 let _read = () => {
            let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
            fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
                if (err) {
                    return
                }
                let data;
                if (bytesRead > 0) {
                    data = this.buffer.slice(0, bytesRead);
                    this.pos += bytesRead;
                    this.length += bytesRead;
                    if (this.end && this.pos > this.end) {
                        if (this.needReadable) {
                            this.emit('readable');
                        }

                        this.emit('end');
                    } else {
                        this.buffers.push(data);
                        if (this.needReadable) {
                            this.emit('readable');
                            this.needReadable = false;
                        }

                    }
                } else {
                    if (this.needReadable) {
                        this.emit('readable');
                    }
                    return this.emit('end');
                }
            })
        }
  • 如果傳入的n值 0 < n < this.length萍桌,走以下邏輯,即從緩存區(qū)中讀取相應的字節(jié)數(shù)進行讀取
  if (0 < n < this.length) {
            ret = Buffer.alloc(n);
            let b;
            let index = 0;
            while (null != (b = this.buffers.shift())) {
                for (let i = 0; i < b.length; i++) {
                    ret[index++] = b[i];
                    if (index == ret.length) {
                        this.length -= n;
                        b = b.slice(i + 1);
                        this.buffers.unshift(b);
                        break;
                    }
                }
            }
            if (this.encoding) ret = ret.toString(this.encoding);
        }

Writable - 可讀的流介紹與實現(xiàn)

  • 構(gòu)造函數(shù)跟上面差距不大,有一個this.buffer的緩存區(qū)凌简,并且最高水位線默認為16k
  //構(gòu)造函數(shù)
    constructor(path,options){
        super(path,options);
        this.path = path; //寫入路徑
        this.flags = options.flags || 'w';//操作修飾符
        this.mode = options.mode || 0o666;//權(quán)限
        this.start = options.start || 0;//寫入的起始位置
        this.pos = this.start;//文件的寫入索引
        this.encoding = options.encoding || 'utf8';//編碼
        this.autoClose = options.autoClose;//自動關(guān)閉
        this.highWaterMark = options.highWaterMark || 16 * 1024; //默認最高水位線16k
        this.buffers = [];//緩存區(qū) 源碼里面是鏈表
        this.writing = false;//標識內(nèi)部正在寫入數(shù)據(jù)
        this.length = 0;//標識緩存區(qū)字節(jié)的長度
        this.open();//默認一創(chuàng)建就打開文件
    }
  • open即打開文件是一樣的上炎,這里不再描述。最重要的write方法号醉,該方法有一個返回值反症,標識緩存區(qū)的長度是否超過了最高水位線,特別注意的是如果超過了也會寫入進去的畔派,因為會放入到緩存區(qū)中铅碍,等到drain事件觸發(fā),再接著寫入线椰,具體寫入是執(zhí)行的_write方法胞谈,其中有一個this.writing 標識是否正在寫入,如果正在寫入憨愉,則放入緩存區(qū)中烦绳,在清空緩存區(qū)的時候依次取出寫入,即以下的clearBuffer方法配紫,此方法中當緩存區(qū)清空了以后觸發(fā)drain事件径密,這里需要特殊說明一下,如果緩存區(qū)從未滿過躺孝,是不會觸發(fā)這個事件的享扔。
write(chunk,encoding,callback){
        chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk,this.encoding);//此方法只吸入buffer或者字符串
        this.length += chunk.length;//當前緩存區(qū)的長度
        if (this.writing) {//如果正在寫入數(shù)據(jù) 則需要把數(shù)據(jù)放入緩存區(qū)里面
            this.buffers.push({
                chunk,
                encoding,
                callback
            })
        } else { //如果當前空閑 直接調(diào)用底層寫入的方法進行寫入 并且在寫完以后 清空緩存區(qū)
            this.writing = true;
            this._write(chunk,encoding,()=>{
                callback&&callback();
                this.clearBuffer();
            })
        }

        //write方法有一個返回值 表示當前緩存區(qū)是否超過了最高水位線 即是否能繼續(xù)寫入
        return this.length < this.highWaterMark;
    }

    _write(chunk,encoding,callback){
        if (typeof this.fd != 'number') { //因為是異步的 文件可能在這個時候并未打開
            return this.once('open',()=>this._write(chunk, encoding, callback));
        }
        fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,bytesWritten)=>{
            if (err) {
                if (this.autoClose) {
                    this.destory();
                }
               return this.emit('error',err);
            }
            this.pos += bytesWritten;

            this.length -= bytesWritten;
            callback&&callback();
        })
    }
    clearBuffer(){
        let data = this.buffers.shift();
        if(data){
                this._write(data.chunk,data.encoding,()=>{
                    data.callback && data.callback();
                    this.clearBuffer();
                })
            }else{
                this.writing = false;
                //緩存區(qū)清空了 緩存區(qū)如果沒有滿過 是不會觸發(fā)這個事件的
                this.emit('drain');
            }
    }

參考鏈接

  1. Node.js API文檔
  2. 深入理解 Node Stream 內(nèi)部機制
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市植袍,隨后出現(xiàn)的幾起案子惧眠,更是在濱河造成了極大的恐慌,老刑警劉巖于个,帶你破解...
    沈念sama閱讀 210,978評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件氛魁,死亡現(xiàn)場離奇詭異,居然都是意外死亡厅篓,警方通過查閱死者的電腦和手機秀存,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,954評論 2 384
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來羽氮,“玉大人应又,你說我怎么就攤上這事》啵” “怎么了株扛?”我有些...
    開封第一講書人閱讀 156,623評論 0 345
  • 文/不壞的土叔 我叫張陵尤筐,是天一觀的道長。 經(jīng)常有香客問我洞就,道長盆繁,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,324評論 1 282
  • 正文 為了忘掉前任旬蟋,我火速辦了婚禮油昂,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘倾贰。我一直安慰自己冕碟,他們只是感情好,可當我...
    茶點故事閱讀 65,390評論 5 384
  • 文/花漫 我一把揭開白布匆浙。 她就那樣靜靜地躺著安寺,像睡著了一般。 火紅的嫁衣襯著肌膚如雪首尼。 梳的紋絲不亂的頭發(fā)上挑庶,一...
    開封第一講書人閱讀 49,741評論 1 289
  • 那天,我揣著相機與錄音软能,去河邊找鬼迎捺。 笑死,一個胖子當著我的面吹牛查排,可吹牛的內(nèi)容都是我干的凳枝。 我是一名探鬼主播,決...
    沈念sama閱讀 38,892評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼跋核,長吁一口氣:“原來是場噩夢啊……” “哼范舀!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起了罪,我...
    開封第一講書人閱讀 37,655評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎聪全,沒想到半個月后泊藕,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,104評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡难礼,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年娃圆,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蛾茉。...
    茶點故事閱讀 38,569評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡讼呢,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出谦炬,到底是詐尸還是另有隱情悦屏,我是刑警寧澤节沦,帶...
    沈念sama閱讀 34,254評論 4 328
  • 正文 年R本政府宣布,位于F島的核電站础爬,受9級特大地震影響甫贯,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜看蚜,卻給世界環(huán)境...
    茶點故事閱讀 39,834評論 3 312
  • 文/蒙蒙 一叫搁、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧供炎,春花似錦渴逻、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,725評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至纽竣,卻和暖如春墓贿,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背蜓氨。 一陣腳步聲響...
    開封第一講書人閱讀 31,950評論 1 264
  • 我被黑心中介騙來泰國打工聋袋, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人穴吹。 一個月前我還...
    沈念sama閱讀 46,260評論 2 360
  • 正文 我出身青樓幽勒,卻偏偏與公主長得像,于是被迫代替她去往敵國和親港令。 傳聞我的和親對象是個殘疾皇子啥容,可洞房花燭夜當晚...
    茶點故事閱讀 43,446評論 2 348