Kotlin Coroutines(協(xié)程) 完全解析(二)棵逊,深入理解協(xié)程的掛起秸歧、恢復與調(diào)度

Kotlin Coroutines(協(xié)程) 完全解析系列:

Kotlin Coroutines(協(xié)程) 完全解析(一)今布,協(xié)程簡介

Kotlin Coroutines(協(xié)程) 完全解析(二)纷闺,深入理解協(xié)程的掛起浸卦、恢復與調(diào)度

Kotlin Coroutines(協(xié)程) 完全解析(三)炉抒,封裝異步回調(diào)佳镜、協(xié)程間關(guān)系及協(xié)程的取消

Kotlin Coroutines(協(xié)程) 完全解析(四)蠢络,協(xié)程的異常處理

Kotlin Coroutines(協(xié)程) 完全解析(五),協(xié)程的并發(fā)

本文基于 Kotlin v1.3.0-rc-146障斋,Kotlin-Coroutines v1.0.0-RC1

前面一篇文章協(xié)程簡介,簡單介紹了協(xié)程的一些基本概念以及其簡化異步編程的優(yōu)勢只磷,但是協(xié)程與線程有什么區(qū)別轧叽,協(xié)程的掛起與恢復是如何實現(xiàn)的甥角,還有協(xié)程運行在哪個線程上当犯,依然不是很清楚胸懈。這篇文章將分析協(xié)程的實現(xiàn)原理,一步步揭開協(xié)程的面紗枢劝。先來看看協(xié)程中最關(guān)鍵的掛起函數(shù)的實現(xiàn)原理:

1. 掛起函數(shù)的工作原理

協(xié)程的內(nèi)部實現(xiàn)使用了 Kotlin 編譯器的一些編譯技術(shù)秦驯,當掛起函數(shù)調(diào)用時,背后大致細節(jié)如下:

掛起函數(shù)或掛起 lambda 表達式調(diào)用時渐夸,都有一個隱式的參數(shù)額外傳入苫幢,這個參數(shù)是Continuation類型,封裝了協(xié)程恢復后的執(zhí)行的代碼邏輯剩蟀。

用前文中的一個掛起函數(shù)為例:

suspend fun requestToken(): Token { ... }

實際上在 JVM 中更像下面這樣:

Object requestToken(Continuation<Token> cont) { ... }

Continuation的定義如下,類似于一個通用的回調(diào)接口:

/**
 * Interface representing a continuation after a suspension point that returns value of type `T`.
 */
public interface Continuation<in T> {
    /**
     * Context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}

現(xiàn)在再看之前postItem函數(shù):

suspend fun requestToken(): Token { ... }   // 掛起函數(shù)
suspend fun createPost(token: Token, item: Item): Post { ... }  // 掛起函數(shù)
fun processPost(post: Post) { ... }

fun postItem(item: Item) {
    GlobalScope.launch {
        val token = requestToken()
        val post = createPost(token, item)
        processPost(post)
    }
}

然而偎漫,協(xié)程內(nèi)部實現(xiàn)不是使用普通回調(diào)的形式,而是使用狀態(tài)機來處理不同的掛起點有缆,大致的 CPS(Continuation Passing Style) 代碼為:

// 編譯后生成的內(nèi)部類大致如下
final class postItem$1 extends SuspendLambda ... {
    public final Object invokeSuspend(Object result) {
        ...
        switch (this.label) {
            case 0:
                this.label = 1;
                token = requestToken(this)
                break;
            case 1:
                this.label = 2;
                Token token = result;
                post = createPost(token, this.item, this)
                break;
            case 2:
                Post post = result;
                processPost(post)
                break;
        }
    }
}

上面代碼中每一個掛起點和初始掛起點對應(yīng)的 Continuation 都會轉(zhuǎn)化為一種狀態(tài)象踊,協(xié)程恢復只是跳轉(zhuǎn)到下一種狀態(tài)中。掛起函數(shù)將執(zhí)行過程分為多個 Continuation 片段棚壁,并且利用狀態(tài)機的方式保證各個片段是順序執(zhí)行的杯矩。

coroutine-continuation.png

1.1 掛起函數(shù)可能會掛起協(xié)程

掛起函數(shù)使用 CPS style 的代碼來掛起協(xié)程,保證掛起點后面的代碼只能在掛起函數(shù)執(zhí)行完后才能執(zhí)行袖外,所以掛起函數(shù)保證了協(xié)程內(nèi)的順序執(zhí)行順序史隆。

在多個協(xié)程的情況下,掛起函數(shù)的作用更加明顯:

fun postItem(item: Item) {
    GlobalScope.launch {
        // async { requestToken() } 新建一個協(xié)程曼验,可能在另一個線程運行
        // 但是 await() 是掛起函數(shù)逆害,當前協(xié)程執(zhí)行邏輯卡在第一個分支,第一種狀態(tài)蚣驼,當 async 的協(xié)程執(zhí)行完后恢復當前協(xié)程魄幕,才會切換到下一個分支
        val token = async { requestToken() }.await()
        // 在第二個分支狀態(tài)中,又新建一個協(xié)程颖杏,使用 await 掛起函數(shù)將之后代碼作為 Continuation 放倒下一個分支狀態(tài)纯陨,直到 async 協(xié)程執(zhí)行完
        val post = aync { createPost(token, item) }.await()
        // 最后一個分支狀態(tài),直接在當前協(xié)程處理
        processPost(post)
    }
}

上面的例子中,await()掛起函數(shù)掛起當前協(xié)程翼抠,直到異步協(xié)程完成執(zhí)行咙轩,但是這里并沒有阻塞線程,是使用狀態(tài)機的控制邏輯來實現(xiàn)阴颖。而且掛起函數(shù)可以保證掛起點之后的代碼一定在掛起點前代碼執(zhí)行完成后才會執(zhí)行活喊,掛起函數(shù)保證順序執(zhí)行,所以異步邏輯也可以用順序的代碼順序來編寫量愧。

注意掛起函數(shù)不一定會掛起協(xié)程钾菊,如果相關(guān)調(diào)用的結(jié)果已經(jīng)可用,庫可以決定繼續(xù)進行而不掛起偎肃,例如async { requestToken() }的返回值Deferred的結(jié)果已經(jīng)可用時煞烫,await()掛起函數(shù)可以直接返回結(jié)果,不用再掛起協(xié)程累颂。

1.2 掛起函數(shù)不會阻塞線程

掛起函數(shù)掛起協(xié)程滞详,并不會阻塞協(xié)程所在的線程,例如協(xié)程的delay()掛起函數(shù)會暫停協(xié)程一定時間紊馏,并不會阻塞協(xié)程所在線程料饥,但是Thread.sleep()函數(shù)會阻塞線程。

看下面一個例子朱监,兩個協(xié)程運行在同一線程上:

fun main(args: Array<String>) {
    // 創(chuàng)建一個單線程的協(xié)程調(diào)度器稀火,下面兩個協(xié)程都運行在這同一線程上
    val coroutineDispatcher = newSingleThreadContext("ctx")
    // 啟動協(xié)程 1
    GlobalScope.launch(coroutineDispatcher) {
        println("the first coroutine")
        delay(200)
        println("the first coroutine")
    }
    // 啟動協(xié)程 2
    GlobalScope.launch(coroutineDispatcher) {
        println("the second coroutine")
        delay(100)
        println("the second coroutine")
    }
    // 保證 main 線程存活,確保上面兩個協(xié)程運行完成
    Thread.sleep(500)
}

運行結(jié)果為:

the first coroutine
the second coroutine
the second coroutine
the first coroutine

從上面結(jié)果可以看出赌朋,當協(xié)程 1 暫停 200 ms 時,線程并沒有阻塞篇裁,而是執(zhí)行協(xié)程 2 的代碼沛慢,然后在 200 ms 時間到后,繼續(xù)執(zhí)行協(xié)程 1 的邏輯达布。所以掛起函數(shù)并不會阻塞線程团甲,這樣可以節(jié)省線程資源,協(xié)程掛起時黍聂,線程可以繼續(xù)執(zhí)行其他邏輯躺苦。

1.3 掛起函數(shù)恢復協(xié)程后運行在哪個線程

協(xié)程的所屬的線程調(diào)度在前一篇文章《協(xié)程簡介》中有提到過,主要是由協(xié)程的CoroutineDispatcher控制产还,CoroutineDispatcher可以指定協(xié)程運行在某一特定線程上匹厘、運作在線程池中或者不指定所運行的線程。所以協(xié)程調(diào)度器可以分為Confined dispatcherUnconfined dispatcher脐区,Dispatchers.Default愈诚、Dispatchers.IODispatchers.Main屬于Confined dispatcher,都指定了協(xié)程所運行的線程或線程池,掛起函數(shù)恢復后協(xié)程也是運行在指定的線程或線程池上的炕柔,而Dispatchers.Unconfined屬于Unconfined dispatcher酌泰,協(xié)程啟動并運行在 Caller Thread 上,但是只是在第一個掛起點之前是這樣的匕累,掛起恢復后運行在哪個線程完全由所調(diào)用的掛起函數(shù)決定陵刹。

fun main(args: Array<String>) = runBlocking<Unit> {
    launch { // 默認繼承 parent coroutine 的 CoroutineDispatcher,指定運行在 main 線程
        println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
        delay(100)
        println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Unconfined) {
        println("Unconfined      : I'm working in thread ${Thread.currentThread().name}")
        delay(100)
        println("Unconfined      : After delay in thread ${Thread.currentThread().name}")
    }
}

輸出如下:

Unconfined      : I'm working in thread main
main runBlocking: I'm working in thread main
Unconfined      : After delay in thread kotlinx.coroutines.DefaultExecutor
main runBlocking: After delay in thread main

上面第三行輸出欢嘿,經(jīng)過delay掛起函數(shù)后衰琐,使用Dispatchers.Unconfined的協(xié)程掛起恢復后依然在delay函數(shù)使用的DefaultExecutor上。

2. 協(xié)程深入解析

上面更多地是通過 demo 的方式說明掛起函數(shù)函數(shù)的一些特性际插,但是協(xié)程的創(chuàng)建碘耳、啟動、恢復框弛、線程調(diào)度辛辨、協(xié)程切換是如何實現(xiàn)的呢,還是不清楚瑟枫,下面結(jié)合源碼詳細地解析協(xié)程斗搞。

2.1 協(xié)程的創(chuàng)建與啟動

先從新建一個協(xié)程開始分析協(xié)程的創(chuàng)建,最常見的協(xié)程創(chuàng)建方式為CoroutineScope.launch {}慷妙,關(guān)鍵源碼如下:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    ...
    coroutine.start(start, coroutine, block)
    return coroutine
}

coroutine.start(start, coroutine, block)默認情況下會走到startCoroutineCancellable僻焚,最終會調(diào)用到createCoroutineUnintercepted

/**
 * Creates unintercepted coroutine without receiver and with result type [T].
 * This function creates a new, fresh instance of suspendable computation every time it is invoked.
 *
 * To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance.
 * The [completion] continuation is invoked when coroutine completes with result or exception.
 ...
 */
 public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
    completion: Continuation<T>
): Continuation<Unit> { ... }

重點注意該方法的注釋膝擂,創(chuàng)建一個協(xié)程虑啤,創(chuàng)建了一個新的可掛起計算,通過調(diào)用resume(Unit)啟動該協(xié)程架馋。而且返回值為Continuation狞山,Continuation提供了resumeWith恢復協(xié)程的接口,用以實現(xiàn)協(xié)程恢復叉寂,Continuation封裝了協(xié)程的代碼運行邏輯和恢復接口萍启。

再看之前協(xié)程代碼編譯生成的內(nèi)部類final class postItem$1 extends SuspendLambda ...,協(xié)程的計算邏輯封裝在invokeSuspend方法中屏鳍,而SuspendLambda的繼承關(guān)系為 SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation勘纯,其中BaseContinuationImpl 部分關(guān)鍵源碼如下:

internal abstract class BaseContinuationImpl(...) {
    // 實現(xiàn) Continuation 的 resumeWith,并且是 final 的钓瞭,不可被重寫
    public final override fun resumeWith(result: Result<Any?>) {
        ...
        val outcome = invokeSuspend(param)
        ...
    }

    // 由編譯生成的協(xié)程相關(guān)類來實現(xiàn)驳遵,例如 postItem$1
    protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}

而這部分與之前的分析也是吻合的,啟動協(xié)程流程是resume(Unit)->resumeWith()->invokeSuspend()山涡,協(xié)程的掛起通過suspend掛起函數(shù)實現(xiàn)超埋,協(xié)程的恢復通過Continuation.resumeWith實現(xiàn)搏讶。

2.2 協(xié)程的線程調(diào)度

協(xié)程的線程調(diào)度是通過攔截器實現(xiàn)的,前面提到了協(xié)程啟動調(diào)用到了startCoroutineCancellable霍殴,該方法實現(xiàn)為:

internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) =
    createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)
// createCoroutineUnintercepted(completion) 會創(chuàng)建一個新的協(xié)程媒惕,返回值類型為 Continuation
// intercepted() 是給 Continuation 加上 ContinuationInterceptor 攔截器,也是線程調(diào)度的關(guān)鍵
// resumeCancellable(Unit) 最終將調(diào)用 resume(Unit) 啟動協(xié)程

再看intercepted()的具體實現(xiàn):

public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this
// ContinuationImpl 是 SuspendLambda 的父類

internal abstract class ContinuationImpl(...) : BaseContinuationImpl(completion) {
    @Transient
    private var intercepted: Continuation<Any?>? = null

    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }
    // intercepted() 方法關(guān)鍵是 context[ContinuationInterceptor]?.interceptContinuation(this)
    // context[ContinuationInterceptor] 就是協(xié)程的 CoroutineDispatcher
}

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    /**
     * Returns continuation that wraps the original [continuation], thus intercepting all resumptions.
     */
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)
}

所以intercepted()最終會使用協(xié)程的CoroutineDispatcherinterceptContinuation方法包裝原來的 Continuation来庭,攔截所有的協(xié)程運行操作妒蔚。

DispatchedContinuation攔截了協(xié)程的啟動和恢復,分別是resumeCancellable(Unit)和重寫的resumeWith(Result)

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : Continuation<T> by continuation, DispatchedTask<T> {
    inline fun resumeCancellable(value: T) {
        // 判斷是否需要線程調(diào)度
        if (dispatcher.isDispatchNeeded(context)) {
            ...
            // 將協(xié)程的運算分發(fā)到另一個線程
            dispatcher.dispatch(context, this)
        } else {
            ...
            // 如果不需要調(diào)度月弛,直接在當前線程執(zhí)行協(xié)程運算
            resumeUndispatched(value)
        }
    }

    override fun resumeWith(result: Result<T>) {
        // 判斷是否需要線程調(diào)度
        if (dispatcher.isDispatchNeeded(context)) {
            ...
            // 將協(xié)程的運算分發(fā)到另一個線程
            dispatcher.dispatch(context, this)
        } else {
            ...
            // 如果不需要調(diào)度肴盏,直接在當前線程執(zhí)行協(xié)程運算
            continuation.resumeWith(result)
        }
    }
}

internal interface DispatchedTask<in T> : Runnable {
    public override fun run() {
        ...
        // 封裝了 continuation.resume 邏輯
    }
}

繼續(xù)跟蹤newSingleThreadContext()Dispatchers.IOdispatch方法的實現(xiàn)帽衙,發(fā)現(xiàn)其實都調(diào)用了Executor.execute(Runnable)方法菜皂,而Dispatchers.Unconfined的實現(xiàn)更簡單,關(guān)鍵在于isDispatchNeeded()返回為false厉萝。

2.3 協(xié)程的掛起和恢復

Kotlin 編譯器會生成繼承自SuspendLambda的子類恍飘,協(xié)程的真正運算邏輯都在invokeSuspend中。但是協(xié)程掛起的具體實現(xiàn)是如何呢谴垫?先看下面示例代碼:

fun main(args: Array<String>) = runBlocking<Unit> { // 新建并啟動 blocking 協(xié)程章母,運行在 main 線程上,等待所有子協(xié)程運行完成后才會結(jié)束
    launch(Dispatchers.Unconfined) { // 新建并啟動 launch 協(xié)程翩剪,沒有指定所運行線程乳怎,一開始運行在調(diào)用者所在的 main 線程上
        println("${Thread.currentThread().name} : launch start")
        async(Dispatchers.Default) { // 新建并啟動 async 協(xié)程,運行在 Dispatchers.Default 的線程池中
            println("${Thread.currentThread().name} : async start")
            delay(100)  // 掛起 async 協(xié)程 100 ms
            println("${Thread.currentThread().name} : async end")
        }.await() // 掛起 launch 協(xié)程前弯,直到 async 協(xié)程結(jié)束
        println("${Thread.currentThread().name} : launch end")
    }
}

其中 launch 協(xié)程編譯生成的 SuspendLambda 子類的invokeSuspend方法如下:

public final Object invokeSuspend(@NotNull Object result) {
    Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
    switch (this.label) {
        case 0:
            ...
            System.out.println(stringBuilder.append(currentThread.getName()).append(" : launch start").toString());
            // 新建并啟動 async 協(xié)程 
            Deferred async$default = BuildersKt.async$default(coroutineScope, (CoroutineContext) Dispatchers.getDefault(), null, (Function2) new 1(null), 2, null);
            this.label = 1;
            // 調(diào)用 await() 掛起函數(shù)
            if (async$default.await(this) == coroutine_suspended) {
                return coroutine_suspended;
            }
            break;
        case 1:
            if (result instanceof Failure) {
                throw ((Failure) result).exception;
            }
            // 恢復協(xié)程后再執(zhí)行一次 resumeWith()蚪缀,然后無異常的話執(zhí)行最后的 println()
            break;
        default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }
    ...
    System.out.println(stringBuilder2.append(currentThread2.getName()).append(" : launch end").toString());
    return Unit.INSTANCE;
}

上面代碼中 launch 協(xié)程掛起的關(guān)鍵在于async$default.await(this) == coroutine_suspended,如果此時 async 線程未執(zhí)行完成恕出,await()返回為IntrinsicsKt.getCOROUTINE_SUSPENDED()询枚,就會 return,launch 協(xié)程的invokeSuspend方法執(zhí)行完成剃根,協(xié)程所在線程繼續(xù)往下運行,此時 launch 線程處于掛起狀態(tài)前方。所以協(xié)程掛起就是協(xié)程掛起點之前邏輯執(zhí)行完成狈醉,協(xié)程的運算關(guān)鍵方法resumeWith()執(zhí)行完成,線程繼續(xù)執(zhí)行往下執(zhí)行其他邏輯惠险。

協(xié)程掛起有三點需要注意的:

  • 啟動其他協(xié)程并不會掛起當前協(xié)程苗傅,所以launchasync啟動線程時,除非新協(xié)程運行在當前線程班巩,則當前協(xié)程只能在新協(xié)程運行完成后繼續(xù)執(zhí)行渣慕,否則當前協(xié)程都會馬上繼續(xù)運行嘶炭。

  • 協(xié)程掛起并不會阻塞線程,因為協(xié)程掛起時相當于執(zhí)行完協(xié)程的方法逊桦,線程繼續(xù)執(zhí)行其他之后的邏輯眨猎。

  • 掛起函數(shù)并一定都會掛起協(xié)程,例如await()掛起函數(shù)如果返回值不等于IntrinsicsKt.getCOROUTINE_SUSPENDED()强经,則協(xié)程繼續(xù)執(zhí)行掛起點之后邏輯睡陪。

下面繼續(xù)分析await()的實現(xiàn)原理,它的實現(xiàn)中關(guān)鍵是調(diào)用了JobSupport.awaitSuspend()方法:

private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
    /*
        * Custom code here, so that parent coroutine that is using await
        * on its child deferred (async) coroutine would throw the exception that this child had
        * thrown and not a JobCancellationException.
        */
    val cont = AwaitContinuation(uCont.intercepted(), this)
    cont.initCancellability()
    invokeOnCompletion(ResumeAwaitOnCompletion(this, cont).asHandler)
    cont.getResult()
}

private class ResumeAwaitOnCompletion<T>(
    job: JobSupport,
    private val continuation: AbstractContinuation<T>
) : JobNode<JobSupport>(job) {
    override fun invoke(cause: Throwable?) {
        val state = job.state
        check(state !is Incomplete)
        if (state is CompletedExceptionally) {
            // Resume with exception in atomic way to preserve exception
            continuation.resumeWithExceptionMode(state.cause, MODE_ATOMIC_DEFAULT)
        } else {
            // Resuming with value in a cancellable way (AwaitContinuation is configured for this mode).
            @Suppress("UNCHECKED_CAST")
            continuation.resume(state as T)
        }
    }
    override fun toString() = "ResumeAwaitOnCompletion[$continuation]"
}

上面源碼中ResumeAwaitOnCompletioninvoke方法的邏輯就是調(diào)用continuation.resume(state as T)恢復協(xié)程匿情。invokeOnCompletion函數(shù)里面是如何實現(xiàn) async 協(xié)程完成后自動恢復之前協(xié)程的呢兰迫,源碼實現(xiàn)有些復雜,因為很多邊界情況處理就不全部展開炬称,其中最關(guān)鍵的邏輯如下:

// handler 就是 ResumeAwaitOnCompletion 的實例汁果,將 handler 作為節(jié)點
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
// 將 node 節(jié)點添加到 state.list 中
if (!addLastAtomic(state, list, node)) return@loopOnState // retry

接下來我斷點調(diào)試 launch 協(xié)程恢復的過程,從 async 協(xié)程的SuspendLambda的子類的completion.resumeWith(outcome) -> AbstractCoroutine.resumeWith(result) ..-> JobSupport.tryFinalizeSimpleState() -> JobSupport.completeStateFinalization() -> state.list?.notifyCompletion(cause) -> node.invoke玲躯,最后 handler 節(jié)點里面通過調(diào)用resume(result)恢復協(xié)程据德。

所以await()掛起函數(shù)恢復協(xié)程的原理是,將 launch 協(xié)程封裝為 ResumeAwaitOnCompletion 作為 handler 節(jié)點添加到 aynsc 協(xié)程的 state.list府蔗,然后在 async 協(xié)程完成時會通知 handler 節(jié)點調(diào)用 launch 協(xié)程的 resume(result) 方法將結(jié)果傳給 launch 協(xié)程晋控,并恢復 launch 協(xié)程繼續(xù)執(zhí)行 await 掛起點之后的邏輯。

而這過程中有兩個finalresumeWith 方法姓赤,一個是SuspendLambda的父類BaseContinuationImpl的赡译,我們再來詳細分析一篇:

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    public final override fun resumeWith(result: Result<Any?>) {
        ...
        var param = result
        while (true) {
            with(current) {
                val completion = completion!!
                val outcome: Result<Any?> =
                    try {
                        // 調(diào)用 invokeSuspend 方法執(zhí)行,執(zhí)行協(xié)程的真正運算邏輯
                        val outcome = invokeSuspend(param)
                        // 協(xié)程掛起時 invokeSuspend 才會返回 COROUTINE_SUSPENDED不铆,所以協(xié)程掛起時蝌焚,其實只是協(xié)程的 resumeWith 運行邏輯執(zhí)行完成,再次調(diào)用 resumeWith 時誓斥,協(xié)程掛起點之后的邏輯才能繼續(xù)執(zhí)行
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                // 這里可以看出 Continuation 其實分為兩類只洒,一種是 BaseContinuationImpl,封裝了協(xié)程的真正運算邏輯
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // 斷點時發(fā)現(xiàn) completion 是 DeferredCoroutine 實例劳坑,這里實際調(diào)用的是其父類 AbstractCoroutine 的 resumeWith 方法
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
}

接下來再來看另外一類 Continuation毕谴,AbstractCoroutine 的resumeWith實現(xiàn):

public abstract class AbstractCoroutine<in T>(
    @JvmField
    protected val parentContext: CoroutineContext,
    active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
    /**
     * Completes execution of this with coroutine with the specified result.
     */
    public final override fun resumeWith(result: Result<T>) {
        // makeCompletingOnce 大致實現(xiàn)是修改協(xié)程狀態(tài),如果需要的話還會將結(jié)果返回給調(diào)用者協(xié)程距芬,并恢復調(diào)用者協(xié)程
        makeCompletingOnce(result.toState(), defaultResumeMode)
    }
}

所以其中一類 Continuation BaseContinuationImplresumeWith封裝了協(xié)程的運算邏輯涝开,用以協(xié)程的啟動和恢復;而另一類 Continuation AbstractCoroutine框仔,主要是負責維護協(xié)程的狀態(tài)和管理舀武,它的resumeWith則是完成協(xié)程,恢復調(diào)用者協(xié)程离斩。

2.4 協(xié)程的三層包裝

通過一步步的分析银舱,慢慢發(fā)現(xiàn)協(xié)程其實有三層包裝瘪匿。常用的launchasync返回的JobDeferred寻馏,里面封裝了協(xié)程狀態(tài)棋弥,提供了取消協(xié)程接口,而它們的實例都是繼承自AbstractCoroutine操软,它是協(xié)程的第一層包裝嘁锯。第二層包裝是編譯器生成的SuspendLambda的子類,封裝了協(xié)程的真正運算邏輯聂薪,繼承自BaseContinuationImpl家乘,其中completion屬性就是協(xié)程的第一層包裝。第三層包裝是前面分析協(xié)程的線程調(diào)度時提到的DispatchedContinuation藏澳,封裝了線程調(diào)度邏輯仁锯,包含了協(xié)程的第二層包裝。三層包裝都實現(xiàn)了Continuation接口翔悠,通過代理模式將協(xié)程的各層包裝組合在一起业崖,每層負責不同的功能。

下面是協(xié)程運行的流程圖:

coroutine_flow.jpg

3. 小結(jié)

經(jīng)過以上解析之后蓄愁,再來看協(xié)程就是一段可以掛起和恢復執(zhí)行的運算邏輯双炕,而協(xié)程的掛起是通過掛起函數(shù)實現(xiàn)的,掛起函數(shù)用狀態(tài)機的方式用掛起點將協(xié)程的運算邏輯拆分為不同的片段撮抓,每次運行協(xié)程執(zhí)行的不同的邏輯片段妇斤。所以協(xié)程有兩個很大的好處:一是簡化異步編程,支持異步返回丹拯;而是掛起不阻塞線程站超,提供線程利用率。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末乖酬,一起剝皮案震驚了整個濱河市死相,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌咬像,老刑警劉巖算撮,帶你破解...
    沈念sama閱讀 218,682評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異县昂,居然都是意外死亡肮柜,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評論 3 395
  • 文/潘曉璐 我一進店門七芭,熙熙樓的掌柜王于貴愁眉苦臉地迎上來素挽,“玉大人蔑赘,你說我怎么就攤上這事狸驳≡っ鳎” “怎么了?”我有些...
    開封第一講書人閱讀 165,083評論 0 355
  • 文/不壞的土叔 我叫張陵耙箍,是天一觀的道長撰糠。 經(jīng)常有香客問我,道長辩昆,這世上最難降的妖魔是什么阅酪? 我笑而不...
    開封第一講書人閱讀 58,763評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮汁针,結(jié)果婚禮上术辐,老公的妹妹穿的比我還像新娘。我一直安慰自己施无,他們只是感情好辉词,可當我...
    茶點故事閱讀 67,785評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著猾骡,像睡著了一般瑞躺。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上兴想,一...
    開封第一講書人閱讀 51,624評論 1 305
  • 那天幢哨,我揣著相機與錄音,去河邊找鬼嫂便。 笑死捞镰,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的顽悼。 我是一名探鬼主播曼振,決...
    沈念sama閱讀 40,358評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼蔚龙!你這毒婦竟也來了冰评?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,261評論 0 276
  • 序言:老撾萬榮一對情侶失蹤木羹,失蹤者是張志新(化名)和其女友劉穎甲雅,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體坑填,經(jīng)...
    沈念sama閱讀 45,722評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡抛人,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了脐瑰。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片妖枚。...
    茶點故事閱讀 40,030評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖苍在,靈堂內(nèi)的尸體忽然破棺而出绝页,到底是詐尸還是另有隱情荠商,我是刑警寧澤,帶...
    沈念sama閱讀 35,737評論 5 346
  • 正文 年R本政府宣布续誉,位于F島的核電站莱没,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏酷鸦。R本人自食惡果不足惜饰躲,卻給世界環(huán)境...
    茶點故事閱讀 41,360評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望臼隔。 院中可真熱鬧嘹裂,春花似錦、人聲如沸摔握。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,941評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽盒发。三九已至例嘱,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間宁舰,已是汗流浹背拼卵。 一陣腳步聲響...
    開封第一講書人閱讀 33,057評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蛮艰,地道東北人腋腮。 一個月前我還...
    沈念sama閱讀 48,237評論 3 371
  • 正文 我出身青樓,卻偏偏與公主長得像壤蚜,于是被迫代替她去往敵國和親即寡。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,976評論 2 355