最近一直在了解關(guān)于 Kotlin協(xié)程 的知識(shí)牺氨,那最好的學(xué)習(xí)資料自然是官方提供的學(xué)習(xí)文檔了锅铅,看了看后我就萌生了翻譯官方文檔的想法。前后花了要接近一個(gè)月時(shí)間层玲,一共九篇文章,在這里也分享出來反症,希望對(duì)讀者有所幫助辛块。個(gè)人知識(shí)所限,有些翻譯得不是太順暢铅碍,也希望讀者能提出意見
協(xié)程官方文檔:coroutines-guide
掛起函數(shù)可以異步返回單個(gè)值润绵,但如何返回多個(gè)異步計(jì)算值呢?這就是 kotlin Flows(流) 的用處了
一胞谈、表示多個(gè)值
可以使用集合在 kotlin 中表示多個(gè)值尘盼。例如,有一個(gè)函數(shù) foo()烦绳,它返回包含三個(gè)數(shù)字的 List卿捎,然后使用 forEach 打印它們
fun foo(): List<Int> = listOf(1, 2, 3)
fun main() {
foo().forEach { value -> println(value) }
}
輸出結(jié)果:
1
2
3
1.1、序列
如果我們使用一些 CPU 消耗型 的阻塞代碼(每次計(jì)算需要100毫秒)來計(jì)算數(shù)字径密,那么我們可以使用一個(gè)序列(Sequence)來表示數(shù)字:
fun foo(): Sequence<Int> = sequence {
// sequence builder
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it
yield(i) // yield next value
}
}
fun main() {
foo().forEach { value -> println(value) }
}
這段代碼輸出相同的數(shù)字列表午阵,但每打印一個(gè)數(shù)字前都需要等待100毫秒
1.2、掛起函數(shù)
上一節(jié)的代碼的計(jì)算操作會(huì)阻塞運(yùn)行代碼的主線程享扔。當(dāng)這些值由異步代碼計(jì)算時(shí)底桂,我們可以用 suspend 修飾符標(biāo)記函數(shù) foo,以便它可以在不阻塞的情況下執(zhí)行其工作惧眠,并將結(jié)果作為列表返回
import kotlinx.coroutines.*
//sampleStart
suspend fun foo(): List<Int> {
delay(1000) // pretend we are doing something asynchronous here
return listOf(1, 2, 3)
}
fun main() = runBlocking<Unit> {
foo().forEach { value -> println(value) }
}
//sampleEnd
這段代碼在等待一秒后輸出數(shù)字
1.3籽懦、Flows
使用 List< Int > 作為返回值類型,意味著我們只能同時(shí)返回所有值锉试。為了表示異步計(jì)算的值流猫十,我們可以使用 Flow< Int > 類型览濒,就像同步計(jì)算值的 Sequence< Int > 類型一樣
//sampleStart
fun foo(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(100) // pretend we are doing something useful here
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
// Launch a concurrent coroutine to check if the main thread is blocked
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// Collect the flow
foo().collect { value -> println(value) }
}
//sampleEnd
此代碼在打印每個(gè)數(shù)字前等待100毫秒,但不會(huì)阻塞主線程拖云。通過從主線程中運(yùn)行的單獨(dú)協(xié)程中每隔100毫秒打印了一次 “I'm not blocked”贷笛,可以驗(yàn)證這一點(diǎn):
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
請(qǐng)注意,代碼與前面示例中的 Flow 有以下不同:
- Flow 類型的構(gòu)造器函數(shù)名為 flow
- flow{...} 中的代碼塊可以掛起
- foo 函數(shù)不再標(biāo)記 suspend 修飾符
- 值通過 emit 函數(shù)從流中發(fā)出
- 通過 collect 函數(shù)從 flow 中取值
我們可以用 Thread.sleep 來代替 flow{...} 中的 delay宙项,可以看到在這種情況下主線程被阻塞住了
二乏苦、流是冷的
Flows 是冷流(cold streams),類似于序列(sequences)尤筐,flow builder 中的代碼在開始收集流值之前不會(huì)運(yùn)行汇荐。在下面的示例中可以清楚地看到這一點(diǎn):
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
//sampleStart
fun foo(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
println("Calling foo...")
val flow = foo()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
//sampleEnd
運(yùn)行結(jié)果:
Calling foo...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
這是 foo() 函數(shù)(返回了 flow)未標(biāo)記 suspend
修飾符的一個(gè)關(guān)鍵原因。foo()
本身返回很快盆繁,不會(huì)進(jìn)行任何等待掀淘。flow 每次收集時(shí)都會(huì)啟動(dòng),這就是我們?cè)俅握{(diào)用 collect
時(shí)會(huì)看到“flow started”的原因
三油昂、取消流
Flow 采用和協(xié)程取同樣的協(xié)作取消革娄。但是,F(xiàn)low 實(shí)現(xiàn)基礎(chǔ)并沒有引入額外的取消點(diǎn)冕碟,它對(duì)于取消操作是完全透明的拦惋。通常,流的收集操作可以在當(dāng)流在一個(gè)可取消的掛起函數(shù)(如 delay)中掛起的時(shí)候取消安寺,否則不能取消
以下示例展示了在 withTimeoutOrNull 塊中流如何在超時(shí)時(shí)被取消并停止執(zhí)行
//sampleStart
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
withTimeoutOrNull(250) {
// Timeout after 250ms
foo().collect { value -> println(value) }
}
println("Done")
}
//sampleEnd
注意厕妖,foo() 函數(shù)中的 Flow 只傳出兩個(gè)數(shù)字,得到以下輸出:
Emitting 1
1
Emitting 2
2
Done
相對(duì)應(yīng)的挑庶,可以注釋掉 flow 中的 delay 函數(shù)言秸,并增大 for 循環(huán)的循環(huán)范圍,此時(shí)可以發(fā)現(xiàn) flow 沒有被取消挠羔,因?yàn)?flow 中沒有引入額外的掛起點(diǎn)
//sampleStart
fun foo(): Flow<Int> = flow {
for (i in 1..Int.MAX_VALUE) {
// delay(100)
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
withTimeoutOrNull(250) {
// Timeout after 250ms
foo().collect { value -> println(value) }
}
println("Done")
}
//sampleEnd
四井仰、流構(gòu)建器
前面例子中的 flow{...} 是最基礎(chǔ)的一個(gè)流構(gòu)建器,還有其它的構(gòu)建器可以更容易地聲明流:
- flowOf() 定義了一個(gè)發(fā)出固定值集的流構(gòu)建器
- 可以使用擴(kuò)展函數(shù)
.asFlow()
將各種集合和序列轉(zhuǎn)換為流
因此破加,從流中打印從 1 到 3 的數(shù)字的例子可以改寫成:
fun main() = runBlocking<Unit> {
//sampleStart
// Convert an integer range to a flow
(1..3).asFlow().collect { value -> println(value) }
//sampleEnd
}
五俱恶、中間流運(yùn)算符
可以使用運(yùn)算符來轉(zhuǎn)換流,就像使用集合和序列一樣范舀。中間運(yùn)算符應(yīng)用于上游流并返回下游流合是。這些運(yùn)算符是冷操作符,和流一樣锭环。此類運(yùn)算符本身不是掛起函數(shù)聪全,它工作得很快,其返回一個(gè)新的轉(zhuǎn)換后的流辅辩,但引用僅包含對(duì)新流的操作定義难礼,并不馬上進(jìn)行轉(zhuǎn)換
基礎(chǔ)運(yùn)算符有著熟悉的名稱娃圆,例如 map 和 filter。流運(yùn)算符和序列的重要區(qū)別在于流運(yùn)算符中的代碼可以調(diào)用掛起函數(shù)
例如蛾茉,可以使用 map 運(yùn)算符將傳入請(qǐng)求流映射為結(jié)果值讼呢,即使執(zhí)行請(qǐng)求是由掛起函數(shù)實(shí)現(xiàn)的長(zhǎng)時(shí)間運(yùn)行的操作:
//sampleStart
suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // a flow of requests
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}
//sampleEnd
運(yùn)行結(jié)果共有三行,每一秒打印一行輸出
response 1
response 2
response 3
5.1谦炬、轉(zhuǎn)換操作符
在流的轉(zhuǎn)換運(yùn)算符中悦屏,最常用的一個(gè)稱為 transform
。它可以用來模擬簡(jiǎn)單的數(shù)據(jù)轉(zhuǎn)換(就像 map 和 filter)键思,以及實(shí)現(xiàn)更復(fù)雜的轉(zhuǎn)換础爬。使用 transform
運(yùn)算符,我們可以發(fā)出任意次數(shù)的任意值
例如吼鳞,通過使用 transform
看蚜,我們可以在執(zhí)行長(zhǎng)時(shí)間運(yùn)行的異步請(qǐng)求之前發(fā)出一個(gè)字符串,并在該字符串后面跟隨一個(gè)響應(yīng):
suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}
fun main() = runBlocking<Unit> {
//sampleStart
(1..3).asFlow() // a flow of requests
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
//sampleEnd
}
輸出值:
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
5.2赖条、限長(zhǎng)運(yùn)算符
限長(zhǎng)中間運(yùn)算符在達(dá)到相應(yīng)限制時(shí)取消流的執(zhí)行失乾。協(xié)程中的取消總是通過拋出異常來實(shí)現(xiàn),這樣所有的資源管理函數(shù)(例如 try { ... } finally { ... } )就可以在取消時(shí)正常執(zhí)行
//sampleStart
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2) // take only the first two
.collect { value -> println(value) }
}
//sampleEnd
這段代碼的輸出清楚地顯示了 numbers() 函數(shù)中的 flow{...} 函數(shù)體在發(fā)出第二個(gè)數(shù)字后就停止了:
1
2
Finally in numbers
六纬乍、流運(yùn)算符
終端流運(yùn)算符是用于啟動(dòng)流的掛起函數(shù)。collect
是最基本的終端流運(yùn)算符裸卫,但還有其它終端運(yùn)算符仿贬,可以使得操作更加簡(jiǎn)便:
- 轉(zhuǎn)換為各種集合,如 toList 和 toSet 函數(shù)
- first 運(yùn)算符用于獲取第一個(gè)值墓贿,single 運(yùn)算符用于確保流發(fā)出單個(gè)值
- 使用 reduce 和 fold 將流還原為某個(gè)值
例如:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
//sampleStart
val sum = (1..5).asFlow()
.map { it * it } // squares of numbers from 1 to 5
.reduce { a, b -> a + b } // sum them (terminal operator)
println(sum)
//sampleEnd
}
輸出單個(gè)值:
55
七茧泪、流是連續(xù)的
除非使用對(duì)多個(gè)流進(jìn)行操作的特殊運(yùn)算符,否則每個(gè)流的單獨(dú)集合都是按順序執(zhí)行的聋袋。集合直接在調(diào)用終端運(yùn)算符的協(xié)程中工作队伟,默認(rèn)情況下不會(huì)啟動(dòng)新的協(xié)程。每個(gè)發(fā)出的值都由所有中間運(yùn)算符從上游到下游進(jìn)行處理幽勒,然后在之后傳遞給終端運(yùn)算符
請(qǐng)參閱以下示例嗜侮,該示例過濾偶數(shù)并將其映射到字符串:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
//sampleStart
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
//sampleEnd
}
輸出:
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
八、流上下文
流的收集總是在調(diào)用協(xié)程的上下文中執(zhí)行啥容。例如锈颗,如果存在 foo 流,則無(wú)論 foo 流的實(shí)現(xiàn)詳細(xì)信息如何咪惠,以下代碼都將在該開發(fā)者指定的上下文中執(zhí)行:
withContext(context) {
foo.collect { value ->
println(value) // run in the specified context
}
}
流的這個(gè)特性稱為上下文保留
所以击吱,默認(rèn)情況下,flow{...} 中的代碼在相應(yīng)流的收集器提供的上下文中運(yùn)行遥昧。例如覆醇,觀察 foo 的實(shí)現(xiàn)朵纷,它打印調(diào)用它的線程并發(fā)出三個(gè)數(shù)字:
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
//sampleStart
fun foo(): Flow<Int> = flow {
log("Started foo flow")
for (i in 1..3) {
emit(i)
}
}
fun main() = runBlocking<Unit> {
foo().collect { value -> log("Collected $value") }
}
//sampleEnd
運(yùn)行結(jié)果:
[main @coroutine#1] Started foo flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
由于 foo().collect
是在主線程調(diào)用的,所以 foo 流也是在主線程中調(diào)用永脓。對(duì)于不關(guān)心執(zhí)行上下文且不阻塞調(diào)用方的快速返回代碼或者異步代碼袍辞,這是完美的默認(rèn)設(shè)置
8.1、錯(cuò)誤地使用 withContext
但是憨奸,可能需要在 Dispatchers 的上下文中執(zhí)行長(zhǎng)時(shí)間運(yùn)行的占用 CPU 的代碼革屠,可能需要在 Dispatchers.Main 的上下文中執(zhí)行默認(rèn)代碼和 UI 更新。通常排宰,withContext 用于在使用 kotlin 協(xié)程時(shí)更改代碼中的上下文似芝,但 fow{...}
中的代碼必須遵守上下文本保留屬性,并且不允許從其它上下文中觸發(fā)
嘗試運(yùn)行以下代碼:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
//sampleStart
fun foo(): Flow<Int> = flow {
// The WRONG way to change context for CPU-consuming code in flow builder
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
emit(i) // emit next value
}
}
}
fun main() = runBlocking<Unit> {
foo().collect { value -> println(value) }
}
//sampleEnd
代碼會(huì)生成以下異常:
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, DefaultDispatcher].
Please refer to 'flow' documentation or use 'flowOn' instead
at ...
8.2板甘、flowOn 運(yùn)算符
有個(gè)例外情況党瓮,flowOn 函數(shù)能用于改變流發(fā)送值時(shí)的上下文。改變流上下文的正確方式如下面的示例所示盐类,該示例還打印了相應(yīng)線程的名稱寞奸,以顯示所有線程的工作方式:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
//sampleStart
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
log("Emitting $i")
emit(i) // emit next value
}
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder
fun main() = runBlocking<Unit> {
foo().collect { value ->
log("Collected $value")
}
}
//sampleEnd
注意,flow{...} 在后臺(tái)線程中工作在跳,而在主線程中進(jìn)行取值
這里要注意的另一件事是 flowOn 操作符改變了流的默認(rèn)順序性質(zhì)∏固眩現(xiàn)在取值操作發(fā)生在協(xié)程 "coroutine#1" 中,而發(fā)射值的操作同時(shí)運(yùn)行在另一個(gè)線程中的協(xié)程 "coroutine#2" 上猫妙。當(dāng)必須在上游流的上下文中更改 CoroutineDispatcher 時(shí)瓷翻,flowOn 運(yùn)算符將為該上游流創(chuàng)建另一個(gè)協(xié)程
九、緩沖
從收集流所需的總時(shí)間的角度來看割坠,在不同的協(xié)程中運(yùn)行流的不同部分可能會(huì)有所幫助齐帚,特別是當(dāng)涉及到長(zhǎng)時(shí)間運(yùn)行的異步操作時(shí)。例如彼哼,假設(shè) foo() 流的發(fā)射很慢对妄,生成元素需要100毫秒;收集器也很慢敢朱,處理元素需要300毫秒剪菱。讓我們看看用三個(gè)數(shù)字收集這樣的流需要多長(zhǎng)時(shí)間:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
//sampleStart
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
foo().collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}
//sampleEnd
以上代碼會(huì)產(chǎn)生如下類似的結(jié)果,整個(gè)收集過程大約需要1200毫秒(三個(gè)數(shù)字蔫饰,每個(gè)400毫秒)
1
2
3
Collected in 1220 ms
我們可以在流上使用 buffer 運(yùn)算符琅豆,在運(yùn)行取集代碼的同時(shí)運(yùn)行 foo() 的發(fā)值代碼,而不是按順序運(yùn)行它們
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
//sampleStart
val time = measureTimeMillis {
foo()
.buffer() // buffer emissions, don't wait
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
//sampleEnd
}
這可以得到相同的輸出結(jié)果但運(yùn)行速度更快篓吁,因?yàn)槲覀円呀?jīng)有效地創(chuàng)建了一個(gè)處理管道茫因,第一個(gè)數(shù)字只需要等待100毫秒,然后只需要花費(fèi)300毫秒來處理每個(gè)數(shù)字杖剪。這樣運(yùn)行大約需要1000毫秒:
1
2
3
Collected in 1071 ms
請(qǐng)注意冻押,flowOn 運(yùn)算符在必須更改 CoroutineDispatcher 時(shí)使用相同的緩沖機(jī)制驰贷,但這里我們顯示地請(qǐng)求緩沖而不更改執(zhí)行上下文
9.1、合并
當(dāng)流用于表示操作或操作狀態(tài)更新的部分結(jié)果時(shí)洛巢,可能不需要處理每個(gè)值括袒,而是只處理最近的值。在這種情況下稿茉,當(dāng)取值器處理中間值太慢時(shí)锹锰,可以使用合并運(yùn)算符跳過中間值。在前面的例子的基礎(chǔ)上再來修改下:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
//sampleStart
val time = measureTimeMillis {
foo()
.conflate() // conflate emissions, don't process each one
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
//sampleEnd
}
可以看到漓库,雖然第一個(gè)數(shù)字仍在處理中恃慧,但第二個(gè)數(shù)字和第三個(gè)數(shù)字已經(jīng)生成,因此第二個(gè)數(shù)字被合并(丟棄)渺蒿,只有最近的一個(gè)數(shù)字(第三個(gè))被交付給取值器:
1
3
Collected in 758 ms
9.2痢士、處理最新值
在發(fā)射端和處理端都很慢的情況下,合并是加快處理速度的一種方法茂装。它通過丟棄發(fā)射的值來實(shí)現(xiàn)怠蹂。另一種方法是取消慢速收集器,并在每次發(fā)出新值時(shí)重新啟動(dòng)它少态。有一系列 xxxLatest 運(yùn)算符與 xxx 運(yùn)算符執(zhí)行相同的基本邏輯城侧,但是在新值產(chǎn)生的時(shí)候取消執(zhí)行其塊中的代碼。在前面的示例中彼妻,我們嘗試將 conflate 更改為 collectLatest:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
//sampleStart
val time = measureTimeMillis {
foo()
.collectLatest { value -> // cancel & restart on the latest value
println("Collecting $value")
delay(300) // pretend we are processing it for 300 ms
println("Done $value")
}
}
println("Collected in $time ms")
//sampleEnd
}
由于 collectLatest 的主體需要延遲300毫秒赞庶,而每100毫秒會(huì)發(fā)出一個(gè)新值,因此我們可以看到 collectLatest 代碼塊得到了每一個(gè)發(fā)射值澳骤,但最終只完成了最后一個(gè)值:
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms
十、組合多個(gè)流
有許多方法可以組合多個(gè)流
10.1澜薄、zip
與 Kotlin 標(biāo)準(zhǔn)庫(kù)中的 Sequence.zip 擴(kuò)展函數(shù)一樣为肮,流有一個(gè) zip 運(yùn)算符,用于組合兩個(gè)流的相應(yīng)值:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
//sampleStart
val nums = (1..3).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
.collect { println(it) } // collect and print
//sampleEnd
}
運(yùn)行結(jié)果:
1 -> one
2 -> two
3 -> three
10.2肤京、Combine
當(dāng) flow 表示變量或操作的最新值時(shí)(參閱有關(guān) conflation 的相關(guān)章節(jié))颊艳,可能需要執(zhí)行依賴于相應(yīng)流的最新值的計(jì)算,并在任何上游流發(fā)出值時(shí)重新計(jì)算它忘分。相應(yīng)的運(yùn)算符族稱為 combine
例如棋枕,如果上例中的數(shù)字每300毫秒更新一次,但字符串每400毫秒更新一次妒峦,則使用 zip 運(yùn)算符壓縮它們?nèi)詴?huì)產(chǎn)生相同的結(jié)果重斑,盡管結(jié)果是每400毫秒打印一次
在本例中,我們使用中間運(yùn)算符 onEach 來延遲每個(gè)元素肯骇,并使發(fā)出樣本流的代碼更具聲明性窥浪,更加簡(jiǎn)短
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
//sampleStart
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
//sampleEnd
}
但是祖很,如果在此處使用 combine 運(yùn)算符而不是 zip 時(shí):
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
//sampleStart
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
//sampleEnd
}
我們得到了完全不同的輸出:
1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start
十一、展平流
流表示異步接收的值序列漾脂,因此在每個(gè)值觸發(fā)對(duì)另一個(gè)值序列的請(qǐng)求的情況下很容易獲取新值假颇。例如,我們可以使用以下函數(shù)骨稿,該函數(shù)返回相隔500毫秒的兩個(gè)字符串流:
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
現(xiàn)在笨鸡,如果我們有一個(gè)包含三個(gè)整數(shù)的流,并為每個(gè)整數(shù)調(diào)用 requestFlow坦冠,如下所示:
(1..3).asFlow().map { requestFlow(it) }
然后我們最終得到一個(gè)流(flow< flow< String >>)形耗,需要將其展平為單獨(dú)一個(gè)流以進(jìn)行進(jìn)一步處理。集合和序列對(duì)此提供了 flatten 和 flatMap 運(yùn)算符蓝牲。然而趟脂,由于流的異步特性,它們需要不同的展開模式例衍,因此流上有一系列 flattening 運(yùn)算符
11.1昔期、flatMapConcat
flatMapConcat 和 flattencat 運(yùn)算符實(shí)現(xiàn)了 Concatenating 模式,它們是與序列運(yùn)算符最直接的類比佛玄。它們等待內(nèi)部流完成硼一,然后開始收集下一個(gè)流,如下例所示:
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
//sampleStart
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapConcat { requestFlow(it) }
.collect { value ->
// collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
//sampleEnd
}
flatMapConcat 的順序特性在輸出結(jié)果中清晰可見:
1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
11.2梦抢、flatMapMerge
另一種 flattening 模式是同時(shí)收集所有傳入流并將其值合并到單個(gè)流中般贼,以便盡快發(fā)出值。它由 flatMapMerge 和 flattenMerge 運(yùn)算符實(shí)現(xiàn)奥吩。它們都接受一個(gè)可選的并發(fā)參數(shù)哼蛆,該參數(shù)用于限制同時(shí)收集的并發(fā)流的數(shù)量(默認(rèn)情況下等于 DEFAULT_CONCURRENCY)
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
//sampleStart
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapMerge { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
//sampleEnd
}
flatMapMerge 的并發(fā)性是顯而易見的:
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
請(qǐng)注意,flatMapMerge 按順序調(diào)用其代碼塊({requestFlow(it)})霞赫,但同時(shí)收集結(jié)果流腮介,這相當(dāng)于先執(zhí)行序列 map{requestFlow(it)},然后對(duì)返回值調(diào)用 flattenMerge
11.3端衰、flatMapLatest
與“Processing the latest value(處理最新值)”章節(jié)介紹的 collectLatest 操作符類似叠洗,存在相應(yīng)的 "Latest" flattening 模式。在該模式下旅东,一旦發(fā)出新流灭抑,將取消先前已發(fā)出的流。這通過 flatMapLatest 運(yùn)算符實(shí)現(xiàn)
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
//sampleStart
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapLatest { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
//sampleEnd
}
本例中的輸出很好的演示了 flatMapLatest 的工作原理
1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
請(qǐng)注意抵代,當(dāng)新值到來時(shí)腾节,flatMapLatest 將取消其塊中的所有代碼({requestFlow(it)})。requestFlow 函數(shù)本身的調(diào)用是很快速的,并非掛起函數(shù)禀倔,如果其內(nèi)部不包含額外的掛起點(diǎn)榄融,那么它就不能被取消,所以此處就在其內(nèi)部使用了 delay 函數(shù)救湖,使其可以達(dá)到被取消的目的
十二愧杯、流異常
當(dāng)發(fā)射器或運(yùn)算符內(nèi)部的代碼引發(fā)異常時(shí),流收集器可以結(jié)束運(yùn)行鞋既,但會(huì)出現(xiàn)異常力九。有幾種方法可以處理這些異常
12.1、收集器 try 與 catch
收集器可以使用 kotlin 的 try/catch 代碼塊來處理異常
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
//sampleStart
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
try {
foo().collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
}
//sampleEnd
此代碼成功捕獲 collect 運(yùn)算符中的異常邑闺,如我們所見跌前,在此之后不再發(fā)出任何值:
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2
12.2、一切都已捕獲
前面的示例實(shí)際上捕獲了發(fā)射器或任何中間或終端運(yùn)算符中發(fā)生的任何異常陡舅。例如抵乓,讓我們更改代碼,以便將發(fā)出的值映射到字符串靶衍,但相應(yīng)的代碼會(huì)產(chǎn)生異常:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
//sampleStart
fun foo(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
fun main() = runBlocking<Unit> {
try {
foo().collect { value -> println(value) }
} catch (e: Throwable) {
println("Caught $e")
}
}
//sampleEnd
仍捕獲此異常并停止收集:
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
十三灾炭、異常透明性
但是發(fā)射器的代碼如何封裝其異常處理行為呢?
flows 對(duì)于異常必須是透明的颅眶,并且在 flow{...} 構(gòu)建器中發(fā)射值有可能拋出異常時(shí)蜈出,異常必須顯式地從 try/catch 塊內(nèi)部拋出。這保證了拋出異常的收集器始終可以使用 try/catch 來捕獲異常涛酗,如前一個(gè)示例所示
發(fā)射器可以使用 catch 運(yùn)算符來保持此異常的透明性铡原,并允許封裝其異常處理行為。catch 運(yùn)算符可以分析異常并根據(jù)捕獲到的異常以不同的方式對(duì)其作出反應(yīng):
- 可以使用 throw 重新引發(fā)異常
- 使用 catch 的 emit 可以將異常轉(zhuǎn)換為值的 emission
- 異成烫荆可以被其他代碼忽略燕刻、記錄或處理
例如,讓我們?cè)诓东@異常時(shí)發(fā)出一段文本:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
fun main() = runBlocking<Unit> {
//sampleStart
foo()
.catch { e -> emit("Caught $e") } // emit on exception
.collect { value -> println(value) }
//sampleEnd
}
示例代碼的輸出結(jié)果是與之前相同的剖笙,即使我們不再在代碼周圍使用 try/catch
13.1酌儒、透明捕獲
catch 中間運(yùn)算符遵循異常透明性,只捕獲上游異常(即 catch 上所有運(yùn)算符的異常枯途,而不是 catch 下所有運(yùn)算符的異常)。如果 collect{...}(放在 catch 下面)拋出異常籍滴,程序?qū)⑼顺觯?/p>
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
//sampleStart
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
foo()
.catch { e -> println("Caught $e") } // does not catch downstream exceptions
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
//sampleEnd
盡管存在 catch 運(yùn)算符酪夷,但不會(huì)打印 “Caught ...” 日志
13.2、聲明式捕獲
我們可以將 catch 運(yùn)算符的聲明性與處理所有異常的愿望結(jié)合起來孽惰,方法是將 collect 運(yùn)算符原先所要做的操作移動(dòng)到 onEach 中晚岭,并將其放在 catch 運(yùn)算符之前。此流的取值操作必須由不帶參數(shù)的 collect() 函數(shù)來調(diào)用觸發(fā):
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
//sampleStart
foo()
.onEach { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
.catch { e -> println("Caught $e") }
.collect()
//sampleEnd
}
現(xiàn)在我們可以看到打印了一條 “Caught ...” 消息勋功,至此我們捕獲了所有異常坦报,而無(wú)需顯式使用 try/catch
十四库说、流完成
當(dāng)流收集完成時(shí)(正常或異常)片择,它可能需要執(zhí)行一個(gè)操作潜的。正如你可能已經(jīng)注意到的,它可以通過兩種方式完成:命令式或聲明式
14.1字管、命令式 finally 塊
除了 try/catch 外啰挪,收集器還可以使用 finally 在收集完成時(shí)執(zhí)行操作
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
//sampleStart
fun foo(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
try {
foo().collect { value -> println(value) }
} finally {
println("Done")
}
}
//sampleEnd
此代碼打印 fon() 流生成的三個(gè)數(shù)字,之后跟隨 "Done" 字符串
1
2
3
Done
14.2嘲叔、聲明式處理
對(duì)于聲明性方法亡呵,flow 有一個(gè) onCompletion 中間運(yùn)算符,該運(yùn)算符在流完全收集后調(diào)用
前面的示例可以使用 onCompletion 運(yùn)算符重寫硫戈,并生成相同的輸出:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
//sampleStart
foo()
.onCompletion { println("Done") }
.collect { value -> println(value) }
//sampleEnd
}
onCompletion 的主要優(yōu)點(diǎn)是包含一個(gè) lambda 參數(shù)锰什,該 lambda 包含一個(gè)可空的 Throwable 參數(shù),該 Throwable 參數(shù)可用于確定流收集是正常完成還是異常完成丁逝。在以下示例中汁胆,foo() 流在發(fā)出數(shù)字1后引發(fā)異常:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
//sampleStart
fun foo(): Flow<Int> = flow {
emit(1)
throw RuntimeException()
}
fun main() = runBlocking<Unit> {
foo()
.onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
.catch { cause -> println("Caught exception") }
.collect { value -> println(value) }
}
//sampleEnd
如你所料,將打庸邸:
1
Flow completed exceptionally
Caught exception
與 catch 運(yùn)算符不同沦泌,onCompletion 運(yùn)算符不處理異常。正如我們從上面的示例代碼中看到的辛掠,異常仍然會(huì)流向下游谢谦。它將被傳遞給其他完成 onCompletion 運(yùn)算符,并可以使用 catch 運(yùn)算符進(jìn)行處理
14.3萝衩、僅限上游異常
就像 catch 操作符一樣回挽,onCompletion 只看到來自上游的異常,而看不到下游的異常猩谊。例如千劈,運(yùn)行以下代碼:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
//sampleStart
fun foo(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
foo()
.onCompletion { cause -> println("Flow completed with $cause") }
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
//sampleEnd
我們可以看到 completion cause 為空,但流收集失敗并拋出異常:
1
Flow completed with null
Exception in thread "main" java.lang.IllegalStateException: Collected 2
十五牌捷、命令式還是聲明式
現(xiàn)在我們知道如何收集流墙牌,并以命令式和聲明式的方式處理它的完成和異常。這里很自然的就有了個(gè)問題暗甥,應(yīng)該首選哪種方法呢喜滨?為什么?作為一個(gè)庫(kù)撤防,我們不提倡任何特定的方法虽风,并且相信這兩種方式都是有效的,應(yīng)該根據(jù)你自己的偏好和代碼風(fēng)格來選擇
十六、啟動(dòng)流
很容易使用流來表示來自某個(gè)數(shù)據(jù)源的異步事件辜膝。在這種情況下无牵,我們需要一個(gè)模擬的 addEventListener 函數(shù),該函數(shù)將一段代碼注冊(cè)為對(duì)傳入事件的響應(yīng)厂抖,并繼續(xù)進(jìn)一步工作茎毁。onEach 運(yùn)算符可以擔(dān)任此角色。然而验游,onEach 是一個(gè)中間運(yùn)算符充岛。我們還需要一個(gè)終端運(yùn)算符來收集數(shù)據(jù)。否則耕蝉,只注冊(cè) onEach 是沒有效果的
如果在 onEach 之后使用 collect 終端運(yùn)算符崔梗,則在 collect 之后的代碼將等待流被收集完成后再運(yùn)行:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
//sampleStart
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.collect() // <--- Collecting the flow waits
println("Done")
}
//sampleEnd
如你所見,將打印
Event: 1
Event: 2
Event: 3
Done
launchIn 終端運(yùn)算符在這里是很實(shí)用的垒在。通過將 collect 替換為 launchIn蒜魄,我們可以在單獨(dú)的協(xié)程中啟動(dòng)收集流數(shù)據(jù)的操作,以便立即繼續(xù)執(zhí)行下一步的代碼:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
//sampleStart
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.launchIn(this) // <--- Launching the flow in a separate coroutine
println("Done")
}
//sampleEnd
運(yùn)行結(jié)果:
Done
Event: 1
Event: 2
Event: 3
launchIn 所需的參數(shù)用于指定啟動(dòng)用于收集流的協(xié)程的作用域场躯。在上面的示例中谈为,此作用域來自 runBlocking,因此當(dāng)流運(yùn)行時(shí)踢关,runBlocking 作用域等待其子協(xié)程完成伞鲫,并阻止主函數(shù)返回和終止此示例代碼
在實(shí)際應(yīng)用程序中,作用域?qū)碜陨芷谑怯邢薜膶?shí)體签舞。一旦此實(shí)體的生命周期終止秕脓,相應(yīng)的作用域?qū)⒈蝗∠瑥亩∠鄳?yīng)流的收集儒搭。onEach { ... }.launchIn(scope) 的工作方式與 addEventListener 類似吠架。但是,不需要相應(yīng)的 removeEventListener 函數(shù)搂鲫,因?yàn)?cancellation 和結(jié)構(gòu)化并發(fā)可以達(dá)到這個(gè)目的
請(qǐng)注意傍药,launchIn 還返回一個(gè) Job 對(duì)象,該 Job 僅可用于取消相應(yīng)的流數(shù)據(jù)收集協(xié)程魂仍,而不取消整個(gè)作用域或加入它
十七拐辽、Flow and Reactive Streams
For those who are familiar with Reactive Streams or reactive frameworks such as RxJava and project Reactor, design of the Flow may look very familiar.
Indeed, its design was inspired by Reactive Streams and its various implementations. But Flow main goal is to have as simple design as possible, be Kotlin and suspension friendly and respect structured concurrency. Achieving this goal would be impossible without reactive pioneers and their tremendous work. You can read the complete story in Reactive Streams and Kotlin Flows article.
While being different, conceptually, Flow is a reactive stream and it is possible to convert it to the reactive (spec and TCK compliant) Publisher and vice versa. Such converters are provided by kotlinx.coroutines out-of-the-box and can be found in corresponding reactive modules (kotlinx-coroutines-reactive for Reactive Streams, kotlinx-coroutines-reactor for Project Reactor and kotlinx-coroutines-rx2 for RxJava2). Integration modules include conversions from and to Flow, integration with Reactor's Context and suspension-friendly ways to work with various reactive entities.