-
流上下文
流的收集總是在調(diào)用協(xié)程的上下文中發(fā)生锯茄。例如,如果有一個流 simple,然后以下代碼在它的編寫者指定的上下文中運(yùn)行肌幽,而無論流 simple 的實(shí)現(xiàn)細(xì)節(jié)如何:
withContext(context) {
simple().collect { value ->
println(value) // 運(yùn)行在指定上下文中
}
}
流的該屬性稱為 上下文保存 晚碾。
所以默認(rèn)的,flow { ... } 構(gòu)建器中的代碼運(yùn)行在相應(yīng)流的收集器提供的上下文中喂急。舉例來說格嘁,考慮打印線程的一個 simple 函數(shù)的實(shí)現(xiàn), 它被調(diào)用并發(fā)射三個數(shù)字:
fun simple(): Flow<Int> = flow {
log("Started simple flow")
for (i in 1..3) {
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> log("Collected $value") }
}
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
由于 simple().collect 是在主線程調(diào)用的廊移,那么 simple 的流主體也是在主線程調(diào)用的糕簿。 這是快速運(yùn)行或異步代碼的理想默認(rèn)形式,它不關(guān)心執(zhí)行的上下文并且不會阻塞調(diào)用者画机。
-
withContext 發(fā)出錯誤
然而冶伞,長時間運(yùn)行的消耗 CPU 的代碼也許需要在 Dispatchers.Default 上下文中執(zhí)行,并且更新 UI 的代碼也許需要在 Dispatchers.Main 中執(zhí)行步氏。通常响禽,withContext 用于在 Kotlin 協(xié)程中改變代碼的上下文,但是 flow {...}
構(gòu)建器中的代碼必須遵循上下文保存屬性荚醒,并且不允許從其他上下文中發(fā)射(emit)芋类。
報錯代碼
fun simple(): Flow<Int> = flow {
// 在流構(gòu)建器中更改消耗 CPU 代碼的上下文的錯誤方式
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100) // 假裝我們以消耗 CPU 的方式進(jìn)行計(jì)算
emit(i) // 發(fā)射下一個值
}
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> println(value) }
}
error
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@4a03ee16, BlockingEventLoop@c4f7368],
but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@673ab05c, Dispatchers.Default].
Please refer to 'flow' documentation or use 'flowOn' instead
at kotlinx.coroutines.flow.internal.SafeCollector_commonKt.checkContext (SafeCollector.common.kt:84)
at kotlinx.coroutines.flow.internal.SafeCollector.checkContext (SafeCollector.kt:88)
at kotlinx.coroutines.flow.internal.SafeCollector.emit (SafeCollector.kt:74)
-
flowOn 操作符
例外的是 flowOn 函數(shù),該函數(shù)用于更改流發(fā)射的上下文界阁。 以下示例展示了更改流上下文的正確方法侯繁,該示例還通過打印相應(yīng)線程的名字以展示它們的工作方式:
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // 假裝我們以消耗 CPU 的方式進(jìn)行計(jì)算
log("Emitting $i")
emit(i) // 發(fā)射下一個值
}
}.flowOn(Dispatchers.Default) // 在流構(gòu)建器中改變消耗 CPU 代碼上下文的正確方式
fun main() = runBlocking<Unit> {
simple().collect { value ->
log("Collected $value")
}
}
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3
這里要觀察的另一件事是 flowOn 操作符已改變流的默認(rèn)順序性。 現(xiàn)在收集發(fā)生在一個協(xié)程中(“coroutine#1”)而發(fā)射發(fā)生在運(yùn)行于另一個線程中與收集協(xié)程并發(fā)運(yùn)行的另一個協(xié)程(“coroutine#2”)中泡躯。當(dāng)上游流必須改變其上下文中的 CoroutineDispatcher 的時候贮竟,flowOn 操作符創(chuàng)建了另一個協(xié)程。
-
緩沖
從收集流所花費(fèi)的時間來看较剃,將流的不同部分運(yùn)行在不同的協(xié)程中將會很有幫助咕别,特別是當(dāng)涉及到長時間運(yùn)行的異步操作時。例如写穴,考慮一種情況惰拱, 一個 simple 流的發(fā)射很慢,它每花費(fèi) 100 毫秒才產(chǎn)生一個元素啊送;而收集器也非常慢偿短, 需要花費(fèi) 300 毫秒來處理元素。讓我們看看從該流收集三個數(shù)字要花費(fèi)多長時間:
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // 假裝我們異步等待了 100 毫秒
emit(i) // 發(fā)射下一個值
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple().collect { value ->
delay(300) // 假裝我們花費(fèi) 300 毫秒來處理它
println(value)
}
}
println("Collected in $time ms")
}
1
2
3
Collected in 1217 ms
它會產(chǎn)生這樣的結(jié)果馋没,整個收集過程大約需要 1200 毫秒(3 個數(shù)字昔逗,每個花費(fèi) 400 毫秒):
我們可以在流上使用 buffer 操作符來并發(fā)運(yùn)行這個 simple
流中發(fā)射元素的代碼以及收集的代碼, 而不是順序運(yùn)行它們:
val time = measureTimeMillis {
simple()
.buffer() // 緩沖發(fā)射項(xiàng)篷朵,無需等待
.collect { value ->
delay(300) // 假裝我們花費(fèi) 300 毫秒來處理它
println(value)
}
}
println("Collected in $time ms")
1
2
3
Collected in 1047 ms
它產(chǎn)生了相同的數(shù)字纤子,只是更快了,由于我們高效地創(chuàng)建了處理流水線, 僅僅需要等待第一個數(shù)字產(chǎn)生的 100 毫秒以及處理每個數(shù)字各需花費(fèi)的 300 毫秒控硼。這種方式大約花費(fèi)了 1000 毫秒來運(yùn)行:
注意泽论,當(dāng)必須更改 時,[flowOn]操作符使用了相同的緩沖機(jī)制卡乾, 但是我們在這里顯式地請求緩沖而不改變執(zhí)行上下文翼悴。
-
合并
當(dāng)流代表部分操作結(jié)果或操作狀態(tài)更新時,可能沒有必要處理每個值幔妨,而是只處理最新的那個鹦赎。在本示例中,當(dāng)收集器處理它們太慢的時候误堡, conflate 操作符可以用于跳過中間值古话。構(gòu)建前面的示例:
val time = measureTimeMillis {
simple()
.conflate() // 合并發(fā)射項(xiàng),不對每個值進(jìn)行處理
.collect { value ->
delay(300) // 假裝我們花費(fèi) 300 毫秒來處理它
println(value)
}
}
println("Collected in $time ms")
1
3
Collected in 746 ms
我們看到锁施,雖然第一個數(shù)字仍在處理中陪踩,但第二個和第三個數(shù)字已經(jīng)產(chǎn)生,因此第二個是 conflated 悉抵,只有最新的(第三個)被交付給收集器