一缤言、實現(xiàn)的思路
RxJava 提供了很多的變換操作符,將上游的數(shù)據(jù)轉(zhuǎn)換成另一種數(shù)據(jù)宝当,可以在傳送數(shù)據(jù)流的過程中構(gòu)造一個新的類,這個類即持有上游的 Observable 也持有下游的 Observer 和 變換的能力來實現(xiàn)
二胆萧、具體代碼
1.創(chuàng)建 map 操作符
map 操作符本身持有 ObservableOnSubscribe 對象庆揩,這里將 ObservableOnSubscribe 交給一個 ObservableMap 讓它持有了上游的能力。同時也持有了變換的能力
/**
* 自定義 map 操作符
* map 操作符會拿到 SelfObservableMap 所持有的 Observable 對象
* 交給一個 ObservableMap 對象 然后替換一個新的(持有 ObservableMap 這個 ObservableOnSubscribe 的引用跌穗,已經(jīng)一個轉(zhuǎn)換的函數(shù))
* 即擁有控制上一層的能力 也擁有控制下一層的能力
* <p>
* 變換操作符只考慮上一層的類型 變換成新的類型后 給到下一層
*/
public <R> SelfObserverable<R> map(CFunction<? super T, R> function) {
ObservableMap observableMap = new ObservableMap(observable, function);
// 這里會把 source 替換成 ObservableMap
// 在 subscribe 時實際調(diào)用的 observable map 所持有的 source 的 subscribe 方案
return new SelfObserverable<>(observableMap);
}
2.ObservableMap 同時提供了 subscribe 函數(shù)订晌,讓它持有了下游的能力
/**
* 為 Map 專門定義的 ObservableOnSubscribe
* T 接收的類型 R 返回的類型
*
*/
public class ObservableMap<T,R> implements ObservableOnSubscribe<R> {
// 這里持有的是第一次通過 create 函數(shù)創(chuàng)建持有的 ObservableOnSubscribe 對象
private ObservableOnSubscribe observable;
private CFunction<? super T, ? extends R> function;
public ObservableMap(ObservableOnSubscribe source, CFunction<? super T, ? extends R> function) {
this.observable = source;
this.function = function;
}
/**
* observableEmitter 是外層通過 subscribe 函數(shù)傳遞進來的 Observer 類
* @param observer
*/
@Override
public void subscribe(Observer<? super R> observer) {
// SelfObserverable 調(diào)用 subscribe 函數(shù)時 實際上調(diào)用的是
MapObserver mapObserver = new MapObserver(observer, function);
// observable 是通過 create 或者 just 傳遞進來的 observable,現(xiàn)在交給了它一個 MapObserver
/**
* observable ----> create 的 new ObservableOnSubscribe
* // 使用 Map 操作符
* SelfObserverable.create(new ObservableOnSubscribe<Integer>() {
* @Override
* public void subscribe(Observer<? super Integer> observableEmitter) {
* observableEmitter.onNext(1);
* observableEmitter.onComplete();
* }
* })
*
* 這個 observable.subscribe 調(diào)用的就是 ObservableOnSubscribe 的 subscribe 函數(shù)的方法瞻离。
* 會觸發(fā) MapObserver 類的 onNext 和 onComplete
*/
observable.subscribe(mapObserver);
}
3.在 subscribe 時又對 observer 做了進一步的包裝,讓被觀察者 ObservableOnSubscribe 持有了 MapObserver 這個新的觀察者類腾仅。在調(diào)用 onNext 時首先調(diào)用 CFunction 函數(shù)進行一次數(shù)據(jù)轉(zhuǎn)換
/**
* 對 Observer 的包裝
*
* @param <T>
*/
class MapObserver<T> implements Observer<T> {
Observer<? super R> observableEmitter;
ObservableOnSubscribe source;
CFunction<? super T, ? extends R> function;
/**
*
* @param observableEmitter 通過 subscribe 傳遞進來的觀察者
* @param function function 是負(fù)責(zé)轉(zhuǎn)換的函數(shù)
*/
public MapObserver(Observer<? super R> observableEmitter,
CFunction<? super T, ? extends R> function) {
this.observableEmitter = observableEmitter;
this.function = function;
}
@Override
public void onSubscirbe() {
observableEmitter.onSubscirbe();
}
@Override
public void onNext(T value) {
// 將轉(zhuǎn)換后的值交給 onNext
R next = function.apply(value);
observableEmitter.onNext(next);
}
@Override
public void onComplete() {
observableEmitter.onComplete();
}
@Override
public void onError(Throwable throwable) {
observableEmitter.onError(throwable);
}
}
四、流程示例
RxJava-2.jpg
五套利、使用示例
// 使用 Map 操作符
SelfObserverable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(Observer<? super Integer> observableEmitter) {
observableEmitter.onNext(1);
observableEmitter.onComplete();
}
}).map(new CFunction<Integer, String>() {
@Override
public String apply(Integer integer) {
return "我被轉(zhuǎn)換啦";
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscirbe() {
Log.d("TAG", "on subscribe with map");
}
@Override
public void onNext(String value) {
Log.d("TAG", "我被轉(zhuǎn)換啦");
}
@Override
public void onComplete() {
}
@Override
public void onError(Throwable throwable) {
}
});