協(xié)程作用域CoroutineScope
在 Android 環(huán)境中蟹漓,通常每個界面(Activity畔师、Fragment 等)啟動的 Coroutine 只在該界面有意義,如果用戶在等待 Coroutine 執(zhí)行的時候退出了這個界面牧牢,則再繼續(xù)執(zhí)行這個 Coroutine 可能是沒必要的。另外 Coroutine 也需要在適當(dāng)?shù)?context 中執(zhí)行姿锭,否則會出現(xiàn)錯誤塔鳍,比如在非 UI 線程去訪問 View。 所以 Coroutine 在設(shè)計的時候呻此,要求在一個范圍(Scope)內(nèi)執(zhí)行轮纫,這樣當(dāng)這個 Scope 取消的時候,里面所有的子 Coroutine 也自動取消焚鲜。所以要使用 Coroutine 必須要先創(chuàng)建一個對應(yīng)的 CoroutineScope
掌唾。
CoroutineScope 接口
CoroutineScope 是一個接口,要是查看這個接口的源代碼的話就發(fā)現(xiàn)這個接口里面只定義了一個屬性 CoroutineContext
:
public interface CoroutineScope {
// Scope 的 Context
public val coroutineContext: CoroutineContext
}
所以 CoroutineScope 只是定義了一個新 Coroutine 的執(zhí)行 Scope忿磅。每個協(xié)程coroutine builder(launch 糯彬、async等) 都是 CoroutineScope 的擴展方法,并且自動的繼承了當(dāng)前 Scope 的 coroutineContext 和取消操作葱她。
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
......
}
每個 coroutine builder 和 scope 方法(withContext撩扒、coroutineScope 等)都使用自己的 Scope 和 自己管理的 Job 來運行提供給這些函數(shù)的代碼塊。并且也會等待該代碼塊中所有子 Coroutine 執(zhí)行吨些,當(dāng)所有子 Coroutine 執(zhí)行完畢并且返回的時候搓谆, 該代碼塊才執(zhí)行完畢炒辉,這種行為被稱之為 “structured concurrency”。
全局作用域GlobalScope
GlobalScope 是 CoroutineScope 的一個單例實現(xiàn)泉手,其代碼也是非常簡單的:
public object GlobalScope : CoroutineScope {
/**
* Returns [EmptyCoroutineContext].
*/
override val coroutineContext: CoroutineContext
get() = EmptyCoroutineContext
}
用法
GlobalScope.launch(Dispatchers.Main) {
delay(7000)
val result = "content"
//主線程里更新 UI
text.text = result
}
該實例所用的 CoroutineContext 是一個 EmptyCoroutineContext 實例(這也是一個單例 object 對象)黔寇。由于 GlobalScope 對象沒有和應(yīng)用生命周期組件相關(guān)聯(lián)斩萌,需要自己管理 GlobalScope 所創(chuàng)建的 Coroutine缝裤,所以一般而言我們不直接使用 GlobalScope 來創(chuàng)建 Coroutine。
與生命周期綁定的作用域
一般而言术裸,在應(yīng)用中具有生命周期的組件應(yīng)該實現(xiàn) CoroutineScope 接口,并負責(zé)該組件內(nèi) Coroutine 的創(chuàng)建和管理袭艺。例如對于 Android 應(yīng)用來說搀崭,可以在 Activity 中實現(xiàn) CoroutineScope 接口, 例如:
class ScopedActivity : Activity(), CoroutineScope {
lateinit var job: Job
// CoroutineScope 的實現(xiàn)
override val coroutineContext: CoroutineContext
get() = Dispatchers.Main + job
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
job = Job()
/*
* 注意 coroutine builder 的 scope猾编, 如果 activity 被銷毀了或者該函數(shù)內(nèi)創(chuàng)建的 Coroutine
* 拋出異常了瘤睹,則所有子 Coroutines 都會被自動取消。不需要手工去取消答倡。
*/
launch { // <- 自動繼承當(dāng)前 activity 的 scope context轰传,所以在 UI 線程執(zhí)行
val ioData = async(Dispatchers.IO) { // <- launch scope 的擴展函數(shù),指定了 IO dispatcher瘪撇,所以在 IO 線程運行
// 在這里執(zhí)行阻塞的 I/O 耗時操作
}
// 和上面的并非 I/O 同時執(zhí)行的其他操作
val data = ioData.await() // 等待阻塞 I/O 操作的返回結(jié)果
draw(data) // 在 UI 線程顯示執(zhí)行的結(jié)果
}
}
override fun onDestroy() {
super.onDestroy()
// 當(dāng) Activity 銷毀的時候取消該 Scope 管理的 job获茬。
// 這樣在該 Scope 內(nèi)創(chuàng)建的子 Coroutine 都會被自動的取消。
job.cancel()
}
}
由于所有的 Coroutine 都需要一個 CoroutineScope倔既,所以為了方便創(chuàng)建 Coroutine恕曲,在 CoroutineScope 上有很多擴展函數(shù),比如 launch渤涌、async佩谣、actor、cancel 等实蓬。
MainScope
在 Android 中會經(jīng)常需要實現(xiàn)這個 CoroutineScope茸俭,所以為了方便開發(fā)者使用, 標準庫中定義了一個 MainScope() 函數(shù)安皱,該函數(shù)定義了一個使用 SupervisorJob 和 Dispatchers.Main 為 Scope context 的實現(xiàn)调鬓。所以上面的代碼可以簡化為:
class ScopedActivity : Activity(),
CoroutineScope by MainScope(){ // 使用 by 指定代理實現(xiàn)
override fun onDestroy() {
super.onDestroy()
cancel() // 調(diào)用 CoroutineScope 的 cancel 函數(shù)
}
在mvvm模式使用作用域
class ViewModelOne : ViewModel() {
private val viewModelJob = SupervisorJob()
private val uiScope = CoroutineScope(Dispatchers.Main + viewModelJob)
val mMessage: MutableLiveData<String> = MutableLiveData()
fun getMessage(message: String) {
uiScope.launch {
val deferred = async(Dispatchers.IO) {
delay(2000)
"post $message"
}
mMessage.value = deferred.await()
}
}
override fun onCleared() {
super.onCleared()
viewModelJob.cancel()
}
}
ViewModelScope 方式
AndroidX Lifecycle v2.1.0 在 ViewModel 中引入 viewModelScope,當(dāng) ViewModel 被銷毀時它會自動取消協(xié)程任務(wù)酌伊,這個特性真的好用袖迎。viewModelScope 管理協(xié)程的方式與我們在 ViewModel 引入?yún)f(xié)程的方式一樣,代碼實現(xiàn)如下:
class MyViewModel : ViewModel() {
fun launchDataLoad() {
viewModelScope.launch {
sortList()
// Modify UI
}
}
suspend fun sortList() = withContext(Dispatchers.Default) {
// Heavy work
}
}
協(xié)程上下文CoroutineContext
CoroutineContext是一個接口,我們常用到的Job, Dispatchers都是實現(xiàn)了該接口的類燕锥,此外還包括 CoroutineName 和CoroutineId等類辜贵。
@SinceKotlin("1.3")
public interface CoroutineContext {
public operator fun <E : Element> get(key: Key<E>): E?
public fun <R> fold(initial: R, operation: (R, Element) -> R): R
public operator fun plus(context: CoroutineContext): CoroutineContext = ...
public fun minusKey(key: Key<*>): CoroutineContext
public interface Key<E : Element>
public interface Element : CoroutineContext {
public val key: Key<*>
...
}
}
類似于一個以 Key 為索引的 List:
CoroutineContext 作為一個集合,它的元素就是源碼中看到的 Element归形,每一個 Element 都有一個 key托慨,因此它可以作為元素出現(xiàn),同時它也是 CoroutineContext 的子接口暇榴,因此也可以作為集合出現(xiàn)厚棵。
我們看下實現(xiàn)了上下文接口的Job類
public interface Job : CoroutineContext.Element {
/**
* Key for [Job] instance in the coroutine context.
*/
public companion object Key : CoroutineContext.Key<Job> {
init {
/*
* Here we make sure that CoroutineExceptionHandler is always initialized in advance, so
* that if a coroutine fails due to StackOverflowError we don't fail to report this error
* trying to initialize CoroutineExceptionHandler
*/
CoroutineExceptionHandler
}
}
}
可以看到Job實現(xiàn)關(guān)系是Job<=Element<=CoroutineContext
public interface Element : CoroutineContext {
/**
* A key of this coroutine context element.
*/
public val key: Key<*>
public override operator fun <E : Element> get(key: Key<E>): E? =
@Suppress("UNCHECKED_CAST")
if (this.key == key) this as E else null
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
operation(initial, this)
public override fun minusKey(key: Key<*>): CoroutineContext =
if (this.key == key) EmptyCoroutineContext else this
}
協(xié)程生成器 Coroutine builders
我們知道生成協(xié)程的方式有很多種,比如 launch蔼紧、async婆硬、runBlocking等,他們都是
CoroutineScope的擴展方法奸例,且都會創(chuàng)建一個新的協(xié)程
下面我們拿launch開啟一個協(xié)程的例子來了解下如何設(shè)置上下文
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
...
}
我們?yōu)樯舷挛奶砑诱{(diào)度器與上下文名字
launch(context = Dispatchers.Main.plus(CoroutineName("jason"))) {
val job = coroutineContext.get(Job)
val dispatcher = coroutineContext.get(ContinuationInterceptor)
val name = coroutineContext.get(CoroutineName)
println("job:$job")
println("dispatcher:$dispatcher")
println("name:$name")
}
打印輸出結(jié)果
job:StandaloneCoroutine{Active}@ad739cb
dispatcher:Main
name:CoroutineName(jason)
上下文切換器withContext
與 launch彬犯、async、runBlocking 等不同查吊,withContext 不會創(chuàng)建新的協(xié)程谐区。
withContext 允許切換協(xié)程上下文,使用時必須傳遞一個 CoroutineContext
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
...
}
看下例子
launch(context = Dispatchers.IO.plus(CoroutineName("c1"))) {
val dispatcher = coroutineContext[ContinuationInterceptor]
val name = coroutineContext[CoroutineName]
println("scope:$this")
println("dispatcher:$dispatcher")
println("name:$name")
withContext(Dispatchers.Main.plus(CoroutineName("c2"))){
val dispatcher2 = coroutineContext[ContinuationInterceptor]
val name2 = coroutineContext[CoroutineName]
println("scope2:$this")
println("dispatcher2:$dispatcher2")
println("name2:$name2")
}
}
日志輸出結(jié)果
scope:StandaloneCoroutine{Active}@60d84a6
dispatcher:LimitingDispatcher@903ee7[dispatcher = DefaultDispatcher]
name:CoroutineName(c1)
scope2:DispatchedCoroutine{Active}@e3f5a94
dispatcher2:Main
name2:CoroutineName(c2)
續(xù)體攔截器ContinuationInterceptor
攔截器也是上下文的一種逻卖,它實現(xiàn)了上下文接口
我們可以用攔截器打日志等宋列。調(diào)度器就是基于攔截器實現(xiàn)的,換句話說調(diào)度器就是攔截器的一種评也。
public interface ContinuationInterceptor : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
...
}
下面我們自己定義一個攔截器放到我們的協(xié)程上下文中炼杖,看看會發(fā)生什么。
class MyContinuationInterceptor: ContinuationInterceptor {
override val key = ContinuationInterceptor
override fun <T> interceptContinuation(continuation: Continuation<T>) = MyContinuation(continuation)
}
class MyContinuation<T>(val continuation: Continuation<T>): Continuation<T> {
override val context = continuation.context
override fun resumeWith(result: Result<T>) {
Log.d("jason","result=$result" )
continuation.resumeWith(result)
}
}
launch {
launch(MyContinuationInterceptor()) {
log(1)
val deferred = async {
log(2)
delay(1000)
log(3)
"我是返回值"
}
log(4)
val result = deferred.await()
log("5. $result")
}.join()
log(6)
}
我們通過 launch 啟動了一個協(xié)程盗迟,為它指定了我們自己的攔截器作為上下文嘹叫,緊接著在其中用 async 啟動了一個協(xié)程,async 與 launch 從功能上是同等類型的函數(shù)诈乒,它們都被稱作協(xié)程的 Builder 函數(shù),不同之處在于 async 啟動的 Job 也就是實際上的 Deferred 可以有返回結(jié)果婆芦,可以通過 await 方法獲取怕磨。
輸出日志
23:02:58.595 5241-5241/com.example.mytest D/jason: result=Success(kotlin.Unit) //1
23:02:58.596 5241-5241/com.example.mytest D/jason: 1
23:02:58.598 5241-5241/com.example.mytest D/jason: result=Success(kotlin.Unit) //2
23:02:58.598 5241-5241/com.example.mytest D/jason: 2
23:02:58.602 5241-5241/com.example.mytest D/jason: 4
23:02:59.602 5241-5273/com.example.mytest D/jason: result=Success(kotlin.Unit) //3
23:02:59.602 5241-5273/com.example.mytest D/jason: 3
23:02:59.603 5241-5273/com.example.mytest D/jason: result=Success(我是返回值) //4
23:02:59.604 5241-5273/com.example.mytest D/jason: 5. 我是返回值
23:02:59.605 5241-5241/com.example.mytest D/jason: 6
首先,這段代碼中一共有4次調(diào)度機會消约,所有協(xié)程啟動的時候肠鲫,都會有一次 Continuation.resumeWith 的操作,這一次操作對于調(diào)度器來說就是一次調(diào)度的機會或粮,我們的協(xié)程有機會調(diào)度到其他線程的關(guān)鍵之處就在于此导饲。 ①、② 兩處都是這種情況。
其次渣锦,delay 是掛起點硝岗,1000ms 之后需要繼續(xù)調(diào)度執(zhí)行該協(xié)程,因此就有了 ③ 處的日志袋毙。
最后型檀,④ 處的日志就很容易理解了,正是我們的返回結(jié)果听盖。
如果我們在攔截器當(dāng)中自己處理了線程切換胀溺,那么就實現(xiàn)了自己的一個簡單的調(diào)度器,大家有興趣可以自己去嘗試皆看。
協(xié)程任務(wù)執(zhí)行環(huán)境-Dispatcher(調(diào)度器)
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
...
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
...
}
它本身是協(xié)程上下文的子類仓坞,同時實現(xiàn)了攔截器的接口, dispatch 方法會在攔截器的方法 interceptContinuation 中調(diào)用腰吟,進而實現(xiàn)協(xié)程的調(diào)度无埃。所以如果我們想要實現(xiàn)自己的調(diào)度器,繼承這個類就可以了蝎困,不過通常我們都用現(xiàn)成的
現(xiàn)成的調(diào)度器有:
調(diào)度器名稱 | 使用線程 |
---|---|
Main | UI線程 |
IO | 線程池 |
Default | 線程池 |
Unconfined | 直接執(zhí)行 |
也能使用這種方式簡單的創(chuàng)建一個自定義的協(xié)程調(diào)度器
val myDispatcher= Executors.newSingleThreadExecutor{ r -> Thread(r, "MyThread") }.asCoroutineDispatcher()
這里我們主要看一下我們最常使用的Dispatcher.Main和Dispatcher.IO兩個派發(fā)器录语。
Dispatcher.Main
Dispatcher.Main沒有默認實現(xiàn),依賴于各個平臺的實現(xiàn)禾乘,如果沒有引入android依賴包澎埠,則會拋異常提示,那么kotlin是怎么支持這種動態(tài)的類呢始藕?
- 首先kotlin提供了一個工廠類接口蒲稳,用來創(chuàng)建MainDispatcher
public interface MainDispatcherFactory {
fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher
}
- 然后再看反編譯的源碼
public final <S> List<S> loadProviders$kotlinx_coroutines_core(@NotNull Class<S> paramClass, @NotNull ClassLoader paramClassLoader) {
//從apk的META-INF/services/文件夾下那類名
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("META-INF/services/");
stringBuilder.append(paramClass.getName());
Enumeration enumeration = paramClassLoader.getResources(stringBuilder.toString());
ArrayList arrayList = Collections.list(enumeration);
Iterable iterable = (Iterable)arrayList;
Collection collection = (Collection)new ArrayList();
for (URL uRL : iterable) {
FastServiceLoader fastServiceLoader = INSTANCE;
Intrinsics.checkExpressionValueIsNotNull(uRL, "it");
CollectionsKt.addAll(collection, (Iterable)fastServiceLoader.parse(uRL));
}
collection = CollectionsKt.toSet((Iterable)collection);
iterable = (Iterable)collection;
collection = (Collection)new ArrayList(CollectionsKt.collectionSizeOrDefault(iterable, 10));
//將類名解析為實例對象
for (String str : iterable)
collection.add(INSTANCE.getProviderInstance(str, paramClassLoader, paramClass));
return (List)collection;
}
MainDispatcher的factory會從apk的META-INF/services/文件夾下獲取。
-
再看編譯生成的apk文件的該文件夾內(nèi)容
image.png
所以android的依賴包是通過向該文件注冊類名實現(xiàn)的注冊類伍派,并且factory類為AndroidDispatcherFactory江耀。
- 最后我們再來看下AndroidDispatcherFactory類
internal class AndroidDispatcherFactory : MainDispatcherFactory {
override fun createDispatcher(allFactories: List<MainDispatcherFactory>) = HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main")
}
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
public constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
//android中需要向主looper進行提交調(diào)度
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}
//通過持有主線程looper的handler進行調(diào)度
override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.post(block)
}
...
}
很清楚,就是用持有主線程looper的handler進行任務(wù)的調(diào)度诉植,確保任務(wù)會在主線程執(zhí)行祥国。
Dispatcher.IO
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
val IO = blocking(systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)))
public fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
return LimitingDispatcher(this, parallelism, TaskMode.PROBABLY_BLOCKING)
}
}
Dispatcher.IO是一個LimitingDispatcher實例,他可以控制同時并發(fā)任務(wù)數(shù)晾腔,默認為64個舌稀,即最多有64個任務(wù)同時在運行。
private class LimitingDispatcher(
val dispatcher: ExperimentalCoroutineDispatcher,
val parallelism: Int,
override val taskMode: TaskMode
) : ExecutorCoroutineDispatcher()
而LimitingDispatcher內(nèi)部真正調(diào)度任務(wù)的dispatcher是一個ExperimentalCoroutineDispatcher對象灼擂。
open class ExperimentalCoroutineDispatcher(
private val corePoolSize: Int,
private val maxPoolSize: Int,
private val idleWorkerKeepAliveNs: Long,
private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
constructor(
corePoolSize: Int = CORE_POOL_SIZE,
maxPoolSize: Int = MAX_POOL_SIZE,
schedulerName: String = DEFAULT_SCHEDULER_NAME
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)
private var coroutineScheduler = createScheduler()
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
try {
coroutineScheduler.dispatch(block)
} catch (e: RejectedExecutionException) {
DefaultExecutor.dispatch(context, block)
}
private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
}
我們看到壁查,該dispatcher里面的真正的線程池,是CoroutineScheduler對象剔应,而核心線程數(shù)和最大線程數(shù)睡腿,取決于可用CPU的數(shù)量语御。
internal val CORE_POOL_SIZE = systemProp(
"kotlinx.coroutines.scheduler.core.pool.size",
AVAILABLE_PROCESSORS.coerceAtLeast(2), // !!! at least two here
minValue = CoroutineScheduler.MIN_SUPPORTED_POOL_SIZE
)
internal val AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors()
協(xié)程調(diào)度器-CoroutineScheduler
這里我們挑幾個小細節(jié)看一下CoroutineScheduler是如何來優(yōu)化對線程的使用的。
i. 盡量使用當(dāng)前線程
private fun submitToLocalQueue(task: Task, fair: Boolean): Int {
val worker = currentWorker() ?: return NOT_ADDED
...
worker.localQueue.add(task, globalQueue)
...
}
private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }
如果當(dāng)前線程是Dispatcher.IO開啟的工作線程席怪,那么任務(wù)優(yōu)先交由該線程的任務(wù)隊列应闯,等待處理。
ii. 雙重隊列
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) {
...
when (submitToLocalQueue(task, fair)) {
ADDED -> return
NOT_ADDED -> {
//本地隊列已滿何恶,放入全局隊列孽锥,所有線程可取
globalQueue.addLast(task)
}
else -> requestCpuWorker() // ask for help
}
}
如果工作線程本地隊列無限大,一味的放入本地隊列的話细层,可能會造成單一線程工作惜辑,效率極低,于是每個工作線程有固定大小的queue疫赎,滿了之后盛撑,會放到全局queue中,等待任意空閑工作線程執(zhí)行捧搞。
iii.搶占其他線程的任務(wù)
//工作線程Worker類
override fun run() {
while (!isTerminated && state != WorkerState.TERMINATED) {
val task = findTask()
...
}
...
}
private fun findTaskWithCpuPermit(): Task? {
...
//從本地queue獲取任務(wù)
localQueue.poll()?.let { return it }
//從全局queue獲取任務(wù)
if (!globalFirst) globalQueue.removeFirstOrNull()?.let { return it }
//搶占其他線程任務(wù)
return trySteal()
}
private fun trySteal(): Task? {
...
//隨機一個工作線程
if (stealIndex == 0) stealIndex = nextInt(created)
...
val worker = workers[stealIndex]
if (worker !== null && worker !== this) {
//將其queue里的任務(wù)放到自己queue中
if (localQueue.trySteal(worker.localQueue, globalQueue)) {
return localQueue.poll()
}
}
return null
}
如果一個工作線程的本地queue和全局queue都沒有任務(wù)了抵卫,但是其他線程的queue還有任務(wù),此時讓其空閑胎撇,一是沒有充分利用線程提升工作效率介粘,二是線程的空閑狀態(tài)切換需要開銷,所以此時會嘗試從任一工作線程的queue中取出任務(wù)晚树,放入自己的queue中執(zhí)行姻采。
以上三點的相互配合,可以充分利用線程資源爵憎,避免過多線程的使用及開銷慨亲,也保證了多任務(wù)時的工作效率。
協(xié)程執(zhí)行過程源碼追蹤分析
我們以一個請求數(shù)據(jù)后在主線程更新界面的代碼來進行分析
fun setUpUI(){
GlobalScope.launch(Main) {
val dataDeferred = requestDataAsync()
doSomethingElse()
val data = dataDeferred.await()
processData(data)
}
Thread.sleep(1000)
doSomethingElse2()
}
fun requestDataAsync():Deferred<String>{
// 啟動一個異步協(xié)程去執(zhí)行耗時任務(wù)
return GlobalScope.async {
requestData()
}
}
fun doSomethingElse2(){
println("doSomethingElse2")
}
編譯后生成偽代碼
final void setUpUI() {
BuildersKt__Builders_commonKt.launch$default(GlobalScope.INSTANCE,
Dispatchers.getMain(),
null,
// 傳入的是一個 KotlinTest$setUpUI.KotlinTest$setUpUI$1 對象
(Function2)new KotlinTest$setUpUI.KotlinTest$setUpUI$1(this, (Continuation)null), 2, null);
this.doSomethingElse2();
}
final class setUpUI$1 extends SuspendLambda implements Function2{
public final Object invokeSuspend(Object result) {
switch (this.label) {
case 0:
doSomethingElse()
// 新建并啟動 async 協(xié)程
Deferred async$default = BuildersKt.async$default(coroutineScope, (CoroutineContext) Dispatchers.getDefault(), null, (Function2) new 1(null), 2, null);
this.label = 1;
// 如果 async 協(xié)程還沒完成為掛起狀態(tài) 則直接返回宝鼓,等待下次喚醒重入
if (async$default.await(this) == coroutine_suspended) {
return coroutine_suspended;
}
break;
case 1:
val data = result;
processData(data)
break;
}
}
}
可以看到傳入到 launch 函數(shù)第四個參數(shù)位置的是一個編譯后生成的 SuspendLambda 類實例setUpUI$1刑棵,SuspendLambda 本質(zhì)上是一個續(xù)體 Continuation,而 Continuation 是一個有著恢復(fù)操作的接口
/**
* 在一個掛起點之后可以返回類型T值的續(xù)集continuation的接口
* Interface representing a continuation after a suspension point that returns value of type `T`.
*/
@SinceKotlin("1.3")
public interface Continuation<in T> {
/**
* Context of the coroutine that corresponds to this continuation.
*/
// todo: shall we provide default impl with EmptyCoroutineContext?
public val context: CoroutineContext
/**
* Resumes the execution of the corresponding coroutine passing successful or failed [result] as the
* return value of the last suspension point.
*/
public fun resumeWith(result: Result<T>)
}
SuspendLambda 繼承結(jié)構(gòu)如下
SuspendLambda > ContinuationImpl > BaseContinuationImpl > Continuation
每一層封裝對應(yīng)添加了不同的功能愚铡,我們先忽略掉這些功能細節(jié)蛉签,著眼于我們的主線,繼續(xù)跟進launch 函數(shù)執(zhí)行過程沥寥,由于第二個參數(shù)是默認值碍舍,所以創(chuàng)建的是 StandaloneCoroutine, 調(diào)用鏈如下:
coroutine.start(start, coroutine, block)
-> CoroutineStart.start(block, receiver, this)
-> CoroutineStart.invoke(block: suspend () -> T, completion: Continuation<T>)
-> block.startCoroutineCancellable(completion)
-> createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)
我們看最后創(chuàng)建了一個協(xié)程营曼,并鏈式調(diào)用 intercepted、resumeCancellable 方法愚隧,利用協(xié)程上下文中的續(xù)體攔截器 ContinuationInterceptor 對協(xié)程的執(zhí)行進行攔截蒂阱,intercepted 實際上調(diào)用的是 ContinuationImpl 的 intercepted 方法
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
...
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
...
}
context[ContinuationInterceptor]?.interceptContinuation調(diào)用的是 CoroutineDispatcher 的 interceptContinuation 方法
public final <T> Continuation<T> interceptContinuation(@NotNull final Continuation<? super T> continuation) {
Intrinsics.checkParameterIsNotNull(continuation, "continuation");
return new DispatchedContinuation<T>(this, continuation);
}
最終創(chuàng)建了一個 DispatchedContinuation 可分發(fā)的協(xié)程實例锻全,我們繼續(xù)看resumeCancellable 方法
internal fun <T> Continuation<T>.resumeCancellable(value: T) = when (this) {
// 判斷是否是DispatchedContinuation 根據(jù)我們前面的代碼追蹤 這里是DispatchedContinuation
is DispatchedContinuation -> resumeCancellable(value)
else -> resume(value)
}
inline fun resumeCancellable(value: T) {
// 判斷是否需要線程調(diào)度
if (dispatcher.isDispatchNeeded(context)) {
_state = value
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
UndispatchedEventLoop.execute(this, value, MODE_CANCELLABLE) {
if (!resumeCancelled()) {
resumeUndispatched(value)
}
}
}
}
最終走到 dispatcher.dispatch(context, this) 而這里的 dispatcher 就是通過工廠方法創(chuàng)建的 HandlerDispatcher ,dispatch() 函數(shù)第二個參數(shù)this是一個runnable這里為 DispatchedTask
HandlerDispatcher
/**
* Implements [CoroutineDispatcher] on top of an arbitrary Android [Handler].
*/
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
...
// 最終執(zhí)行這里的 dispatch方法 而handler則是android中的 MainHandler
override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.post(block)
}
...
}
這里借用 Android 的主線程消息隊列來在主線程中執(zhí)行 block Runnable 而這個 Runnable 即為 DispatchedTask
internal abstract class DispatchedTask<in T>(
@JvmField var resumeMode: Int
) : SchedulerTask() {
...
public final override fun run() {
...
withCoroutineContext(context, delegate.countOrElement) {
if (job != null && !job.isActive)
// 異常情況下
continuation.resumeWithException(job.getCancellationException())
else {
val exception = getExceptionalResult(state)
if (exception != null)
// 異常情況下
continuation.resumeWithStackTrace(exception)
else
// 正常情況下走到這一步
continuation.resume(getSuccessfulResult(state))
}
}
...
}
}
@InlineOnly public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
internal abstract class BaseContinuationImpl(...) {
// 實現(xiàn) Continuation 的 resumeWith录煤,并且是 final 的鳄厌,不可被重寫
public final override fun resumeWith(result: Result<Any?>) {
...
val outcome = invokeSuspend(param)
...
}
// 由編譯生成的協(xié)程相關(guān)類來實現(xiàn),例如 setUpUI$1
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}
最終調(diào)用到 continuation.resumeWith() 而 resumeWith() 中會調(diào)用 invokeSuspend妈踊,即之前編譯器生成的 SuspendLambda 中的 invokeSuspend 方法
final class setUpUI$1 extends SuspendLambda implements Function2{
public final Object invokeSuspend(Object result) {
switch (this.label) {
case 0:
doSomethingElse()
// 新建并啟動 async 協(xié)程
Deferred async$default = BuildersKt.async$default(coroutineScope, (CoroutineContext) Dispatchers.getDefault(), null, (Function2) new 1(null), 2, null);
this.label = 1;
// 如果 async 協(xié)程還沒完成為掛起狀態(tài) 則直接返回了嚎,等待下次喚醒重入
if (async$default.await(this) == coroutine_suspended) {
return coroutine_suspended;
}
break;
case 1:
val data = result;
processData(data)
break;
}
}
}
這段代碼是一個狀態(tài)機機制,每一個掛起點都是一種狀態(tài)廊营,協(xié)程恢復(fù)只是跳轉(zhuǎn)到下一個狀態(tài)歪泳,掛起點將執(zhí)行過程分割成多個片段,利用狀態(tài)機的機制保證各個片段按順序執(zhí)行露筒。
如果沒有掛起點就只有一個初始狀態(tài)呐伞,類似于callback回調(diào),所以對應(yīng)了之前我們分析的非阻塞的異步底層實現(xiàn)其實也是一種callback回調(diào)慎式,只不過有多個掛起點時就會有多個callback回調(diào)伶氢,我們把多個callback回調(diào)封裝成了一個狀態(tài)機。
協(xié)程的掛起
從協(xié)程的調(diào)度過程我們知道瘪吏,調(diào)度后會到編譯器生成的 SuspendLambda 的 invokeSuspend 方法中的一個掛起方法癣防,以例子中的await為例
if (async$default.await(this) == coroutine_suspended) {
//目前還在掛起中,則return等待掛起結(jié)束后的invokeSuspend
return coroutine_suspended;
}
async 也是一個協(xié)程掌眠,如果狀態(tài)為掛起coroutine_suspended蕾盯,則執(zhí)行流直接 return 返回,如果已達到完成狀態(tài)直接跳轉(zhuǎn)下一個狀態(tài) case 1 最終走完整個協(xié)程代碼塊扇救。
這里需要注意的是:
啟動一個新的協(xié)程并不會掛起當(dāng)前協(xié)程刑枝,只有當(dāng)使用庫函數(shù) await、yield方法時才會將當(dāng)前的協(xié)程掛起迅腔。
協(xié)程掛起并不會阻塞線程装畅,線程在掛起點 return 后可以去執(zhí)行其他的代碼塊。
協(xié)程的掛起過程很簡單沧烈,代碼塊直接返回掠兄,當(dāng)前狀態(tài)保存在狀態(tài)機 SuspendLambda 中,可以想象到協(xié)程恢復(fù)的時候也是調(diào)用 SuspendLambda 的 invokeSuspend 從而進入下一個狀態(tài)繼續(xù)執(zhí)行的锌雀。
delay 的實現(xiàn)
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
/** Returns [Delay] implementation of the given context */
internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay
internal actual val DefaultDelay: Delay = DefaultExecutor
delay 使用suspendCancellableCoroutine掛起協(xié)程蚂夕,而協(xié)程恢復(fù)的一般情況下是關(guān)鍵在DefaultExecutor.scheduleResumeAfterDelay(),其中實現(xiàn)是schedule(DelayedResumeTask(timeMillis, continuation))腋逆,其中的關(guān)鍵邏輯是將 DelayedResumeTask 放到 DefaultExecutor 的隊列最后婿牍,在延遲的時間到達就會執(zhí)行 DelayedResumeTask,那么該 task 里面的實現(xiàn)是什么:
override fun run() {
// 直接在調(diào)用者線程恢復(fù)協(xié)程
with(cont) { resumeUndispatched(Unit) }
}
yield 的實現(xiàn)
yield()的作用是掛起當(dāng)前協(xié)程惩歉,然后將協(xié)程分發(fā)到 Dispatcher 的隊列等脂,這樣可以讓該協(xié)程所在線程或線程池可以運行其他協(xié)程邏輯俏蛮,然后在 Dispatcher 空閑的時候繼續(xù)執(zhí)行原來協(xié)程。簡單的來說就是讓出自己的執(zhí)行權(quán)上遥,給其他協(xié)程使用搏屑,當(dāng)其他協(xié)程執(zhí)行完成或也讓出執(zhí)行權(quán)時,一開始的協(xié)程可以恢復(fù)繼續(xù)運行粉楚。
看下面的代碼示例:
fun main(args: Array<String>) = runBlocking<Unit> {
launch {
repeat(3) {
println("job1 repeat $it times")
yield()
}
}
launch {
repeat(3) {
println("job2 repeat $it times")
yield()
}
}
}
通過yield()實現(xiàn) job1 和 job2 兩個協(xié)程交替運行辣恋,輸出如下:
job1 repeat 0 times
job2 repeat 0 times
job1 repeat 1 times
job2 repeat 1 times
job1 repeat 2 times
job2 repeat 2 times
現(xiàn)在來看其實現(xiàn):
public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
val context = uCont.context
// 檢測協(xié)程是否已經(jīng)取消或者完成,如果是的話拋出 CancellationException
context.checkCompletion()
// 如果協(xié)程沒有線程調(diào)度器模软,或者像 Dispatchers.Unconfined 一樣沒有進行調(diào)度伟骨,則直接返回
val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit
if (!cont.dispatcher.isDispatchNeeded(context)) return@sc Unit
// dispatchYield(Unit) 最終會調(diào)用到 dispatcher.dispatch(context, block) 將協(xié)程分發(fā)到調(diào)度器隊列中,這樣線程可以執(zhí)行其他協(xié)程
cont.dispatchYield(Unit)
COROUTINE_SUSPENDED
}
所以注意到撵摆,yield()需要依賴協(xié)程的線程調(diào)度器底靠,而調(diào)度器再次執(zhí)行該協(xié)程時,會調(diào)用resume來恢復(fù)協(xié)程運行特铝。
現(xiàn)在來看封裝異步邏輯為掛起函數(shù)的關(guān)鍵是用suspendCoroutineUninterceptedOrReturn函數(shù)包裝暑中,然后在異步邏輯完成時調(diào)用resume手動恢復(fù)協(xié)程。