NodeJs中的stream(流)- 基礎(chǔ)篇

一诅福、什么是Stream(流)

流(stream)在 Node.js 中是處理流數(shù)據(jù)的抽象接口(abstract interface)撤师。 stream 模塊提供了基礎(chǔ)的 API 叠纹。使用這些 API 可以很容易地來構(gòu)建實現(xiàn)流接口的對象玉控。

流是可讀的疲吸、可寫的座每,或是可讀寫的。

二摘悴、NodeJs中的Stream的幾種類型

Node.js 中有四種基本的流類型:

  • Readable - 可讀的流(fs.createReadStream())
  • Writable - 可寫的流(fs.createWriteStream())
  • Duplex - 可讀寫的流(net.Socket)
  • Transform - 在讀寫過程中可以修改和變換數(shù)據(jù)的 Duplex 流 (例如 zlib.createDeflate())

NodeJs中關(guān)于流的操作被封裝到了Stream模塊中峭梳,這個模塊也被多個核心模塊所引用。

const stream = require('stream');

在 NodeJS 中對文件的處理多數(shù)使用流來完成

  • 普通文件
  • 設(shè)備文件(stdin蹂喻、stdout)
  • 網(wǎng)絡文件(http葱椭、net)

注:在NodeJs中所有的Stream(流)都是EventEmitter的實例

Example:

1.將1.txt的文件內(nèi)容讀取為流數(shù)據(jù)

const fs = require('fs');

// 創(chuàng)建一個可讀流(生產(chǎn)者)
let rs = fs.createReadStream('./1.txt'); 

通過fs模塊提供的createReadStream()可以輕松創(chuàng)建一個可讀的文件流捂寿。但我們并有直接使用Stream模塊,因為fs模塊內(nèi)部已經(jīng)引用了Stream模塊并做了封裝孵运。所以說 流(stream)在 Node.js 中是處理流數(shù)據(jù)的抽象接口秦陋,提供了基礎(chǔ)Api來構(gòu)建實現(xiàn)流接口的對象。

var rs = fs.createReadStream(path,[options]);

1.path 讀取文件的路徑

2.options

  • flags打開文件的操作, 默認為'r'
  • mode 權(quán)限位 0o666
  • encoding默認為null
  • start開始讀取的索引位置
  • end結(jié)束讀取的索引位置(包括結(jié)束位置)
  • highWaterMark讀取緩存區(qū)默認的大小64kb

Node.js 提供了多種流對象治笨。 例如:

  • HTTP 請求 (request response)
  • process.stdout 就都是流的實例驳概。

2.創(chuàng)建可寫流(消費者)處理可讀流

將1.txt的可讀流 寫入到2.txt文件中 這時我們需要一個可寫流

const fs = require('fs');
// 創(chuàng)建一個可寫流
let ws = fs.createWriteStream('./2.txt');
// 通過pipe讓可讀流流入到可寫流 寫入文件
rs.pipe(ws); 
var ws = fs.createWriteStream(path,[options]);

1.path 讀取文件的路徑

2.options

  • flags打開文件的操作, 默認為'w'
  • mode 權(quán)限位 0o666
  • encoding默認為utf8
  • autoClose:true是否自動關(guān)閉文件
  • highWaterMark讀取緩存區(qū)默認的大小16kb

pipe 它是Readable流的方法,相當于一個"管道"大磺,數(shù)據(jù)必須從上游 pipe 到下游抡句,也就是從一個 readable 流 pipe 到 writable 流。
后續(xù)將深入將介紹pipe杠愧。

stream-1.png

如上圖待榔,我們把文件比作裝水的桶,而水就是文件里的內(nèi)容流济,我們用一根管子(pipe)連接兩個桶使得水從一個桶流入另一個桶锐锣,這樣就慢慢的實現(xiàn)了大文件的傳輸過程。

三绳瘟、為什么應該使用 Stream

當有用戶在線看視頻雕憔,假定我們通過HTTP請求返回給用戶視頻內(nèi)容

const http = require('http');
const fs = require('fs');

http.createServer((req, res) => {
    fs.readFile(videoPath, (err, data) => {
        res.end(data);
    });
}).listen(8080);

但這樣有兩個明顯的問題

1.視頻文件需要全部讀取完,才能返回給用戶糖声,這樣等待時間會很長
2.視頻文件一次全放入內(nèi)存中斤彼,內(nèi)存吃不消

用流可以將視頻文件一點一點讀到內(nèi)存中,再一點一點返回給用戶蘸泻,讀一部分琉苇,寫一部分。(利用了 HTTP 協(xié)議的 Transfer-Encoding: chunked 分段傳輸特性)悦施,用戶體驗得到優(yōu)化并扇,同時對內(nèi)存的開銷明顯下降

const http = require('http');
const fs = require('fs');

http.createServer((req, res) => {
    fs.createReadStream(videoPath).pipe(res);
}).listen(8080);

四、可讀流(Readable Stream)

可讀流(Readable streams)是對提供數(shù)據(jù)的源頭(source)的抽象抡诞。

例如:

  • HTTP responses, on the client
  • HTTP requests, on the server
  • fs read streams
  • TCP sockets
  • process.stdin

所有的 Readable 都實現(xiàn)了 stream.Readable 類定義的接口穷蛹。

可讀流的兩種模式(flowing 和 paused)

1.在 flowing 模式下, 可讀流自動從系統(tǒng)底層讀取數(shù)據(jù)昼汗,并通過 EventEmitter 接口的事件盡快將數(shù)據(jù)提供給應用肴熏。

2.在 paused 模式下,必須顯式調(diào)用 stream.read()方法來從流中讀取數(shù)據(jù)片段乔遮。

所有初始工作模式為paused的Readable流扮超,可以通過下面三種途徑切換為flowing模式:

  • 監(jiān)聽'data'事件
  • 調(diào)用stream.resume()方法
  • 調(diào)用stream.pipe()方法將數(shù)據(jù)發(fā)送到Writable

流動模式flowing

流切換到流動模式 監(jiān)聽data事件

const rs = fs.createReadStream('./1.txt');
const ws = fs.createWriteStream('./2.txt');
rs.on('data', chunk => {
    ws.write(chunk);
});
ws.on('end', () => {
    ws.end();
});

如果寫入的速度跟不上讀取的速度,有可能導致數(shù)據(jù)丟失。正常的情況應該是出刷,寫完一段璧疗,再讀取下一段,如果沒有寫完的話馁龟,就讓讀取流先暫停崩侠,等寫完再繼續(xù)。

var fs = require('fs');
// 讀取highWaterMark(3字節(jié))數(shù)據(jù)坷檩,讀完之后填充緩存區(qū)却音,然后觸發(fā)data事件
var rs = fs.createReadStream(sourcePath, {
    highWaterMark: 3 
});
var ws = fs.createWriteStream(destPath, {
    highWaterMark: 3
});

rs.on('data', function(chunk) { // 當有數(shù)據(jù)流出時,寫入數(shù)據(jù)
    if (ws.write(chunk) === false) { // 如果沒有寫完矢炼,暫停讀取流
        rs.pause();
    }
});

ws.on('drain', function() { // 緩沖區(qū)清空觸發(fā)drain事件 這時再繼續(xù)讀取
    rs.resume();
});

rs.on('end', function() { // 當沒有數(shù)據(jù)時系瓢,關(guān)閉數(shù)據(jù)流
    ws.end();
});

或者使用更直接的pipe

fs.createReadStream(sourcePath).pipe(fs.createWriteStream(destPath));

暫停模式paused

1.在流沒有 pipe() 時,調(diào)用 pause() 方法可以將流暫停
2.pipe() 時句灌,需要移除所有 data 事件的監(jiān)聽夷陋,再調(diào)用 unpipe() 方法

read(size)

流在暫停模式下需要程序顯式調(diào)用 read() 方法才能得到數(shù)據(jù)。read() 方法會從內(nèi)部緩沖區(qū)中拉取并返回若干數(shù)據(jù)胰锌,當沒有更多可用數(shù)據(jù)時骗绕,會返回null。read()不會觸發(fā)'data'事件资昧。

使用 read() 方法讀取數(shù)據(jù)時酬土,如果傳入了 size 參數(shù),那么它會返回指定字節(jié)的數(shù)據(jù)格带;當指定的size字節(jié)不可用時撤缴,則返回null。如果沒有指定size參數(shù)叽唱,那么會返回內(nèi)部緩沖區(qū)中的所有數(shù)據(jù)腹泌。
NodeJS 為我們提供了一個 readable 的事件,事件在可讀流準備好數(shù)據(jù)的時候觸發(fā)尔觉,也就是先監(jiān)聽這個事件,收到通知又數(shù)據(jù)了我們再去讀取就好了:

const fs = require('fs');
rs = fs.createReadStream(sourcePath);

// 當你監(jiān)聽 readable事件的時候芥吟,會進入暫停模式
rs.on('readable', () => {
    console.log(rs._readableState.length);
        // read如果不加參數(shù)表示讀取整個緩存區(qū)數(shù)據(jù)
        // 讀取一個字段,如果可讀流發(fā)現(xiàn)你要讀的字節(jié)小于等于緩存字節(jié)大小侦铜,則直接返回
        let ch = rs.read(1);
});

暫停模式 緩存區(qū)的數(shù)據(jù)以鏈表的形式保存在BufferList中

五、可寫流(Writable Stream)

可寫流是對數(shù)據(jù)流向設(shè)備的抽象钟鸵,用來消費上游流過來的數(shù)據(jù)钉稍,通過可寫流程序可以把數(shù)據(jù)寫入設(shè)備,常見的是本地磁盤文件或者 TCP棺耍、HTTP 等網(wǎng)絡響應贡未。

Writable 的例子包括了:

  • HTTP requests, on the client
  • HTTP responses, on the server
  • fs write streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • child process stdin
  • process.stdout, process.stderr

所有 Writable 流都實現(xiàn)了 stream.Writable 類定義的接口。

process.stdin.pipe(process.stdout);

process.stdout 是一個可寫流,程序把可讀流 process.stdin 傳過來的數(shù)據(jù)寫入的標準輸出設(shè)備俊卤。在了解了可讀流的基礎(chǔ)上理解可寫流非常簡單嫩挤,流就是有方向的數(shù)據(jù),其中可讀流是數(shù)據(jù)源消恍,可寫流是目的地岂昭,中間的管道環(huán)節(jié)是雙向流。

可寫流使用

調(diào)用可寫流實例的 write() 方法就可以把數(shù)據(jù)寫入可寫流

const fs = require('fs');
const rs = fs.createReadStream(sourcePath);
const ws = fs.createWriteStream(destPath);

rs.setEncoding('utf-8'); // 設(shè)置編碼格式
rs.on('data', chunk => {
  ws.write(chunk); // 寫入數(shù)據(jù)
});

監(jiān)聽了可讀流的 data 事件就會使可讀流進入流動模式狠怨,我們在回調(diào)事件里調(diào)用了可寫流的 write() 方法约啊,這樣數(shù)據(jù)就被寫入了可寫流抽象的設(shè)備destPath中佣赖。

write() 方法有三個參數(shù)

  • chunk {String| Buffer}恰矩,表示要寫入的數(shù)據(jù)
  • encoding 當寫入的數(shù)據(jù)是字符串的時候可以設(shè)置編碼
  • callback 數(shù)據(jù)被寫入之后的回調(diào)函數(shù)

'drain'事件

如果調(diào)用 stream.write(chunk) 方法返回 false,表示當前緩存區(qū)已滿憎蛤,流將在適當?shù)臅r機(緩存區(qū)清空后)觸發(fā) 'drain

const fs = require('fs');
const rs = fs.createReadStream(sourcePath);
const ws = fs.createWriteStream(destPath);

rs.setEncoding('utf-8'); // 設(shè)置編碼格式
rs.on('data', chunk => {
  let flag = ws.write(chunk); // 寫入數(shù)據(jù)
  if (!flag) { // 如果緩存區(qū)已滿暫停讀取
      rs.pause();
  }
});

ws.on('drain', () => {
    rs.resume(); // 緩存區(qū)已清空 繼續(xù)讀取寫入
});

六外傅、總結(jié)

stream(流)分為可讀流(flowing mode 和 paused mode)、可寫流蹂午、可讀寫流栏豺,Node.js 提供了多種流對象。 例如豆胸, HTTP 請求 和 process.stdout 就都是流的實例奥洼。stream 模塊提供了基礎(chǔ)的 API 。使用這些 API 可以很容易地來構(gòu)建實現(xiàn)流接口的對象晚胡。它們底層都調(diào)用了stream模塊并進行封裝灵奖。

后續(xù)我們將繼續(xù)對stream深入解析以及Readable Writable pipe的實現(xiàn)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市估盘,隨后出現(xiàn)的幾起案子瓷患,更是在濱河造成了極大的恐慌,老刑警劉巖遣妥,帶你破解...
    沈念sama閱讀 206,311評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件擅编,死亡現(xiàn)場離奇詭異,居然都是意外死亡箫踩,警方通過查閱死者的電腦和手機爱态,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來境钟,“玉大人锦担,你說我怎么就攤上這事】鳎” “怎么了洞渔?”我有些...
    開封第一講書人閱讀 152,671評論 0 342
  • 文/不壞的土叔 我叫張陵套媚,是天一觀的道長。 經(jīng)常有香客問我磁椒,道長堤瘤,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,252評論 1 279
  • 正文 為了忘掉前任衷快,我火速辦了婚禮宙橱,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘蘸拔。我一直安慰自己师郑,他們只是感情好,可當我...
    茶點故事閱讀 64,253評論 5 371
  • 文/花漫 我一把揭開白布调窍。 她就那樣靜靜地躺著宝冕,像睡著了一般。 火紅的嫁衣襯著肌膚如雪邓萨。 梳的紋絲不亂的頭發(fā)上地梨,一...
    開封第一講書人閱讀 49,031評論 1 285
  • 那天,我揣著相機與錄音缔恳,去河邊找鬼宝剖。 笑死,一個胖子當著我的面吹牛歉甚,可吹牛的內(nèi)容都是我干的万细。 我是一名探鬼主播,決...
    沈念sama閱讀 38,340評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼纸泄,長吁一口氣:“原來是場噩夢啊……” “哼赖钞!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起聘裁,我...
    開封第一講書人閱讀 36,973評論 0 259
  • 序言:老撾萬榮一對情侶失蹤雪营,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后衡便,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體献起,經(jīng)...
    沈念sama閱讀 43,466評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,937評論 2 323
  • 正文 我和宋清朗相戀三年镣陕,在試婚紗的時候發(fā)現(xiàn)自己被綠了征唬。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,039評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡茁彭,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出扶歪,到底是詐尸還是另有隱情理肺,我是刑警寧澤摄闸,帶...
    沈念sama閱讀 33,701評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站妹萨,受9級特大地震影響年枕,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜乎完,卻給世界環(huán)境...
    茶點故事閱讀 39,254評論 3 307
  • 文/蒙蒙 一熏兄、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧树姨,春花似錦摩桶、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至转晰,卻和暖如春芦拿,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背查邢。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工蔗崎, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人扰藕。 一個月前我還...
    沈念sama閱讀 45,497評論 2 354
  • 正文 我出身青樓缓苛,卻偏偏與公主長得像,于是被迫代替她去往敵國和親实胸。 傳聞我的和親對象是個殘疾皇子他嫡,可洞房花燭夜當晚...
    茶點故事閱讀 42,786評論 2 345

推薦閱讀更多精彩內(nèi)容

  • stream 流是一個抽象接口,在 Node 里被不同的對象實現(xiàn)庐完。例如 request to an HTTP se...
    明明三省閱讀 3,392評論 1 10
  • https://nodejs.org/api/documentation.html 工具模塊 Assert 測試 ...
    KeKeMars閱讀 6,305評論 0 6
  • 編譯地址:https://github.com/substack/stream-handbook譯者:jabez1...
    IT程序獅閱讀 1,291評論 0 4
  • 流是Node中最重要的組件和模式之一钢属。在社區(qū)里有一句格言說:讓一切事務流動起來。這已經(jīng)足夠來描述在Node中流...
    宮若石閱讀 539評論 0 0
  • 什么流 通俗的說就是一種门躯,有起點和終點的字節(jié)數(shù)據(jù)傳輸手段淆党,把數(shù)據(jù)從一個地方傳到另一個地方。流(Stream)是一個...
    JOKER_HAN閱讀 1,133評論 0 3