github博客
λ:
今天起 android demo 項(xiàng)目新加個(gè)sdk:騰訊云IM弃榨,最近正在用嘱吗,而且接口多,涉及到的需求也挺全擂错。正好練手。同時(shí)也有flutter
的sdk掷漱。順路把flutter
也寫了粘室。
大多數(shù)sdk或者庫在提供api時(shí),對(duì)于異步處理一般都是提供回調(diào)卜范。好處是通用,兼容鹿榜,不管java, kotlin海雪,不用管其他依賴庫。 壞處就不用再提了舱殿。
IM也不例外是一堆回調(diào)奥裸,MVVM模式下,一層層傳回調(diào)上去就很low沪袭,所以把IM用到的接口整理成Service
湾宙,在里邊把回調(diào)包成kotlin 協(xié)程掛起函數(shù)。
suspendCancellableCoroutine
public suspend inline fun <T> suspendCancellableCoroutine(crossinline block: (CancellableContinuation<T>) -> Unit): T
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T
協(xié)程庫提供的兩個(gè)內(nèi)聯(lián)函數(shù)
冈绊。通過操作其中的CancellableContinuation
提交結(jié)果侠鳄。點(diǎn)進(jìn)去看源碼,查看支持的操作死宣。
public interface Continuation<in T> {
public val context: CoroutineContext
public fun resumeWith(result: Result<T>)
}
public interface CancellableContinuation<in T> : Continuation<T> {
public val isActive: Boolean
public val isCompleted: Boolean
public val isCancelled: Boolean
public fun cancel(cause: Throwable? = null): Boolean
public fun invokeOnCancellation(handler: CompletionHandler)
... 試驗(yàn)性接口
}
public inline fun <T> Continuation<T>.resume(value: T): Unit = resumeWith(Result.success(value))
public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit = resumeWith(Result.failure(exception))
public inline fun <T> Continuation(context: CoroutineContext, crossinline resumeWith: (Result<T>) -> Unit): Continuation<T>
忽略掉被打了標(biāo)簽的接口(不確定伟恶,試驗(yàn)性,即將廢棄毅该,等)博秫,看函數(shù)名基本就知道干嘛用,還剩這么點(diǎn)眶掌。 同時(shí)提供一堆拓展函數(shù)
挡育。
所以可以通過 resume
和 resumeWithException
提交回調(diào)返回的結(jié)果。
通過invokeOnCancellation
注冊(cè)取消時(shí)要執(zhí)行的任務(wù)朴爬,比如關(guān)閉流之類的即寒。
如IM中獲取所有已加入群,當(dāng)回調(diào)返回為失敗時(shí)寝殴,直接提交一個(gè)空列表蒿叠。或者提交個(gè)Throwable
蚣常。
suspend fun getJoinedGroupList(): List<V2TIMGroupInfo> =
suspendCancellableCoroutine { continuation ->
V2TIMManager.getGroupManager().getJoinedGroupList(object : V2TIMValueCallback<List<V2TIMGroupInfo>> {
override fun onSuccess(t: List<V2TIMGroupInfo>) {
continuation.resume(t)
}
override fun onError(code: Int, desc: String?) {
continuation.resume(emptyList())
// continuation.resumeWithException(Exception("code: $code, desc: $desc"))
}
})
}
callbackFlow, SharedFlow, StateFlow
有些回調(diào)是在實(shí)時(shí)監(jiān)聽數(shù)據(jù)市咽。比如位置信息,音量變化抵蚊,IM中數(shù)據(jù)變化施绎,新消息送達(dá)等等溯革。所以這種回調(diào)用 kotlin Flow 和 Channel
處理。
callbackFlow
現(xiàn)在callbackFlow
仍標(biāo)記為@ExperimentalCoroutinesApi
谷醉。所以等 雞啄完了米致稀,狗舔完了面,火燒斷了鎖 俱尼。我再用抖单。
允許在不同的CoroutineContext
中提交數(shù)據(jù)。刨一下源碼:
public fun <T> callbackFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> = CallbackFlowBuilder(block)
private class CallbackFlowBuilder<T>(...) : ChannelFlowBuilder<T>(...)
private open class ChannelFlowBuilder<T>(...) : ChannelFlow<T>(...)
最底層就是個(gè)ChannelFlow
遇八,也就是開個(gè)帶緩沖區(qū)Channel
來收集數(shù)據(jù)矛绘,在Flow
里接收數(shù)據(jù)。
而CallbackFlowBuilder
在此基礎(chǔ)上加了awaitClose
: 當(dāng)流要關(guān)閉時(shí)要執(zhí)行的操作刃永,常見的是注銷掉回調(diào)函數(shù)货矮。如果沒有awaitClose
,將會(huì)拋出IllegalStateException
異常斯够。
所以提交數(shù)據(jù)的方式和SendChannel
一樣囚玫。
callbackFlow<T> {
send(T) // 發(fā)送數(shù)據(jù)
offer(T) // 允許在協(xié)程外提交
sendBlocking(T) //嘗試用offer提交,如果失敗則runBlocking{ send(T) }读规,阻塞式提交
awaitClose(block: () -> Unit = {}) // 關(guān)閉時(shí)執(zhí)行的操作
}
// demo
fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
val callback = object : Callback {
override fun onNextValue(value: T) {
try {
sendBlocking(value)
} catch (e: Exception) {
}
}
override fun onApiError(cause: Throwable) {
cancel(CancellationException("API Error", cause))
}
override fun onCompleted() = channel.close()
}
api.register(callback)
awaitClose { api.unregister(callback) }
}
SharedFlow
取代BroadcastChannel
抓督。
SharedFlow
, MutableSharedFlow
都是 interface
。 同時(shí)提供了fun MutableSharedFlow
用于快速構(gòu)造掖桦。
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>
replay
:重播n個(gè)之前收到的數(shù)據(jù)給新訂閱者本昏。 >= 0
extraBufferCapacity
:除了重播之外緩沖區(qū)大小。當(dāng)緩沖區(qū)不滿時(shí)枪汪,提交數(shù)據(jù)不會(huì)掛起
onBufferOverflow
:緩沖區(qū)溢出時(shí)的策略(replay != 0 || extraBufferCapacity != 0 才有效)涌穆。默認(rèn)SUSPEND: 暫停發(fā)送,DROP_OLDEST:刪掉最舊數(shù)據(jù)雀久,DROP_LATEST:刪掉最新數(shù)據(jù)宿稀。
通過emit(T)
在協(xié)程中提交數(shù)據(jù)。
tryEmit(T): Boolean
嘗試在不掛起的情況下提交數(shù)據(jù)赖捌,成功則返回true
祝沸。 如果onBufferOverflow = BufferOverflow.SUSPEND
,在緩沖區(qū)滿時(shí)越庇,tryEmit
會(huì)返回false
罩锐,直到有新空間。而如果是DROP_OLDEST
或DROP_LATEST
卤唉,不會(huì)阻塞涩惑,tryEmit
永為true
。
所以我用MutableSharedFlow
替代callbackFlow
的提交過程桑驱。
// demo
val resFlow = MutableSharedFlow<Res<T>>(
extraBufferCapacity = 1,
onBufferOverflow: BufferOverflow = BufferOverflow.DROP_OLDEST
)
fun registerCallback() {
val callback = object : Callback {
override fun onNextValue(value: T) {
resFlow.tryEmit(Res.Success(value))
}
override fun onApiError(cause: Throwable) {
resFlow.tryEmit(Res.Failed(cause))
}
override fun onCompleted() {
resFlow.tryEmit(Res.Finish)
}
}
api.register(callback)
}
StateFlow
繼承自SharedFlow
竭恬,同樣也提供了快速構(gòu)造的函數(shù)跛蛋。函數(shù)必須提交一個(gè)初始的value
。底層相當(dāng)于開了一個(gè) MutableSharedFlow(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
.
public interface StateFlow<out T> : SharedFlow<T> {
public val value: T
}
public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
public override var value: T
public fun compareAndSet(expect: T, update: T): Boolean
}
@Suppress("FunctionName")
public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)
compareAndSet
: 如果當(dāng)前值為expect
, 則更新為update
痊硕。如果更新則返回true
(包括current == expect && current == update
的情況)赊级。
// demo
class LatestNewsViewModel(
private val newsRepository: NewsRepository) : ViewModel() {
private val _uiState = MutableStateFlow(LatestNewsUiState.Success(emptyList()))
val uiState: StateFlow<LatestNewsUiState> = _uiState
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews.collect { favoriteNews ->
_uiState.value = LatestNewsUiState.Success(favoriteNews)
}
}
}
}
sealed class LatestNewsUiState {
data class Success(news: List<ArticleHeadline>): LatestNewsUiState()
data class Error(exception: Throwable): LatestNewsUiState()
}