一. Kotlin Flow 介紹
Flow 庫是在 Kotlin Coroutines 1.3.2 發(fā)布之后新增的庫。
官方文檔給予了一句話簡單的介紹:
Flow — cold asynchronous stream with flow builder and comprehensive operator set (filter, map, etc);
Flow 從文檔的介紹來看,它有點類似 RxJava 的 Observable。因為 Observable 也有 Cold 匈棘、Hot 之分。
二. Flow 基本使用
Flow 能夠返回多個異步計算的值析命,例如下面的 flow builder :
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.collect{
println(it)
}
其中 Flow 接口主卫,只有一個 collect 函數(shù)
public interface Flow<out T> {
@InternalCoroutinesApi
public suspend fun collect(collector: FlowCollector<T>)
}
如果熟悉 RxJava 的話,則可以理解為 collect() 對應(yīng)subscribe()
鹃愤,而 emit() 對應(yīng)onNext()
簇搅。
2.1 創(chuàng)建 flow
除了剛剛展示的 flow builder 可以用于創(chuàng)建 flow,還有其他的幾種方式:
flowOf()
flowOf(1,2,3,4,5)
.onEach {
delay(100)
}
.collect{
println(it)
}
asFlow()
listOf(1, 2, 3, 4, 5).asFlow()
.onEach {
delay(100)
}.collect {
println(it)
}
channelFlow()
channelFlow {
for (i in 1..5) {
delay(100)
send(i)
}
}.collect{
println(it)
}
最后的 channelFlow builder 跟 flow builder 是有一定差異的软吐。
flow 是 Cold Stream瘩将。在沒有切換線程的情況下,生產(chǎn)者和消費者是同步非阻塞的。
channel 是 Hot Stream姿现。而 channelFlow 實現(xiàn)了生產(chǎn)者和消費者異步非阻塞模型肠仪。
下面的代碼,展示了使用 flow builder 的情況备典,大致花費1秒:
fun main() = runBlocking {
val time = measureTimeMillis {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.collect{
delay(100)
println(it)
}
}
print("cost $time")
}
使用 channelFlow builder 的情況异旧,大致花費700毫秒:
fun main() = runBlocking {
val time = measureTimeMillis{
channelFlow {
for (i in 1..5) {
delay(100)
send(i)
}
}.collect{
delay(100)
println(it)
}
}
print("cost $time")
}
當(dāng)然,flow 如果切換線程的話提佣,花費的時間也是大致700毫秒吮蛹,跟使用 channelFlow builder 效果差不多。
fun main() = runBlocking {
val time = measureTimeMillis{
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.flowOn(Dispatchers.IO)
.collect {
delay(100)
println(it)
}
}
print("cost $time")
}
2.2 切換線程
相比于 RxJava 需要使用 observeOn拌屏、subscribeOn 來切換線程潮针,flow 會更加簡單。只需使用 flowOn
槐壳,下面的例子中然低,展示了 flow builder 和 map 操作符都會受到 flowOn 的影響。
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.map {
it * it
}.flowOn(Dispatchers.IO)
.collect {
println(it)
}
而 collect() 指定哪個線程务唐,則需要看整個 flow 處于哪個 CoroutineScope 下雳攘。
例如,下面的代碼 collect() 則是在 main 線程:
fun main() = runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.map {
it * it
}.flowOn(Dispatchers.IO)
.collect {
println("${Thread.currentThread().name}: $it")
}
}
執(zhí)行結(jié)果:
main: 1
main: 4
main: 9
main: 16
main: 25
值得注意的地方枫笛,不要使用 withContext() 來切換 flow 的線程吨灭。
2.3 flow 取消
如果 flow 是在一個掛起函數(shù)內(nèi)被掛起了,那么 flow 是可以被取消的刑巧,否則不能取消喧兄。
fun main() = runBlocking {
withTimeoutOrNull(2500) {
flow {
for (i in 1..5) {
delay(1000)
emit(i)
}
}.collect {
println(it)
}
}
println("Done")
}
執(zhí)行結(jié)果:
1
2
Done
2.4 Terminal flow operators
Flow 的 API 有點類似于 Java Stream 的 API。它也同樣擁有 Intermediate Operations啊楚、Terminal Operations吠冤。
Flow 的 Terminal 運算符可以是 suspend 函數(shù),如 collect恭理、single拯辙、reduce、toList 等颜价;也可以是 launchIn 運算符涯保,用于在指定 CoroutineScope 內(nèi)使用 flow。
@ExperimentalCoroutinesApi // tentatively stable in 1.3.0
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
collect() // tail-call
}
整理一下 Flow 的 Terminal 運算符
- collect
- single/first
- toList/toSet/toCollection
- count
- fold/reduce
- launchIn/produceIn/broadcastIn
該系列的相關(guān)文章:
Kotlin Coroutines Flow 系列(二) Flow VS RxJava2
Kotlin Coroutines Flow 系列(三) 異常處理
Kotlin Coroutines Flow 系列(四) 線程操作
Kotlin Coroutines Flow 系列(五) 其他的操作符