通過線程一窺協(xié)程
我們先來看一張協(xié)程與線程對比圖:圖片來源
- 協(xié)程本質(zhì)上可以認為是運行在線程上的代碼塊,協(xié)程提供的 掛起 操作會使協(xié)程暫停執(zhí)行杜恰,而不會導(dǎo)致線程阻塞。
- 協(xié)程是一種輕量級資源轧膘,如上圖所示嗜诀,即使創(chuàng)建了上千個協(xié)程,對于系統(tǒng)來說也不是一種很大的負擔(dān)略吨。
- 包含關(guān)系上看魂务,協(xié)程跟線程的關(guān)系曼验,有點像“線程與進程的關(guān)系”,畢竟粘姜,協(xié)程不可能脫離線程運行鬓照;協(xié)程雖然不能脫離線程而運行,但可以在不同的線程之間切換孤紧,如上圖所示豺裆。
- 協(xié)程的核心競爭力在于:簡化異步并發(fā)任務(wù)(同步的寫法實現(xiàn)異步操作)。
一窺協(xié)程之后,我們再來一步步深入臭猜,深入之前先了解一些前置知識躺酒,來更好的幫助我們理解。
前置知識
Continuation Passing Style(CPS)
Continuation Passing Style翻譯過來就是“續(xù)體傳遞風(fēng)格”蔑歌,后面就簡稱CPS羹应。簡單點說CPS其實就是函數(shù)通過回調(diào)傳遞結(jié)果,先讓我們看看例子:
//常規(guī)寫法
class Exemple {
public static long sum(int a, int b) {
return a + b;
}
public static void main(String[] args) {
System.out.println(sum(1, 2));
}
}
//CPS風(fēng)格
class Exemple {
interface Continuation {
void next(int result);
}
public static void sum(int a, int b, Continuation continuation) {
continuation.next(a+ b);
}
public static void main(String[] args) {
plus(1, 2, result -> System.out.println(result));
}
}
很簡單吧次屠,這就是CPS風(fēng)格园匹,函數(shù)的結(jié)果通過回調(diào)來傳遞。協(xié)程里通過, 協(xié)程里通過在CPS的Continuation回調(diào)里結(jié)合狀態(tài)機流轉(zhuǎn)劫灶,來實現(xiàn)協(xié)程掛起-恢復(fù)的功能裸违。
協(xié)程中的續(xù)體——Continuation
Kotlin中被suspend修飾符修飾的函數(shù)在編譯期間會被編譯器做特殊處理。而首先處理的就是CPS變換本昏,他會改變掛起函數(shù)的函數(shù)簽名供汛。
例如下面這個suspend函數(shù):
suspend fun getResponse(): Response {
val response = doRequest() //doRequest() 模擬網(wǎng)絡(luò)請求
return response
}
在編譯期發(fā)生CPS轉(zhuǎn)換后,反編譯成java凛俱,結(jié)果會是這樣:
public static final Object getResponse(Continuation $completion) {
...
}
我們可以看到編譯器對函數(shù)簽名做了改變紊馏,這種改變就是上節(jié)中說過的CPS(續(xù)體傳遞風(fēng)格)變換料饥。
發(fā)生了CPS變換后的函數(shù)多了一個Continuation(續(xù)體)類型的參數(shù)蒲犬,它的聲明如下:
interface Continuation<in T> {
val context: CoroutineContext
fun resumeWith(result: Result<T>)//result 為返回的結(jié)果
}
- 續(xù)體是一個較為抽象的概念,簡單來說它包裝了協(xié)程在掛起之后應(yīng)該繼續(xù)執(zhí)行的代碼岸啡;在編譯的過程中原叮,一個完整的協(xié)程被分割切塊成一個又一個續(xù)體。
- 在suspend函數(shù)或者 await 函數(shù)的掛起結(jié)束以后巡蘸,它會調(diào)用 continuation 參數(shù)的 resumeWith 函數(shù)奋隶,來恢復(fù)執(zhí)行suspend函數(shù)或者await 函數(shù)后面的代碼。
下面就看下suspend函數(shù)CPS轉(zhuǎn)換后的偽代碼:
fun getResponse(ct:Continuation):Any?{
val response = doRequest()
ct.resumeWith(response)
}
注:可以將Continuation認為是一個Callback悦荒,這樣是不是更好理解了唯欣。
最后還要提一點,我們看到發(fā)生 CPS 變換的函數(shù)搬味,返回值類型變成了 Any?境氢,這是因為這個函數(shù)在發(fā)生變換后士聪,除了要返回它本身的返回值惶岭,還要返回一個標記CoroutineSingletons.COROUTINE_SUSPENDED,為了適配各種可能性,CPS 轉(zhuǎn)換后的函數(shù)返回值類型就只能是 Any?
了翼岁。至于CoroutineSingletons.COROUTINE_SUSPENDED是什么悦析,后面我們會說到寿桨。
協(xié)程的啟動
例子:
object CoroutineExample {
private val TAG: String = "CoroutineExample"
fun main(){
GlobalScope.launch(Main) {
request()
}
}
private suspend fun request(): String {
delay(2000)
Log.e(TAG, "request complete")
return "result from request"
}
}
GlobalScope.launch()
從 CoroutineScope.launch
開始跟蹤協(xié)程啟動流程:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy){
LazyStandaloneCoroutine(newContext, block)
}else{
StandaloneCoroutine(newContext, active = true)
}
coroutine.start(start, coroutine, block)
return coroutine
}
- 參數(shù)一context:協(xié)程上下文,并不是我們平時理解的Android中的上下文强戴,它是一種key-value數(shù)據(jù)結(jié)構(gòu)亭螟。此處我們傳入了Main用于主線程調(diào)度挡鞍,這部分這里就先不擴展了。
- 參數(shù)二start:啟動模式预烙,此處我們沒有傳值則為默認值(DEFAULT)匕累,共有三種啟動模式。
- DEFAULT:默認模式默伍,創(chuàng)建即啟動協(xié)程欢嘿,可隨時取消;
- ATOMIC:自動模式也糊,創(chuàng)建即啟動協(xié)程炼蹦,啟動前不可取消;
- LAZY:延遲啟動模式狸剃,只有當(dāng)調(diào)用start方法時才能啟動掐隐。
- 參數(shù)三block:協(xié)程真正執(zhí)行的代碼塊,即上面例子中l(wèi)aunch{}閉包內(nèi)的代碼塊钞馁。
SuspendLambda
CoroutineScope.launch
中第三個參數(shù)類型為suspend CoroutineScope.() -> Unit
函數(shù)虑省,這是怎么來的呢?我們編寫代碼的時候并沒有這個東西僧凰,其實它由編譯器生成的探颈,我們的block
代碼塊經(jīng)過編譯器編譯后會生成一個繼承Continuation
的類SuspendLambda。一起看下反編譯的java代碼训措,為了關(guān)注主要邏輯方便理解伪节,去掉了一些無關(guān)代碼大概代碼如下:
public final void main() {
BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getMain(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0:
ResultKt.throwOnFailure($result);
CoroutineExample var10000 = CoroutineExample.this;
this.label = 1;
if (var10000.request(this) == var2) {
return var2;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return Unit.INSTANCE;
}
···
}
從上面反編譯的java代碼中好像并不能很好的看出來協(xié)程中的block代碼塊具體編譯長什么樣子,但可以確定他是編譯成了Continuation
類绩鸣,因為我們可以看到實現(xiàn)的invokeSuspend
方法實際是來自BaseContinuationImpl
,而BaseContinuationImpl
的父類就是Continuation
怀大。這個繼承關(guān)系我們后面再說。既然從反編譯的java代碼中看的不明顯呀闻,我們直接看上面例子的字節(jié)碼文件化借,其中可以很明顯的看到這樣一段代碼:
final class com/imile/pda/CoroutineExample$main$1 extends kotlin/coroutines/jvm/internal/SuspendLambda implements kotlin/jvm/functions/Function2
這下恍然大悟,launch函數(shù)的第三個參數(shù)捡多,即協(xié)程中的block代碼塊是一個編譯后繼承了SuspendLambda
并且實現(xiàn)了Function2
的實例蓖康。SuspendLambda
本質(zhì)上是一個 Continuation
,前面我們已經(jīng)說過 Continuation 是一個有著恢復(fù)操作的接口局服,其 resume 方法可以恢復(fù)協(xié)程的執(zhí)行钓瞭。
SuspendLambda
繼承機構(gòu)如下:
- Continuation: 續(xù)體,恢復(fù)協(xié)程的執(zhí)行
- BaseContinuationImpl: 實現(xiàn) resumeWith(Result) 方法淫奔,控制狀態(tài)機的執(zhí)行山涡,定義了 invokeSuspend 抽象方法
- ContinuationImpl: 增加 intercepted 攔截器,實現(xiàn)線程調(diào)度等
- SuspendLambda: 封裝協(xié)程體代碼塊
- 協(xié)程體代碼塊生成的子類: 實現(xiàn) invokeSuspend 方法,其內(nèi)實現(xiàn)狀態(tài)機流轉(zhuǎn)邏輯
每一層封裝都對應(yīng)添加了不同的功能鸭丛,我們先忽略掉這些功能細節(jié)竞穷,著眼于我們的主線,繼續(xù)跟進launch
函數(shù)執(zhí)行過程鳞溉,由于第二個參數(shù)是默認值(DEFAULT)瘾带,所以創(chuàng)建的是 StandaloneCoroutine
, 最后啟動協(xié)程:
// 啟動協(xié)程
coroutine.start(start, coroutine, block)
// 啟動協(xié)程
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
start(block, receiver, this)
}
上面 coroutine.start
的調(diào)用涉及到運算符重載熟菲,實際上會調(diào)到 CoroutineStart.invoke()
方法:
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}
這里啟動方式為DEFAULT看政,所以接著往下看:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) = runSafely(completion) {
createCoroutineUnintercepted(receiver, completion)
.intercepted()
.resumeCancellableWith(Result.success(Unit), onCancellation)
}
整理下調(diào)用鏈如下:
coroutine.start(start, coroutine, block)
-> CoroutineStart.start(block, receiver, this)
-> CoroutineStart.invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>)
-> block.startCoroutineCancellable(receiver, completion)
->
createCoroutineUnintercepted(receiver,completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
最后走到createCoroutineUnintercepted(receiver,completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
,這里創(chuàng)建了一個協(xié)程抄罕,并鏈式調(diào)用 intercepted
允蚣、resumeCancellable
方法,利用協(xié)程上下文中的 ContinuationInterceptor
對協(xié)程的執(zhí)行進行攔截呆贿,intercepted
實際上調(diào)用的是 ContinuationImpl
的 intercepted
方法:
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
...
public fun intercepted(): Continuation<Any?> =
intercepted
?:(context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
...
}
context[ContinuationInterceptor]?.interceptContinuation
調(diào)用的是 CoroutineDispatcher
的 interceptContinuation
方法:
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
內(nèi)部創(chuàng)建了一個 DispatchedContinuation
可分發(fā)的協(xié)程實例嚷兔,我們繼續(xù)進到看resumeCancellableWith
方法:
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
...
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
// 判斷是否是DispatchedContinuation 根據(jù)我們前面的代碼追蹤 這里是DispatchedContinuation
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
// 判斷是否需要線程調(diào)度
// 由于我們之前使用的是 `GlobalScope.launch(Main)` Android主線程調(diào)度器所以這里為true
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
resumeUndispatchedWith(result)
}
}
}
}
...
}
最終走到 dispatcher.dispatch(context, this)
而這里的 dispatcher
就是通過工廠方法創(chuàng)建的 HandlerDispatcher
,dispatch()
函數(shù)第二個參數(shù)this
是一個runnable
這里為 DispatchedTask
HandlerDispatcher
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
...
// 最終執(zhí)行這里的 dispatch方法 而handler則是android中的 MainHandler
override fun dispatch(context: CoroutineContext, block: Runnable) {
if (!handler.post(block)) {
cancelOnRejection(context, block)
}
}
...
}
這里借用 Android 的主線程消息隊列來在主線程中執(zhí)行 block Runnable
而這個 Runnable
即為 DispatchedTask
:
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
...
public final override fun run() {
...
withContinuationContext(continuation, delegate.countOrElement) {
...
if (job != null && !job.isActive) {
val cause = job.getCancellationException()
cancelCompletedResult(state, cause)
// 異常情況下
continuation.resumeWithStackTrace(cause)
} else {
if (exception != null) {
// 異常情況下
continuation.resumeWithException(exception)
} else {
// step1:正常情況下走到這一步
continuation.resume(getSuccessfulResult(state))
}
}
}
...
}
}
//step2:這是Continuation的擴展函數(shù)做入,內(nèi)部調(diào)用了resumeWith()
@InlineOnly public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
//step3:最終會調(diào)用到BaseContinuationImpl的resumeWith()方法中
internal abstract class BaseContinuationImpl(...) {
// 實現(xiàn) Continuation 的 resumeWith冒晰,并且是 final 的,不可被重寫
public final override fun resumeWith(result: Result<Any?>) {
...
val outcome = invokeSuspend(param)
...
}
// 由編譯生成的協(xié)程相關(guān)類來實現(xiàn)竟块,例如 CoroutineExample$main$1
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}
最終調(diào)用到 continuation.resumeWith()
而 resumeWith()
中會調(diào)用 invokeSuspend
壶运,即之前編譯器生成的 SuspendLambda
中的 invokeSuspend
方法:
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0:
ResultKt.throwOnFailure($result);
CoroutineExample var10000 = CoroutineExample.this;
this.label = 1;
if (var10000.request(this) == var2) {
return var2;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
}
}
這段代碼是一個狀態(tài)機機制,每一個掛起點都是一種狀態(tài)彩郊,協(xié)程恢復(fù)只是跳轉(zhuǎn)到下一個狀態(tài)前弯,掛起點將執(zhí)行過程分割成多個片段,利用狀態(tài)機的機制保證各個片段按順序執(zhí)行秫逝。
可以看到協(xié)程非阻塞的異步底層實現(xiàn)其實就是一種Callback回調(diào)(這一點我們在介紹Continuation時有提到過),只不過有多個掛起點時就會有多個Callback回調(diào)询枚,這里協(xié)程把多個Callback回調(diào)封裝成了一個狀態(tài)機违帆。
以上就是協(xié)程的啟動過程,下面我們再來看下協(xié)程中的重點掛起和恢復(fù)金蜀。
協(xié)程的掛起與恢復(fù)
協(xié)程的啟動刷后,掛起和恢復(fù)有兩個關(guān)鍵方法: invokeSuspend()
和 resumeWith(Result)
。我們以上一節(jié)中的例子渊抄,反編譯后逆向剖析協(xié)程的掛起和恢復(fù)尝胆,先整體看下是怎樣的一個過程。
suspend fun reqeust(): String {
delay(2000)
return "result from request"
}
反編譯后的代碼如下(為了方便理解护桦,代碼有刪減和修改):
//1.函數(shù)返回值由String變成Object含衔,入?yún)⒁苍黾恿薈ontinuation參數(shù)
public final Object reqeust(@NotNull Continuation completion) {
//2.通過completion創(chuàng)建一個ContinuationImpl,并且復(fù)寫了invokeSuspend()
Object continuation;
if (completion instanceof <undefinedtype>){
continuation = <undefinedtype>completion
}else{
continuation = new ContinuationImpl(completion) {
Object result;
int label; //初始值為0
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return request(this);//又調(diào)用了requestUserInfo()方法
}
};
}
Object $result = (continuation).result;
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
//狀態(tài)機
//3.方法被恢復(fù)的時候又會走到這里,第一次進入case 0分支贪染,label的值從0變?yōu)?缓呛,第二次進入就會走case 1分支
switch(continuation.label) {
case 0:
ResultKt.throwOnFailure($result);
continuation.label = 1;
//4.delay()方法被suspend修飾,傳入一個continuation回調(diào)杭隙,返回一個object結(jié)果哟绊。這個結(jié)果要么是`COROUTINE_SUSPENDED`,否則就是真實結(jié)果痰憎。
Object delay = DelayKt.delay(2000L, continuation)
if (delay == var4) {//如果是COROUTINE_SUSPENDED則直接return票髓,就不會往下執(zhí)行了,requestUserInfo()被暫停了铣耘。
return var4;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return "result from request";
}
掛起過程:
- 函數(shù)返回值由
String
變成Object
炬称,編譯器自動增加了Continuation
參數(shù),相當(dāng)于幫我們添加Callback涡拘。 - 根據(jù)
completion
創(chuàng)建了一個ContinuationImpl
(如果已經(jīng)創(chuàng)建就直接用玲躯,避免重復(fù)創(chuàng)建),復(fù)寫了invokeSuspend()
方法鳄乏,在這個方法里面它又調(diào)用了request()
方法跷车,這里又調(diào)用了一次自己(是不是很神奇),并且把continuation
傳遞進去橱野。 - 在 switch 語句中朽缴,
label
的默認初始值為 0,第一次會進入case 0
分支水援,delay()
是一個掛起函數(shù)密强,傳入上面的continuation
參數(shù),會有一個Object
類型的返回值蜗元。這個結(jié)果要么是COROUTINE_SUSPENDED
或渤,否則就是真實結(jié)果。(關(guān)于delay是如何返回COROUTINE_SUSPENDED奕扣,可自行跟下源碼薪鹦,這里就不展開) -
DelayKt.delay(2000, continuation)
的返回結(jié)果如果是COROUTINE_SUSPENDED
, 則直接 return 惯豆,那么方法執(zhí)行就被結(jié)束了池磁,方法就被掛起了。
- 函數(shù)即便被
suspend
修飾了楷兽,但是也未必會掛起地熄。需要里面的代碼編譯后有返回值為COROUTINE_SUSPENDED
這樣的標記位才可以。- 協(xié)程的掛起實際是方法的掛起芯杀,本質(zhì)是return端考。
恢復(fù)過程:
因為
delay()
是 IO操作雅潭,在2000ms后就會通過傳遞給它的continuation
回調(diào)回來。回調(diào)到
ContinuationImpl
類的resumeWith()
方法跛梗,會再次調(diào)用invokeSuspend()
方法寻馏,進而再次調(diào)用requestUserInfo()
方法。程序會再次進入switch語句核偿,由于第一次在
case 0
時把label = 1
賦值為1诚欠,所以這次會進入case 1
分支,并且返回了結(jié)果result from request
漾岳。并且
request()
的返回值作為invokeSuspend()
的返回值返回轰绵。重新被執(zhí)行的時候就代表著方法被恢復(fù)了。
看到大家一定會疑問尼荆,步驟2中invokeSuspend()
是如何被再次調(diào)用呢左腔?我們都知道 ContinuationImpl
的父類是 BaseContinuationImpl
,實際上ContinuationImpl
中調(diào)用的resumeWith()
是來自父類BaseContinuationImpl
捅儒。
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
//這個實現(xiàn)是最終的液样,用于展開 resumeWith 遞歸。
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
with(current) {
val completion = completion!!
val outcome: Result<Any?> =
try {
// 1.調(diào)用 invokeSuspend()方法執(zhí)行巧还,執(zhí)行協(xié)程的真正運算邏輯鞭莽,拿到返回值
val outcome = invokeSuspend(param)
// 2.如果返回的還是COROUTINE_SUSPENDED則提前結(jié)束
if (outcome == COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
if (completion is BaseContinuationImpl) {
//3.如果 completion 是 BaseContinuationImpl,內(nèi)部還有suspend方法麸祷,則會進入循環(huán)遞歸澎怒,繼續(xù)執(zhí)行和恢復(fù)
current = completion
param = outcome
} else {
//4.否則是最頂層的completion,則會調(diào)用resumeWith恢復(fù)上一層并且return
// 這里實際調(diào)用的是其父類 AbstractCoroutine 的 resumeWith 方法
completion.resumeWith(outcome)
return
}
}
}
}
實際上任何一個掛起函數(shù)它在恢復(fù)的時候都會調(diào)到 BaseContinuationImpl
的 resumeWith()
方法里面阶牍。
- 一但
invokeSuspend()
方法被執(zhí)行喷面,那么request()
又會再次被調(diào)用,invokeSuspend()
就會拿到request()
的返回值,在ContinuationImpl
里面根據(jù)val outcome = invokeSuspend()
的返回值來判斷我們的request()
方法恢復(fù)了之后的操作走孽。 - 如果
outcome
是COROUTINE_SUSPENDED
常量(可能掛起函數(shù)中又返回了一個掛起函數(shù))惧辈,說明你即使被恢復(fù)了,執(zhí)行了一下融求,if (outcome == COROUTINE_SUSPENDED) return
但是立馬又被掛起了咬像,所以又 return 了。 - 如果本次恢復(fù)
outcome
是一個正常的結(jié)果生宛,就會走到completion.resumeWith(outcome)
,當(dāng)前被掛起的方法已經(jīng)被執(zhí)行完了肮柜,實際調(diào)用的是其父類AbstractCoroutine
的resumeWith
方法陷舅,那么協(xié)程就恢復(fù)了。
我們知道
request()
肯定是會被協(xié)程調(diào)用的(從上面反編譯代碼知道會傳遞一個Continuation completion
參數(shù))审洞,request()
方法恢復(fù)完了就會讓協(xié)程completion.resumeWith()
去恢復(fù)莱睁,所以說協(xié)程的恢復(fù)是方法的恢復(fù)待讳,本質(zhì)其實是callback(resumeWith)回調(diào)。
一張圖總結(jié)一下:
協(xié)程的核心是掛起——恢復(fù)仰剿,掛起——恢復(fù)的本質(zhì)是return & callback
回調(diào)
協(xié)程掛起
我們說過協(xié)程啟動后會調(diào)用到上面這個 resumeWith() 方法创淡,接著調(diào)用其 invokeSuspend() 方法:
- 當(dāng) invokeSuspend() 返回 COROUTINE_SUSPENDED 后,就直接 return 終止執(zhí)行了南吮,此時協(xié)程被掛起琳彩。
- 當(dāng) invokeSuspend() 返回非 COROUTINE_SUSPENDED 后,說明協(xié)程體執(zhí)行完畢了部凑,對于 launch 啟動的協(xié)程體露乏,傳入的 completion 是 AbstractCoroutine 子類對象,最終會調(diào)用其 AbstractCoroutine.resumeWith() 方法做一些狀態(tài)改變之類的收尾邏輯涂邀。至此協(xié)程便執(zhí)行完畢了瘟仿。
協(xié)程恢復(fù)
這里我們接著看上面第一條:協(xié)程執(zhí)行到掛起函數(shù)被掛起后,當(dāng)這個掛起函數(shù)執(zhí)行完畢后是怎么恢復(fù)協(xié)程的比勉,以下面掛起函數(shù)為例:
private suspend fun login() = withContext(Dispatchers.IO) {
Thread.sleep(2000)
return@withContext true
}
通過反編譯可以看到上面掛起函數(shù)中的函數(shù)體也被編譯成了 SuspendLambda 的子類劳较,創(chuàng)建其實例時也需要傳入 Continuation 續(xù)體參數(shù)(調(diào)用該掛起函數(shù)的協(xié)程所在續(xù)體)。貼下 withContext 的源碼:
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
// 創(chuàng)建 new context
val oldContext = uCont.context
val newContext = oldContext + context
// 檢查新上下文是否作廢
newContext.ensureActive()
// 新上下文與舊上下文相同
if (newContext === oldContext) {
val coroutine = ScopeCoroutine(newContext, uCont)
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
// 新調(diào)度程序與舊調(diào)度程序相同
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
val coroutine = UndispatchedCoroutine(newContext, uCont)
// 上下文有變化浩聋,所以這個線程需要更新
withCoroutineContext(newContext, null) {
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
}
// 使用新的調(diào)度程序
val coroutine = DispatchedCoroutine(newContext, uCont)
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}
}
首先調(diào)用了 suspendCoroutineUninterceptedOrReturn 方法观蜗,看注釋知道可以通過它來獲取到當(dāng)前的續(xù)體對象 uCont, 接著有幾條分支調(diào)用,但最終都是會通過續(xù)體對象來創(chuàng)建掛起函數(shù)體對應(yīng)的 SuspendLambda 對象赡勘,并執(zhí)行其 invokeSuspend() 方法嫂便,在其執(zhí)行完畢后調(diào)用 uCont.resume() 來恢復(fù)協(xié)程,具體邏輯大家感興趣可以自己跟代碼闸与,與前面大同小異毙替。
至于其他的頂層掛起函數(shù)如 await()
, suspendCoroutine()
, suspendCancellableCoroutine()
等,其內(nèi)部也是通過 suspendCoroutineUninterceptedOrReturn() 來獲取到當(dāng)前的續(xù)體對象践樱,以便在掛起函數(shù)體執(zhí)行完畢后厂画,能通過這個續(xù)體對象恢復(fù)協(xié)程執(zhí)行。
附錄
附錄線程和協(xié)程間關(guān)系總結(jié)(總結(jié)來源)拷邢,加深理解袱院。
線程:
- 線程是操作系統(tǒng)級別的概念
- 我們開發(fā)者通過編程語言(Thread.java)創(chuàng)建的線程,本質(zhì)還是操作系統(tǒng)內(nèi)核線程的映射
- JVM 中的線程與內(nèi)核線程的存在映射關(guān)系瞭稼,有“一對一”忽洛,“一對多”,“M對N”环肘。JVM 在不同操作系統(tǒng)中的具體實現(xiàn)會有差別欲虚,“一對一”是主流
- 一般情況下,我們說的線程悔雹,都是內(nèi)核線程复哆,線程之間的切換欣喧,調(diào)度,都由操作系統(tǒng)負責(zé)
- 線程也會消耗操作系統(tǒng)資源梯找,但比進程輕量得多
- 線程唆阿,是搶占式的,它們之間能共享內(nèi)存資源锈锤,進程不行
- 線程共享資源導(dǎo)致了多線程同步問題
- 有的編程語言會自己實現(xiàn)一套線程庫驯鳖,從而能在一個內(nèi)核線程中實現(xiàn)多線程效果,早期 JVM 的“綠色線程” 就是這么做的牙咏,這種線程被稱為“用戶線程”
協(xié)程:
- Kotlin 協(xié)程臼隔,不是操作系統(tǒng)級別的概念,無需操作系統(tǒng)支持
- Kotlin 協(xié)程妄壶,有點像上面提到的“綠色線程”摔握,一個線程上可以運行成千上萬個協(xié)程
- Kotlin 協(xié)程,是用戶態(tài)的(userlevel)丁寄,內(nèi)核對協(xié)程無感知
- Kotlin 協(xié)程氨淌,是協(xié)作式的,由開發(fā)者管理伊磺,不需要操作系統(tǒng)進行調(diào)度和切換盛正,也沒有搶占式的消耗,因此它更加高效
- Kotlin 協(xié)程屑埋,它底層基于狀態(tài)機實現(xiàn)豪筝,多協(xié)程之間共用一個實例,資源開銷極小摘能,因此它更加輕量
- Kotlin 協(xié)程续崖,本質(zhì)還是運行于線程之上,它通過協(xié)程調(diào)度器团搞,可以運行到不同的線程上