Kotlin協(xié)程-Flow

前言

Flow是kotlin協(xié)程中的流围橡。RxJava就是流式編程的庫。Flow屬于冷流對應RxJava中的Observable Flowable Single MayBe和Completable等战坤。Kotlin協(xié)程中的熱流實現(xiàn)MutableSharedFlow和MutableStateFlow等,對應RxJava中熱流PublisherSubject和BehaviorSubject残拐。

  • 冷流:較少的訪問和修改
  • 熱流:頻繁地讀取和更新

Flow使用

fun main() {
    runBlocking (Dispatchers.Default){
        // 發(fā)送10個元素途茫,從0到9
        val myFlow = flow {
            repeat(10){
                emit(it)
            }
        }
        launch {
            myFlow.collect{
                println("Coroutine1:$it")
            }
        }
        launch {
            myFlow.collect{
                println("Coroutine2:$it")
            }
        }
    }
}

協(xié)程1和2通過Flow.collect訂閱Flow。

fun main() {
    runBlocking (Dispatchers.Default){
        // 發(fā)送10個元素溪食,從0到9
        val myFlow = flow {
            repeat(10){
                // 修改原來的CoroutineContext,會異常
                withContext(Dispatchers.IO){
                    emit(it)
                }
            }
        }
        launch {
            myFlow.collect{
                println("Coroutine1:$it")
            }
        }
    }
}

Flow限制囊卜,不能修改原來的CoroutineContext。可以使用ChannelFlow就能正常使用栅组。

fun main() {
    runBlocking (Dispatchers.Default){
        // 發(fā)送10個元素雀瓢,從0到9
        val myFlow = channelFlow {
            repeat(10){
                // 可以修改原來的CoroutineContext
                withContext(Dispatchers.IO){
                    channel.send(it)
                }
            }
        }
        launch {
            myFlow.collect{
                println("Coroutine1:$it")
            }
        }
    }
}

fun main() {
    runBlocking(Dispatchers.Default) {
        // 發(fā)送10個元素,從0到9
        val myFlow = flow {
            repeat(10) {
                try {
                    emit(it)
                } catch (e: Throwable) {
                    emit(22)
                }
            }
        }
        launch {
            myFlow.collect {
                if (it == 2) {
                    // 這里出現(xiàn)異常后玉掸,collect訂閱就結束了(只打印2次刃麸,第三次就異常了)
                    error("Error")
                }
                println("Coroutine1:$it")
            }
        }
    }
}

Flow中collect異常,那么訂閱就結束了司浪。

Flow工作原理

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

public final override suspend fun collect(collector: FlowCollector<T>) {
    val safeCollector = SafeCollector(collector, coroutineContext)
    try {
        collectSafely(safeCollector)
    } finally {
        safeCollector.releaseIntercepted()
    }
}

創(chuàng)建SafeFlow,繼承于AbstractFlow,訂閱調用的是collect泊业。檢查當前CoroutineContext和調用的collect方法傳入的是否一致,不一致就拋出異常啊易。 Flow被SafeCollector代理去檢查異常吁伺。 轉換前的流稱上游Upstream,處理后再發(fā)送到下游Downstream

flatMap操作符

類似于RxJava中的concatMap操作符

fun main() {
    runBlocking(Dispatchers.Default) {
        val myFlow = flow {
            repeat(3) {
                emit(it)
            }
        }
        launch {
            // 將原來的流元素構建成一個新的流(按照原來的流元素輸出)
            myFlow.flatMapConcat { upstreamValue ->
                flow {
                    delay(1000L - upstreamValue * 100)
                    repeat(2) {
                        emit(upstreamValue * 10 + it)
                    }
                }
            }.collect {
                println("collect $it")
            }
        }
    }
}
輸出:
collect 0
collect 1
collect 10
collect 11
collect 20
collect 21

將原來發(fā)送3個元素,通過flatMapConcat()發(fā)送兩個元素认罩。越是先發(fā)送的元素延遲時間越長,然后按順序輸出6個元素续捂。

flatMapMerge

類似于RxJava中的flatMap

fun main() {
    runBlocking(Dispatchers.Default) {
        val myFlow = flow {
            repeat(3) {
                emit(it)
            }
        }
        launch {
            // 將原來的流元素構建成一個新的流(默認并發(fā)16個)誰先執(zhí)行完就發(fā)送誰
            myFlow.flatMapMerge { upstreamValue ->
                flow {
                    delay(1000L - upstreamValue * 100)
                    repeat(2) {
                        emit(upstreamValue * 10 + it)
                    }
                }
            }.collect {
                println("collect $it")
            }
        }
    }
}
輸出:
collect 20
collect 21
collect 10
collect 11
collect 0
collect 1

不會保證原來的順序垦垂,哪個流先處理完就先發(fā)送數(shù)據(jù)。concurrency默認值16牙瓢,并行執(zhí)行的數(shù)量劫拗。當concurrency為1時和flatMapConcat一樣。

fun main() {
    runBlocking(Dispatchers.Default) {
        val myFlow = flow {
            repeat(3) {
                emit(it)
            }
        }
        launch {
            // 將原來的流元素構建成一個新的流(并發(fā)數(shù)是2矾克,達到2個的時候等待然后再執(zhí)行下一個)
            myFlow.flatMapMerge(2) { upstreamValue ->
                flow {
                    delay(1000L - upstreamValue * 100)
                    repeat(2) {
                        emit(upstreamValue * 10 + it)
                    }
                }
            }.collect {
                println("collect $it")
            }
        }
    }
}
輸出:
collect 10
collect 11
collect 0
collect 1
collect 20
collect 21

flatMapLatest

類似于RxJava中的switchMap

fun main() {
    runBlocking(Dispatchers.Default) {
        val myFlow = flow {
            repeat(3) {
                emit(it)
            }
        }
        launch {
            // 前面沒執(zhí)行完的Flow會被取消页慷,然后被后續(xù)的Flow替換
            myFlow.flatMapLatest  { upstreamValue ->
                flow {
                    delay(1000L - upstreamValue * 100)
                    repeat(2) {
                        emit(upstreamValue * 10 + it)
                    }
                }
            }.collect {
                println("collect $it")
            }
        }
    }
}
輸出:
collect 20
collect 21

總結

介紹了Flow常用的操作符map flatMap(串式) flatMapMerge(并發(fā)) flatmapLatest(取代舊的)等簡單使用胁附。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市控妻,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌郎哭,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,544評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件菇存,死亡現(xiàn)場離奇詭異夸研,居然都是意外死亡,警方通過查閱死者的電腦和手機依鸥,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,430評論 3 392
  • 文/潘曉璐 我一進店門亥至,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人,你說我怎么就攤上這事抬闯【荆” “怎么了?”我有些...
    開封第一講書人閱讀 162,764評論 0 353
  • 文/不壞的土叔 我叫張陵溶握,是天一觀的道長杯缺。 經常有香客問我,道長睡榆,這世上最難降的妖魔是什么萍肆? 我笑而不...
    開封第一講書人閱讀 58,193評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮胀屿,結果婚禮上塘揣,老公的妹妹穿的比我還像新娘。我一直安慰自己宿崭,他們只是感情好亲铡,可當我...
    茶點故事閱讀 67,216評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著葡兑,像睡著了一般奖蔓。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上讹堤,一...
    開封第一講書人閱讀 51,182評論 1 299
  • 那天吆鹤,我揣著相機與錄音,去河邊找鬼洲守。 笑死梗醇,一個胖子當著我的面吹牛叙谨,可吹牛的內容都是我干的唉俗。 我是一名探鬼主播虫溜,決...
    沈念sama閱讀 40,063評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼吱雏,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了镰惦?” 一聲冷哼從身側響起旺入,我...
    開封第一講書人閱讀 38,917評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎咐鹤,沒想到半個月后祈惶,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體捧请,經...
    沈念sama閱讀 45,329評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡突照,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,543評論 2 332
  • 正文 我和宋清朗相戀三年氧吐,在試婚紗的時候發(fā)現(xiàn)自己被綠了末盔。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,722評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡翠拣,死狀恐怖误墓,靈堂內的尸體忽然破棺而出谜慌,到底是詐尸還是另有隱情欣范,我是刑警寧澤,帶...
    沈念sama閱讀 35,425評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站蛙卤,受9級特大地震影響表窘,放射性物質發(fā)生泄漏甜滨。R本人自食惡果不足惜衣摩,卻給世界環(huán)境...
    茶點故事閱讀 41,019評論 3 326
  • 文/蒙蒙 一既琴、第九天 我趴在偏房一處隱蔽的房頂上張望甫恩。 院中可真熱鬧磺箕,春花似錦松靡、人聲如沸建椰。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,671評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽撞蜂。三九已至,卻和暖如春溉贿,著一層夾襖步出監(jiān)牢的瞬間宇色,已是汗流浹背颁湖。 一陣腳步聲響...
    開封第一講書人閱讀 32,825評論 1 269
  • 我被黑心中介騙來泰國打工抢蚀, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留皿曲,地道東北人屋休。 一個月前我還...
    沈念sama閱讀 47,729評論 2 368
  • 正文 我出身青樓劫樟,卻偏偏與公主長得像叠艳,于是被迫代替她去往敵國和親捧挺。 傳聞我的和親對象是個殘疾皇子闽烙,可洞房花燭夜當晚...
    茶點故事閱讀 44,614評論 2 353

推薦閱讀更多精彩內容