Kotlin Coroutines Flow 系列(五) 其他的操作符

attractive-beautiful-fashion-female-245388.jpg

八. Flow 其他的操作符

8.1 Transform operators

transform

在使用 transform 操作符時汛闸,可以任意多次調(diào)用 emit 赶诊,這是 transform 跟 map 最大的區(qū)別:

fun main() = runBlocking {

    (1..5).asFlow()
        .transform {
            emit(it * 2)
            delay(100)
            emit(it * 4)
        }
        .collect { println(it) }
}

transform 也可以使用 emit 發(fā)射任意值:

fun main() = runBlocking {

    (1..5).asFlow()
        .transform {
            emit(it * 2)
            delay(100)
            emit("emit $it")
        }
        .collect { println(it) }
}

8.2 Size-limiting operators

take

take 操作符只取前幾個 emit 發(fā)射的值。

fun main() = runBlocking {

    (1..5).asFlow()
        .take(2)
        .collect { println(it) }
}

8.3 Terminal flow operators

Kotlin Coroutines Flow 系列(一) Flow 基本使用 一文最后耗拓,我整理了 Flow 相關(guān)的 Terminal 操作符诡渴。本文介紹 reduce 和 fold 兩個操作符。

reduce

類似于 Kotlin 集合中的 reduce 函數(shù)和屎,能夠?qū)线M(jìn)行計算操作。

例如春瞬,對平方數(shù)列求和:

fun main() = runBlocking {

    val sum = (1..5).asFlow()
        .map { it * it }
        .reduce { a, b -> a + b }

    println(sum)
}

例如柴信,計算階乘:

fun main() = runBlocking {

    val sum = (1..5).asFlow().reduce { a, b -> a * b }

    println(sum)
}

fold

也類似于 Kotlin 集合中的 fold 函數(shù),fold 也需要設(shè)置初始值快鱼。

fun main() = runBlocking {

    val sum = (1..5).asFlow()
        .map { it * it }
        .fold(0) { a, b -> a + b }

    println(sum)
}

在上述代碼中颠印,初始值為0就類似于使用 reduce 函數(shù)實現(xiàn)對平方數(shù)列求和纲岭。

而對于計算階乘:

fun main() = runBlocking {

    val sum = (1..5).asFlow().fold(1) { a, b -> a * b }

    println(sum)
}

初始值為1就類似于使用 reduce 函數(shù)實現(xiàn)計算階乘抹竹。

8.4 Composing flows operators

zip

zip 是可以將2個 flow 進(jìn)行合并的操作符。

fun main() = runBlocking {

    val flowA = (1..5).asFlow()
    val flowB = flowOf("one", "two", "three","four","five")
    flowA.zip(flowB) { a, b -> "$a and $b" }
        .collect { println(it) }
}

執(zhí)行結(jié)果:

1 and one
2 and two
3 and three
4 and four
5 and five

zip 操作符會把 flowA 中的一個 item 和 flowB 中對應(yīng)的一個 item 進(jìn)行合并止潮。即使 flowB 中的每一個 item 都使用了 delay() 函數(shù)窃判,在合并過程中也會等待 delay() 執(zhí)行完后再進(jìn)行合并。

fun main() = runBlocking {

    val flowA = (1..5).asFlow()
    val flowB = flowOf("one", "two", "three", "four", "five").onEach { delay(100) }

    val time = measureTimeMillis {
        flowA.zip(flowB) { a, b -> "$a and $b" }
            .collect { println(it) }
    }

    println("Cost $time ms")
}

執(zhí)行結(jié)果:

1 and one
2 and two
3 and three
4 and four
5 and five
Cost 561 ms

如果 flowA 中 item 個數(shù)大于 flowB 中 item 個數(shù):

fun main() = runBlocking {

    val flowA = (1..6).asFlow()
    val flowB = flowOf("one", "two", "three","four","five")
    flowA.zip(flowB) { a, b -> "$a and $b" }
        .collect { println(it) }
}

執(zhí)行合并后新的 flow 的 item 個數(shù) = 較小的 flow 的 item 個數(shù)喇闸。

執(zhí)行結(jié)果:

1 and one
2 and two
3 and three
4 and four
5 and five

combine

combine 雖然也是合并,但是跟 zip 不太一樣。

使用 combine 合并時桑阶,每次從 flowA 發(fā)出新的 item 霜运,會將其與 flowB 的最新的 item 合并。

fun main() = runBlocking {

    val flowA = (1..5).asFlow().onEach { delay(100)  }
    val flowB = flowOf("one", "two", "three","four","five").onEach { delay(200)  }
    flowA.combine(flowB) { a, b -> "$a and $b" }
        .collect { println(it) }
}

執(zhí)行結(jié)果:

1 and one
2 and one
3 and one
3 and two
4 and two
5 and two
5 and three
5 and four
5 and five

flattenMerge

其實刻蟹,flattenMerge 不會組合多個 flow 逗旁,而是將它們作為單個流執(zhí)行。

fun main() = runBlocking {

    val flowA = (1..5).asFlow()
    val flowB = flowOf("one", "two", "three","four","five")

    flowOf(flowA,flowB)
        .flattenConcat()
        .collect{ println(it) }
}

執(zhí)行結(jié)果:

1
2
3
4
5
one
two
three
four
five

為了能更清楚地看到 flowA舆瘪、flowB 作為單個流的執(zhí)行片效,對他們稍作改動。

fun main() = runBlocking {

    val flowA = (1..5).asFlow().onEach { delay(100) }
    val flowB = flowOf("one", "two", "three","four","five").onEach { delay(200) }

    flowOf(flowA,flowB)
        .flattenMerge(2)
        .collect{ println(it) }
}

執(zhí)行結(jié)果:

1
one
2
3
two
4
5
three
four
five

8.5 Flattening flows operators

flatMapConcat英古、flatMapMerge 類似于 RxJava 的 concatMap淀衣、flatMap 操作符。

flatMapConcat

flatMapConcat 由 map召调、flattenConcat 操作符實現(xiàn)膨桥。

@FlowPreview
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
    map(transform).flattenConcat()

在調(diào)用 flatMapConcat 后蛮浑,collect 函數(shù)在收集新值之前會等待 flatMapConcat 內(nèi)部的 flow 完成。

fun currTime() = System.currentTimeMillis()

var start: Long = 0

fun main() = runBlocking {

    (1..5).asFlow()
        .onStart { start = currTime() }
        .onEach { delay(100) }
        .flatMapConcat {
            flow {
                emit("$it: First")
                delay(500)
                emit("$it: Second")
            }
        }
        .collect {
            println("$it at ${System.currentTimeMillis() - start} ms from start")
        }
}

執(zhí)行結(jié)果:

1: First at 114 ms from start
1: Second at 619 ms from start
2: First at 719 ms from start
2: Second at 1224 ms from start
3: First at 1330 ms from start
3: Second at 1830 ms from start
4: First at 1932 ms from start
4: Second at 2433 ms from start
5: First at 2538 ms from start
5: Second at 3041 ms from start

flatMapMerge

flatMapMerge 由 map只嚣、flattenMerge 操作符實現(xiàn)陵吸。

@FlowPreview
public fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R> = map(transform).flattenMerge(concurrency)

flatMapMerge 是順序調(diào)用內(nèi)部代碼塊,并且并行地執(zhí)行 collect 函數(shù)介牙。

fun currTime() = System.currentTimeMillis()

var start: Long = 0

fun main() = runBlocking {

    (1..5).asFlow()
        .onStart { start = currTime() }
        .onEach { delay(100) }
        .flatMapMerge {
            flow {
                emit("$it: First")
                delay(500)
                emit("$it: Second")
            }
        }
        .collect {
            println("$it at ${System.currentTimeMillis() - start} ms from start")
        }
}

執(zhí)行結(jié)果:

1: First at 116 ms from start
2: First at 216 ms from start
3: First at 319 ms from start
4: First at 422 ms from start
5: First at 525 ms from start
1: Second at 618 ms from start
2: Second at 719 ms from start
3: Second at 822 ms from start
4: Second at 924 ms from start
5: Second at 1030 ms from start

flatMapMerge 操作符有一個參數(shù) concurrency 壮虫,它默認(rèn)使用DEFAULT_CONCURRENCY,如果想更直觀地了解 flatMapMerge 的并行环础,可以對這個參數(shù)進(jìn)行修改囚似。例如改成2,就會發(fā)現(xiàn)不一樣的執(zhí)行結(jié)果线得。

flatMapLatest

當(dāng)發(fā)射了新值之后饶唤,上個 flow 就會被取消。

fun currTime() = System.currentTimeMillis()

var start: Long = 0

fun main() = runBlocking {

    (1..5).asFlow()
        .onStart { start = currTime() }
        .onEach { delay(100) }
        .flatMapLatest {
            flow {
                emit("$it: First")
                delay(500)
                emit("$it: Second")
            }
        }
        .collect {
            println("$it at ${System.currentTimeMillis() - start} ms from start")
        }
}

執(zhí)行結(jié)果:

1: First at 114 ms from start
2: First at 220 ms from start
3: First at 321 ms from start
4: First at 422 ms from start
5: First at 524 ms from start
5: Second at 1024 ms from start

九. Flow VS Reactive Streams

天生的多平臺支持

由于 Kotlin 語言自身對多平臺的支持贯钩,使得 Flow 也可以在多平臺上使用募狂。

互操作性

Flow 仍然屬于響應(yīng)式范疇。開發(fā)者通過 kotlinx-coroutines-reactive 模塊中 Flow.asPublisher() 和 Publisher.asFlow() 角雷,可以方便地將 Flow 跟 Reactive Streams 進(jìn)行互操作祸穷。

該系列的相關(guān)文章:

Kotlin Coroutines Flow 系列(一) Flow 基本使用
Kotlin Coroutines Flow 系列(二) Flow VS RxJava2
Kotlin Coroutines Flow 系列(三) 異常處理
Kotlin Coroutines Flow 系列(四) 線程操作

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市勺三,隨后出現(xiàn)的幾起案子雷滚,更是在濱河造成了極大的恐慌,老刑警劉巖吗坚,帶你破解...
    沈念sama閱讀 206,723評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件祈远,死亡現(xiàn)場離奇詭異,居然都是意外死亡商源,警方通過查閱死者的電腦和手機(jī)车份,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,485評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來牡彻,“玉大人扫沼,你說我怎么就攤上這事√直悖” “怎么了充甚?”我有些...
    開封第一講書人閱讀 152,998評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長霸褒。 經(jīng)常有香客問我伴找,道長,這世上最難降的妖魔是什么废菱? 我笑而不...
    開封第一講書人閱讀 55,323評論 1 279
  • 正文 為了忘掉前任技矮,我火速辦了婚禮抖誉,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘衰倦。我一直安慰自己袒炉,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,355評論 5 374
  • 文/花漫 我一把揭開白布樊零。 她就那樣靜靜地躺著我磁,像睡著了一般。 火紅的嫁衣襯著肌膚如雪驻襟。 梳的紋絲不亂的頭發(fā)上夺艰,一...
    開封第一講書人閱讀 49,079評論 1 285
  • 那天,我揣著相機(jī)與錄音沉衣,去河邊找鬼郁副。 笑死,一個胖子當(dāng)著我的面吹牛豌习,可吹牛的內(nèi)容都是我干的存谎。 我是一名探鬼主播,決...
    沈念sama閱讀 38,389評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼肥隆,長吁一口氣:“原來是場噩夢啊……” “哼既荚!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起巷屿,我...
    開封第一講書人閱讀 37,019評論 0 259
  • 序言:老撾萬榮一對情侶失蹤固以,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后嘱巾,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,519評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡诫钓,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,971評論 2 325
  • 正文 我和宋清朗相戀三年旬昭,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片菌湃。...
    茶點(diǎn)故事閱讀 38,100評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡问拘,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出惧所,到底是詐尸還是另有隱情骤坐,我是刑警寧澤,帶...
    沈念sama閱讀 33,738評論 4 324
  • 正文 年R本政府宣布下愈,位于F島的核電站纽绍,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏势似。R本人自食惡果不足惜拌夏,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,293評論 3 307
  • 文/蒙蒙 一僧著、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧障簿,春花似錦盹愚、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,289評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至西篓,卻和暖如春端逼,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背污淋。 一陣腳步聲響...
    開封第一講書人閱讀 31,517評論 1 262
  • 我被黑心中介騙來泰國打工顶滩, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人寸爆。 一個月前我還...
    沈念sama閱讀 45,547評論 2 354
  • 正文 我出身青樓礁鲁,卻偏偏與公主長得像,于是被迫代替她去往敵國和親赁豆。 傳聞我的和親對象是個殘疾皇子仅醇,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,834評論 2 345