通過上篇文章大致理解了協(xié)程框架是怎么運行的奴紧,知道了作用域CoroutinScope,上下文CoroutinContext厅篓,續(xù)體Continuation酣栈,攔截器CorotineDispatcher惰说,協(xié)程線程池CoroutineScheduler這些構成協(xié)程運行的基本要素舆乔,通過源碼跟蹤知道協(xié)程就是通過續(xù)體的不斷交互和攔截器線程池的調度達到切換線程運行的目的岳服,但這顯然并不是協(xié)程的精髓,協(xié)程真正的精髓在于讓我們可以寫同步代碼一樣實現(xiàn)異步操作希俩,而不需要用傳統(tǒng)的回調接口派阱。
沒錯,就是它賴以成名的掛起和恢復機制斜纪,這個機制才是協(xié)程的精華和實用性的代表。
協(xié)程的掛起和恢復
所謂協(xié)程的掛起和恢復文兑,我認為可以這樣描述:協(xié)程的掛起就是讓運行在A線程的協(xié)程里的掛起函數(shù)(suspend修飾)可以切換到B線程去執(zhí)行一段時間(通常是耗時操作)盒刚,此時外面的協(xié)程處于掛起狀態(tài)(線程A沒有阻塞等待,而是該干嘛干嘛)绿贞,等協(xié)程掛起函數(shù)執(zhí)行完后(通常還帶有返回結果)恢復(實際上是主動通知協(xié)程該恢復了)到協(xié)程里繼續(xù)執(zhí)行因块,此時的代碼編寫形式完全不需要回調接口的參與,都是同步寫在協(xié)程體里的籍铁。
這種形式的寫法避免了以前回調地獄的問題涡上,但是本質上還是線程切換,只是寫法上更人性化更優(yōu)雅拒名。
很多人說這樣更具性能吩愧,在協(xié)程數(shù)量不多的情況下,其實不見得更具性能增显。協(xié)程可以看做是一個基于線程的框架實現(xiàn)雁佳,在我看來,協(xié)程的最大亮點就是共享了線程池和掛起恢復機制同云。
舉個例子吧:
GlobalScope.launch(Dispatchers.Main){
println("launch start in Thread:${Thread.currentThread().name}")
// 掛起點糖权,主要是withContext函數(shù)實現(xiàn)了掛起和恢復
val withResult = withContext(Dispatchers.IO){
println("withContext run in Thread:${Thread.currentThread().name}")
"withContext success"
}
// 恢復點
println("launch end in Thread:${Thread.currentThread().name}, withResult = $withResult")
}
執(zhí)行結果:
2023-08-31 18:00:34.193 4994-4994/com.example.coroutinexe I/System.out: launch start in Thread:main
2023-08-31 18:00:34.196 4994-6990/com.example.coroutinexe I/System.out: withContext run in Thread:DefaultDispatcher-worker-1
2023-08-31 18:00:34.199 4994-4994/com.example.coroutinexe I/System.out: launch end in Thread:main, withResult = withContext success
以上例子很好地詮釋了協(xié)程的掛起和恢復機制,沒有回調接口炸站,都是以串行的順序執(zhí)行代碼星澳,非常容易閱讀。接下來就跟蹤一下withContext函數(shù)看看它是怎么實現(xiàn)協(xié)程的掛起和恢復的旱易。
繼續(xù)上篇分析到的BaseContinuationImpl的resumeWith(result: Result<Any?>)方法禁偎,那里是真正執(zhí)行invokeSuspend()也就是我們協(xié)程體的地方:
BaseContinuationImpl:
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) { // 循環(huán)腿堤,主要是循環(huán)執(zhí)行invokeSuspend,根據(jù)該方法里的lable狀態(tài)執(zhí)行不同的掛起函數(shù)
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
// 執(zhí)行協(xié)程體
val outcome = invokeSuspend(param)
// 重點關注這里如果返回值是COROUTINE_SUSPENDED届垫,則跳出循環(huán)释液,也就是不再執(zhí)行invokeSuspend方法,其實這就是掛起點了
// 那就看看什么時候invokeSuspend會返回COROUTINE_SUSPENDED呢装处,這就需要往invokeSuspend追溯
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
}
invokeSuspend方法:
int label;
/*
* Unable to fully structure code
* Enabled aggressive block sorting
* Lifted jumps to return sites
*/
@Nullable
public final Object invokeSuspend(@NotNull Object var1_1) {
var3_2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0: {
ResultKt.throwOnFailure((Object)var1_1);
System.out.println((Object)Intrinsics.stringPlus((String)"launch start in Thread:", (Object)Thread.currentThread().getName()));
this.label = 1;
// 重點看這里误债,這個時候的withContext方法返回的不是我們定義里面的block里定義的"withContext success",注意別搞混妄迁,這就要去看看withContext的實現(xiàn)
v0 = BuildersKt.withContext((CoroutineContext)((CoroutineContext)Dispatchers.getIO()), (Function2)((Function2)new Function2<CoroutineScope, Continuation<? super String>, Object>(null){
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object object) {
IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0: {
ResultKt.throwOnFailure((Object)object);
System.out.println((Object)Intrinsics.stringPlus((String)"withContext run in Thread:", (Object)Thread.currentThread().getName()));
return "withContext success";
}
}
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
@NotNull
public final Continuation<Unit> create(@Nullable Object value, @NotNull Continuation<?> $completion) {
return (Continuation)new /* invalid duplicate definition of identical inner class */;
}
@Nullable
public final Object invoke(@NotNull CoroutineScope p1, @Nullable Continuation<? super String> p2) {
return (this.create((Object)p1, p2)).invokeSuspend((Object)Unit.INSTANCE);
}
}), (Continuation)((Continuation)this));
// 如果返回值等于COROUTINE_SUSPENDED寝蹈,就return COROUTINE_SUSPENDED,這不就巧了登淘,正是上面resumeWith分析到的情況
if (v0 == var3_2) {
return var3_2;
}
** GOTO lbl14
}
case 1: {
ResultKt.throwOnFailure((Object)$result);
v0 = $result;
lbl14:
// 2 sources
withResult = (String)v0;
System.out.println((Object)("launch end in Thread:" + Thread.currentThread().getName() + ", withResult = " + withResult));
return Unit.INSTANCE;
}
}
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
協(xié)程的掛起
// context參數(shù)一般傳調度器Dispatchers,代碼塊會運行在指定的線程池
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
// 返回啟動withContext的協(xié)程體
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
// 構建一個新的newContext箫老,合并當前協(xié)程體以及withContext協(xié)程體的CoroutineContext
val oldContext = uCont.context
val newContext = oldContext + context
// 檢查協(xié)程是否活躍,如果線程處于非活躍的狀態(tài)黔州,拋出cancle異常
newContext.checkCompletion()
...
// DispatchedCoroutine也是一個AbstractCoroutine對象耍鬓,負責協(xié)程完成的回調,
// 注意這里的Continuation的傳參為uCont流妻,及發(fā)起withContext的協(xié)程對象
val coroutine = DispatchedCoroutine(newContext, uCont)
coroutine.initParentJob()
// 和協(xié)程啟動的流程一樣牲蜀,啟動withContext的協(xié)程
// 注意這里的傳參coroutine為DispatchedCoroutine,它持有需要恢復的協(xié)程 绅这,這時候已經(jīng)開啟了block代碼的協(xié)程了涣达,可能是異步的
block.startCoroutineCancellable(coroutine, coroutine)
// 但是此時會返回一個結果,就是告訴父類協(xié)程這里要掛起了证薇,你被掛起了度苔,按照上面的分析,這里應該是返回了 COROUTINE_SUSPENDED
coroutine.getResult()
}
}
看下來就是執(zhí)行了suspendCoroutineUninterceptedOrReturn函數(shù)浑度,這個函數(shù)看不到源碼寇窑,是屬于kotlin編譯器的內(nèi)建函數(shù),看不到也不用太糾結箩张,只要記住那個uCont參數(shù)是withContext外面的協(xié)程續(xù)體即可疗认,它用來恢復外面的協(xié)程。
DispatchedCoroutine:
// 定義的三種狀態(tài) 待定伏钠、掛起横漏、恢復
private const val UNDECIDED = 0
private const val SUSPENDED = 1
private const val RESUMED = 2
// Used by withContext when context dispatcher changes
internal class DispatchedCoroutine<in T>(
context: CoroutineContext,
uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
// this is copy-and-paste of a decision state machine inside AbstractionContinuation
// todo: we may some-how abstract it via inline class
private val _decision = atomic(UNDECIDED)
// 第一次調用trySuspend的時候肯定返回了true,看他的命名就很有意思
private fun trySuspend(): Boolean {
_decision.loop { decision ->
when (decision) {
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
RESUMED -> return false
else -> error("Already suspended")
}
}
}
private fun tryResume(): Boolean {
_decision.loop { decision ->
when (decision) {
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
SUSPENDED -> return false
else -> error("Already resumed")
}
}
}
override fun afterCompletion(state: Any?) {
// Call afterResume from afterCompletion and not vice-versa, because stack-size is more
// important for afterResume implementation
afterResume(state)
}
override fun afterResume(state: Any?) {
if (tryResume()) return // completed before getResult invocation -- bail out
// Resume in a cancellable way because we have to switch back to the original dispatcher
uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
}
// 第一次調用getResult的時候肯定返回了COROUTINE_SUSPENDED
fun getResult(): Any? {
if (trySuspend()) return COROUTINE_SUSPENDED
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
val state = this.state.unboxState()
if (state is CompletedExceptionally) throw state.cause
@Suppress("UNCHECKED_CAST")
return state as T
}
}
至此熟掂,掛起的邏輯算是走通了缎浇,現(xiàn)在來捋一下:
1.父協(xié)程體執(zhí)行的時候遇到了掛起函數(shù)withContext,withContext里開啟自身的協(xié)程執(zhí)行自身的協(xié)程體任務赴肚;
2.同時withContext返回COROUTINE_SUSPENDED
3.父協(xié)程體resumeWith方法里判斷到了返回值是COROUTINE_SUSPENDED素跺,于是結束了resumeWith的執(zhí)行二蓝,父協(xié)程體里面withContext后面的代碼暫停執(zhí)行,但其實沒有阻塞線程
4.父協(xié)程的協(xié)程體SuspendLambda保存了協(xié)程狀態(tài)指厌,記錄了當前掛起的地方刊愚,此時withContext的協(xié)程體在執(zhí)行,父協(xié)程處于掛起狀態(tài)踩验,等待被通知恢復
協(xié)程的恢復
父協(xié)程被掛起后鸥诽,需要怎么恢復呢,首先它肯定不是自己去恢復箕憾,它是等待掛起點去通知它恢復牡借,所以我們要到withContext的協(xié)程體里去找答案,因為只有它才知道自己什么時候執(zhí)行完袭异,然后通知父協(xié)程恢復執(zhí)行钠龙。
所以還是要找到BaseContinuationImpl的resumeWith()方法,那里在執(zhí)行withContext里的協(xié)程體block:
BaseContinuationImpl:
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
// 執(zhí)行協(xié)程體御铃,注意此時執(zhí)行的是掛起函數(shù)的block碴里,正常會返回我們寫的"withContext success"字符串
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
// 注意這個completion是我們創(chuàng)建的DispatchedCoroutine
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
// 執(zhí)行完后走這里,執(zhí)行DispatchedCoroutine的resumeWith上真,這里其實就是協(xié)程掛起點的恢復標志了
completion.resumeWith(outcome)
return
}
}
}
}
DispatchedCoroutine:
// DispatchedCoroutine父類AbstractCoroutine的方法
public final override fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
// DispatchedCoroutine自身實現(xiàn)的方法
override fun afterResume(state: Any?) {
if (tryResume()) return // completed before getResult invocation -- bail out
// Resume in a cancellable way because we have to switch back to the original dispatcher
// 注意這個uCont是父協(xié)程的續(xù)體咬腋,他是個SuspendLambda,最終它又開啟了攔截器攔截方法(所以說協(xié)程掛起
// 恢復后執(zhí)行的線程未必是掛起之前運行的線程谷羞,因為它又攔截了一次,也就又讓線程池分配了一次)溜徙,并在之前保存的狀態(tài)處恢復了父協(xié)程的協(xié)程體代碼執(zhí)行
uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
}
以上便是協(xié)程掛起后恢復的邏輯湃缎,關鍵還是對續(xù)體Continuation的巧妙調用和狀態(tài)機模式的靈活設計,理解起來倒是不難蠢壹。
另外提一個知識點嗓违,掛起函數(shù)一般有個關鍵字suspend修飾,但并不代表有suspend修飾的方法都會被掛起图贸,只有像delay和withContext設計返回了那個掛起標志COROUTINE_SUSPENDED才會掛起蹂季,事實上,關鍵字suspend修飾的方法只是會讓kotlin編譯器在編譯的時候新增一個父協(xié)程的續(xù)體入?yún)⑹枞眨绻銢]有手動調用這個續(xù)體的恢復方法偿洁,那么也不會執(zhí)行恢復操作會一直掛起。
下篇會對協(xié)程的線程池展開分析沟优。