編譯地址:https://github.com/substack/stream-handbook
譯者:jabez128
原文地址:https://github.com/jabez128/stream-handbook
寫在前面的話:
如果你正在學習Nodejs,那么流一定是一個你需要掌握的概念。如果你想成為一個Node高手葱轩,那么流一定是武功秘籍中不可缺少的一個部分宴树。關于流這個主題,由Node高手substack帶來的stream-handbook
絕對是經典入門讀物之一额各,其在Github上的star數量已經超過了4500個国觉,足以見其權威程度。本文下面的內容將參考自substack的這篇文章虾啦。本文也放在Github上麻诀,如果你本文覺得對你有幫助,鼓勵大家去github上幫我點個贊傲醉。https://github.com/jabez128/stream-handbook
引子
在編寫代碼時蝇闭,我們應該有一些方法將程序像連接水管一樣連接起來 -- 當我們需要獲取一些數據時,可以去通過"擰"其他的部分來達到目的硬毕。這也應該是IO應有的方式呻引。 -- Doug McIlroy. October 11, 1964
從早先的unix開始,stream便開始進入了人們的視野昭殉,在過去的幾十年的時間里苞七,它被證明是一種可依賴的編程方式,它可以將一個大型的系統(tǒng)拆成一些很小的部分挪丢,并且讓這些部分之間完美地進行合作蹂风。在unix中,我們可以使用|
符號來實現流乾蓬。在node中惠啄,node內置的stream模塊已經被多個核心模塊使用,同時也可以被用戶自定義的模塊使用任内。和unix類似撵渡,node中的流模塊的基本操作符叫做.pipe()
,同時你也可以使用一個后壓機制來應對那些對數據消耗較慢的對象死嗦。
在node中趋距,流可以幫助我們將事情的重點分為幾份,因為使用流可以幫助我們將實現接口的部分分割成一些連續(xù)的接口越除,這些接口都是可重用的节腐。接著外盯,你可以將一個流的輸出口接到另一個流的輸入口,然后使用使用一些庫來對流實現高級別的控制翼雀。
對于小型程序設計(small-program design)以及unix哲學來說饱苟,流都是一個重要的組成部分,但是除此之外還有一些重要的事情值得我們思考狼渊。永遠要記得:十鳥在森林不如一鳥在手里箱熬。
為什么應該使用流
在node中,I/O都是異步的狈邑,所以在和硬盤以及網絡的交互過程中會涉及到傳遞回調函數的過程城须。你之前可能會寫出這樣的代碼:
var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
fs.readFile(__dirname + '/data.txt', function (err, data) {
res.end(data);
});
});
server.listen(8000);
上面的這段代碼并沒有什么問題,但是在每次請求時官地,我們都會把整個data.txt
文件讀入到內存中酿傍,然后再把結果返回給客戶端。想想看驱入,如果data.txt
文件非常大赤炒,在響應大量用戶的并發(fā)請求時,程序可能會消耗大量的內存亏较,這樣很可能會造成用戶連接緩慢的問題莺褒。
其次,上面的代碼可能會造成很不好的用戶體驗雪情,因為用戶在接收到任何的內容之前首先需要等待程序將文件內容完全讀入到內存中遵岩。
所幸的是,(req,res)
參數都是流對象巡通,這意味著我們可以使用一種更好的方法來實現上面的需求:
var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
var stream = fs.createReadStream(__dirname + '/data.txt');
stream.pipe(res);
});
server.listen(8000);
在這里尘执,.pipe()
方法會自動幫助我們監(jiān)聽data
和end
事件。上面的這段代碼不僅簡潔宴凉,而且data.txt
文件中每一小段數據都將源源不斷的發(fā)送到客戶端誊锭。
除此之外,使用.pipe()
方法還有別的好處弥锄,比如說它可以自動控制后端壓力丧靡,以便在客戶端連接緩慢的時候node可以將盡可能少的緩存放到內存中。
想要將數據進行壓縮籽暇?我們可以使用相應的流模塊完成這項工作!
var http = require('http');
var fs = require('fs');
var oppressor = require('oppressor');
var server = http.createServer(function (req, res) {
var stream = fs.createReadStream(__dirname + '/data.txt');
stream.pipe(oppressor(req)).pipe(res);
});
server.listen(8000);
通過上面的代碼温治,我們成功的將發(fā)送到瀏覽器端的數據進行了gzip壓縮。我們只是使用了一個oppressor模塊來處理這件事情戒悠。
一旦你學會使用流api熬荆,你可以將這些流模塊像搭樂高積木或者像連接水管一樣拼湊起來,從此以后你可能再也不會去使用那些沒有流API的模塊獲取和推送數據了绸狐。
流模塊基礎
在node中惶看,一共有五種類型的流:readable,writable,transform,duplex以及"classic"
pipe
無論哪一種流捏顺,都會使用.pipe()
方法來實現輸入和輸出六孵。
.pipe()
函數很簡單纬黎,它僅僅是接受一個源頭src
并將數據輸出到一個可寫的流dst
中:
src.pipe(dst)
.pipe(dst)
將會返回dst
因此你可以鏈式調用多個流:
a.pipe(b).pipe(c).pipe(d)
上面的代碼也可以等價為:
a.pipe(b);
b.pipe(c);
c.pipe(d);
這和你在unix中編寫流代碼很類似:
a | b | c | d
只不過此時你是在node中編寫而不是在shell中!
readable流
Readable流可以產出數據劫窒,你可以將這些數據傳送到一個writable本今,transform或者duplex流中,只需要調用pipe()
方法:
readableStream.pipe(dst)
創(chuàng)建一個readable流
現在我們就來創(chuàng)建一個readable流主巍!
var Readable = require('stream').Readable;
var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);
rs.pipe(process.stdout);
下面運行代碼:
$ node read0.js
beep boop
在上面的代碼中rs.push(null)
的作用是告訴rs
輸出數據應該結束了冠息。
需要注意的一點是我們在將數據輸出到process.stdout
之前已經將內容推送進readable流rs
中,但是所有的數據依然是可寫的孕索。
這是因為在你使用.push()
將數據推進一個readable流中時逛艰,一直要到另一個東西來消耗數據之前,數據都會存在一個緩存中搞旭。
然而散怖,在更多的情況下,我們想要的是當需要數據時數據才會產生肄渗,以此來避免大量的緩存數據镇眷。
我們可以通過定義一個._read
函數來實現按需推送數據:
var Readable = require('stream').Readable;
var rs = Readable();
var c = 97;
rs._read = function () {
rs.push(String.fromCharCode(c++));
if (c > 'z'.charCodeAt(0)) rs.push(null);
};
rs.pipe(process.stdout);
代碼的運行結果如下所示:
$ node read1.js
abcdefghijklmnopqrstuvwxyz
在這里我們將字母a
到z
推進了rs中,但是只有當數據消耗者出現時翎嫡,數據才會真正實現推送欠动。
_read
函數也可以獲取一個size
參數來指明消耗者想要讀取多少比特的數據,但是這個參數是可選的惑申。
需要注意到的是你可以使用util.inherit()
來繼承一個Readable流具伍。
為了說明只有在數據消耗者出現時,_read
函數才會被調用圈驼,我們可以將上面的代碼簡單的修改一下:
var Readable = require('stream').Readable;
var rs = Readable();
var c = 97 - 1;
rs._read = function () {
if (c >= 'z'.charCodeAt(0)) return rs.push(null);
setTimeout(function () {
rs.push(String.fromCharCode(++c));
}, 100);
};
rs.pipe(process.stdout);
process.on('exit', function () {
console.error('\n_read() called ' + (c - 97) + ' times');
});
process.stdout.on('error', process.exit);
運行上面的代碼我們可以發(fā)現如果我們只請求5比特的數據人芽,那么_read
只會運行5次:
$ node read2.js | head -c5
abcde
_read() called 5 times
在上面的代碼中,setTimeout
很重要碗脊,因為操作系統(tǒng)需要花費一些時間來發(fā)送程序結束信號啼肩。
另外,process.stdout.on('error',fn)
處理器也很重要,因為當head
不再關心我們的程序輸出時衙伶,操作系統(tǒng)將會向我們的進程發(fā)送一個SIGPIPE
信號祈坠,此時process.stdout
將會捕獲到一個EPIPE
錯誤。
上面這些復雜的部分在和操作系統(tǒng)相關的交互中是必要的矢劲,但是如果你直接和node中的流交互的話赦拘,則可有可無。
如果你創(chuàng)建了一個readable流芬沉,并且想要將任何的值推送到其中的話躺同,確保你在創(chuàng)建流的時候指定了objectMode參數,Readable({ objectMode: true })
阁猜。
消耗一個readable流
大部分時候,將一個readable流直接pipe到另一種類型的流或者使用through或者concat-stream創(chuàng)建的流中蹋艺,是一件很容易的事情剃袍。但是有時我們也會需要直接來消耗一個readable流。
process.stdin.on('readable', function () {
var buf = process.stdin.read();
console.dir(buf);
});
代碼運行結果如下所示:
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js
<Buffer 61 62 63 0a>
<Buffer 64 65 66 0a>
<Buffer 67 68 69 0a>
null
當數據可用時捎谨,readable
事件將會被觸發(fā)民效,此時你可以調用.read()
方法來從緩存中獲取這些數據。
當流結束時涛救,.read()
將返回null
畏邢,因為此時已經沒有更多的字節(jié)可以供我們獲取了。
你也可以告訴.read()
方法來返回n
個字節(jié)的數據检吆。雖然所有核心對象中的流都支持這種方式舒萎,但是對于對象流來說這種方法并不可用。
下面是一個例子蹭沛,在這里我們制定每次讀取3個字節(jié)的數據:
process.stdin.on('readable', function () {
var buf = process.stdin.read(3);
console.dir(buf);
});
運行上面的例子臂寝,我們將獲取到不完整的數據:
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>
這是因為多余的數據都留在了內部的緩存中,因此這個時候我們需要告訴node我們還對剩下的數據感興趣致板,我們可以使用.read(0)
來完成這件事:
process.stdin.on('readable', function () {
var buf = process.stdin.read(3);
console.dir(buf);
process.stdin.read(0);
});
到現在為止我們的代碼和我們所期望的一樣了交煞!
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>
<Buffer 68 69 0a>
我們也可以使用.unshift()
方法來放置多余的數據。
使用unshift()
方法能夠放置我們進行不必要的緩存拷貝斟或。在下面的代碼中我們將創(chuàng)建一個分割新行的可讀解析器:
var offset = 0;
process.stdin.on('readable', function () {
var buf = process.stdin.read();
if (!buf) return;
for (; offset < buf.length; offset++) {
if (buf[offset] === 0x0a) {
console.dir(buf.slice(0, offset).toString());
buf = buf.slice(offset + 1);
offset = 0;
process.stdin.unshift(buf);
return;
}
}
process.stdin.unshift(buf);
});
代碼的運行結果如下所示:
$ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js
'hearties'
'heartiest'
'heartily'
'heartiness'
'heartiness\'s'
'heartland'
'heartland\'s'
'heartlands'
'heartless'
'heartlessly'
當然素征,已經有很多這樣的模塊比如split來幫助你完成這件事情,你完全不需要自己寫一個萝挤。
writable流
一個writable流指的是只能流進不能流出的流:
src.pipe(writableStream)
創(chuàng)建一個writable流
只需要定義一個._write(chunk,enc,next)
函數御毅,你就可以將一個readable流的數據釋放到其中:
var Writable = require('stream').Writable;
var ws = Writable();
ws._write = function (chunk, enc, next) {
console.dir(chunk);
next();
};
process.stdin.pipe(ws);
代碼運行結果如下所示:
$ (echo beep; sleep 1; echo boop) | node write0.js
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
第一個參數,chunk
代表寫進來的數據怜珍。
第二個參數enc
代表編碼的字符串端蛆,但是只有在opts.decodeString
為false
的時候你才可以寫一個字符串。
第三個參數酥泛,next(err)
是一個回調函數今豆,使用這個回調函數你可以告訴數據消耗者可以寫更多的數據。你可以有選擇性的傳遞一個錯誤對象error
柔袁,這時會在流實體上觸發(fā)一個emit
事件呆躲。
在從一個readable流向一個writable流傳數據的過程中,數據會自動被轉換為Buffer
對象捶索,除非你在創(chuàng)建writable流的時候制定了decodeStrings
參數為false
,Writable({decodeStrings: false})
插掂。
如果你需要傳遞對象,需要指定objectMode
參數為true
,Writable({ objectMode: true })
辅甥。
向一個writable流中寫東西
如果你需要向一個writable流中寫東西酝润,只需要調用.write(data)
即可。
process.stdout.write('beep boop\n');
為了告訴一個writable流你已經寫完畢了璃弄,只需要調用.end()
方法要销。你也可以使用.end(data)
在結束前再寫一些數據。
var fs = require('fs');
var ws = fs.createWriteStream('message.txt');
ws.write('beep ');
setTimeout(function () {
ws.end('boop\n');
}, 1000);
運行結果如下所示:
$ node writing1.js
$ cat message.txt
beep boop
如果你在創(chuàng)建writable流時指定了highWaterMark
參數谢揪,那么當沒有更多數據寫入時蕉陋,調用.write()
方法將會返回false。
如果你想要等待緩存情況拨扶,可以監(jiān)聽drain
事件。
transform流
你可以將transform流想象成一個流的中間部分茁肠,它可以讀也可寫患民,但是并不保存數據,它只負責處理流經它的數據垦梆。
duplex流
Duplex流是一個可讀也可寫的流匹颤,就好像一個電話,可以接收也可以發(fā)送語音托猩。一個rpc交換是一個duplex流的最好的例子印蓖。如果你看到過下面這樣的代碼:
a.pipe(b).pipe(a)
那么你需要處理的就是一個duplex流對象。
classic流
Classic流是一個古老的接口京腥,最早出現在node 0.4中赦肃。雖然現在不怎么用,但是我們最好還是來了解一下它的工作原理公浪。
無論何時他宛,只要一個流對象注冊了一個data
監(jiān)聽器,它就會自動的切換到classic
模式欠气,并且根據舊API的方式運行厅各。
classic readable流
Classic readable流只是一個事件發(fā)射器,當有數據消耗者出現時發(fā)射emit
事件预柒,當輸出數據完畢時發(fā)射end
事件队塘。
我們可以同構檢查stream.readable
來檢查一個classic流對象是否可讀。
下面是一個簡單的readable流對象的例子宜鸯,程序的運行結果將會輸出A
到J
:
var Stream = require('stream');
var stream = new Stream;
stream.readable = true;
var c = 64;
var iv = setInterval(function () {
if (++c >= 75) {
clearInterval(iv);
stream.emit('end');
}
else stream.emit('data', String.fromCharCode(c));
}, 100);
stream.pipe(process.stdout);
運行結果如下所示:
$ node classic0.js
ABCDEFGHIJ
為了從一個classic readable流中讀取數據憔古,你可以注冊data
和end
監(jiān)聽器。下面是一個使用舊readable流方式從process.stdin
中讀取數據的例子:
process.stdin.on('data', function (buf) {
console.log(buf);
});
process.stdin.on('end', function () {
console.log('__END__');
});
運行結果如下所示:
$ (echo beep; sleep 1; echo boop) | node classic1.js
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
__END__
需要注意的一點是當你在一個流對象上注冊了一個data
監(jiān)聽器顾翼,你就將這個流放在了兼容模式下投放,此時你不能使用兩個stream2的api。
如果你自己創(chuàng)建流對象适贸,永遠不要綁定data
和end
監(jiān)聽器灸芳。如果你需要和舊版本的流兼容涝桅,最好使用第三方庫來實現.pipe()
方法。
例如烙样,你可以使用through模塊來避免顯式的使用data
和end
監(jiān)聽器:
var through = require('through');
process.stdin.pipe(through(write, end));
function write (buf) {
console.log(buf);
}
function end () {
console.log('__END__');
}
程序運行結果如下所示:
$ (echo beep; sleep 1; echo boop) | node through.js
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
__END__
你也可以使用concat-stream模塊來將整個流的內容緩存起來:
var concat = require('concat-stream');
process.stdin.pipe(concat(function (body) {
console.log(JSON.parse(body));
}));
程序運行結果如下所示:
$ echo '{"beep":"boop"}' | node concat.js
{ beep: 'boop' }
Classic readable流擁有.pause()
和.resume()
邏輯來暫停一個流冯遂,但是這都是可選的。如果你想要使用.pause()
和.resume()
方法谒获,你應該使用through模塊來幫助你處理緩存蛤肌。
classic writable流
Classic writable流非常簡單。其中只定義了.write(buf)
批狱,.end(buf)
裸准,以及.desctory()
方法。其中.end(buf)
的參數buf是可選參數赔硫,但是一般來說node程序員還是喜歡使用.end(buf)
這種寫法炒俱。
接下來讀什么
- node核心stream模塊文檔
- 你可以使用readable-stream模塊來確保你的stream2代碼兼容node 0.8及其之前的代碼。在你
npm install readable-stream
之后直接require('readable-stream')
而不要require('stream')
爪膊。
本文參考自stream-handbook.