概念
Stream 是Node.js中最重要的組件和模式之一桥温,之前在社區(qū)中看到這樣一句格言“Stream all the things(流是一切)”玄叠。
具體的來說流是一組有序的惦银,有起點和終點的字節(jié)數(shù)據傳輸手段溉贿,它是一個抽象的接口锥余。
流是數(shù)據的集合 —— 就像數(shù)組或字符串一樣躺苦。區(qū)別在于流中的數(shù)據可能不會立刻就全部可用瓮恭,并且你無需一次性地把這些數(shù)據全部放入內存雄坪。這使得流在操作大量數(shù)據或是數(shù)據從外部來源逐段發(fā)送過來的時候變得非常有用。
每一個流對象都是EventEmitter類的一個實例屯蹦,都有相應的on和emit方法维哈,在下文中演示的代碼示例中我們會看到绳姨。
stream 是node的核心模塊,引入方式如下:
let Stream = require('stream')
實現(xiàn)形式
流在node中有多種實現(xiàn)形式阔挠,例如:
1. http協(xié)議中的 請求req和響應res
2. tcp協(xié)議中的套接字對象sockets
3. fs文件模塊中的可讀流和可寫流
4. process進程模塊中的stdout stderr
5. zlib 中的streams
.....
類型
提供了以下四種類型:
可讀流(Readable)
let Readable = stream.Readable
可寫流(Writeable)
let Readable = stream.Writeable
可讀寫流(Duplex)
let Duplex = stream.Duplex
轉換流(Transform)
let Transform = stream.Transform
原理
Readable
實現(xiàn)了stream.Readable接口的對象,將對象數(shù)據讀取為流數(shù)據,當監(jiān)聽data事件后,開始發(fā)射數(shù)據.
1. 創(chuàng)建
let rs = fs.createReadStream(path,{
flags: 'r', // 打開文件要做的操作,默認為'r'
encoding: null, // 默認為null
start: '0', // 開始讀取的索引位置
end: '', // 結束讀取的索引位置(包括結束位置)
highWaterMark: '', // 讀取緩存區(qū)默認的大小的閥值64kb
})
2. 方法及作用
// 1.監(jiān)聽data事件 流自動切換到流動模式
// 2.數(shù)據會盡可能快的讀出
rs.on('data', function (data) {
console.log(data);
});
// 數(shù)據讀取完畢后觸發(fā)end事件
rs.on('end', function () {
console.log('讀取完成');
});
// 可讀流打開事件
rs.on('open', function () {
console.log(err);
});
// 可讀流關閉事件
rs.on('close', function () {
console.log(err);
});
// 指定編碼 和上面創(chuàng)建流時的參數(shù)encoding意思相同
rs.setEncoding('utf8');
rs.on('data', function (data) {
// 可讀流暫停讀取
rs.pause();
console.log(data);
});
setTimeout(function () {
// 可讀流恢復讀取
rs.resume();
},2000);
3. 分類
可讀流分為:流動模式和暫停模式
可讀流對象readable中有一個維護狀態(tài)的對象飘庄,readable._readableState,這里簡稱為state购撼。
其中有一個標記跪削,state.flowing, 可用來判別流的模式迂求。
它有三種可能值:
true 流動模式碾盐。
false 暫停模式。
null 初始狀態(tài)揩局。
1) 流動模式(flowing mode)
流動模式下毫玖,數(shù)據會源源不斷地生產出來,形成“流動”現(xiàn)象谐腰。
監(jiān)聽流的data事件便可進入該模式孕豹。
2) 暫停模式(paused mode)
暫停模式下,需要顯示地調用read()十气,觸發(fā)data事件。
在初始狀態(tài)下春霍,監(jiān)聽data事件砸西,會使流進入流動模式。
但如果在暫停模式下址儒,監(jiān)聽data事件并不會使它進入流動模式芹枷。
為了消耗流,需要顯示調用read()方法莲趣。
3)相互轉化
調用readable.resume()可使流進入流動模式鸳慈,state.flowing被設為true。
調用readable.pause()可使流進入暫停模式喧伞,state.flowing被設為false走芋。
4. 原理
創(chuàng)建可讀流時,需要繼承Readable潘鲫,并實現(xiàn)_read方法翁逞。
[圖片上傳失敗...(image-c2590b-1517667825912)]
-
_read方法是從底層系統(tǒng)讀取具體數(shù)據的邏輯,即生產數(shù)據的邏輯溉仑。
在_read方法中挖函,通過調用push(data)將數(shù)據放入可讀流中供下游消耗。
在_read方法中浊竟,可以同步調用push(data)怨喘,也可以異步調用津畸。
當全部數(shù)據都生產出來后,必須調用push(null)來結束可讀流必怜。
流一旦結束肉拓,便不能再調用push(data)添加數(shù)據。
可以通過監(jiān)聽data事件的方式消耗可讀流棚赔。在首次監(jiān)聽其data事件后帝簇,readable便會持續(xù)不斷地調用_read(),通過觸發(fā)data事件將數(shù)據輸出靠益。
第一次data事件會在下一個tick中觸發(fā)丧肴,所以,可以安全地將數(shù)據輸出前的邏輯放在事件監(jiān)聽后(同一個tick中)胧后。
當數(shù)據全部被消耗時芋浮,會觸發(fā)end事件。 詳解
[圖片上傳失敗...(image-edbd5b-1517667825912)]
doRead
流中維護了一個緩存壳快,當緩存中的數(shù)據足夠多時纸巷,調用read()不會引起_read()的調用,即不需要向底層請求數(shù)據眶痰。用doRead來表示read(n)是否需要向底層取數(shù)據.
var doRead = state.needReadable
if (state.length === 0 || state.length - n < state.highWaterMark) {
doRead = true
}
if (state.ended || state.reading) {
doRead = false
}
if (doRead) {
state.reading = true
state.sync = true
if (state.length === 0) {
state.needReadable = true
}
this._read(state.highWaterMark)
state.sync = false
}
當緩存區(qū)的長度為0或者緩存區(qū)的數(shù)量小于state.highWaterMark這個閥值瘤旨,則會調用 _read()去底層讀取數(shù)據。
state.reading 標志上次從底層取數(shù)據的操作是否完成竖伯,一旦push 被調用存哲,就會就會設置為false,表示此次_read()結束七婴。
push
消耗方調用read(n)促使流輸出數(shù)據祟偷,而流通過_read()使底層調用push方法將數(shù)據傳給流。
如果調用push方法時緩存為空打厘,則當前數(shù)據即為下一個需要的數(shù)據修肠。
這個數(shù)據可能先添加到緩存中,也可能直接輸出户盯。
執(zhí)行read方法時嵌施,在調用_read后,如果從緩存中取到了數(shù)據先舷,就以data事件輸出艰管。
所以,如果_read異步調用push時發(fā)現(xiàn)緩存為空蒋川,則意味著當前數(shù)據是下一個需要的數(shù)據牲芋,且不會被read方法輸出,應當在push方法中立即以data事件輸出。
上圖立即輸出條件
state.flowing && state.length === 0 && !state.sync
end
由于流是分次向底層請求數(shù)據的缸浦,需要底層顯示地告訴流數(shù)據是否取完夕冲。
所以,當某次(執(zhí)行_read())取數(shù)據時裂逐,調用了push(null)歹鱼,就意味著底層數(shù)據取完。
此時卜高,流會設置state.ended弥姻。
state.length表示緩存中當前的數(shù)據量。
只有當state.length為0掺涛,且state.ended為true庭敦,才意味著所有的數(shù)據都被消耗了。
一旦在執(zhí)行read(n)時檢測到這個條件薪缆,便會觸發(fā)end事件秧廉。
當然,這個事件只會觸發(fā)一次拣帽。
readable
在調用完_read()后疼电,read(n)會試著從緩存中取數(shù)據。
如果_read()是異步調用push方法的减拭,則此時緩存中的數(shù)據量不會增多蔽豺,容易出現(xiàn)數(shù)據量不夠的現(xiàn)象。
如果read(n)的返回值為null拧粪,說明這次未能從緩存中取出所需量的數(shù)據茫虽。
此時,消耗方需要等待新的數(shù)據到達后再次嘗試調用read方法既们。
在數(shù)據到達后,流是通過readable事件來通知消耗方的正什。
在此種情況下啥纸,push方法如果立即輸出數(shù)據,接收方直接監(jiān)聽data事件即可婴氮,否則數(shù)據被添加到緩存中斯棒,需要觸發(fā)readable事件。
消耗方必須監(jiān)聽這個事件主经,再調用read方法取得數(shù)據荣暮。
5. 手寫
流動模式
let EventEmitter = require('events');
let fs = require('fs');
class ReadStream extends EventEmitter {
constructor(path, options) {
super(path, options);
// 初始化參數(shù)
this.path = path; // 路徑
this.flags = options.flags || 'r'; // 打開文件要做的操作,默認為'r'
this.mode = options.mode || 0o666;
this.pos = this.start = options.start || 0; // 偏移量 默認是開始位置
this.end = options.end; // 結束為止
this.encoding = options.encoding; // 編碼形式
this.highWaterMark = options.highWaterMark || 64 * 1024; // 可讀流緩存區(qū)的閥值 默認64k
this.flowing = null;// 標志形式 null初始狀態(tài) false 暫停模式 true 流動模式
this.buffer = Buffer.alloc(this.highWaterMark); // 緩存區(qū)
this.open();// //準備打開文件讀取
//當給這個實例添加了任意的監(jiān)聽函數(shù)時會觸發(fā)newListener
this.on('newListener', (type, listener) => {
if (type == 'data') { // 當可讀流監(jiān)聽data事件時初始態(tài)變成流動模式
this.flowing = true;
this.read();
}
});
}
read() {
// 如果文件標示符不是數(shù)字的話 說明文件還沒有打開,這時候我們要監(jiān)聽 文件的打開事件 一旦打開就開始讀取
if (typeof this.fd !== 'number') {
return this.once('open', () => this.read());
}
// 每次讀取多少個
let howMuchToRead = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err, bytes) => {
if (err) {
if (this.autoClose) {
this.destroy();
}
return this.emit('error', err);
}
if (bytes) {
let data = this.buffer.slice(0, bytes);
data = this.encoding ? data.toString(this.encoding) : data;
this.emit('data', data);
this.pos += bytes;
if (this.end && this.pos > this.end) {
return this.endFn();
} else {
if (this.flowing) {
this.read();
}
}
} else {
return this.endFn();
}
})
}
endFn() {
this.emit('end');
this.destroy();
}
open() {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) {
if (this.autoClose) {
this.destroy();
return this.emit('error', err);
}
}
this.fd = fd;
this.emit('open');
})
}
destroy() {
fs.close(this.fd, () => {
this.emit('close');
});
}
pipe(dest) {
this.on('data', data => {
let flag = dest.write(data);
if (!flag) {
this.pause();
}
});
dest.on('drain', () => {
this.resume();
});
}
pause() {
this.flowing = false;
}
resume() {
this.flowing = true;
this.read();
}
}
module.exports = ReadStream;
測試代碼
let ReadStream = require('./ReadStream');
let rs = new ReadStream('1.txt',{
highWaterMark:3,
encoding:'utf8'
});
//在真實的情況下罩驻,當可讀流創(chuàng)建后會立刻進行暫停模式穗酥。其實會立刻填充緩存區(qū)
//緩存區(qū)大小是可以看到
rs.on('readable',function () {
console.log(rs.length);//3
//當你消費掉一個字節(jié)之后,緩存區(qū)變成2個字節(jié)了
let char = rs.read(1);
console.log(char);
console.log(rs.length);
//一旦發(fā)現(xiàn)緩沖區(qū)的字節(jié)數(shù)小于最高水位線了,則會現(xiàn)再讀到最高水位線個字節(jié)填充到緩存區(qū)里
setTimeout(()=>{
console.log(rs.length);//5
},500)
});
暫停模式
let fs = require('fs');
let EventEmitter = require('events');
class ReadStream extends EventEmitter {
constructor(path, options) {
super(path, options);
this.path = path;
this.highWaterMark = options.highWaterMark || 64 * 1024;
this.buffer = Buffer.alloc(this.highWaterMark);
this.flags = options.flags || 'r';
this.encoding = options.encoding;
this.mode = options.mode || 0o666;
this.start = options.start || 0;
this.end = options.end;
this.pos = this.start;
this.autoClose = options.autoClose || true;
this.bytesRead = 0;
this.closed = false;
this.flowing;
this.needReadable = false;
this.length = 0;
this.buffers = [];
this.on('end', function () {
if (this.autoClose) {
this.destroy();
}
});
this.on('newListener', (type) => {
if (type == 'data') {
this.flowing = true;
this.read();
}
if (type == 'readable') {
this.read(0);
}
});
this.open();
}
open() {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) {
if (this.autoClose) {
this.destroy();
return this.emit('error', err);
}
}
this.fd = fd;
this.emit('open');
});
}
read(n) {
if (typeof this.fd != 'number') {
return this.once('open', () => this.read());
}
n = parseInt(n, 10);
if (n != n) {
n = this.length;
}
if (this.length == 0)
this.needReadable = true;
let ret;
if (0 < n < this.length) {
ret = Buffer.alloc(n);
let b;
let index = 0;
while (null != (b = this.buffers.shift())) {
for (let i = 0; i < b.length; i++) {
ret[index++] = b[i];
if (index == ret.length) {
this.length -= n;
b = b.slice(i + 1);
this.buffers.unshift(b);
break;
}
}
}
ret = ret.toString(this.encoding);
}
let _read = () => {
let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
if (err) {
return
}
let data;
if (bytesRead > 0) {
data = this.buffer.slice(0, bytesRead);
this.pos += bytesRead;
this.length += bytesRead;
if (this.end && this.pos > this.end) {
if (this.needReadable) {
this.emit('readable');
}
this.emit('end');
} else {
this.buffers.push(data);
if (this.needReadable) {
this.emit('readable');
this.needReadable = false;
}
}
} else {
if (this.needReadable) {
this.emit('readable');
}
return this.emit('end');
}
})
}
if (this.length == 0 || (this.length < this.highWaterMark)) {
_read();
}
return ret;
}
destroy() {
fs.close(this.fd, (err) => {
this.emit('close');
});
}
pause() {
this.flowing = false;
}
resume() {
this.flowing = true;
this.read();
}
pipe(dest) {
this.on('data', (data) => {
let flag = dest.write(data);
if (!flag) this.pause();
});
dest.on('drain', () => {
this.resume();
});
this.on('end', () => {
dest.end();
});
}
}
module.exports = ReadStream;
Writable
- 創(chuàng)建
let rs = fs.createWriteStream(path,{
flags: 'w', // 打開文件要做的操作,默認為'w'
encoding: null, // 默認為null
highWaterMark: '', // 讀取緩存區(qū)默認的大小的閥值16kb
})
- 方法及作用
let ws = fs.createWriteStream(path,{
chunk: '',// 寫入的數(shù)據buffer/string
encoding: '', // 編碼格式chunk為字符串時有用砾跃,可選
callback: ()=>{} // 寫入成功后的回調
});
// 返回值為布爾值骏啰,系統(tǒng)緩存區(qū)滿時為false,未滿時為true
ws.end(chunk,[encoding],[callback]);
// 表明接下來沒有數(shù)據要被寫入 Writable 通過傳入可選的 chunk 和 encoding 參數(shù),可以在關閉流之前再寫入一段數(shù)據 如果傳入了可選的 callback 函數(shù)抽高,它將作為 'finish' 事件的回調函數(shù)
// 當一個流不處在 drain 的狀態(tài)判耕, 對 write() 的調用會緩存數(shù)據塊, 并且返回 false翘骂。 一旦所有當前所有緩存的數(shù)據塊都排空了(被操作系統(tǒng)接受來進行輸出)壁熄, 那么 'drain' 事件就會被觸發(fā)
建議, 一旦 write() 返回 false碳竟, 在 'drain' 事件觸發(fā)前草丧, 不能寫入任何數(shù)據塊
let fs = require('fs');
let ws = fs.createWriteStream('./2.txt',{
flags:'w',
encoding:'utf8',
highWaterMark:3
});
let i = 10;
function write(){
let flag = true;
while(i&&flag){
flag = ws.write("1");
i--;
console.log(flag);
}
}
write();
ws.on('drain',()=>{
console.log("drain");
write();
});
// 在調用了 stream.end() 方法,且緩沖區(qū)數(shù)據都已經傳給底層系統(tǒng)之后瞭亮, 'finish' 事件將被觸發(fā)方仿。
var writer = fs.createWriteStream('./2.txt');
for (let i = 0; i < 100; i++) {
writer.write(`hello, ${i}!\n`);
}
writer.end('結束\n');
writer.on('finish', () => {
console.error('所有的寫入已經完成!');
});
// pipe用法 將數(shù)據的滯留量限制到一個可接受的水平,以使得不同速度的來源和目標不會淹沒可用內存统翩。
readStream.pipe(writeStream);
var from = fs.createReadStream('./1.txt');
var to = fs.createWriteStream('./2.txt');
from.pipe(to);
// pipe方法的原理
var fs = require('fs');
var ws = fs.createWriteStream('./2.txt');
var rs = fs.createReadStream('./1.txt');
rs.on('data', function (data) {
var flag = ws.write(data);
if(!flag)
rs.pause();
});
ws.on('drain', function () {
rs.resume();
});
rs.on('end', function () {
ws.end();
});
3.原理
const Writable = require('stream').Writable
const writable = Writable()
// 實現(xiàn)`_write`方法
// 這是將數(shù)據寫入底層的邏輯
writable._write = function (data, enc, next) {
// 將流中的數(shù)據寫入底層
process.stdout.write(data.toString().toUpperCase())
// 寫入完成時仙蚜,調用`next()`方法通知流傳入下一個數(shù)據
process.nextTick(next)
}
// 所有數(shù)據均已寫入底層
writable.on('finish', () => process.stdout.write('DONE'))
// 將一個數(shù)據寫入流中
writable.write('a' + '\n')
writable.write('b' + '\n')
writable.write('c' + '\n')
// 再無數(shù)據寫入流時,需要調用`end`方法
writable.end()
上游通過調用writable.write(data)將數(shù)據寫入可寫流中厂汗。write()方法會調用_write()將data寫入底層委粉。
在_write中,當數(shù)據成功寫入底層后娶桦,必須調用next(err)告訴流開始處理下一個數(shù)據贾节。
next的調用既可以是同步的,也可以是異步的衷畦。
上游必須調用writable.end(data)來結束可寫流栗涂,data是可選的。此后祈争,不能再調用write新增數(shù)據斤程。
在end方法調用后,當所有底層的寫操作均完成時菩混,會觸發(fā)finish事件忿墅。
4.手寫
WriteStream
let fs = require('fs');
let EventEmitter = require('events');
class WriteStream extends EventEmitter {
constructor(path, options) {
super(path, options);
this.path = path;
this.flags = options.flags || 'w';
this.mode = options.mode || 0o666;
this.start = options.start || 0;
this.pos = this.start;
this.encoding = options.encoding || 'utf8';
this.autoClose = options.autoClose;
this.highWaterMark = options.highWaterMark || 16 * 1024;
this.buffers = [];//緩存區(qū)
this.writing = false;//表示內部正在寫入數(shù)據
this.length = 0;//表示緩存區(qū)字節(jié)的長度
this.open();
}
open() {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) {
if (this.autoClose) {
this.destroy();
}
return this.emit('error', err);
}
this.fd = fd;
this.emit('open');
});
}
write(chunk, encoding, cb) {
chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, this.encoding);
let len = chunk.length;
this.length += len;
let ret = this.length < this.highWaterMark;
if (this.writing) {
this.buffers.push({
chunk,
encoding,
cb
});
} else {
this.writing = true;
this._write(chunk, encoding, () => this.clearBuffer());
}
return ret;
}
clearBuffer() {
let data = this.buffers.shift();
if (data) {
this._write(data.chunk, data.encoding, () => this.clearBuffer())
} else {
this.writing = false;
this.emit('drain');
}
}
_write(chunk, encoding, cb) {
if (typeof this.fd !== 'number') {
return this.once('open', () => this._write(chunk, encoding, cb));
}
fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, bytesWritten) => {
if (err) {
if (this.autoClose) {
this.destroy();
this.emit('error', err);
}
}
this.pos += bytesWritten;
this.length -= bytesWritten;
cb && cb();
})
}
destroy() {
fs.close(this.fd, () => {
this.emit('close');
})
}
}
module.exports = WriteStream;
Duplex
Duplex實際上就是繼承了Readable和Writable的一類流。
所以沮峡,一個Duplex對象既可當成可讀流來使用(需要實現(xiàn)_read方法)疚脐,也可當成可寫流來使用(需要實現(xiàn)_write方法)。
var Duplex = require('stream').Duplex
var duplex = Duplex()
// 可讀端底層讀取邏輯
duplex._read = function () {
this._readNum = this._readNum || 0
if (this._readNum > 1) {
this.push(null)
} else {
this.push('' + (this._readNum++))
}
}
// 可寫端底層寫邏輯
duplex._write = function (buf, enc, next) {
// a, b
process.stdout.write('_write ' + buf.toString() + '\n')
next()
}
// 0, 1
duplex.on('data', data => console.log('ondata', data.toString()))
duplex.write('a')
duplex.write('b')
duplex.end()
上面的代碼中實現(xiàn)了_read方法邢疙,所以可以監(jiān)聽data事件來消耗Duplex產生的數(shù)據棍弄。
同時望薄,又實現(xiàn)了_write方法,可作為下游去消耗數(shù)據照卦。
因為它既可讀又可寫式矫,所以稱它有兩端:可寫端和可讀端。
可寫端的接口與Writable一致役耕,作為下游來使用采转;可讀端的接口與Readable一致,作為上游來使用瞬痘。
Transform
在上面的例子中故慈,可讀流中的數(shù)據(0, 1)與可寫流中的數(shù)據('a', 'b')是隔離開的,但在Transform中可寫端寫入的數(shù)據經變換后會自動添加到可讀端框全。
Tranform繼承自Duplex察绷,并已經實現(xiàn)了_read和_write方法,同時要求用戶實現(xiàn)一個_transform方法津辩。
'use strict'
const Transform = require('stream').Transform
class Rotate extends Transform {
constructor(n) {
super()
// 將字母旋轉`n`個位置
this.offset = (n || 13) % 26
}
// 將可寫端寫入的數(shù)據變換后添加到可讀端
_transform(buf, enc, next) {
var res = buf.toString().split('').map(c => {
var code = c.charCodeAt(0)
if (c >= 'a' && c <= 'z') {
code += this.offset
if (code > 'z'.charCodeAt(0)) {
code -= 26
}
} else if (c >= 'A' && c <= 'Z') {
code += this.offset
if (code > 'Z'.charCodeAt(0)) {
code -= 26
}
}
return String.fromCharCode(code)
}).join('')
// 調用push方法將變換后的數(shù)據添加到可讀端
this.push(res)
// 調用next方法準備處理下一個
next()
}
}
var transform = new Rotate(3)
transform.on('data', data => process.stdout.write(data))
transform.write('hello, ')
transform.write('world!')
transform.end()
// khoor, zruog!