背景
之前在開發(fā)ASP.NET的時候阻桅,根據(jù)源代碼依次追蹤整個程序的執(zhí)行過程楣黍,發(fā)現(xiàn)最重要的過程是基于一個管道流的附迷,像水管一樣惧互,依次處理管道流中的十幾個事件拯刁,當時對流的認知是四個字,依次執(zhí)行痘煤。那么現(xiàn)在做Node的開發(fā)哩治,對于Node中的流是另四個字,那就是源源不斷艾猜。本篇文章主要目的是帶大家手寫可讀流與可寫流买喧。
簡介
在Node中,請求流匆赃,響應流淤毛,文件流等都是基于stream模塊封裝的。簡單的理解算柳,流就是將大塊的東西低淡,分小塊依次處理。就像你需要10kg的水瞬项,水管就一點點的源源不斷的流出來給你蔗蹋。又如在程序中
fs.readFileSync('/demo.txt', {encoding:'utf8'});fs.writeFileSync('/demo.txt', data);
以上兩個方法是把文件內(nèi)容全部讀入內(nèi)存,然后再寫入文件囱淋,但是如果文件過大就會出現(xiàn)問題了猪杭,內(nèi)存容易爆掉。這里就需要用到流了妥衣,一點點的讀取或者寫入皂吮。
分類
- Readable - 可讀的流 (例如 fs.createReadStream()).
- Writable - 可寫的流 (例如 fs.createWriteStream()).
- Duplex - 可讀寫的流 (例如 net.Socket).
- Transform - 在讀寫過程中可以修改和變換數(shù)據(jù)的 Duplex 流 (例如 zlib.createDeflate()).
Readable - 可讀的流介紹與實現(xiàn)
可讀流分為兩種模式:flowing 和 paused,并且兩種模式可以相互轉(zhuǎn)換
1.在 flowing 模式下税手, 可讀流自動從系統(tǒng)底層讀取數(shù)據(jù)蜂筹,并通過 EventEmitter 接口的事件盡快將數(shù)據(jù)提供給應用。
2.在 paused 模式下冈止,必須顯式調(diào)用 stream.read() 方法來從流中讀取數(shù)據(jù)片段狂票。
3.所有初始工作模式為 paused 的 Readable 流,可以通過下面三種途徑切換到 flowing 模式:
- 監(jiān)聽 'data' 事件
- 調(diào)用 stream.resume() 方法
- 調(diào)用 stream.pipe() 方法將數(shù)據(jù)發(fā)送到 Writable
4.可讀流可以通過下面途徑切換到 paused 模式:
- 如果不存在管道目標(pipe destination)熙暴,可以通過調(diào)用 stream.pause() 方法實現(xiàn)闺属。
- 如果存在管道目標,可以通過取消 'data' 事件監(jiān)聽周霉,并調(diào)用 stream.unpipe() 方法移除所有管道目標來實現(xiàn)掂器。
5.為了便于理解,這里分開寫兩種模式俱箱,下面為flowing模式基本實現(xiàn)
- 定義類首先要繼承自EventEmitter国瓮,因為要發(fā)射監(jiān)聽事件,在構(gòu)造函數(shù)中依次定義各個參數(shù),其中需要說明的是this.flowing 用于切換模式乃摹,this.buffer并非是緩存禁漓,因為流動模式是默認不走緩存的,這個buffer是讀取的時候fs.read的一個參數(shù)孵睬,所有的事件監(jiān)聽到都會執(zhí)行newListener播歼。當該類被構(gòu)造的時候就打開文件,并監(jiān)聽事件掰读,如果是data秘狞,默認走流動模式開始讀取。
class ReadStream extends EventEmitter{
constructor(path,options){
super(path,options);
this.path = path;//寫入路徑
this.flags = options.flags || 'r'; //操作修飾符
this.mode = options.mode || 0o666; //權(quán)限
this.autoClose = options.autoClose;//是否自動關(guān)閉
this.highWaterMark = options.highWaterMark || 64 * 1024; //默認64k
this.pos = this.start = options.start || 0;//起始位置
this.end = options.end;//結(jié)束位置
this.encoding = options.encoding;//編碼
this.flowing = null;//流動模式
this.buffer = Buffer.alloc(this.highWaterMark);//讀取的buffer 不是緩存
this.open();
this.on('newListener',(type,listener)=>{
if (type == 'data') {
this.flowing = true;
this.read();
}
})
}
- open方法 打開傳入路徑的文件 如果出錯并且設置自動關(guān)閉屬性蹈集,直接關(guān)閉打開烁试,發(fā)射error事件。如果成功了拢肆,發(fā)射open事件减响,供讀取方法接收。
open() {
fs.open(this.path,this.flags,this.mode,(err,fd)=>{
if(err){
if(this.autoClose){
this.destroy();
return this.emit('error',err);
}
}
this.fd = fd;
this.emit('open');
})
}
- 最重要的方法Read郭怪,一進入就需要判斷文件是否已經(jīng)打開了辩蛋,因為文件打開是異步的一個過程,此時可能并未打開移盆,如果沒有打開就監(jiān)聽發(fā)射的open事件,然后在回調(diào)函數(shù)中進行read方法的調(diào)用伤为。其中核心是每次需要讀多少咒循,這個量由傳入的開始結(jié)束位置已經(jīng)最高水位線決定的。最高水位線代表依次最多讀取多少绞愚,默認值是64kb叙甸。那么每次讀取的值 howMuchToRead = this.end?Math.min(this.end - this.pos + 1,this.highWaterMark):this.highWaterMark;
read(){
if(typeof this.fd != 'number'){
return this.once('open',()=>this.read());
}
let howMuchToRead = this.end?Math.min(this.end - this.pos + 1,this.highWaterMark):this.highWaterMark;
fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,bytes)=>{//bytes是實際讀到的字節(jié)數(shù)
if(err){
if(this.autoClose)
this.destroy();
return this.emit('error',err);
}
if(bytes){
let data = this.buffer.slice(0,bytes);
this.pos += bytes;
data = this.encoding?data.toString(this.encoding):data;
this.emit('data',data);
if(this.end && this.pos > this.end){
return this.endFn();
}else{
if(this.flowing)
this.read();
}
}else{
return this.endFn();
}
})
}
- pipe方法實現(xiàn) pipe方法就是邊讀取邊寫入,控制讀寫速度位衩,當可寫流的寫入返回false時裆蒸,暫停讀取,當可寫流觸發(fā)drain事件后糖驴,恢復讀取僚祷。
pipe(ws){
this.on('data',data =>{
let flag = ws.write(data);
if (!flag) {
this.pause();
}
})
ws.on('drain',()=>{
this.resume();
})
}
pause(){
this.flowing = false;
}
resume(){
this.flowing = true;
this.read();
}
- 暫停模式 暫停模式不同的是需要走緩存,并且監(jiān)聽的是readable事件(這里我只貼出具有差異性的代碼)
- 在構(gòu)造函數(shù)中需要加入this.buffers = [];(源碼中為了提高效率贮缕,使用的是鏈表結(jié)構(gòu)辙谜,這里我用數(shù)組代替),以及readable事件的監(jiān)聽感昼。
this.on('newListener', (type) => {
if (type == 'data') {
this.flowing = true;
this.read();
}
if (type == 'readable') {
this.read(0);
}
});
- 這里read方法需要傳入一個n装哆,表示需要讀取的字節(jié)數(shù)。如果判斷緩存的大小,即this.length,如果this.length == 0 || this.length < this.highWaterMark 蜕琴,執(zhí)行_read()方法,此時執(zhí)行的n為0
let _read = () => {
let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
if (err) {
return
}
let data;
if (bytesRead > 0) {
data = this.buffer.slice(0, bytesRead);
this.pos += bytesRead;
this.length += bytesRead;
if (this.end && this.pos > this.end) {
if (this.needReadable) {
this.emit('readable');
}
this.emit('end');
} else {
this.buffers.push(data);
if (this.needReadable) {
this.emit('readable');
this.needReadable = false;
}
}
} else {
if (this.needReadable) {
this.emit('readable');
}
return this.emit('end');
}
})
}
- 如果傳入的n值 0 < n < this.length萍桌,走以下邏輯,即從緩存區(qū)中讀取相應的字節(jié)數(shù)進行讀取
if (0 < n < this.length) {
ret = Buffer.alloc(n);
let b;
let index = 0;
while (null != (b = this.buffers.shift())) {
for (let i = 0; i < b.length; i++) {
ret[index++] = b[i];
if (index == ret.length) {
this.length -= n;
b = b.slice(i + 1);
this.buffers.unshift(b);
break;
}
}
}
if (this.encoding) ret = ret.toString(this.encoding);
}
Writable - 可讀的流介紹與實現(xiàn)
- 構(gòu)造函數(shù)跟上面差距不大,有一個this.buffer的緩存區(qū)凌简,并且最高水位線默認為16k
//構(gòu)造函數(shù)
constructor(path,options){
super(path,options);
this.path = path; //寫入路徑
this.flags = options.flags || 'w';//操作修飾符
this.mode = options.mode || 0o666;//權(quán)限
this.start = options.start || 0;//寫入的起始位置
this.pos = this.start;//文件的寫入索引
this.encoding = options.encoding || 'utf8';//編碼
this.autoClose = options.autoClose;//自動關(guān)閉
this.highWaterMark = options.highWaterMark || 16 * 1024; //默認最高水位線16k
this.buffers = [];//緩存區(qū) 源碼里面是鏈表
this.writing = false;//標識內(nèi)部正在寫入數(shù)據(jù)
this.length = 0;//標識緩存區(qū)字節(jié)的長度
this.open();//默認一創(chuàng)建就打開文件
}
- open即打開文件是一樣的上炎,這里不再描述。最重要的write方法号醉,該方法有一個返回值反症,標識緩存區(qū)的長度是否超過了最高水位線,特別注意的是如果超過了也會寫入進去的畔派,因為會放入到緩存區(qū)中铅碍,等到drain事件觸發(fā),再接著寫入线椰,具體寫入是執(zhí)行的_write方法胞谈,其中有一個this.writing 標識是否正在寫入,如果正在寫入憨愉,則放入緩存區(qū)中烦绳,在清空緩存區(qū)的時候依次取出寫入,即以下的clearBuffer方法配紫,此方法中當緩存區(qū)清空了以后觸發(fā)drain事件径密,這里需要特殊說明一下,如果緩存區(qū)從未滿過躺孝,是不會觸發(fā)這個事件的享扔。
write(chunk,encoding,callback){
chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk,this.encoding);//此方法只吸入buffer或者字符串
this.length += chunk.length;//當前緩存區(qū)的長度
if (this.writing) {//如果正在寫入數(shù)據(jù) 則需要把數(shù)據(jù)放入緩存區(qū)里面
this.buffers.push({
chunk,
encoding,
callback
})
} else { //如果當前空閑 直接調(diào)用底層寫入的方法進行寫入 并且在寫完以后 清空緩存區(qū)
this.writing = true;
this._write(chunk,encoding,()=>{
callback&&callback();
this.clearBuffer();
})
}
//write方法有一個返回值 表示當前緩存區(qū)是否超過了最高水位線 即是否能繼續(xù)寫入
return this.length < this.highWaterMark;
}
_write(chunk,encoding,callback){
if (typeof this.fd != 'number') { //因為是異步的 文件可能在這個時候并未打開
return this.once('open',()=>this._write(chunk, encoding, callback));
}
fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,bytesWritten)=>{
if (err) {
if (this.autoClose) {
this.destory();
}
return this.emit('error',err);
}
this.pos += bytesWritten;
this.length -= bytesWritten;
callback&&callback();
})
}
clearBuffer(){
let data = this.buffers.shift();
if(data){
this._write(data.chunk,data.encoding,()=>{
data.callback && data.callback();
this.clearBuffer();
})
}else{
this.writing = false;
//緩存區(qū)清空了 緩存區(qū)如果沒有滿過 是不會觸發(fā)這個事件的
this.emit('drain');
}
}