這一篇我們好好聊一聊協(xié)程的原理愚屁,通過上一篇的學(xué)習(xí)济竹,相信大家對(duì)于如何使用協(xié)程已經(jīng)非常熟悉了痕檬。
故事還得從上次的協(xié)程分享開始,由于大家對(duì)協(xié)程的實(shí)踐并不多送浊,所以大家對(duì)下面的這段代碼如何執(zhí)行爭(zhēng)論不休:
GlobalScope.launch {
val a = async {
1+2
}
val b = async {
1+3
}
val c = a + b
Log.e(TAG,"result:$c")
}
復(fù)制代碼
有人說梦谜,a 和 b 會(huì)串行執(zhí)行,有人說袭景,a 和 b 會(huì)并行執(zhí)行唁桩,那么執(zhí)行的結(jié)果到底是什么樣的?我們將在下面的文章給出耸棒。
一荒澡、結(jié)構(gòu)簡(jiǎn)要介紹
首先,我們得明確協(xié)程中有哪些東西与殃,如果你會(huì)使用協(xié)程单山,那你肯定知道協(xié)程中有 CoroutineScope
、CoroutineContext
和 CoroutineDispatcher
幅疼,這些都是使用過程中我們可以接觸到的 API米奸。
我簡(jiǎn)單的整理了協(xié)程中主要的基礎(chǔ)類:
協(xié)程的類結(jié)構(gòu)可分為三部分:CoroutineScope
、CoroutineContext
和 Continuation
爽篷。
1. Continuation
如果你會(huì)使用協(xié)程悴晰,那你肯定知道,協(xié)程遇到耗時(shí) suspend
操作可以掛起逐工,等到任務(wù)結(jié)束的時(shí)候铡溪,協(xié)程會(huì)自動(dòng)切回來。
它的奧秘就是 Continuation
泪喊,Continuation
可以理解程續(xù)體棕硫,你可以理解其每次在協(xié)程掛起點(diǎn)將剩余的代碼包括起來,等到結(jié)束以后執(zhí)行剩余的內(nèi)容窘俺。一個(gè)協(xié)程的代碼塊可能會(huì)被切割成若干個(gè) Continuation
饲帅,在每個(gè)需要掛起的地方都會(huì)分配一個(gè) Continuation
。
先拋出一些結(jié)論瘤泪,協(xié)程在做耗時(shí)操作的時(shí)候灶泵,如果執(zhí)行了耗時(shí) suspend
操作,會(huì)自動(dòng)掛起对途,但是這個(gè)耗時(shí)操作終究是要做的赦邻,只不過切換到其他線程去做了,做完以后協(xié)程就需要切回來实檀,但是切到哪兒呢惶洲?這便是 Continuation
需要解決的問題按声。
Continuation
的流程是這樣的:
<figcaption></figcaption>
無論是使用 launch
還是 async
啟動(dòng)的協(xié)程,都會(huì)有一個(gè)結(jié)束的時(shí)候用來回調(diào)的 continuation
恬吕。
2. CoroutineScope
關(guān)于 CoroutineScope
沒有特別多要說的签则,它持有了 CoroutineContext
,主要對(duì)協(xié)程的生命周期進(jìn)行管理铐料。
3. CoroutineContext
一開始看 CoroutineContext
覺得特別暈渐裂,不明白為啥要這么設(shè)計(jì),看了 Bennyhuo 大佬的文章以后才稍微好轉(zhuǎn)钠惩。
從上面協(xié)程的類的機(jī)構(gòu)中可以看出柒凉,光看這個(gè) CoroutineContext
這個(gè)接口(源碼內(nèi)容我們下面講),會(huì)發(fā)現(xiàn)它有點(diǎn)像 List
集合篓跛,而繼承自 CoroutineContext
接口的 Element
接口則定義了其中的元素膝捞。
隨后,這個(gè) Element
接口被劃分成了兩種類愧沟,Job
和 ContinuationInterceptor
:
-
Job
:從字面上來講蔬咬,它代表一個(gè)任務(wù),Thread
也是執(zhí)行任務(wù)央渣,所以我們可以理解它定義了協(xié)程的一些東西计盒,比如協(xié)程的狀態(tài),協(xié)程和子協(xié)程的管理方式等等芽丹。 -
ContinuationInterceptor
:也從字面上來看北启,它是Continuation
的攔截器,通過攔截Continuation
拔第,完成我們想要完成的工作咕村,比如說線程的切換。
二蚊俺、結(jié)構(gòu)源碼分析
上面我們從概念上介紹了協(xié)程的三大件懈涛,在這部分,我們從源碼分析泳猬。
1. Continuation
suspend
修飾的方法會(huì)在在編譯期間被編譯器做特殊處理批钠,這種處理被成為CPS(續(xù)體轉(zhuǎn)換風(fēng)格) 轉(zhuǎn)化,suspend
方法會(huì)被包裹成 Continuation
得封。
說了這么久的 Continuation
埋心,我們還沒有見過接口代碼,由于接口內(nèi)容不多忙上,我就把所有的內(nèi)容貼出來了:
/**
* Interface representing a continuation after a suspension point that returns a value of type `T`.
*/
@SinceKotlin("1.3")
public interface Continuation<in T> {
/**
* The context of the coroutine that corresponds to this continuation.
*/
public val context: CoroutineContext
/**
* Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
* return value of the last suspension point.
*/
public fun resumeWith(result: Result<T>)
}
復(fù)制代碼
我們重點(diǎn)關(guān)注Continuation#resumeWith()
方法拷呆,從注釋來看,通過返回 suspend
掛起點(diǎn)的值來恢復(fù)協(xié)程的執(zhí)行,協(xié)程可以從參數(shù) Result<T>)
獲取成功的值或者失敗的結(jié)果茬斧,如果沒有結(jié)果腰懂,那么 Result<T>
的泛型是 Unit
。Resulut
這個(gè)類也特別簡(jiǎn)單项秉,感興趣的同學(xué)可以查看源碼绣溜。
BaseContinuationImpl
實(shí)現(xiàn)了 Continuation
接口,我們看一下 Continuation#resumeWith
方法的實(shí)現(xiàn):
internal abstract class BaseContinuationImpl(
// 完成后調(diào)用的 Continuation
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
// 1\. 執(zhí)行 suspend 中的代碼塊
val outcome = invokeSuspend(param)
// 2\. 如果代碼掛起就提前返回
if (outcome === COROUTINE_SUSPENDED) return
// 3\. 返回結(jié)果
Result.success(outcome)
} catch (exception: Throwable) {
// 3\. 返回失敗結(jié)果
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// 4\. 如果 completion 中還有子 completion伙狐,遞歸
current = completion
param = outcome
} else {
// 5\. 結(jié)果通知
completion.resumeWith(outcome)
return
}
}
}
}
}
復(fù)制代碼
主要的過程我在注釋中已經(jīng)標(biāo)注出來了涮毫,我來解釋一下 Continuation
的機(jī)制。
每個(gè) suspend
方法生成的 BaseContinuationImpl
贷屎,其構(gòu)造方法有一個(gè)參數(shù)叫 completion
,它也是一個(gè) Continuation
艘虎,它的調(diào)用時(shí)機(jī)是在 suspen
方法執(zhí)行完畢的時(shí)候唉侄。我們后面稱
這個(gè)流程展示給我們的內(nèi)容很直觀了,簡(jiǎn)單起見野建,我們直接看3属划、4和5這一個(gè) launch
啟動(dòng)流程就好,通常一個(gè) launch
生成一個(gè)外層 Continuation
一個(gè)相應(yīng)的結(jié)果 Continuation
候生,我們后面稱結(jié)果 continuation
為 complete
同眯,Continuation
調(diào)用順序是:
- 調(diào)用外層
Continuation
中的Continuation#resumeWith()
方法。 - 該方法會(huì)去執(zhí)行
launch
包裹的代碼塊唯鸭,并返回一個(gè)結(jié)果须蜗。 - 將上述代碼塊執(zhí)行的結(jié)果交給
completion
,由它完成協(xié)程結(jié)束的通知目溉。
上述的過程只存在于一個(gè) launch
并且里面沒有執(zhí)行其他耗時(shí)的掛起操作明肮,對(duì)于這些情況,我們將會(huì)在下面的文章討論缭付。
拋出問題一: 可以看到柿估,在注釋2,遇到耗時(shí)的 suspend
陷猫,返回的結(jié)果是一個(gè) COROUTINE_SUSPENDED
秫舌,后面會(huì)直接返回,耗時(shí)操作結(jié)束的時(shí)候绣檬,我們的 completion
怎么恢復(fù)呢足陨?
2. CoroutineContext 和 Element
在概要分析的時(shí)候,我們說 CoroutineContext
的結(jié)構(gòu)像一個(gè)集合河咽,是從它的接口得出結(jié)論的:
public interface CoroutineContext {
// get 方法钠右,通過 key 獲取
public operator fun <E : Element> get(key: Key<E>): E?
// 累加操作
public fun <R> fold(initial: R, operation: (R, Element) -> R): R
// 操作符 + , 實(shí)際的實(shí)現(xiàn)調(diào)用了 fold 方法
public operator fun plus(context: CoroutineContext): CoroutineContext
// 移除操作
public fun minusKey(key: Key<*>): CoroutineContext
// CoroutineContext 定義的 Key
public interface Key<E : Element>
// CoroutineContext 中元素的定義
public interface Element : CoroutineContext {
// key
public val key: Key<*>
//...
}
}
復(fù)制代碼
從中我們可以大致看出,CoroutineContext
中可以通過 Key
來獲取元素 Element
忘蟹,并且 Element
接口也是繼承自 CoroutineContext
接口飒房。
除此以外搁凸,CoroutineContext
支持增加和移除操作,并且支持 +
操作符來完成增加狠毯。+
操作符即 plus
方法是有具體實(shí)現(xiàn)的护糖,感興趣的可以自己看一下,主要涉及到了攔截器 ContinuationInterceptor
的添加嚼松。
1.1 Job
Job
的注釋中闡述定義是這樣的:
A background job. Conceptually, a job is a cancellable thing with a life-cycle that culminates in its completion.
從中我們可以得出:
- 后臺(tái)任務(wù)
- 可取消
- 生命周期在完成它的時(shí)候結(jié)束
從后臺(tái)任務(wù)的角度來看嫡良,Job
聽著有點(diǎn)像 Thread
,和 Thread
一樣献酗,Job
也有各種狀態(tài)寝受,文檔中對(duì) Job
各種狀態(tài)的注釋(感覺大佬們的注釋寫的真棒~):
Job
另一個(gè)值得關(guān)注的點(diǎn)是對(duì)子 Job
的管理,主要的規(guī)則如下:
- 子
Job
都會(huì)結(jié)束的時(shí)候罕偎,父Job
才會(huì)結(jié)束 - 父
Job
取消的時(shí)候很澄,子Job
也會(huì)取消
上述的一些內(nèi)容都可以從 Job
的接口文檔中得出。那么颜及,Job
哪里來的甩苛?如果你看一下CoroutineScope#launch
方法,你就會(huì)得出結(jié)論俏站,該方法的返回類型就是 Job
讯蒲,我們每次調(diào)用該方法,都會(huì)創(chuàng)建一個(gè) Job
肄扎。
1.2 ContinuationInterceptor
顧名思義墨林,Continuation
攔截器,先看接口:
interface ContinuationInterceptor : CoroutineContext.Element {
// ContinuationInterceptor 在 CoroutineContext 中的 Key
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
/**
* 攔截 continuation
*/
fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
//...
}
復(fù)制代碼
這個(gè)接口可以提煉的就這兩個(gè)信息:
- 攔截器的
Key
反浓,也就是說萌丈,無論你后面一個(gè)CoroutineContext
放了多少個(gè)攔截器,Key
為ContinuationInterceptor
的攔截器只能有一個(gè)雷则。 - 我們都知道辆雾,
Continuation
在調(diào)用其Continuation#resumeWith()
方法,會(huì)執(zhí)行其suspend
修飾的函數(shù)的代碼塊月劈,如果我們提前攔截到度迂,是不是可以做點(diǎn)其他事情,比如說切換線程猜揪,這也是ContinuationInterceptor
的作用之一惭墓。
需要說明一下,我們通過 Dispatchers
來指定協(xié)程發(fā)生的線程而姐,Dispatchers
實(shí)現(xiàn)了 ContinuationInterceptor
接口腊凶。
3. CoroutineScope
CoroutineScope
的接口很簡(jiǎn)單:
public interface CoroutineScope {
public val coroutineContext: CoroutineContext
}
復(fù)制代碼
它要求后續(xù)的實(shí)現(xiàn)都要提供 CoroutineContext
,不過我們都知道,CoroutineContext
是協(xié)程中很重要的東西钧萍,既包括 Job
褐缠,也包括調(diào)度器。
在上面的代碼中风瘦,我多次使用了 Android Jetpack 中的 Lifecycle 中協(xié)程的擴(kuò)展庫队魏,好處我們獲取 CoroutineScope
更加簡(jiǎn)單,無需在組件 onDestroy
的時(shí)候手動(dòng) cancel
万搔,并且它的源碼超級(jí)簡(jiǎn)單胡桨,前提是你會(huì)使用 Lifecycle
:
internal class LifecycleCoroutineScopeImpl(
override val lifecycle: Lifecycle,
override val coroutineContext: CoroutineContext
) : LifecycleCoroutineScope(), LifecycleEventObserver {
// ...
override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) {
if (lifecycle.currentState <= Lifecycle.State.DESTROYED) {
lifecycle.removeObserver(this)
coroutineContext.cancel()
}
}
}
復(fù)制代碼
并且它也支持你在指定的生命周期調(diào)用協(xié)程,大家看一下接口就明白了瞬雹。
三昧谊、過程源碼分析
先上一段使用代碼:
lifecycleScope.launch(Dispatchers.Main) {
val a = async { getResult(1, 2) }
val b = async { getResult(3, 5) }
val c = a.await() + b.await()
Log.e(TAG, "result:$c")
}
suspend fun getResult(a: Int, b: Int): Int {
return withContext(Dispatchers.IO) {
delay(1000)
return@withContext a + b
}
}
復(fù)制代碼
雖然代碼很簡(jiǎn)單,但是源碼還是比較復(fù)雜的挖炬,我們分步講揽浙。
第一步 獲取 CoroutineScope
我已經(jīng)在上面說明了,我們使用的 Lifecycle
的協(xié)程拓展庫意敛,如果我們不使用拓展庫,就得使用 MainScope
膛虫,它們的 CoroutineContext
都是一樣的:
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
// LifecycleCoroutineScope
val Lifecycle.coroutineScope: LifecycleCoroutineScope
get() {
while (true) {
// ...
val newScope = LifecycleCoroutineScopeImpl(
this,
SupervisorJob() + Dispatchers.Main.immediate
)
// ...
return newScope
}
}
復(fù)制代碼
顯而易見草姻,MainScope
和 LifecycleCoroutineScope
都使用了 SupervisorJob() + Dispatchers.Main
, 作為它們的 CoroutineContext
稍刀。
說明一下撩独,SupervisorJob
和Dispatchers.Main
很重要,它們分別代表了CoroutineContext
之前提及的 Job
和 ContinuationInterceptor
账月,后面用到的時(shí)候再分析综膀。
第二步 啟動(dòng)協(xié)程
直接進(jìn)入 CoroutineScope#launch()
方法:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
復(fù)制代碼
上面的方法一共有三個(gè)參數(shù),前兩個(gè)不作過多介紹局齿,第三個(gè)參數(shù):
block: suspend CoroutineScope.() -> Unit)
復(fù)制代碼
這是一個(gè)方法剧劝,是一個(gè) lambda
參數(shù),同時(shí)也表明了它需要被 suspend
修飾抓歼。 繼續(xù)看 launch
方法讥此,發(fā)現(xiàn)它主要做了兩件事:
- 組合新的
CoroutineContext
- 再創(chuàng)建一個(gè)
Continuation
組合新的CoroutineContext
在第一行代碼 val newContext = newCoroutineContext(context)
做了第一件事,這里的 newCoroutineContext(context)
是一個(gè)擴(kuò)展方法:
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = coroutineContext + context
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
復(fù)制代碼
CoroutineScope
使用本身的 coroutineContext
集合谣妻,利用 +
操作符將我們?cè)?launch
方法中提供的 coroutineContext
添加進(jìn)來萄喳。
再創(chuàng)建一個(gè)Continuation
回到上一段代碼,通常我們不會(huì)指定 start
參數(shù)蹋半,所以它會(huì)使用默認(rèn)的 CoroutineStart.DEFAULT
他巨,最終 coroutine
會(huì)得到一個(gè) StandaloneCoroutine
。
StandaloneCoroutine
實(shí)現(xiàn)自 AbstractCoroutine
,翻開上面的類圖染突,你會(huì)發(fā)現(xiàn)捻爷,它實(shí)現(xiàn)了 Continuation
、Job
和 CoroutineScope
等一堆接口觉痛。需要說明一下役衡,這個(gè) StandaloneCoroutine
其實(shí)是我們當(dāng)前 Suspend Contination
的 complete
。
接著會(huì)調(diào)用
coroutine.start(start, coroutine, block)
復(fù)制代碼
這就表明協(xié)程開始啟動(dòng)了薪棒。
第三步 start
進(jìn)入到 AbstractCoroutine#start
方法:
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this)
}
復(fù)制代碼
跳過層層嵌套手蝎,最后到達(dá)了:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
// 外面再包一層 Coroutine
createCoroutineUnintercepted(receiver, completion)
// 如果需要,做攔截處理
.intercepted()
// 調(diào)用 resumeWith 方法
.resumeCancellableWith(Result.success(Unit))
}
復(fù)制代碼
雖然這僅僅是一個(gè)函數(shù)俐芯,但是后面主要的邏輯都揭露了:
- 創(chuàng)建一個(gè)沒有攔截過的
Continuation
棵介。 - 攔截
Continuation
。 - 執(zhí)行
Continuation#resumeWith
方法吧史。
第四步 又創(chuàng)建 Continuation
我這里用了 又邮辽,因?yàn)槲覀冊(cè)?launch
中已經(jīng)創(chuàng)建了一個(gè) AbstractContinuaion
,不過它是一個(gè) complete
贸营,從各個(gè)函數(shù)的行參就可以看出來吨述。
不過我們 suspend
修飾的外層 Continuation
還沒有創(chuàng)建,它來了钞脂,是 SuspendLambda
揣云,它繼承自 ContinuationImpl
,如果你問我為什么源碼中沒找到具體實(shí)現(xiàn)冰啃,我覺得可能跟 suspend
修飾符有關(guān)邓夕,由編譯器處理,但是調(diào)用棧確實(shí)是這樣的:
看一下 SuspendLambda
類的實(shí)現(xiàn):
internal abstract class SuspendLambda(
public override val arity: Int,
completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
constructor(arity: Int) : this(arity, null)
//...
}
復(fù)制代碼
可以看到阎毅,它的構(gòu)造方法的形參就包括一個(gè) complete
焚刚。
第五步 攔截處理
回到:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
// 外面再包一層 Coroutine
createCoroutineUnintercepted(receiver, completion)
// 如果需要,做攔截處理
.intercepted()
// 調(diào)用 resumeWith 方法
.resumeCancellableWith(Result.success(Unit))
}
復(fù)制代碼
里面的攔截方法 Continuation#intercepted()
方法是一個(gè)擴(kuò)展方法:
@SinceKotlin("1.3")
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
復(fù)制代碼
createCoroutineUnintercepted(receiver, completion)
返回的是一個(gè) SuspendLambda
扇调,所以它肯定是一個(gè) ContinuationImpl
矿咕,看一下它的攔截方法的實(shí)現(xiàn):
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
public override val context: CoroutineContext
get() = _context!!
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
// ...
}
復(fù)制代碼
在 ContinuationImpl#intercepted()
方法中,直接利用 context
這個(gè)數(shù)據(jù)結(jié)構(gòu)通過 context[ContinuationInterceptor]
獲取攔截器肃拜。
CoroutineDispatcher攔截實(shí)現(xiàn)
我們都知道 ContinuationInterceptor
具有攔截作用痴腌,它的直接實(shí)現(xiàn)是 CoroutineDispatcher
這個(gè)抽象類,所有其他調(diào)度器都直接或者間接繼承這個(gè)類燃领,我們關(guān)注一下它的攔截方法:
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
//...
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
// 1.攔截的 Continuation 被包了一層 DispatchedContinuation
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
//...
}
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
// ...
override fun resumeWith(result: Result<T>) {
// ...
if (dispatcher.isDispatchNeeded(context)) {
// 2\. 后面一個(gè)參數(shù)需要提供 Runnable士聪,父類已經(jīng)實(shí)現(xiàn)
dispatcher.dispatch(context, this)
}
//...
}
// ...
}
// SchedulerTask 是一個(gè) Runnable
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
// ...
public final override fun run() {
// ...
try {
//...
withCoroutineContext(context, delegate.countOrElement) {
// 3\. continuation 是 DispatchedContinuation 包裹的 continuation
continuation.resume(...)
}
}
//...
}
}
復(fù)制代碼
簡(jiǎn)單來說,就是對(duì)原有的 Continuation
的 resumeWith
操作加了一層攔截猛蔽,就像這樣:
<figcaption></figcaption>
加入 CoroutineDispatcher
以后剥悟,執(zhí)行真正的 Continue#resumeWith()
之前灵寺,會(huì)執(zhí)行 CoroutineDispatcher#dispatch()
方法,所以我們現(xiàn)在關(guān)注 CoroutineDispatcher#dispatch
具體實(shí)現(xiàn)即可区岗。
講一個(gè)CoroutineDispatcher具體實(shí)現(xiàn)
首先我們得明確這個(gè) CoroutineDispatcher
來自哪里略板?它從 context
獲取,context
來自哪里慈缔?
注意 SuspendLambda
和 ContinuationImpl
的構(gòu)造方法叮称,SuspendLambda
中的參數(shù)沒有 CoroutineContext
,所以只能來自 completion
中的 CoroutineContext
藐鹤,而completion
的 CoroutineContext
來自 launch
方法中來自 CoroutineScope
瓤檐,默認(rèn)是 SupervisorJob() + Dispatchers.Main
,不過只有 Dispatchers.Main
繼承了 CoroutineDispatcher
娱节。
Dispatchers.Main
是一個(gè) MainCoroutineDispatcher
挠蛉,Android 中對(duì)應(yīng)的 MainCoroutineDispatcher
是 HandlerContext
:
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
public constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
//...
override fun dispatch(context: CoroutineContext, block: Runnable) {
// 利用主線程的 Handler 執(zhí)行任務(wù)
handler.post(block)
}
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
// 利用主線程的 Handler 延遲執(zhí)行任務(wù),將完成的 continuation 放在任務(wù)中執(zhí)行
val block = Runnable {
with(continuation) { resumeUndispatched(Unit) }
}
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
}
//..
}
復(fù)制代碼
重點(diǎn)來了肄满,調(diào)度任務(wù)最后竟然交給了主線程的 Handler
谴古,其實(shí)想想也對(duì),主線程的任務(wù)最后一般都會(huì)交給主線程的 Handler
稠歉。
好奇的同學(xué)可能問了掰担,如果不是主線程呢?不是主線程就利用的線程池:
public open class ExperimentalCoroutineDispatcher(
private val corePoolSize: Int,
private val maxPoolSize: Int,
private val idleWorkerKeepAliveNs: Long,
private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
// 執(zhí)行期
override val executor: Executor
get() = coroutineScheduler
private var coroutineScheduler = createScheduler()
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
try {
coroutineScheduler.dispatch(block)
} catch (e: RejectedExecutionException) {
DefaultExecutor.dispatch(context, block)
}
}
復(fù)制代碼
結(jié)果可以說是很清晰了怒炸,coroutineScheduler
是一個(gè)線程池恩敌,如果像了解具體的過程,同學(xué)們可以自行查看代碼横媚。
讀到這里,你可能有一點(diǎn)明白 CoroutineContext
為什么要設(shè)計(jì)成一種數(shù)據(jù)結(jié)構(gòu):
-
coroutineContext[ContinuationInterceptor]
就可以直接取到當(dāng)前協(xié)程的攔截器月趟,并且一個(gè)協(xié)程只能對(duì)應(yīng)一個(gè)調(diào)度器灯蝴。 - 調(diào)度器都放在其他
coroutineContext
的前面,所以在執(zhí)行協(xié)程的時(shí)候孝宗,可以做攔截處理穷躁。
同理,我們也可以使用 coroutineContext[Job]
獲取當(dāng)前協(xié)程因妇。
第六步 resumeWith
再次回到:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
// 外面再包一層 Coroutine
createCoroutineUnintercepted(receiver, completion)
// 如果需要问潭,做攔截處理
.intercepted()
// 調(diào)用 resumeWith 方法
.resumeCancellableWith(Result.success(Unit))
}
復(fù)制代碼
現(xiàn)在我們看 Continue#resumeCancellableWith()
方法,它是一個(gè)擴(kuò)展方法婚被,里面的調(diào)度邏輯是:
DispatchContinuation#resumeCancellableWith
CoroutineDispatcher#dispatch
Continuation#resumeWith
這里的 Continuation
就是 SuspendLambda
狡忙,它繼承了 BaseContinuationImpl
,我們看一下它的實(shí)現(xiàn)方法:
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
// This implementation is final. This fact is used to unroll resumeWith recursion.
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
// 1\. 執(zhí)行 suspend 里面的代碼塊
val outcome = invokeSuspend(param)
// 2\. 如果代碼塊里面執(zhí)行了掛起方法址芯,會(huì)提前返回
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// 3\. 如果完成的completion也是BaseContinuationImpl灾茁,就會(huì)進(jìn)入循環(huán)
current = completion
param = outcome
} else {
// 4\. 執(zhí)行 completion resumeWith 方法
completion.resumeWith(outcome)
return
}
}
}
}
}
復(fù)制代碼
這邊被我分為2個(gè)部分:
- 執(zhí)行
suspend
方法窜觉,并獲取結(jié)果 - 調(diào)用
complete
(放在下一步講)
執(zhí)行suspend方法
在第一處會(huì)先執(zhí)行 suspend
修飾的方法內(nèi)容,在方法里面可能又會(huì)調(diào)度 suspend
方法北专,比如說我們的實(shí)例方法:
lifecycleScope.launch(Dispatchers.Main) {
val a = async { getResult(1, 2) }
val b = async { getResult(3, 5) }
val c = a.await() + b.await()
Log.e(TAG, "result:$c")
}
suspend fun getResult(a: Int, b: Int): Int {
return withContext(Dispatchers.IO) {
delay(1000)
return@withContext a + b
}
}
復(fù)制代碼
因?yàn)槲覀冊(cè)?getResult
執(zhí)行了延時(shí)操作禀挫,所以我們 launch
方法肯定執(zhí)行了耗時(shí)掛起方法,所以 BaseContinuationImpl#invokeSuspend
方法會(huì)返回一個(gè) COROUTINE_SUSPENDED
拓颓,結(jié)果你也看到了语婴,該方法會(huì)提前結(jié)束。(說明一下驶睦,我沒有找到BaseContinuationImpl#invokeSuspend
方法的具體實(shí)現(xiàn)砰左,我猜可能跟編譯器有關(guān))
我猜你肯定跟我一樣好奇,遇到耗時(shí)掛起會(huì)提前返回啥繁,那么耗時(shí)掛起如何對(duì) complete
進(jìn)行恢復(fù)的菜职?
我們看一下 delay(1000)
這個(gè)延時(shí)操作在主線程是如何處理的:
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
//...
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val block = Runnable {
with(continuation) { resumeUndispatched(Unit) }
}
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
}
//...
}
復(fù)制代碼
可以看到,將恢復(fù)任務(wù)包了一個(gè) Runnable
旗闽,交給 Handler
的 Handler#postDelayed()
方法了酬核。
第七步 complete resumeWith
對(duì)于 complete
的處理一般會(huì)有兩種。
complete是BaseContinuationImpl
第一種情況是我們稱之為套娃适室,完成回調(diào)的 Continuation
它本身也有自己的完成回調(diào) Continuation
嫡意,接下來循環(huán)就對(duì)了。
調(diào)用complete的resumeWith
第二種情況捣辆,就是通過 complete
去完成回調(diào)蔬螟,由于 complete
是 AbstractContinuation
,我們看一下它的 resumeWith
:
public abstract class AbstractCoroutine<in T>(
/**
* The context of the parent coroutine.
*/
@JvmField
protected val parentContext: CoroutineContext,
active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
// ...
public final override fun resumeWith(result: Result<T>) {
// 1\. 獲取當(dāng)前協(xié)程的技術(shù)狀態(tài)
val state = makeCompletingOnce(result.toState())
// 2\. 如果當(dāng)前還在等待完成汽畴,說明還有子協(xié)程沒有結(jié)束
if (state === COMPLETING_WAITING_CHILDREN) return
// 3\. 執(zhí)行結(jié)束恢復(fù)的方法旧巾,默認(rèn)為空
afterResume(state)
}
// 這是父類 JobSupport 中的 makeCompletingOnce 方法
// 為了方便查看,我復(fù)制過來
internal fun makeCompletingOnce(proposedUpdate: Any?): Any? {
loopOnState { state ->
// tryMakeCompleting 的內(nèi)容主要根據(jù)是否有子Job做不同處理
val finalState = tryMakeCompleting(state, proposedUpdate)
when {
finalState === COMPLETING_ALREADY ->
throw IllegalStateException(
"Job $this is already complete or completing, " +
"but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull
)
finalState === COMPLETING_RETRY -> return@loopOnState
else -> return finalState // COMPLETING_WAITING_CHILDREN or final state
}
}
}
}
復(fù)制代碼
這段代碼的意思其實(shí)也很簡(jiǎn)單忍些,就是協(xié)程即將完成鲁猩,得先評(píng)估一下協(xié)程的技術(shù)狀態(tài),別協(xié)程還有東西在運(yùn)行罢坝,就給結(jié)束了廓握。對(duì)于一些有子協(xié)程的一些協(xié)程,會(huì)等待子協(xié)程結(jié)束的時(shí)候嘁酿,才會(huì)結(jié)束當(dāng)前協(xié)程隙券。
一個(gè) launch
的過程大概就是這樣了。大致的流程圖是這樣的:
下面我們?cè)僬務(wù)?async
闹司。
四娱仔、關(guān)于async
async
和 launch
的代碼相似度很高:
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyDeferredCoroutine(newContext, block) else
DeferredCoroutine<T>(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
復(fù)制代碼
最終也會(huì)進(jìn)行三步走:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
// 外面再包一層 Coroutine
createCoroutineUnintercepted(receiver, completion)
// 如果需要,做攔截處理
.intercepted()
// 調(diào)用 resumeWith 方法
.resumeCancellableWith(Result.success(Unit))
}
復(fù)制代碼
不同的是开仰,async
返回的是一個(gè) Deferred<T>
拟枚,我們需要調(diào)用 Deferred#await()
去獲取返回結(jié)果薪铜,它的實(shí)現(xiàn)在 JobSupport
:
private open class DeferredCoroutine<T>(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> {
// ... awaitInternal方法來自父類 JobSupport
override suspend fun await(): T = awaitInternal() as T
// ...
// 這是 JobSupport 中的實(shí)現(xiàn)
internal suspend fun awaitInternal(): Any? {
// 循環(huán)獲取結(jié)果
while (true) { // lock-free loop on state
val state = this.state
// 1\. 如果處于完成狀態(tài)
if (state !is Incomplete) {
if (state is CompletedExceptionally) { // Slow path to recover stacktrace
recoverAndThrow(state.cause)
}
return state.unboxState()
}
// 2\. 除非需要重試,不然就 break
if (startInternal(state) >= 0) break
}
// 等待掛起的方法
return awaitSuspend() // slow-path
}
}
復(fù)制代碼
它的具體過程可以從我的注釋看出恩溅,就不一一介紹了隔箍,感興趣的同學(xué)可以查看源碼。
1. 本文一開始的討論
本文一開始的代碼是錯(cuò)的脚乡,連編譯器都過不了蜒滩,尷尬~
正確的代碼應(yīng)該是:
GlobalScope.launch {
val a = async {
1+2
}
val b = async {
1+3
}
val c = a.await() + bawait()
Log.e(TAG,"result:$c")
}
復(fù)制代碼
如果是正確的代碼,這里可能分兩種情況:
如果你放在UI線程奶稠,那肯定是串行的俯艰,這時(shí)候有人說,我在 a
里使用 delay(1000)
锌订,在 b
里使用 delay(2000)
竹握,得到 c
的時(shí)候就花了 2000
毫秒啊,這不是并行嗎辆飘?事情并不是這樣的啦辐,delay
操作使用了 Handler#postDelay
方法,一個(gè)延遲了 1000
毫秒執(zhí)行蜈项,一個(gè)延遲了 2000
毫秒執(zhí)行芹关,但是主線程只有一個(gè),所以只能是串行紧卒。
如果是子線程侥衬,通常都是并行的,因?yàn)槲覀兪褂昧司€程池啊~
總結(jié)
寫這邊源碼分析的時(shí)候跑芳,一些細(xì)節(jié)總是找不到轴总,比如說 suspendLambda
的子類找不到,自己對(duì) Kotlin 的學(xué)習(xí)有待深入博个。
所以本文有些地方還值得商榷肘习,如果你有更好的理解,歡迎下方交流坡倔。