一篇文章理解Kotlin協(xié)程

這篇文章大部分內(nèi)容來自:https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md

這篇教程基于一系列的例子來講解kotlinx.coroutines的核心特性
筆者使用的kotlin版本為1.2.51属瓣,協(xié)程核心庫的版本為0.23.4
注意:協(xié)程庫還處于實(shí)驗(yàn)階段课竣,API是不穩(wěn)定的,謹(jǐn)慎用于生產(chǎn)環(huán)境

簡介&安裝

作為一個(gè)語言胁编,kotlin僅在標(biāo)準(zhǔn)庫里提供最少的底層API华匾,從而讓其他庫能利用協(xié)程构罗。不像其他有相似能力的語言,asyncawait不是kotlin的關(guān)鍵字溪北,甚至不是標(biāo)準(zhǔn)庫的一部分桐绒。

kotlinx.coroutines是一個(gè)非常豐富的庫夺脾,包含若干高層協(xié)程啟動(dòng)機(jī)制(launch,async等)。你需要添加kotlinx-coroutines-core模塊的依賴才能在你的項(xiàng)目中使用這些機(jī)制茉继。

<!-- 筆者寫這篇文章時(shí)劳翰,最新的kotlin版本為1.2.51 -->
<properties>
    <kotlin.version>1.2.51</kotlin.version>
</properties>

<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-core</artifactId>
    <version>0.23.4</version>
</dependency>

基本概念

這個(gè)章節(jié)覆蓋了協(xié)程的基本概念。

你的第一個(gè)協(xié)程

運(yùn)行下面的代碼:

fun main(args: Array<String>) {
    launch { // 在后臺(tái)啟動(dòng)一個(gè)新的協(xié)程馒疹,然后繼續(xù)執(zhí)行
        delay(1000L) // 不阻塞的延遲1s
        println("World!") // 延遲后打印
    }
    println("Hello,") // 當(dāng)協(xié)程延遲時(shí)佳簸,主線程還在跑
    Thread.sleep(2000L) // 阻塞主線程2s,為了讓jvm不掛掉
}

運(yùn)行結(jié)果:

Hello,
World!

本質(zhì)上颖变,協(xié)程是輕量級的線程生均。可以使用launch協(xié)程建造器啟動(dòng)腥刹。你可以將launch { ... }替換為thread { ... }马胧,delay(...)替換為Thread.sleep(...)以達(dá)到相同的效果。試試看衔峰。

如果你只把launch替換為thread佩脊,編譯器會(huì)產(chǎn)生如下錯(cuò)誤:

Suspend functions are only allowed to be called from a coroutine or another suspend function

這是因?yàn)?code>delay是一個(gè)特殊的函數(shù),這里暫且稱之為掛起函數(shù)垫卤,它不會(huì)阻塞線程威彰,但是會(huì)掛起協(xié)程,而且它只能在協(xié)程中使用穴肘。

連接阻塞和非阻塞世界

第一個(gè)例子在同一塊代碼中混合了非阻塞delay(...)和阻塞的Thread.sleep(...)歇盼,很容易就搞暈了哪個(gè)是阻塞的,哪個(gè)是非阻塞的评抚。下面豹缀,我們使用runBlocking協(xié)程建造器,明確指明阻塞:

fun main(args: Array<String>) { 
    launch { // 在后臺(tái)啟動(dòng)一個(gè)新的協(xié)程慨代,然后繼續(xù)執(zhí)行
        delay(1000L)
        println("World!")
    }
    println("Hello,") // 主線程立即繼續(xù)跑
    runBlocking {     // 這塊阻塞了主線程
        delay(2000L)  // 延遲2s邢笙,讓jvm不掛掉
    } 
}

結(jié)果還是一樣的,但是這代碼只用了非阻塞的dalay侍匙。主線程調(diào)用了runBlocking氮惯,然后一直被阻塞,一直到runBlocking執(zhí)行完成丈积。

這個(gè)例子可以改得更符合語言習(xí)慣些筐骇,用runBlocking包裝主函數(shù)的執(zhí)行:

fun main(args: Array<String>) = runBlocking<Unit> { // 開始主協(xié)程
    launch { // 在后臺(tái)啟動(dòng)一個(gè)新的協(xié)程,然后繼續(xù)執(zhí)行
        delay(1000L)
        println("World!")
    }
    println("Hello,") // 主協(xié)程立即繼續(xù)跑
    delay(2000L)      // 延遲2s江滨,讓jvm不掛掉
}

這里runBlocking<Unit> { ... }的作用像一個(gè)適配器铛纬,用來啟動(dòng)頂層的主協(xié)程。明確指定是Unit返回類型唬滑,是因?yàn)橐粋€(gè)格式良好的kotlin主函數(shù)必須返回Unit告唆。

下面是為掛起函數(shù)寫單元測試的方法:

class MyTest {
    @Test
    fun testMySuspendingFunction() = runBlocking<Unit> {
        // 這里我們可以通過任何我們喜歡的斷言風(fēng)格使用掛起函數(shù)
    }
}
等待任務(wù)(job)

當(dāng)另一個(gè)協(xié)程在運(yùn)行時(shí)棺弊,延遲一段時(shí)間并不是一個(gè)好辦法。讓我們明確的等待(非阻塞的方式)擒悬,直到我們啟動(dòng)的后臺(tái)任務(wù)完成:

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch { // 啟動(dòng)一個(gè)新協(xié)程模她,并創(chuàng)建一個(gè)對其任務(wù)的引用
        delay(1000L)
        println("World!")
    }
    println("Hello,")
    job.join() // 等到子協(xié)程完成
}

結(jié)果還是一樣的,但是主協(xié)程和后臺(tái)任務(wù)沒有用后臺(tái)任務(wù)的執(zhí)行時(shí)間聯(lián)系在一起懂牧。好多了侈净。

提取函數(shù)重構(gòu)

讓我們來提取出launch { ... }塊中的代碼到另一個(gè)函數(shù)中。當(dāng)你用“提取函數(shù)”重構(gòu)這塊代碼時(shí)僧凤,你會(huì)得到一個(gè)用suspend修飾的新函數(shù)畜侦。這是你一個(gè)掛起函數(shù)。掛起函數(shù)可用于協(xié)程中躯保,就像使用普通函數(shù)一樣旋膳,但是它們有額外的特性——可以調(diào)用其他的掛起函數(shù)去掛起協(xié)程的執(zhí)行,像這個(gè)例子中的delay途事。

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch { doWorld() }
    println("Hello,")
    job.join()
}

// 這是你第一個(gè)掛起函數(shù)
suspend fun doWorld() {
    delay(1000L)
    println("World!")
}
協(xié)程是輕量級的

運(yùn)行下面的代碼:

fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = List(100_000) { // 啟動(dòng)大量的協(xié)程验懊,并返回它們的任務(wù)
        launch {
            delay(1000L)
            print(".")
        }
    }
    jobs.forEach { it.join() } // 等待其他全部的任務(wù)完成
}

這里啟動(dòng)了十萬個(gè)協(xié)程,一秒之后尸变,每個(gè)協(xié)程打印了一個(gè)點(diǎn)义图。你用線程試試?(很有可能就OOM了)

協(xié)程像守護(hù)線程

下面的代碼啟動(dòng)了一個(gè)長時(shí)間運(yùn)行的協(xié)程振惰,一秒打印兩次"I'm sleeping"歌溉,然后延遲一段后,從主函數(shù)返回:

fun main(args: Array<String>) = runBlocking<Unit> {
    launch {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // 延遲后就退出
}

你運(yùn)行看看骑晶,打印了三行,然后就結(jié)束了:

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...

活躍的協(xié)程并不會(huì)辈莼郏活進(jìn)程桶蛔,所以它更像守護(hù)線程。

取消和超時(shí)

這個(gè)章節(jié)包含了協(xié)程的取消和超時(shí)漫谷。

取消協(xié)程執(zhí)行

在小應(yīng)用中仔雷,從主函數(shù)返回看起來是個(gè)結(jié)束所有協(xié)程的好辦法。在更大的舔示、長時(shí)間運(yùn)行的應(yīng)用中碟婆,需要更細(xì)粒度的控制。launch函數(shù)返回了一個(gè)可以取消協(xié)程執(zhí)行的Job

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // 延遲一小會(huì)
    println("main: I'm tired of waiting!")
    job.cancel() // 取消任務(wù)
    job.join() // 等待任務(wù)結(jié)束
    println("main: Now I can quit.")
}

運(yùn)行惕稻,產(chǎn)生如下輸出:

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.

在調(diào)用job.cancel不久后竖共,因?yàn)閰f(xié)程被取消掉了,所以看不到任何輸出了俺祠。Job的擴(kuò)展函數(shù)cancelAndJoin結(jié)合了canceljoin的作用公给。

取消是需要配合的

協(xié)程的取消是需要配合的借帘,協(xié)程的代碼必須可配合取消。所有kotlinx.coroutines中的掛起函數(shù)都是可取消的淌铐,這些掛起函數(shù)會(huì)檢查協(xié)程的取消狀態(tài)肺然,若已取消則拋出CancellationException。然而腿准,如果協(xié)程正處于運(yùn)算中际起,沒有檢查取消狀態(tài),那么其不可被取消吐葱,如下所示:

fun main(args: Array<String>) = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    val job = launch {
        var nextPrintTime = startTime
        var i = 0
        while (i < 5) { // 浪費(fèi)CPU的循環(huán)運(yùn)算
            // 2秒打印一個(gè)消息
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1300L) // 延遲一會(huì)
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // 取消任務(wù)加叁,并等待其結(jié)束
    println("main: Now I can quit.")
}

運(yùn)行看看。結(jié)果是唇撬,在取消之后它匕,其持續(xù)打印"I'm sleeping",直到循環(huán)5次之后窖认,任務(wù)自己結(jié)束豫柬。

使運(yùn)算代碼可取消

兩種方式使運(yùn)算代碼可取消。

  1. 周期執(zhí)行掛起函數(shù)扑浸,檢查取消狀態(tài)烧给。yield函數(shù)是達(dá)到這個(gè)目的的好辦法。
  2. 顯式的檢查取消狀態(tài)喝噪。

我們來嘗試下第二種方式:

fun main(args: Array<String>) = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    val job = launch {
        var nextPrintTime = startTime
        var i = 0
        while (isActive) { // 可取消的運(yùn)算
            // 一秒打印兩次消息
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1300L) // 延遲一會(huì)
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // 取消任務(wù)础嫡,并等待其結(jié)束
    println("main: Now I can quit.")
}

現(xiàn)在,循環(huán)就是可取消的了酝惧。isActive是協(xié)程內(nèi)CoroutineScope對象的的一個(gè)屬性榴鼎。

用finally釋放資源

可取消的掛起函數(shù)在取消時(shí)會(huì)拋出CancellationException,通常的方式就可以處理了晚唇。例如巫财,try {...} finally {...}表達(dá)式或Kotlinuseuse api)函數(shù),會(huì)在協(xié)程取消時(shí)哩陕,執(zhí)行結(jié)束動(dòng)作平项。

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            println("I'm running finally")
        }
    }
    delay(1300L) // 延遲一會(huì)
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // 取消任務(wù),并等待其結(jié)束
    println("main: Now I can quit.")
}

joincancelAndJoin都會(huì)等所有結(jié)束動(dòng)作完成悍及,因此以上代碼的輸出如下:

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm running finally
main: Now I can quit.

運(yùn)行不可取消的代碼塊

任何嘗試在finally塊中使用掛起函數(shù)均會(huì)產(chǎn)生CancellationException闽瓢,因?yàn)檫\(yùn)行代碼的協(xié)程已經(jīng)被取消了。通常心赶,這不是個(gè)問題扣讼,因?yàn)樗杏辛己脤?shí)現(xiàn)的關(guān)閉操作(關(guān)閉文件,取消任務(wù)园担,或關(guān)閉任何種類的溝通通道)通常是非阻塞的届谈,并不需要掛起函數(shù)參與枯夜。但是,在很少的情況下艰山,你需要在取消的協(xié)程中進(jìn)行掛起操作湖雹,那么你可以將相應(yīng)代碼的使用withContext(NonCancellable) {...}包裝,這里使用了withContext函數(shù)和NonCancellable上下文,如下所示:

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            withContext(NonCancellable) {
                println("I'm running finally")
                delay(1000L)
                println("And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300L) // 延遲一會(huì)
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // 取消任務(wù),并等待其結(jié)束
    println("main: Now I can quit.")
}
超時(shí)

超時(shí)孽文,是實(shí)際應(yīng)用中取消協(xié)程執(zhí)行最顯而易見的原因皮官,因?yàn)槠鋱?zhí)行時(shí)間超時(shí)了靠粪。你還在用手動(dòng)記錄相應(yīng)任務(wù)的引用,然后啟動(dòng)另一個(gè)協(xié)程在延遲一段時(shí)間后取消記錄的那個(gè)協(xié)程?不用那么麻煩啦,這里有個(gè)`withTimeout``函數(shù)诗箍,幫你做了這些工作⊥彀Γ看看吧:

fun main(args: Array<String>) = runBlocking<Unit> {
    withTimeout(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
}

輸出如下:

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" kotlinx.coroutines.experimental.TimeoutCancellationException: Timed out waiting for 1300 MILLISECONDS
    at kotlinx.coroutines.experimental.ScheduledKt.TimeoutCancellationException(Scheduled.kt:202)
    at kotlinx.coroutines.experimental.TimeoutCoroutine.run(Scheduled.kt:100)
    at kotlinx.coroutines.experimental.EventLoopBase$DelayedRunnableTask.run(EventLoop.kt:322)
    at kotlinx.coroutines.experimental.EventLoopBase.processNextEvent(EventLoop.kt:148)
    at kotlinx.coroutines.experimental.BlockingCoroutine.joinBlocking(Builders.kt:82)
    at kotlinx.coroutines.experimental.BuildersKt__BuildersKt.runBlocking(Builders.kt:58)
    ...

TimeoutCancellationException是由withTimeout拋出的CancellationException的子類滤祖。之前,我們沒有在控制臺(tái)看到過異常堆棧信息瓶籽,因?yàn)樵谝粋€(gè)取消了的協(xié)程中匠童,CancellationException通常是一個(gè)結(jié)束協(xié)程的正常原因。然而塑顺,這個(gè)例子中汤求,我們正好在main函數(shù)中使用了withTimeout
因?yàn)槿∠且粋€(gè)異常严拒,因此所有的資源將要被正常的關(guān)閉扬绪。如果你需要針對超時(shí)做一些額外的處理,可以將代碼用try {...} catch (e: TimeoutCancellationException) {...}包裝糙俗,或者使用與withTimeout類似的withTimeoutOrNull勒奇,后者返回null而不是拋出異常:

fun main(args: Array<String>) = runBlocking<Unit> {
    val result = withTimeoutOrNull(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
        "Done" // will get cancelled before it produces this result
    }
    println("Result is $result")
}

這次就沒有異常了:

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Result is null

組合掛起函數(shù)

這個(gè)章節(jié)覆蓋了組合掛起函數(shù)的多種方式。

默認(rèn)是順序的

假設(shè)我們有倆定義好有用的掛起函數(shù)巧骚,例如遠(yuǎn)程服務(wù)調(diào)用,或者計(jì)算格二。這里劈彪,我們先假設(shè)這倆有用,實(shí)際上就是延遲一小會(huì):

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // 假裝有一波騷操作
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // 假裝有一波騷操作
    return 29
}

如果我們要順序執(zhí)行他們顶猜,先執(zhí)行doSomethingUsefulOne沧奴,再執(zhí)行doSomethingUsefulTwo,然后其計(jì)算結(jié)果之和长窄,怎么搞滔吠?實(shí)際使用中纲菌,需要用第一個(gè)函數(shù)的返回值來判斷是否需要調(diào)用第二個(gè)函數(shù)或如何去調(diào),才會(huì)這么做疮绷。
我們用順序調(diào)用就可以了翰舌,因?yàn)閰f(xié)程中的代碼和普通的代碼一樣,默認(rèn)是順序執(zhí)行的冬骚。下面的例子通過測量倆掛起函數(shù)總的執(zhí)行時(shí)間來演示:

fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = doSomethingUsefulOne()
        val two = doSomethingUsefulTwo()
        println("The answer is ${one + two}")
    }
    println("Completed in $time ms")
}

結(jié)果近似如下:

The answer is 42
Completed in 2017 ms

用async來并發(fā)

如果doSomethingUsefulOnedoSomethingUsefulTwo的執(zhí)行沒有依賴關(guān)系椅贱,我們想通過并發(fā)來更快的獲取到結(jié)果,那該怎么做呢只冻?async就是干這茬的庇麦。
概念上來講,async就跟launch類似喜德。其啟動(dòng)了一個(gè)與其他協(xié)程并發(fā)運(yùn)行單獨(dú)協(xié)程(輕量級線程)山橄。區(qū)別是,launch返回了一個(gè)不攜帶任何結(jié)果的Job舍悯,但是async返回了一個(gè)Deferred航棱,一個(gè)輕量級非阻塞的future,表示一會(huì)就會(huì)返回結(jié)果的承諾贱呐。你可以在一個(gè)延期的值(deferred value)使用.await()來獲取最終的結(jié)果丧诺,但Deferred也是個(gè)Job,因此奄薇,需要的話驳阎,你也可以取消掉。

fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async { doSomethingUsefulOne() }
        val two = async { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}

產(chǎn)生如下輸出:

The answer is 42
Completed in 1017 ms

快了兩倍馁蒂,因?yàn)橛昧藘蓚€(gè)協(xié)程并發(fā)執(zhí)行呵晚。注意,協(xié)程的并發(fā)性總是明確的(多個(gè)協(xié)程同時(shí)運(yùn)行沫屡,那么肯定是并發(fā)的)饵隙。

懶啟動(dòng)async

async有個(gè)懶加載選項(xiàng),配置其可選參數(shù)start沮脖,值設(shè)置為CoroutineStart.LAZY金矛。只在值被await需要時(shí),或start函數(shù)被調(diào)用時(shí)才啟動(dòng)協(xié)程勺届。運(yùn)行下面的例子驶俊,跟前面的例子就多了個(gè)選項(xiàng):

fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
        val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}

產(chǎn)生如下輸出:

The answer is 42
Completed in 2017 ms

好吧,又回到了順序執(zhí)行免姿,首先我們啟動(dòng)并等待one饼酿,然后啟動(dòng)并等待two。 這并不是懶執(zhí)行的預(yù)期場景。這個(gè)設(shè)計(jì)是用來替換標(biāo)準(zhǔn)的lazy函數(shù)故俐,如果其計(jì)算涉及了掛起函數(shù)想鹰。

異步風(fēng)格的函數(shù)

我們可以用async協(xié)程建造器定義異步風(fēng)格的函數(shù),異步的調(diào)用doSomethingUsefulOnedoSomethingUsefulTwo药版。給這些函數(shù)加上Async后綴是一個(gè)很好的風(fēng)格辑舷,強(qiáng)調(diào)了他們是異步計(jì)算的,需要用延期的值來獲取結(jié)果刚陡。

//  somethingUsefulOneAsync 的結(jié)果是 Deferred<Int> 類型
fun somethingUsefulOneAsync() = async {
    doSomethingUsefulOne()
}

// somethingUsefulTwoAsync 的結(jié)果是 Deferred<Int> 類型
fun somethingUsefulTwoAsync() = async {
    doSomethingUsefulTwo()
}

注意惩妇,這些xxxAsync函數(shù)不是掛起函數(shù),它們隨處均可使用筐乳。但是它們的使用總是意味著其行為是異步(也相當(dāng)于并發(fā))執(zhí)行的歌殃。
下面的例子展示了在協(xié)程之外的使用:

// 注意,這里沒有用runBlocking
fun main(args: Array<String>) {
    val time = measureTimeMillis {
        // 我們可以在協(xié)程外部初始化異步操作
        val one = somethingUsefulOneAsync()
        val two = somethingUsefulTwoAsync()
        // 但是等待結(jié)果必須涉及掛起或阻塞
        // 這里蝙云,我們用`runBlocking { ... }`阻塞主線程來獲取結(jié)果
        runBlocking {
            println("The answer is ${one.await() + two.await()}")
        }
    }
    println("Completed in $time ms")
}

結(jié)果如下:

The answer is 42
Completed in 1128 ms

協(xié)程的上下文(context)和調(diào)度器(dispatchers)

協(xié)程總是在上下文中執(zhí)行氓皱,上下文代表的值是CoroutineContext,定義在Kotlin標(biāo)準(zhǔn)庫中勃刨。
協(xié)程上下文是一系列的元素波材,主要的元素包括我們之前看到過的協(xié)程的Job,還有調(diào)度器身隐,這個(gè)章節(jié)會(huì)介紹廷区。

調(diào)度器和線程

協(xié)程上下文包括了一個(gè)決定相應(yīng)協(xié)程在哪個(gè)或哪些線程執(zhí)行的協(xié)程調(diào)度器(參見 CoroutineDispatcher)。協(xié)程調(diào)度器可以限制協(xié)程在具體的線程中執(zhí)行贾铝,或調(diào)度到一個(gè)線程池隙轻,或者無限制運(yùn)行。

所有像launchasync一樣的協(xié)程建造器都接受一個(gè)可選的CoroutineContext參數(shù)垢揩,這個(gè)參數(shù)可以用來顯式指定調(diào)度器和其他上下文元素玖绿。

嘗試下面的例子:

fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // 沒有限制 - 將在主線程執(zhí)行
        println("      'Unconfined': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(coroutineContext) { // 父協(xié)程的上下文,runBlocking 協(xié)程
        println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(CommonPool) { // 將會(huì)調(diào)度到ForkJoinPool.commonPool(或等價(jià)的地方)
        println("      'CommonPool': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(newSingleThreadContext("MyOwnThread")) { // 新線程
        println("          'newSTC': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}

產(chǎn)生如下輸出(可能順序不同):

      'Unconfined': I'm working in thread main
      'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
          'newSTC': I'm working in thread MyOwnThread
'coroutineContext': I'm working in thread main

之前章節(jié)中使用的默認(rèn)調(diào)度器是DefaultDispatcher叁巨,當(dāng)前的實(shí)現(xiàn)中等同于CommonPool斑匪。因此,launch { ... }==launch(DefaultDispatcher) { ... }==launch(CommonPool) { ... }锋勺。
coroutineContextUnconfined上下文的區(qū)別一會(huì)看蚀瘸。

注意,newSingleThreadContext創(chuàng)建了一個(gè)新的線程庶橱,這是非常昂貴的資源苍姜。在實(shí)際應(yīng)用中,要么用完之后就用close函數(shù)回收悬包,要么就存儲(chǔ)在頂層變量中,在應(yīng)用中到處復(fù)用馍乙。

非限制(Unconfined) VS 限制(confined) 調(diào)度器

Unconfined協(xié)程調(diào)度器在調(diào)用線程啟動(dòng)協(xié)程布近,但直到第一個(gè)掛起點(diǎn)之前垫释。在掛起之后在什么線程恢復(fù)全權(quán)由之前調(diào)用的掛起函數(shù)決定。Unconfined調(diào)度器適合在協(xié)程不消耗CPU時(shí)間或不更新任何限制于特定線程共享數(shù)據(jù)(類似UI)的場景撑瞧。

再說coroutineContext屬性棵譬,它在任何協(xié)程中均可用,引用當(dāng)前協(xié)程的上下文预伺。通過這種方式订咸,父上下文可以被繼承。特別的酬诀,runBlocking創(chuàng)建的協(xié)程默認(rèn)調(diào)度器限定到調(diào)用者線程脏嚷,因此,繼承runBlocking的上下文就有了使用可預(yù)測的先進(jìn)先出調(diào)度限制在這個(gè)線程內(nèi)執(zhí)行的作用瞒御。

fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // 沒有限制 -- 在主線程運(yùn)行
        println("      'Unconfined': I'm working in thread ${Thread.currentThread().name}")
        delay(500)
        println("      'Unconfined': After delay in thread ${Thread.currentThread().name}")
    }
    jobs += launch(coroutineContext) { // 父(runBlocking協(xié)程)上下文,
        println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println("'coroutineContext': After delay in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}

輸出結(jié)果:

      'Unconfined': I'm working in thread main
'coroutineContext': I'm working in thread main
      'Unconfined': After delay in thread kotlinx.coroutines.DefaultExecutor
'coroutineContext': After delay in thread main

因此父叙,繼承了runBlocking {...}coroutineContext的協(xié)程繼續(xù)在main線程執(zhí)行,而沒有限制的協(xié)程在delay函數(shù)使用的默認(rèn)線程池線程中恢復(fù)肴裙。

調(diào)試協(xié)程和線程

協(xié)程用Unconfined或默認(rèn)的多線程調(diào)度器可以從一個(gè)線程掛起趾唱,從另一個(gè)線程恢復(fù)。即使是用單線程的調(diào)度器蜻懦,也很難知道協(xié)程在什么地方甜癞,什么時(shí)候在干什么。在多線程應(yīng)用中宛乃,在日志中打印出線程的名字是一個(gè)通常的做法悠咱。一般的日志框架也是支持這個(gè)特性的。但當(dāng)使用協(xié)程時(shí)烤惊,僅線程名稱對上下文的描述不夠充分乔煞,因此,kotlinx.coroutines包含的設(shè)施讓調(diào)試更容易柒室。
給JVM參數(shù)加上-Dkotlinx.coroutines.debug渡贾,然后運(yùn)行下面的代碼:

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking<Unit> {
    val a = async(coroutineContext) {
        log("I'm computing a piece of the answer")
        6
    }
    val b = async(coroutineContext) {
        log("I'm computing another piece of the answer")
        7
    }
    log("The answer is ${a.await() * b.await()}")
}

三個(gè)協(xié)程:
主協(xié)程(#1) - runBlocking創(chuàng)建的協(xié)程
a(#2)、b(#3) - 兩個(gè)計(jì)算延遲返回值的協(xié)程
都在runBlocking的上下文限定在主線程中執(zhí)行雄右,輸出如下:

[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42

log函數(shù)在方括號(hào)中打印出線程名稱和當(dāng)前執(zhí)行的協(xié)程標(biāo)識(shí)空骚,調(diào)試模式開啟的時(shí)候,這個(gè)標(biāo)識(shí)會(huì)連續(xù)的賦值給創(chuàng)建的協(xié)程擂仍。

線程間切換

給JVM參數(shù)加上-Dkotlinx.coroutines.debug囤屹,然后運(yùn)行下面的代碼:

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) {
    newSingleThreadContext("Ctx1").use { ctx1 ->
        newSingleThreadContext("Ctx2").use { ctx2 ->
            runBlocking(ctx1) {
                log("Started in ctx1")
                withContext(ctx2) {
                    log("Working in ctx2")
                }
                log("Back to ctx1")
            }
        }
    }
}

這個(gè)例子演示了幾種新技術(shù)。一是使用runBlocking時(shí)逢渔,指定了特定的上下文肋坚;二是使用withContext函數(shù)切換協(xié)程的上下文,但依然是在相同的協(xié)程中執(zhí)行。輸出如下:

[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1

注意:這里使用了kotlin標(biāo)準(zhǔn)庫里的use函數(shù)智厌,用于當(dāng)newSingleThreadContext創(chuàng)建的線程不再被需要時(shí)诲泌,將其釋放。

上下文中的任務(wù)(Job)

協(xié)程的任務(wù)是上下文的一部分铣鹏。協(xié)程可以取出自己上下文的任務(wù)敷扫,用coroutineContext[Job]表達(dá)式:

fun main(args: Array<String>) = runBlocking<Unit> {
    println("My job is ${coroutineContext[Job]}")
}

在調(diào)試模式下,輸出如下:

My job is "coroutine#1":BlockingCoroutine{Active}@6d311334

因此诚卸,CoroutineScope中的isActivecoroutineContext[Job]?.isActive == true的便捷寫法葵第。

協(xié)程的父子關(guān)系

當(dāng)協(xié)程的coroutineContext用來啟動(dòng)另一個(gè)協(xié)程,那么新協(xié)程的Job就成了父協(xié)程Job的兒子合溺。想父協(xié)程取消的時(shí)候卒密,所有的子協(xié)程也會(huì)遞歸取消。

fun main(args: Array<String>) = runBlocking<Unit> {
    // 啟動(dòng)一個(gè)協(xié)程來處理請求
    val request = launch {
        // 生成兩個(gè)任務(wù)辫愉,一個(gè)有自己的上下文
        val job1 = launch {
            println("job1: I have my own context and execute independently!")
            delay(1000)
            println("job1: I am not affected by cancellation of the request")
        }
        // 另一個(gè)繼承父上下文
        val job2 = launch(coroutineContext) {
            delay(100)
            println("job2: I am a child of the request coroutine")
            delay(1000)
            println("job2: I will not execute this line if my parent request is cancelled")
        }
        // 當(dāng)子任務(wù)完成栅受,請求才算完成
        job1.join()
        job2.join()
    }
    delay(500)
    request.cancel() // 取消請求
    delay(1000) // 延遲1s,看看會(huì)發(fā)生什么
    println("main: Who has survived request cancellation?")
}

輸出如下:

job1: I have my own context and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?

結(jié)合上下文

協(xié)程上下文可以用+操作符結(jié)合恭朗。右手邊的上下文替換掉左手邊上下文相關(guān)的條目屏镊。例如,協(xié)程的Job可以被繼承痰腮,但調(diào)度器會(huì)被替換而芥。

fun main(args: Array<String>) = runBlocking<Unit> {
    // 啟動(dòng)一個(gè)協(xié)程處理請求
    val request = launch(coroutineContext) { // 使用 `runBlocking` 的上下文
        // 在CommonPool中創(chuàng)建CPU密集型的任務(wù)
        val job = launch(coroutineContext + CommonPool) {
            println("job: I am a child of the request coroutine, but with a different dispatcher")
            delay(1000)
            println("job: I will not execute this line if my parent request is cancelled")
        }
        job.join() // 子任務(wù)完成時(shí),請求完成
    }
    delay(500)
    request.cancel() // 取消請求的處理
    delay(1000) // 延遲1s看看有啥發(fā)生
    println("main: Who has survived request cancellation?")
}

預(yù)期結(jié)果如下:

job: I am a child of the request coroutine, but with a different dispatcher
main: Who has survived request cancellation?

當(dāng)?shù)呢?zé)任

父協(xié)程總是會(huì)等所有的子協(xié)程執(zhí)行完成膀值。父協(xié)程不必顯式的記錄所有其啟動(dòng)的子協(xié)程棍丐,也不必使用Job.join等待其子協(xié)程執(zhí)行完成。

fun main(args: Array<String>) = runBlocking<Unit> {
    // 啟動(dòng)一個(gè)協(xié)程處理請求
    val request = launch {
        repeat(3) { i -> // 啟動(dòng)幾個(gè)子協(xié)程
            launch(coroutineContext)  {
                delay((i + 1) * 200L) // 可變延遲 200ms, 400ms, 600ms
                println("Coroutine $i is done")
            }
        }
        println("request: I'm done and I don't explicitly join my children that are still active")
    }
    request.join() // 等待請求完成沧踏,也包括其子協(xié)程
    println("Now processing of the request is complete")
}

結(jié)果如下:

request: I'm done and I don't explicitly join my children that are still active
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Now processing of the request is complete

給協(xié)程命名方便調(diào)試

當(dāng)協(xié)程日志很頻繁或者你只想關(guān)聯(lián)相同協(xié)程產(chǎn)生的日志記錄時(shí)歌逢,自動(dòng)生成id是挺好的。然而翘狱,當(dāng)協(xié)程固定的處理一個(gè)特別的請求秘案,或者處理特定的后臺(tái)任務(wù),為了調(diào)試潦匈,還是命名比較好阱高。CoroutineName上下文元素與線程名稱的功能一致,在調(diào)試默認(rèn)打開的時(shí)茬缩,執(zhí)行協(xié)程的線程名稱將會(huì)展示為CoroutineName赤惊。
下面的例子展示了這個(gè)理念:

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
    log("Started main coroutine")
    // 啟動(dòng)兩個(gè)后臺(tái)計(jì)算
    val v1 = async(CoroutineName("v1coroutine")) {
        delay(500)
        log("Computing v1")
        252
    }
    val v2 = async(CoroutineName("v2coroutine")) {
        delay(1000)
        log("Computing v2")
        6
    }
    log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
}

當(dāng)有JVM參數(shù)-Dkotlinx.coroutines.debug時(shí),產(chǎn)生如下結(jié)果:

[main @main#1] Started main coroutine
[ForkJoinPool.commonPool-worker-1 @v1coroutine#2] Computing v1
[ForkJoinPool.commonPool-worker-2 @v2coroutine#3] Computing v2
[main @main#1] The answer for v1 / v2 = 42

通過指定任務(wù)取消執(zhí)行

現(xiàn)在凰锡,我們已經(jīng)了解了上下文未舟,父子關(guān)系和任務(wù)圈暗,讓我們把這些玩意兒放一塊耍耍。假設(shè)我們的應(yīng)用有一個(gè)有生命周期的對象处面,這個(gè)對象不是協(xié)程厂置。例如,我們寫一個(gè)Android應(yīng)用時(shí)魂角,在Android Activity上下文中啟動(dòng)了各種各樣的協(xié)程用于異步獲取數(shù)據(jù)和動(dòng)畫計(jì)算。當(dāng)activity銷毀時(shí)智绸,所有的協(xié)程都得取消掉野揪,避免內(nèi)存泄漏。

我們一個(gè)創(chuàng)建一個(gè)跟activity綁定的Job實(shí)例瞧栗,用于管理我們的協(xié)程斯稳。Job實(shí)例由Job()工廠創(chuàng)建,等會(huì)例子會(huì)演示迹恐。為了方便理解挣惰,我們可以launch(coroutineContext, parent = job)這樣寫,說明用了父job殴边,而不是用launch(coroutineContext + job)表達(dá)式憎茂。

現(xiàn)在,一個(gè)Job.cancel調(diào)用锤岸,將會(huì)所有我們啟動(dòng)的所有協(xié)程竖幔。此外,Job.join等待所有子協(xié)程完成是偷,因此在下面的例子中我們也可以用cancelAndJoin:

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = Job() // 創(chuàng)建一個(gè)Job來專利我們自己協(xié)程的生命周期
    // 為了演示拳氢,啟動(dòng)10個(gè)協(xié)程,每個(gè)運(yùn)行不同的時(shí)間
    val coroutines = List(10) { i ->
        // 都是我們job對象的兒子
        launch(coroutineContext, parent = job) { // 使用runBlocking的上下文蛋铆,但是用我們自己的job
            delay((i + 1) * 200L) // 花樣等待
            println("Coroutine $i is done")
        }
    }
    println("Launched ${coroutines.size} coroutines")
    delay(500L) // 延遲500ms
    println("Cancelling the job!")
    job.cancelAndJoin() // 取消所有的任務(wù)馋评,并等待其完成
}

輸出如下:

Launched 10 coroutines
Coroutine 0 is done
Coroutine 1 is done
Cancelling the job!

如你所見,只有前兩個(gè)協(xié)程打印了消息刺啦,其他都被一單個(gè)job.cancelAndJoin()給取消掉了留特。所以,在我們假想的Android應(yīng)用中洪燥,需要做的磕秤,就是在activity創(chuàng)建的時(shí)候創(chuàng)建一個(gè)父job,然后在子協(xié)程創(chuàng)建的時(shí)候使用這個(gè)job捧韵,最后市咆,在activity銷毀時(shí)取消掉這個(gè)job即可。在Android生命周期中再来,我們不能join它們蒙兰,因?yàn)槭峭降牧琢觥T诮ㄔ旌蠖朔?wù)時(shí),join用于保證有限的資源訪問是很有用的搜变。

通道(Channels)

延期值(Deferred values)提供了一個(gè)在協(xié)程中轉(zhuǎn)移單個(gè)值的便捷方式采缚。通道提供了一個(gè)方式來轉(zhuǎn)移數(shù)據(jù)流。

通道基礎(chǔ)

Channel在概念上與BlockingQueue非常相似挠他。不同之處是前者用可掛起的send替代后者阻塞的put操作扳抽,前者用可掛起的receive替代后者是阻塞的take操作。

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch {
        // 這里可能是重度消耗CPU的計(jì)算殖侵,或是異步邏輯贸呢,這里我們就發(fā)送幾個(gè)平方數(shù)
        for (x in 1..5) channel.send(x * x)
    }
    // 這里打印5個(gè)收到的整數(shù)
    repeat(5) { println(channel.receive()) }
    println("Done!")
}

結(jié)果如下:

1
4
9
16
25
Done!

關(guān)閉和遍歷通道

不像隊(duì)列,通道可以關(guān)閉用于表明沒有更多的元素會(huì)過來了拢军。在接收端楞陷,用for循環(huán)可以很方便的接受通道中傳來的元素。
概念上來說茉唉,close就像給通道傳遞一個(gè)特殊的關(guān)閉令牌固蛾。在接受到關(guān)閉令牌之時(shí),迭代就會(huì)停止度陆,因此艾凯,這里保證了關(guān)閉之前發(fā)送的元素都被接收到了:

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close() // 發(fā)完了
    }
    // 用for循環(huán)打印接收值 (在通過關(guān)閉之前)
    for (y in channel) println(y)
    println("Done!")
}
建造通道生產(chǎn)者

協(xié)程生產(chǎn)一序列元素的模式是比較常見的。這是在并發(fā)代碼中經(jīng)臣嵛撸可以發(fā)現(xiàn)的生產(chǎn)者-消費(fèi)者模式的一部分览芳。你可以將生產(chǎn)者抽象為一個(gè)以通道為參數(shù)的函數(shù),但與常識(shí)相背的是鸿竖,結(jié)果需要被函數(shù)返回沧竟。

fun produceSquares() = produce<Int> {
    for (x in 1..5) send(x * x)
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
}
流水線

流水線,一種協(xié)程可產(chǎn)生無限數(shù)據(jù)流的模式缚忧。

fun produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // 從1開始的無限整數(shù)流
}

另一個(gè)協(xié)程或多個(gè)協(xié)程會(huì)消費(fèi)這個(gè)流悟泵,做一些處理,然后產(chǎn)出結(jié)果闪水。下面的例子中糕非,這些數(shù)會(huì)被平方:

fun square(numbers: ReceiveChannel<Int>) = produce<Int> {
    for (x in numbers) send(x * x)
}

下面的代碼啟動(dòng)然后連接整個(gè)流水線:

fun main(args: Array<String>) = runBlocking<Unit> {
    val numbers = produceNumbers() // 產(chǎn)生從1開始的整數(shù)流
    val squares = square(numbers) // 平方整數(shù)
    for (i in 1..5) println(squares.receive()) // 打印前五個(gè)
    println("Done!") 
    squares.cancel() // 在大型應(yīng)用中,需要關(guān)閉這些協(xié)程
    numbers.cancel()
}

在上面這個(gè)例子中球榆,我們不用取消掉這些協(xié)程朽肥,因?yàn)閰f(xié)程就像守護(hù)線程一樣。但是在大點(diǎn)的應(yīng)用中持钉,如果我們不需要了衡招,那就要停止掉流水線∶壳浚或者始腾,我們可以將流水線協(xié)程作為主協(xié)程的兒子州刽,接下來會(huì)演示。

流水線獲取素?cái)?shù)

下面舉例將流水線應(yīng)用到極致——用流水線協(xié)程生成素?cái)?shù)浪箭。首先穗椅,生成一個(gè)無限的整數(shù)序列。這次我們傳如一個(gè)context參數(shù)奶栖,并將這個(gè)參數(shù)傳遞給produce建造器匹表,因此,調(diào)用方可以控制協(xié)程在哪跑:

fun numbersFrom(context: CoroutineContext, start: Int) = produce<Int>(context) {
    var x = start
    while (true) send(x++) // 從start開始的無限整數(shù)流
}

下面的流水線過濾掉了所有不能被給定素?cái)?shù)除盡的數(shù):

fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {
    for (x in numbers) if (x % prime != 0) send(x)
}

現(xiàn)在驼抹,我們建立一個(gè)從2開始的整數(shù)流桑孩,從當(dāng)前的通道獲取素?cái)?shù),然后為找到的素?cái)?shù)開啟新的通道:

numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...

下面的例子打印了前十個(gè)素?cái)?shù)框冀,在主線程的上下文中運(yùn)行了整個(gè)流水線。因?yàn)樗械膮f(xié)程是作為runBlocking協(xié)程上下文的兒子啟動(dòng)的敏簿,所以我們不必將所有我們啟動(dòng)的協(xié)程列下來明也,我們用cancelChildren擴(kuò)展函數(shù)取消所有的子協(xié)程。

fun main(args: Array<String>) = runBlocking<Unit> {
    var cur = numbersFrom(coroutineContext, 2)
    for (i in 1..10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(coroutineContext, cur, prime)
    }
    coroutineContext.cancelChildren() // 取消所有的子協(xié)程惯裕,從而讓主線程退出
}

輸出如下:

2
3
5
7
11
13
17
19
23
29

注意温数,你可以用標(biāo)準(zhǔn)庫中的buildIterator協(xié)程建造器建造相同的流水線。將producebuildIterator替換蜻势,sendyield替換撑刺,receivenext替換
ReceiveChannelIterator替換握玛,并去掉上下文够傍。你也不用runBlocking了。然而挠铲,上面展示的流水線使用通道的好處冕屯,就是能夠充分利用多核CPU(如果在CommonPool上下文運(yùn)行)。

扇出

多個(gè)協(xié)程可以從同一個(gè)通道接收拂苹,任務(wù)散布于多個(gè)協(xié)程之間安聘。讓我們從一個(gè)周期產(chǎn)生整數(shù)(1秒10個(gè)數(shù))的生產(chǎn)者協(xié)程開始:

fun produceNumbers() = produce<Int> {
    var x = 1 // 從1開始
    while (true) {
        send(x++) // 生產(chǎn)下一個(gè)
        delay(100) // 等1s
    }
}

我們可以有多個(gè)處理者協(xié)程,在這個(gè)例子中瓢棒,就只打印他們的id和接收到的數(shù)字:

fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }    
}

讓我們啟動(dòng)5個(gè)處理者浴韭,讓它們運(yùn)行將近1秒,看看會(huì)發(fā)生什么:

fun main(args: Array<String>) = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950)
    producer.cancel() // 取消掉生產(chǎn)者協(xié)程脯宿,這樣就能將所有的協(xié)程取消
}

輸出與下面的結(jié)果類似念颈,盡管接收每個(gè)特定整數(shù)的處理器ID可能不同。

Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10

注意嗅绰,取消生產(chǎn)者協(xié)程關(guān)閉其通道舍肠,最終會(huì)結(jié)束處理者協(xié)程遍歷通道搀继。
同樣的,注意我們在launchProcessor代碼中如何用for循環(huán)遍歷通道實(shí)現(xiàn)扇出的翠语。不像consumeEach叽躯,for循環(huán)風(fēng)格在多協(xié)程使用時(shí)是妥妥安全的。如果肌括,其中的一個(gè)協(xié)程掛掉了点骑,其他的協(xié)程還會(huì)繼續(xù)處理通道。而當(dāng)處理者用consumeEach遍歷時(shí)谍夭,正澈诘危或非正常結(jié)束都會(huì)將通道給取消掉。

扇入

多個(gè)協(xié)程可以向相同通道發(fā)送紧索。例如袁辈,我們有一個(gè)字符串通道,一個(gè)有特定延遲周期發(fā)送特定字符串到通道的掛起函數(shù)珠漂。

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}

現(xiàn)在晚缩,來看看如果啟動(dòng)兩個(gè)協(xié)程發(fā)送字符串會(huì)怎么樣(這個(gè)例子中,我們在主線程上下文中啟動(dòng)它們):

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<String>()
    launch(coroutineContext) { sendString(channel, "foo", 200L) }
    launch(coroutineContext) { sendString(channel, "BAR!", 500L) }
    repeat(6) { // 接收頭6個(gè)
        println(channel.receive())
    }
    coroutineContext.cancelChildren() // 取消所有的子協(xié)程媳危,讓主線程結(jié)束
}

輸出是:

foo
foo
BAR!
foo
foo
BAR!

帶緩沖區(qū)的通道

前面展示的通道都是不帶緩沖的荞彼。沒有緩沖區(qū)的通道只有在發(fā)送者和接收者見到了彼此才傳遞元素。如果發(fā)送先調(diào)用了待笑,那么它將掛起直到接收被調(diào)用鸣皂;如果接收先調(diào)用了,那么它將掛起直到發(fā)送被調(diào)用暮蹂。
Channel()工廠函數(shù)和produce建造器接收一個(gè)可選的用來指定緩沖區(qū)大小capacity參數(shù)寞缝,緩沖區(qū)可以讓發(fā)送者在掛起之前發(fā)送多個(gè)元素,跟指定容量的BlockingQueue類似椎侠,在緩沖區(qū)滿了之后阻塞第租。
看看下面的代碼會(huì)有啥效果:

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>(4) // 創(chuàng)建帶緩沖區(qū)的通道
    val sender = launch(coroutineContext) { // 啟動(dòng)發(fā)送者協(xié)程
        repeat(10) {
            println("Sending $it") // 在發(fā)送之前先打印
            channel.send(it) // 將會(huì)在緩沖區(qū)滿的時(shí)候掛起
        }
    }
    // 什么都不做,等著
    delay(1000)
    sender.cancel() // 取消發(fā)送者協(xié)程
}

用緩沖區(qū)大小為4的通道我纪,打印了5次慎宾。

Sending 0
Sending 1
Sending 2
Sending 3
Sending 4

頭4個(gè)元素加入到了緩沖區(qū),當(dāng)試圖加入第五個(gè)的時(shí)候浅悉,發(fā)送者被掛起了趟据。

時(shí)鐘通道

時(shí)鐘通道是一種特別的單緩沖區(qū)通道,每次自上次從此通道消費(fèi)后术健,在給定時(shí)間后會(huì)生產(chǎn)一個(gè)Unit汹碱。單獨(dú)使用看起來像沒什么用,但是在構(gòu)建復(fù)雜的基于時(shí)間的生產(chǎn)流水線,然后操作者做一些窗口和其他基于時(shí)間的處理時(shí)特別有用拌夏。

ticker工廠方法創(chuàng)建時(shí)鐘通道,后續(xù)元素不再需要時(shí)锥惋,用ReceiveChannel.cancel取消掉跪腹。

看看實(shí)際中如何應(yīng)用:

fun main(args: Array<String>) = runBlocking<Unit> {
    val tickerChannel = ticker(delay = 100, initialDelay = 0) // 創(chuàng)建時(shí)鐘通道
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement") // 最初的延遲還沒結(jié)束

    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // 后續(xù)的元素都有100ms延遲
    println("Next element is not ready in 50 ms: $nextElement")

    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 100 ms: $nextElement")

    // 模擬長時(shí)間消費(fèi)延遲
    println("Consumer pauses for 150ms")
    delay(150)
    // 下個(gè)元素立即可用
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")
    // receive方法調(diào)用間的暫停也算進(jìn)去了褂删,下一個(gè)元素會(huì)更快收到
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } 
    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")

    tickerChannel.cancel() // 后面的不要了
}

會(huì)打印下面幾行:

Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit

ticker知道消費(fèi)者停頓,如果有停頓冲茸,默認(rèn)調(diào)整下次產(chǎn)生產(chǎn)生元素的延遲屯阀,嘗試維護(hù)產(chǎn)生元素的固定速率。

一個(gè)可選的參數(shù)mode轴术,如果指定為TickerMode.FIXED_PERIOD难衰,那么ticker會(huì)維護(hù)一個(gè)元素間固定延遲。默認(rèn)是TickerMode.FIXED_DELAY逗栽。

這里多講講兩個(gè)模式的區(qū)別盖袭,后面再舉個(gè)例子說明區(qū)別。
TickerMode.FIXED_PERIOD: 為了保持產(chǎn)生元素的速率彼宠,會(huì)調(diào)整下一個(gè)元素產(chǎn)生的延遲苍凛。
TickerMode.FIXED_DELAY: 固定的延遲產(chǎn)生元素。

區(qū)分兩個(gè)模式的例子兵志。

fun log(msg: String) {
   println("[${Date()}] $msg")
}

fun main(args: Array<String>) = runBlocking<Unit> {
   val tickerChannel = ticker(delay = 5000, initialDelay = 0, mode = TickerMode.FIXED_DELAY)

   var i = 0
   for (item in tickerChannel) {
       log("receive $item")
       val time = if (i++ % 2 == 0) 4000 else 6000 // 切換使用4s/6s延遲
       delay(time)
   }
}

如果用TickerMode.FIXED_DELAY模式:

[Sun Jul 22 16:36:17 CST 2018] receive kotlin.Unit
[Sun Jul 22 16:36:22 CST 2018] receive kotlin.Unit
[Sun Jul 22 16:36:28 CST 2018] receive kotlin.Unit
[Sun Jul 22 16:36:33 CST 2018] receive kotlin.Unit
[Sun Jul 22 16:36:39 CST 2018] receive kotlin.Unit

如果用TickerMode.FIXED_PERIOD模式:

[Sun Jul 22 16:43:52 CST 2018] receive kotlin.Unit
[Sun Jul 22 16:43:57 CST 2018] receive kotlin.Unit
[Sun Jul 22 16:44:03 CST 2018] receive kotlin.Unit
[Sun Jul 22 16:44:07 CST 2018] receive kotlin.Unit
[Sun Jul 22 16:44:13 CST 2018] receive kotlin.Unit
[Sun Jul 22 16:44:17 CST 2018] receive kotlin.Unit

第一次延遲都是5s,后面的區(qū)別是FIXED_DELAY延遲在5/6s間切換宣肚;FIXED_PERIOD的延遲在4/6s間切換想罕。相信大家已經(jīng)能夠區(qū)分了。

通道是公平的

對于從多個(gè)協(xié)程調(diào)用通道的順序霉涨,向通道發(fā)送和接收操作是公平的按价。按照先進(jìn)先出的順序進(jìn)出通道,例如笙瑟,第一個(gè)調(diào)用receive的協(xié)程獲得元素楼镐。下面的例子中,兩個(gè)協(xié)程("ping"和"pong")從同一個(gè)通道"table"接收"ball"對象往枷。

data class Ball(var hits: Int)

fun main(args: Array<String>) = runBlocking<Unit> {
    val table = Channel<Ball>() // 公用一張桌子
    launch(coroutineContext) { player("ping", table) }
    launch(coroutineContext) { player("pong", table) }
    table.send(Ball(0)) // 發(fā)球
    delay(1000) // 延遲一秒
    coroutineContext.cancelChildren() // 游戲結(jié)束框产,取消它們
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // 在循環(huán)中接球
        ball.hits++
        println("$name $ball")
        delay(300) // 等一會(huì)
        table.send(ball) // 將球擊回
    }
}

"ping"協(xié)程先開始的,所以错洁,它最先收到球秉宿。即使"ping"協(xié)程在將球擊回桌面后立即再次開始接球,但球還是給"pong"協(xié)程接到了屯碴,因?yàn)?pong"早等著在了:

ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)

有時(shí)候描睦,通道可能會(huì)產(chǎn)生看起不公平的執(zhí)行,是因協(xié)程使用到的線程池所致导而。

共享可變狀態(tài)和并發(fā)

協(xié)程可以用多線程的調(diào)度器(例如默認(rèn)的CommonPool)并發(fā)執(zhí)行忱叭。那么并發(fā)問題也接踵而至隔崎。主要問題是同步訪問共享可變狀態(tài)。在協(xié)程領(lǐng)域里韵丑,這個(gè)問題的解決方案有些與多線程領(lǐng)域中類似爵卒,但是有些則截然不同。

問題

讓我們啟動(dòng)1000個(gè)協(xié)程埂息,做同樣的事情1000次(一共一百萬次執(zhí)行)技潘。為了一會(huì)做比較,我們記錄下執(zhí)行時(shí)間:

suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
    val n = 1000 // 啟動(dòng)協(xié)程的數(shù)量
    val k = 1000 // 每個(gè)協(xié)程執(zhí)行動(dòng)作的次數(shù)
    val time = measureTimeMillis {
        val jobs = List(n) {
            launch(context) {
                repeat(k) { action() }
            }
        }
        jobs.forEach { it.join() }
    }
    println("Completed ${n * k} actions in $time ms")    
}

我們以一個(gè)非常簡單的動(dòng)作千康,在多線程的CommonPool上下文下享幽,累加一個(gè)共享的變量。

var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter++
    }
    println("Counter = $counter")
}

最終會(huì)打印出個(gè)啥拾弃?應(yīng)該不會(huì)是"Counter = 1000000"值桩,因?yàn)?000個(gè)協(xié)程在多個(gè)線程同步的累加counter而沒有進(jìn)行同步。

注意:如果你的計(jì)算機(jī)只有2核或更少豪椿,那么你會(huì)一直得到1000000奔坟,因?yàn)?code>CommonPool在這種情況下是單線程的。為了“造成”這個(gè)問題搭盾,需要把代碼改改:

val mtContext = newFixedThreadPoolContext(2, "mtPool") // 定義一個(gè)2線程的上下文
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(mtContext) { // 替代上面例子的CommonPool
        counter++
    }
    println("Counter = $counter")
}
volatile也愛莫能助

有個(gè)常見的誤解咳秉,以為volatile可以解決并發(fā)問題。試試看:

@Volatile // kotlin 中鸯隅, volatile 是個(gè)注解 
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter++
    }
    println("Counter = $counter")
}

代碼跑得更慢了澜建,但最終也沒能得到"Counter = 1000000",因?yàn)?code>volatile保證了順序的讀和寫蝌以,但是對大的操作(這里是累加)不保證原子性炕舵。

線程安全的數(shù)據(jù)結(jié)構(gòu)

對協(xié)程和線程都通用的解決方案是利用一個(gè)線程安全的數(shù)據(jù)結(jié)構(gòu),這個(gè)數(shù)據(jù)結(jié)構(gòu)提供對共享狀態(tài)必要的同步操作跟畅。在上面的例子中咽筋,我們可以用AtomicInteger類,它有個(gè)incrementAndGet方法:

var counter = AtomicInteger()

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter.incrementAndGet()
    }
    println("Counter = ${counter.get()}")
}

這是針對這個(gè)問題最快的解決方法徊件。這種解決方法適用于普通計(jì)數(shù)器奸攻,集合,隊(duì)列和其他標(biāo)準(zhǔn)數(shù)據(jù)結(jié)構(gòu)以及它們的基本操作庇忌。但是不容易擴(kuò)展到復(fù)雜狀態(tài)或沒有現(xiàn)成的線程安全實(shí)現(xiàn)的復(fù)雜操作舞箍。

細(xì)粒度線程限制

線程限制一種解決共享可變狀態(tài)的方式,它將共享變量的訪問限制到單個(gè)線程上皆疹。在UI應(yīng)用中很適用疏橄,因?yàn)閁I狀態(tài)一般都限制于事件派發(fā)或應(yīng)用線程。在協(xié)程中用單線程上下文很容易實(shí)現(xiàn):

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) { // 在CommonPool中運(yùn)行每個(gè)協(xié)程
        withContext(counterContext) { // 但是在單個(gè)線程中累加
            counter++
        }
    }
    println("Counter = $counter")
}

這個(gè)代碼跑的很慢,因?yàn)樽龅搅思?xì)粒度的線程限制捎迫。每個(gè)獨(dú)立的累加都利用withContext塊從多線程CommonPool上下文切換到單線程的上下文晃酒。

粗粒度線程限制

實(shí)際應(yīng)用中,線程限制是在大塊中執(zhí)行的窄绒。例如贝次,一大塊更新狀態(tài)的業(yè)務(wù)邏輯限制于單個(gè)線程。例如下面這個(gè)例子彰导,在單線程的上下文中運(yùn)行每個(gè)協(xié)程:

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(counterContext) { // 在單線程的上下文中運(yùn)行每個(gè)協(xié)程
        counter++
    }
    println("Counter = $counter")
}

這就快多了蛔翅,而且結(jié)果是對的。

互斥操作

互斥位谋,利用一個(gè)用不會(huì)并發(fā)執(zhí)行的臨界區(qū)山析,保護(hù)對共享狀態(tài)的修改。在阻塞的世界里掏父,通常用synchronizedReentrantLock達(dá)到互斥笋轨。協(xié)程中的替代品叫做Mutex。它用lockunlock方法界定臨界區(qū)赊淑。關(guān)鍵不同之處是爵政,Mutex.lock()是掛起函數(shù),不會(huì)阻塞線程陶缺。
有一個(gè)擴(kuò)展函數(shù)withLock钾挟,方便的幫你做了mutex.lock(); try { ... } finally { mutex.unlock() }這事:

val mutex = Mutex()
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        mutex.withLock {
            counter++        
        }
    }
    println("Counter = $counter")
}

這個(gè)例子中的鎖是細(xì)粒度的,因此饱岸,是有代價(jià)的等龙。然而,在有些你必須周期修改共享狀態(tài)的情況下伶贰,鎖是個(gè)好選擇。同時(shí)罐栈,這個(gè)狀態(tài)沒有限制到某個(gè)線程上黍衙。

Actors

actor是由協(xié)程、限制并包含于此協(xié)程的狀態(tài)荠诬、與其他協(xié)程交流的通道組成琅翻。一個(gè)簡單的actor可以寫成一個(gè)函數(shù),但是復(fù)雜狀態(tài)的actor更適合寫成一個(gè)類柑贞。

有個(gè)actor協(xié)程建造器方椎,它可以方便地將actor的郵箱通道組合到其范圍內(nèi),以便從發(fā)送通道接收消息并將發(fā)送通道組合到生成的job對象中钧嘶。因此棠众,單個(gè)actor的引用就攜帶了上面所有的東西。

使用actor的第一步是定義一個(gè)actor要處理的消息類。Kotlin的密封類(sealed class)非常適合這個(gè)目的闸拿。首先定一個(gè)CounterMsg密封類空盼,子類IncCounter作為增加計(jì)數(shù)器的消息,GetCounter作為獲取計(jì)數(shù)器值的消新荤,后者需要發(fā)送回復(fù)揽趾。CompletableDeferred表示將來已知的單個(gè)值,此處用于發(fā)送回復(fù)苛骨。

// counterActor的消息類型
sealed class CounterMsg
object IncCounter : CounterMsg() // 增加計(jì)數(shù)器的單向消息
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 帶回復(fù)的請求

然后我們定義一個(gè)使用actor協(xié)程建造器啟動(dòng)actor的函數(shù):

// 此函數(shù)啟動(dòng)一個(gè)新的計(jì)數(shù)器actor
fun counterActor() = actor<CounterMsg> {
    var counter = 0 // actor 狀態(tài)
    for (msg in channel) { // 遍歷進(jìn)來的消息
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

主要代碼很簡單:

fun main(args: Array<String>) = runBlocking<Unit> {
    val counter = counterActor() // 創(chuàng)建actor
    massiveRun(CommonPool) {
        counter.send(IncCounter)
    }
    // 發(fā)送一個(gè)消息篱瞎,用于從actor中獲取計(jì)算器的值
    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    println("Counter = ${response.await()}")
    counter.close() // 關(guān)閉actor
}

actor自身執(zhí)行的上下文無關(guān)緊要。一個(gè)actor是一個(gè)協(xié)程痒芝,協(xié)程是順序執(zhí)行的俐筋,因此,將狀態(tài)限制在特定協(xié)程中可以解決共享可變狀態(tài)的問題吼野。實(shí)際上校哎,actor可以修改自己的私有狀態(tài),但只能通過消息相互影響(避免任何鎖定)瞳步。

actor比在負(fù)載下鎖定更有效闷哆,因?yàn)樵谶@種情況下它總是有工作要做,而且根本不需要切換到不同的上下文单起。

注意抱怔,actor協(xié)程構(gòu)建器是produce協(xié)程構(gòu)建器的雙重構(gòu)件。actor與它接收消息的通道相關(guān)聯(lián)嘀倒,produce與它發(fā)送元素的通道相關(guān)聯(lián)屈留。

Select表達(dá)式

Select表達(dá)式可同時(shí)讓多個(gè)掛起函數(shù)等待,并選擇第一個(gè)使其激活测蘑。

從通道中選擇

讓我們先創(chuàng)建兩個(gè)生產(chǎn)字符串的生產(chǎn)者:fizzbuzz灌危。fizz每300ms產(chǎn)生一個(gè)"Fizz"字符串:

fun fizz(context: CoroutineContext) = produce<String>(context) {
    while (true) { // 每300ms發(fā)送一個(gè)"Fizz"
        delay(300)
        send("Fizz")
    }
}

buzz每500ms產(chǎn)生一個(gè)"Buzz"字符串:

fun buzz(context: CoroutineContext) = produce<String>(context) {
    while (true) { // 每500ms發(fā)送一個(gè)"Buzz!"
        delay(500)
        send("Buzz!")
    }
}

使用receive掛起函數(shù),我們可以從一個(gè)通道或另一個(gè)通道接收碳胳。但select表達(dá)式允許我們使用其onReceive子句同時(shí)從兩者接收:

suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
    select<Unit> { // <Unit> 意味著select表達(dá)式?jīng)]有返回 
        fizz.onReceive { value ->  // 第一個(gè)子句
            println("fizz -> '$value'")
        }
        buzz.onReceive { value ->  // 第二個(gè)子句
            println("buzz -> '$value'")
        }
    }
}

跑七次這個(gè)函數(shù):

fun main(args: Array<String>) = runBlocking<Unit> {
    val fizz = fizz(coroutineContext)
    val buzz = buzz(coroutineContext)
    repeat(7) {
        selectFizzBuzz(fizz, buzz)
    }
    coroutineContext.cancelChildren() // 取消倆協(xié)程
}

結(jié)果如下:

fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
buzz -> 'Buzz!'

選擇關(guān)閉

當(dāng)通道關(guān)閉時(shí)勇蝙,select中的onReceive子句報(bào)錯(cuò),導(dǎo)致相應(yīng)的select拋出異常挨约。我們可以使用onReceiveOrNull子句在關(guān)閉通道時(shí)執(zhí)行特定操作味混。以下示例還顯示select是一個(gè)返回其選擇好的子句結(jié)果的表達(dá)式:

suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
    select<String> {
        a.onReceiveOrNull { value -> 
            if (value == null) 
                "Channel 'a' is closed" 
            else 
                "a -> '$value'"
        }
        b.onReceiveOrNull { value -> 
            if (value == null) 
                "Channel 'b' is closed"
            else    
                "b -> '$value'"
        }
    }

讓我們來使用這個(gè)函數(shù),傳入產(chǎn)生"Hello"字符串四次的通道a和產(chǎn)生"World"四次的頻道b

fun main(args: Array<String>) = runBlocking<Unit> {
    // 為了可預(yù)測結(jié)果诫惭,我們使用主線程上下文
    val a = produce<String>(coroutineContext) {
        repeat(4) { send("Hello $it") }
    }
    val b = produce<String>(coroutineContext) {
        repeat(4) { send("World $it") }
    }
    repeat(8) { // 打印頭8個(gè)結(jié)果
        println(selectAorB(a, b))
    }
    coroutineContext.cancelChildren()    
}

觀察得處下列結(jié)論:
首先翁锡,select傾向于第一個(gè)子句。當(dāng)多個(gè)子句同時(shí)可選時(shí)夕土,它們中的第一個(gè)被選擇馆衔。這里,兩個(gè)通道都不斷的產(chǎn)生字符串,a作為第一個(gè)哈踱,勝荒适。然后,因?yàn)槲覀冇玫氖遣粠Ь彌_池的通道开镣,a在調(diào)用send時(shí)不時(shí)會(huì)掛起刀诬,就給了機(jī)會(huì)讓b也來一發(fā)。

選擇發(fā)送

select表達(dá)式具有onSend子句邪财,可以與選擇的偏見性結(jié)合使用陕壹。

讓我們寫一個(gè)整數(shù)生產(chǎn)者的例子,當(dāng)主通道上的消費(fèi)者無法跟上它時(shí)树埠,它會(huì)將其值發(fā)送到side通道糠馆。

fun produceNumbers(context: CoroutineContext, side: SendChannel<Int>) = produce<Int>(context) {
    for (num in 1..10) { // 產(chǎn)生10個(gè)數(shù)字
        delay(100) // 每個(gè)延遲 100 ms
        select<Unit> {
            onSend(num) {} // 發(fā)送給主通道
            side.onSend(num) {} // or to the side channel     
        }
    }
}

消費(fèi)者將會(huì)非常緩慢,需要250毫秒才能處理每個(gè)數(shù)字:

fun main(args: Array<String>) = runBlocking<Unit> {
    val side = Channel<Int>() // 分配side通道
    launch(coroutineContext) { // 給side通道一個(gè)快速的消費(fèi)者
        side.consumeEach { println("Side channel has $it") }
    }
    produceNumbers(coroutineContext, side).consumeEach { 
        println("Consuming $it")
        delay(250) // 慢慢消費(fèi)怎憋,不著急
    }
    println("Done consuming")
    coroutineContext.cancelChildren()    
}

看看發(fā)生什么:

Consuming 1
Side channel has 2
Side channel has 3
Consuming 4
Side channel has 5
Side channel has 6
Consuming 7
Side channel has 8
Side channel has 9
Consuming 10
Done consuming

選擇延期值

可以使用onAwait子句選擇延遲值又碌。讓我們從一個(gè)異步函數(shù)開始,該函數(shù)在隨機(jī)延遲后返回一個(gè)延遲字符串值:

fun asyncString(time: Int) = async {
    delay(time.toLong())
    "Waited for $time ms"
}

讓我們隨機(jī)延遲開始十幾個(gè)绊袋。

fun asyncStringsList(): List<Deferred<String>> {
    val random = Random(3)
    return List(12) { asyncString(random.nextInt(1000)) }
}

現(xiàn)在毕匀,主函數(shù)等待第一個(gè)函數(shù)完成并計(jì)算仍處于活動(dòng)狀態(tài)的延遲值的數(shù)量。注意癌别,我們在這里使用了select表達(dá)式是Kotlin DSL的事實(shí)皂岔,因此我們可以使用任意代碼為它提供子句。在這個(gè)例子中展姐,我們遍歷一個(gè)延遲值列表躁垛,為每個(gè)延遲值提供onAwait子句。

fun main(args: Array<String>) = runBlocking<Unit> {
    val list = asyncStringsList()
    val result = select<String> {
        list.withIndex().forEach { (index, deferred) ->
            deferred.onAwait { answer ->
                "Deferred $index produced answer '$answer'"
            }
        }
    }
    println(result)
    val countActive = list.count { it.isActive }
    println("$countActive coroutines are still active")
}

輸出如下:

Deferred 4 produced answer 'Waited for 128 ms'
11 coroutines are still active

切換延遲值的通道

讓我們編寫一個(gè)消費(fèi)延遲字符串值通道的通道建造器函數(shù)圾笨,直到下一個(gè)延遲值結(jié)束或通道關(guān)閉之前教馆,等待每個(gè)接收到的延遲值。此示例將同一select中的onReceiveOrNullonAwait子句放在一起:

fun switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> {
    var current = input.receive() // 從第一個(gè)收到的延遲值開始
    while (isActive) { // 當(dāng)通道沒取消時(shí)循環(huán)
        val next = select<Deferred<String>?> { // 從此select返回下一個(gè)延遲值或null
            input.onReceiveOrNull { update ->
                update // 換下一個(gè)值等
            }
            current.onAwait { value ->  
                send(value) // 發(fā)送當(dāng)前延遲值產(chǎn)生的數(shù)據(jù)
                input.receiveOrNull() // 使用輸入通道的下一個(gè)延遲值
            }
        }
        if (next == null) {
            println("Channel was closed")
            break // 退出循環(huán)
        } else {
            current = next
        }
    }
}

為了測試它擂达,我們將使用一個(gè)簡單的異步函數(shù)活玲,它在指定的時(shí)間后返回指定的字符串:

fun asyncString(str: String, time: Long) = async {
    delay(time)
    str
}

main函數(shù)只是啟動(dòng)一個(gè)協(xié)同程序來打印switchMapDeferreds的結(jié)果并向它發(fā)送一些測試數(shù)據(jù):

fun main(args: Array<String>) = runBlocking<Unit> {
    val chan = Channel<Deferred<String>>() // 測試通道
    launch(coroutineContext) { // 開啟打印協(xié)程
        for (s in switchMapDeferreds(chan)) 
            println(s) // 打印收到的字符串
    }
    chan.send(asyncString("BEGIN", 100))
    delay(200) // 夠"BEGIN"生產(chǎn)出來了
    chan.send(asyncString("Slow", 500))
    delay(100) // 不夠生產(chǎn)"Slow"的時(shí)間
    chan.send(asyncString("Replace", 100))
    delay(500) // 發(fā)送最后的字符串之前給點(diǎn)時(shí)間
    chan.send(asyncString("END", 500))
    delay(1000) // 給時(shí)間運(yùn)行
    chan.close() // 關(guān)閉通道
    delay(500) // 等運(yùn)行完
}

結(jié)果是:

BEGIN
Replace
END
Channel was closed

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市谍婉,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌镀钓,老刑警劉巖穗熬,帶你破解...
    沈念sama閱讀 206,968評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異丁溅,居然都是意外死亡唤蔗,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來妓柜,“玉大人箱季,你說我怎么就攤上這事」髌” “怎么了藏雏?”我有些...
    開封第一講書人閱讀 153,220評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長作煌。 經(jīng)常有香客問我掘殴,道長,這世上最難降的妖魔是什么粟誓? 我笑而不...
    開封第一講書人閱讀 55,416評論 1 279
  • 正文 為了忘掉前任奏寨,我火速辦了婚禮,結(jié)果婚禮上鹰服,老公的妹妹穿的比我還像新娘病瞳。我一直安慰自己,他們只是感情好悲酷,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,425評論 5 374
  • 文/花漫 我一把揭開白布套菜。 她就那樣靜靜地躺著,像睡著了一般舔涎。 火紅的嫁衣襯著肌膚如雪笼踩。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,144評論 1 285
  • 那天亡嫌,我揣著相機(jī)與錄音嚎于,去河邊找鬼。 笑死挟冠,一個(gè)胖子當(dāng)著我的面吹牛于购,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播知染,決...
    沈念sama閱讀 38,432評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼肋僧,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了控淡?” 一聲冷哼從身側(cè)響起嫌吠,我...
    開封第一講書人閱讀 37,088評論 0 261
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎掺炭,沒想到半個(gè)月后辫诅,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,586評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡涧狮,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,028評論 2 325
  • 正文 我和宋清朗相戀三年炕矮,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了么夫。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,137評論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡肤视,死狀恐怖档痪,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情邢滑,我是刑警寧澤腐螟,帶...
    沈念sama閱讀 33,783評論 4 324
  • 正文 年R本政府宣布,位于F島的核電站殊鞭,受9級特大地震影響遭垛,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜操灿,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,343評論 3 307
  • 文/蒙蒙 一锯仪、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧趾盐,春花似錦庶喜、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,333評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至本缠,卻和暖如春斥扛,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背丹锹。 一陣腳步聲響...
    開封第一講書人閱讀 31,559評論 1 262
  • 我被黑心中介騙來泰國打工稀颁, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人楣黍。 一個(gè)月前我還...
    沈念sama閱讀 45,595評論 2 355
  • 正文 我出身青樓匾灶,卻偏偏與公主長得像,于是被迫代替她去往敵國和親租漂。 傳聞我的和親對象是個(gè)殘疾皇子阶女,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,901評論 2 345

推薦閱讀更多精彩內(nèi)容