Node.js源碼解析-pipe實現(xiàn)

Node.js源碼解析-pipe實現(xiàn)

歡迎來我的博客閱讀:《Node.js源碼解析-pipe實現(xiàn)》

從前面兩篇文章,我們了解到拱燃。想要把 Readable 的數(shù)據(jù)寫到 Writable了讨,就必須先手動的將數(shù)據(jù)讀入內(nèi)存献汗,然后寫入 Writable暇赤。換句話說耙蔑,每次傳遞數(shù)據(jù)時,都需要寫如下的模板代碼

readable.on('readable', (err) => {
  if(err) throw err

  writable.write(readable.read())
})

為了方便使用疟赊,Node.js 提供了 pipe() 方法郊供,讓我們可以優(yōu)雅的傳遞數(shù)據(jù)

readable.pipe(writable)

現(xiàn)在,就讓我們來看看它是如何實現(xiàn)的吧

pipe

首先需要先調(diào)用 Readable 的 pipe() 方法

// lib/_stream_readable.js

Readable.prototype.pipe = function(dest, pipeOpts) {
  var src = this;
  var state = this._readableState;

  // 記錄 Writable
  switch (state.pipesCount) {
    case 0:
      state.pipes = dest;
      break;
    case 1:
      state.pipes = [state.pipes, dest];
      break;
    default:
      state.pipes.push(dest);
      break;
  }
  state.pipesCount += 1;

  // ...

    src.once('end', endFn);

  dest.on('unpipe', onunpipe);
  
  // ...

  dest.on('drain', ondrain);

  // ...

  src.on('data', ondata);

  // ...

  // 保證 error 事件觸發(fā)時近哟,onerror 首先被執(zhí)行
  prependListener(dest, 'error', onerror);

  // ...

  dest.once('close', onclose);
  
  // ...

  dest.once('finish', onfinish);

  // ...

  // 觸發(fā) Writable 的 pipe 事件
  dest.emit('pipe', src);

  // 將 Readable 改為 flow 模式
  if (!state.flowing) {
    debug('pipe resume');
    src.resume();
  }

  return dest;
};

執(zhí)行 pipe() 函數(shù)時颂碘,首先將 Writable 記錄到 state.pipes 中,然后綁定相關事件椅挣,最后如果 Readable 不是 flow 模式,就調(diào)用 resume() 將 Readable 改為 flow 模式

傳遞數(shù)據(jù)

Readable 從數(shù)據(jù)源獲取到數(shù)據(jù)后塔拳,觸發(fā) data 事件鼠证,執(zhí)行 ondata()

ondata() 相關代碼:

// lib/_stream_readable.js

  // 防止在 dest.write(chunk) 內(nèi)調(diào)用 src.push(chunk) 造成 awaitDrain 重復增加,awaitDrain 不能清零靠抑,Readable 卡住的情況
  // 詳情見 https://github.com/nodejs/node/issues/7278
  var increasedAwaitDrain = false;
  function ondata(chunk) {
    debug('ondata');
    increasedAwaitDrain = false;
    var ret = dest.write(chunk);
    if (false === ret && !increasedAwaitDrain) {
      // 防止在 dest.write() 內(nèi)調(diào)用 src.unpipe(dest)量九,導致 awaitDrain 不能清零,Readable 卡住的情況
      if (((state.pipesCount === 1 && state.pipes === dest) ||
           (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)
          ) && 
          !cleanedUp) {
        debug('false write response, pause', src._readableState.awaitDrain);
        src._readableState.awaitDrain++;
        increasedAwaitDrain = true;
      }
      // 進入 pause 模式
      src.pause();
    }
  }

ondata(chunk) 函數(shù)內(nèi)颂碧,通過 dest.write(chunk) 將數(shù)據(jù)寫入 Writable

此時荠列,在 _write() 內(nèi)部可能會調(diào)用 src.push(chunk) 或使其 unpipe,這會導致 awaitDrain 多次增加载城,不能清零肌似,Readable 卡住

當不能再向 Writable 寫入數(shù)據(jù)時,Readable 會進入 pause 模式诉瓦,直到所有的 drain 事件觸發(fā)

觸發(fā) drain 事件川队,執(zhí)行 ondrain()

// lib/_stream_readable.js

  var ondrain = pipeOnDrain(src);

  function pipeOnDrain(src) {
    return function() {
      var state = src._readableState;
      debug('pipeOnDrain', state.awaitDrain);
      if (state.awaitDrain)
        state.awaitDrain--;
      // awaitDrain === 0力细,且有 data 監(jiān)聽器
      if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
        state.flowing = true;
        flow(src);
      }
    };
  }

每個 drain 事件觸發(fā)時,都會減少 awaitDrain固额,直到 awaitDrain 為 0眠蚂。此時,調(diào)用 flow(src)斗躏,使 Readable 進入 flow 模式

到這里逝慧,整個數(shù)據(jù)傳遞循環(huán)已經(jīng)建立,數(shù)據(jù)會順著循環(huán)源源不斷的流入 Writable啄糙,直到所有數(shù)據(jù)寫入完成

unpipe

不管寫入過程中是否出現(xiàn)錯誤笛臣,最后都會執(zhí)行 unpipe()

// lib/_stream_readable.js

// ...

  function unpipe() {
    debug('unpipe');
    src.unpipe(dest);
  }

// ...

Readable.prototype.unpipe = function(dest) {
  var state = this._readableState;
  var unpipeInfo = { hasUnpiped: false };

  // 啥也沒有
  if (state.pipesCount === 0)
    return this;

  // 只有一個
  if (state.pipesCount === 1) {
    if (dest && dest !== state.pipes)
      return this;
    // 沒有指定就 unpipe 所有
    if (!dest)
      dest = state.pipes;

    state.pipes = null;
    state.pipesCount = 0;
    state.flowing = false;
    if (dest)
      dest.emit('unpipe', this, unpipeInfo);
    return this;
  }

  // 沒有指定就 unpipe 所有
  if (!dest) {
    var dests = state.pipes;
    var len = state.pipesCount;
    state.pipes = null;
    state.pipesCount = 0;
    state.flowing = false;

    for (var i = 0; i < len; i++)
      dests[i].emit('unpipe', this, unpipeInfo);
    return this;
  }

  // 找到指定 Writable,并 unpipe
  var index = state.pipes.indexOf(dest);
  if (index === -1)
    return this;

  state.pipes.splice(index, 1);
  state.pipesCount -= 1;
  if (state.pipesCount === 1)
    state.pipes = state.pipes[0];

  dest.emit('unpipe', this, unpipeInfo);

  return this;
};

Readable.prototype.unpipe() 函數(shù)會根據(jù) state.pipes 屬性和 dest 參數(shù)迈套,選擇執(zhí)行策略捐祠。最后會觸發(fā) dest 的 unpipe 事件

unpipe 事件觸發(fā)后,調(diào)用 onunpipe()桑李,清理相關數(shù)據(jù)

// lib/_stream_readable.js

  function onunpipe(readable, unpipeInfo) {
    debug('onunpipe');
    if (readable === src) {
      if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
        unpipeInfo.hasUnpiped = true;
        // 清理相關數(shù)據(jù)
        cleanup();
      }
    }
  }

End

在整個 pipe 的過程中踱蛀,Readable 是主動方 ( 負責整個 pipe 過程:包括數(shù)據(jù)傳遞、unpipe 與異常處理 )贵白,Writable 是被動方 ( 只需要觸發(fā) drain 事件 )

總結一下 pipe 的過程:

  • 首先執(zhí)行 readbable.pipe(writable)率拒,將 readable 與 writable 對接上
  • 當 readable 中有數(shù)據(jù)時,readable.emit('data')禁荒,將數(shù)據(jù)寫入 writable
  • 如果 writable.write(chunk) 返回 false猬膨,則進入 pause 模式,等待 drain 事件觸發(fā)
  • drain 事件全部觸發(fā)后呛伴,再次進入 flow 模式勃痴,寫入數(shù)據(jù)
  • 不管數(shù)據(jù)寫入完成或發(fā)生中斷,最后都會調(diào)用 unpipe()
  • unpipe() 調(diào)用 Readable.prototype.unpipe()热康,觸發(fā) dest 的 unpipe 事件沛申,清理相關數(shù)據(jù)

參考:

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市姐军,隨后出現(xiàn)的幾起案子铁材,更是在濱河造成了極大的恐慌,老刑警劉巖奕锌,帶你破解...
    沈念sama閱讀 217,907評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件著觉,死亡現(xiàn)場離奇詭異,居然都是意外死亡惊暴,警方通過查閱死者的電腦和手機饼丘,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,987評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來缴守,“玉大人葬毫,你說我怎么就攤上這事镇辉。” “怎么了贴捡?”我有些...
    開封第一講書人閱讀 164,298評論 0 354
  • 文/不壞的土叔 我叫張陵忽肛,是天一觀的道長。 經(jīng)常有香客問我烂斋,道長屹逛,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,586評論 1 293
  • 正文 為了忘掉前任汛骂,我火速辦了婚禮罕模,結果婚禮上,老公的妹妹穿的比我還像新娘帘瞭。我一直安慰自己淑掌,他們只是感情好,可當我...
    茶點故事閱讀 67,633評論 6 392
  • 文/花漫 我一把揭開白布蝶念。 她就那樣靜靜地躺著抛腕,像睡著了一般。 火紅的嫁衣襯著肌膚如雪媒殉。 梳的紋絲不亂的頭發(fā)上担敌,一...
    開封第一講書人閱讀 51,488評論 1 302
  • 那天,我揣著相機與錄音廷蓉,去河邊找鬼全封。 笑死,一個胖子當著我的面吹牛桃犬,可吹牛的內(nèi)容都是我干的刹悴。 我是一名探鬼主播,決...
    沈念sama閱讀 40,275評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼攒暇,長吁一口氣:“原來是場噩夢啊……” “哼颂跨!你這毒婦竟也來了?” 一聲冷哼從身側響起扯饶,我...
    開封第一講書人閱讀 39,176評論 0 276
  • 序言:老撾萬榮一對情侶失蹤劝萤,失蹤者是張志新(化名)和其女友劉穎得滤,沒想到半個月后际歼,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體底挫,經(jīng)...
    沈念sama閱讀 45,619評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡指煎,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,819評論 3 336
  • 正文 我和宋清朗相戀三年活箕,在試婚紗的時候發(fā)現(xiàn)自己被綠了先壕。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片嗡害。...
    茶點故事閱讀 39,932評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡琢歇,死狀恐怖兰怠,靈堂內(nèi)的尸體忽然破棺而出梦鉴,到底是詐尸還是另有隱情,我是刑警寧澤揭保,帶...
    沈念sama閱讀 35,655評論 5 346
  • 正文 年R本政府宣布肥橙,位于F島的核電站,受9級特大地震影響秸侣,放射性物質(zhì)發(fā)生泄漏存筏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,265評論 3 329
  • 文/蒙蒙 一味榛、第九天 我趴在偏房一處隱蔽的房頂上張望椭坚。 院中可真熱鬧,春花似錦搏色、人聲如沸善茎。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,871評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽垂涯。三九已至,卻和暖如春略吨,著一層夾襖步出監(jiān)牢的瞬間集币,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,994評論 1 269
  • 我被黑心中介騙來泰國打工翠忠, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留鞠苟,地道東北人。 一個月前我還...
    沈念sama閱讀 48,095評論 3 370
  • 正文 我出身青樓秽之,卻偏偏與公主長得像当娱,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子考榨,可洞房花燭夜當晚...
    茶點故事閱讀 44,884評論 2 354

推薦閱讀更多精彩內(nèi)容

  • stream 流是一個抽象接口跨细,在 Node 里被不同的對象實現(xiàn)。例如 request to an HTTP se...
    明明三省閱讀 3,404評論 1 10
  • 流是Node中最重要的組件和模式之一河质。在社區(qū)里有一句格言說:讓一切事務流動起來冀惭。這已經(jīng)足夠來描述在Node中流...
    宮若石閱讀 552評論 0 0
  • https://nodejs.org/api/documentation.html 工具模塊 Assert 測試 ...
    KeKeMars閱讀 6,332評論 0 6
  • Node.js源碼解析-Writable實現(xiàn) 歡迎來我的博客閱讀:《Node.js源碼解析-Writable實現(xiàn)》...
    xiedacon閱讀 638評論 0 2
  • Buffer Buffer的構成 Buffer對象類似數(shù)組,它的元素位16進制的兩位數(shù)掀鹅,即0到255的數(shù)值散休。主要是...
    人失格閱讀 1,831評論 0 0