之前看過RxJava的一篇文章非常不錯(cuò)跳轉(zhuǎn)博文通過這篇博文我相信大家都會(huì)對(duì)RxJava這個(gè)框架的使用有了一定的心得申鱼。這里將自己的學(xué)習(xí)的內(nèi)容進(jìn)行總結(jié)风钻,方便后期查用英妓。
一挽放、RxJava簡(jiǎn)介
GitHub地址大家可以自行了解,這里我簡(jiǎn)單介紹一下Android Studio如何添加RxJava依賴:
compile 'io.reactivex:rxjava:1.2.4'
compile 'io.reactivex:rxandroid:1.2.1'
這是截止目前最新的1.x版本了鞋拟,在GitHub上可以看見RxJava已經(jīng)有了2.0的更新版骂维,由于是初學(xué)者并沒有那么急于求成,還是從1.0的版本開始學(xué)習(xí)使用吧贺纲。這里你還可以看到RxAndroid的身影航闺,簡(jiǎn)單介紹一下:
RxAndroid是對(duì)RxJava在Android上的擴(kuò)展,如果你是做安卓開發(fā)的話猴誊,各種主線程和子線程的操作肯定會(huì)讓你覺得頭疼潦刃,RxAndroid可以很容易地解決你的這種困擾。
二懈叹、初步的使用
RxJava 的異步實(shí)現(xiàn)乖杠,是通過一種擴(kuò)展的觀察者模式來實(shí)現(xiàn)的。觀察者最簡(jiǎn)單的對(duì)象關(guān)系如下圖澄成,當(dāng)一個(gè)被觀察者發(fā)出事件時(shí)胧洒,觀察者接收事件并調(diào)用回調(diào)方法響應(yīng)此事件畏吓。
1、觀察者(Observer)
所以我們先定義一個(gè)觀察者對(duì)象observer卫漫,代碼如下:
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
}
};
Observer是一個(gè)接口內(nèi)部定義了三個(gè)方法如上菲饼,這里說明一下RxJava 不僅把每個(gè)事件單獨(dú)處理,還會(huì)把它們看做一個(gè)隊(duì)列列赎。
-
onNext()
當(dāng)事件隊(duì)列里有事件發(fā)出時(shí)即會(huì)調(diào)用此方法來處理 -
onError()
當(dāng)事件隊(duì)列在處理過程中出異常時(shí)宏悦,onError()會(huì)被觸發(fā),同時(shí)隊(duì)列自動(dòng)終止包吝,不允許再有事件發(fā)出饼煞。 -
onCompleted()
當(dāng)事件隊(duì)列完結(jié),并且不再有新事件發(fā)出時(shí)即會(huì)調(diào)用此方法诗越。 - 在一個(gè)正確運(yùn)行的事件序列中,
onCompleted()
和onError()
有且只有一個(gè)砖瞧,并且是事件序列中的最后一個(gè)。需要注意的是,onCompleted()
和onError()
二者也是互斥的嚷狞,即在隊(duì)列中調(diào)用了其中一個(gè)芭届,就不應(yīng)該再調(diào)用另一個(gè)。
除了Observer接口之外感耙,RxJava還內(nèi)置了一個(gè)實(shí)現(xiàn)了Observer接口的抽象類Subscriber褂乍,對(duì)Observer接口進(jìn)行了一些擴(kuò)展,但他們的基本使用方式是完全一樣的即硼。定義Subscriber對(duì)象的代碼如下:
subscriber = new Subscriber<String>() {
@Override
public void onStart() {
super.onStart();
// 這是一個(gè)擴(kuò)展的方法逃片,用于初始化一些數(shù)據(jù),與定義Subscriber對(duì)象運(yùn)行在同一個(gè)線程
}
@Override
public void onCompleted() {
Log.e("xns", "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.e("xns", e.toString());
}
@Override
public void onNext(String s) {
Log.e("xns", s);
}
};
@Override
protected void onStop() {
if (subscriber.isUnsubscribed()) {// 釋放資源的方法只酥,防止內(nèi)存泄漏
subscriber.unsubscribe();
}
super.onStop();
}
- onStart()
這是 Subscriber增加的方法褥实。它會(huì)在 subscribe 剛開始,而事件還未發(fā)送之前被調(diào)用裂允,可以用于做一些準(zhǔn)備工作损离,例如數(shù)據(jù)的清零或重置。這是一個(gè)可選方法绝编,默認(rèn)情況下它的實(shí)現(xiàn)為空僻澎。需要注意的是,它總是在 subscribe 所發(fā)生的線程被調(diào)用十饥,而不能指定線程窟勃。 - unsubscribe()
這是 Subscriber所實(shí)現(xiàn)的另一個(gè)接口 Subscription的方法,用于取消訂閱逗堵。在這個(gè)方法被調(diào)用后秉氧,Subscriber將不再接收事件忽洛。一般在這個(gè)方法調(diào)用前锁右,可以使用 isUnsubscribed()先判斷一下狀態(tài)膝擂。 unsubscribe()這個(gè)方法很重要样悟,因?yàn)樵?subscribe()之后, Observable會(huì)持有 Subscriber的引用攘滩,這個(gè)引用如果不能及時(shí)被釋放伞访,將有內(nèi)存泄露的風(fēng)險(xiǎn)。所以最好保持一個(gè)原則:要在不再使用的時(shí)候盡快在合適的地方(例如 onPause()轰驳、onStop()等方法中)調(diào)用 unsubscribe()來解除引用關(guān)系,以避免內(nèi)存泄露的發(fā)生弟灼。
關(guān)于觀察者創(chuàng)建和使用就介紹到這级解,我們?cè)賮砜纯幢挥^察者的創(chuàng)建和使用方法。
2田绑、被觀察者(Obserable)
RxJava使用create()方法來創(chuàng)建一個(gè)Observable對(duì)象勤哗,并為它定義觸發(fā)事件時(shí)的規(guī)則:
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("RxJava");
subscriber.onCompleted();
}
});
這里傳入了一個(gè)OnSubscribe對(duì)象作為參數(shù)。它的作用相當(dāng)于一個(gè)計(jì)劃表掩驱,當(dāng)Obserable被訂閱的時(shí)候芒划,OnSubscribe的call()方法就會(huì)被調(diào)用,就會(huì)依次調(diào)用訂閱者的onNext()方法兩次和onCompleted()方法欧穴。這樣民逼,就完成了被觀察者調(diào)用觀察者的回調(diào)方法來完成事件的傳遞,即觀察者模式涮帘。
-
onCreate()
是RxJava最基本的創(chuàng)建事件序列的方法拼苍。基于這個(gè)方法调缨,RxJava還提供了一些快捷方法來創(chuàng)建疮鲫。 -
just(T...)
將傳入的參數(shù)依次發(fā)送出來。
Observable<String> observable2 = Observable.just("Hello","RxJava");
-
from(T[])
/from(Iterable<? extends T>)
將傳入的數(shù)組或 Iterable拆分成具體對(duì)象后弦叶,依次發(fā)送出來俊犯。
String[] strArr = {"Hello","RxJava"};
Observable<String> observable3 = Observable.from(strArr);
以上這三種方式都是等價(jià)的。
3伤哺、訂閱(Subscribe)
創(chuàng)建了 Observable 和 Observer 之后燕侠,再用 subscribe() 方法將它們聯(lián)結(jié)起來,整條鏈子就可以工作了立莉。代碼形式很簡(jiǎn)單:
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
Observable.subscribe(Subscriber) 的內(nèi)部實(shí)現(xiàn)是這樣的(僅核心代碼):
// 注意:這不是 subscribe() 的源碼贬循,而是將源碼中與性能、兼容性桃序、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼杖虾。
// 如果需要看源碼,可以去 RxJava 的 GitHub 倉庫下載媒熊。
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
可以看到奇适,subscriber() 做了3件事:
- 調(diào)用 Subscriber.onStart() 坟比。這個(gè)方法在前面已經(jīng)介紹過,是一個(gè)可選的準(zhǔn)備方法嚷往。
- 調(diào)用 Observable 中的 OnSubscribe.call(Subscriber) 葛账。在這里,事件發(fā)送的邏輯開始運(yùn)行皮仁。從這也可以看出籍琳,在 RxJava 中, Observable 并不是在創(chuàng)建的時(shí)候就立即開始發(fā)送事件贷祈,而是在它被訂閱的時(shí)候趋急,即當(dāng) subscribe() 方法執(zhí)行的時(shí)候。
- 將傳入的 Subscriber 作為 Subscription 返回势誊。這是為了方便 unsubscribe()呜达。(看源碼你會(huì)發(fā)現(xiàn)Subscriber是Subscription的實(shí)現(xiàn)子類,Subscription接口只定義了兩個(gè)方法
void unsubscribe();
粟耻、boolean isUnsubscribed();
)
除了 subscribe(Observer) 和 subscribe(Subscriber) 查近,subscribe() 還支持不完整定義的回調(diào),RxJava 會(huì)自動(dòng)根據(jù)定義創(chuàng)建出 Subscriber 挤忙。形式如下:
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
// 自動(dòng)創(chuàng)建 Subscriber 霜威,并使用 onNextAction 來定義 onNext()
observable.subscribe(onNextAction);
// 自動(dòng)創(chuàng)建 Subscriber ,并使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動(dòng)創(chuàng)建 Subscriber 册烈,并使用 onNextAction侥祭、 onErrorAction 和 onCompletedAction 來定義 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
這里介紹一下Action0
和Action1
茄厘。它們都是RxJava的內(nèi)部接口矮冬,Action0
接口只用一個(gè)call()
方法,這個(gè)方法無參無返回值次哈;由于 onCompleted()
方法也是無參無返回值的胎署,因此 Action0
可以被當(dāng)成一個(gè)包裝對(duì)象,將 onCompleted()
的內(nèi)容打包起來將自己作為一個(gè)參數(shù)傳入 subscribe()
以實(shí)現(xiàn)不完整定義的回調(diào)窑滞。這樣其實(shí)也可以看做將 onCompleted()
方法作為參數(shù)傳進(jìn)了subscribe()
琼牧,相當(dāng)于其他某些語言中的『閉包』。 Action1接口有一個(gè)call(T param)
方法哀卫,這個(gè)方法也無返回值可以看作是將 onNext(obj)
和 onError(error)
打包起來傳入 subscribe()
以實(shí)現(xiàn)不完整定義的回調(diào)巨坊。RxJava 還提供了多個(gè) ActionX 形式的接口 (例如 Action2
, Action3
) 的,它們可以被用以包裝不同的無返回值的方法此改。
4趾撵、舉例說明:
a.打印字符串?dāng)?shù)組
將字符串?dāng)?shù)組 names中的所有字符串依次打印出來:
private void testRxjava2() {
String[] names = {"xns","wxs","tyc"};
Observable.from(names).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d("xns", s);
}
});
}
b. 由 id 取得圖片并顯示
由指定的一個(gè) drawable 文件 id drawableRes 取得圖片,并顯示在 ImageView 中共啃,并在出現(xiàn)異常的時(shí)候打印 Toast 報(bào)錯(cuò):
private void testRxJava3() {
final int drawableResId = 0;
final ImageView imageView = new ImageView(this);
Observable.create(new Observable.OnSubscribe<Drawable>() {
@RequiresApi(api = Build.VERSION_CODES.LOLLIPOP)
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableResId);
subscriber.onNext(drawable);
subscriber.onCompleted();
}
}).subscribe(new Observer<Drawable>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}
});
}
private void testRxjava2() {
String[] names = {"xns","wxs","tyc"};
Observable.from(names).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d("xns", s);
}
});
}
以上就是RxJava的基本使用占调。然而在 RxJava 的默認(rèn)規(guī)則中暂题,事件的發(fā)出和消費(fèi)都是在同一個(gè)線程的。也就是說究珊,如果只用上面的方法薪者,實(shí)現(xiàn)出來的只是一個(gè)同步的觀察者模式。觀察者模式本身的目的就是『后臺(tái)處理剿涮,前臺(tái)回調(diào)』的異步機(jī)制言津,因此異步對(duì)于 RxJava 是至關(guān)重要的。而要實(shí)現(xiàn)異步取试,則需要用到 RxJava 的另一個(gè)概念: Scheduler(調(diào)度器)悬槽。下一篇我將和大家繼續(xù)學(xué)習(xí)RxJava接下來的內(nèi)容。