協(xié)程作用域雄妥、上下文與調(diào)度

協(xié)程作用域CoroutineScope

在 Android 環(huán)境中蟹漓,通常每個界面(Activity畔师、Fragment 等)啟動的 Coroutine 只在該界面有意義,如果用戶在等待 Coroutine 執(zhí)行的時候退出了這個界面牧牢,則再繼續(xù)執(zhí)行這個 Coroutine 可能是沒必要的。另外 Coroutine 也需要在適當(dāng)?shù)?context 中執(zhí)行姿锭,否則會出現(xiàn)錯誤塔鳍,比如在非 UI 線程去訪問 View。 所以 Coroutine 在設(shè)計的時候呻此,要求在一個范圍(Scope)內(nèi)執(zhí)行轮纫,這樣當(dāng)這個 Scope 取消的時候,里面所有的子 Coroutine 也自動取消焚鲜。所以要使用 Coroutine 必須要先創(chuàng)建一個對應(yīng)的 CoroutineScope掌唾。

CoroutineScope 接口

CoroutineScope 是一個接口,要是查看這個接口的源代碼的話就發(fā)現(xiàn)這個接口里面只定義了一個屬性 CoroutineContext

public interface CoroutineScope {
    // Scope 的 Context
    public val coroutineContext: CoroutineContext
}

所以 CoroutineScope 只是定義了一個新 Coroutine 的執(zhí)行 Scope忿磅。每個協(xié)程coroutine builder(launch 糯彬、async等) 都是 CoroutineScope 的擴展方法,并且自動的繼承了當(dāng)前 Scope 的 coroutineContext 和取消操作葱她。

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
   ......
}

每個 coroutine builder 和 scope 方法(withContext撩扒、coroutineScope 等)都使用自己的 Scope 和 自己管理的 Job 來運行提供給這些函數(shù)的代碼塊。并且也會等待該代碼塊中所有子 Coroutine 執(zhí)行吨些,當(dāng)所有子 Coroutine 執(zhí)行完畢并且返回的時候搓谆, 該代碼塊才執(zhí)行完畢炒辉,這種行為被稱之為 “structured concurrency”。

全局作用域GlobalScope

GlobalScope 是 CoroutineScope 的一個單例實現(xiàn)泉手,其代碼也是非常簡單的:

public object GlobalScope : CoroutineScope {
    /**
     * Returns [EmptyCoroutineContext].
     */
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext
}

用法

        GlobalScope.launch(Dispatchers.Main) {
            delay(7000)
            val result = "content"
            //主線程里更新 UI
            text.text = result
        }

該實例所用的 CoroutineContext 是一個 EmptyCoroutineContext 實例(這也是一個單例 object 對象)黔寇。由于 GlobalScope 對象沒有和應(yīng)用生命周期組件相關(guān)聯(lián)斩萌,需要自己管理 GlobalScope 所創(chuàng)建的 Coroutine缝裤,所以一般而言我們不直接使用 GlobalScope 來創(chuàng)建 Coroutine。

與生命周期綁定的作用域

一般而言术裸,在應(yīng)用中具有生命周期的組件應(yīng)該實現(xiàn) CoroutineScope 接口,并負責(zé)該組件內(nèi) Coroutine 的創(chuàng)建和管理袭艺。例如對于 Android 應(yīng)用來說搀崭,可以在 Activity 中實現(xiàn) CoroutineScope 接口, 例如:

class ScopedActivity : Activity(), CoroutineScope {
    lateinit var job: Job
    // CoroutineScope 的實現(xiàn)
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Main + job

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        job = Job()

    /*
     * 注意 coroutine builder 的 scope猾编, 如果 activity 被銷毀了或者該函數(shù)內(nèi)創(chuàng)建的 Coroutine
     * 拋出異常了瘤睹,則所有子 Coroutines 都會被自動取消。不需要手工去取消答倡。
     */
      launch { // <- 自動繼承當(dāng)前 activity 的 scope context轰传,所以在 UI 線程執(zhí)行
          val ioData = async(Dispatchers.IO) { // <- launch scope 的擴展函數(shù),指定了 IO dispatcher瘪撇,所以在 IO 線程運行
            // 在這里執(zhí)行阻塞的 I/O 耗時操作
          }
        // 和上面的并非 I/O 同時執(zhí)行的其他操作
          val data = ioData.await() // 等待阻塞 I/O 操作的返回結(jié)果
          draw(data) // 在 UI 線程顯示執(zhí)行的結(jié)果
      }
    }

    override fun onDestroy() {
        super.onDestroy()
        // 當(dāng) Activity 銷毀的時候取消該 Scope 管理的 job获茬。
        // 這樣在該 Scope 內(nèi)創(chuàng)建的子 Coroutine 都會被自動的取消。
        job.cancel()
    }


}

由于所有的 Coroutine 都需要一個 CoroutineScope倔既,所以為了方便創(chuàng)建 Coroutine恕曲,在 CoroutineScope 上有很多擴展函數(shù),比如 launch渤涌、async佩谣、actor、cancel 等实蓬。

MainScope

在 Android 中會經(jīng)常需要實現(xiàn)這個 CoroutineScope茸俭,所以為了方便開發(fā)者使用, 標準庫中定義了一個 MainScope() 函數(shù)安皱,該函數(shù)定義了一個使用 SupervisorJob 和 Dispatchers.Main 為 Scope context 的實現(xiàn)调鬓。所以上面的代碼可以簡化為:

class ScopedActivity : Activity(), 
    CoroutineScope by MainScope(){ // 使用 by 指定代理實現(xiàn)

    override fun onDestroy() {
        super.onDestroy()
        cancel() // 調(diào)用 CoroutineScope 的 cancel 函數(shù)
    }

在mvvm模式使用作用域

class ViewModelOne : ViewModel() {

    private val viewModelJob = SupervisorJob()
    private val uiScope = CoroutineScope(Dispatchers.Main + viewModelJob)

    val mMessage: MutableLiveData<String> = MutableLiveData()

    fun getMessage(message: String) {
        uiScope.launch {
            val deferred = async(Dispatchers.IO) {
                delay(2000)
                "post $message"
            }
            mMessage.value = deferred.await()
        }
    }

    override fun onCleared() {
        super.onCleared()
        viewModelJob.cancel()
    }
}

ViewModelScope 方式

AndroidX Lifecycle v2.1.0 在 ViewModel 中引入 viewModelScope,當(dāng) ViewModel 被銷毀時它會自動取消協(xié)程任務(wù)酌伊,這個特性真的好用袖迎。viewModelScope 管理協(xié)程的方式與我們在 ViewModel 引入?yún)f(xié)程的方式一樣,代碼實現(xiàn)如下:

class MyViewModel : ViewModel() {
  
    fun launchDataLoad() {
        viewModelScope.launch {
            sortList()
            // Modify UI
        }
    }
  
    suspend fun sortList() = withContext(Dispatchers.Default) {
        // Heavy work
    }
}

協(xié)程上下文CoroutineContext

CoroutineContext是一個接口,我們常用到的Job, Dispatchers都是實現(xiàn)了該接口的類燕锥,此外還包括 CoroutineName 和CoroutineId等類辜贵。

@SinceKotlin("1.3")
public interface CoroutineContext {
    public operator fun <E : Element> get(key: Key<E>): E?
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R
    public operator fun plus(context: CoroutineContext): CoroutineContext = ...
    public fun minusKey(key: Key<*>): CoroutineContext

    public interface Key<E : Element>

    public interface Element : CoroutineContext {
        public val key: Key<*>
        ...
    }
}

類似于一個以 Key 為索引的 List:

CoroutineContext 作為一個集合,它的元素就是源碼中看到的 Element归形,每一個 Element 都有一個 key托慨,因此它可以作為元素出現(xiàn),同時它也是 CoroutineContext 的子接口暇榴,因此也可以作為集合出現(xiàn)厚棵。

我們看下實現(xiàn)了上下文接口的Job類

public interface Job : CoroutineContext.Element {
    /**
     * Key for [Job] instance in the coroutine context.
     */
    public companion object Key : CoroutineContext.Key<Job> {
        init {
            /*
             * Here we make sure that CoroutineExceptionHandler is always initialized in advance, so
             * that if a coroutine fails due to StackOverflowError we don't fail to report this error
             * trying to initialize CoroutineExceptionHandler
             */
            CoroutineExceptionHandler
        }
    }
}

可以看到Job實現(xiàn)關(guān)系是Job<=Element<=CoroutineContext

    public interface Element : CoroutineContext {
        /**
         * A key of this coroutine context element.
         */
        public val key: Key<*>

        public override operator fun <E : Element> get(key: Key<E>): E? =
            @Suppress("UNCHECKED_CAST")
            if (this.key == key) this as E else null

        public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
            operation(initial, this)

        public override fun minusKey(key: Key<*>): CoroutineContext =
            if (this.key == key) EmptyCoroutineContext else this
    }

協(xié)程生成器 Coroutine builders

我們知道生成協(xié)程的方式有很多種,比如 launch蔼紧、async婆硬、runBlocking等,他們都是
CoroutineScope的擴展方法奸例,且都會創(chuàng)建一個新的協(xié)程
下面我們拿launch開啟一個協(xié)程的例子來了解下如何設(shè)置上下文

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    ...
}

我們?yōu)樯舷挛奶砑诱{(diào)度器與上下文名字

        launch(context = Dispatchers.Main.plus(CoroutineName("jason"))) {
            val job = coroutineContext.get(Job)
            val dispatcher = coroutineContext.get(ContinuationInterceptor)
            val name = coroutineContext.get(CoroutineName)
            println("job:$job")
            println("dispatcher:$dispatcher")
            println("name:$name")
        }

打印輸出結(jié)果

job:StandaloneCoroutine{Active}@ad739cb
dispatcher:Main
name:CoroutineName(jason)

上下文切換器withContext

與 launch彬犯、async、runBlocking 等不同查吊,withContext 不會創(chuàng)建新的協(xié)程谐区。
withContext 允許切換協(xié)程上下文,使用時必須傳遞一個 CoroutineContext

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
  ...
}

看下例子

        launch(context = Dispatchers.IO.plus(CoroutineName("c1"))) {
            val dispatcher = coroutineContext[ContinuationInterceptor]
            val name = coroutineContext[CoroutineName]
            println("scope:$this")
            println("dispatcher:$dispatcher")
            println("name:$name")
            withContext(Dispatchers.Main.plus(CoroutineName("c2"))){
                val dispatcher2 = coroutineContext[ContinuationInterceptor]
                val name2 = coroutineContext[CoroutineName]
                println("scope2:$this")
                println("dispatcher2:$dispatcher2")
                println("name2:$name2")
            }
        }

日志輸出結(jié)果

scope:StandaloneCoroutine{Active}@60d84a6
dispatcher:LimitingDispatcher@903ee7[dispatcher = DefaultDispatcher]
name:CoroutineName(c1)

scope2:DispatchedCoroutine{Active}@e3f5a94
dispatcher2:Main
name2:CoroutineName(c2)

續(xù)體攔截器ContinuationInterceptor

攔截器也是上下文的一種逻卖,它實現(xiàn)了上下文接口
我們可以用攔截器打日志等宋列。調(diào)度器就是基于攔截器實現(xiàn)的,換句話說調(diào)度器就是攔截器的一種评也。

public interface ContinuationInterceptor : CoroutineContext.Element {
    companion object Key : CoroutineContext.Key<ContinuationInterceptor>
    
    public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
    ...
}

下面我們自己定義一個攔截器放到我們的協(xié)程上下文中炼杖,看看會發(fā)生什么。

class MyContinuationInterceptor: ContinuationInterceptor {
    override val key = ContinuationInterceptor
    override fun <T> interceptContinuation(continuation: Continuation<T>) = MyContinuation(continuation)
}

class MyContinuation<T>(val continuation: Continuation<T>): Continuation<T> {
    override val context = continuation.context
    override fun resumeWith(result: Result<T>) {
        Log.d("jason","result=$result" )
        continuation.resumeWith(result)
    }
}
        launch {
            launch(MyContinuationInterceptor()) {
                log(1)
                val deferred = async {
                    log(2)
                    delay(1000)
                    log(3)
                    "我是返回值"
                }
                log(4)
                val result = deferred.await()
                log("5. $result")
            }.join()
            log(6)
        }

我們通過 launch 啟動了一個協(xié)程盗迟,為它指定了我們自己的攔截器作為上下文嘹叫,緊接著在其中用 async 啟動了一個協(xié)程,async 與 launch 從功能上是同等類型的函數(shù)诈乒,它們都被稱作協(xié)程的 Builder 函數(shù),不同之處在于 async 啟動的 Job 也就是實際上的 Deferred 可以有返回結(jié)果婆芦,可以通過 await 方法獲取怕磨。
輸出日志

23:02:58.595 5241-5241/com.example.mytest D/jason: result=Success(kotlin.Unit) //1
23:02:58.596 5241-5241/com.example.mytest D/jason: 1
23:02:58.598 5241-5241/com.example.mytest D/jason: result=Success(kotlin.Unit) //2
23:02:58.598 5241-5241/com.example.mytest D/jason: 2 
23:02:58.602 5241-5241/com.example.mytest D/jason: 4
23:02:59.602 5241-5273/com.example.mytest D/jason: result=Success(kotlin.Unit) //3
23:02:59.602 5241-5273/com.example.mytest D/jason: 3
23:02:59.603 5241-5273/com.example.mytest D/jason: result=Success(我是返回值) //4
23:02:59.604 5241-5273/com.example.mytest D/jason: 5. 我是返回值
23:02:59.605 5241-5241/com.example.mytest D/jason: 6

首先,這段代碼中一共有4次調(diào)度機會消约,所有協(xié)程啟動的時候肠鲫,都會有一次 Continuation.resumeWith 的操作,這一次操作對于調(diào)度器來說就是一次調(diào)度的機會或粮,我們的協(xié)程有機會調(diào)度到其他線程的關(guān)鍵之處就在于此导饲。 ①、② 兩處都是這種情況。

其次渣锦,delay 是掛起點硝岗,1000ms 之后需要繼續(xù)調(diào)度執(zhí)行該協(xié)程,因此就有了 ③ 處的日志袋毙。

最后型檀,④ 處的日志就很容易理解了,正是我們的返回結(jié)果听盖。

如果我們在攔截器當(dāng)中自己處理了線程切換胀溺,那么就實現(xiàn)了自己的一個簡單的調(diào)度器,大家有興趣可以自己去嘗試皆看。

協(xié)程任務(wù)執(zhí)行環(huán)境-Dispatcher(調(diào)度器)

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    ...
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)
    ...
}

它本身是協(xié)程上下文的子類仓坞,同時實現(xiàn)了攔截器的接口, dispatch 方法會在攔截器的方法 interceptContinuation 中調(diào)用腰吟,進而實現(xiàn)協(xié)程的調(diào)度无埃。所以如果我們想要實現(xiàn)自己的調(diào)度器,繼承這個類就可以了蝎困,不過通常我們都用現(xiàn)成的
現(xiàn)成的調(diào)度器有:

調(diào)度器名稱 使用線程
Main UI線程
IO 線程池
Default 線程池
Unconfined 直接執(zhí)行

也能使用這種方式簡單的創(chuàng)建一個自定義的協(xié)程調(diào)度器

val myDispatcher= Executors.newSingleThreadExecutor{ r -> Thread(r, "MyThread") }.asCoroutineDispatcher()

這里我們主要看一下我們最常使用的Dispatcher.Main和Dispatcher.IO兩個派發(fā)器录语。


image.png

Dispatcher.Main

Dispatcher.Main沒有默認實現(xiàn),依賴于各個平臺的實現(xiàn)禾乘,如果沒有引入android依賴包澎埠,則會拋異常提示,那么kotlin是怎么支持這種動態(tài)的類呢始藕?

  1. 首先kotlin提供了一個工廠類接口蒲稳,用來創(chuàng)建MainDispatcher
public interface MainDispatcherFactory {
    fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher
}
  1. 然后再看反編譯的源碼
public final <S> List<S> loadProviders$kotlinx_coroutines_core(@NotNull Class<S> paramClass, @NotNull ClassLoader paramClassLoader) {
    //從apk的META-INF/services/文件夾下那類名
    StringBuilder stringBuilder = new StringBuilder();
    stringBuilder.append("META-INF/services/");
    stringBuilder.append(paramClass.getName());
    Enumeration enumeration = paramClassLoader.getResources(stringBuilder.toString());
    ArrayList arrayList = Collections.list(enumeration);
    Iterable iterable = (Iterable)arrayList;
    Collection collection = (Collection)new ArrayList();
    for (URL uRL : iterable) {
      FastServiceLoader fastServiceLoader = INSTANCE;
      Intrinsics.checkExpressionValueIsNotNull(uRL, "it");
      CollectionsKt.addAll(collection, (Iterable)fastServiceLoader.parse(uRL));
    } 
    collection = CollectionsKt.toSet((Iterable)collection);
    iterable = (Iterable)collection;
    collection = (Collection)new ArrayList(CollectionsKt.collectionSizeOrDefault(iterable, 10));
    //將類名解析為實例對象
    for (String str : iterable)
      collection.add(INSTANCE.getProviderInstance(str, paramClassLoader, paramClass)); 
    return (List)collection;
  }

MainDispatcher的factory會從apk的META-INF/services/文件夾下獲取。

  1. 再看編譯生成的apk文件的該文件夾內(nèi)容


    image.png

    所以android的依賴包是通過向該文件注冊類名實現(xiàn)的注冊類伍派,并且factory類為AndroidDispatcherFactory江耀。

  2. 最后我們再來看下AndroidDispatcherFactory類
internal class AndroidDispatcherFactory : MainDispatcherFactory {
    override fun createDispatcher(allFactories: List<MainDispatcherFactory>) = HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main")
}
internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
    public constructor(
        handler: Handler,
        name: String? = null
    ) : this(handler, name, false)

    //android中需要向主looper進行提交調(diào)度
    override fun isDispatchNeeded(context: CoroutineContext): Boolean {
        return !invokeImmediately || Looper.myLooper() != handler.looper
    }

    //通過持有主線程looper的handler進行調(diào)度
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        handler.post(block)
    }
    ...
}

很清楚,就是用持有主線程looper的handler進行任務(wù)的調(diào)度诉植,確保任務(wù)會在主線程執(zhí)行祥国。

Dispatcher.IO

internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
    val IO = blocking(systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)))
    public fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
        return LimitingDispatcher(this, parallelism, TaskMode.PROBABLY_BLOCKING)
    }
}

Dispatcher.IO是一個LimitingDispatcher實例,他可以控制同時并發(fā)任務(wù)數(shù)晾腔,默認為64個舌稀,即最多有64個任務(wù)同時在運行。

private class LimitingDispatcher(
    val dispatcher: ExperimentalCoroutineDispatcher,
    val parallelism: Int,
    override val taskMode: TaskMode
) : ExecutorCoroutineDispatcher()

而LimitingDispatcher內(nèi)部真正調(diào)度任務(wù)的dispatcher是一個ExperimentalCoroutineDispatcher對象灼擂。

open class ExperimentalCoroutineDispatcher(
    private val corePoolSize: Int,
    private val maxPoolSize: Int,
    private val idleWorkerKeepAliveNs: Long,
    private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
    constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE,
        schedulerName: String = DEFAULT_SCHEDULER_NAME
    ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)

    private var coroutineScheduler = createScheduler()

    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        try {
            coroutineScheduler.dispatch(block)
        } catch (e: RejectedExecutionException) {
            DefaultExecutor.dispatch(context, block)
        }

    private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
}

我們看到壁查,該dispatcher里面的真正的線程池,是CoroutineScheduler對象剔应,而核心線程數(shù)和最大線程數(shù)睡腿,取決于可用CPU的數(shù)量语御。

internal val CORE_POOL_SIZE = systemProp(
    "kotlinx.coroutines.scheduler.core.pool.size",
    AVAILABLE_PROCESSORS.coerceAtLeast(2), // !!! at least two here
    minValue = CoroutineScheduler.MIN_SUPPORTED_POOL_SIZE
)
internal val AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors()

協(xié)程調(diào)度器-CoroutineScheduler

這里我們挑幾個小細節(jié)看一下CoroutineScheduler是如何來優(yōu)化對線程的使用的。

i. 盡量使用當(dāng)前線程

private fun submitToLocalQueue(task: Task, fair: Boolean): Int {
        val worker = currentWorker() ?: return NOT_ADDED
            ...
        worker.localQueue.add(task, globalQueue)
            ...
}
private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }

如果當(dāng)前線程是Dispatcher.IO開啟的工作線程席怪,那么任務(wù)優(yōu)先交由該線程的任務(wù)隊列应闯,等待處理。

ii. 雙重隊列

fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) {
  ...
  when (submitToLocalQueue(task, fair)) {
    ADDED -> return
    NOT_ADDED -> {
      //本地隊列已滿何恶,放入全局隊列孽锥,所有線程可取
      globalQueue.addLast(task)
    }
    else -> requestCpuWorker() // ask for help
  }
}

如果工作線程本地隊列無限大,一味的放入本地隊列的話细层,可能會造成單一線程工作惜辑,效率極低,于是每個工作線程有固定大小的queue疫赎,滿了之后盛撑,會放到全局queue中,等待任意空閑工作線程執(zhí)行捧搞。

iii.搶占其他線程的任務(wù)

//工作線程Worker類
override fun run() {
  while (!isTerminated && state != WorkerState.TERMINATED) {
    val task = findTask()
    ...
  }
  ...
}
private fun findTaskWithCpuPermit(): Task? {
    ...
  //從本地queue獲取任務(wù)
  localQueue.poll()?.let { return it }
  //從全局queue獲取任務(wù)
  if (!globalFirst) globalQueue.removeFirstOrNull()?.let { return it }
  //搶占其他線程任務(wù)
  return trySteal()
}
private fun trySteal(): Task? {
  ...
  //隨機一個工作線程
  if (stealIndex == 0) stealIndex = nextInt(created)
  ...
  val worker = workers[stealIndex]
  if (worker !== null && worker !== this) {
    //將其queue里的任務(wù)放到自己queue中
    if (localQueue.trySteal(worker.localQueue, globalQueue)) {
      return localQueue.poll()
    }
  }
  return null
}

如果一個工作線程的本地queue和全局queue都沒有任務(wù)了抵卫,但是其他線程的queue還有任務(wù),此時讓其空閑胎撇,一是沒有充分利用線程提升工作效率介粘,二是線程的空閑狀態(tài)切換需要開銷,所以此時會嘗試從任一工作線程的queue中取出任務(wù)晚树,放入自己的queue中執(zhí)行姻采。

以上三點的相互配合,可以充分利用線程資源爵憎,避免過多線程的使用及開銷慨亲,也保證了多任務(wù)時的工作效率。

協(xié)程執(zhí)行過程源碼追蹤分析

我們以一個請求數(shù)據(jù)后在主線程更新界面的代碼來進行分析

 fun setUpUI(){
        GlobalScope.launch(Main) { 
            val dataDeferred  = requestDataAsync()
            doSomethingElse()
            val data = dataDeferred.await()
            processData(data)
        }
        Thread.sleep(1000)
        doSomethingElse2()
    }

    fun requestDataAsync():Deferred<String>{
        // 啟動一個異步協(xié)程去執(zhí)行耗時任務(wù)
        return GlobalScope.async { 
            requestData()
        }
    }  

    fun doSomethingElse2(){
        println("doSomethingElse2")
    }

編譯后生成偽代碼

final void setUpUI() {
        BuildersKt__Builders_commonKt.launch$default(GlobalScope.INSTANCE, 
        Dispatchers.getMain(), 
        null,
        // 傳入的是一個 KotlinTest$setUpUI.KotlinTest$setUpUI$1 對象
        (Function2)new KotlinTest$setUpUI.KotlinTest$setUpUI$1(this, (Continuation)null), 2, null);
        this.doSomethingElse2();
}

final class setUpUI$1 extends SuspendLambda implements Function2{
    public final Object invokeSuspend(Object result) {
        switch (this.label) {
            case 0:
                doSomethingElse()
                // 新建并啟動 async 協(xié)程
                Deferred async$default = BuildersKt.async$default(coroutineScope, (CoroutineContext) Dispatchers.getDefault(), null, (Function2) new 1(null), 2, null);
                this.label = 1;
                // 如果 async 協(xié)程還沒完成為掛起狀態(tài) 則直接返回宝鼓,等待下次喚醒重入
                if (async$default.await(this) == coroutine_suspended) {
                    return coroutine_suspended;
                }
                break;
            case 1:
                val data = result;
                processData(data)
                break;
        }
    }
}

可以看到傳入到 launch 函數(shù)第四個參數(shù)位置的是一個編譯后生成的 SuspendLambda 類實例setUpUI$1刑棵,SuspendLambda 本質(zhì)上是一個續(xù)體 Continuation,而 Continuation 是一個有著恢復(fù)操作的接口

/**
 * 在一個掛起點之后可以返回類型T值的續(xù)集continuation的接口
 * Interface representing a continuation after a suspension point that returns value of type `T`.
 */
@SinceKotlin("1.3")
public interface Continuation<in T> {
    /**
     * Context of the coroutine that corresponds to this continuation.
     */
    // todo: shall we provide default impl with EmptyCoroutineContext?
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}

SuspendLambda 繼承結(jié)構(gòu)如下
SuspendLambda > ContinuationImpl > BaseContinuationImpl > Continuation

每一層封裝對應(yīng)添加了不同的功能愚铡,我們先忽略掉這些功能細節(jié)蛉签,著眼于我們的主線,繼續(xù)跟進launch 函數(shù)執(zhí)行過程沥寥,由于第二個參數(shù)是默認值碍舍,所以創(chuàng)建的是 StandaloneCoroutine, 調(diào)用鏈如下:

coroutine.start(start, coroutine, block)
-> CoroutineStart.start(block, receiver, this)
-> CoroutineStart.invoke(block: suspend () -> T, completion: Continuation<T>)
-> block.startCoroutineCancellable(completion)
-> createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)
我們看最后創(chuàng)建了一個協(xié)程营曼,并鏈式調(diào)用 intercepted、resumeCancellable 方法愚隧,利用協(xié)程上下文中的續(xù)體攔截器 ContinuationInterceptor 對協(xié)程的執(zhí)行進行攔截蒂阱,intercepted 實際上調(diào)用的是 ContinuationImpl 的 intercepted 方法

internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    ...
    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }
    ...
}

context[ContinuationInterceptor]?.interceptContinuation調(diào)用的是 CoroutineDispatcher 的 interceptContinuation 方法

public final <T> Continuation<T> interceptContinuation(@NotNull final Continuation<? super T> continuation) {
        Intrinsics.checkParameterIsNotNull(continuation, "continuation");
        return new DispatchedContinuation<T>(this, continuation);
    }

最終創(chuàng)建了一個 DispatchedContinuation 可分發(fā)的協(xié)程實例锻全,我們繼續(xù)看resumeCancellable 方法

internal fun <T> Continuation<T>.resumeCancellable(value: T) = when (this) {
    // 判斷是否是DispatchedContinuation 根據(jù)我們前面的代碼追蹤 這里是DispatchedContinuation
    is DispatchedContinuation -> resumeCancellable(value)
    else -> resume(value)
}

inline fun resumeCancellable(value: T) {
        // 判斷是否需要線程調(diào)度 
        if (dispatcher.isDispatchNeeded(context)) {
            _state = value
            resumeMode = MODE_CANCELLABLE
            dispatcher.dispatch(context, this)
        } else {
            UndispatchedEventLoop.execute(this, value, MODE_CANCELLABLE) {
                if (!resumeCancelled()) {
                    resumeUndispatched(value)
                }
            }
        }
    }

最終走到 dispatcher.dispatch(context, this) 而這里的 dispatcher 就是通過工廠方法創(chuàng)建的 HandlerDispatcher ,dispatch() 函數(shù)第二個參數(shù)this是一個runnable這里為 DispatchedTask

HandlerDispatcher

/**
 * Implements [CoroutineDispatcher] on top of an arbitrary Android [Handler].
 */
internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
    ...
    //  最終執(zhí)行這里的 dispatch方法 而handler則是android中的 MainHandler
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        handler.post(block)
    }
    ...
}

這里借用 Android 的主線程消息隊列來在主線程中執(zhí)行 block Runnable 而這個 Runnable 即為 DispatchedTask

internal abstract class DispatchedTask<in T>(
    @JvmField var resumeMode: Int
) : SchedulerTask() {
    ...
    public final override fun run() {
          ...
            withCoroutineContext(context, delegate.countOrElement) {
                if (job != null && !job.isActive)
                    // 異常情況下
                    continuation.resumeWithException(job.getCancellationException())
                else {
                    val exception = getExceptionalResult(state)
                    if (exception != null)
                        // 異常情況下
                        continuation.resumeWithStackTrace(exception)
                    else
                        // 正常情況下走到這一步
                        continuation.resume(getSuccessfulResult(state))
                }
            }
             ...
    }
}

@InlineOnly public inline fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))

internal abstract class BaseContinuationImpl(...) {
    // 實現(xiàn) Continuation 的 resumeWith录煤,并且是 final 的鳄厌,不可被重寫
    public final override fun resumeWith(result: Result<Any?>) {
        ...
        val outcome = invokeSuspend(param)
        ...
    }
    // 由編譯生成的協(xié)程相關(guān)類來實現(xiàn),例如 setUpUI$1
    protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}

最終調(diào)用到 continuation.resumeWith() 而 resumeWith() 中會調(diào)用 invokeSuspend妈踊,即之前編譯器生成的 SuspendLambda 中的 invokeSuspend 方法

final class setUpUI$1 extends SuspendLambda implements Function2{
    public final Object invokeSuspend(Object result) {
        switch (this.label) {
            case 0:
                doSomethingElse()
                // 新建并啟動 async 協(xié)程
                Deferred async$default = BuildersKt.async$default(coroutineScope, (CoroutineContext) Dispatchers.getDefault(), null, (Function2) new 1(null), 2, null);
                this.label = 1;
                // 如果 async 協(xié)程還沒完成為掛起狀態(tài) 則直接返回了嚎,等待下次喚醒重入
                if (async$default.await(this) == coroutine_suspended) {
                    return coroutine_suspended;
                }
                break;
            case 1:
                val data = result;
                processData(data)
                break;
        }
    }
}

這段代碼是一個狀態(tài)機機制,每一個掛起點都是一種狀態(tài)廊营,協(xié)程恢復(fù)只是跳轉(zhuǎn)到下一個狀態(tài)歪泳,掛起點將執(zhí)行過程分割成多個片段,利用狀態(tài)機的機制保證各個片段按順序執(zhí)行露筒。

如果沒有掛起點就只有一個初始狀態(tài)呐伞,類似于callback回調(diào),所以對應(yīng)了之前我們分析的非阻塞的異步底層實現(xiàn)其實也是一種callback回調(diào)慎式,只不過有多個掛起點時就會有多個callback回調(diào)伶氢,我們把多個callback回調(diào)封裝成了一個狀態(tài)機。

協(xié)程的掛起

從協(xié)程的調(diào)度過程我們知道瘪吏,調(diào)度后會到編譯器生成的 SuspendLambda 的 invokeSuspend 方法中的一個掛起方法癣防,以例子中的await為例

if (async$default.await(this) == coroutine_suspended) {
        //目前還在掛起中,則return等待掛起結(jié)束后的invokeSuspend
        return coroutine_suspended;
}

async 也是一個協(xié)程掌眠,如果狀態(tài)為掛起coroutine_suspended蕾盯,則執(zhí)行流直接 return 返回,如果已達到完成狀態(tài)直接跳轉(zhuǎn)下一個狀態(tài) case 1 最終走完整個協(xié)程代碼塊扇救。

這里需要注意的是:

啟動一個新的協(xié)程并不會掛起當(dāng)前協(xié)程刑枝,只有當(dāng)使用庫函數(shù) await、yield方法時才會將當(dāng)前的協(xié)程掛起迅腔。
協(xié)程掛起并不會阻塞線程装畅,線程在掛起點 return 后可以去執(zhí)行其他的代碼塊。
協(xié)程的掛起過程很簡單沧烈,代碼塊直接返回掠兄,當(dāng)前狀態(tài)保存在狀態(tài)機 SuspendLambda 中,可以想象到協(xié)程恢復(fù)的時候也是調(diào)用 SuspendLambda 的 invokeSuspend 從而進入下一個狀態(tài)繼續(xù)執(zhí)行的锌雀。

delay 的實現(xiàn)

public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
    }
}

/** Returns [Delay] implementation of the given context */
internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay

internal actual val DefaultDelay: Delay = DefaultExecutor

delay 使用suspendCancellableCoroutine掛起協(xié)程蚂夕,而協(xié)程恢復(fù)的一般情況下是關(guān)鍵在DefaultExecutor.scheduleResumeAfterDelay(),其中實現(xiàn)是schedule(DelayedResumeTask(timeMillis, continuation))腋逆,其中的關(guān)鍵邏輯是將 DelayedResumeTask 放到 DefaultExecutor 的隊列最后婿牍,在延遲的時間到達就會執(zhí)行 DelayedResumeTask,那么該 task 里面的實現(xiàn)是什么:

override fun run() {
    // 直接在調(diào)用者線程恢復(fù)協(xié)程
    with(cont) { resumeUndispatched(Unit) }
}

yield 的實現(xiàn)

yield()的作用是掛起當(dāng)前協(xié)程惩歉,然后將協(xié)程分發(fā)到 Dispatcher 的隊列等脂,這樣可以讓該協(xié)程所在線程或線程池可以運行其他協(xié)程邏輯俏蛮,然后在 Dispatcher 空閑的時候繼續(xù)執(zhí)行原來協(xié)程。簡單的來說就是讓出自己的執(zhí)行權(quán)上遥,給其他協(xié)程使用搏屑,當(dāng)其他協(xié)程執(zhí)行完成或也讓出執(zhí)行權(quán)時,一開始的協(xié)程可以恢復(fù)繼續(xù)運行粉楚。
看下面的代碼示例:

fun main(args: Array<String>) = runBlocking<Unit> {
    launch {
        repeat(3) {
            println("job1 repeat $it times")
            yield()
        }
    }
    launch {
        repeat(3) {
            println("job2 repeat $it times")
            yield()
        }
    }
}

通過yield()實現(xiàn) job1 和 job2 兩個協(xié)程交替運行辣恋,輸出如下:

job1 repeat 0 times
job2 repeat 0 times
job1 repeat 1 times
job2 repeat 1 times
job1 repeat 2 times
job2 repeat 2 times

現(xiàn)在來看其實現(xiàn):

public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
    val context = uCont.context
    // 檢測協(xié)程是否已經(jīng)取消或者完成,如果是的話拋出 CancellationException
    context.checkCompletion()
    // 如果協(xié)程沒有線程調(diào)度器模软,或者像 Dispatchers.Unconfined 一樣沒有進行調(diào)度伟骨,則直接返回
    val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit
    if (!cont.dispatcher.isDispatchNeeded(context)) return@sc Unit
    // dispatchYield(Unit) 最終會調(diào)用到 dispatcher.dispatch(context, block) 將協(xié)程分發(fā)到調(diào)度器隊列中,這樣線程可以執(zhí)行其他協(xié)程
    cont.dispatchYield(Unit)
    COROUTINE_SUSPENDED
}

所以注意到撵摆,yield()需要依賴協(xié)程的線程調(diào)度器底靠,而調(diào)度器再次執(zhí)行該協(xié)程時,會調(diào)用resume來恢復(fù)協(xié)程運行特铝。

現(xiàn)在來看封裝異步邏輯為掛起函數(shù)的關(guān)鍵是用suspendCoroutineUninterceptedOrReturn函數(shù)包裝暑中,然后在異步邏輯完成時調(diào)用resume手動恢復(fù)協(xié)程。

協(xié)程工作流程圖
image.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末鲫剿,一起剝皮案震驚了整個濱河市鳄逾,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌灵莲,老刑警劉巖雕凹,帶你破解...
    沈念sama閱讀 218,122評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異政冻,居然都是意外死亡枚抵,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評論 3 395
  • 文/潘曉璐 我一進店門明场,熙熙樓的掌柜王于貴愁眉苦臉地迎上來汽摹,“玉大人,你說我怎么就攤上這事苦锨”破” “怎么了?”我有些...
    開封第一講書人閱讀 164,491評論 0 354
  • 文/不壞的土叔 我叫張陵舟舒,是天一觀的道長拉庶。 經(jīng)常有香客問我,道長秃励,這世上最難降的妖魔是什么氏仗? 我笑而不...
    開封第一講書人閱讀 58,636評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮夺鲜,結(jié)果婚禮上皆尔,老公的妹妹穿的比我還像新娘帚稠。我一直安慰自己,他們只是感情好床佳,可當(dāng)我...
    茶點故事閱讀 67,676評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著榄审,像睡著了一般砌们。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上搁进,一...
    開封第一講書人閱讀 51,541評論 1 305
  • 那天浪感,我揣著相機與錄音,去河邊找鬼饼问。 笑死影兽,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的莱革。 我是一名探鬼主播峻堰,決...
    沈念sama閱讀 40,292評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼盅视!你這毒婦竟也來了捐名?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,211評論 0 276
  • 序言:老撾萬榮一對情侶失蹤闹击,失蹤者是張志新(化名)和其女友劉穎镶蹋,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體赏半,經(jīng)...
    沈念sama閱讀 45,655評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡贺归,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,846評論 3 336
  • 正文 我和宋清朗相戀三年还惠,在試婚紗的時候發(fā)現(xiàn)自己被綠了接奈。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,965評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡比原,死狀恐怖瑰枫,靈堂內(nèi)的尸體忽然破棺而出踱葛,到底是詐尸還是另有隱情,我是刑警寧澤光坝,帶...
    沈念sama閱讀 35,684評論 5 347
  • 正文 年R本政府宣布尸诽,位于F島的核電站,受9級特大地震影響盯另,放射性物質(zhì)發(fā)生泄漏性含。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,295評論 3 329
  • 文/蒙蒙 一鸳惯、第九天 我趴在偏房一處隱蔽的房頂上張望商蕴。 院中可真熱鬧叠萍,春花似錦、人聲如沸绪商。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽格郁。三九已至腹殿,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間例书,已是汗流浹背锣尉。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留决采,地道東北人自沧。 一個月前我還...
    沈念sama閱讀 48,126評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像树瞭,于是被迫代替她去往敵國和親拇厢。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,914評論 2 355

推薦閱讀更多精彩內(nèi)容