Node.js 設(shè)計(jì)模式筆記 —— Streams 流編程

Streams 是 Node.js 的組件和模式中最重要的幾個(gè)之一骡显。在 Node.js 這類基于 event 的平臺(tái)上,最高效的實(shí)時(shí)地處理 I/O 的方式梭稚,就是當(dāng)有輸入時(shí)就立即接收數(shù)據(jù),應(yīng)用產(chǎn)生輸出時(shí)就立即發(fā)送數(shù)據(jù)。

Buffering vs streaming

對(duì)于輸入數(shù)據(jù)的處理哩照,buffer 模式會(huì)將來自資源的所有數(shù)據(jù)收集到 buffer 中,待操作完成再將數(shù)據(jù)作為單一的 blob of data 傳遞給調(diào)用者懒浮;相反地飘弧,streams 允許我們一旦接收到數(shù)據(jù)就立即對(duì)其進(jìn)行處理。
單從效率上說砚著,streams 在空間(內(nèi)存使用)和時(shí)間(CPU 時(shí)鐘)的使用上都更加高效次伶。此外 Node.js 中的 streams 還有另一個(gè)重要的優(yōu)勢:組合性

空間效率

使用 buffered API 完成 Gzip 壓縮:

import {promises as fs} from 'fs'
import {gzip} from 'zlib'
import {promisify} from 'util'

const gzipPromise = promisify(gzip)
const filename = process.argv[2]

async function main() {
  const data = await fs.readFile(filename)
  const gzippedData = await gzipPromise(data)
  await fs.writeFile(`${filename}.gz`, gzippedData)
  console.log('File successfully compressed')
}

main()

node gzip-buffer.js <path to file>

如果我們使用上述代碼壓縮一個(gè)足夠大的文件(比如說 8G)稽穆,我們很有可能會(huì)收到一個(gè)錯(cuò)誤信息冠王,類似文件大小超過了允許的最大 buffer 大小。

RangeError [ERR_FS_FILE_TOO_LARGE]: File size (8130792448) is greater
than possible Buffer: 2147483647 bytes

即便沒有超過 V8 的 buffer 大小限制秧骑,也有可能出現(xiàn)物理內(nèi)存不夠用的情況版确。

使用 streams 實(shí)現(xiàn) Gzip 壓縮:

import {createReadStream, createWriteStream} from 'fs'
import {createGzip} from 'zlib'

const filename = process.argv[2]

createReadStream(filename)
  .pipe(createGzip())
  .pipe(createWriteStream(`${filename}.gz`))
  .on('finish', () => console.log('File successfully compressed'))

streams 的優(yōu)勢來自于其接口和可組合性扣囊,允許我們實(shí)現(xiàn)干凈、優(yōu)雅绒疗、簡潔的代碼侵歇。對(duì)于此處的示例,它可以對(duì)任意大小的文件進(jìn)行壓縮吓蘑,只需要消耗常量的內(nèi)存惕虑。

時(shí)間效率

假設(shè)我們需要?jiǎng)?chuàng)建一個(gè)應(yīng)用,能夠壓縮一個(gè)文件并將其上傳到一個(gè)遠(yuǎn)程的 HTTP 服務(wù)器磨镶。而服務(wù)器端則負(fù)責(zé)將接收到的文件解壓縮并保存溃蔫。
如果我們使用 buffer API 實(shí)現(xiàn)客戶端組件,則只有當(dāng)整個(gè)文件讀取和壓縮完成之后琳猫,上傳操作才開始觸發(fā)伟叛。同時(shí)在服務(wù)器端,也只有當(dāng)所有數(shù)據(jù)都接收完畢之后才開始解壓縮操作脐嫂。

更好一些的方案是使用 streams统刮。在客戶端,streams 允許我們以 chunk 為單位從文件系統(tǒng)逐個(gè)账千、分段地讀取數(shù)據(jù)侥蒙,并立即進(jìn)行壓縮和發(fā)送。同時(shí)在服務(wù)器端匀奏,每個(gè) chunk 被接收到后會(huì)立即進(jìn)行解壓縮鞭衩。

服務(wù)端程序:

import {createServer} from 'http'
import {createWriteStream} from 'fs'
import {createGunzip} from 'zlib'
import {basename, join} from 'path'

const server = createServer((req, res) => {
  const filename = basename(req.headers['x-filename'])
  const destFilename = join('received_files', filename)
  console.log(`File request received: ${filename}`)
  req
    .pipe(createGunzip())
    .pipe(createWriteStream(destFilename))
    .on('finish', () => {
      res.writeHead(201, {'Content-Type': 'text/plain'})
      res.end('OK\n')
      console.log(`File saved: ${destFilename}`)
    })
})

server.listen(3000, () => console.log('Listening on http://localhost:3000'))

客戶端程序:

import {request} from 'http'
import {createGzip} from 'zlib'
import {createReadStream} from 'fs'
import {basename} from 'path'

const filename = process.argv[2]
const serverHost = process.argv[3]

const httpRequestOptions = {
  hostname: serverHost,
  port: 3000,
  path: '/',
  method: 'PUT',
  headers: {
    'Content-Type': 'application/octet-stream',
    'Content-Encoding': 'gzip',
    'X-Filename': basename(filename)
  }
}

const req = request(httpRequestOptions, (res) => {
  console.log(`Server response: ${res.statusCode}`)
})

createReadStream(filename)
  .pipe(createGzip())
  .pipe(req)
  .on('finish', () => {
    console.log('File successfully sent')
  })

mkdir received_files
node gzip-receive.js
node gzip-send.js <path to file> localhost

借助 streams,整套流程的流水線在我們接收到第一個(gè)數(shù)據(jù)塊的時(shí)候就開始啟動(dòng)了娃善,完全不需要等待整個(gè)文件被讀取论衍。除此之外,下一個(gè)數(shù)據(jù)塊能夠被讀取時(shí)会放,不需要等到之前的任務(wù)完成就能被處理饲齐。即另一條流水線被并行地被裝配執(zhí)行,Node.js 可以將這些異步的任務(wù)并行化地執(zhí)行咧最。只需要保證數(shù)據(jù)塊最終的順序是固定的,而 Node.js 中 streams 的內(nèi)部實(shí)現(xiàn)機(jī)制保證了這一點(diǎn)御雕。

組合性

借助于 pipe() 方法矢沿,不同的 stream 能夠被組合在一起。每個(gè)處理單元負(fù)責(zé)各自的單一功能酸纲,最終被 pipe() 連接起來捣鲸。因?yàn)?streams 擁有統(tǒng)一的接口,它們彼此之間在 API 層面是互通的闽坡。只需要 pipeline 支持前一個(gè) stream 生成的數(shù)據(jù)類型(可以是二進(jìn)制栽惶、純文本甚至對(duì)象等)愁溜。

客戶端加密
import {createCipheriv, randomBytes} from 'crypto'
import {request} from 'http'
import {createGzip} from 'zlib'
import {createReadStream} from 'fs'
import {basename} from 'path'

const filename = process.argv[2]
const serverHost = process.argv[3]
const secret = Buffer.from(process.argv[4], 'hex')
const iv = randomBytes(16)

const httpRequestOptions = {
  hostname: serverHost,
  port: 3000,
  path: '/',
  method: 'PUT',
  headers: {
    'Content-Type': 'application/octet-stream',
    'Content-Encoding': 'gzip',
    'X-Filename': basename(filename),
    'X-Initialization-Vector': iv.toString('hex')
  }
}

const req = request(httpRequestOptions, (res) => {
  console.log(`Server response: ${res.statusCode}`)
})

createReadStream(filename)
  .pipe(createGzip())
  .pipe(createCipheriv('aes192', secret, iv))
  .pipe(req)
  .on('finish', () => {
    console.log('File successfully sent')
  })
服務(wù)端加密
import {createServer} from 'http'
import {createWriteStream} from 'fs'
import {createGunzip} from 'zlib'
import {basename, join} from 'path'
import {createDecipheriv, randomBytes} from 'crypto'

const secret = randomBytes(24)
console.log(`Generated secret: ${secret.toString('hex')}`)

const server = createServer((req, res) => {
  const filename = basename(req.headers['x-filename'])
  const iv = Buffer.from(
    req.headers['x-initialization-vector'], 'hex'
  )
  const destFilename = join('received_files', filename)
  console.log(`File request received: ${filename}`)
  req
    .pipe(createDecipheriv('aes192', secret, iv))
    .pipe(createGunzip())
    .pipe(createWriteStream(destFilename))
    .on('finish', () => {
      res.writeHead(201, {'Content-Type': 'text/plain'})
      res.end('OK\n')
      console.log(`File saved: ${destFilename}`)
    })
})

server.listen(3000, () => console.log('Listening on http://localhost:3000'))

Streams 詳解

實(shí)際上在 Node.js 中的任何地方都可見到 streams。比如核心模塊 fs 有 createReadStream() 方法用來讀取文件內(nèi)容外厂,createWriteStream() 方法用來向文件寫入數(shù)據(jù)冕象;HTTP requestresponse 對(duì)象本質(zhì)上也是 stream;zlib 模塊允許我們通過流接口壓縮和解壓縮數(shù)據(jù)汁蝶;甚至 crypto 模塊也提供了一些有用的流函數(shù)比如 createCipherivcreateDecipheriv渐扮。

streams 的結(jié)構(gòu)

Node.js 中的每一個(gè) stream 對(duì)象,都是對(duì)以下四種虛擬基類里任意一種的實(shí)現(xiàn)掖棉,這四個(gè)虛擬類都屬于 stream 核心模塊:

  • Readable
  • Writable
  • Duplex
  • Transform

每一個(gè) stream 類同時(shí)也是 EventEmitter 的實(shí)例墓律,實(shí)際上 Streams 可以生成幾種類型的 event。比如當(dāng)一個(gè) Readable 流讀取完畢時(shí)觸發(fā) end 事件幔亥,Writable 流吸入完畢時(shí)觸發(fā) finish 事件耻讽,或者當(dāng)任意錯(cuò)誤發(fā)生時(shí)拋出 error

Steams 之所以足夠靈活帕棉,一個(gè)重要的原因就是它們不僅僅能夠處理 binary data齐饮,還支持幾乎任意的 JavaScript 值。實(shí)際上 streams 有以下兩種操作模式:

  • Binary mode:以 chunk 的形式(比如 buffers 或 strings)傳輸數(shù)據(jù)
  • Object mode:通過由獨(dú)立對(duì)象(可以包含任意 JavaScript 值)組成的序列傳輸數(shù)據(jù)

上述兩種模式使得我們不僅僅可以利用 streams 處理 I/O 操作笤昨,還能夠幫助我們以函數(shù)式的方式將多個(gè)處理單元優(yōu)雅地組合起來祖驱。

從 Readable streams 讀取數(shù)據(jù)

non-flowing mode

默認(rèn)模式。readable 事件表示有新的數(shù)據(jù)可供讀取瞒窒,再通過 read() 方法同步地從內(nèi)部 buffer 讀取數(shù)據(jù)捺僻,返回一個(gè) Buffer 對(duì)象。
即從 stream 按需拉取數(shù)據(jù)崇裁。當(dāng) stream 以 Binary 模式工作時(shí)匕坯,我們還可以給 read() 方法指定一個(gè) size 值,以讀取特定數(shù)量的數(shù)據(jù)拔稳。

process.stdin
  .on('readable', () => {
    let chunk
    console.log('New data available')
    while ((chunk = process.stdin.read()) !== null) {
      console.log(
        `Chunk read (${chunk.length} bytes): "${chunk.toString()}"`
      )
    }
  })
  .on('end', () => console.log('End of stream'))
flowing mode

此模式下葛峻,數(shù)據(jù)并不會(huì)像之前那樣通過 read() 方法拉取巴比,而是一旦有數(shù)據(jù)可用术奖,就主動(dòng)推送給 data 事件的 listener。flowing 模式對(duì)于數(shù)據(jù)流的控制轻绞,相對(duì)而言靈活性較低一些采记。
由于默認(rèn)是 non-flowing 模式,為了使用 flowing 模式政勃,需要綁定一個(gè) listener 給 data 事件或者顯式地調(diào)用 resume() 方法唧龄。調(diào)用 pause() 方法會(huì)導(dǎo)致 stream 暫時(shí)停止發(fā)送 data 事件,任何傳入的數(shù)據(jù)會(huì)先被緩存到內(nèi)部 buffer奸远。即 stream 又切換回 non-flowing 模式既棺。

process.stdin
  .on('readable', () => {
    let chunk
    console.log('New data available')
    while ((chunk = process.stdin.read()) !== null) {
      console.log(
        `Chunk read (${chunk.length} bytes): "${chunk.toString()}"`
      )
    }
  })
  .on('end', () => console.log('End of stream'))
Async iterators

Readable 流同時(shí)也是 async iterators讽挟。

async function main() {
  for await (const chunk of process.stdin) {
    console.log('New data available')
    console.log(
      `Chunk read (${chunk.length} bytes): "${chunk.toString()}"`
    )
  }
  console.log('End of stream')
}

main()

實(shí)現(xiàn) Readable streams

import {Readable} from 'stream'
import Chance from 'chance'

const chance = Chance()

export class RandomStream extends Readable {
  constructor(options) {
    super(options)
    this.emittedBytes = 0
  }

  _read(size) {
    const chunk = chance.string({length: size})
    this.push(chunk, 'utf8')
    this.emittedBytes += chunk.length
    if (chance.bool({likelihood: 5})) {
      this.push(null)
    }
  }
}

const randomStream = new RandomStream()
randomStream
  .on('data', (chunk) => {
    console.log(`Chunk received (${chunk.length} bytes): ${chunk.toString()}`)
  })

為了實(shí)現(xiàn)一個(gè)自定義的 Readable stream,首先必須創(chuàng)建一個(gè)新的類丸冕,該類繼承自 stream 模塊中的 Readable耽梅。其次新創(chuàng)建的類中必須包含 _read() 方法的實(shí)現(xiàn)。
上面代碼中的 _read() 方法做了以下幾件事:

  • 借助第三方的 chance 模塊晨仑,生成一個(gè)長度為 size 的隨機(jī)字符串
  • 通過 push() 方法將字符傳推送到內(nèi)部 buffer
  • 依據(jù) 5% 的幾率自行終止褐墅,終止時(shí)推送 null 到內(nèi)部 buffer,作為 stream 的結(jié)束標(biāo)志

簡化版實(shí)現(xiàn)

import {Readable} from 'stream'
import Chance from 'chance'

const chance = new Chance()
let emittedBytes = 0

const randomStream = new Readable({
  read(size) {
    const chunk = chance.string({length: size})
    this.push(chunk, 'utf8')
    emittedBytes += chunk.length
    if (chance.bool({likelihood: 5})) {
      this.push(null)
    }
  }
})

randomStream
  .on('data', (chunk) => {
    console.log(`Chunk received (${chunk.length} bytes): ${chunk.toString()}`)
  })
從可迭代對(duì)象創(chuàng)建 Readable streams

Readable.from() 方法支持從數(shù)組或者其他可迭代對(duì)象(比如 generators, iterators, async iterators)創(chuàng)建 Readable streams洪己。

import {Readable} from 'stream'

const mountains = [
  {name: 'Everest', height: 8848},
  {name: 'K2', height: 8611},
  {name: 'Kangchenjunga', height: 8586},
  {name: 'Lhotse', height: 8516},
  {name: 'Makalu', height: 8481}
]

const mountainsStream = Readable.from(mountains)
mountainsStream.on('data', (mountain) => {
  console.log(`${mountain.name.padStart(14)}\t${mountain.height}m`)
})

Writable streams

向流寫入數(shù)據(jù)

write() 方法可以向 Writable stream 寫入數(shù)據(jù)妥凳。
writable.write(chunk, [encoding], [callback])

end() 方法可以向 stream 表明沒有更多的數(shù)據(jù)需要寫入攒庵。
writable.end([chunk], [encoding], [callback])

callback 回調(diào)函數(shù)等同于為 finish 事件注冊(cè)了一個(gè) listener赊窥,會(huì)在流中寫入的所有數(shù)據(jù)刷新到底層資源中時(shí)觸發(fā)。

import {createServer} from 'http'
import Chance from 'chance'

const chance = new Chance()
const server = createServer((req, res) => {
  res.writeHead(200, {'Content-Type': 'text/plain'})
  while (chance.bool({likelihood: 95})) {
    res.write(`${chance.string()}\n`)
  }
  res.end('\n\n')
  res.on('finish', () => console.log('All data sent'))
})

server.listen(8080, () => {
  console.log('listening on http://localhost:8080')
})

上面代碼中 HTTP 服務(wù)里的 res 對(duì)象是一個(gè) http.ServerResponse 對(duì)象糕再,實(shí)際上也是一個(gè) Writable stream拱镐。

實(shí)現(xiàn) Writable stream
import {Writable} from 'stream'
import {promises as fs} from 'fs'

class ToFileStream extends Writable {
  constructor(options) {
    super({...options, objectMode: true})
  }

  _write(chunk, encoding, cb) {
    fs.writeFile(chunk.path, chunk.content)
      .then(() => cb())
      .catch(cb)
  }
}

const tfs = new ToFileStream()

tfs.write({path: 'file1.txt', content: 'Hello'})
tfs.write({path: 'file2.txt', content: 'Node.js'})
tfs.write({path: 'file3.txt', content: 'streams'})
tfs.end(() => console.log('All files created'))

簡化形式

import {Writable} from 'stream'
import {promises as fs} from 'fs'

const tfs = new Writable({
  objectMode: true,
  write(chunk, encoding, cb) {
    fs.writeFile(chunk.path, chunk.content)
      .then(() => cb())
      .catch(cb)
  }
})

tfs.write({path: 'file1.txt', content: 'Hello'})
tfs.write({path: 'file2.txt', content: 'Node.js'})
tfs.write({path: 'file3.txt', content: 'streams'})
tfs.end(() => console.log('All files created'))

Duplex streams

Duplex 流艘款,既 Readable 又 Writable 的流。它的場景在于沃琅,有時(shí)候我們描述的實(shí)體既是數(shù)據(jù)源哗咆,也是數(shù)據(jù)的接收者,比如網(wǎng)絡(luò)套接字益眉。
Duplex 流同時(shí)繼承來著 stream.Readablestream.Writable 的方法晌柬。
為了創(chuàng)建一個(gè)自定義的 Duplex 流,我們必須同時(shí)提供 _read()_write() 的實(shí)現(xiàn)郭脂。

Transform streams

Transform 流是一種特殊類型的 Duplex 流年碘,主要針對(duì)數(shù)據(jù)的轉(zhuǎn)換。
對(duì)于 Duplex 流來說展鸡,流入和流出的數(shù)據(jù)之間并沒有直接的聯(lián)系屿衅。比如一個(gè) TCP 套接字,只是從遠(yuǎn)端接收或者發(fā)送數(shù)據(jù)莹弊,套接字本身不知曉輸入輸出之間的任何關(guān)系涤久。

Duplex stream

而 Transform 流則會(huì)對(duì)收到的每一段數(shù)據(jù)都應(yīng)用某種轉(zhuǎn)換操作,從 Writable 端接收數(shù)據(jù)箱硕,進(jìn)行某種形式地轉(zhuǎn)換后再通過 Readable 端提供給外部拴竹。

Transform stream
實(shí)現(xiàn) Transform 流
import {Transform} from 'stream'

class ReplaceStream extends Transform {
  constructor(searchStr, replaceStr, options) {
    super({...options})
    this.searchStr = searchStr
    this.replaceStr = replaceStr
    this.tail = ''
  }

  _transform(chunk, encoding, callback) {
    const pieces = (this.tail + chunk).split(this.searchStr)
    const lastPiece = pieces[pieces.length - 1]
    const tailLen = this.searchStr.length - 1
    this.tail = lastPiece.slice(-tailLen)
    pieces[pieces.length - 1] = lastPiece.slice(0, -tailLen)
    this.push(pieces.join(this.replaceStr))
    callback()
  }

  _flush(callback) {
    this.push(this.tail)
    callback()
  }
}


const replaceStream = new ReplaceStream('World', 'Node.js')
replaceStream.on('data', chunk => console.log(chunk.toString()))
replaceStream.write('Hello W')
replaceStream.write('orld')
replaceStream.end()

其中核心的 _transform() 方法,其有著和 Writable 流的 _write() 方法基本一致的簽名剧罩,但并不會(huì)將處理后的數(shù)據(jù)寫入底層資源,而是通過 this.push() 推送給內(nèi)部 buffer座泳,正如 Readable 流中 _read() 方法的行為惠昔。
所以形成了 Transform 流整體上接收幕与、轉(zhuǎn)換、發(fā)送的行為镇防。
_flush() 則會(huì)在流結(jié)束前調(diào)用啦鸣。

簡化形式

import {Transform} from 'stream'

const searchStr = 'World'
const replaceStr = 'Node.js'
let tail = ''

const replaceStream = new Transform({
  defaultEncoding: 'utf-8',

  transform(chunk, encoding, cb) {
    const pieces = (tail + chunk).split(searchStr)
    const lastPiece = pieces[pieces.length - 1]
    const tailLen = searchStr.length - 1
    tail = lastPiece.slice(-tailLen)
    pieces[pieces.length - 1] = lastPiece.slice(0, -tailLen)
    this.push(pieces.join(replaceStr))
    cb()
  },
  flush(cb) {
    this.push(tail)
    cb()
  }
})
replaceStream.on('data', chunk => console.log(chunk.toString()))
replaceStream.write('Hello W')
replaceStream.write('orld')
replaceStream.end()

Transform 流篩選和聚合數(shù)據(jù)

數(shù)據(jù)源 data.csv

type,country,profit
Household,Namibia,597290.92
Baby Food,Iceland,808579.10
Meat,Russia,277305.60
Meat,Italy,413270.00
Cereal,Malta,174965.25
Meat,Indonesia,145402.40
Household,Italy,728880.54

package.json:

{
  "type": "module",
  "main": "index.js",
  "dependencies": {
    "csv-parse": "^4.10.1"
  },
  "engines": {
    "node": ">=14"
  },
  "engineStrict": true
}

FilterByCountry Transform 流 filter-by-country.js

import {Transform} from 'stream'

export class FilterByCountry extends Transform {
  constructor(country, options = {}) {
    options.objectMode = true
    super(options)
    this.country = country
  }

  _transform(record, enc, cb) {
    if (record.country === this.country) {
      this.push(record)
    }
    cb()
  }
}

SumProfit Transform 流 sum-profit.js

import {Transform} from 'stream'

export class SumProfit extends Transform {
  constructor(options = {}) {
    options.objectMode = true
    super(options)
    this.total = 0
  }

  _transform(record, enc, cb) {
    this.total += Number.parseFloat(record.profit)
    cb()
  }

  _flush(cb) {
    this.push(this.total.toString())
    cb()
  }
}

index.js

import {createReadStream} from 'fs'
import parse from 'csv-parse'
import {FilterByCountry} from './filter-by-conutry.js'
import {SumProfit} from './sum-profit.js'

const csvParser = parse({columns: true})

createReadStream('data.csv')
  .pipe(csvParser)
  .pipe(new FilterByCountry('Italy'))
  .pipe(new SumProfit())
  .pipe(process.stdout)

參考資料

Node.js Design Patterns: Design and implement production-grade Node.js applications using proven patterns and techniques, 3rd Edition

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市来氧,隨后出現(xiàn)的幾起案子诫给,更是在濱河造成了極大的恐慌,老刑警劉巖啦扬,帶你破解...
    沈念sama閱讀 216,372評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件中狂,死亡現(xiàn)場離奇詭異,居然都是意外死亡扑毡,警方通過查閱死者的電腦和手機(jī)胃榕,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來瞄摊,“玉大人勋又,你說我怎么就攤上這事』恢模” “怎么了楔壤?”我有些...
    開封第一講書人閱讀 162,415評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長惯驼。 經(jīng)常有香客問我蹲嚣,道長,這世上最難降的妖魔是什么跳座? 我笑而不...
    開封第一講書人閱讀 58,157評(píng)論 1 292
  • 正文 為了忘掉前任端铛,我火速辦了婚禮,結(jié)果婚禮上疲眷,老公的妹妹穿的比我還像新娘禾蚕。我一直安慰自己,他們只是感情好狂丝,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評(píng)論 6 388
  • 文/花漫 我一把揭開白布换淆。 她就那樣靜靜地躺著,像睡著了一般几颜。 火紅的嫁衣襯著肌膚如雪倍试。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,125評(píng)論 1 297
  • 那天蛋哭,我揣著相機(jī)與錄音县习,去河邊找鬼。 笑死,一個(gè)胖子當(dāng)著我的面吹牛躁愿,可吹牛的內(nèi)容都是我干的叛本。 我是一名探鬼主播,決...
    沈念sama閱讀 40,028評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼彤钟,長吁一口氣:“原來是場噩夢啊……” “哼来候!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起逸雹,我...
    開封第一講書人閱讀 38,887評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤营搅,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后梆砸,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體转质,經(jīng)...
    沈念sama閱讀 45,310評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評(píng)論 2 332
  • 正文 我和宋清朗相戀三年辫樱,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了峭拘。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,690評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡狮暑,死狀恐怖鸡挠,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情搬男,我是刑警寧澤拣展,帶...
    沈念sama閱讀 35,411評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站缔逛,受9級(jí)特大地震影響备埃,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜褐奴,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評(píng)論 3 325
  • 文/蒙蒙 一按脚、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧敦冬,春花似錦辅搬、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至萌庆,卻和暖如春溶褪,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背践险。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評(píng)論 1 268
  • 我被黑心中介騙來泰國打工猿妈, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留吹菱,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,693評(píng)論 2 368
  • 正文 我出身青樓于游,卻偏偏與公主長得像毁葱,于是被迫代替她去往敵國和親垫言。 傳聞我的和親對(duì)象是個(gè)殘疾皇子贰剥,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容

  • 文章翻譯自:Node.js Streams: Everything you need to know 在開發(fā)者中普...
    編程go閱讀 1,513評(píng)論 0 4
  • 概念 Stream模塊 流(stream)在 Node.js 中是處理流數(shù)據(jù)的抽象接口(abstract inte...
    繁華落盡丶lee閱讀 477評(píng)論 0 1
  • 本文是Node.js設(shè)計(jì)模式的筆記, 代碼都是來自 <Node.js Design Patterns> by Ma...
    朱耀鋒閱讀 4,865評(píng)論 0 12
  • 流的概念 流是一組有序的、有起點(diǎn)和終點(diǎn)的字節(jié)數(shù)據(jù)傳輸手段 流不關(guān)心文件的整體內(nèi)容筷频,只關(guān)注是否從文件中讀到了數(shù)據(jù)蚌成,以...
    alipy_258閱讀 168評(píng)論 0 2
  • 簡介 主要對(duì)stream這個(gè)概念做一個(gè)形象的描述和理解,同時(shí)介紹一下比較常用的API凛捏。主要參考了Node.js的官...
    cooody閱讀 1,202評(píng)論 0 0