文章翻譯自:Node.js Streams: Everything you need to know
在開(kāi)發(fā)者中普遍認(rèn)為Node.js流不但難以應(yīng)用冀瓦,而且難以理解。現(xiàn)在有一個(gè)好消息届案,Node.js流將不在難以處理。過(guò)去幾年,為了方便操作Node.js流原叮,開(kāi)發(fā)者開(kāi)發(fā)了許多第三方Node.js包。但是在這篇文章中巡蘸,我將集中在Node.js原生的流接口應(yīng)用的介紹奋隶。
“Streams are Node’s best and most misunderstood idea.”
— Dominic Tarr
什么是流
流就是數(shù)據(jù)集合----諸如數(shù)組或是字符串。不同之處在于流不必一次全部使用悦荒,它們也不必適應(yīng)內(nèi)存唯欣。這兩個(gè)特點(diǎn)使流在處理大量數(shù)據(jù)或一次向外部返回一大塊數(shù)據(jù)時(shí)非常高效。
流代碼的組合性搬味,為流處理大量數(shù)據(jù)境氢,提供了新的力量。就像把微小linux命令組合成功能豐富的組合命令一樣碰纬,Node.js流通過(guò)同樣的方式實(shí)現(xiàn)數(shù)據(jù)通道的功能萍聊。
const grep = ... // A stream for the grep output
const wc = ... // A stream for the wc input
grep.pipe(wc)
許多Node.js內(nèi)置模塊都實(shí)現(xiàn)流接口:
上面展示的API中,一部分原生Node.js對(duì)象既是可讀又是可寫(xiě)流悦析,諸如TCP Sockets,Zlib和Crypto流寿桨。
值得注意的是望侈,對(duì)象的部分行為是密切相關(guān)的亦渗。例如:在客戶端HTTP對(duì)象是可讀流俺亮,在服務(wù)端HTTP對(duì)象是可寫(xiě)流腔呜。這是因?yàn)樵贖TTP上趴梢,程序從一個(gè)對(duì)象上讀取數(shù)據(jù)(http.IncomingMessage)顾腊,然后將讀取的數(shù)據(jù)寫(xiě)到另外一個(gè)對(duì)象上(http.ServerResponse)斋扰。
一個(gè)關(guān)于流實(shí)際用例
理論聽(tīng)起來(lái)美妙谴仙,但并不能完全傳遞流的精妙陵刹。讓我們看一個(gè)例子默伍,通過(guò)這個(gè)例子,可以看出是否使用流對(duì)于內(nèi)存占用的不同影響衰琐。
讓我們先創(chuàng)建一個(gè)大的文件:
const fs = require('fs');
const file = fs.createWriteStream('./big.file');
for(let i=0; i<= 1e6; i++) {
file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}
file.end();
在上面的示例代碼中也糊,fs模塊可以通過(guò)流接口實(shí)現(xiàn)對(duì)文件的讀和寫(xiě)。通過(guò)循環(huán)一百萬(wàn)次可寫(xiě)流羡宙,將數(shù)據(jù)寫(xiě)入到big.file文件中狸剃。
執(zhí)行相應(yīng)的代碼,生成大約400兆的文件狗热。
下面是一個(gè)專門(mén)用來(lái)操作這個(gè)大文件的Node服務(wù)器代碼:
const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
fs.readFile('./big.file', (err, data) => {
if (err) throw err;
res.end(data);
});
});
server.listen(8000);
當(dāng)服務(wù)接收請(qǐng)求钞馁,程序?qū)?huì)通過(guò)異步fs.readFile函數(shù)向請(qǐng)求者發(fā)送數(shù)據(jù)報(bào)文虑省。表面上看這樣的代碼并不會(huì)阻塞程序的事件循環(huán),真的是這樣嗎僧凰?
好探颈,當(dāng)我們啟動(dòng)這段服務(wù),然后請(qǐng)求這個(gè)服務(wù)后训措,讓我們看看內(nèi)存占用將會(huì)有怎樣的變化伪节。
當(dāng)啟動(dòng)服務(wù)時(shí),服務(wù)占用的內(nèi)存量是8.7兆绩鸣。
然后請(qǐng)求這個(gè)服務(wù)怀大,注意內(nèi)存占用的情況:
哇 ---- 內(nèi)存占用突然間跳到434.8兆。
本質(zhì)上講呀闻,程序在將大數(shù)據(jù)文件寫(xiě)入到http響應(yīng)對(duì)象前化借,會(huì)將所有的數(shù)據(jù)寫(xiě)入內(nèi)存中。這種代碼的效率是非常低效的总珠。
HTTP響應(yīng)對(duì)象也是一個(gè)可寫(xiě)流屏鳍,如果我們將代表big.file內(nèi)容可讀流與HTTP相應(yīng)對(duì)象的可寫(xiě)流在管道中連接勘纯,程序就可以通過(guò)兩個(gè)流管道局服,在不產(chǎn)生近400兆內(nèi)存占用的情況下,達(dá)到相同的結(jié)果驳遵。
Node.js中的fs模塊通過(guò)createReadStream方法淫奔,生成讀取文件的可讀流。然后程序可以通過(guò)管道將可讀流傳到http響應(yīng)對(duì)象中:
const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
const src = fs.createReadStream('./big.file');
src.pipe(res);
});
server.listen(8000);
當(dāng)再次請(qǐng)求服務(wù)時(shí)堤结,一個(gè)神奇的事情發(fā)生了(注意內(nèi)存占用):
發(fā)生了什么
當(dāng)客戶端請(qǐng)求大文件時(shí)唆迁,程序一次將一塊數(shù)據(jù)生成流文件,這就意味著我們不需要將數(shù)據(jù)緩存到內(nèi)存中竞穷。內(nèi)存的占用也僅僅上升了25兆唐责。
我們可以將這個(gè)測(cè)試用例推到極限。重新生成五百萬(wàn)行而不是一百萬(wàn)行的big.file文件瘾带,重新生成的文件將會(huì)達(dá)到2GB鼠哥,這將大于Node.js默認(rèn)的緩存量。
使用fs.readFile實(shí)現(xiàn)大內(nèi)存文件的讀取看政,最好不要修改程序默認(rèn)的緩存空間朴恳。但是如果使用fs.createReadStream,即便請(qǐng)求2GB的數(shù)據(jù)流也不會(huì)有問(wèn)題。使用第二種方式作為服務(wù)程序的內(nèi)存占用幾乎不發(fā)生變化允蚣。
流
流在Node.js中有四種:Readable于颖、Writable、Duplex和Transform嚷兔。
- 可讀流(Readable Stream): 可被消費(fèi)的資源的抽象森渐,如 fs.createReadStream方法
- 可寫(xiě)流(Writable Stream):數(shù)據(jù)可被寫(xiě)入目的地的抽象做入,如 fs.createWriteStream方法
- 雙工流(Duplex Stream):既是可讀流,又是可寫(xiě)流同衣, 如 TCP socket
- 轉(zhuǎn)換流(Transform Stream):以雙工流為基礎(chǔ)母蛛,把讀取數(shù)據(jù)或者寫(xiě)入數(shù)據(jù)進(jìn)行修改或者轉(zhuǎn)換。如 zlib.createGzip函數(shù)使用gzip方法實(shí)現(xiàn)數(shù)據(jù)的壓縮乳怎。我們可以認(rèn)為轉(zhuǎn)換流的輸入是可寫(xiě)流彩郊、輸出是可讀流。這就是聽(tīng)說(shuō)過(guò)的"通過(guò)流"的轉(zhuǎn)換流蚪缀。
所有流都是EventEmitter模塊的實(shí)例秫逝,觸發(fā)可讀和可寫(xiě)數(shù)據(jù)的事件。但是询枚,程序可以使用pipe函數(shù)消費(fèi)流數(shù)據(jù)违帆。
管道(pipe)函數(shù)
下面是一段你值得記憶的魔法代碼:
readableSrc.pipe(writableDest)
在這一簡(jiǎn)單的代碼中,將可讀流的輸出 (數(shù)據(jù)源) 作為可寫(xiě)流的輸入 (目標(biāo)) 進(jìn)行管道化。源數(shù)據(jù)必須是可讀流金蜀,目標(biāo)必須是可寫(xiě)流刷后。它們也可以同時(shí)是雙工流或者轉(zhuǎn)換流。事實(shí)上, 如果開(kāi)發(fā)者將雙工流傳入管道中, 我們就可以像Linux那樣鏈接到管道調(diào)用:
readableSrc
.pipe(transformStream1)
.pipe(transformStream2)
.pipe(finalWrtitableDest)
管道函數(shù)返回的是目標(biāo)流渊抄,它可以允許程序做上面的鏈?zhǔn)秸{(diào)用尝胆。下面的代碼: 流a是可讀流、流b與c是雙工流护桦、流c是可寫(xiě)流含衔。
a.pipe(b).pipe(c).pipe(d)
# Which is equivalent to:
a.pipe(b)
b.pipe(c)
c.pipe(d)
# Which, in Linux, is equivalent to:
$ a | b | c | d
管道(pipe)方法是實(shí)現(xiàn)流消費(fèi)的最簡(jiǎn)單方式。通常建議使用管道函數(shù)(pipe)或者事件消費(fèi)流二庵,但是避免將它們混合使用贪染。通常當(dāng)你使用管道(pipe)函數(shù)時(shí),你就不需要使用事件催享。但是如果程序需要定制流的消費(fèi)杭隙,事件可以是一個(gè)不錯(cuò)的選擇。
流事件
除了讀取可讀流源因妙,并把讀取的數(shù)據(jù)寫(xiě)入到可寫(xiě)的目的地上痰憎。管道(pipe)還可以自動(dòng)管理一些事情。例如:它可以處理異常兰迫,當(dāng)一個(gè)流比其它流更快或更慢時(shí)結(jié)束文件信殊。
但是,流可以通過(guò)事件被直接消費(fèi)汁果。下面是一段等效于管道(pipe)方法的程序涡拘,它通過(guò)簡(jiǎn)化的、與事件等效的代碼實(shí)現(xiàn)數(shù)據(jù)的讀取或?qū)懭搿?/p>
# readable.pipe(writable)
readable.on('data', (chunk) => {
writable.write(chunk);
});
readable.on('end', () => {
writable.end();
});
這里有一系列可用于可讀据德、可寫(xiě)流的事件或函數(shù)鳄乏。
這些事件或函數(shù)通常以某種方式相關(guān)聯(lián)跷车,關(guān)于可讀流的事件有:
- data事件,當(dāng)流傳遞給消費(fèi)者時(shí)觸發(fā)
- end事件橱野,當(dāng)流中沒(méi)有數(shù)據(jù)被消費(fèi)時(shí)觸發(fā)
關(guān)于可寫(xiě)流的重要事件有:
-drain事件朽缴,可寫(xiě)流接受數(shù)據(jù)時(shí)的信號(hào)
-finish事件,所有的數(shù)據(jù)已經(jīng)刷新到系統(tǒng)底層時(shí)觸發(fā)
通過(guò)事件和函數(shù)可以組合在一起后自定義流或流的優(yōu)化水援。為了消費(fèi)可讀流密强,程序可以使用pipe/unpipe方法,或是read/unshift/resume方法蜗元。為了消費(fèi)可寫(xiě)流或渤,程序可以將它作為pipe/unpipe的目的地,或是通過(guò)write方法寫(xiě)入數(shù)據(jù)奕扣,在寫(xiě)入完成后調(diào)用end方法薪鹦。
可讀流中的暫停(Paused)和流動(dòng)(Flowing)模式
可讀流中存在兩種模式影響程序?qū)勺x流的使用:
- 可讀要么處在暫停(paused)模式
- 要么處在流動(dòng)(flowing)模式
這些模式又是被認(rèn)為是拉和推模式。
所有的可讀流在默認(rèn)情況下都是從暫停模式開(kāi)始惯豆,在程序需要時(shí)池磁,轉(zhuǎn)換成流動(dòng)模式或者暫停模式。有時(shí)這種轉(zhuǎn)換是自發(fā)的楷兽。
當(dāng)可讀流在暫停(paused)模式時(shí)地熄,我們可以使用read方法按需讀取流數(shù)據(jù)。但是拄养,對(duì)于處在流動(dòng)(flowing)模式下的可讀流离斩,我們必須通過(guò)監(jiān)聽(tīng)事件來(lái)消費(fèi)數(shù)據(jù)银舱。
在流動(dòng)(flowing)模式下瘪匿,如果數(shù)據(jù)沒(méi)有被消費(fèi),數(shù)據(jù)可能會(huì)丟失寻馏。這就是當(dāng)程序中有流動(dòng)的可讀流時(shí)棋弥,需要data事件處理數(shù)據(jù)的原因。事實(shí)上诚欠,只需要添加data事件就可以將流從暫停轉(zhuǎn)換為流動(dòng)模式和解除程序與事件監(jiān)聽(tīng)器的綁定顽染、將流從流動(dòng)模式轉(zhuǎn)換為暫停模式。其中的一些是為了向后兼容老版本Node流的接口轰绵。
開(kāi)發(fā)者可以使用resume方法和pause方法粉寞,手動(dòng)實(shí)現(xiàn)兩種流模式的轉(zhuǎn)換。
當(dāng)程序使用管道(pipe)方法消費(fèi)可讀流時(shí)左腔,開(kāi)發(fā)這就不必關(guān)心流模式的轉(zhuǎn)換了唧垦,因?yàn)楣艿溃╬ipe)會(huì)自動(dòng)實(shí)現(xiàn)。
實(shí)現(xiàn)流
當(dāng)我們Node.js中的流時(shí)液样,有兩種不同的任務(wù):
- 繼承流的任務(wù)
- 消費(fèi)流的任務(wù)
到目前為止振亮,我們僅僅討論著消費(fèi)流巧还。讓我們實(shí)現(xiàn)一些例子吧!
實(shí)現(xiàn)流需要我們?cè)诔绦蛑幸肓髂K
實(shí)現(xiàn)可寫(xiě)流
開(kāi)發(fā)者可以使用流模塊中的Writeable構(gòu)造器坊秸,實(shí)現(xiàn)可寫(xiě)流麸祷。
const { Writable } = require('stream');
開(kāi)發(fā)者實(shí)現(xiàn)可寫(xiě)流有很多種方式。例如:通過(guò)繼承writable構(gòu)造器
class myWritableStream extends Writable {
}
但是褒搔,我更喜歡使用構(gòu)造器的實(shí)現(xiàn)方式阶牍。僅僅通過(guò)writable接口創(chuàng)建對(duì)象并傳遞一些選項(xiàng):一個(gè)必須函數(shù)選項(xiàng)是write函數(shù),傳入要寫(xiě)入的數(shù)據(jù)塊星瘾。
const { Writable } = require('stream');
const outStream = new Writable({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
}
});
process.stdin.pipe(outStream);
write函數(shù)有三個(gè)參數(shù):
- chunk通常是buffer數(shù)組荸恕,除非開(kāi)發(fā)者對(duì)流做了自定義的配置
- encoding參數(shù)在測(cè)試用例中是需要的,但是開(kāi)發(fā)者通乘老啵可以忽略
- callback是程序處理數(shù)據(jù)塊后融求,開(kāi)發(fā)者調(diào)用的回調(diào)函數(shù)。通常是寫(xiě)入操作成功與否的信號(hào)算撮。如果是寫(xiě)入異常的信號(hào)生宛,調(diào)用出現(xiàn)異常的回調(diào)函數(shù)。
在outStream類中肮柜,程序僅僅將數(shù)據(jù)轉(zhuǎn)換為字符串類型打印出來(lái)陷舅,并在沒(méi)有出現(xiàn)異常時(shí)調(diào)用回調(diào)函數(shù),以此來(lái)標(biāo)志程序的成功執(zhí)行审洞。這是一個(gè)簡(jiǎn)單但不是特別有效的回聲流莱睁,程序會(huì)輸出任何輸入的數(shù)據(jù)。
要使用這個(gè)流芒澜,我們可以將它與process.stdin一起使用仰剿,這是一個(gè)可讀的流,將process.stdin傳輸?shù)給utStream痴晦。
當(dāng)程序執(zhí)行時(shí)南吮,任何通過(guò)process.stdin輸入的數(shù)據(jù)都會(huì)被outStream中的console.log函數(shù)打印出來(lái)。
但是這個(gè)功能可以通過(guò)Node.js內(nèi)置模塊實(shí)現(xiàn)誊酌,因此這并不是一個(gè)非常實(shí)用的流部凑。它與process.stdout的功能非常類似,我們使用下面的代碼可以實(shí)現(xiàn)相同的功能:
process.stdin.pipe(process.stdout);
實(shí)現(xiàn)可讀流
為了實(shí)現(xiàn)一個(gè)可讀流碧浊,開(kāi)發(fā)者需要引入Readable的接口后通過(guò)這個(gè)接口構(gòu)建對(duì)象:
const { Readable } = require('stream');
const inStream = new Readable({});
這是實(shí)現(xiàn)可讀流的最簡(jiǎn)單方式涂邀,開(kāi)發(fā)者可以直接推送數(shù)據(jù)以供消費(fèi)使用。
const { Readable } = require('stream');
const inStream = new Readable();
inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');
inStream.push(null); // No more data
inStream.pipe(process.stdout);
當(dāng)程序中推送一個(gè)空對(duì)象時(shí)箱锐,這就意味著不再有數(shù)據(jù)供給可讀流比勉。
開(kāi)發(fā)者可以將可讀流通過(guò)管道傳給process.stdout的方式,供其消費(fèi)可讀流。
執(zhí)行這段代碼敷搪,程序可以讀取來(lái)自可讀流的數(shù)據(jù)兴想,并將數(shù)據(jù)打印出來(lái)。非常簡(jiǎn)單赡勘,但是并不高效嫂便。
上段代碼的本質(zhì)是:把數(shù)據(jù)推送給流,然后將流通過(guò)管道傳給process.stdout消費(fèi)闸与。其實(shí)程序可以在消費(fèi)者請(qǐng)求流時(shí)毙替,按需推送數(shù)據(jù),這種方式比上一種更高效践樱。通過(guò)實(shí)現(xiàn)readable流中的read函數(shù)實(shí)現(xiàn):
const inStream = new Readable({
read(size) {
// there is a demand on the data... Someone wants to read it.
}
});
在readable流中調(diào)用read函數(shù)厂画,程序可以將部分?jǐn)?shù)據(jù)傳輸?shù)疥?duì)列上。例如:每次向隊(duì)列中推送一個(gè)字母拷邢,字母的的序號(hào)從65(代表A)開(kāi)始袱院,每次推送的字母序號(hào)都自增1:
const inStream = new Readable({
read(size) {
this.push(String.fromCharCode(this.currentCharCode++));
if (this.currentCharCode > 90) {
this.push(null);
}
}
});
inStream.currentCharCode = 65;
inStream.pipe(process.stdout);
當(dāng)消費(fèi)者正在消費(fèi)可讀流時(shí),read函數(shù)就會(huì)被激活瞭稼,程序就會(huì)推送更多的字母忽洛。通過(guò)向隊(duì)列些推送空對(duì)象,終止循環(huán)环肘。如上代碼中欲虚,當(dāng)字母的序號(hào)超過(guò)90時(shí),終止循環(huán)悔雹。
這段代碼的功能與之前實(shí)現(xiàn)的代碼是等效的复哆,但是當(dāng)消費(fèi)者要讀流時(shí),程序可以按需推送數(shù)據(jù)的效率更優(yōu)腌零。因此建議使用這種實(shí)現(xiàn)方式梯找。
實(shí)現(xiàn)雙工、轉(zhuǎn)換流
雙工流:在同一對(duì)象上分別實(shí)現(xiàn)可讀流和可寫(xiě)流莱没,就像對(duì)象繼承了兩個(gè)可讀流和可寫(xiě)流接口初肉。
下面是一個(gè)雙工流,它結(jié)合了上面已經(jīng)實(shí)現(xiàn)的可讀流饰躲、可寫(xiě)流的例子:
const { Duplex } = require('stream');
const inoutStream = new Duplex({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
},
read(size) {
this.push(String.fromCharCode(this.currentCharCode++));
if (this.currentCharCode > 90) {
this.push(null);
}
}
});
inoutStream.currentCharCode = 65;
process.stdin.pipe(inoutStream).pipe(process.stdout);
通過(guò)實(shí)現(xiàn)雙工流的對(duì)象,程序可以讀取A-Z的字母臼隔,然后按順序打印出來(lái)嘹裂。開(kāi)發(fā)者將stdin可讀流傳輸?shù)诫p工流,然后將雙工流傳輸?shù)絪tdout可寫(xiě)流中摔握,打印出A-Z字母寄狼。
在雙工流中的可讀流與可寫(xiě)流是完全獨(dú)立的,雙工流僅僅是一個(gè)對(duì)象同時(shí)具有可讀流和可寫(xiě)流的功能。理解這一點(diǎn)至關(guān)重要泊愧。
轉(zhuǎn)換流比雙工流更有趣伊磺,因?yàn)樗慕Y(jié)果是根據(jù)輸入流計(jì)算出來(lái)的。
對(duì)于雙工流删咱,并不需要實(shí)現(xiàn)read和write函數(shù)屑埋,開(kāi)發(fā)者僅僅需要實(shí)現(xiàn)transform函數(shù),因?yàn)閠ransform函數(shù)已經(jīng)實(shí)現(xiàn)了read函數(shù)和write函數(shù)痰滋。
下面是將輸入的字母轉(zhuǎn)換為大寫(xiě)格式后摘能,然后把轉(zhuǎn)換后的數(shù)據(jù)傳給可寫(xiě)流:
const { Transform } = require('stream');
const upperCaseTr = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
process.stdin.pipe(upperCaseTr).pipe(process.stdout);
在這個(gè)例子中,開(kāi)發(fā)者僅僅通過(guò)transform函數(shù)敲街,就實(shí)現(xiàn)了向上面雙工流的功能团搞。在transform函數(shù)中,程序?qū)?shù)據(jù)轉(zhuǎn)換為大寫(xiě)后推送到可寫(xiě)流中多艇。
流的對(duì)象模式
默認(rèn)情況下逻恐,流只接受Buffer和String的數(shù)據(jù)。但是開(kāi)發(fā)者可以通過(guò)設(shè)置objectMode標(biāo)識(shí)的值峻黍,可以使流接受任何Javascript數(shù)據(jù)梢莽。
下面的例子可以證明這一點(diǎn)。通過(guò)一組流將以逗號(hào)分隔的字符串轉(zhuǎn)換為Javscript對(duì)象奸披,于是"a,b,c,d"轉(zhuǎn)換成{a: b, c : d}昏名。
const { Transform } = require('stream');
const commaSplitter = new Transform({
readableObjectMode: true,
transform(chunk, encoding, callback) {
this.push(chunk.toString().trim().split(','));
callback();
}
});
const arrayToObject = new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform(chunk, encoding, callback) {
const obj = {};
for(let i=0; i < chunk.length; i+=2) {
obj[chunk[i]] = chunk[i+1];
}
this.push(obj);
callback();
}
});
const objectToString = new Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
this.push(JSON.stringify(chunk) + '\n');
callback();
}
});
process.stdin
.pipe(commaSplitter)
.pipe(arrayToObject)
.pipe(objectToString)
.pipe(process.stdout)
commaSplitter轉(zhuǎn)換流將輸入的字符串(例如:“a,b阵面,c轻局,d”)轉(zhuǎn)換為數(shù)組([“a”, “b”样刷, “c”仑扑, “d”])。設(shè)置writeObjectMode標(biāo)識(shí)置鼻,因?yàn)樵趖ransform函數(shù)中推的數(shù)據(jù)是對(duì)象而不是字符串镇饮。
然后將commaSplitter輸出的可讀流傳輸?shù)睫D(zhuǎn)換流arrayToObject中。由于接收的是對(duì)象箕母,同樣需要在arrayToObject中需要設(shè)置writableObjectMode標(biāo)識(shí)储藐。由于需要在程序中推送對(duì)象(將傳入的數(shù)組轉(zhuǎn)換為對(duì)象),這也是程序中設(shè)置readableObjectMode標(biāo)識(shí)的原因嘶是。最后钙勃,轉(zhuǎn)換流objectToString接收對(duì)象,但是輸出字符串聂喇。這就是程序中只設(shè)置writableObjectModel標(biāo)識(shí)的原因辖源。輸出的可讀流時(shí)正常的字符串(序列化后的數(shù)組)。
Node的內(nèi)置轉(zhuǎn)換流
Node有許多內(nèi)置轉(zhuǎn)換流,如:lib和crypto流克饶。
下面的代碼是使用zlib.createGzip()流與fs模塊的可讀和可寫(xiě)流相結(jié)合酝蜒,實(shí)現(xiàn)壓縮文件的代碼:
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream(file + '.gz'));
程序?qū)⒆x取文件的可讀流傳輸進(jìn)Node內(nèi)置的轉(zhuǎn)換流zlib中,最后傳輸?shù)絼?chuàng)建壓縮文件的可寫(xiě)流中矾湃。因此開(kāi)發(fā)者只要將需要壓縮的文件路徑作為參數(shù)傳進(jìn)程序中亡脑,就可以實(shí)現(xiàn)任何文件的壓縮。
開(kāi)發(fā)者可以將管道函數(shù)與事件結(jié)合使用洲尊,這是選擇管道函數(shù)另一個(gè)原因远豺。例如:開(kāi)發(fā)者讓程序通過(guò)打印出標(biāo)記符顯示腳本正在執(zhí)行,并在腳本執(zhí)行完畢后打印出"Done"信息坞嘀。pipe函數(shù)返回的是目標(biāo)流躯护,程序可以在獲取目標(biāo)流后注冊(cè)事件鏈:
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.createReadStream(file)
.pipe(zlib.createGzip())
.on('data', () => process.stdout.write('.'))
.pipe(fs.createWriteStream(file + '.zz'))
.on('finish', () => console.log('Done'));
開(kāi)發(fā)者通過(guò)pipe函數(shù)可以很容易操作流,甚至在需要時(shí)丽涩,通過(guò)事件對(duì)經(jīng)過(guò)pipe函數(shù)處理后的目標(biāo)流做一些定制交互棺滞。
管道函數(shù)的強(qiáng)大之處在于,使用易理解的方式將多個(gè)管道函數(shù)聯(lián)合在一起矢渊。例如:不同于上個(gè)示例继准,開(kāi)發(fā)者可以通過(guò)傳入一個(gè)轉(zhuǎn)換流,標(biāo)識(shí)腳本正在執(zhí)行矮男。
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
const { Transform } = require('stream');
const reportProgress = new Transform({
transform(chunk, encoding, callback) {
process.stdout.write('.');
callback(null, chunk);
}
});
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(reportProgress)
.pipe(fs.createWriteStream(file + '.zz'))
.on('finish', () => console.log('Done'));
reportProgress只是一個(gè)轉(zhuǎn)換流移必,在這個(gè)流中標(biāo)識(shí)腳本正在執(zhí)行。值得注意的是毡鉴,代碼中使用callback函數(shù)推送transform中的數(shù)據(jù)崔泵。這與先前示例中this.push()的功能是等效的。
組合流的應(yīng)用場(chǎng)景還有很多猪瞬。例如:開(kāi)發(fā)者要先加密文件憎瘸,然后壓縮文件或是先壓縮后加密。如果要完成這個(gè)功能陈瘦,程序只要將文件按照順序傳入流中幌甘,使用crypto模塊實(shí)現(xiàn):
const crypto = require('crypto');
// ...
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(crypto.createCipher('aes192', 'a_secret'))
.pipe(reportProgress)
.pipe(fs.createWriteStream(file + '.zz'))
.on('finish', () => console.log('Done'));
上面的代碼實(shí)現(xiàn)了壓縮、加密文件痊项,只有知道密碼的用戶才可以使用加密后的文件锅风。因?yàn)殚_(kāi)發(fā)者不能按照普通解壓的工具對(duì)加密后壓縮文件,進(jìn)行解壓线婚。
對(duì)于任何通過(guò)上面代碼壓縮的文件遏弱,開(kāi)發(fā)者只需要以相反的順序使用crypto和zlib流,代碼如下:
fs.createReadStream(file)
.pipe(crypto.createDecipher('aes192', 'a_secret'))
.pipe(zlib.createGunzip())
.pipe(reportProgress)
.pipe(fs.createWriteStream(file.slice(0, -3)))
.on('finish', () => console.log('Done'));
假設(shè)傳輸進(jìn)去的文件是壓縮后的文件塞弊,上面的程序首先會(huì)生成可讀流,然后傳輸?shù)絚rypto的createDecipher()流中,接著將輸出的流文件傳輸?shù)絲lib的createGunzip()流中游沿,最后寫(xiě)入到文件中饰抒。
上面就是我對(duì)這個(gè)主題的總結(jié),感謝您的閱讀诀黍,期待下次與你相遇袋坑。