協(xié)程到底是怎么創(chuàng)建和啟動(dòng)的肖油?本篇文章帶你揭曉。
createCoroutine 和 startCoroutine
在Continuation.kt文件中,有2個(gè)基礎(chǔ)API稚疹,這里單獨(dú)提出來(lái)說(shuō)一下镀钓,方便后面我們理解launch穗熬。
public fun <T> (suspend () -> T).createCoroutine(
completion: Continuation<T>
): Continuation<Unit> =
SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)
public fun <T> (suspend () -> T).startCoroutine(
completion: Continuation<T>
) {
createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}
createCoroutine和startCoroutine就是用來(lái)創(chuàng)建和啟動(dòng)協(xié)程的基礎(chǔ)API,launch掸宛、async等在底層一定程度上都使用了該基礎(chǔ)API死陆,launch和async只不過(guò)是封裝而已。所以唧瘾,我們先掌握它們措译。
這2個(gè)函數(shù)看起來(lái)差別不大,一個(gè)調(diào)用了resume開(kāi)始了協(xié)程饰序,一個(gè)沒(méi)有調(diào)用领虹,需要外部去調(diào)用resume(createCoroutine會(huì)把Continuation返回出去)。
既然launch和async可以用它們來(lái)創(chuàng)建和啟動(dòng)協(xié)程求豫,那我們是否可以直接用它們來(lái)創(chuàng)建和啟動(dòng)協(xié)程塌衰?那當(dāng)然可以诉稍。這里我舉個(gè)startCoroutine的例子,仔細(xì)看它的函數(shù)聲明最疆,它其實(shí)是個(gè)擴(kuò)展函數(shù)杯巨,擴(kuò)展的是(suspend () -> T)
這種類型。
(suspend () -> T)
:suspend函數(shù)+返回類型是T
它可以有2種寫(xiě)法:
//方式1-----------
val block = suspend {
...
"云天明"
}
block.startCoroutine(continuation)
//方式2--------------
suspend fun getUserName(): String {
...
return "云天明"
}
(::getUserName).startCoroutine(continuation)
一種是匿名的suspend函數(shù)努酸,一種是正常的有名字的suspend函數(shù)》現(xiàn)在,我們簡(jiǎn)單寫(xiě)個(gè)demo來(lái)調(diào)一下startCoroutine获诈。
//StartCoroutine.kt
fun main() {
val continuation = object : Continuation<String> {
override val context: CoroutineContext
get() = EmptyCoroutineContext
override fun resumeWith(result: Result<String>) {
println("結(jié)果: ${result.getOrNull()}")
}
}
block.startCoroutine(continuation)
Thread.sleep(3000L)
}
val block = suspend {
println("start")
delay(2000L)
println("end")
"DX3906"
}
調(diào)起非常簡(jiǎn)單仍源,startCoroutine是(suspend () -> T)
的擴(kuò)展函數(shù),且需要傳遞一個(gè)Continuation參數(shù)舔涎。我們先反編譯看一下笼踩,長(zhǎng)什么樣子。
public final class StartCoroutineKt {
//block那塊被轉(zhuǎn)換成了一個(gè)類StartCoroutineKt$block$1亡嫌,這里創(chuàng)建好一個(gè)實(shí)例對(duì)象嚎于,待會(huì)兒可以直接使用
private static final Function1<Continuation<? super String>, Object> block = new StartCoroutineKt$block$1((Continuation<? super StartCoroutineKt$block$1>) null);
public static final void main() {
//調(diào)用擴(kuò)展函數(shù),將block和continuation參數(shù)傳入挟冠。
ContinuationKt.startCoroutine(block, new StartCoroutineKt$main$continuation$1());
Thread.sleep(3000);
}
public static final Function1<Continuation<? super String>, Object> getBlock() {
return block;
}
}
//對(duì)應(yīng)block那塊
final class StartCoroutineKt$block$1 extends SuspendLambda implements Function1<Continuation<? super String>, Object> {
int label;
StartCoroutineKt$block$1(Continuation<? super StartCoroutineKt$block$1> continuation) {
super(1, continuation);
}
//創(chuàng)建StartCoroutineKt$block$1實(shí)例
public final Continuation<Unit> create(Continuation<?> continuation) {
return new StartCoroutineKt$block$1(continuation);
}
public final Object invoke(Continuation<? super String> continuation) {
//創(chuàng)建StartCoroutineKt$block$1實(shí)例并執(zhí)行invokeSuspend
return ((StartCoroutineKt$block$1) create(continuation)).invokeSuspend(Unit.INSTANCE);
}
public final Object invokeSuspend(Object $result) {
Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
//狀態(tài)機(jī)
switch (this.label) {
case 0:
//label一開(kāi)始是0
ResultKt.throwOnFailure($result);
System.out.println("start");
this.label = 1;
//這里正常情況會(huì)返回COROUTINE_SUSPENDED匾旭,label已經(jīng)改成1了,下次走case 1的邏輯
if (DelayKt.delay(2000, this) != coroutine_suspended) {
break;
} else {
return coroutine_suspended;
}
case 1:
//label為1圃郊,沒(méi)有return价涝,繼續(xù)走最后的結(jié)束語(yǔ)句
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
//結(jié)束
System.out.println("end");
return "云天明";
}
}
//對(duì)應(yīng)Continuation那塊
public final class StartCoroutineKt$main$continuation$1 implements Continuation<String> {
StartCoroutineKt$main$continuation$1() {
}
public CoroutineContext getContext() {
return EmptyCoroutineContext.INSTANCE;
}
public void resumeWith(Object result) {
//輸出結(jié)果
StringBuilder sb = new StringBuilder();
sb.append("結(jié)果: ");
sb.append((String) (Result.m29isFailureimpl(result) ? null : result));
System.out.println(sb.toString());
}
}
還是比較清晰的,
- 首先
object : Continuation<String>
是肯定會(huì)生成一個(gè)匿名內(nèi)部類持舆,在該類中色瘩,簡(jiǎn)單在resumeWith里面輸出了一下結(jié)果 - block那塊代碼,也會(huì)生成一個(gè)匿名內(nèi)部類逸寓。需要注意的是居兆,它繼承自SuspendLambda,這個(gè)沒(méi)見(jiàn)過(guò)竹伸,待會(huì)兒分析泥栖,里面有幾個(gè)方法:create、invoke勋篓、invokeSuspend吧享。其中create是創(chuàng)建該類的實(shí)例,invoke是調(diào)用create方法并執(zhí)行invokeSuspend譬嚣,invokeSuspend里面是狀態(tài)機(jī)相關(guān)的邏輯钢颂。
- main里面執(zhí)行了
ContinuationKt.startCoroutine(block, continuation)
,調(diào)起了擴(kuò)展方法(擴(kuò)展方法的原理就是這樣的)
反編譯出來(lái)的代碼大致結(jié)構(gòu)我們是了解了拜银,現(xiàn)在需要分析一下startCoroutine具體是怎么走的了殊鞭,看它是怎么利用這些反編譯出來(lái)的代碼的遭垛。
createCoroutineUnintercepted
public fun <T> (suspend () -> T).startCoroutine(
completion: Continuation<T>
) {
createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}
//這個(gè)函數(shù)是expect的,沒(méi)有函數(shù)體
public expect fun <T> (suspend () -> T).createCoroutineUnintercepted(
completion: Continuation<T>
): Continuation<Unit>
startCoroutine首先是調(diào)用了createCoroutineUnintercepted函數(shù)操灿,而createCoroutineUnintercepted是expect的锯仪,它是一種聲明。因?yàn)镵otlin是跨平臺(tái)的趾盐,所以部分邏輯與平臺(tái)相關(guān)卵酪,這個(gè)createCoroutineUnintercepted就是這種。它沒(méi)有函數(shù)體谤碳,我們只關(guān)心JVM平臺(tái),所以需要到JVM平臺(tái)上找該函數(shù)的實(shí)現(xiàn)溢豆。在Kotlin源碼地圖文章中蜒简,我們提到協(xié)程源碼,分為2個(gè)倉(cāng)庫(kù)漩仙,一個(gè)是Kotlin倉(cāng)庫(kù)搓茬,一個(gè)是Kotlin協(xié)程倉(cāng)庫(kù)。這個(gè)createCoroutineUnintercepted是在Kotlin倉(cāng)庫(kù)中队他,具體位置是:kotlin/libraries/stdlib/jvm/src/kotlin/coroutines/intrinsics/IntrinsicsJvm.kt
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
//走這里
create(probeCompletion)
else
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function1<Continuation<T>, Any?>).invoke(it)
}
}
咦卷仑,createCoroutineUnintercepted居然也是(suspend () -> T)
的擴(kuò)展函數(shù),所以if那里的this指的就是block麸折,也就是StartCoroutineKt1锡凝。它繼承自SuspendLambda。
internal abstract class SuspendLambda(
public override val arity: Int,
completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
constructor(arity: Int) : this(arity, null)
public override fun toString(): String =
if (completion == null)
Reflection.renderLambdaToString(this) // this is lambda
else
super.toString() // this is continuation
}
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
......
}
//BaseContinuationImpl實(shí)現(xiàn)了Continuation接口
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
...
}
SuspendLambda是ContinuationImpl的子類垢啼,而ContinuationImpl是BaseContinuationImpl的子類窜锯。所以上面的if (this is BaseContinuationImpl)
判斷是ok的,會(huì)走到create(probeCompletion)
芭析。也就是StartCoroutineKt$block$1
的create方法锚扎,在里面會(huì)創(chuàng)建StartCoroutineKt$block$1
實(shí)例。
public final Continuation<Unit> create(Continuation<?> continuation) {
return new StartCoroutineKt$block$1(continuation);
}
走到這里相當(dāng)于startCoroutine中的createCoroutineUnintercepted(completion)
這一步就走完了馁启,它最終返回的是StartCoroutineKt$block$1
的實(shí)例驾孔,也就是一個(gè)Continuation。它標(biāo)志著協(xié)程被創(chuàng)建好了惯疙。再來(lái)看下intercepted是什么邏輯
intercepted
public fun <T> (suspend () -> T).startCoroutine(
completion: Continuation<T>
) {
createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}
//好家伙翠勉,intercepted也是expect的
public expect fun <T> Continuation<T>.intercepted(): Continuation<T>
發(fā)現(xiàn)這里的intercepted擴(kuò)展函數(shù)也是expect的,又得去kotlin倉(cāng)庫(kù)里面找jvm相關(guān)的實(shí)現(xiàn)霉颠。我找了下眉菱,路徑在這里:kotlin/libraries/stdlib/jvm/src/kotlin/coroutines/intrinsics/IntrinsicsJvm.kt
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
intercepted是一個(gè)擴(kuò)展函數(shù),這里的this也就是前面createCoroutineUnintercepted(completion)
創(chuàng)建出來(lái)的StartCoroutineKt$block$1
實(shí)例掉分,它本身是SuspendLambda的子類俭缓,而SuspendLambda就是ContinuationImpl的子類克伊。所以這里的as?
會(huì)轉(zhuǎn)換成功,轉(zhuǎn)換出來(lái)的不是null华坦。也就是說(shuō)走到了ContinuationImpl的intercepted()
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
//這個(gè)context其實(shí)就是傳入的Continuation中的context
public override val context: CoroutineContext
get() = _context!!
@Transient
private var intercepted: Continuation<Any?>? = null
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
}
@Transient
private var intercepted: Continuation<Any?>? = null
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
第一次執(zhí)行這里時(shí)intercepted是null愿吹,那么會(huì)從context中取ContinuationInterceptor,而context就是Continuation傳入的context惜姐,我們傳入的是EmptyCoroutineContext犁跪,取出來(lái)是null(ContinuationInterceptor會(huì)對(duì)Continuation進(jìn)行攔截,然后將執(zhí)行邏輯指派到對(duì)應(yīng)的線程之上去歹袁,這塊的邏輯后面再細(xì)說(shuō)坷衍,就不詳細(xì)展開(kāi)了。)条舔,所以這里intercepted()最終執(zhí)行結(jié)果就是返回this枫耳,this也就是StartCoroutineKt$block$1
(block函數(shù)生成的類)。
intercepted()
走完后再回到startCoroutine:
public fun <T> (suspend () -> T).startCoroutine(
completion: Continuation<T>
) {
createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}
resume
就差最后一個(gè)resume(Unit)了孟抗,前面createCoroutineUnintercepted(completion).intercepted()
創(chuàng)建出來(lái)的是StartCoroutineKt$block$1
實(shí)例迁杨,所以我們需要到這個(gè)類里面去找resume函數(shù)。
再提一下類的繼承關(guān)系:
StartCoroutineKt$block$1 extends SuspendLambda implements Function1
internal abstract class SuspendLambda(
public override val arity: Int,
completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion)
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable
public interface Continuation<in T> {
public val context: CoroutineContext
public fun resumeWith(result: Result<T>)
}
StartCoroutineKt$block$1
中沒(méi)有該resume函數(shù)凄硼,其父類SuspendLambda也沒(méi)有該函數(shù)铅协,再到SuspendLambda的父類ContinuationImpl中,發(fā)現(xiàn)也沒(méi)有摊沉。再到ContinuationImpl的父類BaseContinuationImpl中狐史,也沒(méi)有該函數(shù),只有一個(gè)resumeWith说墨,奇了怪了预皇。后來(lái),我發(fā)現(xiàn)這個(gè)resume函數(shù)是一個(gè)擴(kuò)展函數(shù):
public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
而resume這個(gè)擴(kuò)展函數(shù)最終是調(diào)用的resumeWith婉刀,resumeWidth的實(shí)現(xiàn)在BaseContinuationImpl中吟温。
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
current = completion
param = outcome
} else {
//label等于1時(shí)走這里
completion.resumeWith(outcome)
return
}
}
}
}
這個(gè)開(kāi)了個(gè)while(true)
循環(huán),不斷地執(zhí)行invokeSuspend()突颊,如果遇到invokeSuspend返回結(jié)果是COROUTINE_SUSPENDED
則退出while(true)
循環(huán)鲁豪。
public final Object invokeSuspend(Object $result) {
Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
//狀態(tài)機(jī)
switch (this.label) {
case 0:
//label一開(kāi)始是0
ResultKt.throwOnFailure($result);
System.out.println("start");
this.label = 1;
//這里正常情況會(huì)返回COROUTINE_SUSPENDED,label已經(jīng)改成1了律秃,下次走case 1的邏輯
if (DelayKt.delay(2000, this) != coroutine_suspended) {
break;
} else {
return coroutine_suspended;
}
case 1:
//label為1爬橡,沒(méi)有return,繼續(xù)走最后的結(jié)束語(yǔ)句
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
//結(jié)束
System.out.println("end");
return "云天明";
}
invokeSuspend實(shí)際上就是我們的demo中的StartCoroutineKt$block$1
里的invokeSuspend函數(shù)棒动。在demo中糙申,這個(gè)invokeSuspend第一次的時(shí)候狀態(tài)機(jī)那里,label是0船惨,所以會(huì)隨即走到DelayKt.delay(2000, this)
柜裸,它是一個(gè)掛起函數(shù)缕陕,此時(shí)會(huì)拿到結(jié)果:COROUTINE_SUSPENDED
。resumeWith遇到COROUTINE_SUSPENDED
就不會(huì)繼續(xù)往下走了疙挺,等到delay執(zhí)行完成之后扛邑,會(huì)回調(diào)這個(gè)resumeWith函數(shù),再繼續(xù)走invokeSuspend铐然,此時(shí)label已經(jīng)是1了蔬崩,走到狀態(tài)機(jī)邏輯那里,返回結(jié)果“云天明”搀暑。
這個(gè)結(jié)果會(huì)被resumeWidth的outcome接收住沥阳,resumeWidth中的這個(gè)completion其實(shí)就是我們demo中的StartCoroutineKt$main$continuation$1
(實(shí)現(xiàn)Continuation<String>
的那個(gè)類,是通過(guò)構(gòu)造函數(shù)傳進(jìn)來(lái)的)自点,最終會(huì)走到completion.resumeWith(outcome)
桐罕,也就是來(lái)到了輸出結(jié)果的地方:println("結(jié)果: ${result.getOrNull()}")
。整個(gè)流程就走完了樟氢。
小結(jié)
createCoroutine用來(lái)創(chuàng)建協(xié)程,startCoroutine用來(lái)創(chuàng)建并啟動(dòng)協(xié)程侠鳄。它們內(nèi)部的原理是類似的埠啃,只是一個(gè)沒(méi)有調(diào)用resume啟動(dòng)協(xié)程,另一個(gè)調(diào)用了resume啟動(dòng)協(xié)程伟恶。編譯的時(shí)候碴开,會(huì)生成一個(gè)SuspendLambda的實(shí)現(xiàn)類,該類invokeSuspend用于執(zhí)行狀態(tài)機(jī)的邏輯博秫,調(diào)用resume后該狀態(tài)機(jī)會(huì)被觸發(fā)潦牛,狀態(tài)機(jī)走完,協(xié)程也就走完了挡育。