RxJava(一)概述與基本使用
RxJava學(xué)習(xí)系列:
- RxJava(一)概述與基本使用
- [RxJava(二)創(chuàng)建操作符]
RxJava是近年來(lái)非晨抛#火熱且復(fù)雜的Android框架庄涡,本文基于RxJava 1.2.9來(lái)對(duì)其進(jìn)行分析腻要。
使用RxJava需要在build.gradle
中修改如下代碼:
dependencies {
...
compile 'io.reactivex:rxjava:1.2.9' //需要添加的代碼
compile 'io.reactivex:rxandroid:1.2.1' //rxandroid的依賴(lài)享完,基于rxjava的擴(kuò)展庫(kù)
}
一舞终、RxJava概述
RxJava是函數(shù)響應(yīng)式編程在Java語(yǔ)言上的實(shí)現(xiàn)霞揉,在了解RxJava之前我們先來(lái)簡(jiǎn)單學(xué)習(xí)下什么是函數(shù)響應(yīng)式編程旬薯。
函數(shù)響應(yīng)式編程
函數(shù)響應(yīng)式編程是函數(shù)式編程和響應(yīng)式編程這兩種編程范式的結(jié)合。
函數(shù)式編程(Functional Programming)是通過(guò)函數(shù)的調(diào)用與組合來(lái)處理數(shù)據(jù)适秩,獲取計(jì)算結(jié)果的一種編程范式绊序。
在函數(shù)式編程中硕舆,函數(shù)是"第一等公民",即與其他數(shù)據(jù)類(lèi)型地位相同:
- 可以賦值給其他變量骤公,因?yàn)樵谠诤瘮?shù)式編程中抚官,只用"表達(dá)式",即每一步都是一個(gè)運(yùn)算過(guò)程阶捆,都有返回值凌节,函數(shù)也必須都有返回值
- 也可以作為參數(shù)傳遞給其他函數(shù),比如閉包作為參數(shù)傳遞洒试。
函數(shù)式編程的優(yōu)點(diǎn):
- 簡(jiǎn)潔易懂倍奢,通過(guò)函數(shù)的鏈?zhǔn)秸{(diào)用使代碼可讀性更強(qiáng)
- 只依賴(lài)輸入的特性,每一個(gè)函數(shù)都可以看做一個(gè)獨(dú)立的單元垒棋,使代碼更易管理
- 由于函數(shù)不修改變量卒煞,所以不需要考慮死鎖的問(wèn)題,易于并發(fā)編程
響應(yīng)式編程(Reactive Programming)是一種面向數(shù)據(jù)流和變化傳播的編程范式叼架。
在響應(yīng)式編程中畔裕,任何的事件都看做是數(shù)據(jù)流的形式。上游發(fā)射數(shù)據(jù)流乖订,下游監(jiān)聽(tīng)數(shù)據(jù)流扮饶,在傳遞的過(guò)程中,對(duì)數(shù)據(jù)流進(jìn)行過(guò)濾垢粮,轉(zhuǎn)變贴届,合并靠粪,去重等處理蜡吧,當(dāng)下游接受到數(shù)據(jù)流時(shí),對(duì)其做出響應(yīng)占键。
在界面顯示中昔善,將要顯示的數(shù)據(jù)源(從網(wǎng)絡(luò)請(qǐng)求,數(shù)據(jù)庫(kù)查詢中得到)以數(shù)據(jù)流的形式畔乙,通過(guò)一系列的流轉(zhuǎn)過(guò)程(對(duì)數(shù)據(jù)進(jìn)行處理君仆,后臺(tái)線程發(fā)送到UI線程),交給界面牲距,界面在對(duì)數(shù)據(jù)做出相應(yīng)的響應(yīng)返咱。在界面的交互中,也是如此牍鞠,將用戶輸入的點(diǎn)擊咖摹,觸摸等事件以數(shù)據(jù)流的形式經(jīng)過(guò)層層傳遞交給對(duì)應(yīng)的窗口,在到對(duì)應(yīng)的控件难述,控件監(jiān)聽(tīng)到相應(yīng)的事件后萤晴,響應(yīng)用戶的行為吐句。
函數(shù)響應(yīng)式編程(Functional reactive programming)是通過(guò)函數(shù)塊(map,reduce店读,filter)來(lái)處理異步數(shù)據(jù)流的一種編程范式嗦枢。
在RxJava中,函數(shù)響應(yīng)式編程通過(guò)設(shè)置一個(gè)可觀察對(duì)象(Observable)和觀察者(Observer/Subscriber)屯断,Subscriber監(jiān)聽(tīng)Observable的事件文虏,Observable以數(shù)據(jù)流的形式發(fā)送事件,再通過(guò)一系列函數(shù)的鏈?zhǔn)秸{(diào)用(map/flatmap/filter等)對(duì)數(shù)據(jù)流進(jìn)行轉(zhuǎn)變和通過(guò)線程調(diào)度器(Scheduler)對(duì)數(shù)據(jù)流進(jìn)行并發(fā)處理殖演,最后由Subscriber接受到事件后對(duì)事件作出響應(yīng)择葡。
RxJava的定義
了解了函數(shù)響應(yīng)式編程后,我們就能理解RxJava的定義了剃氧。
RxJava是響應(yīng)式擴(kuò)展的JVM實(shí)現(xiàn):一個(gè)使用可觀察序列組成異步的敏储,基于事件的程序的庫(kù)。
總的來(lái)說(shuō)朋鞍,RxJava就是一個(gè)實(shí)現(xiàn)異步操作的庫(kù)已添,相比于AsyncTask/Handler等異步操作的機(jī)制來(lái)說(shuō),RxJava的優(yōu)點(diǎn)在于其使用了函數(shù)式的編程范式滥酥,使用函數(shù)的鏈?zhǔn)秸{(diào)用對(duì)數(shù)據(jù)流的發(fā)送更舞,流轉(zhuǎn),接受坎吻,響應(yīng)進(jìn)行處理缆蝉,使得代碼異常簡(jiǎn)潔明了,再加上JDK8中支持的lambda表達(dá)式可以使代碼變的更為簡(jiǎn)化瘦真。
二刊头、觀察者模式
RxJava的實(shí)現(xiàn)使用了觀察者模式,我們就來(lái)講一下觀察者模式诸尽。
什么是觀察者模式
觀察者模式的機(jī)制是存在一個(gè)被觀察者和觀察者原杂,觀察者對(duì)被觀察者的某種特征進(jìn)行監(jiān)聽(tīng),當(dāng)這種特征發(fā)生變化時(shí)您机,觀察者立刻做出反應(yīng)穿肄。例如,在監(jiān)獄里际看,獄警是觀察者咸产,犯人是被觀察者,獄警對(duì)犯人的打開(kāi)牢門(mén)的行為進(jìn)行了監(jiān)聽(tīng)仲闽,當(dāng)犯人打開(kāi)牢門(mén)時(shí)脑溢,獄警需要立刻沖上去把犯人制服。這就是觀察者模式蔼囊,獄警需要時(shí)刻緊盯犯人的行為焚志,觀察其是否有打開(kāi)牢門(mén)的行為衣迷。
編程中的觀察者模式
程序中的觀察者模式則有略微不同,觀察者不需要時(shí)刻去緊盯被觀察者酱酬,而是去訂閱(Subscribe)或注冊(cè)(Register)觀察者感興趣的行為壶谒,當(dāng)被觀察者發(fā)生這種行為時(shí),讓被觀察者去通知他(在上例中就是獄警告訴犯人膳沽,你要開(kāi)牢門(mén)的時(shí)候要提醒我)汗菜。
通常的模式是這樣的:Observable
內(nèi)部有一個(gè)成員變量mObserver
,通過(guò)訂閱函數(shù)setObserver
來(lái)設(shè)置訂閱者挑社,建立訂閱關(guān)系陨界,同時(shí)在被觀察的事件發(fā)生時(shí)調(diào)用通知函數(shù)notifyObserver
來(lái)通知mObserver
去調(diào)用它的響應(yīng)函數(shù)。
點(diǎn)擊事件中的觀察者模式
Android中有很多觀察者的例子痛阻,比如注冊(cè)監(jiān)聽(tīng)事件就是一個(gè)很典型的例子菌瘪。被觀察者View
調(diào)用setOnClickListener
來(lái)給自己設(shè)置一個(gè)觀察者OnClickListener
,約定好OnClickListener
訂閱了View
的點(diǎn)擊事件阱当。當(dāng)View
被點(diǎn)擊時(shí)俏扩,會(huì)調(diào)用performClick
方法去通知OnClickListener
,調(diào)用OnClickListener.onClick
方法弊添,即OnClickListener
對(duì)點(diǎn)擊事件作出響應(yīng)录淡。
三、RxJava的基本使用
RxJava中基本的實(shí)現(xiàn)為創(chuàng)建觀察者油坝,創(chuàng)建被觀察者嫉戚,建立訂閱關(guān)系
第一步:創(chuàng)建Subscriber/Observer
首先是創(chuàng)建一個(gè)觀察者Subscriber/Observer
,其中Observer
是一個(gè)接口澈圈,其中包含onNext(T t)
彬檀,onCompleted()
,onError(Throwable e)
极舔,三個(gè)抽象方法凤覆,而Subscriber
是一個(gè)抽象類(lèi)链瓦,實(shí)現(xiàn)了Observer
接口拆魏,上述三個(gè)方法的實(shí)現(xiàn)由其子類(lèi)來(lái)實(shí)現(xiàn),在使用上兩者區(qū)別不大慈俯,在Observable.subscribe
方法中如果傳入Observer
渤刃,會(huì)將其封裝成Subscriber
來(lái)進(jìn)行訂閱的。Subscriber
提供了更多的功能贴膘,在之后再進(jìn)行講解卖子。
創(chuàng)建Subscriber
:
Subscriber<String> subscriber = new Subscriber<>() {
@Override
public void onNext(String s) {
Log.d("TAG", "onNext: " + s);
}
@Override
public void onCompleted() {
Log.d("TAG", "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d("TAG", "onError");
}
};
RxJava中將事件看做是一個(gè)隊(duì)列,關(guān)于這三個(gè)事件回調(diào)方法作用如下:
-
onNext(T t)
:不斷的處理下一個(gè)事件刑峡,直至隊(duì)尾 -
onCompleted()
:當(dāng)不在有新的事件時(shí)洋闽,調(diào)用這個(gè)方法作為結(jié)束標(biāo)志玄柠,這個(gè)方法會(huì)在最后一個(gè)onNext
調(diào)用之后調(diào)用 -
onError(Throwable e)
:當(dāng)事件隊(duì)列出現(xiàn)異常時(shí),會(huì)觸發(fā)這個(gè)方法诫舅,并終止事件隊(duì)列羽利,不在處理新的事件。
onError
和onCompleted
在一個(gè)處理流里面只會(huì)調(diào)用一個(gè)刊懈。
第二步:創(chuàng)建Observable
調(diào)用Observable.create(OnSubcribe)
來(lái)創(chuàng)建一個(gè)Observable
这弧,OnSubscirbe
繼承了Action1<T>
接口,Action1<T>
接口中只包含一個(gè)方法void call(T t)
虚汛。在RxJava源碼中有很多ActionX
的接口匾浪,X表示接口的泛型個(gè)數(shù)和call
方法的參數(shù)個(gè)數(shù),call
中每個(gè)參數(shù)與泛型類(lèi)型一一對(duì)應(yīng)卷哩。
我們先來(lái)看OnSubscribe
的代碼:
/**
* Invoked when Observable.subscribe is called.
* @param <T> the output value type
*/
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
// cover for generics insanity
}
從注釋可以看出在建立訂閱關(guān)系Observable.subscribe
調(diào)用后會(huì)去調(diào)用OnSubscribe.call
方法蛋辈,所以call
中就應(yīng)該實(shí)現(xiàn)Subscriber
對(duì)事件作出的響應(yīng)的處理邏輯。
創(chuàng)建Observable:
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscribe<? super String> subscriber) {
//subcricber對(duì)事件的響應(yīng)邏輯
subscriber.onNext("first");
subscriber.onNext("second");
subscriber.onNext("third");
subscriber.onCompleted();
}
} );
除了通過(guò)Observable.create
來(lái)創(chuàng)建Observable
還可以通過(guò)just(T... t)
将谊,from(T[] t)
梯浪,from(Iterable<? extends T> iterable)來(lái)進(jìn)行創(chuàng)建
Observable`
just
傳入的是多個(gè)事件參數(shù):
Observable observable = Observable.just("first", "second", "third");
from
傳入的是一個(gè)事件數(shù)組:
String[] s = {"first", "second", "third"};
Observable observable = Observable.from(s);
上述兩種方式都和第一種方式?jīng)]有區(qū)別,just
和from
都在其內(nèi)部對(duì)事件回調(diào)函數(shù)進(jìn)行了調(diào)用
第三步:建立訂閱關(guān)系
通過(guò)Observable.subscribe(subscriber)
來(lái)進(jìn)行訂閱關(guān)系的建立
observable.subscribe(subscirber);
subscribe
返回的不是Observable
瓢娜,是Subscription
接口挂洛,后序系列詳解這個(gè)接口
除了這種方式,RxJava還提供了訂閱不完整定義的事件回調(diào)函數(shù)眠砾,在subscribe
方法中直接傳入事件回調(diào)函數(shù):
Subscription subscribe(final Action1<? super T> onNext)
Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError)
Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onCompleted) {
前面已經(jīng)講過(guò)了ActionX
接口的含義虏劲,這里subscribe
的三個(gè)重載就是可以不用麻煩的創(chuàng)建Subscriber
,而是直接去實(shí)現(xiàn)回調(diào)方法就行了褒颈。三個(gè)回調(diào)方法與重載參數(shù)的順序是確定的柒巫,看上述方法參數(shù)的形參名就知道了。
以第三個(gè)方法為例谷丸,實(shí)現(xiàn)跟之前創(chuàng)建Subscriber
一樣的效果:
observable.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("onNext: " + s);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println("onError");
}
}, new Action0() {
@Override
public void call() {
System.out.println("onCompleted");
}
});
可以根據(jù)方法的重載(第一種和第二種情況)堡掏,不用將三個(gè)方法都實(shí)現(xiàn),這樣未實(shí)現(xiàn)的方法就默認(rèn)為什么操作都不執(zhí)行刨疼。
通過(guò)以上方式泉唁,我們就實(shí)現(xiàn)了對(duì)RxJava的基本使用,RxJava使用函數(shù)式編程的思想揩慕,支持鏈?zhǔn)秸{(diào)用亭畜,合起來(lái)代碼如下:
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//subcricber對(duì)事件的響應(yīng)邏輯
subscriber.onNext("first");
subscriber.onNext("second");
subscriber.onNext("third");
subscriber.onCompleted();
}
}).subscribe(new Subscriber<String>() {
@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
});
運(yùn)行結(jié)果如下:
onNext: first
onNext: second
onNext: third
onCompleted
整個(gè)的調(diào)用流程如下:
首先通過(guò)Observable.create(OnSubscribe)
來(lái)創(chuàng)建一個(gè)Observable
對(duì)象,(當(dāng)然你也可以通過(guò)just
或者from
的方式來(lái)創(chuàng)建迎卤,這樣就不需要OnSubscribe
了拴鸵,在內(nèi)部會(huì)自動(dòng)去調(diào)用Subscriber
的事件回調(diào)方法)。然后調(diào)用Observable.subcribe(Subscriber)
傳入一個(gè)實(shí)現(xiàn)了Subscriber
的實(shí)現(xiàn)類(lèi),這個(gè)實(shí)現(xiàn)類(lèi)里面實(shí)現(xiàn)了onNext
劲藐,onCompleted
八堡,onError
三個(gè)方法。
在Observable.subcribe(Subscriber)
的內(nèi)部會(huì)去調(diào)用OnSubscribe.call()
這個(gè)方法聘芜,而我們?cè)趧?chuàng)建Observable
時(shí)傳入的OnSubscribe
的匿名內(nèi)部類(lèi)中實(shí)現(xiàn)了call(Subscriber<? super String> subscriber)
方法秕重,去調(diào)用傳入進(jìn)來(lái)的subscriber
的事件回調(diào)的三個(gè)函數(shù),來(lái)對(duì)事件作出響應(yīng)厉膀。
由此可以發(fā)現(xiàn)溶耘,RxJava中的觀察者模式與一般的不同點(diǎn)在于:
- RxJava中的事件是一個(gè)虛擬的事件,不是來(lái)源于外部注入的事件服鹅,而是是在創(chuàng)建被觀察者的時(shí)候就已經(jīng)將事件給注入進(jìn)去了凳兵。
- RxJava中被觀察者是在與觀察者建立訂閱關(guān)系的同時(shí),內(nèi)部調(diào)用
OnSubscribe
的call
方法去通知觀察者對(duì)事件作出響應(yīng)的企软。
四庐扫、RxJava的線程調(diào)度
前面已經(jīng)講述了RxJava的基本使用,但是你看了之后可能發(fā)現(xiàn)這個(gè)RxJava不是多此一舉仗哨,整些花里胡哨的形庭,直接調(diào)用幾個(gè)方法去對(duì)這些事件進(jìn)行處理不就行了,為什么還要整那些觀察者厌漂,被觀察者干嘛呢萨醒。剛開(kāi)始我學(xué)習(xí)RxJava的時(shí)候也是這樣覺(jué)得的,就覺(jué)得這個(gè)東西沒(méi)什么用苇倡,還把自己搞的暈頭轉(zhuǎn)向的富纸,但是后來(lái)學(xué)習(xí)了RxJava的異步處理之后才知道這樣做的好處。其實(shí)前面也有將到RxJava的核心思想和優(yōu)勢(shì)旨椒,就是在于對(duì)數(shù)據(jù)流的轉(zhuǎn)換和對(duì)異步操作的處理晓褪。所以這里我們就來(lái)先講一下RxJava中的線程調(diào)度。
Scheduler
RxJava中提供了一個(gè)Scheduler
——線程調(diào)度器的機(jī)制综慎,其作用是指定RxJava中的每個(gè)過(guò)程在什么線程中運(yùn)行涣仿,所以現(xiàn)在你可能就明白了一個(gè)很簡(jiǎn)單的事件響應(yīng)為什么要分成多個(gè)過(guò)程來(lái)處理了,其原因就是為了利于線程的調(diào)度示惊。
在Schedulers
中定義了幾個(gè)常用的Scheduler
(幾個(gè)靜態(tài)方法的返回值):
-
Schedulers.immediate()
:默認(rèn)的Scheduler
好港,直接在當(dāng)前線程運(yùn)行,相當(dāng)于不指定線程 -
Schedulers.newThread()
:?jiǎn)?dòng)一個(gè)新線程涝涤,在新線程中執(zhí)行任務(wù) -
Scheduler.io()
:用于IO異步阻塞操作的Scheduler
媚狰,內(nèi)部是用無(wú)上限的線程池實(shí)現(xiàn)的,可以重用空閑的線程阔拳,效率比newThread
要高。不要把計(jì)算操作放在這個(gè)Scheduler
中。 -
Scheduler.computation()
:用于事件循環(huán)糊肠,回調(diào)以及其他計(jì)算工作的Scheduler
辨宠,內(nèi)部是一個(gè)固定大小為CPU核數(shù)的線程池,不能把IO阻塞操作放在這個(gè)線程中 -
Schedulers.from(executor)
:使用指定的executor
作為Scheduler
-
AndroidSchedulers.mainThread()
:RxAndroid中添加的Scheduler
货裹,表示在Android主線程中運(yùn)行(RxAndroid是一個(gè)基于RxJava的擴(kuò)展庫(kù)嗤形,兼容了Android的一些特性)
現(xiàn)在我們就可以去設(shè)置每個(gè)過(guò)程中的Scheduler
來(lái)指定每個(gè)過(guò)程的執(zhí)行線程了。在RxJava中通過(guò)subcsribeOn()
和observeOn()
來(lái)設(shè)置執(zhí)行線程弧圆,subscribeOn
指定Observable.OnSubscribe
調(diào)用call()
方法時(shí)所在的線程赋兵,即事件產(chǎn)生的線程,observeOn()
指定Subscriber
事件回調(diào)的執(zhí)行線程搔预,即事件響應(yīng)的線程霹期。
那么現(xiàn)在我們就來(lái)寫(xiě)一個(gè)從IO線程讀取圖片的例子:
Observable.create(new Observable.OnSubscribe<Bitmap>() {
@Override
public void call(Subscriber<? super Bitmap> subscriber) {
Bitmap bitmap = BitmapFactory.decodeFile(picPath);
subscriber.onNext(bitmap);
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Bitmap>() {
@Override
public void onCompleted() {
Toast.makeText(RxJavaActivity.this, "圖片加載成功", Toast.LENGTH_LONG).show();
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Bitmap bitmap) {
mIvLocalImage.setImageBitmap(bitmap);
}
});
上述代碼就實(shí)現(xiàn)了在IO
線程中去讀取SD卡上的圖片,然后在主線程中顯示出來(lái)拯田。將上述代碼中完整的subscriber
實(shí)現(xiàn)變?yōu)椴煌暾氖录卣{(diào)函數(shù)历造,并且使用lambda表達(dá)式,代碼如下:
Observable.<Bitmap>create(subscriber -> {
Bitmap bitmap = BitmapFactory.decodeFile(picPath);
subscriber.onNext(bitmap);
subscriber.onCompleted();
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
bitmap -> mIvLocalImage.setImageBitmap(bitmap),
e -> {},
() -> Toast.makeText(RxJavaActivity.this, "圖片加載成功", Toast.LENGTH_LONG).show()
);
RxJava配合Lambda表達(dá)式實(shí)現(xiàn)異步操作船庇,是不是使用起來(lái)特別方便簡(jiǎn)潔吭产!
ps:以上就是關(guān)于RxJava的概述以及RxJava的基本使用了,關(guān)于RxJava的后續(xù)學(xué)習(xí)我會(huì)在后面的文章中進(jìn)行講解的鸭轮,敬請(qǐng)關(guān)注臣淤!
[RxJava(二)創(chuàng)建操作符]: