Kotlin SharedFlow 源碼解析

前言

對于SharedFlow使用可以看之前的文章 Kotlin SharedFlow 使用。在這篇文章中已經(jīng)通過多個demo實戰(zhàn)幫大家總結(jié)了SharedFlow的一些特性和使用場景印机。但是也遺留了一些疑惑咐刨,所以本文打算通過輕度閱讀源碼的方式給大家答疑解惑竭恬。放心殷蛇,不會從頭到尾的說。

SharedFlow源碼分析的重點

在源碼中可以分析的點很多直砂,但我認(rèn)為最重要的點有2點史辙。emit和collect是如何關(guān)聯(lián)的汹买,緩存機(jī)制是怎樣的。

從上一篇文章可以了解到emit和collect是掛起函數(shù)髓霞,但是否被掛起是有一定條件的卦睹。而且生產(chǎn)者,消費者出現(xiàn)的時機(jī)也會影響數(shù)據(jù)流的執(zhí)行方库,這些問題就是本文要研究的重點。

下面分析emit和collect的小節(jié)中還會提出幾個問題障斋,帶著大家通過解決問題的方式閱讀源碼纵潦。

回顧下之前文章的demo
為什么先發(fā)射數(shù)據(jù),再collect就收不到數(shù)據(jù)呢

 runBlocking {
        val sharedFlow = MutableSharedFlow<Int>()
        sharedFlow.emit(1)

        launch {
            sharedFlow.collect {
                println("collect: $it")
            }
        }
    }

為什么設(shè)置了replay緩存垃环,超出其緩存數(shù)量的時候邀层,會丟失前面的數(shù)據(jù),只能收到最新replay數(shù)量的數(shù)據(jù)呢遂庄?比如下面demo寥院,只能收到2和3

  runBlocking {
        //默認(rèn)參數(shù)情況,先emit涛目,再collect收不到數(shù)據(jù)
        val sharedFlow = MutableSharedFlow<Int>(replay = 2, extraBufferCapacity = 0)

        sharedFlow.emit(1)
        sharedFlow.emit(2)
        sharedFlow.emit(3)

        launch {
            sharedFlow.collect {
                println("collect: $it")
            }
        }
    }

emit發(fā)射數(shù)據(jù)

對于初學(xué)者來說秸谢,讀懂這個方法確實有一定難度,不過我換了一個角度來帶大家理解霹肝,從不同業(yè)務(wù)場景估蹄,使用角度來逐步分析,大致分為4種沫换。

  • 沒有收集者臭蚁,無replay緩存
  • 沒有收集者,有replay緩存(extraBufferCapacity緩存無影響)
  • 有收集者讯赏,無replay緩存
  • 有收集者垮兑,有replay緩存
  override suspend fun emit(value: T) {
//嘗試發(fā)送數(shù)據(jù),這是一個快速路徑,可以提高發(fā)送數(shù)據(jù)的效率漱挎。 
        if (tryEmit(value)) return // fast-path
//在掛起的協(xié)程中發(fā)送數(shù)據(jù)
        emitSuspend(value)
    }

emit方法先用不需要掛起的方式發(fā)數(shù)據(jù)系枪,發(fā)失敗之后采用掛起的方式發(fā)。

以不掛起的方式發(fā)射數(shù)據(jù)

 override fun tryEmit(value: T): Boolean {
        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
        val emitted = synchronized(this) {
//嘗試發(fā)射數(shù)據(jù)识樱,發(fā)射成功返回true嗤无,失敗返回false
            if (tryEmitLocked(value)) {
              //找到需要恢復(fù)的協(xié)程震束,并將結(jié)果保存到  resumes  數(shù)組中,
                resumes = findSlotsToResumeLocked(resumes)
                true
            } else {
                false
            }
        }
//上面已經(jīng)找到了需要恢復(fù)的協(xié)程当犯,這里只需要恢復(fù)協(xié)程的執(zhí)行
        for (cont in resumes) cont?.resume(Unit)
        return emitted
    }
 private fun tryEmitLocked(value: T): Boolean {
        // 沒有收集者垢村,一定返回true
        if (nCollectors == 0) return tryEmitNoCollectorsLocked(value) 
        // 有收集者,緩存區(qū)已滿嚎卫,超過replay + extraBufferCapacity數(shù)量嘉栓,且消費者沒有消費最舊的數(shù)據(jù)(replayIndex)
        if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
//執(zhí)行下面的緩存溢出策略
            when (onBufferOverflow) {
                BufferOverflow.SUSPEND -> return false // will suspend
                BufferOverflow.DROP_LATEST -> return true // just drop incoming
                BufferOverflow.DROP_OLDEST -> {} //丟棄最舊的數(shù)據(jù),這里暫不處理
            }
        }
//將數(shù)據(jù)加入到緩存數(shù)組中拓诸,這里不會掛起emit所在的協(xié)程
        enqueueLocked(value)
        bufferSize++ //緩存數(shù)組長度
        // 上面的緩存溢出策略侵佃,丟棄最老數(shù)據(jù)是沒做處理的,實際上延遲在這里處理
        if (bufferSize > bufferCapacity) dropOldestLocked()
        //  如果replayCache中數(shù)據(jù)的數(shù)量超過了最大容量
        if (replaySize > replay) {
// 更新replayIndex的值奠支,replayIndex向前移動一位
            updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
        }
        return true
    }

這個方法已經(jīng)涉及到shareflow緩存機(jī)制馋辈,所以有必要先來張圖大概了解下緩存機(jī)制。

image.png

shareflow緩存是個數(shù)組倍谜,大小由 bufferSize 控制迈螟,而緩存容量由 bufferCapacity 控制。緩存由3部分組成尔崔,replay緩存的數(shù)量答毫,extraBufferCapacity緩存的數(shù)量,這2部分加起來就是buffered values季春,還有就是掛起時候的Emitter對象洗搂。圖中還展示了兩個慢速收集器的位置,即可能收集緩存隊列中的值的最慢速度的收集器的位置载弄。這兩個位置分別由 minCollectorIndex 和 replayIndex 控制耘拇。

這個tryEmitLocked方法很重要褥芒,因為它可以解釋之前文章中一些業(yè)務(wù)場景的困惑回挽。 下面我們就來跟大家一起解剖下這個方法

tryEmitLocked方法沒有收集者

  //走到這里,說明沒有收集者
 private fun tryEmitNoCollectorsLocked(value: T): Boolean {
        assert { nCollectors == 0 }
//replay緩存為0衫贬,就丟棄數(shù)據(jù)尺碰,emit方法就結(jié)束了
        if (replay == 0) return true 
        enqueueLocked(value) // 加入到緩存數(shù)組
        bufferSize++ // value was added to buffer
         //若是emit發(fā)射的數(shù)量超過了重放個數(shù)挣棕,則丟棄最舊的值
        if (bufferSize > replay) dropOldestLocked()
        minCollectorIndex = head + bufferSize // a default value (max allowed)
        return true
    }

這段代碼也解釋了之前的2個疑惑

  • 在不配置replay緩存的情況下,先emit發(fā)數(shù)據(jù)再collect是收不到數(shù)據(jù)的
  • 在配置了replay的情況下亲桥,先emit再collect是能收到數(shù)據(jù)洛心,但是 emit發(fā)射的數(shù)量超過了replay的話,就只能收到最新的replay個數(shù)的數(shù)據(jù)

如下2段代碼题篷,運(yùn)行結(jié)果都只能收到2和3

 runBlocking {
        //默認(rèn)參數(shù)情況词身,先emit,再collect收不到數(shù)據(jù)
        val sharedFlow = MutableSharedFlow<Int>(replay = 2, extraBufferCapacity = 0)

        sharedFlow.emit(1)
        sharedFlow.emit(2)
        sharedFlow.emit(3)

        launch {
            sharedFlow.collect {
                println("collect: $it")
            }
        }
    }


 runBlocking {
        //這里配置了extraBufferCapacity根本不會起到效果
        val sharedFlow = MutableSharedFlow<Int>(replay = 2, extraBufferCapacity = 2)

        sharedFlow.emit(1)
        sharedFlow.emit(2)
        sharedFlow.emit(3)

        launch {
            sharedFlow.collect {
                println("collect: $it")
            }
        }
    }

從上面demo和tryEmitNoCollectorsLocked源碼分析可以看出:如果emit發(fā)射到buffered values的數(shù)據(jù)數(shù)量超過了replay的值番枚,會丟棄最舊的數(shù)據(jù)法严,保持buffered values中數(shù)據(jù)的數(shù)量最大為replay损敷。

當(dāng)有新的訂閱者時,會先從replayCache中獲取數(shù)據(jù)深啤,在buffered values中拗馒,replayCache前的數(shù)據(jù)只對已經(jīng)訂閱的訂閱者有用,而此時又沒有訂閱者溯街,因此緩存超過replayCache最大容量的數(shù)據(jù)只會占用更多內(nèi)存诱桂,是沒有意義的。記壮饰簟:沒有收集者時挥等,extraBufferCapacity是不會起作用的

tryEmitLocked方法有收集者
上面已經(jīng)分析了這個方法沒有收集者的情況,接下來就分析下有收集者的情況

 private fun tryEmitLocked(value: T): Boolean {
        // 沒有收集者堤尾,一定返回true
        if (nCollectors == 0) return tryEmitNoCollectorsLocked(value) 
        // 有收集者肝劲,緩存區(qū)已滿,超過replay + extraBufferCapacity數(shù)量哀峻,且消費者沒有消費最舊的數(shù)據(jù)(replayIndex)
        if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
//執(zhí)行下面的緩存溢出策略
            when (onBufferOverflow) {
                BufferOverflow.SUSPEND -> return false // will suspend
                BufferOverflow.DROP_LATEST -> return true // just drop incoming
                BufferOverflow.DROP_OLDEST -> {} //丟棄最舊的數(shù)據(jù)涡相,這里暫不處理
            }
        }
//將數(shù)據(jù)加入到緩存數(shù)組中,這里不會掛起emit所在的協(xié)程
        enqueueLocked(value)
        bufferSize++ //緩存數(shù)組長度
        // 上面的緩存溢出策略剩蟀,丟棄最老數(shù)據(jù)是沒做處理的,實際上延遲在這里處理
        if (bufferSize > bufferCapacity) dropOldestLocked()
        //  如果replayCache中數(shù)據(jù)的數(shù)量超過了最大容量
        if (replaySize > replay) {
// 更新replayIndex的值切威,replayIndex向前移動一位
            updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
        }
        return true
    }

有收集者育特,但沒有配置任何緩存

 runBlocking {
        //默認(rèn)參數(shù)情況,先emit先朦,再collect收不到數(shù)據(jù)
        val sharedFlow = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 0)

        launch {
            sharedFlow.collect {
                println("collect: $it")
            }
        }

        delay(200) //確保已經(jīng)訂閱
        sharedFlow.emit(1)
        sharedFlow.emit(2)
        sharedFlow.emit(3)
    }

上面demo缰冤,是能收到1,2喳魏,3的棉浸。
從源碼可以看出,tryEmitLocked方法中刺彩,有訂閱者的情況迷郑,即使沒有配置緩存也會執(zhí)行enqueueLocked(value)方法把數(shù)據(jù)加入到緩存數(shù)組。

有訂閱者创倔,且配置了replay或者extraBufferCapacity緩存嗡害,會多了一個緩存溢出策略。有3種策略畦攘,掛起協(xié)程霸妹,丟棄最老的數(shù)據(jù),丟棄最新的數(shù)據(jù)知押。

emitSuspend掛起方式發(fā)數(shù)據(jù)

private suspend fun emitSuspend(value: T) = suspendCancellableCoroutine<Unit> sc@{ cont ->
        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
        val emitter = synchronized(this) lock@{
            //再次檢查叹螟,確保緩存區(qū)滿了鹃骂,因為滿了才會執(zhí)行下面的邏輯
            if (tryEmitLocked(value)) {
                cont.resume(Unit)
                resumes = findSlotsToResumeLocked(resumes)
                return@lock null
            }
            // 創(chuàng)建Emitter,加入到buffer里
            //可以去看下前面的那張圖罢绽,Emitter是加到緩存區(qū)的什么位置的
            Emitter(this, head + totalSize, value, cont).also {
                //加到之前的緩存數(shù)組
                enqueueLocked(it)
                queueSize++ // Emitter是掛起的畏线,單獨記錄下數(shù)量
                // 如果buffered value緩存沒有數(shù)據(jù),則收集已經(jīng)掛起的訂閱者的續(xù)體有缆,保存到局部變量resumes中
                if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)
            }
        }
        // outside of the lock: register dispose on cancellation
        emitter?.let { cont.disposeOnCancellation(it) }
        // 恢復(fù)掛起的訂閱者
        for (r in resumes) r?.resume(Unit)
    }

這個方法比較簡單象踊,緩存區(qū)滿了,創(chuàng)建Emitter加到buffer緩存區(qū)棚壁。

小結(jié)

閱讀emit發(fā)射數(shù)據(jù)的流程杯矩,可以分2部分完成,不掛起發(fā)射和掛起發(fā)射袖外。

如果沒有收集者史隆,emit永遠(yuǎn)不會掛起。

如果有收集者曼验,并且buffered values緩存容量已滿并且最舊的數(shù)據(jù)沒有被消費泌射,則emit有機(jī)會被掛起,當(dāng)然這取決于你的溢出策略鬓照。

collect消費數(shù)據(jù)

 override suspend fun collect(collector: FlowCollector<T>): Nothing {
         //分配槽位
        val slot = allocateSlot()
        try {
            if (collector is SubscribedFlowCollector) collector.onSubscription()
            val collectorJob = currentCoroutineContext()[Job]
            while (true) {
                var newValue: Any?
                while (true) {
                    //嘗試獲取值熔酷,獲取到了就跳出循環(huán),獲取不到就掛起等待
                    newValue = tryTakeValue(slot) // attempt no-suspend fast path first
                    if (newValue !== NO_VALUE) break
                    awaitValue(slot) // await signal that the new value is available
                }
                //判斷訂閱者所在協(xié)程是否是存活
                collectorJob?.ensureActive()
              //回調(diào)到collect方法的lambda
                collector.emit(newValue as T)
            }
        } finally {
            freeSlot(slot)
        }
    }

關(guān)于slot的理解豺裆,可以看下圖


slot.png
  • 消費者開始collect拒秘,根據(jù)index找到buffer下標(biāo)為0的元素即為可以消費的元素;
  • 拿到0號數(shù)據(jù)后臭猜,slot.index=1躺酒,找到buffer下標(biāo)為1的元素
    index++,重復(fù)上訴步驟
 private fun tryTakeValue(slot: SharedFlowSlot): Any? {
        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
        val value = synchronized(this) {
            // 從slot中獲取index
           // index表示當(dāng)前應(yīng)該從緩存數(shù)組的index位置中獲取數(shù)據(jù)
            val index = tryPeekLocked(slot)
            if (index < 0) {
                //沒有數(shù)據(jù)蔑歌,返回空數(shù)據(jù)的標(biāo)識
                NO_VALUE
            } else {
                val oldIndex = slot.index
                //從緩存數(shù)組buffer中獲取index對應(yīng)的數(shù)據(jù)
                val newValue = getPeekedValueLockedAt(index)
              //slot索引加1羹应,表示獲取下個數(shù)據(jù)的位置
                slot.index = index + 1 // points to the next index after peeked one
                resumes = updateCollectorIndexLocked(oldIndex)
                newValue
            }
        }
      //恢復(fù)協(xié)程
        for (resume in resumes) resume?.resume(Unit)
        return value
    }

tryPeekLocked方法,是判斷數(shù)據(jù)所在的位置是否符合要求次屠。

 private suspend fun awaitValue(slot: SharedFlowSlot): Unit = suspendCancellableCoroutine { cont ->
        synchronized(this) lock@{
            //再次檢查index
            val index = tryPeekLocked(slot) // recheck under this lock
            if (index < 0) {
            //保存續(xù)體cont到slot
                slot.cont = cont // Ok -- suspending
            } else {
                //說明有值园匹,不需要再繼續(xù)掛起了,通過resume恢復(fù)協(xié)程
                cont.resume(Unit) // has value, no need to suspend
                return@lock
            }
            slot.cont = cont // suspend, waiting
        }
    }

suspendCancellableCoroutine 是一個掛起函數(shù)帅矗,用于創(chuàng)建可取消的協(xié)程偎肃。在協(xié)程中調(diào)用 suspendCancellableCoroutine 函數(shù)時,它會創(chuàng)建一個 CancellableContinuation 對象浑此,并將其傳遞給一個lambda表達(dá)式累颂。該lambda表達(dá)式中的代碼可以使用 suspend 關(guān)鍵字掛起當(dāng)前協(xié)程,并在某個條件滿足時或協(xié)程被取消時恢復(fù)協(xié)程。

collect小結(jié)
collect方法會構(gòu)造Slot對象紊馏,然后開啟死循環(huán)去不斷匹配緩存區(qū)的數(shù)據(jù)料饥。具體是根據(jù)Slot 中的index匹配緩存區(qū)buffer中的數(shù)據(jù),如果匹配到了朱监,執(zhí)行collect閉包岸啡;匹配不到就掛起協(xié)程,掛起的協(xié)程會在有新數(shù)據(jù)時被生產(chǎn)者所恢復(fù)赫编。無論是否有生產(chǎn)者巡蘸,只要沒拿到數(shù)據(jù),collect都會被掛起擂送。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末悦荒,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子嘹吨,更是在濱河造成了極大的恐慌搬味,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,682評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蟀拷,死亡現(xiàn)場離奇詭異碰纬,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)问芬,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評論 3 395
  • 文/潘曉璐 我一進(jìn)店門悦析,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人此衅,你說我怎么就攤上這事她按。” “怎么了炕柔?”我有些...
    開封第一講書人閱讀 165,083評論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長媒佣。 經(jīng)常有香客問我匕累,道長,這世上最難降的妖魔是什么默伍? 我笑而不...
    開封第一講書人閱讀 58,763評論 1 295
  • 正文 為了忘掉前任欢嘿,我火速辦了婚禮,結(jié)果婚禮上也糊,老公的妹妹穿的比我還像新娘炼蹦。我一直安慰自己,他們只是感情好狸剃,可當(dāng)我...
    茶點故事閱讀 67,785評論 6 392
  • 文/花漫 我一把揭開白布掐隐。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪虑省。 梳的紋絲不亂的頭發(fā)上匿刮,一...
    開封第一講書人閱讀 51,624評論 1 305
  • 那天,我揣著相機(jī)與錄音探颈,去河邊找鬼熟丸。 笑死,一個胖子當(dāng)著我的面吹牛伪节,可吹牛的內(nèi)容都是我干的光羞。 我是一名探鬼主播,決...
    沈念sama閱讀 40,358評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼怀大,長吁一口氣:“原來是場噩夢啊……” “哼纱兑!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起叉寂,我...
    開封第一講書人閱讀 39,261評論 0 276
  • 序言:老撾萬榮一對情侶失蹤萍启,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后屏鳍,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體勘纯,經(jīng)...
    沈念sama閱讀 45,722評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年钓瞭,在試婚紗的時候發(fā)現(xiàn)自己被綠了驳遵。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,030評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡山涡,死狀恐怖堤结,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情鸭丛,我是刑警寧澤竞穷,帶...
    沈念sama閱讀 35,737評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站鳞溉,受9級特大地震影響瘾带,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜熟菲,卻給世界環(huán)境...
    茶點故事閱讀 41,360評論 3 330
  • 文/蒙蒙 一看政、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧抄罕,春花似錦允蚣、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,941評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至,卻和暖如春谴垫,著一層夾襖步出監(jiān)牢的瞬間章母,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,057評論 1 270
  • 我被黑心中介騙來泰國打工翩剪, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留乳怎,地道東北人。 一個月前我還...
    沈念sama閱讀 48,237評論 3 371
  • 正文 我出身青樓前弯,卻偏偏與公主長得像蚪缀,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子恕出,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,976評論 2 355

推薦閱讀更多精彩內(nèi)容