定義
RxJava 在 GitHub 主頁上的自我介紹是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一個在 Java VM 上使用可觀測的序列來組成異步的卿堂、基于事件的程序的庫)击纬。
說到底,它就是一個實現(xiàn)異步操作的庫。
RxJava好在哪
一個詞:簡潔率触。
示例:假設有這樣一個需求:界面上有一個自定義的視圖 imageCollectorView 抵碟,它的作用是顯示多張圖片豹储,并能使用 addImage(Bitmap) 方法來任意增加顯示的圖片〈ぃ現(xiàn)在需要程序?qū)⒁粋€給出的目錄數(shù)組 File[] folders 中每個目錄下的 png 圖片都加載出來并顯示在 imageCollectorView 中。需要注意的是剥扣,由于讀取圖片的這一過程較為耗時巩剖,需要放在后臺執(zhí)行,而圖片的顯示則必須在 UI 線程執(zhí)行钠怯。
普通寫法:
new Thread() {
@Override
public void run() {
super.run();
for (File folder : folders) {
File[] files = folder.listFiles();
for (File file : files) {
if (file.getName().endsWith(".png")) {
final Bitmap bitmap = getBitmapFromFile(file);
getActivity().runOnUiThread(new Runnable() {
@Override
public void run() {
imageCollectorView.addImage(bitmap);
}
});
}
}
}
}
}.start();
RxJava寫法:
Observable.from(folders)
.flatMap{ Observable.from(file.listFiles()) }
.filter{ file.getName().endsWith(".png") }
.map{ getBitmapFromFile(file) }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe{ imageCollectorView.addImage(bitmap) }
API介紹和原理解析
RxJava 的觀察者模式
RxJava 有四個基本概念:Observable (可觀察者球及,即被觀察者)、 Observer (觀察者)呻疹、 subscribe (訂閱)、事件。Observable 和 Observer 通過 subscribe() 方法實現(xiàn)訂閱關系刽锤,從而 Observable 可以在需要的時候發(fā)出事件來通知 Observer镊尺。
與傳統(tǒng)觀察者模式不同, RxJava 的事件回調(diào)方法除了普通事件 onNext() (相當于 onClick() / onEvent())之外并思,還定義了兩個特殊的事件:onCompleted() 和 onError()庐氮。
onCompleted(): 事件隊列完結(jié)。RxJava 不僅把每個事件單獨處理宋彼,還會把它們看做一個隊列弄砍。RxJava 規(guī)定,當不會再有新的 onNext() 發(fā)出時输涕,需要觸發(fā) onCompleted() 方法作為標志音婶。
onError():事件隊列異常。在事件處理過程中出異常時莱坎,onError() 會被觸發(fā)衣式,同時隊列自動終止,不允許再有事件發(fā)出檐什。
在一個正確運行的事件序列中, onCompleted() 和 onError() 有且只有一個碴卧,并且是事件序列中的最后一個。需要注意的是乃正,onCompleted() 和 onError() 二者也是互斥的住册,即在隊列中調(diào)用了其中一個,就不應該再調(diào)用另一個瓮具。
基本實現(xiàn)
1.創(chuàng)建 Observer(觀察者荧飞,它決定事件觸發(fā)的時候?qū)⒂性鯓拥男袨?
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
除了 Observer 接口之外,RxJava 還內(nèi)置了一個實現(xiàn)了 Observer 的抽象類:Subscriber搭综。 Subscriber 對 Observer 接口進行了一些擴展垢箕,但他們的基本使用方式是完全一樣的:
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
實質(zhì)上,在 RxJava 的 subscribe 過程中兑巾,Observer 也總是會先被轉(zhuǎn)換成一個 Subscriber 再使用条获。所以如果你只想使用基本功能,選擇 Observer 和 Subscriber 是完全一樣的蒋歌。它們的區(qū)別對于使用者來說主要有兩點:
onStart():這是 Subscriber 增加的方法帅掘。它會在 subscribe 剛開始,而事件還未發(fā)送之前被調(diào)用堂油,可以用于做一些準備工作修档,例如數(shù)據(jù)的清零或重置。這是一個可選方法府框,默認情況下它的實現(xiàn)為空吱窝。需要注意的是,如果對準備工作的線程有要求(例如彈出一個顯示進度的對話框,這必須在主線程執(zhí)行)院峡, onStart() 就不適用了兴使,因為它總是在 subscribe 所發(fā)生的線程被調(diào)用,而不能指定線程照激。
unsubscribe():這是 Subscriber 所實現(xiàn)的另一個接口 Subscription 的方法发魄,用于取消訂閱。在這個方法被調(diào)用后俩垃,Subscriber 將不再接收事件励幼。unsubscribe() 這個方法很重要,因為在 subscribe() 之后口柳, Observable 會持有 Subscriber 的引用苹粟,這個引用如果不能及時被釋放,將有內(nèi)存泄露的風險啄清。所以最好保持一個原則:要在不再使用的時候盡快在合適的地方(例如 onPause() onStop() 等方法中)調(diào)用 unsubscribe() 來解除引用關系六水,以避免內(nèi)存泄露的發(fā)生。
2.創(chuàng)建 Observable(被觀察者辣卒,它決定什么時候觸發(fā)事件以及觸發(fā)怎樣的事件)
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
可以看到掷贾,這里傳入了一個 OnSubscribe 對象作為參數(shù)。OnSubscribe 會被存儲在返回的 Observable 對象中荣茫,它的作用相當于一個計劃表想帅,當 Observable 被訂閱的時候,OnSubscribe 的 call() 方法會自動被調(diào)用啡莉,事件序列就會依照設定依次觸發(fā)(對于上面的代碼港准,就是觀察者Subscriber 將會被調(diào)用三次 onNext() 和一次 onCompleted())。這樣咧欣,由被觀察者調(diào)用了觀察者的回調(diào)方法浅缸,就實現(xiàn)了由被觀察者向觀察者的事件傳遞,即觀察者模式魄咕。
create() 方法是 RxJava 最基本的創(chuàng)造事件序列的方法衩椒。基于這個方法哮兰, RxJava 還提供了一些方法用來快捷創(chuàng)建事件隊列毛萌,例如:
- just(T...) :將傳入的參數(shù)依次發(fā)送出來。
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 將會依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
- from(T[]) / from(Iterable<? extends T>) :將傳入的數(shù)組或 Iterable 拆分成具體對象后喝滞,依次發(fā)送出來阁将。
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 將會依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
3.Subscribe (訂閱)
創(chuàng)建了 Observable 和 Observer 之后,再用 subscribe() 方法將它們聯(lián)結(jié)起來右遭,整條鏈子就可以工作了做盅。
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
除了 subscribe(Observer) 和 subscribe(Subscriber) 缤削,subscribe() 還支持不完整定義的回調(diào),RxJava 會自動根據(jù)定義創(chuàng)建出 Subscriber 吹榴。
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
// 自動創(chuàng)建 Subscriber 僻他,并使用 onNextAction 來定義 onNext()
observable.subscribe(onNextAction);
// 自動創(chuàng)建 Subscriber ,并使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動創(chuàng)建 Subscriber 腊尚,并使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()满哪、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
Action0 是 RxJava 的一個接口婿斥,它只有一個方法 call(),這個方法是無參無返回值的哨鸭;由于 onCompleted() 方法也是無參無返回值的民宿,因此 Action0 可以被當成一個包裝對象,將 onCompleted() 的內(nèi)容打包起來將自己作為一個參數(shù)傳入 subscribe() 以實現(xiàn)不完整定義的回調(diào)像鸡。這樣其實也可以看做將 onCompleted() 方法作為參數(shù)傳進了 subscribe()活鹰,相當于其他某些語言中的『閉包』。
Action1 也是一個接口只估,它同樣只有一個方法 call(T param)志群,這個方法也無返回值,但有一個參數(shù)蛔钙;與 Action0 同理锌云,由于 onNext(T obj) 和 onError(Throwable error) 也是單參數(shù)無返回值的,因此 Action1 可以將 onNext(obj) 和 onError(error) 打包起來傳入 subscribe() 以實現(xiàn)不完整定義的回調(diào)吁脱。
事實上桑涎,雖然 Action0 和 Action1 在 API 中使用最廣泛,但 RxJava 是提供了多個 ActionX 形式的接口 (例如 Action2, Action3) 的兼贡,它們可以被用以包裝不同的無返回值的方法攻冷。
4.場景示例
將字符串數(shù)組 names 中的所有字符串依次打印出來:
String[] names = ...;
Observable.from(names)
.subscribe(new Action1<String>() {
@Override
public void call(String name) {
Log.d(tag, name);
}
});
由 id 取得圖片并顯示
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
}).subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}
});
在 RxJava 的默認規(guī)則中,事件的發(fā)出和消費都是在同一個線程的遍希。也就是說等曼,如果只用上面的方法,實現(xiàn)出來的只是一個同步的觀察者模式孵班。觀察者模式本身的目的就是『后臺處理涉兽,前臺回調(diào)』的異步機制,因此異步對于 RxJava 是至關重要的篙程。而要實現(xiàn)異步枷畏,則需要用到 RxJava 的另一個概念: Scheduler 。
Scheduler(線程控制)
在不指定線程的情況下虱饿, RxJava 遵循的是線程不變的原則拥诡,即:在哪個線程調(diào)用 subscribe()触趴,就在哪個線程生產(chǎn)事件;在哪個線程生產(chǎn)事件渴肉,就在哪個線程消費事件冗懦。如果需要切換線程,就需要用到 Scheduler (調(diào)度器)仇祭。
Scheduler 的 API
在RxJava 中披蕉,Scheduler ——調(diào)度器,相當于線程控制器乌奇,RxJava 通過它來指定每一段代碼應該運行在什么樣的線程没讲。RxJava 已經(jīng)內(nèi)置了幾個 Scheduler ,它們已經(jīng)適合大多數(shù)的使用場景:
Schedulers.immediate(): 直接在當前線程運行礁苗,相當于不指定線程爬凑。這是默認的 Scheduler。
Schedulers.newThread(): 總是啟用新線程试伙,并在新線程執(zhí)行操作嘁信。
Schedulers.io(): I/O 操作(讀寫文件、讀寫數(shù)據(jù)庫疏叨、網(wǎng)絡信息交互等)所使用的 Scheduler潘靖。行為模式和 newThread() 差不多,區(qū)別在于 io() 的內(nèi)部實現(xiàn)是是用一個無數(shù)量上限的線程池考廉,可以重用空閑的線程秘豹,因此多數(shù)情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中昌粤,可以避免創(chuàng)建不必要的線程既绕。
有了這幾個 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 兩個方法來對線程進行控制了涮坐。
subscribeOn(): 指定 subscribe() 所發(fā)生的線程凄贩,即 Observable.OnSubscribe 被激活時所處的線程「ざ铮或者叫做事件產(chǎn)生的線程疲扎。
observeOn(): 指定 Subscriber 所運行在的線程〗莸瘢或者叫做事件消費的線程椒丧。
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調(diào)發(fā)生在主線程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});
線程多次切換
因為 observeOn() 指定的是 Subscriber 的線程,而這個 Subscriber 并不一定是subscribe() 參數(shù)中的 Subscriber 救巷,而是 observeOn() 執(zhí)行時的當前 Observable 所對應的 Subscriber 壶熏,即它的直接下級 Subscriber 。換句話說浦译,observeOn() 指定的是它之后的操作所在的線程棒假。因此如果有多次切換線程的需求溯职,只要在每個想要切換線程的位置調(diào)用一次 observeOn() 即可。
Observable.just(1, 2, 3, 4) // IO 線程帽哑,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新線程谜酒,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 線程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // Android 主線程妻枕,由 observeOn() 指定
不過僻族,不同于 observeOn() , subscribeOn() 的位置放在哪里都可以屡谐,但它是只能調(diào)用一次的鹰贵。
doOnSubscribe()
Subscriber 的 onStart() 可以用作流程開始前的初始化。然而 onStart() 由于在 subscribe() 發(fā)生時就被調(diào)用了康嘉,因此不能指定線程,而是只能執(zhí)行在 subscribe() 被調(diào)用時的線程籽前。這就導致如果 onStart() 中含有對線程有要求的代碼亭珍,將會有線程非法的風險,因為有時你無法預測 subscribe() 將會在什么線程執(zhí)行枝哄。
而與 Subscriber.onStart() 相對應的肄梨,有一個方法 Observable.doOnSubscribe() 。它和 Subscriber.onStart() 同樣是在 subscribe() 調(diào)用后而且在事件發(fā)送前執(zhí)行挠锥,但區(qū)別在于它可以指定線程众羡。默認情況下, doOnSubscribe() 執(zhí)行在 subscribe() 發(fā)生的線程蓖租;而如果在 doOnSubscribe() 之后有 subscribeOn() 的話粱侣,它將執(zhí)行在離它最近的 subscribeOn() 所指定的線程。
Observable.create(onSubscribe)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
progressBar.setVisibility(View.VISIBLE); // 需要在主線程執(zhí)行
}
})
.subscribeOn(AndroidSchedulers.mainThread()) // 指定主線程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
如上蓖宦,在 doOnSubscribe()的后面跟一個 subscribeOn() 齐婴,就能指定準備工作的線程了。
變換
RxJava 提供了對事件序列進行變換的支持稠茂,這是它的核心功能之一柠偶。所謂變換,就是將事件序列中的對象或整個序列進行加工處理睬关,轉(zhuǎn)換成不同的事件或事件序列诱担。
首先看一個 map() 的例子:
Observable.just("images/logo.png") // 輸入類型 String
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) { // 參數(shù)類型 String
return getBitmapFromPath(filePath); // 返回類型 Bitmap
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) { // 參數(shù)類型 Bitmap
showBitmap(bitmap);
}
});
這里出現(xiàn)了一個叫做 Func1 的類。它和 Action1 非常相似电爹,也是 RxJava 的一個接口蔫仙,用于包裝含有一個參數(shù)的方法。 Func1 和 Action 的區(qū)別在于藐不, Func1 包裝的是有返回值的方法匀哄。另外秦效,和 ActionX 一樣, FuncX 也有多個涎嚼,用于不同參數(shù)個數(shù)的方法阱州。FuncX 和 ActionX 的區(qū)別在 FuncX 包裝的是有返回值的方法。
可以看到法梯,map() 方法將參數(shù)中的 String 對象轉(zhuǎn)換成一個 Bitmap 對象后返回苔货,而在經(jīng)過 map() 方法后,事件的參數(shù)類型也由 String 轉(zhuǎn)為了 Bitmap立哑。這種直接變換對象并返回的夜惭,是最常見的也最容易理解的變換。不過 RxJava 的變換遠不止這樣铛绰,它不僅可以針對事件對象诈茧,還可以針對整個事件隊列,這使得 RxJava 變得非常靈活捂掰。我列舉幾個常用的變換:
- map(): 事件對象的直接變換敢会,它是 RxJava 最常用的變換。
-
flatMap():
示例:假設有一個數(shù)據(jù)結(jié)構『學生』这嚣,要打印出每個學生所需要修的所有課程的名稱鸥昏。
一般寫法:
Student[] students = ...;
Subscriber<Student> subscriber = new Subscriber<Student>() {
@Override
public void onNext(Student student) {
List<Course> courses = student.getCourses();
for (int i = 0; i < courses.size(); i++) {
Course course = courses.get(i);
Log.d(tag, course.getName());
}
}
...
};
Observable.from(students)
.subscribe(subscriber);
flatMap()寫法:
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);
從上面的代碼可以看出, flatMap() 和 map() 有一個相同點:它也是把傳入的參數(shù)轉(zhuǎn)化之后返回另一個對象姐帚。但需要注意吏垮,和 map() 不同的是, flatMap() 中返回的是個 Observable 對象罐旗,并且這個 Observable 對象并不是被直接發(fā)送到了 Subscriber 的回調(diào)方法中膳汪。 flatMap() 的原理是這樣的:1. 使用傳入的事件對象創(chuàng)建一個 Observable 對象;2. 并不發(fā)送這個 Observable, 而是將它激活九秀,于是它開始發(fā)送事件旅敷;3. 每一個創(chuàng)建出來的 Observable 發(fā)送的事件,都被匯入同一個 Observable 颤霎,而這個 Observable 負責將這些事件統(tǒng)一交給 Subscriber 的回調(diào)方法媳谁。
小結(jié)
對于 Android 開發(fā)者來說, RxJava 是一個很難上手的庫友酱,因為它對于 Android 開發(fā)者來說有太多陌生的概念晴音「危可是它真的很牛逼夺姑。
使用場景:
- RxJava + Retrofit 實現(xiàn)網(wǎng)絡請求;
@GET("/user")
public Observable<User> getUser(@Query("userId") String userId);
getUser(userId)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<User>() {
@Override
public void onNext(User user) {
userView.setUser(user);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable error) {
// Error handling
...
}
});
- RxBinding(一組開源庫狼讨,它允許你以RxJava的形式來處理UI事件)或详;
Button button = ...;
RxView.clickEvents(button) // 以 Observable 形式來反饋點擊事件
.throttleFirst(500, TimeUnit.MILLISECONDS)
.subscribe(new Action1<ViewClickEvent>() {
@Override
public void call(ViewClickEvent event) {
// Click handling
}
});
- 各種異步操作系羞;
- RxBus等...