什么匾灶?RxCache是什么惧互?先整一段代碼
RequestApi.api(ApiClient.create(Api.class).getBanner())
.cacheKey("banner")
.cacheStrategy(CacheStrategy.CACHE_AND_REMOTE)
.cacheable(data -> data.hasData())
.buildCacheWithCacheResult(new CacheType<ApiResponse<List<BannerBean>>>() {})
.subscribe(new CacheSubscriber<ApiResponse<List<BannerBean>>>() {
@Override
public void onResponse(boolean isFromCache, ApiResponse<List<BannerBean>> result) {
((TextView) findViewById(R.id.textview)).setText(new Gson().toJson(result.data));
Toast.makeText(MainActivity.this, "來自" + (isFromCache ? "緩存" : "網(wǎng)絡(luò)"), Toast.LENGTH_SHORT).show();
}
});
上面這段代碼干了兩件事情:
- 從緩存取數(shù)據(jù)
- 加載網(wǎng)絡(luò)更新緩存
這就是RxCache哎媚,基于RxJava+DiskLruCache實(shí)現(xiàn)的磁盤緩存庫,支持根據(jù)策略自動(dòng)處理網(wǎng)絡(luò)數(shù)據(jù)緩存壹哺。
代碼傳送門:
隨著kotlin的崛起抄伍,協(xié)程、Flow的出現(xiàn)管宵,我思考著使用Flow重寫一下RxCache。這次重寫的過程攀甚,對(duì)Flow有了更深的理解箩朴,和大家一起分享。
這里簡單說下使用方式秋度。
API
初始化
使用前必須先進(jìn)行初始化操作炸庞。
RxCache.initialize(context)
也可以設(shè)置更多參數(shù)
/**
* 初始化
*
* @param cacheDir 緩存目錄
* @param cacheVersion 緩存版本
* @param maxCacheSize 緩存最大size
* @param cacheConverter 緩存Converter
*/
fun initialize(
cacheDir: File,
cacheConverter: GsonCacheConverter = GsonCacheConverter(Gson()),
cacheVersion: Int = 1,
maxCacheSize: Long = MAX_CACHE_SIZE
)
寫入數(shù)據(jù)
// 同步寫入數(shù)據(jù)
RxCache.apply {
put("url", "111")
put("data", BannerBean().apply {
desc = "flutter"
title = "flutter 中文社區(qū)"
})
}
// 異步寫入數(shù)據(jù)
lifecycleScope.launch {
RxCache.rxPut("ulr2","222").collect()
}
讀取數(shù)據(jù)
// 同步讀取數(shù)據(jù)
RxCache.get("url", String::class.java)
// 異步讀取數(shù)據(jù)
lifecycleScope.launch {
RxCache.rxGet("data", BannerBean::class.java).collect {
ToastUtil.toast("rxGet data = ${it?.title}")
}
}
移除某緩存
RxCache.remove("url");
// 異步
lifecycleScope.launch {
RxCache.rxRemove("url").collect()
}
清除全部緩存
// 同步
RxCache.clear()
// 異步
lifecycleScope.launch {
RxCache.rxClear().collect()
RxCache().clearAsync()
}
緩存策略
定義了IStrategy接口,框架內(nèi)部提供了6中緩存策略荚斯,支持自定義埠居。
緩存策略 | 說明 |
---|---|
NO_CACHE | 不使用RxCache進(jìn)行緩存 |
ONLY_REMOTE | 只請(qǐng)求網(wǎng)絡(luò),但數(shù)據(jù)依然會(huì)被緩存 |
ONLY_CACHE | 只加載緩存事期,如離線模式 |
FIRST_REMOTE | 優(yōu)先請(qǐng)求網(wǎng)絡(luò)滥壕,網(wǎng)絡(luò)數(shù)據(jù)無效后,再加載緩存 (如果緩存也沒有兽泣,則會(huì)響應(yīng)網(wǎng)絡(luò)的response or error) |
FIRST_CACHE | 優(yōu)先加載緩存绎橘,緩存沒有再去請(qǐng)求網(wǎng)絡(luò) |
CACHE_AND_REMOTE | 先加載緩存(成功才會(huì)回調(diào)緩存response),不管緩存什么結(jié)果都會(huì)再請(qǐng)求網(wǎng)絡(luò)唠倦。 如果緩存成功称鳞,網(wǎng)絡(luò)請(qǐng)求數(shù)據(jù)無效,則網(wǎng)絡(luò)不回調(diào)稠鼻。 如果緩存成功冈止,網(wǎng)絡(luò)也成功,且網(wǎng)絡(luò)和緩存數(shù)據(jù)相同則只有緩存回調(diào)候齿,網(wǎng)絡(luò)不再二次回調(diào)熙暴,否則會(huì)二次回調(diào) |
網(wǎng)絡(luò)請(qǐng)求
- 生成請(qǐng)求的flow
- 設(shè)置緩存策略
- 設(shè)置cacheKey
- 設(shè)置cacheable闺属,用于判斷數(shù)據(jù)是否有效,有效才進(jìn)行緩存
- buildCacheWithCacheResult構(gòu)建
- flowOn(Dispatchers.IO)指定運(yùn)行在線程中
- catch異常
- collect獲取數(shù)據(jù)
lifecycleScope.launch {
RequestApi(
flow {
emit(ApiClient.create(Api::class.java).getBanner())
}
) // 創(chuàng)建flow
.cacheStrategy(CacheStrategy.CACHE_AND_REMOTE) // 配置緩存策略
.cacheKey("banner") // 設(shè)置緩存key
.cacheable(object : ICacheable<ApiResponse<MutableList<BannerBean>>> { // 判斷數(shù)據(jù)是否有效怨咪,有效才緩存
override fun cacheable(data: ApiResponse<MutableList<BannerBean>>?): Boolean {
return data?.errorCode == 0 && data.data != null
}
})
// .buildCache(object : CacheType<ApiResponse<MutableList<BannerBean>>>() {})
.buildCacheWithCacheResult(object : CacheType<ApiResponse<MutableList<BannerBean>>>() {})//構(gòu)建
.flowOn(Dispatchers.IO)
.catch { // 捕獲異常
it.printStackTrace()
ToastUtil.toast(it.message)
binding.textview.text = null
}
.collect {
ToastUtil.toast("數(shù)據(jù)是否來自緩存:${it.isFromCache}")
binding.textview.text = Gson().toJson(it.data?.data)
}
}
Flow
下面我們通過RxJava與Flow對(duì)比來認(rèn)識(shí)Flow的操作符
對(duì)比 | RxJava | Flow |
---|---|---|
數(shù)據(jù)源 | Observable<T> | Flow<T> |
發(fā)射數(shù)據(jù) | onNext | emit |
改變數(shù)據(jù)發(fā)射的線程 | subscribeOn | flowOn |
改變消費(fèi)數(shù)據(jù)的線程 | observeOn | 協(xié)程launch的時(shí)候指定context |
捕獲異常 | onError | catch或者try-cathch塊 |
完成 | onComplete | onCompletion |
map | map | map |
flatMap | flatMap | flatMapConcat |
compose | compose | let(transformer) |
轉(zhuǎn)換 | transformer | transformer |
去重 | distinct | distinctUntilChanged |
合并 | concatWith | onCompletion { emitAll(other) } |
onErrorResumeNext | onErrorResumeNext | catch { emitAll(fallback) } |
onErrorReturn | onErrorReturn | catch { emit(fallback) } |
壓縮 | zip | zip |
創(chuàng)建flow
創(chuàng)建flow有多種方式
- flowOf
flowOf(1)
flowOf(1, 2, 3)
- asFlow()
方法.asFlow()
(1..3).asFlow()
- flow{ emit(value) }
flow {
emit(1)
}
切換線程
RxJava中可以使用subscribeOn來切換發(fā)射線程屋剑,使用observeOn來指定消費(fèi)線程。而Flow只能通過flowOn來切換發(fā)射線程诗眨,不能切換消費(fèi)線程唉匾。collect執(zhí)行的線程取決于協(xié)程launch時(shí)指定的上下文。
異常
RxJava中使用onError來捕獲異常匠楚。Flow中使用catch{}或者try-cathch()語句來捕獲異常巍膘。推薦使用catch{}。
lifecycleScope.launch {
flow<String> {
emit("111")
throw NullPointerException()
emit("222") // 這個(gè)不會(huì)發(fā)射
}.catch {
it.printStackTrace()
emit("333")
}.flowOn(Dispatchers.Main)
.collect {
println("collect >> $it")
}
}
// 輸出結(jié)果:
collect >> 111
collect >> 333
catch可以調(diào)用多次芋簿,作用范圍是調(diào)用catch之前的代碼峡懈。catch內(nèi)部可以調(diào)用emit(value)或者emitAll(flow)再次發(fā)射數(shù)據(jù)。
onCompletion
不管onCompletion之前是否發(fā)生了異常与斤,都會(huì)回調(diào)該方法肪康。只有onCompletion之前發(fā)生了異常且沒有被catch,參數(shù)cause才不會(huì)空撩穿,參數(shù)是throwable磷支。
lifecycleScope.launch {
flow<String> {
emit("111")
throw NullPointerException()
emit("222") // 這個(gè)不會(huì)發(fā)射
}.catch {
it.printStackTrace()
emit("333")
}.onCompletion { throwable ->
if (throwable != null) { // 發(fā)生了異常
emit("444")
} else {
emit("555")
}
}.flowOn(Dispatchers.Main)
.collect {
println("collect >> $it")
}
}
// 輸出結(jié)果:
collect >> 111
collect >> 333
collect >> 555
雖然發(fā)生了異常,但由于onCompletion被catch住了食寡,所以到onCompletion時(shí)是不存在異常的雾狈,所以throwable==null。如果前面沒有catch抵皱,那么throwable就是上面的NullPointerException善榛。
CacheAndRemoteStrategy
下面我們通過分析緩存策略CacheAndRemoteStrategy的實(shí)現(xiàn)過程,來簡單分析下Flow的操作符呻畸。
策略:先加載緩存(成功才會(huì)回調(diào)緩存response)移盆,不管緩存什么結(jié)果都會(huì)再請(qǐng)求網(wǎng)絡(luò)。
如果緩存成功擂错,網(wǎng)絡(luò)請(qǐng)求數(shù)據(jù)無效味滞,則網(wǎng)絡(luò)不回調(diào)
如果緩存成功,網(wǎng)絡(luò)也成功钮呀,且網(wǎng)絡(luò)和緩存數(shù)據(jù)相同則只有緩存回調(diào)剑鞍,網(wǎng)絡(luò)不再二次回調(diào),否則會(huì)二次回調(diào)
- 首先我們是要發(fā)射兩個(gè)數(shù)據(jù)源爽醋,一個(gè)是cache一個(gè)是net蚁署。在RxJava中,我們可以使用concatWith來組合兩個(gè)Observable蚂四。
Flow也有這個(gè)操作符光戈,不過我們看下實(shí)現(xiàn)哪痰。
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'concatWith' is 'onCompletion'. Use 'onCompletion { emit(value) }'",
replaceWith = ReplaceWith("onCompletion { emit(value) }")
)
public fun <T> Flow<T>.concatWith(value: T): Flow<T> = noImpl()
這個(gè)擴(kuò)展方法是不能調(diào)用的,讓我們使用onCompletion { emit(value) }實(shí)現(xiàn)久妆。
還有subscribe晌杰、compose、onErrorXxx等RxJava的常用操作符筷弦±哐荩看到這里,我有點(diǎn)懷疑烂琴,官方是故意這么設(shè)計(jì)的爹殊。參考RxJava的常用操作符,讓我們可以快速熟悉使用奸绷。
不得不說梗夸,F(xiàn)low的操作符的實(shí)現(xiàn)更簡單,沒有各種復(fù)雜的操作符号醉,而是通過簡單的集中操作符組合來實(shí)現(xiàn)功能反症,更方便理解和使用。
- loadCache返回cache的Flow畔派。onCompletion在cache執(zhí)行完成之后惰帽,通過emitAll發(fā)射網(wǎng)絡(luò)數(shù)據(jù)。
前面說過父虑,如果loadCache發(fā)生了異常(無緩存時(shí),內(nèi)部會(huì)拋出NoCacheException)授药,onCompletion中可以拿到這個(gè)異常士嚎。
我們看下網(wǎng)絡(luò)Flow都干了什么。
- netSource通過flatMapConcat來判斷網(wǎng)絡(luò)數(shù)據(jù)是否有效悔叽,有效則直接發(fā)射netResult莱衩、如果無效,則判斷當(dāng)前緩存是否有效娇澎。
if (cacheEx != null) 意味著loadCache發(fā)生了異常笨蚁,沒有緩存,那么我們發(fā)射netResult趟庄。這么做的目的是能響應(yīng)錯(cuò)誤數(shù)據(jù)括细,如響應(yīng)不同的錯(cuò)誤碼,或者toast 錯(cuò)誤信息戚啥。
如果有緩存奋单,則發(fā)射一個(gè)空的Flow,跳過netFlow猫十。
再往下看览濒,一個(gè)catch呆盖。
- 這一步的目的是,如果netSource發(fā)生了異常贷笛,如網(wǎng)絡(luò)相關(guān)異常应又、數(shù)據(jù)解析異常等,那么上面的flatMapConcat里的代碼就不會(huì)被執(zhí)行了乏苦。
在catch中株扛,我們判斷是否有緩存,如果沒有緩存邑贴,則拋出netSource發(fā)生的異常席里。
繼續(xù)往下看,distinctUntilChanged的作用是去重
- 判斷兩個(gè)數(shù)據(jù)源response是否一致拢驾,一致則不再觸發(fā)collect二次回調(diào)奖磁。
下面還有個(gè)catch
- 這一步的目的是捕獲住無緩存的異常,如果不是NoCacheException則拋出繁疤。
至此咖为,我們就實(shí)現(xiàn)了策略CacheAndRemoteStrategy。這算是一個(gè)比較復(fù)雜的Flow的場景了稠腊。
- 組合Flow
- Flow的數(shù)據(jù)轉(zhuǎn)換flatMapConcat
- 發(fā)射數(shù)據(jù)flowOf
- 發(fā)射emptyFlow
- 異常處理躁染,多次catch的使用
- 異常中發(fā)射數(shù)據(jù)emitAll
- 去重distinctUntilChanged
override fun <T> execute(
cache: RxCache,
cacheKey: String,
netSource: Flow<CacheResult<T?>>,
type: Type
): Flow<CacheResult<T?>> {
return loadCache<T>(cache, cacheKey, type)
.onCompletion { cacheEx ->
// 判斷是否發(fā)生異常
emitAll(
netSource.flatMapConcat { netResult ->
// 如果網(wǎng)絡(luò)數(shù)據(jù)有效則正常處理
if (netResult.cacheable) {
flowOf(netResult)
} else {
// 如果網(wǎng)絡(luò)數(shù)據(jù)是無效的,緩存也是無效的架忌,則拋出網(wǎng)絡(luò)的結(jié)果吞彤。如果有緩存,則網(wǎng)絡(luò)結(jié)果不再分發(fā)
if (cacheEx != null) { // 沒有緩存
flowOf(netResult)
} else {
emptyFlow()
}
}
}.catch { netEx ->
// 網(wǎng)絡(luò)請(qǐng)求發(fā)生了異常叹放,根據(jù)是否有緩存判斷如何分發(fā)
if (cacheEx != null) { // 沒有緩存饰恕,則分發(fā)網(wǎng)絡(luò)結(jié)果
throw netEx
} else { // 有緩存則不發(fā)射網(wǎng)絡(luò)結(jié)果
emitAll(emptyFlow())
}
}
)
}
.distinctUntilChanged { old, new ->
// 如果網(wǎng)絡(luò)數(shù)據(jù)和緩存數(shù)據(jù)一致,則只發(fā)射一次
if (old.data == null || !new.cacheable) { // 網(wǎng)絡(luò)無數(shù)據(jù)或沒有緩存
false
} else {
isDataSame(old.data, new.data)
}
}
.catch { // 捕獲NoCacheException
if (it !is NoCacheException) {
throw it
}
}
}
通過這次重寫井仰,不得不說埋嵌,F(xiàn)low是真的爽。
現(xiàn)在Kotlin越來越流行了俱恶,協(xié)程也逐漸興起雹嗦,新知識(shí)更新很快,一不留神合是,就落后了了罪。我個(gè)人覺得協(xié)程和Flow都很簡單,只要對(duì)比這RxJava端仰,然后多加練習(xí)捶惜,很快就能上手。