參考鏈接:
http://www.reibang.com/p/464fa025229e
Rxjava2學(xué)習(xí)筆記二:RxJava2進(jìn)階使用-zip操作符
http://www.reibang.com/p/ef8b620fdc4c
Rxjava2學(xué)習(xí)筆記三:RxJava2進(jìn)階使用-map操作符
http://www.reibang.com/p/f7efc1aeb6c9
1.Gradle配置
- compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
2.原理
-
先假設(shè)2根水管:
- 上面一根水管為事件產(chǎn)生的水管辛藻,叫它上游吧,下面一根水管為事件接收的水管叫它下游吧
- 兩根水管通過某種方式連接在一起:使得上游每產(chǎn)生一個(gè)事件食棕,下游都能收到該事件
上游事件產(chǎn)生順序:1->2->3;下游事件接收順序:1->2->3
基本使用例1
注:Rxjava2中的emitter-》發(fā)射器,用于被觀察者發(fā)射事件
Disposable-》RxJava1.x中的Subscription,用于解除訂閱
//創(chuàng)建一個(gè)上游 Observable:
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
//創(chuàng)建一個(gè)下游 Observer
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "subscribe");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "error");
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
};
//建立連接
observable.subscribe(observer);
基本使用例2-鏈?zhǔn)秸{(diào)用
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "subscribe");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "error");
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
});
- 注:ObservableEmitter[發(fā)射器]:發(fā)出事件
->調(diào)用emitter的onNext(T value)琢岩、onComplete()和onError(Throwable error)
->發(fā)出next事件、complete事件和error事件
Emitter-發(fā)送規(guī)則
- 1.上游可以發(fā)送無限個(gè)onNext, 下游也可以接收無限個(gè)onNext.
- 2.當(dāng)上游發(fā)送了一個(gè)onComplete后, 上游onComplete之后的事件將會(huì)繼續(xù)發(fā)送,
而下游收到onComplete事件之后將不再繼續(xù)接收事件. - 3.當(dāng)上游發(fā)送了一個(gè)onError后, 上游onError之后的事件將繼續(xù)發(fā)送, 而下游收到onError事件之后將不再繼續(xù)接收事件.
- 4.上游可以不發(fā)送onComplete或onError.
- 5.最為關(guān)鍵的是onComplete和onError必須唯一并且互斥, 即不能發(fā)多個(gè)onComplete, 也不能發(fā)多個(gè)onError, 也不能先發(fā)一個(gè)onComplete, 然后再發(fā)一個(gè)onError, 反之亦然
發(fā)送規(guī)則示意圖
1.發(fā)送onNext()事件
2.發(fā)送onComplete()事件
Disposable-訂閱
1.相當(dāng)于RxJava1.x中的Subscription,用于解除訂閱
2.解除訂閱:disposable.dispose();
3.多個(gè)Disposable時(shí)取消訂閱,RxJava中已經(jīng)內(nèi)置了一個(gè)容器CompositeDisposable,每當(dāng)我們得到一個(gè)Disposable時(shí)就調(diào)用CompositeDisposable.add()將它添加到容器中, 在退出的時(shí)候, 調(diào)用CompositeDisposable.clear() 即可切斷所有的水管.
-
4.eg:
Disposable disposable = observable.subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { //這里接收數(shù)據(jù)項(xiàng) } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { //這里接收onError } }, new Action() { @Override public void run() throws Exception { //這里接收onComplete间狂。 } }); disposable.dispose();//解除訂閱
Consumer-消費(fèi)者
- 1.用于接收單個(gè)值暂吉,其他的如:BiConsumer則是接收兩個(gè)值陆蟆,F(xiàn)unction用于變換對(duì)象字支,Predicate用于判斷用法如上的例子
- 2.替代了RxJava1.x中的Action/Func接口
3.Rxjava線程調(diào)度
- 1 subscribeOn(Schedulers.io())//上游發(fā)送事件的線程,第一次有效(如網(wǎng)絡(luò)請(qǐng)求可在IO或子線程發(fā)送事件)
observeOn(AndroidSchedulers.mainThread())//下游接收事件的線程(主線程接收返回信息后更新UI)
observeOn(Schedulers.io())//線程切換凤藏;可多次切換每調(diào)用一次observeOn() , 下游的線程就會(huì)切換一次.