該實(shí)現(xiàn)方式與之前幾種不同的捺氢,該實(shí)現(xiàn)方式僅針對(duì)Nodejs環(huán)境。在Nodejs環(huán)境中扁远,提供了Stream類(lèi)酬荞,包括Readable、Transform粟瞬、Writeable等子類(lèi)都是可擴(kuò)展的同仆。從字面上看,正好對(duì)應(yīng)Rx中的生產(chǎn)者裙品、傳遞者俗批、消費(fèi)者。
實(shí)現(xiàn)該庫(kù)的起因是市怎,一次在Nodejs中需要在koa框架里面提供event-stream功能岁忘,目前除了IE瀏覽器外其他瀏覽器都支持了服務(wù)端事件推送,這個(gè)功能可以很好的代替輪詢区匠。webpack用的熱更新就是通過(guò)這個(gè)功能實(shí)現(xiàn)的干像。
言歸正傳,首先得實(shí)現(xiàn)生產(chǎn)者驰弄,我們先來(lái)看interval
class Interval extends Readable {
constructor(period) {
super({ objectMode: true })
this.period = period
this.i = 0
}
_read(size) {
setTimeout(() => this.push(this.i++), this.period)
}
}
exports.interval = period => new Interval(period)
說(shuō)明一下麻汰,構(gòu)造函數(shù)傳入objectMode:true的對(duì)象是讓stream處于對(duì)象模式,而不是二進(jìn)制流模式戚篙。_read函數(shù)必須覆蓋父類(lèi)五鲫,否則出錯(cuò),當(dāng)有訂閱者連接上來(lái)后已球,就會(huì)調(diào)用_read方法臣镣。我們?cè)谶@個(gè)方法里面發(fā)送數(shù)據(jù),即調(diào)用push方法智亮,將數(shù)據(jù)發(fā)送給流的接收者忆某。
當(dāng)調(diào)用過(guò)push方法后,后面的接收者如果調(diào)用了callback回調(diào)阔蛉,則表示數(shù)據(jù)消費(fèi)完畢弃舒,會(huì)再次調(diào)用_read方法,直到push(null)表示生產(chǎn)者已經(jīng)complete
FromArray也十分簡(jiǎn)單易讀
class FromArray extends Readable {
constructor(array) {
super({ objectMode: true })
this.array = array
this.pos = 0
this.size = array.length
}
_read(size) {
if (this.pos < this.size) {
this.push(this.array[this.pos++])
} else
this.push(null)
}
}
exports.fromArray = array => new FromArray(array)
下面要實(shí)現(xiàn)一個(gè)轉(zhuǎn)換器(操作符)Filter
class Filter extends Transform {
constructor(f) {
super({ readableObjectMode: true, writableObjectMode: true })
this.f = f
}
_transform(data, encoding, callback) {
const f = this.f
if (f(data)) {
this.push(data);
}
callback();
}
_flush(callback) {
callback()
}
}
exports.filter = f => new Filter(f)
這時(shí)候我們需要覆蓋_transform、_flush函數(shù)聋呢,同樣的苗踪,push方法會(huì)讓數(shù)據(jù)流到下面的流中,而callback回調(diào)會(huì)使得上一個(gè)流繼續(xù)發(fā)送數(shù)據(jù)削锰。
最后我們來(lái)實(shí)現(xiàn)Subscriber
class Subscriber extends Writable {
constructor(n, e, c) {
super({ objectMode: true })
this.n = n
this.e = e
this.c = c
}
_write(chunk, encoding, callback) {
this.n(chunk)
callback(null)
}
_final(callback) {
this.c()
callback()
}
}
exports.subscribe = (n, e = noop, c = noop) => new Subscriber(n, e, c)
Subscriber是一個(gè)可寫(xiě)流通铲,我們必須覆蓋_write方法用于消費(fèi)數(shù)據(jù),_final方法用于complete事件處理器贩。這里沒(méi)有實(shí)現(xiàn)error事件颅夺。有興趣的同學(xué)可以思考如何實(shí)現(xiàn)。
最后我們需要把各種stream串起來(lái)蛹稍,變成一個(gè)長(zhǎng)長(zhǎng)的水管
exports.pipe = pipeline || ((first, ...cbs) => cbs.reduce((aac, c) => aac.pipe(c), first));
高版本的Nodejs已經(jīng)提供了pipeline方法吧黄,可以直接使用,低版本的話唆姐,可以用上面的方法進(jìn)行連接拗慨。
至此,我們已經(jīng)使用Nodejs提供的Stream類(lèi)實(shí)現(xiàn)了Rx的基本邏輯奉芦。(完)