在前端工程化中產(chǎn)生了很多工具,例如grunt,gulp,webpack,babel...等等侨赡,這些工具都是通過(guò)node中的stream實(shí)現(xiàn)。
在node中stream也是非常非常非常重要的模塊,比如我們常用的console就是基于stream的實(shí)例骇塘,還有net,http等核心模塊都是基于stream來(lái)實(shí)現(xiàn)的,可見stream是多么的重要韩容。
1.什么是stream?
是一種數(shù)據(jù)傳輸手段款违,從一個(gè)地方傳輸?shù)搅硪粋€(gè)地方。
在寫node的時(shí)候會(huì)存在讀取文件群凶,比如現(xiàn)在我們有一個(gè)非常大的文件插爹,50G吧
const fs = require('fs');
// test文件50個(gè)G
fs.readFileSync('./test.text');
這個(gè)時(shí)候需要消耗大量的時(shí)候去讀取這個(gè)文件,然而我們可能關(guān)心的并不是文件所有內(nèi)容请梢,還會(huì)存在直接讀取失敗赠尾。stream就是為了解決這些問(wèn)題而產(chǎn)生,我們讀一些數(shù)據(jù)處理一些數(shù)據(jù)溢陪,當(dāng)讀到所關(guān)心數(shù)據(jù)的時(shí)候萍虽,則可以不再繼續(xù)讀取。
stream翻譯成中文‘流’形真,就像水一樣杉编,從水龍頭流向水杯。
2. Stream模塊
stream繼承于EventEmitter咆霜,擁有事件觸發(fā)和事件監(jiān)聽功能邓馒。主要分為4種基本流類型:
- Readable (可讀流)
- Writable (可寫流)
- Duplex (讀寫流)
- Transform (轉(zhuǎn)換流)
在流中默認(rèn)可操作的類型string和Buffer,如果需要處理其他類型的js值需要傳入?yún)?shù)objectMode: true(默認(rèn)為false)
在流中存在一個(gè)重要的概念,緩存區(qū)蛾坯,就像拿水杯去接水光酣,水杯就是緩存區(qū),當(dāng)水杯滿脉课,則會(huì)關(guān)閉水龍頭救军,等把水杯里面的水消耗完畢,再打開水龍頭去接水倘零。
stream默認(rèn)緩存區(qū)大小為16384(16kb),可以通過(guò)highWaterMark參數(shù)設(shè)置緩存區(qū)大小唱遭,但設(shè)置encoding后,以設(shè)置的字符編碼為單位衡量呈驶。
3. Readable
首先創(chuàng)建一個(gè)可讀流,可接收5個(gè)參數(shù):
- highWaterMark 緩存區(qū)字節(jié)大小拷泽,默認(rèn)16384
- encoding 字符編碼,默認(rèn)為null,就是buffer
- objectMode 是否操作js其他類型 默認(rèn)false
- read 對(duì)內(nèi)部的_read()方式實(shí)現(xiàn) 子類實(shí)現(xiàn),父類調(diào)用
- destroy 對(duì)內(nèi)部的_ destroy()方法實(shí)現(xiàn) 子類實(shí)現(xiàn),父類調(diào)用
可讀流中分為2種模式流動(dòng)模式和暫停模式司致。
監(jiān)聽data事件拆吆,觸發(fā)流動(dòng)模式,會(huì)源源不斷生產(chǎn)數(shù)據(jù)觸發(fā)data事件:
const { Readable } = require('stream');
let i = 0;
const rs = Readable({
encoding: 'utf8',
// 這里傳入的read方法脂矫,會(huì)被寫入_read()
read: (size) => {
// size 為highWaterMark大小
// 在這個(gè)方法里面實(shí)現(xiàn)獲取數(shù)據(jù)枣耀,讀取到數(shù)據(jù)調(diào)用rs.push([data]),如果沒(méi)有數(shù)據(jù)了羹唠,push(null)結(jié)束流
if (i < 10) {
rs.push(`當(dāng)前讀取數(shù)據(jù): ${i++}`);
} else {
rs.push(null);
}
},
// 源代碼奕枢,可覆蓋
destroy(err, cb) {
rs.push(null);
cb(err);
}
});
rs.on('data', (data) => {
console.log(data);
// 每次push數(shù)據(jù)則觸發(fā)data事件
// 當(dāng)前讀取數(shù)據(jù): 0
// 當(dāng)前讀取數(shù)據(jù): 1
// 當(dāng)前讀取數(shù)據(jù): 2
// 當(dāng)前讀取數(shù)據(jù): 3
// 當(dāng)前讀取數(shù)據(jù): 4
// 當(dāng)前讀取數(shù)據(jù): 5
// 當(dāng)前讀取數(shù)據(jù): 6
// 當(dāng)前讀取數(shù)據(jù): 7
// 當(dāng)前讀取數(shù)據(jù): 8
// 當(dāng)前讀取數(shù)據(jù): 9
})
監(jiān)聽readable事件娄昆,觸發(fā)暫停模式佩微,當(dāng)流有了新數(shù)據(jù)或到了流結(jié)束之前觸發(fā)readable事件,需要顯示調(diào)用read([size])讀取數(shù)據(jù):
const { Readable } = require('stream');
let i = 0;
const rs = Readable({
encoding: 'utf8',
highWaterMark: 9,
// 這里傳入的read方法萌焰,會(huì)被寫入_read()
read: (size) => {
// size 為highWaterMark大小
// 在這個(gè)方法里面實(shí)現(xiàn)獲取數(shù)據(jù)哺眯,讀取到數(shù)據(jù)調(diào)用rs.push([data]),如果沒(méi)有數(shù)據(jù)了扒俯,push(null)結(jié)束流
if (i < 10) {
// push其實(shí)是把數(shù)據(jù)放入緩存區(qū)
rs.push(`當(dāng)前讀取數(shù)據(jù): ${i++}`);
} else {
rs.push(null);
}
}
});
rs.on('readable', () => {
const data = rs.read(9);
console.log(data);
//
})
read([size]) size參數(shù):
- 不傳代表讀取緩存區(qū)所有數(shù)據(jù)奶卓。
- 傳入0 填充緩存區(qū), 但返回null
- size < 當(dāng)前緩存區(qū)數(shù)據(jù) 返回所需數(shù)據(jù)
- size > 當(dāng)前緩存區(qū)數(shù)據(jù) 返回null 并改變highWaterMark值
這里的緩存區(qū)數(shù)據(jù)不是指highWaterMark,獲取緩存區(qū)數(shù)據(jù)大小rs._readableState.length撼玄。
流的模式可以自由切換: 通過(guò)rs._readableState.flowing的值獲取當(dāng)前狀態(tài)
- null 初始狀態(tài)
- false 暫停模式
- true 流動(dòng)模式
rs.pause()切換到暫停模式 rs.resume()切換到流動(dòng)模式
在可讀流里面還可以監(jiān)聽其他事件:
rs.on('close', () => {
// 流關(guān)閉時(shí)或文件關(guān)閉時(shí)觸發(fā)
})
rs.on('end', () => {
// 在流中沒(méi)有數(shù)據(jù)可供消費(fèi)時(shí)觸發(fā)
})
rs.on('error', (err) => {
// 發(fā)生錯(cuò)誤時(shí)候
})
4. Writable
可寫流可接受參數(shù):
- highWaterMark 緩存區(qū)字節(jié)大小夺姑,默認(rèn)16384
- decodeStrings 是否將字符編碼傳入緩沖區(qū)
- objectMode 是否操作js其他類型 默認(rèn)false
- write 子類實(shí)現(xiàn),供父類調(diào)用 實(shí)現(xiàn)寫入底層數(shù)據(jù)
- writev 子類實(shí)現(xiàn)掌猛,供父類調(diào)用 一次處理多個(gè)chunk寫入底層數(shù)據(jù)
- destroy 可以覆蓋父類方法盏浙,不能直接調(diào)用,銷毀流時(shí)荔茬,父類調(diào)用
- final 完成寫入所有數(shù)據(jù)時(shí)父類觸發(fā)
在實(shí)現(xiàn)流除了用上面直接傳入?yún)?shù)的方式废膘,還可以用繼承類
class WS extends stream.Writable {
constructor() {
super({
highWaterMark: 1
});
}
_write(chunk, encoding, cb) {
console.log(this._writableState.length);
// chunk 為需要寫入的數(shù)據(jù)
// encoding 字符編碼
// cb 回調(diào)函數(shù), 如果寫入成功需要調(diào)用cb去執(zhí)行下一次寫入,如果發(fā)生錯(cuò)誤慕蔚,可以cb(new Error([錯(cuò)誤信息]))
if (chunk.length < 4) {
fs.writeFileSync('./2.text', chunk, {
flag: 'a'
});
cb();
} else{
cb(new Error('超出4個(gè)字節(jié)'));
}
}
}
const ws = new WS();
let i = 0;
function next() {
let flag = true;
// write() 會(huì)返回boolean false -> 緩存區(qū)沒(méi)滿 true —> 已滿丐黄,需要暫停寫入數(shù)據(jù)
while(i < 10 && flag) {
flag = ws.write(`${i++}`);
console.log('flag', flag);
}
}
next();
// 當(dāng)所有緩存區(qū)數(shù)據(jù)已經(jīng)成功寫入底層數(shù)據(jù),緩存區(qū)沒(méi)有數(shù)據(jù)了,觸發(fā)drain事件
ws.on('drain', () => {
console.log('drain');
// 繼續(xù)寫入緩存區(qū)數(shù)據(jù)
next();
})
可寫流的end事件,一旦觸發(fā)end事件孔飒,后續(xù)不能再寫入數(shù)據(jù).
ws.write('start');
ws.end('end');
ws.wrtie('test'); // 報(bào)錯(cuò) write after end
finish事件:
ws.write('start');
ws.end('end');
ws.on('finish', () => {
console.log('調(diào)用end方法后,并且所有數(shù)據(jù)已經(jīng)寫入底層')
})
cork()與uncork()灌闺,強(qiáng)制所有數(shù)據(jù)先寫入緩存區(qū),直到調(diào)用uncork()或end()坏瞄,這時(shí)一并寫入底層:
const ws = stream.Writable({
writev(chunks, encoding, cb) {
// 這時(shí)chunks為一個(gè)數(shù)組桂对,包含所有的chunk
// 現(xiàn)在length為10
console.log(chunk.length);
}
});
// 寫入數(shù)據(jù)之前,強(qiáng)制寫入數(shù)據(jù)放入緩存區(qū)
ws.cork();
// 寫入數(shù)據(jù)
for (let i = 0; i < 10; i++) {
ws.write(i.toString());
}
// 寫入完畢惦积,可以觸發(fā)寫入底層
ws.uncork();
5. Duplex
讀寫流接校,該方法繼承了可寫流和可讀流,但相互之間沒(méi)有關(guān)系,各自獨(dú)立緩存區(qū)蛛勉,擁有Writable和Readable所有方法和事件鹿寻,同時(shí)實(shí)現(xiàn)_read()和_write()方法。
const fs = require('fs');
const stream = require('stream');
const duplex = stream.Duplex({
write(chunk, encoding, cb) {
console.log(chunk.toString('utf8')); // 寫入
},
read() {
this.push('讀取');
this.push(null);
}
});
console.log(duplex.read(6).toString('utf8')); // 讀取
duplex.write('寫入');
6. Transform
轉(zhuǎn)換流诽凌,這個(gè)流在前端工程化中用到最多毡熏,從一個(gè)地方讀取數(shù)據(jù),轉(zhuǎn)換數(shù)據(jù)后輸出到一個(gè)地方侣诵,該流繼承于Duplex痢法。
const fs = require('fs');
const stream = require('stream');
const transform = stream.Transform({
transform(chunk, encoding, cb){
// 把數(shù)據(jù)轉(zhuǎn)換成大寫字母,然后push到緩存區(qū)
this.push(chunk.toString().toUpperCase());
cb();
}
});
transform.write('a');
console.log(transform.read(1).toString()); // A
7. fs快速創(chuàng)建可讀/可寫流
可讀流和可寫流都需要我們?nèi)?shí)現(xiàn)父類的方法杜顺,那么fs這個(gè)模塊幫我們做了這件事情财搁,fs里面實(shí)現(xiàn)了高效并且可靠的可讀/可寫流,提供快速創(chuàng)建流躬络,不再去實(shí)現(xiàn)父類_write()或_read()尖奔。下面我們來(lái)看看如何使用:
const fs = require('fs');
/**
* 創(chuàng)建可讀流
*
* 第一個(gè)參數(shù)文件路徑
*
* 第二個(gè)參數(shù)為options
*
flags?: string;
encoding?: string; 字符編碼
fd?: number; 文件打開后的標(biāo)識(shí)符
mode?: number; 文件的權(quán)限
autoClose?: boolean; 讀取完畢后,是否自動(dòng)關(guān)閉文件
start?: number; 從哪個(gè)位置開始讀取
end?: number; 讀到什么時(shí)候結(jié)束
highWaterMark?: number; 最高水位線
*/
const rs = fs.createReadStream('1.text');
rs.on('data', data => {
console.log(data);
})
/**
* 創(chuàng)建可寫流
*
* 第一個(gè)參數(shù)文件路徑
*
* 第二個(gè)參數(shù)為options
*
flags?: string;
encoding?: string; 字符編碼
fd?: number; 文件打開后的標(biāo)識(shí)符
mode?: number; 文件的權(quán)限
autoClose?: boolean; 寫入完畢后穷当,是否自動(dòng)關(guān)閉文件
start?: number; 從什么位置開始寫入
*/
const ws = fs.createWriteStream('2.text');
ws.write('123');
8. pipe
在流中搭建一條管道提茁,從可讀流中到可寫流。
可讀流中有pipe()方法,在可寫流中可以監(jiān)聽pipe事件馁菜,下面實(shí)現(xiàn)了從可讀流中通過(guò)管道到可寫流:
const fs = require('fs');
const stream = require('stream');
const rs = stream.Readable({
read() {
this.push(fs.readFileSync('./1.text')); // 文件內(nèi)容 test
this.push(null);
}
});
const ws = stream.Writable({
write(chunk, encoding, cb) {
// chunk為test buffer
fs.writeFileSync('./2.text', chunk.toString());
cb();
}
});
ws.on('pipe', data => {
// 觸發(fā)pipe事件
console.log(data);
});
rs.pipe(ws);
9. 總結(jié)
流分為四種基本類型茴扁,兩種模式。流中的數(shù)據(jù)不是直接寫入或讀取汪疮,有緩存區(qū)的概念峭火。