RxJava 2相比于RxJava 1首繁,改動(dòng)還是比較大的作郭,這里我們來簡(jiǎn)單說一下API上的改變
1.Flowable
RxJava1 中 Observable 不能很好地支持 backpressure ,最常見的例如UI事件弦疮,而不處理backpressure有可能導(dǎo)致MissingBackpressureException的出現(xiàn)夹攒。所以在 RxJava2 中 Oberservable 不再支持 backpressure ,而使用新增的 Flowable 來支持 backpressure 胁塞。
Flowable的用法跟原先的Observable是一樣的咏尝。
2.ActionN 和 FuncN 改名
ActionN 和 FuncN 遵循Java 8的命名規(guī)則。
其中啸罢,Action0 改名成Action编检,Action1改名成Consumer,而Action2改名成了BiConsumer扰才,而Action3 - Action9都不再使用了允懂,ActionN變成了Consumer<Object[]> 。
同樣衩匣,F(xiàn)unc改名成Function蕾总,F(xiàn)unc2改名成BiFunction粥航,F(xiàn)unc3 - Func9 改名成 Function3 - Function9,F(xiàn)uncN 由 Function<Object[], R> 取代生百。
3.Observable.OnSubscribe 變成 ObservableOnSubscribe
RxJava1的寫法:
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello");
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
RxJava2的寫法:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("hello");
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
});
4. ObservableOnSubscribe 中使用 ObservableEmitter 發(fā)送數(shù)據(jù)給 Observer
結(jié)合上一條递雀,ObservableOnSubscribe 不再使用 Subscriber 而是用 ObservableEmitter 替代。
ObservableEmitter 可以理解為發(fā)射器蚀浆,是用來發(fā)出事件的缀程,它可以發(fā)出三種類型的事件,通過調(diào)用emitter的onNext(T value) 市俊、onComplete()和onError(Throwable error)可以分別發(fā)出next事件杠输、complete事件和error事件。 如果只關(guān)心next事件的話秕衙,只需單獨(dú)使用onNext()即可蠢甲。
需要特別注意,emitter的onComplete()調(diào)用后据忘,Consumer不再接收任何next事件鹦牛。
5. Observable.Transformer 變成 ObservableTransformer
RxJava1的寫法:
/**
* 跟compose()配合使用,比如ObservableUtils.wrap(obj).compose(toMain())
* @param <T>
* @return
*/
public static <T> Observable.Transformer<T, T> toMain() {
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> tObservable) {
return tObservable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
RxJava2的寫法:
/**
* 跟compose()配合使用,比如ObservableUtils.wrap(obj).compose(toMain())
* @param <T>
* @return
*/
public static <T> ObservableTransformer<T, T> toMain() {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
return upstream.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
由于新增了Flowable,同理也增加了FlowableTransformer
public static <T> FlowableTransformer<T, T> toMain() {
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> upstream) {
return upstream.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
6.Subscription 改名為 Disposable
在 RxJava2 中勇吊,由于已經(jīng)存在了 org.reactivestreams.subscription 這個(gè)類曼追,為了避免名字沖突將原先的 rx.Subscription 改名為 io.reactivex.disposables.Disposable 。
剛開始不知道汉规,在升級(jí) RxJava2 時(shí)發(fā)現(xiàn) org.reactivestreams.subscription 這個(gè)類完全沒法做原先 rx.Subscription 的事情:(
順便說下礼殊,Disposable必須單次使用,用完就要銷毀针史。
7. first() 用法改變
官方文檔是這么描述的first()
的用法
1.x | 2.x |
---|---|
first() | RC3 renamed to firstElement and returns Maybe<T> |
first(Func1) | dropped, use filter(predicate).first() |
firstOrDefault(T) | renamed to first(T) and RC3 returns Single<T> |
firstOrDefault(Func1, T) | renamed to first(T) and RC3 returns Single<T> |
以first(Func1)
為例晶伦,first(Func1)
后面還使用了push()
,原先 Rxjava1會(huì)這樣寫
ConnectableObservable<Data> connectableObservable = Observable
.concat(Observable.from(list))
.first(new Func1<Data, Boolean>() {
@Override
public Boolean call(Data data) {
return DataUtils.isAvailable(data);
}
}).publish();
RxJava2 改成這樣
ConnectableObservable<Data> connectableObservable = Observable
.concat(Observable.fromIterable(list))
.filter(new Predicate<Data>() {
@Override
public boolean test(@NonNull Data data) throws Exception {
return DataUtils.isAvailable(data);
}
}).firstElement().toObservable().publish();
8. toBlocking().y 被 blockingY() 取代
在我的框架中存在著一個(gè)Optional類啄枕,它跟Java 8的Optional作用差不多婚陪,原先是使用RxJava1來編寫的。
import rx.Observable;
/**
* 使用方法:
* String s = null;
* Optional.ofNullable(s).orElse("default")); // 如果s為null,則顯示default,否則顯示s的值
* @author Tony Shen
*
*/
public class Optional<T> {
Observable<T> obs;
public Optional(Observable<T> obs) {
this.obs = obs;
}
public static <T> Optional<T> of(T value) {
if (value == null) {
throw new NullPointerException();
} else {
return new Optional<T>(Observable.just(value));
}
}
public static <T> Optional<T> ofNullable(T value) {
if (value == null) {
return new Optional<T>(Observable.<T>empty());
} else {
return new Optional<T>(Observable.just(value));
}
}
public T get() {
return obs.toBlocking().single();
}
public T orElse(T defaultValue) {
return obs.defaultIfEmpty(defaultValue).toBlocking().single();
}
}
升級(jí)到RxJava2之后频祝,get()
和 orElse()
方法都會(huì)報(bào)錯(cuò)泌参,修改之后是這樣的。
import io.reactivex.Observable;
/**
* 使用方法:
* String s = null;
* Optional.ofNullable(s).orElse("default"); // 如果s為null,則顯示default,否則顯示s的值
* @author Tony Shen
*
*/
public class Optional<T> {
Observable<T> obs;
public Optional(Observable<T> obs) {
this.obs = obs;
}
public static <T> Optional<T> of(T value) {
if (value == null) {
throw new NullPointerException();
} else {
return new Optional<T>(Observable.just(value));
}
}
public static <T> Optional<T> ofNullable(T value) {
if (value == null) {
return new Optional<T>(Observable.<T>empty());
} else {
return new Optional<T>(Observable.just(value));
}
}
public T get() {
return obs.blockingSingle();
}
public T orElse(T defaultValue) {
return obs.defaultIfEmpty(defaultValue).blockingSingle();
}
}
9. PublishSubject
包括 PublishSubject 以及各種 Subject(ReplaySubject常空、BehaviorSubject沽一、AsyncSubject) 都不再支持backpressure。
總結(jié)
RxJava2 所帶來的變化遠(yuǎn)遠(yuǎn)不止這些漓糙,以后遇到的話還會(huì)繼續(xù)整理和總結(jié)铣缠,畢竟我使用的 RxJava2 還是很少的一部分內(nèi)容。
RxJava2 最好到文檔依然是官方文檔。如果是新項(xiàng)目到話攘残,可以毫不猶豫地使用RxJava2拙友,如果是在線上已經(jīng)成熟穩(wěn)定的項(xiàng)目为狸,可以再等等歼郭。對(duì)于新手的話,可以直接從 RxJava2 學(xué)起辐棒,RxJava1 就直接略過吧病曾。對(duì)于老手,RxJava2 還是使用原來的思想漾根,區(qū)別不大泰涂,從 RxJava1 遷移到 Rxjava2 也花不了多少工夫。