Node.js源碼解析-Readable實現
歡迎來我的博客閱讀：《Node.js源碼解析-Readable實現》
想要了解 Readable 的實現，最好的方法是順著 Readable 的 Birth-Death（從創建到結束的生命週期）走一遍
Base
在了解 Readable 的 Birth-Death 之前，先看看 Readable 的構造函數
// lib/_stream_readable.js
function Readable(options) {
  // Allow `Readable(opts)` to be called without `new`.
  if (!(this instanceof Readable)) {
    return new Readable(options);
  }

  // All mutable stream state lives in a separate ReadableState object.
  this._readableState = new ReadableState(options, this);

  // Legacy flag.
  this.readable = true;

  // Simplified construction: new Readable({ read, destroy }).
  // Readable.prototype._read only emits an 'error' event, so the data source
  // must come from options.read or from a subclass overriding _read().
  if (options && typeof options.read === 'function') {
    this._read = options.read;
  }
  if (options && typeof options.destroy === 'function') {
    this._destroy = options.destroy;
  }

  Stream.call(this);
}
// Per-stream bookkeeping for a Readable, stored as readable._readableState.
// `options` is the (optional) object given to the Readable constructor;
// `stream` is the owning stream, used to detect Duplex streams.
function ReadableState(options, stream) {
  const opts = options || {};

  // Object mode: each chunk counts as one unit rather than its byte length.
  // A Duplex stream can enable it for its readable side alone through
  // readableObjectMode.
  this.objectMode = !!opts.objectMode;
  if (stream instanceof Stream.Duplex && !this.objectMode) {
    this.objectMode = !!opts.readableObjectMode;
  }

  // highWaterMark: the buffer level read() tries to keep filled via _read().
  // Defaults to 16 KiB in byte mode and 16 objects in object mode; an
  // explicit 0 is kept, and fractional values are floored.
  const requestedHwm = opts.highWaterMark;
  const fallbackHwm = this.objectMode ? 16 : 16 * 1024;
  const hasRequestedHwm = requestedHwm || requestedHwm === 0;
  this.highWaterMark = Math.floor(hasRequestedHwm ? requestedHwm : fallbackHwm);

  // Buffered chunks, kept in a linked list (BufferList) because adding and
  // removing at either end is cheaper than with an array.
  this.buffer = new BufferList();
  // Amount buffered: bytes, or number of objects in object mode.
  this.length = 0;

  // pipe() destinations.
  this.pipes = null;
  this.pipesCount = 0;

  // Flowing-mode flag; starts out null.
  this.flowing = null;

  // Lifecycle flags:
  //   ended      - the source signalled EOF via push(null); data may still be
  //                buffered, but further push() calls are rejected.
  //   endEmitted - 'end' has been emitted; unshift() is rejected as well.
  //   reading    - a _read() call is in progress.
  this.ended = false;
  this.endEmitted = false;
  this.reading = false;
  // Toggled by read() around its synchronous _read() call; starts out true.
  this.sync = true;

  // 'readable' event bookkeeping: needReadable asks for the event to be
  // emitted, emittedReadable records that it was.
  this.needReadable = false;
  this.emittedReadable = false;
  this.readableListening = false;
  this.resumeScheduled = false;
  this.destroyed = false;
  this.defaultEncoding = opts.defaultEncoding || 'utf8';
  this.awaitDrain = 0;
  // Set while maybeReadMore() has a refill tick pending.
  this.readingMore = false;

  // Optional decoding of chunks to strings, with StringDecoder loaded lazily.
  this.decoder = null;
  this.encoding = null;
  if (opts.encoding) {
    if (!StringDecoder)
      StringDecoder = require('string_decoder').StringDecoder;
    this.decoder = new StringDecoder(opts.encoding);
    this.encoding = opts.encoding;
  }
}
在 Readable 的構造函數中，可通過 options 傳入參數，其中 options.read 函數是必需的（除非子類自行重寫 _read()：默認的 Readable.prototype._read() 只會觸發 error 事件）。
readable._readableState 中保存了 Readable 的各種狀態與屬性。
Birth-Death
在這里將 Readable 的 Birth-Death 分為五個狀態（表中為 this._readableState 的屬性）：

| 狀態 | length | reading | ended | endEmitted |
| --- | --- | --- | --- | --- |
| start | 0 | false | false | false |
| reading | < highWaterMark | true | false | false |
| read | >= highWaterMark | false | false | false |
| ended | >= 0 | false | true | false |
| endEmitted | 0 | false | true | true |

- start：初始狀態，Readable 剛剛被創建，還未調用 readable.read()
- reading：代表正在從數據源中讀取數據，此時緩存大小 this._readableState.length 小於 highWaterMark，應讀取數據使緩存達到 highWaterMark
- read：Readable 從數據源讀取數據後的相對穩定狀態
- ended：數據源已經全部讀取完成（push(null)），此時調用 push(chunk) 會報 stream.push() after EOF 錯誤
- endEmitted：end 事件觸發完成，此時調用 unshift(chunk) 會報 stream.unshift() after end event 錯誤
它們之間的關係如下：
```
       1           4         5
start ==> reading ==> ended ==> endEmitted
          ||  /\
        2 \/  || 3
           read
```
1. start ==> reading
start 狀態變為 reading 狀態，發生在第一次調用 read() 時
// lib/_stream_readable.js
// Pull up to `n` bytes (or one object in object mode) out of the internal
// buffer. Along the way it may call _read() to refill the buffer toward
// highWaterMark, emit 'readable', or end the stream.
// NOTE: abridged excerpt; the `// ...` below marks lines the article omitted.
Readable.prototype.read = function(n) {
  debug('read', n);
  // read() / read(undefined) become NaN here, which howMuchToRead() treats
  // as "whatever is available".
  n = parseInt(n, 10);
  var state = this._readableState;
  var nOrig = n;
  if (n !== 0)
    state.emittedReadable = false;
  // read(0) with a pending 'readable' request: if the buffer is at/above
  // highWaterMark (or the source has ended), emit 'readable', or end the
  // stream when nothing is left, without consuming any data.
  if (n === 0 &&
      state.needReadable &&
      (state.length >= state.highWaterMark || state.ended)) {
    debug('read: emitReadable', state.length, state.ended);
    if (state.length === 0 && state.ended)
      endReadable(this);
    else
      emitReadable(this);
    return null;
  }
  // Work out how much can be returned now (see howMuchToRead):
  //   n is NaN            -> everything buffered (one chunk when flowing)
  //   n <= state.length   -> n
  //   n > state.length    -> 0 and request more data from the source
  //                          (or whatever is left once the source ended)
  //   n > highWaterMark   -> highWaterMark is raised first via
  //                          computeNewHighWaterMark() (not shown here)
  n = howMuchToRead(n, state);
  // Nothing to return and the source is finished: end the stream once the
  // buffer has been drained.
  if (n === 0 && state.ended) {
    if (state.length === 0)
      endReadable(this);
    return null;
  }
  // Decide whether the data source should be asked for more.
  // BEGIN
  var doRead = state.needReadable;
  debug('need readable', doRead);
  // Refill when the buffer is empty or this read would leave it below
  // highWaterMark.
  if (state.length === 0 || state.length - n < state.highWaterMark) {
    doRead = true;
    debug('length less than watermark', doRead);
  }
  // END
  if (state.ended || state.reading) {
    // An ended source has nothing more to give, and a _read() already in
    // progress must not be started twice.
    doRead = false;
    debug('reading or ended', doRead);
  } else if (doRead) {
    debug('do read');
    state.reading = true;
    // `sync` stays true only while _read() runs on this call stack;
    // addChunk() checks it before emitting 'data' directly.
    state.sync = true;
    if (state.length === 0)
      state.needReadable = true;
    // Ask the data source for more; _read() may push() synchronously or
    // asynchronously.
    this._read(state.highWaterMark);
    state.sync = false;
    // If _read() pushed synchronously, readableAddChunk() has already
    // cleared `reading` and the buffer has grown, so recompute how much of
    // the original request can be served.
    if (!state.reading)
      n = howMuchToRead(nOrig, state);
  }
  var ret;
  if (n > 0) ret = fromList(n, state); // take n units out of the buffer
  else ret = null;
  if (ret === null) {
    state.needReadable = true;
    n = 0;
  } else {
    state.length -= n;
  }
  // ... (code omitted in this excerpt)
  if (ret !== null)
    this.emit('data', ret);
  return ret;
};
// Default data-source hook that must be replaced: subclasses override it, or
// pass options.read to the constructor. This base version only reports the
// missing implementation as an 'error' event; the requested size `n` is
// ignored.
Readable.prototype._read = function(n) {
  const notImplemented = new Error('_read() is not implemented');
  this.emit('error', notImplemented);
};
// Decide how much a read(n) call may return right now:
//   n <= 0, or buffer empty and source ended  -> 0
//   object mode                               -> 1 (one object per read)
//   n is NaN (read() with no size)            -> the first buffered chunk when
//                                                flowing, else the whole buffer
//   n <= buffered length                      -> n
//   otherwise, source not yet ended           -> 0, and sets needReadable
//   otherwise, source ended                   -> whatever is left
// A request above highWaterMark first raises highWaterMark via
// computeNewHighWaterMark() (defined elsewhere).
function howMuchToRead(n, state) {
  if (n <= 0 || (state.length === 0 && state.ended)) {
    return 0;
  }
  if (state.objectMode) {
    return 1;
  }
  if (Number.isNaN(n)) {
    const flowingWithData = state.flowing && state.length;
    return flowingWithData ? state.buffer.head.data.length : state.length;
  }
  if (n > state.highWaterMark) {
    state.highWaterMark = computeNewHighWaterMark(n);
  }
  if (n <= state.length) {
    return n;
  }
  if (state.ended) {
    return state.length;
  }
  // Not enough buffered yet: request a 'readable' once more data arrives.
  state.needReadable = true;
  return 0;
}
調用 read() 後，如果緩沖池中數據不夠，或讀取後緩存低於 highWaterMark，則調用 _read() 來讀取更多的數據（Readable 已處於 ended 或 reading 狀態時除外），否則直接返回讀取的數據
當期望數據量大於 highWaterMark 時，會重新計算 highWaterMark，大小是大於等於期望數據量的最小 2^x
2. reading ==> read
調用 _read() 後，_read() 會同步或異步地調用 push(chunk)，將數據放入緩沖池，並使 Readable 從 reading 狀態變為 read 狀態
// lib/_stream_readable.js
// Public API: hand `chunk` to the stream. In byte mode a string chunk is
// converted to a Buffer whenever its encoding (defaulting to defaultEncoding)
// differs from the stream's decoding encoding. Strings and object-mode chunks
// skip the chunk type check. Returns readableAddChunk()'s result.
Readable.prototype.push = function(chunk, encoding) {
  const state = this._readableState;
  let data = chunk;
  let enc = encoding;
  let skipChunkCheck;

  if (state.objectMode) {
    // Any value is an acceptable chunk in object mode.
    skipChunkCheck = true;
  } else if (typeof data === 'string') {
    enc = enc || state.defaultEncoding;
    // Convert to bytes unless the string already uses the stream's encoding.
    // Clearing the encoding lets readableAddChunk() run the bytes through the
    // decoder (it decodes only when no encoding is given).
    if (enc !== state.encoding) {
      data = Buffer.from(data, enc);
      enc = '';
    }
    skipChunkCheck = true;
  }

  return readableAddChunk(this, data, enc, false, skipChunkCheck);
};
// Shared implementation behind push() (addToFront = false) and unshift()
// (addToFront = true).
//   - chunk === null is the EOF signal and is routed to onEofChunk().
//   - Otherwise the chunk is validated (unless skipChunkCheck is set),
//     non-Buffer Uint8Arrays are converted to Buffers in byte mode, the chunk
//     is optionally decoded, and it is handed to addChunk().
//   - push() after EOF and unshift() after 'end' are reported as 'error'
//     events rather than thrown.
// Returns needMoreData(state): whether the caller may keep pushing.
function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
  var state = stream._readableState;
  if (chunk === null) { // EOF signal
    state.reading = false;
    onEofChunk(stream, state);
  } else {
    var er;
    if (!skipChunkCheck) // validate the chunk's type
      er = chunkInvalid(state, chunk);
    if (er) {
      stream.emit('error', er);
    } else if (state.objectMode || chunk && chunk.length > 0) {
      // Test objectMode BEFORE calling Object.getPrototypeOf(): in object
      // mode any value, including `undefined`, is a legal chunk, and
      // Object.getPrototypeOf(undefined) throws a TypeError.
      if (typeof chunk !== 'string' &&
          !state.objectMode &&
          Object.getPrototypeOf(chunk) !== Buffer.prototype) {
        chunk = Stream._uint8ArrayToBuffer(chunk);
      }
      if (addToFront) { // unshift() path
        if (state.endEmitted)
          stream.emit('error', new Error('stream.unshift() after end event'));
        else
          addChunk(stream, state, chunk, true);
      } else if (state.ended) {
        stream.emit('error', new Error('stream.push() after EOF'));
      } else {
        state.reading = false;
        if (state.decoder && !encoding) {
          chunk = state.decoder.write(chunk);
          if (state.objectMode || chunk.length !== 0)
            addChunk(stream, state, chunk, false);
          else
            // decoder.write() produced nothing (presumably it is holding an
            // incomplete multi-byte character), so addChunk(), which would
            // normally schedule the refill, is skipped; schedule it here.
            maybeReadMore(stream, state);
        } else {
          addChunk(stream, state, chunk, false);
        }
      }
    } else if (!addToFront) {
      // Empty push(): nothing to buffer, but clear `reading` so read() may
      // call _read() again.
      state.reading = false;
    }
  }
  // needMoreData(state), as summarized in this article:
  //   !state.ended &&                          source not finished, and
  //   (state.needReadable ||                   a 'readable' is wanted, or
  //    state.length < state.highWaterMark ||   buffer below the mark, or
  //    state.length === 0)                     buffer empty
  return needMoreData(state);
}
// Append a validated chunk (or prepend it, for unshift()).
// Fast path: a flowing stream with an empty buffer, outside a synchronous
// _read() call, passes the chunk straight to 'data' listeners and calls
// read(0) to keep the data moving. Otherwise the chunk is buffered and
// 'readable' is emitted if one was requested. Either way, maybeReadMore()
// then tries to fill the buffer up to highWaterMark.
function addChunk(stream, state, chunk, addToFront) {
  const passThrough = state.flowing && state.length === 0 && !state.sync;
  if (!passThrough) {
    state.length += state.objectMode ? 1 : chunk.length;
    const insert = addToFront ? 'unshift' : 'push';
    state.buffer[insert](chunk);
    if (state.needReadable) emitReadable(stream);
  } else {
    stream.emit('data', chunk);
    stream.read(0);
  }
  maybeReadMore(stream, state);
}
調用 push(chunk) 時，會將 chunk 放入緩沖池內（flowing 模式下，若緩存為空且不在同步的 _read() 調用中，則直接觸發 data 事件），並改變 Readable 的狀態。如果 Readable 處於 ended 狀態，會報 stream.push() after EOF 錯誤
push(chunk) 的返回值即 needMoreData(state)：如果數據源尚未結束，且緩存小於 highWaterMark（或需要觸發 readable 事件、或緩存為空），則返回 true，意味著可以繼續 push 更多的數據
3. read ==> reading
從 read 到 reading 狀態，意味著需要讀取更多的數據，即緩存小於 highWaterMark
緩存與 highWaterMark 的關係可以根據 push(chunk) 的返回值來判斷，但是需要使用者手動處理。因此，為了方便使用，addChunk() 函數會自動調用 maybeReadMore() 來異步讀取數據。這樣，即使單次 _read() 無法達到 highWaterMark，也可以通過多次異步讀取，使數據流動起來
// lib/_stream_readable.js
// Schedule a single deferred refill pass (maybeReadMore_) on the next tick.
// While one is already pending, further calls are no-ops; the pending pass
// reads the state as it is when it runs.
function maybeReadMore(stream, state) {
  if (state.readingMore) return;
  state.readingMore = true;
  process.nextTick(maybeReadMore_, stream, state);
}
// Deferred refill pass: keep calling read(0) while the stream is idle (not
// reading, not flowing, not ended) and below highWaterMark. Stop as soon as
// a read(0) adds nothing to the buffer, then clear the pending flag.
function maybeReadMore_(stream, state) {
  let previousLength = state.length;
  while (!state.reading && !state.flowing && !state.ended &&
         state.length < state.highWaterMark) {
    debug('maybeReadMore read 0');
    stream.read(0);
    if (state.length === previousLength) break; // no progress, give up
    previousLength = state.length;
  }
  state.readingMore = false;
}
在 maybeReadMore() 函數內，通過 process.nextTick() 異步地反覆調用 read(0) 讀取數據，直到緩存達到 highWaterMark（或某次讀取沒有拿到新數據）
那麼為什麼是異步讀取數據呢？
因為在 _read() 函數內，可能不止一次調用 push(chunk)。
如果是同步，push(chunk) 後，因為沒有達到 highWaterMark，會繼續調用 read(0)，發生第二次 _read()。第二次 _read() 也可能導致第三次 _read()，直到 highWaterMark。
待整個調用完畢後，緩沖池內會有 highWaterMark * n（n 為 _read() 內調用 push(chunk) 的次數）的數據，而這與 highWaterMark 的設計是不符的
如果是異步，則可以等 _read() 執行完畢後，在 process.nextTick() 內再次調用 _read() 讀取數據，不會發生上面的問題
4. reading ==> ended
當數據源讀取完畢時，需要調用 push(null) 來通知 Readable 數據源已經讀取完畢。push(null) 會經由 readableAddChunk() 調用 onEofChunk()
// lib/_stream_readable.js
// Handle the EOF signal (push(null)). This runs only once: it flushes any
// remaining output of the string decoder into the buffer, marks the stream
// ended, and emits 'readable' so consumers can drain what is left.
function onEofChunk(stream, state) {
  if (state.ended) return;
  const tail = state.decoder ? state.decoder.end() : null;
  if (tail && tail.length) {
    state.buffer.push(tail);
    state.length += state.objectMode ? 1 : tail.length;
  }
  state.ended = true;
  emitReadable(stream);
}
onEofChunk() 函數將 Readable 標記為 ended 狀態後，禁止再向緩沖池內 push 數據（若設置了 encoding，還會先把解碼器中剩餘的數據放入緩沖池），並觸發 readable 事件。此時，緩沖池內可能還有數據
5. ended ==> endEmitted
ended 狀態的 Readable 內可能還有數據。因此，當數據全部被讀取後，需要調用 endReadable() 來結束 Readable
// lib/_stream_readable.js
// Finish a fully drained stream: mark it ended and emit 'end' on the next
// tick (via endReadableNT). Being called with data still buffered is an
// internal error and throws. Does nothing once 'end' has been emitted.
function endReadable(stream) {
  const state = stream._readableState;
  if (state.length > 0) {
    throw new Error('"endReadable()" called on non-empty stream');
  }
  if (state.endEmitted) return;
  state.ended = true;
  process.nextTick(endReadableNT, state, stream);
}
// Deferred half of endReadable(). The checks are repeated here because an
// unshift() may have refilled the buffer since the tick was scheduled, and
// 'end' must be emitted at most once.
function endReadableNT(state, stream) {
  const shouldEnd = !state.endEmitted && state.length === 0;
  if (!shouldEnd) return;
  state.endEmitted = true;
  stream.readable = false;
  stream.emit('end');
}
調用 endReadable() 時，緩沖池一定為空（否則會拋出異常）。end 事件會在 process.nextTick() 中異步觸發；觸發後，Readable 將不能再讀取或寫入（push() / unshift()）數據
End
到這里，已經走完了 Readable 的整個 Birth-Death 過程
整個過程就如下面這個圖：
```
       1           4         5
start ==> reading ==> ended ==> endEmitted
          ||  /\
        2 \/  || 3
           read
```
1. read()
2. push(chunk)
3. maybeReadMore() ==> read(0)
4. push(null)
5. endReadable()
根據(jù)這個(gè)圖還有代碼匈庭,在腦袋里面夫凸,把 Readable 的模型運(yùn)行一遍,就能了解它的實(shí)現(xiàn)了
參考: