響應(yīng)式編程
RxJava提供了響應(yīng)式編碼規(guī)范,而RxAndroid是專供Android平臺的RxJava(只是針對平臺增加了少量類),一般Android開發(fā)者口中的RxJava指的便是RxAndroid爆土。關(guān)于響應(yīng)式編程的概念這里不多說蜘澜,網(wǎng)上一大堆,我就簡單說明下響應(yīng)式編程與傳統(tǒng)編程的區(qū)別翔烁。如果邏輯B依賴于邏輯A,邏輯C依賴于邏輯B旨涝,那么:
- 傳統(tǒng)編程中的作法是在A執(zhí)行完成后去執(zhí)行B蹬屹,B執(zhí)行完畢后執(zhí)行C,除了A白华、B慨默、C本身的邏輯外,這部分由依賴產(chǎn)生的先后執(zhí)行邏輯也是由開發(fā)者編寫弧腥。
- 響應(yīng)式編程中厦取,會先將A、B管搪、C做個依賴綁定虾攻,A->B->C,就像水流一樣更鲁,從A流到B霎箍,再從B流到C,依賴產(chǎn)生的先后執(zhí)行邏輯由語言或者庫提供支持澡为,開發(fā)者只需要編寫A漂坏、B、C本身的邏輯以及告訴提供方三者間的關(guān)系即可。換句話說顶别,就是B會自動響應(yīng)A的執(zhí)行結(jié)果谷徙,C會自動響應(yīng)B的執(zhí)行結(jié)果。
響應(yīng)式編程可以很優(yōu)雅地處理任務(wù)間(或者業(yè)務(wù)間)的依賴關(guān)系驯绎,而這關(guān)系就像流一樣完慧,并且多用于異步場景,且由于大家熟知的響應(yīng)式編程庫RxJava的主要應(yīng)用場景也是異步剩失,主要應(yīng)用手段是流屈尼,因此不少文章簡單粗暴地將響應(yīng)式編程等同是異步+流,這個是比較片面的赴叹。
示例
我們來看個具體的例子,加深下理解指蚜。本例中需要處理三個任務(wù)乞巧,分別為A、B摊鸡、C绽媒,這三個任務(wù)都需要在工作線程中執(zhí)行,A執(zhí)行完以后延遲1S執(zhí)行B免猾,B執(zhí)行完以后延遲1S執(zhí)行C是辕。
以下為公共代碼,定義了A猎提、B获三、C三個任務(wù)以及定時器:
public class TaskA implements Runnable {
@Override
public void run() {
// do something
}
}
public class TaskB implements Runnable {
@Override
public void run() {
// do something
}
}
public class TaskC implements Runnable {
@Override
public void run() {
// do something
}
}
public class Timer {
private static final Timer INSTANCE = new Timer();
private Handler mHandler;
private Timer() {
HandlerThread thread = new HandlerThread("timer");
thread.start();
mHandler = new Handler(thread.getLooper());
}
public static Timer get() {
return INSTANCE;
}
public void post(Runnable runnable) {
mHandler.post(runnable);
}
public void postDelayed(Runnable runnable, long delay) {
mHandler.postDelayed(runnable, delay);
}
}
傳統(tǒng)編程
Timer.get().post(new Runnable() {
@Override
public void run() {
new TaskA().run();
Timer.get().postDelayed(new Runnable() {
@Override
public void run() {
new TaskB().run();
Timer.get().postDelayed(new Runnable() {
@Override
public void run() {
new TaskC().run();
}
}, 1000);
}
}, 1000);
}
});
傳統(tǒng)編程方式的問題非常突出,嵌入層次太多锨苏,隨著依賴關(guān)系的增多以及復(fù)雜化疙教,這樣的代碼會變得極其臃腫且不易閱讀和維護(hù)。
響應(yīng)式編程
首先我們需要編寫一個支持響應(yīng)式編程規(guī)范的庫伞租,代碼如下(這個類很多情況沒有考慮到贞谓,僅僅是為了演示用):
public class StreamTimer implements Runnable {
private List<Task> mTasks = new LinkedList<>();
public StreamTimer() {
}
public StreamTimer next(Runnable runnable) {
return next(runnable, 0);
}
public StreamTimer next(Runnable runnable, long delay) {
Task task = new Task(runnable, delay);
mTasks.add(task);
return this;
}
public void startup() {
startNextTimer();
}
private void startNextTimer() {
if(mTasks.isEmpty()) {
return;
}
Task task = mTasks.get(0);
Timer.get().postDelayed(this, task.delay);
}
@Override
public void run() {
Task task = mTasks.remove(0);
task.runnable.run();
startNextTimer();
}
private class Task {
Runnable runnable;
long delay;
Task(Runnable runnable, long delay) {
this.runnable = runnable;
this.delay = delay;
}
}
}
執(zhí)行A、B葵诈、C任務(wù)的代碼如下:
new StreamTimer()
.next(new TaskA())
.next(new TaskB(), 1000)
.next(new TaskC(), 1000)
.startup();
next
表示新增一個任務(wù)且需要在上一次任務(wù)執(zhí)行完畢后才執(zhí)行裸弦。以上代碼非常簡潔且可讀性很強(qiáng),任務(wù)間的依賴關(guān)系非常清晰作喘,拓展也非常簡單理疙,新增任務(wù)時只需要在合適的地方插入next
方法即可。
比起上述例子泞坦,RxJava的功能要強(qiáng)大得多沪斟,也復(fù)雜得多,上述例子只是為了讓新手能快速掌握響應(yīng)式編程及流式調(diào)用(也稱為鏈?zhǔn)秸{(diào)用),接下去我們開始講解RxJava主之。
RxAndroid基本使用
使用RxAndroid需要在build.gradle中加入如下依賴:
compile 'io.reactivex.rxjava2:rxandroid:2.1.1'
compile 'io.reactivex.rxjava2:rxjava:2.2.7'
RxAndroid也直接去https://github.com/ReactiveX/RxAndroid下載源碼择吊,里面就不到10個類。
Observable和Observer
- Observable:被觀察者槽奕,用來處理事件的派發(fā)几睛。
- Observer:觀察者,觀察目標(biāo)為Observable粤攒,Observable派發(fā)出來的事件將被它處理所森,一個Observable可以有多個Observer。當(dāng)Observable有變化時夯接,Observer能夠立即響應(yīng)這些變化焕济。
熟悉觀察者模式的同學(xué)應(yīng)該對這兩者有非常深刻的認(rèn)識了,它們是RxJava中最基礎(chǔ)的東西盔几,RxJava中其他的對象晴弃、方法、操作符都是圍繞這二者進(jìn)行或拓展的逊拍。來看下最簡單的例子:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
emitter.onNext("B");
emitter.onNext("C");
emitter.onComplete();
}
});
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
log("onSubscribe");
}
@Override
public void onNext(String s) {
log("onNext:" + s);
}
@Override
public void onError(Throwable e) {
error("onError", e);
}
@Override
public void onComplete() {
log("onComplete");
}
};
observable.subscribe(observer);
日志輸出如下:
onSubscribe
onNext:A
onNext:B
onNext:C
onComplete
來看下上述例子中Observable
相關(guān)的對象和方法:
- 使用
Observable.create
創(chuàng)建Observable
對象上鞠。 -
ObservableEmitter
為發(fā)射器,Observable
使用它發(fā)射事件給所有Observer
芯丧。 - 使用
Observable.subscribe
添加一個Observer
芍阎。
接著來看下Observer
的幾個方法:
- onSubscribe:在訂閱
observable
時回調(diào),可以在這里調(diào)用Disposable.dispose
取消訂閱或者將Disposable
對象保存起來以便在后續(xù)某個時刻取消訂閱缨恒。 - onNext:在
ObservableEmitter.onNext
執(zhí)行后回調(diào)谴咸,onNext
表示的是整個響應(yīng)鏈中的一環(huán),在這里處理響應(yīng)鏈中的其中一個任務(wù)骗露,可以多次調(diào)用寿冕。 - onComplete:在
ObservableEmitter.onComplete
執(zhí)行后回調(diào),表示任務(wù)已全部完成椒袍,可以在這里做收尾工作驼唱。 - onError:在
ObservableEmitter.onError
執(zhí)行后或者鏈中任一環(huán)節(jié)出現(xiàn)異常時回調(diào),表示任務(wù)執(zhí)行失敗驹暑。
除了onSubscribe
玫恳,其它幾個方法有如下特點(diǎn):
- 和
ObservableEmitter
中的同名方法一一對應(yīng),在ObservableEmitter
的同名方法執(zhí)行后回調(diào)优俘。 -
onComplete
和onError
互斥京办,兩者只能觸發(fā)其中之一,且觸發(fā)后onNext
便不會再觸發(fā)帆焕。 -
onComplete
觸發(fā)后惭婿,后續(xù)ObservableEmitter
調(diào)用任何方法都不會再生效不恭。 -
onError
觸發(fā)后,如果ObservableEmitter
再調(diào)用onError
或者onComplete
财饥,RxJava會拋出異常换吧,開發(fā)者需要自行保證唯一性。(處理方式為什么不同onComplete
钥星?)
需要特別注意的沾瓦,ObservableOnSubscribe.subscribe
方法在每次有新的Observer
加入時,都會在Observer.onSubscribe
回調(diào)之后觸發(fā)谦炒,這就保證了所有的Observer
都能接收到事件贯莺。
subscribe
在上面提到了通過subscribe
為Observable
和Observer
建立了綁定關(guān)系,我們來看下方法原型:
void subscribe(Observer<? super T> observer)
除此之外宁改,subscribe
還有很多重載方法缕探,我們來看下所有的方法原型:
Disposable subscribe();
Disposable subscribe(Consumer<? super T> onNext);
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError);
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete);
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
以上方法最終都會調(diào)用到參數(shù)最多的那個方法,而該方法內(nèi)部又調(diào)用了subscribe(Observer<? super T> observer)
还蹲,這些方法相對來說更為簡潔易用爹耗。可以發(fā)現(xiàn)新增了兩個類秽誊,分別是Consumer
和Action
鲸沮,其中Observer
的onNext
琳骡、onError
和onSubscribe
方法被Consumer.accept
方法取代锅论,onComplete
方法被Action.run
方法取代。因此楣号,如果我們僅僅只關(guān)心Observer
的其中一個或多個回調(diào)最易,那么便可以通過Consumer
或Action
來代替Observer
注冊到Observable
中。如上述的例子中炫狱,如果我們只關(guān)心onNext
藻懒,那么可以這么使用:
Observable<String> observable = Observable.create(new ObservableOnSubscribe
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
emitter.onNext("B");
emitter.onNext("C");
emitter.onComplete();
}
});
observable.subscribe(
new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
log("onNext:" + s);
}
}, Functions.<Throwable>emptyConsumer(),
new Action() {
@Override
public void run() throws Exception {
log("onComplete");
}
}
);
使用lambda表達(dá)式之后的代碼如下:
Observable<String> observable = Observable.create(
(ObservableEmitter<String> emitter) -> {
emitter.onNext("A");
emitter.onNext("B");
emitter.onNext("C");
emitter.onComplete();
}
);
observable.subscribe(
(String s) -> log("onNext:" + s),
Functions.<Throwable>emptyConsumer(),
() -> log("onComplete")
);
代碼顯得簡潔優(yōu)雅易讀得多,實際上RxJava是為響應(yīng)式編程和函數(shù)式編程而生视译,因此使用lambda表達(dá)式才能完美的使用RxJava嬉荆。
Android使用lambda表達(dá)式需要使用jdk1.8,且gradle版本需要4.0以上(也可能是3.x某個版本酷含,記不住了鄙早,4.0以上準(zhǔn)沒錯),在build.gradle中加入如下代碼:
android { compileOptions { sourceCompatibility > JavaVersion.VERSION_1_8 targetCompatibility > JavaVersion.VERSION_1_8 } }
線程調(diào)度
以下稱Observer.onXXX
回調(diào)時所在線程為Observer
工作線程椅亚,Observable
發(fā)射事件時所在線程為Observable
工作線程限番。默認(rèn)情況下,Observer
和Observable
工作線程是同一個呀舔,該線程即調(diào)用Observable.subscribe
時所在的線程弥虐。然而,異步調(diào)用才是RxJava的核心應(yīng)用場景,下面我們來看下如何改變Observer
和Observable
的工作線程霜瘪。使用Observable.subscribeOn
配置Observable
工作線程珠插,使用Observable.observeOn
配置Observer
工作線程。很多情況下粥庄,置Observable
工作線程位于子線程中丧失,因為可能存在網(wǎng)絡(luò)請求、數(shù)據(jù)存取等耗時操作惜互;而Observer
工作線程位于主線程布讹,因為接收到事件后需要刷新UI。下面來看下這種場景下的應(yīng)用示例:
Observable<String> observable = Observable.create(
(ObservableEmitter<String> emitter) -> {
log("emit thread=" + Thread.currentThread().getName());
emitter.onNext("A");
emitter.onNext("B");
emitter.onNext("C");
emitter.onComplete();
}
);
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
(String s) -> log("onNext:" + s + " thread=" + Thread.currentThread().getName()),
Functions.<Throwable>emptyConsumer(),
() -> log("onComplete thread=" + Thread.currentThread().getName())
);
日志輸出如下:
emit thread=RxNewThreadScheduler-2
onNext:A thread=main
onNext:B thread=main
onNext:C thread=main
onComplete thread=main
可以看到線程調(diào)度確實如我們所期望的了训堆。需要特別強(qiáng)調(diào)的描验,subscribeOn
和observeOn
返回的不是原本的Observable
對象,因此如果沒有采用鏈?zhǔn)秸{(diào)用坑鱼,在調(diào)用這兩個方法之后必須重新賦值給Observable
對象膘流,如:
// 錯誤調(diào)用
observable.subscribeOn(xxx)
.observeOn(xxx);
observable.subscribe(xxx);
// 正確調(diào)用
observable = observable.subscribeOn(xxx)
.observeOn(xxx);
observable.subscribe(xxx);
如果重復(fù)調(diào)用subscribeOn
和observeOn
會怎么樣呢,我們來看段代碼:
Observable<String> observable = Observable.create(
(ObservableEmitter<String> emitter) -> {
log("emit thread=" + Thread.currentThread().getName());
emitter.onNext("A");
}
);
observable = observable.subscribeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.single())
.observeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.single());
observable.subscribe((String s) -> log("onNext1:" + s + " thread=" + Thread.currentThread().getName()));
日志輸出如下:
emit thread=main
onNext1:A thread=RxSingleScheduler-1
可以得出如下臨時結(jié)論:
-
subscribeOn
以第一次調(diào)用為準(zhǔn)鲁沥。 -
observeOn
以最后一次調(diào)用為準(zhǔn)呼股。
我們再來看下更復(fù)雜的例子:
Observable<String> observable = Observable.create(
(ObservableEmitter<String> emitter) -> {
log("emit thread=" + Thread.currentThread().getName());
emitter.onNext("A");
}
);
observable = observable.subscribeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.single())
.observeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.single());
observable.subscribe((String s) -> log("onNext1:" + s + " thread=" + Thread.currentThread().getName()));
observable = observable.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io()) ;
observable.subscribe((String s) -> log("onNext2:" + s + " thread=" + Thread.currentThread().getName()));
日志輸出如下:
emit thread=main
emit thread=main
onNext1:A thread=RxSingleScheduler-1
onNext2:A thread=RxCachedThreadScheduler-2
現(xiàn)在,我們可以得出最終結(jié)論了:
-
subscribeOn
以第一次調(diào)用為準(zhǔn)画恰。 -
observeOn
以調(diào)用subscribe
前的最后一次調(diào)用為準(zhǔn)彭谁,每個subscribe
單獨(dú)計算。
由此可知允扇,我們可以讓不同的Observer
在不同線程中調(diào)度缠局。
RxJava使用Scheduler
來表示線程調(diào)度,上面提到的Schedulers.newThread()
和AndroidSchedulers.mainThread()
都是由RxJava提供的Scheduler
實現(xiàn)類考润。一般我們不需要手動去實現(xiàn)Scheduler
狭园,而是通過Schedulers
或者AndroidSchedulers
(Android專用)獲取。下面分別來看下二者所提供的創(chuàng)建能力糊治。
Schedulers
newThread
大致等同于new Thread(runnable).start();
唱矛,線程數(shù)沒有上限,除了測試場景一般不會用到它井辜。
io
用于I/O操作場景绎谦,線程數(shù)沒有上限。與newThread
比較相似抑胎,區(qū)別在于該調(diào)度器的內(nèi)部使用了一個無數(shù)量上限的線程池燥滑,可以復(fù)用空閑的線程,因此效率更高阿逃。
computation
用于計算場景铭拧,計算指的是CPU密集型計算赃蛛,即不會被I/O等操作限制性能的操作,因此不要把I/O操作放在這里搀菩。該類型的Scheduler
使用固定數(shù)量的線程池呕臂,數(shù)量為處理器核數(shù)。除了阻塞(包括I/O操作肪跋、wait
等)外歧蒋,其他操作都可以使用該調(diào)度器,不過通常用處理事件循環(huán)州既,大數(shù)據(jù)運(yùn)算等谜洽。
single
單線程調(diào)度,所有任務(wù)都需要排隊依次運(yùn)行吴叶。
trampoline
任務(wù)在當(dāng)前線程運(yùn)行阐虚。
from(Executor executor)
使用指定的線程池調(diào)度。
AndroidSchedulers
mainThread
任務(wù)在主線程上運(yùn)行蚌卤。
from(Looper looper)
任務(wù)在指定Looper上調(diào)度实束。