An example
Let's walk through the example below step by step to dissect Kotlin coroutines.
UI
UI logic
```
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

// Android and project-specific imports (Bundle, AppCompatActivity, NetContext, WanAndroidApi) are omitted.
class WanAndroidActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_wanandroid)
        btnStart.setOnClickListener {
            startLaunch()
        }
    }

    private fun startLaunch() {
        GlobalScope.launch {
            println("Coroutine started -- Thread name = ${Thread.currentThread().name}")
            val wanAndroidApi = NetContext.get().create(WanAndroidApi::class.java)
            val token = login()
            println("token = $token")
            val users = getUserList(token)
            println("users = $users")
        }
    }

    // Simulates a network request that takes two seconds.
    private suspend fun getUserList(token: String): List<String> {
        delay(2000)
        return listOf("abc", "bcd", "cde")
    }

    // Simulates a login request that takes two seconds.
    private suspend fun login(): String {
        delay(2000)
        return "token"
    }
}
```
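The layout and logic of this screen are fairly simple.
Bytecode analysis
To decompile the code, open Tools > Kotlin > Show Kotlin Bytecode in Android Studio.
The class above defines three functions: getUserList(token: String), login(), and startLaunch(). Let's look at them one by one: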
getUserList(token: String)
login()
startLaunch()
Take the decompiled code of startLaunch(), pull the Function2 implementation out of it, and simplify what remains:
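private final void startLaunch() {
    // Simplified from the decompiled output; a sketch, not compilable Java.
    CoroutineScope coroutineScope = GlobalScope.INSTANCE;
    CoroutineContext context = null;      // default is filled in by launch$default
    CoroutineStart coroutineStart = null; // default is filled in by launch$default
    Function2 function2 = new SuspendLambda() { /* coroutine body */ };
    // 3 is the bit mask marking context and start as omitted arguments
    BuildersKt.launch$default(coroutineScope, context, coroutineStart, function2, 3, null);
}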
Where SuspendLambda comes from:
Why is the instance created for Function2 a SuspendLambda? The bytecode answers this.
The bytecode uses the instruction NEW com/maiml/pdfdemo/wanandroid/WanAndroidActivity$startLaunch$1 to create the object.
The bytecode also shows that the class com/maiml/pdfdemo/wanandroid/WanAndroidActivity$startLaunch$1 extends SuspendLambda and implements the Function2 interface.
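The same relationship can be observed at runtime without reading bytecode. The sketch below uses only reflection on a small suspend lambda; the function name inspectSuspendLambda is made up for this illustration, and the exact superclass name is a compiler implementation detail that may differ between Kotlin versions.

```kotlin
// Inspects the class that the compiler generates for a suspend lambda (JVM only).
// Returns the generated class's superclass name and whether it is a Function2.
fun inspectSuspendLambda(): Pair<String, Boolean> {
    val block: suspend String.() -> Int = { length }

    // Superclass of the generated class; an implementation detail of the compiler.
    val superName = block.javaClass.superclass?.name ?: "<none>"

    // On the JVM, a suspend lambda with one receiver takes (receiver, continuation),
    // so it is expected to implement kotlin.jvm.functions.Function2.
    val function2 = Class.forName("kotlin.jvm.functions.Function2")
    val isFunction2 = function2.isInstance(block)

    return superName to isFunction2
}
```

Printing the pair returned by inspectSuspendLambda() shows which class the lambda extends on your compiler version, and whether the Function2 check holds.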
Implementation of the BuildersKt.launch$default method:
At this point, decompiling has shown what the coroutine body is compiled into: a SuspendLambda.
Coroutine creation, start, and resumption
Let's read the source of CoroutineScope.launch{} to see how a coroutine is created, started, and resumed.
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
Here block: suspend CoroutineScope.() -> Unit is the SuspendLambda.
//AbstractCoroutine.kt
//receiver:StandaloneCoroutine
//completion:StandaloneCoroutine<Unit>
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this)
}
start(block, receiver, this) calls the invoke method of CoroutineStart.
//receiver:StandaloneCoroutine
//completion:StandaloneCoroutine<Unit>
@InternalCoroutinesApi
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>) =
when (this) {
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
CoroutineStart.LAZY -> Unit // will start lazily
}
Execution takes the CoroutineStart.DEFAULT branch.
//Cancellable.kt
//receiver:StandaloneCoroutine
//completion:StandaloneCoroutine<Unit>
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
This calls three methods in turn: createCoroutineUnintercepted(), intercepted(), and resumeCancellableWith().
First, createCoroutineUnintercepted():
//IntrinsicsJvm.kt
@SinceKotlin("1.3")
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(receiver, probeCompletion)
else {
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
}
}
}
As the code shows, this calls the create method of BaseContinuationImpl.
public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> {
throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden")
}
So what does create actually do? The matching implementation is in the bytecode of com/maiml/pdfdemo/wanandroid/WanAndroidActivity$startLaunch$1:
// access flags 0x11
// signature (Ljava/lang/Object;Lkotlin/coroutines/Continuation<*>;)Lkotlin/coroutines/Continuation<Lkotlin/Unit;>;
// declaration: kotlin.coroutines.Continuation<kotlin.Unit> create(java.lang.Object, kotlin.coroutines.Continuation<?>)
public final create(Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Lkotlin/coroutines/Continuation;
@Lorg/jetbrains/annotations/NotNull;() // invisible
// annotable parameter count: 2 (visible)
// annotable parameter count: 2 (invisible)
@Lorg/jetbrains/annotations/Nullable;() // invisible, parameter 0
@Lorg/jetbrains/annotations/NotNull;() // invisible, parameter 1
L0
ALOAD 2
LDC "completion"
INVOKESTATIC kotlin/jvm/internal/Intrinsics.checkParameterIsNotNull (Ljava/lang/Object;Ljava/lang/String;)V
NEW com/maiml/pdfdemo/wanandroid/WanAndroidActivity$startLaunch$1
DUP
ALOAD 0
GETFIELD com/maiml/pdfdemo/wanandroid/WanAndroidActivity$startLaunch$1.this$0 : Lcom/maiml/pdfdemo/wanandroid/WanAndroidActivity;
ALOAD 2
INVOKESPECIAL com/maiml/pdfdemo/wanandroid/WanAndroidActivity$startLaunch$1.<init> (Lcom/maiml/pdfdemo/wanandroid/WanAndroidActivity;Lkotlin/coroutines/Continuation;)V
ASTORE 3
ALOAD 3
ARETURN
L1
LOCALVARIABLE this Lkotlin/coroutines/jvm/internal/BaseContinuationImpl; L0 L1 0
LOCALVARIABLE value Ljava/lang/Object; L0 L1 1
LOCALVARIABLE completion Lkotlin/coroutines/Continuation; L0 L1 2
MAXSTACK = 4
MAXLOCALS = 4
The code above creates a new com/maiml/pdfdemo/wanandroid/WanAndroidActivity$startLaunch$1 and returns it.
The decompiled Java class shows the same thing.
You may wonder why the implementation of BaseContinuationImpl's create method is found in the bytecode of com/maiml/pdfdemo/wanandroid/WanAndroidActivity$startLaunch$1. As analyzed above, com/maiml/pdfdemo/wanandroid/WanAndroidActivity$startLaunch$1 is a SuspendLambda, and the inheritance chain of SuspendLambda is: SuspendLambda -> ContinuationImpl -> BaseContinuationImpl.
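The creation step can be tried in isolation, because createCoroutineUnintercepted() and intercepted() are public intrinsics of the Kotlin standard library. The following is a minimal sketch, not the kotlinx.coroutines code path: it substitutes plain resume() for resumeCancellableWith(), and the names startManually, block, and completion are chosen for this example only.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.createCoroutineUnintercepted
import kotlin.coroutines.intrinsics.intercepted
import kotlin.coroutines.resume

// Creates a coroutine by hand and records the order in which things happen.
fun startManually(): List<String> {
    val log = mutableListOf<String>()

    // The coroutine body; the compiler turns it into a continuation class.
    val block: suspend String.() -> Int = {
        log.add("body runs with receiver=$this")
        length
    }

    // Receives the final result, the role StandaloneCoroutine plays for launch.
    val completion = object : Continuation<Int> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<Int>) {
            log.add("completed with ${result.getOrThrow()}")
        }
    }

    // Step 1: create the coroutine. Nothing in the body has executed yet.
    val created: Continuation<Unit> = block.createCoroutineUnintercepted("token", completion)
    log.add("created, body has not run yet")

    // Step 2: intercepted() is a no-op here because the context has no interceptor.
    // Step 3: resuming with Unit starts the body.
    created.intercepted().resume(Unit)
    return log
}
```

startManually() returns the entries "created, body has not run yet", "body runs with receiver=token", and "completed with 5", in that order, which shows that creating the continuation and running it are separate steps.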
intercepted()
//IntrinsicsJvm.kt
//Here `this` is the `com/maiml/pdfdemo/wanandroid/WanAndroidActivity$startLaunch$1` instance, a subclass of ContinuationImpl
@SinceKotlin("1.3")
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
//ContinuationImpl
// context[ContinuationInterceptor] is the CoroutineDispatcher instance
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
//CoroutineDispatcher
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
This creates a DispatchedContinuation, which is used for thread dispatching.
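To make the wrapping idea concrete, here is a small sketch built only on the standard library's ContinuationInterceptor. ThreadSwitchingContinuation and SingleThreadInterceptor are hypothetical stand-ins for DispatchedContinuation and CoroutineDispatcher; the real classes also handle cancellation and resume modes, which are left out here.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical stand-in for DispatchedContinuation: every resume hops to the executor.
class ThreadSwitchingContinuation<T>(
    private val executor: Executor,
    private val delegate: Continuation<T>
) : Continuation<T> {
    override val context: CoroutineContext get() = delegate.context
    override fun resumeWith(result: Result<T>) {
        executor.execute { delegate.resumeWith(result) }
    }
}

// Hypothetical stand-in for CoroutineDispatcher: wraps each continuation it intercepts.
class SingleThreadInterceptor(private val executor: Executor) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        ThreadSwitchingContinuation(executor, continuation)
}

// Starts a coroutine whose context contains the interceptor and reports
// the name of the thread that executed the body.
fun runOnWorker(): String {
    val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "sketch-worker") }
    val done = CountDownLatch(1)
    var threadName = ""
    val block: suspend () -> Unit = { threadName = Thread.currentThread().name }

    block.startCoroutine(object : Continuation<Unit> {
        override val context: CoroutineContext = SingleThreadInterceptor(executor)
        override fun resumeWith(result: Result<Unit>) { done.countDown() }
    })

    done.await()        // wait until the coroutine has completed
    executor.shutdown()
    return threadName
}
```

runOnWorker() returns "sketch-worker": the body never runs on the calling thread, because the first resume already goes through the wrapper.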
resumeCancellableWith()
// this is the DispatchedContinuation
public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>) = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result)
else -> resumeWith(result)
}
inline fun resumeCancellableWith(result: Result<T>) {
val state = result.toState()
//Check whether dispatching to another thread is needed; in our case it is
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled()) {
resumeUndispatchedWith(result)
}
}
}
}
Where the dispatcher comes from:
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = coroutineContext + context
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
When newCoroutineContext builds the context for the coroutine, it adds Dispatchers.Default if no dispatcher was specified.
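The fallback rule boils down to a lookup by key in the coroutine context. The sketch below reproduces just that rule with standard library types; NamedInterceptor, withFallbackInterceptor, and fallbackDemo are illustrative names, not kotlinx.coroutines APIs, and the debug-mode CoroutineId handling is left out.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical interceptor that only carries a name so we can tell instances apart.
class NamedInterceptor(val name: String) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        continuation
}

// Adds the fallback only when the context has no ContinuationInterceptor yet.
fun withFallbackInterceptor(
    context: CoroutineContext,
    fallback: ContinuationInterceptor
): CoroutineContext =
    if (context[ContinuationInterceptor] == null) context + fallback else context

// Name of the interceptor that ends up in the context, if it is a NamedInterceptor.
fun interceptorNameOf(context: CoroutineContext): String? =
    (context[ContinuationInterceptor] as? NamedInterceptor)?.name

// First value: an empty context receives the fallback.
// Second value: a context that already has an interceptor keeps its own.
fun fallbackDemo(): Pair<String?, String?> {
    val fallback = NamedInterceptor("default")
    val fromEmpty = withFallbackInterceptor(EmptyCoroutineContext, fallback)
    val fromCustom = withFallbackInterceptor(NamedInterceptor("custom"), fallback)
    return interceptorNameOf(fromEmpty) to interceptorNameOf(fromCustom)
}
```

fallbackDemo() returns ("default", "custom"), mirroring how GlobalScope.launch without an explicit dispatcher ends up on Dispatchers.Default.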
@JvmStatic
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
internal const val COROUTINES_SCHEDULER_PROPERTY_NAME = "kotlinx.coroutines.scheduler"
internal val useCoroutinesScheduler = systemProp(COROUTINES_SCHEDULER_PROPERTY_NAME).let { value ->
when (value) {
null, "", "on" -> true
"off" -> false
else -> error("System property '$COROUTINES_SCHEDULER_PROPERTY_NAME' has unrecognized value '$value'")
}
}
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
if (useCoroutinesScheduler) DefaultScheduler else CommonPool
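The system property named by COROUTINES_SCHEDULER_PROPERTY_NAME is on by default (an unset or empty value is treated as on), so DefaultScheduler is created, and dispatcher.dispatch resolves to the dispatch method of the DefaultScheduler class.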
//Dispatcher.kt
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
try {
coroutineScheduler.dispatch(block)
} catch (e: RejectedExecutionException) {
DefaultExecutor.dispatch(context, block)
}
//CoroutineScheduler.kt
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) {
trackTask() // this is needed for virtual time support
val task = createTask(block, taskContext)
// try to submit the task to the local queue and act depending on the result
val notAdded = submitToLocalQueue(task, fair)
if (notAdded != null) {
if (!addToGlobalQueue(notAdded)) {
// Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
throw RejectedExecutionException("$schedulerName was terminated")
}
}
// Checking 'task' instead of 'notAdded' is completely okay
if (task.mode == TaskMode.NON_BLOCKING) {
signalCpuWork()
} else {
signalBlockingWork()
}
}
internal fun createTask(block: Runnable, taskContext: TaskContext): Task {
val nanoTime = schedulerTimeSource.nanoTime()
if (block is Task) {
block.submissionTime = nanoTime
block.taskContext = taskContext
return block
}
return TaskImpl(block, nanoTime, taskContext)
}
Once the task is scheduled, the run method of TaskImpl executes and calls block.run(). The block is the DispatchedContinuation, which extends DispatchedTask, so the call lands in DispatchedTask's run method.
If no exception occurs, execution reaches continuation.resume().
// Continuation.kt
public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
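The Continuation here is the SuspendLambda analyzed earlier. Its inheritance chain is SuspendLambda -> ContinuationImpl -> BaseContinuationImpl, so the call ends up in the resumeWith method of BaseContinuationImpl,
which calls the invokeSuspend method of SuspendLambda.
At lines 72-76 of the decompiled listing, var10000 is compared with COROUTINE_SUSPENDED. If they are equal, no result is available yet, and the coroutine has to suspend until a result comes back.
Let's see what the call to var13.login() returns.
Only the key code is shown: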
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
delay returns the result of suspendCancellableCoroutine, which here is COROUTINE_SUSPENDED, so the coroutine has to suspend and wait for a result. DelayKt.delay() can be compared to Handler.postDelayed: when the delay elapses, it calls continuation.resume() -> BaseContinuationImpl.resumeWith() -> SuspendLambda.invokeSuspend() to resume the coroutine.
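The postDelayed analogy can be sketched with the standard library alone. In the code below, ManualTimer is a hypothetical hand-cranked timer (its callbacks run only when fire() is called) so that the order of events is deterministic, and suspendCoroutine stands in for suspendCancellableCoroutine; cancellation is not modelled.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical timer: stores callbacks and runs the oldest one on fire().
class ManualTimer {
    private val callbacks = mutableListOf<() -> Unit>()
    fun postDelayed(callback: () -> Unit) { callbacks.add(callback) }
    fun fire() { callbacks.removeAt(0).invoke() }
}

// Suspends the caller until the timer fires, then resumes it.
suspend fun sketchDelay(timer: ManualTimer) {
    suspendCoroutine<Unit> { cont ->
        timer.postDelayed { cont.resume(Unit) }
    }
}

// Runs a login-like coroutine and records the order of events.
fun runDelaySketch(): List<String> {
    val timer = ManualTimer()
    val events = mutableListOf<String>()

    val login: suspend () -> String = {
        events.add("login started")
        sketchDelay(timer)              // suspension point
        events.add("login resumed")
        "token"
    }

    login.startCoroutine(object : Continuation<String> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<String>) {
            events.add("completed with ${result.getOrThrow()}")
        }
    })

    // The body is suspended, yet control is already back with the caller.
    events.add("startCoroutine returned while suspended")
    timer.fire()                        // the "delay" elapses and the coroutine resumes
    return events
}
```

runDelaySketch() returns "login started", "startCoroutine returned while suspended", "login resumed", "completed with token". The second entry is the observable effect of the suspend call returning COROUTINE_SUSPENDED to its caller.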
Summary
When execution reaches login(), the call returns COROUTINE_SUSPENDED. Line 71 of the decompiled listing sets label to 1, the check var10000 == var7 succeeds, and invokeSuspend returns. login() returns COROUTINE_SUSPENDED because it calls delay(), which suspends the coroutine, meaning no result is available yet.
When login() finishes and a result is available, invokeSuspend is called back. Now label = 1, so the result is assigned to var10000 and a break continues with the logic below. At line 89, token = var10000, label is set to 2, and getUserList() is called.
getUserList() also calls delay() internally, so the coroutine suspends again: the call returns COROUTINE_SUSPENDED, the check var10000 == var7 succeeds, and invokeSuspend returns to wait for a result.
When getUserList() finishes and a result is available, invokeSuspend is called back once more. Now label = 2, so the result is assigned to var10000, break label17 runs, and execution continues at line 101 with the remaining logic.
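The label-driven flow summarized above can be written out by hand. The sketch below is a simplified imitation of what the compiler generates for the body of startLaunch(), not the actual decompiled code: loginCps, getUserListCps, StartLaunchStateMachine, and pendingResumes are invented for this illustration, and the pending list plays the role of the timer that later resumes the coroutine.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.resume

// Resumptions that are waiting for their "delay" to elapse.
val pendingResumes = mutableListOf<() -> Unit>()

// Continuation-passing form of `suspend fun login(): String`.
fun loginCps(cont: Continuation<Any?>): Any? {
    pendingResumes.add { cont.resume("token") }
    return COROUTINE_SUSPENDED                   // no result yet
}

// Continuation-passing form of `suspend fun getUserList(token): List<String>`.
fun getUserListCps(token: String, cont: Continuation<Any?>): Any? {
    pendingResumes.add { cont.resume(listOf("abc", "bcd", "cde")) }
    return COROUTINE_SUSPENDED
}

class StartLaunchStateMachine(private val log: MutableList<String>) : Continuation<Any?> {
    private var label = 0
    override val context: CoroutineContext = EmptyCoroutineContext

    override fun resumeWith(result: Result<Any?>) {
        val outcome = invokeSuspend(result.getOrThrow())
        if (outcome === COROUTINE_SUSPENDED) return   // wait for the next resume
        log.add("finished")
    }

    private fun invokeSuspend(value: Any?): Any? {
        var result = value
        if (label == 0) {
            label = 1                                 // where to continue after login()
            result = loginCps(this)
            if (result === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
        }
        if (label == 1) {
            val token = result as String
            log.add("token = $token")
            label = 2                                 // where to continue after getUserList()
            result = getUserListCps(token, this)
            if (result === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
        }
        log.add("users = $result")                    // label == 2: remaining logic
        return Unit
    }
}

// Drives the state machine through both suspension points.
fun runStateMachine(): List<String> {
    val log = mutableListOf<String>()
    val machine = StartLaunchStateMachine(log)
    machine.resume(Unit)                  // label 0 -> 1, suspends in login
    log.add("suspended at label 1")
    pendingResumes.removeAt(0).invoke()   // login result arrives
    log.add("suspended at label 2")
    pendingResumes.removeAt(0).invoke()   // user list arrives
    return log
}
```

runStateMachine() returns "suspended at label 1", "token = token", "suspended at label 2", "users = [abc, bcd, cde]", "finished". Each call to invokeSuspend resumes exactly where the previous one left off, which is the job the label field does in the generated class.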