1.Flow原理探究
我們還是以最簡單的測試代碼入手:
private suspend fun testFlow() {
//上游操作符机杜,創(chuàng)建Flow的同時禽作,發(fā)射數(shù)據(jù)
flow {
emit(1)
emit(2)
emit(3)
emit(4)
emit(5)
}
//終止操作符呆瞻,接收數(shù)據(jù)
.collect {
println(it)
}
}
直接看一下flow{}高階函數(shù):
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
分析如下:
flow{}的返回值是Flow類型幸缕,其實是返回一個SafeFlow對象薄啥。
block的參數(shù)是suspend FlowCollector<T>.() -> Unit撞反,可知這段lambda是掛起函數(shù)鸦列,最終會被編譯成SuspendLambda類型對象租冠;其次它的接收者類型是FlowCollector<T>,也就是可以把block看成FlowCollector的成員方法薯嗤。
這里的SafeFlow定義:
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
SafeFlow繼承至AbstractFlow顽爹,即Flow的抽象類(可以看成是Flow的基礎抽象實現(xiàn)),重寫了collectSafely方法骆姐,注意該方法中镜粤,會調(diào)用collector.block()捏题,而這個動作就會觸發(fā)lambda代碼塊的執(zhí)行。
所以說這是一個重點方法肉渴,我們看看什么地方會調(diào)用該方法公荧,分析其父類AbstractFlow:
//代碼1
//Flow抽象類
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
//實現(xiàn)Flow中的唯一接口
public final override suspend fun collect(collector: FlowCollector<T>) {
//注釋1,簡單封裝
val safeCollector = SafeCollector(collector, coroutineContext)
try {
//注釋2
collectSafely(safeCollector)
} finally {
safeCollector.releaseIntercepted()
}
}
public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}
該抽象類實現(xiàn)了Flow接口同规,F(xiàn)low接口只有唯一的collect方法循狰,會在這里實現(xiàn)。
- 注釋1處券勺,collector就是調(diào)用collect方法時绪钥,傳入的FlowCollector對象。
- 注釋1處关炼,coroutineContext是協(xié)程上下文程腹,這是因為collect本身就是一個掛起函數(shù),是掛起函數(shù)就肯定有Continuation對象儒拂,而這里不對外暴露Continuation對象寸潦,但是可以通過編譯器優(yōu)化,我們可以拿到上下文對象侣灶。
- 在封裝為safeCollector后甸祭,就會調(diào)用collectSafely(safeCollector)方法,根據(jù)前面分析褥影,就會執(zhí)行上游的lambda中的操作池户。
即分析到現(xiàn)在,我們知道flow{}的lambda中凡怎,可以調(diào)用emit發(fā)射方法校焦,而該對象就是這個safeCollector,我們稱為上游的FlowCollector统倒。
暫時先不分析SafeCollector寨典,我們來看看下游終止操作符collect{},根據(jù)前面分析我們可知flow{}會創(chuàng)建一個SafeFlow的對象房匆,所以我們可以調(diào)用其實現(xiàn)接口Flow的唯一方法collect:
public suspend fun collect(collector: FlowCollector<T>)
其實最開始的測試代碼的寫法耸成,是經(jīng)過簡化的,其實效果如下:
//把collect簡寫復原
flow {
emit(1)
emit(2)
emit(3)
emit(4)
emit(5)
}
.collect(object: FlowCollector<Int>{
override suspend fun emit(value: Int) {
println(value)
}
})
這里使用collect方法創(chuàng)建了一個下游FlowCollector對象浴鸿,而Flow中的數(shù)據(jù)是通過回調(diào)該下游FlowCollector對象的emit方法收集到井氢。
再回到代碼1的AbstractFlow中,是不是有一種恍然大悟的感覺:只有調(diào)用了collect方法岳链,才會讓上游的lambda執(zhí)行花竞,這也就是"冷"的表現(xiàn)。
那么還剩一個問題:在AbstractFlow的collect(collector: FlowCollector)方法中掸哑,通知下游數(shù)據(jù)是通過下游操作符的collector的emit方法约急,而發(fā)送數(shù)據(jù)是通過上游操作符的safeCollector的emit方法零远,這是如何結(jié)合起來的呢?
謎底就在SafeCollector類厌蔽,該類定義:
//注釋1牵辣,函數(shù)引用
private val emitFun =
FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
//實現(xiàn)類,該類又實現(xiàn)了FlowCollector接口
internal actual class SafeCollector<T> actual constructor(
//終止操作符的FlowCollector
@JvmField internal actual val collector: FlowCollector<T>,
//協(xié)程上下文
@JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
...
//注釋2躺枕,上游操作符中的FlowCollector會調(diào)用的發(fā)射方法
override suspend fun emit(value: T) {
return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
try {
//發(fā)射數(shù)據(jù)
emit(uCont, value)
} catch (e: Throwable) {
lastEmissionContext = DownstreamExceptionElement(e)
throw e
}
}
}
//內(nèi)部方法
private fun emit(uCont: Continuation<Unit>, value: T): Any? {
val currentContext = uCont.context
currentContext.ensureActive()
// This check is triggered once per flow on happy path.
val previousContext = lastEmissionContext
if (previousContext !== currentContext) {
checkContext(currentContext, previousContext, value)
}
completion = uCont
//注釋3
return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
}
...
}
這里省略了部分方法實現(xiàn)服猪,只展示重要代碼段,分析:
- 首先是注釋2拐云,由于SafeCollector是實現(xiàn)FlowCollector接口罢猪,所以注釋2處的emit就是上游操作符中的emit方法,即flow{ emit(0) }就是調(diào)用該方法叉瘩。
- 該方法中膳帕,會使用suspendCoroutineUninterceptedOrReturn來實現(xiàn)掛起函數(shù),即發(fā)送數(shù)據(jù)薇缅,調(diào)用私有的內(nèi)部emit方法危彩。
- 在私有的emit方法的注釋3處,會調(diào)用emit()方法泳桦,方法參數(shù)分別為:collector也就是下游操作符中的FlowCollector汤徽,value也就是發(fā)射的數(shù)據(jù)0,this是Continuation對象灸撰。
- 結(jié)合注釋1處的定義谒府,這里使用了函數(shù)引用,用到了掛起函數(shù)CPS的原理浮毯,即FlowCollector的emit掛起函數(shù)完疫,其CPS后的函數(shù)引用就是Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?> ,表示意思是:參數(shù)分別是接收者FlowCollector债蓝,發(fā)射的值Any>壳鹤,拼接在參數(shù)后面的Continuation對象,和返回值Any?饰迹。
所以這里注釋3處芳誓,就是調(diào)用了下游操作符的FlowCollector的emit方法,這樣也就可以完美收集到數(shù)據(jù)了啊鸭。
這里我們可以總結(jié)一下:
1.下游調(diào)用了collect方法锹淌,傳遞下游FlowCollector對象,才會觸發(fā)上游數(shù)據(jù)發(fā)射莉掂。
2.上游數(shù)據(jù)發(fā)射葛圃,即上游的FlowCollector調(diào)用emit發(fā)射的數(shù)據(jù)千扔,會通過轉(zhuǎn)換調(diào)用下游的FlowCollector的emit方法來接收數(shù)據(jù)憎妙。
這里第一點解釋了Flow冷的原因库正,第二點解釋了Flow懶惰的原因:一次只能發(fā)送和接收一個數(shù)據(jù)。
2.總結(jié)
這么一分析完厘唾,其實可以發(fā)現(xiàn)Flow還是非常簡單的褥符,實現(xiàn)思路就類似與Callbck傳遞,終止操作符collect{}設置Callback(FlowCollector)抚垃,觸發(fā)上游flow{}的Callback(FlowCollector)發(fā)射數(shù)據(jù)喷楣。
而中間操作符,也是一樣的思想:觸發(fā)上游鹤树,接收上游數(shù)據(jù)铣焊。本篇重點分析了filter{},會發(fā)現(xiàn)最后還是返回一個Flow對象罕伯,在fitler{}實現(xiàn)中會調(diào)用collect{}方法曲伊,然后調(diào)用emit方法。