上一篇文章 寫了一個(gè)極其簡化的 Rxjava Observable ,現(xiàn)在,我試圖添加一個(gè) map
操作符卫键。
public <R> Observable<R> map(Func1<T, R> func1) {
return new Observable<R>(subscriber -> this.subscribe(new Subscriber<T>() {
@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onNext(T t) {
R r = null;
try {
r = func1.call(t);
} catch (Throwable e) {
unsubscribe();
return;
}
subscriber.onNext(r);
}
@Override
public void unsubscribe() {
subscriber.unsubscribe();
}
@Override
public boolean isUnsubscribed() {
return subscriber.isUnsubscribed();
}
}));
}
Java 本身語言限制,導(dǎo)致代碼臃腫虱朵。代碼的核心部分就是
return new Observable<R>(subscriber -> this.subscribe(new Subscriber<T>() {
@Override
public void onNext(T t) {
R r = null;
try {
r = func1.call(t);
} catch (Throwable e) {
unsubscribe();
return;
}
subscriber.onNext(r);
}
}
這里看到以下幾點(diǎn)
-
map
接收兩個(gè)參數(shù) 永罚,注,對于成員函數(shù)卧秘,第一個(gè)參數(shù)是this
呢袱。 - 第一個(gè)參數(shù)是
Observable<T> this
, - 第二個(gè)參數(shù)是
Func1<T,R> func1
;-
func1
接收一個(gè)參數(shù)T
-
func1.call(t)
返回一個(gè)R
-
-
map
要返回一個(gè)Observable<R>
翅敌,那么就要在OnSubscribe
的時(shí)候羞福,需要從this
里面得到一個(gè)個(gè)T t
,然后用func1.call(t)
蚯涮,然后轉(zhuǎn)移給下一個(gè)subscriber
治专。
因?yàn)? Java8 lambda 關(guān)鍵字的引入卖陵,我們看到函數(shù)式編程中的 variable capture 的強(qiáng)大。
這是一個(gè)非常簡化的 map
實(shí)現(xiàn)张峰,還有很多問題泪蔫。
- 還有非常多的操作符和
map
很類似,這里有很多重復(fù)代碼喘批。 - backpressure 沒有處理撩荣。
-
unsubscribe
還沒有處理好。subscriber 鏈的關(guān)系沒有處理饶深。 - 異常也沒有處理好餐曹。
- 沒有保證
onComplete
只被調(diào)用一次 。
這個(gè)簡化的實(shí)現(xiàn)盡管有很多問題敌厘,但是可以幫助我們理解原有復(fù)雜完整的實(shí)現(xiàn)台猴。Map
的核心結(jié)構(gòu)是這樣
- 本身含有一個(gè)
Subscriber
對象,訂閱上層的Observable
- 返回一個(gè)
Observable
對象俱两,提供給下層訂閱饱狂。 - 這種方法組合了
Observable
,構(gòu)成了一個(gè)鏈條宪彩。
OnSubscribe<?> onSubscribe = subscriber /*傳遞進(jìn)來的 subscriber 參數(shù)休讳,給下層產(chǎn)生數(shù)據(jù)*/
-> {
/* this 是上層的 Observable,訂閱上層 */
this.subscribe(new Subscriber<T>() {
@Override
public void onNext(T t) {
R r = null;
try {
r = func1.call(t);
} catch (Throwable e) {
unsubscribe();
return;
}
/* 當(dāng)上層產(chǎn)生數(shù)據(jù)的時(shí)候毯焕,經(jīng)過轉(zhuǎn)換,傳遞給下層*/
subscriber.onNext(r);
}
};
Observable<R> ret = new Observable<R>(onSubscribe);
return ret;
}