什么是流
流是一個(gè)在node中與流數(shù)據(jù)工作的抽象接口晌该,stream模塊提供了一個(gè)基本的API,使得比較容易創(chuàng)建一個(gè)實(shí)現(xiàn)了流的接口的對象绿渣。node.js中提供了很多的流對象朝群,例如,一個(gè)發(fā)向http服務(wù)器的請求怯晕,process.stdout都是流的實(shí)例潜圃。
流可以是可讀的,可寫或者兩者兼?zhèn)渲鄄琛K械牧鞫际荅ventEmitter的實(shí)例谭期。
流的類型
在node中有4種基本的流類型:
1.?Writable(可寫流):可以把數(shù)據(jù)寫進(jìn)流里
2.?Readable (可讀流): 在該流上可以讀取數(shù)據(jù)
3.?Duplex (雙工流): 在該流上,既可以寫數(shù)據(jù)吧凉,又可以讀取數(shù)據(jù)
4.?Transform (轉(zhuǎn)換流): 當(dāng)數(shù)據(jù)在被寫或被讀的時(shí)候隧出,可以修改數(shù)據(jù)的格式(例如壓縮)
對象模式
node創(chuàng)建的所有流都是對字符串或者是Buffer對象操作。然而阀捅,也有與js的其他類型值工作的流實(shí)現(xiàn)胀瞪。這些流被視作為在對象模式里操作。
當(dāng)流在被創(chuàng)建的時(shí)候饲鄙,使用了objectMode選項(xiàng)凄诞,流的實(shí)例就被轉(zhuǎn)換為對象模式。試著去轉(zhuǎn)換一個(gè)流為對象模式忍级,這并不是安全的帆谍。
Buffering(數(shù)據(jù)緩沖中...)?
所有的writable和readable流將會把數(shù)據(jù)存儲在內(nèi)部的buffer中,這數(shù)據(jù)可以分別使用writable.writableBuffer或者readable.redableBuffer取得轴咱。
一定數(shù)量的數(shù)據(jù)緩沖能力取決于傳遞進(jìn)流構(gòu)造器中的highWaterMark選項(xiàng)汛蝙。對于正常的流來說烈涮,highWaterMark選項(xiàng)明確了字節(jié)的數(shù)據(jù)量。對于在對象模式中操作的流窖剑,highWaterMark明確了對象的數(shù)量坚洽。
當(dāng)實(shí)例調(diào)用stream.push(chunk)數(shù)據(jù)緩沖在readable流中,如果流的消費(fèi)者沒有調(diào)用stream.read(),數(shù)據(jù)將會待在內(nèi)部隊(duì)列直到被消費(fèi)西土。
一旦內(nèi)部讀取的buffer的數(shù)據(jù)量達(dá)到了highWaterMark標(biāo)明的閥值讶舰,流將會暫時(shí)的停止從源頭讀取數(shù)據(jù),直到緩沖的數(shù)據(jù)能被消費(fèi)的時(shí)候會再次讀取
當(dāng) writable.write(chunk)方法被重復(fù)調(diào)用的時(shí)候翠储,數(shù)據(jù)被緩沖在writable流里绘雁。當(dāng)內(nèi)部寫的緩沖數(shù)據(jù)的總數(shù)量低于highWaterMark標(biāo)明的閥值,調(diào)用writable.write()將會返回true援所,一旦內(nèi)部緩沖的數(shù)據(jù)量達(dá)到或者超過highWaterMark時(shí)庐舟,將會返回false。
stream.pipe()是為了限制數(shù)據(jù)的緩沖在一個(gè)可接受的水平住拭,使得源頭和目的地不同的緩沖速度將不會那沖垮空閑的內(nèi)存挪略。
因?yàn)镈uplex和Transform流都是可讀且可寫的,每個(gè)內(nèi)部都維護(hù)了兩個(gè)獨(dú)立的內(nèi)部緩沖用于讀和寫滔岳,使得每一邊在維護(hù)一個(gè)合適且高效的數(shù)據(jù)流時(shí)都能夠獨(dú)立的進(jìn)行操作杠娱。
可讀流
可讀流是一種數(shù)據(jù)被消費(fèi)的源頭的抽象,所有的可讀流實(shí)例實(shí)現(xiàn)了stream.Readable定義的接口谱煤。
實(shí)例如下摊求,首先創(chuàng)建了一個(gè)可讀流,傳入要讀的文件位置還有一些讀取設(shè)置刘离,從哪里開始讀室叉,設(shè)置緩沖過程中讀取的最大閥值highWaterMark,打開文件時(shí)硫惕,數(shù)據(jù)被讀取流讀取到緩沖中茧痕,當(dāng)定義一個(gè)data事件處理器時(shí),此時(shí)緩沖中的數(shù)據(jù)被消費(fèi)恼除,當(dāng)流中無數(shù)據(jù)時(shí)或者數(shù)據(jù)讀取完畢時(shí)會觸發(fā)end事件踪旷,之后關(guān)閉文件。
????let fs = require('fs');
????let path = require('path');
????let rs = fs.createReadStream(path.join(__dirname, '1.txt'), {
????????flags: 'r', // 讀取操作
????????encoding: 'utf8', // 默認(rèn)為null豁辉,null代表的返回buffer
????????autoClose: true, // 讀取完自動關(guān)閉
????????highWaterMark: 3, // 默認(rèn)是64k, 64*1024b
????????start: 3, // 從第3個(gè)字節(jié)開始讀取
????????end: 8 // 包括索引8
????});
????// 也可以手動設(shè)置編碼或者在options聲明
????// rs.setEncoding('utf8');
????rs.on('open', function() {
????????console.log('文件打開了');
????});
????// 當(dāng)流或其底層資源被關(guān)閉時(shí)觸發(fā)
????rs.on('close', function() {
????????console.log('文件關(guān)閉了');
????});
????rs.on('error', function(err) {
????????console.log(err);
????});
????// 當(dāng)流將數(shù)據(jù)塊傳送給消費(fèi)者后觸發(fā)
????rs.on('data', function(data) {
????????console.log('data', data);
????????// 停止觸發(fā)data事件令野,暫停讀取
????????rs.pause();
????????setTimeout(function() {
????????????// 恢復(fù)讀取,暫停模式(需顯示調(diào)用stream.read())切換為流動模式(數(shù)據(jù)自動從底層系統(tǒng)讀取徽级,并通過EventEmitter接口的事件盡可能快的被提供給應(yīng)用程序)
????????????rs.resume();
????????????}, 1000);
????????});
????????// 表明流有新動態(tài)彩掐,要么有新的數(shù)據(jù),要么到達(dá)流的盡頭灰追,若同時(shí)使用data事件堵幽,當(dāng)調(diào)用rs.read方法才會觸發(fā)data事件
????????// rs.on('readable', function() {
????????// console.log('readable data', rs.read());
????// });
????// 流中無數(shù)據(jù)時(shí)或者讀取數(shù)據(jù)完畢觸發(fā)
????rs.on('end', function() {
????????console.log('end');
????});
????// 文件打開了
????// data 456
????// data 789
????// end
????// 文件關(guān)閉了?
兩種讀取模式
可讀流有兩種讀取的模式:暫停模式和流動模式。這些模式與對象流模式不同弹澎,一個(gè)可讀流可以是對象流朴下,也可以不是也不管它是處在流動模式下還是暫停模式下。
1.?在流動模式下苦蒿,數(shù)據(jù)被系統(tǒng)自動讀取并且使用EventEmitter接口能夠盡可能快的提供給應(yīng)用消費(fèi)
2.?在暫停模式中殴胧,stream.read()必須被顯示調(diào)用,才能夠從流中讀取數(shù)據(jù)塊佩迟。
所有的可讀流開始處于一個(gè)暫停模式但是可以切換為流動模式通過以下一種方式:
1.?添加一個(gè)data事件處理器
2.?調(diào)用stream.resume()方法
3.?調(diào)用stream.pipe(),使得數(shù)據(jù)是可寫的
所有的可讀流可以切換為暫停模式团滥,通過以下一種方式:
1.?如果沒有管道的目的地,通過調(diào)用stream.pause()
2.?如果有管道的目的地报强,通過移除所有管道的目的地灸姊。大多數(shù)管道的目的地能夠通過調(diào)用stream.unpipe()移除
最重要的是要明白readable直到有消費(fèi)或者忽略數(shù)據(jù)被提供,才會產(chǎn)生數(shù)據(jù)秉溉。如果消費(fèi)機(jī)制被帶走了或者被停止了力惯,那么readable會停止產(chǎn)生數(shù)據(jù)。
出于背后兼容性的原因召嘶,移除data事件處理器并不會自動的暫停流父晶。統(tǒng)一,如果有管道的目的地弄跌,調(diào)用stream.pause()也不會保證流會暫停甲喝。
如果readable切換為流動模式,并且沒有消費(fèi)者去消費(fèi)處理數(shù)據(jù)铛只,數(shù)據(jù)將會丟失埠胖。
添加readable的事件處理器可以自動的使得流停止流動,數(shù)據(jù)可以通過readable.read()去消費(fèi)格仲。如果readable的事件處理器被移除押袍,流可以再次流動如果有個(gè)data事件處理器的話
可寫流?
可寫流是數(shù)據(jù)被寫入的目的地的抽象,所有的可寫流實(shí)例實(shí)現(xiàn)了stream.Writable定義的接口
以下是我寫的一個(gè)例子凯肋,首先創(chuàng)建了一個(gè)可寫流對象谊惭,傳入一個(gè)參數(shù)為需要寫入的文件位置,一個(gè)參數(shù)是配置參數(shù)侮东,highWaterMark是在寫入的時(shí)候指定寫入數(shù)據(jù)的閾值圈盔,若內(nèi)部緩沖小于閾值時(shí),會返回true悄雅,若返回false驱敲,應(yīng)該停止寫入數(shù)據(jù),此時(shí)緩沖區(qū)已經(jīng)達(dá)到閾值宽闲,滿了众眨,若所有緩沖的數(shù)據(jù)塊被消費(fèi)完了握牧,清空了會觸發(fā)一個(gè)drain事件。
????let fs = require('fs');
????let path = require('path');
????let ws = fs.createWriteStream(path.join(__dirname, '2.txt'), {
?????????flags: 'w',
?????????encoding: 'utf8',
?????????mode: 0o666,
?????????autoClose: true,
?????????start: 0,
?????????highWaterMark: 3 // 最高水平線娩梨,指定了字節(jié)總數(shù)
?????});
????// 內(nèi)部的緩沖小于創(chuàng)建的配置的highWaterMark沿腰,返回true
????let flag = ws.write('0', 'utf8', ()=>{});
????console.log(flag); // true
????flag = ws.write('1', 'utf8', ()=>{});
????console.log(flag); // true
????// 若返回false,則應(yīng)該停止向流寫入數(shù)據(jù)
????flag = ws.write('2', 'utf8', ()=>{});
????console.log(flag); // 返回false,寫完3時(shí)狈定,緩沖區(qū)已達(dá)到highWaterMark:3時(shí)颂龙,表明緩存區(qū)滿了
????// 所有緩沖的數(shù)據(jù)塊都被排空了,當(dāng)達(dá)到highWaterMark時(shí)纽什,表明緩存區(qū)滿了措嵌,滿了后被清空了才會觸發(fā)drain
????ws.on('drain', function() {
????console.log('drain');
????flag = ws.write('3', 'utf8', ()=>{});
????console.log(flag); // true
????});
????// 可寫流和可讀流都會在內(nèi)部的緩沖器中存儲數(shù)據(jù)?