RxJava是ReactiveX中使用Java語言實現(xiàn)的版本兽愤,而ReactiveX是一種基于異步數(shù)據(jù)流概念的編程模式,響應(yīng)式編程屡律。
RxJava 有四個基本概念:
Observable (可觀察者腌逢,即被觀察者)、 Observer (觀察者)超埋、 subscribe (訂閱)搏讶、事件佳鳖。
Observable 和 Observer 通過 subscribe() 方法實現(xiàn)訂閱關(guān)系,從而 Observable 可以在需要的時候發(fā)出事件來通知 Observer媒惕。
- onCompleted(): 事件隊列完結(jié)系吩。RxJava 不僅把每個事件單獨處理,還會把它們看做一個隊列妒蔚。RxJava 規(guī)定穿挨,當(dāng)不會再有新的
- onNext() 發(fā)出時,需要觸發(fā) onCompleted() 方法作為標(biāo)志肴盏。
- onError(): 事件隊列異常科盛。在事件處理過程中出異常時,onError() 會被觸發(fā)菜皂,同時隊列自動終止贞绵,不允許再有事件發(fā)出。
- 在一個正確運行的事件序列中, onCompleted() 和 onError() 有且只有一個恍飘,并且是事件序列中的最后一個榨崩。需要注意的是,onCompleted() 和 onError() 二者也是互斥的章母,即在隊列中調(diào)用了其中一個蜡饵,就不應(yīng)該再調(diào)用另一個。
Observer 觀察者
它決定事件觸發(fā)的時候?qū)⒂性鯓拥男袨楦焓T贠bserver中有三個方法,onNext()肢专、onCompleted()舞肆、onError();
//創(chuàng)建Observer
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
除了Observer接口之外,RxJava還內(nèi)置了一個實現(xiàn)了 Observer 的抽象類:Subscriber博杖。Subscriber對Observer接口進(jìn)行了一些擴展椿胯,但他們的基本使用方式是完全一樣。
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
-
onStart(): 這是 Subscriber 增加的方法剃根。它會在 subscribe 剛開始哩盲,而事件還未發(fā)送之前被調(diào)用,可以用于做一些準(zhǔn)備工作狈醉,例如數(shù)據(jù)的清零或重置廉油。
這是一個可選方法,默認(rèn)情況下它的實現(xiàn)為空苗傅。需要注意的是抒线,如果對準(zhǔn)備工作的線程有要求(例如彈出一個顯示進(jìn)度的對話框,這必須在主線程執(zhí)行)渣慕, onStart() 就不適用了嘶炭,因為它總是在 subscribe 所發(fā)生的線程被調(diào)用抱慌,而不能指定線程。要在指定的線程來做準(zhǔn)備工作眨猎,可以使用 doOnSubscribe() 方法抑进,具體可以在后面的文中看到。
-
unsubscribe(): 這是 Subscriber 所實現(xiàn)的另一個接口 Subscription 的方法睡陪,用于取消訂閱寺渗。在這個方法被調(diào)用后,Subscriber 將不再接收事件宝穗。一般在這個方法調(diào)用前户秤,可以使用 isUnsubscribed() 先判斷一下狀態(tài)。
unsubscribe() 這個方法很重要逮矛,因為在 subscribe() 之后鸡号, Observable 會持有 Subscriber 的引用,這個引用如果不能及時被釋放须鼎,將有內(nèi)存泄露的風(fēng)險鲸伴。所以最好保持一個原則:要在不再使用的時候盡快在合適的地方(例如 onPause() onStop() 等方法中)調(diào)用 unsubscribe() 來解除引用關(guān)系,以避免內(nèi)存泄露的發(fā)生晋控。
Observable 被觀察者
它決定什么時候觸發(fā)事件以及觸發(fā)怎樣的事件汞窗。 RxJava 使用 create() 方法來創(chuàng)建一個 Observable。
在RxJava中有幾種不同的Observables:
- Observable<T>:能夠發(fā)射0或者n個數(shù)據(jù)赡译,并以成功或錯誤事件終止仲吏。
- Flowable<T>:能夠發(fā)射0或者n個數(shù)據(jù),并以成功或錯誤事件終止蝌焚。支持Backpressure裹唆,可以控制數(shù)據(jù)流發(fā)射速度。
- Single<T>:只發(fā)射單個數(shù)據(jù)或錯誤事件只洒。
- Completable:他從來不發(fā)射數(shù)據(jù)许帐,只處理onComplete和onError事件”锨矗可以看成是Rx的Runable成畦。
- Mapbe<T>:能夠發(fā)射0或者1個數(shù)據(jù),要么成功涝开,要么失敗循帐,有點類似于Optional
//創(chuàng)建Observable
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("RxJava");
subscriber.onCompleted();
}
});
create() 方法是 RxJava 最基本的創(chuàng)造事件序列的方法∫ㄎ洌基于這個方法惧浴, RxJava 還提供了一些方法用來快捷創(chuàng)建事件隊列:
- just(T...): 將傳入的參數(shù)依次發(fā)送出來。
Observable observable = Observable.just("Hello", "RxJava");
// 將會依次調(diào)用:
// onNext("Hello");
// onNext("RxJava");
// onCompleted();
- from(T[]) / from(Iterable<? extends T>) : 將傳入的數(shù)組或 Iterable 拆分成具體對象后奕剃,依次發(fā)送出來衷旅。
String[] words = {"Hello", "RxJava"};
Observable observable = Observable.from(words);
// 將會依次調(diào)用:
// onNext("Hello");
// onNext("RxJava");
// onCompleted();
Subscribe 訂閱
創(chuàng)建了 Observable 和 Observer 之后捐腿,再用 subscribe() 方法將它們聯(lián)結(jié)起來,整條鏈子就可以工作了柿顶。
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
Action
Action是RxJava 的一個接口茄袖,常用的有Action0和Action1。
- Action0: 它只有一個方法 call()嘁锯,這個方法是無參無返回值的宪祥;
由于 onCompleted() 方法也是無參無返回值的,因此 Action0 可以被當(dāng)成一個包裝對象家乘,將 onCompleted() 的內(nèi)容打包起來將自己作為一個參數(shù)傳入 subscribe() 以實現(xiàn)不完整定義的回調(diào)蝗羊。
- Action1:它同樣只有一個方法 call(T param),這個方法也無返回值仁锯,但有一個參數(shù)耀找;
與 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是單參數(shù)無返回值的业崖,因此 Action1 可以將 onNext(obj)和 onError(error) 打包起來傳入 subscribe() 以實現(xiàn)不完整定義的回調(diào)
創(chuàng)建被觀察者和觀察者并訂閱野芒,Override時只用了onNext(obj),但是還有onError(error)和onCompleted()必須被重寫双炕,但是這兩個方法并沒有被用到導(dǎo)致產(chǎn)生無用代碼狞悲。
Observable.just("Hello", "RxJava")
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.i(TAG, s);
}
});
//Action來代替Subscriber
Observable.just("Hello", "RxJava")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, s);
}
});
定義三個對象,分別打包onNext(obj)妇斤、onError(error) 摇锋、onCompleted()方法。
Observable observable = Observable.just("Hello", "RxJava");
//處理onNext()中的內(nèi)容
Action1<String> onNextAction = new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, s);
}
};
//處理onError()中的內(nèi)容
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
}
};
//處理onCompleted()中的內(nèi)容
Action0 onCompletedAction = new Action0() {
@Override
public void call() {
Log.i(TAG, "Completed");
}
};
使用subscribe重載的方法
//使用 onNextAction 來定義 onNext()
Observable.just("Hello", "RxJava").subscribe(onNextAction);
//使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
Observable.just("Hello", "RxJava").subscribe(onNextAction, onErrorAction);
//使用 onNextAction站超、 onErrorAction 和 onCompletedAction 來定義 onNext()荸恕、 onError() 和 onCompleted()
Observable.just("Hello", "RxJava").subscribe(onNextAction, onErrorAction, onCompletedAction);
map和flatmap
除了map和flatMap之外,還有其他操作符以供使用顷编。其他常用的操作符如下:
- filter:集合進(jìn)行過濾
- each:遍歷集合
- take:取出集合中的前幾個
- skip:跳過前幾個元素
map
在使用map時涉及到一個接口,F(xiàn)unc1剑刑。
Func1與Action1很相似媳纬,區(qū)別在于Func1包裝的有返回值方法。
得到多個Person對象中的name施掏,保存到nameList中
Observable.just(person1, person2, person3)
//使用map進(jìn)行轉(zhuǎn)換钮惠,參數(shù)1:轉(zhuǎn)換前的類型,參數(shù)2:轉(zhuǎn)換后的類型
.map(new Func1<Person, String>() {
@Override
public String call(Person p) {
String name = p.getName();//獲取Person對象中的name
return name;//返回name
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
nameList.add(s);
}
});
多次使用map七芭,想用幾個用幾個
Observable.just("Hello", "RxJava")
.map(new Func1<String, Integer>() {//將String類型的轉(zhuǎn)化為Integer類型的哈希碼
@Override
public Integer call(String s) {
return s.hashCode();
}
})
.map(new Func1<Integer, String>() {//將轉(zhuǎn)化后得到的Integer類型的哈希碼再轉(zhuǎn)化為String類型
@Override
public String call(Integer integer) {
return integer.intValue() + "";
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, s);
}
});
flatmap
flatmap和map的不同在于素挽,flatmap進(jìn)行遍歷操作時不需要用循環(huán)語句。
使用map來實現(xiàn)打印所有學(xué)生所修個課程名
List<Student> students = new ArrayList<Student>();
students.add...
...
Action1<List<Course>> action1 = new Action1<List<Course>>() {
@Override
public void call(List<Course> courses) {
//遍歷courses狸驳,輸出cuouses的name
for (int i = 0; i < courses.size(); i++){
Log.i(TAG, courses.get(i).getName());
}
}
};
Observable.from(students)
.map(new Func1<Student, List<Course>>() {
@Override
public List<Course> call(Student student) {
//返回coursesList
return student.getCoursesList();
}
})
.subscribe(action1);
在Action1中出現(xiàn)了for來循環(huán)打印課程名预明,使用RxJava就是為了剔除這樣的嵌套結(jié)構(gòu)缩赛。
List<Student> students = new ArrayList<Student>();
students.add...
...
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCoursesList());
}
})
.subscribe(new Action1<Course>() {
@Override
public void call(Course course) {
Log.i(TAG, course.getName());
}
});
flatmap對數(shù)據(jù)做的合并操作,不能保證數(shù)據(jù)傳入和取出的順序一樣撰糠,也就是說flatmap的操作是無序的酥馍;如果想要有序的效果,使用concatMap阅酪。
Scheduler
Scheduler:線程控制器旨袒,可以指定每一段代碼在什么樣的線程中執(zhí)行。
我個人對觀察者模式的理解就是為了解決术辐,異步操作砚尽;后臺處理邏輯,處理結(jié)果回調(diào)到前臺顯示辉词。所以Scheduler線程控制對RxJava來說相當(dāng)重要必孤!
在RxJava中有幾種不同的Scheduler:
- Schedulers.immediate():直接在當(dāng)前線程運行,相當(dāng)于不指定線程较屿。這是默認(rèn)的 Scheduler隧魄。
- Schedulers.newThread():總是啟用新線程,并在新線程執(zhí)行操作隘蝎。
- Schedulers.io(): I/O 操作(讀寫文件购啄、讀寫數(shù)據(jù)庫、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler嘱么。行為模式和 newThread() 差不多狮含,區(qū)別在于 io() 的內(nèi)部實現(xiàn)是是用一個無數(shù)量上限的線程池,可以重用空閑的線程曼振,因此多數(shù)情況下 io() 比newThread() 更有效率几迄。不要把計算工作放在 io() 中,可以避免創(chuàng)建不必要的線程冰评。
- Schedulers.computation():計算所使用的 Scheduler映胁。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作甲雅,例如圖形的計算解孙。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)抛人。不要把 I/O 操作放在 computation() 中弛姜,否則 I/O 操作的等待時間會浪費 CPU。
- AndroidSchedulers.mainThread():它指定的操作將在 Android 主線程運行妖枚。
新的線程發(fā)起事件廷臼,在主線程中消費
Observable.just("Hello", "Word")
.subscribeOn(Schedulers.newThread())//指定 subscribe() 發(fā)生在新的線程
.observeOn(AndroidSchedulers.mainThread())// 指定 Subscriber 的回調(diào)發(fā)生在主線程
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, s);
}
});
subscribeOn(),和observeOn()方法來指定發(fā)生的線程和消費的線程。
- subscribeOn():指定subscribe() 所發(fā)生的線程荠商,即 Observable.OnSubscribe 被激活時所處的線程寂恬。或者叫做事件產(chǎn)生的線程结啼。
- observeOn():指定Subscriber 所運行在的線程掠剑。或者叫做事件消費的線程郊愧。
以及參數(shù)Scheduler朴译。
多次切換線程
Observable.just("Hello", "RxJava")
.subscribeOn(Schedulers.newThread())//指定:在新的線程中發(fā)起
.observeOn(Schedulers.io()) //指定:在io線程中處理
.map(new Func1<String, String>() {
@Override
public String call(String s) {
return handleString(s); //處理數(shù)據(jù)
}
})
.observeOn(AndroidSchedulers.mainThread())//指定:在主線程中處理
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
show(s); //消費事件
}
});
observeOn()被調(diào)用了兩次,分別指定了map的處理的現(xiàn)場和消費事件show(s)的線程属铁。
若將observeOn(AndroidSchedulers.mainThread())去掉會怎么樣眠寿?不為消費事件show(s)指定線程后,map的處理和最后的消費事件show(s)都會在io線程中執(zhí)行焦蘑。observeOn() 指定的是它之后的操作所在的線程盯拱。
observeOn()可以多次使用,可以隨意變換線程
RxJava與Retrofit結(jié)合使用
使用Retrofit傳統(tǒng)方式定義接口和使用
//定義接口
@GET("/user")
public void getUser(@Query("userId") String userId, Callback<User> callback);
//使用
getUser(userId, new Callback<User>() {
@Override
public void success(User user) {
userView.setUser(user);
}
@Override
public void failure(RetrofitError error) {
// Error handling
...
}
};
使用Retrofit+Rxjava定義接口和使用
//定義接口
@GET("/user")
public Observable<User> getUser(@Query("userId") String userId);
//使用
getUser(userId)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<User>() {
@Override
public void onNext(User user) {
userView.setUser(user);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable error) {
// Error handling
...
}
});
Retrofit 把請求封裝進(jìn) Observable 例嘱,在請求結(jié)束后調(diào)用 onNext() 或在請求失敗后調(diào)用 onError()狡逢。
使用普通方式修改數(shù)據(jù)庫中User的值,耗時操作放在工作線程執(zhí)行拼卵,是異步操作奢浑。
getUser(userId, new Callback<User>() {
@Override
public void success(User user) {
new Thread() {
@Override
public void run() {
processUser(user); // 嘗試修正 User 數(shù)據(jù)
runOnUiThread(new Runnable() { // 切回 UI 線程
@Override
public void run() {
userView.setUser(user);
}
});
}).start();
}
@Override
public void failure(RetrofitError error) {
// Error handling
...
}
};
使用RxJava
getUser(userId)
.doOnNext(new Action1<User>() {
@Override
public void call(User user) {
processUser(user);
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<User>() {
@Override
public void onNext(User user) {
userView.setUser(user);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable error) {
// Error handling
...
}
});