Kotlin - 協(xié)程 簡(jiǎn)介

[TOC]

Mind Map

簡(jiǎn)介

Coroutines are computer program components that generalize subroutines for non-preemptive multitasking, by allowing execution to be suspended and resumed. Coroutines are well-suited for implementing familiar program components such as cooperative tasks, exceptions, event loops, iterators, infinite lists and pipes.
協(xié)程(英語(yǔ):coroutine)是計(jì)算機(jī)程序的一類(lèi)組件颠悬,推廣了協(xié)作式多任務(wù)的子程序焊傅,允許執(zhí)行被掛起與被恢復(fù)台囱。相對(duì)子例程而言泳唠,協(xié)程更為一般和靈活仑氛,但在實(shí)踐中使用沒(méi)有子例程那樣廣泛抑堡。協(xié)程更適合于用來(lái)實(shí)現(xiàn)彼此熟悉的程序組件站玄,如協(xié)作式多任務(wù)、異常處理包券、事件循環(huán)纫谅、迭代器、無(wú)限列表和管道溅固。

以上是維基百科對(duì)協(xié)程的定義付秕。

簡(jiǎn)單來(lái)講,協(xié)程是一種輕量級(jí)線(xiàn)程侍郭。

相對(duì)于線(xiàn)程切換是由操作系統(tǒng)進(jìn)行調(diào)度的询吴,程序員無(wú)法進(jìn)行控制掠河。
而協(xié)程的調(diào)度是由程序員在代碼層面上進(jìn)行控制的,程序員可以通過(guò)控制suspend函數(shù)的掛起和恢復(fù)猛计,從而控制程序運(yùn)行流程唠摹,這在代碼的展示上,相當(dāng)于用同步的代碼書(shū)寫(xiě)異步程序有滑,代碼邏輯非常簡(jiǎn)潔易懂。

理論上嵌削,由于協(xié)程不涉及到操作系統(tǒng)調(diào)度毛好,因此只是在用戶(hù)態(tài)上進(jìn)行操作,而線(xiàn)程需要經(jīng)歷用戶(hù)態(tài)與內(nèi)核態(tài)之間的切換苛秕,所以協(xié)程性能更佳肌访。
但是不同的語(yǔ)言在實(shí)現(xiàn)上可能存在差異,比如本文所要介紹的 Kotlin 下的協(xié)程艇劫,其在 JVM 平臺(tái)上的內(nèi)部實(shí)現(xiàn)也是基于線(xiàn)程吼驶,因此如果其進(jìn)行了協(xié)程調(diào)度,存在一定的可能是進(jìn)行了線(xiàn)程切換店煞。

協(xié)程經(jīng)常拿來(lái)與線(xiàn)程進(jìn)行對(duì)比蟹演,它們彼此很相似,但是也很不同顷蟀。
可以簡(jiǎn)單理解如下:

  • 一個(gè)進(jìn)程可以包含多個(gè)線(xiàn)程
  • 一個(gè)線(xiàn)程可以包含多個(gè)協(xié)程

由于一個(gè)線(xiàn)程可以包含多個(gè)協(xié)程酒请,而協(xié)程具備掛起和恢復(fù)功能,也因此讓我們具備了在一個(gè)線(xiàn)程上執(zhí)行多個(gè)異步任務(wù)的能力鸣个。

:還是如上文所言羞反,Kotlin 協(xié)程的底層實(shí)現(xiàn)存在線(xiàn)程切換,因此異步任務(wù)可能執(zhí)行在另一條線(xiàn)程上囤萤。

名詞釋義

在具體介紹協(xié)程之前昼窗,需要先了解一下以下幾個(gè)概念:

  • 同步:執(zhí)行一個(gè)任務(wù)時(shí),調(diào)用者調(diào)用后即可獲取返回結(jié)果

  • 異步:執(zhí)行一個(gè)任務(wù)時(shí)涛舍,調(diào)用者調(diào)用后直接返回澄惊,不關(guān)心結(jié)果,而是等到任務(wù)結(jié)束時(shí)富雅,通過(guò)回調(diào)等方式通知調(diào)用者結(jié)果

同步與異步是針對(duì)返回結(jié)果來(lái)說(shuō)的缤削,
對(duì)于同步調(diào)用,由調(diào)用者主動(dòng)獲取結(jié)果吹榴。
對(duì)于異步調(diào)用亭敢,調(diào)用者是通過(guò)回調(diào)等方式被動(dòng)獲取結(jié)果的。

簡(jiǎn)單理解图筹,比如對(duì)于一個(gè)函數(shù)調(diào)用帅刀,
同步調(diào)用就是調(diào)用函數(shù)后让腹,直接就可以獲取結(jié)果。
異步調(diào)用就是調(diào)用函數(shù)后扣溺,不關(guān)心結(jié)果骇窍,等函數(shù)體內(nèi)的任務(wù)結(jié)束時(shí),通過(guò)回調(diào)等方式通知調(diào)用者結(jié)果锥余。

  • 阻塞:執(zhí)行一個(gè)任務(wù)時(shí)腹纳,當(dāng)前調(diào)用線(xiàn)程調(diào)用后立即被掛起,無(wú)法執(zhí)行后續(xù)代碼

  • 非阻塞:執(zhí)行一個(gè)任務(wù)時(shí)驱犹,當(dāng)前調(diào)用線(xiàn)程調(diào)用后立即返回嘲恍,可繼續(xù)執(zhí)行后續(xù)代碼

阻塞和非阻塞是針對(duì)當(dāng)前線(xiàn)程是否具備 CPU 執(zhí)行權(quán)來(lái)說(shuō)的,
對(duì)于阻塞調(diào)用雄驹,調(diào)用不立即返回佃牛,當(dāng)前線(xiàn)程被掛起,失去 CPU 執(zhí)行權(quán)医舆,直至調(diào)用任務(wù)完成俘侠,返回結(jié)果。
對(duì)于非阻塞調(diào)用蔬将,調(diào)用立即返回爷速,當(dāng)前線(xiàn)程仍然擁有 CPU 執(zhí)行權(quán),可繼續(xù)執(zhí)行后續(xù)代碼霞怀。

:可以講上述描述中的 任務(wù) 理解為 函數(shù)遍希,更加直觀。

協(xié)程涉及到的一些概念

  • 掛起函數(shù):即suspend函數(shù)里烦,如下代碼所示即為一個(gè)掛起函數(shù):

    suspend fun delay(timeMillis: Long): Unit{
        ...
    }
    

    Kotlin 中凿蒜,suspend關(guān)鍵字具備如下幾層含義:

    • Kotlin 中規(guī)定:掛起函數(shù)只能在協(xié)程或者其他suspend函數(shù)中使用,其實(shí)就相當(dāng)于掛起函數(shù)只能直接或間接地在協(xié)程中進(jìn)行調(diào)用胁黑。
    • suspend關(guān)鍵字只是起一個(gè)標(biāo)識(shí)作用废封,用以表明被suspend修飾的函數(shù)(也即掛起函數(shù))內(nèi)部存在耗時(shí)操作,因此必須放置在協(xié)程中進(jìn)行調(diào)用丧蘸。
    • suspend關(guān)鍵字標(biāo)識(shí)一個(gè)掛起點(diǎn)漂洋,掛起點(diǎn)具備掛起和恢復(fù)執(zhí)行作用。當(dāng)協(xié)程調(diào)用suspend函數(shù)時(shí)力喷,會(huì)掛起當(dāng)前協(xié)程刽漂,開(kāi)啟一個(gè)異步任務(wù),當(dāng)異步任務(wù)完成后弟孟,就會(huì)在當(dāng)前掛起點(diǎn)恢復(fù)協(xié)程贝咙,繼續(xù)該協(xié)程后續(xù)任務(wù)。
  • 協(xié)程上下文(Coroutine Context):協(xié)程上下文是一系列規(guī)則和配置的集合拂募,它決定了協(xié)程的運(yùn)行方式庭猩。
    kotlinx.coroutines 提供的協(xié)程構(gòu)造器(Coroutine Builder)窟她,比如launchasync等蔼水,都會(huì)接收一個(gè)協(xié)程上下文對(duì)象震糖,主要用于設(shè)置協(xié)程異步任務(wù)的執(zhí)行線(xiàn)程策略。如下代碼所示:

    suspend fun main() {
        GlobalScope.launch(context = Dispatchers.Default) {
            println("run inside: ${Thread.currentThread().name}")
        }.join()
    }
    
  • 結(jié)構(gòu)化并發(fā)(Structured concurrency):如果一個(gè)協(xié)程內(nèi)部創(chuàng)建了一個(gè)或多個(gè)子協(xié)程趴腋,只要所有子協(xié)程在父協(xié)程作用域結(jié)束前執(zhí)行完成吊说,就認(rèn)為當(dāng)前協(xié)程具備結(jié)構(gòu)化并發(fā)。如下圖所示:

    Structured concurrency

    更具體來(lái)說(shuō)优炬,當(dāng)父協(xié)程結(jié)束時(shí)颁井,如果其子協(xié)程仍在運(yùn)行谢揪,則父協(xié)程會(huì)阻塞自己(即當(dāng)前協(xié)程)巩掺,讓子協(xié)程運(yùn)行完成后才退出,也就是無(wú)需顯示使用join來(lái)確保子協(xié)程運(yùn)行結(jié)束。

    舉個(gè)例子糊余,如果我們使用GlobalScope.launch創(chuàng)建一個(gè)全局協(xié)程,然后為其添加幾個(gè)子協(xié)程单寂,那么我們必須手動(dòng)對(duì)所有子協(xié)程進(jìn)行管理贬芥,如下所示:

    suspend fun main() {
        GlobalScope.launch {
            // 創(chuàng)建一個(gè)子協(xié)程
            val job1 = launch {
                delay(100)
                println("Hello World")
            }
            // 再創(chuàng)建一個(gè)子協(xié)程
            val job2 = launch {
                delay(200)
                println("Hello World again!!")
            }
            job1.join()
            job2.join()
        }
        // 延遲程序退出
        delay(500)
    }
    

    可以看到,上述代碼中宣决,為了實(shí)現(xiàn)結(jié)構(gòu)化并發(fā)蘸劈,我們必須手動(dòng)維護(hù)所有子協(xié)程協(xié)程狀態(tài),這樣才能確保在父協(xié)程退出前尊沸,子協(xié)程能完成任務(wù)威沫。

    :事實(shí)上,上述例子并未實(shí)現(xiàn)結(jié)構(gòu)化并發(fā)洼专,因?yàn)楦竻f(xié)程GlobalScope作用域其實(shí)已經(jīng)退出了棒掠,只是通過(guò)delay讓子協(xié)程運(yùn)行,造成子協(xié)程都能運(yùn)行完成效果屁商,但事實(shí)上子協(xié)程生命周期超過(guò)了父協(xié)程烟很。

    如果需要自己手動(dòng)維護(hù)結(jié)構(gòu)化并發(fā),操作會(huì)相對(duì)繁瑣蜡镶,因此雾袱,Kotlin 已經(jīng)為我們提供了具備結(jié)構(gòu)化并發(fā)功能協(xié)程構(gòu)造器,比如coroutineScope官还,如下所示:

    suspend fun main() {
        GlobalScope.launch {
            // 子協(xié)程:具備結(jié)構(gòu)化并發(fā)
            coroutineScope {
                // 創(chuàng)建一個(gè)子協(xié)程
                val job1 = launch {
                    delay(100)
                    println("Hello World")
                }
                // 再創(chuàng)建一個(gè)子協(xié)程
                val job2 = launch {
                    delay(200)
                    println("Hello World again!!")
                }
            }
        }
        // 延遲程序退出
        delay(500)
    }
    

    coroutineScope創(chuàng)建的協(xié)程具備結(jié)構(gòu)化并發(fā)功能芹橡,因此只有等到其子協(xié)程完成時(shí),父協(xié)程才可以退出望伦,這樣我們就無(wú)需維護(hù)各個(gè)子協(xié)程的狀態(tài)了僻族。

    結(jié)構(gòu)化并發(fā)強(qiáng)調(diào)的是子協(xié)程的生命周期小于或等于父協(xié)程生命周期粘驰,因此,結(jié)構(gòu)化并發(fā)的一個(gè)最大的好處就是可以很方便地管理協(xié)程述么,比如只要父協(xié)程被取消蝌数,其內(nèi)所有協(xié)程也都會(huì)被自動(dòng)取消。

基本使用

下面用代碼來(lái)創(chuàng)建一個(gè)最簡(jiǎn)單的協(xié)程度秘,了解一下基本的使用顶伞。具體步驟如下:

  • 導(dǎo)入依賴(lài):Kotlin 協(xié)程的官方框架為kotlinx.coroutines,該框架未集成在 Kotlin 的標(biāo)準(zhǔn)庫(kù)中剑梳,因此需要我們手動(dòng)進(jìn)行導(dǎo)入:

    // project: build.gradle
    buildscript {
        ext.kotlin_coroutines = "1.3.8"
    }
    
    // app: build.gradle
    dependencies {
        implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlin_coroutines"
        // Android 平臺(tái)需要額外導(dǎo)入 kotlinx-coroutines-android
        implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:$kotlin_coroutines"
    }
    
  • 創(chuàng)建協(xié)程:以下代碼使用runBlocking函數(shù)創(chuàng)建了一個(gè)協(xié)程:

    fun main() {
        runBlocking {
            println("coroutineScope starts: ${Thread.currentThread().name}")
            launch {
                delay(1000)
                println("son coroutine: ${Thread.currentThread().name}")
            }
            println("coroutineScope ends: ${Thread.currentThread().name}")
        }
    }
    

    運(yùn)行上述代碼唆貌,可得到如下輸出:

    可以看到,我們?cè)谥骶€(xiàn)程中通過(guò)協(xié)程以非阻塞方式完成了異步任務(wù)垢乙。

協(xié)程作用域

每個(gè)協(xié)程都擁有自己的作用域范圍锨咙,kotlinx.coroutines 提供了多種協(xié)程作用域,以下介紹常見(jiàn)的幾種:

  • CoroutineScope:創(chuàng)建一個(gè)協(xié)程通用作用域追逮。其文檔如下圖所示:

    CoroutineScope

    最佳的協(xié)程作用域?qū)嵗齽?chuàng)建是通過(guò)CoroutineScopeMainScope內(nèi)置的工廠(chǎng)方法(比如CoroutineScope()酪刀、MainScope())進(jìn)行創(chuàng)建,
    額外的上下文元素可以通過(guò)+操作符進(jìn)行添加(因?yàn)?kotlinx.coroutines 覆寫(xiě)了plus操作符)

  • GlobalScope:全局協(xié)程作用域钮孵。具體內(nèi)容如下圖所示:

    GlobalScope

    從文檔可以看到骂倘,GlobalScope實(shí)現(xiàn)了CoroutineScope,并自行實(shí)現(xiàn)了全局功能巴席。

    一個(gè)全局協(xié)程的生命周期與應(yīng)用的生命周期一致历涝。

  • MainScope:創(chuàng)建一個(gè) UI 組件的協(xié)程作用域。其文檔如下圖所示:

    MainScope

    MainScope自帶SupervisorJobDispatchers.Main上下文元素漾唉。
    可以通過(guò)+操作符為其作用域范圍增添新元素:val scope = MainScope() + CoroutineName("MyActivity")

    :如果在 Android 平臺(tái)上荧库,可以通過(guò)MainScope()方法創(chuàng)建一個(gè)主線(xiàn)程作用域(Dispatchers.Main)協(xié)程,然后通過(guò)為其創(chuàng)建子協(xié)程來(lái)執(zhí)行異步任務(wù)赵刑,然后在資源釋放出直接關(guān)閉協(xié)程即可分衫,這樣就將界面與異步任務(wù)連接了起來(lái),會(huì)很方便進(jìn)行管理料睛。如下所示:

    class Activity {
        private val mainScope = MainScope()
    
        fun onCreate(){
            // 執(zhí)行異步任務(wù)
            this.mainScope.launch(Dispatchers.IO) {
                // ...
            }
    
        }
        
        fun destroy() {
            mainScope.cancel()
        }
    }
    

    事實(shí)上丐箩,其實(shí) Android 已經(jīng)對(duì)具備生命周期的組件實(shí)體都內(nèi)置了相應(yīng)的協(xié)程作用域支持,具體內(nèi)容可查看:lifecyclescope

創(chuàng)建協(xié)程

kotlinx.coroutines 提供了多種協(xié)程構(gòu)造器讓我們創(chuàng)建協(xié)程恤煞。以下列舉一些常見(jiàn)的協(xié)程創(chuàng)建方式:

  • launchCoroutineScope的擴(kuò)展方法屎勘,用于創(chuàng)建一個(gè)新協(xié)程。其文檔如下圖所示:

    CoroutineScope.lauch

    launch函數(shù)啟動(dòng)的新協(xié)程不會(huì)阻塞當(dāng)前線(xiàn)程居扒,同時(shí)會(huì)返回一個(gè)Job對(duì)象概漱,可通過(guò)該對(duì)象取消當(dāng)前協(xié)程執(zhí)行。

    launch函數(shù)啟動(dòng)的協(xié)程會(huì)繼承父協(xié)程所在的協(xié)程上下文context喜喂,如果其上下文context不包含任意協(xié)程調(diào)度器dispatcher或者ContinuationInterceptor瓤摧,則默認(rèn)使用Dispatchers.default竿裂,即異步任務(wù)會(huì)在運(yùn)行在一條 CPU 密集型線(xiàn)程上。

    如果launch函數(shù)創(chuàng)建的協(xié)程內(nèi)部拋出未捕獲異常照弥,那么默認(rèn)情況下會(huì)導(dǎo)致其父協(xié)程取消(除非顯式設(shè)置了CoroutineExceptionHandler)腻异。

    默認(rèn)情況下,launch函數(shù)會(huì)立即執(zhí)行協(xié)程这揣,可以通過(guò)參數(shù)start來(lái)控制其啟動(dòng)選項(xiàng)悔常,比如設(shè)置為CoroutineStart.LAZY,則launch函數(shù)創(chuàng)建的協(xié)程不會(huì)立即啟動(dòng)(懶加載)给赞,可通過(guò)返回對(duì)象的Job.start()顯示進(jìn)行啟動(dòng)机打,或者當(dāng)調(diào)用Job.join()方法時(shí),則會(huì)隱式啟動(dòng)該協(xié)程片迅。
    如下代碼所示:

    suspend fun main() {
        val job = GlobalScope.launch(context = Dispatchers.Default) {
    
            println("${Thread.currentThread().name} --> 啟動(dòng)協(xié)程")
    
            launch {                                                                   // 1
                println("${Thread.currentThread().name} --> 開(kāi)啟一個(gè)子協(xié)程...1")
            }
    
            println("${Thread.currentThread().name} --> 創(chuàng)建懶加載子協(xié)程")
    
            val job1 = launch(start = CoroutineStart.LAZY) {                           // 2
                println("${Thread.currentThread().name} --> 開(kāi)啟一個(gè)懶加載子協(xié)程...2")
            }
    
            val job2 = launch(start = CoroutineStart.LAZY) {                           // 3
                println("${Thread.currentThread().name} --> 開(kāi)啟一個(gè)懶加載子協(xié)程...3")
            }
    
            println("${Thread.currentThread().name} --> 顯式啟動(dòng)懶加載子協(xié)程")
            job1.start()                                                               // 4
    
            println("${Thread.currentThread().name} --> 隱式啟動(dòng)懶加載子協(xié)程")
            job2.join()                                                                // 5
        }
        job.join()
    }
    

    上述代碼的運(yùn)行結(jié)果如下所示:


    運(yùn)行結(jié)果分析:首先我們通過(guò)GlobalScope.launch函數(shù)創(chuàng)建了一個(gè)全局協(xié)程残邀,并在該協(xié)程內(nèi)部創(chuàng)建了 3 個(gè)子協(xié)程,分別位于上述代碼的1柑蛇、23位置芥挣,其中,1位置創(chuàng)建了一個(gè)立即執(zhí)行的子協(xié)程唯蝶,且該協(xié)程會(huì)繼承父協(xié)程上下文九秀,因此launch(..)函數(shù)內(nèi)異步任務(wù)會(huì)立即自動(dòng)分發(fā)到Dispatchers.Default線(xiàn)程上進(jìn)行執(zhí)行遗嗽。
    而由于launch函數(shù)不會(huì)阻塞當(dāng)前線(xiàn)程粘我,我們創(chuàng)建的父協(xié)程位于線(xiàn)程池中的DefaultDispatcher-worker-1,因此父協(xié)程會(huì)立即執(zhí)行后續(xù)代碼痹换。
    然后父協(xié)程繼續(xù)往下執(zhí)行征字,經(jīng)歷23過(guò)程,分別創(chuàng)建了兩個(gè)懶加載子協(xié)程娇豫。
    然后再代碼4處匙姜,顯示啟動(dòng)了子協(xié)程job1,則此時(shí)會(huì)立即將異步任務(wù)分發(fā)給Dispatchers.Default線(xiàn)程進(jìn)行執(zhí)行冯痢。
    Job.start()是一個(gè)非阻塞調(diào)用氮昧,因此父協(xié)程代碼會(huì)繼續(xù)執(zhí)行到5處,通過(guò)Job.join()方法隱式啟動(dòng)子協(xié)程job2浦楣,同樣此時(shí)會(huì)將協(xié)程job2的異步任務(wù)分發(fā)到Dispatchers.Default線(xiàn)程進(jìn)行執(zhí)行袖肥。
    以上就得到上圖的最終輸出。

  • asyncCoroutineScope的擴(kuò)展方法振劳,用于創(chuàng)建一個(gè)新協(xié)程椎组。其文檔如下圖所示:

    CoroutineScope.async

    async函數(shù)的大部分性質(zhì)與launch函數(shù)一致,比如:

    • 為了確保結(jié)構(gòu)化并發(fā)历恐,asynclaunch一樣寸癌,當(dāng)其運(yùn)行失敗時(shí)专筷,會(huì)導(dǎo)致父協(xié)程被取消。
    • 其同樣繼承父協(xié)程上下文context蒸苇,如果其上下文context不包含任意協(xié)程調(diào)度器dispatcher或者ContinuationInterceptor磷蛹,則默認(rèn)使用Dispatchers.default
    • 默認(rèn)創(chuàng)建就自動(dòng)立即執(zhí)行溪烤,也可通過(guò)start參數(shù)控制其行為弦聂,比如CoroutineStart.LAZY來(lái)延時(shí)執(zhí)行。
      此時(shí)氛什,可以通過(guò)Deferred.start()函數(shù)來(lái)顯示啟動(dòng)協(xié)程莺葫,
      也可以通過(guò)Deferred.join()Deferred.await()Deferred.awaitAll等函數(shù)隱式啟動(dòng)協(xié)程枪眉。

    另外捺檬,asynclaunch主要的區(qū)別有以下兩個(gè):

    • 異常處理launch內(nèi)部可能會(huì)拋出異常,因此需用戶(hù)手動(dòng)進(jìn)行處理贸铜。而async默認(rèn)不會(huì)拋出異常堡纬,因?yàn)樗鼤?huì)認(rèn)為你最終必須調(diào)用await來(lái)獲取結(jié)果,因此蒿秦,async內(nèi)部如果出現(xiàn)異常烤镐,用戶(hù)需要在await處進(jìn)行捕獲。
      更多異常處理內(nèi)容棍鳖,請(qǐng)查看后文:異常處理

    • 返回值:一個(gè)最重大的區(qū)別就是炮叶,launch函數(shù)返回的是一個(gè)Job對(duì)象,該對(duì)象主要用于取消協(xié)程運(yùn)行渡处,而async函數(shù)返回的是一個(gè)Deferred對(duì)象镜悉,該對(duì)象不僅可以用于取消協(xié)程,更重要的是可以獲取異步任務(wù)返回結(jié)果医瘫!這是相當(dāng)有用的一個(gè)特性侣肄,如下代碼所示:

    :實(shí)際上,Deferred繼承于Jobpublic interface Deferred<out T> : Job

    suspend fun main() = coroutineScope {
    
        val deferred = async {
            // 模擬耗時(shí)異步任務(wù)
            delay(1000)
            // 返回 100
            100
        }
    
        val result = deferred.await()
        println("異步任務(wù)結(jié)果為:${result}")
    }
    

    上述代碼執(zhí)行結(jié)果如下圖所示:


    可以看到醇份,我們成功獲取得到異步任務(wù)返回的結(jié)果稼锅。

    需要注意的是,對(duì)于async開(kāi)啟的協(xié)程僚纷,我們使用的是Deferred.await()函數(shù)來(lái)獲取異步任務(wù)返回結(jié)果矩距。其文檔如下所示:

    await()

    await()會(huì)阻塞當(dāng)前協(xié)程(即async的父協(xié)程),直至async協(xié)程完成異步任務(wù)返回結(jié)果畔濒,
    或者當(dāng)取消協(xié)程時(shí)剩晴,則會(huì)拋出一個(gè)CancellationException異常。

    await()函數(shù)會(huì)阻塞當(dāng)前協(xié)程,但是不會(huì)阻塞當(dāng)前線(xiàn)程赞弥,因此當(dāng)當(dāng)前線(xiàn)程存在多個(gè)協(xié)程時(shí)毅整,await()會(huì)讓出執(zhí)行權(quán),當(dāng)前線(xiàn)程的其他協(xié)程就有機(jī)會(huì)得到執(zhí)行绽左。如下代碼所示:

    suspend fun main() {
    
        coroutineScope {
            launch {                                     // 1
                val deferred = async {
                    // 模擬耗時(shí)異步任務(wù)
                    delay(1000)
                    // 返回 100
                    100
                }
                val result = deferred.await()
                println("異步任務(wù)結(jié)果為:${result}")
            }
    
            launch {                                    // 2
                println("hello world")
            }
        }
    }
    

    當(dāng)1處子協(xié)程處于await()狀態(tài)時(shí)悼嫉,2處子協(xié)程就可以得到運(yùn)行,結(jié)果如下所示:

  • coroutineScope:創(chuàng)建一個(gè)通用作用域的新協(xié)程拼窥,并在該協(xié)程作用域范圍內(nèi)調(diào)用block塊戏蔑。其文檔如下圖所示:

    coroutineScope

    coroutineScope函數(shù)會(huì)繼承其父協(xié)程的上下文coroutineContext,但會(huì)覆蓋父協(xié)程上下文的Job對(duì)象鲁纠。

    coroutineScope函數(shù)主要設(shè)計(jì)用于并發(fā)分解任務(wù)总棵。但該協(xié)程內(nèi)任一子協(xié)程運(yùn)行失敗時(shí),該協(xié)程就會(huì)失敗改含,并會(huì)取消其內(nèi)所有子協(xié)程情龄。

    coroutineScope函數(shù)具備結(jié)構(gòu)化并發(fā),它會(huì)阻塞當(dāng)前協(xié)程捍壤,直至其函數(shù)體block執(zhí)行完畢和其所有子協(xié)程運(yùn)行完成時(shí)骤视,才會(huì)返回。

    舉個(gè)例子:代碼如下所示:

    suspend fun main() = coroutineScope { // 1
        coroutineScope {                  // 2
            delay(1000)
            println("World")
        }
        println("Hello")                  // 3
    }
    

    上述代碼的執(zhí)行結(jié)果如下所示:

    結(jié)果解析:上述代碼中鹃觉,如1處代碼所示专酗,我們首先通過(guò)coroutineScope函數(shù)創(chuàng)建一個(gè)協(xié)程。
    然后協(xié)程執(zhí)行盗扇,代碼會(huì)進(jìn)行到2處祷肯,此時(shí)又通過(guò)coroutineScope函數(shù)創(chuàng)建了一個(gè)子協(xié)程,又因?yàn)?code>coroutineScope函數(shù)會(huì)阻塞當(dāng)前協(xié)程粱玲,直到其內(nèi)部執(zhí)行完成躬柬,因此3處代碼暫時(shí)未能執(zhí)行拜轨,必須等到2處代碼內(nèi)部執(zhí)行完畢抽减,最終的結(jié)果就如上圖所示。

  • supervisorScope:創(chuàng)建一個(gè)新協(xié)程橄碾,并在該協(xié)程作用域范圍內(nèi)調(diào)用block塊卵沉。其文檔如下圖所示:

    supervisorScope

    supervisorScope函數(shù)會(huì)繼承其父協(xié)程的上下文coroutineContext,但會(huì)使用SupervisorJob覆蓋父協(xié)程上下文的Job對(duì)象法牲。

    對(duì)于supervisorScope函數(shù)創(chuàng)建的協(xié)程史汗,如果其內(nèi)任一子協(xié)程運(yùn)行失敗,不會(huì)導(dǎo)致該協(xié)程失敗拒垃,對(duì)其他子協(xié)程也不會(huì)造成影響停撞,因此可以自定義子協(xié)程失敗處理機(jī)制。

    如果是supervisorScope創(chuàng)建的協(xié)程自身出現(xiàn)錯(cuò)誤(比如block塊拋異常或取消該協(xié)程)戈毒,那么就導(dǎo)致該協(xié)程及其內(nèi)所有子協(xié)程失敗艰猬,但是不會(huì)取消父協(xié)程任務(wù)。

    supervisorScope具備結(jié)構(gòu)化并發(fā)埋市。

  • withContext:該函數(shù)會(huì)創(chuàng)建一個(gè)新協(xié)程冠桃,協(xié)程的異步任務(wù)會(huì)運(yùn)行在指定上下文對(duì)應(yīng)的線(xiàn)程中。其文檔如下所示:

    withContext

    withContext函數(shù)的執(zhí)行模型為:當(dāng)調(diào)用withContext函數(shù)創(chuàng)建一個(gè)協(xié)程時(shí)道宅,必須強(qiáng)制指定一個(gè)協(xié)程上下文context食听,然后withContext函數(shù)會(huì)立即在指定的上下文中執(zhí)行其block塊內(nèi)容,同時(shí)會(huì)阻塞當(dāng)前協(xié)程(即父協(xié)程)污茵,直至其block塊運(yùn)行完成(結(jié)構(gòu)化并發(fā))樱报,然后將block塊最末尾的代碼會(huì)作為異步任務(wù)的返回值。其執(zhí)行模型類(lèi)似于async(...).await()泞当。

    舉個(gè)例子:如下代碼所示:

    suspend fun main() = coroutineScope {
        println("${Thread.currentThread().name} --> 開(kāi)啟協(xié)程")
        val result = withContext(Dispatchers.Default){
            println("${Thread.currentThread().name} --> 模擬耗時(shí)任務(wù)")
            delay(1000)
            println("${Thread.currentThread().name} --> 耗時(shí)任務(wù)結(jié)束")
            10*10
        }
        println("${Thread.currentThread().name} --> 異步任務(wù)結(jié)果:${result}")
    }
    

    上述代碼的運(yùn)行結(jié)果如下圖所示:


    從上圖中可以看出肃弟,withContext函數(shù)在設(shè)置上下文時(shí)候,不僅會(huì)對(duì)其內(nèi)部的異步任務(wù)起作用零蓉,還會(huì)對(duì)coroutineScope后續(xù)協(xié)程作用域起改變上下文效果笤受。

    withContext強(qiáng)制我們必須為其指定一個(gè)協(xié)程上下文,其實(shí)最主要的目的是指定協(xié)程調(diào)度器(上下文包含了調(diào)度器)敌蜂,具體調(diào)度器取值可參考后文:協(xié)程上下文與調(diào)度器

  • withTimeout:創(chuàng)建一個(gè)超時(shí)協(xié)程箩兽,當(dāng)協(xié)程運(yùn)行時(shí)間超過(guò)給定時(shí)間后,拋出一個(gè)TimeoutCancellationException章喉。其文檔如下圖所示:

    withTimeout

    與其相似的還有一個(gè)函數(shù)withTimeoutOrNull汗贫,該函數(shù)功能與withTimeout一樣,只是在超時(shí)后秸脱,不拋出異常落包,而是直接返回null

    withTimeout/withTimeoutOrNull具備結(jié)構(gòu)化并發(fā)摊唇。

  • CoroutineScope(context)):工廠(chǎng)方法咐蝇,用于創(chuàng)建一個(gè)新的協(xié)程。其文檔如下所示:

    CoroutineScope(context)

    CoroutineScope(context)參數(shù)context如果不包含一個(gè)Job元素巷查,則會(huì)使用一個(gè)默認(rèn)的Job()元素有序。這樣子,協(xié)程取消或者任一子協(xié)程運(yùn)行失敗時(shí)岛请,在該作用域范圍內(nèi)的其他所有協(xié)程都會(huì)被取消旭寿,其行為與coroutineScope函數(shù)一致。

    CoroutineScope(context)不具備結(jié)構(gòu)化并發(fā)崇败。

  • runBlocking:創(chuàng)建一個(gè)新協(xié)程异袄,并阻塞當(dāng)前線(xiàn)程,直至協(xié)程運(yùn)行完成败砂。其文檔如下圖所示:

    runBlocking

    :由于runBlocking會(huì)阻塞當(dāng)前線(xiàn)程管嬉,因此在實(shí)際開(kāi)發(fā)中,建議不要使用runBlocking創(chuàng)建協(xié)程。
    該函數(shù)主要是設(shè)計(jì)用于在main函數(shù)或測(cè)試代碼中用于運(yùn)行帶有suspend函數(shù)的庫(kù)接口。

協(xié)程上下文與調(diào)度器(Coroutine Context and Dispatchers)

協(xié)程上下文是一系列不同元素的集合。其中主要元素是協(xié)程的Job檀蹋。

協(xié)程會(huì)一直運(yùn)行在某個(gè)協(xié)程上下文CoroutineContext中。

  • 調(diào)度器和線(xiàn)程:協(xié)程上下文包含一個(gè) 協(xié)程調(diào)度器CoroutineDispatcher云芦,該調(diào)度器決定了協(xié)程異步任務(wù)的執(zhí)行線(xiàn)程俯逾。

    Kotlin 提供了如下調(diào)度器可供我們選擇:

    • Dispatchers.Default:表示使用一種低并發(fā)的線(xiàn)程策略,該策略適合 CPU 密集型計(jì)算任務(wù)舅逸。
      所有的標(biāo)準(zhǔn)協(xié)程構(gòu)造器桌肴,比如launch,async,它們默認(rèn)的CoroutineDispatcher都為Dispatchers.Default
    • Dispatchers.IO:表示使用一種較高并發(fā)的線(xiàn)程策略琉历,適用于網(wǎng)絡(luò)請(qǐng)求坠七,文件讀寫(xiě)等多 IO 操作的場(chǎng)景。
    • Dispatchers.UnConfined:表示一個(gè)未綁定到特定線(xiàn)程的協(xié)程調(diào)度器旗笔。它會(huì)首先在調(diào)用者線(xiàn)程執(zhí)行協(xié)程彪置,當(dāng)遇到suspend函數(shù)恢復(fù)時(shí),線(xiàn)程又會(huì)切換到該suspend函數(shù)對(duì)應(yīng)的協(xié)程上下文中蝇恶。Dispatchers.UnConfined不強(qiáng)制使用任何線(xiàn)程策略拳魁,隨遇而安。開(kāi)發(fā)中建議慎重使用該策略撮弧,因?yàn)闀?huì)導(dǎo)致線(xiàn)程切換不可控潘懊。
    • Dispatchers.Main:表示運(yùn)行在 UI 主線(xiàn)程上。比如在 Android 平臺(tái)上贿衍,就表示不會(huì)開(kāi)啟子線(xiàn)程授舟,而是直接將任務(wù)運(yùn)行在主線(xiàn)程中。
      kotlinx.coroutines 中所有的協(xié)程構(gòu)造器贸辈,比如launch释树,async都可以接收一個(gè)可選參數(shù)CoroutineContext,為協(xié)程顯示指定運(yùn)行環(huán)境裙椭。

    :除了以上默認(rèn)調(diào)度器外躏哩,Kotlin 還提供相關(guān)函數(shù)可以讓我們創(chuàng)建其他類(lèi)型的調(diào)度器,比如一個(gè)很好用的基于單線(xiàn)程運(yùn)行環(huán)境的調(diào)度器:newSingleThreadContext(name)

當(dāng)launch/async等協(xié)程構(gòu)造器未顯示指定CoroutineContext時(shí)揉燃,它會(huì)繼承父協(xié)程的上下文,且異步任務(wù)會(huì)運(yùn)行在父協(xié)程上下文指定的線(xiàn)程環(huán)境中筋栋。

GlobalScope.launch默認(rèn)使用的調(diào)度器為Dispatchers.Default炊汤。

:為了更好地查看調(diào)度器調(diào)度結(jié)果,可以在執(zhí)行代碼時(shí),為 JVM 選項(xiàng)添加開(kāi)啟協(xié)程調(diào)式模式:-Dkotlinx.coroutines.debug抢腐,然后使用如下代碼就可以在運(yùn)行時(shí)打印出協(xié)程實(shí)例和其運(yùn)行所在的線(xiàn)程:

fun <T> log(msg: T) = println("[${Thread.currentThread().name}] $msg")

為了更方便查看協(xié)程姑曙,在開(kāi)啟協(xié)程調(diào)式模式后,也可以通過(guò)CoroutineName為新創(chuàng)建的協(xié)程添加名稱(chēng)迈倍,如下所示:

suspend fun main() {
    coroutineScope {
        log("set name for a coroutine")
        launch(CoroutineName("son-routine")) {
            log("this coroutine name is son-routine")
        }
    }
}

其運(yùn)行結(jié)果如下圖所示:


:協(xié)程上下文CoroutineContext覆寫(xiě)了plus操作符伤靠,因此我們可以通過(guò)+號(hào)連接多個(gè)協(xié)程上下文元素,如下所示:

launch(Dispatchers.Default + CoroutineName("test")) {
    println("I'm working in thread ${Thread.currentThread().name}")
}

取消機(jī)制

kotlinx.coroutines 提供的所有的suspend函數(shù)都是可以進(jìn)行 取消(cancellable) 的啼染。他們會(huì)檢測(cè)協(xié)程取消狀態(tài)宴合,當(dāng)檢測(cè)到取消操作時(shí),就會(huì)拋出CancellationException異常迹鹅,我們可在代碼塊中對(duì)該異常進(jìn)行捕獲卦洽。

如下所示:

suspend fun main() = coroutineScope {
    val job = launch {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L) // delay will detect cancellation for coroutine
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    job.join() // waits for job's completion
    println("main: Now I can quit.")
}

上述代碼運(yùn)行結(jié)果如下:

:取消操作能成功的前提是:協(xié)程內(nèi)部會(huì)對(duì)取消狀態(tài)進(jìn)行檢測(cè)

因此斜棚,并不是說(shuō)我們調(diào)用cancel后阀蒂,就一定能取消協(xié)程運(yùn)行,比如弟蚀,對(duì)于協(xié)程內(nèi)部進(jìn)行 CPU 密集型計(jì)算的操作蚤霞,就無(wú)法進(jìn)行取消,如下所示:

suspend fun outputTwicePerSecond(){
    var startTime = System.currentTimeMillis()
    var nextPrintTime = startTime
    var i = 0
    while (i < 5) { // computation loop, just wastes CPU
        // print a message twice a second
        if (System.currentTimeMillis() >= nextPrintTime) {
            println("job: I'm sleeping ${i++} ...")
            nextPrintTime += 500L
        }
    }
}

suspend fun main() = coroutineScope {
    val job = launch(Dispatchers.Default) {
        outputTwicePerSecond()
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
}

上述代碼運(yùn)行結(jié)果如下:

可以看到义钉,在我們調(diào)用了cancel操作后争便,子協(xié)程異步任務(wù)仍繼續(xù)執(zhí)行,直到完成断医。

其原因就是子協(xié)程異步任務(wù)中沒(méi)有對(duì)取消狀態(tài)進(jìn)行檢測(cè)滞乙。解決的方法就是要么在異步任務(wù)循環(huán)部分中調(diào)用suspend函數(shù)(yield是一個(gè)不錯(cuò)的選擇),要么就手動(dòng)對(duì)取消狀態(tài)進(jìn)行檢測(cè)鉴嗤,如下所示:

// 調(diào)用 suspend 函數(shù)
suspend fun outputTwicePerSecond(){
    var startTime = System.currentTimeMillis()
    var nextPrintTime = startTime
    var i = 0
    while (i < 5) { // computation loop, just wastes CPU
        // print a message twice a second
        if (System.currentTimeMillis() >= nextPrintTime) {
            println("job: I'm sleeping ${i++} ...")
            nextPrintTime += 500L
        }
        yield()
    }
}

// 或者對(duì) 取消狀態(tài) 進(jìn)行檢測(cè)
fun CoroutineScope.outputTwicePerSecond() {
    var startTime = System.currentTimeMillis()
    var nextPrintTime = startTime
    var i = 0
    while (this.isActive) { // computation loop, just wastes CPU
        // print a message twice a second
        if (System.currentTimeMillis() >= nextPrintTime) {
            println("job: I'm sleeping ${i++} ...")
            nextPrintTime += 500L
        }
    }
}

異常處理

前面內(nèi)容講過(guò)斩启,要關(guān)閉一個(gè)協(xié)程,使用的是cancel操作醉锅,實(shí)質(zhì)上兔簇,取消操作與異常是緊密聯(lián)系的,當(dāng)我們調(diào)用cancel時(shí)硬耍,協(xié)程內(nèi)部其實(shí)是拋出了一個(gè)CancellationException異常垄琐,從而打斷協(xié)程運(yùn)行,但是這個(gè)異常默認(rèn)會(huì)被異常處理器CoroutineExceptionHandler忽略经柴,不過(guò)我們可以通過(guò)catch塊進(jìn)行捕獲狸窘,如下所示:

fun main() = runBlocking {
    val child = launch {
        try {
            delay(Long.MAX_VALUE)
        } finally {
            log("子協(xié)程停止運(yùn)行!")
        }
    }
    // 讓子協(xié)程啟動(dòng)
    yield()
    log("取消子協(xié)程")
    child.cancel()
    child.join()
    // 釋放父協(xié)程執(zhí)行權(quán)
    delay(TimeUnit.SECONDS.toMillis(1))
    log("父協(xié)程仍在運(yùn)行")
}

上述代碼運(yùn)行結(jié)果如下:

根據(jù)上圖可以直到坯认,對(duì)于取消操作翻擒,實(shí)質(zhì)上就是拋出了一個(gè)CancellationException異常氓涣,所以可以捕獲得到,且子協(xié)程拋出的CancellationException不會(huì)阻斷父協(xié)程的執(zhí)行陋气。

以上我們已經(jīng)知道協(xié)程對(duì)于CancellationException異常的處理劳吠,那么對(duì)于協(xié)程內(nèi)部拋出的其他異常,或者是子協(xié)程發(fā)生異常巩趁,則又是怎樣個(gè)處理方法呢痒玩?如下代碼所示:

suspend fun main() {
    GlobalScope.launch {

        log("父協(xié)程啟動(dòng)")
        val child = launch {
            log("子協(xié)程拋出異常")
            throw ArithmeticException("Div 0")
        }
        // 讓子協(xié)程啟動(dòng)
        yield()
        child.join()
        // 釋放父協(xié)程執(zhí)行權(quán)
        delay(TimeUnit.SECONDS.toMillis(1))
        log("父協(xié)程仍在運(yùn)行")
    }.join()
}

這次拋出的是普通異常ArithmeticException,其運(yùn)行結(jié)果如下所示:

可以看到议慰,子協(xié)程拋出非CancellationException異常蠢古,會(huì)打斷父協(xié)程(取消父協(xié)程)運(yùn)行。具體的過(guò)程涉及的內(nèi)容如下:

  • 異常傳播(Exception propagation):協(xié)程構(gòu)造器對(duì)于異常傳播存在兩種機(jī)制:

    • 向上傳播:即與普通異常處理機(jī)制一致褒脯,當(dāng)出現(xiàn)未捕獲異常時(shí)便瑟,自動(dòng)向上傳播該異常,比如launch番川,actor
    • 自動(dòng)包裹:即協(xié)程內(nèi)部不處理該協(xié)程到涂,而是將異常包裹到返回對(duì)象上,在用戶(hù)進(jìn)行特定動(dòng)作時(shí)暴露給用戶(hù)進(jìn)行捕獲颁督,比如async践啄,produce

    當(dāng)創(chuàng)建的是根協(xié)程時(shí),向上傳播異常機(jī)制會(huì)將異常當(dāng)作未捕獲異常(uncaught exception)沉御,捕獲機(jī)制類(lèi)似于 Java 的Thread.uncaughtExceptionHandler類(lèi)似屿讽,
    而自動(dòng)包裹異常機(jī)制說(shuō)的是,即使協(xié)程內(nèi)部出現(xiàn)異常吠裆,但是不會(huì)直接拋出伐谈,而是等到用戶(hù)調(diào)用相關(guān)方法時(shí)(比如await),才進(jìn)行拋出试疙。

    如下例子所示:

    fun main() = runBlocking {
        // 向上傳播
        val job = GlobalScope.launch { // root coroutine with launch
            println("Throwing exception from launch")
            throw IndexOutOfBoundsException() // Will be printed to the console by Thread.defaultUncaughtExceptionHandler
        }
        job.join()
        println("Joined failed job")
    
        // 自動(dòng)包裹
        val deferred = GlobalScope.async { // root coroutine with async
            println("Throwing exception from async")
            throw ArithmeticException() // Nothing is printed, relying on user to call await
        }
        try {
            deferred.await()
            println("Unreached")
        } catch (e: ArithmeticException) {
            println("Caught ArithmeticException")
        }
    }
    

    可以看到诵棵,向上傳播異常直接拋出,而自動(dòng)包裹異常需要用戶(hù)在特定點(diǎn)上進(jìn)行異常捕獲祝旷。

  • 自定義異常處理器(CoroutineExceptionHandler):對(duì)于未捕獲異常履澳,我們可以通過(guò)為根協(xié)程綁定一個(gè)CoroutineExceptionHandler來(lái)對(duì)根協(xié)程及其子協(xié)程進(jìn)行異常捕獲,其功能類(lèi)似于Thread.uncaughtExceptionHandler怀跛。

    CoroutineExceptionHandler只對(duì)未捕獲異常起作用距贷。特別是,當(dāng)子協(xié)程出現(xiàn)未捕獲異常時(shí)吻谋,它會(huì)一級(jí)一級(jí)向上傳播該異常忠蝗,直至根協(xié)程,也因此滨溉,對(duì)子協(xié)程安裝CoroutineExceptionHandler是沒(méi)有作用的什湘。但是對(duì)于Supervisor长赞,其異常傳播機(jī)制為:向下傳播晦攒,即父協(xié)程發(fā)生異常闽撤,會(huì)傳播給所有子協(xié)程,導(dǎo)致協(xié)程全部取消脯颜;而子協(xié)程發(fā)生異常哟旗,不會(huì)向上傳播,也不會(huì)影響其他子協(xié)程和父協(xié)程栋操。子協(xié)程需要自己處理自己的異常闸餐,因此為Supervisor作用域下的子協(xié)程安裝CoroutineExceptionHandler是有效的。

    asnyc函數(shù)對(duì)異常的作用機(jī)制是:async會(huì)將其內(nèi)所有異常進(jìn)行捕獲矾芙,然后放置到返回值Deferred中舍沙,因此,為async安裝CoroutineExceptionHandler也不會(huì)有任何作用剔宪。

    :在 JVM 平臺(tái)上拂铡,可以通過(guò)ServiceLoader為所有協(xié)程注冊(cè)一個(gè)全局異常處理器CoroutineExceptionHandler,全局異常處理器類(lèi)似于Thread.defaultUncaughtExceptionHandler葱绒,在系統(tǒng)沒(méi)有其他處理器的情況下進(jìn)行使用感帅。
    在 Android 平臺(tái)上,有一個(gè)默認(rèn)以安裝的全局異常處理器:uncaughtExceptionPreHandler

    例子如下所示:

    suspend fun main() {
        // 創(chuàng)建一個(gè)異常處理器
        val handler = CoroutineExceptionHandler { _, exception ->
            println("CoroutineExceptionHandler got $exception")
        }
        val job = GlobalScope.launch(handler) {
            // 安裝異常處理器
            log("父協(xié)程開(kāi)啟")
            val child = launch {
                // 子協(xié)程拋出異常
                log("子協(xié)程拋出異常")
                throw AssertionError()
            }
            delay(TimeUnit.SECONDS.toMillis(1))
            log("父協(xié)程仍在運(yùn)行")
        }
        job.join()
    }
    

    上述代碼執(zhí)行結(jié)果如下圖所示:


    可以看到地淀,為根協(xié)程安裝了CoroutineExceptionHandler后失球,成功捕獲到子協(xié)程異常。
    同時(shí)可以看到的是帮毁,CoroutineExceptionHandler只是對(duì)異常進(jìn)行了捕獲顯示实苞,不會(huì)對(duì)協(xié)程有實(shí)質(zhì)影響,子協(xié)程拋出異常后烈疚,同樣會(huì)取消父協(xié)程黔牵。

    :對(duì)于CoroutineExceptionHandler,不要將其安裝到runBlocking中胞得,因?yàn)?code>runBlocking在子協(xié)程異常結(jié)束時(shí)荧止,就會(huì)自動(dòng)cancelCoroutineExceptionHandler不會(huì)起作用阶剑。

  • 異常聚合:如果多個(gè)子協(xié)程都拋出異常跃巡,那么一般規(guī)則是:第一個(gè)拋出的異常勝出,異常處理器會(huì)處理第一個(gè)異常牧愁,其他異常會(huì)作為次級(jí)信息附加到第一個(gè)異常后面素邪。如下代碼所示:

    fun main() = runBlocking {
        val handler = CoroutineExceptionHandler { _, exception ->
            println("CoroutineExceptionHandler got $exception with suppressed ${exception.suppressed.contentToString()}")
        }
        val job = GlobalScope.launch(handler) {
            launch {
                try {
                    delay(Long.MAX_VALUE) // it gets cancelled when another sibling fails with IOException
                } finally {
                    throw ArithmeticException() // the second exception
                }
            }
            launch {
                delay(100)
                throw IOException() // the first exception
            }
            delay(Long.MAX_VALUE)
        }
        job.join()
    }
    

    其運(yùn)行結(jié)果如下所示:


通道(Channel)

Channel 可以讓數(shù)據(jù)流動(dòng)在不同的協(xié)程中,提供了協(xié)程間通信機(jī)制猪半。

本質(zhì)上兔朦,Channel 底層就是一個(gè)線(xiàn)程安全的隊(duì)列(類(lèi)似BlockingQueue)偷线,一個(gè)協(xié)程可以往里面發(fā)送數(shù)據(jù),另一個(gè)協(xié)程就可以在里面獲取數(shù)據(jù)沽甥,完成通信過(guò)程声邦。如下所示:

suspend fun main() {
    val channel = Channel<Int>()

    var producer = GlobalScope.launch {
        for (i in 1..10) {
            log("channel send $i")
            channel.send(i)
        }
    }

    var consumer = GlobalScope.launch {
        repeat(10) {
            val value = channel.receive()
            log("channel receiver $value")
        }
    }
    producer.join()
    consumer.join()
}

上述代碼是基于Channel實(shí)現(xiàn)的一個(gè)簡(jiǎn)單的生產(chǎn)者-消費(fèi)者模式。

sendreceive函數(shù)都是suspend函數(shù)摆舟,因此:

  • 如果Channel未發(fā)送數(shù)據(jù)亥曹,或者底層隊(duì)列已滿(mǎn),則send會(huì)被掛起
  • 如果Channel無(wú)法讀取到數(shù)據(jù)恨诱,則receive會(huì)掛起媳瞪,直到新元素到來(lái)

下面對(duì)Channel的一些基本操作進(jìn)行簡(jiǎn)介:

  • 迭代Channel:我們上面的例子是手動(dòng)指定接收次數(shù),并使用channel.receive進(jìn)行數(shù)據(jù)接收照宝,這種方式不夠靈活蛇受。
    Channel本身提供了迭代器操作:channel.iterator(),這種寫(xiě)法可以簡(jiǎn)化成for...in...操作厕鹃,更加簡(jiǎn)潔方便兢仰。如下所示:

    var consumer = GlobalScope.launch {
        for (element in channel){
            log("channel receive $element")
        }
    }
    
  • 指定緩沖區(qū)大小:前面說(shuō)過(guò),Channel本質(zhì)上是一個(gè)隊(duì)列熊响,發(fā)送數(shù)據(jù)時(shí)旨别,會(huì)把數(shù)據(jù)存儲(chǔ)到到隊(duì)列緩沖區(qū)中,接收數(shù)據(jù)時(shí)汗茄,會(huì)從該隊(duì)列緩沖區(qū)中進(jìn)行讀取秸弛。
    默認(rèn)情況下,緩沖區(qū)大小為 0洪碳,但是我們可以通過(guò)在創(chuàng)建Channel時(shí)递览,手動(dòng)指定該緩沖區(qū)大小,比如:Channel(4)瞳腌,表示緩沖區(qū)大小為 4绞铃。

  • 創(chuàng)建Channel:前面我們都是通過(guò)Channel構(gòu)造函數(shù)單獨(dú)創(chuàng)建一個(gè)Channel,然后嵌套到生產(chǎn)者協(xié)程和消費(fèi)者協(xié)程中嫂侍,這種做法很直觀儿捧,但是代碼量略大。Kotlin 提供了一些擴(kuò)展方法挑宠,可以很方便地讓我們創(chuàng)建Channel菲盾。如下所示:

    suspend fun main() = coroutineScope {
        val receiveChannel = produce<Int> {
            for (i in 1..10) {
                log("send $i")
                send(i)
            }
        }
    
        receiveChannel.consumeEach {
            log("receive $it")
        }
    }
    

    使用擴(kuò)展函數(shù)produce來(lái)創(chuàng)建一個(gè)Channel,使用consumeEach來(lái)消費(fèi)Channel各淀。

  • 關(guān)閉ChannelChannel是可以進(jìn)行關(guān)閉的懒鉴,其關(guān)閉方法為closeclose方法的底層實(shí)現(xiàn)其實(shí)就是通過(guò)發(fā)送一個(gè)特殊的 token 給到Channel,當(dāng)Channel讀取到該 token 時(shí)临谱,就會(huì)進(jìn)行關(guān)閉操作璃俗。因此,close方法其實(shí)不是立即關(guān)閉Channel悉默,而是會(huì)等到Channel底層隊(duì)列緩沖區(qū)數(shù)據(jù)完全被消費(fèi)完畢后城豁,才會(huì)進(jìn)行關(guān)閉操作。

  • 單發(fā)多收:多個(gè)協(xié)程可以從同一個(gè)Channel中接收數(shù)據(jù)麦牺,此時(shí)接收協(xié)程競(jìng)爭(zhēng)該Channel數(shù)據(jù)钮蛛。如下所示:

    fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
        for (msg in channel) {
            log("Processor #$id received $msg")
        }
    }
    suspend fun main() = coroutineScope {
        val singleChannel = produce<Int> {
            var count: Int = 0
            while (true) {
                send(++count)
                delay(100)
            }
        }
    
        repeat(5) {
            launchProcessor(it, singleChannel)
        }
        delay(950)
        singleChannel.cancel()
    }
    

    for...in...即使在多個(gè)協(xié)程中同時(shí)對(duì)同一個(gè)Channel進(jìn)行遍歷操作鞭缭,也是非常安全的剖膳。

    上述代碼運(yùn)行結(jié)果如下圖所示:


  • 單收多發(fā):多個(gè)協(xié)程可同時(shí)向同一個(gè)Channel發(fā)送數(shù)據(jù),這個(gè)很好理解岭辣,就是隊(duì)列入數(shù)據(jù)吱晒。如下所示:

    fun CoroutineScope.sendString(channel: SendChannel<String>, data: String, time: Long) = launch {
        while (true) {
            delay(time)
            log("send $data")
            channel.send(data)
        }
    }
    
    suspend fun main() = coroutineScope {
        val channel = Channel<String>()
        // 開(kāi)辟兩個(gè)協(xié)程發(fā)送數(shù)據(jù)
        sendString(channel, "hello", 200L)
        sendString(channel, "world", 500L)
        // 只接收 6 次
        repeat(6) {
            val data = channel.receive()
            log("receive $data")
        }
        // 關(guān)閉所有子協(xié)程
        coroutineContext.cancelChildren()
    }
    

    上述代碼運(yùn)行結(jié)果如下圖所示:


  • 廣播Channel:前面我們提到過(guò),Channel發(fā)送端和接收端可以存在一對(duì)多的情形沦童,此時(shí)發(fā)送端的數(shù)據(jù)被接收端競(jìng)爭(zhēng)仑濒,一個(gè)數(shù)據(jù)只會(huì)被一個(gè)接收端獲取。
    而如果想實(shí)現(xiàn)的是一個(gè)數(shù)據(jù)可以被所有的接收端獲取得到偷遗,則可以使用廣播通道BroadcastChannel墩瞳,如下所示:

    fun CoroutineScope.register(id: Int, channel: BroadcastChannel<Int>) = launch {
        val receiverChannel = channel.openSubscription()
        for (i in receiverChannel) {
            log("receive broadcast:$id -> $i")
        }
    }
    
    suspend fun main() = coroutineScope {
        // 創(chuàng)建廣播通道
        val channel = BroadcastChannel<Int>(Channel.BUFFERED)
        // 創(chuàng)建兩個(gè)協(xié)程,注冊(cè)到該 channel
        repeat(2) {
            register(it, channel)
        }
        // 發(fā)送廣播
        for (i in 1..3) {
            delay(100)
            log("send broadcast: $i")
            channel.send(i)
        }
        coroutineContext.cancelChildren()
    }
    

    :最好是先進(jìn)行注冊(cè)氏豌,然后再發(fā)送數(shù)據(jù)喉酌,否則可能存在數(shù)據(jù)接收不完整問(wèn)題,因?yàn)?code>BroadcastChannel在發(fā)送數(shù)據(jù)時(shí)泵喘,如果發(fā)現(xiàn)沒(méi)有訂閱者泪电,則會(huì)直接拋棄到該條數(shù)據(jù)。

    上述代碼中纪铺,使用函數(shù)BroadcastChannel來(lái)創(chuàng)建一個(gè)廣播通道相速,然后需要數(shù)據(jù)的協(xié)程可以通過(guò)openSubscription來(lái)進(jìn)行注冊(cè),其結(jié)果如下圖所示:

    可以看到鲜锚,每個(gè)注冊(cè)到廣播通道的協(xié)程都接收到每個(gè)數(shù)據(jù)突诬。

    :除了使用BroadcastChannel創(chuàng)建廣播通道外,還可以將現(xiàn)成的Channel轉(zhuǎn)換成廣播:Channel<Int>(3).broadcast(4)芜繁,這種方法其實(shí)是通過(guò)對(duì)源Channel進(jìn)行一個(gè)讀取操作旺隙,轉(zhuǎn)換為一個(gè)全局廣播GlobalScope.broadcast,如果此時(shí)還有其他協(xié)程讀取該源Channel浆洗,則可能產(chǎn)生如下結(jié)果:

    • 只有源Channel發(fā)送數(shù)據(jù):此時(shí)接收協(xié)程和注冊(cè)廣播的協(xié)程會(huì)競(jìng)爭(zhēng)這些數(shù)據(jù)
    • 只使用廣播通道發(fā)送數(shù)據(jù):此時(shí)則只有注冊(cè)廣播的協(xié)程能接收到這些數(shù)據(jù)
    • 同時(shí)使用源Channel和廣播發(fā)送數(shù)據(jù):此時(shí)數(shù)據(jù)十分混亂催束,不建議使用

冷數(shù)據(jù)流 Flow

一個(gè)suspend函數(shù)可以異步返回一個(gè)數(shù)值,而借助 異步流伏社,我們就可以返回多個(gè)數(shù)值抠刺。

Kotlin 中提供了以下方法可以讓我們返回多個(gè)數(shù)值:

  • Sequence:對(duì)于 CPU 密集型的異步任務(wù)塔淤,我們可以使用 序列 來(lái)返回多個(gè)值,如下所示:

    fun simple(): Sequence<Int> = sequence { // sequence builder
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it
            yield(i) // yield next value
        }
    }
    
    fun main() {
        simple().forEach { value -> println(value) }
    }
    

    上述代碼在序列內(nèi)部通過(guò)yield函數(shù)將值傳遞出去速妖,從而完成多個(gè)值的傳遞高蜂。
    但是任務(wù)運(yùn)行在主線(xiàn)程中,會(huì)卡住主線(xiàn)程罕容,此時(shí)可通過(guò)suspend函數(shù)進(jìn)行解決备恤。

  • suspend函數(shù)suspend函數(shù)直接或間接運(yùn)行在協(xié)程中,使用得當(dāng)則可解決異步任務(wù)卡主線(xiàn)程問(wèn)題锦秒,如下所示:

    suspend fun simple(): List<Int> {
        delay(1000) // pretend we are doing something asynchronous here
        return listOf(1, 2, 3)
    }
    
    fun main() = runBlocking<Unit> {
        simple().forEach { value -> println(value) } 
    }
    

    由于一個(gè)suspend函數(shù)只能返回一個(gè)值露泊,上述代碼通過(guò)返回集合的方式,雖然返回了多個(gè)值旅择,但是是一次性的惭笑,對(duì)于數(shù)據(jù)量較大或是無(wú)限數(shù)據(jù)流的情況不太適用,此時(shí)可采用數(shù)據(jù)流Flow方式進(jìn)行解決生真。

  • 冷數(shù)據(jù)流 FlowFlow是響應(yīng)式編程(比如:RxJava)與 Kotlin 協(xié)程結(jié)合而成的產(chǎn)物沉噩,將數(shù)據(jù)以流的方式進(jìn)行發(fā)送。

    fun simple(): Flow<Int> = flow { // flow builder
        for (i in 1..3) {
            delay // pretend we are doing something useful here
            emit(i) // emit next value
        }
    }
    
    fun main() = runBlocking<Unit> {
        // Collect the flow
        simple().collect { value -> println(value) }
    }
    

    上述代碼通過(guò)flow函數(shù)生成一個(gè)異步數(shù)據(jù)流Flow,在其block塊內(nèi)同通過(guò)emit發(fā)送數(shù)據(jù),每次調(diào)用emit的同時(shí)著角,也會(huì)同樣調(diào)用ensureActive函數(shù),對(duì)協(xié)程取消狀態(tài)進(jìn)行檢測(cè)畜眨。最后是通過(guò)collect函數(shù)對(duì)數(shù)據(jù)進(jìn)行獲取消費(fèi)。

    Flow流是冷數(shù)據(jù)流痰哨,也即在調(diào)用flow創(chuàng)建一個(gè)Flow時(shí)胶果,只有在消費(fèi)時(shí)(比如:collect),才會(huì)真正執(zhí)行數(shù)據(jù)生成邏輯斤斧。

下面對(duì)Flow的一些特性進(jìn)行簡(jiǎn)介:

  • Flow構(gòu)造器:Kotlin 提供了多種用于創(chuàng)建Flow的方法早抠,以下介紹常用的幾個(gè):

    • flow:該方法是用于創(chuàng)建Flow最基礎(chǔ)的方法,其參數(shù)block內(nèi)可通過(guò)emit函數(shù)發(fā)送數(shù)據(jù)撬讽,但是不能直接在其內(nèi)進(jìn)行線(xiàn)程切換蕊连,如下所示:

      flow<Int> {
          emit(1)
          withContext(Dispatchers.IO) { // BAD!!
              emit(2)
          }
      }.collect { println(it) }
      
    • channelFlow:如果想在生成元素時(shí)切換調(diào)度器,可以通過(guò)該函數(shù)來(lái)創(chuàng)建Flow

    channelFlow<Int> {
        send(1)
        withContext(Dispatchers.IO) { // BAD!!
            send(2)
        }
    }.collect { log(it) }
    
    • flowOf:該方法可接收可變數(shù)量類(lèi)型參數(shù)游昼,將其轉(zhuǎn)化為Flow
    flowOf<Int>(1,2,3,4)
    
    • asFlow:集合或者序列可以通過(guò).asFlow()方法轉(zhuǎn)化為Flow甘苍,如下所示:
    sequence<Int> { yield(1) }.asFlow().collect { log("sequence: $it") }
    listOf<Int>(1, 2, 3).asFlow().collect { log("listOf: $it") }
    setOf<Int>(1, 2, 3).asFlow().collect { log("setOf: $it") }
    
  • 取消發(fā)送數(shù)據(jù)Flow的消費(fèi)依賴(lài)于終端操作符(比如collect),終端操作符必須在協(xié)程中進(jìn)行調(diào)用烘豌,只要終端操作符所在的協(xié)程被取消時(shí)载庭,Flow內(nèi)部檢測(cè)到取消狀態(tài),就會(huì)進(jìn)行取消操作。因此囚聚,取消協(xié)程靖榕,只需取消它所在的協(xié)程即可。如下所示:

    suspend fun main() {
        val job = GlobalScope.launch {
            val intFlow = flow {
                (1..3).forEach {
                    delay(1000)
                    log("emit $it")
                    emit(it)
                }
            }
    
            intFlow.collect { log("receive $it") }
        }
        delay(2500)
        log("cancel outter coroutine!!")
        job.cancelAndJoin();
    }
    

    上述代碼運(yùn)行結(jié)果如下圖所示:

  • 操作符:由于Flow是在協(xié)程上實(shí)現(xiàn)的響應(yīng)式編程模型顽铸,因此它與 RxJava 一樣茁计,都有著很多的操作符,可以對(duì)Flow進(jìn)行轉(zhuǎn)換谓松,下面簡(jiǎn)單列舉幾個(gè)常見(jiàn)的操作符:

    • map/filter/take..:轉(zhuǎn)換操作符

      suspend fun main() {
          val value = (1..10).asFlow()
              .filter { it % 2 == 0 }
              .take(3)
              .map {
                  delay(1000)
                  log("map value: $it")
                  it + 1
              }.reduce { a, b -> a + b }
          println("result is $value")
      }
      
    • flattenConcat:該函數(shù)用于將嵌套Flow<Flow<T>>類(lèi)型進(jìn)行打平星压,轉(zhuǎn)換為Flow<T>。如下所示:

      suspend fun main() {
          flow<Int> {
              List(5) { emit(it) }
          }.map { // it = Int
              flow {
                  List(it) { emit(it) }
              }
          }.onEach { // it = Flow<Int>
              it.collect { value -> log(value) }
          }.collect()
      }
      

      flattenConcat不會(huì)打亂拼接順序鬼譬,結(jié)果仍然是生產(chǎn)時(shí)的順序娜膘。
      如果想并發(fā)拼接,則可以使用flattenMerge拧簸,這將產(chǎn)生無(wú)序結(jié)果劲绪。
      其實(shí),flattenConcat在概念上等同于flattenMerge(concurrency = 1)盆赤,但具備更好的性能。

    • transform:該操作符是最通用的一種操作符歉眷,可用它模擬其他簡(jiǎn)單轉(zhuǎn)換的操作符牺六,比如mapfilter汗捡,或者一些更加復(fù)雜的轉(zhuǎn)換淑际。
      使用transform操作符,我們可以對(duì)任意數(shù)值重復(fù)發(fā)送emit任意次數(shù)扇住。如下所示:

    suspend fun main() {
        flowOf(100)
            .transform { value ->
                repeat(3) {
                    emit("send $value") // 重復(fù)發(fā)送 3 次 
                }
            }.collect { log(it) }
    }
    
    • collect/toList/toSet/first/single/reduce/fold:終端操作符春缕,對(duì)Flow流進(jìn)行消費(fèi)。如下所示:
    val sum = (1..5).asFlow()
        .map { it * it } // squares of numbers from 1 to 5                           
        .reduce { a, b -> a + b } // sum them (terminal operator)
    println(sum)
    

    :由于Flow的消費(fèi)端必須運(yùn)行在協(xié)程中艘蹋,因此終端操作符一定是suspend函數(shù)锄贼。

    • flowOn:該操作符可以為Flow指定數(shù)據(jù)流生產(chǎn)所需的調(diào)度器,如下所示:
    flow { /*數(shù)據(jù)生產(chǎn)*/ }.flowOn(Dispatchers.IO).collect {...}
    

    主要就是使用flowOn進(jìn)行線(xiàn)程切換女阀,因此會(huì)導(dǎo)致開(kāi)辟一個(gè)新的協(xié)程宅荤。
    需要注意的是,flowOn只會(huì)對(duì)其前面的操作產(chǎn)生效果浸策,這里也就是對(duì)flow {...}產(chǎn)生影響冯键,對(duì)collect沒(méi)有影響。

    • buffer:該操作符為數(shù)據(jù)流生產(chǎn)和終端操作符消費(fèi)各自開(kāi)辟一個(gè)協(xié)程庸汗,這樣兩者就能并發(fā)運(yùn)行惫确,然后通過(guò)一個(gè)Channel將生產(chǎn)的數(shù)據(jù)流發(fā)送給終端消費(fèi),完成整套操作。

      默認(rèn)情況下改化,Flow流是順序操作的昧诱,也即所有操作符都作用在同一個(gè)協(xié)程中,如下所示:

      flowOf("A", "B", "C")
          .onEach  { println("1$it") }
          .collect { println("2$it") }
      

      上述代碼的運(yùn)行流程如下所示:

      Q : -->-- [1A] -- [2A] -- [1B] -- [2B] -- [1C] -- [2C] -->--
      

      可以看到所袁,元素是一個(gè)一個(gè)按生產(chǎn)先后順序依次進(jìn)行操作的盏档,這樣子的話(huà),如果數(shù)據(jù)流生產(chǎn)和消費(fèi)都存在耗時(shí)操作時(shí)燥爷,總時(shí)間就是所有操作耗時(shí)總合蜈亩,如下所示:

      suspend fun main() {
          val time = measureTimeMillis {
              flowOf("A", "B", "C")
                  .onEach {// 每生產(chǎn)一個(gè)數(shù)據(jù),耗時(shí) 100 ms
                      delay(100)
                      log("generate : $it")
                  }
                  .collect {
                      delay(300) // 每消費(fèi)一個(gè)數(shù)據(jù)前翎,耗時(shí) 300 ms
                      log("consume: $it")
                  }
          }
          println("Collected in $time ms")
      }
      

      上述代碼中稚配,每生產(chǎn)一個(gè)數(shù)據(jù)耗時(shí) 100 ms,每消費(fèi)一個(gè)數(shù)據(jù)耗時(shí) 300 ms港华,如果以默認(rèn)的按順序消費(fèi)道川,則總時(shí)間耗費(fèi)為:(100 + 300)*3 = 1200 ms 左右。
      實(shí)際運(yùn)行結(jié)果如下圖所示:


      no buffer

      但是如果我們?cè)跀?shù)據(jù)和生產(chǎn)消費(fèi)中間添加一個(gè)buffer操作立宜,如下所示:

      suspend fun main() {
          val time = measureTimeMillis {
              flowOf("A", "B", "C")
                  .onEach {// 每生產(chǎn)一個(gè)數(shù)據(jù)冒萄,耗時(shí) 100 ms
                      delay(100)
                      log("generate : $it")
                  }
                  .buffer() // 分割生產(chǎn)和消費(fèi)
                  .collect {
                      delay(300) // 每消費(fèi)一個(gè)數(shù)據(jù),耗時(shí) 300 ms
                      log("consume: $it")
                  }
          }
          println("Collected in $time ms")
      }
      

      上述代碼運(yùn)行結(jié)果如下圖所示:


      可以看到橙数,僅僅只發(fā)送 3 個(gè)數(shù)據(jù)尊流,就有將近 60 ms 的時(shí)間節(jié)省,如果數(shù)據(jù)量更大點(diǎn)灯帮,節(jié)省的時(shí)間會(huì)更加客觀崖技。
      但更重要的是,從上圖中可以看到钟哥,數(shù)據(jù)生產(chǎn)和消費(fèi)是并行運(yùn)行的迎献,因?yàn)榫腿缥覀兩鲜鏊f(shuō)的,buffer會(huì)將其上和其下的操作放置到兩個(gè)獨(dú)立的協(xié)程中腻贰,從而增加了并發(fā)吁恍,節(jié)省時(shí)間。

      這里借用官網(wǎng)的一副示意圖银受,會(huì)更加清晰整個(gè)過(guò)程:

      buffer

      綜上:當(dāng)數(shù)據(jù)生產(chǎn)和終端消費(fèi)都存在耗時(shí)操作時(shí)践盼,使用buffer可以提高并發(fā),節(jié)省時(shí)間宾巍。

    • catchFlow提供了catch操作符咕幻,可以很方便對(duì)異常進(jìn)行捕獲。如下所示:

      suspend fun main() {
          flow {
              emit(1)
              // 手動(dòng)拋出異常
              throw ArithmeticException("divide 0")
          }.catch { t: Throwable ->
              log("caught error: $t")
          }.collect { log(it) }
      }
      

      catch只能捕獲它上游的異常顶霞,因此肄程,通常將catch放置在最后操作符位置锣吼,也即終端操作符前一個(gè)。
      但是對(duì)于終端操作符發(fā)生的異常蓝厌,catch無(wú)法捕捉得到玄叠。此時(shí),其實(shí)可以通過(guò)將數(shù)據(jù)消費(fèi)轉(zhuǎn)移到onEach進(jìn)行處理拓提,如下所示:

      suspend fun main() {
          flow {
              emit(1)
              // 手動(dòng)拋出異常
              throw ArithmeticException("divide 0")
          }.onEach {
              log(it)
          }.catch { t: Throwable ->
              log("caught error: $t")
          }.collect()
      }
      

      注意catch需要放置到onEach前面读恃,這樣才能捕獲到onEach出現(xiàn)的異常。
      同時(shí)代态,我們使用的終端操作符函數(shù)為collect()寺惫,它會(huì)觸發(fā)數(shù)據(jù)消費(fèi)過(guò)程,但本身不對(duì)數(shù)據(jù)進(jìn)行處理蹦疑。

    • cancelable:讓Flow流可以被取消西雀。

      上文我們介紹過(guò),如果未在協(xié)程異步任務(wù)中對(duì)取消狀態(tài)進(jìn)行檢測(cè)歉摧,那么協(xié)程是無(wú)法響應(yīng)cancel的艇肴,而對(duì)于Flow流來(lái)說(shuō),也是一樣的叁温。
      如果Flow流內(nèi)部是通過(guò)emit進(jìn)行數(shù)據(jù)發(fā)送再悼,由于本身就是一個(gè)suspend函數(shù)(其實(shí)質(zhì)是其內(nèi)部會(huì)通過(guò)ensureActive對(duì)取消狀態(tài)進(jìn)行檢測(cè)),故可以響應(yīng)cancle操作券盅。但是如果是由比如intRange.asFlow這類(lèi)直接生成Flow帮哈,其內(nèi)部未對(duì)取消狀態(tài)進(jìn)行檢測(cè),故無(wú)法響應(yīng)cancel操作锰镀,當(dāng)然我們可以通過(guò)onEach操作符為其手動(dòng)添加檢測(cè):.onEach { currentCoroutineContext().ensureActive() },但是使用cancelable操作符會(huì)更加簡(jiǎn)潔方便咖刃,如下所示:

      fun main() = runBlocking<Unit> {
      (1..5).asFlow()
          .cancellable()
          .collect { value -> 
              if (value == 3) cancel()  
              println(value)
      } 
      

更多操作符內(nèi)容泳炉,,請(qǐng)參考:Asynchronous Flow

協(xié)程并發(fā)安全

由于 Kotlin 協(xié)程在 JVM 上的底層實(shí)現(xiàn)是基于線(xiàn)程的嚎杨,因此協(xié)程間狀態(tài)共享存在數(shù)據(jù)競(jìng)爭(zhēng)問(wèn)題花鹅,此時(shí)需要進(jìn)行數(shù)據(jù)同步操作。

如果協(xié)程運(yùn)行不同的調(diào)度器中(或者運(yùn)行在多線(xiàn)程調(diào)度器中枫浙,比如Dispatchers.Default)刨肃,則其異步任務(wù)實(shí)際上運(yùn)行在不同的線(xiàn)程上,此時(shí)通過(guò) Java 自帶的一些線(xiàn)程安全操作(比如加鎖箩帚,原子類(lèi)...)真友,就可以保證協(xié)程并發(fā)安全。

但是協(xié)程本身也提供了一些并發(fā)安全措施紧帕,主要有如下幾方面內(nèi)容:

  • 線(xiàn)程細(xì)粒度控制:如果說(shuō)很多個(gè)協(xié)程(假設(shè)運(yùn)行在不同的線(xiàn)程中)對(duì)同一個(gè)數(shù)據(jù)進(jìn)行修改盔然,其實(shí)就是線(xiàn)程并發(fā)讀寫(xiě)數(shù)據(jù)桅打,這樣數(shù)據(jù)就不安全。
    其實(shí)我們可以將對(duì)數(shù)據(jù)的修改行為都調(diào)度到同一條線(xiàn)程中愈案,這樣就不會(huì)出現(xiàn)競(jìng)爭(zhēng)導(dǎo)致數(shù)據(jù)出錯(cuò)問(wèn)題挺尾。如下所示:

    suspend fun massiveRun(action: suspend () -> Unit) {
        val n = 100  // number of coroutines to launch
        val k = 1000 // times an action is repeated by each coroutine
        val time = measureTimeMillis {
            coroutineScope { // scope for coroutines
                repeat(n) { // 創(chuàng)建 100 個(gè)協(xié)程
                    launch { // 執(zhí)行 1000 次動(dòng)作
                        repeat(k) { action() }
                    }
                }
            }
        }
        println("Completed ${n * k} actions in $time ms")
    }
    
    // 任務(wù)調(diào)度器:?jiǎn)我痪€(xiàn)程
    val counterContext = newSingleThreadContext("CounterContext")
    
    // 共享數(shù)據(jù)
    var counter = 0
    
    fun main() = runBlocking {
    
        withContext(Dispatchers.Default) {
            massiveRun {
                // confine each increment to a single-threaded context
                withContext(counterContext) {
                    counter++
                }
            }
        }
        println("Counter = $counter")
    }
    

    上述代碼在所有協(xié)程中,通過(guò)withContext將對(duì)數(shù)據(jù)的修改都切換到全局唯一的一條線(xiàn)程上站绪,保證了數(shù)據(jù)并發(fā)安全遭铺。結(jié)果如下:

    single thread

  • 線(xiàn)程粗粒度控制:直接在一條線(xiàn)程上開(kāi)啟多個(gè)協(xié)程,這樣就自然保證了數(shù)據(jù)并發(fā)安全恢准。如下所示:

    // 任務(wù)調(diào)度器:?jiǎn)我痪€(xiàn)程
    val counterContext = newSingleThreadContext("CounterContext")
    
    // 共享數(shù)據(jù)
    var counter = 0
    
    fun main() = runBlocking {
        // confine everything to a single-threaded context
        withContext(counterContext) {
            massiveRun {
                counter++
            }
        }
        println("Counter = $counter")
    }
    

    執(zhí)行結(jié)果如下所示:


    可以看到魂挂,效果比細(xì)粒度控制快了兩個(gè)量級(jí)。一個(gè)是因?yàn)榫€(xiàn)程細(xì)粒度控制其實(shí)開(kāi)辟了很多不必要的線(xiàn)程顷歌,系統(tǒng)開(kāi)銷(xiāo)很大锰蓬,然后各個(gè)協(xié)程會(huì)把數(shù)據(jù)修改工作分發(fā)到另一個(gè)全局唯一的任務(wù)協(xié)程中,這就涉及到線(xiàn)程切換問(wèn)題...而粗粒度是直接在一條線(xiàn)程上開(kāi)啟很多個(gè)協(xié)程眯漩,異步完成數(shù)據(jù)修改(由于在同一條線(xiàn)程上修改數(shù)據(jù)芹扭,天然線(xiàn)程安全),幾乎不占用系統(tǒng)資源赦抖,因此速度很快舱卡。這從一方面也驗(yàn)證了:協(xié)程是輕量級(jí)線(xiàn)程。

  • 互斥(Mutual exclusion):互斥方案就是使用永不并發(fā)的 關(guān)鍵部分(critical section) 來(lái)保護(hù)共享狀態(tài)修改队萤。
    在阻塞調(diào)用中轮锥,通常使用synchronizedReentrantLock來(lái)控制線(xiàn)程互斥。而在協(xié)程中要尔,提供的加鎖機(jī)制為Mutex舍杜,可以使用Mutex.lockMutex.unlock來(lái)劃定關(guān)鍵部分。Mutext相對(duì)synchronized等鎖的最大區(qū)別是:Mutext.lock方法是一個(gè)suspend方法赵辕,因此它不會(huì)阻塞線(xiàn)程既绩。

    Mutext提供了一個(gè)簡(jiǎn)便方法withLock,其相當(dāng)于mutext.lock(); try{ ... } finally {mutext.unlock()}还惠。

    下面使用Mutext來(lái)修改上面代碼饲握,如下所示:

    // 共享數(shù)據(jù)
    var counter = 0
    
    // 創(chuàng)建一個(gè)協(xié)程互斥鎖
    val mutex = Mutex()
    
    fun main() = runBlocking {
        withContext(Dispatchers.Default) {
            massiveRun {
                // protect each increment with lock
                mutex.withLock {
                    counter++
                }
            }
        }
        println("Counter = $counter")
    }
    

    直接對(duì)修改部分加鎖即可,其運(yùn)行結(jié)果如下所示:


    直觀看來(lái)蚕键,加鎖的效率不如單線(xiàn)程創(chuàng)建多協(xié)程救欧,一方面加鎖的確會(huì)更慢,但是這里更多的開(kāi)銷(xiāo)應(yīng)該是調(diào)度器默認(rèn)開(kāi)啟的線(xiàn)程池锣光。

  • Actors:一個(gè)actor實(shí)體是由一個(gè)協(xié)程笆怠,被封裝到協(xié)程的狀態(tài)和一個(gè)通道Channel組合而成的。
    Kotlin 提供了actor協(xié)程構(gòu)造器嫉晶,可用于方便創(chuàng)建帶有可以接收消息的郵箱Channel骑疆,同時(shí)將發(fā)送Channel合并進(jìn)返回的Job當(dāng)中田篇。

    使用actor來(lái)達(dá)到數(shù)據(jù)安全的方式如下:

    1. 通過(guò)actor方法創(chuàng)建的協(xié)程,發(fā)送的信息會(huì)被其內(nèi)部代碼塊接收得到箍铭,因此首先需要定義一個(gè)消息體:
    // Message types for counterActor
    sealed class CounterMsg
    object IncCounter : CounterMsg() // one-way message to increment counter
    class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
    
    1. 然后泊柬,創(chuàng)建一個(gè)發(fā)送和接收消息體的actor
    // This function launches a new counter actor
    fun CoroutineScope.counterActor() = actor<CounterMsg> {
        var counter = 0 // actor state
        for (msg in channel) { // iterate over incoming messages
            when (msg) {
                is IncCounter -> counter++
                is GetCounter -> msg.response.complete(counter)
            }
        }
    }
    

    actor內(nèi)部包含狀態(tài),對(duì)應(yīng)我們上面的例子诈火,這個(gè)狀態(tài)就是被修改的數(shù)據(jù)counter兽赁。

    1. 并發(fā)修改數(shù)據(jù)的協(xié)程直接使用actor對(duì)象發(fā)送修改信息即可:
    fun main() = runBlocking<Unit> {
        val sendChannel = counterActor() // create the actor
        withContext(Dispatchers.Default) {
            massiveRun {
                sendChannel.send(IncCounter)
            }
        }
        // send a message to get a counter value from an actor
        val response = CompletableDeferred<Int>()
        sendChannel.send(GetCounter(response))
        println("Counter = ${response.await()}")
        sendChannel.close() // shutdown the actor
    }
    

    上述代碼運(yùn)行結(jié)果如下所示:


    其實(shí),actor的本質(zhì)就是一個(gè) 多對(duì)一Channel冷守,多個(gè)協(xié)程通過(guò)Channel發(fā)送修改事件給到actor刀崖,actor修改自身內(nèi)部狀態(tài)即可,所有的修改操作都發(fā)生在actor本身的協(xié)程中拍摇,故數(shù)據(jù)安全亮钦,不用考慮線(xiàn)程同步。

    actor是一個(gè)雙重的produce協(xié)程構(gòu)造器充活,actor與接收通道相關(guān)聯(lián)(內(nèi)部代碼塊接收信息)蜂莉,produce與發(fā)送通道相關(guān)聯(lián)(內(nèi)部代碼用于發(fā)送消息)。

Select Expression (experimental)

Select 表達(dá)式混卵,即 多路復(fù)用映穗,與 Unix 中的 IO 多路復(fù)用功能相似。

Kotlin 中的 多路復(fù)用select幕随,可以對(duì)多個(gè)協(xié)程某些操作進(jìn)行復(fù)用蚁滋,比如:

  • 復(fù)用多個(gè)await:通常使用async來(lái)創(chuàng)建一個(gè)協(xié)程,那么就必須使用await來(lái)獲取協(xié)程結(jié)果赘淮,這是同步調(diào)用辕录。
    但是借助多路復(fù)用select,我們可以使用onAwait來(lái)自動(dòng)獲取結(jié)果梢卸,無(wú)需顯示調(diào)用await踏拜,避免了阻塞協(xié)程操作,這是異步調(diào)用低剔。具體代碼如下所示:

    fun main() = runBlocking {
    
        val deferred = async {
            delay(200)
            "Hello World"
        }
    
        val result = select<String> {
            deferred.onAwait { result ->
                "onAwait receive async result: $result"
            }
        }
        log("result: $result")
    }
    

    結(jié)果如下所示:

    多路復(fù)用select 除了能將同步調(diào)用轉(zhuǎn)化為異步調(diào)用外,他還具備事件競(jìng)爭(zhēng)機(jī)制肮塞,即自動(dòng)獲取最先發(fā)生的事件襟齿。

    比如,如果我們現(xiàn)在有兩個(gè)協(xié)程進(jìn)行網(wǎng)絡(luò)請(qǐng)求枕赵,我們希望獲取第一個(gè)最先返回的結(jié)果猜欺,那么就可以使用多路復(fù)用await,如下所示:

    @ExperimentalTime
    fun main() = runBlocking {
    
        val deferred1 = async {
            val time = DurationUnit.SECONDS.toMillis(Random.nextLong(5))
            log("1 ---> delay: $time")
            delay(time)
            log("1 ---> done")
            "Hello World"
        }
        val deferred2 = async {
            val time = DurationUnit.SECONDS.toMillis(Random.nextLong(3))
            log("2 ---> delay: $time")
            delay(time)
            log("2 ---> done")
            "Hi Select"
        }
    
        val result = select<String> {
            deferred1.onAwait {
                "1111 first --> result: $it"
            }
            deferred2.onAwait {
                "2222 first --> result: $it"
            }
        }
        log("result: $result")
    }
    

    多次運(yùn)行上述代碼拷窜,得到的結(jié)果始終返回最快的協(xié)程結(jié)果开皿,如下所示:


    可以看到涧黄,select返回值是其某一分支的返回值,且select返回結(jié)果后赋荆,并不會(huì)取消其他仍在運(yùn)行的協(xié)程笋妥。

  • 多路復(fù)用Channel:即多個(gè)Channel發(fā)送事件,使用onReceive可以選擇最先發(fā)送成功的數(shù)據(jù)窄潭。

    @ExperimentalTime
    fun randomDelay(until: Long = 5): Long {
        return DurationUnit.SECONDS.toMillis(Random.nextLong(until))
    }
    
    @ExperimentalTime
    fun main() = runBlocking {
        val channel1 = produce<String> {
            while (true) {
                val time = randomDelay()
                log("channel1 --> delay: $time")
                delay(time)
                send("send by channel 1111")
            }
        }
        val channel2 = produce<String> {
            while (true) {
                val time = randomDelay()
                log("channel2 --> delay: $time")
                delay(time)
                send("send by channel 2222")
            }
        }
        repeat(5) {
            log("------- start select -------")
            val result = select<String> {
                channel1.onReceive { it }
                channel2.onReceiveOrNull {
                    it ?: "Channel2 is closed"
                }
            }
            log("result: $result")
        }
        delay(2000)
        coroutineContext.cancelChildren()
    
    }
    

    :對(duì)于onReceive春宣,當(dāng)對(duì)應(yīng)的Channel被關(guān)閉時(shí),select會(huì)直接拋出異常嫉你。
    對(duì)于onReceiveOrNull月帝,當(dāng)對(duì)應(yīng)的Channel被關(guān)閉時(shí),直接返回null幽污。

多路復(fù)用select 除了支持上述的onAwaitonReceive事件外嚷辅,它還支持很多其他事件,比如onJoin距误,onSend...
事實(shí)上簸搞,所有能被select支持的事件都是SelectClasueN類(lèi)型,具體包含以下幾種:

  • SelectClasue0:表示對(duì)應(yīng)事件沒(méi)有返回值深寥。比如join沒(méi)有返回值攘乒,其對(duì)應(yīng)的SelectClasueN類(lèi)型事件即為onJoin

  • SelectClasue1:表示對(duì)應(yīng)事件有返回值。比如onAwait惋鹅,onReceive...

  • SelectClasue2:表示對(duì)應(yīng)事件有返回值则酝,且還帶有一個(gè)額外的參數(shù)。比如onSend闰集,其第一參數(shù)表示要發(fā)送的值沽讹,第二個(gè)參數(shù)sendChannel表示數(shù)據(jù)發(fā)送到的Channel對(duì)象。如下所示:

    fun main() = runBlocking {
        val channel = Channel<String>()
        launch {
            val result = channel.receive()
            log("receive: $result")
        }
        select<Unit> {
            channel.onSend("1") { sendChannel ->
                log("send to Channel: $sendChannel")
                sendChannel.send(Random.nextInt().toString())
            }
        }
    }
    

...

更多具體內(nèi)容武鲁,請(qǐng)參考:Select Expression

綜上爽雄,多路復(fù)用select 的主要作用就是可以同時(shí)等待多個(gè)協(xié)程,并選擇獲取最先可用協(xié)程的結(jié)果沐鼠。

一些注意點(diǎn)

  1. 對(duì)一個(gè)父協(xié)程進(jìn)行取消操作挚瘟,會(huì)自動(dòng)取消它作用域內(nèi)所有的協(xié)程:

    suspend fun showStatus(type: String) {
        log("$type starts")
        try {
            delay(100)
            log("$type done")
        } finally {
            log("$type cancelled")
        }
    }
    
    fun main() = runBlocking {
    
        val scope = CoroutineScope(Dispatchers.Default)
        scope.launch {
            launch { showStatus("launch") }
    
            launch {
                async { showStatus("async") }.await()
            }
    
            launch {
                withContext(Dispatchers.IO) { showStatus("withContext") }
            }
    
            coroutineScope { showStatus("coroutineScope") }
            // ...
        }
    
        delay(80)
        log("父協(xié)程取消")
        //  取消父協(xié)程,導(dǎo)致其作用域內(nèi)所有協(xié)程取消
        scope.cancel()
    }
    

    當(dāng)調(diào)用最外層scope.cancel()時(shí)饲梭,其作用域內(nèi)所有子協(xié)程乘盖、子子協(xié)程...都會(huì)被取消。結(jié)果如下圖所示:

  2. 創(chuàng)建一個(gè)協(xié)程時(shí)憔涉,可以為其上下文對(duì)象添加一個(gè)Job對(duì)象订框,對(duì)該Job對(duì)象調(diào)用取消操作時(shí),與該Job綁定的協(xié)程及其子協(xié)程都會(huì)被取消:

    fun main() = runBlocking {
    
        val job = Job()
        // 將協(xié)程與 Job 綁定
        val scope = CoroutineScope(Dispatchers.Default + job)
        scope.launch {
            // ...
        }
    
        delay(80)
        //  取消 Job 綁定的協(xié)程兜叨,導(dǎo)致其作用域內(nèi)所有協(xié)程取消
        job.cancel()
    }
    

    上述代碼與第 1 點(diǎn)的代碼基本一致穿扳,只是將CoroutineScopeJob綁定了起來(lái)衩侥,因此,調(diào)用Job.cancel()就可以取消與其綁定的協(xié)程矛物,從而該協(xié)程作用域內(nèi)的所有協(xié)程也都會(huì)被取消茫死。

  3. 協(xié)程作用域內(nèi)應(yīng)當(dāng)使用launchasync泽谨、coroutineScope璧榄、supervisorScope這些能繼承父協(xié)程上下文對(duì)象或者withContext等具備結(jié)構(gòu)化并發(fā)特性的協(xié)程構(gòu)造器來(lái)創(chuàng)建子協(xié)程,這樣才能滿(mǎn)足上述 1吧雹,2 條目結(jié)果骨杂,不要使用GlobalScope.launchCoroutineScope.launch等創(chuàng)建子協(xié)程雄卷,因?yàn)檫@些操作創(chuàng)建的協(xié)程運(yùn)行在其他作用域內(nèi)搓蚪,跳脫了當(dāng)前協(xié)程作用域,導(dǎo)致當(dāng)前協(xié)程無(wú)法對(duì)其進(jìn)行控制:

    fun main() = runBlocking {
    
        val scope = CoroutineScope(Dispatchers.Default)
        scope.launch {
    
            GlobalScope.launch {
                showStatus("GlobalScope")
            }
    
            CoroutineScope(Dispatchers.Default).launch {
                showStatus("CoroutineScope")
            }
        }
    
        delay(80)
        log("父協(xié)程取消")
        scope.cancel()
        // 避免程序退出
        delay(1000)
    }
    

    結(jié)果如下圖所示:

    可以看到丁鹉,父協(xié)程取消妒潭,子協(xié)程仍在運(yùn)行,父協(xié)程不具備控制子協(xié)程能力揣钦。

參考

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末雳灾,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子冯凹,更是在濱河造成了極大的恐慌谎亩,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,378評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件宇姚,死亡現(xiàn)場(chǎng)離奇詭異匈庭,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)浑劳,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén)阱持,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人魔熏,你說(shuō)我怎么就攤上這事衷咽。” “怎么了蒜绽?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,702評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵兵罢,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我滓窍,道長(zhǎng),這世上最難降的妖魔是什么巩那? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,259評(píng)論 1 279
  • 正文 為了忘掉前任吏夯,我火速辦了婚禮此蜈,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘噪生。我一直安慰自己柱宦,他們只是感情好绢馍,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,263評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著,像睡著了一般贞谓。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上宫纬,一...
    開(kāi)封第一講書(shū)人閱讀 49,036評(píng)論 1 285
  • 那天玻蝌,我揣著相機(jī)與錄音,去河邊找鬼璃吧。 笑死楣导,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的畜挨。 我是一名探鬼主播筒繁,決...
    沈念sama閱讀 38,349評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼巴元!你這毒婦竟也來(lái)了毡咏?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 36,979評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤逮刨,失蹤者是張志新(化名)和其女友劉穎呕缭,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體禀忆,經(jīng)...
    沈念sama閱讀 43,469評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡臊旭,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,938評(píng)論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了箩退。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片离熏。...
    茶點(diǎn)故事閱讀 38,059評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖戴涝,靈堂內(nèi)的尸體忽然破棺而出滋戳,到底是詐尸還是另有隱情,我是刑警寧澤啥刻,帶...
    沈念sama閱讀 33,703評(píng)論 4 323
  • 正文 年R本政府宣布奸鸯,位于F島的核電站,受9級(jí)特大地震影響可帽,放射性物質(zhì)發(fā)生泄漏娄涩。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,257評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望蓄拣。 院中可真熱鬧扬虚,春花似錦、人聲如沸球恤。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,262評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)咽斧。三九已至堪置,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間张惹,已是汗流浹背舀锨。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,485評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留诵叁,地道東北人雁竞。 一個(gè)月前我還...
    沈念sama閱讀 45,501評(píng)論 2 354
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像拧额,于是被迫代替她去往敵國(guó)和親碑诉。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,792評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容