什么是“異步數(shù)據(jù)流”?它在什么業(yè)務(wù)場景下有用武之地匀奏?它背后的原理是什么瓶殃?讀一讀 Flow 的源碼,嘗試回答這些問題位谋。
同步 & 異步 & 連續(xù)異步
同步和異步是用來形容“調(diào)用”的:
- 同步調(diào)用:當(dāng)調(diào)用發(fā)起者觸發(fā)了同步調(diào)用后山析,它會等待調(diào)用執(zhí)行完畢并返回結(jié)果后才繼續(xù)執(zhí)行后續(xù)代碼。顯然只有當(dāng)調(diào)用者和被調(diào)用者的代碼執(zhí)行在同一個線程中才會發(fā)生這樣的串行執(zhí)行效果倔幼。
- 異步調(diào)用:當(dāng)調(diào)用發(fā)起者觸發(fā)了異步調(diào)用后盖腿,它并不會等待異步調(diào)用中的代碼執(zhí)行完畢,因為異步調(diào)用會立馬返回损同,但并不包含執(zhí)行結(jié)果翩腐,執(zhí)行結(jié)果會用異步的方式另行通知調(diào)用者。當(dāng)調(diào)用者和被調(diào)用者的代碼執(zhí)行在不同線程時就會發(fā)生這種并行執(zhí)行效果膏燃。
異步調(diào)用在 App 開發(fā)中隨處可見茂卦,通常把耗時操作放到另一個線程執(zhí)行,比如寫文件:
suspend fun writeFile(content: String) {
// 寫文件
}
// 啟動協(xié)程寫文件
val content = "xxx"
coroutineScope.launch { wirteFile(content) }
kotlin 中的suspend
方法用于表達一個異步過程组哩,“多個連續(xù)產(chǎn)生的異步過程”如何表達等龙?
for 循環(huán)是首先想到的方案:
val contents = listOf<String>(...) // 將要寫入文件的多個字串
contents.forEach { string ->
coroutineScope.launch { writeFile(string) }
}
用 for 循環(huán)的前提條件是得先拿到所有需要進行異步操作的數(shù)據(jù)。但“多個連續(xù)產(chǎn)生的數(shù)據(jù)”這個場景下伶贰,數(shù)據(jù)是一點一點生成的蛛砰,沒法一下子全部拿到。比如“倒計時 1 分鐘黍衙,每 2 秒做一次耗時運算泥畅,計時結(jié)束后將所有運算結(jié)果累加并在主線程打印”。這個時候就要用“異步數(shù)據(jù)流”重新認識問題琅翻。
異步數(shù)據(jù)流用“生產(chǎn)者/消費”模型來解釋這個場景:倒計時器是這個場景中的生產(chǎn)者位仁,它每隔兩秒產(chǎn)生一個新數(shù)據(jù)。累加器是這個場景中的消費者方椎,他將所有異步數(shù)據(jù)累加聂抢。生產(chǎn)者和消費者之間就好像有一條管道,生產(chǎn)者從管道的一頭插入數(shù)據(jù)棠众,消費者從另一頭取數(shù)據(jù)琳疏。因為管道的存在,數(shù)據(jù)是有序的,遵循先進先出的原則空盼。
傳統(tǒng)方案
在給出 Flow 的解決方案之前疮薇,先看下傳統(tǒng)解決方案。
首先得實現(xiàn)一個定時器我注,它可以在異步線程中以一定時間間隔執(zhí)行異步操作。用線程池就再合適不過了:
// 倒計時器
class Countdown<T>(
private var duration: Long, // 倒計時長
private var interval: Long, // 倒計時間隔
private val action: (Long) -> T // 倒計時后臺任務(wù)
) {
// 任務(wù)結(jié)果累加值
var acc: Any? = null
// 倒計時剩余時間
private var remainTime = duration
// 任務(wù)開始回調(diào)
var onStart: (() -> Unit)? = null
// 任務(wù)結(jié)束回調(diào)
var onEnd: ((T?) -> Unit)? = null
// 任務(wù)結(jié)果累加器
var accumulator: ((T, T) -> T)? = null
// 倒計時任務(wù)包裝類
private val countdownRunnable by lazy { CountDownRunnable() }
// 用于主線程回調(diào)的 Handler
private val handler by lazy { Handler(Looper.getMainLooper()) }
// 線程池
private val executor by lazy { Executors.newSingleThreadScheduledExecutor() }
// 啟動倒計時
fun start(delay: Long = 0) {
if (executor.isShutdown) return
// 向主線程回調(diào)倒計時開始
handler.post(onStart)
executor.scheduleAtFixedRate(countdownRunnable, delay, interval, TimeUnit.MILLISECONDS)
}
// 將倒計時任務(wù)包裝成 Runnable
private inner class CountDownRunnable : Runnable {
override fun run() {
remainTime -= interval
// 執(zhí)行后臺任務(wù)并獲取返回值
val value = action(remainTime)
// 累加任務(wù)返回值
acc = if (acc == null) value else accumulator?.invoke(acc as T, value)
if (remainTime <= 0) {
// 關(guān)閉倒計時
executor?.shutdown()
// 向主線程回調(diào)倒計時結(jié)束
handler.post { onEnd?.invoke(acc as? T) }
}
}
}
}
抽象出Countdown
用于執(zhí)行后臺倒計時任務(wù)迟隅,它使用scheduleAtFixedRate()
構(gòu)造線程池但骨,并按一定間隔執(zhí)行倒計時任務(wù)。
對外倒計時任務(wù)被表達成(Long) -> T
智袭,即輸入倒計時時間輸出異步任務(wù)結(jié)果的 lambda奔缠。在內(nèi)部它又被包裝成一個 Runnable,以便在 run() 方法中實現(xiàn)倒計時及累加邏輯吼野。
然后就可以像這樣使用:
Countdown(60_000, 2_000) { remianTime -> calculate(remianTime) }.apply {
onStart = { Log.v("test", "countdown start") }
onEnd = { ret -> Log.v("test", "countdown end, ret=$ret") }
accumulator = { acc, value -> acc + value }
}.start()
雖然不得不引入一些復(fù)雜度校哎,比如線程池、Handler瞳步、累加器闷哆。但得益于類的封裝和 Kotlin 語法糖,最終調(diào)用形式還是簡潔達意的单起。
Flow 方案
若用 Flow 就可以省去這些復(fù)雜度:
fun <T> countdown(
duration: Long,
interval: Long,
onCountdown: suspend (Long) -> T
): Flow<T> =
flow { (duration - interval downTo 0 step interval).forEach { emit(it) } }
.onEach { delay(interval) }
.onStart { emit(duration) }
.map { onCountdown(it) }
.flowOn(Dispatchers.Default)
定義了一個頂層方法countdown()
抱怔,它返回一個流實例用于在異步線程中生產(chǎn)倒計時,并將倒計時傳入異步任務(wù)onCountdown()
執(zhí)行嘀倒。然后就可以像這樣使用:
val mainScope = MainScope()
mainScope.launch {
val ret = countdown(60_000, 2_000) { remianTime -> calculate(remianTime) }
.onStart { Log.v("test", "countdown start") }
.onCompletion { Log.v("test", "countdown end") }
.reduce { acc, value -> acc + value }
Log.v("test", "coutdown acc ret = $ret")
}
下面就從源碼出發(fā)屈留,一點一點分析流方案背后的原理。
Flow 如何生產(chǎn)并消費數(shù)據(jù)测蘑?
Flow 的定義及其簡單灌危,只包含了 2 個接口:
public interface Flow<out T> {
public suspend fun collect(collector: FlowCollector<T>)
}
Flow 是一個接口,其中定義了一個collect()
方法碳胳,表示“流可以被收集”勇蝙,而收集器也是一個接口:
public interface FlowCollector<in T> {
public suspend fun emit(value: T)
}
流收集器接口中定義了一個emit()
方法表示“流收集器可以發(fā)射數(shù)據(jù)”。
若套用“生產(chǎn)者/消費者”模型固逗,可理解為流中數(shù)據(jù)可以被消費浅蚪,流收集器可以生產(chǎn)數(shù)據(jù)。
一個最簡單的生產(chǎn)和消費數(shù)據(jù)的場景:
// 啟動協(xié)程
GlobalScope.launch {
// 構(gòu)建流
flow { // 定義流如何生產(chǎn)數(shù)據(jù)
(1 .. 3).forEach {
// 每隔 1 秒發(fā)射 1 個數(shù)字
delay(1000)
emit(it)
}
}.collect { // 定義如何消費數(shù)據(jù)
Log.v("test", "num=$it") // 打印數(shù)字
}
}
-
通過
flow{ block }
構(gòu)建了一個流烫罩,它是一個頂層方法:// 構(gòu)建安全流(傳入 block 定義如何生產(chǎn)流數(shù)據(jù)) public fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block) // 安全流繼承自抽象流 private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() { override suspend fun collectSafely(collector: FlowCollector<T>) { collector.block()// 收集流數(shù)據(jù)時調(diào)用 block惜傲,即觸發(fā)生產(chǎn)數(shù)據(jù) } } // 抽象流 public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> { // 收集數(shù)據(jù)的具體實現(xiàn) public final override suspend fun collect(collector: FlowCollector<T>) { // 構(gòu)建 FlowCollector 并傳入 collectSafely() val safeCollector = SafeCollector(collector, coroutineContext) try { collectSafely(safeCollector) } finally { safeCollector.releaseIntercepted() } } public abstract suspend fun collectSafely(collector: FlowCollector<T>) }
flow { block }
中的 block 定義了如何生產(chǎn)數(shù)據(jù),而 block 是在collect()
中被調(diào)用的贝攒。所以流中的數(shù)據(jù)不會自動生產(chǎn)盗誊,直到流被收集的那一刻。 -
通過
collect{ action }
收集了這個流,其中的 action 定義了如何消費數(shù)據(jù)哈踱。collect()
是 Flow 的擴展方法:public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit = collect(object : FlowCollector<T> { override suspend fun emit(value: T) = action(value) })
在收集數(shù)據(jù)時新建了一個流收集器荒适,流收集器可以發(fā)射數(shù)據(jù),發(fā)射的方式就是直接將數(shù)據(jù)傳遞給 action开镣,即數(shù)據(jù)消費者刀诬。
將上述兩點綜合一下:
流中的數(shù)據(jù)不會自動生產(chǎn),直到流被收集的那一刻邪财。當(dāng)流被收集的瞬間陕壹,數(shù)據(jù)開始生產(chǎn)并被發(fā)射出去,通過流收集器將其傳遞給消費者树埠。
流和流收集器是成對出現(xiàn)的概念糠馆。流是一組按序產(chǎn)生的數(shù)據(jù),數(shù)據(jù)的產(chǎn)生表現(xiàn)為通過流收集器發(fā)射數(shù)據(jù)怎憋,在這里流收集器像是流數(shù)據(jù)容器(雖然它不持有任何一條數(shù)據(jù))又碌,它定義了如何將數(shù)據(jù)傳遞給消費者。
所以上述的實例代碼绊袋,無異于如下的同步調(diào)用:
// 生產(chǎn)者消費者偽代碼
flow {
emit(data) // 生產(chǎn)
}.collect {
action(data) // 消費
}
// 生產(chǎn)者消費者實際的調(diào)用鏈
Flow.collect {
emit(data) {
action(data)
}
}
經(jīng)過一些 lambda 的抽象毕匀,看上去生產(chǎn)者和消費者好像分居兩地,但其實它們是運行在同一個線程中的同步調(diào)用鏈癌别,即:
默認情況下期揪,流中生產(chǎn)和消費數(shù)據(jù)是在同一個線程中進行的。
現(xiàn)在回看一下倒計時流是如何生產(chǎn)并消費數(shù)據(jù)的:
fun <T> countdown(
duration: Long,
interval: Long,
onCountdown: suspend (Long) -> T
): Flow<T> =
flow { (duration - interval downTo 0 step interval).forEach { emit(it) } }
.onEach { delay(interval) }
.onStart { emit(duration) }
.map { onCountdown(it) }
.flowOn(Dispatchers.Default)
countdown() 方法的第一句就定義了倒計時流中生產(chǎn)數(shù)據(jù)的方式:
flow { (duration - interval downTo 0 step interval).forEach { emit(it) } }
flow {}
構(gòu)建了一個流實例规个。在內(nèi)部創(chuàng)建了一個從 duration - interval 到 0 步長為 step 的值序列凤薛,它被遍歷的同時調(diào)用emit()
將每個值發(fā)射出去。
創(chuàng)建了流實例后诞仓,鏈式調(diào)用了一系列方法缤苫,但并沒有collect()
,是不是說 countdown() 方法只定義了生產(chǎn)數(shù)據(jù)并沒有定義如何消費數(shù)據(jù)墅拭?
collect()
是流數(shù)據(jù)的消費者活玲,生產(chǎn)者和消費者之間的管道可以插入“中間消費者”,它們優(yōu)先消費上游數(shù)據(jù)后再轉(zhuǎn)發(fā)給下游谍婉。正是這些中間消費者舒憾,讓流產(chǎn)生了無窮多樣的玩法。
中間消費者
transform()
transform()
是一個最常見的中間消費者穗熬,它是一個 Flow 的擴展方法:
public inline fun <T, R> Flow<T>.transform(
crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> =
// 構(gòu)建下游流
flow {
// 收集上游數(shù)據(jù)(這里的邏輯在下游流被收集的時候調(diào)用)
collect { value ->
// 處理上游數(shù)據(jù)
return@collect transform(value)
}
}
transform() 做了三件事情:構(gòu)建了一個新流(下游流)镀迂,當(dāng)下游流被收集時,會立馬收集上游的流唤蔗,當(dāng)收集到上游數(shù)據(jù)后將其傳遞給transform
這個 lambda探遵。
FlowCollector<R>.(value: T) -> Unit
是一個帶接收者的 lambda窟赏,接收者是FlowCollector
。調(diào)用這種 labmda 時需要指定接收者箱季,在 transform() 的語境中接收者是this
涯穷,所以省略了,如果將其補全藏雏,就是下面這樣:
public inline fun <T, R> Flow<T>.transform(
crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> =
// 構(gòu)建下游流
flow { this ->
collect { value ->
return@collect this.transform(value)
}
}
// 構(gòu)建流的 flow {} 中的 lambda 也是帶接收者的
public fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T> =
SafeFlow(block)
讓FlowCollector
作為接收者是有好處的拷况,這樣就可以在 lambda 中方便地訪問到FlowCollector.emit()
,即transform()
將“下游流如何生產(chǎn)數(shù)據(jù)”這個策略交由外部傳入的 lambda 決定掘殴。(關(guān)于策略模式的詳解可以點擊一句話總結(jié)殊途同歸的設(shè)計模式:工廠模式=蝠嘉?策略模式=?模版方法模式)杯巨,所以可以得出這樣的結(jié)論:
transform() 建立了一種在流上攔截并轉(zhuǎn)發(fā)的機制:新建下游流,它生產(chǎn)數(shù)據(jù)的方式是通過收集上游數(shù)據(jù)努酸,并將數(shù)據(jù)轉(zhuǎn)發(fā)到一個帶有發(fā)射數(shù)據(jù)能力的 lambda 中服爷。transform() 這個中間消費者在攔截上游數(shù)據(jù)后,就可隨心所欲地將其變換后再轉(zhuǎn)發(fā)給下游消費者获诈。
onEach() & map() & 自定義中間消費者
所以 transform() 通常用于定義新的中間消費者仍源,onEach()
的定義就借助于它:
public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = transform { value ->
action(value)
return@transform emit(value)
}
所有的中間消費者都定義成 Flow 的擴展方法,而且都會返回一個新建的下游流舔涎。這樣做是為了讓不同的中間消費者可以方便地通過鏈式調(diào)用串聯(lián)在一起笼踩。
onEach() 通過 transform() 構(gòu)建了一個下游流,并在轉(zhuǎn)發(fā)每一個上游流數(shù)據(jù)前又做了一件額外的事情亡嫌,用lambda action
表示嚎于。
map() 也是通過 transform() 實現(xiàn)的:
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> =
transform { value -> return@transform emit(transform(value)) }
map() 通過 transform() 構(gòu)建了一個下游流,并且在拿到上游流數(shù)據(jù)時先將其進行了transform
變換挟冠,然后再轉(zhuǎn)發(fā)出去于购。
利用 transform() 的機制,可以很方便地自定義一個中間消費者:
fun <T, R> Flow<T>.filterMap(
predicate: (T) -> Boolean,
transform: suspend (T) -> R
): Flow<R> =
transform { value -> if (predicate(value)) emit(transform(value)) }
filterMap() 只對上游數(shù)據(jù)中滿足 predicate 條件的數(shù)據(jù)進行變換并發(fā)射知染。
onStart()
onStart() 也是中間消費者肋僧,但它沒有借助于 transform(),而是通過unsafeFlow()
構(gòu)建了一個下游流:
public fun <T> Flow<T>.onStart(
action: suspend FlowCollector<T>.() -> Unit
): Flow<T> = unsafeFlow { // 構(gòu)建下游流
val safeCollector = SafeCollector<T>(this, currentCoroutineContext())
try {
safeCollector.action() // 在收集上游流數(shù)據(jù)之前執(zhí)行動作
} finally {
safeCollector.releaseIntercepted()
}
collect(this) // 收集上游流數(shù)據(jù)
}
internal inline fun <T> unsafeFlow(crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
// 構(gòu)建新流
return object : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) {
collector.block()
}
}
}
unsafeFlow() 直接實例化了Flow
接口控淡,并定義了該流被收集時執(zhí)行的操作嫌吠,即調(diào)用block
。所以 unsafeFlow() 和 transform 很類似掺炭,都新建下游流以收集了上游數(shù)據(jù)辫诅,只不過在收集動作(所有數(shù)據(jù)發(fā)射之前)之前做了一件額外的事。
onCompletion()
public fun <T> Flow<T>.onCompletion(
action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T> = unsafeFlow { // 構(gòu)建下游流
try {
collect(this) // 1.先收集上游流數(shù)據(jù)
} catch (e: Throwable) {
ThrowingCollector(e).invokeSafely(action, e)
throw e
}
val sc = SafeCollector(this, currentCoroutineContext())
try {
sc.action(null) // 2.再執(zhí)行動作
} finally {
sc.releaseIntercepted()
}
}
onCompletion() 的實現(xiàn)和 onStart() 很類似涧狮,只不過是在收集數(shù)據(jù)之后再執(zhí)行動作泥栖。
因為 onStart() 和 onCompletion() 都用下游流套上游流的方式實現(xiàn)簇宽,只是收集數(shù)據(jù)和執(zhí)行動作的順序不同,就會產(chǎn)生下面這樣有趣的效果:
GlobalScope.launch {
flow {
(1 .. 3).forEach {
delay(1000)
emit(it)
}
}.onStart { Log.v("test","start1") }
.onStart { Log.v("test","start2") }
.onCompletion { Log.v("test","complete1") }
.onCompletion { Log.v("test","complete2") }
.collect { Log.v("test", "$it") }
}
上述代碼的輸出結(jié)果如下:
start2
start1
1
2
3
complete1
complete2
鏈式調(diào)用中出現(xiàn)多個onStart { action }
時吧享,后出現(xiàn)的 action 會先執(zhí)行魏割,因為后續(xù) onStart 構(gòu)建的下游流包在了上游 onStart 的外面,并且 action 會在收集上游流數(shù)據(jù)之前執(zhí)行钢颂。
而這個結(jié)論卻不能沿用到onCompletion { action }
钞它,雖然 onCompletion 構(gòu)建的下游流也包裹在上游 onCompletion 外面,但是 action 總是在收集上游流之后執(zhí)行殊鞭。
終端消費者
上面所有的擴展方法之所以稱為“中間消費者”是因為它們都構(gòu)建了一個新的下游流遭垛,并且只有當(dāng)下游流被收集的時候,它們才會去收集上游流操灿。也就是說锯仪,如果沒有收集下游流,流中的數(shù)據(jù)就永遠不會被發(fā)射趾盐,這個特性稱為冷流庶喜。
看一個冷流的例子:
// 執(zhí)行式的
suspend fun get(): List<String> =
listof("a", "b", "c").onEach {
delay(1000)
print(it)
}
// 聲明式的
fun get(): Flow<String> =
flowOf("a", "b", "c").onEach {
delay(1000)
print(it)
}
分別調(diào)用這兩個 get() 方法時,第一個 get 會立馬打印出結(jié)果救鲤,而第二個什么也不會打印久窟。因為第二個 get() 只是聲明了如何構(gòu)建一個冷流,它并沒有被收集本缠,所以也不會發(fā)射數(shù)據(jù)斥扛。
Flow 是冷流,冷流不會發(fā)射數(shù)據(jù)丹锹,直到它被收集稀颁,所以冷流是“聲明式的”。
所有能觸發(fā)收集數(shù)據(jù)動作的消費者稱為終端消費者楣黍,它就像點燃鞭炮的星火峻村,使得被若干個中間消費者套娃了的流從外向內(nèi)(從下游到上游)一個個的被收集,最終傳導(dǎo)到原始流锡凝,觸發(fā)數(shù)據(jù)的發(fā)射粘昨。
倒計時 demo 的reduce()
就是一個終端消費者:
val mainScope = MainScope()
mainScope.launch {
val ret = countdown(60_000, 2_000) { io(it) }
.onStart { Log.v("test", "countdown start") }
.onCompletion { Log.v("test", "countdown end") }
.reduce { acc, value -> acc + value } // 終端消費者:計算所有異步結(jié)果的和
// 因為 reduce() 是一個 suspend 方法,所以會掛起協(xié)程窜锯,直到倒計時完成才打印所有異步結(jié)果的和
Log.v("test", "coutdown acc ret = $ret")
}
reduce() 的源碼如下:
public suspend fun <S, T : S> Flow<T>.reduce(
operation: suspend (accumulator: S, value: T) -> S // 累加算法
): S {
var accumulator: Any? = NULL
// 收集數(shù)據(jù)
collect { value ->
// 將收集的數(shù)據(jù)累加
accumulator = if (accumulator !== NULL) {
operation(accumulator as S, value)
} else {
value
}
}
if (accumulator === NULL) throw NoSuchElementException("Empty flow can't be reduced")
// 返回累加和
return accumulator as S
}
reduce() 并沒有構(gòu)建新流张肾,而是直接收集了數(shù)據(jù),然后將所有數(shù)據(jù)進行累加并返回锚扎。
所有的終端消費者都是 suspend 方法吞瞪,這意味著收集數(shù)據(jù)必須在協(xié)程中進行。demo 中使用 MainScope 啟動協(xié)程驾孔,所以異步結(jié)果的和會在主線程中被打印芍秆。
線程切換
demo 中還剩下最后一個flowOn()
惯疙,它是中間消費者,略復(fù)雜妖啥,限于篇幅原因霉颠,下次再分析。但這不影響先了解它的效果:它會切換所有上游代碼執(zhí)行的線程荆虱,但不改變下游代碼執(zhí)行的線程蒿偎。
countdown() 方法通過flowOn(Dispatchers.Default)
,實現(xiàn)了后臺執(zhí)行倒計時任務(wù)怀读。而 reduce() 的調(diào)用發(fā)生在 flowOn() 之后诉位,所以異步任務(wù)結(jié)果累加還是在主線程進行的。
onStart()
菜枷、onEach()
珊豹、onCompletion()
迁筛、map()
捺信、reduce()
法希,這些消費者對數(shù)據(jù)的處理都被包裝在用suspend
修飾的 lambda 中。這意味著利用協(xié)程可以輕松地切換每個消費者運行的線程坷衍。也正是suspend
的存在,運行在同一線程中的下游消費者不會發(fā)生背壓条舔,因為下游消費者的掛起方法會天然阻塞上游生產(chǎn)數(shù)據(jù)的速度枫耳。
總結(jié)
- 異步數(shù)據(jù)流可以理解為一條時間軸上按序產(chǎn)生的數(shù)據(jù),它可用于表達多個連續(xù)的異步過程孟抗。
- 異步數(shù)據(jù)流也可以用“生產(chǎn)者/消費者”模型來理解迁杨,生產(chǎn)者和消費者之間就好像有一條管道,生產(chǎn)者從管道的一頭插入數(shù)據(jù)凄硼,消費者從另一頭取數(shù)據(jù)铅协。因為管道的存在,數(shù)據(jù)是有序的摊沉,遵循先進先出的原則狐史。
- Kotlin 中的
suspend
方法用于表達一個異步過程,而Flow
用于表達多連續(xù)個異步過程说墨。Flow
是冷流骏全,冷流不會發(fā)射數(shù)據(jù),直到它被收集的那一刻尼斧,所以冷流是“聲明式的”姜贡。 - 當(dāng)
Flow
被收集的瞬間,數(shù)據(jù)開始生產(chǎn)并被發(fā)射出去棺棵,通過流收集器FlowCollector
將其傳遞給消費者楼咳。流和流收集器是成對出現(xiàn)的概念熄捍。流是一組按序產(chǎn)生的數(shù)據(jù),數(shù)據(jù)的產(chǎn)生表現(xiàn)為通過流收集器發(fā)射數(shù)據(jù)母怜,在這里流收集器像是流數(shù)據(jù)容器(雖然它不持有任何一條數(shù)據(jù))余耽,它定義了如何將數(shù)據(jù)傳遞給消費者。 - 異步數(shù)據(jù)流中糙申,生產(chǎn)者和消費者之間可以插入中間消費者宾添。中間消費者建立了流上的攔截并轉(zhuǎn)發(fā)機制:新建下游流,它生產(chǎn)數(shù)據(jù)的方式是通過收集上游數(shù)據(jù)柜裸,并轉(zhuǎn)發(fā)到一個帶有發(fā)射數(shù)據(jù)能力的 lambda 中缕陕。擁有多個中間消費者的流就像“套娃”一樣,下游流套在上游流外面疙挺。中間消費者通過這種方式攔截了原始數(shù)據(jù)扛邑,就可以對其做任意變換再轉(zhuǎn)發(fā)給下游消費者。因為 Flow 是冷流铐然,所有的中間消費者只是定義了一連串待執(zhí)行的調(diào)用鏈蔬崩。
- 所有能觸發(fā)收集數(shù)據(jù)動作的消費者稱為終端消費者,它就像點燃鞭炮的星火搀暑,使得被若干個中間消費者套娃的流從外向內(nèi)(從下游到上游)一個個的被收集沥阳,最終傳導(dǎo)到原始流,觸發(fā)數(shù)據(jù)的發(fā)射自点。
- 默認情況下桐罕,流中生產(chǎn)和消費數(shù)據(jù)是在同一個線程中進行的。但可以通過
flowOn()
改變上游流執(zhí)行的線程桂敛,這并不影響下游流所執(zhí)行的線程功炮。 -
Flow
中生產(chǎn)和消費數(shù)據(jù)的操作都被包裝在用 suspend 修飾的 lambda 中,用協(xié)程就可以輕松的實現(xiàn)異步生產(chǎn)术唬,異步消費薪伏。
本文轉(zhuǎn)自 https://juejin.cn/post/6989032238079803429,如有侵權(quán)粗仓,請聯(lián)系刪除嫁怀。
作者:唐子玄