流的概念
- 流是一組有序的舌胶、有起點(diǎn)和終點(diǎn)的字節(jié)數(shù)據(jù)傳輸手段
- 流不關(guān)心文件的整體內(nèi)容捆蜀,只關(guān)注是否從文件中讀到了數(shù)據(jù),以及讀到數(shù)據(jù)之后的處理
- 流是一個抽象接口幔嫂,被 Node 中的很多對象所實(shí)現(xiàn)辆它。比如 HTTP 服務(wù)器 request 和 response 對象都是流
- 流 是 Node.js 的核心模塊,基本上都是 stream的實(shí)例履恩,比如 process.stdout锰茉、http.clientRequest
流的好處
- 流是基于事件的 API,用于管理和處理數(shù)據(jù),而且有不錯的效率
- 借助事件和非阻塞 I/O 庫切心,流模塊允許在其可用的時候動態(tài)處理,在其不需要的時候釋放掉
流中的數(shù)據(jù)有兩種模式飒筑,二進(jìn)制模式和對象模式
- 二進(jìn)制模式片吊, 每個分塊都是 buffer 或者 string 對象
- 對象模式, 流內(nèi)部處理的是一系列普通對象
所有使用 Node.js API 創(chuàng)建的流對象都只能操作 strings 和 Buffer對象协屡。但是俏脊,通過一些第三方流的實(shí)現(xiàn),你依然能夠處理其它類型的 JavaScript 值 (除了 null肤晓,它在流處理中有特殊意義)爷贫。 這些流被認(rèn)為是工作在 “對象模式”(object mode)。 在創(chuàng)建流的實(shí)例時补憾,可以通過 objectMode 選項(xiàng)使流的實(shí)例切換到對象模式沸久。試圖將已經(jīng)存在的流切換到對象模式是不安全的。
Node.js 中有四種基本的流類型
- Readable-可讀流 (例如 fs.createReadStream() )
- Writable-可寫的流(例如 fs.createWriteStreame() )
- Duplex-可讀寫的流(例如 net.Socket )
- Transform-在讀寫過程中可以修改和變換數(shù)據(jù)的 Duplex 流 (例如
zlib.createDeflate() )
第一種類型:可讀流 createReadStream
創(chuàng)建一個可讀流
// 引入 fs(讀取文件) 模塊
let fs = require('fs');
// 創(chuàng)建一個可讀流
let rs = fs.createReadStream('./1.txt',{
flags:'r',
encoding:'utf8',
start:0,
autoClose:true,
end: 3,
highWaterMark:3
});
API:createReadStream(path, [options]);
- path 是讀取文件的路徑
- options 里面有
- flags:打開文件要做的操作余蟹,默認(rèn)為 'r'
- encoding:默認(rèn)是null卷胯,null 代表的是 buffer
- start:開始讀取的索引位置
- autoClose:讀取完畢后自動關(guān)閉
- end:結(jié)束讀取的索引位置(包括結(jié)束位置)
- highWaterMark:讀取緩存區(qū)默認(rèn)的默認(rèn)的大小 64kb (64*1024b)
如果指定 encoding 為 utf8 編碼, highWaterMark 要大于 3 個字節(jié)
可讀流的一些監(jiān)聽事件
- data 事件
- end 事件
- error 事件
- open 事件
- close 事件
各個寫法如下:
// 流切換到流動模式,數(shù)據(jù)會被盡可能快的讀出
rs.on('data',function(data){ // 暫停模式 -> 流動模式
console.log(data);
});
// 該事件會在讀完數(shù)據(jù)后被觸發(fā)
rs.on('end', function () {
console.log('讀取完成');
});
// 讀文件失敗后被觸發(fā)
rs.on('error', function (err) {
console.log(err);
});
// 文件打開后被觸發(fā)
rs.on('open', function () {
console.log('文件打開了');
});
// 文件關(guān)閉后被觸發(fā)
rs.on('close', function () {
console.log('關(guān)閉');
});
設(shè)置編碼
與指定 {encoding:'utf8'} 效果相同威酒,設(shè)置編碼
rs.setEncoding('utf8');
暫停和恢復(fù)觸發(fā) data
通過 pause() 方法和 resume() 方法
rs.on('data', function (data) {
console.log(data);
rs.pause(); // 暫停方法 表示暫停讀取窑睁,暫停data事件觸發(fā)
});
setTimeout(function () {
rs.resume(); // 恢復(fù)方法
},2000);
第二種類型:可寫流 createWriteStream
創(chuàng)建一個可寫流
// 引入 fs(讀取文件) 模塊
let fs = require('fs');
// 創(chuàng)建一個可寫流
let ws = fs.createWriteStream('./1.txt',{
flags:'w',
encoding:'utf8',
highWaterMark:3
});
API:createWriteStream(path, [options]);
- path 是讀取文件的路徑
- options 里面有
- flags:打開文件要做的操作,默認(rèn)為 'w'
- encoding:默認(rèn)是 utf8
- highWaterMark:寫入緩存區(qū)的葵孤,默認(rèn)大小 16kb
可寫流的一些方法
1. write 方法
ws.write(chunk, [encoding], [callback]);
- chunk 寫入的數(shù)據(jù) buffer/string
- encoding 編碼格式担钮,chunk 為字符串時有用,是個可選參數(shù)
- callback 寫入成功后的回調(diào)
返回值為布爾值尤仍,系統(tǒng)緩存區(qū)滿時為 false箫津,未滿時為 true
2. end 方法
ws.end(chunk, [encoding], [callback]);
表明接下來沒有數(shù)據(jù)要被寫入 Writable 通過傳入可選的 chunk 和 encoding 參數(shù),可以在關(guān)閉流之前再寫入一段數(shù)據(jù) 如果傳入了可選的 callback 函數(shù)宰啦,它將作為 'finish' 事件的回調(diào)函數(shù)
3. drain 方法
ws.on('drain',function(){
console.log('drain')
});
- 當(dāng)一個流不處在 drain 的狀態(tài)苏遥, 對 write() 的調(diào)用會緩存數(shù)據(jù)塊, 并且返回 false
- 當(dāng)前所有緩存的數(shù)據(jù)塊滿了赡模,滿了之后情況才會出發(fā) drain
- 一旦 write() 返回 false田炭, 在 'drain' 事件觸發(fā)前, 不能寫入任何數(shù)據(jù)塊
4. finish 方法
ws.end('結(jié)束');
ws.on('finish',function(){
console.log('drain')
});
- 在調(diào)用 end 方法漓柑,且緩沖區(qū)數(shù)據(jù)都已經(jīng)傳給底層系統(tǒng)之后教硫, 'finish' 事件將被觸發(fā)
第三種類型:可讀寫的流,也叫雙工流(Duplex)
雙工流辆布,可以在同一個對象上同時實(shí)現(xiàn)可讀瞬矩、可寫,就好像同時繼承這兩個接口锋玲。而且讀取可以沒關(guān)系(互不干擾)
// 引入雙工流模塊
let {Duplex} = require('stream');
let d = Duplex({
read(){
this.push('hello');
this.push(null)
},
write(chunk,encoding,callback){
console.log(chunk);
callback();
}
});
d.on('data',function(data){
console.log(data);
});
d.write('hello');
第四種類型:轉(zhuǎn)換流(Transform)
- 轉(zhuǎn)換流輸出是從輸入中計(jì)算出來的
- 轉(zhuǎn)換流中景用,不需要實(shí)現(xiàn) read 和 write 方法,只需要實(shí)現(xiàn)一個 transform 方法嫩絮,就可以結(jié)合兩者丛肢。
// 引入轉(zhuǎn)換流
let {Transform} = require('stream');
// 轉(zhuǎn)換流的參數(shù)和可寫流一樣
let tranform1 = Transform({
transform(chunk,encoding,callback){
this.push(chunk.toString().toUpperCase());
callback();
}
});
let tranform2 = Transform({
transform(chunk,encoding,callback){
console.log(chunk.toString());
callback();
}
});
process.stdin.pipe(tranform1).pipe(tranform2);
pipe 方法
大家都知道,想把 Readable 的數(shù)據(jù) 寫到 Writable剿干,需要手動將數(shù)據(jù)讀入內(nèi)存中蜂怎,然后在寫入 Writable。也就是每次傳遞數(shù)據(jù)的時候置尔,都需要寫一下的代碼:
readable.on('readable', (err) => {
if(err) throw err
writable.write(readable.read())
})
為了方便使用杠步,Node.js 提供了 pipe() 方法
readable.pipe(writable)
pipe 方法的原理
var fs = require('fs');
var ws = fs.createWriteStream('./2.txt');
var rs = fs.createReadStream('./1.txt');
rs.on('data', function (data) {
var flag = ws.write(data);
if(!flag)
rs.pause();
});
ws.on('drain', function () {
rs.resume();
});
rs.on('end', function () {
ws.end();
});
unpipe 用法
- readable.unpipe() 方法將之前通過 stream.pipe() 方法綁定的流分離
- 如果 destination 沒有傳入, 則所有綁定的流都會被分離
let fs = require('fs');
var from = fs.createReadStream('./1.txt');
var to = fs.createWriteStream('./2.txt');
from.pipe(to);
setTimeout(() => {
console.log('關(guān)閉向2.txt的寫入');
from.unpipe(writable);
console.log('手工關(guān)閉文件流');
to.end();
}, 1000);
cork & uncork
- 調(diào)用 writable.cork() 方法將強(qiáng)制所有寫入數(shù)據(jù)都存到內(nèi)存中的緩沖區(qū)里。 直到調(diào)用 stream.uncork() 或 stream.end() 方法時榜轿,緩沖區(qū)里的數(shù)據(jù)才會被輸出
- writable.uncork() 將輸出在 stream.cork() 方法被調(diào)用之后緩沖在內(nèi)存中的所有數(shù)據(jù)
stream.cork();
stream.write('1');
stream.write('2');
process.nextTick(() => stream.uncork());
readable
'readable' 事件將在流中有數(shù)據(jù)可供讀取時才觸發(fā)幽歼。在某些情況下,為 'readable' 事件添加回調(diào)將會導(dǎo)致一些數(shù)據(jù)被讀取到內(nèi)部緩存中
const readable = getReadableStreamSomehow();
readable.on('readable', () => {
// 某些數(shù)據(jù)可讀
});
let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
start:3,
end:8,
encoding:'utf8',
highWaterMark:3
});
rs.on('readable',function () {
console.log('readable');
console.log('rs._readableState.buffer.length',rs._readableState.length);
let d = rs.read(1);
console.log('rs._readableState.buffer.length',rs._readableState.length);
console.log(d);
setTimeout(()=>{
console.log('rs._readableState.buffer.length',rs._readableState.length);
},500)
});
- 當(dāng)流數(shù)據(jù)到達(dá)尾部時谬盐, 'readable' 事件會觸發(fā)甸私。觸發(fā)順序在 'end' 事件之前
- 事實(shí)上, 'readable' 事件表明流有了新的動態(tài):要么是有了新的數(shù)據(jù)飞傀,要么是到了流的尾部皇型。 對于前者, stream.read() 將返回可用的數(shù)據(jù)砸烦。而對于后者弃鸦, stream.read() 將返回 null。
可讀流的兩種模式
- 可讀流的兩種工作模式:flowing 和 paused
- flowing 模式下幢痘,可讀流自動從系統(tǒng)底層讀取數(shù)據(jù)唬格,通過 EventEmitter 接口的事件盡快將數(shù)據(jù)提供給應(yīng)用
- paused 模式下,調(diào)用 stream.read() 方法來從流中讀取數(shù)據(jù)片段
- 所有初始工作模式為 paused 的 Readable 流颜说,可以通過下面三種途徑切換到 flowing 模式
- 監(jiān)聽 'data' 事件
- 調(diào)用 stream.resume() 方法
- 調(diào)用 stream.pipe() 方法將數(shù)據(jù)發(fā)送到 Writable
- 可讀流可以通過下面途徑切換到 paused 模式:
- 如果不存在管道目標(biāo)(pipe destination)购岗,可以通過調(diào)用 stream.pause() 方法實(shí)現(xiàn)。
- 如果存在管道目標(biāo)门粪,可以通過取消 'data' 事件監(jiān)聽藕畔,并調(diào)用 stream.unpipe() 方法移除所有管道目標(biāo)來實(shí)現(xiàn)。
如果 Readable 切換到 flowing 模式庄拇,且沒有消費(fèi)者處理流中的數(shù)據(jù)注服,這些數(shù)據(jù)將會丟失。 比如措近, 調(diào)用了 readable.resume() 方法卻沒有監(jiān)聽 'data' 事件溶弟,或是取消了 'data' 事件監(jiān)聽,就有可能出現(xiàn)這種情況瞭郑。
可讀流的三種狀態(tài)
在任意時刻辜御,任意可讀流應(yīng)確切處于下面三種狀態(tài)之一:
- readable._readableState.flowing = null
- readable._readableState.flowing = false
- readable._readableState.flowing = true
若 readable._readableState.flowing 為 null,由于不存在數(shù)據(jù)消費(fèi)者屈张,可讀流將不會產(chǎn)生數(shù)據(jù)擒权。 在這個狀態(tài)下袱巨,監(jiān)聽 'data' 事件,調(diào)用 readable.pipe() 方法碳抄,或者調(diào)用 readable.resume() 方法愉老, readable._readableState.flowing 的值將會變?yōu)?true 。這時剖效,隨著數(shù)據(jù)生成嫉入,可讀流開始頻繁觸發(fā)事件。
調(diào)用 readable.pause() 方法璧尸, readable.unpipe() 方法咒林, 或者接收 “背壓”(back pressure), 將導(dǎo)致 readable._readableState.flowing 值變?yōu)?false爷光。 這將暫停事件流垫竞,但 不會 暫停數(shù)據(jù)生成。 在這種情況下蛀序,為 'data' 事件設(shè)置監(jiān)聽函數(shù)不會導(dǎo)致 readable._readableState.flowing 變?yōu)?true件甥。
當(dāng) readable._readableState.flowing 值為 false 時, 數(shù)據(jù)可能堆積到流的內(nèi)部緩存中哼拔。