只需一篇文章:【Kotlin - 協(xié)程】原來這么簡(jiǎn)單匙睹!

這一篇我們好好聊一聊協(xié)程的原理愚屁,通過上一篇的學(xué)習(xí)济竹,相信大家對(duì)于如何使用協(xié)程已經(jīng)非常熟悉了痕檬。

故事還得從上次的協(xié)程分享開始,由于大家對(duì)協(xié)程的實(shí)踐并不多送浊,所以大家對(duì)下面的這段代碼如何執(zhí)行爭(zhēng)論不休:

GlobalScope.launch {
    val a = async {
        1+2
    }

    val b = async {
        1+3
    }

    val c = a + b
    Log.e(TAG,"result:$c")
}
復(fù)制代碼

有人說梦谜,a 和 b 會(huì)串行執(zhí)行,有人說袭景,a 和 b 會(huì)并行執(zhí)行唁桩,那么執(zhí)行的結(jié)果到底是什么樣的?我們將在下面的文章給出耸棒。

一荒澡、結(jié)構(gòu)簡(jiǎn)要介紹

首先,我們得明確協(xié)程中有哪些東西与殃,如果你會(huì)使用協(xié)程单山,那你肯定知道協(xié)程中有 CoroutineScopeCoroutineContextCoroutineDispatcher幅疼,這些都是使用過程中我們可以接觸到的 API米奸。

我簡(jiǎn)單的整理了協(xié)程中主要的基礎(chǔ)類:

類結(jié)構(gòu)

協(xié)程的類結(jié)構(gòu)可分為三部分:CoroutineScopeCoroutineContextContinuation爽篷。

1. Continuation

如果你會(huì)使用協(xié)程悴晰,那你肯定知道,協(xié)程遇到耗時(shí) suspend 操作可以掛起逐工,等到任務(wù)結(jié)束的時(shí)候铡溪,協(xié)程會(huì)自動(dòng)切回來。

它的奧秘就是 Continuation泪喊,Continuation 可以理解程續(xù)體棕硫,你可以理解其每次在協(xié)程掛起點(diǎn)將剩余的代碼包括起來,等到結(jié)束以后執(zhí)行剩余的內(nèi)容窘俺。一個(gè)協(xié)程的代碼塊可能會(huì)被切割成若干個(gè) Continuation饲帅,在每個(gè)需要掛起的地方都會(huì)分配一個(gè) Continuation

先拋出一些結(jié)論瘤泪,協(xié)程在做耗時(shí)操作的時(shí)候灶泵,如果執(zhí)行了耗時(shí) suspend 操作,會(huì)自動(dòng)掛起对途,但是這個(gè)耗時(shí)操作終究是要做的赦邻,只不過切換到其他線程去做了,做完以后協(xié)程就需要切回來实檀,但是切到哪兒呢惶洲?這便是 Continuation 需要解決的問題按声。

Continuation 的流程是這樣的:

Continuation流程

<figcaption></figcaption>

無論是使用 launch 還是 async 啟動(dòng)的協(xié)程,都會(huì)有一個(gè)結(jié)束的時(shí)候用來回調(diào)的 continuation恬吕。

2. CoroutineScope

關(guān)于 CoroutineScope 沒有特別多要說的签则,它持有了 CoroutineContext,主要對(duì)協(xié)程的生命周期進(jìn)行管理铐料。

3. CoroutineContext

一開始看 CoroutineContext 覺得特別暈渐裂,不明白為啥要這么設(shè)計(jì),看了 Bennyhuo 大佬的文章以后才稍微好轉(zhuǎn)钠惩。

從上面協(xié)程的類的機(jī)構(gòu)中可以看出柒凉,光看這個(gè) CoroutineContext 這個(gè)接口(源碼內(nèi)容我們下面講),會(huì)發(fā)現(xiàn)它有點(diǎn)像 List 集合篓跛,而繼承自 CoroutineContext 接口的 Element 接口則定義了其中的元素膝捞。

隨后,這個(gè) Element 接口被劃分成了兩種類愧沟,JobContinuationInterceptor

  • Job:從字面上來講蔬咬,它代表一個(gè)任務(wù),Thread 也是執(zhí)行任務(wù)央渣,所以我們可以理解它定義了協(xié)程的一些東西计盒,比如協(xié)程的狀態(tài),協(xié)程和子協(xié)程的管理方式等等芽丹。
  • ContinuationInterceptor:也從字面上來看北启,它是 Continuation 的攔截器,通過攔截 Continuation拔第,完成我們想要完成的工作咕村,比如說線程的切換。

二蚊俺、結(jié)構(gòu)源碼分析

上面我們從概念上介紹了協(xié)程的三大件懈涛,在這部分,我們從源碼分析泳猬。

1. Continuation

suspend 修飾的方法會(huì)在在編譯期間被編譯器做特殊處理批钠,這種處理被成為CPS(續(xù)體轉(zhuǎn)換風(fēng)格) 轉(zhuǎn)化suspend 方法會(huì)被包裹成 Continuation得封。

說了這么久的 Continuation埋心,我們還沒有見過接口代碼,由于接口內(nèi)容不多忙上,我就把所有的內(nèi)容貼出來了:

/**
 * Interface representing a continuation after a suspension point that returns a value of type `T`.
 */
@SinceKotlin("1.3")
public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}
復(fù)制代碼

我們重點(diǎn)關(guān)注Continuation#resumeWith()方法拷呆,從注釋來看,通過返回 suspend 掛起點(diǎn)的值來恢復(fù)協(xié)程的執(zhí)行,協(xié)程可以從參數(shù) Result<T>) 獲取成功的值或者失敗的結(jié)果茬斧,如果沒有結(jié)果腰懂,那么 Result<T> 的泛型是 UnitResulut 這個(gè)類也特別簡(jiǎn)單项秉,感興趣的同學(xué)可以查看源碼绣溜。

BaseContinuationImpl 實(shí)現(xiàn)了 Continuation 接口,我們看一下 Continuation#resumeWith 方法的實(shí)現(xiàn):

internal abstract class BaseContinuationImpl(
    // 完成后調(diào)用的 Continuation
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {

    public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        // 1\. 執(zhí)行 suspend 中的代碼塊
                        val outcome = invokeSuspend(param)
                        // 2\. 如果代碼掛起就提前返回
                        if (outcome === COROUTINE_SUSPENDED) return
                        // 3\. 返回結(jié)果
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        // 3\. 返回失敗結(jié)果
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // 4\. 如果 completion 中還有子 completion伙狐,遞歸
                    current = completion
                    param = outcome
                } else {
                    // 5\. 結(jié)果通知
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
}
復(fù)制代碼

主要的過程我在注釋中已經(jīng)標(biāo)注出來了涮毫,我來解釋一下 Continuation 的機(jī)制。

每個(gè) suspend 方法生成的 BaseContinuationImpl 贷屎,其構(gòu)造方法有一個(gè)參數(shù)叫 completion ,它也是一個(gè) Continuation艘虎,它的調(diào)用時(shí)機(jī)是在 suspen 方法執(zhí)行完畢的時(shí)候唉侄。我們后面稱

Job流程

這個(gè)流程展示給我們的內(nèi)容很直觀了,簡(jiǎn)單起見野建,我們直接看3属划、4和5這一個(gè) launch 啟動(dòng)流程就好,通常一個(gè) launch 生成一個(gè)外層 Continuation一個(gè)相應(yīng)的結(jié)果 Continuation候生,我們后面稱結(jié)果 continuationcomplete同眯,Continuation 調(diào)用順序是:

  1. 調(diào)用外層 Continuation 中的 Continuation#resumeWith() 方法。
  2. 該方法會(huì)去執(zhí)行 launch 包裹的代碼塊唯鸭,并返回一個(gè)結(jié)果须蜗。
  3. 將上述代碼塊執(zhí)行的結(jié)果交給 completion,由它完成協(xié)程結(jié)束的通知目溉。

上述的過程只存在于一個(gè) launch 并且里面沒有執(zhí)行其他耗時(shí)的掛起操作明肮,對(duì)于這些情況,我們將會(huì)在下面的文章討論缭付。

拋出問題一: 可以看到柿估,在注釋2,遇到耗時(shí)的 suspend陷猫,返回的結(jié)果是一個(gè) COROUTINE_SUSPENDED秫舌,后面會(huì)直接返回,耗時(shí)操作結(jié)束的時(shí)候绣檬,我們的 completion 怎么恢復(fù)呢足陨?

2. CoroutineContext 和 Element

在概要分析的時(shí)候,我們說 CoroutineContext 的結(jié)構(gòu)像一個(gè)集合河咽,是從它的接口得出結(jié)論的:

public interface CoroutineContext {
    // get 方法钠右,通過 key 獲取
    public operator fun <E : Element> get(key: Key<E>): E?
    // 累加操作
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R
    // 操作符 + , 實(shí)際的實(shí)現(xiàn)調(diào)用了 fold 方法
    public operator fun plus(context: CoroutineContext): CoroutineContext
    // 移除操作
    public fun minusKey(key: Key<*>): CoroutineContext

    // CoroutineContext 定義的 Key
    public interface Key<E : Element>

    // CoroutineContext 中元素的定義
    public interface Element : CoroutineContext {
        // key
        public val key: Key<*>
        //...
    }
}
復(fù)制代碼

從中我們可以大致看出,CoroutineContext 中可以通過 Key 來獲取元素 Element忘蟹,并且 Element 接口也是繼承自 CoroutineContext 接口飒房。

除此以外搁凸,CoroutineContext 支持增加和移除操作,并且支持 + 操作符來完成增加狠毯。+ 操作符即 plus 方法是有具體實(shí)現(xiàn)的护糖,感興趣的可以自己看一下,主要涉及到了攔截器 ContinuationInterceptor 的添加嚼松。

1.1 Job

Job 的注釋中闡述定義是這樣的:

A background job. Conceptually, a job is a cancellable thing with a life-cycle that culminates in its completion.

從中我們可以得出:

  1. 后臺(tái)任務(wù)
  2. 可取消
  3. 生命周期在完成它的時(shí)候結(jié)束

從后臺(tái)任務(wù)的角度來看嫡良,Job 聽著有點(diǎn)像 Thread,和 Thread 一樣献酗,Job 也有各種狀態(tài)寝受,文檔中對(duì) Job 各種狀態(tài)的注釋(感覺大佬們的注釋寫的真棒~):

注釋

Job 另一個(gè)值得關(guān)注的點(diǎn)是對(duì)子 Job 的管理,主要的規(guī)則如下:

  1. Job都會(huì)結(jié)束的時(shí)候罕偎,父 Job 才會(huì)結(jié)束
  2. Job 取消的時(shí)候很澄,子 Job 也會(huì)取消

上述的一些內(nèi)容都可以從 Job 的接口文檔中得出。那么颜及,Job哪里來的甩苛?如果你看一下CoroutineScope#launch方法,你就會(huì)得出結(jié)論俏站,該方法的返回類型就是 Job讯蒲,我們每次調(diào)用該方法,都會(huì)創(chuàng)建一個(gè) Job肄扎。

1.2 ContinuationInterceptor

顧名思義墨林,Continuation 攔截器,先看接口:

interface ContinuationInterceptor : CoroutineContext.Element {
    // ContinuationInterceptor 在 CoroutineContext 中的 Key
    companion object Key : CoroutineContext.Key<ContinuationInterceptor>
    /**
     * 攔截 continuation
     */
    fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>

    //...
}
復(fù)制代碼

這個(gè)接口可以提煉的就這兩個(gè)信息:

  1. 攔截器的 Key反浓,也就是說萌丈,無論你后面一個(gè) CoroutineContext 放了多少個(gè)攔截器,KeyContinuationInterceptor 的攔截器只能有一個(gè)雷则。
  2. 我們都知道辆雾,Continuation 在調(diào)用其 Continuation#resumeWith() 方法,會(huì)執(zhí)行其 suspend 修飾的函數(shù)的代碼塊月劈,如果我們提前攔截到度迂,是不是可以做點(diǎn)其他事情,比如說切換線程猜揪,這也是 ContinuationInterceptor 的作用之一惭墓。

需要說明一下,我們通過 Dispatchers 來指定協(xié)程發(fā)生的線程而姐,Dispatchers 實(shí)現(xiàn)了 ContinuationInterceptor接口腊凶。

3. CoroutineScope

CoroutineScope 的接口很簡(jiǎn)單:

public interface CoroutineScope {
    public val coroutineContext: CoroutineContext
}
復(fù)制代碼

它要求后續(xù)的實(shí)現(xiàn)都要提供 CoroutineContext,不過我們都知道,CoroutineContext 是協(xié)程中很重要的東西钧萍,既包括 Job褐缠,也包括調(diào)度器。

在上面的代碼中风瘦,我多次使用了 Android Jetpack 中的 Lifecycle 中協(xié)程的擴(kuò)展庫队魏,好處我們獲取 CoroutineScope 更加簡(jiǎn)單,無需在組件 onDestroy 的時(shí)候手動(dòng) cancel万搔,并且它的源碼超級(jí)簡(jiǎn)單胡桨,前提是你會(huì)使用 Lifecycle

internal class LifecycleCoroutineScopeImpl(
    override val lifecycle: Lifecycle,
    override val coroutineContext: CoroutineContext
) : LifecycleCoroutineScope(), LifecycleEventObserver {
    // ...

    override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) {
        if (lifecycle.currentState <= Lifecycle.State.DESTROYED) {
            lifecycle.removeObserver(this)
            coroutineContext.cancel()
        }
    }
}
復(fù)制代碼

并且它也支持你在指定的生命周期調(diào)用協(xié)程,大家看一下接口就明白了瞬雹。

三昧谊、過程源碼分析

先上一段使用代碼:

lifecycleScope.launch(Dispatchers.Main) {
    val a = async { getResult(1, 2) }
    val b = async { getResult(3, 5) }

    val c = a.await() + b.await()
    Log.e(TAG, "result:$c")
} 

suspend fun getResult(a: Int, b: Int): Int {
    return withContext(Dispatchers.IO) {
        delay(1000)
        return@withContext a + b
    }
}
復(fù)制代碼

雖然代碼很簡(jiǎn)單,但是源碼還是比較復(fù)雜的挖炬,我們分步講揽浙。

第一步 獲取 CoroutineScope

我已經(jīng)在上面說明了,我們使用的 Lifecycle 的協(xié)程拓展庫意敛,如果我們不使用拓展庫,就得使用 MainScope膛虫,它們的 CoroutineContext 都是一樣的:

public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)

// LifecycleCoroutineScope
val Lifecycle.coroutineScope: LifecycleCoroutineScope
    get() {
        while (true) {
            // ...
            val newScope = LifecycleCoroutineScopeImpl(
                this,
                SupervisorJob() + Dispatchers.Main.immediate
            )
            // ...
            return newScope
        }
    }
復(fù)制代碼

顯而易見草姻,MainScopeLifecycleCoroutineScope 都使用了 SupervisorJob() + Dispatchers.Main, 作為它們的 CoroutineContext稍刀。

說明一下撩独,SupervisorJobDispatchers.Main 很重要,它們分別代表了CoroutineContext 之前提及的 JobContinuationInterceptor账月,后面用到的時(shí)候再分析综膀。

第二步 啟動(dòng)協(xié)程

直接進(jìn)入 CoroutineScope#launch() 方法:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}
復(fù)制代碼

上面的方法一共有三個(gè)參數(shù),前兩個(gè)不作過多介紹局齿,第三個(gè)參數(shù):

block: suspend CoroutineScope.() -> Unit)
復(fù)制代碼

這是一個(gè)方法剧劝,是一個(gè) lambda 參數(shù),同時(shí)也表明了它需要被 suspend 修飾抓歼。 繼續(xù)看 launch 方法讥此,發(fā)現(xiàn)它主要做了兩件事:

  1. 組合新的 CoroutineContext
  2. 再創(chuàng)建一個(gè) Continuation

組合新的CoroutineContext

在第一行代碼 val newContext = newCoroutineContext(context) 做了第一件事,這里的 newCoroutineContext(context) 是一個(gè)擴(kuò)展方法:

public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    val combined = coroutineContext + context
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        debug + Dispatchers.Default else debug
}
復(fù)制代碼

CoroutineScope 使用本身的 coroutineContext 集合谣妻,利用 + 操作符將我們?cè)?launch 方法中提供的 coroutineContext 添加進(jìn)來萄喳。

再創(chuàng)建一個(gè)Continuation

回到上一段代碼,通常我們不會(huì)指定 start 參數(shù)蹋半,所以它會(huì)使用默認(rèn)的 CoroutineStart.DEFAULT他巨,最終 coroutine 會(huì)得到一個(gè) StandaloneCoroutine

StandaloneCoroutine 實(shí)現(xiàn)自 AbstractCoroutine,翻開上面的類圖染突,你會(huì)發(fā)現(xiàn)捻爷,它實(shí)現(xiàn)了 ContinuationJobCoroutineScope 等一堆接口觉痛。需要說明一下役衡,這個(gè) StandaloneCoroutine 其實(shí)是我們當(dāng)前 Suspend Continationcomplete

接著會(huì)調(diào)用

coroutine.start(start, coroutine, block)
復(fù)制代碼

這就表明協(xié)程開始啟動(dòng)了薪棒。

第三步 start

進(jìn)入到 AbstractCoroutine#start 方法:

public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
    initParentJob()
    start(block, receiver, this)
}
復(fù)制代碼

跳過層層嵌套手蝎,最后到達(dá)了:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    runSafely(completion) {
        // 外面再包一層 Coroutine
        createCoroutineUnintercepted(receiver, completion)
            // 如果需要,做攔截處理
            .intercepted()
            // 調(diào)用 resumeWith 方法      
            .resumeCancellableWith(Result.success(Unit))
    }
復(fù)制代碼

雖然這僅僅是一個(gè)函數(shù)俐芯,但是后面主要的邏輯都揭露了:

  1. 創(chuàng)建一個(gè)沒有攔截過的 Continuation棵介。
  2. 攔截 Continuation
  3. 執(zhí)行 Continuation#resumeWith 方法吧史。

第四步 又創(chuàng)建 Continuation

我這里用了 邮辽,因?yàn)槲覀冊(cè)?launch 中已經(jīng)創(chuàng)建了一個(gè) AbstractContinuaion,不過它是一個(gè) complete贸营,從各個(gè)函數(shù)的行參就可以看出來吨述。

不過我們 suspend 修飾的外層 Continuation 還沒有創(chuàng)建,它來了钞脂,是 SuspendLambda揣云,它繼承自 ContinuationImpl,如果你問我為什么源碼中沒找到具體實(shí)現(xiàn)冰啃,我覺得可能跟 suspend 修飾符有關(guān)邓夕,由編譯器處理,但是調(diào)用棧確實(shí)是這樣的:

調(diào)用棧

看一下 SuspendLambda 類的實(shí)現(xiàn):

internal abstract class SuspendLambda(
    public override val arity: Int,
    completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
    constructor(arity: Int) : this(arity, null)
    //...
}
復(fù)制代碼

可以看到阎毅,它的構(gòu)造方法的形參就包括一個(gè) complete焚刚。

第五步 攔截處理

回到:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    runSafely(completion) {
        // 外面再包一層 Coroutine
        createCoroutineUnintercepted(receiver, completion)
            // 如果需要,做攔截處理
            .intercepted()
            // 調(diào)用 resumeWith 方法      
            .resumeCancellableWith(Result.success(Unit))
    }
復(fù)制代碼

里面的攔截方法 Continuation#intercepted() 方法是一個(gè)擴(kuò)展方法:

@SinceKotlin("1.3")
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this
復(fù)制代碼

createCoroutineUnintercepted(receiver, completion) 返回的是一個(gè) SuspendLambda扇调,所以它肯定是一個(gè) ContinuationImpl矿咕,看一下它的攔截方法的實(shí)現(xiàn):

internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)

    public override val context: CoroutineContext
        get() = _context!!

    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }
    // ...
}
復(fù)制代碼

ContinuationImpl#intercepted()方法中,直接利用 context 這個(gè)數(shù)據(jù)結(jié)構(gòu)通過 context[ContinuationInterceptor] 獲取攔截器肃拜。

CoroutineDispatcher攔截實(shí)現(xiàn)

我們都知道 ContinuationInterceptor 具有攔截作用痴腌,它的直接實(shí)現(xiàn)是 CoroutineDispatcher 這個(gè)抽象類,所有其他調(diào)度器都直接或者間接繼承這個(gè)類燃领,我們關(guān)注一下它的攔截方法:

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    //...
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)
    // 1.攔截的 Continuation 被包了一層 DispatchedContinuation
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)
    //...
}

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
    // ...
    override fun resumeWith(result: Result<T>) {
        // ...
        if (dispatcher.isDispatchNeeded(context)) {
            // 2\. 后面一個(gè)參數(shù)需要提供 Runnable士聪,父類已經(jīng)實(shí)現(xiàn)
            dispatcher.dispatch(context, this)
        } 
        //...
    }
    // ...
}

// SchedulerTask 是一個(gè) Runnable
internal abstract class DispatchedTask<in T>(
    @JvmField public var resumeMode: Int
) : SchedulerTask() {
    // ...
    public final override fun run() {
        // ...
        try {
            //...
            withCoroutineContext(context, delegate.countOrElement) {
                // 3\. continuation 是 DispatchedContinuation 包裹的 continuation
                continuation.resume(...)
            }
        }
        //...
    }
}
復(fù)制代碼

簡(jiǎn)單來說,就是對(duì)原有的 ContinuationresumeWith 操作加了一層攔截猛蔽,就像這樣:

攔截流程

<figcaption></figcaption>

加入 CoroutineDispatcher 以后剥悟,執(zhí)行真正的 Continue#resumeWith() 之前灵寺,會(huì)執(zhí)行 CoroutineDispatcher#dispatch() 方法,所以我們現(xiàn)在關(guān)注 CoroutineDispatcher#dispatch 具體實(shí)現(xiàn)即可区岗。

講一個(gè)CoroutineDispatcher具體實(shí)現(xiàn)

首先我們得明確這個(gè) CoroutineDispatcher 來自哪里略板?它從 context 獲取,context來自哪里慈缔?

注意 SuspendLambdaContinuationImpl 的構(gòu)造方法叮称,SuspendLambda 中的參數(shù)沒有 CoroutineContext,所以只能來自 completion 中的 CoroutineContext藐鹤,而completionCoroutineContext 來自 launch 方法中來自 CoroutineScope瓤檐,默認(rèn)是 SupervisorJob() + Dispatchers.Main,不過只有 Dispatchers.Main 繼承了 CoroutineDispatcher娱节。

Dispatchers.Main 是一個(gè) MainCoroutineDispatcher挠蛉,Android 中對(duì)應(yīng)的 MainCoroutineDispatcherHandlerContext

internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
    public constructor(
        handler: Handler,
        name: String? = null
    ) : this(handler, name, false)

    //...

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // 利用主線程的 Handler 執(zhí)行任務(wù)
        handler.post(block)
    }

    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        // 利用主線程的 Handler 延遲執(zhí)行任務(wù),將完成的 continuation 放在任務(wù)中執(zhí)行
        val block = Runnable {
            with(continuation) { resumeUndispatched(Unit) }
        }
        handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
        continuation.invokeOnCancellation { handler.removeCallbacks(block) }
    }

    //..
}
復(fù)制代碼

重點(diǎn)來了肄满,調(diào)度任務(wù)最后竟然交給了主線程的 Handler谴古,其實(shí)想想也對(duì),主線程的任務(wù)最后一般都會(huì)交給主線程的 Handler稠歉。

好奇的同學(xué)可能問了掰担,如果不是主線程呢?不是主線程就利用的線程池:

public open class ExperimentalCoroutineDispatcher(
    private val corePoolSize: Int,
    private val maxPoolSize: Int,
    private val idleWorkerKeepAliveNs: Long,
    private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
    // 執(zhí)行期
    override val executor: Executor
        get() = coroutineScheduler
    private var coroutineScheduler = createScheduler()

    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        try {
            coroutineScheduler.dispatch(block)
        } catch (e: RejectedExecutionException) {
            DefaultExecutor.dispatch(context, block)
        }
}
復(fù)制代碼

結(jié)果可以說是很清晰了怒炸,coroutineScheduler 是一個(gè)線程池恩敌,如果像了解具體的過程,同學(xué)們可以自行查看代碼横媚。

讀到這里,你可能有一點(diǎn)明白 CoroutineContext 為什么要設(shè)計(jì)成一種數(shù)據(jù)結(jié)構(gòu):

  1. coroutineContext[ContinuationInterceptor] 就可以直接取到當(dāng)前協(xié)程的攔截器月趟,并且一個(gè)協(xié)程只能對(duì)應(yīng)一個(gè)調(diào)度器灯蝴。
  2. 調(diào)度器都放在其他 coroutineContext 的前面,所以在執(zhí)行協(xié)程的時(shí)候孝宗,可以做攔截處理穷躁。

同理,我們也可以使用 coroutineContext[Job] 獲取當(dāng)前協(xié)程因妇。

第六步 resumeWith

再次回到:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    runSafely(completion) {
        // 外面再包一層 Coroutine
        createCoroutineUnintercepted(receiver, completion)
            // 如果需要问潭,做攔截處理
            .intercepted()
            // 調(diào)用 resumeWith 方法      
            .resumeCancellableWith(Result.success(Unit))
    }
復(fù)制代碼

現(xiàn)在我們看 Continue#resumeCancellableWith() 方法,它是一個(gè)擴(kuò)展方法婚被,里面的調(diào)度邏輯是:

  1. DispatchContinuation#resumeCancellableWith
  2. CoroutineDispatcher#dispatch
  3. Continuation#resumeWith

這里的 Continuation 就是 SuspendLambda狡忙,它繼承了 BaseContinuationImpl,我們看一下它的實(shí)現(xiàn)方法:

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    // This implementation is final. This fact is used to unroll resumeWith recursion.
    public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
            // can precisely track what part of suspended callstack was already resumed
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        // 1\. 執(zhí)行 suspend 里面的代碼塊
                        val outcome = invokeSuspend(param)
                        // 2\. 如果代碼塊里面執(zhí)行了掛起方法址芯,會(huì)提前返回
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // 3\. 如果完成的completion也是BaseContinuationImpl灾茁,就會(huì)進(jìn)入循環(huán)
                    current = completion
                    param = outcome
                } else {
                    // 4\. 執(zhí)行 completion resumeWith 方法 
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
}
復(fù)制代碼

這邊被我分為2個(gè)部分:

  • 執(zhí)行 suspend 方法窜觉,并獲取結(jié)果
  • 調(diào)用 complete(放在下一步講)

執(zhí)行suspend方法

在第一處會(huì)先執(zhí)行 suspend 修飾的方法內(nèi)容,在方法里面可能又會(huì)調(diào)度 suspend 方法北专,比如說我們的實(shí)例方法:

lifecycleScope.launch(Dispatchers.Main) {
    val a = async { getResult(1, 2) }
    val b = async { getResult(3, 5) }

    val c = a.await() + b.await()
    Log.e(TAG, "result:$c")
} 

suspend fun getResult(a: Int, b: Int): Int {
    return withContext(Dispatchers.IO) {
        delay(1000)
        return@withContext a + b
    }
}
復(fù)制代碼

因?yàn)槲覀冊(cè)?getResult 執(zhí)行了延時(shí)操作禀挫,所以我們 launch 方法肯定執(zhí)行了耗時(shí)掛起方法,所以 BaseContinuationImpl#invokeSuspend 方法會(huì)返回一個(gè) COROUTINE_SUSPENDED 拓颓,結(jié)果你也看到了语婴,該方法會(huì)提前結(jié)束。(說明一下驶睦,我沒有找到BaseContinuationImpl#invokeSuspend 方法的具體實(shí)現(xiàn)砰左,我猜可能跟編譯器有關(guān))

我猜你肯定跟我一樣好奇,遇到耗時(shí)掛起會(huì)提前返回啥繁,那么耗時(shí)掛起如何對(duì) complete 進(jìn)行恢復(fù)的菜职?

我們看一下 delay(1000) 這個(gè)延時(shí)操作在主線程是如何處理的:

public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
    }
}

internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
    //...

    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val block = Runnable {
            with(continuation) { resumeUndispatched(Unit) }
        }
        handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
        continuation.invokeOnCancellation { handler.removeCallbacks(block) }
    }

    //...
}
復(fù)制代碼

可以看到,將恢復(fù)任務(wù)包了一個(gè) Runnable旗闽,交給 HandlerHandler#postDelayed() 方法了酬核。

第七步 complete resumeWith

對(duì)于 complete 的處理一般會(huì)有兩種。

complete是BaseContinuationImpl

第一種情況是我們稱之為套娃适室,完成回調(diào)的 Continuation 它本身也有自己的完成回調(diào) Continuation嫡意,接下來循環(huán)就對(duì)了。

調(diào)用complete的resumeWith

第二種情況捣辆,就是通過 complete 去完成回調(diào)蔬螟,由于 completeAbstractContinuation,我們看一下它的 resumeWith

public abstract class AbstractCoroutine<in T>(
    /**
     * The context of the parent coroutine.
     */
    @JvmField
    protected val parentContext: CoroutineContext,
    active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
    // ...
    public final override fun resumeWith(result: Result<T>) {
        // 1\. 獲取當(dāng)前協(xié)程的技術(shù)狀態(tài)
        val state = makeCompletingOnce(result.toState())
        // 2\. 如果當(dāng)前還在等待完成汽畴,說明還有子協(xié)程沒有結(jié)束
        if (state === COMPLETING_WAITING_CHILDREN) return
        // 3\. 執(zhí)行結(jié)束恢復(fù)的方法旧巾,默認(rèn)為空
        afterResume(state)
    }

    // 這是父類 JobSupport 中的 makeCompletingOnce 方法
    // 為了方便查看,我復(fù)制過來
    internal fun makeCompletingOnce(proposedUpdate: Any?): Any? {
        loopOnState { state ->
            // tryMakeCompleting 的內(nèi)容主要根據(jù)是否有子Job做不同處理
            val finalState = tryMakeCompleting(state, proposedUpdate)
            when {
                finalState === COMPLETING_ALREADY ->
                    throw IllegalStateException(
                        "Job $this is already complete or completing, " +
                                "but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull
                    )
                finalState === COMPLETING_RETRY -> return@loopOnState
                else -> return finalState // COMPLETING_WAITING_CHILDREN or final state
            }
        }
    }
}
復(fù)制代碼

這段代碼的意思其實(shí)也很簡(jiǎn)單忍些,就是協(xié)程即將完成鲁猩,得先評(píng)估一下協(xié)程的技術(shù)狀態(tài),別協(xié)程還有東西在運(yùn)行罢坝,就給結(jié)束了廓握。對(duì)于一些有子協(xié)程的一些協(xié)程,會(huì)等待子協(xié)程結(jié)束的時(shí)候嘁酿,才會(huì)結(jié)束當(dāng)前協(xié)程隙券。

一個(gè) launch 的過程大概就是這樣了。大致的流程圖是這樣的:

launch

下面我們?cè)僬務(wù)?async闹司。

四娱仔、關(guān)于async

asynclaunch 的代碼相似度很高:

public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyDeferredCoroutine(newContext, block) else
        DeferredCoroutine<T>(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}
復(fù)制代碼

最終也會(huì)進(jìn)行三步走:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    runSafely(completion) {
        // 外面再包一層 Coroutine
        createCoroutineUnintercepted(receiver, completion)
            // 如果需要,做攔截處理
            .intercepted()
            // 調(diào)用 resumeWith 方法      
            .resumeCancellableWith(Result.success(Unit))
    }
復(fù)制代碼

不同的是开仰,async 返回的是一個(gè) Deferred<T>拟枚,我們需要調(diào)用 Deferred#await() 去獲取返回結(jié)果薪铜,它的實(shí)現(xiàn)在 JobSupport

private open class DeferredCoroutine<T>(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> {
    // ... awaitInternal方法來自父類 JobSupport
    override suspend fun await(): T = awaitInternal() as T
    // ...

    // 這是 JobSupport 中的實(shí)現(xiàn)
    internal suspend fun awaitInternal(): Any? {
        // 循環(huán)獲取結(jié)果
        while (true) { // lock-free loop on state
            val state = this.state
            // 1\. 如果處于完成狀態(tài)
            if (state !is Incomplete) {
                if (state is CompletedExceptionally) { // Slow path to recover stacktrace
                    recoverAndThrow(state.cause)
                }
                return state.unboxState()
            }
            // 2\. 除非需要重試,不然就 break
            if (startInternal(state) >= 0) break 
        }
        // 等待掛起的方法
        return awaitSuspend() // slow-path
    }
}
復(fù)制代碼

它的具體過程可以從我的注釋看出恩溅,就不一一介紹了隔箍,感興趣的同學(xué)可以查看源碼。

1. 本文一開始的討論

本文一開始的代碼是錯(cuò)的脚乡,連編譯器都過不了蜒滩,尷尬~

正確的代碼應(yīng)該是:

GlobalScope.launch {
    val a = async {
        1+2
    }

    val b = async {
        1+3
    }

    val c = a.await() + bawait()
    Log.e(TAG,"result:$c")
}
復(fù)制代碼

如果是正確的代碼,這里可能分兩種情況:

如果你放在UI線程奶稠,那肯定是串行的俯艰,這時(shí)候有人說,我在 a 里使用 delay(1000)锌订,在 b 里使用 delay(2000)竹握,得到 c 的時(shí)候就花了 2000 毫秒啊,這不是并行嗎辆飘?事情并不是這樣的啦辐,delay 操作使用了 Handler#postDelay 方法,一個(gè)延遲了 1000 毫秒執(zhí)行蜈项,一個(gè)延遲了 2000 毫秒執(zhí)行芹关,但是主線程只有一個(gè),所以只能是串行紧卒。

如果是子線程侥衬,通常都是并行的,因?yàn)槲覀兪褂昧司€程池啊~

總結(jié)

寫這邊源碼分析的時(shí)候跑芳,一些細(xì)節(jié)總是找不到轴总,比如說 suspendLambda 的子類找不到,自己對(duì) Kotlin 的學(xué)習(xí)有待深入博个。

所以本文有些地方還值得商榷肘习,如果你有更好的理解,歡迎下方交流坡倔。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市脖含,隨后出現(xiàn)的幾起案子罪塔,更是在濱河造成了極大的恐慌,老刑警劉巖养葵,帶你破解...
    沈念sama閱讀 218,755評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件征堪,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡关拒,警方通過查閱死者的電腦和手機(jī)佃蚜,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門庸娱,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人谐算,你說我怎么就攤上這事熟尉。” “怎么了洲脂?”我有些...
    開封第一講書人閱讀 165,138評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵斤儿,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我恐锦,道長(zhǎng)往果,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,791評(píng)論 1 295
  • 正文 為了忘掉前任一铅,我火速辦了婚禮陕贮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘潘飘。我一直安慰自己肮之,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評(píng)論 6 392
  • 文/花漫 我一把揭開白布福也。 她就那樣靜靜地躺著局骤,像睡著了一般。 火紅的嫁衣襯著肌膚如雪暴凑。 梳的紋絲不亂的頭發(fā)上峦甩,一...
    開封第一講書人閱讀 51,631評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音现喳,去河邊找鬼凯傲。 笑死,一個(gè)胖子當(dāng)著我的面吹牛嗦篱,可吹牛的內(nèi)容都是我干的冰单。 我是一名探鬼主播,決...
    沈念sama閱讀 40,362評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼灸促,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼诫欠!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起浴栽,我...
    開封第一講書人閱讀 39,264評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤荒叼,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后典鸡,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體被廓,經(jīng)...
    沈念sama閱讀 45,724評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年萝玷,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,040評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡端圈,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出仓蛆,到底是詐尸還是另有隱情,我是刑警寧澤法精,帶...
    沈念sama閱讀 35,742評(píng)論 5 346
  • 正文 年R本政府宣布多律,位于F島的核電站,受9級(jí)特大地震影響搂蜓,放射性物質(zhì)發(fā)生泄漏狼荞。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評(píng)論 3 330
  • 文/蒙蒙 一帮碰、第九天 我趴在偏房一處隱蔽的房頂上張望相味。 院中可真熱鬧,春花似錦殉挽、人聲如沸丰涉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽一死。三九已至,卻和暖如春傻唾,著一層夾襖步出監(jiān)牢的瞬間投慈,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評(píng)論 1 270
  • 我被黑心中介騙來泰國(guó)打工冠骄, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留伪煤,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,247評(píng)論 3 371
  • 正文 我出身青樓凛辣,卻偏偏與公主長(zhǎng)得像抱既,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子扁誓,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評(píng)論 2 355