Kotlin 異步 | Flow 應(yīng)用場景及原理

什么是“異步數(shù)據(jù)流”?它在什么業(yè)務(wù)場景下有用武之地匀奏?它背后的原理是什么瓶殃?讀一讀 Flow 的源碼,嘗試回答這些問題位谋。

同步 & 異步 & 連續(xù)異步

同步和異步是用來形容“調(diào)用”的:

  • 同步調(diào)用:當(dāng)調(diào)用發(fā)起者觸發(fā)了同步調(diào)用后山析,它會等待調(diào)用執(zhí)行完畢并返回結(jié)果后才繼續(xù)執(zhí)行后續(xù)代碼。顯然只有當(dāng)調(diào)用者和被調(diào)用者的代碼執(zhí)行在同一個線程中才會發(fā)生這樣的串行執(zhí)行效果倔幼。
  • 異步調(diào)用:當(dāng)調(diào)用發(fā)起者觸發(fā)了異步調(diào)用后盖腿,它并不會等待異步調(diào)用中的代碼執(zhí)行完畢,因為異步調(diào)用會立馬返回损同,但并不包含執(zhí)行結(jié)果翩腐,執(zhí)行結(jié)果會用異步的方式另行通知調(diào)用者。當(dāng)調(diào)用者和被調(diào)用者的代碼執(zhí)行在不同線程時就會發(fā)生這種并行執(zhí)行效果膏燃。

異步調(diào)用在 App 開發(fā)中隨處可見茂卦,通常把耗時操作放到另一個線程執(zhí)行,比如寫文件:

suspend fun writeFile(content: String) { 
    // 寫文件 
}

// 啟動協(xié)程寫文件
val content = "xxx"
coroutineScope.launch { wirteFile(content) } 

kotlin 中的suspend方法用于表達一個異步過程组哩,“多個連續(xù)產(chǎn)生的異步過程”如何表達等龙?

for 循環(huán)是首先想到的方案:

val contents = listOf<String>(...) // 將要寫入文件的多個字串
contents.forEach { string ->
    coroutineScope.launch { writeFile(string) }
}

用 for 循環(huán)的前提條件是得先拿到所有需要進行異步操作的數(shù)據(jù)。但“多個連續(xù)產(chǎn)生的數(shù)據(jù)”這個場景下伶贰,數(shù)據(jù)是一點一點生成的蛛砰,沒法一下子全部拿到。比如“倒計時 1 分鐘黍衙,每 2 秒做一次耗時運算泥畅,計時結(jié)束后將所有運算結(jié)果累加并在主線程打印”。這個時候就要用“異步數(shù)據(jù)流”重新認識問題琅翻。

異步數(shù)據(jù)流用“生產(chǎn)者/消費”模型來解釋這個場景:倒計時器是這個場景中的生產(chǎn)者位仁,它每隔兩秒產(chǎn)生一個新數(shù)據(jù)。累加器是這個場景中的消費者方椎,他將所有異步數(shù)據(jù)累加聂抢。生產(chǎn)者和消費者之間就好像有一條管道,生產(chǎn)者從管道的一頭插入數(shù)據(jù)棠众,消費者從另一頭取數(shù)據(jù)琳疏。因為管道的存在,數(shù)據(jù)是有序的,遵循先進先出的原則空盼。

傳統(tǒng)方案

在給出 Flow 的解決方案之前疮薇,先看下傳統(tǒng)解決方案。

首先得實現(xiàn)一個定時器我注,它可以在異步線程中以一定時間間隔執(zhí)行異步操作。用線程池就再合適不過了:

// 倒計時器
class Countdown<T>(
    private var duration: Long, // 倒計時長
    private var interval: Long, // 倒計時間隔
    private val action: (Long) -> T // 倒計時后臺任務(wù)
) {
    // 任務(wù)結(jié)果累加值
    var acc: Any? = null 
    // 倒計時剩余時間
    private var remainTime = duration 
    // 任務(wù)開始回調(diào)
    var onStart: (() -> Unit)? = null 
    // 任務(wù)結(jié)束回調(diào)
    var onEnd: ((T?) -> Unit)? = null 
    // 任務(wù)結(jié)果累加器
    var accumulator: ((T, T) -> T)? = null 
    // 倒計時任務(wù)包裝類
    private val countdownRunnable by lazy { CountDownRunnable() }
    // 用于主線程回調(diào)的 Handler
    private val handler by lazy { Handler(Looper.getMainLooper()) } 
    // 線程池
    private val executor by lazy { Executors.newSingleThreadScheduledExecutor() } 

    // 啟動倒計時
    fun start(delay: Long = 0) {
        if (executor.isShutdown) return
        // 向主線程回調(diào)倒計時開始
        handler.post(onStart)
        executor.scheduleAtFixedRate(countdownRunnable, delay, interval, TimeUnit.MILLISECONDS)
    }

    // 將倒計時任務(wù)包裝成 Runnable
    private inner class CountDownRunnable : Runnable {
        override fun run() {
            remainTime -= interval
            // 執(zhí)行后臺任務(wù)并獲取返回值
            val value = action(remainTime)
            // 累加任務(wù)返回值
            acc = if (acc == null) value else accumulator?.invoke(acc as T, value)
            if (remainTime <= 0) {
                // 關(guān)閉倒計時
                executor?.shutdown()
                // 向主線程回調(diào)倒計時結(jié)束
                handler.post { onEnd?.invoke(acc as? T) }
            }
        }
    }
}

抽象出Countdown用于執(zhí)行后臺倒計時任務(wù)迟隅,它使用scheduleAtFixedRate()構(gòu)造線程池但骨,并按一定間隔執(zhí)行倒計時任務(wù)。

對外倒計時任務(wù)被表達成(Long) -> T智袭,即輸入倒計時時間輸出異步任務(wù)結(jié)果的 lambda奔缠。在內(nèi)部它又被包裝成一個 Runnable,以便在 run() 方法中實現(xiàn)倒計時及累加邏輯吼野。

然后就可以像這樣使用:

Countdown(60_000, 2_000) { remianTime -> calculate(remianTime) }.apply {
    onStart = { Log.v("test", "countdown start") }
    onEnd = { ret -> Log.v("test", "countdown end, ret=$ret") }
    accumulator = { acc, value -> acc + value }
}.start()

雖然不得不引入一些復(fù)雜度校哎,比如線程池、Handler瞳步、累加器闷哆。但得益于類的封裝和 Kotlin 語法糖,最終調(diào)用形式還是簡潔達意的单起。

Flow 方案

若用 Flow 就可以省去這些復(fù)雜度:

fun <T> countdown(
    duration: Long, 
    interval: Long, 
    onCountdown: suspend (Long) -> T
): Flow<T> =
    flow { (duration - interval downTo 0 step interval).forEach { emit(it) } }
        .onEach { delay(interval) }
        .onStart { emit(duration) }
        .map { onCountdown(it) }
        .flowOn(Dispatchers.Default)

定義了一個頂層方法countdown()抱怔,它返回一個流實例用于在異步線程中生產(chǎn)倒計時,并將倒計時傳入異步任務(wù)onCountdown()執(zhí)行嘀倒。然后就可以像這樣使用:

val mainScope = MainScope()
mainScope.launch {
    val ret = countdown(60_000, 2_000) { remianTime -> calculate(remianTime) }
        .onStart { Log.v("test", "countdown start") }
        .onCompletion { Log.v("test", "countdown end") }
        .reduce { acc, value -> acc + value }
    Log.v("test", "coutdown acc ret = $ret")
}

下面就從源碼出發(fā)屈留,一點一點分析流方案背后的原理。

Flow 如何生產(chǎn)并消費數(shù)據(jù)测蘑?

Flow 的定義及其簡單灌危,只包含了 2 個接口:

public interface Flow<out T> {
    public suspend fun collect(collector: FlowCollector<T>)
}

Flow 是一個接口,其中定義了一個collect()方法碳胳,表示“流可以被收集”勇蝙,而收集器也是一個接口:

public interface FlowCollector<in T> {
    public suspend fun emit(value: T)
}

流收集器接口中定義了一個emit()方法表示“流收集器可以發(fā)射數(shù)據(jù)”。

若套用“生產(chǎn)者/消費者”模型固逗,可理解為流中數(shù)據(jù)可以被消費浅蚪,流收集器可以生產(chǎn)數(shù)據(jù)

一個最簡單的生產(chǎn)和消費數(shù)據(jù)的場景:

// 啟動協(xié)程
GlobalScope.launch {
    // 構(gòu)建流
    flow { // 定義流如何生產(chǎn)數(shù)據(jù)
        (1 .. 3).forEach {
            // 每隔 1 秒發(fā)射 1 個數(shù)字
            delay(1000)
            emit(it)
        }
    }.collect { // 定義如何消費數(shù)據(jù)
        Log.v("test", "num=$it") // 打印數(shù)字
    }
}

  • 通過flow{ block }構(gòu)建了一個流烫罩,它是一個頂層方法:

    // 構(gòu)建安全流(傳入 block 定義如何生產(chǎn)流數(shù)據(jù))
    public fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T> =
        SafeFlow(block)
    
        // 安全流繼承自抽象流
        private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
        override suspend fun collectSafely(collector: FlowCollector<T>) {
            collector.block()// 收集流數(shù)據(jù)時調(diào)用 block惜傲,即觸發(fā)生產(chǎn)數(shù)據(jù)
        }
    }
    
    // 抽象流
    public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
        // 收集數(shù)據(jù)的具體實現(xiàn)
        public final override suspend fun collect(collector: FlowCollector<T>) {
            // 構(gòu)建 FlowCollector 并傳入 collectSafely()
            val safeCollector = SafeCollector(collector, coroutineContext)
            try {
                collectSafely(safeCollector)
            } finally {
                safeCollector.releaseIntercepted()
            }
        }
        public abstract suspend fun collectSafely(collector: FlowCollector<T>)
    }
    
    

    flow { block }中的 block 定義了如何生產(chǎn)數(shù)據(jù),而 block 是在collect()中被調(diào)用的贝攒。所以流中的數(shù)據(jù)不會自動生產(chǎn)盗誊,直到流被收集的那一刻。

  • 通過collect{ action }收集了這個流,其中的 action 定義了如何消費數(shù)據(jù)哈踱。collect()是 Flow 的擴展方法:

    public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
        collect(object : FlowCollector<T> {
            override suspend fun emit(value: T) = action(value)
        })
    
    

    在收集數(shù)據(jù)時新建了一個流收集器荒适,流收集器可以發(fā)射數(shù)據(jù),發(fā)射的方式就是直接將數(shù)據(jù)傳遞給 action开镣,即數(shù)據(jù)消費者刀诬。

將上述兩點綜合一下:

  1. 流中的數(shù)據(jù)不會自動生產(chǎn),直到流被收集的那一刻邪财。當(dāng)流被收集的瞬間陕壹,數(shù)據(jù)開始生產(chǎn)并被發(fā)射出去,通過流收集器將其傳遞給消費者树埠。

  2. 流和流收集器是成對出現(xiàn)的概念糠馆。流是一組按序產(chǎn)生的數(shù)據(jù),數(shù)據(jù)的產(chǎn)生表現(xiàn)為通過流收集器發(fā)射數(shù)據(jù)怎憋,在這里流收集器像是流數(shù)據(jù)容器(雖然它不持有任何一條數(shù)據(jù))又碌,它定義了如何將數(shù)據(jù)傳遞給消費者。

所以上述的實例代碼绊袋,無異于如下的同步調(diào)用:

// 生產(chǎn)者消費者偽代碼
flow {
    emit(data) // 生產(chǎn)
}.collect { 
    action(data) // 消費
}

// 生產(chǎn)者消費者實際的調(diào)用鏈
Flow.collect {
    emit(data) {
        action(data)
    }
}

經(jīng)過一些 lambda 的抽象毕匀,看上去生產(chǎn)者和消費者好像分居兩地,但其實它們是運行在同一個線程中的同步調(diào)用鏈癌别,即:

默認情況下期揪,流中生產(chǎn)和消費數(shù)據(jù)是在同一個線程中進行的。

現(xiàn)在回看一下倒計時流是如何生產(chǎn)并消費數(shù)據(jù)的:

fun <T> countdown(
    duration: Long, 
    interval: Long, 
    onCountdown: suspend (Long) -> T
): Flow<T> =
    flow { (duration - interval downTo 0 step interval).forEach { emit(it) } }
        .onEach { delay(interval) }
        .onStart { emit(duration) }
        .map { onCountdown(it) }
        .flowOn(Dispatchers.Default)

countdown() 方法的第一句就定義了倒計時流中生產(chǎn)數(shù)據(jù)的方式:

flow { (duration - interval downTo 0 step interval).forEach { emit(it) } }

flow {}構(gòu)建了一個流實例规个。在內(nèi)部創(chuàng)建了一個從 duration - interval 到 0 步長為 step 的值序列凤薛,它被遍歷的同時調(diào)用emit()將每個值發(fā)射出去。

創(chuàng)建了流實例后诞仓,鏈式調(diào)用了一系列方法缤苫,但并沒有collect(),是不是說 countdown() 方法只定義了生產(chǎn)數(shù)據(jù)并沒有定義如何消費數(shù)據(jù)墅拭?

collect()是流數(shù)據(jù)的消費者活玲,生產(chǎn)者和消費者之間的管道可以插入“中間消費者”,它們優(yōu)先消費上游數(shù)據(jù)后再轉(zhuǎn)發(fā)給下游谍婉。正是這些中間消費者舒憾,讓流產(chǎn)生了無窮多樣的玩法。

中間消費者

transform()

transform()是一個最常見的中間消費者穗熬,它是一個 Flow 的擴展方法:

public inline fun <T, R> Flow<T>.transform(
    crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = 
    // 構(gòu)建下游流
    flow {
        // 收集上游數(shù)據(jù)(這里的邏輯在下游流被收集的時候調(diào)用)
        collect { value ->
            // 處理上游數(shù)據(jù)
            return@collect transform(value)
        }
}

transform() 做了三件事情:構(gòu)建了一個新流(下游流)镀迂,當(dāng)下游流被收集時,會立馬收集上游的流唤蔗,當(dāng)收集到上游數(shù)據(jù)后將其傳遞給transform這個 lambda探遵。

FlowCollector<R>.(value: T) -> Unit是一個帶接收者的 lambda窟赏,接收者是FlowCollector。調(diào)用這種 labmda 時需要指定接收者箱季,在 transform() 的語境中接收者是this涯穷,所以省略了,如果將其補全藏雏,就是下面這樣:

public inline fun <T, R> Flow<T>.transform(
    crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = 
    // 構(gòu)建下游流
    flow { this ->
        collect { value ->
            return@collect this.transform(value)
        }
}

// 構(gòu)建流的 flow {} 中的 lambda 也是帶接收者的
public fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T> = 
    SafeFlow(block)

FlowCollector作為接收者是有好處的拷况,這樣就可以在 lambda 中方便地訪問到FlowCollector.emit(),即transform()將“下游流如何生產(chǎn)數(shù)據(jù)”這個策略交由外部傳入的 lambda 決定掘殴。(關(guān)于策略模式的詳解可以點擊一句話總結(jié)殊途同歸的設(shè)計模式:工廠模式=蝠嘉?策略模式=?模版方法模式)杯巨,所以可以得出這樣的結(jié)論:

transform() 建立了一種在流上攔截并轉(zhuǎn)發(fā)的機制:新建下游流,它生產(chǎn)數(shù)據(jù)的方式是通過收集上游數(shù)據(jù)努酸,并將數(shù)據(jù)轉(zhuǎn)發(fā)到一個帶有發(fā)射數(shù)據(jù)能力的 lambda 中服爷。transform() 這個中間消費者在攔截上游數(shù)據(jù)后,就可隨心所欲地將其變換后再轉(zhuǎn)發(fā)給下游消費者获诈。

onEach() & map() & 自定義中間消費者

所以 transform() 通常用于定義新的中間消費者仍源,onEach()的定義就借助于它:

public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = transform { value ->
    action(value)
    return@transform emit(value)
}

所有的中間消費者都定義成 Flow 的擴展方法,而且都會返回一個新建的下游流舔涎。這樣做是為了讓不同的中間消費者可以方便地通過鏈式調(diào)用串聯(lián)在一起笼踩。

onEach() 通過 transform() 構(gòu)建了一個下游流,并在轉(zhuǎn)發(fā)每一個上游流數(shù)據(jù)前又做了一件額外的事情亡嫌,用lambda action表示嚎于。

map() 也是通過 transform() 實現(xiàn)的:

public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = 
    transform { value -> return@transform emit(transform(value)) }

map() 通過 transform() 構(gòu)建了一個下游流,并且在拿到上游流數(shù)據(jù)時先將其進行了transform變換挟冠,然后再轉(zhuǎn)發(fā)出去于购。

利用 transform() 的機制,可以很方便地自定義一個中間消費者:

fun <T, R> Flow<T>.filterMap(
    predicate: (T) -> Boolean, 
    transform: suspend (T) -> R
): Flow<R> = 
    transform { value -> if (predicate(value)) emit(transform(value)) }

filterMap() 只對上游數(shù)據(jù)中滿足 predicate 條件的數(shù)據(jù)進行變換并發(fā)射知染。

onStart()

onStart() 也是中間消費者肋僧,但它沒有借助于 transform(),而是通過unsafeFlow()構(gòu)建了一個下游流:

public fun <T> Flow<T>.onStart(
    action: suspend FlowCollector<T>.() -> Unit
): Flow<T> = unsafeFlow { // 構(gòu)建下游流
    val safeCollector = SafeCollector<T>(this, currentCoroutineContext())
    try {
        safeCollector.action() // 在收集上游流數(shù)據(jù)之前執(zhí)行動作
    } finally {
        safeCollector.releaseIntercepted()
    }
    collect(this) // 收集上游流數(shù)據(jù)
}

internal inline fun <T> unsafeFlow(crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
    // 構(gòu)建新流
    return object : Flow<T> {
        override suspend fun collect(collector: FlowCollector<T>) {
            collector.block()
        }
    }
}

unsafeFlow() 直接實例化了Flow接口控淡,并定義了該流被收集時執(zhí)行的操作嫌吠,即調(diào)用block。所以 unsafeFlow() 和 transform 很類似掺炭,都新建下游流以收集了上游數(shù)據(jù)辫诅,只不過在收集動作(所有數(shù)據(jù)發(fā)射之前)之前做了一件額外的事。

onCompletion()

public fun <T> Flow<T>.onCompletion(
    action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T> = unsafeFlow { // 構(gòu)建下游流
    try {
        collect(this) // 1.先收集上游流數(shù)據(jù)
    } catch (e: Throwable) {
        ThrowingCollector(e).invokeSafely(action, e)
        throw e
    }
    val sc = SafeCollector(this, currentCoroutineContext())
    try {
        sc.action(null) // 2.再執(zhí)行動作
    } finally {
        sc.releaseIntercepted()
    }
}

onCompletion() 的實現(xiàn)和 onStart() 很類似涧狮,只不過是在收集數(shù)據(jù)之后再執(zhí)行動作泥栖。

因為 onStart() 和 onCompletion() 都用下游流套上游流的方式實現(xiàn)簇宽,只是收集數(shù)據(jù)和執(zhí)行動作的順序不同,就會產(chǎn)生下面這樣有趣的效果:

GlobalScope.launch {
    flow { 
        (1 .. 3).forEach {
            delay(1000)
            emit(it)
        }
    }.onStart { Log.v("test","start1") }
        .onStart { Log.v("test","start2") }
        .onCompletion { Log.v("test","complete1") }
        .onCompletion { Log.v("test","complete2") }
        .collect { Log.v("test", "$it") }
}

上述代碼的輸出結(jié)果如下:

start2
start1
1
2
3
complete1
complete2

鏈式調(diào)用中出現(xiàn)多個onStart { action }時吧享,后出現(xiàn)的 action 會先執(zhí)行魏割,因為后續(xù) onStart 構(gòu)建的下游流包在了上游 onStart 的外面,并且 action 會在收集上游流數(shù)據(jù)之前執(zhí)行钢颂。

而這個結(jié)論卻不能沿用到onCompletion { action }钞它,雖然 onCompletion 構(gòu)建的下游流也包裹在上游 onCompletion 外面,但是 action 總是在收集上游流之后執(zhí)行殊鞭。

終端消費者

上面所有的擴展方法之所以稱為“中間消費者”是因為它們都構(gòu)建了一個新的下游流遭垛,并且只有當(dāng)下游流被收集的時候,它們才會去收集上游流操灿。也就是說锯仪,如果沒有收集下游流,流中的數(shù)據(jù)就永遠不會被發(fā)射趾盐,這個特性稱為冷流庶喜。

看一個冷流的例子:

// 執(zhí)行式的
suspend fun get(): List<String> = 
    listof("a", "b", "c").onEach { 
        delay(1000)
        print(it)
    }

// 聲明式的
fun get(): Flow<String> = 
    flowOf("a", "b", "c").onEach { 
        delay(1000)
        print(it)
    }

分別調(diào)用這兩個 get() 方法時,第一個 get 會立馬打印出結(jié)果救鲤,而第二個什么也不會打印久窟。因為第二個 get() 只是聲明了如何構(gòu)建一個冷流,它并沒有被收集本缠,所以也不會發(fā)射數(shù)據(jù)斥扛。

Flow 是冷流,冷流不會發(fā)射數(shù)據(jù)丹锹,直到它被收集稀颁,所以冷流是“聲明式的”。

所有能觸發(fā)收集數(shù)據(jù)動作的消費者稱為終端消費者楣黍,它就像點燃鞭炮的星火峻村,使得被若干個中間消費者套娃了的流從外向內(nèi)(從下游到上游)一個個的被收集,最終傳導(dǎo)到原始流锡凝,觸發(fā)數(shù)據(jù)的發(fā)射粘昨。

倒計時 demo 的reduce()就是一個終端消費者:

val mainScope = MainScope()
mainScope.launch {
    val ret = countdown(60_000, 2_000) { io(it) }
        .onStart { Log.v("test", "countdown start") }
        .onCompletion { Log.v("test", "countdown end") }
        .reduce { acc, value -> acc + value } // 終端消費者:計算所有異步結(jié)果的和
    // 因為 reduce() 是一個 suspend 方法,所以會掛起協(xié)程窜锯,直到倒計時完成才打印所有異步結(jié)果的和
    Log.v("test", "coutdown acc ret = $ret")
}

reduce() 的源碼如下:

public suspend fun <S, T : S> Flow<T>.reduce(
    operation: suspend (accumulator: S, value: T) -> S // 累加算法
): S {
    var accumulator: Any? = NULL
    // 收集數(shù)據(jù)
    collect { value ->
        // 將收集的數(shù)據(jù)累加
        accumulator = if (accumulator !== NULL) {
            operation(accumulator as S, value)
        } else {
            value
        }
    }

    if (accumulator === NULL) throw NoSuchElementException("Empty flow can't be reduced")
    // 返回累加和
    return accumulator as S
}

reduce() 并沒有構(gòu)建新流张肾,而是直接收集了數(shù)據(jù),然后將所有數(shù)據(jù)進行累加并返回锚扎。

所有的終端消費者都是 suspend 方法吞瞪,這意味著收集數(shù)據(jù)必須在協(xié)程中進行。demo 中使用 MainScope 啟動協(xié)程驾孔,所以異步結(jié)果的和會在主線程中被打印芍秆。

線程切換

demo 中還剩下最后一個flowOn()惯疙,它是中間消費者,略復(fù)雜妖啥,限于篇幅原因霉颠,下次再分析。但這不影響先了解它的效果:它會切換所有上游代碼執(zhí)行的線程荆虱,但不改變下游代碼執(zhí)行的線程蒿偎。

countdown() 方法通過flowOn(Dispatchers.Default),實現(xiàn)了后臺執(zhí)行倒計時任務(wù)怀读。而 reduce() 的調(diào)用發(fā)生在 flowOn() 之后诉位,所以異步任務(wù)結(jié)果累加還是在主線程進行的。

onStart()菜枷、onEach()珊豹、onCompletion()迁筛、map()捺信、reduce()法希,這些消費者對數(shù)據(jù)的處理都被包裝在用suspend修飾的 lambda 中。這意味著利用協(xié)程可以輕松地切換每個消費者運行的線程坷衍。也正是suspend的存在,運行在同一線程中的下游消費者不會發(fā)生背壓条舔,因為下游消費者的掛起方法會天然阻塞上游生產(chǎn)數(shù)據(jù)的速度枫耳。

總結(jié)

  1. 異步數(shù)據(jù)流可以理解為一條時間軸上按序產(chǎn)生的數(shù)據(jù),它可用于表達多個連續(xù)的異步過程孟抗。
  2. 異步數(shù)據(jù)流也可以用“生產(chǎn)者/消費者”模型來理解迁杨,生產(chǎn)者和消費者之間就好像有一條管道,生產(chǎn)者從管道的一頭插入數(shù)據(jù)凄硼,消費者從另一頭取數(shù)據(jù)铅协。因為管道的存在,數(shù)據(jù)是有序的摊沉,遵循先進先出的原則狐史。
  3. Kotlin 中的suspend方法用于表達一個異步過程,而Flow用于表達多連續(xù)個異步過程说墨。Flow是冷流骏全,冷流不會發(fā)射數(shù)據(jù),直到它被收集的那一刻尼斧,所以冷流是“聲明式的”姜贡。
  4. 當(dāng)Flow被收集的瞬間,數(shù)據(jù)開始生產(chǎn)并被發(fā)射出去棺棵,通過流收集器FlowCollector將其傳遞給消費者楼咳。流和流收集器是成對出現(xiàn)的概念熄捍。流是一組按序產(chǎn)生的數(shù)據(jù),數(shù)據(jù)的產(chǎn)生表現(xiàn)為通過流收集器發(fā)射數(shù)據(jù)母怜,在這里流收集器像是流數(shù)據(jù)容器(雖然它不持有任何一條數(shù)據(jù))余耽,它定義了如何將數(shù)據(jù)傳遞給消費者。
  5. 異步數(shù)據(jù)流中糙申,生產(chǎn)者和消費者之間可以插入中間消費者宾添。中間消費者建立了流上的攔截并轉(zhuǎn)發(fā)機制:新建下游流,它生產(chǎn)數(shù)據(jù)的方式是通過收集上游數(shù)據(jù)柜裸,并轉(zhuǎn)發(fā)到一個帶有發(fā)射數(shù)據(jù)能力的 lambda 中缕陕。擁有多個中間消費者的流就像“套娃”一樣,下游流套在上游流外面疙挺。中間消費者通過這種方式攔截了原始數(shù)據(jù)扛邑,就可以對其做任意變換再轉(zhuǎn)發(fā)給下游消費者。因為 Flow 是冷流铐然,所有的中間消費者只是定義了一連串待執(zhí)行的調(diào)用鏈蔬崩。
  6. 所有能觸發(fā)收集數(shù)據(jù)動作的消費者稱為終端消費者,它就像點燃鞭炮的星火搀暑,使得被若干個中間消費者套娃的流從外向內(nèi)(從下游到上游)一個個的被收集沥阳,最終傳導(dǎo)到原始流,觸發(fā)數(shù)據(jù)的發(fā)射自点。
  7. 默認情況下桐罕,流中生產(chǎn)和消費數(shù)據(jù)是在同一個線程中進行的。但可以通過flowOn()改變上游流執(zhí)行的線程桂敛,這并不影響下游流所執(zhí)行的線程功炮。
  8. Flow中生產(chǎn)和消費數(shù)據(jù)的操作都被包裝在用 suspend 修飾的 lambda 中,用協(xié)程就可以輕松的實現(xiàn)異步生產(chǎn)术唬,異步消費薪伏。

本文轉(zhuǎn)自 https://juejin.cn/post/6989032238079803429,如有侵權(quán)粗仓,請聯(lián)系刪除嫁怀。
作者:唐子玄

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市借浊,隨后出現(xiàn)的幾起案子眶掌,更是在濱河造成了極大的恐慌,老刑警劉巖巴碗,帶你破解...
    沈念sama閱讀 219,188評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件朴爬,死亡現(xiàn)場離奇詭異,居然都是意外死亡橡淆,警方通過查閱死者的電腦和手機召噩,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,464評論 3 395
  • 文/潘曉璐 我一進店門母赵,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人具滴,你說我怎么就攤上這事凹嘲。” “怎么了构韵?”我有些...
    開封第一講書人閱讀 165,562評論 0 356
  • 文/不壞的土叔 我叫張陵周蹭,是天一觀的道長。 經(jīng)常有香客問我疲恢,道長凶朗,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,893評論 1 295
  • 正文 為了忘掉前任显拳,我火速辦了婚禮棚愤,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘杂数。我一直安慰自己宛畦,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,917評論 6 392
  • 文/花漫 我一把揭開白布揍移。 她就那樣靜靜地躺著次和,像睡著了一般。 火紅的嫁衣襯著肌膚如雪那伐。 梳的紋絲不亂的頭發(fā)上踏施,一...
    開封第一講書人閱讀 51,708評論 1 305
  • 那天,我揣著相機與錄音喧锦,去河邊找鬼读规。 笑死抓督,一個胖子當(dāng)著我的面吹牛燃少,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播铃在,決...
    沈念sama閱讀 40,430評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼阵具,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了定铜?” 一聲冷哼從身側(cè)響起阳液,我...
    開封第一講書人閱讀 39,342評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎揣炕,沒想到半個月后帘皿,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,801評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡畸陡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,976評論 3 337
  • 正文 我和宋清朗相戀三年鹰溜,在試婚紗的時候發(fā)現(xiàn)自己被綠了虽填。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,115評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡曹动,死狀恐怖斋日,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情墓陈,我是刑警寧澤恶守,帶...
    沈念sama閱讀 35,804評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站贡必,受9級特大地震影響兔港,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜赊级,卻給世界環(huán)境...
    茶點故事閱讀 41,458評論 3 331
  • 文/蒙蒙 一押框、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧理逊,春花似錦橡伞、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,008評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至羡洛,卻和暖如春挂脑,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背欲侮。 一陣腳步聲響...
    開封第一講書人閱讀 33,135評論 1 272
  • 我被黑心中介騙來泰國打工崭闲, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人威蕉。 一個月前我還...
    沈念sama閱讀 48,365評論 3 373
  • 正文 我出身青樓刁俭,卻偏偏與公主長得像,于是被迫代替她去往敵國和親韧涨。 傳聞我的和親對象是個殘疾皇子牍戚,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,055評論 2 355

推薦閱讀更多精彩內(nèi)容