RxJava 2.0 簡(jiǎn)單介紹
一年一年有一年,RxJava也新增了2.0版本,那么為什么是新增版本而不說(shuō)升級(jí)版本呢?
因?yàn)?.0版本和1.0版本兩者并不兼容信轿,2.0版本是基于Reactive-Streams規(guī)范重新設(shè)計(jì)而來(lái);同時(shí)1.x版本和2.x版本兩者會(huì)并行開發(fā)維護(hù)残吩,但是1.x版本只維護(hù)到2018-03-31财忽。
下面我們簡(jiǎn)單介紹一下兩者的不同。
0x00 依賴&包名不同
使用rxjava 1.x泣侮、2.x版本的依賴如下:
// rxjava 1.x
compile 'io.reactivex:rxjava:1.1.6'
// rxjava 2.x
compile "io.reactivex.rxjava2:rxjava:2.x.y"
包名修改如下:
// 1.x -> 2.x
rx.** -> io.reactivex.**
0x01 Observable與Flowable
Observable
在2.0版本不支持backpressure即彪,它會(huì)緩存全部的數(shù)據(jù),一一發(fā)送給消費(fèi)者旁瘫,如果消費(fèi)不及時(shí)祖凫,會(huì)產(chǎn)生OOM。于此對(duì)應(yīng)酬凳,在2.x版本新增了Flowable
惠况,支持設(shè)置/自定義backpressure,同時(shí)在創(chuàng)建時(shí)必須制定backpressure宁仔。
Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> e) throws Exception {
for (int i = 0; i < 256; i++) {
e.onNext(i);
}
e.onComplete();
}
}, BackpressureStrategy.BUFFER).subscribe(System.out::println, Throwable::printStackTrace);
0x02 Single
當(dāng)使用Single
時(shí)稠屠,生產(chǎn)者調(diào)用onSuccess()
通知訂閱者,同時(shí)終止整個(gè)事件流,生產(chǎn)者只能發(fā)送一個(gè)success事件权埠,訂閱者也只能收到一個(gè)success事件榨了,適用于網(wǎng)絡(luò)請(qǐng)求等確定只有單個(gè)事件的事件流。對(duì)于1.x版本而言攘蔽,則需要主動(dòng)調(diào)用onComplete()來(lái)終止事件流龙屉。
注意: Single沒有onComplete()方法;只能產(chǎn)生success满俗、error兩種事件转捕。
Single.create(s -> s.onSuccess("aaaa"))
.subscribe(System.out::println, Throwable::printStackTrace);
0x03 Completable
當(dāng)使用Completable
時(shí),生產(chǎn)者通過(guò)調(diào)用onComplete()
終止事件流唆垃,訂閱者會(huì)收到事件結(jié)束回調(diào)五芝,適用于訂閱者僅需要知道事件結(jié)束,而不需要執(zhí)行結(jié)果的情形辕万。
注意: Completable沒有onSuccess()方法枢步;只能產(chǎn)生complete、error兩種事件渐尿。
Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter e) throws Exception {
// do something;
e.onComplete();
}
}).subscribe();
0x04 Maybe
Maybe
是Single
和Completable
的組合體醉途,相較于Single
只能發(fā)送一次item,Completable
只能通知事件結(jié)束砖茸,Maybe
可以發(fā)送最多一個(gè)item结蟋,也就是可以發(fā)送一個(gè)item或者直接終止事件流。
當(dāng)Maybe
調(diào)用onSuccess()
結(jié)束事件流時(shí)渔彰,訂閱者收到一次success事件;當(dāng)Maybe
調(diào)用onComplete()
結(jié)束事件流時(shí)推正,訂閱者只能收到事件結(jié)束事件恍涂。
- onSuccess()收到一次事件:
Maybe.create(new MaybeOnSubscribe<Object>() {
@Override
public void subscribe(MaybeEmitter<Object> e) throws Exception {
e.onSuccess("aaa");
}
}).subscribe(System.out::println, Throwable::printStackTrace, () -> {
System.out.println("onCompletable...");
});
- onComplete()收到結(jié)束事件:
Maybe.create(new MaybeOnSubscribe<Object>() {
@Override
public void subscribe(MaybeEmitter<Object> e) throws Exception {
e.onComplete();
}
}).subscribe(System.out::println, Throwable::printStackTrace, () -> {
System.out.println("onCompletable...");
});
注意: Maybe擁有onSuccess()和onComplete()方法;可以產(chǎn)生success植榕、complete再沧、error三種事件,其中success和complete是對(duì)立的尊残。
0x05 Null
2.0x版本不支持傳遞null事件炒瘸,會(huì)拋出NullPointerException
終止整個(gè)事件流。
Single.create(new SingleOnSubscribe<Object>() {
@Override
public void subscribe(SingleEmitter<Object> e) throws Exception {
e.onSuccess(null);
}
}).subscribe(System.out::println, Throwable::printStackTrace);
錯(cuò)誤日志如下:
java.lang.NullPointerException: onSuccess called with null. Null values are generally not allowed in 2.x operators and sources.
0x06 取消訂閱
1. 接口改變
2.x版本由于按照Reactive-Streams規(guī)范進(jìn)行開發(fā)寝衫,而在Reactive-Streams中已經(jīng)定義了org.reactivestreams.Subscription
接口
package org.reactivestreams;
public interface Subscription {
void request(long var1);
void cancel();
}
顷扩,而1.x版本也定義了一個(gè)rx.Subscription
接口
package rx;
public interface Subscription {
void unsubscribe();
boolean isUnsubscribed();
}
2. 簡(jiǎn)單取消訂閱
可以看到兩個(gè)類名一樣,但是接口方法并不一樣慰毅,含義也不相同隘截,所以為了避免歧義,2.x版本中干掉了舊的Subscription
,同時(shí)使用Disposable
接口來(lái)替代舊的Subscription
婶芭。具體代碼如下:
// 1.x 調(diào)用unsubscribe()方法來(lái)取消訂閱
final rx.Subscription subscription = rx.Observable.just(1, 2, 3).subscribe();
subscription.unsubscribe();
// 2.x 調(diào)用dispose()方法來(lái)取消訂閱
final Disposable subscriber = Flowable.just(1, 2, 3).subscribe();
subscriber.dispose();
3. 使用Subscriber取消訂閱
在1.x版本中东臀,我們調(diào)用subscribe()
后會(huì)返回一個(gè)rx.Subscription
,我們可以使用它進(jìn)行操作犀农;在2.x版本中惰赋,我們調(diào)用subscribe()
時(shí),如果傳入的是Subscriber
呵哨,那就返回值是void
赁濒,需要大家自己保存引用。
// 1.x
rx.Subscription subscription = rx.Observable.just(1, 2, 3)
.subscribe(new rx.Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
subscription.unsubscribe();
// 2.x
ResourceSubscriber<Integer> resourceSubscriber = new ResourceSubscriber<Integer>() {
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable t) {
// must dispose;
dispose();
}
@Override
public void onComplete() {
// must dispose;
dispose();
}
};
// 注意當(dāng)傳入subscriber進(jìn)行訂閱時(shí)仇穗,返回值是void流部,所以需要自己保存;
Flowable.just(1, 2, 3).subscribe(resourceSubscriber);
resourceSubscriber.dispose();
4. 批量取消訂閱
1.x版本使用rx.CompositeSubscription
批量取消訂閱;2.x版本使用io.reactivex.disposables.CompositeDisposable
批量取消訂閱纹坐。
0x07 Subject & Processor
按照Reactive-Streams規(guī)范枝冀,Subject
是一種行為,既是消費(fèi)者耘子,同時(shí)也是生成者果漾,最終被定義為org.reactivestreams.Processor
接口,故而谷誓,在1.x版本中的subject
绒障,在2.x版本中就變成了processor
,并且支持backpressure捍歪。同時(shí)2.x版本中保留了1.x版本的subject
户辱,配合Observable
使用,不過(guò)也不支持backpressure糙臼。如:
// 1.x
Subject<Object, Object> subject= new SerializedSubject<>(PublishSubject.<Object>create());
subject.onNext("aaa");
subject.onError("aaa");
subject.onComplete();
// 2.x
final FlowableProcessor<Object> objectFlowableProcessor =
PublishProcessor.create().toSerialized();
objectFlowableProcessor.onNext("aa");
objectFlowableProcessor.onError(new Throwable());
objectFlowableProcessor.onComplete();