一莫鸭、基礎(chǔ)概述
RxJava
的關(guān)鍵是異步,即使隨著程序的邏輯變得復(fù)雜横殴,它依然能夠保持簡潔被因。
二、API
介紹和原理剖析
觀察者模式面向的需求是:A
對(duì)象(觀察者)對(duì)B
對(duì)象(被觀察者)的某種變化高度敏感,需要在B
變化的一瞬間做出反應(yīng)梨与,觀察者采用注冊(cè)Register
或者訂閱Subscribe
的方式堕花,告訴觀察者,我需要你的某某狀態(tài)蛋欣,并在它變化的時(shí)候通知我航徙,在RxJava
當(dāng)中,Observable
是被觀察者陷虎,Observer
就是觀察者到踏。
RxJava
有四個(gè)基本概念:
-
Observable
:被觀察者。 -
Observer
:觀察者尚猿。 -
Subscribe
:訂閱窝稿。 -
Event
:事件。
Observable
和Observer
通過subscribe
方法實(shí)現(xiàn)訂閱關(guān)系凿掂,Observable
可以在需要的時(shí)候發(fā)出事件來通知Observer
伴榔。
RxJava
有以下三種事件:
-
onNext
:普通事件。 -
onCompleted
:RxJava
不僅把每個(gè)事件單獨(dú)處理庄萎,還會(huì)把它們看作一個(gè)隊(duì)列踪少,當(dāng)不會(huì)再有新的onNext
事件發(fā)出時(shí),需要觸發(fā)onCompleted
事件作為標(biāo)志糠涛。 -
onError
:onCompleted
和有且僅有一個(gè)援奢,并且是事件序列中的最后一個(gè)。
三忍捡、基本實(shí)現(xiàn)
RxJava
的基本實(shí)現(xiàn)有以下三點(diǎn):
1)創(chuàng)建觀察者 - Observer
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext");
}
};
除了Observer
接口之外集漾,RxJava
還內(nèi)置了一個(gè)實(shí)現(xiàn)了Observer
的抽象類:Subscriber
,它對(duì)Observer
接口進(jìn)行了一些擴(kuò)展砸脊,實(shí)質(zhì)上在RxJava
的subscribe
過程中具篇,Observer
也總是被轉(zhuǎn)換成為一個(gè)Subscriber
再使用,他們的區(qū)別在與:
-
onStart
:這是新增的方法凌埂,它會(huì)在subscribe
剛開始驱显,而事件還未發(fā)送之前被調(diào)用,它總是在subscribe
所發(fā)生的線程被調(diào)用瞳抓。 -
unsubscribe
:這是它實(shí)現(xiàn)的另一個(gè)接口Subscription
的方法埃疫,用于取消訂閱,在這個(gè)方法被調(diào)用后挨下,Subscriber
將不再接收事件,一般在調(diào)用這個(gè)方法前脐湾,可以使用isUnsubscribed
判斷一下狀態(tài)臭笆,Observable
在訂閱之后會(huì)持有Subscriber
的引用,因此不釋放會(huì)有內(nèi)存泄漏的危險(xiǎn)。
2)創(chuàng)建被觀察者 - Observable
RxJava
用create
方法來創(chuàng)建一個(gè)observable
愁铺,
rx.Observable observable = rx.Observable.create(new rx.Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello World!");
subscriber.onCompleted();
}
});
這里傳入了一個(gè)Observable.OnSubscribe<T>
對(duì)象作為參數(shù)鹰霍,它會(huì)被存儲(chǔ)在返回的Observable
對(duì)象當(dāng)中,它的作用相當(dāng)于一個(gè)計(jì)劃表茵乱,當(dāng)Observable
被訂閱的時(shí)候茂洒,OnSubscribe
的call
方法會(huì)自動(dòng)被調(diào)用,事件序列被依次觸發(fā)瓶竭。
create
是RxJava
最基本的創(chuàng)造事件序列的方法督勺,基于這個(gè)方法,還提供了一些快捷方法來創(chuàng)建事件隊(duì)列:
just(T...)
Observable observable = Observable.just("Hello", "Hi", "Aloha");
from(T[]) / from(Iterable<? extends T>)
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
3)訂閱 - subscribe
observable.subscribe(observer);
observable.subscribe(subscriber);
其內(nèi)部核心的代碼類似于:
public Subscription subscribe(Subscriber subscriber) {
//準(zhǔn)備方法斤贰。
subscriber.onStart();
//事件發(fā)送的邏輯開始執(zhí)行智哀,這個(gè)onSubscribe就是創(chuàng)建Observable時(shí)新建的OnSubscribe對(duì)象。
onSubscribe.call(subscriber);
//把傳入的Subscriber轉(zhuǎn)換為Subscription并返回荧恍,方便unsubscribe瓷叫。
return subscriber;
}
Observable.subscribe
方法除了支持傳入Observer
和Subscriber
,還支持傳入Action0
送巡、Action1
這樣不完整定義的回調(diào)摹菠,RxJava
會(huì)自動(dòng)根據(jù)定義創(chuàng)建出Subscriber
。
四骗爆、線程控制
在不指定線程的情況下次氨,RxJava
遵循這樣的原則,在哪個(gè)線程調(diào)用subscribe
淮腾,就在哪個(gè)線程產(chǎn)生事件糟需,在哪個(gè)線程產(chǎn)生事件,就在哪個(gè)線程消費(fèi)事件谷朝,如果需要消費(fèi)線程洲押,那么就需要用到Scheduler
, RxJava
內(nèi)置了幾個(gè)Scheduler
:
-
Schedulers.immediate
:直接在當(dāng)前線程運(yùn)行圆凰。 -
Schedulers.newThread
:總是啟用新線程杈帐,并在線程執(zhí)行操作。 -
Schedulers.io
:其內(nèi)部實(shí)現(xiàn)是一個(gè)無數(shù)量上限的的線程池专钉,可以重用空閑的線程挑童,不要把計(jì)算工作放在io
,可以避免創(chuàng)建不必要的線程跃须。 -
Schedulers.computation
:使用固定的線程池站叼,大小為CPU
核數(shù)。 -
AndroidSchedulers.mainThread
:指定的操作將在Android
主線程中運(yùn)行菇民。
對(duì)線程控制有以下兩個(gè)方法:
-
subscribeOn
:指定subscribe
發(fā)生的線程尽楔,即Observable.OnSubscribe
被激活時(shí)所處的線程投储,也就是call
方法執(zhí)行時(shí)所處的線程。 -
observeOn
:指定Subscriber
所運(yùn)行在的線程阔馋。
observeOn
指定的是Subscriber
的線程玛荞,而這個(gè)Subscriber
并不一定是subscribe()
參數(shù)中的Subscriber
,而是observeOn
執(zhí)行時(shí)的當(dāng)前Observable
所對(duì)應(yīng)的Subscriber
呕寝,即它的直接下級(jí)Subscriber
勋眯,也就是它之后的操作所在的線程,因此下梢,如果有多次切換線程的要求客蹋,只要在每個(gè)想要切換線程的位置調(diào)用依次observeOn
即可。
和observeOn
不同怔球,subscribeOn
只能調(diào)用一次嚼酝,下面我們來分析一下它的內(nèi)部實(shí)現(xiàn),首先是subscribeOn
的原理:
subscribeOn
和ObserveOn
都做了線程切換的工作:
-
subscribeOn
的線程切換發(fā)生在OnSubscribe
中竟坛,即在它通知上一級(jí)的OnSubscribe
時(shí)闽巩,這時(shí)事件還沒有發(fā)送,因此subscribeOn
的線程控制可以從事件發(fā)出的開端造成影響担汤。
-
observeOn
的線程切換則發(fā)生在它內(nèi)建的Subscriber
中涎跨,即發(fā)生在它即將給下一級(jí)Subscriber
發(fā)送事件時(shí),因此控制的是它后面的線程崭歧。
五隅很、變換
變換,就是將事件序列中的對(duì)象或整個(gè)序列進(jìn)行加工處理率碾,轉(zhuǎn)換不同的事件或者序列叔营。
5.1 map()
通過FuncX
,把參數(shù)中的Integer
轉(zhuǎn)換成為String
所宰,是最常用的變換绒尊,這個(gè)變換是發(fā)生在subscribeOn
所指定的線程當(dāng)中的。
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
long nextId = Thread.currentThread().getId();
Log.d(TAG, "onNext:" + s + ", threadId=" + nextId);
}
};
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
long callId = Thread.currentThread().getId();
subscriber.onNext(5);
subscriber.onCompleted();
}
});
observable.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
long mapId = Thread.currentThread().getId();
return "My Number is:" + integer;
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);
其示意圖類似于:
5.2 flatMap
它和map
有一個(gè)共同點(diǎn)仔粥,就是把傳入的參數(shù)轉(zhuǎn)化之后返回另一個(gè)對(duì)象婴谱,但是和map
不同的是,flatMap
返回的是一個(gè)Observable
對(duì)象躯泰,而且它并不直接把這個(gè)對(duì)象傳給Subscriber
谭羔,而是通過這個(gè)新建的Observable
來發(fā)送事件,其整個(gè)的調(diào)用過程:
- 使用傳入的事件對(duì)象創(chuàng)建一個(gè)
Observable
麦向。 - 激活這個(gè)
Observable
瘟裸,通過它來發(fā)送事件。 - 每一個(gè)創(chuàng)建出來的
Observable
發(fā)送的事件诵竭,被匯入同一個(gè)Observable
话告,它復(fù)雜將這些事件同一交給Subscriber
的回調(diào)方法十办。
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext, s=" + s);
}
};
Observable<List<String>> observable = Observable.create(new Observable.OnSubscribe<List<String>>() {
@Override
public void call(Subscriber<? super List<String>> subscriber) {
List<String> list = new ArrayList<>();
list.add("First");
list.add("Second");
list.add("Third");
subscriber.onNext(list);
}
});
observable.flatMap(new Func1<List<String>, Observable<String>>() {
@Override
public Observable<String> call(List<String> strings) {
return Observable.from(strings);
}
}).subscribe(subscriber);
其示意圖:
六、變換的原理
變換的實(shí)質(zhì)是針對(duì)事件序列的處理和再發(fā)送超棺,在RxJava
的內(nèi)部,它們是基于同一個(gè)基礎(chǔ)的變換方法lift(operator)
//生成了一個(gè)新的Observable并返回呵燕。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
//構(gòu)造新的Observable時(shí)棠绘,同時(shí)新建了一個(gè)OnSubscribe對(duì)象。
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
//原始的onSubscribe再扭。
onSubscribe.call(newSubscriber);
}
});
}
示意圖:
-
lift
創(chuàng)建了一個(gè)Observable
后氧苍,加上之前的原始Observable
,有兩個(gè)Observable
泛范。 - 新的
Observable
里的OnSubscribe
加上原始的让虐,共有兩個(gè)OnSubscribe
。 - 當(dāng)用戶通過調(diào)用
lift/map
創(chuàng)建的Observable
對(duì)象的subscribe
方法時(shí)罢荡,于是它觸發(fā)了上面的call
方法中的內(nèi)容赡突。 - 在這個(gè)新的
OnSubscribe
的call
方法中,傳入了目標(biāo)的Subscriber
区赵,同時(shí)其外部類中還持有了原始的OnSubscribe
惭缰。我們先通過operator.call(oldSubscriber)
方法,生成了新的Subscriber(new Subscriber)
笼才,然后利用這個(gè)新的Subscriber
向原始的Observable
進(jìn)行訂閱漱受。
下面我們以前面map
實(shí)現(xiàn)的例子來分析一下源碼,上面的例子通過map
操作符把Integer
類型的Observable
和String
類型的Subscriber
生成了訂閱關(guān)系骡送。
-
map
方法昂羡,它通過lift
方法返回了一個(gè)String
類型的Observable
。
//其中T=Integer摔踱,R=String虐先。
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
- 下面看下
OperatorMap
這個(gè)對(duì)象,這個(gè)對(duì)象實(shí)現(xiàn)了operator<R,T>
接口昌渤,而這個(gè)接口繼承于Func1<Subscriber<? super R>, Subscriber<? super T>>
赴穗,在它實(shí)現(xiàn)的call
方法中傳入了String
類型的Subscriber
(目標(biāo)Subscriber
),并返回了Integer
類型的Subscriber
(代理Subscriber
)膀息,當(dāng)它的方法被回調(diào)時(shí)般眉,會(huì)調(diào)用目標(biāo)Subscriber
的對(duì)應(yīng)方法,其中在調(diào)用onNext
時(shí)潜支,就用上了外部傳入的Func1
函數(shù):
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(T t) {
try {
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(OnErrorThrowable.addValueAsLastCause(e, t));
}
}
};
}
- 接著再回過頭來看
lift
方法:
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
//返回一個(gè)Integer類型的Subscriber甸赃。
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
st.onStart();
//關(guān)鍵方法:Integer類型的OnSubscribe調(diào)用對(duì)應(yīng)的Subscribe,這個(gè)call方法里面寫了我們的邏輯冗酿,當(dāng)它調(diào)用onNext(Integer integer)時(shí)埠对,實(shí)際上調(diào)用的是onNext(String str)络断。
onSubscribe.call(st);
} catch (Throwable e) {
if (e instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) e;
}
st.onError(e);
}
} catch (Throwable e) {
if (e instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) e;
}
o.onError(e);
}
}
});
}
- 最后就是調(diào)用
subscribe
方法。