上兩篇文章梳理了協(xié)程的運(yùn)行原理豌注,因?yàn)榫€程池相對(duì)于協(xié)程實(shí)現(xiàn)來說是可以單獨(dú)拿出來講的,所以分析到線程池的時(shí)候沒有繼續(xù)深入,現(xiàn)在就單獨(dú)來看看協(xié)程線程池的實(shí)現(xiàn)。
協(xié)程線程池是由分發(fā)器Dispatchers來維護(hù)的,主要是Dispatchers.DEFAULT和Dispatchers.IO兩個(gè)分發(fā)器灵巧。
Dispatchers.DEFAULT
Dispatchers.DEFAULT它持有的線程池是CoroutineScheduler:
【SchedulerCoroutineDispatcher】
internal open class SchedulerCoroutineDispatcher(
private val corePoolSize: Int = CORE_POOL_SIZE,
private val maxPoolSize: Int = MAX_POOL_SIZE,
private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {
private var coroutineScheduler = createScheduler()
private fun createScheduler() =
CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block) // 線程池分發(fā)
...略
}
【CoroutineScheduler】
internal class CoroutineScheduler(
@JvmField val corePoolSize: Int, // 核心線程數(shù)
@JvmField val maxPoolSize: Int, // 最大線程數(shù)
@JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS, // 空閑線程存活時(shí)間
@JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME // 線程池名稱"CoroutineScheduler"
) : Executor, Closeable {
// 用于存儲(chǔ)全局的純CPU(不阻塞)任務(wù)
@JvmField
val globalCpuQueue = GlobalQueue()
// 用于存儲(chǔ)全局的執(zhí)行非純CPU(可能阻塞)任務(wù)
@JvmField
val globalBlockingQueue = GlobalQueue()
// 用于記錄當(dāng)前處于Parked狀態(tài)(一段時(shí)間后自動(dòng)終止)的線程的數(shù)量
private val parkedWorkersStack = atomic(0L)
// 用于保存當(dāng)前線程池中的線程
// workers[0]永遠(yuǎn)為null抹沪,作為哨兵位
// index從1到maxPoolSize為有效線程
@JvmField
val workers = AtomicReferenceArray<Worker?>(maxPoolSize + 1)
// 控制狀態(tài)
private val controlState = atomic(corePoolSize.toLong() shl CPU_PERMITS_SHIFT)
// 表示已經(jīng)創(chuàng)建的線程的數(shù)量
private val createdWorkers: Int inline get() = (controlState.value and CREATED_MASK).toInt()
// 表示可以獲取的CPU令牌數(shù)量刻肄,初始值為線程池核心線程數(shù)量
private val availableCpuPermits: Int inline get() = availableCpuPermits(controlState.value)
// 獲取指定的狀態(tài)的已經(jīng)創(chuàng)建的線程的數(shù)量
private inline fun createdWorkers(state: Long): Int = (state and CREATED_MASK).toInt()
// 獲取指定的狀態(tài)的執(zhí)行阻塞任務(wù)的數(shù)量
private inline fun blockingTasks(state: Long): Int = (state and BLOCKING_MASK shr BLOCKING_SHIFT).toInt()
// 獲取指定的狀態(tài)的CPU令牌數(shù)量
public inline fun availableCpuPermits(state: Long): Int = (state and CPU_PERMITS_MASK shr CPU_PERMITS_SHIFT).toInt()
// 當(dāng)前已經(jīng)創(chuàng)建的線程數(shù)量加1
private inline fun incrementCreatedWorkers(): Int = createdWorkers(controlState.incrementAndGet())
// 當(dāng)前已經(jīng)創(chuàng)建的線程數(shù)量減1
private inline fun decrementCreatedWorkers(): Int = createdWorkers(controlState.getAndDecrement())
// 當(dāng)前執(zhí)行阻塞任務(wù)的線程數(shù)量加1
private inline fun incrementBlockingTasks() = controlState.addAndGet(1L shl BLOCKING_SHIFT)
// 當(dāng)前執(zhí)行阻塞任務(wù)的線程數(shù)量減1
private inline fun decrementBlockingTasks() {
controlState.addAndGet(-(1L shl BLOCKING_SHIFT))
}
// 嘗試獲取CPU令牌
private inline fun tryAcquireCpuPermit(): Boolean = controlState.loop { state ->
val available = availableCpuPermits(state)
if (available == 0) return false
val update = state - (1L shl CPU_PERMITS_SHIFT)
if (controlState.compareAndSet(state, update)) return true
}
// 釋放CPU令牌
private inline fun releaseCpuPermit() = controlState.addAndGet(1L shl CPU_PERMITS_SHIFT)
// 表示當(dāng)前線程池是否關(guān)閉
private val _isTerminated = atomic(false)
val isTerminated: Boolean get() = _isTerminated.value
companion object {
// 用于標(biāo)記一個(gè)線程是否在parkedWorkersStack中(處于Parked狀態(tài))
@JvmField
val NOT_IN_STACK = Symbol("NOT_IN_STACK")
// 線程的三個(gè)狀態(tài)
// CLAIMED表示線程可以執(zhí)行任務(wù)
// PARKED表示線程暫停執(zhí)行任務(wù),一段時(shí)間后會(huì)自動(dòng)進(jìn)入終止?fàn)顟B(tài)
// TERMINATED表示線程處于終止?fàn)顟B(tài)
private const val PARKED = -1
private const val CLAIMED = 0
private const val TERMINATED = 1
// 以下五個(gè)常量為掩碼
private const val BLOCKING_SHIFT = 21 // 2x1024x1024
// 1-21位
private const val CREATED_MASK: Long = (1L shl BLOCKING_SHIFT) - 1
// 22-42位
private const val BLOCKING_MASK: Long = CREATED_MASK shl BLOCKING_SHIFT
// 42
private const val CPU_PERMITS_SHIFT = BLOCKING_SHIFT * 2
// 43-63位
private const val CPU_PERMITS_MASK = CREATED_MASK shl CPU_PERMITS_SHIFT
// 以下兩個(gè)常量用于require中參數(shù)判斷
internal const val MIN_SUPPORTED_POOL_SIZE = 1
// 2x1024x1024-2
internal const val MAX_SUPPORTED_POOL_SIZE = (1 shl BLOCKING_SHIFT) - 2
// parkedWorkersStack的掩碼
private const val PARKED_INDEX_MASK = CREATED_MASK
// inv表示01反轉(zhuǎn)
private const val PARKED_VERSION_MASK = CREATED_MASK.inv()
private const val PARKED_VERSION_INC = 1L shl BLOCKING_SHIFT
}
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
trackTask() // this is needed for virtual time support
// 傳入ruanable和taskContext構(gòu)建一個(gè)Task實(shí)例融欧,taskContext決定線程是cpu密集型還是阻塞型敏弃,通過上面Dispatchers.DEFAULT的代碼可以知
// 道,它分發(fā)任務(wù)的時(shí)候是創(chuàng)建的NonBlockingContext噪馏,也就是非阻塞型的
val task = createTask(block, taskContext)
// 判斷當(dāng)前線程是否運(yùn)行在當(dāng)前線程池
val currentWorker = currentWorker()
// 嘗試加入本地隊(duì)列麦到,注意這個(gè)方法是Woker的擴(kuò)展方法绿饵,這個(gè)本地隊(duì)列是Woker的變量,
// Woker是對(duì)Thread的一層封裝瓶颠,專門用于協(xié)程線程池里用的
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
// notAdded不為null拟赊,代表加入本地任務(wù)隊(duì)列失敗,也代表此時(shí)的線程不是運(yùn)行在線程池
if (notAdded != null) {
// 嘗試加入全局任務(wù)隊(duì)列粹淋,這里的全局指的是線程池里維護(hù)的兩個(gè)變量
if (!addToGlobalQueue(notAdded)) {
// Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
throw RejectedExecutionException("$schedulerName was terminated")
}
}
val skipUnpark = tailDispatch && currentWorker != null
// Checking 'task' instead of 'notAdded' is completely okay
// 如果任務(wù)是非阻塞任務(wù)吸祟,則喚醒cpu線程
if (task.mode == TASK_NON_BLOCKING) {
if (skipUnpark) return
signalCpuWork()
} else {
// 否則就喚醒阻塞線程
signalBlockingWork(skipUnpark = skipUnpark)
}
}
// currentWorker方法
private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }
// Worker的擴(kuò)展函數(shù)submitToLocalQueue
private fun CoroutineScheduler.Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? {
// Worker 為空,直接返回任務(wù)本身
if (this == null) return task
// 非阻塞的任務(wù)且此時(shí)Worker處于阻塞狀態(tài)桃移,則直接返回
if (task.mode == TASK_NON_BLOCKING && state === CoroutineScheduler.WorkerState.BLOCKING) {
return task
}
//表示本地隊(duì)列里存有任務(wù)了
mayHaveLocalTasks = true
//加入到本地隊(duì)列里
//localQueue 為Worker的成員變量
return localQueue.add(task, fair = tailDispatch)
}
// addToGlobalQueue方法
private fun addToGlobalQueue(task: Task): Boolean {
return if (task.isBlocking) {
//加入到全局阻塞隊(duì)列
globalBlockingQueue.addLast(task)
} else {
//加入到全局cpu隊(duì)列
globalCpuQueue.addLast(task)
}
}
fun signalCpuWork() {
//嘗試去喚醒正在掛起的線程欢搜,若是有線程可以被喚醒,則無需創(chuàng)建新線程
if (tryUnpark()) return
//若喚醒不成功谴轮,則需要嘗試創(chuàng)建線程
if (tryCreateWorker()) return
//再試一次,邊界條件
tryUnpark()
}
// 嘗試創(chuàng)建線程的方法
private fun tryCreateWorker(state: Long = controlState.value): Boolean {
//獲取當(dāng)前已經(jīng)創(chuàng)建的線程數(shù)
val created = createdWorkers(state)
//獲取當(dāng)前阻塞的任務(wù)數(shù)
val blocking = blockingTasks(state)
//已創(chuàng)建的線程數(shù)-阻塞的任務(wù)數(shù)=非阻塞的線程數(shù)
//coerceAtLeast(0) 表示結(jié)果至少是0
val cpuWorkers = (created - blocking).coerceAtLeast(0)
//如果非阻塞數(shù)小于核心線程數(shù)
// 現(xiàn)在若是已經(jīng)創(chuàng)建了5個(gè)線程吹埠,而這幾個(gè)線程都在執(zhí)行IO任務(wù)第步,此時(shí)就需要再創(chuàng)建新的線程來執(zhí)行任務(wù),因?yàn)榇藭r(shí)CPU是空閑的缘琅。
//只要非阻塞任務(wù)的個(gè)數(shù)小于核心線程數(shù)粘都,那么就需要?jiǎng)?chuàng)建新的線程,目的是為了充分利用CPU刷袍。
if (cpuWorkers < corePoolSize) {
//創(chuàng)建線程
val newCpuWorkers = createNewWorker()
//如果當(dāng)前只有一個(gè)非阻塞線程并且核心線程數(shù)>1翩隧,那么再創(chuàng)建一個(gè)線程
//目的是為了方便"偷"任務(wù)...
if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()
//創(chuàng)建成功
if (newCpuWorkers > 0) return true
}
return false
}
// 創(chuàng)建線程的方法
//workers 為Worker 數(shù)組,因?yàn)樾枰獙?duì)數(shù)組進(jìn)行add 操作呻纹,因此需要同步訪問
private fun createNewWorker(): Int {
synchronized(workers) {
if (isTerminated) return -1
val state = controlState.value
//獲取已創(chuàng)建的線程數(shù)
val created = createdWorkers(state)
//阻塞的任務(wù)數(shù)
val blocking = blockingTasks(state)
//非阻塞的線程數(shù)
val cpuWorkers = (created - blocking).coerceAtLeast(0)
//非阻塞的線程數(shù)不能超過核心線程數(shù)
if (cpuWorkers >= corePoolSize) return 0
//已創(chuàng)建的線程數(shù)不能大于最大線程數(shù)
if (created >= maxPoolSize) return 0
val newIndex = createdWorkers + 1
require(newIndex > 0 && workers[newIndex] == null)
//構(gòu)造線程
val worker = Worker(newIndex)
//記錄到數(shù)組里
workers[newIndex] = worker
//記錄創(chuàng)建的線程數(shù)
require(newIndex == incrementCreatedWorkers())
//開啟線程
worker.start()
//當(dāng)前非阻塞線程數(shù)
return cpuWorkers + 1
}
}
...略
}
通過以上分析堆生,知道有三個(gè)任務(wù)隊(duì)列,這里要理清一下這些任務(wù)隊(duì)列的關(guān)系:
線程池dispatch的操作可以大概做個(gè)總結(jié):
1.先把分發(fā)下來的runable任務(wù)封裝成Task雷酪,并標(biāo)記它是阻塞型的還是非阻塞型的
2.判斷當(dāng)前線程是否是運(yùn)行在當(dāng)前線程池里的線程淑仆,是的話就把傳進(jìn)來的Task直接加入當(dāng)前線程的任務(wù)隊(duì)列中
3.否則根據(jù)task類型加入到線程池的相應(yīng)任務(wù)隊(duì)列中
4.嘗試喚醒相應(yīng)類型的線程,沒有的話就創(chuàng)建線程來執(zhí)行工作
線程池里的Worker
看看真正執(zhí)行任務(wù)的地方哥力,重點(diǎn)看Worker里的runWorker方法
// Worker內(nèi)部類
internal inner class Worker private constructor() : Thread() {
init {
isDaemon = true
}
// guarded by scheduler lock, index in workers array, 0 when not in array (terminated)
@Volatile // volatile for push/pop operation into parkedWorkersStack
var indexInArray = 0
set(index) {
name = "$schedulerName-worker-${if (index == 0) "TERMINATED" else index.toString()}"
field = index
}
constructor(index: Int) : this() {
indexInArray = index
}
inline val scheduler get() = this@CoroutineScheduler
@JvmField
val localQueue: WorkQueue = WorkQueue()
// 線程狀態(tài)
var state = WorkerState.DORMANT
override fun run() = runWorker()
private fun runWorker() {
var rescanned = false
//一直查找任務(wù)去執(zhí)行蔗怠,除非worker終止了
while (!isTerminated && state != CoroutineScheduler.WorkerState.TERMINATED) {
//從隊(duì)列里尋找任務(wù)
//mayHaveLocalTasks:本地隊(duì)列里是否有任務(wù)
val task = findTask(mayHaveLocalTasks)
if (task != null) {
rescanned = false
minDelayUntilStealableTaskNs = 0L
//任務(wù)獲取到后,執(zhí)行任務(wù)
executeTask(task)
//任務(wù)執(zhí)行完畢吩跋,繼續(xù)循環(huán)查找任務(wù)
continue
} else {
mayHaveLocalTasks = false
}
if (minDelayUntilStealableTaskNs != 0L) {
// 這個(gè)rescanned控制下面的分支代碼的執(zhí)行
if (!rescanned) {
rescanned = true
} else {
//掛起一段時(shí)間再去偷
rescanned = false
tryReleaseCpu(WorkerState.PARKING)
interrupted()
LockSupport.parkNanos(minDelayUntilStealableTaskNs)
minDelayUntilStealableTaskNs = 0L // 只執(zhí)行一次寞射,下次循環(huán)不會(huì)再命中
}
continue
}
//嘗試掛起
tryPark()
}
//釋放token
tryReleaseCpu(CoroutineScheduler.WorkerState.TERMINATED)
}
fun findTask(scanLocalQueue: Boolean): Task? {
//嘗試獲取cpu 許可
//若是拿到cpu 許可,則可以執(zhí)行任何任務(wù)
// 它和核心線程數(shù)相關(guān)锌钮,假設(shè)我們是8核CPU桥温,那么同一時(shí)間最多只能有8個(gè)線程在CPU上執(zhí)行。因此梁丘,若是其它線程想
// 要執(zhí)行非阻塞任務(wù)(占用CPU)策治,需要申請(qǐng)?jiān)S可(token)脓魏,申請(qǐng)成功說明有CPU空閑,此時(shí)該線程可以執(zhí)行非阻塞任務(wù)通惫。否則茂翔,只能執(zhí)行阻塞任務(wù)。
if (tryAcquireCpuPermit())
return findAnyTask(scanLocalQueue)
//拿不到履腋,若是本地隊(duì)列有任務(wù)珊燎,則從本地取,否則從全局阻塞隊(duì)列取
val task = if (scanLocalQueue) {
localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
} else {
globalBlockingQueue.removeFirstOrNull()
}
//都拿不到遵湖,則偷別人的
// 當(dāng)從本地隊(duì)列悔政、全局隊(duì)列里都沒找出任務(wù)時(shí),當(dāng)前的Worker打起了別個(gè)Woker的主意延旧。我們知道全局隊(duì)列是所有Worker共
// 享谋国,而本地隊(duì)列是每個(gè)Worker私有的。因此迁沫,當(dāng)前Worker發(fā)現(xiàn)自己沒任務(wù)可以執(zhí)行的時(shí)候會(huì)去看看其它Worker的本地隊(duì)列里
// 是否有可以執(zhí)行的任務(wù)芦瘾,若是有就可以偷過來用。
return task ?: trySteal(blockingOnly = true)
}
private fun findAnyTask(scanLocalQueue: Boolean): Task? {
if (scanLocalQueue) {
//可以從本地隊(duì)列找
val globalFirst = nextInt(2 * corePoolSize) == 0
if (globalFirst) pollGlobalQueues()?.let { return it }
localQueue.poll()?.let { return it }
if (!globalFirst) pollGlobalQueues()?.let { return it }
} else {
//從全局隊(duì)列找
pollGlobalQueues()?.let { return it }
}
//偷別人的
return trySteal(blockingOnly = false)
}
// 從全局隊(duì)列獲取任務(wù)
private fun pollGlobalQueues(): Task? {
// 隨機(jī)獲取CPU任務(wù)或者非CPU任務(wù)
if (nextInt(2) == 0) {
// 優(yōu)先獲取CPU任務(wù)
globalCpuQueue.removeFirstOrNull()?.let { return it }
return globalBlockingQueue.removeFirstOrNull()
} else {
// 優(yōu)先獲取非CPU任務(wù)
globalBlockingQueue.removeFirstOrNull()?.let { return it }
return globalCpuQueue.removeFirstOrNull()
}
}
// 掛起函數(shù)集畅,這是針對(duì)woker的狀態(tài)
private fun tryPark() {
//沒有在掛起棧里
if (!inStack()) {
//將worker放入掛起棧里
parkedWorkersStackPush(this)
return
}
while (inStack() && workerCtl.value == CoroutineScheduler.PARKED) { // Prevent spurious wakeups
if (isTerminated || state == CoroutineScheduler.WorkerState.TERMINATED) break
//真正掛起(不是實(shí)時(shí)近弟,會(huì)暫時(shí)掛起一段時(shí)間idleWorkerKeepAliveNs,線程空閑時(shí)間)挺智,并標(biāo)記worker state 狀態(tài)祷愉,會(huì)修改state = WorkerState.TERMINATED,runWorker循環(huán)里會(huì)判斷該標(biāo)記赦颇,若是終止了二鳄,則循環(huán)停止,整個(gè)線程執(zhí)行結(jié)束媒怯。
park()
}
...略
}
做個(gè)小總結(jié):
1.線程執(zhí)行的時(shí)候從全局隊(duì)列泥从、本地隊(duì)列里查找任務(wù)。
2.若是沒找到沪摄,則嘗試從別的Worker 本地隊(duì)列里偷取任務(wù)躯嫉。
3.能夠找到任務(wù)則最終會(huì)執(zhí)行協(xié)程體里的代碼。
4.若是沒有任務(wù)杨拐,則根據(jù)策略掛起一段時(shí)間或是最終退出線程的執(zhí)行祈餐。
Dispatchers.IO:
internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor {
private val default = UnlimitedIoScheduler.limitedParallelism(
systemProp(
IO_PARALLELISM_PROPERTY_NAME,
64.coerceAtLeast(AVAILABLE_PROCESSORS)
)
)
override fun dispatch(context: CoroutineContext, block: Runnable) {
default.dispatch(context, block)
}
...略
}
// The unlimited instance of Dispatchers.IO that utilizes all the threads CoroutineScheduler provides
private object UnlimitedIoScheduler : CoroutineDispatcher() {
@InternalCoroutinesApi
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
DefaultScheduler.dispatchWithContext(block, BlockingContext, true)
}
// 最終調(diào)用了DefaultScheduler的分分發(fā)方法
override fun dispatch(context: CoroutineContext, block: Runnable) {
DefaultScheduler.dispatchWithContext(block, BlockingContext, false)
}
}
// UnlimitedIoScheduler父類CoroutineDispatcher的方法
public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
parallelism.checkParallelism()
// 返回一個(gè)UnlimitedIoScheduler的代理Dispatcher
return LimitedDispatcher(this, parallelism)
}
通過以上代碼可以看到Dispatchers.IO最終調(diào)用到的線程池分發(fā)方法是DEFAULT里的,而DEFAULT是個(gè)單例哄陶,所以兩者其實(shí)共享了線程池CoroutineScheduler.
但是隨著對(duì)代理類LimitedDispatcher的深入研究發(fā)現(xiàn)Dispatchers.IO策略上有所不同帆阳。
// 因?yàn)镈ispatchers.IO是單例的,所以內(nèi)部的這個(gè)LimitedDispatcher也是單例的屋吨,先看名字蜒谤,這是一個(gè)受限制的分發(fā)器山宾,
// 限制啥?限制的是最大并行數(shù)量鳍徽,由系統(tǒng)屬性設(shè)定的值或 CPU 核心數(shù)的最大值決定资锰,系統(tǒng)屬性值一般設(shè)置的是 64,也就是說阶祭,一般來說绷杜,該調(diào)度器可能會(huì)創(chuàng)建 64 個(gè)線程來執(zhí)行任務(wù)
internal class LimitedDispatcher(
private val dispatcher: CoroutineDispatcher,
private val parallelism: Int
) : CoroutineDispatcher(), Runnable, Delay by (dispatcher as? Delay ?: DefaultDelay) {
@Volatile
private var runningWorkers = 0
private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false)
// A separate object that we can synchronize on for K/N
private val workerAllocationLock = SynchronizedObject()
@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
parallelism.checkParallelism()
if (parallelism >= this.parallelism) return this
return super.limitedParallelism(parallelism)
}
// 核心方法,分發(fā)任務(wù)
override fun dispatch(context: CoroutineContext, block: Runnable) {
// 先執(zhí)行dispatchInternal
dispatchInternal(block) {
// 再把自己作為runnable參數(shù)給到代理的dispatcher
dispatcher.dispatch(this, this)
}
}
private inline fun dispatchInternal(block: Runnable, dispatch: () -> Unit) {
// 添加任務(wù)到隊(duì)列濒募,如果此時(shí)最大并行任務(wù)超過了限制就直接return鞭盟,暫時(shí)不執(zhí)行dispatch方法
if (addAndTryDispatching(block)) return
// 統(tǒng)計(jì)當(dāng)前的并行任務(wù),超過了就直接return瑰剃,暫時(shí)不執(zhí)行dispatch方法
if (!tryAllocateWorker()) return
// 做完了添加操作執(zhí)行dispatch()齿诉,此時(shí)才會(huì)真正分發(fā)到協(xié)程線程池CoroutineSchduler
dispatch()
}
// 添加任務(wù)到隊(duì)列并嘗試分發(fā)
private fun addAndTryDispatching(block: Runnable): Boolean {
// 添加本地任務(wù)隊(duì)列,添加不需要條件 來了就添加
queue.addLast(block)
// 判斷正在執(zhí)行的任務(wù)的數(shù)量是否大于最大并行線程數(shù)量晌姚,正是在這里做到了限制最大并行IO線程的作用
return runningWorkers >= parallelism
}
// 嘗試分配線程粤剧,其實(shí)不會(huì)新建線程,只是統(tǒng)計(jì)一下當(dāng)前的并行線程數(shù)量舀凛,在這里仍然會(huì)先判斷是否大于了最大并行限制
private fun tryAllocateWorker(): Boolean {
synchronized(workerAllocationLock) {
if (runningWorkers >= parallelism) return false
++runningWorkers
return true
}
}
// 注意他自己實(shí)現(xiàn)了Runnable,最終他是把自己送到CoroutineSchduler去執(zhí)行的途蒋,協(xié)程傳進(jìn)來的任務(wù)在這里被包裝了執(zhí)行
override fun run() {
var fairnessCounter = 0
// 別被這個(gè)循環(huán)體迷惑猛遍,就以為線程都是串行執(zhí)行的,實(shí)際每次新協(xié)程創(chuàng)建任務(wù)都會(huì)執(zhí)行run方法号坡,然后最終包裝到協(xié)程線程池去并發(fā)執(zhí)行
// 這個(gè)循環(huán)是為了執(zhí)行超過最大并發(fā)數(shù)的時(shí)候懊烤,那些只添加到了隊(duì)列但是沒有立馬執(zhí)行的任務(wù)
while (true) {
val task = queue.removeFirstOrNull()
if (task != null) {
try {
task.run()
} catch (e: Throwable) {
handleCoroutineException(EmptyCoroutineContext, e)
}
// 這里比較有意思,當(dāng)有大量的并發(fā)任務(wù)(比如短時(shí)間添加了200個(gè)任務(wù))宽堆,那么這個(gè)方法有可能會(huì)長(zhǎng)時(shí)間的執(zhí)行下去腌紧,所以
// 這里為了公平起見不長(zhǎng)期霸占資源,當(dāng)超過執(zhí)行了16個(gè)任務(wù)后就重新分發(fā)一次畜隶,這樣就能短暫的讓出cpu讓別的線程執(zhí)行(不知道理解的對(duì)不對(duì))
if (++fairnessCounter >= 16 && dispatcher.isDispatchNeeded(this)) {
// Do "yield" to let other views to execute their runnable as well
// Note that we do not decrement 'runningWorkers' as we still committed to do our part of work
dispatcher.dispatch(this, this)
return
}
continue
}
synchronized(workerAllocationLock) {
--runningWorkers
if (queue.size == 0) return
++runningWorkers
fairnessCounter = 0
}
}
}
}
由上可以看出Dispatchers.IO 任務(wù)分發(fā)是借助于DefaultScheduler壁肋,也就是Dispatchers.Default的能力,因此兩者是共用一個(gè)線程池籽慢。
只是Dispatchers.IO 比較特殊浸遗,它有個(gè)隊(duì)列,該隊(duì)列作用:
當(dāng)IO 任務(wù)分派個(gè)數(shù)超過設(shè)定的并行數(shù)時(shí)箱亿,不會(huì)直接進(jìn)行分發(fā)跛锌,而是先存放在隊(duì)列里。
到此協(xié)程線程池基本原理分析完畢届惋,下篇打算探討一些協(xié)程的常用案例髓帽,看看哪些適合應(yīng)用在實(shí)際開發(fā)中以及注意事項(xiàng)菠赚。