【Kotlin協(xié)程】一文弄懂協(xié)程工作原理

通過線程一窺協(xié)程

我們先來看一張協(xié)程與線程對比圖:圖片來源

image-20240311232446870.png

  • 協(xié)程本質(zhì)上可以認為是運行在線程上的代碼塊,協(xié)程提供的 掛起 操作會使協(xié)程暫停執(zhí)行杜恰,而不會導(dǎo)致線程阻塞。
  • 協(xié)程是一種輕量級資源轧膘,如上圖所示嗜诀,即使創(chuàng)建了上千個協(xié)程,對于系統(tǒng)來說也不是一種很大的負擔(dān)略吨。
  • 包含關(guān)系上看魂务,協(xié)程跟線程的關(guān)系曼验,有點像“線程與進程的關(guān)系”,畢竟粘姜,協(xié)程不可能脫離線程運行鬓照;協(xié)程雖然不能脫離線程而運行,但可以在不同的線程之間切換孤紧,如上圖所示豺裆。
  • 協(xié)程的核心競爭力在于:簡化異步并發(fā)任務(wù)(同步的寫法實現(xiàn)異步操作)。

一窺協(xié)程之后,我們再來一步步深入臭猜,深入之前先了解一些前置知識躺酒,來更好的幫助我們理解。

前置知識

Continuation Passing Style(CPS)

Continuation Passing Style翻譯過來就是“續(xù)體傳遞風(fēng)格”蔑歌,后面就簡稱CPS羹应。簡單點說CPS其實就是函數(shù)通過回調(diào)傳遞結(jié)果,先讓我們看看例子:

//常規(guī)寫法
class Exemple {
    public static long sum(int a, int b) {
        return a + b;
    }
    public static void main(String[] args) {
        System.out.println(sum(1, 2));
    }
}

//CPS風(fēng)格
class Exemple {
    interface Continuation {
        void next(int result);
    }
    public static void sum(int a, int b, Continuation continuation) {
        continuation.next(a+ b);
    }
    public static void main(String[] args) {
        plus(1, 2, result -> System.out.println(result));
    }
}

很簡單吧次屠,這就是CPS風(fēng)格园匹,函數(shù)的結(jié)果通過回調(diào)來傳遞。協(xié)程里通過, 協(xié)程里通過在CPS的Continuation回調(diào)里結(jié)合狀態(tài)機流轉(zhuǎn)劫灶,來實現(xiàn)協(xié)程掛起-恢復(fù)的功能裸违。

協(xié)程中的續(xù)體——Continuation

Kotlin中被suspend修飾符修飾的函數(shù)在編譯期間會被編譯器做特殊處理。而首先處理的就是CPS變換本昏,他會改變掛起函數(shù)的函數(shù)簽名供汛。

例如下面這個suspend函數(shù):

suspend fun getResponse(): Response {
    val response = doRequest() //doRequest() 模擬網(wǎng)絡(luò)請求
    return response
}

在編譯期發(fā)生CPS轉(zhuǎn)換后,反編譯成java凛俱,結(jié)果會是這樣:

public static final Object getResponse(Continuation $completion) {
  ...
}

我們可以看到編譯器對函數(shù)簽名做了改變紊馏,這種改變就是上節(jié)中說過的CPS(續(xù)體傳遞風(fēng)格)變換料饥。

發(fā)生了CPS變換后的函數(shù)多了一個Continuation(續(xù)體)類型的參數(shù)蒲犬,它的聲明如下:

interface Continuation<in T> {
   val context: CoroutineContext
   fun resumeWith(result: Result<T>)//result 為返回的結(jié)果
}
  • 續(xù)體是一個較為抽象的概念,簡單來說它包裝了協(xié)程在掛起之后應(yīng)該繼續(xù)執(zhí)行的代碼岸啡;在編譯的過程中原叮,一個完整的協(xié)程被分割切塊成一個又一個續(xù)體
  • 在suspend函數(shù)或者 await 函數(shù)的掛起結(jié)束以后巡蘸,它會調(diào)用 continuation 參數(shù)的 resumeWith 函數(shù)奋隶,來恢復(fù)執(zhí)行suspend函數(shù)或者await 函數(shù)后面的代碼。

下面就看下suspend函數(shù)CPS轉(zhuǎn)換后的偽代碼:

fun getResponse(ct:Continuation):Any?{
  val response = doRequest()
  ct.resumeWith(response)
}

注:可以將Continuation認為是一個Callback悦荒,這樣是不是更好理解了唯欣。

最后還要提一點,我們看到發(fā)生 CPS 變換的函數(shù)搬味,返回值類型變成了 Any?境氢,這是因為這個函數(shù)在發(fā)生變換后士聪,除了要返回它本身的返回值惶岭,還要返回一個標記CoroutineSingletons.COROUTINE_SUSPENDED,為了適配各種可能性,CPS 轉(zhuǎn)換后的函數(shù)返回值類型就只能是 Any?了翼岁。至于CoroutineSingletons.COROUTINE_SUSPENDED是什么悦析,后面我們會說到寿桨。

協(xié)程的啟動

例子:

object CoroutineExample {
    private val TAG: String = "CoroutineExample"

    fun main(){
        GlobalScope.launch(Main) {
            request()
        }
    }
    private suspend fun request(): String {
        delay(2000)
        Log.e(TAG, "request complete")
        return "result from request"
    }
}

GlobalScope.launch()

CoroutineScope.launch 開始跟蹤協(xié)程啟動流程:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy){
        LazyStandaloneCoroutine(newContext, block) 
    }else{
        StandaloneCoroutine(newContext, active = true)
    }
    coroutine.start(start, coroutine, block)
    return coroutine
}
  • 參數(shù)一context:協(xié)程上下文,并不是我們平時理解的Android中的上下文强戴,它是一種key-value數(shù)據(jù)結(jié)構(gòu)亭螟。此處我們傳入了Main用于主線程調(diào)度挡鞍,這部分這里就先不擴展了。
  • 參數(shù)二start:啟動模式预烙,此處我們沒有傳值則為默認值(DEFAULT)匕累,共有三種啟動模式。
    • DEFAULT:默認模式默伍,創(chuàng)建即啟動協(xié)程欢嘿,可隨時取消;
    • ATOMIC:自動模式也糊,創(chuàng)建即啟動協(xié)程炼蹦,啟動前不可取消;
    • LAZY:延遲啟動模式狸剃,只有當(dāng)調(diào)用start方法時才能啟動掐隐。
  • 參數(shù)三block:協(xié)程真正執(zhí)行的代碼塊,即上面例子中l(wèi)aunch{}閉包內(nèi)的代碼塊钞馁。

SuspendLambda

CoroutineScope.launch中第三個參數(shù)類型為suspend CoroutineScope.() -> Unit函數(shù)虑省,這是怎么來的呢?我們編寫代碼的時候并沒有這個東西僧凰,其實它由編譯器生成的探颈,我們的block代碼塊經(jīng)過編譯器編譯后會生成一個繼承Continuation的類SuspendLambda。一起看下反編譯的java代碼训措,為了關(guān)注主要邏輯方便理解伪节,去掉了一些無關(guān)代碼大概代碼如下:

public final void main() {
      BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getMain(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
         int label;

         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {
            Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch (this.label) {
               case 0:
                  ResultKt.throwOnFailure($result);
                  CoroutineExample var10000 = CoroutineExample.this;
                  this.label = 1;
                  if (var10000.request(this) == var2) {
                     return var2;
                  }
                  break;
               case 1:
                  ResultKt.throwOnFailure($result);
                  break;
               default:
                  throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }

            return Unit.INSTANCE;
         }
        
        ···
 }

從上面反編譯的java代碼中好像并不能很好的看出來協(xié)程中的block代碼塊具體編譯長什么樣子,但可以確定他是編譯成了Continuation類绩鸣,因為我們可以看到實現(xiàn)的invokeSuspend方法實際是來自BaseContinuationImpl,而BaseContinuationImpl的父類就是Continuation怀大。這個繼承關(guān)系我們后面再說。既然從反編譯的java代碼中看的不明顯呀闻,我們直接看上面例子的字節(jié)碼文件化借,其中可以很明顯的看到這樣一段代碼:

final class com/imile/pda/CoroutineExample$main$1 extends kotlin/coroutines/jvm/internal/SuspendLambda implements kotlin/jvm/functions/Function2

這下恍然大悟,launch函數(shù)的第三個參數(shù)捡多,即協(xié)程中的block代碼塊是一個編譯后繼承了SuspendLambda并且實現(xiàn)了Function2的實例蓖康。SuspendLambda 本質(zhì)上是一個 Continuation,前面我們已經(jīng)說過 Continuation 是一個有著恢復(fù)操作的接口局服,其 resume 方法可以恢復(fù)協(xié)程的執(zhí)行钓瞭。

SuspendLambda繼承機構(gòu)如下:

- Continuation: 續(xù)體,恢復(fù)協(xié)程的執(zhí)行
    - BaseContinuationImpl: 實現(xiàn) resumeWith(Result) 方法淫奔,控制狀態(tài)機的執(zhí)行山涡,定義了 invokeSuspend 抽象方法
        - ContinuationImpl: 增加 intercepted 攔截器,實現(xiàn)線程調(diào)度等
            - SuspendLambda: 封裝協(xié)程體代碼塊
                - 協(xié)程體代碼塊生成的子類: 實現(xiàn) invokeSuspend 方法,其內(nèi)實現(xiàn)狀態(tài)機流轉(zhuǎn)邏輯

每一層封裝都對應(yīng)添加了不同的功能鸭丛,我們先忽略掉這些功能細節(jié)竞穷,著眼于我們的主線,繼續(xù)跟進launch 函數(shù)執(zhí)行過程鳞溉,由于第二個參數(shù)是默認值(DEFAULT)瘾带,所以創(chuàng)建的是 StandaloneCoroutine, 最后啟動協(xié)程:

 // 啟動協(xié)程
coroutine.start(start, coroutine, block)

// 啟動協(xié)程
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
    start(block, receiver, this)
}

上面 coroutine.start 的調(diào)用涉及到運算符重載熟菲,實際上會調(diào)到 CoroutineStart.invoke() 方法:

public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
    when (this) {
        DEFAULT -> block.startCoroutineCancellable(receiver, completion)
        ATOMIC -> block.startCoroutine(receiver, completion)
        UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
        LAZY -> Unit // will start lazily
    }

這里啟動方式為DEFAULT看政,所以接著往下看:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) = runSafely(completion) {
    createCoroutineUnintercepted(receiver, completion)
        .intercepted()
        .resumeCancellableWith(Result.success(Unit), onCancellation)
}

整理下調(diào)用鏈如下:

coroutine.start(start, coroutine, block)
-> CoroutineStart.start(block, receiver, this)
-> CoroutineStart.invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>)
->  block.startCoroutineCancellable(receiver, completion)
-> 
createCoroutineUnintercepted(receiver,completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)

最后走到createCoroutineUnintercepted(receiver,completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation),這里創(chuàng)建了一個協(xié)程抄罕,并鏈式調(diào)用 intercepted允蚣、resumeCancellable 方法,利用協(xié)程上下文中的 ContinuationInterceptor 對協(xié)程的執(zhí)行進行攔截呆贿,intercepted 實際上調(diào)用的是 ContinuationImplintercepted 方法:

internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
  ...
    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?:(context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }
  ...
}

context[ContinuationInterceptor]?.interceptContinuation調(diào)用的是 CoroutineDispatcherinterceptContinuation 方法:

    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)

內(nèi)部創(chuàng)建了一個 DispatchedContinuation 可分發(fā)的協(xié)程實例嚷兔,我們繼續(xù)進到看resumeCancellableWith 方法:

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
  ...
  
  public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
  // 判斷是否是DispatchedContinuation 根據(jù)我們前面的代碼追蹤 這里是DispatchedContinuation
    is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
    else -> resumeWith(result)
}

inline fun resumeCancellableWith(
        result: Result<T>,
        noinline onCancellation: ((cause: Throwable) -> Unit)?
    ) {
        val state = result.toState(onCancellation)
    // 判斷是否需要線程調(diào)度 
   // 由于我們之前使用的是 `GlobalScope.launch(Main)` Android主線程調(diào)度器所以這里為true     
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_CANCELLABLE
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled(state)) {
                    resumeUndispatchedWith(result)
                }
            }
        }
}
  
  ...
}

最終走到 dispatcher.dispatch(context, this) 而這里的 dispatcher 就是通過工廠方法創(chuàng)建的 HandlerDispatcherdispatch() 函數(shù)第二個參數(shù)this是一個runnable這里為 DispatchedTask

HandlerDispatcher

internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
  ...
  
   //  最終執(zhí)行這里的 dispatch方法 而handler則是android中的 MainHandler
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        if (!handler.post(block)) {
            cancelOnRejection(context, block)
        }
    }
  
  ...
}

這里借用 Android 的主線程消息隊列來在主線程中執(zhí)行 block Runnable而這個 Runnable 即為 DispatchedTask

internal abstract class DispatchedTask<in T>(
    @JvmField public var resumeMode: Int
) : SchedulerTask() {
  ...
 public final override fun run() {
            ...
            withContinuationContext(continuation, delegate.countOrElement) {
                 ...
                if (job != null && !job.isActive) {
                    val cause = job.getCancellationException()
                    cancelCompletedResult(state, cause)
                    // 異常情況下
                    continuation.resumeWithStackTrace(cause)
                } else {
                    if (exception != null) {
                       // 異常情況下
                       continuation.resumeWithException(exception)
                    } else {
                      // step1:正常情況下走到這一步
                       continuation.resume(getSuccessfulResult(state))
                    }
                }
            }
           ...
   }
}

//step2:這是Continuation的擴展函數(shù)做入,內(nèi)部調(diào)用了resumeWith()
@InlineOnly public inline fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))


//step3:最終會調(diào)用到BaseContinuationImpl的resumeWith()方法中
internal abstract class BaseContinuationImpl(...) {
    // 實現(xiàn) Continuation 的 resumeWith冒晰,并且是 final 的,不可被重寫
    public final override fun resumeWith(result: Result<Any?>) {
        ...
        val outcome = invokeSuspend(param)
        ...
    }
    // 由編譯生成的協(xié)程相關(guān)類來實現(xiàn)竟块,例如 CoroutineExample$main$1
    protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}

最終調(diào)用到 continuation.resumeWith()resumeWith() 中會調(diào)用 invokeSuspend壶运,即之前編譯器生成的 SuspendLambda 中的 invokeSuspend 方法:

 @Nullable
     public final Object invokeSuspend(@NotNull Object $result) {
            Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch (this.label) {
               case 0:
                  ResultKt.throwOnFailure($result);
                  CoroutineExample var10000 = CoroutineExample.this;
                  this.label = 1;
                  if (var10000.request(this) == var2) {
                     return var2;
                  }
                  break;
               case 1:
                  ResultKt.throwOnFailure($result);
                  break;
      }
 }

這段代碼是一個狀態(tài)機機制,每一個掛起點都是一種狀態(tài)彩郊,協(xié)程恢復(fù)只是跳轉(zhuǎn)到下一個狀態(tài)前弯,掛起點將執(zhí)行過程分割成多個片段,利用狀態(tài)機的機制保證各個片段按順序執(zhí)行秫逝。

可以看到協(xié)程非阻塞的異步底層實現(xiàn)其實就是一種Callback回調(diào)(這一點我們在介紹Continuation時有提到過),只不過有多個掛起點時就會有多個Callback回調(diào)询枚,這里協(xié)程把多個Callback回調(diào)封裝成了一個狀態(tài)機违帆。

以上就是協(xié)程的啟動過程,下面我們再來看下協(xié)程中的重點掛起恢復(fù)金蜀。

協(xié)程的掛起與恢復(fù)

協(xié)程的啟動刷后,掛起和恢復(fù)有兩個關(guān)鍵方法: invokeSuspend()resumeWith(Result)。我們以上一節(jié)中的例子渊抄,反編譯后逆向剖析協(xié)程的掛起和恢復(fù)尝胆,先整體看下是怎樣的一個過程。

suspend fun reqeust(): String {
    delay(2000)
    return "result from request"
}

反編譯后的代碼如下(為了方便理解护桦,代碼有刪減和修改):

//1.函數(shù)返回值由String變成Object含衔,入?yún)⒁苍黾恿薈ontinuation參數(shù)
public final Object reqeust(@NotNull Continuation completion) {
   //2.通過completion創(chuàng)建一個ContinuationImpl,并且復(fù)寫了invokeSuspend()
   Object continuation;
   if (completion instanceof <undefinedtype>){
     continuation =  <undefinedtype>completion
   }else{
     continuation = new ContinuationImpl(completion) {
       Object result;
       int label; //初始值為0
        
       @Nullable
       public final Object invokeSuspend(@NotNull Object $result) {
          this.result = $result;
          this.label |= Integer.MIN_VALUE;
          return request(this);//又調(diào)用了requestUserInfo()方法
       }
    };
  }

   Object $result = (continuation).result;
   Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
   //狀態(tài)機  
   //3.方法被恢復(fù)的時候又會走到這里,第一次進入case 0分支贪染,label的值從0變?yōu)?缓呛,第二次進入就會走case 1分支
   switch(continuation.label) {
       case 0:
          ResultKt.throwOnFailure($result);
          continuation.label = 1;
          //4.delay()方法被suspend修飾,傳入一個continuation回調(diào)杭隙,返回一個object結(jié)果哟绊。這個結(jié)果要么是`COROUTINE_SUSPENDED`,否則就是真實結(jié)果痰憎。
          Object delay = DelayKt.delay(2000L, continuation)
          if (delay == var4) {//如果是COROUTINE_SUSPENDED則直接return票髓,就不會往下執(zhí)行了,requestUserInfo()被暫停了铣耘。
             return var4;
          }
          break;
       case 1:
          ResultKt.throwOnFailure($result);
          break;
       default:
          throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
       }
   return "result from request";
}

掛起過程:

  1. 函數(shù)返回值由 String 變成 Object炬称,編譯器自動增加了Continuation參數(shù),相當(dāng)于幫我們添加Callback涡拘。
  2. 根據(jù) completion 創(chuàng)建了一個 ContinuationImpl(如果已經(jīng)創(chuàng)建就直接用玲躯,避免重復(fù)創(chuàng)建),復(fù)寫了 invokeSuspend() 方法鳄乏,在這個方法里面它又調(diào)用了 request() 方法跷车,這里又調(diào)用了一次自己(是不是很神奇),并且把 continuation 傳遞進去橱野。
  3. 在 switch 語句中朽缴,label 的默認初始值為 0,第一次會進入 case 0 分支水援,delay() 是一個掛起函數(shù)密强,傳入上面的 continuation 參數(shù),會有一個 Object 類型的返回值蜗元。這個結(jié)果要么是COROUTINE_SUSPENDED或渤,否則就是真實結(jié)果。(關(guān)于delay是如何返回COROUTINE_SUSPENDED奕扣,可自行跟下源碼薪鹦,這里就不展開)
  4. DelayKt.delay(2000, continuation)的返回結(jié)果如果是 COROUTINE_SUSPENDED, 則直接 return 惯豆,那么方法執(zhí)行就被結(jié)束了池磁,方法就被掛起了。
  • 函數(shù)即便被 suspend 修飾了楷兽,但是也未必會掛起地熄。需要里面的代碼編譯后有返回值為 COROUTINE_SUSPENDED 這樣的標記位才可以。
  • 協(xié)程的掛起實際是方法的掛起芯杀,本質(zhì)是return端考。

恢復(fù)過程:

  1. 因為 delay() 是 IO操作雅潭,在2000ms后就會通過傳遞給它的 continuation 回調(diào)回來。

  2. 回調(diào)到 ContinuationImpl 類的 resumeWith() 方法跛梗,會再次調(diào)用 invokeSuspend() 方法寻馏,進而再次調(diào)用 requestUserInfo() 方法。

  3. 程序會再次進入switch語句核偿,由于第一次在 case 0 時把 label = 1 賦值為1诚欠,所以這次會進入 case 1 分支,并且返回了結(jié)果result from request漾岳。

  4. 并且 request() 的返回值作為 invokeSuspend() 的返回值返回轰绵。重新被執(zhí)行的時候就代表著方法被恢復(fù)了。

看到大家一定會疑問尼荆,步驟2中invokeSuspend() 是如何被再次調(diào)用呢左腔?我們都知道 ContinuationImpl 的父類是 BaseContinuationImpl,實際上ContinuationImpl中調(diào)用的resumeWith()是來自父類BaseContinuationImpl捅儒。

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    //這個實現(xiàn)是最終的液样,用于展開 resumeWith 遞歸。
    public final override fun resumeWith(result: Result<Any?>) {
        var current = this
        var param = result
        while (true) {
            with(current) {
                val completion = completion!!
                val outcome: Result<Any?> =
                    try {
                        // 1.調(diào)用 invokeSuspend()方法執(zhí)行巧还,執(zhí)行協(xié)程的真正運算邏輯鞭莽,拿到返回值
                        val outcome = invokeSuspend(param)
                        // 2.如果返回的還是COROUTINE_SUSPENDED則提前結(jié)束
                        if (outcome == COROUTINE_SUSPENDED) return 
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                if (completion is BaseContinuationImpl) {
                    //3.如果 completion 是 BaseContinuationImpl,內(nèi)部還有suspend方法麸祷,則會進入循環(huán)遞歸澎怒,繼續(xù)執(zhí)行和恢復(fù)
                    current = completion
                    param = outcome
                } else {
                    //4.否則是最頂層的completion,則會調(diào)用resumeWith恢復(fù)上一層并且return
                    // 這里實際調(diào)用的是其父類 AbstractCoroutine 的 resumeWith 方法
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

實際上任何一個掛起函數(shù)它在恢復(fù)的時候都會調(diào)到 BaseContinuationImplresumeWith() 方法里面阶牍。

  1. 一但 invokeSuspend() 方法被執(zhí)行喷面,那么 request() 又會再次被調(diào)用, invokeSuspend() 就會拿到 request() 的返回值,在 ContinuationImpl 里面根據(jù) val outcome = invokeSuspend() 的返回值來判斷我們的 request() 方法恢復(fù)了之后的操作走孽。
  2. 如果 outcomeCOROUTINE_SUSPENDED 常量(可能掛起函數(shù)中又返回了一個掛起函數(shù))惧辈,說明你即使被恢復(fù)了,執(zhí)行了一下融求, if (outcome == COROUTINE_SUSPENDED) return但是立馬又被掛起了咬像,所以又 return 了。
  3. 如果本次恢復(fù) outcome 是一個正常的結(jié)果生宛,就會走到 completion.resumeWith(outcome),當(dāng)前被掛起的方法已經(jīng)被執(zhí)行完了肮柜,實際調(diào)用的是其父類 AbstractCoroutineresumeWith 方法陷舅,那么協(xié)程就恢復(fù)了。

我們知道 request() 肯定是會被協(xié)程調(diào)用的(從上面反編譯代碼知道會傳遞一個Continuation completion參數(shù))审洞,request() 方法恢復(fù)完了就會讓協(xié)程completion.resumeWith()去恢復(fù)莱睁,所以說協(xié)程的恢復(fù)是方法的恢復(fù)待讳,本質(zhì)其實是callback(resumeWith)回調(diào)。

一張圖總結(jié)一下:

協(xié)程的核心是掛起——恢復(fù)仰剿,掛起——恢復(fù)的本質(zhì)是return & callback回調(diào)

image-20240309151135185.png

協(xié)程掛起

我們說過協(xié)程啟動后會調(diào)用到上面這個 resumeWith() 方法创淡,接著調(diào)用其 invokeSuspend() 方法:

  1. 當(dāng) invokeSuspend() 返回 COROUTINE_SUSPENDED 后,就直接 return 終止執(zhí)行了南吮,此時協(xié)程被掛起琳彩。
  2. 當(dāng) invokeSuspend() 返回非 COROUTINE_SUSPENDED 后,說明協(xié)程體執(zhí)行完畢了部凑,對于 launch 啟動的協(xié)程體露乏,傳入的 completion 是 AbstractCoroutine 子類對象,最終會調(diào)用其 AbstractCoroutine.resumeWith() 方法做一些狀態(tài)改變之類的收尾邏輯涂邀。至此協(xié)程便執(zhí)行完畢了瘟仿。

協(xié)程恢復(fù)

這里我們接著看上面第一條:協(xié)程執(zhí)行到掛起函數(shù)被掛起后,當(dāng)這個掛起函數(shù)執(zhí)行完畢后是怎么恢復(fù)協(xié)程的比勉,以下面掛起函數(shù)為例:

private suspend fun login() = withContext(Dispatchers.IO) {
    Thread.sleep(2000)
    return@withContext true
}

通過反編譯可以看到上面掛起函數(shù)中的函數(shù)體也被編譯成了 SuspendLambda 的子類劳较,創(chuàng)建其實例時也需要傳入 Continuation 續(xù)體參數(shù)(調(diào)用該掛起函數(shù)的協(xié)程所在續(xù)體)。貼下 withContext 的源碼:

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
        // 創(chuàng)建 new context
        val oldContext = uCont.context
        val newContext = oldContext + context
        // 檢查新上下文是否作廢
        newContext.ensureActive()
        // 新上下文與舊上下文相同
        if (newContext === oldContext) {
            val coroutine = ScopeCoroutine(newContext, uCont)
            return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
        }
        // 新調(diào)度程序與舊調(diào)度程序相同
        if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
            val coroutine = UndispatchedCoroutine(newContext, uCont)
            // 上下文有變化浩聋,所以這個線程需要更新
            withCoroutineContext(newContext, null) {
                return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
            }
        }
        // 使用新的調(diào)度程序
        val coroutine = DispatchedCoroutine(newContext, uCont)
        block.startCoroutineCancellable(coroutine, coroutine)
        coroutine.getResult()
    }
}

首先調(diào)用了 suspendCoroutineUninterceptedOrReturn 方法观蜗,看注釋知道可以通過它來獲取到當(dāng)前的續(xù)體對象 uCont, 接著有幾條分支調(diào)用,但最終都是會通過續(xù)體對象來創(chuàng)建掛起函數(shù)體對應(yīng)的 SuspendLambda 對象赡勘,并執(zhí)行其 invokeSuspend() 方法嫂便,在其執(zhí)行完畢后調(diào)用 uCont.resume() 來恢復(fù)協(xié)程,具體邏輯大家感興趣可以自己跟代碼闸与,與前面大同小異毙替。

至于其他的頂層掛起函數(shù)如 await(), suspendCoroutine(), suspendCancellableCoroutine() 等,其內(nèi)部也是通過 suspendCoroutineUninterceptedOrReturn() 來獲取到當(dāng)前的續(xù)體對象践樱,以便在掛起函數(shù)體執(zhí)行完畢后厂画,能通過這個續(xù)體對象恢復(fù)協(xié)程執(zhí)行。

附錄

附錄線程和協(xié)程間關(guān)系總結(jié)(總結(jié)來源)拷邢,加深理解袱院。

線程:

  • 線程是操作系統(tǒng)級別的概念
  • 我們開發(fā)者通過編程語言(Thread.java)創(chuàng)建的線程,本質(zhì)還是操作系統(tǒng)內(nèi)核線程的映射
  • JVM 中的線程與內(nèi)核線程的存在映射關(guān)系瞭稼,有“一對一”忽洛,“一對多”,“M對N”环肘。JVM 在不同操作系統(tǒng)中的具體實現(xiàn)會有差別欲虚,“一對一”是主流
  • 一般情況下,我們說的線程悔雹,都是內(nèi)核線程复哆,線程之間的切換欣喧,調(diào)度,都由操作系統(tǒng)負責(zé)
  • 線程也會消耗操作系統(tǒng)資源梯找,但比進程輕量得多
  • 線程唆阿,是搶占式的,它們之間能共享內(nèi)存資源锈锤,進程不行
  • 線程共享資源導(dǎo)致了多線程同步問題
  • 有的編程語言會自己實現(xiàn)一套線程庫驯鳖,從而能在一個內(nèi)核線程中實現(xiàn)多線程效果,早期 JVM 的“綠色線程” 就是這么做的牙咏,這種線程被稱為“用戶線程”

協(xié)程:

  • Kotlin 協(xié)程臼隔,不是操作系統(tǒng)級別的概念,無需操作系統(tǒng)支持
  • Kotlin 協(xié)程妄壶,有點像上面提到的“綠色線程”摔握,一個線程上可以運行成千上萬個協(xié)程
  • Kotlin 協(xié)程,是用戶態(tài)的(userlevel)丁寄,內(nèi)核對協(xié)程無感知
  • Kotlin 協(xié)程氨淌,是協(xié)作式的,由開發(fā)者管理伊磺,不需要操作系統(tǒng)進行調(diào)度和切換盛正,也沒有搶占式的消耗,因此它更加高效
  • Kotlin 協(xié)程屑埋,它底層基于狀態(tài)機實現(xiàn)豪筝,多協(xié)程之間共用一個實例,資源開銷極小摘能,因此它更加輕量
  • Kotlin 協(xié)程续崖,本質(zhì)還是運行于線程之上,它通過協(xié)程調(diào)度器团搞,可以運行到不同的線程上
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末严望,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子逻恐,更是在濱河造成了極大的恐慌像吻,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,427評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件复隆,死亡現(xiàn)場離奇詭異拨匆,居然都是意外死亡,警方通過查閱死者的電腦和手機挽拂,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,551評論 3 395
  • 文/潘曉璐 我一進店門涮雷,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人轻局,你說我怎么就攤上這事洪鸭。” “怎么了仑扑?”我有些...
    開封第一講書人閱讀 165,747評論 0 356
  • 文/不壞的土叔 我叫張陵览爵,是天一觀的道長。 經(jīng)常有香客問我镇饮,道長蜓竹,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,939評論 1 295
  • 正文 為了忘掉前任储藐,我火速辦了婚禮俱济,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘钙勃。我一直安慰自己蛛碌,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,955評論 6 392
  • 文/花漫 我一把揭開白布辖源。 她就那樣靜靜地躺著蔚携,像睡著了一般。 火紅的嫁衣襯著肌膚如雪克饶。 梳的紋絲不亂的頭發(fā)上酝蜒,一...
    開封第一講書人閱讀 51,737評論 1 305
  • 那天,我揣著相機與錄音矾湃,去河邊找鬼亡脑。 笑死,一個胖子當(dāng)著我的面吹牛邀跃,可吹牛的內(nèi)容都是我干的霉咨。 我是一名探鬼主播,決...
    沈念sama閱讀 40,448評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼坞嘀,長吁一口氣:“原來是場噩夢啊……” “哼躯护!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起丽涩,我...
    開封第一講書人閱讀 39,352評論 0 276
  • 序言:老撾萬榮一對情侶失蹤棺滞,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后矢渊,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體继准,經(jīng)...
    沈念sama閱讀 45,834評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,992評論 3 338
  • 正文 我和宋清朗相戀三年矮男,在試婚紗的時候發(fā)現(xiàn)自己被綠了移必。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,133評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡毡鉴,死狀恐怖崔泵,靈堂內(nèi)的尸體忽然破棺而出秒赤,到底是詐尸還是另有隱情,我是刑警寧澤憎瘸,帶...
    沈念sama閱讀 35,815評論 5 346
  • 正文 年R本政府宣布入篮,位于F島的核電站,受9級特大地震影響幌甘,放射性物質(zhì)發(fā)生泄漏潮售。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,477評論 3 331
  • 文/蒙蒙 一锅风、第九天 我趴在偏房一處隱蔽的房頂上張望酥诽。 院中可真熱鬧,春花似錦皱埠、人聲如沸肮帐。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,022評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽泪姨。三九已至,卻和暖如春饰抒,著一層夾襖步出監(jiān)牢的瞬間肮砾,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,147評論 1 272
  • 我被黑心中介騙來泰國打工袋坑, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留仗处,地道東北人。 一個月前我還...
    沈念sama閱讀 48,398評論 3 373
  • 正文 我出身青樓枣宫,卻偏偏與公主長得像婆誓,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子也颤,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,077評論 2 355

推薦閱讀更多精彩內(nèi)容