Kotlin - 協(xié)程 - Flow

在Kotlin普及之前,RxJava無疑是Android開發(fā)領(lǐng)域中最受歡迎的響應(yīng)式編程的三方庫元镀,而RxJava在我們?nèi)粘5腁ndroid開發(fā)應(yīng)用的最多的場景就是配合Retrofit進行網(wǎng)絡(luò)請求和類似EventBus的事件訂閱(RxBus)绍填。但是到了2017年,隨著LiveData剛一面世栖疑,就受到了很大的關(guān)注讨永,LiveData是一個以觀察者模式為核心,讓界面對變量進行訂閱蔽挠,從而實現(xiàn)自動通知刷新的組件住闯。跟一般的訂閱比起來,LiveData有兩大特點:一是他的目標(biāo)非常直接澳淑,直指界面刷新比原,所以它的數(shù)據(jù)更新只發(fā)生在主線程。二是它借助了另外一個組件Lifecycle的功能杠巡,讓它可以只在界面到了前臺的時候才通知更新量窘,避免了浪費性能。并且LiveData相比RxJava來說也有兩大優(yōu)點:

  • LiveData的學(xué)習(xí)成本比較低
  • LiveData相比較于RxJava要輕量級很多

所以在一些簡單場景人們逐漸從RxJava過渡到LiveData氢拥,而一些比較復(fù)雜的場景還是使用RxJava蚌铜,因為LiveData的輕量級也決定了它不夠強大锨侯,不適合一些復(fù)雜場景。而隨著Kotlin協(xié)程庫的更新冬殃,Flow誕生了囚痴。

Flow 庫是在 Kotlin Coroutines 1.3.2 發(fā)布之后新增的庫。從文檔的介紹來看Flow有點類似 RxJava审葬,都是一種基于事件的響應(yīng)式編程深滚。那么接下來我們就看一下Flow的基本使用。

1.創(chuàng)建Flow

fun simpleFlow(): Flow<Int> = flow { // flow builder

    for (i in 1..3) {

        delay(100) // pretend we are doing something useful here

        emit(i) // emit next value

    }

}

 

fun main() = runBlocking<Unit> {

    // Collect the flow

    simpleFlow().collect { value -> println(value) } 

}

通過上面例子可以看到涣觉,F(xiàn)low有以下特征:

  • 可以用flow{ ... } 構(gòu)建一個Flow類型

  • flow { ... }內(nèi)可以使用suspend函數(shù).

  • simpleFlow()不需要是suspend函數(shù)

  • emit方法用來發(fā)射數(shù)據(jù)

  • collect方法用來遍歷結(jié)果

2.Flow是冷流

Flow是一種冷流痴荐,Flow構(gòu)建器中的代碼只有在collect函數(shù)被執(zhí)行的時候才會運行。這一點與 Channel 正對應(yīng):Channel 的發(fā)送端并不依賴于接收端官册。

fun simpleFlow2() = flow<Int> {

    println("Flow started")

    for (i in 1..3) {

        delay(1000)

        emit(i)

    }

}



@Test

fun `test flow is cold`() = runBlocking<Unit> {

    val flow = simpleFlow2()

    println("Calling collect...")

    flow.collect { value -> println(value) }

    println("Calling collect again...")

    flow.collect { value -> println(value) }

}

3.Flow是具有連續(xù)性的流

流的每次單獨收集都是按順序執(zhí)行的生兆,除非使用了特殊的操作符,從上游到下游每個過渡操作符都會處理每個發(fā)射出的值膝宁,然后再交給末端操作符

@Test

fun `test flow continuation`() = runBlocking<Unit> {

    (1..5).asFlow().filter {

        it % 2 == 0

    }.map {

        "string $it"

    }.collect {

        println("Collect $it")

    }

}

4.Flow的構(gòu)建器

通常情況下有兩種方式可以構(gòu)建一個Flow,一種是通過flowOf構(gòu)建器定義一個發(fā)射固定值集的流

flowOf("one","two","three")

        .onEach { delay(1000) }

        .collect { value ->

            println(value)

        }

另一種方式是使用.asFlow()擴展函數(shù)可以將各種集合與序列轉(zhuǎn)換為Flow

(1..3).asFlow().collect { value ->

    println(value)

}

5.Flow的上下文

  1. Flow的收集總是在調(diào)用協(xié)程的上下文中發(fā)生的鸦难,Flow的該屬性稱為上下文保存
fun simpleFlow3() = flow<Int> {

    println("Flow started ${Thread.currentThread().name}")

    for (i in 1..3) {

        delay(1000)

        emit(i)

    }

}



@Test

fun `test flow context`() = runBlocking<Unit> {

    simpleFlow3()

            .collect { value -> println("Collected $value ${Thread.currentThread().name}") }

}
  1. flow{...}構(gòu)建器中的代碼必須遵循上下文保存屬性,并且不允許從其他上下文中發(fā)射(emit)
fun simpleFlow4() = flow<Int> {

    withContext(Dispatchers.IO) {

        println("Flow started ${Thread.currentThread().name}")

        for (i in 1..3) {

            delay(1000)

            emit(i)

        }

    }

} //Error
  1. flowOn操作符用于更改流發(fā)射的上下文
fun simpleFlow5() = flow<Int> {

    println("Flow started ${Thread.currentThread().name}")

    for (i in 1..3) {

        delay(1000)

        emit(i)

    }

}.flowOn(Dispatchers.Default)

6.分離 Flow 的消費和觸發(fā)

我們除了可以在 collect 處消費 Flow 的元素以外昆汹,還可以通過 onEach 來做到這一點明刷。這樣消費的具體操作就不需要與末端操作符放到一起,collect 函數(shù)可以放到其他任意位置調(diào)用满粗,例如:

fun createFlow() = flow<Int> {

    (1..3).forEach {

      emit(it)

      delay(100)

    }

  }.onEach { println(it) }



fun main(){

  GlobalScope.launch {

    createFlow().collect()

  }

}

7.Flow的取消

Flow本身并沒有提供取消操作辈末, Flow 的消費依賴于 collect 這樣的末端操作符,而它們又必須在協(xié)程當(dāng)中調(diào)用映皆,因此 Flow的取消主要依賴于末端操作符所在的協(xié)程的狀態(tài)挤聘。像往常一樣,Flow的收集可以是當(dāng)流在一個可取消的掛起函數(shù)中取消的時候取消捅彻。

fun simpleFlow6() = flow<Int> {

    for (i in 1..3) {

        delay(1000)

        emit(i)

        println("Emitting $i")

    }

}





@Test

fun `test cancel flow`() = runBlocking<Unit> {

    withTimeoutOrNull(2500) {

        simpleFlow6().collect { value -> println(value) }

    }

    println("Done")

}

8.Flow的取消檢測

  • 為方便起見组去,流構(gòu)建器對每個發(fā)射值執(zhí)行附加的enureActive檢測以進行取消,這意味著從flow{...}發(fā)出的繁忙循環(huán)是可以取消的
fun simpleFlow7() = flow<Int> {

    for (i in 1..5) {

        emit(i)

        println("Emitting $i")

    }

}



@Test

fun `test cancel flow check`() = runBlocking<Unit> {

    simpleFlow7().collect { value ->

        println(value)

        if (value == 3) cancel()

    }

}
  • 出于性能原因步淹,大多數(shù)其他流操作不會自行執(zhí)行其他取消檢測从隆,在協(xié)程出于繁忙循環(huán)的情況下,必須明確檢測是否取消缭裆。
@Test

fun `test cancel flow check`() = runBlocking<Unit> {

   (1..5).asFlow().collect { value ->

        println(value)

        if (value == 3) cancel()

    }

}
  • 一般使用cancellable操作符來執(zhí)行此操作
@Test

fun `test cancel flow check`() = runBlocking<Unit> {

   (1..5).asFlow().cancellable().collect { value ->

        println(value)

        if (value == 3) cancel()

    }

}

9.Flow的背壓

只要是響應(yīng)式編程键闺,就一定會有背壓問題,生產(chǎn)者的生產(chǎn)速率高于消費者的處理速率的情況下出現(xiàn)澈驼。為了保證數(shù)據(jù)不丟失辛燥,我們也會考慮添加緩存來緩解問題,buffer的本質(zhì)是并發(fā)運行流中發(fā)射元素的代碼

fun simpleFlow8() = flow<Int> {

    for (i in 1..3) {

        delay(100)

        emit(i)

        println("Emitting $i ${Thread.currentThread().name}")

    }

}



@Test

fun `test flow back pressure`() = runBlocking<Unit> {

    val time = measureTimeMillis {

        simpleFlow8()

               .buffer(50)

               .collect { value ->

            delay(300)   

            println("Collected $value ${Thread.currentThread().name}")

        }

    } 

    println("Collected in $time ms") //1028ms

}

不過,如果我們只是單純地添加緩存挎塌,而不是從根本上解決問題就始終會造成數(shù)據(jù)積壓徘六。問題產(chǎn)生的根本原因是生產(chǎn)和消費速率的不匹配,除直接優(yōu)化消費者的性能以外榴都,我們也可以采取一些取舍的手段待锈。第一種是 conflate,conflate()的策略是如果緩存池滿了,新數(shù)據(jù)會覆蓋老數(shù)據(jù)

fun simpleFlow8() = flow<Int> {

    for (i in 1..3) {

        delay(100)

        emit(i)

        println("Emitting $i ${Thread.currentThread().name}")

    }

}



@Test

fun `test flow back pressure`() = runBlocking<Unit> {

    val time = measureTimeMillis {

        simpleFlow8() 

                .conflate()

                .collect { value ->

            println("Collected start $value ${Thread.currentThread().name}")    

            delay(300)   

            println("Collected end $value ${Thread.currentThread().name}")

        }

    }



    println("Collected in $time ms") //770ms

}

第二種是 collectLatest缭贡。顧名思義炉擅,只處理最新的數(shù)據(jù),這看上去似乎與 conflate 沒有區(qū)別阳惹,其實區(qū)別大了:它并不會直接用新數(shù)據(jù)覆蓋老數(shù)據(jù),而是每一個都會被處理眶俩,只不過如果前一個還沒被處理完后一個就來了的話莹汤,處理前一個數(shù)據(jù)的邏輯就會被取消。

fun simpleFlow8() = flow<Int> {

    for (i in 1..3) {

        delay(100)

        emit(i)

        println("Emitting $i ${Thread.currentThread().name}")

    }

}



@Test

fun `test flow back pressure`() = runBlocking<Unit> {

    val time = measureTimeMillis {

        simpleFlow8()

            .collectLatest { value ->

            println("Collected start $value ${Thread.currentThread().name}")  

            delay(300)  

            println("Collected $value ${Thread.currentThread().name}")

        }

    }

    println("Collected in $time ms")//785ms

}

collectLatest 之外還有 mapLatest颠印、flatMapLatest 等等纲岭,都是這個作用。

10.Flow的操作符

  1. 轉(zhuǎn)換操作符(過渡操作符)
  • 可以使用操作符轉(zhuǎn)換流线罕,就像使用集合與序列一樣
  • 過渡操作符應(yīng)用于上游流止潮,并返回下游流
  • 這些操作符也是冷操作符,并且這類操作符本身并不是掛起函數(shù)
  • 運行速度很快钞楼,返回新的轉(zhuǎn)換流的定義
suspend fun performRequest(request: Int): String {

    delay(1000)

    return "response $request"

}



@Test

fun `test transform flow operator1`() = runBlocking<Unit> {

    (1..3).asFlow()

            .map { request -> performRequest(request) }

            .collect { value -> println(value) }

}





@Test

fun `test transform flow operator2`() = runBlocking<Unit> {

    (1..3).asFlow()

            .transform { request ->

                emit("Making request $request")

                emit(performRequest(request))

            }.collect { value -> println(value) }



}
  1. 限長操作符
fun numbers() = flow<Int> { 

    emit(1)

    emit(2)

    emit(3)

}



@Test

fun `test limit length operator`() = runBlocking<Unit> {

    numbers().take(2).collect { value -> println(value) }

}
  1. 末端操作符

末端操作符是在流上用于啟動流收集的掛起函數(shù)喇闸。collect是最基礎(chǔ)的末端操作符,功能與 RxJava 的 subscribe 類似询件。但還有另外一些更方便使用的末端操作符,大體分為兩類:

  • 集合類型轉(zhuǎn)換操作燃乍,包括 toListtoSet 等宛琅。
  • 聚合操作刻蟹,包括將 Flow 規(guī)約到單值的 reducefold 等操作嘿辟,以及獲得單個元素的操作包括 single舆瘪、singleOrNullfirst 等红伦。

實際上英古,識別是否為末端操作符,還有一個簡單方法色建,由于 Flow 的消費端一定需要運行在協(xié)程當(dāng)中哺呜,因此末端操作符都是掛起函數(shù)。

@Test

fun `test terminal operator`() = runBlocking<Unit> {

    val sum = (1..5).asFlow()

            .map { it * it }

            .reduce { a, b -> a + b }

    println(sum)

}
  1. 組合操作符

就像Kotlin標(biāo)準(zhǔn)庫中的Sequence.zip()擴展函數(shù)一樣,可以使用zip操作符組合兩個流中的相關(guān)值

@Test

fun `test zip`() = runBlocking<Unit> {

    val numbs = (1..3).asFlow()

    val strs = flowOf("One", "Two", "Three")

    numbs.zip(strs) { a, b -> "$a -> $b" }.collect { println(it) }

}

combine 雖然也是合并某残,但是跟 zip 不太一樣国撵。

使用 combine 合并時,每次從 flowA 發(fā)出新的 item 玻墅,會將其與 flowB 的最新的 item 合并介牙。

fun main() = runBlocking {

    val flowA = (1..5).asFlow().onEach { delay(100)  }

    val flowB = flowOf("one", "two", "three","four","five").onEach { delay(200)  }

    flowA.combine(flowB) { a, b -> "$a and $b" }

        .collect { println(it) }

}
1 and one

2 and one

3 and one

3 and two

4 and two

5 and two

5 and three

5 and four

5 and five
  1. 展平操作符

Flow表示異步接收的值序列,所以很容易遇到這樣的情況:每個值都會觸發(fā)對另一個值序列的請求澳厢。然而由于流具有“異步”的性質(zhì)环础,因此需要不同的展平模式,為此剩拢,存在一系列的展平操作符:

  1. flatMapConcat
  2. flatMapMerge
  3. flatMapLatest
fun requestFlow(i: Int) = flow<String> {

    emit("$i: First")

    delay(500)

    emit("$i: Second")

}



@Test

fun `test map`() = runBlocking<Unit> {

    //Flow<Flow<String>>

    val startTime = System.currentTimeMillis()

    (1..3).asFlow()

            .map { requestFlow(it) }

            .collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }

}



@Test

fun `test flatMapConcat`() = runBlocking<Unit> {

    val startTime = System.currentTimeMillis()

    (1..3).asFlow()

            .onEach { delay(100) }

            .flatMapConcat { requestFlow(it) }

            .collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }

}



1: First

1: Second

2: First

2: Second

3: First

3: Second



@Test

fun `test flatMapMerge`() = runBlocking<Unit> {

    val startTime = System.currentTimeMillis()

    (1..3).asFlow()

            .onEach { delay(100) }

            .flatMapMerge { requestFlow(it) }

            .collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }

}



1: First

2: First

3: First

1: Second

2: Second

3: Second



@Test

fun `test flatMapLatest`() = runBlocking<Unit> {

    val startTime = System.currentTimeMillis()

    (1..3).asFlow()

            .onEach { delay(100) }

            .flatMapLatest { requestFlow(it) }

            .collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }

}



1: First

2: First

3: First

3: Second

11.Flow的異常處理

當(dāng)運算符中的發(fā)射器或者代碼拋出異常時线得,通常有一下兩個處理方法:

第一種是 try/catch代碼塊

fun simpleFlow() = flow<Int> {

    for (i in 1..3) {

        println("Emitting $i")

        emit(i)

    }

}



@Test

fun `test flow exception`() = runBlocking<Unit> {

    try {

        simpleFlow().collect { value ->

            println(value)

            check(value <= 1) { "Collected $value" }

        }

    } catch (e: Throwable) {

        println("Caught $e")

    }

}

第二種是通過catch()函數(shù),但是只能捕獲上游異常

@Test

fun `test flow exception2`() = runBlocking<Unit> {

    flow {

        emit(1)

        throw ArithmeticException("Div 0")

    }.catch { e: Throwable -> println("Caught $e") }

        .flowOn(Dispatchers.IO)

        .collect { println(it) }



}

異承旆ィ恢復(fù)

@Test

fun `test flow exception2`() = runBlocking<Unit> {

    flow {

        throw ArithmeticException("Div 0")

        emit(1)

    }.catch { e: Throwable ->

        println("Caught $e")

        emit(10)

    }.collect { println(it) }

}

12.Flow的完成

當(dāng)流收集完成時贯钩,它可能需要執(zhí)行一個動作

  1. 命令式finally代碼塊
  2. onCompletion聲明式處理,onCompletion 用起來比較類似于 try ... catch ... finally 中的 finally办素,無論前面是否存在異常角雷,它都會被調(diào)用,參數(shù) t 則是前面未捕獲的異常性穿。
flow {

  emit(1)

  throw ArithmeticException("Div 0")

}.catch { t: Throwable ->

  println("caught error: $t")

}.onCompletion { t: Throwable? ->

  println("finally.")

}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末勺三,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子需曾,更是在濱河造成了極大的恐慌吗坚,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,183評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件胯舷,死亡現(xiàn)場離奇詭異刻蚯,居然都是意外死亡,警方通過查閱死者的電腦和手機桑嘶,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,850評論 3 399
  • 文/潘曉璐 我一進店門炊汹,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人逃顶,你說我怎么就攤上這事讨便。” “怎么了以政?”我有些...
    開封第一講書人閱讀 168,766評論 0 361
  • 文/不壞的土叔 我叫張陵霸褒,是天一觀的道長。 經(jīng)常有香客問我盈蛮,道長废菱,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,854評論 1 299
  • 正文 為了忘掉前任,我火速辦了婚禮殊轴,結(jié)果婚禮上衰倦,老公的妹妹穿的比我還像新娘。我一直安慰自己旁理,他們只是感情好樊零,可當(dāng)我...
    茶點故事閱讀 68,871評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著孽文,像睡著了一般驻襟。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上芋哭,一...
    開封第一講書人閱讀 52,457評論 1 311
  • 那天沉衣,我揣著相機與錄音,去河邊找鬼楷掉。 笑死厢蒜,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的烹植。 我是一名探鬼主播,決...
    沈念sama閱讀 40,999評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼愕贡,長吁一口氣:“原來是場噩夢啊……” “哼草雕!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起固以,我...
    開封第一講書人閱讀 39,914評論 0 277
  • 序言:老撾萬榮一對情侶失蹤墩虹,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后憨琳,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體诫钓,經(jīng)...
    沈念sama閱讀 46,465評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,543評論 3 342
  • 正文 我和宋清朗相戀三年篙螟,在試婚紗的時候發(fā)現(xiàn)自己被綠了菌湃。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,675評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡遍略,死狀恐怖惧所,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情绪杏,我是刑警寧澤下愈,帶...
    沈念sama閱讀 36,354評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站蕾久,受9級特大地震影響势似,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,029評論 3 335
  • 文/蒙蒙 一履因、第九天 我趴在偏房一處隱蔽的房頂上張望障簿。 院中可真熱鬧,春花似錦搓逾、人聲如沸卷谈。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,514評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽世蔗。三九已至,卻和暖如春朗兵,著一層夾襖步出監(jiān)牢的瞬間污淋,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,616評論 1 274
  • 我被黑心中介騙來泰國打工余掖, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留寸爆,地道東北人。 一個月前我還...
    沈念sama閱讀 49,091評論 3 378
  • 正文 我出身青樓盐欺,卻偏偏與公主長得像赁豆,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子冗美,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,685評論 2 360

推薦閱讀更多精彩內(nèi)容