先來(lái)個(gè)RxAndroid的github地址
官方例子
Observable.just("one", "two", "three", "four", "five")
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())吧
.subscribe(/* an Observer */);
- - -
###簡(jiǎn)化例子
```java
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(LOG_TAG,"[onSubscribe] " + Thread.currentThread().getId());
}
@Override
public void onNext(String value) {
Log.i(LOG_TAG,"[onNext] "+value + Thread.currentThread().getId());
}
@Override
public void onError(Throwable e) {
Log.i(LOG_TAG,"[onError] "+e);
}
@Override
public void onComplete() {
Log.i(LOG_TAG,"[onComplete] "+Thread.currentThread().getId());
}
};
Observable.just("next -- > 1","next --> 2")
.subscribe(observer);
- 接下來(lái)看看Observable.just()方法的實(shí)現(xiàn)
public static <T> Observable<T> just(T item1, T item2) {
ObjectHelper.requireNonNull(item1, "The first item is null");
ObjectHelper.requireNonNull(item2, "The second item is null");
/*
ObjectHelper.requireNonNull(item1, "The first item is null");
這個(gè)方法僅僅是判斷item1是不是null
public static <T> T requireNonNull(T object, String message) {
if (object == null) {
throw new NullPointerException(message);
}
return object;
}
*/
return fromArray(item1, item2);
}
- 可以看到j(luò)ust方法最后調(diào)用了 fromArray() 方法 接下來(lái)看看fromArray方法的實(shí)現(xiàn)
public static <T> Observable<T> fromArray(T... items) { ObjectHelper.requireNonNull(items, "items is null"); if (items.length == 0) { return return RxJavaPlugins.onAssembly((Observable<T>) ObservableEmpty.INSTANCE); } else if (items.length == 1) { return return RxJavaPlugins.onAssembly(new ObservableJust<T>(items[0])); } return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items)); }
- 可以看到fromArray方法就是調(diào)用了RxJavaPlugins.onAssembly這個(gè)方法怀喉,根據(jù)items的長(zhǎng)度不同傳遞了不同的參數(shù)
- 先分析items.length == 1的情況迅诬,這個(gè)情況下傳入的實(shí)例是:new ObservableJust<T>(items[0]))刺覆,對(duì)應(yīng)上面的例子就是new ObservableJust<String>("one"));
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> { private final T value; public ObservableJust(final T value) { this.value = value; } @Override protected void subscribeActual(Observer<? super T> s) { ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value); s.onSubscribe(sd); sd.run(); } @Override public T call() { return value; }
}
- 這里有些疑問(wèn)。subscribeActual()這個(gè)方法是干什么的?以及里面的**ScalarDisposable**又是做什么的肘交?先放一放后面再說(shuō)
- 再來(lái)看看**RxJavaPlugins.onAssembly**這個(gè)方法
```java
public static <T> Observable<T> onAssembly(Observable<T> source) {
Function<Observable, Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
- 這里有一個(gè)非空的判斷(以后再說(shuō)這個(gè))氯窍,可以先理解成就直接吧source返回
- 小結(jié)一哈just方法,首先是判斷傳入的item是不是空锤躁,如果不是空就調(diào)用了fromArray方法搁料,在fromArray里面構(gòu)造了一個(gè)Observable對(duì)象,然后直接返回系羞。
- just方法構(gòu)造完了以后就調(diào)用了subscribe()方法并傳入了一個(gè)Observer對(duì)象郭计。看看suubscribe的核心代碼(就兩句話)
public final void subscribe(Observer<? super T> observer) { observer = RxJavaPlugins.onSubscribe(this, observer); subscribeActual(observer); }
- 先來(lái)看看RxJavaPlugins.onSubscribe(this, observer)這個(gè)究竟做了什么事~~
public static <T> Observer<? super T> onSubscribe(Observable<T> source, Observer<? super T> observer) { BiFunction<Observable, Observer, Observer> f = onObservableSubscribe; if (f != null) { return apply(f, source, observer); } return observer; }
- 這里面又有一個(gè)非空的判斷椒振,不管這個(gè)非空判斷昭伸,也就是直接返回了傳入的observer對(duì)象
- 現(xiàn)在代碼就比較清晰了,其實(shí)就是直接調(diào)用了subscribeActual(observer)這個(gè)方法
-
小結(jié)一哈澎迎,Observable.just()根據(jù)參數(shù)的長(zhǎng)度構(gòu)造了一個(gè)特定的Observable對(duì)象并返回庐杨,然后調(diào)用了該對(duì)象的subscribeActual方法并傳入observer
- 接下來(lái)再來(lái)看前面留下的問(wèn)題,fromArray方法里面有根據(jù)items的長(zhǎng)度進(jìn)行實(shí)例化不同的Observable
- item長(zhǎng)度為1的時(shí)候 --> ObservableJust
- 它的subscribeActual()方法
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
s.onSubscribe(sd);
sd.run();
}- 這里又引入了一個(gè)新的類**ScalarDisposable**夹供,來(lái)看看這個(gè)又是做什么的 ```java public static final class ScalarDisposable<T> extends AtomicInteger //這個(gè)類是用來(lái)原子操作的類,java里面i++都不是線程安全的~~ implements QueueDisposable<T>, Runnable { private static final long serialVersionUID = 3880992722410194083L; final Observer<? super T> observer; final T value; static final int START = 0; static final int FUSED = 1; static final int ON_NEXT = 2; static final int ON_COMPLETE = 3; public ScalarDisposable(Observer<? super T> observer, T value) { this.observer = observer; this.value = value; } //中間省略了一大堆方法~~ @Override public void run() { if (get() == START && compareAndSet(START, ON_NEXT)) { //上面就是比較和賦值原子操作 observer.onNext(value);//在這里可以看到調(diào)用了onNext() if (get() == ON_NEXT) { lazySet(ON_COMPLETE); observer.onComplete(); } } } }
- 它的subscribeActual()方法
- 當(dāng)items的長(zhǎng)度大于1的時(shí)候 --> ObservableFromArray
- 它的subscribeActual()方法
public void subscribeActual(Observer<? super T> s) { FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array); s.onSubscribe(d); if (d.fusionMode) { return; } d.run(); }
- 這里又出現(xiàn)了一個(gè)新的Disposable --> FromArrayDisposable灵份,但是不管怎么樣兒,最后都調(diào)用了d.run()方法
- item長(zhǎng)度為1的時(shí)候 --> ObservableJust
static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
final Observer<? super T> actual;
final T[] array;
int index;
boolean fusionMode;
volatile boolean disposed;
FromArrayDisposable(Observer<? super T> actual, T[] array) {
this.actual = actual;
this.array = array;
}
void run() {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {//for循環(huán)調(diào)用撒~~
T value = a[i];
if (value == null) {
actual.onError(new NullPointerException("The " + i + "th element is null"));
return;
}
actual.onNext(value);//調(diào)用onNext方法
}
if (!isDisposed()) {
actual.onComplete();
}
}
}
```
小結(jié):
調(diào)用just的時(shí)候構(gòu)造了一個(gè)Observable對(duì)象罩引,并根據(jù)不同的參數(shù)實(shí)例化不同的Observable各吨,不同的Observable有不同的subscribeActual()方法實(shí)現(xiàn),subscribeActual方法里面都有一個(gè)Disposable對(duì)象,最后都調(diào)用了Disposable的run(該方法調(diào)用了onNext()方法)方法揭蜒,最后在subscribe的時(shí)候?qū)嶋H上就是調(diào)用了Observable的subscribeActual方法横浑。
線程切換分析
eg:
Observable.just("next -- > 1","next --> 2")
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.newThread())
.subscribe(observer);
-
subscribeOn
public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }
- 可以看到這里也是調(diào)用了RxJavaPlugins.onAsswmbly()方法,只是這里的參數(shù)變成了ObservableSubscribeOn的實(shí)例屉更。
- ObservableSubscribeOn
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { super(source); this.scheduler = scheduler; } @Override public void subscribeActual(final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new Runnable() { @Override public void run() { source.subscribe(parent); } })); } ....省略其他代碼
- 可以看到里面主要是調(diào)用了徙融,Scheduler的schedulerDirect()方法,并在這個(gè)里面調(diào)用了瑰谜,source.subscribe()
- 這里我們就僅僅去看看Scheduler.newThread()的實(shí)現(xiàn)
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
//省略部分代碼....
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
return sr;
}
```
- 這里就可以看出來(lái)實(shí)際上就是用的線程池來(lái)做的~~
- observeOn
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
- 這個(gè)也是調(diào)用了RxJavaPlugins的onAssembly方法欺冀,傳入的對(duì)象是ObservableObserveOn的實(shí)例。
- ObservableObserveOn
//僅僅提出了核心代碼哈 protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }
- 這里對(duì)傳入的Scheduler進(jìn)行了判斷萨脑,如果是TrampolineScheduler類型就直接調(diào)用了隐轩,Source的subscribe方法,這個(gè)Scource其實(shí)就是調(diào)用observeOn方法的Observable
- 先來(lái)看看當(dāng)Scheduler是newThreadScheduler的時(shí)候渤早,可以看到實(shí)例化了一個(gè)ObserveOnObserver
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable { private static final long serialVersionUID = 6576896619930983584L; final Observer<? super T> actual; final Scheduler.Worker worker; final boolean delayError; final int bufferSize; SimpleQueue<T> queue; Disposable s; Throwable error; volatile boolean done; volatile boolean cancelled; int sourceMode; boolean outputFused; ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) { this.actual = actual; this.worker = worker; this.delayError = delayError; this.bufferSize = bufferSize; } //省略了很多代碼.... @Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); } ``` - 從源碼可以看出來(lái)职车,**ObserveOnObserver**其實(shí)就是對(duì)Observer的一個(gè)包裝 - 在**onNext**方法中可以看到線程切換的代碼
小結(jié):
其實(shí)搞了半天就是一個(gè)線程池在里面切換,對(duì)對(duì)象的各種包裝鹊杖。subscribeOn就是對(duì)Observable的包裝悴灵,切換了線程來(lái)調(diào)用source.subscribe()方法,而observeOn則是對(duì)Observer的包裝骂蓖,并重寫了里面的回調(diào)方法积瞒,在回調(diào)的時(shí)候會(huì)自動(dòng)切換線程。
AndroidSchedulers.mainThread()這個(gè)Scheduler的分析
public final class AndroidSchedulers {
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
/** A {@link Scheduler} which executes actions on the Android main thread. */
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
/** A {@link Scheduler} which executes actions on {@code looper}. */
public static Scheduler from(Looper looper) {
if (looper == null) throw new NullPointerException("looper == null");
return new HandlerScheduler(new Handler(looper));
}
private AndroidSchedulers() {
throw new AssertionError("No instances.");
}
}
- 可以看到最后都是實(shí)例化了HandlerScheduler登下,不同的是Looper的不同茫孔,
- 再來(lái)看看HandlerScheduler的實(shí)現(xiàn)(僅僅貼出了主要的兩個(gè)方法)
@Override
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
return scheduled;
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
private static final class HandlerWorker extends Worker {
private final Handler handler;
private volatile boolean disposed;
HandlerWorker(Handler handler) {
this.handler = handler;
}
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
```
- 最后都是用的handler.postDelayed方法來(lái)做的線程切換,so android上面的Schulder其實(shí)就是用了庐船,Handler機(jī)制~~
RxAndroid使用不當(dāng)會(huì)有內(nèi)存泄漏的哦~~
Nothing is certain in this life. The only thing i know for sure is that. I love you and my life. That is the only thing i know. have a good day