RxJava源碼打算花一周看看, 時間太少了, 不知道一周可以看幾個操作符;
按以下的demo來學習一個RxJava的思路;
demo:
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
LogUtils.log(getClass(), "threadName:" + Thread.currentThread().getName());
e.onNext(1);
e.onComplete();
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.log(getClass(), "onSubscribe()");
}
@Override
public void onNext(Integer value) {
LogUtils.log(getClass(), "onNext()");
}
@Override
public void onError(Throwable e) {
LogUtils.log(getClass(), "onError()");
}
@Override
public void onComplete() {
LogUtils.log(getClass(), "onComplete()");
}
});
- 打印結(jié)果:
04-23 23:58:14.755 3681-3681/com.test V/AndroidTest: ->onSubscribe()
04-23 23:58:14.755 3681-3681/com.test V/AndroidTest: ->subscribe()
04-23 23:58:14.755 3681-3681/com.test V/AndroidTest: ->onNext()
04-23 23:58:14.755 3681-3681/com.test V/AndroidTest: ->onComplete()
ObservableEmitter.onNext, ObservableEmitter.onComplete到底做了哪些事情?
一、Observable.create:
1.1 Observable.create:
public abstract class Observable<T> implements ObservableSource<T> {
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
return new ObservableCreate<T>(source);
}
}
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
}
- 創(chuàng)建ObservableCreate實例, ObservableCreate持有ObservableOnSubscribe的引用;
二、Observable.subscribe:
2.1 Observable.subscribe:
public abstract class Observable<T> implements ObservableSource<T> {
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
subscribeActual(observer);
}
}
public final class ObservableCreate<T> extends Observable<T> {
@Override
protected void subscribeActual(Observer<? super T> observer) {
/**
* 1. CreateEmitter與Observer互相持有對方的引用, gc如果采用引用計數(shù)算法, 解決不了這種情況;
* 2. 可以看出來, 被觀察者持有CreateEmitter的引用, CreateEmitter持有觀察者Oberser的引用,
* 被觀察者Observable通過CreateEmitter.onXXX方法觸發(fā)觀察者Observer對應(yīng)的onXXX方法;
*/
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
source.subscribe(parent);
}
}
2.2 CreateEmitter:
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
}
- demo太短, 可能并不能體現(xiàn)這個操作流程的巧妙之處;
如果換成我們自己來寫觀察者模式, 可能出現(xiàn)的代碼是如下方式:
public void register() {
Observable observable = new Observable();
observable.addObserver(new Observer(1));
observable.addObserver(new Observer(2));
observable.addObserver(new Observer(3));
}
public void publish() {
observable.publish(something);
}
public class Observable {
public void publish(something) {
if(succ) {...}
else {...}
}
}
- 反觀Rxjava, 沒有那么多套路, 直接一個鏈式調(diào)用搞定;