庖丁解牛座慰,一文搞懂Kotlin協(xié)程的掛起和恢復

通過上篇文章大致理解了協(xié)程框架是怎么運行的奴紧,知道了作用域CoroutinScope,上下文CoroutinContext厅篓,續(xù)體Continuation酣栈,攔截器CorotineDispatcher惰说,協(xié)程線程池CoroutineScheduler這些構成協(xié)程運行的基本要素舆乔,通過源碼跟蹤知道協(xié)程就是通過續(xù)體的不斷交互和攔截器線程池的調度達到切換線程運行的目的岳服,但這顯然并不是協(xié)程的精髓,協(xié)程真正的精髓在于讓我們可以寫同步代碼一樣實現(xiàn)異步操作希俩,而不需要用傳統(tǒng)的回調接口派阱。

沒錯,就是它賴以成名的掛起和恢復機制斜纪,這個機制才是協(xié)程的精華和實用性的代表。

協(xié)程的掛起和恢復

所謂協(xié)程的掛起和恢復文兑,我認為可以這樣描述:協(xié)程的掛起就是讓運行在A線程的協(xié)程里的掛起函數(shù)(suspend修飾)可以切換到B線程去執(zhí)行一段時間(通常是耗時操作)盒刚,此時外面的協(xié)程處于掛起狀態(tài)(線程A沒有阻塞等待,而是該干嘛干嘛)绿贞,等協(xié)程掛起函數(shù)執(zhí)行完后(通常還帶有返回結果)恢復(實際上是主動通知協(xié)程該恢復了)到協(xié)程里繼續(xù)執(zhí)行因块,此時的代碼編寫形式完全不需要回調接口的參與,都是同步寫在協(xié)程體里的籍铁。
這種形式的寫法避免了以前回調地獄的問題涡上,但是本質上還是線程切換,只是寫法上更人性化更優(yōu)雅拒名。
很多人說這樣更具性能吩愧,在協(xié)程數(shù)量不多的情況下,其實不見得更具性能增显。協(xié)程可以看做是一個基于線程的框架實現(xiàn)雁佳,在我看來,協(xié)程的最大亮點就是共享了線程池和掛起恢復機制同云。
舉個例子吧:

GlobalScope.launch(Dispatchers.Main){
            println("launch start in Thread:${Thread.currentThread().name}")
            // 掛起點糖权,主要是withContext函數(shù)實現(xiàn)了掛起和恢復
            val withResult = withContext(Dispatchers.IO){
                println("withContext run in Thread:${Thread.currentThread().name}")
                "withContext success"
            }
            // 恢復點
            println("launch end  in Thread:${Thread.currentThread().name}, withResult = $withResult")
        }

執(zhí)行結果:
2023-08-31 18:00:34.193 4994-4994/com.example.coroutinexe I/System.out: launch start in Thread:main
2023-08-31 18:00:34.196 4994-6990/com.example.coroutinexe I/System.out: withContext run in Thread:DefaultDispatcher-worker-1
2023-08-31 18:00:34.199 4994-4994/com.example.coroutinexe I/System.out: launch end  in Thread:main, withResult = withContext success

以上例子很好地詮釋了協(xié)程的掛起和恢復機制,沒有回調接口炸站,都是以串行的順序執(zhí)行代碼星澳,非常容易閱讀。接下來就跟蹤一下withContext函數(shù)看看它是怎么實現(xiàn)協(xié)程的掛起和恢復的旱易。
繼續(xù)上篇分析到的BaseContinuationImpl的resumeWith(result: Result<Any?>)方法禁偎,那里是真正執(zhí)行invokeSuspend()也就是我們協(xié)程體的地方:

BaseContinuationImpl:
 public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
            // can precisely track what part of suspended callstack was already resumed
            probeCoroutineResumed(current)
            with(current) { // 循環(huán)腿堤,主要是循環(huán)執(zhí)行invokeSuspend,根據(jù)該方法里的lable狀態(tài)執(zhí)行不同的掛起函數(shù)
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        // 執(zhí)行協(xié)程體
                        val outcome = invokeSuspend(param)
                        // 重點關注這里如果返回值是COROUTINE_SUSPENDED届垫,則跳出循環(huán)释液,也就是不再執(zhí)行invokeSuspend方法,其實這就是掛起點了
                        // 那就看看什么時候invokeSuspend會返回COROUTINE_SUSPENDED呢装处,這就需要往invokeSuspend追溯
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

invokeSuspend方法:

           int label;
            /*
             * Unable to fully structure code
             * Enabled aggressive block sorting
             * Lifted jumps to return sites
             */
            @Nullable
            public final Object invokeSuspend(@NotNull Object var1_1) {
                var3_2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                switch (this.label) {
                    case 0: {
                        ResultKt.throwOnFailure((Object)var1_1);
                        System.out.println((Object)Intrinsics.stringPlus((String)"launch start in Thread:", (Object)Thread.currentThread().getName()));
                        this.label = 1;
                        // 重點看這里误债,這個時候的withContext方法返回的不是我們定義里面的block里定義的"withContext success",注意別搞混妄迁,這就要去看看withContext的實現(xiàn)
                        v0 = BuildersKt.withContext((CoroutineContext)((CoroutineContext)Dispatchers.getIO()), (Function2)((Function2)new Function2<CoroutineScope, Continuation<? super String>, Object>(null){
                            int label;

                            @Nullable
                            public final Object invokeSuspend(@NotNull Object object) {
                                IntrinsicsKt.getCOROUTINE_SUSPENDED();
                                switch (this.label) {
                                    case 0: {
                                        ResultKt.throwOnFailure((Object)object);
                                        System.out.println((Object)Intrinsics.stringPlus((String)"withContext run in Thread:", (Object)Thread.currentThread().getName()));
                                        return "withContext success";
                                    }
                                }
                                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                            }

                            @NotNull
                            public final Continuation<Unit> create(@Nullable Object value, @NotNull Continuation<?> $completion) {
                                return (Continuation)new /* invalid duplicate definition of identical inner class */;
                            }

                            @Nullable
                            public final Object invoke(@NotNull CoroutineScope p1, @Nullable Continuation<? super String> p2) {
                                return (this.create((Object)p1, p2)).invokeSuspend((Object)Unit.INSTANCE);
                            }
                        }), (Continuation)((Continuation)this));
                        // 如果返回值等于COROUTINE_SUSPENDED寝蹈,就return COROUTINE_SUSPENDED,這不就巧了登淘,正是上面resumeWith分析到的情況
                        if (v0 == var3_2) {
                            return var3_2;
                        }
                        ** GOTO lbl14
                    }
                    case 1: {
                        ResultKt.throwOnFailure((Object)$result);
                        v0 = $result;
lbl14:
                        // 2 sources

                        withResult = (String)v0;
                        System.out.println((Object)("launch end  in Thread:" + Thread.currentThread().getName() + ", withResult = " + withResult));
                        return Unit.INSTANCE;
                    }
                }
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }
協(xié)程的掛起
// context參數(shù)一般傳調度器Dispatchers,代碼塊會運行在指定的線程池
public suspend fun <T> withContext(  
     context: CoroutineContext,  
     block: suspend CoroutineScope.() -> T  
 ): T {  
     contract {  
         callsInPlace(block, InvocationKind.EXACTLY_ONCE)  
     }  
     // 返回啟動withContext的協(xié)程體  
     return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->  
         // 構建一個新的newContext箫老,合并當前協(xié)程體以及withContext協(xié)程體的CoroutineContext  
         val oldContext = uCont.context  
         val newContext = oldContext + context  
         // 檢查協(xié)程是否活躍,如果線程處于非活躍的狀態(tài)黔州,拋出cancle異常  
         newContext.checkCompletion()  
         ...  
         // DispatchedCoroutine也是一個AbstractCoroutine對象耍鬓,負責協(xié)程完成的回調,  
         // 注意這里的Continuation的傳參為uCont流妻,及發(fā)起withContext的協(xié)程對象  
         val coroutine = DispatchedCoroutine(newContext, uCont)  
         coroutine.initParentJob()  
                  // 和協(xié)程啟動的流程一樣牲蜀,啟動withContext的協(xié)程  
         // 注意這里的傳參coroutine為DispatchedCoroutine,它持有需要恢復的協(xié)程 绅这,這時候已經(jīng)開啟了block代碼的協(xié)程了涣达,可能是異步的
         block.startCoroutineCancellable(coroutine, coroutine)  
         // 但是此時會返回一個結果,就是告訴父類協(xié)程這里要掛起了证薇,你被掛起了度苔,按照上面的分析,這里應該是返回了 COROUTINE_SUSPENDED
         coroutine.getResult()  
     }  
 }  

看下來就是執(zhí)行了suspendCoroutineUninterceptedOrReturn函數(shù)浑度,這個函數(shù)看不到源碼寇窑,是屬于kotlin編譯器的內(nèi)建函數(shù),看不到也不用太糾結箩张,只要記住那個uCont參數(shù)是withContext外面的協(xié)程續(xù)體即可疗认,它用來恢復外面的協(xié)程。
DispatchedCoroutine:

// 定義的三種狀態(tài)  待定伏钠、掛起横漏、恢復
private const val UNDECIDED = 0 
private const val SUSPENDED = 1
private const val RESUMED = 2
// Used by withContext when context dispatcher changes
internal class DispatchedCoroutine<in T>(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
    // this is copy-and-paste of a decision state machine inside AbstractionContinuation
    // todo: we may some-how abstract it via inline class
    private val _decision = atomic(UNDECIDED)
 // 第一次調用trySuspend的時候肯定返回了true,看他的命名就很有意思
    private fun trySuspend(): Boolean {
        _decision.loop { decision ->
            when (decision) {
                UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
                RESUMED -> return false
                else -> error("Already suspended")
            }
        }
    }

    private fun tryResume(): Boolean {
        _decision.loop { decision ->
            when (decision) {
                UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
                SUSPENDED -> return false
                else -> error("Already resumed")
            }
        }
    }

    override fun afterCompletion(state: Any?) {
        // Call afterResume from afterCompletion and not vice-versa, because stack-size is more
        // important for afterResume implementation
        afterResume(state)
    }

    override fun afterResume(state: Any?) {
        if (tryResume()) return // completed before getResult invocation -- bail out
        // Resume in a cancellable way because we have to switch back to the original dispatcher
        uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
    }
    // 第一次調用getResult的時候肯定返回了COROUTINE_SUSPENDED
    fun getResult(): Any? {
        if (trySuspend()) return COROUTINE_SUSPENDED
        // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
        val state = this.state.unboxState()
        if (state is CompletedExceptionally) throw state.cause
        @Suppress("UNCHECKED_CAST")
        return state as T
    }
}

至此熟掂,掛起的邏輯算是走通了缎浇,現(xiàn)在來捋一下:
1.父協(xié)程體執(zhí)行的時候遇到了掛起函數(shù)withContext,withContext里開啟自身的協(xié)程執(zhí)行自身的協(xié)程體任務赴肚;
2.同時withContext返回COROUTINE_SUSPENDED
3.父協(xié)程體resumeWith方法里判斷到了返回值是COROUTINE_SUSPENDED素跺,于是結束了resumeWith的執(zhí)行二蓝,父協(xié)程體里面withContext后面的代碼暫停執(zhí)行,但其實沒有阻塞線程
4.父協(xié)程的協(xié)程體SuspendLambda保存了協(xié)程狀態(tài)指厌,記錄了當前掛起的地方刊愚,此時withContext的協(xié)程體在執(zhí)行,父協(xié)程處于掛起狀態(tài)踩验,等待被通知恢復

協(xié)程的恢復

父協(xié)程被掛起后鸥诽,需要怎么恢復呢,首先它肯定不是自己去恢復箕憾,它是等待掛起點去通知它恢復牡借,所以我們要到withContext的協(xié)程體里去找答案,因為只有它才知道自己什么時候執(zhí)行完袭异,然后通知父協(xié)程恢復執(zhí)行钠龙。
所以還是要找到BaseContinuationImpl的resumeWith()方法,那里在執(zhí)行withContext里的協(xié)程體block:

BaseContinuationImpl:
 public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
            // can precisely track what part of suspended callstack was already resumed
            probeCoroutineResumed(current)
            with(current) { 
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        // 執(zhí)行協(xié)程體御铃,注意此時執(zhí)行的是掛起函數(shù)的block碴里,正常會返回我們寫的"withContext success"字符串
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                // 注意這個completion是我們創(chuàng)建的DispatchedCoroutine
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    // 執(zhí)行完后走這里,執(zhí)行DispatchedCoroutine的resumeWith上真,這里其實就是協(xié)程掛起點的恢復標志了
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

DispatchedCoroutine:
    //  DispatchedCoroutine父類AbstractCoroutine的方法
    public final override fun resumeWith(result: Result<T>) {
        val state = makeCompletingOnce(result.toState())
        if (state === COMPLETING_WAITING_CHILDREN) return
        afterResume(state)
    }
// DispatchedCoroutine自身實現(xiàn)的方法
override fun afterResume(state: Any?) {
        if (tryResume()) return // completed before getResult invocation -- bail out
        // Resume in a cancellable way because we have to switch back to the original dispatcher
        // 注意這個uCont是父協(xié)程的續(xù)體咬腋,他是個SuspendLambda,最終它又開啟了攔截器攔截方法(所以說協(xié)程掛起
        // 恢復后執(zhí)行的線程未必是掛起之前運行的線程谷羞,因為它又攔截了一次,也就又讓線程池分配了一次)溜徙,并在之前保存的狀態(tài)處恢復了父協(xié)程的協(xié)程體代碼執(zhí)行
        uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
    }

以上便是協(xié)程掛起后恢復的邏輯湃缎,關鍵還是對續(xù)體Continuation的巧妙調用和狀態(tài)機模式的靈活設計,理解起來倒是不難蠢壹。
另外提一個知識點嗓违,掛起函數(shù)一般有個關鍵字suspend修飾,但并不代表有suspend修飾的方法都會被掛起图贸,只有像delay和withContext設計返回了那個掛起標志COROUTINE_SUSPENDED才會掛起蹂季,事實上,關鍵字suspend修飾的方法只是會讓kotlin編譯器在編譯的時候新增一個父協(xié)程的續(xù)體入?yún)⑹枞眨绻銢]有手動調用這個續(xù)體的恢復方法偿洁,那么也不會執(zhí)行恢復操作會一直掛起。
下篇會對協(xié)程的線程池展開分析沟优。

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末涕滋,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子挠阁,更是在濱河造成了極大的恐慌宾肺,老刑警劉巖溯饵,帶你破解...
    沈念sama閱讀 218,284評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異锨用,居然都是意外死亡丰刊,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評論 3 395
  • 文/潘曉璐 我一進店門增拥,熙熙樓的掌柜王于貴愁眉苦臉地迎上來啄巧,“玉大人,你說我怎么就攤上這事跪者】妹保” “怎么了?”我有些...
    開封第一講書人閱讀 164,614評論 0 354
  • 文/不壞的土叔 我叫張陵渣玲,是天一觀的道長逗概。 經(jīng)常有香客問我,道長忘衍,這世上最難降的妖魔是什么逾苫? 我笑而不...
    開封第一講書人閱讀 58,671評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮枚钓,結果婚禮上铅搓,老公的妹妹穿的比我還像新娘。我一直安慰自己搀捷,他們只是感情好星掰,可當我...
    茶點故事閱讀 67,699評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著嫩舟,像睡著了一般氢烘。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上家厌,一...
    開封第一講書人閱讀 51,562評論 1 305
  • 那天播玖,我揣著相機與錄音,去河邊找鬼饭于。 笑死蜀踏,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的掰吕。 我是一名探鬼主播果覆,決...
    沈念sama閱讀 40,309評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼殖熟!你這毒婦竟也來了随静?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,223評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎燎猛,沒想到半個月后恋捆,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,668評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡重绷,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,859評論 3 336
  • 正文 我和宋清朗相戀三年沸停,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片昭卓。...
    茶點故事閱讀 39,981評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡愤钾,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出候醒,到底是詐尸還是另有隱情能颁,我是刑警寧澤,帶...
    沈念sama閱讀 35,705評論 5 347
  • 正文 年R本政府宣布倒淫,位于F島的核電站伙菊,受9級特大地震影響,放射性物質發(fā)生泄漏敌土。R本人自食惡果不足惜镜硕,卻給世界環(huán)境...
    茶點故事閱讀 41,310評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望返干。 院中可真熱鬧兴枯,春花似錦、人聲如沸矩欠。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽癌淮。三九已至躺坟,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間该默,已是汗流浹背瞳氓。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評論 1 270
  • 我被黑心中介騙來泰國打工策彤, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留栓袖,地道東北人。 一個月前我還...
    沈念sama閱讀 48,146評論 3 370
  • 正文 我出身青樓店诗,卻偏偏與公主長得像裹刮,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子庞瘸,可洞房花燭夜當晚...
    茶點故事閱讀 44,933評論 2 355

推薦閱讀更多精彩內(nèi)容