在Kotlin普及之前,RxJava
無疑是Android開發(fā)領(lǐng)域中最受歡迎的響應(yīng)式編程的三方庫元镀,而RxJava
在我們?nèi)粘5腁ndroid開發(fā)應(yīng)用的最多的場景就是配合Retrofit
進行網(wǎng)絡(luò)請求和類似EventBus
的事件訂閱(RxBus)绍填。但是到了2017年,隨著LiveData
剛一面世栖疑,就受到了很大的關(guān)注讨永,LiveData
是一個以觀察者模式為核心,讓界面對變量進行訂閱蔽挠,從而實現(xiàn)自動通知刷新的組件住闯。跟一般的訂閱比起來,LiveData
有兩大特點:一是他的目標(biāo)非常直接澳淑,直指界面刷新比原,所以它的數(shù)據(jù)更新只發(fā)生在主線程。二是它借助了另外一個組件Lifecycle
的功能杠巡,讓它可以只在界面到了前臺的時候才通知更新量窘,避免了浪費性能。并且LiveData
相比RxJava
來說也有兩大優(yōu)點:
-
LiveData
的學(xué)習(xí)成本比較低 -
LiveData
相比較于RxJava
要輕量級很多
所以在一些簡單場景人們逐漸從RxJava
過渡到LiveData
氢拥,而一些比較復(fù)雜的場景還是使用RxJava
蚌铜,因為LiveData
的輕量級也決定了它不夠強大锨侯,不適合一些復(fù)雜場景。而隨著Kotlin協(xié)程庫的更新冬殃,Flow
誕生了囚痴。
Flow
庫是在 Kotlin Coroutines 1.3.2 發(fā)布之后新增的庫。從文檔的介紹來看Flow
有點類似 RxJava
审葬,都是一種基于事件的響應(yīng)式編程深滚。那么接下來我們就看一下Flow
的基本使用。
1.創(chuàng)建Flow
fun simpleFlow(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(100) // pretend we are doing something useful here
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
// Collect the flow
simpleFlow().collect { value -> println(value) }
}
通過上面例子可以看到涣觉,F(xiàn)low有以下特征:
可以用flow{ ... } 構(gòu)建一個Flow類型
flow { ... }內(nèi)可以使用suspend函數(shù).
simpleFlow()不需要是suspend函數(shù)
emit方法用來發(fā)射數(shù)據(jù)
collect方法用來遍歷結(jié)果
2.Flow是冷流
Flow
是一種冷流痴荐,Flow
構(gòu)建器中的代碼只有在collect
函數(shù)被執(zhí)行的時候才會運行。這一點與 Channel
正對應(yīng):Channel
的發(fā)送端并不依賴于接收端官册。
fun simpleFlow2() = flow<Int> {
println("Flow started")
for (i in 1..3) {
delay(1000)
emit(i)
}
}
@Test
fun `test flow is cold`() = runBlocking<Unit> {
val flow = simpleFlow2()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
3.Flow是具有連續(xù)性的流
流的每次單獨收集都是按順序執(zhí)行的生兆,除非使用了特殊的操作符,從上游到下游每個過渡操作符都會處理每個發(fā)射出的值膝宁,然后再交給末端操作符
@Test
fun `test flow continuation`() = runBlocking<Unit> {
(1..5).asFlow().filter {
it % 2 == 0
}.map {
"string $it"
}.collect {
println("Collect $it")
}
}
4.Flow的構(gòu)建器
通常情況下有兩種方式可以構(gòu)建一個Flow
,一種是通過flowOf
構(gòu)建器定義一個發(fā)射固定值集的流
flowOf("one","two","three")
.onEach { delay(1000) }
.collect { value ->
println(value)
}
另一種方式是使用.asFlow()
擴展函數(shù)可以將各種集合與序列轉(zhuǎn)換為Flow
(1..3).asFlow().collect { value ->
println(value)
}
5.Flow的上下文
-
Flow
的收集總是在調(diào)用協(xié)程的上下文中發(fā)生的鸦难,Flow
的該屬性稱為上下文保存
fun simpleFlow3() = flow<Int> {
println("Flow started ${Thread.currentThread().name}")
for (i in 1..3) {
delay(1000)
emit(i)
}
}
@Test
fun `test flow context`() = runBlocking<Unit> {
simpleFlow3()
.collect { value -> println("Collected $value ${Thread.currentThread().name}") }
}
-
flow{...}
構(gòu)建器中的代碼必須遵循上下文保存屬性,并且不允許從其他上下文中發(fā)射(emit)
fun simpleFlow4() = flow<Int> {
withContext(Dispatchers.IO) {
println("Flow started ${Thread.currentThread().name}")
for (i in 1..3) {
delay(1000)
emit(i)
}
}
} //Error
-
flowOn
操作符用于更改流發(fā)射的上下文
fun simpleFlow5() = flow<Int> {
println("Flow started ${Thread.currentThread().name}")
for (i in 1..3) {
delay(1000)
emit(i)
}
}.flowOn(Dispatchers.Default)
6.分離 Flow 的消費和觸發(fā)
我們除了可以在 collect
處消費 Flow 的元素以外昆汹,還可以通過 onEach
來做到這一點明刷。這樣消費的具體操作就不需要與末端操作符放到一起,collect
函數(shù)可以放到其他任意位置調(diào)用满粗,例如:
fun createFlow() = flow<Int> {
(1..3).forEach {
emit(it)
delay(100)
}
}.onEach { println(it) }
fun main(){
GlobalScope.launch {
createFlow().collect()
}
}
7.Flow的取消
Flow
本身并沒有提供取消操作辈末, Flow 的消費依賴于 collect
這樣的末端操作符,而它們又必須在協(xié)程當(dāng)中調(diào)用映皆,因此 Flow
的取消主要依賴于末端操作符所在的協(xié)程的狀態(tài)挤聘。像往常一樣,Flow
的收集可以是當(dāng)流在一個可取消的掛起函數(shù)中取消的時候取消捅彻。
fun simpleFlow6() = flow<Int> {
for (i in 1..3) {
delay(1000)
emit(i)
println("Emitting $i")
}
}
@Test
fun `test cancel flow`() = runBlocking<Unit> {
withTimeoutOrNull(2500) {
simpleFlow6().collect { value -> println(value) }
}
println("Done")
}
8.Flow的取消檢測
- 為方便起見组去,流構(gòu)建器對每個發(fā)射值執(zhí)行附加的enureActive檢測以進行取消,這意味著從
flow{...}
發(fā)出的繁忙循環(huán)是可以取消的
fun simpleFlow7() = flow<Int> {
for (i in 1..5) {
emit(i)
println("Emitting $i")
}
}
@Test
fun `test cancel flow check`() = runBlocking<Unit> {
simpleFlow7().collect { value ->
println(value)
if (value == 3) cancel()
}
}
- 出于性能原因步淹,大多數(shù)其他流操作不會自行執(zhí)行其他取消檢測从隆,在協(xié)程出于繁忙循環(huán)的情況下,必須明確檢測是否取消缭裆。
@Test
fun `test cancel flow check`() = runBlocking<Unit> {
(1..5).asFlow().collect { value ->
println(value)
if (value == 3) cancel()
}
}
- 一般使用
cancellable
操作符來執(zhí)行此操作
@Test
fun `test cancel flow check`() = runBlocking<Unit> {
(1..5).asFlow().cancellable().collect { value ->
println(value)
if (value == 3) cancel()
}
}
9.Flow的背壓
只要是響應(yīng)式編程键闺,就一定會有背壓問題,生產(chǎn)者的生產(chǎn)速率高于消費者的處理速率的情況下出現(xiàn)澈驼。為了保證數(shù)據(jù)不丟失辛燥,我們也會考慮添加緩存來緩解問題,buffer
的本質(zhì)是并發(fā)運行流中發(fā)射元素的代碼
fun simpleFlow8() = flow<Int> {
for (i in 1..3) {
delay(100)
emit(i)
println("Emitting $i ${Thread.currentThread().name}")
}
}
@Test
fun `test flow back pressure`() = runBlocking<Unit> {
val time = measureTimeMillis {
simpleFlow8()
.buffer(50)
.collect { value ->
delay(300)
println("Collected $value ${Thread.currentThread().name}")
}
}
println("Collected in $time ms") //1028ms
}
不過,如果我們只是單純地添加緩存挎塌,而不是從根本上解決問題就始終會造成數(shù)據(jù)積壓徘六。問題產(chǎn)生的根本原因是生產(chǎn)和消費速率的不匹配,除直接優(yōu)化消費者的性能以外榴都,我們也可以采取一些取舍的手段待锈。第一種是 conflate
,conflate()
的策略是如果緩存池滿了,新數(shù)據(jù)會覆蓋老數(shù)據(jù)
fun simpleFlow8() = flow<Int> {
for (i in 1..3) {
delay(100)
emit(i)
println("Emitting $i ${Thread.currentThread().name}")
}
}
@Test
fun `test flow back pressure`() = runBlocking<Unit> {
val time = measureTimeMillis {
simpleFlow8()
.conflate()
.collect { value ->
println("Collected start $value ${Thread.currentThread().name}")
delay(300)
println("Collected end $value ${Thread.currentThread().name}")
}
}
println("Collected in $time ms") //770ms
}
第二種是 collectLatest
缭贡。顧名思義炉擅,只處理最新的數(shù)據(jù),這看上去似乎與 conflate
沒有區(qū)別阳惹,其實區(qū)別大了:它并不會直接用新數(shù)據(jù)覆蓋老數(shù)據(jù),而是每一個都會被處理眶俩,只不過如果前一個還沒被處理完后一個就來了的話莹汤,處理前一個數(shù)據(jù)的邏輯就會被取消。
fun simpleFlow8() = flow<Int> {
for (i in 1..3) {
delay(100)
emit(i)
println("Emitting $i ${Thread.currentThread().name}")
}
}
@Test
fun `test flow back pressure`() = runBlocking<Unit> {
val time = measureTimeMillis {
simpleFlow8()
.collectLatest { value ->
println("Collected start $value ${Thread.currentThread().name}")
delay(300)
println("Collected $value ${Thread.currentThread().name}")
}
}
println("Collected in $time ms")//785ms
}
除 collectLatest
之外還有 mapLatest
颠印、flatMapLatest
等等纲岭,都是這個作用。
10.Flow的操作符
- 轉(zhuǎn)換操作符(過渡操作符)
- 可以使用操作符轉(zhuǎn)換流线罕,就像使用集合與序列一樣
- 過渡操作符應(yīng)用于上游流止潮,并返回下游流
- 這些操作符也是冷操作符,并且這類操作符本身并不是掛起函數(shù)
- 運行速度很快钞楼,返回新的轉(zhuǎn)換流的定義
suspend fun performRequest(request: Int): String {
delay(1000)
return "response $request"
}
@Test
fun `test transform flow operator1`() = runBlocking<Unit> {
(1..3).asFlow()
.map { request -> performRequest(request) }
.collect { value -> println(value) }
}
@Test
fun `test transform flow operator2`() = runBlocking<Unit> {
(1..3).asFlow()
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}.collect { value -> println(value) }
}
- 限長操作符
fun numbers() = flow<Int> {
emit(1)
emit(2)
emit(3)
}
@Test
fun `test limit length operator`() = runBlocking<Unit> {
numbers().take(2).collect { value -> println(value) }
}
- 末端操作符
末端操作符是在流上用于啟動流收集的掛起函數(shù)喇闸。collect
是最基礎(chǔ)的末端操作符,功能與 RxJava 的 subscribe
類似询件。但還有另外一些更方便使用的末端操作符,大體分為兩類:
- 集合類型轉(zhuǎn)換操作燃乍,包括
toList
、toSet
等宛琅。 - 聚合操作刻蟹,包括將 Flow 規(guī)約到單值的
reduce
、fold
等操作嘿辟,以及獲得單個元素的操作包括single
舆瘪、singleOrNull
、first
等红伦。
實際上英古,識別是否為末端操作符,還有一個簡單方法色建,由于 Flow 的消費端一定需要運行在協(xié)程當(dāng)中哺呜,因此末端操作符都是掛起函數(shù)。
@Test
fun `test terminal operator`() = runBlocking<Unit> {
val sum = (1..5).asFlow()
.map { it * it }
.reduce { a, b -> a + b }
println(sum)
}
- 組合操作符
就像Kotlin標(biāo)準(zhǔn)庫中的Sequence.zip()
擴展函數(shù)一樣,可以使用zip
操作符組合兩個流中的相關(guān)值
@Test
fun `test zip`() = runBlocking<Unit> {
val numbs = (1..3).asFlow()
val strs = flowOf("One", "Two", "Three")
numbs.zip(strs) { a, b -> "$a -> $b" }.collect { println(it) }
}
combine 雖然也是合并某残,但是跟 zip 不太一樣国撵。
使用 combine 合并時,每次從 flowA 發(fā)出新的 item 玻墅,會將其與 flowB 的最新的 item 合并介牙。
fun main() = runBlocking {
val flowA = (1..5).asFlow().onEach { delay(100) }
val flowB = flowOf("one", "two", "three","four","five").onEach { delay(200) }
flowA.combine(flowB) { a, b -> "$a and $b" }
.collect { println(it) }
}
1 and one
2 and one
3 and one
3 and two
4 and two
5 and two
5 and three
5 and four
5 and five
- 展平操作符
Flow
表示異步接收的值序列,所以很容易遇到這樣的情況:每個值都會觸發(fā)對另一個值序列的請求澳厢。然而由于流具有“異步”的性質(zhì)环础,因此需要不同的展平模式,為此剩拢,存在一系列的展平操作符:
flatMapConcat
flatMapMerge
flatMapLatest
fun requestFlow(i: Int) = flow<String> {
emit("$i: First")
delay(500)
emit("$i: Second")
}
@Test
fun `test map`() = runBlocking<Unit> {
//Flow<Flow<String>>
val startTime = System.currentTimeMillis()
(1..3).asFlow()
.map { requestFlow(it) }
.collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
}
@Test
fun `test flatMapConcat`() = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
(1..3).asFlow()
.onEach { delay(100) }
.flatMapConcat { requestFlow(it) }
.collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
}
1: First
1: Second
2: First
2: Second
3: First
3: Second
@Test
fun `test flatMapMerge`() = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
(1..3).asFlow()
.onEach { delay(100) }
.flatMapMerge { requestFlow(it) }
.collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
}
1: First
2: First
3: First
1: Second
2: Second
3: Second
@Test
fun `test flatMapLatest`() = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
(1..3).asFlow()
.onEach { delay(100) }
.flatMapLatest { requestFlow(it) }
.collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
}
1: First
2: First
3: First
3: Second
11.Flow的異常處理
當(dāng)運算符中的發(fā)射器或者代碼拋出異常時线得,通常有一下兩個處理方法:
第一種是 try/catch
代碼塊
fun simpleFlow() = flow<Int> {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
@Test
fun `test flow exception`() = runBlocking<Unit> {
try {
simpleFlow().collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
}
第二種是通過catch()
函數(shù),但是只能捕獲上游異常
@Test
fun `test flow exception2`() = runBlocking<Unit> {
flow {
emit(1)
throw ArithmeticException("Div 0")
}.catch { e: Throwable -> println("Caught $e") }
.flowOn(Dispatchers.IO)
.collect { println(it) }
}
異承旆ィ恢復(fù)
@Test
fun `test flow exception2`() = runBlocking<Unit> {
flow {
throw ArithmeticException("Div 0")
emit(1)
}.catch { e: Throwable ->
println("Caught $e")
emit(10)
}.collect { println(it) }
}
12.Flow的完成
當(dāng)流收集完成時贯钩,它可能需要執(zhí)行一個動作
- 命令式
finally
代碼塊 -
onCompletion
聲明式處理,onCompletion
用起來比較類似于try ... catch ... finally
中的finally
办素,無論前面是否存在異常角雷,它都會被調(diào)用,參數(shù) t 則是前面未捕獲的異常性穿。
flow {
emit(1)
throw ArithmeticException("Div 0")
}.catch { t: Throwable ->
println("caught error: $t")
}.onCompletion { t: Throwable? ->
println("finally.")
}