前言
首先,感謝以下作者分享自己對(duì)于RxJava的理解
給 Android 開(kāi)發(fā)者的 RxJava 詳解
針對(duì)RxJava1.×起便,入門必備
RxJava2 只看這一篇文章就夠了 - 掘金
RxJava2的API大全蒜埋,適合需要使用對(duì)應(yīng)APID時(shí)作為資料查看
本文設(shè)計(jì)源碼以及介紹均是基于RxJava2鹰服,示例均是為了示例而示例齿风,項(xiàng)目中要使用RxJava,首先在gradle中添加依賴:
implementation 'io.reactivex.rxjava2:rxjava:2.1.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
一您朽、RxJava2簡(jiǎn)介
什么是RxJava
A library for composing asynchronous and event-based programs by using observable sequences.
一個(gè)在 Java VM 上使用可觀測(cè)的序列來(lái)組成異步的狂丝、基于事件的程序的庫(kù)
引用RxJava Github Readme中的介紹,可能看起來(lái)仍然讓人感到費(fèi)解哗总,我的理解是:
使用觀察者模式几颜,采用鏈?zhǔn)骄幊蹋谑录膶?shí)現(xiàn)異步的庫(kù)
到這里讯屈,沒(méi)有使用過(guò)RxJava的話蛋哭,一定還是不知道這個(gè)庫(kù)究竟是做什么的,沒(méi)關(guān)系涮母,我們慢慢道來(lái)谆趾。首先我們需要知道RxJava究竟解決了什么問(wèn)題。
RxJava解決了什么問(wèn)題
即使沒(méi)有使用過(guò)叛本,一定也聽(tīng)說(shuō)過(guò)RxJava沪蓬,RxJava應(yīng)用如此廣泛,那么它究竟解決了什么問(wèn)題呢来候?一個(gè)詞:異步跷叉,但是同樣是異步,可以使用的工具有很多营搅,我們?yōu)槭裁雌褂肦xJava呢云挟?
為什么使用RxJava
我們知道,Android中實(shí)現(xiàn)異步的方式有很多转质,Handler园欣、AsyncTask、runOnUiThread等休蟹,為什么我們就要使用RxJava來(lái)實(shí)現(xiàn)異步呢沸枯?換言之日矫,RxJava相比其他實(shí)現(xiàn)異步的方法有什么優(yōu)勢(shì)呢?最大的優(yōu)勢(shì)就是邏輯清晰辉饱。
假設(shè)有這樣一個(gè)需求:界面上有一個(gè)自定義的視圖 imageCollectorView 搬男,它的作用是顯示多張圖片拣展,并能使用 add(Bitmap) 方法來(lái)任意增加顯示的圖片∨碚樱現(xiàn)在需要程序?qū)⒁粋€(gè)給出的目錄數(shù)組 File[] folders 中每個(gè)目錄下的 png 圖片都加載出來(lái)并顯示在 imageCollectorView 中。需要注意的是备埃,由于讀取圖片的這一過(guò)程較為耗時(shí)姓惑,需要放在后臺(tái)執(zhí)行,而圖片的顯示則必須在 UI 線程執(zhí)行按脚。常用的實(shí)現(xiàn)方式有多種于毙,我這里貼出其中一種:
new Thread(() -> {
for (File folder : folders) {
File[] files = folder.listFiles();
for (File file : files) {
if (file.getName().endsWith(".png")) {
final Bitmap bitmap = getBitmapFromFile(file);
runOnUiThread(() -> imageCollectorView.add(bitmap));
}
}
}
}).start();
而如果使用RxJava,實(shí)現(xiàn)方式是這樣的:
Observable.fromArray(folders)
.flatMap((Function<File, ObservableSource<File>>) folder -> Observable.fromArray(folder.listFiles()))
.filter(file -> file.getName().endsWith("png"))
.map((Function<File, Bitmap>) file -> getBitmapFromFile(file))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(bitmap -> imageCollectorView.add(bitmap));
一定有人會(huì)覺(jué)得辅搬,這完全沒(méi)有簡(jiǎn)潔呀唯沮,哪里看出來(lái)簡(jiǎn)潔了,上面所說(shuō)的簡(jiǎn)潔只是邏輯簡(jiǎn)潔堪遂,而非代碼簡(jiǎn)潔溶褪。
觀察一下,你就會(huì)發(fā)現(xiàn)猿妈,使用RxJava吹菱,是一條鏈?zhǔn)秸{(diào)用下來(lái)的鳍刷,完全沒(méi)有任何嵌套,這種優(yōu)勢(shì)在需求變得復(fù)雜之后俯抖,優(yōu)勢(shì)會(huì)更加明顯输瓜,比如我們現(xiàn)在增加一條需求前痘,只需要加載前十張圖片担忧,不使用RxJava要怎么去做呢瓶盛?相信大家都會(huì)做示罗,不妨實(shí)現(xiàn)去看看蚜点,而如果是使用RxJava绍绘,修改后的實(shí)現(xiàn)如下:
Observable.fromArray(folders)
.flatMap((Function<File, ObservableSource<File>>) folder -> Observable.fromArray(folder.listFiles()))
.filter(file -> file.getName().endsWith("png"))
.map((Function<File, Bitmap>) file -> getBitmapFromFile(file))
.take(LOAD_IMAGE_NUM)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(bitmap -> imageCollectorView.add(bitmap));
可以看到陪拘,僅僅只增加了一行代碼纤壁,且未新增任何嵌套酌媒。
所以RxJava最大的優(yōu)勢(shì)就是簡(jiǎn)潔,無(wú)論邏輯如何復(fù)雜喇辽,總是一條鏈?zhǔn)秸{(diào)用的簡(jiǎn)潔拭荤。
二舅世、基本使用
在分析RxJava的原理之前雏亚,我們首先要了解一下觀察者模式罢低,如果已經(jīng)熟知觀察者模式,可以跳過(guò)這段宜岛。
觀察者模式
先讓我們想象一下生活中一個(gè)場(chǎng)景:
讀者(觀察者)發(fā)現(xiàn)一個(gè)微信公眾號(hào)(被觀察者)很不錯(cuò)萍倡,希望長(zhǎng)期關(guān)注列敲,于是便關(guān)注(注冊(cè))了這個(gè)微信公眾號(hào)戴而,當(dāng)這個(gè)微信公眾號(hào)發(fā)布了一篇新的文章所意,關(guān)注了該微信公眾號(hào)的讀者就會(huì)接收到通知(被觀察者發(fā)送通知給注冊(cè)了的觀察者)扁眯,如果讀者不再關(guān)注(注銷)該微信公眾號(hào)翅帜,就再不會(huì)接收到通知了
以上就是一個(gè)典型的觀察者模式的流程涝滴,生活中像這樣的場(chǎng)景還有很多歼疮,那么什么是觀察者模式呢韩脏?
概念
引用《Head First設(shè)計(jì)模式》一書中的原話
觀察者模式定義了對(duì)象之間一對(duì)多的依賴赡矢,這樣一來(lái)吹散,當(dāng)該對(duì)象改變狀態(tài)時(shí),它的所有依賴者會(huì)受到通知并自動(dòng)更新
——Head First設(shè)計(jì)模式
以下是觀察者模式UML圖
我們可以看到,觀察者模式首先定義了兩個(gè)接口類:Observable(被觀察者)画饥、Observer(觀察者)抖甘,
Observable接口中包含三個(gè)方法
- registerObserver()注冊(cè)
- removeObserver()注銷
- notifyObservers()發(fā)送通知
Observer接口中包含一個(gè)方法
- update()觀察者接收到通知后的行為
以上兩個(gè)接口单山,很簡(jiǎn)單,在這里我就不貼代碼了昼接,那么實(shí)現(xiàn)類要怎么做呢慢睡?
以下是實(shí)現(xiàn)類的代碼
Observable實(shí)現(xiàn)類:
public class ConcreteObservable implements Observable {
private static final String TAG = ConcreteObservable.class.getSimpleName();
List<Observer> observers;
public ConcreteObservable(){
observers = new ArrayList<>();
}
@Override
public void register(Observer observer) {
if (observer == null) {
throw new NullPointerException("observer cannot be null");
}
if (observers.contains(observer)) {
Log.d("TAG", "you have registered this");
} else {
Log.d("TAG", observer.toString() + " register");
observers.add(observer);
}
}
@Override
public void removeObserver(Observer observer) {
if (observer == null) {
throw new NullPointerException("observer cannot be null");
}
if (observers.contains(observer)) {
Log.d("TAG", observer.toString() + " remove");
observers.remove(observer);
} else {
Log.d("TAG", "you have not registered this");
}
}
@Override
public void notifyObservers() {
for (Observer observer : observers) {
observer.update();
}
}
Observer實(shí)現(xiàn)類:
public class TomObserver implements Observer {
private static final String TAG = TomObserver.class.getSimpleName();
@Override
public void update() {
Log.d("TAG", "notify Tom");
}
@Override
public String toString() {
return "Tom";
}
}
public class KittyObserver implements Observer {
@Override
public void update() {
Log.d("TAG", "notify Kitty");
}
@Override
public String toString() {
return "Kitty";
}
}
使用:
public class MainActivity extends AppCompatActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Observer tomObserver = new TomObserver(), kittyObserver = new KittyObserver();
Observable observable = new ConcreteObservable();
observable.register(tomObserver);
observable.register(kittyObserver);
observable.notifyObservers();
observable.removeObserver(tomObserver);
observable.notifyObservers();
}
}
打印日志
D/TAG: Tom register
D/TAG: Kitty register
D/TAG: notify Tom
D/TAG: notify Kitty
D/TAG: Tom remove
D/TAG: notify Kitty
觀察者模式在RxJava中的應(yīng)用
RxJava中有四個(gè)概念棕硫,Observable(被觀察者)、Observer(觀察者)纬纪、subscribe(訂閱)包各、事件问畅,
Observer和Observable通過(guò)subscribe方法實(shí)現(xiàn)訂閱护姆,從而Observable發(fā)送事件時(shí)签则,Observer可以收到相關(guān)通知渐裂。
與標(biāo)準(zhǔn)觀察者模式不同的是钠惩,RxJava除了onNext(對(duì)應(yīng)標(biāo)準(zhǔn)觀察者模式的notifyObservers())事件以外篓跛,還有onError()和onComplete()愧沟,當(dāng)Observable發(fā)送完一系列onNext()事件后,必須調(diào)用onComplete()標(biāo)志著事件序列的結(jié)束盖奈,中間如果出現(xiàn)錯(cuò)誤钢坦,便會(huì)走到onError()事件爹凹,這兩個(gè)事件均只會(huì)被調(diào)用一次镶殷,且是相互排斥的批钠,即走到了onComplete()就不會(huì)走到onError()埋心,反之亦然拷呆。
基本實(shí)現(xiàn)
Observable(被觀察者)
Observable<String> observable = Observable.create(emitter -> {
emitter.onNext("Hello, observer");
emitter.onComplete();
});
可以看到通過(guò)Observable.create()方法創(chuàng)建了Observable對(duì)象茬斧,方法中傳入了ObservableOnSubscribe<T>對(duì)象项秉,這個(gè)類中使用了泛型娄蔼,這里的泛型就對(duì)應(yīng)著onNext()事件傳入的參數(shù)類型岁诉,以上例子這里的數(shù)據(jù)類型是String涕癣,在傳入的ObservableOnSubscribe<T>對(duì)象中坠韩,重寫了subscribe方法只搁,上述例子發(fā)送了一個(gè)onNext()事件须蜗,又發(fā)送了一個(gè)onComplete()事件明肮,告訴Observer事件序列結(jié)束,不會(huì)再有新的onNext()事件循未。
當(dāng)然的妖,Observable對(duì)象并不止這一種創(chuàng)建方式嫂粟,RxJava提供了很多相關(guān)的API星虹,例如fromArray()宽涌、just()等等卸亮,暫且按下不表兼贸。
Observer(觀察者)
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("TAG", "onSubscribe");
}
@Override
public void onNext(String o) {
Log.d("TAG", o);
}
@Override
public void onError(Throwable e) {
Log.d("TAG", "onError");
}
@Override
public void onComplete() {
Log.d("TAG", "onComplete");
}
};
創(chuàng)建Observer對(duì)象需要重寫四個(gè)方法,可以看到其中三個(gè)重寫的方法剛好對(duì)應(yīng)三個(gè)事件很澄,可以猜測(cè)甩苛,Observable發(fā)送的事件應(yīng)該是交由對(duì)應(yīng)的方法處理,但是多出來(lái)的onSubscribe()方法是做什么的呢?onSubscribe()方法是在Observable發(fā)送事件前被調(diào)用的赁酝,稍后會(huì)說(shuō)到這點(diǎn)酌呆。
當(dāng)然搔耕,RxJava中的觀察者對(duì)象并不止Observer這一個(gè)類弃榨,RxJava還提供了一些其他的類供實(shí)現(xiàn)鲸睛,例如Consumer等腊凶。
subscribe(訂閱)
observable.subscribe(observer);
這一步就非常簡(jiǎn)單钧萍,但是會(huì)發(fā)現(xiàn)這里有個(gè)奇怪的地方风瘦,正常流程應(yīng)該是觀察者訂閱被觀察者万搔,但是這里卻是被觀察者訂閱觀察者,顯然酗捌,observer.subscribe(observable)這樣的寫法更加容易讓讀者接受
過(guò)如果把 API 設(shè)計(jì)成 observer.subscribe(observable) / subscriber.subscribe(observable) 胖缤,雖然更加符合思維邏輯哪廓,但對(duì)流式 API 的設(shè)計(jì)就造成影響了涡真,比較起來(lái)明顯是得不償失的哆料。
以上引用扔物線的一段話
RxJava中線程控制
有一些讀者一定好奇剧劝,說(shuō)了半天讥此,好像也沒(méi)有看到實(shí)現(xiàn)了異步呀萄喳,別著急他巨,這就介紹RxJava是如何在這個(gè)鏈?zhǔn)骄幊讨袑?shí)現(xiàn)異步的染突。
我們看一下下面的示例代碼
Observable
.create((ObservableOnSubscribe<String>) emitter -> {
//模擬網(wǎng)絡(luò)訪問(wèn)流程
Log.d("TAG", "subscribe " + Thread.currentThread().getName());
Thread.sleep(200);
emitter.onNext("json ");
emitter.onComplete();
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("TAG", "onSubscribe " + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
Log.d("TAG", s + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.d("TAG", "onError " + Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.d("TAG", "onComplete " + Thread.currentThread().getName());
}
});
以下是日志
D/TAG: onSubscribe main
D/TAG: subscribe RxCachedThreadScheduler-1
D/TAG: json main
D/TAG: onComplete main
我們發(fā)現(xiàn)也榄,Observable發(fā)送事件的方法是運(yùn)行在子線程的甜紫,而Observer處理事件的方法均是運(yùn)行在主線程(UI線程)囚霸,而我們僅僅只是增加了兩行代碼拓型,就實(shí)現(xiàn)了異步編程岩睁,是不是很方便呢捕儒?但是有一個(gè)例外刘莹,onSubscribe()方法根據(jù)日志打印点弯,是運(yùn)行在主線程的抢肛,那么它真的就是和其他的事件一樣捡絮,由于增加了兩行代碼福稳,運(yùn)行在了主線程嗎的圆?我們讓這個(gè)任務(wù)在子線程運(yùn)行越妈,再看看打印的日志
D/TAG: onSubscribe Thread-4
D/TAG: subscribe RxCachedThreadScheduler-1
D/TAG: json main
D/TAG: onComplete main
由此我們知道,onSubscribe()既不是運(yùn)行在發(fā)送事件的線程,也不是運(yùn)行在處理事件的線程娱节,而是運(yùn)行在當(dāng)前線程肄满,具體這是為什么呢稠歉,我們后面會(huì)介紹〈ィ現(xiàn)在我們先來(lái)看一下勺疼,增加的兩行代碼究竟是什么意思呢执庐?
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
subscribeOn(Scheduler scheduler)方法定義了發(fā)送事件運(yùn)行的線程轨淌,而observeOn(Scheduler scheduler)方法則是指定處理事件運(yùn)行的線程递鹉,RxJava中也指定了一些可以直接使用的Scheduler
-
AndroidSchedulers.mainThread()
Android主線程,即UI線程 -
Schedulers.newThread()
創(chuàng)建一個(gè)新的線程 -
Schedulers.io()
用于 IO 密集型任務(wù)窜觉,如果異步阻塞 IO 操作禀挫。 -
Schedulers.computation()
用于使用計(jì)算任務(wù)语婴,如事件循環(huán)和回調(diào)處理 -
Schedulers.trampoline()
當(dāng)前線程 -
Schedulers.single()
單線程砰左,如果線程中有正在處理的任務(wù),新來(lái)的任務(wù)會(huì)進(jìn)入等待隊(duì)列 -
Schedulers.from(Executor executor)
指定一個(gè)線程池
常用Api
創(chuàng)建操作符
1. create()
有什么用
創(chuàng)建一個(gè)Observable
怎么用
Observable
.create((ObservableOnSubscribe<String>) emitter -> {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
})
.subscribe((s -> Log.d(TAG, "onNext: " + s)));
日志打印
D/MainActivity: onNext: 1
D/MainActivity: onNext: 2
D/MainActivity: onNext: 3
2. just()
有什么用
傳入不超過(guò)十個(gè)對(duì)象僻造,依次發(fā)送
怎么用
Observable.just(1, 3, 5)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
日志打印
D/MainActivity: onNext: 1
D/MainActivity: onNext: 3
D/MainActivity: onNext: 5
D/MainActivity: onComplete
3. fromArray()
有什么用
與just()類似竹挡,只不過(guò)fromArray()可以傳入超過(guò)10個(gè)item立膛,也可以傳入數(shù)組揪罕、集合等
怎么用
Integer[] array = {1, 3, 5};
Observable.fromArray(array)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
日志打印
D/MainActivity: onNext: 1
D/MainActivity: onNext: 3
D/MainActivity: onNext: 5
D/MainActivity: onComplete
注意:這里的數(shù)組必須是對(duì)象數(shù)組,而不能是基本類型數(shù)組
4. fromCallable()
有什么用
這里的 Callable 是 java.util.concurrent 中的 Callable旧巾,Callable 和 Runnable 的用法基本一致耸序,只是它會(huì)返回一個(gè)結(jié)果值,這個(gè)結(jié)果值就是發(fā)給觀察者的鲁猩。
怎么用
Observable.fromCallable(() -> 1)
.subscribe(integer -> Log.d(TAG, "accept" + integer));
日志打印
D/MainActivity: accept: 1
5. timer()
有什么用
指定時(shí)間倒計(jì)時(shí)結(jié)束后,會(huì)發(fā)送一個(gè)0L的值給觀察者
怎么用
Observable.timer(5, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: " + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
日志打印
2019-05-20 21:59:42.912 5248-5248/com.danc.calendartext D/MainActivity: onSubscribe
2019-05-20 21:59:47.918 5248-5302/com.danc.calendartext D/MainActivity: onNext: 0
可以看到廓握,onSubscribe之后隔了5s中搅窿,接收到onNext事件
6. interval()
有什么用
每隔指定時(shí)間發(fā)送一個(gè)事件,這個(gè)事件包含一個(gè)數(shù)字隙券,從0開(kāi)始男应,每次自增1
怎么用
Observable.interval(0, 1, TimeUnit.SECONDS)
.subscribe(aLong -> {
Log.d(TAG, "計(jì)時(shí)(s): " + aLong);
});
日志打印
2019-05-20 22:03:29.786 5375-5420/com.danc.calendartext D/MainActivity: 計(jì)時(shí)(s): 0
2019-05-20 22:03:30.786 5375-5420/com.danc.calendartext D/MainActivity: 計(jì)時(shí)(s): 1
2019-05-20 22:03:31.786 5375-5420/com.danc.calendartext D/MainActivity: 計(jì)時(shí)(s): 2
2019-05-20 22:03:32.787 5375-5420/com.danc.calendartext D/MainActivity: 計(jì)時(shí)(s): 3
2019-05-20 22:03:33.786 5375-5420/com.danc.calendartext D/MainActivity: 計(jì)時(shí)(s): 4
2019-05-20 22:03:34.786 5375-5420/com.danc.calendartext D/MainActivity: 計(jì)時(shí)(s): 5
轉(zhuǎn)化操作符
1. map()
有什么用
將被觀察者發(fā)送的數(shù)據(jù)類型轉(zhuǎn)換為其他類型
怎么用
以下實(shí)例將Integer類型轉(zhuǎn)換為String
Observable.just(1, 3, 5)
.map(integer -> "--" + integer)
.subscribe(s -> Log.d(TAG, "accept: " + s));
日志打印
2019-05-20 22:05:58.079 5494-5494/com.danc.calendartext D/MainActivity: accept: --1
2019-05-20 22:05:58.079 5494-5494/com.danc.calendartext D/MainActivity: accept: --3
2019-05-20 22:05:58.079 5494-5494/com.danc.calendartext D/MainActivity: accept: --5
2. flatMap()
有什么用
這個(gè)方法可以將事件序列中的元素進(jìn)行整合加工,返回一個(gè)新的被觀察者娱仔。
怎么用
flatMap() 其實(shí)與 map() 類似沐飘,但是 flatMap() 返回的是一個(gè) Observerable。現(xiàn)在用一個(gè)例子來(lái)說(shuō)明 flatMap() 的用法牲迫。
假設(shè)一個(gè)有一個(gè) Person 類耐朴,這個(gè)類的定義如下:
public class Person {
private String name;
private List<Plan> planList = new ArrayList<>();
public Person(String name, List<Plan> planList) {
this.name = name;
this.planList = planList;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public List<Plan> getPlanList() {
return planList;
}
public void setPlanList(List<Plan> planList) {
this.planList = planList;
}
}
Person 類有一個(gè) name 和 planList 兩個(gè)變量,分別代表的是人名和計(jì)劃清單盹憎。
Plan 類的定義如下:
public class Plan {
private String time;
private String content;
private List<String> actionList = new ArrayList<>();
public Plan(String time, String content) {
this.time = time;
this.content = content;
}
public String getTime() {
return time;
}
public void setTime(String time) {
this.time = time;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public List<String> getActionList() {
return actionList;
}
public void setActionList(List<String> actionList) {
this.actionList = actionList;
}
}
現(xiàn)在有一個(gè)需求就是要將 Person 集合中的每個(gè)元素中的 Plan 的 action 打印出來(lái)筛峭。
首先用 map() 來(lái)實(shí)現(xiàn)這個(gè)需求看看:
Observable.fromIterable(personList)
.map(new Function < Person, List < Plan >> () {
@Override
public List < Plan > apply(Person person) throws Exception {
return person.getPlanList();
}
})
.subscribe(new Observer < List < Plan >> () {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List < Plan > plans) {
for (Plan plan: plans) {
List < String > planActionList = plan.getActionList();
for (String action: planActionList) {
Log.d(TAG, "==================action " + action);
}
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
可以看到 onNext() 用了嵌套 for 循環(huán)來(lái)實(shí)現(xiàn),如果代碼邏輯復(fù)雜起來(lái)的話陪每,可能需要多重循環(huán)才可以實(shí)現(xiàn)影晓。
現(xiàn)在看下使用 flatMap() 實(shí)現(xiàn):
Observable.fromIterable(personList)
.flatMap(new Function < Person, ObservableSource < Plan >> () {
@Override
public ObservableSource < Plan > apply(Person person) {
return Observable.fromIterable(person.getPlanList());
}
})
.flatMap(new Function < Plan, ObservableSource < String >> () {
@Override
public ObservableSource < String > apply(Plan plan) throws Exception {
return Observable.fromIterable(plan.getActionList());
}
})
.subscribe(new Observer < String > () {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG, "==================action: " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
從代碼可以看出,只需要兩個(gè) flatMap() 就可以完成需求檩禾,并且代碼邏輯非常清晰挂签。
組合操作符
1. zip()
有什么用
將兩個(gè)被觀察者事件整合發(fā)送給觀察者,事件數(shù)量與兩個(gè)觀察這種事件數(shù)量較小的相同
怎么用
Observable.zip(Observable.intervalRange(1, 5, 1, 2, TimeUnit.SECONDS)
.map(aLong -> {
String s1 = "A" + aLong;
Log.d(TAG, "===================A 發(fā)送的事件 " + s1);
return s1;
}).subscribeOn(Schedulers.io()),
Observable.intervalRange(1, 6, 1, 1, TimeUnit.SECONDS)
.map(aLong -> {
String s2 = "B" + aLong;
Log.d(TAG, "===================B 發(fā)送的事件 " + s2);
return s2;
}).subscribeOn(Schedulers.io()),
(s, s2) -> {
String res = s + s2;
return res;
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "===================onSubscribe ");
}
@Override
public void onNext(String s) {
Log.d(TAG, "===================onNext " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "===================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "===================onComplete ");
}
});
日志打优尾:
2019-05-20 22:19:10.293 6183-6183/com.danc.calendartext D/MainActivity: ===================onSubscribe
2019-05-20 22:19:11.308 6183-6210/com.danc.calendartext D/MainActivity: ===================A 發(fā)送的事件 A1
2019-05-20 22:19:11.308 6183-6209/com.danc.calendartext D/MainActivity: ===================B 發(fā)送的事件 B1
2019-05-20 22:19:11.308 6183-6209/com.danc.calendartext D/MainActivity: ===================onNext A1B1
2019-05-20 22:19:12.307 6183-6209/com.danc.calendartext D/MainActivity: ===================B 發(fā)送的事件 B2
2019-05-20 22:19:13.307 6183-6210/com.danc.calendartext D/MainActivity: ===================A 發(fā)送的事件 A2
2019-05-20 22:19:13.307 6183-6210/com.danc.calendartext D/MainActivity: ===================onNext A2B2
2019-05-20 22:19:13.307 6183-6209/com.danc.calendartext D/MainActivity: ===================B 發(fā)送的事件 B3
2019-05-20 22:19:14.308 6183-6209/com.danc.calendartext D/MainActivity: ===================B 發(fā)送的事件 B4
2019-05-20 22:19:15.306 6183-6210/com.danc.calendartext D/MainActivity: ===================A 發(fā)送的事件 A3
2019-05-20 22:19:15.307 6183-6210/com.danc.calendartext D/MainActivity: ===================onNext A3B3
2019-05-20 22:19:15.307 6183-6209/com.danc.calendartext D/MainActivity: ===================B 發(fā)送的事件 B5
2019-05-20 22:19:16.307 6183-6209/com.danc.calendartext D/MainActivity: ===================B 發(fā)送的事件 B6
2019-05-20 22:19:17.307 6183-6210/com.danc.calendartext D/MainActivity: ===================A 發(fā)送的事件 A4
2019-05-20 22:19:17.308 6183-6210/com.danc.calendartext D/MainActivity: ===================onNext A4B4
2019-05-20 22:19:19.307 6183-6210/com.danc.calendartext D/MainActivity: ===================A 發(fā)送的事件 A5
2019-05-20 22:19:19.307 6183-6210/com.danc.calendartext D/MainActivity: ===================onNext A5B5
2019-05-20 22:19:19.307 6183-6210/com.danc.calendartext D/MainActivity: ===================onComplete
功能操作符
1. delay()
有什么用
訂閱后竹握,延遲一段時(shí)間發(fā)送事件
怎么用
Observable.just(1, 3, 5)
.delay(2, TimeUnit.SECONDS)
.subscribe(integer -> Log.d(TAG, "accept: " + integer));
打印日志
2019-05-20 22:24:53.879 6430-6430/com.danc.calendartext D/MainActivity: onSubscribe
2019-05-20 22:24:55.881 6430-6475/com.danc.calendartext D/MainActivity: accept: 1
2019-05-20 22:24:55.881 6430-6475/com.danc.calendartext D/MainActivity: accept: 3
2019-05-20 22:24:55.882 6430-6475/com.danc.calendartext D/MainActivity: accept: 5
從日志可以暗處,訂閱之后延遲2s開(kāi)始發(fā)送事件
2. doOnNext()
有什么用
每次收到onNext()事件辆飘,先執(zhí)行doOnNext()
怎么用
Observable.create((ObservableOnSubscribe<Integer>) e -> {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
})
.doOnNext(integer -> Log.d(TAG, "doOnNext " + integer))
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete ");
}
});
日志打印
D/MainActivity: onSubscribe
D/MainActivity: doOnNext 1
D/MainActivity: onNext 1
D/MainActivity: doOnNext 2
D/MainActivity: onNext 2
D/MainActivity: doOnNext 3
D/MainActivity: onNext 3
D/MainActivity: onComplete
想要查看更多的RxJava2的API,可以看下面這篇文章。這篇文章更像是查閱的資料驹闰,只需要通覽了解一遍即可晨雳,熟悉常用的API,其余有個(gè)印象紧卒,需要的時(shí)候再查閱
RxJava2 只看這一篇文章就夠了 - 掘金
生命周期管理
RxJava固然很好用侥衬,但是使用不當(dāng),很容易造成內(nèi)存泄漏跑芳。
比如轴总,使用RxJava發(fā)布一個(gè)訂閱后,當(dāng)Activity被finish博个,此時(shí)訂閱邏輯還未完成怀樟,如果沒(méi)有及時(shí)取消訂閱,就會(huì)導(dǎo)致Activity無(wú)法被回收盆佣,從而引發(fā)內(nèi)存泄漏往堡。
目前,針對(duì)以上這種情況共耍,主流的有兩種解決方案:
- 通過(guò)封裝虑灰,手動(dòng)控制每一次訂閱,在合適的時(shí)機(jī)取消訂閱
- 使用RxLifecycle痹兜,關(guān)注Fragment穆咐、Activity的生命周期,在onDestroy()生命周期中自動(dòng)取消訂閱
手動(dòng)實(shí)現(xiàn)
以下是基于MVP架構(gòu)封裝的
protected CompositeDisposable compositeDisposable;
@UiThread
public void attachView(V view) {
mView = view;
compositeDisposable = new CompositeDisposable();
}
@UiThread
public void detachView() {
mView = null;
if (compositeDisposable != null) {
compositeDisposable.clear();
compositeDisposable = null;
}
}
如何使用
在每次發(fā)生訂閱關(guān)系時(shí)字旭,將返回的Disposable實(shí)例保存在CompositeDisposable中对湃,在onDestroy()生命周期解除所有的訂閱關(guān)系
RxLifecycle
1. 添加依賴
在項(xiàng)目中添加以下依賴
implementation 'com.trello.rxlifecycle2:rxlifecycle:2.2.1'
implementation 'com.trello.rxlifecycle2:rxlifecycle-android:2.2.1'
implementation 'com.trello.rxlifecycle2:rxlifecycle-components:2.2.1'
2. 綁定生命周期
RxLifecycle提供了大量的Activity、Fragment以供繼承
代碼如下:
public class MainActivity extends RxAppCompatActivity {
...
...
}
使用bindToLifecycle()
以Activity為例熟尉,在Activity中使用bindToLifecycle()方法,完成Observable發(fā)布的事件和當(dāng)前的組件綁定洲脂,實(shí)現(xiàn)生命周期同步斤儿。從而實(shí)現(xiàn)當(dāng)前組件生命周期結(jié)束時(shí),自動(dòng)取消對(duì)Observable訂閱恐锦,代碼如下:
public class MainActivity extends RxAppCompatActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
// 當(dāng)執(zhí)行onDestory()時(shí)往果, 自動(dòng)解除訂閱
Observable.interval(1, TimeUnit.SECONDS)
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
Log.i(TAG, "Unsubscribing subscription from onCreate()");
}
})
.compose(this.<Long>bindToLifecycle())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long num) throws Exception {
Log.i(TAG, "Started in onCreate(), running until onDestory(): " + num);
}
});
}
}
下一篇文章將針對(duì)訂閱流程以及線程管理的源碼進(jìn)行分析