我理解的NODEJS Stream

眾所周知者蠕,NODEJS在高并發(fā),I/O密集型應用操作的時候有很多優(yōu)勢屡贺,而這些都脫離不了“流”的支撐蠢棱。請求流锌杀,響應流甩栈,文件流,Socket流糕再,甚至console模塊都使用了流量没。而流的實現(xiàn),尤其是其內(nèi)部的實現(xiàn)突想,在整個NODEJS的學習中就很有學習的必要殴蹄。

流的模型可以總結為“生產(chǎn)者究抓,消費者”模型。流的一端生產(chǎn)數(shù)據(jù)(可以理解為從水龍頭放水)袭灯,另一端消費數(shù)據(jù)(水放出來后你干嘛用不管刺下,消費就好了)。當然稽荧,其內(nèi)部代碼實現(xiàn)會考慮很多細節(jié)的地方橘茉,這些可以通過調(diào)試進入源碼查看很多細節(jié)的地方。

在源碼中跟流相關的模塊有:

  • lib/module.js
  • lib/stream_readable.js
  • lib/stream_writable.js
  • lib/stream_transform.js
  • lib/stream_duplex.js
    源碼非常清晰姨丈,這就對應流的四種類型畅卓,Readable流,Writable流蟋恬,Transform流翁潘,Duplex流。
    其中Readable和Writable是重點歼争,這兩個搞明白拜马,Transform和Duplex就比較簡單了。

Readable Stream
Readable Stream有兩種模式矾飞,一種是Flowing Mode一膨,流動模式;另外一種是Paused Mode洒沦,暫停模式豹绪。
切換到流動模式的方式有:

  • 監(jiān)聽data時間
    rs.on("data", (chunk)=>{});
  • 調(diào)用stream.resume方法
  • 調(diào)用stream.pipe方法將數(shù)據(jù)發(fā)送給writable stream

切換到暫停模式的方法有:

  • 調(diào)用stream.pause方法
  • 如果存在管道,調(diào)用stream.unpipe方法

tip: 這兩種流是可以隨時切換的

流動模式和暫停模式有什么區(qū)別申眼,為什么要這么設計瞒津。
流動模式就是像流水一樣源源不斷的讀取數(shù)據(jù)(注意不是到緩存對象),不管你消費不消費括尸。
暫停模式可以暫時不讀取數(shù)據(jù)巷蚪,關閉水龍頭。
有一個用的比較多的詞語叫背壓濒翻,一般來說屁柏,讀的速度會比寫入的速度快,如果不暫停有送,還是源源不斷的讀取數(shù)據(jù)淌喻,會造成內(nèi)存過大,消耗性能雀摘,這個時候最好的方式是消費多少裸删,讀取多少,由此引申出管道的概念阵赠。
最好的管道就是生產(chǎn)和消費同步涯塔,如果讀的過快肌稻,先暫停讀取,有需要再通知讀取即可匕荸。
從上面的語義描述可以看出爹谭,流是基于事件的,事件在這里承擔著消息的注冊和發(fā)送榛搔。

上面的都是概念相關旦棉,現(xiàn)在來一個簡單版的可讀流,可寫流药薯。代碼雖然簡化了很多绑洛,但是對于理解整個流的流程,會非常有幫助童本。

先來看流動模式的可讀流

let EventEmitter = require('events');
let fs = require('fs');
class ReadStream extends EventEmitter {
    constructor(path, options) {
        super(path, options);
        this.path = path;
        this.flags = options.flags || 'r';
        this.mode = options.mode || 0o666;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.pos = this.start = options.start || 0;
        this.end = options.end;
        this.encoding = options.encoding;
        this.flowing = null;
        this.buffer = Buffer.alloc(this.highWaterMark);
        this.open();//準備打開文件讀取
        //當給這個實例添加了任意的監(jiān)聽函數(shù)時會觸發(fā)newListener
        this.on('newListener',(type,listener)=>{
            //如果監(jiān)聽了data事件真屯,流會自動切換的流動模式
            if(type == 'data'){
              this.flowing = true;
              this.read();
            }
        });
    }
    read(){
        if(typeof this.fd != 'number'){
            return this.once('open',()=>this.read());
        }
        let howMuchToRead = this.end?Math.min(this.end - this.pos + 1,this.highWaterMark):this.highWaterMark;
        //this.buffer并不是緩存區(qū)
        console.log('howMuchToRead',howMuchToRead);
        fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,bytes)=>{//bytes是實際讀到的字節(jié)數(shù)
            if(err){
                if(this.autoClose)
                    this.destroy();
                return this.emit('error',err);
            }
            if(bytes){
                let data = this.buffer.slice(0,bytes);
                this.pos += bytes;
                data = this.encoding?data.toString(this.encoding):data;
                this.emit('data',data);
                if(this.end && this.pos > this.end){
                   return this.endFn();
                }else{
                    if(this.flowing)
                      this.read();
                }
            }else{
                return this.endFn();
            }

        })
    }
    endFn(){
        this.emit('end');
        this.destroy();
    }
    open() {
        fs.open(this.path,this.flags,this.mode,(err,fd)=>{
           if(err){
               if(this.autoClose){
                   this.destroy();
                   return this.emit('error',err);
               }
           }
           this.fd = fd;
           this.emit('open');
        })
    }
    destroy(){
        fs.close(this.fd,()=>{
            this.emit('close');
        });
    }
    pipe(dest){
        this.on('data',data=>{
            let flag = dest.write(data);
            if(!flag){
                this.pause();
            }
        });
        dest.on('drain',()=>{
            this.resume();
        });
    }
    //可讀流會進入流動模式,當暫停的時候穷娱,
    pause(){
        this.flowing = false;
    }
    resume(){
       this.flowing = true;
       this.read();
    }
}
module.exports = ReadStream;

暫停模式的可讀流

let fs = require('fs');
let EventEmitter = require('events');

class ReadStream extends EventEmitter {
    constructor(path, options) {
        super(path, options);
        this.path = path;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.buffer = Buffer.alloc(this.highWaterMark);
        this.flags = options.flags || 'r';
        this.encoding = options.encoding;
        this.mode = options.mode || 0o666;
        this.start = options.start || 0;
        this.end = options.end;
        this.pos = this.start;
        this.autoClose = options.autoClose || true;
        this.bytesRead = 0;
        this.closed = false;
        this.flowing;
        this.needReadable = false;
        this.length = 0;
        this.buffers = [];
        this.on('end', function () {
            if (this.autoClose) {
                this.destroy();
            }
        });
        this.on('newListener', (type) => {
            if (type == 'data') {
                this.flowing = true;
                this.read();
            }
            if (type == 'readable') {
                this.read(0);
            }
        });
        this.open();
    }

    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                    return this.emit('error', err);
                }
            }
            this.fd = fd;
            this.emit('open');
        });
    }

    read(n) {
        if (typeof this.fd != 'number') {
            return this.once('open', () => this.read());
        }
        n = parseInt(n, 10);
        if (n != n) {
            n = this.length;
        }
        if (this.length == 0)
            this.needReadable = true;
        let ret;
        if (0 < n < this.length) {
            ret = Buffer.alloc(n);
            let b;
            let index = 0;
            while (null != (b = this.buffers.shift())) {
                for (let i = 0; i < b.length; i++) {
                    ret[index++] = b[i];
                    if (index == ret.length) {
                        this.length -= n;
                        b = b.slice(i + 1);
                        this.buffers.unshift(b);
                        break;
                    }
                }
            }
            if (this.encoding) ret = ret.toString(this.encoding);
        }

        let _read = () => {
            let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
            fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
                if (err) {
                    return
                }
                let data;
                if (bytesRead > 0) {
                    data = this.buffer.slice(0, bytesRead);
                    this.pos += bytesRead;
                    this.length += bytesRead;
                    if (this.end && this.pos > this.end) {
                        if (this.needReadable) {
                            this.emit('readable');
                        }

                        this.emit('end');
                    } else {
                        this.buffers.push(data);
                        if (this.needReadable) {
                            this.emit('readable');
                            this.needReadable = false;
                        }

                    }
                } else {
                    if (this.needReadable) {
                        this.emit('readable');
                    }
                    return this.emit('end');
                }
            })
        }
        if (this.length == 0 || (this.length < this.highWaterMark)) {
            _read(0);
        }
        return ret;
    }

    destroy() {
        fs.close(this.fd, (err) => {
            this.emit('close');
        });
    }

    pause() {
        this.flowing = false;
    }

    resume() {
        this.flowing = true;
        this.read();
    }

    pipe(dest) {
        this.on('data', (data) => {
            let flag = dest.write(data);
            if (!flag) this.pause();
        });
        dest.on('drain', () => {
            this.resume();
        });
        this.on('end', () => {
            dest.end();
        });
    }
}
module.exports = ReadStream;

可寫流

let fs = require('fs');
let EventEmitter = require('events');

class WriteStream extends EventEmitter {
    constructor(path, options) {
        super(path, options);
        this.path = path;
        this.flags = options.flags || 'w';
        this.mode = options.mode || 0o666;
        this.start = options.start || 0;
        this.pos = this.start;//文件的寫入索引
        this.encoding = options.encoding || 'utf8';
        this.autoClose = options.autoClose;
        this.highWaterMark = options.highWaterMark || 16 * 1024;
        this.buffers = [];//緩存區(qū)
        this.writing = false;//表示內(nèi)部正在寫入數(shù)據(jù)
        this.length = 0;//表示緩存區(qū)字節(jié)的長度
        this.open();
    }

    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                }
                return this.emit('error', err);
            }
            this.fd = fd;
            this.emit('open');
        });
    }

    //如果底層已經(jīng)在寫入數(shù)據(jù)的話绑蔫,則必須當前要寫入數(shù)據(jù)放在緩沖區(qū)里
    write(chunk, encoding, cb) {
        chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,this.encoding);
        let len = chunk.length;
        //緩存區(qū)的長度加上當前寫入的長度
        this.length += len;
        //判斷當前最新的緩存區(qū)是否小于最高水位線
        let ret = this.length < this.highWaterMark;
        if (this.writing) {//表示正在向底層寫數(shù)據(jù),則當前數(shù)據(jù)必須放在緩存區(qū)里
            this.buffers.push({
                chunk,
                encoding,
                cb
            });
        } else {//直接調(diào)用底層的寫入方法進行寫入
            //在底層寫完當前數(shù)據(jù)后要清空緩存區(qū)
            this.writing = true;
            this._write(chunk, encoding, () => this.clearBuffer());
        }
        return ret;
    }

    clearBuffer() {
        //取出緩存區(qū)中的第一個buffer
        //8 7
        let data = this.buffers.shift();
        if(data){
            this._write(data.chunk,data.encoding,()=>this.clearBuffer())
        }else{
            this.writing = false;
            //緩存區(qū)清空了
            this.emit('drain');
        }
    }

    _write(chunk, encoding, cb) {
       if(typeof this.fd != 'number'){
           return this.once('open',()=>this._write(chunk, encoding, cb));
       }
        fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,bytesWritten)=>{
            if(err){
                if(this.autoClose){
                    this.destroy();
                    this.emit('error',err);
                }
            }
            this.pos += bytesWritten;
            //寫入多少字母泵额,緩存區(qū)減少多少字節(jié)
            this.length -= bytesWritten;
            cb && cb();
       })
    }

    destroy() {
        fs.close(this.fd, () => {
            this.emit('close');
        })
    }
}
module.exports = WriteStream;
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末配深,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子嫁盲,更是在濱河造成了極大的恐慌篓叶,老刑警劉巖,帶你破解...
    沈念sama閱讀 210,914評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件羞秤,死亡現(xiàn)場離奇詭異缸托,居然都是意外死亡,警方通過查閱死者的電腦和手機瘾蛋,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,935評論 2 383
  • 文/潘曉璐 我一進店門俐镐,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人哺哼,你說我怎么就攤上這事佩抹。” “怎么了取董?”我有些...
    開封第一講書人閱讀 156,531評論 0 345
  • 文/不壞的土叔 我叫張陵棍苹,是天一觀的道長。 經(jīng)常有香客問我甲葬,道長廊勃,這世上最難降的妖魔是什么懈贺? 我笑而不...
    開封第一講書人閱讀 56,309評論 1 282
  • 正文 為了忘掉前任经窖,我火速辦了婚禮坡垫,結果婚禮上,老公的妹妹穿的比我還像新娘画侣。我一直安慰自己冰悠,他們只是感情好,可當我...
    茶點故事閱讀 65,381評論 5 384
  • 文/花漫 我一把揭開白布配乱。 她就那樣靜靜地躺著溉卓,像睡著了一般。 火紅的嫁衣襯著肌膚如雪搬泥。 梳的紋絲不亂的頭發(fā)上桑寨,一...
    開封第一講書人閱讀 49,730評論 1 289
  • 那天,我揣著相機與錄音忿檩,去河邊找鬼尉尾。 笑死,一個胖子當著我的面吹牛燥透,可吹牛的內(nèi)容都是我干的沙咏。 我是一名探鬼主播,決...
    沈念sama閱讀 38,882評論 3 404
  • 文/蒼蘭香墨 我猛地睜開眼班套,長吁一口氣:“原來是場噩夢啊……” “哼肢藐!你這毒婦竟也來了?” 一聲冷哼從身側響起吱韭,我...
    開封第一講書人閱讀 37,643評論 0 266
  • 序言:老撾萬榮一對情侶失蹤吆豹,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后理盆,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體瞻讽,經(jīng)...
    沈念sama閱讀 44,095評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,448評論 2 325
  • 正文 我和宋清朗相戀三年熏挎,在試婚紗的時候發(fā)現(xiàn)自己被綠了耿焊。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片软驰。...
    茶點故事閱讀 38,566評論 1 339
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出截歉,到底是詐尸還是另有隱情,我是刑警寧澤炸庞,帶...
    沈念sama閱讀 34,253評論 4 328
  • 正文 年R本政府宣布最欠,位于F島的核電站,受9級特大地震影響积担,放射性物質(zhì)發(fā)生泄漏陨晶。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,829評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望先誉。 院中可真熱鬧湿刽,春花似錦、人聲如沸褐耳。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,715評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽铃芦。三九已至雅镊,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間刃滓,已是汗流浹背仁烹。 一陣腳步聲響...
    開封第一講書人閱讀 31,945評論 1 264
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留咧虎,地道東北人晃危。 一個月前我還...
    沈念sama閱讀 46,248評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像老客,于是被迫代替她去往敵國和親僚饭。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,440評論 2 348

推薦閱讀更多精彩內(nèi)容