如果您是庫作者涨岁,您也許希望用戶在使用 Kotlin 協(xié)程與 Flow 時可以更加輕松地調(diào)用您基于 Java 或回調(diào)的 API拐袜。另外,如果您是 API 的使用者梢薪,則可能愿意將第三方 API 界面適配協(xié)程蹬铺,以使它們對 Kotlin 更友好。
本文將會介紹如何使用協(xié)程和 Flow 簡化 API秉撇,以及如何使用 suspendCancellableCoroutine 和 callbackFlow API 創(chuàng)建您自己的適配器甜攀。針對那些富有好奇心的讀者,本文還會對這些 API 進行剖析琐馆,以幫您了解它們底層的工作原理规阀。
如果您更喜歡觀看視頻,可以 點擊這里瘦麸。
檢查現(xiàn)有協(xié)程適配器
在您為現(xiàn)有 API 編寫自己的封裝之前谁撼,請檢查是否已經(jīng)存在針對您的用例的適配器或者 擴展方法。下面是一些包含常見類型協(xié)程適配器的庫滋饲。
Future 類型
對于 future 類型厉碟,Java 8 集成了 CompletableFuture,而 Guava 集成了 ListenableFuture屠缭。這里提到的并不是全部箍鼓,您可以在線搜索以確定是否存在適用于您的 future 類型的適配器。
// 等待 CompletionStage 的執(zhí)行完成而不阻塞線程
suspend fun <T> CompletionStage<T>.await(): T
// 等待 ListenableFuture 的執(zhí)行完成而不阻塞線程
suspend fun <T> ListenableFuture<T>.await(): T
使用這些函數(shù)呵曹,您可以擺脫回調(diào)并掛起協(xié)程直到 future 的結果被返回袄秩。
Reactive Stream
對于響應式流的庫,有針對 RxJava、Java 9 API 與 響應式流庫 的集成:
// 將給定的響應式 Publisher 轉換為 Flow
fun <T : Any> Publisher<T>.asFlow(): Flow<T>
這些函數(shù)將響應式流轉換為了 Flow之剧。
Android 專用 API
對于 Jetpack 庫或 Android 平臺 API郭卫,您可以參閱 Jetpack KTX 庫 列表。現(xiàn)有超過 20 個庫擁有 KTX 版本背稼,構成了您所熟悉的 Java API贰军。其中包括 SharedPreferences、ViewModels蟹肘、SQLite 以及 Play Core词疼。
回調(diào)
回調(diào)是實現(xiàn)異步通訊時非常常見的做法。事實上帘腹,我們在 后臺線程任務運行指南 中將回調(diào)作為 Java 編程語言的默認解決方案贰盗。然而,回調(diào)也有許多缺點: 這一設計會導致令人費解的回調(diào)嵌套阳欲。同時舵盈,由于沒有簡單的傳播方式,錯誤處理也更加復雜球化。在 Kotlin 中秽晚,您可以簡單地使用協(xié)程調(diào)用回調(diào),但前提是您必須創(chuàng)建您自己的適配器筒愚。
創(chuàng)建您自己的適配器
如果沒有找到適合您用例的適配器赴蝇,更直接的做法是自己編寫適配器。對于一次性異步調(diào)用巢掺,可以使用 suspendCancellableCoroutine API句伶;而對于流數(shù)據(jù),可以使用 callbackFlow API陆淀。
作為練習熄阻,下面的示例將會使用來自 Google Play Services 的 Fused Location Provider API 來獲取位置數(shù)據(jù)。此 API 界面十分簡單倔约,但是它使用回調(diào)來執(zhí)行異步操作。當邏輯變得復雜時坝初,這些回調(diào)容易使代碼變得不可讀浸剩,而我們可以使用協(xié)程來擺脫它們。
如果您希望探索其它解決方案鳄袍,可以通過上面函數(shù)所鏈接的源代碼為您帶來啟發(fā)绢要。
一次性異步調(diào)用
Fused Location Provider API 提供了 getLastLocation 方法來獲得 最后已知位置。對于協(xié)程來說拗小,理想的 API 是一個直接返回確切結果的掛起函數(shù)重罪。
注意: 這一 API 返回值為 Task,并且已經(jīng)有了對應的 適配器。出于學習的目的剿配,我們用它作為范例搅幅。
我們可以通過為 FusedLocationProviderClient
創(chuàng)建擴展函數(shù)來獲得更好的 API:
suspend fun FusedLocationProviderClient.awaitLastLocation(): Location
由于這是一個一次性異步操作,我們使用 suspendCancellableCoroutine
函數(shù): 一個用于從協(xié)程庫創(chuàng)建掛起函數(shù)的底層構建塊呼胚。
suspendCancellableCoroutine
會執(zhí)行作為參數(shù)傳入的代碼塊茄唐,然后在等待繼續(xù)信號期間掛起協(xié)程的執(zhí)行。當協(xié)程 Continuation 對象中的 resume
或 resumeWithException
方法被調(diào)用時蝇更,協(xié)程會被恢復執(zhí)行沪编。有關 Continuation 的更多信息,請參閱: Kotlin Vocabulary | 揭秘協(xié)程中的 suspend 修飾符年扩。
我們使用可以添加到 getLastLocation 方法中的回調(diào)來在合適的時機恢復協(xié)程蚁廓。參見下面的實現(xiàn):
// FusedLocationProviderClient 的擴展函數(shù)厨幻,返回最后已知位置
suspend fun FusedLocationProviderClient.awaitLastLocation(): Location =
// 創(chuàng)建新的可取消協(xié)程
suspendCancellableCoroutine<Location> { continuation ->
// 添加恢復協(xié)程執(zhí)行的監(jiān)聽器
lastLocation.addOnSuccessListener { location ->
// 恢復協(xié)程并返回位置
continuation.resume(location)
}.addOnFailureListener { e ->
// 通過拋出異常來恢復協(xié)程
continuation.resumeWithException(e)
}
// suspendCancellableCoroutine 塊的結尾克胳。這里會掛起協(xié)程
//直到某個回調(diào)調(diào)用了 continuation 參數(shù)
}
注意: 盡管協(xié)程庫中同樣包含了不可取消版本的協(xié)程構建器 (即 suspendCoroutine
)捏雌,但最好始終選擇使用 suspendCancellableCoroutine
處理協(xié)程作用域的取消及從底層 API 傳播取消事件。
suspendCancellableCoroutine 原理
在內(nèi)部,suspendCancellableCoroutine 使用 suspendCoroutineUninterceptedOrReturn 在掛起函數(shù)的協(xié)程中獲得 Continuation宵荒。這一 Continuation
對象會被一個 CancellableContinuation 對象攔截,后者會從此時開始控制協(xié)程的生命周期 (其 實現(xiàn) 具有 Job 的功能暑刃,但是有一些限制)溜嗜。
接下來,傳遞給 suspendCancellableCoroutine
的 lambda 表達式會被執(zhí)行焙压。如果該 lambda 返回了結果,則協(xié)程將立即恢復幻件;否則協(xié)程將會在 CancellableContinuation 被 lambda 手動恢復前保持掛起狀態(tài)贺待。
您可以通過我在下面代碼片段 (原版實現(xiàn)) 中的注釋來了解發(fā)生了什么:
public suspend inline fun <T> suspendCancellableCoroutine(
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
// 獲取運行此掛起函數(shù)的協(xié)程的 Continuation 對象
suspendCoroutineUninterceptedOrReturn { uCont ->
// 接管協(xié)程秃臣。Continuation 已經(jīng)被攔截,
// 接下來將會遵循 CancellableContinuationImpl 的生命周期
val cancellable = CancellableContinuationImpl(uCont.intercepted(), ...)
/* ... */
// 使用可取消 Continuation 調(diào)用代碼塊
block(cancellable)
// 掛起協(xié)程并且等待 Continuation 在 “block” 中被恢復稚虎,或者在 “block” 結束執(zhí)行時返回結果
cancellable.getResult()
}
想了解更多有關掛起函數(shù)的工作原理,請參閱這篇: Kotlin Vocabulary | 揭秘協(xié)程中的 suspend 修飾符寻拂。
流數(shù)據(jù)
如果我們轉而希望用戶的設備在真實的環(huán)境中移動時赡麦,周期性地接收位置更新 (使用 requestLocationUpdates 函數(shù))泛粹,我們就需要使用 Flow 來創(chuàng)建數(shù)據(jù)流扒接。理想的 API 看起來應該像下面這樣:
fun FusedLocationProviderClient.locationFlow(): Flow<Location>
為了將基于回調(diào)的 API 轉換為 Flow,可以使用 callbackFlow 流構建器來創(chuàng)建新的 flow。callbackFlow
的 lambda 表達式的內(nèi)部處于一個協(xié)程的上下文中矾利,這意味著它可以調(diào)用掛起函數(shù)欣鳖。不同于 flow 流構建器让网,channelFlow 可以在不同的 CoroutineContext 或協(xié)程之外使用 offer 方法發(fā)送數(shù)據(jù)。
通常情況下,使用 callbackFlow 構建流適配器遵循以下三個步驟:
- 創(chuàng)建使用 offer 向 flow 添加元素的回調(diào);
- 注冊回調(diào)商佑;
- 等待消費者取消協(xié)程,并注銷回調(diào)。
將上述步驟應用于當前用例,我們得到以下實現(xiàn):
// 發(fā)送位置更新給消費者
fun FusedLocationProviderClient.locationFlow() = callbackFlow<Location> {
// 創(chuàng)建了新的 Flow新症。這段代碼會在協(xié)程中執(zhí)行徒爹。
// 1. 創(chuàng)建回調(diào)并向 flow 中添加元素
val callback = object : LocationCallback() {
override fun onLocationResult(result: LocationResult?) {
result ?: return // 忽略為空的結果
for (location in result.locations) {
try {
offer(location) // 將位置發(fā)送到 flow
} catch (t: Throwable) {
// 位置無法發(fā)送到 flow
}
}
}
}
// 2. 注冊回調(diào)并通過調(diào)用 requestLocationUpdates 獲取位置更新荚醒。
requestLocationUpdates(
createLocationRequest(),
callback,
Looper.getMainLooper()
).addOnFailureListener { e ->
close(e) // 在出錯時關閉 flow
}
// 3. 等待消費者取消協(xié)程并注銷回調(diào)。這一過程會掛起協(xié)程隆嗅,直到 Flow 被關閉界阁。
awaitClose {
// 在這里清理代碼
removeLocationUpdates(callback)
}
}
callbackFlow 內(nèi)部原理
在內(nèi)部,callbackFlow 使用了一個 channel胖喳。channel 在概念上很接近阻塞 隊列 —— 它在配置時需要指定容量 (capacity): 即可以緩沖的元素個數(shù)泡躯。在 callbackFlow 中創(chuàng)建的 channel 默認容量是 64 個元素。如果將新元素添加到已滿的 channel丽焊,由于 offer 不會將元素添加到 channel 中较剃,并且會立即返回 false,所以 send 會暫停生產(chǎn)者技健,直到頻道 channel 中有新元素的可用空間為止写穴。
awaitClose 內(nèi)部原理
有趣的是,awaitClose
內(nèi)部使用的是 suspendCancellableCoroutine
雌贱。您可以通過我在以下代碼片段中的注釋 (查看 原始實現(xiàn)) 一窺究竟:
public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) {
...
try {
// 使用可取消 continuation 掛起協(xié)程
suspendCancellableCoroutine<Unit> { cont ->
// 僅在 Flow 或 Channel 關閉時成功恢復協(xié)程啊送,否則保持掛起
invokeOnClose { cont.resume(Unit) }
}
} finally {
// 總是會執(zhí)行調(diào)用者的清理代碼
block()
}
}
復用 Flow
除非額外使用中間操作符 (如: conflate
),否則 Flow 是冷且惰性的欣孤。這意味著每次調(diào)用 flow 的終端操作符時馋没,都會執(zhí)行構建塊。對于我們的用例來說降传,由于添加一個新的位置監(jiān)聽器開銷很小篷朵,所以這一特性不會有什么大問題。然而對于另外的一些實現(xiàn)可就不一定了婆排。
您可以使用 shareIn
中間操作符在多個收集器間復用同一個 flow声旺,并使冷流成為熱流控硼。
val FusedLocationProviderClient.locationFlow() = callbackFlow<Location> {
...
}.shareIn(
// 讓 flow 跟隨 applicationScope
applicationScope,
// 向新的收集器發(fā)送上一次發(fā)送的元素
replay = 1,
// 在有活躍的訂閱者時,保持生產(chǎn)者的活躍狀態(tài)
started = SharingStarted.WhileSubscribed()
)
您可以通過文章《協(xié)程中的取消和異常 | 駐留任務詳解》來了解更多有關在應用中使用 applicationScope
的最佳實踐艾少。
您應當考慮通過創(chuàng)建協(xié)程適配器使您的 API 或現(xiàn)存 API 簡潔、易讀且符合 Kotlin 的使用習慣翼悴。首先檢查是否已經(jīng)存在可用的適配器缚够,如果沒有,您可以使用 suspendCancellableCoroutine
針對一次性調(diào)用鹦赎;或使用 callbackFlow
針對流數(shù)據(jù)谍椅,來創(chuàng)建您自己的適配器。
您可以通過 codelab: 創(chuàng)建 Kotlin 擴展庫古话,來上手本文所介紹的話題雏吭。