深入nodejs中流(stream)的理解

流的基本概念及理解

流是一種數(shù)據(jù)傳輸手段,是有順序的融撞,有起點(diǎn)和終點(diǎn)勉耀,比如你要把數(shù)據(jù)從一個(gè)地方傳到另外一個(gè)地方
流非常重要指煎,gulp,webpack,HTTP里的請求和響應(yīng)便斥,http里的socket都是流至壤,包括后面壓縮,加密等

流為什么這么好用還這么重要呢枢纠?

  • 因?yàn)橛袝r(shí)候我們不關(guān)心文件的主體內(nèi)容像街,只關(guān)心能不能取到數(shù)據(jù),取到數(shù)據(jù)之后怎么進(jìn)行處理
  • 對(duì)于小型的文本文件晋渺,我們可以把文件內(nèi)容全部讀入內(nèi)存镰绎,然后再寫入文件,比如grunt-file-copy
  • 對(duì)于體積較大的二進(jìn)制文件木西,比如音頻畴栖、視頻文件,動(dòng)輒幾個(gè)GB大小八千,如果使用這種方法吗讶,很容易使內(nèi)存“爆倉”。
  • 理想的方法應(yīng)該是讀一部分恋捆,寫一部分照皆,不管文件有多大,只要時(shí)間允許鸠信,總會(huì)處理完成纵寝,這里就需要用到流的概念

流是一個(gè)抽象接口,被Node中很多對(duì)象所實(shí)現(xiàn),比如HTTP服務(wù)器request和response對(duì)象都是流

Node.js 中有四種基本的流類型:

  • Readable - 可讀的流 (例如 fs.createReadStream()).
  • Writable - 可寫的流 (例如 fs.createWriteStream()).
  • Duplex - 可讀寫的流 (例如 net.Socket).
  • Transform - 在讀寫過程中可以修改和變換數(shù)據(jù)的 Duplex 流 (例如 zlib.createDeflate()).

可以通過 require('stream') 加載 Stream 基類爽茴。其中包括了 Readable 流葬凳、Writable 流、Duplex 流和 Transform 流的基類

Readable streams可讀流

可讀流(Readable streams)是對(duì)提供數(shù)據(jù)的 源頭(source)的抽象
可讀流的例子包括:

  • HTTP responses, on the client :客戶端請求
  • HTTP requests, on the server :服務(wù)端請求
  • fs read streams :讀文件
  • zlib streams :壓縮
  • crypto streams :加密
  • TCP sockets :TCP協(xié)議
  • child process stdout and stderr :子進(jìn)程標(biāo)準(zhǔn)輸出和錯(cuò)誤輸出
  • process.stdin :標(biāo)準(zhǔn)輸入

所有的 Readable 都實(shí)現(xiàn)了 stream.Readable 類定義的接口

通過流讀取數(shù)據(jù)

  • 用Readable創(chuàng)建對(duì)象readable后室奏,便得到了一個(gè)可讀流
  • 如果實(shí)現(xiàn)_read方法火焰,就將流連接到一個(gè)底層數(shù)據(jù)源
  • 流通過調(diào)用_read向底層請求數(shù)據(jù),底層再調(diào)用流的push方法將需要的數(shù)據(jù)傳遞過來
  • 當(dāng)readable連接了數(shù)據(jù)源后胧沫,下游便可以調(diào)用readable.read(n)向流請求數(shù)據(jù)昌简,同時(shí)監(jiān)聽readable的data事件來接收取到的數(shù)據(jù)

下面簡單舉個(gè)可讀流的例子:

  • 監(jiān)聽可讀流的data事件,當(dāng)你一旦開始監(jiān)聽data事件的時(shí)候绒怨,流就可以讀文件的內(nèi)容并且發(fā)射data纯赎,讀一點(diǎn)發(fā)射一點(diǎn)讀一點(diǎn)發(fā)射一點(diǎn)
  • 默認(rèn)情況下,當(dāng)你監(jiān)聽data事件之后南蹂,會(huì)不停的讀數(shù)據(jù)犬金,然后觸發(fā)data事件,觸發(fā)完data事件后再次讀數(shù)據(jù)
  • 讀的時(shí)候不是把文件整體內(nèi)容讀出來再發(fā)射出來的六剥,而且設(shè)置一個(gè)緩沖區(qū)晚顷,大小默認(rèn)是64K,比如文件是128K疗疟,先讀64K發(fā)射出來该默,再讀64K在發(fā)射出來,會(huì)發(fā)射兩次
  • 緩沖區(qū)的大小可以通過highWaterMark來設(shè)置
let fs = require('fs');
//通過創(chuàng)建一個(gè)可讀流
let rs = fs.createReadStream('./1.txt',{
    flags:'r',//我們要對(duì)文件進(jìn)行何種操作
    mode:0o666,//權(quán)限位
    encoding:'utf8',//不傳默認(rèn)為buffer策彤,顯示為字符串
    start:3,//從索引為3的位置開始讀
    //這是我的見過唯一一個(gè)包括結(jié)束索引的
    end:8,//讀到索引為8結(jié)束
    highWaterMark:3//緩沖區(qū)大小
});
rs.on('open',function () {
    console.log('文件打開');
});
rs.setEncoding('utf8');//顯示為字符串
//希望流有一個(gè)暫停和恢復(fù)觸發(fā)的機(jī)制
rs.on('data',function (data) {
    console.log(data);
    rs.pause();//暫停讀取和發(fā)射data事件
    setTimeout(function(){
        rs.resume();//恢復(fù)讀取并觸發(fā)data事件
    },2000);
});
//如果讀取文件出錯(cuò)了栓袖,會(huì)觸發(fā)error事件
rs.on('error',function () {
    console.log("error");
});
//如果文件的內(nèi)容讀完了,會(huì)觸發(fā)end事件
rs.on('end',function () {
    console.log('讀完了');
});
rs.on('close',function () {
    console.log('文件關(guān)閉');
});

/**
文件打開
334
455
讀完了
文件關(guān)閉
**/

可讀流的簡單實(shí)現(xiàn)

let fs = require('fs');
let ReadStream = require('./ReadStream');
let rs = ReadStream('./1.txt', {
    flags: 'r',
    encoding: 'utf8',
    start: 3,
    end: 7,
    highWaterMark: 3
});
rs.on('open', function () {
    console.log("open");
});
rs.on('data', function (data) {
    console.log(data);
});
rs.on('end', function () {
    console.log("end");
});
rs.on('close', function () {
    console.log("close");
});
/**
 open
 456
 789
 end
 close
 **/
let fs = require('fs');
let EventEmitter = require('events');

class ReadStream extends EventEmitter {
    constructor(path, options) {
        super(path, options);
        this.path = path;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.buffer = Buffer.alloc(this.highWaterMark);
        this.flags = options.flags || 'r';
        this.encoding = options.encoding;
        this.mode = options.mode || 0o666;
        this.start = options.start || 0;
        this.end = options.end;
        this.pos = this.start;
        this.autoClose = options.autoClose || true;
        this.bytesRead = 0;
        this.closed = false;
        this.flowing;
        this.needReadable = false;
        this.length = 0;
        this.buffers = [];
        this.on('end', function () {
            if (this.autoClose) {
                this.destroy();
            }
        });
        this.on('newListener', (type) => {
            if (type == 'data') {
                this.flowing = true;
                this.read();
            }
            if (type == 'readable') {
                this.read(0);
            }
        });
        this.open();
    }

    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                    return this.emit('error', err);
                }
            }
            this.fd = fd;
            this.emit('open');
        });
    }

    read(n) {
        if (typeof this.fd != 'number') {
            return this.once('open', () => this.read());
        }
        n = parseInt(n, 10);
        if (n != n) {
            n = this.length;
        }
        if (this.length == 0)
            this.needReadable = true;
        let ret;
        if (0 < n < this.length) {
            ret = Buffer.alloc(n);
            let b;
            let index = 0;
            while (null != (b = this.buffers.shift())) {
                for (let i = 0; i < b.length; i++) {
                    ret[index++] = b[i];
                    if (index == ret.length) {
                        this.length -= n;
                        b = b.slice(i + 1);
                        this.buffers.unshift(b);
                        break;
                    }
                }
            }
            if (this.encoding) ret = ret.toString(this.encoding);
        }

        let _read = () => {
            let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
            fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
                if (err) {
                    return
                }
                let data;
                if (bytesRead > 0) {
                    data = this.buffer.slice(0, bytesRead);
                    this.pos += bytesRead;
                    this.length += bytesRead;
                    if (this.end && this.pos > this.end) {
                        if (this.needReadable) {
                            this.emit('readable');
                        }

                        this.emit('end');
                    } else {
                        this.buffers.push(data);
                        if (this.needReadable) {
                            this.emit('readable');
                            this.needReadable = false;
                        }

                    }
                } else {
                    if (this.needReadable) {
                        this.emit('readable');
                    }
                    return this.emit('end');
                }
            })
        }
        if (this.length == 0 || (this.length < this.highWaterMark)) {
            _read(0);
        }
        return ret;
    }

    destroy() {
        fs.close(this.fd, (err) => {
            this.emit('close');
        });
    }

    pause() {
        this.flowing = false;
    }

    resume() {
        this.flowing = true;
        this.read();
    }

    pipe(dest) {
        this.on('data', (data) => {
            let flag = dest.write(data);
            if (!flag) this.pause();
        });
        dest.on('drain', () => {
            this.resume();
        });
        this.on('end', () => {
            dest.end();
        });
    }
}
module.exports = ReadStream;

自定義可讀流

為了實(shí)現(xiàn)可讀流锅锨,引用Readable接口并用它構(gòu)造新對(duì)象

  • 我們可以直接把供使用的數(shù)據(jù)push出去叽赊。
  • 當(dāng)push一個(gè)null對(duì)象就意味著我們想發(fā)出信號(hào)——這個(gè)流沒有更多數(shù)據(jù)了
var stream = require('stream');
var util = require('util');
util.inherits(Counter, stream.Readable);
function Counter(options) {
    stream.Readable.call(this, options);
    this._index = 0;
}
Counter.prototype._read = function() {
    if(this._index++<3){
        this.push(this._index+'');
    }else{
        this.push(null);
    }
};
var counter = new Counter();

counter.on('data', function(data){
    console.log("讀到數(shù)據(jù): " + data.toString());//no maybe
});
counter.on('end', function(data){
    console.log("讀完了");
});

可讀流的兩種模式

Readable Stream 存在兩種模式(flowing mode 與 paused mode),這兩種模式?jīng)Q定了chunk數(shù)據(jù)流動(dòng)的方式---自動(dòng)流動(dòng)還是手工流動(dòng)必搞。那如何觸發(fā)這兩種模式呢:

  • flowing mode: 注冊事件data必指、調(diào)用resume方法、調(diào)用pipe方法
  • paused mode: 調(diào)用pause方法(沒有pipe方法)恕洲、移除data事件 && unpipe所有pipe

如果 Readable 切換到 flowing 模式塔橡,且沒有消費(fèi)者處理流中的數(shù)據(jù),這些數(shù)據(jù)將會(huì)丟失霜第。 比如葛家, 調(diào)用了 readable.resume() 方法卻沒有監(jiān)聽 'data' 事件,或是取消了 'data' 事件監(jiān)聽泌类,就有可能出現(xiàn)這種情況

可讀流的三種狀態(tài)

在任意時(shí)刻癞谒,任意可讀流應(yīng)確切處于下面三種狀態(tài)之一:

  • readable._readableState.flowing = null
  • readable._readableState.flowing = false
  • readable._readableState.flowing = true

兩種模式取決于可讀流flowing狀態(tài):

  • 若為true : flowing mode;
  • 若為false : paused mode

flowing mode

通過注冊data、pipe弹砚、resume可以自動(dòng)獲取所需要的數(shù)據(jù)双仍,我們來看下源碼的實(shí)現(xiàn)

// data事件觸發(fā)flowing mode
 if (ev === 'data') {
    // Start flowing on next tick if stream isn't explicitly paused
    if (this._readableState.flowing !== false)
      this.resume();
  } else if (ev === 'readable') {
    const state = this._readableState;
    if (!state.endEmitted && !state.readableListening) {
      state.readableListening = state.needReadable = true;
      state.emittedReadable = false;
      if (!state.reading) {
        process.nextTick(nReadingNextTick, this);
      } else if (state.length) {
        emitReadable(this);
      }
    }
  }

// resume觸發(fā)flowing mode
Readable.prototype.resume = function() {
    var state = this._readableState;
    if (!state.flowing) {
        debug('resume');
        state.flowing = true;
    resume(this, state);
  }
  return this;
}

// pipe方法觸發(fā)flowing模式
Readable.prototype.resume = function() {
    if (!state.flowing) {
        this.resume()
    }
}

flowing mode的三種方法最后均是通過resume方法,將狀態(tài)變?yōu)閠rue:state.flowing = true

paused mode

在paused mode下桌吃,需要手動(dòng)地讀取數(shù)據(jù)朱沃,并且可以直接指定讀取數(shù)據(jù)的長度
可以通過監(jiān)聽事件readable,觸發(fā)時(shí)手工讀取chunk數(shù)據(jù):

  • 當(dāng)你監(jiān)聽 readable事件的時(shí)候茅诱,會(huì)進(jìn)入暫停模式
  • 當(dāng)監(jiān)聽readable事件的時(shí)候逗物,可讀流會(huì)馬上去向底層讀取文件,然后把讀到文件的文件放在緩存區(qū)里const state = this._readableState;
  • self.read(0); 只填充緩存瑟俭,但是并不會(huì)發(fā)射data事件,但是會(huì)發(fā)射stream.emit('readable');事件
  • this._read(state.highWaterMark); 每次調(diào)用底層的方法讀取的時(shí)候是讀取3個(gè)字節(jié)
let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
    highWaterMark:3
});
rs.on('readable',function(){
    console.log(rs._readableState.length);
    //read如果不加參數(shù)表示讀取整個(gè)緩存區(qū)數(shù)據(jù)
    //讀取一個(gè)字段,如果可讀流發(fā)現(xiàn)你要讀的字節(jié)小于等于緩存字節(jié)大小翎卓,則直接返回
    let chunk = rs.read(1);
    console.log(chunk);
    console.log(rs._readableState.length);
    //當(dāng)你讀完指定的字節(jié)后,如果可讀流發(fā)現(xiàn)剩下的字節(jié)已經(jīng)比最高水位線小了摆寄。則會(huì)立馬再次讀取填滿 最高水位線
    setTimeout(function(){
        console.log(rs._readableState.length);
    },200)
});

注意:一旦注冊了readable事件莲祸,必須手工讀取read數(shù)據(jù),否則數(shù)據(jù)就會(huì)流失椭迎,我們來看下源碼的實(shí)現(xiàn)

function emitReadable(stream) {
  var state = stream._readableState;
  state.needReadable = false;
  if (!state.emittedReadable) {
    debug('emitReadable', state.flowing);
    state.emittedReadable = true;
    process.nextTick(emitReadable_, stream);
  }
}

function emitReadable_(stream) {
  var state = stream._readableState;
  debug('emit readable');
  if (!state.destroyed && (state.length || state.ended)) {
    stream.emit('readable');
  }
  state.needReadable = !state.flowing && !state.ended;
  flow(stream);
}

function flow(stream) {
  const state = stream._readableState;
  debug('flow', state.flowing);
  while (state.flowing && stream.read() !== null);
}

function endReadable(stream) {
  var state = stream._readableState;
  debug('endReadable', state.endEmitted);
  if (!state.endEmitted) {
    state.ended = true;
    process.nextTick(endReadableNT, state, stream);
  }
}

Readable.prototype.read = function(n) {
  debug('read', n);
  n = parseInt(n, 10);
  var state = this._readableState;
  var nOrig = n;
  if (n !== 0)
    state.emittedReadable = false;
  if (n === 0 &&
      state.needReadable &&
      (state.length >= state.highWaterMark || state.ended)) {
    debug('read: emitReadable', state.length, state.ended);
    if (state.length === 0 && state.ended)
      endReadable(this);
    else
      emitReadable(this);
    return null;
  }
  n = howMuchToRead(n, state);
  if (n === 0 && state.ended) {
    if (state.length === 0)
      endReadable(this);
    return null;
  }

flow方法直接read數(shù)據(jù),將得到的數(shù)據(jù)通過事件data交付出去田盈,然而此處沒有注冊data事件監(jiān)控畜号,因此,得到的chunk數(shù)據(jù)并沒有交付給任何對(duì)象允瞧,這樣數(shù)據(jù)就白白流失了简软,所以在觸發(fā)emit('readable')時(shí),需要提前read數(shù)據(jù)

Writable streams可寫流

可寫流是對(duì)數(shù)據(jù)寫入'目的地'的一種抽象
Writable:可寫流的例子包括了:

  • HTTP requests, on the client 客戶端請求
  • HTTP responses, on the server 服務(wù)器響應(yīng)
  • fs write streams 文件
  • zlib streams 壓縮
  • crypto streams 加密
  • TCP sockets TCP服務(wù)器
  • child process stdin 子進(jìn)程標(biāo)準(zhǔn)輸入
  • process.stdout, process.stderr 標(biāo)準(zhǔn)輸出述暂,錯(cuò)誤輸出

下面舉個(gè)可寫流的簡單例子

  • 當(dāng)你往可寫流里寫數(shù)據(jù)的時(shí)候痹升,不是會(huì)立刻寫入文件的,而是會(huì)很寫入緩存區(qū)畦韭,緩存區(qū)的大小就是highWaterMark,默認(rèn)值是16K疼蛾。然后等緩存區(qū)滿了之后再次真正的寫入文件里
let fs = require('fs');
let ws = fs.createWriteStream('./2.txt',{
   flags:'w',
   mode:0o666,
   start:3,
   highWaterMark:3//默認(rèn)是16K
});
  • 如果緩存區(qū)已滿 ,返回false,如果緩存區(qū)未滿艺配,返回true
  • 如果能接著寫察郁,返回true,如果不能接著寫,返回false
  • 按理說如果返回了false,就不能再往里面寫了转唉,但是如果你真寫了皮钠,如果也不會(huì)丟失,會(huì)緩存在內(nèi)存里赠法。等緩存區(qū)清空之后再從內(nèi)存里讀出來
let flag = ws.write('1');
console.log(flag);//true
flag =ws.write('2');
console.log(flag);//true
flag =ws.write('3');
console.log(flag);//false
flag =ws.write('4');
console.log(flag);//false

'drain' 事件

如果調(diào)用 stream.write(chunk) 方法返回 false麦轰,流將在適當(dāng)?shù)臅r(shí)機(jī)觸發(fā) 'drain' 事件,這時(shí)才可以繼續(xù)向流中寫入數(shù)據(jù)

當(dāng)一個(gè)流不處在 drain 的狀態(tài), 對(duì) write() 的調(diào)用會(huì)緩存數(shù)據(jù)塊款侵, 并且返回 false末荐。 一旦所有當(dāng)前所有緩存的數(shù)據(jù)塊都排空了(被操作系統(tǒng)接受來進(jìn)行輸出), 那么 'drain' 事件就會(huì)被觸發(fā)

建議喳坠, 一旦 write() 返回 false鞠评, 在 'drain' 事件觸發(fā)前, 不能寫入任何數(shù)據(jù)塊

舉個(gè)簡單的例子說明一下:

let fs = require('fs');
let ws = fs.createWriteStream('2.txt',{
    flags:'w',
    mode:0o666,
    start:0,
    highWaterMark:3
});
let count = 9;
function write(){
 let flag = true;//緩存區(qū)未滿
    //寫入方法是同步的壕鹉,但是寫入文件的過程是異步的剃幌。在真正寫入文件后還會(huì)執(zhí)行我們的回調(diào)函數(shù)
 while(flag && count>0){
     console.log('before',count);
     flag = ws.write((count)+'','utf8',(function (i) {
         return ()=>console.log('after',i);
     })(count));
     count--;
 }
}
write();//987
//監(jiān)聽緩存區(qū)清空事件
ws.on('drain',function () {
    console.log('drain');
    write();//654 321
});
ws.on('error',function (err) {
    console.log(err);
});
/**
before 9
before 8
before 7
after 9
after 8
after 7
**/

如果已經(jīng)不再需要寫入了,可以調(diào)用end方法關(guān)閉寫入流晾浴,一旦調(diào)用end方法之后則不能再寫入
比如在ws.end();后寫ws.write('x');负乡,會(huì)報(bào)錯(cuò)write after end

'pipe'事件

linux精典的管道的概念,前者的輸出是后者的輸入

pipe是一種最簡單直接的方法連接兩個(gè)stream脊凰,內(nèi)部實(shí)現(xiàn)了數(shù)據(jù)傳遞的整個(gè)過程抖棘,在開發(fā)的時(shí)候不需要關(guān)注內(nèi)部數(shù)據(jù)的流動(dòng)

  • 這個(gè)方法從可讀流拉取所有數(shù)據(jù), 并將數(shù)據(jù)寫入到提供的目標(biāo)中
  • 自動(dòng)管理流量,將數(shù)據(jù)的滯留量限制到一個(gè)可接受的水平狸涌,以使得不同速度的來源和目標(biāo)不會(huì)淹沒可用內(nèi)存
  • 默認(rèn)情況下切省,當(dāng)源數(shù)據(jù)流觸發(fā) end的時(shí)候調(diào)用end(),所以寫入數(shù)據(jù)的目標(biāo)不可再寫帕胆。傳 { end:false }作為options朝捆,可以保持目標(biāo)流打開狀態(tài)

pipe方法的原理

var fs = require('fs');
var ws = fs.createWriteStream('./2.txt');
var rs = fs.createReadStream('./1.txt');
rs.on('data', function (data) {
    var flag = ws.write(data);
    if(!flag)
    rs.pause();
});
ws.on('drain', function () {
    rs.resume();
});
rs.on('end', function () {
    ws.end();
});

下面舉個(gè)簡單的例子說明一下pipe的用法:

let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
  highWaterMark:3
});
let ws = fs.createWriteStream('./2.txt',{
    highWaterMark:3
});
rs.pipe(ws);
//移除目標(biāo)可寫流
rs.unpipe(ws);
  • 當(dāng)監(jiān)聽可讀流data事件的時(shí)候會(huì)觸發(fā)回調(diào)函數(shù)的執(zhí)行
  • 可以實(shí)現(xiàn)數(shù)據(jù)的生產(chǎn)者和消費(fèi)者速度的均衡
rs.on('data',function (data) {
    console.log(data);
    let flag = ws.write(data);
   if(!flag){
       rs.pause();
   }
});
  • 監(jiān)聽可寫流緩存區(qū)清空事件,當(dāng)所有要寫入的數(shù)據(jù)寫入完成后懒豹,接著恢復(fù)從可讀流里讀取并觸發(fā)data事件
ws.on('drain',function () {
    console.log('drain');
    rs.resume();
});

unpipe

readable.unpipe()方法將之前通過stream.pipe()方法綁定的流分離

  • 如果寫入的目標(biāo)沒有傳入, 則所有綁定的流都會(huì)被分離
  • 如果指定了寫入的目標(biāo)芙盘,但是沒有綁定流,則什么事情都不會(huì)發(fā)生

簡單距離說明下unpipe的用法:

let fs = require('fs');
var from = fs.createReadStream('./1.txt');
var to = fs.createWriteStream('./2.txt');
from.pipe(to);
setTimeout(() => {
console.log('關(guān)閉向2.txt的寫入');
from.unpipe(writable);
console.log('手工關(guān)閉文件流');
to.end();
}, 1000);

pipe的簡單實(shí)現(xiàn)

let fs = require('fs');
let ReadStream = require('./ReadStream');
let rs = ReadStream('./1.txt', {
    flags: 'r',
    encoding: 'utf8',
    highWaterMark: 3
});
let FileWriteStream = require('./WriteStream');
let ws = FileWriteStream('./2.txt',{
    flags:'w',
    encoding:'utf8',
    highWaterMark:3
});
rs.pipe(ws);
ReadStream.prototype.pipe = function (dest) {
    this.on('data', (data)=>{
        let flag = dest.write(data);
        if(!flag){
            this.pause();
        }
    });
    dest.on('drain', ()=>{
        this.resume();
    });
    this.on('end', ()=>{
        dest.end();
    });
}
ReadStream.prototype.pause = function(){
    this.flowing = false;

}
ReadStream.prototype.resume = function(){
    this.flowing = true;
    this.read();
}

自定義管道流

const stream = require('stream')

var index = 0;
const readable = stream.Readable({
    highWaterMark: 2,
    read: function () {
        process.nextTick(() => {
            console.log('push', ++index)
            this.push(index+'');
        })
    }
})
const writable = stream.Writable({
    highWaterMark: 2,
    write: function (chunk, encoding, next) {
        console.log('寫入:', chunk.toString())
    }
})
readable.pipe(writable);

可寫流的簡單實(shí)現(xiàn)

let fs = require('fs');
 let FileWriteStream = require('./FileWriteStream');
 let ws = FileWriteStream('./2.txt',{
     flags:'w',
     encoding:'utf8',
     highWaterMark:3
 });
 let i = 10;
 function write(){
     let  flag = true;
     while(i&&flag){
         flag = ws.write("1",'utf8',(function(i){
             return function(){
                 console.log(i);
             }
         })(i));
         i--;
         console.log(flag);
     }
 }
 write();
 ws.on('drain',()=>{
     console.log("drain");
     write();
 });
 /**
  10
  9
  8
  drain
  7
  6
  5
  drain
  4
  3
  2
  drain
  1
  **/
let EventEmitter = require('events');
let util = require('util');
let fs = require('fs');
util.inherits(WriteStream, EventEmitter);

function WriteStream(path, options) {
    EventEmitter.call(this);
    if (!(this instanceof WriteStream)) {
        return new WriteStream(path, options);
    }
    this.path = path;
    this.fd = options.fd;
    this.encoding = options.encoding||'utf8';
    this.flags = options.flags || 'w';
    this.mode = options.mode || 0o666;
    this.autoClose = options.autoClose || true;
    this.start = options.start || 0;
    this.pos = this.start;//開始寫入的索引位置
    this.open();//打開文件進(jìn)行操作
    this.writing = false;//沒有在寫入過程 中
    this.buffers = [];
    this.highWaterMark = options.highWaterMark||16*1024;
    //如果監(jiān)聽到end事件脸秽,而且要求自動(dòng)關(guān)閉的話則關(guān)閉文件
    this.on('end', function () {
        if (this.autoClose) {
            this.destroy()
        }
    });
}
WriteStream.prototype.close = function(){
    fs.close(this.fd,(err)=>{
        if(err)
            this.emit('error',err);
    });
}
WriteStream.prototype.open = function () {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
        if (err)
            return this.emit('error', err);
        this.fd = fd;//把文件描述符賦給當(dāng)前實(shí)例的fd屬性
        //發(fā)射open事件
        this.emit('open', fd);
    });
}
/**
 * 會(huì)判斷當(dāng)前是后臺(tái)是否在寫入過程中儒老,如果在寫入過程中,則把這個(gè)數(shù)據(jù)放在待處理的緩存中记餐,如果不在寫入過程中驮樊,可以直接寫。
 */
WriteStream.prototype.write = function (chunk, encoding, cb) {
    chunk= Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,this.encoding);

    //先把數(shù)據(jù)放在緩存里
    this.buffers.push({
        chunk,
        encoding,
        cb
    });

    let isFull = this.buffers.reduce((len, item) => len + item.chunk.length, 0)>=this.highWaterMark;
    //只有當(dāng)緩存區(qū)寫滿了片酝,那么清空緩存區(qū)的時(shí)候才會(huì)發(fā)射drain事件巩剖,否則 不發(fā)放
    this.needDrain = isFull;
    //如果說文件還沒有打開,則把寫入的方法壓入open事件的監(jiān)聽函數(shù)钠怯。等文件一旦打開佳魔,立刻執(zhí)行寫入操作
    if (typeof this.fd !== 'number') {
         this.once('open', () => {
            this._write();
        });
        return !isFull;
    }else{
        if(!this.writing){
            setImmediate(()=>{
                this._write();
                this.writing = true;
            });
        }

        return !isFull;
    }
}
WriteStream.prototype._write = function () {
    let part = this.buffers.shift();
    if (part) {
        fs.write(this.fd,part.chunk,0,part.chunk.length,null,(err,bytesWritten)=>{
            if(err)return this.emit('error',err);
            part.cb && part.cb();
            this._write();
        });
    }else{
        //發(fā)射一個(gè)緩存區(qū)清空的事件
        this.emit('drain');
        this.writing = false;
    }
}
module.exports = WriteStream;

自定義可寫流

為了實(shí)現(xiàn)可寫流,我們需要使用流模塊中的Writable構(gòu)造函數(shù)晦炊。 我們只需給Writable構(gòu)造函數(shù)傳遞一些選項(xiàng)并創(chuàng)建一個(gè)對(duì)象鞠鲜。唯一需要的選項(xiàng)是write函數(shù)宁脊,該函數(shù)揭露數(shù)據(jù)塊要往哪里寫

  • chunk通常是一個(gè)buffer,除非我們配置不同的流贤姆。
  • encoding是在特定情況下需要的參數(shù)榆苞,通常我們可以忽略它。
  • callback是在完成處理數(shù)據(jù)塊后需要調(diào)用的函數(shù)霞捡。這是寫數(shù)據(jù)成功與否的標(biāo)志坐漏。若要發(fā)出故障信號(hào),請用錯(cuò)誤對(duì)象調(diào)用回調(diào)函數(shù)
var stream = require('stream');
var util = require('util');
util.inherits(Writer, stream.Writable);
let stock = [];
function Writer(opt) {
    stream.Writable.call(this, opt);
}
Writer.prototype._write = function(chunk, encoding, callback) {
    setTimeout(()=>{
        stock.push(chunk.toString('utf8'));
        console.log("增加: " + chunk);
        callback();
    },500)
};
var w = new Writer();
for (var i=1; i<=5; i++){
    w.write("項(xiàng)目:" + i, 'utf8');
}
w.end("結(jié)束寫入",function(){
    console.log(stock);
});

Duplex streams可讀寫的流(雙工流)

Duplex 流是同時(shí)實(shí)現(xiàn)了 Readable 和 Writable 接口的流
雙工流的可讀性和可寫性操作完全獨(dú)立于彼此碧信,這僅僅是將兩個(gè)特性組合成一個(gè)對(duì)象

Duplex 流的實(shí)例包括了:

  • TCP sockets
  • zlib streams
  • crypto streams

下面簡單實(shí)現(xiàn)雙工流:

const {Duplex} = require('stream');
const inoutStream = new Duplex({
    write(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback();
    },
    read(size) {
        this.push((++this.index)+'');
        if (this.index > 3) {
            this.push(null);
        }
    }
});

inoutStream.index = 0;
process.stdin.pipe(inoutStream).pipe(process.stdout);

Transform streams轉(zhuǎn)換流

變換流(Transform streams) 是一種 Duplex 流赊琳。它的輸出與輸入是通過某種方式關(guān)聯(lián)的。和所有 Duplex 流一樣砰碴,變換流同時(shí)實(shí)現(xiàn)了 Readable 和 Writable 接口

轉(zhuǎn)換流的輸出是從輸入中計(jì)算出來的
對(duì)于轉(zhuǎn)換流躏筏,我們不必實(shí)現(xiàn)read或write的方法,我們只需要實(shí)現(xiàn)一個(gè)transform方法呈枉,將兩者結(jié)合起來趁尼。它有write方法的意思,我們也可以用它來push數(shù)據(jù)

變換流的實(shí)例包括:

  • zlib streams
  • crypto streams

下面簡單實(shí)現(xiàn)轉(zhuǎn)換流:

const {Transform} = require('stream');
const upperCase = new Transform({
    transform(chunk, encoding, callback) {
        this.push(chunk.toString().toUpperCase());
        callback();
    }
});
process.stdin.pipe(upperCase).pipe(process.stdout);

對(duì)象流

默認(rèn)情況下猖辫,流處理的數(shù)據(jù)是Buffer/String類型的值酥泞。有一個(gè)objectMode標(biāo)志,我們可以設(shè)置它讓流可以接受任何JavaScript對(duì)象

const {Transform} = require('stream');
let fs = require('fs');
let rs = fs.createReadStream('./users.json');
rs.setEncoding('utf8');
let toJson = Transform({
    readableObjectMode: true,
    transform(chunk, encoding, callback) {
        this.push(JSON.parse(chunk));
        callback();
    }
});
let jsonOut = Transform({
    writableObjectMode: true,
    transform(chunk, encoding, callback) {
        console.log(chunk);
        callback();
    }
});
rs.pipe(toJson).pipe(jsonOut);

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末啃憎,一起剝皮案震驚了整個(gè)濱河市婶博,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌荧飞,老刑警劉巖,帶你破解...
    沈念sama閱讀 210,978評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件名党,死亡現(xiàn)場離奇詭異叹阔,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)传睹,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,954評(píng)論 2 384
  • 文/潘曉璐 我一進(jìn)店門耳幢,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人欧啤,你說我怎么就攤上這事睛藻。” “怎么了邢隧?”我有些...
    開封第一講書人閱讀 156,623評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵店印,是天一觀的道長。 經(jīng)常有香客問我,道長,這世上最難降的妖魔是什么连锯? 我笑而不...
    開封第一講書人閱讀 56,324評(píng)論 1 282
  • 正文 為了忘掉前任炉擅,我火速辦了婚禮熊镣,結(jié)果婚禮上出吹,老公的妹妹穿的比我還像新娘有缆。我一直安慰自己晾剖,他們只是感情好兰珍,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,390評(píng)論 5 384
  • 文/花漫 我一把揭開白布侍郭。 她就那樣靜靜地躺著,像睡著了一般掠河。 火紅的嫁衣襯著肌膚如雪亮元。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,741評(píng)論 1 289
  • 那天口柳,我揣著相機(jī)與錄音苹粟,去河邊找鬼。 笑死跃闹,一個(gè)胖子當(dāng)著我的面吹牛嵌削,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播望艺,決...
    沈念sama閱讀 38,892評(píng)論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼苛秕,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了找默?” 一聲冷哼從身側(cè)響起艇劫,我...
    開封第一講書人閱讀 37,655評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎惩激,沒想到半個(gè)月后店煞,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,104評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡风钻,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,451評(píng)論 2 325
  • 正文 我和宋清朗相戀三年顷蟀,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片骡技。...
    茶點(diǎn)故事閱讀 38,569評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡鸣个,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出布朦,到底是詐尸還是另有隱情囤萤,我是刑警寧澤,帶...
    沈念sama閱讀 34,254評(píng)論 4 328
  • 正文 年R本政府宣布是趴,位于F島的核電站涛舍,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏唆途。R本人自食惡果不足惜做盅,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,834評(píng)論 3 312
  • 文/蒙蒙 一缤削、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧吹榴,春花似錦亭敢、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,725評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至远剩,卻和暖如春扣溺,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背瓜晤。 一陣腳步聲響...
    開封第一講書人閱讀 31,950評(píng)論 1 264
  • 我被黑心中介騙來泰國打工锥余, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人痢掠。 一個(gè)月前我還...
    沈念sama閱讀 46,260評(píng)論 2 360
  • 正文 我出身青樓驱犹,卻偏偏與公主長得像,于是被迫代替她去往敵國和親足画。 傳聞我的和親對(duì)象是個(gè)殘疾皇子雄驹,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,446評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容

  • https://nodejs.org/api/documentation.html 工具模塊 Assert 測試 ...
    KeKeMars閱讀 6,312評(píng)論 0 6
  • stream 流是一個(gè)抽象接口,在 Node 里被不同的對(duì)象實(shí)現(xiàn)淹辞。例如 request to an HTTP se...
    明明三省閱讀 3,398評(píng)論 1 10
  • 一医舆、什么是Stream(流) 流(stream)在 Node.js 中是處理流數(shù)據(jù)的抽象接口(abstract i...
    Brolly閱讀 5,382評(píng)論 0 0
  • 簡介 主要對(duì)stream這個(gè)概念做一個(gè)形象的描述和理解,同時(shí)介紹一下比較常用的API象缀。主要參考了Node.js的官...
    cooody閱讀 1,200評(píng)論 0 0
  • 流是Node中最重要的組件和模式之一蔬将。在社區(qū)里有一句格言說:讓一切事務(wù)流動(dòng)起來。這已經(jīng)足夠來描述在Node中流...
    宮若石閱讀 544評(píng)論 0 0