使用協(xié)程和 Flow 簡化 API 設計

image

如果您是庫作者涨岁,您也許希望用戶在使用 Kotlin 協(xié)程與 Flow 時可以更加輕松地調(diào)用您基于 Java 或回調(diào)的 API拐袜。另外,如果您是 API 的使用者梢薪,則可能愿意將第三方 API 界面適配協(xié)程蹬铺,以使它們對 Kotlin 更友好。

本文將會介紹如何使用協(xié)程和 Flow 簡化 API秉撇,以及如何使用 suspendCancellableCoroutine 和 callbackFlow API 創(chuàng)建您自己的適配器甜攀。針對那些富有好奇心的讀者,本文還會對這些 API 進行剖析琐馆,以幫您了解它們底層的工作原理规阀。

如果您更喜歡觀看視頻,可以 點擊這里瘦麸。

檢查現(xiàn)有協(xié)程適配器

在您為現(xiàn)有 API 編寫自己的封裝之前谁撼,請檢查是否已經(jīng)存在針對您的用例的適配器或者 擴展方法。下面是一些包含常見類型協(xié)程適配器的庫滋饲。

Future 類型

對于 future 類型厉碟,Java 8 集成了 CompletableFuture,而 Guava 集成了 ListenableFuture屠缭。這里提到的并不是全部箍鼓,您可以在線搜索以確定是否存在適用于您的 future 類型的適配器。

// 等待 CompletionStage 的執(zhí)行完成而不阻塞線程
suspend fun <T> CompletionStage<T>.await(): T 
 
// 等待 ListenableFuture 的執(zhí)行完成而不阻塞線程
suspend fun <T> ListenableFuture<T>.await(): T

使用這些函數(shù)呵曹,您可以擺脫回調(diào)并掛起協(xié)程直到 future 的結果被返回袄秩。

Reactive Stream

對于響應式流的庫,有針對 RxJavaJava 9 API響應式流庫 的集成:

// 將給定的響應式 Publisher 轉換為 Flow
fun <T : Any> Publisher<T>.asFlow(): Flow<T>

這些函數(shù)將響應式流轉換為了 Flow之剧。

Android 專用 API

對于 Jetpack 庫或 Android 平臺 API郭卫,您可以參閱 Jetpack KTX 庫 列表。現(xiàn)有超過 20 個庫擁有 KTX 版本背稼,構成了您所熟悉的 Java API贰军。其中包括 SharedPreferences、ViewModels蟹肘、SQLite 以及 Play Core词疼。

回調(diào)

回調(diào)是實現(xiàn)異步通訊時非常常見的做法。事實上帘腹,我們在 后臺線程任務運行指南 中將回調(diào)作為 Java 編程語言的默認解決方案贰盗。然而,回調(diào)也有許多缺點: 這一設計會導致令人費解的回調(diào)嵌套阳欲。同時舵盈,由于沒有簡單的傳播方式,錯誤處理也更加復雜球化。在 Kotlin 中秽晚,您可以簡單地使用協(xié)程調(diào)用回調(diào),但前提是您必須創(chuàng)建您自己的適配器筒愚。

創(chuàng)建您自己的適配器

如果沒有找到適合您用例的適配器赴蝇,更直接的做法是自己編寫適配器。對于一次性異步調(diào)用巢掺,可以使用 suspendCancellableCoroutine API句伶;而對于流數(shù)據(jù),可以使用 callbackFlow API陆淀。

作為練習熄阻,下面的示例將會使用來自 Google Play Services 的 Fused Location Provider API 來獲取位置數(shù)據(jù)。此 API 界面十分簡單倔约,但是它使用回調(diào)來執(zhí)行異步操作。當邏輯變得復雜時坝初,這些回調(diào)容易使代碼變得不可讀浸剩,而我們可以使用協(xié)程來擺脫它們。

如果您希望探索其它解決方案鳄袍,可以通過上面函數(shù)所鏈接的源代碼為您帶來啟發(fā)绢要。

一次性異步調(diào)用

Fused Location Provider API 提供了 getLastLocation 方法來獲得 最后已知位置。對于協(xié)程來說拗小,理想的 API 是一個直接返回確切結果的掛起函數(shù)重罪。

注意: 這一 API 返回值為 Task,并且已經(jīng)有了對應的 適配器。出于學習的目的剿配,我們用它作為范例搅幅。

我們可以通過為 FusedLocationProviderClient 創(chuàng)建擴展函數(shù)來獲得更好的 API:

suspend fun FusedLocationProviderClient.awaitLastLocation(): Location

由于這是一個一次性異步操作,我們使用 suspendCancellableCoroutine 函數(shù): 一個用于從協(xié)程庫創(chuàng)建掛起函數(shù)的底層構建塊呼胚。

suspendCancellableCoroutine 會執(zhí)行作為參數(shù)傳入的代碼塊茄唐,然后在等待繼續(xù)信號期間掛起協(xié)程的執(zhí)行。當協(xié)程 Continuation 對象中的 resumeresumeWithException 方法被調(diào)用時蝇更,協(xié)程會被恢復執(zhí)行沪编。有關 Continuation 的更多信息,請參閱: Kotlin Vocabulary | 揭秘協(xié)程中的 suspend 修飾符年扩。

我們使用可以添加到 getLastLocation 方法中的回調(diào)來在合適的時機恢復協(xié)程蚁廓。參見下面的實現(xiàn):

// FusedLocationProviderClient 的擴展函數(shù)厨幻,返回最后已知位置
suspend fun FusedLocationProviderClient.awaitLastLocation(): Location =

  // 創(chuàng)建新的可取消協(xié)程
  suspendCancellableCoroutine<Location> { continuation ->

    // 添加恢復協(xié)程執(zhí)行的監(jiān)聽器
    lastLocation.addOnSuccessListener { location ->
      // 恢復協(xié)程并返回位置
      continuation.resume(location)
    }.addOnFailureListener { e ->
      // 通過拋出異常來恢復協(xié)程
      continuation.resumeWithException(e)
    }

    // suspendCancellableCoroutine 塊的結尾克胳。這里會掛起協(xié)程
    //直到某個回調(diào)調(diào)用了 continuation 參數(shù)
  }

注意: 盡管協(xié)程庫中同樣包含了不可取消版本的協(xié)程構建器 (即 suspendCoroutine)捏雌,但最好始終選擇使用 suspendCancellableCoroutine 處理協(xié)程作用域的取消及從底層 API 傳播取消事件。

suspendCancellableCoroutine 原理

在內(nèi)部,suspendCancellableCoroutine 使用 suspendCoroutineUninterceptedOrReturn 在掛起函數(shù)的協(xié)程中獲得 Continuation宵荒。這一 Continuation 對象會被一個 CancellableContinuation 對象攔截,后者會從此時開始控制協(xié)程的生命周期 (其 實現(xiàn) 具有 Job 的功能暑刃,但是有一些限制)溜嗜。

接下來,傳遞給 suspendCancellableCoroutine 的 lambda 表達式會被執(zhí)行焙压。如果該 lambda 返回了結果,則協(xié)程將立即恢復幻件;否則協(xié)程將會在 CancellableContinuation 被 lambda 手動恢復前保持掛起狀態(tài)贺待。

您可以通過我在下面代碼片段 (原版實現(xiàn)) 中的注釋來了解發(fā)生了什么:

public suspend inline fun <T> suspendCancellableCoroutine(
  crossinline block: (CancellableContinuation<T>) -> Unit
): T =
  // 獲取運行此掛起函數(shù)的協(xié)程的 Continuation 對象 
  suspendCoroutineUninterceptedOrReturn { uCont ->

    // 接管協(xié)程秃臣。Continuation 已經(jīng)被攔截,
    // 接下來將會遵循 CancellableContinuationImpl 的生命周期
    val cancellable = CancellableContinuationImpl(uCont.intercepted(), ...)
    /* ... */
 
    // 使用可取消 Continuation 調(diào)用代碼塊
    block(cancellable)
        
    // 掛起協(xié)程并且等待 Continuation 在 “block” 中被恢復稚虎,或者在 “block” 結束執(zhí)行時返回結果 
    cancellable.getResult()
  }

想了解更多有關掛起函數(shù)的工作原理,請參閱這篇: Kotlin Vocabulary | 揭秘協(xié)程中的 suspend 修飾符寻拂。

流數(shù)據(jù)

如果我們轉而希望用戶的設備在真實的環(huán)境中移動時赡麦,周期性地接收位置更新 (使用 requestLocationUpdates 函數(shù))泛粹,我們就需要使用 Flow 來創(chuàng)建數(shù)據(jù)流扒接。理想的 API 看起來應該像下面這樣:

fun FusedLocationProviderClient.locationFlow(): Flow<Location>

為了將基于回調(diào)的 API 轉換為 Flow,可以使用 callbackFlow 流構建器來創(chuàng)建新的 flow。callbackFlow 的 lambda 表達式的內(nèi)部處于一個協(xié)程的上下文中矾利,這意味著它可以調(diào)用掛起函數(shù)欣鳖。不同于 flow 流構建器让网,channelFlow 可以在不同的 CoroutineContext 或協(xié)程之外使用 offer 方法發(fā)送數(shù)據(jù)。

通常情況下,使用 callbackFlow 構建流適配器遵循以下三個步驟:

  1. 創(chuàng)建使用 offer 向 flow 添加元素的回調(diào);
  2. 注冊回調(diào)商佑;
  3. 等待消費者取消協(xié)程,并注銷回調(diào)。

將上述步驟應用于當前用例,我們得到以下實現(xiàn):

// 發(fā)送位置更新給消費者
fun FusedLocationProviderClient.locationFlow() = callbackFlow<Location> {
  // 創(chuàng)建了新的 Flow新症。這段代碼會在協(xié)程中執(zhí)行徒爹。
  // 1. 創(chuàng)建回調(diào)并向 flow 中添加元素
  val callback = object : LocationCallback() {
    override fun onLocationResult(result: LocationResult?) {
      result ?: return  // 忽略為空的結果
      for (location in result.locations) {
        try {
          offer(location)  // 將位置發(fā)送到 flow
        } catch (t: Throwable) {
          // 位置無法發(fā)送到 flow
        }
      }
    }
  }
  
  // 2. 注冊回調(diào)并通過調(diào)用 requestLocationUpdates 獲取位置更新荚醒。
  requestLocationUpdates(
    createLocationRequest(),
    callback,
    Looper.getMainLooper()
  ).addOnFailureListener { e ->
    close(e)  // 在出錯時關閉 flow
  }
  
  // 3. 等待消費者取消協(xié)程并注銷回調(diào)。這一過程會掛起協(xié)程隆嗅,直到 Flow 被關閉界阁。
  awaitClose {
    // 在這里清理代碼
    removeLocationUpdates(callback)
  }
}

callbackFlow 內(nèi)部原理

在內(nèi)部,callbackFlow 使用了一個 channel胖喳。channel 在概念上很接近阻塞 隊列 —— 它在配置時需要指定容量 (capacity): 即可以緩沖的元素個數(shù)泡躯。在 callbackFlow 中創(chuàng)建的 channel 默認容量是 64 個元素。如果將新元素添加到已滿的 channel丽焊,由于 offer 不會將元素添加到 channel 中较剃,并且會立即返回 false,所以 send 會暫停生產(chǎn)者技健,直到頻道 channel 中有新元素的可用空間為止写穴。

awaitClose 內(nèi)部原理

有趣的是,awaitClose 內(nèi)部使用的是 suspendCancellableCoroutine雌贱。您可以通過我在以下代碼片段中的注釋 (查看 原始實現(xiàn)) 一窺究竟:

public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) {
  ...
  try {
    // 使用可取消 continuation 掛起協(xié)程
    suspendCancellableCoroutine<Unit> { cont ->
      // 僅在 Flow 或 Channel 關閉時成功恢復協(xié)程啊送,否則保持掛起
      invokeOnClose { cont.resume(Unit) }
    }
  } finally {
    // 總是會執(zhí)行調(diào)用者的清理代碼
    block()
  }
}

復用 Flow

除非額外使用中間操作符 (如: conflate),否則 Flow 是冷且惰性的欣孤。這意味著每次調(diào)用 flow 的終端操作符時馋没,都會執(zhí)行構建塊。對于我們的用例來說降传,由于添加一個新的位置監(jiān)聽器開銷很小篷朵,所以這一特性不會有什么大問題。然而對于另外的一些實現(xiàn)可就不一定了婆排。

您可以使用 shareIn 中間操作符在多個收集器間復用同一個 flow声旺,并使冷流成為熱流控硼。

val FusedLocationProviderClient.locationFlow() = callbackFlow<Location> {
  ...
}.shareIn(
  // 讓 flow 跟隨 applicationScope
  applicationScope,
  // 向新的收集器發(fā)送上一次發(fā)送的元素
  replay = 1,
  // 在有活躍的訂閱者時,保持生產(chǎn)者的活躍狀態(tài)
  started = SharingStarted.WhileSubscribed()
)

您可以通過文章《協(xié)程中的取消和異常 | 駐留任務詳解》來了解更多有關在應用中使用 applicationScope 的最佳實踐艾少。

您應當考慮通過創(chuàng)建協(xié)程適配器使您的 API 或現(xiàn)存 API 簡潔、易讀且符合 Kotlin 的使用習慣翼悴。首先檢查是否已經(jīng)存在可用的適配器缚够,如果沒有,您可以使用 suspendCancellableCoroutine 針對一次性調(diào)用鹦赎;或使用 callbackFlow 針對流數(shù)據(jù)谍椅,來創(chuàng)建您自己的適配器。

您可以通過 codelab: 創(chuàng)建 Kotlin 擴展庫古话,來上手本文所介紹的話題雏吭。

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市陪踩,隨后出現(xiàn)的幾起案子杖们,更是在濱河造成了極大的恐慌,老刑警劉巖肩狂,帶你破解...
    沈念sama閱讀 219,188評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件摘完,死亡現(xiàn)場離奇詭異,居然都是意外死亡傻谁,警方通過查閱死者的電腦和手機孝治,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,464評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來审磁,“玉大人谈飒,你說我怎么就攤上這事√伲” “怎么了杭措?”我有些...
    開封第一講書人閱讀 165,562評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長吃媒。 經(jīng)常有香客問我瓤介,道長,這世上最難降的妖魔是什么赘那? 我笑而不...
    開封第一講書人閱讀 58,893評論 1 295
  • 正文 為了忘掉前任刑桑,我火速辦了婚禮,結果婚禮上募舟,老公的妹妹穿的比我還像新娘祠斧。我一直安慰自己,他們只是感情好拱礁,可當我...
    茶點故事閱讀 67,917評論 6 392
  • 文/花漫 我一把揭開白布琢锋。 她就那樣靜靜地躺著辕漂,像睡著了一般。 火紅的嫁衣襯著肌膚如雪吴超。 梳的紋絲不亂的頭發(fā)上钉嘹,一...
    開封第一講書人閱讀 51,708評論 1 305
  • 那天,我揣著相機與錄音鲸阻,去河邊找鬼跋涣。 笑死,一個胖子當著我的面吹牛鸟悴,可吹牛的內(nèi)容都是我干的陈辱。 我是一名探鬼主播,決...
    沈念sama閱讀 40,430評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼细诸,長吁一口氣:“原來是場噩夢啊……” “哼沛贪!你這毒婦竟也來了?” 一聲冷哼從身側響起震贵,我...
    開封第一講書人閱讀 39,342評論 0 276
  • 序言:老撾萬榮一對情侶失蹤利赋,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后猩系,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體隐砸,經(jīng)...
    沈念sama閱讀 45,801評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,976評論 3 337
  • 正文 我和宋清朗相戀三年蝙眶,在試婚紗的時候發(fā)現(xiàn)自己被綠了季希。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,115評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡幽纷,死狀恐怖式塌,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情友浸,我是刑警寧澤峰尝,帶...
    沈念sama閱讀 35,804評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站收恢,受9級特大地震影響武学,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜伦意,卻給世界環(huán)境...
    茶點故事閱讀 41,458評論 3 331
  • 文/蒙蒙 一火窒、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧驮肉,春花似錦熏矿、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,008評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽褪储。三九已至,卻和暖如春慧域,著一層夾襖步出監(jiān)牢的瞬間鲤竹,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,135評論 1 272
  • 我被黑心中介騙來泰國打工昔榴, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留宛裕,地道東北人。 一個月前我還...
    沈念sama閱讀 48,365評論 3 373
  • 正文 我出身青樓论泛,卻偏偏與公主長得像,于是被迫代替她去往敵國和親蛹屿。 傳聞我的和親對象是個殘疾皇子屁奏,可洞房花燭夜當晚...
    茶點故事閱讀 45,055評論 2 355

推薦閱讀更多精彩內(nèi)容