[譯] Kotlin Asynchronous Flow - 異步流

?異步掛起函數(shù)能夠返回單一值,那么我們?nèi)绾畏祷囟鄠€(gè)異步計(jì)算的值呢旨椒?而這個(gè)就是Kotlin Flow需要解決地。

Representing multiple values

?在kotlin漩氨,多個(gè)值可以由Collections表示饭耳。

fun foo(): List<Int> = listOf(1, 2, 3)
 
fun main() {
    foo().forEach { value -> println(value) } 
}

以上代碼輸出如下:

1
2
3

Sequences

?如果我們使用CPU消費(fèi)型阻塞代碼生產(chǎn)numbers,我們可以使用Sequences表示這些numbers睛约。

fun foo(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield next value
    }
}

fun main() {
    foo().forEach { value -> println(value) } 
}

?以上代碼會(huì)每隔100ms鼎俘,打印出一個(gè)數(shù)字。

Suspending functions

?以上代碼會(huì)阻塞主線程辩涝,我們可以給函數(shù)添加suspend修飾符來實(shí)現(xiàn)異步計(jì)算贸伐。

suspend fun foo(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    foo().forEach { value -> println(value) } 
}

?使用List<Int>返回類型意味著,我們需要一次性返回所有值怔揩。為了表示異步計(jì)算的值的流捉邢,我們可以使用Flow<Int>,就像之前使用Sequence<Int>同步計(jì)算值一樣商膊。

fun foo(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    foo().collect { value -> println(value) } 
}

輸出如下:

I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3

Flow和之前例子的差別:

  • Flow 類型 builder 函數(shù)稱為 flow
  • flow{}內(nèi)部代碼是可以掛起的
  • 函數(shù) foo()不再需要suspend修飾符
  • 使用emit函數(shù)發(fā)射值
  • 使用collect函數(shù)收集值

我們可以使用Thread.Sleep 來替換 delay,那么當(dāng)前Main Thread就會(huì)被阻塞

Flows are cold

?Flow是和sequence一樣的code stream伏伐,在flow內(nèi)的代碼塊只有到flow開始被collected時(shí)才開始運(yùn)行;

     
fun foo(): Flow<Int> = flow { 
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling foo...")
    val flow = foo()
    println("Calling collect...")
    flow.collect { value -> println(value) } 
    println("Calling collect again...")
    flow.collect { value -> println(value) } 
}

輸出如下:

Calling foo...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

?這個(gè)是返回flow的foo()函數(shù)不需要被標(biāo)記為suspend的關(guān)鍵理由.foo()函數(shù)會(huì)馬上返回不會(huì)有任何等待晕拆,flow每次都是在collect調(diào)用之后才開始執(zhí)行藐翎,這就是為什么我們?cè)谡{(diào)用collect之后才打印出來 "Flow started"。

Flow cancellation

?Flow堅(jiān)持使用通用的協(xié)作式協(xié)程取消方式实幕。flow底層并沒有采用附加的取消點(diǎn)阱高。對(duì)于取消這是完全透明的。和往常一樣茬缩,如果flow是在一個(gè)掛起函數(shù)內(nèi)被掛起了赤惊,那么flow collection是可以被取消的,并且在其他情況下是不能被取消的凰锡。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

        
fun foo(): Flow<Int> = flow { 
  for (i in 1..3) {
      delay(100)          
      println("Emitting $i")
      emit(i)
  }
}

fun main() = runBlocking<Unit> {
  withTimeoutOrNull(250) { // Timeout after 250ms 
      foo().collect { value -> println(value) } 
  }
  println("Done")
}

輸出如下:

Emitting 1
1
Emitting 2
2
Done

Flow builders

?在上述例子中未舟,flow { }構(gòu)造者是最簡(jiǎn)單的圈暗。以下還有更簡(jiǎn)單的實(shí)現(xiàn):

  • flowOf定義了一個(gè)flow用來emit一個(gè)固定的集合;
  • 各種collections和sequences都可以通過asFlow()函數(shù)來轉(zhuǎn)換為flows;
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    // Convert an integer range to a flow
    (1..3).asFlow().collect { value -> println(value) } 
}

Intermediate flow operators

?Flows可以通過操作符來進(jìn)行轉(zhuǎn)換,就如同你使用collections和sequences一樣裕膀。中間操作符用來應(yīng)用于上游flow任何產(chǎn)生下游flow员串。這些操作符都是冷啟動(dòng)的,就像flows一樣昼扛。對(duì)于這些操作符的調(diào)用也都不是掛起函數(shù)寸齐。它工作很快,返回新的轉(zhuǎn)換的flow的定義抄谐。
?基礎(chǔ)操作符也有著和map和filter類似的名字渺鹦。和sequences一個(gè)很重要的差別是在這些操作符內(nèi)部可以調(diào)用掛起函數(shù)。
?舉個(gè)例子蛹含,一個(gè)請(qǐng)求flow可以通過map操作符映射為result毅厚,即便這個(gè)請(qǐng)求是在一個(gè)掛起函數(shù)內(nèi)需要長(zhǎng)時(shí)間的運(yùn)行。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

          
suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}

輸出如下:

response 1
response 2
response 3

Transform operator

?在flow轉(zhuǎn)換操作符中浦箱,最經(jīng)常使用的就是transform吸耿,它可以用來模擬簡(jiǎn)單轉(zhuǎn)換,就和 mapfilter一樣酷窥,也可以用來實(shí)現(xiàn)更加復(fù)雜地轉(zhuǎn)換咽安。可以每次emit任意數(shù)量地任意值蓬推。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .transform { request ->
            emit("Making request $request") 
            emit(performRequest(request)) 
        }
        .collect { response -> println(response) }
}
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

Size-limiting operators

?Size-limiting 中間操作符會(huì)比如take會(huì)取消flow繼續(xù)執(zhí)行妆棒,當(dāng)設(shè)置地limit已經(jīng)達(dá)到了設(shè)定值。協(xié)程取消總是會(huì)拋出一個(gè)異常拳氢,所以所有的資源管理函數(shù)對(duì)于取消操作都會(huì)添加比如try{}finally{}代碼塊募逞。

import kotlinx.coroutines.flow.*

fun numbers(): Flow<Int> = flow {
    try {                          
        emit(1)
        emit(2) 
        println("This line will not execute")
        emit(3)    
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers() 
        .take(2) // take only the first two
        .collect { value -> println(value) }
}
1
2
Finally in numbers

Terminal flow operators

?對(duì)于flows的末端操作符都是開始flow采集的掛起函數(shù)蛋铆。collect操作符是最基礎(chǔ)的馋评,除此以外還有其他操作符:

  • toListtoSet.可以作集合轉(zhuǎn)換
  • 操作符可以獲取第一個(gè)值并且確保flow只會(huì)emit一個(gè)值
  • 使用reducefold 把flow 簡(jiǎn)化合并為一個(gè)值
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {

   val sum = (1..5).asFlow()
       .map { it * it } // squares of numbers from 1 to 5                           
       .reduce { a, b -> a + b } // sum them (terminal operator)
   println(sum)     
}
55

Flows are sequential

?每一個(gè)flow集合都是順序執(zhí)行,除非應(yīng)用了某個(gè)特定地針對(duì)多個(gè)flow執(zhí)行地操作符刺啦。每一個(gè)值都是經(jīng)過中間操作符留特,從上游到達(dá)下游,最終到達(dá)末端操作符玛瘸。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {

    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0              
        }              
        .map { 
            println("Map $it")
            "string $it"
        }.collect { 
            println("Collect $it")
        }                      
}
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

Flow context

?flow collection總是會(huì)在調(diào)用者協(xié)程發(fā)生蜕青。舉個(gè)例子,如果有一個(gè)foo flow糊渊,那么以下代碼會(huì)在作者指定地上下文中執(zhí)行右核,而不用去看foo flow的具體執(zhí)行細(xì)節(jié)。

withContext(context) {
    foo.collect { value ->
        println(value) // run in the specified context 
    }
}

?因此渺绒,flow{}內(nèi)的代碼是運(yùn)行在collector指定的上下文中贺喝。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
           
fun foo(): Flow<Int> = flow {
    log("Started foo flow")
    for (i in 1..3) {
        emit(i)
    }
}  

fun main() = runBlocking<Unit> {
    foo().collect { value -> log("Collected $value") } 
}      
[main @coroutine#1] Started foo flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3

?因?yàn)閒oo().collect是在主線程中被調(diào)用的菱鸥。foo的flow內(nèi)部代碼也是在主線程中執(zhí)行。這是個(gè)完美地實(shí)現(xiàn)躏鱼,解決快速運(yùn)行或者異步代碼氮采,不用關(guān)心執(zhí)行環(huán)境和阻塞調(diào)用者線程。

Wrong emission withContext

?然而染苛,長(zhǎng)時(shí)間運(yùn)行CPU消費(fèi)型任務(wù)需要在Dispatchers.Default中執(zhí)行鹊漠,UI更新代碼需要在Dispatchers.Main中執(zhí)行。通常茶行,withContext用來切換kotlin 協(xié)程的上下文躯概。但是flow{}內(nèi)部的代碼必須要尊重上下文保留屬性,并且不允許從不同的上下文emit值拢军。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
                     
fun foo(): Flow<Int> = flow {
   // The WRONG way to change context for CPU-consuming code in flow builder
   kotlinx.coroutines.withContext(Dispatchers.Default) {
       for (i in 1..3) {
           Thread.sleep(100) // pretend we are computing it in CPU-consuming way
           emit(i) // emit next value
       }
   }
}

fun main() = runBlocking<Unit> {
   foo().collect { value -> println(value) } 
}   
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
        Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
        but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, DefaultDispatcher].
        Please refer to 'flow' documentation or use 'flowOn' instead
    at ...

flowOn operator

?以下展示正確切換flow上下文的方式:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
           
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

fun main() = runBlocking<Unit> {
    foo().collect { value ->
        log("Collected $value") 
    } 
} 
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3

flowOn改變了flow本身的順序執(zhí)行上下文楞陷。collection發(fā)生在協(xié)程"coroutine#1"中,emission發(fā)生在協(xié)程"coroutine#2"并且是運(yùn)行咋另一個(gè)線程中茉唉,和collect操作是同時(shí)進(jìn)行地固蛾。當(dāng)必須要為上下文改變CoroutineDispatcher時(shí),flowOn操作符就會(huì)為上游flow創(chuàng)建一個(gè)新協(xié)程度陆。

Buffering

?在不同協(xié)程中運(yùn)行flow的不同部分艾凯,在整體立場(chǎng)上是非常有幫助的,特別是涉及到長(zhǎng)時(shí)間運(yùn)行的異步操作懂傀。舉個(gè)例子趾诗,當(dāng)foo()的flow的emission操作比較慢,比如沒100ms生產(chǎn)一個(gè)一個(gè)element蹬蚁,并且collector也比較慢恃泪,花費(fèi)300ms去處理一個(gè)元素。讓我門看看一個(gè)處理三個(gè)數(shù)字的flow需要多少時(shí)間:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        foo().collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
    }   
    println("Collected in $time ms")
}

輸出如下犀斋,整個(gè)collection操作需要花費(fèi)大概1200ms贝乎。

1
2
3
Collected in 1220 ms

?我們可以使用 buffer操作符去并發(fā)執(zhí)行foo()方法里的emit代碼段,然后順序執(zhí)行collection操作叽粹。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        foo()
            .buffer() // buffer emissions, don't wait
            .collect { value -> 
                delay(300) // pretend we are processing it for 300 ms
                println(value) 
            } 
    }   
    println("Collected in $time ms")
}

?上述方式會(huì)更快生產(chǎn)numbers览效,因?yàn)槲覀冇行?chuàng)建了處理流程,只需要在第一個(gè)數(shù)字上等待100ms虫几,然后每個(gè)數(shù)字都花費(fèi)300ms去做處理锤灿。這樣方式會(huì)花費(fèi)大概1000ms。

1
2
3
Collected in 1071 ms

注意: flowOn操作符使用了相同的緩存機(jī)制辆脸,但是它必須切換CoroutineDispatcher,但是在這里但校,我們顯示請(qǐng)求了buffer而不是切換執(zhí)行上下文。

Conflation

?當(dāng)一個(gè)flow表示操作的部分結(jié)果或者操作狀態(tài)更新啡氢,它可能并不需要去處理每一個(gè)值状囱,但是需要處理最近的一個(gè)值州刽。在這種場(chǎng)景下, conflate操作符可以被用于忽略中間操作符浪箭,當(dāng)collector處理太慢穗椅。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        foo()
            .conflate() // conflate emissions, don't process each one
            .collect { value -> 
                delay(300) // pretend we are processing it for 300 ms
                println(value) 
            } 
    }   
    println("Collected in $time ms")
}

?從上面可以看出來,第一個(gè)數(shù)字會(huì)被處理奶栖,第二第三個(gè)也會(huì)被處理匹表,而第二個(gè)數(shù)字會(huì)被合并,只有第三個(gè)數(shù)字發(fā)送給collector進(jìn)行處理宣鄙。

1
3
Collected in 758 ms

Processing the latest value

? Conflation是一種對(duì)emitter和collector慢處理的一種加速方式袍镀。它通過丟棄一些值來做實(shí)現(xiàn)。另外一種方式就是通過取消一個(gè)慢處理collector然后重啟collector接受已經(jīng)發(fā)射的新值冻晤。有一族的xxxlatest操作符來執(zhí)行和xxx操作符同樣的基本邏輯苇羡。取消emit新值的代碼塊內(nèi)的代碼。我們把上個(gè)例子中的conflate改成collectlatest鼻弧。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        foo()
            .collectLatest { value -> // cancel & restart on the latest value
                println("Collecting $value") 
                delay(300) // pretend we are processing it for 300 ms
                println("Done $value") 
            } 
    }   
    println("Collected in $time ms")
}

?因?yàn)?a target="_blank">collectLatest花費(fèi)來300ms设江,每次發(fā)送一個(gè)新值都是100ms。我們看見代碼塊都是運(yùn)行在新值上攘轩,但是只會(huì)以最新值完成叉存。

Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms

Composing multiple flows

有不少方式來組合多個(gè)flow。

?就像kotlin標(biāo)準(zhǔn)庫(kù)內(nèi)的Sequence.zip擴(kuò)展函數(shù)一樣度帮,flows有zip操作符來組合不同的flows的值歼捏。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> { 

    val nums = (1..3).asFlow() // numbers 1..3
    val strs = flowOf("one", "two", "three") // strings 
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
        .collect { println(it) } // collect and print
}
1 -> one
2 -> two
3 -> three

Combine

?當(dāng)flow表示操作或者變量的最近值,它可能需要去執(zhí)行計(jì)算根據(jù)所依賴的相應(yīng)的flow的最近的值笨篷,并且會(huì)重新計(jì)算瞳秽,當(dāng)上游flow發(fā)射新值的時(shí)候。相應(yīng)的操作符族被稱作combine率翅。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> { 

    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time 
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}

使用combine替換zip练俐。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> { 

    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms          
    val startTime = System.currentTimeMillis() // remember the start time 
    nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}
1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start

Flattening flows

?Flows表示異步值接收序列,它會(huì)很容易地觸發(fā)請(qǐng)求另一個(gè)sequence值安聘。比如痰洒,以下例子返回了兩個(gè)500ms間隔字符串地flow瓢棒。

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

?現(xiàn)在我們有三個(gè)整型的flow浴韭,我們調(diào)用requestFlow來請(qǐng)求值。

(1..3).asFlow().map { requestFlow(it) }

?現(xiàn)在我們結(jié)束flows的每一個(gè)flow脯宿,需要將其扁平化為一個(gè)flow來進(jìn)行進(jìn)一步地處理念颈。Collections和sequences有 flattenflatMap操作符,由于flow地異步性質(zhì)连霉,它會(huì)調(diào)用不同地flattening地模型榴芳,因此嗡靡,flows有一族的flattening操作符。

flatMapConcat

?連接模式由flatMapConcatflattenConcat操作符實(shí)現(xiàn)窟感。 它們和sequence是最相似的讨彼。在收集新值之前,它們會(huì)等待內(nèi)部flow完成柿祈,如同下面的例子描述地一樣:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
    val startTime = System.currentTimeMillis() // remember the start time 
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
        .flatMapConcat { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}

可以看出來 flatMapConcat的順序性質(zhì)

1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start

flatMapMerge

另一個(gè)flattening mode是并發(fā)收集flows并且將合并它們的值為一個(gè)單一flow哈误,因此發(fā)射地值會(huì)盡快被處理。這些由flatMapMergeflattenMerge來實(shí)現(xiàn)躏嚎。它們都有一個(gè)可選的concurrency參數(shù)來限制并發(fā)同時(shí)進(jìn)行收集的flow個(gè)數(shù)蜜自。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
    val startTime = System.currentTimeMillis() // remember the start time 
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
        .flatMapMerge { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}

可以看出來flatMapMerge并發(fā)特性:

1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start

可以看出來flatMapMerge是順序調(diào)用內(nèi)部代碼塊(在這個(gè)例子中是{ requestFlow(it) } )但是并發(fā)收集結(jié)果flows,它等價(jià)于順序執(zhí)行map { requestFlow(it) }卢佣,然后對(duì)于結(jié)果調(diào)用flattenMerge重荠。

flatMapLatest

?flatMapLatest和collectLatest操作符很像,在"Processing the latest value"章節(jié)有介紹虚茶,里面有關(guān)于"Latest"模式的介紹戈鲁,只有新flow發(fā)射了了新值,那么上個(gè)flow就會(huì)被取消嘹叫,由 flatMapLatest實(shí)現(xiàn)荞彼。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
    val startTime = System.currentTimeMillis() // remember the start time 
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
        .flatMapLatest { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}
1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start

flatMapLatest取消了代碼快內(nèi)所有代碼(在這個(gè)例子中是{ requestFlow(it) })當(dāng)flow發(fā)射新值。在這個(gè)特定例子中沒有差別待笑,因?yàn)閷?duì)requestFlow的調(diào)用是非趁恚快的,非掛起也不能取消暮蹂。如果我們使用掛起函數(shù)寞缝,比如delay就會(huì)按照期望的來顯示。

Flow exceptions

?如果在emitter內(nèi)部或者操作符內(nèi)部拋出一個(gè)異常仰泻,flow collection也是可以正常完成荆陆。也有好幾種處理異常的方式。

Collector try and catch

?在collector內(nèi)部使用try/catch代碼塊來處理異常集侯。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    try {
        foo().collect { value ->         
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}    

?這段代碼成功捕捉了 collect 末端操作符內(nèi)的異常被啼,在拋出異常之后就沒有再發(fā)送新值。

Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2

Everything is caught

?上個(gè)例子確實(shí)捕捉了emitter棠枉,中間操作符浓体,末端操作符內(nèi)的異常。我們修改以下代碼辈讶,將emitter發(fā)射值通過mapped修改為strings命浴,但是除了異常之外。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<String> = 
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }                 
        "string $value"
    }

fun main() = runBlocking<Unit> {
    try {
        foo().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}            
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2

Exception transparency

?但是怎么樣才能將封裝處理emitter代碼異常處理邏輯呢?
?Flow必須對(duì)異常處理透明生闲,在flow{}內(nèi)使用try/catch違背了異常處理透明化原則媳溺。在上述例子中,保證了從collector拋出異常也能在try/catch內(nèi)捕獲碍讯。
?emitter可以使用catch操作符來實(shí)現(xiàn)異常透明化處理悬蔽,運(yùn)行異常處理封裝。catch操作符可以分析異常捉兴,根據(jù)不同的異常作出相應(yīng)處理屯阀。

  • 可以使用throw再次拋出異常
  • 異常可以在catch內(nèi)轉(zhuǎn)換為發(fā)射值
  • 異持崾酰可以被忽略难衰、打印、或者由其他邏輯代碼進(jìn)行處理
    ?舉個(gè)例子逗栽,我們?cè)赾atch異常之后再發(fā)射一段文本:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<String> = 
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }                 
        "string $value"
    }

fun main() = runBlocking<Unit> {
    foo()
        .catch { e -> emit("Caught $e") } // emit on exception
        .collect { value -> println(value) }
}

Transparent catch

?catch中間操作符盖袭,履行了異常透明化原則,只是捕捉上游異常(僅僅會(huì)捕獲catch操作符以上的異常彼宠,而不會(huì)捕獲catch以下的異常)鳄虱,如果是在collect{}代碼塊內(nèi)的異常則會(huì)逃逸挨厚。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    foo()
        .catch { e -> println("Caught $e") } // does not catch downstream exceptions
        .collect { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
} 

Catching declaratively

?我們可以結(jié)合 catch操作符屬性然后如果要求去處理所有異常的話就可以把 collect函數(shù)體內(nèi)的邏輯轉(zhuǎn)移到onEach并且放到catch操作符前面去旁钧。這樣的Flow Collection 必須由對(duì)collect的調(diào)用觸發(fā)并且collect方法沒有參數(shù)出嘹。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    foo()
        .onEach { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
        .catch { e -> println("Caught $e") }
        .collect()
}

?現(xiàn)在我們可以看見"Caught …"消息打印出來闹司,現(xiàn)在就可以捕獲所有異常而不用顯示編寫try/catch代碼塊。

Flow completion

?在flow collection完成之后(正常完成或者異常完成)可能需要執(zhí)行一個(gè)操作桐汤,可以通過兩種方式來完成: imperative 或 declarative梯捕。

Imperative finally block

?對(duì)于try/catch笛厦,collector也可以通過使用finally來執(zhí)行完成操作索昂。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    try {
        foo().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}  
1
2
3
Done

Declarative handling

?對(duì)于declarative方式建车,flow有一個(gè)onCompletion中間操作符,在flow完成collect操作后就會(huì)被調(diào)用椒惨。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    foo()
        .onCompletion { println("Done") }
        .collect { value -> println(value) }
}

?對(duì)于 onCompletion的關(guān)鍵性優(yōu)點(diǎn)則是可以為空的Throwable參數(shù)缤至,可以以此判斷flow是正常完成還是異常完成,在以下例子中康谆,foo() flow在emit數(shù)字1之后拋出來異常领斥。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    foo()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
} 

輸入如下

1
Flow completed exceptionally
Caught exception

?onCompletion并不像catch一樣,它不會(huì)處理異常沃暗。正如我們上面看見的一樣月洛,異常依然向下游流動(dòng),被進(jìn)一步分發(fā)到onCompletion操作符描睦,也可以在catch操作符內(nèi)做處理膊存。

Upstream exceptions only

?和catch操作符一樣导而,onCompletion只會(huì)看見來自上游的異常忱叭,不能看見下游異常隔崎。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    foo()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
}
1
Flow completed with null
Exception in thread "main" java.lang.IllegalStateException: Collected 2

Imperative versus declarative

?現(xiàn)在我們知道如何去收集flow,處理completion和exceptions以imperative和declarative方式韵丑。

Launching flow

?在一些數(shù)據(jù)源爵卒,使用flows來表示異步事件是非常簡(jiǎn)單地。我們需要和addEventListener一樣的類似處理方式撵彻,注冊(cè)一段代碼用來處理新事件和進(jìn)行進(jìn)一步的工作钓株。onEach正好能服務(wù)這一角色。然而陌僵,onEach是一個(gè)中間操作符轴合。我們也需要一個(gè)末端操作符來收集操作符。其他情況下調(diào)用onEach則毫無影響碗短。
?如果我們?cè)?collect末端操作符之前使用onEach受葛,在那之后的代碼將會(huì)等待直到flow收集完成。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- Collecting the flow waits
    println("Done")
} 
Event: 1
Event: 2
Event: 3
Done

?launchIn操作符是很方便地偎谁。使用collect來替換launchIn总滩,我們就可以在一個(gè)獨(dú)立協(xié)程中執(zhí)行flow采集,所以后面的代碼就會(huì)馬上執(zhí)行巡雨。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
}   
Done
Event: 1
Event: 2
Event: 3

? launchIn參數(shù)可以指定一個(gè) CoroutineScope闰渔,用來表示flow收集的協(xié)程所在的作用域。在上述例子中铐望,這個(gè)作用域來自于 runBlocking協(xié)程構(gòu)造器冈涧,runBlocking作用域會(huì)等待內(nèi)部子作用域完成,在這例子中保持主函數(shù)的返回和介紹正蛙。
?在實(shí)際應(yīng)用程序中炕舵,一個(gè)作用域會(huì)來自一個(gè)有限生命周期的實(shí)體。只要這個(gè)實(shí)體的生命周期終結(jié)了跟畅,那么相應(yīng)的作用域也會(huì)被取消咽筋,取消相應(yīng)flow的采集工作。在這種方式下徊件,使用onEach { ... }.launchIn(scope)和addEventListener就很類似奸攻,然而,沒有必要直接調(diào)用removeEventListener虱痕,因?yàn)槿∠僮骱徒Y(jié)構(gòu)化并發(fā)會(huì)自動(dòng)完成這個(gè)操作睹耐。
?請(qǐng)注意,launchIn會(huì)返回 Job部翘,在沒有取消整個(gè)作用域或join時(shí)硝训,job可以用來cancel相應(yīng)的flow收集協(xié)程。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市窖梁,隨后出現(xiàn)的幾起案子赘风,更是在濱河造成了極大的恐慌,老刑警劉巖纵刘,帶你破解...
    沈念sama閱讀 216,496評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件邀窃,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡假哎,警方通過查閱死者的電腦和手機(jī)瞬捕,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,407評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來舵抹,“玉大人肪虎,你說我怎么就攤上這事【逵迹” “怎么了笋轨?”我有些...
    開封第一講書人閱讀 162,632評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)赊淑。 經(jīng)常有香客問我爵政,道長(zhǎng),這世上最難降的妖魔是什么陶缺? 我笑而不...
    開封第一講書人閱讀 58,180評(píng)論 1 292
  • 正文 為了忘掉前任钾挟,我火速辦了婚禮,結(jié)果婚禮上饱岸,老公的妹妹穿的比我還像新娘掺出。我一直安慰自己,他們只是感情好苫费,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,198評(píng)論 6 388
  • 文/花漫 我一把揭開白布汤锨。 她就那樣靜靜地躺著,像睡著了一般百框。 火紅的嫁衣襯著肌膚如雪闲礼。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,165評(píng)論 1 299
  • 那天铐维,我揣著相機(jī)與錄音柬泽,去河邊找鬼。 笑死嫁蛇,一個(gè)胖子當(dāng)著我的面吹牛锨并,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播睬棚,決...
    沈念sama閱讀 40,052評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼第煮,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼解幼!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起包警,我...
    開封第一講書人閱讀 38,910評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤撵摆,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后揽趾,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體台汇,經(jīng)...
    沈念sama閱讀 45,324評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡苛骨,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,542評(píng)論 2 332
  • 正文 我和宋清朗相戀三年篱瞎,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片痒芝。...
    茶點(diǎn)故事閱讀 39,711評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡俐筋,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出严衬,到底是詐尸還是另有隱情澄者,我是刑警寧澤,帶...
    沈念sama閱讀 35,424評(píng)論 5 343
  • 正文 年R本政府宣布请琳,位于F島的核電站粱挡,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏俄精。R本人自食惡果不足惜询筏,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,017評(píng)論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望竖慧。 院中可真熱鬧嫌套,春花似錦、人聲如沸圾旨。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,668評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)砍的。三九已至痹筛,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間廓鞠,已是汗流浹背味混。 一陣腳步聲響...
    開封第一講書人閱讀 32,823評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留诫惭,地道東北人翁锡。 一個(gè)月前我還...
    沈念sama閱讀 47,722評(píng)論 2 368
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像夕土,于是被迫代替她去往敵國(guó)和親馆衔。 傳聞我的和親對(duì)象是個(gè)殘疾皇子瘟判,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,611評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容