在協(xié)程中季希,F(xiàn)low 是一種可以順序發(fā)出多個(gè)值的類型,而不是只返回單個(gè)值的掛起函數(shù)幽纷。例如式塌,你可以使用 Flow 從數(shù)據(jù)庫接收實(shí)時(shí)更新。
數(shù)據(jù)流建立在協(xié)程之上友浸,可以提供多個(gè)值峰尝。Flow 在概念上是可以異步計(jì)算的數(shù)據(jù)流。發(fā)出的值必須是同一類型收恢。例如武学, Flow<Int>
是一個(gè)發(fā)出整數(shù)值的流。
數(shù)據(jù)流與生成一組序列值的 Iterator
非常相似伦意,但它使用掛起函數(shù)來異步生成和使用值禀倔。這意味著鬼癣,例如,F(xiàn)low 可以安全地發(fā)出網(wǎng)絡(luò)請求以生成下一個(gè)值,而不會(huì)阻塞主線程响巢。
數(shù)據(jù)流涉及三個(gè)實(shí)體:
- 提供方會(huì)生成添加到數(shù)據(jù)流中的數(shù)據(jù)。得益于協(xié)程胞谭,數(shù)據(jù)流還可以異步生成數(shù)據(jù)蛔琅。
- (可選)中介可以修改發(fā)送到數(shù)據(jù)流的值,或修正數(shù)據(jù)流本身奈辰。
- 使用方則使用數(shù)據(jù)流中的值栏妖。
在 Android 中,代碼庫通常是界面數(shù)據(jù)的提供方奖恰,其將界面用作最終顯示數(shù)據(jù)的使用方吊趾。 而其他時(shí)候,UI 層是用戶輸入事件的生產(chǎn)者瑟啃,而層次結(jié)構(gòu)的其他層使用它們论泛。 提供方和使用方之間的層通常充當(dāng)中介,修改數(shù)據(jù)流以使其適應(yīng)下一層的要求蛹屿。
創(chuàng)建 Flow
要?jiǎng)?chuàng)建流屁奏,請使用flow
構(gòu)建器 API。 流構(gòu)建器函數(shù)創(chuàng)建一個(gè)新的 Flow错负,你可以在其中使用 emit 函數(shù)手動(dòng)將新值發(fā)送到數(shù)據(jù)流中坟瓢。
在以下示例中勇边,數(shù)據(jù)源以固定的時(shí)間間隔自動(dòng)獲取最新新聞資訊。 由于掛起函數(shù)不能返回多個(gè)連續(xù)值折联,數(shù)據(jù)源創(chuàng)建并返回一個(gè)數(shù)據(jù)流來滿足這個(gè)要求粒褒。 在這種情況下,數(shù)據(jù)源充當(dāng)提供方诚镰。
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // 將請求的結(jié)果發(fā)送到數(shù)據(jù)流
delay(refreshIntervalMs) // 暫停協(xié)程一段時(shí)間
}
}
}
// 提供一種通過掛起功能發(fā)出網(wǎng)絡(luò)請求的方法的接口
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
flow
構(gòu)建器在協(xié)程中執(zhí)行奕坟。 因此,它受益于相同的異步 API怕享,但有一些限制:
- 數(shù)據(jù)流是有順序的执赡。當(dāng)協(xié)程內(nèi)的提供方調(diào)用掛起函數(shù)時(shí),提供方會(huì)掛起函筋,直到掛起函數(shù)返回沙合。 在示例中,提供方會(huì)掛起跌帐,直到
fetchLatestNews
網(wǎng)絡(luò)請求完成為止首懈。只有這樣,請求結(jié)果才會(huì)發(fā)送到數(shù)據(jù)流中谨敛。 - 使用
flow
構(gòu)建器究履,生產(chǎn)者不能從不同的 CoroutineContext 發(fā)出值。 因此脸狸,不要通過創(chuàng)建新的協(xié)程或使用 withContext 代碼塊在不同的 CoroutineContext 中調(diào)用 emit最仑。 在這些情況下,您可以使用其他流構(gòu)建器炊甲,例如 callbackFlow泥彤。
修改數(shù)據(jù)流
中介可以使用中間運(yùn)算符來修改數(shù)據(jù)流,而無需使用這些值卿啡。 這些操作符是函數(shù)吟吝,當(dāng)應(yīng)用于數(shù)據(jù)流時(shí),會(huì)設(shè)置一系列暫不執(zhí)行的鏈?zhǔn)竭\(yùn)算颈娜,直到將來使用這些值時(shí)才會(huì)執(zhí)行這些操作剑逃。 在 Flow 參考文檔中了解有關(guān)中間運(yùn)算符的更多信息。
在下面的示例中官辽,存儲(chǔ)庫層使用中間運(yùn)算符 map 來轉(zhuǎn)換要在 View
上顯示的數(shù)據(jù):
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* 返回對應(yīng)流轉(zhuǎn)換的最喜歡的最新新聞資訊蛹磺。這些操作是惰性的,不會(huì)觸發(fā)流程同仆。
* 它們只是轉(zhuǎn)換流在該時(shí)間點(diǎn)發(fā)出的當(dāng)前值萤捆。
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// 過濾收藏主題列表的中間操作
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// 將最新消息保存在緩存中的中間操作
.onEach { news -> saveInCache(news) }
}
中間運(yùn)算符可以接連應(yīng)用,形成鏈?zhǔn)竭\(yùn)算,在數(shù)據(jù)項(xiàng)被發(fā)送到數(shù)據(jù)流時(shí)延遲執(zhí)行鳖轰。 請注意,僅將一個(gè)中間運(yùn)算符應(yīng)用于數(shù)據(jù)流不會(huì)啟動(dòng)數(shù)據(jù)流收集扶镀。
從 Flow(數(shù)據(jù)流) 中收集
使用終端運(yùn)算符可觸發(fā)數(shù)據(jù)流開始監(jiān)聽值蕴侣。如需獲取數(shù)據(jù)流中的所有發(fā)出值,請使用 collect臭觉。 你可以在官方 Flow 文檔中了解更多關(guān)于終端運(yùn)算符的信息昆雀。
因?yàn)?collect
是一個(gè)掛起函數(shù),所以它需要在協(xié)程中執(zhí)行蝠筑。 它接受 lambda 作為在每個(gè)新值上調(diào)用的參數(shù)狞膘。 由于它是一個(gè)掛起函數(shù),調(diào)用 collect
的協(xié)程可能會(huì)掛起什乙,直到流程關(guān)閉挽封。
繼續(xù)前面的示例,這里是一個(gè) ViewModel 的簡單實(shí)現(xiàn)臣镣,它使用來自存儲(chǔ)庫層的數(shù)據(jù):
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// 使用 collect 觸發(fā)流并使用其元素
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// 使用最新喜歡的新聞資訊更新視圖
}
}
}
}
收集數(shù)據(jù)流會(huì)觸發(fā)提供方刷新最新消息辅愿,并以固定時(shí)間間隔發(fā)出網(wǎng)絡(luò)請求的結(jié)果。 由于提供方在 while(true) 循環(huán)中始終保持活動(dòng)狀態(tài)忆某,因此當(dāng) ViewModel
被清除并 viewModelScope
被取消時(shí)点待,數(shù)據(jù)流將被關(guān)閉。
收集數(shù)據(jù)流可能會(huì)因以下原因停止:
- 如上例所示弃舒,協(xié)程收集被取消癞埠。此操作也會(huì)讓底層提供方停止活動(dòng)。
- 提供方完成發(fā)出數(shù)據(jù)項(xiàng)聋呢。在這種情況下苗踪,數(shù)據(jù)流將關(guān)閉,調(diào)用
collect
的協(xié)程則繼續(xù)執(zhí)行坝冕。
除非使用其他中間運(yùn)算符指定流徒探,否則數(shù)據(jù)流始終為冷數(shù)據(jù)并延遲執(zhí)行。 這意味著每次在流上調(diào)用終端操作符時(shí)都會(huì)執(zhí)行提供方的代碼喂窟。 在前面的示例中测暗,擁有多個(gè)流收集器會(huì)導(dǎo)致數(shù)據(jù)源在不同的固定時(shí)間間隔內(nèi)多次獲取最新消息。 要在多個(gè)消費(fèi)者同時(shí)收集時(shí)優(yōu)化和共享數(shù)據(jù)流磨澡,請使用 shareIn 運(yùn)算符碗啄。
捕捉意外的異常
提供方的數(shù)據(jù)實(shí)現(xiàn)可以來自第三方庫。 這意味著它可能會(huì)拋出意外的異常稳摄。 要處理這些異常稚字,請使用 catch 中間運(yùn)算符。
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// 中介捕獲操作員。 如果拋出異常胆描,
// 捕獲并更新 UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// 使用最新喜歡的新聞資訊更新視圖
}
}
}
}
在前面的示例中瘫想,當(dāng)發(fā)生異常時(shí),不會(huì)調(diào)用 collect
lambda昌讲,因?yàn)樯形词盏叫聰?shù)據(jù)項(xiàng)国夜。
catch
還可執(zhí)行 emit
操作,向數(shù)據(jù)流發(fā)出數(shù)據(jù)項(xiàng)短绸。示例存儲(chǔ)庫層可以改為對緩存值執(zhí)行 emit
操作:
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// 如果發(fā)生錯(cuò)誤车吹,則發(fā)出最后緩存的值
.catch { exception -> emit(lastCachedNews()) }
}
在此示例中,當(dāng)發(fā)生異常時(shí)醋闭,將調(diào)用 collect
lambda窄驹,因?yàn)橛捎诋惓6鴮⑿聰?shù)據(jù)項(xiàng)發(fā)送到數(shù)據(jù)流中。
在不同的 CoroutineContext 中執(zhí)行
默認(rèn)情況下证逻,flow
構(gòu)建器的提供方在從它收集的協(xié)程的 CoroutineContext
中執(zhí)行乐埠,并且如前所述,它不能從不同的 CoroutineContext
對值執(zhí)行 emit
操作瑟曲。 在某些情況下饮戳,這種行為可能是不可取的。 例如洞拨,在本文章中使用的示例中扯罐,存儲(chǔ)庫層不應(yīng)在 viewModelScope
使用的 Dispatchers.Main
上執(zhí)行操作。
要更改流的 CoroutineContext
烦衣,請使用中間運(yùn)算符 flowOn
歹河。 flowOn
改變了上游流的 CoroutineContext
,這意味提供方和任何在 flowOn
之前(或之上)應(yīng)用的中間操作符花吟。 下游數(shù)據(jù)流(晚于 flowOn
的中間運(yùn)算符和使用方)不受影響秸歧,并會(huì)在 CoroutineContext 上執(zhí)行以從數(shù)據(jù)流執(zhí)行 collect
操作。 如果有多個(gè) flowOn
操作符衅澈,每個(gè)操作符都會(huì)從其當(dāng)前位置更改上游數(shù)據(jù)流键菱。
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // 在默認(rèn)調(diào)度程序上執(zhí)行
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // 在默認(rèn)調(diào)度程序上執(zhí)行
saveInCache(news)
}
// flowOn 影響上游流 ↑
.flowOn(defaultDispatcher)
// 下游流 ↓ 不受影響
.catch { exception -> // 在消費(fèi)者的上下文中執(zhí)行
emit(lastCachedNews())
}
}
使用此代碼,·
onEach
和 map
運(yùn)算符使用 defaultDispatcher
今布,而 catch
運(yùn)算符和使用者在 viewModelScope
使用的 Dispatchers.Main
上執(zhí)行经备。
由于數(shù)據(jù)源層正在執(zhí)行 I/O 工作,因此你應(yīng)該使用針對 I/O 操作進(jìn)行了優(yōu)化的調(diào)度程序:
class NewsRemoteDataSource(
...,
private val ioDispatcher: CoroutineDispatcher
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
// 在 IO 調(diào)度程序上執(zhí)行
...
}
.flowOn(ioDispatcher)
}
Jetpack 庫中的數(shù)據(jù)流
Flow 已集成到許多 Jetpack 庫中部默,并且在 Android 第三方庫中很受歡迎侵蒙。 Flow 非常適合實(shí)時(shí)數(shù)據(jù)更新和無限的數(shù)據(jù)流。
你可以將 Flow 與 Room 結(jié)合使用傅蹂,以便在數(shù)據(jù)庫發(fā)生更改時(shí)收到通知纷闺。 使用數(shù)據(jù)訪問對象 (DAO) 時(shí)算凿,返回 Flow 類型以獲取實(shí)時(shí)更新。
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
每當(dāng) Example
數(shù)據(jù)表發(fā)生更改時(shí)犁功,系統(tǒng)都會(huì)發(fā)出包含數(shù)據(jù)庫新數(shù)據(jù)項(xiàng)的新列表氓轰。
將 callback-based APIs 轉(zhuǎn)換為數(shù)據(jù)流
callbackFlow 是一個(gè)數(shù)據(jù)流構(gòu)建器,可讓你將 callback-based APIs轉(zhuǎn)換為 數(shù)據(jù)流浸卦。
與 flow
構(gòu)建器不同戒努,callbackFlow
允許使用 send 函數(shù)從不同的 CoroutineContext
或使用 offer
函數(shù)在協(xié)程外部發(fā)出值。
在協(xié)程內(nèi)部镐躲,callbackFlow
使用一個(gè) channel,它在概念上與阻塞隊(duì)列非常相似侍筛。 通道都有容量配置萤皂,限定了可緩沖元素?cái)?shù)的上限。 callbackFlow
中創(chuàng)建的通道的默認(rèn)容量為 64 個(gè)元素匣椰。 當(dāng)你嘗試將新元素添加到完整頻道時(shí)裆熙,send
會(huì)將數(shù)據(jù)提供方掛起,直到有新元素的空間禽笑,而 offer
不會(huì)將相關(guān)元素添加到通道中入录,并會(huì)立即返回 false。
Kotlin Channel和阻塞隊(duì)列很類似佳镜,區(qū)別在于Channel用掛起的send操作代替了阻塞的put僚稿,用掛起的receive操作代替了阻塞的take。
使用lifecycle-runtime-ktx庫中的launchWhenX方法蟀伸,對Channel的收集協(xié)程會(huì)在組件生命周期 < X時(shí)掛起蚀同,從而避免異常。也可以使用repeatOnLifecycle(State) 來在UI層收集啊掏,當(dāng)生命周期 < State時(shí)蠢络,會(huì)取消協(xié)程,恢復(fù)時(shí)再重新啟動(dòng)協(xié)程迟蜜。
看起來使用Channel承載事件是個(gè)不錯(cuò)的選擇髓霞,并且一般來說事件分發(fā)都是一對一酸茴,因此并不需要支持一對多的BroadcastChannel(后者已經(jīng)逐漸被廢棄薪捍,被SharedFlow替代)
如何創(chuàng)建Channel酪穿?看一下Channel對外暴露可供使用的構(gòu)造方法被济,考慮傳入合適的參數(shù)经磅。
public fun <E> Channel(
// 緩沖區(qū)容量预厌,當(dāng)超出容量時(shí)會(huì)觸發(fā)onBufferOverflow指定的策略
capacity: Int = RENDEZVOUS,
// 緩沖區(qū)溢出策略轧叽,默認(rèn)為掛起炭晒,還有DROP_OLDEST和DROP_LATEST
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
// 處理元素未能成功送達(dá)處理的情況,如訂閱者被取消或者拋異常
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>
首先Channel是熱的屿笼,即任意時(shí)刻發(fā)送元素到Channel即使沒有訂閱者也會(huì)執(zhí)行驴一。所以考慮到存在訂閱者協(xié)程被取消時(shí)發(fā)送事件的情況肝断,即存在Channel處在無訂閱者時(shí)的空檔期收到事件情況。例如當(dāng)Activity使用repeatOnLifecycle方法啟動(dòng)協(xié)程去消費(fèi)ViewModel持有的Channel里的事件消息恰响,當(dāng)前Activity因?yàn)樘幱赟TOPED狀態(tài)而取消了協(xié)程首有。
StateFlow(狀態(tài)流) 和 SharedFlow(共享流)
StateFlow
和 SharedFlow
是 Flow API井联,允許數(shù)據(jù)流以最優(yōu)方式發(fā)出狀態(tài)更新并向多個(gè)使用方發(fā)出值烙常。
StateFlow和SharedFlow侦副,兩者擁有Channel的很多特性跃洛,可以看作是將Flow推向臺(tái)前,將Channel雪藏幕后的一手重要操作穴张。
首先二者都是熱流皂甘,并支持在構(gòu)造器外發(fā)射數(shù)據(jù)偿枕。簡單看下它們的構(gòu)造方法
public fun <T> MutableSharedFlow(
// 每個(gè)新的訂閱者訂閱時(shí)收到的回放的數(shù)目渐夸,默認(rèn)0
replay: Int = 0,
// 除了replay數(shù)目之外,緩存的容量奥额,默認(rèn)0
extraBufferCapacity: Int = 0,
// 緩存區(qū)溢出時(shí)的策略韩肝,默認(rèn)為掛起哀峻。只有當(dāng)至少有一個(gè)訂閱者時(shí)漾峡,onBufferOverflow才會(huì)生效生逸。當(dāng)無訂閱者時(shí),只有最近replay數(shù)目的值會(huì)保存遍尺,并且onBufferOverflow無效。
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
)
//MutableStateFlow等價(jià)于使用如下構(gòu)造參數(shù)的SharedFlow
MutableSharedFlow(
replay = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
StateFlow
StateFlow
是一個(gè)狀態(tài)容器式可觀察數(shù)據(jù)流三热,可以向其收集器發(fā)出當(dāng)前狀態(tài)更新和新狀態(tài)更新呐能。還可通過其 value
屬性讀取當(dāng)前狀態(tài)值。如需更新狀態(tài)并將其發(fā)送到數(shù)據(jù)流偎漫,請為 MutableStateFlow
類的 value
屬性分配一個(gè)新值骑丸。
在 Android 中通危,StateFlow
非常適合需要讓可變狀態(tài)保持可觀察的類。
按照 Kotlin 數(shù)據(jù)流中的示例逆害,可以從 LatestNewsViewModel
公開 StateFlow
,以便 View
能夠監(jiān)聽界面狀態(tài)更新相艇,并自行使屏幕狀態(tài)在配置更改后繼續(xù)有效坛芽。
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
// Backing property to avoid state updates from other classes
private val _uiState = MutableStateFlow(LatestNewsUiState.Success(emptyList()))
// The UI collects from this StateFlow to get its state updates
val uiState: StateFlow<LatestNewsUiState> = _uiState
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Update View with the latest favorite news
// Writes to the value property of MutableStateFlow,
// adding a new element to the flow and updating all
// of its collectors
.collect { favoriteNews ->
_uiState.value = LatestNewsUiState.Success(favoriteNews)
}
}
}
}
// Represents different states for the LatestNews screen
sealed class LatestNewsUiState {
data class Success(news: List<ArticleHeadline>): LatestNewsUiState()
data class Error(exception: Throwable): LatestNewsUiState()
}
負(fù)責(zé)更新 MutableStateFlow
的類是提供方活喊,從 StateFlow
收集的所有類都是使用方量愧。與使用 flow
構(gòu)建器構(gòu)建的冷數(shù)據(jù)流不同钾菊,StateFlow
是熱數(shù)據(jù)流:從此類數(shù)據(jù)流收集數(shù)據(jù)不會(huì)觸發(fā)任何提供方代碼。StateFlow
始終處于活躍狀態(tài)并存于內(nèi)存中偎肃,而且只有在垃圾回收根中未涉及對它的其他引用時(shí)煞烫,它才符合垃圾回收條件。
當(dāng)新使用方開始從數(shù)據(jù)流中收集數(shù)據(jù)時(shí)软棺,它將接收信息流中的最近一個(gè)狀態(tài)及任何后續(xù)狀態(tài)。您可在 LiveData
等其他可觀察類中找到此操作行為尤勋。
與處理任何其他數(shù)據(jù)流一樣喘落,View
會(huì)監(jiān)聽 StateFlow
:
class LatestNewsActivity : AppCompatActivity() {
private val latestNewsViewModel = // getViewModel()
override fun onCreate(savedInstanceState: Bundle?) {
...
// 在生命周期范圍內(nèi)啟動(dòng)協(xié)程
lifecycleScope.launch {
// 每次 ifecycle 處于 STARTED 狀態(tài)(或更高)時(shí)赌朋,repeatOnLifecycle 在新的協(xié)程中啟動(dòng)塊团甲,
// 并在它停止時(shí)取消它嘀趟。
repeatOnLifecycle(Lifecycle.State.STARTED) {
// 觸發(fā)流程并開始監(jiān)聽值。
// 請注意哩罪,當(dāng)生命周期開始時(shí)會(huì)發(fā)生這種情況框弛,當(dāng)生命周期停止時(shí)會(huì)停止收集
latestNewsViewModel.uiState.collect { uiState ->
// 收到的新值
when (uiState) {
is LatestNewsUiState.Success -> showFavoriteNews(uiState.news)
is LatestNewsUiState.Error -> showError(uiState.exception)
}
}
}
}
}
}
警告:如果需要更新界面,切勿使用 launch
或 launchIn
擴(kuò)展函數(shù)從界面直接收集數(shù)據(jù)流狞山。即使 View 不可見伊约,這些函數(shù)也會(huì)處理事件。此行為可能會(huì)導(dǎo)致應(yīng)用崩潰。 為避免這種情況肴盏,請使用 repeatOnLifecycle
API(如上所示)谴垫。
注意:repeatOnLifecycle
API 僅在 androidx.lifecycle:lifecycle-runtime-ktx:2.4.0-alpha01
庫及更高版本中提供焦辅。
如需將任何數(shù)據(jù)流轉(zhuǎn)換為 StateFlow
廉油,請使用 stateIn
中間運(yùn)算符。
StateFlow睡陪、Flow 和 LiveData
StateFlow
和 LiveData
具有相似之處宝穗。兩者都是可觀察的數(shù)據(jù)容器類逮矛,并且在應(yīng)用架構(gòu)中使用時(shí),兩者都遵循相似模式。
但請注意循帐,StateFlow
和 LiveData
的行為確實(shí)有所不同:
-
StateFlow
需要將初始狀態(tài)傳遞給構(gòu)造函數(shù),而LiveData
不需要操软。 - 當(dāng) View 進(jìn)入
STOPPED
狀態(tài)時(shí)嘁锯,LiveData.observe()
會(huì)自動(dòng)取消注冊使用方,而從StateFlow
或任何其他數(shù)據(jù)流收集數(shù)據(jù)的操作并不會(huì)自動(dòng)停止聂薪。如需實(shí)現(xiàn)相同的行為家乘,您需要從Lifecycle.repeatOnLifecycle
塊收集數(shù)據(jù)流。
利用 shareIn
使冷數(shù)據(jù)流變?yōu)闊釘?shù)據(jù)流
StateFlow
是熱數(shù)據(jù)流藏澳,只要該數(shù)據(jù)流被收集仁锯,或?qū)λ娜魏纹渌迷诶厥崭写嬖冢摂?shù)據(jù)流就會(huì)一直存于內(nèi)存中翔悠。您可以使用 shareIn
運(yùn)算符將冷數(shù)據(jù)流變?yōu)闊釘?shù)據(jù)流业崖。
以在 Kotlin 數(shù)據(jù)流中創(chuàng)建的 callbackFlow
為例,您無需為每個(gè)收集器都創(chuàng)建一個(gè)新數(shù)據(jù)流蓄愁,而是可以使用 shareIn
在收集器間共享從 Firestore 檢索到的數(shù)據(jù)双炕。您需要傳入以下內(nèi)容:
- 用于共享數(shù)據(jù)流的
CoroutineScope
。此作用域函數(shù)的生命周期應(yīng)長于任何使用方撮抓,以使共享數(shù)據(jù)流在足夠長的時(shí)間內(nèi)保持活躍狀態(tài)妇斤。 - 要重放 (replay) 至每個(gè)新收集器的數(shù)據(jù)項(xiàng)數(shù)量。
- “啟動(dòng)”行為政策丹拯。
class NewsRemoteDataSource(...,
private val externalScope: CoroutineScope,
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
...
}.shareIn(
externalScope,
replay = 1,
started = SharingStarted.WhileSubscribed()
)
}
在此示例中站超,latestNews
數(shù)據(jù)流將上次發(fā)出的數(shù)據(jù)項(xiàng)重放至新收集器,只要 externalScope
處于活躍狀態(tài)并且存在活躍收集器咽笼,它就會(huì)一直處于活躍狀態(tài)顷编。當(dāng)存在活躍訂閱者時(shí)戚炫,SharingStarted.WhileSubscribed()
“啟動(dòng)”政策將使上游提供方保持活躍狀態(tài)剑刑。可使用其他啟動(dòng)政策,例如使用 SharingStarted.Eagerly
可立即啟動(dòng)提供方施掏,使用 SharingStarted.Lazily
可在第一個(gè)訂閱者出現(xiàn)后開始共享數(shù)據(jù)钮惠,并使數(shù)據(jù)流永遠(yuǎn)保持活躍狀態(tài)。
注意:如需詳細(xì)了解 externalScope
的模式七芭,請查看這篇文章素挽。
SharedFlow
shareIn
函數(shù)會(huì)返回一個(gè)熱數(shù)據(jù)流 SharedFlow
,此數(shù)據(jù)流會(huì)向從其中收集值的所有使用方發(fā)出數(shù)據(jù)狸驳。SharedFlow
是 StateFlow
的可配置性極高的泛化數(shù)據(jù)流预明。
您無需使用 shareIn
即可創(chuàng)建 SharedFlow
。例如耙箍,您可以使用 SharedFlow
將 tick 信息發(fā)送到應(yīng)用的其余部分撰糠,以便讓所有內(nèi)容定期同時(shí)刷新。除了獲取最新資訊之外辩昆,您可能還想要使用用戶最喜歡的主題集刷新用戶信息部分阅酪。在以下代碼段中,TickHandler
公開了 SharedFlow
汁针,以便其他類知道要在何時(shí)刷新其內(nèi)容术辐。與 StateFlow
一樣,請?jiān)陬愔惺褂妙愋?MutableSharedFlow
的后備屬性將數(shù)據(jù)項(xiàng)發(fā)送給數(shù)據(jù)流:
// 當(dāng)應(yīng)用程序的內(nèi)容需要刷新時(shí)集中的類
class TickHandler(
private val externalScope: CoroutineScope,
private val tickIntervalMs: Long = 5000
) {
// Backing property to avoid flow emissions from other classes
private val _tickFlow = MutableSharedFlow<Unit>(replay = 0)
val tickFlow: SharedFlow<Event<String>> = _tickFlow
init {
externalScope.launch {
while(true) {
_tickFlow.emit(Unit)
delay(tickIntervalMs)
}
}
}
}
class NewsRepository(
...,
private val tickHandler: TickHandler,
private val externalScope: CoroutineScope
) {
init {
externalScope.launch {
// Listen for tick updates
tickHandler.tickFlow.collect {
refreshLatestNews()
}
}
}
suspend fun refreshLatestNews() { ... }
...
}
您可通過以下方式自定義 SharedFlow
行為:
通過 replay
施无,您可以針對新訂閱者重新發(fā)送多個(gè)之前已發(fā)出的值辉词。
通過 onBufferOverflow
,您可以指定相關(guān)政策來處理緩沖區(qū)中已存滿要發(fā)送的數(shù)據(jù)項(xiàng)的情況猾骡。默認(rèn)值為 BufferOverflow.SUSPEND
较屿,這會(huì)使調(diào)用方掛起。其他選項(xiàng)包括 DROP_LATEST
或 DROP_OLDEST
卓练。
MutableSharedFlow
還具有 subscriptionCount
屬性隘蝎,其中包含處于活躍狀態(tài)的收集器的數(shù)量,以便您相應(yīng)地優(yōu)化業(yè)務(wù)邏輯襟企。MutableSharedFlow
還包含一個(gè) resetReplayCache
函數(shù)嘱么,供您在不想重放已向數(shù)據(jù)流發(fā)送的最新信息的情況下使用。
SharedFlow
在無訂閱者時(shí)會(huì)丟棄數(shù)據(jù)顽悼。SharedFlow
類似BroadcastChannel曼振, 支持被多個(gè)訂閱者訂閱,可以使同一個(gè)事件會(huì)被多次消費(fèi)蔚龙。
相關(guān)官方文檔:
https://developer.android.com/kotlin/flow
https://kotlinlang.org/docs/flow.html