Coroutine協(xié)程是kotlin實現(xiàn)的一種異步執(zhí)行邏輯的方式晌纫,相對與傳統(tǒng)的線程,協(xié)程更加簡潔永丝,高效缸匪,占用資源少。那協(xié)程到底是怎么實現(xiàn)異步的呢?
線程
在現(xiàn)在的操作系統(tǒng)中苫纤,線程是CPU調(diào)度的最少單元液斜。所有的程序邏輯運行在線程之上。在Java API中榨咐, Thread是實現(xiàn)線程的基本類。它的內(nèi)部實現(xiàn)是大量的 JNI 調(diào)用,因為線程的實現(xiàn)必須由操作系統(tǒng)直接提供支持蛇耀。在 Android 平臺上,Thread 的創(chuàng)建過程中坎弯,會調(diào)用 Linux API 中的 pthread_create 函數(shù)纺涤,這直接說明了 Java 層中的 Thread 和 Linux 系統(tǒng)級別的中的線程是一一對應的译暂。
線程的問題是阻塞與運行兩種狀態(tài)之間的切換有相當大的資源開銷,線程并不是一種輕量級資源撩炊,大量創(chuàng)建線程是對系統(tǒng)資源的一種消耗外永,而線程的阻塞調(diào)用會導致系統(tǒng)中存在大量因阻塞而不運行的線程,這對系統(tǒng)資源是一種極大的浪費拧咳。
協(xié)程
協(xié)程本質(zhì)上可以認為是運行在線程上的代碼塊伯顶,協(xié)程提供的 掛起 操作會使協(xié)程暫停執(zhí)行,而不會導致線程阻塞骆膝。而且協(xié)程是一種輕量級資源祭衩,一個應用中即使創(chuàng)建了上千個協(xié)程也不會造成太大的負擔。
協(xié)程的是通過’suspend‘修飾符來修飾需要掛起的方法阅签。suspend并不是Java的API掐暮,是kotlin通過編譯器實現(xiàn)的。
CPS 變換
被 suspend 修飾符修飾的函數(shù)在編譯期間會被編譯器做特殊處理政钟,這個特殊處理的第一步就是做CPS 變換路克。
CPS (Continuation-passing style)變換是一種編程風格,就是將控制流顯式表示為continuation的一種編程風格. 簡單來理解就是顯式使用函數(shù)表示函數(shù)返回的后續(xù)操作锥涕。
例如:
suspend fun <T> foo<T>.await(): T
在編譯期發(fā)生 CPS 變換之后:
fun <T> foo<T>.await(continuation: Continuation<T>): Any?
CPS 變換后的函數(shù)多了一個 Continuation<T> 類型的參數(shù)衷戈,Continuation就是續(xù)體。源碼:
interface Continuation<in T> {
val context: CoroutineContext
fun resumeWith(result: Result<T>)
}
Continuation是一個抽象的概念层坠,簡單來說它包裝了協(xié)程在掛起之后應該繼續(xù)執(zhí)行的代碼殖妇;在編譯的過程中,一個完整的協(xié)程被分割切塊成多個續(xù)體破花。在 await 函數(shù)的掛起結(jié)束以后谦趣,它會調(diào)用 continuation 參數(shù)的 resumeWith 函數(shù),來恢復執(zhí)行 await 函數(shù)后面的代碼座每。
方法經(jīng)過CPS 變換之后前鹅,返回值類型變成了 Any?,這是因為這個函數(shù)在發(fā)生變換后峭梳,除了要返回它本身的返回值舰绘,還要返回一個標記——COROUTINE_SUSPENDED,而這個返回類型事實上是返回類型 T 與 COROUTINE_SUSPENDED 的聯(lián)合類型葱椭。由于Kotlin 中沒有聯(lián)合類型捂寿,所以只好用最泛化的類型 Any? 來表示,而 COROUTINE_SUSPENDED 是一個標記孵运,返回它的掛起函數(shù)表示這個掛起函數(shù)會發(fā)生事實上的掛起操作秦陋。
狀態(tài)機
Continuation為了直接支持掛起(即使協(xié)程在掛起點中斷執(zhí)行而在適當?shù)臅r機在恢復)操作,編譯器在編譯掛起函數(shù)時會將函數(shù)體編譯為狀態(tài)機治笨。主要是為了性能考慮驳概,避免多創(chuàng)建類和對象赤嚼。
如:
val a = a()
val y = foo(a).await() // #1
b()
val z = bar(a, y).await() // #2
c(z)
編譯之后生成的偽代碼:
class <anonymous_for_state_machine> extends SuspendLambda<...> {
// 狀態(tài)機當前狀態(tài)
int label = 0
// 協(xié)程的局部變量
A a = null
Y y = null
void resumeWith(Object result) {
if (label == 0) goto L0
if (label == 1) goto L1
if (label == 2) goto L2
else throw IllegalStateException()
L0:
// 這次調(diào)用,result 應該為空
a = a()
label = 1
result = foo(a).await(this) // 'this' 作為續(xù)體傳遞
if (result == COROUTINE_SUSPENDED) return // 如果 await 掛起了執(zhí)行則返回
L1:
// 外部代碼傳入 .await() 的結(jié)果恢復協(xié)程
y = (Y) result
b()
label = 2
result = bar(a, y).await(this) // 'this' 作為續(xù)體傳遞
if (result == COROUTINE_SUSPENDED) return // 如果 await 掛起了執(zhí)行則返回
L2:
// 外部代碼傳入 .await() 的結(jié)果恢復協(xié)程
Z z = (Z) result
c(z)
label = -1 // 沒有其他步驟了
return
}
}
一個掛起函數(shù)會被編譯成一個匿名類顺又,匿名類中的一個函數(shù)實現(xiàn)了這個狀態(tài)機更卒。成員變量 label 代表了當前狀態(tài)機的狀態(tài),每一個續(xù)體(即掛起點中間的部分以及掛起點與函數(shù)頭尾之間的部分)都各自對應了一個狀態(tài)待榔,當函數(shù)運行到每個掛起點時逞壁,label 的值都受限會發(fā)生改變,并且當前的續(xù)體(也就是代碼中的this)都會作為實參傳遞給發(fā)生了 CPS 變換的掛起函數(shù)锐锣,如果這個掛起函數(shù)沒有發(fā)生事實上的掛起腌闯,函數(shù)繼續(xù)運行,如果發(fā)生了事實上的掛起雕憔,則函數(shù)直接 return姿骏。
由于 label 記錄了狀態(tài),所以在協(xié)程恢復的時候斤彼,可以根據(jù)狀態(tài)使用 goto 語句直接跳轉(zhuǎn)至上次的掛起點并向后執(zhí)行分瘦,這就是協(xié)程掛起的原理。另外琉苇,雖然 Java 中沒有 goto 語句嘲玫,但是 class 字節(jié)碼中支持 goto。
續(xù)體攔截器
掛起函數(shù)在恢復的時候并扇,理論上可能會在任何一個線程上恢復去团,有時我們需要限定協(xié)程運行在指定的線程,例如在Android中穷蛹,更新 UI 的操作只能在 UI 主線程中進行土陪。
android MainDispatcherLoader的實現(xiàn):
// Main 調(diào)度器
@JvmStatic
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
// dispatcher 由 loadMainDispatcher() 函數(shù)創(chuàng)建
internal object MainDispatcherLoader {
@JvmField
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
private fun loadMainDispatcher(): MainCoroutineDispatcher {
......
}
}
// MainCoroutineDispatcher
public abstract class MainCoroutineDispatcher : CoroutineDispatcher() {
@ExperimentalCoroutinesApi
public abstract val immediate: MainCoroutineDispatcher
}
// CoroutineDispatcher
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
......
}
@InternalCoroutinesApi
public fun MainDispatcherFactory.tryCreateDispatcher(factories: List<MainDispatcherFactory>): MainCoroutineDispatcher =
try {
createDispatcher(factories)
} catch (cause: Throwable) {
MissingMainCoroutineDispatcher(cause, hintOnError())
}
/**
* @suppress
*/
@InternalCoroutinesApi
public object MissingMainCoroutineDispatcherFactory : MainDispatcherFactory {
override val loadPriority: Int
get() = -1
override fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher {
return MissingMainCoroutineDispatcher(null)
}
}
// ContinuationInterceptor(續(xù)體攔截器)
public interface ContinuationInterceptor : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
public fun releaseInterceptedContinuation(continuation: Continuation<*>) {
/* do nothing by default */
}
// Performance optimization for a singleton Key
public override operator fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? =
@Suppress("UNCHECKED_CAST")
if (key === Key) this as E else null
// Performance optimization to a singleton Key
public override fun minusKey(key: CoroutineContext.Key<*>): CoroutineContext =
if (key === Key) EmptyCoroutineContext else this
}
ContinuationInterceptor,負責攔截協(xié)程在恢復后應執(zhí)行的代碼(即續(xù)體)并將其在指定線程或線程池恢復
在掛起函數(shù)的編譯中肴熏,每個掛起函數(shù)都會被編譯為一個實現(xiàn)了 Continuation 接口的匿名類鬼雀,而續(xù)體攔截器會攔截真正掛起協(xié)程的掛起點的續(xù)體。在協(xié)程中調(diào)用掛起函數(shù)蛙吏,掛起函數(shù)不一定會真正掛起協(xié)程
如:
launch {
val deferred = async {
// 異步邏輯
......
}
......
deferred.await()
......
}
在 deferred.await() 這行執(zhí)行的時候源哩,如果異步邏輯已經(jīng)執(zhí)行完成并取得了結(jié)果,那 await 函數(shù)會直接取得結(jié)果鸦做,而不會掛起協(xié)程璧疗。相反,如果網(wǎng)絡(luò)請求還未產(chǎn)生結(jié)果馁龟,await 函數(shù)就會使協(xié)程掛起。續(xù)體攔截器只攔截真正發(fā)生掛起的掛起點后的續(xù)體漆魔,對于未發(fā)生掛起的掛起點坷檩,續(xù)體會被直接調(diào)用 resumeWith 這一類的函數(shù)而不需要續(xù)攔截器對它進行操作却音。除此之外,續(xù)體攔截器還會緩存攔截過的續(xù)體矢炼,并且在不再需要它的時候調(diào)用 releaseInterceptedContinuation 函數(shù)釋放它系瓢。