譯者前言
這是協(xié)程官方文檔中早期的一篇關(guān)于響應(yīng)式流與協(xié)程關(guān)系的一篇指南听诸,2019 年下半年后,由于協(xié)程推出了自己的異步流 API蚕泽,
所以這篇指南已經(jīng)被官方刪除晌梨,但當(dāng)年本人翻譯這篇指南的時候還是花了不少功夫,所以特此在這里保留下來须妻,以備在以后需要的時候參考仔蝌。
響應(yīng)式流與協(xié)程指南
-->協(xié)程指南中的基礎(chǔ)協(xié)程概念不是必須的,
在 kotlinx.coroutines
項目中有一系列和響應(yīng)式流相關(guān)的模塊:
- kotlinx-coroutines-reactive ——為 Reactive Streams 提供的適配
- kotlinx-coroutines-reactor ——為 Reactor 提供的適配
- kotlinx-coroutines-rx2 ——為 RxJava 2.x 提供的適配
本指南主要基于 Reactive Streams 的規(guī)范并使用
Publisher
接口和一些基于 RxJava 2.x 的示例荒吏,
該示例實現(xiàn)了響應(yīng)式流的規(guī)范敛惊。
歡迎你在 Github 上 clone
kotlinx.coroutines
項目
reactive/kotlinx-coroutines-rx2/test/guide
路徑中。
目錄
響應(yīng)式流與通道的區(qū)別
本節(jié)主要包含響應(yīng)式流與以協(xié)程為基礎(chǔ)的通道的不同點绰更。
迭代的基礎(chǔ)
[Channel] 與如下所示的響應(yīng)式流類有類似的概念:
- Reactive stream Publisher瞧挤;
- Rx Java 1.x Observable锡宋;
- Rx Java 2.x Flowable,
Publisher
的實現(xiàn)者特恬。
它們都描述了一個異步的有限或無限的元素流(在 Rx 中又名 items)员辩,
并且都支持背壓。
在通道中每調(diào)用一次 [receive][ReceiveChannel.receive] 就消費一個元素鸵鸥。
讓我們用以下的例子來說明:
fun main() = runBlocking<Unit> {
// 創(chuàng)建一個通道,該通道每200毫秒生產(chǎn)一個數(shù)字丹皱,從 1 到 3
val source = produce<Int> {
println("Begin") // 在輸出中標(biāo)記協(xié)程開始運行
for (x in 1..3) {
delay(200) // 等待 200 毫秒
send(x) // 將數(shù)字 x 發(fā)送到通道中
}
}
// 從 source 中打印元素
println("Elements:")
source.consumeEach { // 在 source 中消費元素
println(it)
}
// 再次從 source 中打印元素
println("Again:")
source.consumeEach { // 從 source 中消費元素
println(it)
}
}
可以在這里獲取完整代碼妒穴。
這段代碼產(chǎn)生了如下輸出:
Elements:
Begin
1
2
3
Again:
注意,“Begin”只被打印了一次摊崭,因為 [produce] 協(xié)程構(gòu)建器 被執(zhí)行的時候讼油,
只創(chuàng)建了一個協(xié)程來生產(chǎn)元素流。所有被生產(chǎn)的元素都被
[ReceiveChannel.consumeEach][consumeEach]
再次嘗試接收元素將不會接收到任何東西呢簸。
讓我們使用 kotlinx-coroutines-reactive
模塊中的 [publish] 協(xié)程構(gòu)建器 代替 kotlinx-coroutines-core
模塊中的 [produce]
來重寫這段代碼矮台。代碼保持相似,
但是在 source
接收 [ReceiveChannel] 類型的地方根时,現(xiàn)在它接收響應(yīng)式流的
Publisher
類型瘦赫,在 [consumeEach] 被用來 消費 來源于通道中的元素的地方,
現(xiàn)在 [collect][org.reactivestreams.Publisher.collect] 被用來從 publisher 中 收集 元素蛤迎。
fun main() = runBlocking<Unit> {
// 創(chuàng)建一個 publisher确虱,每 200 毫秒生產(chǎn)一個數(shù)字,從 1 到 3
val source = publish<Int> {
// ^^^^^^^ <--- 這里與先前的示例不同
println("Begin") // 在輸出中標(biāo)記協(xié)程開始運行
for (x in 1..3) {
delay(200) // 等待 200 毫秒
send(x) // 將數(shù)字 x 發(fā)送到通道中
}
}
// 從 source 中打印元素
println("Elements:")
source.collect { // 收集元素 it
println(it)
}
// 再次從 source 中打印元素
println("Again:")
source.collect { // 收集元素 it
println(it)
}
}
可以在這里獲取完整代碼替裆。
現(xiàn)在這段代碼的輸出變?yōu)椋?/p>
Elements:
Begin
1
2
3
Again:
Begin
1
2
3
-->在流中生產(chǎn)元素的方案校辩。當(dāng) 收集 發(fā)生時,它成為元素的實際流辆童。每個收集器都將接收到一個相同或不同的流宜咒,這取決于取決于 Publisher
的相應(yīng)實現(xiàn)如何工作。
[publish] 協(xié)程構(gòu)建器在上面的示例中沒有啟動協(xié)程把鉴,
但是每個 [collect][org.reactivestreams.Publisher.collect] 調(diào)用都會啟動一個協(xié)程故黑。
在這里有兩個 [publish],這也就是為什么我們看到了“Begin”被打印了兩次纸镊。
注意倍阐,我們可以使用 Rx 中的
publish
操作符與 connect
方法來替換我們在通道中所看到的類似的行為。
訂閱與取消
在先前小節(jié)的第二個示例中逗威,source.collect { ... }
用于收集所有的元素峰搪。
相反,我們可以使用 [openSubscription][org.reactivestreams.Publisher.openSubscription]
fun main() = runBlocking<Unit> {
val source = Flowable.range(1, 5) // 五個數(shù)字的區(qū)間
.doOnSubscribe { println("OnSubscribe") } // 提供了一些可被觀察的點
.doOnComplete { println("OnComplete") } // ...
.doFinally { println("Finally") } // ... 在正在執(zhí)行的代碼中
var cnt = 0
source.openSubscription().consume { // 在源中打開通道
for (x in this) { // 迭代通道以從中接收元素
println(x)
if (++cnt >= 3) break // 當(dāng)三個元素被打印出來的時候凯旭,執(zhí)行 break
}
// 注意:當(dāng)這段代碼執(zhí)行完成并阻塞的時候 `consume` 取消了該通道
}
}
可以在這里獲取完整代碼概耻。
它將產(chǎn)生如下輸出:
OnSubscribe
1
2
3
Finally
使用一個顯式的 openSubscription
我們應(yīng)該從相應(yīng)的訂閱源 [cancel][ReceiveChannel.cancel]
訂閱使套,但是這里不需要顯式調(diào)用 cancel
——
[consume] 會在內(nèi)部為我們做這些事。
配置
doFinally
監(jiān)聽器并打印“Finally”來確認訂閱確實被取消了鞠柄。注意“OnComplete”
永遠不會被打印侦高,因為我們沒有消費所有的元素。
如果我們 collect
所有的元素厌杜,那我們不需要使用顯式的 cancel
:
fun main() = runBlocking<Unit> {
val source = Flowable.range(1, 5) // 五個數(shù)字的區(qū)間
.doOnSubscribe { println("OnSubscribe") } // 提供了一些可被觀察的點
.doOnComplete { println("OnComplete") } // ……
.doFinally { println("Finally") } // …… 在正在執(zhí)行的代碼中
// collect the source fully
source.collect { println(it) }
}
可以在這里獲取完整代碼奉呛。
我們得到如下輸出:
OnSubscribe
1
2
3
OnComplete
Finally
4
5
注意,如何使“OnComplete”與“Finally”在最后的元素“4”與“5”之前輸出夯尽。
在這個示例中它將發(fā)生在我們的 main
函數(shù)在協(xié)程中執(zhí)行時瞧壮,使用 [runBlocking] 協(xié)程構(gòu)建器來啟動它。
我們的主協(xié)程在 flowable 中使用 source.collect { …… }
擴展函數(shù)來接收通道匙握。
當(dāng)它等待源發(fā)射元素的時候該主協(xié)程是 掛起的 咆槽,
當(dāng)最后一個元素被 Flowable.range(1, 5)
發(fā)射時它
恢復(fù) 了主協(xié)程,它被分派到主線程上打印出來
最后一個元素在稍后的時間點打印圈纺,而 source 執(zhí)行完成并打印“Finally”秦忿。
背壓
在 Rx Java 2.x 中一個支持背壓的類被稱為
Flowable。
在下面的示例中蛾娶,我們可以使用 kotlinx-coroutines-rx2
模塊中的協(xié)程構(gòu)建器 [rxFlowable] 來定義一個
發(fā)送從 1 到 3 三個整數(shù)的 flowable灯谣。
在調(diào)用掛起的 [send][SendChannel.send] 函數(shù)之前,
它在輸出中打印了一條消息茫叭,所以我們可以來研究它是如何操作的酬屉。
這些整數(shù)在主線程的上下文中被產(chǎn)生,
但是在使用 Rx 的
observeOn
操作符后緩沖區(qū)大小為 1 的訂閱被轉(zhuǎn)移到了另一個線程揍愁。
為了模擬訂閱者很慢呐萨,它使用了 Thread.sleep
來模擬消耗 500 毫秒來處理每個元素。
fun main() = runBlocking<Unit> {
// 協(xié)程 —— 在主線程的上下文中快速生成元素
val source = rxFlowable {
for (x in 1..3) {
send(x) // 這是一個掛起函數(shù)
println("Sent $x") // 在成功發(fā)送元素后打印
}
}
// 使用 Rx 讓一個處理速度很慢的訂閱者在另一個線程訂閱
source
.observeOn(Schedulers.io(), false, 1) // 指定緩沖區(qū)大小為 1 個元素
.doOnComplete { println("Complete") }
.subscribe { x ->
Thread.sleep(500) // 處理每個元素消耗 500 毫秒
println("Processed $x")
}
delay(2000) // 掛起主線程幾秒鐘
}
可以在這里獲取完整代碼莽囤。
這段代碼的輸出更好地說明了背壓是如何在協(xié)程中工作的:
Sent 1
Processed 1
Sent 2
Processed 2
Sent 3
Processed 3
Complete
當(dāng)嘗試發(fā)送另一個元素的時候谬擦,我們看到這里的處理者協(xié)程是如何將第一個元素放入緩沖區(qū)并掛起的。
只有當(dāng)消費者處理了第一個元素朽缎,處理者才會發(fā)送第二個元素并恢復(fù)惨远,等等。
Rx 主題 vs 廣播通道
[BroadcastChannel]话肖。在 Rx 中有一種主題——
BehaviorSubject
被用來管理狀態(tài):
fun main() {
val subject = BehaviorSubject.create<String>()
subject.onNext("one")
subject.onNext("two") // 更新 BehaviorSubject 的狀態(tài)北秽,“one”變量被丟棄
// 現(xiàn)在訂閱這個主題并打印所有信息
subject.subscribe(System.out::println)
subject.onNext("three")
subject.onNext("four")
}
可以在這里獲取完整代碼。
這段代碼打印訂閱時主題的當(dāng)前狀態(tài)及其所有后續(xù)更新:
two
three
four
您可以像使用任何其他響應(yīng)式流一樣從協(xié)程訂閱主題:
fun main() = runBlocking<Unit> {
val subject = BehaviorSubject.create<String>()
subject.onNext("one")
subject.onNext("two")
// 現(xiàn)在啟動一個協(xié)程來打印所有東西
GlobalScope.launch(Dispatchers.Unconfined) { // 在不受限的上下文中啟動協(xié)程
subject.collect { println(it) }
}
subject.onNext("three")
subject.onNext("four")
}
可以在這里獲取完整代碼最筒。
結(jié)果是相同的:
two
three
four
這里我們使用 [Dispatchers.Unconfined] 協(xié)程上下文以與 Rx 中的訂閱相同的行為啟動消費協(xié)程贺氓。
協(xié)程的優(yōu)點是很容易獲得單線程 UI 更新的混合行為。
一個典型的 UI 應(yīng)用程序不需要響應(yīng)每一個狀態(tài)改變床蜘。只有最近的狀態(tài)需要被響應(yīng)辙培。
應(yīng)用程序狀態(tài)的一系列背靠背更新只需在UI中反映一次蔑水,
-->并釋放主線程:
fun main() = runBlocking<Unit> {
val subject = BehaviorSubject.create<String>()
subject.onNext("one")
subject.onNext("two")
// 現(xiàn)在啟動一個協(xié)程來打印最近的更新
launch { // 為協(xié)程使用主線程的上下文
subject.collect { println(it) }
}
subject.onNext("three")
subject.onNext("four")
yield() // 使主線程讓步來啟動協(xié)程 <--- 這里
subject.onComplete() // 現(xiàn)在也結(jié)束主題的序列來取消消費者
}
可以在這里獲取完整代碼。
現(xiàn)在協(xié)程只處理(打友锶铩)最近的更新:
four
沒有橋接到響應(yīng)式流:
fun main() = runBlocking<Unit> {
val broadcast = ConflatedBroadcastChannel<String>()
broadcast.offer("one")
broadcast.offer("two")
// 現(xiàn)在啟動一個協(xié)程來打印最近的更新
launch { // 為協(xié)程使用主線程的上下文
broadcast.consumeEach { println(it) }
}
broadcast.offer("three")
broadcast.offer("four")
yield() // 使主線程讓步來啟動協(xié)程
broadcast.close() // 現(xiàn)在也結(jié)束主題的序列來取消消費者
}
可以在這里獲取完整代碼搀别。
它與基于 BehaviorSubject
的先前的示例產(chǎn)生了相同的輸出:
four
操作符
-->以及反轉(zhuǎn)來處理相關(guān)的流尾抑。創(chuàng)建你自己的并且支持背壓的操作符是非常臭名昭著以及困難的歇父。
協(xié)程與通道則被設(shè)計為提供完全相反的體驗。這里沒有內(nèi)建的操作符再愈,
但是處理元素流是非常簡單并且自動支持背壓的庶骄,
即使是在你沒有明確思考這一點的情況下。
本節(jié)將展示以協(xié)程為基礎(chǔ)而實現(xiàn)的一系列響應(yīng)式流操作符践磅。
Range
讓我們推出自己的為響應(yīng)式流 Publisher
接口實現(xiàn)的
range
-->這篇博客中。
它需要很多代碼灸异。
以下是與協(xié)同程序相對應(yīng)的代碼:
fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
for (x in start until start + count) send(x)
}
-->的小型響應(yīng)式流庫府适。
直接在協(xié)程中使用:
fun main() = runBlocking<Unit> {
// Range 從 runBlocking 中承襲了父 job,但是使用 Dispatchers.Default 來覆蓋調(diào)度器
range(Dispatchers.Default, 1, 5).collect { println(it) }
}
可以在這里獲取完整代碼肺樟。
這段代碼的結(jié)果非常值得我們期待:
1
2
3
4
5
熔合 filter 與 map 操作符
fun <T, R> Publisher<T>.fusedFilterMap(
context: CoroutineContext, // 協(xié)程執(zhí)行的上下文
predicate: (T) -> Boolean, // 過濾器 predicate
mapper: (T) -> R // mapper 函數(shù)
) = publish<R>(context) {
collect { // 收集源流
if (predicate(it)) // 過濾的部分
send(mapper(it)) // 轉(zhuǎn)換的部分
}
}
使用先前 range
中的示例我們可以測試我們的 fusedFilterMap
來過濾偶數(shù)以及將它們映射到字符串:
fun main() = runBlocking<Unit> {
range(1, 5)
.fusedFilterMap(Dispatchers.Unconfined, { it % 2 == 0}, { "$it is even" })
.collect { println(it) } // 打印所有的字符串結(jié)果
}
可以在這里獲取完整代碼檐春。
不難看出其結(jié)果會是:
2 is even
4 is even
Take until
我們來實現(xiàn)自己的
takeUntil
操作符。這非常棘手么伯,
因為需要跟蹤和管理對兩個流的訂閱疟暖。
fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) {
this@takeUntil.openSubscription().consume { // 顯式地打開 Publisher<T> 的通道
val current = this
other.openSubscription().consume { // 顯式地打開 Publisher<U> 的通道
val other = this
whileSelect {
other.onReceive { false } // 釋放任何從 `other` 接收到的元素
current.onReceive { send(it); true } // 在這個通道上重新發(fā)送元素并繼續(xù)
}
}
}
}
這段代碼使用 [whileSelect] 作為比 while(select{...}) {}
循環(huán)更好的快捷方式,并且 Kotlin 的
[consume] 表達式會在退出時關(guān)閉通道田柔,并取消訂閱相應(yīng)的發(fā)布者俐巴。
fun CoroutineScope.rangeWithInterval(time: Long, start: Int, count: Int) = publish<Int> {
for (x in start until start + count) {
delay(time) // 在每次發(fā)送數(shù)字之前等待
send(x)
}
}
下面的代碼展示了 takeUntil
是如何工作的:
fun main() = runBlocking<Unit> {
val slowNums = rangeWithInterval(200, 1, 10) // 數(shù)字之間有 200 毫秒的間隔
val stop = rangeWithInterval(500, 1, 10) // 第一個在 500 毫秒之后
slowNums.takeUntil(Dispatchers.Unconfined, stop).collect { println(it) } // 讓我們測試它
}
可以在這里獲取完整代碼。
執(zhí)行
1
2
Merge
使用協(xié)程處理多個數(shù)據(jù)流總是至少有兩種方法硬爆。一種方法是調(diào)用
merge
操作符來使用第二種的方法:
fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = publish<T>(context) {
collect { pub -> // for each publisher collected
launch { // launch a child coroutine
pub.collect { send(it) } // resend all element from this publisher
}
}
}
-->并且當(dāng) publish
協(xié)程被取消或以其它的方式執(zhí)行完畢時將會被取消欣舵。
fun CoroutineScope.testPub() = publish<Publisher<Int>> {
send(rangeWithInterval(250, 1, 4)) // 數(shù)字 1 在 250 毫秒發(fā)射,2 在 500 毫秒缀磕,3 在 750 毫秒缘圈,4 在 1000 毫秒
delay(100) // 等待 100 毫秒
send(rangeWithInterval(500, 11, 3)) // 數(shù)字 11 在 600 毫秒,12 在 1100 毫秒袜蚕,13 在 1600 毫秒
delay(1100) // 在啟動完成后的 1.2 秒之后等待 1.1 秒
}
這段測試代碼在 testPub
上使用了 merge
并且展示結(jié)果:
fun main() = runBlocking<Unit> {
testPub().merge(Dispatchers.Unconfined).collect { println(it) } // 打印整個流
}
可以在這里獲取完整代碼糟把。
并且結(jié)果應(yīng)該是:
1
2
11
3
4
12
13
協(xié)程上下文
所有的示例操作符都在先前的示例中顯式地設(shè)置了
CoroutineContext
線程與 Rx
下面的示例中展示了基本的在 Rx 中管理線程上下文。
這里的 rangeWithIntervalRx
是rangeWithInterval
函數(shù)使用 Rx 的
zip
牲剃、range
以及 interval
操作符的一個實現(xiàn)遣疯。
fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
Flowable.zip(
Flowable.range(start, count),
Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
BiFunction { x, _ -> x })
fun main() {
rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
.subscribe { println("$it on thread ${Thread.currentThread().name}") }
Thread.sleep(1000)
}
可以在這里獲取完整代碼。
我們顯式地通過
Schedulers.computation()
1 on thread RxComputationThreadPool-1
2 on thread RxComputationThreadPool-1
3 on thread RxComputationThreadPool-1
線程與協(xié)程
在協(xié)程的世界中 Schedulers.computation()
大致對應(yīng)于 [Dispatchers.Default]颠黎,
所以先前的示例將變成下面這樣:
fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
for (x in start until start + count) {
delay(time) // 在每次數(shù)字發(fā)射前等待
send(x)
}
}
fun main() {
Flowable.fromPublisher(rangeWithInterval(Dispatchers.Default, 100, 1, 3))
.subscribe { println("$it on thread ${Thread.currentThread().name}") }
Thread.sleep(1000)
}
可以在這里獲取完整代碼另锋。
產(chǎn)生的輸出將類似于:
1 on thread ForkJoinPool.commonPool-worker-1
2 on thread ForkJoinPool.commonPool-worker-1
3 on thread ForkJoinPool.commonPool-worker-1
這里我們使用了 Rx 的
subscribe
Rx observeOn
在 Rx 中你操作使用了特別的操作符來為調(diào)用鏈修改線程上下文滞项。
如果你不熟悉它的話,
你可以從這篇很棒的教程中獲得指導(dǎo)夭坪。
舉例來說文判,這里使用了
observeOn
操作符。讓我們修改先前的示例并觀察使用 Schedulers.computation()
的效果:
fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
for (x in start until start + count) {
delay(time) // 在每次數(shù)字發(fā)射前等待
send(x)
}
}
fun main() {
Flowable.fromPublisher(rangeWithInterval(Dispatchers.Default, 100, 1, 3))
.observeOn(Schedulers.computation()) // <-- 添加了這一行
.subscribe { println("$it on thread ${Thread.currentThread().name}") }
Thread.sleep(1000)
}
可以在這里獲取完整代碼室梅。
這里的輸出有所不同了戏仓,提示了“RxComputationThreadPool”:
1 on thread RxComputationThreadPool-1
2 on thread RxComputationThreadPool-1
3 on thread RxComputationThreadPool-1
使用協(xié)程上下文來管理它們
替代使用 Rx 的 subscribe
操作符遍歷結(jié)果:
fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
Flowable.zip(
Flowable.range(start, count),
Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
BiFunction { x, _ -> x })
fun main() = runBlocking<Unit> {
rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
.collect { println("$it on thread ${Thread.currentThread().name}") }
}
可以在這里獲取完整代碼。
結(jié)果信息將會被打印在主線程中:
1 on thread main
2 on thread main
3 on thread main
不受限的上下文
-->看到了 subscribe
運算符的示例亡鼠。
在協(xié)程的世界中赏殃,[Dispatchers.Unconfined] 則承擔(dān)了類似的任務(wù)。讓我們修改先前的示例间涵,
-->只是等待的時候使用 [Job.join]:
fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
Flowable.zip(
Flowable.range(start, count),
Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
BiFunction { x, _ -> x })
fun main() = runBlocking<Unit> {
val job = launch(Dispatchers.Unconfined) { // 在不受限的山下文中啟動一個新協(xié)程(沒有它自己的線程池)
rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
.collect { println("$it on thread ${Thread.currentThread().name}") }
}
job.join() // 等待我們的協(xié)程結(jié)束
}
可以在這里獲取完整代碼仁热。
1 on thread RxComputationThreadPool-1
2 on thread RxComputationThreadPool-1
3 on thread RxComputationThreadPool-1
注意,該 [Dispatchers.Unconfined] 上下文應(yīng)該被謹(jǐn)慎使用勾哩。由于降低了局部的堆棧操作以及開銷調(diào)度的減少抗蠢,
如果一個協(xié)程將一個元素發(fā)送到一個通道,那么調(diào)用的線程
[send][SendChannel.send] 可能會開始使用 [Dispatchers.Unconfined] 調(diào)度程序執(zhí)行協(xié)程的代碼思劳。
-->線程切換操作符是非常類似的迅矛。這在 Rx 中是默認正常的,因為操作符經(jīng)常做一些非常小塊的工作并且你必須做一些復(fù)雜處理來合并大量的操作符潜叛。然而秽褒,這對于協(xié)程來說是不常見的,
[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
[Dispatchers.Unconfined]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-unconfined.html
[yield]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/yield.html
[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html
[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html
[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html
[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume-each.html
[ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/index.html
[ReceiveChannel.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/cancel.html
[consume]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume.html
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html
[BroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-broadcast-channel/index.html
[ConflatedBroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-conflated-broadcast-channel/index.html
[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html
[whileSelect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/while-select.html
[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/publish.html
[org.reactivestreams.Publisher.collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/collect.html
[org.reactivestreams.Publisher.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/open-subscription.html
[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html