目錄
深入學(xué)習(xí)Kotlin之Flow(一),什么是Flow?Flow的基本使用)
深入學(xué)習(xí)Kotlin之Flow(二),Flow的操作符,以及協(xié)程的背壓
前言
在DataStore里面有提到 DataStore是基于協(xié)程
與Flow
實(shí)現(xiàn)的,那么什么是Flow呢?
什么是Flow
Flow 庫是在 Kotlin Coroutines 1.3.2 發(fā)布之后新增的庫,也叫做異步流
類似 RxJava 的 Observable 、 Flowable 等等,所以很多人都用 Flow 與 RxJava 做對(duì)比茄茁。
Flow 相比于 RxJava 簡單的太多了臂港,你還記得那些 RxJava 傻傻分不清楚的操作符嗎 Observable 巷蚪、 Flowable 帮孔、 Single 克胳、 Completable 德挣、 Maybe 等等恭垦。
Flow解決了什么問題
-
LiveData 是一個(gè)生命周期感知組件,最好在 View 和 ViewModel 層中使用它盲厌,如果在 Repositories 或者 DataSource 中使用會(huì)有幾個(gè)問題
- 它不支持線程切換署照,其次不支持背壓,也就是在一段時(shí)間內(nèi)發(fā)送數(shù)據(jù)的速度 > 接受數(shù)據(jù)的速度吗浩,LiveData 無法正確的處理這些請求
- 使用 LiveData 的最大問題是所有數(shù)據(jù)轉(zhuǎn)換都將在主線程上完成
- RxJava 雖然支持線程切換和背壓建芙,但是 RxJava 那么多傻傻分不清楚的操作符,實(shí)際上在項(xiàng)目中常用的可能只有幾個(gè)例如
Observable
懂扼、Flowable
禁荸、Single
等等,如果我們不去了解背后的原理阀湿,造成內(nèi)存泄露是很正常的事赶熟,大家可以從 StackOverflow 上查看一下,有很多因?yàn)?RxJava 造成內(nèi)存泄露的例子
- RxJava 入門的門檻很高陷嘴,學(xué)習(xí)過的朋友們映砖,我相信能夠體會(huì)到從入門到放棄是什么感覺
相比之下Flow有了很不錯(cuò)的優(yōu)點(diǎn):
- Flow 支持線程切換、背壓
- Flow 入門的門檻很低灾挨,沒有那么多傻傻分不清楚的操作符
- 簡單的數(shù)據(jù)轉(zhuǎn)換與操作符邑退,如 map 等等
- Flow 是對(duì) Kotlin 協(xié)程的擴(kuò)展,讓我們可以像運(yùn)行同步代碼一樣運(yùn)行異步代碼劳澄,使得代碼更加簡潔地技,提高了代碼的可讀性
- 易于做單元測試
- 解決回調(diào)地獄的問題
Flow的基本使用
Flow能夠返回多個(gè)異步值
fun simple(): Flow<Int> = flow { // 流構(gòu)建器
for (i in 1..3) {
delay(100) // 假裝我們在這里做了一些有用的事情
emit(i) // 發(fā)送下一個(gè)值
}
}
fun main() = runBlocking<Unit> {
// 啟動(dòng)并發(fā)的協(xié)程以驗(yàn)證主線程并未阻塞
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// 收集這個(gè)流
simple().collect { value -> println(value) }
打印結(jié)果:
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
其中 Flow 接口,只有一個(gè) collect 函數(shù)
如果熟悉 RxJava 的話秒拔,則可以理解為 collect() 對(duì)應(yīng)subscribe()莫矗,而 emit() 對(duì)應(yīng)onNext()。
創(chuàng)建 Flow
創(chuàng)建Flow有幾種方式:
flow builder 通過
flow { ... }
(上述例子就是)flowOf()
flowOf(1,2,3,4,5)
.onEach {
delay(100)
}
.collect{
println(it)
}
- asFlow()
listOf(1, 2, 3, 4, 5).asFlow()
.onEach {
delay(100)
}.collect {
println(it)
}
- channelFlow
{
for (i in 1..5) {
delay(100)
send(i)
}
}.collect{
println(it)
}
}
最后的 channelFlow builder 跟 flow builder 是有一定差異的。
flow 是 Cold Stream
,在沒有切換線程的情況下作谚,生產(chǎn)者和消費(fèi)者是同步非阻塞的三娩。
channel 是 Hot Stream
,而 channelFlow 實(shí)現(xiàn)了生產(chǎn)者和消費(fèi)者異步非阻塞模型。
關(guān)于Cold Stream
與Hot Stream
我們后續(xù)會(huì)講
切換線程
相比于 RxJava 需要使用 observeOn食磕、subscribeOn 來切換線程尽棕,flow 會(huì)更加簡單。只需使用 flowOn彬伦,下面的例子中,展示了 flow builder 和 map 操作符都會(huì)受到 flowOn 的影響
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.map {
it * it
}.flowOn(Dispatchers.IO)
.collect {
println(it)
}
而 collect() 指定哪個(gè)線程伊诵,則需要看整個(gè) flow 處于哪個(gè) CoroutineScope 下单绑。
例如,下面的代碼 collect() 則是在 main 線程:
fun main() = runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.map {
it * it
}.flowOn(Dispatchers.IO)
.collect {
println("${Thread.currentThread().name}: $it")
}
}
運(yùn)行結(jié)果:
main: 1
main: 4
main: 9
main: 16
main: 25
flow 取消
如果 flow 是在協(xié)程被掛起了曹宴,那么 flow 是可以被取消的搂橙,否則不能取消。
fun main() = runBlocking {
withTimeoutOrNull(2500) {
flow {
for (i in 1..5) {
delay(1000)
emit(i)
}
}.collect {
println(it)
}
}
println("Done")
}
運(yùn)行結(jié)果:
1
2
Done