Nodejs Stream 手冊

編譯地址:https://github.com/substack/stream-handbook
譯者:jabez128
原文地址:https://github.com/jabez128/stream-handbook

寫在前面的話:
如果你正在學習Nodejs,那么流一定是一個你需要掌握的概念。如果你想成為一個Node高手葱轩,那么流一定是武功秘籍中不可缺少的一個部分宴树。關于流這個主題,由Node高手substack帶來的stream-handbook絕對是經典入門讀物之一额各,其在Github上的star數量已經超過了4500個国觉,足以見其權威程度。本文下面的內容將參考自substack的這篇文章虾啦。本文也放在Github上麻诀,如果你本文覺得對你有幫助,鼓勵大家去github上幫我點個贊傲醉。https://github.com/jabez128/stream-handbook

引子

在編寫代碼時蝇闭,我們應該有一些方法將程序像連接水管一樣連接起來 -- 當我們需要獲取一些數據時,可以去通過"擰"其他的部分來達到目的硬毕。這也應該是IO應有的方式呻引。 -- Doug McIlroy. October 11, 1964

早先的unix開始,stream便開始進入了人們的視野昭殉,在過去的幾十年的時間里苞七,它被證明是一種可依賴的編程方式,它可以將一個大型的系統(tǒng)拆成一些很小的部分挪丢,并且讓這些部分之間完美地進行合作蹂风。在unix中,我們可以使用|符號來實現流乾蓬。在node中惠啄,node內置的stream模塊已經被多個核心模塊使用,同時也可以被用戶自定義的模塊使用任内。和unix類似撵渡,node中的流模塊的基本操作符叫做.pipe(),同時你也可以使用一個后壓機制來應對那些對數據消耗較慢的對象死嗦。

在node中趋距,流可以幫助我們將事情的重點分為幾份,因為使用流可以幫助我們將實現接口的部分分割成一些連續(xù)的接口越除,這些接口都是可重用的节腐。接著外盯,你可以將一個流的輸出口接到另一個流的輸入口,然后使用使用一些庫來對流實現高級別的控制翼雀。

對于小型程序設計(small-program design)以及unix哲學來說饱苟,流都是一個重要的組成部分,但是除此之外還有一些重要的事情值得我們思考狼渊。永遠要記得:十鳥在森林不如一鳥在手里箱熬。

為什么應該使用流

在node中,I/O都是異步的狈邑,所以在和硬盤以及網絡的交互過程中會涉及到傳遞回調函數的過程城须。你之前可能會寫出這樣的代碼:

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + '/data.txt', function (err, data) {
        res.end(data);
    });
});
server.listen(8000);

上面的這段代碼并沒有什么問題,但是在每次請求時官地,我們都會把整個data.txt文件讀入到內存中酿傍,然后再把結果返回給客戶端。想想看驱入,如果data.txt文件非常大赤炒,在響應大量用戶的并發(fā)請求時,程序可能會消耗大量的內存亏较,這樣很可能會造成用戶連接緩慢的問題莺褒。

其次,上面的代碼可能會造成很不好的用戶體驗雪情,因為用戶在接收到任何的內容之前首先需要等待程序將文件內容完全讀入到內存中遵岩。

所幸的是,(req,res)參數都是流對象巡通,這意味著我們可以使用一種更好的方法來實現上面的需求:

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(res);
});
server.listen(8000);

在這里尘执,.pipe()方法會自動幫助我們監(jiān)聽dataend事件。上面的這段代碼不僅簡潔宴凉,而且data.txt文件中每一小段數據都將源源不斷的發(fā)送到客戶端誊锭。

除此之外,使用.pipe()方法還有別的好處弥锄,比如說它可以自動控制后端壓力丧靡,以便在客戶端連接緩慢的時候node可以將盡可能少的緩存放到內存中。

想要將數據進行壓縮籽暇?我們可以使用相應的流模塊完成這項工作!

var http = require('http');
var fs = require('fs');
var oppressor = require('oppressor');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(oppressor(req)).pipe(res);
});
server.listen(8000);

通過上面的代碼温治,我們成功的將發(fā)送到瀏覽器端的數據進行了gzip壓縮。我們只是使用了一個oppressor模塊來處理這件事情戒悠。

一旦你學會使用流api熬荆,你可以將這些流模塊像搭樂高積木或者像連接水管一樣拼湊起來,從此以后你可能再也不會去使用那些沒有流API的模塊獲取和推送數據了绸狐。

流模塊基礎

在node中惶看,一共有五種類型的流:readable,writable,transform,duplex以及"classic"

pipe

無論哪一種流捏顺,都會使用.pipe()方法來實現輸入和輸出六孵。

.pipe()函數很簡單纬黎,它僅僅是接受一個源頭src并將數據輸出到一個可寫的流dst中:

src.pipe(dst)

.pipe(dst)將會返回dst因此你可以鏈式調用多個流:

a.pipe(b).pipe(c).pipe(d)

上面的代碼也可以等價為:

a.pipe(b);
b.pipe(c);
c.pipe(d);

這和你在unix中編寫流代碼很類似:

a | b | c | d

只不過此時你是在node中編寫而不是在shell中!

readable流

Readable流可以產出數據劫窒,你可以將這些數據傳送到一個writable本今,transform或者duplex流中,只需要調用pipe()方法:

readableStream.pipe(dst)

創(chuàng)建一個readable流

現在我們就來創(chuàng)建一個readable流主巍!

var Readable = require('stream').Readable;

var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);

rs.pipe(process.stdout);

下面運行代碼:

$ node read0.js
beep boop

在上面的代碼中rs.push(null)的作用是告訴rs輸出數據應該結束了冠息。

需要注意的一點是我們在將數據輸出到process.stdout之前已經將內容推送進readable流rs中,但是所有的數據依然是可寫的孕索。

這是因為在你使用.push()將數據推進一個readable流中時逛艰,一直要到另一個東西來消耗數據之前,數據都會存在一個緩存中搞旭。

然而散怖,在更多的情況下,我們想要的是當需要數據時數據才會產生肄渗,以此來避免大量的緩存數據镇眷。

我們可以通過定義一個._read函數來實現按需推送數據:

var Readable = require('stream').Readable;
var rs = Readable();

var c = 97;
rs._read = function () {
    rs.push(String.fromCharCode(c++));
    if (c > 'z'.charCodeAt(0)) rs.push(null);
};

rs.pipe(process.stdout);

代碼的運行結果如下所示:

$ node read1.js
abcdefghijklmnopqrstuvwxyz

在這里我們將字母az推進了rs中,但是只有當數據消耗者出現時翎嫡,數據才會真正實現推送欠动。

_read函數也可以獲取一個size參數來指明消耗者想要讀取多少比特的數據,但是這個參數是可選的惑申。

需要注意到的是你可以使用util.inherit()來繼承一個Readable流具伍。

為了說明只有在數據消耗者出現時,_read函數才會被調用圈驼,我們可以將上面的代碼簡單的修改一下:

var Readable = require('stream').Readable;
var rs = Readable();

var c = 97 - 1;

rs._read = function () {
    if (c >= 'z'.charCodeAt(0)) return rs.push(null);

    setTimeout(function () {
        rs.push(String.fromCharCode(++c));
    }, 100);
};

rs.pipe(process.stdout);

process.on('exit', function () {
    console.error('\n_read() called ' + (c - 97) + ' times');
});
process.stdout.on('error', process.exit);

運行上面的代碼我們可以發(fā)現如果我們只請求5比特的數據人芽,那么_read只會運行5次:

$ node read2.js | head -c5
abcde
_read() called 5 times

在上面的代碼中,setTimeout很重要碗脊,因為操作系統(tǒng)需要花費一些時間來發(fā)送程序結束信號啼肩。

另外,process.stdout.on('error',fn)處理器也很重要,因為當head不再關心我們的程序輸出時衙伶,操作系統(tǒng)將會向我們的進程發(fā)送一個SIGPIPE信號祈坠,此時process.stdout將會捕獲到一個EPIPE錯誤。

上面這些復雜的部分在和操作系統(tǒng)相關的交互中是必要的矢劲,但是如果你直接和node中的流交互的話赦拘,則可有可無。

如果你創(chuàng)建了一個readable流芬沉,并且想要將任何的值推送到其中的話躺同,確保你在創(chuàng)建流的時候指定了objectMode參數,Readable({ objectMode: true })阁猜。

消耗一個readable流

大部分時候,將一個readable流直接pipe到另一種類型的流或者使用through或者concat-stream創(chuàng)建的流中蹋艺,是一件很容易的事情剃袍。但是有時我們也會需要直接來消耗一個readable流。

process.stdin.on('readable', function () {
    var buf = process.stdin.read();
    console.dir(buf);
});

代碼運行結果如下所示:

$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js 
<Buffer 61 62 63 0a>
<Buffer 64 65 66 0a>
<Buffer 67 68 69 0a>
null

當數據可用時捎谨,readable事件將會被觸發(fā)民效,此時你可以調用.read()方法來從緩存中獲取這些數據。

當流結束時涛救,.read()將返回null畏邢,因為此時已經沒有更多的字節(jié)可以供我們獲取了。

你也可以告訴.read()方法來返回n個字節(jié)的數據检吆。雖然所有核心對象中的流都支持這種方式舒萎,但是對于對象流來說這種方法并不可用。

下面是一個例子蹭沛,在這里我們制定每次讀取3個字節(jié)的數據:

process.stdin.on('readable', function () {
    var buf = process.stdin.read(3);
    console.dir(buf);
});

運行上面的例子臂寝,我們將獲取到不完整的數據:

$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js 
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>

這是因為多余的數據都留在了內部的緩存中,因此這個時候我們需要告訴node我們還對剩下的數據感興趣致板,我們可以使用.read(0)來完成這件事:

process.stdin.on('readable', function () {
    var buf = process.stdin.read(3);
    console.dir(buf);
    process.stdin.read(0);
});

到現在為止我們的代碼和我們所期望的一樣了交煞!

$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js 
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>
<Buffer 68 69 0a>

我們也可以使用.unshift()方法來放置多余的數據。

使用unshift()方法能夠放置我們進行不必要的緩存拷貝斟或。在下面的代碼中我們將創(chuàng)建一個分割新行的可讀解析器:

var offset = 0;

process.stdin.on('readable', function () {
    var buf = process.stdin.read();
    if (!buf) return;
    for (; offset < buf.length; offset++) {
        if (buf[offset] === 0x0a) {
            console.dir(buf.slice(0, offset).toString());
            buf = buf.slice(offset + 1);
            offset = 0;
            process.stdin.unshift(buf);
            return;
        }
    }
    process.stdin.unshift(buf);
});

代碼的運行結果如下所示:

$ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js 
'hearties'
'heartiest'
'heartily'
'heartiness'
'heartiness\'s'
'heartland'
'heartland\'s'
'heartlands'
'heartless'
'heartlessly'

當然素征,已經有很多這樣的模塊比如split來幫助你完成這件事情,你完全不需要自己寫一個萝挤。

writable流

一個writable流指的是只能流進不能流出的流:

src.pipe(writableStream)

創(chuàng)建一個writable流

只需要定義一個._write(chunk,enc,next)函數御毅,你就可以將一個readable流的數據釋放到其中:

var Writable = require('stream').Writable;
var ws = Writable();
ws._write = function (chunk, enc, next) {
    console.dir(chunk);
    next();
};

process.stdin.pipe(ws);

代碼運行結果如下所示:

$ (echo beep; sleep 1; echo boop) | node write0.js 
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>

第一個參數,chunk代表寫進來的數據怜珍。

第二個參數enc代表編碼的字符串端蛆,但是只有在opts.decodeStringfalse的時候你才可以寫一個字符串。

第三個參數酥泛,next(err)是一個回調函數今豆,使用這個回調函數你可以告訴數據消耗者可以寫更多的數據。你可以有選擇性的傳遞一個錯誤對象error柔袁,這時會在流實體上觸發(fā)一個emit事件呆躲。

在從一個readable流向一個writable流傳數據的過程中,數據會自動被轉換為Buffer對象捶索,除非你在創(chuàng)建writable流的時候制定了decodeStrings參數為false,Writable({decodeStrings: false})插掂。

如果你需要傳遞對象,需要指定objectMode參數為trueWritable({ objectMode: true })辅甥。

向一個writable流中寫東西

如果你需要向一個writable流中寫東西酝润,只需要調用.write(data)即可。

process.stdout.write('beep boop\n');

為了告訴一個writable流你已經寫完畢了璃弄,只需要調用.end()方法要销。你也可以使用.end(data)在結束前再寫一些數據。

var fs = require('fs');
var ws = fs.createWriteStream('message.txt');

ws.write('beep ');

setTimeout(function () {
    ws.end('boop\n');
}, 1000);

運行結果如下所示:

$ node writing1.js 
$ cat message.txt
beep boop

如果你在創(chuàng)建writable流時指定了highWaterMark參數谢揪,那么當沒有更多數據寫入時蕉陋,調用.write()方法將會返回false。

如果你想要等待緩存情況拨扶,可以監(jiān)聽drain事件。

transform流

你可以將transform流想象成一個流的中間部分茁肠,它可以讀也可寫患民,但是并不保存數據,它只負責處理流經它的數據垦梆。

duplex流

Duplex流是一個可讀也可寫的流匹颤,就好像一個電話,可以接收也可以發(fā)送語音托猩。一個rpc交換是一個duplex流的最好的例子印蓖。如果你看到過下面這樣的代碼:

a.pipe(b).pipe(a)

那么你需要處理的就是一個duplex流對象。

classic流

Classic流是一個古老的接口京腥,最早出現在node 0.4中赦肃。雖然現在不怎么用,但是我們最好還是來了解一下它的工作原理公浪。

無論何時他宛,只要一個流對象注冊了一個data監(jiān)聽器,它就會自動的切換到classic模式欠气,并且根據舊API的方式運行厅各。

classic readable流

Classic readable流只是一個事件發(fā)射器,當有數據消耗者出現時發(fā)射emit事件预柒,當輸出數據完畢時發(fā)射end事件队塘。

我們可以同構檢查stream.readable來檢查一個classic流對象是否可讀。

下面是一個簡單的readable流對象的例子宜鸯,程序的運行結果將會輸出AJ

var Stream = require('stream');
var stream = new Stream;
stream.readable = true;

var c = 64;
var iv = setInterval(function () {
    if (++c >= 75) {
        clearInterval(iv);
        stream.emit('end');
    }
    else stream.emit('data', String.fromCharCode(c));
}, 100);

stream.pipe(process.stdout);

運行結果如下所示:

$ node classic0.js
ABCDEFGHIJ

為了從一個classic readable流中讀取數據憔古,你可以注冊dataend監(jiān)聽器。下面是一個使用舊readable流方式從process.stdin中讀取數據的例子:

process.stdin.on('data', function (buf) {
    console.log(buf);
});
process.stdin.on('end', function () {
    console.log('__END__');
});

運行結果如下所示:

$ (echo beep; sleep 1; echo boop) | node classic1.js 
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
__END__

需要注意的一點是當你在一個流對象上注冊了一個data監(jiān)聽器顾翼,你就將這個流放在了兼容模式下投放,此時你不能使用兩個stream2的api。

如果你自己創(chuàng)建流對象适贸,永遠不要綁定dataend監(jiān)聽器灸芳。如果你需要和舊版本的流兼容涝桅,最好使用第三方庫來實現.pipe()方法。

例如烙样,你可以使用through模塊來避免顯式的使用dataend監(jiān)聽器:

var through = require('through');
process.stdin.pipe(through(write, end));

function write (buf) {
    console.log(buf);
}
function end () {
    console.log('__END__');
}

程序運行結果如下所示:

$ (echo beep; sleep 1; echo boop) | node through.js 
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
__END__

你也可以使用concat-stream模塊來將整個流的內容緩存起來:

var concat = require('concat-stream');
process.stdin.pipe(concat(function (body) {
    console.log(JSON.parse(body));
}));

程序運行結果如下所示:

$ echo '{"beep":"boop"}' | node concat.js 
{ beep: 'boop' }

Classic readable流擁有.pause().resume()邏輯來暫停一個流冯遂,但是這都是可選的。如果你想要使用.pause().resume()方法谒获,你應該使用through模塊來幫助你處理緩存蛤肌。

classic writable流

Classic writable流非常簡單。其中只定義了.write(buf)批狱,.end(buf)裸准,以及.desctory()方法。其中.end(buf)的參數buf是可選參數赔硫,但是一般來說node程序員還是喜歡使用.end(buf)這種寫法炒俱。

接下來讀什么

  • node核心stream模塊文檔
  • 你可以使用readable-stream模塊來確保你的stream2代碼兼容node 0.8及其之前的代碼。在你npm install readable-stream之后直接require('readable-stream')而不要require('stream')爪膊。

本文參考自stream-handbook.

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末权悟,一起剝皮案震驚了整個濱河市,隨后出現的幾起案子推盛,更是在濱河造成了極大的恐慌峦阁,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,311評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件耘成,死亡現場離奇詭異榔昔,居然都是意外死亡,警方通過查閱死者的電腦和手機凿跳,發(fā)現死者居然都...
    沈念sama閱讀 88,339評論 2 382
  • 文/潘曉璐 我一進店門件豌,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人控嗜,你說我怎么就攤上這事茧彤。” “怎么了疆栏?”我有些...
    開封第一講書人閱讀 152,671評論 0 342
  • 文/不壞的土叔 我叫張陵曾掂,是天一觀的道長。 經常有香客問我壁顶,道長珠洗,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,252評論 1 279
  • 正文 為了忘掉前任若专,我火速辦了婚禮许蓖,結果婚禮上,老公的妹妹穿的比我還像新娘。我一直安慰自己膊爪,他們只是感情好自阱,可當我...
    茶點故事閱讀 64,253評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著米酬,像睡著了一般沛豌。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上赃额,一...
    開封第一講書人閱讀 49,031評論 1 285
  • 那天加派,我揣著相機與錄音,去河邊找鬼跳芳。 笑死芍锦,一個胖子當著我的面吹牛,可吹牛的內容都是我干的筛严。 我是一名探鬼主播醉旦,決...
    沈念sama閱讀 38,340評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼桨啃!你這毒婦竟也來了?” 一聲冷哼從身側響起檬输,我...
    開封第一講書人閱讀 36,973評論 0 259
  • 序言:老撾萬榮一對情侶失蹤照瘾,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后丧慈,有當地人在樹林里發(fā)現了一具尸體析命,經...
    沈念sama閱讀 43,466評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 35,937評論 2 323
  • 正文 我和宋清朗相戀三年逃默,在試婚紗的時候發(fā)現自己被綠了鹃愤。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,039評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡完域,死狀恐怖软吐,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情吟税,我是刑警寧澤凹耙,帶...
    沈念sama閱讀 33,701評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站肠仪,受9級特大地震影響肖抱,放射性物質發(fā)生泄漏。R本人自食惡果不足惜异旧,卻給世界環(huán)境...
    茶點故事閱讀 39,254評論 3 307
  • 文/蒙蒙 一意述、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧,春花似錦荤崇、人聲如沸拌屏。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽瑰抵。三九已至署惯,卻和暖如春责嚷,著一層夾襖步出監(jiān)牢的瞬間箕慧,已是汗流浹背亦渗。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工又兵, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留带兜,地道東北人枫笛。 一個月前我還...
    沈念sama閱讀 45,497評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像刚照,于是被迫代替她去往敵國和親刑巧。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,786評論 2 345

推薦閱讀更多精彩內容

  • stream 流是一個抽象接口无畔,在 Node 里被不同的對象實現啊楚。例如 request to an HTTP se...
    明明三省閱讀 3,392評論 1 10
  • --如果你想成為一個Node高手,那么流一定是武功秘籍中不可缺少的一個部分 詳細講解stream以及一些實例 ht...
    lmem閱讀 941評論 0 1
  • 流是Node中最重要的組件和模式之一浑彰。在社區(qū)里有一句格言說:讓一切事務流動起來恭理。這已經足夠來描述在Node中流...
    宮若石閱讀 539評論 0 0
  • https://nodejs.org/api/documentation.html 工具模塊 Assert 測試 ...
    KeKeMars閱讀 6,305評論 0 6
  • 為什么應該使用流 你可能看過這樣的代碼。 這段代碼中郭变,服務器每收到一次請求颜价,就會先把data.txt讀入到內存中,...
    饑人谷_xxxxx閱讀 10,791評論 1 12