Node.js源碼解析-pipe實現(xiàn)
歡迎來我的博客閱讀:《Node.js源碼解析-pipe實現(xiàn)》
從前面兩篇文章,我們了解到拱燃。想要把 Readable 的數(shù)據(jù)寫到 Writable了讨,就必須先手動的將數(shù)據(jù)讀入內(nèi)存献汗,然后寫入 Writable暇赤。換句話說耙蔑,每次傳遞數(shù)據(jù)時,都需要寫如下的模板代碼
readable.on('readable', (err) => {
if(err) throw err
writable.write(readable.read())
})
為了方便使用疟赊,Node.js 提供了 pipe()
方法郊供,讓我們可以優(yōu)雅的傳遞數(shù)據(jù)
readable.pipe(writable)
現(xiàn)在,就讓我們來看看它是如何實現(xiàn)的吧
pipe
首先需要先調(diào)用 Readable 的 pipe()
方法
// lib/_stream_readable.js
Readable.prototype.pipe = function(dest, pipeOpts) {
var src = this;
var state = this._readableState;
// 記錄 Writable
switch (state.pipesCount) {
case 0:
state.pipes = dest;
break;
case 1:
state.pipes = [state.pipes, dest];
break;
default:
state.pipes.push(dest);
break;
}
state.pipesCount += 1;
// ...
src.once('end', endFn);
dest.on('unpipe', onunpipe);
// ...
dest.on('drain', ondrain);
// ...
src.on('data', ondata);
// ...
// 保證 error 事件觸發(fā)時近哟,onerror 首先被執(zhí)行
prependListener(dest, 'error', onerror);
// ...
dest.once('close', onclose);
// ...
dest.once('finish', onfinish);
// ...
// 觸發(fā) Writable 的 pipe 事件
dest.emit('pipe', src);
// 將 Readable 改為 flow 模式
if (!state.flowing) {
debug('pipe resume');
src.resume();
}
return dest;
};
執(zhí)行 pipe()
函數(shù)時颂碘,首先將 Writable 記錄到 state.pipes
中,然后綁定相關事件椅挣,最后如果 Readable 不是 flow 模式,就調(diào)用 resume()
將 Readable 改為 flow 模式
傳遞數(shù)據(jù)
Readable 從數(shù)據(jù)源獲取到數(shù)據(jù)后塔拳,觸發(fā) data 事件鼠证,執(zhí)行 ondata()
ondata()
相關代碼:
// lib/_stream_readable.js
// 防止在 dest.write(chunk) 內(nèi)調(diào)用 src.push(chunk) 造成 awaitDrain 重復增加,awaitDrain 不能清零靠抑,Readable 卡住的情況
// 詳情見 https://github.com/nodejs/node/issues/7278
var increasedAwaitDrain = false;
function ondata(chunk) {
debug('ondata');
increasedAwaitDrain = false;
var ret = dest.write(chunk);
if (false === ret && !increasedAwaitDrain) {
// 防止在 dest.write() 內(nèi)調(diào)用 src.unpipe(dest)量九,導致 awaitDrain 不能清零,Readable 卡住的情況
if (((state.pipesCount === 1 && state.pipes === dest) ||
(state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)
) &&
!cleanedUp) {
debug('false write response, pause', src._readableState.awaitDrain);
src._readableState.awaitDrain++;
increasedAwaitDrain = true;
}
// 進入 pause 模式
src.pause();
}
}
在 ondata(chunk)
函數(shù)內(nèi)颂碧,通過 dest.write(chunk)
將數(shù)據(jù)寫入 Writable
此時荠列,在 _write()
內(nèi)部可能會調(diào)用 src.push(chunk)
或使其 unpipe,這會導致 awaitDrain 多次增加载城,不能清零肌似,Readable 卡住
當不能再向 Writable 寫入數(shù)據(jù)時,Readable 會進入 pause 模式诉瓦,直到所有的 drain 事件觸發(fā)
觸發(fā) drain 事件川队,執(zhí)行 ondrain()
// lib/_stream_readable.js
var ondrain = pipeOnDrain(src);
function pipeOnDrain(src) {
return function() {
var state = src._readableState;
debug('pipeOnDrain', state.awaitDrain);
if (state.awaitDrain)
state.awaitDrain--;
// awaitDrain === 0力细,且有 data 監(jiān)聽器
if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
state.flowing = true;
flow(src);
}
};
}
每個 drain 事件觸發(fā)時,都會減少 awaitDrain固额,直到 awaitDrain 為 0眠蚂。此時,調(diào)用 flow(src)
斗躏,使 Readable 進入 flow 模式
到這里逝慧,整個數(shù)據(jù)傳遞循環(huán)已經(jīng)建立,數(shù)據(jù)會順著循環(huán)源源不斷的流入 Writable啄糙,直到所有數(shù)據(jù)寫入完成
unpipe
不管寫入過程中是否出現(xiàn)錯誤笛臣,最后都會執(zhí)行 unpipe()
// lib/_stream_readable.js
// ...
function unpipe() {
debug('unpipe');
src.unpipe(dest);
}
// ...
Readable.prototype.unpipe = function(dest) {
var state = this._readableState;
var unpipeInfo = { hasUnpiped: false };
// 啥也沒有
if (state.pipesCount === 0)
return this;
// 只有一個
if (state.pipesCount === 1) {
if (dest && dest !== state.pipes)
return this;
// 沒有指定就 unpipe 所有
if (!dest)
dest = state.pipes;
state.pipes = null;
state.pipesCount = 0;
state.flowing = false;
if (dest)
dest.emit('unpipe', this, unpipeInfo);
return this;
}
// 沒有指定就 unpipe 所有
if (!dest) {
var dests = state.pipes;
var len = state.pipesCount;
state.pipes = null;
state.pipesCount = 0;
state.flowing = false;
for (var i = 0; i < len; i++)
dests[i].emit('unpipe', this, unpipeInfo);
return this;
}
// 找到指定 Writable,并 unpipe
var index = state.pipes.indexOf(dest);
if (index === -1)
return this;
state.pipes.splice(index, 1);
state.pipesCount -= 1;
if (state.pipesCount === 1)
state.pipes = state.pipes[0];
dest.emit('unpipe', this, unpipeInfo);
return this;
};
Readable.prototype.unpipe()
函數(shù)會根據(jù) state.pipes
屬性和 dest 參數(shù)迈套,選擇執(zhí)行策略捐祠。最后會觸發(fā) dest 的 unpipe 事件
unpipe 事件觸發(fā)后,調(diào)用 onunpipe()
桑李,清理相關數(shù)據(jù)
// lib/_stream_readable.js
function onunpipe(readable, unpipeInfo) {
debug('onunpipe');
if (readable === src) {
if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
unpipeInfo.hasUnpiped = true;
// 清理相關數(shù)據(jù)
cleanup();
}
}
}
End
在整個 pipe 的過程中踱蛀,Readable 是主動方 ( 負責整個 pipe 過程:包括數(shù)據(jù)傳遞、unpipe 與異常處理 )贵白,Writable 是被動方 ( 只需要觸發(fā) drain 事件 )
總結一下 pipe 的過程:
- 首先執(zhí)行
readbable.pipe(writable)
率拒,將 readable 與 writable 對接上 - 當 readable 中有數(shù)據(jù)時,
readable.emit('data')
禁荒,將數(shù)據(jù)寫入 writable - 如果
writable.write(chunk)
返回 false猬膨,則進入 pause 模式,等待 drain 事件觸發(fā) - drain 事件全部觸發(fā)后呛伴,再次進入 flow 模式勃痴,寫入數(shù)據(jù)
- 不管數(shù)據(jù)寫入完成或發(fā)生中斷,最后都會調(diào)用
unpipe()
-
unpipe()
調(diào)用Readable.prototype.unpipe()
热康,觸發(fā) dest 的 unpipe 事件沛申,清理相關數(shù)據(jù)
參考: