1.觀察者模式
- 定義:
定義對(duì)象間一對(duì)多的依賴(lài)依賴(lài)關(guān)系,當(dāng)一個(gè)對(duì)象改變狀態(tài)時(shí)抬驴,所有依賴(lài)它的對(duì)象會(huì)收到通知并自動(dòng)更新。 - 理解:
其實(shí)就是發(fā)布訂閱模式缆巧,發(fā)布者發(fā)布信息布持,訂閱者獲取信息,訂閱了就能收到信息陕悬,沒(méi)訂閱就收不到信息题暖。屬于行為型設(shè)計(jì)模式的一種。 - 使用場(chǎng)景:
有一個(gè)微信公眾號(hào)服務(wù)捉超,不定時(shí)發(fā)布一些消息胧卤,關(guān)注公眾號(hào)就可以收到推送消息,取消關(guān)注就收不到推送消息拼岳。其中枝誊,客戶(hù)是觀察者,公眾號(hào)是被觀察者惜纸。
2.什么是RxJava(ReactiveX.io鏈?zhǔn)骄幊蹋?/h1>
定義:一個(gè)在 Java VM 上使用可觀測(cè)的序列來(lái)組成異步的叶撒、基于事件的程序的庫(kù)。
總結(jié):RxJava 是一個(gè) 基于事件流耐版、實(shí)現(xiàn)異步操作的庫(kù)
理解:RXJava是一個(gè)響應(yīng)式編程框架 祠够,采用觀察者設(shè)計(jì)模式,觀察者模式本身的目的就是『后臺(tái)處理粪牲,前臺(tái)回調(diào)』的異步機(jī)制
優(yōu)點(diǎn):
由于 RxJava是基于事件流的鏈?zhǔn)秸{(diào)用古瓤,所以使得 RxJava:
邏輯簡(jiǎn)潔
實(shí)現(xiàn)優(yōu)雅
使用簡(jiǎn)單
- 作用:
實(shí)現(xiàn)異步操作
類(lèi)似于 Android中的 AsyncTask 、Handler作用
3.Rxjava基本概念(觀察者模式)
- 案例:按鈕點(diǎn)擊處理腺阳、廣播注冊(cè)
通過(guò)setOnClickListener()方法湿滓,Button持有OnClickListener的引用;當(dāng)用戶(hù)擊時(shí)舌狗,Button自動(dòng)調(diào)用OnClickListener的onClick()方法叽奥。
Button——>被觀察者
OnClickListener——>觀察者
setOnClickListener ——>訂閱
onClick ——>事件
4. RxJava 有3個(gè)基本概念及原理
1.Observable(被觀察者)
2.Observer(觀察者)
3.subscribe(訂閱)事件。
原理:被觀察者(Observable)
通過(guò) 訂閱(Subscribe)
按順序發(fā)送事件 給觀察者(Observer)
痛侍, 觀察者(Observer)
按順序接收事件 & 作出對(duì)應(yīng)的響應(yīng)動(dòng)作朝氓。
普通事件 : onNext() 接收被觀察者發(fā)送的消息
特殊的事件:
onCompleted() 事件隊(duì)列完結(jié)
onError () 事件隊(duì)列異常
注意:
1)RxJava 不僅把每個(gè)事件單獨(dú)處理魔市,還會(huì)把它們看做一個(gè)隊(duì)列。
2)RxJava 規(guī)定赵哲,onNext() 接收被觀察者發(fā)送的消息待德、可以執(zhí)行多次;當(dāng)不會(huì)再有新的 onNext () 發(fā)出時(shí)枫夺,需要觸發(fā) onCompleted () 方法作為標(biāo)志将宪。onError():事件隊(duì)列異常。在事件處理過(guò)程中出異常時(shí)橡庞,onError() 會(huì)被觸發(fā)较坛,同時(shí)隊(duì)列自動(dòng)終止,不允許再有事件發(fā)出扒最。
3)在一個(gè)正確運(yùn)行的事件序列中, onCompleted() 和 onError () 有且只有一個(gè)丑勤,并且是事件序列中的最后一個(gè)。
4)需要注意的是吧趣,onCompleted()和 onError () 二者也是互斥的法竞,即在隊(duì)列中調(diào)用了其中一個(gè),就不應(yīng)該再調(diào)用另一個(gè)强挫。
5.調(diào)度器
- RxJava中調(diào)度器設(shè)置方法
subscribeOn():或者叫做事件產(chǎn)生的線(xiàn)程岔霸。
指定 subscribe()所發(fā)生的線(xiàn)程,即 Observable.OnSubscribe 被激活時(shí)所處的線(xiàn)程俯渤。
observeOn():或者叫做事件消費(fèi)的線(xiàn)程呆细。指定 Subscriber所運(yùn)行在的線(xiàn)程。
幾種調(diào)度器
在RxJava 中Scheduler——調(diào)度器稠诲,相當(dāng)于線(xiàn)程控制器侦鹏,
RxJava 通過(guò)它來(lái)指定每一段代碼應(yīng)該運(yùn)行在什么樣的線(xiàn)程诡曙。
RxJava 已經(jīng)內(nèi)置了幾個(gè)Scheduler臀叙,它們已經(jīng)適合大多數(shù)的使用場(chǎng)景:
1:Schedulers.immediate():直接在當(dāng)前線(xiàn)程運(yùn)行,相當(dāng)于不指定線(xiàn)程价卤。這是默認(rèn)的Scheduler
2:Schedulers.newThread():總是啟用新線(xiàn)程劝萤,并在新線(xiàn)程執(zhí)行操作。
3:Schedulers.io():I/O 操作(讀寫(xiě)文件慎璧、讀寫(xiě)數(shù)據(jù)庫(kù)床嫌、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler
行為模式和 newThread()差不多區(qū)別在于 io()的內(nèi)部實(shí)現(xiàn)是是用一個(gè)無(wú)數(shù)量上限的線(xiàn)
程池可以重用空閑的線(xiàn)程,因此多數(shù)情況下 io()比 newThread()更有效率胸私。不要把計(jì)算
工作放在 io()中可以避免創(chuàng)建不必要的線(xiàn)程厌处。
4:Schedulers.computation():計(jì)算所使用的 Scheduler這個(gè)計(jì)算指的是 CPU密集型計(jì)算,
即不會(huì)被 I/O 等操作限制性能的操作岁疼,例如圖形的計(jì)算阔涉。這個(gè) Scheduler使用的固定
的線(xiàn)程池,大小為 CPU核數(shù)。不要把 I/O 操作放在computation()中瑰排,否則 I/O 操作
的等待時(shí)間會(huì)浪費(fèi)CPU贯要。
5.AndroidSchedulers.mainThread():Android 還有一個(gè)專(zhuān)用的
它指定的操作將在 Android主線(xiàn)程運(yùn)行。有了這幾個(gè) Scheduler椭住,就可以使用
subscribeOn()和 observeOn()兩個(gè)方法來(lái)對(duì)線(xiàn)程進(jìn)行控制了崇渗。
6.依賴(lài)庫(kù)
//RxJava
implementation 'io.reactivex.rxjava2:rxjava:2.2.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
implementation 'com.squareup.retrofit2:retrofit:2.5.0'//retrofit 庫(kù)
implementation 'com.squareup.retrofit2:converter-gson:2.5.0'//轉(zhuǎn)換器,請(qǐng)求結(jié)果轉(zhuǎn)換成Model
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.5.0'//配合Rxjava 使用
implementation 'com.google.code.gson:gson:2.6.2'//Gson 庫(kù)
7.簡(jiǎn)單使用
public static void baseRx(){
//1.創(chuàng)建被觀察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) {
emitter.onNext("1111");
emitter.onNext("2222");
emitter.onNext("3333");
emitter.onNext("4444");
//emitter.onError(new Throwable("abc"));
//emitter.onComplete();
}
});
//2.創(chuàng)建觀察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {//關(guān)閉線(xiàn)程
Log.e(TAG, "onSubscribe: " );
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext: "+ s );
}
@Override
public void onError(Throwable e) {//失敗
Log.e(TAG, "onError: "+e.getMessage() );
}
@Override
public void onComplete() {//成功
Log.e(TAG, "onComplete: " );
}
};
//3.被觀察者訂閱觀察者
observable.subscribe(observer);
//線(xiàn)程切換
observable
//被訂閱者在子線(xiàn)程中
.subscribeOn(Schedulers.io())
//訂閱者在主線(xiàn)程中
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
//觀察中可以重復(fù)指定線(xiàn)程
observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())//主
.observeOn(Schedulers.io())//子
.observeOn(AndroidSchedulers.mainThread())//主
.subscribe(observer);
}
- 演示鏈?zhǔn)秸{(diào)用
8.Android功能使用
private void rxAndroid() {
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(MyServer.Url)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.build();
MyServer myServer = retrofit.create(MyServer.class);
Observable<ResponseBody> call = myServer.getDate();
call.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<ResponseBody>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(ResponseBody responseBody) {
try {
Log.e(TAG, "onNext: "+responseBody.string() );
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
private void rxAndroidBean() {
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(MyServer.Url)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.build();
MyServer myServer = retrofit.create(MyServer.class);
Observable<Bean> call = myServer.getDate2();
call.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Bean>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Bean responseBody) {
Log.e(TAG, "onNext: "+ responseBody.getRESULT() );
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
9.其他操作符使用( 查看操作符)
- 創(chuàng)建操作符
//遍歷輸出
public static void rxFrom(){
Integer[] a = {1,2,3,4,5};
// Observable.fromArray(1,2,3,4)
//Observable.fromArray("a","b","c")
Observable.fromArray(a).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: "+integer);
}
});
}
//數(shù)組合并輸出
public static void rxJust(){
Integer[] a = {1,2,3};
Integer[] b = {9,8,7};
Observable.just(a,b).subscribe(new Consumer<Integer[]>() {
@Override
public void accept(Integer[] integers) throws Exception {
for (Integer i: integers) {
Log.e(TAG, "accept: "+i);
}
}
});
}
//范圍輸出
public static void rxRange(){
Observable.range(0,20).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: "+integer );
}
});
}
//定時(shí)器
public static void rxInterval(){
Observable.interval(1,1,TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "accept: "+aLong );
}
});
}
//閃屏
private void rxjavaInterval() {
final Long time = 5L;
subscribe = Observable.interval(1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e("TAG", "倒計(jì)時(shí):" + aLong);
if (aLong < time && !subscribe.isDisposed()) {
tv.setText("記錄改變生活" + (time - aLong - 1));
} else {
Intent intent = new Intent(WelcomActivity.this, MainActivity.class);
startActivity(intent);
finish();
}
}
});
}
@Override
protected void onDestroy() {
super.onDestroy();
subscribe.dispose();
subscribe = null;
}
- 過(guò)濾操作符
//過(guò)濾輸出
public static void rxFilter(){
Integer[] a = {1,2,3,4,5};
Observable.fromArray(a).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
if (integer>3){
return true;
}
return false;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: "+integer );
}
});
}
- 變換操作符
定義:一個(gè)在 Java VM 上使用可觀測(cè)的序列來(lái)組成異步的叶撒、基于事件的程序的庫(kù)。
總結(jié):RxJava 是一個(gè) 基于事件流耐版、實(shí)現(xiàn)異步操作的庫(kù)
理解:RXJava是一個(gè)響應(yīng)式編程框架 祠够,采用觀察者設(shè)計(jì)模式,觀察者模式本身的目的就是『后臺(tái)處理粪牲,前臺(tái)回調(diào)』的異步機(jī)制
優(yōu)點(diǎn):
由于 RxJava是基于事件流的鏈?zhǔn)秸{(diào)用古瓤,所以使得 RxJava:
邏輯簡(jiǎn)潔
實(shí)現(xiàn)優(yōu)雅
使用簡(jiǎn)單
實(shí)現(xiàn)異步操作
類(lèi)似于 Android中的 AsyncTask 、Handler作用
通過(guò)setOnClickListener()方法湿滓,Button持有OnClickListener的引用;當(dāng)用戶(hù)擊時(shí)舌狗,Button自動(dòng)調(diào)用OnClickListener的onClick()方法叽奥。
Button——>被觀察者
OnClickListener——>觀察者
setOnClickListener ——>訂閱
onClick ——>事件
1.Observable(被觀察者)
2.Observer(觀察者)
3.subscribe(訂閱)事件。
原理:被觀察者(Observable)
通過(guò) 訂閱(Subscribe)
按順序發(fā)送事件 給觀察者(Observer)
痛侍, 觀察者(Observer)
按順序接收事件 & 作出對(duì)應(yīng)的響應(yīng)動(dòng)作朝氓。
普通事件 : onNext() 接收被觀察者發(fā)送的消息
特殊的事件:
onCompleted() 事件隊(duì)列完結(jié)
onError () 事件隊(duì)列異常
1)RxJava 不僅把每個(gè)事件單獨(dú)處理魔市,還會(huì)把它們看做一個(gè)隊(duì)列。
2)RxJava 規(guī)定赵哲,onNext() 接收被觀察者發(fā)送的消息待德、可以執(zhí)行多次;當(dāng)不會(huì)再有新的 onNext () 發(fā)出時(shí)枫夺,需要觸發(fā) onCompleted () 方法作為標(biāo)志将宪。onError():事件隊(duì)列異常。在事件處理過(guò)程中出異常時(shí)橡庞,onError() 會(huì)被觸發(fā)较坛,同時(shí)隊(duì)列自動(dòng)終止,不允許再有事件發(fā)出扒最。
3)在一個(gè)正確運(yùn)行的事件序列中, onCompleted() 和 onError () 有且只有一個(gè)丑勤,并且是事件序列中的最后一個(gè)。
4)需要注意的是吧趣,onCompleted()和 onError () 二者也是互斥的法竞,即在隊(duì)列中調(diào)用了其中一個(gè),就不應(yīng)該再調(diào)用另一個(gè)强挫。
subscribeOn():或者叫做事件產(chǎn)生的線(xiàn)程岔霸。
指定 subscribe()所發(fā)生的線(xiàn)程,即 Observable.OnSubscribe 被激活時(shí)所處的線(xiàn)程俯渤。
observeOn():或者叫做事件消費(fèi)的線(xiàn)程呆细。指定 Subscriber所運(yùn)行在的線(xiàn)程。
在RxJava 中Scheduler——調(diào)度器稠诲,相當(dāng)于線(xiàn)程控制器侦鹏,
RxJava 通過(guò)它來(lái)指定每一段代碼應(yīng)該運(yùn)行在什么樣的線(xiàn)程诡曙。
RxJava 已經(jīng)內(nèi)置了幾個(gè)Scheduler臀叙,它們已經(jīng)適合大多數(shù)的使用場(chǎng)景:
1:Schedulers.immediate():直接在當(dāng)前線(xiàn)程運(yùn)行,相當(dāng)于不指定線(xiàn)程价卤。這是默認(rèn)的Scheduler
2:Schedulers.newThread():總是啟用新線(xiàn)程劝萤,并在新線(xiàn)程執(zhí)行操作。
3:Schedulers.io():I/O 操作(讀寫(xiě)文件慎璧、讀寫(xiě)數(shù)據(jù)庫(kù)床嫌、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler
行為模式和 newThread()差不多區(qū)別在于 io()的內(nèi)部實(shí)現(xiàn)是是用一個(gè)無(wú)數(shù)量上限的線(xiàn)
程池可以重用空閑的線(xiàn)程,因此多數(shù)情況下 io()比 newThread()更有效率胸私。不要把計(jì)算
工作放在 io()中可以避免創(chuàng)建不必要的線(xiàn)程厌处。
4:Schedulers.computation():計(jì)算所使用的 Scheduler這個(gè)計(jì)算指的是 CPU密集型計(jì)算,
即不會(huì)被 I/O 等操作限制性能的操作岁疼,例如圖形的計(jì)算阔涉。這個(gè) Scheduler使用的固定
的線(xiàn)程池,大小為 CPU核數(shù)。不要把 I/O 操作放在computation()中瑰排,否則 I/O 操作
的等待時(shí)間會(huì)浪費(fèi)CPU贯要。
5.AndroidSchedulers.mainThread():Android 還有一個(gè)專(zhuān)用的
它指定的操作將在 Android主線(xiàn)程運(yùn)行。有了這幾個(gè) Scheduler椭住,就可以使用
subscribeOn()和 observeOn()兩個(gè)方法來(lái)對(duì)線(xiàn)程進(jìn)行控制了崇渗。
//RxJava
implementation 'io.reactivex.rxjava2:rxjava:2.2.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
implementation 'com.squareup.retrofit2:retrofit:2.5.0'//retrofit 庫(kù)
implementation 'com.squareup.retrofit2:converter-gson:2.5.0'//轉(zhuǎn)換器,請(qǐng)求結(jié)果轉(zhuǎn)換成Model
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.5.0'//配合Rxjava 使用
implementation 'com.google.code.gson:gson:2.6.2'//Gson 庫(kù)
public static void baseRx(){
//1.創(chuàng)建被觀察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) {
emitter.onNext("1111");
emitter.onNext("2222");
emitter.onNext("3333");
emitter.onNext("4444");
//emitter.onError(new Throwable("abc"));
//emitter.onComplete();
}
});
//2.創(chuàng)建觀察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {//關(guān)閉線(xiàn)程
Log.e(TAG, "onSubscribe: " );
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext: "+ s );
}
@Override
public void onError(Throwable e) {//失敗
Log.e(TAG, "onError: "+e.getMessage() );
}
@Override
public void onComplete() {//成功
Log.e(TAG, "onComplete: " );
}
};
//3.被觀察者訂閱觀察者
observable.subscribe(observer);
//線(xiàn)程切換
observable
//被訂閱者在子線(xiàn)程中
.subscribeOn(Schedulers.io())
//訂閱者在主線(xiàn)程中
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
//觀察中可以重復(fù)指定線(xiàn)程
observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())//主
.observeOn(Schedulers.io())//子
.observeOn(AndroidSchedulers.mainThread())//主
.subscribe(observer);
}
private void rxAndroid() {
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(MyServer.Url)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.build();
MyServer myServer = retrofit.create(MyServer.class);
Observable<ResponseBody> call = myServer.getDate();
call.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<ResponseBody>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(ResponseBody responseBody) {
try {
Log.e(TAG, "onNext: "+responseBody.string() );
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
private void rxAndroidBean() {
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(MyServer.Url)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.build();
MyServer myServer = retrofit.create(MyServer.class);
Observable<Bean> call = myServer.getDate2();
call.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Bean>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Bean responseBody) {
Log.e(TAG, "onNext: "+ responseBody.getRESULT() );
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
//遍歷輸出
public static void rxFrom(){
Integer[] a = {1,2,3,4,5};
// Observable.fromArray(1,2,3,4)
//Observable.fromArray("a","b","c")
Observable.fromArray(a).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: "+integer);
}
});
}
//數(shù)組合并輸出
public static void rxJust(){
Integer[] a = {1,2,3};
Integer[] b = {9,8,7};
Observable.just(a,b).subscribe(new Consumer<Integer[]>() {
@Override
public void accept(Integer[] integers) throws Exception {
for (Integer i: integers) {
Log.e(TAG, "accept: "+i);
}
}
});
}
//范圍輸出
public static void rxRange(){
Observable.range(0,20).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: "+integer );
}
});
}
//定時(shí)器
public static void rxInterval(){
Observable.interval(1,1,TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "accept: "+aLong );
}
});
}
//閃屏
private void rxjavaInterval() {
final Long time = 5L;
subscribe = Observable.interval(1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e("TAG", "倒計(jì)時(shí):" + aLong);
if (aLong < time && !subscribe.isDisposed()) {
tv.setText("記錄改變生活" + (time - aLong - 1));
} else {
Intent intent = new Intent(WelcomActivity.this, MainActivity.class);
startActivity(intent);
finish();
}
}
});
}
@Override
protected void onDestroy() {
super.onDestroy();
subscribe.dispose();
subscribe = null;
}
//過(guò)濾輸出
public static void rxFilter(){
Integer[] a = {1,2,3,4,5};
Observable.fromArray(a).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
if (integer>3){
return true;
}
return false;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: "+integer );
}
});
}
①M(fèi)ap:通過(guò)指定一個(gè)Fun函數(shù)將Observeble轉(zhuǎn)換成一個(gè)新的Observable對(duì)象并發(fā)射京郑,觀察者收到新的observable處理宅广。
public static void rxMap(){
Integer[] a = {1,2,3,4,5};
Observable.fromArray(a).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) {
return integer+"abc";
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
Log.e(TAG, "accept: "+s );
}
});
}
②FlatMap:將被觀察者發(fā)送的事件序列進(jìn)行 拆分 & 單獨(dú)轉(zhuǎn)換,再合并成一個(gè)新的事件序列傻挂,最后再進(jìn)行發(fā)送
public static void rxFlatMap(){
Integer[] a = {1,2,3,4,5};
Observable.fromArray(a).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
String[] strs = new String[3];
for (int i =0;i<strs.length;i++){
strs[i] = integer + strs[i];
}
return Observable.fromArray(strs);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "accept: "+s );
}
});
}
注意:flatmap的合并允許交叉乘碑,也就是說(shuō)可能會(huì)交錯(cuò)的發(fā)送事件,最終結(jié)果的順序可能并不是由原始發(fā)送時(shí)候的順序金拒。
③concatMap:concatMap操作符功能與flatmap功能一致兽肤,不過(guò)他解決了flatmap交叉問(wèn)題,提供了一種能夠把發(fā)射的值連續(xù)在一起的函數(shù)绪抛,而并不是合并他們资铡。具體使用和flatmap一致。
- 組合/合并操作符
//Observable壓縮合并ZIP:合并 多個(gè)被觀察者(Observable)發(fā)送的事件幢码,生成一個(gè)新的事件序列(即組合過(guò)后的事件序列)笤休,并最終發(fā)送
public static void rxZip(){
Integer[] a= {1,2,3};
Integer[] b={4,5,6};
Observable<Integer> observableA = Observable.fromArray(a);
Observable<Integer> observableB = Observable.fromArray(b);
Observable.zip(observableA, observableB, new BiFunction<Integer, Integer, String>() {
@Override
public String apply(Integer integer, Integer integer2) throws Exception {
return integer + ":" + integer2;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "accept: "+s );
}
});
}
//合并:組合多個(gè)被觀察者一起發(fā)送數(shù)據(jù),合并后 按時(shí)間線(xiàn)并行執(zhí)行
//merge()組合被觀察者數(shù)量≤4個(gè),而mergeArray()則可>4個(gè)
public static void rxMerge(){
Integer[] a ={1,2,3};
String[] b = {"abc","aaa","bbb"};
char[] c = {'a','b','c'};
Observable<Integer> A = Observable.fromArray(a);
Observable<String> B = Observable.fromArray(b);
Observable<char[]> C = Observable.fromArray(c);
Observable
.merge(A,B,C)
.subscribe(new Consumer<Serializable>() {
@Override
public void accept(Serializable serializable) throws Exception {
Log.e(TAG, "accept: ."+serializable );
}
});
}
- 功能性操作符
subscribOn()
observeOn()
10.RxAndroid好處
用途
是一個(gè)實(shí)現(xiàn)異步操作的庫(kù)症副,具有簡(jiǎn)潔的鏈?zhǔn)酱a店雅,提供強(qiáng)大的數(shù)據(jù)變換。優(yōu)勢(shì)
異步好簡(jiǎn)單贞铣、代碼好簡(jiǎn)潔闹啦,一個(gè)簡(jiǎn)單、一個(gè)簡(jiǎn)潔辕坝,這就意味著工作效率窍奋。注意
subscribeOn只能定義一次,除非是在定義doOnSubscribe
observeOn可以定義多次酱畅,決定后續(xù)代碼所在的線(xiàn)程
11.RxJava:好處
使用Rxjava的好處在于琳袄,我們可以方便的切換方法的執(zhí)行線(xiàn)程表窘,對(duì)線(xiàn)程動(dòng)態(tài)切換厨内,該過(guò)程無(wú)需我們自己手動(dòng)創(chuàng)建和啟動(dòng)線(xiàn)程宝当。使用Rxjava創(chuàng)建的代碼雖然出現(xiàn)在同一個(gè)線(xiàn)程中芒帕,但是我們可以設(shè)置使得不同方法在不同線(xiàn)程中執(zhí)行占婉。上述功能的實(shí)現(xiàn)主要?dú)w功于RxJava的Scheduler實(shí)現(xiàn)菇绵,Scheduler 提供了『后臺(tái)處理费尽,前臺(tái)回調(diào)』的異步機(jī)制至会。
12. AsyncTask與RxJava的區(qū)別
- RxJava 實(shí)現(xiàn)異步操作是通過(guò)一種擴(kuò)展的觀察者模式來(lái)實(shí)現(xiàn)的。
- 異步矮慕、簡(jiǎn)潔(邏輯帮匾、代碼讀寫(xiě))。
- RxJava 內(nèi)部支持多線(xiàn)程操作
- AyncTask是采用線(xiàn)程池的形式實(shí)現(xiàn)的痴鳄。
- 出現(xiàn)錯(cuò)誤的處理-rxjava 自身有錯(cuò)誤的方法回調(diào)瘟斜,aync無(wú)法做到。
- 并發(fā)的請(qǐng)求痪寻,rxjava 通過(guò)操作符能夠完成各種并發(fā)情況螺句,而AyncTask不行。