上一篇中我們了解了什么是RxJava,用一個詞來總結(jié)就是異步。
這里我們來講講RxJava的異步實現(xiàn)眠副。它是通過一種擴展的觀察者模式來實現(xiàn)网持。
一宜岛、觀察者模式
先簡書一下觀察者模式。
觀察者模式面向的需求是:觀察者對被觀察者的某種變化作出反應(yīng)功舀。比如警察抓小偷萍倡,警察需要在小偷作案時實施抓捕。在這里面小偷是被觀察者辟汰,警察是觀察者列敲。而程序的觀察者模式跟真正的觀察略有不同,觀察者不需要時時刻刻頂著被觀察者帖汞,而是采用注冊(Register)或者被稱為訂閱(Subscribe)方式告訴觀察者:我需要你的某種狀態(tài)戴而,你要在你變化的時候通知我。
Android開發(fā)中典型的觀察者模式就是監(jiān)聽器事件Listener翩蘸。對設(shè)置OnClickListener來說所意,View是被觀察者,OnClickListener是觀察者催首,兩者通過setOnClickListener()方法達成注冊(訂閱)關(guān)系扶踊。訂閱之后用戶點擊按鈕的瞬間,Android Framework 就會將點擊事件發(fā)送給已經(jīng)注冊的 OnClickListener郎任。
OnClickListener的模式圖
如圖所示秧耗,通過 setOnClickListener()方法,Button持有 OnClickListener的引用(這一過程沒有在圖上畫出)舶治;當用戶點擊時分井,Button自動調(diào)用 OnClickListener的 onClick()方法胶台。另外,如果把這張圖中的概念抽象出來(Button -> 被觀察者杂抽、OnClickListener-> 觀察者诈唬、setOnClickListener()-> 訂閱,onClick() -> 事件)缩麸,就由專用的觀察者模式(例如只用于監(jiān)聽控件點擊)轉(zhuǎn)變成了通用的觀察者模式铸磅。如下圖:
而 RxJava 作為一個工具庫,使用的就是通用形式的觀察者模式杭朱。
二阅仔、RxJava的觀察者模式
RxJava的四個基本概念
Observable(被觀察者)
Observer(觀察者)
subscribe(訂閱)
事件
Oservable和Observer通過 subscribe()方法實現(xiàn)訂閱關(guān)系,從而 Observable可以在需要的時候發(fā)出事件來通知 Observer弧械。
與傳統(tǒng)觀察者模式不同八酒, RxJava 的事件回調(diào)方法除了普通事件 onNext()(相當于 onClick() / onEvent())之外,還定義了兩個特殊的事件:onCompleted()和 onError()刃唐。
onCompleted(): 事件隊列完結(jié)羞迷。RxJava 不僅把每個事件單獨處理,還會把它們看做一個隊列画饥。RxJava 規(guī)定衔瓮,當不會再有新的onNext()發(fā)出時,需要觸發(fā) onCompleted()方法作為標志抖甘。
onError(): 事件隊列異常热鞍。在事件處理過程中出異常時,onError()會被觸發(fā)衔彻,同時隊列自動終止薇宠,不允許再有事件發(fā)出。
在一個正確運行的事件序列中, onCompleted()和 onError()有且只有一個艰额,并且是事件序列中的最后一個澄港。需要注意的是,onCompleted()
和 onError()二者也是互斥的悴晰,即在隊列中調(diào)用了其中一個慢睡,就不應(yīng)該再調(diào)用另一個。
RxJava 的觀察者模式大致如下圖:
三铡溪、基本實現(xiàn)
1、創(chuàng)建Observer
Observer 即觀察者泪喊,它決定事件觸發(fā)的時候?qū)⒂性鯓拥男袨椤?RxJava 中的 Observer接口的實現(xiàn)方式:
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
除了 Observer 接口之外棕硫,RxJava 還內(nèi)置了一個實現(xiàn)了 Observer的抽象類:Subscriber。 Subscriber對 Observer接口進行了一些擴展袒啼,但他們的基本使用方式是完全一樣的:
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
不僅基本使用方式一樣哈扮,實質(zhì)上纬纪,在 RxJava 的 subscribe 過程中,Observer也總是會先被轉(zhuǎn)換成一個 Subscriber再使用滑肉。所以如果你只想使用基本功能包各,選擇 Observer和 Subscriber是完全一樣的。它們的區(qū)別對于使用者來說主要有兩點:
onStart(): 這是 Subscriber增加的方法靶庙。它會在 subscribe 剛開始问畅,而事件還未發(fā)送之前被調(diào)用,可以用于做一些準備工作六荒,例如數(shù)據(jù)的清零或重置护姆。這是一個可選方法,默認情況下它的實現(xiàn)為空掏击。需要注意的是卵皂,如果對準備工作的線程有要求(例如彈出一個顯示進度的對話框,這必須在主線程執(zhí)行)砚亭, onStart()就不適用了灯变,因為它總是在 subscribe 所發(fā)生的線程被調(diào)用,而不能指定線程捅膘。要在指定的線程來做準備工作柒凉,可以使用 doOnSubscribe()方法,具體可以在后面的文中看到篓跛。
unsubscribe(): 這是 Subscriber所實現(xiàn)的另一個接口 Subscription的方法膝捞,用于取消訂閱。在這個方法被調(diào)用后愧沟,Subscriber將不再接收事件蔬咬。一般在這個方法調(diào)用前,可以使用 isUnsubscribed()先判斷一下狀態(tài)沐寺。 unsubscribe()這個方法很重要林艘,因為在 subscribe()之后, Observable會持有 Subscriber的引用混坞,這個引用如果不能及時被釋放狐援,將有內(nèi)存泄露的風險。所以最好保持一個原則:要在不再使用的時候盡快在合適的地方(例如 onPause()onStop()等方法中)調(diào)用unsubscribe()來解除引用關(guān)系究孕,以避免內(nèi)存泄露的發(fā)生啥酱。
2、 創(chuàng)建 Observable
Observable 即被觀察者厨诸,它決定什么時候觸發(fā)事件以及觸發(fā)怎樣的事件镶殷。 RxJava 使用 create()方法來創(chuàng)建一個 Observable ,并為它定義事件觸發(fā)規(guī)則:
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
可以看到微酬,這里傳入了一個 OnSubscribe對象作為參數(shù)绘趋。OnSubscribe會被存儲在返回的 Observable對象中颤陶,它的作用相當于一個計劃表,當 Observable被訂閱的時候陷遮,OnSubscribe的 call()方法會自動被調(diào)用滓走,事件序列就會依照設(shè)定依次觸發(fā)(對于上面的代碼,就是觀察者Subscriber將會被調(diào)用三次 onNext()和一次 onCompleted())帽馋。這樣搅方,由被觀察者調(diào)用了觀察者的回調(diào)方法,就實現(xiàn)了由被觀察者向觀察者的事件傳遞茬斧,即觀察者模式腰懂。
create()方法是 RxJava 最基本的創(chuàng)造事件序列的方法∠畋基于這個方法绣溜, RxJava 還提供了一些方法用來快捷創(chuàng)建事件隊列,例如:
just(T...): 將傳入的參數(shù)依次發(fā)送出來娄蔼。
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 將會依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
from(T[])/ from(Iterable<? extends T>): 將傳入的數(shù)組或 Iterable拆分成具體對象后怖喻,依次發(fā)送出來。
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 將會依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
上面 just(T...)的例子和 from(T[])的例子岁诉,都和之前的 create(OnSubscribe)的例子是等價的锚沸。
3、Subscribe (訂閱)
創(chuàng)建了 Observable和 Observer之后涕癣,再用 subscribe()方法將它們聯(lián)結(jié)起來哗蜈,整條鏈子就可以工作了。代碼形式很簡單:
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
除了 subscribe(Observer)和 subscribe(Subscriber)坠韩,subscribe()還支持不完整定義的回調(diào)距潘,RxJava 會自動根據(jù)定義創(chuàng)建出Subscriber 扳躬。形式如下:
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
// 自動創(chuàng)建 Subscriber 月杉,并使用 onNextAction 來定義 onNext()
observable.subscribe(onNextAction);
// 自動創(chuàng)建 Subscriber ,并使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動創(chuàng)建 Subscriber 隧哮,并使用 onNextAction氢惋、 onErrorAction 和 onCompletedAction 來定義 onNext()洞翩、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
簡單解釋一下這段代碼中出現(xiàn)的 Action1和 Action0。 Action0是 RxJava 的一個接口焰望,它只有一個方法 call()骚亿,這個方法是無參無返回值的;由于 onCompleted()方法也是無參無返回值的柿估,因此 Action0可以被當成一個包裝對象循未,將 onCompleted()的內(nèi)容打包起來將自己作為一個參數(shù)傳入 subscribe()以實現(xiàn)不完整定義的回調(diào)。這樣其實也可以看做將 onCompleted() 方法作為參數(shù)傳進了subscribe()秫舌,相當于其他某些語言中的『閉包』的妖。
Action1也是一個接口,它同樣只有一個方法 call(T param)足陨,這個方法也無返回值嫂粟,但有一個參數(shù);與 Action0同理墨缘,由于 onNext(T obj)和 onError(Throwable error)也是單參數(shù)無返回值的星虹,因此 Action1可以將 onNext(obj)和 onError(error)打包起來傳入 subscribe()以實現(xiàn)不完整定義的回調(diào)。事實上镊讼,雖然 Action0和 Action1在 API 中使用最廣泛宽涌,但 RxJava 是提供了多個 ActionX形式的接口 (例如 Action2, Action3) 的,它們可以被用以包裝不同的無返回值的方法蝶棋。
三卸亮、場景示例
1. 打印字符串數(shù)組
將字符串數(shù)組 names中的所有字符串依次打印出來:
String[] names = ...;
Observable.from(names)
.subscribe(new Action1<String>() {
@Override
public void call(String name) {
Log.d(tag, name);
}
});
在 RxJava 的默認規(guī)則中,事件的發(fā)出和消費都是在同一個線程的玩裙。也就是說兼贸,如果只用上面的方法,實現(xiàn)出來的只是一個同步的觀察者模式吃溅。觀察者模式本身的目的就是『后臺處理溶诞,前臺回調(diào)』的異步機制,因此異步對于 RxJava 是至關(guān)重要的决侈。而要實現(xiàn)異步螺垢,則需要用到 RxJava 的另一個概念: Scheduler。今天就先講到這里赖歌,下篇繼續(xù)枉圃。