lift方法
lift方法涉及到 Operator 接口, 先看一下 Operator 接口的定義
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
// cover for generics insanity
}
package rx.functions;
/**
* Represents a function with one argument.
* @param <T> the first argument type
* @param <R> the result type
*/
public interface Func1<T, R> extends Function {
R call(T t);
}
所以O(shè)perator接口可以看成
//偽代碼
public interface Operator<R, T> {
public Subscriber<R> call(Subscriber<T> subscriber);
}
Operator的簡單實現(xiàn)
//偽代碼
//將事件序列中的 Integer 對象轉(zhuǎn)換為 String 對象
Operator operator = new Observable.Operator() {
@override
public Subscriber call(final Subscriber subscriber) {
return new Subscriber<Integer>() {
@override
public void onNext(Integer integer) {
String string = "" + integer; //轉(zhuǎn)換操作
subscriber.onNext(string);
}
@override
public void onCompleted() {
subscriber.onCompleted();
}
@override
public void onError(Throwable e) {
subscriber.onError(e);
}
};
}
});
上面的代碼里Operator接口實現(xiàn)中新建一個Subscriber對象并在調(diào)用傳參來的subscriber的onNext方法前做了一個特定操作, 所以只是對Subscriber對象做了一層代理.
lift方法源碼
//偽代碼
//Observable中l(wèi)ift方法的實現(xiàn)
public class Observable<T> {
//...
Observable.OnSubscribe onSubscribe;
public Observable lift(Operator operator) {
return Observable.create(new Observable.OnSubscribe() {
@override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
onSubscribe.call(newSubscriber); //關(guān)鍵調(diào)用
}
});
}
//...
}
lift方法中新建了一個Observable對象并在Observable.OnSubscribe的實現(xiàn)中調(diào)用傳參的Operator對象的call方法新建了一個Subscriber對象, 并調(diào)用了onSubscribe.call(newSubscriber), 注意call方法傳參的是newSubscriber對象而不是原始的subscriber對象.
lift方法的用法
//偽代碼
// lift方法將Int類型轉(zhuǎn)換成String類型
1 Observable.create(new Observable.OnSubscribe() {
2 @override
3 public void call(Subscriber subscriber) {
4 subscriber.onNext(1); //Int類型
5 subscriber.onCompleted();
6 }
7 })
8
9 .lift(operator) // Int --> String
10
11 .subscribe(new Subscriber() {
12 @override
13 public void onNext(String s) { //String類型
14 Log.d(tag, "Item: " + s);
15 }
16
17 @override
18 public void onCompleted() {
19 Log.d(tag, "Completed!");
20 }
21
22 @override
23 public void onError(Throwable e) {
24 Log.d(tag, "Error!");
25 }
26 });
結(jié)合fit方法源碼和Operator的實現(xiàn)可以整理出程序運行流程:
- 因為fit返回了一個新Observable對象,所以第11行是調(diào)用這個新Observable對象的subscribe方法, 所以會執(zhí)行fit方法中構(gòu)造新Observable對象的Observable.OnSubscribe接口的call方法
return Observable.create(new Observable.OnSubscribe() {
@override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
onSubscribe.call(newSubscriber); //關(guān)鍵調(diào)用
}
});
- 執(zhí)行onSubscribe.call(newSubscriber)方法時, 會調(diào)用構(gòu)造原始Observable對象中的Observable.OnSubscribe接口的call方法, 所以call(Subscriber subscriber)方法中的參數(shù)subscriber是operator.call(subscriber)創(chuàng)建newSubscriber對象
3 public void call(Subscriber subscriber) {
4 subscriber.onNext(1); //Int類型
5 subscriber.onCompleted();
6 }
- 接著調(diào)用第4行subscriber.onNext(1)方法就會執(zhí)行了newSubscriber對象的onNext方法:
public void onNext(Integer integer) {
String string = "" + integer; //轉(zhuǎn)換操作
subscriber.onNext(string);
}
該方法中的subscriber是原始subscriber對象,這時根據(jù)Int生成了String參數(shù)
- 調(diào)用subscriber.onNext(string) 方法時就會執(zhí)行第13行
13 public void onNext(String s) { //String類型
14 Log.d(tag, "Item: " + s);
15 }
這樣流程就執(zhí)行完成了
小結(jié): 這種連續(xù)調(diào)用類似于裝飾者模式(Decorator Pattern), 即
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
在 RxJava 的內(nèi)部flatMap就是基于基礎(chǔ)的lift方法, RxJava中提供的各種變換雖然功能各有不同煌珊,但實質(zhì)上都是針對事件序列的處理和再發(fā)送.