[TOC]
簡(jiǎn)介
Coroutines are computer program components that generalize subroutines for non-preemptive multitasking, by allowing execution to be suspended and resumed. Coroutines are well-suited for implementing familiar program components such as cooperative tasks, exceptions, event loops, iterators, infinite lists and pipes.
協(xié)程(英語(yǔ):coroutine)是計(jì)算機(jī)程序的一類(lèi)組件颠悬,推廣了協(xié)作式多任務(wù)的子程序焊傅,允許執(zhí)行被掛起與被恢復(fù)台囱。相對(duì)子例程而言泳唠,協(xié)程更為一般和靈活仑氛,但在實(shí)踐中使用沒(méi)有子例程那樣廣泛抑堡。協(xié)程更適合于用來(lái)實(shí)現(xiàn)彼此熟悉的程序組件站玄,如協(xié)作式多任務(wù)、異常處理包券、事件循環(huán)纫谅、迭代器、無(wú)限列表和管道溅固。
簡(jiǎn)單來(lái)講,協(xié)程是一種輕量級(jí)線(xiàn)程侍郭。
相對(duì)于線(xiàn)程切換是由操作系統(tǒng)進(jìn)行調(diào)度的询吴,程序員無(wú)法進(jìn)行控制掠河。
而協(xié)程的調(diào)度是由程序員在代碼層面上進(jìn)行控制的,程序員可以通過(guò)控制suspend
函數(shù)的掛起和恢復(fù)猛计,從而控制程序運(yùn)行流程唠摹,這在代碼的展示上,相當(dāng)于用同步的代碼書(shū)寫(xiě)異步程序有滑,代碼邏輯非常簡(jiǎn)潔易懂。
理論上嵌削,由于協(xié)程不涉及到操作系統(tǒng)調(diào)度毛好,因此只是在用戶(hù)態(tài)上進(jìn)行操作,而線(xiàn)程需要經(jīng)歷用戶(hù)態(tài)與內(nèi)核態(tài)之間的切換苛秕,所以協(xié)程性能更佳肌访。
但是不同的語(yǔ)言在實(shí)現(xiàn)上可能存在差異,比如本文所要介紹的 Kotlin 下的協(xié)程艇劫,其在 JVM 平臺(tái)上的內(nèi)部實(shí)現(xiàn)也是基于線(xiàn)程吼驶,因此如果其進(jìn)行了協(xié)程調(diào)度,存在一定的可能是進(jìn)行了線(xiàn)程切換店煞。
協(xié)程經(jīng)常拿來(lái)與線(xiàn)程進(jìn)行對(duì)比蟹演,它們彼此很相似,但是也很不同顷蟀。
可以簡(jiǎn)單理解如下:
- 一個(gè)進(jìn)程可以包含多個(gè)線(xiàn)程
- 一個(gè)線(xiàn)程可以包含多個(gè)協(xié)程
由于一個(gè)線(xiàn)程可以包含多個(gè)協(xié)程酒请,而協(xié)程具備掛起和恢復(fù)功能,也因此讓我們具備了在一個(gè)線(xiàn)程上執(zhí)行多個(gè)異步任務(wù)的能力鸣个。
注:還是如上文所言羞反,Kotlin 協(xié)程的底層實(shí)現(xiàn)存在線(xiàn)程切換,因此異步任務(wù)可能執(zhí)行在另一條線(xiàn)程上囤萤。
名詞釋義
在具體介紹協(xié)程之前昼窗,需要先了解一下以下幾個(gè)概念:
同步:執(zhí)行一個(gè)任務(wù)時(shí),調(diào)用者調(diào)用后即可獲取返回結(jié)果
異步:執(zhí)行一個(gè)任務(wù)時(shí)涛舍,調(diào)用者調(diào)用后直接返回澄惊,不關(guān)心結(jié)果,而是等到任務(wù)結(jié)束時(shí)富雅,通過(guò)回調(diào)等方式通知調(diào)用者結(jié)果
同步與異步是針對(duì)返回結(jié)果來(lái)說(shuō)的缤削,
對(duì)于同步調(diào)用,由調(diào)用者主動(dòng)獲取結(jié)果吹榴。
對(duì)于異步調(diào)用亭敢,調(diào)用者是通過(guò)回調(diào)等方式被動(dòng)獲取結(jié)果的。
簡(jiǎn)單理解图筹,比如對(duì)于一個(gè)函數(shù)調(diào)用帅刀,
同步調(diào)用就是調(diào)用函數(shù)后让腹,直接就可以獲取結(jié)果。
異步調(diào)用就是調(diào)用函數(shù)后扣溺,不關(guān)心結(jié)果骇窍,等函數(shù)體內(nèi)的任務(wù)結(jié)束時(shí),通過(guò)回調(diào)等方式通知調(diào)用者結(jié)果锥余。
阻塞:執(zhí)行一個(gè)任務(wù)時(shí)腹纳,當(dāng)前調(diào)用線(xiàn)程調(diào)用后立即被掛起,無(wú)法執(zhí)行后續(xù)代碼
非阻塞:執(zhí)行一個(gè)任務(wù)時(shí)驱犹,當(dāng)前調(diào)用線(xiàn)程調(diào)用后立即返回嘲恍,可繼續(xù)執(zhí)行后續(xù)代碼
阻塞和非阻塞是針對(duì)當(dāng)前線(xiàn)程是否具備 CPU 執(zhí)行權(quán)來(lái)說(shuō)的,
對(duì)于阻塞調(diào)用雄驹,調(diào)用不立即返回佃牛,當(dāng)前線(xiàn)程被掛起,失去 CPU 執(zhí)行權(quán)医舆,直至調(diào)用任務(wù)完成俘侠,返回結(jié)果。
對(duì)于非阻塞調(diào)用蔬将,調(diào)用立即返回爷速,當(dāng)前線(xiàn)程仍然擁有 CPU 執(zhí)行權(quán),可繼續(xù)執(zhí)行后續(xù)代碼霞怀。
注:可以講上述描述中的 任務(wù) 理解為 函數(shù)遍希,更加直觀。
協(xié)程涉及到的一些概念
-
掛起函數(shù):即
suspend
函數(shù)里烦,如下代碼所示即為一個(gè)掛起函數(shù):suspend fun delay(timeMillis: Long): Unit{ ... }
Kotlin 中凿蒜,
suspend
關(guān)鍵字具備如下幾層含義:- Kotlin 中規(guī)定:掛起函數(shù)只能在協(xié)程或者其他
suspend
函數(shù)中使用,其實(shí)就相當(dāng)于掛起函數(shù)只能直接或間接地在協(xié)程中進(jìn)行調(diào)用胁黑。 -
suspend
關(guān)鍵字只是起一個(gè)標(biāo)識(shí)作用废封,用以表明被suspend
修飾的函數(shù)(也即掛起函數(shù))內(nèi)部存在耗時(shí)操作,因此必須放置在協(xié)程中進(jìn)行調(diào)用丧蘸。 -
suspend
關(guān)鍵字標(biāo)識(shí)一個(gè)掛起點(diǎn)漂洋,掛起點(diǎn)具備掛起和恢復(fù)執(zhí)行作用。當(dāng)協(xié)程調(diào)用suspend
函數(shù)時(shí)力喷,會(huì)掛起當(dāng)前協(xié)程刽漂,開(kāi)啟一個(gè)異步任務(wù),當(dāng)異步任務(wù)完成后弟孟,就會(huì)在當(dāng)前掛起點(diǎn)恢復(fù)協(xié)程贝咙,繼續(xù)該協(xié)程后續(xù)任務(wù)。
- Kotlin 中規(guī)定:掛起函數(shù)只能在協(xié)程或者其他
-
協(xié)程上下文(Coroutine Context):協(xié)程上下文是一系列規(guī)則和配置的集合拂募,它決定了協(xié)程的運(yùn)行方式庭猩。
kotlinx.coroutines 提供的協(xié)程構(gòu)造器(Coroutine Builder)窟她,比如launch
、async
等蔼水,都會(huì)接收一個(gè)協(xié)程上下文對(duì)象震糖,主要用于設(shè)置協(xié)程異步任務(wù)的執(zhí)行線(xiàn)程策略。如下代碼所示:suspend fun main() { GlobalScope.launch(context = Dispatchers.Default) { println("run inside: ${Thread.currentThread().name}") }.join() }
-
結(jié)構(gòu)化并發(fā)(Structured concurrency):如果一個(gè)協(xié)程內(nèi)部創(chuàng)建了一個(gè)或多個(gè)子協(xié)程趴腋,只要所有子協(xié)程在父協(xié)程作用域結(jié)束前執(zhí)行完成吊说,就認(rèn)為當(dāng)前協(xié)程具備結(jié)構(gòu)化并發(fā)。如下圖所示:
更具體來(lái)說(shuō)优炬,當(dāng)父協(xié)程結(jié)束時(shí)颁井,如果其子協(xié)程仍在運(yùn)行谢揪,則父協(xié)程會(huì)阻塞自己(即當(dāng)前協(xié)程)巩掺,讓子協(xié)程運(yùn)行完成后才退出,也就是無(wú)需顯示使用
join
來(lái)確保子協(xié)程運(yùn)行結(jié)束。舉個(gè)例子糊余,如果我們使用
GlobalScope.launch
創(chuàng)建一個(gè)全局協(xié)程,然后為其添加幾個(gè)子協(xié)程单寂,那么我們必須手動(dòng)對(duì)所有子協(xié)程進(jìn)行管理贬芥,如下所示:suspend fun main() { GlobalScope.launch { // 創(chuàng)建一個(gè)子協(xié)程 val job1 = launch { delay(100) println("Hello World") } // 再創(chuàng)建一個(gè)子協(xié)程 val job2 = launch { delay(200) println("Hello World again!!") } job1.join() job2.join() } // 延遲程序退出 delay(500) }
可以看到,上述代碼中宣决,為了實(shí)現(xiàn)結(jié)構(gòu)化并發(fā)蘸劈,我們必須手動(dòng)維護(hù)所有子協(xié)程協(xié)程狀態(tài),這樣才能確保在父協(xié)程退出前尊沸,子協(xié)程能完成任務(wù)威沫。
注:事實(shí)上,上述例子并未實(shí)現(xiàn)結(jié)構(gòu)化并發(fā)洼专,因?yàn)楦竻f(xié)程
GlobalScope
作用域其實(shí)已經(jīng)退出了棒掠,只是通過(guò)delay
讓子協(xié)程運(yùn)行,造成子協(xié)程都能運(yùn)行完成效果屁商,但事實(shí)上子協(xié)程生命周期超過(guò)了父協(xié)程烟很。如果需要自己手動(dòng)維護(hù)結(jié)構(gòu)化并發(fā),操作會(huì)相對(duì)繁瑣蜡镶,因此雾袱,Kotlin 已經(jīng)為我們提供了具備結(jié)構(gòu)化并發(fā)功能協(xié)程構(gòu)造器,比如
coroutineScope
官还,如下所示:suspend fun main() { GlobalScope.launch { // 子協(xié)程:具備結(jié)構(gòu)化并發(fā) coroutineScope { // 創(chuàng)建一個(gè)子協(xié)程 val job1 = launch { delay(100) println("Hello World") } // 再創(chuàng)建一個(gè)子協(xié)程 val job2 = launch { delay(200) println("Hello World again!!") } } } // 延遲程序退出 delay(500) }
coroutineScope
創(chuàng)建的協(xié)程具備結(jié)構(gòu)化并發(fā)功能芹橡,因此只有等到其子協(xié)程完成時(shí),父協(xié)程才可以退出望伦,這樣我們就無(wú)需維護(hù)各個(gè)子協(xié)程的狀態(tài)了僻族。結(jié)構(gòu)化并發(fā)強(qiáng)調(diào)的是子協(xié)程的生命周期小于或等于父協(xié)程生命周期粘驰,因此,結(jié)構(gòu)化并發(fā)的一個(gè)最大的好處就是可以很方便地管理協(xié)程述么,比如只要父協(xié)程被取消蝌数,其內(nèi)所有協(xié)程也都會(huì)被自動(dòng)取消。
基本使用
下面用代碼來(lái)創(chuàng)建一個(gè)最簡(jiǎn)單的協(xié)程度秘,了解一下基本的使用顶伞。具體步驟如下:
-
導(dǎo)入依賴(lài):Kotlin 協(xié)程的官方框架為kotlinx.coroutines,該框架未集成在 Kotlin 的標(biāo)準(zhǔn)庫(kù)中剑梳,因此需要我們手動(dòng)進(jìn)行導(dǎo)入:
// project: build.gradle buildscript { ext.kotlin_coroutines = "1.3.8" } // app: build.gradle dependencies { implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlin_coroutines" // Android 平臺(tái)需要額外導(dǎo)入 kotlinx-coroutines-android implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:$kotlin_coroutines" }
-
創(chuàng)建協(xié)程:以下代碼使用
runBlocking
函數(shù)創(chuàng)建了一個(gè)協(xié)程:fun main() { runBlocking { println("coroutineScope starts: ${Thread.currentThread().name}") launch { delay(1000) println("son coroutine: ${Thread.currentThread().name}") } println("coroutineScope ends: ${Thread.currentThread().name}") } }
運(yùn)行上述代碼唆貌,可得到如下輸出:
可以看到,我們?cè)谥骶€(xiàn)程中通過(guò)協(xié)程以非阻塞方式完成了異步任務(wù)垢乙。
協(xié)程作用域
每個(gè)協(xié)程都擁有自己的作用域范圍锨咙,kotlinx.coroutines 提供了多種協(xié)程作用域,以下介紹常見(jiàn)的幾種:
-
CoroutineScope
:創(chuàng)建一個(gè)協(xié)程通用作用域追逮。其文檔如下圖所示:最佳的協(xié)程作用域?qū)嵗齽?chuàng)建是通過(guò)
CoroutineScope
和MainScope
內(nèi)置的工廠(chǎng)方法(比如CoroutineScope()
酪刀、MainScope()
)進(jìn)行創(chuàng)建,
額外的上下文元素可以通過(guò)+
操作符進(jìn)行添加(因?yàn)?kotlinx.coroutines 覆寫(xiě)了plus
操作符)
-
GlobalScope
:全局協(xié)程作用域钮孵。具體內(nèi)容如下圖所示:從文檔可以看到骂倘,
GlobalScope
實(shí)現(xiàn)了CoroutineScope
,并自行實(shí)現(xiàn)了全局功能巴席。一個(gè)全局協(xié)程的生命周期與應(yīng)用的生命周期一致历涝。
-
MainScope
:創(chuàng)建一個(gè) UI 組件的協(xié)程作用域。其文檔如下圖所示:MainScope
自帶SupervisorJob
和Dispatchers.Main
上下文元素漾唉。
可以通過(guò)+
操作符為其作用域范圍增添新元素:val scope = MainScope() + CoroutineName("MyActivity")
注:如果在 Android 平臺(tái)上荧库,可以通過(guò)
MainScope()
方法創(chuàng)建一個(gè)主線(xiàn)程作用域(Dispatchers.Main
)協(xié)程,然后通過(guò)為其創(chuàng)建子協(xié)程來(lái)執(zhí)行異步任務(wù)赵刑,然后在資源釋放出直接關(guān)閉協(xié)程即可分衫,這樣就將界面與異步任務(wù)連接了起來(lái),會(huì)很方便進(jìn)行管理料睛。如下所示:class Activity { private val mainScope = MainScope() fun onCreate(){ // 執(zhí)行異步任務(wù) this.mainScope.launch(Dispatchers.IO) { // ... } } fun destroy() { mainScope.cancel() } }
事實(shí)上丐箩,其實(shí) Android 已經(jīng)對(duì)具備生命周期的組件實(shí)體都內(nèi)置了相應(yīng)的協(xié)程作用域支持,具體內(nèi)容可查看:lifecyclescope
創(chuàng)建協(xié)程
kotlinx.coroutines 提供了多種協(xié)程構(gòu)造器讓我們創(chuàng)建協(xié)程恤煞。以下列舉一些常見(jiàn)的協(xié)程創(chuàng)建方式:
-
launch
:CoroutineScope
的擴(kuò)展方法屎勘,用于創(chuàng)建一個(gè)新協(xié)程。其文檔如下圖所示:launch
函數(shù)啟動(dòng)的新協(xié)程不會(huì)阻塞當(dāng)前線(xiàn)程居扒,同時(shí)會(huì)返回一個(gè)Job
對(duì)象概漱,可通過(guò)該對(duì)象取消當(dāng)前協(xié)程執(zhí)行。launch
函數(shù)啟動(dòng)的協(xié)程會(huì)繼承父協(xié)程所在的協(xié)程上下文context
喜喂,如果其上下文context
不包含任意協(xié)程調(diào)度器dispatcher
或者ContinuationInterceptor
瓤摧,則默認(rèn)使用Dispatchers.default
竿裂,即異步任務(wù)會(huì)在運(yùn)行在一條 CPU 密集型線(xiàn)程上。如果
launch
函數(shù)創(chuàng)建的協(xié)程內(nèi)部拋出未捕獲異常照弥,那么默認(rèn)情況下會(huì)導(dǎo)致其父協(xié)程取消(除非顯式設(shè)置了CoroutineExceptionHandler
)腻异。默認(rèn)情況下,
launch
函數(shù)會(huì)立即執(zhí)行協(xié)程这揣,可以通過(guò)參數(shù)start
來(lái)控制其啟動(dòng)選項(xiàng)悔常,比如設(shè)置為CoroutineStart.LAZY
,則launch
函數(shù)創(chuàng)建的協(xié)程不會(huì)立即啟動(dòng)(懶加載)给赞,可通過(guò)返回對(duì)象的Job.start()
顯示進(jìn)行啟動(dòng)机打,或者當(dāng)調(diào)用Job.join()
方法時(shí),則會(huì)隱式啟動(dòng)該協(xié)程片迅。
如下代碼所示:suspend fun main() { val job = GlobalScope.launch(context = Dispatchers.Default) { println("${Thread.currentThread().name} --> 啟動(dòng)協(xié)程") launch { // 1 println("${Thread.currentThread().name} --> 開(kāi)啟一個(gè)子協(xié)程...1") } println("${Thread.currentThread().name} --> 創(chuàng)建懶加載子協(xié)程") val job1 = launch(start = CoroutineStart.LAZY) { // 2 println("${Thread.currentThread().name} --> 開(kāi)啟一個(gè)懶加載子協(xié)程...2") } val job2 = launch(start = CoroutineStart.LAZY) { // 3 println("${Thread.currentThread().name} --> 開(kāi)啟一個(gè)懶加載子協(xié)程...3") } println("${Thread.currentThread().name} --> 顯式啟動(dòng)懶加載子協(xié)程") job1.start() // 4 println("${Thread.currentThread().name} --> 隱式啟動(dòng)懶加載子協(xié)程") job2.join() // 5 } job.join() }
上述代碼的運(yùn)行結(jié)果如下所示:
運(yùn)行結(jié)果分析:首先我們通過(guò)
GlobalScope.launch
函數(shù)創(chuàng)建了一個(gè)全局協(xié)程残邀,并在該協(xié)程內(nèi)部創(chuàng)建了 3 個(gè)子協(xié)程,分別位于上述代碼的1
柑蛇、2
和3
位置芥挣,其中,1
位置創(chuàng)建了一個(gè)立即執(zhí)行的子協(xié)程唯蝶,且該協(xié)程會(huì)繼承父協(xié)程上下文九秀,因此launch(..)
函數(shù)內(nèi)異步任務(wù)會(huì)立即自動(dòng)分發(fā)到Dispatchers.Default
線(xiàn)程上進(jìn)行執(zhí)行遗嗽。
而由于launch
函數(shù)不會(huì)阻塞當(dāng)前線(xiàn)程粘我,我們創(chuàng)建的父協(xié)程位于線(xiàn)程池中的DefaultDispatcher-worker-1
,因此父協(xié)程會(huì)立即執(zhí)行后續(xù)代碼痹换。
然后父協(xié)程繼續(xù)往下執(zhí)行征字,經(jīng)歷2
和3
過(guò)程,分別創(chuàng)建了兩個(gè)懶加載子協(xié)程娇豫。
然后再代碼4
處匙姜,顯示啟動(dòng)了子協(xié)程job1
,則此時(shí)會(huì)立即將異步任務(wù)分發(fā)給Dispatchers.Default
線(xiàn)程進(jìn)行執(zhí)行冯痢。
Job.start()
是一個(gè)非阻塞調(diào)用氮昧,因此父協(xié)程代碼會(huì)繼續(xù)執(zhí)行到5
處,通過(guò)Job.join()
方法隱式啟動(dòng)子協(xié)程job2
浦楣,同樣此時(shí)會(huì)將協(xié)程job2
的異步任務(wù)分發(fā)到Dispatchers.Default
線(xiàn)程進(jìn)行執(zhí)行袖肥。
以上就得到上圖的最終輸出。 -
async
:CoroutineScope
的擴(kuò)展方法振劳,用于創(chuàng)建一個(gè)新協(xié)程椎组。其文檔如下圖所示:async
函數(shù)的大部分性質(zhì)與launch
函數(shù)一致,比如:- 為了確保結(jié)構(gòu)化并發(fā)历恐,
async
與launch
一樣寸癌,當(dāng)其運(yùn)行失敗時(shí)专筷,會(huì)導(dǎo)致父協(xié)程被取消。 - 其同樣繼承父協(xié)程上下文
context
蒸苇,如果其上下文context
不包含任意協(xié)程調(diào)度器dispatcher
或者ContinuationInterceptor
磷蛹,則默認(rèn)使用Dispatchers.default
。 - 默認(rèn)創(chuàng)建就自動(dòng)立即執(zhí)行溪烤,也可通過(guò)
start
參數(shù)控制其行為弦聂,比如CoroutineStart.LAZY
來(lái)延時(shí)執(zhí)行。
此時(shí)氛什,可以通過(guò)Deferred.start()
函數(shù)來(lái)顯示啟動(dòng)協(xié)程莺葫,
也可以通過(guò)Deferred.join()
、Deferred.await()
和Deferred.awaitAll
等函數(shù)隱式啟動(dòng)協(xié)程枪眉。
另外捺檬,
async
與launch
主要的區(qū)別有以下兩個(gè):異常處理:
launch
內(nèi)部可能會(huì)拋出異常,因此需用戶(hù)手動(dòng)進(jìn)行處理贸铜。而async
默認(rèn)不會(huì)拋出異常堡纬,因?yàn)樗鼤?huì)認(rèn)為你最終必須調(diào)用await
來(lái)獲取結(jié)果,因此蒿秦,async
內(nèi)部如果出現(xiàn)異常烤镐,用戶(hù)需要在await
處進(jìn)行捕獲。
更多異常處理內(nèi)容棍鳖,請(qǐng)查看后文:異常處理返回值:一個(gè)最重大的區(qū)別就是炮叶,
launch
函數(shù)返回的是一個(gè)Job
對(duì)象,該對(duì)象主要用于取消協(xié)程運(yùn)行渡处,而async
函數(shù)返回的是一個(gè)Deferred
對(duì)象镜悉,該對(duì)象不僅可以用于取消協(xié)程,更重要的是可以獲取異步任務(wù)返回結(jié)果医瘫!這是相當(dāng)有用的一個(gè)特性侣肄,如下代碼所示:
注:實(shí)際上,
Deferred
繼承于Job
:public interface Deferred<out T> : Job
suspend fun main() = coroutineScope { val deferred = async { // 模擬耗時(shí)異步任務(wù) delay(1000) // 返回 100 100 } val result = deferred.await() println("異步任務(wù)結(jié)果為:${result}") }
上述代碼執(zhí)行結(jié)果如下圖所示:
可以看到醇份,我們成功獲取得到異步任務(wù)返回的結(jié)果稼锅。
需要注意的是,對(duì)于
async
開(kāi)啟的協(xié)程僚纷,我們使用的是Deferred.await()
函數(shù)來(lái)獲取異步任務(wù)返回結(jié)果矩距。其文檔如下所示:await()
會(huì)阻塞當(dāng)前協(xié)程(即async
的父協(xié)程),直至async
協(xié)程完成異步任務(wù)返回結(jié)果畔濒,
或者當(dāng)取消協(xié)程時(shí)剩晴,則會(huì)拋出一個(gè)CancellationException
異常。await()
函數(shù)會(huì)阻塞當(dāng)前協(xié)程,但是不會(huì)阻塞當(dāng)前線(xiàn)程赞弥,因此當(dāng)當(dāng)前線(xiàn)程存在多個(gè)協(xié)程時(shí)毅整,await()
會(huì)讓出執(zhí)行權(quán),當(dāng)前線(xiàn)程的其他協(xié)程就有機(jī)會(huì)得到執(zhí)行绽左。如下代碼所示:suspend fun main() { coroutineScope { launch { // 1 val deferred = async { // 模擬耗時(shí)異步任務(wù) delay(1000) // 返回 100 100 } val result = deferred.await() println("異步任務(wù)結(jié)果為:${result}") } launch { // 2 println("hello world") } } }
當(dāng)
1
處子協(xié)程處于await()
狀態(tài)時(shí)悼嫉,2
處子協(xié)程就可以得到運(yùn)行,結(jié)果如下所示:
- 為了確保結(jié)構(gòu)化并發(fā)历恐,
-
coroutineScope
:創(chuàng)建一個(gè)通用作用域的新協(xié)程拼窥,并在該協(xié)程作用域范圍內(nèi)調(diào)用block
塊戏蔑。其文檔如下圖所示:coroutineScope
函數(shù)會(huì)繼承其父協(xié)程的上下文coroutineContext
,但會(huì)覆蓋父協(xié)程上下文的Job
對(duì)象鲁纠。coroutineScope
函數(shù)主要設(shè)計(jì)用于并發(fā)分解任務(wù)总棵。但該協(xié)程內(nèi)任一子協(xié)程運(yùn)行失敗時(shí),該協(xié)程就會(huì)失敗改含,并會(huì)取消其內(nèi)所有子協(xié)程情龄。coroutineScope
函數(shù)具備結(jié)構(gòu)化并發(fā),它會(huì)阻塞當(dāng)前協(xié)程捍壤,直至其函數(shù)體block
執(zhí)行完畢和其所有子協(xié)程運(yùn)行完成時(shí)骤视,才會(huì)返回。舉個(gè)例子:代碼如下所示:
suspend fun main() = coroutineScope { // 1 coroutineScope { // 2 delay(1000) println("World") } println("Hello") // 3 }
上述代碼的執(zhí)行結(jié)果如下所示:
結(jié)果解析:上述代碼中鹃觉,如
1
處代碼所示专酗,我們首先通過(guò)coroutineScope
函數(shù)創(chuàng)建一個(gè)協(xié)程。
然后協(xié)程執(zhí)行盗扇,代碼會(huì)進(jìn)行到2
處祷肯,此時(shí)又通過(guò)coroutineScope
函數(shù)創(chuàng)建了一個(gè)子協(xié)程,又因?yàn)?code>coroutineScope函數(shù)會(huì)阻塞當(dāng)前協(xié)程粱玲,直到其內(nèi)部執(zhí)行完成躬柬,因此3
處代碼暫時(shí)未能執(zhí)行拜轨,必須等到2
處代碼內(nèi)部執(zhí)行完畢抽减,最終的結(jié)果就如上圖所示。 -
supervisorScope
:創(chuàng)建一個(gè)新協(xié)程橄碾,并在該協(xié)程作用域范圍內(nèi)調(diào)用block
塊卵沉。其文檔如下圖所示:supervisorScope
函數(shù)會(huì)繼承其父協(xié)程的上下文coroutineContext
,但會(huì)使用SupervisorJob
覆蓋父協(xié)程上下文的Job
對(duì)象法牲。對(duì)于
supervisorScope
函數(shù)創(chuàng)建的協(xié)程史汗,如果其內(nèi)任一子協(xié)程運(yùn)行失敗,不會(huì)導(dǎo)致該協(xié)程失敗拒垃,對(duì)其他子協(xié)程也不會(huì)造成影響停撞,因此可以自定義子協(xié)程失敗處理機(jī)制。如果是
supervisorScope
創(chuàng)建的協(xié)程自身出現(xiàn)錯(cuò)誤(比如block
塊拋異常或取消該協(xié)程)戈毒,那么就導(dǎo)致該協(xié)程及其內(nèi)所有子協(xié)程失敗艰猬,但是不會(huì)取消父協(xié)程任務(wù)。supervisorScope
具備結(jié)構(gòu)化并發(fā)埋市。 -
withContext
:該函數(shù)會(huì)創(chuàng)建一個(gè)新協(xié)程冠桃,協(xié)程的異步任務(wù)會(huì)運(yùn)行在指定上下文對(duì)應(yīng)的線(xiàn)程中。其文檔如下所示:withContext
函數(shù)的執(zhí)行模型為:當(dāng)調(diào)用withContext
函數(shù)創(chuàng)建一個(gè)協(xié)程時(shí)道宅,必須強(qiáng)制指定一個(gè)協(xié)程上下文context
食听,然后withContext
函數(shù)會(huì)立即在指定的上下文中執(zhí)行其block
塊內(nèi)容,同時(shí)會(huì)阻塞當(dāng)前協(xié)程(即父協(xié)程)污茵,直至其block
塊運(yùn)行完成(結(jié)構(gòu)化并發(fā))樱报,然后將block
塊最末尾的代碼會(huì)作為異步任務(wù)的返回值。其執(zhí)行模型類(lèi)似于async(...).await()
泞当。舉個(gè)例子:如下代碼所示:
suspend fun main() = coroutineScope { println("${Thread.currentThread().name} --> 開(kāi)啟協(xié)程") val result = withContext(Dispatchers.Default){ println("${Thread.currentThread().name} --> 模擬耗時(shí)任務(wù)") delay(1000) println("${Thread.currentThread().name} --> 耗時(shí)任務(wù)結(jié)束") 10*10 } println("${Thread.currentThread().name} --> 異步任務(wù)結(jié)果:${result}") }
上述代碼的運(yùn)行結(jié)果如下圖所示:
從上圖中可以看出肃弟,
withContext
函數(shù)在設(shè)置上下文時(shí)候,不僅會(huì)對(duì)其內(nèi)部的異步任務(wù)起作用零蓉,還會(huì)對(duì)coroutineScope
后續(xù)協(xié)程作用域起改變上下文效果笤受。withContext
強(qiáng)制我們必須為其指定一個(gè)協(xié)程上下文,其實(shí)最主要的目的是指定協(xié)程調(diào)度器(上下文包含了調(diào)度器)敌蜂,具體調(diào)度器取值可參考后文:協(xié)程上下文與調(diào)度器 -
withTimeout
:創(chuàng)建一個(gè)超時(shí)協(xié)程箩兽,當(dāng)協(xié)程運(yùn)行時(shí)間超過(guò)給定時(shí)間后,拋出一個(gè)TimeoutCancellationException
章喉。其文檔如下圖所示:與其相似的還有一個(gè)函數(shù)
withTimeoutOrNull
汗贫,該函數(shù)功能與withTimeout
一樣,只是在超時(shí)后秸脱,不拋出異常落包,而是直接返回null
。withTimeout
/withTimeoutOrNull
具備結(jié)構(gòu)化并發(fā)摊唇。 -
CoroutineScope(context))
:工廠(chǎng)方法咐蝇,用于創(chuàng)建一個(gè)新的協(xié)程。其文檔如下所示:CoroutineScope(context)
參數(shù)context
如果不包含一個(gè)Job
元素巷查,則會(huì)使用一個(gè)默認(rèn)的Job()
元素有序。這樣子,協(xié)程取消或者任一子協(xié)程運(yùn)行失敗時(shí)岛请,在該作用域范圍內(nèi)的其他所有協(xié)程都會(huì)被取消旭寿,其行為與coroutineScope
函數(shù)一致。CoroutineScope(context)
不具備結(jié)構(gòu)化并發(fā)崇败。 -
runBlocking
:創(chuàng)建一個(gè)新協(xié)程异袄,并阻塞當(dāng)前線(xiàn)程,直至協(xié)程運(yùn)行完成败砂。其文檔如下圖所示:注:由于
runBlocking
會(huì)阻塞當(dāng)前線(xiàn)程管嬉,因此在實(shí)際開(kāi)發(fā)中,建議不要使用runBlocking
創(chuàng)建協(xié)程。
該函數(shù)主要是設(shè)計(jì)用于在main
函數(shù)或測(cè)試代碼中用于運(yùn)行帶有suspend
函數(shù)的庫(kù)接口。
協(xié)程上下文與調(diào)度器(Coroutine Context and Dispatchers)
協(xié)程上下文是一系列不同元素的集合。其中主要元素是協(xié)程的Job
檀蹋。
協(xié)程會(huì)一直運(yùn)行在某個(gè)協(xié)程上下文CoroutineContext
中。
-
調(diào)度器和線(xiàn)程:協(xié)程上下文包含一個(gè) 協(xié)程調(diào)度器
CoroutineDispatcher
云芦,該調(diào)度器決定了協(xié)程異步任務(wù)的執(zhí)行線(xiàn)程俯逾。Kotlin 提供了如下調(diào)度器可供我們選擇:
-
Dispatchers.Default
:表示使用一種低并發(fā)的線(xiàn)程策略,該策略適合 CPU 密集型計(jì)算任務(wù)舅逸。
所有的標(biāo)準(zhǔn)協(xié)程構(gòu)造器桌肴,比如launch
,async
,它們默認(rèn)的CoroutineDispatcher
都為Dispatchers.Default
-
Dispatchers.IO
:表示使用一種較高并發(fā)的線(xiàn)程策略琉历,適用于網(wǎng)絡(luò)請(qǐng)求坠七,文件讀寫(xiě)等多 IO 操作的場(chǎng)景。 -
Dispatchers.UnConfined
:表示一個(gè)未綁定到特定線(xiàn)程的協(xié)程調(diào)度器旗笔。它會(huì)首先在調(diào)用者線(xiàn)程執(zhí)行協(xié)程彪置,當(dāng)遇到suspend
函數(shù)恢復(fù)時(shí),線(xiàn)程又會(huì)切換到該suspend
函數(shù)對(duì)應(yīng)的協(xié)程上下文中蝇恶。Dispatchers.UnConfined
不強(qiáng)制使用任何線(xiàn)程策略拳魁,隨遇而安。開(kāi)發(fā)中建議慎重使用該策略撮弧,因?yàn)闀?huì)導(dǎo)致線(xiàn)程切換不可控潘懊。 -
Dispatchers.Main
:表示運(yùn)行在 UI 主線(xiàn)程上。比如在 Android 平臺(tái)上贿衍,就表示不會(huì)開(kāi)啟子線(xiàn)程授舟,而是直接將任務(wù)運(yùn)行在主線(xiàn)程中。
kotlinx.coroutines 中所有的協(xié)程構(gòu)造器贸辈,比如launch
释树,async
都可以接收一個(gè)可選參數(shù)CoroutineContext
,為協(xié)程顯示指定運(yùn)行環(huán)境裙椭。
注:除了以上默認(rèn)調(diào)度器外躏哩,Kotlin 還提供相關(guān)函數(shù)可以讓我們創(chuàng)建其他類(lèi)型的調(diào)度器,比如一個(gè)很好用的基于單線(xiàn)程運(yùn)行環(huán)境的調(diào)度器:
newSingleThreadContext(name)
-
當(dāng)launch
/async
等協(xié)程構(gòu)造器未顯示指定CoroutineContext
時(shí)揉燃,它會(huì)繼承父協(xié)程的上下文,且異步任務(wù)會(huì)運(yùn)行在父協(xié)程上下文指定的線(xiàn)程環(huán)境中筋栋。
GlobalScope.launch
默認(rèn)使用的調(diào)度器為Dispatchers.Default
炊汤。
注:為了更好地查看調(diào)度器調(diào)度結(jié)果,可以在執(zhí)行代碼時(shí),為 JVM 選項(xiàng)添加開(kāi)啟協(xié)程調(diào)式模式:-Dkotlinx.coroutines.debug
抢腐,然后使用如下代碼就可以在運(yùn)行時(shí)打印出協(xié)程實(shí)例和其運(yùn)行所在的線(xiàn)程:
fun <T> log(msg: T) = println("[${Thread.currentThread().name}] $msg")
為了更方便查看協(xié)程姑曙,在開(kāi)啟協(xié)程調(diào)式模式后,也可以通過(guò)CoroutineName
為新創(chuàng)建的協(xié)程添加名稱(chēng)迈倍,如下所示:
suspend fun main() {
coroutineScope {
log("set name for a coroutine")
launch(CoroutineName("son-routine")) {
log("this coroutine name is son-routine")
}
}
}
其運(yùn)行結(jié)果如下圖所示:
注:協(xié)程上下文CoroutineContext
覆寫(xiě)了plus
操作符伤靠,因此我們可以通過(guò)+
號(hào)連接多個(gè)協(xié)程上下文元素,如下所示:
launch(Dispatchers.Default + CoroutineName("test")) {
println("I'm working in thread ${Thread.currentThread().name}")
}
取消機(jī)制
kotlinx.coroutines 提供的所有的suspend
函數(shù)都是可以進(jìn)行 取消(cancellable) 的啼染。他們會(huì)檢測(cè)協(xié)程取消狀態(tài)宴合,當(dāng)檢測(cè)到取消操作時(shí),就會(huì)拋出CancellationException
異常迹鹅,我們可在代碼塊中對(duì)該異常進(jìn)行捕獲卦洽。
如下所示:
suspend fun main() = coroutineScope {
val job = launch {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L) // delay will detect cancellation for coroutine
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancel() // cancels the job
job.join() // waits for job's completion
println("main: Now I can quit.")
}
上述代碼運(yùn)行結(jié)果如下:
注:取消操作能成功的前提是:協(xié)程內(nèi)部會(huì)對(duì)取消狀態(tài)進(jìn)行檢測(cè)。
因此斜棚,并不是說(shuō)我們調(diào)用cancel
后阀蒂,就一定能取消協(xié)程運(yùn)行,比如弟蚀,對(duì)于協(xié)程內(nèi)部進(jìn)行 CPU 密集型計(jì)算的操作蚤霞,就無(wú)法進(jìn)行取消,如下所示:
suspend fun outputTwicePerSecond(){
var startTime = System.currentTimeMillis()
var nextPrintTime = startTime
var i = 0
while (i < 5) { // computation loop, just wastes CPU
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
suspend fun main() = coroutineScope {
val job = launch(Dispatchers.Default) {
outputTwicePerSecond()
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
上述代碼運(yùn)行結(jié)果如下:
可以看到义钉,在我們調(diào)用了cancel
操作后争便,子協(xié)程異步任務(wù)仍繼續(xù)執(zhí)行,直到完成断医。
其原因就是子協(xié)程異步任務(wù)中沒(méi)有對(duì)取消狀態(tài)進(jìn)行檢測(cè)滞乙。解決的方法就是要么在異步任務(wù)循環(huán)部分中調(diào)用suspend
函數(shù)(yield
是一個(gè)不錯(cuò)的選擇),要么就手動(dòng)對(duì)取消狀態(tài)進(jìn)行檢測(cè)鉴嗤,如下所示:
// 調(diào)用 suspend 函數(shù)
suspend fun outputTwicePerSecond(){
var startTime = System.currentTimeMillis()
var nextPrintTime = startTime
var i = 0
while (i < 5) { // computation loop, just wastes CPU
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
yield()
}
}
// 或者對(duì) 取消狀態(tài) 進(jìn)行檢測(cè)
fun CoroutineScope.outputTwicePerSecond() {
var startTime = System.currentTimeMillis()
var nextPrintTime = startTime
var i = 0
while (this.isActive) { // computation loop, just wastes CPU
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
異常處理
前面內(nèi)容講過(guò)斩启,要關(guān)閉一個(gè)協(xié)程,使用的是cancel
操作醉锅,實(shí)質(zhì)上兔簇,取消操作與異常是緊密聯(lián)系的,當(dāng)我們調(diào)用cancel
時(shí)硬耍,協(xié)程內(nèi)部其實(shí)是拋出了一個(gè)CancellationException
異常垄琐,從而打斷協(xié)程運(yùn)行,但是這個(gè)異常默認(rèn)會(huì)被異常處理器CoroutineExceptionHandler
忽略经柴,不過(guò)我們可以通過(guò)catch
塊進(jìn)行捕獲狸窘,如下所示:
fun main() = runBlocking {
val child = launch {
try {
delay(Long.MAX_VALUE)
} finally {
log("子協(xié)程停止運(yùn)行!")
}
}
// 讓子協(xié)程啟動(dòng)
yield()
log("取消子協(xié)程")
child.cancel()
child.join()
// 釋放父協(xié)程執(zhí)行權(quán)
delay(TimeUnit.SECONDS.toMillis(1))
log("父協(xié)程仍在運(yùn)行")
}
上述代碼運(yùn)行結(jié)果如下:
根據(jù)上圖可以直到坯认,對(duì)于取消操作翻擒,實(shí)質(zhì)上就是拋出了一個(gè)CancellationException
異常氓涣,所以可以捕獲得到,且子協(xié)程拋出的CancellationException
不會(huì)阻斷父協(xié)程的執(zhí)行陋气。
以上我們已經(jīng)知道協(xié)程對(duì)于CancellationException
異常的處理劳吠,那么對(duì)于協(xié)程內(nèi)部拋出的其他異常,或者是子協(xié)程發(fā)生異常巩趁,則又是怎樣個(gè)處理方法呢痒玩?如下代碼所示:
suspend fun main() {
GlobalScope.launch {
log("父協(xié)程啟動(dòng)")
val child = launch {
log("子協(xié)程拋出異常")
throw ArithmeticException("Div 0")
}
// 讓子協(xié)程啟動(dòng)
yield()
child.join()
// 釋放父協(xié)程執(zhí)行權(quán)
delay(TimeUnit.SECONDS.toMillis(1))
log("父協(xié)程仍在運(yùn)行")
}.join()
}
這次拋出的是普通異常ArithmeticException
,其運(yùn)行結(jié)果如下所示:
可以看到议慰,子協(xié)程拋出非CancellationException
異常蠢古,會(huì)打斷父協(xié)程(取消父協(xié)程)運(yùn)行。具體的過(guò)程涉及的內(nèi)容如下:
-
異常傳播(Exception propagation):協(xié)程構(gòu)造器對(duì)于異常傳播存在兩種機(jī)制:
-
向上傳播:即與普通異常處理機(jī)制一致褒脯,當(dāng)出現(xiàn)未捕獲異常時(shí)便瑟,自動(dòng)向上傳播該異常,比如
launch
番川,actor
-
自動(dòng)包裹:即協(xié)程內(nèi)部不處理該協(xié)程到涂,而是將異常包裹到返回對(duì)象上,在用戶(hù)進(jìn)行特定動(dòng)作時(shí)暴露給用戶(hù)進(jìn)行捕獲颁督,比如
async
践啄,produce
當(dāng)創(chuàng)建的是根協(xié)程時(shí),向上傳播異常機(jī)制會(huì)將異常當(dāng)作未捕獲異常(uncaught exception)沉御,捕獲機(jī)制類(lèi)似于 Java 的
Thread.uncaughtExceptionHandler
類(lèi)似屿讽,
而自動(dòng)包裹異常機(jī)制說(shuō)的是,即使協(xié)程內(nèi)部出現(xiàn)異常吠裆,但是不會(huì)直接拋出伐谈,而是等到用戶(hù)調(diào)用相關(guān)方法時(shí)(比如await
),才進(jìn)行拋出试疙。如下例子所示:
fun main() = runBlocking { // 向上傳播 val job = GlobalScope.launch { // root coroutine with launch println("Throwing exception from launch") throw IndexOutOfBoundsException() // Will be printed to the console by Thread.defaultUncaughtExceptionHandler } job.join() println("Joined failed job") // 自動(dòng)包裹 val deferred = GlobalScope.async { // root coroutine with async println("Throwing exception from async") throw ArithmeticException() // Nothing is printed, relying on user to call await } try { deferred.await() println("Unreached") } catch (e: ArithmeticException) { println("Caught ArithmeticException") } }
可以看到诵棵,向上傳播異常直接拋出,而自動(dòng)包裹異常需要用戶(hù)在特定點(diǎn)上進(jìn)行異常捕獲祝旷。
-
向上傳播:即與普通異常處理機(jī)制一致褒脯,當(dāng)出現(xiàn)未捕獲異常時(shí)便瑟,自動(dòng)向上傳播該異常,比如
-
自定義異常處理器(CoroutineExceptionHandler)
:對(duì)于未捕獲異常履澳,我們可以通過(guò)為根協(xié)程綁定一個(gè)CoroutineExceptionHandler
來(lái)對(duì)根協(xié)程及其子協(xié)程進(jìn)行異常捕獲,其功能類(lèi)似于Thread.uncaughtExceptionHandler
怀跛。注:
CoroutineExceptionHandler
只對(duì)未捕獲異常起作用距贷。特別是,當(dāng)子協(xié)程出現(xiàn)未捕獲異常時(shí)吻谋,它會(huì)一級(jí)一級(jí)向上傳播該異常忠蝗,直至根協(xié)程,也因此滨溉,對(duì)子協(xié)程安裝CoroutineExceptionHandler
是沒(méi)有作用的什湘。但是對(duì)于Supervisor
长赞,其異常傳播機(jī)制為:向下傳播晦攒,即父協(xié)程發(fā)生異常闽撤,會(huì)傳播給所有子協(xié)程,導(dǎo)致協(xié)程全部取消脯颜;而子協(xié)程發(fā)生異常哟旗,不會(huì)向上傳播,也不會(huì)影響其他子協(xié)程和父協(xié)程栋操。子協(xié)程需要自己處理自己的異常闸餐,因此為Supervisor
作用域下的子協(xié)程安裝CoroutineExceptionHandler
是有效的。注:
asnyc
函數(shù)對(duì)異常的作用機(jī)制是:async
會(huì)將其內(nèi)所有異常進(jìn)行捕獲矾芙,然后放置到返回值Deferred
中舍沙,因此,為async
安裝CoroutineExceptionHandler
也不會(huì)有任何作用剔宪。注:在 JVM 平臺(tái)上拂铡,可以通過(guò)
ServiceLoader
為所有協(xié)程注冊(cè)一個(gè)全局異常處理器CoroutineExceptionHandler
,全局異常處理器類(lèi)似于Thread.defaultUncaughtExceptionHandler
葱绒,在系統(tǒng)沒(méi)有其他處理器的情況下進(jìn)行使用感帅。
在 Android 平臺(tái)上,有一個(gè)默認(rèn)以安裝的全局異常處理器:uncaughtExceptionPreHandler
例子如下所示:
suspend fun main() { // 創(chuàng)建一個(gè)異常處理器 val handler = CoroutineExceptionHandler { _, exception -> println("CoroutineExceptionHandler got $exception") } val job = GlobalScope.launch(handler) { // 安裝異常處理器 log("父協(xié)程開(kāi)啟") val child = launch { // 子協(xié)程拋出異常 log("子協(xié)程拋出異常") throw AssertionError() } delay(TimeUnit.SECONDS.toMillis(1)) log("父協(xié)程仍在運(yùn)行") } job.join() }
上述代碼執(zhí)行結(jié)果如下圖所示:
可以看到地淀,為根協(xié)程安裝了
CoroutineExceptionHandler
后失球,成功捕獲到子協(xié)程異常。
同時(shí)可以看到的是帮毁,CoroutineExceptionHandler
只是對(duì)異常進(jìn)行了捕獲顯示实苞,不會(huì)對(duì)協(xié)程有實(shí)質(zhì)影響,子協(xié)程拋出異常后烈疚,同樣會(huì)取消父協(xié)程黔牵。注:對(duì)于
CoroutineExceptionHandler
,不要將其安裝到runBlocking
中胞得,因?yàn)?code>runBlocking在子協(xié)程異常結(jié)束時(shí)荧止,就會(huì)自動(dòng)cancel
,CoroutineExceptionHandler
不會(huì)起作用阶剑。 -
異常聚合:如果多個(gè)子協(xié)程都拋出異常跃巡,那么一般規(guī)則是:第一個(gè)拋出的異常勝出,異常處理器會(huì)處理第一個(gè)異常牧愁,其他異常會(huì)作為次級(jí)信息附加到第一個(gè)異常后面素邪。如下代碼所示:
fun main() = runBlocking { val handler = CoroutineExceptionHandler { _, exception -> println("CoroutineExceptionHandler got $exception with suppressed ${exception.suppressed.contentToString()}") } val job = GlobalScope.launch(handler) { launch { try { delay(Long.MAX_VALUE) // it gets cancelled when another sibling fails with IOException } finally { throw ArithmeticException() // the second exception } } launch { delay(100) throw IOException() // the first exception } delay(Long.MAX_VALUE) } job.join() }
其運(yùn)行結(jié)果如下所示:
通道(Channel)
Channel 可以讓數(shù)據(jù)流動(dòng)在不同的協(xié)程中,提供了協(xié)程間通信機(jī)制猪半。
本質(zhì)上兔朦,Channel 底層就是一個(gè)線(xiàn)程安全的隊(duì)列(類(lèi)似BlockingQueue
)偷线,一個(gè)協(xié)程可以往里面發(fā)送數(shù)據(jù),另一個(gè)協(xié)程就可以在里面獲取數(shù)據(jù)沽甥,完成通信過(guò)程声邦。如下所示:
suspend fun main() {
val channel = Channel<Int>()
var producer = GlobalScope.launch {
for (i in 1..10) {
log("channel send $i")
channel.send(i)
}
}
var consumer = GlobalScope.launch {
repeat(10) {
val value = channel.receive()
log("channel receiver $value")
}
}
producer.join()
consumer.join()
}
上述代碼是基于Channel
實(shí)現(xiàn)的一個(gè)簡(jiǎn)單的生產(chǎn)者-消費(fèi)者模式。
注:send
和receive
函數(shù)都是suspend
函數(shù)摆舟,因此:
- 如果
Channel
未發(fā)送數(shù)據(jù)亥曹,或者底層隊(duì)列已滿(mǎn),則send
會(huì)被掛起 - 如果
Channel
無(wú)法讀取到數(shù)據(jù)恨诱,則receive
會(huì)掛起媳瞪,直到新元素到來(lái)
下面對(duì)Channel
的一些基本操作進(jìn)行簡(jiǎn)介:
-
迭代
Channel
:我們上面的例子是手動(dòng)指定接收次數(shù),并使用channel.receive
進(jìn)行數(shù)據(jù)接收照宝,這種方式不夠靈活蛇受。
Channel
本身提供了迭代器操作:channel.iterator()
,這種寫(xiě)法可以簡(jiǎn)化成for...in...
操作厕鹃,更加簡(jiǎn)潔方便兢仰。如下所示:var consumer = GlobalScope.launch { for (element in channel){ log("channel receive $element") } }
指定緩沖區(qū)大小:前面說(shuō)過(guò),
Channel
本質(zhì)上是一個(gè)隊(duì)列熊响,發(fā)送數(shù)據(jù)時(shí)旨别,會(huì)把數(shù)據(jù)存儲(chǔ)到到隊(duì)列緩沖區(qū)中,接收數(shù)據(jù)時(shí)汗茄,會(huì)從該隊(duì)列緩沖區(qū)中進(jìn)行讀取秸弛。
默認(rèn)情況下,緩沖區(qū)大小為 0洪碳,但是我們可以通過(guò)在創(chuàng)建Channel
時(shí)递览,手動(dòng)指定該緩沖區(qū)大小,比如:Channel(4)
瞳腌,表示緩沖區(qū)大小為 4绞铃。-
創(chuàng)建
Channel
:前面我們都是通過(guò)Channel
構(gòu)造函數(shù)單獨(dú)創(chuàng)建一個(gè)Channel
,然后嵌套到生產(chǎn)者協(xié)程和消費(fèi)者協(xié)程中嫂侍,這種做法很直觀儿捧,但是代碼量略大。Kotlin 提供了一些擴(kuò)展方法挑宠,可以很方便地讓我們創(chuàng)建Channel
菲盾。如下所示:suspend fun main() = coroutineScope { val receiveChannel = produce<Int> { for (i in 1..10) { log("send $i") send(i) } } receiveChannel.consumeEach { log("receive $it") } }
使用擴(kuò)展函數(shù)
produce
來(lái)創(chuàng)建一個(gè)Channel
,使用consumeEach
來(lái)消費(fèi)Channel
各淀。 關(guān)閉
Channel
:Channel
是可以進(jìn)行關(guān)閉的懒鉴,其關(guān)閉方法為close
。close
方法的底層實(shí)現(xiàn)其實(shí)就是通過(guò)發(fā)送一個(gè)特殊的 token 給到Channel
,當(dāng)Channel
讀取到該 token 時(shí)临谱,就會(huì)進(jìn)行關(guān)閉操作璃俗。因此,close
方法其實(shí)不是立即關(guān)閉Channel
悉默,而是會(huì)等到Channel
底層隊(duì)列緩沖區(qū)數(shù)據(jù)完全被消費(fèi)完畢后城豁,才會(huì)進(jìn)行關(guān)閉操作。
-
單發(fā)多收:多個(gè)協(xié)程可以從同一個(gè)
Channel
中接收數(shù)據(jù)麦牺,此時(shí)接收協(xié)程競(jìng)爭(zhēng)該Channel
數(shù)據(jù)钮蛛。如下所示:fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch { for (msg in channel) { log("Processor #$id received $msg") } } suspend fun main() = coroutineScope { val singleChannel = produce<Int> { var count: Int = 0 while (true) { send(++count) delay(100) } } repeat(5) { launchProcessor(it, singleChannel) } delay(950) singleChannel.cancel() }
注:
for...in...
即使在多個(gè)協(xié)程中同時(shí)對(duì)同一個(gè)Channel
進(jìn)行遍歷操作鞭缭,也是非常安全的剖膳。上述代碼運(yùn)行結(jié)果如下圖所示:
-
單收多發(fā):多個(gè)協(xié)程可同時(shí)向同一個(gè)
Channel
發(fā)送數(shù)據(jù),這個(gè)很好理解岭辣,就是隊(duì)列入數(shù)據(jù)吱晒。如下所示:fun CoroutineScope.sendString(channel: SendChannel<String>, data: String, time: Long) = launch { while (true) { delay(time) log("send $data") channel.send(data) } } suspend fun main() = coroutineScope { val channel = Channel<String>() // 開(kāi)辟兩個(gè)協(xié)程發(fā)送數(shù)據(jù) sendString(channel, "hello", 200L) sendString(channel, "world", 500L) // 只接收 6 次 repeat(6) { val data = channel.receive() log("receive $data") } // 關(guān)閉所有子協(xié)程 coroutineContext.cancelChildren() }
上述代碼運(yùn)行結(jié)果如下圖所示:
-
廣播
Channel
:前面我們提到過(guò),Channel
發(fā)送端和接收端可以存在一對(duì)多的情形沦童,此時(shí)發(fā)送端的數(shù)據(jù)被接收端競(jìng)爭(zhēng)仑濒,一個(gè)數(shù)據(jù)只會(huì)被一個(gè)接收端獲取。
而如果想實(shí)現(xiàn)的是一個(gè)數(shù)據(jù)可以被所有的接收端獲取得到偷遗,則可以使用廣播通道BroadcastChannel
墩瞳,如下所示:fun CoroutineScope.register(id: Int, channel: BroadcastChannel<Int>) = launch { val receiverChannel = channel.openSubscription() for (i in receiverChannel) { log("receive broadcast:$id -> $i") } } suspend fun main() = coroutineScope { // 創(chuàng)建廣播通道 val channel = BroadcastChannel<Int>(Channel.BUFFERED) // 創(chuàng)建兩個(gè)協(xié)程,注冊(cè)到該 channel repeat(2) { register(it, channel) } // 發(fā)送廣播 for (i in 1..3) { delay(100) log("send broadcast: $i") channel.send(i) } coroutineContext.cancelChildren() }
注:最好是先進(jìn)行注冊(cè)氏豌,然后再發(fā)送數(shù)據(jù)喉酌,否則可能存在數(shù)據(jù)接收不完整問(wèn)題,因?yàn)?code>BroadcastChannel在發(fā)送數(shù)據(jù)時(shí)泵喘,如果發(fā)現(xiàn)沒(méi)有訂閱者泪电,則會(huì)直接拋棄到該條數(shù)據(jù)。
上述代碼中纪铺,使用函數(shù)
BroadcastChannel
來(lái)創(chuàng)建一個(gè)廣播通道相速,然后需要數(shù)據(jù)的協(xié)程可以通過(guò)openSubscription
來(lái)進(jìn)行注冊(cè),其結(jié)果如下圖所示:
可以看到鲜锚,每個(gè)注冊(cè)到廣播通道的協(xié)程都接收到每個(gè)數(shù)據(jù)突诬。
注:除了使用
BroadcastChannel
創(chuàng)建廣播通道外,還可以將現(xiàn)成的Channel
轉(zhuǎn)換成廣播:Channel<Int>(3).broadcast(4)
芜繁,這種方法其實(shí)是通過(guò)對(duì)源Channel
進(jìn)行一個(gè)讀取操作旺隙,轉(zhuǎn)換為一個(gè)全局廣播GlobalScope.broadcast
,如果此時(shí)還有其他協(xié)程讀取該源Channel
浆洗,則可能產(chǎn)生如下結(jié)果:-
只有源
Channel
發(fā)送數(shù)據(jù):此時(shí)接收協(xié)程和注冊(cè)廣播的協(xié)程會(huì)競(jìng)爭(zhēng)這些數(shù)據(jù) - 只使用廣播通道發(fā)送數(shù)據(jù):此時(shí)則只有注冊(cè)廣播的協(xié)程能接收到這些數(shù)據(jù)
-
同時(shí)使用源
Channel
和廣播發(fā)送數(shù)據(jù):此時(shí)數(shù)據(jù)十分混亂催束,不建議使用
-
只有源
冷數(shù)據(jù)流 Flow
一個(gè)suspend
函數(shù)可以異步返回一個(gè)數(shù)值,而借助 異步流伏社,我們就可以返回多個(gè)數(shù)值抠刺。
Kotlin 中提供了以下方法可以讓我們返回多個(gè)數(shù)值:
-
Sequence:對(duì)于 CPU 密集型的異步任務(wù)塔淤,我們可以使用 序列 來(lái)返回多個(gè)值,如下所示:
fun simple(): Sequence<Int> = sequence { // sequence builder for (i in 1..3) { Thread.sleep(100) // pretend we are computing it yield(i) // yield next value } } fun main() { simple().forEach { value -> println(value) } }
上述代碼在序列內(nèi)部通過(guò)
yield
函數(shù)將值傳遞出去速妖,從而完成多個(gè)值的傳遞高蜂。
但是任務(wù)運(yùn)行在主線(xiàn)程中,會(huì)卡住主線(xiàn)程罕容,此時(shí)可通過(guò)suspend
函數(shù)進(jìn)行解決备恤。 -
suspend
函數(shù):suspend
函數(shù)直接或間接運(yùn)行在協(xié)程中,使用得當(dāng)則可解決異步任務(wù)卡主線(xiàn)程問(wèn)題锦秒,如下所示:suspend fun simple(): List<Int> { delay(1000) // pretend we are doing something asynchronous here return listOf(1, 2, 3) } fun main() = runBlocking<Unit> { simple().forEach { value -> println(value) } }
由于一個(gè)
suspend
函數(shù)只能返回一個(gè)值露泊,上述代碼通過(guò)返回集合的方式,雖然返回了多個(gè)值旅择,但是是一次性的惭笑,對(duì)于數(shù)據(jù)量較大或是無(wú)限數(shù)據(jù)流的情況不太適用,此時(shí)可采用數(shù)據(jù)流Flow
方式進(jìn)行解決生真。 -
冷數(shù)據(jù)流 Flow:
Flow
是響應(yīng)式編程(比如:RxJava)與 Kotlin 協(xié)程結(jié)合而成的產(chǎn)物沉噩,將數(shù)據(jù)以流的方式進(jìn)行發(fā)送。fun simple(): Flow<Int> = flow { // flow builder for (i in 1..3) { delay // pretend we are doing something useful here emit(i) // emit next value } } fun main() = runBlocking<Unit> { // Collect the flow simple().collect { value -> println(value) } }
上述代碼通過(guò)
flow
函數(shù)生成一個(gè)異步數(shù)據(jù)流Flow
,在其block
塊內(nèi)同通過(guò)emit
發(fā)送數(shù)據(jù),每次調(diào)用emit
的同時(shí)著角,也會(huì)同樣調(diào)用ensureActive
函數(shù),對(duì)協(xié)程取消狀態(tài)進(jìn)行檢測(cè)畜眨。最后是通過(guò)collect
函數(shù)對(duì)數(shù)據(jù)進(jìn)行獲取消費(fèi)。注:
Flow
流是冷數(shù)據(jù)流痰哨,也即在調(diào)用flow
創(chuàng)建一個(gè)Flow
時(shí)胶果,只有在消費(fèi)時(shí)(比如:collect
),才會(huì)真正執(zhí)行數(shù)據(jù)生成邏輯斤斧。
下面對(duì)Flow
的一些特性進(jìn)行簡(jiǎn)介:
-
Flow
構(gòu)造器:Kotlin 提供了多種用于創(chuàng)建Flow
的方法早抠,以下介紹常用的幾個(gè):-
flow
:該方法是用于創(chuàng)建Flow
最基礎(chǔ)的方法,其參數(shù)block
內(nèi)可通過(guò)emit
函數(shù)發(fā)送數(shù)據(jù)撬讽,但是不能直接在其內(nèi)進(jìn)行線(xiàn)程切換蕊连,如下所示:flow<Int> { emit(1) withContext(Dispatchers.IO) { // BAD!! emit(2) } }.collect { println(it) }
channelFlow
:如果想在生成元素時(shí)切換調(diào)度器,可以通過(guò)該函數(shù)來(lái)創(chuàng)建Flow
:
channelFlow<Int> { send(1) withContext(Dispatchers.IO) { // BAD!! send(2) } }.collect { log(it) }
-
flowOf
:該方法可接收可變數(shù)量類(lèi)型參數(shù)游昼,將其轉(zhuǎn)化為Flow
:
flowOf<Int>(1,2,3,4)
-
asFlow
:集合或者序列可以通過(guò).asFlow()
方法轉(zhuǎn)化為Flow
甘苍,如下所示:
sequence<Int> { yield(1) }.asFlow().collect { log("sequence: $it") } listOf<Int>(1, 2, 3).asFlow().collect { log("listOf: $it") } setOf<Int>(1, 2, 3).asFlow().collect { log("setOf: $it") }
-
-
取消發(fā)送數(shù)據(jù):
Flow
的消費(fèi)依賴(lài)于終端操作符(比如collect
),終端操作符必須在協(xié)程中進(jìn)行調(diào)用烘豌,只要終端操作符所在的協(xié)程被取消時(shí)载庭,Flow
內(nèi)部檢測(cè)到取消狀態(tài),就會(huì)進(jìn)行取消操作。因此囚聚,取消協(xié)程靖榕,只需取消它所在的協(xié)程即可。如下所示:suspend fun main() { val job = GlobalScope.launch { val intFlow = flow { (1..3).forEach { delay(1000) log("emit $it") emit(it) } } intFlow.collect { log("receive $it") } } delay(2500) log("cancel outter coroutine!!") job.cancelAndJoin(); }
上述代碼運(yùn)行結(jié)果如下圖所示:
-
操作符:由于
Flow
是在協(xié)程上實(shí)現(xiàn)的響應(yīng)式編程模型顽铸,因此它與 RxJava 一樣茁计,都有著很多的操作符,可以對(duì)Flow
進(jìn)行轉(zhuǎn)換谓松,下面簡(jiǎn)單列舉幾個(gè)常見(jiàn)的操作符:-
map
/filter
/take
..:轉(zhuǎn)換操作符suspend fun main() { val value = (1..10).asFlow() .filter { it % 2 == 0 } .take(3) .map { delay(1000) log("map value: $it") it + 1 }.reduce { a, b -> a + b } println("result is $value") }
-
flattenConcat
:該函數(shù)用于將嵌套Flow<Flow<T>>
類(lèi)型進(jìn)行打平星压,轉(zhuǎn)換為Flow<T>
。如下所示:suspend fun main() { flow<Int> { List(5) { emit(it) } }.map { // it = Int flow { List(it) { emit(it) } } }.onEach { // it = Flow<Int> it.collect { value -> log(value) } }.collect() }
注:
flattenConcat
不會(huì)打亂拼接順序鬼譬,結(jié)果仍然是生產(chǎn)時(shí)的順序娜膘。
如果想并發(fā)拼接,則可以使用flattenMerge
拧簸,這將產(chǎn)生無(wú)序結(jié)果劲绪。
其實(shí),flattenConcat
在概念上等同于flattenMerge(concurrency = 1)
盆赤,但具備更好的性能。 transform
:該操作符是最通用的一種操作符歉眷,可用它模擬其他簡(jiǎn)單轉(zhuǎn)換的操作符牺六,比如map
,filter
汗捡,或者一些更加復(fù)雜的轉(zhuǎn)換淑际。
使用transform
操作符,我們可以對(duì)任意數(shù)值重復(fù)發(fā)送emit
任意次數(shù)扇住。如下所示:
suspend fun main() { flowOf(100) .transform { value -> repeat(3) { emit("send $value") // 重復(fù)發(fā)送 3 次 } }.collect { log(it) } }
-
collect
/toList
/toSet
/first
/single
/reduce
/fold
:終端操作符春缕,對(duì)Flow
流進(jìn)行消費(fèi)。如下所示:
val sum = (1..5).asFlow() .map { it * it } // squares of numbers from 1 to 5 .reduce { a, b -> a + b } // sum them (terminal operator) println(sum)
注:由于
Flow
的消費(fèi)端必須運(yùn)行在協(xié)程中艘蹋,因此終端操作符一定是suspend
函數(shù)锄贼。-
flowOn
:該操作符可以為Flow
指定數(shù)據(jù)流生產(chǎn)所需的調(diào)度器,如下所示:
flow { /*數(shù)據(jù)生產(chǎn)*/ }.flowOn(Dispatchers.IO).collect {...}
主要就是使用
flowOn
進(jìn)行線(xiàn)程切換女阀,因此會(huì)導(dǎo)致開(kāi)辟一個(gè)新的協(xié)程宅荤。
需要注意的是,flowOn
只會(huì)對(duì)其前面的操作產(chǎn)生效果浸策,這里也就是對(duì)flow {...}
產(chǎn)生影響冯键,對(duì)collect
沒(méi)有影響。-
buffer
:該操作符為數(shù)據(jù)流生產(chǎn)和終端操作符消費(fèi)各自開(kāi)辟一個(gè)協(xié)程庸汗,這樣兩者就能并發(fā)運(yùn)行惫确,然后通過(guò)一個(gè)Channel
將生產(chǎn)的數(shù)據(jù)流發(fā)送給終端消費(fèi),完成整套操作。默認(rèn)情況下改化,
Flow
流是順序操作的昧诱,也即所有操作符都作用在同一個(gè)協(xié)程中,如下所示:flowOf("A", "B", "C") .onEach { println("1$it") } .collect { println("2$it") }
上述代碼的運(yùn)行流程如下所示:
Q : -->-- [1A] -- [2A] -- [1B] -- [2B] -- [1C] -- [2C] -->--
可以看到所袁,元素是一個(gè)一個(gè)按生產(chǎn)先后順序依次進(jìn)行操作的盏档,這樣子的話(huà),如果數(shù)據(jù)流生產(chǎn)和消費(fèi)都存在耗時(shí)操作時(shí)燥爷,總時(shí)間就是所有操作耗時(shí)總合蜈亩,如下所示:
suspend fun main() { val time = measureTimeMillis { flowOf("A", "B", "C") .onEach {// 每生產(chǎn)一個(gè)數(shù)據(jù),耗時(shí) 100 ms delay(100) log("generate : $it") } .collect { delay(300) // 每消費(fèi)一個(gè)數(shù)據(jù)前翎,耗時(shí) 300 ms log("consume: $it") } } println("Collected in $time ms") }
上述代碼中稚配,每生產(chǎn)一個(gè)數(shù)據(jù)耗時(shí) 100 ms,每消費(fèi)一個(gè)數(shù)據(jù)耗時(shí) 300 ms港华,如果以默認(rèn)的按順序消費(fèi)道川,則總時(shí)間耗費(fèi)為:(100 + 300)*3 = 1200 ms 左右。
實(shí)際運(yùn)行結(jié)果如下圖所示:
但是如果我們?cè)跀?shù)據(jù)和生產(chǎn)消費(fèi)中間添加一個(gè)
buffer
操作立宜,如下所示:suspend fun main() { val time = measureTimeMillis { flowOf("A", "B", "C") .onEach {// 每生產(chǎn)一個(gè)數(shù)據(jù)冒萄,耗時(shí) 100 ms delay(100) log("generate : $it") } .buffer() // 分割生產(chǎn)和消費(fèi) .collect { delay(300) // 每消費(fèi)一個(gè)數(shù)據(jù),耗時(shí) 300 ms log("consume: $it") } } println("Collected in $time ms") }
上述代碼運(yùn)行結(jié)果如下圖所示:
可以看到橙数,僅僅只發(fā)送 3 個(gè)數(shù)據(jù)尊流,就有將近 60 ms 的時(shí)間節(jié)省,如果數(shù)據(jù)量更大點(diǎn)灯帮,節(jié)省的時(shí)間會(huì)更加客觀崖技。
但更重要的是,從上圖中可以看到钟哥,數(shù)據(jù)生產(chǎn)和消費(fèi)是并行運(yùn)行的迎献,因?yàn)榫腿缥覀兩鲜鏊f(shuō)的,buffer
會(huì)將其上和其下的操作放置到兩個(gè)獨(dú)立的協(xié)程中腻贰,從而增加了并發(fā)吁恍,節(jié)省時(shí)間。這里借用官網(wǎng)的一副示意圖银受,會(huì)更加清晰整個(gè)過(guò)程:
綜上:當(dāng)數(shù)據(jù)生產(chǎn)和終端消費(fèi)都存在耗時(shí)操作時(shí)践盼,使用
buffer
可以提高并發(fā),節(jié)省時(shí)間宾巍。 -
catch
:Flow
提供了catch
操作符咕幻,可以很方便對(duì)異常進(jìn)行捕獲。如下所示:suspend fun main() { flow { emit(1) // 手動(dòng)拋出異常 throw ArithmeticException("divide 0") }.catch { t: Throwable -> log("caught error: $t") }.collect { log(it) } }
注:
catch
只能捕獲它上游的異常顶霞,因此肄程,通常將catch
放置在最后操作符位置锣吼,也即終端操作符前一個(gè)。
但是對(duì)于終端操作符發(fā)生的異常蓝厌,catch
無(wú)法捕捉得到玄叠。此時(shí),其實(shí)可以通過(guò)將數(shù)據(jù)消費(fèi)轉(zhuǎn)移到onEach
進(jìn)行處理拓提,如下所示:suspend fun main() { flow { emit(1) // 手動(dòng)拋出異常 throw ArithmeticException("divide 0") }.onEach { log(it) }.catch { t: Throwable -> log("caught error: $t") }.collect() }
注意
catch
需要放置到onEach
前面读恃,這樣才能捕獲到onEach
出現(xiàn)的異常。
同時(shí)代态,我們使用的終端操作符函數(shù)為collect()
寺惫,它會(huì)觸發(fā)數(shù)據(jù)消費(fèi)過(guò)程,但本身不對(duì)數(shù)據(jù)進(jìn)行處理蹦疑。
-
cancelable
:讓Flow
流可以被取消西雀。上文我們介紹過(guò),如果未在協(xié)程異步任務(wù)中對(duì)取消狀態(tài)進(jìn)行檢測(cè)歉摧,那么協(xié)程是無(wú)法響應(yīng)
cancel
的艇肴,而對(duì)于Flow
流來(lái)說(shuō),也是一樣的叁温。
如果Flow
流內(nèi)部是通過(guò)emit
進(jìn)行數(shù)據(jù)發(fā)送再悼,由于本身就是一個(gè)suspend
函數(shù)(其實(shí)質(zhì)是其內(nèi)部會(huì)通過(guò)ensureActive
對(duì)取消狀態(tài)進(jìn)行檢測(cè)),故可以響應(yīng)cancle
操作券盅。但是如果是由比如intRange.asFlow
這類(lèi)直接生成Flow
帮哈,其內(nèi)部未對(duì)取消狀態(tài)進(jìn)行檢測(cè),故無(wú)法響應(yīng)cancel
操作锰镀,當(dāng)然我們可以通過(guò)onEach
操作符為其手動(dòng)添加檢測(cè):.onEach { currentCoroutineContext().ensureActive() }
,但是使用cancelable
操作符會(huì)更加簡(jiǎn)潔方便咖刃,如下所示:fun main() = runBlocking<Unit> { (1..5).asFlow() .cancellable() .collect { value -> if (value == 3) cancel() println(value) }
-
更多操作符內(nèi)容泳炉,,請(qǐng)參考:Asynchronous Flow
協(xié)程并發(fā)安全
由于 Kotlin 協(xié)程在 JVM 上的底層實(shí)現(xiàn)是基于線(xiàn)程的嚎杨,因此協(xié)程間狀態(tài)共享存在數(shù)據(jù)競(jìng)爭(zhēng)問(wèn)題花鹅,此時(shí)需要進(jìn)行數(shù)據(jù)同步操作。
如果協(xié)程運(yùn)行不同的調(diào)度器中(或者運(yùn)行在多線(xiàn)程調(diào)度器中枫浙,比如Dispatchers.Default
)刨肃,則其異步任務(wù)實(shí)際上運(yùn)行在不同的線(xiàn)程上,此時(shí)通過(guò) Java 自帶的一些線(xiàn)程安全操作(比如加鎖箩帚,原子類(lèi)...)真友,就可以保證協(xié)程并發(fā)安全。
但是協(xié)程本身也提供了一些并發(fā)安全措施紧帕,主要有如下幾方面內(nèi)容:
-
線(xiàn)程細(xì)粒度控制:如果說(shuō)很多個(gè)協(xié)程(假設(shè)運(yùn)行在不同的線(xiàn)程中)對(duì)同一個(gè)數(shù)據(jù)進(jìn)行修改盔然,其實(shí)就是線(xiàn)程并發(fā)讀寫(xiě)數(shù)據(jù)桅打,這樣數(shù)據(jù)就不安全。
其實(shí)我們可以將對(duì)數(shù)據(jù)的修改行為都調(diào)度到同一條線(xiàn)程中愈案,這樣就不會(huì)出現(xiàn)競(jìng)爭(zhēng)導(dǎo)致數(shù)據(jù)出錯(cuò)問(wèn)題挺尾。如下所示:suspend fun massiveRun(action: suspend () -> Unit) { val n = 100 // number of coroutines to launch val k = 1000 // times an action is repeated by each coroutine val time = measureTimeMillis { coroutineScope { // scope for coroutines repeat(n) { // 創(chuàng)建 100 個(gè)協(xié)程 launch { // 執(zhí)行 1000 次動(dòng)作 repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms") } // 任務(wù)調(diào)度器:?jiǎn)我痪€(xiàn)程 val counterContext = newSingleThreadContext("CounterContext") // 共享數(shù)據(jù) var counter = 0 fun main() = runBlocking { withContext(Dispatchers.Default) { massiveRun { // confine each increment to a single-threaded context withContext(counterContext) { counter++ } } } println("Counter = $counter") }
上述代碼在所有協(xié)程中,通過(guò)
withContext
將對(duì)數(shù)據(jù)的修改都切換到全局唯一的一條線(xiàn)程上站绪,保證了數(shù)據(jù)并發(fā)安全遭铺。結(jié)果如下:
-
線(xiàn)程粗粒度控制:直接在一條線(xiàn)程上開(kāi)啟多個(gè)協(xié)程,這樣就自然保證了數(shù)據(jù)并發(fā)安全恢准。如下所示:
// 任務(wù)調(diào)度器:?jiǎn)我痪€(xiàn)程 val counterContext = newSingleThreadContext("CounterContext") // 共享數(shù)據(jù) var counter = 0 fun main() = runBlocking { // confine everything to a single-threaded context withContext(counterContext) { massiveRun { counter++ } } println("Counter = $counter") }
執(zhí)行結(jié)果如下所示:
可以看到魂挂,效果比細(xì)粒度控制快了兩個(gè)量級(jí)。一個(gè)是因?yàn)榫€(xiàn)程細(xì)粒度控制其實(shí)開(kāi)辟了很多不必要的線(xiàn)程顷歌,系統(tǒng)開(kāi)銷(xiāo)很大锰蓬,然后各個(gè)協(xié)程會(huì)把數(shù)據(jù)修改工作分發(fā)到另一個(gè)全局唯一的任務(wù)協(xié)程中,這就涉及到線(xiàn)程切換問(wèn)題...而粗粒度是直接在一條線(xiàn)程上開(kāi)啟很多個(gè)協(xié)程眯漩,異步完成數(shù)據(jù)修改(由于在同一條線(xiàn)程上修改數(shù)據(jù)芹扭,天然線(xiàn)程安全),幾乎不占用系統(tǒng)資源赦抖,因此速度很快舱卡。這從一方面也驗(yàn)證了:協(xié)程是輕量級(jí)線(xiàn)程。
-
互斥(Mutual exclusion):互斥方案就是使用永不并發(fā)的 關(guān)鍵部分(critical section) 來(lái)保護(hù)共享狀態(tài)修改队萤。
在阻塞調(diào)用中轮锥,通常使用synchronized
或ReentrantLock
來(lái)控制線(xiàn)程互斥。而在協(xié)程中要尔,提供的加鎖機(jī)制為Mutex
舍杜,可以使用Mutex.lock
和Mutex.unlock
來(lái)劃定關(guān)鍵部分。Mutext
相對(duì)synchronized
等鎖的最大區(qū)別是:Mutext.lock
方法是一個(gè)suspend
方法赵辕,因此它不會(huì)阻塞線(xiàn)程既绩。注:
Mutext
提供了一個(gè)簡(jiǎn)便方法withLock
,其相當(dāng)于mutext.lock(); try{ ... } finally {mutext.unlock()}
还惠。下面使用
Mutext
來(lái)修改上面代碼饲握,如下所示:// 共享數(shù)據(jù) var counter = 0 // 創(chuàng)建一個(gè)協(xié)程互斥鎖 val mutex = Mutex() fun main() = runBlocking { withContext(Dispatchers.Default) { massiveRun { // protect each increment with lock mutex.withLock { counter++ } } } println("Counter = $counter") }
直接對(duì)修改部分加鎖即可,其運(yùn)行結(jié)果如下所示:
直觀看來(lái)蚕键,加鎖的效率不如單線(xiàn)程創(chuàng)建多協(xié)程救欧,一方面加鎖的確會(huì)更慢,但是這里更多的開(kāi)銷(xiāo)應(yīng)該是調(diào)度器默認(rèn)開(kāi)啟的線(xiàn)程池锣光。
-
Actors:一個(gè)
actor
實(shí)體是由一個(gè)協(xié)程笆怠,被封裝到協(xié)程的狀態(tài)和一個(gè)通道Channel
組合而成的。
Kotlin 提供了actor
協(xié)程構(gòu)造器嫉晶,可用于方便創(chuàng)建帶有可以接收消息的郵箱Channel
骑疆,同時(shí)將發(fā)送Channel
合并進(jìn)返回的Job
當(dāng)中田篇。使用
actor
來(lái)達(dá)到數(shù)據(jù)安全的方式如下:- 通過(guò)
actor
方法創(chuàng)建的協(xié)程,發(fā)送的信息會(huì)被其內(nèi)部代碼塊接收得到箍铭,因此首先需要定義一個(gè)消息體:
// Message types for counterActor sealed class CounterMsg object IncCounter : CounterMsg() // one-way message to increment counter class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
- 然后泊柬,創(chuàng)建一個(gè)發(fā)送和接收消息體的
actor
:
// This function launches a new counter actor fun CoroutineScope.counterActor() = actor<CounterMsg> { var counter = 0 // actor state for (msg in channel) { // iterate over incoming messages when (msg) { is IncCounter -> counter++ is GetCounter -> msg.response.complete(counter) } } }
注:
actor
內(nèi)部包含狀態(tài),對(duì)應(yīng)我們上面的例子诈火,這個(gè)狀態(tài)就是被修改的數(shù)據(jù)counter
兽赁。- 并發(fā)修改數(shù)據(jù)的協(xié)程直接使用
actor
對(duì)象發(fā)送修改信息即可:
fun main() = runBlocking<Unit> { val sendChannel = counterActor() // create the actor withContext(Dispatchers.Default) { massiveRun { sendChannel.send(IncCounter) } } // send a message to get a counter value from an actor val response = CompletableDeferred<Int>() sendChannel.send(GetCounter(response)) println("Counter = ${response.await()}") sendChannel.close() // shutdown the actor }
上述代碼運(yùn)行結(jié)果如下所示:
其實(shí),
actor
的本質(zhì)就是一個(gè) 多對(duì)一 的Channel
冷守,多個(gè)協(xié)程通過(guò)Channel
發(fā)送修改事件給到actor
刀崖,actor
修改自身內(nèi)部狀態(tài)即可,所有的修改操作都發(fā)生在actor
本身的協(xié)程中拍摇,故數(shù)據(jù)安全亮钦,不用考慮線(xiàn)程同步。注:
actor
是一個(gè)雙重的produce
協(xié)程構(gòu)造器充活,actor
與接收通道相關(guān)聯(lián)(內(nèi)部代碼塊接收信息)蜂莉,produce
與發(fā)送通道相關(guān)聯(lián)(內(nèi)部代碼用于發(fā)送消息)。 - 通過(guò)
Select Expression (experimental)
Select 表達(dá)式混卵,即 多路復(fù)用映穗,與 Unix 中的 IO 多路復(fù)用功能相似。
Kotlin 中的 多路復(fù)用select幕随,可以對(duì)多個(gè)協(xié)程某些操作進(jìn)行復(fù)用蚁滋,比如:
-
復(fù)用多個(gè)
await
:通常使用async
來(lái)創(chuàng)建一個(gè)協(xié)程,那么就必須使用await
來(lái)獲取協(xié)程結(jié)果赘淮,這是同步調(diào)用辕录。
但是借助多路復(fù)用select,我們可以使用onAwait
來(lái)自動(dòng)獲取結(jié)果梢卸,無(wú)需顯示調(diào)用await
踏拜,避免了阻塞協(xié)程操作,這是異步調(diào)用低剔。具體代碼如下所示:fun main() = runBlocking { val deferred = async { delay(200) "Hello World" } val result = select<String> { deferred.onAwait { result -> "onAwait receive async result: $result" } } log("result: $result") }
結(jié)果如下所示:
多路復(fù)用select 除了能將同步調(diào)用轉(zhuǎn)化為異步調(diào)用外,他還具備事件競(jìng)爭(zhēng)機(jī)制肮塞,即自動(dòng)獲取最先發(fā)生的事件襟齿。
比如,如果我們現(xiàn)在有兩個(gè)協(xié)程進(jìn)行網(wǎng)絡(luò)請(qǐng)求枕赵,我們希望獲取第一個(gè)最先返回的結(jié)果猜欺,那么就可以使用多路復(fù)用
await
,如下所示:@ExperimentalTime fun main() = runBlocking { val deferred1 = async { val time = DurationUnit.SECONDS.toMillis(Random.nextLong(5)) log("1 ---> delay: $time") delay(time) log("1 ---> done") "Hello World" } val deferred2 = async { val time = DurationUnit.SECONDS.toMillis(Random.nextLong(3)) log("2 ---> delay: $time") delay(time) log("2 ---> done") "Hi Select" } val result = select<String> { deferred1.onAwait { "1111 first --> result: $it" } deferred2.onAwait { "2222 first --> result: $it" } } log("result: $result") }
多次運(yùn)行上述代碼拷窜,得到的結(jié)果始終返回最快的協(xié)程結(jié)果开皿,如下所示:
可以看到涧黄,
select
返回值是其某一分支的返回值,且select
返回結(jié)果后赋荆,并不會(huì)取消其他仍在運(yùn)行的協(xié)程笋妥。 -
多路復(fù)用
Channel
:即多個(gè)Channel
發(fā)送事件,使用onReceive
可以選擇最先發(fā)送成功的數(shù)據(jù)窄潭。@ExperimentalTime fun randomDelay(until: Long = 5): Long { return DurationUnit.SECONDS.toMillis(Random.nextLong(until)) } @ExperimentalTime fun main() = runBlocking { val channel1 = produce<String> { while (true) { val time = randomDelay() log("channel1 --> delay: $time") delay(time) send("send by channel 1111") } } val channel2 = produce<String> { while (true) { val time = randomDelay() log("channel2 --> delay: $time") delay(time) send("send by channel 2222") } } repeat(5) { log("------- start select -------") val result = select<String> { channel1.onReceive { it } channel2.onReceiveOrNull { it ?: "Channel2 is closed" } } log("result: $result") } delay(2000) coroutineContext.cancelChildren() }
注:對(duì)于
onReceive
春宣,當(dāng)對(duì)應(yīng)的Channel
被關(guān)閉時(shí),select
會(huì)直接拋出異常嫉你。
對(duì)于onReceiveOrNull
月帝,當(dāng)對(duì)應(yīng)的Channel
被關(guān)閉時(shí),直接返回null
幽污。
多路復(fù)用select 除了支持上述的onAwait
和onReceive
事件外嚷辅,它還支持很多其他事件,比如onJoin
距误,onSend
...
事實(shí)上簸搞,所有能被select
支持的事件都是SelectClasueN
類(lèi)型,具體包含以下幾種:
SelectClasue0
:表示對(duì)應(yīng)事件沒(méi)有返回值深寥。比如join
沒(méi)有返回值攘乒,其對(duì)應(yīng)的SelectClasueN
類(lèi)型事件即為onJoin
SelectClasue1
:表示對(duì)應(yīng)事件有返回值。比如onAwait
惋鹅,onReceive
...-
SelectClasue2
:表示對(duì)應(yīng)事件有返回值则酝,且還帶有一個(gè)額外的參數(shù)。比如onSend
闰集,其第一參數(shù)表示要發(fā)送的值沽讹,第二個(gè)參數(shù)sendChannel
表示數(shù)據(jù)發(fā)送到的Channel
對(duì)象。如下所示:fun main() = runBlocking { val channel = Channel<String>() launch { val result = channel.receive() log("receive: $result") } select<Unit> { channel.onSend("1") { sendChannel -> log("send to Channel: $sendChannel") sendChannel.send(Random.nextInt().toString()) } } }
...
更多具體內(nèi)容武鲁,請(qǐng)參考:Select Expression
綜上爽雄,多路復(fù)用select 的主要作用就是可以同時(shí)等待多個(gè)協(xié)程,并選擇獲取最先可用協(xié)程的結(jié)果沐鼠。
一些注意點(diǎn)
-
對(duì)一個(gè)父協(xié)程進(jìn)行取消操作挚瘟,會(huì)自動(dòng)取消它作用域內(nèi)所有的協(xié)程:
suspend fun showStatus(type: String) { log("$type starts") try { delay(100) log("$type done") } finally { log("$type cancelled") } } fun main() = runBlocking { val scope = CoroutineScope(Dispatchers.Default) scope.launch { launch { showStatus("launch") } launch { async { showStatus("async") }.await() } launch { withContext(Dispatchers.IO) { showStatus("withContext") } } coroutineScope { showStatus("coroutineScope") } // ... } delay(80) log("父協(xié)程取消") // 取消父協(xié)程,導(dǎo)致其作用域內(nèi)所有協(xié)程取消 scope.cancel() }
當(dāng)調(diào)用最外層
scope.cancel()
時(shí)饲梭,其作用域內(nèi)所有子協(xié)程乘盖、子子協(xié)程...都會(huì)被取消。結(jié)果如下圖所示: -
創(chuàng)建一個(gè)協(xié)程時(shí)憔涉,可以為其上下文對(duì)象添加一個(gè)
Job
對(duì)象订框,對(duì)該Job
對(duì)象調(diào)用取消操作時(shí),與該Job
綁定的協(xié)程及其子協(xié)程都會(huì)被取消:fun main() = runBlocking { val job = Job() // 將協(xié)程與 Job 綁定 val scope = CoroutineScope(Dispatchers.Default + job) scope.launch { // ... } delay(80) // 取消 Job 綁定的協(xié)程兜叨,導(dǎo)致其作用域內(nèi)所有協(xié)程取消 job.cancel() }
上述代碼與第 1 點(diǎn)的代碼基本一致穿扳,只是將
CoroutineScope
與Job
綁定了起來(lái)衩侥,因此,調(diào)用Job.cancel()
就可以取消與其綁定的協(xié)程矛物,從而該協(xié)程作用域內(nèi)的所有協(xié)程也都會(huì)被取消茫死。 -
協(xié)程作用域內(nèi)應(yīng)當(dāng)使用
launch
、async
泽谨、coroutineScope
璧榄、supervisorScope
這些能繼承父協(xié)程上下文對(duì)象或者withContext
等具備結(jié)構(gòu)化并發(fā)特性的協(xié)程構(gòu)造器來(lái)創(chuàng)建子協(xié)程,這樣才能滿(mǎn)足上述 1吧雹,2 條目結(jié)果骨杂,不要使用GlobalScope.launch
,CoroutineScope.launch
等創(chuàng)建子協(xié)程雄卷,因?yàn)檫@些操作創(chuàng)建的協(xié)程運(yùn)行在其他作用域內(nèi)搓蚪,跳脫了當(dāng)前協(xié)程作用域,導(dǎo)致當(dāng)前協(xié)程無(wú)法對(duì)其進(jìn)行控制:fun main() = runBlocking { val scope = CoroutineScope(Dispatchers.Default) scope.launch { GlobalScope.launch { showStatus("GlobalScope") } CoroutineScope(Dispatchers.Default).launch { showStatus("CoroutineScope") } } delay(80) log("父協(xié)程取消") scope.cancel() // 避免程序退出 delay(1000) }
結(jié)果如下圖所示:
可以看到丁鹉,父協(xié)程取消妒潭,子協(xié)程仍在運(yùn)行,父協(xié)程不具備控制子協(xié)程能力揣钦。