這篇文章大概是2017年時(shí)整理的,一直在OneNote中存放著铭乾,如今創(chuàng)建了個(gè)人博客吉拳,自然拿了出來。PS:從OneNote中復(fù)制出來后格式亂碼烈疚,整理的我想吐...
介紹
RxJava是Java上一個(gè)靈活的黔牵、使用可觀測序列組成的一個(gè)異步的、基于事件的庫爷肝。
特點(diǎn):
- 作用:異步
- 模式:觀察者模式-本質(zhì)上是基于回調(diào)
- 結(jié)構(gòu):響應(yīng)式編程
- 邏輯簡潔猾浦,可讀性高,易維護(hù)
- 鏈?zhǔn)浇Y(jié)構(gòu)的執(zhí)行順序
基本流程
- 創(chuàng)建事件資源灯抛,也就是被觀察者金赦。可以用
Observable.create/just/from
等方法來創(chuàng)建对嚼。 - 通過filter/debounce等操作符夹抗,進(jìn)行自定義事件過濾。
- 通過Schedules進(jìn)行事件發(fā)送和訂閱的線程控制纵竖,也就是
subscribeOn()和observeOn()
漠烧。 - 通過map/flatMap/compose等操作符杏愤,進(jìn)行事件的變換
- 調(diào)用subscribe進(jìn)行事件訂閱。
- 最后已脓,不要忘了對訂閱者生命周期的控制珊楼,不用的時(shí)候,記得調(diào)用
unsubscribe()
度液,以免引發(fā)內(nèi)存泄漏厕宗。
注意:未取消訂閱而引起的內(nèi)存泄漏。在
Activtity.onDestroy()
或不需要繼續(xù)執(zhí)行時(shí)取消訂閱堕担。
CompositeSubscription已慢, 相當(dāng)于一個(gè)Subscription集合,來取消所有訂閱霹购。
示例
CompositeSubscription list = new CompositeSubscription();
list.add(subscription1);
list.add(subscription2);
list.add(subscription3);
// 統(tǒng)一調(diào)用一次unsubscribe蛇受,就可以把所有的訂閱都取消
list.unsubscribe();
基礎(chǔ)知識
Observer和Subscriber的關(guān)系
- Observer是觀察者,Subscriber也是觀察者厕鹃。
- Subscriber是一個(gè)實(shí)現(xiàn)了Observer接口的抽象類兢仰,對Observer進(jìn)行了部分?jǐn)U展,在使用上基本沒有區(qū)別剂碴。
- Subscriber多了發(fā)送之前調(diào)用的
onStart()
和解除訂閱關(guān)系的unsubscribe()
方法把将。 - 在RxJava的subscribe過程中,Observer也總是會先被轉(zhuǎn)換成一個(gè)Subscriber再使用忆矛。
- RxJava開發(fā)過程中一般都使用Subscriber察蹲。
RxJava的事件訂閱回調(diào)
支持以下三種不完整定義的回調(diào),我們可以根據(jù)當(dāng)前需要催训,傳入對應(yīng)的Action洽议,RxJava會相應(yīng)的自動創(chuàng)建Subscriber。
observable.subscribe(onNextAction);
observable.subscribe(onNextAction, onErrorAction);
observable.subscribe(onNextAction, onErrorAction, onCompleteAction);
響應(yīng)式編程
Observable發(fā)出一系列事件漫拭,它是事件的產(chǎn)生者亚兄。
Subscriber負(fù)責(zé)處理事件,它是事件的消費(fèi)者采驻。
-
Operator是對Observable發(fā)出的事件進(jìn)行修改和變換 审胚。
注意:若事件從產(chǎn)生到消費(fèi)不需要其他處理,則可以省略掉中間的Operator礼旅,從而流程變?yōu)?strong>Obsevable -> Subscriber膳叨。
Subscriber通常在主線程執(zhí)行,所以原則上不要去處理太多的事務(wù)痘系,而這些復(fù)雜的事務(wù)處理則交給Operator菲嘴。
知識點(diǎn)
Scheduler線程控制
默認(rèn)情況下,RxJava事件產(chǎn)生和消費(fèi)均在同一個(gè)線程中,例如在主線程中調(diào)用龄坪,那么事件的產(chǎn)生和消費(fèi)都在主線程昭雌,但RxJava可以自由切換線程。
RxJava線程調(diào)度器
Schedulers.io();
I/O操作(讀寫文件悉默、數(shù)據(jù)庫、網(wǎng)絡(luò)請求等)苟穆,與 newThread() 差不多抄课,區(qū)別在于 io() 的內(nèi)部實(shí)現(xiàn)是是用一個(gè)無數(shù)量上限的線程池,可以重用空閑的線程雳旅,因此多數(shù)情況下 io() 效率比 newThread() 更高跟磨。值得注意的是,在 io() 下攒盈,不要進(jìn)行大量的計(jì)算抵拘,以免產(chǎn)生不必要的線程。Schedulers.newThread();
開啟新線程操作Schedulers.immediate();
默認(rèn)指定的線程型豁,也就是當(dāng)前線程Schedulers.computation();
計(jì)算所使用的調(diào)度器僵蛛。這個(gè)計(jì)算指的是CPU密集型計(jì)算,即不會被I/O等操作限制性能的操作迎变,例如圖形的計(jì)算充尉。這個(gè)Scheduler使用的固定的線程池,大小為CPU核數(shù)衣形。值得注意的是不要把I/O操作放在computation()中否則I/O操作的等待時(shí)間會浪費(fèi)CPU驼侠。
注意:
AndroidSchedulers.mainThread();
是RxJava擴(kuò)展的Android主線程。- 通過
subscribeOn()
和observeOn()
這兩個(gè)方法來進(jìn)行線程調(diào)度谆吴。
變換操作符(重點(diǎn))
RxJava可以將發(fā)送的事件或事件序列倒源,加工后轉(zhuǎn)換成不用的事件或事件序列。
map操作符
- 是一對一的變換
- 返回的是變換后的對象
- 變換后的對象直接發(fā)到Subscriber回調(diào)中
flatMap操作符
- 可以適應(yīng)一對多的變換
- 返回的是一個(gè)Observable被觀察者對象
- 返回的Observable對象并不是直接發(fā)送到Subscriber的回調(diào)中句狼,而是重新創(chuàng)建一個(gè)Observable對象笋熬,并激活這個(gè)Observable對象,使之開始發(fā)送事件
- flatMap變換后產(chǎn)生的每一個(gè)Observable對象發(fā)送的事件腻菇,最終都匯入同一個(gè)Observable突诬,進(jìn)而發(fā)送給Subscriber回調(diào)
注意:
- map的返回類型與flatMap返回的Observable事件類型,可以與原來的事件類型一樣
- 可以對一個(gè)Observable多次使用map和flatMap
- flatMap常常被用于嵌套的異步操作芜繁,例如:嵌套網(wǎng)絡(luò)請求
map操作符代碼示例
final ImageView ivLogo = (ImageView) findViewById(R.id.ivLogo);
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("https://ss2.baidu.com/-vo3dSag_xI4khGko9WTAnF6hhy/image/h%3D200/sign=4db5130a073b5bb5a1d727fe06d2d523/cf1b9d16fdfaaf51965f931e885494eef11f7ad6.jpg");
}
})
.map(new Func1<String, Drawable>() {
@Override
public Drawable call(String url) {
try {
Drawable drawable = Drawable.createFromStream(new URL(url).openStream(), "src");
return drawable;
} catch (IOException e) {
}
return null;
}
})
// 指定subscribe()所在的線程旺隙,也就是call()方法調(diào)用的線程
.subscribeOn(Schedulers.io())
// 指定Subscriber回調(diào)方法所在的線程,也就是onCompleted,onError,onNext回調(diào)的線程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Drawable>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Log.e(TAG, e.toString());
}
@Override
public void onNext(Drawable drawable) {
if (drawable != null) {
ivLogo.setImageDrawable(drawable);
}
}
});
flatMap操作符代碼示例
final List<Employee> list = new ArrayList<Employee>() {
{
add(new Employee("jackson", mission_list1));
add(new Employee("sunny", mission_list2));
}
};
Observable.from(list)
.flatMap(new Func1<Employee, Observable<Employee.Mission>>() {
@Override
public Observable<Employee.Mission> call(Employee employee) {
return Observable.from(employee.missions);
}
})
.subscribe(new Subscriber<Employee.Mission>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Employee.Mission mission) {
Log.i(TAG, mission.desc);
}
});
from操作符
接收一個(gè)集合作為輸入骏令,每次輸出一個(gè)元素給subscriber
注意:若需要執(zhí)行耗時(shí)操作蔬捷,即使在from中使用
subscribeOn(Schedulers.io())
,仍然是在主線程執(zhí)行,會造成界面卡頓甚至崩潰周拐。
from操作符代碼示例
// 格式:Observable.from(T[] params)
Observable.from(new Integer[]{1, 2, 3, 4, 5})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.i(TAG, "number:" + number);
}
});
just操作符
接收一個(gè)可變參數(shù)作為輸入铡俐,最終也是生成數(shù)組,調(diào)用from()妥粟,每次輸出一個(gè)元素給subscriber
just操作符代碼示例
// Observable.just(T... params)审丘,params的個(gè)數(shù)為1 ~ 10
Observable.just(1, 2, 3, 4, 5)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.i(TAG, "number:" + number);
}
});
filter操作符
條件過濾,用于去除不符合條件的事件
filter操作符代碼示例
Observable.from(new Integer[]{1, 2, 3, 4, 5})
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer number) {
// 偶數(shù)返回true勾给,則表示剔除奇數(shù)滩报,留下偶數(shù)
return number % 2 == 0;
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.i(TAG, "number:" + number);
}
});
take操作符
最多保留的事件數(shù)
doOnNext操作符
在處理下一個(gè)事件前要做的事
take和doOnNext操作符代碼示例
Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12})
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer number) {
// 偶數(shù)返回true,則表示剔除奇數(shù)
return number % 2 == 0;
}
})
// 最多保留三個(gè)播急,也就是最后剩三個(gè)偶數(shù)
.take(3)
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer number) {
// 在輸出偶數(shù)之前輸出它的hashCode
Log.i(TAG, "hahcode = " + number.hashCode() + "");
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.i(TAG, "number = " + number);
}
});
輸出結(jié)果:
hahcode = 2 number = 2 hahcode = 4 number = 4 hahcode = 6 number = 6
debounce操作符
過濾在指定的時(shí)間間隔之間的事件脓钾,接收一個(gè)事件后將在指定時(shí)間間隔后開始接收事件
debounce操作符代碼示例
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
int i = 0;
int[] times = new int[]{100, 1000};
while (true) {
i++;
if (i >= 100) break;
subscriber.onNext(i);
try {
/** 注意:
當(dāng)i為奇數(shù)時(shí),休眠1000ms桩警,然后才發(fā)送i+1可训,這時(shí)i不會被過濾掉。
當(dāng)i為偶數(shù)時(shí)捶枢,只休眠100ms握截,便發(fā)送i+1,這時(shí)i會被過濾掉
*/
Thread.sleep(times[i % 2]);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
subscriber.onCompleted();
}
})
// 間隔400ms以內(nèi)的事件將被丟棄
.debounce(400, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.i(TAG, "complete");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, e.toString());
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, "integer = " + integer);
}
});
merge操作符
用于合并兩個(gè)Observable為一個(gè)Observable
格式:
Observable.merge(Observable1, Observable2)
.subscribe(subscriber);
concat操作符
順序的執(zhí)行多個(gè)Ovservable烂叔,個(gè)數(shù)為1—9(示例見first操作符)
compose操作符
類似flatMap川蒙,都是進(jìn)行變換,返回Observable對象长已,激活并發(fā)送事件
和flatmap區(qū)別
- compose是唯一一個(gè)能從數(shù)據(jù)流中得到原始Observable的操作符畜眨,需要對整個(gè)數(shù)據(jù)流產(chǎn)生作用的操作需使用compose來實(shí)現(xiàn)。如subscribeOn()和observeOn()术瓮,在flatMap中使用的話康聂,僅對在flatMap中創(chuàng)建的Observable起作用,不會對剩下的流產(chǎn)生影響.
- compose是對Observable整體的變換胞四。flatMap轉(zhuǎn)換Observable里的每一個(gè)事件恬汁,compose轉(zhuǎn)換的是整個(gè)Observable數(shù)據(jù)流。
- flatMap每發(fā)送一個(gè)事件都創(chuàng)建一個(gè)Observable辜伟,效率低氓侧。compose只在主干數(shù)據(jù)流上執(zhí)行操作。
- 建議使用compose代替flatMap导狡。
first操作符
只發(fā)送符合條件的第一個(gè)事件约巷。如:可以結(jié)合contact做網(wǎng)絡(luò)三級緩存
first操作符代碼示例
// 從緩存獲取
Observable<BookList> fromDisk = Observable.create(new Observable.OnSubscribe<BookList>() {
@Override
public void call(Subscriber<? super BookList> subscriber) {
BookList list = getFromDisk();
if (list != null) {
subscriber.onNext(list);
} else {
subscriber.onCompleted();
}
}
});
// 從網(wǎng)絡(luò)獲取
Observable<BookList> fromNetWork = bookApi.getBookDetailDisscussionList();
Observable.concat(fromDisk, fromNetWork)
// 如果緩存不為null,則不再進(jìn)行網(wǎng)絡(luò)請求旱捧。
.first()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<BookList>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(BookList discussionList) {
}
});
timer操作符
定時(shí)器独郎,可以做定時(shí)操作或延遲操作
timer操作符代碼示例
Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Long aLong) {
Log.i(TAG, "Hello World!");
}
});
interval操作符
定時(shí)的周期性操作踩麦,與timer操作符的區(qū)別是可以重復(fù)操作
throttleFirst操作符
類似debounce操作符,時(shí)間間隔太短就會丟棄事件氓癌∥角可用于防抖操作,如防止雙擊
throttleFirst操作符代碼示例
RxView.clicks(button)
.throttleFirst(1, TimeUnit.SECONDS)
.subscribe(new Observer<Object>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
Log.i(TAG, "do clicked!");
}
});
Single操作符
相當(dāng)于Observable的精簡版贪婉。觀察者回調(diào)的不是onNext/onError/onCompleted反粥,而是回調(diào)onSuccess/onError
subject操作符
既是事件的生產(chǎn)者,又是事件的消費(fèi)者
subject操作符代碼示例
Subject subject = PublishSubject.create();
subject.debounce(400, TimeUnit.MILLISECONDS)
.subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
// request
}
});
edittext.addTextChangedListener(new TextWatcher() {
@Override
public void beforeTextChanged(CharSequence s, int start, int count, int after) { }
@Override
public void onTextChanged(CharSequence s, int start, int before, int count) {
subject.onNext(s.toString());
}
@Override
public void afterTextChanged(Editable s) { }
});
參考資料->RxJava詳解-由淺入深
這是當(dāng)時(shí)的文章名稱疲迂,如今去看作者已經(jīng)進(jìn)行了更新RxJava 從入門到出軌才顿,也是騷的不行~