Kotlin Flow 介紹

1.Kotlin Flow 介紹

Flow是kotlin提供的一個工具徒河,使用協(xié)程封裝成生產(chǎn)者-消費者模式娇妓,上流來負(fù)責(zé)生產(chǎn)伐庭,下流來接收消耗用爪。

A cold asynchronous data stream that sequentially emits values
 and completes normally or with an exception原押。

翻譯下就是:按順序發(fā)出值并正常完成或異常完成的Cold異步數(shù)據(jù)流。

  • Hot Observable:無論有沒有 Subscriber 訂閱项钮,事件始終都會發(fā)生班眯。當(dāng) Hot Observable 有多個訂閱者時,Hot Observable 與訂閱者們的關(guān)系是一對多的關(guān)系烁巫,可以與多個訂閱者共享信息。
  • Cold Observable :只有 Subscriber 訂閱時宠能,才開始執(zhí)行發(fā)射數(shù)據(jù)流的代碼亚隙。并且 Cold Observable 和 Subscriber 只能是一對一的關(guān)系,當(dāng)有多個不同的訂閱者時违崇,消息是重新完整發(fā)送的阿弃。也就是說對 Cold Observable 而言,有多個Subscriber的時候羞延,他們各自的事件是獨立的渣淳。

2.flow使用

image.png

2.1 Flow的創(chuàng)建

  1. 可以使用flow構(gòu)建函數(shù)構(gòu)建一個Flow類型返回值的函數(shù)
  2. flow{}構(gòu)建體中可以調(diào)用掛起函數(shù),即上流
  3. 上流使用emit函數(shù)發(fā)射值
  4. 下流使用collect函數(shù)收集值
//上流函數(shù)
fun simpleFlow() = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}
 
fun main() {
    runBlocking {
        //下流接收數(shù)據(jù)
        simpleFlow().collect { value ->
            println(value)
        }
 
        println("finished")
    }
}

結(jié)果:
1
2
3
finished

2.2 Flow是冷流伴箩,所以collect是掛起函數(shù)入愧,不是子協(xié)程,并且只有執(zhí)行collect函數(shù)時嗤谚,上流的代碼才會被執(zhí)行棺蛛,所以在一個協(xié)程中多次調(diào)用collect,它們會按順序執(zhí)行巩步。

fun simpleFlow() = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}
 
fun main() {
    runBlocking {
        simpleFlow().collect { value ->
            println(value)
        }
 
        println("collect1 finished")
 
        simpleFlow().collect { value ->
            println(value)
        }
 
        println("collect2 finished")
    }
}

結(jié)果:
1
2
3
collect1 finished
1
2
3
collect2 finished

2.3 Flow的連續(xù)性

Flow也支持函數(shù)式編程旁赊,并且從上流到下流的每個過渡操作符都會處理發(fā)射值,最終流入下流

fun main() {
    runBlocking {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.filter {
            it % 2 == 0 //只取偶數(shù)
        }.map {
            "String $it"
        }.collect {
            println(it)
        }
    }
}
結(jié)果:
String 2
String 4

2.4 Flow構(gòu)建器

  1. flow{}
flow {
    (5 .. 10).forEach {
              emit(it)
         }
}.collect{
   println(it)
}
    
  1. flowOf() 幫助可變數(shù)組生成 Flow 實例
flowOf(1,2,3,4,5).collect { println(it) }

其實flowOf調(diào)用的就是第一種flow{}椅野,分別emit發(fā)送值终畅,源碼如下:

public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
    for (element in elements) {
        emit(element)
    }
}
  1. asFlow() 面向數(shù)組、列表等集合
(5 ..10).asFlow().collect { 
                    println(it)
                }
消費數(shù)據(jù)

collect 方法和 RxJava 中的 subscribe 方法一樣竟闪,都是用來消費數(shù)據(jù)的离福。
除了簡單的用法外,這里有兩個問題得注意一下:

  • collect 函數(shù)是一個 suspend 方法瘫怜,所以它必須發(fā)生在協(xié)程或者帶有 suspend 的方法里面术徊,這也是我為什么在一開始的時候啟動了
  • lifecycleScope.launch。lifecycleScope 是我使用的 Lifecycle 的協(xié)程擴(kuò)展庫當(dāng)中的鲸湃,你可以替換成自定義的協(xié)程作用域

3.切換線程

3.1切換線程使用的是flowOn操作符赠涮。

    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.map {
        it * it
    }.flowOn(Dispatchers.IO)
        .collect {
            println(it)
        }

簡單點理解就是flowOn之前的操作符運(yùn)行在flowOn指定的線程之內(nèi)子寓,flowOn之后的操作符運(yùn)行在整個flow運(yùn)行的CoroutineContext內(nèi)。

例如笋除,下面的代碼collect則是在main線程:

fun main() = runBlocking {
  flowOf(1,2,3,4,5)
                    .flowOn(Dispatchers.Default)
                    .collect { 
                    println(Thread.currentThread().name+" "+it) 
                }
}

打印如下:

main 1
main 2
main 3
main 4
main 5

3.2 除了使用子協(xié)程執(zhí)行上流外斜友,我們還可以使用launchIn函數(shù)來讓Flow使用全新的協(xié)程上下文。

public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
    collect() // tail-call
}

fun main() {
    runBlocking {
        flow {
            println("flow :${Thread.currentThread().name}")
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.flowOn(Dispatchers.Default)
            .onEach { println("collect:${Thread.currentThread().name} $it") }
            .launchIn(CoroutineScope(Dispatchers.IO))
            .join()//主線程等待這個協(xié)程執(zhí)行結(jié)束
    }
}
結(jié)果:
flow :DefaultDispatcher-worker-1
collect:DefaultDispatcher-worker-1 1
collect:DefaultDispatcher-worker-1 2
collect:DefaultDispatcher-worker-1 3
collect:DefaultDispatcher-worker-1 4
collect:DefaultDispatcher-worker-1 5

4.背壓

背壓的產(chǎn)生

  • 原因:通俗來說其實就是因為產(chǎn)生的速度和處理的速度或者說耗時不一致才導(dǎo)致了背壓的產(chǎn)生垃它。
  • 處理:主要分三種:掛起鲜屏、丟棄新的丟棄原來的国拇,我們也可以輔助設(shè)置緩沖池洛史,即暫時把值存下來。
  • 理解:通俗理解為水流管粗細(xì)的問題酱吝,如果上游的水管粗也殖,下游的水管細(xì)就會產(chǎn)生堵住的問題,當(dāng)然也有可能就是撐破了务热,水流出來了忆嗜。
    • 處理起來就是堵住,上游不要流了崎岂;
    • 當(dāng)然也可以在中間建設(shè)一個蓄水池捆毫,先把水放在蓄水池,下游可以繼續(xù)了再放下去冲甘。
    • 蓄水池當(dāng)然也有兩種方式绩卤,上游來水了,蓄水池有水损合,要不把蓄水池水放掉省艳、注入新水,或者直接放掉新水嫁审。

4.1.不處理

       var time :Long = 0

            flow {
                repeat(10){
                    delay(100)
                    emit(it)
                }
            }.onStart {
                time = System.currentTimeMillis()
            }.onCompletion {
                Log.d(TAG.TAG, "finish cost time ${System.currentTimeMillis() - time}")
            }.collect {
                delay(1000)
                Log.d(TAG.TAG, "it is $it,cost time ${System.currentTimeMillis() - time}")
            }

日志如下:

D/Test-TAG: it is 0,cost time 1118
D/Test-TAG: it is 1,cost time 2222
D/Test-TAG: it is 2,cost time 3323
D/Test-TAG: it is 3,cost time 4425
D/Test-TAG: it is 4,cost time 5529
D/Test-TAG: it is 5,cost time 6632
D/Test-TAG: it is 6,cost time 7735
D/Test-TAG: it is 7,cost time 8839
D/Test-TAG: it is 8,cost time 9942
D/Test-TAG: it is 9,cost time 11044
D/Test-TAG: finish cost time 11044

分析:

  • 可以看出耗時是以最后一次結(jié)束的時間計算的跋炕,也就是掛起,通俗理解就是下游堵住了律适,上游等著辐烂,所以耗時是下面的耗時綜合∥婊撸可以看出纠修,一般情況下,上下流執(zhí)行是同步的厂僧。

4.2 直接調(diào)用buffer(),不設(shè)特定參數(shù)

     var time :Long = 0

            flow {
                repeat(10){
                    delay(100)
                    emit(it)
                }
            }.onStart {
                time = System.currentTimeMillis()
            }.onCompletion {
                Log.d(TAG.TAG, "finish cost time ${System.currentTimeMillis() - time}")
            }.buffer().collect {
                delay(1000)
                Log.d(TAG.TAG, "it is $it,cost time ${System.currentTimeMillis() - time}")
            }

日志如下:

D/Test-TAG: finish cost time 1024
D/Test-TAG: it is 0,cost time 1115
D/Test-TAG: it is 1,cost time 2117
D/Test-TAG: it is 2,cost time 3119
D/Test-TAG: it is 3,cost time 4120
D/Test-TAG: it is 4,cost time 5122
D/Test-TAG: it is 5,cost time 6123
D/Test-TAG: it is 6,cost time 7125
D/Test-TAG: it is 7,cost time 8126
D/Test-TAG: it is 8,cost time 9127
D/Test-TAG: it is 9,cost time 10129

分析:

  • 可以看出和不處理非常類似扣草,下游耗時基本是一致的。
  • 但是finish完成的耗時只有1024,也就是全部緩存下來了辰妙,通俗理解就是水全部放在了蓄水池鹰祸,一點一點往下放。

4.3 buffer參數(shù)設(shè)置密浑,設(shè)置buffer為5蛙婴,也就是緩存5個值,三種策略分別為:

1.BufferOverflow.SUSPEND(默認(rèn))
            var time :Long = 0
            flow {
                repeat(10){
                    delay(100)
                    emit(it)
                }
            }.onStart {
                time = System.currentTimeMillis()
            }.onCompletion {
                Log.d(TAG.TAG, "finish cost time ${System.currentTimeMillis() - time}")
            }.buffer(5,BufferOverflow.SUSPEND).collect {
                delay(1000)
                Log.d(TAG.TAG, "it is $it,cost time ${System.currentTimeMillis() - time}")
            }

日志如下:

D/Test-TAG: it is 0,cost time 1120
D/Test-TAG: it is 1,cost time 2121
D/Test-TAG: it is 2,cost time 3123
D/Test-TAG: it is 3,cost time 4125
D/Test-TAG: finish cost time 4125
D/Test-TAG: it is 4,cost time 5126
D/Test-TAG: it is 5,cost time 6129
D/Test-TAG: it is 6,cost time 7131
D/Test-TAG: it is 7,cost time 8133
D/Test-TAG: it is 8,cost time 9134
D/Test-TAG: it is 9,cost time 10136

分析:

  • 可以看出在第四個值打印出來的時候完成了尔破,原因在于發(fā)送第0個街图,下面在處理,下面5個都放在了緩沖池懒构,剩下四個在掛起等待
  • 等待后面處理餐济,處理一個,緩沖池往下放一個痴脾,上游往緩沖池放一個颤介。
  • 下游四個處理完了的時候,在處理第五個赞赖,最后一個就放到了緩沖池,上游就結(jié)束了晕鹊。
2.BufferOverflow.DROP_OLDEST
            var time :Long = 0
            flow {
                repeat(10){
                    delay(100)
                    emit(it)
                }
            }.onStart {
                time = System.currentTimeMillis()
            }.onCompletion {
                Log.d(TAG.TAG, "finish cost time ${System.currentTimeMillis() - time}")
            }.buffer(5,BufferOverflow.DROP_OLDEST).collect {
                delay(1000)
                Log.d(TAG.TAG, "it is $it,cost time ${System.currentTimeMillis() - time}")
            }


打印如下:

D/Test-TAG: finish cost time 1028
D/Test-TAG: it is 0,cost time 1120
D/Test-TAG: it is 5,cost time 2122
D/Test-TAG: it is 6,cost time 3124
D/Test-TAG: it is 7,cost time 4124
D/Test-TAG: it is 8,cost time 5127
D/Test-TAG: it is 9,cost time 6128


分析:

  • 可以看出觉增,finish最先結(jié)束了蚕断,耗時只是上游的時間,每次100ms匿垄,大概就是1000ms。
  • 上游發(fā)送一個归粉,下游在處理第一個時候椿疗,上游繼續(xù)往緩沖池中放。
  • 和BufferOverflow.SUSPEND的區(qū)別在于糠悼,放入了5個(1届榄,2,3倔喂,4铝条,5),放第6個也就是5的時候席噩,不再掛起班缰,而是將1丟棄了,緩沖池變成了(2悼枢,3埠忘,4,5,6)莹妒,后續(xù)類似名船,最后就是1,2动羽,3包帚,4全被丟棄了,所以打印結(jié)果就是0运吓,5渴邦,6,7拘哨,8谋梭,9。

#######3.BufferOverflow.DROP_LATEST

            var time :Long = 0
            flow {
                repeat(10){
                    delay(100)
                    emit(it)
                }
            }.onStart {
                time = System.currentTimeMillis()
            }.onCompletion {
                Log.d(TAG.TAG, "finish cost time ${System.currentTimeMillis() - time}")
            }.buffer(5,BufferOverflow.DROP_LATEST).collect {
                delay(1000)
                Log.d(TAG.TAG, "it is $it,cost time ${System.currentTimeMillis() - time}")
            }


打印如下:

D/Test-TAG: finish cost time 1031
D/Test-TAG: it is 0,cost time 1119
D/Test-TAG: it is 1,cost time 2120
D/Test-TAG: it is 2,cost time 3122
D/Test-TAG: it is 3,cost time 4123
D/Test-TAG: it is 4,cost time 5128
D/Test-TAG: it is 5,cost time 6129

分析:

  • 可以看出倦青,finish也最先結(jié)束了瓮床,耗時只是上游的時間,每次100ms产镐,大概就是1000ms隘庄。
  • 上游發(fā)送一個,下游在處理第一個時候癣亚,上游繼續(xù)往緩沖池中放丑掺。
  • 和BufferOverflow.DROP_OLDEST的區(qū)別在于,放入了5個(1述雾,2街州,3,4玻孟,5)唆缴,放第6個也就是5的時候,不再掛起黍翎,而是將5丟棄了面徽,緩沖池不變,所以5玩敏,6斗忌,7,8旺聚,9全被丟棄了织阳,最后打印出來就是0,1砰粹,2唧躲,3造挽,4,5弄痹。

4.4 conflate conflate是buffer的簡化使用方式饭入,其實相當(dāng)于buffer設(shè)置參數(shù)為0和BufferOverflow.DROP_OLDEST。

下流來不及處理的會被丟棄掉

fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                delay(1000)
                emit(i)
            }
        }
 
        val time = measureTimeMillis {
            flow.conflate()
                .collect {
                    delay(3000)
                    println("$it")
                }
        }
 
        println("time : $time ms")
    }
}
結(jié)果:
1
3
time : 7124 ms

4.5 collectLast可以只接收上流發(fā)射的最后一個元素.

fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                delay(1000)
                emit(i)
            }
        }
 
        val time = measureTimeMillis {
            flow
                .collectLatest {
                    delay(3000)
                    println("$it")
                }
        }
 
        println("time : $time ms")
    }
}
time : 6144 ms

5.Flow操作符

1.transform

在使用transform操作符時肛真,可以任意多次調(diào)用emit谐丢。

runBlocking {

                (1..5).asFlow()
                    .transform {
                        emit(it * 2)
                        delay(100)
                        emit(it * 4)
                    }
                    .collect { println("transform:$it") }
            }

打印如下:

transform:2
transform:4
transform:4
transform:8
transform:6
transform:12
transform:8
transform:16
transform:10
transform:20

transform、transformLatest蚓让、transformWhile 乾忱,transform直接進(jìn)行轉(zhuǎn)換,和map不同的是transform可以控制流速历极,transformLatest則進(jìn)行最新值的轉(zhuǎn)換窄瘟,類似于mapLatest ,transformWhile則要求閉包返回一個boolean值趟卸,為true則繼續(xù)返回蹄葱,為false則后續(xù)的值全部取消。

          val flow = flow {
                repeat(10){
                    delay(10)
                    emit(it)
                }
            }
           flow.transform {
                delay(1000)
                emit(it*10)
            }.collect {
                Log.d(TAG.TAG,"transform is $it")
            }

            flow.transformLatest {
                delay(1000)
                emit("transformLatest $it")
            }.collect {
                Log.d(TAG.TAG,it)
            }

            flow.transformWhile {
                emit("transformWhile $it")
                it!=5
            }.collect {
                Log.d(TAG.TAG,it)
            }


日志如下:

2022-07-29 15:37:03.243 10589-10615/edu.test.demo D/Test-TAG: transform is 0
2022-07-29 15:37:04.255 10589-10615/edu.test.demo D/Test-TAG: transform is 10
2022-07-29 15:37:05.269 10589-10615/edu.test.demo D/Test-TAG: transform is 20
2022-07-29 15:37:06.281 10589-10615/edu.test.demo D/Test-TAG: transform is 30
2022-07-29 15:37:07.294 10589-10615/edu.test.demo D/Test-TAG: transform is 40
2022-07-29 15:37:08.306 10589-10615/edu.test.demo D/Test-TAG: transform is 50
2022-07-29 15:37:09.318 10589-10615/edu.test.demo D/Test-TAG: transform is 60
2022-07-29 15:37:10.330 10589-10615/edu.test.demo D/Test-TAG: transform is 70
2022-07-29 15:37:11.341 10589-10615/edu.test.demo D/Test-TAG: transform is 80
2022-07-29 15:37:12.353 10589-10615/edu.test.demo D/Test-TAG: transform is 90
2022-07-29 15:37:13.470 10589-10617/edu.test.demo D/Test-TAG: transformLatest 9
2022-07-29 15:37:13.483 10589-10617/edu.test.demo D/Test-TAG: transformWhile 0
2022-07-29 15:37:13.495 10589-10617/edu.test.demo D/Test-TAG: transformWhile 1
2022-07-29 15:37:13.509 10589-10617/edu.test.demo D/Test-TAG: transformWhile 2
2022-07-29 15:37:13.521 10589-10617/edu.test.demo D/Test-TAG: transformWhile 3
2022-07-29 15:37:13.532 10589-10617/edu.test.demo D/Test-TAG: transformWhile 4
2022-07-29 15:37:13.544 10589-10617/edu.test.demo D/Test-TAG: transformWhile 5

2.take

take操作符只取前幾個emit發(fā)射锄列。

  (1 .. 5).asFlow().take(2).collect {
                    println("take:$it")
                }

打印結(jié)果:

take:1
take:2

take图云、takeWhiledrop邻邮、dropWhile琼稻,take則是取幾個值返回,takeWhile按條件取值饶囚,如果滿足條件就返回,不滿足則后面全部取消鸠补。drop和take相反萝风,dropWhile和takeWhile相反。

            val flow = flow {
                repeat(10){
                    delay(10)
                    emit(it)
                }
            }
              flow.take(5).collect {
                Log.d(TAG.TAG,"take $it")
            }

            flow.takeWhile {
                it < 5
            }.collect {
                Log.d(TAG.TAG,"takeWhile $it")
            }

            flow.drop(5).collect {
                Log.d(TAG.TAG,"drop $it")
            }
            flow.dropWhile {
                it < 5
            }.collect {
                Log.d(TAG.TAG,"dropWhile $it")
            }

打印如下:

D/Test-TAG: take 0
D/Test-TAG: take 1
D/Test-TAG: take 2
D/Test-TAG: take 3
D/Test-TAG: take 4
D/Test-TAG: takeWhile 0
D/Test-TAG: takeWhile 1
D/Test-TAG: takeWhile 2
D/Test-TAG: takeWhile 3
D/Test-TAG: takeWhile 4
D/Test-TAG: drop 5
D/Test-TAG: drop 6
D/Test-TAG: drop 7
D/Test-TAG: drop 8
D/Test-TAG: drop 9
D/Test-TAG: dropWhile 5
D/Test-TAG: dropWhile 6
D/Test-TAG: dropWhile 7
D/Test-TAG: dropWhile 8
D/Test-TAG: dropWhile 9

分析:

  • 可以看出take5 就取了前面五個進(jìn)行返回紫岩,drop剛好相反规惰。
  • takeWhile則返回了滿足條件的前五個,后面的全部取消泉蝌,dropWhile剛好相反歇万。
  • 也會有人有疑問,后面的都大于等于5了勋陪,所以都取消了贪磺,那后面如果出現(xiàn)個1呢,還會不會返回诅愚,那么再看如下代碼寒锚,可以看出,后面即使出現(xiàn)滿足條件的也被全部取消了:
flow{
                emit(1)
                emit(2)
                emit(5)
                emit(1)
                emit(2)
            }.takeWhile {
                it<5
            }.collect {
                Log.d(TAG.TAG,"takeWhile $it")
            }

D/Test-TAG: takeWhile 1
D/Test-TAG: takeWhile 2

3.reduce
runBlocking {
                val sum=( 1 ..5).asFlow()
//                    .map {
//                    //println("map:${it}")
//                    it*it  }   //1,4,9,16,25

                    .reduce { a, b ->
                        println("reduce:${a},$")
                        a*b
                    }

                 println(sum)

            }

打印如下:

reduce:1,2
reduce:2,3
reduce:6,4
reduce:24,5
120

reduce理解起來稍微有點麻煩刹前,我們看看源碼實現(xiàn)加深理解:

public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S {
    var accumulator: Any? = NULL

    collect { value ->
        accumulator = if (accumulator !== NULL) {
            @Suppress("UNCHECKED_CAST")
            operation(accumulator as S, value)
        } else {
            value
        }
    }

    if (accumulator === NULL) throw NoSuchElementException("Empty flow can't be reduced")
    @Suppress("UNCHECKED_CAST")
    return accumulator as S
}

簡單點理解就是兩個元素操作之后拿到的值跟后面的元素進(jìn)行操作,用于把flow 簡化合并為一個值泳赋。

4.fold
runBlocking {
(1 ..5).asFlow().fold(2,{
                        a, b -> a * b
                })
}

5.zip

zip主要實現(xiàn)組合的功能,將兩個flow一一組合發(fā)出喇喉,其中一個結(jié)束祖今,則zip結(jié)束。

fun main() = runBlocking {

    val flowA = (1..5).asFlow()
    val flowB = flowOf("one", "two", "three", "four", "five").onEach { delay(100) }

    val time = measureTimeMillis {
        flowA.zip(flowB) { a, b -> "$a and $b" }
            .collect { println(it) }
    }

    println("Cost $time ms")
}

打印如下:

1 and one
2 and two
3 and three
4 and four
5 and five
Cost 540 ms

如果flowA中的item個數(shù)大于flowB中的item個數(shù)拣技,執(zhí)行合并后新flow的item個數(shù)=較小的flow的item個數(shù)千诬。

6.flattenMerge/flattenConcat

flattenMerge不會組合多個flow,而是將它們作為單個流執(zhí)行过咬。
flattenConcat大渤、flattenMergeflattenConcat將多個flow展平掸绞,通俗點講泵三,減少層級,flattenMergeflattenConcat類似衔掸,但是可以設(shè)置并發(fā)數(shù)烫幕。

val flowA = (1..5).asFlow()
val flowB = flowOf("one", "two", "three", "four", "five")

  flowOf(flowA,flowB).flattenMerge(2).collect {
                    println("flattenMerge:$it")
                }

                flowOf(flowA,flowB).flattenConcat().collect{println("flattenConcat:$it")}

打印如下:

flattenMerge:1
flattenMerge:2
flattenMerge:3
flattenMerge:4
flattenMerge:5
flattenMerge:one
flattenMerge:two
flattenMerge:three
flattenMerge:four
flattenMerge:five

flattenConcat:1
flattenConcat:2
flattenConcat:3
flattenConcat:4
flattenConcat:5
flattenConcat:one
flattenConcat:two
flattenConcat:three
flattenConcat:four
flattenConcat:five

展平操作符

類似于集合的集合,流里也有可能有流敞映,那么這個時候我們就需要使用展平操作符了

7.flatMapConcat
調(diào)用 FlowA.flatMapConcat(FlowB) 代碼 , 先拿到 FlowA , 然后讓 FlowA 每個元素 與 FlowB 進(jìn)行連接 , 以 FlowA 的元素順序為主導(dǎo) ;

flatMapConcat由map,flattenMerge操作符聯(lián)合完成较曼。
源碼如下:

public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
    map(transform).flattenConcat()

測試代碼:

fun currTime() = System.currentTimeMillis()

            var start: Long = 0
            runBlocking {

                (1..5).asFlow()
                    .onStart { start = currTime() }
                    .onEach { delay(100) }
                    .flatMapConcat {
                        flow {
                            emit("$it: First")
                            delay(500)
                            emit("$it: Second")
                        }
                        
                    }
                    .collect {
                        println("$it at ${System.currentTimeMillis() - start} ms from start")
                    }
            }

在調(diào)用 flatMapConcat 后,collect 函數(shù)在收集新值之前會等待 flatMapConcat 內(nèi)部的 flow 完成
打印如下:

1: First at 124 ms from start
1: Second at 625 ms from start
2: First at 726 ms from start
2: Second at 1228 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
4: First at 1930 ms from start
4: Second at 2431 ms from start
5: First at 2532 ms from start
5: Second at 3033 ms from start
8.flatMapMerge

調(diào)用 FlowA.flatMapMerge(FlowB) 代碼 , 先拿到 FlowB , 然后讓 FlowB 每個元素 與 FlowA 進(jìn)行結(jié)合 , 以 FlowB 的元素順序為主導(dǎo) ;

并發(fā)收集flows并且將合并它們的值為一個單一flow振愿,因此發(fā)射地值會盡快被處理捷犹。

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
    val startTime = System.currentTimeMillis() // remember the start time 
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
        .flatMapMerge { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}

可以看出來flatMapMerge并發(fā)特性:

1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
9.flatMapLatest

flatMapLatest和collectLatest操作符很像,只有新flow發(fā)射了新值冕末,那么上個flow就會被取消萍歉。

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
    val startTime = System.currentTimeMillis() // remember the start time 
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
        .flatMapLatest { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}

打印如下:

1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
10.conflate

當(dāng)一個flow表示操作的部分結(jié)果或者操作狀態(tài)更新,它可能并不需要取處理每一個值档桃,但是需要處理最近的一個值枪孩。在這種場景下,conflate操作符可以被用于忽略中間操作符藻肄。是一種對emit和collector慢處理的一種方式蔑舞,它通過丟棄一些值來實現(xiàn)。

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        foo()
            .conflate() // conflate emissions, don't process each one
            .collect { value -> 
                delay(300) // pretend we are processing it for 300 ms
                println(value) 
            } 
    }   
    println("Collected in $time ms")
}

打印如下:

1
3
Collected in 758 ms

11. filter

filter嘹屯、filterNot攻询、filterIsInstancefilterNotNull抚垄、fliter閉包返回一個Boolean值蜕窿,為true則返回谋逻,false則不返回,filterNot剛好相反桐经;filterIsInstance則進(jìn)行類型過濾毁兆,如過濾出String或者Int等,filterNotNull則過濾null值阴挣,返回非空值气堕。

            val flow = flow {
                repeat(10){
                    delay(10)
                    emit(it)
                }
            }
            flow.filter {
                it % 2 == 0
            }.collect {
                Log.d(TAG.TAG,"filter $it")
            }

            flow.filterNot {
                it % 2 == 0
            }.collect {
                Log.d(TAG.TAG,"filterNot $it")
            }

            flow {
                emit(1)
                emit("123")
            }.filterIsInstance<String>().collect {
                Log.d(TAG.TAG,"filterIsInstance $it")
            }

            flow {
                emit(1)
                emit(null)
                emit(2)
            }.filterNotNull().collect {
                Log.d(TAG.TAG,"filterNotNull $it")
            }


打印如下:

2022-07-29 15:50:45.376 10675-10703/edu.test.demo D/Test-TAG: filter 0
2022-07-29 15:50:45.400 10675-10703/edu.test.demo D/Test-TAG: filter 2
2022-07-29 15:50:45.422 10675-10703/edu.test.demo D/Test-TAG: filter 4
2022-07-29 15:50:45.444 10675-10703/edu.test.demo D/Test-TAG: filter 6
2022-07-29 15:50:45.466 10675-10703/edu.test.demo D/Test-TAG: filter 8
2022-07-29 15:50:45.505 10675-10703/edu.test.demo D/Test-TAG: filterNot 1
2022-07-29 15:50:45.528 10675-10703/edu.test.demo D/Test-TAG: filterNot 3
2022-07-29 15:50:45.550 10675-10703/edu.test.demo D/Test-TAG: filterNot 5
2022-07-29 15:50:45.574 10675-10703/edu.test.demo D/Test-TAG: filterNot 7
2022-07-29 15:50:45.597 10675-10703/edu.test.demo D/Test-TAG: filterNot 9
2022-07-29 15:50:45.598 10675-10703/edu.test.demo D/Test-TAG: filterIsInstance 123
2022-07-29 15:50:45.600 10675-10703/edu.test.demo D/Test-TAG: filterNotNull 1
2022-07-29 15:50:45.600 10675-10703/edu.test.demo D/Test-TAG: filterNotNull 2

12.merge

是將兩個flow合并起來,將每個值依次發(fā)出來

            val flow1  = listOf(1,2).asFlow()
            val flow2 = listOf("one","two","three").asFlow()
            merge(flow1,flow2).collect {value->
                Log.d(TAG.TAG,value.toString())
            }


打印如下:

D/Test-TAG: 1
D/Test-TAG: 2
D/Test-TAG: one
D/Test-TAG: two
D/Test-TAG: three

可以看出merge在將flow1和flow2合并之后將五個值依次發(fā)送出來畔咧。

13. retry

retry茎芭、retryWhen retry為retryWhen的簡化版本,可設(shè)置重試次數(shù)誓沸,以及在閉包內(nèi)重試開關(guān)梅桩。
retryWhen控制重試,兩個回調(diào)參數(shù)cause為發(fā)生的異常拜隧,attempt為當(dāng)前重試下標(biāo)宿百,從0開始。

            flow<Int> {
                if (index < 2) {
                    index++
                    throw RuntimeException("runtime exception index $index")
                }
                emit(100)
            }.retry(2).catch {
                Log.e(TAG.TAG, "ex is $it")
            }.collect {
                Log.d(TAG.TAG, "retry(2)  $it")
            }
            index = 0
            flow<Int> {
                if (index < 2) {
                    index++
                    throw RuntimeException("runtime exception index $index")
                }
                emit(100)
            }.retry {
                it is RuntimeException
            }.catch {
                Log.e(TAG.TAG, "ex is $it")
            }.collect {
                Log.d(TAG.TAG, "retry{}  $it")
            }


            index = 0
            flow<Int> {
                if (index < 2) {
                    index++
                    throw RuntimeException("runtime exception index $index")
                }
                emit(100)
            }.retryWhen { cause, attempt ->
                Log.d(TAG.TAG, "cause is $cause,attempt is $attempt")
                cause is RuntimeException
            } .catch {
                Log.e(TAG.TAG, "ex is $it")
            }.collect {
                Log.d(TAG.TAG, "retryWhen  $it")
            }


打印如下:

D/Test-TAG: retry(2)  100
D/Test-TAG: retry{}  100
D/Test-TAG: cause is java.lang.RuntimeException: runtime exception index 1,attempt is 0
D/Test-TAG: cause is java.lang.RuntimeException: runtime exception index 2,attempt is 1
D/Test-TAG: retryWhen  100

分析:

  • 可以看出雖然在一定條件會拋出異常洪添,但是100這個值都提交成功了垦页,這就是重試retry的作用。
  • retry的次數(shù)和閉包返回值可以同時設(shè)置干奢,兩個值為并列關(guān)系痊焊,如果一個不滿足則不會重試,次數(shù)的默認(rèn)值為Int.MAX_VALUE忿峻,閉包的返回值默認(rèn)為true薄啥,所以我們也可以不設(shè)置值,直接調(diào)用retry()也可以實現(xiàn)重試的效果逛尚。
  • retryWhen和retry一致罪佳,閉包返回true則重試,返回false則不再重試黑低。

5.Flow的異常處理

當(dāng)運(yùn)算符中的發(fā)射器或代碼拋出異常,可以有兩種方式處理
1.try catch
2.catch函數(shù)

1.try catch適用于收集時發(fā)生的異常
fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                emit(i)
            }
        }
 
        try {
            flow.collect {
                println(it)
                throw RuntimeException()
            }
        } catch (e: Exception) {
            print("caught: $e")
        }
    }
}
2.雖然上流也可以使用try catch酌毡,但是更推薦catch函數(shù)
fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                emit(i)
                throw RuntimeException()
            }
        }.catch { e ->
            print("caught1: $e")
        }.collect {
            println(it)
        }
    }
}

6.Flow的完成

1.有時候我們需要在Flow完成時克握,做一些其他事情,可以使用下面的方式

fun main() {
    runBlocking {
        try{
            val flow = flow {
                for (i in 1..3) {
                    emit(i)
                }
            }.collect {
                println(it)
            }
        }finally {
            println("done")            
        }
    }
}

2.onCompletion函數(shù)

fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                emit(i)
            }
        }.onCompletion {
            println("done")
        }.collect {
            println(it)
        }
    }
}

7.取消Flow

Flow也是可以被取消的枷踏,最常用的方式就是通過withTimeoutOrNull來取消菩暗,代碼如下所示。

MainScope().launch {
    withTimeoutOrNull(2500) {
        flow {
            for (i in 1..5) {
                delay(1000)
                emit(i)
            }
        }.collect {
            Log.d("xys", "Flow: $it")
        }
    }
}

這樣當(dāng)輸出1旭蠕、2之后停团,F(xiàn)low就被取消了旷坦。

Flow的取消,實際上就是依賴于協(xié)程的取消佑稠。

8.Flow的同步非阻塞模型

首先秒梅,我們要理解下,什么叫同步非阻塞舌胶,默認(rèn)場景下捆蜀,F(xiàn)low在沒有切換線程的時候,運(yùn)行在協(xié)程作用域指定的線程幔嫂,這就是同步辆它,那么非阻塞又是什么呢?我們知道emit和collect都是suspend函數(shù)履恩,所謂suspend函數(shù)锰茉,就是會掛起,將CPU資源讓出去切心,這就是非阻塞飒筑,因為suspend了就可以讓一讓,讓給誰呢昙衅?讓給其它需要執(zhí)行的函數(shù)扬霜,執(zhí)行完畢后,再把資源還給我而涉。

flow {
    for (i in 0..3) {
        emit(i)
    }
}.onStart {
    Log.d("xys", "Start Flow in ${Thread.currentThread().name}")
}.onEach {
    Log.d("xys", "emit value---$it")
}.collect {
    Log.d("xys", "Result---$it")
}

輸出為:

D/xys: Start Flow in main
D/xys: emit value---0
D/xys: Result---0
D/xys: emit value---1
D/xys: Result---1
D/xys: emit value---2
D/xys: Result---2
D/xys: emit value---3
D/xys: Result---3

可以發(fā)現(xiàn)著瓶,emit一個,collect拿一個啼县,這就是同步非阻塞材原,互相謙讓,這樣誰都可以執(zhí)行季眷,看上去flow中的代碼和collect中的代碼余蟹,就是同步執(zhí)行的。

9. 異步非阻塞模型

假如我們給Flow增加一個線程切換子刮,讓Flow執(zhí)行在子線程威酒,同樣是上面的代碼,我們再來看下執(zhí)行情況

flow {
    for (i in 0..3) {
        emit(i)
    }
}.onStart {
    Log.d("xys", "Start Flow in ${Thread.currentThread().name}")
}.onEach {
    Log.d("xys", "emit value---$it")
}.flowOn(Dispatchers.IO).collect {
    Log.d("xys", "Collect Flow in ${Thread.currentThread().name}")
    Log.d("xys", "Result---$it")
}

輸出為:

D/xys: Start Flow in DefaultDispatcher-worker-1
D/xys: emit value---0
D/xys: emit value---1
D/xys: emit value---2
D/xys: emit value---3
D/xys: Collect Flow in main
D/xys: Result---0
D/xys: Collect Flow in main
D/xys: Result---1
D/xys: Collect Flow in main
D/xys: Result---2
D/xys: Collect Flow in main
D/xys: Result---3

這個時候挺峡,F(xiàn)low就變成了異步非阻塞模型葵孤,異步呢,就更好理解了橱赠,因為在不同線程尤仍,而此時的非阻塞,就沒什么意義了狭姨,由于flow代碼先執(zhí)行宰啦,而這里的代碼由于沒有delay苏遥,所以是同步執(zhí)行的,執(zhí)行的同時赡模,collect在主線程進(jìn)行監(jiān)聽田炭。

除了使用flowOn來切換線程,使用channelFlow也可以實現(xiàn)異步非阻塞模型纺裁。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末诫肠,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子欺缘,更是在濱河造成了極大的恐慌栋豫,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,470評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件谚殊,死亡現(xiàn)場離奇詭異丧鸯,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)嫩絮,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,393評論 3 392
  • 文/潘曉璐 我一進(jìn)店門丛肢,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人剿干,你說我怎么就攤上這事蜂怎。” “怎么了置尔?”我有些...
    開封第一講書人閱讀 162,577評論 0 353
  • 文/不壞的土叔 我叫張陵杠步,是天一觀的道長。 經(jīng)常有香客問我榜轿,道長幽歼,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,176評論 1 292
  • 正文 為了忘掉前任谬盐,我火速辦了婚禮甸私,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘飞傀。我一直安慰自己皇型,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,189評論 6 388
  • 文/花漫 我一把揭開白布砸烦。 她就那樣靜靜地躺著犀被,像睡著了一般。 火紅的嫁衣襯著肌膚如雪外冀。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,155評論 1 299
  • 那天掀泳,我揣著相機(jī)與錄音雪隧,去河邊找鬼西轩。 笑死,一個胖子當(dāng)著我的面吹牛脑沿,可吹牛的內(nèi)容都是我干的藕畔。 我是一名探鬼主播,決...
    沈念sama閱讀 40,041評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼庄拇,長吁一口氣:“原來是場噩夢啊……” “哼注服!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起措近,我...
    開封第一講書人閱讀 38,903評論 0 274
  • 序言:老撾萬榮一對情侶失蹤溶弟,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后瞭郑,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體辜御,經(jīng)...
    沈念sama閱讀 45,319評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,539評論 2 332
  • 正文 我和宋清朗相戀三年屈张,在試婚紗的時候發(fā)現(xiàn)自己被綠了擒权。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,703評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡阁谆,死狀恐怖碳抄,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情场绿,我是刑警寧澤剖效,帶...
    沈念sama閱讀 35,417評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站裳凸,受9級特大地震影響贱鄙,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜姨谷,卻給世界環(huán)境...
    茶點故事閱讀 41,013評論 3 325
  • 文/蒙蒙 一逗宁、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧梦湘,春花似錦瞎颗、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,664評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至瓣颅,卻和暖如春倦逐,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背宫补。 一陣腳步聲響...
    開封第一講書人閱讀 32,818評論 1 269
  • 我被黑心中介騙來泰國打工檬姥, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留曾我,地道東北人。 一個月前我還...
    沈念sama閱讀 47,711評論 2 368
  • 正文 我出身青樓健民,卻偏偏與公主長得像抒巢,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子秉犹,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,601評論 2 353

推薦閱讀更多精彩內(nèi)容