【kotlin】- 攜程的執(zhí)行流程

簡(jiǎn)介

這篇文章將從源碼的角度,分析攜程的執(zhí)行流程灾茁,我們創(chuàng)建一個(gè)攜程窜觉,系統(tǒng)是怎么進(jìn)行調(diào)度的谷炸,什么時(shí)候執(zhí)行的,是否需要?jiǎng)?chuàng)建新線程等等禀挫,帶著這些疑問(wèn)旬陡,一起往下看吧。

例子先行

fun main(): Unit = runBlocking {
    launch {
        println("${treadName()}======1")
    }
    GlobalScope.launch {
        println("${treadName()}======3")
    }
    launch {
        println("${treadName()}======2")
    }
    println("${treadName()}======4")
    Thread.sleep(2000)
}

輸出如下:

DefaultDispatcher-worker-1======3
main======4
main======1
main======2

Process finished with exit code 0

根據(jù)打印语婴,如果根據(jù)單線程執(zhí)行流程來(lái)看描孟,是不是感覺(jué)上面的日志打印順序有點(diǎn)不好理解,下面我們就逐步來(lái)進(jìn)行分解砰左。

  • runBlocking攜程體
    這里將其它代碼省略到了匿醒,我這里都是按照一條簡(jiǎn)單的執(zhí)行流程進(jìn)行講解。

    public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
    
        val eventLoop: EventLoop?
        val newContext: CoroutineContext
        ...
        if (contextInterceptor == null) {
            eventLoop = ThreadLocalEventLoop.eventLoop
            newContext = GlobalScope.newCoroutineContext(context + eventLoop)
        }
        ...
        val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
        coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
        return coroutine.joinBlocking()
    }
    

    看一下eventLoop的初始化,會(huì) 在當(dāng)前線程(主線程)創(chuàng)建BlockingEventLoop對(duì)象缠导。

    internal val eventLoop: EventLoop
          get() = ref.get() ?: createEventLoop().also { ref.set(it) }
    
    internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.currentThread())
    

    看一下newContext初始化,這里會(huì)對(duì)攜程上下文進(jìn)行組合廉羔,返回新的上下文。最后返回的是一個(gè)BlockingEventLoop對(duì)象僻造。

    public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
       val combined = coroutineContext + context
        val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
        return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
          debug + Dispatchers.Default else debug
    }
    

    開(kāi)始對(duì)攜程進(jìn)行調(diào)度

     coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
    

    看一下執(zhí)行這句代碼之前蜜另,各變量的值

    111.png

    而上面的代碼最終調(diào)用的是CoroutineStart.DEFAULTinvoke方法。

      public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
          when (this) {
              DEFAULT -> block.startCoroutineCancellable(completion)
              ATOMIC -> block.startCoroutine(completion)
              UNDISPATCHED -> block.startCoroutineUndispatched(completion)
              LAZY -> Unit // will start lazily
          }
    

    我們使用的是DEFAULT啟動(dòng)模式嫡意。然后會(huì)執(zhí)行resumeCancellableWith方法。

      inline fun resumeCancellableWith(
          result: Result<T>,
          noinline onCancellation: ((cause: Throwable) -> Unit)?
      ) {
          val state = result.toState(onCancellation)
          if (dispatcher.isDispatchNeeded(context)) {
              _state = state
              resumeMode = MODE_CANCELLABLE
              dispatcher.dispatch(context, this)
          } else {
              executeUnconfined(state, MODE_CANCELLABLE) {
                  if (!resumeCancelled(state)) {
                      resumeUndispatchedWith(result)
                  }
              }
          }
      }
    

    dispatcherBlockingEventLoop對(duì)象捣辆,沒(méi)有重寫(xiě)isDispatchNeeded蔬螟,默認(rèn)返回true。然后調(diào)用dispatch繼續(xù)進(jìn)行分發(fā)汽畴。BlockingEventLoop繼承了EventLoopImplBase并調(diào)用其dispatch方法旧巾。把任務(wù)加入到隊(duì)列中。

    public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)
    

    回到最開(kāi)始忍些,在coroutine.start(CoroutineStart.DEFAULT, coroutine, block)執(zhí)行完鲁猩,還執(zhí)行了coroutine.joinBlocking()看一下實(shí)現(xiàn)。

        fun joinBlocking(): T {
          registerTimeLoopThread()
          try {
              eventLoop?.incrementUseCount()
              try {
                  while (true) {
                      @Suppress("DEPRECATION")
                      if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) }
                      val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
                      // note: process next even may loose unpark flag, so check if completed before parking
                      if (isCompleted) break
                      parkNanos(this, parkNanos)
                  }
              } finally { // paranoia
                  eventLoop?.decrementUseCount()
              }
          } finally { // paranoia
              unregisterTimeLoopThread()
          }
          // now return result
          val state = this.state.unboxState()
          (state as? CompletedExceptionally)?.let { throw it.cause }
          return state as T
      }
    

    執(zhí)行val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE罢坝,取出任務(wù)進(jìn)行執(zhí)行廓握,也就是runBlocking攜程體。

  • launch {} 執(zhí)行流程

    public fun CoroutineScope.launch(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> Unit
    ): Job {
        val newContext = newCoroutineContext(context)
        val coroutine = if (start.isLazy)
            LazyStandaloneCoroutine(newContext, block) else
            StandaloneCoroutine(newContext, active = true)
        coroutine.start(start, coroutine, block)
        return coroutine
    }
    

    因?yàn)?code>launch是直接在runBlocking(父攜程體)里新的創(chuàng)建的子攜程體嘁酿,所以執(zhí)行流程上和之前將的差不多隙券,只不過(guò)不會(huì)像runBlocking再去創(chuàng)建BlockingEventLoop對(duì)象,而是直接用runBlocking(父攜程體)的闹司,然后把任務(wù)加到里面娱仔,所以通過(guò)這種方式其實(shí)就是單線程對(duì)任務(wù)的調(diào)度而已。所以在runBlocking(父攜程體)內(nèi)通過(guò)launch啟動(dòng)再多的攜程體游桩,其實(shí)都是在同一線程牲迫,按照任務(wù)隊(duì)列的順序執(zhí)行的耐朴。

根據(jù)上面日志輸出,并沒(méi)有先執(zhí)行兩個(gè)launch攜程體盹憎,這是為什么呢筛峭,根據(jù)上面的講解,應(yīng)用知道脚乡,runBlocking(父攜程體)是第一被添加的隊(duì)列的任務(wù)蜒滩,其次是launch,所以是這樣的順序奶稠。那可以讓launch立即執(zhí)行嗎俯艰?答案是可以的,這就要說(shuō)攜程的啟動(dòng)模式了锌订。

  • CoroutineStart 是協(xié)程的啟動(dòng)模式竹握,存在以下4種模式:

    1. DEFAULT 立即調(diào)度,可以在執(zhí)行前被取消
    2. LAZY 需要時(shí)才啟動(dòng)辆飘,需要start啦辐、join等函數(shù)觸發(fā)才可進(jìn)行調(diào)度
    3. ATOMIC 立即調(diào)度,協(xié)程肯定會(huì)執(zhí)行蜈项,執(zhí)行前不可以被取消
    4. UNDISPATCHED 立即在當(dāng)前線程執(zhí)行芹关,直到遇到第一個(gè)掛起點(diǎn)(可能切線程)

    我們使用UNDISPATCHED就可以使攜程體馬上在當(dāng)前線程執(zhí)行〗糇洌看一下是怎么實(shí)現(xiàn)的侥衬。看一下實(shí)現(xiàn):

使用這種啟動(dòng)模式執(zhí)行UNDISPATCHED -> block.startCoroutineUndispatched(completion)方法跑芳。

internal fun <T> (suspend () -> T).startCoroutineUndispatched(completion: Continuation<T>) {
    startDirect(completion) { actualCompletion ->
        withCoroutineContext(completion.context, null) {
            startCoroutineUninterceptedOrReturn(actualCompletion)
        }
    }
}

大家可以自己點(diǎn)擊去看一下轴总,大概就是會(huì)立即執(zhí)行攜程體,而不是將任務(wù)放入隊(duì)列博个。

但是GlobalScope.launch卻不是按照這樣的邏輯怀樟,這是因?yàn)?code>GlobalScope.launch啟動(dòng)的全局?jǐn)y程,是一個(gè)獨(dú)立的攜程體了盆佣,并不是runBlocking(父攜程體)子攜程往堡。看一下通過(guò)GlobalScope.launch有什么不同罪塔。

  • GlobalScope.launch執(zhí)行流程
    1. 啟動(dòng)全局?jǐn)y程
    GlobalScope.launch
    
    newCoroutineContext(context)返回Dispatchers.Default對(duì)象投蝉。而DefaultScheduler繼承了ExperimentalCoroutineDispatcher類≌骺埃看一下ExperimentalCoroutineDispatcher中的dispatch代碼:
     override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
         ...
             coroutineScheduler.dispatch(block)
         ...
    
    看一下coroutineScheduler初始化
    private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
    
    CoroutineScheduler實(shí)現(xiàn)了Executor接口瘩缆,里面還有兩個(gè)全局隊(duì)列和線程池相關(guān)的參數(shù)。
    @JvmField
    val globalCpuQueue = GlobalQueue()
    @JvmField
    val globalBlockingQueue = GlobalQueue()
    
    繼續(xù)調(diào)用CoroutineScheduler中的dispatch方法
      fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
          trackTask() // this is needed for virtual time support
          val task = createTask(block, taskContext)
          // try to submit the task to the local queue and act depending on the result
          val currentWorker = currentWorker()
          val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
          if (notAdded != null) {
              if (!addToGlobalQueue(notAdded)) {
                  // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
                  throw RejectedExecutionException("$schedulerName was terminated")
              }
          }
          val skipUnpark = tailDispatch && currentWorker != null
          // Checking 'task' instead of 'notAdded' is completely okay
          if (task.mode == TASK_NON_BLOCKING) {
              if (skipUnpark) return
              signalCpuWork()
          } else {
              // Increment blocking tasks anyway
              signalBlockingWork(skipUnpark = skipUnpark)
          }
      }
    
    1. val task = createTask(block, taskContext)包裝成TaskImpl對(duì)象佃蚜。
    2. val currentWorker = currentWorker()當(dāng)前是主線程庸娱,運(yùn)行程序時(shí)由進(jìn)程創(chuàng)建着绊,肯定不是Worker對(duì)象,Worker是一個(gè)繼承了Thread的類 ,并且在初始化時(shí)都指定為守護(hù)線程熟尉。
      Worker存在5種狀態(tài):
      CPU_ACQUIRED 獲取到cpu權(quán)限
      BLOCKING 正在執(zhí)行IO阻塞任務(wù)
      PARKING 已處理完所有任務(wù)归露,線程掛起
      DORMANT 初始態(tài)
      TERMINATED 終止態(tài)
      
  1. val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)由于currentWorker是null,直接返回task對(duì)象斤儿。
  2. addToGlobalQueue(notAdded)根據(jù)任務(wù)是否是阻塞任務(wù)剧包,將task添加到全局任務(wù)隊(duì)列中。這里被添加到globalCpuQueue中往果。
  3. 執(zhí)行signalCpuWork()來(lái)喚醒一個(gè)線程或者啟動(dòng)一個(gè)新的線程疆液。
    fun signalCpuWork() {
      if (tryUnpark()) return
      if (tryCreateWorker()) return
      tryUnpark()
  }
 private fun tryCreateWorker(state: Long = controlState.value): Boolean {  
     val created = createdWorkers(state)// 創(chuàng)建的的線程總數(shù)  
     val blocking = blockingTasks(state)// 處理阻塞任務(wù)的線程數(shù)量  
     val cpuWorkers = (created - blocking).coerceAtLeast(0)//得到非阻塞任務(wù)的線程數(shù)量  
     if (cpuWorkers < corePoolSize) {// 小于核心線程數(shù)量,進(jìn)行線程的創(chuàng)建  
         val newCpuWorkers = createNewWorker()  
         if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()// 當(dāng)前非阻塞型線程數(shù)量為1陕贮,同時(shí)核心線程數(shù)量大于1時(shí)堕油,再進(jìn)行一個(gè)線程的創(chuàng)建,  
         if (newCpuWorkers > 0) return true  
     }  
     return false  
 }  
   
  // 創(chuàng)建線程  
  private fun createNewWorker(): Int {  
     synchronized(workers) {  
         ...  
         val created = createdWorkers(state)// 創(chuàng)建的的線程總數(shù)  
         val blocking = blockingTasks(state)// 阻塞的線程數(shù)量  
         val cpuWorkers = (created - blocking).coerceAtLeast(0) // 得到非阻塞線程數(shù)量  
         if (cpuWorkers >= corePoolSize) return 0//超過(guò)最大核心線程數(shù)肮之,不能進(jìn)行新線程創(chuàng)建  
        if (created >= maxPoolSize) return 0// 超過(guò)最大線程數(shù)限制掉缺,不能進(jìn)行新線程創(chuàng)建  
         ...  
         val worker = Worker(newIndex)  
         workers[newIndex] = worker  
         require(newIndex == incrementCreatedWorkers())  
         worker.start()// 線程啟動(dòng)  
         return cpuWorkers + 1  
     }  
 }

那么這里面的任務(wù)又是怎么調(diào)度的呢,當(dāng)全局任務(wù)被執(zhí)行的時(shí)候戈擒,看一下Worker中的run方法:

 override fun run() = runWorker()

執(zhí)行runWorker方法眶明,該方法會(huì)從隊(duì)列中找到執(zhí)行任務(wù),然后開(kāi)始執(zhí)行筐高。詳細(xì)代碼赘来,可以自行翻閱。

所以GlobalScope.launch使用的就是線程池凯傲,沒(méi)有所謂的性能好。

  • Dispatchers調(diào)度器
    Dispatchers是協(xié)程中提供的線程調(diào)度器嗦篱,用來(lái)切換線程冰单,指定協(xié)程所運(yùn)行的線程。灸促,上面用的是默認(rèn)調(diào)度器Dispatchers.Default诫欠。

Dispatchers中提供了4種類型調(diào)度器:
Default 默認(rèn)調(diào)度器:適合CPU密集型任務(wù)調(diào)度器 比如邏輯計(jì)算;
Main UI調(diào)度器
Unconfined 無(wú)限制調(diào)度器:對(duì)協(xié)程執(zhí)行的線程不做限制浴栽,協(xié)程恢復(fù)時(shí)可以在任意線程荒叼;
IO調(diào)度器:適合IO密集型任務(wù)調(diào)度器 比如讀寫(xiě)文件,網(wǎng)絡(luò)請(qǐng)求等典鸡。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末被廓,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子萝玷,更是在濱河造成了極大的恐慌嫁乘,老刑警劉巖昆婿,帶你破解...
    沈念sama閱讀 211,123評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異蜓斧,居然都是意外死亡仓蛆,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,031評(píng)論 2 384
  • 文/潘曉璐 我一進(jìn)店門挎春,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)看疙,“玉大人,你說(shuō)我怎么就攤上這事直奋∧芮欤” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,723評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵帮碰,是天一觀的道長(zhǎng)相味。 經(jīng)常有香客問(wèn)我,道長(zhǎng)殉挽,這世上最難降的妖魔是什么丰涉? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,357評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮斯碌,結(jié)果婚禮上一死,老公的妹妹穿的比我還像新娘。我一直安慰自己傻唾,他們只是感情好投慈,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,412評(píng)論 5 384
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著冠骄,像睡著了一般伪煤。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上凛辣,一...
    開(kāi)封第一講書(shū)人閱讀 49,760評(píng)論 1 289
  • 那天抱既,我揣著相機(jī)與錄音,去河邊找鬼扁誓。 笑死防泵,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的蝗敢。 我是一名探鬼主播捷泞,決...
    沈念sama閱讀 38,904評(píng)論 3 405
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼寿谴!你這毒婦竟也來(lái)了锁右?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,672評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎骡湖,沒(méi)想到半個(gè)月后贱纠,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,118評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡响蕴,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,456評(píng)論 2 325
  • 正文 我和宋清朗相戀三年谆焊,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片浦夷。...
    茶點(diǎn)故事閱讀 38,599評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡辖试,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出劈狐,到底是詐尸還是另有隱情罐孝,我是刑警寧澤,帶...
    沈念sama閱讀 34,264評(píng)論 4 328
  • 正文 年R本政府宣布肥缔,位于F島的核電站莲兢,受9級(jí)特大地震影響初肉,放射性物質(zhì)發(fā)生泄漏膝藕。R本人自食惡果不足惜闹炉,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,857評(píng)論 3 312
  • 文/蒙蒙 一隘梨、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧拼苍,春花似錦玖姑、人聲如沸端朵。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,731評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至鸥咖,卻和暖如春燕鸽,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背啼辣。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,956評(píng)論 1 264
  • 我被黑心中介騙來(lái)泰國(guó)打工绵咱, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人熙兔。 一個(gè)月前我還...
    沈念sama閱讀 46,286評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像艾恼,于是被迫代替她去往敵國(guó)和親住涉。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,465評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容

  • 為什么要搞出和用協(xié)程呢 是節(jié)省CPU钠绍,避免系統(tǒng)內(nèi)核級(jí)的線程頻繁切換舆声,造成的CPU資源浪費(fèi)。好鋼用在刀刃上。而協(xié)程是...
    靜默的小貓閱讀 632評(píng)論 0 2
  • kotlin 1.3出來(lái)了媳握,而協(xié)程(coroutines)也正式發(fā)布穩(wěn)定版碱屁。雖然目前項(xiàng)目不是kotlin語(yǔ)言,但為...
    小小的coder閱讀 949評(píng)論 0 0
  • [TOC] 簡(jiǎn)介 Coroutines are computer program components that ...
    Whyn閱讀 5,899評(píng)論 5 15
  • 協(xié)程(Coroutine) 協(xié)程引入 異步加載圖片 普通代碼:val view = ...loadImageAsy...
    晨起清風(fēng)閱讀 1,269評(píng)論 0 1
  • 什么是協(xié)程蛾找? 官方描述:協(xié)程通過(guò)將復(fù)雜性放入庫(kù)來(lái)簡(jiǎn)化異步編程娩脾。程序的邏輯可以在協(xié)程中順序地表達(dá),而底層庫(kù)會(huì)為我們解...
    pureChild閱讀 836評(píng)論 0 1