流的基本概念及理解
流是一種數(shù)據(jù)傳輸手段,是有順序的融撞,有起點(diǎn)和終點(diǎn)勉耀,比如你要把數(shù)據(jù)從一個(gè)地方傳到另外一個(gè)地方
流非常重要指煎,gulp,webpack,HTTP里的請求和響應(yīng)便斥,http里的socket都是流至壤,包括后面壓縮,加密等流為什么這么好用還這么重要呢枢纠?
- 因?yàn)橛袝r(shí)候我們不關(guān)心文件的主體內(nèi)容像街,只關(guān)心能不能取到數(shù)據(jù),取到數(shù)據(jù)之后怎么進(jìn)行處理
- 對(duì)于小型的文本文件晋渺,我們可以把文件內(nèi)容全部讀入內(nèi)存镰绎,然后再寫入文件,比如grunt-file-copy
- 對(duì)于體積較大的二進(jìn)制文件木西,比如音頻畴栖、視頻文件,動(dòng)輒幾個(gè)GB大小八千,如果使用這種方法吗讶,很容易使內(nèi)存“爆倉”。
- 理想的方法應(yīng)該是讀一部分恋捆,寫一部分照皆,不管文件有多大,只要時(shí)間允許鸠信,總會(huì)處理完成纵寝,這里就需要用到流的概念
流是一個(gè)抽象接口,被Node中很多對(duì)象所實(shí)現(xiàn),比如HTTP服務(wù)器request和response對(duì)象都是流
Node.js 中有四種基本的流類型:
- Readable - 可讀的流 (例如 fs.createReadStream()).
- Writable - 可寫的流 (例如 fs.createWriteStream()).
- Duplex - 可讀寫的流 (例如 net.Socket).
- Transform - 在讀寫過程中可以修改和變換數(shù)據(jù)的 Duplex 流 (例如 zlib.createDeflate()).
可以通過 require('stream') 加載 Stream 基類爽茴。其中包括了 Readable 流葬凳、Writable 流、Duplex 流和 Transform 流的基類
Readable streams可讀流
可讀流(Readable streams)是對(duì)提供數(shù)據(jù)的 源頭(source)的抽象
可讀流的例子包括:
- HTTP responses, on the client :客戶端請求
- HTTP requests, on the server :服務(wù)端請求
- fs read streams :讀文件
- zlib streams :壓縮
- crypto streams :加密
- TCP sockets :TCP協(xié)議
- child process stdout and stderr :子進(jìn)程標(biāo)準(zhǔn)輸出和錯(cuò)誤輸出
- process.stdin :標(biāo)準(zhǔn)輸入
所有的 Readable 都實(shí)現(xiàn)了 stream.Readable 類定義的接口
通過流讀取數(shù)據(jù)
- 用Readable創(chuàng)建對(duì)象readable后室奏,便得到了一個(gè)可讀流
- 如果實(shí)現(xiàn)_read方法火焰,就將流連接到一個(gè)底層數(shù)據(jù)源
- 流通過調(diào)用_read向底層請求數(shù)據(jù),底層再調(diào)用流的push方法將需要的數(shù)據(jù)傳遞過來
- 當(dāng)readable連接了數(shù)據(jù)源后胧沫,下游便可以調(diào)用readable.read(n)向流請求數(shù)據(jù)昌简,同時(shí)監(jiān)聽readable的data事件來接收取到的數(shù)據(jù)
下面簡單舉個(gè)可讀流的例子:
- 監(jiān)聽可讀流的data事件,當(dāng)你一旦開始監(jiān)聽data事件的時(shí)候绒怨,流就可以讀文件的內(nèi)容并且發(fā)射data纯赎,讀一點(diǎn)發(fā)射一點(diǎn)讀一點(diǎn)發(fā)射一點(diǎn)
- 默認(rèn)情況下,當(dāng)你監(jiān)聽data事件之后南蹂,會(huì)不停的讀數(shù)據(jù)犬金,然后觸發(fā)data事件,觸發(fā)完data事件后再次讀數(shù)據(jù)
- 讀的時(shí)候不是把文件整體內(nèi)容讀出來再發(fā)射出來的六剥,而且設(shè)置一個(gè)緩沖區(qū)晚顷,大小默認(rèn)是64K,比如文件是128K疗疟,先讀64K發(fā)射出來该默,再讀64K在發(fā)射出來,會(huì)發(fā)射兩次
- 緩沖區(qū)的大小可以通過highWaterMark來設(shè)置
let fs = require('fs');
//通過創(chuàng)建一個(gè)可讀流
let rs = fs.createReadStream('./1.txt',{
flags:'r',//我們要對(duì)文件進(jìn)行何種操作
mode:0o666,//權(quán)限位
encoding:'utf8',//不傳默認(rèn)為buffer策彤,顯示為字符串
start:3,//從索引為3的位置開始讀
//這是我的見過唯一一個(gè)包括結(jié)束索引的
end:8,//讀到索引為8結(jié)束
highWaterMark:3//緩沖區(qū)大小
});
rs.on('open',function () {
console.log('文件打開');
});
rs.setEncoding('utf8');//顯示為字符串
//希望流有一個(gè)暫停和恢復(fù)觸發(fā)的機(jī)制
rs.on('data',function (data) {
console.log(data);
rs.pause();//暫停讀取和發(fā)射data事件
setTimeout(function(){
rs.resume();//恢復(fù)讀取并觸發(fā)data事件
},2000);
});
//如果讀取文件出錯(cuò)了栓袖,會(huì)觸發(fā)error事件
rs.on('error',function () {
console.log("error");
});
//如果文件的內(nèi)容讀完了,會(huì)觸發(fā)end事件
rs.on('end',function () {
console.log('讀完了');
});
rs.on('close',function () {
console.log('文件關(guān)閉');
});
/**
文件打開
334
455
讀完了
文件關(guān)閉
**/
可讀流的簡單實(shí)現(xiàn)
let fs = require('fs');
let ReadStream = require('./ReadStream');
let rs = ReadStream('./1.txt', {
flags: 'r',
encoding: 'utf8',
start: 3,
end: 7,
highWaterMark: 3
});
rs.on('open', function () {
console.log("open");
});
rs.on('data', function (data) {
console.log(data);
});
rs.on('end', function () {
console.log("end");
});
rs.on('close', function () {
console.log("close");
});
/**
open
456
789
end
close
**/
let fs = require('fs');
let EventEmitter = require('events');
class ReadStream extends EventEmitter {
constructor(path, options) {
super(path, options);
this.path = path;
this.highWaterMark = options.highWaterMark || 64 * 1024;
this.buffer = Buffer.alloc(this.highWaterMark);
this.flags = options.flags || 'r';
this.encoding = options.encoding;
this.mode = options.mode || 0o666;
this.start = options.start || 0;
this.end = options.end;
this.pos = this.start;
this.autoClose = options.autoClose || true;
this.bytesRead = 0;
this.closed = false;
this.flowing;
this.needReadable = false;
this.length = 0;
this.buffers = [];
this.on('end', function () {
if (this.autoClose) {
this.destroy();
}
});
this.on('newListener', (type) => {
if (type == 'data') {
this.flowing = true;
this.read();
}
if (type == 'readable') {
this.read(0);
}
});
this.open();
}
open() {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) {
if (this.autoClose) {
this.destroy();
return this.emit('error', err);
}
}
this.fd = fd;
this.emit('open');
});
}
read(n) {
if (typeof this.fd != 'number') {
return this.once('open', () => this.read());
}
n = parseInt(n, 10);
if (n != n) {
n = this.length;
}
if (this.length == 0)
this.needReadable = true;
let ret;
if (0 < n < this.length) {
ret = Buffer.alloc(n);
let b;
let index = 0;
while (null != (b = this.buffers.shift())) {
for (let i = 0; i < b.length; i++) {
ret[index++] = b[i];
if (index == ret.length) {
this.length -= n;
b = b.slice(i + 1);
this.buffers.unshift(b);
break;
}
}
}
if (this.encoding) ret = ret.toString(this.encoding);
}
let _read = () => {
let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
if (err) {
return
}
let data;
if (bytesRead > 0) {
data = this.buffer.slice(0, bytesRead);
this.pos += bytesRead;
this.length += bytesRead;
if (this.end && this.pos > this.end) {
if (this.needReadable) {
this.emit('readable');
}
this.emit('end');
} else {
this.buffers.push(data);
if (this.needReadable) {
this.emit('readable');
this.needReadable = false;
}
}
} else {
if (this.needReadable) {
this.emit('readable');
}
return this.emit('end');
}
})
}
if (this.length == 0 || (this.length < this.highWaterMark)) {
_read(0);
}
return ret;
}
destroy() {
fs.close(this.fd, (err) => {
this.emit('close');
});
}
pause() {
this.flowing = false;
}
resume() {
this.flowing = true;
this.read();
}
pipe(dest) {
this.on('data', (data) => {
let flag = dest.write(data);
if (!flag) this.pause();
});
dest.on('drain', () => {
this.resume();
});
this.on('end', () => {
dest.end();
});
}
}
module.exports = ReadStream;
自定義可讀流
為了實(shí)現(xiàn)可讀流锅锨,引用Readable接口并用它構(gòu)造新對(duì)象
- 我們可以直接把供使用的數(shù)據(jù)push出去叽赊。
- 當(dāng)push一個(gè)null對(duì)象就意味著我們想發(fā)出信號(hào)——這個(gè)流沒有更多數(shù)據(jù)了
var stream = require('stream');
var util = require('util');
util.inherits(Counter, stream.Readable);
function Counter(options) {
stream.Readable.call(this, options);
this._index = 0;
}
Counter.prototype._read = function() {
if(this._index++<3){
this.push(this._index+'');
}else{
this.push(null);
}
};
var counter = new Counter();
counter.on('data', function(data){
console.log("讀到數(shù)據(jù): " + data.toString());//no maybe
});
counter.on('end', function(data){
console.log("讀完了");
});
可讀流的兩種模式
Readable Stream 存在兩種模式(flowing mode 與 paused mode),這兩種模式?jīng)Q定了chunk數(shù)據(jù)流動(dòng)的方式---自動(dòng)流動(dòng)還是手工流動(dòng)必搞。那如何觸發(fā)這兩種模式呢:
- flowing mode: 注冊事件data必指、調(diào)用resume方法、調(diào)用pipe方法
- paused mode: 調(diào)用pause方法(沒有pipe方法)恕洲、移除data事件 && unpipe所有pipe
如果 Readable 切換到 flowing 模式塔橡,且沒有消費(fèi)者處理流中的數(shù)據(jù),這些數(shù)據(jù)將會(huì)丟失霜第。 比如葛家, 調(diào)用了 readable.resume() 方法卻沒有監(jiān)聽 'data' 事件,或是取消了 'data' 事件監(jiān)聽泌类,就有可能出現(xiàn)這種情況
可讀流的三種狀態(tài)
在任意時(shí)刻癞谒,任意可讀流應(yīng)確切處于下面三種狀態(tài)之一:
- readable._readableState.flowing = null
- readable._readableState.flowing = false
- readable._readableState.flowing = true
兩種模式取決于可讀流flowing狀態(tài):
- 若為true : flowing mode;
- 若為false : paused mode
flowing mode
通過注冊data、pipe弹砚、resume可以自動(dòng)獲取所需要的數(shù)據(jù)双仍,我們來看下源碼的實(shí)現(xiàn)
// data事件觸發(fā)flowing mode
if (ev === 'data') {
// Start flowing on next tick if stream isn't explicitly paused
if (this._readableState.flowing !== false)
this.resume();
} else if (ev === 'readable') {
const state = this._readableState;
if (!state.endEmitted && !state.readableListening) {
state.readableListening = state.needReadable = true;
state.emittedReadable = false;
if (!state.reading) {
process.nextTick(nReadingNextTick, this);
} else if (state.length) {
emitReadable(this);
}
}
}
// resume觸發(fā)flowing mode
Readable.prototype.resume = function() {
var state = this._readableState;
if (!state.flowing) {
debug('resume');
state.flowing = true;
resume(this, state);
}
return this;
}
// pipe方法觸發(fā)flowing模式
Readable.prototype.resume = function() {
if (!state.flowing) {
this.resume()
}
}
flowing mode的三種方法最后均是通過resume方法,將狀態(tài)變?yōu)閠rue:state.flowing = true
paused mode
在paused mode下桌吃,需要手動(dòng)地讀取數(shù)據(jù)朱沃,并且可以直接指定讀取數(shù)據(jù)的長度
可以通過監(jiān)聽事件readable,觸發(fā)時(shí)手工讀取chunk數(shù)據(jù):
- 當(dāng)你監(jiān)聽 readable事件的時(shí)候茅诱,會(huì)進(jìn)入暫停模式
- 當(dāng)監(jiān)聽readable事件的時(shí)候逗物,可讀流會(huì)馬上去向底層讀取文件,然后把讀到文件的文件放在緩存區(qū)里const state = this._readableState;
- self.read(0); 只填充緩存瑟俭,但是并不會(huì)發(fā)射data事件,但是會(huì)發(fā)射stream.emit('readable');事件
- this._read(state.highWaterMark); 每次調(diào)用底層的方法讀取的時(shí)候是讀取3個(gè)字節(jié)
let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
highWaterMark:3
});
rs.on('readable',function(){
console.log(rs._readableState.length);
//read如果不加參數(shù)表示讀取整個(gè)緩存區(qū)數(shù)據(jù)
//讀取一個(gè)字段,如果可讀流發(fā)現(xiàn)你要讀的字節(jié)小于等于緩存字節(jié)大小翎卓,則直接返回
let chunk = rs.read(1);
console.log(chunk);
console.log(rs._readableState.length);
//當(dāng)你讀完指定的字節(jié)后,如果可讀流發(fā)現(xiàn)剩下的字節(jié)已經(jīng)比最高水位線小了摆寄。則會(huì)立馬再次讀取填滿 最高水位線
setTimeout(function(){
console.log(rs._readableState.length);
},200)
});
注意:一旦注冊了readable事件莲祸,必須手工讀取read數(shù)據(jù),否則數(shù)據(jù)就會(huì)流失椭迎,我們來看下源碼的實(shí)現(xiàn)
function emitReadable(stream) {
var state = stream._readableState;
state.needReadable = false;
if (!state.emittedReadable) {
debug('emitReadable', state.flowing);
state.emittedReadable = true;
process.nextTick(emitReadable_, stream);
}
}
function emitReadable_(stream) {
var state = stream._readableState;
debug('emit readable');
if (!state.destroyed && (state.length || state.ended)) {
stream.emit('readable');
}
state.needReadable = !state.flowing && !state.ended;
flow(stream);
}
function flow(stream) {
const state = stream._readableState;
debug('flow', state.flowing);
while (state.flowing && stream.read() !== null);
}
function endReadable(stream) {
var state = stream._readableState;
debug('endReadable', state.endEmitted);
if (!state.endEmitted) {
state.ended = true;
process.nextTick(endReadableNT, state, stream);
}
}
Readable.prototype.read = function(n) {
debug('read', n);
n = parseInt(n, 10);
var state = this._readableState;
var nOrig = n;
if (n !== 0)
state.emittedReadable = false;
if (n === 0 &&
state.needReadable &&
(state.length >= state.highWaterMark || state.ended)) {
debug('read: emitReadable', state.length, state.ended);
if (state.length === 0 && state.ended)
endReadable(this);
else
emitReadable(this);
return null;
}
n = howMuchToRead(n, state);
if (n === 0 && state.ended) {
if (state.length === 0)
endReadable(this);
return null;
}
flow方法直接read數(shù)據(jù),將得到的數(shù)據(jù)通過事件data交付出去田盈,然而此處沒有注冊data事件監(jiān)控畜号,因此,得到的chunk數(shù)據(jù)并沒有交付給任何對(duì)象允瞧,這樣數(shù)據(jù)就白白流失了简软,所以在觸發(fā)emit('readable')時(shí),需要提前read數(shù)據(jù)
Writable streams可寫流
可寫流是對(duì)數(shù)據(jù)寫入'目的地'的一種抽象
Writable:可寫流的例子包括了:
- HTTP requests, on the client 客戶端請求
- HTTP responses, on the server 服務(wù)器響應(yīng)
- fs write streams 文件
- zlib streams 壓縮
- crypto streams 加密
- TCP sockets TCP服務(wù)器
- child process stdin 子進(jìn)程標(biāo)準(zhǔn)輸入
- process.stdout, process.stderr 標(biāo)準(zhǔn)輸出述暂,錯(cuò)誤輸出
下面舉個(gè)可寫流的簡單例子
- 當(dāng)你往可寫流里寫數(shù)據(jù)的時(shí)候痹升,不是會(huì)立刻寫入文件的,而是會(huì)很寫入緩存區(qū)畦韭,緩存區(qū)的大小就是highWaterMark,默認(rèn)值是16K疼蛾。然后等緩存區(qū)滿了之后再次真正的寫入文件里
let fs = require('fs');
let ws = fs.createWriteStream('./2.txt',{
flags:'w',
mode:0o666,
start:3,
highWaterMark:3//默認(rèn)是16K
});
- 如果緩存區(qū)已滿 ,返回false,如果緩存區(qū)未滿艺配,返回true
- 如果能接著寫察郁,返回true,如果不能接著寫,返回false
- 按理說如果返回了false,就不能再往里面寫了转唉,但是如果你真寫了皮钠,如果也不會(huì)丟失,會(huì)緩存在內(nèi)存里赠法。等緩存區(qū)清空之后再從內(nèi)存里讀出來
let flag = ws.write('1');
console.log(flag);//true
flag =ws.write('2');
console.log(flag);//true
flag =ws.write('3');
console.log(flag);//false
flag =ws.write('4');
console.log(flag);//false
'drain' 事件
如果調(diào)用 stream.write(chunk) 方法返回 false麦轰,流將在適當(dāng)?shù)臅r(shí)機(jī)觸發(fā) 'drain' 事件,這時(shí)才可以繼續(xù)向流中寫入數(shù)據(jù)
當(dāng)一個(gè)流不處在 drain 的狀態(tài), 對(duì) write() 的調(diào)用會(huì)緩存數(shù)據(jù)塊款侵, 并且返回 false末荐。 一旦所有當(dāng)前所有緩存的數(shù)據(jù)塊都排空了(被操作系統(tǒng)接受來進(jìn)行輸出), 那么 'drain' 事件就會(huì)被觸發(fā)
建議喳坠, 一旦 write() 返回 false鞠评, 在 'drain' 事件觸發(fā)前, 不能寫入任何數(shù)據(jù)塊
舉個(gè)簡單的例子說明一下:
let fs = require('fs');
let ws = fs.createWriteStream('2.txt',{
flags:'w',
mode:0o666,
start:0,
highWaterMark:3
});
let count = 9;
function write(){
let flag = true;//緩存區(qū)未滿
//寫入方法是同步的壕鹉,但是寫入文件的過程是異步的剃幌。在真正寫入文件后還會(huì)執(zhí)行我們的回調(diào)函數(shù)
while(flag && count>0){
console.log('before',count);
flag = ws.write((count)+'','utf8',(function (i) {
return ()=>console.log('after',i);
})(count));
count--;
}
}
write();//987
//監(jiān)聽緩存區(qū)清空事件
ws.on('drain',function () {
console.log('drain');
write();//654 321
});
ws.on('error',function (err) {
console.log(err);
});
/**
before 9
before 8
before 7
after 9
after 8
after 7
**/
如果已經(jīng)不再需要寫入了,可以調(diào)用end方法關(guān)閉寫入流晾浴,一旦調(diào)用end方法之后則不能再寫入
比如在ws.end();
后寫ws.write('x');
负乡,會(huì)報(bào)錯(cuò)write after end
'pipe'事件
linux精典的管道的概念,前者的輸出是后者的輸入
pipe是一種最簡單直接的方法連接兩個(gè)stream脊凰,內(nèi)部實(shí)現(xiàn)了數(shù)據(jù)傳遞的整個(gè)過程抖棘,在開發(fā)的時(shí)候不需要關(guān)注內(nèi)部數(shù)據(jù)的流動(dòng)
- 這個(gè)方法從可讀流拉取所有數(shù)據(jù), 并將數(shù)據(jù)寫入到提供的目標(biāo)中
- 自動(dòng)管理流量,將數(shù)據(jù)的滯留量限制到一個(gè)可接受的水平狸涌,以使得不同速度的來源和目標(biāo)不會(huì)淹沒可用內(nèi)存
- 默認(rèn)情況下切省,當(dāng)源數(shù)據(jù)流觸發(fā) end的時(shí)候調(diào)用end(),所以寫入數(shù)據(jù)的目標(biāo)不可再寫帕胆。傳 { end:false }作為options朝捆,可以保持目標(biāo)流打開狀態(tài)
pipe方法的原理
var fs = require('fs');
var ws = fs.createWriteStream('./2.txt');
var rs = fs.createReadStream('./1.txt');
rs.on('data', function (data) {
var flag = ws.write(data);
if(!flag)
rs.pause();
});
ws.on('drain', function () {
rs.resume();
});
rs.on('end', function () {
ws.end();
});
下面舉個(gè)簡單的例子說明一下pipe的用法:
let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
highWaterMark:3
});
let ws = fs.createWriteStream('./2.txt',{
highWaterMark:3
});
rs.pipe(ws);
//移除目標(biāo)可寫流
rs.unpipe(ws);
- 當(dāng)監(jiān)聽可讀流data事件的時(shí)候會(huì)觸發(fā)回調(diào)函數(shù)的執(zhí)行
- 可以實(shí)現(xiàn)數(shù)據(jù)的生產(chǎn)者和消費(fèi)者速度的均衡
rs.on('data',function (data) {
console.log(data);
let flag = ws.write(data);
if(!flag){
rs.pause();
}
});
- 監(jiān)聽可寫流緩存區(qū)清空事件,當(dāng)所有要寫入的數(shù)據(jù)寫入完成后懒豹,接著恢復(fù)從可讀流里讀取并觸發(fā)data事件
ws.on('drain',function () {
console.log('drain');
rs.resume();
});
unpipe
readable.unpipe()方法將之前通過stream.pipe()方法綁定的流分離
- 如果寫入的目標(biāo)沒有傳入, 則所有綁定的流都會(huì)被分離
- 如果指定了寫入的目標(biāo)芙盘,但是沒有綁定流,則什么事情都不會(huì)發(fā)生
簡單距離說明下unpipe的用法:
let fs = require('fs');
var from = fs.createReadStream('./1.txt');
var to = fs.createWriteStream('./2.txt');
from.pipe(to);
setTimeout(() => {
console.log('關(guān)閉向2.txt的寫入');
from.unpipe(writable);
console.log('手工關(guān)閉文件流');
to.end();
}, 1000);
pipe的簡單實(shí)現(xiàn)
let fs = require('fs');
let ReadStream = require('./ReadStream');
let rs = ReadStream('./1.txt', {
flags: 'r',
encoding: 'utf8',
highWaterMark: 3
});
let FileWriteStream = require('./WriteStream');
let ws = FileWriteStream('./2.txt',{
flags:'w',
encoding:'utf8',
highWaterMark:3
});
rs.pipe(ws);
ReadStream.prototype.pipe = function (dest) {
this.on('data', (data)=>{
let flag = dest.write(data);
if(!flag){
this.pause();
}
});
dest.on('drain', ()=>{
this.resume();
});
this.on('end', ()=>{
dest.end();
});
}
ReadStream.prototype.pause = function(){
this.flowing = false;
}
ReadStream.prototype.resume = function(){
this.flowing = true;
this.read();
}
自定義管道流
const stream = require('stream')
var index = 0;
const readable = stream.Readable({
highWaterMark: 2,
read: function () {
process.nextTick(() => {
console.log('push', ++index)
this.push(index+'');
})
}
})
const writable = stream.Writable({
highWaterMark: 2,
write: function (chunk, encoding, next) {
console.log('寫入:', chunk.toString())
}
})
readable.pipe(writable);
可寫流的簡單實(shí)現(xiàn)
let fs = require('fs');
let FileWriteStream = require('./FileWriteStream');
let ws = FileWriteStream('./2.txt',{
flags:'w',
encoding:'utf8',
highWaterMark:3
});
let i = 10;
function write(){
let flag = true;
while(i&&flag){
flag = ws.write("1",'utf8',(function(i){
return function(){
console.log(i);
}
})(i));
i--;
console.log(flag);
}
}
write();
ws.on('drain',()=>{
console.log("drain");
write();
});
/**
10
9
8
drain
7
6
5
drain
4
3
2
drain
1
**/
let EventEmitter = require('events');
let util = require('util');
let fs = require('fs');
util.inherits(WriteStream, EventEmitter);
function WriteStream(path, options) {
EventEmitter.call(this);
if (!(this instanceof WriteStream)) {
return new WriteStream(path, options);
}
this.path = path;
this.fd = options.fd;
this.encoding = options.encoding||'utf8';
this.flags = options.flags || 'w';
this.mode = options.mode || 0o666;
this.autoClose = options.autoClose || true;
this.start = options.start || 0;
this.pos = this.start;//開始寫入的索引位置
this.open();//打開文件進(jìn)行操作
this.writing = false;//沒有在寫入過程 中
this.buffers = [];
this.highWaterMark = options.highWaterMark||16*1024;
//如果監(jiān)聽到end事件脸秽,而且要求自動(dòng)關(guān)閉的話則關(guān)閉文件
this.on('end', function () {
if (this.autoClose) {
this.destroy()
}
});
}
WriteStream.prototype.close = function(){
fs.close(this.fd,(err)=>{
if(err)
this.emit('error',err);
});
}
WriteStream.prototype.open = function () {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err)
return this.emit('error', err);
this.fd = fd;//把文件描述符賦給當(dāng)前實(shí)例的fd屬性
//發(fā)射open事件
this.emit('open', fd);
});
}
/**
* 會(huì)判斷當(dāng)前是后臺(tái)是否在寫入過程中儒老,如果在寫入過程中,則把這個(gè)數(shù)據(jù)放在待處理的緩存中记餐,如果不在寫入過程中驮樊,可以直接寫。
*/
WriteStream.prototype.write = function (chunk, encoding, cb) {
chunk= Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,this.encoding);
//先把數(shù)據(jù)放在緩存里
this.buffers.push({
chunk,
encoding,
cb
});
let isFull = this.buffers.reduce((len, item) => len + item.chunk.length, 0)>=this.highWaterMark;
//只有當(dāng)緩存區(qū)寫滿了片酝,那么清空緩存區(qū)的時(shí)候才會(huì)發(fā)射drain事件巩剖,否則 不發(fā)放
this.needDrain = isFull;
//如果說文件還沒有打開,則把寫入的方法壓入open事件的監(jiān)聽函數(shù)钠怯。等文件一旦打開佳魔,立刻執(zhí)行寫入操作
if (typeof this.fd !== 'number') {
this.once('open', () => {
this._write();
});
return !isFull;
}else{
if(!this.writing){
setImmediate(()=>{
this._write();
this.writing = true;
});
}
return !isFull;
}
}
WriteStream.prototype._write = function () {
let part = this.buffers.shift();
if (part) {
fs.write(this.fd,part.chunk,0,part.chunk.length,null,(err,bytesWritten)=>{
if(err)return this.emit('error',err);
part.cb && part.cb();
this._write();
});
}else{
//發(fā)射一個(gè)緩存區(qū)清空的事件
this.emit('drain');
this.writing = false;
}
}
module.exports = WriteStream;
自定義可寫流
為了實(shí)現(xiàn)可寫流,我們需要使用流模塊中的Writable構(gòu)造函數(shù)晦炊。 我們只需給Writable構(gòu)造函數(shù)傳遞一些選項(xiàng)并創(chuàng)建一個(gè)對(duì)象鞠鲜。唯一需要的選項(xiàng)是write函數(shù)宁脊,該函數(shù)揭露數(shù)據(jù)塊要往哪里寫
- chunk通常是一個(gè)buffer,除非我們配置不同的流贤姆。
- encoding是在特定情況下需要的參數(shù)榆苞,通常我們可以忽略它。
- callback是在完成處理數(shù)據(jù)塊后需要調(diào)用的函數(shù)霞捡。這是寫數(shù)據(jù)成功與否的標(biāo)志坐漏。若要發(fā)出故障信號(hào),請用錯(cuò)誤對(duì)象調(diào)用回調(diào)函數(shù)
var stream = require('stream');
var util = require('util');
util.inherits(Writer, stream.Writable);
let stock = [];
function Writer(opt) {
stream.Writable.call(this, opt);
}
Writer.prototype._write = function(chunk, encoding, callback) {
setTimeout(()=>{
stock.push(chunk.toString('utf8'));
console.log("增加: " + chunk);
callback();
},500)
};
var w = new Writer();
for (var i=1; i<=5; i++){
w.write("項(xiàng)目:" + i, 'utf8');
}
w.end("結(jié)束寫入",function(){
console.log(stock);
});
Duplex streams可讀寫的流(雙工流)
Duplex 流是同時(shí)實(shí)現(xiàn)了 Readable 和 Writable 接口的流
雙工流的可讀性和可寫性操作完全獨(dú)立于彼此碧信,這僅僅是將兩個(gè)特性組合成一個(gè)對(duì)象Duplex 流的實(shí)例包括了:
- TCP sockets
- zlib streams
- crypto streams
下面簡單實(shí)現(xiàn)雙工流:
const {Duplex} = require('stream');
const inoutStream = new Duplex({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
},
read(size) {
this.push((++this.index)+'');
if (this.index > 3) {
this.push(null);
}
}
});
inoutStream.index = 0;
process.stdin.pipe(inoutStream).pipe(process.stdout);
Transform streams轉(zhuǎn)換流
變換流(Transform streams) 是一種 Duplex 流赊琳。它的輸出與輸入是通過某種方式關(guān)聯(lián)的。和所有 Duplex 流一樣砰碴,變換流同時(shí)實(shí)現(xiàn)了 Readable 和 Writable 接口
轉(zhuǎn)換流的輸出是從輸入中計(jì)算出來的
對(duì)于轉(zhuǎn)換流躏筏,我們不必實(shí)現(xiàn)read或write的方法,我們只需要實(shí)現(xiàn)一個(gè)transform方法呈枉,將兩者結(jié)合起來趁尼。它有write方法的意思,我們也可以用它來push數(shù)據(jù)變換流的實(shí)例包括:
- zlib streams
- crypto streams
下面簡單實(shí)現(xiàn)轉(zhuǎn)換流:
const {Transform} = require('stream');
const upperCase = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
process.stdin.pipe(upperCase).pipe(process.stdout);
對(duì)象流
默認(rèn)情況下猖辫,流處理的數(shù)據(jù)是Buffer/String類型的值酥泞。有一個(gè)objectMode標(biāo)志,我們可以設(shè)置它讓流可以接受任何JavaScript對(duì)象
const {Transform} = require('stream');
let fs = require('fs');
let rs = fs.createReadStream('./users.json');
rs.setEncoding('utf8');
let toJson = Transform({
readableObjectMode: true,
transform(chunk, encoding, callback) {
this.push(JSON.parse(chunk));
callback();
}
});
let jsonOut = Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
console.log(chunk);
callback();
}
});
rs.pipe(toJson).pipe(jsonOut);