【譯】協(xié)程與響應(yīng)式流

譯者前言

這是協(xié)程官方文檔中早期的一篇關(guān)于響應(yīng)式流與協(xié)程關(guān)系的一篇指南听诸,2019 年下半年后,由于協(xié)程推出了自己的異步流 API蚕泽,
所以這篇指南已經(jīng)被官方刪除晌梨,但當(dāng)年本人翻譯這篇指南的時候還是花了不少功夫,所以特此在這里保留下來须妻,以備在以后需要的時候參考仔蝌。

響應(yīng)式流與協(xié)程指南

-->協(xié)程指南中的基礎(chǔ)協(xié)程概念不是必須的,

kotlinx.coroutines 項目中有一系列和響應(yīng)式流相關(guān)的模塊:

本指南主要基于 Reactive Streams 的規(guī)范并使用
Publisher 接口和一些基于 RxJava 2.x 的示例荒吏,
該示例實現(xiàn)了響應(yīng)式流的規(guī)范敛惊。

歡迎你在 Github 上 clone
kotlinx.coroutines 項目

reactive/kotlinx-coroutines-rx2/test/guide
路徑中。

目錄

響應(yīng)式流與通道的區(qū)別

本節(jié)主要包含響應(yīng)式流與以協(xié)程為基礎(chǔ)的通道的不同點绰更。

迭代的基礎(chǔ)

[Channel] 與如下所示的響應(yīng)式流類有類似的概念:

它們都描述了一個異步的有限或無限的元素流(在 Rx 中又名 items)员辩,
并且都支持背壓。

在通道中每調(diào)用一次 [receive][ReceiveChannel.receive] 就消費一個元素鸵鸥。
讓我們用以下的例子來說明:

fun main() = runBlocking<Unit> {
    // 創(chuàng)建一個通道,該通道每200毫秒生產(chǎn)一個數(shù)字丹皱,從 1 到 3
    val source = produce<Int> {
        println("Begin") // 在輸出中標(biāo)記協(xié)程開始運行
        for (x in 1..3) {
            delay(200) // 等待 200 毫秒
            send(x) // 將數(shù)字 x 發(fā)送到通道中
        }
    }
    // 從 source 中打印元素
    println("Elements:")
    source.consumeEach { // 在 source 中消費元素
        println(it)
    }
    // 再次從 source 中打印元素
    println("Again:")
    source.consumeEach { // 從 source 中消費元素
        println(it)
    }
}

可以在這里獲取完整代碼妒穴。

這段代碼產(chǎn)生了如下輸出:

Elements:
Begin
1
2
3
Again:

注意,“Begin”只被打印了一次摊崭,因為 [produce] 協(xié)程構(gòu)建器 被執(zhí)行的時候讼油,
只創(chuàng)建了一個協(xié)程來生產(chǎn)元素流。所有被生產(chǎn)的元素都被
[ReceiveChannel.consumeEach][consumeEach]

再次嘗試接收元素將不會接收到任何東西呢簸。

讓我們使用 kotlinx-coroutines-reactive 模塊中的 [publish] 協(xié)程構(gòu)建器 代替 kotlinx-coroutines-core 模塊中的 [produce]
來重寫這段代碼矮台。代碼保持相似,
但是在 source 接收 [ReceiveChannel] 類型的地方根时,現(xiàn)在它接收響應(yīng)式流的
Publisher
類型瘦赫,在 [consumeEach] 被用來 消費 來源于通道中的元素的地方,
現(xiàn)在 [collect][org.reactivestreams.Publisher.collect] 被用來從 publisher 中 收集 元素蛤迎。

fun main() = runBlocking<Unit> {
    // 創(chuàng)建一個 publisher确虱,每 200 毫秒生產(chǎn)一個數(shù)字,從 1 到 3
    val source = publish<Int> {
    //           ^^^^^^^  <--- 這里與先前的示例不同
        println("Begin") // 在輸出中標(biāo)記協(xié)程開始運行
        for (x in 1..3) {
            delay(200) // 等待 200 毫秒
            send(x) // 將數(shù)字 x 發(fā)送到通道中
        }
    }
    // 從 source 中打印元素
    println("Elements:")
    source.collect { // 收集元素 it
        println(it)
    }
    // 再次從 source 中打印元素
    println("Again:")
    source.collect { // 收集元素 it
        println(it)
    }
}

可以在這里獲取完整代碼替裆。

現(xiàn)在這段代碼的輸出變?yōu)椋?/p>

Elements:
Begin
1
2
3
Again:
Begin
1
2
3

-->在流中生產(chǎn)元素的方案校辩。當(dāng) 收集 發(fā)生時,它成為元素的實際流辆童。每個收集器都將接收到一個相同或不同的流宜咒,這取決于取決于 Publisher 的相應(yīng)實現(xiàn)如何工作。

[publish] 協(xié)程構(gòu)建器在上面的示例中沒有啟動協(xié)程把鉴,
但是每個 [collect][org.reactivestreams.Publisher.collect] 調(diào)用都會啟動一個協(xié)程故黑。
在這里有兩個 [publish],這也就是為什么我們看到了“Begin”被打印了兩次纸镊。

注意倍阐,我們可以使用 Rx 中的
publish
操作符與 connect
方法來替換我們在通道中所看到的類似的行為。

訂閱與取消

在先前小節(jié)的第二個示例中逗威,source.collect { ... } 用于收集所有的元素峰搪。
相反,我們可以使用 [openSubscription][org.reactivestreams.Publisher.openSubscription]

fun main() = runBlocking<Unit> {
    val source = Flowable.range(1, 5) // 五個數(shù)字的區(qū)間
        .doOnSubscribe { println("OnSubscribe") } // 提供了一些可被觀察的點
        .doOnComplete { println("OnComplete") }   // ...
        .doFinally { println("Finally") }         // ... 在正在執(zhí)行的代碼中
    var cnt = 0 
    source.openSubscription().consume { // 在源中打開通道
        for (x in this) { // 迭代通道以從中接收元素
            println(x)
            if (++cnt >= 3) break // 當(dāng)三個元素被打印出來的時候凯旭,執(zhí)行 break
        }
        // 注意:當(dāng)這段代碼執(zhí)行完成并阻塞的時候 `consume` 取消了該通道
    }
}

可以在這里獲取完整代碼概耻。

它將產(chǎn)生如下輸出:

OnSubscribe
1
2
3
Finally

使用一個顯式的 openSubscription 我們應(yīng)該從相應(yīng)的訂閱源 [cancel][ReceiveChannel.cancel]
訂閱使套,但是這里不需要顯式調(diào)用 cancel——
[consume] 會在內(nèi)部為我們做這些事。
配置
doFinally
監(jiān)聽器并打印“Finally”來確認訂閱確實被取消了鞠柄。注意“OnComplete”
永遠不會被打印侦高,因為我們沒有消費所有的元素。

如果我們 collect 所有的元素厌杜,那我們不需要使用顯式的 cancel

fun main() = runBlocking<Unit> {
    val source = Flowable.range(1, 5) // 五個數(shù)字的區(qū)間
        .doOnSubscribe { println("OnSubscribe") } // 提供了一些可被觀察的點
        .doOnComplete { println("OnComplete") }   // ……
        .doFinally { println("Finally") }         // …… 在正在執(zhí)行的代碼中
    // collect the source fully
    source.collect { println(it) }
}

可以在這里獲取完整代碼奉呛。

我們得到如下輸出:

OnSubscribe
1
2
3
OnComplete
Finally
4
5

注意,如何使“OnComplete”與“Finally”在最后的元素“4”與“5”之前輸出夯尽。
在這個示例中它將發(fā)生在我們的 main
函數(shù)在協(xié)程中執(zhí)行時瞧壮,使用 [runBlocking] 協(xié)程構(gòu)建器來啟動它。
我們的主協(xié)程在 flowable 中使用 source.collect { …… } 擴展函數(shù)來接收通道匙握。
當(dāng)它等待源發(fā)射元素的時候該主協(xié)程是 掛起的 咆槽,
當(dāng)最后一個元素被 Flowable.range(1, 5) 發(fā)射時它
恢復(fù) 了主協(xié)程,它被分派到主線程上打印出來
最后一個元素在稍后的時間點打印圈纺,而 source 執(zhí)行完成并打印“Finally”秦忿。

背壓

在 Rx Java 2.x 中一個支持背壓的類被稱為
Flowable
在下面的示例中蛾娶,我們可以使用 kotlinx-coroutines-rx2 模塊中的協(xié)程構(gòu)建器 [rxFlowable] 來定義一個
發(fā)送從 1 到 3 三個整數(shù)的 flowable灯谣。
在調(diào)用掛起的 [send][SendChannel.send] 函數(shù)之前,
它在輸出中打印了一條消息茫叭,所以我們可以來研究它是如何操作的酬屉。

這些整數(shù)在主線程的上下文中被產(chǎn)生,
但是在使用 Rx 的
observeOn
操作符后緩沖區(qū)大小為 1 的訂閱被轉(zhuǎn)移到了另一個線程揍愁。
為了模擬訂閱者很慢呐萨,它使用了 Thread.sleep 來模擬消耗 500 毫秒來處理每個元素。

fun main() = runBlocking<Unit> { 
    // 協(xié)程 —— 在主線程的上下文中快速生成元素
    val source = rxFlowable {
        for (x in 1..3) {
            send(x) // 這是一個掛起函數(shù)
            println("Sent $x") // 在成功發(fā)送元素后打印
        }
    }
    // 使用 Rx 讓一個處理速度很慢的訂閱者在另一個線程訂閱
    source
        .observeOn(Schedulers.io(), false, 1) // 指定緩沖區(qū)大小為 1 個元素
        .doOnComplete { println("Complete") }
        .subscribe { x ->
            Thread.sleep(500) // 處理每個元素消耗 500 毫秒
            println("Processed $x")
        }
    delay(2000) // 掛起主線程幾秒鐘
}

可以在這里獲取完整代碼莽囤。

這段代碼的輸出更好地說明了背壓是如何在協(xié)程中工作的:

Sent 1
Processed 1
Sent 2
Processed 2
Sent 3
Processed 3
Complete

當(dāng)嘗試發(fā)送另一個元素的時候谬擦,我們看到這里的處理者協(xié)程是如何將第一個元素放入緩沖區(qū)并掛起的。
只有當(dāng)消費者處理了第一個元素朽缎,處理者才會發(fā)送第二個元素并恢復(fù)惨远,等等。

Rx 主題 vs 廣播通道

[BroadcastChannel]话肖。在 Rx 中有一種主題——
BehaviorSubject
被用來管理狀態(tài):

fun main() {
    val subject = BehaviorSubject.create<String>()
    subject.onNext("one")
    subject.onNext("two") // 更新 BehaviorSubject 的狀態(tài)北秽,“one”變量被丟棄
    // 現(xiàn)在訂閱這個主題并打印所有信息
    subject.subscribe(System.out::println)
    subject.onNext("three")
    subject.onNext("four")
}

可以在這里獲取完整代碼。

這段代碼打印訂閱時主題的當(dāng)前狀態(tài)及其所有后續(xù)更新:

two
three
four

您可以像使用任何其他響應(yīng)式流一樣從協(xié)程訂閱主題:

fun main() = runBlocking<Unit> {
    val subject = BehaviorSubject.create<String>()
    subject.onNext("one")
    subject.onNext("two")
    // 現(xiàn)在啟動一個協(xié)程來打印所有東西
    GlobalScope.launch(Dispatchers.Unconfined) { // 在不受限的上下文中啟動協(xié)程
        subject.collect { println(it) }
    }
    subject.onNext("three")
    subject.onNext("four")
}

可以在這里獲取完整代碼最筒。

結(jié)果是相同的:

two
three
four

這里我們使用 [Dispatchers.Unconfined] 協(xié)程上下文以與 Rx 中的訂閱相同的行為啟動消費協(xié)程贺氓。

協(xié)程的優(yōu)點是很容易獲得單線程 UI 更新的混合行為。
一個典型的 UI 應(yīng)用程序不需要響應(yīng)每一個狀態(tài)改變床蜘。只有最近的狀態(tài)需要被響應(yīng)辙培。
應(yīng)用程序狀態(tài)的一系列背靠背更新只需在UI中反映一次蔑水,

-->并釋放主線程:

fun main() = runBlocking<Unit> {
    val subject = BehaviorSubject.create<String>()
    subject.onNext("one")
    subject.onNext("two")
    // 現(xiàn)在啟動一個協(xié)程來打印最近的更新
    launch { // 為協(xié)程使用主線程的上下文
        subject.collect { println(it) }
    }
    subject.onNext("three")
    subject.onNext("four")
    yield() // 使主線程讓步來啟動協(xié)程 <--- 這里
    subject.onComplete() // 現(xiàn)在也結(jié)束主題的序列來取消消費者
}

可以在這里獲取完整代碼。

現(xiàn)在協(xié)程只處理(打友锶铩)最近的更新:

four

沒有橋接到響應(yīng)式流:

fun main() = runBlocking<Unit> {
    val broadcast = ConflatedBroadcastChannel<String>()
    broadcast.offer("one")
    broadcast.offer("two")
    // 現(xiàn)在啟動一個協(xié)程來打印最近的更新
    launch { // 為協(xié)程使用主線程的上下文
        broadcast.consumeEach { println(it) }
    }
    broadcast.offer("three")
    broadcast.offer("four")
    yield() // 使主線程讓步來啟動協(xié)程
    broadcast.close() // 現(xiàn)在也結(jié)束主題的序列來取消消費者
}

可以在這里獲取完整代碼搀别。

它與基于 BehaviorSubject 的先前的示例產(chǎn)生了相同的輸出:

four

PublishSubject

操作符

-->以及反轉(zhuǎn)來處理相關(guān)的流尾抑。創(chuàng)建你自己的并且支持背壓的操作符是非常臭名昭著以及困難的歇父。

協(xié)程與通道則被設(shè)計為提供完全相反的體驗。這里沒有內(nèi)建的操作符再愈,
但是處理元素流是非常簡單并且自動支持背壓的庶骄,
即使是在你沒有明確思考這一點的情況下。

本節(jié)將展示以協(xié)程為基礎(chǔ)而實現(xiàn)的一系列響應(yīng)式流操作符践磅。

Range

讓我們推出自己的為響應(yīng)式流 Publisher 接口實現(xiàn)的
range

-->這篇博客中。
它需要很多代碼灸异。
以下是與協(xié)同程序相對應(yīng)的代碼:

fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
    for (x in start until start + count) send(x)
}

-->的小型響應(yīng)式流庫府适。

直接在協(xié)程中使用:

fun main() = runBlocking<Unit> {
    // Range 從 runBlocking 中承襲了父 job,但是使用 Dispatchers.Default 來覆蓋調(diào)度器
    range(Dispatchers.Default, 1, 5).collect { println(it) }
}

可以在這里獲取完整代碼肺樟。

這段代碼的結(jié)果非常值得我們期待:

1
2
3
4
5

熔合 filter 與 map 操作符

響應(yīng)式操作符比如:
filter 以及
map

fun <T, R> Publisher<T>.fusedFilterMap(
    context: CoroutineContext,   // 協(xié)程執(zhí)行的上下文
    predicate: (T) -> Boolean,   // 過濾器 predicate
    mapper: (T) -> R             // mapper 函數(shù)
) = publish<R>(context) {
    collect {                    // 收集源流
        if (predicate(it))       // 過濾的部分
            send(mapper(it))     // 轉(zhuǎn)換的部分
    }        
}

使用先前 range 中的示例我們可以測試我們的 fusedFilterMap
來過濾偶數(shù)以及將它們映射到字符串:

fun main() = runBlocking<Unit> {
   range(1, 5)
       .fusedFilterMap(Dispatchers.Unconfined, { it % 2 == 0}, { "$it is even" })
       .collect { println(it) } // 打印所有的字符串結(jié)果
}

可以在這里獲取完整代碼檐春。

不難看出其結(jié)果會是:

2 is even
4 is even

Take until

我們來實現(xiàn)自己的
takeUntil
操作符。這非常棘手么伯,
因為需要跟蹤和管理對兩個流的訂閱疟暖。

fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) {
    this@takeUntil.openSubscription().consume { // 顯式地打開 Publisher<T> 的通道
        val current = this
        other.openSubscription().consume { // 顯式地打開 Publisher<U> 的通道
            val other = this
            whileSelect {
                other.onReceive { false }            // 釋放任何從 `other` 接收到的元素
                current.onReceive { send(it); true } // 在這個通道上重新發(fā)送元素并繼續(xù)
            }
        }
    }
}

這段代碼使用 [whileSelect] 作為比 while(select{...}) {} 循環(huán)更好的快捷方式,并且 Kotlin 的
[consume] 表達式會在退出時關(guān)閉通道田柔,并取消訂閱相應(yīng)的發(fā)布者俐巴。

在下面手寫的
range
interval

fun CoroutineScope.rangeWithInterval(time: Long, start: Int, count: Int) = publish<Int> {
    for (x in start until start + count) { 
        delay(time) // 在每次發(fā)送數(shù)字之前等待
        send(x)
    }
}

下面的代碼展示了 takeUntil 是如何工作的:

fun main() = runBlocking<Unit> {
    val slowNums = rangeWithInterval(200, 1, 10)         // 數(shù)字之間有 200 毫秒的間隔
    val stop = rangeWithInterval(500, 1, 10)             // 第一個在 500 毫秒之后
    slowNums.takeUntil(Dispatchers.Unconfined, stop).collect { println(it) } // 讓我們測試它
}

可以在這里獲取完整代碼。

執(zhí)行

1
2

Merge

使用協(xié)程處理多個數(shù)據(jù)流總是至少有兩種方法硬爆。一種方法是調(diào)用

merge
操作符來使用第二種的方法:

fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = publish<T>(context) {
  collect { pub -> // for each publisher collected
      launch {  // launch a child coroutine
          pub.collect { send(it) } // resend all element from this publisher
      }
  }
}

-->并且當(dāng) publish 協(xié)程被取消或以其它的方式執(zhí)行完畢時將會被取消欣舵。

fun CoroutineScope.testPub() = publish<Publisher<Int>> {
    send(rangeWithInterval(250, 1, 4)) // 數(shù)字 1 在 250 毫秒發(fā)射,2 在 500 毫秒缀磕,3 在 750 毫秒缘圈,4 在 1000 毫秒
    delay(100) // 等待 100 毫秒
    send(rangeWithInterval(500, 11, 3)) // 數(shù)字 11 在 600 毫秒,12 在 1100 毫秒袜蚕,13 在 1600 毫秒
    delay(1100) // 在啟動完成后的 1.2 秒之后等待 1.1 秒
}

這段測試代碼在 testPub 上使用了 merge 并且展示結(jié)果:

fun main() = runBlocking<Unit> {
    testPub().merge(Dispatchers.Unconfined).collect { println(it) } // 打印整個流
}

可以在這里獲取完整代碼糟把。

并且結(jié)果應(yīng)該是:

1
2
11
3
4
12
13

協(xié)程上下文

所有的示例操作符都在先前的示例中顯式地設(shè)置了
CoroutineContext

線程與 Rx

下面的示例中展示了基本的在 Rx 中管理線程上下文。
這里的 rangeWithIntervalRxrangeWithInterval 函數(shù)使用 Rx 的
zip牲剃、range 以及 interval 操作符的一個實現(xiàn)遣疯。

fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> = 
    Flowable.zip(
        Flowable.range(start, count),
        Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
        BiFunction { x, _ -> x })

fun main() {
    rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
        .subscribe { println("$it on thread ${Thread.currentThread().name}") }
    Thread.sleep(1000)
}

可以在這里獲取完整代碼。

我們顯式地通過
Schedulers.computation()

1 on thread RxComputationThreadPool-1
2 on thread RxComputationThreadPool-1
3 on thread RxComputationThreadPool-1

線程與協(xié)程

在協(xié)程的世界中 Schedulers.computation() 大致對應(yīng)于 [Dispatchers.Default]颠黎,
所以先前的示例將變成下面這樣:

fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
    for (x in start until start + count) { 
        delay(time) // 在每次數(shù)字發(fā)射前等待
        send(x)
    }
}

fun main() {
    Flowable.fromPublisher(rangeWithInterval(Dispatchers.Default, 100, 1, 3))
        .subscribe { println("$it on thread ${Thread.currentThread().name}") }
    Thread.sleep(1000)
}

可以在這里獲取完整代碼另锋。

產(chǎn)生的輸出將類似于:

1 on thread ForkJoinPool.commonPool-worker-1
2 on thread ForkJoinPool.commonPool-worker-1
3 on thread ForkJoinPool.commonPool-worker-1

這里我們使用了 Rx 的
subscribe

Rx observeOn

在 Rx 中你操作使用了特別的操作符來為調(diào)用鏈修改線程上下文滞项。
如果你不熟悉它的話,
你可以從這篇很棒的教程中獲得指導(dǎo)夭坪。

舉例來說文判,這里使用了
observeOn
操作符。讓我們修改先前的示例并觀察使用 Schedulers.computation() 的效果:

fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
    for (x in start until start + count) { 
        delay(time) // 在每次數(shù)字發(fā)射前等待
        send(x)
    }
}

fun main() {
    Flowable.fromPublisher(rangeWithInterval(Dispatchers.Default, 100, 1, 3))
        .observeOn(Schedulers.computation())                           // <-- 添加了這一行
        .subscribe { println("$it on thread ${Thread.currentThread().name}") }
    Thread.sleep(1000)
}

可以在這里獲取完整代碼室梅。

這里的輸出有所不同了戏仓,提示了“RxComputationThreadPool”:

1 on thread RxComputationThreadPool-1
2 on thread RxComputationThreadPool-1
3 on thread RxComputationThreadPool-1

使用協(xié)程上下文來管理它們

替代使用 Rx 的 subscribe 操作符遍歷結(jié)果:

fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
    Flowable.zip(
        Flowable.range(start, count),
        Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
        BiFunction { x, _ -> x })

fun main() = runBlocking<Unit> {
    rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
        .collect { println("$it on thread ${Thread.currentThread().name}") }
}

可以在這里獲取完整代碼。

結(jié)果信息將會被打印在主線程中:

1 on thread main
2 on thread main
3 on thread main

不受限的上下文

-->看到了 subscribe 運算符的示例亡鼠。

在協(xié)程的世界中赏殃,[Dispatchers.Unconfined] 則承擔(dān)了類似的任務(wù)。讓我們修改先前的示例间涵,

-->只是等待的時候使用 [Job.join]:

fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
    Flowable.zip(
        Flowable.range(start, count),
        Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
        BiFunction { x, _ -> x })

fun main() = runBlocking<Unit> {
    val job = launch(Dispatchers.Unconfined) { // 在不受限的山下文中啟動一個新協(xié)程(沒有它自己的線程池)
        rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
            .collect { println("$it on thread ${Thread.currentThread().name}") }
    }
    job.join() // 等待我們的協(xié)程結(jié)束
}

可以在這里獲取完整代碼仁热。

1 on thread RxComputationThreadPool-1
2 on thread RxComputationThreadPool-1
3 on thread RxComputationThreadPool-1

注意,該 [Dispatchers.Unconfined] 上下文應(yīng)該被謹(jǐn)慎使用勾哩。由于降低了局部的堆棧操作以及開銷調(diào)度的減少抗蠢,

如果一個協(xié)程將一個元素發(fā)送到一個通道,那么調(diào)用的線程
[send][SendChannel.send] 可能會開始使用 [Dispatchers.Unconfined] 調(diào)度程序執(zhí)行協(xié)程的代碼思劳。

-->線程切換操作符是非常類似的迅矛。這在 Rx 中是默認正常的,因為操作符經(jīng)常做一些非常小塊的工作并且你必須做一些復(fù)雜處理來合并大量的操作符潜叛。然而秽褒,這對于協(xié)程來說是不常見的,



[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
[Dispatchers.Unconfined]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-unconfined.html
[yield]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/yield.html
[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html

[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html
[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html
[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume-each.html
[ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/index.html
[ReceiveChannel.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/cancel.html
[consume]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume.html
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html
[BroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-broadcast-channel/index.html
[ConflatedBroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-conflated-broadcast-channel/index.html

[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html
[whileSelect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/while-select.html


[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/publish.html
[org.reactivestreams.Publisher.collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/collect.html
[org.reactivestreams.Publisher.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/open-subscription.html


[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末威兜,一起剝皮案震驚了整個濱河市销斟,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌椒舵,老刑警劉巖票堵,帶你破解...
    沈念sama閱讀 206,602評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異逮栅,居然都是意外死亡悴势,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,442評論 2 382
  • 文/潘曉璐 我一進店門措伐,熙熙樓的掌柜王于貴愁眉苦臉地迎上來特纤,“玉大人,你說我怎么就攤上這事侥加∨醮妫” “怎么了?”我有些...
    開封第一講書人閱讀 152,878評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長昔穴。 經(jīng)常有香客問我镰官,道長,這世上最難降的妖魔是什么吗货? 我笑而不...
    開封第一講書人閱讀 55,306評論 1 279
  • 正文 為了忘掉前任泳唠,我火速辦了婚禮瓷产,結(jié)果婚禮上溪椎,老公的妹妹穿的比我還像新娘。我一直安慰自己馒吴,他們只是感情好勇垛,可當(dāng)我...
    茶點故事閱讀 64,330評論 5 373
  • 文/花漫 我一把揭開白布脖母。 她就那樣靜靜地躺著,像睡著了一般闲孤。 火紅的嫁衣襯著肌膚如雪谆级。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,071評論 1 285
  • 那天讼积,我揣著相機與錄音哨苛,去河邊找鬼。 笑死币砂,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的玻侥。 我是一名探鬼主播决摧,決...
    沈念sama閱讀 38,382評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼凑兰!你這毒婦竟也來了掌桩?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,006評論 0 259
  • 序言:老撾萬榮一對情侶失蹤姑食,失蹤者是張志新(化名)和其女友劉穎波岛,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體音半,經(jīng)...
    沈念sama閱讀 43,512評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡则拷,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,965評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了曹鸠。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片煌茬。...
    茶點故事閱讀 38,094評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖彻桃,靈堂內(nèi)的尸體忽然破棺而出坛善,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 33,732評論 4 323
  • 正文 年R本政府宣布眠屎,位于F島的核電站剔交,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏改衩。R本人自食惡果不足惜岖常,卻給世界環(huán)境...
    茶點故事閱讀 39,283評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望燎字。 院中可真熱鬧腥椒,春花似錦、人聲如沸候衍。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,286評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蛉鹿。三九已至滨砍,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間妖异,已是汗流浹背惋戏。 一陣腳步聲響...
    開封第一講書人閱讀 31,512評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留他膳,地道東北人响逢。 一個月前我還...
    沈念sama閱讀 45,536評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像棕孙,于是被迫代替她去往敵國和親舔亭。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,828評論 2 345

推薦閱讀更多精彩內(nèi)容