這篇文章大部分內(nèi)容來自:https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md
這篇教程基于一系列的例子來講解
kotlinx.coroutines
的核心特性
筆者使用的kotlin版本為1.2.51
属瓣,協(xié)程核心庫的版本為0.23.4
注意:協(xié)程庫還處于實(shí)驗(yàn)階段课竣,API是不穩(wěn)定的,謹(jǐn)慎用于生產(chǎn)環(huán)境
簡介&安裝
作為一個(gè)語言胁编,kotlin僅在標(biāo)準(zhǔn)庫里提供最少的底層API华匾,從而讓其他庫能利用協(xié)程构罗。不像其他有相似能力的語言,async
和await
不是kotlin的關(guān)鍵字溪北,甚至不是標(biāo)準(zhǔn)庫的一部分桐绒。
kotlinx.coroutines
是一個(gè)非常豐富的庫夺脾,包含若干高層協(xié)程啟動(dòng)機(jī)制(launch
,async
等)。你需要添加kotlinx-coroutines-core
模塊的依賴才能在你的項(xiàng)目中使用這些機(jī)制茉继。
<!-- 筆者寫這篇文章時(shí)劳翰,最新的kotlin版本為1.2.51 -->
<properties>
<kotlin.version>1.2.51</kotlin.version>
</properties>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-core</artifactId>
<version>0.23.4</version>
</dependency>
基本概念
這個(gè)章節(jié)覆蓋了協(xié)程的基本概念。
你的第一個(gè)協(xié)程
運(yùn)行下面的代碼:
fun main(args: Array<String>) {
launch { // 在后臺(tái)啟動(dòng)一個(gè)新的協(xié)程馒疹,然后繼續(xù)執(zhí)行
delay(1000L) // 不阻塞的延遲1s
println("World!") // 延遲后打印
}
println("Hello,") // 當(dāng)協(xié)程延遲時(shí)佳簸,主線程還在跑
Thread.sleep(2000L) // 阻塞主線程2s,為了讓jvm不掛掉
}
運(yùn)行結(jié)果:
Hello,
World!
本質(zhì)上颖变,協(xié)程是輕量級的線程生均。可以使用launch
協(xié)程建造器啟動(dòng)腥刹。你可以將launch { ... }
替換為thread { ... }
马胧,delay(...)
替換為Thread.sleep(...)
以達(dá)到相同的效果。試試看衔峰。
如果你只把launch
替換為thread
佩脊,編譯器會(huì)產(chǎn)生如下錯(cuò)誤:
Suspend functions are only allowed to be called from a coroutine or another suspend function
這是因?yàn)?code>delay是一個(gè)特殊的函數(shù),這里暫且稱之為掛起函數(shù)垫卤,它不會(huì)阻塞線程威彰,但是會(huì)掛起協(xié)程,而且它只能在協(xié)程中使用穴肘。
連接阻塞和非阻塞世界
第一個(gè)例子在同一塊代碼中混合了非阻塞的delay(...)
和阻塞的Thread.sleep(...)
歇盼,很容易就搞暈了哪個(gè)是阻塞的,哪個(gè)是非阻塞的评抚。下面豹缀,我們使用runBlocking
協(xié)程建造器,明確指明阻塞:
fun main(args: Array<String>) {
launch { // 在后臺(tái)啟動(dòng)一個(gè)新的協(xié)程慨代,然后繼續(xù)執(zhí)行
delay(1000L)
println("World!")
}
println("Hello,") // 主線程立即繼續(xù)跑
runBlocking { // 這塊阻塞了主線程
delay(2000L) // 延遲2s邢笙,讓jvm不掛掉
}
}
結(jié)果還是一樣的,但是這代碼只用了非阻塞的dalay
侍匙。主線程調(diào)用了runBlocking
氮惯,然后一直被阻塞,一直到runBlocking
執(zhí)行完成丈积。
這個(gè)例子可以改得更符合語言習(xí)慣些筐骇,用runBlocking
包裝主函數(shù)的執(zhí)行:
fun main(args: Array<String>) = runBlocking<Unit> { // 開始主協(xié)程
launch { // 在后臺(tái)啟動(dòng)一個(gè)新的協(xié)程,然后繼續(xù)執(zhí)行
delay(1000L)
println("World!")
}
println("Hello,") // 主協(xié)程立即繼續(xù)跑
delay(2000L) // 延遲2s江滨,讓jvm不掛掉
}
這里runBlocking<Unit> { ... }
的作用像一個(gè)適配器铛纬,用來啟動(dòng)頂層的主協(xié)程。明確指定是Unit
返回類型唬滑,是因?yàn)橐粋€(gè)格式良好的kotlin主函數(shù)必須返回Unit
告唆。
下面是為掛起函數(shù)寫單元測試的方法:
class MyTest {
@Test
fun testMySuspendingFunction() = runBlocking<Unit> {
// 這里我們可以通過任何我們喜歡的斷言風(fēng)格使用掛起函數(shù)
}
}
等待任務(wù)(job)
當(dāng)另一個(gè)協(xié)程在運(yùn)行時(shí)棺弊,延遲一段時(shí)間并不是一個(gè)好辦法。讓我們明確的等待(非阻塞的方式)擒悬,直到我們啟動(dòng)的后臺(tái)任務(wù)完成:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch { // 啟動(dòng)一個(gè)新協(xié)程模她,并創(chuàng)建一個(gè)對其任務(wù)的引用
delay(1000L)
println("World!")
}
println("Hello,")
job.join() // 等到子協(xié)程完成
}
結(jié)果還是一樣的,但是主協(xié)程和后臺(tái)任務(wù)沒有用后臺(tái)任務(wù)的執(zhí)行時(shí)間聯(lián)系在一起懂牧。好多了侈净。
提取函數(shù)重構(gòu)
讓我們來提取出launch { ... }
塊中的代碼到另一個(gè)函數(shù)中。當(dāng)你用“提取函數(shù)”重構(gòu)這塊代碼時(shí)僧凤,你會(huì)得到一個(gè)用suspend
修飾的新函數(shù)畜侦。這是你一個(gè)掛起函數(shù)。掛起函數(shù)可用于協(xié)程中躯保,就像使用普通函數(shù)一樣旋膳,但是它們有額外的特性——可以調(diào)用其他的掛起函數(shù)去掛起協(xié)程的執(zhí)行,像這個(gè)例子中的delay
途事。
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch { doWorld() }
println("Hello,")
job.join()
}
// 這是你第一個(gè)掛起函數(shù)
suspend fun doWorld() {
delay(1000L)
println("World!")
}
協(xié)程是輕量級的
運(yùn)行下面的代碼:
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = List(100_000) { // 啟動(dòng)大量的協(xié)程验懊,并返回它們的任務(wù)
launch {
delay(1000L)
print(".")
}
}
jobs.forEach { it.join() } // 等待其他全部的任務(wù)完成
}
這里啟動(dòng)了十萬個(gè)協(xié)程,一秒之后尸变,每個(gè)協(xié)程打印了一個(gè)點(diǎn)义图。你用線程試試?(很有可能就OOM了)
協(xié)程像守護(hù)線程
下面的代碼啟動(dòng)了一個(gè)長時(shí)間運(yùn)行的協(xié)程振惰,一秒打印兩次"I'm sleeping"歌溉,然后延遲一段后,從主函數(shù)返回:
fun main(args: Array<String>) = runBlocking<Unit> {
launch {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // 延遲后就退出
}
你運(yùn)行看看骑晶,打印了三行,然后就結(jié)束了:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
活躍的協(xié)程并不會(huì)辈莼郏活進(jìn)程桶蛔,所以它更像守護(hù)線程。
取消和超時(shí)
這個(gè)章節(jié)包含了協(xié)程的取消和超時(shí)漫谷。
取消協(xié)程執(zhí)行
在小應(yīng)用中仔雷,從主函數(shù)返回看起來是個(gè)結(jié)束所有協(xié)程的好辦法。在更大的舔示、長時(shí)間運(yùn)行的應(yīng)用中碟婆,需要更細(xì)粒度的控制。launch
函數(shù)返回了一個(gè)可以取消協(xié)程執(zhí)行的Job
:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // 延遲一小會(huì)
println("main: I'm tired of waiting!")
job.cancel() // 取消任務(wù)
job.join() // 等待任務(wù)結(jié)束
println("main: Now I can quit.")
}
運(yùn)行惕稻,產(chǎn)生如下輸出:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
在調(diào)用job.cancel
不久后竖共,因?yàn)閰f(xié)程被取消掉了,所以看不到任何輸出了俺祠。Job
的擴(kuò)展函數(shù)cancelAndJoin
結(jié)合了cancel
和join
的作用公给。
取消是需要配合的
協(xié)程的取消是需要配合的借帘,協(xié)程的代碼必須可配合取消。所有kotlinx.coroutines
中的掛起函數(shù)都是可取消的淌铐,這些掛起函數(shù)會(huì)檢查協(xié)程的取消狀態(tài)肺然,若已取消則拋出CancellationException
。然而腿准,如果協(xié)程正處于運(yùn)算中际起,沒有檢查取消狀態(tài),那么其不可被取消吐葱,如下所示:
fun main(args: Array<String>) = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch {
var nextPrintTime = startTime
var i = 0
while (i < 5) { // 浪費(fèi)CPU的循環(huán)運(yùn)算
// 2秒打印一個(gè)消息
if (System.currentTimeMillis() >= nextPrintTime) {
println("I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // 延遲一會(huì)
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消任務(wù)加叁,并等待其結(jié)束
println("main: Now I can quit.")
}
運(yùn)行看看。結(jié)果是唇撬,在取消之后它匕,其持續(xù)打印"I'm sleeping",直到循環(huán)5次之后窖认,任務(wù)自己結(jié)束豫柬。
使運(yùn)算代碼可取消
兩種方式使運(yùn)算代碼可取消。
- 周期執(zhí)行掛起函數(shù)扑浸,檢查取消狀態(tài)烧给。
yield
函數(shù)是達(dá)到這個(gè)目的的好辦法。 - 顯式的檢查取消狀態(tài)喝噪。
我們來嘗試下第二種方式:
fun main(args: Array<String>) = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch {
var nextPrintTime = startTime
var i = 0
while (isActive) { // 可取消的運(yùn)算
// 一秒打印兩次消息
if (System.currentTimeMillis() >= nextPrintTime) {
println("I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // 延遲一會(huì)
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消任務(wù)础嫡,并等待其結(jié)束
println("main: Now I can quit.")
}
現(xiàn)在,循環(huán)就是可取消的了酝惧。isActive
是協(xié)程內(nèi)CoroutineScope
對象的的一個(gè)屬性榴鼎。
用finally釋放資源
可取消的掛起函數(shù)在取消時(shí)會(huì)拋出CancellationException
,通常的方式就可以處理了晚唇。例如巫财,try {...} finally {...}
表達(dá)式或Kotlinuse
(use api)函數(shù),會(huì)在協(xié)程取消時(shí)哩陕,執(zhí)行結(jié)束動(dòng)作平项。
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
println("I'm running finally")
}
}
delay(1300L) // 延遲一會(huì)
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消任務(wù),并等待其結(jié)束
println("main: Now I can quit.")
}
join
和cancelAndJoin
都會(huì)等所有結(jié)束動(dòng)作完成悍及,因此以上代碼的輸出如下:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm running finally
main: Now I can quit.
運(yùn)行不可取消的代碼塊
任何嘗試在finally
塊中使用掛起函數(shù)均會(huì)產(chǎn)生CancellationException
闽瓢,因?yàn)檫\(yùn)行代碼的協(xié)程已經(jīng)被取消了。通常心赶,這不是個(gè)問題扣讼,因?yàn)樗杏辛己脤?shí)現(xiàn)的關(guān)閉操作(關(guān)閉文件,取消任務(wù)园担,或關(guān)閉任何種類的溝通通道)通常是非阻塞的届谈,并不需要掛起函數(shù)參與枯夜。但是,在很少的情況下艰山,你需要在取消的協(xié)程中進(jìn)行掛起操作湖雹,那么你可以將相應(yīng)代碼的使用withContext(NonCancellable) {...}
包裝,這里使用了withContext
函數(shù)和NonCancellable
上下文,如下所示:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
withContext(NonCancellable) {
println("I'm running finally")
delay(1000L)
println("And I've just delayed for 1 sec because I'm non-cancellable")
}
}
}
delay(1300L) // 延遲一會(huì)
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消任務(wù),并等待其結(jié)束
println("main: Now I can quit.")
}
超時(shí)
超時(shí)孽文,是實(shí)際應(yīng)用中取消協(xié)程執(zhí)行最顯而易見的原因皮官,因?yàn)槠鋱?zhí)行時(shí)間超時(shí)了靠粪。你還在用手動(dòng)記錄相應(yīng)任務(wù)的引用,然后啟動(dòng)另一個(gè)協(xié)程在延遲一段時(shí)間后取消記錄的那個(gè)協(xié)程?不用那么麻煩啦,這里有個(gè)`withTimeout``函數(shù)诗箍,幫你做了這些工作⊥彀Γ看看吧:
fun main(args: Array<String>) = runBlocking<Unit> {
withTimeout(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
}
輸出如下:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" kotlinx.coroutines.experimental.TimeoutCancellationException: Timed out waiting for 1300 MILLISECONDS
at kotlinx.coroutines.experimental.ScheduledKt.TimeoutCancellationException(Scheduled.kt:202)
at kotlinx.coroutines.experimental.TimeoutCoroutine.run(Scheduled.kt:100)
at kotlinx.coroutines.experimental.EventLoopBase$DelayedRunnableTask.run(EventLoop.kt:322)
at kotlinx.coroutines.experimental.EventLoopBase.processNextEvent(EventLoop.kt:148)
at kotlinx.coroutines.experimental.BlockingCoroutine.joinBlocking(Builders.kt:82)
at kotlinx.coroutines.experimental.BuildersKt__BuildersKt.runBlocking(Builders.kt:58)
...
TimeoutCancellationException
是由withTimeout
拋出的CancellationException
的子類滤祖。之前,我們沒有在控制臺(tái)看到過異常堆棧信息瓶籽,因?yàn)樵谝粋€(gè)取消了的協(xié)程中匠童,CancellationException
通常是一個(gè)結(jié)束協(xié)程的正常原因。然而塑顺,這個(gè)例子中汤求,我們正好在main
函數(shù)中使用了withTimeout
。
因?yàn)槿∠且粋€(gè)異常严拒,因此所有的資源將要被正常的關(guān)閉扬绪。如果你需要針對超時(shí)做一些額外的處理,可以將代碼用try {...} catch (e: TimeoutCancellationException) {...}
包裝糙俗,或者使用與withTimeout
類似的withTimeoutOrNull
勒奇,后者返回null
而不是拋出異常:
fun main(args: Array<String>) = runBlocking<Unit> {
val result = withTimeoutOrNull(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
"Done" // will get cancelled before it produces this result
}
println("Result is $result")
}
這次就沒有異常了:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Result is null
組合掛起函數(shù)
這個(gè)章節(jié)覆蓋了組合掛起函數(shù)的多種方式。
默認(rèn)是順序的
假設(shè)我們有倆定義好有用的掛起函數(shù)巧骚,例如遠(yuǎn)程服務(wù)調(diào)用,或者計(jì)算格二。這里劈彪,我們先假設(shè)這倆有用,實(shí)際上就是延遲一小會(huì):
suspend fun doSomethingUsefulOne(): Int {
delay(1000L) // 假裝有一波騷操作
return 13
}
suspend fun doSomethingUsefulTwo(): Int {
delay(1000L) // 假裝有一波騷操作
return 29
}
如果我們要順序執(zhí)行他們顶猜,先執(zhí)行doSomethingUsefulOne
沧奴,再執(zhí)行doSomethingUsefulTwo
,然后其計(jì)算結(jié)果之和长窄,怎么搞滔吠?實(shí)際使用中纲菌,需要用第一個(gè)函數(shù)的返回值來判斷是否需要調(diào)用第二個(gè)函數(shù)或如何去調(diào),才會(huì)這么做疮绷。
我們用順序調(diào)用就可以了翰舌,因?yàn)閰f(xié)程中的代碼和普通的代碼一樣,默認(rèn)是順序執(zhí)行的冬骚。下面的例子通過測量倆掛起函數(shù)總的執(zhí)行時(shí)間來演示:
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
val one = doSomethingUsefulOne()
val two = doSomethingUsefulTwo()
println("The answer is ${one + two}")
}
println("Completed in $time ms")
}
結(jié)果近似如下:
The answer is 42
Completed in 2017 ms
用async來并發(fā)
如果doSomethingUsefulOne
和doSomethingUsefulTwo
的執(zhí)行沒有依賴關(guān)系椅贱,我們想通過并發(fā)來更快的獲取到結(jié)果,那該怎么做呢只冻?async
就是干這茬的庇麦。
概念上來講,async
就跟launch
類似喜德。其啟動(dòng)了一個(gè)與其他協(xié)程并發(fā)運(yùn)行單獨(dú)協(xié)程(輕量級線程)山橄。區(qū)別是,launch
返回了一個(gè)不攜帶任何結(jié)果的Job
舍悯,但是async
返回了一個(gè)Deferred
航棱,一個(gè)輕量級非阻塞的future,表示一會(huì)就會(huì)返回結(jié)果的承諾贱呐。你可以在一個(gè)延期的值(deferred value)使用.await()
來獲取最終的結(jié)果丧诺,但Deferred
也是個(gè)Job
,因此奄薇,需要的話驳阎,你也可以取消掉。
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
產(chǎn)生如下輸出:
The answer is 42
Completed in 1017 ms
快了兩倍馁蒂,因?yàn)橛昧藘蓚€(gè)協(xié)程并發(fā)執(zhí)行呵晚。注意,協(xié)程的并發(fā)性總是明確的(多個(gè)協(xié)程同時(shí)運(yùn)行沫屡,那么肯定是并發(fā)的)饵隙。
懶啟動(dòng)async
async
有個(gè)懶加載選項(xiàng),配置其可選參數(shù)start
沮脖,值設(shè)置為CoroutineStart.LAZY
金矛。只在值被await
需要時(shí),或start
函數(shù)被調(diào)用時(shí)才啟動(dòng)協(xié)程勺届。運(yùn)行下面的例子驶俊,跟前面的例子就多了個(gè)選項(xiàng):
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
產(chǎn)生如下輸出:
The answer is 42
Completed in 2017 ms
好吧,又回到了順序執(zhí)行免姿,首先我們啟動(dòng)并等待one
饼酿,然后啟動(dòng)并等待two
。 這并不是懶執(zhí)行的預(yù)期場景。這個(gè)設(shè)計(jì)是用來替換標(biāo)準(zhǔn)的lazy
函數(shù)故俐,如果其計(jì)算涉及了掛起函數(shù)想鹰。
異步風(fēng)格的函數(shù)
我們可以用async
協(xié)程建造器定義異步風(fēng)格的函數(shù),異步的調(diào)用doSomethingUsefulOne
和 doSomethingUsefulTwo
药版。給這些函數(shù)加上Async
后綴是一個(gè)很好的風(fēng)格辑舷,強(qiáng)調(diào)了他們是異步計(jì)算的,需要用延期的值來獲取結(jié)果刚陡。
// somethingUsefulOneAsync 的結(jié)果是 Deferred<Int> 類型
fun somethingUsefulOneAsync() = async {
doSomethingUsefulOne()
}
// somethingUsefulTwoAsync 的結(jié)果是 Deferred<Int> 類型
fun somethingUsefulTwoAsync() = async {
doSomethingUsefulTwo()
}
注意惩妇,這些xxxAsync
函數(shù)不是掛起函數(shù),它們隨處均可使用筐乳。但是它們的使用總是意味著其行為是異步(也相當(dāng)于并發(fā))執(zhí)行的歌殃。
下面的例子展示了在協(xié)程之外的使用:
// 注意,這里沒有用runBlocking
fun main(args: Array<String>) {
val time = measureTimeMillis {
// 我們可以在協(xié)程外部初始化異步操作
val one = somethingUsefulOneAsync()
val two = somethingUsefulTwoAsync()
// 但是等待結(jié)果必須涉及掛起或阻塞
// 這里蝙云,我們用`runBlocking { ... }`阻塞主線程來獲取結(jié)果
runBlocking {
println("The answer is ${one.await() + two.await()}")
}
}
println("Completed in $time ms")
}
結(jié)果如下:
The answer is 42
Completed in 1128 ms
協(xié)程的上下文(context)和調(diào)度器(dispatchers)
協(xié)程總是在上下文中執(zhí)行氓皱,上下文代表的值是CoroutineContext
,定義在Kotlin標(biāo)準(zhǔn)庫中勃刨。
協(xié)程上下文是一系列的元素波材,主要的元素包括我們之前看到過的協(xié)程的Job
,還有調(diào)度器身隐,這個(gè)章節(jié)會(huì)介紹廷区。
調(diào)度器和線程
協(xié)程上下文包括了一個(gè)決定相應(yīng)協(xié)程在哪個(gè)或哪些線程執(zhí)行的協(xié)程調(diào)度器(參見 CoroutineDispatcher)。協(xié)程調(diào)度器可以限制協(xié)程在具體的線程中執(zhí)行贾铝,或調(diào)度到一個(gè)線程池隙轻,或者無限制運(yùn)行。
所有像launch
或async
一樣的協(xié)程建造器都接受一個(gè)可選的CoroutineContext
參數(shù)垢揩,這個(gè)參數(shù)可以用來顯式指定調(diào)度器和其他上下文元素玖绿。
嘗試下面的例子:
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = arrayListOf<Job>()
jobs += launch(Unconfined) { // 沒有限制 - 將在主線程執(zhí)行
println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(coroutineContext) { // 父協(xié)程的上下文,runBlocking 協(xié)程
println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(CommonPool) { // 將會(huì)調(diào)度到ForkJoinPool.commonPool(或等價(jià)的地方)
println(" 'CommonPool': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(newSingleThreadContext("MyOwnThread")) { // 新線程
println(" 'newSTC': I'm working in thread ${Thread.currentThread().name}")
}
jobs.forEach { it.join() }
}
產(chǎn)生如下輸出(可能順序不同):
'Unconfined': I'm working in thread main
'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
'newSTC': I'm working in thread MyOwnThread
'coroutineContext': I'm working in thread main
之前章節(jié)中使用的默認(rèn)調(diào)度器是DefaultDispatcher
叁巨,當(dāng)前的實(shí)現(xiàn)中等同于CommonPool
斑匪。因此,launch { ... }
==launch(DefaultDispatcher) { ... }
==launch(CommonPool) { ... }
锋勺。
父coroutineContext
和Unconfined
上下文的區(qū)別一會(huì)看蚀瘸。
注意,newSingleThreadContext
創(chuàng)建了一個(gè)新的線程庶橱,這是非常昂貴的資源苍姜。在實(shí)際應(yīng)用中,要么用完之后就用close
函數(shù)回收悬包,要么就存儲(chǔ)在頂層變量中,在應(yīng)用中到處復(fù)用馍乙。
非限制(Unconfined) VS 限制(confined) 調(diào)度器
Unconfined
協(xié)程調(diào)度器在調(diào)用線程啟動(dòng)協(xié)程布近,但直到第一個(gè)掛起點(diǎn)之前垫释。在掛起之后在什么線程恢復(fù)全權(quán)由之前調(diào)用的掛起函數(shù)決定。Unconfined
調(diào)度器適合在協(xié)程不消耗CPU時(shí)間或不更新任何限制于特定線程共享數(shù)據(jù)(類似UI)的場景撑瞧。
再說coroutineContext
屬性棵譬,它在任何協(xié)程中均可用,引用當(dāng)前協(xié)程的上下文预伺。通過這種方式订咸,父上下文可以被繼承。特別的酬诀,runBlocking
創(chuàng)建的協(xié)程默認(rèn)調(diào)度器限定到調(diào)用者線程脏嚷,因此,繼承runBlocking
的上下文就有了使用可預(yù)測的先進(jìn)先出調(diào)度限制在這個(gè)線程內(nèi)執(zhí)行的作用瞒御。
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = arrayListOf<Job>()
jobs += launch(Unconfined) { // 沒有限制 -- 在主線程運(yùn)行
println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
delay(500)
println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}")
}
jobs += launch(coroutineContext) { // 父(runBlocking協(xié)程)上下文,
println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
delay(1000)
println("'coroutineContext': After delay in thread ${Thread.currentThread().name}")
}
jobs.forEach { it.join() }
}
輸出結(jié)果:
'Unconfined': I'm working in thread main
'coroutineContext': I'm working in thread main
'Unconfined': After delay in thread kotlinx.coroutines.DefaultExecutor
'coroutineContext': After delay in thread main
因此父叙,繼承了runBlocking {...}
的coroutineContext
的協(xié)程繼續(xù)在main
線程執(zhí)行,而沒有限制的協(xié)程在delay
函數(shù)使用的默認(rèn)線程池線程中恢復(fù)肴裙。
調(diào)試協(xié)程和線程
協(xié)程用Unconfined
或默認(rèn)的多線程調(diào)度器可以從一個(gè)線程掛起趾唱,從另一個(gè)線程恢復(fù)。即使是用單線程的調(diào)度器蜻懦,也很難知道協(xié)程在什么地方甜癞,什么時(shí)候在干什么。在多線程應(yīng)用中宛乃,在日志中打印出線程的名字是一個(gè)通常的做法悠咱。一般的日志框架也是支持這個(gè)特性的。但當(dāng)使用協(xié)程時(shí)烤惊,僅線程名稱對上下文的描述不夠充分乔煞,因此,kotlinx.coroutines
包含的設(shè)施讓調(diào)試更容易柒室。
給JVM參數(shù)加上-Dkotlinx.coroutines.debug
渡贾,然后運(yùn)行下面的代碼:
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main(args: Array<String>) = runBlocking<Unit> {
val a = async(coroutineContext) {
log("I'm computing a piece of the answer")
6
}
val b = async(coroutineContext) {
log("I'm computing another piece of the answer")
7
}
log("The answer is ${a.await() * b.await()}")
}
三個(gè)協(xié)程:
主協(xié)程(#1) - runBlocking
創(chuàng)建的協(xié)程
a
(#2)、b
(#3) - 兩個(gè)計(jì)算延遲返回值的協(xié)程
都在runBlocking
的上下文限定在主線程中執(zhí)行雄右,輸出如下:
[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42
log
函數(shù)在方括號(hào)中打印出線程名稱和當(dāng)前執(zhí)行的協(xié)程標(biāo)識(shí)空骚,調(diào)試模式開啟的時(shí)候,這個(gè)標(biāo)識(shí)會(huì)連續(xù)的賦值給創(chuàng)建的協(xié)程擂仍。
線程間切換
給JVM參數(shù)加上-Dkotlinx.coroutines.debug囤屹,然后運(yùn)行下面的代碼:
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main(args: Array<String>) {
newSingleThreadContext("Ctx1").use { ctx1 ->
newSingleThreadContext("Ctx2").use { ctx2 ->
runBlocking(ctx1) {
log("Started in ctx1")
withContext(ctx2) {
log("Working in ctx2")
}
log("Back to ctx1")
}
}
}
}
這個(gè)例子演示了幾種新技術(shù)。一是使用runBlocking
時(shí)逢渔,指定了特定的上下文肋坚;二是使用withContext
函數(shù)切換協(xié)程的上下文,但依然是在相同的協(xié)程中執(zhí)行。輸出如下:
[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1
注意:這里使用了kotlin標(biāo)準(zhǔn)庫里的use
函數(shù)智厌,用于當(dāng)newSingleThreadContext
創(chuàng)建的線程不再被需要時(shí)诲泌,將其釋放。
上下文中的任務(wù)(Job)
協(xié)程的任務(wù)是上下文的一部分铣鹏。協(xié)程可以取出自己上下文的任務(wù)敷扫,用coroutineContext[Job]
表達(dá)式:
fun main(args: Array<String>) = runBlocking<Unit> {
println("My job is ${coroutineContext[Job]}")
}
在調(diào)試模式下,輸出如下:
My job is "coroutine#1":BlockingCoroutine{Active}@6d311334
因此诚卸,CoroutineScope
中的isActive
是coroutineContext[Job]?.isActive == true
的便捷寫法葵第。
協(xié)程的父子關(guān)系
當(dāng)協(xié)程的coroutineContext
用來啟動(dòng)另一個(gè)協(xié)程,那么新協(xié)程的Job
就成了父協(xié)程Job
的兒子合溺。想父協(xié)程取消的時(shí)候卒密,所有的子協(xié)程也會(huì)遞歸取消。
fun main(args: Array<String>) = runBlocking<Unit> {
// 啟動(dòng)一個(gè)協(xié)程來處理請求
val request = launch {
// 生成兩個(gè)任務(wù)辫愉,一個(gè)有自己的上下文
val job1 = launch {
println("job1: I have my own context and execute independently!")
delay(1000)
println("job1: I am not affected by cancellation of the request")
}
// 另一個(gè)繼承父上下文
val job2 = launch(coroutineContext) {
delay(100)
println("job2: I am a child of the request coroutine")
delay(1000)
println("job2: I will not execute this line if my parent request is cancelled")
}
// 當(dāng)子任務(wù)完成栅受,請求才算完成
job1.join()
job2.join()
}
delay(500)
request.cancel() // 取消請求
delay(1000) // 延遲1s,看看會(huì)發(fā)生什么
println("main: Who has survived request cancellation?")
}
輸出如下:
job1: I have my own context and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?
結(jié)合上下文
協(xié)程上下文可以用+
操作符結(jié)合恭朗。右手邊的上下文替換掉左手邊上下文相關(guān)的條目屏镊。例如,協(xié)程的Job
可以被繼承痰腮,但調(diào)度器會(huì)被替換而芥。
fun main(args: Array<String>) = runBlocking<Unit> {
// 啟動(dòng)一個(gè)協(xié)程處理請求
val request = launch(coroutineContext) { // 使用 `runBlocking` 的上下文
// 在CommonPool中創(chuàng)建CPU密集型的任務(wù)
val job = launch(coroutineContext + CommonPool) {
println("job: I am a child of the request coroutine, but with a different dispatcher")
delay(1000)
println("job: I will not execute this line if my parent request is cancelled")
}
job.join() // 子任務(wù)完成時(shí),請求完成
}
delay(500)
request.cancel() // 取消請求的處理
delay(1000) // 延遲1s看看有啥發(fā)生
println("main: Who has survived request cancellation?")
}
預(yù)期結(jié)果如下:
job: I am a child of the request coroutine, but with a different dispatcher
main: Who has survived request cancellation?
當(dāng)?shù)呢?zé)任
父協(xié)程總是會(huì)等所有的子協(xié)程執(zhí)行完成膀值。父協(xié)程不必顯式的記錄所有其啟動(dòng)的子協(xié)程棍丐,也不必使用Job.join
等待其子協(xié)程執(zhí)行完成。
fun main(args: Array<String>) = runBlocking<Unit> {
// 啟動(dòng)一個(gè)協(xié)程處理請求
val request = launch {
repeat(3) { i -> // 啟動(dòng)幾個(gè)子協(xié)程
launch(coroutineContext) {
delay((i + 1) * 200L) // 可變延遲 200ms, 400ms, 600ms
println("Coroutine $i is done")
}
}
println("request: I'm done and I don't explicitly join my children that are still active")
}
request.join() // 等待請求完成沧踏,也包括其子協(xié)程
println("Now processing of the request is complete")
}
結(jié)果如下:
request: I'm done and I don't explicitly join my children that are still active
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Now processing of the request is complete
給協(xié)程命名方便調(diào)試
當(dāng)協(xié)程日志很頻繁或者你只想關(guān)聯(lián)相同協(xié)程產(chǎn)生的日志記錄時(shí)歌逢,自動(dòng)生成id是挺好的。然而翘狱,當(dāng)協(xié)程固定的處理一個(gè)特別的請求秘案,或者處理特定的后臺(tái)任務(wù),為了調(diào)試潦匈,還是命名比較好阱高。CoroutineName
上下文元素與線程名稱的功能一致,在調(diào)試默認(rèn)打開的時(shí)茬缩,執(zhí)行協(xié)程的線程名稱將會(huì)展示為CoroutineName
赤惊。
下面的例子展示了這個(gè)理念:
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
log("Started main coroutine")
// 啟動(dòng)兩個(gè)后臺(tái)計(jì)算
val v1 = async(CoroutineName("v1coroutine")) {
delay(500)
log("Computing v1")
252
}
val v2 = async(CoroutineName("v2coroutine")) {
delay(1000)
log("Computing v2")
6
}
log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
}
當(dāng)有JVM參數(shù)-Dkotlinx.coroutines.debug
時(shí),產(chǎn)生如下結(jié)果:
[main @main#1] Started main coroutine
[ForkJoinPool.commonPool-worker-1 @v1coroutine#2] Computing v1
[ForkJoinPool.commonPool-worker-2 @v2coroutine#3] Computing v2
[main @main#1] The answer for v1 / v2 = 42
通過指定任務(wù)取消執(zhí)行
現(xiàn)在凰锡,我們已經(jīng)了解了上下文未舟,父子關(guān)系和任務(wù)圈暗,讓我們把這些玩意兒放一塊耍耍。假設(shè)我們的應(yīng)用有一個(gè)有生命周期的對象处面,這個(gè)對象不是協(xié)程厂置。例如,我們寫一個(gè)Android應(yīng)用時(shí)魂角,在Android Activity上下文中啟動(dòng)了各種各樣的協(xié)程用于異步獲取數(shù)據(jù)和動(dòng)畫計(jì)算。當(dāng)activity銷毀時(shí)智绸,所有的協(xié)程都得取消掉野揪,避免內(nèi)存泄漏。
我們一個(gè)創(chuàng)建一個(gè)跟activity綁定的Job
實(shí)例瞧栗,用于管理我們的協(xié)程斯稳。Job
實(shí)例由Job()
工廠創(chuàng)建,等會(huì)例子會(huì)演示迹恐。為了方便理解挣惰,我們可以launch(coroutineContext, parent = job)
這樣寫,說明用了父job
殴边,而不是用launch(coroutineContext + job)
表達(dá)式憎茂。
現(xiàn)在,一個(gè)Job.cancel
調(diào)用锤岸,將會(huì)所有我們啟動(dòng)的所有協(xié)程竖幔。此外,Job.join
等待所有子協(xié)程完成是偷,因此在下面的例子中我們也可以用cancelAndJoin
:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = Job() // 創(chuàng)建一個(gè)Job來專利我們自己協(xié)程的生命周期
// 為了演示拳氢,啟動(dòng)10個(gè)協(xié)程,每個(gè)運(yùn)行不同的時(shí)間
val coroutines = List(10) { i ->
// 都是我們job對象的兒子
launch(coroutineContext, parent = job) { // 使用runBlocking的上下文蛋铆,但是用我們自己的job
delay((i + 1) * 200L) // 花樣等待
println("Coroutine $i is done")
}
}
println("Launched ${coroutines.size} coroutines")
delay(500L) // 延遲500ms
println("Cancelling the job!")
job.cancelAndJoin() // 取消所有的任務(wù)馋评,并等待其完成
}
輸出如下:
Launched 10 coroutines
Coroutine 0 is done
Coroutine 1 is done
Cancelling the job!
如你所見,只有前兩個(gè)協(xié)程打印了消息刺啦,其他都被一單個(gè)job.cancelAndJoin()
給取消掉了留特。所以,在我們假想的Android應(yīng)用中洪燥,需要做的磕秤,就是在activity創(chuàng)建的時(shí)候創(chuàng)建一個(gè)父job,然后在子協(xié)程創(chuàng)建的時(shí)候使用這個(gè)job捧韵,最后市咆,在activity銷毀時(shí)取消掉這個(gè)job即可。在Android生命周期中再来,我們不能join
它們蒙兰,因?yàn)槭峭降牧琢觥T诮ㄔ旌蠖朔?wù)時(shí),join
用于保證有限的資源訪問是很有用的搜变。
通道(Channels)
延期值(Deferred values)提供了一個(gè)在協(xié)程中轉(zhuǎn)移單個(gè)值的便捷方式采缚。通道提供了一個(gè)方式來轉(zhuǎn)移數(shù)據(jù)流。
通道基礎(chǔ)
Channel
在概念上與BlockingQueue
非常相似挠他。不同之處是前者用可掛起的send
替代后者阻塞的put
操作扳抽,前者用可掛起的receive替代后者是阻塞的take
操作。
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
// 這里可能是重度消耗CPU的計(jì)算殖侵,或是異步邏輯贸呢,這里我們就發(fā)送幾個(gè)平方數(shù)
for (x in 1..5) channel.send(x * x)
}
// 這里打印5個(gè)收到的整數(shù)
repeat(5) { println(channel.receive()) }
println("Done!")
}
結(jié)果如下:
1
4
9
16
25
Done!
關(guān)閉和遍歷通道
不像隊(duì)列,通道可以關(guān)閉用于表明沒有更多的元素會(huì)過來了拢军。在接收端楞陷,用for
循環(huán)可以很方便的接受通道中傳來的元素。
概念上來說茉唉,close
就像給通道傳遞一個(gè)特殊的關(guān)閉令牌固蛾。在接受到關(guān)閉令牌之時(shí),迭代就會(huì)停止度陆,因此艾凯,這里保證了關(guān)閉之前發(fā)送的元素都被接收到了:
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close() // 發(fā)完了
}
// 用for循環(huán)打印接收值 (在通過關(guān)閉之前)
for (y in channel) println(y)
println("Done!")
}
建造通道生產(chǎn)者
協(xié)程生產(chǎn)一序列元素的模式是比較常見的。這是在并發(fā)代碼中經(jīng)臣嵛撸可以發(fā)現(xiàn)的生產(chǎn)者-消費(fèi)者模式的一部分览芳。你可以將生產(chǎn)者抽象為一個(gè)以通道為參數(shù)的函數(shù),但與常識(shí)相背的是鸿竖,結(jié)果需要被函數(shù)返回沧竟。
fun produceSquares() = produce<Int> {
for (x in 1..5) send(x * x)
}
fun main(args: Array<String>) = runBlocking<Unit> {
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
}
流水線
流水線,一種協(xié)程可產(chǎn)生無限數(shù)據(jù)流的模式缚忧。
fun produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // 從1開始的無限整數(shù)流
}
另一個(gè)協(xié)程或多個(gè)協(xié)程會(huì)消費(fèi)這個(gè)流悟泵,做一些處理,然后產(chǎn)出結(jié)果闪水。下面的例子中糕非,這些數(shù)會(huì)被平方:
fun square(numbers: ReceiveChannel<Int>) = produce<Int> {
for (x in numbers) send(x * x)
}
下面的代碼啟動(dòng)然后連接整個(gè)流水線:
fun main(args: Array<String>) = runBlocking<Unit> {
val numbers = produceNumbers() // 產(chǎn)生從1開始的整數(shù)流
val squares = square(numbers) // 平方整數(shù)
for (i in 1..5) println(squares.receive()) // 打印前五個(gè)
println("Done!")
squares.cancel() // 在大型應(yīng)用中,需要關(guān)閉這些協(xié)程
numbers.cancel()
}
在上面這個(gè)例子中球榆,我們不用取消掉這些協(xié)程朽肥,因?yàn)閰f(xié)程就像守護(hù)線程一樣。但是在大點(diǎn)的應(yīng)用中持钉,如果我們不需要了衡招,那就要停止掉流水線∶壳浚或者始腾,我們可以將流水線協(xié)程作為主協(xié)程的兒子州刽,接下來會(huì)演示。
流水線獲取素?cái)?shù)
下面舉例將流水線應(yīng)用到極致——用流水線協(xié)程生成素?cái)?shù)浪箭。首先穗椅,生成一個(gè)無限的整數(shù)序列。這次我們傳如一個(gè)context
參數(shù)奶栖,并將這個(gè)參數(shù)傳遞給produce
建造器匹表,因此,調(diào)用方可以控制協(xié)程在哪跑:
fun numbersFrom(context: CoroutineContext, start: Int) = produce<Int>(context) {
var x = start
while (true) send(x++) // 從start開始的無限整數(shù)流
}
下面的流水線過濾掉了所有不能被給定素?cái)?shù)除盡的數(shù):
fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {
for (x in numbers) if (x % prime != 0) send(x)
}
現(xiàn)在驼抹,我們建立一個(gè)從2開始的整數(shù)流桑孩,從當(dāng)前的通道獲取素?cái)?shù),然后為找到的素?cái)?shù)開啟新的通道:
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
下面的例子打印了前十個(gè)素?cái)?shù)框冀,在主線程的上下文中運(yùn)行了整個(gè)流水線。因?yàn)樗械膮f(xié)程是作為runBlocking
協(xié)程上下文的兒子啟動(dòng)的敏簿,所以我們不必將所有我們啟動(dòng)的協(xié)程列下來明也,我們用cancelChildren
擴(kuò)展函數(shù)取消所有的子協(xié)程。
fun main(args: Array<String>) = runBlocking<Unit> {
var cur = numbersFrom(coroutineContext, 2)
for (i in 1..10) {
val prime = cur.receive()
println(prime)
cur = filter(coroutineContext, cur, prime)
}
coroutineContext.cancelChildren() // 取消所有的子協(xié)程惯裕,從而讓主線程退出
}
輸出如下:
2
3
5
7
11
13
17
19
23
29
注意温数,你可以用標(biāo)準(zhǔn)庫中的buildIterator
協(xié)程建造器建造相同的流水線。將produce
用buildIterator
替換蜻势,send
用yield
替換撑刺,receive
用next
替換
,ReceiveChannel
用Iterator
替換握玛,并去掉上下文够傍。你也不用runBlocking
了。然而挠铲,上面展示的流水線使用通道的好處冕屯,就是能夠充分利用多核CPU(如果在CommonPool
上下文運(yùn)行)。
扇出
多個(gè)協(xié)程可以從同一個(gè)通道接收拂苹,任務(wù)散布于多個(gè)協(xié)程之間安聘。讓我們從一個(gè)周期產(chǎn)生整數(shù)(1秒10個(gè)數(shù))的生產(chǎn)者協(xié)程開始:
fun produceNumbers() = produce<Int> {
var x = 1 // 從1開始
while (true) {
send(x++) // 生產(chǎn)下一個(gè)
delay(100) // 等1s
}
}
我們可以有多個(gè)處理者協(xié)程,在這個(gè)例子中瓢棒,就只打印他們的id和接收到的數(shù)字:
fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}
讓我們啟動(dòng)5個(gè)處理者浴韭,讓它們運(yùn)行將近1秒,看看會(huì)發(fā)生什么:
fun main(args: Array<String>) = runBlocking<Unit> {
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // 取消掉生產(chǎn)者協(xié)程脯宿,這樣就能將所有的協(xié)程取消
}
輸出與下面的結(jié)果類似念颈,盡管接收每個(gè)特定整數(shù)的處理器ID可能不同。
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
注意嗅绰,取消生產(chǎn)者協(xié)程關(guān)閉其通道舍肠,最終會(huì)結(jié)束處理者協(xié)程遍歷通道搀继。
同樣的,注意我們在launchProcessor
代碼中如何用for
循環(huán)遍歷通道實(shí)現(xiàn)扇出的翠语。不像consumeEach
叽躯,for
循環(huán)風(fēng)格在多協(xié)程使用時(shí)是妥妥安全的。如果肌括,其中的一個(gè)協(xié)程掛掉了点骑,其他的協(xié)程還會(huì)繼續(xù)處理通道。而當(dāng)處理者用consumeEach
遍歷時(shí)谍夭,正澈诘危或非正常結(jié)束都會(huì)將通道給取消掉。
扇入
多個(gè)協(xié)程可以向相同通道發(fā)送紧索。例如袁辈,我們有一個(gè)字符串通道,一個(gè)有特定延遲周期發(fā)送特定字符串到通道的掛起函數(shù)珠漂。
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
現(xiàn)在晚缩,來看看如果啟動(dòng)兩個(gè)協(xié)程發(fā)送字符串會(huì)怎么樣(這個(gè)例子中,我們在主線程上下文中啟動(dòng)它們):
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<String>()
launch(coroutineContext) { sendString(channel, "foo", 200L) }
launch(coroutineContext) { sendString(channel, "BAR!", 500L) }
repeat(6) { // 接收頭6個(gè)
println(channel.receive())
}
coroutineContext.cancelChildren() // 取消所有的子協(xié)程媳危,讓主線程結(jié)束
}
輸出是:
foo
foo
BAR!
foo
foo
BAR!
帶緩沖區(qū)的通道
前面展示的通道都是不帶緩沖的荞彼。沒有緩沖區(qū)的通道只有在發(fā)送者和接收者見到了彼此才傳遞元素。如果發(fā)送先調(diào)用了待笑,那么它將掛起直到接收被調(diào)用鸣皂;如果接收先調(diào)用了,那么它將掛起直到發(fā)送被調(diào)用暮蹂。
Channel()
工廠函數(shù)和produce
建造器接收一個(gè)可選的用來指定緩沖區(qū)大小capacity
參數(shù)寞缝,緩沖區(qū)可以讓發(fā)送者在掛起之前發(fā)送多個(gè)元素,跟指定容量的BlockingQueue
類似椎侠,在緩沖區(qū)滿了之后阻塞第租。
看看下面的代碼會(huì)有啥效果:
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>(4) // 創(chuàng)建帶緩沖區(qū)的通道
val sender = launch(coroutineContext) { // 啟動(dòng)發(fā)送者協(xié)程
repeat(10) {
println("Sending $it") // 在發(fā)送之前先打印
channel.send(it) // 將會(huì)在緩沖區(qū)滿的時(shí)候掛起
}
}
// 什么都不做,等著
delay(1000)
sender.cancel() // 取消發(fā)送者協(xié)程
}
用緩沖區(qū)大小為4的通道我纪,打印了5次慎宾。
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
頭4個(gè)元素加入到了緩沖區(qū),當(dāng)試圖加入第五個(gè)的時(shí)候浅悉,發(fā)送者被掛起了趟据。
時(shí)鐘通道
時(shí)鐘通道是一種特別的單緩沖區(qū)通道,每次自上次從此通道消費(fèi)后术健,在給定時(shí)間后會(huì)生產(chǎn)一個(gè)Unit
汹碱。單獨(dú)使用看起來像沒什么用,但是在構(gòu)建復(fù)雜的基于時(shí)間的生產(chǎn)流水線,然后操作者做一些窗口和其他基于時(shí)間的處理時(shí)特別有用拌夏。
用ticker
工廠方法創(chuàng)建時(shí)鐘通道,后續(xù)元素不再需要時(shí)锥惋,用ReceiveChannel.cancel
取消掉跪腹。
看看實(shí)際中如何應(yīng)用:
fun main(args: Array<String>) = runBlocking<Unit> {
val tickerChannel = ticker(delay = 100, initialDelay = 0) // 創(chuàng)建時(shí)鐘通道
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Initial element is available immediately: $nextElement") // 最初的延遲還沒結(jié)束
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // 后續(xù)的元素都有100ms延遲
println("Next element is not ready in 50 ms: $nextElement")
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 100 ms: $nextElement")
// 模擬長時(shí)間消費(fèi)延遲
println("Consumer pauses for 150ms")
delay(150)
// 下個(gè)元素立即可用
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Next element is available immediately after large consumer delay: $nextElement")
// receive方法調(diào)用間的暫停也算進(jìn)去了褂删,下一個(gè)元素會(huì)更快收到
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
tickerChannel.cancel() // 后面的不要了
}
會(huì)打印下面幾行:
Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
ticker
知道消費(fèi)者停頓,如果有停頓冲茸,默認(rèn)調(diào)整下次產(chǎn)生產(chǎn)生元素的延遲屯阀,嘗試維護(hù)產(chǎn)生元素的固定速率。
一個(gè)可選的參數(shù)mode
轴术,如果指定為TickerMode.FIXED_PERIOD
难衰,那么ticker
會(huì)維護(hù)一個(gè)元素間固定延遲。默認(rèn)是TickerMode.FIXED_DELAY
逗栽。
這里多講講兩個(gè)模式的區(qū)別盖袭,后面再舉個(gè)例子說明區(qū)別。
TickerMode.FIXED_PERIOD
: 為了保持產(chǎn)生元素的速率彼宠,會(huì)調(diào)整下一個(gè)元素產(chǎn)生的延遲苍凛。
TickerMode.FIXED_DELAY
: 固定的延遲產(chǎn)生元素。
區(qū)分兩個(gè)模式的例子兵志。
fun log(msg: String) {
println("[${Date()}] $msg")
}
fun main(args: Array<String>) = runBlocking<Unit> {
val tickerChannel = ticker(delay = 5000, initialDelay = 0, mode = TickerMode.FIXED_DELAY)
var i = 0
for (item in tickerChannel) {
log("receive $item")
val time = if (i++ % 2 == 0) 4000 else 6000 // 切換使用4s/6s延遲
delay(time)
}
}
如果用TickerMode.FIXED_DELAY
模式:
[Sun Jul 22 16:36:17 CST 2018] receive kotlin.Unit
[Sun Jul 22 16:36:22 CST 2018] receive kotlin.Unit
[Sun Jul 22 16:36:28 CST 2018] receive kotlin.Unit
[Sun Jul 22 16:36:33 CST 2018] receive kotlin.Unit
[Sun Jul 22 16:36:39 CST 2018] receive kotlin.Unit
如果用TickerMode.FIXED_PERIOD
模式:
[Sun Jul 22 16:43:52 CST 2018] receive kotlin.Unit
[Sun Jul 22 16:43:57 CST 2018] receive kotlin.Unit
[Sun Jul 22 16:44:03 CST 2018] receive kotlin.Unit
[Sun Jul 22 16:44:07 CST 2018] receive kotlin.Unit
[Sun Jul 22 16:44:13 CST 2018] receive kotlin.Unit
[Sun Jul 22 16:44:17 CST 2018] receive kotlin.Unit
第一次延遲都是5s,后面的區(qū)別是FIXED_DELAY
延遲在5/6s間切換宣肚;FIXED_PERIOD
的延遲在4/6s間切換想罕。相信大家已經(jīng)能夠區(qū)分了。
通道是公平的
對于從多個(gè)協(xié)程調(diào)用通道的順序霉涨,向通道發(fā)送和接收操作是公平的按价。按照先進(jìn)先出的順序進(jìn)出通道,例如笙瑟,第一個(gè)調(diào)用receive
的協(xié)程獲得元素楼镐。下面的例子中,兩個(gè)協(xié)程("ping"和"pong")從同一個(gè)通道"table"接收"ball"對象往枷。
data class Ball(var hits: Int)
fun main(args: Array<String>) = runBlocking<Unit> {
val table = Channel<Ball>() // 公用一張桌子
launch(coroutineContext) { player("ping", table) }
launch(coroutineContext) { player("pong", table) }
table.send(Ball(0)) // 發(fā)球
delay(1000) // 延遲一秒
coroutineContext.cancelChildren() // 游戲結(jié)束框产,取消它們
}
suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // 在循環(huán)中接球
ball.hits++
println("$name $ball")
delay(300) // 等一會(huì)
table.send(ball) // 將球擊回
}
}
"ping"協(xié)程先開始的,所以错洁,它最先收到球秉宿。即使"ping"協(xié)程在將球擊回桌面后立即再次開始接球,但球還是給"pong"協(xié)程接到了屯碴,因?yàn)?pong"早等著在了:
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
有時(shí)候描睦,通道可能會(huì)產(chǎn)生看起不公平的執(zhí)行,是因協(xié)程使用到的線程池所致导而。
共享可變狀態(tài)和并發(fā)
協(xié)程可以用多線程的調(diào)度器(例如默認(rèn)的CommonPool
)并發(fā)執(zhí)行忱叭。那么并發(fā)問題也接踵而至隔崎。主要問題是同步訪問共享可變狀態(tài)。在協(xié)程領(lǐng)域里韵丑,這個(gè)問題的解決方案有些與多線程領(lǐng)域中類似爵卒,但是有些則截然不同。
問題
讓我們啟動(dòng)1000個(gè)協(xié)程埂息,做同樣的事情1000次(一共一百萬次執(zhí)行)技潘。為了一會(huì)做比較,我們記錄下執(zhí)行時(shí)間:
suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
val n = 1000 // 啟動(dòng)協(xié)程的數(shù)量
val k = 1000 // 每個(gè)協(xié)程執(zhí)行動(dòng)作的次數(shù)
val time = measureTimeMillis {
val jobs = List(n) {
launch(context) {
repeat(k) { action() }
}
}
jobs.forEach { it.join() }
}
println("Completed ${n * k} actions in $time ms")
}
我們以一個(gè)非常簡單的動(dòng)作千康,在多線程的CommonPool
上下文下享幽,累加一個(gè)共享的變量。
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter++
}
println("Counter = $counter")
}
最終會(huì)打印出個(gè)啥拾弃?應(yīng)該不會(huì)是"Counter = 1000000"值桩,因?yàn)?000個(gè)協(xié)程在多個(gè)線程同步的累加counter
而沒有進(jìn)行同步。
注意:如果你的計(jì)算機(jī)只有2核或更少豪椿,那么你會(huì)一直得到1000000奔坟,因?yàn)?code>CommonPool在這種情況下是單線程的。為了“造成”這個(gè)問題搭盾,需要把代碼改改:
val mtContext = newFixedThreadPoolContext(2, "mtPool") // 定義一個(gè)2線程的上下文
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(mtContext) { // 替代上面例子的CommonPool
counter++
}
println("Counter = $counter")
}
volatile也愛莫能助
有個(gè)常見的誤解咳秉,以為volatile
可以解決并發(fā)問題。試試看:
@Volatile // kotlin 中鸯隅, volatile 是個(gè)注解
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter++
}
println("Counter = $counter")
}
代碼跑得更慢了澜建,但最終也沒能得到"Counter = 1000000",因?yàn)?code>volatile保證了順序的讀和寫蝌以,但是對大的操作(這里是累加)不保證原子性炕舵。
線程安全的數(shù)據(jù)結(jié)構(gòu)
對協(xié)程和線程都通用的解決方案是利用一個(gè)線程安全的數(shù)據(jù)結(jié)構(gòu),這個(gè)數(shù)據(jù)結(jié)構(gòu)提供對共享狀態(tài)必要的同步操作跟畅。在上面的例子中咽筋,我們可以用AtomicInteger
類,它有個(gè)incrementAndGet
方法:
var counter = AtomicInteger()
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter.incrementAndGet()
}
println("Counter = ${counter.get()}")
}
這是針對這個(gè)問題最快的解決方法徊件。這種解決方法適用于普通計(jì)數(shù)器奸攻,集合,隊(duì)列和其他標(biāo)準(zhǔn)數(shù)據(jù)結(jié)構(gòu)以及它們的基本操作庇忌。但是不容易擴(kuò)展到復(fù)雜狀態(tài)或沒有現(xiàn)成的線程安全實(shí)現(xiàn)的復(fù)雜操作舞箍。
細(xì)粒度線程限制
線程限制一種解決共享可變狀態(tài)的方式,它將共享變量的訪問限制到單個(gè)線程上皆疹。在UI應(yīng)用中很適用疏橄,因?yàn)閁I狀態(tài)一般都限制于事件派發(fā)或應(yīng)用線程。在協(xié)程中用單線程上下文很容易實(shí)現(xiàn):
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) { // 在CommonPool中運(yùn)行每個(gè)協(xié)程
withContext(counterContext) { // 但是在單個(gè)線程中累加
counter++
}
}
println("Counter = $counter")
}
這個(gè)代碼跑的很慢,因?yàn)樽龅搅思?xì)粒度的線程限制捎迫。每個(gè)獨(dú)立的累加都利用withContext
塊從多線程CommonPool
上下文切換到單線程的上下文晃酒。
粗粒度線程限制
實(shí)際應(yīng)用中,線程限制是在大塊中執(zhí)行的窄绒。例如贝次,一大塊更新狀態(tài)的業(yè)務(wù)邏輯限制于單個(gè)線程。例如下面這個(gè)例子彰导,在單線程的上下文中運(yùn)行每個(gè)協(xié)程:
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(counterContext) { // 在單線程的上下文中運(yùn)行每個(gè)協(xié)程
counter++
}
println("Counter = $counter")
}
這就快多了蛔翅,而且結(jié)果是對的。
互斥操作
互斥位谋,利用一個(gè)用不會(huì)并發(fā)執(zhí)行的臨界區(qū)山析,保護(hù)對共享狀態(tài)的修改。在阻塞的世界里掏父,通常用synchronized
或ReentrantLock
達(dá)到互斥笋轨。協(xié)程中的替代品叫做Mutex
。它用lock
和unlock
方法界定臨界區(qū)赊淑。關(guān)鍵不同之處是爵政,Mutex.lock()
是掛起函數(shù),不會(huì)阻塞線程陶缺。
有一個(gè)擴(kuò)展函數(shù)withLock
钾挟,方便的幫你做了mutex.lock(); try { ... } finally { mutex.unlock() }
這事:
val mutex = Mutex()
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
mutex.withLock {
counter++
}
}
println("Counter = $counter")
}
這個(gè)例子中的鎖是細(xì)粒度的,因此饱岸,是有代價(jià)的等龙。然而,在有些你必須周期修改共享狀態(tài)的情況下伶贰,鎖是個(gè)好選擇。同時(shí)罐栈,這個(gè)狀態(tài)沒有限制到某個(gè)線程上黍衙。
Actors
actor
是由協(xié)程、限制并包含于此協(xié)程的狀態(tài)荠诬、與其他協(xié)程交流的通道組成琅翻。一個(gè)簡單的actor
可以寫成一個(gè)函數(shù),但是復(fù)雜狀態(tài)的actor
更適合寫成一個(gè)類柑贞。
有個(gè)actor
協(xié)程建造器方椎,它可以方便地將actor
的郵箱通道組合到其范圍內(nèi),以便從發(fā)送通道接收消息并將發(fā)送通道組合到生成的job
對象中钧嘶。因此棠众,單個(gè)actor
的引用就攜帶了上面所有的東西。
使用actor
的第一步是定義一個(gè)actor
要處理的消息類。Kotlin的密封類(sealed class)非常適合這個(gè)目的闸拿。首先定一個(gè)CounterMsg
密封類空盼,子類IncCounter
作為增加計(jì)數(shù)器的消息,GetCounter
作為獲取計(jì)數(shù)器值的消新荤,后者需要發(fā)送回復(fù)揽趾。CompletableDeferred
表示將來已知的單個(gè)值,此處用于發(fā)送回復(fù)苛骨。
// counterActor的消息類型
sealed class CounterMsg
object IncCounter : CounterMsg() // 增加計(jì)數(shù)器的單向消息
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 帶回復(fù)的請求
然后我們定義一個(gè)使用actor
協(xié)程建造器啟動(dòng)actor
的函數(shù):
// 此函數(shù)啟動(dòng)一個(gè)新的計(jì)數(shù)器actor
fun counterActor() = actor<CounterMsg> {
var counter = 0 // actor 狀態(tài)
for (msg in channel) { // 遍歷進(jìn)來的消息
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
主要代碼很簡單:
fun main(args: Array<String>) = runBlocking<Unit> {
val counter = counterActor() // 創(chuàng)建actor
massiveRun(CommonPool) {
counter.send(IncCounter)
}
// 發(fā)送一個(gè)消息篱瞎,用于從actor中獲取計(jì)算器的值
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close() // 關(guān)閉actor
}
actor
自身執(zhí)行的上下文無關(guān)緊要。一個(gè)actor
是一個(gè)協(xié)程痒芝,協(xié)程是順序執(zhí)行的俐筋,因此,將狀態(tài)限制在特定協(xié)程中可以解決共享可變狀態(tài)的問題吼野。實(shí)際上校哎,actor
可以修改自己的私有狀態(tài),但只能通過消息相互影響(避免任何鎖定)瞳步。
actor
比在負(fù)載下鎖定更有效闷哆,因?yàn)樵谶@種情況下它總是有工作要做,而且根本不需要切換到不同的上下文单起。
注意抱怔,
actor
協(xié)程構(gòu)建器是produce
協(xié)程構(gòu)建器的雙重構(gòu)件。actor
與它接收消息的通道相關(guān)聯(lián)嘀倒,produce
與它發(fā)送元素的通道相關(guān)聯(lián)屈留。
Select表達(dá)式
Select表達(dá)式可同時(shí)讓多個(gè)掛起函數(shù)等待,并選擇第一個(gè)使其激活测蘑。
從通道中選擇
讓我們先創(chuàng)建兩個(gè)生產(chǎn)字符串的生產(chǎn)者:fizz
和buzz
灌危。fizz
每300ms產(chǎn)生一個(gè)"Fizz"字符串:
fun fizz(context: CoroutineContext) = produce<String>(context) {
while (true) { // 每300ms發(fā)送一個(gè)"Fizz"
delay(300)
send("Fizz")
}
}
buzz
每500ms產(chǎn)生一個(gè)"Buzz"字符串:
fun buzz(context: CoroutineContext) = produce<String>(context) {
while (true) { // 每500ms發(fā)送一個(gè)"Buzz!"
delay(500)
send("Buzz!")
}
}
使用receive
掛起函數(shù),我們可以從一個(gè)通道或另一個(gè)通道接收碳胳。但select
表達(dá)式允許我們使用其onReceive
子句同時(shí)從兩者接收:
suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
select<Unit> { // <Unit> 意味著select表達(dá)式?jīng)]有返回
fizz.onReceive { value -> // 第一個(gè)子句
println("fizz -> '$value'")
}
buzz.onReceive { value -> // 第二個(gè)子句
println("buzz -> '$value'")
}
}
}
跑七次這個(gè)函數(shù):
fun main(args: Array<String>) = runBlocking<Unit> {
val fizz = fizz(coroutineContext)
val buzz = buzz(coroutineContext)
repeat(7) {
selectFizzBuzz(fizz, buzz)
}
coroutineContext.cancelChildren() // 取消倆協(xié)程
}
結(jié)果如下:
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
buzz -> 'Buzz!'
選擇關(guān)閉
當(dāng)通道關(guān)閉時(shí)勇蝙,select
中的onReceive
子句報(bào)錯(cuò),導(dǎo)致相應(yīng)的select
拋出異常挨约。我們可以使用onReceiveOrNull
子句在關(guān)閉通道時(shí)執(zhí)行特定操作味混。以下示例還顯示select
是一個(gè)返回其選擇好的子句結(jié)果的表達(dá)式:
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
select<String> {
a.onReceiveOrNull { value ->
if (value == null)
"Channel 'a' is closed"
else
"a -> '$value'"
}
b.onReceiveOrNull { value ->
if (value == null)
"Channel 'b' is closed"
else
"b -> '$value'"
}
}
讓我們來使用這個(gè)函數(shù),傳入產(chǎn)生"Hello"字符串四次的通道a
和產(chǎn)生"World"四次的頻道b
:
fun main(args: Array<String>) = runBlocking<Unit> {
// 為了可預(yù)測結(jié)果诫惭,我們使用主線程上下文
val a = produce<String>(coroutineContext) {
repeat(4) { send("Hello $it") }
}
val b = produce<String>(coroutineContext) {
repeat(4) { send("World $it") }
}
repeat(8) { // 打印頭8個(gè)結(jié)果
println(selectAorB(a, b))
}
coroutineContext.cancelChildren()
}
觀察得處下列結(jié)論:
首先翁锡,select
傾向于第一個(gè)子句。當(dāng)多個(gè)子句同時(shí)可選時(shí)夕土,它們中的第一個(gè)被選擇馆衔。這里,兩個(gè)通道都不斷的產(chǎn)生字符串,a
作為第一個(gè)哈踱,勝荒适。然后,因?yàn)槲覀冇玫氖遣粠Ь彌_池的通道开镣,a
在調(diào)用send
時(shí)不時(shí)會(huì)掛起刀诬,就給了機(jī)會(huì)讓b
也來一發(fā)。
選擇發(fā)送
select
表達(dá)式具有onSend
子句邪财,可以與選擇的偏見性結(jié)合使用陕壹。
讓我們寫一個(gè)整數(shù)生產(chǎn)者的例子,當(dāng)主通道上的消費(fèi)者無法跟上它時(shí)树埠,它會(huì)將其值發(fā)送到side
通道糠馆。
fun produceNumbers(context: CoroutineContext, side: SendChannel<Int>) = produce<Int>(context) {
for (num in 1..10) { // 產(chǎn)生10個(gè)數(shù)字
delay(100) // 每個(gè)延遲 100 ms
select<Unit> {
onSend(num) {} // 發(fā)送給主通道
side.onSend(num) {} // or to the side channel
}
}
}
消費(fèi)者將會(huì)非常緩慢,需要250毫秒才能處理每個(gè)數(shù)字:
fun main(args: Array<String>) = runBlocking<Unit> {
val side = Channel<Int>() // 分配side通道
launch(coroutineContext) { // 給side通道一個(gè)快速的消費(fèi)者
side.consumeEach { println("Side channel has $it") }
}
produceNumbers(coroutineContext, side).consumeEach {
println("Consuming $it")
delay(250) // 慢慢消費(fèi)怎憋,不著急
}
println("Done consuming")
coroutineContext.cancelChildren()
}
看看發(fā)生什么:
Consuming 1
Side channel has 2
Side channel has 3
Consuming 4
Side channel has 5
Side channel has 6
Consuming 7
Side channel has 8
Side channel has 9
Consuming 10
Done consuming
選擇延期值
可以使用onAwait
子句選擇延遲值又碌。讓我們從一個(gè)異步函數(shù)開始,該函數(shù)在隨機(jī)延遲后返回一個(gè)延遲字符串值:
fun asyncString(time: Int) = async {
delay(time.toLong())
"Waited for $time ms"
}
讓我們隨機(jī)延遲開始十幾個(gè)绊袋。
fun asyncStringsList(): List<Deferred<String>> {
val random = Random(3)
return List(12) { asyncString(random.nextInt(1000)) }
}
現(xiàn)在毕匀,主函數(shù)等待第一個(gè)函數(shù)完成并計(jì)算仍處于活動(dòng)狀態(tài)的延遲值的數(shù)量。注意癌别,我們在這里使用了select
表達(dá)式是Kotlin DSL的事實(shí)皂岔,因此我們可以使用任意代碼為它提供子句。在這個(gè)例子中展姐,我們遍歷一個(gè)延遲值列表躁垛,為每個(gè)延遲值提供onAwait
子句。
fun main(args: Array<String>) = runBlocking<Unit> {
val list = asyncStringsList()
val result = select<String> {
list.withIndex().forEach { (index, deferred) ->
deferred.onAwait { answer ->
"Deferred $index produced answer '$answer'"
}
}
}
println(result)
val countActive = list.count { it.isActive }
println("$countActive coroutines are still active")
}
輸出如下:
Deferred 4 produced answer 'Waited for 128 ms'
11 coroutines are still active
切換延遲值的通道
讓我們編寫一個(gè)消費(fèi)延遲字符串值通道的通道建造器函數(shù)圾笨,直到下一個(gè)延遲值結(jié)束或通道關(guān)閉之前教馆,等待每個(gè)接收到的延遲值。此示例將同一select
中的onReceiveOrNull
和onAwait
子句放在一起:
fun switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> {
var current = input.receive() // 從第一個(gè)收到的延遲值開始
while (isActive) { // 當(dāng)通道沒取消時(shí)循環(huán)
val next = select<Deferred<String>?> { // 從此select返回下一個(gè)延遲值或null
input.onReceiveOrNull { update ->
update // 換下一個(gè)值等
}
current.onAwait { value ->
send(value) // 發(fā)送當(dāng)前延遲值產(chǎn)生的數(shù)據(jù)
input.receiveOrNull() // 使用輸入通道的下一個(gè)延遲值
}
}
if (next == null) {
println("Channel was closed")
break // 退出循環(huán)
} else {
current = next
}
}
}
為了測試它擂达,我們將使用一個(gè)簡單的異步函數(shù)活玲,它在指定的時(shí)間后返回指定的字符串:
fun asyncString(str: String, time: Long) = async {
delay(time)
str
}
main
函數(shù)只是啟動(dòng)一個(gè)協(xié)同程序來打印switchMapDeferreds
的結(jié)果并向它發(fā)送一些測試數(shù)據(jù):
fun main(args: Array<String>) = runBlocking<Unit> {
val chan = Channel<Deferred<String>>() // 測試通道
launch(coroutineContext) { // 開啟打印協(xié)程
for (s in switchMapDeferreds(chan))
println(s) // 打印收到的字符串
}
chan.send(asyncString("BEGIN", 100))
delay(200) // 夠"BEGIN"生產(chǎn)出來了
chan.send(asyncString("Slow", 500))
delay(100) // 不夠生產(chǎn)"Slow"的時(shí)間
chan.send(asyncString("Replace", 100))
delay(500) // 發(fā)送最后的字符串之前給點(diǎn)時(shí)間
chan.send(asyncString("END", 500))
delay(1000) // 給時(shí)間運(yùn)行
chan.close() // 關(guān)閉通道
delay(500) // 等運(yùn)行完
}
結(jié)果是:
BEGIN
Replace
END
Channel was closed