八. Flow 其他的操作符
8.1 Transform operators
transform
在使用 transform 操作符時汛闸,可以任意多次調(diào)用 emit 赶诊,這是 transform 跟 map 最大的區(qū)別:
fun main() = runBlocking {
(1..5).asFlow()
.transform {
emit(it * 2)
delay(100)
emit(it * 4)
}
.collect { println(it) }
}
transform 也可以使用 emit 發(fā)射任意值:
fun main() = runBlocking {
(1..5).asFlow()
.transform {
emit(it * 2)
delay(100)
emit("emit $it")
}
.collect { println(it) }
}
8.2 Size-limiting operators
take
take 操作符只取前幾個 emit 發(fā)射的值。
fun main() = runBlocking {
(1..5).asFlow()
.take(2)
.collect { println(it) }
}
8.3 Terminal flow operators
在 Kotlin Coroutines Flow 系列(一) Flow 基本使用 一文最后耗拓,我整理了 Flow 相關(guān)的 Terminal 操作符诡渴。本文介紹 reduce 和 fold 兩個操作符。
reduce
類似于 Kotlin 集合中的 reduce 函數(shù)和屎,能夠?qū)线M(jìn)行計算操作。
例如春瞬,對平方數(shù)列求和:
fun main() = runBlocking {
val sum = (1..5).asFlow()
.map { it * it }
.reduce { a, b -> a + b }
println(sum)
}
例如柴信,計算階乘:
fun main() = runBlocking {
val sum = (1..5).asFlow().reduce { a, b -> a * b }
println(sum)
}
fold
也類似于 Kotlin 集合中的 fold 函數(shù),fold 也需要設(shè)置初始值快鱼。
fun main() = runBlocking {
val sum = (1..5).asFlow()
.map { it * it }
.fold(0) { a, b -> a + b }
println(sum)
}
在上述代碼中颠印,初始值為0就類似于使用 reduce 函數(shù)實現(xiàn)對平方數(shù)列求和纲岭。
而對于計算階乘:
fun main() = runBlocking {
val sum = (1..5).asFlow().fold(1) { a, b -> a * b }
println(sum)
}
初始值為1就類似于使用 reduce 函數(shù)實現(xiàn)計算階乘抹竹。
8.4 Composing flows operators
zip
zip 是可以將2個 flow 進(jìn)行合并的操作符。
fun main() = runBlocking {
val flowA = (1..5).asFlow()
val flowB = flowOf("one", "two", "three","four","five")
flowA.zip(flowB) { a, b -> "$a and $b" }
.collect { println(it) }
}
執(zhí)行結(jié)果:
1 and one
2 and two
3 and three
4 and four
5 and five
zip 操作符會把 flowA 中的一個 item 和 flowB 中對應(yīng)的一個 item 進(jìn)行合并止潮。即使 flowB 中的每一個 item 都使用了 delay() 函數(shù)窃判,在合并過程中也會等待 delay() 執(zhí)行完后再進(jìn)行合并。
fun main() = runBlocking {
val flowA = (1..5).asFlow()
val flowB = flowOf("one", "two", "three", "four", "five").onEach { delay(100) }
val time = measureTimeMillis {
flowA.zip(flowB) { a, b -> "$a and $b" }
.collect { println(it) }
}
println("Cost $time ms")
}
執(zhí)行結(jié)果:
1 and one
2 and two
3 and three
4 and four
5 and five
Cost 561 ms
如果 flowA 中 item 個數(shù)大于 flowB 中 item 個數(shù):
fun main() = runBlocking {
val flowA = (1..6).asFlow()
val flowB = flowOf("one", "two", "three","four","five")
flowA.zip(flowB) { a, b -> "$a and $b" }
.collect { println(it) }
}
執(zhí)行合并后新的 flow 的 item 個數(shù) = 較小的 flow 的 item 個數(shù)喇闸。
執(zhí)行結(jié)果:
1 and one
2 and two
3 and three
4 and four
5 and five
combine
combine 雖然也是合并,但是跟 zip 不太一樣。
使用 combine 合并時桑阶,每次從 flowA 發(fā)出新的 item 霜运,會將其與 flowB 的最新的 item 合并。
fun main() = runBlocking {
val flowA = (1..5).asFlow().onEach { delay(100) }
val flowB = flowOf("one", "two", "three","four","five").onEach { delay(200) }
flowA.combine(flowB) { a, b -> "$a and $b" }
.collect { println(it) }
}
執(zhí)行結(jié)果:
1 and one
2 and one
3 and one
3 and two
4 and two
5 and two
5 and three
5 and four
5 and five
flattenMerge
其實刻蟹,flattenMerge 不會組合多個 flow 逗旁,而是將它們作為單個流執(zhí)行。
fun main() = runBlocking {
val flowA = (1..5).asFlow()
val flowB = flowOf("one", "two", "three","four","five")
flowOf(flowA,flowB)
.flattenConcat()
.collect{ println(it) }
}
執(zhí)行結(jié)果:
1
2
3
4
5
one
two
three
four
five
為了能更清楚地看到 flowA舆瘪、flowB 作為單個流的執(zhí)行片效,對他們稍作改動。
fun main() = runBlocking {
val flowA = (1..5).asFlow().onEach { delay(100) }
val flowB = flowOf("one", "two", "three","four","five").onEach { delay(200) }
flowOf(flowA,flowB)
.flattenMerge(2)
.collect{ println(it) }
}
執(zhí)行結(jié)果:
1
one
2
3
two
4
5
three
four
five
8.5 Flattening flows operators
flatMapConcat英古、flatMapMerge 類似于 RxJava 的 concatMap淀衣、flatMap 操作符。
flatMapConcat
flatMapConcat 由 map召调、flattenConcat 操作符實現(xiàn)膨桥。
@FlowPreview
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
map(transform).flattenConcat()
在調(diào)用 flatMapConcat 后蛮浑,collect 函數(shù)在收集新值之前會等待 flatMapConcat 內(nèi)部的 flow 完成。
fun currTime() = System.currentTimeMillis()
var start: Long = 0
fun main() = runBlocking {
(1..5).asFlow()
.onStart { start = currTime() }
.onEach { delay(100) }
.flatMapConcat {
flow {
emit("$it: First")
delay(500)
emit("$it: Second")
}
}
.collect {
println("$it at ${System.currentTimeMillis() - start} ms from start")
}
}
執(zhí)行結(jié)果:
1: First at 114 ms from start
1: Second at 619 ms from start
2: First at 719 ms from start
2: Second at 1224 ms from start
3: First at 1330 ms from start
3: Second at 1830 ms from start
4: First at 1932 ms from start
4: Second at 2433 ms from start
5: First at 2538 ms from start
5: Second at 3041 ms from start
flatMapMerge
flatMapMerge 由 map只嚣、flattenMerge 操作符實現(xiàn)陵吸。
@FlowPreview
public fun <T, R> Flow<T>.flatMapMerge(
concurrency: Int = DEFAULT_CONCURRENCY,
transform: suspend (value: T) -> Flow<R>
): Flow<R> = map(transform).flattenMerge(concurrency)
flatMapMerge 是順序調(diào)用內(nèi)部代碼塊,并且并行地執(zhí)行 collect 函數(shù)介牙。
fun currTime() = System.currentTimeMillis()
var start: Long = 0
fun main() = runBlocking {
(1..5).asFlow()
.onStart { start = currTime() }
.onEach { delay(100) }
.flatMapMerge {
flow {
emit("$it: First")
delay(500)
emit("$it: Second")
}
}
.collect {
println("$it at ${System.currentTimeMillis() - start} ms from start")
}
}
執(zhí)行結(jié)果:
1: First at 116 ms from start
2: First at 216 ms from start
3: First at 319 ms from start
4: First at 422 ms from start
5: First at 525 ms from start
1: Second at 618 ms from start
2: Second at 719 ms from start
3: Second at 822 ms from start
4: Second at 924 ms from start
5: Second at 1030 ms from start
flatMapMerge 操作符有一個參數(shù) concurrency 壮虫,它默認(rèn)使用DEFAULT_CONCURRENCY
,如果想更直觀地了解 flatMapMerge 的并行环础,可以對這個參數(shù)進(jìn)行修改囚似。例如改成2,就會發(fā)現(xiàn)不一樣的執(zhí)行結(jié)果线得。
flatMapLatest
當(dāng)發(fā)射了新值之后饶唤,上個 flow 就會被取消。
fun currTime() = System.currentTimeMillis()
var start: Long = 0
fun main() = runBlocking {
(1..5).asFlow()
.onStart { start = currTime() }
.onEach { delay(100) }
.flatMapLatest {
flow {
emit("$it: First")
delay(500)
emit("$it: Second")
}
}
.collect {
println("$it at ${System.currentTimeMillis() - start} ms from start")
}
}
執(zhí)行結(jié)果:
1: First at 114 ms from start
2: First at 220 ms from start
3: First at 321 ms from start
4: First at 422 ms from start
5: First at 524 ms from start
5: Second at 1024 ms from start
九. Flow VS Reactive Streams
天生的多平臺支持
由于 Kotlin 語言自身對多平臺的支持贯钩,使得 Flow 也可以在多平臺上使用募狂。
互操作性
Flow 仍然屬于響應(yīng)式范疇。開發(fā)者通過 kotlinx-coroutines-reactive 模塊中 Flow.asPublisher() 和 Publisher.asFlow() 角雷,可以方便地將 Flow 跟 Reactive Streams 進(jìn)行互操作祸穷。
該系列的相關(guān)文章:
Kotlin Coroutines Flow 系列(一) Flow 基本使用
Kotlin Coroutines Flow 系列(二) Flow VS RxJava2
Kotlin Coroutines Flow 系列(三) 異常處理
Kotlin Coroutines Flow 系列(四) 線程操作