如何表示多個值
- 掛起函數(shù)可以異步的返回單個值掌实,但是該如何異步返回多個計算好的值呢?
異步返回多個值的方案
- 集合
- 序列
- 掛起函數(shù)
- Flow
/*suspend*/ fun simpleFlow() = flow {
for(i in 1..3){
delay(1000)
emit(i)
}
}
Flow與其他方式區(qū)別
- 名為flow的Flow類型構(gòu)建器函數(shù)
- flow{...}構(gòu)建塊中的代碼可以掛起
- 函數(shù)simpleFlow不再標有suspend修飾符
- 流使用emit函數(shù)發(fā)射值
- 流使用collect函數(shù)收集值
Flow應用
-
在Android當中跨跨,文件下載是Flow的一個非常典型的應用
20211222-103630@2x.png
冷流
- Flow是一種類似于序列的冷流潮峦,flow構(gòu)建器中的代碼直到流被收集的時候才運行
fun simpleFlow() = flow {
println("Flow started")
for(i in 1..3){
delay(1000)
emit(i)
}
}
fun testFlowIsCode() = runBlocking {
val flow = simpleFlow()
println("Flow Collect")
flow.collect { println(it) }
println("Flow Collect again")
flow.collect { println(it) }
}
返回結(jié)果
Flow Collect
Flow started
1
2
3
Flow Collect again
Flow started
1
2
3
Process finished with exit code 0
根據(jù)以上返回結(jié)果可以看出代碼執(zhí)行val flow = simpleFlow()的時候沒有執(zhí)行flow{...}構(gòu)建塊中的代碼囱皿,只有調(diào)用collect的時候才執(zhí)行,這就是冷流
流的連續(xù)性
- 流的每次單獨收集都是按照順序執(zhí)行的忱嘹,除非使用特殊操作符
- 從上游到下游每個過度操作符都會處理每個發(fā)射出的值嘱腥,然后再交給末端操作符
fun testFlowContinuation() = runBlocking {
(1..5).asFlow().filter {
it%2 == 0
}.map {
"string $it"
}.collect {
println(it)
}
}
流構(gòu)建器
- flowOf構(gòu)建器定義了一個發(fā)射固定值集的流
- 使用.asFlow()擴展函數(shù),可是將各種集合與序列轉(zhuǎn)換為流
fun testFlowOf() = runBlocking {
flowOf("one", "two", "three")
.onEach {
delay(1000)//每隔1s發(fā)射一個元素
}.collect{
println(it)
}
}
流上下文
- 流的收集總是在調(diào)用協(xié)程的上下文中發(fā)生拘悦,流的該屬性稱為上下文保存
- flow{...}構(gòu)建器中的代碼必須遵循上下文保存屬性齿兔,并且不允許從其他上下文中發(fā)射(emit)
fun simpleFlow() = flow {
println1("Flow started ${Thread.currentThread().name}")
for(i in 1..3) {
delay(1000)
emit(i)
}
}
fun testFlowContext() = runBlocking {
simpleFlow().collect { value ->
println1("$value ${Thread.currentThread().name}")
}
}
看以上代碼兩個方法都存在于主線程,那如果flow{...}改變一下線程如下
fun simpleFlow() = flow {
withContext(Dispatchers.IO){
println1("Flow started ${Thread.currentThread().name}")
for(i in 1..3) {
delay(1000)
emit(i)
}
}
}
執(zhí)行一下础米,就會報錯
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [BlockingCoroutine{Active}@599ad64a, BlockingEventLoop@2cbb850d],
but emission happened in [DispatchedCoroutine{Active}@42689e0c, Dispatchers.IO].
Please refer to 'flow' documentation or use 'flowOn' instead
這就證明必須遵循上下文保存屬性分苇,并且不允許從其他上下文中發(fā)射
- flowOn操作符,該函數(shù)用于更改流發(fā)射的上下文
fun simpleFlow() = flow {
println1("Flow started ${Thread.currentThread().name}")
for(i in 1..3) {
delay(1000)
emit(i)
}
}.flowOn(Dispatchers.IO)
啟動流
- 使用launchIn替換collect我們可以在單獨的協(xié)程中啟動流的收集
//事件源
fun event() = (1..3).asFlow().onEach { delay(100) }.flowOn(Dispatchers.Default)
fun testEventLaunch() = runBlocking {
event().onEach {event->
println1("Event: $event ${Thread.currentThread().name}")
}.launchIn(CoroutineScope(Dispatchers.IO)).join()
}
流的取消
- 流采用與協(xié)程同樣的協(xié)作取消屁桑。像往常一樣医寿,流的收集可以是當流在一個可取消的掛起函數(shù)(例如 delay)中掛起的時候取消
fun simpleFlow1() = flow {
println1("Flow started ${Thread.currentThread().name}")
for(i in 1..3) {
delay(1000)
emit(i)
println1("Emitting $i")
}
}.flowOn(Dispatchers.IO)
fun testCancelFlow() = runBlocking {
withTimeoutOrNull(2500) {
simpleFlow1().collect { value -> println1(value) }
}
println1("Done")
}
流的取消監(jiān)測
- 為方便起見,流構(gòu)建器對每個發(fā)射值執(zhí)行附加的 ensureActive 監(jiān)測以進行取消蘑斧,這意味著從flow{...}發(fā)出繁忙循環(huán)是可以取消的
- 出于性能原因靖秩,大多數(shù)其他流操作不會自行執(zhí)行其他取消監(jiān)測,在協(xié)程出于繁忙循環(huán)的情況下竖瘾,必須明確監(jiān)測是否取消
- 通過cancellable操作符來執(zhí)行此操作
fun testCancelFlowCheck() = runBlocking {
(1..5).asFlow().cancellable().collect {
println1(it)
if(it == 3)cancel()
}
}
背壓
水流收到與流動方向一致的壓力叫做背壓或者生產(chǎn)者的效率大于消費者的效率
- buffer(),并發(fā)運行流中發(fā)射元素的代碼,相當于把管道延長沟突,增加緩沖區(qū)
- conflate(),合并發(fā)射項捕传,不對每個值進行處理
- collectLates(),取消并重新發(fā)射最后一個值
- 當必須更改CoroutineDispatcher時惠拭,flowOn操作符使用了相同的緩存機制,但是buffer函數(shù)顯式地請求緩沖而不改變執(zhí)行上下文
fun simpleFlow2() = flow {
for(i in 1..3) {
delay(100)
emit(i)
println1("Emitting $i")
}
}
fun testFlowBackPressure() = runBlocking {
val time = measureTimeMillis {
simpleFlow2()
// .flowOn(Dispatchers.Default)
// .buffer(50)
// .conflate()
// .collectLatest {
// }
.collect {
delay(300)
println1("$it")
}
}
println1(time)
}
過渡流操作符
- 可以使用操作符轉(zhuǎn)換流庸论,就像使用集合與序列一樣
- 過渡操作符應用于上游流职辅,并返回下游流
- 這些操作符也是冷操作符,就像流一樣聂示。這類操作符本身不是掛起函數(shù)
- 它運行速度很快罐农,返回新的轉(zhuǎn)換流的定義
suspend fun performRequest(request: Int): String {
delay(1000)
return "response $request"
}
fun testTransformFlowOperator() = runBlocking {
(1..5).asFlow().transform { request ->
emit("Making request $request")
emit(performRequest(request))
}.collect {
println1(it)
}
}
限長操作符
take
fun number() = flow {
try {
emit(1)
emit(2)
emit(3)
} finally {
println1("Finally in numbers")
}
}
fun testLimitLengthOperator() = runBlocking {
number().take(2).collect { println1(it) }
}
末端操作符
- 末端操作符是在流上用于啟動流收集的掛起函數(shù)。collect是最基礎(chǔ)的末端操作符催什,但是還有另外一些方便使用的末端操作符
- 轉(zhuǎn)化為各種集合涵亏,例如toList和toSet
- 獲取第一個(first)值和確保流發(fā)射單個(single)值的操作符
- 使用reduce與fold將流規(guī)約到單個值
fun testTerminalOperator() = runBlocking {
val sum = (1..5).asFlow().map { it * it }.reduce { a, b ->
a+b
}
println1(sum)
}
fun testTerminalOperator() = runBlocking {
val sum = (1..5).asFlow()
.fold(0, {acc, i -> acc + i })//以0為初始值,求1到5的和
println1(sum)
}
組合多個流
- 就像Kotlin標準庫中的Sequence.zip擴展函數(shù)一樣蒲凶,流擁有一個zip操作符用于組合兩個流中的相關(guān)值
fun testZip() = runBlocking {
val numbers = (1..5).asFlow()
val strs = flowOf("one", "two", "three")
numbers.zip(strs) { a, b ->
"$a -> $b"
}.collect { println1(it) }
}
展平流
- 流表示異步接收的值序列气筋,所以很容易遇到這樣的情況:每個值都會觸發(fā)對另一個值序列的請求,然而旋圆,由于流具有異步的性質(zhì)宠默,因此需要不同的展平方式,為此灵巧,存在一系列的流展平操作符
- flatmapConcat 連接模式
- flatMapMerge 合并模式
- flatMapLatest最新展平模式
fun requestFlow(i: Int) = flow {
emit("$i first")
delay(500)
emit("$i second")
}
fun testFlatMapConcat() = runBlocking {
(1..3).asFlow().onEach { delay(100) }
// .map { requestFlow(it) } // 轉(zhuǎn)換后會變成 Flow<Flow<String>>因此需要展平處理
.flatMapConcat { requestFlow(it) }
// .flatMapConcat { requestFlow(it) }
// .flatMapConcat { requestFlow(it) }
.collect { println1(it) }
}
返回結(jié)果對比
flatMapConcat 模式
1 first
1 second
2 first
2 second
3 first
3 second
flatMapMerge 模式
1 first
2 first
3 first
1 second
2 second
3 second
flatMapLatest 模式
1 first
2 first
3 first
3 second
流的異常處理
- 當運算符中發(fā)射器或代碼拋出異常時搀矫,有幾種處理異常的方法
- try/catch塊
- catch函數(shù)
fun number() = flow {
try {
emit(1)
emit(2)
emit(3)
} finally {
println1("Finally in numbers")
}
}.catch { e : Throwable ->
// 在catch塊中 還可以繼續(xù)發(fā)射元素
emit(4)
}
流的完成
- 當流收集完成時(普通情況或異常情況)抹沪,它可能需要執(zhí)行一個動作。
- 命令式finally塊
- onCompletion聲明式處理
fun simpleFlow4() = (1..3).asFlow()
fun testFlowCompleteInFinally() = runBlocking {
// try {
// simpleFlow4().collect { println1(it) }
// } finally {
// println1("Done")
// }
simpleFlow4().onCompletion {exception-> //onCompletion還可以打印出上游下游異常信息
println1("Done ${exception ?: ""}")
}.collect {
println1(it)
}
}