本文章已授權(quán)微信公眾號(hào)鴻洋(hongyangAndroid)轉(zhuǎn)載娜汁。
本文章講解的內(nèi)容是在在Android中使用協(xié)程以及協(xié)程源碼分析委乌。
在說(shuō)協(xié)程之前司草,我先說(shuō)下線程和線程池:
線程是操作系統(tǒng)的內(nèi)核資源搬泥,是CPU調(diào)度的最小單位秸架,所有的應(yīng)用程序都運(yùn)行在線程上小腊,它是我們實(shí)現(xiàn)并發(fā)和異步的基礎(chǔ)救鲤。在Java的API中,Thread是實(shí)現(xiàn)線程的基礎(chǔ)類秩冈,每創(chuàng)建一個(gè)Thread對(duì)象本缠,操作系統(tǒng)內(nèi)核就會(huì)啟動(dòng)一個(gè)線程,在Thread的源碼中入问,它的內(nèi)部實(shí)現(xiàn)是大量的JNI調(diào)用丹锹,因?yàn)?strong>線程的實(shí)現(xiàn)必須由操作系統(tǒng)提供直接支持,在Linux操作系統(tǒng)中芬失,每一個(gè)Java thread對(duì)應(yīng)一個(gè)native thread楣黍,它們是一一對(duì)應(yīng)的,在Android中棱烂,創(chuàng)建Thread的過(guò)程中都會(huì)調(diào)用Linux API中的pthread_create函數(shù)租漂。
線程的調(diào)用會(huì)有存在以下問(wèn)題:
- 線程不是輕量級(jí)資源,大量創(chuàng)建線程會(huì)消耗系統(tǒng)大量資源,傳統(tǒng)的阻塞調(diào)用會(huì)導(dǎo)致系統(tǒng)存在大量的因?yàn)?strong>阻塞而不能運(yùn)行的線程哩治,這很浪費(fèi)系統(tǒng)資源秃踩。
- 線程阻塞狀態(tài)和運(yùn)行狀態(tài)的切換會(huì)存在相當(dāng)大的開(kāi)銷,一直以來(lái)都是個(gè)優(yōu)化點(diǎn)业筏,例如:JVM在運(yùn)行時(shí)會(huì)對(duì)鎖進(jìn)行優(yōu)化憔杨,就像自旋鎖、鎖粗化和鎖消除等等蒜胖。
線程池(Thread Pool)是一種基于池化思想管理線程的工具消别,使用線程池有如下好處:
- 降低資源消耗:通過(guò)池化技術(shù)重復(fù)利用已創(chuàng)建的線程,降低線程的創(chuàng)建和銷毀的損耗台谢。
- 提高響應(yīng)速度:任務(wù)到達(dá)時(shí)妖啥,無(wú)需等待線程創(chuàng)建即可立即執(zhí)行。
- 提高線程的可管理性:線程是稀缺資源对碌,如果無(wú)限制創(chuàng)建荆虱,不僅會(huì)消耗系統(tǒng)資源,還會(huì)因?yàn)?strong>線程的不合理分布導(dǎo)致資源調(diào)度失衡朽们,降低系統(tǒng)的穩(wěn)定性怀读,使用線程池可以進(jìn)行統(tǒng)一的分配、調(diào)優(yōu)和監(jiān)控骑脱。
- 提供更多更強(qiáng)大的功能:線程池具備可拓展性菜枷,允許開(kāi)發(fā)人員向其中增加更多的功能。
那協(xié)程與線程有什么關(guān)系呢叁丧?在Java中啤誊,協(xié)程是基于線程池的API,它并沒(méi)有脫離Java或者Kotlin已經(jīng)有的東西拥娄。
協(xié)程的定義
協(xié)程源自Simula和Modula-2語(yǔ)言蚊锹,它是一種編程思想,并不局限于特定的語(yǔ)言稚瘾,在1958年的時(shí)候牡昆,Melvin Edward Conway提出這個(gè)術(shù)語(yǔ)并用于構(gòu)建匯編程序。在Android中使用它可以簡(jiǎn)化異步執(zhí)行的代碼摊欠,它是在版本1.3中添加到Kotlin丢烘。
協(xié)程的使用
下面來(lái)介紹如何使用協(xié)程:
依賴
要使用協(xié)程,需要在build.gradle文件中添加如下依賴:
項(xiàng)目根目錄的build.gradle文件:
// build.gradle(AndroidGenericFramework)
ext {
// 省略部分代碼
kotlinxCoroutinesVersion = '1.3.1'
// 省略部分代碼
}
module的build.gradle文件:
// build.gradle(:app)
dependencies {
// 省略部分代碼
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinxCoroutinesVersion"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:$kotlinxCoroutinesVersion"
// 省略部分代碼
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:$kotlinxCoroutinesVersion"
// 省略部分代碼
}
- org.jetbrains.kotlinx:kotlinx-coroutines-core:協(xié)程的核心庫(kù)些椒,它是協(xié)程的公共API播瞳,有了這一層公共代碼才能使協(xié)程在各個(gè)平臺(tái)的接口得到統(tǒng)一。
- org.jetbrains.kotlinx:kotlinx-coroutines-android:協(xié)程的當(dāng)前平臺(tái)對(duì)應(yīng)的平臺(tái)庫(kù)免糕,當(dāng)前平臺(tái)是Android赢乓,它是協(xié)程在具體平臺(tái)的具體實(shí)現(xiàn)痒给,因?yàn)轭愃?strong>多線程在各個(gè)平臺(tái)的實(shí)現(xiàn)方式是有差異的。
- org.jetbrains.kotlinx:kotlinx-coroutines-test:協(xié)程的測(cè)試庫(kù)骏全,它方便我們?cè)?strong>測(cè)試中使用協(xié)程。
這里要注意的是尼斧,這三個(gè)庫(kù)的版本要保持一致姜贡。
基礎(chǔ)
下面是協(xié)程的基礎(chǔ)部分:
啟動(dòng)協(xié)程
可以通過(guò)以下兩種方式來(lái)啟動(dòng)協(xié)程:
- launch:可以啟動(dòng)新協(xié)程,但是不將結(jié)果返回給調(diào)用方棺棵。
- async:可以啟動(dòng)新協(xié)程楼咳,并且允許使用await暫停函數(shù)返回結(jié)果。
通常我們使用launch函數(shù)從常規(guī)函數(shù)啟動(dòng)新協(xié)程烛恤,如果要執(zhí)行并行分解的話才使用async函數(shù)母怜。
async函數(shù)可以返回結(jié)果,代碼如下所示:
// Builders.common.kt
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyDeferredCoroutine(newContext, block) else
DeferredCoroutine<T>(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
async函數(shù)返回的是Deferred接口缚柏,繼承Job接口苹熏,它是一個(gè)非阻塞、可取消的future币喧。
要注意的是launch函數(shù)和async函數(shù)以不同的方式處理異常轨域,在使用async函數(shù)時(shí)候可以調(diào)用await函數(shù)得到結(jié)果,如果出現(xiàn)異常將會(huì)以靜默方式拋出杀餐,也就是說(shuō)不會(huì)出現(xiàn)在崩潰指標(biāo)中干发,也不會(huì)在logcat中注明。
await函數(shù)是針對(duì)單個(gè)協(xié)程的史翘,awaitAll函數(shù)是針對(duì)多個(gè)協(xié)程的枉长,它們都能保證這些協(xié)程在返回結(jié)果之前完成。
通常協(xié)程有三種方式創(chuàng)建琼讽,如下所示:
runBlocking
使用runBlocking頂層函數(shù)來(lái)創(chuàng)建協(xié)程必峰,這種方式是線程阻塞的,適用于單元測(cè)試钻蹬,一般業(yè)務(wù)開(kāi)發(fā)不會(huì)使用這種自点,示例代碼如下所示:
runBlocking {
login()
}
runBlocking函數(shù)源碼如下所示:
// Builders.kt
@Throws(InterruptedException::class)
// 第一個(gè)參數(shù)context是協(xié)程上下文,默認(rèn)值為EmptyCoroutineContext脉让,第二個(gè)參數(shù)是帶有CoroutineScope接受者對(duì)象桂敛,不接受任何參數(shù)返回T的Lambda表達(dá)式
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
val currentThread = Thread.currentThread()
val contextInterceptor = context[ContinuationInterceptor]
val eventLoop: EventLoop?
val newContext: CoroutineContext
if (contextInterceptor == null) {
// 如果沒(méi)有指定調(diào)度程序(dispatcher),就創(chuàng)建或者使用私有事件循環(huán)
eventLoop = ThreadLocalEventLoop.eventLoop
newContext = GlobalScope.newCoroutineContext(context + eventLoop)
} else {
// 看看上下文(context)的攔截器(interceptor)是否是一個(gè)我們將要使用的事件循環(huán)(用來(lái)支持TestContext)或者如果存在thread-local事件循環(huán)溅潜,就使用它來(lái)避免阻塞术唬,不過(guò)它不會(huì)去新建一個(gè)
eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
?: ThreadLocalEventLoop.currentOrNull()
newContext = GlobalScope.newCoroutineContext(context)
}
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
return coroutine.joinBlocking()
}
GlobalScope
使用GlobalScope單例對(duì)象,并且調(diào)用launch函數(shù)來(lái)創(chuàng)建協(xié)程滚澜,這種方式不會(huì)阻塞線程粗仓,但是不推薦在Android中使用這種方式,因?yàn)樗?strong>生命周期是整個(gè)應(yīng)用程序的生命周期,如果處理不好借浊,容易導(dǎo)致內(nèi)存泄漏塘淑,而且不能取消,示例代碼如下所示:
GlobalScope.launch {
login()
}
GlobalScope源碼如下所示:
// CoroutineScope.kt
public object GlobalScope : CoroutineScope {
/**
* Returns [EmptyCoroutineContext].
*/
override val coroutineContext: CoroutineContext
get() = EmptyCoroutineContext
}
EmptyCoroutineContext是一個(gè)空的協(xié)程上下文蚂斤。
CoroutineScope
使用CoroutineScope對(duì)象存捺,并且調(diào)用launch函數(shù)來(lái)創(chuàng)建協(xié)程,這種方式可以通過(guò)傳入的CoroutineContext來(lái)控制協(xié)程的生命周期曙蒸,推薦使用這種方式捌治,示例代碼如下所示:
CoroutineScope(Dispatchers.IO).launch {
login()
}
Dispatchers.IO是CoroutineContext其中一種類型,下面會(huì)講到這個(gè)纽窟。
CoroutineScope可以管理一個(gè)或者多個(gè)相關(guān)的協(xié)程肖油,可以使用它在指定范圍內(nèi)啟動(dòng)新協(xié)程。
與調(diào)度程序不同臂港,CoroutineScope不運(yùn)行協(xié)程森枪。
CoroutineScope的一項(xiàng)重要功能就是在用戶離開(kāi)你應(yīng)用中的內(nèi)容區(qū)域時(shí)停止執(zhí)行協(xié)程,它可以確保所有正在運(yùn)行的操作都能正確停止审孽。
CoroutineScope源碼如下所示:
// CoroutineScope.kt
// 參數(shù)block是帶有CoroutineScope接受者對(duì)象疲恢,不接受任何參數(shù)返回R的Lambda表達(dá)式
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R =
suspendCoroutineUninterceptedOrReturn { uCont ->
val coroutine = ScopeCoroutine(uCont.context, uCont)
coroutine.startUndispatchedOrReturn(coroutine, block)
}
在Android中使用協(xié)程
在Android平臺(tái)上,協(xié)程有助于解決兩個(gè)主要問(wèn)題:
- 管理長(zhǎng)時(shí)間運(yùn)行的任務(wù)瓷胧,如果管理不當(dāng)显拳,這些任務(wù)可能會(huì)阻塞主線程并導(dǎo)致你的應(yīng)用界面凍結(jié)。
- 提供主線程安全性搓萧,或者從主線程安全地調(diào)用網(wǎng)絡(luò)或者磁盤(pán)操作杂数。
管理長(zhǎng)時(shí)間運(yùn)行的任務(wù)
在Android平臺(tái)上,每個(gè)應(yīng)用都有一個(gè)用于處理界面并且管理用戶交互的主線程瘸洛。如果你的應(yīng)用為主線程分配的工作太多揍移,會(huì)導(dǎo)致界面呈現(xiàn)速度緩慢或者界面凍結(jié),對(duì)觸摸事件的響應(yīng)速度很慢反肋,例如:網(wǎng)絡(luò)請(qǐng)求那伐、JSON解析、寫(xiě)入或者讀取數(shù)據(jù)庫(kù)石蔗、遍歷大型列表罕邀,這些都應(yīng)該在工作線程完成。
協(xié)程在常規(guī)函數(shù)的基礎(chǔ)上添加了兩項(xiàng)操作养距,用于處理長(zhǎng)時(shí)間運(yùn)行的任務(wù)诉探。在invoke或者call和return之外,協(xié)程添加了suspend和resume:
- suspend用于暫停執(zhí)行當(dāng)前協(xié)程棍厌,并保存所有的局部變量肾胯。
- resume用于讓已暫停的協(xié)程從其暫停處繼續(xù)執(zhí)行竖席。
要調(diào)用suspend函數(shù),只能從其他suspend函數(shù)進(jìn)行調(diào)用敬肚,或者通過(guò)使用協(xié)程構(gòu)建器(例如:launch)來(lái)啟動(dòng)新的協(xié)程毕荐。
Kotlin使用堆棧幀來(lái)管理要運(yùn)行哪個(gè)函數(shù)以及所有的局部變量。暫停協(xié)程時(shí)會(huì)復(fù)制并保存當(dāng)前的堆棧幀以供稍后使用艳馒;恢復(fù)協(xié)程時(shí)會(huì)將堆棧幀從其保存位置復(fù)制回來(lái)憎亚,然后函數(shù)再次開(kāi)始運(yùn)行。
編譯器會(huì)在編譯期間對(duì)被suspend修飾符修飾的函數(shù)進(jìn)行續(xù)體傳遞風(fēng)格(CPS)變換鹰溜,它會(huì)改變suspend函數(shù)的函數(shù)簽名,我舉個(gè)例子:
await函數(shù)是個(gè)suspend函數(shù)丁恭,函數(shù)簽名如下所示:
suspend fun <T> CompletableFuture<T>.await(): T
在編譯期間進(jìn)行續(xù)體傳遞風(fēng)格(CPS)變換后:
fun <T> CompletableFuture<T>.await(continuation: Continuation<T>): Any?
我們可以看到進(jìn)行續(xù)體傳遞風(fēng)格(CPS)變換后的函數(shù)多了一個(gè)類型為Continuation<T>的參數(shù)曹动,Continuation代碼如下所示:
interface Continuation<in T> {
val context: CoroutineContext
fun resumeWith(result: Result<T>)
}
續(xù)體包裝了協(xié)程在掛起之后繼續(xù)執(zhí)行的代碼,在編譯過(guò)程中牲览,一個(gè)完整的協(xié)程被分割成一個(gè)又一個(gè)續(xù)體墓陈,在await函數(shù)的掛起結(jié)束之后,它會(huì)調(diào)用參數(shù)continuation的resumeWith函數(shù)來(lái)恢復(fù)執(zhí)行await之后的代碼第献。
進(jìn)行續(xù)體傳遞風(fēng)格(CPS)變換后的函數(shù)返回值是Any?贡必,這是因?yàn)檫@個(gè)函數(shù)發(fā)生變換后,它會(huì)返回一個(gè)類型為T(mén)(返回它本身)和COROUTINE_SUSPENDED標(biāo)記的聯(lián)合類型庸毫,因?yàn)?strong>Kotlin沒(méi)有聯(lián)合類型語(yǔ)法仔拟,所以就使用最泛化的類型Any?來(lái)表示,COROUTINE_SUSPENDED標(biāo)記表示的是這個(gè)suspend函數(shù)會(huì)發(fā)生事實(shí)上的掛起操作飒赃。
在下面介紹的三個(gè)調(diào)度程序利花,它們都會(huì)繼承CoroutineDispatcher類,源碼如下所示:
// CorountineDispatcher.kt
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
// 省略部分代碼
}
這個(gè)類實(shí)現(xiàn)了ContinuationInterceptor接口载佳,源碼如下所示:
// ContinuationInterceptor.kt
@SinceKotlin("1.3")
public interface ContinuationInterceptor : CoroutineContext.Element {
// 定義上下文攔截器的鍵
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
// 返回原始封裝的Continuation炒事,從而攔截所有的恢復(fù)
public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
// 初始化時(shí),為interceptContinuation返回的Continuation實(shí)例調(diào)用
public fun releaseInterceptedContinuation(continuation: Continuation<*>) {
/* do nothing by default */
}
public override operator fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? {
// getPolymorphicKey專門(mén)用于ContinuationInterceptor的鍵
@OptIn(ExperimentalStdlibApi::class)
if (key is AbstractCoroutineContextKey<*, *>) {
@Suppress("UNCHECKED_CAST")
return if (key.isSubKey(this.key)) key.tryCast(this) as? E else null
}
@Suppress("UNCHECKED_CAST")
return if (ContinuationInterceptor === key) this as E else null
}
public override fun minusKey(key: CoroutineContext.Key<*>): CoroutineContext {
// minusPolymorphicKey專門(mén)用于ContinuationInterceptor的鍵
@OptIn(ExperimentalStdlibApi::class)
if (key is AbstractCoroutineContextKey<*, *>) {
return if (key.isSubKey(this.key) && key.tryCast(this) != null) EmptyCoroutineContext else this
}
return if (ContinuationInterceptor === key) EmptyCoroutineContext else this
}
}
這個(gè)接口叫做續(xù)體攔截器蔫慧,它負(fù)責(zé)攔截協(xié)程在恢復(fù)后執(zhí)行的代碼(也就是續(xù)體)挠乳,并且將其在指定線程或者線程池恢復(fù)。
在編譯期間姑躲,每個(gè)suspend函數(shù)會(huì)被編譯成實(shí)現(xiàn)了Continuation接口的匿名類睡扬,其實(shí)調(diào)用suspend函數(shù)時(shí),并不一定會(huì)掛起協(xié)程黍析,舉個(gè)例子:有個(gè)網(wǎng)絡(luò)請(qǐng)求的邏輯調(diào)用了await函數(shù)威蕉,如果網(wǎng)絡(luò)請(qǐng)求還沒(méi)得到結(jié)果,那么協(xié)程就會(huì)被掛起橄仍,直到得到結(jié)果為止韧涨,續(xù)體攔截器只會(huì)攔截發(fā)生掛起后的掛起點(diǎn)的續(xù)體牍戚,而對(duì)于沒(méi)發(fā)生掛起的掛起點(diǎn),協(xié)程會(huì)調(diào)用resumeWith函數(shù)虑粥,而不再需要續(xù)體攔截器處理如孝。
續(xù)體攔截器會(huì)緩存攔截過(guò)的續(xù)體,并且在不需要它的時(shí)候調(diào)用releaseInterceptedContinuation函數(shù)釋放娩贷。
使用協(xié)程確保主線程安全
Kotlin協(xié)程使用調(diào)度程序來(lái)確定哪些線程用于執(zhí)行協(xié)程第晰,所有協(xié)程都必須在調(diào)度程序中運(yùn)行,協(xié)程可以可以暫停彬祖,而調(diào)度程序負(fù)責(zé)將其恢復(fù)茁瘦。
Kotlin提供了三個(gè)調(diào)度程序,可以使用它們來(lái)指定應(yīng)在何處運(yùn)行協(xié)程:
- Dispatchers.Main:使用此調(diào)度程序可在Android主線程上運(yùn)行協(xié)程储笑,只能用于界面交互和執(zhí)行快速工作甜熔,例如:調(diào)用suspend函數(shù)、運(yùn)行Android界面框架操作和更新LiveData對(duì)象突倍。
- Dispatchers.Default:此調(diào)度程序適合在主線程之外執(zhí)行占用大量CPU資源的工作腔稀,例如:對(duì)列表排序和解析JSON。
- Dispatchers.IO:此調(diào)度程序適合在主線程之外執(zhí)行磁盤(pán)或者網(wǎng)絡(luò)I/O羽历,例如:操作數(shù)據(jù)庫(kù)(使用Room)焊虏、向文件中寫(xiě)入數(shù)據(jù)或者從文件中讀取數(shù)據(jù)和運(yùn)行任何網(wǎng)絡(luò)操作。
我們可以調(diào)用withContext函數(shù)秕磷,并且傳入相應(yīng)的協(xié)程上下文(CoroutineContext)就可以調(diào)度線程诵闭。
withContext函數(shù)是個(gè)suspend函數(shù),它可以在不引用回調(diào)的情況下控制任何代碼行的線程池澎嚣,因此可以將其應(yīng)用于非常小的函數(shù)涂圆,示例代碼如下所示:
suspend fun getUserInfo() { // Dispatchers.Main
val data = fetchUserInfo() // Dispatchers.Main
show(data) // Dispatchers.Main
}
suspend fun fetchUserInfo() { // Dispatchers.Main
withContext(Dispatchers.IO) { // Dispatchers.IO
// 執(zhí)行網(wǎng)絡(luò)請(qǐng)求 // Dispatchers.IO
} // Dispatchers.Main
}
在示例代碼中,getUserInfo函數(shù)在主線程上執(zhí)行币叹,它可以安全地調(diào)用fetchUserInfo函數(shù)润歉,在工作線程中執(zhí)行網(wǎng)絡(luò)請(qǐng)求,并且掛起颈抚,在withContext代碼塊執(zhí)行完成后踩衩,主線程上的協(xié)程就會(huì)根據(jù)fetchUserInfo函數(shù)的結(jié)果恢復(fù)后面的邏輯。
相比于回調(diào)實(shí)現(xiàn)贩汉,使用withContext函數(shù)不會(huì)增加額外的開(kāi)銷驱富,在某些情況下,甚至優(yōu)于回調(diào)實(shí)現(xiàn)匹舞,例如:某個(gè)函數(shù)執(zhí)行了很多次的網(wǎng)絡(luò)請(qǐng)求褐鸥,使用外部withContext函數(shù)可以讓Kotlin停留在同一個(gè)調(diào)度程序,并且只切換一次線程赐稽,此外叫榕,Kotlin還優(yōu)化了Dispatchers.Default和Dispatchers.IO之間的切換浑侥,以盡可能避免線程切換。
要注意的是晰绎,Dispatchers.Default和Dispatchers.IO都是使用線程池的調(diào)度程序寓落,它們不能保證代碼塊在同一線程從上到下執(zhí)行,因?yàn)樵谀承┣闆r下荞下,Kotlin會(huì)在掛起和恢復(fù)后伶选,將執(zhí)行工作移交給另外一個(gè)線程,這意味著尖昏,對(duì)于整個(gè)withContext代碼塊仰税,線程局部變量并不指向同一個(gè)值。
Dispatchers.Main
源碼如下所示:
// Dispatchers.kt
public actual object Dispatchers {
// 省略部分代碼
@JvmStatic
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
// 省略部分代碼
}
然后看下MainDispatcherLoader.dispatcher抽诉,源碼如下所示:
// MainDispatchers.kt
internal object MainDispatcherLoader {
// 省略部分代碼
@JvmField
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
// 省略部分代碼
}
變量dispatcher為MainCoroutineDispatcher類型陨簇,MainCoroutineDispatcher是個(gè)抽象類,然后它的其中一個(gè)實(shí)現(xiàn)類是包裝類(sealed class)HandlerDispatcher掸鹅,也就是它的子類肯定是在這個(gè)類的所在的文件中塞帐,然后我找到HandlerContext這個(gè)類拦赠,這個(gè)類是HandlerDispatcher的唯一子類巍沙,源碼如下所示:
// MainCoroutineDispatcher.kt
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
// HandlerContext的構(gòu)造函數(shù),參數(shù)handler為要傳進(jìn)來(lái)的Handler荷鼠,參數(shù)name為用于調(diào)試的可選名稱
public constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
@Volatile
private var _immediate: HandlerContext? = if (invokeImmediately) this else null
override val immediate: HandlerContext = _immediate ?:
HandlerContext(handler, name, true).also { _immediate = it }
// 判斷是否需要調(diào)度句携,參數(shù)context為CoroutineContext
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
// 判斷invokeImmediately的值或者是否是同一個(gè)線程
return !invokeImmediately || Looper.myLooper() != handler.looper
}
// 調(diào)度線程,參數(shù)context為CoroutineContext允乐,參數(shù)block為Runnable
override fun dispatch(context: CoroutineContext, block: Runnable) {
// 調(diào)用Handler的post方法矮嫉,將Runnable添加到消息隊(duì)列中,這個(gè)Runnable會(huì)在這個(gè)Handler附加在線程上的時(shí)候運(yùn)行
handler.post(block)
}
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val block = Runnable {
with(continuation) { resumeUndispatched(Unit) }
}
// 調(diào)用Handler的postDelayed方法牍疏,將Runnable添加到消息隊(duì)列中蠢笋,并且在指定的時(shí)間結(jié)束后運(yùn)行
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
}
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
// 調(diào)用Handler的postDelayed方法,將Runnable添加到消息隊(duì)列中鳞陨,并且在指定的時(shí)間結(jié)束后運(yùn)行
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
return object : DisposableHandle {
override fun dispose() {
// 調(diào)用Handler的removeCallbacks方法昨寞,刪除消息隊(duì)列中的Runnable
handler.removeCallbacks(block)
}
}
}
override fun toString(): String =
if (name != null) {
if (invokeImmediately) "$name [immediate]" else name
} else {
handler.toString()
}
override fun equals(other: Any?): Boolean = other is HandlerContext && other.handler === handler
override fun hashCode(): Int = System.identityHashCode(handler)
}
然后我們找下調(diào)用HandlerContext的構(gòu)造函數(shù)的地方,源碼如下所示:
// HandlerDispatcher.kt
@JvmField
@Deprecated("Use Dispatchers.Main instead", level = DeprecationLevel.HIDDEN)
internal val Main: HandlerDispatcher? = runCatching { HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main") }.getOrNull()
可以看到傳入了Looper.getMainLooper方法厦滤,也就是應(yīng)用程序的主循環(huán)程序(Main Looper)援岩,它位于應(yīng)用程序的主線程中。
可以看到使用了很多Handler相關(guān)的方法掏导,也就是它還是依賴于Android的消息機(jī)制享怀。
Dispatchers.Default
源碼如下所示:
// Dispatchers.kt
public actual object Dispatchers {
// 省略部分代碼
@JvmStatic
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
// 省略部分代碼
}
然后看下createDefaultDispatcher函數(shù),源碼如下所示:
// CoroutineContext.kt
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
if (useCoroutinesScheduler) DefaultScheduler else CommonPool
這里會(huì)根據(jù)內(nèi)部變量(internal val)useCoroutinesScheduler判斷返回是DefaultScheduler還是CommonPool趟咆,useCoroutinesScheduler源碼如下所示:
// CoroutineContext.kt
internal const val COROUTINES_SCHEDULER_PROPERTY_NAME = "kotlinx.coroutines.scheduler"
internal val useCoroutinesScheduler = systemProp(COROUTINES_SCHEDULER_PROPERTY_NAME).let { value ->
when (value) {
null, "", "on" -> true
"off" -> false
else -> error("System property '$COROUTINES_SCHEDULER_PROPERTY_NAME' has unrecognized value '$value'")
}
}
這個(gè)內(nèi)部變量(internal val)useCoroutinesScheduler是根據(jù)JVM的System.getProperty方法獲取的添瓷,通過(guò)傳入"kotlinx.coroutines.scheduler"作為鍵(key)梅屉,返回的值為on,useCoroutinesScheduler為true仰坦;返回的值是off履植,useCoroutinesScheduler為false。
先看下DefaultScheduler這種情況悄晃,源碼如下所示:
// Dispatcher.kt
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
val IO = blocking(systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)))
override fun close() {
throw UnsupportedOperationException("$DEFAULT_SCHEDULER_NAME cannot be closed")
}
override fun toString(): String = DEFAULT_SCHEDULER_NAME
@InternalCoroutinesApi
@Suppress("UNUSED")
public fun toDebugString(): String = super.toString()
}
它繼承ExperimentalCoroutineDispatcher類玫霎,它是個(gè)不穩(wěn)定的類,以后可能會(huì)改變妈橄,可以看下這個(gè)類的dispatch函數(shù)庶近,這個(gè)函數(shù)負(fù)責(zé)調(diào)度線程,源碼如下所示:
// Dispatcher.kt
@InternalCoroutinesApi
open class ExperimentalCoroutineDispatcher(
// 核心線程數(shù)
private val corePoolSize: Int,
// 最大線程數(shù)
private val maxPoolSize: Int,
// 調(diào)度器保持存活的時(shí)間(單位:納秒)
private val idleWorkerKeepAliveNs: Long,
// 調(diào)度器名字
private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
constructor(
corePoolSize: Int = CORE_POOL_SIZE,
maxPoolSize: Int = MAX_POOL_SIZE,
schedulerName: String = DEFAULT_SCHEDULER_NAME
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)
@Deprecated(message = "Binary compatibility for Ktor 1.0-beta", level = DeprecationLevel.HIDDEN)
constructor(
corePoolSize: Int = CORE_POOL_SIZE,
maxPoolSize: Int = MAX_POOL_SIZE
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS)
// 省略部分代碼
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
try {
// 調(diào)用了coroutineScheduler的dispatch函數(shù)
coroutineScheduler.dispatch(block)
} catch (e: RejectedExecutionException) {
DefaultExecutor.dispatch(context, block)
}
// 省略部分代碼
}
看下CoroutineScheduler這個(gè)類眷蚓,然后再看下它的dispatch函數(shù)鼻种,源碼如下所示:
// CoroutineScheduler.kt
@Suppress("NOTHING_TO_INLINE")
internal class CoroutineScheduler(
// 核心線程數(shù)
@JvmField val corePoolSize: Int,
// 最大線程數(shù)
@JvmField val maxPoolSize: Int,
// 調(diào)度器保持存活的時(shí)間(單位:納秒)
@JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
// 調(diào)度器名字
@JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
init {
require(corePoolSize >= MIN_SUPPORTED_POOL_SIZE) {
"Core pool size $corePoolSize should be at least $MIN_SUPPORTED_POOL_SIZE"
}
require(maxPoolSize >= corePoolSize) {
"Max pool size $maxPoolSize should be greater than or equals to core pool size $corePoolSize"
}
require(maxPoolSize <= MAX_SUPPORTED_POOL_SIZE) {
"Max pool size $maxPoolSize should not exceed maximal supported number of threads $MAX_SUPPORTED_POOL_SIZE"
}
require(idleWorkerKeepAliveNs > 0) {
"Idle worker keep alive time $idleWorkerKeepAliveNs must be positive"
}
}
// 省略部分代碼
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
// 用于支持虛擬時(shí)間
trackTask()
val task = createTask(block, taskContext)
// 嘗試將任務(wù)提交到本地隊(duì)列,并且根據(jù)結(jié)果采取執(zhí)行相關(guān)的邏輯
val currentWorker = currentWorker()
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
if (notAdded != null) {
if (!addToGlobalQueue(notAdded)) {
// 全局隊(duì)列在最后一步關(guān)閉時(shí)不應(yīng)該接受更多的任務(wù)
throw RejectedExecutionException("$schedulerName was terminated")
}
}
val skipUnpark = tailDispatch && currentWorker != null
if (task.mode == TASK_NON_BLOCKING) {
if (skipUnpark) return
// 執(zhí)行任務(wù)
signalCpuWork()
} else {
// 增加阻塞任務(wù)
signalBlockingWork(skipUnpark = skipUnpark)
}
}
// 省略部分代碼
}
可以看到CoroutineScheduler實(shí)現(xiàn)了Executor接口沙热,在Java中線程池的核心實(shí)現(xiàn)類是ThreadPoolExecutor類叉钥,它也是實(shí)現(xiàn)了Executor接口,所以這個(gè)CoroutineScheduler是協(xié)程中線程池的一種實(shí)現(xiàn)篙贸。
corePoolSize是核心線程數(shù)量投队,它是通過(guò)調(diào)用JVM的Runtime.getRuntime().availableProcessors()方法得到當(dāng)前處理器可運(yùn)行的線程數(shù),它的缺省值強(qiáng)制設(shè)置為至少兩個(gè)線程爵川。
maxPoolSize是最大線程數(shù)量敷鸦,最小值為corePoolSize,最大值為(1 shl BLOCKING_SHIFT) - 2寝贡,BLOCKING_SHIFT為21扒披,也就是1向左位移21位再減去2,確保Runtime.getRuntime().availableProcessors()得到的值再乘以2在最小值和最大值之間圃泡。
這個(gè)函數(shù)做的事情就是將傳入的任務(wù)壓入任務(wù)棧碟案,然后調(diào)用signalCpuWork執(zhí)行任務(wù)或者調(diào)用signalBlockingWork來(lái)增加阻塞任務(wù)。
然后再看下另外一種情況:CommonPool颇蜡,源碼如下所示:
// CommonPool.kt
internal object CommonPool : ExecutorCoroutineDispatcher() {
// 省略部分代碼
private fun createPool(): ExecutorService {
if (System.getSecurityManager() != null) return createPlainPool()
// ForkJoinPool類的反射价说,方便它在JDK6上可以運(yùn)行(這里沒(méi)有),如果沒(méi)有就使用普通線程池
val fjpClass = Try { Class.forName("java.util.concurrent.ForkJoinPool") }
?: return createPlainPool()
// 嘗試使用commonPool澡匪,除非顯式指定了并行性或者在調(diào)試privatePool模式
if (!usePrivatePool && requestedParallelism < 0) {
Try { fjpClass.getMethod("commonPool")?.invoke(null) as? ExecutorService }
?.takeIf { isGoodCommonPool(fjpClass, it) }
?.let { return it }
}
// 嘗試創(chuàng)建私有ForkJoinPool實(shí)例
Try { fjpClass.getConstructor(Int::class.java).newInstance(parallelism) as? ExecutorService }
?. let { return it }
// 使用普通線城市
return createPlainPool()
}
// 省略部分代碼
// 創(chuàng)建普通線程池
private fun createPlainPool(): ExecutorService {
val threadId = AtomicInteger()
// 使用Java的newFixedThreadPool線程池
return Executors.newFixedThreadPool(parallelism) {
Thread(it, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
}
}
// 省略部分代碼
// 調(diào)度線程
override fun dispatch(context: CoroutineContext, block: Runnable) {
try {
(pool ?: getOrCreatePoolSync()).execute(wrapTask(block))
} catch (e: RejectedExecutionException) {
unTrackTask()
DefaultExecutor.enqueue(block)
}
}
// 省略部分代碼
}
可以看到使用CommonPool熔任,其實(shí)就是使用Java的newFixedThreadPool線程池。
Dispatchers.Default調(diào)度器的核心線程池和處理器的線程數(shù)是相等的唁情,因此它可以用于處理密集型計(jì)算疑苔,適合在主線程之外執(zhí)行占用大量CPU資源的工作,例如:對(duì)列表排序和解析JSON甸鸟,和RxJava的計(jì)算線程池的思想有點(diǎn)類似惦费。
Dispatchers.IO
源碼如下所示:
// Dispatchers.kt
public actual object Dispatchers {
// 省略部分代碼
@JvmStatic
public val IO: CoroutineDispatcher = DefaultScheduler.IO
}
可以看到IO其實(shí)是DefaultScheduler的一個(gè)成員變量兵迅,源碼如下所示:
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
// 調(diào)用了父類ExperimentalCoroutineDispatcher的blocking函數(shù)
val IO = blocking(systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)))
override fun close() {
throw UnsupportedOperationException("$DEFAULT_SCHEDULER_NAME cannot be closed")
}
override fun toString(): String = DEFAULT_SCHEDULER_NAME
@InternalCoroutinesApi
@Suppress("UNUSED")
public fun toDebugString(): String = super.toString()
}
可以看下它的父類ExperimentalCoroutineDispatcher的blocking函數(shù),源碼如下所示:
// Dispatcher.kt
@InternalCoroutinesApi
open class ExperimentalCoroutineDispatcher(
// 核心線程數(shù)
private val corePoolSize: Int,
// 最大線程數(shù)
private val maxPoolSize: Int,
// 調(diào)度器保持存活的時(shí)間(單位:納秒)
private val idleWorkerKeepAliveNs: Long,
// 調(diào)度器名字
private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
constructor(
corePoolSize: Int = CORE_POOL_SIZE,
maxPoolSize: Int = MAX_POOL_SIZE,
schedulerName: String = DEFAULT_SCHEDULER_NAME
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)
@Deprecated(message = "Binary compatibility for Ktor 1.0-beta", level = DeprecationLevel.HIDDEN)
constructor(
corePoolSize: Int = CORE_POOL_SIZE,
maxPoolSize: Int = MAX_POOL_SIZE
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS)
// 省略部分代碼
public fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
// 創(chuàng)建LimitingDispatcher對(duì)象
return LimitingDispatcher(this, parallelism, TASK_PROBABLY_BLOCKING)
}
// 省略部分代碼
}
看下LimitingDispatcher類薪贫,源碼如下所示:
// Dispatcher.kt
private class LimitingDispatcher(
// final變量dispatcher為ExperimentalCoroutineDispatcher類型
val dispatcher: ExperimentalCoroutineDispatcher,
val parallelism: Int,
override val taskMode: Int
) : ExecutorCoroutineDispatcher(), TaskContext, Executor {
// 省略部分代碼
// 調(diào)度線程恍箭,調(diào)用dispatch(block: Runnable, tailDispatch: Boolean)函數(shù)
override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)
private fun dispatch(block: Runnable, tailDispatch: Boolean) {
var taskToSchedule = block
while (true) {
// 提交正在執(zhí)行的任務(wù)槽
val inFlight = inFlightTasks.incrementAndGet()
// 快速路徑,如果沒(méi)有達(dá)到并行性限制瞧省,就會(huì)分派任務(wù)并且返回
if (inFlight <= parallelism) {
// 調(diào)用ExperimentalCoroutineDispatcher的dispatchWithContext函數(shù)
dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
return
}
// 達(dá)到并行性限制后就將任務(wù)添加到隊(duì)列中
queue.add(taskToSchedule)
if (inFlightTasks.decrementAndGet() >= parallelism) {
return
}
taskToSchedule = queue.poll() ?: return
}
}
// 省略部分代碼
}
可以看到其實(shí)Dispatchers.Default調(diào)度器和Dispatchers.IO調(diào)度器是共用同一個(gè)線程池的扯夭。
指定CoroutineScope
在定義協(xié)程時(shí),必須指定其CoroutineScope鞍匾,CoroutineScope可以管理一個(gè)或者多個(gè)相關(guān)的協(xié)程交洗,可以使用它在指定范圍內(nèi)啟動(dòng)新協(xié)程。
與調(diào)度程序不同橡淑,CoroutineScope不運(yùn)行協(xié)程构拳。
CoroutineScope的一項(xiàng)重要功能就是在用戶離開(kāi)應(yīng)用中內(nèi)容區(qū)域時(shí)停止執(zhí)行協(xié)程,可以確保所有正在運(yùn)行的操作都能正確停止梁棠。
在Android平臺(tái)上置森,可以將CoroutineScope實(shí)現(xiàn)與組件中生命周期相關(guān)聯(lián),例如:Lifecycle和ViewModel符糊,這樣可以避免內(nèi)存泄漏和不再對(duì)與用戶相關(guān)的Activity或者Fragment執(zhí)行額外的工作凫海,例如:ViewModelScope、LifecycleScope和liveData濒蒋。
添加KTX依賴項(xiàng)
- 對(duì)于ViewModelScope盐碱,請(qǐng)使用androidx.lifecycle:lifecycle-viewmodel-ktx:2.1.0-beta01或更高版本把兔。
- 對(duì)于LifecycleScope沪伙,請(qǐng)使用androidx.lifecycle:lifecycle-runtime-ktx:2.2.0-alpha01或更高版本。
- 對(duì)于liveData县好,請(qǐng)使用androidx.lifecycle:lifecycle-livedata-ktx:2.2.0-alpha01或更高版本围橡。
生命周期感知型CoroutineScope
ViewModelScope
為ViewModel定義ViewModelScope,如果ViewModel已清除缕贡,那么在這個(gè)范圍內(nèi)的啟動(dòng)的協(xié)程就會(huì)自動(dòng)取消翁授,如果你的工作需要在ViewModel處于活動(dòng)狀態(tài)下才能完成的話,可以使用它晾咪,示例代碼如下所示:
class MyViewModel : ViewModel() {
init {
viewModelScope.launch {
// 當(dāng)ViewModel被清除收擦,這個(gè)范圍內(nèi)啟動(dòng)的協(xié)程就會(huì)自動(dòng)取消
}
}
}
LifecycleScope
為每個(gè)Lifecycle對(duì)象定義LifecycleScope,在這個(gè)范圍內(nèi)啟動(dòng)的協(xié)程會(huì)在Lifecycle被銷毀的時(shí)候自動(dòng)取消谍倦,可以通過(guò)lifecycle.coroutineScope或者lifecycleOwner.lifecycleScope屬性訪問(wèn)Lifecycle的CoroutineScope塞赂,示例代碼如下所示:
class MyFragment : Fragment() {
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
super.onViewCreated(view, savedInstanceState)
viewLifecycleOwner.lifecycleScope.launch {
// 在這個(gè)范圍內(nèi)啟動(dòng)的協(xié)程會(huì)在Lifecycle被銷毀的時(shí)候自動(dòng)取消
}
}
}
即使CoroutineScope提供了適當(dāng)?shù)姆椒▉?lái)自動(dòng)取消長(zhǎng)時(shí)間運(yùn)行的操作,在某些情況下昼蛀,可能需要暫停執(zhí)行代碼塊宴猾,例如:要使用FragmentTransaction圆存,Fragment的生命周期至少處于STARTED狀態(tài),對(duì)于這種情況仇哆,Lifecycle提供了這些方法:lifecycle.whenCreated沦辙、lifecycle.whenStarted和lifecycle.whenResumed,如果Lifecycle未至少處于所需的最低狀態(tài)讹剔,那么就會(huì)暫停在這些代碼塊內(nèi)運(yùn)行的任何協(xié)程油讯,示例代碼如下所示:
class MyFragment : Fragment() {
init {
// 在Fragment的構(gòu)造函數(shù)可以安全地啟動(dòng)
lifecycleScope.launch {
whenCreateed {
// 只有當(dāng)Fragment的生命周期至少處于CREATED狀態(tài)下,這個(gè)代碼塊才會(huì)執(zhí)行延欠,并且可以調(diào)用其他suspend函數(shù)
}
whenStarted {
// 只有當(dāng)Fragment的生命周期至少處于STARTED狀態(tài)下撞羽,這個(gè)代碼塊才會(huì)執(zhí)行,并且可以調(diào)用其他suspend函數(shù)
}
whenResumed {
// 只有當(dāng)Fragment的生命周期至少處于RESUMED狀態(tài)下衫冻,這個(gè)代碼塊才會(huì)執(zhí)行诀紊,并且可以調(diào)用其他suspend函數(shù)
}
}
}
}
liveData
使用LiveData時(shí),我們可能需要異步計(jì)算值隅俘,例如:獲取了用戶信息后顯示到界面邻奠,在這種情況下,我們可以使用liveData構(gòu)建器函數(shù)調(diào)用suspend函數(shù)为居,并且將結(jié)果作為LiveData對(duì)象返回碌宴,示例代碼如下所示:
val userInfoData: LiveData<UserInfoData> = liveData {
// getUserInfo函數(shù)是一個(gè)suspend函數(shù)
val data = remoteSource.getUserInfo()
emit(data)
}
liveData構(gòu)建塊用作協(xié)程和LiveData之間的結(jié)構(gòu)化并發(fā)基元。
當(dāng)LiveData變?yōu)?strong>活動(dòng)狀態(tài)的時(shí)候蒙畴,代碼塊開(kāi)始執(zhí)行贰镣;當(dāng)LiveData變?yōu)?strong>非活動(dòng)狀態(tài)的時(shí)候,代碼塊會(huì)在可配置的超時(shí)過(guò)后自動(dòng)取消膳凝。如果代碼塊在完成前取消碑隆,則會(huì)在LiveData再次變成活動(dòng)狀態(tài)后重啟;如果在上次運(yùn)行中成功完成蹬音,則不會(huì)重啟上煤。要注意的是,代碼塊只有在自動(dòng)取消的情況下才會(huì)重啟著淆,如果代碼塊由于任何其他原因(例如:拋出CancelationException)而取消劫狠,則不會(huì)重啟。
我們可以從代碼塊中發(fā)出多個(gè)值永部,每次調(diào)用emit函數(shù)都會(huì)暫停執(zhí)行代碼塊独泞,直到在主線程上設(shè)置LiveData值。
我的GitHub:TanJiaJunBeyond
Android通用框架:Android通用框架
我的掘金:譚嘉俊
我的簡(jiǎn)書(shū):譚嘉俊
我的CSDN:譚嘉俊