關(guān)鍵詞:Kotlin 協(xié)程 協(xié)程取消 任務(wù)停止
協(xié)程的任務(wù)的取消需要靠協(xié)程內(nèi)部調(diào)用的協(xié)作支持,這就類似于我們線程中斷以及對中斷狀態(tài)的響應(yīng)一樣规个。
1. 線程的中斷
我們先從大家熟悉的話題講起萧豆。線程有一個被廢棄的 stop
方法,這個方法會讓線程立即死掉谜酒,并且釋放它持有的鎖本姥,這樣會讓它正在讀寫的存儲處于一個不安全的狀態(tài),因此 stop
被廢棄了焦辅。如果我們啟動了一個線程并讓它執(zhí)行一些任務(wù)鸟召,但很快我們就后悔了,stop
還不讓用氨鹏,那該怎么辦?
val thread = thread {
...
}
thread.stop() // !!! Deprecated!!!
我們應(yīng)該想辦法讓線程內(nèi)部正在運行的任務(wù)跟我們合作把任務(wù)停掉压状,這樣線程內(nèi)部的任務(wù)停止之前還有機會清理一些資源仆抵,比如關(guān)閉流等等跟继。
val thread = thread {
try {
Thread.sleep(10000)
} catch (e: InterruptedException) {
log("Interrupted, do cleaning stuff.")
}
}
thread.interrupt()
像 sleep
這樣的方法調(diào)用,文檔明確指出它支持 InterruptedException
镣丑,因此當線程被標記為中斷狀態(tài)時舔糖,它就會拋出 InterruptedException
,那么我們自然就可以捕獲異常并做資源清理了莺匠。
所以請注意所謂的協(xié)作式的任務(wù)終止金吗,協(xié)程的取消也就是 cancel
機制的思路也是如此。
2. 協(xié)程類似的例子
我們來看一個協(xié)程取消的例子:
fun main() = runBlocking {
val job1 = launch { // ①
log(1)
delay(1000) // ②
log(2)
}
delay(100)
log(3)
job1.cancel() // ③
log(4)
}
這次我們用了一個不一樣的寫法趣竣,我們沒有用 suspend main摇庙,而是直接用 runBlocking
啟動協(xié)程,這個方法在 Native 上也存在遥缕,都是基于當前線程啟動一個類似于 Android 的 Looper 的死循環(huán)卫袒,或者叫消息隊列,可以不斷的發(fā)送消息給它進行處理单匣。runBlocking
會啟動一個 Job
夕凝,因此這里也存在默認的作用域,不過這對于我們今天的討論暫時沒有太大影響户秤。
這段代碼 ① 處啟動了一個子協(xié)程码秉,它內(nèi)部先輸出 1,接著開始 delay
鸡号, delay
與線程的 sleep
不同转砖,它不會阻塞線程,你可以認為它實際上就是觸發(fā)了一個延時任務(wù)膜蠢,告訴協(xié)程調(diào)度系統(tǒng) 1000ms 之后再來執(zhí)行后面的這段代碼(也就是 log(2))堪藐;而在這期間,我們在 ③ 處對剛才啟動的協(xié)程觸發(fā)了取消挑围,因此在 ② 處的 delay
還沒有回調(diào)的時候協(xié)程就被取消了礁竞,因為 delay
可以響應(yīng)取消,因此 delay
后面的代碼就不會再次調(diào)度了杉辙,不調(diào)度的原因也很簡單模捂,② 處的 delay
會拋一個 CancellationException
:
...
log(1)
try {
delay(1000)
} catch (e: Exception) {
log("cancelled. $e")
}
log(2)
...
那么輸出的結(jié)果就不一樣了:
06:54:56:361 [main] 1
06:54:56:408 [main] 3
06:54:56:411 [main] 4
06:54:56:413 [main] cancelled. kotlinx.coroutines.JobCancellationException: Job was cancelled; job=StandaloneCoroutine{Cancelling}@e73f9ac
06:54:56:413 [main] 2
大家看,這與線程的中斷邏輯是不是非常的類似呢蜘矢?
3. 完善我們之前的例子
之前我們有個例子狂男,上一篇文章已經(jīng)加入了異常處理邏輯,那么這次我們給它加上取消邏輯品腹。之前是這樣:
suspend fun getUserCoroutine() = suspendCoroutine<User> { continuation ->
getUser(object : Callback<User> {
override fun onSuccess(value: User) {
continuation.resume(value)
}
override fun onError(t: Throwable) {
continuation.resumeWithException(t)
}
})
}
加取消邏輯岖食,那需要我們的 getUser
回調(diào)版本支持取消,我們看下我們的 getUser
是怎么實現(xiàn)的:
fun getUser(callback: Callback<User>) {
val call = OkHttpClient().newCall(
Request.Builder()
.get().url("https://api.github.com/users/bennyhuo")
.build())
call.enqueue(object : okhttp3.Callback {
override fun onFailure(call: Call, e: IOException) {
callback.onError(e)
}
override fun onResponse(call: Call, response: Response) {
response.body()?.let {
try {
callback.onSuccess(User.from(it.string()))
} catch (e: Exception) {
callback.onError(e) // 這里可能是解析異常
}
}?: callback.onError(NullPointerException("ResponseBody is null."))
}
})
}
我們發(fā)了個網(wǎng)絡(luò)請求給 Github舞吭,讓它把一個叫 bennyhuo
的用戶信息返回來泡垃,我們知道 OkHttp 的這個 Call
是支持 cancel
的析珊, 取消后,網(wǎng)絡(luò)請求過程中如果讀取到這個取消的狀態(tài)蔑穴,就會把請求給停止掉忠寻。既然這樣,我們干脆直接改造 getUser
好了存和,這樣還能省掉我們自己的 Callback
回調(diào)過程:
suspend fun getUserCoroutine() = suspendCancellableCoroutine<User> { continuation ->
val call = OkHttpClient().newCall(...)
continuation.invokeOnCancellation { // ①
log("invokeOnCancellation: cancel the request.")
call.cancel()
}
call.enqueue(object : okhttp3.Callback {
override fun onFailure(call: Call, e: IOException) {
log("onFailure: $e")
continuation.resumeWithException(e)
}
override fun onResponse(call: Call, response: Response) {
log("onResponse: ${response.code()}")
response.body()?.let {
try {
continuation.resume(User.from(it.string()))
} catch (e: Exception) {
continuation.resumeWithException(e)
}
} ?: continuation.resumeWithException(NullPointerException("ResponseBody is null."))
}
})
}
我們這里用到了 suspendCancellableCoroutine
奕剃,而不是之前的 suspendCoroutine
,這就是為了讓我們的掛起函數(shù)支持協(xié)程的取消捐腿。該方法將獲取到的 Continuation
包裝成了一個 CancellableContinuation
纵朋,通過調(diào)用它的 invokeOnCancellation
方法可以設(shè)置一個取消事件的回調(diào),一旦這個回調(diào)被調(diào)用叙量,那么意味著 getUserCoroutine
調(diào)用所在的協(xié)程被取消了倡蝙,這時候我們也要相應(yīng)的做出取消的響應(yīng),也就是把 OkHttp 發(fā)出去的請求給取消掉绞佩。
那么我們在調(diào)用它的時候寺鸥,如果遇到了取消,會怎么樣呢品山?
val job1 = launch { //①
log(1)
val user = getUserCoroutine()
log(user)
log(2)
}
delay(10)
log(3)
job1.cancel()
log(4)
注意我們啟動 ① 之后僅僅延遲了 10ms 就取消了它胆建,網(wǎng)絡(luò)請求的速度一般來講還不會這么快,因此取消的時候大概率 getUserCoroutine
被掛起了肘交,因此結(jié)果大概率是:
07:31:30:751 [main] 1
07:31:31:120 [main] 3
07:31:31:124 [main] invokeOnCancellation: cancel the request.
07:31:31:129 [main] 4
07:31:31:131 [OkHttp https://api.github.com/...] onFailure: java.io.IOException: Canceled
我們發(fā)現(xiàn)笆载,取消的回調(diào)被調(diào)用了,OkHttp 在收到我們的取消指令之后涯呻,也確實停止了網(wǎng)絡(luò)請求凉驻,并且回調(diào)給我們一個 IO 異常,這時候我們的協(xié)程已經(jīng)被取消复罐,在處于取消狀態(tài)的協(xié)程上調(diào)用 Continuation.resume
涝登、 Continuation.resumeWithException
或者 Continuation.resumeWith
都會被忽略,因此 OkHttp 回調(diào)中我們收到 IO 異常后調(diào)用的 continuation.resumeWithException(e)
不會有任何副作用效诅。
4. 再談 Retrofit 的協(xié)程擴展
4.1 Jake Wharton 的 Adapter 存在的問題
我在破解 Kotlin 協(xié)程 - 入門篇 提到了 Jake Wharton 大神為 Retrofit 寫的 協(xié)程 Adapter胀滚,
implementation 'com.jakewharton.retrofit:retrofit2-kotlin-coroutines-adapter:0.9.2'
它確實可以完成網(wǎng)絡(luò)請求,不過有細心的小伙伴發(fā)現(xiàn)了它的問題:它怎么取消呢乱投?我們把使用它的代碼貼出來:
interface GitHubServiceApi {
@GET("users/{login}")
fun getUserCoroutine(@Path("login") login: String): Deferred<User>
}
定義好接口咽笼,創(chuàng)建 Retrofit 實例的時候傳入對應(yīng)的 Adapter:
val gitHubServiceApi by lazy {
val retrofit = retrofit2.Retrofit.Builder()
.baseUrl("https://api.github.com")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(CoroutineCallAdapterFactory()) // 這里添加 Adapter
.build()
retrofit.create(GitHubServiceApi::class.java)
}
用的時候就這樣:
val deferred = gitHubServiceApi.getUserCoroutine("bennyhuo")
try {
showUser(deferred.await())
} catch (e: Exception) {
showError(e)
}
如果要取消,我們可以直接調(diào)用 deferred.cancel()
戚炫,例如:
log("1")
val deferred = gitHubServiceApi.getUserCoroutine("bennyhuo")
log("2")
withContext(Dispatchers.IO){
deferred.cancel()
}
try {
showUser(deferred.await())
} catch (e: Exception) {
showError(e)
}
運行結(jié)果如下:
12:59:54:185 [DefaultDispatcher-worker-1] 1
12:59:54:587 [DefaultDispatcher-worker-1] 2
kotlinx.coroutines.JobCancellationException: Job was cancelled; job=CompletableDeferredImpl{Cancelled}@36699211
這種情況下剑刑,其實網(wǎng)絡(luò)請求確實是被取消的,這一點我們可以看下源碼的處理:
...
override fun adapt(call: Call<T>): Deferred<T> {
val deferred = CompletableDeferred<T>()
deferred.invokeOnCompletion { // ①
if (deferred.isCancelled) {
call.cancel()
}
}
call.enqueue(object : Callback<T> {
...
}
}
...
注意 ① 處双肤,invokeOnCompletion
在協(xié)程進入完成狀態(tài)時觸發(fā)施掏,包括異常和正常完成层宫,那么在這時候如果發(fā)現(xiàn)它的狀態(tài)是已經(jīng)取消的,那么結(jié)果就直接調(diào)用 Call
的取消即可其监。
這看上去確實很正常啊~ 不過 @阿永 在公眾號的評論里面提到了一個 Case,仔細一看還真是有問題限匣。我們給出示例來復(fù)現(xiàn)這個 Case:
val job = GlobalScope.launch {
log("1")
val deferred = gitHubServiceApi.getUserCoroutine("bennyhuo")
log("2")
deferred.invokeOnCompletion {
log("invokeOnCompletion, $it, ${deferred.isCancelled}")
}
try {
showUser(deferred.await())
} catch (e: Exception) {
showError(e)
}
log(3)
}
delay(10)
job.cancelAndJoin()
我們啟動一個協(xié)程抖苦,在其中執(zhí)行網(wǎng)絡(luò)請求,那么正常來說米死,這時候 getUserCoroutine
返回的 Deferred
可以當做一個子協(xié)程锌历,它應(yīng)當遵循默認的作用域規(guī)則,在父作用域取消時被取消掉峦筒,但現(xiàn)實卻并不是這樣:
13:06:54:332 [DefaultDispatcher-worker-1] 1
13:06:54:829 [DefaultDispatcher-worker-1] 2
kotlinx.coroutines.JobCancellationException: Job was cancelled; job=StandaloneCoroutine{Cancelling}@19aea38c
13:06:54:846 [DefaultDispatcher-worker-1] 3
13:06:56:937 [OkHttp https://api.github.com/...] invokeOnCompletion, null, false
我們看到在調(diào)用 deferred.await()
的時候拋了個取消異常究西,這主要是因為 await()
所在的協(xié)程已經(jīng)被我們用 cancelAndJoin()
取消,但從隨后 invokeOnCompletion
的回調(diào)結(jié)果來看物喷, getUserCoroutine
返回的 Deferred
并沒有被取消卤材,再仔細一看,時間上這個回調(diào)比前面的操作晚了 2s峦失,那必然是網(wǎng)絡(luò)請求返回之后才回調(diào)的扇丛。
所以問題究竟在哪里?在 CoroutineCallAdapterFactory
的實現(xiàn)中尉辑,為了實現(xiàn)異步轉(zhuǎn)換帆精,手動創(chuàng)建了一個 CompletableDeferred
:
override fun adapt(call: Call<T>): Deferred<T> {
val deferred = CompletableDeferred<T>() // ①
...
}
這個 CompletableDeferred
本身就是一個 Job
的實現(xiàn),它的構(gòu)造可接受一個 Job
實例作為它的父協(xié)程隧魄,那么問題來了卓练,這里并沒有告訴它父協(xié)程究竟是誰,因此也就談不上作用域的事兒了购啄,這好像我們用 GlobalScope.launch
啟動了一個協(xié)程一樣襟企。如果大家在 Android 當中使用 MainScope
,那么同樣因為前面說到的這個原因闸溃,導(dǎo)致 CompletableDeferred
沒有辦法被取消整吆。
@阿永 在公眾號評論中提到這個問題,并提到了一個比較好的解決方案辉川,下面我們?yōu)榇蠹以敿毥榻B表蝙。感謝 @阿永。
說到這里我們再簡單回顧下乓旗,作用域主要有 GlobalScope
府蛇、coroutineScope
、supervisorScope
屿愚,對于取消汇跨,除了 supervisorScope
比較特別是單向取消务荆,即父協(xié)程取消后子協(xié)程都取消,Android 中 MainScope
就是一個調(diào)度到 UI 線程的 supervisorScope
穷遂;coroutineScope
的邏輯則是父子相互取消的邏輯函匕;而 GlobalScope
會啟動一個全新的作用域,與它外部隔離蚪黑,內(nèi)部遵循默認的協(xié)程作用域規(guī)則盅惜。
那么有沒有辦法解決這個問題呢?
直接解決還是比較困難的忌穿,因為 CompletableDeferred
構(gòu)造所處的調(diào)用環(huán)境不是 suspend 函數(shù)抒寂,因而也沒有辦法拿到(很可能根本就沒有!)父協(xié)程掠剑。
4.2 如何正確的將回調(diào)轉(zhuǎn)換為協(xié)程
前面我們提到既然 adapt
方法不是 suspend 方法屈芜,那么我們是不是應(yīng)該在其他位置創(chuàng)建協(xié)程呢?
其實我們前面在講 getUserCoroutine
的時候就不斷為大家展示了如何將一個回調(diào)轉(zhuǎn)換為協(xié)程調(diào)用的方法:
suspend fun getUserCoroutine() = suspendCancellableCoroutine<User> { continuation ->
...
}
suspendCancellableCoroutine
跟最初我們提到的 suspendCoroutine
一樣朴译,都是要獲取當前協(xié)程的 Continuation
實例井佑,這實際上就相當于要繼承當前協(xié)程的上下文,因此我們只需要在真正需要切換協(xié)程的時候再去做這個轉(zhuǎn)換即可:
public suspend fun <T : Any> Call<T>.await(): T {
return suspendCancellableCoroutine { continuation ->
enqueue(object : Callback<T> {
override fun onResponse(call: Call<T>?, response: Response<T?>) {
continuation.resumeWith(runCatching { // ①
if (response.isSuccessful) {
response.body()
?: throw NullPointerException("Response body is null: $response")
} else {
throw HttpException(response)
}
})
}
override fun onFailure(call: Call<T>, t: Throwable) {
if (continuation.isCancelled) return // ②
continuation.resumeWithException(t)
}
})
continuation.invokeOnCancellation {
try {
cancel()
} catch (ex: Throwable) { // ③
//Ignore cancel exception
}
}
}
}
大家看著這段代碼會不會很眼熟动分?這與我們 getUserCoroutine
的寫法幾乎如出一轍毅糟,不過有幾處細節(jié)值得關(guān)注,我用數(shù)字標注了他們的位置:
- ① 處
runCatching
可以將一段代碼的運行結(jié)果或者拋出的異常封裝到一個Result
類型當中澜公,Kotlin 1.3 開始新增了Continuation.resumeWith(Result)
這個方法姆另, 這個點比起我們前面的寫法更具 Kotlin 風格。 - ② 處在異常拋出時坟乾,判斷了是否已經(jīng)被取消迹辐。實際上如果網(wǎng)絡(luò)請求被取消,這個回調(diào)確實會被調(diào)到甚侣,那么由于取消的操作是協(xié)程的由
Continuation
的取消發(fā)起的明吩,因此這時候沒必要再調(diào)用continuation.resumeWithException(t)
來將異常再拋回來了。盡管我們前面其實也提到過殷费,這時候繼續(xù)調(diào)用continuation.resumeWithException(t)
也沒有任何邏輯上的副作用印荔,但性能上多少還是會有一些開銷。 - ③ 處详羡,盡管
Call.cancel
的調(diào)用比較安全仍律,但網(wǎng)絡(luò)環(huán)境和狀態(tài)難免情況復(fù)雜,因此對異常進行捕獲會讓這段代碼更加健壯实柠。如果cancel
拋異常而沒有捕獲的話水泉,那么等同于協(xié)程體內(nèi)部拋出異常,具體如何傳播看所在作用域的相關(guān)定義了。
需要指出的是草则,這段代碼片段源自 gildor/kotlin-coroutines-retrofit 钢拧,大家也可以直接添加依賴進行使用:
compile 'ru.gildor.coroutines:kotlin-coroutines-retrofit:1.1.0'
這個框架代碼量很少,但經(jīng)過各路 Kotlin 協(xié)程專家的錘煉炕横,邏輯手法很細膩源内,值得大家學(xué)習(xí)。
5. 小結(jié)
這篇文章我們從線程中斷的概念切入份殿,類比學(xué)習(xí)協(xié)程的取消姿锭,實際上大家就會發(fā)現(xiàn)這二者從邏輯上和場景上有多么的相似。接著我們將之前我們一直提到的回調(diào)轉(zhuǎn)協(xié)程的例子進一步升級伯铣,支持取消,這樣大家就可以輕易的將回調(diào)轉(zhuǎn)變?yōu)閰f(xié)程的掛起調(diào)用了轮纫。最后我們還分析了一下 Retrofit 的協(xié)程擴展的一些問題和解決方法腔寡,這個例子也進一步可以引發(fā)我們對協(xié)程作用域以及如何將現(xiàn)有程序協(xié)程化的思考。
再稍微提一句掌唾,協(xié)程不是一個簡單的東西放前,畢竟它的原理涉及到對操作系統(tǒng)調(diào)度、程序運行機制這樣程序界畢竟原始的話題糯彬,但你說如果我對前面提到的這些都不是很熟悉或者根本沒有接觸過凭语,是不是就要跟協(xié)程拜拜了呢,其實也不是撩扒,只不過如果你對這些都不熟悉似扔,那么可能需要多加練習(xí)培養(yǎng)出感覺,而不必一開始就關(guān)注原理和細節(jié)搓谆,依樣畫葫蘆一樣可以用的很好炒辉,就像大家不知道 RxJava 原理一樣可以用的很好一樣,協(xié)程也可以做到這一點的泉手。
當然黔寇,作為一個有追求的程序員,我們不止要會用斩萌,還要用得好缝裤,無論如何我們都需要知道來龍去脈,這其中涉及到的基礎(chǔ)知識的欠缺也是需要盡快補充的颊郎,不能偷懶哈 :)
歡迎關(guān)注 Kotlin 中文社區(qū)憋飞!
中文官網(wǎng):https://www.kotlincn.net/
中文官方博客:https://www.kotliner.cn/
公眾號:Kotlin
知乎專欄:Kotlin
CSDN:Kotlin中文社區(qū)
開發(fā)者頭條:Kotlin中文社區(qū)