你的第一個(gè)協(xié)程
fun main(args: Array<String>) {
launch { //在后臺(tái)啟動(dòng)新的協(xié)程并繼續(xù)
delay(1000L) //非阻塞延遲1秒(默認(rèn)時(shí)間單位為ms)
println("World!") //延遲后打印
}
println("Hello,") //主線程繼續(xù),而協(xié)程延遲
Thread.sleep(2000L)//阻塞主線程2秒以保持JVM活動(dòng)
}
輸出結(jié)果
Hello,
World!
從本質(zhì)上講陨舱,協(xié)同程序是輕量級(jí)的線程翠拣。它們是與發(fā)布 協(xié)同程序構(gòu)建器一起啟動(dòng)的。您可以實(shí)現(xiàn)相同的結(jié)果替換 launch { … } 用 thread { … } 游盲,并 delay(…) 用 Thread.sleep(…) 误墓。嘗試一下蛮粮。
如果以替換launch為開頭thread,則編譯器會(huì)產(chǎn)生以下錯(cuò)誤:
Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function
這是因?yàn)閐elay是一個(gè)特殊的掛起函數(shù)谜慌,它不會(huì)阻塞一個(gè)線程然想,但會(huì)掛起 協(xié)同程序,它只能從協(xié)程中使用畦娄。
橋接阻塞和非阻塞世界
第一個(gè)示例在同一代碼中混合非阻塞 delay(…)和阻塞 Thread.sleep(…)又沾。很容易迷失哪一個(gè)阻塞而另一個(gè)阻塞。讓我們明確說(shuō)明使用runBlocking coroutine builder進(jìn)行阻塞:
fun main(args: Array<String>) {
launch { // launch new coroutine in background and continue
delay(1000L)
println("World!")
}
println("Hello,") // main thread continues here immediately
runBlocking { // but this expression blocks the main thread
delay(2000L) // ... while we delay for 2 seconds to keep JVM alive
}
}
結(jié)果是相同的熙卡,但此代碼僅使用非阻塞延遲杖刷。主線程,調(diào)用runBlocking驳癌,塊滑燃,直到協(xié)程內(nèi)runBlocking完成。
這個(gè)例子也可以用更慣用的方式重寫颓鲜,runBlocking用來(lái)包裝main函數(shù)的執(zhí)行:
等待工作
在另一個(gè)協(xié)程正在工作時(shí)延遲一段時(shí)間并不是一個(gè)好方法表窘。讓我們明確等待(以非阻塞方式),直到我們啟動(dòng)的后臺(tái)作業(yè)完成:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch { // launch new coroutine and keep a reference to its Job
delay(1000L)
println("World!")
}
println("Hello,")
job.join() // wait until child coroutine completes
}
提取函數(shù)重構(gòu)
讓我們將代碼塊提取launch { … }到一個(gè)單獨(dú)的函數(shù)中甜滨。當(dāng)您對(duì)此代碼執(zhí)行“提取功能”重構(gòu)時(shí)乐严,您將獲得帶有suspend修飾符的新功能。這是你的第一個(gè)暫停功能衣摩。掛起函數(shù)可以在協(xié)程內(nèi)部使用昂验,就像常規(guī)函數(shù)一樣,但它們的附加功能是它們可以反過(guò)來(lái)使用其他掛起函數(shù)(如delay本示例中所示)來(lái)暫停協(xié)程的執(zhí)行艾扮。
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch { doWorld() }
println("Hello,")
job.join()
}
// this is your first suspending function
suspend fun doWorld() {
delay(1000L)
println("World!")
}
協(xié)同程序足夠輕量級(jí)
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = List(100_000) {
// launch a lot of coroutines and list their jobs
launch {
delay(1000L)
print(".")
}
}
jobs.forEach { it.join() } // wait for all jobs to complete
}
它啟動(dòng)了100K協(xié)同程序既琴,一秒鐘之后,每個(gè)協(xié)同程序都打印出一個(gè)點(diǎn)∨葑欤現(xiàn)在甫恩,嘗試使用線程。會(huì)發(fā)生什么酌予?(很可能你的代碼會(huì)產(chǎn)生某種內(nèi)存不足的錯(cuò)誤)
協(xié)同程序就像守護(hù)程序線程
下面的代碼啟動(dòng)一個(gè)長(zhǎng)時(shí)間運(yùn)行的協(xié)同程序磺箕,每秒打印“我正在睡覺”兩次,然后在一段延遲后從main函數(shù)返回:
fun main(args: Array<String>) = runBlocking<Unit> {
launch {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // just quit after delay
}
您可以運(yùn)行并看到它打印三行并終止:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
活動(dòng)協(xié)同程序不會(huì)使進(jìn)程保持活動(dòng)狀態(tài)抛虫。它們就像守護(hù)程序線程松靡。
取消和超時(shí)
在小應(yīng)用程序中,從“main”方法返回可能聽起來(lái)像是一個(gè)好主意莱褒,以便隱式終止所有協(xié)同程序。在較大的長(zhǎng)期運(yùn)行的應(yīng)用程序中涎劈,您需要更精細(xì)的控制广凸。在推出函數(shù)返回一個(gè)作業(yè)阅茶,可用于取消運(yùn)行協(xié)程:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancel() // cancels the job
job.join() // waits for job's completion
println("main: Now I can quit.")
}
輸出如下
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
主調(diào)用后job.cancel,我們看不到其他協(xié)同程序的任何輸出谅海,因?yàn)樗驯蝗∠嘲А_€有一個(gè)Job擴(kuò)展函數(shù)cancelAndJoin ,它結(jié)合了取消和連接調(diào)用扭吁。
取消是合作的
協(xié)同取消是合作的撞蜂。協(xié)程代碼必須合作才能取消。所有掛起函數(shù)kotlinx.coroutines都是可取消的侥袜。他們檢查coroutine的取消并在取消時(shí)拋出CancellationException蝌诡。但是,如果協(xié)程正在計(jì)算中并且未檢查取消枫吧,則無(wú)法取消它浦旱,如下例所示:
fun main(args: Array<String>) = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch {
var nextPrintTime = startTime
var i = 0
while (i < 5) { // computation loop, just wastes CPU
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
運(yùn)行它以查看它繼續(xù)打印“我正在睡覺”,即使在取消之后九杂,直到作業(yè)在五次迭代后自行完成卧须。
輸出結(jié)果
I'm sleep 0
I'm sleep 1
I'm sleep 2
main I;m tried of waiting
I'm sleep 3
I'm sleep 4
main Now I can quit
使計(jì)算代碼可取消
有兩種方法可以使計(jì)算代碼可以取消腊尚。第一個(gè)是定期調(diào)用檢查取消的掛起功能。有一個(gè)收益率的功能是實(shí)現(xiàn)這一目的的好選擇。另一個(gè)是明確檢查取消狀態(tài)侍芝。讓我們嘗試后一種方法。
fun main(args: Array<String>) = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch {
var nextPrintTime = startTime
var i = 0
while (isActive) { // cancellable computation loop
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
如您所見嗜浮,現(xiàn)在此循環(huán)已取消船逮。isActive是通過(guò)CoroutineScope對(duì)象在協(xié)同程序代碼中可用的屬性。
最后關(guān)閉資源
可取消的掛起函數(shù)會(huì)在取消時(shí)拋出CancellationException鹿响,這可以通過(guò)所有常規(guī)方式處理羡微。例如,當(dāng)取消協(xié)程時(shí)惶我,try {…} finally {…}表達(dá)式和Kotlin use函數(shù)通常會(huì)執(zhí)行其終結(jié)操作:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
println("I'm running finally")
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
無(wú)論加入和cancelAndJoin等待所有完成動(dòng)作來(lái)完成的妈倔,所以上面的例子產(chǎn)生下面的輸出:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm running finally
main: Now I can quit.
運(yùn)行不可取消的塊
finally在前一個(gè)示例的塊中嘗試使用掛起函數(shù)將導(dǎo)致CancellationException,因?yàn)檫\(yùn)行此代碼的協(xié)程將 被取消绸贡。通常盯蝴,這不是問(wèn)題,因?yàn)樗斜憩F(xiàn)良好的關(guān)閉操作(關(guān)閉文件听怕,取消作業(yè)或關(guān)閉任何類型的通信通道)通常都是非阻塞的捧挺,并且不涉及任何掛起功能。但是尿瞭,在極少數(shù)情況下闽烙,當(dāng)您需要掛起已取消的協(xié)同程序時(shí),可以withContext(NonCancellable) {…}使用withContext函數(shù)和NonCancellable上下文包裝相應(yīng)的代碼, 如下例所示:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
withContext(NonCancellable) {
println("I'm running finally")
delay(1000L)
println("And I've just delayed for 1 sec because I'm non-cancellable")
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
超時(shí)退出
在實(shí)踐中取消協(xié)程執(zhí)行的最明顯的原因是因?yàn)樗膱?zhí)行時(shí)間超過(guò)了一些超時(shí)黑竞。雖然您可以手動(dòng)跟蹤對(duì)相應(yīng)作業(yè)的引用并啟動(dòng)單獨(dú)的協(xié)同程序以在延遲后取消跟蹤的協(xié)程捕发,但是有一個(gè)準(zhǔn)備好使用withTimeout函數(shù)執(zhí)行此操作。請(qǐng)看以下示例:
fun main(args: Array<String>) = runBlocking<Unit> {
withTimeout(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
}
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" kotlinx.coroutines.experimental.TimeoutCancellationException: Timed out waiting for 1300 MILLISECONDS
該TimeoutCancellationException由拋出withTimeout是的子類CancellationException很魂。我們之前沒(méi)有看到它的堆棧跟蹤打印在控制臺(tái)上扎酷。這是因?yàn)樵谌∠膮f(xié)程中CancellationException被認(rèn)為是協(xié)程完成的正常原因。但是遏匆,在這個(gè)例子中我們withTimeout在main函數(shù)內(nèi)部使用了法挨。
因?yàn)槿∠皇且粋€(gè)例外,所有資源都將以通常的方式關(guān)閉幅聘。您可以在超時(shí)包裹代碼try {…} catch (e: TimeoutCancellationException) {…}塊凡纳,如果你需要專門做一些額外的行動(dòng)在任何類型的超時(shí)或使用withTimeoutOrNull功能類似于withTimeout,但返回null的超時(shí)喊暖,而不是拋出一個(gè)異常:
fun main(args: Array<String>) = runBlocking<Unit> {
val result = withTimeoutOrNull(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
"Done" // will get cancelled before it produces this result
}
println("Result is $result")
}
運(yùn)行此代碼時(shí)不再有異常:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Result is null
撰寫暫停功能
默認(rèn)順序
假設(shè)我們?cè)谄渌胤蕉x了兩個(gè)掛起函數(shù)惫企,它們可以像某種遠(yuǎn)程服務(wù)調(diào)用或計(jì)算一樣有用。我們只是假裝它們很有用陵叽,但實(shí)際上每個(gè)只是為了這個(gè)例子的目的而延遲一秒:
suspend fun doSomethingUsefulOne(): Int {
delay(1000L) // pretend we are doing something useful here
return 13
}
suspend fun doSomethingUsefulTwo(): Int {
delay(1000L) // pretend we are doing something useful here, too
return 29
}
如果需要按順序調(diào)用它們狞尔,我們?cè)撛趺醋? 首先doSomethingUsefulOne 然后 doSomethingUsefulTwo計(jì)算結(jié)果的總和?實(shí)際上巩掺,如果我們使用第一個(gè)函數(shù)的結(jié)果來(lái)決定是否需要調(diào)用第二個(gè)函數(shù)或決定如何調(diào)用它偏序,我們就會(huì)這樣做。
我們只使用正常的順序調(diào)用胖替,因?yàn)閰f(xié)程中的代碼與常規(guī)代碼中的代碼一樣研儒,默認(rèn)是順序的。以下示例通過(guò)測(cè)量執(zhí)行兩個(gè)掛起函數(shù)所需的總時(shí)間來(lái)演示它:
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
val one = doSomethingUsefulOne()
val two = doSomethingUsefulTwo()
println("The answer is ${one + two}")
}
println("Completed in $time ms")
}
它產(chǎn)生這樣的東西:
The answer is 42
Completed in 2017 ms
并發(fā)使用異步
如果在調(diào)用doSomethingUsefulOne和之間沒(méi)有依賴關(guān)系独令,doSomethingUsefulTwo并且我們希望通過(guò)同時(shí)執(zhí)行兩者來(lái)更快地得到答案端朵,該怎么辦?這是異步來(lái)幫助的地方燃箭。
從概念上講冲呢,異步就像啟動(dòng)一樣。它啟動(dòng)一個(gè)單獨(dú)的協(xié)程招狸,這是一個(gè)輕量級(jí)的線程敬拓,與所有其他協(xié)同程序同時(shí)工作。不同之處在于launch返回一個(gè)Job并且不攜帶任何結(jié)果值裙戏,同時(shí)async返回Deferred - 一個(gè)輕量級(jí)的非阻塞未來(lái)乘凸,表示稍后提供結(jié)果的承諾。您可以使用.await()延遲值來(lái)獲取其最終結(jié)果累榜,但Deferred也是a Job营勤,因此您可以根據(jù)需要取消它。
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
它產(chǎn)生這樣的東西:
The answer is 42
Completed in 1017 ms
這是兩倍的速度,因?yàn)槲覀兺瑫r(shí)執(zhí)行了兩個(gè)協(xié)同程序葛作。注意醒第,與協(xié)同程序的并發(fā)性始終是顯式的。
懶惰地開始異步
使用值為CoroutineStart.LAZY的可選參數(shù)進(jìn)行異步時(shí)有一個(gè)惰性選項(xiàng)进鸠。它僅在某些等待需要其結(jié)果或調(diào)用啟動(dòng)函數(shù)時(shí)才啟動(dòng)協(xié)同程序 。運(yùn)行以下示例形病,該示例僅與此前一個(gè)示例不同:start
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
它產(chǎn)生這樣的東西:
The answer is 42
Completed in 2017 ms
所以客年,我們回到順序執(zhí)行,因?yàn)槲覀兪紫葐?dòng)并等待one漠吻,然后啟動(dòng)并等待two量瓜。它不是懶惰的預(yù)期用例。lazy在計(jì)算值涉及暫停函數(shù)的情況下途乃,它被設(shè)計(jì)為標(biāo)準(zhǔn)函數(shù)的替代绍傲。
異步風(fēng)格的功能
我們可以定義使用異步協(xié)同生成器調(diào)用doSomethingUsefulOne和doSomethingUsefulTwo 異步調(diào)用的異步樣式函數(shù)。使用“Async”后綴命名此類函數(shù)是一種很好的方式耍共,以突出顯示它們只啟動(dòng)異步計(jì)算并且需要使用結(jié)果延遲值來(lái)獲取結(jié)果的事實(shí)烫饼。
// somethingUsefulOneAsync的結(jié)果類型是Deferred <Int>
fun somethingUsefulOneAsync() = async {
doSomethingUsefulOne()
}
// somethingUsefulTwoAsync的結(jié)果類型是Deferred <Int>
fun somethingUsefulTwoAsync() = async {
doSomethingUsefulTwo()
}
注意,這些xxxAsync功能不是 暫停功能试读。它們可以在任何地方使用杠纵。但是,它們的使用總是意味著它們的動(dòng)作與調(diào)用代碼的異步(這里意味著并發(fā))钩骇。
以下示例顯示了它們?cè)趨f(xié)同程序之外的用法:
// note, that we don't have `runBlocking` to the right of `main` in this example
fun main(args: Array<String>) {
val time = measureTimeMillis {
// we can initiate async actions outside of a coroutine
val one = somethingUsefulOneAsync()
val two = somethingUsefulTwoAsync()
// but waiting for a result must involve either suspending or blocking.
// here we use `runBlocking { ... }` to block the main thread while waiting for the result
runBlocking {
println("The answer is ${one.await() + two.await()}")
}
}
println("Completed in $time ms")
}
協(xié)同上下文和調(diào)度員
協(xié)同程序總是在某些上下文中執(zhí)行比藻,該上下文由 在Kotlin標(biāo)準(zhǔn)庫(kù)中定義的CoroutineContext類型的值表示 。
協(xié)程上下文是一組各種元素倘屹。主要元素是我們之前見過(guò)的協(xié)同工作及其調(diào)度程序银亲,本節(jié)將對(duì)其進(jìn)行介紹。
調(diào)度員和線程
協(xié)程上下文包括一個(gè)協(xié)程調(diào)度程序(請(qǐng)參閱CoroutineDispatcher)纽匙,它確定相應(yīng)的協(xié)程用于執(zhí)行的線程务蝠。協(xié)程調(diào)度程序可以將協(xié)程執(zhí)行限制在特定線程,將其分派給線程池哄辣,或讓它無(wú)限制地運(yùn)行请梢。
所有協(xié)同構(gòu)建器(如launch和async)都接受一個(gè)可選的 CoroutineContext 參數(shù),該參數(shù)可用于顯式指定新協(xié)程和其他上下文元素的調(diào)度程序力穗。
請(qǐng)嘗試以下示例:
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = arrayListOf<Job>()
jobs += launch(Unconfined) { // not confined -- will work with main thread
println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
println(" 'CommonPool': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
println(" 'newSTC': I'm working in thread ${Thread.currentThread().name}")
}
jobs.forEach { it.join() }
}
它產(chǎn)生以下輸出(可能以不同的順序):
'Unconfined': I'm working in thread main
'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
'newSTC': I'm working in thread MyOwnThread
'coroutineContext': I'm working in thread main
我們?cè)谇懊娌糠种惺褂玫哪J(rèn)調(diào)度程序由DefaultDispatcher表示毅弧,它等于當(dāng)前實(shí)現(xiàn)中的CommonPool。所以当窗,launch { … }是一樣的launch(DefaultDispatcher) { … }够坐,它是一樣的launch(CommonPool) { … }。
父coroutineContext和 Unconfined上下文之間的區(qū)別 將在稍后顯示。
注意元咙,newSingleThreadContext創(chuàng)建一個(gè)新線程梯影,這是一個(gè)非常昂貴的資源。在實(shí)際應(yīng)用程序中庶香,它必須在不再需要時(shí)釋放甲棍,使用close 函數(shù),或者存儲(chǔ)在頂級(jí)變量中并在整個(gè)應(yīng)用程序中重用赶掖。
無(wú)限制與受限制的調(diào)度員
該開敞協(xié)程調(diào)度員開始協(xié)程在調(diào)用線程感猛,但直到第一個(gè)懸掛點(diǎn)。暫停后奢赂,它將在線程中恢復(fù)陪白,該線程完全由調(diào)用的掛起函數(shù)確定。當(dāng)協(xié)同程序不消耗CPU時(shí)間也不更新任何局限于特定線程的共享數(shù)據(jù)(如UI)時(shí)膳灶,無(wú)限制調(diào)度程序是合適的咱士。
另一方面, coroutineContext 屬性(在任何協(xié)同程序中可用)是對(duì)此特定協(xié)同程序的上下文的引用轧钓。這樣序厉,可以繼承父上下文。特別是runBlocking協(xié)同程序的默認(rèn)調(diào)度程序僅限于調(diào)用程序線程毕箍,因此繼承它具有通過(guò)可預(yù)測(cè)的FIFO調(diào)度將執(zhí)行限制在此線程的效果脂矫。
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = arrayListOf<Job>()
jobs += launch(Unconfined) { // not confined -- will work with main thread
println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
delay(500)
println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}")
}
jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
delay(1000)
println("'coroutineContext': After delay in thread ${Thread.currentThread().name}")
}
jobs.forEach { it.join() }
}
所以,這繼承了協(xié)程coroutineContext的runBlocking {…}繼續(xù)在執(zhí)行main線程霉晕,而不受限制一個(gè)曾在默認(rèn)執(zhí)行線程重新恢復(fù)延遲 功能使用庭再。
調(diào)試協(xié)程和線程
協(xié)同程序可以暫停在一個(gè)線程,并恢復(fù)與另一個(gè)線程開敞調(diào)度員或默認(rèn)多線程調(diào)度牺堰。即使使用單線程調(diào)度程序拄轻,也可能很難弄清楚協(xié)程正在做什么,何時(shí)何地伟葫。使用線程調(diào)試應(yīng)用程序的常用方法是在每個(gè)日志語(yǔ)句的日志文件中打印線程名稱恨搓。日志框架普遍支持此功能。使用協(xié)同程序時(shí)筏养,單獨(dú)的線程名稱不會(huì)給出很多上下文斧抱,因此 kotlinx.coroutines包括調(diào)試工具以使其更容易。
使用-Dkotlinx.coroutines.debugJVM選項(xiàng)運(yùn)行以下代碼:
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main(args: Array<String>) = runBlocking<Unit> {
val a = async(coroutineContext) {
log("I'm computing a piece of the answer")
6
}
val b = async(coroutineContext) {
log("I'm computing another piece of the answer")
7
}
log("The answer is ${a.await() * b.await()}")
}
有三個(gè)協(xié)同程序渐溶。主協(xié)程(#1) - runBlocking一個(gè)和兩個(gè)協(xié)程計(jì)算延遲值a(#2)和b(#3)辉浦。它們都在上下文中執(zhí)行,runBlocking并且僅限于主線程茎辐。此代碼的輸出是:
[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42
該log函數(shù)在方括號(hào)中打印線程的名稱宪郊,您可以看到它是main 線程掂恕,但是當(dāng)前正在執(zhí)行的協(xié)程的標(biāo)識(shí)符被附加到它。打開調(diào)試模式時(shí)弛槐,會(huì)將此標(biāo)識(shí)符連續(xù)分配給所有已創(chuàng)建的協(xié)同程序懊亡。
您可以在newCoroutineContext函數(shù)的文檔中閱讀有關(guān)調(diào)試工具的更多信息。
在線程之間跳轉(zhuǎn)
使用 -Dkotlinx.coroutines.debug JVM選項(xiàng)運(yùn)行以下代碼:
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main(args: Array<String>) {
newSingleThreadContext("Ctx1").use { ctx1 ->
newSingleThreadContext("Ctx2").use { ctx2 ->
runBlocking(ctx1) {
log("Started in ctx1")
withContext(ctx2) {
log("Working in ctx2")
}
log("Back to ctx1")
}
}
}
}
它演示了幾種新技術(shù)乎串。一個(gè)是使用帶有明確指定上下文的runBlocking店枣,另一個(gè)是使用withContext函數(shù)來(lái)更改協(xié)程的上下文,同時(shí)仍然保持在下面的輸出中可以看到的相同協(xié)程:
[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1
請(qǐng)注意叹誉,此示例還使用useKotlin標(biāo)準(zhǔn)庫(kù)中的函數(shù)來(lái)釋放在不再需要時(shí)使用newSingleThreadContext創(chuàng)建的線程艰争。
工作在上下文中
協(xié)程的工作是其背景的一部分。協(xié)程可以使用coroutineContext[Job]表達(dá)式從其自己的上下文中檢索它:
fun main(args: Array<String>) = runBlocking<Unit> {
println("My job is ${coroutineContext[Job]}")
}
在調(diào)試模式下運(yùn)行時(shí)會(huì)產(chǎn)生類似的東西:
My job is "coroutine#1":BlockingCoroutine{Active}@6d311334
因此桂对,isActive在CoroutineScope僅僅是一個(gè)方便快捷 coroutineContext[Job]?.isActive == true。
子協(xié)程
當(dāng) coroutineContext 協(xié)程的用于啟動(dòng)另一個(gè)協(xié)程鸠匀,該工作新協(xié)程成為孩子的家長(zhǎng)協(xié)程的工作蕉斜。當(dāng)父協(xié)程被取消時(shí),它的所有子節(jié)點(diǎn)也會(huì)被遞歸取消缀棍。
fun main(args: Array<String>) = runBlocking<Unit> {
// launch a coroutine to process some kind of incoming request
val request = launch {
// it spawns two other jobs, one with its separate context
val job1 = launch {
println("job1: I have my own context and execute independently!")
delay(1000)
println("job1: I am not affected by cancellation of the request")
}
// and the other inherits the parent context
val job2 = launch(coroutineContext) {
delay(100)
println("job2: I am a child of the request coroutine")
delay(1000)
println("job2: I will not execute this line if my parent request is cancelled")
}
// request completes when both its sub-jobs complete:
job1.join()
job2.join()
}
delay(500)
request.cancel() // cancel processing of the request
delay(1000) // delay a second to see what happens
println("main: Who has survived request cancellation?")
}
此代碼的輸出是:
job1: I have my own context and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?
結(jié)合上下文
可以使用+運(yùn)算符組合協(xié)程上下文宅此。右側(cè)的上下文替換了左側(cè)上下文的相關(guān)條目。例如爬范,可以繼承父協(xié)程的Job父腕,同時(shí)替換其調(diào)度程序:
fun main(args: Array<String>) = runBlocking<Unit> {
// start a coroutine to process some kind of incoming request
val request = launch(coroutineContext) { // use the context of `runBlocking`
// spawns CPU-intensive child job in CommonPool !!!
val job = launch(coroutineContext + CommonPool) {
println("job: I am a child of the request coroutine, but with a different dispatcher")
delay(1000)
println("job: I will not execute this line if my parent request is cancelled")
}
job.join() // request completes when its sub-job completes
}
delay(500)
request.cancel() // cancel processing of the request
delay(1000) // delay a second to see what happens
println("main: Who has survived request cancellation?")
}
此代碼的預(yù)期結(jié)果是:
job: I am a child of the request coroutine, but with a different dispatcher
main: Who has survived request cancellation?
父母的責(zé)任
父協(xié)同程序總是等待所有孩子的完成。Parent不必顯式跟蹤它啟動(dòng)的所有子節(jié)點(diǎn)青瀑,也不必使用Job.join在結(jié)束時(shí)等待它們:
fun main(args: Array<String>) = runBlocking<Unit> {
// launch a coroutine to process some kind of incoming request
val request = launch {
repeat(3) { i -> // launch a few children jobs
launch(coroutineContext) {
delay((i + 1) * 200L) // variable delay 200ms, 400ms, 600ms
println("Coroutine $i is done")
}
}
println("request: I'm done and I don't explicitly join my children that are still active")
}
request.join() // wait for completion of the request, including all its children
println("Now processing of the request is complete")
}
結(jié)果將是:
request: I'm done and I don't explicitly join my children that are still active
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Now processing of the request is complete
命名協(xié)同程序以進(jìn)行調(diào)試
當(dāng)協(xié)同程序經(jīng)常記錄時(shí)璧亮,自動(dòng)分配的ID很好,您只需要關(guān)聯(lián)來(lái)自同一協(xié)程的日志記錄斥难。但是枝嘶,當(dāng)協(xié)程與特定請(qǐng)求的處理或執(zhí)行某些特定后臺(tái)任務(wù)相關(guān)聯(lián)時(shí),最好將其明確命名以用于調(diào)試目的哑诊。 CoroutineName上下文元素與線程名稱具有相同的功能群扶。當(dāng)調(diào)試模式打開時(shí),它將顯示在執(zhí)行此協(xié)程的線程名稱中镀裤。
以下示例演示了此概念:
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
log("Started main coroutine")
// run two background value computations
val v1 = async(CoroutineName("v1coroutine")) {
delay(500)
log("Computing v1")
252
}
val v2 = async(CoroutineName("v2coroutine")) {
delay(1000)
log("Computing v2")
6
}
log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
}
它使用-Dkotlinx.coroutines.debugJVM選項(xiàng)生成的輸出類似于:
[main @main#1] Started main coroutine
[ForkJoinPool.commonPool-worker-1 @v1coroutine#2] Computing v1
[ForkJoinPool.commonPool-worker-2 @v2coroutine#3] Computing v2
[main @main#1] The answer for v1 / v2 = 42
通過(guò)明確的工作取消
讓我們將關(guān)于上下文竞阐,child對(duì)象和工作的知識(shí)放在一起。假設(shè)我們的應(yīng)用程序有一個(gè)具有生命周期的對(duì)象暑劝,但該對(duì)象不是協(xié)程骆莹。例如,我們正在編寫一個(gè)Android應(yīng)用程序担猛,并在Android活動(dòng)的上下文中啟動(dòng)各種協(xié)同程序汪疮,以執(zhí)行異步操作以獲取和更新數(shù)據(jù)峭火,執(zhí)行動(dòng)畫等。所有這些協(xié)同程序必須在活動(dòng)被銷毀時(shí)取消智嚷,以避免內(nèi)存泄漏卖丸。
我們可以通過(guò)創(chuàng)建與我們活動(dòng)的生命周期相關(guān)聯(lián)的Job實(shí)例來(lái)管理協(xié)同程序的生命周期。使用Job()工廠函數(shù)創(chuàng)建作業(yè)實(shí)例盏道,如以下示例所示稍浆。為方便起見,launch(coroutineContext + job)我們可以編寫launch(coroutineContext, parent = job)以明確表示正在使用父作業(yè)的事實(shí)猜嘱,而不是使用表達(dá)式衅枫。
現(xiàn)在,Job.cancel的單個(gè)調(diào)用取消了我們啟動(dòng)的所有孩子朗伶。此外弦撩,Job.join等待所有這些完成,所以我們也可以在這個(gè)示例中使用cancelAndJoin:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = Job() // create a job object to manage our lifecycle
// now launch ten coroutines for a demo, each working for a different time
val coroutines = List(10) { i ->
// they are all children of our job object
launch(coroutineContext, parent = job) { // we use the context of main runBlocking thread, but with our parent job
delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc
println("Coroutine $i is done")
}
}
println("Launched ${coroutines.size} coroutines")
delay(500L) // delay for half a second
println("Cancelling the job!")
job.cancelAndJoin() // cancel all our coroutines and wait for all of them to complete
}
Launched 10 coroutines
Coroutine 0 is done
Coroutine 1 is done
Cancelling the job!
正如你所看到的论皆,只有前三個(gè)協(xié)同程序打印了一條消息益楼,而其他協(xié)同程序被一次調(diào)用取消了job.cancelAndJoin()。因此点晴,我們?cè)诩僭O(shè)的Android應(yīng)用程序中需要做的就是在創(chuàng)建活動(dòng)時(shí)創(chuàng)建父作業(yè)對(duì)象感凤,將其用于子協(xié)同程序,并在銷毀活動(dòng)時(shí)取消它粒督。我們不能join在Android生命周期的情況下使用它們陪竿,因?yàn)樗峭降模沁@種連接能力在構(gòu)建后端服務(wù)以確保有限的資源使用時(shí)非常有用屠橄。
通道
延遲值提供了在協(xié)同程序之間傳輸單個(gè)值的便捷方法族跛。管道提供了一種傳輸值流的方法。
通道的基礎(chǔ)知識(shí)
一個(gè)通道是在概念上非常相似BlockingQueue锐墙。一個(gè)關(guān)鍵的區(qū)別是庸蔼,它不是阻塞put操作,而是暫停發(fā)送贮匕,而不是阻塞take操作姐仅,它有一個(gè)暫停接收。
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
// this might be heavy CPU-consuming computation or async logic, we'll just send five squares
for (x in 1..5) channel.send(x * x)
}
// here we print five received integers:
repeat(5) { println(channel.receive()) }
println("Done!")
}
此代碼的輸出是:
1
4
9
16
25
Done!
關(guān)閉和迭代通道
與隊(duì)列不同刻盐,可以關(guān)閉通道以指示不再有元素到來(lái)掏膏。在接收器端,使用常規(guī)for循環(huán)來(lái)接收來(lái)自信道的元素是方便的敦锌。
從概念上講馒疹,關(guān)閉就像向通道發(fā)送特殊的關(guān)閉令牌。一旦收到此關(guān)閉令牌乙墙,迭代就會(huì)停止颖变,因此可以保證收到關(guān)閉前所有先前發(fā)送的元素:
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close() // we're done sending
}
// here we print received values using `for` loop (until the channel is closed)
for (y in channel) println(y)
println("Done!")
}
建立渠道生產(chǎn)者
協(xié)程生成一系列元素的模式很常見生均。這是生產(chǎn)者 - 消費(fèi)者模式的一部分,通常在并發(fā)代碼中找到腥刹。您可以將這樣的生成器抽象為一個(gè)以通道作為參數(shù)的函數(shù)马胧,但這與必須從函數(shù)返回結(jié)果的常識(shí)相反。
有一個(gè)名為produce的便利協(xié)程構(gòu)建器衔峰,它可以很容易地在生產(chǎn)者端執(zhí)行佩脊,并且擴(kuò)展函數(shù)consumeEach,它取代了for消費(fèi)者端的循環(huán):
fun produceSquares() = produce<Int> {
for (x in 1..5) send(x * x)
}
fun main(args: Array<String>) = runBlocking<Unit> {
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
}
管道
管道是一個(gè)協(xié)程正在生成的模式垫卤,可能是無(wú)限的值流:
fun produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // infinite stream of integers starting from 1
}
而另一個(gè)協(xié)程或協(xié)同程序正在消耗該流威彰,進(jìn)行一些處理,并產(chǎn)生一些其他結(jié)果穴肘。在下面的例子中歇盼,數(shù)字只是平方:
fun square(numbers: ReceiveChannel<Int>) = produce<Int> {
for (x in numbers) send(x * x)
}
主代碼啟動(dòng)并連接整個(gè)管道:
fun main(args: Array<String>) = runBlocking<Unit> {
val numbers = produceNumbers() // produces integers from 1 and on
val squares = square(numbers) // squares integers
for (i in 1..5) println(squares.receive()) // print first five
println("Done!") // we are done
squares.cancel() // need to cancel these coroutines in a larger app
numbers.cancel()
}
我們不必在這個(gè)示例應(yīng)用程序中取消這些協(xié)同程序,因?yàn)?協(xié)同程序就像守護(hù)程序線程评抚,但是在更大的應(yīng)用程序中豹缀,如果我們不再需要它,我們將需要停止我們的管道盈咳。或者边翼,我們可以運(yùn)行管道協(xié)同程序作為 主協(xié)程的子代鱼响,如以下示例所示。
帶管道的素?cái)?shù)
讓我們通過(guò)一個(gè)使用協(xié)程管道生成素?cái)?shù)的例子將管道帶到極端组底。我們從無(wú)限的數(shù)字序列開始丈积。這次我們引入一個(gè)顯式context參數(shù)并將其傳遞給generate構(gòu)建器,以便調(diào)用者可以控制我們的協(xié)程運(yùn)行的位置:
fun numbersFrom(context: CoroutineContext, start: Int) = produce<Int>(context) {
var x = start
while (true) send(x++) // infinite stream of integers from start
}
以下管道階段過(guò)濾傳入的數(shù)字流债鸡,刪除可由給定素?cái)?shù)整除的所有數(shù)字:
fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {
for (x in numbers) if (x % prime != 0) send(x)
}
現(xiàn)在我們通過(guò)從2開始一個(gè)數(shù)字流來(lái)構(gòu)建我們的管道江滨,從當(dāng)前通道獲取素?cái)?shù),并為找到的每個(gè)素?cái)?shù)啟動(dòng)新的管道階段:
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
以下示例打印前十個(gè)素?cái)?shù)厌均,在主線程的上下文中運(yùn)行整個(gè)管道唬滑。由于所有協(xié)同程序都是在其coroutineContext中作為主runBlocking協(xié)程的 子進(jìn)程啟動(dòng)的,因此我們不必保留我們已經(jīng)啟動(dòng)的所有協(xié)同程序的明確列表棺弊。我們使用cancelChildren 擴(kuò)展函數(shù)取消所有子協(xié)同程序晶密。
fun main(args: Array<String>) = runBlocking<Unit> {
var cur = numbersFrom(coroutineContext, 2)
for (i in 1..10) {
val prime = cur.receive()
println(prime)
cur = filter(coroutineContext, cur, prime)
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
此代碼的輸出是:
2
3
5
7
11
13
17
19
23
29
請(qǐng)注意,您可以使用buildIterator 標(biāo)準(zhǔn)庫(kù)中的coroutine builder 來(lái)構(gòu)建相同的管道 模她。更換produce用buildIterator稻艰,send用yield,receive用next侈净, ReceiveChannel用Iterator尊勿,并擺脫上下文僧凤。你也不需要runBlocking。但是元扔,如上所示使用通道的管道的好處是躯保,如果在CommonPool上下文中運(yùn)行它,它實(shí)際上可以使用多個(gè)CPU內(nèi)核摇展。
無(wú)論如何吻氧,這是找到素?cái)?shù)的極不切實(shí)際的方法。在實(shí)踐中咏连,管道確實(shí)涉及一些其他掛起調(diào)用(如對(duì)遠(yuǎn)程服務(wù)的異步調(diào)用)盯孙,并且這些管道不能使用buildSeqeunce/ 構(gòu)建buildIterator,因?yàn)樗鼈儾辉试S任意掛起祟滴,這與produce完全異步完全不同 振惰。
扇出
多個(gè)協(xié)同程序可以從同一個(gè)通道接收,在它們之間分配工作垄懂。讓我們從生成器協(xié)程開始骑晶,它定期生成整數(shù)(每秒十個(gè)數(shù)字):
fun produceNumbers() = produce<Int> {
var x = 1 // start from 1
while (true) {
send(x++) // produce next
delay(100) // wait 0.1s
}
}
然后我們可以有幾個(gè)處理器協(xié)同程序。在這個(gè)例子中草慧,他們只打印他們的id和收到的號(hào)碼:
fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}
現(xiàn)在讓我們啟動(dòng)五個(gè)處理器桶蛔,讓它們工作幾乎一秒鐘。走著瞧吧:
fun main(args: Array<String>) = runBlocking<Unit> {
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // cancel producer coroutine and thus kill them all
}
輸出將類似于以下輸出漫谷,盡管接收每個(gè)特定整數(shù)的處理器ID可能不同:
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
注意仔雷,取消生成器協(xié)同程序會(huì)關(guān)閉其通道,從而最終終止處理器協(xié)同程序正在執(zhí)行的通道上的迭代舔示。
另外碟婆,請(qǐng)注意我們?nèi)绾问褂胒or循環(huán)顯式迭代通道以在launchProcessor代碼中執(zhí)行扇出。與consumeEach此不同惕稻,這種for循環(huán)模式可以非常安全地從多個(gè)協(xié)同程序中使用竖共。如果其中一個(gè)處理器協(xié)同程序失敗,則其他處理程序協(xié)同程序仍將處理該通道俺祠,而通過(guò)其寫入的處理器consumeEach 總是在正彻或異常終止時(shí)消耗(取消)底層通道。
扇入
多個(gè)協(xié)同程序可以發(fā)送到同一個(gè)通道蜘渣。例如妓布,讓我們有一個(gè)字符串通道和一個(gè)掛起函數(shù),它以指定的延遲重復(fù)發(fā)送指定的字符串到此通道:
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
現(xiàn)在宋梧,讓我們看看如果我們啟動(dòng)幾個(gè)協(xié)同程序發(fā)送字符串會(huì)發(fā)生什么(在這個(gè)例子中匣沼,我們?cè)谥骶€程的上下文中將它們作為主協(xié)程的子節(jié)點(diǎn)啟動(dòng)):
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<String>()
launch(coroutineContext) { sendString(channel, "foo", 200L) }
launch(coroutineContext) { sendString(channel, "BAR!", 500L) }
repeat(6) { // receive first six
println(channel.receive())
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
輸出是:
foo
foo
BAR!
foo
foo
BAR!
緩沖頻道
到目前為止顯示的通道沒(méi)有緩沖區(qū)。當(dāng)發(fā)送方和接收方彼此相遇(也稱為集合點(diǎn))時(shí)捂龄,無(wú)緩沖的信道傳輸元素释涛。如果首先調(diào)用send加叁,那么它將被掛起,直到調(diào)用receive唇撬,如果先調(diào)用receive它匕,它將被掛起,直到調(diào)用send窖认。
兩個(gè)信道()工廠函數(shù)和產(chǎn)生助洗劑采取可選的capacity參數(shù)來(lái)指定緩沖區(qū)大小豫柬。緩沖區(qū)允許發(fā)送方在掛起之前發(fā)送多個(gè)元素,類似于BlockingQueue具有指定容量的緩沖區(qū)已滿時(shí)阻塞扑浸。
看一下以下代碼的行為:
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>(4) // create buffered channel
val sender = launch(coroutineContext) { // launch sender coroutine
repeat(10) {
println("Sending $it") // print before sending each element
channel.send(it) // will suspend when buffer is full
}
}
// don't receive anything... just wait....
delay(1000)
sender.cancel() // cancel sender coroutine
}
它使用容量為4的緩沖通道打印“發(fā)送” 五次:
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
前四個(gè)元素被添加到緩沖區(qū)烧给,發(fā)送方在嘗試發(fā)送第五個(gè)元素時(shí)暫停。
Ticker通道
Ticker通道是一個(gè)特殊的會(huì)合通道喝噪,Unit每次從此通道上次消耗后產(chǎn)生給定的延遲通道础嫡。雖然它可能看起來(lái)沒(méi)有用,但它是一個(gè)有用的構(gòu)建塊酝惧,可以創(chuàng)建復(fù)雜的基于時(shí)間的生產(chǎn) 管道和操作員榴鼎,這些管道和操作員可以進(jìn)行窗口化和其他時(shí)間依賴的處理⊥泶剑可以在select中使用Ticker通道執(zhí)行“on tick”操作巫财。
要?jiǎng)?chuàng)建此類渠道,請(qǐng)使用工廠方法代碼哩陕。要指示不需要其他元素平项,請(qǐng)使用ReceiveChannel.cancel方法。
fun main(args: Array<String>) = runBlocking<Unit> {
val tickerChannel = ticker(delay = 100, initialDelay = 0) // create ticker channel
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements has 100ms delay
println("Next element is not ready in 50 ms: $nextElement")
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 100 ms: $nextElement")
// Emulate large consumption delays
println("Consumer pauses for 150ms")
delay(150)
// Next element is available immediately
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Next element is available immediately after large consumer delay: $nextElement")
// Note that the pause between `receive` calls is taken into account and next element arrives faster
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
tickerChannel.cancel() // indicate that no more elements are needed
}
它打印以下行:
Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
請(qǐng)注意萌踱,自動(dòng)收?qǐng)?bào)機(jī)知道可能的消費(fèi)者暫停葵礼,并且默認(rèn)情況下号阿,如果發(fā)生暫停并鸵,則調(diào)整下一個(gè)生成的元素延遲,嘗試維持生成元素的固定速率扔涧。
可選地园担,mode可以指定等于[TickerMode.FIXED_DELAY]的參數(shù)以維持元素之間的固定延遲。
渠道公平
對(duì)于從多個(gè)協(xié)同程序調(diào)用它們的順序枯夜,向通道發(fā)送和接收操作是公平的弯汰。它們以先進(jìn)先出順序提供,例如湖雹,要調(diào)用的第一個(gè)協(xié)程receive 獲取元素咏闪。在以下示例中,兩個(gè)協(xié)程“ping”和“pong”正在從共享的“table”通道接收“ball”對(duì)象摔吏。
data class Ball(var hits: Int)
fun main(args: Array<String>) = runBlocking<Unit> {
val table = Channel<Ball>() // a shared table
launch(coroutineContext) { player("ping", table) }
launch(coroutineContext) { player("pong", table) }
table.send(Ball(0)) // serve the ball
delay(1000) // delay 1 second
coroutineContext.cancelChildren() // game over, cancel them
}
suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // receive the ball in a loop
ball.hits++
println("$name $ball")
delay(300) // wait a bit
table.send(ball) // send the ball back
}
}
“ping”協(xié)程首先啟動(dòng)鸽嫂,因此它是第一個(gè)接收球的人纵装。即使“ping”coroutine在將球送回桌面后立即再次接球,球也會(huì)被“pong”協(xié)程接收据某,因?yàn)樗呀?jīng)在等待它了:
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
請(qǐng)注意橡娄,由于正在使用的執(zhí)行程序的性質(zhì),有時(shí)通道可能會(huì)產(chǎn)生看起來(lái)不公平的執(zhí)行癣籽。有關(guān)詳細(xì)信息挽唉,請(qǐng)參閱此問(wèn)
共享可變狀態(tài)和并發(fā)
可以使用多線程調(diào)度程序(如默認(rèn)的CommonPool)同時(shí)執(zhí)行協(xié)同程序。它提出了所有常見的并發(fā)問(wèn)題筷狼。主要問(wèn)題是同步訪問(wèn)共享可變狀態(tài)瓶籽。在協(xié)同程序領(lǐng)域,這個(gè)問(wèn)題的一些解決方案類似于多線程世界中的解決方案桑逝,但其他解決方案是獨(dú)一無(wú)二的棘劣。
問(wèn)題
讓我們推出一千個(gè)協(xié)同程序,它們都做了一千次相同的動(dòng)作(總計(jì)一百萬(wàn)次執(zhí)行)楞遏。我們還將測(cè)量完成時(shí)間以進(jìn)行進(jìn)一步比較:
suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
val n = 1000 // number of coroutines to launch
val k = 1000 // times an action is repeated by each coroutine
val time = measureTimeMillis {
val jobs = List(n) {
launch(context) {
repeat(k) { action() }
}
}
jobs.forEach { it.join() }
}
println("Completed ${n * k} actions in $time ms")
}
我們從一個(gè)非常簡(jiǎn)單的操作開始茬暇,該操作使用多線程CommonPool上下文來(lái)增加共享的可變變量。
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter++
}
println("Counter = $counter")
}
最后打印什么?它不太可能打印“Counter = 1000000”巧骚,因?yàn)橐磺€(gè)協(xié)程counter從多個(gè)線程同時(shí)增加而沒(méi)有任何同步格二。
注意:如果您的舊系統(tǒng)具有2個(gè)或更少的CPU劈彪,那么您將始終看到1000000,因?yàn)?CommonPool在這種情況下僅在一個(gè)線程中運(yùn)行长窄。要重現(xiàn)此問(wèn)題滔吠,您需要進(jìn)行以下更改:
val mtContext = newFixedThreadPoolContext(2, "mtPool") // explicitly define context with two threads
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(mtContext) { // use it instead of CommonPool in this sample and below
counter++
}
println("Counter = $counter")
}
最后打印什么?它不太可能打印“Counter = 1000000”疮绷,因?yàn)橐磺€(gè)協(xié)程counter從多個(gè)線程同時(shí)增加而沒(méi)有任何同步。
注意:如果您的舊系統(tǒng)具有2個(gè)或更少的CPU驳阎,那么您將始終看到1000000金矛,因?yàn)?CommonPool在這種情況下僅在一個(gè)線程中運(yùn)行肩榕。要重現(xiàn)此問(wèn)題勃刨,您需要進(jìn)行以下更改:
val mtContext = newFixedThreadPoolContext(2, "mtPool") // explicitly define context with two threads
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(mtContext) { // use it instead of CommonPool in this sample and below
counter++
}
println("Counter = $counter")
}
Volatiles 沒(méi)有任何幫助
有一個(gè)常見的誤解是,使變量volatile解決了并發(fā)問(wèn)題股淡。讓我們?cè)囈辉嚕?/p>
@Volatile // in Kotlin `volatile` is an annotation
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter++
}
println("Counter = $counter")
}
這段代碼運(yùn)行速度較慢身隐,但我們?nèi)匀粵](méi)有得到“Counter = 1000000”,因?yàn)関olatile變量保證可線性化(這是“原子”的技術(shù)術(shù)語(yǔ))讀取和寫入相應(yīng)的變量唯灵,但不提供原子性較大的行動(dòng)(在我們的案例中增加)
線程安全的數(shù)據(jù)結(jié)構(gòu)
適用于線程和協(xié)同程序的通用解決方案是使用線程安全(也稱為同步贾铝,可線性化或原子)數(shù)據(jù)結(jié)構(gòu),該數(shù)據(jù)結(jié)構(gòu)為需要在共享狀態(tài)上執(zhí)行的相應(yīng)操作提供所有必需的同步埠帕。在簡(jiǎn)單計(jì)數(shù)器的情況下线衫,我們可以使用AtomicInteger具有原子incrementAndGet操作的類:
var counter = AtomicInteger()
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter.incrementAndGet()
}
println("Counter = ${counter.get()}")
}
這是針對(duì)此特定問(wèn)題的最快解決方案。它適用于普通計(jì)數(shù)器朗若,集合满葛,隊(duì)列和其他標(biāo)準(zhǔn)數(shù)據(jù)結(jié)構(gòu)以及它們的基本操作。但是琐驴,它不容易擴(kuò)展到復(fù)雜狀態(tài)或沒(méi)有現(xiàn)成的線程安全實(shí)現(xiàn)的復(fù)雜操作俘种。
線程限制細(xì)粒度
線程限制是解決共享可變狀態(tài)問(wèn)題的一種方法,其中對(duì)特定共享狀態(tài)的所有訪問(wèn)都限于單個(gè)線程绝淡。它通常用于UI應(yīng)用程序宙刘,其中所有UI狀態(tài)都局限于單個(gè)事件派發(fā)/應(yīng)用程序線程。使用
單線程上下文很容易應(yīng)用協(xié)同程序:
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) { // run each coroutine in CommonPool
withContext(counterContext) { // but confine each increment to the single-threaded context
counter++
}
}
println("Counter = $counter")
}
此代碼的工作速度非常慢牢酵,因?yàn)樗梢赃M(jìn)行細(xì)粒度的線程限制悬包。每個(gè)增量CommonPool使用withContext塊從多線程上下文切換到單線程上下文。
線程限制粗粒度
實(shí)際上馍乙,線程限制是在大塊中執(zhí)行的布近,例如,大塊狀態(tài)更新業(yè)務(wù)邏輯僅限于單個(gè)線程丝格。下面的示例就是這樣撑瞧,在單線程上下文中運(yùn)行每個(gè)協(xié)程開始。
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(counterContext) { // run each coroutine in the single-threaded context
counter++
}
println("Counter = $counter")
}
這現(xiàn)在可以更快地運(yùn)行并產(chǎn)生正確的結(jié)果显蝌。
相互排斥
該問(wèn)題的相互排除解決方案 是使用永遠(yuǎn)不會(huì)同時(shí)執(zhí)行的關(guān)鍵部分來(lái)保護(hù)共享狀態(tài)的所有修改预伺。在一個(gè)阻塞的世界中,你通常會(huì)使用synchronized或ReentrantLock為此而使用。Coroutine的替代品叫做Mutex酬诀。它具有鎖定和解鎖功能脏嚷,可以分隔關(guān)鍵部分。關(guān)鍵的區(qū)別在于它Mutex.lock()是一個(gè)暫停功能瞒御。它不會(huì)阻塞線程父叙。
還有withLock擴(kuò)展功能,方便代表 mutex.lock(); try { … } finally { mutex.unlock() }模式:
val mutex = Mutex()
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
mutex.withLock {
counter++
}
}
println("Counter = $counter")
}
此示例中的鎖定是細(xì)粒度的肴裙,因此它付出了代價(jià)高每。但是,對(duì)于某些必須定期修改某些共享狀態(tài)的情況践宴,它是一個(gè)不錯(cuò)的選擇鲸匿,但是沒(méi)有自然線程可以限制此狀態(tài)。
Actors
的演員是由一個(gè)協(xié)程阻肩,即被限制和封裝到該協(xié)程的狀態(tài)下带欢,并與其他協(xié)同程序進(jìn)行通信的信道的組合的實(shí)體。一個(gè)簡(jiǎn)單的actor可以寫成一個(gè)函數(shù)烤惊,但是一個(gè)具有復(fù)雜狀態(tài)的actor更適合一個(gè)類乔煞。
有一個(gè)actor協(xié)程構(gòu)建器,它可以方便地將actor的郵箱通道組合到其作用域中柒室,以便從發(fā)送通道接收消息并將其組合到生成的作業(yè)對(duì)象中渡贾,這樣對(duì)actor的單個(gè)引用就可以作為其句柄攜帶。
使用actor的第一步是定義一個(gè)actor要處理的消息類雄右。Kotlin的密封課程非常適合這個(gè)目的空骚。我們CounterMsg使用IncCounter消息定義密封類以增加計(jì)數(shù)器和GetCounter消息以獲取其值。后者需要發(fā)送回復(fù)擂仍。甲CompletableDeferred通信原碼囤屹,即表示將在將來(lái)已知的(傳送)一個(gè)單一的值,在這里用于該目的逢渔。
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
然后我們定義一個(gè)使用actor coroutine builder 啟動(dòng)actor的函數(shù):
// This function launches a new counter actor
fun counterActor() = actor<CounterMsg> {
var counter = 0 // actor state
for (msg in channel) { // iterate over incoming messages
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
主要代碼很簡(jiǎn)單:
fun main(args: Array<String>) = runBlocking<Unit> {
val counter = counterActor() // create the actor
massiveRun(CommonPool) {
counter.send(IncCounter)
}
// send a message to get a counter value from an actor
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close() // shutdown the actor
}
執(zhí)行者本身執(zhí)行的上下文無(wú)關(guān)緊要(正確性)肋坚。一個(gè)actor是一個(gè)協(xié)程,一個(gè)協(xié)同程序按順序執(zhí)行肃廓,因此將狀態(tài)限制到特定協(xié)程可以解決共享可變狀態(tài)的問(wèn)題智厌。實(shí)際上,演員可以修改自己的私有狀態(tài)盲赊,但只能通過(guò)消息相互影響(避免任何鎖定)铣鹏。
Actor比在負(fù)載下鎖定更有效,因?yàn)樵谶@種情況下它總是有工作要做角钩,而且根本不需要切換到不同的上下文吝沫。
注意,actor協(xié)程構(gòu)建器是產(chǎn)品協(xié)同程序構(gòu)建器的雙重構(gòu)件递礼。一個(gè)actor與它接收消息的頻道相關(guān)聯(lián)惨险,而一個(gè)制作者與它發(fā)送元素的頻道相關(guān)聯(lián)。
選擇表達(dá)式
選擇表達(dá)式可以同時(shí)等待多個(gè)掛起函數(shù)脊髓,并選擇 第一個(gè)可用的掛起函數(shù)辫愉。
從頻道中選擇
讓我們有兩個(gè)字符串生成器:fizz和buzz。該fizz生產(chǎn)“菲斯”串每300毫秒:
fun fizz(context: CoroutineContext) = produce<String>(context) {
while (true) { // sends "Fizz" every 300 ms
delay(300)
send("Fizz")
}
}
而buzz產(chǎn)品“Buzz将硝!” 字符串每500毫秒:
fun buzz(context: CoroutineContext) = produce<String>(context) {
while (true) { // sends "Buzz!" every 500 ms
delay(500)
send("Buzz!")
}
}
使用接收暫停功能恭朗,我們可以接收任一從一個(gè)通道或其他。但select表達(dá)式允許我們同時(shí)使用其 onReceive子句從兩者接收:
suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
select<Unit> { // <Unit> means that this select expression does not produce any result
fizz.onReceive { value -> // this is the first select clause
println("fizz -> '$value'")
}
buzz.onReceive { value -> // this is the second select clause
println("buzz -> '$value'")
}
}
}
讓我們一起運(yùn)行七次:
fun main(args: Array<String>) = runBlocking<Unit> {
val fizz = fizz(coroutineContext)
val buzz = buzz(coroutineContext)
repeat(7) {
selectFizzBuzz(fizz, buzz)
}
coroutineContext.cancelChildren() // cancel fizz & buzz coroutines
}
這段代碼的結(jié)果是:
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
buzz -> 'Buzz!'
選擇關(guān)閉
所述的onReceive條款select當(dāng)信道被關(guān)閉引起相應(yīng)失敗 select拋出異常依疼。我們可以使用onReceiveOrNull子句在關(guān)閉通道時(shí)執(zhí)行特定操作痰腮。以下示例還顯示該select表達(dá)式返回其所選子句的結(jié)果:
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
select<String> {
a.onReceiveOrNull { value ->
if (value == null)
"Channel 'a' is closed"
else
"a -> '$value'"
}
b.onReceiveOrNull { value ->
if (value == null)
"Channel 'b' is closed"
else
"b -> '$value'"
}
}
讓我們使用它a產(chǎn)生“Hello”字符串四次的頻道b和產(chǎn)生“世界”四次的頻道:
fun main(args: Array<String>) = runBlocking<Unit> {
// we are using the context of the main thread in this example for predictability ...
val a = produce<String>(coroutineContext) {
repeat(4) { send("Hello $it") }
}
val b = produce<String>(coroutineContext) {
repeat(4) { send("World $it") }
}
repeat(8) { // print first eight results
println(selectAorB(a, b))
}
coroutineContext.cancelChildren()
}
這段代碼的結(jié)果非常有趣,所以我們將在模式細(xì)節(jié)中分析它:
a -> 'Hello 0'
a -> 'Hello 1'
b -> 'World 0'
a -> 'Hello 2'
a -> 'Hello 3'
b -> 'World 1'
Channel 'a' is closed
Channel 'a' is closed
有幾個(gè)觀察要做出來(lái)律罢。
首先膀值,select是偏向于第一條。當(dāng)可以同時(shí)選擇多個(gè)子句時(shí)误辑,其中的第一個(gè)子句將被選中沧踏。在這里,兩個(gè)通道都在不斷地產(chǎn)生字符串巾钉,因此a作為select中的第一個(gè)子句的channel獲勝翘狱。但是,因?yàn)槲覀兪褂玫氖菬o(wú)緩沖通道砰苍,所以a它的發(fā)送調(diào)用會(huì)不時(shí)地暫停潦匈,并且也有機(jī)會(huì)b發(fā)送。
第二個(gè)觀察結(jié)果是赚导,當(dāng)通道已經(jīng)關(guān)閉時(shí)历等,會(huì)立即選擇onReceiveOrNull。
選擇發(fā)送
選擇表達(dá)式具有onSend子句辟癌,可以與選擇的偏見性結(jié)合使用寒屯。
讓我們編寫一個(gè)整數(shù)生成器的示例,side當(dāng)主要通道上的消費(fèi)者無(wú)法跟上它時(shí)黍少,它會(huì)將其值發(fā)送到通道:
fun produceNumbers(context: CoroutineContext, side: SendChannel<Int>) = produce<Int>(context) {
for (num in 1..10) { // produce 10 numbers from 1 to 10
delay(100) // every 100 ms
select<Unit> {
onSend(num) {} // Send to the primary channel
side.onSend(num) {} // or to the side channel
}
}
}
消費(fèi)者將會(huì)非常緩慢寡夹,需要250毫秒才能處理每個(gè)號(hào)碼:
fun main(args: Array<String>) = runBlocking<Unit> {
val side = Channel<Int>() // allocate side channel
launch(coroutineContext) { // this is a very fast consumer for the side channel
side.consumeEach { println("Side channel has $it") }
}
produceNumbers(coroutineContext, side).consumeEach {
println("Consuming $it")
delay(250) // let us digest the consumed number properly, do not hurry
}
println("Done consuming")
coroutineContext.cancelChildren()
}
那么讓我們看看會(huì)發(fā)生什么:
Consuming 1
Side channel has 2
Side channel has 3
Consuming 4
Side channel has 5
Side channel has 6
Consuming 7
Side channel has 8
Side channel has 9
Consuming 10
Done consuming
選擇延期值
可以使用onAwait子句選擇延遲值。讓我們從一個(gè)異步函數(shù)開始厂置,該函數(shù)在隨機(jī)延遲后返回一個(gè)延遲字符串值:
fun asyncString(time: Int) = async {
delay(time.toLong())
"Waited for $time ms"
}
讓我們隨機(jī)延遲開始十幾個(gè)菩掏。
fun asyncStringsList(): List<Deferred<String>> {
val random = Random(3)
return List(12) { asyncString(random.nextInt(1000)) }
}
現(xiàn)在,主函數(shù)等待第一個(gè)函數(shù)完成并計(jì)算仍處于活動(dòng)狀態(tài)的延遲值的數(shù)量昵济。注意智绸,我們?cè)谶@里使用的select表達(dá)式是Kotlin DSL野揪,因此我們可以使用任意代碼為它提供子句。在這種情況下瞧栗,我們遍歷一個(gè)延遲值列表斯稳,onAwait為每個(gè)延遲值提供子句。
fun main(args: Array<String>) = runBlocking<Unit> {
val list = asyncStringsList()
val result = select<String> {
list.withIndex().forEach { (index, deferred) ->
deferred.onAwait { answer ->
"Deferred $index produced answer '$answer'"
}
}
}
println(result)
val countActive = list.count { it.isActive }
println("$countActive coroutines are still active")
}
輸出是:
Deferred 4 produced answer 'Waited for 128 ms'
11 coroutines are still active
切換延遲值的通道
讓我們編寫一個(gè)使用延遲字符串值通道的通道生成器函數(shù)迹恐,等待每個(gè)接收的延遲值挣惰,但只有在下一個(gè)延遲值結(jié)束或通道關(guān)閉之前。這個(gè)例子將onReceiveOrNull和onAwait子句放在一起 select:
fun switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> {
var current = input.receive() // start with first received deferred value
while (isActive) { // loop while not cancelled/closed
val next = select<Deferred<String>?> { // return next deferred value from this select or null
input.onReceiveOrNull { update ->
update // replaces next value to wait
}
current.onAwait { value ->
send(value) // send value that current deferred has produced
input.receiveOrNull() // and use the next deferred from the input channel
}
}
if (next == null) {
println("Channel was closed")
break // out of loop
} else {
current = next
}
}
}
為了測(cè)試它殴边,我們將使用一個(gè)簡(jiǎn)單的異步函數(shù)憎茂,它在指定的時(shí)間后解析為指定的字符串:
fun asyncString(str: String, time: Long) = async {
delay(time)
str
}
main函數(shù)只是啟動(dòng)一個(gè)協(xié)程來(lái)打印結(jié)果switchMapDeferreds并向它發(fā)送一些測(cè)試數(shù)據(jù):
fun main(args: Array<String>) = runBlocking<Unit> {
val chan = Channel<Deferred<String>>() // the channel for test
launch(coroutineContext) { // launch printing coroutine
for (s in switchMapDeferreds(chan))
println(s) // print each received string
}
chan.send(asyncString("BEGIN", 100))
delay(200) // enough time for "BEGIN" to be produced
chan.send(asyncString("Slow", 500))
delay(100) // not enough time to produce slow
chan.send(asyncString("Replace", 100))
delay(500) // give it time before the last one
chan.send(asyncString("END", 500))
delay(1000) // give it time to process
chan.close() // close the channel ...
delay(500) // and wait some time to let it finish
}
這段代碼的結(jié)果:
BEGIN
Replace
END
Channel was closed
Read The Fucking Source