Kotlin Coroutines 協(xié)程其實(shí)挺簡單
本文收錄于: https://github.com/mengdd/KotlinTutorials
Coroutines概念
Coroutines(協(xié)程), 計算機(jī)程序組件, 通過允許任務(wù)掛起和恢復(fù)執(zhí)行, 來支持非搶占式的多任務(wù). (見Wiki).
協(xié)程主要是為了異步, 非阻塞的代碼. 這個概念并不是Kotlin特有的, Go, Python等多個語言中都有支持.
Kotlin Coroutines
Kotlin中用協(xié)程來做異步和非阻塞任務(wù), 主要優(yōu)點(diǎn)是代碼可讀性好, 不用回調(diào)函數(shù). (用協(xié)程寫的異步代碼乍一看很像同步代碼.)
Kotlin對協(xié)程的支持是在語言級別的, 在標(biāo)準(zhǔn)庫中只提供了最低程度的APIs, 然后把很多功能都代理到庫中.
Kotlin中只加了suspend
作為關(guān)鍵字.
async
和await
不是Kotlin的關(guān)鍵字, 也不是標(biāo)準(zhǔn)庫的一部分.
比起futures和promises, kotlin中suspending function
的概念為異步操作提供了一種更安全和不易出錯的抽象.
kotlinx.coroutines
是協(xié)程的庫, 為了使用它的核心功能, 項(xiàng)目需要增加kotlinx-coroutines-core
的依賴.
Coroutines Basics: 協(xié)程到底是什么?
先上一段官方的demo:
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
fun main() {
GlobalScope.launch { // launch a new coroutine in background and continue
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World!") // print after delay
}
println("Hello,") // main thread continues while coroutine is delayed
Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}
這段代碼的輸出:
先打印Hello, 延遲1s之后, 打印World.
對這段代碼的解釋:
launch
開始了一個計算, 這個計算是可掛起的(suspendable), 它在計算過程中, 釋放了底層的線程, 當(dāng)協(xié)程執(zhí)行完成, 就會恢復(fù)(resume).
這種可掛起的計算就叫做一個協(xié)程(coroutine). 所以我們可以簡單地說launch
開始了一個新的協(xié)程.
注意, 主線程需要等待協(xié)程結(jié)束, 如果注釋掉最后一行的Thread.sleep(2000L)
, 則只打印Hello, 沒有World.
協(xié)程和線程的關(guān)系
coroutine(協(xié)程)可以理解為輕量級的線程. 多個協(xié)程可以并行運(yùn)行, 互相等待, 互相通信. 協(xié)程和線程的最大區(qū)別就是協(xié)程非常輕量(cheap), 我們可以創(chuàng)建成千上萬個協(xié)程而不必考慮性能.
協(xié)程是運(yùn)行在線程上可以被掛起的運(yùn)算. 可以被掛起, 意味著運(yùn)算可以被暫停, 從線程移除, 存儲在內(nèi)存里. 此時, 線程就可以自由做其他事情. 當(dāng)計算準(zhǔn)備好繼續(xù)進(jìn)行時, 它會返回線程(但不一定要是同一個線程).
默認(rèn)情況下, 協(xié)程運(yùn)行在一個共享的線程池里, 線程還是存在的, 只是一個線程可以運(yùn)行多個協(xié)程, 所以線程沒必要太多.
調(diào)試
在上面的代碼中加上線程的名字:
fun main() {
GlobalScope.launch {
// launch a new coroutine in background and continue
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World! + ${Thread.currentThread().name}") // print after delay
}
println("Hello, + ${Thread.currentThread().name}") // main thread continues while coroutine is delayed
Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}
可以在IDE的Edit Configurations中設(shè)置VM options: -Dkotlinx.coroutines.debug
, 運(yùn)行程序, 會在log中打印出代碼運(yùn)行的協(xié)程信息:
Hello, + main
World! + DefaultDispatcher-worker-1 @coroutine#1
suspend function
上面例子中的delay
方法是一個suspend function
.
delay()
和Thread.sleep()
的區(qū)別是: delay()
方法可以在不阻塞線程的情況下延遲協(xié)程. (It doesn't block a thread, but only suspends the coroutine itself). 而Thread.sleep()
則阻塞了當(dāng)前線程.
所以, suspend的意思就是協(xié)程作用域被掛起了, 但是當(dāng)前線程中協(xié)程作用域之外的代碼不被阻塞.
如果把GlobalScope.launch
替換為thread
, delay方法下面會出現(xiàn)紅線報錯:
Suspend functions are only allowed to be called from a coroutine or another suspend function
suspend方法只能在協(xié)程或者另一個suspend方法中被調(diào)用.
在協(xié)程等待的過程中, 線程會返回線程池, 當(dāng)協(xié)程等待結(jié)束, 協(xié)程會在線程池中一個空閑的線程上恢復(fù). (The thread is returned to the pool while the coroutine is waiting, and when the waiting is done, the coroutine resumes on a free thread in the pool.)
啟動協(xié)程
啟動一個新的協(xié)程, 常用的主要有以下幾種方式:
launch
async
runBlocking
它們被稱為coroutine builders
. 不同的庫可以定義其他更多的構(gòu)建方式.
runBlocking: 連接blocking和non-blocking的世界
runBlocking
用來連接阻塞和非阻塞的世界.
runBlocking
可以建立一個阻塞當(dāng)前線程的協(xié)程. 所以它主要被用來在main函數(shù)中或者測試中使用, 作為連接函數(shù).
比如前面的例子可以改寫成:
fun main() = runBlocking<Unit> {
// start main coroutine
GlobalScope.launch {
// launch a new coroutine in background and continue
delay(1000L)
println("World! + ${Thread.currentThread().name}")
}
println("Hello, + ${Thread.currentThread().name}") // main coroutine continues here immediately
delay(2000L) // delaying for 2 seconds to keep JVM alive
}
最后不再使用Thread.sleep()
, 使用delay()
就可以了.
程序輸出:
Hello, + main @coroutine#1
World! + DefaultDispatcher-worker-1 @coroutine#2
launch: 返回Job
上面的例子delay了一段時間來等待一個協(xié)程結(jié)束, 不是一個好的方法.
launch
返回Job
, 代表一個協(xié)程, 我們可以用Job
的join()
方法來顯式地等待這個協(xié)程結(jié)束:
fun main() = runBlocking {
val job = GlobalScope.launch {
// launch a new coroutine and keep a reference to its Job
delay(1000L)
println("World! + ${Thread.currentThread().name}")
}
println("Hello, + ${Thread.currentThread().name}")
job.join() // wait until child coroutine completes
}
輸出結(jié)果和上面是一樣的.
Job
還有一個重要的用途是cancel()
, 用于取消不再需要的協(xié)程任務(wù).
async: 從協(xié)程返回值
async
開啟協(xié)程, 返回Deferred<T>
, Deferred<T>
是Job
的子類, 有一個await()
函數(shù), 可以返回協(xié)程的結(jié)果.
await()
也是suspend函數(shù), 只能在協(xié)程之內(nèi)調(diào)用.
fun main() = runBlocking {
// @coroutine#1
println(Thread.currentThread().name)
val deferred: Deferred<Int> = async {
// @coroutine#2
loadData()
}
println("waiting..." + Thread.currentThread().name)
println(deferred.await()) // suspend @coroutine#1
}
suspend fun loadData(): Int {
println("loading..." + Thread.currentThread().name)
delay(1000L) // suspend @coroutine#2
println("loaded!" + Thread.currentThread().name)
return 42
}
運(yùn)行結(jié)果:
main @coroutine#1
waiting...main @coroutine#1
loading...main @coroutine#2
loaded!main @coroutine#2
42
Context, Dispatcher和Scope
看一下launch
方法的聲明:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
...
}
其中有幾個相關(guān)概念我們要了解一下.
協(xié)程總是在一個context下運(yùn)行, 類型是接口CoroutineContext
. 協(xié)程的context是一個索引集合, 其中包含各種元素, 重要元素就有Job
和dispatcher. Job
代表了這個協(xié)程, 那么dispatcher是做什么的呢?
構(gòu)建協(xié)程的coroutine builder: launch
, async
, 都是CoroutineScope
類型的擴(kuò)展方法. 查看CoroutineScope
接口, 其中含有CoroutineContext
的引用. scope是什么? 有什么作用呢?
下面我們就來回答這些問題.
Dispatchers和線程
Context中的CoroutineDispatcher
可以指定協(xié)程運(yùn)行在什么線程上. 可以是一個指定的線程, 線程池, 或者不限.
看一個例子:
fun main() = runBlocking<Unit> {
launch {
// context of the parent, main runBlocking coroutine
println("main runBlocking : I'm working in thread ${Thread.currentThread().name}")
}
launch(Dispatchers.Unconfined) {
// not confined -- will work with main thread
println("Unconfined : I'm working in thread ${Thread.currentThread().name}")
}
launch(Dispatchers.Default) {
// will get dispatched to DefaultDispatcher
println("Default : I'm working in thread ${Thread.currentThread().name}")
}
launch(newSingleThreadContext("MyOwnThread")) {
// will get its own new thread
println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
}
}
運(yùn)行后打印出:
Unconfined : I'm working in thread main
Default : I'm working in thread DefaultDispatcher-worker-1
newSingleThreadContext: I'm working in thread MyOwnThread
main runBlocking : I'm working in thread main
API提供了幾種選項(xiàng):
-
Dispatchers.Default
代表使用JVM上的共享線程池, 其大小由CPU核數(shù)決定, 不過即便是單核也有兩個線程. 通常用來做CPU密集型工作, 比如排序或復(fù)雜計算等. -
Dispatchers.Main
指定主線程, 用來做UI更新相關(guān)的事情. (需要添加依賴, 比如kotlinx-coroutines-android
.) 如果我們在主線程上啟動一個新的協(xié)程時, 主線程忙碌, 這個協(xié)程也會被掛起, 僅當(dāng)線程有空時會被恢復(fù)執(zhí)行. -
Dispatchers.IO
: 采用on-demand創(chuàng)建的線程池, 用于網(wǎng)絡(luò)或者是讀寫文件的工作. -
Dispatchers.Unconfined
: 不指定特定線程, 這是一個特殊的dispatcher.
如果不明確指定dispatcher, 協(xié)程將會繼承它被啟動的那個scope的context(其中包含了dispatcher).
在實(shí)踐中, 更推薦使用外部scope的dispatcher, 由調(diào)用方?jīng)Q定上下文. 這樣也方便測試.
newSingleThreadContext
創(chuàng)建了一個線程來跑協(xié)程, 一個專注的線程算是一種昂貴的資源, 在實(shí)際的應(yīng)用中需要被釋放或者存儲復(fù)用.
切換線程還可以用withContext
, 可以在指定的協(xié)程context下運(yùn)行代碼, 掛起直到它結(jié)束, 返回結(jié)果.
另一種方式是新啟一個協(xié)程, 然后用join
明確地掛起等待.
在Android這種UI應(yīng)用中, 比較常見的做法是, 頂部協(xié)程用CoroutineDispatchers.Main
, 當(dāng)需要在別的線程上做一些事情的時候, 再明確指定一個不同的dispatcher.
Scope是什么?
當(dāng)launch
, async
或runBlocking
開啟新協(xié)程的時候, 它們自動創(chuàng)建相應(yīng)的scope. 所有的這些方法都有一個帶receiver的lambda參數(shù), 默認(rèn)的receiver類型是CoroutineScope
.
IDE會提示this: CoroutineScope
:
launch { /* this: CoroutineScope */
}
當(dāng)我們在runBlocking
, launch
, 或async
的大括號里面再創(chuàng)建一個新的協(xié)程的時候, 自動就在這個scope里創(chuàng)建:
fun main() = runBlocking {
/* this: CoroutineScope */
launch { /* ... */ }
// the same as:
this.launch { /* ... */ }
}
因?yàn)?code>launch是一個擴(kuò)展方法, 所以上面例子中默認(rèn)的receiver是this
.
這個例子中launch
所啟動的協(xié)程被稱作外部協(xié)程(runBlocking
啟動的協(xié)程)的child. 這種"parent-child"的關(guān)系通過scope傳遞: child在parent的scope中啟動.
協(xié)程的父子關(guān)系:
- 當(dāng)一個協(xié)程在另一個協(xié)程的scope中被啟動時, 自動繼承其context, 并且新協(xié)程的Job會作為父協(xié)程Job的child.
所以, 關(guān)于scope目前有兩個關(guān)鍵知識點(diǎn):
- 我們開啟一個協(xié)程的時候, 總是在一個
CoroutineScope
里. - Scope用來管理不同協(xié)程之間的父子關(guān)系和結(jié)構(gòu).
協(xié)程的父子關(guān)系有以下兩個特性:
- 父協(xié)程被取消時, 所有的子協(xié)程都被取消.
- 父協(xié)程永遠(yuǎn)會等待所有的子協(xié)程結(jié)束.
值得注意的是, 也可以不啟動協(xié)程就創(chuàng)建一個新的scope. 創(chuàng)建scope可以用工廠方法: MainScope()
或CoroutineScope()
.
coroutineScope()
方法也可以創(chuàng)建scope. 當(dāng)我們需要以結(jié)構(gòu)化的方式在suspend函數(shù)內(nèi)部啟動新的協(xié)程, 我們創(chuàng)建的新的scope, 自動成為suspend函數(shù)被調(diào)用的外部scope的child.
所以上面的父子關(guān)系, 可以進(jìn)一步抽象到, 沒有parent協(xié)程, 由scope來管理其中所有的子協(xié)程.
(注意: 實(shí)際上scope會提供默認(rèn)job, cancel
操作是由scope中的job支持的.)
Scope在實(shí)際應(yīng)用中解決什么問題呢? 如果我們的應(yīng)用中, 有一個對象是有自己的生命周期的, 但是這個對象又不是協(xié)程, 比如Android應(yīng)用中的Activity, 其中啟動了一些協(xié)程來做異步操作, 更新數(shù)據(jù)等, 當(dāng)Activity被銷毀的時候需要取消所有的協(xié)程, 來避免內(nèi)存泄漏. 我們就可以利用CoroutineScope
來做這件事: 創(chuàng)建一個CoroutineScope
對象和activity的生命周期綁定, 或者讓activity實(shí)現(xiàn)CoroutineScope
接口.
所以, scope的主要作用就是記錄所有的協(xié)程, 并且可以取消它們.
A CoroutineScope keeps track of all your coroutines, and it can cancel all of the coroutines started in it.
Structured Concurrency
這種利用scope將協(xié)程結(jié)構(gòu)化組織起來的機(jī)制, 被稱為"structured concurrency".
好處是:
- scope自動負(fù)責(zé)子協(xié)程, 子協(xié)程的生命和scope綁定.
- scope可以自動取消所有的子協(xié)程.
- scope自動等待所有的子協(xié)程結(jié)束. 如果scope和一個parent協(xié)程綁定, 父協(xié)程會等待這個scope中所有的子協(xié)程完成.
通過這種結(jié)構(gòu)化的并發(fā)模式: 我們可以在創(chuàng)建top級別的協(xié)程時, 指定主要的context一次, 所有嵌套的協(xié)程會自動繼承這個context, 只在有需要的時候進(jìn)行修改即可.
GlobalScope: daemon
GlobalScope
啟動的協(xié)程都是獨(dú)立的, 它們的生命只受到application的限制. 即GlobalScope
啟動的協(xié)程沒有parent, 和它被啟動時所在的外部的scope沒有關(guān)系.
launch(Dispatchers.Default) { ... }
和GlobalScope.launch { ... }
用的dispatcher是一樣的.
GlobalScope
啟動的協(xié)程并不會保持進(jìn)程活躍. 它們就像daemon threads(守護(hù)線程)一樣, 如果JVM發(fā)現(xiàn)沒有其他一般的線程, 就會關(guān)閉.
Key takeaways
- Coroutine協(xié)程機(jī)制: suspend, resume, 簡化回調(diào)代碼.
- suspend方法.
- 啟動協(xié)程的幾種方法.
- Dispatcher指定線程.
- Structured Concurrency: 依靠scope來架構(gòu)化管理協(xié)程.
參考
- Coroutine Wiki
- 官方文檔 Overview頁
- 官方文檔 Coroutines Guide
- Asynchronous Programming Techniques
- Your first coroutine with Kotlin
- Introduction to Coroutines and Channels
- Github: Kotlin/kotlinx.coroutines
- Github: Coroutines Guide
- Github: KEEP: Kotlin Coroutines
第三方博客:
- Coroutines on Android (part I): Getting the background
- Async Operations with Kotlin Coroutines — Part 1
- Kotlin Coroutines Tutorial for Android
- Coroutine Context and Scope
歡迎關(guān)注公眾號: 圣騎士Wind