簡(jiǎn)介
主要對(duì)stream這個(gè)概念做一個(gè)形象的描述和理解翼闽,同時(shí)介紹一下比較常用的API株茶。主要參考了Node.js的官方文檔楣颠。
stream的種類(lèi)
-
Readable - streams from which data can be read (for example
fs.createReadStream()
). -
Writable - streams to which data can be written (for example
fs.createWriteStream()
). -
Duplex - streams that are both Readable and Writable (for example
net.Socket
). -
Transform - Duplex streams that can modify or transform the data as it is written and read (for example
zlib.createDeflate()
).
stream解決的問(wèn)題
設(shè)計(jì) stream API 的一個(gè)關(guān)鍵目標(biāo)是將數(shù)據(jù)緩沖限制到可接受的級(jí)別拄衰,從而使得不同傳輸速率的源可以進(jìn)行數(shù)據(jù)的傳輸络拌,同時(shí)不會(huì)占用過(guò)量的內(nèi)存。比如擂送,文件的讀取悦荒。系統(tǒng)從硬盤(pán)中讀取文件的速度和我們程序處理文件內(nèi)容的速度是不相匹配的,而且讀取的文件可能是很大的嘹吨。如果不用流來(lái)讀取文件搬味,那么我們首先就需要把整個(gè)文件讀取到內(nèi)存中,然后程序從內(nèi)存中讀取文件內(nèi)容來(lái)進(jìn)行后續(xù)的業(yè)務(wù)處理蟀拷。這會(huì)極大的消耗系統(tǒng)的內(nèi)存碰纬,并且降低處理的效率(要先讀取整個(gè)文件,再處理數(shù)據(jù))问芬。
stream這個(gè)概念是很形象的悦析,就像是水流,可以通過(guò)管道此衅,從一處流向另一處强戴。比如從文件輸入,最終由程序接收挡鞍,進(jìn)行后續(xù)的處理骑歹。而 stream.pipe()
就是流中最關(guān)鍵的一個(gè)管道方法。
// 此處代碼實(shí)現(xiàn)了從file.txt讀取數(shù)據(jù)墨微,然后壓縮數(shù)據(jù)道媚,然后寫(xiě)入file.txt.gz的過(guò)程
const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);
緩存機(jī)制
Writeable 和 Readable stream 都將數(shù)據(jù)存儲(chǔ)在一個(gè)自身內(nèi)部的buffer中。分別writable.writableBuffer
or readable.readableBuffer
可以得到buffer的數(shù)據(jù)翘县。有一個(gè)參數(shù)highWaterMark
用來(lái)限制這個(gè)buffer的最大容量衰琐。從而使得流之間的數(shù)據(jù)傳輸可以被限制在一定的內(nèi)存占用下,并且擁有較高的效率炼蹦。
Writable Streams
幾個(gè)典型的可寫(xiě)流:
- HTTP requests, on the client
- HTTP responses, on the server
- fs write streams
- zlib streams
- crypto streams
- TCP sockets
- child process stdin
-
process.stdout
,process.stderr
// 示例
const myStream = getWritableStreamSomehow();
myStream.write('some data');
myStream.write('some more data');
myStream.end('done writing data'); // 關(guān)閉流羡宙,不可再寫(xiě)入。
觸發(fā)的事件:
- close
-
drain
If a call tostream.write(chunk)
returnsfalse
, the'drain'
event will be emitted when it is appropriate to resume writing data to the stream.
不能無(wú)限制的寫(xiě)入掐隐,當(dāng)writeable stream的內(nèi)部緩存數(shù)據(jù)超過(guò)highWaterMark
的閾值狗热,stream.write會(huì)返回false,這是應(yīng)該停止寫(xiě)入虑省,等到觸發(fā)drain事件再繼續(xù)寫(xiě)入匿刮。 -
error
注意,error事件觸發(fā)的時(shí)候探颈,stream不會(huì)自動(dòng)被關(guān)閉熟丸,需要手動(dòng)處理關(guān)閉。 - finish
-
pipe
對(duì)應(yīng)于readable.pipe()
,有一個(gè)readable stream和該stream連通的時(shí)候觸發(fā) - unpipe
對(duì)應(yīng)于readable.unpipe()
,有一個(gè)readable stream和該stream管道斷開(kāi)的時(shí)候觸發(fā)
Readable Streams
幾個(gè)典型的可讀流:
- HTTP responses, on the client
- HTTP requests, on the server
- fs read streams
- zlib streams
- crypto streams
- TCP sockets
- child process stdout and stderr
process.stdin
兩種模式: flowing and paused
readable stream 創(chuàng)建時(shí)都為paused模式伪节,但是可以通過(guò)以下幾個(gè)方法變?yōu)閒lowing:
- Adding a
'data'
event handler. - Calling the
stream.resume()
method. - Calling the
stream.pipe()
method to send the data to a Writable.
最常用的其實(shí)就是stream.pipe()了光羞。
觸發(fā)的事件:
- close
- data
- end
- readable
// 示例
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
});
readable.on('end', () => {
console.log('There will be no more data.');
});
readable.setEncoding(encoding)
調(diào)用readable.setEncoding('utf8')可以使得chunk的類(lèi)型由buffer變?yōu)閟tring绩鸣。
const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
assert.equal(typeof chunk, 'string');
console.log('got %d characters of string data', chunk.length);
});
HTTP通信中的應(yīng)用
看代碼和注釋?xiě)?yīng)該就能懂了
const http = require('http');
const server = http.createServer((req, res) => {
// req is an http.IncomingMessage, which is a Readable Stream
// res is an http.ServerResponse, which is a Writable Stream
let body = '';
// Get the data as utf8 strings.
// If an encoding is not set, Buffer objects will be received.
req.setEncoding('utf8');
// Readable streams emit 'data' events once a listener is added
req.on('data', (chunk) => {
body += chunk;
});
// the end event indicates that the entire body has been received
req.on('end', () => {
try {
const data = JSON.parse(body);
// write back something interesting to the user:
res.write(typeof data);
res.end();
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end(`error: ${er.message}`);
}
});
});
server.listen(1337);
// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token o in JSON at position 1