kotlin 協(xié)成系列文章:
你真的了解kotlin中協(xié)程的suspendCoroutine原理嗎?
Kotlin Channel系列(一)之讀懂Channel每一行源碼
kotlin Flow系列之-冷流SafeFlow源碼解析之 - Safe在那里谨设?
kotlin Flow系列之-SharedFlow源碼解析Kotlin Flow系列之-ChannelFlow源碼解析之 -操作符 buffer & fuse & flowOn線程切換
引言:在Kotlin協(xié)程中,如何讓一個(gè)suspned 函數(shù)掛起宅粥?如何讓掛起協(xié)程恢復(fù)琉雳?想必使用過協(xié)程的同學(xué)都非常清楚那就是調(diào)用suspendCoroutine
或者suspendCancellableCoroutine
。使用了這么久,你真的知道他們是怎么回事嗎?.
注:本文源碼班基于kotlin 1.7.10
什么是協(xié)程
先簡要回顧一下什么是協(xié)程紫谷?我們通過協(xié)程最基礎(chǔ)的API createCoroutine
可以創(chuàng)建一個(gè)協(xié)程,然后在調(diào)用resume
開啟協(xié)程齐饮,startCoroutine
創(chuàng)建并直接開啟協(xié)程捐寥。像launch
,async
等這些框架API是在基礎(chǔ)API上的再次封裝,讓我們?cè)趧?chuàng)建和使用協(xié)程時(shí)變得更為方便祖驱。協(xié)程是由一個(gè) suspend
函數(shù)創(chuàng)建出來的,我們來看一個(gè)最原始的創(chuàng)建協(xié)程方式:
//第一步創(chuàng)建一個(gè)suspend 函數(shù)
val suspendFun : suspend () -> Unit = {
//TODO 這里面寫上協(xié)程要執(zhí)行的代碼握恳,也被稱之為協(xié)程體
}
//第二步創(chuàng)建一個(gè)協(xié)程,并傳一個(gè)協(xié)程執(zhí)行完成后的回調(diào)
val continuation = suspendFun.createCoroutine(object :Continuation<Unit>{
override val context: CoroutineContext
get() = EmptyCoroutineContext
//協(xié)程執(zhí)行完成后捺僻,會(huì)回調(diào)該方法乡洼,result代表了協(xié)程的結(jié)果,如果沒有返回值就是Unit,如果協(xié)程體里面發(fā)生異常
//result里面包含有異常信息
override fun resumeWith(result: Result<Unit>) {
println("協(xié)程執(zhí)行完畢")
}
})
//第三步開啟一個(gè)協(xié)程
continuation.resume(Unit)
被創(chuàng)建出來的協(xié)程continuation
到底是一個(gè)什么東西呢?通過createCoroutine
源碼發(fā)現(xiàn):
@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
public fun <R, T> (suspend R.() -> T).createCoroutine(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> =
SafeContinuation(createCoroutineUnintercepted(receiver, completion).intercepted(), COROUTINE_SUSPENDED)
這里面做了三件事:
第一:createCoroutineUnintercepted(receiver, completion)
創(chuàng)建了一個(gè)協(xié)程匕坯,”Unintercepted“說明了創(chuàng)建出來的這個(gè)協(xié)程是不被調(diào)度器協(xié)程(DispatchedContinuation
)所包含或者代理的束昵,這個(gè)就是我們真正執(zhí)行代碼的協(xié)程,我把它稱之為原始協(xié)程葛峻。
第二:調(diào)用原始協(xié)程的intercepted()
锹雏,該方法會(huì)通過我們?cè)趧?chuàng)建協(xié)程時(shí)指定的調(diào)度器創(chuàng)建一個(gè)DispatchedContinuation
,它里面包含了原始協(xié)程和調(diào)度器术奖,如果我們沒有指定調(diào)度器intercepted()
返回原始協(xié)程自己礁遵。
第三步:創(chuàng)建一個(gè)SafeContinuation
,它持有了intercepted()
返回的對(duì)象,設(shè)置了調(diào)度器就是DispatchedContinuation
采记,沒有設(shè)置就是原始協(xié)程佣耐。
原始協(xié)程是被createCoroutineUnintercepted
創(chuàng)建出來的,那到底創(chuàng)建出來的是一個(gè)什么東西呢唧龄?
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(probeCompletion)
else
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function1<Continuation<T>, Any?>).invoke(it)
}
}
在createCoroutineUnintercepted
里面this
是該函數(shù)的接收者是一個(gè)suspend () -> T
,其實(shí)就是我們?cè)谏厦娑x的suspned
函數(shù):
val suspendFun : suspend () -> Unit = {
//TODO 這里面寫上協(xié)程要執(zhí)行的代碼兼砖,也被稱之為協(xié)程體
}
在kotlin協(xié)程中,suspend () -> T
函數(shù)類型的父類都是SuspendLambda
。而SuspendLambda
又繼承至ContinuationImpl
讽挟,ContinuationImpl
又繼承至BaseContinuationImpl
然走,BaseContinuationImpl
又實(shí)現(xiàn)了Continuation
接口。因此在createCoroutineUnintercepted
會(huì)調(diào)用suspend () -> T
的create
函數(shù)來創(chuàng)建我們的原始協(xié)程戏挡。create
函數(shù)定義在什么地方呢芍瑞?是父類SuspendLambda
中還是ContinuationImpl
或者還是BaseContinuationImpl
中呢?它里面又是如何實(shí)現(xiàn)呢褐墅?都不是拆檬,craete
函數(shù)編譯器為我們生成的。當(dāng)我們?cè)诖a定義一個(gè)suspend
函數(shù)類型的時(shí)候(注意是函數(shù)類型不是suspend
函數(shù))編譯器會(huì)為我們生成一個(gè)類妥凳,該類繼承至SuspendLambda
竟贯,把我們?cè)疽獔?zhí)行的代碼(協(xié)程體代碼)給放到一個(gè)叫invokeSuspend
函數(shù)中,并且為該類生成create
函數(shù)逝钥,在create
函數(shù)中new
一個(gè)該類的實(shí)例對(duì)象返屑那。
如果有對(duì)這個(gè)比較感興趣的同學(xué)可以在IDEA中把kotlin代碼編譯后的字節(jié)碼轉(zhuǎn)成java代碼查看。
到此我們知道了我們的原始協(xié)程原來是一個(gè)由kotlin編譯器為我們生成的一個(gè)繼承了SuspendLambda的子類艘款。知道了原始協(xié)程為何物后持际,協(xié)程是如何開啟的呢?是怎么執(zhí)行到我們協(xié)程體代碼里面的呢哗咆?在前面說過蜘欲,編譯器會(huì)把我們協(xié)程體要執(zhí)行的代碼放到生成的類中的invokeSuspend
函數(shù)中,因此我們只需要知道什么時(shí)候調(diào)用invokeSuspend
就行晌柬。在上面代碼中的第三步調(diào)用了SafeContinuation
的resume
函數(shù)姥份,SafeContinuation
中會(huì)調(diào)用DispatchedContinuation
的resumeWith
函數(shù),在DispatchedContinuation
中又會(huì)通過調(diào)度器去調(diào)用原始協(xié)程的resumeWith
函數(shù)年碘。原始協(xié)程的resumeWitht
函數(shù)在BaseContinuationImpl
中定義的:
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!!
val outcome: Result<Any?> =
try {
//在這里調(diào)用了編譯器生成的類的invokeSuspend澈歉,
//invokeSuspend中就是協(xié)程體的代碼
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted()
if (completion is BaseContinuationImpl) {
current = completion
param = outcome
} else {
completion.resumeWith(outcome)
return
}
}
}
}
xxxxx
}
在BaseContinuationImpl
的resumeWith
中調(diào)用了invokeSuspend
,這樣就執(zhí)行到我們自己寫的協(xié)程體要執(zhí)行的代碼里面了屿衅。
回顧了一大堆埃难,是想說明一個(gè)事情,不管是協(xié)程第一次執(zhí)行傲诵,還是后面協(xié)程從掛起函數(shù)恢復(fù)都要調(diào)用我們?cè)紖f(xié)程的
resumeWith
函數(shù)才行凯砍。協(xié)程內(nèi)部的執(zhí)行(invokeSuspend
內(nèi)部)是一個(gè)狀態(tài)機(jī),每一次調(diào)用invokeSuspend
都會(huì)給狀態(tài)機(jī)設(shè)置一個(gè)不同的狀態(tài)拴竹,使其執(zhí)行invokeSuspend
中不同分支的代碼悟衩。至于協(xié)程狀態(tài)機(jī)的原理不在本文討論之中,不然就偏題了栓拜。
我們什么時(shí)候需掛起協(xié)程座泳?被掛起的協(xié)程什么時(shí)候恢復(fù)惠昔?當(dāng)我們不能立馬返回結(jié)果的時(shí)候,需要把協(xié)程掛起挑势,等結(jié)果準(zhǔn)備好了后通過調(diào)用協(xié)程的resume
函數(shù)進(jìn)行恢復(fù)镇防。那協(xié)程怎樣才能掛起呢?答案就是在suspend
函數(shù)中返回一個(gè)標(biāo)識(shí)(COROUTINE_SUSPENDED
)潮饱,當(dāng)協(xié)程看到這個(gè)標(biāo)識(shí)后就知道協(xié)程需要被掛起了来氧,恢復(fù)協(xié)程的時(shí)候需要調(diào)用協(xié)程的resume
函數(shù),那我么怎么才能在suspend
函數(shù)中拿到協(xié)程這個(gè)對(duì)象呢香拉?只有拿到協(xié)程這個(gè)對(duì)象才能調(diào)用其resume
函數(shù)啦扬。說到這里,想必很多同學(xué)都知道調(diào)用suspendCoroutine
函數(shù)啊凫碌,對(duì)扑毡,沒錯(cuò),當(dāng)我們需要把我們的suspend
函數(shù)掛起的盛险,稍后再恢復(fù)的時(shí)候瞄摊,我們可以有三種方式:
- suspendCoroutine
- suspendCancellableCoroutine
- suspendCoroutineUninterceptedOrReturn
其中suspendCoroutine
和 suspendCancellableCoroutine
兩個(gè)內(nèi)部都是調(diào)用了suspendCoroutineUninterceptedOrReturn
來實(shí)現(xiàn):
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
//直接調(diào)用了suspendCoroutineUninterceptedOrReturn
return suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
val safe = SafeContinuation(c.intercepted())
block(safe)
safe.getOrThrow()
}
}
public suspend inline fun <T> suspendCancellableCoroutine(
crossinline block: (CancellableContinuation<T>) -> Unit
): T = suspendCoroutineUninterceptedOrReturn { uCont -> //同樣的直接調(diào)用
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
cancellable.initCancellability()
block(cancellable)
cancellable.getResult()
}
當(dāng)我們想去看suspendCoroutineUninterceptedOrReturn
的源碼的時(shí)候,發(fā)現(xiàn)無論如何都找不到源碼了苦掘,很正常换帜,因?yàn)?code>suspendCoroutineUninterceptedOrReturn是有編譯器在編譯代碼的時(shí)時(shí)候生成的。所以你找不到很正常鸟蜡。因此搞懂suspendCoroutineUninterceptedOrReturn
的前提下再回過頭來看另外兩個(gè)就簡單了膜赃。
suspendCoroutineUninterceptedOrReturn
佛說世間萬物皆有其因果,任何一個(gè)東西的誕生都是有其原因的揉忘,在日常開發(fā)中,我們經(jīng)常有這樣的需求端铛,調(diào)用一個(gè)函數(shù)獲得一個(gè)想要的數(shù)據(jù)泣矛,有時(shí)候這個(gè)數(shù)據(jù)不能立馬獲得,比如本地緩存沒有禾蚕,需要從網(wǎng)絡(luò)下載您朽,這時(shí)候就需要把協(xié)程掛起,等數(shù)據(jù)回來了后再把協(xié)程恢復(fù)换淆,如果本地有就直接返回哗总,簡而言之就是有可能會(huì)掛起,也有可能不會(huì)掛起直接返回結(jié)果倍试。因此要讓一個(gè)suspend
函數(shù)能滿足這種要求讯屈,那需要具備兩個(gè)條件:1.我們需要再suspend
函數(shù)中拿到協(xié)程對(duì)象,用于恢復(fù)協(xié)程的時(shí)候使用县习,2.suspend
函數(shù)的返回值只能是Any
類型涮母,因?yàn)閽炱鸬臅r(shí)候返回COROUTINE_SUSPENDED
,不需要掛起的事后返回?cái)?shù)據(jù)真實(shí)的類型谆趾。
針對(duì)條件一,我們知道每一個(gè)增加了suspned
關(guān)鍵字標(biāo)識(shí)的函數(shù)在編譯后叛本,函數(shù)參數(shù)中都會(huì)多一個(gè)Continuation
的參數(shù)沪蓬。這個(gè)參數(shù)就是我們的原始協(xié)程,但問題的關(guān)鍵是我們是在編碼階段的時(shí)候需要拿到協(xié)程来候。所以條件一靠我們自己是搞不定的跷叉。
針對(duì)條件2,雖然比較好滿足营搅,在我們定義函數(shù)的時(shí)候性芬,把返回值改成Any
即可,但是也同樣帶來一個(gè)問題剧防,獲得該函數(shù)的結(jié)果后植锉,我們還需要人為去判斷然后轉(zhuǎn)換成我們需要的數(shù)據(jù)類型,如果真實(shí)這樣峭拘,協(xié)程的代碼得有多惡心俊庇,那我想估計(jì)沒幾個(gè)人愿意使用kotlin協(xié)程了。
于是這些kotlin的天才們想了一個(gè)招鸡挠,他們說辉饱,你不是想在suspend
函數(shù)中要拿到協(xié)程對(duì)象嗎?我有啊拣展,我給你彭沼,你只要按照我的要求你隨便定義一個(gè)函數(shù)類型的變量,或者重新再寫一個(gè)非susnpend
函數(shù)备埃。
如果是定義一個(gè)變量: 那么這個(gè)變量的類型必須為一個(gè)函數(shù)類型Continuation<T>) -> Any?
,該函數(shù)接收一個(gè)Continuation
作為參數(shù)姓惑,泛型參數(shù)T
代表了真正需要返回真實(shí)數(shù)據(jù)類型,函數(shù)返回值類型為Any
按脚,給這個(gè)變量賦值一個(gè)lambda
表達(dá)式于毙,把你原本要寫在suspend
函數(shù)的代碼放在這個(gè)lambda
表達(dá)式里面。
如果是定義一個(gè)非suspend
函數(shù):那么這個(gè)函數(shù)的類型同樣為Continuation<T>) -> Any?
,你把原來要寫在suspend
函數(shù)里面的代碼放在這個(gè)非suspend
函數(shù)里面辅搬。
上面兩種方式唯沮,其本質(zhì)是一樣的,都是一個(gè)函數(shù)類型堪遂,定義好后介蛉,kotlin說我給你提供一個(gè)叫某某
的函數(shù),我這個(gè)某某
函數(shù)接收一個(gè)函數(shù)類型為``Continuation<T>) -> Any?的參數(shù)溶褪,我這個(gè)函數(shù)的返回值為泛型
T(代表了你要的結(jié)果的真實(shí)類型)币旧。你在你需要掛起的
suspned中直接調(diào)用我們這個(gè)
某某函數(shù),把你定義的函數(shù)類型變量或者非suspend函數(shù)的引用傳給我竿滨,我在我的
某某`函數(shù)中去調(diào)用你傳進(jìn)來的函數(shù)佳恬,把協(xié)程對(duì)象傳過去捏境,再把計(jì)算的結(jié)果返回給你suspend函數(shù)。這樣就達(dá)到了想要的目的毁葱,既能獲得協(xié)程對(duì)象垫言,又能在需要掛起的時(shí)候返回掛起表示,不需要掛起的時(shí)候返回具體結(jié)果倾剿。
聽起來如果覺得有點(diǎn)抽象筷频,沒關(guān)系,我們寫一段代碼演示一下前痘,比如你現(xiàn)在有一個(gè)需求凛捏,獲得一個(gè)Int類型數(shù)據(jù),如果這個(gè)數(shù)據(jù)之前被計(jì)算出來了就直接返回芹缔,如果沒有就需要重新計(jì)算坯癣,計(jì)算比較耗時(shí),需要把協(xié)程掛起最欠,計(jì)算完成把結(jié)果緩存起來以便下次直接使用示罗,
故事原本是這樣的,我們把代碼寫在suspend
函數(shù)中:
suspend fun calculate(): Any?{
//先不要關(guān)心cache是一個(gè)啥芝硬,只需要知道它可以緩存結(jié)果就行
var result = cache.get()
//如果沒有緩存就需要開一個(gè)子線程去計(jì)算蚜点,讓該函數(shù)掛起
if(result == null){
thread {
Thread.sleep(10000)
val result = 1 + 2
cache.put(result)
//計(jì)算完成了后調(diào)用協(xié)程的resume函數(shù)讓協(xié)程恢復(fù)。并把計(jì)算完成的結(jié)果交給協(xié)程拌阴。
//但是問題來了绍绘,contination在源碼階段是那拿不到的。
contination.resume(result) //error
}
//返回COROUTINE_SUSPENDED的目的是讓協(xié)程掛起迟赃。
return COROUTINE_SUSPENDED
}else{
//如果有緩存陪拘,直接返回
return result
}
}
雖然calculate函數(shù)返回值可以是真實(shí)的數(shù)據(jù)也可以是掛起標(biāo)識(shí)常摧,但是我們拿不到協(xié)程對(duì)象啊铛碑,于是乎我們按照kotlin的要求來,整一個(gè)變量杨帽,于是你又開始寫:
val calculateFun : (Continuation<Int>) -> Any? = { contination ->
var result = cache.get()
if(result == null){
thread {
Thread.sleep(10000)
val result = 1 + 2
cache.put(result)
contination.resume(result)
}
COROUTINE_SUSPENDED
}else{
result
}
}
然后kotlin給你提供了一個(gè)某某
函數(shù)摄乒,這個(gè)函數(shù)叫suspendCoroutineUninterceptedOrReturn
:
inline suspend fun <T> suspendCoroutineUninterceptedOrReturn(block: (Continuation<T>) -> Any?) : T{
//由于suspendCoroutineUninterceptedOrReturn是在編譯期間生成的,因此continuation是能拿到的残黑。
val continuation = xxxx //
return block(continuation)
}
這樣馍佑,你的calculate就可以改為:
suspend fun calculate() = suspendCoroutineUninterceptedOrReturn<Int>(calculateFun)
//為了省去定義calculateFun的環(huán)節(jié),因此可以簡化為:
suspend fun calculate() = suspendCoroutineUninterceptedOrReturn<Int> { contination ->
var result = cache.get()
if(result == null){
thread {
Thread.sleep(10000)
val result = 1 + 2
cache.put(result)
contination.resume(result)
}
COROUTINE_SUSPENDED
}else{
result
}
}
這就達(dá)到了能獲取到Continuation
的目的梨水,再加上 suspendCoroutineUninterceptedOrReturn
還是一個(gè)inline
函數(shù)拭荤,在編譯過后會(huì)內(nèi)聯(lián)到調(diào)用者里面,因此也不存在性能開銷疫诽。
suspendCoroutineUninterceptedOrReturn存在的問題
kotlin提供的這個(gè)suspendCoroutineUninterceptedOrReturn
從名字就可以看出來舅世,你既可以掛起該函數(shù)旦委,也可以直接return
。意思如果你需要掛起時(shí)就返回COROUTINE_SUSPENDED
,不需要掛起時(shí)就返回正確的結(jié)果雏亚。這個(gè)函數(shù)名里面還有一個(gè)’Unintercepted
‘ 意思就是我們拿到的協(xié)程是不包含調(diào)度器的缨硝,也就是是說調(diào)用其resume
函數(shù)時(shí)是不會(huì)走調(diào)度器的,這樣也就存在一個(gè)問題罢低,在什么線程里調(diào)用的resume
查辩,接下里協(xié)程就恢復(fù)在什么線程里面了,這也就導(dǎo)致了我們平時(shí)使用時(shí)很少直接使用suspendCoroutineUninterceptedOrReturn
而是使用另外兩個(gè)替代品suspendCoroutine
和suspendCancellableCoroutine
网持。
我們寫一段代碼來驗(yàn)證一下看看通過suspendCoroutineUninterceptedOrReturn
掛起后協(xié)程恢復(fù)在什么線程:
fun main() {
//第一步創(chuàng)建一個(gè)suspend 函數(shù)
val suspendFun : suspend () -> Unit = {
log("協(xié)程開始執(zhí)行")
val result = testSuspendResumeThread()
log("協(xié)程恢復(fù)后的結(jié)果: $result")
}
//第二步創(chuàng)建一個(gè)協(xié)程
val continuation = suspendFun.createCoroutine(object :Continuation<Unit>{
override val context: CoroutineContext
get() = EmptyCoroutineContext
override fun resumeWith(result: Result<Unit>) {
log("協(xié)程執(zhí)行完畢")
}
})
//第三步開啟一個(gè)協(xié)程
continuation.resume(Unit)
Thread.sleep(100000)
}
suspend fun testSuspendResumeThread() = suspendCoroutineUninterceptedOrReturn<String> {con ->
//開一線程模擬耗時(shí)操作
thread(name = "子線程") {
Thread.sleep(1000)
//結(jié)束后恢復(fù)協(xié)程
con.resume("over")
}
return@suspendCoroutineUninterceptedOrReturn COROUTINE_SUSPENDED
}
輸出結(jié)果:
00:06:22:865[ main ] 協(xié)程開始執(zhí)行
00:06:23:896[ 子線程 ] 協(xié)程恢復(fù)后的結(jié)果: over
00:06:23:897[ 子線程 ] 協(xié)程執(zhí)行完畢
從輸出的結(jié)果來看宜岛,的確如我們所料,協(xié)程被恢復(fù)在了子線程功舀。那有沒有辦法可以讓協(xié)程恢復(fù)在原來的線程環(huán)境里面呢萍倡?可能有的同學(xué)已經(jīng)開始在想了,我們是不是可以用學(xué)學(xué)源碼里面的做法辟汰,調(diào)用協(xié)程的intercepted函數(shù)列敲,獲得一個(gè)DispatchedContinuation
,調(diào)用DispatchedContinuation
的resume函數(shù)呢莉擒?有這個(gè)想法說明你對(duì)協(xié)程原理有一定了解了酿炸,很可惜,intercepted
函數(shù)我們調(diào)用不了涨冀。
suspendCoroutineUninterceptedOrReturn
除了存在協(xié)程恢復(fù)后線程的問題填硕,其實(shí)還有兩個(gè)問題:
第一個(gè):不算問題的問題,那就是我們寫代碼的時(shí)候需要自己去判斷是不是需要掛起鹿鳖,需要掛起的時(shí)候返回值需要通過協(xié)程的resume
函數(shù)傳回到協(xié)程調(diào)用的地方扁眯,然后在lambda中 return
COROUTINE_SUSPENDED
。不要掛起的時(shí)候值可以直接return
結(jié)果翅帜。貌似看上去很合理啊姻檀,邏輯清晰,思路簡單涝滴,但是對(duì)于kotlin這個(gè)最求極致簡單绣版,語法糖豐富的一種語言,怎么能容忍這樣沒有營養(yǎng)的代碼存在歼疮。
第二個(gè):suspendCoroutineUninterceptedOrReturn
使用不當(dāng)容易造成一些奇怪的問題杂抽,比如在需要掛起的時(shí)候忘記寫return COROUTINE_SUSPENDED
,或者既調(diào)用resuem
把值傳回協(xié)程里面韩脏,又直接return
了值缩麸,我們先看忘記return COROUTINE_SUSPENDED
的情況,還是以上面的代碼為基礎(chǔ)赡矢,稍微改動(dòng)一下:
suspend fun testSuspendResumeThread() = suspendCoroutineUninterceptedOrReturn<String> {con ->
thread(name = "子線程") {
Thread.sleep(1000)
con.resume("over")
}
//忘記寫 return COROUTINE_SUSPENDED
}
還有一個(gè)地方需要改一下杭朱,給調(diào)用testSuspendResumeThread
的地方try catch
一下阅仔,為什么要try Catch
,那是因?yàn)槲抑肋@里會(huì)發(fā)生異常。不try catch
也行弧械,也不會(huì)因?yàn)楫惓?dǎo)致進(jìn)程終止八酒,協(xié)程體內(nèi)發(fā)生的異常會(huì)在BaseContinuationImpl
的resumeWith
函數(shù)中被協(xié)程內(nèi)部捕獲,然后交給協(xié)程的completion
的resumeWith
函數(shù)梦谜,在本例中由于我們采用了協(xié)程基礎(chǔ)API創(chuàng)建的協(xié)程丘跌,在createCoroutine
的時(shí)候傳了一個(gè)簡單的匿名內(nèi)部類作為協(xié)程的completion
,在其resumeWith
中沒有對(duì)收到的異常做任何處理,
注意:如果我們采用協(xié)程框架提供的api創(chuàng)建協(xié)程時(shí),協(xié)程的completion
收到異常后唁桩,如果創(chuàng)建協(xié)程時(shí)指定了異常處理器就交給指定的異常處理器處理闭树,如果沒有指定就交給默認(rèn)異常處理器處理,默認(rèn)異常處理器處理就是拋出異常荒澡,進(jìn)程終止报辱。
val suspendFun : suspend () -> Unit = {
log("協(xié)程開始執(zhí)行")
try { //增加了try cacht
val result = testSuspendResumeThread()
log("協(xié)程恢復(fù)后的結(jié)果: $result")
}catch (e : Exception){
log(e) //打印異常信息
}
}
輸出結(jié)果:
00:46:57:354[ main ] 協(xié)程開始執(zhí)行
//類型轉(zhuǎn)換異常,Thread不能轉(zhuǎn)換成Strin類型单山。
00:46:57:397[ main ] 捕獲異常:java.lang.ClassCastException: class kotlin.concurrent.ThreadsKt$thread$thread$1 cannot be cast to class java.lang.String (kotlin.concurrent.ThreadsKt$thread$thread$1 is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
00:46:57:397[ main ] 協(xié)程執(zhí)行完畢
00:46:58:401[ 子線程 ] 協(xié)程恢復(fù)后的結(jié)果: over //協(xié)程執(zhí)行完后又打印了 ’over‘碍现,
Exception in thread "子線程" java.lang.NullPointerException
at kotlin.coroutines.jvm.internal.ContinuationImpl.releaseIntercepted(ContinuationImpl.kt:118)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:39)
at com.m.k.coroutine.example.TextExceptionKt$testSuspendResumeThread$2$thread$1.invoke(TextException.kt:44)
at com.m.k.coroutine.example.TextExceptionKt$testSuspendResumeThread$2$thread$1.invoke(TextException.kt:42)
at kotlin.concurrent.ThreadsKt$thread$thread$1.run(Thread.kt:30)
從輸出的結(jié)果看出以下幾個(gè)問題:
問題1:try catch
處 捕獲到一個(gè)異常(ClassCastException
),協(xié)程內(nèi)部還拋出了一個(gè)異常(NullPointerException
)米奸。
問題2: "協(xié)程執(zhí)行完畢" 先于 "協(xié)程恢復(fù)后的結(jié)果: over" 被輸出昼接。
就因?yàn)橥浄祷?COROUTINE_SUSPENDED
導(dǎo)致了一些列問題,如果我告訴你悴晰,給我們創(chuàng)建協(xié)程指定一個(gè)調(diào)度器后拋出異常又不一樣了慢睡。
問題3:會(huì)由原來的NullPointerException
變成了ClassCastException
.比如:
//第二步創(chuàng)建一個(gè)協(xié)程
val continuation = suspendFun.createCoroutine(object :Continuation<Unit>{
override val context: CoroutineContext
//原來get返回的是一個(gè)EmptyCoroutineContext,現(xiàn)在給他指定一個(gè)默認(rèn)調(diào)度器铡溪,讓協(xié)程運(yùn)行在線程池里面
get() = Dispatchers.Default
override fun resumeWith(result: Result<Unit>) {
log("協(xié)程執(zhí)行完畢")
}
})
其他代碼不變的情況下漂辐,看看輸出結(jié)果:
00:52:28:676[ DefaultDispatcher-worker-1 ] 協(xié)程開始執(zhí)行
//trc catch捕獲到的異常沒變,Thread不能轉(zhuǎn)換成Strin類型棕硫。
00:52:28:731[ DefaultDispatcher-worker-1 ] 捕獲異常:java.lang.ClassCastException: class kotlin.concurrent.ThreadsKt$thread$thread$1 cannot be cast to class java.lang.String (kotlin.concurrent.ThreadsKt$thread$thread$1 is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
00:52:28:732[ DefaultDispatcher-worker-1 ] 協(xié)程執(zhí)行完畢 //同樣優(yōu)先于 “協(xié)程恢復(fù)后的結(jié)果: over”
//協(xié)程內(nèi)部拋出的異常由原來的NullPointerException變成了ClassCastException
//這兒的這個(gè)ClassCastException和上面的那個(gè)ClassCastException不是同一個(gè)髓涯。
Exception in thread "子線程" java.lang.ClassCastException: class kotlin.coroutines.jvm.internal.CompletedContinuation cannot be cast to class kotlinx.coroutines.internal.DispatchedContinuation (kotlin.coroutines.jvm.internal.CompletedContinuation and kotlinx.coroutines.internal.DispatchedContinuation are in unnamed module of loader 'app')
at kotlinx.coroutines.CoroutineDispatcher.releaseInterceptedContinuation(CoroutineDispatcher.kt:147)
at kotlin.coroutines.jvm.internal.ContinuationImpl.releaseIntercepted(ContinuationImpl.kt:118)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:39)
at com.m.k.coroutine.example.TextExceptionKt$testSuspendResumeThread$2$thread$1.invoke(TextException.kt:44)
at com.m.k.coroutine.example.TextExceptionKt$testSuspendResumeThread$2$thread$1.invoke(TextException.kt:42)
at kotlin.concurrent.ThreadsKt$thread$thread$1.run(Thread.kt:30)
00:52:29:731[ 子線程 ] 協(xié)程恢復(fù)后的結(jié)果: over
問題四:打印兩次結(jié)果。
我們接著在看看另一種情況哈扮,既調(diào)用resum又直接返回了結(jié)果:
suspend fun testSuspendResumeThread() = suspendCoroutineUninterceptedOrReturn<String> {con ->
val thread = thread(name = "子線程") {
Thread.sleep(1000)
con.resume("over")
}
//把我們要的結(jié)果`over`直接返回了
return@suspendCoroutineUninterceptedOrReturn "over"
}
看看輸出結(jié)果:
01:01:16:180[ DefaultDispatcher-worker-1 ] 協(xié)程開始執(zhí)行
01:01:16:203[ DefaultDispatcher-worker-1 ] 協(xié)程恢復(fù)后的結(jié)果: over //第一次打印
01:01:16:204[ DefaultDispatcher-worker-1 ] 協(xié)程執(zhí)行完畢
01:01:17:205[ 子線程 ] 協(xié)程恢復(fù)后的結(jié)果: over //第二次打印
//協(xié)程內(nèi)部拋出的異常和上面新增調(diào)度器后拋出的異常是同一個(gè)異常纬纪。
Exception in thread "子線程" java.lang.ClassCastException: class kotlin.coroutines.jvm.internal.CompletedContinuation cannot be cast to class kotlinx.coroutines.internal.DispatchedContinuation (kotlin.coroutines.jvm.internal.CompletedContinuation and kotlinx.coroutines.internal.DispatchedContinuation are in unnamed module of loader 'app')
at kotlinx.coroutines.CoroutineDispatcher.releaseInterceptedContinuation(CoroutineDispatcher.kt:147)
at kotlin.coroutines.jvm.internal.ContinuationImpl.releaseIntercepted(ContinuationImpl.kt:118)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:39)
at com.m.k.coroutine.example.TextExceptionKt$testSuspendResumeThread$2$thread$1.invoke(TextException.kt:44)
at com.m.k.coroutine.example.TextExceptionKt$testSuspendResumeThread$2$thread$1.invoke(TextException.kt:42)
at kotlin.concurrent.ThreadsKt$thread$thread$1.run(Thread.kt:30)
有沒有發(fā)現(xiàn),打印了兩次'over'滑肉。少了一個(gè)異常育八,在我們try里面沒有發(fā)生類型轉(zhuǎn)換異常了,如果你不知道協(xié)程內(nèi)部運(yùn)行原理赦邻,肯定會(huì)一頭霧水,這都什么東西实檀?怎么會(huì)這樣惶洲?
不要著急按声,接下里我們就一個(gè)一個(gè)的把問題都解決了,聽我慢慢到來恬吕,不管是忘記寫COROUTINE_SUSPENDED
還是既調(diào)用resume
又直接return
結(jié)果签则,他們出問題的原因是一樣的。在lambda
表達(dá)式中不明確return
的時(shí)候铐料,默認(rèn)會(huì)把最后一行的代碼的結(jié)果作為返回值渐裂。我們回頭看看上面的忘記寫COROUTINE_SUSPENDED
的時(shí)候的代碼:
suspend fun testSuspendResumeThread() = suspendCoroutineUninterceptedOrReturn<String> {con ->
thread(name = "子線程") {
Thread.sleep(1000)
con.resume("over")
}
//忘記寫 return COROUTINE_SUSPENDED
}
這個(gè)lambda最后一個(gè)代碼的結(jié)果是啥?沒錯(cuò)钠惩,就是Thread
對(duì)象柒凉。thrad
函數(shù)開啟一個(gè)線程后會(huì)返回Thred
對(duì)象。所以以上代碼等效于如下:
suspend fun testSuspendResumeThread() = suspendCoroutineUninterceptedOrReturn<String> {con ->
val thread = thread(name = "子線程") {
Thread.sleep(1000)
con.resume("over")
}
return@suspendCoroutineUninterceptedOrReturn thread
}
我們要求在不掛起的情況下真實(shí)數(shù)據(jù)的返回值類型通過suspendCoroutineUninterceptedOrReturn
函數(shù)的泛型參數(shù)指定了是String
,你要么返回一個(gè)String
要么就返回一個(gè)掛起標(biāo)識(shí)COROUTINE_SUSPENDED
篓跛,你現(xiàn)在給我來個(gè)Thread
類型膝捞,在編譯后的協(xié)程體代碼里面優(yōu)先判斷返回值是不是掛起標(biāo)識(shí),如果是愧沟,協(xié)程掛起蔬咬,如果不是會(huì)把結(jié)果強(qiáng)轉(zhuǎn)成泛型參數(shù)指定類型,此處為String
,所以協(xié)try catch
捕獲到了一個(gè)Thread
不能轉(zhuǎn)換成Strin
類型的異常沐寺。是不是合情合理林艘。
ok第一個(gè)異常問題搞清楚原因了。繼續(xù)找第二個(gè)異常的問題的原因混坞,在說原因之前狐援,你得先知道一個(gè)結(jié)論,一個(gè)協(xié)程在什么時(shí)候被認(rèn)為執(zhí)行完成拔第,一個(gè)協(xié)程被認(rèn)為執(zhí)行完成的條件是協(xié)程體最后一行代碼被執(zhí)行了并且里面的所有子協(xié)程都執(zhí)行完成咕村。有了這個(gè)結(jié)論后我們繼續(xù)看我們的代碼執(zhí)行流程,以下面代碼為基礎(chǔ):
fun main() {
//第一步創(chuàng)建一個(gè)suspend 函數(shù)
val suspendFun : suspend () -> Unit = {
log("協(xié)程開始執(zhí)行")
try {
val result = testSuspendResumeThread()
log("協(xié)程恢復(fù)后的結(jié)果: $result")
}catch (e : Exception){
log("捕獲異常:$e")
}
}
//第二步創(chuàng)建一個(gè)協(xié)程蚊俺,用一個(gè)匿名內(nèi)部類作為協(xié)程的comletion懈涛。也就是協(xié)程執(zhí)行完后的回調(diào)
val continuation = suspendFun.createCoroutine(object :Continuation<Unit>{
override val context: CoroutineContext
get() = EmptyCoroutineContext //沒有指定調(diào)度器
//由于上面的異常已經(jīng)被try catch了,這里就收不到了
override fun resumeWith(result: Result<Unit>) {
log("協(xié)程執(zhí)行完畢")
}
})
//第三步開啟一個(gè)協(xié)程
continuation.resume(Unit)
Thread.sleep(100000)
}
suspend fun testSuspendResumeThread() = suspendCoroutineUninterceptedOrReturn<String> {con ->
val thread = thread(name = "子線程") {
Thread.sleep(1000)
con.resume("over")
}
return@suspendCoroutineUninterceptedOrReturn thread
}
在我們創(chuàng)建的協(xié)程里面泳猬,調(diào)用testSuspendResumeThread
批钠,開啟一個(gè)子線程后,里面立馬返回了Thread
對(duì)象得封,所以協(xié)程不會(huì)被掛起埋心,繼續(xù)執(zhí)行,try cache
就捕獲到了類型轉(zhuǎn)換異常忙上,打印出異常信息后拷呆,協(xié)程體里面就沒有其他代碼了,并且也沒有任何其他子協(xié)程,因此我們創(chuàng)建的協(xié)程就執(zhí)行完成了茬斧,我們的協(xié)程體里面的代碼是被BaseContinuationImpl
中的resumeWith
函數(shù)里面的invokeSuspend
調(diào)用的腰懂,協(xié)程執(zhí)行完成后,resumeWith
還會(huì)執(zhí)行其他剩余操作:
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!!
val outcome: Result<Any?> =
try {
//在這里調(diào)用了編譯器生成的類的invokeSuspend项秉,可以理解為調(diào)用創(chuàng)建的
//協(xié)程體里面的代碼绣溜。由于我們?cè)趨f(xié)程題里面try cache了異常,因此
//異常不會(huì)被這里的try cache捕獲到娄蔼。協(xié)程體里面的代碼全部執(zhí)行完成
//outcome為Unit,接著就會(huì)走下面的releaseIntercepted()
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
//如果我們不在協(xié)程體里面try catch異常怖喻,異常會(huì)被這里捕獲。存入到Result中
//這個(gè)Result被賦值給outcome,最終會(huì)被下面的completion.resumeWith(outcome)調(diào)用岁诉。
//既然發(fā)了異常就不能藏著锚沸,必須得讓外界知道。
Result.failure(exception)
}
//協(xié)程體里面代碼執(zhí)行完成后唉侄,繼續(xù)執(zhí)行releaseIntercepted
releaseIntercepted()
if (completion is BaseContinuationImpl) {
current = completion
param = outcome
} else {
//completion就是我們調(diào)用createCoroutine時(shí)傳進(jìn)去的那個(gè)匿名對(duì)象咒吐。
//這一行的代碼的代用打印了”協(xié)程執(zhí)行完畢“,
// 協(xié)程沒有返回值outcome里面就是Unit,如果協(xié)程體內(nèi)發(fā)生未被捕獲的異常,outcome
//里面就會(huì)包含被上面try catch的異常属划。
completion.resumeWith(outcome)
return
}
}
}
}
xxxxx
}
進(jìn)入到協(xié)程的releaseIntercepted里面:
protected override fun releaseIntercepted() {
//由于沒有指定調(diào)度器恬叹,因此intercepted =this。 this就是我們創(chuàng)建的那個(gè)原始協(xié)程同眯,BaseContinuationImpl對(duì)象
val intercepted = intercepted
//因此if條件不滿足绽昼,不會(huì)走進(jìn)去
if (intercepted != null && intercepted !== this) {
context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
}
//把CompletedContinuation賦值給intercepted,記住這里是關(guān)鍵
this.intercepted = CompletedContinuation
}
//這個(gè)函數(shù)在創(chuàng)建協(xié)程的時(shí)候會(huì)調(diào)用。因此有了intercepted = this
public fun intercepted(): Continuation<Any?> =
//如果沒有指定調(diào)度器须蜗,返回this,并把this賦值給intercepted
intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
到這里硅确,一切正常,由于testSuspendResumeThread
返回值不是COROUTINE_SUSPENDED
而是一個(gè)Thread
對(duì)象明肮,協(xié)程沒有被掛起菱农,因?yàn)榉祷仡愋偷腻e(cuò)誤倒是捕獲了一個(gè)異常后協(xié)程快速的執(zhí)行完成了,一秒鐘后柿估,在testSuspendResumeThread
開啟的子線程調(diào)用了協(xié)程的resume
函數(shù):
public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
resume
為Continuation
的擴(kuò)展函數(shù)循未,直接調(diào)用了協(xié)程的resumeWith
,因此也就調(diào)用到了BaseContinuationImpl
的resumeWith
函數(shù)秫舌,并且把 結(jié)果"over" 傳到了 BaseContinuationImpl
的resumeWith函數(shù)里面的妖,繼續(xù)看resumeWith
里面做了什么:
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!!
val outcome: Result<Any?> =
try {
//第一步:
//param里面包含了結(jié)果"over",再次調(diào)用了我們創(chuàng)建的協(xié)程體里面的代碼
//所以才有了協(xié)程執(zhí)行完后才又打印了'over'的輸出足陨,這也就解釋了上面提到的兩個(gè)問題中的
//第二個(gè)問題:"協(xié)程執(zhí)行完畢" 先于 "協(xié)程恢復(fù)后的結(jié)果: over" 被輸出嫂粟。
//這一次不會(huì)拋出類型轉(zhuǎn)換異常了,協(xié)程體里面代碼正常執(zhí)行完畢墨缘,outcome 為Unit.
//為什么返回值為Unit星虹,因?yàn)槲覀儎?chuàng)建的就是一個(gè)沒有返回值的的協(xié)程零抬。
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
//第二步:繼續(xù)調(diào)用releaseIntercepted
releaseIntercepted()
if (completion is BaseContinuationImpl) {
current = completion
param = outcome
} else {
completion.resumeWith(outcome)
return
}
}
}
}
xxxxx
}
又走到了協(xié)程的releaseIntercepted
函數(shù)里面:
protected override fun releaseIntercepted() {
//在前一次releaseIntercepted調(diào)用的時(shí)候intercepted被賦值為了CompletedContinuation
val intercepted = intercepted
//if條件滿足。this為我們?cè)紖f(xié)程BaseContinuationImpl對(duì)象搁凸。
if (intercepted != null && intercepted !== this) {
//我們創(chuàng)建寫的時(shí)候沒有指定調(diào)度器媚值,因此 context[ContinuationInterceptor] 為null.
//這就是協(xié)程內(nèi)部拋出NullPointerException的原因。
context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
}
this.intercepted = CompletedContinuation
}
到目前我只护糖,我們提出的前兩個(gè)問題:
1. try catch處 捕獲到一個(gè)異常(`ClassCastException`),協(xié)程內(nèi)部還拋出了一個(gè)異常(`NullPointerException`)嚼松。
2. ` 協(xié)程執(zhí)行完畢` 先于 `協(xié)程恢復(fù)后的結(jié)果: over` 被輸出嫡良。
都找到愿原因了。別著急献酗,還沒完呢寝受。前面不是說了,給創(chuàng)建的協(xié)程指定了一個(gè)調(diào)度器后罕偎,協(xié)程內(nèi)部拋出的異常又不是NullPointerException
很澄,而是ClassCastException
。這個(gè)問題的原因也是出在第二次調(diào)用releaseIntercepted
這流程里面颜及。其他地方執(zhí)行流程都不變甩苛,我們重新回顧依稀兩次調(diào)用releaseIntercepted
。
第一次:
protected override fun releaseIntercepted() {
//由于指定了調(diào)度器俏站,因此intercepted為DispatchedContination
val intercepted = intercepted
//if條件滿足讯蒲,會(huì)走進(jìn)去
if (intercepted != null && intercepted !== this) {
//拿到調(diào)度器CoroutineDispatcher,由于指定了調(diào)度器肄扎,因此不會(huì)為空。
context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
}
//把CompletedContinuation賦值給intercepted
this.intercepted = CompletedContinuation
}
代碼進(jìn)入到了CoroutineDispatche
的releaseInterceptedContinuation
里面:
public final override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
//第一次continuation為DispatchedContination,轉(zhuǎn)換沒有問題象浑,
val dispatched = continuation as DispatchedContinuation<*>
dispatched.release()
}
第一次順利執(zhí)行完成窖铡,第二次執(zhí)行releaseIntercepted
的時(shí)候:
protected override fun releaseIntercepted() {
//第一次執(zhí)行后 intercepted = CompletedContinuation
val intercepted = intercepted
//if條件滿足,會(huì)走進(jìn)去
if (intercepted != null && intercepted !== this) {
//拿到調(diào)度器CoroutineDispatcher,此時(shí)intercepted為CompletedContinuation
context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
}
//把CompletedContinuation賦值給intercepted
this.intercepted = CompletedContinuation
}
再次進(jìn)入CoroutineDispatche
的releaseInterceptedContinuation
里面:
public final override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
//第二次continuation為CompletedContinuation衡载,出現(xiàn)類型轉(zhuǎn)換異常搔耕,
val dispatched = continuation as DispatchedContinuation<*>
dispatched.release()
}
回頭看看前面的輸出結(jié)果中是不是輸出的CompletedContinuation
不是DispatchedContinuation
的一個(gè)類型轉(zhuǎn)換異常。這下問題3也找到原因了月劈。
經(jīng)過前面3個(gè)問題的分析度迂,問題4兩次打印的問題是不是就呼之欲出了,在testSuspendResumeThread
中直接返回結(jié)果"over"協(xié)程不會(huì)被掛起有了第一次打印猜揪。一秒后惭墓,子線程調(diào)用協(xié)程的resuem
再次把結(jié)果"over"傳到了協(xié)程體里面就有了第二次打印。
在kotlin源碼的的注釋中也寫到而姐,不建議在suspendCoroutineUninterceptedOrReturn
里面直接調(diào)用協(xié)程的resume
腊凶,這里所說的直接調(diào)用是指的同步調(diào)用,也就是說在同一個(gè)線程里面調(diào)用,同一個(gè)線程棧里面調(diào)用钧萍,比如:
suspend fun testSuspendResumeThread() = suspendCoroutineUninterceptedOrReturn<String> {con ->
con.resume("over")
}
這也相對(duì)于:
suspend fun testSuspendResumeThread() = suspendCoroutineUninterceptedOrReturn<String> {con ->
con.resume("over")
return@suspendCoroutineUninterceptedOrReturn Unit
}
至于這樣寫了會(huì)出現(xiàn)什么問題褐缠?想必經(jīng)過前面的學(xué)習(xí),你應(yīng)該知道了风瘦。第一:Unit不能轉(zhuǎn)成String的類型異常队魏,第二,沒有指定調(diào)度器會(huì)協(xié)程內(nèi)部會(huì)拋出空指針異常万搔,指定了調(diào)度器會(huì)拋出類型轉(zhuǎn)換異常胡桨。
總結(jié):
suspendCoroutineUninterceptedOrReturn
存在的問題如下:
- 調(diào)用
suspendCoroutineUninterceptedOrReturn
掛起后協(xié)程不會(huì)被恢復(fù)到原來的線程環(huán)境里面執(zhí)行剩余代碼。- 使用不當(dāng)會(huì)造成各種異常瞬雹。
經(jīng)過這么一分析昧谊,suspendCoroutineUninterceptedOrReturn
存在這么多問題,那為什么還要發(fā)明他呢酗捌?存在即合理呢诬,suspendCoroutine
和suspendCancellableCoroutine
中用到了他,并且還把suspendCoroutineUninterceptedOrReturn
存在的問題完美解決了胖缤。
suspendCoroutine
有了前面suspendCoroutineUninterceptedOrReturn
的經(jīng)驗(yàn)尚镰,從命名上就可以知道suspendCoroutine
他要做的事情是掛起協(xié)程,不支持直接返回草姻,從源頭上杜絕你使用不當(dāng)造成suspendCoroutineUninterceptedOrReturn
中retun
錯(cuò)存在的問題钓猬。那你肯定會(huì)想,如果我的需求是有可能掛起也有可能不需要掛起直接返回怎么辦呢撩独?難道koltin協(xié)程的開發(fā)者就這么傻嗎敞曹?想不到這么簡單需求的問題嗎?顯然是不可能的综膀,看似不支持直接返回澳迫,必須掛起,實(shí)際上是可以直接返回的剧劝。使用suspendCoroutine
時(shí)橄登,不管是掛起還是不需要掛起,想要把結(jié)果返回讥此,都要通過調(diào)用continuation.resume
這種方式拢锹,并且suspendCoroutine
還完美絕了掛起恢復(fù)后的線程問題。它是怎么做到的呢萄喳?我們先看源碼:
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
return suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
/**
* 第一步:拿到協(xié)程卒稳,c 這個(gè)c為原始協(xié)程。
* 第二步:通過c.intercepted()拿到原始協(xié)程的調(diào)度器協(xié)程DispatchedContinuation他巨,
* DispatchedContinuation也是一個(gè)Continuation,它里面包含了原始協(xié)程和一個(gè)調(diào)度器
* 第三步:創(chuàng)建一個(gè)SafeContinuation充坑,把DispatchedContinuation放進(jìn)去减江。
* 第四步:block為我們的lambda表達(dá)式,執(zhí)行block的事后使用SafeContinuation代替了原始協(xié)程捻爷。
*/
val safe = SafeContinuation(c.intercepted())
//block中返回任何東西都對(duì)suspendCoroutine最終的返回值沒有任何影響
//這里就杜絕了 suspendCoroutineUninterceptedOrReturn返回不當(dāng)?shù)膯栴}辈灼。
//但是suspendCoroutineUninterceptedOrReturn總歸需要有返回值的,那么怎么辦呢也榄?
//kotlin開發(fā)者們幫你做了巡莹。通過 safe.getOrThrow()把值返回出去。
block(safe)
/**
*
*
* suspendCoroutine的返回值有兩種情況:
* 第一:在safe.getOrThrow()這行代碼執(zhí)行之前如果在block代碼中調(diào)用了
* SafeContinuation的resume手蝎,那么safe.getOrThrow()的結(jié)果就是resume時(shí)傳進(jìn)去的值榕莺。
* 這樣suspendCoroutine就直接把結(jié)果返回出去了,調(diào)用suspendCoroutine的函數(shù)
* 就不會(huì)被掛起了棵介。
* 第二:在執(zhí)行safe.getOrThrow()這行代碼的時(shí)候,SafeContinuation的resume還沒被調(diào)用吧史,那么
* safe.getOrThrow()的結(jié)果就是COROUTINE_SUSPENDED邮辽。調(diào)用suspendCoroutine的函數(shù)就
* 會(huì)被掛起。
*
*
* 是不是很巧妙的就解決了返回值不當(dāng)問題贸营。既能滿足掛起返回OROUTINE_SUSPENDED又能滿足不需要
* 掛起時(shí)返回需要的數(shù)據(jù)吨述。
*
* 同時(shí)通過c.intercepted()拿到了原始協(xié)程的調(diào)度器協(xié)程,讓掛起的協(xié)程恢復(fù)在原來的線程環(huán)境里面钞脂。
*/
safe.getOrThrow()
}
}
所以當(dāng)我們?cè)谑褂?code>suspendCoroutine函數(shù)后揣云,收到的Continuation
并不是我們?cè)紖f(xié)程,而是經(jīng)過層層包裝的協(xié)程冰啃,他們之間的關(guān)系如下:
原來我們想要在suspend
函數(shù)中使用Continuation邓夕,可以通過suspendCoroutineUninterceptedOrReturn
,現(xiàn)在kotlin又提供了一個(gè)函數(shù)叫suspendCoroutine
阎毅,通過它我們也可以拿到Continuation焚刚,我們又多了一種在suspned
函數(shù)中獲取Continuation
的方式:
在suspendCoroutine
中kotlin為我們把原始協(xié)程進(jìn)行了層層包裝,最終我么拿到的Continuatin
就變成了SafeContinuation
.當(dāng)我們?cè)诖a中調(diào)用resume時(shí)也就變成了調(diào)用SafeContinuation
的resume
扇调。這樣做目的是什么呢矿咕?這樣做用兩個(gè)重要的目的:
第一:安全(Safe):這兒的安全是指resume只能調(diào)用一次。不能多次調(diào)用resume
,多次調(diào)用會(huì)拋出一個(gè) IllegalStateException("Already resumed")
第二:線程切換:使用suspendCoroutineUninterceptedOrReturn
時(shí)拿到的Continuation
為原始協(xié)程狼钮,調(diào)用它的resume
被恢復(fù)后的協(xié)程所運(yùn)行的線程不受協(xié)程調(diào)度器控制了碳柱,而是由調(diào)用resume
函數(shù)所在的線程決定。因此為了讓協(xié)程被恢復(fù)在原來的線程里面熬芜。為了解決這個(gè)問題莲镣,suspendCoroutine
傳給我們的協(xié)程不再是原始協(xié)程,而是SafeContinuation
猛蔽。所以在恢復(fù)一個(gè)協(xié)程時(shí)我們調(diào)用SafeContinuation
的resume
剥悟,SafeContinuation
中調(diào)用DispatchedContinuation
的resume
灵寺,在DispatchedContinuation
中用調(diào)度器去執(zhí)行原始協(xié)程的resume
調(diào)用操作。這樣就達(dá)到了讓原始協(xié)程被恢復(fù)在其原來的線程環(huán)境里面区岗。
SafeContinuation
中的安全除了上面提到的略板,還有一個(gè)安全作用就是解決了suspendCoroutineUninterceptedOrReturn
使用不當(dāng)?shù)母鞣N異常問題
suspendCoroutine函數(shù)的簽名:
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T
我們代碼是寫在block函數(shù)里面的,但是blocK
的函數(shù)是沒有返回值的慈缔,也就是說不管你在block
代碼塊return任何東西都沒有任何意義叮称。這樣就杜絕了像在使用suspendCoroutineUninterceptedOrReturn
時(shí)返回不當(dāng)?shù)母鞣N異常,我們一個(gè)簡單的例子來看看到底是怎么做到的藐鹤?
fun main() {
GlobalScope.launch {
val result = test("kotlin coroutine")
println( result)
}
Thread.sleep(100000)
}
suspend fun test(arg : String) = suspendCoroutine<String> { con ->
//此處沒有任何耗時(shí)操作瓤檐。直接調(diào)用resume。協(xié)程會(huì)掛起嗎娱节?
con.resume("Hello $arg by resume ")
}
在跟進(jìn)去看SafeContinuation
的resume
是怎么做到之前挠蛉,先看看suspendCoroutine
里面是怎么做的:
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
return suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
//創(chuàng)建好一個(gè)SafeContinuation后
val safe = SafeContinuation(c.intercepted())
//調(diào)用block函數(shù),也就是執(zhí)行上面 con.resume("Hello $arg by resume ")這一行代碼
block(safe)
safe.getOrThrow()
}
}
resume
函數(shù)Continuationde
的擴(kuò)展函數(shù):
public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
進(jìn)入到SafeContinuation
的resumeWith
函數(shù):
public actual override fun resumeWith(result: Result<T>) {
while (true) { // lock-free loop
//根據(jù)result決定肄满,這個(gè)result是個(gè)什么呢谴古?
val cur = this.result // atomic read
when {
cur === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, result.value)) return
cur === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)) {
delegate.resumeWith(result)
return
}
else -> throw IllegalStateException("Already resumed")
}
}
}
看看這個(gè)resut
是在什么時(shí)候初始化的。
internal actual class SafeContinuation<in T>
internal actual constructor(
private val delegate: Continuation<T>,
initialResult: Any?
) : Continuation<T>, CoroutineStackFrame {
//在suspendCoroutine中調(diào)用該構(gòu)造函數(shù)稠歉,這個(gè)構(gòu)造函數(shù)又調(diào)用主構(gòu)造函數(shù)掰担,
internal actual constructor(delegate: Continuation<T>) : this(delegate, UNDECIDED)
//初始值就可以知道了為 UNDECIDED
private var result: Any? = initialResult
}
所以在執(zhí)行resumeWith的時(shí)候result的值為UNDECIDED。ok 接著繼續(xù):
public actual override fun resumeWith(result: Result<T>) {
while (true) { // lock-free loop
//cur == UNDECIDED
val cur = this.result
when {
/**
* 這個(gè)RESULT.compareAndSet操作就是一個(gè)CAS操作怒炸,意思就是如果this.result原來的值
* 為UNDECIDED带饱,那么就把result.value(這個(gè)result.value就是我們?cè)谕饷嬲{(diào)用resume時(shí)傳的那個(gè)值)
* 賦值給this.result,并返回true,相當(dāng)于把resume調(diào)用時(shí)傳進(jìn)來的值保存在了this.result這個(gè)變量中阅羹。
*
* 如果this.result原來的值不為UNDECIDED就return false.
*
* 所以此處就會(huì)把外面調(diào)用resume時(shí)傳進(jìn)來的值保存在了this.result變量中勺疼,然后就return 出去了。
* 也就是說們上面con.resume("Hello $arg by resume ")這一行代碼只是把”Hello $arg by resume“
* 保存在了this.result中灯蝴,并沒有去做喚醒原始協(xié)程的操作恢口。
*/
cur === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, result.value)) return
cur === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)) {
delegate.resumeWith(result)
return
}
else -> throw IllegalStateException("Already resumed")
}
}
}
在resumeWith
中把值保存了后,就直接return
了穷躁。這個(gè)時(shí)候在回到suspendCoroutine
中耕肩,block
就執(zhí)行完了。執(zhí)行下一行代碼safe.getOrThrow():
@PublishedApi
internal actual fun getOrThrow(): Any? {
var result = this.result
//在上面resumeWith中问潭,this.result值已經(jīng)被設(shè)置為了 ”Hello $arg by resume“
if (result === UNDECIDED) {
if (RESULT.compareAndSet(this, UNDECIDED, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED
result = this.result // reread volatile var
}
return when {
result === RESUMED -> COROUTINE_SUSPENDED
result is Result.Failure -> throw result.exception
else -> result //所以最終走到這里猿诸,把”Hello $arg by resume“返回出去了
}
}
所以最終suspendCoroutine
方法中:
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
return suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
//創(chuàng)建好一個(gè)SafeContinuation后
val safe = SafeContinuation(c.intercepted())
//調(diào)用block函數(shù),也就是執(zhí)行上面 con.resume("Hello $arg by resume ")這一行代碼
block(safe)
/**
* 所以最后safe.getOrThrow()的結(jié)果為"Hello $arg by resume“狡忙,作為
* suspendCoroutineUninterceptedOrReturn的返回值返回到
* suspendCoroutine中梳虽,在suspendCoroutine中又把結(jié)果返回到調(diào)用它的test函數(shù)中。
* 最終test函數(shù)直接把這個(gè)結(jié)果返回出去了灾茁。從而實(shí)現(xiàn)了沒有掛起直接返回結(jié)果窜觉。
*/
safe.getOrThrow()
}
}
如果在test中需要掛起又是怎么一個(gè)流程呢谷炸?
suspend fun test(arg : String) = suspendCoroutine<String> { con ->
//開啟一個(gè)子線程模擬耗時(shí)操作
thread {
Thread.sleep(1000)
//耗時(shí)完成后,調(diào)用resume恢復(fù)協(xié)程
con.resume("Hello $arg by resume ")
}
}
同樣在suspendCoroutine
函數(shù)中:
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
return suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
//創(chuàng)建好一個(gè)SafeContinuation后
val safe = SafeContinuation(c.intercepted())
//調(diào)用block函數(shù)禀挫,開啟線程后旬陡,block就執(zhí)行完了。
block(safe)
//因此調(diào)用safe.getOrThrow()的時(shí)候语婴,resume還沒有調(diào)用描孟。
safe.getOrThrow()
}
}
先調(diào)用getOrThrow():
@PublishedApi
internal actual fun getOrThrow(): Any? {
var result = this.result
//這個(gè)時(shí)候this.result的值還是初始值為UNDECIDED
if (result === UNDECIDED) {
//把this.result值設(shè)置為COROUTINE_SUSPENDED,然后返回COROUTINE_SUSPENDED
if (RESULT.compareAndSet(this, UNDECIDED, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED
result = this.result
}
return when {
result === RESUMED -> COROUTINE_SUSPENDED
result is Result.Failure -> throw result.exception
else -> result
}
}
getOrThrow()
返回了COROUTINE_SUSPENDED
砰左,因此匿醒,suspendCoroutineUninterceptedOrReturn
把COROUTINE_SUSPENDED
返回給了suspendCoroutine
,把``suspendCoroutine又把
COROUTINE_SUSPENDED作為結(jié)果返回到了
test中缠导。最終
test的返回結(jié)果就是
COROUTINE_SUSPENDED廉羔,這樣
test`函數(shù)就被協(xié)程掛起了。
耗時(shí)操作執(zhí)行完成后在僻造,再調(diào)用resume
的時(shí)候:
public actual override fun resumeWith(result: Result<T>) {
while (true) {
//在調(diào)用getOrThrow時(shí) this.result值被設(shè)置為了COROUTINE_SUSPENDED
val cur = this.result
when {
cur === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, result.value)) return
//通過compareAndSet把this.result設(shè)置為RESUMED
cur === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)) {
//delegate為創(chuàng)建SafeContinuation是傳的DispatchedContinuation
//在DispatchedContinuation的resumeWith中會(huì)用協(xié)程的調(diào)度器去把原始協(xié)程恢復(fù)
//從而讓掛起的協(xié)程恢復(fù)在原來的線程環(huán)境里面蜜另。
delegate.resumeWith(result)
return
}
else -> throw IllegalStateException("Already resumed")
}
}
}
suspendCoroutine
的整個(gè)流程就差不多完事了。
suspendCancellableCoroutine
這個(gè)和suspendCoroutine
在使用行為上基本一致嫡意,只是它多了一個(gè)可支持取消的功能。
public suspend inline fun <T> suspendCancellableCoroutine(
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
cancellable.initCancellability()
block(cancellable)
cancellable.getResult()
}
從他的源碼中可以看出它里面不是使用的SafeContinuation
捣辆。而是CancellableContinuationImpl
蔬螟。我們拿到這個(gè)CancellableContinuationImpl
后可以調(diào)用他的invokeOnCancellation
注冊(cè)一個(gè)協(xié)程取消的監(jiān)聽。一遍再協(xié)程取消時(shí)取消我們的耗時(shí)任務(wù)汽畴。
此處就在過多分析旧巾。感興趣的可以自行研究。
kotlin 還有一個(gè)internal的suspendCancellableCoroutineReusable
函數(shù)忍些,這個(gè)函數(shù)和suspendCancellableCoroutine
很像鲁猩,重在一個(gè)Resuable。意思即使它不用每次都重新重建CancellableContinuationImpl對(duì)象