Node.js Streams(流)

流的概念

  • 流是一組有序的舌胶、有起點(diǎn)和終點(diǎn)的字節(jié)數(shù)據(jù)傳輸手段
  • 流不關(guān)心文件的整體內(nèi)容捆蜀,只關(guān)注是否從文件中讀到了數(shù)據(jù),以及讀到數(shù)據(jù)之后的處理
  • 流是一個抽象接口幔嫂,被 Node 中的很多對象所實(shí)現(xiàn)辆它。比如 HTTP 服務(wù)器 request 和 response 對象都是流
  • 流 是 Node.js 的核心模塊,基本上都是 stream的實(shí)例履恩,比如 process.stdout锰茉、http.clientRequest

流的好處

  • 流是基于事件的 API,用于管理和處理數(shù)據(jù),而且有不錯的效率
  • 借助事件和非阻塞 I/O 庫切心,流模塊允許在其可用的時候動態(tài)處理,在其不需要的時候釋放掉

流中的數(shù)據(jù)有兩種模式飒筑,二進(jìn)制模式和對象模式

  • 二進(jìn)制模式片吊, 每個分塊都是 buffer 或者 string 對象
  • 對象模式, 流內(nèi)部處理的是一系列普通對象

所有使用 Node.js API 創(chuàng)建的流對象都只能操作 strings 和 Buffer對象协屡。但是俏脊,通過一些第三方流的實(shí)現(xiàn),你依然能夠處理其它類型的 JavaScript 值 (除了 null肤晓,它在流處理中有特殊意義)爷贫。 這些流被認(rèn)為是工作在 “對象模式”(object mode)。 在創(chuàng)建流的實(shí)例時补憾,可以通過 objectMode 選項(xiàng)使流的實(shí)例切換到對象模式沸久。試圖將已經(jīng)存在的流切換到對象模式是不安全的。

Node.js 中有四種基本的流類型

  1. Readable-可讀流 (例如 fs.createReadStream() )
  2. Writable-可寫的流(例如 fs.createWriteStreame() )
  3. Duplex-可讀寫的流(例如 net.Socket )
  4. Transform-在讀寫過程中可以修改和變換數(shù)據(jù)的 Duplex 流 (例如
    zlib.createDeflate() )

第一種類型:可讀流 createReadStream

創(chuàng)建一個可讀流
// 引入 fs(讀取文件) 模塊
let fs = require('fs');
// 創(chuàng)建一個可讀流
let rs = fs.createReadStream('./1.txt',{
    flags:'r',
    encoding:'utf8',
    start:0,
    autoClose:true,
    end: 3,
    highWaterMark:3 
});

API:createReadStream(path, [options]);

  1. path 是讀取文件的路徑
  2. options 里面有
    • flags:打開文件要做的操作余蟹,默認(rèn)為 'r'
    • encoding:默認(rèn)是null卷胯,null 代表的是 buffer
    • start:開始讀取的索引位置
    • autoClose:讀取完畢后自動關(guān)閉
    • end:結(jié)束讀取的索引位置(包括結(jié)束位置)
    • highWaterMark:讀取緩存區(qū)默認(rèn)的默認(rèn)的大小 64kb (64*1024b)

    如果指定 encoding 為 utf8 編碼, highWaterMark 要大于 3 個字節(jié)

可讀流的一些監(jiān)聽事件
  1. data 事件
  2. end 事件
  3. error 事件
  4. open 事件
  5. close 事件

各個寫法如下:

// 流切換到流動模式,數(shù)據(jù)會被盡可能快的讀出
rs.on('data',function(data){ // 暫停模式 -> 流動模式
    console.log(data);
});

// 該事件會在讀完數(shù)據(jù)后被觸發(fā)
rs.on('end', function () {
    console.log('讀取完成');
});

// 讀文件失敗后被觸發(fā)
rs.on('error', function (err) {
    console.log(err);
});

// 文件打開后被觸發(fā)
rs.on('open', function () {
    console.log('文件打開了');
});

// 文件關(guān)閉后被觸發(fā)
rs.on('close', function () {
    console.log('關(guān)閉');
});
設(shè)置編碼

與指定 {encoding:'utf8'} 效果相同威酒,設(shè)置編碼

rs.setEncoding('utf8');
暫停和恢復(fù)觸發(fā) data

通過 pause() 方法和 resume() 方法

rs.on('data', function (data) {
    console.log(data);
    rs.pause(); // 暫停方法 表示暫停讀取窑睁,暫停data事件觸發(fā)
});
setTimeout(function () {
    rs.resume(); // 恢復(fù)方法
},2000);

第二種類型:可寫流 createWriteStream

創(chuàng)建一個可寫流
// 引入 fs(讀取文件) 模塊
let fs = require('fs');
// 創(chuàng)建一個可寫流
let ws = fs.createWriteStream('./1.txt',{
    flags:'w',
    encoding:'utf8',
    highWaterMark:3 
});

API:createWriteStream(path, [options]);

  1. path 是讀取文件的路徑
  2. options 里面有
    • flags:打開文件要做的操作,默認(rèn)為 'w'
    • encoding:默認(rèn)是 utf8
    • highWaterMark:寫入緩存區(qū)的葵孤,默認(rèn)大小 16kb
可寫流的一些方法
1. write 方法
ws.write(chunk, [encoding], [callback]);
  • chunk 寫入的數(shù)據(jù) buffer/string
  • encoding 編碼格式担钮,chunk 為字符串時有用,是個可選參數(shù)
  • callback 寫入成功后的回調(diào)

返回值為布爾值尤仍,系統(tǒng)緩存區(qū)滿時為 false箫津,未滿時為 true

2. end 方法
ws.end(chunk, [encoding], [callback]);

表明接下來沒有數(shù)據(jù)要被寫入 Writable 通過傳入可選的 chunk 和 encoding 參數(shù),可以在關(guān)閉流之前再寫入一段數(shù)據(jù) 如果傳入了可選的 callback 函數(shù)宰啦,它將作為 'finish' 事件的回調(diào)函數(shù)

3. drain 方法
ws.on('drain',function(){
    console.log('drain')
});
  • 當(dāng)一個流不處在 drain 的狀態(tài)苏遥, 對 write() 的調(diào)用會緩存數(shù)據(jù)塊, 并且返回 false
  • 當(dāng)前所有緩存的數(shù)據(jù)塊滿了赡模,滿了之后情況才會出發(fā) drain
  • 一旦 write() 返回 false田炭, 在 'drain' 事件觸發(fā)前, 不能寫入任何數(shù)據(jù)塊
4. finish 方法
ws.end('結(jié)束');
ws.on('finish',function(){
    console.log('drain')
});
  • 在調(diào)用 end 方法漓柑,且緩沖區(qū)數(shù)據(jù)都已經(jīng)傳給底層系統(tǒng)之后教硫, 'finish' 事件將被觸發(fā)

第三種類型:可讀寫的流,也叫雙工流(Duplex)

雙工流辆布,可以在同一個對象上同時實(shí)現(xiàn)可讀瞬矩、可寫,就好像同時繼承這兩個接口锋玲。而且讀取可以沒關(guān)系(互不干擾)

// 引入雙工流模塊
let {Duplex} =  require('stream');
let d = Duplex({
    read(){
        this.push('hello');
        this.push(null)
    },
    write(chunk,encoding,callback){
        console.log(chunk);
        callback();
    }
});
d.on('data',function(data){
    console.log(data);
});
d.write('hello');

第四種類型:轉(zhuǎn)換流(Transform)

  • 轉(zhuǎn)換流輸出是從輸入中計(jì)算出來的
  • 轉(zhuǎn)換流中景用,不需要實(shí)現(xiàn) read 和 write 方法,只需要實(shí)現(xiàn)一個 transform 方法嫩絮,就可以結(jié)合兩者丛肢。
// 引入轉(zhuǎn)換流
let {Transform} =  require('stream');
// 轉(zhuǎn)換流的參數(shù)和可寫流一樣
let tranform1 = Transform({
    transform(chunk,encoding,callback){
        this.push(chunk.toString().toUpperCase()); 
        callback();
    }
});
let tranform2 = Transform({
    transform(chunk,encoding,callback){
        console.log(chunk.toString());
        callback();
    }
});
process.stdin.pipe(tranform1).pipe(tranform2);

pipe 方法

大家都知道,想把 Readable 的數(shù)據(jù) 寫到 Writable剿干,需要手動將數(shù)據(jù)讀入內(nèi)存中蜂怎,然后在寫入 Writable。也就是每次傳遞數(shù)據(jù)的時候置尔,都需要寫一下的代碼:

readable.on('readable', (err) => {
 if(err) throw err
 writable.write(readable.read())
})

為了方便使用杠步,Node.js 提供了 pipe() 方法

readable.pipe(writable)
pipe 方法的原理
var fs = require('fs');
var ws = fs.createWriteStream('./2.txt');
var rs = fs.createReadStream('./1.txt');
rs.on('data', function (data) {
    var flag = ws.write(data);
    if(!flag)
    rs.pause();
});
ws.on('drain', function () {
    rs.resume();
});
rs.on('end', function () {
    ws.end();
});
unpipe 用法
  • readable.unpipe() 方法將之前通過 stream.pipe() 方法綁定的流分離
  • 如果 destination 沒有傳入, 則所有綁定的流都會被分離
let fs = require('fs');
var from = fs.createReadStream('./1.txt');
var to = fs.createWriteStream('./2.txt');
from.pipe(to);
setTimeout(() => {
console.log('關(guān)閉向2.txt的寫入');
from.unpipe(writable);
console.log('手工關(guān)閉文件流');
to.end();
}, 1000);
cork & uncork
  • 調(diào)用 writable.cork() 方法將強(qiáng)制所有寫入數(shù)據(jù)都存到內(nèi)存中的緩沖區(qū)里。 直到調(diào)用 stream.uncork() 或 stream.end() 方法時榜轿,緩沖區(qū)里的數(shù)據(jù)才會被輸出
  • writable.uncork() 將輸出在 stream.cork() 方法被調(diào)用之后緩沖在內(nèi)存中的所有數(shù)據(jù)
stream.cork();
stream.write('1');
stream.write('2');
process.nextTick(() => stream.uncork());

readable

'readable' 事件將在流中有數(shù)據(jù)可供讀取時才觸發(fā)幽歼。在某些情況下,為 'readable' 事件添加回調(diào)將會導(dǎo)致一些數(shù)據(jù)被讀取到內(nèi)部緩存中

const readable = getReadableStreamSomehow();
readable.on('readable', () => {
  // 某些數(shù)據(jù)可讀
});
let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
  start:3,
  end:8,
  encoding:'utf8',
  highWaterMark:3
});
rs.on('readable',function () {
  console.log('readable');
  console.log('rs._readableState.buffer.length',rs._readableState.length);
  let d = rs.read(1);
  console.log('rs._readableState.buffer.length',rs._readableState.length);
  console.log(d);
  setTimeout(()=>{
      console.log('rs._readableState.buffer.length',rs._readableState.length);
  },500)
});
  • 當(dāng)流數(shù)據(jù)到達(dá)尾部時谬盐, 'readable' 事件會觸發(fā)甸私。觸發(fā)順序在 'end' 事件之前
  • 事實(shí)上, 'readable' 事件表明流有了新的動態(tài):要么是有了新的數(shù)據(jù)飞傀,要么是到了流的尾部皇型。 對于前者, stream.read() 將返回可用的數(shù)據(jù)砸烦。而對于后者弃鸦, stream.read() 將返回 null。

可讀流的兩種模式

  1. 可讀流的兩種工作模式:flowing 和 paused
  2. flowing 模式下幢痘,可讀流自動從系統(tǒng)底層讀取數(shù)據(jù)唬格,通過 EventEmitter 接口的事件盡快將數(shù)據(jù)提供給應(yīng)用
  3. paused 模式下,調(diào)用 stream.read() 方法來從流中讀取數(shù)據(jù)片段
  4. 所有初始工作模式為 paused 的 Readable 流颜说,可以通過下面三種途徑切換到 flowing 模式
    • 監(jiān)聽 'data' 事件
    • 調(diào)用 stream.resume() 方法
    • 調(diào)用 stream.pipe() 方法將數(shù)據(jù)發(fā)送到 Writable
  5. 可讀流可以通過下面途徑切換到 paused 模式:
    • 如果不存在管道目標(biāo)(pipe destination)购岗,可以通過調(diào)用 stream.pause() 方法實(shí)現(xiàn)。
    • 如果存在管道目標(biāo)门粪,可以通過取消 'data' 事件監(jiān)聽藕畔,并調(diào)用 stream.unpipe() 方法移除所有管道目標(biāo)來實(shí)現(xiàn)。

如果 Readable 切換到 flowing 模式庄拇,且沒有消費(fèi)者處理流中的數(shù)據(jù)注服,這些數(shù)據(jù)將會丟失。 比如措近, 調(diào)用了 readable.resume() 方法卻沒有監(jiān)聽 'data' 事件溶弟,或是取消了 'data' 事件監(jiān)聽,就有可能出現(xiàn)這種情況瞭郑。

可讀流的三種狀態(tài)

在任意時刻辜御,任意可讀流應(yīng)確切處于下面三種狀態(tài)之一:

  1. readable._readableState.flowing = null
  2. readable._readableState.flowing = false
  3. readable._readableState.flowing = true
  • 若 readable._readableState.flowing 為 null,由于不存在數(shù)據(jù)消費(fèi)者屈张,可讀流將不會產(chǎn)生數(shù)據(jù)擒权。 在這個狀態(tài)下袱巨,監(jiān)聽 'data' 事件,調(diào)用 readable.pipe() 方法碳抄,或者調(diào)用 readable.resume() 方法愉老, readable._readableState.flowing 的值將會變?yōu)?true 。這時剖效,隨著數(shù)據(jù)生成嫉入,可讀流開始頻繁觸發(fā)事件。

  • 調(diào)用 readable.pause() 方法璧尸, readable.unpipe() 方法咒林, 或者接收 “背壓”(back pressure), 將導(dǎo)致 readable._readableState.flowing 值變?yōu)?false爷光。 這將暫停事件流垫竞,但 不會 暫停數(shù)據(jù)生成。 在這種情況下蛀序,為 'data' 事件設(shè)置監(jiān)聽函數(shù)不會導(dǎo)致 readable._readableState.flowing 變?yōu)?true件甥。

  • 當(dāng) readable._readableState.flowing 值為 false 時, 數(shù)據(jù)可能堆積到流的內(nèi)部緩存中哼拔。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末引有,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子倦逐,更是在濱河造成了極大的恐慌譬正,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,198評論 6 514
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件檬姥,死亡現(xiàn)場離奇詭異曾我,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)健民,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,334評論 3 398
  • 文/潘曉璐 我一進(jìn)店門抒巢,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人秉犹,你說我怎么就攤上這事蛉谜。” “怎么了崇堵?”我有些...
    開封第一講書人閱讀 167,643評論 0 360
  • 文/不壞的土叔 我叫張陵型诚,是天一觀的道長。 經(jīng)常有香客問我鸳劳,道長狰贯,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,495評論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮涵紊,結(jié)果婚禮上傍妒,老公的妹妹穿的比我還像新娘。我一直安慰自己摸柄,他們只是感情好颤练,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,502評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著塘幅,像睡著了一般。 火紅的嫁衣襯著肌膚如雪尿贫。 梳的紋絲不亂的頭發(fā)上电媳,一...
    開封第一講書人閱讀 52,156評論 1 308
  • 那天,我揣著相機(jī)與錄音庆亡,去河邊找鬼匾乓。 笑死,一個胖子當(dāng)著我的面吹牛又谋,可吹牛的內(nèi)容都是我干的拼缝。 我是一名探鬼主播,決...
    沈念sama閱讀 40,743評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼彰亥,長吁一口氣:“原來是場噩夢啊……” “哼咧七!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起任斋,我...
    開封第一講書人閱讀 39,659評論 0 276
  • 序言:老撾萬榮一對情侶失蹤继阻,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后废酷,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體瘟檩,經(jīng)...
    沈念sama閱讀 46,200評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,282評論 3 340
  • 正文 我和宋清朗相戀三年澈蟆,在試婚紗的時候發(fā)現(xiàn)自己被綠了墨辛。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,424評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡趴俘,死狀恐怖睹簇,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情寥闪,我是刑警寧澤带膀,帶...
    沈念sama閱讀 36,107評論 5 349
  • 正文 年R本政府宣布,位于F島的核電站橙垢,受9級特大地震影響垛叨,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,789評論 3 333
  • 文/蒙蒙 一嗽元、第九天 我趴在偏房一處隱蔽的房頂上張望敛纲。 院中可真熱鬧,春花似錦剂癌、人聲如沸淤翔。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,264評論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽旁壮。三九已至,卻和暖如春谐檀,著一層夾襖步出監(jiān)牢的瞬間抡谐,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,390評論 1 271
  • 我被黑心中介騙來泰國打工桐猬, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留麦撵,地道東北人。 一個月前我還...
    沈念sama閱讀 48,798評論 3 376
  • 正文 我出身青樓溃肪,卻偏偏與公主長得像免胃,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子惫撰,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,435評論 2 359

推薦閱讀更多精彩內(nèi)容

  • stream 流是一個抽象接口羔沙,在 Node 里被不同的對象實(shí)現(xiàn)。例如 request to an HTTP se...
    明明三省閱讀 3,410評論 1 10
  • 流是Node中最重要的組件和模式之一厨钻。在社區(qū)里有一句格言說:讓一切事務(wù)流動起來撬碟。這已經(jīng)足夠來描述在Node中流...
    宮若石閱讀 554評論 0 0
  • 流的基本概念及理解 流是一種數(shù)據(jù)傳輸手段,是有順序的莉撇,有起點(diǎn)和終點(diǎn)呢蛤,比如你要把數(shù)據(jù)從一個地方傳到另外一個地方流非常...
    October_yang閱讀 7,694評論 3 9
  • Buffer Buffer的構(gòu)成 Buffer對象類似數(shù)組,它的元素位16進(jìn)制的兩位數(shù)棍郎,即0到255的數(shù)值其障。主要是...
    人失格閱讀 1,840評論 0 0
  • 記得2016年畢業(yè)季,每天焦灼地找工作涂佃,家鄉(xiāng)就業(yè)環(huán)境不好励翼,還好最終憑自己的努力入職國企,但里面工作氛圍壓的人喘不過...
    xxx33閱讀 661評論 2 2