前一節(jié)(Kotlin 學習筆記(六)—— Flow 數(shù)據(jù)流學習實踐指北(二)StateFlow 與 SharedFlow)介紹完了兩種熱流的構(gòu)造方法以及它們的特點惩猫,那有沒有方法可以將冷流轉(zhuǎn)化為熱流呢?當然是有的。那為什么需要將冷流轉(zhuǎn)化為熱流呢如迟?
假如有這么一個場景:一開始有一個冷流 coldFlow 和它對應(yīng)的消費者诀拭,后來下游又有幾個新來的消費者需要使用這個 coldFlow摸吠,并且還需要之前已發(fā)送過的數(shù)據(jù)纬凤。而冷流的生產(chǎn)者與消費者是一對一的關(guān)系姿搜,且沒有 replay
緩存機制哀托,為新的消費者再創(chuàng)建一個冷流開銷較大,這種情況下將冷流轉(zhuǎn)為熱流就顯得事半功倍了劳秋。
1. shareIn 操作符
Flow 中的 shareIn
操作符就可以將冷流轉(zhuǎn)為熱流仓手,它的方法聲明是:
// code 1
public fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
): SharedFlow<T>
首先看返回值,最終確實會轉(zhuǎn)化為一個熱流 SharedFlow 實例玻淑。方法參數(shù)先來看最簡單的 replay
參數(shù)嗽冒,就是設(shè)置回播到每個新增消費者的數(shù)據(jù)個數(shù),默認為 0补履。所以默認情況下添坊,新增的消費者只能收到從它開始收集的時間點之后,生產(chǎn)者發(fā)送的數(shù)據(jù)箫锤。
再來看第一個 scope
參數(shù)贬蛙,用于設(shè)置一個 CoroutineScope 作用域雨女,注意其生命周期的長度需要比任何消費者都要長,保證被轉(zhuǎn)化成的熱流能在所有消費者收集數(shù)據(jù)進行消費時阳准,都能處于活躍狀態(tài)氛堕。新被轉(zhuǎn)化的熱流其實就是一個共享數(shù)據(jù)流,可以被所有的消費者共享使用野蝇。
第二個參數(shù) started
復(fù)雜一些讼稚,它是用于設(shè)置被轉(zhuǎn)化為共享數(shù)據(jù)流的啟動方式,官方提供有 3 種方式绕沈,下面一個個說:
SharingStarted.Eagerly
勤快式啟動方式锐想。不等第一個消費者出現(xiàn)就會立即啟動,需要注意的是乍狐,這種方式只會保留啟動時數(shù)據(jù)流發(fā)送的前 replay
個數(shù)據(jù)赠摇,再之前的數(shù)據(jù)會立即丟棄。即不對數(shù)據(jù)流緩存區(qū)以外的數(shù)據(jù)負責浅蚪,所以 replay
緩存區(qū)大小設(shè)置很重要藕帜。
SharingStarted.Lazily
懶漢式啟動方式。需要等第一個消費者出現(xiàn)才會啟動掘鄙,第一個消費者可以接收到數(shù)據(jù)流所有發(fā)送的數(shù)據(jù)耘戚;但其他后面的消費者只能接收到最近的 replay
個數(shù)據(jù)。這種方式啟動的數(shù)據(jù)流會一直保持活躍狀態(tài)操漠,甚至所有的的消費者都退出觀察不再接收了收津,數(shù)據(jù)流仍然會緩存最近的 replay
個數(shù)據(jù)。
SharingStarted.WhileSubscribed()
靈活式啟動方式浊伙。默認情況下就是有消費者來它就立即啟動撞秋,沒消費者接收了它就立即停止。所以在第一個消費者出現(xiàn)數(shù)據(jù)流就啟動嚣鄙,當最后一個消費者退出它就立即停止吻贿,但它仍會永久緩存最近的 replay
個數(shù)據(jù)。此外哑子,這種啟動方式還可以根據(jù)需求自定義設(shè)置參數(shù):
// code 2
public fun WhileSubscribed(
stopTimeoutMillis: Long = 0,
replayExpirationMillis: Long = Long.MAX_VALUE
): SharingStarted =
StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)
stopTimeoutMillis:設(shè)置最后一個消費者退出后舅列,多長時間后再關(guān)閉數(shù)據(jù)流。默認是 0卧蜓,即立即關(guān)閉帐要。
replayExpirationMillis:設(shè)置關(guān)閉流之后等待多長時間后,再重置清空緩存區(qū) replay cache
的數(shù)據(jù)弥奸。默認是 Long.MAX_VALUE
榨惠,即永遠保存。
自定義 SharingStarted
其實還可以自定義啟動方式,自己實現(xiàn) SharingStarted
接口即可赠橙。如果看了前三種啟動方式的源碼耽装,不難會發(fā)現(xiàn),其實啟動方式都是使用固定的幾個 SharingCommand
實現(xiàn)的期揪。SharingCommand
有三種:
// code 3
public enum class SharingCommand {
/**
* 開始啟動掉奄,并開始收集上游數(shù)據(jù)流.
* 多次發(fā)送這個命令并沒有什么用(支持防抖),如果先發(fā)送 STOP 再發(fā)送 START 則是重啟一個上游數(shù)據(jù)流横侦。
*/
START,
/**
* 停止數(shù)據(jù)流, 取消上游數(shù)據(jù)流的收集所在協(xié)程挥萌。
*/
STOP,
/**
* 停止數(shù)據(jù)流, 取消上游數(shù)據(jù)流的收集所在協(xié)程绰姻。并且將 replayCache 緩沖區(qū)的值重置為初始狀態(tài)枉侧。
* 如果是 shareIn 操作符,則會調(diào)用 [MutableSharedFlow.resetReplayCache] 方法;
* 如果是 stateIn 操作符狂芋,則會將緩沖數(shù)據(jù)重置為最初設(shè)置的初始值.
*/
STOP_AND_RESET_REPLAY_CACHE
}
感興趣的同學可以看看 SharingStarted.WhileSubscribed()
的具體實現(xiàn)類 StartedWhileSubscribed
里面的源碼榨馁。如果需要自定義啟動方式,照著葫蘆畫瓢即可帜矾。
既然有 shareIn
翼虫,那自然就少不了 stateIn
了。
2. stateIn 操作符
方法聲明:
// code 4
public fun <T> Flow<T>.stateIn(
scope: CoroutineScope,
started: SharingStarted,
initialValue: T
): StateFlow<T>
首先可以看出返回值是一個熱流 StateFlow 實例屡萤,那么自然而然就需要一個參數(shù)給它設(shè)置一個初始值珍剑,即第三個參數(shù) initialValue
。 前兩個參數(shù)與 shareIn
一樣死陆,這里就不再贅述招拙。
3. shareIn 與 stateIn 使用指北
3.1 SharingStarted.WhileSubscribed() 實際使用
從上面的介紹可知,這種啟動方式可以在沒有消費者時自動取消上游數(shù)據(jù)流措译,從而避免資源的浪費别凤。但在實際使用中,建議使用 SharingStarted.WhileSubscribed(5000)
领虹,即在最后一個消費者停止后再保持數(shù)據(jù)流 5 秒鐘的活躍狀態(tài)规哪。避免在某些特定情況下(如配置改變——最常見就是橫豎屏切換、暗夜模式切換)重啟上游的數(shù)據(jù)流塌衰。
3.2 shareIn诉稍、stateIn 適用于屬性聲明而非方法返回值
shareIn
和 stateIn
都會創(chuàng)建一個新的數(shù)據(jù)流,具體說就是 shareIn
會構(gòu)建一個 ReadonlySharedFlow 實例最疆;stateIn
則會構(gòu)建一個 ReadonlyStateFlow 實例杯巨。而新創(chuàng)建的數(shù)據(jù)流會一直保存在內(nèi)存中,直到傳入數(shù)據(jù)流的作用域被取消或者沒有任何引用時才會被 GC 回收肚菠。
所以下面代碼中舔箭,前一部分代碼是禁止使用的,正確的使用應(yīng)該是如后一部分的代碼,即在屬性中使用层扶。
// code 5
//錯誤示例:每次調(diào)用方法都會構(gòu)建新的數(shù)據(jù)流
fun getUser(): Flow<User> =
userLocalDataSource.getUser()
.shareIn(externalScope, WhileSubscribed())
//正確示例:在屬性中使用 shareIn 或 stateIn
val user: Flow<User> =
userLocalDataSource.getUser().shareIn(externalScope, WhileSubscribed())
3.3 MutableSharedFlow 的 subscriptionCount 參數(shù)
這個參數(shù)表示的是 MutableSharedFlow 中活躍的消費者數(shù)目箫章,即訂閱者的個數(shù)【祷幔可用于監(jiān)聽消費者的數(shù)目變更檬寂,下面就是一個例子:
// code 6
sharedFlow.subscriptionCount
.map { count -> count > 0 } // count > 0 說明有消費者,返回 true戳表;= 0 說明沒有消費者了桶至,返回 false
.distinctUntilChanged() // only react to true<->false changes
.onEach { isActive -> // configure an action
if (isActive) { // do something } else { // do something }
}
.launchIn(scope) // launch it
這個例子可以在有消費者收集數(shù)據(jù)流時,做一些自己的操作匾旭;當所有消費者都停止收集時镣屹,再處理另外的一些操作,比如資源回收等价涝。
distinctUntilChanged
操作符比較面生女蜈,它就是過濾掉前面接收到的重復(fù)值,從而使得后面只會接收到發(fā)生了變化的新值色瘩,和 StateFlow 特性一樣伪窖。
onEach
操作符也比較常見,可以在流上新增一些處理操作居兆,再發(fā)給下游覆山。
3.4 與操作符的搭配使用
如果在實際使用中,需要得知上游數(shù)據(jù)流的一些狀態(tài)泥栖,比如開始簇宽、完成等,則需要在上游數(shù)據(jù)流轉(zhuǎn)為熱流之前添加一些操作符起到監(jiān)聽的作用聊倔。
onStart 操作符監(jiān)聽啟動晦毙,onCompletion 操作符監(jiān)聽完成
// code 7
private fun shareInOnStartDemo() {
val testFlow = flow {
println("++++emit before")
emit(4)
delay(1000)
emit(5)
delay(1000)
emit(6)
}.onStart {
emit(-1)
println("++++ onStart")
}.onCompletion {
emit(-100)
println("++++ onCompletion")
}.shareIn(
lifecycleScope,
SharingStarted.WhileSubscribed(),
8
)
lifecycleScope.launch {
testFlow.collect {
println("++++ collector receive $it")
}
}
}
從打印的 log 可以看到,確實可以監(jiān)聽狀態(tài)耙蔑。當然也可以在相同的位置添加 catch
操作符用于監(jiān)聽異常的發(fā)生见妒,感興趣的同學可以試試看。
4. StateFlow 代碼實戰(zhàn)
說了這么多 Flow 的東西甸陌,最后以一個實際的例子結(jié)束這一章節(jié)的學習筆記吧须揣!
下面我將用一個應(yīng)用實例來講解 StateFlow 的實際應(yīng)用。這個例子將會用到 debounce
钱豁、distinctUnitChanged
耻卡、flatMapLatest
等操作符,用這些操作符去實現(xiàn)一個文本輸入中實時查詢的例子牲尺。
假設(shè)有個需求卵酪,要實現(xiàn)一個瀏覽器搜索的功能幌蚊,根據(jù)用戶不斷輸入的字符去查詢相關(guān)的內(nèi)容。如果不做任何處理溃卡,用戶對鍵入的字符串做的任何修改溢豆,都會去請求一次接口,那后端服務(wù)器肯定是吃不消的瘸羡;對于用戶而言漩仙,在不斷輸入的過程中返回的結(jié)果用戶并不會很關(guān)心,他只會關(guān)心最終輸入完成之后請求的數(shù)據(jù)犹赖。那么队他,如何減少后端的接口請求次數(shù)是關(guān)鍵所在。
先來看看核心的代碼:
// code 8 ViewModel.kt 文件
val queryStateFlow = MutableStateFlow("")
fun getQueryResult(): Flow<String> {
return queryStateFlow
.debounce(300L)
.distinctUntilChanged()
.flatMapLatest {
if (it.isNullOrBlank()) {
flow { emit("") }
} else {
dataFromNetwork(it).catch {
emitAll( flow { emit("") } )
}
}
}
.flowOn(Dispatchers.IO)
}
// 模擬網(wǎng)絡(luò)請求的耗時操作
private fun dataFromNetwork(query: String): Flow<String> {
return flow {
delay(2000)
emit(query) // 返回請求的結(jié)果
}
}
首先可以直觀地感受到峻村,使用 Flow 去處理這一邏輯較為簡單麸折,代碼量較少,這也是 Flow 的魅力所在雀哨。我們按順序介紹一下所使用到的 Flow 操作符:
debounce 操作符
具體的操作符方法聲明:
// code 9
public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T>
用于過濾掉最新的發(fā)射值之前 timeoutMillis 時間內(nèi)發(fā)射的值磕谅,返回一個過濾后的 Flow私爷。官方栗子非常清楚:
// code 10
flow {
emit(1)
delay(90)
emit(2)
delay(90)
emit(3)
delay(1010)
emit(4)
delay(1010)
emit(5)
}.debounce(1000)
最終會發(fā)射出下面的三個值:
3, 4, 5
發(fā)射 1 之后不到 1000ms 又發(fā)射了 2雾棺,所以 1 就會被過濾掉不會發(fā)射了,以此類推衬浑。所以最后發(fā)射的值是一定可以發(fā)射成功的捌浩。通過這個操作符,我們就可以有效減少頻繁請求接口的問題工秩,這里設(shè)置的 timeout 為 300ms尸饺,即在用戶連續(xù)輸入過程中每間隔 300ms 才去請求一次數(shù)據(jù)。
distinctUntilChanged 操作符
具體操作符聲明為:
// code 11
public fun <T> Flow<T>.distinctUntilChanged(): Flow<T>
用于過濾掉重復(fù)的發(fā)射值助币。雖然 StateFlow 本身就可過濾掉沒有變化的發(fā)射值浪听,但在這里還是需要的,因為用戶可能會刪除剛輸入的字符眉菱,這一操作符可進一步減少不必要的接口請求迹栓。
flatMapLatest 操作符
我看的代碼版本這個操作符還是實驗性api,后續(xù)可能被移除俭缓。具體操作符聲明為:
// code 12
@ExperimentalCoroutinesApi
public inline fun <T, R> Flow<T>.flatMapLatest(@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>): Flow<R>
這個操作符可以在原流的基礎(chǔ)上生成一個新流克伊,當原流依次發(fā)出 a、b 兩值時华坦,新流都會接收愿吹,但如果新流 a 值的相關(guān)操作還未結(jié)束,則會取消 a 值的相關(guān)操作惜姐,并用 b 值進行操作犁跪。簡單說就是,丟棄舊值操作,換用新值操作坷衍。下面是一個例子:
// code 13
fun flatMapLatestDemo() {
val testFlow = flow {
emit("a")
delay(100)
emit("b")
}.flatMapLatest {
flow {
emit("receive $it")
delay(200)
emit("send $it")
}
}
lifecycleScope.launch {
testFlow.collect {
println("----$it")
}
}
}
通過打印的 log 可以看出撵颊,a,b 都被 flatMapLatest
操作符接收到了惫叛,只有 b 最終通過倡勇。這是因為 a 先到達,等待了 100ms 后新的值 b 也到了嘉涌,但 a 還在等待中妻熊,這時 flatMapLatest
就會取消掉 a 后續(xù)的操作。如果把 delay(200)
改成 delay(50)
仑最,那最終 a扔役,b 都能被打印出來。
所以這個操作符在 code 8 中的作用就是進一步減少接口請求的次數(shù)警医。當輸入的新字符串到來時讶踪,就會將之前舊字符串還未結(jié)束的請求操作取消掉竹挡,用新的字符串去請求數(shù)據(jù)。
ViewModel.kt
的代碼終于說完了,其他的代碼就比較常規(guī)了淋硝,直接上碼:
// code 14 MainActivity.kt
binding.editText.addTextChangedListener(object : TextWatcher{
override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) { }
override fun onTextChanged(input: CharSequence?, start: Int, before: Int, count: Int) {
viewModel.queryStateFlow.value = input.toString()
}
override fun afterTextChanged(s: Editable?) { }
})
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.getQueryResult()
.collect {
binding.tvText.text = it
}
}
}
有關(guān) Flow 的相關(guān)知識就到此結(jié)束了举农,來個簡單總結(jié)吧~
總結(jié)
1)shareIn
和 stateIn
都可將冷流轉(zhuǎn)化為熱流侦啸,將數(shù)據(jù)共享給多個消費者梅鹦,無需為每個消費者創(chuàng)建同一個數(shù)據(jù)流的新實例。兩者通常用于提升性能鲁豪,在沒有消費者時緩存數(shù)據(jù)潘悼;
2)SharingStarted
啟動方式有 Eagerly
、Lazily
爬橡、WhileSubscribed
三種治唤,最常用的還是 WhileSubscribed
,有消費者就啟動糙申,沒有就停止宾添,還能設(shè)置停止延時時長和緩存過期時長;
3)注意 shareIn
郭宝、stateIn
都會新建一個 Flow辞槐,不要用于方法的返回值,建議賦值給屬性粘室;
4)shareIn
榄檬、stateIn
與 onStart
、onCompletion
等搭配可監(jiān)聽轉(zhuǎn)成的熱流的狀態(tài)衔统;
5)distinctUntilChanged
操作符可過濾重復(fù)數(shù)據(jù)鹿榜,一般用于 SharedFlow海雪;debounce
可用于在某一時間段內(nèi)防抖;flatMapLatest
操作符可以用最新值替換舊值發(fā)送給下游舱殿,舊值直接被取消作廢奥裸。
更多內(nèi)容,歡迎關(guān)注公眾號:修之竹
或者查看 修之竹的 Android 專輯
贊人玫瑰沪袭,手留余香湾宙!歡迎點贊、轉(zhuǎn)發(fā)~ 轉(zhuǎn)發(fā)請注明出處~
參考文獻
- StateFlow 和 SharedFlow 官方文檔 https://developer.android.google.cn/kotlin/flow/stateflow-and-sharedflow?hl=zh-cn
- Flow 操作符 shareIn 和 stateIn 使用須知冈绊;Android開發(fā)者侠鳄;https://mp.weixin.qq.com/s/PbqF-vzDrttYq-cSR6NDmQ
- Kotlin協(xié)程:冷流轉(zhuǎn)換熱流的使用與原理;LeeDuo死宣;https://blog.csdn.net/LeeDuoZuiShuai/article/details/127145092