要用RxJava,那么就得先了解兩個(gè)概念
Observable被觀察者
Observer觀察者
比如放羊,羊是被觀察者,放羊娃就是觀察者,當(dāng)羊餓了的時(shí)候,咩咩的叫個(gè)不停,相當(dāng)于向放羊娃發(fā)出信號(hào)說(shuō)我餓了,你看著辦,放羊娃呢,在聽(tīng)到羊的叫聲后就會(huì)做出相應(yīng)的處理.那么這里前提是二者要關(guān)聯(lián)成為觀察與被觀察的關(guān)系.否則沒(méi)有關(guān)聯(lián)的關(guān)系,放羊娃才不會(huì)那么傻去喂養(yǎng)和自己沒(méi)關(guān)系的羊.
觀實(shí)者與被觀察者就是這樣的狀態(tài),本來(lái)各不相干,但是一旦二者關(guān)聯(lián),那么觀察者就和被觀察者形成關(guān)系,觀察者會(huì)對(duì)被觀察者的言行舉止做出相應(yīng)的響應(yīng).
Observable被觀察者(小羊)
首先是被觀察者的創(chuàng)建:(羊來(lái)了)
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
? ? ? ? ? ? ? ? ? // ?羊咩咩的叫個(gè)不停于是用e向外發(fā)射信號(hào)
????????????????????e.onNext("我餓了");
}
});
此處有兩個(gè)概念:
ObservableOnSubscribe :?一個(gè)接口,創(chuàng)建被觀察者時(shí)需要傳入他的實(shí)例
ObservableEmitter :?事件發(fā)射器
create方法做了判空處理后,返回了ObservableCreate的實(shí)例
ObservableCreate是Observable的子類(lèi)實(shí)現(xiàn)
Observer?觀察者(放羊娃)
觀察者的創(chuàng)建
Observer observer = new Observer() {
@Override
public void onSubscribe(Disposable d) { }
@Override
public void onNext(String s) {
//?接收到羊發(fā)來(lái)的信號(hào)s,我知道啦,馬上給你準(zhǔn)備草去
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {? ? }
};
Observer用來(lái)對(duì)被觀察者發(fā)射出的事件做相應(yīng)的處理,也可理解為對(duì)監(jiān)聽(tīng)到被觀察者的相應(yīng)的狀態(tài)做出相應(yīng)的處理.
observable.subscribe(observer);
最后通過(guò)subscribe方法完成被觀察者與觀察者的訂閱連接?,此時(shí)羊與放羊娃明確關(guān)系.
subscribe方法內(nèi)部實(shí)現(xiàn)其實(shí)最終調(diào)用的是observable的subscribeActual?實(shí)現(xiàn)方法
@Override
protected void subscribeActual(Observer observer) {
CreateEmitter parent = new CreateEmitter(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
可以看到,該方法會(huì)將我們傳入的observer再次傳給CreateEmitter, CreateEmitter是ObservableCreater?的一個(gè)內(nèi)部類(lèi),同時(shí)實(shí)現(xiàn)了ObservableEmitter,Disposable這兩個(gè)接口,相當(dāng)于關(guān)聯(lián)了Observer的基礎(chǔ)上又做了進(jìn)一步的封裝,因?yàn)槠鋬?nèi)部實(shí)現(xiàn)最終事件的執(zhí)行都是通過(guò)傳入的observer來(lái)完成的
其持有了observer實(shí)例后,在可以執(zhí)行observer的相應(yīng)事件方法的基礎(chǔ)上進(jìn)一步的添加了一些控制訂閱,以及狀態(tài)判斷的方法.如下源碼可見(jiàn):
static final class CreateEmitter
extends AtomicReference
implements ObservableEmitter, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer observer;
CreateEmitter(Observer observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter serialize() {
return new SerializedEmitter(this);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
ObservableEmitter也是一個(gè)接口,是Emitter的子類(lèi),如下源碼:
public interface ObservableEmitter extends Emitter
public interface Emitter {
/**
* Signal a normal value.
* @param value the value to signal, not null
*/
void onNext(@NonNull T value);
/**
* Signal a Throwable exception.
* @param error the Throwable to signal, not null
*/
void onError(@NonNull Throwable error);
/**
* Signal a completion.
*/
void onComplete();
}
***********************************************************
public interface Disposable {
/**
* Dispose the resource, the operation should be idempotent.
*/
void dispose();
/**
* Returns true if this resource has been disposed.
* @return true if this resource has been disposed
*/
boolean isDisposed();
}
onSubscribe方法將觀察者和emmitter發(fā)射器關(guān)聯(lián)起來(lái),
往下走source.subscribe(parent);
此處的source就是我們?cè)趧?chuàng)建被觀察者是傳入的(new ObservableOnSubscribe())
至此可以看到ObservableOnSubscribe實(shí)例的subscriber方法將自身與關(guān)聯(lián)了Observer的事件發(fā)射器進(jìn)行了關(guān)聯(lián).
總結(jié)
1.被觀察者Observabler的創(chuàng)建最終返回的是ObservableCreate的實(shí)例,所以被觀察者Observabler的狀態(tài)以及事件的執(zhí)行都是通過(guò)ObservableCreate的內(nèi)部類(lèi)CreateEmitter來(lái)實(shí)現(xiàn)的,
2.CreateEmitter對(duì)訂閱的Observer進(jìn)行了關(guān)聯(lián)封裝
3.被觀察者Observable通過(guò)subscribe方法將觀察者傳給他的內(nèi)部類(lèi)CreateEmitter用來(lái)進(jìn)行關(guān)聯(lián)封裝.
4.ObservableCreate創(chuàng)建的時(shí)候接收了Observable.create方法傳入的(newObservableOnSubscribe())并與自身的內(nèi)部類(lèi)CreateEmitter進(jìn)行了關(guān)聯(lián)
5.被觀察者Observabler要執(zhí)行的事件都會(huì)在傳入的ObservableOnSubscribe實(shí)例的subscriber方法中進(jìn)行操作再經(jīng)由關(guān)聯(lián)的CreateEmitter向外發(fā)射.最終由CreateEmitter關(guān)聯(lián)封裝的Observer接收事件并作出響應(yīng).