Kotlin協(xié)程之Flow-異步流

如何表示多個值
  • 掛起函數(shù)可以異步的返回單個值掌实,但是該如何異步返回多個計算好的值呢?
異步返回多個值的方案
  • 集合
  • 序列
  • 掛起函數(shù)
  • Flow
/*suspend*/ fun simpleFlow() = flow {
    for(i in 1..3){
        delay(1000)
        emit(i)
    }
}
Flow與其他方式區(qū)別
  • 名為flow的Flow類型構(gòu)建器函數(shù)
  • flow{...}構(gòu)建塊中的代碼可以掛起
  • 函數(shù)simpleFlow不再標有suspend修飾符
  • 流使用emit函數(shù)發(fā)射值
  • 流使用collect函數(shù)收集值
Flow應用
  • 在Android當中跨跨,文件下載是Flow的一個非常典型的應用


    20211222-103630@2x.png
冷流
  • Flow是一種類似于序列的冷流潮峦,flow構(gòu)建器中的代碼直到流被收集的時候才運行
fun simpleFlow() = flow {
    println("Flow started")
    for(i in 1..3){
        delay(1000)
        emit(i)
    }
}

fun testFlowIsCode() = runBlocking {
    val flow = simpleFlow()
    println("Flow Collect")
    flow.collect { println(it) }
    println("Flow Collect again")
    flow.collect { println(it) }
}

返回結(jié)果

Flow Collect
Flow started
1
2
3
Flow Collect again
Flow started
1
2
3

Process finished with exit code 0

根據(jù)以上返回結(jié)果可以看出代碼執(zhí)行val flow = simpleFlow()的時候沒有執(zhí)行flow{...}構(gòu)建塊中的代碼囱皿,只有調(diào)用collect的時候才執(zhí)行,這就是冷流

流的連續(xù)性
  • 流的每次單獨收集都是按照順序執(zhí)行的忱嘹,除非使用特殊操作符
  • 從上游到下游每個過度操作符都會處理每個發(fā)射出的值嘱腥,然后再交給末端操作符
fun testFlowContinuation() = runBlocking {
    (1..5).asFlow().filter {
        it%2 == 0
    }.map {
        "string $it"
    }.collect {
        println(it)
    }
}
流構(gòu)建器
  • flowOf構(gòu)建器定義了一個發(fā)射固定值集的流
  • 使用.asFlow()擴展函數(shù),可是將各種集合與序列轉(zhuǎn)換為流
fun testFlowOf() = runBlocking {
    flowOf("one", "two", "three")
        .onEach {
            delay(1000)//每隔1s發(fā)射一個元素 
        }.collect{
            println(it)
        }
}
流上下文
  • 流的收集總是在調(diào)用協(xié)程的上下文中發(fā)生拘悦,流的該屬性稱為上下文保存
  • flow{...}構(gòu)建器中的代碼必須遵循上下文保存屬性齿兔,并且不允許從其他上下文中發(fā)射(emit)
fun simpleFlow() = flow {
    println1("Flow started ${Thread.currentThread().name}")
    for(i in 1..3) {
        delay(1000)
        emit(i)
    }
}

fun testFlowContext() = runBlocking {
    simpleFlow().collect { value ->
        println1("$value ${Thread.currentThread().name}")
    }
}

看以上代碼兩個方法都存在于主線程,那如果flow{...}改變一下線程如下

fun simpleFlow() = flow {
   withContext(Dispatchers.IO){
       println1("Flow started ${Thread.currentThread().name}")
       for(i in 1..3) {
           delay(1000)
           emit(i)
       }
   }
}

執(zhí)行一下础米,就會報錯

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [BlockingCoroutine{Active}@599ad64a, BlockingEventLoop@2cbb850d],
        but emission happened in [DispatchedCoroutine{Active}@42689e0c, Dispatchers.IO].
        Please refer to 'flow' documentation or use 'flowOn' instead

這就證明必須遵循上下文保存屬性分苇,并且不允許從其他上下文中發(fā)射

  • flowOn操作符,該函數(shù)用于更改流發(fā)射的上下文
fun simpleFlow() = flow {
    println1("Flow started ${Thread.currentThread().name}")
    for(i in 1..3) {
        delay(1000)
        emit(i)
    }
}.flowOn(Dispatchers.IO)
啟動流
  • 使用launchIn替換collect我們可以在單獨的協(xié)程中啟動流的收集
//事件源
fun event() = (1..3).asFlow().onEach { delay(100) }.flowOn(Dispatchers.Default)

fun testEventLaunch() = runBlocking {
    event().onEach {event->
        println1("Event: $event  ${Thread.currentThread().name}")
    }.launchIn(CoroutineScope(Dispatchers.IO)).join()
}
流的取消
  • 流采用與協(xié)程同樣的協(xié)作取消屁桑。像往常一樣医寿,流的收集可以是當流在一個可取消的掛起函數(shù)(例如 delay)中掛起的時候取消
fun simpleFlow1() = flow {
    println1("Flow started ${Thread.currentThread().name}")
    for(i in 1..3) {
        delay(1000)
        emit(i)
        println1("Emitting $i")
    }
}.flowOn(Dispatchers.IO)

fun testCancelFlow() = runBlocking {
    withTimeoutOrNull(2500) {
        simpleFlow1().collect { value -> println1(value) }
    }
    println1("Done")
}
流的取消監(jiān)測
  • 為方便起見,流構(gòu)建器對每個發(fā)射值執(zhí)行附加的 ensureActive 監(jiān)測以進行取消蘑斧,這意味著從flow{...}發(fā)出繁忙循環(huán)是可以取消的
  • 出于性能原因靖秩,大多數(shù)其他流操作不會自行執(zhí)行其他取消監(jiān)測,在協(xié)程出于繁忙循環(huán)的情況下竖瘾,必須明確監(jiān)測是否取消
  • 通過cancellable操作符來執(zhí)行此操作
fun testCancelFlowCheck() = runBlocking {
    (1..5).asFlow().cancellable().collect {
        println1(it)
        if(it == 3)cancel()
    }
}
背壓

水流收到與流動方向一致的壓力叫做背壓或者生產(chǎn)者的效率大于消費者的效率

  • buffer(),并發(fā)運行流中發(fā)射元素的代碼,相當于把管道延長沟突,增加緩沖區(qū)
  • conflate(),合并發(fā)射項捕传,不對每個值進行處理
  • collectLates(),取消并重新發(fā)射最后一個值
  • 當必須更改CoroutineDispatcher時惠拭,flowOn操作符使用了相同的緩存機制,但是buffer函數(shù)顯式地請求緩沖而不改變執(zhí)行上下文
fun simpleFlow2() = flow {
    for(i in 1..3) {
        delay(100)
        emit(i)
        println1("Emitting $i")
    }
}

fun testFlowBackPressure() = runBlocking {
    val time = measureTimeMillis {
        simpleFlow2()
//            .flowOn(Dispatchers.Default)
//            .buffer(50)
//            .conflate()
//            .collectLatest {
//            }
            .collect {
            delay(300)
            println1("$it")
        }
    }
    println1(time)
}
過渡流操作符
  • 可以使用操作符轉(zhuǎn)換流庸论,就像使用集合與序列一樣
  • 過渡操作符應用于上游流职辅,并返回下游流
  • 這些操作符也是冷操作符,就像流一樣聂示。這類操作符本身不是掛起函數(shù)
  • 它運行速度很快罐农,返回新的轉(zhuǎn)換流的定義
suspend fun performRequest(request: Int): String {
    delay(1000)
    return "response $request"
}

fun testTransformFlowOperator() = runBlocking {
    (1..5).asFlow().transform { request ->
        emit("Making request $request")
        emit(performRequest(request))
    }.collect {
        println1(it)
    }
}
限長操作符

take

fun number() = flow {
    try {
        emit(1)
        emit(2)
        emit(3)
    } finally {
        println1("Finally in numbers")
    }
}

fun testLimitLengthOperator() = runBlocking {
    number().take(2).collect { println1(it) }
}
末端操作符
  • 末端操作符是在流上用于啟動流收集的掛起函數(shù)。collect是最基礎(chǔ)的末端操作符催什,但是還有另外一些方便使用的末端操作符
  1. 轉(zhuǎn)化為各種集合涵亏,例如toList和toSet
  2. 獲取第一個(first)值和確保流發(fā)射單個(single)值的操作符
  3. 使用reduce與fold將流規(guī)約到單個值
fun testTerminalOperator() = runBlocking {
    val sum = (1..5).asFlow().map { it * it }.reduce { a, b ->
        a+b
    }
    println1(sum)
}
fun testTerminalOperator() = runBlocking {
    val sum = (1..5).asFlow()
    .fold(0, {acc, i -> acc + i })//以0為初始值,求1到5的和
    println1(sum)
}
組合多個流
  • 就像Kotlin標準庫中的Sequence.zip擴展函數(shù)一樣蒲凶,流擁有一個zip操作符用于組合兩個流中的相關(guān)值
fun testZip() = runBlocking {
    val numbers = (1..5).asFlow()
    val strs = flowOf("one", "two", "three")
    numbers.zip(strs) { a, b ->
        "$a -> $b"
    }.collect { println1(it) }
}
展平流
  • 流表示異步接收的值序列气筋,所以很容易遇到這樣的情況:每個值都會觸發(fā)對另一個值序列的請求,然而旋圆,由于流具有異步的性質(zhì)宠默,因此需要不同的展平方式,為此灵巧,存在一系列的流展平操作符
  1. flatmapConcat 連接模式
  2. flatMapMerge 合并模式
  3. flatMapLatest最新展平模式
fun requestFlow(i: Int) = flow {
    emit("$i first")
    delay(500)
    emit("$i second")
}

fun testFlatMapConcat() = runBlocking {
    (1..3).asFlow().onEach { delay(100) }
//        .map { requestFlow(it) } // 轉(zhuǎn)換后會變成 Flow<Flow<String>>因此需要展平處理
        .flatMapConcat { requestFlow(it) }
//        .flatMapConcat { requestFlow(it) }
//        .flatMapConcat { requestFlow(it) }
        .collect { println1(it) }
}

返回結(jié)果對比

flatMapConcat 模式
1 first
1 second
2 first
2 second
3 first
3 second
flatMapMerge 模式
1 first
2 first
3 first
1 second
2 second
3 second
flatMapLatest 模式
1 first
2 first
3 first
3 second
流的異常處理
  • 當運算符中發(fā)射器或代碼拋出異常時搀矫,有幾種處理異常的方法
  1. try/catch塊
  2. catch函數(shù)
fun number() = flow {
    try {
        emit(1)
        emit(2)
        emit(3)
    } finally {
        println1("Finally in numbers")
    }
}.catch { e : Throwable ->
    // 在catch塊中 還可以繼續(xù)發(fā)射元素
    emit(4)
}
流的完成
  • 當流收集完成時(普通情況或異常情況)抹沪,它可能需要執(zhí)行一個動作。
  1. 命令式finally塊
  2. onCompletion聲明式處理
fun simpleFlow4() = (1..3).asFlow()

fun testFlowCompleteInFinally() = runBlocking {
//    try {
//        simpleFlow4().collect { println1(it) }
//    } finally {
//        println1("Done")
//    }
    simpleFlow4().onCompletion {exception-> //onCompletion還可以打印出上游下游異常信息
        println1("Done ${exception ?: ""}")
    }.collect {
        println1(it)
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末瓤球,一起剝皮案震驚了整個濱河市融欧,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌卦羡,老刑警劉巖噪馏,帶你破解...
    沈念sama閱讀 222,104評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異绿饵,居然都是意外死亡欠肾,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,816評論 3 399
  • 文/潘曉璐 我一進店門拟赊,熙熙樓的掌柜王于貴愁眉苦臉地迎上來刺桃,“玉大人,你說我怎么就攤上這事吸祟÷采觯” “怎么了?”我有些...
    開封第一講書人閱讀 168,697評論 0 360
  • 文/不壞的土叔 我叫張陵欢搜,是天一觀的道長。 經(jīng)常有香客問我谴轮,道長炒瘟,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,836評論 1 298
  • 正文 為了忘掉前任第步,我火速辦了婚禮疮装,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘粘都。我一直安慰自己廓推,他們只是感情好,可當我...
    茶點故事閱讀 68,851評論 6 397
  • 文/花漫 我一把揭開白布翩隧。 她就那樣靜靜地躺著樊展,像睡著了一般。 火紅的嫁衣襯著肌膚如雪堆生。 梳的紋絲不亂的頭發(fā)上专缠,一...
    開封第一講書人閱讀 52,441評論 1 310
  • 那天,我揣著相機與錄音淑仆,去河邊找鬼涝婉。 笑死,一個胖子當著我的面吹牛蔗怠,可吹牛的內(nèi)容都是我干的墩弯。 我是一名探鬼主播吩跋,決...
    沈念sama閱讀 40,992評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼渔工!你這毒婦竟也來了锌钮?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,899評論 0 276
  • 序言:老撾萬榮一對情侶失蹤涨缚,失蹤者是張志新(化名)和其女友劉穎轧粟,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體脓魏,經(jīng)...
    沈念sama閱讀 46,457評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡兰吟,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,529評論 3 341
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了茂翔。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片混蔼。...
    茶點故事閱讀 40,664評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖珊燎,靈堂內(nèi)的尸體忽然破棺而出惭嚣,到底是詐尸還是另有隱情,我是刑警寧澤悔政,帶...
    沈念sama閱讀 36,346評論 5 350
  • 正文 年R本政府宣布晚吞,位于F島的核電站,受9級特大地震影響谋国,放射性物質(zhì)發(fā)生泄漏槽地。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,025評論 3 334
  • 文/蒙蒙 一芦瘾、第九天 我趴在偏房一處隱蔽的房頂上張望捌蚊。 院中可真熱鬧,春花似錦近弟、人聲如沸缅糟。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,511評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽窗宦。三九已至,卻和暖如春二鳄,著一層夾襖步出監(jiān)牢的瞬間迫摔,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,611評論 1 272
  • 我被黑心中介騙來泰國打工泥从, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留句占,地道東北人。 一個月前我還...
    沈念sama閱讀 49,081評論 3 377
  • 正文 我出身青樓躯嫉,卻偏偏與公主長得像纱烘,于是被迫代替她去往敵國和親杨拐。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,675評論 2 359

推薦閱讀更多精彩內(nèi)容