原創(chuàng)博客地址
對于程序猿來說譬圣,Demo
是最好的起手。
而對于RxJava
來說,你可以簡單理解成:
- 是一個觀察者模式框架
- 替代
AsyncTask
成為更好的異步操作工具 - 即便邏輯再復(fù)雜絮识,對于
RxJava
來說就是:簡潔
首先上Demo
:
public static void main(String[] args) {
// 0.準(zhǔn)備一些數(shù)據(jù)
Integer[] numbers = { 1, 2, 3, 4 };
List<Integer> lists = Arrays.asList(numbers);
// 1.創(chuàng)建一個被觀察者
// 被觀察者很明顯從List集合獲取數(shù)據(jù)洼畅,現(xiàn)在就等著有人來訂閱~
Observable<Integer> observable = Observable.from(lists);
// 2.創(chuàng)建一個觀察者
// SubScriber是Observer的實(shí)現(xiàn)類抱环,所以也是一個觀察者
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onNext(Integer data) {
// 被觀察者發(fā)送的數(shù)據(jù)都會送到這里
System.out.println("Rx -- onNext:" + data);
}
@Override
public void onCompleted() {
// 被觀察者發(fā)送完數(shù)據(jù)會調(diào)用該方法
System.out.println("Rx -- Complete!");
}
@Override
public void onError(Throwable e) {
// 被觀察者傳輸數(shù)據(jù)中發(fā)生異常會調(diào)用該方法
System.out.println("Rx -- Error!");
}
};
// 3.訂閱
// 正常來說應(yīng)該是:observer.subscribe(observable); 看起來更合乎邏輯
// 這樣反而像是:被觀察者 訂閱了 觀察者(報(bào)紙 訂閱了 讀者)
// 這涉及到流式編程羽嫡,姑且先這樣記住吧
observable.subscribe(observer);
}
運(yùn)行結(jié)果:
- 在觀察者訂閱的順間撩笆,被觀察者就發(fā)送數(shù)據(jù)過來了
- 數(shù)據(jù)發(fā)送過來調(diào)用的方法:
onNext()
- 數(shù)據(jù)發(fā)送完成調(diào)用的方法:
onCompleted()
- 數(shù)據(jù)發(fā)送期間出現(xiàn)異常調(diào)用的方法:
onError()
不要看代碼多了篙悯,但邏輯很簡潔澜沟!只有邏輯上的簡潔才是真正的簡潔婴氮!
上面的Demo
看完一遍,大概知道有什么樣的角色在扮演课锌。
現(xiàn)在分析下每個角色:
觀察者
作用:接收數(shù)據(jù)并進(jìn)行處理
觀察者毫無疑問就是Observer
,但它是接口娶桦。在實(shí)際操作中,一般都使用它的抽象實(shí)現(xiàn)類Subscriber
。兩者使用方式完全一樣乡摹。
public abstract class Subscriber<T> implements Observer<T>, Subscription
現(xiàn)在來看看觀察者常用的創(chuàng)建方式:
第一種:new Observer()接口
Observer<Integer> observer = new Observer<Integer>(){
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer i) {
}
};
第二種:new Subscriber()抽象類
Subscriber<Integer> subscriber = new Subscriber<Integer>(){
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer i) {
}
};
Subscriber
中有一個方法:
/**
* This method is invoked when the Subscriber and Observable have been connected but the Observable has
* not yet begun to emit items or send notifications to the Subscriber. Override this method to add any
* useful initialization to your subscription, for instance to initiate backpressure.
*/
public void onStart() {
// do nothing by default
}
- 很明顯是留給調(diào)用者自己重寫的
- 英文好的可以自己看注釋
- 這里大致說下意思:這個方法是在觀察者和被觀察者已連接筋岛,但是被觀察者還沒有向觀察者發(fā)送數(shù)據(jù)時進(jìn)行調(diào)用。
- 所以,這個方法就是用來做初始化用的。
除此之外盘榨,Subscriber
實(shí)現(xiàn)的Subscription
接口還有兩個方法:
public interface Subscription {
void unsubscribe(); // 取消訂閱
boolean isUnsubscribed(); // 是否已經(jīng)取消訂閱
}
- 取消訂閱后棚亩,觀察者將不會再接收事件
- 取消之前先判斷一下
isUnsubscribed()
- 如果程序中沒有調(diào)用取消訂閱方法,被觀察者會始終持有觀察者的引用玻靡。造成內(nèi)存泄漏依溯。
被觀察者
作用:作為數(shù)據(jù)的發(fā)送方慷嗜,它決定什么時候發(fā)送,怎么發(fā)送
被觀察者Observable
到逊,Java
里也有已艰。很多地方都喜歡用這個單詞作為被觀察者痊末,這也是它的直譯。但是就因?yàn)槎家粯恿ú簦孕⌒?strong>不要導(dǎo)錯包了凿叠。
現(xiàn)在來看看被觀察者常用的創(chuàng)建方式:
第一種:Observable.create()
Observable observable = Observable.create(new Observable.OnSubscribe<Integer>(){
@Override
public void call(Subscriber<? super Integer> subscriber) {
}
});
-
create()
方法接收一個OnSubscribe
接口參數(shù) -
OnSubscribe
是Observable
的內(nèi)部接口
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
// cover for generics insanity
}
- 根據(jù)接口名,顧名思義嚼吞。當(dāng)觀察者被訂閱的時候盒件,會調(diào)用這個
call()
方法
- 下面舉個小例子:
public static void main(String[] args) {
// 觀察者
Observer<Integer> observer = new Observer<Integer>(){
@Override
public void onCompleted() {
System.out.println("接收數(shù)據(jù)結(jié)束");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer t) {
System.out.println("接收數(shù)據(jù):" + t);
}
};
// 被觀察者
Observable observable = Observable.create(new Observable.OnSubscribe<Integer>(){
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onCompleted();
}
});
// 訂閱
observable.subscribe(observer);
}
運(yùn)行結(jié)果:
注意:
這個方法已經(jīng)被廢棄了,推薦使用
SyncOnSubscribe
或AsyncOnSubscribe
看名字應(yīng)該知道是什么意思
第二種:Observable.from()
Integer[] nums = {1, 2, 3};
Observable observable = Observable.from(nums);
- 從一個數(shù)組或
Iterable
中依次發(fā)送數(shù)據(jù)元素
第三種:Observable.just()
Observable observable = Observable.just(1, 2, 3);
- 這個更直接舱禽。將參數(shù)依次發(fā)送過來炒刁。
訂閱
observable.subscribe(observer);
其內(nèi)部實(shí)現(xiàn):
-
subscriber.onStart()
就是觀察者中內(nèi)置的用于初始化的方法 - 被觀察者.call(subscriber)就是
- 最后把觀察者當(dāng)成訂閱者返回。前面說過
public abstract class Subscriber<T> implements Observer<T>, Subscription
- 所以誊稚,你可以:
// 訂閱
Subscription subscription = observable.subscribe(observer);
// 取消訂閱
subscription.unsubscribe();
- 形成鏈?zhǔn)骄幊?/li>
關(guān)于Action
前面在被觀察者的第一種創(chuàng)建方式Observable.create()
中翔始,接收的參數(shù)是OnSubscribe
接口罗心。它繼承了Action1
。
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
// cover for generics insanity
}
- 當(dāng)被觀察者被訂閱時城瞎,
OnSubscribe
的call()
方法才會被調(diào)用 - 這個
call()
就是Action1
的
public interface Action1<T> extends Action {
void call(T t);
}
至于這個Action
渤闷,你可以理解為就是一次單純的行為,一個單純的回調(diào)全谤。
有很多的Actionx
public interface Action0 extends Action {
void call();
}
public interface Action1<T> extends Action {
void call(T t);
}
public interface Action2<T1, T2> extends Action {
void call(T1 t1, T2 t2);
}
public interface Action3<T1, T2, T3> extends Action {
void call(T1 t1, T2 t2, T3 t3);
}
- 0就代表call()方法沒有參數(shù)
- 1就代表call()方法有1個參數(shù)
- 2就代表call()方法有2個參數(shù)
- 至于ActionN接口
public interface ActionN extends Action {
void call(Object... args);
}
Observable.subscribe(..)
的時候肤晓,里面除了Observer
和Subscriber
這兩個觀察者之外。還可以接受一個Action
认然。
Action1<Integer> action1 = new Action1<Integer>() {
@Override
public void call(Integer num) {
System.out.println("接收到數(shù)據(jù):" + num);
}
};
observable.subscribe(action1);
常用方法
map
People[] peoples = new People[]{
new People("張三", 18, new String[]{"睡覺", "吃飯", "打豆豆"}),
new People("李四", 19, new String[]{"編程", "泡妞", "LOL"})
};
// 觀察者
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String name) {
System.out.println("接收信息:" + name);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
};
// 被觀察者
Observable.from(peoples).map(new Func1<People, String>() {
@Override
public String call(People people) {
return people.getName();
}
}).subscribe(subscriber);
- 可以看到被觀察者從
People
數(shù)組里讀取每一個元素 - 在
map
方法里找到每一個元素對象的name
并傳遞給觀察者 - 觀察者接收并使用
- 這里轉(zhuǎn)換范圍很大补憾,不僅僅只是提取屬性。
運(yùn)行結(jié)果:
flatMap
People[] peoples = new People[]{
new People("張三", 18, new String[]{"睡覺", "吃飯", "打豆豆"}),
new People("李四", 19, new String[]{"編程", "泡妞", "LOL"})
};
// 觀察者
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String hobby) {
System.out.println("接收信息:" + hobby);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
};
// 被觀察者
Observable.from(peoples).flatMap(new Func1<People, Observable<String>>() {
@Override
public Observable<String> call(People people) {
return Observable.from(people.getHobby());
}
}).subscribe(subscriber);
- 效果和
map
是類似的 - 區(qū)別在于
map
是用于一對一卷员,而flatMap
是用于一對多 - 被觀察者從
People
數(shù)組讀取每一個對象盈匾,call()
里讀取每一個對象的hobby
屬性,并依次返回其中的一個元素
運(yùn)行結(jié)果:
filter
People[] peoples = new People[]{
new People("張三", 18, new String[]{"睡覺", "吃飯", "打豆豆"}),
new People("李四", 19, new String[]{"編程", "泡妞", "LOL"})
};
// 觀察者
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String name) {
System.out.println("接收信息:" + name);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
};
// 被觀察者
Observable.from(peoples).filter(new Func1<People, Boolean>() {
@Override
public Boolean call(People t) {
return t.getAge() > 18;
}
}).map(new Func1<People, String>() {
@Override
public String call(People people) {
return people.getName();
}
}).subscribe(subscriber);
運(yùn)行結(jié)果:
線程
RxJava
遵循的線程原則:在那個線程訂閱毕骡,則被觀察者和觀察者的操作都在該線程削饵。
通過Schedulers
切換線程。
-
Schedulers.immediate()
:默認(rèn)值未巫。在當(dāng)前線程運(yùn)行窿撬。 -
AndroidSchedulers.mainThread()
:在Android主線程運(yùn)行。- 注意:這個是
RxAndroid
里的叙凡。必須要導(dǎo)入RxAndroid
的jar
包劈伴。RxJava
里是沒有的。
- 注意:這個是
-
Schedulers.newThread()
:總是開啟新線程運(yùn)行握爷。 -
Schedulers.io()
:如果操作涉及到I/O使用該項(xiàng)跛璧。- 也是總是開啟新線程運(yùn)行
- 內(nèi)部有線程池和復(fù)用
-
Schedulers.computation()
:如果操作涉及到圖形計(jì)算等使用該項(xiàng)。
還是之前例子新啼,但是增加兩行代碼:
People[] peoples = new People[]{
new People("張三", 18, new String[]{"睡覺", "吃飯", "打豆豆"}),
new People("李四", 19, new String[]{"編程", "泡妞", "LOL"})
};
// 觀察者
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String name) {
System.out.println("接收信息:" + name);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
};
// 被觀察者
Observable.from(peoples).filter(new Func1<People, Boolean>() {
@Override
public Boolean call(People t) {
return t.getAge() > 18;
}
}).map(new Func1<People, String>() {
@Override
public String call(People people) {
return people.getName();
}
}).subscribeOn(Schedulers.immediate()) // 當(dāng)前線程
.observeOn(Schedulers.io()) // io線程
.subscribe(subscriber);
-
被觀察者在新開起的IO線程做
讀取/過濾/轉(zhuǎn)換
操作 - 數(shù)據(jù)傳給觀察者
- 觀察者在當(dāng)前線程顯示數(shù)據(jù)
運(yùn)行結(jié)果:
總結(jié)
- RxJava確實(shí)是一個非常強(qiáng)大的流式編程工具
- 再復(fù)雜的邏輯追城,RxJava都能很簡潔的表示
- 一句代碼完成線程切換,很方便
- 用多了才知道它的美~