?異步掛起函數(shù)能夠返回單一值,那么我們?nèi)绾畏祷囟鄠€(gè)異步計(jì)算的值呢旨椒?而這個(gè)就是Kotlin Flow需要解決地。
Representing multiple values
?在kotlin漩氨,多個(gè)值可以由Collections表示饭耳。
fun foo(): List<Int> = listOf(1, 2, 3)
fun main() {
foo().forEach { value -> println(value) }
}
以上代碼輸出如下:
1
2
3
Sequences
?如果我們使用CPU消費(fèi)型阻塞代碼生產(chǎn)numbers,我們可以使用Sequences表示這些numbers睛约。
fun foo(): Sequence<Int> = sequence { // sequence builder
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it
yield(i) // yield next value
}
}
fun main() {
foo().forEach { value -> println(value) }
}
?以上代碼會(huì)每隔100ms鼎俘,打印出一個(gè)數(shù)字。
Suspending functions
?以上代碼會(huì)阻塞主線程辩涝,我們可以給函數(shù)添加suspend修飾符來實(shí)現(xiàn)異步計(jì)算贸伐。
suspend fun foo(): List<Int> {
delay(1000) // pretend we are doing something asynchronous here
return listOf(1, 2, 3)
}
fun main() = runBlocking<Unit> {
foo().forEach { value -> println(value) }
}
?使用List<Int>返回類型意味著,我們需要一次性返回所有值怔揩。為了表示異步計(jì)算的值的流捉邢,我們可以使用Flow<Int>,就像之前使用Sequence<Int>同步計(jì)算值一樣商膊。
fun foo(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(100) // pretend we are doing something useful here
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
// Launch a concurrent coroutine to check if the main thread is blocked
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// Collect the flow
foo().collect { value -> println(value) }
}
輸出如下:
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
Flow和之前例子的差別:
- Flow 類型 builder 函數(shù)稱為 flow
- flow{}內(nèi)部代碼是可以掛起的
- 函數(shù) foo()不再需要suspend修飾符
- 使用emit函數(shù)發(fā)射值
- 使用collect函數(shù)收集值
我們可以使用Thread.Sleep 來替換 delay,那么當(dāng)前Main Thread就會(huì)被阻塞
Flows are cold
?Flow是和sequence一樣的code stream伏伐,在flow內(nèi)的代碼塊只有到flow開始被collected時(shí)才開始運(yùn)行;
fun foo(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
println("Calling foo...")
val flow = foo()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
輸出如下:
Calling foo...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
?這個(gè)是返回flow的foo()函數(shù)不需要被標(biāo)記為suspend的關(guān)鍵理由.foo()函數(shù)會(huì)馬上返回不會(huì)有任何等待晕拆,flow每次都是在collect調(diào)用之后才開始執(zhí)行藐翎,這就是為什么我們?cè)谡{(diào)用collect之后才打印出來 "Flow started"。
Flow cancellation
?Flow堅(jiān)持使用通用的協(xié)作式協(xié)程取消方式实幕。flow底層并沒有采用附加的取消點(diǎn)阱高。對(duì)于取消這是完全透明的。和往常一樣茬缩,如果flow是在一個(gè)掛起函數(shù)內(nèi)被掛起了赤惊,那么flow collection是可以被取消的,并且在其他情況下是不能被取消的凰锡。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
withTimeoutOrNull(250) { // Timeout after 250ms
foo().collect { value -> println(value) }
}
println("Done")
}
輸出如下:
Emitting 1
1
Emitting 2
2
Done
Flow builders
?在上述例子中未舟,flow { }構(gòu)造者是最簡(jiǎn)單的圈暗。以下還有更簡(jiǎn)單的實(shí)現(xiàn):
- flowOf定義了一個(gè)flow用來emit一個(gè)固定的集合;
- 各種collections和sequences都可以通過asFlow()函數(shù)來轉(zhuǎn)換為flows;
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
// Convert an integer range to a flow
(1..3).asFlow().collect { value -> println(value) }
}
Intermediate flow operators
?Flows可以通過操作符來進(jìn)行轉(zhuǎn)換,就如同你使用collections和sequences一樣裕膀。中間操作符用來應(yīng)用于上游flow任何產(chǎn)生下游flow员串。這些操作符都是冷啟動(dòng)的,就像flows一樣昼扛。對(duì)于這些操作符的調(diào)用也都不是掛起函數(shù)寸齐。它工作很快,返回新的轉(zhuǎn)換的flow的定義抄谐。
?基礎(chǔ)操作符也有著和map和filter類似的名字渺鹦。和sequences一個(gè)很重要的差別是在這些操作符內(nèi)部可以調(diào)用掛起函數(shù)。
?舉個(gè)例子蛹含,一個(gè)請(qǐng)求flow可以通過map操作符映射為result毅厚,即便這個(gè)請(qǐng)求是在一個(gè)掛起函數(shù)內(nèi)需要長(zhǎng)時(shí)間的運(yùn)行。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // a flow of requests
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}
輸出如下:
response 1
response 2
response 3
Transform operator
?在flow轉(zhuǎn)換操作符中浦箱,最經(jīng)常使用的就是transform吸耿,它可以用來模擬簡(jiǎn)單轉(zhuǎn)換,就和 map和filter一樣酷窥,也可以用來實(shí)現(xiàn)更加復(fù)雜地轉(zhuǎn)換咽安。可以每次emit任意數(shù)量地任意值蓬推。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // a flow of requests
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
}
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
Size-limiting operators
?Size-limiting 中間操作符會(huì)比如take會(huì)取消flow繼續(xù)執(zhí)行妆棒,當(dāng)設(shè)置地limit已經(jīng)達(dá)到了設(shè)定值。協(xié)程取消總是會(huì)拋出一個(gè)異常拳氢,所以所有的資源管理函數(shù)對(duì)于取消操作都會(huì)添加比如try{}finally{}代碼塊募逞。
import kotlinx.coroutines.flow.*
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2) // take only the first two
.collect { value -> println(value) }
}
1
2
Finally in numbers
Terminal flow operators
?對(duì)于flows的末端操作符都是開始flow采集的掛起函數(shù)蛋铆。collect操作符是最基礎(chǔ)的馋评,除此以外還有其他操作符:
- toList和toSet.可以作集合轉(zhuǎn)換
- 操作符可以獲取第一個(gè)值并且確保flow只會(huì)emit一個(gè)值
- 使用reduce 和 fold 把flow 簡(jiǎn)化合并為一個(gè)值
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
val sum = (1..5).asFlow()
.map { it * it } // squares of numbers from 1 to 5
.reduce { a, b -> a + b } // sum them (terminal operator)
println(sum)
}
55
Flows are sequential
?每一個(gè)flow集合都是順序執(zhí)行,除非應(yīng)用了某個(gè)特定地針對(duì)多個(gè)flow執(zhí)行地操作符刺啦。每一個(gè)值都是經(jīng)過中間操作符留特,從上游到達(dá)下游,最終到達(dá)末端操作符玛瘸。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
}
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
Flow context
?flow collection總是會(huì)在調(diào)用者協(xié)程發(fā)生蜕青。舉個(gè)例子,如果有一個(gè)foo flow糊渊,那么以下代碼會(huì)在作者指定地上下文中執(zhí)行右核,而不用去看foo flow的具體執(zhí)行細(xì)節(jié)。
withContext(context) {
foo.collect { value ->
println(value) // run in the specified context
}
}
?因此渺绒,flow{}內(nèi)的代碼是運(yùn)行在collector指定的上下文中贺喝。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun foo(): Flow<Int> = flow {
log("Started foo flow")
for (i in 1..3) {
emit(i)
}
}
fun main() = runBlocking<Unit> {
foo().collect { value -> log("Collected $value") }
}
[main @coroutine#1] Started foo flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
?因?yàn)閒oo().collect是在主線程中被調(diào)用的菱鸥。foo的flow內(nèi)部代碼也是在主線程中執(zhí)行。這是個(gè)完美地實(shí)現(xiàn)躏鱼,解決快速運(yùn)行或者異步代碼氮采,不用關(guān)心執(zhí)行環(huán)境和阻塞調(diào)用者線程。
Wrong emission withContext
?然而染苛,長(zhǎng)時(shí)間運(yùn)行CPU消費(fèi)型任務(wù)需要在Dispatchers.Default中執(zhí)行鹊漠,UI更新代碼需要在Dispatchers.Main中執(zhí)行。通常茶行,withContext用來切換kotlin 協(xié)程的上下文躯概。但是flow{}內(nèi)部的代碼必須要尊重上下文保留屬性,并且不允許從不同的上下文emit值拢军。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<Int> = flow {
// The WRONG way to change context for CPU-consuming code in flow builder
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
emit(i) // emit next value
}
}
}
fun main() = runBlocking<Unit> {
foo().collect { value -> println(value) }
}
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, DefaultDispatcher].
Please refer to 'flow' documentation or use 'flowOn' instead
at ...
flowOn operator
?以下展示正確切換flow上下文的方式:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
log("Emitting $i")
emit(i) // emit next value
}
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder
fun main() = runBlocking<Unit> {
foo().collect { value ->
log("Collected $value")
}
}
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3
flowOn改變了flow本身的順序執(zhí)行上下文楞陷。collection發(fā)生在協(xié)程"coroutine#1"中,emission發(fā)生在協(xié)程"coroutine#2"并且是運(yùn)行咋另一個(gè)線程中茉唉,和collect操作是同時(shí)進(jìn)行地固蛾。當(dāng)必須要為上下文改變CoroutineDispatcher時(shí),flowOn操作符就會(huì)為上游flow創(chuàng)建一個(gè)新協(xié)程度陆。
Buffering
?在不同協(xié)程中運(yùn)行flow的不同部分艾凯,在整體立場(chǎng)上是非常有幫助的,特別是涉及到長(zhǎng)時(shí)間運(yùn)行的異步操作懂傀。舉個(gè)例子趾诗,當(dāng)foo()的flow的emission操作比較慢,比如沒100ms生產(chǎn)一個(gè)一個(gè)element蹬蚁,并且collector也比較慢恃泪,花費(fèi)300ms去處理一個(gè)元素。讓我門看看一個(gè)處理三個(gè)數(shù)字的flow需要多少時(shí)間:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
foo().collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}
輸出如下犀斋,整個(gè)collection操作需要花費(fèi)大概1200ms贝乎。
1
2
3
Collected in 1220 ms
?我們可以使用 buffer操作符去并發(fā)執(zhí)行foo()方法里的emit代碼段,然后順序執(zhí)行collection操作叽粹。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
foo()
.buffer() // buffer emissions, don't wait
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}
?上述方式會(huì)更快生產(chǎn)numbers览效,因?yàn)槲覀冇行?chuàng)建了處理流程,只需要在第一個(gè)數(shù)字上等待100ms虫几,然后每個(gè)數(shù)字都花費(fèi)300ms去做處理锤灿。這樣方式會(huì)花費(fèi)大概1000ms。
1
2
3
Collected in 1071 ms
注意: flowOn操作符使用了相同的緩存機(jī)制辆脸,但是它必須切換CoroutineDispatcher,但是在這里但校,我們顯示請(qǐng)求了buffer而不是切換執(zhí)行上下文。
Conflation
?當(dāng)一個(gè)flow表示操作的部分結(jié)果或者操作狀態(tài)更新啡氢,它可能并不需要去處理每一個(gè)值状囱,但是需要處理最近的一個(gè)值州刽。在這種場(chǎng)景下, conflate操作符可以被用于忽略中間操作符浪箭,當(dāng)collector處理太慢穗椅。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
foo()
.conflate() // conflate emissions, don't process each one
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}
?從上面可以看出來,第一個(gè)數(shù)字會(huì)被處理奶栖,第二第三個(gè)也會(huì)被處理匹表,而第二個(gè)數(shù)字會(huì)被合并,只有第三個(gè)數(shù)字發(fā)送給collector進(jìn)行處理宣鄙。
1
3
Collected in 758 ms
Processing the latest value
? Conflation是一種對(duì)emitter和collector慢處理的一種加速方式袍镀。它通過丟棄一些值來做實(shí)現(xiàn)。另外一種方式就是通過取消一個(gè)慢處理collector然后重啟collector接受已經(jīng)發(fā)射的新值冻晤。有一族的xxxlatest操作符來執(zhí)行和xxx操作符同樣的基本邏輯苇羡。取消emit新值的代碼塊內(nèi)的代碼。我們把上個(gè)例子中的conflate改成collectlatest鼻弧。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
foo()
.collectLatest { value -> // cancel & restart on the latest value
println("Collecting $value")
delay(300) // pretend we are processing it for 300 ms
println("Done $value")
}
}
println("Collected in $time ms")
}
?因?yàn)?a target="_blank">collectLatest花費(fèi)來300ms设江,每次發(fā)送一個(gè)新值都是100ms。我們看見代碼塊都是運(yùn)行在新值上攘轩,但是只會(huì)以最新值完成叉存。
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms
Composing multiple flows
有不少方式來組合多個(gè)flow。
?就像kotlin標(biāo)準(zhǔn)庫(kù)內(nèi)的Sequence.zip擴(kuò)展函數(shù)一樣度帮,flows有zip操作符來組合不同的flows的值歼捏。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
val nums = (1..3).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
.collect { println(it) } // collect and print
}
1 -> one
2 -> two
3 -> three
Combine
?當(dāng)flow表示操作或者變量的最近值,它可能需要去執(zhí)行計(jì)算根據(jù)所依賴的相應(yīng)的flow的最近的值笨篷,并且會(huì)重新計(jì)算瞳秽,當(dāng)上游flow發(fā)射新值的時(shí)候。相應(yīng)的操作符族被稱作combine率翅。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start
Flattening flows
?Flows表示異步值接收序列,它會(huì)很容易地觸發(fā)請(qǐng)求另一個(gè)sequence值安聘。比如痰洒,以下例子返回了兩個(gè)500ms間隔字符串地flow瓢棒。
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
?現(xiàn)在我們有三個(gè)整型的flow浴韭,我們調(diào)用requestFlow來請(qǐng)求值。
(1..3).asFlow().map { requestFlow(it) }
?現(xiàn)在我們結(jié)束flows的每一個(gè)flow脯宿,需要將其扁平化為一個(gè)flow來進(jìn)行進(jìn)一步地處理念颈。Collections和sequences有 flatten和flatMap操作符,由于flow地異步性質(zhì)连霉,它會(huì)調(diào)用不同地flattening地模型榴芳,因此嗡靡,flows有一族的flattening操作符。
flatMapConcat
?連接模式由flatMapConcat和flattenConcat操作符實(shí)現(xiàn)窟感。 它們和sequence是最相似的讨彼。在收集新值之前,它們會(huì)等待內(nèi)部flow完成柿祈,如同下面的例子描述地一樣:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapConcat { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
可以看出來 flatMapConcat的順序性質(zhì)
1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
flatMapMerge
另一個(gè)flattening mode是并發(fā)收集flows并且將合并它們的值為一個(gè)單一flow哈误,因此發(fā)射地值會(huì)盡快被處理。這些由flatMapMerge和flattenMerge來實(shí)現(xiàn)躏嚎。它們都有一個(gè)可選的concurrency參數(shù)來限制并發(fā)同時(shí)進(jìn)行收集的flow個(gè)數(shù)蜜自。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapMerge { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
可以看出來flatMapMerge并發(fā)特性:
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
可以看出來flatMapMerge是順序調(diào)用內(nèi)部代碼塊(在這個(gè)例子中是{ requestFlow(it) } )但是并發(fā)收集結(jié)果flows,它等價(jià)于順序執(zhí)行map { requestFlow(it) }卢佣,然后對(duì)于結(jié)果調(diào)用flattenMerge重荠。
flatMapLatest
?flatMapLatest和collectLatest操作符很像,在"Processing the latest value"章節(jié)有介紹虚茶,里面有關(guān)于"Latest"模式的介紹戈鲁,只有新flow發(fā)射了了新值,那么上個(gè)flow就會(huì)被取消嘹叫,由 flatMapLatest實(shí)現(xiàn)荞彼。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapLatest { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
flatMapLatest取消了代碼快內(nèi)所有代碼(在這個(gè)例子中是{ requestFlow(it) })當(dāng)flow發(fā)射新值。在這個(gè)特定例子中沒有差別待笑,因?yàn)閷?duì)requestFlow的調(diào)用是非趁恚快的,非掛起也不能取消暮蹂。如果我們使用掛起函數(shù)寞缝,比如delay就會(huì)按照期望的來顯示。
Flow exceptions
?如果在emitter內(nèi)部或者操作符內(nèi)部拋出一個(gè)異常仰泻,flow collection也是可以正常完成荆陆。也有好幾種處理異常的方式。
Collector try and catch
?在collector內(nèi)部使用try/catch
代碼塊來處理異常集侯。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
try {
foo().collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
}
?這段代碼成功捕捉了 collect 末端操作符內(nèi)的異常被啼,在拋出異常之后就沒有再發(fā)送新值。
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2
Everything is caught
?上個(gè)例子確實(shí)捕捉了emitter棠枉,中間操作符浓体,末端操作符內(nèi)的異常。我們修改以下代碼辈讶,將emitter發(fā)射值通過mapped修改為strings命浴,但是除了異常之外。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
fun main() = runBlocking<Unit> {
try {
foo().collect { value -> println(value) }
} catch (e: Throwable) {
println("Caught $e")
}
}
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
Exception transparency
?但是怎么樣才能將封裝處理emitter代碼異常處理邏輯呢?
?Flow必須對(duì)異常處理透明生闲,在flow{}內(nèi)使用try/catch違背了異常處理透明化原則媳溺。在上述例子中,保證了從collector拋出異常也能在try/catch內(nèi)捕獲碍讯。
?emitter可以使用catch操作符來實(shí)現(xiàn)異常透明化處理悬蔽,運(yùn)行異常處理封裝。catch操作符可以分析異常捉兴,根據(jù)不同的異常作出相應(yīng)處理屯阀。
- 可以使用throw再次拋出異常
- 異常可以在catch內(nèi)轉(zhuǎn)換為發(fā)射值
- 異持崾酰可以被忽略难衰、打印、或者由其他邏輯代碼進(jìn)行處理
?舉個(gè)例子逗栽,我們?cè)赾atch異常之后再發(fā)射一段文本:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
fun main() = runBlocking<Unit> {
foo()
.catch { e -> emit("Caught $e") } // emit on exception
.collect { value -> println(value) }
}
Transparent catch
?catch中間操作符盖袭,履行了異常透明化原則,只是捕捉上游異常(僅僅會(huì)捕獲catch操作符以上的異常彼宠,而不會(huì)捕獲catch以下的異常)鳄虱,如果是在collect{}代碼塊內(nèi)的異常則會(huì)逃逸挨厚。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
foo()
.catch { e -> println("Caught $e") } // does not catch downstream exceptions
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
Catching declaratively
?我們可以結(jié)合 catch操作符屬性然后如果要求去處理所有異常的話就可以把 collect函數(shù)體內(nèi)的邏輯轉(zhuǎn)移到onEach并且放到catch操作符前面去旁钧。這樣的Flow Collection 必須由對(duì)collect的調(diào)用觸發(fā)并且collect方法沒有參數(shù)出嘹。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
foo()
.onEach { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
.catch { e -> println("Caught $e") }
.collect()
}
?現(xiàn)在我們可以看見"Caught …"消息打印出來闹司,現(xiàn)在就可以捕獲所有異常而不用顯示編寫try/catch代碼塊。
Flow completion
?在flow collection完成之后(正常完成或者異常完成)可能需要執(zhí)行一個(gè)操作桐汤,可以通過兩種方式來完成: imperative 或 declarative梯捕。
Imperative finally block
?對(duì)于try/catch笛厦,collector也可以通過使用finally來執(zhí)行完成操作索昂。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
try {
foo().collect { value -> println(value) }
} finally {
println("Done")
}
}
1
2
3
Done
Declarative handling
?對(duì)于declarative方式建车,flow有一個(gè)onCompletion中間操作符,在flow完成collect操作后就會(huì)被調(diào)用椒惨。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
foo()
.onCompletion { println("Done") }
.collect { value -> println(value) }
}
?對(duì)于 onCompletion的關(guān)鍵性優(yōu)點(diǎn)則是可以為空的Throwable參數(shù)缤至,可以以此判斷flow是正常完成還是異常完成,在以下例子中康谆,foo() flow在emit數(shù)字1之后拋出來異常领斥。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<Int> = flow {
emit(1)
throw RuntimeException()
}
fun main() = runBlocking<Unit> {
foo()
.onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
.catch { cause -> println("Caught exception") }
.collect { value -> println(value) }
}
輸入如下
1
Flow completed exceptionally
Caught exception
?onCompletion并不像catch一樣,它不會(huì)處理異常沃暗。正如我們上面看見的一樣月洛,異常依然向下游流動(dòng),被進(jìn)一步分發(fā)到onCompletion操作符描睦,也可以在catch操作符內(nèi)做處理膊存。
Upstream exceptions only
?和catch操作符一樣导而,onCompletion只會(huì)看見來自上游的異常忱叭,不能看見下游異常隔崎。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
foo()
.onCompletion { cause -> println("Flow completed with $cause") }
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
1
Flow completed with null
Exception in thread "main" java.lang.IllegalStateException: Collected 2
Imperative versus declarative
?現(xiàn)在我們知道如何去收集flow,處理completion和exceptions以imperative和declarative方式韵丑。
Launching flow
?在一些數(shù)據(jù)源爵卒,使用flows來表示異步事件是非常簡(jiǎn)單地。我們需要和addEventListener一樣的類似處理方式撵彻,注冊(cè)一段代碼用來處理新事件和進(jìn)行進(jìn)一步的工作钓株。onEach正好能服務(wù)這一角色。然而陌僵,onEach是一個(gè)中間操作符轴合。我們也需要一個(gè)末端操作符來收集操作符。其他情況下調(diào)用onEach則毫無影響碗短。
?如果我們?cè)?collect末端操作符之前使用onEach受葛,在那之后的代碼將會(huì)等待直到flow收集完成。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.collect() // <--- Collecting the flow waits
println("Done")
}
Event: 1
Event: 2
Event: 3
Done
?launchIn操作符是很方便地偎谁。使用collect來替換launchIn总滩,我們就可以在一個(gè)獨(dú)立協(xié)程中執(zhí)行flow采集,所以后面的代碼就會(huì)馬上執(zhí)行巡雨。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.launchIn(this) // <--- Launching the flow in a separate coroutine
println("Done")
}
Done
Event: 1
Event: 2
Event: 3
? launchIn參數(shù)可以指定一個(gè) CoroutineScope闰渔,用來表示flow收集的協(xié)程所在的作用域。在上述例子中铐望,這個(gè)作用域來自于 runBlocking協(xié)程構(gòu)造器冈涧,runBlocking作用域會(huì)等待內(nèi)部子作用域完成,在這例子中保持主函數(shù)的返回和介紹正蛙。
?在實(shí)際應(yīng)用程序中炕舵,一個(gè)作用域會(huì)來自一個(gè)有限生命周期的實(shí)體。只要這個(gè)實(shí)體的生命周期終結(jié)了跟畅,那么相應(yīng)的作用域也會(huì)被取消咽筋,取消相應(yīng)flow的采集工作。在這種方式下徊件,使用onEach { ... }.launchIn(scope)和addEventListener就很類似奸攻,然而,沒有必要直接調(diào)用removeEventListener虱痕,因?yàn)槿∠僮骱徒Y(jié)構(gòu)化并發(fā)會(huì)自動(dòng)完成這個(gè)操作睹耐。
?請(qǐng)注意,launchIn會(huì)返回 Job部翘,在沒有取消整個(gè)作用域或join時(shí)硝训,job可以用來cancel相應(yīng)的flow收集協(xié)程。