Rxjava是什么
響應(yīng)式編程
觀察者設(shè)計模式
一個實現(xiàn)異步操作的庫
代碼托管地址
關(guān)于響應(yīng)式編程
百科:
響應(yīng)式編程是一種面向數(shù)據(jù)流和變化傳播的編程范式恋捆,這意味著可以在編程語言中很方便地表達(dá)靜態(tài)或動態(tài)的數(shù)據(jù)流,而相關(guān)的計算模型會自動將變化的值通過數(shù)據(jù)流進(jìn)行傳播伤疙。
其他資料:響應(yīng)式編程就是與異步數(shù)據(jù)流交互的編程范式。
個人理解(Rxjava):
相對于命令式編程/函數(shù)式編程而言辆影,這里指在使用Rxjava過程中的對響應(yīng)式編程的理解;
傳統(tǒng)的兩種是自己主動控制去得到數(shù)據(jù)秸歧,主動控制數(shù)據(jù)得流向(展示/參數(shù))厨姚,然后將數(shù)據(jù)和數(shù)據(jù)的流向代碼組裝起來键菱。
Rxjava中的響應(yīng)式編程是被觀察者拿到數(shù)據(jù)主動傳遞給觀察者谬墙,將展示層和數(shù)據(jù)處理層分離经备,解耦了各個模塊,通過不同線程操控代碼運作配合變換過濾等api操作實現(xiàn)數(shù)據(jù)流傳播侵蒙。
關(guān)于觀察者模式
觀察者訂閱被觀察者造虎,被觀察者主動把所處理的結(jié)果或約定的信息返回給觀察者做處理(代碼上其實是被觀察者訂閱觀察者)
在android中被觀察者負(fù)責(zé)數(shù)據(jù)采集、轉(zhuǎn)化算凿、處理,觀察者接收處理的結(jié)果做出相應(yīng)的操作
- 舉例:點擊事件犁功、Eventbus
Button的點擊監(jiān)聽 OnClickListener 氓轰。對設(shè)置 OnClickListener 來說浸卦, Button 是被觀察者, OnClickListener 是觀察者限嫌,二者通過 setOnClickListener()
方法達(dá)成訂閱關(guān)系靴庆。訂閱之后用戶點擊按鈕的瞬間,Android Framework 就會將點擊事件發(fā)送給已經(jīng)注冊的 OnClickListener ,執(zhí)行Oclick()方法怒医。
EventBus是一個更加強大的可以進(jìn)行線程間操作觀察者模式炉抒,一旦注冊會將當(dāng)前類作為觀察者,內(nèi)部通過反射機制獲取接收數(shù)據(jù)結(jié)果的而各種方法稚叹,被觀察者通過post發(fā)送數(shù)據(jù)
- 觀察者模式的優(yōu)點
Observer模式(Rxjava)的優(yōu)點是實現(xiàn)了展示層和數(shù)據(jù)邏輯層的分離端礼,并定義了更新消息傳遞機制禽笑,提供各種類別清晰的接口,
使得可以有各種各樣不同的表示層(觀察者)蛤奥;一對多佳镜,一個被觀察者可以被多個觀察者監(jiān)聽。
Rxjava的優(yōu)點
異步凡桥、簡潔(邏輯蟀伸、代碼讀寫)。內(nèi)部支持多線程操作缅刽,強大的map和flatmap保證了依賴上一次接口數(shù)據(jù)進(jìn)行二次處理時不會發(fā)生嵌套啊掏,將各個模塊分離。
java1.8和第三方框架支持Lambda流式衰猛。保證了Rxjava的代碼在閱讀上更加簡潔迟蜜。
隨著程序邏輯的復(fù)雜,依然保持簡潔啡省。解耦了各個模塊操作娜睛,單一化,不嵌套卦睹。
lambda配置(配置jdk1.8或以上):
build.gradle(Module:app)中
android{
....
compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8
targetCompatibility JavaVersion.VERSION_1_8}
}
defaultConfig {
....
jackOptions {
enabled true
}
}
設(shè)置成功可以lambda的部分會變灰畦戒,鼠標(biāo)移到上面顯示下圖,只需選中灰色部分alt+enter就可以顯示lambda表達(dá)式
Paste_Image.png
Rxjava的實現(xiàn)方式
1. 依賴
compile 'io.reactivex:rxjava:1.0.14'
compile 'io.reactivex:rxandroid:1.0.1'
2. Observer(觀察者结序,訂閱后最終執(zhí)行的動作)
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
//需要執(zhí)行的
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
//回調(diào)完成
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
//回調(diào)失敗
Log.d(tag, "Error!");
}
};
- Observer擴展(Subscriber)
observer是Rxjava觀察者的最基本實現(xiàn)障斋,Rxjava提供了另一個實現(xiàn)Observer的抽象類,完善了Observer的方法(onStart()徐鹤、垃环、unsubscribe())
onStart(): 在訂閱之前執(zhí)行(后面會講到觀察者只有在被觀察者訂閱了才會執(zhí)行),不能被指定線程返敬,所以不能用來操作UI遂庄,只能簡單的做數(shù)據(jù)清空
unsubscribe():這是 Subscriber 所實現(xiàn)的另一個接口 Subscription 的方法,用于取消訂閱,防止內(nèi)存泄露救赐,兩種操作:1在onDestory()判斷然后反訂閱只磷,2主動在observale調(diào)用onCompleted
3. Observable(被觀察者经磅,訂閱后決定操作什么事件)
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//一旦被訂閱,在Observer中回調(diào)3次onNext()和一次onCompleted()
subscriber.onNext("a");
subscriber.onNext("b");
subscriber.onNext("c");
subscriber.onCompleted();
}
});
- Observable創(chuàng)建方式擴展(just(T...)/from(T[]) / from(Iterable<? extends T>))
just(T...) 將傳入的參數(shù)依次發(fā)送出來
from(T[]) / from(Iterable<? extends T> 將傳入的數(shù)組或 Iterable 拆分成具體對象后预厌,依次發(fā)送出來
just/from是把其他類型的對象和數(shù)據(jù)類型轉(zhuǎn)化成Observable
- 舉例
Observable observable = Observable.just("a", "b", "c");
// 將會依次調(diào)用:
// onNext("a");
// onNext("b");
// onNext("c");
// onCompleted();
4. Subscribe(訂閱)
observable.subscribe(observer); || observable.subscribe(subscriber);
subscribe核心源碼:
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
首先是調(diào)用傳入的 subscriber.onStart 方法,該方法默認(rèn)不做任何操作轧叽。之后就是調(diào)用創(chuàng)建Observable時保存的OnSubscriber.call方法苗沧,而在 call() 中我們調(diào)用了subscriber的 onNext() 和 onCompelte() 。這就完成了從了Observable到subscriber的數(shù)據(jù)的傳遞待逞。最后返回subscriber是為了方便取消訂閱等操作。
5. 常用方法講解
- ActionX()
Action1:
public interface Action1<T> extends Action {
void call(T t);
}
單參數(shù)無返回值的call方法网严。onNext(T obj)和onError(Throwable error)也是單參數(shù)無返回值的
Action0:
public interface Action0 extends Action {
void call();
}
無參數(shù)無返回值的call方法 ,由于 onCompleted() 方法也是無參無返回值的
總結(jié):在一些情況下不需要去全部實現(xiàn)震束,其實就是observer/Subscriber的部分抽離怜庸,作為一個參數(shù)傳入subscribe()實現(xiàn)不完整回調(diào)(完整的有三個回調(diào)方法)
- FuncX()
FuncX
FuncX和ActionX類似垢村,只是FuncX有返回值,用于observable的數(shù)據(jù)處理
public interface Func1<T1, R> extends Function {
public R call(T1 t1);
}
public interface Func2<T1, T2, R> extends Function {
public R call(T1 t1, T2 t2);
}
至于它的Func0嘉栓、Func1的實現(xiàn)只是多傳了幾個參數(shù),配合map(操作符)胸懈,實現(xiàn)observable數(shù)據(jù)的進(jìn)一步處理
6. 常用操作符講解
- map()
Returns an Observable that applies a specified function to each item emitted by the source Observable and emits the results of these function applications
對Observable發(fā)射的數(shù)據(jù)都應(yīng)用一個函數(shù)担扑,然后再發(fā)射最后的結(jié)果集趣钱。最后map()方法返回一個新的Observable
map配合FuncX()實現(xiàn)數(shù)據(jù)的進(jìn)一步處理
如:
Observable.just("aa").map(new Func1<String, String>() {
@Override
public String call(String s) {
return s+"bb";
}
});
- flatmap()
Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then merging those resulting Observables and emitting the results of this merger.
對Observable發(fā)射的數(shù)據(jù)都應(yīng)用(apply)一個函數(shù),這個函數(shù)返回一個Observable首有,然后合并這些Observables燕垃,并且發(fā)送(emit)合并的結(jié)果井联。flatMap和map操作符很相像,flatMap發(fā)送的是合并后Observables烙常,map操作符發(fā)送的是應(yīng)用函數(shù)后返回的結(jié)果集轴捎、
變換整個事件隊列蚕脏;
flatMap() 和 map() 有一個相同點:它也是把傳入的參數(shù)轉(zhuǎn)化之后返回另一個對象。但需要注意驼鞭,和 map() 不同的是秦驯, flatMap() 中返回的是個 Observable 對象挣棕, 并且這個 Observable 對象并不是被直接發(fā)送到了 Subscriber 的回調(diào)方法中亲桥。
flatMap() 的原理是這樣的:
- 對于每傳入的一個事件對象創(chuàng)建一個 Observable 對象,當(dāng)然flatMap是操縱整個事件序列固耘,目的是把事件序列中的對象一一取出一一進(jìn)行轉(zhuǎn)換,一般配合Func1<param1厅目,param2>取單個事件悼凑,如果非要傳入一個List璧瞬,就把整個List對象的事件隊列返回一個包括 所有事件的一個Observable,沒什么意義等于還是一對一轉(zhuǎn)換不符合flatmap的原理嗤锉;
- 對于每一個創(chuàng)建出來的 Observable 發(fā)送的事件渔欢,都被匯入一個Observable(新的瘟忱,亂序) 奥额;解釋一下:一旦數(shù)據(jù)序列執(zhí)行flatmap方法后當(dāng)被訂閱時不是取一個事件執(zhí)行一次onNext访诱,而是所有的事件都被轉(zhuǎn)化后再一一執(zhí)行OnNext;如果flatmap后還有其他操作(不是flatmap(...).subscribe(..)直接調(diào)用的這種)則流程和上面一樣触菜,只不過要在每次取得事件時將其他操作執(zhí)行完畢再取下一個九榔。一旦被訂閱就在在單一線程中執(zhí)行(默認(rèn)只有一個線程操作)涡相;如果在操縱數(shù)據(jù)時指定.subscribeOn()則多線程執(zhí)行且輸出亂序;
- 舉例
/**
* Observable.from()/just方法催蝗,它接收一個集合/數(shù)組作為輸入切威,然后每次輸出一個元素給subscriber:返回的是Observable
* func<param1,param2></> 一個函數(shù)丙号,當(dāng)應(yīng)用于由源Observable發(fā)出的項時,返回一個Observable
* @param1,前面?zhèn)鬟^來Observable里面的結(jié)果集犬缨,這里可能說法不太標(biāo)準(zhǔn)喳魏,為了便于理解遍尺,注意是結(jié)果集
* @param2,返回的Observable
*/
List<Integer> integers = Arrays.asList(2, 3, 4, 5, 6, 7, 8, 9, 10);
Observable.from(integers)
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
Log.i("tag", "RxMethod1-----11---::" + integer);
return Observable.just(integer * integer);
}
})
.flatMap(new Func1<Integer, Observable<String>>() {
@Override
public Observable<String> call(final Integer integer) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
String s = integer + " / ";
subscriber.onNext(s);
Log.i("tag", "RxMethod1-----22---::" + s);
subscriber.onCompleted();
}
})
// .subscribeOn(Schedulers.io())如果把執(zhí)行線程放在這里涮拗,則多線程執(zhí)行乾戏,順序也就不能保證了
;
}
})
.subscribeOn(Schedulers.io())//指定被觀察者執(zhí)行的線程
.observeOn(AndroidSchedulers.mainThread())//指定觀察者執(zhí)行的線程
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
tv.setText(str1);
}
@Override
public void onError(Throwable e) {
Log.i("tag", "onError--------::" + e.toString());
}
@Override
public void onNext(String s) {
str1 = sb1.append(s).toString();
}
});
解析:
new Func1<傳入的參數(shù), 返回的Observable的數(shù)據(jù)集>()
- concatmap()
和flatmap類似,但是有有順序的從返回的observable輸出到另一個observable中最后統(tǒng)一交給Subscribe的回調(diào)方法,永遠(yuǎn)單一線程執(zhí)行鼓择,保證順序
- doOnSubscribe()
Observable.just(1, 2, 3, 4)
.doOnSubscribe(....)//彈窗等等
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調(diào)發(fā)生在主線程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});
默認(rèn)情況下三幻, doOnSubscribe() 執(zhí)行在 subscribe() 發(fā)生的線程呐能;而如果在 doOnSubscribe() 之后有 subscribeOn() 的話,它將執(zhí)行在離它最近的 subscribeOn() 所指定的線程摆出。
- doOnNext()
允許程序做一些額外的操作朗徊,可能與訂閱執(zhí)行結(jié)果無關(guān)偎漫,僅做一些信息或過程中某一步結(jié)果的的回傳,保留象踊,供其他地方使用
- filter()
做信息過濾,與map一樣返回結(jié)果集
- take()
輸出最多指定數(shù)量的結(jié)果温亲。
- interval()
創(chuàng)建一個定時發(fā)射整數(shù)序列的Observable
- compose(bindToLifecycle())
管理生命周期, 防止內(nèi)存泄露(在訂閱前調(diào)用)
- reduce()
reduce操作符實際上是把傳入的list里的所有item進(jìn)行兩兩比較或添加
- Range()
創(chuàng)建發(fā)射指定范圍的整數(shù)序列的Observable
- Repeat()
創(chuàng)建重復(fù)發(fā)射特定的數(shù)據(jù)或數(shù)據(jù)序列的Observable
- Timer()
創(chuàng)建在一個指定的延遲之后發(fā)射單個數(shù)據(jù)的Observable
7. 總結(jié)
本篇總結(jié)了Rxjava的基本使用流程杯矩,常用的API,操作符的理解史隆,有不
足之處還請各位看官指教魂务,下一篇會將Retrofit配合Rxjava實現(xiàn)網(wǎng)絡(luò)請
求泌射,以及獲取數(shù)據(jù)后的各種處理头镊。
代碼托管地址