Node.js源碼解析-Writable實現
歡迎來我的博客閱讀:《Node.js源碼解析-Writable實現》
對于一個 stream 模塊來說，最基本的就是讀和寫。讀由 Readable 負責，寫則是由 Writable 負責。Readable 的實現上篇文章已經介紹過了，這篇介紹 Writable 的實現
Base
在開始之前，先看看 Writable 的構造函數
// lib/_stream_writable.js
/**
 * Writable stream constructor.
 *
 * @param {Object} [options] - Forwarded to WritableState. When present, its
 *   function-valued `write`, `writev`, `destroy` and `final` properties are
 *   installed on the instance as `_write`, `_writev`, `_destroy` and `_final`.
 */
function Writable(options) {
// Only run in the context of a Writable or a Duplex: when `this` is neither
// (e.g. the function was called without `new`), construct and return a new
// Writable instead.
if (!(realHasInstance.call(Writable, this)) &&
!(this instanceof Stream.Duplex)) {
return new Writable(options);
}
// The state set of this Writable.
this._writableState = new WritableState(options, this);
// legacy
this.writable = true;
if (options) {
// Each implementation hook is copied only if the option is a function.
if (typeof options.write === 'function')
this._write = options.write;
if (typeof options.writev === 'function')
this._writev = options.writev;
if (typeof options.destroy === 'function')
this._destroy = options.destroy;
if (typeof options.final === 'function')
this._final = options.final;
}
Stream.call(this);
}
/**
 * Holds the state and configuration of one Writable stream.
 *
 * @param {Object} [options] - Stream options; `objectMode`,
 *   `writableObjectMode`, `highWaterMark`, `decodeStrings` and
 *   `defaultEncoding` are read here.
 * @param {Object} stream - The owning stream; it is bound as the first
 *   argument of the `onwrite` callback.
 */
function WritableState(options, stream) {
options = options || {};
// Object-mode flag.
this.objectMode = !!options.objectMode;
// On a Duplex, `writableObjectMode` can also switch object mode on.
if (stream instanceof Stream.Duplex)
this.objectMode = this.objectMode || !!options.writableObjectMode;
// highWaterMark: the high-water mark at which write() starts returning false.
// An explicit 0 is kept; otherwise the default is 16 in object mode and
// 16 * 1024 (16k) in non-object mode.
var hwm = options.highWaterMark;
var defaultHwm = this.objectMode ? 16 : 16 * 1024;
this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
// Round down to an integer.
this.highWaterMark = Math.floor(this.highWaterMark);
this.finalCalled = false;
// Flag for the 'drain' event.
this.needDrain = false;
// State flag for when end() has just been called.
this.ending = false;
// State flag for when the end() call has completed.
this.ended = false;
this.finished = false;
this.destroyed = false;
var noDecode = options.decodeStrings === false;
// Whether strings should be decoded into buffers before being written.
this.decodeStrings = !noDecode;
this.defaultEncoding = options.defaultEncoding || 'utf8';
// Length of the buffered data: not the real buffer length, but the length
// of the data still waiting to be written.
this.length = 0;
// Writing flag.
this.writing = false;
// Cork counter.
this.corked = 0;
// Sync flag: doWrite() sets it to true around the _write()/_writev() call
// and onwrite() reads it to decide how to run the user callback.
this.sync = true;
// Set while the buffered chunks are being written out.
this.bufferProcessing = false;
// Callback handed to _write() and _writev().
this.onwrite = onwrite.bind(undefined, stream);
// The callback supplied through write(chunk, cb).
this.writecb = null;
// Length of the single chunk that has to be written.
this.writelen = 0;
// The Writable buffer pool is also a linked list; each node looks like:
// {
// chunk,
// encoding,
// isBuf,
// callback,
// next
// }
// Head node of the buffer pool.
this.bufferedRequest = null;
// Tail node of the buffer pool.
this.lastBufferedRequest = null;
// Size (node count) of the buffer pool.
this.bufferedRequestCount = 0;
// Number of callbacks still to be executed; must drop to 0 before the
// 'finish' event happens.
this.pendingcb = 0;
this.prefinished = false;
this.errorEmitted = false;
// Callback holder for corked writes; at most two such objects are meant to
// exist at any time.
var corkReq = { next: null, entry: null, finish: undefined };
corkReq.finish = onCorkedFinish.bind(undefined, corkReq, this);
this.corkedRequestsFree = corkReq;
}
Writable 與 Readable 類似，也使用一個對象 ( WritableState ) 對狀態和屬性進行集中管理
在 Writable 的構造函數參數中，options.write 函數是必須的，options.writev 則是用于批量寫入數據，可以選擇實現
Writable 的緩沖池也是由鏈表實現，但與 Readable 不同的是，Writable 的緩沖池實現更簡單，其節點結構如下：
{
chunk, // 數據塊，可能是 object / string / buffer
encoding, // 數據塊編碼
isBuf, // buffer 標識
callback, // write(chunk, cb) 的回調函數
next // 下一個寫入任務
}
除此之外，WritableState 還使用 bufferedRequest、lastBufferedRequest、bufferedRequestCount 屬性分別記錄緩沖池的頭、尾節點和節點數量
與 Duplex 的關系
在 Duplex 的源碼中有這么一段注釋
// a duplex stream is just a stream that is both readable and writable.
// Since JS doesn't have multiple prototypal inheritance, this class
// prototypally inherits from Readable, and then parasitically from
// Writable.
意思是: Duplex 流既是 Readable 流又是 Writable 流，但是由于 JS 中的繼承是基于原型的，沒有多繼承。所以 Duplex 是繼承自 Readable，寄生自 Writable
寄生自 Writable 體現在兩個方面:
- duplex instanceof Writable 為 true
- duplex 具有 Writable 的屬性和方法
// lib/_stream_writable.js
// Keeps a reference to the default instanceof behaviour in realHasInstance
// and, where Symbol.hasInstance is available, overrides it on Writable so
// that objects carrying a WritableState also count as Writable instances.
var realHasInstance;
if (typeof Symbol === 'function' && Symbol.hasInstance) {
realHasInstance = Function.prototype[Symbol.hasInstance];
Object.defineProperty(Writable, Symbol.hasInstance, {
value: function(object) {
// Objects that pass the default check are accepted straight away.
if (realHasInstance.call(this, object))
return true;
// Otherwise accept any object whose _writableState is a WritableState.
return object && object._writableState instanceof WritableState;
}
});
} else {
// No Symbol.hasInstance: fall back to the plain instanceof operator.
realHasInstance = function(object) {
return object instanceof this;
};
}
// Excerpt of the Writable constructor: only the context guard is shown, the
// rest of the body is elided.
function Writable(options) {
// When `this` is neither a Writable (by the default instanceof check) nor a
// Stream.Duplex, construct and return a new Writable instead.
if (!(realHasInstance.call(Writable, this)) &&
!(this instanceof Stream.Duplex)) {
return new Writable(options);
}
// ...
}
可以看出，通過修改 Writable 的 Symbol.hasInstance 使得 duplex/writable instanceof Writable 為 true。Writable 的構造函數也只能在 writable 或 duplex 的上下文中調用，使 duplex 具有 Writable 的屬性
// lib/_stream_duplex.js
// Duplex inherits from Readable through util.inherits().
util.inherits(Duplex, Readable);
var keys = Object.keys(Writable.prototype);
// Copy the enumerable own members of Writable.prototype onto
// Duplex.prototype, skipping any name for which Duplex.prototype already
// holds a truthy value.
for (var v = 0; v < keys.length; v++) {
var method = keys[v];
if (!Duplex.prototype[method])
Duplex.prototype[method] = Writable.prototype[method];
}
/**
 * Duplex stream constructor (excerpt; the rest of the body is elided).
 *
 * @param {Object} [options] - Passed unchanged to both Readable and Writable.
 */
function Duplex(options) {
// Called on something that is not a Duplex: construct one and return it.
if (!(this instanceof Duplex))
return new Duplex(options);
// Run both constructors on the same instance.
Readable.call(this, options);
Writable.call(this, options);
// ...
}
遍歷 Writable 原型上的方法，并添加到 Duplex 的原型上，使 duplex 具有 Writable 的方法
Write 過程
Writable 的寫過程比 Readable 的讀過程簡單得多，不用考慮異步操作，直接寫入即可
// lib/_stream_writable.js
/**
 * Writes a chunk to the stream (excerpt; part of the body is elided).
 *
 * @param {*} chunk - The data to write.
 * @param {string} [encoding] - Passed through to writeOrBuffer().
 * @param {Function} [cb] - Callback for this write.
 * @returns {boolean} The result of writeOrBuffer(), or false when the chunk
 *   was not handed to it.
 */
Writable.prototype.write = function(chunk, encoding, cb) {
var state = this._writableState;
var ret = false;
// Check whether the chunk is a buffer (never treated as one in object mode).
var isBuf = Stream._isUint8Array(chunk) && !state.objectMode;
// ...
if (state.ended)
// Writing after the Writable has ended is an error; per the original note,
// writeAfterEnd() (not shown here) triggers the 'error' event.
writeAfterEnd(this, cb);
else if (isBuf || validChunk(this, state, chunk, cb)) {
// Non-buffer chunks are validated by validChunk() (not shown here). Per the
// original note: the chunk must not be null, and undefined / non-string
// values are accepted only in objectMode.
state.pendingcb++;
ret = writeOrBuffer(this, state, isBuf, chunk, encoding, cb);
}
return ret;
};
write() 函數對傳入數據進行初步處理與校驗后交由 writeOrBuffer() 函數繼續處理
// lib/_stream_writable.js
/**
 * Either writes the chunk immediately or appends it to the buffer pool,
 * depending on the current state of the stream.
 *
 * @param {Object} stream - The Writable being written to.
 * @param {Object} state - The stream's WritableState.
 * @param {boolean} isBuf - Whether the chunk is already a buffer.
 * @param {*} chunk - The data to write.
 * @param {string} encoding - Encoding of the chunk.
 * @param {Function} cb - Callback for this write.
 * @returns {boolean} false once state.length has reached highWaterMark,
 *   true otherwise.
 */
function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
if (!isBuf) {
// A non-buffer chunk is either a string or an object. Per the original
// note, decodeChunk() (not shown here) returns an object unchanged because
// that means objectMode, and decodes a string into a buffer.
var newChunk = decodeChunk(state, chunk, encoding);
if (chunk !== newChunk) {
isBuf = true;
encoding = 'buffer';
chunk = newChunk;
}
}
// In object mode every chunk counts as length 1.
var len = state.objectMode ? 1 : chunk.length;
state.length += len;
var ret = state.length < state.highWaterMark;
// needDrain is only ever set here, never reset to false.
if (!ret) state.needDrain = true;
if (state.writing || state.corked) {
// A write is in progress or the stream is corked:
// append the chunk to the tail of the buffer-pool linked list.
var last = state.lastBufferedRequest;
state.lastBufferedRequest = {
chunk,
encoding,
isBuf,
callback: cb,
next: null
};
if (last) {
last.next = state.lastBufferedRequest;
} else {
// The list was empty, so the new node is also the head.
state.bufferedRequest = state.lastBufferedRequest;
}
state.bufferedRequestCount += 1;
} else {
// Write the data right away.
doWrite(stream, state, false, len, chunk, encoding, cb);
}
return ret;
}
writeOrBuffer() 函數對 chunk 進行處理后，根據 Writable 自身狀態決定應何時寫入數據
如果正在寫入或處于 cork 狀態，就將數據存儲到緩沖池鏈表尾部，等待以后處理。否則，直接調用 doWrite() 寫入數據
當緩存達到 highWaterMark 時，writeOrBuffer() 返回 false，表示不應該再寫入數據
// lib/_stream_writable.js
/**
 * Hands one write over to the stream implementation.
 *
 * @param {Object} stream - Stream providing `_write()` / `_writev()`.
 * @param {Object} state - The stream's WritableState; updated in place.
 * @param {boolean} writev - Truthy to call `_writev()`, falsy to call `_write()`.
 * @param {number} len - Length of the data being written.
 * @param {*} chunk - A single chunk, or the array of buffered entries for `_writev()`.
 * @param {string} encoding - Encoding passed to `_write()`.
 * @param {Function} cb - Callback of this write, stored on the state.
 */
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  // Record what is in flight before invoking the implementation.
  state.writelen = len;
  state.writecb = cb;
  state.writing = true;

  // `sync` is true only while the implementation call is on the stack.
  state.sync = true;
  if (!writev) {
    // Single-chunk write.
    stream._write(chunk, encoding, state.onwrite);
  } else {
    // Batched write of several chunks at once.
    stream._writev(chunk, state.onwrite);
  }
  state.sync = false;
}
在 doWrite() 函數中，根據 writev 參數決定該執行 _write() 還是 _writev()
_write() 函數用于寫入單個數據塊，_writev() 函數用于寫入多個數據塊
_write() / _writev() 中的回調函數不是傳入的 cb 而是 state.onwrite，其定義如下:
this.onwrite = onwrite.bind(undefined, stream);
可知，寫入完成后，執行 onwrite(stream, err)
// lib/_stream_writable.js
/**
 * Completion callback given to _write() / _writev() (bound to the stream in
 * WritableState).
 *
 * @param {Object} stream - The Writable whose write has completed.
 * @param {*} [er] - Error reported by the implementation, if any.
 */
function onwrite(stream, er) {
var state = stream._writableState;
// Capture these before the state update below runs.
var sync = state.sync;
var cb = state.writecb;
// Update the state (onwriteStateUpdate() is not shown in this excerpt).
onwriteStateUpdate(state);
if (er) // an error occurred
onwriteError(stream, state, sync, er, cb);
else {
var finished = needFinish(state);
if (!finished &&
!state.corked &&
!state.bufferProcessing &&
state.bufferedRequest) {
// Flush the buffer pool: clearBuffer() uses a single _writev() call for
// several chunks when _writev() exists, and otherwise loops over _write()
// for one chunk at a time.
clearBuffer(stream, state);
}
if (sync) { // the write completed synchronously, so the callback is deferred via process.nextTick()
process.nextTick(afterWrite, stream, state, finished, cb);
} else { // the write completed asynchronously, so the callback can run directly
afterWrite(stream, state, finished, cb);
}
}
}
當寫入過程中有錯誤發生時，會執行 onwriteError()，繼而調用 cb(err) 并觸發 error 事件
如果寫入過程正確執行，則先查看還有多少數據塊正在等待寫入，有多個，就執行 clearBuffer() 清空緩存，然后執行 afterWrite()
// lib/_stream_writable.js
/**
 * Runs after a successful write: handles drain, invokes the write callback
 * and checks whether the stream can finish.
 *
 * @param {Object} stream - The Writable that was written to.
 * @param {Object} state - The stream's WritableState.
 * @param {boolean} finished - Result of needFinish(state) taken in onwrite().
 * @param {Function} cb - The callback of the completed write.
 */
function afterWrite(stream, state, finished, cb) {
// Drain handling is skipped when the stream is about to finish
// (onwriteDrain() is not shown in this excerpt).
if (!finished)
onwriteDrain(stream, state);
state.pendingcb--;
// Run the write callback.
cb();
// Check whether the Writable should be finished.
finishMaybe(stream, state);
}
Cork
當有大量小數據塊需要寫入時，如果一個個寫入，會導致效率低下。Writable 提供了 cork() 和 uncork() 兩個方法用于大量小數據塊寫入的情況
先將寫操作柱塞住，等到緩存達到一定量后，再解除柱塞，然后一次性將存儲的數據塊寫入，這個操作需要 _writev() 支持
// lib/_stream_writable.js
/**
 * Corks the stream by raising its cork counter by one.
 */
Writable.prototype.cork = function() {
  // `corked` is a counter rather than a flag, so nested cork() calls add up.
  this._writableState.corked++;
};
由于 cork() 函數可能會被多次調用，所以 state.corked 需要記錄 cork() 調用的次數，是個 number
// lib/_stream_writable.js
/**
 * Removes one level of corking and, once nothing blocks it any more,
 * flushes the buffered chunks.
 */
Writable.prototype.uncork = function() {
  var state = this._writableState;

  // Not corked: nothing to undo.
  if (!state.corked) {
    return;
  }

  // Lower the cork counter by one.
  state.corked--;

  // Flush only when idle, fully uncorked, not finished, not already
  // flushing, and there is something buffered.
  var canFlush = !state.writing &&
                 !state.corked &&
                 !state.finished &&
                 !state.bufferProcessing &&
                 state.bufferedRequest;
  if (canFlush) {
    // Empty the buffer pool.
    clearBuffer(this, state);
  }
};
當 state.corked === 0 時，才能表示柱塞已經全部解除完畢，可以執行 clearBuffer() 來處理緩存中的數據
// lib/_stream_writable.js
/**
 * Writes out the chunks queued in the buffer pool, using one _writev() call
 * when possible and individual _write() calls otherwise.
 *
 * @param {Object} stream - The Writable whose buffer pool is flushed.
 * @param {Object} state - The stream's WritableState.
 */
function clearBuffer(stream, state) {
// Flag that the buffer is being cleared.
state.bufferProcessing = true;
// Head node of the buffer pool.
var entry = state.bufferedRequest;
if (stream._writev && entry && entry.next) {
// _writev() exists and more than one chunk is queued, so write them all
// through a single _writev() call.
var l = state.bufferedRequestCount;
var buffer = new Array(l);
// The holder keeps the head of the list so that its finish callback can
// walk the entries later.
var holder = state.corkedRequestsFree;
holder.entry = entry;
var count = 0;
var allBuffers = true;
// Collect every queued entry into the array.
while (entry) {
buffer[count] = entry;
if (!entry.isBuf)
allBuffers = false;
entry = entry.next;
count += 1;
}
buffer.allBuffers = allBuffers;
// Write the data; holder.finish is stored as the callback of this write.
doWrite(stream, state, true, state.length, buffer, '', holder.finish);
state.pendingcb++;
state.lastBufferedRequest = null;
// Keep at most two holder instances: reuse the spare one if there is one,
// otherwise allocate a fresh holder.
if (holder.next) {
state.corkedRequestsFree = holder.next;
holder.next = null;
} else {
var corkReq = { next: null, entry: null, finish: undefined };
corkReq.finish = onCorkedFinish.bind(undefined, corkReq, state);
state.corkedRequestsFree = corkReq;
}
} else {
// Write the chunks one by one.
while (entry) {
var chunk = entry.chunk;
var encoding = entry.encoding;
var cb = entry.callback;
var len = state.objectMode ? 1 : chunk.length;
doWrite(stream, state, false, len, chunk, encoding, cb);
entry = entry.next;
// If the write did not complete synchronously (state.writing is still
// set), this write has to finish before the next one is started.
if (state.writing) {
break;
}
}
if (entry === null)
state.lastBufferedRequest = null;
}
state.bufferedRequestCount = 0;
// Fix up the head node of the buffer pool.
state.bufferedRequest = entry;
state.bufferProcessing = false;
}
執行 clearBuffer() 時，根據是否有 _writev() 函數和待寫入數據塊數量，決定使用 _writev() 還是 _write() 寫入數據
- _writev(): 會先將所有數據塊包裝成數組，然后寫入。寫入完成后，回調 corkReq.finish
- _write(): 只需要將數據塊一個個寫入即可
在使用 _writev() 的情況下，寫入完成后回調 corkReq.finish，也就是 onCorkedFinish() 函數
// lib/_stream_writable.js
/**
 * Completion callback of a batched write: runs the callback of every entry
 * held by `corkReq`, in list order, then hands the holder back to the state.
 *
 * @param {Object} corkReq - Holder whose `entry` is the head of the written chunk list.
 * @param {Object} state - The stream's WritableState.
 * @param {*} [err] - Error forwarded to every entry callback.
 */
function onCorkedFinish(corkReq, state, err) {
  // Detach the list from the holder before any callback runs.
  let node = corkReq.entry;
  corkReq.entry = null;

  // Invoke the callbacks one after another, following the list order.
  for (; node; node = node.next) {
    const done = node.callback;
    state.pendingcb--;
    done(err);
  }

  // Keep at most two holders: chain this one behind the free holder, or make
  // it the free holder when there is none.
  const free = state.corkedRequestsFree;
  if (free) {
    free.next = corkReq;
  } else {
    state.corkedRequestsFree = corkReq;
  }
}
根據緩沖池鏈表的順序，依次執行寫操作的回調函數
End
每次調用 stream.write(chunk, cb)，Writable 都會根據自身狀態，決定是將 chunk 加到緩沖池，還是直接寫入
當需要寫入大量小數據塊時，推薦先使用 cork() 將寫操作柱塞住，待調用完畢后，再調用 uncork() 解除柱塞，然后一次性寫入所有緩存數據
參考: