RxJava學(xué)習(xí)
RxJava 是什么
異步 : 一個在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程序的庫
RxJava的優(yōu)點
簡潔 : 隨著程序邏輯變得越來越復(fù)雜讯壶,它依然能夠保持簡潔
RxJava 四個重要的概念
- Observable (可觀察者揽浙,即被觀察者)
- Observer (觀察者)
- subscribe (訂閱)
- 事件
Observable 和 Observer 通過 subscribe() 方法實現(xiàn)訂閱關(guān)系状婶,從而 Observable 可以在需要的時候發(fā)出事件來通知 Observer。
RxJava觀察者模式
與傳統(tǒng)觀察者模式不同馅巷, RxJava 的事件回調(diào)方法除了普通事件 onNext() (相當(dāng)于 onClick() / onEvent())之外膛虫,還定義了兩個特殊的事件:onCompleted() 和 onError()。
- onCompleted(): 事件隊列完結(jié)钓猬。RxJava不僅把每個事件單獨處理稍刀,還會把它們看做一個隊列。RxJava 規(guī)定敞曹,當(dāng)不會再有新的 onNext() 發(fā)出時账月,需要觸發(fā)onCompleted()方法作為標(biāo)志。
- onError(): 事件隊列異常澳迫。在事件處理過程中出異常時局齿,onError() 會被觸發(fā),同時隊列自動終止橄登,不允許再有事件發(fā)出抓歼。
- 在一個正確運行的事件序列中, onCompleted() 和 onError() 有且只有一個,并且是事件序列中的最后一個拢锹。需要注意的是谣妻,onCompleted() 和 onError() 二者也是互斥的,即在隊列中調(diào)用了其中一個面褐,就不應(yīng)該再調(diào)用另一個拌禾。
RxJava基本實現(xiàn)
1 創(chuàng)建Observer
Observer observer = new Observer<String>{
public void onNext(String s){
Log.i(tag,s);
}
public void onCompelte(){
Log.i(tag,"ok");
}
public void onError(Throwable e){
Log.i(tag,e.toString);
}
}
除了 Observer 接口之外,RxJava 還內(nèi)置了一個實現(xiàn)了 Observer 的抽象類:Subscriber展哭。 Subscriber 對 Observer 接口進(jìn)行了一些擴(kuò)展湃窍,但他們的基本使用方式是完全一樣的:
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
Observer 和 Subscriber 的區(qū)別
- onStart(): 這是 Subscriber 增加的方法。它會在 subscribe 剛開始匪傍,而事件還未發(fā)送之前被調(diào)用您市,可以用于做一些準(zhǔn)備工作,它總是在 subscribe 所發(fā)生的線程被調(diào)用役衡,而不能指定線程茵休。要在指定的線程來做準(zhǔn)備工作,可以使用 doOnSubscribe() 方法,具體可以在后面的文中看到
- unsubscribe(): 這是 Subscriber 所實現(xiàn)的另一個接口 Subscription 的方法榕莺,用于取消訂閱俐芯。在這個方法被調(diào)用后,Subscriber 將不再接收事件钉鸯。一般在這個方法調(diào)用前吧史,可以使用 isUnsubscribed() 先判斷一下狀態(tài)。 unsubscribe() 這個方法很重要唠雕,因為在 subscribe() 之后贸营, Observable 會持有 Subscriber 的引用,這個引用如果不能及時被釋放岩睁,將有內(nèi)存泄露的風(fēng)險钞脂。所以最好保持一個原則:要在不再使用的時候盡快在合適的地方(例如 onPause() onStop() 等方法中)調(diào)用 unsubscribe() 來解除引用關(guān)系,以避免內(nèi)存泄露的發(fā)生捕儒。
2 創(chuàng)建 Observable
Observable 即被觀察者冰啃,它決定什么時候觸發(fā)事件以及觸發(fā)怎樣的事件。 RxJava 使用 create() 方法來創(chuàng)建一個 Observable 肋层,并為它定義事件觸發(fā)規(guī)則:
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
create() 方法是 RxJava 最基本的創(chuàng)造事件序列的方法亿笤。基于這個方法栋猖, RxJava 還提供了一些方法用來快捷創(chuàng)建事件隊列净薛,例如:
- just(T...): 將傳入的參數(shù)依次發(fā)送出來。
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 將會依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
- from(T[]) / from(Iterable<? extends T>) : 將傳入的數(shù)組或 Iterable 拆分成具體對象后蒲拉,依次發(fā)送出來
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 將會依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
上面 just(T...) 的例子和 from(T[]) 的例子肃拜,都和之前的 create(OnSubscribe) 的例子是等價的。
3 Subscribe (訂閱)
創(chuàng)建了 Observable 和 Observer 之后雌团,再用 subscribe() 方法將它們聯(lián)結(jié)起來燃领,整條鏈子就可以工作了。代碼形式很簡單:
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
Observable.subscribe(Subscriber) 的內(nèi)部實現(xiàn)是這樣的(僅核心代碼):
// 注意:這不是 subscribe() 的源碼锦援,而是將源碼中與性能猛蔽、兼容性、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼灵寺。
// 如果需要看源碼曼库,可以去 RxJava 的 GitHub 倉庫下載。
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
可以看到略板,subscriber() 做了3件事:
- 調(diào)用 Subscriber.onStart()
這個方法在前面已經(jīng)介紹過毁枯,是一個可選的準(zhǔn)備方法匹厘。 - 調(diào)用 Observable 中的 OnSubscribe.call(Subscriber) 现拒。在這里勾效,事件發(fā)送的邏輯開始運行置济。從這也可以看出陵刹,在 RxJava 中憔狞, Observable 并不是在創(chuàng)建的時候就立即開始發(fā)送事件送淆,而是在它被訂閱的時候嫌蚤,即當(dāng) subscribe() 方法執(zhí)行的時候。
- 將傳入的 Subscriber 作為 Subscription 返回右锨。這是為了方便 unsubscribe().
除了 subscribe(Observer) 和 subscribe(Subscriber) 括堤,subscribe() 還支持不完整定義的回調(diào)碌秸,RxJava 會自動根據(jù)定義創(chuàng)建出 Subscriber 绍移。形式如下:
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
// 自動創(chuàng)建 Subscriber ,并使用 onNextAction 來定義 onNext()
observable.subscribe(onNextAction);
// 自動創(chuàng)建 Subscriber 讥电,并使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動創(chuàng)建 Subscriber 蹂窖,并使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()恩敌、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
4 場景示例
a. 打印字符串?dāng)?shù)組
將字符串?dāng)?shù)組 names 中的所有字符串依次打印出來:
String[] names = ...;
Observable.from(names)
.subscribe(new Action1<String>() {
@Override
public void call(String name) {
Log.d(tag, name);
}
});
b.由 id 取得圖片并顯示
由指定的一個 drawable 文件 id drawableRes 取得圖片瞬测,并顯示在 ImageView 中,并在出現(xiàn)異常的時候打印 Toast 報錯:
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
}).subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}
});
線程控制 —— Scheduler (一)
在不指定線程的情況下纠炮, RxJava 遵循的是線程不變的原則月趟,即:在哪個線程調(diào)用 subscribe(),就在哪個線程生產(chǎn)事件恢口;在哪個線程生產(chǎn)事件孝宗,就在哪個線程消費事件。如果需要切換線程耕肩,就需要用到 Scheduler (調(diào)度器)因妇。
1) Scheduler 的 API (一)
在RxJava 中,Scheduler ——調(diào)度器猿诸,相當(dāng)于線程控制器婚被,RxJava 通過它來指定每一段代碼應(yīng)該運行在什么樣的線程。RxJava 已經(jīng)內(nèi)置了幾個 Scheduler 梳虽,它們已經(jīng)適合大多數(shù)的使用場景:
- Schedulers.immediate(): 直接在當(dāng)前線程運行址芯,相當(dāng)于不指定線程。這是默認(rèn)的 Scheduler窜觉。
- Schedulers.newThread(): 總是啟用新線程谷炸,并在新線程執(zhí)行操作。
- Schedulers.io(): I/O 操作(讀寫文件竖螃、讀寫數(shù)據(jù)庫淑廊、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler。行為模式和 newThread() 差不多特咆,區(qū)別在于 io() 的內(nèi)部實現(xiàn)是是用一個無數(shù)量上限的線程池季惩,可以重用空閑的線程录粱,因此多數(shù)情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中画拾,可以避免創(chuàng)建不必要的線程啥繁。
- Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算青抛,即不會被 I/O 等操作限制性能的操作旗闽,例如圖形的計算。這個 Scheduler 使用的固定的線程池蜜另,大小為 CPU 核數(shù)适室。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU举瑰。
- 另外捣辆, Android 還有一個專用的 AndroidSchedulers.mainThread(),它指定的操作將在 Android 主線程運行此迅。
有了這幾個 Scheduler 汽畴,就可以使用 subscribeOn() 和 observeOn() 兩個方法來對線程進(jìn)行控制了。 * subscribeOn(): 指定 subscribe() 所發(fā)生的線程耸序,即 Observable.OnSubscribe 被激活時所處的線程忍些。或者叫做事件產(chǎn)生的線程坎怪。 * observeOn(): 指定 Subscriber 所運行在的線程罢坝。或者叫做事件消費的線程芋忿。
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調(diào)發(fā)生在主線程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});
2) RxJava subscribeOn 和 observeOn 的區(qū)別
subscribeOn
subscribeOn的調(diào)用炸客,改變了調(diào)用前序列所運行的線程。
一般來說戈钢,Observable使用subscribeOn
observeOn
- observeOn 對調(diào)用之前的序列默不關(guān)心痹仙,也不會要求之前的序列運行在指定的線程上
- observeOn 對之前的序列產(chǎn)生的結(jié)果先緩存起來,然后再在指定的線程上殉了,推送給最終的subscriber
- 一般來說开仰,Subscriber 使用observeOn
例子說明
Observable
.map // 操作1
.flatMap // 操作2
.subscribeOn(io)
.map //操作3
.flatMap //操作4
.observeOn(main)
.map //操作5
.flatMap //操作6
.subscribeOn(io) //!!特別注意
.subscribe(handleData)
假設(shè)這里我們是在主線程上調(diào)用這段代碼,
那么
- 操作1薪铜,操作2是在io線程上众弓,因為之后subscribeOn切換了線程
- 操作3,操作4也是在io線程上隔箍,因為在subscribeOn切換了線程之后谓娃,并沒有發(fā)生改變。
- 操作5蜒滩,操作6是在main線程上滨达,因為在他們之前的observeOn切換了線程奶稠。
- 特別注意那一段,對于操作5和操作6是無效的
- 再簡單點總結(jié)就是
- subscribeOn的調(diào)用切換之前的線程捡遍。
- observeOn的調(diào)用切換之后的線程锌订。
- observeOn之后,不可再調(diào)用subscribeOn 切換線程
- subscribeOn的調(diào)用切換之前的線程画株。
- observeOn的調(diào)用切換之后的線程辆飘。
- observeOn之后,不可再調(diào)用subscribeOn 切換線程
4. 變換
RxJava 提供了對事件序列進(jìn)行變換的支持谓传,這是它的核心功能之一蜈项,也是大多數(shù)人說『RxJava 真是太好用了』的最大原因。所謂變換良拼,就是將事件序列中的對象或整個序列進(jìn)行加工處理战得,轉(zhuǎn)換成不同的事件或事件序列 。
1) API
首先看一個 map() 的例子:
Observable.just("images/logo.png") // 輸入類型 String
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) { // 參數(shù)類型 String
return getBitmapFromPath(filePath); // 返回類型 Bitmap
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) { // 參數(shù)類型 Bitmap
showBitmap(bitmap);
}
});
這里出現(xiàn)了一個叫做 Func1 的類庸推。它和 Action1 非常相似,也是 RxJava 的一個接口浇冰,用于包裝含有一個參數(shù)的方法贬媒。 Func1 和 Action 的區(qū)別在于, Func1 包裝的是有返回值的方法肘习。另外际乘,和 ActionX 一樣, FuncX 也有多個漂佩,用于不同參數(shù)個數(shù)的方法脖含。FuncX 和 ActionX 的區(qū)別在 FuncX 包裝的是有返回值的方法。
可以看到投蝉,map() 方法將參數(shù)中的 String 對象轉(zhuǎn)換成一個 Bitmap 對象后返回养葵,而在經(jīng)過 map() 方法后,事件的參數(shù)類型也由 String 轉(zhuǎn)為了 Bitmap瘩缆。這種直接變換對象并返回的关拒,是最常見的也最容易理解的變換。不過 RxJava 的變換遠(yuǎn)不止這樣庸娱,它不僅可以針對事件對象着绊,還可以針對整個事件隊列,這使得 RxJava 變得非常靈活熟尉。我列舉幾個常用的變換:
- map(): 事件對象的直接變換归露,具體功能上面已經(jīng)介紹過。它是 RxJava 最常用的變換斤儿。 map() 的示意圖:
- flatMap(): 這是一個很有用但非常難理解的變換剧包,因此我決定花多些篇幅來介紹它腮考。 首先假設(shè)這么一種需求:假設(shè)有一個數(shù)據(jù)結(jié)構(gòu)『學(xué)生』,現(xiàn)在需要打印出一組學(xué)生的名字玄捕。實現(xiàn)方式很簡單:
Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String name) {
Log.d(tag, name);
}
...
};
Observable.from(students)
.map(new Func1<Student, String>() {
@Override
public String call(Student student) {
return student.getName();
}
})
.subscribe(subscriber);
很簡單踩蔚。那么再假設(shè):如果要打印出每個學(xué)生所需要修的所有課程的名稱呢?(需求的區(qū)別在于枚粘,每個學(xué)生只有一個名字馅闽,但卻有多個課程。)首先可以這樣實現(xiàn):
Student[] students = ...;
Subscriber<Student> subscriber = new Subscriber<Student>() {
@Override
public void onNext(Student student) {
List<Course> courses = student.getCourses();
for (int i = 0; i < courses.size(); i++) {
Course course = courses.get(i);
Log.d(tag, course.getName());
}
}
...
};
Observable.from(students)
.subscribe(subscriber);
依然很簡單馍迄。那么如果我不想在 Subscriber 中使用 for 循環(huán)福也,而是希望 Subscriber 中直接傳入單個的 Course 對象呢(這對于代碼復(fù)用很重要)?用 map() 顯然是不行的攀圈,因為 map() 是一對一的轉(zhuǎn)化暴凑,而我現(xiàn)在的要求是一對多的轉(zhuǎn)化。那怎么才能把一個 Student 轉(zhuǎn)化成多個 Course 呢赘来?
這個時候现喳,就需要用 flatMap() 了:
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);
從上面的代碼可以看出, flatMap() 和 map() 有一個相同點:它也是把傳入的參數(shù)轉(zhuǎn)化之后返回另一個對象犬辰。但需要注意嗦篱,和 map() 不同的是, flatMap() 中返回的是個 Observable 對象幌缝,并且這個 Observable 對象并不是被直接發(fā)送到了 Subscriber 的回調(diào)方法中灸促。
flatMap() 的原理是這樣的:
- 使用傳入的事件對象創(chuàng)建一個 Observable 對象;
- 并不發(fā)送這個 Observable, 而是將它激活涵卵,于是它開始發(fā)送事件浴栽;
- 每一個創(chuàng)建出來的 Observable 發(fā)送的事件,都被匯入同一個 Observable 轿偎,而這個 Observable 負(fù)責(zé)將這些事件統(tǒng)一交給 Subscriber 的回調(diào)方法典鸡。
- 這三個步驟,把事件拆成了兩級贴硫,通過一組新創(chuàng)建的 Observable 將初始的對象『鋪平』之后通過統(tǒng)一路徑分發(fā)了下去椿每。而這個『鋪平』就是 flatMap() 所謂的 flat。
flatMap() 示意圖:
2) 變換的原理:lift()
這些變換雖然功能各有不同英遭,但實質(zhì)上都是針對事件序列的處理和再發(fā)送间护。而在 RxJava 的內(nèi)部,它們是基于同一個基礎(chǔ)的變換方法: lift(Operator)挖诸。首先看一下 lift() 的內(nèi)部實現(xiàn)(僅核心代碼):
// 注意:這不是 lift() 的源碼汁尺,而是將源碼中與性能、兼容性多律、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼痴突。
// 如果需要看源碼搂蜓,可以去 RxJava 的 GitHub 倉庫下載。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
onSubscribe.call(newSubscriber);
}
});
}
本文為個人從下面文章摘錄辽装,為個人總結(jié)學(xué)習(xí)使用
給 Android 開發(fā)者的 RxJava 詳解