今天我們來聊聊Kotlin
的協(xié)程Coroutine
萨咕。
如果你還沒有接觸過協(xié)程瓦灶,推薦你先閱讀這篇入門級文章What? 你還不知道Kotlin Coroutine?
如果你已經(jīng)接觸過協(xié)程允瞧,但對協(xié)程的原理存在疑惑稠歉,那么在閱讀本篇文章之前推薦你先閱讀下面的文章,這樣能讓你更全面更順暢的理解這篇文章诀紊。
Kotlin協(xié)程實現(xiàn)原理:Suspend&CoroutineContext
Kotlin協(xié)程實現(xiàn)原理:CoroutineScope&Job
如果你已經(jīng)接觸過協(xié)程,相信你都有過以下幾個疑問:
- 協(xié)程到底是個什么東西隅俘?
- 協(xié)程的
suspend
有什么作用邻奠,工作原理是怎樣的? - 協(xié)程中的一些關(guān)鍵名稱(例如:
Job
为居、Coroutine
碌宴、Dispatcher
、CoroutineContext
與CoroutineScope
)它們之間到底是怎么樣的關(guān)系蒙畴? - 協(xié)程的所謂非阻塞式掛起與恢復又是什么贰镣?
- 協(xié)程的內(nèi)部實現(xiàn)原理是怎么樣的?
- ...
接下來的一些文章試著來分析一下這些疑問忍抽,也歡迎大家一起加入來討論八孝。
ContinuationInterceptor
看到Interceptor
相信第一印象應該就是攔截器,例如在Okhttp
中被廣泛應用鸠项。自然在協(xié)程中ContinuationInterceptor
的作用也是用來做攔截協(xié)程的干跛。
下面來看下它的實現(xiàn)。
public interface ContinuationInterceptor : CoroutineContext.Element {
/**
* The key that defines *the* context interceptor.
*/
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
/**
* Returns continuation that wraps the original [continuation], thus intercepting all resumptions.
* This function is invoked by coroutines framework when needed and the resulting continuations are
* cached internally per each instance of the original [continuation].
*
* This function may simply return original [continuation] if it does not want to intercept this particular continuation.
*
* When the original [continuation] completes, coroutine framework invokes [releaseInterceptedContinuation]
* with the resulting continuation if it was intercepted, that is if `interceptContinuation` had previously
* returned a different continuation instance.
*/
public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
...
}
只給出了關(guān)鍵部分祟绊,ContinuationInterceptor
繼承于CoroutineContext.Element
楼入,所以它也是CoroutineContext
,同時提供了interceptContinuation
方法牧抽,先記住這個方法后續(xù)會用到嘉熊。
大家是否還記得在Kotlin協(xié)程實現(xiàn)原理系列的第一篇文章中,我們分析了CoroutineContext
的內(nèi)部結(jié)構(gòu)扬舒,當時提到了它的plus
方法阐肤,就是下面這段代碼
public operator fun plus(context: CoroutineContext): CoroutineContext =
if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
context.fold(this) { acc, element ->
val removed = acc.minusKey(element.key)
if (removed === EmptyCoroutineContext) element else {
// make sure interceptor is always last in the context (and thus is fast to get when present)
val interceptor = removed[ContinuationInterceptor]
if (interceptor == null) CombinedContext(removed, element) else {
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}
在這里第一次看到了ContinuationInterceptor
的身影,當時核心是為了分析CoroutineContext
,所以只是提了plus
方法每次都會將ContinuationInterceptor
添加到拼接鏈的尾部孕惜。
不知道有沒有老鐵想過這個問題愧薛,為什么要每次新加入一個CoroutineContext
都要調(diào)整ContinuationInterceptor
的位置,并將它添加到尾部衫画?
這里其實涉及到兩點毫炉。
其中一點是由于CombinedContext
的結(jié)構(gòu)決定的。它有兩個元素分別是left
與element
削罩。而left
類似于前驅(qū)節(jié)點瞄勾,它是一個前驅(qū)集合,而element
只是一個純碎的CoroutineContext
弥激,而它的get
方法每次都是從element
開始進行查找對應Key
的CoroutineContext
對象进陡;沒有匹配到才會去left
集合中進行遞歸查找。
所以為了加快查找ContinuationInterceptor
類型的實例秆撮,才將它加入到拼接鏈的尾部四濒,對應的就是element
换况。
另一個原因是ContinuationInterceptor
使用的很頻繁职辨,因為每次創(chuàng)建協(xié)程都會去嘗試查找當前協(xié)程的CoroutineContext
中是否存在ContinuationInterceptor
。例如我們通過launch
來看協(xié)程的啟動戈二。
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
如果你使用launch
的默認參數(shù)舒裤,那么此時的Coroutine
就是StandaloneCoroutine
,然后調(diào)用start
方法啟動協(xié)程觉吭。
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this)
}
在start
中進入了CoroutineStart
腾供,對應的就是下面這段代碼
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>) =
when (this) {
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
CoroutineStart.LAZY -> Unit // will start lazily
}
因為我們使用的是默認參數(shù),所以這里對應的就是CoroutineStart.DEFAULT
鲜滩,最終來到block.startCoroutineCancellable
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellable(Unit)
}
在這里我們終于看到了intercepted
伴鳖。
首先通過createCoroutineUnintercepted
來創(chuàng)建一個協(xié)程(內(nèi)部具體如何創(chuàng)建的這篇文章先不說,后續(xù)文章會單獨分析)徙硅,然后再調(diào)用了intercepted
方法進行攔截操作榜聂,最后再resumeCancellable
,這個方法最終調(diào)用的就是Continuation
的resumeWith
方法嗓蘑,即啟動協(xié)程须肆。
所以每次啟動協(xié)程都會自動回調(diào)一次resumeWith
方法。
今天的主題是ContinuationInterceptor
所以我們直接看intercepted
桩皿。
public expect fun <T> Continuation<T>.intercepted(): Continuation<T>
發(fā)現(xiàn)它是一個expect
方法豌汇,它會根據(jù)不同平臺實現(xiàn)不同的邏輯。因為我們是Android
所以直接看Android
上的actual
的實現(xiàn)
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
最終來到ContinuationImpl
的intercepted
方法
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
在這里看到了熟悉的context
泄隔,獲取到ContinuationInterceptor
實例拒贱,并且調(diào)用它的interceptContinuation
方法返回一個處理過的Continuation
。
多次調(diào)用
intercepted
佛嬉,對應的interceptContinuation
只會調(diào)用一次逻澳。
所以ContinuationInterceptor
的攔截是通過interceptContinuation
方法進行的岩调。既然已經(jīng)明白了它的攔截方式,我們自己來手動寫一個攔截器來驗證一下赡盘。
val interceptor = object : ContinuationInterceptor {
override val key: CoroutineContext.Key<*> = ContinuationInterceptor
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
println("intercept todo something. change run to thread")
return object : Continuation<T> by continuation {
override fun resumeWith(result: Result<T>) {
println("create new thread")
thread {
continuation.resumeWith(result)
}
}
}
}
}
println(Thread.currentThread().name)
lifecycleScope.launch(interceptor) {
println("launch start. current thread: ${Thread.currentThread().name}")
withContext(Dispatchers.Main) {
println("new continuation todo something in the main thread. current thread: ${Thread.currentThread().name}")
}
launch {
println("new continuation todo something. current thread: ${Thread.currentThread().name}")
}
println("launch end. current thread: ${Thread.currentThread().name}")
}
這里簡單實現(xiàn)了一個ContinuationInterceptor
号枕,如果攔截成功就會輸出interceptContinuation
中對應的語句。下面是程序運行后的輸出日志陨享。
main
// 第一次launch
intercept todo something. change run to thread
create new thread
launch start. current thread: Thread-2
new continuation todo something in the main thread. current thread: main
create new thread
// 第二次launch
intercept todo something. change run to thread
create new thread
launch end. current thread: Thread-7
new continuation todo something. current thread: Thread-8
分析一下上面的日志葱淳,首先程序運行在main
線程,通過lifecycleScope.launch
啟動協(xié)程并將我們自定義的intercetpor
加入到CoroutineContext
中抛姑;然后在啟動的過程中發(fā)現(xiàn)我們自定義的interceptor
攔截成功了赞厕,同時將原本在main
線程運行的程序切換到了新的thread
線程。同時第二次launch
的時候也攔截成功定硝。
到這里就已經(jīng)可以證明我們上面對ContinuationInterceptor
理解是正確的皿桑,它可以在協(xié)程啟動的時候進行攔截操作。
下面我們繼續(xù)看日志蔬啡,發(fā)現(xiàn)withContext
并沒有攔截成功诲侮,這是為什么呢?注意看Dispatchers.Main
箱蟆。這也是接下來需要分析的內(nèi)容沟绪。
另外還有一點,如果細心的老鐵就會發(fā)現(xiàn)空猜,launch start
與launch end
所處的線程不一樣绽慈,這是因為在withContext
結(jié)束之后,它內(nèi)部還會進行一次線程恢復辈毯,將自身所處的main
線程切換到之前的線程坝疼,但為什么又與之前launch start
的線程不同呢?
大家不要忘了谆沃,協(xié)程每一個掛起后的恢復都是通過回調(diào)resumeWith
進行的钝凶,然而外部launch
協(xié)程我們進行了攔截,在它返回的Continuation
的resumeWith
回調(diào)中總是會創(chuàng)建新的thread
管毙。所以發(fā)生這種情況也就不奇怪了腿椎,這是我們攔截的效果。
整體再來看這個例子夭咬,它是不是像一個簡易版的協(xié)程的線程切換呢啃炸?
CoroutineDispatcher
現(xiàn)在我們來看Dispatchers.Main
,為什么它會導致我們攔截失敗呢卓舵?要探究原因沒有直接看源碼更加直接有效的南用。
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
主要看它的類型,它返回的是MainCoroutineDispatcher
,然后再看它是什么
public abstract class MainCoroutineDispatcher : CoroutineDispatcher() {}
發(fā)現(xiàn)MainCoroutineDispatcher
繼承于CoroutineDispatcher
裹虫,主角登場了肿嘲,但還不夠我們繼續(xù)看CoroutineDispatcher
是什么
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
public open fun dispatchYield(context: CoroutineContext, block: Runnable) = dispatch(context, block)
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
}
真想已經(jīng)浮出水面了,原來CoroutineDispatcher
實現(xiàn)了ContinuationInterceptor
筑公,說明CoroutineDispatcher
也具有攔截器的功能雳窟。然后再結(jié)合CoroutineContext
的性質(zhì),就很好解釋為什么我們自定義的攔截器沒有生效匣屡。
原因就是它與我們自定義的攔截器一樣都實現(xiàn)了ContinuationInterceptor
接口封救,一旦使用Dispatchers.Main
就會替換掉我們自定義的攔截器。
因果關(guān)系弄明白了現(xiàn)在就好辦了捣作。我們已經(jīng)知道它具有攔截功能誉结,再來看CoroutineDispatcher
提供的另外幾個方法isDispatchNeeded
與dispatch
。
我們可以大膽猜測券躁,isDispatchNeeded
就是判斷是否需要分發(fā)惩坑,然后dispatch
就是如何進行分發(fā),接下來我們來驗證一下也拜。
ContinuationInterceptor
重要的方法就是interceptContinuation
以舒,在CoroutineDispatcher
中直接返回了DispatchedContinuation
對象,它是一個Continuation
類型搪泳。那么自然重點就是它的resumeWith
方法稀轨。
override fun resumeWith(result: Result<T>) {
val context = continuation.context
val state = result.toState()
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_ATOMIC_DEFAULT
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_ATOMIC_DEFAULT) {
withCoroutineContext(this.context, countOrElement) {
continuation.resumeWith(result)
}
}
}
}
這里我們看到了isDispatchNeeded
與dispatch
方法,如果不需要分發(fā)自然是直接調(diào)用原始的continuation
對象的resumeWith
方法岸军,也就沒有什么類似于線程的切換。
那什么時候isDispatcheNeeded
為true
呢瓦侮?這就要看它的dispatcer
是什么艰赞。
由于現(xiàn)在我們是拿Dispatchers.Main
作分析。所以這里我直接告訴你們它的dispatcher
是HandlerContext
override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main")
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
/**
* Creates [CoroutineDispatcher] for the given Android [handler].
*
* @param handler a handler.
* @param name an optional name for debugging.
*/
public constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
@Volatile
private var _immediate: HandlerContext? = if (invokeImmediately) this else null
override val immediate: HandlerContext = _immediate ?:
HandlerContext(handler, name, true).also { _immediate = it }
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.post(block)
}
...
}
它繼承于HandlerDispatcher
肚吏,而HandlerDispatcher
繼承于MainCoroutineDispatcher
方妖。
條件都符合,我們直接看isDispatchNeeded
方法返回true
的邏輯罚攀。
首先通過invokeImmediately
判斷党觅,它代表當前線程是否與自身的線程相同,如何你外部使用者能夠保證這一點斋泄,就可以直接使用Dispatcher.Main.immediate
來避免進行線程的切換邏輯杯瞻。當然為了保證外部的判斷失敗,最后也會通過Looper.myLooper() != handler.looper
來進行校正炫掐。對于Dispatchers.Main
這個的handle.looper
自然是主線程的looper
魁莉。
如果不能保證則invokeImmediately
為false
,直接進行線程切換。然后進入dispatch
方法旗唁,下面是Dispatchers.Main
中dispatch
的處理邏輯畦浓。
override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.post(block)
}
這個再熟悉不過了,因為這個時候的handler.post
就是代表向主線程推送消息检疫,此時的block
將會在主線程進行調(diào)用讶请。
這樣線程的切換就完成。
所以綜上來看屎媳,CoroutineDispatcher
為協(xié)程提供了一個線程切換的統(tǒng)一判斷與執(zhí)行標準秽梅。
首先在協(xié)程進行啟動的時候通過攔截器的方式進行攔截,對應的方法是interceptContinuation
剿牺,然后返回一個具有切換線程功能的Continuation
企垦,在每次進行resumeWith
的時候,內(nèi)部再通過isDispatchNeeded
進行判斷當前協(xié)程的運行是否需要切換線程晒来。如果需要則調(diào)用dispatch
進行線程的切換钞诡,保證協(xié)程的正確運行。
如果我要自定義協(xié)程線程的切換邏輯湃崩,就可以通過繼承于CoroutineDispatcher
來實現(xiàn)荧降,將它的核心方法進行自定義即可。
當然攒读,如果你是在Android
中使用協(xié)程朵诫,那基本上是不需要自定義線程的切換邏輯。因為kotlin
已經(jīng)為我們提供了日常所需的Dispatchers
薄扁。主要有四種分別為:
-
Dispatchers.Default
: 適合在主線程之外執(zhí)行占用大量CPU
資源的工作 -
Dispatchers.Main
:Android
主線程 -
Dispatchers.Unconfined
: 它不會切換線程剪返,只是啟動一個協(xié)程進行掛起,至于恢復之后所在的線程完全由調(diào)用它恢復的協(xié)程控制邓梅。 -
Dispatchers.IO
: 適合在主線程之外執(zhí)行磁盤或網(wǎng)絡I/O
最后我們再來簡單提一下withContext
脱盲。
withContext
CoroutineDispatcher
雖然能夠提供線程的切換,但這只是單方向的日缨,因為它沒有提供線程的恢復钱反。
試想一下,我們有個網(wǎng)絡請求匣距,我們通過CoroutineDispatcher
將線程切換到Dispatchers.IO
面哥,當拿到請求成功的數(shù)據(jù)之后,所在的線程還是IO
線程毅待,這樣并不能有利于我們UI
操作尚卫。所以為了解決這個問題kotlin
提供了withContext
,它不僅能夠接受CoroutineDispatcher
來幫助我們切換線程恩静,同時在執(zhí)行完畢之后還會幫助我們將之前切換掉的線程進恢復焕毫,保證協(xié)程運行的連貫性蹲坷。這也是為什么官方推薦使用withContext
進行協(xié)程線程的切換的原因。
而withContext
的線程恢復原理是它內(nèi)部生成了一個DispatchedCoroutine
邑飒,保存切換線程時的CoroutineContext
與切換之前的Continuation
循签,最后在onCompletionInternal
進行恢復。
internal override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
if (state is CompletedExceptionally) {
val exception = if (mode == MODE_IGNORE) state.cause else recoverStackTrace(state.cause, uCont)
uCont.resumeUninterceptedWithExceptionMode(exception, mode)
} else {
uCont.resumeUninterceptedMode(state as T, mode)
}
}
這個uCont
就是切換線程之前的Continuation
疙咸。具體實現(xiàn)就不在這分析了县匠,感興趣的老鐵可以自己翻一翻源碼。
本篇文章主要介紹了ContinuationInterceptor
作用與如何攔截協(xié)程的撒轮,同時也分析了CoroutineDispatcher
內(nèi)部結(jié)構(gòu)乞旦,進一步剖析了協(xié)程線程切換的原理。希望對學習協(xié)程的伙伴們能夠有所幫助题山,敬請期待后續(xù)的協(xié)程分析兰粉。
項目
android_startup: 提供一種在應用啟動時能夠更加簡單、高效的方式來初始化組件顶瞳,優(yōu)化啟動速度玖姑。不僅支持Jetpack App Startup
的全部功能,還提供額外的同步與異步等待慨菱、線程控制與多進程支持等功能焰络。
AwesomeGithub: 基于Github
客戶端,純練習項目符喝,支持組件化開發(fā)闪彼,支持賬戶密碼與認證登陸。使用Kotlin
語言進行開發(fā)协饲,項目架構(gòu)是基于Jetpack&DataBinding
的MVVM
畏腕;項目中使用了Arouter
、Retrofit
囱稽、Coroutine
郊尝、Glide
、Dagger
與Hilt
等流行開源技術(shù)战惊。
flutter_github: 基于Flutter
的跨平臺版本Github
客戶端,與AwesomeGithub
相對應扎即。
android-api-analysis: 結(jié)合詳細的Demo
來全面解析Android
相關(guān)的知識點, 幫助讀者能夠更快的掌握與理解所闡述的要點吞获。
daily_algorithm: 每日一算法,由淺入深谚鄙,歡迎加入一起共勉各拷。