眾所周知者蠕,NODEJS在高并發(fā),I/O密集型應用操作的時候有很多優(yōu)勢屡贺,而這些都脫離不了“流”的支撐蠢棱。請求流锌杀,響應流甩栈,文件流,Socket流糕再,甚至console模塊都使用了流量没。而流的實現(xiàn),尤其是其內(nèi)部的實現(xiàn)突想,在整個NODEJS的學習中就很有學習的必要殴蹄。
流的模型可以總結為“生產(chǎn)者究抓,消費者”模型。流的一端生產(chǎn)數(shù)據(jù)(可以理解為從水龍頭放水)袭灯,另一端消費數(shù)據(jù)(水放出來后你干嘛用不管刺下,消費就好了)。當然稽荧,其內(nèi)部代碼實現(xiàn)會考慮很多細節(jié)的地方橘茉,這些可以通過調(diào)試進入源碼查看很多細節(jié)的地方。
在源碼中跟流相關的模塊有:
- lib/module.js
- lib/stream_readable.js
- lib/stream_writable.js
- lib/stream_transform.js
- lib/stream_duplex.js
源碼非常清晰姨丈,這就對應流的四種類型畅卓,Readable流,Writable流蟋恬,Transform流翁潘,Duplex流。
其中Readable和Writable是重點歼争,這兩個搞明白拜马,Transform和Duplex就比較簡單了。
Readable Stream
Readable Stream有兩種模式矾飞,一種是Flowing Mode一膨,流動模式;另外一種是Paused Mode洒沦,暫停模式豹绪。
切換到流動模式的方式有:
- 監(jiān)聽data時間
rs.on("data", (chunk)=>{}); - 調(diào)用stream.resume方法
- 調(diào)用stream.pipe方法將數(shù)據(jù)發(fā)送給writable stream
切換到暫停模式的方法有:
- 調(diào)用stream.pause方法
- 如果存在管道,調(diào)用stream.unpipe方法
tip: 這兩種流是可以隨時切換的
流動模式和暫停模式有什么區(qū)別申眼,為什么要這么設計瞒津。
流動模式就是像流水一樣源源不斷的讀取數(shù)據(jù)(注意不是到緩存對象),不管你消費不消費括尸。
暫停模式可以暫時不讀取數(shù)據(jù)巷蚪,關閉水龍頭。
有一個用的比較多的詞語叫背壓濒翻,一般來說屁柏,讀的速度會比寫入的速度快,如果不暫停有送,還是源源不斷的讀取數(shù)據(jù)淌喻,會造成內(nèi)存過大,消耗性能雀摘,這個時候最好的方式是消費多少裸删,讀取多少,由此引申出管道的概念阵赠。
最好的管道就是生產(chǎn)和消費同步涯塔,如果讀的過快肌稻,先暫停讀取,有需要再通知讀取即可匕荸。
從上面的語義描述可以看出爹谭,流是基于事件的,事件在這里承擔著消息的注冊和發(fā)送榛搔。
上面的都是概念相關旦棉,現(xiàn)在來一個簡單版的可讀流,可寫流药薯。代碼雖然簡化了很多绑洛,但是對于理解整個流的流程,會非常有幫助童本。
先來看流動模式的可讀流
let EventEmitter = require('events');
let fs = require('fs');
class ReadStream extends EventEmitter {
constructor(path, options) {
super(path, options);
this.path = path;
this.flags = options.flags || 'r';
this.mode = options.mode || 0o666;
this.highWaterMark = options.highWaterMark || 64 * 1024;
this.pos = this.start = options.start || 0;
this.end = options.end;
this.encoding = options.encoding;
this.flowing = null;
this.buffer = Buffer.alloc(this.highWaterMark);
this.open();//準備打開文件讀取
//當給這個實例添加了任意的監(jiān)聽函數(shù)時會觸發(fā)newListener
this.on('newListener',(type,listener)=>{
//如果監(jiān)聽了data事件真屯,流會自動切換的流動模式
if(type == 'data'){
this.flowing = true;
this.read();
}
});
}
read(){
if(typeof this.fd != 'number'){
return this.once('open',()=>this.read());
}
let howMuchToRead = this.end?Math.min(this.end - this.pos + 1,this.highWaterMark):this.highWaterMark;
//this.buffer并不是緩存區(qū)
console.log('howMuchToRead',howMuchToRead);
fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,bytes)=>{//bytes是實際讀到的字節(jié)數(shù)
if(err){
if(this.autoClose)
this.destroy();
return this.emit('error',err);
}
if(bytes){
let data = this.buffer.slice(0,bytes);
this.pos += bytes;
data = this.encoding?data.toString(this.encoding):data;
this.emit('data',data);
if(this.end && this.pos > this.end){
return this.endFn();
}else{
if(this.flowing)
this.read();
}
}else{
return this.endFn();
}
})
}
endFn(){
this.emit('end');
this.destroy();
}
open() {
fs.open(this.path,this.flags,this.mode,(err,fd)=>{
if(err){
if(this.autoClose){
this.destroy();
return this.emit('error',err);
}
}
this.fd = fd;
this.emit('open');
})
}
destroy(){
fs.close(this.fd,()=>{
this.emit('close');
});
}
pipe(dest){
this.on('data',data=>{
let flag = dest.write(data);
if(!flag){
this.pause();
}
});
dest.on('drain',()=>{
this.resume();
});
}
//可讀流會進入流動模式,當暫停的時候穷娱,
pause(){
this.flowing = false;
}
resume(){
this.flowing = true;
this.read();
}
}
module.exports = ReadStream;
暫停模式的可讀流
let fs = require('fs');
let EventEmitter = require('events');
class ReadStream extends EventEmitter {
constructor(path, options) {
super(path, options);
this.path = path;
this.highWaterMark = options.highWaterMark || 64 * 1024;
this.buffer = Buffer.alloc(this.highWaterMark);
this.flags = options.flags || 'r';
this.encoding = options.encoding;
this.mode = options.mode || 0o666;
this.start = options.start || 0;
this.end = options.end;
this.pos = this.start;
this.autoClose = options.autoClose || true;
this.bytesRead = 0;
this.closed = false;
this.flowing;
this.needReadable = false;
this.length = 0;
this.buffers = [];
this.on('end', function () {
if (this.autoClose) {
this.destroy();
}
});
this.on('newListener', (type) => {
if (type == 'data') {
this.flowing = true;
this.read();
}
if (type == 'readable') {
this.read(0);
}
});
this.open();
}
open() {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) {
if (this.autoClose) {
this.destroy();
return this.emit('error', err);
}
}
this.fd = fd;
this.emit('open');
});
}
read(n) {
if (typeof this.fd != 'number') {
return this.once('open', () => this.read());
}
n = parseInt(n, 10);
if (n != n) {
n = this.length;
}
if (this.length == 0)
this.needReadable = true;
let ret;
if (0 < n < this.length) {
ret = Buffer.alloc(n);
let b;
let index = 0;
while (null != (b = this.buffers.shift())) {
for (let i = 0; i < b.length; i++) {
ret[index++] = b[i];
if (index == ret.length) {
this.length -= n;
b = b.slice(i + 1);
this.buffers.unshift(b);
break;
}
}
}
if (this.encoding) ret = ret.toString(this.encoding);
}
let _read = () => {
let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
if (err) {
return
}
let data;
if (bytesRead > 0) {
data = this.buffer.slice(0, bytesRead);
this.pos += bytesRead;
this.length += bytesRead;
if (this.end && this.pos > this.end) {
if (this.needReadable) {
this.emit('readable');
}
this.emit('end');
} else {
this.buffers.push(data);
if (this.needReadable) {
this.emit('readable');
this.needReadable = false;
}
}
} else {
if (this.needReadable) {
this.emit('readable');
}
return this.emit('end');
}
})
}
if (this.length == 0 || (this.length < this.highWaterMark)) {
_read(0);
}
return ret;
}
destroy() {
fs.close(this.fd, (err) => {
this.emit('close');
});
}
pause() {
this.flowing = false;
}
resume() {
this.flowing = true;
this.read();
}
pipe(dest) {
this.on('data', (data) => {
let flag = dest.write(data);
if (!flag) this.pause();
});
dest.on('drain', () => {
this.resume();
});
this.on('end', () => {
dest.end();
});
}
}
module.exports = ReadStream;
可寫流
let fs = require('fs');
let EventEmitter = require('events');
class WriteStream extends EventEmitter {
constructor(path, options) {
super(path, options);
this.path = path;
this.flags = options.flags || 'w';
this.mode = options.mode || 0o666;
this.start = options.start || 0;
this.pos = this.start;//文件的寫入索引
this.encoding = options.encoding || 'utf8';
this.autoClose = options.autoClose;
this.highWaterMark = options.highWaterMark || 16 * 1024;
this.buffers = [];//緩存區(qū)
this.writing = false;//表示內(nèi)部正在寫入數(shù)據(jù)
this.length = 0;//表示緩存區(qū)字節(jié)的長度
this.open();
}
open() {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) {
if (this.autoClose) {
this.destroy();
}
return this.emit('error', err);
}
this.fd = fd;
this.emit('open');
});
}
//如果底層已經(jīng)在寫入數(shù)據(jù)的話绑蔫,則必須當前要寫入數(shù)據(jù)放在緩沖區(qū)里
write(chunk, encoding, cb) {
chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,this.encoding);
let len = chunk.length;
//緩存區(qū)的長度加上當前寫入的長度
this.length += len;
//判斷當前最新的緩存區(qū)是否小于最高水位線
let ret = this.length < this.highWaterMark;
if (this.writing) {//表示正在向底層寫數(shù)據(jù),則當前數(shù)據(jù)必須放在緩存區(qū)里
this.buffers.push({
chunk,
encoding,
cb
});
} else {//直接調(diào)用底層的寫入方法進行寫入
//在底層寫完當前數(shù)據(jù)后要清空緩存區(qū)
this.writing = true;
this._write(chunk, encoding, () => this.clearBuffer());
}
return ret;
}
clearBuffer() {
//取出緩存區(qū)中的第一個buffer
//8 7
let data = this.buffers.shift();
if(data){
this._write(data.chunk,data.encoding,()=>this.clearBuffer())
}else{
this.writing = false;
//緩存區(qū)清空了
this.emit('drain');
}
}
_write(chunk, encoding, cb) {
if(typeof this.fd != 'number'){
return this.once('open',()=>this._write(chunk, encoding, cb));
}
fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,bytesWritten)=>{
if(err){
if(this.autoClose){
this.destroy();
this.emit('error',err);
}
}
this.pos += bytesWritten;
//寫入多少字母泵额,緩存區(qū)減少多少字節(jié)
this.length -= bytesWritten;
cb && cb();
})
}
destroy() {
fs.close(this.fd, () => {
this.emit('close');
})
}
}
module.exports = WriteStream;