簡(jiǎn)介
這片文章主要講解kotlin
中delay
函數(shù)的實(shí)現(xiàn)原理谦炬,delay
是一個(gè)掛起函數(shù)。kotlin攜程使用過(guò)程中,經(jīng)常使用到掛起函數(shù)黔州,在我學(xué)習(xí)kotlin攜程
的時(shí)候耍鬓,一些現(xiàn)象讓我很是困惑,所以打算從源碼角度來(lái)逐一分析流妻。
說(shuō)明
在分析delay
源碼實(shí)現(xiàn)過(guò)程中牲蜀,由于對(duì)kotlin有些語(yǔ)法還不是很熟悉,所以并不會(huì)把每一步將得很透徹绅这,只會(huì)梳理一個(gè)大致的流程涣达,如果講解有誤的地方,歡迎指出证薇。
例子先行
fun main() = runBlocking {
println("${treadName()}======start")
launch {
println("${treadName()}======delay 1s start")
delay(1000)
println("${treadName()}======delay 1s end")
}
println("${treadName()}======delay 3s start")
delay(3000)
println("${treadName()}======delay 3s end")
// 延遲度苔,保活進(jìn)程
Thread.sleep(500000)
}
輸出如下:
main======start
main======delay 3s start
main======delay 1s start
main======delay 1s end
main======delay 3s end
根據(jù)日志可以看出:
- 日志輸出環(huán)境是在主線程浑度。
- 執(zhí)行3s延遲函數(shù)后寇窑,切換到了
launch
攜程體執(zhí)行。 - delay掛起函數(shù)恢復(fù)后執(zhí)行各自的打印函數(shù)箩张。
如果真像打印日志輸出一樣甩骏,所以的操作都是在一個(gè)線程(主線程)完成,那么問(wèn)題來(lái)了伏钠。第一:
按照J(rèn)ava線程知識(shí)横漏,單線程執(zhí)行是按照順序的,是單條線的熟掂。那么不管delay
里是何等騷操作缎浇,只要沒(méi)有重新起線程,應(yīng)該不能夠像上面輸入的那樣吧赴肚,你說(shuō)sleep
素跺,wait
,如果你這么想,那么你可以去補(bǔ)一補(bǔ)Java多線程基礎(chǔ)知識(shí)了誉券。猜想
:1. 難得真有什么我不知道的騷操作可以在一個(gè)線程里面同時(shí)執(zhí)行delay
和其它代碼指厌,真像很多人說(shuō)的,攜程性能很好踊跟,使用掛起函數(shù)可以不用啟動(dòng)新的線程踩验,就可以異步執(zhí)行,那真的就很不錯(cuò)
商玫。2.delay
啟動(dòng)了新的線程箕憾,上面的現(xiàn)象只不過(guò)是進(jìn)行了線程切換,那么如果多次調(diào)用delay
那么豈不是要?jiǎng)?chuàng)建很多線程拳昌,這性能問(wèn)題和資源問(wèn)題怎么解決袭异。3.delay
基于某種任務(wù)調(diào)度策略。
delay源碼
public suspend inline fun <T> suspendCancellableCoroutine(
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
cancellable.initCancellability()
block(cancellable)
cancellable.getResult()
}
cancellable
是一個(gè)CancellableContinuationImpl
對(duì)象炬藤,執(zhí)行 block(cancellable)御铃,回到下面函數(shù)碴里。
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
// if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule.
if (timeMillis < Long.MAX_VALUE) {
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
}
看一下cont.context.delay
的get
方法
internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay
如果get(ContinuationInterceptor)
是Delay
類(lèi)型對(duì)象,那么直接返回該對(duì)象上真,如果不是返回DefaultDelay
變量咬腋,看一下DefaultDelay
初始化可以知道,它是一個(gè)DefaultExecutor
對(duì)象睡互,繼承了EventLoopImplBase
類(lèi)帝火。
runBlocking
執(zhí)行過(guò)程中有這樣一行代碼createCoroutineUnintercepted(receiver, completion).intercepted()
會(huì)被ContinuationInterceptor
進(jìn)行包裝。所以上面cont.context.delay
返回的就是被包裝的攜程體上下文湃缎。
查看scheduleResumeAfterDelay方法。
public override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val timeNanos = delayToNanos(timeMillis)
if (timeNanos < MAX_DELAY_NS) {
val now = nanoTime()
DelayedResumeTask(now + timeNanos, continuation).also { task ->
continuation.disposeOnCancellation(task)
schedule(now, task)
}
}
}
創(chuàng)建DelayedResumeTask對(duì)象蠢壹,在also執(zhí)行相關(guān)計(jì)劃任務(wù)嗓违,看一下schedule
方法。
public fun schedule(now: Long, delayedTask: DelayedTask) {
when (scheduleImpl(now, delayedTask)) {
SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
else -> error("unexpected result")
}
}
這里返回SCHEDULE_OK
,執(zhí)行unpark
函數(shù)图贸,這里用到了Java提供的LockSupport
線程操作相關(guān)知識(shí)蹂季。
讀取線程
val thread = thread
-
如果delay是當(dāng)前攜程的上下文
那么把延時(shí)任務(wù)加入到隊(duì)列后,那么又是怎么達(dá)到線程延遲呢疏日〕ソ啵回到runBlocking
執(zhí)行流程,會(huì)執(zhí)行coroutine.joinBlocking()
這樣一行代碼沟优。fun joinBlocking(): T { registerTimeLoopThread() try { eventLoop?.incrementUseCount() try { while (true) { @Suppress("DEPRECATION") if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) } val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE // note: process next even may loose unpark flag, so check if completed before parking if (isCompleted) break parkNanos(this, parkNanos) } } finally { // paranoia eventLoop?.decrementUseCount() } } finally { // paranoia unregisterTimeLoopThread() } // now return result val state = this.state.unboxState() (state as? CompletedExceptionally)?.let { throw it.cause } return state as T }
執(zhí)行:
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
看一下
processNextEvent
override fun processNextEvent(): Long { // unconfined events take priority if (processUnconfinedEvent()) return 0 // queue all delayed tasks that are due to be executed val delayed = _delayed.value if (delayed != null && !delayed.isEmpty) { val now = nanoTime() while (true) { delayed.removeFirstIf { if (it.timeToExecute(now)) { enqueueImpl(it) } else false } ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete" } } // then process one event from queue val task = dequeue() if (task != null) { task.run() return 0 } return nextTime }
從延遲隊(duì)列取任務(wù)
val delayed = _delayed.value
掛起當(dāng)前線程
parkNanos(this, parkNanos)
這里是一個(gè)
while
循環(huán)涕滋,當(dāng)掛起時(shí)間到,線程喚醒挠阁,繼續(xù)從任務(wù)隊(duì)列中取任務(wù)執(zhí)行宾肺。如果還是延遲任務(wù),這根據(jù)當(dāng)前時(shí)間點(diǎn)侵俗,計(jì)算線程需要掛起的時(shí)間锨用,這也是為什么多個(gè)延遲任務(wù)好像是同時(shí)執(zhí)行的。 -
如果delay是DefaultExecutor
比如這個(gè)例子:攜程上下文沒(méi)有像CoroutineStart.DEFAULT
那樣進(jìn)行包裝隘谣。fun main() { GlobalScope.launch(start = CoroutineStart.UNDISPATCHED){ println("${treadName()}======我開(kāi)始執(zhí)行了~") delay(1000) println("${treadName()}======全局?jǐn)y程~") } println("${treadName()}======我要睡覺(jué)~") Thread.sleep(3000) }
然后調(diào)用
DefaultExecutor
類(lèi)中thread的get
方法:override val thread: Thread get() = _thread ?: createThreadSync()
看一下
createThreadSync
函數(shù)private fun createThreadSync(): Thread { return _thread ?: Thread(this, THREAD_NAME).apply { _thread = this isDaemon = true start() } }
創(chuàng)建一個(gè)叫
"kotlinx.coroutines.DefaultExecutor
的新線程增拥,并且開(kāi)始運(yùn)行。這時(shí)候會(huì)執(zhí)行DefaultExecutor
中的run
方法寻歧。在run
方法中有這樣一行代碼:parkNanos(this, parkNanos)
點(diǎn)進(jìn)去看看:
internal inline fun parkNanos(blocker: Any, nanos: Long) { timeSource?.parkNanos(blocker, nanos) ?: LockSupport.parkNanos(blocker, nanos) }
調(diào)用Java提供的LockSupport.parkNanos(blocker, nanos)方法掌栅,阻塞當(dāng)前線程,實(shí)現(xiàn)掛起熄求,當(dāng)達(dá)到阻塞的時(shí)間渣玲,恢復(fù)線程執(zhí)行。
查看進(jìn)行中線程情況方法
fun main() {
println("${treadName()}======doSuspendTwo")
Thread.sleep(500000)
}
運(yùn)行main
弟晚,通過(guò)命令jps
找到對(duì)應(yīng)Java進(jìn)程(沒(méi)有特別指定忘衍,進(jìn)程名為文件名)號(hào)逾苫。
...
3406 KotlinCoreutinesSuspendKt
...
執(zhí)行jstack 進(jìn)程號(hào)
查看進(jìn)程對(duì)應(yīng)的線程資源。