上篇看了Flow的基本用法橄妆,這篇文章就從源碼的角度來看看Flow的運(yùn)行機(jī)制
1.Flow創(chuàng)建
fun simpleFlow() = flow<Int> {
for (i in 1..3) {
delay(100)
emit(i)
}
}
看一下flow函數(shù)的定義
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
參數(shù)類型為
@BuilderInference block: suspend FlowCollector<T>.() -> Unit
這里的參數(shù)慨绳,可以理解為 入?yún)⑹且粋€(gè)函數(shù)谆构,該函數(shù)是FlowCollector的一個(gè)擴(kuò)展函數(shù)署惯,沒有入?yún)⑿惺矝]有出參(返回值為Unit烫沙,相當(dāng)于java的void)匹层。對于這塊不理解的,可以參閱 這里
flow函數(shù)調(diào)用了 SafeFlow的構(gòu)造函數(shù)
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
AbstractFlow 代碼也比較簡單锌蓄,稍后再說
到這里升筏,F(xiàn)low創(chuàng)建圓滿結(jié)束了
2.接收 collect 函數(shù)
前面介紹過,F(xiàn)low為冷流瘸爽,冷流不會(huì)發(fā)射數(shù)據(jù)您访,只有到了收集(末端操作符)的時(shí)候,數(shù)據(jù)才開始生產(chǎn)并被發(fā)射出去剪决。接下來就來看看emit和collect怎么發(fā)生的關(guān)聯(lián)灵汪。先來看一下collect函數(shù)
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
這里可以看出,F(xiàn)lowCollector的emit方法柑潦,實(shí)際上調(diào)用的是collect傳入的action方法享言。但是,我們創(chuàng)建Flow的FlowCollector是如何與collect方法傳入的FlowCollector產(chǎn)生關(guān)系的呢渗鬼?
關(guān)鍵就在于SafeFlow這個(gè)類
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
AbstractFlow代碼如下
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
@InternalCoroutinesApi
public final override suspend fun collect(collector: FlowCollector<T>) {
val safeCollector = SafeCollector(collector, coroutineContext)
try {
collectSafely(safeCollector)
} finally {
safeCollector.releaseIntercepted()
}
}
/**
* Accepts the given [collector] and [emits][FlowCollector.emit] values into it. * * A valid implementation of this method has the following constraints: * 1) It should not change the coroutine context (e.g. with `withContext(Dispatchers.IO)`) when emitting values. * The emission should happen in the context of the [collect] call. * Please refer to the top-level [Flow] documentation for more details. * 2) It should serialize calls to [emit][FlowCollector.emit] as [FlowCollector] implementations are not * thread-safe by default. * To automatically serialize emissions [channelFlow] builder can be used instead of [flow] * * @throws IllegalStateException if any of the invariants are violated.
*/
public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}
到這里可以看出SafeFlow的collect方法览露,實(shí)際調(diào)用的是collectSafely方法,最終是collect生成的FlowCollector調(diào)用創(chuàng)建時(shí)傳入的block方法譬胎。
有點(diǎn)繞差牛,再捋一遍。
flow構(gòu)造時(shí)银择,傳入FlowCollector的擴(kuò)展方法多糠,我們稱此方法為block
當(dāng)collect方法調(diào)用時(shí),傳入?yún)?shù)action浩考,首先將此action方法包裝成FlowCollector夹孔,我們稱之為safeCollector
而collect最終調(diào)用的為safeCollector.block
到此,我們就理解了,為什么Flow是冷流了搭伤,只有末端操作符才會(huì)調(diào)用其構(gòu)造時(shí)的block
3.協(xié)程切換flowOn方法
直接看源碼
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
checkFlowContext(context)
return when {
context == EmptyCoroutineContext -> this
this is FusibleFlow -> fuse(context = context)
else -> ChannelFlowOperatorImpl(this, context = context)
}
}
這里的when方法比較有意思只怎,沒有參數(shù)。kotlin的when支持沒有參數(shù)的條件跳轉(zhuǎn)怜俐,無參時(shí)需要各種條件都是一個(gè)boolean型表達(dá)式身堡, 參見這里
以ChannelFlowOperatorImpl為例來看一下
internal class ChannelFlowOperatorImpl<T>(
flow: Flow<T>,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.OPTIONAL_CHANNEL,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow) {
override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
ChannelFlowOperatorImpl(flow, context, capacity, onBufferOverflow)
override fun dropChannelOperators(): Flow<T> = flow
override suspend fun flowCollect(collector: FlowCollector<T>) =
flow.collect(collector)
}
這里沒什么有價(jià)值的代碼,由于ChannelFlowOperatorImpl繼承自ChannelFlowOperator看一下ChannelFlowOperator的代碼
internal abstract class ChannelFlowOperator<S, T>(
@JvmField protected val flow: Flow<S>,
context: CoroutineContext,
capacity: Int,
onBufferOverflow: BufferOverflow
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {
protected abstract suspend fun flowCollect(collector: FlowCollector<T>)
// Changes collecting context upstream to the specified newContext, while collecting in the original context
private suspend fun collectWithContextUndispatched(collector: FlowCollector<T>, newContext: CoroutineContext) {
val originalContextCollector = collector.withUndispatchedContextCollector(coroutineContext)
// invoke flowCollect(originalContextCollector) in the newContext
return withContextUndispatched(newContext, block = { flowCollect(it) }, value = originalContextCollector)
}
// Slow path when output channel is required
protected override suspend fun collectTo(scope: ProducerScope<T>) =
flowCollect(SendingCollector(scope))
// Optimizations for fast-path when channel creation is optional
override suspend fun collect(collector: FlowCollector<T>) {
// Fast-path: When channel creation is optional (flowOn/flowWith operators without buffer)
if (capacity == Channel.OPTIONAL_CHANNEL) {
val collectContext = coroutineContext
val newContext = collectContext + context // compute resulting collect context
// #1: If the resulting context happens to be the same as it was -- fallback to plain collect
if (newContext == collectContext)
return flowCollect(collector)
// #2: If we don't need to change the dispatcher we can go without channels
if (newContext[ContinuationInterceptor] == collectContext[ContinuationInterceptor])
return collectWithContextUndispatched(collector, newContext)
}
// Slow-path: create the actual channel
super.collect(collector)
}
// debug toString
override fun toString(): String = "$flow -> ${super.toString()}"
}
collect執(zhí)行的時(shí)候拍鲤,如果指定的協(xié)程與現(xiàn)在的不一致贴谎,則走collectWithContextUndispatched方法,走到下面這個(gè)方法
internal suspend fun <T, V> withContextUndispatched(
newContext: CoroutineContext,
value: V,
countOrElement: Any = threadContextElements(newContext), // can be precomputed for speed
block: suspend (V) -> T
): T =
suspendCoroutineUninterceptedOrReturn { uCont ->
withCoroutineContext(newContext, countOrElement) {
block.startCoroutineUninterceptedOrReturn(value, StackFrameContinuation(uCont, newContext))
}
}
withCoroutineContext這個(gè)方法就是協(xié)程切換的地方了季稳。