Node.js源碼解析-Readable實(shí)現(xiàn)

Node.js源碼解析-Readable實(shí)現(xiàn)

歡迎來我的博客閱讀:《Node.js源碼解析-Readable實(shí)現(xiàn)》

想要了解 Readable 的實(shí)現(xiàn),最好的方法是順著 Readable 的 Birth-Death 走一遍

Base

在了解 Readable 的 Birth-Death 之前送浊,先看看 Readable 的構(gòu)造函數(shù)

// lib/_stream_readable.js

function Readable(options) {
  if (!(this instanceof Readable))
    return new Readable(options);

  // Readable 流的狀態(tài)集
  this._readableState = new ReadableState(options, this);

  // legacy
  this.readable = true;

  if (options) {
    if (typeof options.read === 'function')
      // 真實(shí)數(shù)據(jù)來源音婶,Readable.prototyoe._read() 函數(shù)會(huì)拋出異常落蝙,因此必須有options.read
      this._read = options.read;

    if (typeof options.destroy === 'function')
      this._destroy = options.destroy;
  }

  Stream.call(this);
}

function ReadableState(options, stream) {
  options = options || {};

  // object 模式標(biāo)識
  this.objectMode = !!options.objectMode;

  if (stream instanceof Stream.Duplex)
    this.objectMode = this.objectMode || !!options.readableObjectMode;

  var hwm = options.highWaterMark;
  var defaultHwm = this.objectMode ? 16 : 16 * 1024;
  this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;

  // highWaterMark 高水位標(biāo)識
  // 內(nèi)部緩存高于 highWaterMark 時(shí)入撒,會(huì)停止調(diào)用 _read() 獲取數(shù)據(jù)
  // 默認(rèn) 16k
  this.highWaterMark = Math.floor(this.highWaterMark);

  // Readable 流內(nèi)部緩沖池荆烈,是一個(gè) buffer 鏈表
  // 之所以不用數(shù)組實(shí)現(xiàn)诈胜,是因?yàn)殒湵碓鰟h頭尾元素更快
  this.buffer = new BufferList();
  // 緩存大小
  this.length = 0;
  // pipe 的流
  this.pipes = null;
  this.pipesCount = 0;
  // flow 模式標(biāo)識
  this.flowing = null;
  // Readable 狀態(tài)標(biāo)識,為 true 表示數(shù)據(jù)源已讀取完畢
  // 此時(shí) Readable 中可能還有數(shù)據(jù)蚁滋,不能再向緩沖池中 push() 數(shù)據(jù)
  this.ended = false;
  // Readable 狀態(tài)標(biāo)識宿接,為 true 表示 end 事件已觸發(fā)
  // 此時(shí) Readable 中數(shù)據(jù)已讀取完畢,不能再向緩沖池中 push() 或 unshift() 數(shù)據(jù)
  this.endEmitted = false;
  // Readable 狀態(tài)標(biāo)識辕录,為 true 表示正在調(diào)用 _read() 讀取數(shù)據(jù)
  this.reading = false;
  this.sync = true;
  // 標(biāo)識需要觸發(fā) readable 事件
  this.needReadable = false;
  // 標(biāo)識已觸發(fā) readable 事件
  this.emittedReadable = false;
  this.readableListening = false;
  this.resumeScheduled = false;
  this.destroyed = false;
  this.defaultEncoding = options.defaultEncoding || 'utf8';
  this.awaitDrain = 0;
  this.readingMore = false;

  // 解碼器
  this.decoder = null;
  this.encoding = null;
  if (options.encoding) {
    if (!StringDecoder)
      StringDecoder = require('string_decoder').StringDecoder;
    this.decoder = new StringDecoder(options.encoding);
    this.encoding = options.encoding;
  }
}

在 Readable 的構(gòu)造函數(shù)中睦霎,可通過 options 傳入?yún)?shù),其中 options.read 函數(shù)是必需的

readable._readableState 中保存了 Readable 的各種狀態(tài)與屬性

Birth-Death

在這里將 Readable 的 Birth-Death 分為五個(gè)狀態(tài):

表中為 this._readableSate 的屬性

  • start: 初始狀態(tài)踏拜,Readable 剛剛被創(chuàng)建碎赢,還未調(diào)用 readable.read()

    length reading ended endEmitted
    0 false false false
  • reading: 代表正在從數(shù)據(jù)源中讀取數(shù)據(jù),此時(shí)緩存大小 this._readableSate.length 小于 highWaterMark速梗,應(yīng)讀取數(shù)據(jù)使緩存達(dá)到 highWaterMark

    length reading ended endEmitted
    < highWaterMark true false false
  • read: Readable 從數(shù)據(jù)源讀取數(shù)據(jù)后的相對穩(wěn)定狀態(tài)

    length reading ended endEmitted
    >= highWaterMark false false false
  • ended: 數(shù)據(jù)已經(jīng)全部讀取完成( push(null) )肮塞,此時(shí) push(chunk) 會(huì)報(bào) stream.push() after EOF 錯(cuò)誤

    length reading ended endEmitted
    >= 0 false true false
  • endEmitted: end 事件觸發(fā)完成,此時(shí) unshift(chunk) 會(huì)報(bào) stream.unshift() after end event 錯(cuò)誤

    length reading ended endEmitted
    0 false true true

它們之間的關(guān)系如下:

         1           4         5
  start ==> reading ==> ended ==> endEmitted
             || /\
           2 \/ || 3
             read

1. start ==> reading

start 狀態(tài)變?yōu)?reading 狀態(tài)姻锁,發(fā)生在第一次調(diào)用 read() 時(shí)

// lib/_stream_readable.js

Readable.prototype.read = function(n) {
  debug('read', n);
  n = parseInt(n, 10);
  var state = this._readableState;
  var nOrig = n;

  if (n !== 0)
    state.emittedReadable = false;

  // 調(diào)用 read(0)時(shí)枕赵,如果緩存大于 highWaterMark 則直接觸發(fā) readable 事件
  if (n === 0 &&
      state.needReadable &&
      (state.length >= state.highWaterMark || state.ended)) {
    debug('read: emitReadable', state.length, state.ended);
    if (state.length === 0 && state.ended)
      endReadable(this);
    else
      emitReadable(this);
    return null;
  }

  // 計(jì)算可讀數(shù)據(jù)量
  // n = NaN ==> 讀取全部
  // n <= state.length ==> 讀取 n
  // n > state.length ==> 讀取 0,并使 Readable 從數(shù)據(jù)源讀取數(shù)據(jù)
  // 
  // n > state.highWaterMark ==> 重新計(jì)算 highWaterMark位隶,大小是大于 n 的最小 2^x
  n = howMuchToRead(n, state);

  // 當(dāng) Readable 已經(jīng)讀完時(shí)拷窜,調(diào)用 endReadable() ,結(jié)束 Readable
  if (n === 0 && state.ended) {
    if (state.length === 0)
      endReadable(this);
    return null;
  }

  // 判斷是否應(yīng)該從數(shù)據(jù)源讀取數(shù)據(jù)
  // BEGIN
  var doRead = state.needReadable;
  debug('need readable', doRead);

  if (state.length === 0 || state.length - n < state.highWaterMark) {
    doRead = true;
    debug('length less than watermark', doRead);
  }
  // END

  if (state.ended || state.reading) {
    // 對于 ended 或 reading 狀態(tài)的 Readable 是不需要讀取數(shù)據(jù)的
    doRead = false;
    debug('reading or ended', doRead);
  } else if (doRead) {
    // 讀取數(shù)據(jù)
    debug('do read');
    state.reading = true;
    state.sync = true;

    if (state.length === 0)
      state.needReadable = true;
    // 從數(shù)據(jù)源讀取數(shù)據(jù)涧黄,可能是異步篮昧,也可能是同步
    this._read(state.highWaterMark);
    state.sync = false;
    // 因?yàn)?_read() 函數(shù)可能是異步的,也可能是同步的
    // 在同步情況下笋妥,需要重新確認(rèn)可讀長度
    if (!state.reading)
      n = howMuchToRead(nOrig, state);
  }

  // 獲取數(shù)據(jù)
  var ret;
  if (n > 0) ret = fromList(n, state); // 從緩沖池中讀取數(shù)據(jù)
  else ret = null;

  if (ret === null) {
    state.needReadable = true;
    n = 0;
  } else {
    state.length -= n;
  }

  // ...

  if (ret !== null)
    this.emit('data', ret);

  return ret;
};

// 必須實(shí)現(xiàn)的方法
Readable.prototype._read = function(n) {
  this.emit('error', new Error('_read() is not implemented'));
};

// 計(jì)算可讀長度
function howMuchToRead(n, state) {
  if (n <= 0 || (state.length === 0 && state.ended))
    return 0;
  if (state.objectMode)
    return 1;
  if (n !== n) { // NaN
    if (state.flowing && state.length)
      return state.buffer.head.data.length;
    else
      return state.length;
  }

  if (n > state.highWaterMark)
    // 當(dāng)需要數(shù)據(jù)大于 highWaterMark 時(shí)懊昨,調(diào)整 highWaterMark 大小到大于 n 的最小 2^x
    state.highWaterMark = computeNewHighWaterMark(n);
  if (n <= state.length)
    return n;
  // 緩沖池中數(shù)據(jù)不夠
  if (!state.ended) {
    state.needReadable = true;
    return 0;
  }
  return state.length;
}

調(diào)用 read() 后,如果緩沖池中數(shù)據(jù)不夠或讀取后低于 highWaterMark春宣,則調(diào)用 _read() 來讀取更多的數(shù)據(jù)酵颁,否則直接返回讀取的數(shù)據(jù)

當(dāng)期望數(shù)據(jù)量大于 highWaterMark 時(shí),重新計(jì)算 highWaterMark月帝,大小是大于期望數(shù)據(jù)量的最小 2^x

2. reading ==> read

調(diào)用 _read() 后躏惋,會(huì)異步或同步地將調(diào)用 push(chunk),將數(shù)據(jù)放入緩沖池嚷辅,并使 Readable 從 reading 狀態(tài)變?yōu)?read 狀態(tài)

// lib/_stream_readable.js

Readable.prototype.push = function(chunk, encoding) {
  var state = this._readableState;
  var skipChunkCheck;

  if (!state.objectMode) {
    if (typeof chunk === 'string') {
      encoding = encoding || state.defaultEncoding;
      // 如果指定編碼與 Readable 編碼不同簿姨,則將 chunk 使用指定編碼解碼為 Buffer
      if (encoding !== state.encoding) {
        chunk = Buffer.from(chunk, encoding);
        encoding = '';
      }
      // string 不需要檢查
      skipChunkCheck = true;
    }
  } else {
    // object mode 的 Readable 也不需要檢查
    skipChunkCheck = true;
  }

  return readableAddChunk(this, chunk, encoding, false, skipChunkCheck);
};

function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
  var state = stream._readableState;
  if (chunk === null) { // 結(jié)束信號
    state.reading = false;
    onEofChunk(stream, state);
  } else {
    var er;
    if (!skipChunkCheck) // 檢查 chunk 格式
      er = chunkInvalid(state, chunk);
    if (er) {
      stream.emit('error', er);
    } else if (state.objectMode || chunk && chunk.length > 0) {
      if (typeof chunk !== 'string' &&
          Object.getPrototypeOf(chunk) !== Buffer.prototype &&
          !state.objectMode) {
        chunk = Stream._uint8ArrayToBuffer(chunk);
      }

      if (addToFront) { // unshift() 的 hook
        if (state.endEmitted)
          stream.emit('error', new Error('stream.unshift() after end event'));
        else
          addChunk(stream, state, chunk, true); // 將數(shù)據(jù)添加到緩沖池中
      } else if (state.ended) {
        stream.emit('error', new Error('stream.push() after EOF'));
      } else {
        state.reading = false;
        if (state.decoder && !encoding) {
          chunk = state.decoder.write(chunk);
          if (state.objectMode || chunk.length !== 0)
            addChunk(stream, state, chunk, false); // 將數(shù)據(jù)添加到緩沖池中
          else 
            maybeReadMore(stream, state); // 會(huì)在 addChunk() 函數(shù)內(nèi)部調(diào)用
        } else {
          addChunk(stream, state, chunk, false); // 將數(shù)據(jù)添加到緩沖池中
        }
      }
    } else if (!addToFront) {
      state.reading = false;
    }
  }

  return needMoreData(state);
  // return !state.ended &&  數(shù)據(jù)源還有數(shù)據(jù)
  //          (state.needReadable ||  需要更多數(shù)據(jù)
  //           state.length < state.highWaterMark ||  緩存小于 highWaterMark
  //           state.length === 0)
}

function addChunk(stream, state, chunk, addToFront) {
  if (state.flowing && state.length === 0 && !state.sync) {
    // 對于 flow 模式的 Readable,直接觸發(fā) data 事件簸搞,并繼續(xù)讀取數(shù)據(jù)就行
    stream.emit('data', chunk);
    stream.read(0);
  } else {
    state.length += state.objectMode ? 1 : chunk.length;
    if (addToFront)
      state.buffer.unshift(chunk);
    else
      state.buffer.push(chunk);

    if (state.needReadable)
      emitReadable(stream);
  }

  // 在允許的情況下款熬,讀取數(shù)據(jù)直到 highWaterMark
  maybeReadMore(stream, state);
}

調(diào)用 push(chunk) 時(shí)深寥,會(huì)將 chunk 放入緩沖池內(nèi)攘乒,并改變 Readable 的狀態(tài)贤牛。如果 Readable 處于 ended 狀態(tài),會(huì)報(bào) stream.push() after EOF 錯(cuò)誤

如果緩存小于 highWaterMark则酝,返回 true殉簸,意味著需要寫入更多的數(shù)據(jù)

3. read ==> reading

從 read 到 reading 狀態(tài),意味著需要讀取更多的數(shù)據(jù)沽讹,即緩存小于 highWaterMark

緩存與 highWaterMark 的關(guān)系可以根據(jù) push(chunk) 的返回值來判斷般卑,但是需要使用者手動(dòng)處理。因此爽雄,為了方便使用蝠检,addChunk() 函數(shù)會(huì)自動(dòng)調(diào)用 maybeReadMore() 來異步讀取數(shù)據(jù)。這樣挚瘟,即使單次 _read() 無法達(dá)到 highWaterMark叹谁,也可以通過多次異步讀取,使數(shù)據(jù)流動(dòng)起來

// lib/_stream_readable.js

function maybeReadMore(stream, state) {
  if (!state.readingMore) {
    state.readingMore = true;
    process.nextTick(maybeReadMore_, stream, state);
  }
}

function maybeReadMore_(stream, state) {
  var len = state.length;
  while (!state.reading && !state.flowing && !state.ended &&
         state.length < state.highWaterMark) {
    debug('maybeReadMore read 0');
    stream.read(0);
    if (len === state.length) // 取不到數(shù)據(jù)就放棄
      break;
    else
      len = state.length;
  }
  state.readingMore = false;
}

maybeReadMore() 函數(shù)內(nèi)乘盖,通過異步讀取數(shù)據(jù)焰檩,直到 highWaterMark

那么為什么是異步讀取數(shù)據(jù)呢?

因?yàn)槎┛颍?_read() 函數(shù)內(nèi)析苫,可能不止一次調(diào)用 push(chunk)

如果是同步,push(chunk) 后穿扳,因?yàn)闆]有達(dá)到 highWaterMark衩侥,會(huì)繼續(xù)調(diào)用 read(0),發(fā)生第二次 _read()矛物。第二次 _read() 也可能導(dǎo)致第三次 _read() 茫死,直到 highWaterMark

待整個(gè)調(diào)用完畢后,緩沖池內(nèi)會(huì)有 highWaterMark * n( _read() 內(nèi)調(diào)用 push(chunk) 次數(shù) )的數(shù)據(jù)泽谨,而這與 highWaterMark 的設(shè)計(jì)是不符的

如果是異步璧榄,則可以等 _read() 執(zhí)行完畢后,在 process.nextTick() 內(nèi)再次調(diào)用 _read() 讀取數(shù)據(jù)吧雹,不會(huì)發(fā)生上面的問題

4. reading ==> ended

當(dāng)數(shù)據(jù)源讀取完畢時(shí)骨杂,需要調(diào)用 push(null) 來通知 Rreadable 數(shù)據(jù)源已經(jīng)讀取完畢。push(null) 函數(shù)內(nèi)部會(huì)調(diào)用 onEofChunk()

// lib/_stream_readable.js

function onEofChunk(stream, state) {
  if (state.ended) return;
  if (state.decoder) {
    var chunk = state.decoder.end();
    if (chunk && chunk.length) {
      state.buffer.push(chunk);
      state.length += state.objectMode ? 1 : chunk.length;
    }
  }
  state.ended = true;

  // 觸發(fā) readable 事件雄卷,通知監(jiān)聽者來處理剩余數(shù)據(jù)
  emitReadable(stream);
}

onEofChunk() 函數(shù)將 readable 標(biāo)記為 ended 狀態(tài)后搓蚪,禁止再向緩沖池內(nèi) push 數(shù)據(jù)。此時(shí)丁鹉,緩沖池內(nèi)可能還有數(shù)據(jù)

5. ended ==> endEmitted

ended 狀態(tài)的 Readable 內(nèi)可能還有數(shù)據(jù)妒潭。因此悴能,當(dāng)數(shù)據(jù)全部被讀取后,需要調(diào)用 endReadable() 來結(jié)束 Readable

// lib/_stream_readable.js

function endReadable(stream) {
  var state = stream._readableState;

  // state.length 一定是 0
  if (state.length > 0)
    throw new Error('"endReadable()" called on non-empty stream');

  if (!state.endEmitted) {
    state.ended = true;
    process.nextTick(endReadableNT, state, stream);
  }
}

function endReadableNT(state, stream) {
  // 防止中間調(diào)用 unshift(chunk)雳灾,向緩沖池中放入數(shù)據(jù)
  if (!state.endEmitted && state.length === 0) {
    state.endEmitted = true;
    stream.readable = false;
    stream.emit('end');
  }
}

調(diào)用 endReadable() 時(shí)漠酿,緩沖池一定為空。整個(gè)調(diào)用完成后谎亩,觸發(fā) end 事件炒嘲,Readable 將不能再讀取或?qū)懭耄?push() / unshift() )數(shù)據(jù)

End

到這里,已經(jīng)走完了 Readable 的整個(gè) Birth-Death 過程

整個(gè)過程就如下面這個(gè)圖:

         1           4         5
  start ==> reading ==> ended ==> endEmitted
             || /\
           2 \/ || 3
             read

1. read()
2. push(chunk)
3. maybeReadMore() ==> read(0)
4. push(null)
5. endReadable()

根據(jù)這個(gè)圖還有代碼匈庭,在腦袋里面夫凸,把 Readable 的模型運(yùn)行一遍,就能了解它的實(shí)現(xiàn)了

參考:

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末阱持,一起剝皮案震驚了整個(gè)濱河市夭拌,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌衷咽,老刑警劉巖鸽扁,帶你破解...
    沈念sama閱讀 206,602評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異兵罢,居然都是意外死亡献烦,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,442評論 2 382
  • 文/潘曉璐 我一進(jìn)店門卖词,熙熙樓的掌柜王于貴愁眉苦臉地迎上來巩那,“玉大人,你說我怎么就攤上這事此蜈〖春幔” “怎么了?”我有些...
    開封第一講書人閱讀 152,878評論 0 344
  • 文/不壞的土叔 我叫張陵裆赵,是天一觀的道長东囚。 經(jīng)常有香客問我,道長战授,這世上最難降的妖魔是什么页藻? 我笑而不...
    開封第一講書人閱讀 55,306評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮植兰,結(jié)果婚禮上份帐,老公的妹妹穿的比我還像新娘。我一直安慰自己楣导,他們只是感情好废境,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,330評論 5 373
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著,像睡著了一般噩凹。 火紅的嫁衣襯著肌膚如雪巴元。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,071評論 1 285
  • 那天驮宴,我揣著相機(jī)與錄音逮刨,去河邊找鬼。 笑死幻赚,一個(gè)胖子當(dāng)著我的面吹牛禀忆,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播落恼,決...
    沈念sama閱讀 38,382評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼离熏!你這毒婦竟也來了佳谦?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,006評論 0 259
  • 序言:老撾萬榮一對情侶失蹤滋戳,失蹤者是張志新(化名)和其女友劉穎钻蔑,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體奸鸯,經(jīng)...
    沈念sama閱讀 43,512評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡咪笑,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,965評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了娄涩。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片窗怒。...
    茶點(diǎn)故事閱讀 38,094評論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖蓄拣,靈堂內(nèi)的尸體忽然破棺而出扬虚,到底是詐尸還是另有隱情,我是刑警寧澤球恤,帶...
    沈念sama閱讀 33,732評論 4 323
  • 正文 年R本政府宣布辜昵,位于F島的核電站,受9級特大地震影響咽斧,放射性物質(zhì)發(fā)生泄漏堪置。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,283評論 3 307
  • 文/蒙蒙 一张惹、第九天 我趴在偏房一處隱蔽的房頂上張望舀锨。 院中可真熱鬧,春花似錦诵叁、人聲如沸雁竞。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,286評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽碑诉。三九已至彪腔,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間进栽,已是汗流浹背德挣。 一陣腳步聲響...
    開封第一講書人閱讀 31,512評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留快毛,地道東北人格嗅。 一個(gè)月前我還...
    沈念sama閱讀 45,536評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像唠帝,于是被迫代替她去往敵國和親屯掖。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,828評論 2 345

推薦閱讀更多精彩內(nèi)容