在今年的三月份沟突,我因為需要為項目搭建一個新的網(wǎng)絡(luò)請求框架開始接觸 Kotlin 協(xié)程揩悄。那時我司項目中同時存在著兩種網(wǎng)絡(luò)請求方式,采用的技術(shù)棧各不相同斗这,Java动猬、Kotlin、RxJava表箭、LiveData 各種混搭赁咙,技術(shù)棧的不統(tǒng)一長遠(yuǎn)來看肯定是會造成很多不便的,所以就打算封裝一個新的網(wǎng)絡(luò)請求框架來作為項目的統(tǒng)一規(guī)范(前面的人估計也是這么想的免钻,所以就造成了同個項目中的網(wǎng)絡(luò)請求方式越來越多 ????)彼水,那么就需要考慮采用什么技術(shù)棧來實現(xiàn)了
采用 Kotlin 語言來實現(xiàn)必不可少,都這年頭了還用 Java 也說不過去极舔。Retrofit 也必不可少凤覆,而目前 Retrofit 也已經(jīng)支持 Kotlin 協(xié)程了,Google 官方推出的 Jetpack 協(xié)程擴展庫也越來越多拆魏,就最終決定棄用 RxJava 擁抱 Kotlin 協(xié)程盯桦,將協(xié)程作為技術(shù)棧之一
當(dāng)時我是通過翻譯協(xié)程官方文檔來作為入門手段,到現(xiàn)在也大半年了渤刃,回頭來看感覺官方文檔還是挺晦澀難懂的拥峦,就想著再來寫一兩篇入門或進階的文章來加深下理解,希望對你有所幫助
附上我當(dāng)時翻譯的協(xié)程官方文檔:
- Kotlin 協(xié)程官方文檔(1)-協(xié)程基礎(chǔ)(Coroutine Basics)
- Kotlin 協(xié)程官方文檔(2)-取消和超時(Cancellation and Timeouts)
- Kotlin 協(xié)程官方文檔(3)-組合掛起函數(shù)(Coroutine Context and Dispatchers)
- Kotlin 協(xié)程官方文檔(4)-協(xié)程上下文和調(diào)度器(Coroutine Context and Dispatchers)
- Kotlin 協(xié)程官方文檔(5)-異步流(Asynchronous Flow)
- Kotlin 協(xié)程官方文檔(6)-通道(Channels)
- Kotlin 協(xié)程官方文檔(7)-異常處理(Exception Handling)
- Kotlin 協(xié)程官方文檔(8)-共享可變狀態(tài)和并發(fā)性(Shared mutable state and concurrency)
- Kotlin 協(xié)程官方文檔(9)-選擇表達(dá)式(實驗階段)(Select Expression (experimental)
一卖子、Kotlin 協(xié)程
Kotlin 協(xié)程提供了一種全新處理并發(fā)的方式略号,你可以在 Android 平臺上使用它來簡化異步執(zhí)行的代碼。協(xié)程從 Kotlin 1.3 版本開始引入,但這一概念在編程世界誕生的黎明之際就有了玄柠,最早使用協(xié)程的編程語言可以追溯到 1967 年的 Simula 語言突梦。在過去幾年間,協(xié)程這個概念發(fā)展勢頭迅猛羽利,現(xiàn)已經(jīng)被諸多主流編程語言采用宫患,比如 Javascript、C#铐伴、Python撮奏、Ruby 以及 Go 等俏讹。Kotlin 協(xié)程是基于來自其他語言的既定概念
Google 官方推薦將 Kotlin 協(xié)程作為在 Android 上進行異步編程的解決方案当宴,值得關(guān)注的功能點包括:
- 輕量:可以在單個線程上運行多個協(xié)程,因為協(xié)程支持掛起泽疆,不會使正在運行協(xié)程的線程阻塞户矢。掛起比阻塞節(jié)省內(nèi)存,且支持多個并行操作
- 內(nèi)存泄露更少:使用結(jié)構(gòu)化并發(fā)機制在一個作用域內(nèi)執(zhí)行多個操作
- 內(nèi)置取消支持:取消功能會自動通過正在運行的協(xié)程層次結(jié)構(gòu)傳播
- Jetpack 集成:許多 Jetpack 庫都包含提供全面協(xié)程支持的擴展殉疼。某些庫還提供自己的協(xié)程作用域梯浪,可供你用于結(jié)構(gòu)化并發(fā)
如果是用于 Android 平臺的話,可以只引用以下的 coroutines-android瓢娜,當(dāng)中已經(jīng)包含了 coroutines-core
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.2'
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.5.2'
二挂洛、第一個協(xié)程
協(xié)程可以稱為 輕量級線程。Kotlin 協(xié)程在 CoroutineScope 的上下文中通過 launch眠砾、async 等 協(xié)程構(gòu)造器(CoroutineBuilder)來聲明并啟動
fun main() {
GlobalScope.launch(context = Dispatchers.IO) {
//延時一秒
delay(1000)
log("launch")
}
//主動休眠兩秒虏劲,防止 JVM 過快退出
Thread.sleep(2000)
log("end")
}
private fun log(msg: Any?) = println("[${Thread.currentThread().name}] $msg")
[DefaultDispatcher-worker-1] launch
[main] end
在上面的例子中,通過 GlobalScope(全局作用域)啟動了一個協(xié)程褒颈,在延遲一秒后輸出一行日志柒巫。從輸出結(jié)果可以看出來,啟動的協(xié)程是運行在協(xié)程內(nèi)部的線程池中谷丸。雖然從表現(xiàn)結(jié)果上來看堡掏,啟動一個協(xié)程類似于我們直接使用 Thread 來執(zhí)行耗時任務(wù),但實際上協(xié)程和線程有著本質(zhì)上的區(qū)別刨疼。通過使用協(xié)程泉唁,可以極大的提高線程的并發(fā)效率,避免以往的嵌套回調(diào)地獄揩慕,極大提高了代碼的可讀性
以上代碼就涉及到了協(xié)程的四個基礎(chǔ)概念:
- suspend function亭畜。即掛起函數(shù),delay() 就是協(xié)程庫提供的一個用于實現(xiàn)非阻塞式延時的掛起函數(shù)
- CoroutineScope漩绵。即協(xié)程作用域贱案,GlobalScope 是 CoroutineScope 的一個實現(xiàn)類,用于指定協(xié)程的作用范圍,可用于管理多個協(xié)程的生命周期宝踪,所有協(xié)程都需要通過 CoroutineScope 來啟動
- CoroutineContext侨糟。即協(xié)程上下文,包含多種類型的配置參數(shù)瘩燥。
Dispatchers.IO
就是 CoroutineContext 這個抽象概念的一種實現(xiàn)秕重,用于指定協(xié)程的運行載體,即用于指定協(xié)程要運行在哪類線程上 - CoroutineBuilder厉膀。即協(xié)程構(gòu)建器溶耘,協(xié)程在 CoroutineScope 的上下文中通過 launch、async 等協(xié)程構(gòu)建器來進行聲明并啟動服鹅。launch凳兵、async 均被聲明為 CoroutineScope 的擴展方法
三、suspend
如果上述例子試圖直接在 GlobalScope 外調(diào)用 delay()
函數(shù)的話企软,IDE 就會提示一個錯誤:Suspend function 'delay' should be called only from a coroutine or another suspend function庐扫。意思是:delay()
函數(shù)是一個掛起函數(shù),只能由協(xié)程或者由其它掛起函數(shù)來調(diào)用
delay()
函數(shù)就使用了 suspend 進行修飾仗哨,用 suspend 修飾的函數(shù)就是掛起函數(shù)
public suspend fun delay(timeMillis: Long)
讀者在網(wǎng)上看關(guān)于協(xié)程的文章的時候形庭,應(yīng)該經(jīng)常會看到這么一句話:掛起函數(shù)不會阻塞其所在線程,而是會將協(xié)程掛起厌漂,在特定的時候才再恢復(fù)執(zhí)行
對于這句話我的理解是:delay()
函數(shù)類似于 Java 中的 Thread.sleep()
萨醒,而之所以說 delay()
函數(shù)是非阻塞的,是因為它和單純的線程休眠有著本質(zhì)的區(qū)別苇倡。例如富纸,當(dāng)在 ThreadA 上運行的 CoroutineA 調(diào)用了delay(1000L)
函數(shù)指定延遲一秒后再運行,ThreadA 會轉(zhuǎn)而去執(zhí)行 CoroutineB雏节,等到一秒后再來繼續(xù)執(zhí)行 CoroutineA胜嗓。所以,ThreadA 并不會因為 CoroutineA 的延時而阻塞钩乍,而是能繼續(xù)去執(zhí)行其它任務(wù)辞州,所以掛起函數(shù)并不會阻塞其所在線程,這樣就極大地提高了線程的并發(fā)靈活度寥粹,最大化了線程的利用效率变过。而如果是使用Thread.sleep()
的話,線程就只能干等著而不能去執(zhí)行其它任務(wù)涝涤,降低了線程的利用效率
協(xié)程是運行于線程上的媚狰,一個線程可以運行多個(幾千上萬個)協(xié)程。線程的調(diào)度行為是由操作系統(tǒng)來管理的阔拳,而協(xié)程的調(diào)度行為是可以由開發(fā)者來指定并由編譯器來實現(xiàn)的崭孤,協(xié)程能夠細(xì)粒度地控制多個任務(wù)的執(zhí)行時機和執(zhí)行線程,當(dāng)線程所執(zhí)行的當(dāng)前協(xié)程被 suspend 后,該線程也可以騰出資源去處理其他任務(wù)
四辨宠、suspend 掛起與恢復(fù)
協(xié)程在常規(guī)函數(shù)的基礎(chǔ)上添加了兩項操作用于處理長時間運行的任務(wù)遗锣,在invoke
(或 call
)和return
之外,協(xié)程添加了suspend
和 resume
:
-
suspend
用于暫停執(zhí)行當(dāng)前協(xié)程嗤形,并保存所有局部變量 -
resume
用于讓已暫停的協(xié)程從暫停處繼續(xù)執(zhí)行
suspend 函數(shù)只能由其它 suspend 函數(shù)調(diào)用精偿,或者是由協(xié)程來調(diào)用
以下示例展示了一項任務(wù)(假設(shè) get 方法是一個網(wǎng)絡(luò)請求任務(wù))的簡單協(xié)程實現(xiàn):
suspend fun fetchDocs() { // Dispatchers.Main
val result = get("https://developer.android.com") // Dispatchers.IO for `get`
show(result) // Dispatchers.Main
}
suspend fun get(url: String) = withContext(Dispatchers.IO) { /* ... */ }
在上面的示例中,get()
仍在主線程上被調(diào)用赋兵,但它會在啟動網(wǎng)絡(luò)請求之前暫停協(xié)程笔咽。get()
主體內(nèi)通過調(diào)用 withContext(Dispatchers.IO)
創(chuàng)建了一個在 IO 線程池中運行的代碼塊,在該塊內(nèi)的任何代碼都始終通過 IO 調(diào)度器執(zhí)行霹期。當(dāng)網(wǎng)絡(luò)請求完成后叶组,get()
會恢復(fù)已暫停的協(xié)程,使得主線程協(xié)程可以直接拿到網(wǎng)絡(luò)請求結(jié)果而不用使用回調(diào)來通知主線程经伙。Retrofit 就是以這種方式來實現(xiàn)對協(xié)程的支持
Kotlin 使用 堆棧幀 來管理要運行哪個函數(shù)以及所有局部變量扶叉。暫停協(xié)程時,系統(tǒng)會復(fù)制并保存當(dāng)前的堆棧幀以供稍后使用帕膜。恢復(fù)時溢十,會將堆棧幀從其保存的位置復(fù)制回來垮刹,然后函數(shù)再次開始運行。雖然代碼可能看起來像普通的順序阻塞請求张弛,協(xié)程也能確保網(wǎng)絡(luò)請求不會阻塞主線程
在主線程進行的 暫停協(xié)程 和 恢復(fù)協(xié)程 的兩個操作荒典,既實現(xiàn)了將耗時任務(wù)交由后臺線程完成,保障了主線程安全吞鸭,又以同步代碼的方式完成了實際上的多線程異步調(diào)用寺董。可以說刻剥,在 Android 平臺上協(xié)程主要就用來解決兩個問題:
- 處理耗時任務(wù) (Long running tasks)遮咖,這種任務(wù)常常會阻塞主線程
- 保證主線程安全 (Main-safety),即確保安全地從主線程調(diào)用任何 suspend 函數(shù)
五造虏、CoroutineScope
CoroutineScope 即 協(xié)程作用域御吞,用于對協(xié)程進行追蹤看彼。如果我們啟動了多個協(xié)程但是沒有一個可以對其進行統(tǒng)一管理的途徑的話摧冀,就會導(dǎo)致我們的代碼臃腫雜亂晤锹,甚至發(fā)生內(nèi)存泄露或者任務(wù)泄露粘室。為了確保所有的協(xié)程都會被追蹤践剂,Kotlin 不允許在沒有 CoroutineScope 的情況下啟動協(xié)程船万。CoroutineScope 可被看作是一個具有超能力的 ExecutorService 的輕量級版本籽慢。它能啟動協(xié)程犯助,同時這個協(xié)程還具備上文所說的 suspend 和 resume 的優(yōu)勢
所有的協(xié)程都需要通過 CoroutineScope 來啟動,它會跟蹤通過 launch
或 async
創(chuàng)建的所有協(xié)程暑脆,你可以隨時調(diào)用 scope.cancel()
取消正在運行的協(xié)程交排。CoroutineScope 本身并不運行協(xié)程,它只是確保你不會失去對協(xié)程的追蹤饵筑,即使協(xié)程被掛起也是如此埃篓。在 Android 中,某些 ktx 庫為某些生命周期類提供了自己的 CoroutineScope根资,例如架专,ViewModel 有 viewModelScope
,Lifecycle 有 lifecycleScope
CoroutineScope 大體上可以分為三種:
- GlobalScope玄帕。即全局協(xié)程作用域部脚,在這個范圍內(nèi)啟動的協(xié)程可以一直運行直到應(yīng)用停止運行。GlobalScope 本身不會阻塞當(dāng)前線程裤纹,且啟動的協(xié)程相當(dāng)于守護線程委刘,不會阻止 JVM 結(jié)束運行
- runBlocking。一個頂層函數(shù)鹰椒,和 GlobalScope 不一樣锡移,它會阻塞當(dāng)前線程直到其內(nèi)部所有相同作用域的協(xié)程執(zhí)行結(jié)束
- 自定義 CoroutineScope∑峒剩可用于實現(xiàn)主動控制協(xié)程的生命周期范圍淆珊,對于 Android 開發(fā)來說最大意義之一就是可以在 Activity、Fragment奸汇、ViewModel 等具有生命周期的對象中按需取消所有協(xié)程任務(wù)施符,從而確保生命周期安全,避免內(nèi)存泄露
1擂找、GlobalScope
GlobalScope 屬于 全局作用域戳吝,這意味著通過 GlobalScope 啟動的協(xié)程的生命周期只受整個應(yīng)用程序的生命周期的限制,只要整個應(yīng)用程序還在運行且協(xié)程的任務(wù)還未結(jié)束贯涎,協(xié)程就可以一直運行
GlobalScope 不會阻塞其所在線程听哭,所以以下代碼中主線程的日志會早于 GlobalScope 內(nèi)部輸出日志。此外柬采,GlobalScope 啟動的協(xié)程相當(dāng)于守護線程欢唾,不會阻止 JVM 結(jié)束運行,所以如果將主線程的休眠時間改為三百毫秒的話粉捻,就不會看到 launch A 輸出日志
fun main() {
log("start")
GlobalScope.launch {
launch {
delay(400)
log("launch A")
}
launch {
delay(300)
log("launch B")
}
log("GlobalScope")
}
log("end")
Thread.sleep(500)
}
[main] start
[main] end
[DefaultDispatcher-worker-1] GlobalScope
[DefaultDispatcher-worker-3] launch B
[DefaultDispatcher-worker-3] launch A
GlobalScope.launch
會創(chuàng)建一個頂級協(xié)程礁遣,盡管它很輕量級,但在運行時還是會消耗一些內(nèi)存資源肩刃,且可以一直運行直到整個應(yīng)用程序停止(只要任務(wù)還未結(jié)束)祟霍,這可能會導(dǎo)致內(nèi)存泄露杏头,所以在日常開發(fā)中應(yīng)該謹(jǐn)慎使用 GlobalScope
2、runBlocking
也可以使用 runBlocking 這個頂層函數(shù)來啟動協(xié)程沸呐,runBlocking 函數(shù)的第二個參數(shù)即協(xié)程的執(zhí)行體醇王,該參數(shù)被聲明為 CoroutineScope 的擴展函數(shù),因此執(zhí)行體就包含了一個隱式的 CoroutineScope崭添,所以在 runBlocking 內(nèi)部可以來直接啟動協(xié)程
public fun <T> runBlocking(context: CoroutineContext =
EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T
runBlocking 的一個方便之處就是:只有當(dāng)內(nèi)部相同作用域的所有協(xié)程都運行結(jié)束后寓娩,聲明在 runBlocking 之后的代碼才能執(zhí)行,即 runBlocking 會阻塞其所在線程
看以下代碼呼渣。runBlocking 內(nèi)部啟動的兩個協(xié)程會各自做耗時操作棘伴,從輸出結(jié)果可以看出來兩個協(xié)程還是在交叉并發(fā)執(zhí)行,且 runBlocking 會等到兩個協(xié)程都執(zhí)行結(jié)束后才會退出屁置,外部的日志輸出結(jié)果有明確的先后順序焊夸。即 runBlocking 內(nèi)部啟動的協(xié)程是非阻塞式的,但 runBlocking 阻塞了其所在線程蓝角。此外阱穗,runBlocking 只會等待相同作用域的協(xié)程完成才會退出,而不會等待 GlobalScope 等其它作用域啟動的協(xié)程
fun main() {
log("start")
runBlocking {
launch {
repeat(3) {
delay(100)
log("launchA - $it")
}
}
launch {
repeat(3) {
delay(100)
log("launchB - $it")
}
}
GlobalScope.launch {
repeat(3) {
delay(120)
log("GlobalScope - $it")
}
}
}
log("end")
}
[main] start
[main] launchA - 0
[main] launchB - 0
[DefaultDispatcher-worker-1] GlobalScope - 0
[main] launchA - 1
[main] launchB - 1
[DefaultDispatcher-worker-1] GlobalScope - 1
[main] launchA - 2
[main] launchB - 2
[main] end
所以說使鹅,runBlocking 本身帶有阻塞線程的意味揪阶,但其內(nèi)部運行的協(xié)程又是非阻塞的,讀者需要明白這兩者的區(qū)別
基于是否會阻塞線程的區(qū)別并徘,以下代碼中 runBlocking 會早于 GlobalScope 輸出日志
fun main() {
GlobalScope.launch(Dispatchers.IO) {
delay(600)
log("GlobalScope")
}
runBlocking {
delay(500)
log("runBlocking")
}
//主動休眠兩百毫秒遣钳,使得和 runBlocking 加起來的延遲時間多于六百毫秒
Thread.sleep(200)
log("after sleep")
}
[main] runBlocking
[DefaultDispatcher-worker-1] GlobalScope
[main] after sleep
3、coroutineScope
coroutineScope
函數(shù)用于創(chuàng)建一個獨立的協(xié)程作用域麦乞,直到所有啟動的協(xié)程都完成后才結(jié)束自身。runBlocking
和 coroutineScope
看起來很像劝评,因為它們都需要等待其內(nèi)部所有相同作用域的協(xié)程結(jié)束后才會結(jié)束自己姐直。兩者的主要區(qū)別在于 runBlocking
方法會阻塞當(dāng)前線程,而 coroutineScope
不會蒋畜,而是會掛起并釋放底層線程以供其它協(xié)程使用声畏。基于這個差別姻成,runBlocking
是一個普通函數(shù)插龄,而 coroutineScope
是一個掛起函數(shù)
fun main() = runBlocking {
launch {
delay(100)
log("Task from runBlocking")
}
coroutineScope {
launch {
delay(500)
log("Task from nested launch")
}
delay(50)
log("Task from coroutine scope")
}
log("Coroutine scope is over")
}
[main] Task from coroutine scope
[main] Task from runBlocking
[main] Task from nested launch
[main] Coroutine scope is over
4、supervisorScope
supervisorScope
函數(shù)用于創(chuàng)建一個使用了 SupervisorJob 的 coroutineScope科展,該作用域的特點就是拋出的異常不會連鎖取消同級協(xié)程和父協(xié)程
fun main() = runBlocking {
launch {
delay(100)
log("Task from runBlocking")
}
supervisorScope {
launch {
delay(500)
log("Task throw Exception")
throw Exception("failed")
}
launch {
delay(600)
log("Task from nested launch")
}
}
log("Coroutine scope is over")
}
[main] Task from runBlocking
[main] Task throw Exception
[main] Task from nested launch
[main] Coroutine scope is over
5均牢、自定義 CoroutineScope
假設(shè)我們在 Activity 中先后啟動了多個協(xié)程用于執(zhí)行異步耗時操作,那么當(dāng) Activity 退出時才睹,必須取消所有協(xié)程以避免內(nèi)存泄漏徘跪。我們可以通過保留每一個 Job 引用然后在 onDestroy
方法里來手動取消甘邀,但這種方式相當(dāng)來說會比較繁瑣和低效。kotlinx.coroutines 提供了 CoroutineScope 來管理多個協(xié)程的生命周期
我們可以通過創(chuàng)建與 Activity 生命周期相關(guān)聯(lián)的協(xié)程作用域來管理協(xié)程的生命周期垮庐。CoroutineScope 的實例可以通過 CoroutineScope()
或 MainScope()
的工廠函數(shù)來構(gòu)建松邪。前者創(chuàng)建通用作用域,后者創(chuàng)建 UI 應(yīng)用程序的作用域并使用 Dispatchers.Main 作為默認(rèn)的調(diào)度器
class Activity {
private val mainScope = MainScope()
fun onCreate() {
mainScope.launch {
repeat(5) {
delay(1000L * it)
}
}
}
fun onDestroy() {
mainScope.cancel()
}
}
或者哨查,我們可以通過委托模式來讓 Activity 實現(xiàn) CoroutineScope 接口逗抑,從而可以在 Activity 內(nèi)直接啟動協(xié)程而不必顯示地指定它們的上下文,并且在 onDestroy()
中自動取消所有協(xié)程
class Activity : CoroutineScope by CoroutineScope(Dispatchers.Default) {
fun onCreate() {
launch {
repeat(5) {
delay(200L * it)
log(it)
}
}
log("Activity Created")
}
fun onDestroy() {
cancel()
log("Activity Destroyed")
}
}
fun main() = runBlocking {
val activity = Activity()
activity.onCreate()
delay(1000)
activity.onDestroy()
delay(1000)
}
從輸出結(jié)果可以看出寒亥,當(dāng)回調(diào)了onDestroy()
方法后協(xié)程就不會再輸出日志了
[main] Activity Created
[DefaultDispatcher-worker-1] 0
[DefaultDispatcher-worker-1] 1
[DefaultDispatcher-worker-1] 2
[main] Activity Destroyed
已取消的作用域無法再創(chuàng)建協(xié)程邮府。因此,僅當(dāng)控制其生命周期的類被銷毀時护盈,才應(yīng)調(diào)用 scope.cancel()
挟纱。例如,使用 viewModelScope
時腐宋, ViewModel 會在自身的 onCleared()
方法中自動取消作用域
六紊服、CoroutineBuilder
1、launch
看下 launch
函數(shù)的方法簽名胸竞。launch
是一個作用于 CoroutineScope 的擴展函數(shù)欺嗤,用于在不阻塞當(dāng)前線程的情況下啟動一個協(xié)程,并返回對該協(xié)程任務(wù)的引用卫枝,即 Job 對象
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job
launch
函數(shù)共包含三個參數(shù):
- context煎饼。用于指定協(xié)程的上下文
- start。用于指定協(xié)程的啟動方式校赤,默認(rèn)值為
CoroutineStart.DEFAULT
吆玖,即協(xié)程會在聲明的同時就立即進入等待調(diào)度的狀態(tài),即可以立即執(zhí)行的狀態(tài)马篮≌闯耍可以通過將其設(shè)置為CoroutineStart.LAZY
來實現(xiàn)延遲啟動,即懶加載 - block浑测。用于傳遞協(xié)程的執(zhí)行體翅阵,即希望交由協(xié)程執(zhí)行的任務(wù)
可以看到 launchA 和 launchB 是并行交叉執(zhí)行的
fun main() = runBlocking {
val launchA = launch {
repeat(3) {
delay(100)
log("launchA - $it")
}
}
val launchB = launch {
repeat(3) {
delay(100)
log("launchB - $it")
}
}
}
[main] launchA - 0
[main] launchB - 0
[main] launchA - 1
[main] launchB - 1
[main] launchA - 2
[main] launchB - 2
2、Job
Job 是協(xié)程的句柄迁央。使用 launch
或 async
創(chuàng)建的每個協(xié)程都會返回一個 Job 實例掷匠,該實例唯一標(biāo)識協(xié)程并管理其生命周期。Job 是一個接口類型岖圈,這里列舉 Job 幾個比較有用的屬性和函數(shù)
//當(dāng) Job 處于活動狀態(tài)時為 true
//如果 Job 未被取消或沒有失敗讹语,則均處于 active 狀態(tài)
public val isActive: Boolean
//當(dāng) Job 正常結(jié)束或者由于異常結(jié)束,均返回 true
public val isCompleted: Boolean
//當(dāng) Job 被主動取消或者由于異常結(jié)束幅狮,均返回 true
public val isCancelled: Boolean
//啟動 Job
//如果此調(diào)用的確啟動了 Job募强,則返回 true
//如果 Job 調(diào)用前就已處于 started 或者是 completed 狀態(tài)株灸,則返回 false
public fun start(): Boolean
//用于取消 Job,可同時通過傳入 Exception 來標(biāo)明取消原因
public fun cancel(cause: CancellationException? = null)
//阻塞等待直到此 Job 結(jié)束運行
public suspend fun join()
//當(dāng) Job 結(jié)束運行時(不管由于什么原因)回調(diào)此方法擎值,可用于接收可能存在的運行異常
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
Job 具有以下幾種狀態(tài)值慌烧,每種狀態(tài)對應(yīng)的屬性值各不相同
State | isActive | isCompleted | isCancelled |
---|---|---|---|
New (optional initial state) | false | false | false |
Active (default initial state) | true | false | false |
Completing (transient state) | true | false | false |
Cancelling (transient state) | false | false | true |
Cancelled (final state) | false | true | true |
Completed (final state) | false | true | false |
fun main() {
//將協(xié)程設(shè)置為延遲啟動
val job = GlobalScope.launch(start = CoroutineStart.LAZY) {
for (i in 0..100) {
//每循環(huán)一次均延遲一百毫秒
delay(100)
}
}
job.invokeOnCompletion {
log("invokeOnCompletion:$it")
}
log("1. job.isActive:${job.isActive}")
log("1. job.isCancelled:${job.isCancelled}")
log("1. job.isCompleted:${job.isCompleted}")
job.start()
log("2. job.isActive:${job.isActive}")
log("2. job.isCancelled:${job.isCancelled}")
log("2. job.isCompleted:${job.isCompleted}")
//休眠四百毫秒后再主動取消協(xié)程
Thread.sleep(400)
job.cancel(CancellationException("test"))
//休眠四百毫秒防止JVM過快停止導(dǎo)致 invokeOnCompletion 來不及回調(diào)
Thread.sleep(400)
log("3. job.isActive:${job.isActive}")
log("3. job.isCancelled:${job.isCancelled}")
log("3. job.isCompleted:${job.isCompleted}")
}
[main] 1. job.isActive:false
[main] 1. job.isCancelled:false
[main] 1. job.isCompleted:false
[main] 2. job.isActive:true
[main] 2. job.isCancelled:false
[main] 2. job.isCompleted:false
[DefaultDispatcher-worker-2] invokeOnCompletion:java.util.concurrent.CancellationException: test
[main] 3. job.isActive:false
[main] 3. job.isCancelled:true
[main] 3. job.isCompleted:true
3、async
看下 async
函數(shù)的方法簽名鸠儿。async
也是一個作用于 CoroutineScope 的擴展函數(shù)屹蚊,和 launch
的區(qū)別主要就在于:async
可以返回協(xié)程的執(zhí)行結(jié)果,而 launch
不行
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T>
通過await()
方法可以拿到 async 協(xié)程的執(zhí)行結(jié)果进每,可以看到兩個協(xié)程的總耗時是遠(yuǎn)少于七秒的汹粤,總耗時基本等于耗時最長的協(xié)程
fun main() {
val time = measureTimeMillis {
runBlocking {
val asyncA = async {
delay(3000)
1
}
val asyncB = async {
delay(4000)
2
}
log(asyncA.await() + asyncB.await())
}
}
log(time)
}
[main] 3
[main] 4070
由于 launch 和 async 僅能夠在 CouroutineScope 中使用,所以任何創(chuàng)建的協(xié)程都會被該 scope 追蹤田晚。Kotlin 禁止創(chuàng)建不能夠被追蹤的協(xié)程嘱兼,從而避免協(xié)程泄漏
4、async 錯誤用法
修改上述代碼贤徒,可以發(fā)現(xiàn)兩個協(xié)程的總耗時就會變?yōu)槠呙胱笥?/p>
fun main() {
val time = measureTimeMillis {
runBlocking {
val asyncA = async(start = CoroutineStart.LAZY) {
delay(3000)
1
}
val asyncB = async(start = CoroutineStart.LAZY) {
delay(4000)
2
}
log(asyncA.await() + asyncB.await())
}
}
log(time)
}
[main] 3
[main] 7077
會造成這不同區(qū)別是因為 CoroutineStart.LAZY
不會主動啟動協(xié)程芹壕,而是直到調(diào)用async.await()
或者async.satrt()
后才會啟動(即懶加載模式),所以asyncA.await() + asyncB.await()
會導(dǎo)致兩個協(xié)程其實是在順序執(zhí)行接奈。而默認(rèn)值 CoroutineStart.DEFAULT
參數(shù)會使得協(xié)程在聲明的同時就被啟動了(實際上還需要等待被調(diào)度執(zhí)行踢涌,但可以看做是立即就執(zhí)行了),所以此時調(diào)用第一個 async.await()
時兩個協(xié)程其實都是處于運行狀態(tài)序宦,所以總耗時就是四秒左右
此時可以通過先調(diào)用start()
再調(diào)用await()
來實現(xiàn)第一個例子的效果
asyncA.start()
asyncB.start()
log(asyncA.await() + asyncB.await())
5睁壁、async 并行分解
由 suspend
函數(shù)啟動的所有協(xié)程都必須在該函數(shù)返回結(jié)果時停止,因此你可能需要保證這些協(xié)程在返回結(jié)果之前完成互捌。借助 Kotlin 中的結(jié)構(gòu)化并發(fā)機制潘明,你可以定義用于啟動一個或多個協(xié)程的 coroutineScope
。然后秕噪,你可以使用 await()
(針對單個協(xié)程)或 awaitAll()
(針對多個協(xié)程)保證這些協(xié)程在從函數(shù)返回結(jié)果之前完成
假設(shè)我們定義一個用于異步獲取兩個文檔的 coroutineScope
钉疫,通過對每個延遲引用調(diào)用 await()
,我們可以保證這兩項 async
操作在返回值之前完成:
suspend fun fetchTwoDocs() = coroutineScope {
val deferredOne = async { fetchDoc(1) }
val deferredTwo = async { fetchDoc(2) }
deferredOne.await()
deferredTwo.await()
}
還可以對集合使用 awaitAll()
來達(dá)到相同效果巢价。雖然 fetchTwoDocs()
使用 async
啟動新協(xié)程,但該函數(shù)使用 awaitAll()
等待啟動的協(xié)程完成后才會返回結(jié)果固阁。不過壤躲,即使我們沒有調(diào)用 awaitAll()
,coroutineScope
構(gòu)建器也會等到所有內(nèi)部協(xié)程都完成后才會恢復(fù)名為 fetchTwoDocs
的協(xié)程备燃。此外碉克,coroutineScope
會捕獲協(xié)程拋出的所有異常,并將其傳送給調(diào)用方
suspend fun fetchTwoDocs() = coroutineScope {
val deferreds = listOf(
async { fetchDoc(1) },
async { fetchDoc(2) }
)
deferreds.awaitAll()
}
6并齐、Deferred
async
函數(shù)的返回值是一個 Deferred 對象漏麦。Deferred 是一個接口類型客税,繼承于 Job 接口,所以 Job 包含的屬性和方法 Deferred 都有撕贞,其主要是在 Job 的基礎(chǔ)上擴展了 await()
方法
七更耻、CoroutineContext
CoroutineContext 使用以下元素集定義協(xié)程的行為:
- Job:控制協(xié)程的生命周期
- CoroutineDispatcher:將任務(wù)指派給適當(dāng)?shù)木€程
- CoroutineName:協(xié)程的名稱,可用于調(diào)試
- CoroutineExceptionHandler:處理未捕獲的異常
1捏膨、Job
協(xié)程中的 Job 是其上下文 CoroutineContext 中的一部分秧均,可以通過 coroutineContext[Job]
表達(dá)式從上下文中獲取到,我們可以通過控制 Job 來控制 CoroutineScope 的生命周期
val job = Job()
val scope = CoroutineScope(job + Dispatchers.IO)
fun main(): Unit = runBlocking {
log("job is $job")
val job = scope.launch {
try {
delay(3000)
} catch (e: CancellationException) {
log("job is cancelled")
throw e
}
log("end")
}
delay(1000)
log("scope job is ${scope.coroutineContext[Job]}")
scope.coroutineContext[Job]?.cancel()
}
[main] job is JobImpl{Active}@759ebb3d
[main] scope job is JobImpl{Active}@759ebb3d
[DefaultDispatcher-worker-1] job is cancelled
實際上 CoroutineScope 的 isActive
這個擴展屬性只是 coroutineContext[Job]?.isActive ?: true
的一種簡便寫法
public val CoroutineScope.isActive: Boolean
get() = coroutineContext[Job]?.isActive ?: true
2号涯、CoroutineDispatcher
CoroutineContext 包含一個 CoroutineDispatcher(協(xié)程調(diào)度器)用于指定執(zhí)行協(xié)程的目標(biāo)載體目胡,即 運行于哪個線程。CoroutineDispatcher 可以將協(xié)程的執(zhí)行操作限制在特定線程上链快,也可以將其分派到線程池中誉己,或者讓它無限制地運行。所有的協(xié)程構(gòu)造器(如 launch 和 async)都接受一個可選參數(shù)域蜗,即 CoroutineContext 巨双,該參數(shù)可用于顯式指定要創(chuàng)建的協(xié)程和其它上下文元素所要使用的 CoroutineDispatcher
要在主線程之外運行代碼,可以指定 Kotlin 協(xié)程在 Default 或 IO 調(diào)度程序上執(zhí)行工作地消。在 Kotlin 中炉峰,所有協(xié)程都必須在 CoroutineDispatcher 中運行,即使它們在主線程上運行也是如此脉执。協(xié)程可以自行暫停疼阔,而 CoroutineDispatcher 負(fù)責(zé)將其恢復(fù)
Kotlin 協(xié)程庫提供了四個 Dispatcher 用于指定在哪一類線程中執(zhí)行協(xié)程:
- Dispatchers.Default。默認(rèn)調(diào)度器半夷,適合用于執(zhí)行占用大量 CPU 資源的任務(wù)婆廊。例如:對列表排序和解析 JSON
- Dispatchers.IO。適合用于執(zhí)行磁盤或網(wǎng)絡(luò) I/O 的任務(wù)巫橄。例如:使用 Room 組件淘邻、讀寫磁盤文件总滩,執(zhí)行網(wǎng)絡(luò)請求
- Dispatchers.Unconfined读整。對執(zhí)行協(xié)程的線程不做限制,可以直接在當(dāng)前調(diào)度器所在線程上執(zhí)行
- Dispatchers.Main炊林。使用此調(diào)度程序可用于在 Android 主線程上運行協(xié)程彩倚,只能用于與界面交互和執(zhí)行快速工作筹我,例如:更新 UI、調(diào)用
LiveData.setValue
fun main() = runBlocking<Unit> {
launch {
log("main runBlocking")
}
launch(Dispatchers.Default) {
log("Default")
launch(Dispatchers.Unconfined) {
log("Unconfined 1")
}
}
launch(Dispatchers.IO) {
log("IO")
launch(Dispatchers.Unconfined) {
log("Unconfined 2")
}
}
launch(newSingleThreadContext("MyOwnThread")) {
log("newSingleThreadContext")
launch(Dispatchers.Unconfined) {
log("Unconfined 4")
}
}
launch(Dispatchers.Unconfined) {
log("Unconfined 3")
}
GlobalScope.launch {
log("GlobalScope")
}
}
[DefaultDispatcher-worker-2] Default
[DefaultDispatcher-worker-1] IO
[DefaultDispatcher-worker-2] Unconfined 1
[DefaultDispatcher-worker-1] Unconfined 2
[MyOwnThread] newSingleThreadContext
[main] Unconfined 3
[MyOwnThread] Unconfined 4
[DefaultDispatcher-worker-1] GlobalScope
[main] main runBlocking
-
launch
在不執(zhí)行 Dispatchers 的情況下使用時帆离,它從外部的協(xié)程作用域繼承上下文和調(diào)度器蔬蕊,即和 runBlocking 保持一致,均在 main 線程執(zhí)行 - IO 和 Default 均依靠后臺線程池來執(zhí)行
- Unconfined 則不限定具體的線程類型哥谷,當(dāng)前調(diào)度器在哪個線程岸夯,就在該線程上進行執(zhí)行麻献,因此上述例子中每個 Unconfined 協(xié)程所在線程均不一樣
- GlobalScope 啟動協(xié)程時默認(rèn)使用的調(diào)度器是 Dispatchers.Default,因此也是在后臺線程池中執(zhí)行
-
newSingleThreadContext
用于為協(xié)程專門創(chuàng)建一個新的線程猜扮,專用線程是一種成本非常昂貴的資源勉吻,在實際開發(fā)時必須當(dāng)不再需要時釋放掉線程資源,或者存儲在頂級變量中以便在整個應(yīng)用程序中進行復(fù)用
3破镰、withContext
對于以下代碼餐曼,get
方法內(nèi)使用withContext(Dispatchers.IO)
創(chuàng)建了一個指定在 IO 線程池中運行的代碼塊,該區(qū)間內(nèi)的任何代碼都始終通過 IO 線程來執(zhí)行鲜漩。由于 withContext
方法本身就是一個掛起函數(shù)源譬,因此 get
方法也必須定義為掛起函數(shù)
suspend fun fetchDocs() { // Dispatchers.Main
val result = get("developer.android.com") // Dispatchers.Main
show(result) // Dispatchers.Main
}
suspend fun get(url: String) = // Dispatchers.Main
withContext(Dispatchers.IO) { // Dispatchers.IO (main-safety block)
/* perform network IO here */ // Dispatchers.IO (main-safety block)
} // Dispatchers.Main
}
借助協(xié)程,你可以細(xì)粒度地來調(diào)度線程孕似。由于withContext()
支持在不引入回調(diào)的情況下控制任何代碼的執(zhí)行線程池踩娘,因此你可以將其應(yīng)用于非常小的函數(shù),例如從數(shù)據(jù)庫中讀取數(shù)據(jù)或執(zhí)行網(wǎng)絡(luò)請求喉祭。一種不錯的做法是使用 withContext()
來確保每個函數(shù)都是主線程安全的养渴,這意味著,你可以從主線程調(diào)用每個函數(shù)泛烙。這樣理卑,調(diào)用方就從不需要考慮應(yīng)該使用哪個線程來執(zhí)行函數(shù)了
在前面的示例中,fetchDocs()
方法在主線程上執(zhí)行蔽氨,不過它可以安全地調(diào)用 get
方法藐唠,因為get
方法已確保網(wǎng)絡(luò)請求會在子線程中執(zhí)行。由于協(xié)程支持 suspend
和 resume
操作鹉究,因此 withContext
塊完成后宇立,主線程上的協(xié)程會立即根據(jù) get
結(jié)果恢復(fù)
與基于回調(diào)的等效實現(xiàn)相比,withContext()
不會增加額外的開銷自赔。此外妈嘹,在某些情況下,還可以優(yōu)化 withContext()
調(diào)用绍妨,使其超越基于回調(diào)的等效實現(xiàn)润脸。例如,如果某個函數(shù)需要先后調(diào)用十次網(wǎng)絡(luò)請求他去,你可以在最外層調(diào)用 withContext()
讓協(xié)程只切換一次線程津函,這樣即使每個網(wǎng)絡(luò)請求內(nèi)部均會使用 withContext()
,它也會留在同一調(diào)度程序上孤页,從而避免頻率切換線程。此外涩馆,協(xié)程還優(yōu)化了 Dispatchers.Default
與 Dispatchers.IO
之間的切換行施,以盡可能避免線程切換
使用線程池的調(diào)度器(例如
Dispatchers.IO
或Dispatchers.Default
)不能保證代碼塊一直在同一線程上從上到下執(zhí)行允坚,在某些情況下,協(xié)程在suspend
和resume
后可能會將任務(wù)移交給另一個線程來執(zhí)行蛾号。這意味著稠项,對于整個withContext()
塊,由于多線程并發(fā)之間的原子性和可見性等原因鲜结,先后讀取到的線程局部變量可能并非是同個值
4展运、CoroutineName
CoroutineName 用于為協(xié)程指定一個名字,方便調(diào)試和定位問題
fun main() = runBlocking<Unit>(CoroutineName("RunBlocking")) {
log("start")
launch(CoroutineName("MainCoroutine")) {
launch(CoroutineName("Coroutine#A")) {
delay(400)
log("launch A")
}
launch(CoroutineName("Coroutine#B")) {
delay(300)
log("launch B")
}
}
}
5精刷、CoroutineExceptionHandler
在下文的異常處理會講到
6拗胜、組合上下文元素
有時我們需要為協(xié)程上下文定義多個元素,此時就可以用 +
運算符怒允。例如埂软,我們可以同時為協(xié)程指定 Dispatcher 和 CoroutineName
fun main() = runBlocking<Unit> {
launch(Dispatchers.Default + CoroutineName("test")) {
log("Hello World")
}
}
而由于 CoroutineContext 是由一組元素組成的,所以加號右側(cè)的元素會覆蓋加號左側(cè)的元素纫事,從而組成新的 CoroutineContext勘畔。比如,(Dispatchers.Main, "name") + (Dispatchers.IO)
的運行結(jié)果是:(Dispatchers.IO, "name")
八丽惶、取消協(xié)程
如果用戶退出啟動了協(xié)程的 Activity / Fragment炫七,那正常情況下就應(yīng)該取消所有協(xié)程
job.cancel()
就用于取消協(xié)程,job.join()
用于阻塞等待協(xié)程運行結(jié)束钾唬。因為 cancel()
函數(shù)調(diào)用后會馬上返回而不是等待協(xié)程結(jié)束后再返回万哪,所以此時協(xié)程不一定就是已經(jīng)停止運行了。如果需要確保協(xié)程結(jié)束運行后再執(zhí)行后續(xù)代碼知纷,就需要調(diào)用 join()
方法來阻塞等待壤圃。也可以通過調(diào)用 Job 的擴展函數(shù) cancelAndJoin()
來完成相同操作,它結(jié)合了 cancel
和 join
兩個操作
fun main() = runBlocking {
val job = launch {
repeat(1000) { i ->
log("job: I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L)
log("main: I'm tired of waiting!")
job.cancel()
job.join()
log("main: Now I can quit.")
}
[main] job: I'm sleeping 0 ...
[main] job: I'm sleeping 1 ...
[main] job: I'm sleeping 2 ...
[main] main: I'm tired of waiting!
[main] main: Now I can quit.
1琅轧、協(xié)程可能無法取消
并不是所有協(xié)程都可以響應(yīng)取消操作伍绳,協(xié)程的取消操作是需要協(xié)作 (cooperative) 完成的,協(xié)程必須協(xié)作才能被取消乍桂。協(xié)程庫中的所有掛起函數(shù)都是可取消的冲杀,它們在運行前檢查協(xié)程是否被取消了,并在取消時拋出 CancellationException 從而結(jié)束整個任務(wù)睹酌。而如果協(xié)程在執(zhí)行計算任務(wù)前沒有判斷自身是否已被取消的話权谁,此時就無法取消協(xié)程
所以即使以下代碼主動取消了協(xié)程,協(xié)程也只會在完成既定循環(huán)后才結(jié)束運行憋沿,因為協(xié)程沒有在每次循環(huán)前先進行檢查旺芽,導(dǎo)致任務(wù)不受取消操作的影響
fun main() = runBlocking {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (i < 5) {
if (System.currentTimeMillis() >= nextPrintTime) {
log("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L)
log("main: I'm tired of waiting!")
job.cancelAndJoin()
log("main: Now I can quit.")
}
[DefaultDispatcher-worker-1] job: I'm sleeping 0 ...
[DefaultDispatcher-worker-1] job: I'm sleeping 1 ...
[DefaultDispatcher-worker-1] job: I'm sleeping 2 ...
[main] main: I'm tired of waiting!
[DefaultDispatcher-worker-1] job: I'm sleeping 3 ...
[DefaultDispatcher-worker-1] job: I'm sleeping 4 ...
[main] main: Now I can quit.
為了實現(xiàn)取消協(xié)程的目的,就需要為上述代碼加上判斷協(xié)程是否還處于可運行狀態(tài)的邏輯,當(dāng)不可運行時就主動退出協(xié)程采章。isActive
是 CoroutineScope 的擴展屬性运嗜,就用于判斷協(xié)程是否還處于可運行狀態(tài)
fun main() = runBlocking {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (i < 5) {
if (isActive) {
if (System.currentTimeMillis() >= nextPrintTime) {
log("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
} else {
return@launch
}
}
}
delay(1300L)
log("main: I'm tired of waiting!")
job.cancelAndJoin()
log("main: Now I can quit.")
}
取消協(xié)程這個操作類似于在 Java 中調(diào)用Thread.interrupt()
方法來向線程發(fā)起中斷請求,這兩個操作都不會強制停止協(xié)程和線程悯舟,外部只是相當(dāng)于發(fā)起一個停止運行的請求担租,需要依靠協(xié)程和線程響應(yīng)請求后主動停止運行
Java 和 Kotlin 之所以均沒有提供一個可以直接強制停止線程或協(xié)程的方法,是因為這個操作可能會帶來各種意想不到的情況抵怎。例如奋救,在停止線程或協(xié)程的時候,它們可能還持有著某些排他性資源(例如:鎖反惕,數(shù)據(jù)庫鏈接)尝艘,如果強制性地停止,它們持有的鎖就會一直無法得到釋放承璃,導(dǎo)致其它線程或協(xié)程一直無法得到目標(biāo)資源利耍,最終就可能導(dǎo)致線程死鎖。所以Thread.stop()
方法目前也是處于廢棄狀態(tài)盔粹,Java 官方并沒有提供一個可靠的停止線程的方法
2隘梨、用 finally 釋放資源
可取消的掛起函數(shù)在取消時會拋出 CancellationException,可以依靠try {...} finally {...}
或者 Kotlin 的 use
函數(shù)在取消協(xié)程后釋放持有的資源
fun main() = runBlocking {
val job = launch {
try {
repeat(1000) { i ->
log("job: I'm sleeping $i ...")
delay(500L)
}
} catch (e: Throwable) {
log(e.message)
} finally {
log("job: I'm running finally")
}
}
delay(1300L)
log("main: I'm tired of waiting!")
job.cancelAndJoin()
log("main: Now I can quit.")
}
[main] job: I'm sleeping 0 ...
[main] job: I'm sleeping 1 ...
[main] job: I'm sleeping 2 ...
[main] main: I'm tired of waiting!
[main] StandaloneCoroutine was cancelled
[main] job: I'm running finally
[main] main: Now I can quit.
3舷嗡、NonCancellable
如果在上一個例子中的 finally
塊中再調(diào)用掛起函數(shù)的話轴猎,將會導(dǎo)致拋出 CancellationException,因為此時協(xié)程已經(jīng)被取消了进萄。通常我們并不會遇到這種情況捻脖,因為常見的資源釋放操作都是非阻塞的,且不涉及任何掛起函數(shù)中鼠。但在極少數(shù)情況下我們需要在取消的協(xié)程中再調(diào)用掛起函數(shù)可婶,此時可以使用 withContext
函數(shù)和 NonCancellable
上下文將相應(yīng)的代碼包裝在 withContext(NonCancellable) {...}
代碼塊中,NonCancellable 就用于創(chuàng)建一個無法取消的協(xié)程作用域
fun main() = runBlocking {
log("start")
val launchA = launch {
try {
repeat(5) {
delay(50)
log("launchA-$it")
}
} finally {
delay(50)
log("launchA isCompleted")
}
}
val launchB = launch {
try {
repeat(5) {
delay(50)
log("launchB-$it")
}
} finally {
withContext(NonCancellable) {
delay(50)
log("launchB isCompleted")
}
}
}
//延時一百毫秒援雇,保證兩個協(xié)程都已經(jīng)被啟動了
delay(200)
launchA.cancel()
launchB.cancel()
log("end")
}
[main] start
[main] launchA-0
[main] launchB-0
[main] launchA-1
[main] launchB-1
[main] launchA-2
[main] launchB-2
[main] end
[main] launchB isCompleted
4矛渴、父協(xié)程和子協(xié)程
當(dāng)一個協(xié)程在另外一個協(xié)程的協(xié)程作用域中啟動時,它將通過 CoroutineScope.coroutineContext
繼承其上下文惫搏,新啟動的協(xié)程就被稱為子協(xié)程具温,子協(xié)程的 Job 將成為父協(xié)程 Job 的子 Job。父協(xié)程總是會等待其所有子協(xié)程都完成后才結(jié)束自身筐赔,所以父協(xié)程不必顯式跟蹤它啟動的所有子協(xié)程铣猩,也不必使用 Job.join
在末尾等待子協(xié)程完成
所以雖然 parentJob 啟動的三個子協(xié)程的延時時間各不相同,但它們最終都會打印出日志
fun main() = runBlocking {
val parentJob = launch {
repeat(3) { i ->
launch {
delay((i + 1) * 200L)
log("Coroutine $i is done")
}
}
log("request: I'm done and I don't explicitly join my children that are still active")
}
}
[main] request: I'm done and I don't explicitly join my children that are still active
[main] Coroutine 0 is done
[main] Coroutine 1 is done
[main] Coroutine 2 is done
5茴丰、傳播取消操作
一般情況下达皿,協(xié)程的取消操作會通過協(xié)程的層次結(jié)構(gòu)來進行傳播:如果取消父協(xié)程或者父協(xié)程拋出異常天吓,那么子協(xié)程都會被取消;而如果子協(xié)程被取消鳞绕,則不會影響同級協(xié)程和父協(xié)程失仁,但如果子協(xié)程拋出異常則也會導(dǎo)致同級協(xié)程和父協(xié)程被取消
對于以下代碼,子協(xié)程 jon1 被取消并不影響子協(xié)程 jon2 和父協(xié)程繼續(xù)運行们何,但父協(xié)程被取消后子協(xié)程都會被遞歸取消
fun main() = runBlocking {
val request = launch {
val job1 = launch {
repeat(10) {
delay(300)
log("job1: $it")
if (it == 2) {
log("job1 canceled")
cancel()
}
}
}
val job2 = launch {
repeat(10) {
delay(300)
log("job2: $it")
}
}
}
delay(1600)
log("parent job canceled")
request.cancel()
delay(1000)
}
[main] job1: 0
[main] job2: 0
[main] job1: 1
[main] job2: 1
[main] job1: 2
[main] job1 canceled
[main] job2: 2
[main] job2: 3
[main] job2: 4
[main] parent job canceled
6、withTimeout
withTimeout
函數(shù)用于指定協(xié)程的運行超時時間控轿,如果超時則會拋出 TimeoutCancellationException冤竹,從而令協(xié)程結(jié)束運行
fun main() = runBlocking {
log("start")
val result = withTimeout(300) {
repeat(5) {
delay(100)
}
200
}
log(result)
log("end")
}
[main] start
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 300 ms
at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:186)
at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:156)
at kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask.run(EventLoop.common.kt:497)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
at kotlinx.coroutines.DefaultExecutor.run(DefaultExecutor.kt:69)
at java.lang.Thread.run(Thread.java:748)
withTimeout
方法拋出的 TimeoutCancellationException 是 CancellationException 的子類,之前我們并未在輸出日志上看到關(guān)于 CancellationException 這類異常的堆棧信息茬射,這是因為對于一個已取消的協(xié)程來說鹦蠕,CancellationException 被認(rèn)為是觸發(fā)協(xié)程結(jié)束的正常原因。但對于withTimeout
方法來說在抛,拋出異常是其上報超時情況的一種手段钟病,所以該異常不會被協(xié)程內(nèi)部消化掉
如果不希望因為異常導(dǎo)致協(xié)程結(jié)束,可以改用withTimeoutOrNull
方法刚梭,如果超時就會返回 null
九肠阱、異常處理
當(dāng)一個協(xié)程由于異常而運行失敗時,它會傳播這個異常并傳遞給它的父協(xié)程朴读。接下來屹徘,父協(xié)程會進行下面幾步操作:
- 取消它自己的子級
- 取消它自己
- 將異常傳播并傳遞給它的父級
異常會到達(dá)層級的根部,且當(dāng)前 CoroutineScope 所啟動的所有協(xié)程都會被取消衅金,但協(xié)程并非都是一發(fā)現(xiàn)異常就執(zhí)行以上流程噪伊,launch 和 async 在處理異常方面有著一些差異
launch 將異常視為未捕獲異常,類似于 Java 的 Thread.uncaughtExceptionHandler
氮唯,當(dāng)發(fā)現(xiàn)異常時就會馬上拋出鉴吹。async 期望最終通過調(diào)用 await 來獲取結(jié)果 (或者異常),所以默認(rèn)情況下它不會拋出異常惩琉,這意味著如果使用 async 啟動新的協(xié)程豆励,它會靜默地將異常丟棄,直到調(diào)用 async.await()
才會得到目標(biāo)值或者拋出存在的異常
例如琳水,以下的 fetchDocs()
方法由于并沒有調(diào)用 Deferred.await()
肆糕,因此異常并不會被拋給調(diào)用方,而如果使用的是 launch 而非 async 的話在孝,異常就會馬上被拋出
private val ioScope = CoroutineScope(Dispatchers.IO)
private fun fetchDocs() {
ioScope.async {
delay(500)
log("taskA throw AssertionError")
throw AssertionError()
}
}
1诚啃、CoroutineExceptionHandler
如果想主動捕獲異常信息,可以使用 CoroutineExceptionHandler 作為協(xié)程的上下文元素之一私沮,在這里進行自定義日志記錄或異常處理始赎,它類似于對線程使用 Thread.uncaughtExceptionHandler
。但是,CoroutineExceptionHandler 只會在預(yù)計不會由用戶處理的異常上調(diào)用造垛,因此在 async 中使用它沒有任何效果魔招,當(dāng) async 內(nèi)部發(fā)生了異常且沒有捕獲時,那么調(diào)用 async.await()
依然會導(dǎo)致應(yīng)用崩潰
以下代碼只會捕獲到 launch 拋出的異常
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
log("Caught $exception")
}
val job = GlobalScope.launch(handler) {
throw AssertionError()
}
val deferred = GlobalScope.async(handler) {
throw ArithmeticException()
}
joinAll(job, deferred)
}
[DefaultDispatcher-worker-2] Caught java.lang.AssertionError
2五辽、SupervisorJob
由于異常導(dǎo)致的取消在協(xié)程中是一種雙向關(guān)系办斑,會在整個協(xié)程層次結(jié)構(gòu)中傳播,那如果我們需要的是單向取消該怎么實現(xiàn)呢杆逗?
例如乡翅,假設(shè)在 Activity 中啟動了多個協(xié)程,如果單個協(xié)程所代表的子任務(wù)失敗了罪郊,此時并不一定需要連鎖終止整個 Activity 內(nèi)部的所有其它協(xié)程任務(wù)蠕蚜,即此時希望子協(xié)程的異常不會傳播給同級協(xié)程和父協(xié)程。而當(dāng) Activity 退出后悔橄,父協(xié)程的異常(即 CancellationException)又應(yīng)該連鎖傳播給所有子協(xié)程靶累,終止所有子協(xié)程
可以使用 SupervisorJob 來實現(xiàn)上述效果,取消操作只會向下傳播癣疟,一個子協(xié)程的運行失敗不會影響到同級協(xié)程和父協(xié)程
例如挣柬,以下示例中 firstChild 拋出的異常不會導(dǎo)致 secondChild 被取消,但當(dāng) supervisor 被取消時 secondChild 也被同時取消了
fun main() = runBlocking {
val supervisor = SupervisorJob()
with(CoroutineScope(coroutineContext + supervisor)) {
val firstChild = launch(CoroutineExceptionHandler { _, _ -> }) {
log("First child is failing")
throw AssertionError("First child is cancelled")
}
val secondChild = launch {
firstChild.join()
log("First child is cancelled: ${firstChild.isCancelled}, but second one is still active")
try {
delay(Long.MAX_VALUE)
} finally {
log("Second child is cancelled because supervisor is cancelled")
}
}
firstChild.join()
log("Cancelling supervisor")
//取消所有協(xié)程
supervisor.cancel()
secondChild.join()
}
}
[main] First child is failing
[main] First child is cancelled: true, but second one is still active
[main] Cancelling supervisor
[main] Second child is cancelled because supervisor is cancelled
但是争舞,如果異常沒有被處理且 CoroutineContext 沒有包含一個 CoroutineExceptionHandler 的話凛忿,異常會到達(dá)默認(rèn)線程的 ExceptionHandler。在 JVM 中竞川,異常會被打印在控制臺店溢;而在 Android 中,無論異常在那個 Dispatcher 中發(fā)生委乌,都會直接導(dǎo)致應(yīng)用崩潰床牧。所以如果上述例子中移除了 firstChild 包含的 CoroutineExceptionHandler 的話,就會導(dǎo)致 Android 應(yīng)用崩潰
十遭贸、Android ktx
Android ktx 是包含在 Android Jetpack 及其他 Android 庫中的一組 Kotlin 擴展程序戈咳。ktx 擴展程序可以為 Jetpack、Android 平臺及其他 API 提供簡潔的慣用 Kotlin 代碼壕吹,這些擴展程序利用了多種 Kotlin 語言功能著蛙,其中就包括了對 Kotlin 協(xié)程的支持
1、Lifecycle ktx
Lifecycle ktx 為每個 Lifecycle 對象(Activity耳贬、Fragment踏堡、Process 等)定義了一個 LifecycleScope,該作用域具有生命周期安全的保障咒劲,在此范圍內(nèi)啟動的協(xié)程會在 Lifecycle 被銷毀時同時取消顷蟆,可以使用 lifecycle.coroutineScope
或 lifecycleOwner.lifecycleScope
屬性來拿到該 CoroutineScope
引入依賴
dependencies {
implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.4.0"
}
使用示例
class MyActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
lifecycleScope.launch {
//Do Something
}
lifecycle.coroutineScope.launch {
//Do Something
}
}
}
lifecycleScope
和 lifecycle.coroutineScope
兩者是等價的诫隅,lifecycleScope
只是 ktx 庫提供的一種簡便寫法。從源碼也可以看到帐偎,lifecycleScope 是存儲在抽象類 Lifecycle 的 mInternalScopeRef
字段中逐纬,且使用的是 SupervisorJob 和 Dispatchers.Main.immediate
,因此我們不必?fù)?dān)心任意子協(xié)程的異常情況會影響到全局的協(xié)程任務(wù)削樊,且其默認(rèn)就是在主線程運行協(xié)程
public val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope
get() = lifecycle.coroutineScope
public val Lifecycle.coroutineScope: LifecycleCoroutineScope
get() {
while (true) {
val existing = mInternalScopeRef.get() as LifecycleCoroutineScopeImpl?
if (existing != null) {
return existing
}
val newScope = LifecycleCoroutineScopeImpl(
this,
SupervisorJob() + Dispatchers.Main.immediate
)
if (mInternalScopeRef.compareAndSet(null, newScope)) {
newScope.register()
return newScope
}
}
}
2豁生、ViewModel ktx
ViewModel ktx 庫提供了一個 viewModelScope,用于在 ViewModel 中啟動協(xié)程漫贞,該作用域的生命周期和 ViewModel 相等沛硅,當(dāng) ViewModel 回調(diào)了 onCleared()
方法時會自動取消該作用域
引入依賴
dependencies {
implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:2.4.0"
}
例如,以下 fetchDocs()
方法內(nèi)就依靠 viewModelScope
啟動了一個協(xié)程绕辖,用于在后臺線程發(fā)起網(wǎng)絡(luò)請求
class MyViewModel : ViewModel() {
fun fetchDocs() {
viewModelScope.launch {
val result = get("https://developer.android.com")
show(result)
}
}
suspend fun get(url: String) = withContext(Dispatchers.IO) { /* ... */ }
}
從源碼可以看到其大體實現(xiàn)思路和lifecycleScope
類似,存儲在 ViewModel 類的 mBagOfTags
這個 Map 中擂红,且使用的也是 SupervisorJob 和 Dispatchers.Main.immediate
public val ViewModel.viewModelScope: CoroutineScope
get() {
val scope: CoroutineScope? = this.getTag(JOB_KEY)
if (scope != null) {
return scope
}
return setTagIfAbsent(
JOB_KEY,
CloseableCoroutineScope(SupervisorJob() + Dispatchers.Main.immediate)
)
}
internal class CloseableCoroutineScope(context: CoroutineContext) : Closeable, CoroutineScope {
override val coroutineContext: CoroutineContext = context
override fun close() {
coroutineContext.cancel()
}
}
3仪际、LiveData ktx
在某些情況下,我們需要先完成特定的異步計算任務(wù)昵骤,根據(jù)計算結(jié)果來向 LiveData 回調(diào)值树碱,此時就可以使用 LiveData ktx 提供的 liveData
構(gòu)建器函數(shù)來執(zhí)行 suspend 函數(shù)所代表的異步計算任務(wù)(耗時任務(wù)),并將結(jié)果賦值給 LiveData
引入依賴
dependencies {
implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.4.0"
}
在以下示例中变秦,loadUser()
是在其它地方聲明的 suspend 函數(shù)欺旧,你可以使用 liveData
構(gòu)建器函數(shù)異步調(diào)用 loadUser()
羽历,然后使用 emit()
來發(fā)出結(jié)果:
val user: LiveData<User> = liveData {
val data = database.loadUser()
emit(data)
}
從源碼可以看到,我們所傳入的 suspend 任務(wù)體 block 最終是會被 CoroutineLiveData 包裝為一個 BlockRunner 對象,而 CoroutineLiveData 會在自身開始有 Observer 監(jiān)聽時執(zhí)行 blockRunner痴柔,并在所有 Observer 均被移除時自動 Cancel 掉 blockRunner
public fun <T> liveData(
context: CoroutineContext = EmptyCoroutineContext,
timeoutInMs: Long = DEFAULT_TIMEOUT,
@BuilderInference block: suspend LiveDataScope<T>.() -> Unit
): LiveData<T> = CoroutineLiveData(context, timeoutInMs, block)
internal class CoroutineLiveData<T>(
context: CoroutineContext = EmptyCoroutineContext,
timeoutInMs: Long = DEFAULT_TIMEOUT,
block: Block<T>
) : MediatorLiveData<T>() {
private var blockRunner: BlockRunner<T>?
private var emittedSource: EmittedSource? = null
init {
val supervisorJob = SupervisorJob(context[Job])
val scope = CoroutineScope(Dispatchers.Main.immediate + context + supervisorJob)
blockRunner = BlockRunner(
liveData = this,
block = block,
timeoutInMs = timeoutInMs,
scope = scope
) {
blockRunner = null
}
}
override fun onActive() {
super.onActive()
blockRunner?.maybeRun()
}
override fun onInactive() {
super.onInactive()
blockRunner?.cancel()
}
}
internal class BlockRunner<T>(
private val liveData: CoroutineLiveData<T>,
private val block: Block<T>,
private val timeoutInMs: Long,
private val scope: CoroutineScope,
private val onDone: () -> Unit
) {
// currently running block job.
private var runningJob: Job? = null
// cancelation job created in cancel.
private var cancellationJob: Job? = null
@MainThread
fun maybeRun() {
cancellationJob?.cancel()
cancellationJob = null
if (runningJob != null) {
return
}
runningJob = scope.launch {
val liveDataScope = LiveDataScopeImpl(liveData, coroutineContext)
block(liveDataScope)
onDone()
}
}
@MainThread
fun cancel() {
if (cancellationJob != null) {
error("Cancel call cannot happen without a maybeRun")
}
cancellationJob = scope.launch(Dispatchers.Main.immediate) {
delay(timeoutInMs)
if (!liveData.hasActiveObservers()) {
runningJob?.cancel()
runningJob = null
}
}
}
}
十一、參考資料
本文參考了以下文章中的很多資料郑趁,在此表示感謝