一诅福、什么是Stream(流)
流(stream)在 Node.js 中是處理流數(shù)據(jù)的抽象接口(abstract interface)撤师。 stream 模塊提供了基礎(chǔ)的 API 叠纹。使用這些 API 可以很容易地來構(gòu)建實現(xiàn)流接口的對象玉控。
流是可讀的疲吸、可寫的座每,或是可讀寫的。
二摘悴、NodeJs中的Stream的幾種類型
Node.js 中有四種基本的流類型:
- Readable - 可讀的流(fs.createReadStream())
- Writable - 可寫的流(fs.createWriteStream())
- Duplex - 可讀寫的流(net.Socket)
- Transform - 在讀寫過程中可以修改和變換數(shù)據(jù)的 Duplex 流 (例如 zlib.createDeflate())
NodeJs中關(guān)于流的操作被封裝到了Stream模塊中峭梳,這個模塊也被多個核心模塊所引用。
const stream = require('stream');
在 NodeJS 中對文件的處理多數(shù)使用流來完成
- 普通文件
- 設(shè)備文件(stdin蹂喻、stdout)
- 網(wǎng)絡文件(http葱椭、net)
注:在NodeJs中所有的Stream(流)都是EventEmitter的實例
Example:
1.將1.txt的文件內(nèi)容讀取為流數(shù)據(jù)
const fs = require('fs');
// 創(chuàng)建一個可讀流(生產(chǎn)者)
let rs = fs.createReadStream('./1.txt');
通過fs模塊提供的createReadStream()可以輕松創(chuàng)建一個可讀的文件流捂寿。但我們并有直接使用Stream模塊,因為fs模塊內(nèi)部已經(jīng)引用了Stream模塊并做了封裝孵运。所以說 流(stream)在 Node.js 中是處理流數(shù)據(jù)的抽象接口秦陋,提供了基礎(chǔ)Api來構(gòu)建實現(xiàn)流接口的對象。
var rs = fs.createReadStream(path,[options]);
1.path 讀取文件的路徑
2.options
- flags打開文件的操作, 默認為'r'
- mode 權(quán)限位 0o666
- encoding默認為null
- start開始讀取的索引位置
- end結(jié)束讀取的索引位置(包括結(jié)束位置)
- highWaterMark讀取緩存區(qū)默認的大小64kb
Node.js 提供了多種流對象治笨。 例如:
- HTTP 請求 (request response)
- process.stdout 就都是流的實例驳概。
2.創(chuàng)建可寫流(消費者)處理可讀流
將1.txt的可讀流 寫入到2.txt文件中 這時我們需要一個可寫流
const fs = require('fs');
// 創(chuàng)建一個可寫流
let ws = fs.createWriteStream('./2.txt');
// 通過pipe讓可讀流流入到可寫流 寫入文件
rs.pipe(ws);
var ws = fs.createWriteStream(path,[options]);
1.path 讀取文件的路徑
2.options
- flags打開文件的操作, 默認為'w'
- mode 權(quán)限位 0o666
- encoding默認為utf8
- autoClose:true是否自動關(guān)閉文件
- highWaterMark讀取緩存區(qū)默認的大小16kb
pipe 它是Readable流的方法,相當于一個"管道"大磺,數(shù)據(jù)必須從上游 pipe 到下游抡句,也就是從一個 readable 流 pipe 到 writable 流。
后續(xù)將深入將介紹pipe杠愧。
如上圖待榔,我們把文件比作裝水的桶,而水就是文件里的內(nèi)容流济,我們用一根管子(pipe)連接兩個桶使得水從一個桶流入另一個桶锐锣,這樣就慢慢的實現(xiàn)了大文件的傳輸過程。
三绳瘟、為什么應該使用 Stream
當有用戶在線看視頻雕憔,假定我們通過HTTP請求返回給用戶視頻內(nèi)容
const http = require('http');
const fs = require('fs');
http.createServer((req, res) => {
fs.readFile(videoPath, (err, data) => {
res.end(data);
});
}).listen(8080);
但這樣有兩個明顯的問題
1.視頻文件需要全部讀取完,才能返回給用戶糖声,這樣等待時間會很長
2.視頻文件一次全放入內(nèi)存中斤彼,內(nèi)存吃不消
用流可以將視頻文件一點一點讀到內(nèi)存中,再一點一點返回給用戶蘸泻,讀一部分琉苇,寫一部分。(利用了 HTTP 協(xié)議的 Transfer-Encoding: chunked 分段傳輸特性)悦施,用戶體驗得到優(yōu)化并扇,同時對內(nèi)存的開銷明顯下降
const http = require('http');
const fs = require('fs');
http.createServer((req, res) => {
fs.createReadStream(videoPath).pipe(res);
}).listen(8080);
四、可讀流(Readable Stream)
可讀流(Readable streams)是對提供數(shù)據(jù)的源頭(source)的抽象抡诞。
例如:
- HTTP responses, on the client
- HTTP requests, on the server
- fs read streams
- TCP sockets
- process.stdin
所有的 Readable 都實現(xiàn)了 stream.Readable 類定義的接口穷蛹。
可讀流的兩種模式(flowing 和 paused)
1.在 flowing 模式下, 可讀流自動從系統(tǒng)底層讀取數(shù)據(jù)昼汗,并通過 EventEmitter 接口的事件盡快將數(shù)據(jù)提供給應用肴熏。
2.在 paused 模式下,必須顯式調(diào)用 stream.read()方法來從流中讀取數(shù)據(jù)片段乔遮。
所有初始工作模式為paused的Readable流扮超,可以通過下面三種途徑切換為flowing模式:
- 監(jiān)聽'data'事件
- 調(diào)用stream.resume()方法
- 調(diào)用stream.pipe()方法將數(shù)據(jù)發(fā)送到Writable
流動模式flowing
流切換到流動模式 監(jiān)聽data事件
const rs = fs.createReadStream('./1.txt');
const ws = fs.createWriteStream('./2.txt');
rs.on('data', chunk => {
ws.write(chunk);
});
ws.on('end', () => {
ws.end();
});
如果寫入的速度跟不上讀取的速度,有可能導致數(shù)據(jù)丟失。正常的情況應該是出刷,寫完一段璧疗,再讀取下一段,如果沒有寫完的話馁龟,就讓讀取流先暫停崩侠,等寫完再繼續(xù)。
var fs = require('fs');
// 讀取highWaterMark(3字節(jié))數(shù)據(jù)坷檩,讀完之后填充緩存區(qū)却音,然后觸發(fā)data事件
var rs = fs.createReadStream(sourcePath, {
highWaterMark: 3
});
var ws = fs.createWriteStream(destPath, {
highWaterMark: 3
});
rs.on('data', function(chunk) { // 當有數(shù)據(jù)流出時,寫入數(shù)據(jù)
if (ws.write(chunk) === false) { // 如果沒有寫完矢炼,暫停讀取流
rs.pause();
}
});
ws.on('drain', function() { // 緩沖區(qū)清空觸發(fā)drain事件 這時再繼續(xù)讀取
rs.resume();
});
rs.on('end', function() { // 當沒有數(shù)據(jù)時系瓢,關(guān)閉數(shù)據(jù)流
ws.end();
});
或者使用更直接的pipe
fs.createReadStream(sourcePath).pipe(fs.createWriteStream(destPath));
暫停模式paused
1.在流沒有 pipe() 時,調(diào)用 pause() 方法可以將流暫停
2.pipe() 時句灌,需要移除所有 data 事件的監(jiān)聽夷陋,再調(diào)用 unpipe() 方法
read(size)
流在暫停模式下需要程序顯式調(diào)用 read() 方法才能得到數(shù)據(jù)。read() 方法會從內(nèi)部緩沖區(qū)中拉取并返回若干數(shù)據(jù)胰锌,當沒有更多可用數(shù)據(jù)時骗绕,會返回null。read()不會觸發(fā)'data'事件资昧。
使用 read() 方法讀取數(shù)據(jù)時酬土,如果傳入了 size 參數(shù),那么它會返回指定字節(jié)的數(shù)據(jù)格带;當指定的size字節(jié)不可用時撤缴,則返回null。如果沒有指定size參數(shù)叽唱,那么會返回內(nèi)部緩沖區(qū)中的所有數(shù)據(jù)腹泌。
NodeJS 為我們提供了一個 readable 的事件,事件在可讀流準備好數(shù)據(jù)的時候觸發(fā)尔觉,也就是先監(jiān)聽這個事件,收到通知又數(shù)據(jù)了我們再去讀取就好了:
const fs = require('fs');
rs = fs.createReadStream(sourcePath);
// 當你監(jiān)聽 readable事件的時候芥吟,會進入暫停模式
rs.on('readable', () => {
console.log(rs._readableState.length);
// read如果不加參數(shù)表示讀取整個緩存區(qū)數(shù)據(jù)
// 讀取一個字段,如果可讀流發(fā)現(xiàn)你要讀的字節(jié)小于等于緩存字節(jié)大小侦铜,則直接返回
let ch = rs.read(1);
});
暫停模式 緩存區(qū)的數(shù)據(jù)以鏈表的形式保存在BufferList中
五、可寫流(Writable Stream)
可寫流是對數(shù)據(jù)流向設(shè)備的抽象钟鸵,用來消費上游流過來的數(shù)據(jù)钉稍,通過可寫流程序可以把數(shù)據(jù)寫入設(shè)備,常見的是本地磁盤文件或者 TCP棺耍、HTTP 等網(wǎng)絡響應贡未。
Writable 的例子包括了:
- HTTP requests, on the client
- HTTP responses, on the server
- fs write streams
- zlib streams
- crypto streams
- TCP sockets
- child process stdin
- process.stdout, process.stderr
所有 Writable 流都實現(xiàn)了 stream.Writable 類定義的接口。
process.stdin.pipe(process.stdout);
process.stdout 是一個可寫流,程序把可讀流 process.stdin 傳過來的數(shù)據(jù)寫入的標準輸出設(shè)備俊卤。在了解了可讀流的基礎(chǔ)上理解可寫流非常簡單嫩挤,流就是有方向的數(shù)據(jù),其中可讀流是數(shù)據(jù)源消恍,可寫流是目的地岂昭,中間的管道環(huán)節(jié)是雙向流。
可寫流使用
調(diào)用可寫流實例的 write() 方法就可以把數(shù)據(jù)寫入可寫流
const fs = require('fs');
const rs = fs.createReadStream(sourcePath);
const ws = fs.createWriteStream(destPath);
rs.setEncoding('utf-8'); // 設(shè)置編碼格式
rs.on('data', chunk => {
ws.write(chunk); // 寫入數(shù)據(jù)
});
監(jiān)聽了可讀流的 data 事件就會使可讀流進入流動模式狠怨,我們在回調(diào)事件里調(diào)用了可寫流的 write() 方法约啊,這樣數(shù)據(jù)就被寫入了可寫流抽象的設(shè)備destPath中佣赖。
write() 方法有三個參數(shù)
- chunk {String| Buffer}恰矩,表示要寫入的數(shù)據(jù)
- encoding 當寫入的數(shù)據(jù)是字符串的時候可以設(shè)置編碼
- callback 數(shù)據(jù)被寫入之后的回調(diào)函數(shù)
'drain'事件
如果調(diào)用 stream.write(chunk) 方法返回 false,表示當前緩存區(qū)已滿憎蛤,流將在適當?shù)臅r機(緩存區(qū)清空后)觸發(fā) 'drain
const fs = require('fs');
const rs = fs.createReadStream(sourcePath);
const ws = fs.createWriteStream(destPath);
rs.setEncoding('utf-8'); // 設(shè)置編碼格式
rs.on('data', chunk => {
let flag = ws.write(chunk); // 寫入數(shù)據(jù)
if (!flag) { // 如果緩存區(qū)已滿暫停讀取
rs.pause();
}
});
ws.on('drain', () => {
rs.resume(); // 緩存區(qū)已清空 繼續(xù)讀取寫入
});
六外傅、總結(jié)
stream(流)分為可讀流(flowing mode 和 paused mode)、可寫流蹂午、可讀寫流栏豺,Node.js 提供了多種流對象。 例如豆胸, HTTP 請求 和 process.stdout 就都是流的實例奥洼。stream 模塊提供了基礎(chǔ)的 API 。使用這些 API 可以很容易地來構(gòu)建實現(xiàn)流接口的對象晚胡。它們底層都調(diào)用了stream模塊并進行封裝灵奖。
后續(xù)我們將繼續(xù)對stream深入解析以及Readable Writable pipe的實現(xiàn)