classpath 'me.tatarka:gradle-retrolambda:3.2.0'
compile 'io.reactivex:rxjava:1.0.14'
compile 'io.reactivex:rxandroid:1.0.1'
Observable & Observer
- 觀察者(Observer)對可觀察對象(Observable)發(fā)射的數(shù)據(jù)或者數(shù)據(jù)序列作出響應(yīng)侥啤。
- 這種模式極大的簡化了并發(fā)操作咧欣,創(chuàng)建一個處于待命狀態(tài)的觀察者哨兵匀奏,在未來某個時刻響應(yīng)Observable的通知羊苟,不需要阻塞等待Observable發(fā)射數(shù)據(jù)
public class Observable<T> {
final OnSubscribe<T> onSubscribe;
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
}
Observable的具體工作都是在OnSubscribe中完成的
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
void call(Subscriber<? super T> subscriber);
}
Observable調(diào)用onNext()
發(fā)射數(shù)據(jù)祭阀,方法的參數(shù)就是發(fā)射的數(shù)據(jù)傻工,可能會被調(diào)用零次或多次;最后會有一次onCompleted()
咙冗、onError()
調(diào)用(不會同時)
public interface Observer<T> {
void onCompleted() {}
void onError(Throwable e) {}
void onNext(T t) {}
}
創(chuàng)建Observable
-
Observable.create();
Observable.create(new Observable.OnSubscribe<String>() { public void call(Subscriber<? super String> subscriber) { // TODO 做自己的事情 if (subscriber.isUnsubscribed()) return; subscriber.onNext("Hello"); if (!subscriber.isUnsubscribed()) { subscriber.onCompleted(); } } });
-
Observable.just(xxx()); 將傳統(tǒng)的java方法轉(zhuǎn)變?yōu)镺bservable
T t = xxx(); Observable.create((subscriber) -> { subscriber.onNext(t); subscriber.onCompleted(); });
Observable.just(T, T); // 等價于Observable.from(T[]);
Observable.from(Iterable); // 從集合中一個接一個的發(fā)出每一個對象
-
Observable.empty();
Observable.create((subscriber) -> { subscriber.onCompleted(); });
-
Observable.never();
Observable.create((subscriber) -> { // do nothing });
subscribe()
真正開始發(fā)射數(shù)據(jù)
-
沒有傳Oberver沾歪,僅僅是為了開啟Observable,而不用管發(fā)出的任何值
observable.subscribe();
-
傳入Observer:內(nèi)部new Subscriber()雾消,將回調(diào)委托給傳入的Observer
observable.subscribe(observer);
-
傳入Action1<T>:內(nèi)部new Subscriber()灾搏,action作為onNext()的回調(diào)
observable.subscribe(new Action1<T>() { public void call(T t) {} });
操作符
用于在Observable和最終的Subscriber之間修改Observable發(fā)出的數(shù)據(jù)挫望;通過代理將subscriber層層組合
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
Subscriber<? super T> call(Subscriber<? super R> s);
}
-
filter():過濾不為空,且以org開頭的
return Observable.from(getAllApps()) .filter(new Func1<AppInfo, Boolean>() { public Boolean call(AppInfo appInfo) { return appInfo != null && appInfo.getPackName().startsWith("org"); } });
skip() / skipLast():跳過前兩個 / 跳過后兩個
take() / takeLast():獲取前三個 / 獲取后三個
repate(3):重復(fù)執(zhí)行
-
distinct():記錄每一個發(fā)射數(shù)據(jù)狂窑,過濾掉重復(fù)的數(shù)據(jù)項(xiàng)
Observable.just(1, 2, 1, 1, 2, 3).distinct(); // 1, 2, 3
distinctUntilChanged():只判定一個數(shù)據(jù)和它的直接前驅(qū)是否不同
sample():
timeout():
doOnNext():類似切面
doOnCompleted():
-
map():把一個事件轉(zhuǎn)換為另一個事件
Observable.from(getAllApps()) .map(new Func1<AppInfo, AppInfo>() { public AppInfo call(AppInfo appInfo) { String curPackName = appInfo.getPackName(); appInfo.setPackName(curPackName.toUpperCase()); return appInfo; } });
flatMap():接收一個Observable的輸出作為輸入媳板,同時輸出另外一個Observable
Subject
既是Observable可以發(fā)出數(shù)據(jù),也是Observer接收數(shù)據(jù)泉哈;可以作為橋梁
PublishSubject
PublishSubject<String> subject = PublishSubject.create();
subject.subscribe(new Observer<String>() {
public void onCompleted() {}
public void onError(Throwable e) {}
public void onNext(String s) {}
});
subject.onNext("Hello World");
Scheduler
- Schedulers.immediate(); // 默認(rèn),立即在當(dāng)前線程執(zhí)行指定的工作
- Schedulers.newThread()
- Schedulers.io()
- Schedulers.computation()
- Schedulers.tram
Schedulers.io().createWorker().schedule(() -> {});
不同的操作符對應(yīng)不同的調(diào)度器
observable
.subscribeOn(AndroidSchedulers.mainThread()) // 指定觀察者代碼運(yùn)行的線程
.observeOn(Schedulers.computation()) // 指定訂閱者運(yùn)行的線
Android
-
Android的調(diào)度器
- AndroidSchedulers.mainThread()
- HandlerScheduler.from(handler)
-
當(dāng)在Activity中訂閱一個Observable的結(jié)果時拷肌,必須在onDestory里取消訂閱
private Subscription subscription; protected void onCreate(Bundle savedInstanceState) { this.subscription = observable.subscribe(this); } protected void onDestory() { super.onDestory(); this.subscription.unsubscribe(); }