庖丁解牛骇径,一文搞懂Kotlin協(xié)程的線程池

上兩篇文章梳理了協(xié)程的運(yùn)行原理豌注,因?yàn)榫€程池相對(duì)于協(xié)程實(shí)現(xiàn)來說是可以單獨(dú)拿出來講的,所以分析到線程池的時(shí)候沒有繼續(xù)深入,現(xiàn)在就單獨(dú)來看看協(xié)程線程池的實(shí)現(xiàn)。
協(xié)程線程池是由分發(fā)器Dispatchers來維護(hù)的,主要是Dispatchers.DEFAULT和Dispatchers.IO兩個(gè)分發(fā)器灵巧。

Dispatchers.DEFAULT

Dispatchers.DEFAULT它持有的線程池是CoroutineScheduler:

  【SchedulerCoroutineDispatcher】
    internal open class SchedulerCoroutineDispatcher(
    private val corePoolSize: Int = CORE_POOL_SIZE,
    private val maxPoolSize: Int = MAX_POOL_SIZE,
    private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {
   private var coroutineScheduler = createScheduler()

    private fun createScheduler() =
        CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
    
    override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block) // 線程池分發(fā)

    ...略
}

  【CoroutineScheduler】
   internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int, // 核心線程數(shù)
    @JvmField val maxPoolSize: Int, // 最大線程數(shù)
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS, // 空閑線程存活時(shí)間
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME // 線程池名稱"CoroutineScheduler"
) : Executor, Closeable {
    // 用于存儲(chǔ)全局的純CPU(不阻塞)任務(wù)
    @JvmField
    val globalCpuQueue = GlobalQueue()

    // 用于存儲(chǔ)全局的執(zhí)行非純CPU(可能阻塞)任務(wù)
    @JvmField
    val globalBlockingQueue = GlobalQueue()

    // 用于記錄當(dāng)前處于Parked狀態(tài)(一段時(shí)間后自動(dòng)終止)的線程的數(shù)量
    private val parkedWorkersStack = atomic(0L)

    // 用于保存當(dāng)前線程池中的線程
    // workers[0]永遠(yuǎn)為null抹沪,作為哨兵位
    // index從1到maxPoolSize為有效線程
    @JvmField
    val workers = AtomicReferenceArray<Worker?>(maxPoolSize + 1)

    // 控制狀態(tài)
    private val controlState = atomic(corePoolSize.toLong() shl CPU_PERMITS_SHIFT)
    // 表示已經(jīng)創(chuàng)建的線程的數(shù)量
    private val createdWorkers: Int inline get() = (controlState.value and CREATED_MASK).toInt()
    // 表示可以獲取的CPU令牌數(shù)量刻肄,初始值為線程池核心線程數(shù)量
    private val availableCpuPermits: Int inline get() = availableCpuPermits(controlState.value)

    // 獲取指定的狀態(tài)的已經(jīng)創(chuàng)建的線程的數(shù)量
    private inline fun createdWorkers(state: Long): Int = (state and CREATED_MASK).toInt()
    // 獲取指定的狀態(tài)的執(zhí)行阻塞任務(wù)的數(shù)量
    private inline fun blockingTasks(state: Long): Int = (state and BLOCKING_MASK shr BLOCKING_SHIFT).toInt()
    // 獲取指定的狀態(tài)的CPU令牌數(shù)量
    public inline fun availableCpuPermits(state: Long): Int = (state and CPU_PERMITS_MASK shr CPU_PERMITS_SHIFT).toInt()

    // 當(dāng)前已經(jīng)創(chuàng)建的線程數(shù)量加1
    private inline fun incrementCreatedWorkers(): Int = createdWorkers(controlState.incrementAndGet())
    // 當(dāng)前已經(jīng)創(chuàng)建的線程數(shù)量減1
    private inline fun decrementCreatedWorkers(): Int = createdWorkers(controlState.getAndDecrement())
    // 當(dāng)前執(zhí)行阻塞任務(wù)的線程數(shù)量加1
    private inline fun incrementBlockingTasks() = controlState.addAndGet(1L shl BLOCKING_SHIFT)
    // 當(dāng)前執(zhí)行阻塞任務(wù)的線程數(shù)量減1
    private inline fun decrementBlockingTasks() {
        controlState.addAndGet(-(1L shl BLOCKING_SHIFT))
    }

    // 嘗試獲取CPU令牌
    private inline fun tryAcquireCpuPermit(): Boolean = controlState.loop { state ->
        val available = availableCpuPermits(state)
        if (available == 0) return false
        val update = state - (1L shl CPU_PERMITS_SHIFT)
        if (controlState.compareAndSet(state, update)) return true
    }
    // 釋放CPU令牌
    private inline fun releaseCpuPermit() = controlState.addAndGet(1L shl CPU_PERMITS_SHIFT)

    // 表示當(dāng)前線程池是否關(guān)閉
    private val _isTerminated = atomic(false)
    val isTerminated: Boolean get() = _isTerminated.value

companion object {
    // 用于標(biāo)記一個(gè)線程是否在parkedWorkersStack中(處于Parked狀態(tài))
    @JvmField
    val NOT_IN_STACK = Symbol("NOT_IN_STACK")

    // 線程的三個(gè)狀態(tài)
    // CLAIMED表示線程可以執(zhí)行任務(wù)
    // PARKED表示線程暫停執(zhí)行任務(wù),一段時(shí)間后會(huì)自動(dòng)進(jìn)入終止?fàn)顟B(tài)
    // TERMINATED表示線程處于終止?fàn)顟B(tài)
    private const val PARKED = -1
    private const val CLAIMED = 0
    private const val TERMINATED = 1

    // 以下五個(gè)常量為掩碼
    private const val BLOCKING_SHIFT = 21 // 2x1024x1024
    // 1-21位
    private const val CREATED_MASK: Long = (1L shl BLOCKING_SHIFT) - 1
    // 22-42位
    private const val BLOCKING_MASK: Long = CREATED_MASK shl BLOCKING_SHIFT
    // 42
    private const val CPU_PERMITS_SHIFT = BLOCKING_SHIFT * 2
    // 43-63位
    private const val CPU_PERMITS_MASK = CREATED_MASK shl CPU_PERMITS_SHIFT

    // 以下兩個(gè)常量用于require中參數(shù)判斷
    internal const val MIN_SUPPORTED_POOL_SIZE = 1
    // 2x1024x1024-2
    internal const val MAX_SUPPORTED_POOL_SIZE = (1 shl BLOCKING_SHIFT) - 2

    // parkedWorkersStack的掩碼
    private const val PARKED_INDEX_MASK = CREATED_MASK
    // inv表示01反轉(zhuǎn)
    private const val PARKED_VERSION_MASK = CREATED_MASK.inv()
    private const val PARKED_VERSION_INC = 1L shl BLOCKING_SHIFT
}

       
    

    fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
        trackTask() // this is needed for virtual time support
        // 傳入ruanable和taskContext構(gòu)建一個(gè)Task實(shí)例融欧,taskContext決定線程是cpu密集型還是阻塞型敏弃,通過上面Dispatchers.DEFAULT的代碼可以知
        // 道,它分發(fā)任務(wù)的時(shí)候是創(chuàng)建的NonBlockingContext噪馏,也就是非阻塞型的
        val task = createTask(block, taskContext) 
        // 判斷當(dāng)前線程是否運(yùn)行在當(dāng)前線程池
        val currentWorker = currentWorker()
        // 嘗試加入本地隊(duì)列麦到,注意這個(gè)方法是Woker的擴(kuò)展方法绿饵,這個(gè)本地隊(duì)列是Woker的變量,
        // Woker是對(duì)Thread的一層封裝瓶颠,專門用于協(xié)程線程池里用的
        val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
        // notAdded不為null拟赊,代表加入本地任務(wù)隊(duì)列失敗,也代表此時(shí)的線程不是運(yùn)行在線程池
        if (notAdded != null) {
            // 嘗試加入全局任務(wù)隊(duì)列粹淋,這里的全局指的是線程池里維護(hù)的兩個(gè)變量
            if (!addToGlobalQueue(notAdded)) {
                // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
                throw RejectedExecutionException("$schedulerName was terminated")
            }
        }
        val skipUnpark = tailDispatch && currentWorker != null
        // Checking 'task' instead of 'notAdded' is completely okay
        // 如果任務(wù)是非阻塞任務(wù)吸祟,則喚醒cpu線程
        if (task.mode == TASK_NON_BLOCKING) {
            if (skipUnpark) return
            signalCpuWork()
        } else {
            // 否則就喚醒阻塞線程
            signalBlockingWork(skipUnpark = skipUnpark)
        }
    }
    // currentWorker方法
    private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }

    // Worker的擴(kuò)展函數(shù)submitToLocalQueue
    private fun CoroutineScheduler.Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? {
      // Worker 為空,直接返回任務(wù)本身
      if (this == null) return task
      // 非阻塞的任務(wù)且此時(shí)Worker處于阻塞狀態(tài)桃移,則直接返回
      if (task.mode == TASK_NON_BLOCKING && state === CoroutineScheduler.WorkerState.BLOCKING) {
        return task
      }
      //表示本地隊(duì)列里存有任務(wù)了
      mayHaveLocalTasks = true
      //加入到本地隊(duì)列里
      //localQueue 為Worker的成員變量
      return localQueue.add(task, fair = tailDispatch)
    }
    // addToGlobalQueue方法
    private fun addToGlobalQueue(task: Task): Boolean {
      return if (task.isBlocking) {
        //加入到全局阻塞隊(duì)列
        globalBlockingQueue.addLast(task)
    } else {
        //加入到全局cpu隊(duì)列
        globalCpuQueue.addLast(task)
      }
    }
    fun signalCpuWork() {
     //嘗試去喚醒正在掛起的線程欢搜,若是有線程可以被喚醒,則無需創(chuàng)建新線程
      if (tryUnpark()) return
      //若喚醒不成功谴轮,則需要嘗試創(chuàng)建線程
      if (tryCreateWorker()) return
      //再試一次,邊界條件
      tryUnpark()
    }
    // 嘗試創(chuàng)建線程的方法
    private fun tryCreateWorker(state: Long = controlState.value): Boolean {
     //獲取當(dāng)前已經(jīng)創(chuàng)建的線程數(shù)
      val created = createdWorkers(state)
      //獲取當(dāng)前阻塞的任務(wù)數(shù)
      val blocking = blockingTasks(state)
      //已創(chuàng)建的線程數(shù)-阻塞的任務(wù)數(shù)=非阻塞的線程數(shù)
      //coerceAtLeast(0) 表示結(jié)果至少是0
      val cpuWorkers = (created - blocking).coerceAtLeast(0)
      //如果非阻塞數(shù)小于核心線程數(shù)
    // 現(xiàn)在若是已經(jīng)創(chuàng)建了5個(gè)線程吹埠,而這幾個(gè)線程都在執(zhí)行IO任務(wù)第步,此時(shí)就需要再創(chuàng)建新的線程來執(zhí)行任務(wù),因?yàn)榇藭r(shí)CPU是空閑的缘琅。
     //只要非阻塞任務(wù)的個(gè)數(shù)小于核心線程數(shù)粘都,那么就需要?jiǎng)?chuàng)建新的線程,目的是為了充分利用CPU刷袍。
      if (cpuWorkers < corePoolSize) {
          //創(chuàng)建線程
          val newCpuWorkers = createNewWorker()
          //如果當(dāng)前只有一個(gè)非阻塞線程并且核心線程數(shù)>1翩隧,那么再創(chuàng)建一個(gè)線程
          //目的是為了方便"偷"任務(wù)...
          if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()
          //創(chuàng)建成功
          if (newCpuWorkers > 0) return true
      }
      return false
    }
    // 創(chuàng)建線程的方法
    //workers 為Worker 數(shù)組,因?yàn)樾枰獙?duì)數(shù)組進(jìn)行add 操作呻纹,因此需要同步訪問
    private fun createNewWorker(): Int {
      synchronized(workers) {
        if (isTerminated) return -1
        val state = controlState.value
        //獲取已創(chuàng)建的線程數(shù)
        val created = createdWorkers(state)
        //阻塞的任務(wù)數(shù)
        val blocking = blockingTasks(state)
        //非阻塞的線程數(shù)
        val cpuWorkers = (created - blocking).coerceAtLeast(0)
        //非阻塞的線程數(shù)不能超過核心線程數(shù)
        if (cpuWorkers >= corePoolSize) return 0
        //已創(chuàng)建的線程數(shù)不能大于最大線程數(shù)
        if (created >= maxPoolSize) return 0
        val newIndex = createdWorkers + 1
        require(newIndex > 0 && workers[newIndex] == null)
        //構(gòu)造線程
        val worker = Worker(newIndex)
        //記錄到數(shù)組里
        workers[newIndex] = worker
        //記錄創(chuàng)建的線程數(shù)
        require(newIndex == incrementCreatedWorkers())
        //開啟線程
        worker.start()
        //當(dāng)前非阻塞線程數(shù)
        return cpuWorkers + 1
    }
}

    ...略
}

通過以上分析堆生,知道有三個(gè)任務(wù)隊(duì)列,這里要理清一下這些任務(wù)隊(duì)列的關(guān)系:


線程池dispatch的操作可以大概做個(gè)總結(jié):
1.先把分發(fā)下來的runable任務(wù)封裝成Task雷酪,并標(biāo)記它是阻塞型的還是非阻塞型的
2.判斷當(dāng)前線程是否是運(yùn)行在當(dāng)前線程池里的線程淑仆,是的話就把傳進(jìn)來的Task直接加入當(dāng)前線程的任務(wù)隊(duì)列中
3.否則根據(jù)task類型加入到線程池的相應(yīng)任務(wù)隊(duì)列中
4.嘗試喚醒相應(yīng)類型的線程,沒有的話就創(chuàng)建線程來執(zhí)行工作

線程池里的Worker

看看真正執(zhí)行任務(wù)的地方哥力,重點(diǎn)看Worker里的runWorker方法

// Worker內(nèi)部類
    internal inner class Worker private constructor() : Thread() {
        init {
            isDaemon = true
        }

        // guarded by scheduler lock, index in workers array, 0 when not in array (terminated)
        @Volatile // volatile for push/pop operation into parkedWorkersStack
        var indexInArray = 0
            set(index) {
                name = "$schedulerName-worker-${if (index == 0) "TERMINATED" else index.toString()}"
                field = index
            }

        constructor(index: Int) : this() {
            indexInArray = index
        }

        inline val scheduler get() = this@CoroutineScheduler

        @JvmField
        val localQueue: WorkQueue = WorkQueue()
        // 線程狀態(tài)
        var state = WorkerState.DORMANT
        

        override fun run() = runWorker()

        private fun runWorker() {
            var rescanned = false
          //一直查找任務(wù)去執(zhí)行蔗怠,除非worker終止了
          while (!isTerminated && state != CoroutineScheduler.WorkerState.TERMINATED) {
            //從隊(duì)列里尋找任務(wù)
            //mayHaveLocalTasks:本地隊(duì)列里是否有任務(wù)
            val task = findTask(mayHaveLocalTasks)
            if (task != null) {
                rescanned = false
                minDelayUntilStealableTaskNs = 0L
                //任務(wù)獲取到后,執(zhí)行任務(wù)
                executeTask(task)
                //任務(wù)執(zhí)行完畢吩跋,繼續(xù)循環(huán)查找任務(wù)
                continue
            } else {
                mayHaveLocalTasks = false
            }
            if (minDelayUntilStealableTaskNs != 0L) {
                // 這個(gè)rescanned控制下面的分支代碼的執(zhí)行
                if (!rescanned) {
                    rescanned = true
                } else {
                    //掛起一段時(shí)間再去偷
                        rescanned = false
                        tryReleaseCpu(WorkerState.PARKING)
                        interrupted()
                        LockSupport.parkNanos(minDelayUntilStealableTaskNs)
                        minDelayUntilStealableTaskNs = 0L   // 只執(zhí)行一次寞射,下次循環(huán)不會(huì)再命中              
                }  
                continue
            }
          //嘗試掛起
          tryPark()
        }
        //釋放token
        tryReleaseCpu(CoroutineScheduler.WorkerState.TERMINATED)
      }

      fun findTask(scanLocalQueue: Boolean): Task? {
          //嘗試獲取cpu 許可
          //若是拿到cpu 許可,則可以執(zhí)行任何任務(wù)
           // 它和核心線程數(shù)相關(guān)锌钮,假設(shè)我們是8核CPU桥温,那么同一時(shí)間最多只能有8個(gè)線程在CPU上執(zhí)行。因此梁丘,若是其它線程想
          // 要執(zhí)行非阻塞任務(wù)(占用CPU)策治,需要申請(qǐng)?jiān)S可(token)脓魏,申請(qǐng)成功說明有CPU空閑,此時(shí)該線程可以執(zhí)行非阻塞任務(wù)通惫。否則茂翔,只能執(zhí)行阻塞任務(wù)。
          if (tryAcquireCpuPermit())
               return findAnyTask(scanLocalQueue)
           //拿不到履腋,若是本地隊(duì)列有任務(wù)珊燎,則從本地取,否則從全局阻塞隊(duì)列取
           val task = if (scanLocalQueue) {
               localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
           } else {
               globalBlockingQueue.removeFirstOrNull()
           }
           //都拿不到遵湖,則偷別人的
            // 當(dāng)從本地隊(duì)列悔政、全局隊(duì)列里都沒找出任務(wù)時(shí),當(dāng)前的Worker打起了別個(gè)Woker的主意延旧。我們知道全局隊(duì)列是所有Worker共
          // 享谋国,而本地隊(duì)列是每個(gè)Worker私有的。因此迁沫,當(dāng)前Worker發(fā)現(xiàn)自己沒任務(wù)可以執(zhí)行的時(shí)候會(huì)去看看其它Worker的本地隊(duì)列里
          // 是否有可以執(zhí)行的任務(wù)芦瘾,若是有就可以偷過來用。
           return task ?: trySteal(blockingOnly = true)
       }

       private fun findAnyTask(scanLocalQueue: Boolean): Task? {
           if (scanLocalQueue) {
               //可以從本地隊(duì)列找
               val globalFirst = nextInt(2 * corePoolSize) == 0
               if (globalFirst) pollGlobalQueues()?.let { return it }
               localQueue.poll()?.let { return it }
               if (!globalFirst) pollGlobalQueues()?.let { return it }
           } else {
               //從全局隊(duì)列找
               pollGlobalQueues()?.let { return it }
           }
           //偷別人的
           return trySteal(blockingOnly = false)
       }
// 從全局隊(duì)列獲取任務(wù)
private fun pollGlobalQueues(): Task? {
    // 隨機(jī)獲取CPU任務(wù)或者非CPU任務(wù)
    if (nextInt(2) == 0) {
        // 優(yōu)先獲取CPU任務(wù)
        globalCpuQueue.removeFirstOrNull()?.let { return it }
        return globalBlockingQueue.removeFirstOrNull()
    } else {
        // 優(yōu)先獲取非CPU任務(wù)
        globalBlockingQueue.removeFirstOrNull()?.let { return it }
        return globalCpuQueue.removeFirstOrNull()
    }
}

      // 掛起函數(shù)集畅,這是針對(duì)woker的狀態(tài)
      private fun tryPark() {
          //沒有在掛起棧里
        if (!inStack()) {
            //將worker放入掛起棧里
            parkedWorkersStackPush(this)
            return
        }
        while (inStack() && workerCtl.value == CoroutineScheduler.PARKED) { // Prevent spurious wakeups
            if (isTerminated || state == CoroutineScheduler.WorkerState.TERMINATED) break
            //真正掛起(不是實(shí)時(shí)近弟,會(huì)暫時(shí)掛起一段時(shí)間idleWorkerKeepAliveNs,線程空閑時(shí)間)挺智,并標(biāo)記worker state 狀態(tài)祷愉,會(huì)修改state = WorkerState.TERMINATED,runWorker循環(huán)里會(huì)判斷該標(biāo)記赦颇,若是終止了二鳄,則循環(huán)停止,整個(gè)線程執(zhí)行結(jié)束媒怯。
            park()
        }

        ...略
    }

做個(gè)小總結(jié):
1.線程執(zhí)行的時(shí)候從全局隊(duì)列泥从、本地隊(duì)列里查找任務(wù)。
2.若是沒找到沪摄,則嘗試從別的Worker 本地隊(duì)列里偷取任務(wù)躯嫉。
3.能夠找到任務(wù)則最終會(huì)執(zhí)行協(xié)程體里的代碼。
4.若是沒有任務(wù)杨拐,則根據(jù)策略掛起一段時(shí)間或是最終退出線程的執(zhí)行祈餐。

Dispatchers.IO:
internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor {

    private val default = UnlimitedIoScheduler.limitedParallelism(
        systemProp(
            IO_PARALLELISM_PROPERTY_NAME,
            64.coerceAtLeast(AVAILABLE_PROCESSORS)
        )
    )
    override fun dispatch(context: CoroutineContext, block: Runnable) {
            default.dispatch(context, block)
    }
    ...略
}

// The unlimited instance of Dispatchers.IO that utilizes all the threads CoroutineScheduler provides
private object UnlimitedIoScheduler : CoroutineDispatcher() {

    @InternalCoroutinesApi
    override fun dispatchYield(context: CoroutineContext, block: Runnable) {
        DefaultScheduler.dispatchWithContext(block, BlockingContext, true)
    }
     // 最終調(diào)用了DefaultScheduler的分分發(fā)方法
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        DefaultScheduler.dispatchWithContext(block, BlockingContext, false)
    }
}
// UnlimitedIoScheduler父類CoroutineDispatcher的方法
    public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        parallelism.checkParallelism()
        // 返回一個(gè)UnlimitedIoScheduler的代理Dispatcher
        return LimitedDispatcher(this, parallelism)
    }

通過以上代碼可以看到Dispatchers.IO最終調(diào)用到的線程池分發(fā)方法是DEFAULT里的,而DEFAULT是個(gè)單例哄陶,所以兩者其實(shí)共享了線程池CoroutineScheduler.
但是隨著對(duì)代理類LimitedDispatcher的深入研究發(fā)現(xiàn)Dispatchers.IO策略上有所不同帆阳。

// 因?yàn)镈ispatchers.IO是單例的,所以內(nèi)部的這個(gè)LimitedDispatcher也是單例的屋吨,先看名字蜒谤,這是一個(gè)受限制的分發(fā)器山宾,
// 限制啥?限制的是最大并行數(shù)量鳍徽,由系統(tǒng)屬性設(shè)定的值或 CPU 核心數(shù)的最大值決定资锰,系統(tǒng)屬性值一般設(shè)置的是 64,也就是說阶祭,一般來說绷杜,該調(diào)度器可能會(huì)創(chuàng)建 64 個(gè)線程來執(zhí)行任務(wù)
internal class LimitedDispatcher(
    private val dispatcher: CoroutineDispatcher,
    private val parallelism: Int
) : CoroutineDispatcher(), Runnable, Delay by (dispatcher as? Delay ?: DefaultDelay) {

    @Volatile
    private var runningWorkers = 0

    private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false)

    // A separate object that we can synchronize on for K/N
    private val workerAllocationLock = SynchronizedObject()

    @ExperimentalCoroutinesApi
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        parallelism.checkParallelism()
        if (parallelism >= this.parallelism) return this
        return super.limitedParallelism(parallelism)
    }

    // 核心方法,分發(fā)任務(wù)
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // 先執(zhí)行dispatchInternal
        dispatchInternal(block) {
            // 再把自己作為runnable參數(shù)給到代理的dispatcher
            dispatcher.dispatch(this, this)
        }
    }
  
    private inline fun dispatchInternal(block: Runnable, dispatch: () -> Unit) {
        // 添加任務(wù)到隊(duì)列濒募,如果此時(shí)最大并行任務(wù)超過了限制就直接return鞭盟,暫時(shí)不執(zhí)行dispatch方法
        if (addAndTryDispatching(block)) return
        // 統(tǒng)計(jì)當(dāng)前的并行任務(wù),超過了就直接return瑰剃,暫時(shí)不執(zhí)行dispatch方法
        if (!tryAllocateWorker()) return
        // 做完了添加操作執(zhí)行dispatch()齿诉,此時(shí)才會(huì)真正分發(fā)到協(xié)程線程池CoroutineSchduler
        dispatch()
    }
    // 添加任務(wù)到隊(duì)列并嘗試分發(fā)
     private fun addAndTryDispatching(block: Runnable): Boolean {
         // 添加本地任務(wù)隊(duì)列,添加不需要條件 來了就添加
        queue.addLast(block)
        // 判斷正在執(zhí)行的任務(wù)的數(shù)量是否大于最大并行線程數(shù)量晌姚,正是在這里做到了限制最大并行IO線程的作用
        return runningWorkers >= parallelism 
    }
    // 嘗試分配線程粤剧,其實(shí)不會(huì)新建線程,只是統(tǒng)計(jì)一下當(dāng)前的并行線程數(shù)量舀凛,在這里仍然會(huì)先判斷是否大于了最大并行限制
    private fun tryAllocateWorker(): Boolean {
        synchronized(workerAllocationLock) {
            if (runningWorkers >= parallelism) return false
            ++runningWorkers
            return true
        }
    }

       // 注意他自己實(shí)現(xiàn)了Runnable,最終他是把自己送到CoroutineSchduler去執(zhí)行的途蒋,協(xié)程傳進(jìn)來的任務(wù)在這里被包裝了執(zhí)行
    override fun run() {
        var fairnessCounter = 0
        // 別被這個(gè)循環(huán)體迷惑猛遍,就以為線程都是串行執(zhí)行的,實(shí)際每次新協(xié)程創(chuàng)建任務(wù)都會(huì)執(zhí)行run方法号坡,然后最終包裝到協(xié)程線程池去并發(fā)執(zhí)行
        // 這個(gè)循環(huán)是為了執(zhí)行超過最大并發(fā)數(shù)的時(shí)候懊烤,那些只添加到了隊(duì)列但是沒有立馬執(zhí)行的任務(wù)
        while (true) {
            val task = queue.removeFirstOrNull()
            if (task != null) {
                try {
                    task.run()
                } catch (e: Throwable) {
                    handleCoroutineException(EmptyCoroutineContext, e)
                }
                //  這里比較有意思,當(dāng)有大量的并發(fā)任務(wù)(比如短時(shí)間添加了200個(gè)任務(wù))宽堆,那么這個(gè)方法有可能會(huì)長(zhǎng)時(shí)間的執(zhí)行下去腌紧,所以
              // 這里為了公平起見不長(zhǎng)期霸占資源,當(dāng)超過執(zhí)行了16個(gè)任務(wù)后就重新分發(fā)一次畜隶,這樣就能短暫的讓出cpu讓別的線程執(zhí)行(不知道理解的對(duì)不對(duì))
                if (++fairnessCounter >= 16 && dispatcher.isDispatchNeeded(this)) {
                    // Do "yield" to let other views to execute their runnable as well
                    // Note that we do not decrement 'runningWorkers' as we still committed to do our part of work
                    dispatcher.dispatch(this, this)
                    return
                }
                continue
            }

            synchronized(workerAllocationLock) {
                --runningWorkers
                if (queue.size == 0) return
                ++runningWorkers
                fairnessCounter = 0
            }
        }
    }
}

由上可以看出Dispatchers.IO 任務(wù)分發(fā)是借助于DefaultScheduler壁肋,也就是Dispatchers.Default的能力,因此兩者是共用一個(gè)線程池籽慢。
只是Dispatchers.IO 比較特殊浸遗,它有個(gè)隊(duì)列,該隊(duì)列作用:
當(dāng)IO 任務(wù)分派個(gè)數(shù)超過設(shè)定的并行數(shù)時(shí)箱亿,不會(huì)直接進(jìn)行分發(fā)跛锌,而是先存放在隊(duì)列里。

到此協(xié)程線程池基本原理分析完畢届惋,下篇打算探討一些協(xié)程的常用案例髓帽,看看哪些適合應(yīng)用在實(shí)際開發(fā)中以及注意事項(xiàng)菠赚。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市郑藏,隨后出現(xiàn)的幾起案子衡查,更是在濱河造成了極大的恐慌,老刑警劉巖译秦,帶你破解...
    沈念sama閱讀 219,039評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件峡捡,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡筑悴,警方通過查閱死者的電腦和手機(jī)们拙,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來阁吝,“玉大人砚婆,你說我怎么就攤上這事⊥挥拢” “怎么了装盯?”我有些...
    開封第一講書人閱讀 165,417評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)甲馋。 經(jīng)常有香客問我埂奈,道長(zhǎng),這世上最難降的妖魔是什么定躏? 我笑而不...
    開封第一講書人閱讀 58,868評(píng)論 1 295
  • 正文 為了忘掉前任账磺,我火速辦了婚禮,結(jié)果婚禮上痊远,老公的妹妹穿的比我還像新娘垮抗。我一直安慰自己,他們只是感情好碧聪,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,892評(píng)論 6 392
  • 文/花漫 我一把揭開白布冒版。 她就那樣靜靜地躺著,像睡著了一般逞姿。 火紅的嫁衣襯著肌膚如雪辞嗡。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,692評(píng)論 1 305
  • 那天滞造,我揣著相機(jī)與錄音欲间,去河邊找鬼。 笑死断部,一個(gè)胖子當(dāng)著我的面吹牛猎贴,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播,決...
    沈念sama閱讀 40,416評(píng)論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼她渴,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼达址!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起趁耗,我...
    開封第一講書人閱讀 39,326評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤沉唠,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后苛败,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體满葛,經(jīng)...
    沈念sama閱讀 45,782評(píng)論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,957評(píng)論 3 337
  • 正文 我和宋清朗相戀三年罢屈,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了嘀韧。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,102評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡缠捌,死狀恐怖锄贷,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情曼月,我是刑警寧澤谊却,帶...
    沈念sama閱讀 35,790評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站哑芹,受9級(jí)特大地震影響炎辨,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜聪姿,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,442評(píng)論 3 331
  • 文/蒙蒙 一碴萧、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧咳燕,春花似錦勿决、人聲如沸乒躺。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)嘉冒。三九已至曹货,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間讳推,已是汗流浹背顶籽。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留银觅,地道東北人礼饱。 一個(gè)月前我還...
    沈念sama閱讀 48,332評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親镊绪。 傳聞我的和親對(duì)象是個(gè)殘疾皇子匀伏,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,044評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容