前言
對于SharedFlow使用可以看之前的文章 Kotlin SharedFlow 使用。在這篇文章中已經(jīng)通過多個demo實戰(zhàn)幫大家總結(jié)了SharedFlow的一些特性和使用場景印机。但是也遺留了一些疑惑咐刨,所以本文打算通過輕度閱讀源碼的方式給大家答疑解惑竭恬。放心殷蛇,不會從頭到尾的說。
SharedFlow源碼分析的重點
在源碼中可以分析的點很多直砂,但我認(rèn)為最重要的點有2點史辙。emit和collect是如何關(guān)聯(lián)的汹买,緩存機(jī)制是怎樣的。
從上一篇文章可以了解到emit和collect是掛起函數(shù)髓霞,但是否被掛起是有一定條件的卦睹。而且生產(chǎn)者,消費者出現(xiàn)的時機(jī)也會影響數(shù)據(jù)流的執(zhí)行方库,這些問題就是本文要研究的重點。
下面分析emit和collect的小節(jié)中還會提出幾個問題障斋,帶著大家通過解決問題的方式閱讀源碼纵潦。
回顧下之前文章的demo
為什么先發(fā)射數(shù)據(jù),再collect就收不到數(shù)據(jù)呢
runBlocking {
val sharedFlow = MutableSharedFlow<Int>()
sharedFlow.emit(1)
launch {
sharedFlow.collect {
println("collect: $it")
}
}
}
為什么設(shè)置了replay緩存垃环,超出其緩存數(shù)量的時候邀层,會丟失前面的數(shù)據(jù),只能收到最新replay數(shù)量的數(shù)據(jù)呢遂庄?比如下面demo寥院,只能收到2和3
runBlocking {
//默認(rèn)參數(shù)情況,先emit涛目,再collect收不到數(shù)據(jù)
val sharedFlow = MutableSharedFlow<Int>(replay = 2, extraBufferCapacity = 0)
sharedFlow.emit(1)
sharedFlow.emit(2)
sharedFlow.emit(3)
launch {
sharedFlow.collect {
println("collect: $it")
}
}
}
emit發(fā)射數(shù)據(jù)
對于初學(xué)者來說秸谢,讀懂這個方法確實有一定難度,不過我換了一個角度來帶大家理解霹肝,從不同業(yè)務(wù)場景估蹄,使用角度來逐步分析,大致分為4種沫换。
- 沒有收集者臭蚁,無replay緩存
- 沒有收集者,有replay緩存(extraBufferCapacity緩存無影響)
- 有收集者讯赏,無replay緩存
- 有收集者垮兑,有replay緩存
override suspend fun emit(value: T) {
//嘗試發(fā)送數(shù)據(jù),這是一個快速路徑,可以提高發(fā)送數(shù)據(jù)的效率漱挎。
if (tryEmit(value)) return // fast-path
//在掛起的協(xié)程中發(fā)送數(shù)據(jù)
emitSuspend(value)
}
emit方法先用不需要掛起的方式發(fā)數(shù)據(jù)系枪,發(fā)失敗之后采用掛起的方式發(fā)。
以不掛起的方式發(fā)射數(shù)據(jù)
override fun tryEmit(value: T): Boolean {
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
val emitted = synchronized(this) {
//嘗試發(fā)射數(shù)據(jù)识樱,發(fā)射成功返回true嗤无,失敗返回false
if (tryEmitLocked(value)) {
//找到需要恢復(fù)的協(xié)程震束,并將結(jié)果保存到 resumes 數(shù)組中,
resumes = findSlotsToResumeLocked(resumes)
true
} else {
false
}
}
//上面已經(jīng)找到了需要恢復(fù)的協(xié)程当犯,這里只需要恢復(fù)協(xié)程的執(zhí)行
for (cont in resumes) cont?.resume(Unit)
return emitted
}
private fun tryEmitLocked(value: T): Boolean {
// 沒有收集者垢村,一定返回true
if (nCollectors == 0) return tryEmitNoCollectorsLocked(value)
// 有收集者,緩存區(qū)已滿嚎卫,超過replay + extraBufferCapacity數(shù)量嘉栓,且消費者沒有消費最舊的數(shù)據(jù)(replayIndex)
if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
//執(zhí)行下面的緩存溢出策略
when (onBufferOverflow) {
BufferOverflow.SUSPEND -> return false // will suspend
BufferOverflow.DROP_LATEST -> return true // just drop incoming
BufferOverflow.DROP_OLDEST -> {} //丟棄最舊的數(shù)據(jù),這里暫不處理
}
}
//將數(shù)據(jù)加入到緩存數(shù)組中拓诸,這里不會掛起emit所在的協(xié)程
enqueueLocked(value)
bufferSize++ //緩存數(shù)組長度
// 上面的緩存溢出策略侵佃,丟棄最老數(shù)據(jù)是沒做處理的,實際上延遲在這里處理
if (bufferSize > bufferCapacity) dropOldestLocked()
// 如果replayCache中數(shù)據(jù)的數(shù)量超過了最大容量
if (replaySize > replay) {
// 更新replayIndex的值奠支,replayIndex向前移動一位
updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
}
return true
}
這個方法已經(jīng)涉及到shareflow緩存機(jī)制馋辈,所以有必要先來張圖大概了解下緩存機(jī)制。
shareflow緩存是個數(shù)組倍谜,大小由 bufferSize 控制迈螟,而緩存容量由 bufferCapacity 控制。緩存由3部分組成尔崔,replay緩存的數(shù)量答毫,extraBufferCapacity緩存的數(shù)量,這2部分加起來就是buffered values季春,還有就是掛起時候的Emitter對象洗搂。圖中還展示了兩個慢速收集器的位置,即可能收集緩存隊列中的值的最慢速度的收集器的位置载弄。這兩個位置分別由 minCollectorIndex 和 replayIndex 控制耘拇。
這個tryEmitLocked
方法很重要褥芒,因為它可以解釋之前文章中一些業(yè)務(wù)場景的困惑回挽。 下面我們就來跟大家一起解剖下這個方法
tryEmitLocked方法沒有收集者
//走到這里,說明沒有收集者
private fun tryEmitNoCollectorsLocked(value: T): Boolean {
assert { nCollectors == 0 }
//replay緩存為0衫贬,就丟棄數(shù)據(jù)尺碰,emit方法就結(jié)束了
if (replay == 0) return true
enqueueLocked(value) // 加入到緩存數(shù)組
bufferSize++ // value was added to buffer
//若是emit發(fā)射的數(shù)量超過了重放個數(shù)挣棕,則丟棄最舊的值
if (bufferSize > replay) dropOldestLocked()
minCollectorIndex = head + bufferSize // a default value (max allowed)
return true
}
這段代碼也解釋了之前的2個疑惑
- 在不配置replay緩存的情況下,先emit發(fā)數(shù)據(jù)再collect是收不到數(shù)據(jù)的
- 在配置了replay的情況下亲桥,先emit再collect是能收到數(shù)據(jù)洛心,但是 emit發(fā)射的數(shù)量超過了replay的話,就只能收到最新的replay個數(shù)的數(shù)據(jù)
如下2段代碼题篷,運(yùn)行結(jié)果都只能收到2和3
runBlocking {
//默認(rèn)參數(shù)情況词身,先emit,再collect收不到數(shù)據(jù)
val sharedFlow = MutableSharedFlow<Int>(replay = 2, extraBufferCapacity = 0)
sharedFlow.emit(1)
sharedFlow.emit(2)
sharedFlow.emit(3)
launch {
sharedFlow.collect {
println("collect: $it")
}
}
}
runBlocking {
//這里配置了extraBufferCapacity根本不會起到效果
val sharedFlow = MutableSharedFlow<Int>(replay = 2, extraBufferCapacity = 2)
sharedFlow.emit(1)
sharedFlow.emit(2)
sharedFlow.emit(3)
launch {
sharedFlow.collect {
println("collect: $it")
}
}
}
從上面demo和
tryEmitNoCollectorsLocked
源碼分析可以看出:如果emit
發(fā)射到buffered values的數(shù)據(jù)數(shù)量超過了replay的值番枚,會丟棄最舊的數(shù)據(jù)法严,保持buffered values中數(shù)據(jù)的數(shù)量最大為replay损敷。
當(dāng)有新的訂閱者時,會先從replayCache中獲取數(shù)據(jù)深啤,在buffered values中拗馒,replayCache前的數(shù)據(jù)只對已經(jīng)訂閱的訂閱者有用,而此時又沒有訂閱者溯街,因此緩存超過replayCache最大容量的數(shù)據(jù)只會占用更多內(nèi)存诱桂,是沒有意義的。記壮饰簟:沒有收集者時挥等,extraBufferCapacity是不會起作用的
tryEmitLocked方法有收集者
上面已經(jīng)分析了這個方法沒有收集者的情況,接下來就分析下有收集者的情況
private fun tryEmitLocked(value: T): Boolean {
// 沒有收集者堤尾,一定返回true
if (nCollectors == 0) return tryEmitNoCollectorsLocked(value)
// 有收集者肝劲,緩存區(qū)已滿,超過replay + extraBufferCapacity數(shù)量哀峻,且消費者沒有消費最舊的數(shù)據(jù)(replayIndex)
if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
//執(zhí)行下面的緩存溢出策略
when (onBufferOverflow) {
BufferOverflow.SUSPEND -> return false // will suspend
BufferOverflow.DROP_LATEST -> return true // just drop incoming
BufferOverflow.DROP_OLDEST -> {} //丟棄最舊的數(shù)據(jù)涡相,這里暫不處理
}
}
//將數(shù)據(jù)加入到緩存數(shù)組中,這里不會掛起emit所在的協(xié)程
enqueueLocked(value)
bufferSize++ //緩存數(shù)組長度
// 上面的緩存溢出策略剩蟀,丟棄最老數(shù)據(jù)是沒做處理的,實際上延遲在這里處理
if (bufferSize > bufferCapacity) dropOldestLocked()
// 如果replayCache中數(shù)據(jù)的數(shù)量超過了最大容量
if (replaySize > replay) {
// 更新replayIndex的值切威,replayIndex向前移動一位
updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
}
return true
}
有收集者育特,但沒有配置任何緩存
runBlocking {
//默認(rèn)參數(shù)情況,先emit先朦,再collect收不到數(shù)據(jù)
val sharedFlow = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 0)
launch {
sharedFlow.collect {
println("collect: $it")
}
}
delay(200) //確保已經(jīng)訂閱
sharedFlow.emit(1)
sharedFlow.emit(2)
sharedFlow.emit(3)
}
上面demo缰冤,是能收到1,2喳魏,3的棉浸。
從源碼可以看出,tryEmitLocked
方法中刺彩,有訂閱者的情況迷郑,即使沒有配置緩存也會執(zhí)行enqueueLocked(value)方法把數(shù)據(jù)加入到緩存數(shù)組。
有訂閱者创倔,且配置了replay或者extraBufferCapacity緩存嗡害,會多了一個緩存溢出策略。有3種策略畦攘,掛起協(xié)程霸妹,丟棄最老的數(shù)據(jù),丟棄最新的數(shù)據(jù)知押。
emitSuspend掛起方式發(fā)數(shù)據(jù)
private suspend fun emitSuspend(value: T) = suspendCancellableCoroutine<Unit> sc@{ cont ->
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
val emitter = synchronized(this) lock@{
//再次檢查叹螟,確保緩存區(qū)滿了鹃骂,因為滿了才會執(zhí)行下面的邏輯
if (tryEmitLocked(value)) {
cont.resume(Unit)
resumes = findSlotsToResumeLocked(resumes)
return@lock null
}
// 創(chuàng)建Emitter,加入到buffer里
//可以去看下前面的那張圖罢绽,Emitter是加到緩存區(qū)的什么位置的
Emitter(this, head + totalSize, value, cont).also {
//加到之前的緩存數(shù)組
enqueueLocked(it)
queueSize++ // Emitter是掛起的畏线,單獨記錄下數(shù)量
// 如果buffered value緩存沒有數(shù)據(jù),則收集已經(jīng)掛起的訂閱者的續(xù)體有缆,保存到局部變量resumes中
if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)
}
}
// outside of the lock: register dispose on cancellation
emitter?.let { cont.disposeOnCancellation(it) }
// 恢復(fù)掛起的訂閱者
for (r in resumes) r?.resume(Unit)
}
這個方法比較簡單象踊,緩存區(qū)滿了,創(chuàng)建Emitter加到buffer緩存區(qū)棚壁。
小結(jié)
閱讀emit發(fā)射數(shù)據(jù)的流程杯矩,可以分2部分完成,不掛起發(fā)射和掛起發(fā)射袖外。
如果沒有收集者史隆,emit永遠(yuǎn)不會掛起。
如果有收集者曼验,并且buffered values緩存容量已滿并且最舊的數(shù)據(jù)沒有被消費泌射,則emit有機(jī)會被掛起,當(dāng)然這取決于你的溢出策略鬓照。
collect消費數(shù)據(jù)
override suspend fun collect(collector: FlowCollector<T>): Nothing {
//分配槽位
val slot = allocateSlot()
try {
if (collector is SubscribedFlowCollector) collector.onSubscription()
val collectorJob = currentCoroutineContext()[Job]
while (true) {
var newValue: Any?
while (true) {
//嘗試獲取值熔酷,獲取到了就跳出循環(huán),獲取不到就掛起等待
newValue = tryTakeValue(slot) // attempt no-suspend fast path first
if (newValue !== NO_VALUE) break
awaitValue(slot) // await signal that the new value is available
}
//判斷訂閱者所在協(xié)程是否是存活
collectorJob?.ensureActive()
//回調(diào)到collect方法的lambda
collector.emit(newValue as T)
}
} finally {
freeSlot(slot)
}
}
關(guān)于slot的理解豺裆,可以看下圖
- 消費者開始collect拒秘,根據(jù)index找到buffer下標(biāo)為0的元素即為可以消費的元素;
- 拿到0號數(shù)據(jù)后臭猜,slot.index=1躺酒,找到buffer下標(biāo)為1的元素
index++,重復(fù)上訴步驟
private fun tryTakeValue(slot: SharedFlowSlot): Any? {
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
val value = synchronized(this) {
// 從slot中獲取index
// index表示當(dāng)前應(yīng)該從緩存數(shù)組的index位置中獲取數(shù)據(jù)
val index = tryPeekLocked(slot)
if (index < 0) {
//沒有數(shù)據(jù)蔑歌,返回空數(shù)據(jù)的標(biāo)識
NO_VALUE
} else {
val oldIndex = slot.index
//從緩存數(shù)組buffer中獲取index對應(yīng)的數(shù)據(jù)
val newValue = getPeekedValueLockedAt(index)
//slot索引加1羹应,表示獲取下個數(shù)據(jù)的位置
slot.index = index + 1 // points to the next index after peeked one
resumes = updateCollectorIndexLocked(oldIndex)
newValue
}
}
//恢復(fù)協(xié)程
for (resume in resumes) resume?.resume(Unit)
return value
}
tryPeekLocked
方法,是判斷數(shù)據(jù)所在的位置是否符合要求次屠。
private suspend fun awaitValue(slot: SharedFlowSlot): Unit = suspendCancellableCoroutine { cont ->
synchronized(this) lock@{
//再次檢查index
val index = tryPeekLocked(slot) // recheck under this lock
if (index < 0) {
//保存續(xù)體cont到slot
slot.cont = cont // Ok -- suspending
} else {
//說明有值园匹,不需要再繼續(xù)掛起了,通過resume恢復(fù)協(xié)程
cont.resume(Unit) // has value, no need to suspend
return@lock
}
slot.cont = cont // suspend, waiting
}
}
suspendCancellableCoroutine
是一個掛起函數(shù)帅矗,用于創(chuàng)建可取消的協(xié)程偎肃。在協(xié)程中調(diào)用suspendCancellableCoroutine
函數(shù)時,它會創(chuàng)建一個CancellableContinuation
對象浑此,并將其傳遞給一個lambda表達(dá)式累颂。該lambda表達(dá)式中的代碼可以使用suspend
關(guān)鍵字掛起當(dāng)前協(xié)程,并在某個條件滿足時或協(xié)程被取消時恢復(fù)協(xié)程。
collect小結(jié)
collect方法會構(gòu)造Slot對象紊馏,然后開啟死循環(huán)去不斷匹配緩存區(qū)的數(shù)據(jù)料饥。具體是根據(jù)Slot 中的index匹配緩存區(qū)buffer中的數(shù)據(jù),如果匹配到了朱监,執(zhí)行collect閉包岸啡;匹配不到就掛起協(xié)程,掛起的協(xié)程會在有新數(shù)據(jù)時被生產(chǎn)者所恢復(fù)赫编。無論是否有生產(chǎn)者巡蘸,只要沒拿到數(shù)據(jù),collect都會被掛起擂送。