- 原文鏈接: RxJava's repeatWhen and retryWhen, explained
- 原文作者: Daniel Lew
- 譯文出自: 小鄧子的簡書
- 譯者: 小鄧子
- 狀態(tài): 完成
- 譯者注:為了方便因Lambda(譯文)還不夠了解的同學(xué)進(jìn)行閱讀器仗,本篇譯文替換了原文中全部Lambda表達(dá)式。
第一次見到.repeatWhen()和.retryWhen()這兩個操作符的時(shí)候就非常困惑了童番。不得不說精钮,它們絕對是“最令人困惑彈珠圖”的有力角逐者。
然而它們都是非常有用的操作符:允許你有條件的重新訂閱已經(jīng)結(jié)束的Observable
剃斧。我最近研究了它們的工作原理轨香,現(xiàn)在我希望嘗試著去解釋它們(因?yàn)椋乙彩呛馁M(fèi)了一些精力才參透它們)幼东。
Repeat與Retry的對比
首先臂容,來了解一下.repeat()和.retry()之間最直觀的區(qū)別是什么?這個問題并不難:區(qū)別就在于什么樣的終止事件會觸發(fā)重訂閱根蟹。
當(dāng).repeat()接收到.onCompleted()事件后觸發(fā)重訂閱脓杉。
當(dāng).retry()接收到.onError()事件后觸發(fā)重訂閱。
然而简逮,這種簡單的敘述尚不能令人滿意球散。試想如果你要實(shí)現(xiàn)一個延遲數(shù)秒的重訂閱該如何去做?或者想通過觀察錯誤來決定是否應(yīng)該重訂閱呢散庶?這種情況下就需要.repeatWhen()
和.retryWhen()
的介入了蕉堰,因?yàn)樗鼈冊试S你為重試提供自定義邏輯凌净。
Notification Handler
你可以通過一個叫做notificationHandler
的函數(shù)來實(shí)現(xiàn)重試邏輯。這是.retryWhen()
的方法簽名(譯者注:方法簽名屋讶,指方法名稱冰寻、參數(shù)類型和參數(shù)數(shù)量等):
retryWhen(Func1<? super Observable<? extends java.lang.Throwable>,? extends Observable<?>> notificationHandler)
簽名很長,甚至不能一口氣讀完皿渗。我發(fā)現(xiàn)它很難理解的原因是因?yàn)榇嬖谝淮蠖训姆盒图s定性雄。
簡化后,它包括三個部分:
-
Func1
像個工廠類羹奉,用來實(shí)現(xiàn)你自己的重試邏輯秒旋。 - 輸入的是一個
Observable<Throwable>
。 - 輸出的是一個
Observable<?>
诀拭。
首先迁筛,讓我們來看一下最后一部分。被返回的Observable<?>
所要發(fā)送的事件決定了重訂閱是否會發(fā)生耕挨。如果發(fā)送的是onCompleted
或者onError
事件细卧,將不會觸發(fā)重訂閱。相對的筒占,如果它發(fā)送onNext
事件贪庙,則觸發(fā)重訂閱(不管onNext
實(shí)際上是什么事件)。這就是為什么使用了通配符作為泛型類型:這僅僅是個通知(next, error或者completed)翰苫,一個很重要的通知而已止邮。
source每次一調(diào)用onError(Throwable)
,Observable<Throwable>
都會被作為輸入傳入方法中奏窑。換句話說就是导披,它的每一次調(diào)用你都需要決定是否需要重訂閱。
當(dāng)訂閱發(fā)生的時(shí)候埃唯,工廠Func1
被調(diào)用撩匕,從而準(zhǔn)備重試邏輯。那樣的話墨叛,當(dāng)onError
被調(diào)用后止毕,你已經(jīng)定義的重試邏輯就能夠處理它了。
這里有個例子展示了我們應(yīng)該在哪些場景下訂閱source漠趁,比如扁凛,只有在Throwable
是IOException
的情況下請求重訂閱,否則不(重訂閱)棚潦。
source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override public Observable<?> call(Observable<? extends Throwable> errors) {
return errors.flatMap(new Func1<Throwable, Observable<?>>() {
@Override public Observable<?> call(Throwable error) {
// For IOExceptions, we retry
if (error instanceof IOException) {
return Observable.just(null);
}
// For anything else, don't retry
return Observable.error(error);
}
});
}
})
由于每一個error都被flatmap過令漂,因此我們不能通過直接調(diào)用.onNext(null)
觸發(fā)重訂閱或者.onError(error)
來避免重訂閱。
經(jīng)驗(yàn)之談
這里有一些關(guān)于.repeatWhen()
和.retryWhen()
的要點(diǎn)丸边,我們應(yīng)該牢記于心叠必。
.repeatWhen()
與.retryWhen()
非常相似,只不過不再響應(yīng)onError
作為重試條件妹窖,而是onCompleted
纬朝。因?yàn)?code>onCompleted沒有類型,所有輸入變?yōu)?code>Observable<Void>骄呼。每一次事件流的訂閱
notificationHandler
(也就是Func1
)只會調(diào)用一次共苛。這也是講得通的,因?yàn)槟阌幸粋€可觀測的Observable<Throwable>
蜓萄,它能夠發(fā)送任意數(shù)量的error隅茎。輸入的
Observable
必須作為輸出Observable
的源。你必須對Observable<Throwable>
做出反應(yīng)嫉沽,然后基于它發(fā)送事件辟犀;你不能只返回一個通用泛型流。
換言之就是绸硕,你不能做類似的操作:
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override public Observable<?> call(Observable<? extends Throwable> errors) {
return Observable.just(null);}
})
因?yàn)樗粌H不能奏效堂竟,而且還會打斷你的鏈?zhǔn)浇Y(jié)構(gòu)。你應(yīng)該做的是玻佩,而且至少應(yīng)該做的是出嘹,把輸入作為結(jié)果返回,就像這樣:
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override public Observable<?> call(Observable<? extends Throwable> errors) {
return errors;
}
})
(順便提一下咬崔,這在邏輯上與單純使用.retry()
操作符的效果是一樣噠)
- 輸入
Observable
只在終止事件發(fā)生的時(shí)候才會觸發(fā)(對于.repeatWhen()
來說是onCompleted
,而對于.retryWhen()
來說是onError
)税稼。它不會從源中接收到任何onNext
的通知,所以你不能通過觀察被發(fā)送的事件來決定重訂閱垮斯。如果你真的需要這樣做娶聘,你應(yīng)該添加像.takeUntil()
這樣的操作符,來攔截事件流甚脉。
使用方式
現(xiàn)在丸升,假設(shè)你已大概了解了.repeatWhen()
和.retryWhen()
,那么你能將一些什么樣的精簡邏輯放入到notificationHandler
中呢牺氨?
使用.repeatWhen()
+ .delay()
定期輪詢數(shù)據(jù):
source.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override public Observable<?> call(Observable<? extends Void> completed) {
return completed.delay(5, TimeUnit.SECONDS);
}
})
直到notificationHandler
發(fā)送onNext()
才會重訂閱到source狡耻。因?yàn)樵诎l(fā)送onNext()
之前delay
了一段時(shí)間,所以優(yōu)雅的實(shí)現(xiàn)了延遲重訂閱猴凹,從而避免了不間斷的數(shù)據(jù)輪詢夷狰。
非此即彼,使用.flatMap()
+ .timer()
實(shí)現(xiàn)延遲重訂閱:
(譯者注:在RxJava 1.0.0及其之后的版本郊霎,官方已不再提倡使用.timer()
操作符沼头,因?yàn)?code>.interval()具有同樣的功能)
source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override public Observable<?> call(Observable<? extends Throwable> errors) {
return errors.flatMap(new Func1<Throwable, Observable<?>>() {
@Override public Observable<?> call(Throwable error) {
return Observable.timer(5, TimeUnit.SECONDS);
}
});
}
})
當(dāng)需要與其他邏輯協(xié)同的時(shí)候,這種替代方案就變得非常有用了,比如进倍。土至。。
使用.zip()
+ .range()
實(shí)現(xiàn)有限次數(shù)的重訂閱
source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override public Observable<?> call(Observable<? extends Throwable> errors) {
return errors.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
@Override public Integer call(Throwable throwable, Integer i) {
return i;
}
});
}
})
最后的結(jié)果就是每個error都與range
中一個輸出配對出現(xiàn)猾昆,就像這樣:
zip(error1, 1) -> onNext(1) <-- Resubscribe
zip(error2, 2) -> onNext(2) <-- Resubscribe
zip(error3, 3) -> onNext(3) <-- Resubscribe
onCompleted() <-- No resubscription
因?yàn)楫?dāng)?shù)谒拇蝒rror出現(xiàn)的時(shí)候陶因,range(1,3)
中的數(shù)字已經(jīng)耗盡了,所以它隱式調(diào)用了onCompleted()
垂蜗,從而導(dǎo)致整個zip
的結(jié)束楷扬。防止了進(jìn)一步的重試。
將可變延遲策略與次數(shù)限制的重試機(jī)制結(jié)合起來
source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override public Observable<?> call(Observable<? extends Throwable> errors) {
return errors.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
@Override public Integer call(Throwable throwable, Integer i) {
return i;
}
}).flatMap(new Func1<Integer, Observable<? extends Long>>() {
@Override public Observable<? extends Long> call(Integer retryCount) {
return Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS);
}
});
}
})
在這種用例的比較上贴见,我認(rèn)為.flatMap()
+.timer()
的組合比單純使用.delay()
更可取烘苹,因?yàn)槲覀兛梢酝ㄟ^重試次數(shù)來修改延遲時(shí)間。重試三次片部,并且每一次的重試時(shí)間都是5 ^ retryCount
镣衡,僅僅通過一些操作符的組合就幫助我們實(shí)現(xiàn)了指數(shù)退避算法(譯者注:可參考二進(jìn)制指數(shù)退避算法)。