Kotlin Coroutines(協(xié)程) 完全解析系列:
Kotlin Coroutines(協(xié)程) 完全解析(一)今布,協(xié)程簡介
Kotlin Coroutines(協(xié)程) 完全解析(二)纷闺,深入理解協(xié)程的掛起浸卦、恢復與調(diào)度
Kotlin Coroutines(協(xié)程) 完全解析(三)炉抒,封裝異步回調(diào)佳镜、協(xié)程間關(guān)系及協(xié)程的取消
Kotlin Coroutines(協(xié)程) 完全解析(四)蠢络,協(xié)程的異常處理
Kotlin Coroutines(協(xié)程) 完全解析(五),協(xié)程的并發(fā)
本文基于 Kotlin v1.3.0-rc-146障斋,Kotlin-Coroutines v1.0.0-RC1
前面一篇文章協(xié)程簡介,簡單介紹了協(xié)程的一些基本概念以及其簡化異步編程的優(yōu)勢只磷,但是協(xié)程與線程有什么區(qū)別轧叽,協(xié)程的掛起與恢復是如何實現(xiàn)的甥角,還有協(xié)程運行在哪個線程上当犯,依然不是很清楚胸懈。這篇文章將分析協(xié)程的實現(xiàn)原理,一步步揭開協(xié)程的面紗枢劝。先來看看協(xié)程中最關(guān)鍵的掛起函數(shù)的實現(xiàn)原理:
1. 掛起函數(shù)的工作原理
協(xié)程的內(nèi)部實現(xiàn)使用了 Kotlin 編譯器的一些編譯技術(shù)秦驯,當掛起函數(shù)調(diào)用時,背后大致細節(jié)如下:
掛起函數(shù)或掛起 lambda 表達式調(diào)用時渐夸,都有一個隱式的參數(shù)額外傳入苫幢,這個參數(shù)是Continuation
類型,封裝了協(xié)程恢復后的執(zhí)行的代碼邏輯剩蟀。
用前文中的一個掛起函數(shù)為例:
suspend fun requestToken(): Token { ... }
實際上在 JVM 中更像下面這樣:
Object requestToken(Continuation<Token> cont) { ... }
Continuation
的定義如下,類似于一個通用的回調(diào)接口:
/**
* Interface representing a continuation after a suspension point that returns value of type `T`.
*/
public interface Continuation<in T> {
/**
* Context of the coroutine that corresponds to this continuation.
*/
public val context: CoroutineContext
/**
* Resumes the execution of the corresponding coroutine passing successful or failed [result] as the
* return value of the last suspension point.
*/
public fun resumeWith(result: Result<T>)
}
現(xiàn)在再看之前postItem
函數(shù):
suspend fun requestToken(): Token { ... } // 掛起函數(shù)
suspend fun createPost(token: Token, item: Item): Post { ... } // 掛起函數(shù)
fun processPost(post: Post) { ... }
fun postItem(item: Item) {
GlobalScope.launch {
val token = requestToken()
val post = createPost(token, item)
processPost(post)
}
}
然而偎漫,協(xié)程內(nèi)部實現(xiàn)不是使用普通回調(diào)的形式,而是使用狀態(tài)機來處理不同的掛起點有缆,大致的 CPS(Continuation Passing Style) 代碼為:
// 編譯后生成的內(nèi)部類大致如下
final class postItem$1 extends SuspendLambda ... {
public final Object invokeSuspend(Object result) {
...
switch (this.label) {
case 0:
this.label = 1;
token = requestToken(this)
break;
case 1:
this.label = 2;
Token token = result;
post = createPost(token, this.item, this)
break;
case 2:
Post post = result;
processPost(post)
break;
}
}
}
上面代碼中每一個掛起點和初始掛起點對應(yīng)的 Continuation 都會轉(zhuǎn)化為一種狀態(tài)象踊,協(xié)程恢復只是跳轉(zhuǎn)到下一種狀態(tài)中。掛起函數(shù)將執(zhí)行過程分為多個 Continuation 片段棚壁,并且利用狀態(tài)機的方式保證各個片段是順序執(zhí)行的杯矩。
1.1 掛起函數(shù)可能會掛起協(xié)程
掛起函數(shù)使用 CPS style 的代碼來掛起協(xié)程,保證掛起點后面的代碼只能在掛起函數(shù)執(zhí)行完后才能執(zhí)行袖外,所以掛起函數(shù)保證了協(xié)程內(nèi)的順序執(zhí)行順序史隆。
在多個協(xié)程的情況下,掛起函數(shù)的作用更加明顯:
fun postItem(item: Item) {
GlobalScope.launch {
// async { requestToken() } 新建一個協(xié)程曼验,可能在另一個線程運行
// 但是 await() 是掛起函數(shù)逆害,當前協(xié)程執(zhí)行邏輯卡在第一個分支,第一種狀態(tài)蚣驼,當 async 的協(xié)程執(zhí)行完后恢復當前協(xié)程魄幕,才會切換到下一個分支
val token = async { requestToken() }.await()
// 在第二個分支狀態(tài)中,又新建一個協(xié)程颖杏,使用 await 掛起函數(shù)將之后代碼作為 Continuation 放倒下一個分支狀態(tài)纯陨,直到 async 協(xié)程執(zhí)行完
val post = aync { createPost(token, item) }.await()
// 最后一個分支狀態(tài),直接在當前協(xié)程處理
processPost(post)
}
}
上面的例子中,await()
掛起函數(shù)掛起當前協(xié)程翼抠,直到異步協(xié)程完成執(zhí)行咙轩,但是這里并沒有阻塞線程,是使用狀態(tài)機的控制邏輯來實現(xiàn)阴颖。而且掛起函數(shù)可以保證掛起點之后的代碼一定在掛起點前代碼執(zhí)行完成后才會執(zhí)行活喊,掛起函數(shù)保證順序執(zhí)行,所以異步邏輯也可以用順序的代碼順序來編寫量愧。
注意掛起函數(shù)不一定會掛起協(xié)程钾菊,如果相關(guān)調(diào)用的結(jié)果已經(jīng)可用,庫可以決定繼續(xù)進行而不掛起偎肃,例如async { requestToken() }
的返回值Deferred
的結(jié)果已經(jīng)可用時煞烫,await()
掛起函數(shù)可以直接返回結(jié)果,不用再掛起協(xié)程累颂。
1.2 掛起函數(shù)不會阻塞線程
掛起函數(shù)掛起協(xié)程滞详,并不會阻塞協(xié)程所在的線程,例如協(xié)程的delay()
掛起函數(shù)會暫停協(xié)程一定時間紊馏,并不會阻塞協(xié)程所在線程料饥,但是Thread.sleep()
函數(shù)會阻塞線程。
看下面一個例子朱监,兩個協(xié)程運行在同一線程上:
fun main(args: Array<String>) {
// 創(chuàng)建一個單線程的協(xié)程調(diào)度器稀火,下面兩個協(xié)程都運行在這同一線程上
val coroutineDispatcher = newSingleThreadContext("ctx")
// 啟動協(xié)程 1
GlobalScope.launch(coroutineDispatcher) {
println("the first coroutine")
delay(200)
println("the first coroutine")
}
// 啟動協(xié)程 2
GlobalScope.launch(coroutineDispatcher) {
println("the second coroutine")
delay(100)
println("the second coroutine")
}
// 保證 main 線程存活,確保上面兩個協(xié)程運行完成
Thread.sleep(500)
}
運行結(jié)果為:
the first coroutine
the second coroutine
the second coroutine
the first coroutine
從上面結(jié)果可以看出赌朋,當協(xié)程 1 暫停 200 ms 時,線程并沒有阻塞篇裁,而是執(zhí)行協(xié)程 2 的代碼沛慢,然后在 200 ms 時間到后,繼續(xù)執(zhí)行協(xié)程 1 的邏輯达布。所以掛起函數(shù)并不會阻塞線程团甲,這樣可以節(jié)省線程資源,協(xié)程掛起時黍聂,線程可以繼續(xù)執(zhí)行其他邏輯躺苦。
1.3 掛起函數(shù)恢復協(xié)程后運行在哪個線程
協(xié)程的所屬的線程調(diào)度在前一篇文章《協(xié)程簡介》中有提到過,主要是由協(xié)程的CoroutineDispatcher
控制产还,CoroutineDispatcher
可以指定協(xié)程運行在某一特定線程上匹厘、運作在線程池中或者不指定所運行的線程。所以協(xié)程調(diào)度器可以分為Confined dispatcher
和Unconfined dispatcher
脐区,Dispatchers.Default
愈诚、Dispatchers.IO
和Dispatchers.Main
屬于Confined dispatcher
,都指定了協(xié)程所運行的線程或線程池,掛起函數(shù)恢復后協(xié)程也是運行在指定的線程或線程池上的炕柔,而Dispatchers.Unconfined
屬于Unconfined dispatcher
酌泰,協(xié)程啟動并運行在 Caller Thread 上,但是只是在第一個掛起點之前是這樣的匕累,掛起恢復后運行在哪個線程完全由所調(diào)用的掛起函數(shù)決定陵刹。
fun main(args: Array<String>) = runBlocking<Unit> {
launch { // 默認繼承 parent coroutine 的 CoroutineDispatcher,指定運行在 main 線程
println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
delay(100)
println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
}
launch(Dispatchers.Unconfined) {
println("Unconfined : I'm working in thread ${Thread.currentThread().name}")
delay(100)
println("Unconfined : After delay in thread ${Thread.currentThread().name}")
}
}
輸出如下:
Unconfined : I'm working in thread main
main runBlocking: I'm working in thread main
Unconfined : After delay in thread kotlinx.coroutines.DefaultExecutor
main runBlocking: After delay in thread main
上面第三行輸出欢嘿,經(jīng)過delay
掛起函數(shù)后衰琐,使用Dispatchers.Unconfined
的協(xié)程掛起恢復后依然在delay
函數(shù)使用的DefaultExecutor
上。
2. 協(xié)程深入解析
上面更多地是通過 demo 的方式說明掛起函數(shù)函數(shù)的一些特性际插,但是協(xié)程的創(chuàng)建碘耳、啟動、恢復框弛、線程調(diào)度辛辨、協(xié)程切換是如何實現(xiàn)的呢,還是不清楚瑟枫,下面結(jié)合源碼詳細地解析協(xié)程斗搞。
2.1 協(xié)程的創(chuàng)建與啟動
先從新建一個協(xié)程開始分析協(xié)程的創(chuàng)建,最常見的協(xié)程創(chuàng)建方式為CoroutineScope.launch {}
慷妙,關(guān)鍵源碼如下:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
...
coroutine.start(start, coroutine, block)
return coroutine
}
coroutine.start(start, coroutine, block)
默認情況下會走到startCoroutineCancellable
僻焚,最終會調(diào)用到createCoroutineUnintercepted
。
/**
* Creates unintercepted coroutine without receiver and with result type [T].
* This function creates a new, fresh instance of suspendable computation every time it is invoked.
*
* To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance.
* The [completion] continuation is invoked when coroutine completes with result or exception.
...
*/
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
completion: Continuation<T>
): Continuation<Unit> { ... }
重點注意該方法的注釋膝擂,創(chuàng)建一個協(xié)程虑啤,創(chuàng)建了一個新的可掛起計算,通過調(diào)用resume(Unit)
啟動該協(xié)程架馋。而且返回值為Continuation
狞山,Continuation
提供了resumeWith
恢復協(xié)程的接口,用以實現(xiàn)協(xié)程恢復叉寂,Continuation
封裝了協(xié)程的代碼運行邏輯和恢復接口萍启。
再看之前協(xié)程代碼編譯生成的內(nèi)部類final class postItem$1 extends SuspendLambda ...
,協(xié)程的計算邏輯封裝在invokeSuspend
方法中屏鳍,而SuspendLambda
的繼承關(guān)系為 SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation勘纯,其中BaseContinuationImpl
部分關(guān)鍵源碼如下:
internal abstract class BaseContinuationImpl(...) {
// 實現(xiàn) Continuation 的 resumeWith,并且是 final 的钓瞭,不可被重寫
public final override fun resumeWith(result: Result<Any?>) {
...
val outcome = invokeSuspend(param)
...
}
// 由編譯生成的協(xié)程相關(guān)類來實現(xiàn)驳遵,例如 postItem$1
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}
而這部分與之前的分析也是吻合的,啟動協(xié)程流程是resume(Unit)
->resumeWith()
->invokeSuspend()
山涡,協(xié)程的掛起通過suspend
掛起函數(shù)實現(xiàn)超埋,協(xié)程的恢復通過Continuation.resumeWith
實現(xiàn)搏讶。
2.2 協(xié)程的線程調(diào)度
協(xié)程的線程調(diào)度是通過攔截器實現(xiàn)的,前面提到了協(xié)程啟動調(diào)用到了startCoroutineCancellable
霍殴,該方法實現(xiàn)為:
internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) =
createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)
// createCoroutineUnintercepted(completion) 會創(chuàng)建一個新的協(xié)程媒惕,返回值類型為 Continuation
// intercepted() 是給 Continuation 加上 ContinuationInterceptor 攔截器,也是線程調(diào)度的關(guān)鍵
// resumeCancellable(Unit) 最終將調(diào)用 resume(Unit) 啟動協(xié)程
再看intercepted()
的具體實現(xiàn):
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
// ContinuationImpl 是 SuspendLambda 的父類
internal abstract class ContinuationImpl(...) : BaseContinuationImpl(completion) {
@Transient
private var intercepted: Continuation<Any?>? = null
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
// intercepted() 方法關(guān)鍵是 context[ContinuationInterceptor]?.interceptContinuation(this)
// context[ContinuationInterceptor] 就是協(xié)程的 CoroutineDispatcher
}
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
/**
* Returns continuation that wraps the original [continuation], thus intercepting all resumptions.
*/
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
}
所以intercepted()
最終會使用協(xié)程的CoroutineDispatcher
的interceptContinuation
方法包裝原來的 Continuation来庭,攔截所有的協(xié)程運行操作妒蔚。
DispatchedContinuation
攔截了協(xié)程的啟動和恢復,分別是resumeCancellable(Unit)
和重寫的resumeWith(Result)
:
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : Continuation<T> by continuation, DispatchedTask<T> {
inline fun resumeCancellable(value: T) {
// 判斷是否需要線程調(diào)度
if (dispatcher.isDispatchNeeded(context)) {
...
// 將協(xié)程的運算分發(fā)到另一個線程
dispatcher.dispatch(context, this)
} else {
...
// 如果不需要調(diào)度月弛,直接在當前線程執(zhí)行協(xié)程運算
resumeUndispatched(value)
}
}
override fun resumeWith(result: Result<T>) {
// 判斷是否需要線程調(diào)度
if (dispatcher.isDispatchNeeded(context)) {
...
// 將協(xié)程的運算分發(fā)到另一個線程
dispatcher.dispatch(context, this)
} else {
...
// 如果不需要調(diào)度肴盏,直接在當前線程執(zhí)行協(xié)程運算
continuation.resumeWith(result)
}
}
}
internal interface DispatchedTask<in T> : Runnable {
public override fun run() {
...
// 封裝了 continuation.resume 邏輯
}
}
繼續(xù)跟蹤newSingleThreadContext()
、Dispatchers.IO
等dispatch
方法的實現(xiàn)帽衙,發(fā)現(xiàn)其實都調(diào)用了Executor.execute(Runnable)
方法菜皂,而Dispatchers.Unconfined
的實現(xiàn)更簡單,關(guān)鍵在于isDispatchNeeded()
返回為false
厉萝。
2.3 協(xié)程的掛起和恢復
Kotlin 編譯器會生成繼承自SuspendLambda
的子類恍飘,協(xié)程的真正運算邏輯都在invokeSuspend
中。但是協(xié)程掛起的具體實現(xiàn)是如何呢谴垫?先看下面示例代碼:
fun main(args: Array<String>) = runBlocking<Unit> { // 新建并啟動 blocking 協(xié)程章母,運行在 main 線程上,等待所有子協(xié)程運行完成后才會結(jié)束
launch(Dispatchers.Unconfined) { // 新建并啟動 launch 協(xié)程翩剪,沒有指定所運行線程乳怎,一開始運行在調(diào)用者所在的 main 線程上
println("${Thread.currentThread().name} : launch start")
async(Dispatchers.Default) { // 新建并啟動 async 協(xié)程,運行在 Dispatchers.Default 的線程池中
println("${Thread.currentThread().name} : async start")
delay(100) // 掛起 async 協(xié)程 100 ms
println("${Thread.currentThread().name} : async end")
}.await() // 掛起 launch 協(xié)程前弯,直到 async 協(xié)程結(jié)束
println("${Thread.currentThread().name} : launch end")
}
}
其中 launch 協(xié)程編譯生成的 SuspendLambda 子類的invokeSuspend
方法如下:
public final Object invokeSuspend(@NotNull Object result) {
Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0:
...
System.out.println(stringBuilder.append(currentThread.getName()).append(" : launch start").toString());
// 新建并啟動 async 協(xié)程
Deferred async$default = BuildersKt.async$default(coroutineScope, (CoroutineContext) Dispatchers.getDefault(), null, (Function2) new 1(null), 2, null);
this.label = 1;
// 調(diào)用 await() 掛起函數(shù)
if (async$default.await(this) == coroutine_suspended) {
return coroutine_suspended;
}
break;
case 1:
if (result instanceof Failure) {
throw ((Failure) result).exception;
}
// 恢復協(xié)程后再執(zhí)行一次 resumeWith()蚪缀,然后無異常的話執(zhí)行最后的 println()
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
...
System.out.println(stringBuilder2.append(currentThread2.getName()).append(" : launch end").toString());
return Unit.INSTANCE;
}
上面代碼中 launch 協(xié)程掛起的關(guān)鍵在于async$default.await(this) == coroutine_suspended
,如果此時 async 線程未執(zhí)行完成恕出,await()
返回為IntrinsicsKt.getCOROUTINE_SUSPENDED()
询枚,就會 return,launch 協(xié)程的invokeSuspend
方法執(zhí)行完成剃根,協(xié)程所在線程繼續(xù)往下運行,此時 launch 線程處于掛起狀態(tài)前方。所以協(xié)程掛起就是協(xié)程掛起點之前邏輯執(zhí)行完成狈醉,協(xié)程的運算關(guān)鍵方法resumeWith()
執(zhí)行完成,線程繼續(xù)執(zhí)行往下執(zhí)行其他邏輯惠险。
協(xié)程掛起有三點需要注意的:
啟動其他協(xié)程并不會掛起當前協(xié)程苗傅,所以
launch
和async
啟動線程時,除非新協(xié)程運行在當前線程班巩,則當前協(xié)程只能在新協(xié)程運行完成后繼續(xù)執(zhí)行渣慕,否則當前協(xié)程都會馬上繼續(xù)運行嘶炭。協(xié)程掛起并不會阻塞線程,因為協(xié)程掛起時相當于執(zhí)行完協(xié)程的方法逊桦,線程繼續(xù)執(zhí)行其他之后的邏輯眨猎。
掛起函數(shù)并一定都會掛起協(xié)程,例如
await()
掛起函數(shù)如果返回值不等于IntrinsicsKt.getCOROUTINE_SUSPENDED()
强经,則協(xié)程繼續(xù)執(zhí)行掛起點之后邏輯睡陪。
下面繼續(xù)分析await()
的實現(xiàn)原理,它的實現(xiàn)中關(guān)鍵是調(diào)用了JobSupport.awaitSuspend()
方法:
private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
/*
* Custom code here, so that parent coroutine that is using await
* on its child deferred (async) coroutine would throw the exception that this child had
* thrown and not a JobCancellationException.
*/
val cont = AwaitContinuation(uCont.intercepted(), this)
cont.initCancellability()
invokeOnCompletion(ResumeAwaitOnCompletion(this, cont).asHandler)
cont.getResult()
}
private class ResumeAwaitOnCompletion<T>(
job: JobSupport,
private val continuation: AbstractContinuation<T>
) : JobNode<JobSupport>(job) {
override fun invoke(cause: Throwable?) {
val state = job.state
check(state !is Incomplete)
if (state is CompletedExceptionally) {
// Resume with exception in atomic way to preserve exception
continuation.resumeWithExceptionMode(state.cause, MODE_ATOMIC_DEFAULT)
} else {
// Resuming with value in a cancellable way (AwaitContinuation is configured for this mode).
@Suppress("UNCHECKED_CAST")
continuation.resume(state as T)
}
}
override fun toString() = "ResumeAwaitOnCompletion[$continuation]"
}
上面源碼中ResumeAwaitOnCompletion
的invoke
方法的邏輯就是調(diào)用continuation.resume(state as T)
恢復協(xié)程匿情。invokeOnCompletion
函數(shù)里面是如何實現(xiàn) async 協(xié)程完成后自動恢復之前協(xié)程的呢兰迫,源碼實現(xiàn)有些復雜,因為很多邊界情況處理就不全部展開炬称,其中最關(guān)鍵的邏輯如下:
// handler 就是 ResumeAwaitOnCompletion 的實例汁果,將 handler 作為節(jié)點
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
// 將 node 節(jié)點添加到 state.list 中
if (!addLastAtomic(state, list, node)) return@loopOnState // retry
接下來我斷點調(diào)試 launch 協(xié)程恢復的過程,從 async 協(xié)程的SuspendLambda
的子類的completion.resumeWith(outcome)
-> AbstractCoroutine.resumeWith(result)
..-> JobSupport.tryFinalizeSimpleState()
-> JobSupport.completeStateFinalization()
-> state.list?.notifyCompletion(cause)
-> node.invoke
玲躯,最后 handler 節(jié)點里面通過調(diào)用resume(result)
恢復協(xié)程据德。
所以await()
掛起函數(shù)恢復協(xié)程的原理是,將 launch 協(xié)程封裝為 ResumeAwaitOnCompletion 作為 handler 節(jié)點添加到 aynsc 協(xié)程的 state.list府蔗,然后在 async 協(xié)程完成時會通知 handler 節(jié)點調(diào)用 launch 協(xié)程的 resume(result) 方法將結(jié)果傳給 launch 協(xié)程晋控,并恢復 launch 協(xié)程繼續(xù)執(zhí)行 await 掛起點之后的邏輯。
而這過程中有兩個final
的resumeWith
方法姓赤,一個是SuspendLambda
的父類BaseContinuationImpl
的赡译,我們再來詳細分析一篇:
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
...
var param = result
while (true) {
with(current) {
val completion = completion!!
val outcome: Result<Any?> =
try {
// 調(diào)用 invokeSuspend 方法執(zhí)行,執(zhí)行協(xié)程的真正運算邏輯
val outcome = invokeSuspend(param)
// 協(xié)程掛起時 invokeSuspend 才會返回 COROUTINE_SUSPENDED不铆,所以協(xié)程掛起時蝌焚,其實只是協(xié)程的 resumeWith 運行邏輯執(zhí)行完成,再次調(diào)用 resumeWith 時誓斥,協(xié)程掛起點之后的邏輯才能繼續(xù)執(zhí)行
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
// 這里可以看出 Continuation 其實分為兩類只洒,一種是 BaseContinuationImpl,封裝了協(xié)程的真正運算邏輯
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
// 斷點時發(fā)現(xiàn) completion 是 DeferredCoroutine 實例劳坑,這里實際調(diào)用的是其父類 AbstractCoroutine 的 resumeWith 方法
completion.resumeWith(outcome)
return
}
}
}
}
}
接下來再來看另外一類 Continuation毕谴,AbstractCoroutine 的resumeWith
實現(xiàn):
public abstract class AbstractCoroutine<in T>(
@JvmField
protected val parentContext: CoroutineContext,
active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
/**
* Completes execution of this with coroutine with the specified result.
*/
public final override fun resumeWith(result: Result<T>) {
// makeCompletingOnce 大致實現(xiàn)是修改協(xié)程狀態(tài),如果需要的話還會將結(jié)果返回給調(diào)用者協(xié)程距芬,并恢復調(diào)用者協(xié)程
makeCompletingOnce(result.toState(), defaultResumeMode)
}
}
所以其中一類 Continuation BaseContinuationImpl
的resumeWith
封裝了協(xié)程的運算邏輯涝开,用以協(xié)程的啟動和恢復;而另一類 Continuation AbstractCoroutine
框仔,主要是負責維護協(xié)程的狀態(tài)和管理舀武,它的resumeWith
則是完成協(xié)程,恢復調(diào)用者協(xié)程离斩。
2.4 協(xié)程的三層包裝
通過一步步的分析银舱,慢慢發(fā)現(xiàn)協(xié)程其實有三層包裝瘪匿。常用的launch
和async
返回的Job
、Deferred
寻馏,里面封裝了協(xié)程狀態(tài)棋弥,提供了取消協(xié)程接口,而它們的實例都是繼承自AbstractCoroutine
操软,它是協(xié)程的第一層包裝嘁锯。第二層包裝是編譯器生成的SuspendLambda
的子類,封裝了協(xié)程的真正運算邏輯聂薪,繼承自BaseContinuationImpl
家乘,其中completion
屬性就是協(xié)程的第一層包裝。第三層包裝是前面分析協(xié)程的線程調(diào)度時提到的DispatchedContinuation
藏澳,封裝了線程調(diào)度邏輯仁锯,包含了協(xié)程的第二層包裝。三層包裝都實現(xiàn)了Continuation
接口翔悠,通過代理模式將協(xié)程的各層包裝組合在一起业崖,每層負責不同的功能。
下面是協(xié)程運行的流程圖:
3. 小結(jié)
經(jīng)過以上解析之后蓄愁,再來看協(xié)程就是一段可以掛起和恢復執(zhí)行的運算邏輯双炕,而協(xié)程的掛起是通過掛起函數(shù)實現(xiàn)的,掛起函數(shù)用狀態(tài)機的方式用掛起點將協(xié)程的運算邏輯拆分為不同的片段撮抓,每次運行協(xié)程執(zhí)行的不同的邏輯片段妇斤。所以協(xié)程有兩個很大的好處:一是簡化異步編程,支持異步返回丹拯;而是掛起不阻塞線程站超,提供線程利用率。