Android 里的 LiveData 進化成 Kotlin 的 Flow

本文為 Jose Alcérreca 發(fā)布于 Medium 的文章譯文
原文鏈接為 Migrating from LiveData to Kotlin’s Flow
本文僅作為個人學(xué)習(xí)記錄所用徘钥。如有涉及侵權(quán)奔浅,請相關(guān)人士盡快聯(lián)系譯文作者洒嗤。

LiveData 是在 2017 年被大家所開始使用员串,觀察者模式有效簡化了開發(fā),但 RxJava 等選項在當(dāng)時對于初學(xué)者來說太復(fù)雜了。 Android 架構(gòu)組件團隊創(chuàng)建了 LiveData:一個非常固執(zhí)的可觀察數(shù)據(jù)持有者類,專為 Android 設(shè)計。 它保持簡單以使其易于上手戳护,并且建議將 RxJava 用于更復(fù)雜的反應(yīng)式流案例,利用兩者之間的集成瀑焦。

DeadData?

LiveData 仍然是我們?yōu)?Java 開發(fā)人員腌且、初學(xué)者和簡單情況提供的解決方案。 對于其余的榛瓮,一個不錯的選擇是轉(zhuǎn)向 Kotlin Flows铺董。 Flows 仍然有一個陡峭的學(xué)習(xí)曲線,但它們是 Kotlin 語言的一部分禀晓,由 Jetbrains 提供支持柄粹; Compose 即將到來,它非常適合反應(yīng)式模型匆绣。

我們一直在談?wù)撌褂?Flows 來連接應(yīng)用程序的不同部分,除了視圖和 ViewModel什黑。 現(xiàn)在我們有了一種從 Android UI 收集流的更安全的方法崎淳,我們可以創(chuàng)建一個完整的遷移指南。
在這篇文章中愕把,您將學(xué)習(xí)如何將 Flows 暴露給一個視圖拣凹,如何收集它們,以及如何對其進行微調(diào)以滿足特定需求恨豁。我們一直在談?wù)撌褂?Flows 來連接應(yīng)用程序的不同部分嚣镜,除了視圖和 ViewModel。 現(xiàn)在我們有了一種從 Android UI 收集流的更安全的方法橘蜜,我們可以創(chuàng)建一個完整的遷移指南菊匿。

在這篇文章中付呕,您將學(xué)習(xí)如何將 Flows 暴露給一個視圖,如何收集它們跌捆,以及如何對其進行微調(diào)以滿足特定需求徽职。我們一直在談?wù)撌褂?Flows 來連接應(yīng)用程序的不同部分,除了視圖和 ViewModel佩厚。 現(xiàn)在我們有了一種從 Android UI 收集流的更安全的方法姆钉,我們可以創(chuàng)建一個完整的遷移指南。
在這篇文章中抄瓦,您將學(xué)習(xí)如何將 Flows 暴露給一個視圖潮瓶,如何收集它們,以及如何對其進行微調(diào)以滿足特定需求钙姊。

Flow:簡單的事情更難毯辅,復(fù)雜的事情更容易

LiveData 做了一件事并且做得很好:它在緩存最新值和了解 Android 的生命周期的同時公開數(shù)據(jù)。 后來我們了解到它也可以啟動協(xié)程創(chuàng)建復(fù)雜的轉(zhuǎn)換摸恍,但這有點復(fù)雜悉罕。
讓我們看看一些 LiveData 模式和它們的 Flow 等價物:

#1:使用可變數(shù)據(jù)持有者公開一次性操作的結(jié)果

這是經(jīng)典模式,您可以使用協(xié)程的結(jié)果來改變狀態(tài)持有者:

Expose the result of a one-shot operation with a Mutable data holder (LiveData)
<!-- Copyright 2020 Google LLC. 
   SPDX-License-Identifier: Apache-2.0 -->

class MyViewModel {
    private val _myUiState = MutableLiveData<Result<UiState>>(Result.Loading)
    val myUiState: LiveData<Result<UiState>> = _myUiState

    // Load data from a suspend fun and mutate state
    init {
        viewModelScope.launch { 
            val result = ...
            _myUiState.value = result
        }
    }
}

為了對 Flows 做同樣的事情立镶,我們使用 (Mutable)StateFlow:


Expose the result of a one-shot operation with a Mutable data holder (StateFlow)
class MyViewModel {
    private val _myUiState = MutableStateFlow<Result<UiState>>(Result.Loading)
    val myUiState: StateFlow<Result<UiState>> = _myUiState

    // Load data from a suspend fun and mutate state
    init {
        viewModelScope.launch { 
            val result = ...
            _myUiState.value = result
        }
    }
}

StateFlow 是一種特殊的 SharedFlow(它是一種特殊類型的 Flow)壁袄,最接近 LiveData:
它總是有價值的。
它只有一個值媚媒。
它支持多個觀察者(因此流程是共享的)嗜逻。
它總是 replays 訂閱的最新值,與活躍觀察者的數(shù)量無關(guān)缭召。

向視圖公開 UI 狀態(tài)時栈顷,請使用 StateFlow。 它是一個安全高效的觀察者嵌巷,旨在保持 UI 狀態(tài)萄凤。

#2:公開一次性操作的結(jié)果

這與前面的代碼片段等效,公開了沒有可變支持屬性的協(xié)程調(diào)用的結(jié)果搪哪。
對于 LiveData靡努,我們?yōu)榇耸褂昧?liveData 協(xié)程構(gòu)建器:

Expose the result of a one-shot operation (LiveData)

class MyViewModel(...) : ViewModel() {
    val result: LiveData<Result<UiState>> = liveData {
        emit(Result.Loading)
        emit(repository.fetchItem())
    }
}

由于狀態(tài)持有者總是有一個值,因此最好將我們的 UI 狀態(tài)包裝在某種支持 Loading晓折、Success 和 Error 等狀態(tài)的 Result 類中惑朦。
Flow 等效項涉及更多,因為您必須進行一些配置:

Expose the result of a one-shot operation (StateFlow)

class MyViewModel(...) : ViewModel() {
    val result: StateFlow<Result<UiState>> = flow {
        emit(repository.fetchItem())
    }.stateIn(
        scope = viewModelScope, 
        started = WhileSubscribed(5000), // Or Lazily because it's a one-shot
        initialValue = Result.Loading
    )
}

stateIn 是將 Flow 轉(zhuǎn)換為 StateFlow 的 Flow 運算符漓概。 現(xiàn)在讓我們相信這些參數(shù)漾月,因為我們稍后需要更多的復(fù)雜性來正確解釋它。

3:帶參數(shù)的一次性數(shù)據(jù)加載

假設(shè)您想加載一些取決于用戶 ID 的數(shù)據(jù)胃珍,并且您從暴露流的 AuthManager 獲取此信息:


One-shot data load with parameters (LiveData)

使用 LiveData梁肿,您將執(zhí)行類似以下操作:

class MyViewModel(authManager..., repository...) : ViewModel() {
    private val userId: LiveData<String?> = 
        authManager.observeUser().map { user -> user.id }.asLiveData()

    val result: LiveData<Result<Item>> = userId.switchMap { newUserId ->
        liveData { emit(repository.fetchItem(newUserId)) }
    }
}

switchMap 是一個轉(zhuǎn)換蜓陌,它的主體被執(zhí)行,并且當(dāng) userId 改變時栈雳,訂閱的結(jié)果也隨之改變护奈。
如果 userId 沒有理由成為 LiveData,那么更好的替代方法是將流與 Flow 結(jié)合起來哥纫,最后將公開的結(jié)果轉(zhuǎn)換為 LiveData霉旗。

class MyViewModel(authManager..., repository...) : ViewModel() {
    private val userId: Flow<UserId> = authManager.observeUser().map { user -> user.id }

    val result: LiveData<Result<Item>> = userId.mapLatest { newUserId ->
       repository.fetchItem(newUserId)
    }.asLiveData()
}

使用 Flows 執(zhí)行此操作看起來非常相似:


One-shot data load with parameters (StateFlow)
class MyViewModel(authManager..., repository...) : ViewModel() {
    private val userId: Flow<UserId> = authManager.observeUser().map { user -> user.id }

    val result: StateFlow<Result<Item>> = userId.mapLatest { newUserId ->
        repository.fetchItem(newUserId)
    }.stateIn(
        scope = viewModelScope, 
        started = WhileSubscribed(5000), 
        initialValue = Result.Loading
    )
}

請注意,如果你需要更大的靈活性蛀骇,您還可以使用 transformLatest 并顯式 emit 項目:

    val result = userId.transformLatest { newUserId ->
        emit(Result.LoadingData)
        emit(repository.fetchItem(newUserId))
    }.stateIn(
        scope = viewModelScope, 
        started = WhileSubscribed(5000), 
        initialValue = Result.LoadingUser // Note the different Loading states
    )

4:觀察帶參數(shù)的數(shù)據(jù)流

現(xiàn)在讓我們讓這個例子更具反應(yīng)性厌秒。 數(shù)據(jù)不是獲取的,而是觀察到的擅憔,因此我們將數(shù)據(jù)源中的更改自動傳播到 UI鸵闪。
繼續(xù)我們的例子:我們沒有在數(shù)據(jù)源上調(diào)用 fetchItem,而是使用一個假設(shè)的 observeItem 操作符暑诸,它返回一個 Flow蚌讼。
使用 LiveData,您可以將 Flow 轉(zhuǎn)換為 LiveData 并 emitSource 所有更新:

Observing a stream with parameters (LiveData)

class MyViewModel(authManager..., repository...) : ViewModel() {
    private val userId: LiveData<String?> = 
        authManager.observeUser().map { user -> user.id }.asLiveData()

    val result = userId.switchMap { newUserId ->
        repository.observeItem(newUserId).asLiveData()
    }
}

或者个榕,最好使用 flatMapLatest 組合兩個流篡石,并僅將輸出轉(zhuǎn)換為 LiveData:

class MyViewModel(authManager..., repository...) : ViewModel() {
    private val userId: Flow<String?> = 
        authManager.observeUser().map { user -> user?.id }

    val result: LiveData<Result<Item>> = userId.flatMapLatest { newUserId ->
        repository.observeItem(newUserId)
    }.asLiveData()
}

Flow 的實現(xiàn)類似,但沒有 LiveData 轉(zhuǎn)換:


Observing a stream with parameters (StateFlow)
class MyViewModel(authManager..., repository...) : ViewModel() {
    private val userId: Flow<String?> = 
        authManager.observeUser().map { user -> user?.id }

    val result: StateFlow<Result<Item>> = userId.flatMapLatest { newUserId ->
        repository.observeItem(newUserId)
    }.stateIn(
        scope = viewModelScope, 
        started = WhileSubscribed(5000), 
        initialValue = Result.LoadingUser
    )
}

每當(dāng)用戶更改或存儲庫中的用戶數(shù)據(jù)更改時西采,公開的 StateFlow 都會收到更新凰萨。

#5 組合多個來源:MediatorLiveData -> Flow.combine

MediatorLiveData 可讓您觀察一個或多個更新源(LiveData 可觀察對象)并在它們獲得新數(shù)據(jù)時執(zhí)行某些操作。 通常你更新 MediatorLiveData 的值:

val liveData1: LiveData<Int> = ...
val liveData2: LiveData<Int> = ...

val result = MediatorLiveData<Int>()

result.addSource(liveData1) { value ->
    result.setValue(liveData1.value ?: 0 + (liveData2.value ?: 0))
}
result.addSource(liveData2) { value ->
    result.setValue(liveData1.value ?: 0 + (liveData2.value ?: 0))
}

Flow 等價物更直接:

val flow1: Flow<Int> = ...
val flow2: Flow<Int> = ...

val result = combine(flow1, flow2) { a, b -> a + b }

您還可以使用 combineTransform 函數(shù)或 zip械馆。

配置暴露的 StateFlow(stateIn 操作符)

我們之前使用 stateIn 將常規(guī)流轉(zhuǎn)換為 StateFlow胖眷,但它需要一些配置。 如果你現(xiàn)在不想詳細介紹霹崎,只需要復(fù)制粘貼珊搀,我推薦這種組合:

val result: StateFlow<Result<UiState>> = someFlow
    .stateIn(
        scope = viewModelScope, 
        started = WhileSubscribed(5000), 
        initialValue = Result.Loading
    )

但是,如果您不確定這個看似隨機的 5 秒 started 參數(shù)尾菇,請繼續(xù)閱讀食棕。
stateIn 有 3 個參數(shù)(來自文檔):

@param scope the coroutine scope in which sharing is started.
@param started the strategy that controls when sharing is started and stopped.
@param initialValue the initial value of the state flow.
This value is also used when the state flow is reset using the [SharingStarted.WhileSubscribed] strategy with the replayExpirationMillis parameter.

started可以采用 3 個值:

  • Lazily:在第一個訂閱者出現(xiàn)時開始,在范圍取消時停止错沽。
  • Eagerly:立即開始并在范圍取消時停止
  • WhileSubscribed:這很復(fù)雜。

對于一次性操作眶拉,您可以使用 LazilyEagerly千埃。 但是,如果您正在觀察其他流程忆植,則應(yīng)該使用 WhileSubscribed 來執(zhí)行小而重要的優(yōu)化放可,如下所述谒臼。

WhileSubscribed 策略
WhileSubscribed 在沒有收集器時取消 upstream flow。 使用 stateIn 創(chuàng)建的 StateFlow 向 View 公開數(shù)據(jù)耀里,但它也在觀察來自其他層或應(yīng)用程序(上游)的流蜈缤。 保持這些流處于活動狀態(tài)可能會導(dǎo)致資源浪費,例如冯挎,如果它們繼續(xù)從其他來源(例如數(shù)據(jù)庫連接底哥、硬件傳感器等)讀取數(shù)據(jù)。**When your app goes to the background, you should be a good citizen and stop these coroutines.

WhileSubscribed 有兩個參數(shù):

public fun WhileSubscribed(
stopTimeoutMillis: Long = 0,
replayExpirationMillis: Long = Long.MAX_VALUE
)

停止超時
來至于它的文檔:

stopTimeoutMillis 配置最后一個訂閱者消失和上游流停止之間的延遲(以毫秒為單位)房官。 它默認為零(立即停止)趾徽。

這很有用,因為如果視圖停止偵聽幾分之一秒翰守,您不想取消上游流孵奶。 這一直發(fā)生。例如蜡峰,當(dāng)用戶旋轉(zhuǎn)設(shè)備并且視圖被快速連續(xù)地破壞和重新創(chuàng)建時了袁。
liveData 協(xié)程構(gòu)建器中的解決方案是添加 5 秒的延遲,如果沒有訂閱者湿颅,協(xié)程將在此后停止载绿。 WhileSubscribed(5000) 正是這樣做的:

class MyViewModel(...) : ViewModel() {
    val result = userId.mapLatest { newUserId ->
        repository.observeItem(newUserId)
    }.stateIn(
        scope = viewModelScope, 
        started = WhileSubscribed(5000), 
        initialValue = Result.Loading
    )
}

這種方法檢查所有框:

  • 當(dāng)用戶將您的應(yīng)用程序發(fā)送到后臺時,來自其他層的更新將在 5 秒后停止肖爵,從而節(jié)省電量卢鹦。
  • 最新的值仍會被緩存,這樣當(dāng)用戶回到它時劝堪,視圖會立即有一些數(shù)據(jù)冀自。
  • 訂閱重新啟動,新值將出現(xiàn)秒啦,可用時刷新屏幕熬粗。

Replay expiration
如果您不希望用戶在他們離開太久后看到陳舊數(shù)據(jù)并且你更喜歡顯示加載屏幕,請查看 WhileSubscribed 中的 replayExpirationMillis 參數(shù)余境。 在這種情況下它非常方便驻呐,并且還節(jié)省了一些內(nèi)存,因為緩存的值恢復(fù)到 stateIn 中定義的初始值芳来。 返回應(yīng)用程序不會那么快含末,但您不會顯示舊數(shù)據(jù)。

replayExpirationMillis— configures a delay (in milliseconds) between the stopping of the sharing coroutine and the resetting of the replay cache (which makes the cache empty for the shareIn operator and resets the cached value to the original initialValue for the stateIn operator). It defaults to Long.MAX_VALUE (keep replay cache forever, never reset buffer). Use zero value to expire the cache immediately.

從視圖中觀察 StateFlow

到目前為止即舌,我們已經(jīng)看到佣盒,讓視圖讓 ViewModel 中的 StateFlows 知道它們不再監(jiān)聽是非常重要的。 然而顽聂,與生命周期相關(guān)的所有事情一樣肥惭,事情并沒有那么簡單盯仪。
為了收集流,你需要一個協(xié)程蜜葱。 活動和片段提供了一堆協(xié)程構(gòu)建器:

  • Activity.lifecycleScope.launch:立即啟動協(xié)程全景,活動銷毀時取消。
  • Fragment.lifecycleScope.launch:立即啟動協(xié)程牵囤,并在片段銷毀時取消協(xié)程爸黄。
  • Fragment.viewLifecycleOwner.lifecycleScope.launch:立即啟動協(xié)程,并在片段的視圖生命周期被銷毀時取消協(xié)程奔浅。 如果您正在修改 UI馆纳,您應(yīng)該使用視圖生命周期。

LaunchWhenStarted汹桦、launchWhenResumed…

稱為 launchWhenX 的特殊版本的 launch 將等到 lifecycleOwner 處于X 狀態(tài)并在lifecycleOwner 低于X 狀態(tài)時暫停協(xié)程鲁驶。 重要的是要注意,在其生命周期所有者被銷毀之前舞骆,它們不會取消協(xié)程钥弯。

Collecting Flows with launch/launchWhenX is unsafe

在應(yīng)用程序處于后臺時接收更新可能會導(dǎo)致崩潰,這可以通過暫停視圖中的集合來解決督禽。 但是脆霎,當(dāng)應(yīng)用程序在后臺時,上游流會保持活動狀態(tài)狈惫,這可能會浪費資源睛蛛。
這意味著到目前為止我們?yōu)榕渲?StateFlow 所做的一切都將毫無用處; 然而胧谈,這是一個新的 API忆肾。

Lifecycle.repeatOnLifecycle 來救援

這個新的協(xié)程構(gòu)建器(可從生命周期運行時-ktx 2.4.0-alpha01 獲得)正是我們所需要的:它在特定狀態(tài)下啟動協(xié)程,并在生命周期所有者低于它時停止它們菱肖。

Different Flow collection methods

例如客冈,在一個 Fragment 中:

onCreateView(...) {
    viewLifecycleOwner.lifecycleScope.launch {
        viewLifecycleOwner.lifecycle.repeatOnLifecycle(STARTED) {
            myViewModel.myUiState.collect { ... }
        }
    }
}

這將在 Fragment 的視圖 STARTED 開始收集,將繼續(xù)通過 RESUMED稳强,并在返回到 STOPPED 時停止场仲。可以讀下這篇文章: A safer way to collect flows from Android UIs
將 repeatOnLifecycle API 與上面的 StateFlow 指南混合在一起退疫,可以在充分利用設(shè)備資源的同時獲得最佳性能渠缕。

StateFlow exposed with WhileSubscribed(5000) and collected with repeatOnLifecycle(STARTED)

Warning: The StateFlow support recently added to Data Binding uses launchWhenCreated to collect updates, and it will start using `repeatOnLifecycle``instead when it reaches stable.

For Data Binding, you should use Flows everywhere and simply add asLiveData() to expose them to the view. Data Binding will be updated when lifecycle-runtime-ktx 2.4.0 goes stable.

總結(jié):
從 ViewModel 公開數(shù)據(jù)并從視圖收集數(shù)據(jù)的最佳方法是:
?? 使用 WhileSubscribed 策略公開 StateFlow,并帶有超時褒繁。

class MyViewModel(...) : ViewModel() {
    val result = userId.mapLatest { newUserId ->
        repository.observeItem(newUserId)
    }.stateIn(
        scope = viewModelScope, 
        started = WhileSubscribed(5000), 
        initialValue = Result.Loading
    )
}

?? 使用 repeatOnLifecycle 收集亦鳞。

onCreateView(...) {
    viewLifecycleOwner.lifecycleScope.launch {
        viewLifecycleOwner.lifecycle.repeatOnLifecycle(STARTED) {
            myViewModel.myUiState.collect { ... }
        }
    }
}

任何其他組合都會使上游 Flows 保持活動狀態(tài),從而浪費資源:
? 使用 WhileSubscribed 公開并在生命周期范圍內(nèi)收集。launch/launchWhenX
? 使用 Lazily/Eagerly 公開并使用 repeatOnLifecycle 收集
當(dāng)然蚜迅,如果你不需要 Flow 的全部功能……只需使用 LiveData。 :)


以下附帶 Android 開發(fā)者官我那個對 Kolin 的 Flow 的介紹:
https://developer.android.com/kotlin/flow

在協(xié)程中俊抵,F(xiàn)low 是一種可以順序發(fā)出多個值的類型谁不,而不是只返回一個值的掛起函數(shù)。例如徽诲,您可以使用流從數(shù)據(jù)庫接收實時更新刹帕。

Flows 建立在協(xié)程之上,可以提供多個值谎替。Flow 在概念上是可以異步計算的數(shù)據(jù)流偷溺。發(fā)出的值必須是相同的類型。例如钱贯, Flow<Int> 是一個發(fā)出整數(shù)值的流挫掏。

流與生成值序列的迭代器非常相似,但它使用掛起函數(shù)異步生成和消費值秩命。這意味著尉共,例如,F(xiàn)low 可以安全地發(fā)出網(wǎng)絡(luò)請求以生成下一個值弃锐,而不會阻塞主線程袄友。

數(shù)據(jù)流涉及三個實體:

  • 生產(chǎn)者產(chǎn)生添加到流中的數(shù)據(jù)。多虧了協(xié)程霹菊,流也可以異步產(chǎn)生數(shù)據(jù)剧蚣。

  • (可選)中介可以修改發(fā)送到流中的每個值或流本身。

  • 消費者使用流中的值旋廷。

圖 1. 數(shù)據(jù)流中涉及的實體:消費者鸠按、可選中介和生產(chǎn)者。

在 Android 中柳洋,存儲庫通常是 UI 數(shù)據(jù)的生產(chǎn)者待诅,其用戶界面 (UI) 作為最終顯示數(shù)據(jù)的使用者。 其他時候熊镣,UI 層是用戶輸入事件的生產(chǎn)者卑雁,而層次結(jié)構(gòu)的其他層則使用它們。 生產(chǎn)者和消費者之間的層通常充當(dāng)中間人绪囱,修改數(shù)據(jù)流以使其適應(yīng)下一層的要求测蹲。

創(chuàng)建一個 Flow

要創(chuàng)建 flows,請使用 flow builder APIs鬼吵。 Flow 構(gòu)建器函數(shù)創(chuàng)建一個新 Flow扣甲,您可以在其中使用發(fā)射函數(shù)手動將新值 emit 到數(shù)據(jù)流中。

在以下示例中,數(shù)據(jù)源以固定時間間隔自動獲取最新消息琉挖。 由于掛起函數(shù)不能返回多個連續(xù)值启泣,因此數(shù)據(jù)源創(chuàng)建并返回一個 Flow 來滿足此要求。 在這種情況下示辈,數(shù)據(jù)源充當(dāng)生產(chǎn)者寥茫。

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

flow builder 在協(xié)程中執(zhí)行。 因此矾麻,它受益于相同的異步 API纱耻,但存在一些限制:

Flows 是連續(xù)的。 由于生產(chǎn)者在協(xié)程中险耀,當(dāng)調(diào)用掛起函數(shù)時弄喘,生產(chǎn)者掛起直到掛起函數(shù)返回。 在這個例子中甩牺,生產(chǎn)者掛起直到 fetchLatestNews 網(wǎng)絡(luò)請求完成蘑志。 只有這樣,結(jié)果才會發(fā)送到流中柴灯。

使用流構(gòu)建器析命,生產(chǎn)者不能從不同的 CoroutineContext 發(fā)出值粘我。 因此,不要通過創(chuàng)建新的協(xié)程或使用 withContext 代碼塊在不同的 CoroutineContext 中調(diào)用發(fā)射。 在這些情況下观游,您可以使用其他流構(gòu)建器亏拉,例如 callbackFlow宵蛀。

修改流

中介可以使用中間操作符來修改數(shù)據(jù)流而不消耗值组力。 這些運算符是函數(shù),當(dāng)應(yīng)用于數(shù)據(jù)流時冬三,會設(shè)置一系列操作匀油,直到將來使用這些值時才會執(zhí)行這些操作。 在 Flow reference documentation 中了解有關(guān)中間運算符的更多信息勾笆。

在下面的示例中敌蚜,存儲庫層使用中間運算符 map 來轉(zhuǎn)換要在視圖上顯示的數(shù)據(jù):

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

中間運算符可以一個接一個地應(yīng)用,形成一個操作鏈窝爪,當(dāng)一個項目被發(fā)送到 Flow 中時弛车,這些操作鏈會延遲執(zhí)行。 請注意蒲每,簡單地將中間運算符應(yīng)用于流并不會啟動 Flow 集合纷跛。

從 Flow 中收集

使用終端運算符觸發(fā) Flow 以開始偵聽值。 要獲取流中發(fā)出的所有值邀杏,請使用 collect贫奠。

由于 collect 是一個掛起函數(shù),它需要在協(xié)程中執(zhí)行。 它接受一個 lambda 作為參數(shù)唤崭,在每個新值上調(diào)用該參數(shù)拷恨。 由于它是一個掛起函數(shù),調(diào)用 collect 的協(xié)程可能會掛起谢肾,直到 Flow 關(guān)閉挑随。

繼續(xù)前面的示例,這里是一個使用存儲庫層數(shù)據(jù)的 ViewModel 的簡單實現(xiàn):

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

收集 Flow 觸發(fā)更新最新消息的生產(chǎn)者勒叠,并以固定的時間間隔發(fā)出網(wǎng)絡(luò)請求的結(jié)果。由于生產(chǎn)者在 while(true) 循環(huán)中始終保持活動狀態(tài)膏孟,因此當(dāng) ViewModel 被清除并取消 viewModelScope 時眯分,數(shù)據(jù)流將關(guān)閉。

由于以下原因柒桑,F(xiàn)low 收集可能會停止:

  • 收集的協(xié)程被取消弊决,如上例所示。這也阻止了底層生產(chǎn)者魁淳。

  • 生產(chǎn)者完成發(fā)射項目飘诗。在這種情況下,數(shù)據(jù)流關(guān)閉界逛,調(diào)用 collect 的協(xié)程恢復(fù)執(zhí)行昆稿。

除非與其他中間操作符指定,否則 Flow 是冷的和惰性的息拜。這意味著每次在流上調(diào)用終端操作符時都會執(zhí)行生產(chǎn)者代碼溉潭。在前面的示例中,擁有多個流收集器會導(dǎo)致數(shù)據(jù)源以不同的固定時間間隔多次獲取最新消息少欺。要在多個消費者同時收集時優(yōu)化和共享流喳瓣,請使用 shareIn 運算符。

捕獲意外異常

生產(chǎn)者的實現(xiàn)可以來自第三方庫赞别。 這意味著它可以拋出意外的異常畏陕。 要處理這些異常,請使用 catch 中間運算符仿滔。

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

在前面的示例中惠毁,當(dāng)發(fā)生異常時,不會調(diào)用 collect lambda堤撵,因為尚未收到新項目仁讨。
catch 還可以向流 emit 項目。 示例存儲庫層可以改為 emit 緩存值:

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

在這個例子中实昨,當(dāng)一個異常發(fā)生時洞豁,collect lambda 被調(diào)用,因為一個新的項目因為異常被發(fā)送到流中。

在不同的 CoroutineContext 中執(zhí)行

默認情況下丈挟,Flow 構(gòu)建器的生產(chǎn)者在從它收集的協(xié)程的 CoroutineContext 中執(zhí)行刁卜,并且如前所述,它不能從不同的 CoroutineContext 發(fā)出值曙咽。 在某些情況下蛔趴,這種行為可能是不可取的。 例如例朱,在本主題中使用的示例中孝情,存儲庫層不應(yīng)在 viewModelScope 使用的 Dispatchers.Main 上執(zhí)行操作。

要更改流的 CoroutineContext洒嗤,請使用中間運算符 flowOn箫荡。 flowOn 改變了上游流的 CoroutineContext,這意味著生產(chǎn)者和任何在 flowOn 之前(或之上)應(yīng)用的中間操作符渔隶。 下游流(flowOn 之后的中間運算符以及消費者)不受影響羔挡,并在用于從流中收集的 CoroutineContext 上執(zhí)行。 如果有多個 flowOn 操作符间唉,每個操作符都會改變其當(dāng)前位置的上游绞灼。

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

使用此代碼,onEachmap 操作符使用 defaultDispatcher呈野,而 catch 操作符和使用者在 viewModelScope 使用的 Dispatchers.Main 上執(zhí)行低矮。

由于數(shù)據(jù)源層正在進行 I/O 工作,因此您應(yīng)該使用針對 I/O 操作進行優(yōu)化的調(diào)度程序:

class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}

Jetpack 庫中的流程

Flow 被集成到許多 Jetpack 庫中被冒,它在 Android 第三方庫中很受歡迎商佛。 Flow 非常適合實時數(shù)據(jù)更新和無休止的數(shù)據(jù)流。

您可以使用 Flow with Room 來通知數(shù)據(jù)庫中的更改姆打。 使用數(shù)據(jù)訪問對象 data access objects (DAO) 時良姆,返回 Flow 類型以獲取實時更新。

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

每次示例表中發(fā)生更改時幔戏,都會發(fā)出一個包含數(shù)據(jù)庫中新項目的新列表玛追。

將基于回調(diào)的 API 轉(zhuǎn)換為流

callbackFlow 是一個流構(gòu)建器,可讓您將基于回調(diào)的 API 轉(zhuǎn)換為流闲延。 例如痊剖, Firebase Firestore Android API 使用回調(diào)。 要將這些 API 轉(zhuǎn)換為流并偵聽 Firestore 數(shù)據(jù)庫更新垒玲,您可以使用以下代碼:

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                offer(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

Flow 構(gòu)建器不同陆馁,callbackFlow 允許使用 send 函數(shù)從不同的 CoroutineContext 發(fā)出值,或者使用 offer 函數(shù)從協(xié)程外部發(fā)出值合愈。

在內(nèi)部叮贩,callbackFlow 使用一個 channel击狮,它在概念上與阻塞 queue 非常相似。 一個通道配置了一個容量益老,即可以緩沖的最大元素數(shù)彪蓬。 在 callbackFlow 中創(chuàng)建的通道默認容量為 64 個元素。 當(dāng)您嘗試將新元素添加到完整頻道時捺萌,發(fā)送會暫停生產(chǎn)者档冬,直到有新元素的空間,而 offer 不會將元素添加到頻道并立即返回 false桃纯。

額外 Flow 資料鏈接:

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末酷誓,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子态坦,更是在濱河造成了極大的恐慌呛牲,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,427評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件驮配,死亡現(xiàn)場離奇詭異,居然都是意外死亡着茸,警方通過查閱死者的電腦和手機壮锻,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,551評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來涮阔,“玉大人猜绣,你說我怎么就攤上這事【刺兀” “怎么了掰邢?”我有些...
    開封第一講書人閱讀 165,747評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長伟阔。 經(jīng)常有香客問我辣之,道長,這世上最難降的妖魔是什么皱炉? 我笑而不...
    開封第一講書人閱讀 58,939評論 1 295
  • 正文 為了忘掉前任怀估,我火速辦了婚禮,結(jié)果婚禮上合搅,老公的妹妹穿的比我還像新娘多搀。我一直安慰自己,他們只是感情好灾部,可當(dāng)我...
    茶點故事閱讀 67,955評論 6 392
  • 文/花漫 我一把揭開白布康铭。 她就那樣靜靜地躺著,像睡著了一般赌髓。 火紅的嫁衣襯著肌膚如雪从藤。 梳的紋絲不亂的頭發(fā)上催跪,一...
    開封第一講書人閱讀 51,737評論 1 305
  • 那天,我揣著相機與錄音呛哟,去河邊找鬼叠荠。 笑死,一個胖子當(dāng)著我的面吹牛扫责,可吹牛的內(nèi)容都是我干的榛鼎。 我是一名探鬼主播,決...
    沈念sama閱讀 40,448評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼鳖孤,長吁一口氣:“原來是場噩夢啊……” “哼者娱!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起苏揣,我...
    開封第一講書人閱讀 39,352評論 0 276
  • 序言:老撾萬榮一對情侶失蹤黄鳍,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后平匈,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體框沟,經(jīng)...
    沈念sama閱讀 45,834評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,992評論 3 338
  • 正文 我和宋清朗相戀三年增炭,在試婚紗的時候發(fā)現(xiàn)自己被綠了忍燥。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,133評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡隙姿,死狀恐怖梅垄,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情输玷,我是刑警寧澤队丝,帶...
    沈念sama閱讀 35,815評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站欲鹏,受9級特大地震影響机久,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜赔嚎,卻給世界環(huán)境...
    茶點故事閱讀 41,477評論 3 331
  • 文/蒙蒙 一吞加、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧尽狠,春花似錦衔憨、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,022評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至沉馆,卻和暖如春码党,著一層夾襖步出監(jiān)牢的瞬間德崭,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,147評論 1 272
  • 我被黑心中介騙來泰國打工揖盘, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留眉厨,地道東北人。 一個月前我還...
    沈念sama閱讀 48,398評論 3 373
  • 正文 我出身青樓兽狭,卻偏偏與公主長得像憾股,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子箕慧,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,077評論 2 355

推薦閱讀更多精彩內(nèi)容