函數(shù)響應(yīng)式編程
RxJava到底是什么
一個在 Java VM 上使用可觀測的序列來組成異步的届垫、基于事件的程序的庫。RxJava 的本質(zhì)可以壓縮為異步這一個詞。說到根上,它就是一個實現(xiàn)異步操作的庫摹迷。
特點
簡潔 隨著程序邏輯變得越來越復(fù)雜疟赊,它依然能夠保持簡潔郊供。
創(chuàng)建Observer
Observer 即觀察者,它決定事件觸發(fā)的時候?qū)⒂性鯓拥男袨?/p>
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
Subscriber是Observer 的子類
Subscriber<String> mSubscriber = new Subscriber<String>() {
@Override
public void onStart() {
super.onStart();
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
}
};
- Observer比Subscriber多一個
onStart()
方法近哟。它會在 subscribe 剛開始驮审,而事件還未發(fā)送之前被調(diào)用,可以用于做一些準備工作吉执,例如數(shù)據(jù)的清零或重置疯淫。這是一個可選方法,默認情況下它的實現(xiàn)為空戳玫。
需要注意的是熙掺,如果對準備工作的線程有要求(例如彈出一個顯示進度的對話框,這必須在主線程執(zhí)行)咕宿, onStart()就不適用了币绩,因為它總是在 subscribe 所發(fā)生的線程被調(diào)用蜡秽,而不能指定線程。要在指定的線程來做準備工作缆镣,可以使用doOnSubscribe()
方法芽突。
Observable.create(onSubscribe)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
progressBar.setVisibility(View.VISIBLE); // 需要在主線程執(zhí)行
}
})
.subscribeOn(AndroidSchedulers.mainThread()) // 指定主線程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
它和
Subscriber.onStart()
同樣是在subscribe()
調(diào)用后而且在事件發(fā)送前執(zhí)行,但區(qū)別在于它可以指定線程董瞻。默認情況下寞蚌,doOnSubscribe()
執(zhí)行在subscribe()
發(fā)生的線程;而如果在doOnSubscribe()
之后有subscribeOn()
的話钠糊,它將執(zhí)行在離它最近的subscribeOn()
所指定的線程挟秤。
-
unsubscribe()
: 這是 Subscriber所實現(xiàn)的另一個接口 Subscription
的方法,用于取消訂閱抄伍。在這個方法被調(diào)用后煞聪,Subscriber將不再接收事件。一般在這個方法調(diào)用前逝慧,可以使用isUnsubscribed()
先判斷一下狀態(tài)昔脯。unsubscribe()
這個方法很重要,因為在subscribe()
后笛臣, Observable會持有 Subscriber的引用云稚,這個引用如果不能及時被釋放,將有內(nèi)存泄露的風(fēng)險沈堡。所以最好保持一個原則:要在不再使用的時候盡快在合適的地方(例如onPause()
onStop()
等方法中)調(diào)用unsubscribe()
來解除引用關(guān)系静陈,以避免內(nèi)存泄露的發(fā)生。
創(chuàng)建Observable
Observable 即被觀察者诞丽,它決定什么時候觸發(fā)事件以及觸發(fā)怎樣的事件鲸拥。
- 使用
create()
方法創(chuàng)建
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
當(dāng) Observable 被訂閱的時候,OnSubscribe 的 call() 方法會自動被調(diào)用僧免,事件序列就會依照設(shè)定依次觸發(fā)(對于上面的代碼刑赶,就是觀察者Subscriber 將會被調(diào)用三次 onNext() 和一次 onCompleted())
-
just(T...)
: 將傳入的參數(shù)依次發(fā)送出來。
Observable observable = Observable.just("Hello", "Hi", "Aloha");
-
from(T[]) / from(Iterable<? extends T>)
: 將傳入的數(shù)組或 Iterable 拆分成具體對象后懂衩,依次發(fā)送出來撞叨。
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
Subscribe (訂閱)
創(chuàng)建了 Observable 和 Observer 之后,再用 subscribe()
方法將它們聯(lián)結(jié)起來浊洞。
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);```
>有人可能會注意到牵敷, subscribe() 這個方法有點怪:它看起來是『observalbe 訂閱了 observer / subscriber』而不是『observer / subscriber 訂閱了 observalbe』,這看起來就像『雜志訂閱了讀者』一樣顛倒了對象關(guān)系法希。這讓人讀起來有點別扭枷餐,不過如果把 API 設(shè)計成observer.subscribe(observable) / subscriber.subscribe(observable) ,雖然更加符合思維邏輯苫亦,但對流式 API 的設(shè)計就造成影響了毛肋,比較起來明顯是得不償失的奕锌。
`Observable.subscribe(Subscriber)`的內(nèi)部實現(xiàn)是這樣的(僅核心代碼):
// 注意:這不是 subscribe() 的源碼,而是將源碼中與性能村生、兼容性惊暴、擴展性有關(guān)的代碼剔除后的核心代碼。
// 如果需要看源碼趁桃,可以去 RxJava 的 GitHub 倉庫下載辽话。
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
可以看到,`subscriber()` 做了3件事:
* 調(diào)用 `Subscriber.onStart()` 卫病。這個方法在前面已經(jīng)介紹過油啤,是一個可選的準備方法。
* 調(diào)用 `Observable` 中的 `OnSubscribe.call(Subscriber)` 蟀苛。在這里益咬,事件發(fā)送的邏輯開始運行。從這也可以看出帜平,在 RxJava 中幽告, `Observable` 并不是在創(chuàng)建的時候就立即開始發(fā)送事件,而是在它被訂閱的時候裆甩,即當(dāng) `subscribe()` 方法執(zhí)行的時候冗锁。
* 將傳入的 `Subscriber` 作為 `Subscription` 返回。這是為了方便 `unsubscribe()`.
除了 `subscribe(Observer)` 和 `subscribe(Subscriber)` 嗤栓,`subscribe()` 還支持不完整定義的回調(diào)冻河,RxJava 會自動根據(jù)定義創(chuàng)建出 Subscriber 。形式如下:
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
// 自動創(chuàng)建 Subscriber 茉帅,并使用 onNextAction 來定義 onNext()
observable.subscribe(onNextAction);
// 自動創(chuàng)建 Subscriber 叨叙,并使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動創(chuàng)建 Subscriber ,并使用 onNextAction堪澎、 onErrorAction 和 onCompletedAction 來定義 onNext()擂错、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
Observable.just("1111")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
tv1.setText(s);
}
});
簡單解釋一下這段代碼中出現(xiàn)的 Action1 和 Action0。 Action0 是 RxJava 的一個接口全封,它只有一個方法 `call()`马昙,這個方法是無參無返回值的;由于 `onCompleted() `方法也是無參無返回值的刹悴,因此 `Action0` 可以被當(dāng)成一個包裝對象,將 `onCompleted()` 的內(nèi)容打包起來將自己作為一個參數(shù)傳入 `subscribe() `以實現(xiàn)不完整定義的回調(diào)攒暇。這樣其實也可以看做將 `onCompleted() `方法作為參數(shù)傳進了 `subscribe()`土匀,相當(dāng)于其他某些語言中的『閉包』。 Action1 也是一個接口形用,它同樣只有一個方法` call(T param)`就轧,這個方法也無返回值证杭,但有一個參數(shù);與 Action0 同理妒御,由于 `onNext(T obj)` 和 `onError(Throwable error)` 也是單參數(shù)無返回值的解愤,因此 Action1 可以將 `onNext(obj) `和 `onError(error) `打包起來傳入 `subscribe() `以實現(xiàn)不完整定義的回調(diào)。事實上乎莉,雖然 Action0 和 Action1 在 API 中使用最廣泛送讲,但 RxJava 是提供了多個 ActionX 形式的接口 (例如 Action2, Action3) 的,它們可以被用以包裝不同的無返回值的方法惋啃。
>正如前面所提到的哼鬓,Observer 和 Subscriber 具有相同的角色,而且 Observer 在 `subscribe()` 過程中最終會被轉(zhuǎn)換成 Subscriber 對象边灭,因此异希,從這里開始,后面的描述我將用 Subscriber 來代替 Observer 绒瘦,這樣更加嚴謹称簿。
---------------
##場景示例
######a.打印字符串?dāng)?shù)組
將字符串?dāng)?shù)組 names中的所有字符串依次打印出來:
String[] names = ...;
Observable.from(names)
.subscribe(new Action1<String>() {
@Override
public void call(String name) {
Log.d(tag, name);
}
});
######b. 由 id 取得圖片并顯示
由指定的一個 drawable 文件 id drawableRes 取得圖片,并顯示在 ImageView 中惰帽,并在出現(xiàn)異常的時候打印 Toast 報錯:
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
}).subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}
});
然而
![](http://upload-images.jianshu.io/upload_images/1798389-9e6a44afb9cb0501.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)在 RxJava 的默認規(guī)則中予跌,事件的發(fā)出和消費都是在同一個線程的。也就是說善茎,如果只用上面的方法券册,實現(xiàn)出來的只是一個同步的觀察者模式。觀察者模式本身的目的就是『后臺處理垂涯,前臺回調(diào)』的異步機制烁焙,因此異步對于 RxJava 是至關(guān)重要的。而要實現(xiàn)異步耕赘,則需要用到 RxJava 的另一個概念: Scheduler 骄蝇。
##線程控制 —— Scheduler
在RxJava 中,Scheduler ——調(diào)度器操骡,相當(dāng)于線程控制器九火,RxJava 通過它來指定每一段代碼應(yīng)該運行在什么樣的線程。RxJava 已經(jīng)內(nèi)置了幾個 Scheduler 册招,它們已經(jīng)適合大多數(shù)的使用場景:
* `Schedulers.immediate()`: 直接在當(dāng)前線程運行岔激,相當(dāng)于不指定線程。這是默認的 Scheduler是掰。
* `Schedulers.newThread()`: 總是啟用新線程虑鼎,并在新線程執(zhí)行操作。
* `Schedulers.io()`: I/O 操作(讀寫文件、讀寫數(shù)據(jù)庫炫彩、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler匾七。行為模式和 newThread() 差不多艘狭,區(qū)別在于 io() 的內(nèi)部實現(xiàn)是是用一個無數(shù)量上限的線程池杯聚,可以重用空閑的線程,因此多數(shù)情況下 io() 比 newThread() 更有效率抓督。不要把計算工作放在 io() 中杉允,可以避免創(chuàng)建不必要的線程邑贴。
* `Schedulers.computation()`: 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算夺颤,即不會被 I/O 等操作限制性能的操作痢缎,例如圖形的計算。這個 Scheduler 使用的固定的線程池世澜,大小為 CPU 核數(shù)独旷。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU寥裂。
* 另外嵌洼, Android 還有一個專用的` AndroidSchedulers.mainThread()`,它指定的操作將在 Android 主線程運行封恰。
>有了這幾個 Scheduler 麻养,就可以使用 `subscribeOn()` 和 `observeOn() `兩個方法來對線程進行控制了。 `subscribeOn()`: 指定 `subscribe() `所發(fā)生的線程诺舔,即 `Observable.OnSubscribe `被激活時所處的線程鳖昌。或者叫做事件產(chǎn)生的線程低飒。 `observeOn()`: 指定 Subscriber 所運行在的線程许昨。或者叫做事件消費的線程褥赊。
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調(diào)發(fā)生在主線程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});
>上面這段代碼中糕档,由于 `subscribeOn(Schedulers.io()) `的指定,被創(chuàng)建的事件的內(nèi)容 1拌喉、2速那、3、4 將會在 IO 線程發(fā)出尿背;而由于 `observeOn(AndroidScheculers.mainThread()) `的指定端仰,因此 subscriber 數(shù)字的打印將發(fā)生在主線程 。事實上残家,這種在` subscribe() `之前寫上兩句 `subscribeOn(Scheduler.io()) `和` observeOn(AndroidSchedulers.mainThread()) `的使用方式非常常見榆俺,它適用于多數(shù)的 『后臺線程取數(shù)據(jù),主線程顯示』的程序策略坞淮。
而前面提到的由圖片 id 取得圖片并顯示的例子茴晋,如果也加上這兩句:
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調(diào)發(fā)生在主線程
.subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}
});
那么,加載圖片將會發(fā)生在 IO 線程回窘,而設(shè)置圖片則被設(shè)定在了主線程诺擅。這就意味著,即使加載圖片耗費了幾十甚至幾百毫秒的時間啡直,也不會造成絲毫界面的卡頓烁涌。