Flow源碼閱讀筆記

上篇看了Flow的基本用法橄妆,這篇文章就從源碼的角度來看看Flow的運(yùn)行機(jī)制

1.Flow創(chuàng)建
fun simpleFlow() = flow<Int> {  
    for (i in 1..3) {  
        delay(100)  
        emit(i)  
    }  
}

看一下flow函數(shù)的定義

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

參數(shù)類型為

@BuilderInference block: suspend FlowCollector<T>.() -> Unit

這里的參數(shù)慨绳,可以理解為 入?yún)⑹且粋€(gè)函數(shù)谆构,該函數(shù)是FlowCollector的一個(gè)擴(kuò)展函數(shù)署惯,沒有入?yún)⑿惺矝]有出參(返回值為Unit烫沙,相當(dāng)于java的void)匹层。對于這塊不理解的,可以參閱 這里

flow函數(shù)調(diào)用了 SafeFlow的構(gòu)造函數(shù)

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {  
    override suspend fun collectSafely(collector: FlowCollector<T>) {  
        collector.block()  
    }  
}

AbstractFlow 代碼也比較簡單锌蓄,稍后再說

到這里升筏,F(xiàn)low創(chuàng)建圓滿結(jié)束了

2.接收 collect 函數(shù)

前面介紹過,F(xiàn)low為冷流瘸爽,冷流不會(huì)發(fā)射數(shù)據(jù)您访,只有到了收集(末端操作符)的時(shí)候,數(shù)據(jù)才開始生產(chǎn)并被發(fā)射出去剪决。接下來就來看看emit和collect怎么發(fā)生的關(guān)聯(lián)灵汪。先來看一下collect函數(shù)

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =  
    collect(object : FlowCollector<T> {  
        override suspend fun emit(value: T) = action(value)  
    })

這里可以看出,F(xiàn)lowCollector的emit方法柑潦,實(shí)際上調(diào)用的是collect傳入的action方法享言。但是,我們創(chuàng)建Flow的FlowCollector是如何與collect方法傳入的FlowCollector產(chǎn)生關(guān)系的呢渗鬼?

關(guān)鍵就在于SafeFlow這個(gè)類

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {  
    override suspend fun collectSafely(collector: FlowCollector<T>) {  
       collector.block()  
    }  
}

AbstractFlow代碼如下

public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
    @InternalCoroutinesApi  
    public final override suspend fun collect(collector: FlowCollector<T>) {  
        val safeCollector = SafeCollector(collector, coroutineContext)  
        try {  
            collectSafely(safeCollector)  
        } finally {  
            safeCollector.releaseIntercepted()  
        }  
    }  

    /**  
     * Accepts the given [collector] and [emits][FlowCollector.emit] values into it.     *     * A valid implementation of this method has the following constraints:     * 1) It should not change the coroutine context (e.g. with `withContext(Dispatchers.IO)`) when emitting values.     *    The emission should happen in the context of the [collect] call.     *    Please refer to the top-level [Flow] documentation for more details.     * 2) It should serialize calls to [emit][FlowCollector.emit] as [FlowCollector] implementations are not     *    thread-safe by default.     *    To automatically serialize emissions [channelFlow] builder can be used instead of [flow]     *     * @throws IllegalStateException if any of the invariants are violated.  
     */    
    public abstract suspend fun collectSafely(collector: FlowCollector<T>)  
}

到這里可以看出SafeFlow的collect方法览露,實(shí)際調(diào)用的是collectSafely方法,最終是collect生成的FlowCollector調(diào)用創(chuàng)建時(shí)傳入的block方法譬胎。

有點(diǎn)繞差牛,再捋一遍。

flow構(gòu)造時(shí)银择,傳入FlowCollector的擴(kuò)展方法多糠,我們稱此方法為block

當(dāng)collect方法調(diào)用時(shí),傳入?yún)?shù)action浩考,首先將此action方法包裝成FlowCollector夹孔,我們稱之為safeCollector

而collect最終調(diào)用的為safeCollector.block

到此,我們就理解了,為什么Flow是冷流了搭伤,只有末端操作符才會(huì)調(diào)用其構(gòu)造時(shí)的block

3.協(xié)程切換flowOn方法

直接看源碼

public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {  
    checkFlowContext(context)  
    return when {  
        context == EmptyCoroutineContext -> this  
        this is FusibleFlow -> fuse(context = context)  
        else -> ChannelFlowOperatorImpl(this, context = context)  
    }  
}

這里的when方法比較有意思只怎,沒有參數(shù)。kotlin的when支持沒有參數(shù)的條件跳轉(zhuǎn)怜俐,無參時(shí)需要各種條件都是一個(gè)boolean型表達(dá)式身堡, 參見這里

以ChannelFlowOperatorImpl為例來看一下

internal class ChannelFlowOperatorImpl<T>(  
flow: Flow<T>,  
context: CoroutineContext = EmptyCoroutineContext,  
capacity: Int = Channel.OPTIONAL_CHANNEL,  
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND  
) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow) {  

    override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =  
        ChannelFlowOperatorImpl(flow, context, capacity, onBufferOverflow)
    override fun dropChannelOperators(): Flow<T> = flow  

    override suspend fun flowCollect(collector: FlowCollector<T>) =  
        flow.collect(collector)  
}

這里沒什么有價(jià)值的代碼,由于ChannelFlowOperatorImpl繼承自ChannelFlowOperator看一下ChannelFlowOperator的代碼

internal abstract class ChannelFlowOperator<S, T>(
    @JvmField protected val flow: Flow<S>,
    context: CoroutineContext,
    capacity: Int,
    onBufferOverflow: BufferOverflow
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {
    protected abstract suspend fun flowCollect(collector: FlowCollector<T>)

    // Changes collecting context upstream to the specified newContext, while collecting in the original context
    private suspend fun collectWithContextUndispatched(collector: FlowCollector<T>, newContext: CoroutineContext) {
        val originalContextCollector = collector.withUndispatchedContextCollector(coroutineContext)
        // invoke flowCollect(originalContextCollector) in the newContext
        return withContextUndispatched(newContext, block = { flowCollect(it) }, value = originalContextCollector)
    }

    // Slow path when output channel is required
    protected override suspend fun collectTo(scope: ProducerScope<T>) =
        flowCollect(SendingCollector(scope))

    // Optimizations for fast-path when channel creation is optional
    override suspend fun collect(collector: FlowCollector<T>) {
        // Fast-path: When channel creation is optional (flowOn/flowWith operators without buffer)
        if (capacity == Channel.OPTIONAL_CHANNEL) {
            val collectContext = coroutineContext
            val newContext = collectContext + context // compute resulting collect context
            // #1: If the resulting context happens to be the same as it was -- fallback to plain collect
            if (newContext == collectContext)
                return flowCollect(collector)
            // #2: If we don't need to change the dispatcher we can go without channels
            if (newContext[ContinuationInterceptor] == collectContext[ContinuationInterceptor])
                return collectWithContextUndispatched(collector, newContext)
        }
        // Slow-path: create the actual channel
        super.collect(collector)
    }

    // debug toString
    override fun toString(): String = "$flow -> ${super.toString()}"
}

collect執(zhí)行的時(shí)候拍鲤,如果指定的協(xié)程與現(xiàn)在的不一致贴谎,則走collectWithContextUndispatched方法,走到下面這個(gè)方法

internal suspend fun <T, V> withContextUndispatched(
    newContext: CoroutineContext,
    value: V,
    countOrElement: Any = threadContextElements(newContext), // can be precomputed for speed
    block: suspend (V) -> T
): T =
    suspendCoroutineUninterceptedOrReturn { uCont ->
        withCoroutineContext(newContext, countOrElement) {
            block.startCoroutineUninterceptedOrReturn(value, StackFrameContinuation(uCont, newContext))
        }
    }

withCoroutineContext這個(gè)方法就是協(xié)程切換的地方了季稳。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末擅这,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子景鼠,更是在濱河造成了極大的恐慌仲翎,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,755評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件铛漓,死亡現(xiàn)場離奇詭異溯香,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)浓恶,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門玫坛,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人问顷,你說我怎么就攤上這事昂秃≠魇幔” “怎么了杜窄?”我有些...
    開封第一講書人閱讀 165,138評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長算途。 經(jīng)常有香客問我塞耕,道長,這世上最難降的妖魔是什么嘴瓤? 我笑而不...
    開封第一講書人閱讀 58,791評(píng)論 1 295
  • 正文 為了忘掉前任扫外,我火速辦了婚禮,結(jié)果婚禮上廓脆,老公的妹妹穿的比我還像新娘筛谚。我一直安慰自己,他們只是感情好停忿,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評(píng)論 6 392
  • 文/花漫 我一把揭開白布驾讲。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪吮铭。 梳的紋絲不亂的頭發(fā)上时迫,一...
    開封第一講書人閱讀 51,631評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音谓晌,去河邊找鬼掠拳。 笑死,一個(gè)胖子當(dāng)著我的面吹牛纸肉,可吹牛的內(nèi)容都是我干的溺欧。 我是一名探鬼主播,決...
    沈念sama閱讀 40,362評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼柏肪,長吁一口氣:“原來是場噩夢啊……” “哼胧奔!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起预吆,我...
    開封第一講書人閱讀 39,264評(píng)論 0 276
  • 序言:老撾萬榮一對情侶失蹤龙填,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后拐叉,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體岩遗,經(jīng)...
    沈念sama閱讀 45,724評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年凤瘦,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了宿礁。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,040評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡蔬芥,死狀恐怖梆靖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情笔诵,我是刑警寧澤返吻,帶...
    沈念sama閱讀 35,742評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站乎婿,受9級(jí)特大地震影響测僵,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜谢翎,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評(píng)論 3 330
  • 文/蒙蒙 一捍靠、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧森逮,春花似錦榨婆、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽颜武。三九已至,卻和暖如春拖吼,著一層夾襖步出監(jiān)牢的瞬間鳞上,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評(píng)論 1 270
  • 我被黑心中介騙來泰國打工吊档, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留篙议,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,247評(píng)論 3 371
  • 正文 我出身青樓怠硼,卻偏偏與公主長得像鬼贱,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子香璃,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容