代碼示例
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
LogUtils.loge("subscriber call ...");
if (!subscriber.isUnsubscribed()) {
subscriber.onNext("test1");
subscriber.onCompleted();
}
}
});
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
LogUtils.loge("Observer onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
LogUtils.loge("Observer onNext s = " + s);
}
};
Subscription subscription = observable.subscribe(observer);
常用類說明
被觀察者
rx.Observable
訂閱
rx.Subscription
public interface Subscription {
void unsubscribe();
boolean isUnsubscribed();
}
觀察者
rx.Observer
public interface Observer<T> {
void onCompleted();
void onError(Throwable e);
void onNext(T t);
}
執(zhí)行流程
rx.Observable#create(rx.Observable.OnSubscribe<T>)
public static <T> Observable<T> create(OnSubscribe<T> f) {
// 加載RxJavaHooks的static,初始化資源
// 返回一個Observable對象
return new Observable<T>(RxJavaHooks.onCreate(f));
}
rx.plugins.RxJavaHooks#onCreate(rx.Observable.OnSubscribe<T>)
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
if (f != null) {
// 這里其實是調(diào)用到了onObservableCreate的call方法
return f.call(onSubscribe);
}
return onSubscribe;
}
rx.plugins.RxJavaHooks
static {
init();
}
static void init() {
onObservableStart = new Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe>() {
@Override
public Observable.OnSubscribe call(Observable t1, Observable.OnSubscribe t2) {
// 調(diào)用開始訂閱的方法
return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeStart(t1, t2);
}
};
onObservableReturn = new Func1<Subscription, Subscription>() {
@Override
public Subscription call(Subscription f) {
return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeReturn(f);
}
};
initCreate();
}
static void initCreate() {
onObservableCreate = new Func1<Observable.OnSubscribe, Observable.OnSubscribe>() {
@Override
public Observable.OnSubscribe call(Observable.OnSubscribe f) {
/*
這里1. 初始化RxJavaObservableExecutionHook
2. 返回我們傳入的Observable.OnSubscribe
*/
return RxJavaPlugins.getInstance().getObservableExecutionHook().onCreate(f);
}
};
}
rx.plugins.RxJavaPlugins#getObservableExecutionHook
public RxJavaObservableExecutionHook getObservableExecutionHook() {
if (observableExecutionHook.get() == null) {
// 從系統(tǒng)配置文件中查找一個RxJavaObservableExecutionHook的實現(xiàn)類
Object impl = getPluginImplementationViaProperty(RxJavaObservableExecutionHook.class, System.getProperties());
// impl = null
if (impl == null) {
// 沒有找到就使用這個默認的RxJavaObservableExecutionHookDefault實現(xiàn)類
observableExecutionHook.compareAndSet(null, RxJavaObservableExecutionHookDefault.getInstance());
// we don't return from here but call get() again in case of thread-race so the winner will always get returned
} else {
// we received an implementation from the system property so use it
observableExecutionHook.compareAndSet(null, (RxJavaObservableExecutionHook) impl);
}
}
return observableExecutionHook.get();
}
rx.plugins.RxJavaObservableExecutionHookDefault
rx.Observable#subscribe(rx.Observer<? super T>)
public final Subscription subscribe(final Observer<? super T> observer) {
// 使用ObserverSubscriber對observer進行包裝
return subscribe(new ObserverSubscriber<T>(observer));
}
rx.Observable#subscribe(rx.Subscriber<? super T>)
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
rx.Observable#subscribe(rx.Subscriber<? super T>, rx.Observable<T>)
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// new Subscriber so onStart it
subscriber.onStart();
// 保證線程安全
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
try {
// 調(diào)用先RxJavaHooks的onObservableStart的call方法,然后再調(diào)用我們在activity中定義的onSubscribe的call方法
// 這里其實就是調(diào)用了開始訂閱
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
}
}
rx.plugins.RxJavaHooks#onObservableStart
public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
if (f != null) {
return f.call(instance, onSubscribe);
}
return onSubscribe;
}
rx.plugins.RxJavaHooks#onObservableReturn
public static Subscription onObservableReturn(Subscription subscription) {
Func1<Subscription, Subscription> f = onObservableReturn;
if (f != null) {
return f.call(subscription);
}
return subscription;
}
ObserverSubscriber類
public final class ObserverSubscriber<T> extends Subscriber<T> {
final Observer<? super T> observer;
public ObserverSubscriber(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
// 調(diào)用observer方法
observer.onNext(t);
}
@Override
public void onError(Throwable e) {
observer.onError(e);
}
@Override
public void onCompleted() {
// 調(diào)用observer方法
observer.onCompleted();
}
}
源碼閱讀總結(jié)
Subscription關(guān)聯(lián)觀察者和訂閱者
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
try {
// 調(diào)用先RxJavaHooks的onObservableStart的call方法夸楣,然后再調(diào)用我們在activity中定義的onSubscribe的call方法
// 這里其實就是調(diào)用了開始訂閱
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
}
}
observable.onSubscribe執(zhí)行
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
課堂總結(jié)
Observable
- 觀察得到的-被觀察者
- 通過Observable創(chuàng)建一個可觀察的序列(create方法)
- 通過subscribe去注冊一個觀察者
Observer
- 用于接收數(shù)據(jù)-觀察者
- 作為Observable的subsceibe方法的參數(shù)
Subscription
- 訂閱,用于描述被觀察者和觀察者之間的關(guān)系
- 用于取消訂閱和獲取當前訂閱狀態(tài)
OnSubscribe
- 當訂閱時會觸發(fā)此接口的調(diào)用
- 在Observable內(nèi)部凸丸,實際作用是向訂閱者發(fā)射數(shù)據(jù)
Subscribe
- 實現(xiàn)了Observer和Subscription
- 只有自己才能阻止自己