深入理解Node中可讀流和可寫流

12.jpg

流是什么?

這個字進入我腦海我第一時間想到的是一句詩祠斧,抽刀斷水水更流闻察,舉杯消愁...額,今天的主角是流琢锋。不好意思差點跑題了辕漂,嗯,流是一個抽象接口吴超,被 Node 中的很多對象所實現(xiàn)钮热。比如HTTP服務器request和response對象都是流。本人最近研究node烛芬,特意記下隧期,分享一下飒责。

對于流,官方文檔是這樣描述的:

流(stream)在 Node.js 中是處理流數(shù)據(jù)的抽象接口(abstract interface)

Node.js 中有四種基本的流類型:

  • Readable - 可讀的流 (例如 fs.createReadStream()).
  • Writable - 可寫的流 (例如 fs.createWriteStream()).
  • Duplex - 可讀寫的流 (例如 net.Socket).
  • Transform - 在讀寫過程中可以修改和變換數(shù)據(jù)的 Duplex 流 (例如 zlib.createDeflate())

今天主要分享的是node可讀流和可寫流

可寫流

先上個流程圖讓大家直觀了解整個流程


Writable.png
  • Open()后write()開始寫入
  • 判斷是否底層寫入和緩存區(qū)是否小于最高水位線同步或異步進行
  • 如果底層在寫入中放到緩存區(qū)里面仆潮,否則就調用底層_write()
  • 成功寫入后判斷緩存區(qū)是否有數(shù)據(jù)宏蛉,如果有在寫入則存加入緩沖區(qū)隊列中,緩沖區(qū)排空后觸發(fā) drain 事件性置;
  • 當一個流不處在 drain 的狀態(tài)拾并, 對 write() 的調用會緩存數(shù)據(jù)塊, 并且返回 false鹏浅。一旦所有當前所有緩存的數(shù)據(jù)塊都排空了(被操作系統(tǒng)接受來進行輸出)嗅义, 那么 'drain' 事件就會被觸發(fā),一旦 write() 返回 false隐砸, 在 'drain' 事件觸發(fā)前之碗, 不能寫入任何數(shù)據(jù)塊。

可寫流的模擬實現(xiàn):

let EventEmitter = require('events');

class WriteStream extends EventEmitter {
    constructor(path, options) {
        super(path, options);
        this.path = path;
        this.flags = options.flags || 'w';
        this.mode = options.mode || 0o666;
        this.start = options.start || 0;
        this.pos = this.start;//文件的寫入索引
        this.encoding = options.encoding || 'utf8';
        this.autoClose = options.autoClose;
        this.highWaterMark = options.highWaterMark || 16 * 1024;
        this.buffers = [];//緩存區(qū)季希,源碼用的鏈表
        this.writing = false;//表示內部正在寫入數(shù)據(jù)
        this.length = 0;//表示緩存區(qū)字節(jié)的長度
        this.open();
    }

    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                }
                return this.emit('error', err);
            }
            this.fd = fd;
            this.emit('open');
        });
    }

    //如果底層已經(jīng)在寫入數(shù)據(jù)的話褪那,則必須當前要寫入數(shù)據(jù)放在緩沖區(qū)里
    write(chunk, encoding, cb) {
        chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,this.encoding);
        let len = chunk.length;
        //緩存區(qū)的長度加上當前寫入的長度
        this.length += len;
        //判斷當前最新的緩存區(qū)是否小于最高水位線
        let ret = this.length < this.highWaterMark;
        if (this.writing) {//表示正在向底層寫數(shù)據(jù),則當前數(shù)據(jù)必須放在緩存區(qū)里
            this.buffers.push({
                chunk,
                encoding,
                cb
            });
        } else {//直接調用底層的寫入方法進行寫入
            //在底層寫完當前數(shù)據(jù)后要清空緩存區(qū)
            this.writing = true;
            this._write(chunk, encoding, () => this.clearBuffer());
        }
        return ret;
    }

    clearBuffer() {
        //取出緩存區(qū)中的第一個buffer
        //8 7
        let data = this.buffers.shift();
        if(data){
            this._write(data.chunk,data.encoding,()=>this.clearBuffer())
        }else{
            this.writing = false;
            //緩存區(qū)清空了
            this.emit('drain');
        }
    }

    _write(chunk, encoding, cb) {
       if(typeof this.fd != 'number'){
           return this.once('open',()=>this._write(chunk, encoding, cb));
       }
        fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,bytesWritten)=>{
            if(err){
                if(this.autoClose){
                    this.destroy();
                    this.emit('error',err);
                }
            }
            this.pos += bytesWritten;
            //寫入多少字母式塌,緩存區(qū)減少多少字節(jié)
            this.length -= bytesWritten;
            cb && cb();
       })
    }

    destroy() {
        fs.close(this.fd, () => {
            this.emit('close');
        })
    }
}
module.exports = WriteStream;

可讀流

可讀流事實上工作在下面兩種模式之一:flowing 和 paused

  • 在 flowing 模式下博敬, 可讀流自動從系統(tǒng)底層讀取數(shù)據(jù),并通過 EventEmitter 接口的事件盡快將數(shù)據(jù)提供給應用峰尝。
  • 在 paused 模式下偏窝,必須顯式調用 stream.read() 方法來從流中讀取數(shù)據(jù)片段。

flowing 流動模式

流動模式比較簡單武学,代碼實現(xiàn)如下:

let EventEmitter = require('events');
let fs = require('fs');
class ReadStream extends EventEmitter {
    constructor(path, options) {
        super(path, options);
        this.path = path;
        this.flags = options.flags || 'r';
        this.mode = options.mode || 0o666;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.pos = this.start = options.start || 0;
        this.end = options.end;
        this.encoding = options.encoding;
        this.flowing = null;
        this.buffer = Buffer.alloc(this.highWaterMark);
        this.open();//準備打開文件讀取
        //當給這個實例添加了任意的監(jiān)聽函數(shù)時會觸發(fā)newListener
        this.on('newListener',(type,listener)=>{
            //如果監(jiān)聽了data事件祭往,流會自動切換的流動模式
            if(type == 'data'){
              this.flowing = true;
              this.read();
            }
        });
    }
    read(){
        if(typeof this.fd != 'number'){
            return this.once('open',()=>this.read());
        }
        let howMuchToRead = this.end?Math.min(this.end - this.pos + 1,this.highWaterMark):this.highWaterMark;
        //this.buffer并不是緩存區(qū)
        console.log('howMuchToRead',howMuchToRead);
        fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,bytes)=>{//bytes是實際讀到的字節(jié)數(shù)
            if(err){
                if(this.autoClose)
                    this.destroy();
                return this.emit('error',err);
            }
            if(bytes){
                let data = this.buffer.slice(0,bytes);
                this.pos += bytes;
                data = this.encoding?data.toString(this.encoding):data;
                this.emit('data',data);
                if(this.end && this.pos > this.end){
                   return this.endFn();
                }else{
                    if(this.flowing)
                      this.read();
                }
            }else{
                return this.endFn();
            }

        })
    }
    endFn(){
        this.emit('end');
        this.destroy();
    }
    open() {
        fs.open(this.path,this.flags,this.mode,(err,fd)=>{
           if(err){
               if(this.autoClose){
                   this.destroy();
                   return this.emit('error',err);
               }
           }
           this.fd = fd;
           this.emit('open');
        })
    }
    destroy(){
        fs.close(this.fd,()=>{
            this.emit('close');
        });
    }
    pipe(dest){
        this.on('data',data=>{
            let flag = dest.write(data);
            if(!flag){
                this.pause();
            }
        });
        dest.on('drain',()=>{
            this.resume();
        });
    }
    //可讀流會進入流動模式,當暫停的時候劳淆,
    pause(){
        this.flowing = false;
    }
    resume(){
       this.flowing = true;
       this.read();
    }
}
module.exports = ReadStream;

paused 暫停模式:

暫停模式邏輯有點復雜, 畫了一張圖梳理一下

Readable.png

_read 方法是把數(shù)據(jù)存在緩存區(qū)中,因為是異步 的默赂,流是通過readable事件來通知消耗方的沛鸵。
說明一下,流中維護了一個緩存缆八,當緩存中的數(shù)據(jù)足夠多時曲掰,調用read()不會引起_read()的調用,即不需要向底層請求數(shù)據(jù)奈辰。state.highWaterMark是給緩存大小設置的一個上限閾值栏妖。如果取走n個數(shù)據(jù)后,緩存中保有的數(shù)據(jù)不足這個量奖恰,便會從底層取一次數(shù)據(jù)

暫停模式代碼模擬實現(xiàn):

let fs = require('fs');
let EventEmitter = require('events');

class ReadStream extends EventEmitter {
    constructor(path, options) {
        super(path, options);
        this.path = path;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.buffer = Buffer.alloc(this.highWaterMark);
        this.flags = options.flags || 'r';
        this.encoding = options.encoding;
        this.mode = options.mode || 0o666;
        this.start = options.start || 0;
        this.end = options.end;
        this.pos = this.start;
        this.autoClose = options.autoClose || true;
        this.bytesRead = 0;
        this.closed = false;
        this.flowing;
        this.needReadable = false;
        this.length = 0;
        this.buffers = [];
        this.on('end', function () {
            if (this.autoClose) {
                this.destroy();
            }
        });
        this.on('newListener', (type) => {
            if (type == 'data') {
                this.flowing = true;
                this.read();
            }
            if (type == 'readable') {
                this.read(0);
            }
        });
        this.open();
    }

    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                    return this.emit('error', err);
                }
            }
            this.fd = fd;
            this.emit('open');
        });
    }

    read(n) {
        if (typeof this.fd != 'number') {
            return this.once('open', () => this.read());
        }
        n = parseInt(n, 10);
        if (n != n) {
            n = this.length;
        }
        if (this.length == 0)
            this.needReadable = true;
        let ret;
        if (0 < n < this.length) {
            ret = Buffer.alloc(n);
            let b;
            let index = 0;
            while (null != (b = this.buffers.shift())) {
                for (let i = 0; i < b.length; i++) {
                    ret[index++] = b[i];
                    if (index == ret.length) {
                        this.length -= n;
                        b = b.slice(i + 1);
                        this.buffers.unshift(b);
                        break;
                    }
                }
            }
            if (this.encoding) ret = ret.toString(this.encoding);
        }
        //數(shù)據(jù)存緩存區(qū)中
        let _read = () => {
            let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
            fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
                if (err) {
                    return
                }
                let data;
                if (bytesRead > 0) {
                    data = this.buffer.slice(0, bytesRead);
                    this.pos += bytesRead;
                    this.length += bytesRead;
                    if (this.end && this.pos > this.end) {
                        if (this.needReadable) {
                            this.emit('readable');
                        }

                        this.emit('end');
                    } else {
                        this.buffers.push(data);
                        if (this.needReadable) {
                            this.emit('readable');
                            this.needReadable = false;
                        }
                    }
                } else {
                    if (this.needReadable) {
                        this.emit('readable');
                    }
                    return this.emit('end');
                }
            })
        }
        if (this.length == 0 || (this.length < this.highWaterMark)) {
            _read(0);
        }
        return ret;
    }

    destroy() {
        fs.close(this.fd, (err) => {
            this.emit('close');
        });
    }

    pause() {
        this.flowing = false;
    }

    resume() {
        this.flowing = true;
        this.read();
    }

    pipe(dest) {
        this.on('data', (data) => {
            let flag = dest.write(data);
            if (!flag) this.pause();
        });
        dest.on('drain', () => {
            this.resume();
        });
        this.on('end', () => {
            dest.end();
        });
    }
}
module.exports = ReadStream;

小弟我能力有限吊趾,歡迎各位大神指點宛裕,謝謝~

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市论泛,隨后出現(xiàn)的幾起案子揩尸,更是在濱河造成了極大的恐慌,老刑警劉巖屁奏,帶你破解...
    沈念sama閱讀 211,042評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件岩榆,死亡現(xiàn)場離奇詭異,居然都是意外死亡坟瓢,警方通過查閱死者的電腦和手機勇边,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,996評論 2 384
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來折联,“玉大人粒褒,你說我怎么就攤上這事≌赣梗” “怎么了怀浆?”我有些...
    開封第一講書人閱讀 156,674評論 0 345
  • 文/不壞的土叔 我叫張陵,是天一觀的道長怕享。 經(jīng)常有香客問我执赡,道長,這世上最難降的妖魔是什么函筋? 我笑而不...
    開封第一講書人閱讀 56,340評論 1 283
  • 正文 為了忘掉前任沙合,我火速辦了婚禮,結果婚禮上跌帐,老公的妹妹穿的比我還像新娘首懈。我一直安慰自己,他們只是感情好谨敛,可當我...
    茶點故事閱讀 65,404評論 5 384
  • 文/花漫 我一把揭開白布究履。 她就那樣靜靜地躺著,像睡著了一般脸狸。 火紅的嫁衣襯著肌膚如雪最仑。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,749評論 1 289
  • 那天炊甲,我揣著相機與錄音泥彤,去河邊找鬼。 笑死卿啡,一個胖子當著我的面吹牛吟吝,可吹牛的內容都是我干的。 我是一名探鬼主播颈娜,決...
    沈念sama閱讀 38,902評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼剑逃,長吁一口氣:“原來是場噩夢啊……” “哼浙宜!你這毒婦竟也來了?” 一聲冷哼從身側響起炕贵,我...
    開封第一講書人閱讀 37,662評論 0 266
  • 序言:老撾萬榮一對情侶失蹤梆奈,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后称开,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體亩钟,經(jīng)...
    沈念sama閱讀 44,110評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年鳖轰,在試婚紗的時候發(fā)現(xiàn)自己被綠了清酥。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,577評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡蕴侣,死狀恐怖焰轻,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情昆雀,我是刑警寧澤辱志,帶...
    沈念sama閱讀 34,258評論 4 328
  • 正文 年R本政府宣布,位于F島的核電站狞膘,受9級特大地震影響揩懒,放射性物質發(fā)生泄漏。R本人自食惡果不足惜挽封,卻給世界環(huán)境...
    茶點故事閱讀 39,848評論 3 312
  • 文/蒙蒙 一已球、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧辅愿,春花似錦智亮、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,726評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至癞埠,卻和暖如春状原,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背燕差。 一陣腳步聲響...
    開封第一講書人閱讀 31,952評論 1 264
  • 我被黑心中介騙來泰國打工遭笋, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留坝冕,地道東北人徒探。 一個月前我還...
    沈念sama閱讀 46,271評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像喂窟,于是被迫代替她去往敵國和親测暗。 傳聞我的和親對象是個殘疾皇子央串,可洞房花燭夜當晚...
    茶點故事閱讀 43,452評論 2 348

推薦閱讀更多精彩內容

  • stream 流是一個抽象接口,在 Node 里被不同的對象實現(xiàn)碗啄。例如 request to an HTTP se...
    明明三省閱讀 3,398評論 1 10
  • 背景 之前在開發(fā)ASP.NET的時候质和,根據(jù)源代碼依次追蹤整個程序的執(zhí)行過程,發(fā)現(xiàn)最重要的過程是基于一個管道流的稚字,像...
    小武song閱讀 746評論 0 0
  • 什么流 通俗的說就是一種饲宿,有起點和終點的字節(jié)數(shù)據(jù)傳輸手段,把數(shù)據(jù)從一個地方傳到另一個地方胆描。流(Stream)是一個...
    JOKER_HAN閱讀 1,135評論 0 3
  • Buffer Buffer的構成 Buffer對象類似數(shù)組瘫想,它的元素位16進制的兩位數(shù),即0到255的數(shù)值昌讲。主要是...
    人失格閱讀 1,810評論 0 0
  • 毫無懸念短绸,Sir今天要跟你們聊的车吹,是《愛樂之城》。 關于這部追平《泰坦尼克號》奧斯卡提名紀錄的電影醋闭,你們已聽過太多...
    Sir電影閱讀 1,094評論 2 22