七. Flow 線程操作
7.1 更為簡化的線程切換
相對于 RxJava 多線程的學習曲線疹尾,F(xiàn)low 對線程的切換友好地多。
在之前的 Kotlin Coroutines Flow 系列(一) Flow 基本使用 一文中曾經介紹過 Flow 的切換線程骤肛,以及 flowOn 操作符纳本。
Flow 只需使用 flowOn 操作符,而不必像 RxJava 需要去深入理解 observeOn腋颠、subscribeOn 之間的區(qū)別繁成。
7.2 flowOn VS RxJava 的 observeOn
RxJava 的 observeOn 操作符,接收一個 Scheduler 參數(shù)淑玫,用來指定下游操作運行在特定的線程調度器 Scheduler 上巾腕。
Flow 的 flowOn 操作符,接收一個 CoroutineContext 參數(shù)絮蒿,影響的是上游的操作尊搬。
例如:
fun main() = runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.map {
it * it
}.flowOn(Dispatchers.IO)
.collect {
println("${Thread.currentThread().name}: $it")
}
}
flow builder 和 map 操作符都會受到flowOn
的影響,并使用 Dispatchers.io 線程池土涝。
再例如:
val customerDispatcher = Executors.newFixedThreadPool(5).asCoroutineDispatcher()
fun main() = runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.map {
it * it
}.flowOn(Dispatchers.IO)
.map {
it+1
}
.flowOn(customerDispatcher)
.collect {
println("${Thread.currentThread().name}: $it")
}
}
flow builder 和兩個 map 操作符都會受到兩個flowOn
的影響佛寿,其中 flow builder 和第一個 map 操作符跟上面的例子一樣,第二個 map 操作符會切換到指定的 customerDispatcher 線程池但壮。
7.3 buffer 實現(xiàn)并發(fā)操作
在 Kotlin Coroutines Flow 系列(二) Flow VS RxJava2 一文中冀泻,曾介紹 buffer 操作符對應 RxJava Backpressure 中的 BUFFER 策略。
事實上 buffer 操作符也可以并發(fā)地執(zhí)行任務蜡饵,它是除了使用 flowOn 操作符之外的另一種方式弹渔,只是不能顯示地指定 Dispatchers。
例如:
fun main() = runBlocking {
val time = measureTimeMillis {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}
.buffer()
.collect { value ->
delay(300)
println(value)
}
}
println("Collected in $time ms")
}
執(zhí)行結果:
1
2
3
4
5
Collected in 1676 ms
在上述例子中溯祸,所有的 delay 所花費的時間是2000ms巾乳。然而通過 buffer 操作符并發(fā)
地執(zhí)行 emit,再順序地執(zhí)行 collect 函數(shù)后鸟召,所花費的時間在 1700ms 左右胆绊。
如果去掉 buffer 操作符。
fun main() = runBlocking {
val time = measureTimeMillis {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}
.collect { value ->
delay(300)
println(value)
}
}
println("Collected in $time ms")
}
執(zhí)行結果:
1
2
3
4
5
Collected in 2039 ms
所花費的時間比剛才多了300多ms欧募。
7.4 并行操作
在講解并行操作之前压状,先來了解一下并發(fā)和并行的區(qū)別。
并發(fā)(concurrency):是指一個處理器同時處理多個任務跟继。
并行(parallelism):是多個處理器或者是多核的處理器同時處理多個不同的任務种冬。并行是同時發(fā)生的多個并發(fā)事件,具有并發(fā)的含義舔糖,而并發(fā)則不一定是并行娱两。
RxJava 可以借助 flatMap 操作符實現(xiàn)并行,亦可以使用 ParallelFlowable 類實現(xiàn)并行操作金吗。
下面十兢,以 flatMap 操作符為例實現(xiàn) RxJava 的并行:
Observable.range(1,100)
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
return Observable.just(integer)
.subscribeOn(Schedulers.io())
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer.toString();
}
});
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String str) throws Exception {
System.out.println(str);
}
});
Flow 也有相應的操作符 flatMapMerge 可以實現(xiàn)并行。
fun main() = runBlocking {
val result = arrayListOf<Int>()
for (index in 1..100){
result.add(index)
}
result.asFlow()
.flatMapMerge {
flow {
emit(it)
}
.flowOn(Dispatchers.IO)
}
.collect { println("$it") }
}
總體而言摇庙,F(xiàn)low 相比于 RxJava 更加簡潔一些旱物。
該系列的相關文章:
Kotlin Coroutines Flow 系列(一) Flow 基本使用
Kotlin Coroutines Flow 系列(二) Flow VS RxJava2
Kotlin Coroutines Flow 系列(三) 異常處理
Kotlin Coroutines Flow 系列(五) 其他的操作符