簡(jiǎn)介
這篇文章將從源碼的角度,分析攜程的執(zhí)行流程灾茁,我們創(chuàng)建一個(gè)攜程窜觉,系統(tǒng)是怎么進(jìn)行調(diào)度的谷炸,什么時(shí)候執(zhí)行的,是否需要?jiǎng)?chuàng)建新線程等等禀挫,帶著這些疑問(wèn)旬陡,一起往下看吧。
例子先行
fun main(): Unit = runBlocking {
launch {
println("${treadName()}======1")
}
GlobalScope.launch {
println("${treadName()}======3")
}
launch {
println("${treadName()}======2")
}
println("${treadName()}======4")
Thread.sleep(2000)
}
輸出如下:
DefaultDispatcher-worker-1======3
main======4
main======1
main======2
Process finished with exit code 0
根據(jù)打印语婴,如果根據(jù)單線程執(zhí)行流程來(lái)看描孟,是不是感覺(jué)上面的日志打印順序有點(diǎn)不好理解,下面我們就逐步來(lái)進(jìn)行分解砰左。
-
runBlocking攜程體
這里將其它代碼省略到了匿醒,我這里都是按照一條簡(jiǎn)單的執(zhí)行流程進(jìn)行講解。
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T { val eventLoop: EventLoop? val newContext: CoroutineContext ... if (contextInterceptor == null) { eventLoop = ThreadLocalEventLoop.eventLoop newContext = GlobalScope.newCoroutineContext(context + eventLoop) } ... val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop) coroutine.start(CoroutineStart.DEFAULT, coroutine, block) return coroutine.joinBlocking() }
看一下
eventLoop
的初始化,會(huì) 在當(dāng)前線程(主線程)創(chuàng)建BlockingEventLoop對(duì)象缠导。internal val eventLoop: EventLoop get() = ref.get() ?: createEventLoop().also { ref.set(it) } internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.currentThread())
看一下
newContext
初始化,這里會(huì)對(duì)攜程上下文進(jìn)行組合廉羔,返回新的上下文。最后返回的是一個(gè)BlockingEventLoop對(duì)象僻造。public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext { val combined = coroutineContext + context val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null) debug + Dispatchers.Default else debug }
開(kāi)始對(duì)攜程進(jìn)行調(diào)度
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
看一下執(zhí)行這句代碼之前蜜另,各變量的值
111.png
而上面的代碼最終調(diào)用的是CoroutineStart.DEFAULT
的invoke
方法。public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit = when (this) { DEFAULT -> block.startCoroutineCancellable(completion) ATOMIC -> block.startCoroutine(completion) UNDISPATCHED -> block.startCoroutineUndispatched(completion) LAZY -> Unit // will start lazily }
我們使用的是
DEFAULT
啟動(dòng)模式嫡意。然后會(huì)執(zhí)行resumeCancellableWith
方法。inline fun resumeCancellableWith( result: Result<T>, noinline onCancellation: ((cause: Throwable) -> Unit)? ) { val state = result.toState(onCancellation) if (dispatcher.isDispatchNeeded(context)) { _state = state resumeMode = MODE_CANCELLABLE dispatcher.dispatch(context, this) } else { executeUnconfined(state, MODE_CANCELLABLE) { if (!resumeCancelled(state)) { resumeUndispatchedWith(result) } } } }
dispatcher
是BlockingEventLoop
對(duì)象捣辆,沒(méi)有重寫(xiě)isDispatchNeeded
蔬螟,默認(rèn)返回true。然后調(diào)用dispatch
繼續(xù)進(jìn)行分發(fā)汽畴。BlockingEventLoop
繼承了EventLoopImplBase
并調(diào)用其dispatch
方法旧巾。把任務(wù)加入到隊(duì)列中。public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)
回到最開(kāi)始忍些,在
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
執(zhí)行完鲁猩,還執(zhí)行了coroutine.joinBlocking()
看一下實(shí)現(xiàn)。fun joinBlocking(): T { registerTimeLoopThread() try { eventLoop?.incrementUseCount() try { while (true) { @Suppress("DEPRECATION") if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) } val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE // note: process next even may loose unpark flag, so check if completed before parking if (isCompleted) break parkNanos(this, parkNanos) } } finally { // paranoia eventLoop?.decrementUseCount() } } finally { // paranoia unregisterTimeLoopThread() } // now return result val state = this.state.unboxState() (state as? CompletedExceptionally)?.let { throw it.cause } return state as T }
執(zhí)行
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
罢坝,取出任務(wù)進(jìn)行執(zhí)行廓握,也就是runBlocking
攜程體。 -
launch {}
執(zhí)行流程public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine }
因?yàn)?code>launch是直接在
runBlocking(父攜程體)
里新的創(chuàng)建的子攜程體嘁酿,所以執(zhí)行流程上和之前將的差不多隙券,只不過(guò)不會(huì)像runBlocking
再去創(chuàng)建BlockingEventLoop
對(duì)象,而是直接用runBlocking(父攜程體)
的闹司,然后把任務(wù)加到里面娱仔,所以通過(guò)這種方式其實(shí)就是單線程對(duì)任務(wù)的調(diào)度
而已。所以在runBlocking(父攜程體)
內(nèi)通過(guò)launch
啟動(dòng)再多的攜程體游桩,其實(shí)都是在同一線程牲迫,按照任務(wù)
隊(duì)列的順序執(zhí)行的耐朴。
根據(jù)上面日志輸出,并沒(méi)有先執(zhí)行兩個(gè)
launch
攜程體盹憎,這是為什么呢筛峭,根據(jù)上面的講解,應(yīng)用知道脚乡,runBlocking(父攜程體)
是第一被添加的隊(duì)列的任務(wù)蜒滩,其次是launch
,所以是這樣的順序奶稠。那可以讓launch
立即執(zhí)行嗎俯艰?答案是可以的,這就要說(shuō)攜程的啟動(dòng)模式了锌订。
-
CoroutineStart 是協(xié)程的啟動(dòng)模式
竹握,存在以下4種模式:- DEFAULT 立即調(diào)度,可以在執(zhí)行前被取消
- LAZY 需要時(shí)才啟動(dòng)辆飘,需要start啦辐、join等函數(shù)觸發(fā)才可進(jìn)行調(diào)度
- ATOMIC 立即調(diào)度,協(xié)程肯定會(huì)執(zhí)行蜈项,執(zhí)行前不可以被取消
- UNDISPATCHED 立即在當(dāng)前線程執(zhí)行芹关,直到遇到第一個(gè)掛起點(diǎn)(可能切線程)
我們使用
UNDISPATCHED
就可以使攜程體馬上在當(dāng)前線程執(zhí)行〗糇洌看一下是怎么實(shí)現(xiàn)的侥衬。看一下實(shí)現(xiàn):
使用這種啟動(dòng)模式執(zhí)行UNDISPATCHED -> block.startCoroutineUndispatched(completion)
方法跑芳。
internal fun <T> (suspend () -> T).startCoroutineUndispatched(completion: Continuation<T>) {
startDirect(completion) { actualCompletion ->
withCoroutineContext(completion.context, null) {
startCoroutineUninterceptedOrReturn(actualCompletion)
}
}
}
大家可以自己點(diǎn)擊去看一下轴总,大概就是會(huì)立即執(zhí)行攜程體,而不是將任務(wù)放入隊(duì)列博个。
但是
GlobalScope.launch
卻不是按照這樣的邏輯怀樟,這是因?yàn)?code>GlobalScope.launch啟動(dòng)的全局?jǐn)y程,是一個(gè)獨(dú)立的攜程體了盆佣,并不是runBlocking(父攜程體)
子攜程往堡。看一下通過(guò)GlobalScope.launch
有什么不同罪塔。
-
GlobalScope.launch執(zhí)行流程
- 啟動(dòng)全局?jǐn)y程
GlobalScope.launch
newCoroutineContext(context)
返回Dispatchers.Default
對(duì)象投蝉。而DefaultScheduler繼承了ExperimentalCoroutineDispatcher類≌骺埃看一下ExperimentalCoroutineDispatcher
中的dispatch
代碼:
看一下override fun dispatch(context: CoroutineContext, block: Runnable): Unit = ... coroutineScheduler.dispatch(block) ...
coroutineScheduler
初始化private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
CoroutineScheduler
實(shí)現(xiàn)了Executor
接口瘩缆,里面還有兩個(gè)全局隊(duì)列和線程池相關(guān)的參數(shù)。
繼續(xù)調(diào)用@JvmField val globalCpuQueue = GlobalQueue() @JvmField val globalBlockingQueue = GlobalQueue()
CoroutineScheduler
中的dispatch
方法fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) { trackTask() // this is needed for virtual time support val task = createTask(block, taskContext) // try to submit the task to the local queue and act depending on the result val currentWorker = currentWorker() val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch) if (notAdded != null) { if (!addToGlobalQueue(notAdded)) { // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted throw RejectedExecutionException("$schedulerName was terminated") } } val skipUnpark = tailDispatch && currentWorker != null // Checking 'task' instead of 'notAdded' is completely okay if (task.mode == TASK_NON_BLOCKING) { if (skipUnpark) return signalCpuWork() } else { // Increment blocking tasks anyway signalBlockingWork(skipUnpark = skipUnpark) } }
-
val task = createTask(block, taskContext)
包裝成TaskImpl
對(duì)象佃蚜。 -
val currentWorker = currentWorker()
當(dāng)前是主線程庸娱,運(yùn)行程序時(shí)由進(jìn)程創(chuàng)建着绊,肯定不是Worker
對(duì)象,Worker
是一個(gè)繼承了Thread
的類 ,并且在初始化時(shí)都指定為守護(hù)線程
熟尉。Worker存在5種狀態(tài): CPU_ACQUIRED 獲取到cpu權(quán)限 BLOCKING 正在執(zhí)行IO阻塞任務(wù) PARKING 已處理完所有任務(wù)归露,線程掛起 DORMANT 初始態(tài) TERMINATED 終止態(tài)
-
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
由于currentWorker是null,直接返回task
對(duì)象斤儿。 -
addToGlobalQueue(notAdded)
根據(jù)任務(wù)是否是阻塞任務(wù)剧包,將task
添加到全局任務(wù)隊(duì)列中。這里被添加到globalCpuQueue
中往果。 - 執(zhí)行
signalCpuWork()
來(lái)喚醒一個(gè)線程或者啟動(dòng)一個(gè)新的線程疆液。
fun signalCpuWork() {
if (tryUnpark()) return
if (tryCreateWorker()) return
tryUnpark()
}
private fun tryCreateWorker(state: Long = controlState.value): Boolean {
val created = createdWorkers(state)// 創(chuàng)建的的線程總數(shù)
val blocking = blockingTasks(state)// 處理阻塞任務(wù)的線程數(shù)量
val cpuWorkers = (created - blocking).coerceAtLeast(0)//得到非阻塞任務(wù)的線程數(shù)量
if (cpuWorkers < corePoolSize) {// 小于核心線程數(shù)量,進(jìn)行線程的創(chuàng)建
val newCpuWorkers = createNewWorker()
if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()// 當(dāng)前非阻塞型線程數(shù)量為1陕贮,同時(shí)核心線程數(shù)量大于1時(shí)堕油,再進(jìn)行一個(gè)線程的創(chuàng)建,
if (newCpuWorkers > 0) return true
}
return false
}
// 創(chuàng)建線程
private fun createNewWorker(): Int {
synchronized(workers) {
...
val created = createdWorkers(state)// 創(chuàng)建的的線程總數(shù)
val blocking = blockingTasks(state)// 阻塞的線程數(shù)量
val cpuWorkers = (created - blocking).coerceAtLeast(0) // 得到非阻塞線程數(shù)量
if (cpuWorkers >= corePoolSize) return 0//超過(guò)最大核心線程數(shù)肮之,不能進(jìn)行新線程創(chuàng)建
if (created >= maxPoolSize) return 0// 超過(guò)最大線程數(shù)限制掉缺,不能進(jìn)行新線程創(chuàng)建
...
val worker = Worker(newIndex)
workers[newIndex] = worker
require(newIndex == incrementCreatedWorkers())
worker.start()// 線程啟動(dòng)
return cpuWorkers + 1
}
}
那么這里面的任務(wù)又是怎么調(diào)度的呢,當(dāng)全局任務(wù)被執(zhí)行的時(shí)候戈擒,看一下Worker
中的run
方法:
override fun run() = runWorker()
執(zhí)行runWorker
方法眶明,該方法會(huì)從隊(duì)列中找到執(zhí)行任務(wù),然后開(kāi)始執(zhí)行筐高。詳細(xì)代碼赘来,可以自行翻閱。
所以
GlobalScope.launch
使用的就是線程池凯傲,沒(méi)有所謂的性能好。
-
Dispatchers調(diào)度器
Dispatchers是協(xié)程中提供的線程調(diào)度器嗦篱,用來(lái)切換線程冰单,指定協(xié)程所運(yùn)行的線程。灸促,上面用的是默認(rèn)調(diào)度器Dispatchers.Default
诫欠。
Dispatchers中提供了4種類型調(diào)度器:
Default 默認(rèn)調(diào)度器
:適合CPU密集型任務(wù)調(diào)度器 比如邏輯計(jì)算;
Main UI調(diào)度器
Unconfined 無(wú)限制調(diào)度器
:對(duì)協(xié)程執(zhí)行的線程不做限制浴栽,協(xié)程恢復(fù)時(shí)可以在任意線程荒叼;
IO調(diào)度器
:適合IO密集型任務(wù)調(diào)度器 比如讀寫(xiě)文件,網(wǎng)絡(luò)請(qǐng)求等典鸡。