Description | RxJava 1.X | RxJava 2.X |
---|---|---|
Package name | `rx.xxx` | `io.reactivex.xxx` |
Reactive Streams specification | 1.X predates the Reactive Streams specification and supports it only partially | Fully supported |
Backpressure | Incomplete backpressure support | `Observable` is designed not to support backpressure; the new `Flowable` supports it |
null values | Supported | No longer supported; passing a null value throws `NullPointerException` |
Schedulers | `Schedulers.immediate()`, `Schedulers.trampoline()`, `Schedulers.computation()`, `Schedulers.newThread()`, `Schedulers.io()`, `Schedulers.from(executor)`, `AndroidSchedulers.mainThread()` | `Schedulers.immediate()` removed, `Schedulers.single()` added, the rest unchanged |
Single | Behaves like `Observable`, but emits only a single `onSuccess` or `onError` | Redesigned in line with Reactive Streams; follows the protocol `onSubscribe (onSuccess/onError)` |
Completable | Behaves like `Observable`; it either completes successfully or fails | Redesigned in line with Reactive Streams; follows the protocol `onSubscribe (onComplete/onError)` |
Maybe | None | New in 2.X. Behaves like `Observable`, but may emit one item, one error, or nothing at all. It can be thought of as a method that returns a nullable value: unless it throws an exception it always returns, but the returned value may or may not be null. Designed in line with Reactive Streams; follows the protocol `onSubscribe (onSuccess/onError/onComplete)` |
Flowable | None | New in 2.X. Behaves like `Observable`, is designed in line with Reactive Streams, and supports backpressure |
Subject | `AsyncSubject`, `BehaviorSubject`, `PublishSubject`, `ReplaySubject`, `UnicastSubject` | 2.X keeps the existing functionality of these Subjects and adds `AsyncProcessor`, `BehaviorProcessor`, `PublishProcessor`, `ReplayProcessor` and `UnicastProcessor`, which support backpressure |
Subscription | `Subscription` | Renamed to `Disposable` because the name clashes with the `Subscription` interface of Reactive Streams |
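To make the backpressure row concrete, here is a minimal sketch of a subscriber that asks a `Flowable` for only as many items as it can handle. It assumes RxJava 2.x and its transitive `reactive-streams` dependency are on the classpath; the class name `BackpressureExample` and the method `receiveWithDemand` are made up for illustration.

```java
import io.reactivex.Flowable;
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class BackpressureExample {

    // Subscribes to a 10-element Flowable but requests only `demand` items,
    // then returns whatever was actually delivered.
    static List<Integer> receiveWithDemand(final long demand) {
        final List<Integer> received = new ArrayList<>();
        Flowable.range(1, 10).subscribe(new Subscriber<Integer>() {
            @Override public void onSubscribe(Subscription subscription) {
                // Nothing is emitted until the subscriber signals demand.
                subscription.request(demand);
            }
            @Override public void onNext(Integer value) {
                received.add(value);
            }
            @Override public void onError(Throwable error) {
                error.printStackTrace();
            }
            @Override public void onComplete() {
                System.out.println("completed");
            }
        });
        return received;
    }

    public static void main(String[] args) {
        // Only three of the ten available items are delivered.
        System.out.println(receiveWithDemand(3));
    }
}
```

An `Observable` has no `request(n)` step, which is why 2.X treats it as the non-backpressured type and leaves flow control to `Flowable`.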
For a simple RxJava 2.X + Retrofit + OkHttp example, click here.
library dependency changes
//1.X
compile 'io.reactivex:rxjava:1.2.1'
compile 'io.reactivex:rxandroid:1.2.1'
//2.X
compile 'io.reactivex.rxjava2:rxjava:2.0.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.0'
package changes
The main change is rx.xxx --> io.reactivex.xxx
//1.X
import rx.Observable;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;
import rx.functions.Action1;
import rx.functions.Func1;
//2.X
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.ObservableTransformer;
import io.reactivex.disposables.Disposable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
null values
RxJava 2.X no longer supports null values. Passing null either throws a NullPointerException immediately or delivers one to the subscriber's onError:
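// Throws NullPointerException immediately, at the call site
Observable.just(null);
Single.just(null);
// The NullPointerException is delivered to onError instead
Observable.fromCallable(() -> null)
    .subscribe(System.out::println, Throwable::printStackTrace);
Observable.just(1).map(v -> null)
    .subscribe(System.out::println, Throwable::printStackTrace);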
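One way to model "there may be no value" without null is `Maybe`, whose `fromCallable` treats a null result as an empty completion rather than an error. The sketch below assumes RxJava 2.x on the classpath; the `USERS` map, `findUser` and `describe` are hypothetical names used only for this example.

```java
import io.reactivex.Maybe;
import java.util.HashMap;
import java.util.Map;

public class NullAsEmpty {

    // Hypothetical lookup table standing in for a cache or database.
    static final Map<String, String> USERS = new HashMap<>();
    static {
        USERS.put("1", "Alice");
    }

    // Map.get returns null for a missing key; Maybe.fromCallable turns
    // that null into onComplete instead of a NullPointerException.
    static Maybe<String> findUser(final String id) {
        return Maybe.fromCallable(() -> USERS.get(id));
    }

    // Subscribes synchronously and reports which callback fired.
    static String describe(Maybe<String> source) {
        final StringBuilder result = new StringBuilder();
        source.subscribe(
            value -> result.append("onSuccess: ").append(value),
            error -> result.append("onError: ").append(error),
            () -> result.append("onComplete"));
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe(findUser("1")));
        System.out.println(describe(findUser("2")));
        // toSingle(defaultValue) supplies a fallback when the Maybe is empty.
        System.out.println(findUser("2").toSingle("unknown").blockingGet());
    }
}
```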
Example 1
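//1.X
// Applies IO schedulers to any Observable; kept as a raw type so one instance can be shared.
public static final Observable.Transformer IO_TRANSFORMER = new Observable.Transformer() {
    @Override public Object call(Object observable) {
        return ((Observable) observable).subscribeOn(Schedulers.io())
            .unsubscribeOn(Schedulers.io())
            .observeOn(Schedulers.io());
    }
};

// Casts the shared raw transformer to the element type needed at the call site.
@SuppressWarnings("unchecked")
public static final <T> Observable.Transformer<T, T> applySchedulers(Observable.Transformer transformer) {
    return (Observable.Transformer<T, T>) transformer;
}

// subscribe() rejects a null action, so a real one is supplied here.
Action1<Integer> onNext = new Action1<Integer>() {
    @Override public void call(Integer value) {
        System.out.println(value);
    }
};
// Numeric strings, so that Integer.valueOf below does not throw NumberFormatException.
String[] items = { "1", "2", "3" };
Subscription subscription = Observable.from(items)
    .compose(RxUtil.<String>applySchedulers(IO_TRANSFORMER))
    .map(new Func1<String, Integer>() {
        @Override public Integer call(String s) {
            return Integer.valueOf(s);
        }
    })
    .subscribe(onNext);
//TODO subscription.unsubscribe();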
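//2.X
// Applies IO schedulers to any Observable; kept as a raw type so one instance can be shared.
public static final ObservableTransformer IO_TRANSFORMER = new ObservableTransformer() {
    @Override public ObservableSource apply(Observable upstream) {
        return upstream.subscribeOn(Schedulers.io())
            .unsubscribeOn(Schedulers.io())
            .observeOn(Schedulers.io());
    }
};

// Casts the shared raw transformer to the element type needed at the call site.
@SuppressWarnings("unchecked")
public static final <T> ObservableTransformer<T, T> applySchedulers(ObservableTransformer transformer) {
    return (ObservableTransformer<T, T>) transformer;
}

// subscribe() rejects a null consumer, so a real one is supplied here.
Consumer<Integer> onNext = new Consumer<Integer>() {
    @Override public void accept(Integer value) throws Exception {
        System.out.println(value);
    }
};
// Numeric strings, so that Integer.valueOf below does not throw NumberFormatException.
String[] items = { "1", "2", "3" };
Disposable disposable = Observable.fromArray(items)
    .compose(RxUtil.<String>applySchedulers(IO_TRANSFORMER))
    .map(new Function<String, Integer>() {
        @Override public Integer apply(String s) throws Exception {
            return Integer.valueOf(s);
        }
    })
    .subscribe(onNext);
//TODO disposable.dispose();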
- Return value of `.subscribe(...)`: `Subscription` in 1.X, `Disposable` in 2.X.
- `Transformer`: in 1.X it is the `Transformer` interface nested inside `rx.Observable`, which extends `Func1<Observable<T>, Observable<R>>`; in 2.X it is `io.reactivex.ObservableTransformer<Upstream, Downstream>`, a standalone interface.
- `AndroidSchedulers`: `rx.android.schedulers.AndroidSchedulers` in 1.X, `io.reactivex.android.schedulers.AndroidSchedulers` in 2.X.
- `Func1`: `rx.functions.Func1` in 1.X, `io.reactivex.functions.Function` in 2.X.
- For the other overloads, see the screenshot below.
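Because `ObservableTransformer` is a standalone single-method interface in 2.X, the raw-type constant and cast can also be replaced by a generic factory method. The following is only a sketch of that alternative, assuming RxJava 2.x on the classpath and Java 8 lambdas; `TypedTransformerExample`, `ioSchedulers` and `parseAll` are hypothetical names, not part of the original project.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;
import io.reactivex.schedulers.Schedulers;
import java.util.List;

public class TypedTransformerExample {

    // Builds a type-safe transformer per call, so no unchecked cast is needed.
    static <T> ObservableTransformer<T, T> ioSchedulers() {
        return upstream -> upstream
            .subscribeOn(Schedulers.io())
            .unsubscribeOn(Schedulers.io())
            .observeOn(Schedulers.io());
    }

    // Parses the given numeric strings on the IO scheduler and blocks
    // until the full list is available (blocking is for demonstration only).
    static List<Integer> parseAll(String... items) {
        return Observable.fromArray(items)
            .compose(TypedTransformerExample.<String>ioSchedulers())
            .map(text -> Integer.parseInt(text))
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(parseAll("1", "2", "3"));
    }
}
```

The trade-off is one small allocation per call in exchange for compile-time type checking; the shared raw constant in Example 1 avoids the allocation but needs the cast.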
Example 2
//1.X
public class AppBaseActivity extends AppCompatActivity {
    ...
    // Collects every Subscription created by this Activity.
    private CompositeSubscription mCompositeSubscription;

    protected void addSubscription(Subscription subscription) {
        if (null == mCompositeSubscription) {
            mCompositeSubscription = new CompositeSubscription();
        }
        mCompositeSubscription.add(subscription);
    }

    @Override protected void onDestroy() {
        // Unsubscribe everything so that no callback outlives the Activity.
        if (null != mCompositeSubscription) {
            mCompositeSubscription.unsubscribe();
        }
        super.onDestroy();
    }
    ...
}
//2.X
public class AppBaseActivity extends AppCompatActivity {
    ...
    // Collects every Disposable created by this Activity.
    private CompositeDisposable mCompositeDisposable;

    protected void addDisposable(Disposable disposable) {
        if (null == mCompositeDisposable) {
            mCompositeDisposable = new CompositeDisposable();
        }
        mCompositeDisposable.add(disposable);
    }

    @Override protected void onDestroy() {
        // Dispose everything so that no callback outlives the Activity.
        if (null != mCompositeDisposable) {
            mCompositeDisposable.clear();
        }
        super.onDestroy();
    }
    ...
}
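The 2.X version calls `clear()` where the 1.X version called `unsubscribe()`. `CompositeDisposable` also offers `dispose()`, and the two differ in whether the container can be reused afterwards. The sketch below illustrates that difference, assuming RxJava 2.x on the classpath; the class and method names are hypothetical.

```java
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;

public class ClearVsDispose {

    // After clear(), the container is empty but still accepts new disposables.
    static boolean usableAfterClear() {
        CompositeDisposable container = new CompositeDisposable();
        container.add(Disposables.empty());
        container.clear();
        Disposable late = Disposables.empty();
        container.add(late);
        return !late.isDisposed();
    }

    // After dispose(), anything added later is disposed immediately.
    static boolean usableAfterDispose() {
        CompositeDisposable container = new CompositeDisposable();
        container.add(Disposables.empty());
        container.dispose();
        Disposable late = Disposables.empty();
        container.add(late);
        return !late.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println("usable after clear():   " + usableAfterClear());
        System.out.println("usable after dispose(): " + usableAfterDispose());
    }
}
```

Either call works in `onDestroy()`, where the Activity is going away. `clear()` is the safer choice in callbacks such as `onStop()`, where the same container may be filled again later.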
These are essentially all the changes needed in the project I currently maintain. If you spot any mistakes, corrections are welcome. Thanks!