參考
學(xué)習(xí)資料匯總
深入淺出RxJava(一:基礎(chǔ)篇)作者:大頭鬼
給 Android 開(kāi)發(fā)者的 RxJava 詳解 作者:扔物線
BaronZhang RxJava系列
可能是東半球最全的RxJava使用場(chǎng)景小結(jié)
一夹姥、基本概念
RxJava最核心的兩個(gè)東西是Observables(被觀察者晌区,事件源)和Subscribers(觀察者)。Observables發(fā)出一系列事件家凯,Subscribers處理這些事件。這里的事件可以是任何你感興趣的東西(觸摸事件,web接口調(diào)用返回的數(shù)據(jù)鞍匾。。骑科。)
一個(gè)Observable可以發(fā)出零個(gè)或者多個(gè)事件橡淑,直到結(jié)束或者出錯(cuò)。每發(fā)出一個(gè)事件纵散,就會(huì)調(diào)用它的Subscriber的onNext方法梳码,最后調(diào)用Subscriber.onNext()或者Subscriber.onError()結(jié)束。
Rxjava的看起來(lái)很想設(shè)計(jì)模式中的觀察者模式伍掀,但是有一點(diǎn)明顯不同掰茶,那就是如果一個(gè)Observerble沒(méi)有任何的的Subscriber,那么這個(gè)Observable是不會(huì)發(fā)出任何事件的蜜笤。
Hello World
創(chuàng)建一個(gè)Observable對(duì)象很簡(jiǎn)單濒蒋,直接調(diào)用Observable.create即可
Observable<String> myObservable = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> sub) {
sub.onNext("Hello, world!");
sub.onCompleted();
}
}
);
這里定義的Observable對(duì)象僅僅發(fā)出一個(gè)Hello World字符串,然后就結(jié)束了把兔。接著我們創(chuàng)建一個(gè)Subscriber來(lái)處理Observable對(duì)象發(fā)出的字符串沪伙。
Subscriber<String> mySubscriber = new Subscriber<String>() {
@Override
public void onNext(String s) { System.out.println(s); }
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) { }
};
這里subscriber僅僅就是打印observable發(fā)出的字符串。通過(guò)subscribe函數(shù)就可以將我們定義的myObservable對(duì)象和mySubscriber對(duì)象關(guān)聯(lián)起來(lái)县好,這樣就完成了subscriber對(duì)observable的訂閱围橡。
myObservable.subscribe(mySubscriber);
一旦mySubscriber訂閱了myObservable,myObservable就是調(diào)用mySubscriber對(duì)象的onNext和onComplete方法缕贡,mySubscriber就會(huì)打印出Hello World翁授!
1.Observable (可觀察者,即被觀察者)
使用create來(lái)創(chuàng)建
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
這里傳入了一個(gè) OnSubscribe 對(duì)象作為參數(shù)晾咪。OnSubscribe 會(huì)被存儲(chǔ)在返回的 Observable 對(duì)象中收擦,它的作用相當(dāng)于一個(gè)計(jì)劃表,當(dāng) Observable 被訂閱的時(shí)候谍倦,OnSubscribe 的 call() 方法會(huì)自動(dòng)被調(diào)用塞赂,事件序列就會(huì)依照設(shè)定依次觸發(fā)(對(duì)于上面的代碼,就是觀察者Subscriber 將會(huì)被調(diào)用三次 onNext() 和一次 onCompleted())昼蛀。
create方法寫(xiě)起來(lái)太長(zhǎng)宴猾,也有等價(jià)的簡(jiǎn)化寫(xiě)法just和from:
just(T...): 將傳入的參數(shù)依次發(fā)送出來(lái)。
from(T[]) / from(Iterable<? extends T>) : 將傳入的數(shù)組或 Iterable 拆分成具體對(duì)象后曹洽,依次發(fā)送出來(lái)鳍置。
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 將會(huì)依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 將會(huì)依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
2.Observer (觀察者)
onNext()
onCompleted()
onError()
**3.Observer 的抽象類:Subscriber。 **
Subscriber<String> mySubscriber = new Subscriber<String>() {
@Override
public void onNext(String s) { System.out.println(s); }
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) { }
};
Subscriber 對(duì) Observer 接口進(jìn)行了一些擴(kuò)展送淆,但他們的基本使用方式是完全一樣的.實(shí)質(zhì)上,在 RxJava 的 subscribe 過(guò)程中怕轿,Observer 也總是會(huì)先被轉(zhuǎn)換成一個(gè) Subscriber 再使用偷崩。所以如果你只想使用基本功能辟拷,選擇 Observer 和 Subscriber 是完全一樣的。它們的區(qū)別對(duì)于使用者來(lái)說(shuō)主要有兩點(diǎn):
onStart(): 這是 Subscriber 增加的方法阐斜。它會(huì)在 subscribe 剛開(kāi)始衫冻,而事件還未發(fā)送之前被調(diào)用,可以用于做一些準(zhǔn)備工作谒出,例如數(shù)據(jù)的清零或重置隅俘。這是一個(gè)可選方法,默認(rèn)情況下它的實(shí)現(xiàn)為空笤喳。需要注意的是为居,如果對(duì)準(zhǔn)備工作的線程有要求(例如彈出一個(gè)顯示進(jìn)度的對(duì)話框,這必須在主線程執(zhí)行)杀狡, onStart() 就不適用了蒙畴,因?yàn)樗偸窃?subscribe 所發(fā)生的線程被調(diào)用,而不能指定線程呜象。要在指定的線程來(lái)做準(zhǔn)備工作膳凝,可以使用 doOnSubscribe() 方法,具體可以在后面的文中看到恭陡。
unsubscribe(): 這是 Subscriber 所實(shí)現(xiàn)的另一個(gè)接口 Subscription 的方法蹬音,用于取消訂閱。在這個(gè)方法被調(diào)用后休玩,Subscriber 將不再接收事件著淆。一般在這個(gè)方法調(diào)用前,可以使用 isUnsubscribed() 先判斷一下?tīng)顟B(tài)哥捕。 unsubscribe() 這個(gè)方法很重要牧抽,因?yàn)樵?subscribe() 之后, Observable 會(huì)持有 Subscriber 的引用遥赚,這個(gè)引用如果不能及時(shí)被釋放栏饮,將有內(nèi)存泄露的風(fēng)險(xiǎn)猬错。所以最好保持一個(gè)原則:要在不再使用的時(shí)候盡快在合適的地方(例如 onPause() onStop() 等方法中)調(diào)用 unsubscribe() 來(lái)解除引用關(guān)系,以避免內(nèi)存泄露的發(fā)生。
4.Subscribe方法 (訂閱)
Observable.subscribe(observer);
Observable.subscribe(Subscriber) 的內(nèi)部實(shí)現(xiàn)是這樣的(僅核心代碼):
// 注意:這不是 subscribe() 的源碼祭埂,而是將源碼中
//與性能、兼容性杯缺、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼掏愁。
// 如果需要看源碼,可以去 RxJava 的 GitHub 倉(cāng)庫(kù)下載毫炉。
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
可以看到瓮栗,subscriber() 做了3件事:
調(diào)用 Subscriber.onStart() 。這個(gè)方法在前面已經(jīng)介紹過(guò),是一個(gè)可選的準(zhǔn)備方法费奸。
調(diào)用 Observable 中的 OnSubscribe.call(Subscriber) 弥激。在這里,事件發(fā)送的邏輯開(kāi)始運(yùn)行愿阐。從這也可以看出微服,在 RxJava 中, Observable 并不是在創(chuàng)建的時(shí)候就立即開(kāi)始發(fā)送事件缨历,而是在它被訂閱的時(shí)候以蕴,即當(dāng) subscribe() 方法執(zhí)行的時(shí)候。
將傳入的 Subscriber 作為 Subscription 返回辛孵。這是為了方便 unsubscribe().
5.Action
注:正如前面所提到的丛肮,Observer 和 Subscriber 具有相同的角色,而且 Observer 在 subscribe() 過(guò)程中最終會(huì)被轉(zhuǎn)換成 Subscriber 對(duì)象觉吭,因此腾供,從這里開(kāi)始,后面的描述我將用 Subscriber 來(lái)代替 Observer 鲜滩,這樣更加嚴(yán)謹(jǐn)伴鳖。
上面提到了使用just,from來(lái)簡(jiǎn)化next。現(xiàn)在說(shuō)一下用Action來(lái)簡(jiǎn)化 Subscriber徙硅。
上面的例子中榜聂,我們其實(shí)并不關(guān)心OnComplete和OnError,我們只需要在onNext的時(shí)候做一些處理嗓蘑,這時(shí)候就可以使用Action1類须肆。
Observable.just("Hello, world!")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
Action系列是沒(méi)有返回值的,只有一個(gè)Call方法桩皿。call方法里沒(méi)有參數(shù)就用action0豌汇,一個(gè)參數(shù)就用action1。注意Action中的call方法和observable的create方法中那個(gè)一訂閱就執(zhí)行的call方法是有區(qū)別的泄隔。
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
6.例子
a. 打印字符串?dāng)?shù)組
將字符串?dāng)?shù)組 names中的所有字符串依次打印出來(lái):
<pre>
String[] names = ...;
Observable.from(names)
.subscribe(new Action1<String>() {
@Override
public void call(String name) {
Log.d(tag, name);
}
});
</pre>
b. 由 id 取得圖片并顯示
由指定的一個(gè) drawable 文件 id drawableRes取得圖片拒贱,并顯示在 ImageView中,并在出現(xiàn)異常的時(shí)候打印 Toast 報(bào)錯(cuò):
<pre>
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
}).subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}
});
</pre>
總結(jié):
根據(jù)訂閱者模式佛嬉,被訂閱者使用事件或回調(diào)逻澳,讓那個(gè)訂閱者去執(zhí)行某些事情。Observable在執(zhí)行subscribe時(shí)觸發(fā)了自己的call方法暖呕,去加載資源斜做,然后在加載完成時(shí)去觸發(fā)訂閱者的onNext,onCompleted.顯然,這不應(yīng)該都在一個(gè)線程湾揽,下面看一下切換線程瓤逼。
二笼吟、切換線程Scheduler
1.subscribeOn(): 指定subscribe()所發(fā)生的線程,即 Observable.OnSubscribe被激活時(shí)所處的線程抛姑≡薏蓿或者叫做事件產(chǎn)生的線程艳狐。
2.observeOn(): 指定Subscriber所運(yùn)行在的線程定硝。或者叫做事件消費(fèi)的線程毫目。
上面的例子可以改為蔬啡,在IO線程加載圖片,在UI線程顯示圖片:
<pre>
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調(diào)發(fā)生在主線程
.subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}
});
</pre>
在RxJava 中镀虐,Scheduler ——調(diào)度器箱蟆,相當(dāng)于線程控制器,RxJava 通過(guò)它來(lái)指定每一段代碼應(yīng)該運(yùn)行在什么樣的線程刮便。RxJava 已經(jīng)內(nèi)置了幾個(gè) Scheduler 空猜,它們已經(jīng)適合大多數(shù)的使用場(chǎng)景:
- Schedulers.immediate(): 直接在當(dāng)前線程運(yùn)行,相當(dāng)于不指定線程恨旱。這是默認(rèn)的 Scheduler辈毯。
- Schedulers.newThread(): 總是啟用新線程,并在新線程執(zhí)行操作搜贤。
- Schedulers.io(): I/O 操作(讀寫(xiě)文件谆沃、讀寫(xiě)數(shù)據(jù)庫(kù)、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler仪芒。行為模式和 newThread() 差不多唁影,區(qū)別在于 io() 的內(nèi)部實(shí)現(xiàn)是是用一個(gè)無(wú)數(shù)量上限的線程池,可以重用空閑的線程掂名,因此多數(shù)情況下 io() 比 newThread() 更有效率据沈。不要把計(jì)算工作放在 io() 中,可以避免創(chuàng)建不必要的線程饺蔑。
- Schedulers.computation(): 計(jì)算所使用的 Scheduler锌介。這個(gè)計(jì)算指的是 CPU 密集型計(jì)算,即不會(huì)被 I/O 等操作限制性能的操作膀钠,例如圖形的計(jì)算掏湾。這個(gè) Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)肿嘲。不要把 I/O 操作放在 computation() 中融击,否則 I/O 操作的等待時(shí)間會(huì)浪費(fèi) CPU。
- 另外雳窟, Android 還有一個(gè)專用的 AndroidSchedulers.mainThread()尊浪,它指定的操作將在 Android 主線程運(yùn)行匣屡。
三、變換
為什么要用變換拇涤,或者說(shuō)哪些地方需要變換捣作?
比如我想在hello world中加上我的簽名,你可能會(huì)想到去修改Observable對(duì)象:
Observable.just("Hello, world! -Dan").subscribe(s -> System.out.println(s));
如果你能夠改變Observable對(duì)象鹅士,這當(dāng)然是可以的券躁,但是如果你不能修改Observable對(duì)象呢?比如Observable對(duì)象是第三方庫(kù)提供的掉盅?比如我的Observable對(duì)象被多個(gè)Subscriber訂閱也拜,但是我只想在對(duì)某個(gè)訂閱者做修改呢?
那么在Subscriber中對(duì)事件進(jìn)行修改怎么樣呢趾痘?比如下面的代碼:
Observable.just("Hello, world!").subscribe(s -> System.out.println(s + " -Dan"));
這種方式仍然不能讓人滿意慢哈,因?yàn)槲蚁M业腟ubscribers越輕量越好,因?yàn)槲矣锌赡軙?huì)在mainThread中運(yùn)行subscriber永票。另外卵贱,根據(jù)響應(yīng)式函數(shù)編程的概念,Subscribers更應(yīng)該做的事情是“響應(yīng)”侣集,響應(yīng)Observable發(fā)出的事件键俱,而不是去修改。如果我能在某些中間步驟中對(duì)“Hello World肚吏!”進(jìn)行變換是不是很酷方妖?
<pre>
Observable.just("images/logo.png") // 輸入類型 String
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) { // 參數(shù)類型 String
return getBitmapFromPath(filePath); // 返回類型 Bitmap
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) { // 參數(shù)類型 Bitmap
showBitmap(bitmap);
}
});
</pre>
Func系列和Action系列類似,不過(guò)他是要返回參數(shù)的罚攀。傳入的String類型党觅,返回了bitmap類型,然后提供給subscriber.
再來(lái)看另一個(gè)例子:
打印出每個(gè)學(xué)生所需要修的所有課程的名稱
<pre>
Student[] students = ...;
Subscriber<Student> subscriber = new Subscriber<Student>() {
@Override
public void onNext(Student student) {
List<Course> courses = student.getCourses();
for (int i = 0; i < courses.size(); i++) {
Course course = courses.get(i);
Log.d(tag, course.getName());
}
}
...
};
Observable.from(students)
.subscribe(subscriber);
</pre>
如果不想在 Subscriber中使用 for 循環(huán)斋泄,而是希望 Subscriber中直接傳入單個(gè)的 Course對(duì)象呢(這對(duì)于代碼復(fù)用很重要)
<pre>
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);
</pre>
flatMap很奇怪杯瞻,它確實(shí)返回了一個(gè)Observable,這一點(diǎn)和map不同炫掐。map返回的是一個(gè)subscriber能用的類型對(duì)象魁莉,但flatMap返回的Observable居然subscriber也是能用的,連類型都沒(méi)有問(wèn)題募胃,就像用Observable.from直接拆出來(lái)的一樣旗唁。下面這段話比較費(fèi)解:
flatMap() 的原理是這樣的:1. 使用傳入的事件對(duì)象創(chuàng)建一個(gè) Observable 對(duì)象;2. 并不發(fā)送這個(gè) Observable, 而是將它激活痹束,于是它開(kāi)始發(fā)送事件检疫;3. 每一個(gè)創(chuàng)建出來(lái)的 Observable 發(fā)送的事件,都被匯入同一個(gè) Observable 祷嘶,而這個(gè) Observable 負(fù)責(zé)將這些事件統(tǒng)一交給 Subscriber 的回調(diào)方法屎媳。這三個(gè)步驟夺溢,把事件拆成了兩級(jí),通過(guò)一組新創(chuàng)建的 Observable 將初始的對(duì)象『鋪平』之后通過(guò)統(tǒng)一路徑分發(fā)了下去烛谊。而這個(gè)『鋪平』就是 flatMap() 所謂的 flat风响。
四、內(nèi)存泄露
參考
在Android開(kāi)發(fā)中使用RxJava
RxJava 和 RxAndroid 三(生命周期控制和內(nèi)存優(yōu)化)
我們之前提到丹禀,使用AsyncTask的一大缺點(diǎn)就是它可能會(huì)造成內(nèi)存泄露状勤。當(dāng)它們持有的Activity/Fragment的引用沒(méi)有正確處理時(shí)就會(huì)這樣。不幸的是湃崩,RxJava并不會(huì)自動(dòng)防止這種情況發(fā)生荧降,好在它可以很容易地防止內(nèi)存泄露。Observable.subscribe()方法會(huì)返回一個(gè)Subscription對(duì)象攒读,這個(gè)對(duì)象僅僅有兩個(gè)方法:isSbscribed()與unsubscribe()。你可以在Activity/Fragment的onDestroy方法中調(diào)用Subscription.isSubscribed()檢測(cè)是否這個(gè)異步任務(wù)仍在進(jìn)行辛友。如果它仍在進(jìn)行薄扁,則調(diào)用unsubscribe()方法來(lái)結(jié)束任務(wù),從而釋放其中的強(qiáng)引用废累,防止內(nèi)存泄露邓梅。如果你使用了多個(gè)Observable與Subscriber,那么你可以將它們添加到CompositeSubscription中邑滨,并調(diào)用CompositeSubscription.unsubscribe()結(jié)束所有的任務(wù)日缨。
1、取消訂閱 subscription.unsubscribe() ;
2掖看、線程調(diào)度
3匣距、rxlifecycle 框架的使用