Kotlin(二十一)協(xié)程(異步流)

1.Flows(Flows)

掛起函數(shù)可以通過(guò)返回值返回單個(gè)計(jì)算值,但如何返回多個(gè)計(jì)算值呢欧漱?我們可以使用Flows隘击。我們使用集合遍歷,打印來(lái)返回多個(gè)計(jì)算值來(lái)舉例子

package com.example.kotlin01
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
 fun main()  = runBlocking {
     launch {
        for (k in 1..3){
            println("我沒(méi)有被阻塞凉泄,當(dāng)前值$k")
            delay(100)
        }
     }
     foo().collect{
         value ->
         println(value)
     }
 }
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}
我沒(méi)有被阻塞,當(dāng)前值1
1
我沒(méi)有被阻塞蚯根,當(dāng)前值2
2
我沒(méi)有被阻塞后众,當(dāng)前值3
3

從打印的結(jié)果來(lái)看,兩個(gè)打印的地方都是異步的,并沒(méi)有阻塞主線程蒂誉。

  • Flow類(lèi)型的構(gòu)造器函數(shù)名為flow
  • flow中的代碼塊可以掛起
  • 計(jì)算值通過(guò)emit函數(shù)發(fā)出
  • 結(jié)果值通過(guò)collect函數(shù)從fllow取出
  • 此場(chǎng)景有點(diǎn)類(lèi)型RxJava

2.流是冷的(Flows are cold)

流是冷的意思是教藻,flow build中的代碼在開(kāi)始收集之前不會(huì)運(yùn)行,也就是在collect之前右锨,不會(huì)運(yùn)行flow里面的代碼塊

package com.example.kotlin01

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking


fun main() = runBlocking {
    var flow = foo()
    flow.collect { value ->
        println(value)
    }
    println("flow agin")
    flow.collect { value ->
        println(value)
    }
}


fun foo(): Flow<Int> = flow {
    println("flow start")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}



flow start
1
2
3
flow agin
flow start
1
2
3

每次 collect的時(shí)候都會(huì)start

3.取消流(Flow cancellation)

流的收集操作可以在當(dāng)流在一個(gè)可以取消的掛起函數(shù)中掛起的時(shí)候取消(比如delay ),否則不能取消
下面通過(guò)withTimeoutOrNull 塊演示流如何在超時(shí)的時(shí)候被取消并停止執(zhí)行

package com.example.kotlin01

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull


fun main() = runBlocking {
    var flow = foo()
    withTimeoutOrNull(250) {
        flow.collect { value ->
            println("collect $value")
        }
    }
    println("done")


}


fun foo(): Flow<Int> = flow {

    for (i in 1..Int.MAX_VALUE) {
        delay(100)
        println("emit $i")
        emit(i)
    }
}



emit 1
collect 1
emit 2
done

以上的代碼括堤,foo函數(shù)中每隔100毫秒收集一次,main函數(shù)绍移,收集flow的計(jì)算值悄窃。并且在250毫秒的時(shí)候超時(shí)。我們看打印知道蹂窖,打印了1和2之后轧抗,就打印了 done,因?yàn)榇藭r(shí)當(dāng)前流在一個(gè)可以取消的掛起函數(shù)中delay函數(shù)掛起。因?yàn)槌瑫r(shí)之后被取消了恼策。

4.流構(gòu)建器(Flow builders)

前面說(shuō)的flow{}是最基礎(chǔ)的流構(gòu)建器鸦致,還有其他構(gòu)建器可以聲明流

  • flowOf()定義了一個(gè)發(fā)射固定值集的流構(gòu)建器
  • 可以使用asFlow()擴(kuò)展函數(shù)將各種集合和序列轉(zhuǎn)換成流
fun main() = runBlocking {
    (1..3).asFlow().collect() { value -> println(value) }

}

1
2
3

5. 中間流運(yùn)算符

可以使用運(yùn)算符來(lái)轉(zhuǎn)換流,中間運(yùn)算符應(yīng)用于上游流并返回下游流涣楷,比如我們熟悉的map分唾,filter
使用map將請(qǐng)求值映射成結(jié)果值

import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking


fun main() = runBlocking {
    (1..5).asFlow().map { value -> request(value) }
        .collect { response -> println(response) }


}
suspend fun request(parm: Int): Int {
    delay(100)
    return parm
}

1
2
3
4
5

5.1.轉(zhuǎn)換操作符(Transform operator)

轉(zhuǎn)換運(yùn)算符Transform可以用來(lái)模擬簡(jiǎn)單的數(shù)據(jù)轉(zhuǎn)換也可以用來(lái)更復(fù)雜的運(yùn)算符轉(zhuǎn)換
使用Transform模擬在執(zhí)行一個(gè)耗時(shí)任務(wù)之前發(fā)出一個(gè)字符串并在字符串后跟出一個(gè)響應(yīng)

package com.example.kotlin01

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking


fun main() = runBlocking<Unit> {
   val flow =  (1..5).asFlow().transform { param ->
        emit("這是第 $param 次請(qǐng)求")
        emit(request(param))
    }
    flow.collect{data-> println(data)}


}

suspend fun request(parm: Int): Int {
    delay(100)
    return parm
}



這是第 1 次請(qǐng)求
1
這是第 2 次請(qǐng)求
2
這是第 3 次請(qǐng)求
3
這是第 4 次請(qǐng)求
4
這是第 5 次請(qǐng)求
5

5.2.限長(zhǎng)運(yùn)算符(Size-limiting operators)

限長(zhǎng)運(yùn)算符在達(dá)到相應(yīng)限制的時(shí)候,取消流的執(zhí)行狮斗。協(xié)程中的取消總是通過(guò)拋出異常來(lái)實(shí)現(xiàn)的绽乔,這樣所有的資源管理函數(shù)就可以在取消時(shí)正常執(zhí)行
限制長(zhǎng)度是2,我們可以看到打印的結(jié)果2之后就停止了

package com.example.kotlin01
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlocking
fun main() = runBlocking<Unit> {
  try {
      val flow = (1..5).asFlow().take(2).map { value ->
          request(value)
      }.collect { response -> println(response) }
  } finally {
      println("Finally in numbers")

  }
}
suspend fun request(parm: Int): Int {
  delay(100)
  return parm
}
1
2
Finally in numbers

6. 流運(yùn)算符(Terminal flow operators)

終端流運(yùn)算符是用于啟動(dòng)流的掛起函數(shù),collect是最基本的終端流運(yùn)算符碳褒,但還要其他的終端流運(yùn)算符折砸,可以使得操作更加簡(jiǎn)單

  • 轉(zhuǎn)換各種集合 toList,toSet
  • first 用于獲取第一個(gè)值沙峻,single睦授,用于確保流發(fā)出單個(gè)值
  • 使用reduce和fold 將流還原為某個(gè)值
package com.example.kotlin01

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking


fun main() = runBlocking<Unit> {
    try {
        val list = (1..5).asFlow().take(2).map { value ->
            request(value)
        }.toList()
        println(list)
    } finally {
        println("Finally in numbers")

    }


}

suspend fun request(parm: Int): Int {
    delay(100)
    return parm
}


[1, 2]
Finally in numbers

將流轉(zhuǎn)換成集合

7. 流是連續(xù)的(Flows are sequential)

每個(gè)流的單獨(dú)集合都是按照順序執(zhí)行的。

package com.example.kotlin01

import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking


fun main() = runBlocking<Unit> {
    (1..5).asFlow().filter {
        println("當(dāng)前傳入的值$it")
        it % 2 == 0//過(guò)濾下 模 2 = 0的
    }
        .map {
            println("當(dāng)前轉(zhuǎn)換的值$it")
            "$it"
        }
        .collect {
            println("當(dāng)前收集的值 $it ")
        }

}



當(dāng)前傳入的值1
當(dāng)前傳入的值2
當(dāng)前轉(zhuǎn)換的值2
當(dāng)前收集的值 2 
當(dāng)前傳入的值3
當(dāng)前傳入的值4
當(dāng)前轉(zhuǎn)換的值4
當(dāng)前收集的值 4 
當(dāng)前傳入的值5

只有2和4是滿足的摔寨,2和4會(huì)執(zhí)行到收集完畢去枷,再繼續(xù)發(fā)射下一個(gè)值。

8. 流上下文

流的收集總是在調(diào)用協(xié)程的上下文中執(zhí)行

package com.example.kotlin01

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking


fun main() = runBlocking {
    foo().collect { value -> println("當(dāng)前線程${Thread.currentThread().name}-當(dāng)前值$value") }

}

fun foo(): Flow<Int> = flow {
    for (i in 1..4) {
        emit(i)
    }

}

當(dāng)前線程main-當(dāng)前值1
當(dāng)前線程main-當(dāng)前值2
當(dāng)前線程main-當(dāng)前值3
當(dāng)前線程main-當(dāng)前值4

調(diào)用協(xié)程的地方是在主線程中執(zhí)行的是复,所有流的收集也是在主線程中執(zhí)行删顶。因?yàn)槠渖舷挛脑谥骶€程。

8.1.錯(cuò)誤地使用 withContext(Wrong emission withContext)

通常withContext用于使用協(xié)程的時(shí)候更改代碼中的上下文淑廊,但是flow中的代碼塊必須要保留上下文的屬性逗余,不能修改上下文。

package com.example.kotlin01

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext


fun main() = runBlocking {
    foo().collect { value -> println("當(dāng)前線程${Thread.currentThread().name}-當(dāng)前值$value") }

}

fun foo(): Flow<Int> = flow {
    withContext(Dispatchers.Default){
        for (i in 1..4) {
            emit(i)
        }
    }


}

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
        Flow was collected in [BlockingCoroutine{Active}@614b37e6, BlockingEventLoop@185a864f],
        but emission happened in [DispatchedCoroutine{Active}@64e54ce0, Dispatchers.Default].
        Please refer to 'flow' documentation or use 'flowOn' instead
    at kotlinx.coroutines.flow.internal.SafeCollector_commonKt.checkContext(SafeCollector.common.kt:84)
    at kotlinx.coroutines.flow.internal.SafeCollector.checkContext(SafeCollector.kt:84)
    at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:70)
    at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:55)
    at com.example.kotlin01.CoroutineKt$foo$1$1.invokeSuspend(coroutine.kt:19)

8.2.flowOn 運(yùn)算符(flowOn operator)

有個(gè)例外就是flowOn運(yùn)算符可以修改發(fā)射時(shí)的上下文


import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking


fun main() = runBlocking {
    foo().collect { value -> println("當(dāng)前線程${Thread.currentThread().name}-當(dāng)前值$value") }

}

fun foo(): Flow<Int> = flow {
        for (i in 1..4) {
            Thread.sleep(100)
            emit(i)
            println("發(fā)射值的當(dāng)前線程${Thread.currentThread().name}")
        }
}.flowOn(Dispatchers.Default)



發(fā)射值的當(dāng)前線程DefaultDispatcher-worker-1
當(dāng)前線程main-當(dāng)前值1
發(fā)射值的當(dāng)前線程DefaultDispatcher-worker-1
當(dāng)前線程main-當(dāng)前值2
發(fā)射值的當(dāng)前線程DefaultDispatcher-worker-1
當(dāng)前線程main-當(dāng)前值3
發(fā)射值的當(dāng)前線程DefaultDispatcher-worker-1
當(dāng)前線程main-當(dāng)前值4

結(jié)果是發(fā)射時(shí)的線程是在DefaultDispatcher季惩,而收集值的線程是在main

9.緩沖(Buffering)

package com.example.kotlin01

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis


fun main() = runBlocking {
    val time = measureTimeMillis {
        foo().collect {
                value ->
            delay(300)
        }
    }
    println("當(dāng)前流執(zhí)行需要花費(fèi)的時(shí)間$time")

}

fun foo(): Flow<Int> = flow {
    for (i in 1..10) {
        delay(100)
        emit(i)
    }
}



當(dāng)前流執(zhí)行需要花費(fèi)的時(shí)間4081

以上的代碼我們是按照?qǐng)?zhí)行录粱,先執(zhí)行完收集代碼腻格,在執(zhí)行fool.時(shí)間花費(fèi)是4081

package com.example.kotlin01

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis


fun main() = runBlocking {
    val time = measureTimeMillis {
        foo().buffer().collect {
                value ->
            delay(300)
        }
    }
    println("當(dāng)前流執(zhí)行需要花費(fèi)的時(shí)間$time")

}

fun foo(): Flow<Int> = flow {
    for (i in 1..10) {
        delay(100)
        emit(i)
    }
}



當(dāng)前流執(zhí)行需要花費(fèi)的時(shí)間3214

使用buffer運(yùn)算符在運(yùn)行取值代碼的同時(shí)同時(shí)運(yùn)行 fool(),而不是按順序運(yùn)行他門(mén).時(shí)間花費(fèi)了3214

9.1合并(Conflation)

有時(shí)候我們的需求是不關(guān)心每一個(gè)值的結(jié)果,值關(guān)心最近的值,比如最新的狀態(tài)等.當(dāng)時(shí)可能前面的值還在處理中,導(dǎo)致沒(méi)法生成后面的值,這樣我們可以使用Conflation跳過(guò)前面的值

package com.example.kotlin01

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis


fun main() = runBlocking {
    val time = measureTimeMillis {
        foo().conflate().collect {
                value ->
            delay(300)
            println(value)
        }
    }
    println("當(dāng)前流執(zhí)行需要花費(fèi)的時(shí)間$time")

}

fun foo(): Flow<Int> = flow {
    for (i in 1..10) {
        delay(100)
        emit(i)
    }
}



1
4
6
9
10

我們看到前面沒(méi)有完全打印,比如4,是因?yàn)?這個(gè)數(shù)字還在處理中,而234已經(jīng)生成,所以就可以合并到4中.所以打印了4

9.2.處理最新值(Processing the latest value)

在發(fā)射端和收集端處理很慢的時(shí)候,合并加快處理速度的一種方法,如9.1,她通過(guò)丟棄發(fā)射值來(lái)實(shí)現(xiàn).另外一種方式是取消慢速的收集器,每次發(fā)送出新值的時(shí)候,重新啟動(dòng)她.比如我們可以把Conflation改成collectLatest.字面意思上就是收集最近的發(fā)射值

package com.example.kotlin01

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis


fun main() = runBlocking {
    val time = measureTimeMillis {
        foo().collectLatest { value ->
            delay(300)
            println(value)
        }
    }
    println("當(dāng)前流執(zhí)行需要花費(fèi)的時(shí)間$time")

}

fun foo(): Flow<Int> = flow {
    for (i in 1..10) {
        delay(100)
        emit(i)
    }
}

10
當(dāng)前流執(zhí)行需要花費(fèi)的時(shí)間1467

10.組合多個(gè)流(Composing multiple flows)

10.1.zip

和kotlin標(biāo)準(zhǔn)庫(kù)中的 Sequence.zip 擴(kuò)展函數(shù)一樣,流有一個(gè)zip合并運(yùn)算符,組合兩個(gè)流的相應(yīng)的值

package com.example.kotlin01

import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking


fun main() = runBlocking {
    val numbers = (1..3).asFlow()
    val strs = flowOf("a", "b", "c", "d")
    numbers.zip(strs) { a, b ->"$a and $b" }.collect { value ->
        println(value)
    }


}
1 and a
2 and b
3 and c
image.png

從上面我們知道兩個(gè)流是對(duì)應(yīng)合并
10.2.Combine
合并最近的

package com.example.kotlin01

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking


fun main() = runBlocking {
    val numbers = (1..3).asFlow()
    val strs = flowOf("a", "b", "c")
    numbers.combine(strs) { a, b ->"$a and $b" }.collect { value ->
        println("$value")

    }

}

1 and a
2 and a
2 and b
3 and b
3 and c

image.png
  • (1) 1和a - >1a
  • (2) a和2 ->2a
  • (3) 2和b ->2b
  • (4) b和3 ->3b
  • (5) 3和c ->3c

11.展平流(Flattening flows)

package com.example.kotlin01

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking


fun main() = runBlocking<Unit> {
    //map運(yùn)算符將請(qǐng)求值映射成結(jié)果值
    //傳入的是 1 2 3,得到的結(jié)果是一個(gè)flow  <flow<String>>
    (1..3).asFlow().map { request(it) }

}

fun request(request: Int): Flow<String> = flow {
    emit("這是第一個(gè) $request")
    delay(500)
    emit("這是第二個(gè) $request")
}

我們看以上的代碼,可以看到,main函數(shù)中的代碼塊得到的是一個(gè)flow <flow<String>>流.也就是流中包含了流.所以我們需要將其展平為單獨(dú)的一個(gè)流進(jìn)行處理.

11.1.flatMapConcat

使用flatMapConcat操作符.,等待內(nèi)部完成時(shí),然后收集下一個(gè)流

package com.example.kotlin01

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking


fun main() = runBlocking<Unit> {
    //map運(yùn)算符將請(qǐng)求值映射成結(jié)果值
    //傳入的是 1 2 3啥繁,得到的結(jié)果是一個(gè)flow  <flow<String>>
    (1..3).asFlow().flatMapConcat { request(it) }.collect{ println(it)}
}

fun request(request: Int): Flow<String> = flow {
    emit("這是第一個(gè) $request")
//    delay(500)
//    emit("這是第二個(gè) $request")
}
這是第一個(gè) 1
這是第一個(gè) 2
這是第一個(gè) 3

11.2.flatMapMerge

flatMapMerge操作符模式是同時(shí)收集傳入的流,然后合并到單個(gè)流中.

package com.example.kotlin01

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking


fun main() = runBlocking<Unit> {
    //map運(yùn)算符將請(qǐng)求值映射成結(jié)果值
    //傳入的是 1 2 3荒叶,得到的結(jié)果是一個(gè)flow  <flow<String>>
    //concurrency參數(shù)指的是限制同時(shí)收集的并發(fā)數(shù)
    //(1..3).asFlow().flatMapMerge(100, { request(it) }).collect { println(it) }
    (1..3).asFlow().flatMapMerge { request(it)  }.collect{ println(it)}
}

fun request(request: Int): Flow<String> = flow {
    emit("這是第一個(gè) $request")
//    delay(500)
//    emit("這是第二個(gè) $request")
}
這是第一個(gè) 1
這是第一個(gè) 2
這是第一個(gè) 3

11.3.flatMapLatest

和之前章節(jié)的collectLatest 操作符類(lèi)似,一旦有新流發(fā)出,將取消之前已經(jīng)發(fā)出的流.

package com.example.kotlin01

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking


fun main() = runBlocking<Unit> {
    //map運(yùn)算符將請(qǐng)求值映射成結(jié)果值
    //傳入的是 1 2 3,得到的結(jié)果是一個(gè)flow  <flow<String>>
    (1..3).asFlow().flatMapLatest { request(it)  }.collect{ println(it)}
}

fun request(request: Int): Flow<String> = flow {
    emit("這是第一個(gè) $request")
//    delay(500)
//    emit("這是第二個(gè) $request")
}
這是第一個(gè) 1
這是第一個(gè) 2
這是第一個(gè) 3

以上我們通過(guò)三個(gè)操作符都可以將流展平.

12.流異常(Flow exceptions)

當(dāng)發(fā)射器或運(yùn)算符內(nèi)部的代碼引發(fā)異常的時(shí)候,流收集器可以結(jié)束運(yùn)行.但會(huì)出現(xiàn)異常.可以通過(guò) try catch

fun main() = runBlocking<Unit> {
    //map運(yùn)算符將請(qǐng)求值映射成結(jié)果值
    //傳入的是 1 2 3输虱,得到的結(jié)果是一個(gè)flow  <flow<String>>
    try {
        (1..3).asFlow().collect {
            println(it)
            it / 0
        }
    }
    catch (e:Exception){
        println(e.message)
    }

}

13.異常透明性

使用catch運(yùn)算符捕捉異常,catch只能收集下游的異常,如果上游的代碼出現(xiàn)異常就不會(huì)收集.

package com.example.kotlin01

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import java.lang.Exception


fun main() = runBlocking<Unit> {
    request(2).catch { println("這是異常...$it") }
        .collect {
            println(it)
        }
}

fun request(request: Int): Flow<String> = flow {
    emit("這是第一個(gè) $request")
    request/0

}
這是第一個(gè) 2
這是異常...java.lang.ArithmeticException: / by zero

14.流完成(Flow completion)

流程在完成的之后(正常完成或者異常完成)可以做一些操作,可以通過(guò)兩種方式完成,命令式和聲明式

14.1.命令式

使用try/finally

package com.example.kotlin01

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import java.lang.Exception


fun main() = runBlocking<Unit> {
    try {
        request(2).catch { println("這是異常...$it") }
            .collect {
                println(it)
            }
    }
    finally {
        println("done")
    }

}

fun request(request: Int): Flow<String> = flow {
    emit("這是第一個(gè) $request")
    request/0

}
這是第一個(gè) 2
這是異常...java.lang.ArithmeticException: / by zero
done

14.2.聲明式

使用onCompletion 操作符

package com.example.kotlin01

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import java.lang.Exception


fun main() = runBlocking<Unit> {

    request(2).onCompletion { println("done") }.catch { println("這是異常...$it") }
        .collect {
            println(it)
        }
}

fun request(request: Int): Flow<String> = flow {
    emit("這是第一個(gè) $request")
}

這是第一個(gè) 2
done

十五、命令式還是聲明式

我們知道如何收集流,并以命令和聲明式的處理完成和異常.那到底選擇哪一種呢,根據(jù)編程愛(ài)好來(lái),我覺(jué)得使用聲明式比較舒服

十六脂凶、啟動(dòng)流(Launching flow)

我們可以使用launchIn代替collect收集流,launchIn的作用是在單獨(dú)協(xié)程中啟動(dòng)收集流.而collect不是.

package com.example.kotlin01

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking


fun main() = runBlocking<Unit> {

    request(2)
        //onEach 是一個(gè)中間運(yùn)算符
        .onEach { println(it) }
        //單獨(dú)啟動(dòng)收集流的協(xié)程進(jìn)行對(duì)流的收集
        //.collect()
        .launchIn(this)
}

fun request(request: Int): Flow<String> = flow {
    emit("這是第一個(gè) $request")

}

這是第一個(gè) 2
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末宪睹,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子蚕钦,更是在濱河造成了極大的恐慌亭病,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,941評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件嘶居,死亡現(xiàn)場(chǎng)離奇詭異罪帖,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)邮屁,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)整袁,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人佑吝,你說(shuō)我怎么就攤上這事坐昙。” “怎么了芋忿?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,345評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵炸客,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我戈钢,道長(zhǎng)痹仙,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,851評(píng)論 1 295
  • 正文 為了忘掉前任殉了,我火速辦了婚禮开仰,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘宣渗。我一直安慰自己抖所,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,868評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布痕囱。 她就那樣靜靜地躺著田轧,像睡著了一般。 火紅的嫁衣襯著肌膚如雪鞍恢。 梳的紋絲不亂的頭發(fā)上傻粘,一...
    開(kāi)封第一講書(shū)人閱讀 51,688評(píng)論 1 305
  • 那天每窖,我揣著相機(jī)與錄音,去河邊找鬼弦悉。 笑死窒典,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的稽莉。 我是一名探鬼主播瀑志,決...
    沈念sama閱讀 40,414評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼污秆!你這毒婦竟也來(lái)了劈猪?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,319評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤良拼,失蹤者是張志新(化名)和其女友劉穎战得,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體庸推,經(jīng)...
    沈念sama閱讀 45,775評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡常侦,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了贬媒。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片聋亡。...
    茶點(diǎn)故事閱讀 40,096評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖掖蛤,靈堂內(nèi)的尸體忽然破棺而出杀捻,到底是詐尸還是另有隱情,我是刑警寧澤蚓庭,帶...
    沈念sama閱讀 35,789評(píng)論 5 346
  • 正文 年R本政府宣布致讥,位于F島的核電站,受9級(jí)特大地震影響器赞,放射性物質(zhì)發(fā)生泄漏垢袱。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,437評(píng)論 3 331
  • 文/蒙蒙 一港柜、第九天 我趴在偏房一處隱蔽的房頂上張望请契。 院中可真熱鬧,春花似錦夏醉、人聲如沸爽锥。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,993評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)氯夷。三九已至,卻和暖如春靶擦,著一層夾襖步出監(jiān)牢的瞬間腮考,已是汗流浹背雇毫。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,107評(píng)論 1 271
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留踩蔚,地道東北人棚放。 一個(gè)月前我還...
    沈念sama閱讀 48,308評(píng)論 3 372
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像馅闽,于是被迫代替她去往敵國(guó)和親飘蚯。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,037評(píng)論 2 355