RxJava 2.0配置
在項目Build.gradle文件里面添加如下代碼汛骂,即可:
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
基本操作
1.Observable.create()
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
for(int i=0; i<5; i++){
e.onNext(String.valueOf(i));
}
e.onComplete();
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
a). 傳入ObservableOnSubscribe 里面就一個方法subscribe井佑,傳入一個可觀察的數(shù)據(jù)發(fā)射器彩郊,繼承關(guān)系:ObservableEmitter -> Emitter尸诽。Emitter代表發(fā)射器,代碼如下:
public interface Emitter<T> {
void onNext(T value);
void onError(Throwable error);
void onComplete();
}
這和RxJava1.0使用的Subscriber的主要功能是一模一樣的吕晌,提供數(shù)據(jù)源的發(fā)送熄诡。
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
b).我們常用的subscribe方法列表彼宠,如下:
Disposable subscribe(Consumer<? super T> onNext)
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe)
void subscribe(Observer<? super T> observer)
Disposable定義如下趣席,與RxJava1.0 Subscription對應(yīng)兵志,提供流的解除訂閱:
public interface Disposable {
void dispose();
boolean isDisposed();
}
Observer定義如下:
public interface Observer<T> {
void onSubscribe(Disposable d);
void onNext(T t);
void onError(Throwable e);
void onComplete();
}
其中醇蝴,我們將看不到RxJava1.0的action1之類的接口宣肚,取而代之的是與Java8命名類似的函數(shù)式接口。
查看源碼悠栓,subscribe前幾個方法最后都是調(diào)用的都是subscribe(Observer observer)霉涨,如下:
public Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
...
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
LambdaObserver簡單封裝了我們傳入的函數(shù)接口,作為一個Observer惭适。
我們再看最基本的方法subscribe(Observer)的實現(xiàn)笙瑟,如下:
public final void subscribe(Observer<? super T> observer) {
observer = RxJavaPlugins.onSubscribe(this, observer);
subscribeActual(observer);
}
RxJavaPlugins就是以前RxJava的RxJavaHooks,可以看成代理包裝癞志,不設(shè)置就是直接返回原對象往枷,可以忽略,最后調(diào)用subscribeActual()這個是抽象方法凄杯,真正調(diào)用流都是在這個方法里面進一步實現(xiàn)的错洁。
2.分析Observable.just()調(diào)用
示例代碼,打印“5”戒突,如下:
Disposable d = Observable.just("5").subscribe(s -> System.out.println(s));
Observable.just 是最簡單的rxjava操作了屯碴,就是生成數(shù)據(jù),如下:
public static <T> Observable<T> just(T item) {
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
很簡單膊存,生成一個ObservableJust對象导而,內(nèi)容如下:
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
@Override
protected void subscribeActual(Observer<? super T> s) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
s.onSubscribe(sd);
sd.run();
}
@Override
public T call() {
return value;
}
}
所以,我們調(diào)用Observable.just()隔崎,會拿到我們調(diào)用鏈的第一個Observable對象---ObservableJust今艺,它繼承了Observable<T>并實現(xiàn)subscribeActual()方法。如上面示例代碼爵卒,根據(jù)前面分析虚缎,我們將打印的Consumer-->轉(zhuǎn)換為了LambdaObserver,傳入了ObservableJust對象的subscribeActual(LambdaObserver)技潘。
下一步遥巴,傳入obersver和value,生成ScalarDisposable對象享幽,直接調(diào)用該對象的run方法铲掐,完成了整個調(diào)用過程。ScalarDisposable.run方法值桩,如下:
@Override
public void run() {
if (get() == START && compareAndSet(START, ON_NEXT)) {
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}
忽略其他代碼摆霉,基本看出,調(diào)用了我們傳入的observer.onNext() -> observer.onComplete(),完成打印携栋。
3.組合Observable.just() 和 Observable.map()
示例代碼搭盾,將數(shù)字轉(zhuǎn)換為字符串,并打印婉支,如下:
Disposable d = Observable.just(1).map(i -> String.valueOf(i)).subscribe(s -> System.out.println(s));
我們知道鸯隅,Observable.just()返回了一個ObservableJust對象,也就是Observable<T>的實例向挖,因此蝌以,上述等式相當于,如下:
new ObservableJust().map(xxx);
因此我們先記住何之,當前對象是ObservableJust跟畅,打開map方法定義,如下:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
可以看到溶推,返回了一個ObservableMap對象徊件,該對象傳入了一個this,也就是當前的Observable對象蒜危,所以我們可以知道虱痕,如果這個操作符是轉(zhuǎn)換用的,肯定會傳入this當前observable對象舰褪,而Just操作是數(shù)據(jù)源的開頭皆疹,所以不需要。ObservableMap代碼如下:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
}
一樣的我們點開AbstractObservableWithUpstream<T, U>知道它繼承Observable<U>對象占拍,轉(zhuǎn)換流的能力抽象略就,從原始的T類型流,轉(zhuǎn)換為我們需要的U類型流晃酒。ObservableMap的構(gòu)造函數(shù)傳入了原始的T類型流表牢,也就是我們的ObservableJust<Integer>類型流,以及我們的轉(zhuǎn)換函數(shù)接口贝次,將subscribeActual具體實現(xiàn)的時候崔兴,ObservableJust收到的Observer對象是個類似Observer<Integer>代理對象,里面封裝了map的轉(zhuǎn)換邏輯和原始的observer<String>對象蛔翅,到這里ObservableJust的數(shù)據(jù)就全部接入MapObserver里面敲茄。MapObserver代碼如下:
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
...
U v;
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper
actual.onNext(v);
}
}
代碼可以看出,收到ObservableJust對象的Integer值后山析,通過轉(zhuǎn)換函數(shù)接口Function堰燎,將T類型轉(zhuǎn)換了U類型,也就是我們要的String類型笋轨,然后發(fā)送給實際的接收者秆剪,完成整個流過程赊淑。偽代碼如下:
方法組合過程:
ObservableJust just = new ObservableJust(數(shù)據(jù));
ObservableMap map = new ObservableMap (just, function);
訂閱過程:
Observer 實際接收者;
--->
map.subscribe(實際接收者)仅讽;
--->
Observer mapObserver = new Observer(實際接收者);
just.subsribe(mapObserver);
--->
導(dǎo)致數(shù)據(jù)流動開始陶缺,int ---> mapObserver --> string --> 實際接收者
簡單分析了一下組合調(diào)用流程。