叫這個(gè)題目也是因?yàn)檫@篇博客寫了太久太久了!有段時(shí)間都覺(jué)得完全沒(méi)有必要寫下去的挥吵,索性終于完工了炫惩,也算是對(duì)這段時(shí)間的肯定吧!
RxJava基本概念
RxJava
有四個(gè)基本概念:Observable
(可觀察者扁瓢,即被觀察者)、 Observer
(觀察者)补君、 subscribe
(訂閱)引几、事件。Observable
和 Observer
通過(guò) subscribe()
方法實(shí)現(xiàn)訂閱關(guān)系挽铁,從而 Observable
可以在需要的時(shí)候發(fā)出事件來(lái)通知 Observer
伟桅。
接下來(lái)就圍繞Observable創(chuàng)建、Observer創(chuàng)建叽掘、線程切換楣铁、事件類型轉(zhuǎn)換、訂閱和取消訂閱展開更扁。
Observerble的基本創(chuàng)建方式:
1盖腕、create()//最基本的
Observable<String> observable1 = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("xxixxii");
subscriber.onCompleted();//這里必須調(diào)用該方法或者onError(),通知訂閱者發(fā)送完畢,否者無(wú)法進(jìn)行解除訂閱浓镜。
}
});
2溃列、form()//適配集合等
ArrayList<Student> students = new ArrayList<>();
students.add(s1);
students.add(s2);
students.add(s3);
students.add(s1);
students.add(s4);
students.add(s5);
students.add(s6);
Observable.from(students)
3、just()//適配已經(jīng)寫好的方法
Student s1 = new Student(19, "xiaoqiang");
Student s2 = new Student(19, "xiaoqiang1");
Student s3 = new Student(20, "xiaoqiang1");
Student s4 = new Student(19, "xiaoqiang");
Student s5 = new Student(21, "xiaoqiang");
Student s6 = new Student(22, "xiaoqiang");
Observable.just(s1, s2, s3, s4, s5, s6)
4膛薛、merge(o1,02)//將多個(gè)合并為一個(gè)
5听隐、concat(o1,o2)//one by one emit!
Observer Subscriber的創(chuàng)建
Subscriber<String> stringSubscriber = new Subscriber<String>() {
@Override
public void onStart() {
Log.e(TAG, "onStart: ");
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: ");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: ");
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext: " + s);
}
};
Observer<String> stringObserver = new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
}
};
這里需要注意:Observer
和Subscriber
不僅基本使用方式一樣,實(shí)質(zhì)上哄啄,在 RxJava
的 subscribe
過(guò)程中雅任,Observer
也總是會(huì)先被轉(zhuǎn)換成一個(gè) Subscriber
再使用风范。所以如果你只想使用基本功能,選擇 Observer
和 Subscriber
是完全一樣的沪么。它們的區(qū)別對(duì)于使用者來(lái)說(shuō)主要有兩點(diǎn):
onStart()
: 這是Subscriber
增加的方法乌企。它會(huì)在subscribe
剛開始,而事件還未發(fā)送之前被調(diào)用成玫,可以用于做一些準(zhǔn)備工作加酵,例如數(shù)據(jù)的清零或重置。這是一個(gè)可選方法哭当,默認(rèn)情況下它的實(shí)現(xiàn)為空猪腕。需要注意的是,如果對(duì)準(zhǔn)備工作的線程有要求(例如彈出一個(gè)顯示進(jìn)度的對(duì)話框钦勘,這必須在主線程執(zhí)行)陋葡,onStart()
就不適用了,因?yàn)樗偸窃?subscribe
所發(fā)生的線程被調(diào)用彻采,而不能指定線程腐缤。要在指定的線程來(lái)做準(zhǔn)備工作,可以使用doOnSubscribe()
方法肛响,具體可以在后面的文中看到岭粤。
unsubscribe()
: 這是Subscriber
所實(shí)現(xiàn)的另一個(gè)接口Subscription
的方法,用于取消訂閱特笋。在這個(gè)方法被調(diào)用后剃浇,Subscriber
將不再接收事件。一般在這個(gè)方法調(diào)用前猎物,可以使用isUnsubscribed()
先判斷一下狀態(tài)虎囚。unsubscribe()
這個(gè)方法很重要,因?yàn)樵?subscribe()
之后蔫磨,Observable
會(huì)持有Subscriber
的引用淘讥,這個(gè)引用如果不能及時(shí)被釋放,將有內(nèi)存泄露的風(fēng)險(xiǎn)堤如。所以最好保持一個(gè)原則:要在不再使用的時(shí)候盡快在合適的地方(例如onStop()
蒲列、onDestory()
等方法中)調(diào)用unsubscribe()
來(lái)解除引用關(guān)系,以避免內(nèi)存泄露的發(fā)生煤惩。
強(qiáng)大的條件篩選
說(shuō)了這么多沒(méi)用的東西嫉嘀,肯定要來(lái)點(diǎn)兒實(shí)際的才能體會(huì)到RxJava的強(qiáng)大功能!
1魄揉、take(?)
只發(fā)送指定數(shù)量的事件剪侮。
2、filter(?)
過(guò)濾指定條件的事件。
3瓣俯、first()
只發(fā)送第一個(gè)事件杰标。
4、distinct(?)
只發(fā)送不同的事件彩匕。(怎么定義為不同腔剂?!)
其實(shí)還有很多驼仪。掸犬。。
隨時(shí)隨地線程切換
說(shuō)完創(chuàng)建過(guò)濾你可能覺(jué)得這也沒(méi)撒嘛绪爸!那么接下來(lái)想想之前在Android開發(fā)里要切換線程需要怎么處理呢湾碎?view.post()
或者使用handler.sendMessage()
!而在RxJava中,線程切換不用這么搞了奠货,Schedulers
是RxJava中用來(lái)管理相關(guān)線程調(diào)度的介褥,基于訂閱和被訂閱,這里有兩個(gè)方法递惋!
1柔滔、subscribeOn()
事件產(chǎn)生在哪個(gè)線程。
2萍虽、observeOn()
事件消費(fèi)在哪個(gè)線程睛廊。
3、Schedulers.immediate()
: 直接在當(dāng)前線程運(yùn)行贩挣,相當(dāng)于不指定線程喉前。這是默認(rèn)的 Scheduler。
4王财、Schedulers.newThread()
: 總是啟用新線程,并在新線程執(zhí)行操作裕便。
5绒净、Schedulers.io()
: I/O 操作(讀寫文件、讀寫數(shù)據(jù)庫(kù)偿衰、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler
挂疆。其行為模式和 newThread()
是差不多滴,但是區(qū)別在于io()
的內(nèi)部實(shí)現(xiàn)是是用一個(gè)無(wú)數(shù)量上限的線程池下翎,可以重用空閑的線程缤言,因此多數(shù)情況下 io()
比newThread()
更有效率。不要把計(jì)算工作放在io()
中视事,可以避免創(chuàng)建不必要的線程胆萧。
6、Schedulers.computation()
: 計(jì)算所使用的 Scheduler。這個(gè)計(jì)算指的是CPU 密集型計(jì)算跌穗,即不會(huì)被 I/O 等操作限制性能的操作订晌,例如圖形的計(jì)算。這個(gè) Scheduler使用的固定的線程池蚌吸,大小為 CPU 核數(shù)锈拨。不要把 I/O 操作放在 computation()中,否則 I/O 操作的等待時(shí)間會(huì)浪費(fèi) CPU羹唠。
7奕枢、AndroidSchedulers.mainThread()
:Android 特供,它指定的操作將在 Android 主線程運(yùn)行佩微。
transform 轉(zhuǎn)換
強(qiáng)大的內(nèi)部轉(zhuǎn)換功能验辞,讓你可以做到要什么就是什么。
1喊衫、map()
:進(jìn)行對(duì)象轉(zhuǎn)換跌造,不會(huì)創(chuàng)建新的Observable
。
2族购、flatMap()
:也是進(jìn)行對(duì)象轉(zhuǎn)換壳贪,會(huì)創(chuàng)建新的Observable
。
3寝杖、buffer()
违施、:緩沖區(qū),緩沖指定的Observable
包裝成新的
4瑟幕、Observable
發(fā)射磕蒲。
5、toList()
:將單個(gè)的對(duì)象轉(zhuǎn)換為集合只盹。
取消訂閱
爽了之后重視要記住一件事辣往,那就是要釋放相關(guān)資源!不然后果也是很嚴(yán)重的殖卑,尤其是在使用RxView
相關(guān)的方法時(shí)會(huì)警告你需要調(diào)用Unsubscribe()
來(lái)釋放相關(guān)的引用站削。
釋放操作其實(shí)很簡(jiǎn)單。定義一個(gè)集合維護(hù)相關(guān)的Subscription
孵稽,然后在Activity
的onStop()
或者onDestroy()
方法中釋放相關(guān)資源许起。
Subscription clickSubscribe = RxView.clicks(findViewById(R.id.bt))
.throttleFirst(1, TimeUnit.SECONDS)
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
Log.e(TAG, "clicks->doOnUnsubscribe");
}
})
.subscribe(new Action1<Void>() {
@Override
public void call(Void aVoid) {
methd6();
}
});
//維護(hù)相關(guān)的資源引用
subscriptions.add(clickSubscribe);
@Override
protected void onDestroy() {
for (Subscription s : subscriptions) {
if (!s.isUnsubscribed()) {
s.unsubscribe();
Log.e(TAG, "onDestroy: 取消訂閱!");
}
}
super.onDestroy();
}
手動(dòng)create()
一個(gè)Observable
的話菩鲜,一定要調(diào)用 onComplete()
或者onError()
來(lái)結(jié)束這個(gè)事件园细,不然資源也不會(huì)被釋放的。
動(dòng)手時(shí)間
練習(xí)一
統(tǒng)計(jì)集合中年齡大于20的學(xué)生姓名(年齡姓名一致的視為同一個(gè)=有!)
首先當(dāng)然是創(chuàng)建Observable
猛频!
//初始化數(shù)據(jù)
Student s1 = new Student(19, "xiaoqiang0");
Student s2 = new Student(20, "xiaoqiang1");
Student s3 = new Student(21, "xiaoqiang2");
Student s4 = new Student(22, "xiaoqiang3");
Student s5 = new Student(23, "xiaoqiang4");
Student s6 = new Student(24, "xiaoqiang5");
Student s7 = new Student(25, "xiaoqiang6");
Student s8 = new Student(25, "xiaoqiang5");
students = new ArrayList<>();
students.add(s1);
students.add(s2);
students.add(s3);
students.add(s1);
students.add(s4);
students.add(s5);
students.add(s6);
students.add(s7);
students.add(s8);
Observable.just(students)//創(chuàng)建Observable
.flatMap(new Func1<ArrayList<Student>, Observable<Student>>() {
@Override
public Observable<Student> call(ArrayList<Student> students) {
//變換為新的Observable
return Observable.from(students);
}
})
//過(guò)濾掉年齡和姓名相同的對(duì)象
.distinct()
//過(guò)濾掉年齡小于20的對(duì)象
.filter(new Func1<Student, Boolean>() {
@Override
public Boolean call(Student student) {
return student.getAge() >= 20;
}
})
//將事件對(duì)象由Student 轉(zhuǎn)換為 String
.map(new Func1<Student, String>() {
@Override
public String call(Student student) {
return student.getName();
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
Log.e(TAG, "call: 取消訂閱了!!");
}
})
//訂閱
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext: " + s);
}
});
E/MainActivity: onNext: xiaoqiang1
E/MainActivity: onNext: xiaoqiang2
E/MainActivity: onNext: xiaoqiang3
E/MainActivity: onNext: xiaoqiang4
E/MainActivity: onNext: xiaoqiang5
E/MainActivity: onNext: xiaoqiang6
E/MainActivity: onNext: xiaoqiang5
E/MainActivity: call: 取消訂閱了B浊恰厉亏!
PS:至于這里對(duì)象的唯一性判斷是通過(guò)復(fù)寫equal()
和hashCode()
來(lái)實(shí)現(xiàn)的!
@Override
public boolean equals(Object o) {
return o instanceof Student && this.getAge() == ((Student) o).getAge() && this.getName().equals(((Student) o).getName());
}
@Override
public int hashCode() {
return Arrays.hashCode(new Object[]{getAge(), getName()});
}
練習(xí)二
concat()
使用:本地有緩存讀取本地的數(shù)據(jù)烈和,沒(méi)有走網(wǎng)絡(luò)請(qǐng)求并緩存到本地爱只。
Observable<String> netObservable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.e(TAG, "走網(wǎng)絡(luò)了!招刹!");
subscriber.onNext("這是緩存數(shù)據(jù)L袷浴!");
subscriber.onCompleted();
}
}).doOnNext(new Action1<String>() {
@Override
public void call(String s) {
Log.e(TAG, "call: 保存數(shù)據(jù)到本地");
rxPreferences.getString("cash").asAction().call(s);
}
});
Observable<String> nativeObservable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if (TextUtils.isEmpty(rxPreferences.getString("cash").get())) {
Log.e(TAG, "沒(méi)有緩存疯暑,走網(wǎng)絡(luò)训柴!");
subscriber.onCompleted();
} else {
Log.e(TAG, "有緩存!");
subscriber.onNext(rxPreferences.getString("cash").get());
subscriber.onCompleted();
}
}
});
Observable.concat(nativeObservable, netObservable)
.first()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.e(TAG, "完成了妇拯!");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "錯(cuò)誤了幻馁!");
}
@Override
public void onNext(String s) {
Log.e(TAG, s);
}
});
//第一次
E/MainActivity: 沒(méi)有緩存,走網(wǎng)絡(luò)越锈!
E/MainActivity: 走網(wǎng)絡(luò)了U锑隆!
E/MainActivity: call: 保存數(shù)據(jù)到本地
E/MainActivity: 這是緩存數(shù)據(jù)8势尽稀拐!
E/MainActivity: 完成了!
//第二次
E/MainActivity: 有緩存丹弱!
E/MainActivity: 這是緩存數(shù)據(jù)5虑恕!
E/MainActivity: 完成了躲胳!
練習(xí)三
merge()
使用:服務(wù)端和本地都有相關(guān)數(shù)據(jù)蜓洪,匯總展示。使用merge()對(duì)應(yīng)的事件順序是無(wú)序的泛鸟,誰(shuí)先產(chǎn)生了誰(shuí)就先發(fā)送蝠咆!
Observable<String> just = Observable.just("S", "O", "S")
.subscribeOn(Schedulers.newThread())
.doOnNext(new Action1<String>() {
@Override
public void call(String s) {
SystemClock.sleep(20);
}
});
Observable<String> just1 = Observable.just("S","T","R").subscribeOn(Schedulers.newThread())
.doOnNext(new Action1<String>() {
@Override
public void call(String s) {
SystemClock.sleep(20);
}
});
Observable.merge(just1, just)
.subscribeOn(Schedulers.newThread())
.distinct()
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e(TAG, "call: " + s);
}
});
E/MainActivity: call: S
E/MainActivity: call: T
E/MainActivity: call: R
E/MainActivity: call: O
//或者這樣
E/MainActivity: call: S
E/MainActivity: call: T
E/MainActivity: call: O
E/MainActivity: call: R
練習(xí)四
RxView和RxCompoundButton的使用:
RxCompoundButton.checked(checkBox).call(rxPreferences.getBoolean("checked").get());
//noinspection ConstantConditions
Subscription checkedSubscription1 = RxCompoundButton.checkedChanges(checkBox)
.subscribe(rxPreferences.getBoolean("checked").asAction());
subscriptions.add(checkedSubscription1);
Subscription checkedSubscription = rxPreferences.getBoolean("checked")
.asObservable()
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
Log.e(TAG, "rxPreferences->doOnUnsubscribe");
}
})
.subscribe(new Subscriber<Boolean>() {
@Override
public void onCompleted() {
Log.e(TAG, "rxPreferences->onNext: +onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Boolean aBoolean) {
Log.e(TAG, "rxPreferences->onNext: " + aBoolean);
}
});
subscriptions.add(checkedSubscription);
練習(xí)五
RxJava和Retrofit的搭配使用:
Subscription beforeSubscribe = BaseDataManager.getDailyApiService()
.getBeforeDailyStories(date)
.subscribeOn(Schedulers.io())//事件產(chǎn)生在子線程
.doOnSubscribe(new Action0() {//subscribe之后,事件發(fā)送前執(zhí)行北滥。
@Override
public void call() {
isLoadingMore = true;
Log.e("TAG", "call: true");
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<DailyStories>() {
@Override
public void call(DailyStories dailyStories) {
mView.onLoadMore(dailyStories);
mView.isLoadingMore(false);
Log.e("TAG", "call: false");
isLoadingMore = false;
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
mView.onLoadError(throwable.toString());
isLoadingMore = false;
}
});
subscribe(beforeSubscribe);
相關(guān)回調(diào)
對(duì)于Observable,這里有一系列的回調(diào)方法闸翅,作用在不同的時(shí)期再芋,其中常用的是doOnSubscribe()
和doOnNext()
。
另外在Subscriber里坚冀,還有一個(gè)onStart()
的方法济赎!
Observable.just("L", "O", "V", "E")
.doOnSubscribe(new Action0() {
@Override
public void call() {
Log.e(TAG, "call: doOnSubscribe");
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
Log.e(TAG, "call: doOnUnsubscribe");
}
})
.doOnEach(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.e(TAG, "doOnEach: onCompleted");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "doOnEach: onError");
}
@Override
public void onNext(String s) {
Log.e(TAG, "doOnEach: onNext:"+s);
}
})
.doOnNext(new Action1<String>() {
@Override
public void call(String s) {
Log.e(TAG, "call: doOnNext");
}
})
.doOnRequest(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.e(TAG, "call: doOnRequest");
}
})
.doOnTerminate(new Action0() {
@Override
public void call() {
Log.e(TAG, "call: doOnTerminate");
}
})
.doAfterTerminate(new Action0() {
@Override
public void call() {
Log.e(TAG, "call: doAfterTerminate");
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.e(TAG, "subscribe->call: onCompleted");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "subscribe->call: onError");
}
@Override
public void onNext(String s) {
Log.e(TAG, "subscribe->call: onNext:" + s);
}
});
E/MainActivity: subscribe->call: onStart:
E/MainActivity: call: doOnRequest
E/MainActivity: call: doOnSubscribe
E/MainActivity: doOnEach: onNext:L
E/MainActivity: call: doOnNext
E/MainActivity: subscribe->call: onNext:L
E/MainActivity: doOnEach: onNext:O
E/MainActivity: call: doOnNext
E/MainActivity: subscribe->call: onNext:O
E/MainActivity: doOnEach: onNext:V
E/MainActivity: call: doOnNext
E/MainActivity: subscribe->call: onNext:V
E/MainActivity: doOnEach: onNext:E
E/MainActivity: call: doOnNext
E/MainActivity: subscribe->call: onNext:E
E/MainActivity: doOnEach: onCompleted
E/MainActivity: call: doOnTerminate
E/MainActivity: subscribe->call: onCompleted
E/MainActivity: call: doOnUnsubscribe
E/MainActivity: call: doAfterTerminate
通過(guò)Log可以看到一個(gè)事件的訂閱過(guò)程。
首先回調(diào)Subscriber.onStart()
表明Observable
和Subscriber
已經(jīng)建立了連接,但是這個(gè)時(shí)候事件還沒(méi)有開始發(fā)射司训。
然后是 doOnRequest()
和doOnSubscribe()
,這個(gè)時(shí)候事件也沒(méi)有開始發(fā)射构捡。
接著是doOnEach()
和doOnNext()
和onNext()
,這個(gè)時(shí)候事件正式發(fā)射了。
最后發(fā)射完了壳猜,就是onCompleted
勾徽、doOnTerminate()
,然后取消訂閱釋放資源doOnUnsubscribe()
。
Rx全家桶
歡迎加入Rx全家桶豪華套餐统扳,只有你想不到的喘帚,沒(méi)有它做不到的!
小結(jié)
總的來(lái)說(shuō)咒钟,使用RxJava之后吹由,可以讓我們的代碼更加清晰一目了然,而且線程的切換也非常的簡(jiǎn)單朱嘴,不用自己維護(hù)相關(guān)線程和寫那該死的Runnable
,內(nèi)部強(qiáng)大的事件轉(zhuǎn)換和篩選過(guò)濾也是為我們開發(fā)省去了不少的工作倾鲫。
參考文檔
1、給 Android 開發(fā)者的 RxJava 詳解
2萍嬉、RxJava wiki
3乌昔、RxJava使用場(chǎng)景小結(jié)
4、Demo下載
---- Edit By Joe ----